<b>Name: Jonathan Lawrence<br/>
Date: 11/8/19<br/>
Assignment: Exercise 11.2 - Movie Recommendation Engine

<b>Instruction</b>: In this exercise, you will create a movie recommendation engine from the MovieLens data. 

<b>Set up Spark</b>

In [1]:
# import packages
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import*
from pyspark import SparkConf
import os
import shutil
import time

# Spark configured with 16GB RAM
spark = SparkSession \
    .builder \
    .appName("MovieEngine") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

<b>Prepare Data</b>

In [2]:
ratings_path = "C:\\Users\\Jonathan\\Desktop\\Master's Stuff\\Master's Program\\DSC650-T301 Big Data\\Week 11\\movielens\\ratings.csv"
movies_path = "C:\\Users\\Jonathan\\Desktop\\Master's Stuff\\Master's Program\\DSC650-T301 Big Data\\Week 11\\movielens\\movies.csv"

# load Ratings
df_ratings = spark.read.load(
  ratings_path,
  format="csv",
  sep=",",
  inferSchema=True,
  header=True
)

# Load Movies
df_movies = spark.read.load(
  movies_path,
  format="csv",
  sep=",",
  inferSchema=True,
  header=True
)

print("Ratings:")
print(df_ratings.printSchema())
print("Movies:")
print(df_movies.printSchema())

Ratings:
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

None
Movies:
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

None


<b>Join the data</b>

In [3]:
# Change column name for 'Movies.movieId'
df_movies = (df_movies
   .withColumnRenamed('movieId','movieId_dup'))

# Join data
joinExpression = df_ratings.movieId == df_movies.movieId_dup
joinType = "left_outer"
joinedData = df_ratings.join(
    df_movies, joinExpression, joinType)

# Drop movieId_dup
joinedData = joinedData.drop("movieId_dup")

# Print the schema of the joined 
print(joinedData.printSchema())

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

None


<b>Create training and test data</b>

In [4]:
(training, test) = joinedData.randomSplit([0.8, 0.2])

<b>Collaborative Filtering (Root-mean-square error)</b>

In [6]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.0786617637784752


<b>Collaborative Filtering (Recommendations for Specific Users)</b>

In [31]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

# Generate top 10 movie recommendations for a specified set of users
users = joinedData.select(als.getUserCol()).distinct()
userSubsetRecs = model.recommendForUserSubset(users, 10)

# Generate top 10 user recommendations for a specified set of movies
#movies = joinedData.select(als.getItemCol()).distinct().limit(3)
#movieSubSetRecs = model.recommendForItemSubset(movies, 10)

<b>Get recommendations for specific users</b>

In [33]:
# userId 127
recommendations_127 = userSubsetRecs.filter(userSubsetRecs.userId == 127)
recommendations_127.show(truncate=False)

# userId 151
recommendations_151 = userSubsetRecs.filter(userSubsetRecs.userId == 151)
recommendations_151.show(truncate=False)

# userId 300
recommendations_300 = userSubsetRecs.filter(userSubsetRecs.userId == 300)
recommendations_300.show(truncate=False)

+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                           |
+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|127   |[[95182, 10.242892], [4056, 8.484064], [6881, 8.097054], [83349, 8.088048], [2130, 8.01723], [2186, 7.9534407], [3675, 7.9384527], [1187, 7.9366918], [49274, 7.8469305], [194, 7.826082]]|
+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

+------+-----------