# RePlay Tutorial
This notebook introduces the RePlay library and covers:
- creating a SparkSession or passing your own session to RePlay
- data preprocessing
- re-indexing dataset users and items
- data splitting
- model training and inference
- model optimization
- model saving and loading
- model comparison

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
%config Completer.use_jedi = False

In [3]:
import warnings
from optuna.exceptions import ExperimentalWarning
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=ExperimentalWarning)

In [4]:
import pandas as pd

from pyspark.sql import SparkSession

from replay.data_preparator import DataPreparator, Indexer
from replay.experiment import Experiment
from replay.metrics import Coverage, HitRate, NDCG, MAP
from replay.model_handler import save, load, save_indexer, load_indexer
from replay.models import ALSWrap, ItemKNN, SLIM
from replay.session_handler import get_spark_session, State 
from replay.splitters import UserSplitter
from replay.utils import convert2spark, get_log_info

In [5]:
K = 5
SEED = 1234

## Managing SparkSession

RePlay uses Spark as a backend, so a `SparkSession` must exist before RePlay runs. Depending on your needs, you can handle the `SparkSession` in one of the following ways:

- Option 1: use the default RePlay `SparkSession`.
- Pass your own session to RePlay. The `State` class stores the current session. Here you have two more options:
    - Option 2: call `get_spark_session` to use the default RePlay `SparkSession` with custom driver memory and number of partitions.
    - Option 3: create a `SparkSession` from scratch.


### Option 1: use the default RePlay SparkSession
This is the simplest and recommended way for local execution. RePlay will get the existing SparkSession or create a new one with the default configuration. The default session parameters are defined in the `replay/session_handler.py` file. The driver memory and the number of partitions depend on the available RAM and the number of CPU cores, respectively.

You can trigger the creation of the default session explicitly, e.g. to preprocess Spark DataFrames, get a link to the Spark UI, or set the logging level. If you do not create the session yourself, RePlay will create it anyway.

In [6]:
spark = State().session
spark.sparkContext.setLogLevel('ERROR')
spark

22/07/13 14:27:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/07/13 14:27:05 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


In [7]:
def print_config_param(session, conf_name):
    # get the current Spark session configuration as (key, value) pairs
    conf = session.sparkContext.getConf().getAll()
    # print the value of the requested configuration parameter
    print(f'{conf_name}: {dict(conf)[conf_name]}')

In [8]:
print_config_param(spark, 'spark.sql.shuffle.partitions')

spark.sql.shuffle.partitions: 24


### Option 2: call `get_spark_session`
Use the `get_spark_session` function to customize the driver memory (`spark.driver.memory`) or the number of partitions (`spark.sql.shuffle.partitions`) while keeping the default RePlay settings for the other configuration parameters. For example, we specify 16 partitions and 4 GB of driver memory, then pass the created session to RePlay `State`.

In [9]:
spark.stop()
session = get_spark_session(spark_memory=4, shuffle_partitions=16)
spark = State(session).session

In [10]:
print_config_param(spark, 'spark.sql.shuffle.partitions')

spark.sql.shuffle.partitions: 16


### Option 3: Create your own session
Build the session with `SparkSession.builder` and pass it to RePlay `State`.

In [11]:
spark.stop()
session = (
    SparkSession.builder.config("spark.driver.memory", "8g")
    .config("spark.sql.shuffle.partitions", "50")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "localhost")
    .master("local[*]")
    .enableHiveSupport()
    .getOrCreate()
)
spark = State(session).session
print_config_param(spark, 'spark.sql.shuffle.partitions')

spark.sql.shuffle.partitions: 50


#### Return to the default session configuration

In [12]:
spark.stop()
spark = State(get_spark_session()).session
spark.sparkContext.setLogLevel('ERROR')
spark

## 0. Data preprocessing <a name='data-preparator'></a>
We will use MovieLens 1M as an example.

In [13]:
df = pd.read_csv("data/ml1m_ratings.dat", sep="\t", names=["userId", "item_id", "relevance", "timestamp"])
users = pd.read_csv("data/ml1m_users.dat", sep="\t", names=["user_id", "gender", "age", "occupation", "zip_code"])

In [14]:
df.head(2)

| | userId | item_id | relevance | timestamp |
|---|---|---|---|---|
| 0 | 1 | 1193 | 5 | 978300760 |
| 1 | 1 | 661 | 3 | 978302109 |


In [15]:
users.head(2)

| | user_id | gender | age | occupation | zip_code |
|---|---|---|---|---|---|
| 0 | 1 | F | 1 | 10 | 48067 |
| 1 | 2 | M | 56 | 16 | 70072 |


### 0.1. DataPreparator

RePlay's internal data format is a Spark DataFrame.

An interactions log requires columns with user and item identifiers, named `user_id` and `item_id`. In section [0.3. Indexing](#indexing) these identifiers will be converted to integer identifiers named `user_idx` and `item_idx`. Optional columns of the interaction matrix are ``relevance`` and the interaction ``timestamp``.

DataFrames with user or item features should have a `user_id` or `item_id` column, respectively.

The DataPreparator class converts pandas DataFrames to Spark format and preprocesses the data: it renames or creates the required and optional interaction matrix columns, checks for nulls, and parses dates. This step is optional: if your data is already a Spark DataFrame with the columns named as described above, and you are confident in its completeness and quality, you can skip it.

In [16]:
preparator = DataPreparator()

#### Interactions log preprocessing

In [17]:
%%time
log = preparator.transform(
    columns_mapping={
        'user_id': 'userId',
        'item_id': 'item_id',
        'relevance': 'relevance',
        'timestamp': 'timestamp',
    },
    data=df,
)

13-Jul-22 14:28:06, replay, INFO: Columns with ids of users or items are present in mapping. The dataframe will be treated as an interactions log.

CPU times: user 128 ms, sys: 59.7 ms, total: 188 ms
Wall time: 5.22 s


In [18]:
log.show(2)

+-------+-------+---------+-------------------+
|user_id|item_id|relevance|          timestamp|
+-------+-------+---------+-------------------+
|      1|   1193|      5.0|2001-01-01 01:12:40|
|      1|    661|      3.0|2001-01-01 01:35:09|
+-------+-------+---------+-------------------+
only showing top 2 rows



In [19]:
log.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- item_id: long (nullable = true)
 |-- relevance: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [20]:
get_log_info(log, user_col='user_id', item_col='item_id')

'total lines: 1000209, total users: 6040, total items: 3706'

As you can see, `userId` was renamed to `user_id` and `timestamp` was converted to `TimestampType`.

#### Feature dataframe preprocessing
You can also use DataPreparator to transform feature dataframes:

In [21]:
user_features = preparator.transform(columns_mapping={'user_id': 'user_id'},
                                     data=users)
user_features.show(2)

13-Jul-22 14:28:13, replay, INFO: Column with ids of users or items is absent in mapping. The dataframe will be treated as a users'/items' features dataframe.


+-------+------+---+----------+--------+
|user_id|gender|age|occupation|zip_code|
+-------+------+---+----------+--------+
|      1|     F|  1|        10|   48067|
|      2|     M| 56|        16|   70072|
+-------+------+---+----------+--------+
only showing top 2 rows



Using the DataPreparator is optional: you can convert a pandas DataFrame to Spark with ``convert2spark`` from ``replay.utils`` and rename the columns manually.
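If you skip DataPreparator, the renaming and type conversion can be done in pandas before calling `convert2spark`. The following is a minimal sketch, assuming the raw column names used in this notebook; it is not DataPreparator's implementation and covers only a simplified null check. Note that `pd.to_datetime(..., unit="s")` yields UTC times, while Spark displays timestamps in the session time zone, so the values may look shifted relative to `log.show()` above.

```python
import pandas as pd

# Toy frame mimicking the first two rows of the raw ratings file read above.
raw = pd.DataFrame(
    {"userId": [1, 1], "item_id": [1193, 661],
     "relevance": [5, 3], "timestamp": [978300760, 978302109]}
)

# Rename the user column to the name RePlay expects.
log_pd = raw.rename(columns={"userId": "user_id"})
# Cast relevance to float and parse Unix seconds into (UTC, timezone-naive) datetimes.
log_pd["relevance"] = log_pd["relevance"].astype(float)
log_pd["timestamp"] = pd.to_datetime(log_pd["timestamp"], unit="s")
# Drop rows with missing ids: a simplified stand-in for a null check.
log_pd = log_pd.dropna(subset=["user_id", "item_id"])

# In the notebook, the result could then be converted with convert2spark(log_pd).
print(log_pd.dtypes)
```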

In [22]:
# the same result without DataPreparator
convert2spark(users).show(2)

+-------+------+---+----------+--------+
|user_id|gender|age|occupation|zip_code|
+-------+------+---+----------+--------+
|      1|     F|  1|        10|   48067|
|      2|     M| 56|        16|   70072|
+-------+------+---+----------+--------+
only showing top 2 rows



### 0.2. Filtering
It is common to filter an interactions log by interaction date or rating value, or to remove users or items with a small number of interactions. RePlay provides several filters in the `replay.filters` module.
We will keep ratings greater than or equal to 3 and remove users with 4 or fewer interactions.

In [23]:
from replay.filters import filter_by_min_count, filter_out_low_ratings

In [24]:
log = filter_out_low_ratings(log, value=3)
get_log_info(log, user_col='user_id', item_col='item_id')

'total lines: 836478, total users: 6039, total items: 3628'

In [25]:
%%time
log = filter_by_min_count(log, num_entries=5, group_by='user_id')
get_log_info(log, user_col='user_id', item_col='item_id')

13-Jul-22 14:28:17, replay, INFO: current threshold removes 1.1954887038272376e-06% of data

CPU times: user 15.1 ms, sys: 5.92 ms, total: 21.1 ms
Wall time: 6.18 s

'total lines: 836477, total users: 6038, total items: 3628'

<a id='indexing'></a>
### 0.3. Indexing

RePlay models require the columns with user and item identifiers _(ids)_ to be named `user_idx` and `item_idx`. These _ids_ should be integers starting at zero without gaps. This is important for models that use sparse matrices and define the matrix size by the largest user and item index seen. Storing _ids_ as integers also helps reduce memory usage compared to string _ids_.
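To see why gaps matter, here is a small illustration with SciPy (not part of RePlay): when the shape of a sparse matrix is inferred from its indices, raw ids such as 1193 inflate the matrix, while dense zero-based indexes keep it compact.

```python
import numpy as np
from scipy.sparse import csr_matrix

# Two interactions with raw MovieLens ids: user 1 rated items 1193 and 661.
user_ids = np.array([1, 1])
item_ids = np.array([1193, 661])
values = np.ones(2)

# Without an explicit shape, csr_matrix uses (max row + 1, max column + 1).
raw_matrix = csr_matrix((values, (user_ids, item_ids)))
print(raw_matrix.shape)  # (2, 1194): almost all rows and columns are empty

# After re-indexing to 0..n-1, the same interactions fit a 1 x 2 matrix.
user_idx = np.array([0, 0])
item_idx = np.array([0, 1])
dense_matrix = csr_matrix((values, (user_idx, item_idx)))
print(dense_matrix.shape)  # (1, 2)
```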

You should convert the user and item _ids_ in both the interactions log and the feature dataframes. RePlay offers the Indexer class to convert the _ids_ and to convert them back after generating recommendations (predict). The Indexer stores label encoders for users and items and can also transform the ids of users and items that appear after the Indexer has been fitted.

In [26]:
indexer = Indexer(user_col='user_id', item_col='item_id')

Take all available user and item ids from the log and the feature dataframes and pass them to the Indexer. The _ids_ may repeat: indexes are ordered by label frequency, so the most frequent label gets index 0.
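Frequency-ordered index assignment can be sketched in pure Python. The `FrequencyIndexer` class below is hypothetical: it is not RePlay's `Indexer` (which works on Spark DataFrames), its tie-breaking may differ, and it does not handle ids unseen during fit.

```python
from collections import Counter


class FrequencyIndexer:
    """Hypothetical label encoder: the most frequent label gets index 0."""

    def fit(self, labels):
        # most_common() sorts by descending count; ties keep first-seen order.
        self.idx_to_label = [label for label, _ in Counter(labels).most_common()]
        self.label_to_idx = {label: idx for idx, label in enumerate(self.idx_to_label)}
        return self

    def transform(self, labels):
        return [self.label_to_idx[label] for label in labels]

    def inverse_transform(self, indexes):
        return [self.idx_to_label[idx] for idx in indexes]


# Ids may repeat, e.g. when log ids and feature ids are concatenated.
user_ids = ["u7", "u3", "u7", "u9", "u7", "u3"]
indexer_sketch = FrequencyIndexer().fit(user_ids)
print(indexer_sketch.transform(["u7", "u3", "u9"]))  # [0, 1, 2]
print(indexer_sketch.inverse_transform([2, 0]))      # ['u9', 'u7']
```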

In [27]:
%%time
indexer.fit(users=log.select('user_id').unionByName(user_features.select('user_id')),
            items=log.select('item_id'))

CPU times: user 52.2 ms, sys: 14.1 ms, total: 66.4 ms
Wall time: 3.76 s


In [28]:
%%time
log_replay = indexer.transform(df=log)
log_replay.show(2)

+--------+--------+---------+-------------------+
|user_idx|item_idx|relevance|          timestamp|
+--------+--------+---------+-------------------+
|    2645|     242|      4.0|2000-12-30 00:47:02|
|    2645|    1418|      4.0|2000-12-30 00:38:05|
+--------+--------+---------+-------------------+
only showing top 2 rows

CPU times: user 65.8 ms, sys: 20.8 ms, total: 86.6 ms
Wall time: 3.85 s




In [29]:
%%time
user_features_replay = indexer.transform(df=user_features)
user_features_replay.show(2)

+--------+------+---+----------+--------+
|user_idx|gender|age|occupation|zip_code|
+--------+------+---+----------+--------+
|    3861|     F|  1|        10|   48067|
|    2301|     M| 56|        16|   70072|
+--------+------+---+----------+--------+
only showing top 2 rows

CPU times: user 36.9 ms, sys: 7.24 ms, total: 44.1 ms
Wall time: 352 ms


### 0.4. Split

RePlay provides data splitters that reproduce validation schemes widely used in recommender systems. Splitters return cached dataframes, so the splits are computed once and reused for model training, inference, and metric calculation.

`UserSplitter` moves ``item_test_size`` items of each of ``user_test_size`` users to the test dataset.
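The idea of a user-based split can be sketched in pure Python on `(user, item)` pairs. The `user_split` function is hypothetical and is not `UserSplitter`'s code: we assume that `shuffle=True` means test items are picked at random rather than by time, we only mimic `drop_cold_items=True`, and we omit `drop_cold_users`. Dropping cold items is one way a test set can end up slightly smaller than `user_test_size * item_test_size`.

```python
import random


def user_split(log, user_test_size, item_test_size, seed=None):
    """Hypothetical user-based split of (user, item) pairs.

    Samples `user_test_size` users and moves `item_test_size` randomly chosen
    interactions of each sampled user to the test set; all other rows stay in
    train. Test rows whose item never occurs in train (cold items) are dropped.
    """
    rng = random.Random(seed)
    all_users = sorted({user for user, _ in log})
    test_users = set(rng.sample(all_users, user_test_size))

    # Group the log by user, keeping the original row order.
    by_user = {}
    for user, item in log:
        by_user.setdefault(user, []).append((user, item))

    train, test = [], []
    for user, rows in by_user.items():
        if user in test_users:
            rows = rows[:]  # shuffle a copy, assumed analogue of shuffle=True
            rng.shuffle(rows)
            test.extend(rows[:item_test_size])
            train.extend(rows[item_test_size:])
        else:
            train.extend(rows)

    # Analogue of drop_cold_items=True: keep test rows whose item is in train.
    train_items = {item for _, item in train}
    test = [row for row in test if row[1] in train_items]
    return train, test


# Toy log: 10 users, each interacted with items 0..7.
toy_log = [(user, item) for user in range(10) for item in range(8)]
train_rows, test_rows = user_split(toy_log, user_test_size=3, item_test_size=2, seed=1234)
print(len(train_rows), len(test_rows))  # 74 6
```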

In [30]:
%%time
splitter = UserSplitter(
    drop_cold_items=True,
    drop_cold_users=True,
    item_test_size=K,
    user_test_size=500,
    seed=SEED,
    shuffle=True
)
train, test = splitter.split(log_replay)
print(train.count(), test.count())

833977 2499
CPU times: user 50.9 ms, sys: 18.1 ms, total: 69 ms
Wall time: 11.9 s


In [31]:
test.is_cached

True

## 1. Models training

#### SLIM

In [32]:
slim = SLIM(seed=SEED)

In [33]:
%%time
slim.fit(log=train)



CPU times: user 1.78 s, sys: 140 ms, total: 1.92 s
Wall time: 41 s

In [34]:
%%time

recs = slim.predict(
    k=K,
    users=test.select('user_idx').distinct(),
    log=train,
    filter_seen_items=True
)

13-Jul-22 14:29:23, replay, INFO: This model can't predict cold items, they will be ignored


CPU times: user 34.6 ms, sys: 17.8 ms, total: 52.5 ms
Wall time: 2.2 s


In [35]:
recs.show(2)



+--------+--------+------------------+
|user_idx|item_idx|         relevance|
+--------+--------+------------------+
|     282|      15|1.0327709816356303|
|     282|      80|0.9718415807887311|
+--------+--------+------------------+
only showing top 2 rows

## 2. Models evaluation

RePlay implements popular quality metrics for recommender systems. You can compute each metric on its own, or compute a chosen set of metrics and compare models with the ``Experiment`` class.
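For intuition, here is a pure-Python sketch of three of the metrics used below, with standard binary-relevance definitions. RePlay's exact implementations may differ in details (for example, NDCG normalization or how Coverage treats the catalogue), and MAP is omitted. As with `Coverage(train)` below, coverage is measured against the items seen in train.

```python
import math


def hit_rate_at_k(recs, relevant, k):
    """Share of users with at least one relevant item among their top-k recommendations."""
    hits = [any(item in relevant[user] for item in items[:k]) for user, items in recs.items()]
    return sum(hits) / len(hits)


def ndcg_at_k(recs, relevant, k):
    """Binary-relevance NDCG@k, averaged over users."""
    scores = []
    for user, items in recs.items():
        # rank 0 is the top position, discounted by log2(0 + 2) = 1
        dcg = sum(1 / math.log2(rank + 2)
                  for rank, item in enumerate(items[:k]) if item in relevant[user])
        # ideal DCG: all relevant items placed at the top of the list
        idcg = sum(1 / math.log2(rank + 2) for rank in range(min(k, len(relevant[user]))))
        scores.append(dcg / idcg if idcg > 0 else 0.0)
    return sum(scores) / len(scores)


def coverage_at_k(recs, train_items, k):
    """Share of train items that appear in at least one top-k list."""
    recommended = {item for items in recs.values() for item in items[:k]}
    return len(recommended & train_items) / len(train_items)


# Toy data: ranked recommendations and held-out (test) items per user.
toy_recs = {"u1": ["a", "b", "c"], "u2": ["d", "e", "f"]}
toy_test = {"u1": {"a", "c"}, "u2": {"x"}}
train_items = {"a", "b", "c", "d", "e", "f", "g", "h"}

print(hit_rate_at_k(toy_recs, toy_test, 1))     # 0.5
print(ndcg_at_k(toy_recs, toy_test, 3))
print(coverage_at_k(toy_recs, train_items, 3))  # 0.75
```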

In [36]:
metrics = Experiment(test, {NDCG(): K,
                            MAP() : K,
                            HitRate(): [1, K],
                            Coverage(train): K
                           })

In [37]:
%%time
metrics.add_result("SLIM", recs)
metrics.results

CPU times: user 182 ms, sys: 107 ms, total: 288 ms
Wall time: 26.2 s


| | Coverage@5 | HitRate@1 | HitRate@5 | MAP@5 | NDCG@5 |
|---|---|---|---|---|---|
| SLIM | 0.151916 | 0.22 | 0.556 | 0.10006 | 0.172416 |


## 3. Hyperparameters optimization

#### 3.1 Search

In [38]:
# data split for hyperparameters optimization
train_opt, val_opt = splitter.split(train)

In [39]:
%%time
best_params = slim.optimize(train_opt, val_opt, criterion=NDCG(), k=K, budget=15)

[I 2022-07-13 14:31:50,885] A new study created in memory with name: no-name-11180cb9-31dc-40a1-bb83-a71f5c182226
13-Jul-22 14:32:34, replay, INFO: This model can't predict cold items, they will be ignored
[I 2022-07-13 14:32:55,945] Trial 0 finished with value: 0.1734326589995294 and parameters: {'beta': 0.01, 'lambda_': 0.01}. Best is trial 0 with value: 0.1734326589995294.
13-Jul-22 14:33:54, replay, INFO: This model can't predict cold items, they will be ignored
[I 2022-07-13 14:34:12,212] Trial 1 finished with value: 0.170801871243933 and parameters: {'beta': 1.4507835506704378, 'lambda_': 0.00040535434413768067}. Best is trial 0 with value: 0.1734326589995294.
13-Jul-22 14:35:05, replay, INFO: This model can't predict cold items, they will be ignored
[I 2022-07-13 14:35:31,316] Trial 2 finished with value: 0.17174942939647048 and parameters: {'beta': 1.6339267240132733, 'lambda_': 0.00486422220451093}. Best is trial 0 with value: 0.

CPU times: user 35.5 s, sys: 5.67 s, total: 41.1 s
Wall time: 20min 44s


In [40]:
best_params

{'beta': 0.061429815496712774, 'lambda_': 0.02613996164121192}

#### 3.2 Compare with the previous model

In [41]:
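def fit_predict_evaluate(model, experiment, name):
    """Fit `model` on train, recommend top-K items to test users and log metrics under `name`."""
    model.fit(log=train)

    # recommend K items to every test user, excluding items already seen in train
    recs = model.predict(
        k=K,
        users=test.select('user_idx').distinct(),
        log=train,
        filter_seen_items=True
    )

    # compute the Experiment metrics for these recommendations
    experiment.add_result(name, recs)
    return recs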

In [42]:
%%time
recs = fit_predict_evaluate(SLIM(**best_params, seed=SEED), metrics, 'SLIM_optimized')
recs.cache()  # cache for further processing
metrics.results.sort_values('NDCG@5', ascending=False)

13-Jul-22 14:53:17, replay, INFO: This model can't predict cold items, they will be ignored

CPU times: user 2.28 s, sys: 375 ms, total: 2.65 s
Wall time: 1min 4s


| | Coverage@5 | HitRate@1 | HitRate@5 | MAP@5 | NDCG@5 |
|---|---|---|---|---|---|
| SLIM | 0.151916 | 0.22 | 0.556 | 0.10006 | 0.172416 |
| SLIM_optimized | 0.141163 | 0.214 | 0.568 | 0.099273 | 0.172007 |


The optimized model was better on the validation dataset, but its quality on the test dataset is comparable to the default model: better in HitRate@5 and worse in the other metrics.

## 4. Getting final recommendations 

### Return to original user and item identifiers

In [43]:
%%time
recs = indexer.inverse_transform(recs)
recs.show(2)



+-------+-------+------------------+
|user_id|item_id|         relevance|
+-------+-------+------------------+
|   5107|    527| 1.046002020356746|
|   5107|   2599|0.9492305434804991|
+-------+-------+------------------+
only showing top 2 rows

CPU times: user 754 ms, sys: 296 ms, total: 1.05 s
Wall time: 6.67 s

### Convert to pandas or save

In [44]:
recs_pd = recs.toPandas()
recs_pd.head(2)

| | user_id | item_id | relevance |
|---|---|---|---|
| 0 | 5107 | 527 | 1.046002 |
| 1 | 5107 | 2599 | 0.949231 |


In [45]:
%%time
recs.write.parquet(path='./slim_recs.parquet', mode='overwrite')

CPU times: user 4.62 ms, sys: 4.24 ms, total: 8.86 ms
Wall time: 2.75 s




## 4. Save and load

RePlay allows you to save and load fitted models with the `save` and `load` functions of the `model_handler` module; the Indexer is saved and loaded with `save_indexer` and `load_indexer`. A model is saved as a folder containing all necessary parameters and data.

In [46]:
%%time
save_indexer(indexer, './indexer_ml1')
indexer = load_indexer('./indexer_ml1')

CPU times: user 715 ms, sys: 273 ms, total: 988 ms
Wall time: 4.57 s


In [47]:
%%time
save(slim, path='./slim_best_params')
slim_loaded = load('./slim_best_params')

CPU times: user 66.1 ms, sys: 59 ms, total: 125 ms
Wall time: 42.4 s


In [48]:
slim_loaded.beta, slim_loaded.lambda_

(0.061429815496712774, 0.02613996164121192)

In [49]:
%%time
pred_from_loaded = slim_loaded.predict(
    k=K,
    users=test.select('user_idx').distinct(),
    log=train,
    filter_seen_items=True
)
pred_from_loaded.show(2)

13-Jul-22 14:54:38, replay, INFO: This model can't predict cold items, they will be ignored

+--------+--------+------------------+
|user_idx|item_idx|         relevance|
+--------+--------+------------------+
|     282|      15|1.0362708717674016|
|     282|      61|0.9560245729412186|
+--------+--------+------------------+
only showing top 2 rows

CPU times: user 66.2 ms, sys: 33.3 ms, total: 99.5 ms
Wall time: 10.4 s

In [50]:
%%time
recs = indexer.inverse_transform(pred_from_loaded)
recs.show(2)



+-------+-------+------------------+
|user_id|item_id|         relevance|
+-------+-------+------------------+
|   5107|    527|1.0362708717674016|
|   5107|   2599|0.9560245729412186|
+-------+-------+------------------+
only showing top 2 rows

CPU times: user 886 ms, sys: 341 ms, total: 1.23 s
Wall time: 7.32 s

## 5. Other RePlay models

#### ALS
A commonly used matrix factorization algorithm.
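The core idea of alternating least squares can be sketched with NumPy: fix the item factors and solve a ridge regression for each user, then swap roles, and repeat. This is a generic explicit-feedback ALS on a small dense matrix, not RePlay's `ALSWrap`, which runs on Spark and may use a different (for example implicit-feedback) formulation. `rank` plays the same role as the `rank=100` argument below; `reg` and `n_iters` are illustrative choices.

```python
import numpy as np


def als(ratings, rank=2, reg=0.1, n_iters=20, seed=0):
    """Explicit-feedback ALS on a dense ratings matrix (0 = unobserved).

    Alternates closed-form ridge-regression solves for user and item factors,
    using only the observed entries.
    """
    rng = np.random.default_rng(seed)
    n_users, n_items = ratings.shape
    observed = ratings > 0
    users = rng.normal(scale=0.1, size=(n_users, rank))
    items = rng.normal(scale=0.1, size=(n_items, rank))
    ridge = reg * np.eye(rank)
    for _ in range(n_iters):
        # Fix item factors and solve each user's least-squares problem.
        for u in range(n_users):
            v = items[observed[u]]
            users[u] = np.linalg.solve(v.T @ v + ridge, v.T @ ratings[u, observed[u]])
        # Fix user factors and solve each item's least-squares problem.
        for i in range(n_items):
            w = users[observed[:, i]]
            items[i] = np.linalg.solve(w.T @ w + ridge, w.T @ ratings[observed[:, i], i])
    return users, items


ratings = np.array([
    [5, 4, 0, 1],
    [4, 0, 0, 1],
    [1, 1, 0, 5],
    [0, 1, 5, 4],
], dtype=float)
user_factors, item_factors = als(ratings)
predicted = user_factors @ item_factors.T  # scores for observed and unobserved pairs
print(np.round(predicted, 1))
```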

In [51]:
%%time
recs = fit_predict_evaluate(ALSWrap(rank=100, seed=SEED), metrics, 'ALS')
metrics.results.sort_values('NDCG@5', ascending=False)

13-Jul-22 14:55:47, replay, INFO: This model can't predict cold users, they will be ignored
13-Jul-22 14:55:47, replay, INFO: This model can't predict cold items, they will be ignored

CPU times: user 746 ms, sys: 450 ms, total: 1.2 s
Wall time: 3min 11s


| | Coverage@5 | HitRate@1 | HitRate@5 | MAP@5 | NDCG@5 |
|---|---|---|---|---|---|
| SLIM | 0.151916 | 0.22 | 0.556 | 0.10006 | 0.172416 |
| SLIM_optimized | 0.141163 | 0.214 | 0.568 | 0.099273 | 0.172007 |
| ALS | 0.196305 | 0.202 | 0.55 | 0.08636 | 0.156045 |


#### ItemKNN
A commonly used item-based recommender.
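Item-based KNN scores an item for a user by how similar it is to the items the user has already interacted with. Below is a minimal NumPy sketch using cosine similarity on a 0/1 interaction matrix; RePlay's `ItemKNN` may weight or normalize similarities differently, and only the `num_neighbours` parameter mirrors the argument used below. The last lines drop seen items, similar in spirit to `filter_seen_items=True`.

```python
import numpy as np


def item_knn_scores(interactions, num_neighbours=2):
    """Score all items for all users with cosine item-item similarity.

    `interactions` is a dense users x items 0/1 matrix. Each item keeps only its
    `num_neighbours` most similar items; a user's score for an item is the sum of
    its similarities to the items the user interacted with.
    """
    norms = np.linalg.norm(interactions, axis=0)
    norms[norms == 0] = 1.0  # avoid division by zero for items without interactions
    sim = (interactions.T @ interactions) / np.outer(norms, norms)
    np.fill_diagonal(sim, 0.0)  # an item is not its own neighbour
    # In each column, zero out everything except the top-`num_neighbours` similarities.
    for j in range(sim.shape[1]):
        weakest = np.argsort(sim[:, j])[:-num_neighbours]
        sim[weakest, j] = 0.0
    return interactions @ sim


X = np.array([
    [1, 1, 0, 0],
    [1, 1, 1, 0],
    [0, 0, 1, 1],
    [0, 1, 1, 1],
], dtype=float)
scores = item_knn_scores(X, num_neighbours=2)
scores[X > 0] = -np.inf  # never recommend already seen items
top1 = scores.argmax(axis=1)
print(top1)  # [2 3 1 0]
```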

In [52]:
%%time
recs = fit_predict_evaluate(ItemKNN(num_neighbours=100), metrics, 'ItemKNN')
metrics.results.sort_values('NDCG@5', ascending=False)

13-Jul-22 15:00:29, replay, INFO: This model can't predict cold items, they will be ignored

CPU times: user 373 ms, sys: 413 ms, total: 786 ms
Wall time: 3min 3s


| | Coverage@5 | HitRate@1 | HitRate@5 | MAP@5 | NDCG@5 |
|---|---|---|---|---|---|
| SLIM | 0.151916 | 0.22 | 0.556 | 0.10006 | 0.172416 |
| SLIM_optimized | 0.141163 | 0.214 | 0.568 | 0.099273 | 0.172007 |
| ALS | 0.196305 | 0.202 | 0.55 | 0.08636 | 0.156045 |
| ItemKNN | 0.050731 | 0.168 | 0.39 | 0.063387 | 0.113365 |


## 6. Compare RePlay models with others
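To evaluate recommendations obtained from other sources, read them into a DataFrame with the same columns as the RePlay recommendations (`user_idx`, `item_idx`, `relevance`) and pass it to ``Experiment``. Here, the ItemKNN recommendations with random relevance scores stand in for an external model.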
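If an external model outputs scores for many (user, item) pairs, they can first be reduced to a top-K list per user. A minimal pandas sketch with hypothetical scores and `k = 2` (the notebook uses `K = 5`); if the external model uses original ids, they would first need to be mapped with the fitted `indexer`.

```python
import pandas as pd

k = 2  # the notebook uses K = 5

# Hypothetical scores from an external model, one row per (user, item) pair.
external = pd.DataFrame({
    "user_idx": [0, 0, 0, 1, 1],
    "item_idx": [10, 11, 12, 10, 13],
    "relevance": [0.9, 0.2, 0.5, 0.1, 0.7],
})

# Keep the k highest-scored items per user.
top_k = (
    external.sort_values(["user_idx", "relevance"], ascending=[True, False])
    .groupby("user_idx")
    .head(k)
    .reset_index(drop=True)
)
# In the notebook, this frame could then be passed as
# metrics.add_result("external_model", convert2spark(top_k))
print(top_k)
```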

In [53]:
import pyspark.sql.functions as sf

In [54]:
# simulate an external model: reuse the ItemKNN recommendations with random relevance scores
metrics.add_result("my_model", recs.withColumn("relevance", sf.rand()))
metrics.results.sort_values("NDCG@5", ascending=False)

| | Coverage@5 | HitRate@1 | HitRate@5 | MAP@5 | NDCG@5 |
|---|---|---|---|---|---|
| SLIM | 0.151916 | 0.22 | 0.556 | 0.10006 | 0.172416 |
| SLIM_optimized | 0.141163 | 0.214 | 0.568 | 0.099273 | 0.172007 |
| ALS | 0.196305 | 0.202 | 0.55 | 0.08636 | 0.156045 |
| ItemKNN | 0.050731 | 0.168 | 0.39 | 0.063387 | 0.113365 |
| my_model | 0.050731 | 0.118 | 0.39 | 0.05572 | 0.104047 |
