## 1. Ingestion (Bronze)

In [4]:
#Imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [5]:
# Build the Spark session used by every cell below; getOrCreate() returns the
# already-active session when one exists.
spark = (
    SparkSession.builder
    .appName("OpenFoodFacts_ETL")
    .getOrCreate()
)

In [6]:
# Relative path of the raw OpenFoodFacts export.
csv_path = "../data/raw/openfoodfacts.csv"

# Bronze layer: load the tab-separated export as-is. No schema is supplied and
# inferSchema is not enabled, so every column is read as a string.
bronze_df = (
    spark.read
    .option("mode", "PERMISSIVE")   # do not fail the read on malformed records
    .option("multiLine", "true")    # a quoted field may span several lines
    .option("header", "true")       # first line holds the column names
    .option("sep", "\t")
    .option("quote", '"')
    .option("escape", '"')
    .csv(csv_path)
)

# Quick sanity check of what was loaded.
bronze_df.printSchema()
bronze_df.show(5)

root
 |-- code: string (nullable = true)
 |-- url: string (nullable = true)
 |-- creator: string (nullable = true)
 |-- created_t: string (nullable = true)
 |-- created_datetime: string (nullable = true)
 |-- last_modified_t: string (nullable = true)
 |-- last_modified_datetime: string (nullable = true)
 |-- last_modified_by: string (nullable = true)
 |-- last_updated_t: string (nullable = true)
 |-- last_updated_datetime: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- abbreviated_product_name: string (nullable = true)
 |-- generic_name: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- packaging: string (nullable = true)
 |-- packaging_tags: string (nullable = true)
 |-- packaging_en: string (nullable = true)
 |-- packaging_text: string (nullable = true)
 |-- brands: string (nullable = true)
 |-- brands_tags: string (nullable = true)
 |-- brands_en: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- categories_tags: s

In [7]:
# Number of rows in the bronze DataFrame (count() is a Spark action).
print("=== Nombre de lignes ===")
bronze_row_count = bronze_df.count()
print(bronze_row_count)

=== Nombre de lignes ===
4190671


In [8]:
# Number of columns in the bronze DataFrame.
print("=== Nombre de colonnes ===")
bronze_column_count = len(bronze_df.columns)
print(bronze_column_count)

=== Nombre de colonnes ===
215


In [9]:
# Names of the first 50 columns, as a quick overview of the export layout.
print("=== Quelques colonnes ===")
first_columns = bronze_df.columns[:50]
print(first_columns)

=== Quelques colonnes ===
['code', 'url', 'creator', 'created_t', 'created_datetime', 'last_modified_t', 'last_modified_datetime', 'last_modified_by', 'last_updated_t', 'last_updated_datetime', 'product_name', 'abbreviated_product_name', 'generic_name', 'quantity', 'packaging', 'packaging_tags', 'packaging_en', 'packaging_text', 'brands', 'brands_tags', 'brands_en', 'categories', 'categories_tags', 'categories_en', 'origins', 'origins_tags', 'origins_en', 'manufacturing_places', 'manufacturing_places_tags', 'labels', 'labels_tags', 'labels_en', 'emb_codes', 'emb_codes_tags', 'first_packaging_code_geo', 'cities', 'cities_tags', 'purchase_places', 'stores', 'countries', 'countries_tags', 'countries_en', 'ingredients_text', 'ingredients_tags', 'ingredients_analysis_tags', 'allergens', 'allergens_en', 'traces', 'traces_tags', 'traces_en']


In [10]:
# Print the schema of the bronze DataFrame.
# NOTE(review): the label reads "inferred schema", but the reader sets no
# inferSchema option, so every column is reported as string.
print("=== Schéma inféré ===")
bronze_df.printSchema()

=== Schéma inféré ===
root
 |-- code: string (nullable = true)
 |-- url: string (nullable = true)
 |-- creator: string (nullable = true)
 |-- created_t: string (nullable = true)
 |-- created_datetime: string (nullable = true)
 |-- last_modified_t: string (nullable = true)
 |-- last_modified_datetime: string (nullable = true)
 |-- last_modified_by: string (nullable = true)
 |-- last_updated_t: string (nullable = true)
 |-- last_updated_datetime: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- abbreviated_product_name: string (nullable = true)
 |-- generic_name: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- packaging: string (nullable = true)
 |-- packaging_tags: string (nullable = true)
 |-- packaging_en: string (nullable = true)
 |-- packaging_text: string (nullable = true)
 |-- brands: string (nullable = true)
 |-- brands_tags: string (nullable = true)
 |-- brands_en: string (nullable = true)
 |-- categories: string (nullable = true)
 

In [11]:
# Preview the first 20 rows; truncate=True shortens long string values so the
# very wide table stays printable.
bronze_df.show(20, truncate=True)

+--------+--------------------+--------------------+----------+--------------------+---------------+----------------------+----------------+--------------+---------------------+--------------------+------------------------+------------+--------------------+--------------------+--------------------+--------------------+--------------+-------------+------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------------+--------------------+------------+--------------------+--------------------+--------------------+------------------

## 2. Nettoyage & qualité (Silver)

In [12]:
# Columns carried over from bronze into the silver layer.
cols_needed = [
    # product identity and descriptive attributes
    "code", "product_name", "brands", "categories", "countries",
    "last_modified_t",
    # scores / grades
    "nutriscore_grade", "nova_group", "environmental_score_grade",
    # *_100g nutrition columns
    "energy-kcal_100g", "fat_100g", "saturated-fat_100g", "sugars_100g",
    "salt_100g", "proteins_100g", "fiber_100g", "sodium_100g",
    # record completeness
    "completeness",
]

# Project onto the requested columns that actually exist in the export.
# NOTE(review): a missing column is skipped silently here, while later cells
# reference these columns unconditionally and would fail on it.
available_columns = set(bronze_df.columns)
selected_columns = [name for name in cols_needed if name in available_columns]
silver_raw = bronze_df.select(*selected_columns)


In [13]:
# Trim the text columns and cast the measured columns to double.
string_cols = [
    "product_name",
    "brands",
    "categories",
    "countries",
    "nutriscore_grade",
    "environmental_score_grade",
]

numeric_cols = [
    "energy-kcal_100g",
    "fat_100g",
    "saturated-fat_100g",
    "sugars_100g",
    "salt_100g",
    "proteins_100g",
    "fiber_100g",
    "sodium_100g",
    "completeness",
]

from pyspark.sql.types import TimestampType

# Rebuild every column in a single projection: text columns are trimmed,
# numeric columns are cast (values that are not numbers become NULL), and all
# other columns pass through untouched. Names and column order are preserved.
text_targets = set(string_cols)
double_targets = set(numeric_cols)

normalised_columns = []
for name in silver_raw.columns:
    column = col(name)
    if name in text_targets:
        column = trim(column).alias(name)
    elif name in double_targets:
        column = column.cast("double").alias(name)
    normalised_columns.append(column)

silver_raw = silver_raw.select(*normalised_columns)

# last_modified_t is read as a string; convert it to a proper timestamp.
if "last_modified_t" in silver_raw.columns:
    modified_epoch = col("last_modified_t").cast("double")
    silver_raw = silver_raw.withColumn("last_modified_ts", to_timestamp(modified_epoch))


In [14]:
# Plausibility bounds (inclusive) per nutrient column; add further nutrients
# here to extend the check.
nutrient_bounds = {
    "sugars_100g": (0, 100),
    "salt_100g": (0, 25),
}

silver_clean = silver_raw

# Drop rows whose value lies outside the bounds. A missing value is not an
# out-of-range value: comparing NULL yields NULL, which filter() treats as
# false, so NULLs must be kept explicitly or every product without that
# nutrient would be discarded.
for nutrient, (lower, upper) in nutrient_bounds.items():
    if nutrient in silver_clean.columns:
        value = col(nutrient)
        within_bounds = (value >= lower) & (value <= upper)
        silver_clean = silver_clean.filter(value.isNull() | within_bounds)


In [15]:
# Keep a single row per product code: the one with the highest
# last_modified_t. Rows are ranked inside each code, rank 1 is kept, and the
# helper column is dropped again.
# NOTE(review): rows with a NULL code fall into one partition, so only one of
# them survives; ties on last_modified_t are broken arbitrarily.
latest_first = col("last_modified_t").cast("long").desc()
w = Window.partitionBy("code").orderBy(latest_first)

silver_dedup = (
    silver_clean
    .withColumn("rn", row_number().over(w))
    .where(col("rn") == 1)
    .drop("rn")
)


## 3. Dimensions (Gold)

In [16]:
# Brand dimension: one row per distinct, non-empty (trimmed) value of the
# brands column, plus a generated surrogate key.
brand_name_col = trim(col("brands")).alias("brand_name")
has_brand_name = col("brand_name").isNotNull() & (col("brand_name") != "")

dim_brand = (
    silver_dedup
    .select(brand_name_col)
    .where(has_brand_name)
    .distinct()
    .withColumn("brand_sk", monotonically_increasing_id())
)

# Cached because the same frame is reused when resolving surrogate keys.
dim_brand_cached = dim_brand.cache()


In [17]:
def extract_first_category(col_categories):
    """Return the first entry of a comma-separated categories column.

    The value is split on "," and element 0 is returned as a Column.
    No trimming is applied to the extracted entry.
    """
    entries = split(col_categories, ",")
    return entries[0]


# Category dimension: one row per distinct, non-empty first category, plus a
# generated surrogate key.
first_category_col = extract_first_category(col("categories")).alias("category_name_fr")
has_category_name = col("category_name_fr").isNotNull() & (col("category_name_fr") != "")

dim_category = (
    silver_dedup
    .select(first_category_col)
    .where(has_category_name)
    .distinct()
    .withColumn("category_sk", monotonically_increasing_id())
)

dim_category_cached = dim_category.cache()


In [18]:
# Country dimension: one row per distinct, non-empty first entry of the
# comma-separated countries column, plus a generated surrogate key.
first_country_col = split(col("countries"), ",")[0].alias("country_name_fr")
has_country_name = col("country_name_fr").isNotNull() & (col("country_name_fr") != "")

dim_country = (
    silver_dedup
    .select(first_country_col)
    .where(has_country_name)
    .distinct()
    .withColumn("country_sk", monotonically_increasing_id())
)

dim_country_cached = dim_country.cache()


In [19]:
# Time dimension at calendar-day grain: exactly one row per distinct day on
# which a product was last modified.
#
# The grain must be the day because the surrogate-key lookup matches products
# to this dimension on the calendar date. Keeping the full timestamp here
# would leave many rows per day, and each product would then match all of
# them, multiplying the rows of the fact table.
dim_time = (
    silver_dedup
    .select(to_date(col("last_modified_ts")).alias("date"))
    .where(col("date").isNotNull())
    .distinct()
    .withColumn("year", year("date"))
    .withColumn("month", month("date"))
    .withColumn("day", dayofmonth("date"))
    .withColumn("week", weekofyear("date"))
    .withColumn("time_sk", monotonically_increasing_id())
)

dim_time_cached = dim_time.cache()


In [20]:
from pyspark.sql.functions import col, split, trim, monotonically_increasing_id

# Resolve the surrogate keys: left-join every dimension onto the deduplicated
# products and keep the product columns plus the four *_sk columns.
# Each condition applies the same expression that was used to build the
# corresponding dimension value.
brand_match = trim(col("s.brands")) == col("b.brand_name")
category_match = split(col("s.categories"), ",")[0] == col("c.category_name_fr")
country_match = split(col("s.countries"), ",")[0] == col("co.country_name_fr")
# NOTE(review): the time lookup compares calendar days; if dim_time holds more
# than one row for a day, each product row is repeated once per matching row.
day_match = col("s.last_modified_ts").cast("date") == col("t.date").cast("date")

product_with_fk = (
    silver_dedup.alias("s")
    .join(dim_brand_cached.alias("b"), on=brand_match, how="left")
    .join(dim_category_cached.alias("c"), on=category_match, how="left")
    .join(dim_country_cached.alias("co"), on=country_match, how="left")
    .join(dim_time_cached.alias("t"), on=day_match, how="left")
    .select(
        col("s.*"),
        col("b.brand_sk"),
        col("c.category_sk"),
        col("co.country_sk"),
        col("t.time_sk"),
    )
)

# Product dimension (one row per code).
# product_sk is generated before dropDuplicates, so the keys are unique but
# not consecutive. effective_to is left NULL and is_current is set to 1 for
# every row.
dim_product_columns = [
    monotonically_increasing_id().alias("product_sk"),
    col("code"),
    col("product_name"),
    col("brand_sk"),
    col("category_sk").alias("primary_category_sk"),
    col("countries").alias("countries_multi"),
    col("last_modified_ts").alias("effective_from"),
    lit(None).cast("timestamp").alias("effective_to"),
    lit(1).alias("is_current"),
]

dim_product = product_with_fk.select(*dim_product_columns).dropDuplicates(["code"])



## 4. Table de faits

In [21]:
from pyspark.sql.functions import to_json, create_map, lit, monotonically_increasing_id

# Attach product_sk to every product row by joining back on the product code.
# Left join: a row without a matching code keeps a NULL product_sk.
same_code = col("pwf.code") == col("p.code")

fact_base = (
    product_with_fk.alias("pwf")
    .join(dim_product.alias("p"), on=same_code, how="left")
    .select(col("pwf.*"), col("p.product_sk"))
)

# Source columns of the fact table, in output order.
fact_source_columns = [
    "product_sk",
    "time_sk",
    "energy-kcal_100g",
    "fat_100g",
    "saturated-fat_100g",
    "sugars_100g",
    "salt_100g",
    "proteins_100g",
    "fiber_100g",
    "sodium_100g",
    "nutriscore_grade",
    "nova_group",
    "environmental_score_grade",
    "completeness",
]

# Columns that get a different name in the fact table (hyphens removed,
# warehouse naming); every other column keeps its name.
fact_renames = {
    "energy-kcal_100g": "energy_kcal_100g",
    "saturated-fat_100g": "saturated_fat_100g",
    "environmental_score_grade": "ecoscore_grade",
    "completeness": "completeness_score",
}

# Back-ticks quote the source names so hyphenated columns resolve correctly.
fact_measures = [
    col(f"`{name}`").alias(fact_renames.get(name, name))
    for name in fact_source_columns
]

fact = fact_base.select(
    monotonically_increasing_id().alias("fact_id"),
    *fact_measures
)

# Per-row quality flags (1 = value missing, 0 = value present), stored as a
# JSON string in quality_issues_json.
missing_flags = create_map(
    lit("missing_nutriscore"), col("nutriscore_grade").isNull().cast("int"),
    lit("missing_energy"), col("energy_kcal_100g").isNull().cast("int"),
)

fact = fact.withColumn("quality_issues_json", to_json(missing_flags))


In [22]:
# Run-level quality metrics.
# Each count is a Spark action, so compute it once and reuse the result.
nb_lus = bronze_df.count()
nb_apres_dedup = silver_dedup.count()
nb_with_nutriscore = silver_dedup.filter(col("nutriscore_grade").isNotNull()).count()

# Sugar anomalies are counted on silver_raw (typed, not yet filtered):
# silver_dedup has already had every row with sugars_100g > 100 removed, so
# counting there always gives 0.
nb_sugars_anomalies = silver_raw.filter(col("sugars_100g") > 100).count()

metrics = {
    "source": "OpenFoodFacts CSV",
    "nb_lus": nb_lus,
    "nb_apres_dedup": nb_apres_dedup,
    # Share of deduplicated products with a nutriscore grade; None (JSON null)
    # when there are no rows, instead of raising ZeroDivisionError.
    "pct_nutriscore": (nb_with_nutriscore / nb_apres_dedup) if nb_apres_dedup else None,
    "nb_sugars_anomalies": nb_sugars_anomalies,
}

In [23]:
import json
import os
from datetime import datetime

# Persist the metrics of this run as a timestamped JSON file.
output_dir = "../docs/quality"

# One file per run: the local wall-clock time is part of the file name.
run_ts = f"{datetime.now():%Y%m%d_%H%M%S}"
output_path = f"{output_dir}/metrics_{run_ts}.json"

# Create the target directory on first use.
os.makedirs(output_dir, exist_ok=True)

# ensure_ascii=False keeps accented characters readable in the file.
with open(output_path, "w", encoding="utf-8") as metrics_file:
    json.dump(metrics, metrics_file, ensure_ascii=False, indent=2)

print(f"Metrics saved in : {output_path}")

Metrics saved in : ../docs/quality/metrics_20260113_165505.json


In [24]:
# Shut down the Spark session at the end of the run.
spark.stop()