# Rating some movies
#### To make recommendation for you, we are going to learn your taste by asking you to rate a few movies.

In [2]:
import sys
import os
from os import remove, removedirs
from os.path import dirname, join, isfile
from time import time

topMovies = """1,Toy Story (1995)
780,Independence Day (a.k.a. ID4) (1996)
590,Dances with Wolves (1990)
1210,Star Wars: Episode VI - Return of the Jedi (1983)
648,Mission: Impossible (1996)
344,Ace Ventura: Pet Detective (1994)
165,Die Hard: With a Vengeance (1995)
153,Batman Forever (1995)
597,Pretty Woman (1990)
1580,Men in Black (1997)
231,Dumb & Dumber (1994)"""

parentDir = os.path.abspath('')
ratingsFile = join(parentDir, "personalRatings.txt")

if isfile(ratingsFile):
    r = input("Looks like you've already rated the movies. Overwrite ratings (y/N)? ")
    if r and r[0].lower() == "y":
        remove(ratingsFile)
    else:
        sys.exit()

prompt = "Please rate the following movie (1-5 (best), or 0 if not seen): "
print(prompt)

now = int(time())
n = 0

f = open(ratingsFile, 'w')
for line in topMovies.split("\n"):
    ls = line.strip().split(",")
    valid = False
    while not valid:
        rStr = input(ls[1] + ": ")
        r = int(rStr) if rStr.isdigit() else -1
        if r < 0 or r > 5:
            print(prompt)
        else:
            valid = True
            if r > 0:
                f.write("0::%s::%d::%d\n" % (ls[0], r, now))
                n += 1
f.close()

if n == 0:
    print("No rating provided!")


Looks like you've already rated the movies. Overwrite ratings (y/N)? y
Please rate the following movie (1-5 (best), or 0 if not seen): 
Toy Story (1995): `1
Please rate the following movie (1-5 (best), or 0 if not seen): 
Toy Story (1995): 1
Independence Day (a.k.a. ID4) (1996): 5
Dances with Wolves (1990): 3
Star Wars: Episode VI - Return of the Jedi (1983): 5
Mission: Impossible (1996): 5
Ace Ventura: Pet Detective (1994): 5
Die Hard: With a Vengeance (1995): 5
Batman Forever (1995): 5
Pretty Woman (1990): 1
Men in Black (1997): 5
Dumb & Dumber (1994): 2


# Solution Structure

In [3]:
#!/usr/bin/env python

# All the imports needed for the project
import sys
import itertools
import os
import math
from math import sqrt
from operator import add
from os.path import join, isfile, dirname
from pyspark.sql import SparkSession, Row
from pyspark.mllib.recommendation import ALS
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [4]:
# Methods need to parse data to a RDD
def parseMyRating(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp,
    for my movie ratings genereated from the txt file above.
    """
    fields = line.strip().split("::")
    return int(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))

def parseMovieRatings(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp,
    for movie ratings generated from the ratings.dat file.
    """
    fields = line.strip().split("::")
    return int(fields[0]), int(fields[1]), float(fields[2])

def parseMovie(line):
    """
    Parses a movie record in MovieLens format movieId::movieTitle,
    for movies generated from the movies.dat file.
    """
    fields = line.strip().split("::")
    return int(fields[0]), fields[1]

def loadRatings(ratingsFile):
    """
    Load ratings from file.
    """
    if not isfile(ratingsFile):
        print("File %s does not exist." % ratingsFile)
        sys.exit(1)
    f = open(ratingsFile, 'r')
    ratings = filter(lambda r: r[2] > 0, [parseMyRating(line)[1] for line in f])
    f.close()
    if not ratings:
        print("No ratings provided.")
        sys.exit(1)
    else:
        return ratings

In [5]:
if __name__ == "__main__":

    # set up environment for spark
    spark = SparkSession.builder \
   .master("local") \
   .appName("Movie Recommendation Engine") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()

    sc = spark.sparkContext

21/10/07 12:42:16 WARN Utils: Your hostname, MSI resolves to a loopback address: 127.0.1.1; using 172.22.5.230 instead (on interface wifi0)
21/10/07 12:42:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/10/07 12:42:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [6]:
    # load personal ratings
    myRatings = loadRatings(os.path.abspath('./personalRatings.txt'))
    # Create an RDD of myRatings
    myRatingsRDD = sc.parallelize(myRatings)
    # Creates a DF with userID, movieID and movieRating
    myRatingsDF = myRatingsRDD.map(lambda line: Row(userId=line[0], movieId=line[1], movieRating=line[2])).toDF()
    
    # load ratings and movie titles
    # Directory of the file
    movieLensHomeDir = os.path.abspath('.')

    # Create an RDD of ratings
    ratingsRDD = sc.textFile(join(movieLensHomeDir, "ratings.dat")).map(parseMovieRatings)
    # Creats a DF with userID, movieID and movieRating
    ratingsDF = ratingsRDD.map(lambda line: Row(userId=line[0], movieId=line[1], movieRating=line[2])).toDF()

    # Create an RDD of movies     
    movies = sc.textFile(join(movieLensHomeDir, "movies.dat")).map(parseMovie)
    # creates a DF of movies with movieID and movieName
    moviesDF = movies.map(lambda line: Row(movieId=line[0], movieName=line[1])).toDF()
    
    # Joins my ratings with all users ratings so data can be used in the algorithm     
    jointRatingsDF = ratingsDF.union(myRatingsDF)

                                                                                

In [7]:
    # your code here
    # Creates an ALS algorithm with 10 iterations, regParams of 0.1 with specifies the regulrization parameter,
    # rank 6 is the number of features to use, the three columns to use are userId, movieId, and movieRating for the
    # algorithm, coldStartStrategy drops any rows with a NaN value.
    # All code below is found on the Pyspark Library information
    als = ALS(maxIter=10, regParam=0.1, rank=6, userCol="userId",
              itemCol = "movieId", ratingCol="movieRating", coldStartStrategy = "drop")
    # Create a train and test data set splitting  the data 80/20% split respectively.
    # The jointRatingsDF is used as it has all the user ratings including my own
    train, test = jointRatingsDF.randomSplit([0.8, 0.2])
    
    # Training the Model using the train dataset
    alsModel = als.fit(train)
    # Generating Predictions using the test dataset
    prediction = alsModel.transform(test)
    
    # the evaluator is used to produce the cost function in this case Mean Squarred Error
    evaluator = RegressionEvaluator(metricName="mse", labelCol="movieRating",  predictionCol="prediction")
    # Mse is generated from the predictions data
    mse = evaluator.evaluate(prediction)
    # The mse is printed
    print(mse)

21/10/07 12:42:39 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/10/07 12:42:39 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
21/10/07 12:42:40 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
21/10/07 12:42:40 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK

0.7537907559876704


                                                                                

In [8]:
    # Generate recommend movies 5 movies for all users in the dataset
    recommended_movie_df = alsModel.recommendForAllUsers(5)
    # print the recommended movies for 5 users and the False signifys not to hide the data
    print(recommended_movie_df.show(5, False))
    # Filter the recommended movies df to only show movies for my user, user 0
    recommended_movie_df = recommended_movie_df[recommended_movie_df['userId']==0]
    # print the new updated recommended movies df with only my user
    print(recommended_movie_df.show(1, False))

                                                                                

+------+-------------------------------------------------------------------------------------------+
|userId|recommendations                                                                            |
+------+-------------------------------------------------------------------------------------------+
|1580  |[{572, 4.5322785}, {989, 4.3725634}, {1851, 4.3330235}, {557, 4.2620735}, {787, 4.22304}]  |
|4900  |[{572, 6.0900874}, {3236, 5.431673}, {1851, 5.3946605}, {3233, 5.386013}, {318, 5.2496533}]|
|5300  |[{557, 5.4200816}, {989, 5.304091}, {2309, 5.244132}, {787, 5.1481786}, {1420, 5.030448}]  |
|471   |[{2309, 4.734953}, {1780, 4.7279377}, {989, 4.7174015}, {3236, 4.710863}, {1851, 4.684157}]|
|1591  |[{572, 5.887376}, {557, 5.773006}, {989, 5.6165886}, {2562, 5.4845977}, {787, 5.3916078}]  |
+------+-------------------------------------------------------------------------------------------+
only showing top 5 rows

None


                                                                                

+------+-------------------------------------------------------------------------------------------+
|userId|recommendations                                                                            |
+------+-------------------------------------------------------------------------------------------+
|0     |[{3382, 8.316638}, {776, 6.8689036}, {2765, 6.730685}, {2197, 6.3872523}, {1793, 6.356622}]|
+------+-------------------------------------------------------------------------------------------+

None


In [9]:
    # Create a movies array which we can pass in movies that we recommended by the system
    moviesArray = []
    
    # for the column in the recommended movie df create a flat list using the first row, [0]
    for row in recommended_movie_df.select("recommendations").rdd.flatMap(list).collect()[0]:
        # Print the row to be able to see the data generated
        print(row)
        # Add movies to the array using the movieId and movie rating generated from the algorithm
        moviesArray.append((row['movieId'], row['rating']))
    # Generate a RDD using the movies Array and give it columns movieId and rating
    moviesGenerated = map(lambda x : Row(movieId=x[0], rating=x[1]), moviesArray)
    # Create a DF using the moviesGenerated and pass it in columns movieId and ratings
    moviesGeneratedDF=spark.createDataFrame(moviesGenerated, ["movieId"],["ratings"])

    # Print the new DF showing the movieId and rating
    print(moviesGeneratedDF.show())



Row(movieId=3382, rating=8.316637992858887)
Row(movieId=776, rating=6.868903636932373)
Row(movieId=2765, rating=6.730685234069824)
Row(movieId=2197, rating=6.387252330780029)
Row(movieId=1793, rating=6.356622219085693)
+-------+-----------------+
|movieId|           rating|
+-------+-----------------+
|   3382|8.316637992858887|
|    776|6.868903636932373|
|   2765|6.730685234069824|
|   2197|6.387252330780029|
|   1793|6.356622219085693|
+-------+-----------------+

None


                                                                                

In [10]:
    # Create an a new DF joining moviesDF with moviesGeneratedDF, this way we can get the name of the movies generated
    # Join the DFs on movieId using a left join
    recommendMovies = moviesGeneratedDF.join(moviesDF, moviesGeneratedDF.movieId == moviesDF.movieId, how='left')
    # Order the recommendMovies DF by rating so the highest rated is at the top and lowest at the bottom
    recommendMovies = recommendMovies.orderBy('rating', ascending=False)
    # Print out the new DF showing the result
    print(recommendMovies.show(5,False))



+-------+-----------------+-------+---------------------------+
|movieId|rating           |movieId|movieName                  |
+-------+-----------------+-------+---------------------------+
|3382   |8.316637992858887|3382   |Song of Freedom (1936)     |
|776    |6.868903636932373|776    |Babyfever (1994)           |
|2765   |6.730685234069824|2765   |Acid House, The (1998)     |
|2197   |6.387252330780029|2197   |Firelight (1997)           |
|1793   |6.356622219085693|1793   |Welcome to Woop-Woop (1997)|
+-------+-----------------+-------+---------------------------+

None




In [11]:
    # X is = 1 so we can use it in the formatting of the results
    x = 1
    # Loop through the recommendMovies using the movie Name
    for row in recommendMovies.select("movieName").rdd.flatMap(list).collect():
        # Create a temporary variable to format the string using the x and the movie name
        var = str(x) + ": "+ row
        # Finally print out the desired output in the form
        # 1: Song of Freedom (1936) for example
        print(var)
        # Increment x for the next movie
        x = x + 1



1: Song of Freedom (1936)
2: Babyfever (1994)
3: Acid House, The (1998)
4: Firelight (1997)
5: Welcome to Woop-Woop (1997)


                                                                                

In [12]:
    # clean up and stop the spark instance
    sc.stop()