# 📊 04 - Analyse de Données avec Spark
Ce notebook a pour objectif de fournir des données d'analyse à partir des données déjà transformées, nettoyées et enrichies.


In [1]:


# 0. Stop any session left over from a previous run of this cell.
try:
    spark.stop()
except NameError:
    # First execution: `spark` is not defined yet, so there is nothing to stop.
    pass
except Exception as exc:
    # Stay best-effort (a failed stop must not block the notebook) but report
    # the problem instead of hiding it. Catching Exception rather than using a
    # bare `except:` lets KeyboardInterrupt / SystemExit propagate.
    print(f"Warning: could not stop the existing Spark session: {exc!r}")

# 1. Re-create the SparkSession, requesting 16 GB of memory.
from pyspark.sql import SparkSession

# NOTE(review): with a local master the work presumably runs inside the driver
# JVM, so `spark.executor.memory` may have no effect, and `spark.driver.memory`
# may only apply when the JVM has not been started yet - confirm.
spark = (
    SparkSession.builder
    .appName("04_analyse")
    .master("local[*]")  # use every available core
    .config("spark.hadoop.fs.defaultFS", "file:///")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "0.0.0.0")
    .config("spark.driver.memory", "16g")         # 16 GB requested for the driver
    .config("spark.executor.memory", "16g")       # 16 GB requested for executors
    .config("spark.sql.shuffle.partitions", "8")  # cap the number of shuffle partitions
    .getOrCreate()
)


In [2]:
from pyspark.sql.functions import col, lower, trim, count
import os

# 1. Load the enriched dataset: semicolon-separated CSV with a header row,
#    resolved to an absolute path relative to the current working directory.
path = os.path.abspath("../data/step3_enriched_csv")
df_enriched = spark.read.csv(path, header=True, sep=";")

In [3]:
# Most frequent brands: count rows per brand, rank by descending count,
# discard the null / empty-string brand groups and keep the first 30.
brand_is_set = col("brands").isNotNull() & (col("brands") != "")
top_brands = (
    df_enriched.groupBy("brands")
    .count()
    .orderBy(col("count").desc())
    .where(brand_is_set)
    .limit(30)
)

# Save for the visualisation step, coalesced into a single partition.
# NOTE(review): unlike the later exports in this notebook, no `sep` is set
# here, so the writer's default separator applies - confirm the consumer
# expects that.
(
    top_brands.coalesce(1)
    .write.mode("overwrite")
    .csv("../data/viz/top_brands", header=True)
)

In [4]:
# Most frequent countries: count rows per country, rank by descending count,
# discard the null / empty-string country groups and keep the first 20.
country_is_set = col("country").isNotNull() & (col("country") != "")
top_countries = (
    df_enriched.groupBy("country")
    .count()
    .orderBy(col("count").desc())
    .where(country_is_set)
    .limit(20)
)

# Save for the visualisation step, coalesced into a single partition.
# NOTE(review): no `sep` is set here, so the writer's default separator
# applies - confirm the consumer expects that.
(
    top_countries.coalesce(1)
    .write.mode("overwrite")
    .csv("../data/viz/top_countries", header=True)
)

In [5]:
# Mean score_env_composite_flexible per country, ordered by how often each
# country appears in the dataset (most frequent first).
from pyspark.sql import functions as F

# Number of rows per country.
country_freq = df_enriched.groupBy("country").count()

# Mean environmental score per country.
score_by_country = df_enriched.groupBy("country").agg(
    F.avg("score_env_composite_flexible").alias("avg_score_env_composite_flexible")
)

# Attach the frequency to each mean and sort by descending frequency.
# NOTE(review): this is an inner equi-join, which presumably does not match
# null keys, so a null-country group would be absent from the result -
# confirm this is intended.
result_country = score_by_country.join(country_freq, "country", "inner").orderBy(
    F.col("count").desc()
)

# Export as a semicolon-separated CSV with header, from a single partition.
(
    result_country.coalesce(1)
    .write.mode("overwrite")
    .csv("../data/viz/score_env_by_country", header=True, sep=";")
)


In [6]:
# Mean score_composite per country, ordered by how often each country appears
# (most frequent first). Uses `F` and `country_freq` from the earlier cell.

score_composite_by_country = df_enriched.groupBy("country").agg(
    F.avg("score_composite").alias("avg_score_composite")
)

# Attach the per-country row count and sort by it, descending.
result_composite_country = score_composite_by_country.join(
    country_freq, "country", "inner"
).orderBy(F.col("count").desc())

# Export as a semicolon-separated CSV with header, from a single partition.
(
    result_composite_country.coalesce(1)
    .write.mode("overwrite")
    .csv("../data/viz/score_composite_by_country", header=True, sep=";")
)


In [7]:
# Mean score_env_composite_flexible per brand, ordered by how often each
# brand appears in the dataset (most frequent first).

# Number of rows per brand.
brand_freq = df_enriched.groupBy("brands").count()

# Mean environmental score per brand.
score_by_brand = df_enriched.groupBy("brands").agg(
    F.avg("score_env_composite_flexible").alias("avg_score_env_composite_flexible")
)

# Attach the frequency to each mean and sort by descending frequency.
# NOTE(review): this is an inner equi-join, which presumably does not match
# null keys, so a null-brand group would be absent from the result -
# confirm this is intended.
result_brand = score_by_brand.join(brand_freq, "brands", "inner").orderBy(
    F.col("count").desc()
)

# Export as a semicolon-separated CSV with header, from a single partition.
(
    result_brand.coalesce(1)
    .write.mode("overwrite")
    .csv("../data/viz/score_env_by_brand", header=True, sep=";")
)


In [8]:
# Mean score_composite per brand, ordered by how often each brand appears
# (most frequent first). Uses `F` and `brand_freq` from earlier cells.

score_composite_by_brand = df_enriched.groupBy("brands").agg(
    F.avg("score_composite").alias("avg_score_composite")
)

# Attach the per-brand row count and sort by it, descending.
result_composite_brand = score_composite_by_brand.join(
    brand_freq, "brands", "inner"
).orderBy(F.col("count").desc())

# Export as a semicolon-separated CSV with header, from a single partition.
(
    result_composite_brand.coalesce(1)
    .write.mode("overwrite")
    .csv("../data/viz/score_composite_by_brand", header=True, sep=";")
)


In [9]:
from pyspark.sql.functions import col, avg, max as spark_max, when, lit
from pyspark.sql.types import BooleanType

from pyspark.sql.functions import coalesce, sum as spark_sum


def _sql_ident(name):
    """Return *name* quoted as a Spark SQL identifier (inner backticks doubled)."""
    return "`" + name.replace("`", "``") + "`"


# 1. Quantitative metrics, explicitly cast to double. The hyphenated
#    "energy-kcal_100g" column is re-aliased with an underscore so it can be
#    referenced unquoted in the SQL below.
metrics_cols = [
    col("score_env_composite_flexible").cast("double").alias("score_env_composite_flexible"),
    col("score_composite").cast("double").alias("score_composite"),
    col("energy-kcal_100g").cast("double").alias("energy_kcal_100g"),
]

# 2. Custom boolean flags: every column whose name starts with "is_".
bool_cols = [c for c in df_enriched.columns if c.startswith("is_")]
print(f"‚úÖ Colonnes bool√©ennes d√©tect√©es ({len(bool_cols)}) :", bool_cols)

# 3. Force each flag to boolean; anything that casts to null becomes False.
df_clean = df_enriched
for c in bool_cols:
    df_clean = df_clean.withColumn(c, coalesce(col(c).cast(BooleanType()), lit(False)))

# 4. Base table for the aggregation; flags are cast to int so MAX() applies.
df_base = df_clean.select(
    col("country"),
    col("brands"),
    *metrics_cols,
    *[col(c).cast("int").alias(c) for c in bool_cols],
)
df_base.createOrReplaceTempView("df_base")

# 5. Build the CUBE query dynamically.
# Strip only the leading "is_" (str.replace would also remove any later
# occurrence of "is_" inside the name).
has_cols = ["has_" + c[len("is_"):] for c in bool_cols]

base_expr = ",\n    ".join([
    "country",
    "brands",
    "AVG(score_env_composite_flexible) AS avg_env_score",
    "AVG(score_composite) AS avg_composite_score",
    "AVG(energy_kcal_100g) AS avg_kcal",
    "GROUPING(country) AS grouping_country",
    "GROUPING(brands) AS grouping_brands",
    "GROUPING_ID(country, brands) AS grouping_id",
])
# A group "has" a flag when at least one of its rows carries it (MAX = 1).
# Identifiers are backtick-quoted so unusual column names stay valid SQL.
bool_exprs = ",\n    ".join(
    f"MAX({_sql_ident(c)}) = 1 AS {_sql_ident(h)}"
    for c, h in zip(bool_cols, has_cols)
)
# Skip the empty part when there is no flag column, to avoid a trailing comma.
select_list = ",\n    ".join(part for part in (base_expr, bool_exprs) if part)
query = f"""
SELECT
    {select_list}
FROM df_base
GROUP BY CUBE(country, brands)
"""

# Cache the cube once: the null check, the row count and the export below
# are three separate actions over the same result.
df_cube = spark.sql(query).cache()

# Verification: number of null values in each has_* column before any fill.
if has_cols:
    null_row = df_cube.agg(
        *[spark_sum(col(h).isNull().cast("int")).alias(h) for h in has_cols]
    ).first()
    # sum() yields None when there is no row at all; report that as 0.
    null_counts = {h: (n or 0) for h, n in zip(has_cols, null_row)}
else:
    null_counts = {}

print("üìã V√©rification finale des nulls :")
for name, n_nulls in null_counts.items():
    print(f" - {name}: {n_nulls} ligne(s) nulles")

# Safety net: replace any remaining null flag with False.
df_final = df_cube
for h in has_cols:
    df_final = df_final.withColumn(h, coalesce(col(h), lit(False)))

# Final export: semicolon-separated CSV with header, from a single partition.
print("‚úÖ Nombre de lignes dans df_final :", df_final.count())

(
    df_final.coalesce(1)
    .write.option("header", "true")
    .option("sep", ";")
    .mode("overwrite")
    .csv("../data/viz/cube_agg_country_brand_scores_enriched")
)


‚úÖ Colonnes bool√©ennes d√©tect√©es (6) : ['is_vegan', 'is_vegetarian', 'is_sans_sucre', 'is_protein_plus', 'is_light', 'is_ultra_transformed']
üìã V√©rification finale des nulls :
 - sum(has_vegan): 0 ligne(s) nulles
 - sum(has_vegetarian): 0 ligne(s) nulles
 - sum(has_sans_sucre): 0 ligne(s) nulles
 - sum(has_protein_plus): 0 ligne(s) nulles
 - sum(has_light): 0 ligne(s) nulles
 - sum(has_ultra_transformed): 0 ligne(s) nulles
‚úÖ Nombre de lignes dans df_final : 684661


In [10]:
from pyspark.sql.functions import col, mean, stddev, min, max, count, isnan, when, percentile_approx
from pyspark.sql.types import NumericType, DoubleType
from functools import reduce
from math import ceil

from pyspark.sql.functions import regexp_replace


def _looks_numeric(values):
    """Return True when *values* is non-empty and every item is a decimal string.

    A value qualifies when, after turning commas into dots and removing at
    most one dot, only digits remain (so signs and exponents are rejected).
    An empty sample gives no evidence and is therefore rejected.
    """
    return bool(values) and all(
        isinstance(v, str) and v.replace(",", ".").replace(".", "", 1).isdigit()
        for v in values
    )


def explode_columns(df):
    """Unpivot the wide aggregate *df* into long ``(metric, value)`` rows.

    Columns are processed in chunks of 50 through SQL ``stack`` and the chunks
    are unioned by name, so a single expression never grows too large.
    """
    def backtick(name):
        # Quote as a SQL identifier; metric names contain hyphens.
        return "`" + name.replace("`", "``") + "`"

    chunk_size = 50
    columns = df.columns
    parts = []
    for start in range(0, len(columns), chunk_size):
        chunk = columns[start:start + chunk_size]
        pairs = ", ".join(f"'{c}', {backtick(c)}" for c in chunk)
        parts.append(df.selectExpr(f"stack({len(chunk)}, {pairs}) as (metric, value)"))
    return reduce(lambda a, b: a.unionByName(b), parts)


# 0. Auto-cast string columns whose sampled values all look numeric.
#    The decision is based on the first 10 non-null values only.
for f in df_enriched.schema.fields:
    if f.dataType.simpleString() != "string":
        continue
    try:
        sample = [
            row[0]
            for row in df_enriched.select(col(f.name)).dropna().limit(10).collect()
        ]
        if _looks_numeric(sample):
            # The check above accepts a decimal comma, so normalise it to a
            # dot before casting; otherwise such values would become null.
            df_enriched = df_enriched.withColumn(
                f.name,
                regexp_replace(col(f.name), ",", ".").cast(DoubleType()),
            )
    except Exception as exc:
        # Best-effort: keep the column as a string, but report the failure.
        print(f"Warning: could not auto-cast column {f.name!r}: {exc!r}")

# 1. Numeric columns that hold at least one non-null, non-NaN value.
#    All candidates are checked in one aggregation instead of one Spark job
#    per column. Positional aliases avoid issues with unusual column names.
numeric_candidates = [
    f.name for f in df_enriched.schema.fields
    if isinstance(f.dataType, NumericType)
]
if numeric_candidates:
    valid_row = df_enriched.agg(*[
        count(when(col(c).isNotNull() & ~isnan(col(c)), 1)).alias(f"valid_{i}")
        for i, c in enumerate(numeric_candidates)
    ]).first()
    numeric_cols = [c for c, n in zip(numeric_candidates, valid_row) if n > 0]
else:
    numeric_cols = []
print(f"‚úÖ Colonnes num√©riques avec valeurs valides : {numeric_cols}")

if not numeric_cols:
    print("‚ùå Aucune colonne num√©rique avec des valeurs valides.")
else:
    # 2a. Floating-point aggregates.
    # NOTE: `min` / `max` here are the Spark functions imported at the top of
    # this cell, which shadow the Python builtins of the same name.
    agg_exprs_double = []
    for c in numeric_cols:
        agg_exprs_double.extend([
            mean(col(c)).alias(f"{c}_mean"),
            stddev(col(c)).alias(f"{c}_stddev"),
            min(col(c)).alias(f"{c}_min"),
            max(col(c)).alias(f"{c}_max"),
            percentile_approx(col(c), 0.5).alias(f"{c}_median"),
        ])

    # 2b. Integer (bigint) aggregates: null/NaN count and non-null count.
    agg_exprs_int = []
    for c in numeric_cols:
        agg_exprs_int.extend([
            count(when(col(c).isNull() | isnan(col(c)), c)).alias(f"{c}_nulls"),
            count(col(c)).alias(f"{c}_count"),
        ])

    # Kept as two aggregations so each `stack` call sees one value type.
    df_double = df_enriched.agg(*agg_exprs_double)
    df_int = df_enriched.agg(*agg_exprs_int)

    # 3. Wide -> long, then merge both families of metrics.
    profiling_long_double = explode_columns(df_double)
    profiling_long_int = explode_columns(df_int)
    profiling_long = profiling_long_double.unionByName(profiling_long_int)

    # 4. Export: semicolon-separated CSV with header, from a single partition.
    (
        profiling_long.coalesce(1)
        .write.option("header", "true")
        .option("sep", ";")
        .mode("overwrite")
        .csv("../data/viz/univariate_profiling_export")
    )

    print("‚úÖ Export termin√© : ../data/viz/univariate_profiling_export")


‚úÖ Colonnes num√©riques avec valeurs valides : ['serving_quantity', 'additives_n', 'nova_group', 'environmental_score_score', 'product_quantity', 'completeness', 'energy-kj_100g', 'energy-kcal_100g', 'energy-from-fat_100g', 'fat_100g', 'saturated-fat_100g', 'caprylic-acid_100g', 'capric-acid_100g', 'myristic-acid_100g', 'arachidic-acid_100g', 'lignoceric-acid_100g', 'cerotic-acid_100g', 'unsaturated-fat_100g', 'monounsaturated-fat_100g', 'polyunsaturated-fat_100g', 'omega-6-fat_100g', 'eicosapentaenoic-acid_100g', 'docosahexaenoic-acid_100g', 'dihomo-gamma-linolenic-acid_100g', 'oleic-acid_100g', 'mead-acid_100g', 'nervonic-acid_100g', 'trans-fat_100g', 'cholesterol_100g', 'carbohydrates_100g', 'sugars_100g', 'added-sugars_100g', 'sucrose_100g', 'glucose_100g', 'fructose_100g', 'galactose_100g', 'lactose_100g', 'starch_100g', 'polyols_100g', 'erythritol_100g', 'fiber_100g', 'soluble-fiber_100g', 'insoluble-fiber_100g', 'proteins_100g', 'casein_100g', 'serum-proteins_100g', 'nucleotide