In [1]:
import os
import merlin.models.tf as mm
import torch

  warn(f"Triton dtype mappings did not load successfully due to an error: {exc.msg}")




In [2]:
from os import path

# Resolve the dataframe library once (cudf or pandas) so that later cells can
# read parquet files through ``df_lib`` without caring which one is active.
from merlin.core.dispatch import get_lib
df_lib = get_lib()

In [3]:
import merlin.io

In [4]:
from nvtabular import ops
import nvtabular as nvt
from merlin.models.utils.example_utils import workflow_fit_transform
import pandas as pd
from merlin.schema.tags import Tags

In [5]:
# Root folder of the book dataset, relative to the notebook's working directory.
# Later cells read the parquet inputs from here and write the Categorify output
# and the processed datasets below it.
INPUT_DATA_DIR = "book_dataset/"

In [6]:
import glob

# Collect the parquet part files of each split, sorted by file name so the
# order is stable across runs.
TRAIN_PATHS, VALID_PATHS = (
    sorted(glob.glob(os.path.join(INPUT_DATA_DIR, split, "*.parquet")))
    for split in ("train", "valid")
)
TRAIN_PATHS, VALID_PATHS

(['book_dataset/train/part_0.parquet'], ['book_dataset/valid/part_0.parquet'])

In [7]:
# Preview the first training part file with the active dataframe library.
first_train_file = TRAIN_PATHS[0]
train = df_lib.read_parquet(first_train_file)
train.head()

Unnamed: 0,User-ID,ISBN,Book-Title,Book-Rating
0,36647,3,3,0.0
1,86289,3952,1,0.0
2,1663,13121,9234,1.0
3,334,1896,2146,0.0
4,5439,15669,14720,0.0


In [8]:
# Preview the first validation part file with the active dataframe library.
first_valid_file = VALID_PATHS[0]
valid = df_lib.read_parquet(first_valid_file)
valid.head()

Unnamed: 0,User-ID,ISBN,Book-Title,Book-Rating
0,51664,1437,1284,0.0
1,29302,2,1,1.0
2,31,17657,15595,0.0
3,283,107357,83246,1.0
4,591,3168,1046,0.0


In [9]:
# Encode the user and book identifiers as int32 category ids. The Categorify
# op is pointed at INPUT_DATA_DIR/categories for its output files.
categories_dir = os.path.join(INPUT_DATA_DIR, "categories")
categorify = ops.Categorify(dtype="int32", out_path=categories_dir)
cat_features = ["User-ID", "ISBN"] >> categorify

In [10]:
# Target: map "Book-Rating" to a 0/1 int32 label (1 when the rating is strictly
# greater than 6), tag it as the binary-classification target and rename the
# column to "rating_binary".
# NOTE(review): the parquet previews earlier in this notebook only show
# Book-Rating values of 0.0 and 1.0 -- confirm that the files fed to the
# workflow carry the raw rating scale this threshold presumes.
rating_column = nvt.ColumnSelector(["Book-Rating"])
feats_target = (
    rating_column
    >> ops.LambdaOp(lambda col: (col > 6).astype("int32"))
    >> ops.AddTags(["binary_classification", "target"])
    >> ops.Rename(name="rating_binary")
)

# Identifier features: the categorified columns tagged as item id / user id.
feats_itemId = cat_features["ISBN"] >> ops.TagAsItemID()
feats_userId = cat_features["User-ID"] >> ops.TagAsUserID()

# Final workflow graph; the operand order is kept as item, user, target.
output = feats_itemId + feats_userId + feats_target

In [11]:
%%time

# Inputs for the NVTabular workflow: one parquet file per split, located
# directly under INPUT_DATA_DIR.
# NOTE(review): the preview cells above read from the "train/" and "valid/"
# sub-folders instead -- confirm that "train.parquet" / "valid.parquet" are the
# intended inputs here.
train_path = os.path.join(INPUT_DATA_DIR, "train.parquet")
valid_path = os.path.join(INPUT_DATA_DIR, "valid.parquet")
# Destination folder handed to workflow_fit_transform; a later cell loads the
# "train" and "valid" datasets from beneath it.
output_path = os.path.join(INPUT_DATA_DIR, "dataset_integration")

# Run the `output` graph over both splits (presumably fit on train, then
# transform train and valid -- see merlin.models.utils.example_utils).
workflow_fit_transform(output, train_path, valid_path, output_path)



CPU times: user 7.74 s, sys: 915 ms, total: 8.65 s
Wall time: 8.62 s


## Training a Recommender Model with Merlin Models

In [12]:
# Load the datasets written by the preprocessing workflow.
# The location is assembled with os.path.join only (no string concatenation),
# so it stays correct even if INPUT_DATA_DIR has no trailing slash.
# NOTE: from here on `train` / `valid` are merlin.io.Dataset objects; they
# replace the DataFrame previews bound to the same names earlier.
processed_dir = os.path.join(INPUT_DATA_DIR, "dataset_integration")
train = merlin.io.Dataset(os.path.join(processed_dir, "train"), engine="parquet")
valid = merlin.io.Dataset(os.path.join(processed_dir, "valid"), engine="parquet")



In [13]:
# Sanity check: first rows of the processed validation dataset.
valid.head()

Unnamed: 0,ISBN,User-ID,rating_binary
0,1437,51664,0
1,2,29302,1
2,17657,31,0
3,107357,283,1
4,3168,591,0


In [14]:
# Column names recorded in the training dataset's schema.
train.schema.column_names

['ISBN', 'User-ID', 'rating_binary']

In [15]:
# Full schema of the training dataset (tags, dtypes and per-column properties).
train.schema

Unnamed: 0,name,tags,dtype,is_list,is_ragged,properties.num_buckets,properties.cat_path,properties.embedding_sizes.dimension,properties.embedding_sizes.cardinality,properties.max_size,properties.freq_threshold,properties.domain.min,properties.domain.max,properties.domain.name
0,ISBN,"(Tags.ID, Tags.ITEM, Tags.CATEGORICAL)","DType(name='int32', element_type=<ElementType....",False,False,,book_dataset/categories/categories/unique.ISBN...,512.0,298687.0,0.0,0.0,0.0,298686.0,ISBN
1,User-ID,"(Tags.ID, Tags.USER, Tags.CATEGORICAL)","DType(name='int32', element_type=<ElementType....",False,False,,book_dataset/categories/categories/unique.User...,512.0,92969.0,0.0,0.0,0.0,92968.0,User-ID
2,rating_binary,"(Tags.BINARY_CLASSIFICATION, Tags.TARGET)","DType(name='int32', element_type=<ElementType....",False,False,,,,,,,,,


In [18]:
# Use the training schema as the single schema for both datasets.
# NOTE(review): this cell's saved execution count (18) is higher than that of
# the model-definition cell that follows it (17), so the stored outputs were
# not produced by one top-to-bottom run -- re-run from a fresh kernel.
schema = train.schema
train.schema = valid.schema = schema

### Two Tower Model

In [17]:
# Two-tower model built from the shared schema:
#  - query tower: MLP with 128 and 64 units, no activation on its last layer
#  - sampling: InBatchSampler
#  - embeddings: sizes inferred (infer_embedding_sizes=True)
query_mlp = mm.MLPBlock([128, 64], no_activation_last_layer=True)
batch_samplers = [mm.InBatchSampler()]
embedding_opts = mm.EmbeddingOptions(infer_embedding_sizes=True)

model_tt = mm.TwoTowerModel(
    schema,
    query_tower=query_mlp,
    samplers=batch_samplers,
    embedding_options=embedding_opts,
)

In [None]:
%%time
model_tt.compile(
    optimizer="adam",
    run_eagerly=False,
    loss="categorical_crossentropy",
    metrics=[mm.RecallAt(10), mm.NDCGAt(10)],
)
model_tt.fit(train, validation_data=valid, batch_size=1024 * 8, epochs=3)

Epoch 1/3
Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: module, class, method, function, traceback, frame, or code object was expected, got cython_function_or_method
Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: module, class, method, function, traceback, frame, or code object was expected, got cython_function_or_method


The sampler InBatchSampler returned no samples for this batch.




The sampler InBatchSampler returned no samples for this batch.


Epoch 2/3
 20/113 [====>.........................] - ETA: 11s - loss: 9.0078 - recall_at_10: 0.0033 - ndcg_at_10: 0.0015 - regularization_loss: 0.0000e+00 - loss_batch: 9.0078

In [25]:
# Export the query (user) tower of the two-tower model.
# The destination is derived from INPUT_DATA_DIR, like the other paths in this
# notebook, instead of repeating the "book_dataset" literal.
query_tower = model_tt.retrieval_block.query_block()
query_tower.save(os.path.join(INPUT_DATA_DIR, "query_tower"))



INFO:tensorflow:Assets written to: book_dataset/query_tower/assets


INFO:tensorflow:Assets written to: book_dataset/query_tower/assets


### DLRM Model

In [26]:
# DLRM ranking model that predicts the binary target column found in the
# training schema (the first column tagged as TARGET).
target_column = train.schema.select_by_tag(Tags.TARGET).column_names[0]

model = mm.DLRMModel(
    train.schema,
    embedding_dim=64,
    bottom_block=mm.MLPBlock([128, 64]),
    top_block=mm.MLPBlock([128, 64, 32]),
    prediction_tasks=mm.BinaryOutput(target_column),
)

# Train with Adam on the training dataset, 1024 rows per batch.
model.compile(optimizer="adam")
model.fit(train, batch_size=1024)



<keras.callbacks.History at 0x7fedd44775e0>

In [27]:
# Evaluate the DLRM model on the validation dataset; return_dict=True makes
# evaluate() return the metrics keyed by name.
metrics = model.evaluate(valid, batch_size=1024, return_dict=True)



In [28]:
# Show the validation metrics.
# NOTE(review): the saved output reports precision and recall of 0.0 next to
# ~0.72 binary accuracy, which looks like the model predicting the negative
# class for every row -- worth confirming before drawing conclusions.
metrics

{'loss': 0.5839466452598572,
 'precision': 0.0,
 'recall': 0.0,
 'binary_accuracy': 0.7162761688232422,
 'auc': 0.5652353167533875,
 'regularization_loss': 0.0,
 'loss_batch': 0.6062688231468201}