In [1]:
%%capture

!pip install nvtabular==1.3.3 merlin-models polars merlin-core==v0.4.0 dask_cuda

# Data Processing

In [2]:
from nvtabular import *
from merlin.schema.tags import Tags
import polars as pl
import xgboost as xgb

from merlin.core.utils import Distributed
from merlin.models.xgb import XGBoost
from nvtabular.ops import AddTags

In [3]:
train = pl.read_parquet('/kaggle/input/otto-train-and-test-data-for-local-validation/test.parquet')
train_labels = pl.read_parquet('/kaggle/input/otto-train-and-test-data-for-local-validation/test_labels.parquet')


def add_action_num_reverse_chrono(df):
    return df.select(
        pl.col('*'),
        (pl.col('session').cumcount().reverse().over('session')).alias('action_num_reverse_chrono')
    )

def add_session_length(df):
    return df.select(
        pl.col('*'),
        (pl.col('session').count().over('session')).alias('session_length')
    )

def add_log_recency_score(df):
    linear_interpolation = 0.1 + ((1 - 0.1) / (df['session_length'] - 1)) * (df['session_length'] - df['action_num_reverse_chrono'] - 1)
    log_recency_score = (pl.Series(2**linear_interpolation - 1)).alias('log_recency_score')
    return df.select('*', log_recency_score).fill_null(1)


def add_type_weighted_log_recency_score(df):
    type_weights = {0: 1, 1: 6, 2: 3}
    type_weighted_log_recency_score = (pl.Series(df['type'].apply(lambda x: type_weights[x]) * df['log_recency_score'])).alias('type_weighted_log_recency_score')
    return df.select('*', type_weighted_log_recency_score)

def apply(df, pipeline):
    for f in pipeline:
        df = f(df)
    return df

pipeline = [add_action_num_reverse_chrono, add_session_length, add_log_recency_score, add_type_weighted_log_recency_score]

train = apply(train, pipeline)

type2id = {"clicks": 0, "carts": 1, "orders": 2}


train_labels = train_labels.explode('ground_truth').select(
    pl.col('session').cast(pl.datatypes.Int32).alias('session'),
    pl.col('ground_truth').alias('aid'),
    pl.lit(str(type2id['orders'])).alias('type')
).select('session', 'type', 'aid')


train_labels = train_labels.select(
    pl.col('session').cast(pl.datatypes.Int32).alias('session'),
    pl.col('type').cast(pl.datatypes.UInt8).alias('type'),
    pl.col('aid').apply(lambda x: [int(i) for i in x.split(',')] if isinstance(x, str) else [x]).apply(lambda x: [int(i) for i in x]).apply(lambda x: x[0]).cast(pl.Int32).alias('aid')

)

train_labels = train_labels.select('*', pl.lit(1).alias('gt_label'))
train = train.join(train_labels, how='left', on=['session', 'type', 'aid']).select('*', pl.col('gt_label').alias('gt').fill_null(0))


In [4]:
train_labels

session,type,aid,gt_label
i32,u8,i32,i32
11098528,2,1679529,1
11098528,2,1199737,1
11098528,2,990658,1
11098528,2,950341,1
11098528,2,1462506,1
11098528,2,1561739,1
11098528,2,907564,1
11098528,2,369774,1
11098528,2,440367,1
11098528,2,92401,1


In [5]:
train

session,aid,ts,type,action_num_reverse_chrono,session_length,log_recency_score,type_weighted_log_recency_score,gt_label,gt
i32,i32,i32,u8,u32,u32,f64,f64,i32,i32
11098528,11830,1661119200,0,0,1,,,,0
11098529,1105029,1661119200,0,0,1,,,,0
11098530,264500,1661119200,0,5,6,0.071773,0.071773,,0
11098530,264500,1661119288,0,4,6,0.214195,0.214195,,0
11098530,409236,1661119369,0,3,6,0.375542,0.375542,,0
11098530,409236,1661119441,0,2,6,0.558329,0.558329,,0
11098530,409236,1661120165,0,1,6,0.765406,0.765406,,0
11098530,409236,1661120532,1,0,6,1.0,6.0,,0
11098531,452188,1661119200,0,23,24,0.071773,0.071773,,0
11098531,1239060,1661119227,0,22,24,0.101241,0.101241,,0


Let us now define the preprocessing steps we would like to apply to our data.

In [None]:
train_ds = Dataset(train.to_pandas())

feature_cols = ['aid', 'type','action_num_reverse_chrono', 'session_length', 'log_recency_score', 'type_weighted_log_recency_score']
target = ['gt'] >> AddTags([Tags.TARGET])
qid_column = ['session'] >>  AddTags([Tags.USER_ID]) # we will use sessions as a query ID column
                                                     # in XGBoost parlance this a way of grouping together for training
                                                     # when training with LGBM we had to calculate session lengths, but here the model does all the work for us!

Having defined the preprocessing steps, we can now apply them to our data. The preprocessing is going to run on the GPU!

In [None]:
wf = Workflow(feature_cols + target + qid_column)
train_processed = wf.fit_transform(train_ds)

# Model training

In [None]:
ranker = XGBoost(train_processed.schema, objective='rank:pairwise')

The `Distributed` context manager will start a dask cudf cluster of us. A Dask cluster will be able to better manage memory usage for us. Normally, setting it up would be quite tedious -- here, we get all the benefits with a single line of Python code!

In [None]:
# version mismatch doesn't result in a loss of functionality here for us
# it stems from the versions of libraries that the Kaggle vm comes preinstalled with

with Distributed():
    ranker.fit(train_processed)

We have now trained our model! Let's predict on test!

# Predict on test data

Let's load our test set, process it and predict on it.

In [None]:
test = pl.read_parquet('/kaggle/input/otto-full-optimized-memory-footprint/test.parquet')
test = apply(test, pipeline)
test_ds = Dataset(test.to_pandas())

wf = wf.remove_inputs(['gt']) # we don't have ground truth information in test!

test_ds_transformed = wf.transform(test_ds)

Let's output the predictions

In [None]:
test_preds = ranker.booster.predict(xgb.DMatrix(test_ds_transformed.compute()))

# Create submission

In [None]:
test = test.with_columns(pl.Series(name='score', values=test_preds))
test_predictions = test.sort(['session', 'score'], reverse=True).groupby('session').agg([
    pl.col('aid').limit(20).list()
])

In [None]:
session_types = []
labels = []

for session, preds in zip(test_predictions['session'].to_numpy(), test_predictions['aid'].to_numpy()):
    l = ' '.join(str(p) for p in preds)
    for session_type in ['clicks', 'carts', 'orders']:
        labels.append(l)
        session_types.append(f'{session}_{session_type}')

In [None]:
submission = pl.DataFrame({'session_type': session_types, 'labels': labels})
submission.write_csv('submission.csv')

In [None]:
submission