In [1]:
import os
os.environ["CUDA_VISIBLE_DEVICES"]="0"
import glob

import cudf
import numpy as np
import pandas as pd

import nvtabular as nvt
from nvtabular.ops import *
from merlin.schema.tags import Tags

  warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")


In [2]:
INPUT_DATA_DIR = os.getcwd()

In [3]:
NUM_ROWS = os.environ.get("NUM_ROWS", 100000)

In [4]:
long_tailed_item_distribution = np.clip(np.random.lognormal(3., 1., int(NUM_ROWS)).astype(np.int32), 1, 50000)
# generate random item interaction features 
df = pd.DataFrame(np.random.randint(70000, 90000, int(NUM_ROWS)), columns=['session_id'])
df['item_id'] = long_tailed_item_distribution

# generate category mapping for each item-id
df['category'] = pd.cut(df['item_id'], bins=334, labels=np.arange(1, 335)).astype(np.int32)
df['age_days'] = np.random.uniform(0, 1, int(NUM_ROWS)).astype(np.float32)
df['weekday_sin']= np.random.uniform(0, 1, int(NUM_ROWS)).astype(np.float32)

# generate day mapping for each session 
map_day = dict(zip(df.session_id.unique(), np.random.randint(1, 10, size=(df.session_id.nunique()))))
df['day'] =  df.session_id.map(map_day)

In [5]:
df.head()

Unnamed: 0,session_id,item_id,category,age_days,weekday_sin,day
0,86390,30,5,0.053233,0.482083,5
1,81359,10,2,0.069111,0.363486,6
2,74445,14,2,0.568191,0.984585,4
3,72134,8,2,0.159875,0.538344,3
4,72609,42,7,0.655202,0.508114,6


In [6]:
SESSIONS_MAX_LENGTH =20

# Categorify categorical features
categ_feats = ['item_id', 'category'] >> nvt.ops.Categorify()

# Define Groupby Workflow
groupby_feats = categ_feats + ['session_id', 'day', 'age_days', 'weekday_sin']

# Group interaction features by session
groupby_features = groupby_feats >> nvt.ops.Groupby(
    groupby_cols=["session_id"], 
    aggs={
        "item_id": ["list", "count"],
        "category": ["list"],     
        "day": ["first"],
        "age_days": ["list"],
        'weekday_sin': ["list"],
        },
    name_sep="-")

# Select and truncate the sequential features
sequence_features_truncated = (
    groupby_features['category-list']
    >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH) 
)

sequence_features_truncated_item = (
    groupby_features['item_id-list']
    >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH) 
    >> TagAsItemID()
)  
sequence_features_truncated_cont = (
    groupby_features['age_days-list', 'weekday_sin-list'] 
    >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH) 
    >> nvt.ops.AddMetadata(tags=[Tags.CONTINUOUS])
)

# Filter out sessions with length 1 (not valid for next-item prediction training and evaluation)
MINIMUM_SESSION_LENGTH = 2
selected_features = (
    groupby_features['item_id-count', 'day-first', 'session_id'] + 
    sequence_features_truncated_item +
    sequence_features_truncated + 
    sequence_features_truncated_cont
)
    
filtered_sessions = selected_features >> nvt.ops.Filter(f=lambda df: df["item_id-count"] >= MINIMUM_SESSION_LENGTH)

seq_feats_list = filtered_sessions['item_id-list', 'category-list', 'age_days-list', 'weekday_sin-list'] >>  nvt.ops.ValueCount()

workflow = nvt.Workflow(filtered_sessions['session_id', 'day-first'] + seq_feats_list)

dataset = nvt.Dataset(df)

# Generate statistics for the features and export parquet files
# this step will generate the schema file
workflow.fit_transform(dataset).to_parquet(os.path.join(INPUT_DATA_DIR, "processed_nvt"))

In [7]:
workflow.output_schema

Unnamed: 0,name,tags,dtype,is_list,is_ragged,properties.num_buckets,properties.freq_threshold,properties.max_size,properties.cat_path,properties.domain.min,properties.domain.max,properties.domain.name,properties.embedding_sizes.cardinality,properties.embedding_sizes.dimension,properties.value_count.min,properties.value_count.max
0,session_id,(),"DType(name='int64', element_type=<ElementType....",False,False,,,,,,,,,,,
1,day-first,(),"DType(name='int64', element_type=<ElementType....",False,False,,,,,,,,,,,
2,item_id-list,"(Tags.CATEGORICAL, Tags.ID, Tags.LIST, Tags.ITEM)","DType(name='int64', element_type=<ElementType....",True,True,,0.0,0.0,.//categories/unique.item_id.parquet,0.0,492.0,item_id,493.0,52.0,2.0,15.0
3,category-list,"(Tags.CATEGORICAL, Tags.LIST)","DType(name='int64', element_type=<ElementType....",True,True,,0.0,0.0,.//categories/unique.category.parquet,0.0,120.0,category,121.0,23.0,2.0,15.0
4,age_days-list,"(Tags.LIST, Tags.CONTINUOUS)","DType(name='float32', element_type=<ElementTyp...",True,True,,,,,,,,,,2.0,15.0
5,weekday_sin-list,"(Tags.LIST, Tags.CONTINUOUS)","DType(name='float32', element_type=<ElementTyp...",True,True,,,,,,,,,,2.0,15.0


In [8]:
workflow.save(os.path.join(INPUT_DATA_DIR, "workflow_etl"))

In [9]:
OUTPUT_DIR = os.environ.get("OUTPUT_DIR",os.path.join(INPUT_DATA_DIR, "sessions_by_day"))

In [10]:
sessions_gdf = cudf.read_parquet(os.path.join(INPUT_DATA_DIR, "processed_nvt/part_0.parquet"))

In [11]:
sessions_gdf.head(3)

Unnamed: 0,session_id,day-first,item_id-list,category-list,age_days-list,weekday_sin-list
0,70000,9,"[31, 46, 5, 41, 55]","[7, 9, 3, 8, 10]","[0.13067873, 0.37698856, 0.43898767, 0.8531232...","[0.8934683, 0.75806284, 0.53071326, 0.7120116,..."
1,70001,5,"[21, 161, 50, 13, 87, 57, 17]","[6, 27, 10, 3, 16, 11, 5]","[0.28445303, 0.28887564, 0.87129164, 0.5134322...","[0.85744035, 0.08272948, 0.9764241, 0.7957961,..."
2,70002,7,"[23, 5, 21, 6, 65]","[6, 3, 6, 3, 12]","[0.7773844, 0.13950226, 0.9957419, 0.8347177, ...","[0.97682446, 0.3133446, 0.25613874, 0.5599951,..."


In [12]:
from transformers4rec.utils.data_utils import save_time_based_splits
save_time_based_splits(data=nvt.Dataset(sessions_gdf),
                       output_dir= OUTPUT_DIR,
                       partition_col='day-first',
                       timestamp_col='session_id', 
                      )

Creating time-based splits: 100%|█████████████████████████████████████████████████████████| 9/9 [00:01<00:00,  7.84it/s]


In [13]:
TRAIN_PATHS = os.path.join(OUTPUT_DIR, "1", "train.parquet")

In [14]:
df = pd.read_parquet(TRAIN_PATHS)
df.head()

Unnamed: 0,session_id,item_id-list,category-list,age_days-list,weekday_sin-list
0,70016,"[15, 4, 8, 9, 23, 7]","[5, 4, 4, 3, 6, 3]","[0.67894405, 0.8941715, 0.4536134, 0.654148, 0...","[0.104017854, 0.3352578, 0.93804187, 0.4269311..."
1,70029,"[10, 9, 28, 13]","[3, 3, 6, 3]","[0.101147756, 0.32233658, 0.3246822, 0.5806937]","[0.09873905, 0.59186, 0.22335593, 0.7593497]"
2,70037,"[26, 18, 16, 17]","[6, 5, 5, 5]","[0.4408881, 0.19247977, 0.25261945, 0.07442593]","[0.88619095, 0.7171725, 0.40067476, 0.6251952]"
4,70059,"[28, 86, 30, 85, 14, 72, 24]","[6, 15, 7, 16, 4, 13, 6]","[0.05233805, 0.5696971, 0.6899029, 0.9963922, ...","[0.7317564, 0.50071394, 0.3582579, 0.18126515,..."
5,70069,"[51, 28, 11, 5]","[10, 6, 4, 3]","[0.21570408, 0.7117531, 0.56744075, 0.86486465]","[0.43951032, 0.985921, 0.8343776, 0.767135]"


In [15]:
from transformers4rec import torch as tr
from transformers4rec.torch.ranking_metric import NDCGAt, AvgPrecisionAt, RecallAt
from transformers4rec.torch.utils.examples_utils import wipe_memory

  from .autonotebook import tqdm as notebook_tqdm


In [16]:
OUTPUT_DIR = os.environ.get("OUTPUT_DIR", f"{INPUT_DATA_DIR}/sessions_by_day")

In [17]:
from merlin.schema import Schema
from merlin.io import Dataset

train = Dataset(os.path.join(INPUT_DATA_DIR, "processed_nvt/part_0.parquet"))
schema = train.schema

In [60]:
train = Dataset(os.path.join(INPUT_DATA_DIR, "movielens-nvt/part_0.parquet"))
schema = train.schema
schema = schema.select_by_name(['user_id', 'item_id-list', 'rating-list', 'timestamp-list'])

In [29]:
# schema = schema.select_by_name(['item_id-list', 
#                                 'category-list', 
#                                 'weekday_sin-list',
#                                 'age_days-list'])

In [61]:
inputs = tr.TabularSequenceFeatures.from_schema(
        schema,
        max_sequence_length=10,
        masking="mlm",
        d_output=100,
)

In [62]:
from transformers4rec.torch.ranking_metric import RankingMetric
import torch

class HitRateAt(RankingMetric):
    def __init__(self, top_ks=None, labels_onehot=False):
        super(HitRateAt, self).__init__(top_ks=top_ks, labels_onehot=labels_onehot)

    def _metric(self, ks: list, scores: torch.Tensor, labels: torch.Tensor) -> torch.Tensor:
        """
        Compute HitRate@K for each of the provided cutoffs.

        Parameters
        ----------
        ks : list
            list of cutoffs
        scores : torch.Tensor
            predicted item scores (batch_size, num_items)
        labels : torch.Tensor
            true item labels (batch_size, num_items)

        Returns
        -------
        torch.Tensor
            HitRate for each cutoff
        """
        batch_size, num_items = scores.size()
        topk_indices = torch.topk(scores, max(ks), dim=-1).indices  # Top-K indices for predictions

        hit_rates = torch.zeros(batch_size, len(ks), device=scores.device)

        for i, k in enumerate(ks):
            # Check if any of the top-K items are in the true labels
            topk_hits = torch.gather(labels, dim=-1, index=topk_indices[:, :k])
            hit_rates[:, i] = (topk_hits.sum(dim=-1) > 0).float()

        return hit_rates.mean(dim=0)  # Mean hit rate across the batch


In [63]:
# Define XLNetConfig class and set default parameters for HF XLNet config  
transformer_config = tr.XLNetConfig.build(
    d_model=64, n_head=4, n_layer=2, total_seq_length=10
)
# Define the model block including: inputs, masking, projection and transformer block.
body = tr.SequentialBlock(
    inputs, tr.MLPBlock([64]), tr.TransformerBlock(transformer_config, masking=inputs.masking)
)

# Define the evaluation top-N metrics and the cut-offs
metrics = [
    NDCGAt(top_ks=[5, 10], labels_onehot=True),
    RecallAt(top_ks=[5, 10], labels_onehot=True),
    HitRateAt(top_ks=[5, 10], labels_onehot=True),  # 수정된 HitRateAt
]

# Define a head related to next item prediction task 
head = tr.Head(
    body,
    tr.NextItemPredictionTask(weight_tying=True, 
                              metrics=metrics),
    inputs=inputs,
)

# Get the end-to-end Model class 
model = tr.Model(head)

In [64]:
per_device_train_batch_size = int(os.environ.get(
    "per_device_train_batch_size", 
    '128'
))

per_device_eval_batch_size = int(os.environ.get(
    "per_device_eval_batch_size", 
    '32'
))

In [65]:
from transformers4rec.config.trainer import T4RecTrainingArguments
from transformers4rec.torch import Trainer
# Set hyperparameters for training 
train_args = T4RecTrainingArguments(data_loader_engine='merlin', 
                                    dataloader_drop_last = True,
                                    gradient_accumulation_steps = 1,
                                    per_device_train_batch_size = per_device_train_batch_size, 
                                    per_device_eval_batch_size = per_device_eval_batch_size,
                                    output_dir = "./tmp", 
                                    learning_rate=0.0005,
                                    lr_scheduler_type='cosine', 
                                    learning_rate_num_cosine_cycles_by_epoch=1.5,
                                    num_train_epochs=5,
                                    max_sequence_length=10, 
                                    report_to = [],
                                    logging_steps=50,
                                    no_cuda=False)

In [66]:
trainer = Trainer(
    model=model,
    args=train_args,
    schema=schema,
    compute_metrics=True,
)

In [67]:
start_window_index = int(os.environ.get(
    "start_window_index", 
    '1'
))

final_window_index = int(os.environ.get(
    "final_window_index", 
    '8'
))

In [68]:
start_time_window_index = start_window_index
final_time_window_index = final_window_index
#Iterating over days of one week
for time_index in range(start_time_window_index, final_time_window_index):
    # Set data 
    time_index_train = time_index
    time_index_eval = time_index + 1
    train_paths = glob.glob(os.path.join(OUTPUT_DIR, f"{time_index_train}/train.parquet"))
    eval_paths = glob.glob(os.path.join(OUTPUT_DIR, f"{time_index_eval}/valid.parquet"))
    print(train_paths)
    
    # Train on day related to time_index 
    print('*'*20)
    print("Launch training for day %s are:" %time_index)
    print('*'*20 + '\n')
    trainer.train_dataset_or_path = train_paths
    trainer.reset_lr_scheduler()
    trainer.train()
    trainer.state.global_step +=1
    print('finished')
    
    # Evaluate on the following day
    trainer.eval_dataset_or_path = eval_paths
    train_metrics = trainer.evaluate(metric_key_prefix='eval')
    print('*'*20)
    print("Eval results for day %s are:\t" %time_index_eval)
    print('\n' + '*'*20 + '\n')
    for key in sorted(train_metrics.keys()):
        print(" %s = %s" % (key, str(train_metrics[key]))) 
    wipe_memory()

['/tmp/sessions_by_day/1/train.parquet']
********************
Launch training for day 1 are:
********************





Step,Training Loss
50,7.3611


finished


********************
Eval results for day 2 are:	

********************

 eval_/loss = 6.188547611236572
 eval_/next-item/hit_rate_at_5 = 0.1901041716337204
 eval_/next-item/ndcg_at_10 = 0.11114425957202911
 eval_/next-item/ndcg_at_5 = 0.08520328253507614
 eval_/next-item/recall_at_10 = 0.2291666716337204
 eval_/next-item/recall_at_5 = 0.1510416716337204
 eval_runtime = 0.0864
 eval_samples_per_second = 2222.964
 eval_steps_per_second = 69.468
['/tmp/sessions_by_day/2/train.parquet']
********************
Launch training for day 2 are:
********************



Step,Training Loss
50,5.7705


finished
********************
Eval results for day 3 are:	

********************

 eval_/loss = 4.84507942199707
 eval_/next-item/hit_rate_at_5 = 0.2473958432674408
 eval_/next-item/ndcg_at_10 = 0.1630421280860901
 eval_/next-item/ndcg_at_5 = 0.1385485827922821
 eval_/next-item/recall_at_10 = 0.2864583432674408
 eval_/next-item/recall_at_5 = 0.2083333432674408
 eval_runtime = 0.0888
 eval_samples_per_second = 2162.645
 eval_steps_per_second = 67.583
['/tmp/sessions_by_day/3/train.parquet']
********************
Launch training for day 3 are:
********************



Step,Training Loss
50,4.85


finished
********************
Eval results for day 4 are:	

********************

 eval_/loss = 4.66283655166626
 eval_/next-item/hit_rate_at_5 = 0.2239583432674408
 eval_/next-item/ndcg_at_10 = 0.14726775884628296
 eval_/next-item/ndcg_at_5 = 0.09971418231725693
 eval_/next-item/recall_at_10 = 0.296875
 eval_/next-item/recall_at_5 = 0.1510416716337204
 eval_runtime = 0.0849
 eval_samples_per_second = 2260.515
 eval_steps_per_second = 70.641
['/tmp/sessions_by_day/4/train.parquet']
********************
Launch training for day 4 are:
********************



Step,Training Loss
50,4.6195


finished
********************
Eval results for day 5 are:	

********************

 eval_/loss = 4.552506923675537
 eval_/next-item/hit_rate_at_5 = 0.2317708432674408
 eval_/next-item/ndcg_at_10 = 0.1391473412513733
 eval_/next-item/ndcg_at_5 = 0.09092982113361359
 eval_/next-item/recall_at_10 = 0.3072916865348816
 eval_/next-item/recall_at_5 = 0.15625
 eval_runtime = 0.0891
 eval_samples_per_second = 2153.957
 eval_steps_per_second = 67.311
['/tmp/sessions_by_day/5/train.parquet']
********************
Launch training for day 5 are:
********************



Step,Training Loss
50,4.5495


finished
********************
Eval results for day 6 are:	

********************

 eval_/loss = 4.481883525848389
 eval_/next-item/hit_rate_at_5 = 0.2395833432674408
 eval_/next-item/ndcg_at_10 = 0.15109536051750183
 eval_/next-item/ndcg_at_5 = 0.10079749673604965
 eval_/next-item/recall_at_10 = 0.3177083432674408
 eval_/next-item/recall_at_5 = 0.1614583432674408
 eval_runtime = 0.0862
 eval_samples_per_second = 2226.27
 eval_steps_per_second = 69.571
['/tmp/sessions_by_day/6/train.parquet']
********************
Launch training for day 6 are:
********************



Step,Training Loss
50,4.5295


finished
********************
Eval results for day 7 are:	

********************

 eval_/loss = 4.618710041046143
 eval_/next-item/hit_rate_at_5 = 0.2109375
 eval_/next-item/ndcg_at_10 = 0.12221139669418335
 eval_/next-item/ndcg_at_5 = 0.07732908427715302
 eval_/next-item/recall_at_10 = 0.28125
 eval_/next-item/recall_at_5 = 0.140625
 eval_runtime = 0.0932
 eval_samples_per_second = 2060.282
 eval_steps_per_second = 64.384
['/tmp/sessions_by_day/7/train.parquet']
********************
Launch training for day 7 are:
********************



Step,Training Loss
50,4.5256


finished
********************
Eval results for day 8 are:	

********************

 eval_/loss = 4.384883880615234
 eval_/next-item/hit_rate_at_5 = 0.2291666716337204
 eval_/next-item/ndcg_at_10 = 0.13606131076812744
 eval_/next-item/ndcg_at_5 = 0.09815017879009247
 eval_/next-item/recall_at_10 = 0.2864583432674408
 eval_/next-item/recall_at_5 = 0.171875
 eval_runtime = 0.0845
 eval_samples_per_second = 2273.124
 eval_steps_per_second = 71.035


In [58]:
eval_data_paths = glob.glob(os.path.join(OUTPUT_DIR, f"{time_index_eval}/valid.parquet"))

In [59]:
eval_metrics = trainer.evaluate(eval_dataset=eval_data_paths, metric_key_prefix='eval')
for key in sorted(eval_metrics.keys()):
    print("  %s = %s" % (key, str(eval_metrics[key])))

  eval_/loss = 4.3777289390563965
  eval_/next-item/hit_rate_at_5 = 0.2421875
  eval_/next-item/ndcg_at_10 = 0.15463575720787048
  eval_/next-item/ndcg_at_5 = 0.10580851882696152
  eval_/next-item/recall_at_10 = 0.3177083432674408
  eval_/next-item/recall_at_5 = 0.1666666716337204
  eval_runtime = 0.107
  eval_samples_per_second = 1794.103
  eval_steps_per_second = 56.066


In [31]:
model_path= os.environ.get("OUTPUT_DIR", f"{INPUT_DATA_DIR}/saved_model")
model.save(model_path)