# Movie Recommendation System 

In [None]:
# Data Citation 
# F. Maxwell Harper and Joseph A. Konstan. 2015. The MovieLens Datasets: History and Context. ACM Transactions on Interactive Intelligent Systems (TiiS) 5, 4, Article 19 (December 2015), 19 pages. DOI=http://dx.doi.org/10.1145/2827872

### Import the required packages and load data 

In [None]:
# Click here to create your ratings DataFrame
import ibmos2spark

In [None]:
# Report how many partitions Spark uses for each DataFrame.
# NOTE(review): in the visible notebook `movies` and `ratings` are only assigned by the
# spark.read.csv cell further down, so this cell presumably relies on a load cell that
# is not shown here -- confirm it survives Restart & Run All.
print('Number of partitions for the movies DataFrame: {}'.format(movies.rdd.getNumPartitions()))
print('Number of partitions for the ratings DataFrame: {}'.format(ratings.rdd.getNumPartitions()))


### Create repartitioned ratings data

In [None]:
# `repartitionedRatings` is not assigned anywhere else in the notebook, so it is
# created here before first use.
# NOTE(review): 10 partitions mirrors the repartition(10) applied when the CSV files
# are read; adjust if a different partition count was intended.
repartitionedRatings = ratings.repartition(10)
print('Number of ratings: {}'.format(repartitionedRatings.count()))

In [None]:
# Mark the repartitioned ratings for caching so that later actions can reuse them.
repartitionedRatings.cache()

In [None]:
# Count the ratings again, this time after cache() has been requested on the DataFrame.
print('Number of ratings: {}'.format(repartitionedRatings.count()))

In [None]:
# Preview the movies DataFrame without truncating long column values such as titles.
movies.show(truncate=False)

In [None]:
# Print the column names and types of the movies DataFrame.
movies.printSchema()

In [None]:
# Paste your project token here. It will look like this:
# NOTE(review): `sc` is not defined in the visible cells -- presumably the SparkContext
# supplied by the notebook runtime; confirm. Keep real credentials out of shared copies.
from project_lib import Project
project = Project(sc, '******************', '******************')
pc = project.project_context

In [None]:
# File names of the two data files that are resolved through the project object.
movies_file_name = 'movies.csv'
ratings_file_name = 'ratings.csv'

# Read each CSV with a header row and inferred column types, spread it over
# 10 partitions and cache it, since both DataFrames are reused by many later cells.
movies = spark.read.csv(project.get_file_url(movies_file_name), header=True, inferSchema=True).repartition(10).cache()
ratings = spark.read.csv(project.get_file_url(ratings_file_name), header=True, inferSchema=True).repartition(10).cache()

In [None]:
# Check the column names and the types that inferSchema assigned to both DataFrames.
movies.printSchema()
ratings.printSchema()

In [None]:
# Summary statistics for the columns of the ratings DataFrame.
ratings.describe().show()

In [None]:
# Count distinct users, distinct movies, and the movies that received at least one
# rating strictly above 4 (DataFrame API version).
print('Number of different users: {}'.format(ratings.select('userId').distinct().count()))
print('Number of different movies: {}'.format(ratings.select('movieId').distinct().count()))
print('Number of movies with at least one rating strictly higher than 4: {}'.format(ratings.filter('rating > 4').select('movieId').distinct().count()))

In [None]:
# Register ratings as a temporary SQL view (it is queried again by later SQL cells)
# and count the movies with a rating above 4 using Spark SQL.
ratings.createOrReplaceTempView('ratings')
spark.sql('SELECT COUNT(DISTINCT(movieId)) AS nb FROM ratings WHERE rating > 4').show()

In [None]:
# Query the CSV file directly through Spark SQL's csv.`<path>` syntax, without going
# through the ratings DataFrame, and fetch the first two rows.
ratings_url = project.get_file_url(ratings_file_name)
sql = 'SELECT * FROM csv.`' + ratings_url + '`'
spark.sql(sql).take(2)

In [None]:
import pandas as pd

# Preview five ratings as a pandas DataFrame. The limit is applied in Spark so that
# only those five rows are collected on the driver instead of the whole table.
ratings.limit(5).toPandas()

### Visualize the Data

In [None]:
# There are many different ways to visualize the data.
# As this is not the focus of this project, only one method is shown.

import seaborn as sns
%matplotlib inline

# Collect all ratings into pandas (ratingsPandas is reused later to derive the new
# user's ID) and scatter userId against movieId; fit_reg=False disables the regression line.
ratingsPandas = ratings.toPandas()
sns.lmplot(x='userId', y='movieId', data=ratingsPandas, fit_reg=False);

### Building the Recommender System

In [None]:
# Measure how densely the user x movie rating matrix is filled: the innermost query
# counts ratings, distinct movies and distinct users, the middle one derives the
# matrix size (users * movies), and the outer one expresses the number of ratings
# as a percentage of that size. Reads from the 'ratings' temporary view.
spark.sql('''
    SELECT *, 100 * nb_ratings/matrix_size AS percentage
    FROM (
        SELECT nb_users, nb_movies, nb_ratings, nb_users * nb_movies AS matrix_size
        FROM (
            SELECT COUNT(*) AS nb_ratings, COUNT(DISTINCT(movieId)) AS nb_movies, COUNT(DISTINCT(userId)) AS nb_users
            FROM ratings
        )
    )
''').toPandas().head()

In [None]:
# Train the model
from pyspark.ml.recommendation import ALS

# Fit ALS on the complete ratings DataFrame (no hold-out set at this point), using
# the library defaults for every hyper-parameter that is not passed explicitly.
model = ALS(userCol='userId', itemCol='movieId', ratingCol='rating').fit(ratings)

In [None]:
# Run the model on the same ratings it was trained on; transform() adds a
# 'prediction' column next to the actual rating.
predictions = model.transform(ratings)
# Limit in Spark so only the five previewed rows are collected on the driver.
predictions.limit(5).toPandas()

In [None]:
# Evaluate the model
from pyspark.ml.evaluation import RegressionEvaluator

# Compare the 'prediction' column against the actual 'rating' column using RMSE.
# Note: at this point `predictions` was produced from the same ratings the model was
# fitted on, so this figure is a training error, not an estimate on unseen data.
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print('The root mean squared error for our model is: {}'.format(evaluator.evaluate(predictions)))

In [None]:
# Split the dataset into train (80%) and test (20%).
# A fixed seed keeps the split, and therefore the reported error, reproducible
# across runs of the notebook.
(trainingRatings, testRatings) = ratings.randomSplit([80.0, 20.0], seed=42)
als = ALS(userCol='userId', itemCol='movieId', ratingCol='rating')
model = als.fit(trainingRatings)
predictions = model.transform(testRatings)

# Limit in Spark so only the five previewed rows are collected on the driver.
predictions.limit(5).toPandas()

In [None]:
# Overcoming NaN results with average rating
# The mean is computed over the whole ratings DataFrame (training and test rows alike).
avgRatings = ratings.select('rating').groupBy().avg().first()[0]
print ('The average rating in the dataset is: {}'.format(avgRatings))

# na.fill(avgRatings) substitutes the mean for missing values before scoring.
# NOTE(review): no `subset` is given, so the fill is not restricted to the
# 'prediction' column -- presumably only that column has missing values; confirm.
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print ('The root mean squared error for our model is: {}'.format(evaluator.evaluate(predictions.na.fill(avgRatings))))


In [None]:
# Exclude NaN
# Alternative to filling: na.drop() removes the rows with missing values, so the
# metric is computed only on rows that have a prediction.
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print ('The root mean squared error for our model is: {}'.format(evaluator.evaluate(predictions.na.drop())))

In [None]:
def repeatALS(data, k=3, userCol='userId', itemCol='movieId', ratingCol='rating', metricName='rmse', seed=None):
    """Average an ALS evaluation metric over k independent random train/test splits.

    Each round splits `data` with weights (k - 1) : 1, fits ALS on the larger part,
    predicts the smaller part, drops rows with missing values and evaluates the
    remaining predictions against the `ratingCol` column.

    Parameters:
        data: Spark DataFrame containing the user, item and rating columns.
        k: number of rounds; also sets the split ratio. Must be at least 2.
        userCol, itemCol, ratingCol: column names passed to ALS; `ratingCol` is
            also used as the evaluator's label column.
        metricName: metric name passed to RegressionEvaluator.
        seed: optional integer. When given, round i splits with seed + i so the
            rounds differ from each other but the whole run is reproducible.

    Returns:
        The mean of the k metric values as a float.

    Raises:
        ValueError: if k is smaller than 2 (the training part would be empty).
    """
    if k < 2:
        raise ValueError('k must be at least 2, got {}'.format(k))

    # The label column must follow ratingCol; it used to be hard-coded to 'rating',
    # which evaluated against the wrong column for any other ratingCol.
    evaluator = RegressionEvaluator(metricName=metricName, labelCol=ratingCol, predictionCol='prediction')
    evaluations = []
    for i in range(k):
        splitSeed = None if seed is None else seed + i
        (trainingSet, testingSet) = data.randomSplit([k - 1.0, 1.0], seed=splitSeed)
        als = ALS(userCol=userCol, itemCol=itemCol, ratingCol=ratingCol)
        model = als.fit(trainingSet)
        predictions = model.transform(testingSet)
        # Rows without a usable prediction are excluded from the metric
        evaluation = evaluator.evaluate(predictions.na.drop())
        print('Loop {}: {} = {}'.format(i + 1, metricName, evaluation))
        evaluations.append(evaluation)
    return sum(evaluations) / float(len(evaluations))

In [None]:

# Average the RMSE over 4 independent random splits (3 parts training : 1 part test).
print('RMSE = {}'.format(repeatALS(ratings, k=4)))

In [None]:

def kfoldALS(data, k=3, userCol='userId', itemCol='movieId', ratingCol='rating', metricName='rmse', seed=None):
    """Evaluate ALS with k-fold cross-validation and return the mean metric.

    `data` is split once into k equally weighted random folds. Each fold serves as
    the test set exactly once while the union of the other folds is the training set.

    Parameters:
        data: Spark DataFrame containing the user, item and rating columns.
        k: number of folds. Must be at least 2.
        userCol, itemCol, ratingCol: column names passed to ALS; `ratingCol` is
            also used as the evaluator's label column.
        metricName: metric name passed to RegressionEvaluator.
        seed: optional seed forwarded to randomSplit for a reproducible partition.

    Returns:
        The mean of the k metric values as a float.

    Raises:
        ValueError: if k is smaller than 2 (there would be no training fold).
    """
    from functools import reduce

    if k < 2:
        raise ValueError('k must be at least 2, got {}'.format(k))

    splits = data.randomSplit([1.0] * k, seed=seed)
    # The label column must follow ratingCol; it used to be hard-coded to 'rating'.
    evaluator = RegressionEvaluator(metricName=metricName, labelCol=ratingCol, predictionCol='prediction')
    evaluations = []
    for i, testingSet in enumerate(splits):
        # Union every fold except the held-out one. Reducing over the folds avoids
        # building an empty DataFrame through the notebook-level spark/sc objects.
        otherFolds = [fold for j, fold in enumerate(splits) if j != i]
        trainingSet = reduce(lambda left, right: left.union(right), otherFolds)
        als = ALS(userCol=userCol, itemCol=itemCol, ratingCol=ratingCol)
        model = als.fit(trainingSet)
        predictions = model.transform(testingSet)
        # Rows without a usable prediction are excluded from the metric
        evaluation = evaluator.evaluate(predictions.na.drop())
        print('Loop {}: {} = {}'.format(i + 1, metricName, evaluation))
        evaluations.append(evaluation)
    return sum(evaluations) / float(len(evaluations))

In [None]:
from pyspark.sql.functions import lit

def recommendMovies(model, user, nbRecommendations, ratingsData=None):
    """Recommend the movies with the highest predicted rating for one user.

    Parameters:
        model: fitted model whose transform() adds a 'prediction' column.
        user: ID of the user to recommend for.
        nbRecommendations: maximum number of movies to return.
        ratingsData: Spark DataFrame of ratings with 'movieId' and 'userId' columns.
            When omitted, the notebook-level `appendedRatings` DataFrame is used,
            which keeps existing three-argument calls working.

    Returns:
        A 4-tuple of Spark DataFrames:
        (candidate (movieId, userId) pairs for every movie in the ratings data,
         movies already rated by the user,
         top predictions with 'movieId' and 'prediction',
         the same predictions joined with the title and genres from `movies`,
         sorted by descending prediction).
    """
    if ratingsData is None:
        ratingsData = appendedRatings

    # Pair the specified user with every movie listed in the ratings data
    dataSet = ratingsData.select('movieId').distinct().withColumn('userId', lit(user))

    # Movies that have already been rated by this user
    moviesAlreadyRated = ratingsData.filter(ratingsData.userId == user).select('movieId', 'userId')

    # Predict ratings for the movies not rated yet, discard rows with missing
    # values and keep the highest predictions
    predictions = (
        model.transform(dataSet.subtract(moviesAlreadyRated))
        .dropna()
        .orderBy('prediction', ascending=False)
        .limit(nbRecommendations)
        .select('movieId', 'prediction')
    )

    # Join with the movies DataFrame to get the titles and genres. A join does not
    # guarantee row order, so the result is sorted again afterwards.
    recommendations = (
        predictions.join(movies, predictions.movieId == movies.movieId)
        .select(predictions.movieId, movies.title, movies.genres, predictions.prediction)
        .orderBy('prediction', ascending=False)
    )

    return dataSet, moviesAlreadyRated, predictions, recommendations

In [None]:
# Test prediction
appendedRatings = ratings

print('Recommendations for user 133:')
# recommendMovies returns a 4-tuple of DataFrames; the joined recommendations are
# its last element. Calling .toPandas() on the tuple itself raises AttributeError.
recommendMovies(model, 133, 10)[-1].toPandas()

### Create a New User

In [None]:
# The new user gets the next free ID after the largest existing one. Selecting the
# column as a Series makes max() a scalar, so int() is not applied to a Series.
newUserID = int(ratingsPandas['userId'].max()) + 1

# `moviesPandas` is not created anywhere else in the notebook; build it from the
# movies DataFrame, then show 30 random movies to pick IDs from.
moviesPandas = movies.toPandas()
moviesPandas.sample(30)

In [None]:
# Ask for `number` movie IDs. An entry that is not a whole number is rejected and
# asked for again instead of aborting the cell and losing the IDs entered so far.
number_array = list()
number = 10
print ('Enter numbers in array: ')
while len(number_array) < number:
    n = input("MovieID :")
    try:
        number_array.append(int(n))
    except ValueError:
        print ('Not a valid movie ID, please enter a whole number: {!r}'.format(n))
print ('Your Selected Movie IDs: ',number_array)

In [None]:
# Display the ID that was assigned to the new user.
newUserID

In [None]:
from pyspark.sql.types import IntegerType
import random

columns = ['userId','movieId','rating','timestamp']

# One row per selected movie with a random placeholder rating between 1 and 5.
# A dedicated seeded generator keeps the generated ratings reproducible.
ratingGenerator = random.Random(42)
newUserRows = [
    (newUserID, movieId, float(ratingGenerator.randint(1, 5)), 0)
    for movieId in number_array
]

appendedRatings = ratings
if newUserRows:
    # Build all new rows in one DataFrame and union once, instead of creating and
    # unioning a separate single-row DataFrame for every movie.
    newUserRatings = spark.createDataFrame(newUserRows, schema=columns)
    # Cast the integer-valued columns to IntegerType before the union, as the
    # per-row version of this cell did.
    for columnName in ('userId', 'movieId', 'timestamp'):
        newUserRatings = newUserRatings.withColumn(columnName, newUserRatings[columnName].cast(IntegerType()))
    appendedRatings = ratings.union(newUserRatings)

# Show only the rows that were added rather than collecting every rating into pandas.
appendedRatings.filter(appendedRatings.userId == newUserID).toPandas()

In [None]:

# Check the column names and types of the ratings after the new user's rows were appended.
appendedRatings.printSchema()


In [None]:
# Recommend 5 movies for user 610. recommendMovies returns, in order: the candidate
# (movieId, userId) pairs, the movies the user already rated, the top predictions,
# and those predictions joined with the movie titles and genres.
a,b,c,d = recommendMovies(model, 610, 5)

In [None]:
# Candidate (movieId, userId) pairs built for user 610 (first value returned by recommendMovies).
a.toPandas()

In [None]:
# Distinct IDs of the movies that user 610 has already rated.
b.select('movieId').distinct().toPandas()

In [None]:
# Top predicted ratings for user 610 (movieId and prediction only).
c.toPandas()

In [None]:

# The same predictions for user 610, joined with the movie titles and genres.
d.toPandas()

In [None]:
# `model` was fitted in earlier cells on ratings that do not contain the new user,
# so it cannot produce predictions for that user and the recommendations would come
# back empty. Refit on the ratings that include the new user's rows first.
newUserModel = ALS(userCol='userId', itemCol='movieId', ratingCol='rating').fit(appendedRatings)

# Use newUserID rather than a hard-coded ID so the cell follows the data set.
w,x,y,z = recommendMovies(newUserModel, newUserID, 5)

# Final recommendations (with titles and genres) for the new user
z.toPandas()

In [None]:

# Candidate (movieId, userId) pairs: first value unpacked from the preceding recommendMovies call.
w.toPandas()

In [None]:
# Movies already rated by that user: second value unpacked from the preceding recommendMovies call.
x.toPandas()

In [None]:
# Top predictions (movieId and prediction): third value unpacked from the preceding recommendMovies call.
y.toPandas()

In [None]:
# Print the schema of the original ratings DataFrame, e.g. to compare it with the appended one.
ratings.printSchema()