# Recommendation model based on OTTO dataset

This notebook loads the normalized version of [the OTTO session dataset](https://www.kaggle.com/datasets/otto/recsys-dataset?select=otto-recsys-train.jsonl).

We aim is to solve three tasks:

1. Provide the top-seller products from their shop
1. Identifying frequently bought together (FreBoTo) items
1. The customer requests real-time recommendation

The dataset has been normalized (see [prep-data.ipynb](./prep-data.ipynb)) from the original json format into a table with the schema:

```python
meta = {
    "aid": np.int64,
    "ts": np.int64,
    "type": np.str,
    "session": np.int64,
}
```

where 
* `aid` is the `article id` of a product
* `ts` is the timestamp in epoch format
* `type` is an enum of `{"clicks","carts","orders"}` representing the user _clicking on the product_, _adding the product to their basket_, and _placing an order that includes that product_, respectively.
* `session` is the session id. From the dataset documentation: _"A session is all activity by a single user either in the train or the test set."_

> NOTE: Preferably, we would store and load the dataset in some kind of cloud-enabled storage to keep this book
> portable and reproducible, but this is outside the scope of this book.

_All_ IDs are anonymized, i.e. it's pretty hard to present an intuition about the results. As an example, you cannot look up the products, as they are no longer related to the products from `OTTO`. We have to rely solely on our evaluation scores with this data.

## Approach

We identify problem 2 (_Identifying frequently bought together (FreBoTo) items_) as the most challenging part, as it leaves the most room for interpretation while also inviting a machine-learning based approach.

To solve this problem, we focus on the `orders` in a session and treat it as a Natural Language Processing (NLP). Namely, we can treat the sequence of orders in a session as _a sentence_ in NLP and solve the problem as *Next Event Predition* (NEP) task. 

If we look at a session in isolation, the NEP task would be, given the history of ordered articles, **what is the most likely product that the customer would buy next**.

![](FF19_Artboard_10rev.png)
> (Image from https://session-based-recommenders.fastforwardlabs.com/)


In this notebook, we choose the [`Word2Vec`](https://en.wikipedia.org/wiki/Word2vec) approach to solve this problem. The [Gensim](https://radimrehurek.com/gensim/) library has an implementation of this training algorithm which we will use ([see specificly this link](https://radimrehurek.com/gensim/models/word2vec.html)).
We are using the `skip-gram` algorithm configuration, specifically.

To assess our solution, we use [`Recall at K`](https://en.wikipedia.org/wiki/Evaluation_measures_(information_retrieval)#Recall) and [`Mean Reciprocal Rank at K`](https://en.wikipedia.org/wiki/Mean_reciprocal_rank) to score the model against the "ground truth", i.e. based on the `n-1`th order, how likely are we to recommend the `n`th item from that session.

We compare the 2 scores to a random algorithm which picks `K` random elements from a given vocabulary (the _vocabulary_ is the full list of `aid`s we see in the dataset).

## Unsolved problems

We do not solve the **cold start problem**, where, i.e., a new product is launched and not recommended, since the algorithm has no prior order history which includes this product. This could be solved as part of `problem 3` at the time of providing recommendations through an `API`, or possibly presented with a "hero spot" on the webpage until enough history can be gathered.

We do not tune the **hyperparameters**, though this could possibly increase our score dramatically. 

## Dataset

We use only the main dataset ('otto-recsys-train.jsonl') and split the sessions into training and test data manually. This is a bit more flexible and suits the `Word2Vec` approach.


In [22]:
# Import libraries and print versions
import sys

import dask.dataframe as dd
import pandas as pd
import platform
from typing import List

print(f"Running on machine: {platform.uname()}")
print()
print(f"Python {sys.version}")
print(f"Pandas {pd.__version__}")

Running on machine: uname_result(system='Darwin', node='dkfepaha.local', release='22.6.0', version='Darwin Kernel Version 22.6.0: Wed Jul  5 22:22:05 PDT 2023; root:xnu-8796.141.3~6/RELEASE_ARM64_T6000', machine='arm64')

Python 3.9.15 | packaged by conda-forge | (main, Nov 22 2022, 08:52:10) 
[Clang 14.0.6 ]
Pandas 1.4.4


In [23]:
# Start a Dask Client for a local cluster (automatically started)
from dask.distributed import Client
client = Client()
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 53872 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:53872/status,

0,1
Dashboard: http://127.0.0.1:53872/status,Workers: 5
Total threads: 10,Total memory: 32.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:53873,Workers: 5
Dashboard: http://127.0.0.1:53872/status,Total threads: 10
Started: Just now,Total memory: 32.00 GiB

0,1
Comm: tcp://127.0.0.1:53895,Total threads: 2
Dashboard: http://127.0.0.1:53903/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:53877,
Local directory: /var/folders/z3/skd_zhyn1b3g4vlnycmz7kw00000gp/T/dask-worker-space/worker-kwtp5j3w,Local directory: /var/folders/z3/skd_zhyn1b3g4vlnycmz7kw00000gp/T/dask-worker-space/worker-kwtp5j3w

0,1
Comm: tcp://127.0.0.1:53897,Total threads: 2
Dashboard: http://127.0.0.1:53901/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:53879,
Local directory: /var/folders/z3/skd_zhyn1b3g4vlnycmz7kw00000gp/T/dask-worker-space/worker-_rwh18rt,Local directory: /var/folders/z3/skd_zhyn1b3g4vlnycmz7kw00000gp/T/dask-worker-space/worker-_rwh18rt

0,1
Comm: tcp://127.0.0.1:53896,Total threads: 2
Dashboard: http://127.0.0.1:53902/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:53880,
Local directory: /var/folders/z3/skd_zhyn1b3g4vlnycmz7kw00000gp/T/dask-worker-space/worker-venxdlg9,Local directory: /var/folders/z3/skd_zhyn1b3g4vlnycmz7kw00000gp/T/dask-worker-space/worker-venxdlg9

0,1
Comm: tcp://127.0.0.1:53898,Total threads: 2
Dashboard: http://127.0.0.1:53899/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:53876,
Local directory: /var/folders/z3/skd_zhyn1b3g4vlnycmz7kw00000gp/T/dask-worker-space/worker-onrk9yfo,Local directory: /var/folders/z3/skd_zhyn1b3g4vlnycmz7kw00000gp/T/dask-worker-space/worker-onrk9yfo

0,1
Comm: tcp://127.0.0.1:53894,Total threads: 2
Dashboard: http://127.0.0.1:53900/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:53878,
Local directory: /var/folders/z3/skd_zhyn1b3g4vlnycmz7kw00000gp/T/dask-worker-space/worker-q0ciff50,Local directory: /var/folders/z3/skd_zhyn1b3g4vlnycmz7kw00000gp/T/dask-worker-space/worker-q0ciff50


In [24]:
# Read the normalized dataset into a Dask DataFrame
# We continue to work with the Dask DataFrame as long as possible as it supports lazy evaluation
normalized = dd.read_parquet(f"archive/otto-recsys-train.jsonl-parquet")
normalized.compute() ## show preview

Unnamed: 0,aid,ts,type,session
0,1517085,1659304800025,clicks,0
1,1563459,1659304904511,clicks,0
2,1309446,1659367439426,clicks,0
3,16246,1659367719997,clicks,0
4,1781822,1659367871344,clicks,0
...,...,...,...,...
960804,1737908,1661723987073,clicks,12899776
960805,384045,1661723976974,clicks,12899777
960806,384045,1661723986800,clicks,12899777
960807,561560,1661723983611,clicks,12899778


In [25]:
mask = normalized["type"] == "orders"  ## only orders

orders = normalized[mask]  ## filter out orders


sessions_with_orders = len(orders["session"].unique())
session_number = len(normalized["session"].unique())
vocabulary = (
    normalized["aid"].unique().compute().tolist()
)  # we need this for the random baseline model later
unique_aids = len(vocabulary)
percentage_with_purchase = sessions_with_orders / session_number * 100

total_orders = len(orders)
average_order_per_buying_session = total_orders / sessions_with_orders

normalized[mask].groupby("session")

MINIMUM_ORDERS = 2

sessions_with_more_than_k_orders = orders.map_partitions(
    lambda df: df.groupby("session").filter(lambda x: len(x) > MINIMUM_ORDERS)
) ## we assume that each dask-partition contains full-sessions. TODO: recheck assumption

num_sessions_with_more_than_k_orders = len(
    sessions_with_more_than_k_orders["session"].unique()
)

print(f"Number of unique aids: {unique_aids}")

print(f"Total number of orders: {total_orders}")
print(
    f"Average number of orders per buying session: {average_order_per_buying_session}"
)

print(
    f"Number of sessions with more than 3 orders: {num_sessions_with_more_than_k_orders}"
)

print(f"Number of sessions with orders: {sessions_with_orders}")
print(f"Percentage of sessions with orders: {percentage_with_purchase:.2f}%")

Number of unique aids: 1855603
Total number of orders: 5098951
Average number of orders per buying session: 3.1352344961502467
Number of sessions with more than 3 orders: 602169
Number of sessions with orders: 1626338
Percentage of sessions with orders: 12.61%


In [26]:
# We find the 10 most and least popular `aid`s (basically solves problem 1)
orders["aid"].value_counts().compute()

# TODO: plot value counts to see distribution

231487     4485
166037     3824
1733943    3042
1445562    2998
1022566    2788
           ... 
1010520       1
1010518       1
1010505       1
1010497       1
1855600       1
Name: aid, Length: 657940, dtype: int64

In [27]:
filtered = sessions_with_more_than_k_orders[["aid","session"]] # discard timestamps and type
filtered.compute()

Unnamed: 0,aid,session
8,305831,0
9,461689,0
245,1199474,0
246,543308,0
360,357461,3
...,...,...
959388,465366,12899355
960085,1488793,12899525
960086,1599360,12899525
960087,996393,12899525


In [28]:
# we get the final session data in memory.
# NOTE: This does not scale for even larger datasets, but it is not clear if gensim supports dask dataframes
# At least this fits the requirement that the input needs to be a "restartable" iterator.
orders_by_session: List[List[int]] = (
    filtered.compute().groupby("session").agg(list)["aid"].values.tolist()
) 
print(len(orders_by_session))

602169


In [29]:
# We split the data into train, test and validation sets
from split import train_test_split
print(f"orders_by_session has {len(orders_by_session)} sessions")
test_size = 3000 # this affects the scoring time and accuracy
assert test_size < len(orders_by_session), "test_size must be smaller than the number of sessions"

train, test, valid = train_test_split(orders_by_session, test_size=test_size)

orders_by_session has 602169 sessions


In [32]:
"""
We define a logger to keep track of the loss function progress while training the model.
(Not neccessary, but feels nice to have).
We also store the losses in a list to possibly plot them later to visualize where the model plateus.
"""

from gensim.models.callbacks import CallbackAny2Vec
from tqdm import tqdm
class LossLogger(CallbackAny2Vec):
    """Output loss at each epoch"""

    def __init__(self, epochs):
        self.epoch = 1
        self.epochs = epochs
        self.losses = []
        self.progress_bar = tqdm(total=epochs,unit="epoch")

    def on_epoch_end(self, model):
        self.progress_bar.update(1)

        if self.epoch == self.epochs:
            self.progress_bar.close()
            losses_with_indices = [x for x in enumerate(self.losses)]
            print(f"Losses: {losses_with_indices}")

        loss = model.get_latest_training_loss()
        last_loss = self.losses[self.epoch - 2] if len(self.losses) > 0 else None
        delta = (
            loss - last_loss
            if last_loss is not None
            else loss
        )
        self.losses.append(delta)
        self.epoch += 1

In [39]:
"""
Finally, we train the model and score it.
We then compare it to the score of a random baseline model.
"""

from gensim.models import Word2Vec
from evaluation import recall_at_k, mrr_at_k, RandomEmbeddings
import os

epochs = 50

print(f"Training Word2Vec model on {len(train)} sessions")

loss_logger = LossLogger(epochs)

model = Word2Vec(
    train,
    min_count=1,
    sg=1,
    workers=8,
    epochs=epochs,
    window=10,
    compute_loss=True,
    callbacks=[loss_logger],
)

print("Saving model")
model.save("word2vec.model")
model_size_in_mb = os.path.getsize("word2vec.model") / (1024 * 1024)
print(f"Model size: {model_size_in_mb:.2f}mb")


embeddings = model.wv  # "word vectors"

Training Word2Vec model on 602169 sessions


100%|██████████| 50/50 [03:19<00:00,  4.00s/epoch]


Losses: [(0, 5659276.5), (1, 3746996.5), (2, 8398153.5), (3, 6020292.5), (4, 10385297.5), (5, 7667756.5), (6, 11930071.5), (7, 9083752.5), (8, 13314393.5), (9, 10429106.5), (10, 14545851.5), (11, 11587684.5), (12, 15642107.5), (13, 12589680.5), (14, 16606531.5), (15, 13463648.5), (16, 17453265.5), (17, 14242932.5), (18, 18204473.5), (19, 14939100.5), (20, 18727839.5), (21, 15214452.5), (22, 18965211.5), (23, 15478168.5), (24, 19210395.5), (25, 15726204.5), (26, 19446383.5), (27, 15961904.5), (28, 19672775.5), (29, 16192700.5), (30, 19897739.5), (31, 16410500.5), (32, 20114899.5), (33, 16625408.5), (34, 20324131.5), (35, 16835512.5), (36, 20529163.5), (37, 17041936.5), (38, 20728767.5), (39, 17239216.5), (40, 20929467.5), (41, 17433972.5), (42, 21126727.5), (43, 17629556.5), (44, 21316575.5), (45, 17825524.5), (46, 21504959.5), (47, 18016060.5), (48, 21694679.5)]
Saving model
Model size: 12.46mb


In [41]:
# Score the model against the random model

K = 20

random_embeddings = RandomEmbeddings(vocabulary)

print(f"Evaluating model with 'recall@{K}'")
recall_score = recall_at_k(tqdm(test), embeddings, k=K)
recall_score_random = recall_at_k(tqdm(test), random_embeddings, k=K)
print(f"Model recall@{K} score: {recall_score}")
print(f"Random recall@{K} score: {recall_score_random}")

print(f"Evaluating model with 'mrr@{K}'")
mrr_score = mrr_at_k(tqdm(test), embeddings, k=K)
mrr_score_random = mrr_at_k(tqdm(test), random_embeddings, k=K)
print(f"Model mrr@{K} score: {mrr_score}")
print(f"Random mrr@{K} score: {mrr_score_random}")

Evaluating model with 'recall@20'


100%|██████████| 3000/3000 [00:21<00:00, 141.93it/s]
100%|██████████| 3000/3000 [03:35<00:00, 13.90it/s]


0.032
Random score: 0.0
Evaluating model with 'mrr@20'


100%|██████████| 3000/3000 [00:21<00:00, 139.19it/s]
100%|██████████| 3000/3000 [03:36<00:00, 13.87it/s]

0.014594468440056685
Random mrr score: 0.0



