In [None]:
# Locations of the validated input CSV files on the DBFS mount.

ratings_filename, movies_filename = (
    "dbfs:/mnt/Files/Validated/ratings.csv",
    "dbfs:/mnt/Files/Validated/movies.csv",
)

In [None]:
# Databricks `%fs` magic: interacts with the filesystems supported by the underlying Databricks cluster.
# Here it lists the contents of /mnt/Files/.
%fs
ls /mnt/Files/

## Two DataFrames will be created for analysis
1) movie_based_on_time - final schema (movie_id, name, Year)
2) movie_based_on_genres - final schema (movie_id, name_with_year, one_genre)

In [None]:
# Explicit schemas for movies.csv (the readers below are used with inferSchema=False).

from pyspark.sql.types import *

# Columns shared by both movie schemas.
movie_core_fields = [
    StructField('ID', IntegerType()),
    StructField('title', StringType()),
]

# Schema without genres; the year is added later, when the DataFrame is transformed.
movies_df_schema = StructType(movie_core_fields)

# Schema that additionally keeps the genres column.
movies_with_genres_df_schema = StructType(
    movie_core_fields + [StructField('genres', StringType())]
)


In [None]:
# Build both movie DataFrames from the same CSV file, one per explicit schema.

movies_df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header=True, inferSchema=False)
    .schema(movies_df_schema)
    .load(movies_filename)
)
movies_with_genres_df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header=True, inferSchema=False)
    .schema(movies_with_genres_df_schema)
    .load(movies_filename)
)

## Inspecting Dataframes 

In [None]:
# Preview the first four rows of each movie DataFrame, untruncated.
# movies_df is the one used for collaborative filtering.
for preview_df in (movies_df, movies_with_genres_df):
    preview_df.show(4, truncate=False)

In [None]:
# Derive a 'year' column from the movie title.
from pyspark.sql.functions import split, regexp_extract

# The raw string avoids Python's invalid-escape warnings for \( and \d.
# The pattern only accepts a four-digit number in parentheses at the end of
# the title, so a leading parenthesised number such as "(500) ..." is not
# mistaken for the year. Titles without a match yield an empty string.
movies_with_year_df = movies_df.select(
    'ID',
    'title',
    regexp_extract('title', r'\((\d{4})\)\s*$', 1).alias('year'),
)

In [None]:
# Preview the first four rows, now including the extracted year column.
movies_with_year_df.show(n=4, truncate=False)

In [None]:
# Count the movies per extracted year and display them, most frequent year first.
# (Observation recorded by the original author: 2009 has the highest count.)
movies_per_year_df = movies_with_year_df.groupBy('year').count()
display(movies_per_year_df.orderBy('count', ascending=False))


In [None]:
# Declare the ratings schema explicitly instead of letting Spark infer it.
# Only three columns are listed; the timestamp column is dropped.
ratings_df_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ('userId', IntegerType()),
        ('movieId', IntegerType()),
        ('rating', DoubleType()),
    )
])

In [None]:
# Load ratings.csv with the explicit ratings schema and preview four rows.
ratings_df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header=True, inferSchema=False)
    .schema(ratings_df_schema)
    .load(ratings_filename)
)
ratings_df.show(n=4)


In [None]:
# Cache both DataFrames to avoid recomputing them each time they are used.
ratings_df.cache()
movies_df.cache()

## Global Popularity (note: movies with fewer than 500 ratings are discarded)

In [None]:
from pyspark.sql import functions as F

# Per-movie aggregates computed from ratings_df: the number of ratings
# ("count") and the mean rating ("average"), one row per movieId.
# The identifier previously contained a space, which is a syntax error.
movie_ids_with_avg_ratings_df = ratings_df.groupBy('movieId').agg(
    F.count(ratings_df.rating).alias("count"),
    F.avg(ratings_df.rating).alias("average"),
)
print('movie_ids_with_avg_ratings_df:')
movie_ids_with_avg_ratings_df.show(4, truncate=False)


In [None]:
# Attach the movie titles to the per-movie aggregates so the result is easier
# to read: join on the movie id, then drop the duplicate key column 'ID'.
# The join key is spelled 'movieId', exactly as in the ratings schema, so the
# join does not depend on case-insensitive column resolution.
movie_names_with_avg_ratings_df = (
    movie_ids_with_avg_ratings_df
    .join(movies_df, F.col('movieId') == F.col('ID'))
    .drop('ID')
)
movie_names_with_avg_ratings_df.show(4, truncate=False)

In [None]:
# Global popularity: keep only movies with at least 500 ratings and sort
# them by average rating, highest first.
movies_with_500_ratings_or_more = (
    movie_names_with_avg_ratings_df
    .filter(movie_names_with_avg_ratings_df['count'] >= 500)
    .orderBy('average', ascending=False)
)
movies_with_500_ratings_or_more.show(truncate=False)

## Splitting into training, validation and test datasets (training: fit the model; validation: tune hyperparameters; test: check the final accuracy)

In [None]:
# Hold out 60% of the ratings for training, 20% for validation and 20% for testing.

seed = 4  # fixed seed so the random split is repeatable
(split_60_df, split_a_20_df, split_b_20_df) = ratings_df.randomSplit([0.6, 0.2, 0.2], seed)

# Cache the three splits for performance; each one is counted and shown below.
training_df = split_60_df.cache()
validation_df = split_a_20_df.cache()
test_df = split_b_20_df.cache()

print('Training: {0}, validation: {1}, test: {2}\n'.format(
    training_df.count(),
    validation_df.count(),
    test_df.count())
)

training_df.show(4, truncate=False)
validation_df.show(4, truncate=False)
test_df.show(4, truncate=False)

## Alternating Least Squares (ALS)

In [None]:
from pyspark.ml.recommendation import ALS
als = ALS()

# Configure the ALS estimator. The setter chain is wrapped in parentheses
# instead of using backslash continuations, which are a syntax error as soon
# as any character (even a trailing space) follows the backslash.
(
    als.setPredictionCol("prediction")
    .setMaxIter(5)
    .setSeed(seed)
    .setRegParam(0.1)
    .setUserCol('userId')
    .setItemCol('movieId')
    .setRatingCol('rating')
    .setRank(8)  # rank 8 was found to be optimal
)

# Fit the model on the training split with these parameters.
my_ratings_model = als.fit(training_df)

## Looking for RMSE again

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, isnan

# RMSE evaluator that compares the "rating" label column with the
# "prediction" column.
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")
my_predict_df = my_ratings_model.transform(test_df)

# Remove rows whose prediction is NaN. isnan() states the intent directly;
# comparing against float('nan') looks like a no-op to a Python reader
# because it depends on Spark's own NaN comparison rules.
predicted_test_my_ratings_df = my_predict_df.filter(~isnan(col("prediction")))

# Run the RMSE evaluator on the filtered test-set predictions.
test_RMSE_my_ratings = reg_eval.evaluate(predicted_test_my_ratings_df)
print('The model had a RMSE on the test set of {0}'.format(test_RMSE_my_ratings))

# Read the user id from the "input" notebook widget (default "5") and keep
# only that user's predictions.
dbutils.widgets.text("input", "5", "")
ins = dbutils.widgets.get("input")
uid = int(ins)
ll = predicted_test_my_ratings_df.filter(col("userId") == uid)

In [None]:
import json

# Titles of the selected user's ten highest predicted ratings. Ordering by
# prediction before take(10) makes the result deterministic and puts the
# best-scoring movies first; the ordering must happen before select('title')
# drops the prediction column.
MovieRec = (
    ll.join(movies_df, F.col('movieId') == F.col('ID'))
    .drop('ID')
    .orderBy('prediction', ascending=False)
    .select('title')
    .take(10)
)

# Hand the titles back to the calling notebook as a JSON array string.
# NOTE(review): dbutils.notebook.exit is documented to accept a string, so the
# list of Row objects is serialised first — confirm the caller parses JSON.
# exit() ends the notebook run, so its result is not assigned to a variable.
dbutils.notebook.exit(json.dumps([row['title'] for row in MovieRec]))