# Phase 7: Quantitative & Adaptive Analysis

Data-driven models for finding a trading edge and adapting strategies as market conditions change.

This notebook covers:
- **Statistical arbitrage** — pairs trading and mean reversion
- **Machine learning signals** — feature engineering and model training
- **Regime detection** — identifying market states
- **Adaptive parameter tuning** — dynamic strategy adjustment

---

```bash
pip install pandas numpy matplotlib scipy scikit-learn
```

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import TimeSeriesSplit

plt.style.use('seaborn-v0_8-darkgrid')
np.random.seed(42)

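def generate_prices(n=500, mu=0.08, sigma=0.20, start=100.0):
    """Simulate n business-day closes from i.i.d. normal daily log returns."""
    dt = 1/252  # one trading day as a fraction of a year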
    returns = np.random.normal(mu * dt, sigma * np.sqrt(dt), n)
    prices = start * np.exp(np.cumsum(returns))
    dates = pd.date_range('2023-01-01', periods=n, freq='B')
    return pd.Series(prices, index=dates, name='close')

prices = generate_prices(750)
returns = prices.pct_change().dropna()
print(f"Generated {len(prices)} price points")

---
## 7.1 Pairs Trading (Statistical Arbitrage)

Find two cointegrated assets and trade the spread when it deviates from equilibrium.
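
As a rough illustration of what "deviates from equilibrium" means in practice, the sketch below estimates a hedge ratio by ordinary least squares on log prices and then measures how quickly the residual spread reverts using an AR(1) fit. The helper names (`hedge_ratio`, `half_life`) and the synthetic data are assumptions made for this example, not part of the notebook's code, and an AR(1) coefficient is only an informal check, not a formal cointegration test.

```python
import numpy as np

def hedge_ratio(log_a, log_b):
    """OLS fit log_a ~ alpha + beta * log_b; returns (beta, alpha, residual spread)."""
    beta, alpha = np.polyfit(log_b, log_a, 1)
    spread = log_a - (alpha + beta * log_b)
    return beta, alpha, spread

def half_life(spread):
    """Half-life (in bars) of mean reversion from an AR(1) fit of the spread."""
    phi = np.polyfit(spread[:-1], spread[1:], 1)[0]
    if not 0 < phi < 1:
        return np.inf  # no measurable mean reversion
    return -np.log(2) / np.log(phi)

# Hypothetical pair: B is a random walk, A tracks 0.8 * B plus a stationary AR(1) spread
rng = np.random.default_rng(0)
n = 2000
log_b = np.cumsum(rng.normal(0, 0.01, n))
noise = rng.normal(0, 0.01, n)
ar = np.zeros(n)
for t in range(1, n):
    ar[t] = 0.9 * ar[t - 1] + noise[t]
log_a = 0.5 + 0.8 * log_b + ar

beta, alpha, spread = hedge_ratio(log_a, log_b)
print(f"hedge ratio: {beta:.3f}, half-life: {half_life(spread):.1f} bars")
```

A short half-life relative to the holding period suggests the spread reverts fast enough to trade; a half-life of `inf` means the fit found no reversion at all.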

In [None]:
# Generate cointegrated pair
np.random.seed(42)
n = 500
# Common factor
common = np.cumsum(np.random.normal(0.0003, 0.015, n))
# Asset A = common + noise
asset_a = 100 * np.exp(common + np.cumsum(np.random.normal(0, 0.005, n)))
# Asset B = 0.8 * common + noise. Strongly related to A, but only approximately
# cointegrated: each asset's idiosyncratic noise is itself a random walk.
asset_b = 80 * np.exp(0.8 * common + np.cumsum(np.random.normal(0, 0.005, n)))

dates = pd.date_range('2023-01-01', periods=n, freq='B')
pair_df = pd.DataFrame({'A': asset_a, 'B': asset_b}, index=dates)

def compute_spread(a, b, lookback=60):
    """Compute z-score of log spread."""
    log_ratio = np.log(a / b)
    mean = log_ratio.rolling(lookback).mean()
    std = log_ratio.rolling(lookback).std()
    z_score = (log_ratio - mean) / std
    return z_score, log_ratio

z_score, log_ratio = compute_spread(pair_df['A'], pair_df['B'])

def pairs_trading_backtest(
    pair_df, z_score,
    entry_z: float = 2.0,
    exit_z: float = 0.5,
    stop_z: float = 3.5,
    leverage: float = 2.0
):
    returns_a = pair_df['A'].pct_change()
    returns_b = pair_df['B'].pct_change()
    
    position = 0  # 1 = long spread (long A, short B), -1 = short spread
    strat_returns = []
    positions = []
    
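    for i in range(1, len(z_score)):
        # Earn today's return on the position held coming into the bar.
        # Today's z-score is only known at the close, so acting on it
        # before booking the return would be look-ahead bias.
        spread_return = position * (returns_a.iloc[i] - returns_b.iloc[i]) * leverage
        strat_returns.append(spread_return)
        
        z = z_score.iloc[i]
        if np.isnan(z):
            positions.append(position)
            continue
        
        # Entry (skip entries already beyond the stop level)
        if position == 0:
            if entry_z < z < stop_z:
                position = -1  # short spread: short A, long B
            elif -stop_z < z < -entry_z:
                position = 1   # long spread: long A, short B
        
        # Exit long spread: reverted toward the mean, or stopped out
        # because the spread kept falling
        elif position == 1:
            if z > -exit_z or z < -stop_z:
                position = 0
        # Exit short spread: reverted, or stopped out because it kept rising
        elif position == -1:
            if z < exit_z or z > stop_z:
                position = 0
        
        # Record the position held after today's close
        positions.append(position)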
    
    return pd.Series(strat_returns, index=z_score.index[1:]), pd.Series(positions, index=z_score.index[1:])

strat_ret, positions = pairs_trading_backtest(pair_df, z_score)

fig, axes = plt.subplots(4, 1, figsize=(14, 12), sharex=True, gridspec_kw={'height_ratios': [2, 1, 1, 2]})

# Prices
axes[0].plot(pair_df.index, pair_df['A'], label='Asset A')
axes[0].plot(pair_df.index, pair_df['B'], label='Asset B')
axes[0].set_ylabel('Price ($)')
axes[0].set_title('Pairs Trading: Cointegrated Assets', fontsize=14)
axes[0].legend()

# Z-score
axes[1].plot(z_score.index, z_score, color='purple')
axes[1].axhline(y=2, color='red', linestyle='--', alpha=0.5)
axes[1].axhline(y=-2, color='green', linestyle='--', alpha=0.5)
axes[1].axhline(y=0, color='gray', linestyle='-', alpha=0.3)
axes[1].set_ylabel('Z-Score')
axes[1].set_ylim(-4, 4)

# Position
axes[2].fill_between(positions.index, positions, 0, alpha=0.3, step='post')
axes[2].set_ylabel('Position')
axes[2].set_ylim(-1.5, 1.5)

# Cumulative return
cum_ret = (1 + strat_ret).cumprod()
axes[3].plot(cum_ret.index, cum_ret, color='green', linewidth=1.5)
axes[3].axhline(y=1, color='gray', linestyle='--', alpha=0.5)
axes[3].set_ylabel('Cumulative Return')
axes[3].set_xlabel('Date')

plt.tight_layout()
plt.show()

print(f"Total return: {(cum_ret.iloc[-1] - 1):.2%}")
print(f"Sharpe ratio: {strat_ret.mean() / strat_ret.std() * np.sqrt(252):.2f}")

### Exercise 7.1

1. Change `entry_z` to 1.5 (more trades) and 2.5 (fewer trades). Which has a better Sharpe ratio?
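2. The spread can "break": cointegration can fail. Add a check that exits all positions if the 60-day correlation drops below 0.5.
3. Research the Augmented Dickey-Fuller (ADF) test. It tests a single series for a unit root, so for pair selection it is applied to the spread (the residual from regressing one log price on the other, as in the Engle-Granger procedure). How would you use it to select pairs?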

In [None]:
# YOUR CODE HERE


---
## 7.2 Machine Learning Signals

Use features derived from price data to predict next-day direction.

In [None]:
def create_features(prices: pd.Series) -> pd.DataFrame:
    """Engineer features for ML model."""
    df = pd.DataFrame(index=prices.index)
    df['price'] = prices
    df['return_1d'] = prices.pct_change()
    df['return_5d'] = prices.pct_change(5)
    df['return_20d'] = prices.pct_change(20)
    
    # Momentum
    df['mom_10'] = prices / prices.shift(10) - 1
    df['mom_30'] = prices / prices.shift(30) - 1
    
    # Volatility
    df['vol_10'] = df['return_1d'].rolling(10).std()
    df['vol_30'] = df['return_1d'].rolling(30).std()
    df['vol_ratio'] = df['vol_10'] / df['vol_30']
    
    # Mean reversion
    df['dist_ma20'] = prices / prices.rolling(20).mean() - 1
    df['dist_ma50'] = prices / prices.rolling(50).mean() - 1
    
    # RSI
    delta = prices.diff()
    gain = delta.where(delta > 0, 0).rolling(14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
    df['rsi'] = 100 - 100 / (1 + gain / loss)
    
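    # Target: 1 if the next day's return is positive. The last row has no
    # next-day return, so keep it as NaN and let dropna() remove it rather
    # than silently labelling it 0.
    next_ret = df['return_1d'].shift(-1)
    df['target'] = (next_ret > 0).astype(float).where(next_ret.notna())
    
    return df.dropna().astype({'target': int})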

features_df = create_features(prices)
feature_cols = ['return_1d', 'return_5d', 'return_20d', 'mom_10', 'mom_30',
                'vol_10', 'vol_30', 'vol_ratio', 'dist_ma20', 'dist_ma50', 'rsi']

print(f"Features: {feature_cols}")
print(f"Samples: {len(features_df)}")
print(f"Target distribution: {features_df['target'].value_counts().to_dict()}")

In [None]:
def walk_forward_ml_backtest(
    df: pd.DataFrame,
    feature_cols: list,
    train_size: int = 252,
    test_size: int = 21,
    leverage: float = 2.0
):
    """Walk-forward ML backtest with periodic retraining."""
    predictions = []
    actuals = []
    dates = []
    
    for start in range(train_size, len(df) - test_size, test_size):
        # Training data
        train = df.iloc[start - train_size:start]
        X_train = train[feature_cols]
        y_train = train['target']
        
        # Test data
        test = df.iloc[start:start + test_size]
        X_test = test[feature_cols]
        y_test = test['target']
        
        # Scale features
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        
        # Train model
        model = RandomForestClassifier(n_estimators=100, max_depth=5, random_state=42)
        model.fit(X_train_scaled, y_train)
        
        # Predict probabilities
        probs = model.predict_proba(X_test_scaled)[:, 1]
        
        predictions.extend(probs)
        actuals.extend(y_test.values)
        dates.extend(test.index)
    
    results = pd.DataFrame({
        'prob': predictions,
        'actual': actuals,
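        # Next-day return, looked up on the full frame so the shift does not
        # depend on the test windows being contiguous
        'return': df['return_1d'].shift(-1).loc[dates].values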
    }, index=dates)
    
    # Strategy: go long if prob > 0.55, short if prob < 0.45, else flat
    results['position'] = 0
    results.loc[results['prob'] > 0.55, 'position'] = 1
    results.loc[results['prob'] < 0.45, 'position'] = -1
    
    results['strat_return'] = results['position'] * results['return'] * leverage
    
    return results.dropna()

ml_results = walk_forward_ml_backtest(features_df, feature_cols)

fig, axes = plt.subplots(3, 1, figsize=(14, 10), sharex=True)

# Predicted probabilities
axes[0].plot(ml_results.index, ml_results['prob'], color='steelblue', linewidth=0.8)
axes[0].axhline(y=0.55, color='green', linestyle='--', alpha=0.5)
axes[0].axhline(y=0.45, color='red', linestyle='--', alpha=0.5)
axes[0].set_ylabel('Predicted Prob (Up)')
axes[0].set_title('ML Signal: Random Forest Walk-Forward', fontsize=14)

# Position
axes[1].fill_between(ml_results.index, ml_results['position'], 0, alpha=0.3, step='post')
axes[1].set_ylabel('Position')

# Cumulative returns
cum_strat = (1 + ml_results['strat_return']).cumprod()
cum_bh = (1 + ml_results['return']).cumprod()
axes[2].plot(cum_strat.index, cum_strat, color='green', label='ML Strategy (2x lev)')
axes[2].plot(cum_bh.index, cum_bh, color='gray', linestyle='--', label='Buy & Hold')
axes[2].axhline(y=1, color='gray', alpha=0.3)
axes[2].set_ylabel('Cumulative Return')
axes[2].legend()

plt.tight_layout()
plt.show()

accuracy = (ml_results['prob'].round() == ml_results['actual']).mean()
print(f"Prediction accuracy: {accuracy:.1%}")
print(f"Strategy return: {(cum_strat.iloc[-1] - 1):.2%}")
print(f"Buy & hold return: {(cum_bh.iloc[-1] - 1):.2%}")

### Exercise 7.2

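1. Add more features, such as Bollinger Band position and MACD histogram (a volume ratio would also need a volume series, which `generate_prices` does not produce). Does accuracy improve?
2. Change the probability thresholds from 0.55/0.45 to 0.6/0.4. This means fewer trades but higher conviction. Does the Sharpe ratio improve?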
3. Why is walk-forward validation essential for ML trading strategies? What happens if you train on all data and test on all data?
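
To make the walk-forward idea concrete, here is a minimal sketch of how the rolling windows are laid out, assuming the same 252-day training and 21-day test sizes used above. `walk_forward_windows` is a hypothetical helper written for this illustration; the point is only that every training window ends before its test window begins.

```python
def walk_forward_windows(n_samples, train_size=252, test_size=21):
    """Yield (train, test) index ranges; each test window starts where training ends."""
    for start in range(train_size, n_samples - test_size, test_size):
        train = range(start - train_size, start)
        test = range(start, start + test_size)
        yield train, test

windows = list(walk_forward_windows(300))
for train, test in windows:
    print(f"train [{train.start}, {train.stop})  test [{test.start}, {test.stop})")
```

Because the ranges never overlap, no test-period label is available when the model for that window is trained.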

In [None]:
# YOUR CODE HERE


---
## 7.3 Regime Detection

Markets move through different regimes (trending, mean-reverting, high-vol, low-vol). Detecting the current regime helps select the right strategy.
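
One practical detail with threshold-based regime rules is that rolling statistics are undefined during their warm-up period, and comparisons against NaN evaluate to False. The sketch below, using a small made-up series and hypothetical variable names, shows how negating such a comparison silently labels warm-up bars as "not high vol" unless they are masked out explicitly.

```python
import numpy as np
import pandas as pd

vol = pd.Series([np.nan, np.nan, 0.10, 0.30, 0.12])   # made-up volatility values
threshold = 0.20                                      # hypothetical cutoff

high_vol = vol > threshold          # NaN > 0.20 evaluates to False
naive_low_vol = ~high_vol           # so warm-up rows look like "low vol"
valid = vol.notna()
low_vol = valid & ~high_vol         # only rows with real data qualify

print(pd.DataFrame({'vol': vol, 'naive_low_vol': naive_low_vol, 'low_vol': low_vol}))
```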

In [None]:
def detect_regimes(prices: pd.Series, vol_lookback=20, trend_lookback=50):
    """Simple regime detection based on volatility and trend."""
    returns = prices.pct_change()
    
    # Volatility regime
    vol = returns.rolling(vol_lookback).std() * np.sqrt(252)
    vol_median = vol.rolling(252).median()
    high_vol = vol > vol_median * 1.2
    
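    # Trend regime: distance of price from its moving average
    # (a simple proxy; this is not the ADX indicator)
    ma = prices.rolling(trend_lookback).mean()
    dist_from_ma = (prices - ma) / ma
    trending = dist_from_ma.abs() > 0.05  # >5% from MA = trending
    
    # Comparisons against NaN are False, so without this mask the warm-up
    # period would be labelled low-vol / ranging instead of 'unknown'.
    valid = vol_median.notna() & ma.notna()
    
    # Combine into 4 regimes
    regime = pd.Series('unknown', index=prices.index)
    regime[valid & (~high_vol) & (~trending)] = 'low_vol_range'
    regime[valid & (~high_vol) & (trending)] = 'low_vol_trend'
    regime[valid & (high_vol) & (~trending)] = 'high_vol_range'
    regime[valid & (high_vol) & (trending)] = 'high_vol_trend'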
    
    return regime, vol, trending

regime, vol, trending = detect_regimes(prices)

fig, axes = plt.subplots(3, 1, figsize=(14, 10), sharex=True)

# Price with regime coloring
colors = {'low_vol_range': 'blue', 'low_vol_trend': 'green', 
          'high_vol_range': 'orange', 'high_vol_trend': 'red', 'unknown': 'gray'}
for r in colors:
    mask = regime == r
    if mask.any():
        axes[0].scatter(prices.index[mask], prices[mask], c=colors[r], s=5, label=r, alpha=0.7)
axes[0].set_ylabel('Price ($)')
axes[0].set_title('Regime Detection', fontsize=14)
axes[0].legend(fontsize=8)

# Volatility
axes[1].plot(vol.index, vol * 100, color='purple')
axes[1].axhline(y=vol.median() * 100, color='gray', linestyle='--')
axes[1].set_ylabel('Annualized Vol (%)')

# Regime distribution
regime_counts = regime.value_counts()
axes[2].bar(regime_counts.index, regime_counts.values, color=[colors[r] for r in regime_counts.index])
axes[2].set_ylabel('Days')

plt.tight_layout()
plt.show()

print("Regime distribution:")
for r, count in regime_counts.items():
    print(f"  {r}: {count} days ({count/len(regime):.1%})")

In [None]:
# Strategy selection based on regime
def regime_adaptive_strategy(prices, regime, leverage=2.0):
    returns = prices.pct_change()
    ma_fast = prices.ewm(span=10).mean()
    ma_slow = prices.ewm(span=30).mean()
    
    # Mean reversion signal
    z = (prices - prices.rolling(20).mean()) / prices.rolling(20).std()
    mr_signal = -np.sign(z)  # fade extremes
    
    # Trend signal
    trend_signal = np.sign(ma_fast - ma_slow)
    
    # Select strategy based on regime
    position = pd.Series(0.0, index=prices.index)
    
    # Trending regimes: use trend following
    trending_mask = regime.isin(['low_vol_trend', 'high_vol_trend'])
    position[trending_mask] = trend_signal[trending_mask]
    
    # Range regimes: use mean reversion
    range_mask = regime.isin(['low_vol_range', 'high_vol_range'])
    position[range_mask] = mr_signal[range_mask].where(z.abs() > 1.5, 0)
    
    # Reduce leverage in high vol
    high_vol_mask = regime.isin(['high_vol_range', 'high_vol_trend'])
    position[high_vol_mask] *= 0.5
    
    strat_return = position.shift(1) * returns * leverage
    return strat_return, position

adaptive_ret, adaptive_pos = regime_adaptive_strategy(prices, regime)

# Compare to static trend following
ma_fast = prices.ewm(span=10).mean()
ma_slow = prices.ewm(span=30).mean()
static_pos = np.sign(ma_fast - ma_slow)
static_ret = static_pos.shift(1) * prices.pct_change() * 2

fig, ax = plt.subplots(figsize=(14, 6))
cum_adaptive = (1 + adaptive_ret.dropna()).cumprod()
cum_static = (1 + static_ret.dropna()).cumprod()
cum_bh = (1 + prices.pct_change().dropna()).cumprod()

ax.plot(cum_adaptive.index, cum_adaptive, color='green', linewidth=1.5, label='Regime Adaptive')
ax.plot(cum_static.index, cum_static, color='blue', linewidth=1.5, label='Static Trend')
ax.plot(cum_bh.index, cum_bh, color='gray', linestyle='--', label='Buy & Hold')
ax.axhline(y=1, color='gray', alpha=0.3)
ax.set_title('Regime-Adaptive vs Static Strategy', fontsize=14)
ax.set_ylabel('Cumulative Return')
ax.legend()
plt.tight_layout()
plt.show()

print(f"Adaptive: {(cum_adaptive.iloc[-1]-1):.2%}, Sharpe: {adaptive_ret.mean()/adaptive_ret.std()*np.sqrt(252):.2f}")
print(f"Static:   {(cum_static.iloc[-1]-1):.2%}, Sharpe: {static_ret.mean()/static_ret.std()*np.sqrt(252):.2f}")

### Exercise 7.3

1. Add a "crisis" regime when volatility is >2x the median. What strategy works best in crisis?
2. Use a Hidden Markov Model (HMM) to detect regimes instead of simple thresholds. Does it improve timing?
3. Calculate the Sharpe ratio of trend following ONLY during trending regimes vs ONLY during ranging regimes.

In [None]:
# YOUR CODE HERE


---
## 7.4 Comprehension Check

1. Why do pairs trading strategies require cointegration rather than just correlation?
2. What is the main risk of ML-based trading strategies? (Hint: overfitting)
3. How would you test if your regime detection is actually predictive vs just descriptive?
4. A model has 55% accuracy predicting next-day direction. Is this good enough to trade profitably? What else matters?
5. Why do quantitative strategies often have capacity limits (can't deploy unlimited capital)?
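
For question 4, it can help to put numbers on the trade-off. The sketch below computes expected return per trade from hit rate, average win, average loss, and a per-trade cost; the function name and the example figures are illustrative assumptions, not results from this notebook.

```python
def expectancy(hit_rate, avg_win, avg_loss, cost=0.0):
    """Expected return per trade: p * win - (1 - p) * loss - cost."""
    return hit_rate * avg_win - (1 - hit_rate) * avg_loss - cost

# Same 55% hit rate, different payoff profiles (made-up numbers)
symmetric = expectancy(0.55, avg_win=0.010, avg_loss=0.010)
small_wins = expectancy(0.55, avg_win=0.008, avg_loss=0.012)
with_costs = expectancy(0.55, avg_win=0.010, avg_loss=0.010, cost=0.0015)

print(f"symmetric payoffs:  {symmetric:+.4f}")
print(f"wins < losses:      {small_wins:+.4f}")
print(f"symmetric + costs:  {with_costs:+.4f}")
```

With these assumed inputs the same hit rate is profitable in the first case and unprofitable in the other two.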

In [None]:
# YOUR ANSWERS HERE
