# Rating some movies
#### To make recommendation for you, we are going to learn your taste by asking you to rate a few movies.

In [1]:
# Set path variable for SPARK_HOME：
%env SPARK_HOME = /home/aono/CS4337/lib/python3.8/site-packages/pyspark 

env: SPARK_HOME=/home/aono/CS4337/lib/python3.8/site-packages/pyspark


In [11]:
import sys
import os
from time import time
from os.path import join, isfile, dirname

# Movies List
topMovies = """1,Toy Story (1995)
780,Independence Day (a.k.a. ID4) (1996)
590,Dances with Wolves (1990)
1210,Star Wars: Episode VI - Return of the Jedi (1983)
648,Mission: Impossible (1996)
344,Ace Ventura: Pet Detective (1994)
165,Die Hard: With a Vengeance (1995)
153,Batman Forever (1995)
597,Pretty Woman (1990)
1580,Men in Black (1997)
231,Dumb & Dumber (1994)"""

# Project Path
parentDir = os.path.abspath('/home/aono/CS4337/Project1/')
ratingsFile = join(parentDir, "personalRatings.txt")

# Start 
if isfile(ratingsFile):
    r = input("Looks like you've already rated the movies. Overwrite ratings (y/N)? ")
    if r and r[0].lower() == "y":  # Y / y is ok
        remove(ratingsFile)
    else:
        sys.exit()

# the tip
prompt = "Please rate the following movie (1-5 (best), or 0 if not seen): "
print(prompt)

# get current time
now = int(time())
# numbers of rated movies
n = 0

f = open(ratingsFile, 'w')
for line in topMovies.split("\n"):
    ls = line.strip().split(",") # separate what is needed
    valid = False
    while not valid:
        rStr = input(ls[1] + ": ")
        r = int(rStr) if rStr.isdigit() else -1
        if r < 0 or r > 5:
            print(prompt)  # can not input a number less than 0 or greater than 5 !!
        else:
            valid = True
            if r > 0:
                f.write("0::%s::%d::%d\n" % (ls[0], r, now)) # ok, we can storage it
                n += 1
f.close() # stop and get the personalRatings

# if you do not enter any number
if n == 0:
    print("No rating provided!")


Please rate the following movie (1-5 (best), or 0 if not seen): 
Toy Story (1995): 1
Independence Day (a.k.a. ID4) (1996): 2
Dances with Wolves (1990): 3
Star Wars: Episode VI - Return of the Jedi (1983): 4
Mission: Impossible (1996): 5
Ace Ventura: Pet Detective (1994): 1
Die Hard: With a Vengeance (1995): 2
Batman Forever (1995): 3
Pretty Woman (1990): 4
Men in Black (1997): 5
Dumb & Dumber (1994): 1


# Solution Structure

In [7]:
#!/usr/bin/env python

import os
import sys
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname

from pyspark.sql import SparkSession

from pyspark.ml.evaluation import RegressionEvaluator 
from pyspark.ml.recommendation import ALS # ml
from pyspark.sql import Row

    # parse---

def parseRating(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp .
    """
    fields = line.strip().split("::")
    return int(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))

def parseMovie(line):
    """
    Parses a movie record in MovieLens format movieId::movieTitle .
    """
    fields = line.strip().split("::")
    return int(fields[0]), fields[1]

    # load---
    
def loadRatings(ratingsFile):
    """
    Load ratings from file.
    """
    if not isfile(ratingsFile):
        print("File %s does not exist." % ratingsFile)
        sys.exit(1)
    f = open(ratingsFile, 'r')
    ratings = filter(lambda r: r[2] > 0, [parseRating(line)[1] for line in f])
    f.close()
    if not ratings:
        print("No ratings provided.")
        sys.exit(1)
    else:
        return ratings

    # Compute RMSE (Root Mean Squared Error)---
    
def computeRmse(model, data, n):
    """
    Compute RMSE (Root Mean Squared Error).
    """
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
      .join(data.map(lambda x: ((x[0], x[1]), x[2]))) \
      .values()
    return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))

if __name__ == "__main__":

    # set up environment
    spark = SparkSession.builder \
   .master("local") \
   .appName("Movie Recommendation Engine") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
   
    sc = spark.sparkContext

    # load personal ratings
    myRatings = loadRatings(os.path.abspath('/home/aono/CS4337/Project1/personalRatings.txt')) #Original：'/home/ashish/personalRatings.txt'
    myRatingsRDD = sc.parallelize(myRatings, 1)
    # (personal comments)easy to see and know what personalRatingsRDD is
    print("personalRatingsRDD:") 
    print(myRatingsRDD.take(20))
    print("\n")
   
    
    # load ratings and movie titles

    movieLensHomeDir = os.path.abspath('/home/aono/CS4337/Project1/movielens/medium') #Original: '/home/ashish/movieData'

    # ratings is an RDD of (last digit of timestamp, (userId, movieId, rating))
    ratings = sc.textFile(join(movieLensHomeDir, "ratings.dat")).map(parseRating)
    # (personal comments)easy to see and know what RatingsRDD is
    print("RatingsRDD:") 
    print(ratings.take(20))
    print("\n")

    # movies is an RDD of (movieId, movieTitle)
    movie = sc.textFile(join(movieLensHomeDir, "movies.dat")).map(parseMovie)
    # (personal comments)easy to see and know what moviesRDD is
    print("moviesRDD:") 
    print(movie.take(20))
    print("\n")
    movies = dict(movie.collect())
    # Original code: movies = dict(sc.textFile(join(movieLensHomeDir, "movies.dat")).map(parseMovie).collect())

    
    
    ##########################################################################################################
    # my code here
    ##########################################################################################################
    ############################### PART2--MACHINE LEARNING ##################################################
    ##########################################################################################################
    
    
    # (personal comments) 
    # create RDDs and DataFrames
    # DataFrames is helpful to read and operate!
    # --RatingsRDD and DataFrame--
    r1 = lambda line: Row(userID = line[1][0], movieID = line[1][1], rating = line[1][2]) # This is 2-D. Be careful!
    Ratings_df = ratings.map(r1).toDF()
    Ratings_df.show(5)
    # (personal comments) 
    # maxIter is the maximum number of iterations to run (defaults to 10).
    # regParam specifies the regularization parameter in ALS (defaults to 1.0)
    
    
    
    # --personalRatingsRDD and DataFrame--
    r2 = lambda line: Row(userID = line[0], movieID = line[1], rating = line[2])
    personalRatings_df = myRatingsRDD.map(r2).toDF()
    personalRatings_df.show(5)
    
    
    # (personal comments) 80% is training sets, 20% is testing sets
    (training, test) = Ratings_df.randomSplit([.8,.2], seed = 3500)
    
    
    # (personal comments) add the personalRatings data (for training)!
    training = training.union(personalRatings_df)

    
    
    
    # Build the recommendation model using ALS on the training data
    # Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
    
    # (personal comments) use ALS model (important!)
    als = ALS(maxIter = 20, regParam = 0.06, userCol="userID", itemCol="movieID", ratingCol="rating", coldStartStrategy="drop")
    model = als.fit(training)
    
    
    
    # Evaluate the model by computing the RMSE on the test data
    predictions = model.transform(test)
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                    predictionCol="prediction")
    rmse = evaluator.evaluate(predictions)
    # show the RMSE
    print("Root-mean-square error = " + str(rmse))
    print("\n")
    
    
    
    """
    # Generate top 10 movie recommendations for each user
    userRecs = model.recommendForAllUsers(10)
    # Generate top 10 user recommendations for each movie
    movieRecs = model.recommendForAllItems(10)
    
    # Generate top 10 movie recommendations for a specified set of users
    users = Ratings_df.select(als.getUserCol()).distinct().limit(3)
    userSubsetRecs = model.recommendForUserSubset(users, 10)
    # Generate top 10 user recommendations for a specified set of movies
    movies = Ratings_df.select(als.getItemCol()).distinct().limit(3)
    movieSubSetRecs = model.recommendForItemSubset(movies, 10)
    
    
    # userRecs.show()
    # movieRecs.show()
    userSubsetRecs.show()
    movieSubSetRecs.show()
    """
    
    
    # output the recommented result
    print("5 Movies recommended for you:")
    users = Ratings_df.select(als.getUserCol()).distinct().limit(1)
    userSubsetRecs = model.recommendForUserSubset(users, 5)
    userSubsetRecs.show()
    
    
    # print(userSubsetRecs[1][2])
    
    
    spark.stop()
    
    
    # clean up
    sc.stop() #close


personalRatingsRDD:
[(0, 1, 1.0), (0, 780, 2.0), (0, 590, 3.0), (0, 1210, 4.0), (0, 648, 5.0), (0, 344, 1.0), (0, 165, 2.0), (0, 153, 3.0), (0, 597, 4.0), (0, 1580, 5.0), (0, 231, 1.0)]


RatingsRDD:
[(0, (1, 1193, 5.0)), (9, (1, 661, 3.0)), (8, (1, 914, 3.0)), (5, (1, 3408, 4.0)), (1, (1, 2355, 5.0)), (8, (1, 1197, 3.0)), (9, (1, 1287, 5.0)), (9, (1, 2804, 5.0)), (8, (1, 594, 4.0)), (8, (1, 919, 4.0)), (8, (1, 595, 5.0)), (2, (1, 938, 4.0)), (1, (1, 2398, 4.0)), (4, (1, 2918, 4.0)), (3, (1, 1035, 5.0)), (8, (1, 2791, 4.0)), (8, (1, 2687, 3.0)), (7, (1, 2018, 4.0)), (3, (1, 3105, 5.0)), (9, (1, 2797, 4.0))]


moviesRDD:
[(1, 'Toy Story (1995)'), (2, 'Jumanji (1995)'), (3, 'Grumpier Old Men (1995)'), (4, 'Waiting to Exhale (1995)'), (5, 'Father of the Bride Part II (1995)'), (6, 'Heat (1995)'), (7, 'Sabrina (1995)'), (8, 'Tom and Huck (1995)'), (9, 'Sudden Death (1995)'), (10, 'GoldenEye (1995)'), (11, 'American President, The (1995)'), (12, 'Dracula: Dead and Loving It (1995)'), (13, '

                                                                                

Root-mean-square error = 0.8562701636251344


5 Movies recommended for you:


                                                                                

+------+--------------------+
|userID|     recommendations|
+------+--------------------+
|    26|[{2129, 5.0129795...|
+------+--------------------+

