# Explication des cellules d'initialisation
Nous avons testé plusieurs approches : en local, avec Pycharm. Dans le cloud, avec Kaggle & Google Colab.
Nous avons donc mis en place plusieurs configurations pour chaque environnement.
Si local, vérifier que java 11 est bien installé. Si Kaggle, uploader le csv comme dataset afin de l'avoir à disposition.


## Made by :
- Maxime Kets
- Thomas Blaisot
- Cedric Sanchez
- Pierre-Louis Guinel

In [None]:
import time
from pyspark.sql import SparkSession
import shutil
import os
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
from pyspark.sql.functions import col, when, avg, max, min, count, row_number, split, explode, trim
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType

In [None]:
file_path_csv = "/kaggle/input/openfoodfacts/en.openfoodfacts.org.products.csv"
file_path_parquet = "/kaggle/working/en.openfoodfacts.org.products.parquet"

In [None]:
file_path_csv = "./data/en.openfoodfacts.org.products.csv"
file_path_parquet = "./data/en.openfoodfacts.org.products.parquet"
file_save_path = "./data/saved.parquet"

In [None]:
#database configuration
path_mysql = "mysql/mysql-connector-j-9.1.0.jar"
url = "jdbc:mysql://localhost:3306/openfood_db"
properties = {
    "user": "root",
    "password": "root",
    "driver": "com.mysql.cj.jdbc.Driver"
}

In [None]:
start_time = time.time()
print("Démarrage du script...")

# Initialiser une SparkSession avec des logs réduits
spark = SparkSession.builder \
    .appName("Exploration OpenFoodFacts") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.executor.memory", "2g") \
    .config("spark.jars", path_mysql) \
    .config("spark.sql.catalogImplementation", "hive") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")  # Réduction des logs

print("PySpark chargé")

try:
    # Charger le fichier CSV en tant que DataFrame Spark puis échantillonne 20%
    df_csv_before_sample = spark.read.csv(file_path_csv, header=True, inferSchema=True, sep="\t")
    print("Fichier CSV chargé.")
    df_csv = df_csv_before_sample.sample(withReplacement=False, fraction=0.2)  # Échantillonnage à 20%
    print("Echantillonage terminé")

    # Sauvegarder le DataFrame au format Parquet
    df_csv.write.parquet(file_path_parquet, mode="overwrite")
    print("Données sauvegardées au format Parquet.")

    # Charger le fichier Parquet pour une analyse future
    df_parquet = spark.read.parquet(file_path_parquet)
    print("Fichier Parquet chargé.")

    # Vérifier et supprimer l'ancienne table Hive
    table_name = "hive_table"
    hive_table_path = f"spark-warehouse/{table_name}"
    if table_name in [t.name for t in spark.catalog.listTables("default")]:
        print(f"La table '{table_name}' existe déjà et doit être supprimé.")
        spark.sql(f"DROP TABLE {table_name}")
        print(f"Table Hive '{table_name}' supprimée.")
    if os.path.exists(hive_table_path):
        shutil.rmtree(hive_table_path)
        print(f"Répertoire associé '{hive_table_path}' supprimé.")

    # Création de la table Hive
    print("Création et insertion dans la table Hive...")
    hive_table_start_time = time.time()
    df_csv.write.mode("overwrite").saveAsTable("hive_table")

except Exception as e:
    print(f"Erreur rencontrée : {e}")

finally:
    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Temps d'exécution : {elapsed_time:.2f} secondes")


# Analyse préliminaire
### 1. Mise en valeur des lignes, colonnes



In [None]:
# Mesure du temps pour compter les lignes du DataFrame CSV
csv_start_time = time.time()
csv_row_count = df_csv.count()
csv_end_time = time.time()
csv_elapsed_time_ms = (csv_end_time - csv_start_time) * 1000
print(f"CSV (comptage des lignes): {csv_elapsed_time_ms:.3f} ms - Nombre de lignes: {csv_row_count}")

# Mesure du temps pour compter les lignes du DataFrame Parquet
parquet_start_time = time.time()
parquet_row_count = df_parquet.count()
parquet_end_time = time.time()
parquet_elapsed_time_ms = (parquet_end_time - parquet_start_time) * 1000
print(f"Parquet (comptage des lignes): {parquet_elapsed_time_ms:.3f} ms - Nombre de lignes: {parquet_row_count}")

# Mesure du temps pour compter les lignes de la table Hive
hive_start_time = time.time()
df_hive = spark.sql("SELECT * FROM hive_table")
hive_row_count = df_hive.count()
hive_end_time = time.time()
hive_elapsed_time_ms = (hive_end_time - hive_start_time) * 1000
print(f"Hive (comptage des lignes): {hive_elapsed_time_ms:.3f} ms - Nombre de lignes: {hive_row_count}")

# Comparaison des temps
print("\nComparaison des performances :")
print(f"CSV Execution Time: {csv_elapsed_time_ms:.3f} ms")
print(f"Parquet Execution Time: {parquet_elapsed_time_ms:.3f} ms")
print(f"Hive Execution Time: {hive_elapsed_time_ms:.3f} ms")

### .2 Gestion des valeurs manquantes


In [None]:
from pyspark.sql.functions import col, count, when

start_time = time.time()

# Calculate missing data percentage for each column
total_rows = df_parquet.count()
missing_data = (
    df_parquet.select([
        (count(when(col(c).isNull() | (col(c) == ""), c)) / total_rows).alias(c)
        for c in df_parquet.columns
    ])
)

# Transform columns into rows (melt operation)
missing_data_melted = missing_data.selectExpr(
    "stack({0}, {1}) as (Column, MissingPercentage)".format(
        len(df_parquet.columns),
        ", ".join([f"'{col}', `{col}`" for col in df_parquet.columns])
    )
).filter(col("MissingPercentage").isNotNull()).orderBy(col("MissingPercentage").desc())

# Identify columns with 100% missing data
columns_to_drop = (
    missing_data_melted.filter(col("MissingPercentage") == 1.0)
    .select("Column")
    .rdd.flatMap(lambda x: x)
    .collect()
)

# Drop columns with 100% missing values
df_cleanedby_missing_value = df_parquet.drop(*columns_to_drop)

# Display the top 10 columns with the highest missing percentages
print("Top 10 columns with the highest missing percentages:")
missing_data_melted.show(10, truncate=False)

# Print dropped columns
print(f"Columns dropped due to 100% missing values: {columns_to_drop}")

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Execution completed in {elapsed_time:.2f} seconds")


### 3. Gestion des valeurs doublons

In [None]:
from pyspark.sql.functions import col, count

start_time = time.time()

# Analyzing Duplicates in 'code', 'product_name', and 'brands'
duplicates = (
    df_cleanedby_missing_value.groupBy("code", "product_name", "brands")
    .count()
    .filter(col("count") > 1)
)

# Affiche le nombre de doublons identifiés
print(f"There are {duplicates.count()} duplicate rows based on 'code', 'product_name', and 'brands'.")
duplicates.show(truncate=False)

# Remove duplicates where 'code', 'product_name', and 'brands' are the same
df_cleaned_by_duplicate = df_cleanedby_missing_value.dropDuplicates(["code", "product_name", "brands"])

print(f"Number of rows before removing duplicates: {df_cleanedby_missing_value.count()}")
print(f"Number of rows after removing duplicates: {df_cleaned_by_duplicate.count()}")

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Execution completed in {elapsed_time:.2f} seconds")


### 4. Gestion des valeurs aberrantes

In [None]:
from pyspark.sql.functions import col, regexp_extract
from pyspark.sql.types import IntegerType, DoubleType, FloatType

# Extraire les valeurs numériques de la colonne "quantity"
df_cleaned_by_outliers = df_cleaned_by_duplicate.withColumn(
    "quantity_numeric",
    regexp_extract(col("quantity"), r"(\d+)", 1).cast("double")
)

# Détecter les colonnes numériques
numeric_columns = [
    field.name for field in df_cleaned_by_outliers.schema.fields
    if isinstance(field.dataType, (IntegerType, DoubleType, FloatType))
]
print(f"Numeric columns detected: {numeric_columns}")

if not numeric_columns:
    print("No numeric columns found. Please check your data.")
else:
    total_rows_before = df_cleaned_by_outliers.count()
    removed_rows_total = 0

    # Boucle sur les colonnes numériques pour détecter les outliers
    for column in numeric_columns:
        try:
            # Calcul des quartiles avec approxQuantile
            quantiles = df_cleaned_by_outliers.approxQuantile(column, [0.25, 0.75], 0.05)
            if len(quantiles) < 2:
                print(f"Column '{column}' has insufficient data. Skipping...")
                continue

            q1, q3 = quantiles
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr

            # Filtrer les outliers
            df_outliers = df_cleaned_by_outliers.filter((col(column) < lower_bound) | (col(column) > upper_bound))
            removed_rows = df_outliers.count()
            removed_rows_total += removed_rows

            print(f"Column '{column}': {removed_rows} rows detected as outliers.")

        except Exception as e:
            print(f"Error processing column '{column}': {e}")

    print(f"Total rows before filtering: {total_rows_before}")
    print(f"Total rows removed as outliers: {removed_rows_total}")
    print(f"Total rows remaining: {total_rows_before - removed_rows_total}")

# Data cleaning


In [None]:
# display the schema of the cleaned DataFrame
df_parquet = df_outliers
df_parquet.describe()

In [None]:
#select columns to keep
selected_column = [
    'code',
    'product_name',
    'brands',
    'categories',
    "main_category",
    'quantity',
    'packaging',
    'countries',
    'ingredients_text',
    'allergens',
    'serving_size',
    'energy-kcal_100g',
    'fat_100g',
    'saturated-fat_100g',
    "proteins_100g",
    'salt_100g',
    'nutriscore_score',
    'nutriscore_grade',
    "food_groups_en",
    "sodium_100g",
    "sugars_100g",
    "fiber_100g"
]

df_transformed = df_parquet.select(selected_column)
df_transformed.show(5, truncate=False)

In [None]:
# convert the columns to the appropriate format
numeric_cols = [
    'nutriscore_score', 'energy-kcal_100g', 'fat_100g', 'saturated-fat_100g',
    'proteins_100g', 'sugars_100g', 'salt_100g', 'fiber_100g'
]
# apply the conversion
for column in numeric_cols:
    df_transformed = df_transformed.withColumn(column, col(column).cast("double"))


# Transformation des données Transform :
Ajouter des colonnes calculées, par exemple : Indice de qualité nutritionnelle 
Calculer un score basé sur les nutriments (e.g., sodium, sugar, fiber). 
Extraire la catégorie principale d'un produit (e.g., "boissons", "snacks"). 
Regrouper les données par catégories (categories) pour analyser les tendances (e.g., moyenne des calories par catégorie).

--> Quel calcules effectuer ?  
--> Quel catégories créer ?


In [None]:
# display the schema of the transformed DataFrame
df_transformed.printSchema()

In [None]:
# convert code in string
df_transformed = df_transformed.withColumn("code", col("code").cast("string"))

In [None]:
# display the schema of the transformed DataFrame
df_transformed.printSchema()

In [None]:
# add a new column 'main_category' by extracting the first category from 'categories'
from pyspark.sql.functions import split

df_transformed = df_transformed.withColumn("main_category", split(col("categories"), ",").getItem(0))
df_transformed.show(5, truncate=False)


In [None]:
# create a new column with quality nutrition score (IQN)
df_transformed = df_transformed.withColumn(
    "nutrition_score",
    0.3 * col("fiber_100g") +
    0.2 * col("proteins_100g") -
    0.4 * col("sugars_100g") -
    0.3 * col("saturated-fat_100g") -
    0.1 * col("salt_100g")
)
df_transformed.show(5, truncate=False)


# Analyse exploratoire :
Utiliser des fonctions de calcul sur fenêtre pour : 
Trouver les produits les plus caloriques par catégorie. 
Identifier les tendances de production par brands (marques). 
Générer des statistiques descriptives (e.g., médiane, moyenne des nutriments par catégorie

In [None]:

# --- Étape : Suppression des doublons après explosion ---
df_unique_products = df_transformed.dropDuplicates(['code'])

# --- Étape 2 : Catégoriser les produits en fonction du score nutritionnel ---

# Ajouter une catégorie de score nutritionnel
df_unique_products = df_unique_products.withColumn(
    "score_category",
    when(col("nutrition_score") > 0, "Positif")
    .when(col("nutrition_score") == 0, "Neutre")
    .otherwise("Négatif")
)

# Compter les produits par catégorie de score
score_category_counts = df_unique_products.groupBy("score_category").count()

# Afficher le résultat
print("Distribution des catégories de score nutritionnel :")
score_category_counts.show()

# --- Étape 3 : Identifier les produits les plus sains et les moins sains ---

# Inclure des informations supplémentaires pour plus de contexte
top_healthy = df_unique_products.orderBy(col("nutrition_score").desc()).select(
    "product_name", "brands", "main_category", "nutrition_score"
).limit(10)

print("Top 10 des produits les plus sains :")
top_healthy.show(truncate=False)

top_unhealthy = df_unique_products.orderBy(col("nutrition_score").asc()).select(
    "product_name", "brands", "main_category", "nutrition_score"
).limit(10)

print("Top 10 des produits les moins sains :")
top_unhealthy.show(truncate=False)

# --- Étape 4 : Produits les plus caloriques par catégorie principale ---

# Utiliser 'main_category' pour le groupement
window_spec = Window.partitionBy("main_category").orderBy(col("energy-kcal_100g").desc())

# Ajouter une colonne avec le rang des calories
df_unique_products = df_unique_products.withColumn("calorie_rank", row_number().over(window_spec))

# Filtrer pour obtenir les produits les plus caloriques par catégorie principale
most_caloric_products = df_unique_products.filter(col("calorie_rank") == 1).select(
    "main_category", "product_name", "brands", "energy-kcal_100g"
)

print("Produits les plus caloriques par catégorie principale :")
most_caloric_products.show(truncate=False)

# --- Étape 5 : Analyse des tendances des marques ---

# Compter le nombre de produits par marque
brand_trends = df_unique_products.groupBy("brands").agg(
    count("*").alias("product_count")
).orderBy(col("product_count").desc())

print("Tendances de production par marques (Top 10) :")
brand_trends.limit(10).show(truncate=False)

# --- Étape 6 : Statistiques descriptives par catégorie principale ---

# Calculer les statistiques descriptives pour chaque catégorie principale
category_statistics = df_unique_products.groupBy("main_category").agg(
    avg("energy-kcal_100g").alias("avg_calories"),
    avg("fat_100g").alias("avg_fat"),
    avg("proteins_100g").alias("avg_proteins"),
    avg("sugars_100g").alias("avg_sugars"),
    avg("salt_100g").alias("avg_salt")
).orderBy("main_category")

print("Statistiques descriptives par catégorie principale :")
category_statistics.show(truncate=False)

# --- Étape 7 : Analyses supplémentaires ---

# Produits les plus salés
top_salty = df_unique_products.orderBy(col("salt_100g").desc()).select(
    "product_name", "brands", "main_category", "salt_100g"
).limit(10)

print("Top 10 des produits les plus salés :")
top_salty.show(truncate=False)

# Produits les plus sucrés
top_sugary = df_unique_products.orderBy(col("sugars_100g").desc()).select(
    "product_name", "brands", "main_category", "sugars_100g"
).limit(10)

print("Top 10 des produits les plus sucrés :")
top_sugary.show(truncate=False)

# Distribution du score nutritionnel par catégorie principale
category_score_distribution = df_unique_products.groupBy("main_category").agg(
    avg("nutrition_score").alias("avg_nutrition_score"),
    min("nutrition_score").alias("min_nutrition_score"),
    max("nutrition_score").alias("max_nutrition_score")
).orderBy("main_category")

print("Distribution du score nutritionnel par catégorie principale :")
category_score_distribution.show(truncate=False)

# Sauvegarde des données Save :
Partitionner les données par catégories (categories) et années (year). 
Sauvegarder les résultats transformés en format Parquet avec compression Snappy. 
Sauvegarder les résultats transformés dans les bases de données: postgresql/sqlserver/mysql/Snowflake/BigQuery

In [None]:
# Save the transformed data to Docker database
df_transformed.write.jdbc(url=url, table="nom_de_la_table", mode="append", properties=properties)


In [None]:
#saving data with snappy
df_transformed.write \
    .option("compression", "snappy") \
    .parquet(file_save_path)
print("Data saved in Parquet format with Snappy compression.")



# Présentation des résultats :
Visualiser les résultats sous forme de graphiques ou tableaux 
(les étudiants peuvent utiliser un outil comme Jupyter Notebook en local ou Google Colab 

In [None]:
# convert to pandas for visualization
df_unique_products = df_transformed.dropDuplicates(['code'])
pd_transformed = df_unique_products.toPandas()


In [None]:
plt.figure(figsize=(10, 6))
sns.histplot(pd_transformed['nutrition_score'].dropna(), bins=30, kde=True)
plt.title('Distribution du Score Nutritionnel')
plt.xlabel('Score Nutritionnel')
plt.ylabel('Fréquence')
plt.show()


In [None]:
# Sélectionner les top produits les plus caloriques
top_caloric_products = pd_transformed.sort_values('energy-kcal_100g', ascending=False).head(10)
print("Top 10 des Produits les Plus Caloriques :")
print(top_caloric_products[['product_name', 'energy-kcal_100g']])

plt.figure(figsize=(12, 6))
sns.barplot(x='product_name', y='energy-kcal_100g', data=top_caloric_products)
plt.title('Top 10 des Produits les Plus Caloriques')
plt.xlabel('Produit')
plt.ylabel('Énergie (kcal/100g)')
plt.xticks(rotation=90)
plt.show()


In [None]:
top_brands = pd_transformed['brands'].value_counts().head(10).reset_index()
top_brands.columns = ['brands', 'product_count']

plt.figure(figsize=(12, 6))
sns.barplot(x='brands', y='product_count', data=top_brands)
plt.title('Top 10 des Marques par Nombre de Produits')
plt.xlabel('Marque')
plt.ylabel('Nombre de Produits')
plt.xticks(rotation=90)
plt.show()


In [None]:
# Calculer les moyennes
nutrient_means = pd_transformed.groupby('main_category').agg({
    'energy-kcal_100g': 'mean',
    'fat_100g': 'mean',
    'proteins_100g': 'mean',
    'sugars_100g': 'mean',
    'salt_100g': 'mean'
}).reset_index()

# Visualiser la moyenne des calories par catégorie
plt.figure(figsize=(30, 10))
sns.barplot(x='main_category', y='energy-kcal_100g', data=nutrient_means)
plt.title('Moyenne des Calories par Catégorie Principale')
plt.xlabel('Catégorie Principale')
plt.ylabel('Énergie Moyenne (kcal/100g)')
plt.xticks(rotation=90)
plt.show()


In [None]:
fig = px.box(pd_transformed, x='main_category', y='nutrition_score',
             title='Score Nutritionnel par Catégorie Principale')
fig.update_layout(xaxis_title='Catégorie Principale', yaxis_title='Score Nutritionnel')
fig.show()


In [None]:
#stopper la session
spark.stop()