## Evaluate the Performance of the Recommender System
### Import All Needed Packages

In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

# Import Packages for Spark MLlib
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from sklearn.metrics import ndcg_score
from pyspark.ml.recommendation import ALS
from pyspark.sql.session import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import explode
from sklearn.metrics import ndcg_score
import pickle
import random
# Reuse an already-running Spark session when this cell is executed again:
# constructing SparkContext('local') a second time in the same kernel raises
# "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

### Read Main Dataset

In [None]:
# Load the raw play-count table. header=None tells pandas the file has no
# header row, so the four columns are named explicitly after loading.
dataset_columns = ['userId', 'artistId', 'artist', 'plays']
df = pd.read_csv("usersha1-artmbid-artname-plays.tsv", sep='\t', header=None)
df.columns = dataset_columns

### Read Dictionary of Selected Users with Dominant Music Preferences

In [None]:
# Load the user -> music-preference mapping and keep its keys as a list.
# NOTE(review): pickle.load can execute code embedded in the file; only open a
# 'selected_user.pkl' that comes from a trusted source.
with open('selected_user.pkl', 'rb') as pkl_file:
    selected_user = pickle.load(pkl_file)
selected_user_list = [*selected_user.keys()]

In [None]:
# Split the selected users into one list per music-preference label
def _users_with_preference(label):
    """Return the users in selected_user_list whose preference equals *label*."""
    return [user for user in selected_user_list if selected_user[user] == label]


ir = _users_with_preference('I&R')
er = _users_with_preference('E&R')
rc = _users_with_preference('R&C')
uc = _users_with_preference('U&C')

In [None]:
# Sample the same number of users from every music-preference group so the
# groups stay balanced. random.sample raises ValueError when asked for more
# items than the population holds, so the target size of 14500 is capped at
# the size of the smallest group.
# NOTE: no seed is set, so every run draws a different sample.
k = min(14500, len(ir), len(er), len(rc), len(uc))
sample_user_ir = random.sample(ir, k)
sample_user_er = random.sample(er, k)
sample_user_rc = random.sample(rc, k)
sample_user_uc = random.sample(uc, k)

In [None]:
# Pool the four per-group samples into a single list of sampled users
sample_user = [*sample_user_ir, *sample_user_er, *sample_user_rc, *sample_user_uc]

In [None]:
# Keep only the play records that belong to the sampled users.
# .copy() makes `data` an independent frame, so the column assignments made
# on it later do not write into a slice of `df` (SettingWithCopyWarning).
data = df[df['userId'].isin(sample_user)].copy()

In [None]:
# Report how much data is left after restricting to the sampled users.
# dropna=False counts a missing id as one distinct value, matching len(unique()).
n_records = len(data)
n_users = data['userId'].nunique(dropna=False)
n_artists = data['artistId'].nunique(dropna=False)
print("The remaining number of records: ", n_records)
print("The remaining number of users: ", n_users)
print("The remaining number of artists: ", n_artists)

In [None]:
# Replace the original userId / artistId values with consecutive integer
# codes, numbered in order of first appearance. These columns are later passed
# to ALS as its user and item ids.
userId_address = data['userId'].unique()
userId_dict = {original: code for code, original in enumerate(userId_address)}

artistId_address = data['artistId'].unique()
artistId_dict = {original: code for code, original in enumerate(artistId_address)}

# Build a new frame rather than assigning through data.loc[:, col]: that form
# writes into what may be a slice of `df` and, on pandas >= 2.0, sets the
# values in place so the columns would keep their original object dtype.
data = data.assign(
    userId=data['userId'].map(userId_dict),
    artistId=data['artistId'].map(artistId_dict),
)
# Drop the column of artist for the modeling
data = data.drop(['artist'], axis=1)

## Apply Spark ALS model on the dataset

In [None]:
# Hold out a test set per user: a random 80% of each user's records go to
# training, and every remaining row forms the test set.
training_df = data.groupby("userId").sample(frac=0.8)
held_out_mask = ~data.index.isin(training_df.index)
test_df = data[held_out_mask]

In [None]:
# Convert the pandas training / test frames into Spark DataFrames
training, test = (
    spark.createDataFrame(frame) for frame in (training_df, test_df)
)

In [None]:
# Fit ALS with its default hyper-parameters, treating the play counts as
# implicit feedback (implicitPrefs=True), then score the held-out records.
als = ALS(userCol="userId", itemCol="artistId", ratingCol="plays",
          implicitPrefs=True)
model = als.fit(training)
predictions = model.transform(test)
# The predictions are collected into pandas by the following cell; calling
# toPandas() here as well would run the same Spark job twice.

In [None]:
# Collect the predictions into pandas and discard every row that contains a NaN
pred_df = predictions.toPandas().dropna()

### Use NDCG to evaluate the performance among different groups of users

In [None]:
# Keep only users with at least 5 scored records, so that every remaining
# user has enough items for a top-5 ranking.
t = pred_df.groupby("userId").count()
drop_userid = list(t[t['plays'] < 5].index)
# ~ is the logical NOT for a boolean mask; unary minus is an arithmetic
# negation and NumPy rejects it outright for boolean arrays.
pred_df = pred_df[~pred_df['userId'].isin(drop_userid)]
user = list(pred_df['userId'].unique())

In [None]:
# Score every user's predicted ranking with NDCG@5.
# Graded relevance: the user's five most-played held-out artists receive
# 1.0, 0.8, 0.6, 0.4, 0.2 (in play-count order); every other artist gets 0.
relevance = [1.0, 0.8, 0.6, 0.4, 0.2]
top_k = len(relevance)
all_individual_ndcg = {}
# Group once up front instead of filtering the whole frame for every user.
records_by_user = pred_df.groupby("userId")
for u in user:
    record = records_by_user.get_group(u)
    actual_rank = record.sort_values("plays", ascending=False)
    # Relevance is assigned by row position, so two rows that share an
    # artistId cannot overwrite each other's score.
    true_relevance = np.zeros(len(actual_rank))
    n_relevant = min(top_k, len(actual_rank))
    true_relevance[:n_relevant] = relevance[:n_relevant]
    predicted_score = actual_rank["prediction"].to_numpy()
    # Pass the user's full item list and let k=top_k do the truncation: the
    # DCG of the five highest predictions is then normalised by the ideal DCG
    # over all of the user's items. Scoring only the five predicted items
    # would normalise by the best ordering of those five alone, which
    # overstates the result whenever the model misses the user's favourites.
    all_individual_ndcg[u] = ndcg_score(
        [true_relevance], [predicted_score], k=top_k
    )

In [None]:
# Invert userId_dict so an integer user code maps back to its original id.
# A comprehension keeps its variables local, so the module-level sample size
# `k` is not overwritten by a loop variable of the same name.
new_userid_dict = {code: original_id for original_id, code in userId_dict.items()}

In [None]:
# Show the distinct music-preference labels present in the mapping
{*selected_user.values()}

In [None]:
# Accumulate the per-user NDCG and the user count for each music-preference group
music_preference = ['I&R', 'E&R', 'R&C', 'U&C']
ndcg_dict = dict.fromkeys(music_preference, 0)
num_dict = dict.fromkeys(music_preference, 0)
for u in user:
    user_string = new_userid_dict[u]
    group = selected_user[user_string]
    ndcg_dict[group] += all_individual_ndcg[u]
    num_dict[group] += 1

# Mean NDCG per group, listed in the order of music_preference.
# The list has its own name: binding it to `ndcg_score` would replace the
# imported scikit-learn function and break any later call to it.
group_ndcg_scores = []
for group in music_preference:
    num_total = num_dict[group]
    # A group without evaluated users has no mean; report NaN instead of
    # dividing by zero.
    ndcg_i = ndcg_dict[group] / num_total if num_total else float('nan')
    group_ndcg_scores.append(ndcg_i)
print(group_ndcg_scores)