In [2]:
!pip install "feast==0.31" faiss-gpu 


[notice] A new release of pip is available: 23.3.2 -> 24.0
[notice] To update, run: pip install --upgrade pip


In [135]:
import os

# TensorFlow reads these variables when it starts up, so they are set *before*
# importing TensorFlow or any Merlin/NVTabular module that may import it.
os.environ["TF_GPU_ALLOCATOR"] = "cuda_malloc_async"
# Hide TensorFlow's C++ INFO-level log lines (e.g. the repeated "Executor start
# aborting ... you can ignore this message" lines in the training output).
# setdefault keeps any value the user already exported.
os.environ.setdefault("TF_CPP_MIN_LOG_LEVEL", "1")

import nvtabular as nvt
from nvtabular.ops import Rename, Filter, Dropna, LambdaOp, Categorify, \
    TagAsUserFeatures, TagAsUserID, TagAsItemFeatures, TagAsItemID, AddMetadata

from merlin.schema.tags import Tags
from merlin.dag.ops.subgraph import Subgraph
import merlin.models.tf as mm
from merlin.io.dataset import Dataset
import tensorflow as tf
import cudf as pd  # cudf's pandas-like API, aliased as `pd` (used for pd.Series later)
from merlin.core.dispatch import get_lib

from merlin.core.utils import Distributed
from merlin.models.xgb import XGBoost

In [4]:
import logging

# Mute Python-side log records at WARNING severity and below. TensorFlow's C++
# log lines (visible in the training output) are not affected by this call.
logging.disable(level=logging.WARNING)

In [6]:
# Load the anime metadata table and the user review table, using the dataframe
# library Merlin's get_lib() returns (presumably cudf when a GPU is available).
anime, review = (get_lib().read_csv(f"data/{stem}.csv") for stem in ("anime", "review"))

In [7]:
# Rename the title column to match the review table's `anime_name` key so the two
# tables can be merged on it. Rebinding instead of `inplace=True` keeps the
# transformation explicit and chainable.
anime = anime.rename(columns={"name": "anime_name"})

In [8]:
SEED = 42  # fixed seed so the shuffle, and therefore the train/valid split, is reproducible

# One row per review, enriched with that anime's metadata (join on the title).
data = review.merge(anime, on='anime_name')
# Shuffle all rows before the positional train/valid split further down, then
# reset to a clean 0..n-1 index.
data = data.sample(frac=1, random_state=SEED).reset_index(drop=True)

In [9]:
def _split_comma_list(col):
    """Split a comma-separated string column into a list-valued Series.

    Missing values are filled with '' first, so they become [''] (a single
    empty-string token), exactly as before.
    NOTE(review): confirm an empty list isn't the intended encoding for missing.
    The result reuses ``col``'s index, so assignment back into the frame aligns
    row-for-row even if the index is not a 0..n-1 range.
    """
    values = col.fillna('').to_arrow().to_pylist()
    return pd.Series([v.split(',') for v in values], index=col.index)


data['genre_list'] = _split_comma_list(data['genres'])
data['source'] = _split_comma_list(data['source'])
data['studios'] = _split_comma_list(data['studios'])
# Binary target: True only for the literal label 'Recommended'. Everything else,
# including missing values, becomes False (vectorised instead of a Python loop).
data['recommended'] = (data['recommended'] == 'Recommended').fillna(False)

In [10]:
# Peek at the first five merged, parsed rows.
data.head(n=5)

Unnamed: 0,username,anime_name,anime_link,recommended,rating_user,timestamp,url,description,score,type,...,genre-1,genre-2,genre-3,genre-4,genre-5,genre-6,genre-7,genres,anime_id,genre_list
0,Godslayer27,My Home Hero,https://myanimelist.net/anime/52092/My_Home_Hero,True,7,"Nov 28, 2023",https://myanimelist.net/anime/52092/My_Home_Hero,"Tetsuo Tosu never expected his daughter, Reika...",7.06,TV,...,Drama,Suspense,,,,,,"Drama,Suspense",52092,"[Drama, Suspense]"
1,flowerscissors,Love Live! Superstar!!,https://myanimelist.net/anime/41169/Love_Live_...,True,9,"Oct 18, 2021",https://myanimelist.net/anime/41169/Love_Live_...,"Everyone has a dream they strive to achieve, a...",7.95,TV,...,SliceofLife,,,,,,,SliceofLife,41169,[SliceofLife]
2,Wizard4Hire1219,"Fuufu Ijou, Koibito Miman.",https://myanimelist.net/anime/50425/Fuufu_Ijou...,True,10,"Feb 7, 2023",https://myanimelist.net/anime/50425/Fuufu_Ijou...,Third-year high school student Jirou Yakuin is...,7.59,TV,...,Romance,,,,,,,Romance,50425,[Romance]
3,Ailes_Grises,Kimi no Suizou wo Tabetai,https://myanimelist.net/anime/36098/Kimi_no_Su...,False,3,"Nov 21, 2023",https://myanimelist.net/anime/36098/Kimi_no_Su...,The aloof protagonist: a bookworm who is deepl...,8.55,Movie,...,Drama,Romance,,,,,,"Drama,Romance",36098,"[Drama, Romance]"
4,TetGaming,No Game No Life,https://myanimelist.net/anime/19815/No_Game_No...,True,9,"Jul 25, 2023",https://myanimelist.net/anime/19815/No_Game_No...,"Sixteen sentient races inhabit Disboard, a wor...",8.06,TV,...,Comedy,Fantasy,Ecchi,,,,,"Comedy,Fantasy,Ecchi",19815,"[Comedy, Fantasy, Ecchi]"


In [11]:
# Number of missing values per column after merging and list parsing.
data.isna().sum()

username           0
anime_name         0
anime_link         0
recommended        0
rating_user        0
timestamp          0
url                0
description        0
score            112
type             112
episodes         112
premiered      15947
studios            0
source             0
rating           112
members          112
favorites        112
genre-1          570
genre-2        10784
genre-3        29063
genre-4        48740
genre-5        57010
genre-6        59620
genre-7        60293
genres           570
anime_id           0
genre_list         0
dtype: int64

In [12]:
TRAIN_FRACTION = 0.8  # share of the shuffled rows used for training

# Positional split of the already-shuffled frame. The cut-off is derived from
# the row count rather than hard-coded (previously 48320), so the 80/20 ratio
# survives changes to the input data. For the 60,400 rows here it is still 48,320.
n_train = int(round(len(data) * TRAIN_FRACTION))
train = data.iloc[:n_train]
valid = data.iloc[n_train:]
assert len(train) > 0 and len(valid) > 0, "train/valid split produced an empty part"

In [13]:
# Wrap both splits as NVTabular datasets. The training split is cut into 10
# partitions; validation keeps the default partitioning.
train_ds, valid_ds = nvt.Dataset(train, npartitions=10), nvt.Dataset(valid)

In [14]:
userId = ['username'] >> nvt.ops.Categorify() >> nvt.ops.AddTags(tags=[Tags.USER_ID, Tags.CATEGORICAL, Tags.USER])
movieId = ['anime_name'] >> nvt.ops.Categorify() >> nvt.ops.AddTags(tags=[Tags.ITEM_ID, Tags.CATEGORICAL, Tags.ITEM])
genre = ['genre_list'] >> nvt.ops.Categorify(freq_threshold=7) >> TagAsItemFeatures()
studio = ['studios'] >> nvt.ops.Categorify(freq_threshold=7) >> TagAsItemFeatures()
source = ['source'] >> nvt.ops.Categorify(freq_threshold=7) >> TagAsItemFeatures()

recommended = ['recommended'] >> nvt.ops.AddTags(tags=[Tags.TARGET, Tags.BINARY_CLASSIFICATION])
score = (['score'] >>
         nvt.ops.FillMissing(5) >>
         nvt.ops.Normalize() >>
         LambdaOp(lambda x: x.astype("float32"))>>
         TagAsItemFeatures()
        )
type = (['type'] >>
        nvt.ops.FillMissing('Old') >>
        nvt.ops.Categorify() >>
        TagAsItemFeatures()
       )
rating = (['rating'] >>
        nvt.ops.FillMissing('Not Rated') >>
        nvt.ops.Categorify() >>
        TagAsItemFeatures()
       )
# episode = (['episodes']>>
#            nvt.ops.FillMissing(0)>>
#            nvt.ops.Normalize()>>
#            LambdaOp(lambda x: x.astype("float"))>>
#            TagAsItemFeatures()
#           )

In [15]:
workflow = nvt.Workflow(userId + movieId + genre + recommended + score + type + rating + studio + source)

In [34]:
# Fit the workflow's statistics on the training split only and reuse them for
# validation. Note: `train`/`valid` are rebound here from cudf frames to the
# transformed Merlin datasets.
train, valid = workflow.fit_transform(train_ds), workflow.transform(valid_ds)
valid.compute()

Unnamed: 0,username,anime_name,genre_list,recommended,score,type,rating,studios,source
0,1863,159,"[12, 8, 10]",True,0.737789,3,4,[28],[5]
1,21570,53,"[3, 5, 8, 9]",True,-0.434288,3,3,"[4, 21, 6]",[4]
2,6171,9,"[3, 13, 4]",False,1.017843,3,4,[5],[3]
3,1085,571,"[16, 10]",True,-0.309820,3,3,[27],[6]
4,2,19,"[3, 7, 5, 4, 23]",True,-1.326311,3,5,[41],[5]
...,...,...,...,...,...,...,...,...,...
12075,2,70,"[3, 7, 5]",True,1.235662,3,4,[5],[3]
12076,2,655,"[6, 8]",True,0.291777,3,3,[3],[3]
12077,765,1295,"[3, 7, 5]",True,-0.807693,4,5,[72],[3]
12078,2,1437,"[3, 7, 4]",False,-1.834557,3,3,[24],[9]


In [35]:
# Label column name, plus the schema emitted by the fitted workflow. The schema's
# tags tell the Merlin models which columns are ids, features and targets.
target_column = 'recommended'
schema = train.schema

In [115]:
# Shared training hyper-parameters: mini-batch size and Adagrad learning rate.
batch_size, LR = 64, 1e-3

## NCF Model

In [121]:
# Neural Collaborative Filtering baseline: 64-dim embeddings and a [128, 64] MLP,
# with a binary head on the target column.
model = mm.benchmark.NCFModel(schema, embedding_dim=64, mlp_block=mm.MLPBlock([128, 64]), prediction_tasks=mm.BinaryOutput(target_column))

In [122]:
%%time
opt = tf.keras.optimizers.legacy.Adagrad(learning_rate=LR)
model.compile(optimizer=opt, run_eagerly=False, metrics=[tf.keras.metrics.AUC()])
history_ncf = model.fit(train, validation_data=valid, batch_size=batch_size,epochs=10)

Epoch 1/10


2024-05-05 19:37:01.205031: I tensorflow/core/common_runtime/executor.cc:1209] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype int32
	 [[{{node Placeholder/_0}}]]




2024-05-05 19:37:06.032371: I tensorflow/core/common_runtime/executor.cc:1209] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype int32
	 [[{{node Placeholder/_0}}]]


Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
CPU times: user 1min 19s, sys: 8.98 s, total: 1min 28s
Wall time: 44.8 s


In [123]:
# Held-out metrics, using the shared batch_size constant rather than a literal 64.
metrics_ncf = model.evaluate(valid, batch_size=batch_size, return_dict=True)
metrics_ncf

  1/189 [..............................] - ETA: 32s - loss: 0.5471 - auc_11: 0.4891 - regularization_loss: 0.0000e+00 - loss_batch: 0.5471

2024-05-05 19:37:45.233529: I tensorflow/core/common_runtime/executor.cc:1209] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype int32
	 [[{{node Placeholder/_0}}]]




{'loss': 0.4959215223789215,
 'auc_11': 0.530495285987854,
 'regularization_loss': 0.0,
 'loss_batch': 0.42217084765434265}

## MLP Model

In [124]:
# Plain [64, 32] MLP on the inputs Merlin builds from `schema`, feeding a binary
# head on the target column.
model = mm.Model.from_block(
    mm.MLPBlock([64, 32]),
    schema,
    prediction_tasks=mm.BinaryOutput(target_column),
)

In [125]:
%%time
opt = tf.keras.optimizers.legacy.Adagrad(learning_rate=LR)
model.compile(optimizer=opt, run_eagerly=False, metrics=[tf.keras.metrics.AUC(name="auc")])
histor_mlp = model.fit(train, validation_data=valid, batch_size=batch_size,epochs=10)



Epoch 1/10


2024-05-05 19:37:47.278768: I tensorflow/core/common_runtime/executor.cc:1209] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype int32
	 [[{{node Placeholder/_0}}]]




2024-05-05 19:37:55.016141: I tensorflow/core/common_runtime/executor.cc:1209] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype int32
	 [[{{node Placeholder/_0}}]]


Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
CPU times: user 2min 41s, sys: 21.4 s, total: 3min 2s
Wall time: 1min 11s


In [126]:
# Held-out metrics, using the shared batch_size constant rather than a literal 64.
metrics_mlp = model.evaluate(valid, batch_size=batch_size, return_dict=True)
metrics_mlp

  1/189 [..............................] - ETA: 37s - loss: 0.5930 - auc: 0.4850 - regularization_loss: 0.0000e+00 - loss_batch: 0.5930

2024-05-05 19:38:57.951056: I tensorflow/core/common_runtime/executor.cc:1209] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype int32
	 [[{{node Placeholder/_0}}]]




{'loss': 0.45246192812919617,
 'auc': 0.7042537927627563,
 'regularization_loss': 0.0,
 'loss_batch': 0.41707155108451843}

## DLRM Model

In [127]:
# DLRM over the same workflow schema as the other models. This previously read
# `train_transformed.schema`, a name never defined in this notebook (NameError
# on a fresh kernel). Note that the bottom block's last width (64) equals embedding_dim.
model = mm.DLRMModel(
    schema,
    embedding_dim=64,
    bottom_block=mm.MLPBlock([128, 64]),
    top_block=mm.MLPBlock([128, 64, 32]),
    prediction_tasks=mm.BinaryOutput(target_column),
)

In [128]:
%%time
opt = tf.keras.optimizers.legacy.Adagrad(learning_rate=LR)
model.compile(optimizer=opt, run_eagerly=False, metrics=[tf.keras.metrics.AUC(name="auc")])
history_dlrm = model.fit(train, validation_data=valid, batch_size=batch_size,epochs=10)

Epoch 1/10


2024-05-05 19:39:00.532558: I tensorflow/core/common_runtime/executor.cc:1209] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype int32
	 [[{{node Placeholder/_0}}]]




2024-05-05 19:39:10.361322: I tensorflow/core/common_runtime/executor.cc:1209] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype int32
	 [[{{node Placeholder/_0}}]]


Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
CPU times: user 2min 45s, sys: 21.7 s, total: 3min 6s
Wall time: 1min 19s


In [129]:
# Held-out metrics, using the shared batch_size constant rather than a literal 64.
metrics_dlrm = model.evaluate(valid, batch_size=batch_size, return_dict=True)
metrics_dlrm

2024-05-05 19:40:19.338611: I tensorflow/core/common_runtime/executor.cc:1209] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype int32
	 [[{{node Placeholder/_0}}]]




{'loss': 0.45304444432258606,
 'auc': 0.6979348659515381,
 'regularization_loss': 0.0,
 'loss_batch': 0.4475032389163971}

## DCN Model

In [130]:
# Deep & Cross Network: depth=2 cross layers combined with a [64, 32] deep block,
# and a binary head on the target column.
model = mm.DCNModel(schema, depth=2, deep_block=mm.MLPBlock([64, 32]), prediction_tasks=mm.BinaryOutput(target_column))

In [131]:
%%time
opt = tf.keras.optimizers.legacy.Adagrad(learning_rate=LR)
model.compile(optimizer=opt, run_eagerly=False, metrics=[tf.keras.metrics.AUC(name="auc")])
history_DCN = model.fit(train, validation_data=valid, batch_size=batch_size,epochs=10)

Epoch 1/10


2024-05-05 19:40:22.714963: I tensorflow/core/common_runtime/executor.cc:1209] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype int32
	 [[{{node Placeholder/_0}}]]




2024-05-05 19:40:31.227063: I tensorflow/core/common_runtime/executor.cc:1209] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype int32
	 [[{{node Placeholder/_0}}]]


Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
CPU times: user 2min 45s, sys: 22.6 s, total: 3min 8s
Wall time: 1min 16s


In [132]:
# Held-out metrics, using the shared batch_size constant rather than a literal 64.
metrics_dcn = model.evaluate(valid, batch_size=batch_size, return_dict=True)
metrics_dcn

  1/189 [..............................] - ETA: 37s - loss: 0.6117 - auc: 0.4551 - regularization_loss: 0.0000e+00 - loss_batch: 0.6117

2024-05-05 19:41:38.558355: I tensorflow/core/common_runtime/executor.cc:1209] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype int32
	 [[{{node Placeholder/_0}}]]




{'loss': 0.4542255997657776,
 'auc': 0.7010928988456726,
 'regularization_loss': 0.0,
 'loss_batch': 0.4344296455383301}

## XGBoost

In [155]:
# XGBoost variant of the feature workflow. Every feature is built as for the
# Keras models, but the boolean `recommended` target is cast to 0/1 integers for
# XGBoost. This cell rebinds `workflow`, `train` and `valid`.
# NOTE(review): in the saved run this cell (In [155]) executed *after* the
# training cell below (In [154]); run the notebook top to bottom to reproduce.
userId = ['username'] >> nvt.ops.Categorify() >> nvt.ops.AddTags(tags=[Tags.USER_ID, Tags.CATEGORICAL, Tags.USER])
movieId = ['anime_name'] >> nvt.ops.Categorify() >> nvt.ops.AddTags(tags=[Tags.ITEM_ID, Tags.CATEGORICAL, Tags.ITEM])
genre = ['genre_list'] >> nvt.ops.Categorify(freq_threshold=7) >> TagAsItemFeatures()
studio = ['studios'] >> nvt.ops.Categorify(freq_threshold=7) >> TagAsItemFeatures()
source = ['source'] >> nvt.ops.Categorify(freq_threshold=7) >> TagAsItemFeatures()

# Explicit bool -> int64 cast (same result as the previous `x * 1`).
recommended = ['recommended'] >> LambdaOp(lambda x: x.astype("int64")) >> nvt.ops.AddTags(tags=[Tags.TARGET, Tags.BINARY_CLASSIFICATION])
score = (['score'] >>
         nvt.ops.FillMissing(5) >>
         nvt.ops.Normalize() >>
         LambdaOp(lambda x: x.astype("float32"))>>
         TagAsItemFeatures()
        )
# `anime_type` rather than `type`, so the builtin `type()` is not shadowed.
anime_type = (['type'] >>
              nvt.ops.FillMissing('Old') >>
              nvt.ops.Categorify() >>
              TagAsItemFeatures()
             )
rating = (['rating'] >>
        nvt.ops.FillMissing('Not Rated') >>
        nvt.ops.Categorify() >>
        TagAsItemFeatures()
       )

workflow = nvt.Workflow(userId + movieId + genre + recommended + score + anime_type + rating + studio + source)

train = workflow.fit_transform(train_ds)
valid = workflow.transform(valid_ds)

In [154]:
# Booster settings. NOTE(review): 'gpu_hist' is deprecated in XGBoost >= 2.0 in
# favour of tree_method='hist' with device='cuda'; check the installed version.
xgb_booster_params = {
    'objective': 'binary:logistic',
    'tree_method': 'gpu_hist',
}

# Up to 100 rounds, logging every 20, stopping after 10 rounds without
# improvement on the evaluation set.
xgb_train_params = {
    'num_boost_round': 100,
    'verbose_eval': 20,
    'early_stopping_rounds': 10,
}

# NOTE(review): `valid` drives early stopping *and* the reported metrics, so
# those metrics are optimistic compared with a truly held-out test split.
with Distributed():
    model = XGBoost(schema=train.schema, **xgb_booster_params)
    model.fit(
        train,
        evals=[(valid, 'validation_set')],
        **xgb_train_params,
    )
    metrics_xgb = model.evaluate(valid)

# Keep the original name and display the result (previously never shown).
metrics = metrics_xgb
metrics_xgb



Perhaps you already have a cluster running?
Hosting the HTTP server on port 38933 instead
2024-05-05 19:49:08,378 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2024-05-05 19:49:08,378 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
  warn(f"PyTorch dtype mappings did not load successfully due to an error: {exc.msg}")
  client.wait_for_workers(n_workers)
[19:49:36] task [xgboost.dask-0]:tcp://127.0.0.1:37075 got new rank 0


[0]	validation_set-logloss:0.57662
[15]	validation_set-logloss:0.48362
