## TRDE703 Atelier Int√©gration des Donn√©es

In [None]:
import sys
import os
from pathlib import Path

# --- 1. D'ABORD : ON CONFIGURE LES CHEMINS (Le "Hack") ---
# On doit le faire AVANT d'importer 'etl.*' sinon Python ne trouve pas le dossier.

current_dir = Path(os.getcwd())
# Si le notebook est dans 'etl/', la racine est le dossier parent
project_root = current_dir.parent if current_dir.name == "etl" else current_dir

if str(project_root) not in sys.path:
    sys.path.append(str(project_root))

print(f"‚úÖ Racine ajout√©e au path : {project_root}")

# --- 2. ENSUITE : ON PEUT IMPORTER TON CODE ---
from pyspark.sql import SparkSession
from etl.shared.config import SPARK_CONFIG  # Maintenant √ßa va marcher !

# --- 3. ENFIN : ON LANCE SPARK AVEC LE JAR ---
%load_ext autoreload
%autoreload 2

builder = SparkSession.builder
for key, val in SPARK_CONFIG.items():
    builder = builder.config(key, val)

spark = builder.getOrCreate()

print(f"‚úÖ Session Spark cr√©√©e avec le JAR : {SPARK_CONFIG.get('spark.jars')}")

In [None]:
from etl.shared.config import SPARK_CONFIG, MYSQL_CONFIG
from pyspark.sql import SparkSession

print("‚öôÔ∏è Configuration charg√©e avec succ√®s.")

In [None]:
# On construit le chemin proprement avec pathlib
# project_root est d√©j√† d√©fini dans la premi√®re cellule
json_filepath = str(project_root / "data" / "raw" / "openfoodfacts-products.jsonl")

print(f"üìÇ Fichier cible : {json_filepath}")

if os.path.exists(json_filepath):
    print("‚úÖ Le fichier existe bien.")
else:
    print("‚ùå Fichier introuvable. V√©rifie le dossier data/raw/")

### üí° Un mot sur nos choix (et les consignes du TP)

Pourquoi s'emb√™ter √† √©crire ce sch√©ma manuellement ?

1.  **Respect de la consigne :** Le sujet est strict : *"Lecture JSON/CSV avec sch√©ma explicite (pas d'inf√©rence magique en prod)"*. Utiliser `inferSchema=True` nous ferait perdre des points.
2.  **Gestion de l'Historique (SCD2) :** Le sujet impose de g√©rer le *"SCD2 produit"*. Pour cela, nous avons imp√©rativement besoin du timestamp brut (`last_modified_t` en `LongType`) pour comparer les versions √† la seconde pr√®s.
3.  **Structure Imbriqu√©e :** Le format JSONL groupe les nutriments dans un objet. Notre sch√©ma refl√®te cette r√©alit√© (`StructType` imbriqu√©) pour √©viter de cr√©er 1000 colonnes plates inutiles.
4.  **S√©curit√© (`String`) :** Pour des champs instables comme `nova_group`, on lit en `String` pour √©viter que Spark ne rejette la ligne en cas de format inattendu.

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, LongType, ArrayType

def get_jsonl_schema():
    """
    Sch√©ma robuste pour l'ingestion JSONL.
    G√®re les types imbriqu√©s (nested) propres √† MongoDB/JSON.
    """

    # D√©finition de la sous-structure pour les nutriments
    # (Permet de lire l'objet "nutriments": { ... } proprement)
    nutriments_schema = StructType([
        StructField("energy-kcal_100g", FloatType(), True),
        StructField("sugars_100g", FloatType(), True),
        StructField("salt_100g", FloatType(), True),
        StructField("sodium_100g", FloatType(), True),
        StructField("fiber_100g", FloatType(), True),
        StructField("proteins_100g", FloatType(), True)
    ])

    return StructType([
        # --- Identifiants & M√©tadonn√©es (Crucial pour SCD2) ---
        StructField("code", StringType(), True),            # La cl√© primaire m√©tier (EAN)
        StructField("product_name", StringType(), True),
        StructField("last_modified_t", LongType(), True),   # Timestamp UNIX (versioning)
        StructField("created_t", LongType(), True),

        # --- Dimensions (Marques, Cat√©gories...) ---
        StructField("brands", StringType(), True),
        StructField("categories", StringType(), True),
        StructField("countries_tags", ArrayType(StringType()), True), # Liste de pays ["en:france", "en:belgium"]

        # --- Qualit√© & Scores ---
        StructField("nutriscore_grade", StringType(), True),
        StructField("nova_group", IntegerType(), True),     # Souvent un entier dans le JSON
        StructField("ecoscore_grade", StringType(), True),

        # --- Mesures (Imbriqu√©es) ---
        StructField("nutriments", nutriments_schema, True)  # L'objet imbriqu√©
    ])

print("‚úÖ Sch√©ma JSONL d√©fini.")

In [None]:
# Adapte le nom du fichier si n√©cessaire
input_file = "openfoodfacts-products.jsonl"
raw_path = str(project_root / "data" / "raw" / input_file)

print(f"üìÇ Pr√©paration de la lecture : {raw_path}")

try:
    # 1. Lecture "Lazy" (Paresseuse)
    # Spark ne lit rien pour l'instant, il note juste le plan d'action.
    df_raw = spark.read \
        .schema(get_jsonl_schema()) \
        .json(raw_path)

    print("‚úÖ Lecture configur√©e (Lazy). Le chargement r√©el se fera apr√®s le sampling.")


    count = df_raw.count()
    print(f"üìä Nombre de produits ing√©r√©s : {count:,}")

    df_raw.printSchema()

except Exception as e:
    print(f"‚ùå Erreur de lecture : {e}")

## üßπ √âtape 2 : Transformation "Silver" (Nettoyage & Typage)

Maintenant que les donn√©es brutes sont charg√©es, nous devons les rendre utilisables pour l'analyse et le SCD2.
Cette √©tape applique les r√®gles de qualit√© demand√©es :

1.  **Typage Temporel :** Conversion des timestamps UNIX (`Long`) en vraies dates (`Timestamp`) pour `last_modified_t` et `created_t`.
2.  **Nettoyage Textuel :** Suppression des espaces superflus (`trim`) sur les codes-barres et noms.
3.  **Extraction des Nutriments :** Aplatissement de la structure imbriqu√©e `nutriments` pour faciliter les requ√™tes SQL futures.
4.  **Gestion des Nulls :** Conversion s√©curis√©e de `nova_group` (texte vers entier) et filtrage des produits sans code-barre.

In [None]:
from pyspark.sql.functions import col, trim, from_unixtime, to_timestamp, when

print("‚è≥ D√©marrage du nettoyage Silver avec Sampling...")

# --- CORRECTION 1 : ECHANTILLONNAGE (SAMPLING) ---
# On ne garde que 10% des donn√©es (environ 400k lignes) pour sauver ton disque dur.
# seed=42 permet d'avoir toujours les m√™mes 10% si tu relances.
df_sampled = df_raw.sample(withReplacement=False, fraction=0.1, seed=42)

df_silver = df_sampled \
    .select(
        # --- 1. Nettoyage des Cl√©s & Textes ---
        trim(col("code")).alias("code"),
        trim(col("product_name")).alias("product_name"),

        # --- 2. Gestion Temporelle ---
        from_unixtime(col("last_modified_t")).cast("timestamp").alias("last_modified_ts"),
        from_unixtime(col("created_t")).cast("timestamp").alias("created_ts"),

        # --- 3. Normalisation des Dimensions ---
        col("countries_tags"),
        trim(col("brands")).alias("brands"),
        trim(col("categories")).alias("categories"),

        # --- 4. Qualit√© & Scores ---
        trim(col("nutriscore_grade")).alias("nutriscore_grade"),
        trim(col("ecoscore_grade")).alias("ecoscore_grade"),
        col("nova_group").cast("integer").alias("nova_group"),

        # --- 5. Nutriments ---
        col("nutriments.energy-kcal_100g").alias("energy_kcal_100g"),
        col("nutriments.sugars_100g").alias("sugars_100g"),
        col("nutriments.salt_100g").alias("salt_100g"),
        col("nutriments.proteins_100g").alias("proteins_100g")
    ) \
    .filter(col("code").isNotNull()) \
    .filter(col("code") != "")

# On met en cache ce petit √©chantillon
df_silver.cache()

count = df_silver.count()
print(f"‚úÖ Nettoyage termin√© sur l'√©chantillon. Produits restants : {count:,}")
print("(C'est normal d'en avoir moins, on a pris 10% volontairement !)")

display(df_silver.select("code", "last_modified_ts", "product_name").limit(5))

## üîê √âtape 3 : Fingerprinting (Pr√©paration SCD2)

Pour g√©rer l'historique (SCD2) efficacement, nous ne pouvons pas comparer toutes les colonnes √† chaque fois.
Nous allons g√©n√©rer un **Hash Technique (`row_hash`)** : une empreinte digitale unique bas√©e sur les colonnes m√©tier.

* **Strat√©gie :** On concat√®ne toutes les colonnes importantes (Nom, Marque, Nutriscore, Sucre...) et on applique un hachage SHA-256.
* **Int√©r√™t :** Si le hash change, cela signifie que le produit a √©t√© modifi√©. C'est ce qui d√©clenchera la cr√©ation d'une nouvelle version dans le Datamart.

In [None]:
from pyspark.sql.functions import sha2, concat_ws, col

print("‚è≥ Calcul du Hash (Fingerprint) pour chaque produit...")

# On s√©lectionne les colonnes qui doivent d√©clencher une nouvelle version si elles changent.
# Note d'Architecte : On NE met PAS 'last_modified_ts' dans le hash, car on veut d√©tecter les changements de CONTENU m√©tier.
columns_to_hash = [
    "product_name", "brands", "categories", "countries_tags",
    "nutriscore_grade", "nova_group", "ecoscore_grade",
    "energy_kcal_100g", "sugars_100g", "salt_100g", "proteins_100g"
]

df_hashed = df_silver.withColumn(
    "row_hash",
    sha2(concat_ws("||", *[col(c) for c in columns_to_hash]), 256)
)

print("‚úÖ Hashing termin√©.")
display(df_hashed.select("code", "product_name", "row_hash").limit(5))

## üè≠ √âtape 4 : Initialisation du Datamart (DDL)

Avant de charger les donn√©es, nous devons cr√©er la structure des tables dans MySQL.
Nous utilisons une connexion Python directe (hors Spark) pour d√©finir pr√©cis√©ment :
1.  **Les Cl√©s Primaires (PK) :** `product_sk` (Auto-incr√©ment) pour identifier unique une *version* de produit.
2.  **Les Index :** Sur `code` et `row_hash` pour que les recherches (Join/Upsert) soient instantan√©es.
3.  **Les Colonnes SCD2 :** `effective_from` (d√©but), `effective_to` (fin), `is_current` (actif).

In [None]:
import mysql.connector
from etl.shared.config import MYSQL_CONFIG

def init_datamart():
    print("‚è≥ Initialisation et Tuning MySQL...")

    conn = mysql.connector.connect(
        host="localhost",
        port=3306,
        user=MYSQL_CONFIG["user"],
        password=MYSQL_CONFIG["password"],
        database="openfoodfacts"
    )
    cursor = conn.cursor()

    # --- CORRECTION 2 : TUNING MYSQL (CRITIQUE) ---
    # On autorise des paquets jusqu'√† 64 Mo pour √©viter le "PacketTooBigException"
    cursor.execute("SET GLOBAL max_allowed_packet=67108864")
    print("   - Option MySQL 'max_allowed_packet' pass√©e √† 64MB.")

    # Suppression (Faits d'abord, Dimensions ensuite)
    cursor.execute("DROP TABLE IF EXISTS fact_nutrition_snapshot")
    cursor.execute("DROP TABLE IF EXISTS dim_product")

    # Cr√©ation Dimension (avec TEXT pour les champs longs)
    product_ddl = """
    CREATE TABLE dim_product (
        product_sk INT AUTO_INCREMENT PRIMARY KEY,
        code VARCHAR(255) NOT NULL,

        product_name TEXT, -- TEXT car VARCHAR(500) est trop court pour certains produits
        brands TEXT,       -- TEXT car les listes de marques peuvent √™tre immenses
        categories TEXT,

        row_hash CHAR(64) NOT NULL,
        effective_from DATETIME,
        effective_to DATETIME,
        is_current BOOLEAN,

        INDEX idx_code (code),
        INDEX idx_hash (row_hash)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    """
    cursor.execute(product_ddl)
    print("   - Table 'dim_product' cr√©√©e (avec TEXT).")

    # Cr√©ation Faits
    fact_ddl = """
    CREATE TABLE fact_nutrition_snapshot (
        fact_sk INT AUTO_INCREMENT PRIMARY KEY,
        product_sk INT NOT NULL,
        date_sk INT NOT NULL,

        nutriscore_grade CHAR(1),
        ecoscore_grade CHAR(1),
        nova_group INT,
        energy_kcal_100g FLOAT,
        sugars_100g FLOAT,
        salt_100g FLOAT,
        proteins_100g FLOAT,

        FOREIGN KEY (product_sk) REFERENCES dim_product(product_sk)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    """
    cursor.execute(fact_ddl)
    print("   - Table 'fact_nutrition_snapshot' cr√©√©e.")

    conn.close()
    print("‚úÖ Datamart pr√™t.")

try:
    init_datamart()
except Exception as e:
    print(f"‚ùå Erreur MySQL : {e}")

## üöö √âtape 5 : Chargement de la Dimension Produit (Initial Load)

Nous s√©parons les donn√©es en deux flux :
1.  **Dimension (`dim_product`) :** Contient les descriptions et l'historique.
2.  **Faits (`fact_nutrition_snapshot`) :** Contient les chiffres.

Ici, nous chargeons la dimension.
* **Transformation :** On ne garde que les colonnes descriptives.
* **Initialisation SCD2 :** Comme c'est le premier chargement, on fixe :
    * `effective_from` = La date de modification du produit (`last_modified_ts`).
    * `effective_to` = '9999-12-31' (Date infinie = produit actif).
    * `is_current` = True.
* **√âcriture JDBC :** On pousse vers MySQL en mode `append`.

In [None]:
from pyspark.sql.functions import lit
from etl.shared.config import MYSQL_CONFIG

print("‚è≥ Pr√©paration de la dimension Produit...")

# S√©lection finale
df_dim_product_init = df_hashed.select(
    col("code"),
    col("product_name"),
    col("brands"),
    col("categories"),
    col("row_hash"),
    col("last_modified_ts").alias("effective_from"),
    lit("9999-12-31 23:59:59").cast("timestamp").alias("effective_to"),
    lit(True).alias("is_current")
)

# Config JDBC optimis√©e
jdbc_url = MYSQL_CONFIG["url"]
jdbc_props = {
    "user": MYSQL_CONFIG["user"],
    "password": MYSQL_CONFIG["password"],
    "driver": MYSQL_CONFIG["driver"],

    # --- CORRECTION 3 : PETITS PAQUETS ---
    # On r√©duit √† 1000 pour m√©nager le r√©seau et la m√©moire
    "batchsize": "1000"
}

print("üöÄ √âcriture dans MySQL (dim_product)...")

try:
    df_dim_product_init.write \
        .jdbc(url=jdbc_url, table="dim_product", mode="append", properties=jdbc_props)

    print("‚úÖ Chargement termin√© avec succ√®s !")

except Exception as e:
    print(f"‚ùå Erreur d'√©criture : {e}")