In [1]:
# import necessary libraries
import pyspark
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

In [2]:
# instantiate SparkSession object
spark = SparkSession\
        .builder\
        .appName("ALSExample").config("spark.driver.host","localhost")\
        .getOrCreate()

In [3]:
# read in the dataset into pyspark DataFrame
movie_ratings = spark.read.csv("./data/ratings.csv",header = 'true' , inferSchema = 'true')

We aren't going to need the time stamp, so we can go ahead and remove that column.

In [4]:
movie_ratings = movie_ratings.drop("timestamp")

In [5]:
movie_ratings.head()

Row(userId=1, movieId=1, rating=4.0)

In [6]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
# split into training and testing sets
(train, test) = movie_ratings.randomSplit((0.8,0.2),42 )
print(train.count(), test.count())

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter = 5, 
          rank = 10, 
          userCol = 'userId', 
          itemCol = 'movieId', 
          ratingCol = 'rating', 
          seed = 42,
          coldStartStrategy = 'drop',
          regParam = 0.01)
# fit the ALS model to the training set
model = als.fit(train)

80796 20040


In [7]:
movie_titles = spark.read.csv('./data/movies.csv', header = True)

In [8]:
def name_retriever(movie_id,movie_title_df):
    title = movie_title_df.filter(f"movieId == {movie_id}").collect()[0][1]
    return title

In [9]:
def new_user_recs(user_id,new_ratings,rating_df,movie_title_df,num_recs):
    # turn the new_recommendations list into a spark DataFrame
    new_rates = spark.createDataFrame(new_ratings)
    # combine the new ratings df with the rating_df
    rating_df = rating_df.union(new_rates)
    
    # create an ALS model and fit it
    als = ALS(maxIter = 5,  
                userCol = 'userId', 
                itemCol = 'movieId', 
                ratingCol = 'rating', 
                seed = 42,
                coldStartStrategy = 'drop',
                rank = 50,
                regParam = 0.01)
    model = als.fit(rating_df)
    # make recommendations for all users using the recommendForAllUsers method
    recommendations = model.recommendForAllUsers(num_recs)
    user_recommends = recommendations.filter(recommendations.userId == user_id)
    recs_list = user_recommends.collect()[0]['recommendations']
    # get recommendations specifically for the new user that has been added to the DataFrame
    out_list = [name_retriever(row[0],movie_title_df) for row in recs_list]
    print("\n".join(out_list))
    return out_list
        

In [10]:
def movie_rater(movie_titles,num, genre=None, userId = 1000):
    if genre == None:
        genre = ""
    pool = movie_titles.filter(f"genres like '%{genre.title()}%'").toPandas()
    selection = pool.sample(num).reset_index(drop = True)
    ratings =[]
    i = 0
    while i <= max(selection.index):
        movie = selection.loc[i,:]
        print("")
        print(movie)
        rating = input("How do you rate this movie on a scale of 1-5, press n if you have not seen:")
        if rating.lower() == "n":
            selection = selection.append(pool.query("movieId not in @selection.movieId").sample(1), ignore_index = True)
        elif float(rating)<=5:
            ratings.append((int(userId),int(movie.movieId),float(rating)))
            #ratings.append({"userId" : userId,
            #                "movieId": movie.movieId,
            #                "rating":float(rating)})
        else:
            print("---------")
            print("Input incorrect, please try again")
            print("---------")
            i -=1
        i += 1
    return ratings

In [11]:
user_id = 100000
user_ratings_1 = movie_rater(movie_titles,
                             num = 5,
                             userId = user_id)
new_user_recs(user_id,
             new_ratings=user_ratings_1,
             rating_df=movie_ratings,
             movie_title_df=movie_titles,
             num_recs = 10)


movieId                   773
title      Touki Bouki (1973)
genres                  Drama
Name: 0, dtype: object
How do you rate this movie on a scale of 1-5, press n if you have not seen:2

movieId                  4617
title      Let It Ride (1989)
genres                 Comedy
Name: 1, dtype: object
How do you rate this movie on a scale of 1-5, press n if you have not seen:3

movieId        179813
title      LBJ (2017)
genres          Drama
Name: 2, dtype: object
How do you rate this movie on a scale of 1-5, press n if you have not seen:4

movieId                    6850
title      Leap of Faith (1992)
genres             Comedy|Drama
Name: 3, dtype: object
How do you rate this movie on a scale of 1-5, press n if you have not seen:54
---------
Input incorrect, please try again
---------

movieId                    6850
title      Leap of Faith (1992)
genres             Comedy|Drama
Name: 3, dtype: object
How do you rate this movie on a scale of 1-5, press n if you have not seen:4

m

['Morvern Callar (2002)',
 'Blood Simple (1984)',
 'Maltese Falcon, The (1941)',
 'Vertigo (1958)',
 'Short Cuts (1993)',
 'eXistenZ (1999)',
 'Hustler, The (1961)',
 'LBJ (2017)',
 'Leap of Faith (1992)',
 'Citizen Kane (1941)']