# ETL PySpark - OpenFoodFacts Datamart
## Architecture Dimensionnelle (Star Schema)

Ce notebook ex√©cute un pipeline ETL complet :
- **Bronze** ‚Üí Lecture CSV brute
- **Silver** ‚Üí Nettoyage, validation, d√©duplication
- **Gold** ‚Üí Dimensions (SCD2) + Table de faits

**Source** : 300k produits OpenFoodFacts
**Destination** : PostgreSQL datamart `off_dm`

## Section 0 : Setup et imports
Configure le chemin du driver PostgreSQL et initialise les biblioth√®ques Spark.

In [None]:
import os
from datetime import datetime, timezone
import json

# Configuration du driver PostgreSQL pour Spark
# √Ä adapter selon votre chemin r√©el du JAR PostgreSQL
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    r'--jars "C:\Users\Test\Desktop\TP Benoit\postgresql-42.7.8.jar" pyspark-shell'
)

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType
)

print("‚úì Imports r√©ussis")

## Section 1 : Constantes m√©tier

D√©finit les r√®gles de validation :
- **NUTRIENT_BOUNDS** : Plages acceptables pour chaque nutriment
- **ANOMALY_THRESHOLDS** : Limites pour d√©tecter les valeurs aberrantes
- **MIN_NUTRIENTS_REQUIRED** : Minimum de nutriments pour inclure un produit

In [None]:
# Plages de validit√© pour chaque nutriment (min, max) en g/100g
NUTRIENT_BOUNDS = {
    "energy_kcal_100g": (0, 900),
    "fat_100g": (0, 100),
    "saturated_fat_100g": (0, 100),
    "sugars_100g": (0, 100),
    "salt_100g": (0, 100),
    "proteins_100g": (0, 100),
    "fiber_100g": (0, 100),
    "sodium_100g": (0, 40),
}

# Seuils pour d√©tecter les anomalies extr√™mes
ANOMALY_THRESHOLDS = {
    "sugars_100g": 80,       # Pas de produit avec > 80g de sucre/100g
    "salt_100g": 25,         # Pas de produit avec > 25g de sel/100g
    "proteins_100g": 90,     # Pas de produit avec > 90g prot√©ines/100g
}

# Facteur de conversion sel ‚Üî sodium (formule chimique NaCl)
SALT_SODIUM_FACTOR = 2.5

# Minimum de nutriments pour garder un produit
MIN_NUTRIENTS_REQUIRED = 3

print(f"‚úì Constantes d√©finies")
print(f"  ‚Üí {len(NUTRIENT_BOUNDS)} nutriments √† valider")
print(f"  ‚Üí {len(ANOMALY_THRESHOLDS)} seuils d'anomalies")

## Section 2 : Fonction utilitaire de nettoyage

Convertit les colonnes texte en valeurs num√©riques valides :
1. Remplace virgules par points (notation europ√©enne)
2. Rejette les URLs
3. Valide le format num√©rique (y compris notation scientifique)

In [None]:
def clean_numeric(col_name):
    """Convertit une colonne texte en nombre valide (ou NULL si invalide)."""
    col = F.col(col_name)
    # Normalise la d√©cimale (virgule ‚Üí point)
    col = F.regexp_replace(col, ",", ".")
    # Rejette les URLs
    col = F.when(col.like("http%"), None).otherwise(col)
    # Valide le format num√©rique (accepte aussi notation sci : 1.2e-3)
    col = F.when(
        col.rlike(r'^[+-]?[0-9]+(\.[0-9]+)?([eE][+-]?[0-9]+)?$'),
        col.cast("double")
    ).otherwise(None)
    return col

print("‚úì Fonction clean_numeric d√©finie")

## Section 3 : Cr√©ation de la SparkSession

Lance le contexte Spark avec configuration pour parser les dates au format legacy.

In [None]:
spark = SparkSession.builder \
    .appName("OFF_ETL") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

print(f"‚úì SparkSession cr√©√©e : {spark.version}")

## Section 4 : Lecture Bronze (CSV)

Charge le fichier CSV 300k produits avec :
- S√©parateur TAB
- Cast explicite de chaque colonne
- Nettoyage des colonnes num√©riques via `clean_numeric()`
- Limitation √† 500k lignes pour test

In [None]:
# Lecture CSV brute
df_raw = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("sep", "\t")          # S√©parateur TAB
    .option("inferSchema", "false") # Pas d'inf√©rence, cast explicite
    .load("data/300k_off.csv")
)

# S√©lection et cast des colonnes utiles
df_raw = df_raw.select(
    # Identifiants et dates
    F.col("code").cast("string"),
    F.col("creator").cast("string"),
    F.col("created_datetime").cast("string"),
    F.col("last_modified_t").cast("int"),
    F.col("last_modified_datetime").cast("string"),
    F.col("last_modified_by").cast("string"),
    F.col("last_updated_t").cast("int"),
    F.col("last_updated_datetime").cast("string"),
    
    # Descriptifs
    F.col("product_name").cast("string"),
    F.col("generic_name").cast("string"),
    F.col("quantity").cast("string"),
    F.col("packaging").cast("string"),
    F.col("packaging_tags").cast("string"),
    F.col("packaging_en").cast("string"),
    F.col("packaging_text").cast("string"),
    
    # Cat√©gorisation
    F.col("brands").cast("string"),
    F.col("categories").cast("string"),
    F.col("countries").cast("string"),
    
    # Scores nutritionnels
    clean_numeric("nutriscore_score").alias("nutriscore_score"),
    F.col("nutriscore_grade").cast("string").alias("nutriscore_grade"),
    clean_numeric("nova_group").cast("int").alias("nova_group"),
    
    # Classification PNNS
    F.col("pnns_groups_1").cast("string"),
    F.col("pnns_groups_2").cast("string"),
    
    # Nutriments (conversion et nettoyage)
    clean_numeric("energy-kcal_100g").alias("energy_kcal_100g"),
    clean_numeric("fat_100g").alias("fat_100g"),
    clean_numeric("saturated-fat_100g").alias("saturated_fat_100g"),
    clean_numeric("sugars_100g").alias("sugars_100g"),
    clean_numeric("salt_100g").alias("salt_100g"),
    clean_numeric("proteins_100g").alias("proteins_100g"),
    clean_numeric("fiber_100g").alias("fiber_100g"),
    clean_numeric("sodium_100g").alias("sodium_100g"),
)

# Limite pour test (enlever en prod)
df_raw = df_raw.limit(500000)

print(f"‚úì Bronze charg√©e : {df_raw.count()} lignes")

## Section 5 : D√©duplication Silver (SCD2 Ready)

Pour chaque code produit, garde la version la plus r√©cente via `last_modified_t`.
Utilise une fen√™tre Spark avec `row_number()` pour identifier et filtrer les doublons.

In [None]:
# Filtre pr√©alable : √©carte produits sans code ou sans nom
df_filtered = (
    df_raw
    .filter(F.col("code").isNotNull())
    .filter(F.col("product_name").isNotNull())
)

print(f"Apr√®s filtre de base : {df_filtered.count()} lignes")

# D√©duplication : garde la version la plus r√©cente par code
w = Window.partitionBy("code").orderBy(F.col("last_modified_t").desc_nulls_last())

df_silver = (
    df_filtered
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)  # Garde seulement la premi√®re (la plus r√©cente)
    .drop("rn")
)

print(f"‚úì Silver d√©dupliqu√©e : {df_silver.count()} lignes uniques")

## Section 6 : Validation des nutriments (NUTRIENT_BOUNDS)

Met les valeurs hors bornes √† NULL selon NUTRIENT_BOUNDS.
Exemple : √©nergies > 900 kcal/100g ‚Üí consid√©r√©es comme erreurs.

In [None]:
# Applique les bornes de validation √† chaque nutriment
for col_name, (min_val, max_val) in NUTRIENT_BOUNDS.items():
    if col_name in df_silver.columns:
        df_silver = df_silver.withColumn(
            col_name,
            F.when(
                (F.col(col_name) >= min_val) & (F.col(col_name) <= max_val),
                F.col(col_name),
            ).otherwise(None),  # Hors bornes ‚Üí NULL
        )

print(f"‚úì Nutriments valid√©s contre NUTRIENT_BOUNDS")

## Section 7 : Coh√©rence gras satur√© ‚â§ gras total

V√©rifie que `saturated_fat ‚â§ fat_total`. Si violation ‚Üí met saturated_fat √† NULL.
C'est une r√®gle chimique obligatoire (les graisses satur√©es sont un sous-ensemble).

In [None]:
# Diagnostique : compte les incoh√©rences avant correction
nb_incoherent_satfat = df_silver.filter(
    (F.col("saturated_fat_100g").isNotNull()) &
    (F.col("fat_100g").isNotNull()) &
    (F.col("saturated_fat_100g") > F.col("fat_100g"))
).count()

print(f"Coh√©rence : {nb_incoherent_satfat} produits avec saturated_fat > fat")

# Correction : met √† NULL les gras satur√©s incoh√©rents
df_silver = df_silver.withColumn(
    "saturated_fat_100g",
    F.when(
        F.col("saturated_fat_100g") > F.col("fat_100g"),
        None  # Violation chimique ‚Üí NULL
    ).otherwise(F.col("saturated_fat_100g")),
)

print(f"‚úì Gras satur√© corrig√©")

## Section 8 : Harmonisation sel / sodium

Compl√®te les valeurs manquantes via la relation : **sodium = sel / 2.5**

Si sodium absent ‚Üí calcule depuis sel
Si sel absent ‚Üí calcule depuis sodium

In [None]:
# Si sodium manquant, le calcule depuis sel
df_silver = df_silver.withColumn(
    "sodium_100g",
    F.when(
        F.col("sodium_100g").isNull() & F.col("salt_100g").isNotNull(),
        F.round(F.col("salt_100g") / SALT_SODIUM_FACTOR, 2),
    ).otherwise(F.col("sodium_100g")),
)

# Si sel manquant, le calcule depuis sodium
df_silver = df_silver.withColumn(
    "salt_100g",
    F.when(
        F.col("salt_100g").isNull() & F.col("sodium_100g").isNotNull(),
        F.round(F.col("sodium_100g") * SALT_SODIUM_FACTOR, 2),
    ).otherwise(F.col("salt_100g")),
)

print(f"‚úì Sel/sodium harmonis√©s (facteur: {SALT_SODIUM_FACTOR})")

## Section 9 : Compte de nutriments pr√©sents

Pour chaque produit, compte le nombre de nutriments non-NULL.
Filtre ensuite : garde seulement les produits avec ‚â• MIN_NUTRIENTS_REQUIRED nutriments.

In [None]:
# Liste des colonnes nutriments √† compter
nutrient_cols = [
    "energy_kcal_100g",
    "fat_100g",
    "saturated_fat_100g",
    "sugars_100g",
    "salt_100g",
    "proteins_100g",
    "fiber_100g",
]

# Ajoute colonne de compte
df_silver = df_silver.withColumn(
    "nutrients_present",
    sum([F.when(F.col(c).isNotNull(), 1).otherwise(0) for c in nutrient_cols]),
)

nb_before = df_silver.count()

# Filtre : garde seulement les produits avec assez de nutriments
df_silver = df_silver.filter(F.col("nutrients_present") >= MIN_NUTRIENTS_REQUIRED)

nb_after = df_silver.count()
nb_removed_nutrients = nb_before - nb_after

print(f"‚úì Filtre nutriments : {nb_removed_nutrients} produits supprim√©s")
print(f"  ‚Üí {nb_after} produits conserv√©s (‚â• {MIN_NUTRIENTS_REQUIRED} nutriments)")

## Section 10 : Suppression des anomalies extr√™mes

Utilise ANOMALY_THRESHOLDS pour rejeter les valeurs aberrantes.
Exemple : aucun produit normal ne contient 80g+ de sucre/100g.

In [None]:
anomalies_removed = {}  # Suivi des suppressions par colonne

for col_name, threshold in ANOMALY_THRESHOLDS.items():
    if col_name in df_silver.columns:
        before_col = df_silver.count()
        # Filtre : garde les NULLs ou les valeurs <= seuil
        df_silver = df_silver.filter(
            F.col(col_name).isNull() | (F.col(col_name) <= threshold)
        )
        after_col = df_silver.count()
        anomalies_removed[col_name] = before_col - after_col

print(f"‚úì Anomalies supprim√©es :")
for col_name, count in anomalies_removed.items():
    print(f"  ‚Üí {col_name} : {count} lignes")

## Section 11 : Validation somme nutriments ‚â§ 110g

V√©rifie que la somme gras + sucres + prot√©ines + fibres + sel ‚â§ 110g/100g
(car 100g max, mais eau + autres composants)

In [None]:
# Calcule la somme des nutriments cl√©s
df_silver = df_silver.withColumn(
    "_nutrient_sum",
    F.coalesce(F.col("fat_100g"), F.lit(0))
    + F.coalesce(F.col("sugars_100g"), F.lit(0))
    + F.coalesce(F.col("proteins_100g"), F.lit(0))
    + F.coalesce(F.col("fiber_100g"), F.lit(0))
    + F.coalesce(F.col("salt_100g"), F.lit(0)),
)

nb_before_sum = df_silver.count()
df_silver = df_silver.filter(F.col("_nutrient_sum") <= 110)
nb_removed_sum = nb_before_sum - df_silver.count()

print(f"‚úì Validation somme nutriments : {nb_removed_sum} produits supprim√©s")
print(f"  ‚Üí {df_silver.count()} produits avec somme coh√©rente")

## Section 12 : Normalisation des scores

Valide et normalise :
- Nutri-Score ‚Üí uniquement A/B/C/D/E en majuscules
- NOVA Group ‚Üí uniquement 1, 2, 3, 4

In [None]:
# Nutri-Score : garde seulement les grades valides (A-E en majuscules)
df_silver = df_silver.withColumn(
    "nutriscore_grade",
    F.when(
        F.upper(F.col("nutriscore_grade")).isin(["A", "B", "C", "D", "E"]),
        F.upper(F.col("nutriscore_grade")),
    ).otherwise(None),
)

# NOVA Group : garde seulement 1, 2, 3, 4
df_silver = df_silver.withColumn(
    "nova_group",
    F.when(F.col("nova_group").between(1, 4), F.col("nova_group")).otherwise(None),
)

print(f"‚úì Scores normalis√©s")

## Section 13 : Extraction valeurs principales

De champs multi-valu√©s (listes s√©par√©es par virgule), extrait la premi√®re valeur utile :
- Premi√®re marque dans `brands`
- Premier pays dans `countries` (nettoie le pr√©fixe "en:")
- Premi√®re cat√©gorie dans `categories` (nettoie le pr√©fixe "en:")

Puis filtre les valeurs "unknown".

In [None]:
# Extrait la premi√®re valeur de chaque liste
df_silver = (
    df_silver
    .withColumn("brand_primary", F.trim(F.split(F.col("brands"), ",").getItem(0)))
    .withColumn(
        "country_primary",
        F.trim(F.regexp_replace(F.split(F.col("countries"), ",").getItem(0), "^en:", "")),
    )
    .withColumn(
        "category_primary",
        F.trim(F.regexp_replace(F.split(F.col("categories"), ",").getItem(0), "^en:", "")),
    )
)

# Nettoie les valeurs "unknown"
cols_to_clean_unknown = [
    "brand_primary",
    "country_primary",
    "category_primary",
    "pnns_groups_1",
    "pnns_groups_2",
]

for col_name in cols_to_clean_unknown:
    if col_name in df_silver.columns:
        df_silver = df_silver.withColumn(
            col_name,
            F.when(F.lower(F.col(col_name)) == "unknown", None).otherwise(F.col(col_name)),
        )

print(f"‚úì Valeurs principales extraites et nettoy√©es")

## Section 14 : Score de compl√©tude

Mesure de 0 √† 1 bas√©e sur :
- Pr√©sence du nom du produit (25%)
- Pr√©sence des marques (25%)
- Pr√©sence des cat√©gories (25%)
- Ratio nutriments pr√©sents / nutriments totaux (25%)

In [None]:
df_silver = df_silver.withColumn(
    "completeness_score",
    (
        F.when(F.col("product_name").isNotNull(), 1).otherwise(0) +
        F.when(F.col("brands").isNotNull(), 1).otherwise(0) +
        F.when(F.col("categories").isNotNull(), 1).otherwise(0) +
        (F.col("nutrients_present") / F.lit(len(nutrient_cols)))
    ) / F.lit(4.0)
)

avg_completeness = df_silver.agg(F.avg("completeness_score")).first()[0] or 0.0
print(f"‚úì Score compl√©tude calcul√© (moyenne: {avg_completeness:.1%})")

## Section 15 : Probl√®mes de qualit√© (JSON)

Cr√©e un array JSON listant les probl√®mes de chaque produit :
- missing_product_name
- missing_brands
- missing_categories
- no_nutrients

In [None]:
df_silver = df_silver.withColumn(
    "quality_issues",
    F.array(
        F.when(F.col("product_name").isNull(), F.lit("missing_product_name")),
        F.when(F.col("brands").isNull(), F.lit("missing_brands")),
        F.when(F.col("categories").isNull(), F.lit("missing_categories")),
        F.when(F.col("nutrients_present") == 0, F.lit("no_nutrients"))
    )
)

# Filtre pour ne garder que les non-NULLs
df_silver = df_silver.withColumn(
    "quality_issues",
    F.expr("filter(quality_issues, x -> x is not null)")
)

# S√©rialise en JSON pour stockage
df_silver = df_silver.withColumn(
    "quality_issues_json",
    F.to_json(F.col("quality_issues"))
).drop("quality_issues")

print(f"‚úì Probl√®mes de qualit√© identifi√©s et s√©rialis√©s")

## Section 16 : Diagnostic et statistiques Silver

Affiche :
- Dimensions du DataFrame
- Sch√©ma complet
- Statistiques descriptives (colonnes num√©riques)
- Taux de remplissage par colonne
- Aper√ßu des donn√©es
- Distribution Nutri-Score

In [None]:
print("\n" + "="*60)
print("DIAGNOSTIC SILVER")
print("="*60)

nb_rows = df_silver.count()
nb_cols = len(df_silver.columns)
print(f"\nüìä Dimensions : {nb_rows} lignes √ó {nb_cols} colonnes")

print("\nüîç Sch√©ma du DataFrame :")
df_silver.printSchema()

# Colonnes num√©riques pour statistiques
numeric_cols = [
    f.name for f in df_silver.schema.fields
    if f.dataType.simpleString() in ("double", "int", "bigint")
]

if numeric_cols:
    print("\nüìà Statistiques descriptives (colonnes num√©riques) :")
    df_silver.select(*numeric_cols).describe().show(truncate=20)

print("\nüíß Taux de remplissage par colonne :")
print(f"{'Colonne':<30} {'Non-Null':>10} {'Null':>10} {'Remplissage':>12}")
print("-" * 62)
for c in df_silver.columns:
    non_null = df_silver.filter(F.col(c).isNotNull()).count()
    null_count = nb_rows - non_null
    pct = (non_null / nb_rows * 100) if nb_rows > 0 else 0
    print(f"{c:<30} {non_null:>10} {null_count:>10} {pct:>11.2f}%")

print("\nüëÄ Aper√ßu (5 premi√®res lignes) :")
df_silver.show(5, truncate=50)

print(f"\nüíæ Estimation m√©moire : ~{nb_rows * nb_cols * 50 / 1024 / 1024:.2f} MB")

## Section 17 : Distribution Nutri-Score et compl√©tude

Affiche la distribution des notes (A/B/C/D/E) et le score de compl√©tude moyen.

In [None]:
print("\nüìä DISTRIBUTION NUTRI-SCORE")
print("-" * 40)
df_silver.groupBy("nutriscore_grade").count().orderBy("nutriscore_grade").show()

avg_comp = df_silver.agg(F.avg("completeness_score")).first()[0] or 0.0
print(f"\nüìç Score de compl√©tude moyen : {avg_comp:.1%}")

## Section 18 : Cr√©ation des dimensions (Fact & Dimension Tables)

### dim_brand
Extrait les marques uniques et les regroupe avec une cl√© de remplacement (`brand_sk`).
Utilise `monotonically_increasing_id()` pour g√©n√©rer des SKs.

In [None]:
# DIM_BRAND : liste unique des marques
df_dim_brand = (
    df_silver
    .select(F.explode(F.split(F.col("brands"), ",")).alias("brand_name_raw"))
    .withColumn("brand_name", F.trim(F.col("brand_name_raw")))
    .filter(F.col("brand_name").isNotNull() & (F.col("brand_name") != ""))
    .dropDuplicates(["brand_name"])
    .withColumn("brand_sk", F.monotonically_increasing_id())
    .select("brand_sk", "brand_name")
)

print(f"‚úì dim_brand cr√©√©e : {df_dim_brand.count()} marques uniques")

### dim_country
Extrait les pays uniques, nettoie le pr√©fixe "en:", regroupe avec cl√© de remplacement.

In [None]:
# DIM_COUNTRY : liste unique des pays
df_dim_country = (
    df_silver
    .select(F.explode(F.split(F.col("countries"), ",")).alias("country_name_raw"))
    .withColumn(
        "country_name",
        F.trim(F.regexp_replace(F.col("country_name_raw"), "^en:", "")),
    )
    .filter(F.col("country_name").isNotNull() & (F.col("country_name") != ""))
    .dropDuplicates(["country_name"])
    .withColumn("country_sk", F.monotonically_increasing_id())
    .select("country_sk", "country_name")
)

print(f"‚úì dim_country cr√©√©e : {df_dim_country.count()} pays uniques")

### dim_category
Extrait les cat√©gories uniques, nettoie le pr√©fixe "en:", regroupe avec cl√© de remplacement.

In [None]:
# DIM_CATEGORY : liste unique des cat√©gories
df_dim_category = (
    df_silver
    .select(F.explode(F.split(F.col("categories"), ",")).alias("category_name_raw"))
    .withColumn(
        "category_name",
        F.trim(F.regexp_replace(F.col("category_name_raw"), "^en:", "")),
    )
    .filter(F.col("category_name").isNotNull() & (F.col("category_name") != ""))
    .dropDuplicates(["category_name"])
    .withColumn("category_sk", F.monotonically_increasing_id())
    .select("category_sk", "category_name")
)

print(f"‚úì dim_category cr√©√©e : {df_dim_category.count()} cat√©gories uniques")

## Section 19 : dim_product avec SCD2 (Slowly Changing Dimension Type 2)

Cr√©e la dimension produit avec support SCD2 :
1. **Enrichissement** : join avec dim_brand, dim_country, dim_category
2. **Hash des attributs** : SHA256 de tous les champs pour d√©tecter changements
3. **S√©lection finale** : colonnes n√©cessaires pour SCD2

In [None]:
# Enrichit silver avec les noms de dimensions pour les joins
df_products_enriched = (
    df_silver
    .withColumn("brand_name", F.col("brand_primary"))
    .withColumn("country_name", F.col("country_primary"))
    .withColumn("category_name", F.col("category_primary"))
)

# Dimension produit de base (avec doublons si m√™me produit plusieurs fois)
df_dim_product = (
    df_products_enriched
    .select(
        "code",
        "product_name",
        "brand_name",
        "country_name",
        "category_name",
        "nutriscore_grade",
        "nova_group",
        "pnns_groups_1",
        "pnns_groups_2",
        "completeness_score",
        "quality_issues_json"
    )
    .dropDuplicates(["code"])  # Garde 1 version par code pour ce run
)

# Join avec les dimensions pour r√©cup√©rer les SKs
df_dim_product = (
    df_dim_product
    .join(df_dim_brand, on="brand_name", how="left")
    .join(df_dim_country, on="country_name", how="left")
    .join(df_dim_category, on="category_name", how="left")
    .withColumn("product_sk", F.monotonically_increasing_id())
)

# Hash SHA256 pour d√©tecter les changements (SCD2)
cols_for_hash = [
    "product_name",
    "brand_sk",
    "country_sk",
    "category_sk",
    "nutriscore_grade",
    "nova_group",
    "pnns_groups_1",
    "pnns_groups_2",
    "completeness_score",
    "quality_issues_json",
]

df_dim_product = df_dim_product.withColumn(
    "attr_hash",
    F.sha2(F.concat_ws("||".
           *[F.col(c).cast("string") for c in cols_for_hash]), 256)
)

# S√©lection finale pour SCD2
df_dim_product_new = df_dim_product.select(
    "code",
    "product_name",
    "brand_sk",
    "country_sk",
    "category_sk",
    "nutriscore_grade",
    "nova_group",
    "pnns_groups_1",
    "pnns_groups_2",
    "completeness_score",
    "quality_issues_json",
    "attr_hash"
)

print(f"‚úì dim_product_new cr√©√©e : {df_dim_product_new.count()} produits")

## Section 20 : dim_time enrichie

Cr√©e la dimension temps avec :
- Date
- Ann√©e, mois, jour
- Semaine

In [None]:
# Extrait dates uniques et ajoute composantes temporelles
df_time = (
    df_silver
    .select("last_modified_datetime")
    .withColumn("date", F.to_date("last_modified_datetime"))
    .dropDuplicates(["date"])
    .withColumn("time_sk", F.monotonically_increasing_id())
    .select("time_sk", "date")
)

# Ajoute colonnes temporelles
df_dim_time_pg = df_time.select(
    "time_sk",
    F.col("date").alias("date"),
    F.year("date").alias("year"),
    F.month("date").alias("month"),
    F.dayofmonth("date").alias("day"),
    F.weekofyear("date").alias("week")
)

print(f"‚úì dim_time cr√©√©e : {df_dim_time_pg.count()} dates uniques")

## Section 21 : Configuration JDBC PostgreSQL

Param√®tres de connexion √† la base de donn√©es PostgreSQL.
√Ä adapter avec vos identifiants r√©els.

In [None]:
# Param√®tres JDBC pour PostgreSQL
jdbc_url = "jdbc:postgresql://localhost:5432/Tp_OFF"
jdbc_properties = {
    "user": "postgres",
    "password": "root",
    "driver": "org.postgresql.Driver",
    "batchsize": "5000"
}

print(f"‚úì Configuration JDBC d√©finie")
print(f"  ‚Üí URL: {jdbc_url}")

## Section 22 : √âcriture des dimensions simples

√âcrit les dimensions qui ne changent pas (non-SCD2) en mode APPEND :
- dim_brand
- dim_country
- dim_category
- dim_time

In [None]:
# √âcriture des dimensions en mode APPEND (accumulation)
print("\nüìù √âcriture des dimensions...")

df_dim_brand.coalesce(4).write \
    .mode("append") \
    .jdbc(jdbc_url, "off_dm.dim_brand", properties=jdbc_properties)
print("  ‚úì dim_brand √©crite")

df_dim_country.coalesce(4).write \
    .mode("append") \
    .jdbc(jdbc_url, "off_dm.dim_country", properties=jdbc_properties)
print("  ‚úì dim_country √©crite")

df_dim_category.coalesce(4).write \
    .mode("append") \
    .jdbc(jdbc_url, "off_dm.dim_category", properties=jdbc_properties)
print("  ‚úì dim_category √©crite")

df_dim_time_pg.coalesce(4).write \
    .mode("append") \
    .jdbc(jdbc_url, "off_dm.dim_time", properties=jdbc_properties)
print("  ‚úì dim_time √©crite")

## Section 23 : Gestion SCD2 pour dim_product

Impl√©mente le "Slowly Changing Dimension Type 2" pour la dimension produit :

1. **Lire** la dimension existante (ou cr√©er vide au 1er run)
2. **Identifier** :
   - Nouveaux produits (code n'existe pas)
   - Produits inchang√©s (hash identique)
   - Produits modifi√©s (hash diff√©rent)
3. **Fermer** les anciennes lignes (effective_to = now, is_current = False)
4. **Ins√©rer** les nouvelles versions

Cela garde l'historique complet des changements produit.

In [None]:
print("\n‚öôÔ∏è Gestion SCD2 pour dim_product...")

current_ts = F.current_timestamp()

# Lecture de la dimension existante
try:
    df_dim_product_existing = spark.read.jdbc(
        jdbc_url,
        "off_dm.dim_product",
        properties=jdbc_properties
    )
    print("  ‚úì dim_product existante lue")
except Exception as e:
    print(f"  ‚ÑπÔ∏è Premi√®re ex√©cution (dim_product cr√©√©e √† partir de z√©ro)")
    # Sch√©ma pour premi√®re cr√©ation
    df_dim_product_existing = spark.createDataFrame(
        [],
        schema=df_dim_product_new.schema
        .add("product_sk", "long")
        .add("effective_from", "timestamp")
        .add("effective_to", "timestamp")
        .add("is_current", "boolean")
    )

# Lignes courantes existantes
df_dim_product_curr = df_dim_product_existing.filter(F.col("is_current") == True)

# Join nouveau vs existant
joined = df_dim_product_new.alias("n").join(
    df_dim_product_curr.select(
        "product_sk", "code", "attr_hash"
    ).alias("e"),
    on="code",
    how="left"
)

# Cat√©gorise les produits
# Nouveaux produits (n'existaient pas)
df_new_products = joined.filter(F.col("e.code").isNull()).select("n.*")
print(f"  ‚Üí {df_new_products.count()} nouveaux produits")

# Produits inchang√©s (hash identique)
df_unchanged = joined.filter(
    (F.col("e.code").isNotNull()) &
    (F.col("n.attr_hash") == F.col("e.attr_hash"))
)
print(f"  ‚Üí {df_unchanged.count()} produits inchang√©s")

# Produits modifi√©s (hash diff√©rent)
df_changed = joined.filter(
    (F.col("e.code").isNotNull()) &
    (F.col("n.attr_hash") != F.col("e.attr_hash"))
).select("n.*", "e.product_sk")
print(f"  ‚Üí {df_changed.count()} produits modifi√©s")

## Section 24 : Fermeture des anciennes lignes SCD2

Pour les produits modifi√©s, met √† jour les anciennes lignes :
- `effective_to` = timestamp actuel
- `is_current` = FALSE

In [None]:
# Ferme les anciennes lignes des produits modifi√©s
df_dim_product_to_close = df_dim_product_existing.join(
    df_changed.select("product_sk").distinct(),
    on="product_sk",
    how="inner"
).withColumn("effective_to", current_ts) \
 .withColumn("is_current", F.lit(False))

print(f"  ‚Üí {df_dim_product_to_close.count()} anciennes lignes √† fermer")

## Section 25 : Insertion des nouvelles lignes SCD2

Pr√©pare les nouvelles lignes √† ins√©rer :
- **Nouveaux produits** : directement ajout√©s
- **Produits modifi√©s** : cr√©ent une nouvelle ligne (nouvelle version)

Ajoute les colonnes SCD2 :
- `effective_from` = timestamp actuel
- `effective_to` = NULL (ligne courante)
- `is_current` = TRUE

In [None]:
# Nouvelles lignes pour les nouveaux produits
df_new_for_insert = df_new_products.withColumn(
    "product_sk",
    F.monotonically_increasing_id()
)

# Nouvelles lignes pour les versions modifi√©es
df_changed_for_insert = df_changed.drop("product_sk").withColumn(
    "product_sk",
    F.monotonically_increasing_id()
)

# Combine nouveaux + modifi√©s
df_dim_product_insert = df_new_for_insert.unionByName(df_changed_for_insert)

# S√©lectionne colonnes et ajoute m√©tadonn√©es SCD2
df_dim_product_insert = df_dim_product_insert.select(
    "product_sk",
    "code",
    "product_name",
    "brand_sk",
    "country_sk",
    "category_sk",
    "nutriscore_grade",
    "nova_group",
    "pnns_groups_1",
    "pnns_groups_2",
    "completeness_score",
    "quality_issues_json",
    "attr_hash"
).withColumn("effective_from", current_ts) \
 .withColumn("effective_to", F.lit(None).cast("timestamp")) \
 .withColumn("is_current", F.lit(True))

print(f"  ‚Üí {df_dim_product_insert.count()} nouvelles lignes √† ins√©rer")

## Section 26 : √âcriture SCD2 de dim_product

√âcrit les modifications SCD2 :
1. Les anciennes lignes ferm√©es (effective_to renseign√©)
2. Les nouvelles lignes (nouvelles versions + nouvelles entr√©es)

In [None]:
print("\nüìù √âcriture SCD2 de dim_product...")

# √âcrit les anciennes lignes ferm√©es
if df_dim_product_to_close.count() > 0:
    df_dim_product_to_close.coalesce(4).write \
        .mode("append") \
        .jdbc(jdbc_url, "off_dm.dim_product", properties=jdbc_properties)
    print(f"  ‚úì {df_dim_product_to_close.count()} anciennes lignes ferm√©es")
else:
    print(f"  ‚ÑπÔ∏è Aucune ancienne ligne √† fermer")

# √âcrit les nouvelles lignes
if df_dim_product_insert.count() > 0:
    df_dim_product_insert.coalesce(4).write \
        .mode("append") \
        .jdbc(jdbc_url, "off_dm.dim_product", properties=jdbc_properties)
    print(f"  ‚úì {df_dim_product_insert.count()} nouvelles lignes ins√©r√©es")
else:
    print(f"  ‚ÑπÔ∏è Aucune nouvelle ligne")

print("\n  ‚ÑπÔ∏è Recharge de dim_product pour la fact...")
# Recharge pour la table de faits
df_dim_product_after = spark.read.jdbc(
    jdbc_url,
    "off_dm.dim_product",
    properties=jdbc_properties
)

# Extrait les product_sk courants (is_current = True)
df_dim_product_current_for_fact = df_dim_product_after \
    .filter(F.col("is_current") == True) \
    .select("product_sk", "code")

print(f"  ‚úì {df_dim_product_current_for_fact.count()} product_sk courants pour la fact")

## Section 27 : Cr√©ation de la table de faits

Construite via joins avec les dimensions :
1. S√©lectionne les mesures et dimensions du silver
2. Join avec dim_time (date ‚Üí time_sk)
3. Join avec dim_product (code ‚Üí product_sk)
4. Attribue un identifiant unique (fact_id)

In [None]:
print("\n‚öôÔ∏è Construction de la table de faits...")

df_fact = (
    df_silver
    .select(
        "code",
        "last_modified_datetime",
        # Mesures nutriments
        "energy_kcal_100g",
        "fat_100g",
        "saturated_fat_100g",
        "sugars_100g",
        "salt_100g",
        "proteins_100g",
        "fiber_100g",
        "sodium_100g",
        # Dimensions
        "nutriscore_grade",
        "nova_group",
        "completeness_score",
        "quality_issues_json"
    )
    # Ajoute time_sk via join
    .withColumn("date", F.to_date("last_modified_datetime"))
    .join(df_time, on="date", how="left")
    # Ajoute product_sk via join
    .join(df_dim_product_current_for_fact, on="code", how="left")
    # Identifie chaque fait
    .withColumn("fact_id", F.monotonically_increasing_id())
    # S√©lection finale
    .select(
        "fact_id",
        "product_sk",
        "time_sk",
        "energy_kcal_100g",
        "fat_100g",
        "saturated_fat_100g",
        "sugars_100g",
        "salt_100g",
        "proteins_100g",
        "fiber_100g",
        "sodium_100g",
        "nutriscore_grade",
        "nova_group",
        "completeness_score",
        "quality_issues_json"
    )
)

print(f"‚úì fact_nutrition_snapshot cr√©√©e : {df_fact.count()} lignes")

## Section 28 : R√©sum√© du datamart

Affiche les tailles de toutes les tables cr√©√©es.

In [None]:
print("\n" + "="*60)
print("R√âSUM√â DU DATAMART")
print("="*60)

print(f"""
üìä Dimensions
  ‚Ä¢ dim_brand           : {df_dim_brand.count():>10,} lignes
  ‚Ä¢ dim_country         : {df_dim_country.count():>10,} lignes
  ‚Ä¢ dim_category        : {df_dim_category.count():>10,} lignes
  ‚Ä¢ dim_product (SCD2)  : {df_dim_product_current_for_fact.count():>10,} versions courantes
  ‚Ä¢ dim_time            : {df_dim_time_pg.count():>10,} dates

üìà Faits
  ‚Ä¢ fact_nutrition_snapshot : {df_fact.count():>10,} mesures
""")

## Section 29 : √âcriture de la table de faits

√âcrit fact_nutrition_snapshot en mode APPEND.

In [None]:
print("\nüìù √âcriture de la table de faits...")

df_fact_pg = df_fact.select(
    "fact_id",
    "product_sk",
    "time_sk",
    "energy_kcal_100g",
    "fat_100g",
    "saturated_fat_100g",
    "sugars_100g",
    "salt_100g",
    "proteins_100g",
    "fiber_100g",
    "sodium_100g",
    "nutriscore_grade",
    "nova_group",
    "completeness_score",
    "quality_issues_json"
)

df_fact_pg.coalesce(4).write \
    .mode("append") \
    .jdbc(jdbc_url, "off_dm.fact_nutrition_snapshot", properties=jdbc_properties)

print(f"‚úì fact_nutrition_snapshot √©crite : {df_fact.count()} lignes")

## Section 30 : Collecte des m√©triques

Cr√©e un dictionnaire JSON avec les KPIs du run :
- Timestamps
- Compteurs par table
- Score de compl√©tude moyen
- Distribution Nutri-Score
- Compteurs d'anomalies corrig√©es

In [None]:
print("\nüìä Calcul des m√©triques...")

# Distribution Nutri-Score
nutri_dist_df = df_silver.groupBy("nutriscore_grade").count()
nutri_dist = {
    (row["nutriscore_grade"] if row["nutriscore_grade"] is not None else "NULL"): int(row["count"])
    for row in nutri_dist_df.collect()
}

# Timestamp actuel
now_utc = datetime.now(timezone.utc)

# M√©triques globales
metrics = {
    "run_ts": now_utc.isoformat(),
    "nb_raw_products": df_raw.count(),
    "nb_silver_products": df_silver.count(),
    "nb_dim_brand": df_dim_brand.count(),
    "nb_dim_country": df_dim_country.count(),
    "nb_dim_category": df_dim_category.count(),
    "nb_dim_product_current": df_dim_product_current_for_fact.count(),
    "nb_fact": df_fact.count(),
    "avg_completeness_score": float(
        df_silver.agg(F.avg("completeness_score")).first()[0] or 0.0
    ),
    "nutriscore_distribution": nutri_dist,
    "nb_removed_sugars_out_of_bounds": anomalies_removed.get("sugars_100g", 0),
    "nb_removed_salt_out_of_bounds": anomalies_removed.get("salt_100g", 0),
    "nb_removed_proteins_out_of_bounds": anomalies_removed.get("proteins_100g", 0),
    "nb_incoherent_saturated_fat_gt_fat": nb_incoherent_satfat,
}

print(f"‚úì M√©triques calcul√©es")

## Section 31 : Sauvegarde des m√©triques en JSON

√âcrit les m√©triques dans un fichier `logs/metrics_YYYYMMDD_HHMMSS.json` pour suivi.

In [None]:
# Cr√©e r√©pertoire logs s'il n'existe pas
os.makedirs("logs", exist_ok=True)

# Sauvegarde des m√©triques
metrics_file = f"logs/metrics_{now_utc.strftime('%Y%m%d_%H%M%S')}.json"

with open(metrics_file, "w", encoding="utf-8") as f:
    json.dump(metrics, f, ensure_ascii=False, indent=2)

print(f"‚úì M√©triques sauvegard√©es : {metrics_file}")
print(f"\nüìã R√©sum√© des m√©triques :")
print(json.dumps(metrics, indent=2, ensure_ascii=False))