In [1]:
from pyspark.sql import SparkSession

In [2]:
# Build (or reuse, via getOrCreate) the Spark session used by the whole notebook.
# NOTE(review): the master is local, so confirm that the executor-memory setting
# has any effect here; it is kept unchanged.
spark = (
    SparkSession.builder
    .appName("openfood-kmeans")
    .master("local[*]")
    .config("spark.driver.cores", 4)
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "16g")
    .getOrCreate()
)

23/06/12 15:47:31 WARN Utils: Your hostname, Daniils-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.110 instead (on interface en0)
23/06/12 15:47:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/12 15:47:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the tab-separated products export. The first line is the header and
# column types are inferred from the data.
df = spark.read.csv(
    "../data/en.openfoodfacts.org.products.csv",
    sep="\t",
    header=True,
    inferSchema=True,
)

                                                                                

In [4]:
# Show the inferred column names and types as a tree, to sanity-check the load.
df.printSchema()

root
 |-- code: double (nullable = true)
 |-- url: string (nullable = true)
 |-- creator: string (nullable = true)
 |-- created_t: integer (nullable = true)
 |-- created_datetime: timestamp (nullable = true)
 |-- last_modified_t: integer (nullable = true)
 |-- last_modified_datetime: timestamp (nullable = true)
 |-- last_modified_by: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- abbreviated_product_name: string (nullable = true)
 |-- generic_name: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- packaging: string (nullable = true)
 |-- packaging_tags: string (nullable = true)
 |-- packaging_en: string (nullable = true)
 |-- packaging_text: string (nullable = true)
 |-- brands: string (nullable = true)
 |-- brands_tags: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- categories_tags: string (nullable = true)
 |-- categories_en: string (nullable = true)
 |-- origins: string (nullable = true)
 |-- origins_tags: strin

In [5]:
# Number of columns in the loaded dataset.
len(df.schema.names)

203

In [6]:
import pyspark.sql.functions as F

In [7]:
# Collect every column whose name ends in "_100g"; these become the clustering
# features further down. The bare name on the last line displays the list.
gramm_cols = [
    column_name
    for column_name in df.columns
    if column_name.endswith("_100g")
]
gramm_cols

['energy-kj_100g',
 'energy-kcal_100g',
 'energy_100g',
 'energy-from-fat_100g',
 'fat_100g',
 'saturated-fat_100g',
 'butyric-acid_100g',
 'caproic-acid_100g',
 'caprylic-acid_100g',
 'capric-acid_100g',
 'lauric-acid_100g',
 'myristic-acid_100g',
 'palmitic-acid_100g',
 'stearic-acid_100g',
 'arachidic-acid_100g',
 'behenic-acid_100g',
 'lignoceric-acid_100g',
 'cerotic-acid_100g',
 'montanic-acid_100g',
 'melissic-acid_100g',
 'unsaturated-fat_100g',
 'monounsaturated-fat_100g',
 'polyunsaturated-fat_100g',
 'omega-3-fat_100g',
 'alpha-linolenic-acid_100g',
 'eicosapentaenoic-acid_100g',
 'docosahexaenoic-acid_100g',
 'omega-6-fat_100g',
 'linoleic-acid_100g',
 'arachidonic-acid_100g',
 'gamma-linolenic-acid_100g',
 'dihomo-gamma-linolenic-acid_100g',
 'omega-9-fat_100g',
 'oleic-acid_100g',
 'elaidic-acid_100g',
 'gondoic-acid_100g',
 'mead-acid_100g',
 'erucic-acid_100g',
 'nervonic-acid_100g',
 'trans-fat_100g',
 'cholesterol_100g',
 'carbohydrates_100g',
 'sugars_100g',
 'added-

In [8]:
# Keep only the product name and the "_100g" columns, then preview three rows.
df = df.select("product_name", *gramm_cols)
df.show(3)

23/06/12 15:47:42 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------------+--------------+----------------+-----------+--------------------+--------+------------------+-----------------+-----------------+------------------+----------------+----------------+------------------+------------------+-----------------+-------------------+-----------------+--------------------+-----------------+------------------+------------------+--------------------+------------------------+------------------------+----------------+-------------------------+--------------------------+-------------------------+----------------+------------------+---------------------+-------------------------+--------------------------------+----------------+---------------+-----------------+-----------------+--------------+----------------+------------------+--------------+----------------+------------------+-----------+-----------------+------------+------------+-------------+------------+------------+------------------+-----------+------------+---------------+----------+---------

In [9]:
# Replace missing values with 0.0 so that every feature column is populated
# before the columns are assembled into vectors.
df = df.fillna(0.0)

In [10]:
import pyspark.ml.feature as ml

# Pack all "_100g" columns into a single vector column for Spark ML.
assembler = ml.VectorAssembler(inputCols=gramm_cols, outputCol="vector_features")
# Drop any `vector_features` column left by an earlier run of this cell first:
# the assembler refuses to transform a frame that already has its output column,
# which made the cell fail when re-run. drop() is a no-op when the column is
# absent, so the first run behaves exactly as before.
df = assembler.transform(df.drop("vector_features"))

In [11]:
# Peek at the first three assembled vectors without truncating the cell text.
df.select("vector_features").show(3, truncate=False)

+-------------------------------------------------------------------+
|vector_features                                                    |
+-------------------------------------------------------------------+
|(118,[],[])                                                        |
|(118,[1,2,4,5,41,42,53,56],[165.0,690.0,2.0,2.0,65.0,12.6,3.0,1.5])|
|(118,[4,5,41,42,56,60,62],[1.4,0.9,9.8,9.8,2.7,0.1,0.04])          |
+-------------------------------------------------------------------+
only showing top 3 rows



In [12]:
# Standardise the assembled vectors into the `features` column, which is the
# default input column name for the Spark ML estimators.
scaler = ml.StandardScaler(inputCol="vector_features", outputCol="features")

# Drop any `features` column left by an earlier run of this cell: the scaler
# model refuses to transform a frame that already has its output column, which
# made the cell fail when re-run. drop() is a no-op when the column is absent.
df = df.drop("features")
scalerModel = scaler.fit(df)
df = scalerModel.transform(df)

# NOTE(review): the scaled values displayed under this cell are as small as
# ~1e-41, which suggests a few extreme values dominate the per-column standard
# deviation. Check the raw columns for outliers before trusting the clustering.
df.select("features").show(n=3, truncate=False)



+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                                |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(118,[],[])                                                                                                                                                                                             |
|(118,[1,2,4,5,41,42,53,56],[5.350725664479935E-41,5.34793543729527E-41,3.093835477799566E-11,0.0011221673389586627,0.6730276959030767,2.1440280024654745E-10,5.201962214362218E-44,0.020335

                                                                                

In [13]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

import tqdm

# Grid-search k-means over the number of clusters and the iteration cap, scoring
# each fitted model with ClusteringEvaluator (higher is better).
metric = ClusteringEvaluator()

# Every fit(), transform() and evaluate() below launches Spark jobs over `df`.
# Without caching, each of them re-reads the CSV and re-applies the null fill,
# the assembler and the scaler. Caching materialises that lineage once and
# reuses it for all runs; the computed scores are unchanged.
# Call df.unpersist() once the cached data is no longer needed.
df.cache()

results = []  # one (k, max_iter, score) tuple per run
for k in tqdm.tqdm(range(2, 9, 2), desc="k"):
    for max_iter in [10, 20]:
        # Fixed seed keeps the centroid initialisation reproducible across runs.
        kmeans = KMeans().setK(k).setSeed(42).setMaxIter(max_iter)
        cluster_model = kmeans.fit(df)
        predictions = cluster_model.transform(df)
        performance_measure = metric.evaluate(predictions)
        results.append((k, max_iter, performance_measure))

23/06/12 15:48:16 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/06/12 15:48:16 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
k: 100%|██████████| 4/4 [05:54<00:00, 88.74s/it]                                


In [14]:
# Pick the run with the highest evaluator score (third tuple element).
# max() keeps the earliest run when several share the top score.
best_params = max(
    results,
    key=lambda run: run[2],
)
print(f"Best params for k-means (k, max_iter, clustering_score): {best_params}")

Best params for k-means (k, max_iter, clustering_score): (2, 10, 0.9999733656649217)
