# Data Integration Workshop - OpenFoodFacts ETL

**M1 EISI / CDPIA / CYBER - Module TRDE703**

This notebook implements the full ETL pipeline (Bronze → Silver → Gold) with **PySpark**.

## Architecture

```
Bronze (Raw JSON/JSONL)
    ↓
Silver (Cleaned Parquet)
    ↓
Gold (MySQL Star Schema)
    ↓
Analytics & Reporting
```

## Objectives

1. ✅ Ingest large-scale OpenFoodFacts data
2. ✅ Clean and normalize (Silver)
3. ✅ Load into a MySQL datamart (Gold)
4. ✅ Analyze data quality
5. ✅ Run analytical queries

## 1. Configuration & Imports

In [None]:
# Standard imports
import sys
import os
import logging
from pathlib import Path

# PySpark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

# Add the project root to sys.path so the etl package can be imported
project_root = Path(os.getcwd()).parent
sys.path.insert(0, str(project_root))

# ETL module imports
from etl.utils import setup_logger, create_spark_session
from etl.settings import BRONZE_PATH, SILVER_PATH, JDBC_URL, JDBC_PROPERTIES
from etl.schema_bronze import get_bronze_schema

# Logging configuration
logger = setup_logger("OFF_Notebook", logging.INFO)
logger.info("✓ Imports completed")

In [None]:
# Create the Spark session
spark = create_spark_session("OFF_Workshop_Notebook")

print(f"Spark Version: {spark.version}")
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")  # actual URL (port 4040 unless already taken)
logger.info("✓ Spark Session created")

## 2. Paths & Database Configuration

In [None]:
# Paths (relative to the project root)
base_dir = project_root
raw_input_path = str(base_dir / "tests" / "sample_data.jsonl")
bronze_path = str(base_dir / "data" / "bronze")
silver_path = str(base_dir / "data" / "silver")

# MySQL configuration
db_config = {
    "host": "localhost",
    "port": 3306,
    "database": "off_datamart",
    "user": "root",
    "password": "password"  # ⚠️ Change me in production!
}

jdbc_url = f"jdbc:mysql://{db_config['host']}:{db_config['port']}/{db_config['database']}"
db_props = {
    "user": db_config["user"],
    "password": db_config["password"],
    "driver": "com.mysql.cj.jdbc.Driver"
}

print(f"Input file: {raw_input_path}")
print(f"Bronze path: {bronze_path}")
print(f"Silver path: {silver_path}")
print(f"MySQL: {db_config['host']}:{db_config['port']}/{db_config['database']}")

# Check that the input file exists
if not os.path.exists(raw_input_path):
    print(f"⚠️ WARNING: Input file not found: {raw_input_path}")
else:
    print(f"✓ Input file found")
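
One way to keep the password out of the notebook is to read the connection settings from environment variables. The sketch below is an assumption, not part of the `etl` package: the variable names `MYSQL_HOST`, `MYSQL_PORT`, `MYSQL_DATABASE`, `MYSQL_USER` and `MYSQL_PASSWORD` are hypothetical, and the defaults mirror the values hard-coded in the cell above (except the password, which has no default).

```python
import os


def mysql_config_from_env(env=None):
    """Build (jdbc_url, jdbc_properties) from environment variables.

    The MYSQL_* variable names are hypothetical; defaults mirror the notebook.
    """
    env = os.environ if env is None else env
    cfg = {
        "host": env.get("MYSQL_HOST", "localhost"),
        "port": int(env.get("MYSQL_PORT", "3306")),
        "database": env.get("MYSQL_DATABASE", "off_datamart"),
        "user": env.get("MYSQL_USER", "root"),
        "password": env.get("MYSQL_PASSWORD", ""),  # deliberately no default password
    }
    jdbc_url = f"jdbc:mysql://{cfg['host']}:{cfg['port']}/{cfg['database']}"
    props = {
        "user": cfg["user"],
        "password": cfg["password"],
        "driver": "com.mysql.cj.jdbc.Driver",
    }
    return jdbc_url, props


# Explicit mapping used here instead of the real environment
url, props = mysql_config_from_env({"MYSQL_HOST": "db", "MYSQL_PASSWORD": "s3cret"})
print(url)  # jdbc:mysql://db:3306/off_datamart
```

Passing an explicit mapping makes the function easy to test; in the notebook, calling `mysql_config_from_env()` with no argument would read the real environment.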

---

# BRONZE LAYER - Raw Data Ingestion

**Goal:** Read the raw JSON/JSONL data with an explicit schema (no schema inference)

**Output format:** Parquet (Snappy compression)

In [None]:
# Define the Bronze schema
schema = get_bronze_schema()

print("Bronze Schema:")
for field in schema.fields:
    print(f"  - {field.name}: {field.dataType}")

In [None]:
# Ingest with an explicit schema
logger.info("Reading raw data...")

df_raw = spark.read.schema(schema).json(raw_input_path)

# Metrics
total_records = df_raw.count()
valid_records = df_raw.filter(F.col("code").isNotNull()).count()
invalid_records = total_records - valid_records

print(f"\n=== BRONZE INGESTION ===")
print(f"Total records read: {total_records}")
print(f"Valid records (with code): {valid_records}")
print(f"Invalid records (no code): {invalid_records}")
print(f"========================\n")

# Data preview
df_raw.show(3, truncate=False)
df_raw.printSchema()
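
The ingestion metrics above reduce to a simple counting rule. As a plain-Python illustration (no Spark), assuming one JSON object per line, the sketch below counts total, valid and invalid records the same way and keeps the valid ones; the sample lines are made up.

```python
import json


def bronze_metrics(lines):
    """Count total / valid / invalid JSONL records and keep the valid ones.

    A record is valid when "code" is present and not null, the same rule
    as F.col("code").isNotNull(). Blank lines are skipped in this sketch.
    """
    records = [json.loads(line) for line in lines if line.strip()]
    valid = [rec for rec in records if rec.get("code") is not None]
    metrics = {
        "total": len(records),
        "valid": len(valid),
        "invalid": len(records) - len(valid),
    }
    return metrics, valid


# Hypothetical sample lines (not taken from tests/sample_data.jsonl)
sample = [
    '{"code": "0000000000001", "product_name": "sample product"}',
    '{"product_name": "no barcode"}',
    '{"code": null, "product_name": "null barcode"}',
    "",
]
metrics, kept = bronze_metrics(sample)
print(metrics)  # {'total': 3, 'valid': 1, 'invalid': 2}
```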

In [None]:
# Filter (drop records without a code) and write Bronze
df_bronze = df_raw.filter(F.col("code").isNotNull())

logger.info(f"Writing Bronze layer to: {bronze_path}")
df_bronze.write.mode("overwrite").parquet(bronze_path)

print(f"✓ Bronze layer written: {valid_records} records")

---

# SILVER LAYER - Cleaning & Normalization

**Transformations:**
1. Multilingual resolution (product_name); not performed in the cells below
2. Flattening of nested structures (nutriments)
3. Tag normalization (removal of language prefixes)
4. Unit conversion (salt = sodium × 2.5)
5. Deduplication (by code, keep the latest record)
6. Quality validation (bounds, completeness)
7. Row hash for SCD2

In [None]:
# Read Bronze
logger.info("Reading Bronze layer...")
df_bronze = spark.read.parquet(bronze_path)

initial_count = df_bronze.count()
print(f"Bronze records: {initial_count}")

In [None]:
# 1. Deduplication (keep the most recent record per code)
logger.info("Deduplicating by code...")

window_spec = Window.partitionBy("code").orderBy(F.col("last_modified_t").desc())
df_dedup = (
    df_bronze
    .withColumn("_rn", F.row_number().over(window_spec))
    .filter(F.col("_rn") == 1)
    .drop("_rn")
)

dedup_count = df_dedup.count()
duplicates = initial_count - dedup_count

print(f"After deduplication: {dedup_count} records ({duplicates} duplicates removed)")
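
The window-based deduplication keeps, for each `code`, the row ranked first by `last_modified_t` in descending order. A plain-Python equivalent, assuming records are dicts, could look like the sketch below. Spark's `desc()` places nulls last, so a record without a timestamp is kept only when no timestamped record exists for that code; Spark breaks ties arbitrarily, while this sketch keeps the first record seen.

```python
def dedup_keep_latest(records):
    """Keep one record per code: the one with the highest last_modified_t.

    Null timestamps lose against any non-null timestamp (nulls last);
    on ties, the first record seen is kept.
    """
    best = {}
    for rec in records:
        code = rec["code"]
        ts = rec.get("last_modified_t")
        current = best.get(code)
        if current is None:
            best[code] = rec
            continue
        cur_ts = current.get("last_modified_t")
        if ts is not None and (cur_ts is None or ts > cur_ts):
            best[code] = rec
    return list(best.values())


rows = [
    {"code": "A", "last_modified_t": 100, "product_name": "old"},
    {"code": "A", "last_modified_t": 200, "product_name": "new"},
    {"code": "B", "last_modified_t": None, "product_name": "only version"},
]
latest = dedup_keep_latest(rows)
print([r["product_name"] for r in latest])  # ['new', 'only version']
```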

In [None]:
# 2. Tag normalization (strip language prefixes such as en:, fr:)
logger.info("Normalizing tags...")

df_normalized = (
    df_dedup
    .withColumn(
        "countries_normalized",
        F.expr("transform(countries_tags, x -> regexp_replace(x, '^[a-z][a-z]:', ''))")
    )
    .withColumn(
        "categories_normalized",
        F.expr("transform(categories_tags, x -> regexp_replace(x, '^[a-z][a-z]:', ''))")
    )
    .withColumn(
        "primary_category",
        F.when(F.size("categories_normalized") > 0, F.col("categories_normalized")[0]).otherwise(None)
    )
)

# Normalization example
print("\nNormalization example:")
df_normalized.select(
    "code",
    "categories_tags",
    "categories_normalized",
    "primary_category"
).show(2, truncate=False)
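
The `transform`/`regexp_replace` expression strips a leading two-letter prefix followed by a colon from every tag. The same rule in plain Python, with the same regular expression, plus the `primary_category` choice (first normalized tag, or `None` when the list is empty or missing):

```python
import re

# Same pattern as the Spark expression: two lowercase letters then a colon
LANG_PREFIX = re.compile(r"^[a-z][a-z]:")


def normalize_tags(tags):
    """Strip a two-letter language prefix (en:, fr:, ...) from each tag."""
    if tags is None:
        return None
    return [LANG_PREFIX.sub("", tag) for tag in tags]


def primary_category(tags):
    """First normalized category, or None when the list is empty or missing."""
    normalized = normalize_tags(tags)
    return normalized[0] if normalized else None


print(normalize_tags(["en:beverages", "fr:boissons", "plant-based-foods"]))
# ['beverages', 'boissons', 'plant-based-foods']
```

Only the prefix is removed; tags are not translated, so `en:beverages` and `fr:boissons` stay distinct values. Longer prefixes are left untouched by this pattern.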

In [None]:
# 3. Flatten nutriments and derive salt
logger.info("Flattening nutriments and converting salt...")

df_flat = (
    df_normalized
    .withColumn("energy_kcal_100g", F.col("nutriments.energy-kcal_100g"))
    .withColumn("sugars_100g", F.col("nutriments.sugars_100g"))
    .withColumn("fat_100g", F.col("nutriments.fat_100g"))
    .withColumn("saturated_fat_100g", F.col("nutriments.saturated-fat_100g"))
    .withColumn("proteins_100g", F.col("nutriments.proteins_100g"))
    .withColumn("fiber_100g", F.col("nutriments.fiber_100g"))
    .withColumn("sodium_100g", F.col("nutriments.sodium_100g"))
    # Derive salt from sodium when salt_100g is missing
    .withColumn(
        "salt_100g",
        F.coalesce(F.col("nutriments.salt_100g"), F.col("sodium_100g") * 2.5)
    )
    .drop("nutriments")
)

print("✓ Nutriments flattened and salt computed")
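
The factor 2.5 approximates the ratio of the molar mass of sodium chloride to that of sodium (about 58.44 / 22.99 ≈ 2.54). The `coalesce` keeps a reported salt value and only falls back to sodium × 2.5 when salt is missing; the same rule as a plain-Python sketch:

```python
def salt_100g(salt, sodium, factor=2.5):
    """Salt in g/100 g: the reported value if present, else sodium * factor.

    Mirrors F.coalesce(nutriments.salt_100g, sodium_100g * 2.5);
    returns None when both inputs are missing.
    """
    if salt is not None:
        return salt
    if sodium is not None:
        return sodium * factor
    return None


print(salt_100g(None, 0.4))   # 1.0 (derived from sodium)
print(salt_100g(1.2, 0.4))    # 1.2 (reported salt wins)
print(salt_100g(None, None))  # None
```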

In [None]:
# 4. Bounds validation (anomaly detection)
logger.info("Checking bounds and detecting anomalies...")

# Flagging out-of-bounds values
df_quality = (
    df_flat
    .withColumn(
        "sugars_100g_out_of_bounds",
        (F.col("sugars_100g").isNotNull()) & ((F.col("sugars_100g") < 0) | (F.col("sugars_100g") > 100))
    )
    .withColumn(
        "salt_100g_out_of_bounds",
        (F.col("salt_100g").isNotNull()) & ((F.col("salt_100g") < 0) | (F.col("salt_100g") > 25))
    )
    .withColumn(
        "energy_kcal_100g_out_of_bounds",
        (F.col("energy_kcal_100g").isNotNull()) & ((F.col("energy_kcal_100g") < 0) | (F.col("energy_kcal_100g") > 900))
    )
)

# Count anomalies
anomalies_sugars = df_quality.filter(F.col("sugars_100g_out_of_bounds") == True).count()
anomalies_salt = df_quality.filter(F.col("salt_100g_out_of_bounds") == True).count()
anomalies_energy = df_quality.filter(F.col("energy_kcal_100g_out_of_bounds") == True).count()

print(f"\nDetected anomalies:")
print(f"  Sugars out of bounds: {anomalies_sugars}")
print(f"  Salt out of bounds: {anomalies_salt}")
print(f"  Energy out of bounds: {anomalies_energy}")
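
The three flags follow one pattern: a non-null value outside [low, high] is an anomaly, and a missing value is never flagged (in Spark, `false & null` evaluates to `false`). A plain-Python sketch with the bounds copied from the cell above:

```python
# Plausibility bounds per 100 g, copied from the Spark cell
BOUNDS = {
    "sugars_100g": (0, 100),
    "salt_100g": (0, 25),
    "energy_kcal_100g": (0, 900),
}


def out_of_bounds_flags(row):
    """Return {<field>_out_of_bounds: bool}; missing values are never flagged."""
    flags = {}
    for field, (low, high) in BOUNDS.items():
        value = row.get(field)
        flags[field + "_out_of_bounds"] = value is not None and (value < low or value > high)
    return flags


def count_anomalies(rows):
    """Count flagged rows per field in a single pass over the data."""
    totals = {field + "_out_of_bounds": 0 for field in BOUNDS}
    for row in rows:
        for name, flagged in out_of_bounds_flags(row).items():
            totals[name] += flagged  # bool adds as 0 or 1
    return totals


rows = [
    {"sugars_100g": 120.0, "salt_100g": 1.0, "energy_kcal_100g": 350},
    {"sugars_100g": None, "salt_100g": -0.5, "energy_kcal_100g": 950},
]
print(count_anomalies(rows))
# {'sugars_100g_out_of_bounds': 1, 'salt_100g_out_of_bounds': 1, 'energy_kcal_100g_out_of_bounds': 1}
```

The sketch counts all flags in one pass. In Spark, a single `agg` with one `F.sum(F.col(flag).cast("int"))` per flag column would similarly replace the three separate `count()` jobs.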

In [None]:
# 5. Completeness score
logger.info("Computing completeness score...")

# Relative field weights. They currently sum to 1.075 (not 1.0),
# so the score is divided by their total below to stay within [0, 1].
weights = {
    "product_name": 0.2,
    "brands": 0.15,
    "primary_category": 0.15,
    "nutriscore_grade": 0.1,
    "energy_kcal_100g": 0.1,
    "sugars_100g": 0.075,
    "fat_100g": 0.075,
    "saturated_fat_100g": 0.05,
    "salt_100g": 0.05,
    "proteins_100g": 0.075,
    "fiber_100g": 0.05
}
total_weight = sum(weights.values())

# Compute the score: sum of the weights of non-null fields, normalized
score_expr = sum(
    F.when(F.col(col).isNotNull(), F.lit(weight)).otherwise(F.lit(0.0))
    for col, weight in weights.items()
) / total_weight

df_complete = df_quality.withColumn("completeness_score", F.round(score_expr, 2))

# Statistics
avg_completeness = df_complete.agg(F.avg("completeness_score")).collect()[0][0]
print(f"\nAverage completeness score: {avg_completeness:.3f}")
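
The weights above add up to 1.075 rather than 1.0 (0.2 + 0.15 + 0.15 + 0.1 + 0.1 + 0.075 + 0.075 + 0.05 + 0.05 + 0.075 + 0.05), so an un-normalized score can exceed 1. A plain-Python sketch of a normalized score, dividing by the total weight so that a fully populated record scores exactly 1.0; the weights are copied from the cell, and the normalization is one possible fix rather than the author's stated method.

```python
# Weights copied from the notebook cell (they sum to 1.075, not 1.0)
WEIGHTS = {
    "product_name": 0.2,
    "brands": 0.15,
    "primary_category": 0.15,
    "nutriscore_grade": 0.1,
    "energy_kcal_100g": 0.1,
    "sugars_100g": 0.075,
    "fat_100g": 0.075,
    "saturated_fat_100g": 0.05,
    "salt_100g": 0.05,
    "proteins_100g": 0.075,
    "fiber_100g": 0.05,
}


def completeness_score(row, weights=WEIGHTS):
    """Weighted share of non-null fields, normalized to [0, 1], rounded to 2 decimals."""
    total = sum(weights.values())
    present = sum(w for field, w in weights.items() if row.get(field) is not None)
    return round(present / total, 2)


full = {field: "x" for field in WEIGHTS}
print(completeness_score(full))                                  # 1.0
print(completeness_score({"product_name": "x", "brands": "y"}))  # 0.33
print(completeness_score({}))                                    # 0.0
```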

In [None]:
# 6. Row hash for SCD2 (change detection)
logger.info("Computing row hash for SCD2...")

track_cols = ["product_name", "brands", "primary_category", "nutriscore_grade"]
concat_expr = F.concat_ws(
    "||",
    *[F.coalesce(F.col(c).cast("string"), F.lit("")) for c in track_cols]
)

df_silver = df_complete.withColumn("row_hash", F.md5(concat_expr))

print(f"✓ Row hash computed for {df_silver.count()} records")
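
For string columns, the row hash amounts to joining the tracked values with `||`, replacing nulls with an empty string, and taking the MD5 hex digest. A plain-Python sketch of the same recipe, handy for checking a hash by hand; it should give the same digest as `F.md5(F.concat_ws(...))` for string values, but that equivalence is an assumption for non-string types, whose string casts may differ between Spark and Python.

```python
import hashlib

TRACK_COLS = ["product_name", "brands", "primary_category", "nutriscore_grade"]


def row_hash(row, cols=TRACK_COLS):
    """MD5 hex digest of the tracked columns joined with '||' (None -> '')."""
    payload = "||".join("" if row.get(c) is None else str(row[c]) for c in cols)
    return hashlib.md5(payload.encode("utf-8")).hexdigest()


# Hypothetical product, then the same product with a different Nutri-Score
v1 = {"product_name": "Cola", "brands": "BrandX",
      "primary_category": "beverages", "nutriscore_grade": "e"}
v2 = dict(v1, nutriscore_grade="d")

print(row_hash(v1) == row_hash(dict(v1)))  # True: same values, same hash
print(row_hash(v1) == row_hash(v2))        # False: a tracked column changed
```

Because nulls become empty strings, a change from null to `""` (or back) leaves the hash unchanged, so it would not trigger a new SCD2 version.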

In [None]:
# Write Silver
logger.info(f"Writing Silver layer to: {silver_path}")

df_silver.write.mode("overwrite").parquet(silver_path)

print(f"✓ Silver layer written: {df_silver.count()} records")

In [None]:
# Silver preview
print("\n=== SILVER LAYER SAMPLE ===")
df_silver.select(
    "code",
    "product_name",
    "brands",
    "primary_category",
    "nutriscore_grade",
    "sugars_100g",
    "salt_100g",
    "completeness_score"
).show(5, truncate=True)

---

# GOLD LAYER - MySQL Datamart Loading

**Model:** Star schema
- Dimensions: brand, category, country, time, product (SCD2)
- Fact: nutrition_snapshot

## MySQL Connection Test

In [None]:
# Test the MySQL connection
try:
    test_query = "(SELECT 1 as test) as t"
    df_test = spark.read.jdbc(jdbc_url, test_query, properties=db_props)
    result = df_test.collect()[0][0]
    print(f"✓ MySQL connection successful (test query result: {result})")
except Exception as e:
    print(f"✗ MySQL connection failed: {e}")
    print("\nMake sure that:")
    print("  1. MySQL is running (docker-compose up -d mysql)")
    print("  2. The schema has been created (mysql < sql/schema.sql)")
    print("  3. The credentials are correct")

## Load Simple Dimensions

In [None]:
# Load dim_brand
logger.info("Loading dim_brand...")

df_s = spark.read.parquet(silver_path)

brands = (
    df_s.select("brands")
    .filter(F.col("brands").isNotNull())
    .distinct()
    .withColumnRenamed("brands", "brand_name")
)

new_brand_count = brands.count()

try:
    existing_brands = spark.read.jdbc(jdbc_url, "dim_brand", properties=db_props)
    new_brands = brands.join(existing_brands, on="brand_name", how="left_anti")
    
    insert_count = new_brands.count()
    
    if insert_count > 0:
        new_brands.write.jdbc(jdbc_url, "dim_brand", mode="append", properties=db_props)
        print(f"✓ dim_brand: {insert_count} new brands inserted")
    else:
        print("✓ dim_brand: No new brands to insert")
except Exception as e:
    # First load: insert everything (note: this branch runs on any exception, not only a missing table)
    brands.write.jdbc(jdbc_url, "dim_brand", mode="append", properties=db_props)
    print(f"✓ dim_brand: {new_brand_count} brands inserted (first load)")
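
The `left_anti` join keeps only the candidate brands that are not already in `dim_brand`, so re-running the cell inserts nothing new. A plain-Python sketch of the same filter, distinct and anti-join chain, for illustration only. Since `brand_sk` is never written by the cell, surrogate keys are presumably generated by MySQL on insert (for example an `AUTO_INCREMENT` column); that is an assumption about `sql/schema.sql`.

```python
def new_dimension_members(candidates, existing):
    """Distinct, non-null candidate keys that are absent from the dimension.

    Mirrors filter(isNotNull) -> distinct() -> left_anti join on the key.
    Order of first appearance is kept here; Spark gives no ordering guarantee.
    """
    existing_set = set(existing)
    seen = set()
    result = []
    for key in candidates:
        if key is None or key in existing_set or key in seen:
            continue
        seen.add(key)
        result.append(key)
    return result


# Hypothetical brand names
print(new_dimension_members(["BrandA", "BrandB", "BrandA", None], ["BrandB"]))
# ['BrandA']
```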

In [None]:
# Load dim_category
logger.info("Loading dim_category...")

categories = (
    df_s.select(F.explode("categories_normalized").alias("category_code"))
    .filter(F.col("category_code").isNotNull())
    .distinct()
    .withColumn("category_name_fr", F.col("category_code"))
    .withColumn("level", F.lit(1))
    .withColumn("parent_category_sk", F.lit(None).cast("int"))
)

try:
    existing_categories = spark.read.jdbc(jdbc_url, "dim_category", properties=db_props)
    new_categories = categories.join(existing_categories, on="category_code", how="left_anti")
    
    insert_count = new_categories.count()
    
    if insert_count > 0:
        new_categories.write.jdbc(jdbc_url, "dim_category", mode="append", properties=db_props)
        print(f"✓ dim_category: {insert_count} new categories inserted")
    else:
        print("✓ dim_category: No new categories to insert")
except Exception as e:
    categories.write.jdbc(jdbc_url, "dim_category", mode="append", properties=db_props)
    print(f"✓ dim_category: {categories.count()} categories inserted (first load)")

## Load Products (SCD Type 2)

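**SCD2:** product metadata changes are historized: each version is a row with `is_current`, `effective_from` and `effective_to`, and `row_hash` is used to detect changes. As written, this notebook only inserts products whose `code` has no current row yet; products whose `row_hash` changed are not versioned.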

In [None]:
# Prepare product data
logger.info("Preparing product data...")

# Fetch the brand_sk and category_sk mappings
brand_mapping = spark.read.jdbc(jdbc_url, "dim_brand", properties=db_props)
category_mapping = spark.read.jdbc(jdbc_url, "dim_category", properties=db_props)

df_product_input = (
    df_s
    .join(brand_mapping, df_s.brands == brand_mapping.brand_name, "left")
    .drop("brand_name")
    .join(category_mapping, df_s.primary_category == category_mapping.category_code, "left")
    .drop("category_code")
    .withColumnRenamed("category_sk", "primary_category_sk")
    .withColumn("countries_multi", F.to_json("countries_normalized"))
    .select(
        "code",
        "product_name",
        "brand_sk",
        "primary_category_sk",
        "countries_multi",
        "row_hash"
    )
)

print(f"Products prepared: {df_product_input.count()}")

In [None]:
# Load products with SCD2
logger.info("Loading products (SCD2)...")

try:
    query = "(SELECT product_sk, code, row_hash FROM dim_product WHERE is_current = 1) as active"
    df_active = spark.read.jdbc(jdbc_url, query, properties=db_props)
    
    df_joined = df_product_input.alias("inp").join(
        df_active.alias("act"),
        F.col("inp.code") == F.col("act.code"),
        "left"
    )
    
    # New products (code not yet present in dim_product)
    df_new = df_joined.filter(F.col("act.product_sk").isNull()).select(
        F.col("inp.code"),
        F.col("inp.product_name"),
        F.col("inp.brand_sk"),
        F.col("inp.primary_category_sk"),
        F.col("inp.countries_multi"),
        F.lit(1).cast("boolean").alias("is_current"),
        F.current_timestamp().alias("effective_from"),
        F.lit(None).cast("timestamp").alias("effective_to"),
        F.col("inp.row_hash")
    )
    
    new_count = df_new.count()
    
    if new_count > 0:
        df_new.write.jdbc(jdbc_url, "dim_product", mode="append", properties=db_props)
        print(f"✓ dim_product: {new_count} new products inserted")
    else:
        print("✓ dim_product: No new products to insert")
        
except Exception as e:
    # First load: insert everything (note: this branch runs on any exception, not only a missing table)
    df_first_load = df_product_input.select(
        "code",
        "product_name",
        "brand_sk",
        "primary_category_sk",
        "countries_multi",
        F.lit(1).cast("boolean").alias("is_current"),
        F.current_timestamp().alias("effective_from"),
        F.lit(None).cast("timestamp").alias("effective_to"),
        "row_hash"
    )
    
    df_first_load.write.jdbc(jdbc_url, "dim_product", mode="append", properties=db_props)
    print(f"✓ dim_product: {df_first_load.count()} products inserted (first load)")
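
As written, the SCD2 cell above only inserts products whose `code` has no current row; a product whose `row_hash` changed is neither closed nor re-inserted. The plain-Python sketch below shows the full classification (new, changed, unchanged) on in-memory rows. It is a simplified illustration, not the `etl` module's logic: in MySQL, closing a version would require an `UPDATE` issued over a regular database connection, because Spark's JDBC writer only supports save modes such as `append` and `overwrite`, not row-level updates.

```python
from datetime import datetime, timezone


def scd2_merge(dim_rows, incoming, now=None):
    """Apply an SCD2 load to in-memory dimension rows and return counts.

    dim_rows: dicts with at least code, row_hash, is_current,
    effective_from and effective_to (modified in place).
    incoming: dicts with at least code and row_hash.
    Surrogate keys (product_sk) are left out of this simplified sketch.
    """
    now = now or datetime.now(timezone.utc)
    current = {row["code"]: row for row in dim_rows if row["is_current"]}
    counts = {"new": 0, "changed": 0, "unchanged": 0}
    for rec in incoming:
        active = current.get(rec["code"])
        if active is not None and active["row_hash"] == rec["row_hash"]:
            counts["unchanged"] += 1
            continue
        if active is None:
            counts["new"] += 1
        else:
            # Changed attributes: close the current version instead of overwriting it
            active["is_current"] = False
            active["effective_to"] = now
            counts["changed"] += 1
        version = dict(rec, is_current=True, effective_from=now, effective_to=None)
        dim_rows.append(version)
        current[rec["code"]] = version
    return counts


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
t1 = datetime(2024, 2, 1, tzinfo=timezone.utc)
dim = [{"code": "A", "row_hash": "h1", "is_current": True,
        "effective_from": t0, "effective_to": None}]
batch = [{"code": "A", "row_hash": "h2"}, {"code": "B", "row_hash": "h3"}]

print(scd2_merge(dim, batch, now=t1))  # {'new': 1, 'changed': 1, 'unchanged': 0}
print(scd2_merge(dim, batch, now=t1))  # {'new': 0, 'changed': 0, 'unchanged': 2}
print(len(dim))                        # 3 rows: A (closed), A (current), B
```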

## Load Facts (Nutrition Snapshot)

In [None]:
# Prepare facts with foreign keys
logger.info("Preparing fact data...")

product_mapping = spark.read.jdbc(
    jdbc_url,
    "(SELECT product_sk, code FROM dim_product WHERE is_current = 1) as t",
    properties=db_props
)

df_fact = (
    df_s
    .join(product_mapping, on="code", how="inner")
    .withColumn("time_sk", F.from_unixtime(F.col("last_modified_t"), "yyyyMMdd").cast("int"))
    .select(
        "product_sk",
        "time_sk",
        "energy_kcal_100g",
        "fat_100g",
        "saturated_fat_100g",
        "sugars_100g",
        "salt_100g",
        "proteins_100g",
        "fiber_100g",
        "sodium_100g",
        "nutriscore_grade",
        "completeness_score"
    )
)

fact_count = df_fact.count()
print(f"Facts prepared: {fact_count}")
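
`time_sk` is derived by formatting `last_modified_t` (Unix seconds) as `yyyyMMdd` and casting it to an integer. A plain-Python sketch of the same key, computed in UTC. Spark's `from_unixtime` formats in the session time zone (`spark.sql.session.timeZone`), so the two agree only when that zone is UTC; otherwise a timestamp close to midnight can land on a different day.

```python
from datetime import datetime, timezone


def time_sk(last_modified_t):
    """Integer yyyyMMdd key from a Unix timestamp in seconds, computed in UTC."""
    if last_modified_t is None:
        return None
    dt = datetime.fromtimestamp(last_modified_t, tz=timezone.utc)
    return int(dt.strftime("%Y%m%d"))


print(time_sk(0))           # 19700101
print(time_sk(1700000000))  # 20231114 (22:13:20 UTC, already the 15th in UTC+2)
```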

In [None]:
# Load facts
logger.info("Loading fact table...")

df_fact.write.jdbc(jdbc_url, "fact_nutrition_snapshot", mode="append", properties=db_props)

print(f"✓ fact_nutrition_snapshot: {fact_count} records inserted")

---

# ANALYSIS & REPORTING

## Data Quality Report

In [None]:
# Data quality statistics
print("\n" + "="*80)
print("DATA QUALITY REPORT")
print("="*80)

total = df_s.count()

# Completeness
avg_completeness = df_s.agg(F.avg("completeness_score")).collect()[0][0]
high_quality = df_s.filter(F.col("completeness_score") >= 0.8).count()
low_quality = df_s.filter(F.col("completeness_score") < 0.5).count()

print(f"\nCOMPLETENESS:")
print(f"  Total products: {total}")
print(f"  Average score: {avg_completeness:.3f}")
print(f"  High quality (≥0.8): {high_quality} ({high_quality/total*100:.1f}%)")
print(f"  Low quality (<0.5): {low_quality} ({low_quality/total*100:.1f}%)")

# Nutri-Score
with_nutriscore = df_s.filter(F.col("nutriscore_grade").isNotNull()).count()
print(f"\nNUTRI-SCORE:")
print(f"  With Nutri-Score: {with_nutriscore} ({with_nutriscore/total*100:.1f}%)")

nutriscore_dist = df_s.groupBy("nutriscore_grade").count().orderBy("nutriscore_grade").collect()
print(f"  Distribution:")
for row in nutriscore_dist:
    grade = row["nutriscore_grade"] if row["nutriscore_grade"] else "null"
    count = row["count"]
    pct = count / total * 100
    print(f"    Grade {grade}: {count:4d} ({pct:5.1f}%)")

# Anomalies
print(f"\nANOMALIES:")
print(f"  Sugars out of bounds: {anomalies_sugars}")
print(f"  Salt out of bounds: {anomalies_salt}")
print(f"  Energy out of bounds: {anomalies_energy}")

print("\n" + "="*80)

## Analytical SQL Queries

Example business queries on the MySQL datamart.

In [None]:
# Query 1: top brands by share of Nutri-Score A/B products.
# Both counts use DISTINCT product_sk so that a product with several
# snapshots is counted once and the share cannot exceed 100%.
query1 = """
SELECT
    b.brand_name,
    COUNT(DISTINCT p.product_sk) AS total_products,
    COUNT(DISTINCT CASE WHEN f.nutriscore_grade IN ('a', 'b') THEN p.product_sk END) AS products_ab,
    ROUND(
        COUNT(DISTINCT CASE WHEN f.nutriscore_grade IN ('a', 'b') THEN p.product_sk END) * 100.0 / COUNT(DISTINCT p.product_sk),
        2
    ) AS pct_nutriscore_ab
FROM fact_nutrition_snapshot f
JOIN dim_product p ON f.product_sk = p.product_sk AND p.is_current = 1
JOIN dim_brand b ON p.brand_sk = b.brand_sk
GROUP BY b.brand_name
HAVING COUNT(DISTINCT p.product_sk) >= 1
ORDER BY pct_nutriscore_ab DESC
LIMIT 10
"""

print("\n=== TOP 10 BRANDS BY NUTRI-SCORE A/B SHARE ===")
df_query1 = spark.read.jdbc(jdbc_url, f"({query1}) as t", properties=db_props)
df_query1.show(truncate=False)
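
To try this kind of query without a MySQL server, the sketch below builds a tiny star schema in an in-memory SQLite database (stdlib `sqlite3`) with made-up rows and runs an A/B-share query per brand that counts distinct products in both the numerator and the denominator. Table and column names follow the notebook, but the DDL is a simplified assumption, not `sql/schema.sql`, and SQLite is only a stand-in for MySQL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim_brand (brand_sk INTEGER PRIMARY KEY, brand_name TEXT);
CREATE TABLE dim_product (product_sk INTEGER PRIMARY KEY, code TEXT,
                          brand_sk INTEGER, is_current INTEGER);
CREATE TABLE fact_nutrition_snapshot (product_sk INTEGER, time_sk INTEGER,
                                      nutriscore_grade TEXT);
INSERT INTO dim_brand VALUES (1, 'BrandA'), (2, 'BrandB');
INSERT INTO dim_product VALUES (10, 'p1', 1, 1), (11, 'p2', 1, 1), (12, 'p3', 2, 1);
-- p1 has two snapshots (e.g. the fact load ran twice)
INSERT INTO fact_nutrition_snapshot VALUES
    (10, 20240101, 'a'), (10, 20240102, 'a'), (11, 20240101, 'd'), (12, 20240101, 'b');
""")

QUERY = """
SELECT
    b.brand_name,
    COUNT(DISTINCT p.product_sk) AS total_products,
    COUNT(DISTINCT CASE WHEN f.nutriscore_grade IN ('a', 'b')
                        THEN p.product_sk END) AS products_ab,
    ROUND(COUNT(DISTINCT CASE WHEN f.nutriscore_grade IN ('a', 'b')
                              THEN p.product_sk END) * 100.0
          / COUNT(DISTINCT p.product_sk), 2) AS pct_nutriscore_ab
FROM fact_nutrition_snapshot f
JOIN dim_product p ON f.product_sk = p.product_sk AND p.is_current = 1
JOIN dim_brand b ON p.brand_sk = b.brand_sk
GROUP BY b.brand_name
ORDER BY pct_nutriscore_ab DESC, b.brand_name
"""

rows = conn.execute(QUERY).fetchall()
for row in rows:
    print(row)
# ('BrandB', 1, 1, 100.0)
# ('BrandA', 2, 1, 50.0)
```

With `SUM(CASE ... THEN 1 ELSE 0 END)` over fact rows in the numerator instead, BrandA would report 2 A/B rows for 2 products (100%), because both snapshots of p1 would be counted.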

In [None]:
# Query 2: average completeness per brand
query2 = """
SELECT
    b.brand_name,
    COUNT(DISTINCT p.product_sk) AS total_products,
    ROUND(AVG(f.completeness_score), 3) AS avg_completeness
FROM fact_nutrition_snapshot f
JOIN dim_product p ON f.product_sk = p.product_sk AND p.is_current = 1
JOIN dim_brand b ON p.brand_sk = b.brand_sk
GROUP BY b.brand_name
ORDER BY avg_completeness DESC
LIMIT 10
"""

print("\n=== AVERAGE COMPLETENESS BY BRAND ===")
df_query2 = spark.read.jdbc(jdbc_url, f"({query2}) as t", properties=db_props)
df_query2.show(truncate=False)

---

# END OF WORKSHOP

## Summary

✅ **Bronze:** raw data ingested with an explicit schema

✅ **Silver:** data cleaned, normalized, and quality-checked

✅ **Gold:** MySQL datamart (star schema) loaded

✅ **Analytics:** business SQL queries executed

## Next Steps

1. Test with the full dataset: `python download_dump.py`
2. Run all analytical queries: `sql/analysis_queries.sql`
3. Build a dashboard (Grafana, Tableau, etc.)
4. Implement incremental loading

## Resources

- Documentation: `docs/`
- Tests: `pytest tests/test_etl.py`
- Pipeline complet: `python -m etl.main <input_file>`

In [None]:
# Stop Spark
spark.stop()
print("✓ Spark session stopped")