## Setup

In [None]:
import os
# Grant Colab permission to access Google Drive and mount it at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:

# Make the project folder on the mounted Drive the working directory,
# then echo the path to confirm the change.
os.chdir("/content/drive/MyDrive/food_recommender_assignment/")
os.getcwd()

'/content/drive/MyDrive/food_recommender_assignment'

In [None]:
# Move into the Dataset sub-folder so the CSV and Parquet paths used later
# in the notebook can be given as bare file names.
os.chdir("/content/drive/MyDrive/food_recommender_assignment/Dataset/")
os.getcwd()

'/content/drive/MyDrive/food_recommender_assignment/Dataset'

In [None]:
try:
  import pyspark 
except:
  !pip install pyspark==3.1.2
  import pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==3.1.2
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 79 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 24.7 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880769 sha256=76171ed80cb4dcb78f3cca2fa9c154692a143514f5309b72cb326dd2f7bb549a
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [None]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

In [None]:
# Build (or fetch the already-running) local-mode Spark session, with the
# Spark UI port set to 4050.
spark = (
    SparkSession.builder
    .master("local")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [None]:
# Display the session object to confirm it was created.
spark

## Imports

In [None]:
import pandas as pd 
import numpy as np

from pyspark.sql import functions as F
# Import for typecasting columns
from pyspark.sql.types import IntegerType,BooleanType,DateType,FloatType,StringType, ArrayType

## Read the data 

In [None]:
# Load the ratings CSV: the first row holds the column names and column
# types are inferred from the data.
raw_ratings_df = spark.read.csv(
    "raw_ratings_small.csv",
    inferSchema=True,
    header=True,
)

In [None]:
# Load the recipes CSV with the same reader settings as the ratings file
# (header row present, column types inferred).
raw_recipes_df = spark.read.csv(
    "raw_recipies_small.csv",
    inferSchema=True,
    header=True,
)

#### Decide a split date based on the ratings dataframe. 

In [None]:
# Total number of rows (reviews) in the ratings dataset; used below to size
# the test split.
num_review_int = raw_ratings_df.count()

In [None]:
# Show the total review count.
num_review_int

93357

Divide the data into train and test based on the 80 - 20 split using the approach discussed.

In [None]:
# Size of the test split: 20% of all reviews, rounded to a whole row count.
test_num_reviews_int = round(0.2 * num_review_int)

In [None]:
# Order the ratings from newest to oldest review date and keep only the
# first `test_num_reviews_int` rows, i.e. the most recent reviews.
temp_ratings_df = (
    raw_ratings_df
    .orderBy(F.col("review_date").desc())
    .limit(test_num_reviews_int)
)

In [None]:
# Oldest review date inside the most-recent slice; this becomes the split date.
# The aggregation runs inside Spark instead of collect()-ing every row to the
# driver, and the column is addressed by name rather than by position 4.
# NOTE(review): review_date is still a string at this point; min() matches the
# last row of the descending sort as long as the values are ISO 'YYYY-MM-DD',
# which the displayed output suggests -- confirm for the full dataset.
temp_ratings_df.agg(F.min("review_date")).first()[0]

'2011-07-17'

In [None]:
# Sanity check: number of rows in the most-recent slice (should equal
# test_num_reviews_int, since the slice was built with limit()).
temp_ratings_df.count()

18671

Split the data into two parts before and after 2011-07-17. 

- All reviews in the ratings data dated on or after 2011-07-17 will not exist in the training set. 
- For all future predictions the date will be set at 2011-07-18.   

In [None]:
# Join raw_ratings with raw_recipes, matching ratings.recipe_id to recipes.id.
# The result must keep every row of raw_ratings. raw_ratings_df is the LEFT
# side of this join, so a left outer join is required: a right join would
# instead keep every recipe and drop any rating whose recipe is missing.
interaction_level_df = raw_ratings_df.join(
    raw_recipes_df,
    on=raw_ratings_df.recipe_id == raw_recipes_df.id,
    how='left',
)

In [None]:
# Inspect the column names and types of the joined dataframe.
interaction_level_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- recipe_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review: string (nullable = true)
 |-- review_date: string (nullable = true)
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)
 |-- year_of_review: integer (nullable = true)



In [None]:
# Preview the first five joined rows.
interaction_level_df.show(5)

+-------+---------+------+--------------------+-----------+--------------------+-----+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------------+
|user_id|recipe_id|rating|              review|review_date|                name|   id|minutes|contributor_id| submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|year_of_review|
+-------+---------+------+--------------------+-----------+--------------------+-----+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------------+
| 152552|    29601|     5|Excellent. It was...| 2004-08-20|zippy cold spicy ...|29601|    270|         30367|2002-05-29|['time-to-make', ...|[277.2, 15.0, 13....|     12|['in a heavy 10-i...|shrimp drink in t...

In [None]:
# Convert the two date columns with to_date so they can be compared as dates
# (the schema printed above lists both as strings).
interaction_level_df = (
    interaction_level_df
    .withColumn('review_date', F.to_date(F.col('review_date')))
    .withColumn('submitted', F.to_date(F.col('submitted')))
)

In [None]:
# Count the interactions reviewed on or after the split date.
(
    interaction_level_df
    .where(F.col('review_date') >= F.to_date(F.lit('2011-07-17')))
    .count()
)

18684

In [None]:
# Number of columns in the joined dataframe.
len(interaction_level_df.columns)

18

In [None]:
# Preview the earliest rows on or after the split date, ordered by ascending
# review date.
(
    interaction_level_df
    .where("review_date >= '2011-07-17'")
    .orderBy(F.to_date(F.col('review_date')).asc())
    .show(5)
)

+-------+---------+------+--------------------+-----------+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------------+
|user_id|recipe_id|rating|              review|review_date|                name|    id|minutes|contributor_id| submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|year_of_review|
+-------+---------+------+--------------------+-----------+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------------+
| 407625|   220008|     4|Tasty and easy!  ...| 2011-07-17|rhubarb strawberr...|220008|     40|        169430|2007-04-01|['60-minutes-or-l...|[364.7, 26.0, 122...|     11|['put oven rack i...|this no-hassle m

In [None]:
# Training split: every interaction reviewed strictly BEFORE '2011-07-17'.
train_interaction_level_df = interaction_level_df.where(
    "review_date <'2011-07-17'"
)

In [None]:
# Using the filter command to separate the datasets. 
# All interactions which were rated ON or AFTER '2011-07-17' will be test data. 

test_interaction_level_all_recipies_df  = (interaction_level_df.filter(   "review_date >= '2011-07-17'"))

In [None]:
# Persist both splits as Parquet files for the modelling stage.
# Each frame is coalesced to one partition before writing, and 'overwrite'
# mode replaces any output left by a previous run.
# Paths are relative to the current working directory; change the file names
# or paths here if the outputs should live elsewhere.
train_writer = train_interaction_level_df.coalesce(1).write.mode('overwrite')
train_writer.parquet('train_interaction_level_df.parquet')

test_writer = test_interaction_level_all_recipies_df.coalesce(1).write.mode('overwrite')
test_writer.parquet('test_interaction_level_df.parquet')

In [None]:
# Show the working directory, i.e. where the relative Parquet paths above resolve.
os.getcwd()

'/content/drive/MyDrive/food_recommender_assignment/Dataset'