# Graph Feature Generator
---
**Purpose:** Add graph-based AML features to the transaction table (with additional features).

**Input:** `transaction_additional_feature.parquet` (or CSV fallback) from `outputs/`.

**Output:** `transaction_with_graph_features.parquet` in `outputs/`.

**Features:**
- **Sender-side:** in/out degree, total inflow/outflow, unique counterparties, repeat counterparty ratio (30d).
- **Receiver-side:** in/out degree, unique senders (30d) — for internal transfers.
- **High-signal:** pass-through ratios (24h/7d), outflow-to-inflow ratio, avg time gap in→out, accounts/devices per device/account, device shared high-risk ratio, shared device fraud count.

Config is in `config/base_config.ipynb` under `GRAPH_FEATURE_CONFIG`.

In [1]:
# ── Imports & config ─────────────────────────────────────────────────────────
import warnings
from pathlib import Path

import numpy as np
import pandas as pd

warnings.filterwarnings("ignore")

# Config: set GRAPH_FEATURE_CONFIG in config/base_config.ipynb (section 11b) or use inline below.
try:
    from config.base_config import GRAPH_FEATURE_CONFIG
except Exception:
    GRAPH_FEATURE_CONFIG = {
        "input_path"       : "transaction_additional_feature.parquet",
        "output_path"      : "transaction_with_graph_features.parquet",
        "input_dir"        : "outputs",
        "rolling_days"     : 30,
        "rolling_days_7d"  : 7,
        "rolling_hours_24h": 24,
        "features_enabled" : [
            "sender_in_degree_30d", "sender_out_degree_30d", "sender_total_inflow_30d",
            "sender_total_outflow_30d", "sender_unique_counterparties_30d", "sender_repeat_counterparty_ratio",
            "receiver_in_degree_30d", "receiver_out_degree_30d", "receiver_unique_senders_30d",
            "pass_through_ratio_24h", "pass_through_ratio_7d", "outflow_to_inflow_ratio_7d",
            "avg_time_gap_in_out", "accounts_per_device", "devices_per_account",
            "device_shared_high_risk_ratio", "shared_device_fraud_count",
        ],
    }

INPUT_DIR   = Path(GRAPH_FEATURE_CONFIG["input_dir"])
INPUT_PATH  = INPUT_DIR / GRAPH_FEATURE_CONFIG["input_path"]
OUTPUT_PATH = INPUT_DIR / GRAPH_FEATURE_CONFIG["output_path"]
ROLLING_30D = f"{GRAPH_FEATURE_CONFIG['rolling_days']}d"
ROLLING_7D  = f"{GRAPH_FEATURE_CONFIG['rolling_days_7d']}d"
ROLLING_24H = f"{GRAPH_FEATURE_CONFIG['rolling_hours_24h']}h"
FEATURES    = GRAPH_FEATURE_CONFIG["features_enabled"]

def load_transactions() -> pd.DataFrame:
    if INPUT_PATH.suffix == ".parquet" and INPUT_PATH.exists():
        return pd.read_parquet(INPUT_PATH)
    csv_path = INPUT_PATH.with_suffix(".csv")
    if csv_path.exists():
        df = pd.read_csv(csv_path)
        if "timestamp" in df.columns:
            df["timestamp"] = pd.to_datetime(df["timestamp"])
        return df
    alt = INPUT_DIR / "transactions_additional_features.csv"
    if alt.exists():
        df = pd.read_csv(alt)
        if "timestamp" in df.columns:
            df["timestamp"] = pd.to_datetime(df["timestamp"])
        return df
    fallback = INPUT_DIR / "transactions.csv"
    if fallback.exists():
        df = pd.read_csv(fallback)
        if "timestamp" in df.columns:
            df["timestamp"] = pd.to_datetime(df["timestamp"])
        return df
    raise FileNotFoundError(f"No input found. Tried: {INPUT_PATH}, {csv_path}, {alt}, {fallback}")

print("Config loaded.")
print(f"  Input:  {INPUT_PATH}")
print(f"  Output: {OUTPUT_PATH}")
print(f"  Rolling: 30d / 7d / 24h")

Config loaded.
  Input:  outputs/transaction_additional_feature.parquet
  Output: outputs/transaction_with_graph_features.parquet
  Rolling: 30d / 7d / 24h


In [2]:
# ── Load data ─────────────────────────────────────────────────────────────────
df = load_transactions()
df["timestamp"] = pd.to_datetime(df["timestamp"])
df = df.sort_values("timestamp").reset_index(drop=True)

required = ["transaction_id", "sender_account_id", "receiver_account_id", "beneficiary_id", "device_id", "timestamp", "amount"]
missing = [c for c in required if c not in df.columns]
if missing:
    raise ValueError(f"Missing columns: {missing}")

if "label" not in df.columns:
    df["label"] = 0
if "high_risk_beneficiary" not in df.columns:
    df["high_risk_beneficiary"] = 0

print(f"Loaded {len(df):,} rows, {df.shape[1]} columns.")
df.head(2)

Loaded 86,992 rows, 109 columns.


Unnamed: 0,timestamp,transaction_id,customer_id,sender_account_id,receiver_account_id,beneficiary_id,device_id,amount,channel,debit_credit,...,rule_very_high_risk_offshore,rule_low_kyc_offshore,rule_low_income_large_txn,rule_vpn_offshore,rule_emulator_crypto,rule_new_account_offshore,rule_new_acct_high_cust_velocity,rule_trigger_count,max_rule_severity,weighted_rule_score
0,2025-09-01 00:04:29,T60660,C45,A1209,,B244,D134,15438.75,web,debit,...,0,0,0,0,0,0,0,2,3,5
1,2025-09-01 00:04:35,T6942,C395,A139,A1051,,D597,17698.42,web,debit,...,0,0,0,0,0,0,0,1,2,2


In [3]:
# ── Counterparty column (receiver account or beneficiary) ─────────────────────
df["_counterparty"] = df["receiver_account_id"].fillna(df["beneficiary_id"].astype(str))

# Inflow events: rows where this account is the receiver (internal only)
inflow = df.loc[df["receiver_account_id"].notna(), ["receiver_account_id", "sender_account_id", "timestamp", "amount"]].copy()
inflow = inflow.rename(columns={"receiver_account_id": "account_id", "sender_account_id": "sender_id"})
inflow = inflow.sort_values(["account_id", "timestamp"])

# Outflow events: all rows (sender, counterparty, device, amount, timestamp)
outflow = df[["sender_account_id", "timestamp", "amount", "_counterparty", "device_id", "label", "high_risk_beneficiary"]].copy()
outflow = outflow.rename(columns={"sender_account_id": "account_id"})
outflow = outflow.sort_values(["account_id", "timestamp"])

print("Inflow events (internal):", len(inflow))
print("Outflow events (all):", len(outflow))

Inflow events (internal): 32888
Outflow events (all): 86992


In [4]:
# ── Sender-side rolling 30d (outflow + inflow for sender) ──────────────────────
# Rolling nunique not supported on object cols; compute per-group with time window.
def _rolling_nunique(df, group_col, time_col, value_col, window_td, out_name):
    out = []
    for _, g in df.groupby(group_col, sort=False):
        g = g.sort_values(time_col).reset_index(drop=True)
        ts = pd.to_datetime(g[time_col]).values
        vals = g[value_col].values
        for i in range(len(g)):
            t_end = ts[i]
            t_start = t_end - np.timedelta64(int(window_td.total_seconds() * 1e9), "ns")
            start_idx = np.searchsorted(ts, t_start, side="right")
            window_vals = vals[start_idx : i + 1]
            out.append((g[group_col].iloc[i], pd.Timestamp(t_end), len(np.unique(window_vals))))
    res = pd.DataFrame(out, columns=[group_col, time_col, out_name])
    res[time_col] = pd.to_datetime(res[time_col])
    return res

WINDOW_30D_TD = pd.Timedelta(ROLLING_30D)

def rolling_sender_out():
    o = outflow.set_index("timestamp", drop=False)
    g = o.groupby("account_id", group_keys=False)
    r = g.rolling(ROLLING_30D, on="timestamp").agg({"amount": ["count", "sum"]}).reset_index()
    r.columns = ["account_id", "timestamp", "sender_out_degree_30d", "sender_total_outflow_30d"]
    cp = _rolling_nunique(outflow, "account_id", "timestamp", "_counterparty", WINDOW_30D_TD, "sender_unique_counterparties_30d")
    dev = _rolling_nunique(outflow, "account_id", "timestamp", "device_id", WINDOW_30D_TD, "devices_per_account")
    r = r.merge(cp, on=["account_id", "timestamp"]).merge(dev, on=["account_id", "timestamp"])
    r["sender_repeat_counterparty_ratio"] = (
        1 - r["sender_unique_counterparties_30d"] / r["sender_out_degree_30d"].replace(0, np.nan)
    ).fillna(0).round(4)
    return r

def rolling_sender_in():
    i = inflow.set_index("timestamp", drop=False)
    r = i.groupby("account_id", group_keys=False).rolling(ROLLING_30D, on="timestamp").agg(
        {"amount": ["count", "sum"]}
    ).reset_index()
    r.columns = ["account_id", "timestamp", "sender_in_degree_30d", "sender_total_inflow_30d"]
    return r

sender_out_30 = rolling_sender_out()
sender_in_30  = rolling_sender_in()
print("Sender outflow rolling:", sender_out_30.shape)
print("Sender inflow rolling:", sender_in_30.shape)

TypeError: '<' not supported between instances of 'int' and 'Timestamp'

In [None]:
# ── Receiver-side rolling 30d (for internal transfers: receiver = account) ─────
def rolling_receiver():
    i = inflow.copy()
    i = i.rename(columns={"account_id": "receiver_id", "sender_id": "sender_id"})
    i = i.set_index("timestamp", drop=False)
    r = i.groupby("receiver_id", group_keys=False).rolling(ROLLING_30D, on="timestamp").agg({"amount": ["count", "sum"]}).reset_index()
    r.columns = ["account_id", "timestamp", "receiver_in_degree_30d", "receiver_total_inflow_30d"]
    su = _rolling_nunique(i.reset_index(drop=True), "receiver_id", "timestamp", "sender_id", WINDOW_30D_TD, "receiver_unique_senders_30d")
    su = su.rename(columns={"receiver_id": "account_id"})
    r = r.merge(su, on=["account_id", "timestamp"])
    return r

# receiver_out_degree_30d merged later from sender_out_30 (same account as sender)
receiver_in_30 = rolling_receiver()
print("Receiver inflow rolling:", receiver_in_30.shape)

In [None]:
# ── 24h / 7d inflow & outflow for pass-through and ratio ─────────────────────────
def rolling_inflow_24h_7d():
    i = inflow.set_index("timestamp", drop=False)
    g = i.groupby("account_id", group_keys=False)
    r24 = g.rolling(ROLLING_24H, on="timestamp").agg({"amount": "sum"}).reset_index()
    r24 = r24.rename(columns={"amount": "sender_total_inflow_24h"})
    r7  = g.rolling(ROLLING_7D,  on="timestamp").agg({"amount": "sum"}).reset_index()
    r7  = r7.rename(columns={"amount": "sender_total_inflow_7d"})
    return r24, r7

def rolling_outflow_24h_7d():
    o = outflow.set_index("timestamp", drop=False)
    g = o.groupby("account_id", group_keys=False)
    r24 = g.rolling(ROLLING_24H, on="timestamp").agg({"amount": "sum"}).reset_index()
    r24 = r24.rename(columns={"amount": "sender_total_outflow_24h"})
    r7  = g.rolling(ROLLING_7D,  on="timestamp").agg({"amount": "sum"}).reset_index()
    r7  = r7.rename(columns={"amount": "sender_total_outflow_7d"})
    return r24, r7

in_24, in_7  = rolling_inflow_24h_7d()
out_24, out_7 = rolling_outflow_24h_7d()
print("24h/7d inflow and outflow rolling done.")

In [None]:
# ── Device rolling: accounts_per_device, device_shared_high_risk_ratio, shared_device_fraud_count ──
dev = df[["device_id", "timestamp", "sender_account_id", "label", "high_risk_beneficiary"]].copy()
dev = dev.sort_values(["device_id", "timestamp"]).set_index("timestamp", drop=False)
g = dev.groupby("device_id", group_keys=False)
r = g.rolling(ROLLING_30D, on="timestamp").agg({"sender_account_id": "count", "label": "sum", "high_risk_beneficiary": "sum"}).reset_index()
r.columns = ["device_id", "timestamp", "_dev_txn_count", "_dev_fraud_count", "_dev_high_risk_count"]
dev_df = dev.reset_index(drop=True)
nacc = _rolling_nunique(dev_df, "device_id", "timestamp", "sender_account_id", WINDOW_30D_TD, "accounts_per_device")
device_rolling = r.merge(nacc, on=["device_id", "timestamp"])
device_rolling = device_rolling[["device_id", "timestamp", "accounts_per_device", "_dev_txn_count", "_dev_fraud_count", "_dev_high_risk_count"]]
device_rolling["device_shared_high_risk_ratio"] = (
    (device_rolling["_dev_fraud_count"] + device_rolling["_dev_high_risk_count"])
    / device_rolling["_dev_txn_count"].replace(0, np.nan)
).fillna(0).round(4)
device_rolling["shared_device_fraud_count"] = device_rolling["_dev_fraud_count"]
device_rolling = device_rolling.drop(columns=["_dev_txn_count", "_dev_fraud_count", "_dev_high_risk_count"])
print("Device rolling:", device_rolling.shape)

In [None]:
# ── avg_time_gap_in_out: per account, average time from inflow to next outflow ──
def avg_time_gap_per_account() -> pd.DataFrame:
    gaps = []
    for acc, grp_in in inflow.groupby("account_id"):
        grp_out = outflow[outflow["account_id"] == acc].sort_values("timestamp")
        if grp_out.empty:
            continue
        ts_out = pd.to_datetime(grp_out["timestamp"]).values
        for _, row in grp_in.iterrows():
            t_in = np.datetime64(pd.Timestamp(row["timestamp"]))
            idx = np.searchsorted(ts_out, t_in, side="right")
            if idx < len(ts_out):
                gap_sec = (ts_out[idx] - t_in) / np.timedelta64(1, "s")
                gaps.append({"account_id": acc, "timestamp": t_in, "_gap_sec": gap_sec})
    if not gaps:
        return pd.DataFrame(columns=["account_id", "timestamp", "avg_time_gap_in_out"])
    gap_df = pd.DataFrame(gaps)
    gap_df["timestamp"] = pd.to_datetime(gap_df["timestamp"])
    avg_by_account = gap_df.groupby("account_id")["_gap_sec"].mean().reset_index()
    avg_by_account = avg_by_account.rename(columns={"_gap_sec": "avg_time_gap_in_out"})
    return avg_by_account

avg_gap_df = avg_time_gap_per_account()
print("Avg time gap (in→out) computed for", len(avg_gap_df), "accounts.")

In [None]:
# ── Merge all rolling stats back to main df (by exact timestamp + key) ──────────
# We need to merge on (sender_account_id, timestamp) for sender stats; (receiver_account_id, timestamp) for receiver; (device_id, timestamp) for device.
# Rolling tables have one row per (key, timestamp) so we merge on (key, timestamp).

merge_cols = ["account_id", "timestamp"]

sender_out_cols = ["sender_out_degree_30d", "sender_total_outflow_30d", "sender_unique_counterparties_30d",
                   "sender_repeat_counterparty_ratio", "devices_per_account"]
df = df.merge(
    sender_out_30[merge_cols + sender_out_cols],
    left_on=["sender_account_id", "timestamp"],
    right_on=merge_cols,
    how="left",
    suffixes=("", "_sout")
).drop(columns=["account_id"], errors="ignore")

sender_in_cols = ["sender_in_degree_30d", "sender_total_inflow_30d"]
df = df.merge(
    sender_in_30[merge_cols + sender_in_cols],
    left_on=["sender_account_id", "timestamp"],
    right_on=merge_cols,
    how="left"
)
if "account_id" in df.columns and df["account_id"].equals(df["sender_account_id"]):
    df = df.drop(columns=["account_id"], errors="ignore")

in_24_cols = ["sender_total_inflow_24h"]
in_7_cols  = ["sender_total_inflow_7d"]
out_24_cols = ["sender_total_outflow_24h"]
out_7_cols  = ["sender_total_outflow_7d"]
df = df.merge(in_24[merge_cols + in_24_cols],   left_on=["sender_account_id", "timestamp"], right_on=merge_cols, how="left").drop(columns=["account_id"], errors="ignore")
df = df.merge(in_7[merge_cols + in_7_cols],     left_on=["sender_account_id", "timestamp"], right_on=merge_cols, how="left").drop(columns=["account_id"], errors="ignore")
df = df.merge(out_24[merge_cols + out_24_cols], left_on=["sender_account_id", "timestamp"], right_on=merge_cols, how="left").drop(columns=["account_id"], errors="ignore")
df = df.merge(out_7[merge_cols + out_7_cols],   left_on=["sender_account_id", "timestamp"], right_on=merge_cols, how="left").drop(columns=["account_id"], errors="ignore")

df["pass_through_ratio_24h"] = (
    np.minimum(df["sender_total_inflow_24h"].fillna(0), df["sender_total_outflow_24h"].fillna(0))
    / df["sender_total_inflow_24h"].replace(0, np.nan)
).fillna(0).round(4)
df["pass_through_ratio_7d"] = (
    np.minimum(df["sender_total_inflow_7d"].fillna(0), df["sender_total_outflow_7d"].fillna(0))
    / df["sender_total_inflow_7d"].replace(0, np.nan)
).fillna(0).round(4)
df["outflow_to_inflow_ratio_7d"] = (
    df["sender_total_outflow_7d"].fillna(0) / df["sender_total_inflow_7d"].replace(0, np.nan)
).fillna(0).round(4)

df = df.merge(
    device_rolling[["device_id", "timestamp", "accounts_per_device", "device_shared_high_risk_ratio", "shared_device_fraud_count"]],
    on=["device_id", "timestamp"],
    how="left"
)

df = df.merge(
    avg_gap_df,
    left_on="sender_account_id",
    right_on="account_id",
    how="left"
).drop(columns=["account_id"], errors="ignore")

receiver_in_cols = ["receiver_in_degree_30d", "receiver_total_inflow_30d", "receiver_unique_senders_30d"]
receiver_in_30_renamed = receiver_in_30.rename(columns={"account_id": "receiver_account_id"})
df = df.merge(
    receiver_in_30_renamed[["receiver_account_id", "timestamp"] + receiver_in_cols],
    on=["receiver_account_id", "timestamp"],
    how="left"
)
receiver_out_30_for_merge = sender_out_30[["account_id", "timestamp", "sender_out_degree_30d"]].rename(
    columns={"account_id": "receiver_account_id", "sender_out_degree_30d": "receiver_out_degree_30d"}
)
df = df.merge(receiver_out_30_for_merge, on=["receiver_account_id", "timestamp"], how="left")

print("Merged. Columns:", df.shape[1])

In [None]:
# ── Drop helper columns and keep only requested features if config says so ─────
df = df.drop(columns=["_counterparty"], errors="ignore")

optional_drop = ["sender_total_inflow_24h", "sender_total_outflow_24h", "sender_total_inflow_7d", "sender_total_outflow_7d"]
for c in optional_drop:
    if c in df.columns and c not in FEATURES:
        df = df.drop(columns=[c], errors="ignore")

print("Final columns:", len(df.columns))
graph_cols = [c for c in df.columns if any(x in c for x in ["degree", "inflow", "outflow", "counterpart", "pass_through", "ratio_7d", "avg_time_gap", "accounts_per_device", "devices_per", "device_shared", "shared_device_fraud", "receiver_in", "receiver_out", "receiver_unique"])]
print("Graph feature columns:", graph_cols[:20], "..." if len(graph_cols) > 20 else graph_cols)

In [None]:
# ── Save ──────────────────────────────────────────────────────────────────────
INPUT_DIR.mkdir(parents=True, exist_ok=True)
df.to_parquet(OUTPUT_PATH, index=False)
print(f"Saved: {OUTPUT_PATH} ({len(df):,} rows, {df.shape[1]} columns)")