In [1]:
!ls

data  logs  schema.pb  Transformers4Recs_Pipeline.ipynb


In [2]:
import os

import polars as pl

In [3]:
# Directory, relative to the notebook's working directory, used for every data file this notebook reads or writes.
DATA_DIR = "data/"
# Name (inside DATA_DIR) of the parquet file that contains the full dataset.
all_data_fname = "train_popular_70k.parquet"

In [4]:
# Read the complete dataset; low_memory=True is forwarded to polars' parquet reader.
all_data_path = os.path.join(DATA_DIR, all_data_fname)
df_all = pl.read_parquet(all_data_path, low_memory=True)

In [5]:
import json
import os


def _load_session_ids(path):
    """Read a collection of session ids from a JSON file.

    The original loading code decoded the file content twice, which implies
    the files hold a JSON *string* whose text is itself a JSON document.
    That layout is still handled; a file that stores the JSON value directly
    is accepted as well instead of failing with ``TypeError``.

    Args:
        path: Location of the JSON file.

    Returns:
        The decoded JSON value (presumably a list of session ids - verify
        against the code that produced the files).
    """
    # JSON is UTF-8 by specification; do not depend on the platform default encoding.
    with open(path, 'r', encoding='utf-8') as f:
        payload = json.load(f)
    # Decode a second time only when the first pass produced a string.
    return json.loads(payload) if isinstance(payload, str) else payload


# Session ids reserved for the final ("golden") test split.
golden_test_sess_ids = _load_session_ids(os.path.join(DATA_DIR, 'golden_test_sess_ids.json'))
# Session ids from which the training subset is drawn.
train_for_golden_sess_ids = _load_session_ids(os.path.join(DATA_DIR, 'train_for_golden_sess_ids.json'))

In [9]:
# Generate the current training dataset. The session ids were shuffled beforehand,
# so training sessions are now simply taken from the beginning of the id list
# (validation sessions would be taken from the end).
import pathlib  # kept so that `pathlib` stays available to the rest of the notebook

N_train = 4_000_000  # maximum number of training sessions to keep

cur_train_ids = train_for_golden_sess_ids[:N_train]

# Keep only the rows whose session_id is among the selected training ids.
df_cur_train = df_all.filter(pl.col('session_id').is_in(cur_train_ids))

# os.path.join returns a plain string, so the variable is annotated as `str`.
path: str = os.path.join(DATA_DIR, 'cur_train.parquet')
df_cur_train.write_parquet(path)

In [10]:
# Build the aid -> token id vocabulary for the transformer.
# Token ids start at 1 because 0 is reserved for the padding token.
# The unique aids are sorted before numbering: `unique()` gives no ordering
# guarantee, and the mapping has to come out identical on every run for
# previously written token data and checkpoints to stay consistent with it.
unique_aids = df_cur_train.select(
    pl.col("aid_popular").arr.explode().unique().sort()
).to_numpy()

# Each row of `unique_aids` is a one-element array holding a single aid.
aid_to_token_id = {row[0]: token_id for token_id, row in enumerate(unique_aids, start=1)}

# Reverse lookup: token id -> aid.
token_id_to_aid = {token_id: aid for aid, token_id in aid_to_token_id.items()}


In [11]:
# Maximum number of items kept per session sequence.
# NOTE(review): schema.pb hard-codes value_count.max = 20 - presumably these must stay equal; confirm when changing.
MAX_SEQ_LEN = 20

In [12]:
# preparing training data (mapping and truncating)

In [13]:
# Translate every aid of each training session into its token id.
# A plain dict lookup is used, so an aid missing from the mapping raises KeyError.
to_token_ids = pl.col("aid_popular").apply(lambda aids: [aid_to_token_id[aid] for aid in aids])
df_cur_train = df_cur_train.with_columns(to_token_ids.alias("tokid_popular"))

In [14]:
# Keep only the last MAX_SEQ_LEN tokens of each training session.
keep_last_tokens = pl.col("tokid_popular").apply(lambda tokens: tokens[-MAX_SEQ_LEN:])
df_cur_train = df_cur_train.with_columns(keep_last_tokens.alias("tokid_popular_trunc"))

In [15]:
# Persist the tokenised and truncated training set.
# os.path.join returns a plain string, so the variable is annotated as `str`.
path: str = os.path.join(DATA_DIR, 'cur_train_trunc.parquet')
df_cur_train.write_parquet(path)

In [16]:
# preparing test data (mapping and truncating)

In [17]:
# Select the rows that belong to the golden test sessions.
in_golden_test = pl.col('session_id').is_in(golden_test_sess_ids)
df_test = df_all.filter(in_golden_test)

In [18]:
# Translate every aid of each test session into its token id.
# NOTE: the lookup is a plain dict index, so a test aid that is absent from
# `aid_to_token_id` raises KeyError.
test_to_token_ids = pl.col("aid_popular").apply(lambda aids: [aid_to_token_id[aid] for aid in aids])
df_test = df_test.with_columns(test_to_token_ids.alias("tokid_popular"))

In [19]:
# Keep only the last MAX_SEQ_LEN tokens of each test session.
test_keep_last_tokens = pl.col("tokid_popular").apply(lambda tokens: tokens[-MAX_SEQ_LEN:])
df_test = df_test.with_columns(test_keep_last_tokens.alias("tokid_popular_trunc"))

In [20]:
# Persist the tokenised and truncated test set.
# The test frame (`df_test`) must be written here, not the training frame:
# otherwise 'cur_test_trunc.parquet' is a copy of the training data and any
# evaluation on it measures training performance.
# os.path.join returns a plain string, so the variable is annotated as `str`.
path: str = os.path.join(DATA_DIR, 'cur_test_trunc.parquet')
df_test.write_parquet(path)

In [21]:
import os
import ast
import json
import glob
import torch
import warnings
import numpy as np
import pandas as pd
import nvtabular as nvt
import matplotlib.pyplot as plt

from tqdm import tqdm
from datetime import datetime
from collections import Counter

warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=UserWarning)

from nvtabular.ops import *
from merlin.schema.tags import Tags
from merlin_standard_lib import Schema
from transformers4rec import torch as tr
from transformers4rec.torch import Trainer
from transformers4rec.torch.ranking_metric import RecallAt, NDCGAt
from nvtabular.loader.torch import TorchAsyncItr, DLDataLoader
from transformers4rec.config.trainer import T4RecTrainingArguments

  warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")


In [22]:
%%writefile schema.pb

feature {
  name: "tokid_popular_trunc"
  type: INT
  int_domain {
    name: "aid"
    min: 0
    max: 70001
    is_categorical: true
  }
  annotation {
    tag: "item_id"
    tag: "list"
    tag: "categorical"
    tag: "item"
  }
  value_count {
    min: 2
    max: 20
  }
}

Overwriting schema.pb


In [23]:
# Parse the protobuf-text schema file that the %%writefile cell produces.
schema_file = "schema.pb"
schema = Schema().from_proto_text(schema_file)


In [24]:
# Build the sequential input module from the schema, using "mlm" masking.
sequence_input_options = {
    "max_sequence_length": MAX_SEQ_LEN,
    "d_output": 256,
    "masking": "mlm",
}
inputs = tr.TabularSequenceFeatures.from_schema(schema, **sequence_input_options)

In [25]:
# XLNet configuration; anything not set here keeps the Hugging Face XLNet defaults.
d_model = 64  # used for both the MLP layer size and the transformer's d_model
transformer_config = tr.XLNetConfig.build(
    d_model=d_model,
    n_head=8,
    n_layer=6,
    total_seq_length=MAX_SEQ_LEN,
)

# Model body, in order: inputs (with masking) -> MLP projection -> transformer block.
projection = tr.MLPBlock([d_model])
transformer_block = tr.TransformerBlock(transformer_config, masking=inputs.masking)
body = tr.SequentialBlock(inputs, projection, transformer_block)

In [26]:
# Ranking metrics, each reported at cut-offs 10 and 20.
recall_metric = RecallAt(top_ks=[10, 20], labels_onehot=True)
ndcg_metric = NDCGAt(top_ks=[10, 20], labels_onehot=True)
metrics = [recall_metric, ndcg_metric]

# Head for the next-item prediction task, attached to the model body.
next_item_task = tr.NextItemPredictionTask(weight_tying=True, metrics=metrics)
head = tr.Head(body, next_item_task, inputs=inputs)

# End-to-end model built from the head.
model = tr.Model(head)

In [27]:
# Folder under DATA_DIR that is passed to the training arguments as `output_dir`.
log_folder = os.path.join(DATA_DIR, "logs/")

In [28]:
# Training hyperparameters, gathered in one mapping before the arguments object is built.
training_options = dict(
    # data loading
    data_loader_engine='nvtabular',
    dataloader_drop_last=True,
    max_sequence_length=MAX_SEQ_LEN,
    # batching
    per_device_train_batch_size=128,
    per_device_eval_batch_size=128,
    gradient_accumulation_steps=1,
    # optimisation schedule
    learning_rate=0.0005,
    lr_scheduler_type='cosine',
    learning_rate_num_cosine_cycles_by_epoch=1,
    num_train_epochs=4,
    # logging and checkpointing
    output_dir=log_folder,
    report_to=[],
    logging_steps=500,
    save_steps=1000,
    # device selection
    no_cuda=False,
)
train_args = T4RecTrainingArguments(**training_options)

# Trainer that ties together the model, the arguments and the schema.
trainer = Trainer(model=model, args=train_args, schema=schema, compute_metrics=True)

In [29]:
# Point the trainer at the truncated train / test parquet files under DATA_DIR.
train_parquet = os.path.join(DATA_DIR, 'cur_train_trunc.parquet')
eval_parquet = os.path.join(DATA_DIR, 'cur_test_trunc.parquet')
trainer.train_dataset_or_path = train_parquet
trainer.eval_dataset_or_path = eval_parquet

In [None]:
# Run training with the arguments and dataset paths configured on `trainer`.
trainer.train()
# Save the model and a checkpoint once training has finished.
# NOTE(review): `save_model_class=True` presumably also stores the model class so it can be
# reloaded without this notebook - confirm against the Transformers4Rec Trainer.
trainer._save_model_and_checkpoint(save_model_class=True)

Step,Training Loss
500,10.8274
1000,10.5043
1500,10.3817
2000,10.2714
2500,10.1769
3000,10.0595
3500,9.9344
4000,9.8119
4500,9.6826
5000,9.5608


In [None]:
# Evaluate on the test parquet file; metric names are prefixed with 'eval'.
# NOTE: despite its name, `train_metrics` holds the evaluation results.
eval_parquet_path = os.path.join(DATA_DIR, 'cur_test_trunc.parquet')
train_metrics = trainer.evaluate(eval_dataset=eval_parquet_path, metric_key_prefix='eval')

In [None]:
# Print every metric as " name = value", ordered by metric name.
# `!s` forces str() on the value, exactly like the explicit str() call it replaces.
for metric_name in sorted(train_metrics):
    print(f" {metric_name!s} = {train_metrics[metric_name]!s}")