<i>Copyright (c) Microsoft Corporation. All rights reserved.</i>

<i>Licensed under the MIT License.</i>

# Collaborative Filtering Recommendation Algorithm Comparison

This illustrative comparison applies to collaborative filtering algorithms available in this repository such as Spark ALS, Surprise SVD and SAR. These algorithms are usable in a variety of recommendation tasks, including product or news recommendations. 

The main purpose of this notebook is not to produce comprehensive benchmarking results on multiple datasets. Rather, it is intended to illustrate on how one could evaluate different recommender algorithms using tools in this repository.

## Experimentation setup:
* Objective
  * To compare how each collaborative filtering algorithm perform in predicting ratings and recommending relevant items.
* Environment
  * The comparison is run on a [Azure Data Science Virtual Machine](https://azure.microsoft.com/en-us/services/virtual-machines/data-science-virtual-machines/). 
  * The virtual machine size is Standard NC6s_v2 (6 vcpus, 112 GB memory).
  * It should be noted that the single node DSVM is not supposed to run scalable benchmarking analysis. Either scaling up or out the computing instances is necessary to run the benchmarking in an run-time efficient way without any memory issue.
* Datasets
  * [Movielens 100K](https://grouplens.org/datasets/movielens/100k/).
  * [Movielens 1M](https://grouplens.org/datasets/movielens/1m/).
* Data split
  * The data is split into train and test sets.
  * The split ratios are 75-25 for train and test datasets.
  * The splitting is random. 
* Model training
  * A recommendation model is trained by using each of the collaborative filtering algorithms. 
  * Empirical parameter values reported [here](http://mymedialite.net/examples/datasets.html) are used in this notebook.  More exhaustive hyper parameter tuning would be required to further optimize results.
* Evaluation metrics
  * Ranking metrics:
    * Precision@k.
    * Recall@k.
    * Normalized discounted cumulative gain@k (NDCG@k).
    * Mean-average-precision (MAP). 
    * In the evaluation metrics above, k = 10. 
  * Rating metrics:
    * Root mean squared error (RMSE).
    * Mean average error (MAE).
    * R squared.
    * Explained variance.
  * Run time performance
    * Elapsed for training a model and using a model for predicting/recommending k items. 
    * The time may vary across different machines. 

## 0 Global settings

In [20]:
import sys
sys.path.append("../../")
import os
import json
import shutil
import tempfile
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
import numpy as np
import seaborn as sns
import papermill as pm
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType
import torch
import fastai
from fastai.collab import EmbeddingDotBias, collab_learner, CollabDataBunch, load_learner
import tensorflow as tf
import surprise

from reco_utils.common.python_utils import get_number_processors
from reco_utils.common.timer import Timer
from reco_utils.common.gpu_utils import get_cuda_version, get_cudnn_version
from reco_utils.common.spark_utils import start_or_get_spark
from reco_utils.dataset import movielens
from reco_utils.dataset.sparse import AffinityMatrix
from reco_utils.dataset.python_splitters import python_chrono_split
from reco_utils.recommender.sar.sar_singlenode import SARSingleNode
from reco_utils.recommender.ncf.ncf_singlenode import NCF
from reco_utils.recommender.ncf.dataset import Dataset as NCFDataset
from reco_utils.recommender.rbm.rbm import RBM
from reco_utils.recommender.surprise.surprise_utils import surprise_trainset_to_df
from reco_utils.recommender.fastai.fastai_utils import hide_fastai_progress_bar, cartesian_product, score
from reco_utils.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from reco_utils.evaluation.python_evaluation import map_at_k, ndcg_at_k, precision_at_k, recall_at_k
from reco_utils.evaluation.python_evaluation import rmse, mae, rsquared, exp_var

print("System version: {}".format(sys.version))
print("Pandas version: {}".format(pd.__version__))
print("PySpark version: {}".format(pyspark.__version__))
print("Surprise version: {}".format(surprise.__version__))
print("PyTorch version: {}".format(torch.__version__))
print("Fast AI version: {}".format(fastai.__version__))
print("Tensorflow version: {}".format(tf.__version__))
print("CUDA version: {}".format(get_cuda_version()))
print("CuDNN version: {}".format(get_cudnn_version()))
n_cores = get_number_processors()
print("Number of cores: {}".format(n_cores))

%load_ext autoreload
%autoreload 2

System version: 3.6.8 |Anaconda, Inc.| (default, Dec 30 2018, 01:22:34) 
[GCC 7.3.0]
Pandas version: 0.24.1
PySpark version: 2.3.1
Surprise version: 1.0.6
PyTorch version: 1.0.0
Fast AI version: 1.0.45
Tensorflow version: 1.12.0
CUDA version: CUDA Version 9.1.85
CuDNN version: 7.0.5
Number of cores: 24
The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [2]:
%env PYSPARK_PYTHON=/home/miguel/anaconda/envs/reco_full/bin/python
%env PYSPARK_DRIVER_PYTHON=/home/miguel/anaconda/envs/reco_full/bin/python

#%env PYSPARK_PYTHON=/anaconda/envs/reco_full/bin/python
#%env PYSPARK_DRIVER_PYTHON=/anaconda/envs/reco_full/bin/python

env: PYSPARK_PYTHON=/home/miguel/anaconda/envs/reco_full/bin/python
env: PYSPARK_DRIVER_PYTHON=/home/miguel/anaconda/envs/reco_full/bin/python


In [17]:
# top k items to recommend
TOP_K = 10

# Model parameters
EPOCHS_CPU = 30
EPOCHS_PYSPARK = 1#15
EPOCHS_GPU = 5
USER_COL = "UserId"
ITEM_COL = "MovieId"
RATING_COL = "Rating"
TIMESTAMP_COL = "Timestamp"
PREDICTION_COL = "prediction"
SEED = 77

In [4]:
# Hide fastai progress bar
hide_fastai_progress_bar()

In [5]:
# fix random seeds to make sure out runs are reproducible
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)

In [6]:
environments = {
    "als": "pyspark",
    "sar_single_node": "python_cpu",
    "svd": "python_cpu",
    "fastai": "python_gpu",
    "ncf": "python_gpu",
    "rbm": "python_gpu"
}

metrics = {
    "als": ["rating", "ranking"],
    "sar_single_node": ["ranking"],
    "svd": ["rating", "ranking"],
    "fastai": ["rating", "ranking"],
    "ncf": ["ranking"],
    "rbm": ["ranking"]
}

In [7]:
als_params = {
    "rank": 10,
    "maxIter": EPOCHS_PYSPARK,
    "implicitPrefs": False,
    "alpha": 0.1,
    "regParam": 0.05,
    "coldStartStrategy": "drop",
    "nonnegative": False,
    "userCol": USER_COL,
    "itemCol": ITEM_COL,
    "ratingCol": RATING_COL,
}

sar_single_node_params = {
    "remove_seen": True,
    "similarity_type": "jaccard",
    "time_decay_coefficient": 30,
    "time_now": None,
    "timedecay_formula": True,
    "col_user": USER_COL,
    "col_item": ITEM_COL,
    "col_rating": RATING_COL,
    "col_timestamp": TIMESTAMP_COL,
}

svd_params = {
    "n_factors": 200,
    "n_epochs": EPOCHS_CPU,
    "lr_all": 0.005,
    "reg_all": 0.02,
    "random_state": SEED,
    "verbose": False
}

fastai_params = {
    "n_factors": 40, 
    "y_range": [0,5.5], 
    "wd": 1e-1,
    "max_lr": 5e-3,
    "epochs": EPOCHS_GPU
}

ncf_params = {
    "model_type": "NeuMF",
    "n_factors": 4,
    "layer_sizes": [16,8,4],
    "n_epochs": EPOCHS_GPU,
    "batch_size": 1024,
    "learning_rate": 1e-3,
    "verbose": 10
}

rbm_params = {
    "hidden_units": 600, 
    "training_epoch": EPOCHS_GPU,
    "minibatch_size": 60, 
    "keep_prob": 0.9,
    "with_metrics": False
}

params = {
    "als": als_params,
    "sar_single_node": sar_single_node_params,
    "svd": svd_params,
    "fastai": fastai_params,
    "ncf": ncf_params,
    "rbm": rbm_params
}

In [8]:
def prepare_training_als(train):
    schema = StructType(
    (
        StructField(USER_COL, IntegerType()),
        StructField(ITEM_COL, IntegerType()),
        StructField(RATING_COL, FloatType()),
        StructField(TIMESTAMP_COL, LongType()),
    )
    )
    spark = start_or_get_spark()
    return spark.createDataFrame(train, schema)

def prepare_training_svd(train):
    reader = surprise.Reader('ml-100k', rating_scale=(1, 5))
    return surprise.Dataset.load_from_df(train.drop(TIMESTAMP_COL, axis=1), reader=reader).build_full_trainset()

def prepare_training_fastai(train):
    data = train.copy()
    data[USER_COL] = data[USER_COL].astype('str')
    data[ITEM_COL] = data[ITEM_COL].astype('str')
    data = CollabDataBunch.from_df(data, user_name=USER_COL, item_name=ITEM_COL, rating_name=RATING_COL)
    return data

def prepare_training_ncf(train):
    data = NCFDataset(train=train, 
                      col_user=USER_COL,
                      col_item=ITEM_COL,
                      col_rating=RATING_COL,
                      col_timestamp=TIMESTAMP_COL,
                      seed=SEED)
    return data

def prepare_training_rbm(train):
    header = {
        "col_user": USER_COL,
        "col_item": ITEM_COL,
        "col_rating": RATING_COL,
    }
    train_copy = train.copy()
    train_copy.loc[:, RATING_COL] = train_copy[RATING_COL].astype(np.int32)
    aff_train = AffinityMatrix(train_copy, **header)
    return aff_train 

prepare_training_data = {
    "als": prepare_training_als,
    "svd": prepare_training_svd,
    "fastai": prepare_training_fastai,
    "ncf": prepare_training_ncf,
    "rbm": prepare_training_rbm
} 

In [9]:
def prepare_rating_als(test):
    schema = StructType(
    (
        StructField(USER_COL, IntegerType()),
        StructField(ITEM_COL, IntegerType()),
        StructField(RATING_COL, FloatType()),
        StructField(TIMESTAMP_COL, LongType()),
    )
    )
    spark = start_or_get_spark()
    return spark.createDataFrame(test, schema)

def prepare_rating_rbm(test):
    header = {
        "col_user": USER_COL,
        "col_item": ITEM_COL,
        "col_rating": RATING_COL,
    }
    test_copy = test.copy()
    test_copy.loc[:, RATING_COL] = test_copy[RATING_COL].astype(np.int32)
    aff_test = AffinityMatrix(test_copy, **header)
    return  aff_test


prepare_rating_data = {
    "als": prepare_rating_als,
    "rbm": prepare_rating_rbm,
} 

In [10]:



prepare_ranking_data = {
    "als": lambda train, test: prepare_test_als(df_train, df_test),
    "svd": lambda train, test: prepare_test_svd(df_train, df_test),
    "fastai": lambda train, test: prepare_test_fastai(df_train, df_test),
    "ncf": lambda train, test: prepare_test_ncf(df_train, df_test),
    "rbm": lambda train, test: prepare_test_rbm(df_train, df_test)
} 

In [11]:
def train_als(params, data):
    symbol = ALS(**params)
    with Timer() as t:
        model = symbol.fit(data)
    return model, t

def train_svd(params, data):
    model = surprise.SVD(**params)
    with Timer() as t:
        model.fit(data)
    return model, t

def train_fastai(params, data):
    model = collab_learner(data, 
                           n_factors=params["n_factors"],
                           y_range=params["y_range"],
                           wd=params["wd"]
                          )
    with Timer() as t:
        model.fit_one_cycle(cyc_len=params["epochs"], max_lr=params["max_lr"])
    return model, t

def train_sar_single_node(params, data):
    model = SARSingleNode(**params)
    model.set_index(data)    
    with Timer() as t:
        model.fit(data)
    return model, t
    
def train_ncf(params, data):
    model = NCF(n_users=data.n_users, n_items=data.n_items, **params)
    with Timer() as t:
        model.fit(data)
    return model, t
    
def train_rbm(params, data):
    model = RBM(**params)
    train = data.gen_affinity_matrix()
    with Timer() as t:
        model.fit(train, None)
    return model, t
    
trainer = {
    "als": lambda params, data: train_als(params, data),
    "svd": lambda params, data: train_svd(params, data),
    "sar_single_node": lambda params, data: train_sar_single_node(params, data), 
    "fastai": lambda params, data: train_fastai(params, data),
    "ncf": lambda params, data: train_ncf(params, data),
    "rbm": lambda params, data: train_rbm(params, data) 
}

Regression predictions

In [12]:
def predict_als(model, test):
    with Timer() as t:
        preds = model.transform(test)
    return preds, t


def predict_svd(model, test):
    with Timer() as t:
        preds = [model.predict(row[USER_COL], row[ITEM_COL], row[RATING_COL])
                       for (_, row) in test.iterrows()]
        preds = pd.DataFrame(preds)
        preds = preds.rename(index=str, columns={'uid': USER_COL, 
                                                 'iid': ITEM_COL,
                                                 'est': PREDICTION_COL})
        preds = preds.drop(['details', 'r_ui'], axis='columns')
    return preds, t

    
def predict_fastai(model, test):
    with Timer() as t:
        preds = score(model, 
                      test_df=test, 
                      user_col=USER_COL, 
                      item_col=ITEM_COL, 
                      prediction_col=PREDICTION_COL)
    return preds, t


rating_predictor = {
    "als": lambda model, test: predict_als(model, test),
    "svd": lambda model, test: predict_svd(model, test),
    "fastai": lambda model, test: predict_fastai(model, test),
}

Ranking predictions

In [25]:
def recommend_k_als(model, test, train):
    with Timer() as t:
        # Get the cross join of all user-item pairs and score them.
        users = train.select(USER_COL).distinct()
        items = train.select(ITEM_COL).distinct()
        user_item = users.crossJoin(items)
        dfs_pred = model.transform(user_item)

        # Remove seen items.
        dfs_pred_exclude_train = dfs_pred.alias("pred").join(
            train.alias("train"),
            (dfs_pred[USER_COL] == train[USER_COL]) & (dfs_pred[ITEM_COL] == train[ITEM_COL]),
            how='outer'
        )
        top_k_scores = dfs_pred_exclude_train.filter(dfs_pred_exclude_train["train." + RATING_COL].isNull()) \
            .select('pred.' + USER_COL, 'pred.' + ITEM_COL, 'pred.' + PREDICTION_COL)

        # In Spark, transformations are lazy evaluation
        # Use an action to force execute and measure the test time 
        #top_k_scores.cache().count()
    return top_k_scores, t
    

def recommend_k_sar_single_node(model, test, train):
    with Timer() as t:
        top_k_scores = model.recommend_k_items(test)
    return top_k_scores, t


def recommend_k_svd(model, test, train):
    with Timer() as t:
        preds_lst = []
        for user in train[USER_COL].unique():
            for item in train[ITEM_COL].unique():
                preds_lst.append([user, item, model.predict(user, item).est])
        top_k_scores = pd.DataFrame(data=preds_lst, columns=[USER_COL, ITEM_COL, PREDICTION_COL])
        merged = pd.merge(train, top_k_scores, on=[USER_COL, ITEM_COL], how="outer")
        top_k_scores = merged[merged[RATING_COL].isnull()].drop(RATING_COL, axis=1)
    return top_k_scores, t


def recommend_k_fastai(model, test, train):
    with Timer() as t: 
#         total_users, total_items = learner.data.classes.values()
#         total_items = np.array(total_items[1:])
#         total_users = np.array(total_users[1:])
#         test_users = test_df[USER].unique()
#         test_users = np.intersect1d(test_users, total_users)
#         users_items = cartesian_product(np.array(test_users),np.array(total_items))
#         users_items = pd.DataFrame(users_items, columns=[USER,ITEM])
#         training_removed = pd.concat([users_items, train_valid_df[[USER,ITEM]], train_valid_df[[USER,ITEM]]]).drop_duplicates(keep=False)
        # model.export('movielens_model.pkl')
        # model = load_learner(path=".", fname='movielens_model.pkl')
        total_users, total_items = model.data.classes.values()
        total_items = np.array(total_items[1:])
        total_users = np.array(total_users[1:])
        test_users = test[USER_COL].unique()
        test_users = np.intersect1d(test_users, total_users)
        users_items = cartesian_product(np.array(test_users), np.array(total_items))
        users_items = pd.DataFrame(users_items, columns=[USER_COL, ITEM_COL])
        training_removed = pd.concat([users_items, train[[USER_COL, ITEM_COL]]]).drop_duplicates(keep=False)
        top_k_scores = score(model, 
                             test_df=training_removed,
                             user_col=USER_COL, 
                             item_col=ITEM_COL, 
                             prediction_col=PREDICTION_COL, 
                             top_k=TOP_K)
    return top_k_scores, t


def recommend_k_ncf(model, test, train):
    with Timer() as t: 
        users, items, preds = [], [], []
        item = list(train[ITEM_COL].unique())
        for user in train[USER_COL].unique():
            user = [user] * len(item) 
            users.extend(user)
            items.extend(item)
            preds.extend(list(model.predict(user, item, is_list=True)))
        top_k_scores = pd.DataFrame(data={USER_COL: users, ITEM_COL:items, PREDICTION_COL:preds})
        merged = pd.merge(train, top_k_scores, on=[USER_COL, ITEM_COL], how="outer")
        top_k_scores = merged[merged[RATING_COL].isnull()].drop(RATING_COL, axis=1)
    return top_k_scores, t


def recommend_k_rbm(model, test, train):
    xtst = test.gen_affinity_matrix()
    with Timer() as t:
        top_k_scores, _ =  model.recommend_k_items(xtst)
        top_k_scores = test.map_back_sparse(top_k_scores, kind="prediction")
    return top_k_scores, t


ranking_predictor = {
    "als": lambda model, test, train: recommend_k_als(model, test, train),
    "sar_single_node": lambda model, test, train: recommend_k_sar_single_node(model, test, train),
    "svd": lambda model, test, train: recommend_k_svd(model, test, train),
    "fastai": lambda model, test, train: recommend_k_fastai(model, test, train),
    "ncf": lambda model, test, train: recommend_k_ncf(model, test, train),
    "rbm": lambda model, test, train: recommend_k_rbm(model, test, train),
}

Metrics

In [14]:
def rating_metrics_pyspark(test, predictions):
    rating_eval = SparkRatingEvaluation(test, 
                                        predictions, 
                                        col_user=USER_COL, 
                                        col_item=ITEM_COL, 
                                        col_rating=RATING_COL, 
                                        col_prediction=PREDICTION_COL)
    return {
        "RMSE": rating_eval.rmse(),
        "MAE": rating_eval.mae(),
        "R2": rating_eval.exp_var(),
        "Explained Variance": rating_eval.rsquared()
    }
    
    
def ranking_metrics_pyspark(test, predictions, k=10):
    rank_eval = SparkRankingEvaluation(test, 
                                       predictions, 
                                       k=k, 
                                       col_user=USER_COL, 
                                       col_item=ITEM_COL, 
                                       col_rating=RATING_COL, 
                                       col_prediction=PREDICTION_COL, 
                                       relevancy_method="top_k")
    return {
        "MAP": rank_eval.map_at_k(),
        "nDCG@k": rank_eval.ndcg_at_k(),
        "Precision@k": rank_eval.precision_at_k(),
        "Recall@k": rank_eval.recall_at_k()
    }
    
    
def rating_metrics_python(test, predictions):
    cols = {
        "col_user": USER_COL, 
        "col_item": ITEM_COL, 
        "col_rating": RATING_COL, 
        "col_prediction": PREDICTION_COL
    }
    return {
        "RMSE": rmse(test, predictions, **cols),
        "MAE": mae(test, predictions, **cols),
        "R2": rsquared(test, predictions, **cols),
        "Explained Variance": exp_var(test, predictions, **cols)
    }
    
    
def ranking_metrics_python(test, predictions, k=10):
    cols = {
        "col_user": USER_COL, 
        "col_item": ITEM_COL, 
        "col_rating": RATING_COL, 
        "col_prediction": PREDICTION_COL
    }
    return {
        "MAP": map_at_k(test, predictions, k=k, **cols),
        "nDCG@k": ndcg_at_k(test, predictions, k=k, **cols),
        "Precision@k": precision_at_k(test, predictions, k=k, **cols),
        "Recall@k": recall_at_k(test, predictions, k=k, **cols)
    }
    
    
rating_evaluator = {
    "als": lambda test, predictions: rating_metrics_pyspark(test, predictions),
    "svd": lambda test, predictions: rating_metrics_python(test, predictions),
    "fastai": lambda test, predictions: rating_metrics_python(test, predictions)
}
    
    
ranking_evaluator = {
    "als": lambda test, predictions, k: ranking_metrics_pyspark(test, predictions, k),
    "sar_single_node": lambda test, predictions, k: ranking_metrics_python(test, predictions, k),
    "svd": lambda test, predictions, k: ranking_metrics_python(test, predictions, k),
    "fastai": lambda test, predictions, k: ranking_metrics_python(test, predictions, k),
    "ncf": lambda test, predictions, k: ranking_metrics_python(test, predictions, k),
    "rbm": lambda test, predictions, k: ranking_metrics_python(test, predictions, k),
}

In [15]:
data_sizes = ["100k"]#, "1m"] # Movielens data size: 100k, 1m, 10m, or 20m
#algorithms = ["als", "svd", "sar_single_node", "fastai", "ncf", "rbm"]
algorithms = ["fastai"]

In [26]:
%%time

# For each data size and each algorithm, a recommender is evaluated. 
df_results = pd.DataFrame()

for data_size in data_sizes:
    # Load the dataset
    df = movielens.load_pandas_df(
        size=data_size,
        header=[USER_COL, ITEM_COL, RATING_COL, TIMESTAMP_COL]
    )
    print("Size of Movielens {}: {}".format(data_size, df.shape))
    
    # Split the dataset
    df_train, df_test = python_chrono_split(df, 
                                  ratio=0.75, 
                                  min_rating=1, 
                                  filter_by="user", 
                                  col_user=USER_COL, 
                                  col_item=ITEM_COL, 
                                  col_timestamp=TIMESTAMP_COL)
    print("Train set size: {}".format(df_train.shape))
    print("Test set size: {}".format(df_test.shape))
   
    # Loop through the algos
    for algo in algorithms:
        print("\nComputing {} algorithm on Movielens {}".format(algo, data_size))
          
        # Data prep for training set
        train = prepare_training_data.get(algo, lambda x:x)(df_train)
        
        # Get model parameters
        model_params = params[algo]
          
        # Train the model
        model, time_train = trainer[algo](model_params, train)
        print("Training time: {}".format(time_train))
                
        # Predict and evaluate
        print("\nEvaluating with {}".format(algo))
        test = prepare_rating_data.get(algo, lambda x:x)(df_test)
        if "rating" in metrics[algo]:   
            # Predict for rating
            preds, time_rating = rating_predictor[algo](model, test)
            print("Rating prediction time: {}".format(time_rating))
                        
            # Evaluate for rating
            ratings = rating_evaluator[algo](test, preds)
            print("Rating metrics: \n{}".format(json.dumps(ratings, indent=4, sort_keys=True)))
        
        if "ranking" in metrics[algo]:
            # Predict for ranking
            top_k_scores, time_ranking = ranking_predictor[algo](model, test, df_train)
            print("Ranking prediction time: {}".format(time_ranking))
            
            # Evaluate for rating
            rankings = ranking_evaluator[algo](test, top_k_scores, TOP_K)
            print("Ranking metrics: \n{}".format(json.dumps(rankings, indent=4, sort_keys=True)))


Size of Movielens 100k: (100000, 4)
Train set size: (74992, 4)
Test set size: (25008, 4)

Computing fastai algorithm on Movielens 100k
Training time: 0:00:05.582270

Evaluating with fastai
Rating prediction time: 0:00:00.060397
Rating metrics: 
{
    "Explained Variance": 0.0,
    "MAE": 1.110945494698929,
    "R2": -0.25841659733971145,
    "RMSE": 1.3181982048941618
}
Ranking prediction time: 0:00:00.887559
Ranking metrics: 
{
    "MAP": 213457.64543650797,
    "Precision@k": 25008.0,
    "Recall@k": 10.0,
    "nDCG@k": 25008.00000005573
}
CPU times: user 3min 38s, sys: 5min 3s, total: 8min 42s
Wall time: 16min 14s


In [None]:
for data_size in data_sizes:
    # Load the dataset
    df = movielens.load_pandas_df(
        size=data_size,
        header=[USER_COL, ITEM_COL, RATING_COL, TIMESTAMP_COL]
    )
    print("Size of Movielens {}: {}".format(data_size, df.shape))
    
    # Split the dataset
    df_train, df_test = python_chrono_split(df, 
                                  ratio=0.75, 
                                  min_rating=1, 
                                  filter_by="user", 
                                  col_user=USER_COL, 
                                  col_item=ITEM_COL, 
                                  col_timestamp=TIMESTAMP_COL)
    print("Train set size: {}".format(df_train.shape))
    print("Test set size: {}".format(df_test.shape))

In [None]:
    # Loop through the algos
    for algo in algorithms:
        print("\nComputing {} algorithm on Movielens {}".format(algo, data_size))
          
        # Get data
        #train = prepare_training_data.get(algo, lambda x:x)(df_train)
        header = {
        "col_user": USER_COL,
        "col_item": ITEM_COL,
        "col_rating": RATING_COL,
        }
        train_copy = pd.concat([df_train, df_test])
        train_copy.loc[:, RATING_COL] = train_copy[RATING_COL].astype(np.int32)
        aff_train = AffinityMatrix(train_copy, **header)
        aff_train._gen_index()
        
        # Get model parameters
        model_params = params[algo]
          
        # Train the model
        df_train_int = df_train.copy()
        df_train_int.loc[:, RATING_COL] = df_train_int[RATING_COL].astype(np.int32)
        train = AffinityMatrix(df_train_int, map_users=aff_train.map_users, map_items=aff_train.map_items, **header)
        
        model = RBM(**model_params)
        model.fit(train.gen_affinity_matrix())
        #model, time_train = trainer[algo](model_params, train)
        print("Training time: {}".format(time_train))
        
        
        

In [None]:
df_test_int = df_test.copy()
df_test_int.loc[:, RATING_COL] = df_test_int[RATING_COL].astype(np.int32)
test = AffinityMatrix(df_test_int, map_users=aff_train.map_users, map_items=aff_train.map_items, **header)
top_k_scores, _ =  model.recommend_k_items(test.gen_affinity_matrix())
top_k_scores = test.map_back_sparse(top_k_scores, kind="prediction")

In [None]:
        train._gen_index()
        print(train.df_.head())
        print(train.Nusers)#943
        print(train.Nitems)#1598
        ratings = train.df_[train.col_rating]  # ratings
        itm_id = train.df_["hashedItems"]  # itm_id serving as columns
        usr_id = train.df_["hashedUsers"]  # usr_id serving as rows
        print(len(ratings)) #74992
        print(len(itm_id)) #74992
        print(len(usr_id)) #74992
        print(list(ratings[:5]))
        print(list(itm_id[:5]))
        print(list(usr_id[:5]))
        print(itm_id.max()) #1680
        print(usr_id.max()) #942

In [None]:
from scipy.sparse import coo_matrix

AM = coo_matrix(
            #(ratings, (usr_id, itm_id)), shape=(train.Nusers, train.Nitems)
            (ratings, (usr_id, itm_id)), shape=(usr_id.max()+1, itm_id.max()+1)

        ).toarray()

In [None]:
AM.shape

In [None]:
        if "ranking" in metrics[algo]:
            # Predict for ranking
            top_k_scores, time_ranking = ranking_predictor[algo](model, test, df_train)
            print("Ranking prediction time: {}".format(time_ranking))
            
            # Evaluate for rating
            rankings = ranking_evaluator[algo](test, top_k_scores, TOP_K)
            print("Ranking metrics: \n{}".format(json.dumps(rankings, indent=4, sort_keys=True)))

## 1 Run notebooks to generate results

# For each data size and each algorithm, a recommender is evaluated. 
df_results = pd.DataFrame()

for data_size in data_sizes:
    for algorithm in algorithms:
        print(algorithm, data_size)
        # Execute the notebook
        pm.execute_notebook(
            notebooks[algorithm],
            output_path,
            parameters = dict(TOP_K=k, MOVIELENS_DATA_SIZE=data_size)
        )
        
        # Read records from the notebook.
        nb = pm.read_notebook(output_path)
        
        # Arrange results and save them into dataframe.
        df_eval = nb.dataframe.transpose()
        df_eval = df_eval.rename(columns=df_eval.iloc[0]).drop(['name', 'type', 'filename'])
        df_eval.columns = [x.lower() for x in list(df_eval.columns)]
        
        if algorithm in ["als", "svd", "fast"]:
            df_result = pd.DataFrame(
                {
                    "Data": data_size,
                    "Algo": algorithm,
                    "K": k,
                    "MAP": df_eval['map'].item(),
                    "nDCG@k": df_eval['ndcg'].item(),
                    "Precision@k": df_eval['precision'].item(),
                    "Recall@k": df_eval['recall'].item(),
                    "RMSE": df_eval['rmse'].item(),
                    "MAE": df_eval['mae'].item(),
                    "R2": df_eval['rsquared'].item(),
                    "Explained Variance": df_eval['exp_var'].item(),
                    "Train time": df_eval['train_time'].item(),
                    "Test time": df_eval['test_time'].item()
                }, 
                index=[0]
            )
        # NOTE SAR algorithm does not predict rating scores so the rating metrics do not apply. 
        # Therefore, for SAR, the rating metrics are assigned with NAN.
        elif algorithm in ["sar"]:
            df_result = pd.DataFrame(
                {
                    "Data": data_size,
                    "Algo": algorithm,
                    "K": k,
                    "MAP": df_eval['map'].item(),
                    "nDCG@k": df_eval['ndcg'].item(),
                    "Precision@k": df_eval['precision'].item(),
                    "Recall@k": df_eval['recall'].item(),
                    "RMSE": np.nan,
                    "MAE": np.nan,
                    "R2": np.nan,
                    "Explained Variance": np.nan,
                    "Train time": df_eval['train_time'].item(),
                    "Test time": df_eval['test_time'].item()
                }, 
                index=[0]
            )
        else:
            raise ValueError("{} is not a recognized algorithm".format(algorithm))
        df_results = df_results.append(df_result, ignore_index=True)
        
df_results