# In [2]:
import numpy as np
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt

# In [3]:

def _spread_move(position_type, s1_now, s2_now, entry_s1, entry_s2, hedge_ratio):
    """Return the per-unit P&L of a spread position since entry.

    One unit is one share of stock2 hedged with ``hedge_ratio`` shares of
    stock1. "long" means long stock2 / short stock1; any other value is
    treated as the mirror (short stock2 / long stock1) position.
    """
    move = (s2_now - entry_s2) - hedge_ratio * (s1_now - entry_s1)
    return move if position_type == "long" else -move


def backtest_pair(price_data, stock1, stock2, thresholds, initial_capital=100000,
                  position_size=0.1, commission=0.001, window=20,
                  max_holding_days=20):
    """
    Backtest a z-score mean-reversion pairs trade and plot the results.

    The spread is the residual of an OLS regression of stock2 on stock1.
    A position is opened when the rolling z-score of the spread crosses
    +/- ``entry_threshold`` and closed on a cross back through
    +/- ``exit_threshold``, on a stop loss, after ``max_holding_days`` bars,
    or at the end of the data. Only one position is held at a time.

    Parameters:
    -----------
    price_data : dict
        Dictionary containing price data for each stock. An optional
        'dates' entry supplies one date per bar; when absent, daily dates
        starting 2023-01-01 are synthesised.
    stock1, stock2 : str
        Stock symbols (keys of ``price_data``) to trade
    thresholds : dict
        Dictionary containing entry_threshold, exit_threshold, and stop_loss.
        stop_loss is a z-score distance measured from the entry z-score.
    initial_capital : float
        Starting capital
    position_size : float
        Fraction of current capital allocated to the stock2 leg of each
        trade (0.1 = 10%). The stock1 leg is ``hedge_ratio`` shares per
        stock2 share.
    commission : float
        Commission rate (0.001 = 0.1%), charged on the allocation for both
        legs at entry and again at exit
    window : int
        Look-back length of the rolling mean/std used for the z-score
    max_holding_days : int
        A position still open more than this many bars after entry is
        closed with exit reason "time_exit"

    Returns:
    --------
    dict
        'summary' (dict of performance metrics), 'trades' (list of per-trade
        dicts), 'capital_history' (numpy array, one end-of-bar equity value
        per bar), 'zscores' (numpy array) and 'hedge_ratio' (float).

    Raises:
    -------
    ValueError
        If ``window`` is below 2, fewer than two aligned prices are
        available, or 'dates' is shorter than the aligned price series.

    Notes:
    ------
    The hedge ratio is fitted on the full sample and the first
    ``window - 1`` z-scores use the mean/std of the first ``window`` spread
    values, so the backtest contains look-ahead in those quantities.
    A matplotlib figure is created as a side effect; it is neither shown
    nor returned.
    """
    if window < 2:
        raise ValueError("window must be at least 2")

    # Extract and format price data
    s1 = np.array(price_data[stock1], dtype=float).flatten()
    s2 = np.array(price_data[stock2], dtype=float).flatten()
    len_s1 = len(s1)

    # Align both series on their most recent common length
    min_len = min(len(s1), len(s2))
    if min_len < 2:
        raise ValueError("need at least two aligned price observations")
    s1, s2 = s1[-min_len:], s2[-min_len:]

    # Normalise dates to a plain list so that dates[i] is always positional
    # (a pandas Series would otherwise be indexed by label).
    if 'dates' in price_data:
        dates = list(price_data['dates'])[-min_len:]
        if len(dates) < min_len:
            raise ValueError("price_data['dates'] is shorter than the price series")
    else:
        # Assume daily dates if none available
        dates = list(pd.date_range(start='2023-01-01', periods=len_s1)[-min_len:])

    # Calculate hedge ratio; has_constant='add' guarantees two fitted
    # parameters even when s1 itself is constant.
    X = sm.add_constant(s1, has_constant='add')
    model = sm.OLS(s2, X).fit()
    hedge_ratio = float(model.params[1])
    intercept = float(model.params[0])

    # Calculate spread and its rolling statistics. copy=True yields writable
    # arrays regardless of pandas' copy-on-write setting.
    spread = s2 - (hedge_ratio * s1 + intercept)
    rolling = pd.Series(spread).rolling(window=window)
    rolling_mean = rolling.mean().to_numpy(copy=True)
    rolling_std = rolling.std().to_numpy(copy=True)

    # Fill only the NaN warm-up bars, using the same sample std (ddof=1)
    # as the rolling estimator.
    warmup = min(window - 1, min_len)
    rolling_mean[:warmup] = spread[:window].mean()
    rolling_std[:warmup] = spread[:window].std(ddof=1)

    # Calculate z-scores; bars with no dispersion get z = 0 instead of inf/NaN
    deviation = spread - rolling_mean
    zscores = np.divide(deviation, rolling_std,
                        out=np.zeros_like(deviation), where=rolling_std > 0)

    entry_threshold = thresholds['entry_threshold']
    exit_threshold = thresholds['exit_threshold']
    stop_loss = thresholds['stop_loss']

    # Initialize tracking variables
    capital = initial_capital
    in_position = False
    position_type = None

    # Performance tracking
    trades = []
    trade_markers = []  # (entry_index, exit_index) per trade, for plotting
    capital_history = [initial_capital]

    # Main trading loop
    for i in range(1, min_len):
        z = zscores[i]
        z_prev = zscores[i - 1]
        s1_price = s1[i]
        s2_price = s2[i]
        current_date = dates[i]

        if not in_position:
            # Check for entry signals (z-score crossing the entry band)
            if z > entry_threshold and z_prev <= entry_threshold:
                position_type = "short"  # short s2, long s1
            elif z < -entry_threshold and z_prev >= -entry_threshold:
                position_type = "long"   # long s2, short s1
            else:
                position_type = None

            if position_type is not None:
                in_position = True
                entry_s1 = s1_price
                entry_s2 = s2_price
                entry_z = z
                entry_index = i
                entry_date = current_date
                entry_capital = capital

                # Position sizing: convert the dollar allocation into a
                # number of spread units so P&L is units * price change.
                allocation = capital * position_size
                units = allocation / entry_s2 if entry_s2 > 0 else 0.0
                # Commission for both legs, paid once at entry and once at exit
                side_commission = allocation * commission * 2
        else:
            # Check for exit signals
            exit_reason = None

            # Exit based on z-score crossing back
            if position_type == "long" and z >= -exit_threshold and z_prev < -exit_threshold:
                exit_reason = "target"
            elif position_type == "short" and z <= exit_threshold and z_prev > exit_threshold:
                exit_reason = "target"
            # Stop loss - z-score moves further from equilibrium
            elif position_type == "long" and z < entry_z - stop_loss:
                exit_reason = "stop_loss"
            elif position_type == "short" and z > entry_z + stop_loss:
                exit_reason = "stop_loss"
            # Time-based exit
            elif i - entry_index > max_holding_days:
                exit_reason = "time_exit"

            if exit_reason is not None:
                profit = _spread_move(position_type, s1_price, s2_price,
                                      entry_s1, entry_s2, hedge_ratio)
                # Net of both the entry and the exit commission
                trade_pnl = units * profit - 2 * side_commission
                capital = entry_capital + trade_pnl

                trades.append({
                    'type': position_type,
                    'entry_date': entry_date,
                    'exit_date': current_date,
                    'holding_days': i - entry_index,
                    'entry_z': entry_z,
                    'exit_z': z,
                    'pnl': trade_pnl,
                    'return': trade_pnl / entry_capital * 100,
                    'exit_reason': exit_reason
                })
                trade_markers.append((entry_index, i))

                in_position = False

        # Record end-of-bar equity: realised capital when flat, otherwise
        # mark-to-market net of the entry commission already paid.
        if in_position:
            open_pnl = units * _spread_move(position_type, s1_price, s2_price,
                                            entry_s1, entry_s2, hedge_ratio)
            capital_history.append(entry_capital - side_commission + open_pnl)
        else:
            capital_history.append(capital)

    # Close any open position at the end
    if in_position:
        last = min_len - 1
        profit = _spread_move(position_type, s1[last], s2[last],
                              entry_s1, entry_s2, hedge_ratio)
        trade_pnl = units * profit - 2 * side_commission
        capital = entry_capital + trade_pnl

        trades.append({
            'type': position_type,
            'entry_date': entry_date,
            'exit_date': dates[last],
            'holding_days': last - entry_index,
            'entry_z': entry_z,
            'exit_z': zscores[last],
            'pnl': trade_pnl,
            'return': trade_pnl / entry_capital * 100,
            'exit_reason': 'end_of_period'
        })
        trade_markers.append((entry_index, last))
        # Make the equity curve end at the realised final capital
        capital_history[-1] = capital

    # Calculate performance metrics
    capital_history = np.array(capital_history)
    returns = np.diff(capital_history) / capital_history[:-1]

    # Key performance metrics
    total_return = (capital - initial_capital) / initial_capital * 100
    num_trades = len(trades)
    win_trades = sum(1 for t in trades if t['pnl'] > 0)
    win_rate = win_trades / num_trades if num_trades > 0 else 0

    avg_return = np.mean([t['return'] for t in trades]) if trades else 0
    avg_hold_days = np.mean([t['holding_days'] for t in trades]) if trades else 0

    # Sharpe ratio (annualized)
    daily_ret = pd.Series(returns).fillna(0)
    sharpe = np.sqrt(252) * daily_ret.mean() / daily_ret.std() if daily_ret.std() > 0 else 0

    # Max drawdown
    peak = np.maximum.accumulate(capital_history)
    drawdown = (peak - capital_history) / peak
    max_drawdown = drawdown.max() * 100

    # Plots
    plt.figure(figsize=(14, 10))

    # Z-score plot
    plt.subplot(3, 1, 1)
    plt.plot(zscores, label='Z-Score')
    plt.axhline(entry_threshold, color='r', linestyle='--', label='Entry')
    plt.axhline(-entry_threshold, color='r', linestyle='--')
    plt.axhline(exit_threshold, color='g', linestyle='--', label='Exit')
    plt.axhline(-exit_threshold, color='g', linestyle='--')
    plt.axhline(stop_loss, color='y', linestyle='--', label='Stop Loss')
    plt.axhline(-stop_loss, color='y', linestyle='--')

    # Mark entry and exit points using the recorded bar indices, which is
    # robust to duplicate dates and to any container type for dates.
    for t, (entry_idx, exit_idx) in zip(trades, trade_markers):
        if t['type'] == 'long':
            plt.scatter(entry_idx, zscores[entry_idx], color='g', marker='^', s=100)
            plt.scatter(exit_idx, zscores[exit_idx], color='r', marker='v', s=100)
        else:
            plt.scatter(entry_idx, zscores[entry_idx], color='r', marker='v', s=100)
            plt.scatter(exit_idx, zscores[exit_idx], color='g', marker='^', s=100)

    plt.title(f"Z-Score for {stock1}-{stock2}")
    plt.legend()
    plt.grid(True)

    # Capital history
    plt.subplot(3, 1, 2)
    plt.plot(capital_history, label='Portfolio Value')
    plt.title("Portfolio Value Over Time")
    plt.grid(True)
    plt.legend()

    # Drawdown chart
    plt.subplot(3, 1, 3)
    plt.fill_between(range(len(drawdown)), 0, drawdown * 100)
    plt.title(f"Drawdown (%) - Max: {max_drawdown:.2f}%")
    plt.grid(True)

    plt.tight_layout()

    # Summary table
    summary = {
        'Pair': f"{stock1}-{stock2}",
        'Hedge Ratio': hedge_ratio,
        'Initial Capital': initial_capital,
        'Final Capital': capital,
        'Total Return (%)': total_return,
        'Number of Trades': num_trades,
        'Win Rate (%)': win_rate * 100,
        'Avg Trade Return (%)': avg_return,
        'Avg Holding Period (days)': avg_hold_days,
        'Sharpe Ratio': sharpe,
        'Max Drawdown (%)': max_drawdown
    }

    return {
        'summary': summary,
        'trades': trades,
        'capital_history': capital_history,
        'zscores': zscores,
        'hedge_ratio': hedge_ratio
    }