In [None]:
# Feature Engineering
import numpy as np
import pandas as pd
from typing import Any, Dict, List, NamedTuple, Optional
feature_frame = globals().get("feature_frame", pd.DataFrame())
alerts_frame = globals().get("alerts_frame", pd.DataFrame(columns=["customer_id", "rule", "severity", "details"]))
master_frame = globals().get("master_frame")

def _build_sample_master_frame() -> pd.DataFrame:
    return pd.DataFrame({
        "customer_id": ["CUST001", "CUST002", "CUST003"],
        "date": ["2024-01-01", "2024-01-01", "2024-01-01"],
        "balance": [100000, 50000, 25000],
        "credit_limit": [150000, 75000, 30000],
        "dpd": [0, 45, 95],
        "product_code": ["CC", "PL", "CC"],
        "origination_date": ["2023-01-01", "2023-06-01", "2023-12-01"],
        "industry": ["Technology", "Manufacturing", "Government"],
        "kam_owner": ["KAM001", "KAM002", "KAM001"]
    })
if master_frame is None or getattr(master_frame, "empty", True):
    master_frame = _build_sample_master_frame()
    print("Created sample master_frame with", len(master_frame), "records")

DELINQUENCY_BUCKETS = [-np.inf, 0, 30, 60, 90, 120, np.inf]
DELINQUENCY_LABELS = ["current", "1_30", "31_60", "61_90", "91_120", "120_plus"]
SEGMENT_LABELS = list("ABCDEF")

class FeatureArtifacts(NamedTuple):
    features: pd.DataFrame
    alerts: pd.DataFrame

class FeatureEngineer:
    def __init__(self, reference_date: Optional[pd.Timestamp] = None) -> None:
        self.reference_date = reference_date or pd.Timestamp.utcnow().normalize()

    def _derive_customer_type(self, frame: pd.DataFrame) -> pd.Series:
        if "customer_type" in frame.columns:
            return frame["customer_type"].fillna("unspecified")
        balance = self._normalize_series(frame.get("balance"), frame.index, 0).fillna(0)

        exposure = frame.get("credit_limit")
        if exposure is None:
            exposure = balance.clip(lower=1)
        else:
            exposure = self._normalize_series(exposure, frame.index, 0)
            exposure = exposure.where(exposure.notna() & (exposure != 0), balance.clip(lower=1))
        ratio = balance / exposure.replace({0: np.nan})
        derived = np.where(
            balance >= 5_000_000,
            "enterprise",
            np.where(balance >= 500_000, "corporate", np.where(balance >= 50_000, "sme", "micro"))
        )
        derived = np.where(ratio >= 0.9, "intensive", derived)
        return pd.Series(derived, index=frame.index)

    def _segmentation(self, frame: pd.DataFrame) -> pd.Series:
        try:
            unique = frame["balance"].nunique()
            buckets = min(6, unique)
            return pd.qcut(frame["balance"], q=buckets, labels=SEGMENT_LABELS[: buckets], duplicates="drop").astype(str)
        except Exception:
            return pd.Series(["A"] * len(frame), index=frame.index)

    def _delinquency_bucket(self, frame: pd.DataFrame) -> pd.Series:
        if "dpd" in frame.columns:
            dpd_source = frame["dpd"]
        else:
            dpd_source = frame.get("days_past_due", pd.Series(0, index=frame.index))
        if not isinstance(dpd_source, pd.Series):
            dpd_source = pd.Series(dpd_source, index=frame.index)
        dpd_series = pd.to_numeric(dpd_source, errors="coerce").fillna(0)
        return pd.cut(dpd_series, bins=DELINQUENCY_BUCKETS, labels=DELINQUENCY_LABELS, right=True)

    def transform(self, frame: pd.DataFrame) -> FeatureArtifacts:
        if frame.empty:
            empty_alerts = pd.DataFrame(columns=["customer_id", "rule", "severity", "details"])
            return FeatureArtifacts(frame.copy(), empty_alerts)
        prepared = self._prepare_base_features(frame.copy())
        enriched = self._compute_financial_metrics(prepared)
        alerts = self._collect_alerts(enriched)
        return FeatureArtifacts(features=enriched.reset_index(drop=True), alerts=alerts)

    def _prepare_base_features(self, features: pd.DataFrame) -> pd.DataFrame:
        features["date"] = pd.to_datetime(features["date"], errors="coerce", utc=True)
        features["customer_type"] = self._derive_customer_type(features)
        features["segment_code"] = self._segmentation(features)
        features["delinquency_bucket"] = self._delinquency_bucket(features).astype(str)
        features["dpd"] = self._normalize_dpd(features)
        return features

    def _compute_financial_metrics(self, features: pd.DataFrame) -> pd.DataFrame:
        balance_clip = features["balance"].clip(lower=1)
        credit_limit_source = features.get("credit_limit")
        credit_limit_series = self._normalize_series(credit_limit_source, features.index, np.nan)
        credit_limit_series = pd.to_numeric(credit_limit_series, errors="coerce").where(
            lambda s: s.notna() & (s != 0),
            balance_clip
        )
        utilization = features["balance"] / credit_limit_series
        features["utilization_ratio"] = utilization.replace([np.inf, -np.inf], np.nan).clip(upper=5).fillna(0)
        features["apr"] = self._prepare_apr(features)
        balance_share = features.groupby("customer_id")["balance"].transform(
            lambda values: values / values.sum()
        ).fillna(0.0)
        features["weighted_apr"] = balance_share * features["apr"]
        zscore = (features["balance"] - features["balance"].mean()) / features["balance"].std(ddof=0)
        features["balance_zscore"] = zscore.fillna(0).clip(-3, 3)
        features["industry"] = self._normalize_series(
            features.get("industry"),
            features.index,
            "unspecified"
        ).fillna("unspecified")
        features["kam_owner"] = self._normalize_series(
            features.get("kam_owner"),
            features.index,
            "unassigned"
        ).fillna("unassigned")
        industry_lower = features["industry"].str.lower()
        features["b2g_flag"] = industry_lower.str.contains("government|public").fillna(False).astype(int)
        origination_source = features.get("origination_date", features["date"])
        days_open = (self.reference_date - pd.to_datetime(origination_source, utc=True)).dt.days
        features["days_since_origination"] = days_open.clip(lower=0).fillna(0).astype(int)
        features["roll_rate_key"] = features["customer_id"].astype(str) + "_" + features["product_code"].astype(str)
        features = features.sort_values(["roll_rate_key", "date"])
        features["prev_dpd"] = features.groupby("roll_rate_key")["dpd"].shift(1).fillna(0)
        features["roll_rate_delta"] = features["dpd"] - features["prev_dpd"]
        features["roll_rate_direction"] = np.select(
            [features["roll_rate_delta"] > 0, features["roll_rate_delta"] < 0],
            ["deteriorating", "improving"],
            default="stable"
        )
        features["alert_usury_micro"] = ((features["customer_type"] == "micro") & (features["apr"] > 0.85)).astype(int)
        features["alert_high_utilization"] = (features["utilization_ratio"] > 0.95).astype(int)
        features["alert_high_dpd"] = (features["dpd"] >= 90).astype(int)
        features["alert_pdf_gap"] = 0
        return features

    def _collect_alerts(self, features: pd.DataFrame) -> pd.DataFrame:
        alerts_records: List[Dict[str, Any]] = []
        alert_columns = {
            "alert_usury_micro": "critical",
            "alert_high_utilization": "high",
            "alert_high_dpd": "critical",
            "alert_pdf_gap": "medium"
        }
        for alert_col, severity in alert_columns.items():
            flagged = features[features[alert_col] == 1]
            for _, row in flagged.iterrows():
                alerts_records.append({
                    "customer_id": row.get("customer_id"),
                    "rule": alert_col,
                    "severity": severity,
                    "details": f"DPD={row.get('dpd')}|Util={row.get('utilization_ratio'):.2f}"
                })
        if alerts_records:
            return pd.DataFrame(alerts_records)
        return pd.DataFrame(columns=["customer_id", "rule", "severity", "details"])

    def _normalize_series(self, source: Any, index: pd.Index, default: Any) -> pd.Series:
        if source is None:
            return pd.Series(default, index=index)
        if isinstance(source, pd.Series):
            return source
        return pd.Series(source, index=index)

    def _normalize_dpd(self, features: pd.DataFrame) -> pd.Series:
        dpd_source = features.get("dpd")
        if dpd_source is None:
            dpd_source = features.get("days_past_due")
        normalized = self._normalize_series(dpd_source, features.index, 0)
        return pd.to_numeric(normalized, errors="coerce").fillna(0).astype(int)

    def _prepare_apr(self, features: pd.DataFrame) -> pd.Series:
        apr_source = features.get("apr") if "apr" in features.columns else features.get("nominal_rate")
        apr_series = pd.to_numeric(self._normalize_series(apr_source, features.index, np.nan), errors="coerce")
        apr_median = apr_series.median(skipna=True)
        if pd.isna(apr_median):
            apr_median = 0.0
        return apr_series.fillna(apr_median).astype(float)

feature_engineer = FeatureEngineer()
feature_artifacts = feature_engineer.transform(master_frame)
feature_frame = feature_artifacts.features
alerts_frame = feature_artifacts.alerts

In [None]:
# KPI Calculation Engine
import numpy as np
import pandas as pd
from typing import Any, Dict
feature_frame = globals().get("feature_frame", pd.DataFrame())
alerts_frame = globals().get("alerts_frame", pd.DataFrame(columns=["customer_id", "rule", "severity", "details"]))

class KPIEngine:
    def __init__(self, frame: pd.DataFrame) -> None:
        self.frame = frame.copy()
        if not self.frame.empty:
            self.frame["date"] = pd.to_datetime(self.frame["date"], utc=True)
            self.frame["month"] = self.frame["date"].dt.to_period("M").dt.to_timestamp()

    def _ratio(self, numerator: pd.Series, denominator: pd.Series) -> float:
        denom = denominator.sum()
        if denom == 0:
            return float("nan")
        return numerator.sum() / denom

    def compute(self) -> Dict[str, Any]:
        if self.frame.empty:
            return {}

        result: Dict[str, Any] = {}
        current_frame = self.frame.copy()

        result["aum"] = current_frame["balance"].sum()
        result["active_clients"] = current_frame["customer_id"].nunique()
        result["credit_lines"] = current_frame.get("credit_limit", pd.Series(0, index=current_frame.index)).sum()

        churn_mask = current_frame.get("status", pd.Series("active", index=current_frame.index)).str.lower().eq("churned")
        result["churn_rate"] = churn_mask.mean()

        default_mask = current_frame.get("default_flag", pd.Series(0, index=current_frame.index)).astype(int)
        result["default_rate"] = default_mask.mean()

        dpd_group = current_frame.groupby("delinquency_bucket")["balance"].sum().rename("aum")
        result["dpd_buckets"] = dpd_group

        result["rotation"] = self._ratio(
            current_frame.get("payments", pd.Series(0, index=current_frame.index)),
            current_frame.get("balance", pd.Series(0, index=current_frame.index))
        )

        result["weighted_apr"] = current_frame["weighted_apr"].mean()

        result["revenue"] = current_frame.get("interest_income", pd.Series(0, index=current_frame.index)).sum()
        result["ebitda"] = current_frame.get("ebitda", pd.Series(0, index=current_frame.index)).sum()

        result["concentration_top10"] = (
            current_frame.groupby("customer_id")["balance"].sum().nlargest(10).sum() / result["aum"]
            if result["aum"]
            else float("nan")
        )

        ltv = current_frame.get("ltv", pd.Series(0, index=current_frame.index))
        cac = current_frame.get("cac", pd.Series(np.nan, index=current_frame.index))
        current_frame["ltv_cac_ratio"] = np.where(cac.fillna(0) == 0, np.nan, ltv / cac)

        channel_col = next((col for col in ("channel", "source_name") if col in current_frame.columns), None)
        if channel_col:
            result["ltv_cac_by_segment"] = current_frame.groupby(["segment_code", channel_col]).ltv_cac_ratio.mean()
        else:
            result["ltv_cac_by_segment"] = current_frame.groupby(["segment_code"]).ltv_cac_ratio.mean()

        result["nrr"] = self._ratio(
            current_frame.get("recurring_revenue", pd.Series(0, index=current_frame.index)),
            current_frame.get("starting_revenue", pd.Series(1, index=current_frame.index))
        )

        result["nsm"] = current_frame.get("north_star_metric", pd.Series(0, index=current_frame.index)).mean()

        result["penetration"] = self._ratio(
            current_frame.get("active_products", pd.Series(0, index=current_frame.index)),
            current_frame.get("available_products", pd.Series(1, index=current_frame.index))
        )

        result["b2g_percent"] = current_frame["b2g_flag"].mean()

        status_column = current_frame.get("status", pd.Series("active", index=current_frame.index)).str.lower()
        result["new_recurrent_recovered"] = status_column.value_counts(dropna=False)

        group_cols = ["industry", "kam_owner", "segment_code", "customer_type"]
        aggregation = current_frame.groupby(group_cols)["balance"].sum().rename("aum")
        result["aum_by_group"] = aggregation

        behavior_mask = (current_frame["customer_type"] == "micro") & (current_frame["apr"] > 0.85)
        result["usury_micro_share"] = behavior_mask.mean()

        result["pod"] = current_frame.get("probability_of_default", pd.Series(np.nan, index=current_frame.index)).mean()

        if not alerts_frame.empty:
            result["alerts_active"] = alerts_frame.groupby("severity").size()

        return result

kpi_engine = KPIEngine(feature_frame)
kpi_summary = kpi_engine.compute()

In [None]:
# Marketing & Sales Analysis
import pandas as pd
from typing import Dict, List
feature_frame = globals().get("feature_frame", pd.DataFrame())

def marketing_sales_breakdown(frame: pd.DataFrame) -> Dict[str, pd.DataFrame]:
    if frame.empty:
        return {}

    aggregations: Dict[str, pd.DataFrame] = {}
    group_fields: Dict[str, List[str]] = {
        "industry": ["industry"],
        "kam": ["kam_owner"]
    }
    channel_columns = [column for column in ("channel", "source_name") if column in frame.columns]
    if channel_columns:
        group_fields["channel"] = channel_columns

    for label, fields in group_fields.items():
        grouped = frame.groupby(fields, dropna=False).agg(
            aum=("balance", "sum"),
            clients=("customer_id", "nunique"),
            weighted_apr=("weighted_apr", "mean"),
            ltv_cac=("ltv_cac_ratio", "mean")
        ).reset_index()
        aggregations[label] = grouped

    return aggregations

marketing_sales_tables = marketing_sales_breakdown(feature_frame)
treemap_ready = marketing_sales_tables.get("industry") if marketing_sales_tables else pd.DataFrame()

In [None]:
# Data Quality Audit
import numpy as np
import pandas as pd
from pandas.io.formats.style import Styler
from typing import Any, Dict, List, Optional, TYPE_CHECKING, cast
try:
    import pdfplumber  # type: ignore[import-not-found]
except ModuleNotFoundError:
    pdfplumber = None
if TYPE_CHECKING:
    import pdfplumber as _pdfplumber_stub
feature_frame = globals().get("feature_frame", pd.DataFrame())

CRITICAL_COLUMNS = {"customer_id", "date", "balance", "dpd"}

def data_quality_audit(frame: pd.DataFrame) -> Dict[str, Any]:
    if frame.empty:
        return {"score": np.nan, "table": pd.DataFrame(), "styled": None, "pdf_completeness": 0.0}

    total_rows = len(frame)
    audit_records: List[Dict[str, Any]] = []
    penalties = 0.0

    for column in frame.columns:
        nulls = frame[column].isna().sum()
        zeros = (frame[column] == 0).sum() if pd.api.types.is_numeric_dtype(frame[column]) else np.nan
        coverage = 1 - (nulls / total_rows) if total_rows else np.nan
        if column in CRITICAL_COLUMNS and coverage < 0.9:
            penalties += 0.1
        audit_records.append(
            {"column": column, "nulls": int(nulls), "zeros": int(zeros) if not pd.isna(zeros) else np.nan, "coverage": coverage}
        )

    audit_table = pd.DataFrame(audit_records)
    coverage_mean = audit_table["coverage"].mean()
    quality_score = max(0.0, min(1.0, (coverage_mean if not pd.isna(coverage_mean) else 0.0) - penalties))

    def _color(value: float) -> str:
        if pd.isna(value):
            return "color: #E6E6EF; background-color: #3730A3"
        if value >= 0.95:
            return "color: #05101a; background-color: #22E7CC"
        if value >= 0.85:
            return "color: #F5F3FF; background-color: #2563EB"
        return "color: #F5F3FF; background-color: #B91C1C"

    styler = audit_table.style.format({"coverage": "{:.2%}"})
    apply_map = getattr(styler, "applymap", None)
    styled = apply_map(_color, subset=["coverage"]) if callable(apply_map) else styler

    pdf_completeness = 1.0 if pdfplumber else 0.0

    return {"score": quality_score, "table": audit_table, "styled": styled, "pdf_completeness": pdf_completeness}
quality_artifacts = data_quality_audit(feature_frame)
quality_score = quality_artifacts.get("score")
quality_table = quality_artifacts.get("table")
quality_styled = quality_artifacts.get("styled")

print(f"Data Quality Score: {quality_score:.2%}" if not pd.isna(quality_score) else "Data Quality Score: N/A")
print(f"PDF Completeness: {quality_artifacts.get('pdf_completeness', 0.0):.1%}")

if quality_styled is not None:
    display(quality_styled)
else:
    print("No data quality table to display - feature_frame is empty")