# Examples for Recommendation
## Training

1. Configure the data configuration file, which is used to define the parameters of the training dataset, such as item column, context_features, etc. An example is as follows:

    ```json
    {
        "name": "recflow",
        "type": "hdfs",
        "url": "hdfs://node1:8020/recstudio/recflow/realshow",
        "item_col": "video_id",
        "context_features": ["user_id", "device_id", "age", "gender", "province"],
        "labels": ["like"],
        "item_batch_size": 4096,
        "filter_settings": {
            "like": ["==1"]
        },
        "item_info": {
            "url": "hdfs://node1:8020/recstudio/recflow/others/video_info.pkl",
            "key": "video_id",
            "columns": ["video_id", "author_id", "category_level_two", "upload_type", "upload_timestamp", "category_level_one"],
            "use_cols": ["video_id", "author_id", "category_level_two", "upload_type", "category_level_one"]
        },
        "user_sequential_info": {
            "url": "hdfs://node1:8020/recstudio/recflow/seq_effective_50",
            "key": "request_id",
            "columns": ["video_id", "author_id", "category_level_two", "category_level_one", "upload_type", "upload_timestamp", "duration", "request_timestamp", "playing_time", "request_id"],
            "use_cols": ["video_id", "author_id", "category_level_two", "category_level_one", "upload_type"]
        },
        "stats": {
            "request_id": 9370581,
            "user_id": 42472,
            "device_id": 42561,
            "age": 8,
            "gender": 3,
            "province": 79,
            "video_id": 82216301,
            "author_id": 33474011,
            "category_level_one": 140,
            "category_level_two": 784,
            "upload_type": 40
        },
        "train_settings": {
            "start_date": "2024-02-08",
            "end_date": "2024-02-09"
        },
        "test_settings": {
            "start_date": "2024-02-08",
            "end_date": "2024-02-09"
        }
    }
    ```

2. Configure the model configuration file, which is used to define the structural parameters of the model, such as embedding size, hidden size, etc. An example is as follows:

    ```json
    {
        "embedding_dim": 4,
        "mlp_layers": [128, 128],
        "prediction_layers": [32],
        "activation": "relu",
        "dropout": 0.1,
        "batch_norm": false
    }
    ```

3. Configure the training parameters, which are used to define the hyperparameters for training, such as batch size, learning rate, etc. An example is as follows:

    ```json
    {
        "epochs": 1,
        "train_batch_size": 2048,
        "eval_batch_size": 4096,
        "optimizer": "adam",
        "learning_rate": 1e-2,
        "checkpoint_dir": "path_to_save_checkpoint",
        "checkpoint_steps": 1000,
        "evaluation_strategy": "epoch",
        "eval_interval": 1,
        "metrics": ["auc", "logloss"],
        "earlystop_metric": "auc"
    }
    ```
    
4. Create a new Python script that imports the runner and the model class and performs training.


    ```python
    # train.py
    from UniRetrieval.training.embedder.recommendation.runner import RetrieverRunner
    from UniRetrieval.training.embedder.recommendation.modeling import MLPRetriever


    data_config_path = "/data1/home/recstudio/haoran/UniRetrieval/examples/recommendation/config/data/recflow_retriever.json"
    train_config_path = "/data1/home/recstudio/haoran/UniRetrieval/examples/recommendation/config/mlp_retriever/train.json"
    model_config_path = "/data1/home/recstudio/haoran/UniRetrieval/examples/recommendation/config/mlp_retriever/model.json"

    runner = RetrieverRunner(
        model_config_path=model_config_path,
        data_config_path=data_config_path,
        train_config_path=train_config_path,
        model_class=MLPRetriever,
    )
    runner.run()
    ```

5. The training script is now complete, and you can run it to train the model. Running it with the `python` command uses single-machine single-GPU training by default. For single-machine multi-GPU or multi-machine multi-GPU training, see the distributed training section of this document.

    ```bash
    python train.py
    ```
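Before launching a run, it can help to check that the data configuration is internally consistent. The sketch below is only an illustration: `check_data_config` is a hypothetical helper, not part of UniRetrieval, and it assumes that every feature fed to the model (the item column, the context features, and the `use_cols` entries) needs a matching entry in `stats`.

```python
import json


def check_data_config(data_config: dict) -> list:
    """Return the used feature names that have no entry in "stats"."""
    required = set(data_config["context_features"])
    required.add(data_config["item_col"])
    required.update(data_config["item_info"]["use_cols"])
    required.update(data_config["user_sequential_info"]["use_cols"])
    stats = data_config.get("stats", {})
    return sorted(name for name in required if name not in stats)


# A trimmed copy of the data configuration shown in step 1.
example = json.loads("""
{
    "item_col": "video_id",
    "context_features": ["user_id", "device_id", "age", "gender", "province"],
    "item_info": {
        "use_cols": ["video_id", "author_id", "category_level_two", "upload_type", "category_level_one"]
    },
    "user_sequential_info": {
        "use_cols": ["video_id", "author_id", "category_level_two", "category_level_one", "upload_type"]
    },
    "stats": {
        "user_id": 42472, "device_id": 42561, "age": 8, "gender": 3, "province": 79,
        "video_id": 82216301, "author_id": 33474011,
        "category_level_one": 140, "category_level_two": 784, "upload_type": 40
    }
}
""")

missing = check_data_config(example)
print("features without stats:", missing)
```

In practice you would load the real file with `json.load` instead of the inline string; a non-empty result points at a feature whose vocabulary size is not declared.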

### Customize your models

This section will demonstrate how to train custom models by inheriting base classes. 

#### Retriever Model

1. Import the BaseRetriever class and inherit from it to implement your custom model. A retriever model is typically composed of four main modules:

- query_encoder: The context (query) feature encoder, which encodes user and context features into vector representations.
- item_encoder: The item feature encoder, which encodes item features into vector representations.
- score_function: The scoring function, which calculates the match degree between user-item pairs.
- loss_function: The loss function, which calculates the difference between the model's predicted values and the true labels.

Therefore, you need to override the following methods. The configuration parameters required when defining the model structure come from the model.json file.

```python
import torch
from collections import OrderedDict
from UniRetrieval.training.embedder.recommendation.modeling import BaseRetriever
from UniRetrieval.modules.arguments import get_modules
from UniRetrieval.modules.embedding import MultiFeatEmbedding
from UniRetrieval.modules.layer import MLPModule

class MYMLPRetriever(BaseRetriever):
    def __init__(self, config, *args, **kwargs):
        super().__init__(config, *args, **kwargs)

    def get_item_encoder(self):
        item_emb = MultiFeatEmbedding(
            features=self.data_config.item_features,
            stats=self.data_config.stats,
            embedding_dim=self.model_config.embedding_dim,
            concat_embeddings=True
        )
        mlp = MLPModule(
            mlp_layers=[item_emb.total_embedding_dim] + self.model_config.mlp_layers,
            activation_func=self.model_config.activation,
            dropout=self.model_config.dropout,
            bias=True,
            batch_norm=self.model_config.batch_norm,
            last_activation=False,
            last_bn=False
        )
        return torch.nn.Sequential(OrderedDict([
            ("item_embedding", item_emb),
            ("mlp", mlp)
        ]))

    def get_query_encoder(self):
        context_emb = MultiFeatEmbedding(
            features=self.data_config.context_features,
            stats=self.data_config.stats,
            embedding_dim=self.model_config.embedding_dim
        )
        base_encoder = get_modules("encoder", "BaseQueryEncoderWithSeq")(
            context_embedding=context_emb,
            item_encoder=self.item_encoder
        )
        output_dim = self.model_config.mlp_layers[-1] + context_emb.total_embedding_dim
        mlp = MLPModule(
            mlp_layers=[output_dim] + self.model_config.mlp_layers,
            activation_func=self.model_config.activation,
            dropout=self.model_config.dropout,
            bias=True,
            batch_norm=self.model_config.batch_norm,
            last_activation=False,
            last_bn=False
        )

        return torch.nn.Sequential(OrderedDict([
            ("encoder", base_encoder),
            ("mlp", mlp)
        ]))

    def get_score_function(self):
        return get_modules("score", "InnerProductScorer")()

    def get_loss_function(self):
        return get_modules("loss", "BPRLoss")()

    def get_negative_sampler(self):
        sampler_cls = get_modules("sampler", "UniformSampler")
        return sampler_cls(num_items=self.data_config.num_items)
```
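To make the roles of the score function and the loss function concrete, here is a minimal sketch of an inner-product score and the standard BPR (Bayesian Personalized Ranking) loss. It uses plain Python lists instead of tensors so that it runs without any dependency. The helper names are hypothetical, and the library's `InnerProductScorer` and `BPRLoss` may differ in reduction and numerical details.

```python
import math


def inner_product(query_vec, item_vec):
    """Match score between one query vector and one item vector."""
    return sum(q * i for q, i in zip(query_vec, item_vec))


def bpr_loss(pos_score, neg_scores):
    """Standard BPR loss: mean of -log(sigmoid(pos - neg)) over the negatives."""
    # -log(sigmoid(x)) equals log(1 + exp(-x)), computed here with log1p.
    losses = [math.log1p(math.exp(-(pos_score - neg))) for neg in neg_scores]
    return sum(losses) / len(losses)


query = [1.0, 0.0]
positive_item = [1.0, 0.0]   # the item the user interacted with
negative_item = [0.0, 1.0]   # an item drawn by the negative sampler

pos_score = inner_product(query, positive_item)
neg_score = inner_product(query, negative_item)
loss = bpr_loss(pos_score, [neg_score])
print(pos_score, neg_score, loss)
```

The loss shrinks as the positive item is scored further above the sampled negatives, which is what pushes the query and item encoders toward useful embeddings.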

2. After implementing your custom retriever model by inheriting from BaseRetriever, creating a training script with UniRetrieval is similar to training a built-in model:

```python
# train.py
from UniRetrieval.training.embedder.recommendation.runner import RetrieverRunner

# MYMLPRetriever is the custom class defined in the previous step;
# define it in this script or import it from your own module.

data_config_path = "/data1/home/recstudio/haoran/UniRetrieval/examples/recommendation/config/data/recflow_retriever.json"
train_config_path = "/data1/home/recstudio/haoran/UniRetrieval/examples/recommendation/config/mlp_retriever/train.json"
model_config_path = "/data1/home/recstudio/haoran/UniRetrieval/examples/recommendation/config/mlp_retriever/model.json"

runner = RetrieverRunner(
    model_config_path=model_config_path,
    data_config_path=data_config_path,
    train_config_path=train_config_path,
    model_class=MYMLPRetriever,
)
runner.run()
```

#### Ranker Model

Unlike retriever models, ranker models typically focus on the interaction between features and the combination of features. Therefore, the functions that need to be overridden are different, and the modules that need to be built include:

- Sequence Feature Aggregator: Used to aggregate a feature sequence of shape (L,D) into a single feature of shape (D) for subsequent feature interaction.
- Feature Interaction Module: Used to interact a series of features, usually the single feature output by the Sequence Feature Aggregator. Common modules include MLP, FM, etc.
- Prediction Module: Used for the final prediction after feature interaction, typically a fully connected layer, following the feature interaction module.
- Loss Function: Used to calculate the loss between predicted values and true labels.
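The first two modules in this list can be illustrated without any framework. The sketch below is a simplified stand-in, not the library's implementation: it mean-pools a sequence of shape (L, D) into a vector of shape (D) and then flattens several feature vectors into one MLP input. The function names are hypothetical, and padding positions are not handled.

```python
def average_aggregate(seq):
    """Mean-pool a sequence of shape (L, D) into a vector of shape (D)."""
    length = len(seq)
    dim = len(seq[0])
    return [sum(step[d] for step in seq) / length for d in range(dim)]


def flatten_features(features):
    """Concatenate N feature vectors of size D into one vector of size N*D."""
    return [value for feat in features for value in feat]


# A behavior sequence with L=3 steps and embedding size D=2.
behavior_seq = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
pooled = average_aggregate(behavior_seq)

# One additional (non-sequential) feature embedding of the same size.
user_emb = [0.5, 0.5]
mlp_input = flatten_features([user_emb, pooled])
print(pooled, mlp_input)
```

The flattened vector has length N*D, which is why the first MLP layer in a ranker of this kind is sized as the number of features times the embedding dimension.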


3. Import the BaseRanker class and inherit from the BaseRanker class to implement a custom model:

```python
import torch
from UniRetrieval.training.reranker.recommendation.modeling import BaseRanker
from UniRetrieval.modules.arguments import get_modules
from UniRetrieval.modules.layer import MLPModule, LambdaModule


class MYMLPRanker(BaseRanker):
    def get_sequence_encoder(self):
        cls = get_modules("module", "AverageAggregator")
        encoder = cls(dim=1)
        return encoder

    def get_feature_interaction_layer(self):
        flatten_layer = LambdaModule(lambda x: x.flatten(start_dim=1))  # [B, N, D] -> [B, N*D]
        mlp_layer = MLPModule(
            mlp_layers=[self.num_feat * self.model_config.embedding_dim] + self.model_config.mlp_layers,
            activation_func=self.model_config.activation,
            dropout=self.model_config.dropout,
            bias=True,
            batch_norm=self.model_config.batch_norm,
            last_activation=False,
            last_bn=False
        )
        return torch.nn.Sequential(flatten_layer, mlp_layer)

    def get_prediction_layer(self):
        pred_mlp = MLPModule(
            mlp_layers=[self.model_config.mlp_layers[-1]] + self.model_config.prediction_layers + [1],
            activation_func=self.model_config.activation,
            dropout=self.model_config.dropout,
            bias=True,
            batch_norm=self.model_config.batch_norm,
            last_activation=False,
            last_bn=False
        )
        return pred_mlp

    def get_loss_function(self):
        return get_modules("loss", "BCEWithLogitLoss")(reduction='mean')
```

4. Then, as with built-in models, combine the dataset, model, and training configuration files to complete the training script with UniRetrieval.

```python
# train.py
from UniRetrieval.training.reranker.recommendation.runner import RankerRunner

# MYMLPRanker is the custom class defined in the previous step;
# define it in this script or import it from your own module.


def main():
    data_config_path = "/data1/home/recstudio/haoran/UniRetrieval/examples/recommendation/config/data/recflow_ranker.json"
    train_config_path = "/data1/home/recstudio/haoran/UniRetrieval/examples/recommendation/config/mlp_ranker/train.json"
    model_config_path = "/data1/home/recstudio/haoran/UniRetrieval/examples/recommendation/config/mlp_ranker/model.json"

    runner = RankerRunner(
        model_config_path=model_config_path,
        data_config_path=data_config_path,
        train_config_path=train_config_path,
        model_class=MYMLPRanker
    )
    runner.run()


# Without this guard, main() is defined but never executed.
if __name__ == "__main__":
    main()
```

## Single-Machine Training and Distributed Multi-Machine Training of Models

UniRetrieval supports basic single-machine single-GPU training, single-machine multi-GPU training, and multi-machine distributed training.

1. Single-machine single-GPU training: Start directly with the Python command, or with the `accelerate` command (for the accelerate configuration file, see [single_gpu.json](config/distributed_training/single_gpu.json)).

   ```shell
   # start with Python command
   CUDA_VISIBLE_DEVICES=1 python main.py
   # start with accelerate command
   accelerate launch --config_file single_gpu.json main.py
   ```

2. Single-machine multi-GPU training: First, create a single-machine multi-GPU configuration; see the example file [single_node.json](config/distributed_training/single_node.json). Then start training with the accelerate command.

    ```shell
    accelerate launch --config_file single_node.json main.py
    ```

    Note that multi-GPU training on a single machine occupies port 29500 on the local machine by default. To run several tasks at once, give each one a different port, either on the command line (`--main_process_port 29501`) or in the JSON file (`"main_process_port": 29501`).

    In addition, both single-machine multi-GPU and multi-machine multi-GPU training currently use DistributedDataParallel (DDP). During training, each process keeps a complete copy of the model and optimizer on its GPU, and each GPU also maintains a "bucket" that gathers gradients from the other GPUs. As a result, model preparation occupies about twice the model's size in GPU memory compared with single-GPU training. For more details, please refer to: [blog1](https://discuss.pytorch.org/t/memory-consumption-for-the-model-get-doubled-after-wrapped-with-ddp/130837), [blog2](https://medium.com/deep-learning-for-protein-design/a-comprehensive-guide-to-memory-usage-in-pytorch-b9b7c78031d3).

3. Multi-machine multi-GPU distributed training:
    - Configure the environment on multiple machines, download UniRetrieval, and install dependencies.
    - Create a multi-machine multi-GPU configuration on each machine; see the example files [multi_node_rank0.json](config/distributed_training/multi_nodes_rank0.json) and [multi_node_rank1.json](config/distributed_training/multi_nodes_rank1.json). Then start training with the accelerate command on the rank0 machine first, followed by the other machines in sequence:
    
    ```shell
    accelerate launch --config_file multi_node_rank0.json main.py
    ```
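When several jobs share one machine, each needs its own value for `--main_process_port`. The following sketch shows one way to pick an unused port with the Python standard library. `find_free_port` is a hypothetical helper, and there is a small window in which another process could claim the port before `accelerate` binds it.

```python
import socket


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for a currently unused TCP port on the given host."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Binding to port 0 lets the operating system choose a free port.
        sock.bind((host, 0))
        return sock.getsockname()[1]


port = find_free_port()
print(f"accelerate launch --config_file single_node.json --main_process_port {port} main.py")
```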


Note:
All the accelerate configuration files mentioned above are created with the `accelerate config` command.

```shell
accelerate config --config_file xxx.json
```

You then select the options that match your setup interactively.
For more details, please refer to the [accelerate](https://github.com/huggingface/accelerate) documentation.


# Inference
In an online recommendation system, handling a single request typically involves the following steps:
- **Receiving the request header**: The request header includes the user ID and context-specific features (e.g., location and timestamp of the request).
- **Obtaining the Candidate Item Set**: At each stage, the recommendation model receives the candidate item set from the previous stage (for the retrieval model, it is the entire item pool).
- **Retrieving Features**: At each stage, the system retrieves user- and item-related features required by the recommendation model based on the user ID and candidate item IDs. To enable fast access, user and item features are stored in a cache database (e.g., Redis) in a key-value format.
- **Sorting the Candidate Item Set**: At each stage, the recommendation model ranks the candidate items using the retrieved features and selects the top-k items to pass to the next stage (for the final stage, the top-k items are directly presented to the user).
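The four steps above can be sketched as a small two-stage pipeline. This is a toy illustration under stated assumptions: a plain dictionary stands in for the Redis cache, the key names and the two scoring functions are made up for the example, and `run_stage` is a hypothetical helper rather than a UniRetrieval API.

```python
def run_stage(candidates, user_features, feature_cache, score_fn, top_k):
    """Fetch item features, score every candidate, and keep the top-k item IDs."""
    scored = []
    for item_id in candidates:
        item_features = feature_cache[f"item:{item_id}"]
        scored.append((score_fn(user_features, item_features), item_id))
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [item_id for _, item_id in scored[:top_k]]


# A dictionary standing in for the key-value feature cache.
feature_cache = {
    "user:7": {"age": 3},
    "item:1": {"popularity": 0.9, "age_group": 1},
    "item:2": {"popularity": 0.5, "age_group": 3},
    "item:3": {"popularity": 0.1, "age_group": 3},
    "item:4": {"popularity": 0.7, "age_group": 2},
}


def retrieval_score(user, item):
    # Cheap score used over the whole item pool.
    return item["popularity"]


def ranking_score(user, item):
    # More specific score used on the smaller candidate set.
    bonus = 1.0 if item["age_group"] == user["age"] else 0.0
    return item["popularity"] + bonus


user = feature_cache["user:7"]            # request header -> user features
item_pool = [1, 2, 3, 4]                  # the retrieval stage sees the entire pool
retrieved = run_stage(item_pool, user, feature_cache, retrieval_score, top_k=3)
ranked = run_stage(retrieved, user, feature_cache, ranking_score, top_k=2)
print(retrieved, ranked)
```

Each stage consumes the previous stage's output as its candidate set, so only the last stage's top-k items reach the user.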

## Storing Features in Cache Database
### Defining message in protobuf
To reduce the cache size occupied by features, Protobuf is used to serialize the features before storing them in the cache database. To use Protobuf, message data structures must first be defined.

The user and item message data structures are defined in a .proto file, with each user or item feature treated as a field of the message structure. For example, in recflow.proto:

```protobuf
// the version of protobuf
syntax = "proto3";

package example;

message Item {
  int64 video_id = 1;
  int64 author_id = 2;
  int64 category_level_two = 3;
  int64 upload_type = 4;
  int64 upload_timestamp = 5;
  int64 category_level_one = 6;
  int64 request_timestamp = 7;
}

message UserTimestamp {
  int64 request_id = 1;
  int64 user_id = 2;
  int64 request_timestamp = 3;
  int64 device_id = 4;
  int32 age = 5;
  int64 gender = 6;
  int64 province = 7;
  repeated Item seq_effective_50 = 8;
}
```

Then, generate Python code from the .proto file using protoc:

```shell
# create proto
protoc --python_out=. ./inference/feature_insert/protos/recflow.proto
```
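Part of the reason Protobuf keeps cached payloads small is that `int64` fields are written as variable-length integers (varints), so small IDs take fewer than eight bytes. The sketch below re-implements the base-128 varint encoding by hand, purely for illustration; real code should rely on the generated `recflow_pb2` classes, and `encode_varint` is a hypothetical helper.

```python
def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a Protobuf base-128 varint."""
    if value < 0:
        raise ValueError("this sketch only handles non-negative integers")
    out = bytearray()
    while True:
        byte = value & 0x7F          # take the lowest 7 bits
        value >>= 7
        if value:
            out.append(byte | 0x80)  # set the continuation bit: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


small_id = encode_varint(300)
large_id = encode_varint(82216301)
print(small_id.hex(), len(large_id))
```

Each field is also preceded by a tag, which fits in one byte for field numbers up to 15, as is the case for every field in the messages above.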

### Inserting Features into Redis Database
When storing user-side or item-side features in a Redis database, the process typically involves several steps:

1. Create a message object.
2. Assign values to each field of the message object.
3. Serialize the message object.
4. Store the serialized message object in the Redis database. The key is usually set as {dataset_name}:{object_name}:{object_primary_key}.

An example of inserting features into the Redis database using recflow is shown below:

```python
import redis
import numpy as np
import pandas as pd
from tqdm import tqdm

import recflow_pb2

r = redis.Redis(host='localhost', port=6379, db=0)

# Item
test_video_info = pd.read_feather('./inference/feature_data/recflow/realshow_test_video_info.feather')
for row in tqdm(test_video_info.itertuples(), total=len(test_video_info)):

    # 1-2. Create a message object and assign its fields
    item = recflow_pb2.Item()
    item.video_id = getattr(row, 'video_id')
    item.author_id = getattr(row, 'author_id')
    item.category_level_two = getattr(row, '_3')
    item.upload_type = getattr(row, 'upload_type')
    item.upload_timestamp = getattr(row, 'upload_timestamp')
    item.category_level_one = getattr(row, 'category_level_one')

    # 3. Serialize the Protobuf object into binary data
    serialized_data = item.SerializeToString()

    # 4. Store the serialized data in Redis
    r.set(f"recflow:item:{item.video_id}", serialized_data)

print("Item features are stored in Redis.")

# User
test_user_info = np.load('./inference/feature_data/recflow/test_user_info.npz')['arr_0']
for row in tqdm(test_user_info):

    # 1-2. Create a message object and assign its fields
    user_timestamp = recflow_pb2.UserTimestamp()
    user_timestamp.request_id = row[0]
    user_timestamp.user_id = row[1]
    user_timestamp.request_timestamp = row[2]
    user_timestamp.device_id = row[3]
    user_timestamp.age = row[4]
    user_timestamp.gender = row[5]
    user_timestamp.province = row[6]

    # The rest of the row holds this user's behavior sequence, 6 values per item.
    # Use the current row here, not the first row of the array.
    behaviors = row[7:]
    for behavior in np.split(behaviors, len(behaviors) // 6):
        item = user_timestamp.seq_effective_50.add()
        item.video_id = behavior[0]
        item.author_id = behavior[1]
        item.category_level_two = behavior[2]
        item.category_level_one = behavior[3]
        item.upload_type = behavior[4]
        item.request_timestamp = behavior[5]

    # 3. Serialize the Protobuf object into binary data
    serialized_data = user_timestamp.SerializeToString()

    # 4. Store the serialized data in Redis
    r.set(f"recflow:user_timestamp:{row[1]}_{row[2]}", serialized_data)

print("UserTimestamp features are stored in Redis.")
```

### Generate cache configuration file `feature_cache_config.yaml`

To enable the use of features stored in the cache, we need to generate a configuration file `feature_cache_config.yaml` for each dataset.

Taking Recflow as an example:

The `host`, `port`, and `db` fields specify the connection details of the Redis database. `features` specifies the storage details for each feature. Within `features`, `key_temp` is the key template for the feature in the Redis database, where the content inside {} is replaced with specific item or user information, and `field` is the attribute name of the feature in the message object. `key_temp2proto` maps each key template to the corresponding message class name, which is used to create message objects.

Running ./inference/feature_insert/recflow_script/run.sh completes the three steps mentioned above.
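To illustrate how the fields described above fit together, the sketch below builds a small configuration as a Python dictionary and resolves one feature into a Redis key, a message class name, and a field name. The nesting of `features` and the placeholder names inside `{}` are assumptions made for this example, and `resolve` is a hypothetical helper; check the generated `feature_cache_config.yaml` for the actual layout.

```python
# Assumed layout, mirroring the field names described above.
feature_cache_config = {
    "host": "localhost",
    "port": 6379,
    "db": 0,
    "features": {
        "author_id": {
            "key_temp": "recflow:item:{video_id}",
            "field": "author_id",
        },
        "age": {
            "key_temp": "recflow:user_timestamp:{user_id}_{request_timestamp}",
            "field": "age",
        },
    },
    "key_temp2proto": {
        "recflow:item:{video_id}": "Item",
        "recflow:user_timestamp:{user_id}_{request_timestamp}": "UserTimestamp",
    },
}


def resolve(feature_name, ids, config=feature_cache_config):
    """Return (redis_key, message_class_name, field_name) for one feature."""
    spec = config["features"][feature_name]
    redis_key = spec["key_temp"].format(**ids)  # fill the {} placeholders
    message_class = config["key_temp2proto"][spec["key_temp"]]
    return redis_key, message_class, spec["field"]


item_lookup = resolve("author_id", {"video_id": 42})
user_lookup = resolve("age", {"user_id": 1, "request_timestamp": 1700})
print(item_lookup)
print(user_lookup)
```

The resolved keys follow the same `recflow:item:...` and `recflow:user_timestamp:...` patterns used when the features were inserted into Redis.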

## InferenceEngine

The [InferenceEngine](../../UniRetrieval/abc/inference/inference_engine.py) class can be instantiated to run the inference process. BaseEmbedderInferenceEngine and BaseRerankerInferenceEngine inherit from InferenceEngine and are used for inference in the examples that follow.

### Inference: Embedder

```python
import yaml
import pandas as pd
from UniRetrieval.inference.embedder.recommendation import BaseEmbedderInferenceEngine
import pycuda.driver as cuda

infer_config_path = "/data1/home/recstudio/haoran/UniRetrieval/examples/recommendation/inference/config/recflow_infer_retrieval_config.yaml"

with open(infer_config_path, 'r') as f:
    config = yaml.safe_load(f)


retriever_inference_engine = BaseEmbedderInferenceEngine(config)

infer_df = pd.read_feather('/data1/home/recstudio/haoran/RecStudio-Industry/inference/inference_data/recflow/recflow_infer_data.feather')
for batch_idx in range(10):
    print(f"This is batch {batch_idx}")
    batch_st = batch_idx * 128
    batch_ed = (batch_idx + 1) * 128
    batch_infer_df = infer_df.iloc[batch_st:batch_ed]
    retriever_outputs = retriever_inference_engine.batch_inference(batch_infer_df)
    print(type(retriever_outputs), retriever_outputs.shape)
if retriever_inference_engine.config['infer_mode'] == 'trt':
    cuda.Context.pop()
```

### Inference: Ranker

```python
import yaml
import pandas as pd
from UniRetrieval.inference.reranker.recommendation import BaseRerankerInferenceEngine
import pycuda.driver as cuda
import numpy as np

infer_config_path = "/data1/home/recstudio/haoran/angqing_temp/mlp_reranker/recflow_infer_ranker_config.yaml"

with open(infer_config_path, 'r') as f:
    config = yaml.safe_load(f)
    print(config)

rank_inference_engine = BaseRerankerInferenceEngine(config)


infer_df = pd.read_feather('/data1/home/recstudio/haoran/UniRetrieval/examples/recommendation/inference/inference_data/recflow/recflow_infer_data.feather')
item_df = pd.read_feather('/data1/home/recstudio/haoran/UniRetrieval/examples/recommendation/inference/inference_data/recflow/realshow_test_video_info.feather')
all_item_ids = np.array(item_df['video_id'])
for batch_idx in range(10):
    print(f"This is batch {batch_idx}")
    batch_st = batch_idx * 128
    batch_ed = (batch_idx + 1) * 128
    batch_infer_df = infer_df.iloc[batch_st:batch_ed]
    # Random candidates stand in for the output of a retrieval stage: 50 items per request.
    batch_candidates = np.random.choice(all_item_ids, size=(128, 50))
    batch_candidates_df = pd.DataFrame({rank_inference_engine.feature_config['fiid']: batch_candidates.tolist()})
    ranker_outputs = rank_inference_engine.batch_inference(batch_infer_df, batch_candidates_df)
    print(type(ranker_outputs), ranker_outputs.shape)

if rank_inference_engine.config['infer_mode'] == 'trt':
    cuda.Context.pop()
```

ONNX Runtime and TensorRT are supported for inference acceleration. To use them, set the `infer_mode` parameter in the config to `ort` or `trt`.