In [1]:
import os
from merlin.datasets.entertainment import get_movielens

input_path = os.environ.get("INPUT_DATA_DIR", os.path.expanduser("~/merlin-framework/movielens/"))
get_movielens(variant="ml-1m", path=input_path); #noqa


from merlin.core.dispatch import get_lib

data = get_lib().read_parquet(f'{input_path}ml-1m/train.parquet').sample(frac=1)

train = data.iloc[:600_000]
valid = data.iloc[600_000:]

movies = get_lib().read_parquet(f'{input_path}ml-1m/movies_converted.parquet')



import nvtabular as nvt
from merlin.schema.tags import Tags

train_ds = nvt.Dataset(train, npartitions=2)
valid_ds = nvt.Dataset(valid)

train_ds, valid_ds
train_ds.shuffle_by_keys('userId')
valid_ds.shuffle_by_keys('userId')

genres = ['movieId'] >> nvt.ops.JoinExternal(movies, on='movieId', columns_ext=['movieId', 'genres']) >> nvt.ops.AddTags(tags=[Tags.LIST])

genres = genres >> nvt.ops.Categorify(freq_threshold=10) 

def rating_to_binary(col):
    return col > 3

binary_rating = ['rating'] >> nvt.ops.LambdaOp(rating_to_binary) >> nvt.ops.Rename(name='binary_rating')

userId = ['userId'] >> nvt.ops.Categorify() >> nvt.ops.AddTags(tags=[Tags.USER_ID, Tags.CATEGORICAL, Tags.USER])
movieId = ['movieId'] >> nvt.ops.Categorify() >> nvt.ops.AddTags(tags=[Tags.ITEM_ID, Tags.CATEGORICAL, Tags.ITEM])
binary_rating = binary_rating >> nvt.ops.AddTags(tags=[Tags.TARGET, Tags.BINARY_CLASSIFICATION])


workflow = nvt.Workflow(userId + movieId + genres + binary_rating)

train_transformed = workflow.fit_transform(train_ds)
valid_transformed = workflow.transform(valid_ds)
valid_transformed.compute().head()
train_transformed.schema



2023-07-16 05:15:35.169484: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2023-07-16 05:15:35.189602: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-07-16 05:15:35.991441: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:995] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2023-07-16 05:15:35.991706: W tensorflow/core

Unnamed: 0,name,tags,dtype,is_list,is_ragged,properties.num_buckets,properties.freq_threshold,properties.max_size,properties.cat_path,properties.domain.min,properties.domain.max,properties.domain.name,properties.embedding_sizes.cardinality,properties.embedding_sizes.dimension,properties.value_count.min,properties.value_count.max
0,userId,"(Tags.USER, Tags.ID, Tags.CATEGORICAL)","DType(name='int64', element_type=<ElementType....",False,False,,0.0,0.0,.//categories/unique.userId.parquet,0.0,6042.0,userId,6043.0,210.0,,
1,movieId,"(Tags.ITEM, Tags.LIST, Tags.ID, Tags.CATEGORICAL)","DType(name='int64', element_type=<ElementType....",False,False,,10.0,0.0,.//categories/unique.movieId.parquet,0.0,3103.0,movieId,3104.0,144.0,,
2,genres,"(Tags.LIST, Tags.CATEGORICAL)","DType(name='int64', element_type=<ElementType....",True,True,,10.0,0.0,.//categories/unique.genres.parquet,0.0,20.0,genres,21.0,16.0,0.0,
3,binary_rating,"(Tags.BINARY_CLASSIFICATION, Tags.TARGET)","DType(name='bool', element_type=<ElementType.B...",False,False,,,,,,,,,,,


In [2]:
train_transformed.schema

Unnamed: 0,name,tags,dtype,is_list,is_ragged,properties.num_buckets,properties.freq_threshold,properties.max_size,properties.cat_path,properties.domain.min,properties.domain.max,properties.domain.name,properties.embedding_sizes.cardinality,properties.embedding_sizes.dimension,properties.value_count.min,properties.value_count.max
0,userId,"(Tags.USER, Tags.ID, Tags.CATEGORICAL)","DType(name='int64', element_type=<ElementType....",False,False,,0.0,0.0,.//categories/unique.userId.parquet,0.0,6042.0,userId,6043.0,210.0,,
1,movieId,"(Tags.ITEM, Tags.LIST, Tags.ID, Tags.CATEGORICAL)","DType(name='int64', element_type=<ElementType....",False,False,,10.0,0.0,.//categories/unique.movieId.parquet,0.0,3103.0,movieId,3104.0,144.0,,
2,genres,"(Tags.LIST, Tags.CATEGORICAL)","DType(name='int64', element_type=<ElementType....",True,True,,10.0,0.0,.//categories/unique.genres.parquet,0.0,20.0,genres,21.0,16.0,0.0,
3,binary_rating,"(Tags.BINARY_CLASSIFICATION, Tags.TARGET)","DType(name='bool', element_type=<ElementType.B...",False,False,,,,,,,,,,,


In [3]:
from transformers4rec import torch as tr

max_sequence_length, d_model = 20, 320
schema = train_transformed.schema
# Define input module to process tabular input-features and to prepare masked inputs
input_module = tr.TabularSequenceFeatures.from_schema(
    schema,
    masking="clm",
    max_sequence_length=max_sequence_length,
    d_model=d_model,

)

# Define Next item prediction-task 
prediction_task = tr.BinaryClassificationTask()

# Define the config of the XLNet Transformer architecture
transformer_config = tr.XLNetConfig.build(
    d_model=d_model, n_head=8, n_layer=2, total_seq_length=max_sequence_length
)

# Get the end-to-end model 
model = transformer_config.to_torch_model(input_module, prediction_task)

(-1, 20, 192)
(-1, 20, 192)


  from .autonotebook import tqdm as notebook_tqdm


In [4]:
training_args = tr.trainer.T4RecTrainingArguments(
            output_dir="./tmp",
            max_sequence_length=20,
            data_loader_engine='merlin',
            num_train_epochs=10, 
            dataloader_drop_last=False,
            per_device_train_batch_size = 384,
            per_device_eval_batch_size = 512,
            learning_rate=0.0005,
            fp16=True,
            report_to = [],
            logging_steps=200
        )

In [5]:
recsys_trainer = tr.Trainer(
    model=model,
    args=training_args,
    schema=schema,
    compute_metrics=True)

Using amp fp16 backend


In [6]:
# Instantiate the T4Rec Trainer, which manages training and evaluation
trainer = tr.Trainer(
    model=model,
    args=training_args,
    schema=schema,
    compute_metrics=True,
)

Using amp fp16 backend


In [7]:
%%time
start_time_window_index = 1
final_time_window_index = 4
for time_index in range(start_time_window_index, final_time_window_index):
    # Set data 
    time_index_train = time_index
    time_index_eval = time_index + 1
    # train_paths = glob.glob(os.path.join(OUTPUT_DIR, f"{time_index_train}/train.parquet"))
    # eval_paths = glob.glob(os.path.join(OUTPUT_DIR, f"{time_index_eval}/valid.parquet"))
    # Train on day related to time_index 
    print('*'*20)
    print("Launch training for day %s are:" %time_index)
    print('*'*20 + '\n')
    trainer.train_dataset_or_path = train_transformed
    trainer.reset_lr_scheduler()
    trainer.train()
    trainer.state.global_step +=1
    # Evaluate on the following day
    trainer.eval_dataset_or_path = valid_transformed
    train_metrics = trainer.evaluate(metric_key_prefix='eval')
    print('*'*20)
    print("Eval results for day %s are:\t" %time_index_eval)
    print('\n' + '*'*20 + '\n')
    for key in sorted(train_metrics.keys()):
        print(" %s = %s" % (key, str(train_metrics[key]))) 
    wipe_memory()

***** Running training *****
  Num examples = 600192
  Num Epochs = 10
  Instantaneous batch size per device = 384
  Total train batch size (w. parallel, distributed & accumulation) = 384
  Gradient Accumulation steps = 1
  Total optimization steps = 15630


********************
Launch training for day 1 are:
********************

Output Schema -> [{'name': 'userId', 'tags': {<Tags.USER: 'user'>, <Tags.ID: 'id'>, <Tags.CATEGORICAL: 'categorical'>}, 'properties': {'num_buckets': None, 'freq_threshold': 0, 'max_size': 0, 'cat_path': './/categories/unique.userId.parquet', 'domain': {'min': 0, 'max': 6042, 'name': 'userId'}, 'embedding_sizes': {'cardinality': 6043, 'dimension': 210}}, 'dtype': DType(name='int64', element_type=<ElementType.Int: 'int'>, element_size=64, element_unit=None, signed=True, shape=Shape(dims=(Dimension(min=0, max=None),))), 'is_list': False, 'is_ragged': False}, {'name': 'movieId', 'tags': {<Tags.ITEM: 'item'>, <Tags.LIST: 'list'>, <Tags.ID: 'id'>, <Tags.CATEGORICAL: 'categorical'>}, 'properties': {'num_buckets': None, 'freq_threshold': 10, 'max_size': 0, 'cat_path': './/categories/unique.movieId.parquet', 'domain': {'min': 0, 'max': 3103, 'name': 'movieId'}, 'embedding_sizes': {'cardinality': 3104, 'dimension': 144}},

AssertionError: `item_ids` must have 2 dimensions.

In [8]:
Tags.LIST

<Tags.LIST: 'list'>

In [9]:
[col.tags for col in workflow.output_schema if Tags.LIST in col.tags]

[{<Tags.ITEM: 'item'>, <Tags.LIST: 'list'>, <Tags.ID: 'id'>, <Tags.CATEGORICAL: 'categorical'>},
 {<Tags.LIST: 'list'>, <Tags.CATEGORICAL: 'categorical'>}]