# B Team - Group Assignment

### These are the questions we expect you to deal with:
•	Find the oldest product   
•	Find the newest product   
•	Average product age, where age means how long the product has been in the system.  
•	List of other countries where products are sold too.  
•	Identify category of products and the compute:    
          o	Number of products by category      
          o	List containing names of products by category      
•	Identify traces and compute:  
        o	Number of products by trace      
        o	List containing names of products by trace      
•	Data quality analysis on fields of interest (see appendix 1):  
        o	Number of products with complete info.     
        o	% of products without complete analysis per 100g  
        o	% of products without additives info  
        o	% of products without traces info  
•	Data profiling on fields of interest (see appendix 1): 
        o	Stats on analysis per 100g fields  

Additionally, your group has to **determine how healthy the different products are.** In order to come up with this conclusion, use an official health organism (ex. [NHS)](https://www.nhs.uk/live-well/eat-well/how-to-read-food-labels/?tabname=digestive-health)  
As you can see, it’s a very open exercise and you’re more than encouraged to make your own decisions. Please, share decisions and considerations taking into account during the final presentation.  
As usual, feel free to use the Discussion Board to ask questions and clarify doubts.  
Raúl  
Appendix 1.  
 
For more information about fields available in the data set, have a look at fields’ [description.](https://static.openfoodfacts.org/data/data-fields.txt) 
The following fields are considered fields of interest:  
•	Creator  
•	Created_datetime  
•	Last_modified_datetime  
•	Product_name  
•	Countries_en   
•	Traces_en  
•	Additives_tags  
•	Main_category_en  
•	Image_url  
•	Quantity  
•	Packaging_tags  
•	Categories_en  
•	Ingredients_text  
•	Additives_en  
•	Energy-kcal_100g  
•	Fat_100g  
•	Saturated-fat_100g  
•	Sugars_100g  
•	Salt_100g/sodium_100g  
If you think more fields should be considered fields of interest, feel free to add them to the list.   



## 1. PySpark environment setup

In [1]:
import findspark
findspark.init()

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

In [2]:
from IPython.display import display, Markdown
from pyspark.sql.functions import sum,avg,max,min,mean,count
from pyspark.sql.types import *
from pyspark.sql.functions import *
#from pyspark.sql.functions import when, count, col, countDistinct, desc, first, lit
from pyspark.ml.feature import Imputer

## 2. Data source and Spark data abstraction (DataFrame) setup

In [3]:
originalDF = spark.read \
                 .option("inferSchema", "true") \
                 .option("multiLine", "true")\
                 .option("escape", "\"")\
                 .option("header", "true") \
                 .csv("en.openfoodfacts.org.products.Belgium.csv.gz")

## 3. Data set metadata analysis
### Display schema and size of the DataFrame

In [4]:
originalDF.printSchema()

root
 |-- code: double (nullable = true)
 |-- url: string (nullable = true)
 |-- creator: string (nullable = true)
 |-- created_t: integer (nullable = true)
 |-- created_datetime: timestamp (nullable = true)
 |-- last_modified_t: integer (nullable = true)
 |-- last_modified_datetime: timestamp (nullable = true)
 |-- product_name: string (nullable = true)
 |-- generic_name: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- packaging: string (nullable = true)
 |-- packaging_tags: string (nullable = true)
 |-- brands: string (nullable = true)
 |-- brands_tags: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- categories_tags: string (nullable = true)
 |-- categories_en: string (nullable = true)
 |-- origins: string (nullable = true)
 |-- origins_tags: string (nullable = true)
 |-- manufacturing_places: string (nullable = true)
 |-- manufacturing_places_tags: string (nullable = true)
 |-- labels: string (nullable = true)
 |-- labels_tags: string (n

In [5]:
display(Markdown("This DataFrame has **%d rows** and ***%d columns***." % (originalDF.count(), len(originalDF.columns))))

This DataFrame has **51797 rows** and ***181 columns***.

In [6]:
originalDF.cache() # optimization to make the processing faster
originalDF.sample(False, 0.1).take(1)

[Row(code=625.0, url='http://world-en.openfoodfacts.org/product/00000625/chips-de-banane', creator='kiliweb', created_t=1571139104, created_datetime=datetime.datetime(2019, 10, 15, 13, 31, 44), last_modified_t=1571139105, last_modified_datetime=datetime.datetime(2019, 10, 15, 13, 31, 45), product_name='Chips de banane', generic_name=None, quantity=None, packaging=None, packaging_tags=None, brands=None, brands_tags=None, categories=None, categories_tags=None, categories_en=None, origins=None, origins_tags=None, manufacturing_places=None, manufacturing_places_tags=None, labels=None, labels_tags=None, labels_en=None, emb_codes=None, emb_codes_tags=None, first_packaging_code_geo=None, cities=None, cities_tags=None, purchase_places=None, stores=None, countries='en:be', countries_tags='en:belgium', countries_en='Belgium', ingredients_text=None, allergens=None, allergens_en=None, traces=None, traces_tags=None, traces_en=None, serving_size=None, serving_quantity=None, no_nutriments=None, addit

In [7]:
#Lets see if any columns can be used as index keys

In [8]:
pot_IK=originalDF\
    .select('code', 'product_name', 'created_t' )

In [9]:
print("Checking amount of distinct values in nutriscore_score fields:")
pot_IK.select([countDistinct(c).alias(c) for c in pot_IK.columns]).show()

Checking amount of distinct values in nutriscore_score fields:
+-----+------------+---------+
| code|product_name|created_t|
+-----+------------+---------+
|51773|       39394|    51745|
+-----+------------+---------+



We will use "code' as an index in case we need o to split the DF or do any joins

### Actual df with important columns

In [10]:
productsDF = originalDF\
    .select('code','Creator', 'Created_datetime', 'Last_modified_datetime', 'Product_name', 'Countries_en',\
            'Traces_en', 'Additives_tags', 'Main_category_en', 'Image_url', 'Quantity', 'Packaging_tags',\
            "countries",'Categories_en', 'Ingredients_text', 'Additives_en', 'Energy-kcal_100g', 'Fat_100g',\
            'Saturated-fat_100g', 'Sugars_100g', 'Salt_100g', 'sodium_100g')

In [11]:
productsDF.printSchema()


root
 |-- code: double (nullable = true)
 |-- Creator: string (nullable = true)
 |-- Created_datetime: timestamp (nullable = true)
 |-- Last_modified_datetime: timestamp (nullable = true)
 |-- Product_name: string (nullable = true)
 |-- Countries_en: string (nullable = true)
 |-- Traces_en: string (nullable = true)
 |-- Additives_tags: string (nullable = true)
 |-- Main_category_en: string (nullable = true)
 |-- Image_url: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Packaging_tags: string (nullable = true)
 |-- countries: string (nullable = true)
 |-- Categories_en: string (nullable = true)
 |-- Ingredients_text: string (nullable = true)
 |-- Additives_en: string (nullable = true)
 |-- Energy-kcal_100g: double (nullable = true)
 |-- Fat_100g: double (nullable = true)
 |-- Saturated-fat_100g: double (nullable = true)
 |-- Sugars_100g: double (nullable = true)
 |-- Salt_100g: double (nullable = true)
 |-- sodium_100g: double (nullable = true)



## 4. Data quality analysis and data profiling on fields of interest

### Number of products with complete info

In [12]:
display(Markdown("Products DataFrame has **%d products** with complete information." % productsDF.na.drop().count()))

Products DataFrame has **1333 products** with complete information.

### % of products without complete analysis per 100g

In [13]:
analysis100gDF = productsDF.select(originalDF.colRegex("`.+_100g$`"))
display(Markdown("Products without complete analysis per 100g: **%.2f %%**." % (100 * (1 - (analysis100gDF.na.drop().count() / productsDF.count())))))

Products without complete analysis per 100g: **26.17 %**.

### % of products without additives info

In [14]:
additivesDF = productsDF.select(originalDF.colRegex("`additives.+`"))
display(Markdown("Products without additives info: **%.2f %%**." % (100 * (1 - (additivesDF.na.drop().count() / productsDF.count())))))

Products without additives info: **87.52 %**.

### % of products without traces info

In [15]:
tracesDF = productsDF.select(originalDF.colRegex("`traces.+`"))
display(Markdown("Products without traces info: **%.2f %%**." % (100 * (1 - (tracesDF.na.drop().count() / productsDF.count())))))

Products without traces info: **92.47 %**.

### Stats on analysis per 100g fields

In [16]:
print ("Summary of ´per 100g fields´:")
analysis100gDF.summary().show()

print("Checking for nulls on ´per 100g fields´:")
analysis100gDF.select([count(when(col(c).isNull(), c)).alias(c) for c in analysis100gDF.columns]).show()

print("Checking amount of distinct values in ´per 100g fields´:")
analysis100gDF.select([countDistinct(c).alias(c) for c in analysis100gDF.columns]).show()

Summary of ´per 100g fields´:
+-------+-----------------+------------------+------------------+------------------+-----------------+------------------+
|summary| Energy-kcal_100g|          Fat_100g|Saturated-fat_100g|       Sugars_100g|        Salt_100g|       sodium_100g|
+-------+-----------------+------------------+------------------+------------------+-----------------+------------------+
|  count|            39842|             40866|             41088|             40940|            40648|             40648|
|   mean|302.7755980208913|14.048285559262917| 4.982827799539132|12.491814439693892|1.115387922925355|0.4461550914099248|
| stddev|6681.553529305641| 18.33876447086986|10.517148813034654|18.925982945612105| 3.77764405770672|1.5110575956759489|
|    min|              0.0|               0.0|               0.0|               0.0|              0.0|               0.0|
|    25%|             96.0|               0.7|  0.10000000149012|  0.80000001192093|             0.03|             0

## 5.Business questions 

###  Newest and Oldest product

In [17]:
#define reference to oldest and newest product
oldprod = productsDF.orderBy(col("Created_datetime").asc()).first()
newprod = productsDF.orderBy(col("Created_datetime").desc()).first()

display(Markdown("""
| %s | %s |
|----|----|
| %s | %s | 
""" % ("Oldest Product", "Newest Product", \
       "%s (%s)" % (oldprod["Product_name"], oldprod["Created_datetime"]),\
       "%s (%s)" % (newprod["Product_name"], newprod["Created_datetime"]))))


| Oldest Product | Newest Product |
|----|----|
| Lulu La Barquette Fraise (2012-02-11 16:07:23) | Andalouse (2020-09-13 03:23:53) | 


### Average product age

In [18]:
import datetime
#define current datetime
now = datetime.datetime.now()
#Getting Current date and time
current_datetime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
productsDF = productsDF.withColumn("current_time", lit(current_datetime))

In [19]:
productsDF = productsDF\
.withColumn('time_diff_hours',(unix_timestamp('current_time') - unix_timestamp('Created_datetime'))/3600)\
    .withColumn("time_diff_days", datediff(col("current_time"),col("Created_datetime")))\
    .withColumn("time_diff_months", months_between(col("current_time"),col("Created_datetime")))\
    .withColumn("time_diff_years", year(col("current_time")) - year(col("Created_datetime")))

In [20]:
display(Markdown("**The average product age**"))
productsDF.agg(avg("time_diff_days").alias("avg_product_age_in_days"),\
avg("time_diff_months").alias("avg_product_age_in_months"),\
avg("time_diff_years").alias("avg_product_age_in_years"))\
.show()

**The average product age**

+-----------------------+-------------------------+------------------------+
|avg_product_age_in_days|avg_product_age_in_months|avg_product_age_in_years|
+-----------------------+-------------------------+------------------------+
|      678.3958144294071|       22.266827689725304|      1.4964573237832306|
+-----------------------+-------------------------+------------------------+



In [70]:
display(Markdown("The average product age is **678 days, 22.26 months or 1.5 years** which is true for the following products:"))
productsDF.where(col("time_diff_days")== 678) \
                 .select('Product_name','Created_datetime')\
.show(200)

The average product age is **678 days, 22.26 months or 1.5 years** which is true for the following products:

+--------------------+-------------------+
|        Product_name|   Created_datetime|
+--------------------+-------------------+
|                null|2018-12-16 22:59:09|
|                null|2018-12-16 22:56:53|
|  Lait de croissance|2018-12-16 22:52:03|
|   Gâteaux aux oeufs|2018-12-16 23:06:23|
|Filet de saumon A...|2018-12-16 22:58:43|
|            Gianduja|2018-12-16 23:03:03|
|Pommes de terres ...|2018-12-16 22:59:14|
|         Nono orange|2018-12-16 22:57:49|
|Scampi's décortiqués|2018-12-16 22:59:15|
|     mix pâtisseries|2018-12-16 23:05:44|
|                null|2018-12-16 22:58:33|
|Chocolat fourré a...|2018-12-16 23:08:11|
|    Truffes de cacao|2018-12-16 23:08:19|
|Couscous aux legu...|2018-12-16 22:54:33|
|      Les Grenailles|2018-12-16 22:59:23|
| Legumes pour potage|2018-12-16 22:55:15|
|   Wok légumes épicé|2018-12-16 22:54:31|
|(H)eat me - Spagh...|2018-12-16 22:54:40|
|       Soupe fraiche|2018-12-16 22:59:28|
|Mountain klontjes...|2018-12-16 22:52:10|
|Foe yong h

### List of other countries where products are sold to

In [22]:
#create df where countries are split off
df = productsDF\
.withColumn("country_list",split(col("Countries_en"),","))\
.select(col("Product_name"), explode(col("country_list")).alias("Country"))\
.withColumn("Product_list",split(col("Product_name"),","))\
.select(col("Country"), explode(col("Product_list")).alias("Product"))

#create count and filter out Country Belgium, Product Name can be changed as needed
countDF = df.groupBy("Product","Country").count()\
.filter(col("Country") !="Belgium")\
.filter(col('Product') == 'Café').show()

+-------+-------+-----+
|Product|Country|count|
+-------+-------+-----+
|   Café| France|    4|
|   Café|  Spain|    1|
+-------+-------+-----+



### •	Identify category of products and compute:    
  ####        o	Number of products by category      

In [23]:
productsDF.groupBy("Categories_en") \
    .agg(count("Product_name").alias("Number of products by category"))\
.sort(col("Number of products by category").desc()).show()

+--------------------+------------------------------+
|       Categories_en|Number of products by category|
+--------------------+------------------------------+
|                null|                         34401|
|Snacks,Sweet snac...|                           152|
|Groceries,Sauces,...|                           138|
|Snacks,Sweet snac...|                           136|
|           Beverages|                           118|
|Plant-based foods...|                           108|
|Snacks,Sweet snac...|                            92|
|Spreads,Breakfast...|                            91|
|Snacks,Sweet snac...|                            75|
|Dairies,Fermented...|                            72|
|Meats,Poultries,C...|                            68|
|    Groceries,Sauces|                            62|
|Snacks,Sweet snac...|                            59|
|Plant-based foods...|                            57|
|Groceries,Sauces,...|                            56|
|Dairies,Fermented...|      

  ####        o	List containing names of products by category    

In [24]:
#create df where categories and products are split off
df = productsDF\
.withColumn("Product_list",split(col("Product_name"),","))\
.select(col("Categories_en"), explode(col("Product_list")).alias("Product"))\
.withColumn("Categories_list",split(col("Categories_en"),","))\
.select(col("Product"), explode(col("Categories_list")).alias("Category"))

#create count for category, which can be changed as needed
countDF = df.groupBy("Category","Product").count()\
.filter(col('Category') == 'Snacks').show()

+--------+--------------------+-----+
|Category|             Product|count|
+--------+--------------------+-----+
|  Snacks|Waterbridge Belgi...|    1|
|  Snacks|Spéculoos au lait...|    1|
|  Snacks|Gaufres aux Œufs ...|    1|
|  Snacks|Lotus Zebra Choco...|    1|
|  Snacks|Collection 24 Min...|    1|
|  Snacks|Chamallows cocoballs|    1|
|  Snacks|Cantucci Toscani IGP|    1|
|  Snacks|Grissini del Roer...|    1|
|  Snacks|Lay's Oven baked ...|    1|
|  Snacks|        Mikado & GO!|    1|
|  Snacks|        M&M's Peanut|    1|
|  Snacks|       Quatre Quarts|    1|
|  Snacks|           Spéculoos|    7|
|  Snacks|Melk chocolade - ...|    1|
|  Snacks|            Winegums|    2|
|  Snacks|Figurines de Père...|    1|
|  Snacks| Gingembre sucré bio|    1|
|  Snacks|Heinz Tomato Ketc...|    2|
|  Snacks|Belgian Chocolate...|    1|
|  Snacks|               Snack|    1|
+--------+--------------------+-----+
only showing top 20 rows



   ####    What kind of category contains the most products?

In [25]:
df.groupBy("Category") \
    .agg(count("Product").alias("Number of products by category"))\
.sort(col("Number of products by category").desc()).show()

+--------------------+------------------------------+
|            Category|Number of products by category|
+--------------------+------------------------------+
|Plant-based foods...|                          5414|
|   Plant-based foods|                          4554|
|           Beverages|                          2667|
|              Snacks|                          2434|
|        Sweet snacks|                          1971|
|             Dairies|                          1880|
|Cereals and potatoes|                          1817|
|     Fermented foods|                          1392|
|Fruits and vegeta...|                          1371|
|Fermented milk pr...|                          1358|
|Cereals and their...|                          1263|
|           Groceries|                          1228|
|          Breakfasts|                          1182|
|               Meals|                          1142|
|             Spreads|                          1075|
|Plant-based bever...|      

### •	Identify traces and compute:  
   ####     o	Number of products by trace      


In [26]:
productsDF.groupBy("Traces_en") \
    .agg(count("Product_name").alias("Number of products by trace"))\
.sort(col("Number of products by trace").desc()).show()

+--------------------+---------------------------+
|           Traces_en|Number of products by trace|
+--------------------+---------------------------+
|                null|                      46376|
|                Nuts|                        445|
|                Milk|                        246|
|            Soybeans|                        132|
|              Gluten|                        129|
|        Nuts,Peanuts|                        124|
|           Milk,Nuts|                         94|
|         Gluten,Nuts|                         84|
|       Nuts,Soybeans|                         81|
|                Eggs|                         70|
|        Sesame seeds|                         60|
|    Eggs,Gluten,Nuts|                         58|
|  Milk,Nuts,Soybeans|                         55|
|   Nuts,Sesame seeds|                         49|
|              Celery|                         48|
|           Eggs,Nuts|                         43|
|    Gluten,Milk,Nuts|         

   ####     o	List containing names of products by trace  

In [27]:
#create df where products are split off
df = productsDF\
.withColumn("Product_list",split(col("Product_name"),","))\
.select(col("Traces_en"), explode(col("Product_list")).alias("Product"))\
.withColumn("Traces_list",split(col("Traces_en"),","))\
.select(col("Product"), explode(col("Traces_list")).alias("Traces"))\

#create count for category, which can be changed as needed
countDF = df.groupBy("Traces","Product").count()\
.filter(col('Traces') == 'Nuts').show()

+------+--------------------+-----+
|Traces|             Product|count|
+------+--------------------+-----+
|  Nuts|       Saucisson Sec|    1|
|  Nuts| Twix Barres Glacées|    1|
|  Nuts|       Barre Protein|    1|
|  Nuts|Cacahuètes caramé...|    1|
|  Nuts|Mélange fe fruits...|    1|
|  Nuts|           Chocotini|    2|
|  Nuts|Gaufres Au Sucre ...|    1|
|  Nuts|       Speculoos Eis|    1|
|  Nuts|          Alpro coco|    1|
|  Nuts|Collection 24 Min...|    1|
|  Nuts|       Milka Caramel|    1|
|  Nuts|        Sorbet Mango|    1|
|  Nuts|Super Crunchy Muesli|    1|
|  Nuts|             Amandes|    2|
|  Nuts|  Nature sans sucres|    1|
|  Nuts| Biscottes heudebert|    1|
|  Nuts|        Petit Beurre|    2|
|  Nuts|Spécial K Nourish...|    1|
|  Nuts|       Riz Cantonais|    1|
|  Nuts|Pâte à tartiner N...|    1|
+------+--------------------+-----+
only showing top 20 rows



   ####    Let's see what kind of trace is present in most products

In [28]:
df.groupBy("Traces") \
    .agg(count("Product").alias("Number of products by trace"))\
.sort(col("Number of products by trace").desc()).show()

+--------------------+---------------------------+
|              Traces|Number of products by trace|
+--------------------+---------------------------+
|                Nuts|                       2003|
|                Milk|                       1338|
|            Soybeans|                       1269|
|              Gluten|                       1006|
|                Eggs|                        875|
|        Sesame seeds|                        810|
|             Peanuts|                        522|
|              Celery|                        486|
|             Mustard|                        466|
|                Fish|                        243|
|               Lupin|                        208|
|         Crustaceans|                        198|
|            Molluscs|                        159|
|Sulphur dioxide a...|                        136|
|    fr:phenylalanine|                         37|
|           fr:noyaux|                         10|
|    fr:lait-d-amande|         

## 6. How healthy are the products?

### We identified a health index in the data

In [29]:
# Lets analyze this health index
Nut_SC_DF = originalDF\
    .select('nutriscore_score', 'nutrition-score-fr_100g', 'nutrition-score-uk_100g' )

In [30]:
print ("Summary nutriscore_score fields:")
Nut_SC_DF.summary().show()

print("Checking for nulls on nutriscore_score fields:")
Nut_SC_DF.select([count(when(col(c).isNull(), c)).alias(c) for c in Nut_SC_DF.columns]).show()

print("Checking amount of distinct values in nutriscore_score fields:")
Nut_SC_DF.select([countDistinct(c).alias(c) for c in Nut_SC_DF.columns]).show()

Summary nutriscore_score fields:
+-------+-----------------+-----------------------+-----------------------+
|summary| nutriscore_score|nutrition-score-fr_100g|nutrition-score-uk_100g|
+-------+-----------------+-----------------------+-----------------------+
|  count|            13127|                  13126|                      3|
|   mean|9.039235105896694|      9.039235105896694|                    2.0|
| stddev|8.811044125665036|      8.811044125665036|      6.244997998398398|
|    min|              sel|                    -13|                     -3|
|    25%|              1.0|                      1|                     -3|
|    50%|              9.0|                      9|                      0|
|    75%|             16.0|                     16|                      9|
|    max|                9|                     34|                      9|
+-------+-----------------+-----------------------+-----------------------+

Checking for nulls on nutriscore_score fields:
+------

There are 38671 nulls, so while we could use this, we will explore other options

### Lets see if we have information on other health indicators

In [31]:
## Some nutrients used in the Nutri-index and others
O_Nutr_DF = originalDF\
    .select('carbohydrates_100g', 'fiber_100g', 'proteins_100g','energy-kj_100g','energy-kcal_100g')

# We tried all of these, but they are mostly nulls 'omega-3-fat_100g','trans-fat_100g','cholesterol_100g','-soluble-fiber_100g', 
#'-insoluble-fiber_100g','chlorophyl_100g'

In [32]:
print ("Summary other nutri fields:")
O_Nutr_DF.summary().show()

print("Checking for nulls on other nutri fields:")
O_Nutr_DF.select([count(when(col(c).isNull(), c)).alias(c) for c in O_Nutr_DF.columns]).show()

print("Checking amount of distinct values in other nutri fields:")
O_Nutr_DF.select([countDistinct(c).alias(c) for c in O_Nutr_DF.columns]).show()

Summary other nutri fields:
+-------+------------------+------------------+-----------------+------------------+-----------------+
|summary|carbohydrates_100g|        fiber_100g|    proteins_100g|    energy-kj_100g| energy-kcal_100g|
+-------+------------------+------------------+-----------------+------------------+-----------------+
|  count|             40860|              7648|            41005|              2503|            39842|
|   mean|26.169418019180583|3.0425237101202893|8.085457162918072|1128.2764214942063|302.7755980208913|
| stddev|27.670610888821475|  5.30777235078442|9.217497274115868| 814.3256479287658|6681.553529305641|
|    min|               0.0|               0.0|              0.0|               0.0|              0.0|
|    25%|               3.3|               0.1|  1.2999999523163|             356.0|             96.0|
|    50%|              12.1|               1.5|              5.6|            1083.0|            251.0|
|    75%|              51.5|             140.

In [33]:
# We see that  energy-kj_100g has many NAs, and energy-kcal_100g has less, we will add (in the original DF) to energy-kj_100g info 
# from energy-kcal_100g as we will use this column later.

originalDF=originalDF.withColumn('energy-kj_100g', \
            when(originalDF["energy-kj_100g"].isNull(), originalDF['energy-kcal_100g']*4.184)\
            .otherwise(originalDF["energy-kj_100g"]))


In [34]:
# We won't be able to use Fibers as there many NAs, lets look at Fruits, another component of the index
FNV_DF = originalDF\
    .select('fruits-vegetables-nuts_100g','fruits-vegetables-nuts-dried_100g','fruits-vegetables-nuts-estimate_100g' )

In [35]:
print ("Summary Fruits and Vegetables fields:")
FNV_DF.summary().show()

print("Checking for nulls on Fruits and Vegetables fields:")
FNV_DF.select([count(when(col(c).isNull(), c)).alias(c) for c in FNV_DF.columns]).show()

print("Checking amount of distinct values in Fruits and Vegetables fields:")
FNV_DF.select([countDistinct(c).alias(c) for c in FNV_DF.columns]).show()

Summary Fruits and Vegetables fields:
+-------+---------------------------+---------------------------------+------------------------------------+
|summary|fruits-vegetables-nuts_100g|fruits-vegetables-nuts-dried_100g|fruits-vegetables-nuts-estimate_100g|
+-------+---------------------------+---------------------------------+------------------------------------+
|  count|                        259|                               17|                                 432|
|   mean|         29.146447876447873|                48.88235294117647|                  42.406134259259254|
| stddev|         37.685193776457766|                33.95547149013907|                  30.934257751799812|
|    min|                        0.0|                              0.0|                                 0.0|
|    25%|                        0.0|                             33.0|                                12.5|
|    50%|                        9.0|                             57.5|                   

In [36]:
# Lets see how much info we have on vitamins
Vitamins_DF = originalDF\
    .select('vitamin-a_100g', 'beta-carotene_100g', 'vitamin-d_100g', 'vitamin-e_100g', 'vitamin-k_100g', 'vitamin-c_100g')

In [37]:
print ("Summary Vitamins:")
Vitamins_DF.summary().show()

print("Checking for nulls on Vitamins fields:")
Vitamins_DF.select([count(when(col(c).isNull(), c)).alias(c) for c in Vitamins_DF.columns]).show()

print("Checking amount of distinct values in Vitamins fields:")
Vitamins_DF.select([countDistinct(c).alias(c) for c in Vitamins_DF.columns]).show()

Summary Vitamins:
+-------+--------------------+------------------+------------------+------------------+--------------------+------------------+
|summary|      vitamin-a_100g|beta-carotene_100g|    vitamin-d_100g|    vitamin-e_100g|      vitamin-k_100g|    vitamin-c_100g|
+-------+--------------------+------------------+------------------+------------------+--------------------+------------------+
|  count|                 128|                 1|               189|               144|                  10|               212|
|   mean|4.905472656250002E-4|              0.26|0.3717056251322751|1.0303782875000007|         0.001042455|0.7546620754716986|
| stddev|9.008793256273883E-4|               NaN| 4.703834618704441| 4.742594164475769|0.003147705036693...|5.2221010285027525|
|    min|                 0.0|              0.26|               0.0|               0.0|                 0.0|               0.0|
|    25%|              3.0E-5|              0.26|            7.5E-7|            0.0018

### Lets now create own own proxy index

In [38]:
# The only nutrients we have 80% or more info for are 'energy-kj_100g','sugars_100g','saturated-fat_100g','sodium_100g','proteins_100g'
NS_Proxy_DF=originalDF\
    .select('code','energy-kj_100g','sugars_100g','saturated-fat_100g','sodium_100g','proteins_100g','nutrition-score-fr_100g','nutriscore_grade')

In [39]:
#Lets now convert key numerics to floats
NS_Proxy_DF = NS_Proxy_DF.withColumn('energy-kj_100g', col('energy-kj_100g').cast('float'))\
                    .withColumn('sugars_100g', col('sugars_100g').cast('float'))\
                    .withColumn('saturated-fat_100g', col('saturated-fat_100g').cast('float'))\
                    .withColumn('sodium_100g', col('sodium_100g').cast('float'))\
                    .withColumn('proteins_100g', col('proteins_100g').cast('float'))\
                    .withColumn('nutrition-score-fr_100g', col('nutrition-score-fr_100g').cast('float'))

In [40]:
# As seen above there 38671 rows with out a value for Nutriscore

NS_Proxy_DF.select('sugars_100g','nutrition-score-fr_100g').groupby('nutrition-score-fr_100g')\
.agg(count(lit(1))).orderBy('nutrition-score-fr_100g').show(20)

#.agg(F.collect_set("Product_name")).orderBy("Traces_en")
#([count(when(col(c).isNull(), c)).alias(c)
#,'sugars_100g','saturated-fat_100g','sodium_100g','proteins_100g'

+-----------------------+--------+
|nutrition-score-fr_100g|count(1)|
+-----------------------+--------+
|                   null|   38671|
|                  -13.0|       2|
|                  -12.0|       6|
|                  -11.0|       7|
|                  -10.0|       9|
|                   -9.0|      15|
|                   -8.0|      34|
|                   -7.0|      47|
|                   -6.0|     151|
|                   -5.0|     180|
|                   -4.0|     284|
|                   -3.0|     281|
|                   -2.0|     316|
|                   -1.0|     523|
|                    0.0|     820|
|                    1.0|     628|
|                    2.0|     653|
|                    3.0|     551|
|                    4.0|     514|
|                    5.0|     449|
+-----------------------+--------+
only showing top 20 rows



In [41]:
#Let's see if we have full info on the nutrients for the ones we don't have a Nutriscore for
print("Nulls for columns in Health Index with no nutrition score:")
NS_Proxy_DF.filter(col("nutrition-score-fr_100g").isNull())\
        .select([count(when(col(c).isNull(), c)).alias(c) for c in NS_Proxy_DF.columns]).show()

#originalDF.groupby('Sugar?').pivot('Nutriscore?')\
#.agg(count(lit(1))).show()
#filter(col("nutrition-score-fr_100g")!='None')\

Nulls for columns in Health Index with no nutrition score:
+----+--------------+-----------+------------------+-----------+-------------+-----------------------+----------------+
|code|energy-kj_100g|sugars_100g|saturated-fat_100g|sodium_100g|proteins_100g|nutrition-score-fr_100g|nutriscore_grade|
+----+--------------+-----------+------------------+-----------+-------------+-----------------------+----------------+
|   0|         10458|      10778|             10630|      11091|        10713|                  38671|           38670|
+----+--------------+-----------+------------------+-----------+-------------+-----------------------+----------------+



In [42]:
print("NUlls columns in Health Index for all rows:")
NS_Proxy_DF.select([count(when(col(c).isNull(), c)).alias(c) for c in NS_Proxy_DF.columns]).show()

NUlls columns in Health Index for all rows:
+----+--------------+-----------+------------------+-----------+-------------+-----------------------+----------------+
|code|energy-kj_100g|sugars_100g|saturated-fat_100g|sodium_100g|proteins_100g|nutrition-score-fr_100g|nutriscore_grade|
+----+--------------+-----------+------------------+-----------+-------------+-----------------------+----------------+
|   0|         10564|      10857|             10709|      11149|        10792|                  38671|           38670|
+----+--------------+-----------+------------------+-----------+-------------+-----------------------+----------------+



We see from the above that most of the values that we don't have nutritional infor for our Index, we don't either have a calculated nutriscore, so we can't calculate a nutritional index, so we will drop all the rows with no information on nutrients  and see what our data frame looks like.

In [43]:
#We still have some nulls so we can remove all the sat fat nulls
NS_Proxy_DF.filter(col("sodium_100g").isNotNull()&(col("sugars_100g").isNotNull())&(col("saturated-fat_100g").isNotNull())\
                   &(col("energy-kj_100g").isNotNull())&(col("proteins_100g").isNotNull()))\
                    .select([count(when(col(c).isNull(), c)).alias(c) for c in NS_Proxy_DF.columns]).show()



+----+--------------+-----------+------------------+-----------+-------------+-----------------------+----------------+
|code|energy-kj_100g|sugars_100g|saturated-fat_100g|sodium_100g|proteins_100g|nutrition-score-fr_100g|nutriscore_grade|
+----+--------------+-----------+------------------+-----------+-------------+-----------------------+----------------+
|   0|             0|          0|                 0|          0|            0|                  26670|           26670|
+----+--------------+-----------+------------------+-----------+-------------+-----------------------+----------------+



In [44]:
## CHeck the size of the DF
NS_Proxy_DF.filter(col("sodium_100g").isNotNull()&(col("sugars_100g").isNotNull())&(col("saturated-fat_100g").isNotNull())&(col("energy-kj_100g").isNotNull())&(col("proteins_100g").isNotNull())).count()

39684

In [45]:
##Actually create the bew DF with no nulls in nutrients to be usend in index
NS_Filtered=NS_Proxy_DF.filter(col("sodium_100g").isNotNull()&(col("sugars_100g").isNotNull())&(col("saturated-fat_100g").isNotNull())\
                   &(col("energy-kj_100g").isNotNull())&(col("proteins_100g").isNotNull()))

In [46]:
NS_Filtered.sample(False, 0.1).take(1)

[Row(code=11210115644.0, energy-kj_100g=67.0, sugars_100g=0.0, saturated-fat_100g=0.20000000298023224, sodium_100g=0.7200000286102295, proteins_100g=1.0, nutrition-score-fr_100g=6.0, nutriscore_grade='c')]

In [47]:
#Lets analyze the 5 nutrients we have infor for and the nutriscore column.
NS_Filtered['energy-kj_100g','sugars_100g','saturated-fat_100g','sodium_100g'].summary().show()

+-------+------------------+------------------+------------------+------------------+
|summary|    energy-kj_100g|       sugars_100g|saturated-fat_100g|       sodium_100g|
+-------+------------------+------------------+------------------+------------------+
|  count|             39684|             39684|             39684|             39684|
|   mean|1276.6580476085833|  12.3751026196086| 5.052917419416349|0.4497129997435734|
| stddev|  28010.8717556618|18.783854371124892|10.593822528828682|1.4594212693037263|
|    min|               0.0|               0.0|               0.0|               0.0|
|    25%|             418.4|               0.8|               0.2|              0.02|
|    50%|          1058.552|               3.5|               1.8|               0.2|
|    75%|          1665.232|              14.0|               6.8|              0.52|
|    max|         5578665.5|             119.0|            1400.0|              72.0|
+-------+------------------+------------------+-------

In [48]:
NS_Filtered['proteins_100g','nutrition-score-fr_100g','nutriscore_grade'].summary().show()

+-------+-----------------+-----------------------+----------------+
|summary|    proteins_100g|nutrition-score-fr_100g|nutriscore_grade|
+-------+-----------------+-----------------------+----------------+
|  count|            39684|                  13014|           13014|
|   mean|8.155955470560041|       9.09689565083756|            null|
| stddev|9.237262930134275|       8.81303714991348|            null|
|    min|              0.0|                  -13.0|               a|
|    25%|              1.3|                    2.0|            null|
|    50%|              5.7|                    9.0|            null|
|    75%|             11.8|                   16.0|            null|
|    max|            100.0|                   34.0|               e|
+-------+-----------------+-----------------------+----------------+



In [49]:
display(Markdown("This DataFrame has **%d rows** and ***%d columns***." % (NS_Filtered.count(), len(NS_Filtered.columns))))

This DataFrame has **39684 rows** and ***8 columns***.

There are some extreme values at the top end, so we will deal with the possible outliers in all 5 groups we will use for the Proxy Index

In [50]:
quartilesDF = NS_Filtered.select('energy-kj_100g','sugars_100g','saturated-fat_100g','sodium_100g','proteins_100g').summary("25%","50%","75%")

In [51]:
#energy-kj_100g
Q1=float(quartilesDF.where("summary='25%'").first()["energy-kj_100g"])
Q3=float(quartilesDF.where("summary='75%'").first()["energy-kj_100g"])
E_Med=float(quartilesDF.where("summary='50%'").first()["energy-kj_100g"])
E_UB=Q3 + (1.5 * (Q3-Q1))


In [52]:
#sugars_100g
Q1=float(quartilesDF.where("summary='25%'").first()["sugars_100g"])
Q3=float(quartilesDF.where("summary='75%'").first()["sugars_100g"])
S_Med=float(quartilesDF.where("summary='50%'").first()["sugars_100g"])
S_UB=Q3 + (1.5 * (Q3-Q1))

In [53]:
#saturated-fat_100g
Q1=float(quartilesDF.where("summary='25%'").first()["saturated-fat_100g"])
Q3=float(quartilesDF.where("summary='75%'").first()["saturated-fat_100g"])
F_Med=float(quartilesDF.where("summary='50%'").first()["saturated-fat_100g"])
F_UB=Q3 + (1.5 * (Q3-Q1))

In [54]:
#sodium_100g
Q1=float(quartilesDF.where("summary='25%'").first()["sodium_100g"])
Q3=float(quartilesDF.where("summary='75%'").first()["sodium_100g"])
SO_Med=float(quartilesDF.where("summary='50%'").first()["sodium_100g"])
SO_UB=Q3 + (1.5 * (Q3-Q1))

In [55]:
#proteins_100g
Q1=float(quartilesDF.where("summary='25%'").first()["proteins_100g"])
Q3=float(quartilesDF.where("summary='75%'").first()["proteins_100g"])
P_Med=float(quartilesDF.where("summary='50%'").first()["proteins_100g"])
P_UB=Q3 + (1.5 * (Q3-Q1))

In [56]:
#We will replace outliers with the median 

NS_Filtered=NS_Filtered.withColumn('energy-kj_100g', \
           when(NS_Filtered['energy-kj_100g']>E_UB,E_Med)\
            .otherwise(NS_Filtered['energy-kj_100g']))

NS_Filtered=NS_Filtered.withColumn('sugars_100g', \
            when(NS_Filtered['sugars_100g']>S_UB,S_Med)\
            .otherwise(NS_Filtered['sugars_100g']))

NS_Filtered=NS_Filtered.withColumn('saturated-fat_100g', \
           when(NS_Filtered['saturated-fat_100g']>F_UB,F_Med)\
            .otherwise(NS_Filtered['saturated-fat_100g']))

NS_Filtered=NS_Filtered.withColumn('sodium_100g', \
           when(NS_Filtered['sodium_100g']>SO_UB,SO_Med)\
            .otherwise(NS_Filtered['sodium_100g']))

NS_Filtered=NS_Filtered.withColumn('proteins_100g', \
            when(NS_Filtered['proteins_100g']>P_UB,P_Med)\
            .otherwise(NS_Filtered['proteins_100g']))

In [57]:
NS_Filtered.select('energy-kj_100g','sugars_100g','saturated-fat_100g','sodium_100g','proteins_100g').summary().show()

+-------+------------------+-----------------+-------------------+--------------------+------------------+
|summary|    energy-kj_100g|      sugars_100g| saturated-fat_100g|         sodium_100g|     proteins_100g|
+-------+------------------+-----------------+-------------------+--------------------+------------------+
|  count|             39684|            39684|              39684|               39684|             39684|
|   mean|1106.9307687621465|5.546259781231657|  3.086016121942874| 0.26891306391862757| 7.189432858922341|
| stddev| 775.6630732597955|7.343360271781261|  4.002988067976653| 0.29127238366218583|  6.94789573880771|
|    min|               0.0|              0.0|                0.0|                 0.0|               0.0|
|    25%| 418.3999938964844|0.800000011920929|0.20000000298023224|0.019999999552965164|1.2999999523162842|
|    50%|          1058.552|              3.5| 1.7999999523162842|                 0.2| 5.699999809265137|
|    75%|  1644.31201171875|         

We now seem to have less extreme values

#### Lets create the index

In [58]:
## Lets get a score for energy-kj_100g

NS_Filtered=NS_Filtered.withColumn('energy-kj_Score', \
            when(NS_Filtered["energy-kj_100g"]<=335,0)\
            .when((NS_Filtered["energy-kj_100g"]>335)&(NS_Filtered["energy-kj_100g"]<670),1)\
            .when((NS_Filtered["energy-kj_100g"]>670)&(NS_Filtered["energy-kj_100g"]<1005),2)\
            .when((NS_Filtered["energy-kj_100g"]>1005)&(NS_Filtered["energy-kj_100g"]<1340),3)\
            .when((NS_Filtered["energy-kj_100g"]>1340)&(NS_Filtered["energy-kj_100g"]<1675),4)\
            .when((NS_Filtered["energy-kj_100g"]>1675)&(NS_Filtered["energy-kj_100g"]<2010),5)\
            .when((NS_Filtered["energy-kj_100g"]>2010)&(NS_Filtered["energy-kj_100g"]<2345),6)\
            .when((NS_Filtered["energy-kj_100g"]>2345)&(NS_Filtered["energy-kj_100g"]<2680),7)\
            .when((NS_Filtered["energy-kj_100g"]>2680)&(NS_Filtered["energy-kj_100g"]<3015),8)\
            .when((NS_Filtered["energy-kj_100g"]>3015)&(NS_Filtered["energy-kj_100g"]<3350),9)\
            .otherwise(10))

Lets create a sugar score

In [59]:

NS_Filtered=NS_Filtered.withColumn('sugars_Score', \
            when(NS_Filtered["sugars_100g"]<=4.5,0)\
            .when((NS_Filtered["sugars_100g"]>4.5)&(NS_Filtered["sugars_100g"]<9),1)\
            .when((NS_Filtered["sugars_100g"]>9)&(NS_Filtered["sugars_100g"]<13.5),2)\
            .when((NS_Filtered["sugars_100g"]>13.5)&(NS_Filtered["sugars_100g"]<18),3)\
            .when((NS_Filtered["sugars_100g"]>18)&(NS_Filtered["sugars_100g"]<22.5),4)\
            .when((NS_Filtered["sugars_100g"]>22.5)&(NS_Filtered["sugars_100g"]<27),5)\
            .when((NS_Filtered["sugars_100g"]>27)&(NS_Filtered["sugars_100g"]<31),6)\
            .when((NS_Filtered["sugars_100g"]>31)&(NS_Filtered["sugars_100g"]<36),7)\
            .when((NS_Filtered["sugars_100g"]>36)&(NS_Filtered["sugars_100g"]<10),8)\
            .when((NS_Filtered["sugars_100g"]>40)&(NS_Filtered["sugars_100g"]<45),9)\
            .otherwise(10))

Lets create a fat score

In [60]:
NS_Filtered=NS_Filtered.withColumn('sat_fat_Score', \
            when(NS_Filtered["saturated-fat_100g"]<=1,0)\
            .when((NS_Filtered["saturated-fat_100g"]>1)&(NS_Filtered["saturated-fat_100g"]<2),1)\
            .when((NS_Filtered["saturated-fat_100g"]>2)&(NS_Filtered["saturated-fat_100g"]<3),2)\
            .when((NS_Filtered["saturated-fat_100g"]>3)&(NS_Filtered["saturated-fat_100g"]<4),3)\
            .when((NS_Filtered["saturated-fat_100g"]>4)&(NS_Filtered["saturated-fat_100g"]<5),4)\
            .when((NS_Filtered["saturated-fat_100g"]>5)&(NS_Filtered["saturated-fat_100g"]<6),5)\
            .when((NS_Filtered["saturated-fat_100g"]>6)&(NS_Filtered["saturated-fat_100g"]<7),6)\
            .when((NS_Filtered["saturated-fat_100g"]>7)&(NS_Filtered["saturated-fat_100g"]<8),7)\
            .when((NS_Filtered["saturated-fat_100g"]>8)&(NS_Filtered["saturated-fat_100g"]<9),8)\
            .when((NS_Filtered["saturated-fat_100g"]>9)&(NS_Filtered["saturated-fat_100g"]<10),9)\
            .otherwise(10))

Lets create a sodium score

In [61]:
NS_Filtered=NS_Filtered.withColumn('sodium_Score', \
            when(NS_Filtered["sodium_100g"]<=90,0)\
            .when((NS_Filtered["sodium_100g"]>90)&(NS_Filtered["sodium_100g"]<180),1)\
            .when((NS_Filtered["sodium_100g"]>180)&(NS_Filtered["sodium_100g"]<270),2)\
            .when((NS_Filtered["sodium_100g"]>270)&(NS_Filtered["sodium_100g"]<360),3)\
            .when((NS_Filtered["sodium_100g"]>360)&(NS_Filtered["sodium_100g"]<450),4)\
            .when((NS_Filtered["sodium_100g"]>450)&(NS_Filtered["sodium_100g"]<540),5)\
            .when((NS_Filtered["sodium_100g"]>540)&(NS_Filtered["sodium_100g"]<630),6)\
            .when((NS_Filtered["sodium_100g"]>630)&(NS_Filtered["sodium_100g"]<720),7)\
            .when((NS_Filtered["sodium_100g"]>720)&(NS_Filtered["sodium_100g"]<810),8)\
            .when((NS_Filtered["sodium_100g"]>810)&(NS_Filtered["sodium_100g"]<900),9)\
            .otherwise(10))

Lastly a protein score

In [62]:
NS_Filtered=NS_Filtered.withColumn('proteins_Score', \
            when(NS_Filtered["proteins_100g"]<=1.6,0)\
            .when((NS_Filtered["proteins_100g"]>1.6)&(NS_Filtered["proteins_100g"]<3.2),1)\
            .when((NS_Filtered["proteins_100g"]>3.2)&(NS_Filtered["proteins_100g"]<4.8),2)\
            .when((NS_Filtered["proteins_100g"]>4.8)&(NS_Filtered["proteins_100g"]<6.4),3)\
            .when((NS_Filtered["proteins_100g"]>6.4)&(NS_Filtered["proteins_100g"]<8),4)\
            .otherwise(5))

In [63]:
NS_Filtered.select('energy-kj_Score','sugars_Score','sat_fat_Score','sodium_Score','proteins_Score').summary().show()

+-------+-----------------+------------------+-----------------+------------+------------------+
|summary|  energy-kj_Score|      sugars_Score|    sat_fat_Score|sodium_Score|    proteins_Score|
+-------+-----------------+------------------+-----------------+------------+------------------+
|  count|            39684|             39684|            39684|       39684|             39684|
|   mean|2.826857171656083|0.9080485838121157|2.751763935087189|         0.0|2.6905049894163895|
| stddev|2.305593371555645|1.8926236483868455|3.668884959235313|         0.0| 2.058445860824061|
|    min|                0|                 0|                0|           0|                 0|
|    25%|                1|                 0|                0|           0|                 0|
|    50%|                3|                 0|                1|           0|                 3|
|    75%|                4|                 1|                5|           0|                 5|
|    max|               10|   

Put all three together to get our simplified proxy for the Nutriscore. We gave tripple the weight to proteins as it was the only
"good" atribute we had.

In [64]:
NS_Filtered=NS_Filtered.withColumn('Nutri_Score_Proxy', ((col("energy-kj_Score")+col("sugars_Score")\
                                   +col("sat_fat_Score")+col("sodium_Score")-3*col("proteins_Score"))))

Lets create the color code using the same scale as the Nutrindex
  

In [65]:
NS_Filtered=NS_Filtered.withColumn('Nutri_Score_Let', \
            when(NS_Filtered["Nutri_Score_Proxy"]<0,'A')\
            .when((NS_Filtered["Nutri_Score_Proxy"]>=0)&(NS_Filtered["Nutri_Score_Proxy"]<3),'B')\
            .when((NS_Filtered["Nutri_Score_Proxy"]>=3)&(NS_Filtered["Nutri_Score_Proxy"]<11),'C')\
            .when((NS_Filtered["Nutri_Score_Proxy"]>=11)&(NS_Filtered["Nutri_Score_Proxy"]<19),'D')\
            .otherwise('E'))

In [66]:
NS_Filtered.groupBy(NS_Filtered['Nutri_Score_Let']).agg(count(lit(1)).alias("#of_prods")).orderBy(col("Nutri_Score_Let")).show()

+---------------+---------+
|Nutri_Score_Let|#of_prods|
+---------------+---------+
|              A|    20571|
|              B|     9607|
|              C|     7320|
|              D|     1914|
|              E|      272|
+---------------+---------+



In [67]:
#Lets compare our clasification, to the Nutriscore
NS_Filtered.groupBy('Nutri_Score_Let').pivot('nutriscore_grade').agg(count(lit(1))).orderBy(col("Nutri_Score_Let")).show()

+---------------+-----+----+----+----+----+---+
|Nutri_Score_Let| null|   a|   b|   c|   d|  e|
+---------------+-----+----+----+----+----+---+
|              A|14064|1380|1247|1386|1780|714|
|              B| 6458| 408| 549| 888| 747|557|
|              C| 4803|  73|  98| 486|1003|857|
|              D| 1177|   2|   6|  96| 275|358|
|              E|  168|null|null|  51|  35| 18|
+---------------+-----+----+----+----+----+---+



As expected they are different, but at least we can get a proxy for how healthy each product is

In [68]:
#Lets see the distribution of scores
NS_Filtered.groupBy(round(NS_Filtered['Nutri_Score_Proxy'],0)).agg(count(lit(1)).alias("#of_prods")).orderBy(round(NS_Filtered['Nutri_Score_Proxy'],0)).show()

+---------------------------+---------+
|round(Nutri_Score_Proxy, 0)|#of_prods|
+---------------------------+---------+
|                        -15|      262|
|                        -14|     1077|
|                        -13|      675|
|                        -12|     1024|
|                        -11|     2224|
|                        -10|     1677|
|                         -9|     1249|
|                         -8|     1354|
|                         -7|     1021|
|                         -6|     1096|
|                         -5|     1617|
|                         -4|     1428|
|                         -3|     2042|
|                         -2|     2263|
|                         -1|     1562|
|                          0|     4216|
|                          1|     2609|
|                          2|     2782|
|                          3|     1567|
|                          4|     1600|
+---------------------------+---------+
only showing top 20 rows



In [69]:
#Lets find the 10 healthiest products according to our index
NS_Filtered.join(originalDF,NS_Filtered.code==originalDF.code,how='left')\
.select('Nutri_Score_Proxy','Product_name').orderBy('Nutri_Score_Proxy').show(10)

+-----------------+--------------------+
|Nutri_Score_Proxy|        Product_name|
+-----------------+--------------------+
|              -15|My authentic Gree...|
|              -15|Filets Cabillaud ...|
|              -15|       Fage total 0%|
|              -15|           Crevettes|
|              -15|  Black tiger scampi|
|              -15|  Filet de cabillaud|
|              -15|Cottage Cheese light|
|              -15|Brochettes de cre...|
|              -15|   Carpaccio de bœuf|
|              -15|Steak maitre d'hotel|
+-----------------+--------------------+
only showing top 10 rows

