In [1]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS, Rating
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics

import numpy as np
import pandas as pd
from itertools import islice
from collections import defaultdict

In [2]:
def parseline(line):
    fields = line.split(',')
    userId = fields[0]
    movieId = fields[1]
    rating = fields[2]
    return (userId, movieId, rating)

def loadMovieInfo():
    moviesNames = {}
    data_frame = pd.read_csv("res/sample/movies.csv")
    for index, row in data_frame.iterrows():
        moviesNames[int(row["movieId"])] = row["title"]
            
    return moviesNames

movieNames = loadMovieInfo()
    
# pyspark set-up
sc.setCheckpointDir('checkpoint')

# Build rating object for ALS 
print("\nLoading data...")

 
lines = sc.textFile("res/sample/ratings.csv")
parsedlines = lines.map(parseline)
header = parsedlines.first()

#filter out the header, make sure the rest looks correct
parsedlines = parsedlines.filter(lambda line: line != header)

ratings = parsedlines.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])) ).cache()
print(f"Ratings : \n{ratings.take(3)}")

test, train = ratings.randomSplit(weights=[0.2, 0.8], seed=1)
print(f"Ratings chosen: \n{test.take(3)}")

testset = test.map(lambda t: (t[0], t[1]))
print(f"Testset : \n{testset.take(3)}")

# Build the recommendation model using Alternating Least Squares
print("\nTraining recommendation model...")
rank = 5
numIterations = 20
model = ALS.train(train, rank, numIterations)

predictions = model.predictAll(testset).collect()
#print(f"Predictions : \n{predictions}")

embedding = model.userFeatures().collect()
print("\nDone!")


#output recommendations to file
prediction_with_movie_names = []

for userId,movieId,rating in predictions:
    prediction_with_movie_names.append(movieNames[movieId])

pred_df = pd.DataFrame(predictions)
pred_df['title'] = pd.Series(prediction_with_movie_names)
pred_df = pred_df[['user','product', 'title', 'rating']]
pred_df.to_csv("als_predictions.csv", index=False, header = True)


Loading data...
Ratings : 
[Rating(user=1, product=1, rating=4.0), Rating(user=1, product=3, rating=4.0), Rating(user=1, product=6, rating=4.0)]
Ratings chosen: 
[Rating(user=1, product=157, rating=5.0), Rating(user=1, product=216, rating=5.0), Rating(user=1, product=231, rating=5.0)]
Testset : 
[(1, 157), (1, 216), (1, 231)]

Training recommendation model...

Done!


In [3]:
true_rating = pd.DataFrame(test.collect()).sort_values(by=['user','rating'],
                                                       ascending=[True,False]).reset_index()['rating']

pred_df = pred_df.sort_values(by=['user','rating'],
                              ascending=[True,False]).reset_index()

pred_df['true'] = true_rating

pred_df = pred_df[['user','product','rating','true']]

In [4]:
def get_top_n(predictions, n=10):

    ini = defaultdict(list)
    top_n = defaultdict(list)
    
    for uid, iid, est in predictions:
        ini[uid].append((iid, est))
        
    for uid, iid, true in test.collect():
        for i in [x[0] for x in ini[uid]]:
            if iid == i:
                top_n[uid].append(ini[uid][[x[0] for x in ini[uid]].index(iid)]+(true,))
                
    #uid:[(iid,est),(iid,est)]
    # Then sort the predictions for each user and retrieve the k highest ones.
    for uid, user_ratings in top_n.items():
        user_ratings.sort(key=lambda x: x[1], reverse=True)
        top_n[uid] = user_ratings#[:n]
    return top_n

top_n = get_top_n(predictions, n=10)
users_est = defaultdict(list)
users_true=defaultdict(list)

for uid, user_ratings in top_n.items():
    users_est[uid].append([est for (_, est,_) in user_ratings])
    users_true[uid].append([true_r for (_,_,true_r) in user_ratings])

In [5]:
def ndcg(y_true, y_pred, k=None, powered=False):
    def dcg(scores, k=None, powered=False):
        if k is None:
            k = scores.shape[0]
        if not powered:
            ret = scores[0]
            for i in range(1, k):
                ret += scores[i] / np.log2(i + 1)
            return ret
        else:
            ret = 0
            for i in range(k):
                ret += (2 ** scores[i] - 1) / np.log2(i + 2)
            return ret
    
    ideal_sorted_scores = np.sort(y_true)[::-1]
    ideal_dcg_score = dcg(ideal_sorted_scores, k=k, powered=powered)
    
    pred_sorted_ind = np.argsort(y_pred)[::-1]
    pred_sorted_scores = y_true[pred_sorted_ind]
    dcg_score = dcg(pred_sorted_scores, k=k, powered=powered)
    
    return dcg_score / ideal_dcg_score

def ndcg1(y_true, y_pred, k=None):
    return ndcg(y_true, y_pred, k=k, powered=False)

def ndcg2(y_true, y_pred, k=None):
    return ndcg(y_true, y_pred, k=k, powered=True)

In [6]:
ndcg_list=[]
for uid in top_n:
    
    for i in users_true[uid]:
        y_true=np.asarray(i)#.reshape(-1,1)
    
    for i in users_est[uid]:
        y_pred=np.asarray(i)#.reshape(-1,1)
        ndcg_list.append(ndcg1(y_true, y_pred, k=None))
        
ndcg_list = [i for i in ndcg_list if str(i) != 'nan']
print("\nNDCG(10) = %s" % str(sum(ndcg_list)/len(ndcg_list)))


NDCG(10) = 0.9478173654283814


In [8]:
# MAP
threshold = 8
pred_df['relevant'] = np.where(pred_df['true'] > threshold, 1,0)
pred_df['recommended'] = np.where(pred_df['rating'] > threshold, 1,0)
def MAP_k(mydf, k=5):
    
    AP = 0.0
    for i in mydf['user'].unique():
        #print("user is", i)
        user_df = mydf[mydf['user'] == i]
        user_df.sort_values('rating', axis=0, inplace=True, ascending=False)
        top_N_items = user_df['recommended'].values[:k+1]
       
        #print("top items", top_N_items )
        p_list = np.empty((0,k), int)
        for j in range(len(top_N_items)):
            l = user_df['recommended'].values[:j+1]
            val = np.sum(l)/len(l)
            p_list = np.append(p_list, val)
       
        #print("List is",p_list)
        sum_val = sum(p_list * top_N_items)
        if(sum(user_df['relevant'] >0)):
           AP = AP + sum_val/sum(user_df['relevant'])
        
   MAP = AP/mydf['user'].nunique()
   return MAP

MAP_k(pred_df)

IndentationError: unindent does not match any outer indentation level (<tokenize>, line 26)

In [None]:
print("\nCalculating Test RMSE...")
testData = test.map(lambda p: (p.user, p.product))
predictions = model.predictAll(testData).map(lambda r: ((r.user, r.product), r.rating))
ratingsTuple = test.map(lambda r: ((r.user, r.product), r.rating))
scoreAndLabels = predictions.join(ratingsTuple).map(lambda tup: tup[1])

metrics_rating = RegressionMetrics(scoreAndLabels)
print(f"\nTest RMSE = {metrics_rating.rootMeanSquaredError}")
print(f"\nTest MAE = {metrics_rating.meanAbsoluteError}")
