## Setting up PySpark

In [1]:
# PySpark is not on the system path by default and sometimes Python cannot find it
# so we use this library (just in case) to initialize PySpark
import findspark
findspark.init()

In [2]:
# import the libraries
# pyspark.sql -> is a component that facilitates processing of structured (ex: CSV) and semi-structured data (ex: JSOn) & it allows us to use SQL syntax
# SparkSession -> is the entry point to create spark dataframe or RDD (resilient distributed datasets)
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc, avg, count

In [3]:
# build the application
spark = SparkSession.builder.appName("openfoodAnalysis").getOrCreate()

In [4]:
## load the data

# delimiter = how to separate
# header = to get the column name
# inferSchema = to get the correct data type for each column 
data = spark.read.options(delimiter='\t', header= True, inferSchema=True).csv("D:\Downloads\openfood.csv")
data.printSchema()

root
 |-- code: double (nullable = true)
 |-- url: string (nullable = true)
 |-- creator: string (nullable = true)
 |-- created_t: integer (nullable = true)
 |-- created_datetime: timestamp (nullable = true)
 |-- last_modified_t: integer (nullable = true)
 |-- last_modified_datetime: timestamp (nullable = true)
 |-- product_name: string (nullable = true)
 |-- abbreviated_product_name: string (nullable = true)
 |-- generic_name: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- packaging: string (nullable = true)
 |-- packaging_tags: string (nullable = true)
 |-- packaging_en: string (nullable = true)
 |-- packaging_text: string (nullable = true)
 |-- brands: string (nullable = true)
 |-- brands_tags: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- categories_tags: string (nullable = true)
 |-- categories_en: string (nullable = true)
 |-- origins: string (nullable = true)
 |-- origins_tags: string (nullable = true)
 |-- origins_en: string (nul

## Exploring the data

In [5]:
# check the object type
type(data)

pyspark.sql.dataframe.DataFrame

In [6]:
# check the size of the data
print(data.count()) # rows -> 2.445.422
print(len(data.columns)) # columns -> 186

2445422
186


In [7]:
# shows the first row in a list format
data.head(1)

[Row(code=225.0, url='http://world-en.openfoodfacts.org/product/00000000000000225/jeunes-pousses-endives', creator='nutrinet-sante', created_t=1623855208, created_datetime=datetime.datetime(2021, 6, 16, 15, 53, 28), last_modified_t=1623855209, last_modified_datetime=datetime.datetime(2021, 6, 16, 15, 53, 29), product_name='jeunes pousses', abbreviated_product_name=None, generic_name=None, quantity=None, packaging=None, packaging_tags=None, packaging_en=None, packaging_text=None, brands='endives', brands_tags='endives', categories=None, categories_tags=None, categories_en=None, origins=None, origins_tags=None, origins_en=None, manufacturing_places=None, manufacturing_places_tags=None, labels=None, labels_tags=None, labels_en=None, emb_codes=None, emb_codes_tags=None, first_packaging_code_geo=None, cities=None, cities_tags=None, purchase_places=None, stores=None, countries='en:france', countries_tags='en:france', countries_en='France', ingredients_text=None, ingredients_tags=None, allerg

In [8]:
# shows the first row in a dataframe format
# we cannot see well as we have a lot of columns
data.show(1)

+-----+--------------------+--------------+----------+-------------------+---------------+----------------------+--------------+------------------------+------------+--------+---------+--------------+------------+--------------+-------+-----------+----------+---------------+-------------+-------+------------+----------+--------------------+-------------------------+------+-----------+---------+---------+--------------+------------------------+------+-----------+---------------+------+---------+--------------+------------+----------------+----------------+---------+------------+------+-----------+---------+------------+----------------+-------------+-----------+---------+--------------+------------+----------------+----------------+----------+-------------+-------------+-----------+----------------+--------------+--------------------+--------------------+--------------------+-----------+--------------+--------------+-------------+----------------+---------+---------------+--------------

## working with the subset

In [9]:
# let's work with the sample to not break my computer (20%)
samples = data.sample(0.2)

In [10]:
# count how many rows
samples.count() # 489.937

489937

In [6]:
# to save it into csv, can break if it's too big
# samples.toPandas().to_csv("sample_openfood.csv", header=True)

In [11]:
# selecting only the important columns for our analysis
important = ['origins_en','product_name','brands', 'categories_en', 'countries_en', 'food_groups_en', "energy_100g",'proteins_100g', 'sugars_100g','nutriscore_score', 'nutriscore_grade']

In [12]:
# dropping the rows where it has any nulls in the column we chose
dropped = samples.na.drop(how = "any", subset = important)

In [13]:
# put only the columns we need in a new variable
# see how much rows we have
data2 = dropped.select(important)
data2.count() # 11.852

11852

In [14]:
# see the first row
# much more readable
data2.head(1)

[Row(origins_en='fr:quebec', product_name='Salade Cesar', brands='Kirkland Signature', categories_en='Plant-based foods and beverages,Plant-based foods,Fruits and vegetables based foods,Vegetables based foods,Leaf vegetables', countries_en='Canada', food_groups_en='Fruits and vegetables,Vegetables', energy_100g=1210.0, proteins_100g=22.0, sugars_100g=0.0, nutriscore_score=6, nutriscore_grade='c')]

In [16]:
# to rename a column 
data2 = data2.withColumnRenamed("categories_en", "categories").withColumnRenamed("food_groups_en", "food_groups").withColumnRenamed("origins_en", "origins").withColumnRenamed("countries_en", "countries")
print(data2.columns)

['origins', 'product_name', 'brands', 'categories', 'countries', 'food_groups', 'energy_100g', 'proteins_100g', 'sugars_100g', 'nutriscore_score', 'nutriscore_grade']


In [17]:
# see the first 2 rows
data2.show(2)

+---------+--------------------+------------------+--------------------+---------+--------------------+-----------+-------------+-----------+----------------+----------------+
|  origins|        product_name|            brands|          categories|countries|         food_groups|energy_100g|proteins_100g|sugars_100g|nutriscore_score|nutriscore_grade|
+---------+--------------------+------------------+--------------------+---------+--------------------+-----------+-------------+-----------+----------------+----------------+
|fr:quebec|        Salade Cesar|Kirkland Signature|Plant-based foods...|   Canada|Fruits and vegeta...|     1210.0|         22.0|        0.0|               6|               c|
|fr:quebec|Chaussons tressés...|Kirkland Signature|Snacks,Sweet snac...|   Canada|Sugary snacks,Pas...|     1090.0|         3.33|       24.7|               9|               c|
+---------+--------------------+------------------+--------------------+---------+--------------------+-----------+-----

In [18]:
## how to get a specific object inside the column?
data2['food_groups'][0] # returns a column object

# 2 ways
print(data2.collect()[0][5])
data2.head(1)[0]['food_groups']


Fruits and vegetables,Vegetables


'Fruits and vegetables,Vegetables'

In [20]:
# SQL query 
data2.createOrReplaceTempView('food_table')
query = "SELECT food_groups FROM food_table;"
spark.sql(query).show(1, truncate=False)

+--------------------------------+
|food_groups                     |
+--------------------------------+
|Fruits and vegetables,Vegetables|
+--------------------------------+
only showing top 1 row



## question 1: which categories has on average the best and worst nutriscore?
The lower the score of the nutriscore then it is considered more nutritious

| Points      | Nutriscore Grade| 
| :---       |  :----: | 
|  -15 to 1   |  A      |
|   0 to 2    |  B      | 
|   3 to 10   |  C      | 
|  11 to 18   |  D      |
|  19 to 40   |  E      |

In [21]:
# let's first see how many categories are there 

data2.groupBy('categories').count().show(10, truncate= False)
# as we see the categories consists of many things
# say that the first category that appears is the main category and we want to only consider that.
#  we need to iterate through each rows and do a split on the comma (,) 

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|categories                                                                                                                                                                                  |count|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|Plant-based foods and beverages,Plant-based foods,Fruits and vegetables based foods,Vegetables based foods,Leaf vegetables                                                                  |1    |
|Plant-based foods and beverages,Plant-based foods,Fruits and vegetables based foods,Vegetables based foods,Tomatoes and their products,Tomato pastes                                        |11   |
|Plant-based fo

In [22]:
# let's focus on the first row to see how to do it
print(data2.head(1)[0])

# we want to get the nutriscore and the first category
# make a lambda function to split the categories and get the first one
(lambda x: (x['nutriscore_score'], x['categories'].split(',')[0]))(data2.head(1)[0])

Row(origins='fr:quebec', product_name='Salade Cesar', brands='Kirkland Signature', categories='Plant-based foods and beverages,Plant-based foods,Fruits and vegetables based foods,Vegetables based foods,Leaf vegetables', countries='Canada', food_groups='Fruits and vegetables,Vegetables', energy_100g=1210.0, proteins_100g=22.0, sugars_100g=0.0, nutriscore_score=6, nutriscore_grade='c')


(6, 'Plant-based foods and beverages')

In [23]:
# we cannot apply map() function to a dataframe so we need to convert it to an rdd object first
rdd=data2.rdd.map(lambda x: (x['nutriscore_score'], x['categories'].split(',')[0]))  # returns rdd object
rdd

PythonRDD[78] at RDD at PythonRDD.scala:53

In [24]:
# to see what is inside the first row (2 ways)
rdd.take(1)
rdd.first()

(6, 'Plant-based foods and beverages')

In [25]:
# let's convert it back to a dataframe
newdf = rdd.toDF(['nutriscore','main_category'])
newdf.show(2)

+----------+--------------------+
|nutriscore|       main_category|
+----------+--------------------+
|         6|Plant-based foods...|
|         9|              Snacks|
+----------+--------------------+
only showing top 2 rows



In [26]:
# to see the full value of the dataframe we can specify truncate=False
newdf.show(3, truncate= False)

+----------+-------------------------------+
|nutriscore|main_category                  |
+----------+-------------------------------+
|6         |Plant-based foods and beverages|
|9         |Snacks                         |
|23        |Biscuits and cakes             |
+----------+-------------------------------+
only showing top 3 rows



In [27]:
from pyspark.sql.functions import avg

#  the best nutriscore
newdf.groupBy('main_category') \
    .agg(avg('nutriscore').alias('mean_score')) \
    .sort('mean_score') \
    .show(10, truncate= False)

+-------------------------------+-------------------+
|main_category                  |mean_score         |
+-------------------------------+-------------------+
|Milk-substitute                |-2.0               |
|Meat-based products            |-1.0               |
|Farming products               |-0.9516129032258065|
|Chips and fries                |0.125              |
|Plant-based foods and beverages|1.6856492027334853 |
|Canned foods                   |4.0                |
|Frozen foods                   |4.492227979274611  |
|Pickles                        |4.5                |
|Meals                          |4.6120857699805065 |
|Seafood                        |6.063439065108514  |
+-------------------------------+-------------------+
only showing top 10 rows



In [28]:
from pyspark.sql.functions import desc, avg

# the worst nutriscore
newdf.groupBy('main_category') \
    .agg(avg('nutriscore').alias('mean_score')) \
    .sort(desc('mean_score')) \
    .show(10, truncate= False)

+----------------------+------------------+
|main_category         |mean_score        |
+----------------------+------------------+
|Fish and meat and eggs|19.36842105263158 |
|Sweet pies            |17.0              |
|Snacks                |16.80310378273521 |
|Biscuits and cakes    |16.28301886792453 |
|Pies                  |14.833333333333334|
|Fats                  |14.6              |
|Breakfasts            |14.444444444444445|
|Spreads               |13.482758620689655|
|Crêpes and galettes   |13.25             |
|Sweeteners            |12.703703703703704|
+----------------------+------------------+
only showing top 10 rows



## question 2: which country has on average the best nutriscore?

In [29]:
## see how many distinct origins country we have
print(data2.select('origins').distinct().count())

data2.select('origins').show(5, truncate=False)

2200
+---------+
|origins  |
+---------+
|fr:quebec|
|fr:quebec|
|France   |
|France   |
|France   |
+---------+
only showing top 5 rows



In [37]:
# we see that the name of the country is very messy, so we need to clean it a bit
# if there is more than one country, i will only consider the first one
# if the name of the country consist of ':' i will ignore those country and put null value
# else i will keep the value
# note: i can do much better cleaning this but i think this is the quickest way, we will get most of the important country

def clean_country(df):
    countries = df.origins
    ns = df.nutriscore_score
    export = df.countries

    if "," in countries:
        clean = countries.split(',')[0].title()
        if ":" in countries:
            clean = None
    elif ":" in countries:
        clean = None
    else:
        clean = countries.title()

    return clean, ns, export

In [40]:
# apply it to the rdd
rdd=data2.rdd.map(lambda x: clean_country(x))
newdf2 = rdd.toDF(['origins','nutriscore', 'exports'])
newdf2.show(4, truncate=False)

+-------+----------+-------+
|origins|nutriscore|exports|
+-------+----------+-------+
|null   |6         |Canada |
|null   |9         |Canada |
|France |23        |France |
|France |5         |France |
+-------+----------+-------+
only showing top 4 rows



In [41]:
# dropping the null values
newdf2 = newdf2.na.drop(how = "any")

# see how many distinct country
newdf2.select('origins').distinct().count()

400

In [42]:
## let's see which country has on average the lowest nutriscore
# i will only consider countries that appears 30 or more time

newdf2.groupBy('origins') \
    .agg(avg('nutriscore').alias('mean_score'), count('origins').alias('count_country')) \
    .filter(count('origins') >= 30) \
    .sort('mean_score') \
    .show(10, truncate= False)

+-------------------------------------+------------------+-------------+
|origins                              |mean_score        |count_country|
+-------------------------------------+------------------+-------------+
|Thailand                             |2.422680412371134 |97           |
|Turkey                               |2.4415584415584415|77           |
|China                                |2.4725274725274726|91           |
|Vietnam                              |3.1142857142857143|35           |
|Italy                                |3.776034236804565 |701          |
|Portugal                             |4.574468085106383 |47           |
|Peru                                 |4.746268656716418 |67           |
|Finland                              |4.859375          |64           |
|European Union And Non European Union|4.888888888888889 |81           |
|United States                        |4.9411764705882355|238          |
+-------------------------------------+------------

## question 3: which country on average do more exports?

In [43]:
## for this question i will only consider those countries that are in the top 10 of nutriscore score
best_countries = newdf2.groupBy('origins') \
                        .agg(avg('nutriscore').alias('mean_score')) \
                        .filter(count('origins') >= 30).select('origins')

# putting the countries names in the list
best_countries = best_countries.select('origins').rdd.map(lambda x : x[0]).collect()

In [48]:
# filter the dataframe and put it in a new one
filtered = newdf2.filter(newdf2['origins'].isin(best_countries))
filtered.show(4)

+--------------+----------+--------------------+
|       origins|nutriscore|             exports|
+--------------+----------+--------------------+
|        France|        23|              France|
|        France|         5|              France|
|        France|        16|              France|
|United Kingdom|         9|France,United Kin...|
+--------------+----------+--------------------+
only showing top 4 rows



In [53]:
# this function counts how many countries the product available in
# if there is only one country then it means the product is only available in the origins country
# so i will return 0
# if there is more than 1, i will count based on splitting the comma then the total is -1 
# because i don't want to count the origin country
def count_exports(df):
    country = df.origins
    exports = df.exports

    if ',' in exports:
        c = exports.split(',')
        return country, len(c)-1
    else:
        return country, 0

In [54]:
# apply it to rdd and transform it back to dataframe
rdd=filtered.rdd.map(lambda x: count_exports(x))
newdf3 = rdd.toDF(['origins','export_count'])
newdf3.show(4, truncate=False)

+--------------+------------+
|origins       |export_count|
+--------------+------------+
|France        |0           |
|France        |0           |
|France        |0           |
|United Kingdom|1           |
+--------------+------------+
only showing top 4 rows



In [56]:
# from my sample, France is the country that has the most exports
from pyspark.sql.functions import sum

newdf3.groupBy('origins') \
    .agg(sum('export_count').alias('total_export')) \
    .sort(desc('total_export')) \
    .show(10,truncate= False)

+--------------+------------+
|origins       |total_export|
+--------------+------------+
|France        |334         |
|Italy         |189         |
|European Union|172         |
|Spain         |131         |
|Germany       |106         |
|Switzerland   |51          |
|United States |50          |
|Belgium       |47          |
|United Kingdom|44          |
|Argentina     |43          |
+--------------+------------+
only showing top 10 rows



## Stop the session

In [57]:
# show the UI 
spark

In [58]:
# to stop the session
spark.stop()