**Chargement du dataset**

In [None]:
from datasets import load_dataset

dataset = load_dataset(
    "openfoodfacts/product-database",
    data_files="food.parquet"
)

**Affichage des elements uniques de la colonne "countries_tags"**

In [None]:
from itertools import chain

non_null_lists = [tags for tags in dataset['train']['countries_tags'] if tags is not None]

# Aplatir et extraire les valeurs uniques
all_tags = list(chain.from_iterable(non_null_lists))

In [None]:
set(all_tags)

**Selectionner uniquement les produits pour le Maroc et la France**

In [None]:
target_tags = {"en:france", "en:franca", "fr:francia", "en:فرنسا", "en:francese", "en:المغرب", "en:morocco", "en:maroc", "ar:المغرب-🇲🇦"}

def has_target_country(example):
    tags = example.get('countries_tags')
    if not tags:
        return False
    return any(tag in target_tags for tag in tags)

filtered_dataset = dataset.filter(has_target_country)


In [2]:
filtered_dataset.shape

(190000, 110)

**Selectionner les colonnes utiles**

In [21]:
columns_needed = [
    "product_name",
    "ingredients_text",
    "ingredients_tags",
    "ingredients",
    "ingredients_analysis_tags",
    "labels_tags",
    "allergens_tags",
    "categories_tags",
    "nutriscore_grade",
    "quantity",
    "brands",
    "code"
]

filtered_dataset = filtered_dataset[columns_needed]

KeyError: "['ingredients', 'code'] not in index"

In [24]:
filtered_dataset.columns

Index(['product_name', 'ingredients_text', 'ingredients_tags',
       'ingredients_analysis_tags', 'labels_tags', 'allergens_tags',
       'categories_tags', 'nutriscore_grade', 'quantity', 'brands', 'index',
       'ingredients_main_text_only', 'ingredients_tags_fr'],
      dtype='object')

**Creation de la Session Spark**

In [25]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, DoubleType, IntegerType

In [26]:
spark = SparkSession.builder \
        .appName("OpenFoodFacts Recommender preprocessing")\
        .getOrCreate()

**definition d'un schema Spark structure decrivant la structure et les types des colonnes attendues dans un DataFrame**

In [8]:
name_text_schema = ArrayType(
    StructType([
        StructField("lang", StringType(), True),
        StructField("text", StringType(), True)
    ])
)

In [9]:

schema = StructType([
    StructField("product_name", name_text_schema, True),
    StructField("ingredients_text", name_text_schema, True),
    StructField("ingredients_tags", ArrayType(StringType()), True),
    StructField("ingredients", ArrayType(StringType()), True),
    StructField("ingredients_analysis_tags", ArrayType(StringType()), True),
    StructField("labels_tags", ArrayType(StringType()), True),
    StructField("allergens_tags", ArrayType(StringType()), True),
    StructField("categories_tags", ArrayType(StringType()), True),
    StructField("nutriscore_grade", StringType(), True),
    StructField("quantity", StringType(), True),
    StructField("brands", StringType(), True),
    StructField("code", StringType(), True)
])

**Creation de la DataFrame Spark**

In [28]:
df = spark.createDataFrame(filtered_dataset, schema=schema)

PySparkTypeError: [CANNOT_INFER_TYPE_FOR_FIELD] Unable to infer the type of the field `ingredients_text`.

**Exploration des donnees**

In [11]:
df.printSchema()

root
 |-- product_name: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- lang: string (nullable = true)
 |    |    |-- text: string (nullable = true)
 |-- ingredients_text: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- lang: string (nullable = true)
 |    |    |-- text: string (nullable = true)
 |-- ingredients_tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ingredients: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ingredients_analysis_tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- labels_tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- allergens_tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- categories_tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- nutriscore_grade: string (nullable = true)
 |-- quanti

In [14]:
df.show(5)

+--------------------+--------------------+--------------------+--------------------+-------------------------+-----------+--------------------+--------------------+----------------+--------+-------------------+-------------+
|        product_name|    ingredients_text|    ingredients_tags|         ingredients|ingredients_analysis_tags|labels_tags|      allergens_tags|     categories_tags|nutriscore_grade|quantity|             brands|         code|
+--------------------+--------------------+--------------------+--------------------+-------------------------+-----------+--------------------+--------------------+----------------+--------+-------------------+-------------+
|[{main, Country k...|[{main, Tomatoes ...|[en:tomato, en:ve...|[[, {, ", p, e, r...|     [en:palm-oil-free...|         []|                  []|[en:condiments, e...|               d|   454 g|  Stonewall kitchen|0711381023129|
|[{main, Edamame g...|                  []|                NULL|                NULL|           

In [40]:
df.count()

In [None]:
df.columns

['product_name',
 'ingredients_text',
 'ingredients_tags',
 'ingredients',
 'ingredients_analysis_tags',
 'labels_tags',
 'allergens_tags',
 'categories_tags',
 'nutriscore_grade',
 'quantity',
 'brands',
 'code']

In [None]:
df.dtypes

[('product_name', 'array<struct<lang:string,text:string>>'),
 ('ingredients_text', 'array<struct<lang:string,text:string>>'),
 ('ingredients_tags', 'array<string>'),
 ('ingredients', 'array<string>'),
 ('ingredients_analysis_tags', 'array<string>'),
 ('labels_tags', 'array<string>'),
 ('allergens_tags', 'array<string>'),
 ('categories_tags', 'array<string>'),
 ('nutriscore_grade', 'string'),
 ('quantity', 'string'),
 ('brands', 'string'),
 ('code', 'string')]

In [None]:
df.summary().show()

+-------+----------------+--------------------+--------------------+
|summary|nutriscore_grade|            quantity|              brands|
+-------+----------------+--------------------+--------------------+
|  count|           18606|                6656|                8260|
|   mean|            NULL|8.068872987040187...|2.114071127899526...|
| stddev|            NULL|1.483443790488867...| 3.10371488918999E12|
|    min|               a|                    |                    |
|    25%|            NULL|                 5.0|              2025.0|
|    50%|            NULL|                80.0|        6.27308401E8|
|    75%|            NULL|               250.0|   6.111242100183E12|
|    max|         unknown|             ‭700‬ g|                  💐|
+-------+----------------+--------------------+--------------------+



**l'ajout d'une colonne "index" au DataFrame df avec un identifiant unique et croissant pour chaque ligne.**

In [13]:
from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("index", monotonically_increasing_id())

**Calcul de nombre de lignes dupliquees**

In [41]:
df.count() - df.distinct().count()

**Suppression des dupliqants**

In [14]:
df = df.dropDuplicates()

**Calcul de nombre de lignes nulles**

In [None]:
null_rows = df.count() - df.na.drop(how="all").count()
print(null_rows)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

**Suppression des lignes où toutes les colonnes sont nulles**

In [15]:
df = df.na.drop(how='all')

In [None]:
df.sample(withReplacement=False, fraction=0.4).show()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

**Affichage d'une description sur la colonne "quantity"**

In [None]:
df.describe("quantity").show()

+-------+--------------------+
|summary|            quantity|
+-------+--------------------+
|  count|                6482|
|   mean|8.068872987040187...|
| stddev|1.483443790488867...|
|    min|                    |
|    max|             ‭700‬ g|
+-------+--------------------+



**Choix d'un exemple pour mieux comprendre la structure des donnees**

In [20]:
from pyspark.sql.functions import col

exemple = df.filter(col('index') == 176)

In [22]:
exemple.columns

['product_name',
 'ingredients_text',
 'ingredients_tags',
 'ingredients',
 'ingredients_analysis_tags',
 'labels_tags',
 'allergens_tags',
 'categories_tags',
 'nutriscore_grade',
 'quantity',
 'brands',
 'index']

In [None]:
exemple.select("product_name").show(truncate=False)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
exemple.select("ingredients_tags_fr").show(truncate=False)

+-----------------------------------------------------------------------------+
|ingredients_tags_fr                                                          |
+-----------------------------------------------------------------------------+
|lait ecreme de vache additifs alimentaires issue de lait de vache gout fraise|
+-----------------------------------------------------------------------------+



**Suppression des colonnes initules**

In [12]:
df = df.drop("code")

In [16]:
df = df.drop("ingredients")

**creation de la colonne "ingredients_main_text_only" contenant le text brut extrait des elements dont la langue est "main"**

In [17]:
from pyspark.sql.functions import expr

df = df.withColumn(
    "ingredients_main_text",
    expr("filter(ingredients_text, x -> x.lang = 'main')")
).withColumn(
    "ingredients_main_text_only",
    expr("transform(ingredients_main_text, x -> x.text)")
)


In [18]:
df = df.drop("ingredients_main_text")

**Creation d'une nouvelle colonne "ingredients_tags_fr" qui contient uniquement les elements de "ingredients_tags" commençant par 'fr:'**

In [19]:
df = df.withColumn(
    "ingredients_tags_fr",
    expr("filter(ingredients_tags, x -> x like 'fr:%')")
)

**transformation de la colonne "ingredients_tags_fr" en extrayant la partie après les deux-points ":" dans chaque element.**

In [20]:
df = df.withColumn(
    "ingredients_tags_fr",
    expr("transform(ingredients_tags_fr, x -> split(x, ':')[1])")
)

**Garder que les lignes non nulles**

In [21]:
from pyspark.sql.functions import size, col

df = df.filter(col("ingredients_main_text_only").isNotNull())


**Nettoyer les colonnes "ingredients_tags_fr" et "ingredients_main_text_only"**

In [22]:
from pyspark.sql.functions import regexp_replace, lower, col, concat_ws

df = df.withColumn(
    "ingredients_tags_fr",
    lower(
        regexp_replace(
            regexp_replace(
                regexp_replace(
                    regexp_replace(
                        regexp_replace(
                            regexp_replace(
                                concat_ws(" ", col("ingredients_tags_fr")),  # convertit array -> string
                                r"[\[\]]", ""           # supprime [ et ]
                            ),
                            r"[^\w\s\-:]", ""           # supprime caracteres speciaux sauf lettres, chiffres, espaces, -
                        ),
                        r"[0-9%]", ""                  # supprime les chiffres et %
                    ),
                    r":", ""                          # supprime les deux-points
                ),
                r"-", " "                             # remplace les tirets par un espace
            ),
            r"\s+", " "                               # remplace les espaces multiples par un seul espace
        )
    )
)


In [None]:
from pyspark.sql.functions import regexp_replace, lower, col, concat_ws

df = df.withColumn(
    "ingredients_main_text_only",
    lower(
        regexp_replace(
            regexp_replace(
                regexp_replace(
                    regexp_replace(
                        regexp_replace(
                            regexp_replace(
                                concat_ws(" ", col("ingredients_main_text_only")),  # convertit array -> string
                                r"[\[\]]", ""           # supprime [ et ]
                            ),
                            r"[^\w\s\-:]", ""           # supprime caracteres speciaux sauf lettres, chiffres, espaces, -
                        ),
                        r"[0-9%]", ""                  # supprime les chiffres et %
                    ),
                    r":", ""                          # supprime les deux-points
                ),
                r"-", " "                             # remplace les tirets par un espace
            ),
            r"\s+", " "                               # remplace les espaces multiples par un seul espace
        )
    )
)


**extrayant uniquement le texte (text) de l’element dont la langue (lang) est 'main' dans la colonne "product_name"**

In [23]:
df = df.withColumn(
    "product_name",
    expr("""
        filter(product_name, x -> x.lang = 'main')[0].text
    """)
)

In [24]:
df.count()

13971

**Telechargement de la dataset traite**

In [25]:
df.coalesce(1).write.mode("overwrite").parquet("/content/tmp_parquet")

In [26]:
import os
import shutil

for file in os.listdir("/content/tmp_parquet"):
    if file.endswith(".parquet"):
        shutil.move(f"/content/tmp_parquet/{file}", "/content/dataset_traite.parquet")
        break