In [23]:

from datetime import date

%run ./pipeline_GCP.ipynb

def test_transformation_df():
    
    df = pd.DataFrame({
        "col1": [1, None, None, 1, 1],
        "col2": [1, None, None, 1, 2]
    })
    
    transformed = transformation_df(df)
    print("Résultat transformé :")
    print(transformed)
    

    assert "date_chargement" in transformed.columns
    assert transformed["date_chargement"].iloc[0] == date.today()
    assert len(transformed) == 2  
    assert pd.isna(transformed[['col1', 'col2']]).sum().sum() == 0  
    print("✅ Test FINAL réussi")

def create_sql_engine():
    """Create SQL Alchemy engine using environment variables"""
    # Charger les variables d'environnement si ce n'est pas déjà fait
    load_dotenv()
    
    # Utiliser la même approche que celle qui fonctionne avec pyodbc
    server_name = os.getenv('SQL_SERVER').replace('\\\\', '\\')
    
    # Construction de la chaîne de connexion ODBC exactement comme dans votre exemple fonctionnel
    odbc_conn_str = (
        f"DRIVER={{{os.getenv('SQL_DRIVER')}}};"
        f"SERVER={server_name};"
        f"DATABASE={os.getenv('SQL_DATABASE')};"
        f"UID={os.getenv('SQL_USERNAME')};"
        f"PWD={os.getenv('SQL_PASSWORD')};"
        f"TrustServerCertificate=yes;"
    )
    
    # Utilisation de la notation pyodbc:// avec la chaîne encodée
    conn_str = f"mssql+pyodbc:///?odbc_connect={urllib.parse.quote_plus(odbc_conn_str)}"
    
    # Création du moteur SQLAlchemy avec des paramètres optimisés
    return create_engine(
        conn_str, 
        fast_executemany=True,  # Optimisation pour les insertions multiples
        pool_pre_ping=True      # Vérifie que la connexion est toujours active
    )


def test_sql_server_integration():
    """Teste que les données sont bien stockées dans SQL Server"""

    engine = create_sql_engine()

    tables_to_check = [
        "dim_produit",
        "dim_entrepot",
        "stg_commandes",
        "stg_livrasion",
        "stg_mouvements",
        "FACT_Mouvement",
        "FACT_Livraison",
        "FACT_commandes"
    ]
    
    
    for table in tables_to_check:
      try:  
        df = pd.read_sql(f"SELECT TOP 1 * FROM {table}", engine)
        if  df.empty :
            logger.info(f"la  Table {table} est vide")
        else :
            logger.info(f"✅ Table {table} contient des données ")
      except Exception as e:
            logger.error(f"❌ Erreur sur la table {table} : {str(e)}")
            raise

def test_bigquery_integration():
    """Teste que les données sont bien stockées dans BigQuery"""

    credential_path = r"C:\Users\user\Pipeline_09_05_2025\credential_path\bigquery_credentials.json"
    credentials = service_account.Credentials.from_service_account_file(
    credential_path,
    scopes=["https://www.googleapis.com/auth/cloud-platform"],)
    client = bigquery.Client(credentials=credentials, project="pipeline-458019")
    
    

    tables_to_check = {
        "FACT_Mouvement": "vente",
        "FACT_Livraison": "vente",
        "FACT_commandes": "vente" ,
         "dim_produit" :"vente" ,
         "dim_entrepot" :"vente"
    }
    
    # 3. Vérification de chaque table
    for table, dataset in tables_to_check.items():
        try:
            query = f"SELECT * FROM `pipeline-458019.{dataset}.{table}` LIMIT 1"
            df = client.query(query).to_dataframe()
            assert not df.empty, f"La table {table} est vide"
            logger.info(f"✅ Table BigQuery {dataset}.{table} contient des données")
        except Exception as e:
            logger.error(f"❌ Erreur sur la table {dataset}.{table} : {str(e)}")
            raise

    logger.info("Test BigQuery réussi - Toutes les tables contiennent des données")

def main():
    test_transformation_df()
    test_sql_server_integration()
    test_bigquery_integration()

if __name__ == "__main__" :
    main()

2025-05-17 13:56:38,710 - __main__ - INFO - 🏁 Début du pipeline
2025-05-17 13:56:39,325 - __main__ - INFO -  ---Chargement de dim_produit dans SQL Server---
2025-05-17 13:56:44,475 - __main__ - INFO -  ---Chargement de dim_entrepot dans SQL Server---
2025-05-17 13:56:50,568 - __main__ - INFO -  ---Chargement de stg_commandes dans SQL Server---
2025-05-17 13:56:51,035 - __main__ - INFO -  ---Chargement de stg_livrasion dans SQL Server---
2025-05-17 13:56:51,334 - __main__ - INFO -  ---Chargement de stg_mouvements dans SQL Server---
2025-05-17 13:56:51,548 - __main__ - INFO -  ---Chargement de FACT_Mouvement dans SQL Server---
2025-05-17 13:56:51,774 - __main__ - INFO -  ---Chargement de FACT_Livraison dans SQL Server---
2025-05-17 13:56:51,942 - __main__ - INFO -  ---Chargement de FACT_commandes dans SQL Server---
2025-05-17 13:56:52,334 - __main__ - INFO - 📤 Uploading FACT_Mouvement to BigQuery → pipeline-458019.vente.FACT_Mouvement...
2025-05-17 13:56:56,570 - __main__ - INFO - ✅ Tabl

Résultat transformé :
   col1  col2 date_chargement
0   1.0   1.0      2025-05-17
4   1.0   2.0      2025-05-17
✅ Test FINAL réussi


2025-05-17 13:57:16,919 - __main__ - INFO - ✅ Table FACT_Livraison contient des données 
2025-05-17 13:57:16,987 - __main__ - INFO - ✅ Table FACT_commandes contient des données 
2025-05-17 13:57:20,503 - __main__ - INFO - ✅ Table BigQuery vente.FACT_Mouvement contient des données
2025-05-17 13:57:23,350 - __main__ - INFO - ✅ Table BigQuery vente.FACT_Livraison contient des données
2025-05-17 13:57:26,180 - __main__ - INFO - ✅ Table BigQuery vente.FACT_commandes contient des données
2025-05-17 13:57:28,717 - __main__ - INFO - ✅ Table BigQuery vente.dim_produit contient des données
2025-05-17 13:57:31,276 - __main__ - INFO - ✅ Table BigQuery vente.dim_entrepot contient des données
2025-05-17 13:57:31,278 - __main__ - INFO - Test BigQuery réussi - Toutes les tables contiennent des données
