In [2]:
from pyspark.sql import functions as F
from pyspark.sql import DataFrameNaFunctions as DFna
from pyspark.sql.functions import udf, col, when
import matplotlib.pyplot as plt
import pyspark as ps
import os, sys, requests, json


spark = ps.sql.SparkSession.builder \
            .master("local[4]") \
            .appName("building recommender") \
            .getOrCreate() # create a spark session
            
sc = spark.sparkContext 

In [3]:
movies = spark.read.csv('movies.csv', header=True, quote='"', sep=",", inferSchema=True)
train = spark.read.csv('train.csv', header=True, quote='"', sep=",", inferSchema=True)
test = spark.read.csv('test.csv', header=True, quote='"', sep=",", inferSchema=True)

In [4]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.sql import Row
import numpy as np
import math

In [5]:
num_iter = 15
reg_param = 0.01
rank = 6

In [6]:
als = ALS(maxIter=num_iter, regParam=reg_param, rank=rank, userCol="userId", itemCol="movieId", ratingCol="rating")

In [7]:
model = als.fit(train)

In [8]:
predictions = model.transform(test)

In [9]:
predictions.show(10)

+------+------+-------+------+----------+----------+
|   _c0|userId|movieId|rating| timestamp|prediction|
+------+------+-------+------+----------+----------+
|139490|  4169|    148|     3| 976588402|  3.093684|
| 52709|  1605|    148|     2| 974930221| 2.2287078|
| 14433|   482|    148|     2| 976219954| 2.0649722|
| 12626|   424|    148|     4|1027003224| 2.6177723|
| 83347|  2507|    148|     4| 974082717| 3.5133855|
|102208|  3151|    463|     5| 968916009| 3.3380716|
|  9540|   319|    463|     2| 976424451| 2.5281706|
| 19753|   660|    463|     3| 975690189| 2.4163146|
| 90157|  2777|    463|     3| 973125221| 2.8812928|
| 22950|   746|    463|     1| 975470754|  2.036701|
+------+------+-------+------+----------+----------+
only showing top 10 rows



In [17]:
preds = predictions.filter(col('prediction') != np.nan)
print(preds.count())

199987


In [18]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [19]:
rmse = evaluator.evaluate(preds)
print(rmse)

0.8721785094950305


In [21]:
evaluator_mae = RegressionEvaluator(metricName="mae", labelCol="rating", predictionCol="prediction")

In [22]:
mae = evaluator_mae.evaluate(preds)
print(mae)

0.6813346081445169


In [23]:
import pandas as pd

In [24]:
pred_df = predictions.toPandas()

In [25]:
movies_df = pd.read_csv('movies.csv', usecols=[0,1])
movies_dict = pd.Series(movies_df['title'].values,index=movies_df['movieId']).to_dict()
users_list = pred_df['userId'].unique().tolist()

In [28]:
pred_df['predicted_movies'] = pred_df.apply(lambda x: (x['movieId'], x['prediction']), axis=1)
pred_rec_df = pred_df[['userId', 'predicted_movies']].groupby('userId')['predicted_movies'].apply(list).reset_index(name='recommendation')
pred_rec_df['recommendation'] = pred_rec_df['recommendation'].apply(lambda x: sorted(x, key=lambda tup: tup[1], reverse=True))
sorted_recs_dict = pd.Series(pred_rec_df['recommendation'].values,index=pred_rec_df['userId']).to_dict()

In [31]:
test_df = pd.read_csv('test.csv')
pred_df_t = test_df[['userId', 'movieId', 'rating']]
pred_df_t['predicted_movies'] = pred_df_t.apply(lambda x: (x['movieId'], x['rating']), axis=1)
pred_rec_df_t = pred_df_t[['userId', 'predicted_movies']].groupby('userId')['predicted_movies'].apply(list).reset_index(name='recommendation')
pred_rec_df_t['recommendation'] = pred_rec_df_t['recommendation'].apply(lambda x: sorted(x, key=lambda tup: tup[1], reverse=True))
sorted_recs_dict_t = pd.Series(pred_rec_df_t['recommendation'].values,index=pred_rec_df_t['userId']).to_dict()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  pred_df_t['predicted_movies'] = pred_df_t.apply(lambda x: (x['movieId'], x['rating']), axis=1)


In [33]:
def n_rec(user, n, p):
    if p==True:
        n_recs = sorted_recs_dict[user][:n]
    else:
        n_recs = sorted_recs_dict_t[user][:n]
    return [movies_dict[int(x[0])] for x in n_recs]

In [34]:
tp = 0
fp = 0
fn = 0
for user in users_list:
    pred_recs = n_rec(user=user,n=10,p=True)
    rat_recs = n_rec(user=user,n=10,p=False)
    tp = tp + len(list(set(pred_recs) & set(rat_recs)))
    fp = fp + len(list(set(pred_recs) - set(rat_recs)))
    fn = fn + len(list(set(rat_recs) - set(pred_recs)))
precision = tp/float(fp+tp)
recall = tp/float(fn+tp)
fscore = (2*precision*recall)/(precision+recall)

print(precision,recall, fscore)

0.6516847726017309 0.6516847726017309 0.6516847726017309
