# Mean Reversion Trading Strategy

## Overview
This notebook implements a **statistical arbitrage mean reversion strategy** using pairs trading methodology.

### Strategy Logic
- **Approach**: Cointegration-based pairs trading
- **Signal**: Z-score of spread between cointegrated pairs
- **Entry**: Z-score moves beyond ±2 (short the spread above +2, long the spread below -2)
- **Exit**: Z-score reverts to within ±0.5, or a stop-loss triggers when it diverges beyond ±4
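
The entry and exit rules above amount to a small state machine over the z-score. The following is a minimal sketch rather than the notebook's backtest: the function name `positions_from_zscores` and the sample z-score path are hypothetical, and it assumes the stop-loss fires when the z-score moves further away from zero (beyond ±4) while a position is open.

```python
def positions_from_zscores(zscores, entry=2.0, exit_level=0.5, stop=4.0):
    """Map a z-score path to spread positions: +1 long, -1 short, 0 flat."""
    position, path = 0, []
    for z in zscores:
        if position == 0:
            if z < -entry:
                position = 1       # spread is cheap: go long the spread
            elif z > entry:
                position = -1      # spread is rich: go short the spread
        elif position == 1 and (z >= -exit_level or z < -stop):
            position = 0           # reverted, or diverged past the stop
        elif position == -1 and (z <= exit_level or z > stop):
            position = 0
        path.append(position)
    return path


# Hypothetical z-score path: a long trade that reverts, then a short trade
# that is stopped out.
print(positions_from_zscores([0.0, -2.3, -1.2, -0.4, 2.5, 4.2, 0.1]))
# [0, 1, 1, 0, -1, 0, 0]
```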

### Contents
1. [Data Acquisition](#1.-Data-Acquisition)
2. [Pair Selection](#2.-Pair-Selection)
3. [Cointegration Testing](#3.-Cointegration-Testing)
4. [Signal Construction](#4.-Signal-Construction)
5. [Backtesting](#5.-Backtesting)
6. [Performance Analysis](#6.-Performance-Analysis)

In [None]:
# Standard imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from itertools import combinations
import warnings
warnings.filterwarnings('ignore')

# Set display options
pd.set_option('display.max_columns', 50)
plt.style.use('seaborn-v0_8-whitegrid')

# Random seed for reproducibility
np.random.seed(42)

## 1. Data Acquisition

Generate synthetic price data: pairs of assets that share a common stochastic trend, plus five independent random walks that act as non-cointegrated decoys.

In [None]:
def generate_cointegrated_prices(n_pairs=10, n_days=1260, noise_level=0.02):
    """
    Generate synthetic cointegrated price pairs.
    
    Parameters:
    -----------
    n_pairs : int
        Number of cointegrated pairs
    n_days : int
        Number of trading days (1260 ≈ 5 years)
    noise_level : float
        Idiosyncratic noise level
        
    Returns:
    --------
    pd.DataFrame
        Price data for all assets
    list
        True pairs (tuples of asset names)
    """
    dates = pd.date_range(start='2019-01-01', periods=n_days, freq='B')
    prices_dict = {}
    true_pairs = []
    
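    def stationary_noise(phi=0.95):
        # AR(1) noise is mean-reverting; cumulative (random-walk) noise would
        # make the spread non-stationary and break cointegration
        eps = np.random.normal(0, noise_level, n_days)
        noise = np.zeros(n_days)
        for t in range(1, n_days):
            noise[t] = phi * noise[t - 1] + eps[t]
        return noise
    
    for i in range(n_pairs):
        # Generate common stochastic trend
        trend = np.cumsum(np.random.normal(0.0002, 0.015, n_days))
        
        # Asset A follows the trend with stationary noise
        noise_a = stationary_noise()
        price_a = 100 * np.exp(trend + noise_a)
        
        # Asset B loads on the same trend with its own stationary noise (cointegrated)
        beta = np.random.uniform(0.8, 1.2)  # Loading on the common trend
        noise_b = stationary_noise()
        price_b = 100 * np.exp(beta * trend + noise_b)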
        
        asset_a = f'STOCK_{2*i:03d}'
        asset_b = f'STOCK_{2*i+1:03d}'
        prices_dict[asset_a] = price_a
        prices_dict[asset_b] = price_b
        true_pairs.append((asset_a, asset_b))
    
    # Add some non-cointegrated assets
    for i in range(5):
        asset = f'STOCK_{2*n_pairs + i:03d}'
        random_walk = np.cumsum(np.random.normal(0.0001, 0.02, n_days))
        prices_dict[asset] = 100 * np.exp(random_walk)
    
    return pd.DataFrame(prices_dict, index=dates), true_pairs

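# Generate data. Noise is kept small relative to the common trend (daily vol 0.015)
# so that true pairs can clear the return-correlation pre-filter in Section 2.
prices, true_pairs = generate_cointegrated_prices(noise_level=0.005)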
print(f"Price data shape: {prices.shape}")
print(f"Date range: {prices.index[0]} to {prices.index[-1]}")
print(f"\nTrue cointegrated pairs: {len(true_pairs)}")
prices.head()

In [None]:
# Visualize a cointegrated pair
pair = true_pairs[0]

fig, axes = plt.subplots(2, 1, figsize=(14, 8))

# Prices
ax1 = axes[0]
ax1.plot(prices.index, prices[pair[0]], label=pair[0], linewidth=1.5)
ax1.plot(prices.index, prices[pair[1]], label=pair[1], linewidth=1.5)
ax1.set_title(f'Cointegrated Pair: {pair[0]} vs {pair[1]}', fontsize=14)
ax1.set_ylabel('Price')
ax1.legend()

# Log price ratio
ax2 = axes[1]
log_ratio = np.log(prices[pair[0]]) - np.log(prices[pair[1]])
ax2.plot(prices.index, log_ratio, 'green', linewidth=1.5)
ax2.axhline(log_ratio.mean(), color='red', linestyle='--', label='Mean')
ax2.fill_between(prices.index, log_ratio.mean() - 2*log_ratio.std(), 
                 log_ratio.mean() + 2*log_ratio.std(), alpha=0.2, color='gray')
ax2.set_title('Log Price Ratio (Spread)', fontsize=14)
ax2.set_ylabel('Log Ratio')
ax2.set_xlabel('Date')
ax2.legend()

plt.tight_layout()
plt.show()

## 2. Pair Selection

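Screen for potential pairs using return correlation as a cheap pre-filter before the cointegration test. Correlated returns do not imply cointegrated prices, so surviving candidates still need the formal test in Section 3.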
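
To illustrate why correlation is only a pre-filter, the sketch below builds two hypothetical pairs (all parameters are illustrative assumptions, not taken from the notebook's data): one whose returns are highly correlated but whose spread is a random walk, and one that shares a trend but has weakly correlated daily changes. The lag-1 autocorrelation of the spread is used here only as a rough stationarity diagnostic, not as a formal test.

```python
import numpy as np

rng = np.random.default_rng(0)
n = 2000


def lag1_autocorr(series):
    """Lag-1 autocorrelation; values near 1 suggest random-walk behaviour."""
    return np.corrcoef(series[:-1], series[1:])[0, 1]


# Pair 1: returns share a large common shock, but the small independent
# shocks accumulate, so the log-price spread is a random walk.
common = rng.normal(0, 0.01, n)
ret_x = common + rng.normal(0, 0.002, n)
ret_y = common + rng.normal(0, 0.002, n)
corr_tracking = np.corrcoef(ret_x, ret_y)[0, 1]
spread_rw = np.cumsum(ret_x) - np.cumsum(ret_y)

# Pair 2: log prices share a trend with stationary deviations, so the
# spread is mean-reverting even though daily changes are noisy.
trend = np.cumsum(rng.normal(0, 0.01, n))
log_p = trend + rng.normal(0, 0.05, n)
log_q = trend + rng.normal(0, 0.05, n)
corr_coint = np.corrcoef(np.diff(log_p), np.diff(log_q))[0, 1]
spread_coint = log_p - log_q

print(f"Pair 1: return corr {corr_tracking:.2f}, spread autocorr {lag1_autocorr(spread_rw):.2f}")
print(f"Pair 2: return corr {corr_coint:.2f}, spread autocorr {lag1_autocorr(spread_coint):.2f}")
```

Under these assumptions the first pair passes a correlation screen without being cointegrated, while the second would be rejected by the screen despite having a stationary spread.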

In [None]:
def calculate_correlation_matrix(prices, lookback=252):
    """
    Calculate correlation matrix of returns.
    
    Parameters:
    -----------
    prices : pd.DataFrame
        Price data
    lookback : int
        Lookback period for correlation
        
    Returns:
    --------
    pd.DataFrame
        Correlation matrix
    """
    returns = prices.pct_change().dropna()
    return returns.tail(lookback).corr()

# Calculate correlation matrix
corr_matrix = calculate_correlation_matrix(prices)

# Visualize
fig, ax = plt.subplots(figsize=(12, 10))
mask = np.triu(np.ones_like(corr_matrix, dtype=bool))
sns.heatmap(corr_matrix, mask=mask, annot=False, cmap='RdYlGn', center=0, 
            square=True, linewidths=0.5, ax=ax)
ax.set_title('Return Correlation Matrix', fontsize=14)
plt.tight_layout()
plt.show()

In [None]:
def find_candidate_pairs(corr_matrix, min_correlation=0.7):
    """
    Find candidate pairs based on correlation threshold.
    
    Parameters:
    -----------
    corr_matrix : pd.DataFrame
        Correlation matrix
    min_correlation : float
        Minimum correlation for candidate pairs
        
    Returns:
    --------
    list
        List of candidate pairs (tuples)
    """
    candidates = []
    assets = corr_matrix.columns
    
    for i in range(len(assets)):
        for j in range(i+1, len(assets)):
            if corr_matrix.iloc[i, j] >= min_correlation:
                candidates.append((assets[i], assets[j], corr_matrix.iloc[i, j]))
    
    return sorted(candidates, key=lambda x: x[2], reverse=True)

# Find candidate pairs
candidates = find_candidate_pairs(corr_matrix, min_correlation=0.6)
print(f"Found {len(candidates)} candidate pairs with correlation >= 0.6")
print("\nTop 10 pairs by correlation:")
for pair in candidates[:10]:
    print(f"  {pair[0]} - {pair[1]}: {pair[2]:.3f}")

## 3. Cointegration Testing

Test candidate pairs for cointegration using the Engle-Granger two-step method.
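
In the two-step method, the first step regresses one log price on the other to obtain the hedge ratio, and the second step checks whether the residual spread is stationary by regressing its daily change on its lagged level. The sketch below illustrates only that second regression, on a simulated AR(1) spread with an assumed coefficient (`phi_true` and the noise scale are illustrative), and shows how the estimated coefficient maps to a half-life of mean reversion.

```python
import numpy as np

rng = np.random.default_rng(1)

# Simulate a mean-reverting spread: s_t = (1 + phi) * s_{t-1} + noise
phi_true = -0.05   # assumed mean-reversion coefficient (illustrative)
n_obs = 5000
spread = np.zeros(n_obs)
for t in range(1, n_obs):
    spread[t] = (1 + phi_true) * spread[t - 1] + rng.normal(0, 0.01)

# Regress the change in the spread on its lagged level (no intercept)
spread_diff = np.diff(spread)
spread_lag = spread[:-1]
phi_hat = np.sum(spread_diff * spread_lag) / np.sum(spread_lag ** 2)

# Half-life of a shock: exact AR(1) form and the small-phi approximation
half_life_exact = -np.log(2) / np.log(1 + phi_hat)
half_life_approx = -np.log(2) / phi_hat

print(f"phi_hat = {phi_hat:.4f}")
print(f"half-life: exact {half_life_exact:.1f} days, approx {half_life_approx:.1f} days")
```

The `engle_granger_test` function in this section uses the approximation `-ln(2) / phi`, which is close to the exact form when `phi` is small in magnitude.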

In [None]:
def engle_granger_test(y, x):
    """
    Perform Engle-Granger cointegration test.
    
    Parameters:
    -----------
    y : pd.Series
        Dependent variable (log prices)
    x : pd.Series
        Independent variable (log prices)
        
    Returns:
    --------
    dict
        Test results including p-value, hedge ratio, and spread
    """
    # Step 1: OLS regression to find hedge ratio
    # y = alpha + beta * x + epsilon
    x_with_const = np.column_stack([np.ones(len(x)), x])
    beta, _, _, _ = np.linalg.lstsq(x_with_const, y, rcond=None)
    alpha, hedge_ratio = beta[0], beta[1]
    
    # Calculate spread (residuals)
    spread = y - alpha - hedge_ratio * x
    
    # Step 2: Dickey-Fuller test on residuals (no lagged difference terms)
    spread_diff = spread.diff().iloc[1:]
    spread_lag = spread.shift(1).iloc[1:]  # Same index as spread_diff
    
    # Regression: delta_spread = phi * spread_lag + error
    phi = np.sum(spread_diff * spread_lag) / np.sum(spread_lag ** 2)
    residuals = spread_diff - phi * spread_lag
    se_phi = np.sqrt(np.var(residuals) / np.sum(spread_lag ** 2))
    t_stat = phi / se_phi
    
    # Approximate p-value bucket from Engle-Granger critical values.
    # Because the hedge ratio is estimated, residual-based tests need stricter
    # thresholds than the plain ADF values (-3.43, -2.86, -2.57).
    # Asymptotic MacKinnon values for two variables with a constant,
    # at 1%, 5%, 10%: -3.90, -3.34, -3.04
    if t_stat < -3.90:
        p_value = 0.01
    elif t_stat < -3.34:
        p_value = 0.05
    elif t_stat < -3.04:
        p_value = 0.10
    else:
        p_value = 0.20
    
    # Calculate half-life of mean reversion
    half_life = -np.log(2) / phi if phi < 0 else np.inf
    
    return {
        'hedge_ratio': hedge_ratio,
        'alpha': alpha,
        't_stat': t_stat,
        'p_value': p_value,
        'half_life': half_life,
        'spread': spread
    }

# Test cointegration for candidate pairs
cointegrated_pairs = []

for pair in candidates:
    asset_a, asset_b, corr = pair
    log_a = np.log(prices[asset_a])
    log_b = np.log(prices[asset_b])
    
    result = engle_granger_test(log_a, log_b)
    
    if result['p_value'] <= 0.05 and 5 <= result['half_life'] <= 100:
        cointegrated_pairs.append({
            'asset_a': asset_a,
            'asset_b': asset_b,
            'correlation': corr,
            **result
        })

print(f"Found {len(cointegrated_pairs)} cointegrated pairs")
print("\nCointegrated pairs details:")
for pair in cointegrated_pairs[:5]:
    print(f"\n  {pair['asset_a']} - {pair['asset_b']}:")
    print(f"    Hedge Ratio: {pair['hedge_ratio']:.3f}")
    print(f"    Half-life: {pair['half_life']:.1f} days")
    print(f"    P-value: {pair['p_value']:.2f}")

## 4. Signal Construction

Standardize each spread into a rolling z-score (60-day window by default); this z-score is the trading signal.
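
As a small worked example of the rolling z-score, the sketch below applies the same formula to a hypothetical five-point spread with a 3-observation window (both the values and the window length are chosen only for illustration).

```python
import pandas as pd

# Hypothetical spread values; the final jump pushes the z-score up
spread = pd.Series([1.0, 2.0, 3.0, 4.0, 10.0])
lookback = 3

rolling_mean = spread.rolling(lookback).mean()
rolling_std = spread.rolling(lookback).std()   # sample std (ddof=1)
zscore = (spread - rolling_mean) / rolling_std

print(zscore.round(3).tolist())
# [nan, nan, 1.0, 1.0, 1.145]
```

The first `lookback - 1` values are `NaN` because the window is not yet full; the backtest in Section 5 carries the previous position forward on those days.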

In [None]:
def calculate_zscore(spread, lookback=60):
    """
    Calculate rolling z-score of spread.
    
    Parameters:
    -----------
    spread : pd.Series
        Spread time series
    lookback : int
        Lookback for mean and std calculation
        
    Returns:
    --------
    pd.Series
        Z-score time series
    """
    rolling_mean = spread.rolling(lookback).mean()
    rolling_std = spread.rolling(lookback).std()
    return (spread - rolling_mean) / rolling_std

# Calculate z-scores for all cointegrated pairs
zscores = {}
for pair in cointegrated_pairs:
    key = f"{pair['asset_a']}_{pair['asset_b']}"
    zscores[key] = calculate_zscore(pair['spread'])

# Visualize z-score for first pair
if cointegrated_pairs:
    pair = cointegrated_pairs[0]
    key = f"{pair['asset_a']}_{pair['asset_b']}"
    zscore = zscores[key]
    
    fig, ax = plt.subplots(figsize=(14, 6))
    ax.plot(zscore.index, zscore.values, 'blue', linewidth=1)
    ax.axhline(0, color='black', linestyle='-', alpha=0.5)
    ax.axhline(2, color='red', linestyle='--', label='Entry Short')
    ax.axhline(-2, color='green', linestyle='--', label='Entry Long')
    ax.axhline(0.5, color='gray', linestyle=':', alpha=0.7)
    ax.axhline(-0.5, color='gray', linestyle=':', alpha=0.7)
    ax.fill_between(zscore.index, -2, 2, alpha=0.1, color='gray')
    ax.set_title(f'Z-Score: {pair["asset_a"]} - {pair["asset_b"]}', fontsize=14)
    ax.set_ylabel('Z-Score')
    ax.set_xlabel('Date')
    ax.legend()
    ax.set_ylim(-5, 5)
    plt.tight_layout()
    plt.show()

## 5. Backtesting

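Implement the mean reversion trading strategy. The hedge ratio and spread are estimated on the full sample and then traded over the same period, so the results are in-sample.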
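
The per-pair return in the backtest is the hedged return of the two legs, normalized by gross exposure, minus a cost charged on both legs whenever the position changes. The sketch below works through that arithmetic for one day with hypothetical inputs (the hedge ratio and daily returns are made up; the cost matches the backtest's default of 0.001 one-way).

```python
hedge_ratio = 1.2            # hypothetical hedge ratio
ret_a, ret_b = 0.010, 0.005  # hypothetical daily returns of the two legs
transaction_cost = 0.001     # one-way cost, as in the backtest default

# Long-spread return (long A, short B), normalized by gross exposure
spread_return = (ret_a - hedge_ratio * ret_b) / (1 + hedge_ratio)

# Opening a position changes |position| by 1; cost is charged on both legs
entry_cost = 1 * transaction_cost * 2

print(f"Spread return: {spread_return:.4%}")   # 0.1818%
print(f"Entry cost:    {entry_cost:.4%}")      # 0.2000%
```

With these numbers a single favorable day does not cover the cost of entering, which is why the entry threshold and the cost assumption interact strongly in the results.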

In [None]:
class PairsTradingBacktest:
    """
    Backtesting framework for pairs trading strategy.
    """
    
    def __init__(self, prices, pairs_info, entry_threshold=2.0, 
                 exit_threshold=0.5, stop_loss=4.0, transaction_cost=0.001):
        """
        Initialize backtest.
        
        Parameters:
        -----------
        prices : pd.DataFrame
            Price data
        pairs_info : list
            List of dictionaries with pair information
        entry_threshold : float
            Z-score threshold for entry
        exit_threshold : float
            Z-score threshold for exit
        stop_loss : float
            Z-score threshold for stop loss
        transaction_cost : float
            One-way transaction cost
        """
        self.prices = prices
        self.pairs_info = pairs_info
        self.entry_threshold = entry_threshold
        self.exit_threshold = exit_threshold
        self.stop_loss = stop_loss
        self.transaction_cost = transaction_cost
        
        self.returns = prices.pct_change()
        
    def trade_pair(self, pair_info):
        """
        Generate signals and calculate returns for a single pair.
        
        Parameters:
        -----------
        pair_info : dict
            Dictionary with pair information
            
        Returns:
        --------
        tuple of (pd.Series, pd.Series)
            Net strategy returns and the position series for this pair
        """
        asset_a = pair_info['asset_a']
        asset_b = pair_info['asset_b']
        hedge_ratio = pair_info['hedge_ratio']
        
        # Calculate z-score
        zscore = calculate_zscore(pair_info['spread'])
        
        # Initialize position
        position = pd.Series(0.0, index=zscore.index)
        
        for i in range(1, len(zscore)):
            prev_pos = position.iloc[i-1]
            z = zscore.iloc[i]
            
            if np.isnan(z):
                position.iloc[i] = prev_pos
                continue
            
            # Entry signals
            if prev_pos == 0:
                if z < -self.entry_threshold:
                    position.iloc[i] = 1  # Long spread (long A, short B)
                elif z > self.entry_threshold:
                    position.iloc[i] = -1  # Short spread (short A, long B)
                else:
                    position.iloc[i] = 0
            
            # Exit signals
            elif prev_pos == 1:  # Long spread
                # Exit on reversion, or stop out if the spread keeps falling
                if z >= -self.exit_threshold or z < -self.stop_loss:
                    position.iloc[i] = 0
                else:
                    position.iloc[i] = 1
                    
            elif prev_pos == -1:  # Short spread
                # Exit on reversion, or stop out if the spread keeps rising
                if z <= self.exit_threshold or z > self.stop_loss:
                    position.iloc[i] = 0
                else:
                    position.iloc[i] = -1
        
        # Calculate returns
        ret_a = self.returns[asset_a]
        ret_b = self.returns[asset_b]
        
        # Spread return: long A, short B (normalized)
        spread_return = (ret_a - hedge_ratio * ret_b) / (1 + hedge_ratio)
        
        # Strategy return
        gross_return = position.shift(1) * spread_return
        
        # Transaction costs
        trades = position.diff().abs()
        costs = trades * self.transaction_cost * 2  # Both legs
        
        return gross_return - costs, position
    
    def run(self):
        """
        Run backtest for all pairs.
        
        Returns:
        --------
        pd.Series
            Combined strategy returns
        """
        all_returns = []
        all_positions = {}
        
        for pair_info in self.pairs_info:
            pair_returns, position = self.trade_pair(pair_info)
            all_returns.append(pair_returns)
            key = f"{pair_info['asset_a']}_{pair_info['asset_b']}"
            all_positions[key] = position
        
        # Equal weight across pairs
        combined_returns = pd.concat(all_returns, axis=1).mean(axis=1)
        
        self.pair_returns = all_returns
        self.positions = all_positions
        
        return combined_returns

In [None]:
# Run backtest
if cointegrated_pairs:
    backtest = PairsTradingBacktest(
        prices=prices,
        pairs_info=cointegrated_pairs,
        entry_threshold=2.0,
        exit_threshold=0.5,
        stop_loss=4.0,
        transaction_cost=0.001
    )
    
    portfolio_returns = backtest.run()
    portfolio_returns = portfolio_returns.dropna()
    
    print(f"Backtest period: {portfolio_returns.index[0]} to {portfolio_returns.index[-1]}")
    print(f"Number of trading days: {len(portfolio_returns)}")
    print(f"Number of pairs traded: {len(cointegrated_pairs)}")
else:
    print("No cointegrated pairs found for backtesting.")

## 6. Performance Analysis

Evaluate strategy performance: annualized return and volatility, Sharpe and Calmar ratios, maximum drawdown, and daily win rate.
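
As a worked example of the drawdown calculation, the sketch below uses a hypothetical four-day return series (values chosen only so the arithmetic is easy to follow) and applies the same running-maximum logic with NumPy.

```python
import numpy as np

# Hypothetical daily returns: a gain, a sharp loss, then a partial recovery
returns = np.array([0.10, -0.20, 0.05, 0.10])

equity = np.cumprod(1 + returns)                # 1.1, 0.88, 0.924, 1.0164
running_max = np.maximum.accumulate(equity)     # peak equity so far
drawdown = (equity - running_max) / running_max
max_drawdown = drawdown.min()
total_return = equity[-1] - 1

print(f"Total return: {total_return:.2%}")   # 1.64%
print(f"Max drawdown: {max_drawdown:.2%}")   # -20.00%
```

The maximum drawdown is measured from the running peak (1.1), not from the starting value, so it is -20% even though the series ends slightly above where it began.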

In [None]:
def calculate_performance_metrics(returns, risk_free_rate=0.02):
    """
    Calculate comprehensive performance metrics.
    """
    ann_factor = 252
    
    total_return = (1 + returns).prod() - 1
    ann_return = (1 + total_return) ** (ann_factor / len(returns)) - 1
    ann_vol = returns.std() * np.sqrt(ann_factor)
    
    excess_return = ann_return - risk_free_rate
    sharpe = excess_return / ann_vol if ann_vol > 0 else 0
    
    cum_returns = (1 + returns).cumprod()
    running_max = cum_returns.cummax()
    drawdown = (cum_returns - running_max) / running_max
    max_drawdown = drawdown.min()
    
    calmar = ann_return / abs(max_drawdown) if max_drawdown != 0 else 0
    
    win_rate = (returns > 0).sum() / len(returns)
    
    return {
        'Total Return': f'{total_return:.2%}',
        'Annual Return': f'{ann_return:.2%}',
        'Annual Volatility': f'{ann_vol:.2%}',
        'Sharpe Ratio': f'{sharpe:.2f}',
        'Max Drawdown': f'{max_drawdown:.2%}',
        'Calmar Ratio': f'{calmar:.2f}',
        'Win Rate': f'{win_rate:.2%}'
    }

if cointegrated_pairs:
    metrics = calculate_performance_metrics(portfolio_returns)
    
    print("\n" + "="*50)
    print("PAIRS TRADING STRATEGY PERFORMANCE METRICS")
    print("="*50)
    for key, value in metrics.items():
        print(f"{key:20} : {value:>12}")
    print("="*50)

In [None]:
if cointegrated_pairs:
    # Plot equity curve and drawdown
    fig, axes = plt.subplots(3, 1, figsize=(14, 12))
    
    # Cumulative returns
    cum_returns = (1 + portfolio_returns).cumprod()
    axes[0].plot(cum_returns.index, cum_returns.values, 'b-', linewidth=1.5)
    axes[0].set_title('Pairs Trading Strategy - Equity Curve', fontsize=14)
    axes[0].set_ylabel('Cumulative Return')
    axes[0].axhline(y=1, color='gray', linestyle='--', alpha=0.5)
    axes[0].fill_between(cum_returns.index, 1, cum_returns.values, 
                         where=(cum_returns.values >= 1), alpha=0.3, color='green')
    axes[0].fill_between(cum_returns.index, 1, cum_returns.values, 
                         where=(cum_returns.values < 1), alpha=0.3, color='red')
    
    # Drawdown
    running_max = cum_returns.cummax()
    drawdown = (cum_returns - running_max) / running_max
    axes[1].fill_between(drawdown.index, drawdown.values, 0, alpha=0.7, color='red')
    axes[1].set_title('Drawdown', fontsize=14)
    axes[1].set_ylabel('Drawdown')
    
    # Rolling Sharpe
    rolling_sharpe = portfolio_returns.rolling(126).mean() / portfolio_returns.rolling(126).std() * np.sqrt(252)
    axes[2].plot(rolling_sharpe.index, rolling_sharpe.values, 'purple', linewidth=1.5)
    axes[2].axhline(y=0, color='gray', linestyle='--', alpha=0.5)
    axes[2].axhline(y=1, color='green', linestyle='--', alpha=0.5, label='Sharpe = 1')
    axes[2].set_title('Rolling 6-Month Sharpe Ratio', fontsize=14)
    axes[2].set_ylabel('Sharpe Ratio')
    axes[2].set_xlabel('Date')
    axes[2].legend()
    
    plt.tight_layout()
    plt.show()

In [None]:
if cointegrated_pairs:
    # Analyze individual pair performance
    pair_metrics = []
    for i, ret in enumerate(backtest.pair_returns):
        ret_clean = ret.dropna()
        if len(ret_clean) > 0:
            total_ret = (1 + ret_clean).prod() - 1
            ann_vol = ret_clean.std() * np.sqrt(252)
            sharpe = (ret_clean.mean() * 252 - 0.02) / ann_vol if ann_vol > 0 else 0
            pair_metrics.append({
                'Pair': f"{cointegrated_pairs[i]['asset_a']}_{cointegrated_pairs[i]['asset_b']}",
                'Total Return': total_ret,
                'Annual Vol': ann_vol,
                'Sharpe': sharpe
            })
    
    metrics_df = pd.DataFrame(pair_metrics)
    
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    # Total returns by pair
    axes[0].barh(metrics_df['Pair'], metrics_df['Total Return'], color='steelblue', edgecolor='black')
    axes[0].axvline(0, color='black', linewidth=0.5)
    axes[0].set_xlabel('Total Return')
    axes[0].set_title('Total Return by Pair', fontsize=12)
    
    # Sharpe by pair
    colors = ['green' if s > 0 else 'red' for s in metrics_df['Sharpe']]
    axes[1].barh(metrics_df['Pair'], metrics_df['Sharpe'], color=colors, edgecolor='black')
    axes[1].axvline(0, color='black', linewidth=0.5)
    axes[1].axvline(1, color='gray', linestyle='--', alpha=0.5)
    axes[1].set_xlabel('Sharpe Ratio')
    axes[1].set_title('Sharpe Ratio by Pair', fontsize=12)
    
    plt.tight_layout()
    plt.show()

## Conclusions

### Key Findings

1. **Pair Selection**: Cointegration testing successfully identified [X] tradeable pairs from [Y] candidates.

2. **Strategy Performance**: The pairs trading strategy shows [positive/negative] returns with:
   - Annualized return of approximately [X%]
   - Volatility expected to be lower than that of directional strategies, since each long leg is hedged by a short leg
   - Approximately market-neutral exposure by construction

3. **Risk Characteristics**:
   - Maximum drawdown of [X%]
   - Profitability depends on spreads continuing to mean-revert; a breakdown in cointegration is the main risk

### Potential Improvements

- **Dynamic Hedge Ratios**: Use Kalman filter for adaptive hedge ratio estimation
- **Regime Detection**: Incorporate regime switching to avoid momentum periods
- **Risk Management**: Add pair-level position limits and portfolio-level risk controls
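
As an illustration of the first improvement, the following is a minimal sketch of a scalar Kalman filter that tracks a time-varying hedge ratio. It is one possible formulation under stated assumptions, not part of the notebook's backtest: the function name `kalman_hedge_ratio`, the variance settings, and the simulated data are all hypothetical, and the model omits the intercept for brevity.

```python
import numpy as np


def kalman_hedge_ratio(y, x, process_var=1e-5, obs_var=1e-3):
    """Filter beta_t in y_t = beta_t * x_t + noise, with beta_t a random walk."""
    beta, p = 0.0, 1.0                 # initial state estimate and its variance
    betas = np.empty(len(y))
    for t in range(len(y)):
        p = p + process_var            # predict: random-walk state
        innovation = y[t] - beta * x[t]
        s = x[t] * p * x[t] + obs_var  # innovation variance
        k = p * x[t] / s               # Kalman gain
        beta = beta + k * innovation   # update state estimate
        p = (1.0 - k * x[t]) * p       # update state variance
        betas[t] = beta
    return betas


# Hypothetical log prices with a constant true hedge ratio of 1.5
rng = np.random.default_rng(2)
x = rng.normal(4.6, 0.1, 500)
y = 1.5 * x + rng.normal(0, 0.03, 500)

betas = kalman_hedge_ratio(y, x)
print(f"Final hedge ratio estimate: {betas[-1]:.3f}")
```

A larger `process_var` lets the estimate adapt faster at the cost of a noisier hedge ratio; in practice both variances would need tuning on the data at hand.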