## Databricks notebook for the movie recommendation code

In [None]:
ratings_filename = "dbfs:/mnt/Files/Validated/ratings.csv"
movies_filename = "dbfs:/mnt/Files/Validated/movies.csv"

In [None]:
%fs ls /mnt/Files/

### A little analysis of movies.csv
We will create two dataframes for the analysis, which makes visualization with the Databricks display function straightforward:
1. movies_based_on_time: the genres are dropped, and the final schema will be (movie_id, name, year).
2. movies_based_on_genres: the final schema will be (movie_id, name_with_year, one_genre).
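
A minimal sketch of the reshaping that movies_based_on_genres needs: one row per (movie, genre) pair. It is written in plain Python so it runs without a cluster, and it assumes the genres column is pipe-separated, as in the MovieLens movies.csv (e.g. Adventure|Animation|Children). The sample rows and the helper explode_genres are hypothetical. In PySpark, the same step would typically be written with F.explode(F.split('genres', r'\|')).

```python
# Hypothetical sample rows shaped like movies.csv: (ID, title, genres).
# Assumption: genres are pipe-separated, as in the MovieLens CSV files.
SAMPLE_MOVIES = [
    (1, "Toy Story (1995)", "Adventure|Animation|Children|Comedy|Fantasy"),
    (2, "Jumanji (1995)", "Adventure|Children|Fantasy"),
]


def explode_genres(rows):
    """Turn (ID, title, genres) rows into one (ID, title, one_genre) row per genre."""
    exploded = []
    for movie_id, title, genres in rows:
        # Roughly what F.explode(F.split('genres', r'\|')) does in PySpark.
        for genre in genres.split("|"):
            exploded.append((movie_id, title, genre))
    return exploded


movies_based_on_genres = explode_genres(SAMPLE_MOVIES)
for row in movies_based_on_genres[:3]:
    print(row)
```

Keeping one genre per row lets a simple groupBy on the genre column count movies per genre.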

In [None]:
from pyspark.sql.types import *

#working only on movies.csv right now
movies_with_genres_df_schema = StructType(
[StructField('ID', IntegerType()),
StructField('title', StringType()),
StructField('genres',StringType())]
)

movies_df_schema = StructType(
[StructField('ID', IntegerType()),
StructField('title', StringType())]
) 

# movies_df_schema omits the genres column; the year column is added in a later transformation

In [None]:
# Creating the dataframes
movies_df = sqlContext.read.format('com.databricks.spark.csv').options(
    header=True, inferSchema=False).schema(movies_df_schema).load(movies_filename)

movies_with_genres_df = sqlContext.read.format('com.databricks.spark.csv').options(
    header=True, inferSchema=False).schema(movies_with_genres_df_schema).load(movies_filename)

### Explore the dataframes

In [None]:
movies_df.show(4, truncate=False) # we will also use this for collaborative filtering
movies_with_genres_df.show(4, truncate=False)

In [None]:
# Transforming the dataframes: extract the year from the parenthesised number in the title
from pyspark.sql.functions import regexp_extract

movies_with_year_df = movies_df.select('ID', 'title', regexp_extract('title', r'\((\d+)\)', 1).alias('year'))
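
Spark's regexp_extract returns capture group 1 of the first match, or an empty string (not null) when the pattern does not match. The plain-Python sketch below imitates that behaviour with the re module, whose syntax agrees with Java's for these patterns, to show the edge cases. The sample titles and the stricter TRAILING_YEAR pattern are hypothetical, not part of the notebook.

```python
import re

# Pattern used in the notebook cell: any digits inside parentheses.
FIRST_MATCH = re.compile(r"\((\d+)\)")
# Hypothetical stricter variant: a four-digit year at the end of the title.
TRAILING_YEAR = re.compile(r"\((\d{4})\)\s*$")


def extract_year(title, pattern=FIRST_MATCH):
    """Return capture group 1 of the first match, or '' when nothing matches.

    This imitates Spark's regexp_extract(col, pattern, 1), which returns an
    empty string (not null) when the pattern does not match.
    """
    match = pattern.search(title)
    return match.group(1) if match else ""


# Hypothetical titles chosen to show the edge cases.
titles = ["Toy Story (1995)", "Sequel (2) (2001)", "Untitled Project"]
for t in titles:
    print(f"{t!r}: {extract_year(t)!r} vs {extract_year(t, TRAILING_YEAR)!r}")
```

Titles without a parenthesised year end up with an empty year, which shows up as its own group in the per-year counts. Anchoring the pattern to the end of the title avoids picking up other parenthesised numbers.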

### Dataframes after transformation

In [None]:
movies_with_year_df.show(4, truncate=False)

### Now we will gain some insights from the data

In [None]:
# Count movies per year to find the year in which the most movies were produced (it was 2009)
display(movies_with_year_df.groupBy('year').count().orderBy('count', ascending=False))

### Now let's create the ratings dataframe

In [None]:
# Again, we define the schema explicitly so Spark does not run an extra pass over the data to infer it.
# This ratings file contains over 20 million ratings.
ratings_df_schema = StructType(
[StructField('userId', IntegerType()),
 StructField('movieId', IntegerType()),
 StructField('rating', DoubleType())])
# The timestamp column is dropped: the schema lists only the first three columns

In [None]:
# creating the ratings df
ratings_df = sqlContext.read.format('com.databricks.spark.csv').options(header=True,
            inferSchema=False).schema(ratings_df_schema).load(ratings_filename)
ratings_df.show(4)

In [None]:
# cache the dataframes for quick access

ratings_df.cache()
movies_df.cache()

### Global Popularity

It is useful to know the most popular movies. Ranking purely by average rating favours movies with only a handful of reviews, so the ranking is constrained by the number of reviews: we discard movies with fewer than 500 reviews.
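
To see why the review threshold matters, here is a small plain-Python sketch of the same count, average, filter, and sort pipeline on hypothetical toy data, with a threshold of 2 standing in for 500. The helper popular_movies and the sample ratings are illustrative only.

```python
from collections import defaultdict

# Hypothetical (userId, movieId, rating) tuples and a movieId -> title map.
RATINGS = [
    (1, 10, 5.0), (2, 10, 4.0), (3, 10, 4.5),
    (1, 20, 5.0),
    (2, 30, 3.0), (3, 30, 4.0),
]
TITLES = {10: "Movie A", 20: "Movie B", 30: "Movie C"}


def popular_movies(ratings, titles, min_count):
    """Rank movies by average rating, keeping only those with >= min_count ratings."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for _user, movie, rating in ratings:
        totals[movie] += rating
        counts[movie] += 1
    rows = [
        (titles[movie], counts[movie], totals[movie] / counts[movie])
        for movie in counts
        if counts[movie] >= min_count
    ]
    # Highest average first, like orderBy('average', ascending=False).
    return sorted(rows, key=lambda row: row[2], reverse=True)


for row in popular_movies(RATINGS, TITLES, min_count=2):
    print(row)
```

With min_count=1, "Movie B" would top the list on the strength of a single 5-star rating. The threshold removes exactly that kind of result.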

In [None]:
from pyspark.sql import functions as F

# From ratings_df, compute for each movie the number of ratings and the average rating
movie_ids_with_avg_ratings_df = ratings_df.groupBy('movieId').agg(
    F.count(ratings_df.rating).alias("count"),
    F.avg(ratings_df.rating).alias("average"))

print('movie_ids_with_avg_ratings_df:')

movie_ids_with_avg_ratings_df.show(4, truncate=False)

In [None]:
# Join with movies_df so each movie_id is shown with its title, which makes the result easier to read
movie_names_with_avg_ratings_df = movie_ids_with_avg_ratings_df.join(
    movies_df, movie_ids_with_avg_ratings_df['movieId'] == movies_df['ID']).drop('ID')
movie_names_with_avg_ratings_df.show(4, truncate=False)

In [None]:
#looking at global popularity

movies_with_500_ratings_or_more = movie_names_with_avg_ratings_df.filter(movie_names_with_avg_ratings_df['count'] >= 500).orderBy('average', ascending=False)
movies_with_500_ratings_or_more.show(truncate=False)

### Splitting into Training, Validation, and Test Datasets

As with most ML algorithms, in practice we have to tune parameters and then test accuracy. For this, we split the data into three parts: training, validation (used to tune the hyperparameters), and test (used to check the accuracy of the trained model).
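
Spark's randomSplit normalises the weights if they do not sum to 1.0 and assigns each row independently. The resulting splits are therefore only approximately 60/20/20, which is why the next cell prints the actual counts. The plain-Python sketch below, with the hypothetical helper random_split, shows that idea. Its sampling details differ from Spark's, so the same seed will not select the same rows.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to one split, in proportion to weights.

    Weights are normalised, and split sizes are only approximately
    proportional, as with Spark's randomSplit (whose exact sampling differs).
    """
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.6, 0.8, 1.0] for weights [0.6, 0.2, 0.2].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for index, bound in enumerate(bounds):
            if draw < bound:
                splits[index].append(row)
                break
        else:
            # Guard against floating-point rounding leaving the last bound below 1.0.
            splits[-1].append(row)
    return splits


train, validation, test = random_split(range(10_000), [0.6, 0.2, 0.2], seed=4)
print(len(train), len(validation), len(test))
```

Fixing the seed makes the split reproducible between runs, so hyperparameter comparisons are made on the same validation rows.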

In [None]:
# We use 60% of the data for training and hold out 20% for validation and 20% for testing

seed = 4
(split_60_df, split_a_20_df, split_b_20_df) = ratings_df.randomSplit([0.6,0.2,0.2],seed)

# Let's cache these datasets for performance
training_df = split_60_df.cache()
validation_df = split_a_20_df.cache()
test_df = split_b_20_df.cache()

print('Training: {0}, validation: {1}, test: {2}\n'.format(
training_df.count(), validation_df.count(), test_df.count())
)

training_df.show(4, truncate=False)

validation_df.show(4, truncate=False)

test_df.show(4, truncate=False)

### Alternating Least Squares (ALS)

A challenge for collaborative filtering is how to provide ratings to a new user (a user who has not provided any ratings at all). Some recommendation systems provide new users with a set of default ratings (e.g., an average value across all ratings), while others provide no ratings for new users. Spark's ALS algorithm yields a NaN (Not a Number) value when asked to provide a rating for a new user.

Using the ML Pipeline's CrossValidator with ALS is thus problematic. Cross-validation divides the training data into a set of folds (e.g., three sets) and uses those folds for testing and evaluating the parameters during the parameter grid search. Some folds are likely to contain users that do not appear in the other folds, and ALS produces NaN values for those new users. When the CrossValidator uses the Evaluator (RMSE) to compute an error metric, a single NaN prediction propagates through the mean, so the RMSE is NaN. This makes all of the parameters in the parameter grid appear equally good (or bad). The discussion on Spark JIRA issue SPARK-14489 covers this problem. The proposed workarounds are to have ALS provide default values or to have RMSE drop NaN values, and both introduce potential issues. We have chosen to have RMSE drop NaN values. While this does not solve the underlying issue of ALS not predicting a value for a new user, it still provides a usable evaluation. We implement the parameter grid search manually using a for loop (below) and remove the NaN values before computing RMSE.
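
A minimal plain-Python sketch of that evaluation strategy: RMSE is computed only over non-NaN predictions, inside a hand-written loop over a parameter grid. Spark's ALS cannot run here without a cluster, so a regularised per-user bias model (the hypothetical fit_user_bias) stands in for it. Like ALS, this stand-in returns NaN for a user absent from the training data. The data and grid values are hypothetical. Spark 2.2 and later also expose an ALS parameter, coldStartStrategy="drop", which removes NaN predictions before evaluation.

```python
import math

# Hypothetical (userId, movieId, rating) data; user 9 appears only in validation.
TRAIN = [(1, 10, 4.0), (1, 20, 5.0), (2, 10, 2.0), (2, 30, 3.0), (3, 20, 4.0)]
VALIDATION = [(1, 30, 4.5), (2, 20, 2.5), (3, 10, 4.0), (9, 10, 3.0)]


def fit_user_bias(train, reg):
    """Stand-in for ALS: global mean plus a regularised per-user offset."""
    mean = sum(r for _, _, r in train) / len(train)
    sums, counts = {}, {}
    for user, _, rating in train:
        sums[user] = sums.get(user, 0.0) + (rating - mean)
        counts[user] = counts.get(user, 0) + 1
    offsets = {u: sums[u] / (counts[u] + reg) for u in sums}

    def predict(user, _movie):
        # Like ALS, return NaN for a user never seen in training.
        return mean + offsets[user] if user in offsets else math.nan

    return predict


def rmse_dropping_nan(pairs):
    """RMSE over (actual, predicted) pairs, ignoring NaN predictions."""
    kept = [(a, p) for a, p in pairs if not math.isnan(p)]
    if not kept:
        return math.nan
    return math.sqrt(sum((a - p) ** 2 for a, p in kept) / len(kept))


best = None
for reg in [0.1, 1.0, 10.0]:  # hypothetical parameter grid
    predict = fit_user_bias(TRAIN, reg)
    pairs = [(r, predict(u, m)) for u, m, r in VALIDATION]
    error = rmse_dropping_nan(pairs)
    print(f"reg={reg}: RMSE={error:.4f}")
    if best is None or error < best[1]:
        best = (reg, error)
print("best reg:", best[0])
```

Without the NaN filter, user 9's prediction would turn every RMSE in the loop into NaN, and no grid value could be preferred over another.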

For a production application, you would want to consider the tradeoffs in how to handle new users.
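
One way to make that tradeoff concrete is to compare the two policies discussed above on the same predictions. The first drops NaN predictions, which is the approach chosen for evaluation here. The second fills them with a default rating such as the global average. The plain-Python sketch below uses hypothetical predictions and a hypothetical global average. It reports both the RMSE and the coverage, meaning the fraction of test rows that receive a score.

```python
import math

# Hypothetical (actual_rating, predicted_rating) pairs; NaN marks a new user.
PREDICTIONS = [(4.0, 3.8), (2.0, 2.6), (5.0, math.nan), (1.0, math.nan)]


def evaluate(pairs, policy, fallback):
    """Return (rmse, coverage) for one cold-start policy.

    policy "drop": ignore NaN predictions.
    policy "fill": replace NaN with a default rating such as the global average.
    """
    if policy == "drop":
        kept = [(a, p) for a, p in pairs if not math.isnan(p)]
    elif policy == "fill":
        kept = [(a, fallback if math.isnan(p) else p) for a, p in pairs]
    else:
        raise ValueError(f"unknown policy: {policy}")
    if not kept:
        return math.nan, 0.0
    coverage = len(kept) / len(pairs)
    rmse = math.sqrt(sum((a - p) ** 2 for a, p in kept) / len(kept))
    return rmse, coverage


global_average = 3.0  # hypothetical mean of all training ratings
for policy in ("drop", "fill"):
    rmse, coverage = evaluate(PREDICTIONS, policy, global_average)
    print(f"{policy}: RMSE={rmse:.3f}, coverage={coverage:.0%}")
```

Dropping gives the lower RMSE, but it scores only half of the rows and leaves new users without any recommendation. Filling covers every user at the cost of a worse error on the cold-start rows.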