## Setup

In [1]:
import os
# give googe drive the required permission
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Create a folder in your drive and add the name of that folder here.
# For example, for the code below to run correctly, you need to have a folder named FoodRecSys in 'My Drive'.
# The said folder will be your home directory for the rest of the project.
# You will be able to save and read data from the folder.

os.chdir("/content/drive/MyDrive/FoodRecSys/")
os.getcwd()

'/content/drive/MyDrive/FoodRecSys'

In [3]:
os.chdir("/content/drive/MyDrive/food_recsys_project/Code_Files/")
os.getcwd()

'/content/drive/MyDrive/food_recsys_project/Code_Files'

In [4]:
try:
  import pyspark
except:
  !pip install pyspark==3.1.2
  import pyspark

Collecting pyspark==3.1.2
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m212.4/212.4 MB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9 (from pyspark==3.1.2)
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m14.0 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880745 sha256=8a60f0237275db21ab14878a274ea3e1e68e45bdb65b9d4f37c989a1cd9a9162
  Stored in directory: /root/.cache/pip/wheels/ef/70/50/7882e1bcb5693225f7cc86698f10953201b48b3f36317c2d18
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0

In [5]:
try:
  import lenskit
except:
  %pip install lenskit

Collecting lenskit
  Downloading lenskit-0.14.4-py3-none-any.whl (74 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m74.0/74.0 kB[0m [31m1.1 MB/s[0m eta [36m0:00:00[0m
Collecting binpickle>=0.3.2 (from lenskit)
  Downloading binpickle-0.3.4-py3-none-any.whl (13 kB)
Collecting seedbank>=0.1.0 (from lenskit)
  Downloading seedbank-0.1.3-py3-none-any.whl (8.5 kB)
Collecting csr>=0.3.1 (from lenskit)
  Downloading csr-0.5.1-py3-none-any.whl (25 kB)
Collecting anyconfig==0.13.* (from seedbank>=0.1.0->lenskit)
  Downloading anyconfig-0.13.0-py2.py3-none-any.whl (87 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m87.8/87.8 kB[0m [31m10.4 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: binpickle, anyconfig, seedbank, csr, lenskit
Successfully installed anyconfig-0.13.0 binpickle-0.3.4 csr-0.5.1 lenskit-0.14.4 seedbank-0.1.3


In [6]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

In [7]:
spark = SparkSession.builder.master("local").config('spark.ui.port', '4050').getOrCreate()

In [8]:
spark

## Imports

In [9]:
# import necessary libraries
import pandas as pd
import numpy as np

In [10]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType,BooleanType,DateType,FloatType,StringType, ArrayType

In [11]:
# Import the required functions for ALS and estimating

In [12]:
from lenskit import batch, topn, util
from lenskit.algorithms import Recommender, als, item_knn as knn

## Read the data

To connect the data files with your google collab, download them and upload them to your home folder in your google drive.

In [13]:
train_ratings_df = spark.read.parquet('train_interaction_level_df.parquet', # Replace the given path with the path for your file
                                      header=True,
                                      inferSchema=True)

In [14]:
test_ratings_all_df  = spark.read.parquet('test_interaction_level_all_recipies_df.parquet', # Replace the given path with the path for your file
                                          header=True,
                                          inferSchema=True)

In [15]:
raw_recipes_df = spark.read.csv("/content/RAW_recipes_cleaned.csv", # Replace the given path with the path for your file
                                header=True,
                                inferSchema=True)

In [16]:
raw_recipes_df.show()

+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+
|                name|    id|minutes|contributor_id| submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|
+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+
|arriba   baked wi...|137739|     55|         47892|2005-09-16|['60-minutes-or-l...|[51.5, 0.0, 13.0,...|     11|['make a choice a...|autumn is my favo...|['winter squash',...|            7|
|a bit different  ...| 31490|     30|         26278|2002-06-17|['30-minutes-or-l...|[173.4, 18.0, 0.0...|      9|['preheat oven to...|this recipe calls...|['prepared pizza ...|            6|
|all in the kitche...|112140|    130|        

In [17]:
# Count the total number of ratings in the dataset
numerator = train_ratings_df.select("rating").count()

# Count the number of distinct userIds and distinct recipe_Ids
num_users = train_ratings_df[['user_id']].distinct().count()
print('The number of unique users in the training data :', num_users)
num_recipes = train_ratings_df[['recipe_id']].distinct().count()
print('The number of unique recipes in the training data :', num_recipes)

# Set the denominator equal to the number of users multiplied by the number of movies
denominator = num_users * num_recipes

# Divide the numerator by the denominator
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("The training dataframe is ", "%.7f" % sparsity + "% empty.")

The number of unique users in the training data : 23362
The number of unique recipes in the training data : 18721
The training dataframe is  99.9829264% empty.


## Functions

In [18]:
def manual_recommendation_check (user_id):
  '''
  Given a user ID form the test dataset, this function will return the names of the recipes recommended to the user.

  Initialize this function after the all_recs data frame is calculated.

  Input user_id of a user from the test set as an integer.
  Prints the names of recipes recommended to this user.
  Returns nothing.
  '''
  recs_user = all_recs_als[all_recs_als.user == user_id]
  recs_user_list = list(recs_user.item.values)
  recs_user_list = [x.item() for x in recs_user_list]
  display((raw_recipes_df.filter(F.col('id').isin(recs_user_list))
               .select("name")
               .collect()
               ))

## Model

# Task 02 - Collaborative Filtering Model

Add the argument details in the algorithm initialization below to build the ALS model.

In [19]:
# Create ALS model
from pyspark.ml.recommendation import ALS
als = ALS(userCol= 'user_id',
          itemCol= 'recipe_id',
          ratingCol= 'rating'  ,
          nonnegative = True,
          implicitPrefs = False,
          coldStartStrategy="drop"
         )

In [20]:
assert type(als) == pyspark.ml.recommendation.ALS

#### Training

In [21]:
#Fit the model to the 'train' dataset
model = als.fit(train_ratings_df)

## Prediction

In [22]:
# use the model to create predictions for test data
test_predictions_unseen = model.transform(test_ratings_all_df)

In [23]:
assert test_predictions_unseen.select(F.col("prediction"))

In [24]:
test_predictions_unseen.select(F.col("user_id"),
                               F.col("recipe_id"),
                               F.col("rating"),
                               F.col("prediction")
                              ).show(5)

+-------+---------+------+----------+
|user_id|recipe_id|rating|prediction|
+-------+---------+------+----------+
| 199020|    55265|     5|  4.455839|
| 369284|    76143|     5| 3.4754832|
| 224235|    89385|     5| 4.5533595|
| 385423|    95476|     4| 4.5453777|
| 538098|    95476|     5| 4.7688656|
+-------+---------+------+----------+
only showing top 5 rows



In [25]:
# Each user in the test set must have 10 predictions.
# Use ALS model to get these predictions.
# You can use the recommendForAllUsers() method.

recommendations = model.recommendForAllUsers(10)
recommendations.show()

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|  28170|[{13443, 5.942095...|
|  56680|[{188406, 6.68868...|
| 108460|[{38031, 5.826381...|
| 139830|[{92854, 5.186731...|
| 198430|[{108417, 5.90999...|
| 241660|[{108417, 5.84360...|
| 280340|[{38031, 5.749831...|
| 291650|[{309795, 5.98315...|
| 343180|[{178137, 5.90386...|
| 409910|[{178137, 5.83212...|
| 545970|[{265812, 5.00780...|
| 607820|[{178137, 6.14459...|
| 749060|[{268958, 4.05725...|
| 764700|[{347654, 5.63297...|
|1034510|[{108417, 6.03789...|
|1760440|[{242393, 5.72565...|
|  31261|[{146694, 6.98183...|
|  34061|[{227785, 4.98593...|
|  53191|[{26867, 3.026750...|
|  74281|[{178137, 6.42762...|
+-------+--------------------+
only showing top 20 rows



In [26]:
assert len(dict(recommendations.select(F.col("recommendations")).collect()[0][0])) == 10

In [27]:
# Transfroming the results by exploding the recommendations column

recommendations = (recommendations.select(F.col("user_id"),
                                          F.posexplode(F.col("recommendations")).alias("pos", "item"))
                                  .select(F.col("user_id"),
                                          F.col("pos"),
                                          F.col("item.recipe_id").alias("recomended_recipe_id"),
                                          F.col("item.rating").alias("predicted_rating")))

## Evaluation

# Task 04 - Model Evaluation

#### 1. RMSE

Add the argument details to the evaluator function below to calculate the RMSE score of the ALS model.

In [28]:
# Define evaluator as RMSE and print RMSE value
from pyspark.ml.evaluation import RegressionEvaluator
evaluator_seen = RegressionEvaluator(metricName="rmse",
                                     labelCol= 'rating',  # add the name of the column that has the ratings
                                     predictionCol= 'prediction' # add the name of the column that has the predicted ratings.
                                     )

In [29]:
RMSE = evaluator_seen.evaluate(test_predictions_unseen) # Use the evaluator to find the RMSE on the test set.
print(RMSE)

1.4091824037430478


#### Rank based Metrics

We will use the lenskit library to calculate the ranking-based matrics. The lenskit library is available in pandas only so we need to convert the data frames from PySpark dataframes to Pandas dataframes.

In [30]:
all_recs_als = recommendations.toPandas()

In [31]:
# Rename the columns of to eunsre that they match the columns names as in the cell below.

column_names = {'user_id':'user' ,'pos': 'rank', 'recomended_recipe_id': 'item', 'predicted_rating': 'score'}# create a dictionary with current column names as keys and the intended column names as values.
all_recs_als = all_recs_als.rename(columns=column_names)

In [32]:
all_recs_als

Unnamed: 0,user,rank,item,score
0,28170,0,13443,5.942095
1,28170,1,108417,5.926983
2,28170,2,178137,5.859184
3,28170,3,360099,5.675150
4,28170,4,406089,5.630899
...,...,...,...,...
233615,1904821,5,9410,0.000000
233616,1904821,6,9970,0.000000
233617,1904821,7,10150,0.000000
233618,1904821,8,11440,0.000000


In [33]:
# Adding a column to make sure the we know these recommendations are from the ALS algorithm.

all_recs_als["algorithm"] = "ALS"

In [34]:
# Convert the test dataset to pandas and ensure that it has the same column names as shown in the cell below.
# Also, note there are only three columns in the test data. You have to ensure your test data looks identical.

test_data = test_ratings_all_df.select("user_id", "recipe_id","rating").withColumnRenamed("user_id", "user").withColumnRenamed("recipe_id","item").toPandas()


In [35]:
test_data

Unnamed: 0,user,item,rating
0,1802380878,35912,0
1,2001602879,40335,5
2,2758877,50348,0
3,199020,55265,5
4,369284,76143,5
...,...,...,...
18679,855082,438292,5
18680,1553277,447699,0
18681,2000072578,447699,5
18682,2775141,469503,5


In [36]:
# code to calculate the necessary metrics
# the code below uses lenskit library to evaluate the rank metrics
import lenskit
rla = topn.RecListAnalysis()
rla.add_metric(topn.recip_rank)
rla.add_metric(topn.ndcg)
rla.add_metric(topn.dcg)
results = rla.compute(all_recs_als, test_data)
results.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,nrecs,recip_rank,ndcg,dcg
algorithm,user,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
ALS,28170,10,0.0,,
ALS,56680,10,0.0,0.0,0.0
ALS,108460,10,0.0,0.0,0.0
ALS,139830,10,0.0,,
ALS,198430,10,0.0,,


## Manual Prediction Checking

1. Why are ndcg and dcg nulls? EG: user 28170

In [37]:
test_data[test_data.user == 28170]

Unnamed: 0,user,item,rating


In [38]:
all_recs_als[all_recs_als.user == 28170]

Unnamed: 0,user,rank,item,score,algorithm
0,28170,0,13443,5.942095,ALS
1,28170,1,108417,5.926983,ALS
2,28170,2,178137,5.859184,ALS
3,28170,3,360099,5.67515,ALS
4,28170,4,406089,5.630899,ALS
5,28170,5,148828,5.61267,ALS
6,28170,6,320507,5.611255,ALS
7,28170,7,265431,5.550305,ALS
8,28170,8,217848,5.500812,ALS
9,28170,9,263419,5.500306,ALS


User ```28170``` does not appear in the test set. Hence cannot be evaluated.

2. Why are all metrics 0 for specific users?

In [39]:
test_data[test_data.user == 56680]

Unnamed: 0,user,item,rating
446,56680,229831,5


In [40]:
all_recs_als[all_recs_als.user == 56680]

Unnamed: 0,user,rank,item,score,algorithm
10,56680,0,188406,6.688689,ALS
11,56680,1,379283,6.447846,ALS
12,56680,2,178137,6.431056,ALS
13,56680,3,233938,6.340022,ALS
14,56680,4,166172,6.311109,ALS
15,56680,5,26867,6.295914,ALS
16,56680,6,121380,6.256392,ALS
17,56680,7,166170,6.241594,ALS
18,56680,8,169724,6.229252,ALS
19,56680,9,203862,6.200632,ALS


User-recipe combination does not appear in the recommendations set.

3. Are any non zero metrics?

In [41]:
results[results.ndcg > 0]

Unnamed: 0_level_0,Unnamed: 1_level_0,nrecs,recip_rank,ndcg,dcg
algorithm,user,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
ALS,884421,10,0.25,0.5,2.153383
ALS,354725,10,0.142857,0.356207,1.666667


In [42]:
results[results.dcg > 0]

Unnamed: 0_level_0,Unnamed: 1_level_0,nrecs,recip_rank,ndcg,dcg
algorithm,user,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
ALS,884421,10,0.25,0.5,2.153383
ALS,354725,10,0.142857,0.356207,1.666667


In [43]:
results[results.recip_rank > 0]

Unnamed: 0_level_0,Unnamed: 1_level_0,nrecs,recip_rank,ndcg,dcg
algorithm,user,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
ALS,884421,10,0.25,0.5,2.153383
ALS,354725,10,0.142857,0.356207,1.666667


There are few user recipe combination that has a corresponding value in the test set. Hence, we have only few non-zero value of evaluation metrics.


Manually, check one prediction.

In [44]:
# use the function declared earlier to print the recomendations for the user 653438
def manual_recommendation_check (user_id):
  '''
  Given a user ID form the test dataset, this function will return the names of the recipes recommended to the user.

  Initialize this function after the all_recs data frame is calculated.

  Input user_id of a user from the test set as an integer.
  Prints the names of recipes recommended to this user.
  Returns nothing.
  '''
  recs_user = all_recs_als[all_recs_als.user == user_id]
  recs_user_list = list(recs_user.item.values)
  recs_user_list = [x.item() for x in recs_user_list]
  display((raw_recipes_df.filter(F.col('id').isin(recs_user_list))
               .select("name")
               .collect()
               ))
manual_recommendation_check(653438)

[Row(name='achy breaky shrimpy bakey'),
 Row(name='german baked apple pancake'),
 Row(name='italian hamburger veggie soup like olive garden soup'),
 Row(name='italian pattie sandwich')]

Of the recipes that have been recommended, few appear similar, and few do not.

## Saving the models and predictions

In [45]:
all_recs_als.to_csv("model_output/ALS/recommendation_als.csv", # modify the path
                    index=False)

In [46]:
model.save('/ALS/ALS_model.model') # modify the path