---
title: "Building Real-time RecSys Chapter 2 - Understanding the Data and Feature Engineering"
subtitle: "Exploring Amazon product dataset and building features for sequence models"
date: "2025-05-24"
categories: [recsys]
image: "../static/L2%20-%20Data%20signals.excalidraw.png"
format:
  html:
    code-fold: false
    code-annotations: hover
# draft: true
---


## Introduction

In [Chapter 1](../c1/index.qmd), we set up our development environment and explored the project architecture. Now it's time to dive into the heart of any recommendation system: **data**. The quality of our recommendations depends heavily on how well we understand, process, and engineer features from our data.

This chapter focuses on the data pipeline from raw data to model-ready features. We'll explore the Amazon product dataset, understand user-item interaction sequences, and build the feature engineering pipeline that will power our session-based recommender.

::: {.callout-note appearance="simple"}
## Code
All code for this chapter is available in the `notebooks/000-prep-data.ipynb` and `notebooks/001-features.ipynb` files in the [project repository](https://github.com/dvquy13/recsys-seq-model).
:::

## Dataset Overview: Amazon Reviews 2023

A good dataset for this kind of sequential recommendation project typically has the following characteristics:

1. **Temporal richness**: Each interaction has a timestamp, enabling sequence modeling
2. **Scale**: Millions of interactions across thousands of users and items
3. **Real-world patterns**: Authentic user behavior with natural sparsity

Among the well-known public data sources, I chose the [Amazon Reviews 2023](https://amazon-reviews-2023.github.io/) dataset from the McAuley Lab, specifically its "Books" category. Apart from having the characteristics above, this dataset contains a decent number of observations plus other potentially useful signals about users, items, and their interactions, such as review text.

::: {.column-page}
![](../static/dataset.png)
:::

::: {.column-page}
![](../static/dataset-info.png)
:::

The core schema we use from the dataset is simple:

```yaml
# From cfg/common.yaml
data:
  user_col: "user_id"        # <1>
  item_col: "parent_asin"    # <2>
  rating_col: "rating"       # <3>
  timestamp_col: "timestamp" # <4>
```
1. Unique user identifier
2. Parent product identifier (the parent ASIN, i.e. Amazon Standard Identification Number; variants of the same product share one parent ASIN)
3. User rating (1-5 scale)
4. Interaction timestamp

::: {.callout-note collapse="true"}
## Configuration Management

```python
from src.cfg import ConfigLoader

# Load configuration
cfg = ConfigLoader("cfg/common.yaml")
```

Throughout the project, we keep most of the configuration in one central place, `cfg/common.yaml`, so that notebooks and scripts can easily access their inputs and outputs. And since running many experiments is in the nature of ML work, this design also makes it easier to try different sets of configurations.
:::

## Data Preprocessing Pipeline

The data flows through several key preprocessing steps:

### 1. Data Sampling and Filtering

As ML development is mostly iterative, the main rationale for sampling and filtering is to get a small but decent dataset on which we can experiment with different ideas **fast**. The more experiments we run, the more likely we are to find improvements for our models.

What makes a RecSys dataset decent? One of the key criteria is **sparsity**: the fraction of all possible user-item pairs that have no observed interaction.

To understand why sparsity is problematic, consider the interaction matrix where each cell represents a potential user-item interaction:

- **Matrix size**: `num_users × num_items` (total possible interactions)
- **Actual interactions**: Much smaller number of observed ratings/clicks
- **Sparsity**: `1 - (actual_interactions / (num_users × num_items))`
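The same formula can be computed directly on an interaction table. A minimal sketch, assuming a pandas DataFrame with the `user_id` and `parent_asin` columns from our config (`interaction_sparsity` is a hypothetical helper, not part of the project code):

```python
import pandas as pd


def interaction_sparsity(df: pd.DataFrame, user_col: str = "user_id", item_col: str = "parent_asin") -> float:
    """Fraction of all possible user-item pairs with no observed interaction."""
    # Count each user-item pair once, even if the same user reviewed an item twice
    num_interactions = len(df[[user_col, item_col]].drop_duplicates())
    num_users = df[user_col].nunique()
    num_items = df[item_col].nunique()
    return 1 - num_interactions / (num_users * num_items)


toy = pd.DataFrame({
    "user_id": ["u1", "u1", "u2", "u2"],
    "parent_asin": ["a", "b", "a", "a"],  # u2 reviewed item "a" twice
})
print(interaction_sparsity(toy))  # 0.25 -> 3 of the 4 possible pairs are observed
```

Deduplicating pairs first keeps repeated reviews of the same item from making the matrix look denser than it is.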

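The number of possible interactions is the product of users and items, so it grows **quadratically** when both scale together, while actual interactions usually grow far more slowly. Sparsity therefore gets worse as datasets grow: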

```python
# Small dataset: 1,000 users x 1,000 items = 1M possible interactions
small_sparsity = 1 - 50_000 / (1_000 * 1_000)
print(f"{small_sparsity:.1%}")  # 95.0%

# Larger dataset: 10,000 users x 10,000 items = 100M possible interactions
large_sparsity = 1 - 500_000 / (10_000 * 10_000)
print(f"{large_sparsity:.1%}")  # 99.5%
```

Each new user adds an entire **row** of mostly empty interactions, and each new item adds an entire **column** of mostly empty interactions. Since users typically interact with only a tiny fraction of available items, the interaction matrix becomes increasingly sparse as the catalog grows.

In RecSys, the interaction distribution is typically long-tailed: many users and items have only a handful of interactions, and those are often noisy. So while random sampling works fine for many ML use cases, we need to apply it more carefully here. Ultimately, we want a sample dataset where every user and every item has at least some minimum number of interactions.

The tricky part is that naive random sampling of users and items can trigger a **sparsity cascade**, a domino effect that breaks your dataset.

Here's what happens: you start with users and items that look fine on their own. User A has 10 interactions, and Item X has 15. When you remove a few other users, you also lose all of their interactions with Item X. Suddenly Item X has only 8 interactions. Oops, now it's too sparse, so you remove it too. But removing Item X means User A loses an interaction and might become too sparse as well.

It's like pulling threads from a sweater—everything starts unraveling.
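We can reproduce the cascade on a toy example. The sketch below is a simplified stand-in for the filtering loop inside `InteractionDataSampler`: it uses plain Python tuples instead of DataFrames, and it drops under-threshold users and items in the same pass rather than one after the other. `filter_until_stable` is a hypothetical name.

```python
from collections import Counter


def filter_until_stable(interactions, min_user, min_item):
    """Drop users/items below their thresholds, repeating until nothing changes."""
    passes = 0
    while True:
        passes += 1
        user_counts = Counter(user for user, _ in interactions)
        item_counts = Counter(item for _, item in interactions)
        kept = [
            (user, item)
            for user, item in interactions
            if user_counts[user] >= min_user and item_counts[item] >= min_item
        ]
        if len(kept) == len(interactions):  # nothing removed: conditions hold
            return kept, passes
        interactions = kept


# A chain where each user/item sits exactly at the threshold of 2,
# except user "u2" and item "v", which have a single interaction each
interactions = [
    ("u1", "x"), ("u1", "y"), ("u2", "x"),
    ("u3", "y"), ("u3", "z"), ("u4", "z"),
    ("u4", "w"), ("u5", "w"), ("u5", "v"),
]
kept, passes = filter_until_stable(interactions, min_user=2, min_item=2)
print(len(kept), passes)  # 0 6
```

Removing the single under-threshold user `u2` and item `v` unravels the whole chain, pass by pass, until nothing is left. This is an extreme toy case, but the mechanism is the same one the sampler guards against by re-checking both thresholds in a loop (`while keep_removing`) after each random removal.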

To deal with this problem, we take an iterative approach: we gradually drop random users from the dataset, re-apply the minimum-interaction filters after each removal, and keep an eye on our sampling target. The trade-off is that we no longer get an exact, fixed number of users and items. Instead, we define a target plus minimum acceptable thresholds, like below:

```yaml
# From cfg/common.yaml
sample:
  sample_users: 10000
  min_val_records: 5000         # <1>
  min_user_interactions: 5
  min_item_interactions: 10
```
1. We need to ensure sufficient validation data to evaluate our models.

```python
from src.sample import InteractionDataSampler

data_sampler = InteractionDataSampler(
    user_col=cfg.data.user_col,
    item_col=cfg.data.item_col,
    sample_users=cfg.sample.sample_users,
    min_val_records=cfg.sample.min_val_records,
    random_seed=cfg.run.random_seed,
    min_item_interactions=cfg.sample.min_item_interactions,
    min_user_interactions=cfg.sample.min_user_interactions,
    perc_users_removed_each_round=0.1,
    debug=False,
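)

# train_df, val_df: the train/validation splits to sample from
sample_df, val_sample_df = data_sampler.sample(train_df, val_df)
```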

```
Randomly removing 2960 users - Round 18 started
2025-03-09 15:23:54.942 | INFO     | src.sample:sample:118 - After randomly removing users - round 18: num_users=29,605
2025-03-09 15:23:54.943 | INFO     | src.sample:sample:122 - Number of users 29,605 are still greater than expected, keep removing...

Randomly removing 2413 users - Round 19 started
2025-03-09 15:23:55.594 | INFO     | src.sample:sample:118 - After randomly removing users - round 19: num_users=24,137
2025-03-09 15:23:55.594 | INFO     | src.sample:sample:122 - Number of users 24,137 are still greater than expected, keep removing...
2025-03-09 15:23:55.622 | INFO     | src.sample:sample:137 - Number of val_df records 4,282 are falling below expected threshold, stop and use `sample_df` as final output...
2025-03-09 15:23:55.634 | INFO     | src.sample:sample:146 - len(sample_users)=19,734 len(sample_items)=7,388
```

<details>
<summary>InteractionDataSampler implementation</summary>
```python
from functools import partial

import numpy as np
import pandas as pd
from loguru import logger


class InteractionDataSampler:
    """
    Randomly picking X users does not guarantee that the output dataset meets the minimum-interaction conditions.
    Instead, we take an iterative approach: gradually drop random users from the dataset while keeping an eye on the conditions and our sampling target.
    """

    def __init__(
        self,
        user_col: str = "user_id",
        item_col: str = "item_id",
        sample_users: int = 1000,
        min_val_records: int = 1000,
        random_seed: int = 41,
        min_user_interactions: int = 5,
        min_item_interactions: int = 10,
        buffer_perc: float = 0.2,
        perc_users_removed_each_round: float = 0.01,
        debug: bool = False,
    ):
        self.user_col = user_col
        self.item_col = item_col
        self.sample_users = sample_users
        self.min_val_records = min_val_records
        self.random_seed = random_seed
        self.min_user_interactions = min_user_interactions
        self.min_item_interactions = min_item_interactions
        self.buffer_perc = buffer_perc
        self.perc_users_removed_each_round = perc_users_removed_each_round
        self.debug = debug

    def remove_random_users(self, df, k=10):
        users = df[self.user_col].unique()
        np.random.seed(self.random_seed)
        to_remove_users = np.random.choice(users, size=k, replace=False)
        return df.loc[lambda df: ~df[self.user_col].isin(to_remove_users)]

    def get_unqualified(self, df, col: str, threshold: int):
        unqualified = df.groupby(col).size().loc[lambda s: s < threshold].index
        return unqualified

    def sample(
        self,
        train_df: pd.DataFrame,
        val_df: pd.DataFrame,
        keep_random_removing: bool = True,
    ):
        get_unqualified_users = partial(
            self.get_unqualified,
            col=self.user_col,
            threshold=self.min_user_interactions,
        )
        get_unqualified_items = partial(
            self.get_unqualified,
            col=self.item_col,
            threshold=self.min_item_interactions,
        )

        r = 1

        sample_df = train_df.copy()

        while keep_random_removing:
            keep_removing = True
            i = 1

            num_users_removed_each_round = int(
                self.perc_users_removed_each_round * sample_df[self.user_col].nunique()
            )
            if r > 1:
                print(
                    f"\n\nRandomly removing {num_users_removed_each_round} users - Round {r} started"
                )
                new_sample_df = self.remove_random_users(
                    sample_df, k=num_users_removed_each_round
                )
            else:
                new_sample_df = sample_df.copy()

            while keep_removing:
                if self.debug:
                    logger.info(f"Sampling round {i} started")
                keep_removing = False
                uu = get_unqualified_users(new_sample_df)
                if self.debug:
                    logger.info(f"{len(uu)=:,.0f}")
                if len(uu):
                    new_sample_df = new_sample_df.loc[
                        lambda df: ~df[self.user_col].isin(uu)
                    ]
                    if self.debug:
                        logger.info(f"After removing uu: {len(new_sample_df)=:,.0f}")
                    assert len(get_unqualified_users(new_sample_df)) == 0
                    keep_removing = True
                ui = get_unqualified_items(new_sample_df)
                if self.debug:
                    logger.info(f"{len(ui)=:,.0f}")
                if len(ui):
                    new_sample_df = new_sample_df.loc[
                        lambda df: ~df[self.item_col].isin(ui)
                    ]
                    if self.debug:
                        logger.info(f"After removing ui: {len(new_sample_df)=:,.0f}")
                    assert len(get_unqualified_items(new_sample_df)) == 0
                    keep_removing = True
                i += 1

            sample_users = sample_df[self.user_col].unique()
            sample_items = sample_df[self.item_col].unique()
            num_users = len(sample_users)
            logger.info(f"After randomly removing users - round {r}: {num_users=:,.0f}")
            if (
                num_users > self.sample_users * (1 + self.buffer_perc) or r == 1
            ):  # First round always overriding sample_df with new_sample_df to keep all qualified items and users
                logger.info(
                    f"Number of users {num_users:,.0f} are still greater than expected, keep removing..."
                )
                sample_df = new_sample_df.copy()
            else:
                logger.info(
                    f"Number of users {num_users:,.0f} are falling below expected threshold, stop and use `sample_df` as final output..."
                )
                keep_random_removing = False

            val_sample_df = val_df.loc[
                lambda df: df[self.user_col].isin(sample_users)
                & df[self.item_col].isin(sample_items)
            ]
            if (num_val_records := val_sample_df.shape[0]) < self.min_val_records:
                logger.info(
                    f"Number of val_df records {num_val_records:,.0f} are falling below expected threshold, stop and use `sample_df` as final output..."
                )
                keep_random_removing = False

            r += 1

        sample_users = sample_df[self.user_col].unique()
        sample_items = sample_df[self.item_col].unique()
        logger.info(f"{len(sample_users)=:,.0f} {len(sample_items)=:,.0f}")

        return sample_df, val_sample_df
```
</details>

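In the end, we don't get exactly 10,000 users. In the run above, sampling stopped at round 19 because the validation set fell below `min_val_records` (4,282 < 5,000 records), leaving 19,734 users and 7,388 items.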

### 2. Temporal Splitting

Unlike the random train/test splits common in many ML tasks, recommendation systems need **temporal splits** to simulate real-world deployment: we train on past interactions and validate on future ones.

```python
def create_temporal_splits(df, train_ratio=0.8):
    """
    Create time-aware train/validation splits.
    This simulates the real-world scenario where we predict future interactions.
    """
    # Sort by timestamp - critical for temporal integrity
    df_sorted = df.sort_values('timestamp')
    
    # Split based on time, not random sampling
    split_idx = int(len(df_sorted) * train_ratio)
    
    train_data = df_sorted.iloc[:split_idx]
    val_data = df_sorted.iloc[split_idx:]
    
    return train_data, val_data
```
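One subtlety: splitting at a row index can place interactions that share the same timestamp on both sides of the boundary. In the toy data below, a row-index split with `train_ratio=0.5` would put one of the two `timestamp=2` rows in each split. A minimal alternative sketch, not the project's code, splits at a timestamp cutoff instead (`split_by_time_cutoff` is a hypothetical helper):

```python
import pandas as pd


def split_by_time_cutoff(df: pd.DataFrame, train_ratio: float = 0.8, timestamp_col: str = "timestamp"):
    """Hypothetical variant: split on a timestamp cutoff so tied timestamps stay together."""
    # Use an actual timestamp from the data as the cutoff (no interpolation between values)
    cutoff = df[timestamp_col].quantile(train_ratio, interpolation="lower")
    train_data = df[df[timestamp_col] <= cutoff]
    val_data = df[df[timestamp_col] > cutoff]
    return train_data, val_data


df = pd.DataFrame({"user_id": ["a", "a", "b", "b", "c"], "timestamp": [1, 2, 2, 3, 4]})
train_data, val_data = split_by_time_cutoff(df, train_ratio=0.5)
print(len(train_data), len(val_data))  # 3 2
```

The price is that the train/validation ratio becomes approximate, in exchange for a strict guarantee that every validation interaction happens after every training interaction.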

:::{.column-page}
![Temporal data patterns show clear user interaction sequences that our models need to capture](../static/L2%20-%20Data%20signals.excalidraw.png){.lightbox}
:::

## ID Mapping: From Strings to Indices

Deep learning models, particularly their embedding layers, work with integer indices rather than string IDs. Our `IDMapper` class handles this conversion:

```python
from src.id_mapper import IDMapper, map_indice

# Create and fit the ID mapper
idm = IDMapper()
idm.fit(user_ids=df['user_id'].unique(), 
        item_ids=df['parent_asin'].unique())

# Transform string IDs to numerical indices
df_indexed = map_indice(df, idm, 
                       user_col='user_id', 
                       item_col='parent_asin')

print(f"Users: {len(idm.user_to_index)} → indices 0-{len(idm.user_to_index)-1}")
print(f"Items: {len(idm.item_to_index)} → indices 0-{len(idm.item_to_index)-1}")
```

The `IDMapper` provides:
- **Deterministic mapping**: Same IDs always get same indices
- **Unknown handling**: Graceful handling of new users/items
- **Serialization**: Save/load mappings for production deployment

```python
# Save for later use in model serving
idm.save("data/idm.json")
```
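To make the three properties above concrete, here is a minimal stand-in. It is not the project's `IDMapper`: the class name `SimpleIDMapper`, its methods, and returning `None` for unseen IDs are all assumptions for illustration.

```python
import json


class SimpleIDMapper:
    """Hypothetical minimal ID mapper; not the project's src.id_mapper.IDMapper."""

    def fit(self, user_ids, item_ids):
        # Sorting unique IDs makes the mapping deterministic, whatever the input order
        self.user_to_index = {u: i for i, u in enumerate(sorted(set(user_ids)))}
        self.item_to_index = {it: i for i, it in enumerate(sorted(set(item_ids)))}
        return self

    def get_user_index(self, user_id):
        return self.user_to_index.get(user_id)  # None for users unseen during fit

    def get_item_index(self, item_id):
        return self.item_to_index.get(item_id)  # None for items unseen during fit

    def save(self, path):
        with open(path, "w") as f:
            json.dump({"user_to_index": self.user_to_index, "item_to_index": self.item_to_index}, f)

    @classmethod
    def load(cls, path):
        with open(path) as f:
            data = json.load(f)
        idm = cls()
        idm.user_to_index = data["user_to_index"]
        idm.item_to_index = data["item_to_index"]
        return idm


idm = SimpleIDMapper().fit(user_ids=["u2", "u1", "u2"], item_ids=["B0002", "B0001"])
print(idm.get_user_index("u1"), idm.get_user_index("u2"), idm.get_user_index("u_new"))  # 0 1 None
```

JSON object keys are always strings, so this save/load round trip only preserves the mapping for string IDs, which is the case for `user_id` and `parent_asin`.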

## Sequence Generation: The Heart of Sequential RecSys

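The key idea in our approach is to generate an **item sequence** for each user interaction: the items the user interacted with before it. This captures the temporal context that traditional collaborative filtering, which treats a user's interactions as an unordered set, misses.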

```python
from src.sequence.utils import generate_item_sequences

def generate_sequences(df, sequence_length=10):
    """
    For each user interaction, create a sequence of their previous items.
    This gives our model context about user behavior patterns.
    """
    df_with_sequences = generate_item_sequences(
        df=df,
        user_col='user_indice',
        item_col='item_indice', 
        timestamp_col='timestamp',
        sequence_length=sequence_length,
        padding=True,
        padding_value=-1  # Special token for "no previous item"
    )
    
    return df_with_sequences
```

### How Sequence Generation Works

Let's trace through an example:

```python
# User interactions over time:
# Time 1: User reviews "Python Programming" (item_indice: 42)
# Time 2: User reviews "Machine Learning" (item_indice: 73)
# Time 3: User reviews "Deep Learning" (item_indice: 91)

# Generated sequences:
# Row 1: item_sequence = [-1, -1, ..., -1]           # No previous items
# Row 2: item_sequence = [-1, -1, ..., 42]          # Previous: Python book
# Row 3: item_sequence = [-1, -1, ..., 42, 73]      # Previous: Python, ML books
```
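The same trace can be reproduced with a short pandas sketch. It is not the project's `generate_item_sequences`; `build_item_sequences` is a hypothetical helper that assumes left padding with `-1` and keeps the most recent items last, matching the example above. We use `sequence_length=4` so the output stays readable.

```python
import pandas as pd


def build_item_sequences(df, user_col="user_indice", item_col="item_indice",
                         timestamp_col="timestamp", sequence_length=10, padding_value=-1):
    """Hypothetical sketch: attach each user's previous items (oldest first), left-padded."""
    df = df.sort_values([user_col, timestamp_col])
    history = {}  # user -> items seen so far, in time order
    sequences = []
    for user, item in zip(df[user_col].tolist(), df[item_col].tolist()):
        past = history.setdefault(user, [])
        window = past[-sequence_length:]  # only items strictly before this interaction
        sequences.append([padding_value] * (sequence_length - len(window)) + window)
        past.append(item)
    return df.assign(item_sequence=pd.Series(sequences, index=df.index))


events = pd.DataFrame({
    "user_indice": [0, 0, 0],
    "item_indice": [73, 42, 91],
    "timestamp": [2, 1, 3],  # deliberately out of order
})
out = build_item_sequences(events, sequence_length=4)
print(out["item_sequence"].tolist())
# [[-1, -1, -1, -1], [-1, -1, -1, 42], [-1, -1, 42, 73]]
```

The current item is appended to the history only after its own sequence is built, so no interaction ever sees itself or a future item in its context.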

This gives our model the **context** it needs to understand user preferences and make sequential predictions.

### Configuration for Sequences

```yaml
# From cfg/common.yaml
train:
  sequence:
    sequence_length: 10  # Keep last 10 items as context
```

The sequence length is a key hyperparameter:
- **Longer sequences**: More context but higher memory usage
- **Shorter sequences**: Less context but faster training
- **Padding**: Ensures all sequences have the same length for batch processing

## Dataset Classes for Training

Our `src/dataset.py` provides PyTorch-compatible dataset classes:

```python
from src.dataset import UserItemBinaryDFDataset

# Create dataset for binary feedback (interacted / not interacted)
dataset = UserItemBinaryDFDataset(
    df=df_with_sequences,
    user_col='user_indice',
    item_col='item_indice',
    rating_col='rating',
    timestamp_col='timestamp'
)

# Each sample contains:
sample = dataset[0]
print(f"User: {sample['user']}")           # User index
print(f"Item: {sample['item']}")           # Item index  
print(f"Rating: {sample['rating']}")       # Binary label (0 or 1)
print(f"Sequence: {sample['item_sequence']}")  # Previous items
```

### Why Binary Labels?

We convert ratings to binary labels because:
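- **Implicit feedback**: Every observed review counts as engagement (all ratings are ≥ 1), so the label captures interaction vs. no interaction rather than how much the user liked the item
- **Simpler objective**: Binary classification on interacted vs. not-interacted pairs is generally easier to train than predicting exact ratings
- **Real-world relevance**: Many recommendation systems optimize for engagement rather than rating accuracy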
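With a threshold of 1, every observed review becomes a positive, so the negatives have to come from somewhere else. A minimal sketch makes this explicit; `to_binary_labels` is a hypothetical helper, not the project's code, and the `positive_threshold=4` call only illustrates an alternative framing that treats low ratings as negatives.

```python
import pandas as pd


def to_binary_labels(df: pd.DataFrame, rating_col: str = "rating", positive_threshold: int = 1) -> pd.DataFrame:
    """Hypothetical helper: label observed interactions with rating >= threshold as positives."""
    out = df.copy()
    out["label"] = (out[rating_col] >= positive_threshold).astype(int)
    return out


reviews = pd.DataFrame({"rating": [1, 3, 5]})
print(to_binary_labels(reviews)["label"].tolist())                        # [1, 1, 1]
print(to_binary_labels(reviews, positive_threshold=4)["label"].tolist())  # [0, 0, 1]
```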

## Data Flow Through the Pipeline

Let's trace the complete data flow from our configuration:

```yaml
# 1. Data paths (from cfg/common.yaml)
data:
  train_fp: "$ROOT_DIR/data/train.parquet"           # Processed train split
  val_fp: "$ROOT_DIR/data/val.parquet"               # Processed validation split
  train_features_fp: "$ROOT_DIR/data/train_features.parquet"  # With sequences
  val_features_fp: "$ROOT_DIR/data/val_features.parquet"      # With sequences
  idm_fp: "$ROOT_DIR/data/idm.json"                  # ID mappings

# 2. Processing pipeline (notebooks/000-prep-data.ipynb, then notebooks/001-features.ipynb):
#    raw_data → filtering → temporal_split → id_mapping → sequence_generation → save
```

### Key Processing Steps

1. **Data Loading**: Load preprocessed train/val splits
2. **ID Mapping**: Convert string IDs to numerical indices  
3. **Sequence Generation**: Create item sequence features
4. **Data Validation**: Ensure data quality and consistency
5. **Serialization**: Save processed features for model training

## Configuration-Driven Development

Most settings are controlled through our YAML configuration:

```yaml
# cfg/common.yaml - Data section
data:
  hf_datasets:
    name: "McAuley-Lab/Amazon-Reviews-2023"
    mcauley_variant: "Books"
  
  # File paths with environment variable substitution
  train_fp: "$ROOT_DIR/data/train.parquet"
  val_fp: "$ROOT_DIR/data/val.parquet"
  idm_fp: "$ROOT_DIR/data/idm.json"
  
  # Column mappings
  user_col: "user_id"
  item_col: "parent_asin" 
  rating_col: "rating"
  timestamp_col: "timestamp"

# Sampling configuration  
sample:
  sample_users: 10000
  min_user_interactions: 5
  min_item_interactions: 10

# Sequence configuration
train:
  sequence:
    sequence_length: 10
```

This approach provides:
- **Reproducibility**: Same config = same results
- **Flexibility**: Easy to experiment with different parameters
- **Environment handling**: Automatic path resolution with `$ROOT_DIR`
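To illustrate the `$ROOT_DIR` substitution and the attribute-style access (`cfg.data.user_col`) used throughout the notebooks, here is a minimal sketch. It is not the project's `ConfigLoader`: `resolve_config` is a hypothetical helper that takes an already-parsed dict (for example, what `yaml.safe_load` would return) and relies on the standard library's `os.path.expandvars`.

```python
import os
from types import SimpleNamespace


def resolve_config(node):
    """Hypothetical sketch: expand $VARS in strings and allow attribute access."""
    if isinstance(node, dict):
        return SimpleNamespace(**{key: resolve_config(value) for key, value in node.items()})
    if isinstance(node, list):
        return [resolve_config(value) for value in node]
    if isinstance(node, str):
        return os.path.expandvars(node)  # "$ROOT_DIR/data/..." -> "<root>/data/..."
    return node  # numbers, booleans, None pass through unchanged


os.environ["ROOT_DIR"] = "/tmp/recsys"
raw = {
    "data": {"train_fp": "$ROOT_DIR/data/train.parquet", "user_col": "user_id"},
    "train": {"sequence": {"sequence_length": 10}},
}
cfg = resolve_config(raw)
print(cfg.data.train_fp)                 # /tmp/recsys/data/train.parquet
print(cfg.train.sequence.sequence_length)  # 10
```

`os.path.expandvars` leaves references to unset variables unchanged, so a missing `ROOT_DIR` shows up as a literal `$ROOT_DIR` in the resolved paths rather than raising an error.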

## Data Quality Checks

Before moving to model training, we validate our processed data:

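```python
def validate_processed_data(df, idm, padding_value=-1):
    """Ensure our processed data is ready for model training."""

    print(f"Data shape: {df.shape}")
    print(f"Users: {df['user_indice'].nunique()}")
    print(f"Items: {df['item_indice'].nunique()}")

    # Indices must fall inside the embedding tables: [0, num_ids - 1]
    max_user_idx = len(idm.user_to_index) - 1
    max_item_idx = len(idm.item_to_index) - 1

    assert df['user_indice'].between(0, max_user_idx).all(), "Invalid user indices"
    assert df['item_indice'].between(0, max_item_idx).all(), "Invalid item indices"

    # Check sequence validity: one padded length, only padding or valid item indices
    if 'item_sequence' in df.columns:
        seq_lengths = df['item_sequence'].apply(len)
        print(f"Sequence lengths: min={seq_lengths.min()}, max={seq_lengths.max()}")
        assert seq_lengths.nunique() == 1, "Sequences must share one padded length"
        assert df['item_sequence'].apply(
            lambda seq: all(i == padding_value or 0 <= i <= max_item_idx for i in seq)
        ).all(), "Invalid item indices in sequences"

    print("✅ Data validation passed!")
```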

## What We've Accomplished

By the end of this chapter, you have:

✅ **Understanding of recommendation data characteristics** - Temporal patterns, sparsity, and user behavior  
✅ **Processed dataset with proper ID mappings** - Ready for deep learning models  
✅ **Feature engineering pipeline for sequences** - The core of session-based recommendations  
✅ **Configuration-driven data processing** - Reproducible and flexible pipeline  
✅ **Data validation and quality checks** - Ensuring reliability for model training  

### Key Files Created

- `data/train_features.parquet` - Training data with sequences
- `data/val_features.parquet` - Validation data with sequences  
- `data/idm.json` - ID mappings for production deployment

## What's Next

In **Chapter 3**, we'll tackle the critical challenge of **negative sampling**. Since we only observe positive interactions (ratings), we need to carefully generate negative examples for training our models.

We'll explore:
- Why negative sampling is essential for implicit feedback
- Different negative sampling strategies and their trade-offs
- Implementation using `src/negative_sampling.py`
- Creating balanced datasets ready for model training

The quality of negative sampling directly impacts model performance, so let's dive into this crucial component next!

---

<br>
If you find this tutorial helpful, please cite this writeup as:

> Quy, Dinh. (May 2025). {{< meta title >}}. {{< var website_url >}}. https://{{< var website_url >}}/projects/real-time-seq-recsys/c2/. 