# Day 04: A/B Testing for Trading Strategies

## Production ML - Week 23

**Objective:** Learn how to rigorously compare trading strategies using statistical A/B testing frameworks, enabling data-driven decisions about strategy deployment.

### Why A/B Testing for Trading?

In production trading systems, we need to answer critical questions:
- Is the new strategy variant actually better than the existing one?
- How confident are we in the observed performance difference?
- When can we stop testing and make a deployment decision?

### Key Differences from Web A/B Testing

| Aspect | Web A/B Testing | Trading A/B Testing |
|--------|-----------------|---------------------|
| Samples | Independent users | Correlated time-series |
| Metrics | Conversion rate | Sharpe, returns, drawdown |
| Environment | Relatively stable | Non-stationary markets |
| Cost of error | Lost revenue | Capital loss |
| Time horizon | Days to weeks | Weeks to months |

### Topics Covered

1. Statistical foundations for strategy comparison
2. Frequentist hypothesis testing (t-test, Mann-Whitney U)
3. Effect size and power analysis
4. Bootstrap confidence intervals
5. Sharpe ratio comparison methods
6. Sequential testing for early stopping
7. Bayesian A/B testing approach

## 1. Import Required Libraries

Import numpy, pandas, scipy.stats, matplotlib, and other necessary libraries for statistical analysis and visualization.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from scipy.stats import ttest_ind, mannwhitneyu, norm, t as t_dist
from typing import Tuple, Dict, List, Optional
import warnings
warnings.filterwarnings('ignore')

# Set random seed for reproducibility
np.random.seed(42)

# Plotting configuration
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams['figure.figsize'] = (12, 6)
plt.rcParams['font.size'] = 11

print("Libraries imported successfully!")
print(f"NumPy version: {np.__version__}")
print(f"Pandas version: {pd.__version__}")

## 2. Generate Synthetic Trading Data

Create synthetic price data on which to simulate two trading strategies for comparison. The generator below produces a drifting price series with volatility clustering; it does not model explicit regime changes.

In [None]:
def generate_price_data(
    n_days: int = 504,  # ~2 years of trading days
    initial_price: float = 100.0,
    annual_return: float = 0.08,
    annual_volatility: float = 0.20,
    seed: int = 42
) -> pd.DataFrame:
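    """
    Generate synthetic price data from a discretized Geometric Brownian Motion
    with volatility clustering (GARCH-like behavior).

    The 'returns' column holds daily log returns; prices are their
    exponentiated cumulative sum. Later cells treat these values as simple
    returns, which is a close approximation at daily frequency.
    """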
    np.random.seed(seed)
    
    # Daily parameters
    daily_return = annual_return / 252
    daily_vol = annual_volatility / np.sqrt(252)
    
    # Generate returns with volatility clustering
    returns = np.zeros(n_days)
    vol = daily_vol
    
    for i in range(n_days):
        # GARCH(1,1)-like volatility update
        vol = 0.9 * vol + 0.1 * daily_vol * (1 + 0.5 * np.abs(returns[i-1] if i > 0 else 0) / daily_vol)
        returns[i] = daily_return + vol * np.random.randn()
    
    # Generate prices
    prices = initial_price * np.exp(np.cumsum(returns))
    
    # Create DataFrame
    dates = pd.date_range(start='2024-01-01', periods=n_days, freq='B')
    df = pd.DataFrame({
        'date': dates,
        'price': prices,
        'returns': returns
    })
    df.set_index('date', inplace=True)
    
    # Add technical indicators
    df['sma_20'] = df['price'].rolling(20).mean()
    df['sma_50'] = df['price'].rolling(50).mean()
    df['volatility_20'] = df['returns'].rolling(20).std() * np.sqrt(252)
    df['momentum_10'] = df['price'].pct_change(10)
    
    return df

# Generate price data
price_data = generate_price_data(n_days=504, seed=42)

print(f"Generated {len(price_data)} days of price data")
print(f"\nPrice Data Summary:")
print(price_data.describe())

# Plot price and volatility
fig, axes = plt.subplots(2, 1, figsize=(14, 8))

axes[0].plot(price_data.index, price_data['price'], 'b-', linewidth=1.5, label='Price')
axes[0].plot(price_data.index, price_data['sma_20'], 'r--', alpha=0.7, label='SMA(20)')
axes[0].plot(price_data.index, price_data['sma_50'], 'g--', alpha=0.7, label='SMA(50)')
axes[0].set_title('Synthetic Price Data', fontsize=14, fontweight='bold')
axes[0].set_ylabel('Price')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

axes[1].fill_between(price_data.index, 0, price_data['volatility_20'], alpha=0.5, color='orange')
axes[1].set_title('Rolling 20-Day Volatility (Annualized)', fontsize=14, fontweight='bold')
axes[1].set_ylabel('Volatility')
axes[1].set_xlabel('Date')
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 3. Define Strategy A and Strategy B

Implement two distinct trading strategies for comparison:
- **Strategy A (Control):** Simple Moving Average Crossover (SMA 20/50)
- **Strategy B (Treatment):** Enhanced SMA with Momentum Filter

The treatment strategy adds a momentum confirmation signal to potentially improve timing.

In [None]:
class TradingStrategy:
    """Base class for trading strategies."""
    
    def __init__(self, name: str):
        self.name = name
        
    def generate_signals(self, data: pd.DataFrame) -> pd.Series:
        """Generate trading signals: 1 for long, 0 for no position, -1 for short."""
        raise NotImplementedError
        

class StrategyA_SMA_Crossover(TradingStrategy):
    """
    Strategy A (Control): Simple SMA Crossover
    - Long when SMA(20) > SMA(50)
    - Flat when SMA(20) <= SMA(50)
    """
    
    def __init__(self, short_window: int = 20, long_window: int = 50):
        super().__init__("Strategy A: SMA Crossover")
        self.short_window = short_window
        self.long_window = long_window
        
    def generate_signals(self, data: pd.DataFrame) -> pd.Series:
        sma_short = data['price'].rolling(self.short_window).mean()
        sma_long = data['price'].rolling(self.long_window).mean()
        
        signals = pd.Series(0, index=data.index)
        signals[sma_short > sma_long] = 1
        signals[sma_short <= sma_long] = 0
        
        return signals


class StrategyB_SMA_Momentum(TradingStrategy):
    """
    Strategy B (Treatment): SMA Crossover with Momentum Filter
    - Long when SMA(20) > SMA(50) AND momentum_10 > 0
    - Flat otherwise
    """
    
    def __init__(self, short_window: int = 20, long_window: int = 50, momentum_window: int = 10):
        super().__init__("Strategy B: SMA + Momentum")
        self.short_window = short_window
        self.long_window = long_window
        self.momentum_window = momentum_window
        
    def generate_signals(self, data: pd.DataFrame) -> pd.Series:
        sma_short = data['price'].rolling(self.short_window).mean()
        sma_long = data['price'].rolling(self.long_window).mean()
        momentum = data['price'].pct_change(self.momentum_window)
        
        signals = pd.Series(0, index=data.index)
        
        # Long only when both conditions are met
        long_condition = (sma_short > sma_long) & (momentum > 0)
        signals[long_condition] = 1
        
        return signals


# Initialize strategies
strategy_a = StrategyA_SMA_Crossover()
strategy_b = StrategyB_SMA_Momentum()

# Generate signals
signals_a = strategy_a.generate_signals(price_data)
signals_b = strategy_b.generate_signals(price_data)

# Display signal statistics
print("=" * 60)
print("STRATEGY SIGNALS COMPARISON")
print("=" * 60)
print(f"\n{strategy_a.name}:")
print(f"  Days in market: {(signals_a == 1).sum()} ({100*(signals_a == 1).mean():.1f}%)")
print(f"  Days out of market: {(signals_a == 0).sum()} ({100*(signals_a == 0).mean():.1f}%)")

print(f"\n{strategy_b.name}:")
print(f"  Days in market: {(signals_b == 1).sum()} ({100*(signals_b == 1).mean():.1f}%)")
print(f"  Days out of market: {(signals_b == 0).sum()} ({100*(signals_b == 0).mean():.1f}%)")

# Visualize signals
fig, ax = plt.subplots(figsize=(14, 6))

ax.plot(price_data.index, price_data['price'], 'k-', linewidth=1, alpha=0.7, label='Price')

# Highlight periods in market for each strategy
in_market_a = signals_a == 1
in_market_b = signals_b == 1

ax.fill_between(price_data.index, price_data['price'].min(), price_data['price'].max(), 
                where=in_market_a, alpha=0.2, color='blue', label='Strategy A in market')
ax.fill_between(price_data.index, price_data['price'].min(), price_data['price'].max(), 
                where=in_market_b, alpha=0.2, color='green', label='Strategy B in market')

ax.set_title('Strategy Signals Comparison', fontsize=14, fontweight='bold')
ax.set_xlabel('Date')
ax.set_ylabel('Price')
ax.legend(loc='upper left')
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 4. Calculate Strategy Returns

Compute daily and cumulative returns for both strategies, with and without transaction costs. We'll track:
- Gross returns (before costs)
- Net returns (after transaction costs)
- Cumulative performance
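
As a small worked example of the bookkeeping described above, the sketch below applies a one-day signal lag and a 10 bps cost per position change to five days of made-up data. The return and signal values are illustrative assumptions, not output from this notebook.

```python
import pandas as pd

# Hypothetical five-day example (illustrative values only)
market_returns = pd.Series([0.010, -0.020, 0.015, 0.005, -0.010])
raw_signals = pd.Series([1, 1, 0, 1, 1])

# Trade on the next bar: today's position is yesterday's signal
demo_positions = raw_signals.shift(1).fillna(0)

# Each unit change in position counts as one trade
demo_trades = demo_positions.diff().abs().fillna(0)

cost_per_trade = 10.0 / 10_000  # 10 bps expressed as a fraction
demo_gross = demo_positions * market_returns
demo_net = demo_gross - demo_trades * cost_per_trade

demo_summary = pd.DataFrame({
    "position": demo_positions,
    "trades": demo_trades,
    "gross": demo_gross,
    "net": demo_net,
})
print(demo_summary)
```

Note that the exit on the fourth day costs 10 bps even though the position earns nothing that day, so the net return is slightly negative while the gross return is zero.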

In [None]:
def calculate_strategy_returns(
    data: pd.DataFrame,
    signals: pd.Series,
    transaction_cost_bps: float = 10.0,  # 10 basis points per trade
    strategy_name: str = "Strategy"
) -> pd.DataFrame:
    """
    Calculate strategy returns including transaction costs.
    
    Parameters:
    -----------
    data : pd.DataFrame with 'returns' column
    signals : pd.Series with position signals (0 or 1)
    transaction_cost_bps : float, cost per trade in basis points
    strategy_name : str, name for the strategy
    
    Returns:
    --------
    pd.DataFrame with return calculations
    """
    results = pd.DataFrame(index=data.index)
    
    # Shift signals to avoid look-ahead bias (trade next day)
    positions = signals.shift(1).fillna(0)
    
    # Calculate position changes (trades)
    position_changes = positions.diff().abs().fillna(0)
    
    # Gross returns (position * market returns)
    results['gross_returns'] = positions * data['returns']
    
    # Transaction costs
    cost_per_trade = transaction_cost_bps / 10000
    results['transaction_costs'] = position_changes * cost_per_trade
    
    # Net returns
    results['net_returns'] = results['gross_returns'] - results['transaction_costs']
    
    # Cumulative returns
    results['cumulative_gross'] = (1 + results['gross_returns']).cumprod()
    results['cumulative_net'] = (1 + results['net_returns']).cumprod()
    
    # Position tracking
    results['position'] = positions
    results['trades'] = position_changes
    
    return results


# Calculate returns for both strategies
returns_a = calculate_strategy_returns(price_data, signals_a, strategy_name="Strategy A")
returns_b = calculate_strategy_returns(price_data, signals_b, strategy_name="Strategy B")

# Calculate buy-and-hold benchmark
returns_benchmark = pd.DataFrame(index=price_data.index)
returns_benchmark['net_returns'] = price_data['returns']
returns_benchmark['cumulative_net'] = (1 + returns_benchmark['net_returns']).cumprod()

# Summary statistics
def calculate_performance_metrics(returns: pd.Series, name: str = "Strategy") -> Dict:
    """Calculate comprehensive performance metrics."""
    annual_factor = 252
    
    total_return = (1 + returns).prod() - 1
    annual_return = (1 + total_return) ** (annual_factor / len(returns)) - 1
    annual_vol = returns.std() * np.sqrt(annual_factor)
    sharpe = annual_return / annual_vol if annual_vol > 0 else 0
    
    # Drawdown calculation
    cumulative = (1 + returns).cumprod()
    running_max = cumulative.expanding().max()
    drawdown = (cumulative - running_max) / running_max
    max_drawdown = drawdown.min()
    
    # Win rate
    winning_days = (returns > 0).sum()
    trading_days = (returns != 0).sum()
    win_rate = winning_days / trading_days if trading_days > 0 else 0
    
    return {
        'name': name,
        'total_return': total_return,
        'annual_return': annual_return,
        'annual_volatility': annual_vol,
        'sharpe_ratio': sharpe,
        'max_drawdown': max_drawdown,
        'win_rate': win_rate,
        'num_trades': 0  # Will be set separately
    }


# Calculate metrics
metrics_a = calculate_performance_metrics(returns_a['net_returns'].dropna(), "Strategy A")
metrics_a['num_trades'] = returns_a['trades'].sum()

metrics_b = calculate_performance_metrics(returns_b['net_returns'].dropna(), "Strategy B")
metrics_b['num_trades'] = returns_b['trades'].sum()

metrics_bh = calculate_performance_metrics(returns_benchmark['net_returns'].dropna(), "Buy & Hold")

# Display performance comparison
print("=" * 80)
print("STRATEGY PERFORMANCE COMPARISON")
print("=" * 80)

metrics_df = pd.DataFrame([metrics_a, metrics_b, metrics_bh]).set_index('name')
metrics_display = metrics_df.copy()
metrics_display['total_return'] = metrics_display['total_return'].apply(lambda x: f"{100*x:.2f}%")
metrics_display['annual_return'] = metrics_display['annual_return'].apply(lambda x: f"{100*x:.2f}%")
metrics_display['annual_volatility'] = metrics_display['annual_volatility'].apply(lambda x: f"{100*x:.2f}%")
metrics_display['sharpe_ratio'] = metrics_display['sharpe_ratio'].apply(lambda x: f"{x:.3f}")
metrics_display['max_drawdown'] = metrics_display['max_drawdown'].apply(lambda x: f"{100*x:.2f}%")
metrics_display['win_rate'] = metrics_display['win_rate'].apply(lambda x: f"{100*x:.1f}%")
metrics_display['num_trades'] = metrics_display['num_trades'].apply(lambda x: f"{int(x)}")

print(metrics_display.T.to_string())

## 5. Perform Statistical Hypothesis Testing

Apply classical frequentist tests to determine if the difference in strategy returns is statistically significant.

### Hypothesis Setup
- **Null Hypothesis (H₀):** Mean return of Strategy B = Mean return of Strategy A
- **Alternative Hypothesis (H₁):** Mean return of Strategy B ≠ Mean return of Strategy A

### Tests Applied
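1. **Two-sample t-test (Student's):** Assumes normally distributed returns and equal variances
2. **Mann-Whitney U test:** Non-parametric alternative, robust to non-normality
3. **Welch's t-test:** Does not assume equal variances

All three treat the two return series as independent samples. Both strategies trade the same market on the same days, so their returns are correlated and the p-values below should be read as approximate.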
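
Since the two strategies are evaluated on the same days, one alternative is a paired test on the per-day return differences. The sketch below uses synthetic data (the `market`, `control_demo`, and `treatment_demo` arrays are illustrative assumptions, not this notebook's series) to compare `scipy.stats.ttest_rel` with Welch's test when both series share a common market component. A paired test still assumes the daily differences are i.i.d., so it does not address autocorrelation.

```python
import numpy as np
from scipy.stats import ttest_ind, ttest_rel

demo_rng = np.random.default_rng(0)

# Hypothetical paired daily returns: a shared market component plus small
# strategy-specific noise, with a small assumed edge for the treatment
market = demo_rng.normal(0.0, 0.01, size=500)
control_demo = market + demo_rng.normal(0.0, 0.001, size=500)
treatment_demo = market + 0.0002 + demo_rng.normal(0.0, 0.001, size=500)

# The independent-samples test carries the full market variance in its standard error
t_ind, p_ind = ttest_ind(treatment_demo, control_demo, equal_var=False)

# The paired test works on per-day differences, where the shared component cancels
t_rel, p_rel = ttest_rel(treatment_demo, control_demo)

print(f"Welch (independent): t={t_ind:.3f}, p={p_ind:.4f}")
print(f"Paired:              t={t_rel:.3f}, p={p_rel:.4f}")
```

Both tests see the same mean difference; only the standard error changes, which is why the paired statistic is larger in magnitude when the two series move together.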

In [None]:
class ABTestHypothesis:
    """
    A/B Testing for Trading Strategies using Frequentist Methods.
    """
    
    def __init__(self, returns_control: pd.Series, returns_treatment: pd.Series,
                 alpha: float = 0.05):
        """
        Initialize with returns from control and treatment strategies.
        
        Parameters:
        -----------
        returns_control : daily returns from control strategy (Strategy A)
        returns_treatment : daily returns from treatment strategy (Strategy B)
        alpha : significance level (default 0.05)
        """
        self.control = returns_control.dropna().values
        self.treatment = returns_treatment.dropna().values
        self.alpha = alpha
        
    def two_sample_ttest(self, equal_var: bool = True) -> Dict:
        """
        Perform two-sample t-test.
        
        Parameters:
        -----------
        equal_var : if True, assumes equal variances (Student's t-test)
                   if False, does not assume equal variances (Welch's t-test)
        """
        t_stat, p_value = ttest_ind(self.treatment, self.control, equal_var=equal_var)
        
        test_name = "Student's t-test" if equal_var else "Welch's t-test"
        
        return {
            'test': test_name,
            't_statistic': t_stat,
            'p_value': p_value,
            'significant': p_value < self.alpha,
            'mean_control': np.mean(self.control),
            'mean_treatment': np.mean(self.treatment),
            'mean_diff': np.mean(self.treatment) - np.mean(self.control)
        }
    
    def mann_whitney_test(self) -> Dict:
        """
        Perform Mann-Whitney U test (non-parametric alternative).
        """
        u_stat, p_value = mannwhitneyu(self.treatment, self.control, alternative='two-sided')
        
        return {
            'test': 'Mann-Whitney U',
            'u_statistic': u_stat,
            'p_value': p_value,
            'significant': p_value < self.alpha,
            'median_control': np.median(self.control),
            'median_treatment': np.median(self.treatment)
        }
    
    def check_normality(self) -> Dict:
        """Check if returns are normally distributed using Shapiro-Wilk test."""
        # Use a sample if data is too large (Shapiro-Wilk has size limits)
        sample_size = min(len(self.control), len(self.treatment), 5000)
        
        _, p_control = stats.shapiro(np.random.choice(self.control, sample_size, replace=False))
        _, p_treatment = stats.shapiro(np.random.choice(self.treatment, sample_size, replace=False))
        
        return {
            'control_normal_pvalue': p_control,
            'treatment_normal_pvalue': p_treatment,
            'control_is_normal': p_control > self.alpha,
            'treatment_is_normal': p_treatment > self.alpha
        }
    
    def run_all_tests(self) -> Tuple[pd.DataFrame, Dict, Dict]:
        """Run all hypothesis tests; return (summary table, normality check, Student's t-test details)."""
        results = []
        
        # Check normality first
        normality = self.check_normality()
        
        # Student's t-test
        t_result = self.two_sample_ttest(equal_var=True)
        results.append({
            'Test': t_result['test'],
            'Statistic': f"{t_result['t_statistic']:.4f}",
            'P-Value': f"{t_result['p_value']:.6f}",
            'Significant': '✓' if t_result['significant'] else '✗'
        })
        
        # Welch's t-test
        welch_result = self.two_sample_ttest(equal_var=False)
        results.append({
            'Test': welch_result['test'],
            'Statistic': f"{welch_result['t_statistic']:.4f}",
            'P-Value': f"{welch_result['p_value']:.6f}",
            'Significant': '✓' if welch_result['significant'] else '✗'
        })
        
        # Mann-Whitney U
        mw_result = self.mann_whitney_test()
        results.append({
            'Test': mw_result['test'],
            'Statistic': f"{mw_result['u_statistic']:.4f}",
            'P-Value': f"{mw_result['p_value']:.6f}",
            'Significant': '✓' if mw_result['significant'] else '✗'
        })
        
        return pd.DataFrame(results), normality, t_result


# Run hypothesis tests
ab_test = ABTestHypothesis(
    returns_control=returns_a['net_returns'],
    returns_treatment=returns_b['net_returns'],
    alpha=0.05
)

test_results_df, normality_check, detailed_result = ab_test.run_all_tests()

print("=" * 70)
print("HYPOTHESIS TESTING RESULTS")
print("=" * 70)
print(f"\nSignificance Level (α): 0.05")
print(f"\nSample Sizes:")
print(f"  Strategy A (Control): {len(ab_test.control)} observations")
print(f"  Strategy B (Treatment): {len(ab_test.treatment)} observations")

print(f"\n📊 Normality Check (Shapiro-Wilk Test):")
print(f"  Control returns normally distributed: {normality_check['control_is_normal']} (p={normality_check['control_normal_pvalue']:.4f})")
print(f"  Treatment returns normally distributed: {normality_check['treatment_is_normal']} (p={normality_check['treatment_normal_pvalue']:.4f})")

print(f"\n📈 Mean Daily Returns:")
print(f"  Strategy A (Control): {100*detailed_result['mean_control']:.4f}%")
print(f"  Strategy B (Treatment): {100*detailed_result['mean_treatment']:.4f}%")
print(f"  Difference: {100*detailed_result['mean_diff']:.4f}%")

print(f"\n🧪 Statistical Tests:")
print(test_results_df.to_string(index=False))

# Interpretation
print("\n" + "=" * 70)
print("INTERPRETATION")
print("=" * 70)
if detailed_result['p_value'] < 0.05:
    if detailed_result['mean_diff'] > 0:
        print("✅ The treatment strategy (B) shows STATISTICALLY SIGNIFICANT improvement")
        print("   over the control strategy (A) at the 5% significance level.")
    else:
        print("⚠️  The treatment strategy (B) is SIGNIFICANTLY WORSE than the control (A)")
        print("   at the 5% significance level.")
else:
    print("❌ We CANNOT REJECT the null hypothesis that the strategies have equal means.")
    print("   The observed difference may be due to random chance.")

## 6. Calculate Effect Size and Power Analysis

Statistical significance alone isn't enough. We also need to know:
1. **Effect Size (Cohen's d):** How large is the practical difference?
2. **Statistical Power:** What's our probability of detecting a true effect?
3. **Required Sample Size:** How many observations do we need?

### Effect Size Interpretation (Cohen's d)
- Small: |d| ≈ 0.2
- Medium: |d| ≈ 0.5
- Large: |d| ≈ 0.8
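
These benchmarks are generic conventions and can be hard to map onto daily returns. Under the simplifying assumption that both strategies have the same daily volatility, Cohen's d on daily returns equals the annualized Sharpe ratio gap divided by √252. The sketch below combines that assumption with the standard normal-approximation sample size formula; the helper names `daily_d_from_sharpe_gap` and `approx_n_per_group` are hypothetical and not part of this notebook.

```python
import numpy as np
from scipy.stats import norm

TRADING_DAYS = 252

def daily_d_from_sharpe_gap(sharpe_gap: float) -> float:
    """Daily-return Cohen's d implied by an annualized Sharpe gap (equal-volatility assumption)."""
    return sharpe_gap / np.sqrt(TRADING_DAYS)

def approx_n_per_group(d: float, power: float = 0.80, alpha: float = 0.05) -> float:
    """Normal-approximation sample size per group for a two-sided two-sample test."""
    z_alpha = norm.ppf(1 - alpha / 2)
    z_beta = norm.ppf(power)
    return 2 * ((z_alpha + z_beta) / d) ** 2

for gap in (0.25, 0.5, 1.0):
    d = daily_d_from_sharpe_gap(gap)
    n = approx_n_per_group(d)
    print(f"Sharpe gap {gap:.2f}: d={d:.4f}, about {n:,.0f} days per group "
          f"({n / TRADING_DAYS:.0f} trading years)")
```

Under these assumptions, a Sharpe gap of 0.5 corresponds to a daily d of roughly 0.03, far below the "small" benchmark, and to a sample on the order of 16,000 daily observations per group for 80% power.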

In [None]:
def cohens_d(group1: np.ndarray, group2: np.ndarray) -> float:
    """
    Calculate Cohen's d effect size.
    
    Cohen's d = (mean1 - mean2) / pooled_std
    """
    n1, n2 = len(group1), len(group2)
    var1, var2 = np.var(group1, ddof=1), np.var(group2, ddof=1)
    
    # Pooled standard deviation
    pooled_std = np.sqrt(((n1 - 1) * var1 + (n2 - 1) * var2) / (n1 + n2 - 2))
    
    return (np.mean(group1) - np.mean(group2)) / pooled_std


def calculate_power(effect_size: float, n1: int, n2: int, alpha: float = 0.05) -> float:
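    """
    Calculate approximate statistical power for a two-sample t-test.
    
    Power = P(reject H0 | H0 is false). The noncentral t distribution is
    approximated by a central t shifted by the non-centrality parameter,
    which is accurate for the large samples used here.
    """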
    # Degrees of freedom
    df = n1 + n2 - 2
    
    # Non-centrality parameter
    se = np.sqrt(1/n1 + 1/n2)
    ncp = effect_size / se
    
    # Critical value
    t_crit = t_dist.ppf(1 - alpha/2, df)
    
    # Power = P(|T| > t_crit) under alternative
    power = 1 - t_dist.cdf(t_crit - ncp, df) + t_dist.cdf(-t_crit - ncp, df)
    
    return power


def required_sample_size(effect_size: float, power: float = 0.80, alpha: float = 0.05) -> int:
    """
    Calculate required sample size per group to achieve desired power.
    Uses iterative approximation.
    """
    if abs(effect_size) < 0.001:
        return float('inf')
    
    # Initial approximation
    z_alpha = norm.ppf(1 - alpha/2)
    z_beta = norm.ppf(power)
    
    n_approx = 2 * ((z_alpha + z_beta) / effect_size) ** 2
    
    # Refine iteratively (start at n >= 2 so the degrees of freedom stay positive)
    for n in range(max(2, int(n_approx * 0.5)), int(n_approx * 2)):
        if calculate_power(effect_size, n, n, alpha) >= power:
            return n
    
    return int(n_approx)


# Calculate effect size
control_returns = returns_a['net_returns'].dropna().values
treatment_returns = returns_b['net_returns'].dropna().values

effect_size = cohens_d(treatment_returns, control_returns)
current_power = calculate_power(effect_size, len(treatment_returns), len(control_returns))

print("=" * 70)
print("EFFECT SIZE AND POWER ANALYSIS")
print("=" * 70)

print(f"\n📏 Effect Size (Cohen's d): {effect_size:.4f}")

# Interpret effect size
if abs(effect_size) < 0.2:
    effect_interpretation = "Negligible/Small"
elif abs(effect_size) < 0.5:
    effect_interpretation = "Small to Medium"
elif abs(effect_size) < 0.8:
    effect_interpretation = "Medium to Large"
else:
    effect_interpretation = "Large"

print(f"   Interpretation: {effect_interpretation}")

print(f"\n⚡ Current Statistical Power: {100*current_power:.2f}%")
print(f"   (Probability of detecting the effect if it's real)")

# Sample size analysis for different effect sizes
print("\n📊 Required Sample Size Analysis (per group):")
print("   Target Power: 80%, Significance Level: 5%")
print("-" * 50)

effect_sizes_to_test = [0.1, 0.2, 0.3, 0.5, 0.8]
for es in effect_sizes_to_test:
    req_n = required_sample_size(es, power=0.80, alpha=0.05)
    req_days = f"{req_n} days" if req_n < 5000 else f"~{req_n} days"
    trading_years = req_n / 252
    print(f"   Effect Size d={es:.1f}: {req_days} ({trading_years:.1f} trading years)")

# Power curve visualization
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Plot 1: Power vs Sample Size for our observed effect
sample_sizes = np.arange(50, 1000, 10)
powers = [calculate_power(effect_size, n, n) for n in sample_sizes]

axes[0].plot(sample_sizes, powers, 'b-', linewidth=2)
axes[0].axhline(y=0.80, color='r', linestyle='--', label='80% Power Target')
axes[0].axvline(x=len(treatment_returns), color='g', linestyle='--', 
                label=f'Current n={len(treatment_returns)}')
axes[0].fill_between(sample_sizes, 0, powers, alpha=0.2)
axes[0].set_xlabel('Sample Size (per group)', fontsize=12)
axes[0].set_ylabel('Statistical Power', fontsize=12)
axes[0].set_title(f'Power Curve for Observed Effect Size (d={effect_size:.3f})', 
                  fontsize=13, fontweight='bold')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
axes[0].set_ylim(0, 1)

# Plot 2: Required sample size vs Effect size
effect_sizes_range = np.arange(0.05, 1.0, 0.01)
required_n = [required_sample_size(es, power=0.80) for es in effect_sizes_range]

axes[1].plot(effect_sizes_range, required_n, 'b-', linewidth=2)
axes[1].axvline(x=abs(effect_size), color='g', linestyle='--', 
                label=f'Our Effect Size: d={effect_size:.3f}')
axes[1].axhline(y=len(treatment_returns), color='orange', linestyle='--',
                label=f'Our Sample Size: n={len(treatment_returns)}')
axes[1].set_xlabel("Effect Size (Cohen's d)", fontsize=12)
axes[1].set_ylabel('Required Sample Size (per group)', fontsize=12)
axes[1].set_title('Required Sample Size for 80% Power', fontsize=13, fontweight='bold')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
axes[1].set_ylim(0, 3000)

plt.tight_layout()
plt.show()

## 7. Bootstrap Confidence Intervals

Use bootstrap resampling to construct confidence intervals for the difference in mean returns between strategies. Bootstrap is particularly useful when:
- Distribution assumptions may not hold
- We want non-parametric confidence intervals
- We need intervals for complex statistics (Sharpe ratio, max drawdown)
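
Resampling individual days independently treats returns as i.i.d. and breaks the day-by-day pairing between the two strategies. One common alternative is a paired circular block bootstrap, which draws the same blocks of consecutive days for both series. The sketch below is a minimal version under stated assumptions: the function name `paired_block_bootstrap_diff`, the block size of 20 days, and the synthetic demo series are illustrative choices, not values taken from this notebook.

```python
import numpy as np

def paired_block_bootstrap_diff(returns_a, returns_b, block_size=20,
                                n_bootstrap=2000, seed=0):
    """Bootstrap the mean-return difference (B - A) using paired circular blocks."""
    a = np.asarray(returns_a, dtype=float)
    b = np.asarray(returns_b, dtype=float)
    if a.shape != b.shape:
        raise ValueError("paired resampling needs equal-length, date-aligned series")
    n = len(a)
    rng = np.random.default_rng(seed)
    n_blocks = int(np.ceil(n / block_size))
    offsets = np.arange(block_size)
    diffs = np.empty(n_bootstrap)
    for i in range(n_bootstrap):
        # Random block starts; indices wrap around the end of the sample
        starts = rng.integers(0, n, size=n_blocks)
        idx = ((starts[:, None] + offsets[None, :]) % n).ravel()[:n]
        # The same day indices are used for both strategies, preserving the pairing
        diffs[i] = b[idx].mean() - a[idx].mean()
    return diffs

# Hypothetical demo data: two long/flat strategies exposed to the same market
demo_rng = np.random.default_rng(1)
demo_market = demo_rng.normal(0.0003, 0.01, size=504)
demo_a = demo_market * (demo_rng.random(504) < 0.6)
demo_b = demo_market * (demo_rng.random(504) < 0.5)

block_diffs = paired_block_bootstrap_diff(demo_a, demo_b)
block_ci_lower, block_ci_upper = np.percentile(block_diffs, [2.5, 97.5])
print(f"95% CI for mean daily return difference: "
      f"[{100 * block_ci_lower:.4f}%, {100 * block_ci_upper:.4f}%]")
```

The block size trades off bias against variance: longer blocks preserve more of the serial dependence but leave fewer distinct blocks to resample, so it is worth checking that conclusions hold across a few block sizes.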

In [None]:
class BootstrapAnalysis:
    """
    Bootstrap analysis for comparing trading strategy performance.
    """
    
    def __init__(self, returns_a: np.ndarray, returns_b: np.ndarray, 
                 n_bootstrap: int = 10000, random_state: int = 42):
        self.returns_a = returns_a
        self.returns_b = returns_b
        self.n_bootstrap = n_bootstrap
        self.rng = np.random.RandomState(random_state)
        
    def bootstrap_statistic(self, statistic_func: callable) -> Tuple[np.ndarray, np.ndarray]:
        """
        Generate bootstrap distributions for a given statistic.
        
        Returns:
        --------
        (bootstrap_a, bootstrap_b): Arrays of bootstrapped statistics
        """
        n_a, n_b = len(self.returns_a), len(self.returns_b)
        
        bootstrap_a = np.zeros(self.n_bootstrap)
        bootstrap_b = np.zeros(self.n_bootstrap)
        
        for i in range(self.n_bootstrap):
            # Resample with replacement
            sample_a = self.rng.choice(self.returns_a, size=n_a, replace=True)
            sample_b = self.rng.choice(self.returns_b, size=n_b, replace=True)
            
            bootstrap_a[i] = statistic_func(sample_a)
            bootstrap_b[i] = statistic_func(sample_b)
            
        return bootstrap_a, bootstrap_b
    
    def confidence_interval_diff(self, statistic_func: callable, 
                                  confidence: float = 0.95) -> Dict:
        """
        Calculate confidence interval for the difference in a statistic.
        """
        bootstrap_a, bootstrap_b = self.bootstrap_statistic(statistic_func)
        
        # Difference distribution
        diff = bootstrap_b - bootstrap_a
        
        alpha = 1 - confidence
        ci_lower = np.percentile(diff, 100 * alpha / 2)
        ci_upper = np.percentile(diff, 100 * (1 - alpha / 2))
        
        # Share of bootstrap replicates in which B beats A (a resampling frequency, not a posterior probability)
        prob_b_better = (diff > 0).mean()
        
        return {
            'mean_diff': diff.mean(),
            'std_diff': diff.std(),
            'ci_lower': ci_lower,
            'ci_upper': ci_upper,
            'confidence': confidence,
            'prob_b_better': prob_b_better,
            'bootstrap_diff': diff,
            'bootstrap_a': bootstrap_a,
            'bootstrap_b': bootstrap_b
        }


# Define statistics functions
def mean_return(returns):
    return np.mean(returns)

def sharpe_ratio(returns, risk_free=0.0):
    excess = returns - risk_free/252
    if np.std(excess) == 0:
        return 0
    return np.mean(excess) / np.std(excess) * np.sqrt(252)

def max_drawdown(returns):
    cumulative = (1 + returns).cumprod()
    running_max = np.maximum.accumulate(cumulative)
    drawdown = (cumulative - running_max) / running_max
    return np.min(drawdown)


# Run bootstrap analysis
bootstrap = BootstrapAnalysis(
    returns_a=control_returns,
    returns_b=treatment_returns,
    n_bootstrap=10000
)

# Mean return difference
mean_ci = bootstrap.confidence_interval_diff(mean_return)

# Sharpe ratio difference
sharpe_ci = bootstrap.confidence_interval_diff(sharpe_ratio)

# Print results
print("=" * 70)
print("BOOTSTRAP CONFIDENCE INTERVALS")
print("=" * 70)

print(f"\n📊 Mean Daily Return Difference (Strategy B - Strategy A):")
print(f"   Mean Difference: {100*mean_ci['mean_diff']:.4f}%")
print(f"   95% CI: [{100*mean_ci['ci_lower']:.4f}%, {100*mean_ci['ci_upper']:.4f}%]")
print(f"   P(Strategy B > Strategy A): {100*mean_ci['prob_b_better']:.1f}%")

contains_zero_mean = mean_ci['ci_lower'] <= 0 <= mean_ci['ci_upper']
print(f"   CI Contains Zero: {'Yes ⚠️' if contains_zero_mean else 'No ✓'}")

print(f"\n📈 Sharpe Ratio Difference (Strategy B - Strategy A):")
print(f"   Mean Difference: {sharpe_ci['mean_diff']:.4f}")
print(f"   95% CI: [{sharpe_ci['ci_lower']:.4f}, {sharpe_ci['ci_upper']:.4f}]")
print(f"   P(Strategy B > Strategy A): {100*sharpe_ci['prob_b_better']:.1f}%")

contains_zero_sharpe = sharpe_ci['ci_lower'] <= 0 <= sharpe_ci['ci_upper']
print(f"   CI Contains Zero: {'Yes ⚠️' if contains_zero_sharpe else 'No ✓'}")

# Visualize bootstrap distributions
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Mean return difference histogram
axes[0, 0].hist(mean_ci['bootstrap_diff'] * 100, bins=50, density=True, 
                alpha=0.7, color='steelblue', edgecolor='black')
axes[0, 0].axvline(x=0, color='red', linestyle='--', linewidth=2, label='Zero (No Difference)')
axes[0, 0].axvline(x=mean_ci['ci_lower']*100, color='green', linestyle='--', label='95% CI')
axes[0, 0].axvline(x=mean_ci['ci_upper']*100, color='green', linestyle='--')
axes[0, 0].axvline(x=mean_ci['mean_diff']*100, color='orange', linewidth=2, label='Mean Diff')
axes[0, 0].set_xlabel('Mean Return Difference (%)', fontsize=11)
axes[0, 0].set_ylabel('Density', fontsize=11)
axes[0, 0].set_title('Bootstrap Distribution: Mean Return Difference', fontsize=12, fontweight='bold')
axes[0, 0].legend()

# Sharpe ratio difference histogram
axes[0, 1].hist(sharpe_ci['bootstrap_diff'], bins=50, density=True, 
                alpha=0.7, color='steelblue', edgecolor='black')
axes[0, 1].axvline(x=0, color='red', linestyle='--', linewidth=2, label='Zero (No Difference)')
axes[0, 1].axvline(x=sharpe_ci['ci_lower'], color='green', linestyle='--', label='95% CI')
axes[0, 1].axvline(x=sharpe_ci['ci_upper'], color='green', linestyle='--')
axes[0, 1].axvline(x=sharpe_ci['mean_diff'], color='orange', linewidth=2, label='Mean Diff')
axes[0, 1].set_xlabel('Sharpe Ratio Difference', fontsize=11)
axes[0, 1].set_ylabel('Density', fontsize=11)
axes[0, 1].set_title('Bootstrap Distribution: Sharpe Ratio Difference', fontsize=12, fontweight='bold')
axes[0, 1].legend()

# Strategy A vs B Sharpe distributions
axes[1, 0].hist(sharpe_ci['bootstrap_a'], bins=50, alpha=0.6, label='Strategy A', color='blue')
axes[1, 0].hist(sharpe_ci['bootstrap_b'], bins=50, alpha=0.6, label='Strategy B', color='green')
axes[1, 0].set_xlabel('Sharpe Ratio', fontsize=11)
axes[1, 0].set_ylabel('Frequency', fontsize=11)
axes[1, 0].set_title('Bootstrap Sharpe Ratio Distributions', fontsize=12, fontweight='bold')
axes[1, 0].legend()

# Probability B > A over bootstrap samples
cumulative_prob = np.cumsum(sharpe_ci['bootstrap_diff'] > 0) / np.arange(1, len(sharpe_ci['bootstrap_diff']) + 1)
axes[1, 1].plot(cumulative_prob, 'b-', linewidth=1)
axes[1, 1].axhline(y=0.5, color='red', linestyle='--', label='50% (No Preference)')
axes[1, 1].axhline(y=sharpe_ci['prob_b_better'], color='green', linestyle='--', 
                   label=f'Final: {100*sharpe_ci["prob_b_better"]:.1f}%')
axes[1, 1].set_xlabel('Bootstrap Iteration', fontsize=11)
axes[1, 1].set_ylabel('P(Strategy B Sharpe > Strategy A Sharpe)', fontsize=11)
axes[1, 1].set_title('Convergence of P(B > A)', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].set_ylim(0, 1)

plt.tight_layout()
plt.show()

## 8. Visualize Strategy Performance Comparison

Create comprehensive visualizations comparing:
- Equity curves (cumulative returns)
- Return distributions
- Drawdown analysis
- Risk-return scatter plots
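
Before plotting, it can help to see the drawdown arithmetic on a tiny example. The sketch below uses four made-up daily returns (hypothetical values, chosen only so the numbers are easy to check by hand) and NumPy's running maximum; it is an illustration of the calculation, not the notebook's own helper.

```python
import numpy as np

# Hypothetical daily returns, chosen so the arithmetic is easy to follow
toy_returns = np.array([0.10, -0.50, 0.20, 0.50])

# Equity curve starting at 1: roughly 1.10, 0.55, 0.66, 0.99
equity = np.cumprod(1 + toy_returns)

# Highest equity value seen so far: 1.10 on every day in this example
running_peak = np.maximum.accumulate(equity)

# Drawdown is the fractional drop from the running peak (always <= 0)
drawdown = equity / running_peak - 1

max_drawdown = drawdown.min()
print(f"Drawdown path: {np.round(drawdown, 3)}")
print(f"Max drawdown: {max_drawdown:.1%}")
```

Note that the equity curve ends close to its starting value while the maximum drawdown is 50%, which is why drawdown is reported alongside total return.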

In [None]:
def calculate_drawdown_series(returns: pd.Series) -> pd.Series:
    """Calculate drawdown series from returns."""
    cumulative = (1 + returns).cumprod()
    running_max = cumulative.expanding().max()
    drawdown = (cumulative - running_max) / running_max
    return drawdown


# Calculate drawdowns
dd_a = calculate_drawdown_series(returns_a['net_returns'].fillna(0))
dd_b = calculate_drawdown_series(returns_b['net_returns'].fillna(0))
dd_bh = calculate_drawdown_series(returns_benchmark['net_returns'].fillna(0))

# Create comprehensive comparison visualization
fig = plt.figure(figsize=(16, 12))

# 1. Equity Curves
ax1 = fig.add_subplot(2, 2, 1)
ax1.plot(returns_a.index, returns_a['cumulative_net'], 'b-', linewidth=1.5, 
         label='Strategy A (Control)', alpha=0.9)
ax1.plot(returns_b.index, returns_b['cumulative_net'], 'g-', linewidth=1.5, 
         label='Strategy B (Treatment)', alpha=0.9)
ax1.plot(returns_benchmark.index, returns_benchmark['cumulative_net'], 'k--', 
         linewidth=1, label='Buy & Hold', alpha=0.7)
ax1.fill_between(returns_a.index, 1, returns_a['cumulative_net'], 
                 where=returns_a['cumulative_net'] > 1, alpha=0.1, color='blue')
ax1.fill_between(returns_b.index, 1, returns_b['cumulative_net'], 
                 where=returns_b['cumulative_net'] > 1, alpha=0.1, color='green')
ax1.set_title('Equity Curves Comparison', fontsize=14, fontweight='bold')
ax1.set_xlabel('Date')
ax1.set_ylabel('Portfolio Value (Starting at 1)')
ax1.legend(loc='upper left')
ax1.grid(True, alpha=0.3)

# 2. Return Distributions
ax2 = fig.add_subplot(2, 2, 2)
bins = np.linspace(-0.05, 0.05, 50)
ax2.hist(returns_a['net_returns'].dropna() * 100, bins=bins*100, alpha=0.6, 
         label='Strategy A', color='blue', density=True)
ax2.hist(returns_b['net_returns'].dropna() * 100, bins=bins*100, alpha=0.6, 
         label='Strategy B', color='green', density=True)
ax2.axvline(x=returns_a['net_returns'].mean()*100, color='blue', linestyle='--', linewidth=2)
ax2.axvline(x=returns_b['net_returns'].mean()*100, color='green', linestyle='--', linewidth=2)
ax2.set_title('Daily Return Distributions', fontsize=14, fontweight='bold')
ax2.set_xlabel('Daily Return (%)')
ax2.set_ylabel('Density')
ax2.legend()
ax2.grid(True, alpha=0.3)

# 3. Drawdown Comparison
ax3 = fig.add_subplot(2, 2, 3)
ax3.fill_between(dd_a.index, 0, dd_a * 100, alpha=0.5, color='blue', label='Strategy A')
ax3.fill_between(dd_b.index, 0, dd_b * 100, alpha=0.5, color='green', label='Strategy B')
ax3.plot(dd_bh.index, dd_bh * 100, 'k--', alpha=0.7, label='Buy & Hold')
ax3.set_title('Drawdown Comparison', fontsize=14, fontweight='bold')
ax3.set_xlabel('Date')
ax3.set_ylabel('Drawdown (%)')
ax3.legend(loc='lower left')
ax3.grid(True, alpha=0.3)

# 4. Rolling Sharpe Comparison
rolling_window = 63  # ~3 months
rolling_sharpe_a = returns_a['net_returns'].rolling(rolling_window).apply(
    lambda x: np.mean(x) / np.std(x) * np.sqrt(252) if np.std(x) > 0 else 0
)
rolling_sharpe_b = returns_b['net_returns'].rolling(rolling_window).apply(
    lambda x: np.mean(x) / np.std(x) * np.sqrt(252) if np.std(x) > 0 else 0
)

ax4 = fig.add_subplot(2, 2, 4)
ax4.plot(rolling_sharpe_a.index, rolling_sharpe_a, 'b-', linewidth=1.5, 
         label='Strategy A', alpha=0.8)
ax4.plot(rolling_sharpe_b.index, rolling_sharpe_b, 'g-', linewidth=1.5, 
         label='Strategy B', alpha=0.8)
ax4.axhline(y=0, color='red', linestyle='--', alpha=0.5)
ax4.fill_between(rolling_sharpe_a.index, rolling_sharpe_a, rolling_sharpe_b, 
                 where=rolling_sharpe_b > rolling_sharpe_a, 
                 alpha=0.3, color='green', label='B > A')
ax4.fill_between(rolling_sharpe_a.index, rolling_sharpe_a, rolling_sharpe_b, 
                 where=rolling_sharpe_a > rolling_sharpe_b, 
                 alpha=0.3, color='blue', label='A > B')
ax4.set_title(f'Rolling {rolling_window}-Day Sharpe Ratio', fontsize=14, fontweight='bold')
ax4.set_xlabel('Date')
ax4.set_ylabel('Sharpe Ratio')
ax4.legend(loc='upper right')
ax4.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Summary statistics table
print("\n" + "=" * 70)
print("PERFORMANCE SUMMARY TABLE")
print("=" * 70)

summary_data = {
    'Metric': ['Total Return', 'Annual Return', 'Annual Volatility', 
               'Sharpe Ratio', 'Max Drawdown', 'Win Rate', 'Num Trades'],
    'Strategy A': [
        f"{100*metrics_a['total_return']:.2f}%",
        f"{100*metrics_a['annual_return']:.2f}%",
        f"{100*metrics_a['annual_volatility']:.2f}%",
        f"{metrics_a['sharpe_ratio']:.3f}",
        f"{100*metrics_a['max_drawdown']:.2f}%",
        f"{100*metrics_a['win_rate']:.1f}%",
        f"{int(metrics_a['num_trades'])}"
    ],
    'Strategy B': [
        f"{100*metrics_b['total_return']:.2f}%",
        f"{100*metrics_b['annual_return']:.2f}%",
        f"{100*metrics_b['annual_volatility']:.2f}%",
        f"{metrics_b['sharpe_ratio']:.3f}",
        f"{100*metrics_b['max_drawdown']:.2f}%",
        f"{100*metrics_b['win_rate']:.1f}%",
        f"{int(metrics_b['num_trades'])}"
    ],
    'Difference': [
        f"{100*(metrics_b['total_return'] - metrics_a['total_return']):.2f}%",
        f"{100*(metrics_b['annual_return'] - metrics_a['annual_return']):.2f}%",
        f"{100*(metrics_b['annual_volatility'] - metrics_a['annual_volatility']):.2f}%",
        f"{metrics_b['sharpe_ratio'] - metrics_a['sharpe_ratio']:.3f}",
        f"{100*(metrics_b['max_drawdown'] - metrics_a['max_drawdown']):.2f}%",
        f"{100*(metrics_b['win_rate'] - metrics_a['win_rate']):.1f}%",
        f"{int(metrics_b['num_trades'] - metrics_a['num_trades'])}"
    ]
}

summary_df = pd.DataFrame(summary_data)
print(summary_df.to_string(index=False))

## 9. Calculate Sharpe Ratio Comparison

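Compare risk-adjusted returns using Sharpe ratios with proper statistical testing. The cell below implements the Jobson-Korkie asymptotic test and a circular block bootstrap. The bootstrap is motivated by the Ledoit-Wolf (2008) approach, which provides a robust test for Sharpe ratio differences that accounts for: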
- Non-normality of returns
- Serial correlation
- Estimation uncertainty

Reference: *Robust Performance Hypothesis Testing with the Sharpe Ratio*, Ledoit & Wolf (2008), Journal of Empirical Finance
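
To see why serial correlation matters for these tests, the following sketch compares the plain sample variance of a simulated AR(1) series with a Bartlett-kernel (Newey-West) long-run variance estimate. The AR(1) coefficient, the volatility, and the function name `bartlett_long_run_variance` are assumptions made for illustration; the lag rule mirrors the one used in this notebook's HAC helper.

```python
import numpy as np


def bartlett_long_run_variance(x, max_lag):
    """Newey-West (Bartlett kernel) estimate of the long-run variance of x."""
    x = np.asarray(x, dtype=float)
    d = x - x.mean()
    lrv = np.mean(d ** 2)  # lag-0 term (plain variance)
    for j in range(1, max_lag + 1):
        weight = 1 - j / (max_lag + 1)  # Bartlett weights decay linearly
        lrv += 2 * weight * np.mean(d[j:] * d[:-j])
    return lrv


rng = np.random.default_rng(0)
n, phi = 2000, 0.5  # hypothetical sample size and AR(1) coefficient

# Simulate positively autocorrelated "returns"
shocks = rng.normal(0.0, 0.01, n)
ar1 = np.zeros(n)
for t in range(1, n):
    ar1[t] = phi * ar1[t - 1] + shocks[t]

max_lag = int(np.floor(4 * (n / 100) ** (2 / 9)))
naive_var = np.var(ar1)
hac_var = bartlett_long_run_variance(ar1, max_lag)
ratio = hac_var / naive_var

print(f"Lags used: {max_lag}")
print(f"HAC / naive variance ratio: {ratio:.2f}")
```

With positive autocorrelation the ratio is well above 1, so a test that assumes independent observations would understate the standard error of the mean return and overstate significance.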

In [None]:
class SharpeRatioTest:
    """
    Statistical tests for comparing Sharpe ratios between two strategies.
    Provides the Jobson-Korkie asymptotic test and a circular block bootstrap
    test, plus a Newey-West HAC (Heteroskedasticity and Autocorrelation
    Consistent) variance helper.
    """
    
    def __init__(self, returns_a: np.ndarray, returns_b: np.ndarray, 
                 risk_free_rate: float = 0.0):
        self.returns_a = returns_a
        self.returns_b = returns_b
        self.rf = risk_free_rate / 252  # Daily risk-free rate
        self.n = len(returns_a)
        
    def compute_sharpe(self, returns: np.ndarray) -> float:
        """Compute annualized Sharpe ratio."""
        excess = returns - self.rf
        return np.mean(excess) / np.std(excess, ddof=1) * np.sqrt(252)
    
    def newey_west_variance(self, x: np.ndarray, max_lag: Optional[int] = None) -> float:
        """
        Compute Newey-West HAC variance estimator.
        Accounts for heteroskedasticity and autocorrelation.
        """
        n = len(x)
        if max_lag is None:
            max_lag = int(np.floor(4 * (n / 100) ** (2/9)))
        
        # Sample variance
        x_demean = x - np.mean(x)
        variance = np.mean(x_demean ** 2)
        
        # Add autocovariance terms
        for j in range(1, max_lag + 1):
            weight = 1 - j / (max_lag + 1)  # Bartlett kernel
            autocov = np.mean(x_demean[j:] * x_demean[:-j])
            variance += 2 * weight * autocov
            
        return variance
    
    def jobson_korkie_test(self) -> Dict:
        """
        Jobson-Korkie test for equality of Sharpe ratios.
        H0: Sharpe_A = Sharpe_B
        """
        excess_a = self.returns_a - self.rf
        excess_b = self.returns_b - self.rf
        
        mu_a, mu_b = np.mean(excess_a), np.mean(excess_b)
        sigma_a, sigma_b = np.std(excess_a, ddof=1), np.std(excess_b, ddof=1)
        
        sr_a = mu_a / sigma_a
        sr_b = mu_b / sigma_b
        
        # Correlation between the two return series
        rho = np.cov(excess_a, excess_b)[0, 1] / (sigma_a * sigma_b)
        
        # Asymptotic variance of the Sharpe difference, using Memmel's (2003)
        # correction of Jobson-Korkie; note the squared correlation in the last term
        theta = (
            1 / self.n * (
                2 * (1 - rho) 
                + 0.5 * (sr_a**2 + sr_b**2 - 2 * sr_a * sr_b * rho**2)
            )
        )
        
        # Z statistic for B - A, matching the sign of the reported difference
        z_stat = (sr_b - sr_a) / np.sqrt(theta) if theta > 0 else 0
        p_value = 2 * (1 - norm.cdf(abs(z_stat)))
        
        return {
            'test': 'Jobson-Korkie',
            'sharpe_a': sr_a * np.sqrt(252),
            'sharpe_b': sr_b * np.sqrt(252),
            'sharpe_diff': (sr_b - sr_a) * np.sqrt(252),
            'z_statistic': z_stat,
            'p_value': p_value,
            'significant_005': p_value < 0.05
        }
    
    def bootstrap_sharpe_test(self, n_bootstrap: int = 10000) -> Dict:
        """
        Bootstrap test for Sharpe ratio difference using circular block bootstrap.
        """
        rng = np.random.RandomState(42)
        block_length = int(np.ceil(self.n ** (1/3)))  # Block length
        
        sr_a_original = self.compute_sharpe(self.returns_a)
        sr_b_original = self.compute_sharpe(self.returns_b)
        diff_original = sr_b_original - sr_a_original
        
        bootstrap_diffs = np.zeros(n_bootstrap)
        
        for i in range(n_bootstrap):
            # Circular block bootstrap
            n_blocks = int(np.ceil(self.n / block_length))
            start_indices = rng.randint(0, self.n, n_blocks)
            
            indices = []
            for start in start_indices:
                block_indices = [(start + j) % self.n for j in range(block_length)]
                indices.extend(block_indices)
            indices = indices[:self.n]
            
            boot_a = self.returns_a[indices]
            boot_b = self.returns_b[indices]
            
            sr_a_boot = self.compute_sharpe(boot_a)
            sr_b_boot = self.compute_sharpe(boot_b)
            bootstrap_diffs[i] = sr_b_boot - sr_a_boot
        
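        # Two-sided p-value: recentre the bootstrap differences at zero to
        # approximate the null distribution, then count how often a recentred
        # difference is at least as extreme as the observed one
        centered_diffs = bootstrap_diffs - np.mean(bootstrap_diffs)
        p_value = np.mean(np.abs(centered_diffs) >= abs(diff_original))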
        
        ci_lower = np.percentile(bootstrap_diffs, 2.5)
        ci_upper = np.percentile(bootstrap_diffs, 97.5)
        
        return {
            'test': 'Bootstrap (Block)',
            'sharpe_diff': diff_original,
            'p_value': p_value,
            'ci_95_lower': ci_lower,
            'ci_95_upper': ci_upper,
            'prob_b_better': np.mean(bootstrap_diffs > 0),
            'bootstrap_distribution': bootstrap_diffs
        }


# Run Sharpe ratio tests
sharpe_test = SharpeRatioTest(control_returns, treatment_returns)

jk_result = sharpe_test.jobson_korkie_test()
boot_result = sharpe_test.bootstrap_sharpe_test()

print("=" * 70)
print("SHARPE RATIO STATISTICAL TESTS")
print("=" * 70)

print(f"\nAnnualized Sharpe Ratios:")
print(f"  Strategy A (Control): {jk_result['sharpe_a']:.4f}")
print(f"  Strategy B (Treatment): {jk_result['sharpe_b']:.4f}")
print(f"  Difference (B - A): {jk_result['sharpe_diff']:.4f}")

print(f"\n📊 Jobson-Korkie Test (Asymptotic):")
print(f"  Z-statistic: {jk_result['z_statistic']:.4f}")
print(f"  P-value: {jk_result['p_value']:.6f}")
print(f"  Significant at 5%: {'Yes ✓' if jk_result['significant_005'] else 'No ✗'}")

print(f"\n📊 Bootstrap Test (Block Bootstrap):")
print(f"  P-value: {boot_result['p_value']:.6f}")
print(f"  95% CI for difference: [{boot_result['ci_95_lower']:.4f}, {boot_result['ci_95_upper']:.4f}]")
print(f"  P(Strategy B > Strategy A): {100*boot_result['prob_b_better']:.1f}%")

# Visualize bootstrap distribution
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Bootstrap distribution
axes[0].hist(boot_result['bootstrap_distribution'], bins=50, density=True, 
             alpha=0.7, color='steelblue', edgecolor='black')
axes[0].axvline(x=0, color='red', linestyle='--', linewidth=2, label='No Difference')
axes[0].axvline(x=boot_result['sharpe_diff'], color='orange', linewidth=2, 
                label=f'Observed: {boot_result["sharpe_diff"]:.3f}')
axes[0].axvline(x=boot_result['ci_95_lower'], color='green', linestyle='--', label='95% CI')
axes[0].axvline(x=boot_result['ci_95_upper'], color='green', linestyle='--')
axes[0].set_xlabel('Sharpe Ratio Difference (B - A)', fontsize=12)
axes[0].set_ylabel('Density', fontsize=12)
axes[0].set_title('Bootstrap Distribution of Sharpe Ratio Difference', 
                  fontsize=13, fontweight='bold')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Comparison bar chart
sharpes = [jk_result['sharpe_a'], jk_result['sharpe_b']]
names = ['Strategy A\n(Control)', 'Strategy B\n(Treatment)']
colors = ['steelblue', 'seagreen']
bars = axes[1].bar(names, sharpes, color=colors, edgecolor='black', alpha=0.8)

# Add value labels
for bar, val in zip(bars, sharpes):
    axes[1].text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.02,
                 f'{val:.3f}', ha='center', va='bottom', fontsize=12, fontweight='bold')

axes[1].set_ylabel('Annualized Sharpe Ratio', fontsize=12)
axes[1].set_title('Sharpe Ratio Comparison', fontsize=13, fontweight='bold')
axes[1].grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

## 10. Implement Sequential A/B Testing

Traditional fixed-horizon tests require waiting until the predetermined sample size is reached. **Sequential testing** allows for early stopping when:
- One strategy clearly outperforms the other
- There's no detectable difference (futility)
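
Early stopping is only safe when the decision boundaries are adjusted for repeated looks. The simulation below is a minimal sketch of the problem under assumed settings (2,000 simulated A/A tests, 250 days, 5 equally spaced looks, normally distributed return differences with true mean zero): it checks the unadjusted 1.96 critical value at every look and counts how often any look rejects.

```python
import numpy as np

rng = np.random.default_rng(1)
n_sims, n_days, n_looks = 2000, 250, 5  # hypothetical simulation settings
look_days = np.linspace(n_days // n_looks, n_days, n_looks, dtype=int)
z_crit = 1.96  # unadjusted two-sided 5% critical value

# A/A setting: paired daily return differences with true mean zero
diffs = rng.normal(0.0, 0.01, size=(n_sims, n_days))

rejected_at_any_look = np.zeros(n_sims, dtype=bool)
for day in look_days:
    sample = diffs[:, :day]
    z = sample.mean(axis=1) / (sample.std(axis=1, ddof=1) / np.sqrt(day))
    rejected_at_any_look |= np.abs(z) > z_crit

# Reference: test once, at the final day only
z_final = diffs.mean(axis=1) / (diffs.std(axis=1, ddof=1) / np.sqrt(n_days))
single_look_rate = np.mean(np.abs(z_final) > z_crit)
peeking_rate = rejected_at_any_look.mean()

print(f"False positive rate, single final look: {single_look_rate:.3f}")
print(f"False positive rate, stop at any of {n_looks} looks: {peeking_rate:.3f}")
```

The any-look rate comes out well above the nominal 5%, which is the inflation that sequential designs are meant to control.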

### Methods Covered
1. **Sequential Probability Ratio Test (SPRT):** Wald's original sequential test
2. **Group Sequential Design:** Periodic interim analyses with adjusted significance levels
3. **Bayesian Stopping Rules:** Based on posterior probability thresholds
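
As a quick worked example for the SPRT, the sketch below computes Wald's log-likelihood-ratio boundaries for `alpha = 0.05` and `beta = 0.20`, and a rough expected stopping time when the true effect equals the assumed one. The effect size of 0.15 standard deviations matches the value used later in this notebook; the sample-size formula is Wald's approximation, which ignores boundary overshoot and assumes known variance.

```python
import math

alpha, beta = 0.05, 0.20  # Type I and Type II error rates
effect_size = 0.15        # assumed effect under H1, in std units

# Wald's boundaries on the cumulative log-likelihood ratio
upper_bound = math.log((1 - beta) / alpha)  # cross above: reject H0
lower_bound = math.log(beta / (1 - alpha))  # cross below: accept H0

# Under H1 each observation adds effect_size**2 / 2 to the LLR on average
drift_under_h1 = effect_size ** 2 / 2

# Wald's approximation to the expected number of observations under H1
expected_n_h1 = ((1 - beta) * upper_bound + beta * lower_bound) / drift_under_h1

print(f"Upper boundary: {upper_bound:.4f}")
print(f"Lower boundary: {lower_bound:.4f}")
print(f"Approx. expected observations under H1: {expected_n_h1:.0f}")
```

The boundaries are asymmetric because `alpha` and `beta` differ: rejecting H0 requires more accumulated evidence than accepting it.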

In [None]:
class SequentialABTest:
    """
    Sequential A/B Testing Framework for Trading Strategies.
    Allows for early stopping while controlling Type I and Type II error rates.
    """
    
    def __init__(self, alpha: float = 0.05, beta: float = 0.20,
                 min_effect_size: float = 0.1):
        """
        Parameters:
        -----------
        alpha : Type I error rate (false positive)
        beta : Type II error rate (false negative)  
        min_effect_size : Minimum detectable effect (in std units)
        """
        self.alpha = alpha
        self.beta = beta
        self.min_effect_size = min_effect_size
        
        # SPRT thresholds (log-likelihood ratio bounds)
        self.upper_bound = np.log((1 - beta) / alpha)
        self.lower_bound = np.log(beta / (1 - alpha))
        
    def sprt_log_likelihood(self, diff: float, std: float) -> float:
        """
        Calculate log-likelihood ratio for a single observation.
        H0: mean difference = 0
        H1: mean difference = min_effect_size * std
        """
        effect = self.min_effect_size * std
        if std == 0:
            return 0
        # Log-likelihood ratio: log(P(data|H1) / P(data|H0))
        return (diff * effect - 0.5 * effect**2) / (std**2)
    
    def run_sprt(self, returns_control: np.ndarray, 
                 returns_treatment: np.ndarray) -> Dict:
        """
        Run Sequential Probability Ratio Test.
        Returns test progress and stopping point if applicable.
        """
        n = min(len(returns_control), len(returns_treatment))
        
        cumulative_llr = 0
        llr_history = []
        decisions = []
        
        for i in range(n):
            # Paired difference
            diff = returns_treatment[i] - returns_control[i]
            
            # Estimate std from the paired differences observed so far
            # (a single observation gives std = 0, so day 1 contributes nothing)
            diffs_so_far = returns_treatment[:i+1] - returns_control[:i+1]
            std = np.std(diffs_so_far)
                
            if std > 0:
                cumulative_llr += self.sprt_log_likelihood(diff, std)
            
            llr_history.append(cumulative_llr)
            
            # Decision
            if cumulative_llr >= self.upper_bound:
                decisions.append('Reject H0 (B > A)')
            elif cumulative_llr <= self.lower_bound:
                decisions.append('Accept H0 (No difference)')
            else:
                decisions.append('Continue')
        
        # Find first stopping point
        stopping_point = None
        final_decision = 'No conclusion (test continues)'
        
        for i, decision in enumerate(decisions):
            if decision != 'Continue':
                stopping_point = i + 1
                final_decision = decision
                break
                
        return {
            'llr_history': np.array(llr_history),
            'upper_bound': self.upper_bound,
            'lower_bound': self.lower_bound,
            'stopping_point': stopping_point,
            'final_decision': final_decision,
            'decisions': decisions
        }
    
    def group_sequential_test(self, returns_control: np.ndarray,
                              returns_treatment: np.ndarray,
                              n_looks: int = 5) -> Dict:
        """
        Group Sequential Design with approximate O'Brien-Fleming-type boundaries.
        Performs interim analyses at equally spaced intervals.
        """
        n = min(len(returns_control), len(returns_treatment))
        analysis_points = np.linspace(n // n_looks, n, n_looks, dtype=int)
        
        results = []
        
        for i, look_n in enumerate(analysis_points):
            # Get data up to this point
            ctrl = returns_control[:look_n]
            treat = returns_treatment[:look_n]
            
            # T-test
            t_stat, p_value = ttest_ind(treat, ctrl)
            
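            # Approximate O'Brien-Fleming-type boundary: the critical z-value
            # scales with 1 / sqrt(information fraction), so early looks need
            # much stronger evidence. Using the unadjusted z_{alpha/2} as the
            # constant is a simplification; exact designs calibrate it so the
            # overall Type I error equals alpha.
            info_fraction = (i + 1) / n_looks
            z_boundary = norm.ppf(1 - self.alpha / 2) / np.sqrt(info_fraction)
            # Nominal two-sided p-value threshold at this look
            alpha_spent = 2 * (1 - norm.cdf(z_boundary))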
            
            # Decision
            reject = p_value < alpha_spent
            
            results.append({
                'look': i + 1,
                'n_observations': look_n,
                'info_fraction': info_fraction,
                't_statistic': t_stat,
                'p_value': p_value,
                'alpha_boundary': alpha_spent,
                'reject_h0': reject,
                'mean_diff': np.mean(treat) - np.mean(ctrl)
            })
            
            if reject:
                break  # Early stopping
                
        return {
            'interim_results': results,
            'final_decision': results[-1],
            'early_stopped': len(results) < n_looks
        }


# Run Sequential Tests
seq_test = SequentialABTest(alpha=0.05, beta=0.20, min_effect_size=0.15)

# SPRT
sprt_result = seq_test.run_sprt(control_returns, treatment_returns)

# Group Sequential
gs_result = seq_test.group_sequential_test(control_returns, treatment_returns, n_looks=5)

print("=" * 70)
print("SEQUENTIAL A/B TESTING RESULTS")
print("=" * 70)

print("\n📈 Sequential Probability Ratio Test (SPRT):")
print(f"  Upper Boundary (Reject H0): {sprt_result['upper_bound']:.4f}")
print(f"  Lower Boundary (Accept H0): {sprt_result['lower_bound']:.4f}")
if sprt_result['stopping_point']:
    print(f"  Stopping Point: Day {sprt_result['stopping_point']}")
    print(f"  Decision: {sprt_result['final_decision']}")
else:
    print(f"  Status: Test did not reach a conclusion")
    print(f"  Final LLR: {sprt_result['llr_history'][-1]:.4f}")

print("\n📊 Group Sequential Design (O'Brien-Fleming):")
print(f"  Number of Interim Analyses: {len(gs_result['interim_results'])}")
print(f"  Early Stopped: {'Yes' if gs_result['early_stopped'] else 'No'}")
print("\n  Interim Analysis Results:")
print("  " + "-" * 60)

interim_df = pd.DataFrame(gs_result['interim_results'])
interim_df['mean_diff_pct'] = interim_df['mean_diff'] * 100
interim_df_display = interim_df[['look', 'n_observations', 'p_value', 'alpha_boundary', 'reject_h0']].copy()
interim_df_display['p_value'] = interim_df_display['p_value'].apply(lambda x: f"{x:.6f}")
interim_df_display['alpha_boundary'] = interim_df_display['alpha_boundary'].apply(lambda x: f"{x:.6f}")
print(interim_df_display.to_string(index=False))

# Visualize SPRT
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# SPRT trajectory
x = np.arange(1, len(sprt_result['llr_history']) + 1)
axes[0].plot(x, sprt_result['llr_history'], 'b-', linewidth=1.5, label='Log-Likelihood Ratio')
axes[0].axhline(y=sprt_result['upper_bound'], color='green', linestyle='--', 
                linewidth=2, label=f'Reject H0 boundary ({sprt_result["upper_bound"]:.2f})')
axes[0].axhline(y=sprt_result['lower_bound'], color='red', linestyle='--', 
                linewidth=2, label=f'Accept H0 boundary ({sprt_result["lower_bound"]:.2f})')
axes[0].axhline(y=0, color='gray', linestyle='-', alpha=0.5)

if sprt_result['stopping_point']:
    axes[0].axvline(x=sprt_result['stopping_point'], color='orange', linestyle=':', 
                    linewidth=2, label=f'Stop at day {sprt_result["stopping_point"]}')

axes[0].fill_between(x, sprt_result['lower_bound'], sprt_result['upper_bound'], 
                     alpha=0.1, color='yellow', label='Continue testing region')
axes[0].set_xlabel('Trading Day', fontsize=12)
axes[0].set_ylabel('Cumulative Log-Likelihood Ratio', fontsize=12)
axes[0].set_title('SPRT: Sequential Probability Ratio Test', fontsize=13, fontweight='bold')
axes[0].legend(loc='best', fontsize=9)
axes[0].grid(True, alpha=0.3)

# Group Sequential boundaries
looks = interim_df['look'].values
p_values = interim_df['p_value'].apply(lambda x: float(x)).values
alpha_boundaries = interim_df['alpha_boundary'].apply(lambda x: float(x)).values

axes[1].plot(looks, p_values, 'bo-', markersize=10, linewidth=2, label='P-value')
axes[1].plot(looks, alpha_boundaries, 'r--', linewidth=2, label='O\'Brien-Fleming Boundary')
axes[1].fill_between(looks, 0, alpha_boundaries, alpha=0.2, color='red', label='Rejection region')

# Mark stopping point if applicable
reject_indices = np.where(p_values < alpha_boundaries)[0]
if len(reject_indices) > 0:
    stop_idx = reject_indices[0]
    axes[1].plot(looks[stop_idx], p_values[stop_idx], 'go', markersize=15, 
                 markeredgecolor='black', markeredgewidth=2, label='Early stop')

axes[1].set_xlabel('Interim Analysis (Look)', fontsize=12)
axes[1].set_ylabel('P-value / Alpha Boundary', fontsize=12)
axes[1].set_title('Group Sequential Design: O\'Brien-Fleming', fontsize=13, fontweight='bold')
axes[1].legend(loc='best')
axes[1].grid(True, alpha=0.3)
axes[1].set_yscale('log')
axes[1].set_xticks(looks)

plt.tight_layout()
plt.show()

## 11. Bayesian A/B Testing

The Bayesian approach offers several advantages for trading strategy comparison:
- Provides probability statements ("95% probability B is better")
- Naturally handles sequential updates
- Incorporates prior knowledge
- Calculates expected loss from wrong decisions
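
The expected-loss idea can be sketched with a few lines of Monte Carlo. The posterior below is a hypothetical Normal distribution for the daily mean-return difference (B minus A); the mean and standard deviation are made-up numbers for illustration, not results from this notebook.

```python
import numpy as np

rng = np.random.default_rng(7)

# Hypothetical posterior for the daily mean-return difference (B - A)
posterior_mean_diff = 0.0005
posterior_sd_diff = 0.0007
diff_samples = rng.normal(posterior_mean_diff, posterior_sd_diff, 100_000)

# Probability statement: how often is B ahead across posterior draws?
prob_b_better = np.mean(diff_samples > 0)

# Expected loss: average shortfall in the scenarios where the choice is wrong
loss_if_choose_b = np.mean(np.maximum(-diff_samples, 0.0))  # A was better
loss_if_choose_a = np.mean(np.maximum(diff_samples, 0.0))   # B was better

print(f"P(B > A): {prob_b_better:.3f}")
print(f"Expected daily loss if we choose B: {loss_if_choose_b:.6f}")
print(f"Expected daily loss if we choose A: {loss_if_choose_a:.6f}")
```

Unlike the probability alone, expected loss weights each wrong-decision scenario by how costly it is, so a strategy can be deployed when the downside of being wrong is small even if P(B > A) is below a fixed threshold.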

### Key Concepts
- **Prior:** Initial belief about strategy performance
- **Likelihood:** Evidence from observed returns
- **Posterior:** Updated belief after seeing data
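
To make the prior, likelihood, and posterior concrete, here is a minimal sketch of a conjugate update in the simplest setting: a Normal prior on the mean daily return with the return volatility treated as known. This is a deliberate simplification of the Normal-Inverse-Gamma model used in the class below, and every number is a hypothetical input.

```python
import math

# Prior belief about the mean daily return
prior_mean, prior_sd = 0.0, 0.001

# Observed data summary (hypothetical): 100 days, average return 0.1% per day
n_days, sample_mean = 100, 0.001
known_daily_vol = 0.01  # assumed known for this simplified example

# Conjugate Normal-Normal update: precisions (1 / variance) add up
prior_precision = 1 / prior_sd ** 2
data_precision = n_days / known_daily_vol ** 2
posterior_precision = prior_precision + data_precision

# Posterior mean is a precision-weighted average of prior mean and sample mean
posterior_mean = (
    prior_precision * prior_mean + data_precision * sample_mean
) / posterior_precision
posterior_sd = math.sqrt(1 / posterior_precision)

# Posterior probability that the true mean return is positive
z = posterior_mean / posterior_sd
prob_mean_positive = 0.5 * (1 + math.erf(z / math.sqrt(2)))

print(f"Posterior mean: {posterior_mean:.6f}")
print(f"Posterior sd:   {posterior_sd:.6f}")
print(f"P(mean > 0):    {prob_mean_positive:.3f}")
```

Here the prior and the data carry equal precision, so the posterior mean lands halfway between the prior mean and the sample mean. A tighter prior pulls the estimate further toward zero, and more data pulls it toward the sample mean.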

In [None]:
class BayesianABTest:
    """
    Bayesian A/B Testing for Trading Strategies.
    Uses Normal-Inverse-Gamma conjugate prior for unknown mean and variance.
    """
    
    def __init__(self, prior_mean: float = 0.0, prior_std: float = 0.001,
                 prior_df: float = 1.0):
        """
        Initialize with prior parameters.
        
        Parameters:
        -----------
        prior_mean : Prior mean for daily returns (usually 0 for no prior belief)
        prior_std : Prior standard deviation for returns
        prior_df : Prior degrees of freedom (strength of prior)
        """
        self.prior_mean = prior_mean
        self.prior_std = prior_std
        self.prior_df = prior_df
        
    def update_posterior(self, returns: np.ndarray) -> Dict:
        """
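        Update posterior parameters given observed returns.
        Uses the Normal-Inverse-Gamma conjugate update, with prior_df acting as
        both the prior pseudo-observation count and the prior degrees of freedom.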
        """
        n = len(returns)
        sample_mean = np.mean(returns)
        sample_var = np.var(returns, ddof=1) if n > 1 else self.prior_std**2
        
        # Posterior parameters
        posterior_df = self.prior_df + n
        
        # Posterior mean (weighted average of prior and sample)
        weight_prior = self.prior_df / (self.prior_df + n)
        weight_data = n / (self.prior_df + n)
        posterior_mean = weight_prior * self.prior_mean + weight_data * sample_mean
        
        # Posterior variance
        prior_ss = self.prior_df * self.prior_std**2
        data_ss = (n - 1) * sample_var if n > 1 else 0
        mean_adjustment = (self.prior_df * n) / (self.prior_df + n) * (sample_mean - self.prior_mean)**2
        
        posterior_var = (prior_ss + data_ss + mean_adjustment) / posterior_df
        posterior_std = np.sqrt(posterior_var)
        
        return {
            'mean': posterior_mean,
            'std': posterior_std,
            'df': posterior_df,
            'n': n
        }
    
    def probability_b_better(self, returns_a: np.ndarray, returns_b: np.ndarray,
                             n_samples: int = 100000) -> Dict:
        """
        Calculate P(Strategy B > Strategy A) using Monte Carlo sampling.
        """
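        rng = np.random.RandomState(42)  # local RNG; leaves global state untouched
        
        # Get posteriors
        post_a = self.update_posterior(returns_a)
        post_b = self.update_posterior(returns_b)
        
        # Sample the mean return from its marginal posterior (Student-t).
        # post['std'] is the scale of individual returns, so divide by
        # sqrt(df) to get the uncertainty about the mean itself.
        scale_a = post_a['std'] / np.sqrt(post_a['df'])
        scale_b = post_b['std'] / np.sqrt(post_b['df'])
        samples_a = post_a['mean'] + scale_a * rng.standard_t(post_a['df'], n_samples)
        samples_b = post_b['mean'] + scale_b * rng.standard_t(post_b['df'], n_samples)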
        
        # Probability B > A
        prob_b_better = (samples_b > samples_a).mean()
        
        # Expected loss (if we wrongly choose B when A is better)
        loss_choosing_b = np.maximum(samples_a - samples_b, 0).mean()
        loss_choosing_a = np.maximum(samples_b - samples_a, 0).mean()
        
        # 95% credible interval for difference
        diff_samples = samples_b - samples_a
        ci_lower = np.percentile(diff_samples, 2.5)
        ci_upper = np.percentile(diff_samples, 97.5)
        
        return {
            'prob_b_better': prob_b_better,
            'prob_a_better': 1 - prob_b_better,
            'expected_loss_choose_b': loss_choosing_b,
            'expected_loss_choose_a': loss_choosing_a,
            'mean_diff': np.mean(diff_samples),
            'ci_95_lower': ci_lower,
            'ci_95_upper': ci_upper,
            'samples_a': samples_a,
            'samples_b': samples_b,
            'posterior_a': post_a,
            'posterior_b': post_b
        }
    
    def sequential_bayesian(self, returns_a: np.ndarray, returns_b: np.ndarray,
                            prob_threshold: float = 0.95) -> Dict:
        """
        Sequential Bayesian analysis with probability threshold stopping.
        """
        n = min(len(returns_a), len(returns_b))
        
        history = []
        stopped_at = None
        
        for i in range(20, n, 5):  # Start after 20 days, check every 5 days
            result = self.probability_b_better(returns_a[:i], returns_b[:i], n_samples=10000)
            
            history.append({
                'day': i,
                'prob_b_better': result['prob_b_better'],
                'mean_diff': result['mean_diff'] * 252,  # Annualized
                'expected_loss_b': result['expected_loss_choose_b'] * 252
            })
            
            # Stopping criterion
            if result['prob_b_better'] >= prob_threshold or result['prob_a_better'] >= prob_threshold:
                stopped_at = i
                break
                
        return {
            'history': history,
            'stopped_at': stopped_at,
            # history is empty when fewer than 21 observations are available
            'final_prob_b_better': history[-1]['prob_b_better'] if history else None
        }


# Run Bayesian A/B Test
bayes_test = BayesianABTest(prior_mean=0.0, prior_std=0.01, prior_df=1.0)

# Full analysis
bayes_result = bayes_test.probability_b_better(control_returns, treatment_returns)

# Sequential analysis
seq_bayes = bayes_test.sequential_bayesian(control_returns, treatment_returns, prob_threshold=0.95)

print("=" * 70)
print("BAYESIAN A/B TESTING RESULTS")
print("=" * 70)

print(f"\n📊 Posterior Summary:")
print(f"\n  Strategy A (Control):")
print(f"    Posterior Mean (Daily): {100*bayes_result['posterior_a']['mean']:.4f}%")
print(f"    Posterior Std: {100*bayes_result['posterior_a']['std']:.4f}%")

print(f"\n  Strategy B (Treatment):")
print(f"    Posterior Mean (Daily): {100*bayes_result['posterior_b']['mean']:.4f}%")
print(f"    Posterior Std: {100*bayes_result['posterior_b']['std']:.4f}%")

print(f"\n🎯 Key Results:")
print(f"  P(Strategy B > Strategy A): {100*bayes_result['prob_b_better']:.2f}%")
print(f"  P(Strategy A > Strategy B): {100*bayes_result['prob_a_better']:.2f}%")
print(f"\n  95% Credible Interval for Difference (Daily):")
print(f"    [{100*bayes_result['ci_95_lower']:.4f}%, {100*bayes_result['ci_95_upper']:.4f}%]")

print(f"\n💰 Expected Loss Analysis (Annualized):")
print(f"  If we choose Strategy B (and A is actually better): {100*bayes_result['expected_loss_choose_b']*252:.2f}%")
print(f"  If we choose Strategy A (and B is actually better): {100*bayes_result['expected_loss_choose_a']*252:.2f}%")

# Recommendation
print(f"\n📋 RECOMMENDATION:")
if bayes_result['prob_b_better'] >= 0.95:
    print("  ✅ Strong evidence that Strategy B is better (>95% probability)")
    print("  ➡️  Consider deploying Strategy B")
elif bayes_result['prob_b_better'] >= 0.80:
    print("  🔶 Moderate evidence that Strategy B is better (80-95% probability)")
    print("  ➡️  Continue testing or deploy with caution")
elif bayes_result['prob_b_better'] >= 0.20:
    # Mirror the 80% threshold: only favour A once P(A > B) exceeds 80%
    print("  ⚖️  Inconclusive: No clear winner")
    print("  ➡️  Continue testing to gather more evidence")
else:
    print("  🔴 Evidence suggests Strategy A is better")
    print("  ➡️  Keep Strategy A, abandon Strategy B")

# Visualize Bayesian results
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# 1. Posterior distributions
axes[0, 0].hist(bayes_result['samples_a'] * 100, bins=50, density=True, 
                alpha=0.6, label='Strategy A', color='blue')
axes[0, 0].hist(bayes_result['samples_b'] * 100, bins=50, density=True, 
                alpha=0.6, label='Strategy B', color='green')
axes[0, 0].axvline(x=0, color='red', linestyle='--', label='Zero')
axes[0, 0].set_xlabel('Daily Return (%)', fontsize=11)
axes[0, 0].set_ylabel('Density', fontsize=11)
axes[0, 0].set_title('Posterior Distributions of Mean Return', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# 2. Difference distribution
diff = bayes_result['samples_b'] - bayes_result['samples_a']
axes[0, 1].hist(diff * 100, bins=50, density=True, alpha=0.7, 
                color='purple', edgecolor='black')
axes[0, 1].axvline(x=0, color='red', linestyle='--', linewidth=2, label='No Difference')
axes[0, 1].axvline(x=bayes_result['ci_95_lower']*100, color='green', linestyle='--',
                   label='95% credible interval')
axes[0, 1].axvline(x=bayes_result['ci_95_upper']*100, color='green', linestyle='--')
# Shade the region where A beats B (difference < 0). axvspan covers the full
# axis height, so no hard-coded y-limit is needed.
axes[0, 1].axvspan(
    min(diff.min() * 100, 0), 0,
    alpha=0.2, color='red', label=f'P(A>B)={100*bayes_result["prob_a_better"]:.1f}%'
)
axes[0, 1].set_xlabel('Difference in Daily Return (B - A) (%)', fontsize=11)
axes[0, 1].set_ylabel('Density', fontsize=11)
axes[0, 1].set_title('Posterior Distribution of Difference', fontsize=12, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# 3. Sequential probability evolution
if seq_bayes['history']:
    days = [h['day'] for h in seq_bayes['history']]
    probs = [h['prob_b_better'] for h in seq_bayes['history']]
    
    axes[1, 0].plot(days, probs, 'b-o', linewidth=2, markersize=6)
    axes[1, 0].axhline(y=0.95, color='green', linestyle='--', label='95% threshold')
    axes[1, 0].axhline(y=0.50, color='gray', linestyle='--', alpha=0.5, label='50% (no preference)')
    axes[1, 0].axhline(y=0.05, color='red', linestyle='--', label='5% threshold')
    axes[1, 0].fill_between(days, 0.95, 1.0, alpha=0.2, color='green', label='Conclude B better')
    axes[1, 0].fill_between(days, 0.0, 0.05, alpha=0.2, color='red', label='Conclude A better')
    
    if seq_bayes['stopped_at']:
        axes[1, 0].axvline(x=seq_bayes['stopped_at'], color='orange', linestyle=':', 
                           linewidth=2, label=f'Early stop (day {seq_bayes["stopped_at"]})')
    
    axes[1, 0].set_xlabel('Trading Day', fontsize=11)
    axes[1, 0].set_ylabel('P(Strategy B > Strategy A)', fontsize=11)
    axes[1, 0].set_title('Sequential Bayesian: Probability Evolution', fontsize=12, fontweight='bold')
    axes[1, 0].legend(loc='center right', fontsize=9)
    axes[1, 0].grid(True, alpha=0.3)
    axes[1, 0].set_ylim(0, 1)

# 4. Expected loss over time
if seq_bayes['history']:
    days = [h['day'] for h in seq_bayes['history']]
    # Annualize (x252) so the plot matches its title and the printed analysis.
    # Assumes h['expected_loss_b'] is a daily figure, like
    # bayes_result['expected_loss_choose_b'].
    losses = [h['expected_loss_b'] * 252 * 100 for h in seq_bayes['history']]
    
    axes[1, 1].plot(days, losses, 'r-o', linewidth=2, markersize=6)
    axes[1, 1].fill_between(days, 0, losses, alpha=0.3, color='red')
    axes[1, 1].set_xlabel('Trading Day', fontsize=11)
    axes[1, 1].set_ylabel('Expected Loss if B is Wrong (%, annualized)', fontsize=11)
    axes[1, 1].set_title('Expected Loss from Choosing B (Annualized)', fontsize=12, fontweight='bold')
    axes[1, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()