# Explication des cellules d'initialisation
Nous avons testé plusieurs approches : en local, avec Pycharm. Dans le cloud, avec Kaggle & Google Colab.
Nous avons donc mis en place plusieurs configurations pour chaque environnement.
Si local, vérifier que java 11 est bien installé. Si Kaggle, uploader le csv comme dataset afin de l'avoir à disposition.


In [None]:
import time
from pyspark.sql import SparkSession
import shutil
import os

In [None]:
!pip install pyspark
file_path_csv = "/kaggle/input/openfoodfacts/en.openfoodfacts.org.products.csv"
file_path_parquet = "/kaggle/working/en.openfoodfacts.org.products.parquet"

In [54]:
file_path_csv = "./data/en.openfoodfacts.org.products.csv"
file_path_parquet = "./data/en.openfoodfacts.org.products.parquet"
file_path_mysql = "/home/cedric/PycharmProjects/ETL_project/mysql-connector-j-9.1.0/mysql-connector-j-9.1.0.jar"


In [55]:


start_time = time.time()
print("Démarrage du script...")

# Initialiser une SparkSession avec des logs réduits
spark = SparkSession.builder \
    .appName("Exploration OpenFoodFacts") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.executor.memory", "2g")  \
    .config("spark.sql.catalogImplementation", "hive") \
    .config("spark.jars", file_path_mysql) \
    .enableHiveSupport() \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")  # Réduction des logs

print("PySpark chargé")

try:
    # Charger le fichier CSV en tant que DataFrame Spark puis échantillonne 20%
    df_csv_before_sample = spark.read.csv(file_path_csv, header=True, inferSchema=True, sep="\t")
    print("Fichier CSV chargé.")
    df_csv = df_csv_before_sample.sample(withReplacement=False, fraction=0.2)  # Échantillonnage à 20%
    print("Echantillonage terminé")

    # Sauvegarder le DataFrame au format Parquet
    df_csv.write.parquet(file_path_parquet, mode="overwrite")
    print("Données sauvegardées au format Parquet.")

    # Charger le fichier Parquet pour une analyse future
    df_parquet = spark.read.parquet(file_path_parquet)
    print("Fichier Parquet chargé.")

    # Vérifier et supprimer l'ancienne table Hive
    table_name = "hive_table"
    hive_table_path = f"spark-warehouse/{table_name}"
    if table_name in [t.name for t in spark.catalog.listTables("default")]:
        print(f"La table '{table_name}' existe déjà et doit être supprimé.")
        spark.sql(f"DROP TABLE {table_name}")
        print(f"Table Hive '{table_name}' supprimée.")
    if os.path.exists(hive_table_path):
        shutil.rmtree(hive_table_path)
        print(f"Répertoire associé '{hive_table_path}' supprimé.")

    # Création de la table Hive
    print("Création et insertion dans la table Hive...")
    hive_table_start_time = time.time()
    df_csv.write.mode("overwrite").saveAsTable("hive_table")
    
except Exception as e:
    print(f"Erreur rencontrée : {e}")

finally:
    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Temps d'exécution : {elapsed_time:.2f} secondes")


Démarrage du script...
PySpark chargé


                                                                                

Fichier CSV chargé.
Echantillonage terminé


                                                                                

Données sauvegardées au format Parquet.
Fichier Parquet chargé.
La table 'hive_table' existe déjà et doit être supprimé.
Table Hive 'hive_table' supprimée.
Création et insertion dans la table Hive...




Temps d'exécution : 78.75 secondes


                                                                                

In [None]:
# Mesure du temps pour compter les lignes du DataFrame CSV
csv_start_time = time.time()
csv_row_count = df_csv.count()  # Compter les lignes
csv_end_time = time.time()
csv_elapsed_time_ms = (csv_end_time - csv_start_time) * 1000
print(f"CSV (comptage des lignes): {csv_elapsed_time_ms:.3f} ms - Nombre de lignes: {csv_row_count}")

# Mesure du temps pour compter les lignes du DataFrame Parquet
parquet_start_time = time.time()
parquet_row_count = df_parquet.count()  # Compter les lignes
parquet_end_time = time.time()
parquet_elapsed_time_ms = (parquet_end_time - parquet_start_time) * 1000
print(f"Parquet (comptage des lignes): {parquet_elapsed_time_ms:.3f} ms - Nombre de lignes: {parquet_row_count}")

# Mesure du temps pour compter les lignes de la table Hive
hive_start_time = time.time()
df_hive = spark.sql("SELECT * FROM hive_table")  # Charger la table Hive
hive_row_count = df_hive.count()  # Compter les lignes
hive_end_time = time.time()
hive_elapsed_time_ms = (hive_end_time - hive_start_time) * 1000
print(f"Hive (comptage des lignes): {hive_elapsed_time_ms:.3f} ms - Nombre de lignes: {hive_row_count}")

# Comparaison des temps
print("\nComparaison des performances :")
print(f"CSV Execution Time: {csv_elapsed_time_ms:.3f} ms")
print(f"Parquet Execution Time: {parquet_elapsed_time_ms:.3f} ms")
print(f"Hive Execution Time: {hive_elapsed_time_ms:.3f} ms")

# Analyse préliminaire
### 1. Mise en valeur des lignes, colonnes



In [None]:
# Mesure du temps pour compter les lignes du DataFrame CSV
csv_start_time = time.time()
csv_row_count = df_csv.count()
csv_end_time = time.time()
csv_elapsed_time_ms = (csv_end_time - csv_start_time) * 1000
print(f"CSV (comptage des lignes): {csv_elapsed_time_ms:.3f} ms - Nombre de lignes: {csv_row_count}")

# Mesure du temps pour compter les lignes du DataFrame Parquet
parquet_start_time = time.time()
parquet_row_count = df_parquet.count()
parquet_end_time = time.time()
parquet_elapsed_time_ms = (parquet_end_time - parquet_start_time) * 1000
print(f"Parquet (comptage des lignes): {parquet_elapsed_time_ms:.3f} ms - Nombre de lignes: {parquet_row_count}")

# Mesure du temps pour compter les lignes de la table Hive
hive_start_time = time.time()
df_hive = spark.sql("SELECT * FROM hive_table")
hive_row_count = df_hive.count()
hive_end_time = time.time()
hive_elapsed_time_ms = (hive_end_time - hive_start_time) * 1000
print(f"Hive (comptage des lignes): {hive_elapsed_time_ms:.3f} ms - Nombre de lignes: {hive_row_count}")

# Comparaison des temps
print("\nComparaison des performances :")
print(f"CSV Execution Time: {csv_elapsed_time_ms:.3f} ms")
print(f"Parquet Execution Time: {parquet_elapsed_time_ms:.3f} ms")
print(f"Hive Execution Time: {hive_elapsed_time_ms:.3f} ms")

### .2 Gestion des valeurs manquantes


In [None]:
from pyspark.sql.functions import col, count, when

start_time = time.time()

# Calculate missing data percentage for each column
total_rows = df_parquet.count()
missing_data = (
    df_parquet.select([
        (count(when(col(c).isNull() | (col(c) == ""), c)) / total_rows).alias(c)
        for c in df_parquet.columns
    ])
)

# Transform columns into rows (melt operation)
missing_data_melted = missing_data.selectExpr(
    "stack({0}, {1}) as (Column, MissingPercentage)".format(
        len(df_parquet.columns),
        ", ".join([f"'{col}', `{col}`" for col in df_parquet.columns])
    )
).filter(col("MissingPercentage").isNotNull()).orderBy(col("MissingPercentage").desc())

# Identify columns with 100% missing data
columns_to_drop = (
    missing_data_melted.filter(col("MissingPercentage") == 1.0)
    .select("Column")
    .rdd.flatMap(lambda x: x)
    .collect()
)

# Drop columns with 100% missing values
df_cleanedby_missing_value = df_parquet.drop(*columns_to_drop)

# Display the top 10 columns with the highest missing percentages
print("Top 10 columns with the highest missing percentages:")
missing_data_melted.show(10, truncate=False)

# Print dropped columns
print(f"Columns dropped due to 100% missing values: {columns_to_drop}")

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Execution completed in {elapsed_time:.2f} seconds")


### 3. Gestion des valeurs doublons

In [None]:
from pyspark.sql.functions import col, count

start_time = time.time()

# Analyzing Duplicates in 'code', 'product_name', and 'brands'
duplicates = (
    df_cleanedby_missing_value.groupBy("code", "product_name", "brands")
    .count()
    .filter(col("count") > 1)
)

# Affiche le nombre de doublons identifiés
print(f"There are {duplicates.count()} duplicate rows based on 'code', 'product_name', and 'brands'.")
duplicates.show(truncate=False)

# Remove duplicates where 'code', 'product_name', and 'brands' are the same
df_cleaned_by_duplicate = df_cleanedby_missing_value.dropDuplicates(["code", "product_name", "brands"])

print(f"Number of rows before removing duplicates: {df_cleanedby_missing_value.count()}")
print(f"Number of rows after removing duplicates: {df_cleaned_by_duplicate.count()}")

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Execution completed in {elapsed_time:.2f} seconds")


### 4. Handle outliers

In [None]:
from pyspark.sql.functions import col, regexp_extract
from pyspark.sql.types import IntegerType, DoubleType, FloatType

# Extraire les valeurs numériques de la colonne "quantity"
df_cleaned_by_outliers = df_cleaned_by_duplicate.withColumn(
    "quantity_numeric",
    regexp_extract(col("quantity"), r"(\d+)", 1).cast("double")
)

# Détecter les colonnes numériques
numeric_columns = [
    field.name for field in df_cleaned_by_outliers.schema.fields
    if isinstance(field.dataType, (IntegerType, DoubleType, FloatType))
]
print(f"Numeric columns detected: {numeric_columns}")

if not numeric_columns:
    print("No numeric columns found. Please check your data.")
else:
    total_rows_before = df_cleaned_by_outliers.count()
    removed_rows_total = 0

    # Boucle sur les colonnes numériques pour détecter les outliers
    for column in numeric_columns:
        try:
            # Calcul des quartiles avec approxQuantile
            quantiles = df_cleaned_by_outliers.approxQuantile(column, [0.25, 0.75], 0.05)
            if len(quantiles) < 2:
                print(f"Column '{column}' has insufficient data. Skipping...")
                continue

            q1, q3 = quantiles
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr

            # Filtrer les outliers
            df_outliers = df_cleaned_by_outliers.filter((col(column) < lower_bound) | (col(column) > upper_bound))
            removed_rows = df_outliers.count()
            removed_rows_total += removed_rows

            print(f"Column '{column}': {removed_rows} rows detected as outliers.")

        except Exception as e:
            print(f"Error processing column '{column}': {e}")

    print(f"Total rows before filtering: {total_rows_before}")
    print(f"Total rows removed as outliers: {removed_rows_total}")
    print(f"Total rows remaining: {total_rows_before - removed_rows_total}")

# Data cleaning


In [None]:
# display the schema of the cleaned DataFrame
df_parquet = df_outliers
df_parquet.describe()

In [None]:
#select columns to keep
selected_column = [
    'code',
    'product_name',
    'brands',
    'categories',
    "main_category",
    'quantity',
    'packaging',
    'countries',
    'ingredients_text',
    'allergens',
    'serving_size',
    'energy-kcal_100g',
    'fat_100g',
    'saturated-fat_100g',
    "proteins_100g",
    'salt_100g',
    'nutriscore_score',
    'nutriscore_grade',
    "food_groups_en",
    "sodium_100g",
    "sugars_100g",
    "fiber_100g"
]

df_transformed = df_parquet.select(selected_column)
df_transformed.show(5, truncate=False)

In [None]:
# convert the columns to the appropriate format
column_to_convert = ["quantity", "nutriscore_score", "energy-kcal_100g",
                     "fat_100g", "saturated-fat_100g", "proteins_100g", "sugars_100g", "salt_100g"]
# apply the conversion
for column in column_to_convert:
    df_transformed = df_transformed.withColumn(column, col(column).cast("double"))


# Transformation des données Transform :
Ajouter des colonnes calculées, par exemple : Indice de qualité nutritionnelle 
Calculer un score basé sur les nutriments (e.g., sodium, sugar, fiber). 
Extraire la catégorie principale d'un produit (e.g., "boissons", "snacks"). 
Regrouper les données par catégories (categories) pour analyser les tendances (e.g., moyenne des calories par catégorie).

--> Quel calcules effectuer ?  
--> Quel catégories créer ?


In [None]:
# display the schema of the transformed DataFrame
df_transformed.printSchema()

In [None]:
# convert code in string
df_transformed = df_transformed.withColumn("code", col("code").cast("string"))

In [None]:
# display the schema of the transformed DataFrame
df_transformed.printSchema()

In [None]:
# add a new column 'main_category' by extracting the first category from 'categories'
from pyspark.sql.functions import split

df_transformed = df_transformed.withColumn("main_category", split(col("categories"), ",").getItem(0))
df_transformed.show(5, truncate=False)


In [None]:
# create a new column with quality nutrition score (IQN)
df_transformed = df_transformed.withColumn(
    "nutrition_score",
    0.3 * col("fiber_100g") +
    0.2 * col("proteins_100g") -
    0.4 * col("sugars_100g") -
    0.3 * col("saturated-fat_100g") -
    0.1 * col("salt_100g")
)
df_transformed.show(5, truncate=False)


In [None]:
print("Transformation")

# Analyse exploratoire :
Utiliser des fonctions de calcul sur fenêtre pour : 
Trouver les produits les plus caloriques par catégorie. 
Identifier les tendances de production par brands (marques). 
Générer des statistiques descriptives (e.g., médiane, moyenne des nutriments par catégorie

In [None]:
from pyspark.sql.functions import col, when, avg, max, min, count, row_number
from pyspark.sql.window import Window

# --- Étape 1 : Catégoriser les produits en fonction du score nutritionnel ---
# Ajouter une catégorie de score nutritionnel
df_transformed = df_transformed.withColumn(
    "score_category",
    when(col("nutrition_score") > 0, "Positif")
    .when(col("nutrition_score") == 0, "Neutre")
    .otherwise("Négatif")
)

# Compter les produits par catégorie de score
score_category_counts = df_transformed.groupBy("score_category").count()

# Afficher le résultat
print("Distribution des catégories de score nutritionnel :")
score_category_counts.show()

# --- Étape 2 : Identifier les produits les plus sains et les moins sains ---
# Top 10 des produits les plus sains
top_healthy = df_transformed.orderBy(col("nutrition_score").desc()).select("product_name", "nutrition_score").limit(10)
print("Top 10 des produits les plus sains :")
top_healthy.show(truncate=False)

# Top 10 des produits les moins sains
top_unhealthy = df_transformed.orderBy(col("nutrition_score").asc()).select("product_name", "nutrition_score").limit(10)
print("Top 10 des produits les moins sains :")
top_unhealthy.show(truncate=False)

# --- Étape 3 : Produits les plus caloriques par catégorie ---
# Définir une fenêtre pour trouver les produits les plus caloriques
window_spec = Window.partitionBy("categories").orderBy(col("energy-kcal_100g").desc())

# Ajouter une colonne avec le rang des calories
df_transformed = df_transformed.withColumn("calorie_rank", row_number().over(window_spec))

# Filtrer pour obtenir les produits les plus caloriques par catégorie
most_caloric_products = df_transformed.filter(col("calorie_rank") == 1).select(
    "categories", "product_name", "energy-kcal_100g"
)
print("Produits les plus caloriques par catégorie :")
most_caloric_products.show(truncate=False)

# --- Étape 4 : Analyse des tendances des marques ---
# Compter le nombre de produits par marque
brand_trends = df_transformed.groupBy("brands").agg(
    count("*").alias("product_count")
).orderBy(col("product_count").desc())

print("Tendances de production par marques (Top 10) :")
brand_trends.limit(10).show(truncate=False)

# --- Étape 5 : Statistiques descriptives par catégorie ---
# Calculer les statistiques descriptives pour chaque catégorie
category_statistics = df_transformed.groupBy("categories").agg(
    avg("energy-kcal_100g").alias("avg_calories"),
    avg("fat_100g").alias("avg_fat"),
    avg("proteins_100g").alias("avg_proteins"),
    avg("sugars_100g").alias("avg_sugars"),
    avg("salt_100g").alias("avg_salt")
).orderBy("categories")

print("Statistiques descriptives par catégorie :")
category_statistics.show(truncate=False)

# Sauvegarde des données Save :
Partitionner les données par catégories (categories) et années (year). 
Sauvegarder les résultats transformés en format Parquet avec compression Snappy. 
Sauvegarder les résultats transformés dans les bases de données: postgresql/sqlserver/mysql/Snowflake/BigQuery

In [53]:
print("Sauvegarde des données (load)")

mysql_url = "jdbc:mysql://localhost:3306/openfood_db"
mysql_table = "food_facts_entity"
mysql_user = "root"
mysql_password = "root"

df_transformed.write \
    .format("jdbc") \
    .option("url", mysql_url) \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", mysql_table) \
    .option("user", mysql_user) \
    .option("password", mysql_password) \
    .mode("overwrite") \
    .save()


Sauvegarde des données (load)


Py4JJavaError: An error occurred while calling o17608.save.
: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:254)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:258)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:251)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)




# Présentation des résultats :
Visualiser les résultats sous forme de graphiques ou tableaux 
(les étudiants peuvent utiliser un outil comme Jupyter Notebook en local ou Google Colab 

In [None]:
print("Présentation des données")