In [1]:
import pandas as pd

In [58]:
# Load the openfood CSV extract into pandas for quick inspection.
df = pd.read_csv(filepath_or_buffer="../data/openfood.csv")

In [60]:
# Display the energy column coerced to float (the result is shown, not stored).
# NOTE(review): every other cell, and the Spark schema further down, use the
# column name 'energy-kcal_100g'; this cell's recorded output shows the CSV
# loaded in In[58] had 'energy_kcal_100g', so the file on disk appears to have
# been rewritten by a cell that is no longer in the notebook. Confirm which
# spelling is current before relying on either.
df.loc[:, "energy_kcal_100g"].astype("float")

0       85.0
1      571.0
2      167.0
3      250.0
4      153.0
       ...  
995    467.0
996    192.0
997     48.0
998     56.0
999    536.0
Name: energy_kcal_100g, Length: 1000, dtype: float64

In [39]:
# WARNING: this overwrites the very file that the pandas and Spark load cells
# read, so the raw input cannot be recovered and re-running the notebook is
# not idempotent. Consider writing derived data to a separate path instead.
df.to_csv(
    path_or_buf="../data/openfood.csv",
    encoding="utf-8",
    index=False,
)

In [50]:
# Spot-check a single energy value by position (row 7).
df['energy-kcal_100g'].iat[7]

165.0

In [54]:
# How far the largest product code (the `df['code'].max()` value) lies above
# 2**31, i.e. whether `code` can fit a signed 32-bit integer.
# In Python `^` is bitwise XOR, not exponentiation, and it binds looser than
# `-`: the former `x - 2^31` evaluated `(x - 2) ^ 31`, which is why it showed
# a result *larger* than x. Use `**` for powers.
max_code_minus_2_pow_31 = 10076064010467 - 2**31
max_code_minus_2_pow_31

In [53]:
# Largest product code in the frame; useful to judge whether `code` fits a
# 32-bit integer.
df["code"].agg("max")

10076064010467

In [42]:
# Show the energy column; pandas truncates the display to head/tail rows.
df.loc[:, 'energy-kcal_100g']

0       85.0
1      571.0
2      167.0
3      250.0
4      153.0
       ...  
995    467.0
996    192.0
997     48.0
998     56.0
999    536.0
Name: energy-kcal_100g, Length: 1000, dtype: float64

In [41]:
# Cast check for the energy column. The result is displayed, not assigned,
# and the column is already float64 (see the dtypes listing).
df['energy-kcal_100g'].astype(dtype="float")

0       85.0
1      571.0
2      167.0
3      250.0
4      153.0
       ...  
995    467.0
996    192.0
997     48.0
998     56.0
999    536.0
Name: energy-kcal_100g, Length: 1000, dtype: float64

In [36]:
# Column dtypes, left as the cell's last expression so Jupyter renders it with
# its rich display instead of plain print() text.
df.dtypes

code                         int64
product_name                object
energy-kcal_100g           float64
energy_100g                float64
fat_100g                   float64
saturated-fat_100g         float64
trans-fat_100g             float64
cholesterol_100g           float64
carbohydrates_100g         float64
sugars_100g                float64
fiber_100g                 float64
proteins_100g              float64
salt_100g                  float64
sodium_100g                float64
calcium_100g               float64
iron_100g                  float64
nutrition-score-fr_100g    float64
categories_en               object
dtype: object


In [8]:
# import findspark
# findspark.init()

import pyspark
from pyspark.sql import SparkSession

In [9]:
# Local Spark session (master "local" = a single worker thread) for the
# clustering experiment. The former app name "WordCountInline" was a leftover
# from a copy-pasted example; name the job so it is identifiable in the Spark
# UI and logs.
spark = (
    SparkSession.builder
    .master("local")
    .appName("OpenFoodClustering")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/21 12:09:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [77]:
# `spark.stop()` used to sit here. It had been executed out of order (In[77],
# from an earlier session), and at this position a Restart & Run All stops the
# SparkSession before any of the Spark cells below can use it. Stop the session
# only after the last Spark cell of the notebook.


In [10]:
import json

In [11]:
# Feature-group configuration: the column-selection cell reads its "id",
# "numeric" and "categorical" lists. Read it explicitly as UTF-8 (the JSON
# standard encoding) rather than relying on the platform's locale default.
with open('../configs/features.json', "r", encoding="utf-8") as columns_file:
    features = json.load(columns_file)

In [12]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql import types, functions

In [13]:
# Read the same CSV with Spark, taking column names from the header row and
# letting Spark infer column types (this costs an extra pass over the file).
dataset: pyspark.sql.DataFrame = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('../data/openfood.csv')
)

In [14]:
# Output column name for the scaled feature vector.
FEATURES_COLUMN = "scaled_feature"

# Column groups from features.json.
id_columns = features["id"]
feature_numeric = features["numeric"]


def _float_column(name):
    """Return column `name` cast to float, keeping its original name."""
    return functions.col(name).cast("float").alias(name)


numeric_columns = [_float_column(name) for name in feature_numeric]
cat_columns = features["categorical"]

# Keep ids, float-cast numerics and categoricals, in that order.
all_columns = id_columns + numeric_columns + cat_columns
df_with_selected_columns = dataset.select(*all_columns)

# Pack the numeric features into a single vector column named "features".
# NOTE(review): VectorAssembler's default handleInvalid="error" fails on nulls
# once the data is materialised (e.g. values the float cast could not parse);
# confirm the data is null-free or choose an explicit handleInvalid policy.
vec_assembler = VectorAssembler(outputCol="features", inputCols=feature_numeric)
df_with_features = vec_assembler.transform(df_with_selected_columns)

In [15]:
# Scale each assembled feature to unit standard deviation without
# mean-centering. These are Spark's defaults, spelled out so the choice is
# visible.
scaler = StandardScaler(
    inputCol="features",
    outputCol=FEATURES_COLUMN,
    withMean=False,
    withStd=True,
)

In [16]:
# Fit the scaler on the assembled feature vectors.
scaler_model = scaler.fit(dataset=df_with_features)

24/05/21 12:09:21 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [17]:
# Add the scaled vector column next to the assembled "features" column.
df_scaled_features = scaler_model.transform(dataset=df_with_features)

In [19]:
# NOTE: this repeats the three scaler cells above (In[15]-In[17]) and can be
# deleted once those are confirmed to run first. It uses FEATURES_COLUMN
# rather than a second hard-coded "scaled_feature" literal, so the output
# column name cannot drift from the one defined with the column selection.
scaler = StandardScaler(inputCol="features", outputCol=FEATURES_COLUMN)
scaler_model = scaler.fit(df_with_features)
df_scaled_features = scaler_model.transform(df_with_features)

24/05/21 12:09:34 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [20]:
from pyspark.ml import clustering, evaluation

In [22]:
# K-means on the scaled feature vectors.
# PySpark's default KMeans seed is derived from Python's hash() of the class
# name, and Python randomizes str hashes per process unless PYTHONHASHSEED is
# set, so cluster assignments can change between kernel restarts. Fix the seed
# explicitly to make the clustering reproducible.
N_CLUSTERS = 5
KMEANS_MAX_ITER = 10
KMEANS_SEED = 42
model = clustering.KMeans(
    featuresCol=FEATURES_COLUMN,
    k=N_CLUSTERS,
    maxIter=KMEANS_MAX_ITER,
    seed=KMEANS_SEED,
)


In [23]:
# Fit K-means to the scaled feature vectors.
model_fit = model.fit(dataset=df_scaled_features)

24/05/21 12:11:09 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/05/21 12:11:09 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


In [26]:
# Attach the fitted model's cluster assignment ("prediction") to every row.
output = model_fit.transform(dataset=df_scaled_features)

In [32]:
# Cast the cluster id column to int. Re-running this cell is harmless because
# it only re-applies the same cast.
prediction_as_int = output["prediction"].cast("int")
output = output.withColumn("prediction", prediction_as_int)

In [34]:
# Inspect the final schema: the selected id/numeric/categorical columns, the
# assembled and scaled feature vectors, and the integer cluster id.
output.printSchema()

root
 |-- code: long (nullable = true)
 |-- product_name: string (nullable = true)
 |-- energy-kcal_100g: float (nullable = true)
 |-- energy_100g: float (nullable = true)
 |-- fat_100g: float (nullable = true)
 |-- saturated-fat_100g: float (nullable = true)
 |-- trans-fat_100g: float (nullable = true)
 |-- cholesterol_100g: float (nullable = true)
 |-- carbohydrates_100g: float (nullable = true)
 |-- sugars_100g: float (nullable = true)
 |-- fiber_100g: float (nullable = true)
 |-- proteins_100g: float (nullable = true)
 |-- salt_100g: float (nullable = true)
 |-- sodium_100g: float (nullable = true)
 |-- calcium_100g: float (nullable = true)
 |-- iron_100g: float (nullable = true)
 |-- nutrition-score-fr_100g: float (nullable = true)
 |-- categories_en: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaled_feature: vector (nullable = true)
 |-- prediction: integer (nullable = false)

