# Sequence modeling for ranking task

# Set up

In [1]:
%load_ext autoreload
%autoreload 2
%load_ext tensorboard

In [2]:
import os
import sys

import lightning as L
import numpy as np
import pandas as pd
import torch
from dotenv import load_dotenv
from lightning.pytorch.callbacks import ModelCheckpoint
from lightning.pytorch.callbacks.early_stopping import EarlyStopping
from lightning.pytorch.loggers import MLFlowLogger
from loguru import logger
from mlflow.exceptions import MlflowException
from mlflow.models.signature import infer_signature
from pydantic import BaseModel
from torch.utils.data import DataLoader

import mlflow

sys.path.insert(0, "..")

from src.dataset import UserItemRatingDFDataset
from src.eval.compare_runs import ModelMetricsComparisonVisualizer
from src.id_mapper import IDMapper
from src.sequence.inference import SequenceRatingPredictionInferenceWrapper
from src.sequence.model import SequenceRatingPrediction
from src.sequence.trainer import LitSequenceRatingPrediction
from src.sequence.utils import generate_item_sequences
from src.viz import custom_style_plotly

load_dotenv()
custom_style_plotly()

  from .autonotebook import tqdm as notebook_tqdm


# Controller

In [3]:
# This is a parameter cell used by papermill
max_epochs = 100

In [4]:
class Args(BaseModel):
    testing: bool = False
    author: str = "quy.dinh"
    log_to_mlflow: bool = True
    experiment_name: str = "Retriever"
    run_name: str = "001-sequence-model"
    notebook_persist_dp: str = None
    random_seed: int = 41
    device: str = None

    max_epochs: int = max_epochs
    batch_size: int = 128

    user_col: str = "user_id"
    item_col: str = "parent_asin"
    rating_col: str = "rating"
    timestamp_col: str = "timestamp"

    top_K: int = 100
    top_k: int = 10

    batch_size: int = 128

    embedding_dim: int = 128
    dropout: float = 0.3
    early_stopping_patience: int = 5
    learning_rate: float = 0.001
    l2_reg: float = 1e-5

    mlf_item2vec_model_name: str = "item2vec"
    mlf_model_name: str = "sequence_rating_prediction"
    min_roc_auc: float = 0.7

    best_checkpoint_path: str = None

    def init(self):
        self.notebook_persist_dp = os.path.abspath(f"data/{self.run_name}")
        os.makedirs(self.notebook_persist_dp, exist_ok=True)

        if not (mlflow_uri := os.environ.get("MLFLOW_TRACKING_URI")):
            logger.warning(
                "Environment variable MLFLOW_TRACKING_URI is not set. Setting self.log_to_mlflow to false."
            )
            self.log_to_mlflow = False

        if self.log_to_mlflow:
            logger.info(
                f"Setting up MLflow experiment {self.experiment_name} - run {self.run_name}..."
            )
            self._mlf_logger = MLFlowLogger(
                experiment_name=self.experiment_name,
                run_name=self.run_name,
                tracking_uri=mlflow_uri,
                log_model=True,
            )

        if self.device is None:
            self.device = (
                "cuda"
                if torch.cuda.is_available()
                else "mps" if torch.backends.mps.is_available() else "cpu"
            )

        return self


args = Args().init()

print(args.model_dump_json(indent=2))

[32m2025-03-08 14:42:58.956[0m | [1mINFO    [0m | [36m__main__[0m:[36minit[0m:[36m47[0m - [1mSetting up MLflow experiment Retriever - run 001-sequence-model...[0m


{
  "testing": false,
  "author": "quy.dinh",
  "log_to_mlflow": true,
  "experiment_name": "Retriever",
  "run_name": "001-sequence-model",
  "notebook_persist_dp": "/home/dvq/frostmourne/recsys-blog/1-seq-model/notebooks/data/001-sequence-model",
  "random_seed": 41,
  "device": "cuda",
  "max_epochs": 100,
  "batch_size": 128,
  "user_col": "user_id",
  "item_col": "parent_asin",
  "rating_col": "rating",
  "timestamp_col": "timestamp",
  "top_K": 100,
  "top_k": 10,
  "embedding_dim": 128,
  "dropout": 0.3,
  "early_stopping_patience": 5,
  "learning_rate": 0.001,
  "l2_reg": 0.00001,
  "mlf_item2vec_model_name": "item2vec",
  "mlf_model_name": "sequence_rating_prediction",
  "min_roc_auc": 0.7,
  "best_checkpoint_path": null
}


# Implement

In [5]:
def init_model(n_users, n_items, embedding_dim, dropout, item_embedding=None):
    model = SequenceRatingPrediction(
        n_users, n_items, embedding_dim, dropout=dropout, item_embedding=item_embedding
    )
    return model

# Test implementation

In [None]:
embedding_dim = 8
batch_size = 2

# Mock data
user_indices = [0, 0, 1, 2, 2]
item_indices = [0, 1, 2, 3, 4]
timestamps = [0, 1, 2, 3, 4]
ratings = [0, 4, 5, 3, 0]
item_sequences = [
    [-1, -1, 2, 3],
    [-1, -1, 2, 3],
    [-1, -1, 1, 3],
    [-1, -1, 2, 1],
    [-1, -1, 2, 1],
]

n_users = len(set(user_indices))
n_items = len(set(item_indices))

train_df = pd.DataFrame(
    {
        "user_indice": user_indices,
        "item_indice": item_indices,
        args.timestamp_col: timestamps,
        args.rating_col: ratings,
        "item_sequence": item_sequences,
    }
)

model = init_model(n_users, n_items, embedding_dim, args.dropout)

# Example forward pass
model.eval()
user = torch.tensor([0])
item_sequence = torch.tensor([[-1, -1, -1, 0, 1]])
target_item = torch.tensor([2])
predictions = model.predict(user, item_sequence, target_item)
print(predictions)
model.train()

In [None]:
rating_dataset = UserItemRatingDFDataset(
    train_df, "user_indice", "item_indice", args.rating_col, args.timestamp_col
)

train_loader = DataLoader(
    rating_dataset, batch_size=batch_size, shuffle=False, drop_last=True
)

In [None]:
for batch_input in train_loader:
    print(batch_input)

In [None]:
# model
lit_model = LitSequenceRatingPrediction(model, log_dir=args.notebook_persist_dp)

# train model
trainer = L.Trainer(
    default_root_dir=f"{args.notebook_persist_dp}/test",
    max_epochs=2,
    accelerator=args.device if args.device else "auto",
)
trainer.fit(
    model=lit_model, train_dataloaders=train_loader, val_dataloaders=train_loader
)

In [None]:
users = torch.tensor([0, 0, 0, 0])
item_sequences = torch.tensor(
    [[-1, -1, 2, 3], [-1, -1, 2, 3], [-1, -1, 1, 3], [-1, -1, 2, 1]]
)
items = torch.tensor([0, 1, 2, 3])
predictions = model.predict(users, item_sequences, items)
print(predictions)

In [None]:
def create_predict_df(
    train_df,
    val_user_indices,
    val_timestamp,
    rating_col,
    timestamp_col,
    sequence_length=10,
):
    predict_df = pd.DataFrame(
        {
            "user_indice": val_user_indices,
            "item_indice": -1,  # placeholder
            "timestamp": val_timestamp,
            "source": "predict",
        }
    )

    predict_df = (
        pd.concat(
            [
                train_df.loc[lambda df: df[rating_col].gt(0)][
                    ["user_indice", "item_indice", timestamp_col]
                ].assign(source="train"),
                predict_df,
            ],
            axis=0,
        )
        .pipe(
            generate_item_sequences,
            "user_indice",
            "item_indice",
            timestamp_col,
            sequence_length=sequence_length,
            padding=True,
            padding_value=-1,
        )
        .loc[lambda df: df["source"].eq("predict")]
        .assign(item_sequence=lambda df: df["item_sequence"].apply(np.array))
    )

    return predict_df


predict_df = create_predict_df(
    train_df,
    user_indices,
    timestamps[-1],
    args.rating_col,
    args.timestamp_col,
    sequence_length=10,
)

predict_df

In [None]:
recommendations = model.recommend(
    torch.tensor(predict_df["user_indice"].values),
    torch.tensor(predict_df["item_sequence"].values.tolist()),
    k=2,
    batch_size=4,
)
recommendations

# Prep data

In [None]:
train_df = pd.read_parquet("../data/train_features_neg_df.parquet")
val_df = pd.read_parquet("../data/val_features_neg_df.parquet")
idm_fp = "../data/idm.json"
idm = IDMapper().load(idm_fp)

assert (
    train_df[args.user_col].map(lambda s: idm.get_user_index(s))
    != train_df["user_indice"]
).sum() == 0, "Mismatch IDM"
assert (
    val_df[args.user_col].map(lambda s: idm.get_user_index(s)) != val_df["user_indice"]
).sum() == 0, "Mismatch IDM"

In [None]:
user_indices = train_df["user_indice"].unique()
item_indices = train_df["item_indice"].unique()

logger.info(f"{len(user_indices)=:,.0f}, {len(item_indices)=:,.0f}")

In [None]:
train_df

# Train

In [None]:
rating_dataset = UserItemRatingDFDataset(
    train_df, "user_indice", "item_indice", args.rating_col, args.timestamp_col
)
val_rating_dataset = UserItemRatingDFDataset(
    val_df, "user_indice", "item_indice", args.rating_col, args.timestamp_col
)

train_loader = DataLoader(
    rating_dataset, batch_size=args.batch_size, shuffle=True, drop_last=True
)
val_loader = DataLoader(
    val_rating_dataset, batch_size=args.batch_size, shuffle=False, drop_last=False
)

In [None]:
n_items = len(item_indices)
n_users = len(user_indices)

model = init_model(n_users, n_items, args.embedding_dim, args.dropout)

#### Predict before train

In [None]:
model.item_embedding

In [None]:
val_df = val_rating_dataset.df
val_df.sample(10)

In [None]:
user_id = val_df.sample(1)[args.user_col].values[0]
# user_id = "AH4AOFTTDPHPAFAAVFMAF25H2LIQ"
test_df = val_df.loc[lambda df: df[args.user_col].eq(user_id)]
with pd.option_context("display.max_colwidth", None):
    display(test_df)

In [None]:
test_row = test_df.loc[lambda df: df[args.rating_col].gt(0)].iloc[0]
item_id = test_row[args.item_col]
item_sequence = test_row["item_sequence"]
logger.info(
    f"Test predicting before training with {args.user_col} = {user_id} and {args.item_col} = {item_id}"
)
user_indice = idm.get_user_index(user_id)
item_indice = idm.get_item_index(item_id)
user = torch.tensor([user_indice])
item_sequence = torch.tensor([item_sequence])
item = torch.tensor([item_indice])

model.eval()
model.predict(user, item_sequence, item)
model.train()

#### Training loop

##### Overfit 1 batch

In [None]:
early_stopping = EarlyStopping(
    monitor="val_roc_auc", patience=10, mode="max", verbose=False
)

model = init_model(n_users, n_items, args.embedding_dim, dropout=0)
lit_model = LitSequenceRatingPrediction(
    model,
    learning_rate=args.learning_rate,
    l2_reg=0.0,
    log_dir=args.notebook_persist_dp,
    accelerator=args.device,
)

log_dir = f"{args.notebook_persist_dp}/logs/overfit"

# train model
trainer = L.Trainer(
    default_root_dir=log_dir,
    accelerator=args.device if args.device else "auto",
    max_epochs=100,
    overfit_batches=1,
    callbacks=[early_stopping],
)
trainer.fit(
    model=lit_model,
    train_dataloaders=train_loader,
    val_dataloaders=train_loader,
)
logger.info(f"Logs available at {trainer.log_dir}")

In [None]:
# Need to make sure port 6006 at local is accessible
%tensorboard --logdir $trainer.log_dir

##### Fit on all data

In [None]:
# papermill_description=fit-model
early_stopping = EarlyStopping(
    monitor="val_roc_auc", patience=args.early_stopping_patience, mode="max", verbose=False
)

checkpoint_callback = ModelCheckpoint(
    dirpath=f"{args.notebook_persist_dp}/checkpoints",
    filename="best-checkpoint",
    save_top_k=1,
    monitor="val_roc_auc",
    mode="max",
)

model = init_model(
    n_users,
    n_items,
    args.embedding_dim,
    dropout=args.dropout
)
lit_model = LitSequenceRatingPrediction(
    model,
    learning_rate=args.learning_rate,
    l2_reg=args.l2_reg,
    log_dir=args.notebook_persist_dp,
    evaluate_ranking=True,
    idm=idm,
    args=args,
    accelerator=args.device,
    checkpoint_callback=checkpoint_callback,
)

log_dir = f"{args.notebook_persist_dp}/logs/run"

# train model
trainer = L.Trainer(
    default_root_dir=log_dir,
    max_epochs=args.max_epochs,
    callbacks=[early_stopping, checkpoint_callback],
    accelerator=args.device if args.device else "auto",
    logger=args._mlf_logger if args.log_to_mlflow else None,
)
trainer.fit(
    model=lit_model,
    train_dataloaders=train_loader,
    val_dataloaders=val_loader,
)

In [None]:
logger.info(
    f"Test predicting after training with {args.user_col} = {user_id} and {args.item_col} = {item_id}"
)
model.eval()
model = model.to(user.device)
model.predict(user, item_sequence, item)
model.train()

# Load best checkpoint

In [None]:
logger.info(f"Loading best checkpoint from {checkpoint_callback.best_model_path}...")
args.best_checkpoint_path = checkpoint_callback.best_model_path

best_trainer = LitSequenceRatingPrediction.load_from_checkpoint(
    checkpoint_callback.best_model_path,
    model=init_model(n_users, n_items, args.embedding_dim, dropout=0),
)

In [None]:
best_model = best_trainer.model.to(lit_model.device)

In [None]:
best_model.eval()
best_model.predict(user, item_sequence, item)
best_model.train()

### Persist id mapping

In [None]:
if args.log_to_mlflow:
    # Persist id_mapping so that at inference we can predict based on item_ids (string) instead of item_index
    run_id = trainer.logger.run_id
    mlf_client = trainer.logger.experiment
    mlf_client.log_artifact(run_id, idm_fp)

### Wrap inference function and register best checkpoint as MLflow model

In [None]:
inferrer = SequenceRatingPredictionInferenceWrapper(best_model)

In [None]:
sample_input = {
    "user_ids": [idm.get_user_id(0)],
    "item_sequences": [[idm.get_item_id(0), idm.get_item_id(1)]],
    "item_ids": [idm.get_item_id(0)],
}
sample_output = inferrer.infer([0], [[0, 1]], [0])
sample_output

In [None]:
if args.log_to_mlflow:
    run_id = trainer.logger.run_id
    sample_output_np = sample_output
    signature = infer_signature(sample_input, sample_output_np)
    idm_filename = idm_fp.split("/")[-1]
    with mlflow.start_run(run_id=run_id):
        mlflow.pyfunc.log_model(
            python_model=inferrer,
            artifact_path="inferrer",
            # We log the id_mapping to the predict function so that it can accept item_id and automatically convert ot item_indice for PyTorch model to use
            artifacts={"idm": mlflow.get_artifact_uri(idm_filename)},
            signature=signature,
            input_example=sample_input,
            registered_model_name=args.mlf_model_name,
        )

# Set the newly trained model as champion

In [None]:
if args.log_to_mlflow:
    # Get current champion
    deploy_alias = "champion"
    curr_model_run_id = None

    min_roc_auc = args.min_roc_auc

    try:
        curr_champion_model = mlf_client.get_model_version_by_alias(
            args.mlf_model_name, deploy_alias
        )
        curr_model_run_id = curr_champion_model.run_id
    except MlflowException as e:
        if "not found" in str(e).lower():
            logger.info(
                f"There is no {deploy_alias} alias for model {args.mlf_model_name}"
            )

    # Compare new vs curr models
    new_mlf_run = trainer.logger.experiment.get_run(trainer.logger.run_id)
    new_metrics = new_mlf_run.data.metrics
    roc_auc = new_metrics["roc_auc"]
    if curr_model_run_id:
        curr_model_run_info = mlf_client.get_run(curr_model_run_id)
        curr_metrics = curr_model_run_info.data.metrics
        if (curr_roc_auc := curr_metrics["roc_auc"]) > min_roc_auc:
            logger.info(
                f"Current {deploy_alias} model has {curr_roc_auc:,.4f} ROC-AUC. Setting it to the deploy baseline..."
            )
            min_roc_auc = curr_roc_auc

        top_metrics = ["roc_auc"]
        vizer = ModelMetricsComparisonVisualizer(curr_metrics, new_metrics, top_metrics)
        print("Comparing metrics between new run and current champion:")
        display(vizer.compare_metrics_df())
        vizer.create_metrics_comparison_plot(n_cols=5)
        vizer.plot_diff()

    # Register new champion
    if roc_auc < min_roc_auc:
        logger.info(
            f"Current run has ROC-AUC = {roc_auc:,.4f}, smaller than {min_roc_auc:,.4f}. Skip aliasing this model as the new {deploy_alias}.."
        )
    else:
        logger.info("Aliasing the new model as champion...")
        # Get the model version for current run by assuming it's the most recent registered version
        model_version = (
            mlf_client.get_registered_model(args.mlf_model_name)
            .latest_versions[0]
            .version
        )

        mlf_client.set_registered_model_alias(
            name=args.mlf_model_name, alias="champion", version=model_version
        )

        mlf_client.set_model_version_tag(
            name=args.mlf_model_name,
            version=model_version,
            key="author",
            value=args.author,
        )

# Clean up

In [None]:
all_params = [args]

if args.log_to_mlflow:
    with mlflow.start_run(run_id=run_id):
        for params in all_params:
            params_dict = params.dict()
            params_ = dict()
            for k, v in params_dict.items():
                if k == "top_K":
                    k = "top_big_K"
                if k == "top_k":
                    k = "top_small_k"
                params_[f"{params.__repr_name__()}.{k}"] = v
            mlflow.log_params(params_)