In [40]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, isnull, when, count, lit, sum
from pyspark.sql.types import StringType, DoubleType

In [2]:
# Build (or reuse) the notebook's SparkSession against a `local[*]` master.
# NOTE(review): with a local master the executor-count and dynamic-allocation
# settings below presumably have no effect — confirm against the Spark
# configuration docs before relying on them.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.driver.memory", "2g")
    .config("spark.executor.instances", "4")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "1")
    .config("spark.dynamicAllocation.maxExecutors", "6")
    .getOrCreate()
)

# Handle to the session's underlying SparkContext.
sc = spark.sparkContext

In [3]:
# List the contents of /sparkdata, the directory the input CSV is read from.
%ls -la /sparkdata

total 10666812
drwxrwxrwx 1 root root         512 Apr 15 22:27 [0m[34;42m.[0m/
drwxr-xr-x 1 root root        4096 Apr 16 06:46 [01;34m..[0m/
-rwxrwxrwx 1 root root 10915138215 Apr 14 06:31 [01;32men.openfoodfacts.org.products.csv[0m*
-rwxrwxrwx 1 root root     7669276 Apr 14 17:28 [01;32mgoogleplaystore_user_reviews.csv[0m*


In [4]:
# Location of the products export; despite the .csv extension it is read
# further down with a tab delimiter.
csv_path = "/sparkdata/en.openfoodfacts.org.products.csv"

In [5]:
# Load the tab-delimited export (first line is the header), let Spark infer
# the column types, and mark the DataFrame for caching.
# NOTE(review): inference types `code` as a double (it prints as 54.0, 63.0, ...),
# which would lose leading zeros if the codes are barcodes — confirm whether
# an explicit string type is needed.
df = (
    spark.read
    .options(delimiter="\t", header=True, inferSchema=True)
    .csv(csv_path)
    .cache()
)

In [6]:
# Preview the first 5 rows with truncation disabled so full cell values print.
df.show(5, False) # df was marked with .cache() when it was loaded

+-----+----------------------------------------------------------------------------------------------------------+-------+----------+-------------------+---------------+----------------------+----------------+--------------+---------------------+----------------------------------------+------------------------+------------+------------------+---------+--------------+------------+--------------+--------------+-----------------+--------------+-------------------------------------------+----------------------------------------------------+----------------------------------------------+-------+------------+----------+--------------------+-------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [7]:
# Number of partitions of the DataFrame's underlying RDD.
df.rdd.getNumPartitions()

82

In [8]:
# Get the total number of rows. The value is kept in `total_rows` because the
# null-percentage computation further down uses it as the denominator.
total_rows = df.count()
total_rows

3788985

In [9]:
# Print Spark's describe() summary table for the DataFrame.
df.describe().show()

+-------+--------------------+--------------------+---------+--------------------+--------------------+----------------+--------------------+------------+------------------------+--------------------+--------------------+---------+------------------+-------------+----------------------------+---------+-----------+---------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------------+--------------------+-----------+---------------+----------------------+----------------------+------------------------+------+--------------------+------------------+--------------------+--------------------+--------------+-------------------+--------------------+--------------------+-------------------------+---------------------------+------------+--------------------+-----------+--------------------+--------------------+--------------------+-----------------+-----------------+--------------------+-

In [9]:
# Number of columns in the freshly loaded DataFrame.
len(df.columns)

207

In [10]:
# Full list of column names, used to decide which groups of columns to drop.
df.columns

['code',
 'url',
 'creator',
 'created_t',
 'created_datetime',
 'last_modified_t',
 'last_modified_datetime',
 'last_modified_by',
 'last_updated_t',
 'last_updated_datetime',
 'product_name',
 'abbreviated_product_name',
 'generic_name',
 'quantity',
 'packaging',
 'packaging_tags',
 'packaging_en',
 'packaging_text',
 'brands',
 'brands_tags',
 'brands_en',
 'categories',
 'categories_tags',
 'categories_en',
 'origins',
 'origins_tags',
 'origins_en',
 'manufacturing_places',
 'manufacturing_places_tags',
 'labels',
 'labels_tags',
 'labels_en',
 'emb_codes',
 'emb_codes_tags',
 'first_packaging_code_geo',
 'cities',
 'cities_tags',
 'purchase_places',
 'stores',
 'countries',
 'countries_tags',
 'countries_en',
 'ingredients_text',
 'ingredients_tags',
 'ingredients_analysis_tags',
 'allergens',
 'allergens_en',
 'traces',
 'traces_tags',
 'traces_en',
 'serving_size',
 'serving_quantity',
 'no_nutrition_data',
 'additives_n',
 'additives',
 'additives_tags',
 'additives_en',
 'nu

# Drop metadata

In [15]:
# Columns that, judging by their names, record who touched a product entry
# and when; they are removed from the DataFrame in the next cell.
metadata_columns = [
    # creation
    "creator",
    "created_t",
    "created_datetime",
    # last modification
    "last_modified_t",
    "last_modified_datetime",
    "last_modified_by",
    # last update
    "last_updated_t",
    "last_updated_datetime",
    # last image
    "last_image_t",
    "last_image_datetime",
]
metadata_columns

['creator',
 'created_t',
 'created_datetime',
 'last_modified_t',
 'last_modified_datetime',
 'last_modified_by',
 'last_updated_t',
 'last_updated_datetime',
 'last_image_t',
 'last_image_datetime']

In [16]:
# Remove the metadata columns and print the first rows to eyeball the result.
# `df` is rebound, so every later cell sees the narrower DataFrame.
df = df.drop(*metadata_columns)
df.show()

+------+--------------------+--------------------+------------------------+--------------------+------------------+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------------+--------------------+------------+--------------------+--------------------+--------------------+------------+----------------+-----------------+-----------+---------+--------------------+--------------------+----------------+---------------

In [17]:
# Column count after dropping the 10 metadata columns.
len(df.columns)

197

# Drop columns with urls

In [18]:
# Every column whose name contains the substring 'url'.
# The loop variable is not called `col` because that name already refers to
# the function imported from pyspark.sql.functions.
url_cols = [name for name in df.columns if 'url' in name]
url_cols

['url',
 'image_url',
 'image_small_url',
 'image_ingredients_url',
 'image_ingredients_small_url',
 'image_nutrition_url',
 'image_nutrition_small_url']

In [19]:
# Remove the URL columns and print the first rows to eyeball the result.
df = df.drop(*url_cols)
df.show()

+------+--------------------+------------------------+--------------------+------------------+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------------+--------------------+------------+--------------------+--------------------+--------------------+------------+----------------+-----------------+-----------+---------+--------------------+--------------------+----------------+----------------+----------+--------

In [20]:
# Column count after dropping the URL columns.
len(df.columns)

190

# Drop states

In [21]:
# Every column whose name contains the substring 'states'.
# The loop variable is not called `col` because that name already refers to
# the function imported from pyspark.sql.functions.
states_cols = [name for name in df.columns if 'states' in name]
states_cols

['states', 'states_tags', 'states_en']

In [22]:
# Remove the states columns and print the first rows to eyeball the result.
df = df.drop(*states_cols)
df.show()

+------+--------------------+------------------------+--------------------+------------------+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------------+--------------------+------------+--------------------+--------------------+--------------------+------------+----------------+-----------------+-----------+---------+--------------------+--------------------+----------------+----------------+----------+--------

In [23]:
# Column count after dropping the states columns.
len(df.columns)

187

# Count null values

In [24]:
def _missing_pct(column_name):
    """Build the aggregate giving the percentage of rows where a column looks missing.

    A value counts as missing when it is null, NaN, an empty string, or
    contains one of the placeholder substrings 'None', 'NULL' or 'unknown'.
    The count is divided by the module-level `total_rows` and scaled to percent;
    the resulting Column is aliased to the column's own name.

    NOTE(review): the placeholder checks are substring matches, so a legitimate
    value that merely contains e.g. 'None' is counted as missing too — confirm
    this is intended.
    """
    value = col(column_name)
    looks_missing = (
        value.contains('None')
        | value.contains('NULL')
        | value.contains('unknown')
        | (value == '')
        | isnull(column_name)
        | isnan(column_name)
    )
    # count() only counts non-null inputs; when() without otherwise() yields
    # null for rows that do not match, so only "missing" rows are counted.
    missing_rows = count(when(looks_missing, column_name))
    return (missing_rows / lit(total_rows) * 100).alias(column_name)


# One aggregate per column, evaluated in a single select; the single result
# row is converted to a {column name: percentage} dict.
null_percentages = (
    df.select([_missing_pct(c) for c in df.columns])
    .collect()[0]
    .asDict()
)

In [25]:
# Display the per-column percentage of missing-looking values.
null_percentages

{'code': 0.0,
 'product_name': 6.373659436498165,
 'abbreviated_product_name': 99.24869061239356,
 'generic_name': 95.42861742656675,
 'quantity': 67.10047677676212,
 'packaging': 89.96068868047774,
 'packaging_tags': 89.96583517749477,
 'packaging_en': 89.95535743741398,
 'packaging_text': 99.2469223288031,
 'brands': 40.238533538665365,
 'brands_tags': 40.246715149307796,
 'brands_en': 40.24404952777591,
 'categories': 56.738678036466226,
 'categories_tags': 56.7409213813198,
 'categories_en': 56.73957537440766,
 'origins': 96.03746121982536,
 'origins_tags': 96.05179223459581,
 'origins_en': 96.04516776920468,
 'manufacturing_places': 94.49749735087364,
 'manufacturing_places_tags': 94.4972334279497,
 'labels': 71.436466494325,
 'labels_tags': 71.44462171267503,
 'labels_en': 71.43670402495655,
 'emb_codes': 94.89456938995535,
 'emb_codes_tags': 94.89636406583821,
 'first_packaging_code_geo': 97.57544566684744,
 'cities': 100.0,
 'cities_tags': 97.38531559243438,
 'purchase_places':

In [27]:
# Columns whose share of missing-looking values (in percent) is strictly above
# this threshold are treated as too sparse to keep. Named here instead of
# being buried in the comprehension so it is easy to find and tune.
NULL_THRESHOLD_PERCENT = 50

# Names of the too-sparse columns, in the order they appear in null_percentages.
high_null_cols = [
    name
    for name, pct in null_percentages.items()
    if pct > NULL_THRESHOLD_PERCENT
]

high_null_cols

['abbreviated_product_name',
 'generic_name',
 'quantity',
 'packaging',
 'packaging_tags',
 'packaging_en',
 'packaging_text',
 'categories',
 'categories_tags',
 'categories_en',
 'origins',
 'origins_tags',
 'origins_en',
 'manufacturing_places',
 'manufacturing_places_tags',
 'labels',
 'labels_tags',
 'labels_en',
 'emb_codes',
 'emb_codes_tags',
 'first_packaging_code_geo',
 'cities',
 'cities_tags',
 'purchase_places',
 'stores',
 'ingredients_text',
 'ingredients_tags',
 'ingredients_analysis_tags',
 'allergens',
 'allergens_en',
 'traces',
 'traces_tags',
 'traces_en',
 'serving_size',
 'serving_quantity',
 'no_nutrition_data',
 'additives_n',
 'additives',
 'additives_tags',
 'additives_en',
 'nutriscore_score',
 'nutriscore_grade',
 'nova_group',
 'pnns_groups_1',
 'pnns_groups_2',
 'food_groups',
 'food_groups_tags',
 'food_groups_en',
 'brand_owner',
 'environmental_score_score',
 'environmental_score_grade',
 'nutrient_levels_tags',
 'product_quantity',
 'owner',
 'data_q

In [28]:
# Remove the columns flagged as mostly missing and print the first rows.
df = df.drop(*high_null_cols)
df.show()

+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+----------------+-----------+--------+------------------+------------------+-----------+-------------+---------+-----------+
|  code|        product_name|              brands|         brands_tags|           brands_en|           countries|      countries_tags|        countries_en|completeness|energy-kcal_100g|energy_100g|fat_100g|saturated-fat_100g|carbohydrates_100g|sugars_100g|proteins_100g|salt_100g|sodium_100g|
+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+----------------+-----------+--------+------------------+------------------+-----------+-------------+---------+-----------+
|  54.0|Limonade artisana...|                NULL|                NULL|                NULL|               en:fr|        

In [29]:
# Column count after dropping the mostly-missing columns.
len(df.columns)

18

In [30]:
# Names of the columns that survived the missing-value filter.
df.columns

['code',
 'product_name',
 'brands',
 'brands_tags',
 'brands_en',
 'countries',
 'countries_tags',
 'countries_en',
 'completeness',
 'energy-kcal_100g',
 'energy_100g',
 'fat_100g',
 'saturated-fat_100g',
 'carbohydrates_100g',
 'sugars_100g',
 'proteins_100g',
 'salt_100g',
 'sodium_100g']

In [32]:
# Inspect each remaining column's name, Spark data type and nullability.
df.schema.fields

[StructField('code', DoubleType(), True),
 StructField('product_name', StringType(), True),
 StructField('brands', StringType(), True),
 StructField('brands_tags', StringType(), True),
 StructField('brands_en', StringType(), True),
 StructField('countries', StringType(), True),
 StructField('countries_tags', StringType(), True),
 StructField('countries_en', StringType(), True),
 StructField('completeness', DoubleType(), True),
 StructField('energy-kcal_100g', DoubleType(), True),
 StructField('energy_100g', DoubleType(), True),
 StructField('fat_100g', DoubleType(), True),
 StructField('saturated-fat_100g', DoubleType(), True),
 StructField('carbohydrates_100g', DoubleType(), True),
 StructField('sugars_100g', DoubleType(), True),
 StructField('proteins_100g', DoubleType(), True),
 StructField('salt_100g', DoubleType(), True),
 StructField('sodium_100g', DoubleType(), True)]

In [34]:
def _columns_of_type(spark_type):
    """Return the names of df's columns whose data type is an instance of `spark_type`, in schema order."""
    return [
        field.name
        for field in df.schema.fields
        if isinstance(field.dataType, spark_type)
    ]


# Split the remaining columns by inferred type: strings vs. doubles.
str_cols = _columns_of_type(StringType)
dbl_cols = _columns_of_type(DoubleType)
dbl_cols

['code',
 'completeness',
 'energy-kcal_100g',
 'energy_100g',
 'fat_100g',
 'saturated-fat_100g',
 'carbohydrates_100g',
 'sugars_100g',
 'proteins_100g',
 'salt_100g',
 'sodium_100g']

In [35]:
# Missing-value percentages restricted to the double-typed columns,
# listed in the same order as in null_percentages.
{
    name: null_percentages[name]
    for name in null_percentages
    if name in dbl_cols
}

{'code': 0.0,
 'completeness': 0.00675642685310182,
 'energy-kcal_100g': 27.088283537675657,
 'energy_100g': 25.795377917832873,
 'fat_100g': 26.60865640798261,
 'saturated-fat_100g': 30.320072526019505,
 'carbohydrates_100g': 26.490682860977284,
 'sugars_100g': 29.268946696806665,
 'proteins_100g': 26.47413489364566,
 'salt_100g': 35.93896518460748,
 'sodium_100g': 35.93901796919228}

# Nutrition 

- `completeness` — it is unclear what this column measures
- `energy-kcal_100g` has more missing values than `energy_100g` (27.1% vs 25.8%)
- likewise, `saturated-fat_100g` has more missing values than `fat_100g` (30.3% vs 26.6%)

Carbohydrates, Proteins, Fats, Minerals, Dietary fibre

In [37]:
# Columns kept for the nutrition analysis: the product code plus the
# per-100g nutrient values. `energy-kcal_100g`, `saturated-fat_100g` and
# `completeness` are intentionally left out, as are the text columns.
useful_cols = [
    "code",
    "energy_100g",
    "fat_100g",
    "carbohydrates_100g",
    "sugars_100g",
    "proteins_100g",
    "salt_100g",
    "sodium_100g",
]

In [38]:
# Narrow the DataFrame to the chosen columns and print the first rows.
df = df.select(*useful_cols)
df.show()

+------+-----------+--------+------------------+-----------+-------------+---------+-----------+
|  code|energy_100g|fat_100g|carbohydrates_100g|sugars_100g|proteins_100g|salt_100g|sodium_100g|
+------+-----------+--------+------------------+-----------+-------------+---------+-----------+
|  54.0|       NULL|    NULL|              NULL|       NULL|         NULL|     NULL|       NULL|
|  63.0|     1389.0|    25.0|               3.0|        1.0|         23.0|      1.2|       0.48|
| 114.0|     2415.0|    44.0|              30.0|       27.0|          7.1|    0.025|       0.01|
|   1.0|       NULL|    NULL|              NULL|       NULL|         NULL|     NULL|       NULL|
| 105.0|       NULL|    NULL|              NULL|       NULL|         NULL|     NULL|       NULL|
|   2.0|      392.0|     5.6|               7.5|        6.2|          2.3|      0.4|       0.16|
|   3.0|       NULL|    NULL|              NULL|       NULL|         NULL|     NULL|       NULL|
|   4.0|     2401.0|     0.0| 

# Drop rows that contain at least one null value

In [None]:
# Keep only rows in which every column has a value: with how="any" a single
# null is enough to discard the row.
df = df.dropna(how="any")
df.show()

+-----+-----------+--------+------------------+-----------+-------------+---------+-----------+
| code|energy_100g|fat_100g|carbohydrates_100g|sugars_100g|proteins_100g|salt_100g|sodium_100g|
+-----+-----------+--------+------------------+-----------+-------------+---------+-----------+
| 63.0|     1389.0|    25.0|               3.0|        1.0|         23.0|      1.2|       0.48|
|114.0|     2415.0|    44.0|              30.0|       27.0|          7.1|    0.025|       0.01|
|  2.0|      392.0|     5.6|               7.5|        6.2|          2.3|      0.4|       0.16|
|  4.0|     2401.0|     0.0|              26.0|       15.0|          1.0|      0.0|        0.0|
|475.0|      929.0|     0.0|              50.8|       50.6|          0.8|      0.0|        0.0|
|  5.0|     1620.0|     1.6|               6.7|        1.3|         82.0|      1.7|       0.68|
|  6.0|      962.0|    11.0|              25.0|       0.98|         22.0|     0.95|       0.38|
|  7.0|        4.0|     1.0|            

In [44]:
# Row count after discarding rows that contain nulls.
df.count()

2335084

In [45]:
# Row count of the original DataFrame (computed before any filtering), for comparison.
total_rows

3788985