## Prepare data

Our goal is to get a small sample of the dataset to work with that is good enough to iterate on. A small sample makes it faster to iterate, hence enable easier debugging. If we can not make our model work on a reasonably small dataset then it's not likely to work on bigger one either.

We define a sufficiently small training dataset as:
- **Richness**: Every user should have at least 5 interactions and each item should have at least 10 interactions
- **Enough samples**: There should be at least 1000 users in the training dataset and about 1000 interactions in the validation set

In [17]:
import os
import sys
from functools import partial

import numpy as np
import plotly.express as px
from loguru import logger
from pydantic import BaseModel, model_validator
from load_dotenv import load_dotenv
import pandas as pd

from sqlalchemy import create_engine

sys.path.insert(0, "..")

from src.visualization.setup import FSDSColors
_ = load_dotenv(override=True)

## Controler

In [21]:
class Args(BaseModel):
    run_name: str = "003-prep-data"
    testing: bool = False
    notebook_persist_dp: str = None
    random_seed: int = 41

    user_col: str = "customer_id"
    item_col: str = "article_id"
    interaction_col: str = "price"
    timestamp_col: str = "t_dat"

    sample_users: int = 1000
    min_user_interactions: int = 5
    min_item_interactions: int = 10

    # Database credentials
    user: str = None
    password: str = None
    db: str = None
    host: str = None
    port: int = None

    @model_validator(mode="before")
    def load_env_vars(cls, values):
        # Load environment variables if not explicitly set
        values["user"] = values.get("user") or os.getenv("POSTGRES_USER")
        values["password"] = values.get("password") or os.getenv("POSTGRES_PASSWORD")
        values["db"] = values.get("db") or os.getenv("POSTGRES_DB")
        values["host"] = values.get("host") or os.getenv("POSTGRES_HOST")
        values["port"] = values.get("port") or os.getenv("POSTGRES_PORT")
        return values

    def init(self):
        self.notebook_persist_dp = os.path.abspath(f"../data/interim")
        if not self.testing:
            os.makedirs(self.notebook_persist_dp, exist_ok=True)

        return self


args = Args().init()

print(args.model_dump_json(indent=2))

{
  "run_name": "003-prep-data",
  "testing": false,
  "notebook_persist_dp": "/home/dinhln/Desktop/MLOPS/recsys/HM-ScalableRecs/data/interim",
  "random_seed": 41,
  "user_col": "customer_id",
  "item_col": "article_id",
  "interaction_col": "price",
  "timestamp_col": "t_dat",
  "sample_users": 1000,
  "min_user_interactions": 5,
  "min_item_interactions": 10,
  "user": "lastfirstkiss",
  "password": "nightchange",
  "db": "hm-recsys",
  "host": "localhost",
  "port": 5432
}


## Load data from OLAP (assume PostgreSQL in this context)

In [29]:
# Define the PostgreSQL connection URL
DATABASE_URL = f"postgresql+psycopg2://{args.user}:{args.password}@{args.host}:{args.port}/{args.db}"

# Create an SQLAlchemy engine
engine = create_engine(DATABASE_URL)

# Write a SQL query
query = "SELECT * FROM transactions;"

# Load data into a Pandas DataFrame
with engine.connect() as connection:
    data = pd.read_sql(query, con=connection)

# Display the first few rows
print(data.head())

       t_dat                                        customer_id  article_id  \
0 2018-09-20  000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...   663713001   
1 2018-09-20  000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...   541518023   
2 2018-09-20  00007d2de826758b65a93dd24ce629ed66842531df6699...   505221004   
3 2018-09-20  00007d2de826758b65a93dd24ce629ed66842531df6699...   685687003   
4 2018-09-20  00007d2de826758b65a93dd24ce629ed66842531df6699...   685687004   

      price  sales_channel_id  
0  0.050831                 2  
1  0.030492                 2  
2  0.015237                 2  
3  0.016932                 2  
4  0.016932                 2  


In [32]:
data.dtypes

t_dat               datetime64[ns]
customer_id                 object
article_id                   int64
price                      float64
sales_channel_id             int64
dtype: object

## Sampling data

Just randomly get X users will not guarantee that the output dataset would qualify the condition of **richness**. Instead we take an iterative approach where we gradually drop random users from the dataset while keeping an eye on the conditions and our sampling target.

In [33]:
def remove_random_users(df, k=10):
    users = df[args.user_col].unique()
    np.random.seed(args.random_seed)
    to_remove_users = np.random.choice(users, size=k, replace=False)
    return df.loc[lambda df: ~df[args.user_col].isin(to_remove_users)]


def get_unqualified(df, col: str, threshold: int):
    unqualified = df.groupby(col).size().loc[lambda s: s < threshold].index
    return unqualified


get_unqualified_users = partial(
    get_unqualified, col=args.user_col, threshold=args.min_user_interactions
)
get_unqualified_items = partial(
    get_unqualified, col=args.item_col, threshold=args.min_item_interactions
)

In [None]:
buffer_perc = 0.2
perc_users_removed_each_round = 0.01
debug = True
min_val_records = 1000

keep_random_removing = True
r = 1

sample_df = data.copy()

while keep_random_removing:
    num_users_removed_each_round = int(
        perc_users_removed_each_round * sample_df[args.user_col].nunique()
    )
    print(
        f"\n\nRandomly removing {num_users_removed_each_round} users - Round {r} started"
    )
    new_sample_df = remove_random_users(sample_df, k=num_users_removed_each_round)

    keep_removing = True
    i = 1

    while keep_removing:
        if debug:
            logger.info(f"Sampling round {i} started")
        keep_removing = False
        uu = get_unqualified_users(new_sample_df)
        if debug:
            logger.info(f"{len(uu)=}")
        if len(uu):
            new_sample_df = new_sample_df.loc[lambda df: ~df[args.user_col].isin(uu)]
            if debug:
                logger.info(f"After removing uu: {len(new_sample_df)=}")
            assert len(get_unqualified_users(new_sample_df)) == 0
            keep_removing = True
        ui = get_unqualified_items(new_sample_df)
        if debug:
            logger.info(f"{len(ui)=}")
        if len(ui):
            new_sample_df = new_sample_df.loc[lambda df: ~df[args.item_col].isin(ui)]
            if debug:
                logger.info(f"After removing ui: {len(new_sample_df)=}")
            assert len(get_unqualified_items(new_sample_df)) == 0
            keep_removing = True
        i += 1

    sample_users = sample_df[args.user_col].unique()
    sample_items = sample_df[args.item_col].unique()
    num_users = len(sample_users)
    logger.info(f"After randomly removing users - round {r}: {num_users=}")
    if num_users > args.sample_users * (1 + buffer_perc):
        logger.info(
            f"Number of users {num_users} are still greater than expected, keep removing..."
        )
        sample_df = new_sample_df.copy()
    else:
        logger.info(
            f"Number of users {num_users} are falling below expected threshold, stop and use `sample_df` as final output..."
        )
        keep_random_removing = False

    val_sample_df = val_raw.loc[
        lambda df: df[args.user_col].isin(sample_users)
        & df[args.item_col].isin(sample_items)
    ]
    if (num_val_records := val_sample_df.shape[0]) < min_val_records:
        logger.info(
            f"Number of val_df records {num_val_records} are falling below expected threshold, stop and use `sample_df` as final output..."
        )
        keep_random_removing = False

    r += 1