In [None]:
@'
{
 "cells": [
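  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Reconciliation MVP (ERP vs Billing vs DWH)\n",
    "\n",
    "Goal: reconcile revenue across three simulated systems (ERP, Billing, DWH) at contract-month granularity and classify each variance with simple rule-based labels:\n",
    "- Missing Invoice (Billing): ERP recognized revenue but billing is ~0\n",
    "- DWH Duplicate / Overcount, DWH Undercount / Missing Events: DWH disagrees materially with billing\n",
    "- FX Mismatch (ERP vs Billing): small (roughly 3-5%) drift between ERP and billing\n",
    "- Timing Difference (RevRec): larger drift between ERP and billing\n",
    "\n",
    "Contract-months with no material variance are labelled `OK`.\n",
    "\n",
    "Outputs (written to `data/processed/`):\n",
    "- `variance_table` (contract-month granularity)\n",
    "- `cfo_summary_by_type` and `top_customers_by_leakage` (top leakage drivers; leakage = revenue recognized in ERP but not billed)\n"
   ]
  },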
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import pandas as pd\n",
    "\n",
    "RAW = 'data/raw'\n",
    "\n",
    "def _read_raw(stem):\n",
    "    \"\"\"Read `RAW/<stem>.parquet` into a DataFrame.\"\"\"\n",
    "    return pd.read_parquet(f'{RAW}/{stem}.parquet')\n",
    "\n",
    "# Master data: contracts define the reconciliation base, customers supply names/segments.\n",
    "customers = _read_raw('customers')\n",
    "contracts = _read_raw('contracts')\n",
    "# One extract per source system being reconciled.\n",
    "billing = _read_raw('billing_invoices')\n",
    "erp = _read_raw('erp_revenue')\n",
    "dw = _read_raw('dw_sales')\n",
    "\n",
    "# Sanity check: (rows, columns) of every loaded table.\n",
    "loaded = {'customers': customers, 'contracts': contracts, 'billing': billing, 'erp': erp, 'dw': dw}\n",
    "for name, df in loaded.items():\n",
    "    print(name, df.shape)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# ---- Normalize to contract-month (USD) ----\n",
    "\n",
    "KEYS = ['contract_id', 'customer_id', 'month']\n",
    "AMOUNT_COLS = ['erp_usd', 'billing_usd', 'dw_usd']\n",
    "\n",
    "def to_contract_month(df, date_col, value_col, out_col):\n",
    "    \"\"\"Sum `value_col` per contract, customer and calendar month.\n",
    "\n",
    "    `month` is the month-start timestamp of `date_col`; the summed column is renamed to `out_col`.\n",
    "    Rows with a missing key (including a missing date) are dropped by groupby's default dropna=True.\n",
    "    \"\"\"\n",
    "    dated = df.assign(month=df[date_col].dt.to_period('M').dt.to_timestamp())\n",
    "    monthly = dated.groupby(KEYS, as_index=False)[value_col].sum()\n",
    "    return monthly.rename(columns={value_col: out_col})\n",
    "\n",
    "billing_m = to_contract_month(billing, 'invoice_date', 'amount_usd', 'billing_usd')\n",
    "erp_m = to_contract_month(erp, 'revenue_date', 'revenue_usd', 'erp_usd')\n",
    "dw_m = to_contract_month(dw, 'event_date', 'gross_usd', 'dw_usd')\n",
    "\n",
    "# Month spine = union of months seen in ANY system. Keying on ERP months alone would silently drop\n",
    "# billing-only and DWH-only contract-months (e.g. duplicated DWH events, invoices without RevRec).\n",
    "activity = (erp_m.merge(billing_m, on=KEYS, how='outer')\n",
    "                 .merge(dw_m, on=KEYS, how='outer'))\n",
    "\n",
    "# Keep only known contracts; a contract with no activity in any system keeps one row with month=NaT.\n",
    "base = contracts[['contract_id','customer_id']].drop_duplicates()\n",
    "recon = base.merge(activity, on=['contract_id','customer_id'], how='left')\n",
    "\n",
    "# A system with no rows for a contract-month contributed 0 USD.\n",
    "recon[AMOUNT_COLS] = recon[AMOUNT_COLS].fillna(0)\n",
    "assert not recon.duplicated(KEYS).any(), 'recon must have one row per contract-month'\n",
    "\n",
    "# Variances\n",
    "recon['var_erp_vs_billing'] = recon['erp_usd'] - recon['billing_usd']\n",
    "recon['var_billing_vs_dw'] = recon['billing_usd'] - recon['dw_usd']\n",
    "recon['abs_erp_vs_billing'] = recon['var_erp_vs_billing'].abs()\n",
    "recon['abs_billing_vs_dw'] = recon['var_billing_vs_dw'].abs()\n",
    "\n",
    "recon.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# ---- Variance classification (simple rules, MVP) ----\n",
    "# Rules are checked in priority order; the first match wins.\n",
    "\n",
    "EPS = 1.0            # USD threshold to ignore tiny rounding\n",
    "MATERIAL_PCT = 0.03  # a variance is material if it exceeds max(EPS, 3% of billing)\n",
    "FX_MAX_PCT = 0.05    # material ERP-vs-billing drift up to 5% of billing -> FX, above -> timing\n",
    "\n",
    "def classify(row):\n",
    "    \"\"\"Return the variance label for one contract-month row of `recon`.\"\"\"\n",
    "    erp_usd, billing_usd, dw_usd = row['erp_usd'], row['billing_usd'], row['dw_usd']\n",
    "    materiality = max(EPS, MATERIAL_PCT * billing_usd)\n",
    "    # Billing missing: ERP has revenue but billing has ~0\n",
    "    if erp_usd > EPS and billing_usd <= EPS:\n",
    "        return 'Missing Invoice (Billing)'\n",
    "    # DWH reports revenue that neither ERP nor billing has -> treat as DWH overcount.\n",
    "    # (Without this rule such rows would fall through to 'OK'.)\n",
    "    if billing_usd <= EPS and erp_usd <= EPS and dw_usd > EPS:\n",
    "        return 'DWH Duplicate / Overcount'\n",
    "    # DWH issue: billing exists but dw is materially higher/lower\n",
    "    if billing_usd > EPS and row['abs_billing_vs_dw'] > materiality:\n",
    "        # if dw > billing, very likely duplicates\n",
    "        if dw_usd > billing_usd:\n",
    "            return 'DWH Duplicate / Overcount'\n",
    "        return 'DWH Undercount / Missing Events'\n",
    "    # FX mismatch or timing: ERP vs billing differs materially while billing exists\n",
    "    if billing_usd > EPS and row['abs_erp_vs_billing'] > materiality:\n",
    "        # FX mismatch tends to be a smaller % drift: (MATERIAL_PCT, FX_MAX_PCT] of billing\n",
    "        pct = row['abs_erp_vs_billing'] / max(billing_usd, EPS)\n",
    "        if pct <= FX_MAX_PCT:\n",
    "            return 'FX Mismatch (ERP vs Billing)'\n",
    "        return 'Timing Difference (RevRec)'\n",
    "    return 'OK'\n",
    "\n",
    "recon['variance_type'] = recon.apply(classify, axis=1)\n",
    "\n",
    "recon['variance_type'].value_counts()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# ---- CFO summary: top leakage drivers ----\n",
    "\n",
    "# validate='many_to_one' fails fast if the customer master has duplicate customer_ids,\n",
    "# which would otherwise duplicate recon rows and silently inflate leakage.\n",
    "out = recon.merge(customers[['customer_id','customer_name','segment']], on='customer_id', how='left',\n",
    "                  validate='many_to_one')\n",
    "\n",
    "# Define leakage as positive ERP vs Billing variance (recognized but not billed)\n",
    "out['leakage_usd'] = out['var_erp_vs_billing'].clip(lower=0)\n",
    "\n",
    "cfo_by_type = (out.groupby('variance_type', as_index=False)\n",
    "               .agg(rows=('contract_id','count'), leakage_usd=('leakage_usd','sum'))\n",
    "               .sort_values('leakage_usd', ascending=False))\n",
    "\n",
    "# dropna=False: customers missing from the customer master (customer_name is NaN after the left merge)\n",
    "# must still be ranked; groupby's default dropna=True would silently drop their leakage.\n",
    "top_customers = (out.groupby(['customer_id','customer_name'], as_index=False, dropna=False)\n",
    "                 .agg(leakage_usd=('leakage_usd','sum'))\n",
    "                 .sort_values('leakage_usd', ascending=False)\n",
    "                 .head(15))\n",
    "\n",
    "display(cfo_by_type, top_customers)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# ---- Export artifacts for the repo (processed outputs) ----\n",
    "\n",
    "from pathlib import Path\n",
    "\n",
    "PROCESSED = Path('data/processed')\n",
    "PROCESSED.mkdir(parents=True, exist_ok=True)\n",
    "\n",
    "variance_table = out[['contract_id','customer_id','customer_name','month','erp_usd','billing_usd','dw_usd',\n",
    "                     'var_erp_vs_billing','var_billing_vs_dw','variance_type','leakage_usd']].copy()\n",
    "\n",
    "# One parquet file per artifact; the 'Saved:' line is printed only after that file is written.\n",
    "artifacts = {\n",
    "    'variance_table.parquet': variance_table,\n",
    "    'cfo_summary_by_type.parquet': cfo_by_type,\n",
    "    'top_customers_by_leakage.parquet': top_customers,\n",
    "}\n",
    "for filename, frame in artifacts.items():\n",
    "    target = PROCESSED / filename\n",
    "    frame.to_parquet(target, index=False)\n",
    "    print(f'Saved: {target.as_posix()}')\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python (.venv)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "name": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
'@ | ForEach-Object {
    # Write UTF-8 *without* a BOM. In Windows PowerShell 5.1, `Set-Content -Encoding UTF8` prepends a BOM,
    # and Python's json.loads (used by nbformat / Jupyter to open .ipynb files) rejects a leading BOM.
    # .NET resolves relative paths against the process directory, not the PowerShell location, so build a full path.
    $nbPath = Join-Path (Get-Location).ProviderPath 'notebooks\02_reconciliation_mvp.ipynb'
    # Append a newline to match what Set-Content wrote.
    [System.IO.File]::WriteAllText($nbPath, ($_ + [Environment]::NewLine), (New-Object System.Text.UTF8Encoding $false))
}
