# Recipe Recommender Assignment - ALS Model

In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=07b266be51686f31165074ab45638c7283766c5f7c1c87f614daaa02340de0c2
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [3]:
import os
# give googe drive the required permission
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [4]:
# Lets create first working directory for accessing dataset

# I have created folder in your google drive named  Receipe_Recommendation_Assignment_M in 'My Drive'.
# The said folder will be your home directory for the rest of the project.
# You will be able to save and read data from the folder.

os.chdir("/content/drive/MyDrive/Receipe_Recommendation_Assignment_ML/")
os.getcwd()

'/content/drive/MyDrive/Receipe_Recommendation_Assignment_ML'

In [6]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

In [7]:
spark = SparkSession.builder.master("local").config('spark.ui.port', '4050').getOrCreate()

In [8]:
spark

## Imports

In [9]:
# import necessary libraries
import pandas as pd
import numpy as np

In [10]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType,BooleanType,DateType,FloatType,StringType, ArrayType


In [11]:
# Import the required functions for ALS and estimating

In [12]:
try:
  import lenskit
except:
  %pip install lenskit

Collecting lenskit
  Downloading lenskit-0.14.2-py3-none-any.whl (74 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/74.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m74.0/74.0 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
Collecting binpickle>=0.3.2 (from lenskit)
  Downloading binpickle-0.3.4-py3-none-any.whl (13 kB)
Collecting seedbank>=0.1.0 (from lenskit)
  Downloading seedbank-0.1.2-py3-none-any.whl (7.9 kB)
Collecting csr>=0.3.1 (from lenskit)
  Downloading csr-0.5.0-py3-none-any.whl (25 kB)
Collecting anyconfig (from seedbank>=0.1.0->lenskit)
  Downloading anyconfig-0.13.0-py2.py3-none-any.whl (87 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m87.8/87.8 kB[0m [31m7.0 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: binpickle, anyconfig, seedbank, csr, lenskit
Successfully installed anyconfig-0.13.0 binpickle-0.3.4 csr-0.5.0 lenskit-0.14.2 seedbank-0.1.2


In [13]:
from lenskit import batch, topn, util
from lenskit.algorithms import Recommender, als, item_knn as knn

## Read the data

In [14]:
train_ratings_df = spark.read.parquet('/content/drive/MyDrive/Receipe_Recommendation_Assignment_ML/train_interaction_level_df.parquet', # Replace the given path with the path for your file
                                      header=True,
                                      inferSchema=True)

In [15]:
train_ratings_df.show(10)

+-------+---------+------+--------------------+-----------+--------------------+---+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------------+
|user_id|recipe_id|rating|              review|review_date|                name| id|minutes|contributor_id| submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|year_of_review|
+-------+---------+------+--------------------+-----------+--------------------+---+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------------+
|  14000|      360|     5|A great dish  lov...| 2002-01-17|baked zucchini fr...|360|     67|          1587|1999-08-09|['weeknight', 'ti...|[200.2, 19.0, 19....|     11|['heat oven to 40...|                null|['zucch

In [16]:
train_ratings_df.describe()

DataFrame[summary: string, user_id: string, recipe_id: string, rating: string, review: string, name: string, id: string, minutes: string, contributor_id: string, tags: string, nutrition: string, n_steps: string, steps: string, description: string, ingredients: string, n_ingredients: string, year_of_review: string]

In [17]:
train_ratings_df.count()

74673

In [18]:
test_ratings_all_df  = spark.read.parquet('/content/drive/MyDrive/Receipe_Recommendation_Assignment_ML/test_interaction_level_df.parquet', # Replace the given path with the path for your file
                                          header=True,
                                          inferSchema=True)

In [19]:
test_ratings_all_df.show(5)

+----------+---------+------+--------------------+-----------+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------------+
|   user_id|recipe_id|rating|              review|review_date|                name|    id|minutes|contributor_id| submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|year_of_review|
+----------+---------+------+--------------------+-----------+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------------+
|   1911882|   205270|     4|I make pizza in m...| 2013-05-03|bacon cheeseburge...|205270|     37|         89831|2007-01-13|['60-minutes-or-l...|[813.2, 93.0, 13....|      8|['pre-bake the pi...|the 

In [20]:
test_ratings_all_df.describe()

DataFrame[summary: string, user_id: string, recipe_id: string, rating: string, review: string, name: string, id: string, minutes: string, contributor_id: string, tags: string, nutrition: string, n_steps: string, steps: string, description: string, ingredients: string, n_ingredients: string, year_of_review: string]

In [21]:
test_ratings_all_df.count()

18684

In [22]:
raw_recipes_df = spark.read.csv("/content/drive/MyDrive/Receipe_Recommendation_Assignment_ML/RAW_recipes_cleaned.csv", # Replace the given path with the path for your file
                                header=True,
                                inferSchema=True)

In [23]:
raw_recipes_df.show(5)

+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+
|                name|    id|minutes|contributor_id| submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|
+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+
|arriba   baked wi...|137739|     55|         47892|2005-09-16|['60-minutes-or-l...|[51.5, 0.0, 13.0,...|     11|['make a choice a...|autumn is my favo...|['winter squash',...|            7|
|a bit different  ...| 31490|     30|         26278|2002-06-17|['30-minutes-or-l...|[173.4, 18.0, 0.0...|      9|['preheat oven to...|this recipe calls...|['prepared pizza ...|            6|
|all in the kitche...|112140|    130|        

In [24]:
raw_recipes_df.describe()

DataFrame[summary: string, name: string, id: string, minutes: string, contributor_id: string, tags: string, nutrition: string, n_steps: string, steps: string, description: string, ingredients: string, n_ingredients: string]

In [25]:
raw_recipes_df.count()

231637

In [26]:
raw_recipes_df.columns

['name',
 'id',
 'minutes',
 'contributor_id',
 'submitted',
 'tags',
 'nutrition',
 'n_steps',
 'steps',
 'description',
 'ingredients',
 'n_ingredients']

In [27]:
# Count the total number of ratings in the dataset
numerator = train_ratings_df.select("rating").count()

# Count the number of distinct userIds and distinct recipe_Ids
num_users =train_ratings_df.select("user_id").distinct().count() # find the number of unique users in the training data. The output must be an integer
num_recipes =train_ratings_df.select("recipe_id").distinct().count() # find the number of unique recipes in the training data. The output must be an integer

# Set the denominator equal to the number of users multiplied by the number of movies
denominator = num_users * num_recipes

# Divide the numerator by the denominator
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("The training dataframe is ", "%.7f" % sparsity + "% empty.")

The training dataframe is  99.9829264% empty.


## Functions

In [28]:
def manual_recommendation_check (user_id):
  '''
  Given a user ID form the test dataset, this function will return the names of the recipes recommended to the user.

  Initialize this function after the all_recs data frame is calculated.

  Input user_id of a user from the test set as an integer.
  Prints the names of recipes recommended to this user.
  Returns nothing.
  '''
  recs_user = all_recs_als[all_recs_als.user == user_id]
  recs_user_list = list(recs_user.item.values)
  recs_user_list = [x.item() for x in recs_user_list]
  display((raw_recipes_df.filter(F.col('id').isin(recs_user_list))
               .select("name")
               .collect()
               ))

# Model

## Task 02 - Collaborative Filtering Model

Add the argument details in the algorithm initialization below to build the ALS model.

In [29]:
from pyspark.ml.recommendation import ALS

In [30]:
# Create ALS model
als = ALS(userCol="user_id", # add the name of the column for users
          itemCol="recipe_id", # add the name of the column for recipes
          ratingCol="rating", # add the name of the column for ratings
          nonnegative = True,
          implicitPrefs = False,
          coldStartStrategy="drop"
         )

In [31]:
assert type(als)

#### Training

In [32]:
#Fit the model to the 'train' dataset
model = als.fit(train_ratings_df)

## Prediction

In [33]:
# use the model to create predictions for test data
test_predictions_unseen =model.transform(test_ratings_all_df)  # add a statment to tranform the test data

In [34]:
test_predictions_unseen.show(5)

+-------+---------+------+--------------------+-----------+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------------+----------+
|user_id|recipe_id|rating|              review|review_date|                name|    id|minutes|contributor_id| submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|year_of_review|prediction|
+-------+---------+------+--------------------+-----------+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------------+----------+
| 286566|   406393|     5|I scaled this bac...| 2012-01-03|foil wrapped savo...|406393|     30|        226863|2010-01-02|['30-minutes-or-l...|[296.9, 10.0, 5.0...|     13|['he

In [35]:
assert test_predictions_unseen.select(F.col("prediction"))

In [36]:
test_predictions_unseen.select(F.col("user_id"),
                               F.col("recipe_id"),
                               F.col("rating"),
                               F.col("prediction")
                              ).show(5)

+-------+---------+------+----------+
|user_id|recipe_id|rating|prediction|
+-------+---------+------+----------+
| 286566|   406393|     5| 4.9712553|
| 286566|   232735|     5| 3.8279624|
| 286566|    96519|     5| 2.6245208|
| 286566|    65675|     4|   4.21137|
| 286566|   192543|     5| 4.7510834|
+-------+---------+------+----------+
only showing top 5 rows



In [37]:
# Each user in the test set must have 10 predictions.
# Use ALS model to get these predictions.
# You can use the recommendForAllUsers() method.

recommendations = model.recommendForAllUsers(10) # complete the code

In [38]:
assert len(dict(recommendations.select(F.col("recommendations")).collect()[0][0])) == 10

In [39]:
# Transfroming the results by exploding the recommendations column

recommendations = (recommendations.select(F.col("user_id"),
                                          F.posexplode(F.col("recommendations")).alias("pos", "item"))
                                  .select(F.col("user_id"),
                                          F.col("pos"),
                                          F.col("item.recipe_id").alias("recomended_recipe_id"),
                                          F.col("item.rating").alias("predicted_rating")))

## Evaluation

### Task 04 - Model Evaluation

#### 1. RMSE

Add the argument details to the evaluator function below to calculate the RMSE score of the ALS model.

In [41]:
from pyspark.ml.evaluation import RegressionEvaluator

In [42]:
# Define evaluator as RMSE and print RMSE value
evaluator_seen = RegressionEvaluator(metricName="rmse",
                                     labelCol="rating" ,  # add the name of the column that has the ratings
                                     predictionCol="prediction" # add the name of the column that has the predicted ratings.
                                     )

In [43]:
predictions=model.transform(test_ratings_all_df)

In [44]:
RMSE = evaluator_seen.evaluate(predictions) # Use the .evaluate to find the RMSE on the test set.
print(RMSE)

1.4060581394880909


#### Rank based Metrics

We will use the lenskit library to calculate the ranking-based matrics. The lenskit library is available in pandas only so we need to convert the data frames from PySpark dataframes to Pandas dataframes.

In [45]:
all_recs_als = recommendations.toPandas()

In [46]:
all_recs_als

Unnamed: 0,user_id,pos,recomended_recipe_id,predicted_rating
0,1533,0,242393,6.205901
1,1533,1,243882,6.059007
2,1533,2,148469,6.014294
3,1533,3,38855,6.005024
4,1533,4,302950,5.965456
...,...,...,...,...
233615,1958606,5,17015,0.000000
233616,1958606,6,16809,0.000000
233617,1958606,7,16768,0.000000
233618,1958606,8,16765,0.000000


In [47]:
# Rename the columns of to eunsre that they match the columns names as in the cell below.

column_names = {'user_id':'user','pos':'rank','recomended_recipe_id':'item','predicted_rating':'score'} # create a dictionary with current column names as keys and the intended column names as values.
all_recs_als = all_recs_als.rename(columns=column_names)

In [48]:
all_recs_als

Unnamed: 0,user,rank,item,score
0,1533,0,242393,6.205901
1,1533,1,243882,6.059007
2,1533,2,148469,6.014294
3,1533,3,38855,6.005024
4,1533,4,302950,5.965456
...,...,...,...,...
233615,1958606,5,17015,0.000000
233616,1958606,6,16809,0.000000
233617,1958606,7,16768,0.000000
233618,1958606,8,16765,0.000000


In [49]:
# Adding a column to make sure the we know these recommendations are from the ALS algorithm.

all_recs_als["algorithm"] = "ALS"

In [50]:
all_recs_als

Unnamed: 0,user,rank,item,score,algorithm
0,1533,0,242393,6.205901,ALS
1,1533,1,243882,6.059007,ALS
2,1533,2,148469,6.014294,ALS
3,1533,3,38855,6.005024,ALS
4,1533,4,302950,5.965456,ALS
...,...,...,...,...,...
233615,1958606,5,17015,0.000000,ALS
233616,1958606,6,16809,0.000000,ALS
233617,1958606,7,16768,0.000000,ALS
233618,1958606,8,16765,0.000000,ALS


In [51]:
# Convert the test dataset to pandas and ensure that it has the same column names as shown in the cell below.
# Also, note there are only three columns in the rest data. You have to ensure your test data looks identical.

test_data = (test_ratings_all_df.select("user_id", "recipe_id","rating").toPandas())# add code to select the necessary columns and change the name of the columns.


In [52]:
column_names = {'user_id':'user','recipe_id':'item'}
test_data = test_data.rename(columns=column_names)

In [53]:
test_data

Unnamed: 0,user,item,rating
0,1911882,205270,4
1,2001149923,320408,5
2,2747439,320408,4
3,1275258,320408,5
4,2459507,320408,4
...,...,...,...
18679,2001525528,328332,0
18680,1179225,171506,5
18681,2001115353,122580,5
18682,2001535624,134360,0


In [54]:
# code to calculate the necessary metrics
# the code below uses lenskit library to evaluate the rank metrics

rla = topn.RecListAnalysis()
rla.add_metric(topn.recip_rank)
rla.add_metric(topn.ndcg)
rla.add_metric(topn.dcg)
results = rla.compute(all_recs_als, test_data)
results.head()

  result = getattr(ufunc, method)(*inputs, **kwargs)


Unnamed: 0_level_0,Unnamed: 1_level_0,nrecs,recip_rank,ndcg,dcg
algorithm,user,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
ALS,1533,10,0.0,,
ALS,1535,10,0.0,0.0,0.0
ALS,1634,10,0.0,,
ALS,1676,10,0.0,,
ALS,1792,10,0.0,,


## Manual Prediction Checking

1. Why are ndcg and dcg nulls? EG: user 28170

In [56]:
test_data[test_data.user == 28170]

Unnamed: 0,user,item,rating


In [57]:
all_recs_als[all_recs_als.user == 28170]

Unnamed: 0,user,rank,item,score,algorithm
4500,28170,0,108417,5.956179,ALS
4501,28170,1,146694,5.889125,ALS
4502,28170,2,156550,5.864495,ALS
4503,28170,3,360099,5.711204,ALS
4504,28170,4,38855,5.685368,ALS
4505,28170,5,148469,5.643238,ALS
4506,28170,6,270587,5.618041,ALS
4507,28170,7,275114,5.609941,ALS
4508,28170,8,94864,5.606172,ALS
4509,28170,9,178137,5.593911,ALS


User ```28170``` does not appear in the test set. Hence cannot be evaluated.

2. Why are all metrics 0 for specific users?

In [58]:
test_data[test_data.user == 56680]

Unnamed: 0,user,item,rating
11837,56680,229831,5


In [59]:
all_recs_als[all_recs_als.user == 56680]

Unnamed: 0,user,rank,item,score,algorithm
15000,56680,0,331552,6.246305,ALS
15001,56680,1,397395,6.205885,ALS
15002,56680,2,413832,6.121653,ALS
15003,56680,3,308671,6.117253,ALS
15004,56680,4,333466,6.083302,ALS
15005,56680,5,146106,5.985888,ALS
15006,56680,6,323347,5.976254,ALS
15007,56680,7,179225,5.974277,ALS
15008,56680,8,409187,5.958946,ALS
15009,56680,9,407749,5.958946,ALS


User-recipe combination does not appear in the recommendations set.

3. Are any non zero metrics?

In [60]:
results[results.ndcg > 0]

Unnamed: 0_level_0,Unnamed: 1_level_0,nrecs,recip_rank,ndcg,dcg
algorithm,user,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
ALS,173085,10,0.142857,0.123643,1.333333
ALS,496803,10,0.2,0.068077,1.547411
ALS,545684,10,inf,1.0,5.0
ALS,1630307,10,0.111111,0.100758,1.50515


In [61]:
results[results.dcg > 0]

Unnamed: 0_level_0,Unnamed: 1_level_0,nrecs,recip_rank,ndcg,dcg
algorithm,user,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
ALS,173085,10,0.142857,0.123643,1.333333
ALS,496803,10,0.2,0.068077,1.547411
ALS,545684,10,inf,1.0,5.0
ALS,1630307,10,0.111111,0.100758,1.50515


In [62]:
results[results.recip_rank > 0]

Unnamed: 0_level_0,Unnamed: 1_level_0,nrecs,recip_rank,ndcg,dcg
algorithm,user,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
ALS,173085,10,0.142857,0.123643,1.333333
ALS,496803,10,0.2,0.068077,1.547411
ALS,545684,10,inf,1.0,5.0
ALS,1630307,10,0.111111,0.100758,1.50515


There are few user recipe combination that has a corresponding value in the test set. Hence, we have only few non-zero value of evaluation metrics.


Manually, check one prediction.

In [65]:
manual_recommendation_check (653438) # use the function diclared earlier to print the recomendations for the user 653438

[Row(name='backwoods idaho thick and hearty smoky baked beans'),
 Row(name='caramel dip'),
 Row(name='israeli salad with jicama'),
 Row(name='mushroom chicken over rice'),
 Row(name='orange creamsicle cake'),
 Row(name='pecan log roll'),
 Row(name='saffron challah'),
 Row(name='sugar free coleslaw'),
 Row(name='sunflower bread'),
 Row(name='whole wheat energy bars')]

## Saving the models and predictions

In [69]:
all_recs_als.to_csv("/content/drive/MyDrive/Receipe_Recommendation_Assignment_ML/recommendation_als.csv", # modify the path
                    index=False)

In [71]:
model.save('/content/drive/MyDrive/Receipe_Recommendation_Assignment_ML/ALS_model.model') # modify the path

            ---- THANK YOU -----