In [46]:
%env SPARK_HOME = /home/conorstreete/CS4337/lib/python3.8/site-packages/pyspark

env: SPARK_HOME=/home/conorstreete/CS4337/lib/python3.8/site-packages/pyspark


In [47]:
#Importing required libraries that will be used throughout the project code

import findspark
import pyspark.sql.functions
import pyspark.sql.types
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt


In [48]:
#Initialises the spark enviroment variable
findspark.init()

In [49]:
#!/usr/bin/env python

import sys
import itertools
from math import sqrt
from operator import add
import os
from os.path import join, isfile, dirname

import findspark
import pyspark.sql.functions as F
import pyspark.sql.types

from pyspark.sql import SparkSession
from pyspark.mllib.recommendation import ALS

def parseRating(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp .
    """
    fields = line.strip().split(",")
    return int(fields[0]), int(fields[1]), float(fields[2])

def parseMovie(line):
    """
    Parses a movie record in MovieLens format movieId::movieTitle .
    """
    fields = line.strip().split(",")
    return int(fields[0]), fields[1]

def loadRatings(ratingsFile):
    """
    Load ratings from file.
    """
    if not isfile(ratingsFile):
        print("File %s does not exist." % ratingsFile)
        sys.exit(1)
    f = open(ratingsFile, 'r')
    ratings = list(filter(lambda r: r[2] > 0, [parseRating(line)[1] for line in f]))
    f.close()
    if not ratings:
        print("No ratings provided.")
        sys.exit(1)
    else:
        return ratings

def computeRmse(model, data, n):
    """
    Compute RMSE (Root Mean Squared Error).
    """
    predictions = model.predictAll(data.rdd.map(lambda x: (x[0], x[1])))
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
      .join(data.rdd.map(lambda x: ((x[0], x[1]), x[2]))) \
      .values()
    return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))

if __name__ == "__main__":

    # set up environment
    # Build the SparkSession
    spark = SparkSession.builder \
    .master("local") \
    .appName("Project 1 CS4337") \
    .config("spark.executor.memory", "3gb") \
    .getOrCreate()

    sc = spark.sparkContext

    # load personal ratings
    myRatings = sc.textFile('Project1/personalratings.txt').map(parseRating)
    
    # load ratings and movie titles

    movieLensHomeDir = os.path.abspath('Project1/movieData')

    # ratings is an RDD of (last digit of timestamp, (userId, movieId, rating))
    ratings = sc.textFile("Project1/ratings.dat").map(parseRating)

    # movies is an RDD of (movieId, movieTitle)
    movies = sc.textFile("Project1/movies.dat").map(parseMovie)

In [50]:
#Important to ensure that rarings are properly imported in the correct format
myRatings.take(10)


[(0, 1, 4.0),
 (0, 780, 2.0),
 (0, 590, 3.0),
 (0, 1210, 0.0),
 (0, 648, 4.0),
 (0, 344, 0.0),
 (0, 165, 5.0),
 (0, 153, 1.0),
 (0, 597, 2.0),
 (0, 1580, 3.0)]

In [51]:
# Import the necessary modules
from pyspark.sql import Row
# Map the RDD to a DF
movdf = movies.map(lambda line: Row(movie=line[0],
 title=line[1])).toDF()

In [52]:
from pyspark.sql.types import *
#Giving the movie dataframe its types and formatting it to be movie: int , title: string
movdf = movdf.withColumn("movie", movdf["movie"].cast(IntegerType())) \
.withColumn("title", movdf["title"].cast(StringType())) 


#Caching the dataframe for better performance
movdf.cache()
#Testing to make sure that the dataframe was proprely cached
assert movdf.is_cached

In [53]:
#Mapping RDD to a dataframe
ratdf = ratings.map(lambda line: Row(user=line[0],
 product=line[1],
 rating=line[2])).toDF()

In [54]:
#Assigning types and formatting the dataframe to be user: int, product: int, rating: float
ratdf = ratdf.withColumn("user", ratdf["user"].cast(IntegerType())) \
.withColumn("product", ratdf["product"].cast("integer")) \
.withColumn("rating", ratdf["rating"].cast("float")) 

#Caching data frame for better performance
ratdf.cache()
#Testing to make sure dataframe is propely cached
assert ratdf.is_cached

In [55]:
#Printing out data frame to double check its correct along with the schema
ratdf.show(truncate = False)
ratdf.printSchema()


movdf.show(truncate = False)
movdf.printSchema()


+----+-------+------+
|user|product|rating|
+----+-------+------+
|1   |1      |4.0   |
|1   |3      |4.0   |
|1   |6      |4.0   |
|1   |47     |5.0   |
|1   |50     |5.0   |
|1   |70     |3.0   |
|1   |101    |5.0   |
|1   |110    |4.0   |
|1   |151    |5.0   |
|1   |157    |5.0   |
|1   |163    |5.0   |
|1   |216    |5.0   |
|1   |223    |3.0   |
|1   |231    |5.0   |
|1   |235    |4.0   |
|1   |260    |5.0   |
|1   |296    |3.0   |
|1   |316    |3.0   |
|1   |333    |5.0   |
|1   |349    |4.0   |
+----+-------+------+
only showing top 20 rows

root
 |-- user: integer (nullable = true)
 |-- product: integer (nullable = true)
 |-- rating: float (nullable = true)

+-----+-------------------------------------+
|movie|title                                |
+-----+-------------------------------------+
|1    |Toy Story (1995)                     |
|2    |Jumanji (1995)                       |
|3    |Grumpier Old Men (1995)              |
|4    |Waiting to Exhale (1995)             |
|5  

In [56]:
#Converting my personal movies to a dataframe
myRatDf = myRatings.map(lambda line: Row(user=line[0],
 product=line[1],
 rating=line[2])).toDF()

In [57]:
#Assigning types and formatting the dataframe to be user: int, product: int, rating: float
myRatDf = myRatDf.withColumn("user", myRatDf["user"].cast(IntegerType())) \
.withColumn("product", myRatDf["product"].cast("integer")) \
.withColumn("rating", myRatDf["rating"].cast("float")) 

In [58]:
#Printing out dataframe to ensure it is correct along with schema
myRatDf.show(truncate = False)
myRatDf.printSchema()

+----+-------+------+
|user|product|rating|
+----+-------+------+
|0   |1      |4.0   |
|0   |780    |2.0   |
|0   |590    |3.0   |
|0   |1210   |0.0   |
|0   |648    |4.0   |
|0   |344    |0.0   |
|0   |165    |5.0   |
|0   |153    |1.0   |
|0   |597    |2.0   |
|0   |1580   |3.0   |
|0   |231    |5.0   |
+----+-------+------+

root
 |-- user: integer (nullable = true)
 |-- product: integer (nullable = true)
 |-- rating: float (nullable = true)



In [59]:
#Join the two ratings dataframes to incorporate your data along with the MovieLens dataset
ratdf = myRatDf.union(ratdf);

In [60]:
#Print out the newly joined ratings 
ratdf.show()

#Counts number of movies that are now in the dataset
movdf_count = movdf.count()
print("MOVIE COUNT:", movdf_count)

#Counts the number of ratings that are now in the dataset
ratdf_count = ratdf.count()
print("RATING COUNT:",ratdf_count)


+----+-------+------+
|user|product|rating|
+----+-------+------+
|   0|      1|   4.0|
|   0|    780|   2.0|
|   0|    590|   3.0|
|   0|   1210|   0.0|
|   0|    648|   4.0|
|   0|    344|   0.0|
|   0|    165|   5.0|
|   0|    153|   1.0|
|   0|    597|   2.0|
|   0|   1580|   3.0|
|   0|    231|   5.0|
|   1|      1|   4.0|
|   1|      3|   4.0|
|   1|      6|   4.0|
|   1|     47|   5.0|
|   1|     50|   5.0|
|   1|     70|   3.0|
|   1|    101|   5.0|
|   1|    110|   4.0|
|   1|    151|   5.0|
+----+-------+------+
only showing top 20 rows

MOVIE COUNT: 9742
RATING COUNT: 100847


In [61]:
#Counts the number of total users who have rated movies
rdf2 = ratdf.select("user")
print("Number of Users:")
print(rdf2.distinct().count())

Number of Users:
611


In [62]:
#Split the dataset randomly in order to obtain a random split of data for training, validation and testing
(training, validation, test) = ratdf.randomSplit([0.6, 0.2 , 0.2], seed=42)
#These are all cached to enchance performance
training.cache()
validation.cache()
test.cache()

DataFrame[user: int, product: int, rating: float]

In [63]:
#Printing out data to ensure they contain data
training.show(3)
validation.show(3)
test.show(3)


#prints out the data count so we can check that the data split how we wanted it to
print('Training: {0}, validation: {1}, test: {2}\n'.format(
  training.count(), validation.count(), test.count())
)

+----+-------+------+
|user|product|rating|
+----+-------+------+
|   0|    153|   1.0|
|   0|    231|   5.0|
|   0|    590|   3.0|
+----+-------+------+
only showing top 3 rows

+----+-------+------+
|user|product|rating|
+----+-------+------+
|   0|      1|   4.0|
|   0|    344|   0.0|
|   0|   1210|   0.0|
+----+-------+------+
only showing top 3 rows

+----+-------+------+
|user|product|rating|
+----+-------+------+
|   0|    165|   5.0|
|   0|    597|   2.0|
|   0|    780|   2.0|
+----+-------+------+
only showing top 3 rows

Training: 60707, validation: 20048, test: 20092



In [64]:
#retrieves the movie name from its id
def name_retriever(movie_id):
    data = movdf.where((F.col("movie") == movie_id)).collect()
    for row in data:
     title =str(row['title'])

    return title
    

In [65]:
#importing ALS model library from pyspark machince learning library
from pyspark.ml.recommendation import ALS

als = ALS(maxIter=5, regParam=0.1,seed=42, userCol= "user", 
                      itemCol = "product", ratingCol = "rating",coldStartStrategy = "drop")

In [66]:
from pyspark.ml.evaluation import RegressionEvaluator

# Create an RMSE evaluator
evaluator = RegressionEvaluator(predictionCol="prediction",metricName="rmse", labelCol="rating")

print(evaluator)

RegressionEvaluator_97adc0a1c74d


In [67]:


tolerance = 0.03
ranks = [4, 8, 12]
errors = [0, 0, 0]
models = [0, 0, 0]
err = 0
min_error = float('inf')
best_rank = -1
#We will train the model over different ranks to ensure we find the best fit for the model and avoid under or underfitting
for rank in ranks:
  # Set the rank here:
  als.setRank(rank)
  # Create the model with out training data
  model = als.fit(training)
  # I run the model agaisnt the validation data for it to make a prediction.
  prediction = model.transform(validation)


#When training we want to see waht model has the lowest RMSE which allows us to know which model is most accurate
  error = evaluator.evaluate(prediction)
  errors[err] = error
  models[err] = model
  print ('For rank %s the RMSE is %s' % (rank, error))
  if error < min_error:
    min_error = error
    best_rank = err
  err += 1

#Sets model to the best found rank
als.setRank(ranks[best_rank])
print ('The best model was trained with rank %s' % ranks[best_rank])
my_model = models[best_rank]

For rank 4 the RMSE is 0.9408491956703827
For rank 8 the RMSE is 0.930852903922386
For rank 12 the RMSE is 0.9280193252263161
The best model was trained with rank 12


In [68]:
num_of_movies = 10
#Using als built in function to create a recommendation for all users based off their own movie ratings
recommended_movie_df = my_model.recommendForAllUsers(num_of_movies)
#Selecting user 0 which is my user recommendations to be collected
recommendations = recommended_movie_df.where(recommended_movie_df.user == 0).collect()

print("recommendations for user : 0 ")


for i in range(0,num_of_movies):
  #retrieves movie id from the list
  recommendation = recommendations[0]['recommendations'][i][0]
  #runs movie id through a function that retrieves its name as a string
  movieName = name_retriever(recommendation)

  #prints out our movie name
  print(i+1, ":" + movieName)


recommendations for user : 0 
1 :Brick (2005)
2 :Mulholland Dr. (1999)
3 :On the Beach (1959)
4 :Chungking Express (Chung Hing sam lam) (1994)
5 :Ninja Scroll (Jûbei ninpûchô) (1995)
6 :Pierrot le fou (1965)
7 :Split (2017)
8 :Tadpole (2002)
9 :"Tao of Steve
10 :"Tinker
