In [3]:
# import general libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn
import random
import os

In [18]:
# import pyspark libraries
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer, StringIndexer, IndexToString

In [6]:
# start (or reuse) the Spark session used throughout this notebook
spark = (
    SparkSession.builder
    .appName('recommender_system')
    .getOrCreate()
)

In [12]:
# read the ratings CSV: first row supplies column names, column types are inferred
df = spark.read.csv(
    'movie_rating.csv',
    header=True,
    inferSchema=True,
)

In [15]:
# preview the first 5 rows; toPandas() converts them to a pandas DataFrame
# so the notebook renders them as a table
df.limit(5).toPandas()

Unnamed: 0,userId,title,rating
0,196,Kolya (1996),3
1,63,Kolya (1996),3
2,226,Kolya (1996),5
3,154,Kolya (1996),3
4,306,Kolya (1996),5


In [16]:
# printSchema() prints each column's name, data type and nullability
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



In [20]:
# Encode movie titles as numeric indices ('title_new'), which is the column
# later passed to ALS as itemCol

# learn the title -> index mapping from the full dataset
stringIndexer = StringIndexer(
    inputCol='title',
    outputCol='title_new',
)
model = stringIndexer.fit(df)

# new dataframe: original columns plus the numeric 'title_new' column
indexed = model.transform(df)
indexed.limit(5).toPandas()

Unnamed: 0,userId,title,rating,title_new
0,196,Kolya (1996),3,287.0
1,63,Kolya (1996),3,287.0
2,226,Kolya (1996),5,287.0
3,154,Kolya (1996),3,287.0
4,306,Kolya (1996),5,287.0


In [21]:
# split the dataset into train (75%) and test (25%);
# the fixed seed makes the split repeatable from run to run, so the RMSE and
# recommendations below can be reproduced
train, test = indexed.randomSplit([0.75, 0.25], seed=42)

In [22]:
from pyspark.ml.recommendation import ALS

In [23]:
# configure the ALS recommender (it is fitted on the train dataset afterwards)
rec = ALS(
    maxIter=10,
    regParam=0.01,
    userCol='userId',
    itemCol='title_new',        # numeric movie index produced by the StringIndexer
    ratingCol='rating',
    nonnegative=True,           # apply a non-negativity constraint in the least-squares solve
    coldStartStrategy='drop',   # drop prediction rows that would otherwise be NaN
    seed=42,                    # fixed seed so the fitted model is repeatable across runs
)

In [24]:
# fit the configured ALS estimator on the train split; the fitted model is
# reused below for test-set predictions and for recommendations
rec_model = rec.fit(train)

In [26]:
# score the held-out test rows: transform() appends a 'prediction' column
predicted_ratings = rec_model.transform(test)
# preview the first 5 scored rows as a pandas table
predicted_ratings.limit(5).toPandas()

Unnamed: 0,userId,title,rating,title_new,prediction
0,85,Much Ado About Nothing (1993),4,148.0,3.868219
1,883,Much Ado About Nothing (1993),3,148.0,4.329245
2,436,Much Ado About Nothing (1993),5,148.0,4.1375
3,844,Much Ado About Nothing (1993),5,148.0,3.765724
4,727,Much Ado About Nothing (1993),5,148.0,3.497217


## EVALUATE THE MODEL ON THE TEST SET

In [27]:
# RegressionEvaluator provides the RMSE metric
from pyspark.ml.evaluation import RegressionEvaluator

# root-mean-squared error between the true 'rating' and the model's 'prediction'
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)
rmse = evaluator.evaluate(predicted_ratings)
print(rmse)

1.0267767536366257


In [28]:
# one row per distinct movie index; used as the candidate pool for recommendations
unique_movies = (
    indexed
    .select('title_new')
    .distinct()
)

In [57]:
def top_movies(user_id, n):
    """
    Print the top ``n`` recommended movies for ``user_id``.

    Candidate movies are those in ``unique_movies`` that have no row for this
    user in ``indexed``. Each candidate is scored with ``rec_model`` and the
    ``n`` highest predictions are printed together with their titles.

    Parameters
    ----------
    user_id : int or int-convertible
        Id of the active user (matched against the ``userId`` column).
    n : int
        Number of recommendations to display.

    Returns
    -------
    None
        The result table is printed with ``DataFrame.show``; nothing is
        returned.
    """
    # normalise once so the filter and the literal column use the same value
    user_id = int(user_id)

    # movies the active user already has a rating for
    watched_movies = indexed.filter(indexed['userId'] == user_id).select('title_new')

    # anti-join: keep only the movies with no match among the watched ones.
    # unique_movies is already distinct, so no further de-duplication is needed.
    remaining_movies = unique_movies.join(watched_movies, on='title_new', how='left_anti')

    # attach the user id so the rows have the (user, item) columns the model expects
    remaining_movies = remaining_movies.withColumn('userId', lit(user_id))

    # score every candidate and keep the n highest predictions
    recommendations = (
        rec_model.transform(remaining_movies)
        .orderBy('prediction', ascending=False)
        .limit(n)
    )

    # map the numeric movie index back to its title using the StringIndexer labels
    movie_title = IndexToString(inputCol='title_new', outputCol='title', labels=model.labels)
    final_recommendations = movie_title.transform(recommendations)

    # print up to n rows without truncating long titles
    final_recommendations.show(n, False)

In [58]:
# print the 5 highest-predicted movies that user 60 has not rated yet
top_movies(60, 5)

+---------+------+----------+--------------------------------------+
|title_new|userId|prediction|title                                 |
+---------+------+----------+--------------------------------------+
|882.0    |60    |5.792799  |Live Nude Girls (1995)                |
|787.0    |60    |5.5554113 |Once Were Warriors (1994)             |
|1146.0   |60    |5.5392385 |Band Wagon, The (1953)                |
|1329.0   |60    |5.502775  |Roseanna's Grave (For Roseanna) (1997)|
|1390.0   |60    |5.404863  |Perfect Candidate, A (1996)           |
+---------+------+----------+--------------------------------------+



In [59]:
# print the 10 highest-predicted movies that user 85 has not rated yet
top_movies(85,10)

+---------+------+----------+-------------------------+
|title_new|userId|prediction|title                    |
+---------+------+----------+-------------------------+
|1277.0   |85    |6.125114  |Mina Tannenbaum (1994)   |
|1347.0   |85    |5.927028  |Angel Baby (1995)        |
|1411.0   |85    |4.993579  |Boys, Les (1997)         |
|882.0    |85    |4.8894725 |Live Nude Girls (1995)   |
|1306.0   |85    |4.8636675 |Faust (1994)             |
|663.0    |85    |4.6844544 |When We Were Kings (1996)|
|1207.0   |85    |4.6738634 |Aparajito (1956)         |
|654.0    |85    |4.4727798 |M (1931)                 |
|752.0    |85    |4.4661064 |Lost Horizon (1937)      |
|1135.0   |85    |4.4500995 |Ponette (1996)           |
+---------+------+----------+-------------------------+

