In [127]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [128]:
spark  = (
    SparkSession.builder.appName("Recipes ML model - Are you a dessert?")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

In [129]:
food = spark.read.csv(
    "../Data/recipes/epi_r.csv", inferSchema=True, header=True
)

In [130]:
def sanitize_colum_names(name):
    """The function dropes all the speshel characters and whitespaces
    and replaces them with the alphanumeric characters or the underscore and
    we only keep alpha numeric characters"""
    
    answer = name
    for i, j in ((" ", "_"), ("-", "_"), ("/", "_"), ("&", "and")):
        answer = answer.replace(i, j)
    return "".join(
        char
        for char in answer
        if char.isalpha() or char.isdigit() or char == "_"
    )

In [131]:
food = food.toDF(*[sanitize_colum_names(name) for name in food.columns])

In [132]:
import pandas as pd

pd.set_option("display.max_rows", 1000)

is_binary = food.agg(*[(F.size(F.collect_set(x))==2).alias(x) for x in food.columns]).toPandas()

In [133]:
IDENTIFIERS = ["title"]
CONTINUOUS_COLUMNS = [
    "rating",
    "calories",
    "protein",
    "fat",
    "sodium"]
TARGET_COLUMN = "desert"
BINARY_COLUMNS = [
    x for x in food.columns
    if x not in IDENTIFIERS
    and x not in CONTINUOUS_COLUMNS
    and x not in TARGET_COLUMN
]

In [134]:
food =  food.dropna(how="all", subset = [x for x in food.columns if x not in IDENTIFIERS])
food = food.fillna(0.0, subset=BINARY_COLUMNS)

In [135]:
from typing import Optional

In [136]:
@F.udf(T.BooleanType())
def is_a_nmber(value) -> bool:
    if value is None: #that's why we use optinoal, if there is no value return True
        return True
    try:
        float(value) #check if the value can be trasfered to float
        return True
    except ValueError:
        return False #return False if not

In [137]:
# food.select(*CONTINUOUS_COLUMNS).show()
# food.select(F.col('rating')).show()
# food.select("rating").show()

In [138]:
for column in ["rating", "calories"]:
    food = food.where(F.col("rating").cast("double").isNotNull())
    food = food.withColumn(column, F.col(column).cast(T.DoubleType()))
    
food.summary().show()

+-------+--------------------+-----------------+------------------+------------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+--------------------+----

In [139]:
maximum = {
    "calories": 3203.0,
    "protein": 173.0,
    "fat": 207.0,
    "sodium": 5661.0
}

for k,v in maximum.items():
    food = food.withColumn(k, F.when(F.isnull(F.col(k)), F.col(k)).otherwise(F.least(F.col(k), F.lit(v))))

In [140]:
inst_sum_of_binary_columns = [F.sum(F.col(x)).alias(x) for x in BINARY_COLUMNS]

sum_of_binary_columns = (food.select(inst_sum_of_binary_columns).head().asDict())

num_rows = food.count()

to_rare_features=[
    k for k, v in sum_of_binary_columns.items()
    if v < 10 or v > (num_rows-10)
]

BINARY_COLUMNS = list(set(BINARY_COLUMNS)- set(to_rare_features))

In [141]:
food = food.withColumn(
    "protein_ratio", F.col("protein") *  4 / F.col("calories")
).withColumn(
    "fat_ratio", F.col("fat") * 9 / F.col("calories")
)

food = food.fillna(0.0, subset=["protein_ratio", "fat_ratio"])

CONTINUOUS_COLUMNS += ["protein_ratio", "fat_ratio"]

In [142]:
import pyspark.ml.feature as MF
from pyspark.ml.stat import Correlation

continuous_features = MF.VectorAssembler(inputCols=CONTINUOUS_COLUMNS, outputCol="continuous_features")
vector_variables = food.select(CONTINUOUS_COLUMNS)
for x in CONTINUOUS_COLUMNS:
    vector_variables = vector_variables.where(~F.isnull(F.col(x)))

vector_variables = continuous_features.transform(vector_variables)

correlation = Correlation.corr(vector_variables,"continuous_features")

correlation_array = correlation.head()[0].toArray()
correlation_pd = pd.DataFrame(correlation_array,
                              index=CONTINUOUS_COLUMNS,
                              columns=CONTINUOUS_COLUMNS)

In [143]:

OLD_COLS = ["calories", "protein", "fat", "sodium"]
NEW_COLS = ["calories_i", "protein_i", "fat_i", "soium_i"]

imputer = MF.Imputer(strategy="mean", inputCols=OLD_COLS, outputCols=NEW_COLS)

imputer_model = imputer.fit(food)
food_imputed = imputer_model.transform(food)

CONTINUOUS_COLUMNS = list(set(CONTINUOUS_COLUMNS) - set(OLD_COLS)) + NEW_COLS

CONTINUOUS_NB = [x for x in CONTINUOUS_COLUMNS if "ratio" not in x]

continuous_assembler = MF.VectorAssembler(inputCols=CONTINUOUS_NB, outputCol="continuous")

food_features = continuous_assembler.transform(food_imputed)

continuous_scalaer = MF.MinMaxScaler(inputCol="continuous", outputCol="continuous_scaled")

food_features = continuous_scalaer.fit(food_features).transform(food_features)

In [144]:
food_features.select("continuous_scaled").show(5, 40)

+----------------------------------------+
|                       continuous_scaled|
+----------------------------------------+
|[0.5,0.13300031220730565,0.1734104046...|
|[0.875,0.12581954417733376,0.10404624...|
|[0.75,0.051514205432407124,0.03468208...|
|[1.0,0.14846151090620935,0.1163728223...|
|[0.625,0.17077739619107088,0.11560693...|
+----------------------------------------+
only showing top 5 rows



In [145]:
from pyspark.ml import Pipeline
import pyspark.ml.feature as MF

imputer = MF.Imputer(strategy="mean", 
                     inputCols=["calories", "protein", "fat", "sodium"],
                     outputCols=["calories_i", "protein_i", "fat_i", "sodium_i"])

continuous_assembler = MF.VectorAssembler(inputCols=["calories_i", "protein_i", "fat_i", "sodium_i"],
                                          outputCol="continuous")

continuous_scaler = MF.MinMaxScaler(inputCol="continuous", 
                                     outputCol="continuous_scalaed")

food_pipeline = Pipeline(stages=[imputer, continuous_assembler, continuous_scaler])

preml_assembler = MF.VectorAssembler(inputCols=BINARY_COLUMNS
                                     + ["continuous_scalaed"]
                                     + ["protein_ratio", "fat_ratio"],
                                     outputCol="features")

food_pipeline.setStages([imputer, continuous_assembler, continuous_scaler, preml_assembler])

food_features = food_pipeline.fit(food).transform(food)

In [146]:
food_features.select("title", "dessert", "features").show(5,False)

+-------------------------------------------+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|title                                      |dessert|features                                                                                                                                                                                                                                     |
+-------------------------------------------+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Lentil, Apple, and Turkey Wrap             |0.0    |(513,[63,68,78,83,88,156,220,288,316,336,412,507,508,509,510,511,512],[

In [147]:
# print(food_features.schema["features"])
# food_features.schema["features"].metadata

In [None]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="dessert", predictionCol="prediction")

food_pipeline.setStages([imputer, continuous_assembler, continuous_scaler, preml_assembler, lr])

train, test = food.randomSplit([0.7, 0.3], 10)
train.cache()

lr_model = food_pipeline.fit(train)

results = lr_model.transform(test)

results.groupby("dessert").pivot("prediction").count().show()