## Regime Switching for Pairs Trading

When trading a spread between two correlated assets (like BTC and ETH), the key question is:
**when should I enter and exit positions?**

Let's look at real data to see why this is tricky...

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from fetch_data import fetch_btc_eth_history

# Fetch 17 days of hourly BTC/ETH data
plt.style.use("dark_background")
df = fetch_btc_eth_history(days=17, interval="1h")

fig, ax = plt.subplots(figsize=(12, 5))
ax.plot(df["open_time"], df["log_ratio"], color="cyan", linewidth=1.2)
ax.set_xlabel("Date", fontsize=11)
ax.set_ylabel("log(BTC/ETH)", fontsize=11)
ax.set_title("BTC/ETH Spread: When Should We Enter and Exit?", fontsize=14, fontweight="bold")
ax.grid(True, alpha=0.3)
ax.tick_params(axis='x', rotation=30)
plt.tight_layout()
plt.show()

In [None]:
from fetch_data import estimate_ou_parameters

# Split into training (first 14 days) and test (last 3 days)
spread = df["log_ratio"].values
times = df["open_time"]
dt = 1 / 24  # hourly data = 1/24 of a day

train_hours = 14 * 24
train_spread = spread[:train_hours]
train_times = times[:train_hours]
test_spread = spread[train_hours:]
test_times = times[train_hours:]

# Estimate parameters from training period only
params = estimate_ou_parameters(train_spread, dt)
mu = train_spread.mean()  # Use empirical mean
sigma = train_spread.std()  # Use empirical std dev (not OU sigma)

print(f"Parameters estimated from first 14 days:")
print(f"  Mean (μ): {mu:.4f}")
print(f"  Std dev (σ): {sigma:.4f}")
print(f"  OU half-life: {params['half_life']*24:.1f} hours")

# Visualize both periods
fig, axes = plt.subplots(1, 2, figsize=(14, 5), sharey=True)

for ax, s, t, title in [
    (axes[0], train_spread, train_times, "Training Period: 14 days (parameters estimated here)"),
    (axes[1], test_spread, test_times, "Test Period: 3 days (how would it perform?)")
]:
    ax.plot(t, s, color="cyan", linewidth=1.2)
    ax.axhline(mu, color="white", linestyle="--", linewidth=1.5, label=f"μ = {mu:.3f}")
    ax.fill_between(t, mu - sigma, mu + sigma, color="green", alpha=0.25, label="±1σ")
    ax.fill_between(t, mu - 2*sigma, mu - sigma, color="red", alpha=0.25, label="±2σ")
    ax.fill_between(t, mu + sigma, mu + 2*sigma, color="red", alpha=0.25)
    ax.set_xlabel("Date", fontsize=11)
    ax.set_title(title, fontsize=12, fontweight="bold")
    ax.grid(True, alpha=0.3)
    ax.tick_params(axis='x', rotation=30)

axes[0].set_ylabel("log(BTC/ETH)", fontsize=11)
axes[0].legend(loc="upper right", fontsize=9)

plt.tight_layout()
plt.show()

In [None]:
def backtest_threshold(spread: np.ndarray, mu: float, threshold: float, fee: float = 0.001) -> float:
    """
    Backtest a simple threshold strategy on spread data.
    
    Strategy:
    - Go long when spread < mu - threshold (expect reversion up)
    - Go short when spread > mu + threshold (expect reversion down)
    - Exit (go flat) when spread crosses mu
    
    Returns total PnL after fees.
    """
    position = 0  # -1 short, 0 flat, +1 long
    pnl = 0.0
    entry_price = 0.0
    
    for i in range(1, len(spread)):
        price = spread[i]
        prev_price = spread[i-1]
        
        # Check for exit first (cross the mean)
        if position == 1 and price >= mu:  # Long position, crossed above mean
            pnl += (price - entry_price) - fee
            position = 0
        elif position == -1 and price <= mu:  # Short position, crossed below mean
            pnl += (entry_price - price) - fee
            position = 0
        
        # Check for entry
        if position == 0:
            if price < mu - threshold:  # Enter long
                position = 1
                entry_price = price
                pnl -= fee
            elif price > mu + threshold:  # Enter short
                position = -1
                entry_price = price
                pnl -= fee
    
    # Close any open position at end
    if position == 1:
        pnl += (spread[-1] - entry_price)
    elif position == -1:
        pnl += (entry_price - spread[-1])
    
    return pnl

# Find the best threshold on training data (overfit!)
thresholds = np.linspace(0.001, 3 * sigma, 50)
train_pnls = [backtest_threshold(train_spread, mu, t) for t in thresholds]
best_idx = np.argmax(train_pnls)
best_threshold = thresholds[best_idx]
best_train_pnl = train_pnls[best_idx]

# Now test it out-of-sample
test_pnl = backtest_threshold(test_spread, mu, best_threshold)

print(f"Best threshold (overfit to training): {best_threshold:.4f} ({best_threshold/sigma:.2f}σ)")
print(f"Training PnL: {best_train_pnl:.4f}")
print(f"Test PnL:     {test_pnl:.4f}")

# Visualize
fig, axes = plt.subplots(1, 2, figsize=(14, 5), sharey=True)

for ax, s, t, title, pnl in [
    (axes[0], train_spread, train_times, f"Training: 14 days (PnL: {best_train_pnl:.4f})", best_train_pnl),
    (axes[1], test_spread, test_times, f"Test: 3 days (PnL: {test_pnl:.4f})", test_pnl)
]:
    ax.plot(t, s, color="cyan", linewidth=1.2)
    ax.axhline(mu, color="white", linestyle="--", linewidth=1.5, label=f"μ = {mu:.3f}")
    ax.axhline(mu + best_threshold, color="red", linestyle=":", linewidth=1.5, label=f"±{best_threshold/sigma:.2f}σ threshold")
    ax.axhline(mu - best_threshold, color="red", linestyle=":", linewidth=1.5)
    ax.fill_between(t, mu - best_threshold, mu + best_threshold, color="gray", alpha=0.15)
    ax.set_xlabel("Date", fontsize=11)
    ax.set_title(title, fontsize=12, fontweight="bold")
    ax.grid(True, alpha=0.3)
    ax.tick_params(axis='x', rotation=30)

axes[0].set_ylabel("log(BTC/ETH)", fontsize=11)
axes[0].legend(loc="upper right", fontsize=9)

plt.suptitle("Overfit Strategy: Best Threshold on Training Data", fontsize=13, fontweight="bold", y=1.02)
plt.tight_layout()
plt.show()

## Paper Simplfied
https://arxiv.org/abs/2512.04697v1

In [None]:
import torch
from pairs_trading import SWITCHING_COSTS, R_FUNDING, R_CASH
from regime_switching import Config, train

# Use OU parameters estimated from the ACTUAL training data
# (params, mu, sigma were computed in an earlier cell from train_spread)
THETA_REAL = params["theta"]  # Mean-reversion speed (per day)
MU_REAL = mu                   # Empirical mean of training spread
SIGMA_REAL = params["sigma"]   # OU volatility (per sqrt(day))
SPREAD_SCALE_REAL = sigma      # Use empirical std dev as scale

print(f"OU parameters from training data:")
print(f"  θ (mean-reversion): {THETA_REAL:.2f} per day")
print(f"  μ (long-term mean): {MU_REAL:.4f}")
print(f"  σ (OU volatility):  {SIGMA_REAL:.4f} per sqrt(day)")
print(f"  Scale (empirical σ): {SPREAD_SCALE_REAL:.4f}")
print()

# Define dynamics using REAL parameters
def dynamics_fn_real(t: torch.Tensor, x: torch.Tensor, i: torch.Tensor
                     ) -> tuple[torch.Tensor, torch.Tensor]:
    """OU dynamics using parameters estimated from actual training data."""
    drift = -THETA_REAL * x  # Mean-reverts to 0 (in normalized space)
    diffusion = torch.full_like(x, SIGMA_REAL / SPREAD_SCALE_REAL)
    return drift, diffusion

def running_reward_fn_real(t: torch.Tensor, x: torch.Tensor, i: torch.Tensor
                           ) -> torch.Tensor:
    """Running reward using real parameters."""
    drift_profit_long = -THETA_REAL * SPREAD_SCALE_REAL * x
    drift_profit_short = THETA_REAL * SPREAD_SCALE_REAL * x
    
    reward_long = drift_profit_long - R_FUNDING
    reward_short = drift_profit_short - R_FUNDING
    reward_flat = torch.full_like(x, R_CASH)
    
    return torch.where(i == 0, reward_long, 
                       torch.where(i == 1, reward_short, reward_flat))

def terminal_reward_real(x: torch.Tensor) -> torch.Tensor:
    """Terminal reward h = 0."""
    return torch.zeros(x.shape[0], device=x.device)

# Training config
config = Config(
    num_regimes=3, state_dim=1,
    switching_costs=SWITCHING_COSTS,
    temperature=0.1,
    T=2.0/24.0, K=120,  # 2 hours horizon, 1 step per minute
    batch_size=256, num_episodes=200,
    learning_rate=1e-3, hidden_dim=64,
    x0_mean=0.0,  # Normalized: 0 = at the mean
    x0_std=1.0,   # Normalized: 1 std = 1 unit
)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Training on {device}...")

torch.manual_seed(42)
np.random.seed(42)
value_net, losses = train(config, terminal_reward_real, dynamics_fn_real, 
                          running_reward_fn_real, device)
print("Training complete!")

# Evaluate learned policy across spread values
value_net.eval()
spread_range = torch.linspace(-3.0, 3.0, 500, device=device)  # ±3 std devs
t_mid = torch.full_like(spread_range, config.T / 2)

with torch.no_grad():
    values = value_net(t_mid, spread_range)
    v_long = values[:, 0].cpu().numpy()
    v_short = values[:, 1].cpu().numpy()
    v_flat = values[:, 2].cpu().numpy()

spread_np = spread_range.cpu().numpy()

# Calculate switching thresholds accounting for costs
cost_matrix = np.array(SWITCHING_COSTS)
switching_thresholds = {}

# Short -> Long: switch when V_long - V_short > cost[1,0]
diff_short_to_long = v_long - v_short
cost_short_to_long = cost_matrix[1, 0]
switch_short_to_long_idx = np.where(diff_short_to_long > cost_short_to_long)[0]
if len(switch_short_to_long_idx) > 0:
    switching_thresholds['Short→Long'] = spread_np[switch_short_to_long_idx[-1]]

# Long -> Short: switch when V_short - V_long > cost[0,1]
diff_long_to_short = v_short - v_long
cost_long_to_short = cost_matrix[0, 1]
switch_long_to_short_idx = np.where(diff_long_to_short > cost_long_to_short)[0]
if len(switch_long_to_short_idx) > 0:
    switching_thresholds['Long→Short'] = spread_np[switch_long_to_short_idx[0]]

# Plot
fig, axes = plt.subplots(1, 2, figsize=(15, 6))

# Left plot: Value functions
ax = axes[0]
ax.plot(spread_np, v_long, 'b-', label='V(Long)', linewidth=2)
ax.plot(spread_np, v_short, 'r-', label='V(Short)', linewidth=2)
ax.plot(spread_np, v_flat, 'g-', label='V(Flat)', linewidth=2, alpha=0.7)

optimal_regime = np.argmax([v_long, v_short, v_flat], axis=0)
colors = ['blue', 'red', 'green']
for i in range(len(spread_np) - 1):
    ax.axvspan(spread_np[i], spread_np[i+1], alpha=0.1, color=colors[optimal_regime[i]])

ax.axvline(x=0, color='yellow', linestyle=':', alpha=0.5, label='Mean (μ)')
ax.set_xlabel('Normalized Spread (std devs from mean)', fontsize=12)
ax.set_ylabel('Value Function', fontsize=12)
ax.set_title('Optimal Regime (if starting fresh)', fontsize=13, fontweight='bold')
ax.legend(loc='lower right', fontsize=10)
ax.grid(True, alpha=0.3)

# Right plot: Switching thresholds
ax = axes[1]
advantage_to_long = v_long - v_short - cost_short_to_long
advantage_to_short = v_short - v_long - cost_long_to_short

ax.fill_between(spread_np, 0, advantage_to_long, where=advantage_to_long > 0, 
                alpha=0.3, color='blue', label='Switch to Long profitable')
ax.fill_between(spread_np, 0, advantage_to_short, where=advantage_to_short > 0,
                alpha=0.3, color='red', label='Switch to Short profitable')
ax.axhline(0, color='white', linestyle='-', alpha=0.5)

if 'Short→Long' in switching_thresholds:
    x = switching_thresholds['Short→Long']
    ax.axvline(x=x, color='cyan', linestyle='--', linewidth=2, label=f'Short→Long @ {x:.2f}σ')
if 'Long→Short' in switching_thresholds:
    x = switching_thresholds['Long→Short']
    ax.axvline(x=x, color='orange', linestyle='--', linewidth=2, label=f'Long→Short @ {x:.2f}σ')

ax.set_xlabel('Normalized Spread (std devs from mean)', fontsize=12)
ax.set_ylabel('Net Advantage (after cost)', fontsize=12)
ax.set_title('When to Actually Switch (accounting for costs)', fontsize=13, fontweight='bold')
ax.legend(loc='upper right', fontsize=9)
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Print trading rules
print("\n" + "="*70)
print("LEARNED SWITCHING RULES (trained on actual data parameters)")
print("="*70)
print(f"\nThresholds in normalized units (std devs from training mean):")

if 'Short→Long' in switching_thresholds and 'Long→Short' in switching_thresholds:
    x_s2l = switching_thresholds['Short→Long']
    x_l2s = switching_thresholds['Long→Short']
    
    print(f"  Short→Long threshold: {x_s2l:.2f}σ")
    print(f"  Long→Short threshold: {x_l2s:.2f}σ")
    print(f"  Hysteresis gap: {x_l2s - x_s2l:.2f}σ")
    print()
    
    # Convert to raw values
    raw_s2l = x_s2l * SPREAD_SCALE_REAL + MU_REAL
    raw_l2s = x_l2s * SPREAD_SCALE_REAL + MU_REAL
    
    print(f"In raw spread values:")
    print(f"  Switch to LONG when spread < {raw_s2l:.4f}")
    print(f"  Switch to SHORT when spread > {raw_l2s:.4f}")
else:
    print("Could not find clear switching thresholds")

In [None]:
# Apply learned thresholds to real data with HYSTERESIS
# Convert thresholds from normalized (std devs) to raw spread values
threshold_short_to_long_raw = switching_thresholds['Short→Long'] * SPREAD_SCALE_REAL + MU_REAL
threshold_long_to_short_raw = switching_thresholds['Long→Short'] * SPREAD_SCALE_REAL + MU_REAL

def backtest_hysteresis(spread: np.ndarray, thresh_s2l: float, thresh_l2s: float, 
                        fee: float = 0.001) -> tuple[float, list]:
    """
    Backtest with hysteresis: different thresholds for entering Long vs Short.
    
    - If Short: switch to Long when spread < thresh_s2l
    - If Long: switch to Short when spread > thresh_l2s
    - Start Flat, enter Long below thresh_s2l, enter Short above thresh_l2s
    
    Returns (total_pnl, position_history)
    """
    position = 0  # -1 short, 0 flat, +1 long
    pnl = 0.0
    entry_price = 0.0
    positions = []
    
    for i in range(len(spread)):
        price = spread[i]
        
        if position == 0:  # Flat - look for entry
            if price < thresh_s2l:
                position = 1  # Go long
                entry_price = price
                pnl -= fee
            elif price > thresh_l2s:
                position = -1  # Go short
                entry_price = price
                pnl -= fee
        elif position == 1:  # Long - check for switch to short
            if price > thresh_l2s:
                # Close long and open short
                pnl += (price - entry_price) - fee
                position = -1
                entry_price = price
                pnl -= fee
        elif position == -1:  # Short - check for switch to long
            if price < thresh_s2l:
                # Close short and open long
                pnl += (entry_price - price) - fee
                position = 1
                entry_price = price
                pnl -= fee
        
        positions.append(position)
    
    # Close any open position at end
    if position == 1:
        pnl += (spread[-1] - entry_price)
    elif position == -1:
        pnl += (entry_price - spread[-1])
    
    return pnl, positions

# Run backtest on both periods
train_pnl_rl, train_positions = backtest_hysteresis(train_spread, threshold_short_to_long_raw, 
                                                     threshold_long_to_short_raw)
test_pnl_rl, test_positions = backtest_hysteresis(test_spread, threshold_short_to_long_raw,
                                                   threshold_long_to_short_raw)

print(f"RL Learned Thresholds (with hysteresis):")
print(f"  Switch to Long when spread < {threshold_short_to_long_raw:.4f} ({switching_thresholds['Short→Long']:.2f}σ)")
print(f"  Switch to Short when spread > {threshold_long_to_short_raw:.4f} ({switching_thresholds['Long→Short']:.2f}σ)")
print(f"  Hysteresis gap: {threshold_long_to_short_raw - threshold_short_to_long_raw:.4f}")
print()
print(f"Training PnL (RL): {train_pnl_rl:.4f}")
print(f"Test PnL (RL):     {test_pnl_rl:.4f}")

# Compare to naive strategy
print(f"\nComparison to naive overfit strategy:")
print(f"  Naive Training PnL: {best_train_pnl:.4f}")
print(f"  Naive Test PnL:     {test_pnl:.4f}")

# Visualize with position coloring
fig, axes = plt.subplots(1, 2, figsize=(15, 6), sharey=True)

for ax, s, t, pos, title in [
    (axes[0], train_spread, train_times, train_positions, 
     f"Training: 14 days (RL PnL: {train_pnl_rl:.4f})"),
    (axes[1], test_spread, test_times, test_positions,
     f"Test: 3 days (RL PnL: {test_pnl_rl:.4f})")
]:
    # Plot spread
    ax.plot(t, s, color="cyan", linewidth=1.2, zorder=3)
    
    # Shade by position
    pos_arr = np.array(pos)
    t_arr = np.array(t)
    for i in range(len(s) - 1):
        if pos_arr[i] == 1:  # Long
            ax.axvspan(t_arr[i], t_arr[i+1], alpha=0.3, color='blue')
        elif pos_arr[i] == -1:  # Short
            ax.axvspan(t_arr[i], t_arr[i+1], alpha=0.3, color='red')
    
    # Draw thresholds
    ax.axhline(threshold_short_to_long_raw, color='cyan', linestyle='--', linewidth=2, 
               label=f'Long @ {switching_thresholds["Short→Long"]:.2f}σ')
    ax.axhline(threshold_long_to_short_raw, color='orange', linestyle='--', linewidth=2,
               label=f'Short @ {switching_thresholds["Long→Short"]:.2f}σ')
    
    # Mean line
    ax.axhline(MU_REAL, color='white', linestyle=':', alpha=0.5, label=f'μ = {MU_REAL:.3f}')
    
    # Hysteresis zone
    ax.fill_between(t, threshold_short_to_long_raw, threshold_long_to_short_raw, 
                    color='gray', alpha=0.2, label='Hysteresis zone')
    
    ax.set_xlabel("Date", fontsize=11)
    ax.set_title(title, fontsize=12, fontweight="bold")
    ax.grid(True, alpha=0.3)
    ax.tick_params(axis='x', rotation=30)

axes[0].set_ylabel("log(BTC/ETH)", fontsize=11)
axes[0].legend(loc="upper right", fontsize=9)

plt.suptitle("RL Strategy with Hysteresis: Learned Switching Thresholds", 
             fontsize=13, fontweight="bold", y=1.02)
plt.tight_layout()
plt.show()

# Summary comparison
print("\n" + "="*50)
print("STRATEGY COMPARISON")
print("="*50)
print(f"{'Strategy':<25} {'Train PnL':>12} {'Test PnL':>12}")
print("-"*50)
print(f"{'Naive (overfit σ)':<25} {best_train_pnl:>12.4f} {test_pnl:>12.4f}")
print(f"{'RL (hysteresis)':<25} {train_pnl_rl:>12.4f} {test_pnl_rl:>12.4f}")

## How Does This Work?

### Step 1: Model the spread as a diffusion process

We assume the spread follows an **Ornstein-Uhlenbeck (OU) process**:

$$dS = \theta(\mu - S)dt + \sigma dW$$

In plain English:
- **S** = current spread value
- **θ(μ - S)** = "pull toward the mean" — if S is above μ, this is negative (pulls down); if S is below μ, this is positive (pulls up)
- **θ** = how strong the pull is (higher = faster mean-reversion)
- **σdW** = random noise (Brownian motion)

This is just a fancy way of saying: **"the spread tends to revert to its mean, plus some randomness"**

We estimate θ, μ, and σ from the training data using linear regression on the changes in spread.

### Step 2-4: The RL Pipeline

2. **Simulate many paths** — Using the fitted OU model, generate thousands of synthetic trajectories

3. **Learn the value function** — A neural network learns V(x, regime) = "expected future reward starting from spread x in this regime"

4. **Extract thresholds** — Query where V(Long) > V(Short) + switching_cost to find optimal switching points

In [None]:
# Step 2: What does "simulate many paths" look like?
# Here are 20 sample trajectories the RL agent might see during training

def simulate_ou_path(x0: float, theta: float, sigma: float, dt: float, n_steps: int) -> np.ndarray:
    """Simulate a single OU path (in normalized space where mean=0)."""
    path = np.zeros(n_steps)
    path[0] = x0
    for i in range(1, n_steps):
        drift = -theta * path[i-1] * dt
        diffusion = sigma * np.sqrt(dt) * np.random.randn()
        path[i] = path[i-1] + drift + diffusion
    return path

# Simulation parameters (matching training config)
n_paths = 20
n_steps = 120  # 2 hours at 1 step/min
dt_sim = (2.0/24.0) / n_steps  # Time step in days

# Use the OU parameters we estimated from real data
theta_sim = THETA_REAL
sigma_sim = SIGMA_REAL / SPREAD_SCALE_REAL  # Normalized volatility

fig, ax = plt.subplots(figsize=(12, 5))

# Simulate paths starting from different points
np.random.seed(42)
for _ in range(n_paths):
    x0 = np.random.randn() * 1.0  # Start from random point ~N(0,1)
    path = simulate_ou_path(x0, theta_sim, sigma_sim, dt_sim, n_steps)
    ax.plot(path, alpha=0.5, linewidth=1)

ax.axhline(0, color='yellow', linestyle='--', linewidth=2, label='Mean (μ)')
ax.axhline(-switching_thresholds['Short→Long'], color='cyan', linestyle=':', 
           label=f'Long threshold ({switching_thresholds["Short→Long"]:.2f}σ)')
ax.axhline(-switching_thresholds['Long→Short'], color='orange', linestyle=':',
           label=f'Short threshold ({switching_thresholds["Long→Short"]:.2f}σ)')

ax.set_xlabel('Time Step (minutes)', fontsize=11)
ax.set_ylabel('Normalized Spread (σ from mean)', fontsize=11)
ax.set_title('Simulated OU Paths: What the RL Agent "Sees" During Training', fontsize=13, fontweight='bold')
ax.legend(loc='upper right', fontsize=9)
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

print(f"Each path: {n_steps} steps over {config.T*24*60:.0f} minutes")
print(f"The agent learns which regime (Long/Short/Flat) maximizes reward across all these scenarios")

## The Value Function Intuition

**V(x, regime)** = expected future profit if we're at spread x in the given regime

- Where **V(Long) > V(Short)**: being Long is more valuable
- Where **V(Short) > V(Long)**: being Short is more valuable
- The **switching cost** creates hysteresis — we don't flip-flop at x=0, we wait until the benefit exceeds the cost

This is why the learned thresholds aren't symmetric around the mean. The model accounts for transaction costs and finds the optimal "buffer zone" where you should hold your current position.