# Exposure Regime Dashboard

Tracks how the strategy is using risk budget through gross, net, long, and short exposure regimes.

Data source: `wolfpack/daily_snapshots.csv`.


In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from io import StringIO
from IPython.display import display

pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)

sns.set_style('whitegrid')
plt.rcParams['figure.figsize'] = (14, 6)

from QuantConnect import *
from QuantConnect.Research import QuantBook

qb = QuantBook()
print('QuantBook initialized')


def read_csv_from_store(key):
    try:
        if not qb.ObjectStore.ContainsKey(key):
            print(f'ObjectStore key not found: {key}')
            return None
        content = qb.ObjectStore.Read(key)
        if not content:
            print(f'Empty ObjectStore key: {key}')
            return None
        return pd.read_csv(StringIO(content))
    except Exception as e:
        print(f'Error reading {key}: {e}')
        return None


In [None]:
GROSS_HIGH = 1.20
GROSS_LOW = 0.80
NET_BULL = 0.30
NET_BEAR = -0.30

TARGET_GROSS = 1.00

df_snapshots = read_csv_from_store('wolfpack/daily_snapshots.csv')
if df_snapshots is None:
    raise ValueError('daily_snapshots.csv is required for exposure analysis.')

required = ['date', 'gross_exposure', 'net_exposure', 'long_exposure', 'short_exposure', 'nav']
missing = [c for c in required if c not in df_snapshots.columns]
if missing:
    raise ValueError(f'Missing required columns in daily_snapshots.csv: {missing}')

df = df_snapshots.copy()
df['date'] = pd.to_datetime(df['date'], errors='coerce')
for c in ['gross_exposure', 'net_exposure', 'long_exposure', 'short_exposure', 'nav']:
    df[c] = pd.to_numeric(df[c], errors='coerce')

df = df[df['date'].notna()].sort_values('date').reset_index(drop=True)

# Add daily return and next-day return for regime sanity checks.
df['daily_return'] = df['nav'].pct_change()
df['next_day_return'] = df['daily_return'].shift(-1)

print(f'Rows: {len(df):,}')
print(f'Date range: {df["date"].min().date()} to {df["date"].max().date()}')
display(df.tail(10))


In [None]:
def classify_regime(row):
    gross = row['gross_exposure']
    net = row['net_exposure']

    if gross >= GROSS_HIGH:
        gross_bucket = 'high_gross'
    elif gross <= GROSS_LOW:
        gross_bucket = 'low_gross'
    else:
        gross_bucket = 'mid_gross'

    if net >= NET_BULL:
        net_bucket = 'net_long'
    elif net <= NET_BEAR:
        net_bucket = 'net_short'
    else:
        net_bucket = 'market_neutral'

    return f'{gross_bucket}__{net_bucket}'

df['regime'] = df.apply(classify_regime, axis=1)

df['gross_gap_vs_target'] = df['gross_exposure'] - TARGET_GROSS

df['risk_state'] = np.select(
    [
        df['gross_exposure'] >= GROSS_HIGH,
        df['gross_exposure'] <= GROSS_LOW
    ],
    [
        'risk_on',
        'risk_off'
    ],
    default='balanced'
)

regime_stats = (
    df.groupby('regime', as_index=False)
      .agg(
          days=('regime', 'size'),
          avg_gross=('gross_exposure', 'mean'),
          avg_net=('net_exposure', 'mean'),
          avg_next_day_return=('next_day_return', 'mean'),
          realized_vol=('daily_return', lambda s: s.dropna().std() * np.sqrt(252))
      )
      .sort_values('days', ascending=False)
)
regime_stats['pct_days'] = regime_stats['days'] / len(df)

display(regime_stats)


In [None]:
fig, axes = plt.subplots(3, 1, figsize=(16, 12), sharex=True)

axes[0].plot(df['date'], 100 * df['gross_exposure'], label='Gross exposure', color='#1f77b4', linewidth=2)
axes[0].axhline(100 * GROSS_HIGH, color='red', linestyle='--', linewidth=1.5, label='High gross threshold')
axes[0].axhline(100 * GROSS_LOW, color='orange', linestyle='--', linewidth=1.5, label='Low gross threshold')
axes[0].axhline(100 * TARGET_GROSS, color='black', linestyle=':', linewidth=1.5, label='Target gross')
axes[0].set_title('Gross Exposure')
axes[0].set_ylabel('% of NAV')
axes[0].legend(loc='upper left')
axes[0].grid(alpha=0.3)

axes[1].plot(df['date'], 100 * df['net_exposure'], label='Net exposure', color='#d62728', linewidth=2)
axes[1].axhline(100 * NET_BULL, color='green', linestyle='--', linewidth=1.5, label='Net long threshold')
axes[1].axhline(100 * NET_BEAR, color='purple', linestyle='--', linewidth=1.5, label='Net short threshold')
axes[1].axhline(0, color='black', linewidth=1)
axes[1].set_title('Net Exposure')
axes[1].set_ylabel('% of NAV')
axes[1].legend(loc='upper left')
axes[1].grid(alpha=0.3)

axes[2].plot(df['date'], 100 * df['long_exposure'], label='Long exposure', color='#2ca02c', linewidth=2)
axes[2].plot(df['date'], 100 * df['short_exposure'], label='Short exposure', color='#9467bd', linewidth=2)
axes[2].set_title('Long / Short Exposure')
axes[2].set_ylabel('% of NAV')
axes[2].set_xlabel('Date')
axes[2].legend(loc='upper left')
axes[2].grid(alpha=0.3)

plt.tight_layout()
plt.show()


In [None]:
risk_state_counts = (
    df.groupby('risk_state', as_index=False)
      .agg(days=('risk_state', 'size'))
      .sort_values('days', ascending=False)
)
risk_state_counts['pct_days'] = risk_state_counts['days'] / len(df)

gross_breach = (df['gross_exposure'] > GROSS_HIGH).mean()
gross_under = (df['gross_exposure'] < GROSS_LOW).mean()
net_long_pct = (df['net_exposure'] > NET_BULL).mean()
net_short_pct = (df['net_exposure'] < NET_BEAR).mean()

print('Exposure regime scorecard:')
print(f'  Gross > {100 * GROSS_HIGH:.0f}% NAV: {100 * gross_breach:.2f}% of days')
print(f'  Gross < {100 * GROSS_LOW:.0f}% NAV: {100 * gross_under:.2f}% of days')
print(f'  Net > +{100 * NET_BULL:.0f}% NAV: {100 * net_long_pct:.2f}% of days')
print(f'  Net < {100 * abs(NET_BEAR):.0f}% NAV: {100 * net_short_pct:.2f}% of days')

display(risk_state_counts)


In [None]:
if df['daily_return'].notna().sum() > 30:
    by_state = (
        df.groupby('risk_state', as_index=False)
          .agg(
              observations=('daily_return', lambda s: s.notna().sum()),
              avg_daily_return=('daily_return', 'mean'),
              annualized_return=('daily_return', lambda s: (1 + s.dropna()).prod() ** (252 / max(len(s.dropna()), 1)) - 1),
              annualized_vol=('daily_return', lambda s: s.dropna().std() * np.sqrt(252)),
              avg_next_day_return=('next_day_return', 'mean')
          )
    )

    by_state['daily_sharpe'] = np.where(
        by_state['annualized_vol'].abs() > 1e-12,
        by_state['annualized_return'] / by_state['annualized_vol'],
        np.nan
    )

    display(by_state)

    plt.figure(figsize=(10, 5))
    plt.bar(by_state['risk_state'], 100 * by_state['annualized_vol'], color=['#1f77b4', '#2ca02c', '#d62728'])
    plt.title('Annualized Volatility by Risk State')
    plt.ylabel('Annualized vol (%)')
    plt.xlabel('Risk state')
    plt.grid(axis='y', alpha=0.3)
    plt.tight_layout()
    plt.show()
else:
    print('Not enough return observations for state-conditioned return/vol analysis.')
