# <ins>Group project Spark Team D</ins>

ACME corporation is trying to understand
eating habits in different european countries
and, given that they don’t have any team able
to do data analysis, they are asking professor
Raúl Marín for help an he reaches out to his best team available.

## <ins> Introduction</ins>

For the analysis we will focus on the following field of interest

- Creator 
- Created_datetime
- Last_modified_datetime
- Product_name
- Countries_en
- Traces_en
- Additives_tags
- Main_category_en
- Image_url
- Quantity
- Packaging_tags
- Categories_en
- Ingredients_text
- Additives_en
- Energy-kcal_100g
- Fat_100g 
- Saturated-fat_100g
- Sugars_100g
- Salt_100g
- Sodium_100g

Also the data was beforehand cleaned in Dataiku to work appropriately with it.

## <ins> Initialization of the spark session</ins>

In [2]:
# Initialization of spark session and context
import findspark
findspark.init()

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

## <ins>Data source and Spark data abstraction (DataFrame) setup</ins>

In [13]:
# Reading of the data
ACME_DF = spark.read \
                 .option("inferSchema", "true") \
                 .option("header", "true") \
                 .option('multiline','true') \
                 .csv("Spark_data_cleaning_prepared_v3.csv")

# Selecting the columns of interest
ACME_selection_DF = ACME_DF.select("code","creator", "created_datetime", "last_modified_datetime", "product_name", "countries_en", "traces_en", "additives_tags", "main_category_en", "image_url", "quantity", "packaging_tags", "categories_en", "ingredients_text", "additives_en", "energy-kcal_100g", "fat_100g", "saturated-fat_100g","sugars_100g", "salt_100g", "sodium_100g")

## <ins>Data set metadata analysis</ins>
### <ins>Display schema and size of the DataFrame</ins>

In [10]:
from IPython.display import display, Markdown

ACME_selection_DF.printSchema()
display(Markdown("This DataFrame has **%d rows**." % ACME_selection_DF.count()))

root
 |-- code: double (nullable = true)
 |-- creator: string (nullable = true)
 |-- created_datetime: timestamp (nullable = true)
 |-- last_modified_datetime: timestamp (nullable = true)
 |-- product_name: string (nullable = true)
 |-- countries_en: string (nullable = true)
 |-- traces_en: string (nullable = true)
 |-- additives_tags: string (nullable = true)
 |-- main_category_en: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- packaging_tags: string (nullable = true)
 |-- categories_en: string (nullable = true)
 |-- ingredients_text: string (nullable = true)
 |-- additives_en: string (nullable = true)
 |-- energy-kcal_100g: string (nullable = true)
 |-- fat_100g: string (nullable = true)
 |-- saturated-fat_100g: double (nullable = true)
 |-- sugars_100g: string (nullable = true)
 |-- salt_100g: string (nullable = true)
 |-- sodium_100g: string (nullable = true)



This DataFrame has **177629 rows**.

### <ins>Getting one or multiple random samples from the data set</ins>

In [51]:
ACME_selection_DF.cache() # optimization to make the processing faster
ACME_selection_DF.sample(False, 0.1).take(2)

[Row(code=1694.0, creator='kiliweb', created_datetime=datetime.datetime(2020, 1, 3, 21, 42, 43), last_modified_datetime=datetime.datetime(2020, 1, 3, 21, 42, 45), product_name='butifarra blanca', countries_en='spain', traces_en=None, additives_tags=None, main_category_en=None, image_url='https://static.openfoodfacts.org/images/products/00001694/front_es.3.400.jpg', quantity=None, packaging_tags=None, categories_en=None, ingredients_text=None, additives_en=None, energy-kcal_100g='241.0', fat_100g='20.0', saturated-fat_100g=6.8, sugars_100g='0.0', salt_100g='1.75', sodium_100g='0.7'),
 Row(code=4.0, creator='elcoco', created_datetime=datetime.datetime(2019, 6, 10, 16, 20, 26), last_modified_datetime=datetime.datetime(2020, 8, 19, 11, 40, 9), product_name=None, countries_en='spain', traces_en=None, additives_tags=None, main_category_en='dietary supplements', image_url='https://static.openfoodfacts.org/images/products/00004/front_es.3.400.jpg', quantity=None, packaging_tags=None, categorie

### <ins>Data entities, metrics and dimensions</ins>

We've identified the following elements:

* **Entities:** Products (main one which is measured - facts), Contributors (dimension), Countries (dimension), Quantity (dimension) and Packaging type (dimension)
* **Metrics:** created_datetime, last_modified_datetime, quantity, energy-kcal_100g, fat_100g, saturated_fat_100g, sugars_100g, salt_100g and sodium_100g
* **Dimensions:** creator, product_name, countries_en, traces_en, additives_tags, main_categroy_en, image_url, packaging_tags, categories_en, ingredients_en and additives_en, code

### <ins>Column categorization</ins>

The following could be a potential column categorization:

* **Timing related columns:** *created_datetime* and *last_modified_datetime*
* **Product related columns:** *creator*, *product_name*, *countries_en*, *traces_en*, *additives_tags*, *main_category_en*, *image_url*, *packaging_tags* and *categories_en*,
* **Ingredients composition related columns:** *quantity*, *energy-kcal_100g*, *fat_100g*, *saturated_fat_100g*, *sugars_100g*, *salt_100g*, *sodium_100g*, *additives_tags*, *ingredients_text* and *additives_en*

## <ins>Columns groups basic profiling to better understand our data set</ins>
### <ins>Timing related columns basic profiling and questions</ins>

In [64]:
from IPython.display import display, Markdown
from pyspark.sql.functions import when, count, col, countDistinct, desc, first, lit, year, month, dayofmonth, hour, date_format, minute, current_timestamp
from pyspark.mllib.linalg import SparseVector

display(Markdown("**Summary of time related columns**:"))
ACME_selection_DF.select(year(col('created_datetime')).alias('year_created'),year(col('last_modified_datetime')).alias('year_last_modified')).summary().show()

# Different DF used for the analysis
year_c = ACME_selection_DF.select(year(col('created_datetime'))).distinct().count()
month_c = ACME_selection_DF.select(month(col('created_datetime'))).distinct().count()
day_c = ACME_selection_DF.select(dayofmonth(col('created_datetime'))).distinct().count()
hour_c = ACME_selection_DF.select(hour(col('created_datetime'))).distinct().count()

# Table for the representation of the data
display(Markdown("**Checking amount of distinct values in column created_datetime**:"))

display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("year", "month", "day", "hour", \
       "%d  occurrences" % year_c,\
       "%d  occurrences" % month_c,\
       "%d  occurrences" % day_c,\
       "%d  occurrences" % hour_c)))

year_m = ACME_selection_DF.select(year(col('last_modified_datetime'))).distinct().count()
month_m = ACME_selection_DF.select(month(col('last_modified_datetime'))).distinct().count()
day_m = ACME_selection_DF.select(dayofmonth(col('last_modified_datetime'))).distinct().count()
hour_m = ACME_selection_DF.select(hour(col('last_modified_datetime'))).distinct().count()


display(Markdown("**Checking amount of distinct values in column last_modified_datetime**:"))

display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("year", "month", "day", "hour", \
       "%d  occurrences" % year_m,\
       "%d  occurrences" % month_m,\
       "%d  occurrences" % day_m,\
       "%d  occurrences" % hour_m)))

display(Markdown("**Checking null values**:"))

ACME_selection_DF.select(count(when(col('created_datetime').isNull(), 1)).alias('null_values_created')).show()
ACME_selection_DF.select(count(when(col('last_modified_datetime').isNull(), 1)).alias('null_values_modified')).show()

**Summary of time related columns**:

+-------+------------------+-------------------+
|summary|      year_created| year_last_modified|
+-------+------------------+-------------------+
|  count|            177629|             177629|
|   mean| 2019.058492701079| 2019.6823942036492|
| stddev|1.0833854714449735|0.47553679724179587|
|    min|              2012|               2013|
|    25%|              2019|               2019|
|    50%|              2019|               2020|
|    75%|              2020|               2020|
|    max|              2020|               2020|
+-------+------------------+-------------------+



**Checking amount of distinct values in column created_datetime**:


| year | month | day | hour |
|----|----|----|----|
| 9  occurrences | 12  occurrences | 31  occurrences | 24  occurrences |


**Checking amount of distinct values in column last_modified_datetime**:


| year | month | day | hour |
|----|----|----|----|
| 8  occurrences | 12  occurrences | 31  occurrences | 24  occurrences |


**Checking null values**:

+-------------------+
|null_values_created|
+-------------------+
|                  0|
+-------------------+

+--------------------+
|null_values_modified|
+--------------------+
|                   0|
+--------------------+



We are finding the newest and oldest product over all the listed products found

In [62]:
# Oldest product
display(Markdown("**The oldest product in the sortiment**:"))
ACME_selection_DF.select('created_datetime','code','product_name').orderBy(col('created_datetime').asc()).show(1,False)

# Newest product
display(Markdown("**The newest product in the sortiment**:"))
ACME_selection_DF.select('created_datetime','code','product_name').orderBy(col('created_datetime').desc()).show(1,False)


**The oldest product in the sortiment**:

+-------------------+-----------------+----------------------+
|created_datetime   |code             |product_name          |
+-------------------+-----------------+----------------------+
|2012-03-20 16:34:12|8.712566246465E12|moutarde fine de dijon|
+-------------------+-----------------+----------------------+
only showing top 1 row



**The newest product in the sortiment**:

+-------------------+-----------------+----------------------------------+
|created_datetime   |code             |product_name                      |
+-------------------+-----------------+----------------------------------+
|2020-09-13 02:49:51|8.412540019282E12|palometa austral ahumada en aceite|
+-------------------+-----------------+----------------------------------+
only showing top 1 row



Next we are looking at the average product age, where age means how long the product has been in the system.

In [122]:
from pyspark.sql.functions import datediff, months_between, mean, month

ACME_selection_DF = ACME_selection_DF.withColumn("Diff",datediff(current_timestamp(),"created_datetime"))

# The 10 youngest products on average
display(Markdown("**The 10 youngest products on average**:"))
ACME_selection_DF.select('Diff','code','product_name').groupBy('code','product_name').\
agg(mean('Diff').alias('Avg_days')).orderBy(col('Avg_days').asc()).show(10,False)

# The 10 oldest products on average
display(Markdown("**The 10 oldest products on average**:"))
ACME_selection_DF.select('Diff','code','product_name').groupBy('code','product_name').\
agg(mean('Diff').alias('Avg_days')).orderBy(col('Avg_days').desc()).show(10,False)


**The 10 youngest products on average**:

+---------------------+----------------------------------+--------+
|code                 |product_name                      |Avg_days|
+---------------------+----------------------------------+--------+
|8.436575091648E12    |vitamins hair/skins/nails         |15.0    |
|8.412540019282E12    |palometa austral ahumada en aceite|15.0    |
|8.414121190014E12    |garbanzos con alga kombu          |15.0    |
|8.480017225016E12    |patatas bravas                    |15.0    |
|8.437008675084E12    |almogrote                         |15.0    |
|8.424465231045E12    |donuts chocolate                  |15.0    |
|5.9712490452000205E17|lomo de cerdo ibérico             |16.0    |
|5.8702480012000371E17|paleta de cebo iberica            |16.0    |
|4.316734084596E12    |miso light                        |16.0    |
|2.358220001573E12    |panceta salada iberica            |16.0    |
+---------------------+----------------------------------+--------+
only showing top 10 rows



**The 10 oldest products on average**:

+-----------------+---------------------------+--------+
|code             |product_name               |Avg_days|
+-----------------+---------------------------+--------+
|8.712566246465E12|moutarde fine de dijon     |3114.0  |
|3.179732348913E12|perrier                    |3111.0  |
|4.008400811727E12|mon chéri                  |3110.0  |
|3.270190008613E12|couscous grain moyen       |3105.0  |
|5.4492387E7      |coca cola light            |3096.0  |
|5.410041040807E12|pim's framboise            |3092.0  |
|5.6004083E7      |pedras                     |3086.0  |
|5.606293100127E12|rucula                     |3082.0  |
|5.449000169327E12|coca cola zéro sans caféine|3066.0  |
|8.076808060654E12|farfalle                   |3058.0  |
+-----------------+---------------------------+--------+
only showing top 10 rows



### <ins>Product related columns basic profiling and questions</ins>

In [121]:
from IPython.display import display, Markdown
from pyspark.sql.functions import when, count, col, countDistinct, desc, first

display(Markdown("**Summary of columns code, product_name, creator, countries_en, traces_en, additives_tags, packaging_tags, categories_en and main_category_en**:"))
ACME_selection_DF.select('code',"product_name", "creator", "countries_en", "traces_en").summary().show()
ACME_selection_DF.select( "additives_tags",'packaging_tags','categories_en', "main_category_en").summary().show()

display(Markdown("**Checking for nulls on columns product_name, creator, countries_en, traces_en, additives_tags, packaging_tags, categories_en and main_category_en**:"))
ACME_selection_DF.select([count(when(col(c).isNull(), c)).alias(c) for c in ['code',"product_name", "creator", "countries_en", "traces_en", "additives_tags",'packaging_tags','categories_en',"main_category_en"]]).show()

display(Markdown("**Checking for distinct values in columns product_name, creator, countries_en, traces_en, additives_tags, packaging_tags, categories_en and main_category_en**:"))
ACME_selection_DF.select([countDistinct(c).alias(c) for c in ['code',"product_name", "creator", "countries_en", "traces_en", "additives_tags",'packaging_tags','categories_en', "main_category_en"]]).show()

# Different DF used for the analysis
display(Markdown("**Most and least frequent occurrences for product_name, creator, countries_en, traces_en, additives_tags, packaging_tags, categories_en and main_category_en**:"))
creatorDF = ACME_selection_DF.groupBy("creator").agg(count(lit(1)).alias("Total"))
countries_enDF   = ACME_selection_DF.groupBy("countries_en").agg(count(lit(1)).alias("Total"))
traces_enDF    = ACME_selection_DF.groupBy("traces_en").agg(count(lit(1)).alias("Total"))
additives_tagsDF      = ACME_selection_DF.groupBy("additives_tags").agg(count(lit(1)).alias("Total"))
packaging_tagsDF    = ACME_selection_DF.groupBy("packaging_tags").agg(count(lit(1)).alias("Total"))
categories_enDF   = ACME_selection_DF.groupBy("categories_en").agg(count(lit(1)).alias("Total"))

# First table for lowest and higest of different categories
leastFreqcreator    = creatorDF.orderBy(col("Total").asc()).first()
mostFreqcreator     = creatorDF.orderBy(col("Total").desc()).first()
leastFreqcountries_en      = countries_enDF.orderBy(col("Total").asc()).first()
mostFreqcountries_en       = countries_enDF.orderBy(col("Total").desc()).first()
# Second table for lowest and higest of different categories
leastFreqtraces_en       = traces_enDF.orderBy(col("Total").asc()).first()
mostFreqtraces_en        = traces_enDF.orderBy(col("Total").desc()).first()
leastFreqadditives_tags         = additives_tagsDF.orderBy(col("Total").asc()).first()
mostFreqadditives_tags          = additives_tagsDF.orderBy(col("Total").desc()).first()
# Third table for lowest and higest of different categories
leastFreqpackaging_tags     = packaging_tagsDF.orderBy(col("Total").asc()).first()
mostFreqpackaging_tags      = packaging_tagsDF.orderBy(col("Total").desc()).first()
leastFreqcategories_en    = categories_enDF.orderBy(col("Total").asc()).first()
mostFreqcategories_en     = categories_enDF.orderBy(col("Total").desc()).first()

display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("leastFreqcreator", "mostFreqcreator", "leastFreqcountries_en", "mostFreqcountries_en", \
       "%s (%d occurrences)" % (leastFreqcreator["creator"], leastFreqcreator["Total"]), \
       "%s (%d occurrences)" % (mostFreqcreator["creator"], mostFreqcreator["Total"]), \
       "%s (%d occurrences)" % (leastFreqcountries_en["countries_en"], leastFreqcountries_en["Total"]), \
       "%s (%d occurrences)" % (mostFreqcountries_en["countries_en"], mostFreqcountries_en["Total"]))))
display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("leastFreqtraces_en", "mostFreqtraces_en", "leastFreqadditives_tags", "mostFreqadditives_tags", \
       "%s (%d occurrences)" % (leastFreqtraces_en["traces_en"], leastFreqtraces_en["Total"]), \
       "%s (%d occurrences)" % (mostFreqtraces_en["traces_en"], mostFreqtraces_en["Total"]), \
       "%s (%d occurrences)" % (leastFreqadditives_tags["additives_tags"], leastFreqadditives_tags["Total"]), \
       "%s (%d occurrences)" % (mostFreqadditives_tags["additives_tags"], mostFreqadditives_tags["Total"]))))
display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("leastFreqpackaging_tags", "mostFreqpackaging_tags", "leastFreqcategories_en ", "mostFreqcategories_en ", \
       "%s (%d occurrences)" % (leastFreqpackaging_tags["packaging_tags"], leastFreqpackaging_tags["Total"]), \
       "%s (%d occurrences)" % (mostFreqpackaging_tags["packaging_tags"], mostFreqpackaging_tags["Total"]), \
       "%s (%d occurrences)" % (leastFreqcategories_en["categories_en"], leastFreqcategories_en["Total"]), \
       "%s (%d occurrences)" % (mostFreqcategories_en["categories_en"], mostFreqcategories_en["Total"]))))


**Summary of columns code, product_name, creator, countries_en, traces_en, additives_tags, packaging_tags, categories_en and main_category_en**:

+-------+--------------------+------------------+--------+------------+--------------------+
|summary|                code|      product_name| creator|countries_en|           traces_en|
+-------+--------------------+------------------+--------+------------+--------------------+
|  count|              177629|            166529|  177629|      177628|                6184|
|   mean|6.777577793804416E37|               NaN|Infinity|        null|                null|
| stddev|2.588182533079289E40|               NaN|     NaN|        null|                null|
|    min|                 1.0|#ke by kelia pipas|  99fran|No added MSG|balanced-diet-and...|
|    25%|   5.998749107898E12|              95.0|Infinity|        null|                null|
|    50%|   8.414606795482E12|            1900.0|Infinity|        null|                null|
|    75%|   8.436038811431E12| 8.437004036308E12|Infinity|        null|                null|
|    max|1.084370028664641...|        饮用天然水|   zqott|    spain-uk|sulp

**Checking for nulls on columns product_name, creator, countries_en, traces_en, additives_tags, packaging_tags, categories_en and main_category_en**:

+----+------------+-------+------------+---------+--------------+--------------+-------------+----------------+
|code|product_name|creator|countries_en|traces_en|additives_tags|packaging_tags|categories_en|main_category_en|
+----+------------+-------+------------+---------+--------------+--------------+-------------+----------------+
|   0|       11100|      0|           1|   171445|        167842|        165514|       112401|          112400|
+----+------------+-------+------------+---------+--------------+--------------+-------------+----------------+



**Checking for distinct values in columns product_name, creator, countries_en, traces_en, additives_tags, packaging_tags, categories_en and main_category_en**:

+------+------------+-------+------------+---------+--------------+--------------+-------------+----------------+
|  code|product_name|creator|countries_en|traces_en|additives_tags|packaging_tags|categories_en|main_category_en|
+------+------------+-------+------------+---------+--------------+--------------+-------------+----------------+
|177221|       94082|    921|         692|     1042|          4175|          2405|         6625|            3594|
+------+------------+-------+------------+---------+--------------+--------------+-------------+----------------+



**Most and least frequent occurrences for product_name, creator, countries_en, traces_en, additives_tags, packaging_tags, categories_en and main_category_en**:


| leastFreqcreator | mostFreqcreator | leastFreqcountries_en | mostFreqcountries_en |
|----|----|----|----|
| aurelien31 (1 occurrences) | kiliweb (131229 occurrences) | european union,france,germany,spain (1 occurrences) | spain (163042 occurrences) |



| leastFreqtraces_en | mostFreqtraces_en | leastFreqadditives_tags | mostFreqadditives_tags |
|----|----|----|----|
| es:6-proteina-de-leche-1,es:8-hecho-en-mexico-elaborado-por-comercializadora-de-lacteos-y-derivados,es:almidon-modificado,es:azucar,es:grenetina-estabilizantes-cultivos-lacticos-y-natamicina-conservador-grasa-de-leche-1,es:leche-descremada-en-polvo,es:leche-semidescremada-pasteurizada-de-vaca,es:preparado-de-fruta-10-fresa,es:s-a-de-c-v,es:saborizante-natural,es:sorbato-de-potasio-conservador,es:y-curcumina-y-carmin-colorantes,es:acido-citrico-requdad-de-acidez (1 occurrences) | None (171445 occurrences) | e120,e301 (1 occurrences) | None (167842 occurrences) |



| leastFreqpackaging_tags | mostFreqpackaging_tags | leastFreqcategories_en  | mostFreqcategories_en  |
|----|----|----|----|
| plastic-bottle,green-dot (1 occurrences) | None (165514 occurrences) | plant-based foods and beverages,beverages,plant-based beverages,milk substitute,plant milks,non-alcoholic beverages (1 occurrences) | None (112401 occurrences) |


We can see that we have a lot of issues in these kind of categories. A lot of missing values, things pasted together with commas and a wild mixture of different types in columns.

Next point of interest is a list of other countries than spain where products are sold too.

In [223]:
from pyspark.sql.functions import split, posexplode, regexp_replace

# Splitting the colums to get the individual countries
countries = ACME_selection_DF.select(posexplode(split('countries_en',','))).toDF('position','country')

# Dropping of the first column to only have the countries and count them
countries.drop('position').groupBy('country').agg(count('*').alias('n')).where(col('country') != 'spain').orderBy(col('n').desc()).show(20,False)


+----------------------+-----+
|country               |n    |
+----------------------+-----+
|france                |12147|
|germany               |1107 |
|belgium               |886  |
|switzerland           |694  |
|usa                   |646  |
|portugal              |488  |
|italy                 |474  |
|uk                    |358  |
|netherlands           |272  |
|mexico                |228  |
|argentina             |206  |
|morocco               |136  |
|canada                |121  |
|austria               |119  |
|norway                |106  |
|venezuela             |84   |
|russia-русский        |82   |
|croatia-hrvatski      |82   |
|macau-中文            |82   |
|finetherlandsand-suomi|82   |
+----------------------+-----+
only showing top 20 rows



Identify category of products and then compute:
 - Number of products by category
 - List containing names of products by category

In [30]:
from pyspark.sql.functions import split, posexplode, regexp_replace

# Variety of categories and sorted by most frequent category
ACME_selection_DF.select('main_category_en','categories_en').dropna()\
.groupBy('main_category_en').agg(count('categories_en').alias('counts'))\
.orderBy(desc('counts')).drop('counts').show(20, False)

+-----------------------+
|main_category_en       |
+-----------------------+
|extra-virgin olive oils|
|cheeses                |
|biscuits               |
|prepared meats         |
|salty snacks           |
|dietary supplements    |
|breakfast cereals      |
|dark chocolates        |
|dairies                |
|fruit-jams             |
|breads                 |
|yogurts                |
|sauces                 |
|cow milks              |
|canned fishes          |
|smoked salmons         |
|biscuits and cakes     |
|beers                  |
|honeys                 |
|candies                |
+-----------------------+
only showing top 20 rows



In [35]:
from pyspark.sql.functions import split, posexplode, regexp_replace

# Number of products and sorted by most frequent count of products in each category
ACME_selection_DF.select('main_category_en','product_name').dropna()\
.groupBy('main_category_en').agg(count('product_name').alias('product_name_count'))\
.orderBy(desc('product_name_count')).show(20, False)

+-----------------------+------------------+
|main_category_en       |product_name_count|
+-----------------------+------------------+
|extra-virgin olive oils|999               |
|cheeses                |960               |
|biscuits               |880               |
|prepared meats         |737               |
|salty snacks           |551               |
|dark chocolates        |534               |
|breakfast cereals      |515               |
|dietary supplements    |511               |
|dairies                |497               |
|fruit-jams             |429               |
|breads                 |389               |
|yogurts                |376               |
|sauces                 |343               |
|cow milks              |332               |
|smoked salmons         |329               |
|canned fishes          |323               |
|sheep's-milk cheeses   |319               |
|honeys                 |315               |
|biscuits and cakes     |312               |
|candies  

In [44]:
from pyspark.sql.functions import split, posexplode, regexp_replace

# List of product names by category
ACME_selection_DF.select('main_category_en','product_name').dropna()\
.groupBy('main_category_en','product_name').agg(count('product_name').alias('product_name_count'))\
.orderBy(desc('product_name_count')).show(20, False)

+--------------------------------------+-----------------------------------+------------------+
|main_category_en                      |product_name                       |product_name_count|
+--------------------------------------+-----------------------------------+------------------+
|extra-virgin olive oils               |aceite de oliva virgen extra       |502               |
|smoked salmons                        |salmón ahumado                     |143               |
|es:mejillones-en-escabeche            |mejillones en escabeche            |142               |
|fried tomato sauces                   |tomate frito                       |138               |
|virgin olive oils                     |aceite de oliva virgen extra       |138               |
|es:bonito-del-norte-en-aceite-de-oliva|bonito del norte en aceite de oliva|112               |
|ketchup                               |ketchup                            |93                |
|natural mineral waters                |

Identify traces and compute:
- Number of products by trace
- List containing names of products by trace

In [39]:
from pyspark.sql.functions import split, posexplode, regexp_replace, count, desc

# Variety of traces and sorted by most frequent category
ACME_selection_DF.select('traces_en').dropna()\
.groupBy('traces_en').agg(count('traces_en').alias('traces_en_count'))\
.orderBy(desc('traces_en_count')).drop('traces_en_count').show(20, False)

+-----------------------------+
|traces_en                    |
+-----------------------------+
|nuts                         |
|milk                         |
|milk,nuts                    |
|soybeans                     |
|nuts,peanuts                 |
|eggs                         |
|gluten                       |
|milk,soybeans                |
|gluten,nuts                  |
|nuts,soybeans                |
|milk,nuts,soybeans           |
|nuts,sesame seeds            |
|sesame seeds                 |
|sesame seeds,soybeans        |
|sulphur dioxide and sulphites|
|gluten,milk,nuts             |
|eggs,nuts                    |
|celery                       |
|milk,nuts,peanuts            |
|eggs,nuts,peanuts            |
+-----------------------------+
only showing top 20 rows



In [45]:
from pyspark.sql.functions import split, posexplode, regexp_replace, count, desc

# List of product names by category
ACME_selection_DF.select('traces_en','product_name').dropna()\
.groupBy('traces_en').agg(count('product_name').alias('product_name_count'))\
.orderBy(desc('product_name_count')).show(20, False)

+-----------------------------+------------------+
|traces_en                    |product_name_count|
+-----------------------------+------------------+
|nuts                         |566               |
|milk                         |362               |
|milk,nuts                    |266               |
|soybeans                     |256               |
|nuts,peanuts                 |228               |
|eggs                         |163               |
|gluten                       |161               |
|milk,soybeans                |129               |
|gluten,nuts                  |127               |
|nuts,soybeans                |121               |
|milk,nuts,soybeans           |118               |
|nuts,sesame seeds            |117               |
|sesame seeds                 |101               |
|sesame seeds,soybeans        |84                |
|sulphur dioxide and sulphites|78                |
|gluten,milk,nuts             |71                |
|eggs,nuts                    |

In [46]:
from pyspark.sql.functions import split, posexplode, regexp_replace, count, desc

# List of product names by category
ACME_selection_DF.select('traces_en','product_name').dropna()\
.groupBy('traces_en','product_name').agg(count('product_name').alias('product_name_count'))\
.orderBy(desc('product_name_count')).show(20, False)

+----------------------------------------------------------+------------------------------------+------------------+
|traces_en                                                 |product_name                        |product_name_count|
+----------------------------------------------------------+------------------------------------+------------------+
|gluten                                                    |lenteja pardina                     |7                 |
|nuts                                                      |chocolate con leche                 |7                 |
|milk,soybeans                                             |digestive avena                     |7                 |
|milk,nuts                                                 |chocolate negro                     |6                 |
|nuts,soybeans                                             |bebida de avena                     |5                 |
|milk,nuts                                                 |tabl

### <ins>Ingredients composition related columns basic profiling and questions</ins>

In [48]:
from IPython.display import display, Markdown
from pyspark.sql.functions import when, count, col, countDistinct, desc, first, lit

display(Markdown("**Summary of columns quantity, energy-kcal_100g, fat_100g, saturated_fat_100g, sugars_100g, salt_100g, sodium_100g, additives_en and ingredients_text**:"))
ACME_selection_DF.select("quantity", "energy-kcal_100g", "fat_100g", "saturated-fat_100g").summary().show()
ACME_selection_DF.select( "sugars_100g",'salt_100g','sodium_100g','additives_en', "ingredients_text").summary().show()

display(Markdown("**Checking for nulls on columns quantity, energy_kcal_100g, fat_100g, saturated-fat_100g, sugars_100g, salt_100g, sodium_100g,additives_en and ingredients_text**:"))
ACME_selection_DF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["quantity", "energy-kcal_100g", "fat_100g", "saturated-fat_100g", "sugars_100g",'salt_100g','sodium_100g','additives_en',"ingredients_text"]]).show()

display(Markdown("**Checking for distinct values in columns quantity, energy_kcal_100g, fat_100g, saturated_fat_100g, sugars_100g, salt_100g, sodium_100g, additives_en and ingredients_text**:"))
ACME_selection_DF.select([countDistinct(c).alias(c) for c in ["quantity", "energy-kcal_100g", "fat_100g", "saturated-fat_100g", "sugars_100g",'salt_100g','sodium_100g','additives_en', "ingredients_text"]]).show()

display(Markdown("**Most and least frequent occurrences for quantity, energy_kcal_100g, fat_100g, saturated-fat_100g, sugars_100g, salt_100g, sodium_100g, additives_en and ingredients_text**:"))
energy_kcal_100gDF = ACME_selection_DF.groupBy("energy-kcal_100g").agg(count(lit(1)).alias("Total"))
fat_100gDF   = ACME_selection_DF.groupBy("fat_100g").agg(count(lit(1)).alias("Total"))
saturated_fat_100gDF    = ACME_selection_DF.groupBy("saturated-fat_100g").agg(count(lit(1)).alias("Total"))
sugars_100gDF      = ACME_selection_DF.groupBy("sugars_100g").agg(count(lit(1)).alias("Total"))
salt_100gDF    = ACME_selection_DF.groupBy("salt_100g").agg(count(lit(1)).alias("Total"))
sodium_100gDF   = ACME_selection_DF.groupBy("sodium_100g").agg(count(lit(1)).alias("Total"))
additives_enDF   = ACME_selection_DF.groupBy("additives_en").agg(count(lit(1)).alias("Total"))
ingredients_textDF  = ACME_selection_DF.groupBy("ingredients_text").agg(count(lit(1)).alias("Total"))

# First table for lowest and higest of different categories
leastFreqenergy_kcal_100g    = energy_kcal_100gDF.orderBy(col("Total").asc()).first()
mostFreqenergy_kcal_100g     = energy_kcal_100gDF.orderBy(col("Total").desc()).first()
leastFreqfat_100g      = fat_100gDF.orderBy(col("Total").asc()).first()
mostFreqfat_100g       = fat_100gDF.orderBy(col("Total").desc()).first()
# Second table for lowest and higest of different categories
leastFreqsaturated_fat_100g       = saturated_fat_100gDF.orderBy(col("Total").asc()).first()
mostFreqsaturated_fat_100g        = saturated_fat_100gDF.orderBy(col("Total").desc()).first()
leastFreqsugars_100g         = sugars_100gDF.orderBy(col("Total").asc()).first()
mostFreqsugars_100g          = sugars_100gDF.orderBy(col("Total").desc()).first()
# Third table for lowest and higest of different categories
leastFreqsalt_100g     = salt_100gDF.orderBy(col("Total").asc()).first()
mostFreqsalt_100g      = salt_100gDF.orderBy(col("Total").desc()).first()
leastFreqsodium_100g    = sodium_100gDF.orderBy(col("Total").asc()).first()
mostFreqsodium_100g     = sodium_100gDF.orderBy(col("Total").desc()).first()
# Fourth table for lowest and higest of different categories
leastFreqadditives_en    = additives_enDF.orderBy(col("Total").asc()).first()
mostFreqadditives_en     = additives_enDF.orderBy(col("Total").desc()).first()
leastFreqingredients_text    = ingredients_textDF.orderBy(col("Total").asc()).first()
mostFreqingredients_text     = ingredients_textDF.orderBy(col("Total").desc()).first()

display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("leastFreqenergy_kcal_100g", "mostFreqenergy_kcal_100g", "leastFreqfat_100g", "mostFreqfat_100g", \
       "%s (%d occurrences)" % (leastFreqenergy_kcal_100g["energy-kcal_100g"], leastFreqenergy_kcal_100g["Total"]), \
       "%s (%d occurrences)" % (mostFreqenergy_kcal_100g["energy-kcal_100g"], mostFreqenergy_kcal_100g["Total"]), \
       "%s (%d occurrences)" % (leastFreqfat_100g["fat_100g"], leastFreqfat_100g["Total"]), \
       "%s (%d occurrences)" % (mostFreqfat_100g["fat_100g"], mostFreqfat_100g["Total"]))))
display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("leastFreqsaturated_fat_100g", "mostFreqsaturated_fat_100g", "leastFreqsugars_100g", "mostFreqsugars_100g", \
       "%s (%d occurrences)" % (leastFreqsaturated_fat_100g["saturated-fat_100g"], leastFreqsaturated_fat_100g["Total"]), \
       "%s (%d occurrences)" % (mostFreqsaturated_fat_100g["saturated-fat_100g"], mostFreqsaturated_fat_100g["Total"]), \
       "%s (%d occurrences)" % (leastFreqsugars_100g["sugars_100g"], leastFreqsugars_100g["Total"]), \
       "%s (%d occurrences)" % (mostFreqsugars_100g["sugars_100g"], mostFreqsugars_100g["Total"]))))
display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("leastFreqadditives_en", "mostFreqadditives_en", "leastFreqingredients_text ", "mostFreqingredients_text ", \
       "%s (%d occurrences)" % (leastFreqadditives_en["additives_en"], leastFreqadditives_en["Total"]), \
       "%s (%d occurrences)" % (mostFreqadditives_en["additives_en"], mostFreqadditives_en["Total"]), \
       "%s (%d occurrences)" % (leastFreqingredients_text["ingredients_text"], leastFreqingredients_text["Total"]), \
       "%s (%d occurrences)" % (mostFreqingredients_text["ingredients_text"], mostFreqingredients_text["Total"]))))


**Summary of columns quantity, energy-kcal_100g, fat_100g, saturated_fat_100g, sugars_100g, salt_100g, sodium_100g, additives_en and ingredients_text**:

+-------+------------------+------------------+--------------------+------------------+
|summary|          quantity|  energy-kcal_100g|            fat_100g|saturated-fat_100g|
+-------+------------------+------------------+--------------------+------------------+
|  count|             37270|            135435|              136542|            136543|
|   mean|156.85462807881774|292.02799817283926|  15.593317012586347| 5.321165252786318|
| stddev|218.02797269892247|221.78471317168402|  20.019686905822905|   7.9463911596595|
|    min| &lt;318g 6 huevos|               0.0|                 0.0|               0.0|
|    25%|               3.0|             106.0|                 1.3|               0.3|
|    50%|              50.0|             273.0|                 8.1|               2.0|
|    75%|             250.0|             427.0|                24.0|               7.0|
|    max|             ¨8 ml|         en:gluten|en:product-name-c...|             140.0|
+-------+------------------+----

**Checking for nulls on columns quantity, energy_kcal_100g, fat_100g, saturated-fat_100g, sugars_100g, salt_100g, sodium_100g,additives_en and ingredients_text**:

+--------+----------------+--------+------------------+-----------+---------+-----------+------------+----------------+
|quantity|energy-kcal_100g|fat_100g|saturated-fat_100g|sugars_100g|salt_100g|sodium_100g|additives_en|ingredients_text|
+--------+----------------+--------+------------------+-----------+---------+-----------+------------+----------------+
|  140359|           42194|   41087|             41086|      41525|    39855|      39855|      167841|          158193|
+--------+----------------+--------+------------------+-----------+---------+-----------+------------+----------------+



**Checking for distinct values in columns quantity, energy_kcal_100g, fat_100g, saturated_fat_100g, sugars_100g, salt_100g, sodium_100g, additives_en and ingredients_text**:

+--------+----------------+--------+------------------+-----------+---------+-----------+------------+----------------+
|quantity|energy-kcal_100g|fat_100g|saturated-fat_100g|sugars_100g|salt_100g|sodium_100g|additives_en|ingredients_text|
+--------+----------------+--------+------------------+-----------+---------+-----------+------------+----------------+
|    3825|            3751|    3876|              2963|       3517|     2636|       2644|        4176|           17889|
+--------+----------------+--------+------------------+-----------+---------+-----------+------------+----------------+



**Most and least frequent occurrences for quantity, energy_kcal_100g, fat_100g, saturated-fat_100g, sugars_100g, salt_100g, sodium_100g, additives_en and ingredients_text**:


| leastFreqenergy_kcal_100g | mostFreqenergy_kcal_100g | leastFreqfat_100g | mostFreqfat_100g |
|----|----|----|----|
| 1496.0 (1 occurrences) | None (42194 occurrences) | 25.799999237061 (1 occurrences) | None (41087 occurrences) |



| leastFreqsaturated_fat_100g | mostFreqsaturated_fat_100g | leastFreqsugars_100g | mostFreqsugars_100g |
|----|----|----|----|
| 9.6800003051758 (1 occurrences) | None (41086 occurrences) | 20.39999961853 (1 occurrences) | None (41525 occurrences) |



| leastFreqadditives_en | mostFreqadditives_en | leastFreqingredients_text  | mostFreqingredients_text  |
|----|----|----|----|
| e150c - ammonia caramel,e322 - lecithins,e322i - lecithin (1 occurrences) | None (167841 occurrences) | carne de pollo 48 %, carne de pavo 34 %, agua, sal, especias, conservador (sulfito de sodio ) y colorante (carmín y extracto de pimentón)  embutido con tripa estrecha de cordero. (1 occurrences) | None (158193 occurrences) |


The majority of food added to the platform are missing nutritional facts.

Data quality analysis on fields of interest (see appendix 1):
- Number of products with complete info.
- % of products without complete analysis per 100g
- % of products without additives info
- % of products without traces info

In [89]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import split, posexplode, regexp_replace, count, desc, lit, col

total = ACME_selection_DF.select("*").dropna().count()
display(Markdown("The total number of complete columns is %d" % total))

The total number of complete columns is 1234

In [113]:
# Creation of neccessary DF for analysis
products_100gDF = \
  ACME_selection_DF.withColumn("energy-kcal_100g", col("energy-kcal_100g").cast(IntegerType()))\
                   .withColumn("fat_100g", col("fat_100g").cast(IntegerType()))\
                   .withColumn("saturated-fat_100g", col("saturated-fat_100g").cast(IntegerType()))\
                   .withColumn("sugars_100g", col("sugars_100g").cast(IntegerType()))\
                   .withColumn("salt_100g", col("salt_100g").cast(IntegerType()))\
                   .withColumn("sodium_100g", col("sodium_100g").cast(IntegerType()))\
                   .select("energy-kcal_100g","fat_100g","saturated-fat_100g",\
                               "sugars_100g",'salt_100g','sodium_100g')

products_100gDF.cache() # to make it run faster

# % of products without complete analysis per 100g
wth_comp_ana = (1 - products_100gDF.dropna().count()/products_100gDF.count())*100
display(Markdown("**%2.2f %% of products are without complete analysis per 100g.**" % wth_comp_ana))

**26.48 % of products are without complete analysis per 100g.**

In [119]:
# % of products without additives info

add_non_miss = (1 - ACME_selection_DF.select('additives_en').dropna().count()/ACME_selection_DF.count())*100
display(Markdown("**%2.2f %% of products are without additives info.**" % add_non_miss))

**94.49 % of products are without additives info.**

In [120]:
# % of products without traces info

trace_miss = (1 - ACME_selection_DF.select('traces_en').dropna().count()/ACME_selection_DF.count())*100
display(Markdown("**%2.2f %% of products are without traces info.**" % trace_miss))

**96.52 % of products are without traces info.**

### <ins>Dangerous products categorization</ins>

In [123]:
from pyspark.sql.functions import count, round
from pyspark.sql.functions import when, count, col, countDistinct, desc, first, split

totalProducts = ACME_selection_DF.count()

# Labeling and categorization of the different additives

safe = ["e100", "e101","e140","e160a","e160b","e160c","e160d","e160e","e160f","e161a","e161b","e161c","e161c",\
        "e161d","e161e","e161f","e162","e163","e170","e174","e175","e236","e237","e238","e263","e270","e280",\
        "e281","e282","e283","e300","e301","e302","e304","e306","e307","e308","e309","e310","e322","e325","e326",\
        "e327","e331","e332""e333","e335","e336","e337","e400","e401","e402","e403","e404","e405","e406","e408",
        "e410","e411","e412","e414","e415","e420","e421","e422","e471","e472","e473","e474","e475","e480","107","154","234"]

forbidden = ["e103","e105","e111","e121","e125","e126","e130","e152","e181"]

suspicious = ["e122","e141","e150","e153","e171","e172","e173","e180","e240","e241","e413","e477","128","133"]

toxic = ["621","622","623"]

dangerous = ["e102","e104","e110","e120","e124","e127","e151","e200","e201","e202","e203","e210","e211","e212",\
             "e213","e214","e215","e216","e217","e218","e219","e220","e221","e222","e223","e224","e226","e227",\
             "e230","e231","e232","e333","e249","e250","e251","e252","e260","e261","e262","e290","e320","e311",\
             "e312","e321","e330","e334","e338","e339","e340","e341","e407","e450a","e450b","e450c","e461","e462",\
             "e463","e465","466"]

very_dangerous = ["e123"]

cancer = ["e131","e132","e142","e239"]

emul_stabilizers = ["430","431","432","433","434","435","436","476","478","491","492","493","494","495","e460",\
                    "e464","e470","e481","e482","e483","e440a","e440b"]
miscellaneous = ["262","296","297","350","351","352","353","355","363","370","375","380","381","385","500","501","503",\
                 "504","507","508","509","510","513","514","515","516","518","524","525","526","527","528","529","530","535",\
                 "536","540","541","542","544","545","551","552","554","556","558","559","570","572","575","576","577","578",\
                 "620","627","631","635","636","637","900","901","903","904","905","907","920","924","925","926","927"]

In [None]:

CategorizationDF = foodDF\
   .where(col("additives_tags")!="NA")\
   .withColumn("FoodSafety", when(col("additives_tags") in safe,"Safe")\
                               .when(col("additives_tags") in forbidden,"Forbidden")\
                               .when(col("additives_tags") in toxic ,"Toxic")\
                               .when(col("additives_tags") in dangerous ,"Dangerous")\
                               .when(col("additives_tags") in very_dangerous ,"Very Dangerous")\
                               .when(col("additives_tags") in cancer ,"Cancer")\
                               .when(col("additives_tags") in emul_stabilizers ,"Stabilizers")\
                               .when(col("additives_tags") in miscellaneous ,"Miscellaneous")\
                               .otherwise("Not Defined"))
CategorizationDF.cache() # optimization to make the processing faster
# 2. Ready to answer to this business question
CategorizationDF.where(col("additives_tags")!="NA")\
                     .select("FoodSafety", "additives_tags")\
                     .groupBy("FoodSafety")\
                     .agg(count("FoodSafety").alias("NumFoodCategorized"), \
                          (count("FoodSafety")/totalProducts*100).alias("Ratio"))\
                     .orderBy("FoodSafety")\
                     .select("FoodSafety","NumFoodCategorized",round("Ratio",2).alias("RoundedRatio")).show()

In [130]:
dangerous_prodDF = ACME_selection_DF.where(col("additives_tags")!="NA")\
                   .select(posexplode(split('additives_tags',',')),'product_name').toDF('pos','e_numbers','products')

dangerous_prodDF.withColumn('Dangerous', when(col('e_numbers') in dangerous,'dangerous')\
                            .otherwise('Fine')).show()

ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.