# MCC Spend Ratio Feature Pipeline (Spark)

Computes, for each transaction, the ratio of its amount to the average
transaction amount in the same Merchant Category (MCC) over the trailing 90
calendar days (the transaction's own day included). Stores the result in the
`mcc_spend_ratio` feature group (offline only).

**Input feature groups** (from `inv datamart`):
- `credit_card_transactions`: t_id, cc_num, merchant_id, amount, ts, ...
- `merchant_details`: merchant_id, category, ...

**Output feature group**:
- `mcc_spend_ratio`: cc_num (PK), merchant_id, ts (event_time), mcc_spend_ratio_90d

In [None]:
import sys
from pathlib import Path

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import broadcast

import hopsworks
import hsfs

# Turn on Spark's adaptive query execution for this session.
# NOTE(review): `spark` is not created anywhere in this notebook (the
# SparkSession builder cell further down is commented out) — presumably the
# kernel injects it; confirm before running elsewhere.
spark.conf.set("spark.sql.adaptive.enabled", "true")

In [None]:
!pip install pydantic_settings

In [None]:
# # Setup paths and config
# root_dir = Path().absolute()
# # Strip /notebooks or /ccfraud from path if notebook started in one of these subdirectories
# if root_dir.parts[-1:] == ('notebooks',):
#     root_dir = Path(*root_dir.parts[:-1])
# if root_dir.parts[-1:] == ('ccfraud',):
#     root_dir = Path(*root_dir.parts[:-1])
# root_dir = str(root_dir)

# sys.path.insert(0, root_dir)
# sys.path.insert(0, str(Path(root_dir) / 'ccfraud'))

# print(f'Root dir: {root_dir}')

# # from mlfs import config
# # settings = config.HopsworksSettings(_env_file=f'{root_dir}/.env')

In [None]:
# # Initialize Spark
# spark = SparkSession.builder \
#     .appName('MCC Spend Ratio Feature Pipeline') \
#     .getOrCreate()

In [None]:
# Connect to Hopsworks.
# `project` and `fs` are reused by the cells below to look up the source
# feature groups and to create the output feature group.
project = hopsworks.login()
fs = project.get_feature_store()

In [None]:
# Look up both source feature groups (version 1) and then load each one as a
# Spark DataFrame. The handles are resolved first, the reads happen afterwards.
trans_fg, merchant_fg = (
    fs.get_feature_group(fg_name, version=1)
    for fg_name in ('credit_card_transactions', 'merchant_details')
)

trans_df, merchant_df = (
    source_fg.read(dataframe_type='spark')
    for source_fg in (trans_fg, merchant_fg)
)

# Uncomment for a quick size check (each count triggers a full scan):
# print(f'Transactions: {trans_df.count()} rows')
# print(f'Merchants: {merchant_df.count()} rows')

In [None]:
# Join transactions with merchants to get category (MCC).
#
# Only `merchant_id` and `category` are taken from merchant_details: nothing
# else from that table is used downstream, and pruning it before the join
# keeps the broadcast payload minimal and prevents any other merchant column
# from colliding with a same-named transaction column.
# The merchant side is broadcast to avoid shuffling the transactions
# (assumes merchant_details is small enough to fit on each executor — confirm).
# `day` is the calendar date of `ts`; it is the grain of the rolling average
# computed in the next cell.
trans_with_cat = (
    trans_df
    .join(
        broadcast(merchant_df.select("merchant_id", "category")),
        on="merchant_id",
        how="inner",
    )
    .withColumn("day", F.to_date("ts"))
)

trans_with_cat.printSchema()

In [None]:
# Collapse duplicate (cc_num, ts) rows to a single row each.
# The surviving row is the maximum of a struct whose first field is t_id, so
# the row with the greatest t_id wins (later fields only break ties).
_carried_cols = ["t_id", "merchant_id", "category", "day", "amount"]

_winner_per_key = (
    trans_with_cat
    .groupBy("cc_num", "ts")
    .agg(F.max(F.struct(*_carried_cols)).alias("r"))
)

# Unpack the winning struct back into flat columns under their original names.
deduped = _winner_per_key.select(
    "cc_num",
    "ts",
    *[F.col(f"r.{col_name}").alias(col_name) for col_name in _carried_cols],
)

# Daily aggregates per category (MUCH smaller than per-transaction rolling).
#
# `day_tx_count` counts only rows with a non-null `amount`. F.sum skips nulls,
# so counting every row would put transactions into the denominator that never
# contributed to the numerator and understate the average spend. Counting the
# same column keeps sum / count equal to the mean of the known amounts.
daily = (
    deduped
    .groupBy("category", "day")
    .agg(
        F.sum("amount").alias("day_amount_sum"),
        F.count("amount").alias("day_tx_count"),
    )
)

# 90-day rolling window over days, per category.
#
# The frame is ordered by an integer day index (days since 1970-01-01) rather
# than by epoch seconds. Casting a date to a timestamp yields midnight in the
# session time zone, and across a DST change consecutive midnights are 23 h or
# 25 h apart, so a fixed 89 * 86400 second look-back can drop the oldest day.
# Counting whole days is independent of the session time zone.
w90 = (
    Window.partitionBy("category")
    .orderBy(F.datediff(F.col("day"), F.to_date(F.lit("1970-01-01"))))  # monotonic day index
    .rangeBetween(-89, 0)  # last 90 days inclusive of current day
)

# Rolling 90-day totals per (category, day), evaluated over the w90 frame.
_amount_90d = F.sum("day_amount_sum").over(w90)
_tx_count_90d = F.sum("day_tx_count").over(w90)

# Average spend = rolling amount / rolling transaction count; left as null
# when the rolling count is zero so no division by zero is attempted.
_avg_amount_90d = (
    F.when(_tx_count_90d == 0, F.lit(None))
    .otherwise(_amount_90d / _tx_count_90d)
)

daily_roll = daily.select(
    "category",
    "day",
    _avg_amount_90d.alias("avg_amount_mcc_90d"),
)

# Attach the per-(category, day) rolling average to every transaction.
# Left join: a transaction without a matching daily row is kept, with a null average.
_avg_join_keys = ["category", "day"]
with_avg = deduped.join(daily_roll, _avg_join_keys, "left")

# Final feature frame: amount divided by the category's 90-day average.
_mcc_avg = F.col("avg_amount_mcc_90d")

# The ratio is only defined for a known, non-zero average; with no
# .otherwise(), every other row (null or zero average) gets null.
_avg_is_usable = _mcc_avg.isNotNull() & (_mcc_avg != 0)

result_df = with_avg.select(
    "cc_num",
    "merchant_id",
    # Cast ts from timestamp_ntz to timestamp (with timezone) for Hopsworks
    # event_time compatibility.
    F.col("ts").cast("timestamp").alias("ts"),
    F.when(_avg_is_usable, F.col("amount") / _mcc_avg).alias("mcc_spend_ratio_90d"),
)

In [None]:
# # Compute 90-day rolling average spend per MCC
# days_90_seconds = 90 * 24 * 60 * 60
# window_spec = (
#     Window.partitionBy('category')
#     .orderBy(F.unix_timestamp('ts'))
#     .rangeBetween(-days_90_seconds, 0)
# )

# trans_with_avg = trans_with_cat.withColumn(
#     'avg_amount_mcc_90d', F.avg('amount').over(window_spec)
# )

In [None]:
# # Compute ratio (handle null/zero avg)
# trans_with_ratio = trans_with_avg.withColumn(
#     'mcc_spend_ratio_90d',
#     F.when(
#         (F.col('avg_amount_mcc_90d').isNull()) | (F.col('avg_amount_mcc_90d') == 0),
#         None
#     ).otherwise(
#         F.col('amount') / F.col('avg_amount_mcc_90d')
#     )
# )

In [None]:
# # Deduplicate on (cc_num, ts), keeping last
# dedup_window = Window.partitionBy('cc_num', 'ts').orderBy(F.col('t_id').desc())
# trans_deduped = (
#     trans_with_ratio
#     .withColumn('_row_num', F.row_number().over(dedup_window))
#     .filter(F.col('_row_num') == 1)
#     .drop('_row_num')
# )

# # Select final columns
# result_df = trans_deduped.select('cc_num', 'merchant_id', 'ts', 'mcc_spend_ratio_90d')
# print(f'Result rows: {result_df.count()}')

In [None]:
# Definition of the output feature group: keyed on cc_num, with ts as the
# event time, offline store only.
_mcc_fg_spec = {
    "name": 'mcc_spend_ratio',
    "version": 1,
    "description": 'Ratio of transaction amount to the average spend at the same Merchant Category (MCC) over the last 90 days',
    "primary_key": ['cc_num'],
    "event_time": 'ts',
    "online_enabled": False,
}

# Fetch the feature group if it already exists, otherwise create it.
mcc_fg = fs.get_or_create_feature_group(**_mcc_fg_spec)

# Write the computed ratios; wait=True is passed so the call waits on the insert.
mcc_fg.insert(result_df, wait=True)
print('MCC spend ratio feature pipeline completed successfully!')

In [None]:
# spark.stop()