
# Accounting approach, Naive DD per Bharath and Shumway (2008). No solver.

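This notebook implements the Bharath and Shumway (2008) naive DD. It uses \(\hat V = E + F\), \(\hat\sigma_D = 0.05 + 0.25\,\sigma_E\), the value-weighted \(\hat\sigma_V = \frac{E}{E+F}\,\sigma_E + \frac{F}{E+F}\,\hat\sigma_D\), and \(\hat\mu = r_{i,t-1}\). No numerical solver is required.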

This notebook rebuilds the accounting-based distance-to-default (DD) workflow without invoking a numerical solver. It follows the Bharath and Shumway (2008) naive specification using only the accounting file columns.



## 1. Environment setup

Install the minimal dependencies required to reproduce the accounting workflow locally.


In [None]:
# 1. Install needed packages (run once per environment)
%pip install pandas numpy


## 2. Imports, configuration, and helper utilities

Load core libraries, set display defaults, and define helper functions used throughout the notebook.


In [None]:
import math
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Standard normal CDF: use SciPy when it can be imported, otherwise derive the
# CDF from math.erf so the notebook still runs without SciPy.
try:
    from scipy.stats import norm
    Phi = norm.cdf
except Exception:
    from math import erf

    def Phi(x):
        """Return the standard normal CDF of *x*, evaluated element-wise."""
        scaled = np.asarray(x, dtype=float) / np.sqrt(2.0)
        erf_elementwise = np.vectorize(erf)
        return 0.5 * (1.0 + erf_elementwise(scaled))

# Widen pandas console output so the diagnostic frames printed below stay legible.
pd.set_option('display.width', 120)
pd.set_option('display.max_columns', 40)

# Multiplier applied to total_assets / debt_total to build the *_usd columns.
# NOTE(review): presumably the accounting file reports values in millions — confirm.
MM = 1_000_000.0
# Horizon used in the DD formula.
# NOTE(review): presumably expressed in years (1.0 = one year) — confirm.
T = 1.0


def find_repo_root(start: Path, marker: str = '.git') -> Path:
    """Return the closest ancestor of *start* (itself included) that holds *marker*.

    *start* is resolved first. When no directory on the way up to the filesystem
    root contains *marker*, the resolved *start* is returned unchanged.
    """
    origin = start.resolve()
    lineage = (origin, *origin.parents)
    return next(
        (folder for folder in lineage if (folder / marker).exists()),
        origin,
    )


def winsorize_series(series: pd.Series, lower: float = 0.01, upper: float = 0.99) -> pd.Series:
    """Return *series* clipped to its [lower, upper] quantile band.

    NaNs are left out when the cut-offs are estimated and remain NaN in the
    result. The input is handed back untouched when it has no non-missing values
    or when the estimated bounds are non-finite or inverted.
    """
    observed = series.dropna()
    if observed.empty:
        return series

    floor, ceiling = np.nanpercentile(observed, [100 * lower, 100 * upper])
    bounds_usable = np.isfinite(floor) and np.isfinite(ceiling) and floor <= ceiling
    if not bounds_usable:
        return series
    return series.clip(floor, ceiling)


# Locate the repository root by searching upwards from the working directory for
# a `.git` entry; find_repo_root returns the working directory itself if none is found.
base_dir = find_repo_root(Path.cwd())
print(f"Repository root: {base_dir}")

# Accounting input file and the two output locations, all under the repository root.
model_fp   = base_dir / 'data' / 'clean' / 'Book2_clean.csv'
output_dir = base_dir / 'data' / 'merged_inputs'
log_dir    = base_dir / 'data' / 'logs'

# Create the output folders up front so later writes do not fail on a missing path.
output_dir.mkdir(parents=True, exist_ok=True)
log_dir.mkdir(parents=True, exist_ok=True)

# Report (but do not enforce) whether the accounting input is present.
print(f"Accounting input -> {'FOUND' if model_fp.exists() else 'MISSING'}")


## 3. Load and normalise accounting inputs

Read the accounting file, harmonise column names, and enforce expected data types for key fields such as instrument, year, and balance sheet magnitudes.


In [None]:
# Load the accounting CSV and normalise the columns the rest of the notebook uses.
print('[INFO] Loading accounting data…')
df_raw = pd.read_csv(model_fp)
print(f"→ {df_raw.shape[0]} rows before cleaning")

# Standardise column names we rely on
rename_map = {
    'nstrument': 'instrument',
    'weighted_average_cost_of_capital,_(%)': 'wacc_pct',
    'wacc_tax_rate,_(%)': 'wacc_tax_rate_pct',
    'wacc_cost_of_debt,_(%)': 'wacc_cost_of_debt_pct',
    'wacc_debt_weight,_(%)': 'wacc_debt_weight_pct',
    'wacc_equity_weight,_(%)': 'wacc_equity_weight_pct',
}

# Work on a renamed copy so df_raw stays untouched for inspection.
df = df_raw.rename(columns=rename_map).copy()

# Drop columns that are clearly placeholders
unnamed_cols = [c for c in df.columns if c.lower().startswith('unnamed')]
if unnamed_cols:
    df = df.drop(columns=unnamed_cols)

# Instrument string cleanup: trim, strip double quotes, upper-case.
if 'instrument' in df.columns:
    df['instrument'] = (df['instrument']
                        .astype(str)
                        .str.strip()
                        .str.replace('"', '', regex=False)
                        .str.upper())
else:
    raise KeyError('`instrument` column not found after renaming.')

# Year as integer panel key (nullable Int64 keeps unparseable years as <NA>)
df['year'] = pd.to_numeric(df['year'], errors='coerce').astype('Int64')

# Numeric conversions for balance sheet figures; unparseable entries become NaN
for col in ['total_assets', 'debt_total', 'price_to_book_value_per_share', 'd/e', 'rit', 'rit_rf', 'new_wacc']:
    if col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='coerce')

# Convert WACC weights from percentages to decimals when present
for src, dest in [('wacc_equity_weight_pct', 'wacc_equity_weight'),
                  ('wacc_debt_weight_pct', 'wacc_debt_weight')]:
    if src in df.columns:
        df[dest] = pd.to_numeric(df[src], errors='coerce') / 100.0

# Validate with an explicit exception rather than `assert`, which is stripped
# when Python runs with -O and would let negative balances through silently.
has_negative_levels = (
    (df['total_assets'].dropna() < 0).any()
    or (df['debt_total'].dropna() < 0).any()
)
if has_negative_levels:
    raise ValueError('Assets/debt must be nonnegative')

# Series.between() yields False for NaN, so missing returns must be dropped
# first; otherwise a single missing `rit` would trigger the warning spuriously.
if not df['rit'].dropna().between(-1, 1).all():
    print('[WARN] rit outside [-1,1]. Ensure rit is a decimal return, not percent.')

print(df[['instrument', 'year', 'total_assets', 'debt_total']].head())


## 4. Construct book equity and market equity proxies

Compute book equity along with the three Bharath–Shumway equity proxies (price-to-book, D/E, and WACC weights). Select the best available proxy and record its source for auditability.


In [None]:
# Book equity in USD
required_cols = ['total_assets', 'debt_total']
missing_required = [c for c in required_cols if c not in df.columns]
if missing_required:
    raise KeyError(f"Missing required columns: {missing_required}")

df['assets_usd'] = pd.to_numeric(df['total_assets'], errors='coerce') * MM
df['debt_usd'] = pd.to_numeric(df['debt_total'], errors='coerce') * MM
df['be_usd'] = (df['total_assets'] - df['debt_total']) * MM

# The loading cell treats price-to-book and D/E as optional columns, so fall
# back to an all-NaN column here (as is done for the WACC weights) instead of
# failing with a KeyError; the corresponding proxy is then simply unavailable.
missing_column = pd.Series(np.nan, index=df.index, dtype=float)
price_to_book = df.get('price_to_book_value_per_share', missing_column)
debt_to_equity = df.get('d/e', missing_column)

# Market equity proxies
# (1) price-to-book multiple times book equity, both required to be positive
df['E_pb'] = np.where(
    (price_to_book > 0) & (df['be_usd'] > 0),
    price_to_book * df['be_usd'],
    np.nan
)

# (2) debt divided by the debt-to-equity ratio, both required to be positive
df['E_de'] = np.where(
    (debt_to_equity > 0) & (df['debt_usd'] > 0),
    df['debt_usd'] / debt_to_equity,
    np.nan
)

# (3) total assets times the WACC equity weight, used only when both weights
#     are positive and sum to one within `tolerance`
equity_weight = pd.to_numeric(
    df.get('wacc_equity_weight', pd.Series(index=df.index, dtype=float)),
    errors='coerce'
)
debt_weight = pd.to_numeric(
    df.get('wacc_debt_weight', pd.Series(index=df.index, dtype=float)),
    errors='coerce'
)
tolerance = 1e-3
weights_sum = equity_weight + debt_weight
wacc_mask = (
    (equity_weight > 0)
    & (debt_weight > 0)
    & (np.abs(weights_sum - 1) <= tolerance)
)

df['E_wacc'] = np.where(
    wacc_mask,
    df['assets_usd'] * equity_weight,
    np.nan
)

# Prioritise proxies: price-to-book, then D/E, then WACC.
# Vectorised replacement for a row-by-row loop: keep only strictly positive
# candidates, then take the first available one in priority order.
proxy_order = ['E_pb', 'E_de', 'E_wacc']
usable = df[proxy_order].where(df[proxy_order] > 0)

df['E'] = usable['E_pb'].fillna(usable['E_de']).fillna(usable['E_wacc'])
# np.select returns the label of the first proxy that is available per row.
df['E_source'] = np.select(
    [usable[name].notna().to_numpy() for name in proxy_order],
    proxy_order,
    default='missing'
)

# D/E- and WACC-based values are flagged as weaker than the price-to-book proxy.
df['weak_E_proxy'] = df['E_source'].isin(['E_de', 'E_wacc'])

# Face value of debt F is taken to be total debt in USD.
df['F'] = df['debt_usd']

print(df[['instrument', 'year', 'be_usd', 'E', 'E_source', 'F']].head())



## 5. Equity volatility proxy with rolling window, imputation, and winsorisation

Following the reference guidance, compute a rolling standard deviation over the three prior annual equity returns (at least two observations are required), fall back to size-bucket medians (and then to the overall median) when the rolling window lacks sufficient history, and winsorise the result at the 1st and 99th percentiles.


In [None]:
# Order rows by firm and year so the per-firm shift/rolling operations below run
# in chronological order; the index is rebuilt to match the new row order.
df = df.sort_values(['instrument', 'year']).reset_index(drop=True)

# Size buckets from dummy indicators (default to 'small')
# `dummylarge` wins over `dummymid` because np.select uses the first matching
# condition; a missing dummy column is treated as 0 through df.get.
size_bucket = np.select(
    [df.get('dummylarge', 0) == 1, df.get('dummymid', 0) == 1],
    ['large', 'mid'],
    default='small'
)
df['size_bucket'] = size_bucket

# Rolling volatility of prior returns
def rolling_sigma(series: pd.Series) -> pd.Series:
    """Return the rolling std. dev. of up to three prior observations.

    The one-step shift keeps the current row out of its own window; fewer than
    two non-missing prior values yield NaN.
    """
    prior_window = series.shift(1).rolling(window=3, min_periods=2)
    return prior_window.std()

def rolling_count(series: pd.Series) -> pd.Series:
    """Return how many non-missing prior observations fall in the 3-row window.

    The series is shifted by one row first, so the current row is never counted.
    """
    prior_window = series.shift(1).rolling(window=3, min_periods=1)
    return prior_window.count()

# Per-firm rolling volatility of prior returns (NaN until two prior returns exist).
df['sigma_E_raw'] = (
    df.groupby('instrument', group_keys=False)['rit']
      .apply(rolling_sigma)
)

# Number of non-missing prior returns available in each firm's rolling window.
df['sigma_E_count'] = (
    df.groupby('instrument', group_keys=False)['rit']
      .apply(rolling_count)
)

# Flag rows whose window holds fewer than the two returns needed for a std. dev.
df['insufficient_returns'] = df['sigma_E_count'] < 2

# Impute using size bucket medians, then overall median
bucket_median = (
    df.groupby('size_bucket')['sigma_E_raw']
      .transform('median')
)
df['sigma_E'] = df['sigma_E_raw'].copy()
# Remember which rows had no raw estimate *before* any filling takes place.
mask_impute = df['sigma_E'].isna()
df.loc[mask_impute, 'sigma_E'] = bucket_median[mask_impute]

# Second-stage fallback for buckets whose median is itself NaN.
overall_median = df['sigma_E'].median()
df['sigma_E'] = df['sigma_E'].fillna(overall_median)

# True for every row that was filled, whether by the bucket or the overall median.
df['imputed_sigmaE_sizebucket'] = mask_impute & df['sigma_E'].notna()

# Winsorise at the 1st/99th percentiles after imputation.
df['sigma_E'] = winsorize_series(df['sigma_E'], 0.01, 0.99)

# Keep volatility strictly positive so it can later be used as a divisor.
df['sigma_E'] = df['sigma_E'].clip(lower=1e-6)

print(df[['instrument', 'year', 'sigma_E', 'insufficient_returns', 'imputed_sigmaE_sizebucket']].head())



## 6. Debt volatility proxy, asset proxies, and drift proxy

Derive the Bharath–Shumway debt volatility proxy, approximate asset value/volatility, and compute the drift proxy based on lagged equity returns with firm and size-bucket fallbacks.


In [None]:
# Debt and asset volatility proxies
df['sigma_D_hat'] = 0.05 + 0.25 * df['sigma_E']
df['V_hat'] = df['E'] + df['F']
valid_v = df['V_hat'] > 0

sigma_V_components = np.where(
    valid_v,
    (df['E'] / df['V_hat']) * df['sigma_E'] + (df['F'] / df['V_hat']) * df['sigma_D_hat'],
    np.nan
)
df['sigma_V_hat'] = sigma_V_components

df['sigma_V_hat'] = df['sigma_V_hat'].clip(lower=1e-6)

# Drift proxy using lagged returns
lagged_rit = df.groupby('instrument', group_keys=False)['rit'].shift(1)
firm_mean = (
    df.groupby('instrument', group_keys=False)['rit']
      .apply(lambda s: s.expanding().mean().shift(1))
)

df['mu_hat'] = lagged_rit
mask_mu = df['mu_hat'].isna()
df.loc[mask_mu, 'mu_hat'] = firm_mean[mask_mu]

size_median_mu = df.groupby('size_bucket')['mu_hat'].transform('median')
df['mu_hat'] = df['mu_hat'].fillna(size_median_mu)
df['mu_hat'] = df['mu_hat'].fillna(df['mu_hat'].median())

print(df[['instrument', 'year', 'sigma_D_hat', 'sigma_V_hat', 'mu_hat']].head())



## 7. Compute naive distance to default (DD) and probability of default (PD)

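Apply the Bharath–Shumway naive formulas using the proxies above: \(DD = \frac{\ln(\hat V / F) + (\hat\mu - 0.5\,\hat\sigma_V^2)\,T}{\hat\sigma_V \sqrt{T}}\) and \(PD = \Phi(-DD)\). The probability of default therefore lies in the [0, 1] interval by construction; rows with invalid inputs are left as NaN.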


In [None]:
# Rows qualify only with a finite, positive asset volatility, strictly positive
# equity and debt, and a defined drift.
valid_sigmaV = np.isfinite(df['sigma_V_hat']) & (df['sigma_V_hat'] > 0)
valid_inputs = (df['E'] > 0) & (df['F'] > 0) & valid_sigmaV & df['mu_hat'].notna()

# Bharath & Shumway naive DD, built term by term:
#   DD = [ln(V_hat / F) + (mu_hat - 0.5 * sigma_V_hat^2) * T] / (sigma_V_hat * sqrt(T))
# with V_hat = E + F, sigma_D_hat = 0.05 + 0.25 * sigma_E, sigma_V_hat the
# value-weighted mix and mu_hat the lagged equity return. No solver.
log_leverage = np.log(df['V_hat'] / df['F'])
drift_term = (df['mu_hat'] - 0.5 * df['sigma_V_hat'] ** 2) * T
dd_all_rows = (log_leverage + drift_term) / (df['sigma_V_hat'] * math.sqrt(T))

# The expression is evaluated for every row; invalid rows are masked to NaN.
df['DD_naive'] = np.where(valid_inputs, dd_all_rows, np.nan)
df['PD_naive'] = np.where(np.isfinite(df['DD_naive']), Phi(-df['DD_naive']), np.nan)
df['invalid_sigmaV'] = ~valid_sigmaV

print(df[['instrument', 'year', 'DD_naive', 'PD_naive']].head())
print(f"PD==0 count: {(df['PD_naive'] == 0).sum()}, PD==1 count: {(df['PD_naive'] == 1).sum()}")


## 8. Data-quality flags and status tracking

Capture the first applicable status flag (missing inputs, fallbacks, or imputations) as `naive_status` so downstream users understand how each observation was derived.


In [None]:
# Root-cause masks for unusable equity / debt inputs.
missing_E = ~np.isfinite(df['E'])
missing_F = ~np.isfinite(df['F'])
nonpos_EF = (df['E'] <= 0) | (df['F'] <= 0)

# (flag name, boolean mask) pairs in PRIORITY order: the first flag that is True
# for a row becomes its `naive_status`.
# The missing/non-positive input flags come before `invalid_sigmaV` on purpose:
# sigma_V_hat is NaN whenever E or F is missing (V_hat = E + F is then not > 0),
# so listing `invalid_sigmaV` first would make `missing_E` / `missing_F`
# unreachable as a status and hide the actual root cause.
flag_specs = [
    ('missing_E', missing_E | (df['E_source'] == 'missing')),
    ('missing_F', missing_F),
    ('nonpos_EF', nonpos_EF),
    ('invalid_sigmaV', df['invalid_sigmaV']),
    ('insufficient_returns', df['insufficient_returns']),
    ('imputed_sigmaE_sizebucket', df['imputed_sigmaE_sizebucket']),
    ('fallback_E_from_de', df['E_source'] == 'E_de'),
    ('fallback_E_from_wacc', df['E_source'] == 'E_wacc'),
]

# Materialise every flag as a plain boolean column on df.
for name, mask in flag_specs:
    df[name] = mask.astype(bool)


def assign_status(idx: int) -> str:
    """Return the first flag in `flag_specs` that is set for row *idx*, else 'ok'.

    *idx* is a positional row number into the module-level `df`.
    """
    row = df.iloc[idx]
    return next(
        (flag for flag, _ in flag_specs if bool(row[flag])),
        'ok',
    )


# Assign each row the first applicable flag in `flag_specs` order ('ok' if none).
# np.select picks the first True condition per row, which matches a row-by-row
# scan of the flags while avoiding a per-row DataFrame lookup.
flag_names = [name for name, _ in flag_specs]
df['naive_status'] = np.select(
    [df[name].to_numpy(dtype=bool) for name in flag_names],
    flag_names,
    default='ok',
)
naive_status = df['naive_status'].tolist()

status_counts = df['naive_status'].value_counts(dropna=False).sort_index()
print('Naive status counts:')
print(status_counts)

# Unlike `naive_status`, these counts are not mutually exclusive: a row can
# carry several indicators at once.
fallback_summary = {
    'invalid_sigmaV': int(df['invalid_sigmaV'].sum()),
    'fallback_E_from_de': int(df['fallback_E_from_de'].sum()),
    'fallback_E_from_wacc': int(df['fallback_E_from_wacc'].sum()),
    'imputed_sigmaE_sizebucket': int(df['imputed_sigmaE_sizebucket'].sum()),
    'insufficient_returns': int(df['insufficient_returns'].sum()),
}
print("\nFallback indicator counts:")
for label, count in fallback_summary.items():
    print(f"{label}: {count}")

e_source_counts = df['E_source'].value_counts(dropna=False).sort_index()
print("\nEquity source mix:")
print(e_source_counts)
weak_proxy_count = int(df['weak_E_proxy'].sum())
print(f"Weak equity proxy count: {weak_proxy_count}")

# Distribution summaries for the two headline outputs.
percentiles = [0.10, 0.25, 0.50, 0.75, 0.90]
dd_stats = df['DD_naive'].describe(percentiles=percentiles)
pd_stats = df['PD_naive'].describe(percentiles=percentiles)
dd_missing = int(df['DD_naive'].isna().sum())
pd_missing = int(df['PD_naive'].isna().sum())

print("\nDD_naive summary:")
print(dd_stats)
print(f"Rows with missing DD_naive: {dd_missing}")

print("\nPD_naive summary:")
print(pd_stats)
print(f"Rows with missing PD_naive: {pd_missing}")

# Mirror the printed diagnostics into a plain-text log, one blank line between sections.
log_lines = []
log_lines.append('=== Naive DD/PD Diagnostics ===')
log_lines.append(f'Total rows processed: {len(df)}')
log_lines.append('')

log_lines.append('Naive status counts:')
log_lines.extend([f"{status}: {count}" for status, count in status_counts.items()])
log_lines.append('')

log_lines.append('Fallback indicator counts:')
for label, count in fallback_summary.items():
    log_lines.append(f"{label}: {count}")
log_lines.append('')

log_lines.append('Equity source mix:')
log_lines.extend([f"{source}: {count}" for source, count in e_source_counts.items()])
log_lines.append('')
log_lines.append(f"Weak equity proxy count: {weak_proxy_count}")
# Separator so the DD summary does not run straight into the previous section.
log_lines.append('')

log_lines.append('DD_naive summary:')
log_lines.extend([f"{idx}: {value}" for idx, value in dd_stats.items()])
log_lines.append(f'Rows with missing DD_naive: {dd_missing}')
log_lines.append('')

log_lines.append('PD_naive summary:')
log_lines.extend([f"{idx}: {value}" for idx, value in pd_stats.items()])
log_lines.append(f'Rows with missing PD_naive: {pd_missing}')
log_lines.append(f"PD==0 count: {(df['PD_naive'] == 0).sum()}, PD==1 count: {(df['PD_naive'] == 1).sum()}")

log_path = log_dir / 'dd_pd_accounting_log.txt'
# Explicit encoding keeps the log identical across platforms' default locales.
log_path.write_text("\n".join(log_lines), encoding='utf-8')
print(f"[INFO] Wrote diagnostics to {log_path}")


## 9. Persist outputs and quick diagnostics

Save the naive DD/PD results and a percentile summary by year for quick reference.


In [None]:
# Columns exported to the results file: identifiers, equity proxies, model
# inputs, and the DD/PD outputs with their status label.
result_cols = [
    'instrument', 'year', 'E', 'E_source', 'weak_E_proxy', 'E_pb', 'E_de', 'E_wacc',
    'F', 'sigma_E', 'sigma_D_hat', 'sigma_V_hat', 'mu_hat',
    'DD_naive', 'PD_naive', 'naive_status'
]

dd_output = output_dir / 'dd_pd_naive.csv'
df[result_cols].to_csv(dd_output, index=False)
print(f"[INFO] Saved naive DD/PD results to {dd_output}")

# Record the run configuration next to the results. 'Phi' notes which normal CDF
# was used: `norm` exists in globals only when the SciPy import succeeded.
# NOTE(review): ROLL_YEARS and WINSOR_P are written as literals here, not read
# from the code that applies them — keep them in sync by hand.
cfg = {'T': T, 'ROLL_YEARS': 3, 'WINSOR_P': [0.01, 0.99], 'Phi': 'scipy' if 'norm' in globals() else 'erf_fallback', 'spec': 'Bharath–Shumway naive, no solver, v1'}
(Path(output_dir) / 'dd_pd_naive_config.json').write_text(pd.Series(cfg).to_json())

# Percentile levels for the summary table and their column labels (p10, p25, ...).
percentiles = [0.10, 0.25, 0.50, 0.75, 0.90]
percentile_columns = [f"p{int(p * 100)}" for p in percentiles]


def build_percentile_table(metric: str) -> pd.DataFrame:
    """Tabulate the configured percentiles of *metric* per year plus an 'overall' row.

    Reads the module-level `df`, `percentiles` and `percentile_columns`. The
    result has the columns ``year``, ``metric`` and one column per percentile.
    When no row has both a year and a value for *metric*, a single 'overall'
    row filled with NaN is returned.
    """
    observed = df[['year', metric]].dropna()
    if observed.empty:
        placeholder = {
            'year': ['overall'],
            'metric': [metric],
            **{col: [np.nan] for col in percentile_columns},
        }
        return pd.DataFrame(placeholder)

    # One row per year, one column per requested percentile.
    yearly = observed.groupby('year')[metric].quantile(percentiles).unstack(level=-1)
    yearly.columns = percentile_columns
    yearly = yearly.reset_index()
    yearly.insert(0, 'metric', metric)

    # The pooled row uses every non-missing value of the metric, including rows
    # whose year is missing.
    pooled = df[metric].dropna()
    pooled_row = {
        'metric': metric,
        'year': 'overall',
        **{
            col: (pooled.quantile(pct) if not pooled.empty else np.nan)
            for col, pct in zip(percentile_columns, percentiles)
        },
    }

    stacked = pd.concat([yearly, pd.DataFrame([pooled_row])], ignore_index=True)
    return stacked[['year', 'metric', *percentile_columns]]


# Stack the per-year percentile tables for DD and PD into one long summary frame.
summary_frames = [build_percentile_table(metric) for metric in ['DD_naive', 'PD_naive']]
summary = pd.concat(summary_frames, ignore_index=True)

summary_output = output_dir / 'dd_pd_naive_summary.csv'
summary.to_csv(summary_output, index=False)
print(f"[INFO] Saved percentile summary to {summary_output}")

# Bare expression so the notebook renders the first rows as the cell output.
summary.head()

## 10. Diagnostic visualisations

In [None]:
# Box plot of DD_naive per year; rows without a DD value or a year are left out.
dd_plot_df = df.dropna(subset=['DD_naive', 'year']).copy()
if dd_plot_df.empty:
    print('No DD_naive data available for plotting.')
else:
    # Year is turned into a string so it is used as a categorical grouping label.
    dd_plot_df['year'] = dd_plot_df['year'].astype(str)
    fig, ax = plt.subplots(figsize=(12, 6))
    dd_plot_df.boxplot(column='DD_naive', by='year', ax=ax)
    ax.set_title('DD_naive distribution by year')
    ax.set_xlabel('Year')
    ax.set_ylabel('DD_naive')
    # Blank out the figure-level title so only the axes title set above is shown.
    fig.suptitle('')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

In [None]:
# Optional comparison against a market-based DD file; the cell degrades to a
# printed message when the file, its columns, or overlapping rows are missing.
market_path = output_dir / 'dd_pd_market.csv'
if not market_path.exists():
    print('Market DD file not found; skipping DDm vs. DD_naive scatter.')
else:
    market_df = pd.read_csv(market_path)
    # NOTE: rebinds `required_cols`, which an earlier cell used for the accounting-column check.
    required_cols = {'instrument', 'year', 'DDm'}
    if not required_cols.issubset(market_df.columns):
        print('Market data lacks DDm column; skipping scatter plot.')
    else:
        # Keep only firm-years present in both sources with both DD values defined.
        merged = (
            df[['instrument', 'year', 'DD_naive']]
            .merge(market_df[['instrument', 'year', 'DDm']], on=['instrument', 'year'], how='inner')
            .dropna(subset=['DD_naive', 'DDm'])
        )
        if merged.empty:
            print('No overlapping DDm/DD_naive observations to plot.')
        else:
            fig, ax = plt.subplots(figsize=(8, 6))
            ax.scatter(merged['DDm'], merged['DD_naive'], alpha=0.6, edgecolors='none')
            # Dashed 45-degree reference line spanning the joint range of both measures.
            all_vals = np.concatenate([merged['DDm'].to_numpy(), merged['DD_naive'].to_numpy()])
            lims = [all_vals.min(), all_vals.max()]
            ax.plot(lims, lims, linestyle='--', color='gray', linewidth=1)
            ax.set_xlabel('DDm (market)')
            ax.set_ylabel('DD_naive (accounting)')
            ax.set_title('DDm vs. DD_naive comparison')
            # Equal axis scaling so the reference line is drawn at 45 degrees.
            ax.set_aspect('equal', adjustable='box')
            plt.tight_layout()
            plt.show()