In [0]:
ratings_filename= "dbfs:/mnt/Files/Validated/rating.csv"
movies_filename= "dbfs:/mnt/Files/Validated/movie.csv"

In [0]:
movies_filename

In [0]:
%fs
ls /mnt/Files/validated/

path,name,size
dbfs:/mnt/Files/validated/movie/,movie/,0
dbfs:/mnt/Files/validated/movie.csv,movie.csv,1493648
dbfs:/mnt/Files/validated/movie_schema.csv,movie_schema.csv,27
dbfs:/mnt/Files/validated/rating/,rating/,0
dbfs:/mnt/Files/validated/rating.csv,rating.csv,690353377
dbfs:/mnt/Files/validated/rating_schema.csv,rating_schema.csv,28


In [0]:
from pyspark.sql.types import *

ratings_df_schema = StructType(
  [StructField('userId', IntegerType()),
   StructField('movieId', IntegerType()),
   StructField('rating', DoubleType())]
)
movies_df_schema = StructType(
  [StructField('ID', IntegerType()),
   StructField('title', StringType())]
)

In [0]:
ratings_df_schema

In [0]:
from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import *


In [0]:
raw_movies_df=sqlContext.read.format('com.databricks.spark.csv').options(header=True,inferSchema=False).schema(movies_df_schema).load(movies_filename)
movies_df = raw_movies_df.drop('Genres').withColumnRenamed('movieId', 'ID')

In [0]:
raw_ratings_df = sqlContext.read.format('com.databricks.spark.csv').options(header=True, inferSchema=False).schema(ratings_df_schema).load(ratings_filename)
ratings_df = raw_ratings_df.drop('Timestamp')

In [0]:
ratings_df.cache()
movies_df.cache()

In [0]:
movies_df.show(4,truncate = False)

In [0]:
ratings_df.show(4,truncate = False)

In [0]:
#Extract year from movies_df
from pyspark.sql.functions import split, regexp_extract


In [0]:
movies_with_year_df = movies_df.select('ID','title',regexp_extract('title',r'\((\d+)\)',1).alias('year'))

In [0]:
movies_with_year_df.show(4,truncate = False) # coulumn(ID, title, year)

In [0]:
#Total movies launched in that particular year lets say in 1990- 108 movies launched, orderBy ascending order(Year with highest movies will be displayed at last)
display(movies_with_year_df.groupBy('year').count().orderBy('count',ascending = False))

year,count
2009.0,1112
2012.0,1022
2011.0,1016
2013.0,1011
2008.0,979
2010.0,962
2007.0,902
2006.0,855
2005.0,741
2014.0,740


In [0]:
#Grouping user by movieId, count all the ratings for a particular movie and alias it as "count", then average the raings for a particular movie and alias it as average.
from pyspark.sql import functions as F
movie_ids_with_avg_ratings_df = ratings_df.groupBy('movieId').agg(F.count(ratings_df.rating).alias("count"),F.avg(ratings_df.rating).alias("average"))
print('movie_ids_with_avg_ratings_df:')
movie_ids_with_avg_ratings_df.show(4, truncate=False)# output-> movieId,count,average

In [0]:
# output -> column(movieId,count,average,title) <- JOIN operation with movieId = ID and update result in 'movie_names_with_avg_ratings_df'
movie_names_with_avg_ratings_df = movie_ids_with_avg_ratings_df.join(movies_df,F.col('movieId') == F.col('ID')).drop('ID')

In [0]:
# output -> column(movieId,title)
movie_names_with_avg_ratings_df.show(4,truncate = False)# columns(movieId,count,average,title)

In [0]:
seed = 1800009193
(split_60_df, split_a_20_df, split_b_20_df) = ratings_df.randomSplit([0.6, 0.2, 0.2], seed)

In [0]:
training_df = split_60_df.cache()
validation_df = split_a_20_df.cache()
test_df = split_b_20_df.cache()

In [0]:
# Training: output rows(xxxx), validation: output rows(xxxx), test: output rows(xxxx)
print('Training: {0}, validation: {1}, test: {2}\n'.format(
  training_df.count(), validation_df.count(), test_df.count())
)

In [0]:
#show
training_df.show(4,truncate = False)
validation_df.show(4,truncate = False)
test_df.show(4,truncate = False)

In [0]:
#ALS(Alternating least square method): Method to do matrix factorization 
from pyspark.ml.recommendation import ALS 
als = ALS()

als.setMaxIter(5)\
   .setSeed(seed)\
   .setRegParam(0.1)\
   .setUserCol('userId')\
   .setItemCol('movieId')\
   .setRatingCol('rating') 

my_ratings_model = als.fit(training_df) # training model

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")
my_predict_df = my_ratings_model.transform(test_df)#testing the my_ratings_model
predicted_test_my_ratings_df = my_predict_df.filter(my_predict_df.prediction != float('nan'))# As the rating matrix is highly sparse, therefore we are removing all the nan values
test_RMSE_my_ratings = reg_eval.evaluate(predicted_test_my_ratings_df)
print('The Model has a RMSE on the test set of {0}'.format(test_RMSE_my_ratings))


In [0]:
dbutils.widgets.text("input","5"," ")#input to the notebook is given by widgets, in our case we give users as input for recommendation of movies.
ins=dbutils.widgets.get("input")
uid=int(ins)
ll=predicted_test_my_ratings_df.filter(col("userId")==uid) # predicting userid, to whome we can recommend

In [0]:
#output -> column(movieId,title), will recommend top-10 movies to user
MovieRec=ll.join(movies_df,F.col('movieId') == F.col('ID')).drop('ID').select('title').take(10) # will recommend top-10 movies to user
l=dbutils.notebook.exit(MovieRec)

[Row(title='Jumanji (1995)'), Row(title='Indian in the Cupboard, The (1995)'), Row(title='Happy Gilmore (1996)'), Row(title='Don Juan DeMarco (1995)'), Row(title='Stargate (1994)'), Row(title='Shawshank Redemption, The (1994)'), Row(title='Client, The (1994)'), Row(title='Lion King, The (1994)'), Row(title='Fugitive, The (1993)'), Row(title='Mrs. Doubtfire (1993)')]