We will show the main RePlay functionality and compare performance of RePlay models on well-known MovieLens dataset. For simplicity we consider here only the variou bandit algorithms with context. The list of considered strategies for comparison:

1) Linear UCB

2) Linear Thompson Sampling

3) Logistic Thompson Sampling

### Dataset

We will compare RePlay models on MovieLens 1m.

### Dataset preprocessing:

Ratings greater than or equal to 3 are considered as positive interactions.

### Data split

Dataset is split by date so that 20% of the last interactions as are placed in the test part. Cold items and users are dropped.

### Predict:
We will predict top-10 most relevant films for each user.

### Metrics

Quality metrics used: ndcg@k, hitrate@k, map@k, mrr@k for k = 1, 5, 10 Additional metrics used: coverage@k and surprisal@k.


In [1]:
# ! pip install rs-datasets

In [2]:
%load_ext autoreload
%autoreload 2

In [3]:
%config Completer.use_jedi = False

In [4]:
import warnings
from optuna.exceptions import ExperimentalWarning
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=ExperimentalWarning)

In [5]:
import logging
import time
import pandas as pd
import numpy as np

from pyspark.sql import functions as sf
from pyspark.sql import types as st
# from pyspark.sql.types import IntegerType

from replay.data import Dataset, FeatureHint, FeatureInfo, FeatureSchema, FeatureType
from replay.data.dataset_utils import DatasetLabelEncoder
from replay.metrics import Coverage, HitRate, MRR, MAP, NDCG, Surprisal, Experiment, OfflineMetrics
from replay.experimental.preprocessing.data_preparator import Indexer, DataPreparator

# from replay.experimental.preprocessing.data_preparator import DataPreparator, Indexer
from replay.models import (
    UCB,
    Wilson, 
    Word2VecRec,
    RandomRec,
    LinUCB, #added LinUCB (disjoint version)
)

from replay.models.base_rec import HybridRecommender
from replay.utils.session_handler import State
from replay.splitters import TimeSplitter
from replay.utils.spark_utils import convert2spark, get_log_info
from rs_datasets import MovieLens

In [6]:
spark = State().session
spark

24/08/23 16:23:50 WARN Utils: Your hostname, sudakovcom-MS-7D48 resolves to a loopback address: 127.0.1.1; using 10.255.173.26 instead (on interface enp3s0)
24/08/23 16:23:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/23 16:23:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/08/23 16:23:51 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
24/08/23 16:23:51 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [7]:
spark.sparkContext.setLogLevel('ERROR')

In [8]:
logger = logging.getLogger("replay")

In [9]:
K = 10
K_list_metrics = [1, 5, 10]
BUDGET = 20
BUDGET_NN = 10
SEED = 12345

## Preprocessing 

### Data loading

In [10]:
data = MovieLens("1m")
data.info()

ratings


Unnamed: 0,user_id,item_id,rating,timestamp
0,1,1193,5,978300760
1,1,661,3,978302109
2,1,914,3,978301968



users


Unnamed: 0,user_id,gender,age,occupation,zip_code
0,1,F,1,10,48067
1,2,M,56,16,70072
2,3,M,25,15,55117



items


Unnamed: 0,item_id,title,genres
0,1,Toy Story (1995),Animation|Children's|Comedy
1,2,Jumanji (1995),Adventure|Children's|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance





In [11]:
%%time

preparator = DataPreparator()
interactions = preparator.transform(columns_mapping={'user_id': 'user_id',
                                      'item_id': 'item_id',
                                      'relevance': 'rating',
                                      'timestamp': 'timestamp'
                                     }, 
                           data=data.ratings)

item_features = preparator.transform(columns_mapping={'item_id': 'item_id'}, 
                           data=data.items)

23-Aug-24 16:23:53, replay, INFO: Columns with ids of users or items are present in mapping. The dataframe will be treated as an interactions log.
  arrow_data = [[(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]
23-Aug-24 16:23:55, replay, INFO: Column with ids of users or items is absent in mapping. The dataframe will be treated as a users'/items' features dataframe.


CPU times: user 20.5 ms, sys: 14.5 ms, total: 34.9 ms
Wall time: 2.66 s


In [12]:
indexer = Indexer(user_col='user_id', item_col='item_id')

In [13]:
%%time
indexer.fit(users=interactions.select('user_id'),
           items=interactions.select('item_id').union(item_features.select('item_id')))

CPU times: user 14 ms, sys: 3.02 ms, total: 17 ms
Wall time: 1.1 s


In [14]:
%%time
interactions = indexer.transform(df=interactions)
interactions.show(2)

+--------+--------+---------+-------------------+
|user_idx|item_idx|relevance|          timestamp|
+--------+--------+---------+-------------------+
|    4131|      43|      5.0|2001-01-01 01:12:40|
|    4131|     585|      3.0|2001-01-01 01:35:09|
+--------+--------+---------+-------------------+
only showing top 2 rows

CPU times: user 21.8 ms, sys: 690 µs, total: 22.5 ms
Wall time: 940 ms


In [15]:
# will consider ratings >= 3 as positive feedback. A positive feedback is treated with rating = 1
only_positives_interactions = interactions.filter(sf.col('rating') >= 3).withColumn('rating', sf.lit(1))
only_positives_interactions.count()

836478

In [16]:
only_positives_interactions.show(2)

+--------+--------+---------+-------------------+------+
|user_idx|item_idx|relevance|          timestamp|rating|
+--------+--------+---------+-------------------+------+
|    4131|      43|      5.0|2001-01-01 01:12:40|     1|
|    4131|     585|      3.0|2001-01-01 01:35:09|     1|
+--------+--------+---------+-------------------+------+
only showing top 2 rows



### Data split

In [17]:
# train/test split 
train_spl = TimeSplitter(
    time_threshold=0.2,
    drop_cold_items=True,
    drop_cold_users=True,
    item_column = 'item_idx',
    query_column="user_idx",
)
train, test = train_spl.split(only_positives_interactions)
print('train info:\n', get_log_info(train, user_col="user_idx", item_col="item_idx"))
print('test info:\n', get_log_info(test, user_col="user_idx", item_col="item_idx"))

train info:
 total lines: 669181, total users: 5397, total items: 3569


                                                                                

test info:
 total lines: 86542, total users: 1139, total items: 3279


                                                                                

In [18]:
train.is_cached

False

In [19]:
# train/test split for hyperparameters selection
opt_train, opt_val = train_spl.split(train)
opt_train.count(), opt_val.count()

(535343, 24241)

In [20]:
opt_train.is_cached

False

In [21]:
# negative feedback will be used for Wilson and UCB models
only_negatives_log = interactions.filter(sf.col('rating') < 3).withColumn('rating', sf.lit(0.))
test_start = test.agg(sf.min('timestamp')).collect()[0][0]

# train with both positive and negative feedback
pos_neg_train=(train
              .withColumn('rating', sf.lit(1.))
              .union(only_negatives_log.filter(sf.col('timestamp') < test_start))
             )
pos_neg_train.cache()
pos_neg_train.count()

                                                                                

798993

In [22]:
pos_neg_train.is_cached

True

### Item features 

In [23]:
%%time
item_features = indexer.transform(df=item_features)
item_features.show(2)

+--------+----------------+--------------------+
|item_idx|           title|              genres|
+--------+----------------+--------------------+
|      29|Toy Story (1995)|Animation|Childre...|
|     393|  Jumanji (1995)|Adventure|Childre...|
+--------+----------------+--------------------+
only showing top 2 rows

CPU times: user 6.99 ms, sys: 1.95 ms, total: 8.94 ms
Wall time: 164 ms


In [24]:
item_features.select(sf.min('item_idx')).show()
item_features.select(sf.max('item_idx')).show()
#just to check that the indexing is dense between 0 and 3882
item_features.count()

+-------------+
|min(item_idx)|
+-------------+
|            0|
+-------------+

+-------------+
|max(item_idx)|
+-------------+
|         3882|
+-------------+



3883

In [25]:
year = item_features.withColumn('year', sf.substring(sf.col('title'), -5, 4).astype(st.IntegerType())).select('item_idx', 'year')
year.show(2)

+--------+----+
|item_idx|year|
+--------+----+
|      29|1995|
|     393|1995|
+--------+----+
only showing top 2 rows



In [26]:
genres = (
    item_features.select(
        sf.col("item_idx"),
        sf.split("genres", "\|").alias("genres")
    )
)

In [27]:
genres.show()

+--------+--------------------+
|item_idx|              genres|
+--------+--------------------+
|      29|[Animation, Child...|
|     393|[Adventure, Child...|
|     648|   [Comedy, Romance]|
|    1574|     [Comedy, Drama]|
|    1066|            [Comedy]|
|     233|[Action, Crime, T...|
|     689|   [Comedy, Romance]|
|    2293|[Adventure, Child...|
|    1998|            [Action]|
|     259|[Action, Adventur...|
|     194|[Comedy, Drama, R...|
|    1635|    [Comedy, Horror]|
|    2020|[Animation, Child...|
|    1670|             [Drama]|
|    1706|[Action, Adventur...|
|     413|   [Drama, Thriller]|
|     292|    [Drama, Romance]|
|    1649|          [Thriller]|
|     815|            [Comedy]|
|    1637|            [Action]|
+--------+--------------------+
only showing top 20 rows



In [28]:
genres_list = (
    genres.select(sf.explode("genres").alias("genre"))
    .distinct().filter('genre <> "(no genres listed)"')
    .toPandas()["genre"].tolist()
)

In [29]:
genres_list

['Mystery',
 'Action',
 'Documentary',
 "Children's",
 'Drama',
 'Adventure',
 'Film-Noir',
 'Crime',
 'Animation',
 'Fantasy',
 'Comedy',
 'Western',
 'Romance',
 'Thriller',
 'War',
 'Sci-Fi',
 'Musical',
 'Horror']

In [30]:
item_features = genres
for genre in genres_list:
    item_features = item_features.withColumn(
        genre,
        sf.array_contains(sf.col("genres"), genre).astype(st.IntegerType())
    )
item_features = item_features.drop("genres").cache()
item_features.count()
item_features = item_features.join(year, on='item_idx', how='inner')
item_features.cache()

DataFrame[item_idx: int, Mystery: int, Action: int, Documentary: int, Children's: int, Drama: int, Adventure: int, Film-Noir: int, Crime: int, Animation: int, Fantasy: int, Comedy: int, Western: int, Romance: int, Thriller: int, War: int, Sci-Fi: int, Musical: int, Horror: int, year: int]

In [31]:
item_features = item_features.withColumnRenamed("Children's","Children")
item_features.cache()

DataFrame[item_idx: int, Mystery: int, Action: int, Documentary: int, Children: int, Drama: int, Adventure: int, Film-Noir: int, Crime: int, Animation: int, Fantasy: int, Comedy: int, Western: int, Romance: int, Thriller: int, War: int, Sci-Fi: int, Musical: int, Horror: int, year: int]

In [32]:
item_features.show(2)

+--------+-------+------+-----------+--------+-----+---------+---------+-----+---------+-------+------+-------+-------+--------+---+------+-------+------+----+
|item_idx|Mystery|Action|Documentary|Children|Drama|Adventure|Film-Noir|Crime|Animation|Fantasy|Comedy|Western|Romance|Thriller|War|Sci-Fi|Musical|Horror|year|
+--------+-------+------+-----------+--------+-----+---------+---------+-----+---------+-------+------+-------+-------+--------+---+------+-------+------+----+
|      29|      0|     0|          0|       1|    0|        0|        0|    0|        1|      0|     1|      0|      0|       0|  0|     0|      0|     0|1995|
|     393|      0|     0|          0|       1|    0|        1|        0|    0|        0|      1|     0|      0|      0|       0|  0|     0|      0|     0|1995|
+--------+-------+------+-----------+--------+-----+---------+---------+-----+---------+-------+------+-------+-------+--------+---+------+-------+------+----+
only showing top 2 rows



### Users features

In [33]:
data.users.head()

Unnamed: 0,user_id,gender,age,occupation,zip_code
0,1,F,1,10,48067
1,2,M,56,16,70072
2,3,M,25,15,55117
3,4,M,45,7,2460
4,5,M,25,20,55455


In [34]:
user_features = preparator.transform(columns_mapping={'user_id': 'user_id'}, 
                           data=data.users)

23-Aug-24 16:24:07, replay, INFO: Column with ids of users or items is absent in mapping. The dataframe will be treated as a users'/items' features dataframe.
  arrow_data = [[(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]


In [35]:
%%time
user_features = indexer.transform(df=user_features)
user_features.show(2)

+--------+------+---+----------+--------+
|user_idx|gender|age|occupation|zip_code|
+--------+------+---+----------+--------+
|    4131|     F|  1|        10|   48067|
|    2364|     M| 56|        16|   70072|
+--------+------+---+----------+--------+
only showing top 2 rows

CPU times: user 8.82 ms, sys: 1.82 ms, total: 10.6 ms
Wall time: 151 ms


In [36]:
user_features = user_features.select(sf.col('user_idx'), sf.col('age'), sf.col('occupation'), sf.col('zip_code'))

In [37]:
#switch for a while into pandas
user_features = user_features.toPandas()
user_features.head(2)

Unnamed: 0,user_idx,age,occupation,zip_code
0,4131,1,10,48067
1,2364,56,16,70072


In [38]:
print("max ocupation index: ", user_features['occupation'].max())
print("min ocupation index: ", user_features['occupation'].min())
count_diff_zips = user_features['zip_code'].unique().size
print("different zip codes: ", count_diff_zips) #ok, too much different zip codes, let us drop them for now
users_pd = user_features.drop(columns=['zip_code'])
users_pd.head()
#binarize age variable
bins = [0, 20, 30, 40, 50, 60, np.inf]
names = ['<20', '20-29', '30-39','40-49', '51-60', '60+']

# users_pd['agegroup'] = pd.cut(users_pd['age'], bins, labels=names)

#binarize following https://github.com/kfoofw/bandit_simulations/tree/master
cat_embed_cols = ["agegroup","gender","occupation", "year"]
continuous_cols = ["age", "Crime", "Sci-Fi", "Musical", 
                   "Mystery", "Documentary", "Fantasy", "Children", 
                   "Drama", "Horror", "Adventure", "Western","Romance", 
                   "War", "Animation", "Action", "Comedy", "Thriller",
                   "Film-Noir"]
wide_cols = ["agegroup"]

max ocupation index:  20
min ocupation index:  0
different zip codes:  3439


In [39]:
users_pd.head()

Unnamed: 0,user_idx,age,occupation
0,4131,1,10
1,2364,56,16
2,4217,25,15
3,5916,45,7
4,1603,25,20


In [40]:
#make it pyspark
user_features = spark.createDataFrame(users_pd) 
user_features.printSchema()
print("total users: ",user_features.count())

root
 |-- user_idx: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- occupation: integer (nullable = true)

total users:  6040


  arrow_data = [[(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]


In [41]:
user_features.show(2)

+--------+---+----------+
|user_idx|age|occupation|
+--------+---+----------+
|    4131|  1|        10|
|    2364| 56|        16|
+--------+---+----------+
only showing top 2 rows



In [42]:
feature_schema = FeatureSchema(
    [
        FeatureInfo(
            column="user_idx",
            feature_type=FeatureType.CATEGORICAL,
            feature_hint=FeatureHint.QUERY_ID,
        ),
        FeatureInfo(
            column="item_idx",
            feature_type=FeatureType.CATEGORICAL,
            feature_hint=FeatureHint.ITEM_ID,
        ),
        FeatureInfo(
            column="rating",
            feature_type=FeatureType.NUMERICAL,
            feature_hint=FeatureHint.RATING,
        ),
        FeatureInfo(
            column="timestamp",
            feature_type=FeatureType.NUMERICAL,
            feature_hint=FeatureHint.TIMESTAMP,
        ),
    ]
)

In [43]:
all_dataset = Dataset(
    feature_schema=feature_schema,
    interactions=only_positives_interactions,
    item_features=item_features,
    query_features=user_features,
)

train_dataset = Dataset(
    feature_schema=feature_schema,
    interactions=train,
    item_features=item_features,
    query_features=user_features,
)

test_dataset = Dataset(
    feature_schema=feature_schema,
    interactions=test,
    item_features=item_features,
    query_features=user_features,
)

train_neg_dataset = Dataset(
    feature_schema=feature_schema,
    interactions=pos_neg_train,
    item_features=item_features,
    query_features=user_features,
)

opt_train_dataset = Dataset(
    feature_schema=feature_schema,
    interactions=opt_train,
    item_features=item_features,
    query_features=user_features,
)

opt_val_dataset = Dataset(
    feature_schema=feature_schema,
    interactions=opt_val,
    item_features=item_features,
    query_features=user_features,
)

                                                                                

In [44]:
encoder = DatasetLabelEncoder()
encoder.fit(all_dataset)
train_dataset = encoder.transform(train_dataset)
train_neg_dataset = encoder.transform(train_neg_dataset)
test_dataset = encoder.transform(test_dataset)
opt_train_dataset = encoder.transform(opt_train_dataset)
opt_val_dataset = encoder.transform(opt_val_dataset)

In [45]:
all_dataset.interactions.show(2)

+--------+--------+---------+-------------------+------+
|user_idx|item_idx|relevance|          timestamp|rating|
+--------+--------+---------+-------------------+------+
|    4131|      43|      5.0|2001-01-01 01:12:40|     1|
|    4131|     585|      3.0|2001-01-01 01:35:09|     1|
+--------+--------+---------+-------------------+------+
only showing top 2 rows



In [46]:
all_dataset.interactions.toPandas()['relevance'].values

array([5., 3., 3., ..., 5., 4., 4.])

In [47]:
all_dataset.item_features.show(2)

+--------+-------+------+-----------+--------+-----+---------+---------+-----+---------+-------+------+-------+-------+--------+---+------+-------+------+----+
|item_idx|Mystery|Action|Documentary|Children|Drama|Adventure|Film-Noir|Crime|Animation|Fantasy|Comedy|Western|Romance|Thriller|War|Sci-Fi|Musical|Horror|year|
+--------+-------+------+-----------+--------+-----+---------+---------+-----+---------+-------+------+-------+-------+--------+---+------+-------+------+----+
|      29|      0|     0|          0|       1|    0|        0|        0|    0|        1|      0|     1|      0|      0|       0|  0|     0|      0|     0|1995|
|     393|      0|     0|          0|       1|    0|        1|        0|    0|        0|      1|     0|      0|      0|       0|  0|     0|      0|     0|1995|
+--------+-------+------+-----------+--------+-----+---------+---------+-----+---------+-------+------+-------+-------+--------+---+------+-------+------+----+
only showing top 2 rows



In [48]:
all_dataset.query_features.show(2)

+--------+---+----------+
|user_idx|age|occupation|
+--------+---+----------+
|    4131|  1|        10|
|    2364| 56|        16|
+--------+---+----------+
only showing top 2 rows



In [49]:
e = Experiment(
    [
        MAP(K),
        NDCG(K),
        HitRate(K_list_metrics),
        Coverage(K),
        Surprisal(K),
        MRR(K)
    ],
    test_dataset.interactions,
    train_dataset.interactions,
    query_column=train_dataset.feature_schema.query_id_column,
    item_column=train_dataset.feature_schema.item_id_column,
    rating_column=train_dataset.feature_schema.interactions_rating_column,
    )

In [50]:
bandit_models = {
    # 'Random (uniform)': [RandomRec(seed=SEED, distribution='uniform'), 'no_opt'],
    # 'Random (popularity-based)': [RandomRec(seed=SEED, distribution='popular_based'), {"alpha": [-0.5, 100]}],
    # 'UCB': [UCB(exploration_coef=2.0), 'no_opt'], #2.0 as default, 0.5 as original 
    # 'Wilson': [Wilson(), 'no_opt'],
    'Linear UCB (disjoint models)': [LinUCB(eps = 0.0, alpha = 1.0, regr_type = 'disjoint'), 'no_opt'],
}

In [51]:
for name, [model, params] in bandit_models.items():
    print(name)
    print(model)
    print(params)

Linear UCB (disjoint models)
LinUCB
no_opt


In [52]:
bandit_models.items()

dict_items([('Linear UCB (disjoint models)', [<replay.models.lin_ucb.LinUCB object at 0x73b60fa1acd0>, 'no_opt'])])

In [53]:
def fit_predict_add_res(name, model, experiment, train, test, suffix=''):
    """
    Run fit_predict for the `model`, measure time on fit_predict and evaluate metrics
    """
    start_time=time.time()
    
    dataset = {'dataset': train_dataset}
    predict_params = {'k': K, 'queries': test_dataset.query_ids}
    
    if isinstance(model, (Wilson, UCB, LinUCB)):
        dataset['dataset'] = train_neg_dataset

    predict_params.update(dataset)

    model.fit(**dataset)
    fit_time = time.time() - start_time

    pred=model.predict(**predict_params)
    pred.cache()
    pred.count()
    predict_time = time.time() - start_time - fit_time

    experiment.add_result(name + suffix, pred)
    metric_time = time.time() - start_time - fit_time - predict_time
    experiment.results.loc[name + suffix, 'fit_time'] = fit_time
    experiment.results.loc[name + suffix, 'predict_time'] = predict_time
    experiment.results.loc[name + suffix, 'metric_time'] = metric_time
    experiment.results.loc[name + suffix, 'full_time'] = (fit_time + 
                                                          predict_time +
                                                          metric_time)
    pred.unpersist()
    print(experiment.results[['NDCG@{}'.format(K), 'MRR@{}'.format(K), 'Coverage@{}'.format(K), 'fit_time']].sort_values('NDCG@{}'.format(K), ascending=False))

In [54]:
def full_pipeline(models, experiment, train, test, suffix='', budget=BUDGET):
    """
    For each model:
        -  if required: run hyperparameters search, set best params and save param values to `experiment`
        - pass model to `fit_predict_add_res`        
    """
    
    for name, [model, params] in models.items():
        model.logger.info(msg='{} started'.format(name))
        if params != 'no_opt':
            model.logger.info(msg='{} optimization started'.format(name))
            best_params = model.optimize(opt_train, 
                                         opt_val, 
                                         param_borders=params, 
                                         item_features=item_features,
                                         user_features=user_features,
                                         k=K, 
                                         budget=budget)
            logger.info(msg='best params for {} are: {}'.format(name, best_params))
            model.set_params(**best_params)
        
        logger.info(msg='{} fit_predict started'.format(name))
        fit_predict_add_res(name, model, experiment, train, test, suffix)
        # here we call protected attribute to get all parameters set during model initialization
        experiment.results.loc[name + suffix, 'params'] = str(model._init_args)

In [55]:
%%time
full_pipeline(bandit_models, e, train, test)

23-Aug-24 16:24:24, replay, INFO: Linear UCB (disjoint models) started
23-Aug-24 16:24:24, replay, INFO: Linear UCB (disjoint models) fit_predict started


ValueError: kth(=-180) out of bounds (20)

In [56]:
e.results.sort_values('NDCG@10', ascending=False)

KeyError: 'NDCG@10'