# Data preparation for ML
And fit linear regression model

In [None]:
from pyspark.sql import SparkSession
import re

spark=SparkSession.builder.appName('recipes').getOrCreate()

df=spark.read.csv('./data/7_epi_r.csv',header=True)
df.show()

In [43]:
def clean_name(name):
    return re.sub('[^a-zA-Z0-9]','_',name)

cleand_names=[ clean_name(col) for col in df.columns]
df=df.toDF(*cleand_names)


In [None]:
# Cleanning the cakeweek and wastless columns.
df=df.filter((df['_cakeweek'].isNotNull() )  | (df['_cakeweek']>1 ) | (df['_cakeweek'] > 1 ) | (df['_cakeweek'] < 0))
df=df.filter((df['_wasteless'].isNotNull() )  | (df['_wasteless']>1 ) | (df['_wasteless'] > 1 ) | (df['_wasteless'] < 0))
df.show()

In [45]:
IDENTIFIERS = ["title"] 
CONTINUOUS_COLUMNS = [ "rating", "calories", "protein", "fat", "sodium", ] 
TARGET_COLUMN = ["dessert"] 
BINARY_COLUMNS = [ x for x in df.columns if x not in CONTINUOUS_COLUMNS and x not in TARGET_COLUMN and x not in IDENTIFIERS]

In [46]:
# Drop the rows with null values at the target colunn.
df=df.dropna(subset=TARGET_COLUMN)
# Drop the rows with null values as needed.
df=df.dropna(subset= CONTINUOUS_COLUMNS + BINARY_COLUMNS, how ='all')

# Fill the null binary values with 0.
df=df.fillna(subset=BINARY_COLUMNS,value=0)


In [47]:
# Convert the continuous values to int type
for col in df.columns:
    df=df.withColumn(col,df[col].cast('int'))

df=df.fillna(subset=CONTINUOUS_COLUMNS,value=0)


In [48]:
# Calculate the threshold for the given percentage
total_rows = df.count()
threshold_percentage = 0.9995  # 99.95%
threshold_row_count = int(total_rows * threshold_percentage)

# Identify columns to drop
columns_to_drop = []
for column in df.columns:
    value_counts = df.groupBy(column).count().collect()
    same_value_count = sum(1 for count in value_counts if count["count"] >= threshold_row_count)
    if same_value_count == 1:  # If only one value appears frequently, drop the column
        columns_to_drop.append(column)

# Drop identified columns
df_cleaned = df.drop(*columns_to_drop)


In [13]:
# Describe the continuous columms to detect the outliers.
for col in CONTINUOUS_COLUMNS:
    df.select(col).summary().show()

Removing outliers

<!-- Removing outliers -->

In [50]:
from pyspark.sql.functions import col, when

high_percentile_values = df.approxQuantile(CONTINUOUS_COLUMNS, [0.99], 0.01)[0]
low_percentile_values = df.approxQuantile(CONTINUOUS_COLUMNS, [0.11], 0.99)[0]



for col_name, high_val, low_val in zip(CONTINUOUS_COLUMNS, high_percentile_values, low_percentile_values):
    df = df.withColumn(col_name, 
                       when(col(col_name) > high_val, high_val)
                       .when(col(col_name) < low_val, low_val)
                       .otherwise(col(col_name)))

df.show()


Impute null values & Normalize the data using pipe

<!-- Imputing null values -->

In [None]:
from pyspark.ml.feature import Imputer,VectorAssembler,MinMaxScaler
from pyspark.ml import Pipeline

# Create an imputer to fill the null values wiht the avarage.
imputer = Imputer(
    inputCols=CONTINUOUS_COLUMNS,
    outputCols=[col_name  for col_name in CONTINUOUS_COLUMNS]
)

# In order to scale the data we have to assemble the continuous features to one column.
continuous_feature_assembler = VectorAssembler(
    inputCols=CONTINUOUS_COLUMNS,
    outputCol="continues_features"
)

# Assemble the binary columns
binary_features_assembler=VectorAssembler(
    inputCols=BINARY_COLUMNS,
    outputCol='binary_features'
)

# Create a scaler to normalize the data.
continuous_scaler = MinMaxScaler(inputCol="continues_features", outputCol="scaled_continuous_features")

# Assemble all the features togather
feature_assembler=VectorAssembler(
    inputCols=['scaled_continuous_features','binary_features'],
    outputCol='features'
)
# Create a pipeline
pipeline = Pipeline(stages=[imputer,continuous_feature_assembler,binary_features_assembler, continuous_scaler,feature_assembler])

# Fit and transform the pipeline to scale the continuous columns.
pipeline_model = pipeline.fit(df)
df_scaled = pipeline_model.transform(df)

# df_scaled.show()

df_scaled.printSchema()

In [63]:
df_scaled.select('features','dessert').show()

+--------------------+-------+
|            features|dessert|
+--------------------+-------+
|(678,[0,1,2,3,4,2...|      0|
|(678,[0,1,2,3,4,3...|      0|
|(678,[0,1,2,3,4,1...|      0|
|(678,[0,62,176,18...|      0|
|(678,[0,1,2,3,4,3...|      0|
|(678,[0,1,2,3,4,3...|      0|
|(678,[0,29,60,62,...|      0|
|(678,[0,139,196,2...|      0|
|(678,[0,1,2,3,4,4...|      0|
|(678,[0,1,2,3,4,1...|      0|
|(678,[0,1,2,3,4,6...|      0|
|(678,[0,71,121,14...|      0|
|(678,[0,1,2,3,4,3...|      1|
|(678,[0,1,2,3,4,4...|      0|
|(678,[0,1,2,3,4,6...|      0|
|(678,[0,1,2,3,4,1...|      0|
|(678,[0,1,2,3,4,3...|      1|
|(678,[0,1,2,3,4,6...|      0|
|(678,[0,1,2,3,4,6...|      0|
|(678,[0,1,2,3,4,2...|      0|
+--------------------+-------+
only showing top 20 rows



Find correlation

In [64]:
from pyspark.ml.stat import Correlation

df_vectored=df_scaled.select('features','dessert')
df_vectored.head()
cor=Correlation.corr(df_vectored,'feautres').collect()[0][0]
print(cor)

Row(features=SparseVector(678, {0: 0.4, 1: 0.0, 2: 0.0001, 3: 0.0, 4: 0.0, 22: 1.0, 43: 1.0, 153: 1.0, 232: 1.0, 309: 1.0, 331: 1.0, 332: 1.0, 541: 1.0, 625: 1.0, 639: 1.0, 677: 1.0}), dessert=0)

# Fit linear regression model

In [65]:
from pyspark.ml.regression import LinearRegression

train_set,test_set=df_vectored.randomSplit([0.75,0.25])
linreg=LinearRegression(featuresCol='features',labelCol='dessert')
linreg=linreg.fit(train_set)

In [68]:
linreg.coefficients
# Train results:
res=linreg.evaluate(train_set)
preds_test=res.predictions.withColumn('prediction',when(col('prediction')>0.5 ,1).otherwise(0))
preds_test.show()


    

# Test results:
res=linreg.evaluate(test_set)
preds_train=res.predictions.withColumn('prediction',when(col('prediction')>0.5 ,1).otherwise(0))
preds_train.show()


+--------------------+-------+----------+
|            features|dessert|prediction|
+--------------------+-------+----------+
|(678,[0,1,2,3,4,7...|      0|         0|
|(678,[0,1,2,3,4,7...|      0|         0|
|(678,[0,1,2,3,4,9...|      0|         0|
|(678,[0,1,2,3,4,9...|      0|         0|
|(678,[0,1,2,3,4,1...|      0|         0|
|(678,[0,1,2,3,4,1...|      0|         0|
|(678,[0,1,2,3,4,1...|      0|         0|
|(678,[0,1,2,3,4,1...|      0|         0|
|(678,[0,1,2,3,4,1...|      0|         0|
|(678,[0,1,2,3,4,1...|      0|         0|
|(678,[0,1,2,3,4,1...|      1|         1|
|(678,[0,1,2,3,4,1...|      1|         0|
|(678,[0,1,2,3,4,1...|      0|         0|
|(678,[0,1,2,3,4,1...|      0|         0|
|(678,[0,1,2,3,4,1...|      1|         1|
|(678,[0,1,2,3,4,1...|      0|         0|
|(678,[0,1,2,3,4,1...|      0|         0|
|(678,[0,1,2,3,4,1...|      0|         0|
|(678,[0,1,2,3,4,1...|      0|         0|
|(678,[0,1,2,3,4,1...|      0|         0|
+--------------------+-------+----

# Evaluate performence

In [77]:
# Calculate confusion matrix for train set
train_confusion_matrix = preds_train.groupBy("dessert", "prediction").count()
train_confusion_matrix.show()

+-------+----------+-----+
|dessert|prediction|count|
+-------+----------+-----+
|      1|         0|  134|
|      1|         1|  764|
|      0|         0| 4105|
|      0|         1|   35|
+-------+----------+-----+



In [78]:
# Calculate confusion matrix for test set
test_confusion_matrix = preds_test.groupBy("dessert", "prediction").count()
test_confusion_matrix.show()

+-------+----------+-----+
|dessert|prediction|count|
+-------+----------+-----+
|      1|         0|  317|
|      1|         1| 2358|
|      0|         0|12227|
|      0|         1|  111|
+-------+----------+-----+



In [79]:
# Calculate precision and recall for train set
tp = train_confusion_matrix.filter((col("dessert") == 1) & (col("prediction") == 1)).first()["count"]
fp = train_confusion_matrix.filter((col("dessert") == 0) & (col("prediction") == 1)).first()["count"]
fn = train_confusion_matrix.filter((col("dessert") == 1) & (col("prediction") == 0)).first()["count"]

precision_train = tp / (tp + fp)
recall_train = tp / (tp + fn)

print("Train Precision:", precision_train)
print("Train Recall:", recall_train)



# Calculate precision and recall for test set
tp = test_confusion_matrix.filter((col("dessert") == 1) & (col("prediction") == 1)).first()["count"]
fp = test_confusion_matrix.filter((col("dessert") == 0) & (col("prediction") == 1)).first()["count"]
fn = test_confusion_matrix.filter((col("dessert") == 1) & (col("prediction") == 0)).first()["count"]

precision_test = tp / (tp + fp)
recall_test = tp / (tp + fn)

print("Test Precision:", precision_test)
print("Test Recall:", recall_test)


Train Precision: 0.9561952440550688
Train Recall: 0.8507795100222717
Test Precision: 0.9550425273390036
Test Recall: 0.8814953271028038


# Add combined features- 
The same proccess with additional features

In [80]:
# Add the combined features to the df
df=df.withColumn('protein_ratio',df['protein']*4/df['calories'])
df=df.withColumn('fat_ratio',df['fat']*9/df['calories'])

# Add the combined column names to the list
CONTINUOUS_COLUMNS.append('fat_ratio')
CONTINUOUS_COLUMNS.append('protein_ratio')

# Impute and scale the data (again for the combined features)

imputer = Imputer(
    inputCols=CONTINUOUS_COLUMNS,
    outputCols=[col_name  for col_name in CONTINUOUS_COLUMNS]
)

continuous_feature_assembler = VectorAssembler(
    inputCols=CONTINUOUS_COLUMNS,
    outputCol="continues_features"
)

binary_features_assembler=VectorAssembler(
    inputCols=BINARY_COLUMNS,
    outputCol='binary_features'
)

continuous_scaler = MinMaxScaler(inputCol="continues_features", outputCol="scaled_continuous_features")

feature_assembler=VectorAssembler(
    inputCols=['scaled_continuous_features','binary_features'],
    outputCol='features'
)

pipeline = Pipeline(stages=[imputer,continuous_feature_assembler,binary_features_assembler, continuous_scaler,feature_assembler])

pipeline_model = pipeline.fit(df)
df_scaled = pipeline_model.transform(df)

df_vectored=df_scaled.select('features','dessert')

# Fit the linear regression model
train_set,test_set=df_vectored.randomSplit([0.75,0.25])
linreg=LinearRegression(featuresCol='features',labelCol='dessert')
linreg=linreg.fit(train_set)

# Measure the results
# Train results:
res=linreg.evaluate(train_set)
preds_test=res.predictions.withColumn('prediction',when(col('prediction')>0.5 ,1).otherwise(0))
preds_test.show()

# Test results:
res=linreg.evaluate(test_set)
preds_train=res.predictions.withColumn('prediction',when(col('prediction')>0.5 ,1).otherwise(0))
preds_train.show()

# Confusion matrix for train and test:
# Calculate confusion matrix for train set
train_confusion_matrix = preds_train.groupBy("dessert", "prediction").count()
train_confusion_matrix.show()

# Calculate confusion matrix for test set
test_confusion_matrix = preds_test.groupBy("dessert", "prediction").count()
test_confusion_matrix.show()

# Calculate precision and recall for train set
tp = train_confusion_matrix.filter((col("dessert") == 1) & (col("prediction") == 1)).first()["count"]
fp = train_confusion_matrix.filter((col("dessert") == 0) & (col("prediction") == 1)).first()["count"]
fn = train_confusion_matrix.filter((col("dessert") == 1) & (col("prediction") == 0)).first()["count"]

precision_train = tp / (tp + fp)
recall_train = tp / (tp + fn)

print("Train Precision:", precision_train)
print("Train Recall:", recall_train)



# Calculate precision and recall for test set
tp = test_confusion_matrix.filter((col("dessert") == 1) & (col("prediction") == 1)).first()["count"]
fp = test_confusion_matrix.filter((col("dessert") == 0) & (col("prediction") == 1)).first()["count"]
fn = test_confusion_matrix.filter((col("dessert") == 1) & (col("prediction") == 0)).first()["count"]

precision_test = tp / (tp + fp)
recall_test = tp / (tp + fn)

print("Test Precision:", precision_test)
print("Test Recall:", recall_test)


+--------------------+-------+----------+
|            features|dessert|prediction|
+--------------------+-------+----------+
|(680,[0,1,2,3,4,5...|      1|         1|
|(680,[0,1,2,3,4,5...|      0|         0|
|(680,[0,1,2,3,4,5...|      0|         0|
|(680,[0,1,2,3,4,5...|      0|         0|
|(680,[0,1,2,3,4,5...|      0|         0|
|(680,[0,1,2,3,4,5...|      0|         0|
|(680,[0,1,2,3,4,5...|      0|         0|
|(680,[0,1,2,3,4,5...|      0|         0|
|(680,[0,1,2,3,4,5...|      0|         0|
|(680,[0,1,2,3,4,5...|      0|         0|
|(680,[0,1,2,3,4,5...|      0|         0|
|(680,[0,1,2,3,4,5...|      0|         0|
|(680,[0,1,2,3,4,5...|      1|         1|
|(680,[0,1,2,3,4,5...|      1|         0|
|(680,[0,1,2,3,4,5...|      1|         1|
|(680,[0,1,2,3,4,5...|      0|         0|
|(680,[0,1,2,3,4,5...|      0|         0|
|(680,[0,1,2,3,4,5...|      0|         0|
|(680,[0,1,2,3,4,5...|      0|         0|
|(680,[0,1,2,3,4,5...|      0|         0|
+--------------------+-------+----