## Setup

In [1]:
import os
# Give Google Drive the required permission by mounting it at /content/drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# Create a folder in your Drive and put the name of that folder in the path below.
# For example, for the code below to run correctly, you need a folder named FoodRecSys in 'My Drive'.
# That folder will be your home directory for the rest of the project:
# you will be able to save and read data from it.
# NOTE(review): the next cell changes the working directory again, to a different folder - confirm which one is intended.

os.chdir("/content/drive/MyDrive/FoodRecSys/")
os.getcwd()

'/content/drive/MyDrive/FoodRecSys'

In [6]:
# Switch the working directory to the project's code folder; the relative
# CSV and parquet paths used later in the notebook resolve against it.
os.chdir("/content/drive/MyDrive/food_recsys_project/Code Files/")
os.getcwd()

'/content/drive/MyDrive/food_recsys_project/Code Files'

In [7]:
try:
  import pyspark 
except:
  !pip install pyspark==3.1.2
  import pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==3.1.2
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
     212.4/212.4 MB 5.9 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
     198.6/198.6 KB 15.2 MB/s eta 0:00:00
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880770 sha256=d9ab7aef8c892ebb70d805c86d35623ffc7047d1c84c9b78ff421ea64ca0d045
  Stored in directory: /root/.cache/pip/wheels/11/17/0b/53e7d10fe66ca7647d391cdba323fcf5b2f9dfcb7ebad87aa7
Successfully built pyspark
Installing collected packages: py4j, py

In [8]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

In [9]:
# Start (or reuse) a Spark session running on the local master,
# with the Spark UI configured on port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

In [10]:
# Show the session object as the cell output to confirm it was created.
spark

## Imports

In [11]:
import pandas as pd 
import numpy as np

from pyspark.sql import functions as F
# Import for typecasting columns
from pyspark.sql.types import IntegerType,BooleanType,DateType,FloatType,StringType, ArrayType

## Read the data 

In [14]:
# Load the ratings (user-recipe interactions) CSV: the first row is treated
# as the header and column types are inferred from the data.
raw_ratings_df = spark.read.csv(
    "raw_ratings_small.csv",  # modify the path to read the data
    header=True,
    inferSchema=True,
)

In [16]:
# Load the recipes CSV with the same reader settings as the ratings file
# (header row, inferred column types).
raw_recipes_df = (
    spark.read.csv(
        "raw_recipies_small.csv",  # modify the path to read the data
        header=True,
        inferSchema=True,
    )
)

In [17]:
# Guard against reading the wrong files: both frames must have the
# expected (row count, column count) shape.
recipes_shape = (raw_recipes_df.count(), len(raw_recipes_df.columns))
assert recipes_shape == (20340, 13)

ratings_shape = (raw_ratings_df.count(), len(raw_ratings_df.columns))
assert ratings_shape == (93357, 5)

#### Decide a split date based on the ratings dataframe. 

In [18]:
# Find the number of data points (rows) in the interaction dataset.
# count() triggers a Spark job and returns the result as a Python integer.
# This total is used below to size the test split.

num_review_int = raw_ratings_df.count()

#### Task 01 - Train Test Split 

Divide the data into train and test sets using an 80–20 time-based split: the most recent 20% of reviews determine the split date, and the train and test sets are each saved as a parquet file.

In [19]:
# Size of the test split: 20% of all interactions, rounded to a whole number of reviews.
test_num_reviews_int = round(0.2 * num_review_int)

In [20]:
# Order the interactions newest-first by review date, then keep only the
# `test_num_reviews_int` most recent reviews as the candidate test slice.
temp_ratings_df = raw_ratings_df.sort("review_date", ascending=False).limit(test_num_reviews_int)

In [21]:
# The candidate test slice must contain exactly 20% of the interactions.
assert temp_ratings_df.count() == 18671

# The slice must be ordered newest-first, so a later row can never carry a
# more recent review date than the row before it. Check the sorted ratings
# frame itself (not the recipes frame), and fetch the rows once so both
# operands come from the same materialisation instead of two full collects.
leading_rows = temp_ratings_df.take(12)
assert leading_rows[11]["review_date"] <= leading_rows[10]["review_date"]

In [22]:
# The last row of the newest-first slice is its oldest review; its
# review_date (column index 4) is the split date. tail(1) brings only that
# one row to the driver instead of collecting the whole slice.
temp_ratings_df.tail(1)[0][4]

'2011-07-17'

Split the data into two parts: reviews dated before 2011-07-17 (train) and reviews dated on or after 2011-07-17 (test).

- No review dated on or after 2011-07-17 will exist in the training set.
- For all future predictions the date will be set at 2011-07-18.

In [23]:
# List the ratings columns; `recipe_id` is the key used for the join below.
raw_ratings_df.columns


['user_id', 'recipe_id', 'rating', 'review', 'review_date']

In [24]:
# List the recipes columns; `id` is the recipe key used for the join below.
raw_recipes_df.columns

['name',
 'id',
 'minutes',
 'contributor_id',
 'submitted',
 'tags',
 'nutrition',
 'n_steps',
 'steps',
 'description',
 'ingredients',
 'n_ingredients',
 'year_of_review']

In [25]:
# Attach recipe attributes to every rating.
# A left join keeps all rows of raw_ratings_df even when no recipe matches.
# Joining on an explicit column expression (rather than a shared column name)
# keeps both key columns, `recipe_id` and `id`, in the result.
recipe_key_matches = raw_ratings_df["recipe_id"] == raw_recipes_df["id"]

interaction_level_df = raw_ratings_df.join(
    raw_recipes_df,
    on=recipe_key_matches,
    how="left",
)


In [26]:
# Training data: every interaction reviewed strictly BEFORE '2011-07-17'.
is_before_split = interaction_level_df["review_date"] < '2011-07-17'
train_interaction_level_df = interaction_level_df.filter(is_before_split)

In [27]:
# Test data: every interaction reviewed ON OR AFTER '2011-07-17'.
is_on_or_after_split = interaction_level_df["review_date"] >= '2011-07-17'
test_interaction_level_all_recipies_df = interaction_level_df.filter(is_on_or_after_split)

In [28]:
# Validate the split: expected (row count, column count) for each part.
test_shape = (
    test_interaction_level_all_recipies_df.count(),
    len(test_interaction_level_all_recipies_df.columns),
)
assert test_shape == (18684, 18)

train_shape = (
    train_interaction_level_df.count(),
    len(train_interaction_level_df.columns),
)
assert train_shape == (74673, 18)

In [29]:
# Create the data files for modeling: each split is coalesced to a single
# partition and written as parquet, overwriting any existing output.
for _split_df, _parquet_path in (
    (train_interaction_level_df, 'train_interaction_level_df.parquet'),
    (test_interaction_level_all_recipies_df, 'test_interaction_level_df.parquet'),
):
    # change the file name and file path in the tuple above if needed
    _split_df.coalesce(1).write.mode('overwrite').parquet(_parquet_path)