Define the paths of the ratings and movies files


In [None]:
# DBFS locations of the MovieLens CSV files on the mounted storage.
ratings_filename, movies_filename = (
    "dbfs:/mnt/temp/Data/ratings.csv",
    "dbfs:/mnt/temp/Data/movies.csv",
)

Check that the files are present in the mounted directory

In [None]:
%fs
ls /mnt/temp/Data

path,name,size,modificationTime
dbfs:/mnt/temp/Data/genome-tags.csv,genome-tags.csv,18103,1693939976000
dbfs:/mnt/temp/Data/links.csv,links.csv,1368578,1693939976000
dbfs:/mnt/temp/Data/movies.csv,movies.csv,3038099,1693939977000
dbfs:/mnt/temp/Data/ratings.csv,ratings.csv,678260987,1693940021000


Work on movies.csv first and define its schema

In [None]:
# Explicit imports instead of `import *`; DoubleType is used later by the ratings schema.
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Full layout of movies.csv: ID, title and a pipe-separated genres string
# (e.g. "Adventure|Animation|Children").
movies_with_genres_df_schema = StructType([
    StructField('ID', IntegerType()),
    StructField('title', StringType()),
    StructField('genres', StringType()),
])

# Same file without the genres column; the year is derived from the title later.
movies_df_schema = StructType([
    StructField('ID', IntegerType()),
    StructField('title', StringType()),
])

Create the DataFrames with the schemas defined above

In [None]:
# Read movies.csv once with the full three-column schema. The chained calls are
# wrapped in parentheses: a line that starts with `.options(...)` on its own is a
# Python SyntaxError. The explicit schema means Spark does not scan the file to
# infer column types, and header=True skips the header row.
movies_with_genres_df = (
    sqlContext.read.format('com.databricks.spark.csv')
    .options(header=True, inferSchema=False)
    .schema(movies_with_genres_df_schema)
    .load(movies_filename)
)

# The genre-less view (ID, title) is a projection of the same data rather than a
# second read of the file.
movies_df = movies_with_genres_df.select('ID', 'title')

Show results

In [None]:
# Print the first four rows of each movies DataFrame without truncating long titles.
for frame in (movies_df, movies_with_genres_df):
    frame.show(4, truncate=False)

+---+------------------------+
|ID |title                   |
+---+------------------------+
|1  |Toy Story (1995)        |
|2  |Jumanji (1995)          |
|3  |Grumpier Old Men (1995) |
|4  |Waiting to Exhale (1995)|
+---+------------------------+
only showing top 4 rows

+---+------------------------+-------------------------------------------+
|ID |title                   |genres                                     |
+---+------------------------+-------------------------------------------+
|1  |Toy Story (1995)        |Adventure|Animation|Children|Comedy|Fantasy|
|2  |Jumanji (1995)          |Adventure|Children|Fantasy                 |
|3  |Grumpier Old Men (1995) |Comedy|Romance                             |
|4  |Waiting to Exhale (1995)|Comedy|Drama|Romance                       |
+---+------------------------+-------------------------------------------+
only showing top 4 rows



Extract each movie's release year from its title

In [None]:
#transforming the Dataframes
from pyspark.sql.functions import split, regexp_extract

# Capture the four-digit year in the trailing "(YYYY)" of the title, optionally
# followed by whitespace. Taking the first "(digits)" group instead would read a
# title such as "(500) Days of Summer (2009)" as year "500".
# Titles without such a suffix get an empty string, which is what regexp_extract
# returns when there is no match.
movies_with_year_df = movies_df.select(
    'ID',
    'title',
    regexp_extract('title', r'\((\d{4})\)\s*$', 1).alias('year'),
)

Take a look at the results

In [None]:
# Show the first four rows with the extracted year column; titles are not truncated.
movies_with_year_df.show(n=4, truncate=False)

+---+------------------------+----+
|ID |title                   |year|
+---+------------------------+----+
|1  |Toy Story (1995)        |1995|
|2  |Jumanji (1995)          |1995|
|3  |Grumpier Old Men (1995) |1995|
|4  |Waiting to Exhale (1995)|1995|
+---+------------------------+----+
only showing top 4 rows



Now we have a year column that we can aggregate on!

In [None]:
# Number of movies per year, with the most productive years listed first.
year_counts_df = movies_with_year_df.groupby('year').count()
display(year_counts_df.orderBy('count', ascending=False))

year,count
2015.0,2513
2016.0,2488
2014.0,2406
2017.0,2374
2013.0,2173
2018.0,2034
2012.0,1978
2011.0,1838
2009.0,1723
2010.0,1691


Now load the ratings file. Again, we define the schema explicitly so that Spark does not have to run an extra pass over the data to infer it

In [None]:
# Schema for ratings.csv. Only the first three columns are declared; the trailing
# timestamp column is deliberately left out (the CSV reader drops extra tokens
# beyond the declared fields in its default PERMISSIVE mode).
_rating_columns = [
    ('userId', IntegerType()),
    ('movieId', IntegerType()),
    ('rating', DoubleType()),
]
rating_df_schema = StructType([StructField(name, dtype) for name, dtype in _rating_columns])

Create the DataFrame with the schema defined above

In [None]:
# Load ratings.csv with the explicit schema. The chain is parenthesised because a
# line beginning with `.options(...)` on its own is a Python SyntaxError.
# header=True skips the header row.
ratings_df = (
    sqlContext.read.format('com.databricks.spark.csv')
    .options(header=True, inferSchema=False)
    .schema(rating_df_schema)
    .load(ratings_filename)
)
ratings_df.show(4)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    296|   5.0|
|     1|    306|   3.5|
|     1|    307|   5.0|
|     1|    665|   5.0|
+------+-------+------+
only showing top 4 rows



Cache both DataFrames so that repeated access is fast

In [None]:
# Mark both input DataFrames for caching. cache() is lazy and returns the same
# DataFrame, so the data is kept in memory the first time an action reads it.
ratings_df = ratings_df.cache()
movies_df.cache()

Out[24]: DataFrame[ID: int, title: string]

From ratings_df, create movie_ids_with_avg_ratings_df, which holds the number of ratings and the average rating of each movie

In [None]:
from pyspark.sql import functions as F

# Per-movie aggregates: how many ratings each movie received and their mean.
rating_aggregates = [
    F.count(F.col('rating')).alias("count"),
    F.avg(F.col('rating')).alias("average"),
]
movie_ids_with_avg_ratings_df = ratings_df.groupBy('movieId').agg(*rating_aggregates)

print('movie_ids_with_avg_ratings_df:')
movie_ids_with_avg_ratings_df.show(4, truncate=False)

movie_ids_with_avg_ratings_df:
+-------+-----+------------------+
|movieId|count|average           |
+-------+-----+------------------+
|1088   |11935|3.25002094679514  |
|1580   |40308|3.5817083457378187|
|3175   |14659|3.6077836141619484|
|44022  |4833 |3.2593627146699773|
+-------+-----+------------------+
only showing top 4 rows



This DataFrame only identifies movies by movieId; join it with movies_df to add the titles and make it easier to read

In [None]:
# Attach each movie's title to its rating statistics (inner join, so movieIds that
# are missing from movies_df are dropped). Columns are referenced through their own
# DataFrames with the exact names. The earlier 'movieID' spelling only resolved
# because spark.sql.caseSensitive is false by default.
movie_names_with_avg_rating_df = (
    movie_ids_with_avg_ratings_df
    .join(movies_df, movie_ids_with_avg_ratings_df['movieId'] == movies_df['ID'])
    .drop('ID')
)
movie_names_with_avg_rating_df.show(4, truncate=False)

+-------+-----+------------------+--------------------------------+
|movieId|count|average           |title                           |
+-------+-----+------------------+--------------------------------+
|1088   |11935|3.25002094679514  |Dirty Dancing (1987)            |
|1580   |40308|3.5817083457378187|Men in Black (a.k.a. MIB) (1997)|
|3175   |14659|3.6077836141619484|Galaxy Quest (1999)             |
|44022  |4833 |3.2593627146699773|Ice Age 2: The Meltdown (2006)  |
+-------+-----+------------------+--------------------------------+
only showing top 4 rows



Now let us look at global popularity: the highest-rated movies among those with at least 500 ratings

In [None]:
# Keep only movies with at least 500 ratings, so that averages based on a handful
# of votes do not dominate. Then rank them by average rating, highest first.
min_rating_count = 500
rated_movies_df = movie_names_with_avg_rating_df
movies_with_500_ratings_or_more = (
    rated_movies_df
    .filter(rated_movies_df['count'] >= min_rating_count)
    .orderBy('average', ascending=False)
)
movies_with_500_ratings_or_more.show(truncate=False)

+-------+-----+------------------+---------------------------------------------------------------------------+
|movieId|count|average           |title                                                                      |
+-------+-----+------------------+---------------------------------------------------------------------------+
|171011 |1124 |4.483096085409253 |Planet Earth II (2016)                                                     |
|159817 |1747 |4.464796794504865 |Planet Earth (2006)                                                        |
|318    |81482|4.413576004516335 |Shawshank Redemption, The (1994)                                           |
|170705 |1356 |4.398598820058997 |Band of Brothers (2001)                                                    |
|858    |52498|4.324336165187245 |Godfather, The (1972)                                                      |
|179135 |659  |4.289833080424886 |Blue Planet II (2017)                                                      |
|

Now the fun part: splitting the data into training, validation and test sets.
We use 60% of the data for training, hold out 20% for validation, and leave 20% for testing

In [None]:
# Seeded 60/20/20 random split of the ratings; the same seed is reused by ALS below.
seed = 4
split_weights = [0.6, 0.2, 0.2]
split_60_df, split_a_20_df, split_b_20_df = ratings_df.randomSplit(split_weights, seed)

# Cache each split, since all of them are read more than once below.
training_df, validation_df, test_df = (
    split_60_df.cache(),
    split_a_20_df.cache(),
    split_b_20_df.cache(),
)

split_counts = [training_df.count(), validation_df.count(), test_df.count()]
print('Training: {0}, validation: {1}, test: {2}\n'.format(*split_counts))

for split_df in (training_df, validation_df, test_df):
    split_df.show(4, truncate=False)

Training: 14996550, validation: 5003462, test: 5000083

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|1     |306    |3.5   |
|1     |307    |5.0   |
|1     |665    |5.0   |
|1     |899    |3.5   |
+------+-------+------+
only showing top 4 rows

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|1     |1250   |4.0   |
|1     |2011   |2.5   |
|1     |2161   |3.5   |
|1     |2351   |4.5   |
+------+-------+------+
only showing top 4 rows

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|1     |296    |5.0   |
|1     |1217   |3.5   |
|1     |2068   |2.5   |
|1     |2843   |4.5   |
+------+-------+------+
only showing top 4 rows



We use collaborative filtering (i.e. recommendations based on other users' behaviour), implemented with Spark's ALS (alternating least squares)

In [None]:
from pyspark.ml.recommendation import ALS

# ALS matrix factorisation over (userId, movieId, rating) triples. All
# hyper-parameters are passed to the constructor rather than through chained setters.
# NOTE(review): rank=8 is described as optimal, but no tuning against validation_df
# appears in this notebook; confirm where that choice came from.
als = ALS(
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    predictionCol="prediction",
    rank=8,
    maxIter=5,
    regParam=0.1,
    seed=seed,
)

# Train the model on the training split.
my_ratings_model = als.fit(training_df)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, isnan

# RMSE evaluator comparing the model's 'prediction' column with the true 'rating'.
reg_eval = RegressionEvaluator(predictionCol='prediction', labelCol='rating', metricName="rmse")
my_predict_df = my_ratings_model.transform(test_df)

# ALS produces NaN predictions for users/movies absent from training_df, because
# coldStartStrategy is left at its default. Drop them with isnan(). The previous
# `prediction != float('nan')` test only worked because Spark SQL treats NaN as
# equal to NaN, which reads like a no-op under Python/IEEE semantics.
predicted_test_my_ratings_df = my_predict_df.filter(~isnan(col('prediction')))

# Run the RMSE evaluator on the NaN-free test predictions.
test_RMSE_my_ratings = reg_eval.evaluate(predicted_test_my_ratings_df)
print('The model had a RMSE on the test set of {0}'.format(test_RMSE_my_ratings))

# Read the target user id from the "input" text widget (default "5").
# int() raises ValueError if the widget holds a non-numeric value.
dbutils.widgets.text("input", "5","")
ins = dbutils.widgets.get("input")
uid=int(ins)

# Test-set predictions for that user only.
# NOTE(review): these are movies the user already rated in test_df, not unseen
# movies. For real recommendations consider my_ratings_model.recommendForUserSubset(...).
ll=predicted_test_my_ratings_df.filter(col("userId")==uid)

The model had a RMSE on the test set of 0.814153554974725


Take ten movies for the user whose id was entered in the `input` widget, and use `dbutils.notebook.exit()` to return the result to the calling Azure service

In [None]:
import json

# Join the user's predictions with the movie titles and keep the ten with the
# highest predicted rating (ties broken by title). Ordering is applied after the
# join, because Spark joins do not preserve row order; without orderBy, take(10)
# returns an arbitrary ten rows.
MovieRec = (
    ll.join(movies_df, ll['movieId'] == movies_df['ID'])
    .drop('ID')
    .orderBy(F.col('prediction').desc(), F.col('title'))
    .select('title')
    .take(10)
)

# dbutils.notebook.exit() takes a string, so the titles are serialised as a JSON
# list for the caller. exit() ends the notebook run, so its result is not assigned.
dbutils.notebook.exit(json.dumps([row['title'] for row in MovieRec]))