In [14]:
from pyspark.sql import SparkSession
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.sql import Row
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col


# start spark session
spark = SparkSession.builder.appName('rec-eval').getOrCreate()

# load data
rdd_orig   = spark.read.option("header", True).csv('movies.csv').rdd
ratingsRDD = rdd_orig.map(lambda p: Row(userId=int(p[2]), movieId=int(p[0]),
                                     rating=int(p[1])))
ratings    = spark.createDataFrame(ratingsRDD)

# load trained model
als        = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")
model_path = 'rec-model-v01/'
model      = CrossValidatorModel.read().load(model_path)

# movieIds already rated by each selected user; used to drop already-seen
# movies from that user's recommendations.
user1             = 10
user2             = 14
# Read Row fields by name, not position: Row(**kwargs) sorts its fields
# alphabetically on Spark < 3.0 (movieId, rating, userId) but keeps insertion
# order on Spark 3.0+ (Python 3.6+), so row[0] / row[1] would select different
# columns depending on the Spark version.
rdd_user1_ratings = ratingsRDD.filter(lambda row: row.userId == user1)
rdd_user2_ratings = ratingsRDD.filter(lambda row: row.userId == user2)
user1_ratings     = rdd_user1_ratings.map(lambda row: row.movieId).collect()
user2_ratings     = rdd_user2_ratings.map(lambda row: row.movieId).collect()

# Top-N recommendations for the selected users, excluding movies they already rated.
top_n_pred   = 15   # recommendations reported per user
total_movies = 100  # candidates requested from the model before removing rated movies
users_all    = ratings.select(als.getUserCol()).distinct()
user1_id     = users_all.filter(col(als.getUserCol()) == user1)
user2_id     = users_all.filter(col(als.getUserCol()) == user2)


def recommendations_for(cv_model, users_df, num_items):
    """Return the best model's top `num_items` recommendations for the user in `users_df`.

    The result is the list of (movieId, rating) Rows Spark returns, best first.
    Returns [] when Spark yields no row for the user (e.g. the id does not occur
    in `ratings`, so `users_df` is empty) instead of raising IndexError.
    """
    rows = cv_model.bestModel.recommendForUserSubset(users_df, num_items).collect()
    return rows[0]["recommendations"] if rows else []


def top_unrated(recommendations, rated, n):
    """Return up to `n` recommended movieIds, best first, skipping any in `rated`.

    `n == 0` yields an empty list.
    """
    rated_set = set(rated)  # O(1) membership; `rated` is a list of movieIds
    unrated = [rec["movieId"] for rec in recommendations if rec["movieId"] not in rated_set]
    return unrated[:n]


movie_recomm_user1 = recommendations_for(model, user1_id, total_movies)
movie_recomm_user2 = recommendations_for(model, user2_id, total_movies)
user1_pred         = top_unrated(movie_recomm_user1, user1_ratings, top_n_pred)
user2_pred         = top_unrated(movie_recomm_user2, user2_ratings, top_n_pred)

print(f'The top {top_n_pred} movie recommendations for user id {user1} is {user1_pred}')
print(f'The top {top_n_pred} movie recommendations for user id {user2} is {user2_pred}')

The top 15 movie recommendations for user id 10 is [92, 12, 19, 71, 81, 91, 34, 46, 93, 32, 95, 82, 64, 65, 87]
The top 15 movie recommendations for user id 14 is [43, 85, 58, 90, 2, 41, 70, 30, 60, 77, 87, 61, 18, 74, 75]


In [11]:
# Unfiltered (movieId, rating) recommendations for user1, best first, before
# already-rated movies are removed.
movie_recomm_user1

[Row(movieId=40, rating=3.125819206237793),
 Row(movieId=49, rating=2.659262180328369),
 Row(movieId=25, rating=2.648585796356201),
 Row(movieId=89, rating=2.539316177368164),
 Row(movieId=92, rating=2.4682955741882324),
 Row(movieId=42, rating=2.3969507217407227),
 Row(movieId=12, rating=2.2252213954925537),
 Row(movieId=2, rating=2.0579333305358887),
 Row(movieId=0, rating=2.0548501014709473),
 Row(movieId=19, rating=2.0487143993377686),
 Row(movieId=71, rating=2.0419516563415527),
 Row(movieId=81, rating=2.0155396461486816),
 Row(movieId=41, rating=1.9518482685089111),
 Row(movieId=91, rating=1.909127116203308),
 Row(movieId=34, rating=1.9004818201065063),
 Row(movieId=55, rating=1.891852617263794),
 Row(movieId=46, rating=1.814581036567688),
 Row(movieId=93, rating=1.7712585926055908),
 Row(movieId=67, rating=1.7593706846237183),
 Row(movieId=32, rating=1.7496992349624634),
 Row(movieId=16, rating=1.7348004579544067),
 Row(movieId=13, rating=1.698664903640747),
 Row(movieId=95, rat