In [1]:
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import DataFrameNaFunctions as DFna
from pyspark.sql.functions import udf, col, when
import matplotlib.pyplot as plt
import pyspark as ps
import os, sys, requests, json

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.sql import Row
import numpy as np
import math

In [1]:
# Build (or reuse, via getOrCreate) the Spark session used by every cell below.
spark = (
    ps.sql.SparkSession.builder
    .master("local[4]")
    .appName("building recommender")
    .getOrCreate()
)

# SparkContext belonging to the session above.
sc = spark.sparkContext

In [1]:
# Reader options shared by all three CSV files: first row is the header,
# fields are comma-separated and double-quoted, column types are inferred.
csv_options = dict(header=True, quote='"', sep=",", inferSchema=True)

movies = spark.read.csv('movies.csv', **csv_options)
train_data = spark.read.csv('train.csv', **csv_options)
test_data = spark.read.csv('test.csv', **csv_options)

In [6]:
# Configure the ALS estimator on the (userId, movieId, rating) columns.
# NOTE(review): num_iter, reg_param and rank are not defined in any cell shown
# in this notebook -- confirm they are set before this cell runs.
als_model = ALS(
    maxIter=num_iter,
    regParam=reg_param,
    rank=rank,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
)

# Fit the estimator configured just above. The previous code called `.fit` on
# `als`, a name that is not defined in the visible cells, so it either failed
# on a fresh kernel or silently trained a different, stale estimator.
als_model1 = als_model.fit(train_data)

# Score the held-out ratings; this adds a `prediction` column to test_data.
pred = als_model1.transform(test_data)

In [7]:
# Keep only the rows whose prediction compares as different from NaN.
# NOTE(review): this presumably relies on Spark SQL treating NaN as equal to
# itself (unlike NumPy/IEEE comparisons) -- confirm; an explicit isnan check
# would state the intent more directly.
has_prediction = col('prediction') != np.nan
pred_cvals = pred.filter(has_prediction)
print(pred_cvals.count())

199987


In [8]:
# RMSE between the true `rating` and the model's `prediction`, computed on the
# filtered predictions only.
# NOTE(review): the name `eval` shadows Python's builtin eval() for the rest of
# the session; it is kept here because other cells may refer to it.
eval = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)
rmse_val = eval.evaluate(pred_cvals)
print(rmse_val)

0.6813346081445169


In [None]:
def _rank_by_user(frame):
    """Collect each user's (movieId, score) tuples, ordered by score descending.

    Parameters
    ----------
    frame : pandas.DataFrame
        Needs a 'userId' column and a 'predicted_movies' column holding
        (movieId, score) tuples.

    Returns
    -------
    pandas.DataFrame
        One row per user with columns 'userId' and 'recommendation', where
        'recommendation' is that user's list of tuples sorted by score,
        highest first (ties keep their original row order).
    """
    ranked = (
        frame.groupby('userId')['predicted_movies']
        .apply(list)
        .reset_index(name='recommendation')
    )
    ranked['recommendation'] = ranked['recommendation'].apply(
        lambda pairs: sorted(pairs, key=lambda pair: pair[1], reverse=True)
    )
    return ranked


# Pull the model output into pandas. Rows without a prediction are dropped
# first: NaN never compares as smaller or larger than another score, so leaving
# it in the sort keys below would make the per-user ordering unreliable.
# reset_index gives the filtered frame a clean, contiguous index.
df_predictions = pred.toPandas().dropna(subset=['prediction']).reset_index(drop=True)

# movieId -> title lookup, read from the first two columns of movies.csv.
mov_dat = pd.read_csv('movies.csv', usecols=[0,1])
mov_dict = pd.Series(mov_dat['title'].values,index=mov_dat['movieId']).to_dict()
users_list = df_predictions['userId'].unique().tolist()

# userId -> [(movieId, predicted score), ...], best prediction first.
# The tuples are built column-wise with zip rather than a row-wise apply.
df_predictions['predicted_movies'] = list(zip(df_predictions['movieId'], df_predictions['prediction']))
pred_rec_df = _rank_by_user(df_predictions)
sorted_recs_dict = pd.Series(pred_rec_df['recommendation'].values,index=pred_rec_df['userId']).to_dict()

# userId -> [(movieId, actual rating), ...], best rating first.
test_df = pd.read_csv('test.csv')
# .copy() so that adding a column does not write into a slice of test_df.
pred_df_t = test_df[['userId', 'movieId', 'rating']].copy()
pred_df_t['predicted_movies'] = list(zip(pred_df_t['movieId'], pred_df_t['rating']))
pred_rec_df_t = _rank_by_user(pred_df_t)
recs_dict = pd.Series(pred_rec_df_t['recommendation'].values,index=pred_rec_df_t['userId']).to_dict()

In [8]:
def helper(user, n, p):
    """Return the titles of a user's top-n movies.

    Parameters
    ----------
    user : hashable
        Key into the per-user ranking dictionaries.
    n : int
        Number of leading entries to take from the user's ranked list.
    p : bool
        If truthy, rank by the model's predictions (``sorted_recs_dict``);
        otherwise rank by the actual ratings (``recs_dict``).

    Returns
    -------
    list
        Titles looked up in ``mov_dict`` for the first n (movieId, score)
        tuples, in ranked order.

    Raises
    ------
    KeyError
        If ``user`` is missing from the selected dictionary or a movieId is
        missing from ``mov_dict``.
    """
    # Both branches used to read recs_dict, so the "predicted" list was always
    # identical to the "actual" list; predictions live in sorted_recs_dict.
    ranking = sorted_recs_dict if p else recs_dict
    n_recs = ranking[user][:n]
    # movieId may arrive as a float, so normalise it before the title lookup.
    return [mov_dict[int(x[0])] for x in n_recs]

In [9]:
# Top-10 precision / recall / F-score, with the counts pooled over all users:
#   tp: titles in both the predicted and the actual top 10
#   fp: titles only in the predicted top 10
#   fn: titles only in the actual top 10
tp = 0
fp = 0
fn = 0
for user in users_list:
    preds1 = helper(user=user,n=10,p=True)
    rats1 = helper(user=user,n=10,p=False)
    # Build each set once and reuse it for all three counts.
    predicted_titles = set(preds1)
    rated_titles = set(rats1)
    tp += len(predicted_titles & rated_titles)
    fp += len(predicted_titles - rated_titles)
    fn += len(rated_titles - predicted_titles)

# Guard every ratio: with no users or no overlap the denominators are zero,
# which previously raised ZeroDivisionError.
precision = tp/float(fp+tp) if (fp+tp) else 0.0
recall = tp/float(fn+tp) if (fn+tp) else 0.0
fscore = (2*precision*recall)/(precision+recall) if (precision+recall) else 0.0

print(precision,recall, fscore)

0.6516847726017309 0.6516847726017309 0.6516847726017309
