In this notebook, we will train an `XGBoost Ranker` on the GPU and use it to make predictions.

Training with varied architectures and ensembling (please see [💡 [2 methods] How-to ensemble predictions 🏅🏅🏅](https://www.kaggle.com/code/radek1/2-methods-how-to-ensemble-predictions) for a tutorial on ensembling) can offer you a significant jump on the LB!

Training with `XGBoost`, however, offers additional advantages. In comparison to `LGBM`, `XGBoost` allows you to train with the following ranking objectives (`LGBM` gives you fewer ranking losses to choose from, and training with different objectives is a great way of improving your ensemble!):
* `rank:pairwise`
* `rank:ndcg`
* `rank:map`
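To make the three objectives more concrete, here is a minimal plain-Python sketch of the quantities they are built around, computed for a single query (one session): a pairwise logistic loss (the idea behind `rank:pairwise`), NDCG (`rank:ndcg`) and average precision (`rank:map`; its mean over queries is the `map` metric printed in the training log further down). The function names are hypothetical, and we assume binary relevance labels and the exponential gain `2**rel - 1`. XGBoost's own implementation uses LambdaMART-style gradients and may differ in details such as gain and truncation, so treat this as an illustration rather than a reimplementation.

```python
import math


def pairwise_logistic_loss(scores, labels):
    """Mean logistic loss over all pairs (i, j) where item i is more relevant than item j."""
    losses = [
        math.log1p(math.exp(-(scores[i] - scores[j])))
        for i in range(len(scores))
        for j in range(len(scores))
        if labels[i] > labels[j]
    ]
    return sum(losses) / len(losses) if losses else 0.0


def ranking_order(scores):
    """Indices of the items sorted by descending model score."""
    return sorted(range(len(scores)), key=lambda i: -scores[i])


def ndcg(scores, labels):
    """DCG of the model's ranking divided by the DCG of the ideal ranking."""
    dcg = sum((2 ** labels[i] - 1) / math.log2(rank + 2) for rank, i in enumerate(ranking_order(scores)))
    ideal = sorted(labels, reverse=True)
    idcg = sum((2 ** rel - 1) / math.log2(rank + 2) for rank, rel in enumerate(ideal))
    return dcg / idcg if idcg > 0 else 0.0


def average_precision(scores, labels):
    """Mean of precision@rank over the ranks where a relevant (label 1) item appears."""
    hits, precisions = 0, []
    for rank, i in enumerate(ranking_order(scores), start=1):
        if labels[i] > 0:
            hits += 1
            precisions.append(hits / rank)
    return sum(precisions) / len(precisions) if precisions else 0.0


scores = [0.9, 0.2, 0.5, 0.1]  # model scores for four candidate aids
labels = [1, 0, 0, 1]          # 1 = the aid is in the session's ground truth
print(pairwise_logistic_loss(scores, labels), ndcg(scores, labels))
print(average_precision(scores, labels))  # → 0.75, i.e. (1/1 + 2/4) / 2
```

Each objective rewards a different notion of a "good" ranking, which is exactly why models trained with them tend to make different mistakes and ensemble well.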

On top of that, we will train on the GPU! 🔥 A GPU can offer a significant speed-up, letting you train more and bigger models in less time. However, when training on the GPU with large amounts of tabular data, you can easily run into problems, such as loading the data onto the GPU in chunks for processing or managing memory.

Since we want to focus on feature engineering and training, let's offload all the low-level, tedious considerations to the `Merlin Framework`!

In this notebook, we will introduce the entire pipeline. We will preprocess our data on the GPU using `NVTabular`, a library specifically designed for tabular data preprocessing. We will then train our `XGBoost` model with `Merlin Models`. In the background, we will leverage `dask_cuda` and distributed training to make the best use of the available GPU RAM, but we will let the libraries handle all of that! No additional configuration will be required from us.

Let's get started!

## Other resources you might find useful:

* [💡 [2 methods] How-to ensemble predictions 🏅🏅🏅](https://www.kaggle.com/code/radek1/2-methods-how-to-ensemble-predictions)
* [co-visitation matrix - simplified, imprvd logic 🔥](https://www.kaggle.com/code/radek1/co-visitation-matrix-simplified-imprvd-logic)
* [💡 Word2Vec How-to [training and submission]🚀🚀🚀](https://www.kaggle.com/code/radek1/word2vec-how-to-training-and-submission)
* [local validation tracks public LB perfecty -- here is the setup](https://www.kaggle.com/competitions/otto-recommender-system/discussion/364991)
* [💡 For my friends from Twitter and LinkedIn -- here is how to dive into this competition 🐳](https://www.kaggle.com/competitions/otto-recommender-system/discussion/368560)
* [Full dataset processed to CSV/parquet files with optimized memory footprint](https://www.kaggle.com/competitions/otto-recommender-system/discussion/363843)

# Library installation

We will need a couple of libraries that do not come preinstalled on the Kaggle VM. Let's install them here.

In [26]:
!pip install nvtabular==1.3.3 merlin-models polars merlin-core==v0.4.0 dask_cuda

# Data Processing

We will briefly preprocess our data using polars (this step also generates Word2Vec-based candidates). After that, we will hand it over to `NVTabular` to tag our data (so that our model knows where to find the information it needs for training).

In [27]:
from nvtabular import Dataset, Workflow
from merlin.schema.tags import Tags
import polars as pl
import xgboost as xgb

from merlin.core.utils import Distributed
from merlin.models.xgb import XGBoost
from nvtabular.ops import AddTags

In [28]:
print("start")

import time
import gc
import numpy as np
from collections import defaultdict

from gensim.models import Word2Vec

all_train = pl.read_parquet('/kaggle/input/otto-full-optimized-memory-footprint/train.parquet')
test = pl.read_parquet('/kaggle/input/otto-full-optimized-memory-footprint/test.parquet')

all_train = all_train.with_columns([
    pl.col('session').cast(pl.datatypes.Int32),
    pl.col('type').cast(pl.datatypes.UInt8),
    pl.col('aid').cast(pl.datatypes.Int32),
    pl.col('ts').cast(pl.datatypes.Int64)
])

test = test.with_columns([
    pl.col('session').cast(pl.datatypes.Int32),
    pl.col('type').cast(pl.datatypes.UInt8),
    pl.col('aid').cast(pl.datatypes.Int32),
    pl.col('ts').cast(pl.datatypes.Int64)
])

##### for Word2Vec pretrain
sentences_df = pl.concat([all_train, test]).groupby('session').agg(
    pl.col('aid').alias('sentence')
)

sentences = sentences_df['sentence'].to_list()

del all_train
gc.collect()

train = pl.read_parquet('/kaggle/input/otto-train-and-test-data-for-local-validation/test.parquet')

train = train.with_columns([
    pl.col('session').cast(pl.datatypes.Int32),
    pl.col('type').cast(pl.datatypes.UInt8),
    pl.col('aid').cast(pl.datatypes.Int32),
    pl.col('ts').cast(pl.datatypes.Int64)
])

print("w2vec start")
start = time.time()

w2vec = Word2Vec(sentences=sentences, vector_size=32, min_count=1, workers=4)

print("w2vec end")
end = time.time()
print(f'Execution time: {end - start} s\n')

from annoy import AnnoyIndex

aid2idx = {aid: i for i, aid in enumerate(w2vec.wv.index_to_key)}
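index = AnnoyIndex(32, 'euclidean')  # 32 must match the Word2Vec vector_size

for aid, idx in aid2idx.items():
    index.add_item(idx, w2vec.wv.vectors[idx])

index.build(10)  # 10 trees; more trees give more precise neighbours but take longer to build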
#####

###
train_labels = pl.read_parquet('/kaggle/input/otto-train-and-test-data-for-local-validation/test_labels.parquet')

def word2vec_candidate(df):
    # convert to pandas once, then collect each session's aids, event types and id (in event order)
    pdf = df.to_pandas().reset_index(drop=True)
    df_session_AIDs = pdf.groupby('session')['aid'].apply(list)
    df_session_types = pdf.groupby('session')['type'].apply(list)
    df_session_num = pdf.groupby('session')['session'].apply(list)

    # candidate (session, aid) pairs
    label_sessions = []
    label_aids = []

    print("candidate calc start")
    start = time.time()

    type_weight_multipliers = {0: 1, 1: 6, 2: 3}
    for AIDs, types, session_num in zip(df_session_AIDs, df_session_types, df_session_num):
        session_num = session_num[0]
            
        if len(AIDs) >= 20:
            # if we have enough aids (20 or more), we don't need to look for candidates! we just use the old logic
            weights=np.logspace(0.1,1,len(AIDs),base=2, endpoint=True)-1
            aids_temp=defaultdict(lambda: 0)
            for aid,w,t in zip(AIDs,weights,types): 
                aids_temp[aid]+= w * type_weight_multipliers[t]
                
            sorted_aids=[k for k, v in sorted(aids_temp.items(), key=lambda item: -item[1])]

            if len(sorted_aids) < 20:
                AIDs = list(dict.fromkeys(AIDs[::-1]))
                # let's grab the most recent aid
                most_recent_aid = AIDs[0]
                # and look for some neighbors!
                nns = [w2vec.wv.index_to_key[i] for i in index.get_nns_by_item(aid2idx[most_recent_aid], 21)[1:]]
                # append the neighbors, skipping any aid that is already in the list
                sorted_aids = list(dict.fromkeys(sorted_aids + nns))

            top_aids = sorted_aids[:20]
            # one session entry per candidate, so both lists always have the same length
            label_sessions.extend([session_num] * len(top_aids))
            label_aids.extend(top_aids)

        else:
            # here we don't have 20 aids to output -- we will use word2vec embeddings to generate candidates!
            AIDs = list(dict.fromkeys(AIDs[::-1]))
            # let's grab the most recent aid
            most_recent_aid = AIDs[0]
            # and look for some neighbors!
            nns = [w2vec.wv.index_to_key[i] for i in index.get_nns_by_item(aid2idx[most_recent_aid], 21)[1:]]

            # append the neighbors, skipping any aid the session already contains
            candidate_aids = list(dict.fromkeys(AIDs + nns))[:20]
            label_sessions.extend([session_num] * len(candidate_aids))
            label_aids.extend(candidate_aids)

    candidates = pl.DataFrame({"session": label_sessions, "aid":label_aids})
    candidates = candidates.with_columns([
        pl.col('session').cast(pl.datatypes.Int32),
        pl.col('aid').cast(pl.datatypes.Int32),
    ])
    
    print('candidates')
    print(candidates)
    
    print("candidate calc end")
    end = time.time()
    print(f'Execution time: {end - start} s\n')
    
    # 1-based position of each candidate within its session
    candidates = candidates.with_column((pl.col('aid').cumcount().over('session') + 1).alias('word2vec_rank'))
    candidates = candidates.with_columns([
        pl.col('session').cast(pl.datatypes.Int32),
    ])
    
    df = df.join(candidates, on=['session', 'aid'], how='outer').sort("session")
    return df

def add_action_num_reverse_chrono(df):
    return df.select([
        pl.col('*'),
        pl.col('session').cumcount().reverse().over('session').alias('action_num_reverse_chrono')
    ])

def add_session_length(df):
    return df.select([
        pl.col('*'),
        pl.col('session').count().over('session').alias('session_length')
    ])

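def add_log_recency_score(df):
    # interpolate linearly from 0.1 (oldest event) to 1.0 (most recent event), then map x -> 2**x - 1;
    # one-event sessions divide by zero and produce NaN, which fill_nan(1) replaces with 1
    linear_interpolation = 0.1 + ((1-0.1) / (df['session_length']-1)) * (df['session_length']-df['action_num_reverse_chrono']-1)
    return df.with_columns(pl.Series(2**linear_interpolation - 1).alias('log_recency_score')).fill_nan(1)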

def add_type_weighted_log_recency_score(df):
    # same multipliers as in word2vec_candidate: clicks 1, carts 6, orders 3
    type_weights = {0:1, 1:6, 2:3}
    type_weighted_log_recency_score = pl.Series(df['log_recency_score'] * df['type'].apply(lambda x: type_weights[x]))
    return df.with_column(type_weighted_log_recency_score.alias('type_weighted_log_recency_score'))

def apply(df, pipeline):
    for f in pipeline:
        df = f(df)
    return df

pipeline = [add_action_num_reverse_chrono, add_session_length, add_log_recency_score, add_type_weighted_log_recency_score, word2vec_candidate]

train = apply(train, pipeline)

gc.collect()

type2id = {"clicks": 0, "carts": 1, "orders": 2}

train_labels = train_labels.explode('ground_truth').with_columns([
    pl.col('ground_truth').alias('aid'),
    pl.col('type').apply(lambda x: type2id[x])
])[['session', 'type', 'aid']]

train_labels = train_labels.with_columns([
    pl.col('session').cast(pl.datatypes.Int32),
    pl.col('type').cast(pl.datatypes.UInt8),
    pl.col('aid').cast(pl.datatypes.Int32)
])

train_labels = train_labels.with_column(pl.lit(1).alias('gt'))

train = train.join(train_labels, how='left', on=['session', 'type', 'aid']).with_column(pl.col('gt').fill_null(0))

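def get_session_lengths(df):
    # group sizes in row order; only needed for LGBM-style ranking (XGBoost groups rows by the qid column instead)
    return df.groupby('session', maintain_order=True).agg([
        pl.col('session').count().alias('session_length')
    ])['session_length'].to_numpy()

session_lengths_train = get_session_lengths(train)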

start
w2vec start
w2vec end
Execution time: 0.2642664909362793 s

candidate calc start
candidates
shape: (2000, 2)
┌─────────┬─────────┐
│ session ┆ aid     │
│ ---     ┆ ---     │
│ i32     ┆ i32     │
╞═════════╪═════════╡
│ 0       ┆ 974651  │
│ 0       ┆ 543308  │
│ 0       ┆ 1199474 │
│ 0       ┆ 1549618 │
│ ...     ┆ ...     │
│ 99      ┆ 1262469 │
│ 99      ┆ 1642686 │
│ 99      ┆ 82799   │
│ 99      ┆ 746209  │
└─────────┴─────────┘
candidate calc end
Execution time: 0.02591562271118164 s
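The feature and candidate logic in the cell above can be summarized in plain Python. The sketch below uses hypothetical function names and toy 2-D vectors standing in for the 32-dimensional Word2Vec embeddings. It shows the recency weights (which agree with both `np.logspace(0.1, 1, n, base=2) - 1` and the `log_recency_score` formula), the type-weighted scoring used for sessions with at least 20 events, and the nearest-neighbour fallback. As an assumption for illustration, the neighbour search here is exact Euclidean search, whereas the notebook uses Annoy's approximate search.

```python
import math

TYPE_WEIGHTS = {0: 1, 1: 6, 2: 3}  # clicks, carts, orders (same multipliers as the notebook)


def log_recency_weights(n):
    """2**x - 1 for x spaced linearly from 0.1 (oldest event) to 1.0 (newest event)."""
    if n == 1:
        return [1.0]  # add_log_recency_score maps single-event sessions to 1
    return [2 ** (0.1 + 0.9 * k / (n - 1)) - 1 for k in range(n)]


def score_history(aids, types):
    """Aids ranked by summed recency weight x type multiplier, highest first."""
    totals = {}
    for aid, w, t in zip(aids, log_recency_weights(len(aids)), types):
        totals[aid] = totals.get(aid, 0.0) + w * TYPE_WEIGHTS[t]
    return sorted(totals, key=lambda a: -totals[a])


def nearest_neighbours(query, vectors, k):
    """Exact Euclidean k-NN excluding the query itself (Annoy approximates this)."""
    others = [a for a in vectors if a != query]
    others.sort(key=lambda a: math.dist(vectors[query], vectors[a]))
    return others[:k]


def generate_candidates(aids, types, vectors, k=20):
    """aids/types are one session's events in chronological order."""
    if len(aids) >= k:
        ranked = score_history(aids, types)
    else:
        ranked = list(dict.fromkeys(reversed(aids)))  # most recent first, no repeats
    if len(ranked) < k:
        # top up with neighbours of the most recent aid, skipping aids already present
        ranked = list(dict.fromkeys(ranked + nearest_neighbours(aids[-1], vectors, k)))
    return ranked[:k]


vectors = {1: (0.0, 0.0), 2: (1.0, 0.0), 3: (0.0, 2.0), 4: (5.0, 5.0), 5: (0.5, 0.0)}
print(generate_candidates([1, 2, 1], [0, 0, 1], vectors, k=4))  # → [1, 2, 5, 3]
```

Deduplicating with `dict.fromkeys` keeps the first occurrence, so an aid the user already interacted with always outranks the same aid suggested as a neighbour.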



Let us now define the preprocessing steps we would like to apply to our data.

In [29]:
train_ds = Dataset(train.to_pandas())

feature_cols = ['aid', 'type','action_num_reverse_chrono', 'session_length', 'log_recency_score', 'type_weighted_log_recency_score', 'word2vec_rank']
target = ['gt'] >> AddTags([Tags.TARGET])
# we will use sessions as a query ID column; in XGBoost parlance, this is a way of grouping rows together for training
# (when training with LGBM we had to calculate session lengths, but here the model does all the work for us!)
qid_column = ['session'] >> AddTags([Tags.USER_ID])
gc.collect()

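Grouping by a query ID only works if all rows of a query sit next to each other; as far as we know, XGBoost's ranking documentation asks for sorted `qid` values, and the `.sort("session")` at the end of `word2vec_candidate` takes care of that here. The hypothetical helper below shows, in plain Python, how a contiguous query ID column maps to the per-query group sizes that LGBM-style training needs, and how a column that is not grouped by query gets caught.

```python
from itertools import groupby


def group_sizes_from_qid(qids):
    """Sizes of consecutive runs of equal query ids, e.g. sessions -> group sizes."""
    sizes = [len(list(run)) for _, run in groupby(qids)]
    # if a query id shows up in two separate runs, the rows were not grouped by query
    if len(sizes) != len(set(qids)):
        raise ValueError("rows must be grouped (e.g. sorted) by query id")
    return sizes


print(group_sizes_from_qid([7, 7, 7, 3, 3, 9]))  # → [3, 2, 1]
```

This is roughly the bookkeeping that the `session_lengths_train` array did for LGBM; passing the session column as the query ID lets the library derive it instead.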

Having defined the preprocessing steps, we can now apply them to our data. The preprocessing is going to run on the GPU!

In [30]:
wf = Workflow(feature_cols + target + qid_column)
train_processed = wf.fit_transform(train_ds)

# Model training

In [31]:
ranker = XGBoost(train_processed.schema, objective='rank:pairwise')

The `Distributed` context manager will start a Dask cluster for us (using `dask_cuda` under the hood), and a Dask cluster can manage memory usage much better. Normally, setting it up would be quite tedious; here, we get all the benefits with a single line of Python code!

In [32]:
# a version mismatch warning doesn't result in a loss of functionality for us here;
# it stems from the library versions that come preinstalled on the Kaggle VM

with Distributed():
    ranker.fit(train_processed)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 41677 instead
  f"Port {expected} is already in use.\n"


[0]	train-map:1.00000
[1]	train-map:1.00000
[2]	train-map:1.00000
[3]	train-map:1.00000
[4]	train-map:1.00000
[5]	train-map:1.00000
[6]	train-map:1.00000
[7]	train-map:1.00000
[8]	train-map:1.00000
[9]	train-map:1.00000


[07:38:52] task [xgboost.dask-1]:tcp://127.0.0.1:35091 got new rank 0


We have now trained our model! Let's predict on test!

# Predict on test data

Let's load our test set, process it and predict on it.

In [33]:
test = apply(test, pipeline)
test_ds = Dataset(test.to_pandas())

wf = wf.remove_inputs(['gt']) # we don't have ground truth information in test!

test_ds_transformed = wf.transform(test_ds)

candidate calc start
candidates
shape: (2000, 2)
┌──────────┬─────────┐
│ session  ┆ aid     │
│ ---      ┆ ---     │
│ i32      ┆ i32     │
╞══════════╪═════════╡
│ 12899779 ┆ 59625   │
│ 12899779 ┆ 1219653 │
│ 12899779 ┆ 977788  │
│ 12899779 ┆ 646708  │
│ ...      ┆ ...     │
│ 12899878 ┆ 1597140 │
│ 12899878 ┆ 1040641 │
│ 12899878 ┆ 1689148 │
│ 12899878 ┆ 372062  │
└──────────┴─────────┘
candidate calc end
Execution time: 0.010401248931884766 s



Let's generate the predictions.

In [34]:
test_preds = ranker.booster.predict(xgb.DMatrix(test_ds_transformed.compute()))

# Create submission

In [35]:
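test = test.with_columns(pl.Series(name='score', values=test_preds))
# highest-scoring aids first; unique() drops aids that appear on several rows (repeated events)
test_predictions = test.sort(['session', 'score'], reverse=True).groupby('session').agg([
    pl.col('aid').unique(maintain_order=True).limit(20).list()
])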
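The sort-and-aggregate step above, together with the submission formatting in the next cells, can be sketched in plain Python with hypothetical helper names. One assumption is worth calling out: because an aid can appear on several rows of `test` (one per event), the sketch keeps only each aid's best score, so that the 20 slots are filled with distinct aids.

```python
def top_k_per_session(rows, k=20):
    """rows: (session, aid, score) tuples -> {session: [aid, ...]}, best first, no repeats."""
    best = {}
    for session, aid, score in rows:
        key = (session, aid)
        if key not in best or score > best[key]:
            best[key] = score  # keep the highest score seen for this aid
    per_session = {}
    for (session, aid), score in best.items():
        per_session.setdefault(session, []).append((score, aid))
    return {
        session: [aid for _, aid in sorted(pairs, key=lambda p: -p[0])[:k]]
        for session, pairs in per_session.items()
    }


def submission_rows(top):
    """One row per session and event type, reusing the same aid list for all three types."""
    return [
        (f"{session}_{event_type}", " ".join(str(aid) for aid in aids))
        for session, aids in top.items()
        for event_type in ("clicks", "carts", "orders")
    ]


rows = [(1, 10, 0.2), (1, 11, 0.9), (1, 10, 0.5), (2, 20, 0.1), (1, 12, 0.3)]
top = top_k_per_session(rows, k=2)
print(top)                      # → {1: [11, 10], 2: [20]}
print(submission_rows(top)[0])  # → ('1_clicks', '11 10')
```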

In [36]:
session_types = []
labels = []

for session, preds in zip(test_predictions['session'].to_numpy(), test_predictions['aid'].to_numpy()):
    l = ' '.join(str(p) for p in preds)
    for session_type in ['clicks', 'carts', 'orders']:
        labels.append(l)
        session_types.append(f'{session}_{session_type}')

In [37]:
submission = pl.DataFrame({'session_type': session_types, 'labels': labels})
submission.write_csv('xgb_submission.csv')