#### GETTING DATA

In [0]:
ratings_filename = "dbfs:/mnt/Files/Validateddata/ratings.csv"
movies_filename = "dbfs:/mnt/Files/Validateddata/movies.csv"

In [0]:
%fs
ls /mnt/Files/Validateddata

path,name,size,modificationTime
dbfs:/mnt/Files/Validateddata/genome-tags.csv,genome-tags.csv,18103,1681901135000
dbfs:/mnt/Files/Validateddata/movies.csv,movies.csv,3038099,1681901136000
dbfs:/mnt/Files/Validateddata/ratings.csv,ratings.csv,678260987,1681901140000


#### A Little analysis on the movies.csv
We will create 2 dataframes for our analysis which will make the visualization with Databricks display function pretty straightforward
1) movie_based_on_time - We will drop the genres here final schema will be (movie_id,name,Year)
2) movie_based_on_genres - Final schema would look like (movie_id,name_with_year,one_genre)

In [0]:
from pyspark.sql.types import *
# working only one movie.csv right now
movie_with_genre_df_schema = StructType(
    [StructField('ID',IntegerType()),
    StructField('title',StringType()),
    StructField('genres',StringType())]
)

movie_df_schema = StructType(
    [StructField('ID',IntegerType()),
    StructField('title',StringType())]
) # in this we have just removed genre column. We will transform this schema to add year column in later section

In [0]:
# Creating the dataframes
movies_df = sqlContext.read.format('com.databricks.spark.csv').options(header=True,inferSchema=False).schema(movie_df_schema).load(movies_filename)
movies_with_genres_df = sqlContext.read.format('com.databricks.spark.csv').options(header=True,inferSchema=False).schema(movie_with_genre_df_schema).load(movies_filename)

##### Inspecting the dataframes before transformations

In [0]:
movies_df.show(4,truncate=False)  # we will also use this for collaborative filtering
movies_with_genres_df.show(4,truncate=False)

+---+------------------------+
|ID |title                   |
+---+------------------------+
|1  |Toy Story (1995)        |
|2  |Jumanji (1995)          |
|3  |Grumpier Old Men (1995) |
|4  |Waiting to Exhale (1995)|
+---+------------------------+
only showing top 4 rows

+---+------------------------+-------------------------------------------+
|ID |title                   |genres                                     |
+---+------------------------+-------------------------------------------+
|1  |Toy Story (1995)        |Adventure|Animation|Children|Comedy|Fantasy|
|2  |Jumanji (1995)          |Adventure|Children|Fantasy                 |
|3  |Grumpier Old Men (1995) |Comedy|Romance                             |
|4  |Waiting to Exhale (1995)|Comedy|Drama|Romance                       |
+---+------------------------+-------------------------------------------+
only showing top 4 rows



In [0]:
# transformating the dataframes

from pyspark.sql.functions import split, regexp_extract

movies_with_year_df = movies_df.select('ID','title',regexp_extract('title',r'\((\d+)\)',1).alias('year'))

In [0]:
# Dataframes after transformation
movies_with_year_df.show(4,truncate=False)

+---+------------------------+----+
|ID |title                   |year|
+---+------------------------+----+
|1  |Toy Story (1995)        |1995|
|2  |Jumanji (1995)          |1995|
|3  |Grumpier Old Men (1995) |1995|
|4  |Waiting to Exhale (1995)|1995|
+---+------------------------+----+
only showing top 4 rows



### Now we will use inbuilt functionality of Databricks for some insights

In [0]:
# from here we can look at the cound and find the maximum number of movies produced in year 2009
display(movies_with_year_df.groupBy('year').count().orderBy('count',ascending = False))

year,count
2015.0,2513
2016.0,2488
2014.0,2406
2017.0,2374
2013.0,2173
2018.0,2034
2012.0,1978
2011.0,1838
2009.0,1723
2010.0,1691


### Lets check the Ratings

In [0]:
# to avoid the action we are explicitly defining the schema
ratings_df_schema = StructType(
[StructField('userId',IntegerType()),
StructField('movieId',IntegerType()),
StructField('rating',DoubleType())])  
# we have dropped the timestamp column

In [0]:
# creating the df
ratings_df = sqlContext.read.format('com.databricks.spark.csv').options(header=True,inferSchema=False).schema(ratings_df_schema).load(ratings_filename)

ratings_df.show(4)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    296|   5.0|
|     1|    306|   3.5|
|     1|    307|   5.0|
|     1|    665|   5.0|
+------+-------+------+
only showing top 4 rows



### Caching the dataframes

In [0]:
# We will cache both the dataframes now

movies_df.cache()
ratings_df.cache()

Out[10]: DataFrame[userId: int, movieId: int, rating: double]

## Global Popularity
Here we will put a constraint on the no. of reviews given by discarding the movies where the count of ratings is less than 500

In [0]:
from pyspark.sql import functions as F

# from ratings dataframe we will create a movie_ids_with_avg_ratings_df that combines the two DataFrames
movie_ids_with_avg_ratings_df = ratings_df.groupBy('movieId').agg(F.count(ratings_df.rating).alias('count'),F.avg(ratings_df.rating).alias('average'))
print('movie_ids_with_avg_ratings_df:')
movie_ids_with_avg_ratings_df.show(4,truncate=False)

movie_ids_with_avg_ratings_df:
+-------+-----+------------------+
|movieId|count|average           |
+-------+-----+------------------+
|1088   |11935|3.25002094679514  |
|1580   |40308|3.5817083457378187|
|3175   |14659|3.6077836141619484|
|44022  |4833 |3.2593627146699773|
+-------+-----+------------------+
only showing top 4 rows



In [0]:
# this dataframe will have movies name with movieid for better understanding
movie_names_with_avg_ratings_df = movie_ids_with_avg_ratings_df.join(movies_df,F.col('movieId')==F.col('ID')).drop('ID')
movie_names_with_avg_ratings_df.show(4,truncate=False)

+-------+-----+------------------+--------------------------------+
|movieId|count|average           |title                           |
+-------+-----+------------------+--------------------------------+
|1088   |11935|3.25002094679514  |Dirty Dancing (1987)            |
|1580   |40308|3.5817083457378187|Men in Black (a.k.a. MIB) (1997)|
|3175   |14659|3.6077836141619484|Galaxy Quest (1999)             |
|44022  |4833 |3.2593627146699773|Ice Age 2: The Meltdown (2006)  |
+-------+-----+------------------+--------------------------------+
only showing top 4 rows



In [0]:
# so let us see the global popularity
movies_with_500_ratings_or_more = movie_names_with_avg_ratings_df.filter(movie_names_with_avg_ratings_df['count'] >= 500).orderBy('average',ascending=False)
movies_with_500_ratings_or_more.show(truncate=False)

+-------+-----+------------------+---------------------------------------------------------------------------+
|movieId|count|average           |title                                                                      |
+-------+-----+------------------+---------------------------------------------------------------------------+
|171011 |1124 |4.483096085409253 |Planet Earth II (2016)                                                     |
|159817 |1747 |4.464796794504865 |Planet Earth (2006)                                                        |
|318    |81482|4.413576004516335 |Shawshank Redemption, The (1994)                                           |
|170705 |1356 |4.398598820058997 |Band of Brothers (2001)                                                    |
|858    |52498|4.324336165187245 |Godfather, The (1972)                                                      |
|179135 |659  |4.289833080424886 |Blue Planet II (2017)                                                      |
|

A good thing to observe here is that it has a lot of simalarity with IMDB top 250 movies
If there is a cold start problem(new user), we can just recommend the global populars

### Collaborative Filtering 
We will use the Matrix Factorization algorithm present in spark MLib called as ALS quora explination

### Splitting in Train, Test and Validation Dataset
As with all the machine learning algorithms in practice we have to tune the parameters and then test accuracy. For this we will split the data into 3 parts Train, Test(checking the final accuracy) and Validation(optimizing the hyperparameters) data. For more information about this refer brillant lecture by Nando

In [0]:
# We will hold 60% for training, 20% of our data for validation and leave 20% of data for testing

seed = 4
(split_60_df,split_a_20_df,split_b_20_df) = ratings_df.randomSplit([0.6,0.2,0.2],seed)

# Lets cache these datasets for performance
training_df = split_60_df.cache()
validation_df = split_a_20_df.cache()
test_df = split_b_20_df.cache()

print('Training: {0}, validation: {1}, test: {2}\n'.format(training_df.count(),validation_df.count(),test_df.count()))

training_df.show(4,truncate=False)
validation_df.show(4,truncate=False)
test_df.show(4,truncate=False)

Training: 14999112, validation: 4999908, test: 5001075

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|1     |306    |3.5   |
|1     |307    |5.0   |
|1     |665    |5.0   |
|1     |899    |3.5   |
+------+-------+------+
only showing top 4 rows

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|1     |1250   |4.0   |
|1     |2011   |2.5   |
|1     |2161   |3.5   |
|1     |2351   |4.5   |
+------+-------+------+
only showing top 4 rows

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|1     |296    |5.0   |
|1     |1217   |3.5   |
|1     |2068   |2.5   |
|1     |2843   |4.5   |
+------+-------+------+
only showing top 4 rows



From above we can see 10 million training samples, 4 million validation samples and 4 million test samples

### Alternating Least Square (ALS)

Need of cross validation, some problems and solutions here I am copying it directly from the Assignment notebook

A challenge for collaborative filtering is how to provide ratings to new users( a user who has not provided any ratings at all). Some recommendations systems choose to provide new users with a set of default ratings (e.g., an average value across all ratings, while others choose to provide no ratings for new users. Spark's ALS algorithm yields a NaN value when asked to provide a rating for a new user

Using the ML Pipeline's CrossValidator with ALS is thus problematic, because cross validation involves dividing the training data into a set of folds(e.g., three sets) and then using those folds for testing and evaluating the parameters during the parameter grid search process. It is likely that some of the folds will contain users that are not in the other folds, and, as a result ALS produces NaN values for those new users. When the CrossValidator uses the Evaluator (RMSE) to compute an error metric, the RMSE algorithm will return NaN. This will make all of the parameters in the parameter grid appear to be equally good(or bad).

You can read the discussion on Spark JIRA 14489 about this issue. There are proposed workarounds of having ALS provide default values or having RMSE drop NaN values. Both introduce potential issues. We have chose to have RMSE drop NaN values. While this does not solve the underlying issue of ALS not predicting a value for a new user,it does provide some evaluation value. We manually implement the parameter grid search process using a for loop(below) and remove the NaN values before using RMSE

For a production application, you would want to consider the tradeoffs in how to handle new users.

In [0]:
# from pyspark.ml.recommendation import ALS

# # our ALS learner
# als = ALS()

# # now we set the parameters for the method
# als.setMaxIter(5)\
#    .setSeed(seed)\
#    .setRegParam(0.1)\
#    .setUserCol('userId')\
#    .setItemCol('movieId')\
#    .setRatingCol('rating')

# # now lets compute an evaluation metric for our test and validation dataset
# from pyspark.ml.evaluation import RegressionEvaluator

# # Create RMSE evaluator using the lable and predicted columns
# # it will essentially calculate RMSE score based on these columns
# reg_eval = RegressionEvaluator(PredictionCol='prediction',labelCol='rating',metricName='rmse')

# tolerance = 0.03

# # now to understand rank let us initially assume that my recommendation matrix is 1000 x 1000 (1000 users and 1000 products, this is a very sparse matrix)
# # now we get 2 matrices P (shape 1000 * rank) and Q (shape rank * 1000) so if we multiply them we get the same matrix but our storage have gone down from storing (1000 x 1000) to (2 * 1000 * rank) (for rank = 4 we only need 8000 numbers compared to 1000000)

# ranks = [4,8,12]
# errors = [0,0,0]
# models = [0,0,0]
# err = 0
# min_error = float('inf')
# best_rank = 1
# for rank in ranks:
#     # Set the ranks here
#     als.setRank(rank)
#     # create the model with these parameters
#     model = als.fit(training_df)
#     # Run the model to create the prediction Predict against the validation_df
#     predict_df = model.transform(validation_df)
    
#     # Remove NaN values from predictions (due to spark-14489)
#     predicted_ratings_df = predict_df.filter(predict_df.prediction != float('nan'))
    
#     # Run the previously created RSME evaluator , reg_eval, on the predicted_ratings_df DataFrame
#     error = reg_eval.evaluate(predicted_ratings_df)
#     error[err] = error
#     models[err] = error
#     print 'For rank %s the RMSE is %s' % (rank,error)
#     if error < min_error:
#         min_error = error
#         best_rank = err
#     err += 1
    
# als.setRank(ranks[best_rank])
# print 'The best model was trained with rank %s' % ranks[best_rank]
# my_model = models[best_rank]


In [0]:
from pyspark.ml.recommendation import ALS

als = ALS()

# reset the parameters from the als object
als.setPredictionCol('prediction')\
   .setMaxIter(5)\
   .setSeed(seed)\
   .setRegParam(0.1)\
   .setUserCol('userId')\
   .setItemCol('movieId')\
   .setRatingCol('rating')\
   .setRank(8)              # we got rank 8 as optimal

# creating model with these parameters
my_rating_model = als.fit(training_df)   # now this is our movie rating model which we will use for recommendation

#### Looking for RMSE again

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.sql.functions import col
# create a RMSE evaluator using the lable and predicted columns
# it will essentially calculate the RMSE score based on these columns
reg_eval = RegressionEvaluator(predictionCol='prediction',labelCol='rating',metricName='rmse')
my_predict_df = my_rating_model.transform(test_df)

# remove NaN values from prediction
predicted_test_my_ratings_df = my_predict_df.filter(my_predict_df.prediction != float('nan'))

#Run the previously created RMSE evaluator , reg_eval, on the predicted_test_my_ratings_df Dataframe
test_RMSE_my_ratings = reg_eval.evaluate(predicted_test_my_ratings_df)
print('The model has an RMSE on the test set of {0}'.format(test_RMSE_my_ratings))
dbutils.widgets.text("input","5","")
ins=dbutils.widgets.get("input")
uid=int(ins)
ll=predicted_test_my_ratings_df.filter(col('userId')==uid)

The model has an RMSE on the test set of 0.813771546438166


To get output from the notebook

In [0]:
MovieRec=ll.join(movies_df,F.col('movieID') == F.col('ID')).drop('ID').select('title').take(10)

l=dbutils.notebook.exit(MovieRec)

[Row(title='Toy Story (1995)'), Row(title='Dead Man Walking (1995)'), Row(title='Clueless (1995)'), Row(title='Before and After (1996)'), Row(title='Batman Forever (1995)'), Row(title='Hackers (1995)'), Row(title='Boys on the Side (1995)'), Row(title='Natural Born Killers (1994)'), Row(title='Outbreak (1995)'), Row(title='Quiz Show (1994)')]