In [29]:
# Add your imports here
import pandas as pd
import numpy as np
import scipy as sp
import unidecode
%matplotlib inline

import findspark
findspark.init('/opt/spark/spark-2.3.2-bin-hadoop2.7')

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.functions import min
from pyspark.sql.functions import lower, col

from pyspark.sql import SparkSession
from pyspark import SparkContext

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

### Load Data

In [60]:
data = spark.read.option("delimiter", "\t").option("header", "true").csv("data/en.openfoodfacts.org.products.csv") 

### Clean Data

In [61]:
data.dtypes

[('code', 'string'),
 ('url', 'string'),
 ('creator', 'string'),
 ('created_t', 'string'),
 ('created_datetime', 'string'),
 ('last_modified_t', 'string'),
 ('last_modified_datetime', 'string'),
 ('product_name', 'string'),
 ('generic_name', 'string'),
 ('quantity', 'string'),
 ('packaging', 'string'),
 ('packaging_tags', 'string'),
 ('brands', 'string'),
 ('brands_tags', 'string'),
 ('categories', 'string'),
 ('categories_tags', 'string'),
 ('categories_en', 'string'),
 ('origins', 'string'),
 ('origins_tags', 'string'),
 ('manufacturing_places', 'string'),
 ('manufacturing_places_tags', 'string'),
 ('labels', 'string'),
 ('labels_tags', 'string'),
 ('labels_en', 'string'),
 ('emb_codes', 'string'),
 ('emb_codes_tags', 'string'),
 ('first_packaging_code_geo', 'string'),
 ('cities', 'string'),
 ('cities_tags', 'string'),
 ('purchase_places', 'string'),
 ('stores', 'string'),
 ('countries', 'string'),
 ('countries_tags', 'string'),
 ('countries_en', 'string'),
 ('ingredients_text', 'str

In [63]:
data.count()

689561

#### Origins

In [33]:
origins = data.groupBy("origins").count()
origins.count()

7462

This is 7462 dinstinct origin countries. This is inconvenient as we are interested in the independant states only
We therefore want to normalize this data. 

In [99]:
data.filter(data.origins_tags != "").groupBy("origins_tags").count().count()

6643

This category actually shows less 

In [100]:
def convert(unicodestring):
    return unidecode.unidecode(unicodestring)

udf_remove_accent = udf(convert)

origins_formated = data.filter(data.origins != "") \
                        .withColumn("origins", lower(col("origins"))) \
                        .withColumn("origins", udf_remove_accent(col("origins"))) \
                        .select("origins")

origins_grouped = origins_formated.groupBy("origins").count()
origins_grouped.sort(origins_grouped["count"].desc()).count()

6820

We might want to split data on strings, explode and then groupBy again 

In [115]:
countries = spark.read.csv("data/countries_fr.csv").drop("_c0","_c2")
countries = countries.withColumnRenamed("_c1", "country-code") \
            .withColumnRenamed("_c3", "alpha-3") \
            .withColumnRenamed("_c4", "name_fr") \
            .withColumnRenamed("_c5", "name_en") 
countries_fr.show(5)
print(countries_fr.count())

+------------+-------+-----------------+--------------+
|country-code|alpha-3|          name_fr|       name_en|
+------------+-------+-----------------+--------------+
|           4|    AFG|      Afghanistan|   Afghanistan|
|           8|    ALB|          Albanie|       Albania|
|          10|    ATA|      Antarctique|    Antarctica|
|          12|    DZA|          Algérie|       Algeria|
|          16|    ASM|Samoa Américaines|American Samoa|
+------------+-------+-----------------+--------------+
only showing top 5 rows

241


In [116]:
countries_formatted = countries.withColumn("name_fr", lower(col("name_fr")))\
                                .withColumn("name_fr", udf_remove_accent(col("name_fr")))\
                                .withColumn("name_en", lower(col("name_en")))\
                                .withColumn("name_en", udf_remove_accent(col("name_en")))
countries_formatted.show(5)

+------------+-------+-----------------+--------------+
|country-code|alpha-3|          name_fr|       name_en|
+------------+-------+-----------------+--------------+
|           4|    AFG|      afghanistan|   afghanistan|
|           8|    ALB|          albanie|       albania|
|          10|    ATA|      antarctique|    antarctica|
|          12|    DZA|          algerie|       algeria|
|          16|    ASM|samoa americaines|american samoa|
+------------+-------+-----------------+--------------+
only showing top 5 rows



Vous êtes libre de partager, distribuer ou utiliser cette base de données, pour des utilisations commerciales ou non, à condition de conserver cette licence et d’attribuer un lien vers le site sql.sh.

Join with origin dataframe

In [128]:
origins_joined_en = origins_grouped.join(countries_formatted, countries_formatted.name_en == origins_grouped.origins, "inner") \
                                .select("origins", "count", "country-code") \
                                .withColumnRenamed("origins", "origins_en") \
                                .withColumnRenamed("count", "count_en")
origins_joined_en.count()

115

In [129]:
origins_joined_fr = origins_grouped.join(countries_formatted, countries_formatted.name_fr == origins_grouped.origins, "inner") \
                                .select("origins", "count", "country-code") \
                                .withColumnRenamed("origins", "origins_fr") \
                                .withColumnRenamed("count", "count_fr")
origins_joined_fr.count()

124

In [130]:
origins_joined = origins_joined_en.join(origins_joined_fr, ["country-code"], "outer")
origins_joined.count()

129

In [149]:
origins_joined.orderBy("country-code").show(10)

+------------+----------+--------+----------+--------+
|country-code|origins_en|count_en|origins_fr|count_fr|
+------------+----------+--------+----------+--------+
|         100|  bulgaria|       9|  bulgarie|      18|
|         108|   burundi|       3|   burundi|       3|
|         116|  cambodia|       4|  cambodge|      13|
|          12|   algeria|       1|   algerie|      36|
|         120|  cameroon|       8|  cameroun|       6|
|         124|    canada|     235|    canada|     235|
|         144| sri lanka|     102| sri lanka|     102|
|         152|     chile|      27|     chili|      44|
|         156|     china|     210|     chine|     222|
|         158|    taiwan|      45|    taiwan|      45|
+------------+----------+--------+----------+--------+
only showing top 10 rows



In [150]:
origins_total = origins_joined.filter(origins_joined.origins_fr.isin(origins_joined.origins_en) == False)\
                .withColumn("total_count", col("count_en")+col("count_fr")) \
                .select("country-code", "origins_en", "total_count") \
                .orderBy("country-code")\
                .show()
                

+------------+------------------+-----------+
|country-code|        origins_en|total_count|
+------------+------------------+-----------+
|         100|          bulgaria|         27|
|         116|          cambodia|         17|
|          12|           algeria|         37|
|         120|          cameroon|         14|
|         152|             chile|         71|
|         156|             china|        432|
|         170|          colombia|         47|
|         191|           croatia|          7|
|         196|            cyprus|          4|
|         203|    czech republic|         13|
|         208|           denmark|         59|
|         214|dominican republic|         51|
|         218|           ecuador|        136|
|         233|           estonia|          4|
|         246|           finland|         17|
|         268|           georgia|         26|
|         276|           germany|        297|
|         300|            greece|        152|
|         308|           grenada| 

#### Packaging

In [94]:
data.filter(data.packaging != "").groupBy("packaging").count().count()

24873

In [93]:
data.filter(data.packaging_tags != "").groupBy("packaging_tags").count().count()

20035

In [None]:
packaging_join = 

#### Categories

In [95]:
data.filter(data.categories != "").groupBy("categories").count().count()

63319

In [96]:
data.filter(data.categories_tags != "").groupBy("categories_tags").count().count()

36487