# Practical Workflows with Moniker

Four business workflows using real data from the Moniker catalog:

1. **Fixed Income Relative Value** -- yield curve construction, swap spreads, sovereign cross-market analysis
2. **Credit Risk Dashboard** -- exposure aggregation, limit monitoring, breach detection
3. **MBS Pool Analysis** -- agency comparison, prepayment trends, outlier detection
4. **Discovery & Governance** -- catalog exploration, ownership audit, data quality gates

In [None]:
import sys, os
import pandas as pd

# moniker_client is pip-installed (pip install -e ~/open-moniker-client)
# We only need path setup for the mock data adapters used in demo mode
for p in [
    os.path.expanduser("~/open-moniker-svc/src"),
    os.path.expanduser("~/open-moniker-svc/external/moniker-data/src"),
]:
    if p not in sys.path:
        sys.path.insert(0, p)

from moniker_client import (
    Moniker, MonikerClient, ClientConfig,
    CatalogReflector,
)

from moniker_data.adapters.oracle import MockOracleAdapter
from moniker_data.adapters.snowflake import MockSnowflakeAdapter
from moniker_data.adapters.rest import MockRestAdapter
from moniker_data.adapters.excel import MockExcelAdapter
from moniker_data.adapters.mssql import MockMssqlAdapter

MockOracleAdapter()
MockSnowflakeAdapter()
MockRestAdapter()
MockExcelAdapter()
MockMssqlAdapter()

client = MonikerClient(config=ClientConfig(service_url="http://localhost:8050"))
reflector = CatalogReflector(client=client)

pd.options.display.float_format = "{:,.2f}".format
print("Client ready:", client.config.service_url)

---
## Workflow 1: Fixed Income Relative Value

Compare Treasury yields, sovereign spreads, and swap rates across the curve.

In [None]:
# Fetch fixed income data from Snowflake
ust_result = client.fetch("fixed_income.govies/treasury/ALL/ALL", limit=10000)
sov_result = client.fetch("fixed_income.govies/sovereign/ALL/ALL", limit=10000)
swap_result = client.fetch("rates.swap/USD/ALL", limit=10000)

ust = pd.DataFrame(ust_result.data)
sov = pd.DataFrame(sov_result.data)
swaps = pd.DataFrame(swap_result.data)

print(f"UST:       {ust.shape[0]} rows x {ust.shape[1]} cols")
print(f"Sovereign: {sov.shape[0]} rows x {sov.shape[1]} cols")
print(f"Swaps:     {swaps.shape[0]} rows x {swaps.shape[1]} cols")

# Filter to latest date
ust_latest = ust[ust["ASOF_DATE"] == ust["ASOF_DATE"].max()].copy()
sov_latest = sov[sov["ASOF_DATE"] == sov["ASOF_DATE"].max()].copy()
swap_latest = swaps[swaps["ASOF_DATE"] == swaps["ASOF_DATE"].max()].copy()

print(f"\nLatest dates: UST={ust['ASOF_DATE'].max()}, Sovereign={sov['ASOF_DATE'].max()}, Swaps={swaps['ASOF_DATE'].max()}")

# Build UST yield curve
curve = ust_latest[["TENOR", "YIELD", "DURATION", "CONVEXITY"]].sort_values("TENOR").reset_index(drop=True)
print(f"\nUST Yield Curve ({len(curve)} tenors)")
print(curve.to_string(index=False))

# Join swap rates and calculate swap spread
swap_tenors = swap_latest[["TENOR", "PAR_RATE"]].rename(columns={"PAR_RATE": "SWAP_RATE"})
combined = curve.merge(swap_tenors, on="TENOR", how="left")
combined["SWAP_SPREAD_BPS"] = ((combined["SWAP_RATE"] - combined["YIELD"]) * 100).round(1)

print(f"\nCombined Curve with Swap Spreads")
print(combined.to_string(index=False))

In [None]:
# --- Sovereign spread matrix: COUNTRY x TENOR showing SPREAD_VS_USD ---
print("Sovereign Spread vs UST (bps) by Country and Tenor")
spread_matrix = sov_latest.pivot_table(
    values="SPREAD_VS_USD", index="COUNTRY", columns="TENOR", aggfunc="mean"
)
print(spread_matrix.round(1).to_string())

# --- Duration/Convexity ladder across UST curve ---
print("\n\nUST Duration & Convexity Ladder")
ladder = ust_latest[["TENOR", "YIELD", "DURATION", "CONVEXITY"]].sort_values("TENOR").reset_index(drop=True)
ladder["DUR_x_CONV"] = (ladder["DURATION"] * ladder["CONVEXITY"].abs()).round(3)
print(ladder.to_string(index=False))

# --- 30-day yield change by tenor ---
print("\n\n30-Day Yield Change by Tenor (bps)")
first_date = ust["ASOF_DATE"].min()
last_date = ust["ASOF_DATE"].max()

ust_first = ust[ust["ASOF_DATE"] == first_date][["TENOR", "YIELD"]].rename(columns={"YIELD": "YIELD_START"})
ust_last = ust[ust["ASOF_DATE"] == last_date][["TENOR", "YIELD"]].rename(columns={"YIELD": "YIELD_END"})

yield_chg = ust_first.merge(ust_last, on="TENOR").sort_values("TENOR").reset_index(drop=True)
yield_chg["CHANGE_BPS"] = ((yield_chg["YIELD_END"] - yield_chg["YIELD_START"]) * 100).round(1)
print(f"Period: {first_date} to {last_date}")
print(yield_chg.to_string(index=False))

---
## Workflow 2: Credit Risk Dashboard

Aggregate counterparty exposures, join against approved limits, flag breaches.

In [2]:
# Fetch credit data from MS-SQL
exp_result = client.fetch("credit.exposures", limit=10000)
lim_result = client.fetch("credit.limits", limit=10000)

exp = pd.DataFrame(exp_result.data)
lim = pd.DataFrame(lim_result.data)
print(f"Exposures: {len(exp)} rows x {len(exp.columns)} cols")
print(f"Limits:    {len(lim)} rows x {len(lim.columns)} cols")

# Latest date snapshot
latest_date = exp["ASOF_DATE"].max()
latest = exp[exp["ASOF_DATE"] == latest_date].copy()
print(f"\nSnapshot date: {latest_date} ({len(latest)} rows)")

# Aggregate by counterparty
by_cp = (
    latest
    .groupby(["COUNTERPARTY_ID", "COUNTERPARTY_NAME", "SECTOR", "RATING"])
    .agg(NOTIONAL=("NOTIONAL", "sum"), CVA=("CVA", "sum"), EXPECTED_LOSS=("EXPECTED_LOSS", "sum"))
    .reset_index()
)

# Join with SingleName limits
sn_limits = lim[lim["LIMIT_TYPE"] == "SingleName"][["COUNTERPARTY_ID", "LIMIT_AMOUNT"]].copy()
dashboard = by_cp.merge(sn_limits, on="COUNTERPARTY_ID", how="left")

# Utilization and breach detection
dashboard["UTILIZATION_PCT"] = (dashboard["NOTIONAL"] / dashboard["LIMIT_AMOUNT"] * 100).round(1)
dashboard["BREACH"] = dashboard["UTILIZATION_PCT"] > 100
dashboard = dashboard.sort_values("UTILIZATION_PCT", ascending=False)

print(f"\n{'CP':<7} {'Name':<22} {'Sector':<18} {'Notional':>16} {'Limit':>16} {'Util%':>7} {'Breach'}")
print("-" * 108)
for _, r in dashboard.iterrows():
    flag = "*** YES" if r["BREACH"] else ""
    print(f"{r['COUNTERPARTY_ID']:<7} {r['COUNTERPARTY_NAME']:<22} {r['SECTOR']:<18} "
          f"{r['NOTIONAL']:>16,.0f} {r['LIMIT_AMOUNT']:>16,.0f} {r['UTILIZATION_PCT']:>6.1f}% {flag}")

breaches = dashboard["BREACH"].sum()
print(f"\nBreaches: {breaches} of {len(dashboard)} counterparties")

Exposures: 736 rows x 15 cols
Limits:    32 rows x 7 cols

Snapshot date: 2026-02-11 (32 rows)

CP      Name                   Sector                     Notional            Limit   Util% Breach
------------------------------------------------------------------------------------------------------------
CP001   Goldman Sachs Group    Financials            1,195,930,545      546,198,735  219.0% *** YES
CP005   BHP Group Ltd          Mining                1,513,291,844      764,248,174  198.0% *** YES
CP004   Shell plc              Energy                1,503,301,524      993,470,310  151.3% *** YES
CP007   Samsung Electronics    Technology            1,086,110,703    1,293,877,805   83.9% 
CP003   Toyota Motor Corp      Automotive            1,146,351,628    1,676,219,440   68.4% 
CP002   Deutsche Bank AG       Financials            1,138,581,901    1,768,003,829   64.4% 
CP006   Petrobras SA           Energy                  960,453,110    1,559,681,511   61.6% 
CP008   Nestlé SA       

In [3]:
# --- Sector concentration ---
sector = latest.groupby("SECTOR")["NOTIONAL"].sum().sort_values(ascending=False)
sector_pct = (sector / sector.sum() * 100).round(1)

print("Sector Concentration")
print(f"{'Sector':<20} {'Notional':>16} {'% Total':>8}")
print("-" * 46)
for s, val in sector.items():
    print(f"{s:<20} {val:>16,.0f} {sector_pct[s]:>7.1f}%")

# --- Rating x ExposureType pivot (in millions) ---
print("\n\nRating x Exposure Type ($ millions)")
pivot = (
    latest
    .pivot_table(values="NOTIONAL", index="RATING", columns="EXPOSURE_TYPE",
                 aggfunc="sum", fill_value=0)
    / 1_000_000
)
print(pivot.round(1).to_string())

# --- 30-day exposure time series summary ---
print("\n\n30-Day Exposure Summary by Counterparty")
ts = exp.groupby("COUNTERPARTY_ID")["NOTIONAL"].agg(["min", "max", "mean", "std"])
ts["range_pct"] = ((ts["max"] - ts["min"]) / ts["mean"] * 100).round(1)
print(ts.round(0).to_string())

Sector Concentration
Sector                       Notional  % Total
----------------------------------------------
Energy                  2,463,754,634    26.6%
Financials              2,334,512,446    25.2%
Mining                  1,513,291,844    16.3%
Automotive              1,146,351,628    12.4%
Technology              1,086,110,703    11.7%
Consumer Staples          724,408,633     7.8%


Rating x Exposure Type ($ millions)
EXPOSURE_TYPE  Derivative  Guarantee   Loan  TradeFinance
RATING                                                   
A                  346.90     452.10 240.20        464.10
A+                 782.40     898.20 588.00        391.00
A-                 482.10     450.80  83.90        121.80
AA                  19.20      47.40 302.00        355.70
AA-                519.00     685.50 578.70        498.90
BB-                424.20      90.90   6.50        438.80


30-Day Exposure Summary by Counterparty
                          min            max           mean

---
## Workflow 3: MBS Pool Analysis

Analyze mortgage-backed securities pools across agencies and coupon types.

In [4]:
# Fetch MBS pool data from Excel source
mbs_result = client.fetch("reports/regulatory/2026Q1/summary", limit=10000)
mbs = pd.DataFrame(mbs_result.data)
print(f"MBS pools: {len(mbs)} rows x {len(mbs.columns)} cols")

# Latest snapshot
latest_mbs_date = mbs["ASOF_DATE"].max()
snap = mbs[mbs["ASOF_DATE"] == latest_mbs_date].copy()
print(f"Snapshot date: {latest_mbs_date} ({len(snap)} rows)")

# Agency-level aggregates
agency_agg = snap.groupby("AGENCY").agg(
    POOLS=("POOL_ID", "nunique"),
    TOTAL_BALANCE=("CURRENT_BALANCE", "sum"),
    AVG_CPR_1M=("CPR_1M", "mean"),
    AVG_CPR_3M=("CPR_3M", "mean"),
    AVG_OAS=("OAS", "mean"),
    AVG_DURATION=("DURATION", "mean"),
    AVG_CONVEXITY=("CONVEXITY", "mean"),
)
print(f"\nAgency Summary (as of {latest_mbs_date})")
print(agency_agg.round(2).to_string())

# Coupon comparison: 15Y vs 20Y vs 30Y
coupon_agg = snap.groupby("COUPON").agg(
    POOLS=("POOL_ID", "nunique"),
    AVG_WAC=("WAC", "mean"),
    AVG_DURATION=("DURATION", "mean"),
    AVG_OAS=("OAS", "mean"),
    AVG_CPR_1M=("CPR_1M", "mean"),
    TOTAL_BALANCE=("CURRENT_BALANCE", "sum"),
)
print("\n\nCoupon Comparison (15Y vs 20Y vs 30Y)")
print(coupon_agg.round(3).to_string())

MBS pools: 270 rows x 21 cols
Snapshot date: 2026-01-15 (45 rows)

Agency Summary (as of 2026-01-15)
        POOLS  TOTAL_BALANCE  AVG_CPR_1M  AVG_CPR_3M  AVG_OAS  AVG_DURATION  AVG_CONVEXITY
AGENCY                                                                                    
FHLMC       5     3346428191        9.55       10.65    56.23          5.48          -1.19
FNMA        5     3735916105        9.18        9.78    49.69          5.41          -1.22
GNMA        5     3484443637        9.29       10.07    53.28          5.45          -1.29


Coupon Comparison (15Y vs 20Y vs 30Y)
        POOLS  AVG_WAC  AVG_DURATION  AVG_OAS  AVG_CPR_1M  TOTAL_BALANCE
COUPON                                                                  
15Y        15     0.05          5.19    52.77        8.67     3872000872
20Y        15     0.06          5.53    58.40       10.51     3395142312
30Y        15     0.07          5.62    48.04        8.84     3299644749


In [5]:
# --- Prepayment speed trends over 6-month history by agency ---
print("Prepayment Speed Trends (CPR_1M) by Agency")
trend = (
    mbs.groupby(["ASOF_DATE", "AGENCY"])["CPR_1M"]
    .mean()
    .unstack("AGENCY")
    .sort_index()
)
print(trend.round(2).to_string())

# --- Balance runoff (ORIGINAL vs CURRENT) ---
print("\n\nBalance Runoff (Original vs Current)")
runoff = snap.groupby("AGENCY").agg(
    ORIGINAL=("ORIGINAL_BALANCE", "sum"),
    CURRENT=("CURRENT_BALANCE", "sum"),
)
runoff["RUNOFF_PCT"] = ((1 - runoff["CURRENT"] / runoff["ORIGINAL"]) * 100).round(1)
print(runoff.to_string())

# --- Pools with outlier CPR (> 2 std dev from mean) ---
print("\n\nPools with Outlier CPR (>2 std dev from mean)")
mean_cpr = snap["CPR_1M"].mean()
std_cpr = snap["CPR_1M"].std()
lo, hi = mean_cpr - 2 * std_cpr, mean_cpr + 2 * std_cpr

outliers = snap[(snap["CPR_1M"] > hi) | (snap["CPR_1M"] < lo)].copy()
outliers = outliers[["POOL_ID", "AGENCY", "COUPON", "CPR_1M", "CURRENT_BALANCE"]]
outliers = outliers.sort_values("CPR_1M", ascending=False)

print(f"Mean CPR_1M: {mean_cpr:.1f},  Std: {std_cpr:.1f},  Thresholds: [{lo:.1f}, {hi:.1f}]")
if len(outliers) > 0:
    print(outliers.to_string(index=False))
else:
    print("No outlier pools found.")

Prepayment Speed Trends (CPR_1M) by Agency
AGENCY      FHLMC  FNMA  GNMA
ASOF_DATE                    
2025-08-18   9.76  8.54 10.10
2025-09-17  10.03  8.69  9.39
2025-10-17  10.27  8.87  8.86
2025-11-16   9.93  9.13  9.11
2025-12-16   9.74  8.39  9.26
2026-01-15   9.55  9.18  9.29


Balance Runoff (Original vs Current)
          ORIGINAL     CURRENT  RUNOFF_PCT
AGENCY                                    
FHLMC   4956631613  3346428191       32.50
FNMA    4875716500  3735916105       23.40
GNMA    3903202644  3484443637       10.70


Pools with Outlier CPR (>2 std dev from mean)
Mean CPR_1M: 9.3,  Std: 2.6,  Thresholds: [4.1, 14.6]
No outlier pools found.


---
## Workflow 4: Discovery & Governance

Explore the catalog, inspect before fetching, audit ownership and data quality.

In [6]:
# Step 1: What's in the catalog?
stats = reflector.stats()
print(f"Catalog: {stats.total_monikers} monikers across {len(stats.by_source_type)} source types")
print(f"Sources: {dict(stats.by_source_type)}")

# Step 2: Search for relevant data
print("\n--- Search: 'credit' ---")
results = reflector.search("credit")
for r in results.results:
    print(f"  {r['path']:<30} source={r.get('source_type', '?'):<8} tags={r.get('tags', [])}")

# Step 3: Inspect schema before fetching
print("\n--- Schema: credit.exposures ---")
schema = reflector.schema("credit.exposures")
print(f"Granularity: {schema.granularity}")
print(f"Primary key: {schema.primary_key}")
print(f"Columns ({len(schema.columns)}):")
for col in schema.columns:
    print(f"  {col['name']:<22} {col.get('type',''):<10} {col.get('description', '')}")

print("\nPattern: Discover -> Inspect -> Decide -> Fetch")

Catalog: 27 monikers across 7 source types
Sources: {'snowflake': 3, 'rest': 4, 'oracle': 1, 'static': 1, 'opensearch': 1, 'excel': 1, 'mssql': 2}

--- Search: 'credit' ---
  credit                         source=?        tags=[]
  credit.exposures               source=?        tags=['counterparty', 'timeseries', 'credit', 'mssql', 'risk']
  credit.limits                  source=?        tags=['governance', 'limits', 'credit', 'mssql', 'risk']

--- Schema: credit.exposures ---
Granularity: daily per-counterparty per-exposure-type
Primary key: ['ASOF_DATE', 'COUNTERPARTY_ID', 'EXPOSURE_TYPE']
Columns (15):
  ASOF_DATE              date       Business date of the exposure snapshot
  COUNTERPARTY_ID        string     Unique counterparty identifier
  COUNTERPARTY_NAME      string     Legal entity name
  SECTOR                 string     Industry sector classification
  RATING                 string     Credit rating (S&P scale)
  COUNTRY                string     Country of incorporation (

In [7]:
# --- Ownership audit ---
print("=== Domain Governance: credit ===")
desc = client.describe("credit")
own = desc.get("ownership", {})
print(f"  Owner:   {own.get('accountable_owner', 'n/a')}")
print(f"  Support: {own.get('support_channel', 'n/a')}")

# --- Lineage ---
print("\n=== Lineage: credit.exposures ===")
lineage = client.lineage("credit.exposures")
own_lin = lineage.get("ownership", {})
print(f"  Owner:     {own_lin.get('accountable_owner')} (from {own_lin.get('accountable_owner_defined_at')})")
print(f"  Hierarchy: {' -> '.join(lineage.get('path_hierarchy', []))}")

# --- Data quality gate ---
print("\n=== Data Quality Gate ===")
meta = client.metadata("credit.exposures")
dq = meta.data_quality or {}
print(f"  Quality score: {dq.get('quality_score', 'n/a')}")
print(f"  Known issues:  {dq.get('known_issues', [])}")

# --- Post-fetch validation ---
print("\n=== Post-Fetch Validation ===")
val_result = client.fetch("credit.exposures", limit=10000)
df = pd.DataFrame(val_result.data)

checks = {
    "NOTIONAL > 0": (df["NOTIONAL"] > 0).all(),
    "LGD in [0, 1]": df["LGD"].between(0, 1).all(),
    "PD in [0, 1]": df["PD"].between(0, 1).all(),
    "EL approx NOTIONAL*LGD*PD": (
        (df["EXPECTED_LOSS"] - df["NOTIONAL"] * df["LGD"] * df["PD"]).abs().max() < 1.0
    ),
}

all_pass = True
for check, passed in checks.items():
    status = "PASS" if passed else "FAIL"
    if not passed:
        all_pass = False
    print(f"  [{status}] {check}")

print(f"\nAll validations passed: {all_pass}")

=== Domain Governance: credit ===
  Owner:   credit-risk-governance@firm.com
  Support: #credit-risk-data

=== Lineage: credit.exposures ===
  Owner:     credit-risk-governance@firm.com (from credit)
  Hierarchy:  -> credit.exposures

=== Data Quality Gate ===
  Quality score: 92
  Known issues:  ['Weekend gaps in timeseries — no weekend data points', 'Petrobras (CP006) rating may lag by 1 business day']

=== Post-Fetch Validation ===


  [PASS] NOTIONAL > 0
  [PASS] LGD in [0, 1]
  [PASS] PD in [0, 1]
  [PASS] EL approx NOTIONAL*LGD*PD

All validations passed: True


---
## Summary

| Workflow | Source | Rows | Business Question |
|---|---|---|---|
| FI Relative Value | `fixed_income.govies/treasury` + `rates.swap` + `fixed_income.govies/sovereign` (Snowflake) | ~1,400 | Where are swap spreads wide and how do sovereign curves compare to UST? |
| Credit Risk Dashboard | `credit.exposures` + `credit.limits` (MS-SQL) | ~960 + 32 | Which counterparties breach their single-name limits? |
| MBS Pool Analysis | `reports/regulatory/2026Q1/summary` (Excel) | ~270 | How do prepayment speeds and balances vary across agencies and coupons? |
| Discovery & Governance | Catalog API | -- | How do I find, inspect, and validate data before using it? |