To run the matrix factorization model, we use the PySpark environment, which has a built-in ALS (alternating least squares) implementation. All other methods are user-defined.

Importing the required packages 

In [10]:
import math
import time
from datetime import datetime

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row, SparkSession
from sklearn.metrics import mean_absolute_error, mean_squared_error
%matplotlib inline

# Later cells use a SparkSession named `spark`; the pyspark shell provides one,
# otherwise this creates it (or reuses an existing session)
spark = SparkSession.builder.getOrCreate()

In [11]:
data=pd.read_csv("../data/merged_data.csv")

In [13]:
subset_data_2=data

In [14]:
user_ids = subset_data_2.user_id.unique()
user_dict = dict(zip(user_ids, range(len(user_ids))))
subset_data_2['user_id_int']=subset_data_2.user_id.map(user_dict)

business_ids = subset_data_2.business_id.unique()
business_dict = dict(zip(business_ids, range(len(business_ids))))
subset_data_2['business_id_int']=subset_data_2.business_id.map(business_dict)

In [15]:
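# Hold out each user's most recent review(s) (all reviews on their latest date) as the test set
test_dataset = subset_data_2[subset_data_2.groupby('user_id')['date'].transform('max') == subset_data_2['date']]
print("test dataset created")
# The training set is every remaining row; dropping by index keeps any
# exact-duplicate rows that belong to training
train_dataset = subset_data_2.drop(test_dataset.index)
print("train dataset created")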

test dataset created
train dataset created


In [16]:
# Sparsity = share of all possible user-business pairs that have no rating
n_users = subset_data_2.user_id.nunique()
n_businesses = subset_data_2.business_id.nunique()
n_pairs = n_users * n_businesses
n_ratings = subset_data_2.user_id.count()  # number of rating rows (non-null user_id)
sparsity = (n_pairs - n_ratings) / n_pairs
sparsity

0.9978637414297792
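The quantity computed above can be written as follows, where |R| is the number of observed ratings (rows), |U| the number of distinct users and |I| the number of distinct businesses:

```latex
\text{sparsity} = \frac{|U|\,|I| - |R|}{|U|\,|I|} = 1 - \frac{|R|}{|U|\,|I|}
```

With the value above, only about 0.21% of all possible user-business pairs have a rating.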

In [17]:
#Cross validation setup
def cross_validation_setup(ratings,n1):
    (training,tune) = ratings.randomSplit([n1,1-n1],seed = 42)
    return(training,tune)

The cross_validation_setup method sets up hold-out validation: it randomly splits the ratings DataFrame into a training set (fraction n1) and a tune set (the remaining 1 - n1). We use the training set to train the model and the tune set to tune the hyperparameters. The test set, which holds each user's most recent review(s), is built separately; with the best parameters, we finally evaluate the model on it.
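To make the split concrete, here is a plain-Python sketch of a seeded hold-out split. `holdout_split` is a hypothetical helper, not part of PySpark. Like `randomSplit` with weights `[n1, 1 - n1]`, it assigns each row to one side at random, so the resulting sizes are only approximately 80/20 rather than exact.

```python
import random


def holdout_split(rows, train_fraction, seed=42):
    """Send each row to the training set with probability train_fraction,
    otherwise to the tune set (hypothetical stand-in for randomSplit)."""
    rng = random.Random(seed)  # a fixed seed makes the split reproducible
    training_rows, tune_rows = [], []
    for row in rows:
        if rng.random() < train_fraction:
            training_rows.append(row)
        else:
            tune_rows.append(row)
    return training_rows, tune_rows


# Toy (user, business, rating) rows
toy_rows = [(u, b, 4.0) for u in range(20) for b in range(50)]
toy_train, toy_tune = holdout_split(toy_rows, 0.8)
print(len(toy_train), len(toy_tune))  # roughly 800 and 200
```

Calling it again with the same seed reproduces exactly the same split, which is why the notebook passes `seed = 42` to `randomSplit`.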

In [18]:
# Evaluation metric setup
def accuracy(predictions,metric):
    evaluator = RegressionEvaluator(metricName=metric, labelCol="rating",
                                predictionCol="prediction")
    return(evaluator.evaluate(predictions))

The accuracy method evaluates a model: it takes the predictions DataFrame, which contains the actual (`rating`) and predicted (`prediction`) columns, together with the desired metric (e.g. "rmse" or "mae").
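RegressionEvaluator computes these metrics from the `rating` and `prediction` columns. As a plain-Python sketch of the two formulas (the function names are illustrative, not part of PySpark): RMSE squares the errors before averaging and then takes a square root, so it penalizes large errors more than MAE, which simply averages the absolute errors.

```python
import math


def rmse_sketch(actual, predicted):
    """Root-mean-square error: square root of the mean squared difference."""
    n = len(actual)
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / n)


def mae_sketch(actual, predicted):
    """Mean absolute error: mean of the absolute differences (no square root)."""
    n = len(actual)
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / n


toy_actual = [4.0, 3.0, 5.0, 1.0]
toy_predicted = [3.5, 3.0, 4.0, 3.0]
print(rmse_sketch(toy_actual, toy_predicted))  # sqrt(5.25 / 4), about 1.1456
print(mae_sketch(toy_actual, toy_predicted))   # 3.5 / 4 = 0.875
```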

In [19]:
ratings = spark.createDataFrame(train_dataset[['user_id_int','business_id_int','rating']])
(training,tune) = cross_validation_setup(ratings,0.8)
test=spark.createDataFrame(test_dataset[['user_id_int','business_id_int','rating']])

In [20]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.05, userCol="user_id_int", itemCol="business_id_int", ratingCol="rating",
          coldStartStrategy="drop",rank=10)
model = als.fit(training)

# Evaluate the model by computing the RMSE on the training data
predictions_training = model.transform(training)
rmse = accuracy(predictions_training,"rmse")
print("Root-mean-square error for training data = " + str(rmse))
mae = accuracy(predictions_training,"mae")
print("Mean-Absolute error for training data = " + str(mae))

Root-mean-square error for training data = 0.5789969309878112
Mean-Absolute error for training data = 0.40622480648294007


As a sanity check, we fit PySpark ALS with arbitrary parameters on the training set and evaluate it on that same set. It returns valid RMSE and MAE values, so the pipeline works and we can go ahead and tune the hyperparameters. Because these errors are measured on the data the model was fitted to, they are optimistic.
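To illustrate what ALS does, here is a minimal rank-1 sketch in plain Python (the function name is hypothetical). Each user and business gets a single latent factor and a rating is predicted as their product. With the business factors fixed, each user factor has a closed-form ridge-regression solution, and vice versa; ALS alternates these two steps. This is a conceptual illustration only: Spark's implementation supports higher ranks and, per its documentation, scales `regParam` by each user's or item's number of ratings, which this sketch does not do.

```python
def als_rank1(ratings, n_iters=10, reg=0.01):
    """Rank-1 alternating least squares on (user, item, rating) triples.

    With item factors q fixed, each user factor is
        p[u] = sum_i r_ui * q[i] / (sum_i q[i]**2 + reg)
    and symmetrically for q[i] with p fixed.
    """
    users = sorted({u for u, _, _ in ratings})
    items = sorted({i for _, i, _ in ratings})
    user_f = {u: 1.0 for u in users}  # user factors
    item_f = {i: 1.0 for i in items}  # item factors
    for _ in range(n_iters):
        # Step 1: fix item factors, solve for every user factor
        num = {u: 0.0 for u in users}
        den = {u: reg for u in users}
        for u, i, r in ratings:
            num[u] += r * item_f[i]
            den[u] += item_f[i] ** 2
        user_f = {u: num[u] / den[u] for u in users}
        # Step 2: fix user factors, solve for every item factor
        num = {i: 0.0 for i in items}
        den = {i: reg for i in items}
        for u, i, r in ratings:
            num[i] += r * user_f[u]
            den[i] += user_f[u] ** 2
        item_f = {i: num[i] / den[i] for i in items}
    return user_f, item_f


# A toy rating matrix that is exactly rank 1: r = a[u] * b[i]
a = {"u1": 1.0, "u2": 2.0}
b = {"i1": 1.0, "i2": 2.0, "i3": 2.5}
toy_ratings = [(u, i, a[u] * b[i]) for u in a for i in b]
user_f, item_f = als_rank1(toy_ratings)
for u, i, r in toy_ratings:
    print(u, i, r, round(user_f[u] * item_f[i], 3))
```

On this exactly rank-1 toy matrix the reconstructed ratings come out very close to the originals; the small shrinkage left over comes from the regularization term.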

In [21]:
# Baseline model using Surprise's BaselineOnly (global mean + user bias + business bias)
from surprise import BaselineOnly, Dataset, Reader
# Import Surprise's accuracy module under another name so that it does not
# shadow the accuracy() helper defined earlier
from surprise import accuracy as surprise_accuracy

training_new = training.toPandas()
test_new = test.toPandas()

algo = BaselineOnly()
reader = Reader(rating_scale=(1, 5))
data_train = Dataset.load_from_df(training_new, reader)
trainset = data_train.build_full_trainset()
algo.fit(trainset)

# Surprise's test() expects a list of (user, item, rating) tuples
testset = list(test_new[['user_id_int', 'business_id_int', 'rating']].itertuples(index=False, name=None))
predictions = algo.test(testset)

surprise_accuracy.rmse(predictions)
surprise_accuracy.mae(predictions)

Estimating biases using als...
RMSE: 1.2385
MAE:  1.0021


1.0020554974924238

A standard way to check whether our recommendation algorithm performs well is to compare it against a baseline model. A baseline model does not learn any user-business interactions; it is a simple, "lazy" way of assigning ratings to the missing user-business pairs. The first baseline we consider is Surprise's BaselineOnly, which predicts the global mean rating plus a user bias and a business bias, with the biases estimated by alternating least squares (hence the "Estimating biases using als..." message). The code above computes this baseline's RMSE and MAE on the test set.
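BaselineOnly predicts r_hat(u, i) = mu + b_u + b_i. The sketch below estimates the biases by alternating between regularized business-bias and user-bias updates, which is the idea behind the "als" option; it is a simplified illustration with a hypothetical function name, not Surprise's code. The defaults `reg_u=15`, `reg_i=10`, `n_epochs=10` follow Surprise's documented defaults for the ALS baseline as we understand them; treat them as assumptions.

```python
def fit_baseline_sketch(ratings, n_epochs=10, reg_u=15.0, reg_i=10.0):
    """Fit r_hat(u, i) = mu + b_u + b_i with alternating regularized bias updates."""
    mu = sum(r for _, _, r in ratings) / len(ratings)  # global mean rating
    by_item = {}  # item -> [(user, rating), ...]
    by_user = {}  # user -> [(item, rating), ...]
    for u, i, r in ratings:
        by_item.setdefault(i, []).append((u, r))
        by_user.setdefault(u, []).append((i, r))
    b_u = {u: 0.0 for u in by_user}
    b_i = {i: 0.0 for i in by_item}
    for _ in range(n_epochs):
        # Item biases given the current user biases (shrunk towards 0 by reg_i)
        for i, pairs in by_item.items():
            b_i[i] = sum(r - mu - b_u[u] for u, r in pairs) / (reg_i + len(pairs))
        # User biases given the current item biases (shrunk towards 0 by reg_u)
        for u, pairs in by_user.items():
            b_u[u] = sum(r - mu - b_i[i] for i, r in pairs) / (reg_u + len(pairs))

    def predict(u, i):
        # A user or item never seen in training contributes a zero bias
        return mu + b_u.get(u, 0.0) + b_i.get(i, 0.0)

    return predict


toy = [("a", "x", 5.0), ("a", "y", 5.0), ("b", "x", 1.0), ("b", "y", 1.0)]
predict = fit_baseline_sketch(toy)
print(predict("a", "x"), predict("b", "x"), predict("new_user", "x"))
```

The regularization shrinks the biases of users and businesses with few ratings towards zero, so an unknown user simply gets the global mean plus the business bias.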

In [22]:
training_df=training.toPandas()
test_df=test.toPandas()
temp_df=pd.DataFrame(training_df.groupby('user_id_int')['rating'].mean()).reset_index()
temp_df=temp_df.rename(columns={"rating": "avg_rating_user"})
temp_df_2=pd.DataFrame(training_df.groupby('business_id_int')['rating'].mean()).reset_index()
temp_df_2=temp_df_2.rename(columns={"rating": "avg_rating_business"})
test_df=test_df.merge(temp_df,how='inner',on='user_id_int')
test_df=test_df.merge(temp_df_2,how='inner',on='business_id_int')
test_df['prediction']=(test_df['avg_rating_user']+test_df['avg_rating_business'])/2
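baseline_avg_rmse=math.sqrt(mean_squared_error(test_df['rating'],test_df['prediction']))
# MAE is the mean of the absolute errors; unlike RMSE it takes no square root
baseline_avg_mae=mean_absolute_error(test_df['rating'],test_df['prediction'])
print('Baseline RMSE:',baseline_avg_rmse)
print('Baseline MAE:',baseline_avg_mae)

Baseline RMSE: 1.2503016998650578
Baseline MAE: 1.0070540501592762
Note: the MAE printed above was recorded with math.sqrt() applied to mean_absolute_error, so it is the square root of the MAE; the MAE itself is about 1.0142.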


We also built a second baseline model, computed in the cell above, which assigns a missing user-business pair the average of two quantities:
1) the average rating that the user has given
2) the average rating that the business has received
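The same averaging rule can be sketched in plain Python on (user, business, rating) triples; the function name is hypothetical. Test pairs whose user or business never appears in the training data are skipped, which matches the effect of the inner merges in the cell above.

```python
from collections import defaultdict


def average_baseline_sketch(train, test):
    """Return (actual, predicted) pairs, predicting (user mean + business mean) / 2."""
    user_ratings = defaultdict(list)
    business_ratings = defaultdict(list)
    for u, b, r in train:
        user_ratings[u].append(r)
        business_ratings[b].append(r)
    pairs = []
    for u, b, r in test:
        # Skip pairs with an unseen user or business (no average available)
        if u in user_ratings and b in business_ratings:
            user_avg = sum(user_ratings[u]) / len(user_ratings[u])
            business_avg = sum(business_ratings[b]) / len(business_ratings[b])
            pairs.append((r, (user_avg + business_avg) / 2))
    return pairs


toy_train_rows = [("a", "x", 4.0), ("a", "y", 2.0), ("b", "x", 5.0)]
toy_test_rows = [("a", "x", 3.0), ("c", "x", 4.0), ("b", "y", 1.0)]
print(average_baseline_sketch(toy_train_rows, toy_test_rows))  # [(3.0, 3.75), (1.0, 3.5)]
```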

In [23]:
# Parameter tuning: grid search over rank and regularization
def tune_ALS(train_data, validation_data, maxIter, regParams, ranks):
    start_time = time.time()
    min_error = float('inf')
    best_rank = -1
    best_regularization = regParams[0]
    rmse_list, mae_list, rank_list, reg_list = [], [], [], []
    rmse_evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                         predictionCol="prediction")
    mae_evaluator = RegressionEvaluator(metricName="mae", labelCol="rating",
                                        predictionCol="prediction")
    for rank in ranks:
        for reg in regParams:
            # Build and train an ALS model for this (rank, reg) combination
            als = ALS(userCol="user_id_int", itemCol="business_id_int", ratingCol="rating",
                      coldStartStrategy="drop", maxIter=maxIter, rank=rank, regParam=reg)
            model = als.fit(train_data)
            # Evaluate the model by computing the RMSE and MAE on the validation data
            predictions = model.transform(validation_data)
            rmse = rmse_evaluator.evaluate(predictions)
            mae = mae_evaluator.evaluate(predictions)
            rmse_list.append(rmse)
            mae_list.append(mae)
            rank_list.append(rank)
            reg_list.append(reg)
            # Track the combination with the lowest validation RMSE
            if rmse < min_error:
                min_error = rmse
                best_rank = rank
                best_regularization = reg
    print('\nThe best model has {} latent factors and '
          'regularization = {}'.format(best_rank, best_regularization))
    return (rmse_list, mae_list, rank_list, reg_list, time.time() - start_time)

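The tune_ALS method performs a grid search over the ALS hyperparameters. It fits one model per combination of rank (number of latent factors) and regularization on the training data, evaluates each model on the tune data, and returns the RMSE, MAE, rank and regularization of every run together with the elapsed time. It also prints the combination with the lowest RMSE.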
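The search pattern itself is independent of Spark. In this sketch, `grid_search_sketch` and the `evaluate` callback are hypothetical: `evaluate(rank, reg)` stands in for "fit ALS and return the validation RMSE", and a toy error surface replaces the real model. Like tune_ALS, it loops over ranks in the outer loop and keeps the first combination with the lowest error.

```python
from itertools import product


def grid_search_sketch(ranks, reg_params, evaluate):
    """Evaluate every (rank, reg) pair; return all results and the best one."""
    results = [(rank, reg, evaluate(rank, reg)) for rank, reg in product(ranks, reg_params)]
    best = min(results, key=lambda result: result[2])  # first minimum wins on ties
    return results, best


# Toy error surface with its minimum at rank=30, reg=0.05
def toy_error(rank, reg):
    return (rank - 30) ** 2 / 1000 + abs(reg - 0.05)


toy_results, best = grid_search_sketch([10, 20, 30, 40, 50, 60, 70], [0.01, 0.05, 0.1], toy_error)
print(best)  # (30, 0.05, 0.0)
```

The number of model fits grows as the product of the grid sizes (here 7 x 3 = 21), which is why tune_ALS also reports the elapsed time.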

In [24]:
param_tune=tune_ALS(training,tune,5,[0.01,0.05,0.1],[10,20,30,40,50,60,70])


The best model has 10 latent factors and regularization = 0.01


In [25]:
# Index of the run with the lowest tune-set RMSE
best_idx = param_tune[0].index(min(param_tune[0]))
best_rank = param_tune[2][best_idx]
best_reg = param_tune[3][best_idx]

After trying the range of values for regularization and latent factors, we plot the model performance (RMSE/MAE) against these hyperparameters. Since there are two hyperparameters, we fix the regularization parameter and plot the metric for different numbers of latent factors.
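To produce such a plot, the runs for one fixed regularization value must be pulled out of the parallel lists that tune_ALS returns. A minimal sketch with a hypothetical helper, using toy numbers:

```python
def metric_by_rank(ranks, regs, errors, fixed_reg):
    """Return (ranks, errors) for the runs that used regularization fixed_reg, sorted by rank."""
    pairs = sorted((rank, err) for rank, reg, err in zip(ranks, regs, errors) if reg == fixed_reg)
    return [rank for rank, _ in pairs], [err for _, err in pairs]


toy_ranks = [10, 10, 20, 20]
toy_regs = [0.01, 0.1, 0.01, 0.1]
toy_rmse = [1.3, 1.2, 1.4, 1.25]
print(metric_by_rank(toy_ranks, toy_regs, toy_rmse, 0.1))  # ([10, 20], [1.2, 1.25])
```

With the notebook's tuning output, `x, y = metric_by_rank(param_tune[2], param_tune[3], param_tune[0], 0.1)` followed by `plt.plot(x, y, marker="o")` would plot RMSE against rank for regParam = 0.1 (use `param_tune[1]` for MAE).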

In [26]:
# Evaluation metric setup (re-defined so that `accuracy` refers to this helper,
# not to any module imported under the same name)
def accuracy(predictions,metric):
    evaluator = RegressionEvaluator(metricName=metric, labelCol="rating",
                                predictionCol="prediction")
    return(evaluator.evaluate(predictions))

# Build the final recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.1, userCol="user_id_int", itemCol="business_id_int", ratingCol="rating",
          coldStartStrategy="drop",rank=70)
model = als.fit(training)

# Evaluate the model by computing the RMSE and MAE on the test data
predictions_test = model.transform(test)
predictions_test_pandas=predictions_test.toPandas()
# Cap predictions above the maximum rating of 5 at 5
predictions_test_pandas['prediction']=np.where(predictions_test_pandas['prediction']>5,5,predictions_test_pandas['prediction'])
predictions_test=spark.createDataFrame(predictions_test_pandas)
rmse = accuracy(predictions_test,"rmse")
print("Root-mean-square error for test data = " + str(rmse))
mae = accuracy(predictions_test,"mae")
print("Mean-Absolute error for test data = " + str(mae))

Root-mean-square error for test data = 1.6460970389877598
Mean-Absolute error for test data = 1.3834931867984956


After the grid search, we train the final model on the training set with the chosen hyperparameters (rank = 70 and regParam = 0.1 in the cell above) and report its performance on the test set.


Once we have the model, we use it to recommend businesses to users. For each user in the given DataFrame, recommendForUserSubset returns the top 100 businesses ranked by predicted rating; it does not exclude businesses the user has already rated. The top_k_recommendation function then removes those already-rated businesses and keeps the top 10 of the remainder. We do this separately for the users in the training set and in the test set.
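The filtering step reduces to a simple rule: walk down the candidate list in order of predicted rating and keep the first k businesses the user has not rated. A plain-Python sketch with a hypothetical helper, assuming the candidates are already sorted by descending predicted rating:

```python
def filter_seen(candidates, seen, k=10):
    """Keep the first k (item, score) candidates whose item the user has not rated.

    Assumes candidates are sorted by descending predicted score."""
    return [(item, score) for item, score in candidates if item not in seen][:k]


toy_candidates = [(5, 4.9), (3, 4.8), (7, 4.5), (1, 4.1)]
print(filter_seen(toy_candidates, seen={3}, k=2))  # [(5, 4.9), (7, 4.5)]
```

Because only 100 candidates are requested, a user who has already rated more than 90 of them ends up with fewer than 10 recommendations.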

In [27]:
userRecs_training = model.recommendForUserSubset(training,100)
recommend_movies_1_training=userRecs_training.toPandas()
userRecs_test = model.recommendForUserSubset(test,100)
recommend_movies_1_test=userRecs_test.toPandas()

In [28]:
# Give the top-k recommendations for each user, skipping businesses the user has already rated
def top_k_recommendation(recommend_movies_1, subset_data_2, k=10):
    # Set of businesses each user has rated, computed once instead of per candidate
    rated = subset_data_2.groupby('user_id_int')['business_id_int'].apply(set).to_dict()
    list1 = []  # recommended business ids, one list per user
    list2 = []  # matching predicted ratings
    for user, recs in zip(recommend_movies_1['user_id_int'], recommend_movies_1['recommendations']):
        seen = rated.get(user, set())
        # Candidates arrive sorted by predicted rating, so keep the first k unseen ones
        unseen = [rec for rec in recs if rec[0] not in seen][:k]
        list1.append([rec[0] for rec in unseen])
        list2.append([rec[1] for rec in unseen])
    return (list1, list2)
recommend_training=top_k_recommendation(recommend_movies_1_training,subset_data_2)
recommend_test=top_k_recommendation(recommend_movies_1_test,subset_data_2)

In [29]:
# .copy() avoids pandas' SettingWithCopyWarning when adding columns to a slice
recommend_movies_2_train=recommend_movies_1_training[['user_id_int']].copy()
recommend_movies_2_train['recommended_movies']=recommend_training[0]
recommend_movies_2_train['predicted_ratings']=recommend_training[1]

recommend_movies_2_test=recommend_movies_1_test[['user_id_int']].copy()
recommend_movies_2_test['recommended_movies']=recommend_test[0]
recommend_movies_2_test['predicted_ratings']=recommend_test[1]

The user_coverage method computes the user coverage of our model: the proportion of all users for whom more than k of the recommended businesses are good recommendations. We define a good recommendation as one with a predicted rating above a threshold (3.5 below) and set k = 5.
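A toy worked example of the same definition in plain Python (hypothetical function name). As in the cell below, a user counts as covered only with strictly more than k ratings above the threshold, and the denominator is the total number of users, including users who received no recommendations.

```python
def user_coverage_sketch(predicted_ratings_per_user, n_users, k=5, threshold=3.5):
    """Share of users with more than k recommended items predicted above threshold."""
    covered = sum(
        1
        for ratings in predicted_ratings_per_user
        if sum(1 for r in ratings if r > threshold) > k
    )
    return covered / n_users


toy_lists = [
    [4.5, 4.2, 4.0, 3.9, 3.8, 3.7],  # 6 ratings above 3.5: covered
    [4.5, 4.2, 4.0, 3.9, 3.8, 3.0],  # only 5 above 3.5: not covered (needs more than 5)
    [3.2, 3.1],                      # none above 3.5
]
print(user_coverage_sketch(toy_lists, n_users=4))  # 1 / 4 = 0.25
```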

In [30]:
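# User coverage
def user_coverage(data, k, threshold):
    # Count users whose list has more than k predicted ratings above the threshold
    n_covered = 0
    for ratings in data['predicted_ratings']:
        if (np.array(ratings) > threshold).sum() > k:
            n_covered += 1
    # Normalize by the total number of users in the full dataset
    return n_covered / subset_data_2['user_id_int'].nunique()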
coverage_1_train=user_coverage(recommend_movies_2_train,5,3.5)
coverage_1_test=user_coverage(recommend_movies_2_test,5,3.5)
print('User coverage on training set:',coverage_1_train)
print('User coverage on test set:',coverage_1_test)

User coverage on training set: 0.852929292929293
User coverage on test set: 0.852929292929293


The catalogue_coverage method computes the catalogue coverage of our model: the percentage of all businesses that appear in the top-k recommendations of at least one user.
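A toy worked example of catalogue coverage in plain Python (hypothetical function name): a business counts once no matter how many users it is recommended to, so heavy overlap between users' lists keeps the coverage low.

```python
def catalogue_coverage_sketch(recommended_lists, catalog):
    """Percentage of catalog items that appear in at least one user's list."""
    recommended = {item for items in recommended_lists for item in items}
    return round(100.0 * len(recommended) / len(catalog), 2)


toy_recs = [[1, 2], [2, 3], [3, 1]]
toy_catalog = list(range(1, 9))  # 8 businesses
print(catalogue_coverage_sketch(toy_recs, toy_catalog))  # 3 of 8 -> 37.5
```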

In [31]:
#Catalogue Coverage
def catalogue_coverage(predicted, catalog):
    predicted_flattened = [p for sublist in predicted for p in sublist]
    unique_predictions = len(set(predicted_flattened))
    prediction_coverage = round(unique_predictions/(len(catalog)* 1.0)*100,2)
    return prediction_coverage
coverage_2_train=catalogue_coverage(list(recommend_movies_2_train['recommended_movies']),list(subset_data_2['business_id_int'].unique()))
coverage_2_test=catalogue_coverage(list(recommend_movies_2_test['recommended_movies']),list(subset_data_2['business_id_int'].unique()))
print('Catalogue coverage on training set:',coverage_2_train)
print('Catalogue coverage on test set:',coverage_2_test)

Catalogue coverage on training set: 53.12
Catalogue coverage on test set: 53.12


In [40]:
user_counts=subset_data_2['user_id_int'].value_counts()
less_prolific_users = user_counts.loc[user_counts <= 5].index.tolist()
test_data_less_prolific=test_dataset[(test_dataset.user_id_int.isin(less_prolific_users))]

In [43]:
test_less_prolific=spark.createDataFrame(test_data_less_prolific[['user_id_int','business_id_int','rating']])
predictions_test = model.transform(test_less_prolific)
predictions_test_pandas=predictions_test.toPandas()
predictions_test_pandas['prediction']=np.where(predictions_test_pandas['prediction']>5,5,predictions_test_pandas['prediction'])
predictions_test=spark.createDataFrame(predictions_test_pandas)
rmse = accuracy(predictions_test,"rmse")
print("Root-mean-square error for less prolific users = " + str(rmse))
mae = accuracy(predictions_test,"mae")
print("Mean-Absolute error for less prolific users = " + str(mae))

Root-mean-square error for less prolific users = 1.8902777956386978
Mean-Absolute error for less prolific users = 1.6142431840676013


In [56]:
business_counts=subset_data_2['business_id_int'].value_counts()
less_popular_business = business_counts.loc[business_counts <= 100].index.tolist()
test_data_less_popular=test_dataset[(test_dataset.business_id_int.isin(less_popular_business))]

In [57]:
test_less_popular=spark.createDataFrame(test_data_less_popular[['user_id_int','business_id_int','rating']])
predictions_test = model.transform(test_less_popular)
predictions_test_pandas=predictions_test.toPandas()
predictions_test_pandas['prediction']=np.where(predictions_test_pandas['prediction']>5,5,predictions_test_pandas['prediction'])
predictions_test=spark.createDataFrame(predictions_test_pandas)
rmse = accuracy(predictions_test,"rmse")
print("Root-mean-square error for less popular business = " + str(rmse))
mae = accuracy(predictions_test,"mae")
print("Mean-Absolute error for less popular business = " + str(mae))

Root-mean-square error for less popular business = 1.6700578022970134
Mean-Absolute error for less popular business = 1.3805810743454285
