In [1]:
import os
# give googe drive the required permission
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
os.chdir("/content/drive/MyDrive/FoodRecSys/")
os.getcwd()

'/content/drive/MyDrive/FoodRecSys'

In [3]:
try:
  import pyspark 
except:
  !pip install pyspark==3.1.2
  import pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==3.1.2
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m212.4/212.4 MB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 KB[0m [31m24.8 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880770 sha256=95a7104176b329e45982fe1ea2cc66f9dbf0547552ede68a68fe8a99a5deddbd
  Stored in directory: /root/.cache/pip/wheels/11/17/0b/53e7d10fe66ca7647d391cdba323fcf5b2f9dfcb7ebad87aa7
Successfully built pyspark
Installing collected packages: py4j, py

In [4]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

In [5]:
spark = SparkSession.builder.master("local").config('spark.ui.port', '4050').getOrCreate()

In [6]:
spark

**Importing the necessary libraries**

In [7]:
import pandas as pd 
import numpy as np

from pyspark.sql import functions as F
# Import for typecasting columns
from pyspark.sql.types import IntegerType,BooleanType,DateType,FloatType,StringType, ArrayType

**Reading the data**

In [8]:
raw_ratings_df = (spark.read.csv("raw_ratings_small.csv",
                                 header=True, 
                                 inferSchema= True))

In [9]:
raw_recipes_df = spark.read.csv("raw_recipies_small.csv", # modify the path to read the data
                                header=True, 
                                inferSchema=True)

In [10]:
assert (raw_recipes_df.count(), len(raw_recipes_df.columns)) == (20340, 13)
assert (raw_ratings_df.count(), len(raw_ratings_df.columns)) == (93357, 5)

In [11]:
# Find the number of data points in the interaction dataset. 
# You can use the count() method. 
# The output must be an integer.

num_review_int = raw_ratings_df.count()

**Task 01 - Train Test Split**

In [12]:
test_num_reviews_int = round(num_review_int *0.2)

In [13]:
# Sort the interactions dataset in descending order of review date. 
# Extract ```test_num_reviews_int``` most recent reviews. 

temp_ratings_df = (raw_ratings_df.sort("review_date", ascending=False)
                                 .limit(test_num_reviews_int)
                  )

In [14]:
assert temp_ratings_df.count()  == 18671
assert raw_recipes_df.collect()[11][4] <= raw_recipes_df.collect()[10][4] 

In [15]:
temp_ratings_df.collect()[-1][4]

'2011-07-17'

Split the data into two parts before and after 2011-07-17.

All reviews in the ratings data after 2011-07-17 will not exsist in the training set.
For all future predictions the date will be set at 2011-07-18.

In [16]:
# Join raw_recipes and raw_ratings
# Use recipe_id as the key to join these dataframes 
# The resulting dataframe must have all rows from the raw_ratings dataframe. 

interaction_level_df = raw_ratings_df.join(
                                           raw_recipes_df,# dataframe 2 
                                           raw_ratings_df["recipe_id"]==raw_recipes_df["id"],# key to join 
                                          "outer"# how to join    
                                          )

In [17]:
interaction_level_df.show()

+----------+---------+------+--------------------+-----------+--------------------+-----+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------------+
|   user_id|recipe_id|rating|              review|review_date|                name|   id|minutes|contributor_id| submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|year_of_review|
+----------+---------+------+--------------------+-----------+--------------------+-----+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------------+
|    152552|    29601|     5|Excellent. It was...| 2004-08-20|zippy cold spicy ...|29601|    270|         30367|2002-05-29|['time-to-make', ...|[277.2, 15.0, 13....|     12|['in a heavy 10-i...|shrimp d

In [18]:
# Use the filter command to separate the datasets. 
# All interactions which were rated BEFORE '2011-07-17' will be train data. 

train_interaction_level_df  = (interaction_level_df.filter(  interaction_level_df["review_date"]<'2011-07-17'
                                             # add code to filter 
                                           ))
train_interaction_level_df.count()

74673

In [19]:
test_interaction_level_all_recipies_df  = (interaction_level_df.filter(  interaction_level_df["review_date"]>='2011-07-17'))
test_interaction_level_all_recipies_df.count()

18684

In [20]:
assert (test_interaction_level_all_recipies_df.count(), len(test_interaction_level_all_recipies_df.columns)) == (18684, 18)
assert (train_interaction_level_df.count(), len(train_interaction_level_df.columns)) == (74673, 18)

In [21]:
(train_interaction_level_df.coalesce(1)
                           .write.mode('overwrite')
                           .parquet('train_interaction_level_df.parquet'))

(test_interaction_level_all_recipies_df.coalesce(1)
                                       .write.mode('overwrite')
                                       .parquet('test_interaction_level_df.parquet'))

**Model building**

Importing the required functions for ALS and estimating

In [22]:
try:
  import lenskit 
except:
  %pip install lenskit
from lenskit import batch, topn, util
from lenskit.algorithms import Recommender, als, item_knn as knn
# Create ALS model
from pyspark.ml.recommendation import ALS


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting lenskit
  Downloading lenskit-0.14.2-py3-none-any.whl (74 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/74.0 KB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m74.0/74.0 KB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
Collecting seedbank>=0.1.0
  Downloading seedbank-0.1.2-py3-none-any.whl (7.9 kB)
Collecting binpickle>=0.3.2
  Downloading binpickle-0.3.4-py3-none-any.whl (13 kB)
Collecting csr>=0.3.1
  Downloading csr-0.4.3-py3-none-any.whl (23 kB)
Collecting anyconfig
  Downloading anyconfig-0.13.0-py2.py3-none-any.whl (87 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m87.8/87.8 KB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: binpickle, anyconfig, seedbank, csr, lenskit
Successfully installed anyconfig-0.13.0 binpickle-0.3.4 csr-0.4.3 lenskit-0.14.2 

**Reading the data for model building**

In [23]:
train_ratings_df = spark.read.parquet('train_interaction_level_df.parquet',
                                      header=True, 
                                      inferSchema=True)

In [24]:
test_ratings_all_df  = spark.read.parquet('test_interaction_level_df.parquet', 
                                          header=True, 
                                          inferSchema=True)

In [25]:
raw_recipes_df = spark.read.csv("RAW_recipes_cleaned.csv",
                                header=True, 
                                inferSchema=True)

In [26]:
# Count the total number of ratings in the dataset
numerator = train_ratings_df.select("rating").count()

# Count the number of distinct userIds and distinct recipe_Ids
num_users =  train_ratings_df.select("user_id").distinct().count()#find the number of unique users in the training data. The output must be an integer
num_recipes = train_ratings_df.select("recipe_id").distinct().count()# find the number of unique recipes in the training data. The output must be an integer

# Set the denominator equal to the number of users multiplied by the number of recipes
denominator = num_users * num_recipes

# Divide the numerator by the denominator
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("The training dataframe is ", "%.7f" % sparsity + "% empty.")

The training dataframe is  99.9829264% empty.


**Functions**

In [27]:
def manual_recommendation_check (user_id):
  '''
  Given a user ID form the test dataset, this function will return the names of the recipes recommended to the user. 

  Initialize this function after the all_recs data frame is calculated. 

  Input user_id of a user from the test set as an integer.  
  Prints the names of recipes recommended to this user. 
  Returns nothing. 
  '''
  recs_user = all_recs_als[all_recs_als.user == user_id]
  recs_user_list = list(recs_user.item.values)
  recs_user_list = [x.item() for x in recs_user_list]
  display((raw_recipes_df.filter(F.col('id').isin(recs_user_list))
               .select("name")
               .collect()
               ))

**Model**

Task 02 - Collaborative Filtering Model

In [28]:
als = ALS(userCol= 'user_id', # add the name of the column for users  
          itemCol= 'recipe_id', # add the name of the column for recipes
          ratingCol= 'rating', # add the name of the column for ratings 
          nonnegative = True, 
          implicitPrefs = False, 
          coldStartStrategy="drop"
         )

In [29]:
assert type(als) == pyspark.ml.recommendation.ALS

**Training**

In [30]:
model = als.fit(train_ratings_df)

**Prediction**

In [31]:
# use the model to create predictions for test data
test_predictions_unseen = model.transform(test_ratings_all_df)

In [32]:
assert test_predictions_unseen.select(F.col("prediction"))

In [33]:
test_predictions_unseen.select(F.col("user_id"),
                               F.col("recipe_id"),
                               F.col("rating"),
                               F.col("prediction")
                              ).show(5)

+-------+---------+------+----------+
|user_id|recipe_id|rating|prediction|
+-------+---------+------+----------+
| 199020|    55265|     5| 4.7250657|
| 369284|    76143|     5| 3.9014056|
| 224235|    89385|     5|  4.553422|
| 385423|    95476|     4| 3.9325652|
| 538098|    95476|     5| 4.3173947|
+-------+---------+------+----------+
only showing top 5 rows



In [34]:
recommendations = model.recommendForAllUsers(10)

In [35]:
assert len(dict(recommendations.select(F.col("recommendations")).collect()[0][0])) == 10

In [36]:
recommendations = (recommendations.select(F.col("user_id"),
                                          F.posexplode(F.col("recommendations")).alias("pos", "item")) 
                                  .select(F.col("user_id"),
                                          F.col("pos"), 
                                          F.col("item.recipe_id").alias("recomended_recipe_id"), 
                                          F.col("item.rating").alias("predicted_rating")))

**Evaluation**

Task 04 - Model Evaluation
1. RMSE

In [37]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator_seen = RegressionEvaluator(metricName="rmse", 
                                     labelCol= "rating",  # add the name of the column that has the ratings 
                                     predictionCol="prediction" # add the name of the column that has the predicted ratings. 
                                     ) 

In [38]:
RMSE = evaluator_seen.evaluate(test_predictions_unseen) # Use the evaluator to find the RMSE on the test set. 
print(RMSE)

1.405685968358892


**Rank based Metrics**

We will use the lenskit library to calculate the ranking-based matrics. The lenskit library is available in pandas only so we need to convert the data frames from PySpark dataframes to Pandas dataframes.

In [39]:
all_recs_als = recommendations.toPandas()

In [40]:
# Rename the columns of to eunsre that they match the columns names as in the cell below. 

column_names = {"user_id":"user","pos":"rank","recomended_recipe_id":"item", "predicted_rating":"score"}# create a dictionary with current column names as keys and the intended column names as values. 
all_recs_als = all_recs_als.rename(columns=column_names)

In [41]:
all_recs_als

Unnamed: 0,user,rank,item,score
0,28170,0,237750,6.075220
1,28170,1,260665,5.993740
2,28170,2,319562,5.974010
3,28170,3,159279,5.815721
4,28170,4,146652,5.771141
...,...,...,...,...
233615,1904821,5,9410,0.000000
233616,1904821,6,9970,0.000000
233617,1904821,7,10150,0.000000
233618,1904821,8,11440,0.000000


In [42]:
# Adding a column to make sure the we know these recommendations are from the ALS algorithm. 

all_recs_als["algorithm"] = "ALS"

In [43]:
# Convert the test dataset to pandas and ensure that it has the same column names as shown in the cell below. 
# Also, note there are only three columns in the rest data. You have to ensure your test data looks identical. 

test_data = (test_ratings_all_df.select(F.col("user_id").alias('user'),
                                        F.col("recipe_id").alias('item'),
                                        F.col("rating"))# add code to select the necessary columns and change the name of the columns. 
                                .toPandas())

In [44]:
test_data

Unnamed: 0,user,item,rating
0,1802380878,35912,0
1,2001602879,40335,5
2,2758877,50348,0
3,199020,55265,5
4,369284,76143,5
...,...,...,...
18679,855082,438292,5
18680,1553277,447699,0
18681,2000072578,447699,5
18682,2775141,469503,5


In [45]:
# code to calculate the necessary metrics 
# the code below uses lenskit library to evaluate the rank metrics

rla = topn.RecListAnalysis()
rla.add_metric(topn.recip_rank)
rla.add_metric(topn.ndcg)
rla.add_metric(topn.dcg)
results = rla.compute(all_recs_als, test_data)
results.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,nrecs,recip_rank,ndcg,dcg
algorithm,user,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
ALS,28170,10,0.0,,
ALS,56680,10,0.0,0.0,0.0
ALS,108460,10,0.0,0.0,0.0
ALS,139830,10,0.0,,
ALS,198430,10,0.0,,


**Manual Prediction Checking**

1. Why are ndcg and dcg nulls? EG: user 28170

In [46]:
test_data[test_data.user == 28170]

Unnamed: 0,user,item,rating


In [47]:
all_recs_als[all_recs_als.user == 28170]

Unnamed: 0,user,rank,item,score,algorithm
0,28170,0,237750,6.07522,ALS
1,28170,1,260665,5.99374,ALS
2,28170,2,319562,5.97401,ALS
3,28170,3,159279,5.815721,ALS
4,28170,4,146652,5.771141,ALS
5,28170,5,108417,5.744643,ALS
6,28170,6,243882,5.737638,ALS
7,28170,7,280255,5.715251,ALS
8,28170,8,403906,5.647476,ALS
9,28170,9,306862,5.635936,ALS


User ```28170``` does not appear in the test set. Hence cannot be evaluated. 

2. Why are all metrics 0 for specific users?

In [48]:
test_data[test_data.user == 56680]

Unnamed: 0,user,item,rating
446,56680,229831,5


In [49]:
all_recs_als[all_recs_als.user == 56680]

Unnamed: 0,user,rank,item,score,algorithm
10,56680,0,236846,6.494751,ALS
11,56680,1,343747,6.491288,ALS
12,56680,2,135589,6.390859,ALS
13,56680,3,385440,6.39001,ALS
14,56680,4,292085,6.289631,ALS
15,56680,5,342007,6.272626,ALS
16,56680,6,286972,6.241429,ALS
17,56680,7,365165,6.241128,ALS
18,56680,8,238885,6.219762,ALS
19,56680,9,135483,6.191377,ALS


User-recipe combination does not appear in the recommendations set. 

3. Are any non zero metrics? 

In [50]:
results[results.ndcg > 0]

Unnamed: 0_level_0,Unnamed: 1_level_0,nrecs,recip_rank,ndcg,dcg
algorithm,user,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
ALS,593927,10,0.125,0.028396,1.577324


In [51]:
results[results.dcg > 0]

Unnamed: 0_level_0,Unnamed: 1_level_0,nrecs,recip_rank,ndcg,dcg
algorithm,user,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
ALS,593927,10,0.125,0.028396,1.577324


In [52]:
results[results.recip_rank > 0]

Unnamed: 0_level_0,Unnamed: 1_level_0,nrecs,recip_rank,ndcg,dcg
algorithm,user,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
ALS,593927,10,0.125,0.028396,1.577324


There are few user recipe combination that has a corresponding value in the test set. Hence, we have only few non-zero value of evaluation metrics.

## Saving the models and predictions

In [53]:
all_recs_als.to_csv("recommendation_als.csv", 
                    index=False)

In [54]:
model.save('ALS_model.model')