In [None]:
!pip install -r /kaggle/input/requirements1.txt

In [None]:
import numpy as np


class StatelessRandomGenerator:
    """Seeded random source that never advances its own state.

    Every draw builds a brand-new ``numpy.random.Generator`` from the stored
    seed, so repeating a call with the same arguments (and the same seed)
    yields the same values. Change the seed via ``set_seed`` to obtain a
    different result.
    """

    def __init__(self, seed=42):
        self.seed = seed

    def _fresh_generator(self):
        """Return a new NumPy generator initialised with the current seed."""
        return np.random.default_rng(self.seed)

    def set_seed(self, new_seed):
        """Replace the seed used by all subsequent draws."""
        self.seed = new_seed

    def random(self, size=None):
        """Forward to ``Generator.random`` on a freshly seeded generator."""
        return self._fresh_generator().random(size)

    def integers(self, low, high=None, size=None):
        """Forward to ``Generator.integers`` on a freshly seeded generator."""
        return self._fresh_generator().integers(low, high, size)

    def choice(self, a, size=None, replace=True, p=None):
        """Forward to ``Generator.choice`` on a freshly seeded generator."""
        return self._fresh_generator().choice(a, size, replace, p)


# Module-wide generator instance, created with seed 42; train_test_split
# draws its sample indices from it.
global_rng = StatelessRandomGenerator(42)


def set_global_seed(new_seed):
    """Set the seed of the shared ``global_rng`` instance.

    Args:
        new_seed: Seed value forwarded to ``global_rng.set_seed``.
    """
    global_rng.set_seed(new_seed)

In [None]:
import torch


def wmape_metric(pred: torch.Tensor, true: torch.Tensor) -> torch.Tensor:
    """Weighted mean absolute percentage error, reduced over dimension 0.

    Computes ``sum(|pred - true|) / sum(|true|)`` along ``dim=0``. The
    denominator uses absolute target values so that negative or mixed-sign
    targets cannot flip the sign of the metric or cancel each other out; for
    non-negative targets the result is identical to dividing by ``sum(true)``.

    Args:
        pred: Predicted values.
        true: Ground-truth values, broadcastable against ``pred``.

    Returns:
        torch.Tensor: The WMAPE with dimension 0 reduced away. If the absolute
        targets sum to zero the division yields ``inf`` or ``nan``; no special
        handling is applied.
    """
    abs_error = torch.sum(torch.abs(pred - true), dim=0)
    total_actual = torch.sum(torch.abs(true), dim=0)
    return abs_error / total_actual

In [None]:
from datetime import datetime
import json
from pathlib import Path
import polars as pl
#from data_processing.utils.stateless_rng import global_rng

def filter_purchases_purchases_per_month_pl(
    df_pl: pl.DataFrame, train_end: datetime.date, group_by_channel_id: bool = False
):
    """Aggregate transactions into per-day baskets and identify outlier customers.

    Two independent results are derived from ``df_pl``:

    1. Rows are grouped by customer and date (and by sales channel when
       ``group_by_channel_id`` is set); item-level values are collected into
       list columns and the number of items per group is added.
    2. Customers whose item count in transactions dated strictly before
       ``train_end`` is at or above the 99th percentile are collected as
       "extreme" customers.

    The extreme customers are only reported. The grouped frame returned here
    still contains them; removing them is left to the caller.

    Args:
        df_pl (pl.DataFrame): Transaction rows with the columns ``customer_id``,
            ``date``, ``article_id``, ``price`` and ``sales_channel_id``.
        train_end (datetime.date): Only transactions before this date are used
            to decide which customers are extreme.
        group_by_channel_id (bool, optional): If True, ``sales_channel_id``
            becomes an additional grouping key. If False, the channel ids are
            collected into a ``sales_channel_ids`` list column instead.
            Defaults to False.

    Returns:
        tuple[pl.DataFrame, pl.DataFrame]: Tuple containing:
            - grouped_df: One row per group with ``article_ids``,
              ``total_price``, ``prices``, ``num_items`` and either the
              ``sales_channel_id`` key or the ``sales_channel_ids`` list.
            - extreme_customers: Unique ``customer_id`` values of the
              customers flagged as outliers.

    Notes:
        The cutoff and the number of flagged customers are printed to stdout.
    """
    # Build the basket aggregation once; only the channel handling differs
    # between the two modes. The expression order fixes the column order.
    basket_keys = ["customer_id", "date"]
    basket_aggs = [
        pl.col("article_id").explode().alias("article_ids"),
        pl.col("price").sum().round(2).alias("total_price"),
    ]
    if group_by_channel_id:
        # Used for multi variate time series
        basket_keys.append("sales_channel_id")
    else:
        basket_aggs.append(
            pl.col("sales_channel_id").explode().alias("sales_channel_ids")
        )
    basket_aggs.append(pl.col("price").explode().alias("prices"))

    grouped_df = (
        df_pl.lazy()
        .group_by(basket_keys)
        .agg(basket_aggs)
        .with_columns(pl.col("article_ids").list.len().alias("num_items"))
    )

    # Per-customer totals, restricted to the training period.
    train_period = df_pl.lazy().filter(pl.col("date") < train_end)
    customers_summary = (
        train_period.group_by("customer_id")
        .agg(
            [
                pl.col("date").n_unique().alias("total_purchases"),
                pl.col("price").sum().round(2).alias("total_spent"),
                pl.col("article_id").flatten().alias("flattened_ids"),
            ]
        )
        .with_columns(pl.col("flattened_ids").list.len().alias("total_items"))
    )

    cutoff_q = 0.99
    cutoff_row = customers_summary.select(
        [
            pl.col("total_purchases").quantile(cutoff_q),
            pl.col("total_spent").quantile(cutoff_q),
            pl.col("total_items").quantile(cutoff_q),
        ]
    ).collect()
    # Only the item-count cutoff is applied below; the purchase-count and
    # spending cutoffs are computed but currently not used.
    _purchases_cutoff, _spending_cutoff, items_cutoff = (
        cutoff_row.to_numpy().flatten()
    )

    # Currently only remove customers with very large number of total items purchased
    extreme_customers = (
        customers_summary.filter(pl.col("total_items") >= items_cutoff)
        .select("customer_id")
        .unique()
        .collect()
    )

    print(
        f"""
        Cutoff Values for {cutoff_q*100}th Percentiles:
        -----------------------------------
        Total items bought:    {items_cutoff:.0f} items

        -----------------------------------
        Removed Customers:     {len(extreme_customers):,}
        """
    )

    return grouped_df.collect(), extreme_customers

def train_test_split(
    train_df: pl.DataFrame,
    test_df: pl.DataFrame,
    subset: int = None,
    train_subsample_percentage: float = None,
) -> tuple[pl.DataFrame, pl.DataFrame, pl.DataFrame]:
    """Splits data into train, validation, and test sets with optional subsampling.

    The function performs the following operations:
    1. If ``subset`` is given, truncates both train and test data to their
       first ``subset`` rows.
    2. Otherwise, if ``train_subsample_percentage`` is given, keeps a sample
       (without replacement) of that fraction of the training rows.
    3. Moves 10% of the remaining training rows into a validation set.

    Args:
        train_df (pl.DataFrame): Training dataset.
        test_df (pl.DataFrame): Test dataset.
        subset (int, optional): If provided, limits both train and test sets
            to the first n rows. Defaults to None.
        train_subsample_percentage (float, optional): Fraction of training
            rows to keep; the value is multiplied by the row count and
            truncated to an integer. Defaults to None.

    Returns:
        tuple[pl.DataFrame, pl.DataFrame, pl.DataFrame]: Tuple containing:
            - train_df: Training rows that were not moved to validation
            - val_df: Validation dataset (10% of the training rows, truncated)
            - test_df: Test dataset (truncated only when ``subset`` is given)

    Notes:
        ``subset`` takes precedence: when both ``subset`` and
        ``train_subsample_percentage`` are provided, the percentage is ignored.
        All indices are drawn from the module-level ``global_rng``.
    """
    if subset is not None:
        train_df = train_df[:subset]
        test_df = test_df[:subset]
    elif train_subsample_percentage is not None:
        kept_indices = global_rng.choice(
            len(train_df),
            size=int(train_subsample_percentage * len(train_df)),
            replace=False,
        )
        train_df = train_df[kept_indices]

    # Train-val-split: draw 10% of the row positions for validation.
    n_rows = len(train_df)
    val_indices = global_rng.choice(n_rows, size=int(0.1 * n_rows), replace=False)
    val_df = train_df[val_indices]

    # Keep every row whose position was not drawn for validation. The row
    # positions are generated explicitly from the frame height instead of
    # relying on the deprecated ``pl.count()`` expression.
    is_validation_row = pl.int_range(0, n_rows).is_in(val_indices)
    train_df = train_df.filter(~is_validation_row)

    return train_df, val_df, test_df

def map_article_ids(df: pl.DataFrame, data_path: Path) -> pl.DataFrame:
    """Replace ``article_id`` values with the running ids stored on disk.

    The lookup is read from the ``"combined"`` entry of
    ``running_id_dict.json`` inside ``data_path``.

    Args:
        df (pl.DataFrame): Frame with an ``article_id`` column to translate.
        data_path (Path): Directory that contains ``running_id_dict.json``.

    Returns:
        pl.DataFrame: The input rows with ``article_id`` replaced by the new
        id (placed as the first column) and sorted by it. Rows whose article
        has no entry in the lookup are dropped by the inner join.
    """
    lookup_file = data_path / "running_id_dict.json"
    with open(lookup_file, "r") as handle:
        id_lookup = json.load(handle)["combined"]

    lookup_df = pl.DataFrame(
        {
            "old_id": list(id_lookup),
            "new_id": list(id_lookup.values()),
        },
        schema_overrides={"old_id": pl.Int32, "new_id": pl.Int32},
    )

    # Inner join discards articles without a mapping; afterwards the new id
    # takes over the ``article_id`` name and the helper columns are dropped.
    joined = df.join(lookup_df, left_on="article_id", right_on="old_id", how="inner")
    remapped = joined.select(
        pl.col("new_id").alias("article_id"),
        pl.all().exclude(["article_id", "old_id", "new_id"]),
    )

    return remapped.sort("article_id")

In [None]:
#from pathlib import Path
#from data_processing.customer_df.customer_df import get_customer_df_benchmarks
#from data_processing.transaction_df.transaction_df import get_tx_article_dfs
import polars as pl


def expand_list_columns(
    df: pl.DataFrame, date_col: str = "days_before_lst", num_col: str = "num_items_lst"
) -> pl.DataFrame:
    """Repeat each entry of one list column by the matching count in another.

    For every row, the i-th element of ``date_col`` is repeated as many times
    as the i-th element of ``num_col`` states. The expanded list replaces
    ``date_col``; all other columns are left untouched.

    Args:
        df: Input Polars DataFrame with list columns
        date_col: Name of the list column whose elements are repeated
        num_col: Name of the list column holding the repeat counts

    Returns:
        A new Polars DataFrame in which ``date_col`` holds the expanded lists
    """

    def _repeat_by_count(row):
        # ``row`` is the struct of both list columns for a single record.
        repeated = []
        for value, times in zip(row[date_col], row[num_col]):
            for _ in range(times):
                repeated.append(value)
        return repeated

    paired_lists = pl.struct([date_col, num_col])
    return df.with_columns(
        paired_lists.map_elements(_repeat_by_count).alias(date_col)
    )


def add_benchmark_tx_features(df: pl.DataFrame) -> pl.DataFrame:
    """Creates benchmark transaction features from aggregated customer transaction data.

    Args:
        df: A Polars DataFrame containing aggregated transaction data with the
            columns customer_id, total_price_lst, num_items_lst,
            days_before_lst and CLV_label.

    Returns:
        pl.DataFrame: A DataFrame with derived features including:
            - total_spent: Sum of all transaction amounts
            - total_purchases: Count of transactions
            - total_items: Sum of items purchased
            - days_since_last_purchase: Last element of days_before_lst
            - days_since_first_purchase: First element of days_before_lst
            - avg_spent_per_transaction: Mean transaction amount
            - avg_items_per_transaction: Mean items per transaction
            - avg_days_between: Mean days between transactions
            - regression_label: CLV label for regression
            - classification_label: Binary CLV label (>0)

    Note:
        The avg_days_between calculation may return None for customers with single
        transactions, which is handled by tree-based algorithms.
    """
    # One entry per transaction: the value of that transaction.
    tx_totals = pl.col("total_price_lst")
    items_per_tx = pl.col("num_items_lst")
    days_before = pl.col("days_before_lst")
    clv = pl.col("CLV_label")

    return df.select(
        "customer_id",
        tx_totals.list.sum().alias("total_spent"),
        tx_totals.list.len().alias("total_purchases"),
        items_per_tx.list.sum().alias("total_items"),
        days_before.list.get(-1).alias("days_since_last_purchase"),
        days_before.list.get(0).alias("days_since_first_purchase"),
        # Average over the per-transaction totals. Averaging the flattened
        # item prices (price_lst) would give the mean item price instead.
        tx_totals.list.mean().alias("avg_spent_per_transaction"),
        items_per_tx.list.mean().cast(pl.Float32).alias("avg_items_per_transaction"),
        # Code below returns None values for customers with single Tx
        # Tree algos should be able to handle this
        (
            days_before.list.diff(null_behavior="drop")
            .list.mean()
            # Sign is flipped after averaging the consecutive differences.
            .mul(-1)
            .cast(pl.Float32)
            .alias("avg_days_between")
        ),
        clv.alias("regression_label"),
        clv.gt(0).cast(pl.Int32).alias("classification_label"),
    )


def process_dataframe(df: pl.DataFrame, max_length: int = 20) -> pl.DataFrame:
    """Build the sequence inputs and labels used by the benchmark models.

    Steps:
    1. Expand ``days_before_lst`` via ``expand_list_columns`` using the counts
       in ``num_items_lst``
    2. Keep only the last ``max_length`` entries of the two sequence columns
    3. Derive the regression and classification labels from ``CLV_label``

    Args:
        df: A polars DataFrame containing customer transaction data
        max_length: Maximum number of trailing elements kept per list (default: 20)

    Returns:
        A polars DataFrame with the following columns:
            - customer_id: Customer identifier
            - days_before_lst: Expanded list, truncated to its last entries
            - articles_ids_lst: List of article identifiers, truncated likewise
            - regression_label: ``CLV_label`` unchanged
            - classification_label: 1 where ``CLV_label`` > 0, else 0
    """
    expanded = expand_list_columns(
        df, date_col="days_before_lst", num_col="num_items_lst"
    )
    clv = pl.col("CLV_label")

    return expanded.select(
        "customer_id",
        pl.col("days_before_lst").list.tail(max_length),
        pl.col("articles_ids_lst").list.tail(max_length),
        clv.alias("regression_label"),
        clv.gt(0).cast(pl.Int32).alias("classification_label"),
    )


def get_benchmark_dfs(
    data_path: Path, config: dict
) -> tuple[pl.DataFrame, pl.DataFrame, pl.DataFrame]:
    """Assemble the benchmark train, validation and test frames.

    The transaction splits come from ``get_tx_article_dfs``; each split is run
    through ``process_dataframe`` and then left-joined with the customer table
    from ``get_customer_df_benchmarks`` on ``customer_id``.

    Args:
        data_path: Path object pointing to the data directory
        config: Configuration dictionary; ``config["max_length"]`` is read
            here, the dictionary is also passed on to the loaders

    Returns:
        tuple[pl.DataFrame, pl.DataFrame, pl.DataFrame]: A tuple containing:
            - train_df: Training dataset
            - val_df: Validation dataset
            - test_df: Test dataset
    """
    tx_splits = get_tx_article_dfs(
        data_path=data_path,
        config=config,
        cols_to_aggregate=[
            "date",
            "days_before",
            "article_ids",
            "sales_channel_ids",
            "total_price",
            "prices",
            "num_items",
        ],
        keep_customer_id=True,
    )
    train_article, val_article, test_article = tx_splits

    customer_df = get_customer_df_benchmarks(data_path=data_path, config=config)

    def _with_customer_features(split_df: pl.DataFrame) -> pl.DataFrame:
        # Same treatment for every split: sequence features + customer columns.
        processed = process_dataframe(df=split_df, max_length=config["max_length"])
        return processed.join(customer_df, on="customer_id", how="left")

    train_df = _with_customer_features(train_article)
    val_df = _with_customer_features(val_article)
    test_df = _with_customer_features(test_article)

    return train_df, val_df, test_df

In [None]:
import polars as pl
#from pathlib import Path


def get_customer_df_benchmarks(data_path: Path, config: dict):
    """Load the customer table with missing ages filled by the mean age.

    Args:
        data_path (Path): Path to directory containing 'customers.csv'.
        config (dict): Accepted for interface compatibility; the active code
            does not read or modify it.

    Returns:
        pl.DataFrame: Frame with the columns customer_id, age (nulls replaced
        by the column mean) and postal_code.
    """
    customers_csv = data_path / "customers.csv"
    wanted_columns = [
        "customer_id",
        pl.col("age").fill_null(strategy="mean"),
        "postal_code",
    ]
    customers = pl.scan_csv(customers_csv).select(wanted_columns)

    # Age bucketing is currently disabled; the sketch is kept for reference.
    # customers = customers.with_columns(
    #     [
    #         pl.when(pl.col("age").is_null())
    #         .then(0)
    #         .when(pl.col("age") < 25)
    #         .then(1)
    #         .when(pl.col("age").is_between(25, 34))
    #         .then(2)
    #         .when(pl.col("age").is_between(35, 44))
    #         .then(3)
    #         .when(pl.col("age").is_between(45, 54))
    #         .then(4)
    #         .when(pl.col("age").is_between(55, 64))
    #         .then(5)
    #         .otherwise(6)
    #         .alias("age_group")
    #     ]
    # )
    # config["num_age_groups"] = 7

    return customers.collect()

In [None]:
#from datetime import datetime
#from pathlib import Path
#import polars as pl

#from data_processing.utils.utils_transaction_df import (
 #   filter_purchases_purchases_per_month_pl,
  #  map_article_ids,
   # train_test_split,
#)


def generate_clv_data_pl(
    df: pl.DataFrame,
    agg_df: pl.DataFrame,
    label_threshold: datetime.date,
    pred_end: datetime.date,
    clv_periods: list,
    log_clv: bool = False,
):
    """Attach a ``CLV_label`` column to the aggregated customer frame.

    The label is the rounded sum of ``total_price`` over a customer's
    transactions dated between ``label_threshold`` and ``pred_end`` (both
    inclusive). Customers without transactions in that window receive 0.

    Args:
        df (pl.DataFrame): Transaction dataframe with customer_id, date and total_price.
        agg_df (pl.DataFrame): Aggregated dataframe with one row per customer.
        label_threshold (datetime.date): First date of the label window.
        pred_end (datetime.date): Last date of the label window.
        clv_periods (list): Only its length is checked; more than one entry is rejected.
        log_clv (bool, optional): Whether to apply log1p (then round to 2
            decimals) to the summed value. Defaults to False.

    Returns:
        pl.DataFrame: ``agg_df`` left-joined with the label. Note that
        ``fill_null(0)`` is applied to the whole frame, not only to the label.

    Raises:
        ValueError: If more than one CLV period is provided.
    """
    if len(clv_periods) > 1:
        raise ValueError("CLV periods should be a single number for now.")

    label_col = "CLV_label"
    tx_date = pl.col("date")

    # Transactions that fall inside the label window (bounds inclusive).
    in_label_window = (tx_date <= pred_end) & (tx_date >= label_threshold)
    label_df = (
        df.filter(in_label_window)
        .group_by("customer_id")
        .agg(pl.sum("total_price").round(2).alias(label_col))
    )

    if log_clv:
        label_df = label_df.with_columns(
            pl.col(label_col).log1p().round(2).alias(label_col)
        )

    # Left join keeps customers without label-window purchases; their missing
    # label becomes 0 through the fill below.
    labelled = agg_df.join(label_df, on="customer_id", how="left")
    return labelled.fill_null(0)


def group_and_convert_df_pl(
    df: pl.DataFrame,
    label_start_date: datetime.date,
    pred_end: datetime.date,
    clv_periods: list,
    cols_to_aggregate: list = [
        "date",
        "days_before",
        "num_items",
        "article_ids",
        "sales_channel_ids",
        "total_price",
        "prices",
    ],
    keep_customer_id: bool = True,
    log_clv: bool = False,
) -> pl.DataFrame:
    """Collapse the transaction history into one row of list columns per customer.

    Only transactions dated strictly before ``label_start_date`` contribute to
    the list columns. Rows are sorted by customer and date before grouping.
    When ``clv_periods`` is not None, a ``CLV_label`` column is added through
    ``generate_clv_data_pl``.

    Args:
        df (pl.DataFrame): Input transaction dataframe.
        label_start_date (datetime.date): Start date of the CLV label period;
            also the reference date for ``days_before``.
        pred_end (datetime.date): End date for prediction period.
        clv_periods (list): List of periods for CLV calculation, or None to
            skip the label.
        cols_to_aggregate (list, optional): Input columns whose list columns
            are kept in the output. Defaults to the standard transaction columns.
        keep_customer_id (bool, optional): Whether to retain customer_id in output. Defaults to True.
        log_clv (bool, optional): Whether to apply log1p transformation to CLV values. Defaults to False.

    Returns:
        pl.DataFrame: Aggregated customer-level dataframe.

    Raises:
        ValueError: If required columns (days_before, article_ids, num_items) are missing from cols_to_aggregate.
    """
    required_cols = ("days_before", "article_ids", "num_items")
    if not all(name in cols_to_aggregate for name in required_cols):
        raise ValueError(
            "The columns days_before, article_ids, and num_items are required "
            "for the aggregation"
        )

    # Input column name -> name of the per-customer list column built from it.
    list_col_for = {
        "date": "date_lst",
        "days_before": "days_before_lst",
        "article_ids": "articles_ids_lst",
        "sales_channel_ids": "sales_channel_id_lst",
        "total_price": "total_price_lst",
        "prices": "price_lst",
        "num_items": "num_items_lst",
    }

    # Derived / re-typed columns added before grouping.
    days_before_expr = (
        (label_start_date - pl.col("date"))
        .dt.total_days()
        .cast(pl.Int32)
        .alias("days_before")
    )
    channel_expr = (
        pl.col("sales_channel_ids").cast(pl.List(pl.Int32)).alias("sales_channel_ids")
    )
    article_expr = pl.col("article_ids").cast(pl.List(pl.Int32)).alias("article_ids")

    history = df.filter(pl.col("date") < label_start_date).with_columns(
        days_before_expr,
        channel_expr,
        article_expr,
    )

    agg_df = (
        history.sort("customer_id", "date")
        .group_by("customer_id")
        .agg(
            pl.col("date").explode().alias(list_col_for["date"]),
            pl.col("days_before").explode().alias(list_col_for["days_before"]),
            pl.col("article_ids").explode().alias(list_col_for["article_ids"]),
            pl.concat_list(pl.col("sales_channel_ids")).alias(
                list_col_for["sales_channel_ids"]
            ),
            pl.col("total_price").explode().alias(list_col_for["total_price"]),
            pl.col("prices").explode().alias(list_col_for["prices"]),
            pl.col("num_items").explode().alias(list_col_for["num_items"]),
        )
    )

    if clv_periods is not None:
        # The label is computed from the unfiltered ``df``, not from ``history``.
        agg_df = generate_clv_data_pl(
            df=df,
            agg_df=agg_df,
            label_threshold=label_start_date,
            pred_end=pred_end,
            clv_periods=clv_periods,
            log_clv=log_clv,
        )

    # Drop columns which are not to be aggregated
    cols_to_drop = [
        list_name
        for source_name, list_name in list_col_for.items()
        if source_name not in cols_to_aggregate
    ]
    if not keep_customer_id:
        cols_to_drop.append("customer_id")

    return agg_df.drop(*cols_to_drop)


def split_df_and_group_pl(
    df: pl.DataFrame,
    clv_periods: list,
    config: dict,
    cols_to_aggregate: list = [
        "date",
        "days_before",
        "article_ids",
        "sales_channel_ids",
        "total_price",
        "prices",
        "num_items",
    ],
    keep_customer_id: bool = True,
    log_clv: bool = False,
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Cut out the train and test date windows and aggregate each per customer.

    Args:
        df (pl.DataFrame): Input transaction dataframe.
        clv_periods (list): List of periods for CLV calculation.
        config (dict): Configuration dictionary. The keys ``train_begin``,
            ``train_label_begin``, ``train_end``, ``test_begin``,
            ``test_label_begin`` and ``test_end`` are read and parsed as
            ``"%Y-%m-%d"`` strings.
        cols_to_aggregate (list, optional): Columns to include in aggregation. Defaults to standard transaction columns.
        keep_customer_id (bool, optional): Whether to retain customer_id in output. Defaults to True.
        log_clv (bool, optional): Whether to apply log1p transformation to CLV values. Defaults to False.

    Returns:
        tuple[pl.DataFrame, pl.DataFrame]: Tuple containing:
            - train_df: Aggregated training dataset
            - test_df: Aggregated test dataset
    """

    def _config_date(key: str):
        # All window boundaries are stored as ISO-style date strings.
        return datetime.strptime(config.get(key), "%Y-%m-%d")

    train_begin = _config_date("train_begin")
    train_label_start = _config_date("train_label_begin")
    train_end = _config_date("train_end")
    test_begin = _config_date("test_begin")
    test_label_start = _config_date("test_label_begin")
    test_end = _config_date("test_end")

    # Options that are identical for both aggregation calls.
    shared_options = {
        "clv_periods": clv_periods,
        "cols_to_aggregate": cols_to_aggregate,
        "keep_customer_id": keep_customer_id,
        "log_clv": log_clv,
    }
    tx_date = pl.col("date")

    # Training window: train_begin <= date <= train_end
    train_window = df.filter((tx_date >= train_begin) & (tx_date <= train_end))
    train_df = group_and_convert_df_pl(
        df=train_window,
        label_start_date=train_label_start,
        pred_end=train_end,
        **shared_options,
    )

    # Test window: test_begin <= date <= test_end
    test_window = df.filter((tx_date >= test_begin) & (tx_date <= test_end))
    test_df = group_and_convert_df_pl(
        df=test_window,
        label_start_date=test_label_start,
        pred_end=test_end,
        **shared_options,
    )

    return train_df, test_df


def load_data_rem_outlier_pl(
    data_path: Path, train_end: datetime.date, group_by_channel_id: bool = False
):
    """Read the transaction parquet file, normalise it and run the outlier step.

    The raw frame gets a ``date`` column (``t_dat`` cast to Date), an Int32
    ``article_id`` and a ``price`` multiplied by 590 and rounded to two
    decimals. Article ids are then translated with ``map_article_ids`` before
    ``filter_purchases_purchases_per_month_pl`` is applied.

    Args:
        data_path (Path): Path to directory containing 'transactions_polars.parquet'.
        train_end (datetime.date): End date for training period.
        group_by_channel_id (bool, optional): Whether to group data by sales channel ID. Defaults to False.

    Returns:
        tuple[pl.DataFrame, pl.DataFrame]: Tuple containing:
            - grouped_df: Processed transaction dataframe
            - extreme_customers: Dataframe of customers identified as outliers
    """
    raw_tx = pl.read_parquet(data_path / "transactions_polars.parquet")

    # The three expressions read only original columns, so they can be
    # evaluated in a single pass.
    prepared_tx = raw_tx.with_columns(
        pl.col("t_dat").cast(pl.Date).alias("date"),
        pl.col("article_id").cast(pl.Int32),
        (pl.col("price") * 590).cast(pl.Float32).round(2).alias("price"),
    )

    # Map article ids to running ids so that they match with feature matrix
    prepared_tx = map_article_ids(df=prepared_tx, data_path=data_path)

    return filter_purchases_purchases_per_month_pl(
        prepared_tx, train_end=train_end, group_by_channel_id=group_by_channel_id
    )


def get_customer_train_test_articles_pl(
    data_path: Path,
    config: dict,
    clv_periods: list = None,
    cols_to_aggregate: list = [
        "date",
        "days_before",
        "article_ids",
        "sales_channel_ids",
        "total_price",
        "prices",
        "num_items",
    ],
    keep_customer_id: bool = True,
):
    """Load transactions, build the train/test frames and drop outlier customers.

    Args:
        data_path (Path): Path to directory containing transaction data.
        config (dict): Configuration dictionary; ``train_end`` and ``log_clv``
            are read here and the dictionary is passed on to the split step.
        clv_periods (list, optional): List of periods for CLV calculation. Defaults to None.
        cols_to_aggregate (list, optional): Columns to include in aggregation. Defaults to standard transaction columns.
        keep_customer_id (bool, optional): Whether to retain customer_id in output. Defaults to True.

    Returns:
        tuple[pl.DataFrame, pl.DataFrame]: Tuple containing:
            - train_df: Training dataset without the extreme customers
            - test_df: Test dataset without the extreme customers
    """
    outlier_cutoff = datetime.strptime(config.get("train_end"), "%Y-%m-%d")
    grouped_df, extreme_customers = load_data_rem_outlier_pl(
        data_path=data_path, train_end=outlier_cutoff
    )

    # customer_id is always kept during aggregation because the anti join
    # below needs it; it is dropped afterwards if the caller asked for that.
    splits = split_df_and_group_pl(
        df=grouped_df,
        clv_periods=clv_periods,
        config=config,
        cols_to_aggregate=cols_to_aggregate,
        keep_customer_id=True,
        log_clv=config.get("log_clv", False),
    )

    cleaned = [
        split.join(extreme_customers, on="customer_id", how="anti")
        for split in splits
    ]
    if not keep_customer_id:
        cleaned = [split.drop("customer_id") for split in cleaned]

    train_df, test_df = cleaned
    return train_df, test_df


def get_tx_article_dfs(
    data_path: Path,
    config: dict,
    cols_to_aggregate: list = [
        "date",
        "days_before",
        "article_ids",
        "sales_channel_ids",
        "total_price",
        "prices",
        "num_items",
    ],
    keep_customer_id: bool = True,
):
    """Create the train, validation and test transaction frames.

    Args:
        data_path (Path): Path to directory containing transaction data files.
        config (dict): Configuration dictionary. ``clv_periods`` (default
            ``[6]``), ``subset`` and ``train_subsample_percentage`` are read
            here; the dictionary is also passed on to the loading step.
        cols_to_aggregate (list, optional): Transaction columns to include in output.
        keep_customer_id (bool, optional): Whether to retain customer_id column.

    Returns:
        tuple[pl.DataFrame, pl.DataFrame, pl.DataFrame]: Tuple containing:
            - train_df: Training rows left after the validation split
            - val_df: Validation split produced by ``train_test_split``
            - test_df: Test dataset (optionally subsampled)

    Columns of dfs:
        - customer_id
        - date_lst (list[date]): Dates of each transaction
        - days_before_lst (list[int]): Number of days between start of prediction and date of transaction
        - articles_ids_lst (list[int]): Flattened list of all items a customer purchased
        - sales_channel_id_lst (list[list[int]]): Sales channel of a transaction (repeated for each item within a transaction)
        - total_price_lst (list[float]): Value of each transaction
        - price_lst (list[float]): Flattened list of prices of all items customer purchased
        - num_items_lst (list[int]): Number of items in each transaction
        - CLV_label (float): Sales in prediction period (label to be used)
    """
    full_train_df, full_test_df = get_customer_train_test_articles_pl(
        data_path=data_path,
        config=config,
        clv_periods=config.get("clv_periods", [6]),
        cols_to_aggregate=cols_to_aggregate,
        keep_customer_id=keep_customer_id,
    )

    return train_test_split(
        train_df=full_train_df,
        test_df=full_test_df,
        subset=config.get("subset"),
        train_subsample_percentage=config.get("train_subsample_percentage"),
    )

In [None]:
# Run configuration passed to get_benchmark_dfs and forwarded to the loaders.
# The six date entries are "%Y-%m-%d" strings parsed in split_df_and_group_pl.
# NOTE(review): "min_zip_code_count", "date_aggregation" and
# "group_by_channel_id" are not read by any function visible in this part of
# the notebook - confirm whether they are still needed.
config = {
    "train_begin": "2018-09-20",
    "train_label_begin": "2019-09-20",
    "train_end": "2020-03-17",
    "test_begin": "2019-03-19",
    "test_label_begin": "2020-03-18",
    "test_end": "2020-09-13",
    "min_zip_code_count": 3,
    "date_aggregation": "daily",
    "group_by_channel_id": False,
    "log_clv": False,
    "clv_periods": [6],
    "subset": None,
    "train_subsample_percentage": None,
    "max_length":20, # Number of trailing sequence entries kept per customer (process_dataframe max_length)
}
# Alternative dataset location, currently disabled:
# data_path = Path("/kaggle/input/hm-dataset/data/data")
data_path = Path("/kaggle/input/data/data/")

# Build the three benchmark frames; this reads the data files under data_path.
print(10 * "#", " Loading data ", 10 * "#")
train_df, val_df, test_df = get_benchmark_dfs(data_path, config)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.special  # for hyp2f1
import numpy as np
import pandas as pd

In [None]:
!pip install lifetimes

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.special  # for hyp2f1
import numpy as np
import pandas as pd

# If you are using Polars, import it (optional)
try:
    import polars as pl
except ImportError:
    pl = None


class BGNBDModel(nn.Module):
    """
    BG/NBD model for transaction frequency:
      - r, alpha : Gamma-Poisson mixture for transaction rates
      - a, b     : Beta-Geometric mixture for dropout

    All four parameters are stored as unconstrained logs (``log_r``,
    ``log_alpha``, ``log_a``, ``log_b``) and exponentiated on use, so they
    stay strictly positive during gradient-based optimisation.
    """

    def __init__(self, init_params=None):
        """
        Args:
            init_params: optional dict with positive starting values under the
                keys 'r', 'alpha', 'a' and 'b'. Defaults to 1.0 for each.
        """
        super().__init__()
        if init_params is None:
            init_params = {'r': 1.0, 'alpha': 1.0, 'a': 1.0, 'b': 1.0}
        self.log_r = nn.Parameter(torch.log(torch.tensor(init_params['r'], dtype=torch.float32)))
        self.log_alpha = nn.Parameter(torch.log(torch.tensor(init_params['alpha'], dtype=torch.float32)))
        self.log_a = nn.Parameter(torch.log(torch.tensor(init_params['a'], dtype=torch.float32)))
        self.log_b = nn.Parameter(torch.log(torch.tensor(init_params['b'], dtype=torch.float32)))

    def _positive_params(self):
        """Return (r, alpha, a, b) on their natural, strictly positive scale."""
        return (
            torch.exp(self.log_r),
            torch.exp(self.log_alpha),
            torch.exp(self.log_a),
            torch.exp(self.log_b),
        )

    def forward(self, x, t_x, T):
        """
        Compute the log-likelihood (LL) for each customer under BG/NBD.

        Uses the closed-form individual likelihood of Fader, Hardie & Lee
        (2005):

            L = A1 * A2 * (A3 + [x > 0] * A4)
            A1 = Gamma(r + x) * alpha^r / Gamma(r)
            A2 = Gamma(a + b) * Gamma(b + x) / (Gamma(b) * Gamma(a + b + x))
            A3 = (alpha + T)^-(r + x)
            A4 = a / (b + x - 1) * (alpha + t_x)^-(r + x)

        This form needs no hypergeometric function (PyTorch does not provide
        ``hyp2f1``) and stays finite when ``t_x == T``.

        Args:
            x: tensor of transaction counts.
            t_x: tensor of last-transaction times, same shape as ``x``.
            T: tensor of observation-period lengths, same shape as ``x``.

        Returns:
            Tensor of per-customer log-likelihoods with the shape of ``x``.
        """
        # Ensure inputs are floats
        x = x.float()
        t_x = t_x.float()
        T = T.float()

        r, alpha, a, b = self._positive_params()
        has_repeat = x > 0

        ln_a1 = torch.lgamma(r + x) - torch.lgamma(r) + r * torch.log(alpha)
        ln_a2 = (torch.lgamma(a + b) + torch.lgamma(b + x)
                 - torch.lgamma(b) - torch.lgamma(a + b + x))
        ln_a3 = -(r + x) * torch.log(alpha + T)

        # A4 only exists for x > 0. Evaluate it with x replaced by 1 elsewhere so
        # log(b + x - 1) stays finite and the masked branch cannot leak NaN/inf
        # into the gradients.
        x_safe = torch.where(has_repeat, x, torch.ones_like(x))
        ln_a4 = (torch.log(a)
                 - torch.log(b + x_safe - 1.0)
                 - (r + x) * torch.log(alpha + t_x))

        # log(A3 + A4) computed stably in log space.
        ln_mix = torch.where(has_repeat, torch.logaddexp(ln_a3, ln_a4), ln_a3)
        return ln_a1 + ln_a2 + ln_mix

    def negative_log_likelihood(self, x, t_x, T):
        """Return the negated sum of the per-customer log-likelihoods."""
        ll = self.forward(x, t_x, T)
        return -ll.sum()

    def expected_transactions(self, x, t_x, T, t_future=10.0):
        """
        Approximate E[# of transactions in (T, T+t_future)].

        Uses the simplified approximation
            E[X(t_future)|...] ~= p_alive * ((r + x) / (alpha + T)) * t_future
        with
            p_alive = 1 / (1 + a / (b + x) * ((alpha + T) / (alpha + t_x))^(r + x)).

        NOTE(review): the published BG/NBD P(alive) uses a / (b + x - 1) and is
        exactly 1 for x == 0; this variant is kept as-is because the composite
        model is trained against it - confirm which form is intended.

        Args:
            x, t_x, T: tensors as in ``forward``.
            t_future: length of the prediction horizon, in the same time unit
                as ``T``.

        Returns:
            Tensor of expected future transaction counts, shaped like ``x``.
        """
        x = x.float()
        t_x = t_x.float()
        T = T.float()

        r, alpha, a, b = self._positive_params()

        # lgamma(a + 1) - lgamma(a) = log(a); lgamma(b + x) - lgamma(b + x + 1) = -log(b + x)
        log_factor = (torch.lgamma(a + 1)
                      + torch.lgamma(b + x)
                      - torch.lgamma(a)
                      - torch.lgamma(b + x + 1))
        # Small epsilons guard the log and the division against zero.
        log_factor += (r + x) * (torch.log(alpha + T) - torch.log(alpha + t_x + 1e-8))
        factor = torch.exp(log_factor)
        p_alive = 1.0 / (1.0 + factor)

        return p_alive * ((r + x) / (alpha + T + 1e-8)) * t_future


class GammaGammaModel(nn.Module):
    """
    Gamma-Gamma style model for monetary value.

    Holds three positive parameters p, q and v, stored as unconstrained logs
    (``log_p``, ``log_q``, ``log_v``) and exponentiated whenever they are used.

    NOTE(review): the expressions below differ from the textbook Gamma-Gamma
    likelihood and conditional mean (e.g. as implemented in ``lifetimes``);
    confirm they are the intended parameterisation.
    """

    def __init__(self, init_params=None):
        """
        Args:
            init_params: optional dict of positive starting values keyed by
                'p', 'q' and 'v'. Each defaults to 1.0.
        """
        super().__init__()
        start = {'p': 1.0, 'q': 1.0, 'v': 1.0} if init_params is None else init_params
        # Registration order (p, q, v) fixes the order of model.parameters().
        for key in ('p', 'q', 'v'):
            raw = torch.log(torch.tensor(start[key], dtype=torch.float32))
            setattr(self, f'log_{key}', nn.Parameter(raw))

    def _positive_params(self):
        """Return (p, q, v) on their natural, strictly positive scale."""
        return torch.exp(self.log_p), torch.exp(self.log_q), torch.exp(self.log_v)

    def forward(self, x, m):
        """
        Compute the per-customer log-likelihood.

        Args:
            x: tensor of transaction counts.
            m: tensor of average monetary values, same shape as ``x``.

        Returns:
            Tensor of log-likelihood values shaped like ``x``.
        """
        count = x.float()
        avg_value = m.float()
        p, q, v = self._positive_params()
        tiny = 1e-30  # keeps lgamma/log arguments away from exactly zero
        shape_sum = p + q * count

        ll = torch.lgamma(shape_sum) - torch.lgamma(p) - torch.lgamma(q * count + tiny)
        ll = ll + p * torch.log(v + tiny)
        ll = ll + (q - 1) * count * torch.log(avg_value + tiny)
        ll = ll + -shape_sum * torch.log(v + count * avg_value + tiny)
        return ll

    def negative_log_likelihood(self, x, m):
        """Return the negated sum of the per-customer log-likelihoods."""
        return -self.forward(x, m).sum()

    def conditional_expected_value(self, x, m):
        """
        Compute (p + q*x) / (v + x*m), the quantity this model uses as the
        expected monetary value given count ``x`` and average value ``m``.
        """
        count = x.float()
        avg_value = m.float()
        p, q, v = self._positive_params()
        numerator = p + q * count
        denominator = v + count * avg_value + 1e-8  # epsilon avoids division by zero
        return numerator / denominator


class CompositeCLVModel(nn.Module):
    """
    Two-part CLV predictor:
         Predicted CLV = E[# future transactions] * E[monetary value]

    The transaction count comes from a BGNBDModel and the monetary value from
    a GammaGammaModel; both sub-models are trained jointly through this module.
    """

    def __init__(self, init_bgnbd=None, init_gg=None, t_future=10.0):
        """
        Args:
            init_bgnbd: initial parameter dict forwarded to BGNBDModel.
            init_gg: initial parameter dict forwarded to GammaGammaModel.
            t_future: prediction horizon passed to
                BGNBDModel.expected_transactions.
        """
        super().__init__()
        self.bgnbd = BGNBDModel(init_params=init_bgnbd)
        self.ggamma = GammaGammaModel(init_params=init_gg)
        self.t_future = t_future

    def forward(self, x, t_x, T, m):
        """Return the predicted CLV for every customer in the batch."""
        expected_count = self.bgnbd.expected_transactions(x, t_x, T, self.t_future)
        expected_value = self.ggamma.conditional_expected_value(x, m)
        return expected_count * expected_value

    def loss_mse(self, x, t_x, T, m, actual_spend):
        """Return the mean squared error between predicted CLV and ``actual_spend``."""
        residual = self.forward(x, t_x, T, m) - actual_spend
        return torch.mean(residual ** 2)


def parse_x_t_x_T(row):
    """
    Parse (x, t_x, T) from 'days_before_lst'.

    Args:
        row: mapping-like record (e.g. a pandas row) with a
            'days_before_lst' entry.

    Returns:
        Tuple ``(x, t_x, T)`` where x is the number of entries, t_x the last
        entry and T the sum of all entries. ``(0, 0.0, 0.0)`` is returned when
        the entry is missing, empty or not a sequence.
    """
    days_list = row['days_before_lst']
    # List columns converted from Polars/Arrow arrive as numpy arrays, so a
    # plain `list` check would silently treat every customer as empty.
    if not isinstance(days_list, (list, tuple, np.ndarray)) or len(days_list) == 0:
        return 0, 0.0, 0.0
    x = len(days_list)
    T = float(sum(days_list))
    t_x = float(days_list[-1])
    return x, t_x, T

def parse_avg_monetary_value(row):
    """
    Parse a dummy average monetary value from 'articles_ids_lst'.
    Here we define: m = 20 + 0.1 * (number of articles).

    Args:
        row: mapping-like record (e.g. a pandas row) with an
            'articles_ids_lst' entry.

    Returns:
        The dummy monetary value as a float; 20.0 when the entry is missing,
        empty or not a sequence.
    """
    arts = row['articles_ids_lst']
    # Accept numpy arrays and tuples too: list columns converted from
    # Polars/Arrow are not Python lists.
    if not isinstance(arts, (list, tuple, np.ndarray)) or len(arts) == 0:
        return 20.0
    return 20.0 + 0.1 * len(arts)

def build_tensor_dataset(df):
    """
    Turn a customer DataFrame into five float32 tensors.

    Expects columns:
      - 'days_before_lst'
      - 'articles_ids_lst'
      - 'regression_label'

    Returns:
        Tuple ``(x, t_x, T, m, spend)`` of 1-D float32 tensors with one entry
        per row; ``spend`` falls back to 0.0 when 'regression_label' is absent.
    """
    # Objects without `iterrows` (e.g. Polars frames) are converted to pandas first.
    if not hasattr(df, "iterrows"):
        df = df.to_pandas()

    records = []
    for _, row in df.iterrows():
        count, last_time, horizon = parse_x_t_x_T(row)
        money = parse_avg_monetary_value(row)
        label = row.get('regression_label', 0.0)
        records.append((count, last_time, horizon, money, label))

    # Transpose the per-row records into five per-feature columns.
    columns = [list(col) for col in zip(*records)] if records else [[] for _ in range(5)]
    return tuple(torch.tensor(col, dtype=torch.float32) for col in columns)


def train_and_validate(train_df, val_df, test_df):
    """
    Fit a CompositeCLVModel on ``train_df`` by minimising MSE and report progress.

    Runs 500 full-batch Adam steps (lr=1e-2), prints train/validation MSE every
    100 epochs, then prints the final test MSE and the learned parameters.
    Nothing is returned.
    """
    # Each dataset becomes the tuple (x, t_x, T, m, spend).
    train_inputs = build_tensor_dataset(train_df)
    val_inputs = build_tensor_dataset(val_df)
    test_inputs = build_tensor_dataset(test_df)

    model = CompositeCLVModel(
        init_bgnbd={'r':1.0, 'alpha':1.0, 'a':1.0, 'b':1.0},
        init_gg={'p':1.0, 'q':1.0, 'v':1.0},
        t_future=10.0
    )
    optimizer = optim.Adam(model.parameters(), lr=1e-2)
    n_epochs = 500
    report_every = 100

    for step in range(1, n_epochs + 1):
        optimizer.zero_grad()
        loss = model.loss_mse(*train_inputs)
        loss.backward()
        optimizer.step()
        if step % report_every != 0:
            continue
        # Validation is evaluated without tracking gradients.
        with torch.no_grad():
            val_preds = model.forward(*val_inputs[:4])
            val_loss = torch.mean((val_preds - val_inputs[4])**2)
        print(f"Epoch {step}/{n_epochs} | Train MSE: {loss.item():.4f} | Val MSE: {val_loss.item():.4f}")

    with torch.no_grad():
        test_preds = model.forward(*test_inputs[:4])
        test_mse = torch.mean((test_preds - test_inputs[4])**2).item()
    print(f"\nFinal Test MSE: {test_mse:.4f}")

    # Parameters are stored as logs; exponentiate before printing.
    print("\nLearned BG/NBD parameters:")
    for label, log_param in (
        ("r     =", model.bgnbd.log_r),
        ("alpha =", model.bgnbd.log_alpha),
        ("a     =", model.bgnbd.log_a),
        ("b     =", model.bgnbd.log_b),
    ):
        print(label, torch.exp(log_param).item())

    print("\nLearned Gamma–Gamma parameters:")
    for label, log_param in (
        ("p =", model.ggamma.log_p),
        ("q =", model.ggamma.log_q),
        ("v =", model.ggamma.log_v),
    ):
        print(label, torch.exp(log_param).item())


# Train the composite model on the frames loaded earlier and print its metrics.
train_and_validate(train_df, val_df, test_df)


In [None]:
import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings("ignore")

# Import official BG–NBD and Gamma–Gamma from lifetimes:
from lifetimes import BetaGeoFitter, GammaGammaFitter

# Import Pareto–NBD and MBG/NBD fitters from lifetimes (assumed to be available in your version)
from lifetimes import ParetoNBDFitter, ModifiedBetaGeoFitter

# --- Utility: Convert to Pandas if needed ---
def to_pandas(df):
    """Return ``df`` unchanged if it exposes ``iterrows``; otherwise return ``df.to_pandas()``."""
    return df if hasattr(df, "iterrows") else df.to_pandas()

# --- Feature Extraction ---
def extract_history_features(df):
    """
    For each customer, extract:
      - frequency: number of repeat purchases = (# transactions - 1)
      - recency: last entry minus first entry of 'days_before_lst'
      - T: calibration period (here defined as recency; adjust as needed)
      - monetary_value: estimate = 20 + 0.1 * (number of articles in articles_ids_lst)
      - regression_label: true future spend

    Args:
        df: pandas DataFrame, or any object offering ``to_pandas()``, with the
            columns 'days_before_lst', 'articles_ids_lst' and
            'regression_label'.

    Returns:
        pandas DataFrame with the five columns listed above. Rows whose list
        entry is missing, empty or not a sequence get frequency 0,
        recency/T 0.0 and monetary_value 20.0.
    """
    if not hasattr(df, "iterrows"):
        df = df.to_pandas()

    def _as_sequence(value):
        # List columns converted from Polars/Arrow arrive as numpy arrays, so
        # checking for `list` alone would discard every customer's history.
        return value if isinstance(value, (list, tuple, np.ndarray)) else ()

    frequencies, recencies, monetary_values = [], [], []
    for days, arts in zip(df['days_before_lst'], df['articles_ids_lst']):
        days = _as_sequence(days)
        arts = _as_sequence(arts)
        if len(days) == 0:
            frequencies.append(0)
            recencies.append(0.0)
        else:
            frequencies.append(len(days) - 1)
            recencies.append(days[-1] - days[0])
        monetary_values.append(20.0 + 0.1 * len(arts))

    df_out = pd.DataFrame({
        'frequency': frequencies,
        'recency': recencies,
        'T': list(recencies),  # T is defined as recency here
        'monetary_value': monetary_values,
        'regression_label': df['regression_label']
    })
    return df_out

# --- Predicted Future Transactions and Monetary Value ---
def predict_future_transactions(fitter, frequency, recency, T, t_future):
    """
    Compute the approximation
      E[X(t_future)|frequency, recency, T] = conditional_probability_alive *
          ((r + frequency) / (alpha + T)) * t_future

    Args:
        fitter: fitted transaction model exposing
            ``conditional_probability_alive(frequency, recency, T)`` and the
            parameters 'r' and 'alpha', either through a ``params_`` mapping
            (as lifetimes fitters do) or as ``r_`` / ``alpha_`` attributes.
        frequency, recency, T: customer summary statistics.
        t_future: length of the prediction horizon.

    Returns:
        The approximate expected number of transactions over ``t_future``.
    """
    # lifetimes stores fitted parameters in `params_`; there are no `r_` /
    # `alpha_` attributes, so those are only used as a fallback.
    params = getattr(fitter, "params_", None)
    if params is not None:
        r, alpha = params["r"], params["alpha"]
    else:
        r, alpha = fitter.r_, fitter.alpha_
    p_alive = fitter.conditional_probability_alive(frequency, recency, T)
    return p_alive * ((r + frequency) / (alpha + T)) * t_future

def predict_monetary_value(ggf, frequency, monetary_value):
    """
    Compute:
       E[m|frequency, monetary_value] = (p + q*frequency) / (v + frequency * monetary_value)

    Args:
        ggf: fitted monetary model exposing 'p', 'q' and 'v', either through a
            ``params_`` mapping (as lifetimes' GammaGammaFitter does) or as
            ``p_`` / ``q_`` / ``v_`` attributes.
        frequency: number of repeat purchases.
        monetary_value: observed average transaction value.

    NOTE(review): this ratio is not the Gamma-Gamma conditional mean that
    ``GammaGammaFitter.conditional_expected_average_profit`` returns; confirm
    the formula is intended.
    """
    # lifetimes keeps fitted parameters in `params_`; attribute access is only
    # a fallback for objects that expose them directly.
    params = getattr(ggf, "params_", None)
    if params is not None:
        p, q, v = params["p"], params["q"], params["v"]
    else:
        p, q, v = ggf.p_, ggf.q_, ggf.v_
    # The epsilon guards against a zero denominator.
    return (p + q * frequency) / (v + frequency * monetary_value + 1e-8)

def compute_predicted_clv(fitter, ggf, features_df, t_future):
    """
    Score every row of ``features_df`` with
      predicted_CLV = E[X(t_future)] * E[m]

    Args:
        fitter: fitted transaction model passed to predict_future_transactions.
        ggf: fitted monetary model passed to predict_monetary_value.
        features_df: DataFrame with 'frequency', 'recency', 'T' and
            'monetary_value' columns.
        t_future: prediction horizon.

    Returns:
        A copy of ``features_df`` with an added 'predicted_clv' column; the
        input frame is left untouched.
    """
    def _row_clv(row):
        expected_count = predict_future_transactions(
            fitter, row['frequency'], row['recency'], row['T'], t_future
        )
        expected_value = predict_monetary_value(ggf, row['frequency'], row['monetary_value'])
        return expected_count * expected_value

    scores = [_row_clv(row) for _, row in features_df.iterrows()]
    scored = features_df.copy()
    scored['predicted_clv'] = scores
    return scored

# --- Main Pipeline Function ---
def main_pipeline(train_df, val_df, test_df, t_future=12):
    """
    Calibrate the different transaction models (BG–NBD, Pareto–NBD, MBG/NBD) on train_df,
    calibrate a Gamma–Gamma model on train_df (for customers with frequency > 0),
    then compute predicted CLV for each dataset.

    Args:
        train_df, val_df, test_df: customer frames accepted by
            extract_history_features.
        t_future: prediction horizon forwarded to compute_predicted_clv.

    Returns:
        Dict keyed by 'BG-NBD', 'Pareto-NBD' and 'MBG-NBD'; each value is a
        dict with 'train', 'val' and 'test' DataFrames carrying a
        'predicted_clv' column.
    """
    # Convert to Pandas and extract features
    splits = {
        'train': extract_history_features(train_df),
        'val': extract_history_features(val_df),
        'test': extract_history_features(test_df),
    }
    train_features = splits['train']

    def _fit_transactions(fitter):
        # Every transaction model is calibrated on the same training summary.
        fitter.fit(train_features['frequency'], train_features['recency'], train_features['T'])
        return fitter

    def _predict_all(fitter, ggf):
        # Score train / val / test with one (transaction, monetary) model pair.
        return {
            name: compute_predicted_clv(fitter, ggf, features, t_future)
            for name, features in splits.items()
        }

    results = {}

    # --- BG–NBD + Gamma–Gamma ---
    bgf = _fit_transactions(BetaGeoFitter(penalizer_coef=0.0))
    # Gamma–Gamma is only fitted on customers with at least one repeat purchase.
    mask = train_features['frequency'] > 0
    ggf_bg = GammaGammaFitter(penalizer_coef=0.0)
    ggf_bg.fit(train_features.loc[mask, 'monetary_value'], train_features.loc[mask, 'frequency'])
    results['BG-NBD'] = _predict_all(bgf, ggf_bg)

    # --- Pareto–NBD + Gamma–Gamma ---
    # Use same GammaGammaFitter as above (or fit separately if desired)
    pareto_fitter = _fit_transactions(ParetoNBDFitter(penalizer_coef=0.0))
    results['Pareto-NBD'] = _predict_all(pareto_fitter, ggf_bg)

    # --- MBG/NBD + Gamma–Gamma ---
    # The MBG/NBD class imported from lifetimes is ModifiedBetaGeoFitter.
    mbg_fitter = _fit_transactions(ModifiedBetaGeoFitter(penalizer_coef=0.0))
    results['MBG-NBD'] = _predict_all(mbg_fitter, ggf_bg)

    return results

# --- Example Usage ---

    # Run the pipeline
# Fit all three transaction models and score every split with a horizon of 12.
results = main_pipeline(train_df, val_df, test_df, t_future=12)
    
# Show the test-set predictions of each model variant.
print("BG–NBD + Gamma–Gamma Predictions (Test):")
print(results['BG-NBD']['test'])
print("\nPareto–NBD + Gamma–Gamma Predictions (Test):")
print(results['Pareto-NBD']['test'])
print("\nMBG–NBD + Gamma–Gamma Predictions (Test):")
print(results['MBG-NBD']['test'])


In [None]:
import numpy as np
import pandas as pd
import datetime
import warnings
warnings.filterwarnings("ignore")

# Import lifetimes models
from lifetimes import BetaGeoFitter, GammaGammaFitter

# If using Polars, convert to Pandas
def to_pandas(df):
    """Pass pandas-like frames through; convert anything lacking ``iterrows`` via ``to_pandas()``."""
    if hasattr(df, "iterrows"):
        return df
    return df.to_pandas()

# --- Feature Extraction ---
def extract_history_features(df):
    """
    For each customer, compute:
      - frequency: number of repeat purchases (total transactions - 1)
      - recency: last entry minus first entry of 'days_before_lst'
      - T: observation period (here, set equal to recency)
      - monetary_value: estimated as 20 + 0.1 * (number of articles)
      - regression_label: copied from the input frame

    Args:
        df: pandas DataFrame, or any object offering ``to_pandas()``, with the
            columns 'days_before_lst', 'articles_ids_lst' and
            'regression_label'.

    Returns:
        pandas DataFrame with the five columns above. Missing, empty or
        non-sequence list entries yield frequency 0, recency/T 0.0 and
        monetary_value 20.0.
    """
    if not hasattr(df, "iterrows"):
        df = df.to_pandas()

    # Frames converted from Polars/Arrow hold numpy arrays rather than Python
    # lists, so all common sequence types are accepted.
    sequence_types = (list, tuple, np.ndarray)

    frequencies = []
    recencies = []
    monetary_values = []
    for days, arts in zip(df['days_before_lst'], df['articles_ids_lst']):
        if isinstance(days, sequence_types) and len(days) > 0:
            frequencies.append(len(days) - 1)
            recencies.append(days[-1] - days[0])
        else:
            frequencies.append(0)
            recencies.append(0.0)

        n_articles = len(arts) if isinstance(arts, sequence_types) else 0
        monetary_values.append(20.0 + 0.1 * n_articles)

    df_new = pd.DataFrame({
        'frequency': frequencies,
        'recency': recencies,
        'T': list(recencies),  # In practice, T can be defined differently
        'monetary_value': monetary_values,
        'regression_label': df['regression_label']
    })
    return df_new

# --- CLV Calculation Without customer_lifetime_value ---
def compute_predicted_clv(bgf, ggf, frequency, recency, T, monetary_value, t_future):
    """
    Computes predicted CLV as:
      predicted_CLV = E[# future txns] * E[monetary value],
    where:
      p_alive = bgf.conditional_probability_alive(frequency, recency, T)
      E[# future txns] = p_alive * ((r + frequency) / (alpha + T)) * t_future
      E[m] = (p + q * frequency) / (v + frequency * monetary_value)

    Args:
        bgf: fitted transaction model exposing ``conditional_probability_alive``
            and the parameters 'r' and 'alpha'.
        ggf: fitted monetary model exposing the parameters 'p', 'q' and 'v'.
        frequency, recency, T, monetary_value: customer summary statistics.
        t_future: length of the prediction horizon.

    Parameters are read from each fitter's ``params_`` mapping (where lifetimes
    stores them) and fall back to ``<name>_`` attributes otherwise.

    NOTE(review): E[m] above is not the Gamma-Gamma conditional mean returned
    by ``GammaGammaFitter.conditional_expected_average_profit``; confirm the
    formula is intended.
    """
    def _param(fitter, name):
        # lifetimes fitters have no `r_`-style attributes; look in `params_` first.
        params = getattr(fitter, "params_", None)
        return params[name] if params is not None else getattr(fitter, f"{name}_")

    # Compute probability alive
    p_alive = bgf.conditional_probability_alive(frequency, recency, T)
    # Expected transactions in future period
    expected_txns = p_alive * ((_param(bgf, "r") + frequency) / (_param(bgf, "alpha") + T)) * t_future
    # Expected monetary value (epsilon guards against a zero denominator)
    expected_m = (_param(ggf, "p") + _param(ggf, "q") * frequency) / (
        _param(ggf, "v") + frequency * monetary_value + 1e-8
    )
    return expected_txns * expected_m

def compute_clv_custom(train_df, val_df, test_df, t_future=12):
    """
    Fit BG/NBD and Gamma–Gamma on the training split and score all three splits.

    Args:
        train_df, val_df, test_df: customer frames (converted to pandas when
            necessary).
        t_future: prediction horizon forwarded to compute_predicted_clv.

    Returns:
        ``(train_hist, val_hist, test_hist, bgf, ggf)``: the three feature
        frames, each with an added 'predicted_clv' column, plus the two fitted
        models.
    """
    # Convert each split to pandas if needed, then derive its history features.
    histories = [
        extract_history_features(to_pandas(frame))
        for frame in (train_df, val_df, test_df)
    ]
    train_hist, val_hist, test_hist = histories

    # Fit BG/NBD on training data
    bgf = BetaGeoFitter(penalizer_coef=0.0)
    bgf.fit(train_hist['frequency'], train_hist['recency'], train_hist['T'])

    # Fit Gamma–Gamma on customers with frequency > 0
    repeaters = train_hist['frequency'] > 0
    ggf = GammaGammaFitter(penalizer_coef=0.0)
    ggf.fit(train_hist.loc[repeaters, 'monetary_value'], train_hist.loc[repeaters, 'frequency'])

    def _row_clv(row):
        return compute_predicted_clv(bgf, ggf,
                                     row['frequency'], row['recency'], row['T'],
                                     row['monetary_value'], t_future)

    # Score train, validation and test in that order, row by row.
    for hist in histories:
        hist['predicted_clv'] = hist.apply(_row_clv, axis=1)

    return train_hist, val_hist, test_hist, bgf, ggf

print("=== CLV Prediction Without customer_lifetime_value ===")

# Fit on the training split and score all three splits with a horizon of 12.
train_hist, val_hist, test_hist, bgf, ggf = compute_clv_custom(train_df, val_df, test_df, t_future=12)
    
# Show the feature frames together with their 'predicted_clv' column.
print("\n--- Train Data ---")
print(train_hist)
print("\n--- Validation Data ---")
print(val_hist)
print("\n--- Test Data ---")
print(test_hist)


In [None]:
import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings("ignore")

# Import the official BG–NBD and Gamma–Gamma models from lifetimes
from lifetimes import BetaGeoFitter, GammaGammaFitter

from lifetimes import ParetoNBDFitter, ModifiedBetaGeoFitter


def to_pandas(df):
    """Return a pandas view of ``df``: objects lacking ``iterrows`` are converted with ``to_pandas()``."""
    needs_conversion = not hasattr(df, "iterrows")
    return df.to_pandas() if needs_conversion else df

def impute_and_extract_features(df):
    """
    Impute missing values and extract key features for CLV modeling.
    Expected columns:
      - days_before_lst: sequence of transaction times (in days); missing
        values are treated as an empty list.
      - articles_ids_lst: sequence of article IDs; missing values are treated
        as an empty list.
      - regression_label: the true future spend (CLV); NaN is imputed as 0.

    Then, compute:
      - frequency: # transactions - 1 (if no transaction, 0)
      - recency: difference between last and first transaction (in days)
      - T: calibration period (here set equal to recency; adapt if needed)
      - monetary_value: estimate = 20 + 0.1 * (number of articles)

    Args:
        df: pandas DataFrame, or any object offering ``to_pandas()``.

    Returns:
        pandas DataFrame with the columns 'frequency', 'recency', 'T',
        'monetary_value' and 'regression_label'. The input is not modified.
    """
    if not hasattr(df, "iterrows"):
        df = df.to_pandas()

    def _as_list(value):
        # Lists, tuples and numpy arrays (what Polars/Arrow list columns turn
        # into) are kept; None, NaN and any other scalar become an empty list.
        # Testing the type first avoids the ambiguous truth value raised by
        # pd.isnull(...) / `not ...` on multi-element arrays.
        if isinstance(value, (list, tuple, np.ndarray)):
            return list(value)
        return []

    # Impute missing regression_label with 0
    labels = df['regression_label'].fillna(0)

    frequencies, recencies, monetary_values = [], [], []
    for raw_days, raw_arts in zip(df['days_before_lst'], df['articles_ids_lst']):
        days = _as_list(raw_days)
        arts = _as_list(raw_arts)
        if days:
            frequencies.append(len(days) - 1)
            recencies.append(days[-1] - days[0])
        else:
            frequencies.append(0)
            recencies.append(0.0)
        monetary_values.append(20.0 + 0.1 * len(arts))

    features = pd.DataFrame({
        'frequency': frequencies,
        'recency': recencies,
        'T': list(recencies),  # T is set equal to recency
        'monetary_value': monetary_values,
        'regression_label': labels
    })
    return features


def predict_future_transactions(fitter, frequency, recency, T, t_future):
    """
    Compute the expected number of future transactions over a period t_future:
      E[X(t_future)] = p_alive * ((r + frequency) / (alpha + T)) * t_future
    Here, p_alive is obtained via fitter.conditional_probability_alive.

    Args:
        fitter: fitted transaction model. Its 'r' and 'alpha' parameters are
            read from the ``params_`` mapping (where lifetimes fitters keep
            them), falling back to ``r_`` / ``alpha_`` attributes.
        frequency, recency, T: customer summary statistics.
        t_future: length of the prediction horizon.

    Returns:
        The approximate expected number of transactions over ``t_future``.
    """
    fitted = getattr(fitter, "params_", None)
    if fitted is None:
        # Fallback for objects exposing the parameters as plain attributes.
        r, alpha = fitter.r_, fitter.alpha_
    else:
        r, alpha = fitted["r"], fitted["alpha"]
    p_alive = fitter.conditional_probability_alive(frequency, recency, T)
    purchase_rate = (r + frequency) / (alpha + T)
    return p_alive * purchase_rate * t_future

def predict_monetary_value(ggf, frequency, monetary_value):
    """
    Compute the expected average monetary value:
      E[m] = (p + q*frequency) / (v + frequency * monetary_value)

    Args:
        ggf: fitted monetary model. 'p', 'q' and 'v' are read from its
            ``params_`` mapping (where lifetimes' GammaGammaFitter keeps them),
            falling back to ``p_`` / ``q_`` / ``v_`` attributes.
        frequency: number of repeat purchases.
        monetary_value: observed average transaction value.

    NOTE(review): this ratio differs from the Gamma-Gamma conditional mean
    returned by ``GammaGammaFitter.conditional_expected_average_profit``;
    confirm the formula is intended.
    """
    fitted = getattr(ggf, "params_", None)
    if fitted is None:
        # Fallback for objects exposing the parameters as plain attributes.
        p, q, v = ggf.p_, ggf.q_, ggf.v_
    else:
        p, q, v = fitted["p"], fitted["q"], fitted["v"]
    # The epsilon guards against a zero denominator.
    return (p + q * frequency) / (v + frequency * monetary_value + 1e-8)

def compute_predicted_clv(fitter, ggf, features_df, t_future):
    """
    Attach a 'predicted_clv' column to a copy of ``features_df``, where for
    each record
       predicted_CLV = E[X(t_future)] * E[m]

    ``features_df`` must provide 'frequency', 'recency', 'T' and
    'monetary_value'; the original frame is not modified.
    """
    clv_per_row = []
    for _, record in features_df.iterrows():
        n_future = predict_future_transactions(
            fitter, record['frequency'], record['recency'], record['T'], t_future
        )
        avg_value = predict_monetary_value(ggf, record['frequency'], record['monetary_value'])
        clv_per_row.append(n_future * avg_value)
    return features_df.copy().assign(predicted_clv=clv_per_row)


def main_pipeline(train_df, val_df, test_df, t_future=12, discount_rate=0.01):
    """
    For each model variant (BG–NBD, Pareto–NBD, MBG/NBD) calibrate on train_df,
    then compute predicted CLV on train, validation, and test sets.
    Only the columns read by impute_and_extract_features are used; any other
    columns are ignored.

    Args:
        train_df, val_df, test_df: customer frames accepted by
            impute_and_extract_features.
        t_future: prediction horizon forwarded to compute_predicted_clv.
        discount_rate: accepted for interface compatibility; not used by the
            current computation.

    Returns:
        Dict keyed by 'BG-NBD', 'Pareto-NBD' and 'MBG-NBD'; each value maps
        'train', 'val' and 'test' to a DataFrame with a 'predicted_clv' column.
    """
    # Extract features (with imputation)
    feature_sets = {
        'train': impute_and_extract_features(train_df),
        'val': impute_and_extract_features(val_df),
        'test': impute_and_extract_features(test_df),
    }
    train_features = feature_sets['train']

    def _calibrate(fitter):
        # Every transaction model is calibrated on the same training summary.
        fitter.fit(train_features['frequency'], train_features['recency'], train_features['T'])
        return fitter

    def _score_all(fitter, ggf):
        # Score train / val / test with one (transaction, monetary) model pair.
        return {
            split: compute_predicted_clv(fitter, ggf, features, t_future)
            for split, features in feature_sets.items()
        }

    results = {}

    # 1. BG–NBD + Gamma–Gamma using lifetimes' BetaGeoFitter and GammaGammaFitter
    bgf = _calibrate(BetaGeoFitter(penalizer_coef=0.0))
    # Fit Gamma–Gamma on customers with frequency > 0
    mask = train_features['frequency'] > 0
    ggf_bg = GammaGammaFitter(penalizer_coef=0.0)
    ggf_bg.fit(train_features.loc[mask, 'monetary_value'], train_features.loc[mask, 'frequency'])
    results['BG-NBD'] = _score_all(bgf, ggf_bg)

    # 2. Pareto–NBD + Gamma–Gamma using lifetimes' ParetoNBDFitter and the shared GammaGammaFitter
    pareto_fitter = _calibrate(ParetoNBDFitter(penalizer_coef=0.0))
    results['Pareto-NBD'] = _score_all(pareto_fitter, ggf_bg)

    # 3. MBG/NBD + Gamma–Gamma; the MBG/NBD class imported from lifetimes is
    #    ModifiedBetaGeoFitter.
    mbg_fitter = _calibrate(ModifiedBetaGeoFitter(penalizer_coef=0.0))
    results['MBG-NBD'] = _score_all(mbg_fitter, ggf_bg)

    return results



# Fit all three model variants and score every split with a horizon of 12
# (discount_rate keeps its default).
results = main_pipeline(train_df, val_df, test_df, t_future=12)
    
# Show the test-set predictions of each model variant.
print("=== BG–NBD + Gamma–Gamma Predictions (Test) ===")
print(results['BG-NBD']['test'])
print("\n=== Pareto–NBD + Gamma–Gamma Predictions (Test) ===")
print(results['Pareto-NBD']['test'])
print("\n=== MBG–NBD + Gamma–Gamma Predictions (Test) ===")
print(results['MBG-NBD']['test'])
