In [None]:
# Feature Engineering
import numpy as np
import pandas as pd
from typing import Any, Dict, Iterable, List, NamedTuple, Optional
feature_frame = globals().get("feature_frame", pd.DataFrame())
alerts_frame = globals().get("alerts_frame", pd.DataFrame(columns=["customer_id", "rule", "severity", "details"]))
master_frame = globals().get("master_frame")

def _build_sample_master_frame() -> pd.DataFrame:
    return pd.DataFrame({
        "customer_id": ["CUST001", "CUST002", "CUST003"],
        "date": ["2024-01-01", "2024-01-01", "2024-01-01"],
        "balance": [100000, 50000, 25000],
        "credit_limit": [150000, 75000, 30000],
        "dpd": [0, 45, 95],
        "product_code": ["CC", "PL", "CC"],
        "origination_date": ["2023-01-01", "2023-06-01", "2023-12-01"],
        "industry": ["Technology", "Manufacturing", "Government"],
        "kam_owner": ["KAM001", "KAM002", "KAM001"]
    })
if master_frame is None or getattr(master_frame, "empty", True):
    master_frame = _build_sample_master_frame()
    print("Created sample master_frame with", len(master_frame), "records")

DELINQUENCY_BUCKETS = [-np.inf, 0, 30, 60, 90, 120, np.inf]
DELINQUENCY_LABELS = ["current", "1_30", "31_60", "61_90", "91_120", "120_plus"]
SEGMENT_LABELS = list("ABCDEF")

class FeatureArtifacts(NamedTuple):
    features: pd.DataFrame
    alerts: pd.DataFrame

class FeatureEngineer:
    def __init__(self, reference_date: Optional[pd.Timestamp] = None) -> None:
        self.reference_date = reference_date or pd.Timestamp.utcnow().normalize()

    def _derive_customer_type(self, frame: pd.DataFrame) -> pd.Series:
        if "customer_type" in frame.columns:
            return frame["customer_type"].fillna("unspecified")
        balance = frame.get("balance")
        if balance is None:
            balance = pd.Series(0, index=frame.index)
        elif not isinstance(balance, pd.Series):
            balance = pd.Series(balance, index=frame.index)
        balance = balance.fillna(0)
        exposure = frame.get("credit_limit")
        if exposure is None:
            exposure = balance.clip(lower=1)
        elif not isinstance(exposure, pd.Series):
            exposure = pd.Series(exposure, index=frame.index)
        exposure = exposure.where(exposure.notna(), balance.clip(lower=1))
        ratio = balance / exposure.replace({0: np.nan})
        derived = np.where(balance >= 5_000_000, "enterprise", np.where(balance >= 500_000, "corporate", np.where(balance >= 50_000, "sme", "micro")))
        derived = pd.Series(derived, index=frame.index)
        derived = np.where(ratio >= 0.9, "intensive", derived)
        return pd.Series(derived, index=frame.index)

    def _segmentation(self, frame: pd.DataFrame) -> pd.Series:
        try:
            return pd.qcut(frame["balance"], q=min(6, frame["balance"].nunique()), labels=SEGMENT_LABELS[: min(6, frame["balance"].nunique())], duplicates="drop").astype(str)
        except Exception:
            return pd.Series(["A"] * len(frame), index=frame.index)

    def _delinquency_bucket(self, frame: pd.DataFrame) -> pd.Series:
        if "dpd" in frame.columns:
            dpd_source = frame["dpd"]
        else:
            dpd_source = frame.get("days_past_due", pd.Series(0, index=frame.index))
        if not isinstance(dpd_source, pd.Series):
            dpd_source = pd.Series(dpd_source, index=frame.index)
        dpd_series = pd.to_numeric(dpd_source, errors="coerce").fillna(0)
        return pd.cut(dpd_series, bins=DELINQUENCY_BUCKETS, labels=DELINQUENCY_LABELS, right=True)

    def transform(self, frame: pd.DataFrame) -> FeatureArtifacts:
        if frame.empty:
            return FeatureArtifacts(frame.copy(), pd.DataFrame(columns=["customer_id", "rule", "severity", "details"]))
        features = frame.copy()
        features["date"] = pd.to_datetime(features["date"], errors="coerce", utc=True)
        features["customer_type"] = self._derive_customer_type(features)
        features["segment_code"] = self._segmentation(features)
        features["delinquency_bucket"] = self._delinquency_bucket(features).astype(str)
        features["dpd"] = pd.to_numeric(features.get("dpd", features.get("days_past_due", 0)), errors="coerce").fillna(0).astype(int)
        balance_clip = features["balance"].clip(lower=1)
        credit_limit = features.get("credit_limit")
        if credit_limit is None:
            credit_limit_series = balance_clip
        elif isinstance(credit_limit, pd.Series):
            credit_limit_series = pd.to_numeric(credit_limit, errors="coerce").where(lambda s: s.notna() & (s != 0), balance_clip)
        else:
            credit_limit_series = pd.Series(credit_limit, index=features.index)
            credit_limit_series = pd.to_numeric(credit_limit_series, errors="coerce").where(lambda s: s.notna() & (s != 0), balance_clip)
        features["utilization_ratio"] = (features["balance"] / credit_limit_series).replace([np.inf, -np.inf], np.nan)
        features["utilization_ratio"] = features["utilization_ratio"].clip(upper=5).fillna(0)
        apr_source = features.get("apr") if "apr" in features.columns else features.get("nominal_rate", np.nan)
        if not isinstance(apr_source, pd.Series):
            apr_source = pd.Series(apr_source, index=features.index)
        apr_series = pd.to_numeric(apr_source, errors="coerce")
        features["apr"] = apr_series.fillna(apr_series.median(skipna=True)).fillna(0)
        features["weighted_apr"] = features.groupby("customer_id")["balance"].transform(lambda x: x / x.sum()).fillna(0) * features["apr"]
        features["balance_zscore"] = (features["balance"] - features["balance"].mean()) / features["balance"].std(ddof=0)
        features["balance_zscore"] = features["balance_zscore"].fillna(0).clip(-3, 3)
        features["industry"] = features.get("industry", pd.Series("unspecified", index=features.index)).fillna("unspecified")
        features["kam_owner"] = features.get("kam_owner", pd.Series("unassigned", index=features.index)).fillna("unassigned")
        features["b2g_flag"] = features["industry"].str.lower().str.contains("government|public").fillna(False).astype(int)
        features["days_since_origination"] = (self.reference_date - pd.to_datetime(features.get("origination_date", features["date"]), utc=True)).dt.days
        features["days_since_origination"] = features["days_since_origination"].clip(lower=0).fillna(0).astype(int)
        features["roll_rate_key"] = features["customer_id"].astype(str) + "_" + features["product_code"].astype(str)
        features = features.sort_values(["roll_rate_key", "date"])
        features["prev_dpd"] = features.groupby("roll_rate_key")["dpd"].shift(1).fillna(0)
        features["roll_rate_delta"] = features["dpd"] - features["prev_dpd"]
        features["roll_rate_direction"] = np.select([features["roll_rate_delta"] > 0, features["roll_rate_delta"] < 0], ["deteriorating", "improving"], default="stable")
        features["alert_usury_micro"] = ((features["customer_type"] == "micro") & (features["apr"] > 0.85)).astype(int)
        features["alert_high_utilization"] = (features["utilization_ratio"] > 0.95).astype(int)
        features["alert_high_dpd"] = (features["dpd"] >= 90).astype(int)
        features["alert_pdf_gap"] = 0
        alerts_records: List[Dict[str, Any]] = []
        alert_columns = {"alert_usury_micro": "critical", "alert_high_utilization": "high", "alert_high_dpd": "critical", "alert_pdf_gap": "medium"}
        for alert_col, severity in alert_columns.items():
            flagged = features[features[alert_col] == 1]
            for _, row in flagged.iterrows():
                alerts_records.append(dict(customer_id=row.get("customer_id"), rule=alert_col, severity=severity, details=f"DPD={row.get('dpd')}|Util={row.get('utilization_ratio'):.2f}"))
        alerts = pd.DataFrame(alerts_records) if alerts_records else pd.DataFrame(columns=["customer_id", "rule", "severity", "details"])
        return FeatureArtifacts(features=features.reset_index(drop=True), alerts=alerts)

feature_engineer = FeatureEngineer()
feature_artifacts = feature_engineer.transform(master_frame)
feature_frame = feature_artifacts.features
alerts_frame = feature_artifacts.alerts

Created sample master_frame with 3 records


  return np.nanmean(a, axis, out=out, keepdims=keepdims)


In [9]:
# KPI Calculation Engine
import numpy as np
import pandas as pd
from typing import Any, Dict
feature_frame = globals().get("feature_frame", pd.DataFrame())
alerts_frame = globals().get("alerts_frame", pd.DataFrame(columns=["customer_id", "rule", "severity", "details"]))

class KPIEngine:
    def __init__(self, frame: pd.DataFrame) -> None:
        self.frame = frame.copy()
        if not self.frame.empty:
            self.frame["date"] = pd.to_datetime(self.frame["date"], utc=True)
            self.frame["month"] = self.frame["date"].dt.to_period("M").dt.to_timestamp()

    def _ratio(self, numerator: pd.Series, denominator: pd.Series) -> float:
        denom = denominator.sum()
        if denom == 0:
            return float("nan")
        return numerator.sum() / denom

    def compute(self) -> Dict[str, Any]:
        if self.frame.empty:
            return {}

        result: Dict[str, Any] = {}
        current_frame = self.frame.copy()

        result["aum"] = current_frame["balance"].sum()
        result["active_clients"] = current_frame["customer_id"].nunique()
        result["credit_lines"] = current_frame.get("credit_limit", pd.Series(0, index=current_frame.index)).sum()

        churn_mask = current_frame.get("status", pd.Series("active", index=current_frame.index)).str.lower().eq("churned")
        result["churn_rate"] = churn_mask.mean()

        default_mask = current_frame.get("default_flag", pd.Series(0, index=current_frame.index)).astype(int)
        result["default_rate"] = default_mask.mean()

        dpd_group = current_frame.groupby("delinquency_bucket")["balance"].sum().rename("aum")
        result["dpd_buckets"] = dpd_group

        result["rotation"] = self._ratio(
            current_frame.get("payments", pd.Series(0, index=current_frame.index)),
            current_frame.get("balance", pd.Series(0, index=current_frame.index)),
        )

        result["weighted_apr"] = current_frame["weighted_apr"].mean()

        result["revenue"] = current_frame.get("interest_income", pd.Series(0, index=current_frame.index)).sum()
        result["ebitda"] = current_frame.get("ebitda", pd.Series(0, index=current_frame.index)).sum()

        result["concentration_top10"] = (
            current_frame.groupby("customer_id")["balance"].sum().nlargest(10).sum() / result["aum"]
            if result["aum"]
            else float("nan")
        )

        ltv = current_frame.get("ltv", pd.Series(0, index=current_frame.index))
        cac = current_frame.get("cac", pd.Series(np.nan, index=current_frame.index))
        current_frame["ltv_cac_ratio"] = np.where(cac.fillna(0) == 0, np.nan, ltv / cac)

        channel_col = "channel" if "channel" in current_frame.columns else "source_name"
        result["ltv_cac_by_segment"] = current_frame.groupby(["segment_code", channel_col]).ltv_cac_ratio.mean()

        result["nrr"] = self._ratio(
            current_frame.get("recurring_revenue", pd.Series(0, index=current_frame.index)),
            current_frame.get("starting_revenue", pd.Series(1, index=current_frame.index)),
        )

        result["nsm"] = current_frame.get("north_star_metric", pd.Series(0, index=current_frame.index)).mean()

        result["penetration"] = self._ratio(
            current_frame.get("active_products", pd.Series(0, index=current_frame.index)),
            current_frame.get("available_products", pd.Series(1, index=current_frame.index)),
        )

        result["b2g_percent"] = current_frame["b2g_flag"].mean()

        status_column = current_frame.get("status", pd.Series("active", index=current_frame.index)).str.lower()
        result["new_recurrent_recovered"] = status_column.value_counts(dropna=False)

        group_cols = ["industry", "kam_owner", "segment_code", "customer_type"]
        aggregation = current_frame.groupby(group_cols)["balance"].sum().rename("aum")
        result["aum_by_group"] = aggregation

        behavior_mask = (current_frame["customer_type"] == "micro") & (current_frame["apr"] > 0.85)
        result["usury_micro_share"] = behavior_mask.mean()

        result["pod"] = current_frame.get("probability_of_default", pd.Series(np.nan, index=current_frame.index)).mean()

        if not alerts_frame.empty:
            result["alerts_active"] = alerts_frame.groupby("severity").size()

        return result

kpi_engine = KPIEngine(feature_frame)
kpi_summary = kpi_engine.compute()

  self.frame["month"] = self.frame["date"].dt.to_period("M").dt.to_timestamp()


KeyError: 'source_name'

In [None]:
# Growth Analysis & Projections
import numpy as np
import pandas as pd
from typing import Any, Dict
feature_frame = globals().get("feature_frame", pd.DataFrame())
kpi_summary = globals().get("kpi_summary", {})

DEFAULT_TARGETS = dict(
    aum=1_200_000_000,
    revenue=95_000_000,
    ebitda=42_000_000,
    b2g_percent=0.35,
    churn_rate=0.03,
)

def build_monthly_path(frame: pd.DataFrame, metric: str) -> pd.DataFrame:
    if frame.empty or metric not in frame.columns:
        return pd.DataFrame()
    monthly = frame.groupby("month")[metric].sum().reset_index()
    monthly["monthly_growth"] = monthly[metric].pct_change().fillna(0)
    monthly["ramp_type"] = np.select(
        [monthly["monthly_growth"] > 0.1, monthly["monthly_growth"] < -0.05],
        ["ramp", "trough"],
        default="steady",
    )
    return monthly

def growth_gap_analysis(kpis: Dict[str, Any], targets: Dict[str, float]) -> pd.DataFrame:
    if not kpis:
        return pd.DataFrame()
    records = []
    for metric, target in targets.items():
        actual = kpis.get(metric, np.nan)
        gap = target - actual if not pd.isna(actual) else np.nan
        progress = actual / target if target else np.nan
        records.append(dict(metric=metric, actual=actual, target=target, gap=gap, progress=progress))
    df = pd.DataFrame(records)
    df["status"] = np.select(
        [df["progress"] >= 1, df["progress"] >= 0.8],
        ["achieved", "on_track"],
        default="gap",
    )
    return df

def acquisition_vs_churn(frame: pd.DataFrame) -> pd.DataFrame:
    if frame.empty:
        return pd.DataFrame()
    status_series = frame.get("status", pd.Series("active", index=frame.index)).str.lower()
    summary = status_series.value_counts().rename_axis("status").reset_index(name="count")
    summary["group"] = np.where(
        summary["status"].str.contains("new"),
        "acquisition",
        np.where(summary["status"].str.contains("churn"), "churn", "retained"),
    )
    return summary

growth_targets = DEFAULT_TARGETS
growth_gap_table = growth_gap_analysis(kpi_summary, growth_targets)
growth_path_balance = build_monthly_path(feature_frame, "balance") if not feature_frame.empty else pd.DataFrame()
acquisition_table = acquisition_vs_churn(feature_frame)

implication_multiplier = 8.9
if not growth_gap_table.empty:
    growth_gap_table["implied_scale"] = growth_gap_table["target"] * implication_multiplier

b2g_gap = (
    growth_targets.get("b2g_percent", np.nan) - kpi_summary.get("b2g_percent", np.nan)
) if kpi_summary else np.nan

In [None]:
# Marketing & Sales Analysis
import pandas as pd
from typing import Dict
feature_frame = globals().get("feature_frame", pd.DataFrame())

def marketing_sales_breakdown(frame: pd.DataFrame) -> Dict[str, pd.DataFrame]:
    if frame.empty:
        return {}

    aggregations: Dict[str, pd.DataFrame] = {}
    group_fields = {
        "industry": ["industry"],
        "kam": ["kam_owner"],
        "channel": ["channel"] if "channel" in frame.columns else ["source_name"],
    }

    for label, fields in group_fields.items():
        grouped = frame.groupby(fields).agg(
            aum=("balance", "sum"),
            clients=("customer_id", "nunique"),
            weighted_apr=("weighted_apr", "mean"),
            ltv_cac=("ltv_cac_ratio", "mean"),
        ).reset_index()
        aggregations[label] = grouped

    return aggregations

marketing_sales_tables = marketing_sales_breakdown(feature_frame)
treemap_ready = marketing_sales_tables.get("industry") if marketing_sales_tables else pd.DataFrame()

In [None]:
# Risk Analysis & Roll Rate
import numpy as np
import pandas as pd
from typing import Any, Dict
feature_frame = globals().get("feature_frame", pd.DataFrame())

def delinquency_summary(frame: pd.DataFrame) -> pd.DataFrame:
    if frame.empty:
        return pd.DataFrame()
    summary = frame.groupby(["delinquency_bucket", "industry", "segment_code"]).agg(
        balance=("balance", "sum"),
        clients=("customer_id", "nunique"),
        pod_mean=("probability_of_default", "mean"),
    ).reset_index()
    total_balance = summary["balance"].sum()
    summary["delinquency_percent"] = summary["balance"] / total_balance if total_balance else np.nan
    return summary

def roll_rate_matrix(frame: pd.DataFrame) -> pd.DataFrame:
    if frame.empty:
        return pd.DataFrame()
    roll_data = frame.copy()
    roll_data["next_bucket"] = roll_data.groupby("roll_rate_key")["delinquency_bucket"].shift(-1)
    matrix = pd.crosstab(
        roll_data["delinquency_bucket"],
        roll_data["next_bucket"],
        normalize="index",
    ).fillna(0)
    return matrix

def npl_metrics(frame: pd.DataFrame) -> Dict[str, Any]:
    if frame.empty:
        return {}
    balance_total = frame["balance"].sum()
    npl = frame[frame["dpd"] >= 180]
    npl_balance = npl["balance"].sum()
    result = dict(
        npl180_balance=npl_balance,
        npl_ratio=(npl_balance / balance_total) if balance_total else np.nan,
    )
    if "ltv" in frame.columns:
        ltv_45 = frame[frame["dpd"] > 45]["ltv"].mean()
        ltv_90 = frame[frame["dpd"] > 90]["ltv"].mean()
        result["ltv_delta_45_90"] = ltv_45 - ltv_90 if not pd.isna(ltv_45) and not pd.isna(ltv_90) else np.nan
    else:
        result["ltv_delta_45_90"] = np.nan
    result["rotation_multiple"] = (
        frame.get("payments", pd.Series(0, index=frame.index)).sum() / balance_total
        if balance_total
        else np.nan
    )
    return result

risk_table = delinquency_summary(feature_frame)
roll_matrix = roll_rate_matrix(feature_frame)
risk_metrics = npl_metrics(feature_frame)
equifax_vs_dpd = (
    feature_frame[["equifax_score", "dpd"]].dropna()
    if "equifax_score" in feature_frame.columns
    else pd.DataFrame()
)

In [None]:
# Data Quality Audit
import numpy as np
import pandas as pd
from pandas.io.formats.style import Styler
from typing import Any, Dict, List, Optional, cast
try:
    import pdfplumber as _pdfplumber
except ImportError:
    _pdfplumber = None
pdfplumber = cast(Optional[Any], _pdfplumber)
feature_frame = globals().get("feature_frame", pd.DataFrame())

CRITICAL_COLUMNS = {"customer_id", "date", "balance", "dpd"}

def data_quality_audit(frame: pd.DataFrame) -> Dict[str, Any]:
    if frame.empty:
        return {"score": np.nan, "table": pd.DataFrame(), "styled": None, "pdf_completeness": 0.0}

    total_rows = len(frame)
    audit_records: List[Dict[str, Any]] = []
    penalties = 0.0

    for column in frame.columns:
        nulls = frame[column].isna().sum()
        zeros = (frame[column] == 0).sum() if pd.api.types.is_numeric_dtype(frame[column]) else np.nan
        coverage = 1 - (nulls / total_rows) if total_rows else np.nan
        if column in CRITICAL_COLUMNS and coverage < 0.9:
            penalties += 0.1
        audit_records.append(
            dict(column=column, nulls=int(nulls), zeros=int(zeros) if not pd.isna(zeros) else np.nan, coverage=coverage)
        )

    audit_table = pd.DataFrame(audit_records)
    coverage_mean = audit_table["coverage"].mean()
    quality_score = max(0.0, min(1.0, (coverage_mean if not pd.isna(coverage_mean) else 0.0) - penalties))

    def _color(value: float) -> str:
        if pd.isna(value):
            return "color: #E6E6EF; background-color: #3730A3"
        if value >= 0.95:
            return "color: #05101a; background-color: #22E7CC"
        if value >= 0.85:
            return "color: #F5F3FF; background-color: #2563EB"
        return "color: #F5F3FF; background-color: #B91C1C"

    styler = audit_table.style.format({"coverage": "{:.2%}"})
    apply_map = getattr(styler, "applymap", None)
    styled = apply_map(_color, subset=["coverage"]) if callable(apply_map) else styler

    pdf_completeness = 1.0 if pdfplumber else 0.0

    return dict(score=quality_score, table=audit_table, styled=styled, pdf_completeness=pdf_completeness)
quality_artifacts = data_quality_audit(feature_frame)
quality_score = quality_artifacts.get("score")
quality_table = quality_artifacts.get("table")
quality_styled = quality_artifacts.get("styled")

print(f"Data Quality Score: {quality_score:.2%}" if not pd.isna(quality_score) else "Data Quality Score: N/A")
print(f"PDF Completeness: {quality_artifacts.get('pdf_completeness', 0.0):.1%}")

if quality_styled is not None:
    display(quality_styled)
else:
    print("No data quality table to display - feature_frame is empty")

In [None]:
# AI Summary & Insights
%pip install google-generativeai

import logging
import os
import textwrap
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd

logger = logging.getLogger("abaco_ingestion")
if not logger.handlers:
    logger.addHandler(logging.NullHandler())

try:
    from google.generativeai.generative_models import GenerativeModel
    from google.generativeai.client import configure
except ImportError:
    GenerativeModel = None
    configure = None

kpi_summary = globals().get("kpi_summary", {})
risk_metrics = globals().get("risk_metrics", {})
growth_gap_table = globals().get("growth_gap_table", pd.DataFrame())
alerts_frame = globals().get("alerts_frame", pd.DataFrame(columns=["customer_id", "rule", "severity", "details"]))

def generate_ai_summary(
    kpis: Dict[str, Any],
    risk: Dict[str, Any],
    growth: pd.DataFrame,
    alerts: pd.DataFrame,
    market: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    gemini_key = os.getenv("GEMINI_API_KEY")
    prompt_lines: List[str] = []
    prompt_lines.append("ABACO Financial Intelligence Executive Summary")

    if kpis:
        prompt_lines.append(f"AUM: {kpis.get('aum', np.nan):,.0f}")
        prompt_lines.append(f"Default Rate: {kpis.get('default_rate', np.nan):.2%}")
        prompt_lines.append(f"B2G Share: {kpis.get('b2g_percent', np.nan):.2%}")

    if risk:
        prompt_lines.append(f"NPL 180+: {risk.get('npl180_balance', np.nan):,.0f}")

    if market:
        prompt_lines.append(f"MYPE GDP Share: {market.get('gdp_share', '48.8%')}")

    prompt_lines.append(f"Alerts Count: {len(alerts) if alerts is not None else 0}")
    prompt = "\n".join(prompt_lines)

    summary_text: Optional[str] = None

    if gemini_key and GenerativeModel is not None and configure is not None:
        try:
            configure(api_key=gemini_key)
            model = GenerativeModel("gemini-1.5-flash")
            response = model.generate_content(prompt)
            summary_text = getattr(response, "text", None)
        except Exception as exc:
            logger.warning("Gemini summary failed: %s", exc)

    if not summary_text:
        themes: List[str] = []
        if kpis:
            themes.append(
                f"Assets under management currently {kpis.get('aum', np.nan):,.0f} with weighted APR {kpis.get('weighted_apr', np.nan):.2%}."
            )
            themes.append(
                f"Default rate holding at {kpis.get('default_rate', np.nan):.2%} while churn {kpis.get('churn_rate', np.nan):.2%}."
            )
        if risk:
            themes.append(
                f"NPL ratio stands at {risk.get('npl_ratio', np.nan):.2%} with rotation multiple {risk.get('rotation_multiple', np.nan):.2f}."
            )
        if growth is not None and not growth.empty:
            gap_row = growth.sort_values("gap", ascending=False).head(1).to_dict(orient="records")
            if gap_row:
                themes.append(
                    f"Largest growth delta resides in {gap_row[0]['metric']} with gap {gap_row[0]['gap']:,.0f}."
                )
        if market:
            themes.append(
                f"Market TAM updated to {market.get('tam_update', '31,666')} with penetration insights incorporated."
            )
        summary_text = " ".join(textwrap.fill(theme, width=120) for theme in themes)

    return dict(prompt=prompt, summary=summary_text)

ai_summary = generate_ai_summary(kpi_summary, risk_metrics, growth_gap_table, alerts_frame, market=None)
summary_text = ai_summary.get("summary")
prompt_text = ai_summary.get("prompt")

print("AI Summary:")
print("=" * 50)
print(summary_text)
print("\nPrompt Used:")
print("-" * 30)
print(prompt_text)

In [None]:
# Visualizations & Exports
import json
import pathlib
from typing import Dict, Optional

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go

feature_frame = globals().get("feature_frame", pd.DataFrame())
growth_path_balance = globals().get("growth_path_balance", pd.DataFrame())
roll_matrix = globals().get("roll_matrix", pd.DataFrame())
treemap_ready = globals().get("treemap_ready", pd.DataFrame())

EXPORT_DIR = pathlib.Path("artifacts/exports")
EXPORT_DIR.mkdir(parents=True, exist_ok=True)

def build_figures(
    frame: pd.DataFrame,
    growth: pd.DataFrame,
    risk: pd.DataFrame,
    treemap_source: pd.DataFrame,
) -> Dict[str, go.Figure]:
    figures: Dict[str, go.Figure] = {}

    if not frame.empty:
        line_df = frame.groupby("date")["balance"].sum().reset_index()
        figures["growth_line"] = px.line(line_df, x="date", y="balance", title="Growth Trajectory (4K Ready)")

        churn_df = frame.groupby("delinquency_bucket")["customer_id"].nunique().reset_index(name="clients")
        figures["churn_bar"] = px.bar(churn_df, x="delinquency_bucket", y="clients", title="Churn & Delinquency Bar")

        pie_df = frame.groupby("segment_code")["balance"].sum().reset_index()
        figures["segment_pie"] = px.pie(pie_df, names="segment_code", values="balance", title="Segment Penetration Pie")

        if "equifax_score" in frame.columns:
            figures["risk_scatter"] = px.scatter(
                frame,
                x="equifax_score",
                y="dpd",
                color="delinquency_bucket",
                title="Equifax vs DPD Scatter",
            )

    if not treemap_source.empty:
        figures["industry_treemap"] = px.treemap(
            treemap_source,
            path=[treemap_source.columns[0]],
            values="aum",
            color="weighted_apr",
            title="Industry TPV Treemap",
        )

    if isinstance(risk, pd.DataFrame) and not risk.empty:
        figures["roll_rate_heatmap"] = go.Figure(
            data=go.Heatmap(z=risk.values, x=risk.columns, y=risk.index, colorscale="Viridis")
        )
        figures["roll_rate_heatmap"].update_layout(title="Roll Rate Transition Matrix")

    return figures

def export_figures(figures: Dict[str, go.Figure], directory: pathlib.Path, scale: int = 1) -> None:
    for name, fig in figures.items():
        fig.update_layout(template="abaco_dark_ultra", width=3840, height=2160)
        export_path = directory / f"{name}.png"
        try:
            fig.write_image(export_path, width=3840, height=2160, scale=scale)
        except ValueError:
            print(f"Static export skipped for {name} (kaleido missing)")

def export_fact_table(frame: pd.DataFrame, directory: pathlib.Path) -> pathlib.Path:
    if frame.empty:
        raise ValueError("Feature frame is empty; nothing to export")
    csv_path = directory / "abaco_fact_table.csv"
    frame.to_csv(csv_path, index=False)
    return csv_path

figures = build_figures(feature_frame, growth_path_balance, roll_matrix, treemap_ready)
export_figures(figures, EXPORT_DIR)

fact_table_path: Optional[pathlib.Path] = None
if not feature_frame.empty:
    fact_table_path = export_fact_table(feature_frame, EXPORT_DIR)
    print(f"Fact table exported to {fact_table_path}")

html_path = EXPORT_DIR / "abaco_fact_table.html"
if not feature_frame.empty:
    feature_frame.sample(min(1000, len(feature_frame))).to_html(html_path, index=False)
    print(f"HTML table exported to {html_path}")

looker_placeholder = EXPORT_DIR / "looker_ready_manifest.json"
with open(looker_placeholder, "w", encoding="utf-8") as handle:
    json.dump(
        dict(
            dataset="abaco_financial_intelligence",
            exported=str(fact_table_path) if fact_table_path else "pending",
        ),
        handle,
        indent=2,
    )

print(f"Looker placeholder created at {looker_placeholder}")
print("Slack and HubSpot export hooks pending integration.")

In [None]:
# Market Analysis from MYPE 2025 PDF
import json
import pathlib
from typing import Any, Dict, List, Optional, cast

import pandas as pd

pdfplumber = cast(Optional[Any], globals().get("pdfplumber"))
generate_ai_summary = globals().get("generate_ai_summary", lambda *args, **kwargs: {"prompt": "", "summary": None})
kpi_summary = globals().get("kpi_summary", {})
risk_metrics = globals().get("risk_metrics", {})
growth_gap_table = globals().get("growth_gap_table", pd.DataFrame())
alerts_frame = globals().get("alerts_frame", pd.DataFrame(columns=["customer_id", "rule", "severity", "details"]))

MYPE_PDF_PATH = pathlib.Path("data/mype_report_2025.pdf")

def extract_market_insights(pdf_path: pathlib.Path) -> Dict[str, Any]:
    stats: Dict[str, Any] = dict(
        gdp_share="48.8%",
        tam_update="31,666",
        challenges=["Limited formal credit access", "Fragmented guarantee schemes"],
        opportunities=["Digital onboarding expansion", "Supply chain financing partnerships"],
        behavior_links="Micro-segment displays high cash-cycle volatility",
    )

    if pdfplumber is None:
        stats["source"] = "fallback"
        return stats
    insights_text: List[str] = []
    with pdfplumber.open(pdf_path) as pdf:
        for page_number, page in enumerate(pdf.pages, start=1):
            if page_number in {1, 2, 35}:
                text = page.extract_text() or ""
                insights_text.append(text)

    combined = " \n".join(insights_text)
    if combined:
        stats["source"] = "pdf"
        stats["raw_text_excerpt"] = combined[:2000]
    else:
        stats["source"] = "fallback"

    return stats

market_insights = extract_market_insights(MYPE_PDF_PATH)
if market_insights:
    print(json.dumps(market_insights, indent=2))

ai_summary = generate_ai_summary(kpi_summary, risk_metrics, growth_gap_table, alerts_frame, market=market_insights)
print("Updated AI summary with market context:", ai_summary.get("summary"))