<a href="https://colab.research.google.com/github/Saint-Pedro/TP-M1-Integration-des-donnees-EPSI/blob/main/TP_Integration_Donnees_LAURENS_SUBIRANA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# @title 1. Installation des dépendances et Démarrage MySQL
import os
import sys

# 1.1 Installation de Java et Spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark -q

# 1.2 Installation et Démarrage de MySQL Server (Localement dans Colab)
!apt-get install mysql-server -qq > /dev/null
!service mysql start

# 1.3 Téléchargement du connecteur MySQL JDBC (Requis pour l'étape Gold)
!wget -q https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-j-8.2.0.tar.gz
!tar -xf mysql-connector-j-8.2.0.tar.gz
!cp mysql-connector-j-8.2.0/mysql-connector-j-8.2.0.jar /content/mysql-connector.jar

# 1.4 Configuration de la BDD (Création User et Database)
# On respecte le nom du Datamart : OpenFoodFacts Nutrition & Qualité [cite: 11]
!mysql -e "CREATE DATABASE IF NOT EXISTS off_datamart;"
!mysql -e "CREATE USER 'spark_user'@'localhost' IDENTIFIED BY 'spark_password';"
!mysql -e "GRANT ALL PRIVILEGES ON off_datamart.* TO 'spark_user'@'localhost';"
!mysql -e "FLUSH PRIVILEGES;"

print("Environnement prêt : Spark installé et MySQL tourne en arrière-plan.")

 * Starting MySQL database server mysqld
   ...done.
Environnement prêt : Spark installé et MySQL tourne en arrière-plan.


In [6]:
# @title 2.1 (Diagnostic) Recherche automatique du fichier
import os

print("Recherche du fichier 'en.openfoodfacts.org.products.csv' dans le Drive...")

found_path = None
for root, dirs, files in os.walk("/content/drive/MyDrive"):
    if "en.openfoodfacts.org.products.csv" in files:
        found_path = os.path.join(root, "en.openfoodfacts.org.products.csv")
        print(f"\nLe fichier est ici :")
        print(f"--> {found_path}")
        break

if not found_path:
    print("\nFichier introuvable via la recherche automatique.")
else:
    # On définit le chemin pour la suite automatiquement
    input_csv_path = found_path
    print(f"\nVariable 'input_csv_path' mise à jour automatiquement.")

Recherche du fichier 'en.openfoodfacts.org.products.csv' dans le Drive...

Le fichier est ici :
--> /content/drive/MyDrive/en.openfoodfacts.org.products.csv

Variable 'input_csv_path' mise à jour automatiquement.


In [9]:
# @title 3. ETL Spark : Ingestion
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, FloatType, LongType

# 1. Config Spark
spark = SparkSession.builder \
    .appName("OFF_ETL_Project") \
    .config("spark.jars", "/content/mysql-connector.jar") \
    .config("spark.driver.memory", "6g") \
    .master("local[*]") \
    .getOrCreate()

input_csv_path = "/content/drive/MyDrive/en.openfoodfacts.org.products.csv"

# 2. Lecture avec détection automatique des colonnes
print(f"Lecture du CSV : {input_csv_path}")
df_raw_fix = spark.read.option("header", "true") \
                       .option("sep", "\t") \
                       .option("inferSchema", "false") \
                       .csv(input_csv_path)

# 3. Sélection et Renommage (Mapping)
try:
    df_selected = df_raw_fix.select(
        col("code"),
        col("url"),
        col("product_name"),
        col("brands"),
        col("categories_tags"),
        col("countries_tags"),
        col("last_modified_t").cast(LongType()),
        col("nutriscore_grade"),
        col("nova_group"),
        col("environmental_score_grade").alias("ecoscore_grade"),
        col("energy-kcal_100g").cast(FloatType()),
        col("fat_100g").cast(FloatType()),
        col("saturated-fat_100g").cast(FloatType()),
        col("sugars_100g").cast(FloatType()),
        col("salt_100g").cast(FloatType()),
        col("sodium_100g").cast(FloatType()),
        col("proteins_100g").cast(FloatType()),
        col("fiber_100g").cast(FloatType()),
        col("additives_n").cast(IntegerType())
    )

    # 4. Sauvegarde Bronze
    bronze_path = "/content/TRDE703_Projet_Groupe/data/bronze/raw_data"
    df_selected.write.mode("overwrite").parquet(bronze_path)

    print(f"INGESTION RÉUSSIE ! Données sauvegardées dans : {bronze_path}")
    print("--- Aperçu des données ---")
    df_selected.select("code", "product_name", "brands", "ecoscore_grade").show(5, truncate=False)

except Exception as e:
    print("Erreur :", e)

Lecture du CSV : /content/drive/MyDrive/en.openfoodfacts.org.products.csv
INGESTION RÉUSSIE ! Données sauvegardées dans : /content/TRDE703_Projet_Groupe/data/bronze/raw_data
--- Aperçu des données ---
+--------+------------+------+--------------+
|code    |product_name|brands|ecoscore_grade|
+--------+------------+------+--------------+
|00000002|NULL        |NULL  |NULL          |
|00000003|NULL        |NULL  |NULL          |
|00000004|NULL        |NULL  |NULL          |
|00000005|NULL        |NULL  |NULL          |
|00000006|NULL        |NULL  |NULL          |
+--------+------------+------+--------------+
only showing top 5 rows


In [10]:
# @title 3.1 Vérification de Santé (Sanity Check)
# On relit le dossier Bronze qu'on vient de créer
df_check = spark.read.parquet("/content/TRDE703_Projet_Groupe/data/bronze/raw_data")

print("Recherche de produits valides (Nom + Marque renseignés)...")

# On filtre pour voir autre chose que des NULL
df_check.filter(col("product_name").isNotNull() & col("brands").isNotNull()) \
        .select("code", "product_name", "brands", "nutriscore_grade", "ecoscore_grade") \
        .show(5, truncate=False)

Recherche de produits valides (Nom + Marque renseignés)...
+-------------+------------------------------+----------+----------------+--------------+
|code         |product_name                  |brands    |nutriscore_grade|ecoscore_grade|
+-------------+------------------------------+----------+----------------+--------------+
|4892288850158|Lava Custard Mooncake - 8pcs  |Hang Heung|e               |unknown       |
|4892294090234|Corn Soup Chips               |Calbee    |unknown         |NULL          |
|4892294101114|Grill-a-Corn Barbecue Flavored|Calbee    |e               |b             |
|4892294101121|Grill-A-Corn BBQ Flavour      |Calbee    |e               |b             |
|4892294101428|Calbee                        |Calbee    |unknown         |NULL          |
+-------------+------------------------------+----------+----------------+--------------+
only showing top 5 rows


In [11]:
# @title 4. ETL Spark : Transformation Silver (Nettoyage & Dédoublonnage)
from pyspark.sql.window import Window
from pyspark.sql.functions import col, trim, when, row_number, desc

print("Démarrage du nettoyage (Couche Silver)...")

# 1. Lecture de la couche Bronze
df_bronze = spark.read.parquet("/content/TRDE703_Projet_Groupe/data/bronze/raw_data")

# 2. Nettoyage de base (Trim des espaces)
df_clean = df_bronze.withColumn("product_name", trim(col("product_name"))) \
                    .withColumn("brands", trim(col("brands"))) \
                    .withColumn("code", trim(col("code")))

# 3. Règle Métier : Harmonisation Sel/Sodium
# Si 'salt_100g' est vide MAIS qu'on a 'sodium_100g', on calcule le sel.
df_clean = df_clean.withColumn(
    "salt_100g_final",
    when(col("salt_100g").isNotNull(), col("salt_100g"))
    .otherwise(col("sodium_100g") * 2.5)
)

# 4. Dédoublonnage (Gardons la version la plus récente de chaque produit)
# On groupe par 'code' et on classe par date de modif décroissante
window_spec = Window.partitionBy("code").orderBy(col("last_modified_t").desc())

# On attribue un numéro de ligne (1 = le plus récent, 2 = ancien...)
df_dedup = df_clean.withColumn("rn", row_number().over(window_spec)) \
                   .filter(col("rn") == 1) \
                   .drop("rn")

# 5. Filtrage Qualité (On retire les produits sans nom ou sans code)
df_silver = df_dedup.filter(
    col("code").isNotNull() &
    (col("product_name").isNotNull()) &
    (col("product_name") != "")
)

# 6. Sauvegarde Silver
silver_path = "/content/TRDE703_Projet_Groupe/data/silver/clean_data"
df_silver.write.mode("overwrite").parquet(silver_path)

print(f"Transformation Silver terminée, Données propres dans : {silver_path}")
print(f"Nombre de produits restants (estimé) : {df_silver.count()}")
df_silver.select("code", "product_name", "salt_100g_final").show(5, truncate=False)

Démarrage du nettoyage (Couche Silver)...
Transformation Silver terminée, Données propres dans : /content/TRDE703_Projet_Groupe/data/silver/clean_data
Nombre de produits restants (estimé) : 3946702
+--------+-----------------------+-------------------+
|code    |product_name           |salt_100g_final    |
+--------+-----------------------+-------------------+
|00000010|xxx                    |NULL               |
|00000017|Collagen For Her       |0.08820000290870667|
|00000022|Farandole de madeleine |0.8799999952316284 |
|00000025|Frog Fuel Power Protein|NULL               |
|00000026|The Smartest Cookie    |2.680000066757202  |
+--------+-----------------------+-------------------+
only showing top 5 rows


In [12]:
# @title 5. ETL Spark : Modélisation Gold & Chargement MySQL
from pyspark.sql.functions import from_unixtime, to_date, year, month, dayofmonth, hour, weekofyear, col, lit, current_timestamp

# --- CONFIGURATION ---
# Mettre à None pour tout charger
# Mettre un chiffre (ex: 50000) pour tester rapidement. ("eviter que colab ne plante")
LIMIT_ROWS = 50000

jdbc_url = "jdbc:mysql://localhost:3306/off_datamart?useSSL=false&allowPublicKeyRetrieval=true"
db_props = {
    "user": "spark_user",
    "password": "spark_password",
    "driver": "com.mysql.cj.jdbc.Driver"
}

print("Chargement des données Silver...")
df_silver = spark.read.parquet("/content/TRDE703_Projet_Groupe/data/silver/clean_data")

# OPTIMISATION DEV : On ne garde qu'un échantillon si LIMIT_ROWS est défini
if LIMIT_ROWS:
    print(f"MODE DÉVELOPPEMENT : On ne charge que les {LIMIT_ROWS} premiers produits pour aller vite.")
    df_silver = df_silver.limit(LIMIT_ROWS)

# --- 1. DIMENSION TIME ---
print("Construction de dim_time...")
# On convertit le timestamp UNIX en vraies dates
dim_time = df_silver.select(col("last_modified_t").alias("time_sk")) \
    .distinct() \
    .withColumn("full_date", from_unixtime(col("time_sk"))) \
    .select(
        col("time_sk"), # Clé primaire
        to_date(col("full_date")).alias("date"),
        year(col("full_date")).alias("year"),
        month(col("full_date")).alias("month"),
        dayofmonth(col("full_date")).alias("day"),
        weekofyear(col("full_date")).alias("week")
    ).na.drop()

# --- 2. DIMENSION PRODUCT ---
print("Construction de dim_product...")
dim_product = df_silver.select(
    col("code").alias("product_code"),
    col("product_name"),
    col("brands"),
    col("categories_tags"),
    col("countries_tags"),
    col("ecoscore_grade"),
    col("nova_group"),
    lit(1).alias("is_current"),
    current_timestamp().alias("effective_from")
).dropDuplicates(["product_code"])

# --- 3. FACT TABLE ---
print("Construction de fact_nutrition_snapshot...")
fact_nutrition = df_silver.select(
    col("code").alias("product_code"), # FK vers dim_product
    col("last_modified_t").alias("time_sk"), # FK vers dim_time
    col("energy-kcal_100g"),
    col("sugars_100g"),
    col("salt_100g_final").alias("salt_100g"),
    col("fat_100g"),
    col("proteins_100g"),
    col("nutriscore_grade"),
    col("additives_n")
)

# --- 4. ÉCRITURE DANS MYSQL ---
def write_to_mysql_batch(df, table_name):
    print(f"Écriture dans MySQL table '{table_name}' ({df.count()} lignes)...")
    try:
        df.write.jdbc(
            url=jdbc_url,
            table=table_name,
            mode="overwrite",
            properties=db_props
        )
        print(f"Table {table_name} chargée avec succès.")
    except Exception as e:
        print(f"Erreur sur {table_name}: {e}")

# Lancement des écritures
write_to_mysql_batch(dim_time, "dim_time")
write_to_mysql_batch(dim_product, "dim_product")
write_to_mysql_batch(fact_nutrition, "fact_nutrition_snapshot")

print("\nTERMINÉ ! Le Datamart est prêt dans MySQL.")

Chargement des données Silver...
MODE DÉVELOPPEMENT : On ne charge que les 50000 premiers produits pour aller vite.
Construction de dim_time...
Construction de dim_product...
Construction de fact_nutrition_snapshot...
Écriture dans MySQL table 'dim_time' (47999 lignes)...
Table dim_time chargée avec succès.
Écriture dans MySQL table 'dim_product' (50000 lignes)...
Table dim_product chargée avec succès.
Écriture dans MySQL table 'fact_nutrition_snapshot' (50000 lignes)...
Table fact_nutrition_snapshot chargée avec succès.

TERMINÉ ! Le Datamart est prêt dans MySQL.


In [14]:
# @title 6. Analyse SQL & KPIs Métier
# 1. Installation de la librairie Python manquante
!pip install mysql-connector-python -q

import pandas as pd
import sqlalchemy

# 6.1 Connexion au Datamart MySQL
# On utilise SQLAlchemy pour récupérer les résultats proprement dans des tableaux Pandas
db_connection_str = 'mysql+mysqlconnector://spark_user:spark_password@localhost/off_datamart'
db_connection = sqlalchemy.create_engine(db_connection_str)

def run_query(query, title):
    print(f"\n--- {title} ---")
    try:
        # Exécution de la requête SQL et conversion en DataFrame Pandas pour l'affichage
        df_query = pd.read_sql(query, db_connection)
        if df_query.empty:
             print("Aucun résultat trouvé.")
        else:
             display(df_query)
    except Exception as e:
        print("Erreur SQL:", e)

# --- REQUÊTE 1 : Top 10 Marques (Qualité A/B) ---
sql_top_brands = """
SELECT p.brands, COUNT(*) as nb_produits_sains
FROM dim_product p
JOIN fact_nutrition_snapshot f ON p.product_code = f.product_code
WHERE f.nutriscore_grade IN ('a', 'b')
AND p.brands IS NOT NULL AND p.brands != ''
GROUP BY p.brands
ORDER BY nb_produits_sains DESC
LIMIT 10;
"""

# --- REQUÊTE 2 : Sucre moyen par Catégorie (Heatmap simplifiée) ---
# On filtre pour ne garder que les catégories significatives
sql_category_sugar = """
SELECT
    SUBSTRING_INDEX(p.categories_tags, ',', 1) as main_category,
    ROUND(AVG(f.sugars_100g), 2) as avg_sugar_100g,
    COUNT(*) as nb_products
FROM dim_product p
JOIN fact_nutrition_snapshot f ON p.product_code = f.product_code
WHERE f.sugars_100g IS NOT NULL
GROUP BY main_category
HAVING nb_products > 10
ORDER BY avg_sugar_100g DESC
LIMIT 10;
"""

# --- REQUÊTE 3 : Détection d'Anomalies (Qualité) ---
sql_anomalies = """
SELECT
    p.product_name,
    p.brands,
    f.sugars_100g,
    f.salt_100g
FROM fact_nutrition_snapshot f
JOIN dim_product p ON f.product_code = p.product_code
WHERE (f.sugars_100g > 100 OR f.salt_100g > 100)
LIMIT 10;
"""

# --- REQUÊTE 4 : Évolution Temporelle ---
sql_evolution = """
SELECT t.year, COUNT(*) as updates_count
FROM fact_nutrition_snapshot f
JOIN dim_time t ON f.time_sk = t.time_sk
GROUP BY t.year
ORDER BY t.year DESC;
"""

# Exécution
run_query(sql_top_brands, "TOP 10 Marques (Nutriscore A & B)")
run_query(sql_category_sugar, "Les 10 Catégories les plus sucrées (Moyenne)")
run_query(sql_anomalies, "Anomalies Détectées (Valeurs > 100g)")
run_query(sql_evolution, "Volume de mises à jour par Année")

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m34.1/34.1 MB[0m [31m14.3 MB/s[0m eta [36m0:00:00[0m
[?25h
--- TOP 10 Marques (Nutriscore A & B) ---


Unnamed: 0,brands,nb_produits_sains
0,Trader Joe's,63
1,Great Value,60
2,Kroger,56
3,Roundy's,41
4,Spartan,41
5,Bob's Red Mill,35
6,Weis,33
7,Essential Everyday,32
8,Goya,32
9,Harris Teeter,29



--- Les 10 Catégories les plus sucrées (Moyenne) ---


Unnamed: 0,main_category,avg_sugar_100g,nb_products
0,en:null,74.65,107
1,en:sweeteners,68.95,176
2,en:breakfasts,60.68,143
3,en:baking-decorations,58.56,139
4,en:undefined,47.63,2867
5,en:snacks,30.79,5031
6,en:cooking-helpers,26.23,133
7,en:desserts,23.08,798
8,en:sweet-pies,20.67,23
9,,13.44,17392



--- Anomalies Détectées (Valeurs > 100g) ---


Unnamed: 0,product_name,brands,sugars_100g,salt_100g
0,Milk Chocolate With Caramelized Almonds,"Ritter Sport, Alfred Ritter Gmbh & Co. Kg",224.0,0.569
1,Creamed Honey With Hibiscus,Vintage Bee Inc.,363.0,0.0
2,Minis Creamed Honey With Cinnamon,"Vintage, Vintage Bee Inc.",800.0,0.0
3,Milk Chocolate Peanut Butter Malt Balls,Sunridge,144.0,0.241
4,Cranberries,Torn & Glasser,222.0,0.0
5,Himalayan pint salt,,,105.0
6,Organic Thompson Raisins,Unfi,175.0,0.159
7,Ginger Candy Sweetened With Raw Cane,Reed's Inc.,900.0,0.0
8,Italian White Wine Vinegar Dressing Marinade,Acetificio M. De Nigris S.R.L.,222.0,26.700001
9,Maple Syrup,Butternut Mountain Farm,147.0,0.0333



--- Volume de mises à jour par Année ---


Unnamed: 0,year,updates_count
0,2026,1233
1,2025,14807
2,2024,8234
3,2023,5594
4,2022,6537
5,2021,1560
6,2020,11027
7,2019,457
8,2018,110
9,2017,438


In [18]:
# @title 7. Génération des scripts de production
import os

project_root = "/content/TRDE703_Projet_Groupe"

# --- 1. SCRIPT D'INGESTION (BRONZE) ---
script_ingest = """from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, FloatType, LongType, IntegerType
import sys

def run_ingest(input_csv, output_parquet):
    # Init Spark
    spark = SparkSession.builder \\
        .appName("OFF_Ingest_Bronze") \\
        .getOrCreate()

    # Lecture avec header et tabulations (Configuration OFF standard)
    # On utilise l'option permissive pour ne pas bloquer sur les lignes corrompues
    df_raw = spark.read.option("header", "true") \\
                       .option("sep", "\\t") \\
                       .option("inferSchema", "false") \\
                       .csv(input_csv)

    # Sélection et Cast explicite (Schéma imposé)
    # Gestion du mapping environmental_score_grade -> ecoscore_grade
    try:
        df_selected = df_raw.select(
            col("code"),
            col("url"),
            col("product_name"),
            col("brands"),
            col("categories_tags"),
            col("countries_tags"),
            col("last_modified_t").cast(LongType()),
            col("nutriscore_grade"),
            col("nova_group"),
            col("environmental_score_grade").alias("ecoscore_grade"),
            col("energy-kcal_100g").cast(FloatType()),
            col("fat_100g").cast(FloatType()),
            col("saturated-fat_100g").cast(FloatType()),
            col("sugars_100g").cast(FloatType()),
            col("salt_100g").cast(FloatType()),
            col("sodium_100g").cast(FloatType()),
            col("proteins_100g").cast(FloatType()),
            col("fiber_100g").cast(FloatType()),
            col("additives_n").cast(IntegerType())
        )

        print(f"Writing Bronze data to {output_parquet}")
        df_selected.write.mode("overwrite").parquet(output_parquet)

    except Exception as e:
        print(f"Error during ingestion: {str(e)}")
        sys.exit(1)

if __name__ == "__main__":
    # Chemins par défaut ou via arguments
    input_path = "/data/raw/en.openfoodfacts.org.products.csv"
    output_path = "/data/bronze"
    run_ingest(input_path, output_path)
"""

# --- 2. SCRIPT DE TRANSFORMATION (SILVER) ---
script_transform = """from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, when, row_number, desc
from pyspark.sql.window import Window

def run_transform(input_path, output_path):
    spark = SparkSession.builder.appName("OFF_Transform_Silver").getOrCreate()

    df_bronze = spark.read.parquet(input_path)

    # 1. Nettoyage standard
    df_clean = df_bronze.withColumn("product_name", trim(col("product_name"))) \\
                        .withColumn("brands", trim(col("brands"))) \\
                        .withColumn("code", trim(col("code")))

    # 2. Règle Métier : Harmonisation Sel (Salt vs Sodium)
    # Approximation : Sel = Sodium * 2.5
    df_clean = df_clean.withColumn(
        "salt_100g_final",
        when(col("salt_100g").isNotNull(), col("salt_100g"))
        .otherwise(col("sodium_100g") * 2.5)
    )

    # 3. Dédoublonnage (Gardons la version la plus récente)
    # Partition par Code Produit, Tri par date de modification desc
    window_spec = Window.partitionBy("code").orderBy(col("last_modified_t").desc())

    df_dedup = df_clean.withColumn("rn", row_number().over(window_spec)) \\
                       .filter(col("rn") == 1) \\
                       .drop("rn")

    # 4. Filtrage Qualité (Exclusion des produits sans identifiant ou nom)
    df_silver = df_dedup.filter(
        col("code").isNotNull() &
        (col("product_name").isNotNull()) &
        (col("product_name") != "")
    )

    print(f"Writing Silver data to {output_path}")
    df_silver.write.mode("overwrite").parquet(output_path)

if __name__ == "__main__":
    run_transform("/data/bronze", "/data/silver")
"""

# --- 3. SCRIPT DE CHARGEMENT (GOLD) ---
script_load = """from pyspark.sql import SparkSession
from pyspark.sql.functions import from_unixtime, to_date, year, month, dayofmonth, weekofyear, col, lit, current_timestamp

def run_load(input_path, db_url, db_props, limit_rows=None):
    spark = SparkSession.builder \\
        .appName("OFF_Load_Gold") \\
        .config("spark.jars", "/libs/mysql-connector.jar") \\
        .getOrCreate()

    print("Reading Silver data...")
    df_silver = spark.read.parquet(input_path)

    # OPTIMISATION PERFORMANCE
    # Sur un cluster de prod, mettre limit_rows à None.
    # Sur environnement restreint (ex: local/colab), utiliser 50000.
    if limit_rows:
        print(f"WARNING: Processing limited to {limit_rows} rows for performance.")
        df_silver = df_silver.limit(limit_rows)

    # --- 1. DIMENSION TIME ---
    dim_time = df_silver.select(col("last_modified_t").alias("time_sk")) \\
        .distinct() \\
        .withColumn("full_date", from_unixtime(col("time_sk"))) \\
        .select(
            col("time_sk"),
            to_date(col("full_date")).alias("date"),
            year(col("full_date")).alias("year"),
            month(col("full_date")).alias("month"),
            dayofmonth(col("full_date")).alias("day"),
            weekofyear(col("full_date")).alias("week")
        ).na.drop()

    # --- 2. DIMENSION PRODUCT (SCD Type 1 logic for init) ---
    dim_product = df_silver.select(
        col("code").alias("product_code"),
        col("product_name"),
        col("brands"),
        col("categories_tags"),
        col("countries_tags"),
        col("ecoscore_grade"),
        col("nova_group"),
        lit(1).alias("is_current"),
        current_timestamp().alias("effective_from")
    ).dropDuplicates(["product_code"])

    # --- 3. FACT NUTRITION ---
    fact_nutrition = df_silver.select(
        col("code").alias("product_code"),
        col("last_modified_t").alias("time_sk"),
        col("energy-kcal_100g"),
        col("sugars_100g"),
        col("salt_100g_final").alias("salt_100g"),
        col("fat_100g"),
        col("proteins_100g"),
        col("nutriscore_grade"),
        col("additives_n")
    )

    # --- 4. ECRITURE JDBC ---
    tables = {
        "dim_time": dim_time,
        "dim_product": dim_product,
        "fact_nutrition_snapshot": fact_nutrition
    }

    for name, df in tables.items():
        print(f"Writing table {name} to MySQL...")
        try:
            df.write.jdbc(url=db_url, table=name, mode="overwrite", properties=db_props)
            print(f"{name} written successfully.")
        except Exception as e:
            print(f"Error writing {name}: {str(e)}")

if __name__ == "__main__":
    # Configuration DB
    jdbc_url = "jdbc:mysql://localhost:3306/off_datamart?useSSL=false&allowPublicKeyRetrieval=true"
    db_properties = {
        "user": "spark_user",
        "password": "spark_password",
        "driver": "com.mysql.cj.jdbc.Driver"
    }

    # Lancement avec limite de sécurité pour dev
    run_load("/data/silver", jdbc_url, db_properties, limit_rows=50000)
"""

# Écriture physique des fichiers
with open(f"{project_root}/etl/1_ingest_bronze.py", "w") as f: f.write(script_ingest)
with open(f"{project_root}/etl/2_transform_silver.py", "w") as f: f.write(script_transform)
with open(f"{project_root}/etl/3_load_gold.py", "w") as f: f.write(script_load)

print("Scripts ETL complets générés dans /etl")

# --- 4. SCRIPTS SQL ---
sql_ddl = """
CREATE DATABASE IF NOT EXISTS off_datamart;
USE off_datamart;

-- Dimension Produit
-- Ref:
CREATE TABLE IF NOT EXISTS dim_product (
    product_code VARCHAR(255) PRIMARY KEY,
    product_name TEXT,
    brands TEXT,
    categories_tags TEXT,
    countries_tags TEXT,
    ecoscore_grade VARCHAR(10),
    nova_group VARCHAR(10),
    is_current BOOLEAN DEFAULT 1,
    effective_from TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Dimension Temps
-- Ref:
CREATE TABLE IF NOT EXISTS dim_time (
    time_sk BIGINT PRIMARY KEY,
    date DATE,
    year INT,
    month INT,
    day INT,
    week INT
);

-- Table de Faits Nutrition
-- Ref:
CREATE TABLE IF NOT EXISTS fact_nutrition_snapshot (
    product_code VARCHAR(255),
    time_sk BIGINT,
    energy_kcal_100g FLOAT,
    sugars_100g FLOAT,
    salt_100g FLOAT,
    fat_100g FLOAT,
    proteins_100g FLOAT,
    nutriscore_grade VARCHAR(5),
    additives_n INT,
    FOREIGN KEY (product_code) REFERENCES dim_product(product_code),
    FOREIGN KEY (time_sk) REFERENCES dim_time(time_sk)
);
"""

sql_analysis = """
-- Requêtes Analytiques / KPIs
-- Ref:

-- 1. Top Marques avec Nutriscore A/B
SELECT p.brands, COUNT(*) as count
FROM dim_product p
JOIN fact_nutrition_snapshot f ON p.product_code = f.product_code
WHERE f.nutriscore_grade IN ('a', 'b')
GROUP BY p.brands ORDER BY count DESC LIMIT 10;

-- 2. Moyenne de sucre par catégorie (Approx via tags)
SELECT SUBSTRING_INDEX(p.categories_tags, ',', 1) as cat, AVG(f.sugars_100g) as avg_sugar
FROM dim_product p
JOIN fact_nutrition_snapshot f ON p.product_code = f.product_code
GROUP BY cat ORDER BY avg_sugar DESC LIMIT 20;

-- 3. Détection d'anomalies (Sucre > 100g)
SELECT p.product_name, f.sugars_100g
FROM dim_product p
JOIN fact_nutrition_snapshot f ON p.product_code = f.product_code
WHERE f.sugars_100g > 100;
"""

with open(f"{project_root}/sql/schema.sql", "w") as f: f.write(sql_ddl)
with open(f"{project_root}/sql/analysis.sql", "w") as f: f.write(sql_analysis)

print("Scripts SQL générés dans /sql")

Scripts ETL complets générés dans /etl
Scripts SQL générés dans /sql


In [19]:
# @title 8. Génération du ZIP Final
import shutil
import os

zip_path = "/content/Livrable_Projet_OFF.zip"
if os.path.exists(zip_path):
    os.remove(zip_path)

print("Compression du projet final...")
shutil.make_archive("/content/Livrable_Projet_OFF", 'zip', project_root)

print(f"Terminé ! Le fichier {zip_path} est prêt à être téléchargé.")

Compression du projet final...
Terminé ! Le fichier /content/Livrable_Projet_OFF.zip est prêt à être téléchargé.
