In [1]:
import pyodbc
import vdb_pipeline_engine as vpe  # <-- Importe votre moteur
import importlib
import time

# Si vous modifiez vdb_pipeline_engine.py, ré-exécutez cette cellule
# pour forcer le rechargement du module
importlib.reload(vpe)

<module 'vdb_pipeline_engine' from 'c:\\Users\\samaz\\Desktop\\TFF\\MarketReviews_VS_TRUTH\\vdb_pipeline_engine.py'>

In [2]:
print("--- DÉBUT : Étape 0a (Découverte Thèmes) ---")

URL_UNIVERS = "https://www.vandenborre.be/fr/tv-audio"
NOM_UNIVERS = "TV et Audio"

conn = None
cursor = None

try:
    # 1. Extraire
    categories_data = vpe.discover_all_categories(URL_UNIVERS)
    
    if categories_data:
        # 2. Charger
        conn = vpe.get_staging_connection()
        cursor = conn.cursor()
        vpe.save_categories_to_staging(cursor, categories_data, univers_name=NOM_UNIVERS)
        conn.commit()
        print("   ->  Étape 0a terminée.")
    else:
        print("   ->  Étape 0a : Aucun thème trouvé, rien à charger.")

except Exception as e:
    print(f"   ->  ERREUR (Étape 0a) : {e}")
    if conn: conn.rollback()
finally:
    if cursor: cursor.close()
    if conn: conn.close()

--- DÉBUT : Étape 0a (Découverte Thèmes) ---
--- (0a) Scraping de la page Hub : https://www.vandenborre.be/fr/tv-audio...
   -> 8 thèmes trouvés.
--- (0a) Chargement des thèmes dans Staging_Category_Queue ---
   -> 0 nouveaux thèmes insérés.
   -> 8 thèmes réactivés.
   ->  Étape 0a terminée.


In [4]:
print("\n--- DÉBUT : Étape 0b (Découverte Sous-Catégories) ---")
tasks_to_process = []
conn_init = None
cursor_init = None

try:
    conn_init = vpe.get_staging_connection()
    cursor_init = conn_init.cursor()
    
    cursor_init.execute("SELECT CategoryQueueID, CategoryName, CategoryURL FROM Staging_Category_Queue WHERE Status = 'pending' ORDER BY CategoryQueueID")
    tasks = cursor_init.fetchall()
    
    if not tasks:
        print("   ->  Fin : Aucun thème 'pending' à traiter.")
    else:
        print(f"   -> {len(tasks)} thèmes à traiter trouvés. Lancement boucle...")
        tasks_to_process = list(tasks)

    cursor_init.close()
    conn_init.close()

    for task in tasks_to_process:
        task_id, task_name, task_url = task
        loop_conn = None
        loop_cursor = None
        
        try:
            print(f"\n--- Traitement Thème {task_id} : '{task_name}' ---")
            subcategories_data = vpe.discover_all_subcategories(parent_category_name=task_name, hub_url=task_url)

            loop_conn = vpe.get_staging_connection()
            loop_cursor = loop_conn.cursor()

            if subcategories_data:
                vpe.save_subcategories_to_staging(loop_cursor, subcategories_data, parent_category_id=task_id)
                loop_cursor.execute("UPDATE Staging_Category_Queue SET Status = 'processed', LastAttempt = GETDATE() WHERE CategoryQueueID = ?", (task_id))
                loop_conn.commit()
                print(f"   ->  Thème '{task_name}' marqué 'processed'.")
            else:
                loop_cursor.execute("UPDATE Staging_Category_Queue SET Status = 'processed', LastAttempt = GETDATE() WHERE CategoryQueueID = ?", (task_id))
                loop_conn.commit()
                print(f"   ->  Tâche {task_id} marquée 'processed' (sans enfants).")

        except Exception as e:
            print(f"   ->  ERREUR (Tâche {task_id}) : {e}")
            if loop_conn: loop_conn.rollback()
            # Log l'erreur dans la BDD
            try:
                conn_fail = vpe.get_staging_connection()
                cursor_fail = conn_fail.cursor()
                cursor_fail.execute("UPDATE Staging_Category_Queue SET Status = 'failed', LastAttempt = GETDATE() WHERE CategoryQueueID = ?", (task_id))
                conn_fail.commit()
            finally:
                if 'cursor_fail' in locals(): cursor_fail.close()
                if 'conn_fail' in locals(): conn_fail.close()
        finally:
            if loop_cursor: loop_cursor.close()
            if loop_conn: loop_conn.close()
            print("   -> Pause de 3 secondes...")
            time.sleep(3)

    print("\n---  Pipeline d'Étape 0b terminé. ---")

except Exception as e_main:
    print(f"\n---  ERREUR MAJEURE (Étape 0b) : {e_main} ---")
finally:
    print("\nScript terminé.")


--- DÉBUT : Étape 0b (Découverte Sous-Catégories) ---
   ->  Fin : Aucun thème 'pending' à traiter.

---  Pipeline d'Étape 0b terminé. ---

Script terminé.


In [6]:
print("\n--- DÉBUT : Étape 1 (Découverte Produits) ---")
tasks_to_process = []
conn_init = None
cursor_init = None

try:
    conn_init = vpe.get_staging_connection()
    cursor_init = conn_init.cursor()
    
    cursor_init.execute("SELECT SubCategoryQueueID, SubCategoryName, SubCategoryURL, ItemCount FROM Staging_SubCategory_Queue WHERE Status = 'pending' AND ItemCount > 0 ORDER BY SubCategoryQueueID")
    tasks = cursor_init.fetchall()
    
    if not tasks:
        print("   -> Fin : Aucune sous-catégorie 'pending' à traiter.")
    else:
        print(f"   -> {len(tasks)} sous-catégories à traiter trouvées. Lancement boucle...")
        tasks_to_process = list(tasks)

    cursor_init.close()
    conn_init.close()

    for task in tasks_to_process:
        task_id, task_name, task_url, task_item_count = task
        loop_conn = None
        loop_cursor = None
        
        try:
            print(f"\n--- Traitement Sous-Catégorie {task_id} : '{task_name}' ({task_item_count} articles) ---")
            product_data = vpe.discover_all_products(task_name, task_url, task_item_count)

            loop_conn = vpe.get_staging_connection()
            loop_cursor = loop_conn.cursor()

            if product_data:
                vpe.save_products_to_staging(loop_cursor, product_data, sub_category_id=task_id)
                loop_cursor.execute("UPDATE Staging_SubCategory_Queue SET Status = 'processed', LastAttempt = GETDATE() WHERE SubCategoryQueueID = ?", (task_id))
                loop_conn.commit()
                print(f"   ->  Sous-catégorie '{task_name}' marquée 'processed'.")
            else:
                loop_cursor.execute("UPDATE Staging_SubCategory_Queue SET Status = 'failed', LastAttempt = GETDATE() WHERE SubCategoryQueueID = ?", (task_id))
                loop_conn.commit()
                print(f"   -> Tâche {task_id} marquée 'failed' (aucun produit trouvé).")

        except Exception as e:
            print(f"   ->  ERREUR (Tâche {task_id}) : {e}")
            if loop_conn: loop_conn.rollback()
            # Log l'erreur dans la BDD
            try:
                conn_fail = vpe.get_staging_connection()
                cursor_fail = conn_fail.cursor()
                cursor_fail.execute("UPDATE Staging_SubCategory_Queue SET Status = 'failed', LastAttempt = GETDATE() WHERE SubCategoryQueueID = ?", (task_id))
                conn_fail.commit()
            finally:
                if 'cursor_fail' in locals(): cursor_fail.close()
                if 'conn_fail' in locals(): conn_fail.close()
        finally:
            if loop_cursor: loop_cursor.close()
            if loop_conn: loop_conn.close()
            print("   -> Pause de 3 secondes...")
            time.sleep(3)

    print("\n--- ✅ Pipeline d'Étape 1 terminé. ---")

except Exception as e_main:
    print(f"\n---  ERREUR MAJEURE (Étape 1) : {e_main} ---")
finally:
    print("\nScript terminé.")


--- DÉBUT : Étape 1 (Découverte Produits) ---
   -> Fin : Aucune sous-catégorie 'pending' à traiter.

--- ✅ Pipeline d'Étape 1 terminé. ---

Script terminé.


In [8]:
print("\n--- DÉBUT : Étape 2 (Scraping des Produits) ---")
tasks_to_process = []
conn_init = None
cursor_init = None

try:
    conn_init = vpe.get_staging_connection()
    cursor_init = conn_init.cursor()
    
    cursor_init.execute("SELECT ProductQueueID, ProductID_SKU, ProductURL FROM Staging_Product_Queue WHERE Status = 'pending' ORDER BY ProductQueueID")
    tasks = cursor_init.fetchall()
    
    if not tasks:
        print("   ->  Fin : Aucun produit 'pending' à scraper.")
    else:
        print(f"   -> {len(tasks)} produits à scraper trouvés. Lancement boucle...")
        tasks_to_process = list(tasks)

    cursor_init.close()
    conn_init.close()

    total_tasks = len(tasks_to_process)
    for i, task in enumerate(tasks_to_process):
        task_id, task_sku, task_url = task
        loop_conn = None
        loop_cursor = None
        
        print(f"\n--- Traitement Produit {i+1}/{total_tasks} (ID: {task_id}, SKU: {task_sku}) ---")
        
        try:
            product_json = vpe.scrape_product_page(task_url)
            
            loop_conn = vpe.get_staging_connection()
            loop_cursor = loop_conn.cursor()

            if product_json:
                vpe.save_product_json_to_staging(loop_cursor, task_sku, product_json, task_id)
                loop_cursor.execute("UPDATE Staging_Product_Queue SET Status = 'processed', LastAttempt = GETDATE() WHERE ProductQueueID = ?", (task_id))
                loop_conn.commit()
                print(f"   ->  Produit '{task_sku}' marqué 'processed'.")
            else:
                loop_cursor.execute("UPDATE Staging_Product_Queue SET Status = 'failed', LastAttempt = GETDATE() WHERE ProductQueueID = ?", (task_id))
                loop_conn.commit()
                print(f"   ->  Tâche {task_id} marquée 'failed' (scraping JSON échoué).")

        except Exception as e:
            print(f"   ->  ERREUR (Tâche {task_id}) : {e}")
            if loop_conn: loop_conn.rollback()
            try:
                conn_fail = vpe.get_staging_connection()
                cursor_fail = conn_fail.cursor()
                cursor_fail.execute("UPDATE Staging_Product_Queue SET Status = 'failed', LastAttempt = GETDATE() WHERE ProductQueueID = ?", (task_id))
                conn_fail.commit()
            finally:
                if 'cursor_fail' in locals(): cursor_fail.close()
                if 'conn_fail' in locals(): conn_fail.close()
        finally:
            if loop_cursor: loop_cursor.close()
            if loop_conn: loop_conn.close()
            
        print("   -> Pause de 2 secondes...")
        time.sleep(2)

    print("\n---  Pipeline d'Étape 2 terminé. ---")

except Exception as e_main:
    print(f"\n---  ERREUR MAJEURE (Étape 2) : {e_main} ---")
finally:
    print("\nScript terminé.")


--- DÉBUT : Étape 2 (Scraping des Produits) ---
   ->  Fin : Aucun produit 'pending' à scraper.

---  Pipeline d'Étape 2 terminé. ---

Script terminé.


In [9]:
print("\n--- DÉBUT : Étape 3 (Transformation JSON -> Colonnes) ---")
tasks_to_process = []
conn_init = None
cursor_init = None

try:
    conn_init = vpe.get_staging_connection()
    cursor_init = conn_init.cursor()
    
    tasks = vpe.get_pending_raw_json(cursor_init)
    
    cursor_init.close()
    conn_init.close()

    if not tasks:
        print("\n---  Fin : Aucun JSON brut à transformer. ---")

    for task in tasks_to_process:
        staging_key = task['StagingVDBKey']
        sku = task['ProductID_SKU']
        loop_conn = None
        loop_cursor = None
        
        try:
            print(f"\n--- Transformation SKU {sku} (RawKey: {staging_key}) ---")
            
            # (T) Transformer
            cleaned_data = vpe.parse_and_clean_json(task['Raw_JSON_LD'])
            
            # (L) Charger
            loop_conn = vpe.get_staging_connection()
            loop_cursor = loop_conn.cursor()
            
            vpe.save_cleansed_data(loop_cursor, staging_key, task['ScrapeTimestamp'], cleaned_data)
            
            # Mettre à jour le statut de la source
            vpe.update_raw_status(loop_cursor, staging_key, 'processed')
            
            loop_conn.commit()
            print(f"   ->  SKU {sku} transformé avec succès.")

        except Exception as e:
            print(f"   ->  ERREUR (Tâche {staging_key}) : {e}.")
            if loop_conn: loop_conn.rollback()
            try:
                conn_fail = vpe.get_staging_connection()
                cursor_fail = conn_fail.cursor()
                vpe.update_raw_status(cursor_fail, staging_key, 'failed')
                conn_fail.commit()
            finally:
                if 'cursor_fail' in locals(): cursor_fail.close()
                if 'conn_fail' in locals(): conn_fail.close()
        finally:
            if loop_cursor: loop_cursor.close()
            if loop_conn: loop_conn.close()

    print("\n---  Pipeline d'Étape 3 terminé. ---")

except Exception as e_main:
    print(f"\n---  ERREUR MAJEURE (Étape 3) : {e_main} ---")
finally:
    print("\nScript de Staging terminé.")


--- DÉBUT : Étape 3 (Transformation JSON -> Colonnes) ---
   -> (3) Extraction des JSON bruts (Status='pending')...
   -> 0 JSON bruts trouvés à transformer.

---  Fin : Aucun JSON brut à transformer. ---

---  Pipeline d'Étape 3 terminé. ---

Script de Staging terminé.
