In [25]:
# Rating some movies
#### To make recommendation for you, we are going to learn your taste by asking you to rate a few movies.

In [26]:
import sys
import os
from time import time
from os.path import join, isfile, dirname

topMovies = """1,Toy Story (1995)
780,Independence Day (a.k.a. ID4) (1996)
590,Dances with Wolves (1990)
1210,Star Wars: Episode VI - Return of the Jedi (1983)
648,Mission: Impossible (1996)
344,Ace Ventura: Pet Detective (1994)
165,Die Hard: With a Vengeance (1995)
153,Batman Forever (1995)
597,Pretty Woman (1990)
1580,Men in Black (1997)
231,Dumb & Dumber (1994)"""

parentDir = os.path.abspath('')
ratingsFile = join(parentDir, "personalRatings.txt")

if isfile(ratingsFile):
    r = input("Looks like you've already rated the movies. Overwrite ratings (y/N)? ")
    if r and r[0].lower() == "y":
        os.remove(ratingsFile)
    else:
        sys.exit()

prompt = "Please rate the following movie (1-5 (best), or 0 if not seen): "
print(prompt)

now = int(time())
n = 0

f = open(ratingsFile, 'w')
for line in topMovies.split("\n"):
    ls = line.strip().split(",")
    valid = False
    while not valid:
        rStr = input(ls[1] + ": ")
        r = int(rStr) if rStr.isdigit() else -1
        if r < 0 or r > 5:
            print(prompt)
        else:
            valid = True
            if r > 0:
                f.write("0::%s::%d::%d\n" % (ls[0], r, now))
                n += 1
f.close()

if n == 0:
    print("No rating provided!")


Looks like you've already rated the movies. Overwrite ratings (y/N)? n


SystemExit: 

# Solution Structure

In [None]:
#!/usr/bin/env python

import sys
import os
import itertools
import numpy as np
from math import sqrt
from operator import add
from os.path import join, isfile, dirname

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, desc
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.linalg import DenseVector

def parseMovie(line):
    """
    Parses a movie record in MovieLens format movieId::movieTitle .
    """
    fields = line.strip().split("::")
    return int(fields[0]), fields[1], fields[2]

def parseRating(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp .
    """
    fields = line.strip().split("::")
    return int(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))

if __name__ == "__main__":

    # set up environment
    spark = SparkSession.builder \
   .master("local") \
   .appName("Movie Recommendation Engine") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
   
    sc = spark.sparkContext

    # personal ratings
    myRatings = sc.textFile(os.path.abspath('./personalRatings.txt')).map(parseRating) 
    myRatingsDF = myRatings.map(lambda line: Row(userID=line[1][0], movieID=line[1][1], 
                                                 rating=line[1][2], timestamp=line[0])).toDF()
    
    # load ratings and movie titles
    movieLensHomeDir = os.path.abspath('.')
    
    # movies is an RDD of (movieId, movieTitle)
    movies = sc.textFile(join(movieLensHomeDir, "movies.dat")).map(parseMovie)
    moviesDF = movies.map(lambda line: Row(movieID=line[0], movieTitle=line[1], genre=line[2])).toDF()
    # moviesDF.describe() -- example of how I explored the Data
    
    # ratings is an RDD of (last digit of timestamp, (userId, movieId, rating))
    ratings = sc.textFile(join(movieLensHomeDir, "ratings.dat")).map(parseRating)
    ratingsDF = ratings.map(lambda line: Row(userID=line[1][0], movieID=line[1][1], rating=line[1][2],
                            timestamp=line[0])).toDF()
    
    #join the ratings DF with my ratings to get the full set of ratings
    combinedRatings = ratingsDF.union(myRatingsDF)

In [7]:
    """
    This code block deals with the standardisation of the ratings data.
    I use a standard scaler from the pyspark ml library
    The output of the scaling will go into the rating_scaled column as a DenseVector
    """
    
    df = combinedRatings.rdd.map(lambda line: Row(userID=line[0], movieID=line[1], 
                                                          rating=DenseVector(line[2:]), timestamp=line[3])).toDF()

    standardScaler = StandardScaler(inputCol="rating", outputCol="rating_scaled")
    scaler = standardScaler.fit(df)
    scaledRatingsDF = scaler.transform(df)

                                                                                

In [8]:
    #format the DF so that the ratings column is out of the Dense Vector
    standardisedRatingsDF = scaledRatingsDF.rdd.map(lambda line: Row(userID=line[0], movieID=line[1], 
                                                          rating=float(line[4][0]), timestamp=line[3])).toDF()

[Stage 97:>                                                         (0 + 1) / 1]                                                                                

In [9]:
    '''
    The following lines define the parameters for the ALS
    I found that max iterations 10 is the best so that the computation time is short
    The reg param is set to 0.1 because a big number would cause inaccurate results
    whereas a number that is too small would make the als never converge. 0.1 works well.
    I experimented with different ranks, 6 worked the best - gave me the best MSE
    Nonnegative is true because all the ratings are not negative
    Implicit prefs is false because we are working with explicit data
    '''
    als = ALS(
            maxIter=10,
            regParam=0.1,
            rank=6,
            userCol="userID",
            itemCol = "movieID",
            ratingCol = "rating",
            nonnegative = True,
            implicitPrefs = False,
            coldStartStrategy = "drop"
    )
    
    #split the full list of ratings into a training set and test set
    #The training set is 80% of the data, the test  set is 20%
    (trainDF, testDF) = standardisedRatingsDF.randomSplit([0.8, 0.2])

    #fit the als to the training set
    model = als.fit(trainDF)
    
    #this is the prediction DataFrame made with the als model perfomed on the training set
    predictionDF = model.transform(testDF)
    
    evaluate = RegressionEvaluator(metricName="mse", labelCol="rating",  predictionCol="prediction")
    MSE = evaluate.evaluate(predictionDF)
    print('MSE: ', MSE)

    #get only the ratings with the userID: 0
    myUser = combinedRatings.filter(combinedRatings['userID'] == 0)
    
    #recommend 5 movies for just the one user - saves on computation time!
    recommendMoviesDF = model.recommendForUserSubset(myUser, 5)
    
    #select the recommendations column
    myRecommendedMovies = recommendMoviesDF.select("recommendations")
    

                                                                                

MSE:  0.6221704222725032


In [10]:
    
    movieIDs = []
    
    #loop through the 'recommendations' column in recommended movies
    #and apped the tuple: movieID, rating to the list of movieIDs
    for movie in myRecommendedMovies.first()['recommendations']:
        movieData = (movie['movieID'], movie['rating'])
        movieIDs.append(movieData)
    
    #DF of my recommended movies
    myMoviesDF = spark.createDataFrame(data=movieIDs, schema = ["movieID", "rating"])
    

                                                                                

In [None]:

    #join the movies DataFrame with my recommended movies
    moviesJoined = myMoviesDF.join(moviesDF, on="movieID", how="left")
    
    #order the joined DF by rating descending
    moviesJoined = moviesJoined.orderBy(col("rating").desc())
    
    #extract just the movie title from the resulting DF
    moviesText = moviesJoined.select("movieTitle").rdd.flatMap(list).collect()
    
    #print recommended movies
    print("Movies recommended for you:")
    for index, movie in enumerate(moviesText):
        print(index+1, ": ", movie)
        

In [None]:
    # clean up
    sc.stop()