<a href="https://colab.research.google.com/github/gomzkevin/kontempo/blob/main/Early_Warnings_Model_01_de_agosto_2025.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import re
import ast
from pathlib import Path
from datetime import datetime, timedelta
from typing import List, Dict, Optional

import numpy as np
import pandas as pd

###############################################################################
# CONFIG                                                                      #
###############################################################################

# Default input/output locations. They are only used as default arguments of
# build_snapshot_dataset(); pass explicit paths to that function to override.
PAYMENTS_PATH = Path("payments.json")  # payments export read by load_payments()
LIMITS_PATH = Path("Limit-Credit.csv")  # credit-limit CSV read by load_limits()
OUTPUT_PATH = Path("snapshots.parquet")  # where the snapshot dataset is written

WINDOW_LOOKAHEAD_DAYS = 30   # instalments due within this many days after a snapshot feed its label
DPD_THRESHOLD = 35           # receipts applied later than this many days after the due date are ignored
MATERIALITY = 0.96           # fraction of Amount Due that must be paid for an instalment to count as paid

###############################################################################
# RECEIPTS PARSER                                                             #
###############################################################################

def _clean_receipt_string(raw: str) -> str:
    """Remove EDN wrappers and return a simple token string."""
    return raw.replace("[#ordered/map", "").replace("[", "").replace("]", "")


def parse_receipts(raw: str) -> List[Dict[str, Optional[pd.Timestamp]]]:
    """Parse the EDN-style *Receipts* column into a list of dicts.

    Each returned dict contains:
        - amount_applied: float (``0.0`` when missing or unparseable)
        - applied_date: pandas.Timestamp, or ``pd.NaT`` when missing or
          unparseable; the raw value is read as an epoch in seconds

    Args:
        raw: The cell content, e.g.
            ``"[#ordered/map([:amount_applied 10] [:applied_date 1700000000])]"``.
            Missing values (``None``, ``NaN``, ``pd.NA``), empty strings and
            ``"[]"`` are all treated as "no receipts".

    Returns:
        One dict per ``#ordered/map`` entry that contains at least one
        ``[:key value]`` pair.
    """
    # Missing cells arrive as None/NaN rather than str; they have no receipts.
    if raw is None or (
        not isinstance(raw, str) and pd.api.types.is_scalar(raw) and pd.isna(raw)
    ):
        return []
    if not raw or raw.strip() in {"[]", ""}:
        return []

    receipts: List[Dict[str, Optional[pd.Timestamp]]] = []

    for block in re.split(r"#ordered/map", raw):
        pairs = re.findall(r"\[:(\w+)\s+([^:\]]+)\]", block)
        # The text before the first marker is just the opening "[" and holds
        # no pairs; skipping it avoids emitting a phantom zero-amount receipt.
        if not pairs:
            continue
        data = dict(pairs)

        try:
            amt = float(str(data.get("amount_applied", "0")).replace(",", ""))
        except ValueError:
            amt = 0.0

        ts: Optional[pd.Timestamp] = pd.NaT
        if "applied_date" in data:
            try:
                epoch = int(str(data["applied_date"]).replace(",", ""))
                ts = pd.to_datetime(epoch, unit="s")
            except ValueError:
                # Non-integer or out-of-range epoch: keep the receipt, undated.
                ts = pd.NaT

        receipts.append({"amount_applied": amt, "applied_date": ts})
    return receipts

###############################################################################
# DATA LOAD & CLEAN                                                           #
###############################################################################

def _clean_numeric(col: pd.Series) -> pd.Series:
    return col.astype(str).str.replace(",", "", regex=False).astype(float)


def load_payments(path: Path) -> pd.DataFrame:
    """Load the payments JSON file and derive the columns used downstream.

    Rows whose ``Status`` is ``"voided"`` are dropped. The returned frame
    gains ``Created_dt`` (from the epoch-seconds ``Created`` column),
    ``Due_dt``, float versions of the amount columns, ``quota_balance``,
    ``is_unpaid`` and the parsed ``_receipts`` list.
    """
    raw = pd.read_json(path, lines=False)
    not_voided = raw["Status"] != "voided"
    df = raw[not_voided].copy()

    # "Created" is an epoch in seconds that may carry thousands separators.
    created_epoch = (
        df["Created"].astype(str).str.replace(",", "", regex=False).astype(int)
    )
    df["Created_dt"] = pd.to_datetime(created_epoch, unit="s")
    df["Due_dt"] = pd.to_datetime(df["Due Date"])

    money_cols = ["Amount Due", "Amount Paid", "Principal Amount"]
    df[money_cols] = df[money_cols].apply(_clean_numeric)

    amount_due = df["Amount Due"]
    amount_paid = df["Amount Paid"]
    df["quota_balance"] = amount_due - amount_paid
    # An instalment counts as unpaid until MATERIALITY of its amount is covered.
    df["is_unpaid"] = amount_paid < MATERIALITY * amount_due

    # Pre-parse receipts once so label generation can iterate plain dicts.
    df["_receipts"] = df["Receipts"].apply(parse_receipts)
    return df


def load_limits(path: Path) -> pd.DataFrame:
    """Read the credit-limit CSV and return one summed limit per buyer.

    The result has two columns: ``Buyer Account`` (renamed from
    ``Buyer Account ID``) and ``limite_de_credito`` (the sum of ``Limit``).
    """
    raw_limits = pd.read_csv(path)
    per_buyer = raw_limits.groupby("Buyer Account ID", as_index=False)["Limit"].sum()
    renamed = per_buyer.rename(
        columns={"Buyer Account ID": "Buyer Account", "Limit": "limite_de_credito"}
    )
    return renamed

###############################################################################
# FEATURE ENGINEERING                                                         #
###############################################################################

def _months_between(date1: pd.Timestamp, date2: pd.Timestamp) -> int:
    return int((date2.to_period("M") - date1.to_period("M")).n)


def build_snapshot_features(pay_hist: pd.DataFrame, snap_date: pd.Timestamp) -> pd.DataFrame:
    """Compute one row of features per buyer for a given snapshot date.

    Args:
        pay_hist: Payment rows to aggregate; the caller is expected to have
            restricted them to the history visible at ``snap_date``. Must
            contain ``Buyer Account``, ``Created_dt``, ``Due_dt``, ``Loan ID``,
            ``quota_balance``, ``is_unpaid``, ``total_installments`` and
            ``Principal Amount``.
        snap_date: The snapshot timestamp all recency features are measured
            against.

    Returns:
        A frame with ``Buyer Account`` as a regular column followed by the
        feature columns, plus the helper column ``_last_principal``.

    NOTE(review): ``is_unpaid`` and ``quota_balance`` are read as given in
    ``pay_hist``; if they reflect payments made after ``snap_date`` the
    features leak future information — confirm with the data owner.
    """
    gb = pay_hist.groupby("Buyer Account")
    last_created = gb["Created_dt"].max()

    # Days past due of every unpaid instalment, then the worst one per buyer.
    # Instalments that are not yet due would give negative values, so the
    # result is clipped at 0 to agree with buyers that have nothing unpaid.
    unpaid = pay_hist.loc[pay_hist["is_unpaid"]]
    dpd = unpaid[["Buyer Account"]].assign(
        _dpd=(snap_date - unpaid["Due_dt"]).dt.days.to_numpy()
    )
    max_dpd = (
        dpd.groupby("Buyer Account")["_dpd"].max()
        .clip(lower=0)
        # Buyers without any unpaid instalment get 0 days past due.
        .reindex(last_created.index, fill_value=0)
    )

    feat = pd.DataFrame({
        "snapshot_date": snap_date,
        "dias_desde_ultimo_prestamo": (snap_date - last_created).dt.days,
        "acquisition_date": gb["Created_dt"].min(),
        "numero_total_prestamos_historico": gb["Loan ID"].nunique(),
        "saldo_pendiente_actual": gb["quota_balance"].sum(),
        "max_dpd_actual": max_dpd,
    })

    # derive antiguedad & flags
    feat["antiguedad_cliente_meses"] = [
        _months_between(acquired, snap_date) for acquired in feat["acquisition_date"]
    ]
    feat["es_primer_mes_activo"] = (feat["antiguedad_cliente_meses"] == 0).astype(int)

    # last loan aggregates: the most recently created row per buyer
    last_loans = pay_hist.sort_values("Created_dt").drop_duplicates("Buyer Account", keep="last")
    feat = feat.join(last_loans.set_index("Buyer Account")[["total_installments", "Principal Amount"]].rename(
        columns={"total_installments": "installments_prestamo_reciente", "Principal Amount": "_last_principal"}
    ))

    # historical averages
    feat = feat.join(gb["total_installments"].mean().rename("promedio_installments_historico"))

    # rolling windows: last 30 days, and the 30 days before that
    cutoff_30 = snap_date - pd.Timedelta(days=30)
    cutoff_60 = snap_date - pd.Timedelta(days=60)

    window_30 = pay_hist[pay_hist["Created_dt"] >= cutoff_30]
    gb30 = window_30.groupby("Buyer Account")
    feat = feat.join(gb30["Principal Amount"].sum().rename("monto_dispuesto_ultimos_30d"))
    feat = feat.join(gb30["Loan ID"].nunique().rename("frecuencia_prestamos_ultimos_30d"))

    window_31_60 = pay_hist[(pay_hist["Created_dt"] >= cutoff_60) & (pay_hist["Created_dt"] < cutoff_30)]
    gb60 = window_31_60.groupby("Buyer Account")
    feat = feat.join(gb60["Principal Amount"].sum().rename("monto_dispuesto_31_60d"))
    feat = feat.join(gb60["Loan ID"].nunique().rename("frecuencia_prestamos_31_60d"))

    # acceleration & deltas (buyers with no activity in a window count as 0)
    feat["aceleracion_monto"] = feat["monto_dispuesto_ultimos_30d"].fillna(0) - feat["monto_dispuesto_31_60d"].fillna(0)
    feat["aceleracion_frecuencia"] = feat["frecuencia_prestamos_ultimos_30d"].fillna(0) - feat["frecuencia_prestamos_31_60d"].fillna(0)
    feat["cambio_en_installments_reciente"] = feat["installments_prestamo_reciente"] - feat["promedio_installments_historico"]

    return feat.reset_index()

###############################################################################
# LABEL GENERATION                                                            #
###############################################################################

def _payment_covered(row, due_date: pd.Timestamp) -> bool:
    """Tell whether timely receipts cover at least MATERIALITY of the amount due.

    Only receipts with a known ``applied_date`` that is not later than
    ``due_date`` plus ``DPD_THRESHOLD`` days are counted.
    """
    cutoff = due_date + timedelta(days=DPD_THRESHOLD)
    total_applied = 0.0
    for receipt in row["_receipts"]:
        applied_on = receipt.get("applied_date")
        # Count the receipt only when it is dated and not past the cutoff.
        if not pd.isna(applied_on) and not applied_on > cutoff:
            total_applied += receipt.get("amount_applied", 0.0)
    return total_applied >= MATERIALITY * row["Amount Due"]


def compute_label(pay_future: pd.DataFrame) -> pd.Series:
    """Flag each buyer that has at least one uncovered instalment.

    Args:
        pay_future: Instalment rows falling inside the label window. Needs
            ``Buyer Account`` and ``Due_dt`` plus whatever columns
            ``_payment_covered`` reads from each row.

    Returns:
        An integer Series indexed by ``Buyer Account``: 1 when any of the
        buyer's instalments is not covered, 0 otherwise. An empty input gives
        an empty Series (``groupby.apply`` would return a DataFrame there,
        which cannot be used as a mapping by the caller).
    """
    if pay_future.empty:
        return pd.Series(dtype=int)

    # Evaluate every instalment once, keyed by its buyer.
    uncovered = pd.Series(
        [not _payment_covered(row, row["Due_dt"]) for _, row in pay_future.iterrows()],
        index=pd.Index(pay_future["Buyer Account"]),
        dtype=bool,
    )
    return uncovered.groupby(level=0).any().astype(int)

###############################################################################
# MAIN BUILD FUNCTION                                                         #
###############################################################################

def build_snapshot_dataset(payments_path: Path = PAYMENTS_PATH,
                           limits_path: Path = LIMITS_PATH,
                           output_path: Path = OUTPUT_PATH) -> None:
    """Build month-end snapshots of features and labels and save them as Parquet.

    One snapshot is taken at the last day of every calendar month between the
    earliest and the latest ``Due_dt`` in the payments data. For each snapshot
    the buyers' features are computed from loans created up to that date, the
    credit limit and utilisation are attached, and the label marks buyers with
    an uncovered instalment due within ``WINDOW_LOOKAHEAD_DAYS`` after it.

    Args:
        payments_path: Payments JSON file.
        limits_path: Credit-limit CSV file.
        output_path: Destination Parquet file.

    Raises:
        ValueError: If no snapshot could be built from the input data.

    NOTE(review): snapshots whose look-ahead window extends past the last
    observed data are still emitted with label 0 — confirm whether those
    censored rows should be excluded from training.
    """
    pay = load_payments(payments_path)
    limits_by_buyer = load_limits(limits_path).set_index("Buyer Account")

    # Timestamp.floor/ceil reject month frequencies (they are not fixed
    # width), so the month-end grid is built from monthly periods instead.
    snap_dates = (
        pd.period_range(pay["Due_dt"].min(), pay["Due_dt"].max(), freq="M")
        .to_timestamp(how="end")
        .normalize()
    )
    lookahead = pd.Timedelta(days=WINDOW_LOOKAHEAD_DAYS)

    snapshots = []
    for snap in snap_dates:
        hist = pay[pay["Created_dt"] <= snap]
        if hist.empty:
            # No loan existed yet at this date; nothing to describe.
            continue
        feat = build_snapshot_features(hist, snap)

        # add credit limit & utilisation; buyers without a limit fall back to
        # the principal of their most recent loan
        feat = feat.join(limits_by_buyer, on="Buyer Account")
        feat["limite_de_credito"] = feat["limite_de_credito"].fillna(feat["_last_principal"])
        feat["porcentaje_utilizacion"] = feat["saldo_pendiente_actual"] / feat["limite_de_credito"]

        # build label from instalments due inside the look-ahead window
        future_mask = (pay["Due_dt"] > snap) & (pay["Due_dt"] <= snap + lookahead)
        pay_future = pay[future_mask]
        if pay_future.empty:
            feat["default_en_35d"] = 0
        else:
            labels = compute_label(pay_future)
            feat["default_en_35d"] = feat["Buyer Account"].map(labels).fillna(0).astype(int)

        # fossil filter: drop buyers already 30+ days past due at the snapshot
        feat = feat[feat["max_dpd_actual"] < 30]
        snapshots.append(feat.drop(columns=["_last_principal"]))

    if not snapshots:
        raise ValueError("No snapshots could be built from the payments data.")

    snapshots_df = pd.concat(snapshots).reset_index(drop=True)
    snapshots_df.to_parquet(output_path, index=False)
    print(f"Snapshot dataset saved to {output_path} with {len(snapshots_df):,} rows.")

###############################################################################
# CLI ENTRYPOINT                                                              #
###############################################################################
# When executed as a script, build the dataset using the default paths
# declared in the CONFIG section.
if __name__ == "__main__":
    build_snapshot_dataset()
