In [None]:
import argparse
import json
try:
    import pyspark.sql.functions as F
    from pyspark.sql import SparkSession, DataFrame as SparkDataFrame
    from pyspark.ml.recommendation import ALS
    from pyspark.mllib.evaluation import RankingMetrics
except ImportError as e:
    print(e)
    pass

class ALSTrainer:
    """Train and evaluate a Spark ML implicit-feedback ALS recommender.

    The wrapped estimator is configured with fixed column names:
    ``user_id_index`` (users), ``track_id_index`` (items) and ``count``
    (the implicit-feedback strength), so every DataFrame passed to
    :meth:`fit`, :meth:`predict` or :meth:`score` must expose those columns.

    Type annotations that refer to pyspark classes are written as strings so
    that the class can still be defined when the guarded pyspark import at
    the top of the file failed.
    """

    def __init__(self, spark, rank: int = 10, maxIter: int = 10, lmbda: float = 1.0, alpha: float = 1.0):
        """Build the (unfitted) ALS estimator.

        Args:
            spark: Session object; stored on the instance but not otherwise
                used by this class.
            rank: Forwarded to ``ALS(rank=...)``.
            maxIter: Forwarded to ``ALS(maxIter=...)``.
            lmbda: Forwarded to ``ALS(regParam=...)``.
            alpha: Forwarded to ``ALS(alpha=...)``.
        """
        self.spark = spark

        self.als = ALS(rank=rank, maxIter=maxIter, regParam=lmbda, alpha=alpha,
                       userCol='user_id_index', itemCol='track_id_index', ratingCol='count',
                       coldStartStrategy='drop', implicitPrefs=True, nonnegative=True)
        # Populated by fit(); predict()/score() refuse to run while it is None.
        self.model = None

    def fit(self, data: 'SparkDataFrame') -> None:
        """Fit the ALS estimator on ``data`` and keep the resulting model."""
        self.model = self.als.fit(data)

    def predict(self, data: 'SparkDataFrame', k: int = 50) -> 'SparkDataFrame':
        """Return the model's top-``k`` recommendations for the users in ``data``.

        Only the ``user_id_index`` column of ``data`` is consulted.

        Raises:
            AssertionError: if :meth:`fit` has not been called yet.
        """
        assert self.model is not None, 'Did you run .fit() first?'

        return self.model.recommendForUserSubset(data.select('user_id_index'), k)

    def score(self, data: 'SparkDataFrame', k: int = 50) -> dict:
        """Compute ranking metrics of the fitted model against ``data``.

        Args:
            data: Interactions to evaluate against.
            k: Number of recommendations requested per user and cut-off used
                for the ``@k`` metrics.

        Returns:
            A dict with the keys ``'map'``, ``'ndcgAt<k>'`` and
            ``'precisionAt<k>'`` holding the values reported by
            ``RankingMetrics``.

        Raises:
            AssertionError: if :meth:`fit` has not been called yet.
        """
        # Ground truth per user: track ids ordered by descending ``count``.
        # Putting ``count`` first in the struct makes sort_array order on it.
        labels = data.select('user_id_index', 'track_id_index', 'count') \
            .groupBy('user_id_index') \
            .agg(F.sort_array(F.collect_list(F.struct('count', 'track_id_index')), asc=False).alias('list')) \
            .withColumn('labels', F.col('list.track_id_index')).drop('list')

        # Recommended track ids per user, in the order returned by the model.
        predictions = self.predict(data, k=k) \
            .select('user_id_index', F.col('recommendations.track_id_index').alias('predictions'))

        # NOTE(review): with a left join, a user for whom the model returns no
        # recommendations gets a null ``predictions`` value — confirm that
        # RankingMetrics tolerates this for the datasets being scored.
        results = labels.join(predictions, on='user_id_index', how='left')

        # NOTE: RankingEvaluator does not work with PySpark 2.4.0, hence the
        # RDD-based RankingMetrics. See
        # https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.RankingEvaluator.html
        metrics = RankingMetrics(results.select('predictions', 'labels').rdd)

        return {
            'map': metrics.meanAveragePrecision,
            f'ndcgAt{k}': metrics.ndcgAt(k),
            f'precisionAt{k}': metrics.precisionAt(k)
        }


if __name__ == "__main__":
    # Command-line entry point: fit ALS on the training file, then report
    # ranking metrics on the train split and, when given, the validation and
    # test splits as a JSON document on stdout.
    #
    # Defaults have been picked from http://yifanhu.net/PUB/cf.pdf.
    parser = argparse.ArgumentParser(description='Train ALS model.')
    # The training file is read unconditionally below, so argparse enforces it
    # up front instead of failing later inside Spark.
    parser.add_argument('--train-file', type=str, dest='train_file', metavar='',
                        required=True, help='Path to train file.')
    parser.add_argument('--val-file', type=str, dest='val_file', metavar='',
                        default=None, help='Path to validation file.')
    parser.add_argument('--test-file', type=str, dest='test_file', metavar='',
                        default=None, help='Path to test file.')
    parser.add_argument('--epochs', type=int, dest='epochs', metavar='',
                        default=10, help='Maximum number of training iterations.')
    parser.add_argument('--rank', type=int, dest='rank', metavar='',
                        default=10, help='Feature size.')
    parser.add_argument('--lmbda', type=float, dest='lmbda', metavar='',
                        default=500.0, help='Regularization parameter.')
    parser.add_argument('--alpha', type=float, dest='alpha', metavar='',
                        default=40.0, help='Confidence weighting parameter.')
    parser.add_argument('--k', type=int, dest='k', metavar='',
                        default=50, help='For ranking evaluation metrics.')
    args = parser.parse_args()
    print(args)

    # ``ALSTrainer.__class__`` is ``type``, so the previous expression named
    # the Spark application 'type'; ``__name__`` yields 'ALSTrainer'.
    spark = SparkSession.builder.appName(ALSTrainer.__name__).getOrCreate()
    trainer = ALSTrainer(spark, rank=args.rank, maxIter=args.epochs, lmbda=args.lmbda, alpha=args.alpha)

    train_data = spark.read.parquet(args.train_file)
    trainer.fit(train_data)

    ## Evaluation.
    results = {}

    def _record_metrics(split, frame):
        """Score ``frame`` at the requested cut-off and store '<split>/<metric>' entries."""
        # --k was previously parsed but never forwarded to score().
        for metric, value in trainer.score(frame, k=args.k).items():
            results[f'{split}/{metric}'] = value

    _record_metrics('train', train_data)

    # Optional splits are only read when a path was supplied.
    for split, path in (('val', args.val_file), ('test', args.test_file)):
        if path:
            _record_metrics(split, spark.read.parquet(path))

    print(json.dumps(results, indent=2))


# LENSKIT

In [1]:
import logging
import pickle

from lenskit import util
from lenskit.algorithms import als

import pandas as pd
import numpy as np
from scipy import stats
import binpickle

from pytest import mark, approx

import lenskit.util.test as lktu
from lenskit.algorithms import Recommender
from lenskit.util import Stopwatch

from tqdm.notebook import tqdm_notebook as tqdm
# Switch on tqdm's pandas integration for the notebook progress-bar class imported above.
tqdm.pandas()

In [2]:
# Send LensKit log messages to the notebook output; the INFO/ERROR lines under later cells come from this.
util.log_to_notebook()

[   INFO] lenskit.util.log notebook logging configured


In [3]:
# Toy interaction data: a four-row pattern of (item, user, rating) tiled 50
# times, giving 200 rows. ``simple_df`` holds the pairs only; ``simple_dfr``
# adds the float ``rating`` column.
_tile_count = 50
_item_pattern = [1, 1, 2, 3]
_user_pattern = [10, 12, 10, 13]
_rating_pattern = [4.0, 3.0, 5.0, 2.0]

simple_df = pd.DataFrame({
    'item': _item_pattern * _tile_count,
    'user': _user_pattern * _tile_count,
})

simple_dfr = simple_df.assign(rating=_rating_pattern * _tile_count)


In [4]:
# Print the column dtypes, non-null counts and memory footprint of the toy ratings frame.
simple_dfr.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 200 entries, 0 to 199
Data columns (total 3 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   item    200 non-null    int64  
 1   user    200 non-null    int64  
 2   rating  200 non-null    float64
dtypes: float64(1), int64(2)
memory usage: 4.8 KB


In [5]:
# Second toy data set with string identifiers: five (item, user, rating)
# rows tiled 50 times, giving 250 rows. Each user id is paired with exactly
# one item id throughout.
_n_copies = 50
_item_ids = [
    'TRIQAUQ128F42435AD',
    'TRIRLYL128F42539D1',
    'TRMHBXZ128F4238406',
    'TRYQMNI128F147C1C7',
    'TRAHZNE128F9341B86',
]
_user_ids = [
    'b80344d063b5ccb3212f76538f3d9e43d87dca9a',
    'b80344d063b5ccb3212f76538f3d9e43d87dca9b',
    'b80344d063b5ccb3212f76538f3d9e43d87dca9c',
    'b80344d063b5ccb3212f76538f3d9e43d87dca9d',
    'b80344d063b5ccb3212f76538f3d9e43d87dca9e',
]
_rating_values = [1, 1, 2, 1, 1]

simple_df_2 = pd.DataFrame({
    'item': _item_ids * _n_copies,
    'user': _user_ids * _n_copies,
})

simple_dfr_2 = simple_df_2.assign(rating=_rating_values * _n_copies)
simple_dfr_2

Unnamed: 0,item,user,rating
0,TRIQAUQ128F42435AD,b80344d063b5ccb3212f76538f3d9e43d87dca9a,1
1,TRIRLYL128F42539D1,b80344d063b5ccb3212f76538f3d9e43d87dca9b,1
2,TRMHBXZ128F4238406,b80344d063b5ccb3212f76538f3d9e43d87dca9c,2
3,TRYQMNI128F147C1C7,b80344d063b5ccb3212f76538f3d9e43d87dca9d,1
4,TRAHZNE128F9341B86,b80344d063b5ccb3212f76538f3d9e43d87dca9e,1
...,...,...,...
245,TRIQAUQ128F42435AD,b80344d063b5ccb3212f76538f3d9e43d87dca9a,1
246,TRIRLYL128F42539D1,b80344d063b5ccb3212f76538f3d9e43d87dca9b,1
247,TRMHBXZ128F4238406,b80344d063b5ccb3212f76538f3d9e43d87dca9c,2
248,TRYQMNI128F147C1C7,b80344d063b5ccb3212f76538f3d9e43d87dca9d,1


In [None]:
# Configure LensKit's ImplicitMF with 20 iterations, the 'cg' method and use_ratings=False.
# NOTE(review): the leading positional 20 is presumably the number of latent features — confirm against the LensKit API.
algo = als.ImplicitMF(20, iterations=20, method='cg', use_ratings=False)

In [None]:
# Fit on every row except the last 50. This is a positional slice of the tiled frame, not a per-user hold-out.
algo.fit(simple_df_2[:-50])

In [None]:
# Ask the fitted model for predictions on the final 50 (item, user) rows, i.e. the rows excluded from the fit slice.
preds = algo.predict(simple_df_2[-50:])

In [None]:
# Display the predictions computed in the previous cell.
preds

In [6]:

from lenskit.batch import MultiEval
from lenskit.crossfold import partition_users, SampleN
from lenskit.algorithms import basic, als
from lenskit.datasets import MovieLens
from lenskit import topn, util
import pandas as pd
import matplotlib.pyplot as plt

In [7]:
# Create the batch-evaluation runner; the later cells read its results from the 'my-eval' directory.
# NOTE(review): binding the name `eval` shadows Python's builtin eval() for the rest of the kernel session;
# a rename would have to be applied consistently to the cells below that call eval.add_datasets/add_algorithms/run.
# NOTE(review): recommend=4 presumably sets the recommendation list length (the nrecs column shown later is 4) — confirm.
eval = MultiEval('my-eval', recommend=4)



In [9]:
# Split the string-id ratings frame by user into train/test pairs (the log below reports 5 partitions for 5 users)
# and register them with the evaluation under the data set name 'song'.
# NOTE(review): no random seed is passed here, so the split may differ between runs — confirm whether that matters.
pairs = list(partition_users(simple_dfr_2, 5, SampleN(5)))
eval.add_datasets(pairs, name='song')

[   INFO] lenskit.crossfold partitioning 250 rows for 5 users into 5 partitions
[   INFO] lenskit.crossfold fold 0: selecting test ratings
[   INFO] lenskit.crossfold fold 0: partitioning training data
[   INFO] lenskit.crossfold fold 1: selecting test ratings
[   INFO] lenskit.crossfold fold 1: partitioning training data
[   INFO] lenskit.crossfold fold 2: selecting test ratings
[   INFO] lenskit.crossfold fold 2: partitioning training data
[   INFO] lenskit.crossfold fold 3: selecting test ratings
[   INFO] lenskit.crossfold fold 3: partitioning training data
[   INFO] lenskit.crossfold fold 4: selecting test ratings
[   INFO] lenskit.crossfold fold 4: partitioning training data


In [10]:
# Register the algorithm to evaluate, recording its `features` attribute in the run table.
# NOTE(review): the algorithm is als.BiasedMF but the run is labelled name='ImplicitMF'; the run log and runs.csv
# therefore show AlgoClass=BiasedMF with name=ImplicitMF. Confirm which of the two was intended.
eval.add_algorithms([als.BiasedMF(5)],
                    attrs=['features'], name='ImplicitMF')

In [11]:
# Execute every registered algorithm on every registered data set partition; progress is logged below.
eval.run()

[   INFO] lenskit.batch._multi starting run 1: als.BiasedMF(features=5, regularization=0.1) on song:1
[   INFO] lenskit.batch._multi adapting als.BiasedMF(features=5, regularization=0.1) into a recommender
[   INFO] lenskit.batch._multi training algorithm als.BiasedMF(features=5, regularization=0.1) on 245 ratings


[  ERROR] lenskit.util.debug could not load LIBBLAS: Could not find module 'libblas' (or one of its dependencies). Try using the full path with constructor syntax.


[   INFO] lenskit.util.debug OpenBLAS error: 'BlasInfo' object has no attribute 'openblas_get_config'
[   INFO] lenskit.util.debug numba threading layer: tbb
[   INFO] lenskit.algorithms.als [ 0ms] fitting bias model
[   INFO] lenskit.algorithms.bias building bias model for 245 ratings
[   INFO] lenskit.algorithms.bias global mean: 1.204
[   INFO] numexpr.utils Note: NumExpr detected 12 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
[   INFO] numexpr.utils NumExpr defaulting to 8 threads.
[   INFO] lenskit.algorithms.bias computed means for 5 items
[   INFO] lenskit.algorithms.bias computed means for 5 users
[   INFO] lenskit.algorithms.als [ 17ms] normalizing ratings
[   INFO] lenskit.algorithms.als [ 614ms] training biased MF model with ALS for 5 features
[   INFO] lenskit.algorithms.als [2.48s] finished epoch 0 (|ΔP|=2.033, |ΔQ|=2.228)
[   INFO] lenskit.algorithms.als [2.48s] finished epoch 1 (|ΔP|=0.715, |ΔQ|=0.125)
[   INFO] lenskit.algorithms.als [2.48s] f

In [12]:
# Load the per-run metadata written by the evaluation and key it by RunId
# so it can be joined onto per-list metric results later on.
runs = pd.read_csv('my-eval/runs.csv').set_index('RunId')
runs.head()

Unnamed: 0_level_0,DataSet,Partition,AlgoClass,AlgoStr,name,features,TrainTime,PredTime,RecTime
RunId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
1,song,1,BiasedMF,"als.BiasedMF(features=5, regularization=0.1)",ImplicitMF,5,3.030417,3.384395,3.335601
2,song,2,BiasedMF,"als.BiasedMF(features=5, regularization=0.1)",ImplicitMF,5,0.02952,3.361743,3.34822
3,song,3,BiasedMF,"als.BiasedMF(features=5, regularization=0.1)",ImplicitMF,5,0.027617,3.346149,3.398654
4,song,4,BiasedMF,"als.BiasedMF(features=5, regularization=0.1)",ImplicitMF,5,0.025799,3.331613,3.387824
5,song,5,BiasedMF,"als.BiasedMF(features=5, regularization=0.1)",ImplicitMF,5,0.026904,3.261756,3.344181


In [13]:
# Load the recommendation lists written by the evaluation (one row per recommended item, tagged with RunId).
# NOTE(review): the displayed rows include RunId 10, while the runs table shown above lists RunId 1-5 —
# the parquet file may contain output from earlier executions; confirm before trusting joined results.
recs = pd.read_parquet('my-eval/recommendations.parquet')
recs.head()

Unnamed: 0,item,score,user,rank,RunId
0,TRMHBXZ128F4238406,1.909276,b80344d063b5ccb3212f76538f3d9e43d87dca9a,1,1
1,TRAHZNE128F9341B86,1.000186,b80344d063b5ccb3212f76538f3d9e43d87dca9a,2,1
2,TRIRLYL128F42539D1,1.000186,b80344d063b5ccb3212f76538f3d9e43d87dca9a,3,1
3,TRYQMNI128F147C1C7,1.000186,b80344d063b5ccb3212f76538f3d9e43d87dca9a,4,1
4,TRAHZNE128F9341B86,1.090167,b80344d063b5ccb3212f76538f3d9e43d87dca9c,1,10


In [14]:
# Stack the test portion of every train/test pair into one ground-truth frame with a fresh 0..n-1 index.
truth = pd.concat((p.test for p in pairs), ignore_index=True)

In [15]:
# Measure nDCG for each recommendation list against the ground truth.
# Per the log below, lists are keyed by (RunId, user) and truth by user.
rla = topn.RecListAnalysis()
rla.add_metric(topn.ndcg)
raw_ndcg = rla.compute(recs, truth)
raw_ndcg.head()

[   INFO] lenskit.topn analyzing 60 recommendations (25 truth rows)
[   INFO] lenskit.topn using rec key columns ['RunId', 'user']
[   INFO] lenskit.topn using truth key columns ['user']
[   INFO] lenskit.topn numbering truth lists


                              rating
LKTruthID item                      
0         TRIQAUQ128F42435AD       1
          TRIQAUQ128F42435AD       1
          TRIQAUQ128F42435AD       1
          TRIQAUQ128F42435AD       1
          TRIQAUQ128F42435AD       1
1         TRYQMNI128F147C1C7       1
          TRYQMNI128F147C1C7       1
          TRYQMNI128F147C1C7       1
          TRYQMNI128F147C1C7       1
          TRYQMNI128F147C1C7       1
2         TRIRLYL128F42539D1       1
          TRIRLYL128F42539D1       1
          TRIRLYL128F42539D1       1
          TRIRLYL128F42539D1       1
          TRIRLYL128F42539D1       1
3         TRAHZNE128F9341B86       1
          TRAHZNE128F9341B86       1
          TRAHZNE128F9341B86       1
          TRAHZNE128F9341B86       1
          TRAHZNE128F9341B86       1
4         TRMHBXZ128F4238406       2
          TRMHBXZ128F4238406       2
          TRMHBXZ128F4238406       2
          TRMHBXZ128F4238406       2
          TRMHBXZ128F4238406       2


[   INFO] lenskit.topn numbering rec lists
[   INFO] lenskit.topn collecting metric results
[   INFO] lenskit.topn measured 15 lists in  11ms


Unnamed: 0_level_0,Unnamed: 1_level_0,nrecs,ndcg
RunId,user,Unnamed: 2_level_1,Unnamed: 3_level_1
1,b80344d063b5ccb3212f76538f3d9e43d87dca9a,4,0.0
10,b80344d063b5ccb3212f76538f3d9e43d87dca9c,4,0.0
11,b80344d063b5ccb3212f76538f3d9e43d87dca9c,4,0.0
12,b80344d063b5ccb3212f76538f3d9e43d87dca9c,4,0.0
13,b80344d063b5ccb3212f76538f3d9e43d87dca9b,4,0.0


In [16]:

# Attach the algorithm class and feature count of each run to the per-list nDCG values.
# NOTE(review): in the output below, rows with RunId 10-13 get NaN for AlgoClass/features, i.e. those
# RunIds are absent from `runs` — confirm whether recs and runs come from the same evaluation execution.
ndcg = raw_ndcg.join(runs[['AlgoClass', 'features']], on='RunId')
ndcg.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,nrecs,ndcg,AlgoClass,features
RunId,user,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
1,b80344d063b5ccb3212f76538f3d9e43d87dca9a,4,0.0,BiasedMF,5.0
10,b80344d063b5ccb3212f76538f3d9e43d87dca9c,4,0.0,,
11,b80344d063b5ccb3212f76538f3d9e43d87dca9c,4,0.0,,
12,b80344d063b5ccb3212f76538f3d9e43d87dca9c,4,0.0,,
13,b80344d063b5ccb3212f76538f3d9e43d87dca9b,4,0.0,,
