# Universal Strategy Analysis

This notebook provides comprehensive analysis across all strategies tested in a parameter sweep.

**Key Features:**
- Cross-strategy performance comparison
- Parameter sensitivity analysis
- Stop loss and profit target analysis with CORRECT implementation
- Correlation analysis for ensemble building
- Regime-specific performance breakdown
- Automatic identification of optimal strategies and ensembles

In [1]:
# Parameters will be injected here by papermill
# This cell is tagged with 'parameters' for papermill to recognize it
# The values below are defaults; the following "# Parameters" cell reassigns a subset of them.
run_dir = "."  # Run results directory; trace files and strategy_index.parquet are resolved against it
config_name = "config"  # Config label, echoed in the setup banner
symbols = ["SPY"]  # Symbol list, echoed in the setup banner
timeframe = "5m"  # Bar timeframe label, echoed in the setup banner
min_strategies_to_analyze = 20  # NOTE(review): not referenced in the cells shown here - confirm where it is consumed
sharpe_threshold = 1.0  # NOTE(review): not referenced in the cells shown here - confirm where it is consumed
correlation_threshold = 0.7  # NOTE(review): not referenced in the cells shown here - confirm where it is consumed
top_n_strategies = 10  # NOTE(review): not referenced in the cells shown here - confirm where it is consumed
ensemble_size = 5  # NOTE(review): not referenced in the cells shown here - confirm where it is consumed
calculate_all_performance = True  # Set to False to limit analysis for large sweeps
performance_limit = 100  # If calculate_all_performance is False, limit to this many

# Enhanced analysis parameters
execution_cost_bps = 1.0  # Round-trip execution cost in basis points (extract_trades divides by 10000)
analyze_stop_losses = True  # Whether to analyze stop loss impact (gates the stop loss analysis cell)
stop_loss_levels = [0.01, 0.02, 0.03, 0.04, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.75, 1.0]  # Stop loss percentages (percent units: the stop loss functions divide each level by 100)
verify_intraday = True  # Whether to verify intraday constraints
market_timezone = "America/New_York"  # Market timezone for constraint verification

In [2]:
# Parameters
# Run-specific values that reassign the defaults declared in the cell above
# (presumably written by papermill at execution time - confirm before editing by hand).
# Only the core settings are reassigned here; the "enhanced analysis" settings
# (execution_cost_bps, analyze_stop_losses, stop_loss_levels, verify_intraday,
# market_timezone) keep their default values.
# NOTE(review): run_dir is an absolute, machine-specific path, so this saved
# notebook will only re-run as-is on the machine that produced it.
run_dir = "/Users/daws/ADMF-PC/config/bollinger/results/20250627_184258"
config_name = "bollinger"
symbols = ["SPY"]
timeframe = "5m"
min_strategies_to_analyze = 20
sharpe_threshold = 1.0
correlation_threshold = 0.7
top_n_strategies = 10
ensemble_size = 5
calculate_all_performance = True
performance_limit = 100


## Setup

In [3]:
# Imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import duckdb
import json
from datetime import datetime
import warnings
# Suppress every warning category so library noise stays out of cell outputs
warnings.filterwarnings('ignore')

# Shared plot styling for all figures in the notebook
plt.style.use('seaborn-v0_8-darkgrid')
plt.rcParams.update({
    'figure.figsize': (12, 6),
    'font.size': 10,
})

# DuckDB connection opened without a database path
con = duckdb.connect()

# Normalise run_dir to an absolute Path, then echo the run configuration
run_dir = Path(run_dir).resolve()
print(
    f"Analyzing run: {run_dir.name}",
    f"Full path: {run_dir}",
    f"Config: {config_name}",
    f"Symbol(s): {symbols}",
    f"Timeframe: {timeframe}",
    sep="\n",
)

Analyzing run: 20250627_184258
Full path: /Users/daws/ADMF-PC/config/bollinger/results/20250627_184258
Config: bollinger
Symbol(s): ['SPY']
Timeframe: 5m


## Critical Functions - Stop Loss and Trade Extraction

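**IMPORTANT**: `calculate_stop_loss_impact` and `apply_stop_target` implement stop loss and profit target analysis CORRECTLY by checking the bar-by-bar high/low prices during each trade. They do NOT use the flawed retrospective capping approach; that approach is kept below as `calculate_stop_loss_impact_WRONG` solely for comparison and must not be used for real analysis.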

In [4]:
def extract_trades(strategy_hash, trace_path, market_data, execution_cost_bps=1.0):
    """
    Extract round-trip trades from a signal trace, net of execution costs.

    The sparse signal trace is left-joined onto the market bars, forward-filled
    into a per-bar position, and every change of position is turned into trade
    events: a change closes the open trade (if any) and, when the new position
    is non-zero, opens the next one on the same bar. A direct reversal
    (e.g. long -> short) therefore yields two trades, not one.

    Args:
        strategy_hash: Strategy identifier (copied onto every trade row)
        trace_path: Path to the parquet trace file, relative to the global ``run_dir``
        market_data: Market price data with a 'timestamp' column and a 'close'
            column (used as the fill price when the signal row has no 'px')
        execution_cost_bps: Round-trip execution cost in basis points (default 1bp)

    Returns:
        DataFrame with one row per closed trade and columns strategy_hash,
        entry_time, exit_time, entry_price, exit_price, direction, raw_return,
        execution_cost, net_return, duration_minutes, entry_idx, exit_idx.
        A position still open at the last bar is not reported. On any error
        the problem is printed and an empty DataFrame is returned.

    Notes:
        entry_idx / exit_idx are row labels of the merged frame. They equal
        positional indices into ``market_data`` only if the trace has at most
        one row per timestamp - NOTE(review): confirm for the trace format.
    """
    try:
        signals = pd.read_parquet(run_dir / trace_path)
        signals['ts'] = pd.to_datetime(signals['ts'])

        # Merge with market data (merge returns a new frame; market_data is untouched)
        df = market_data.merge(
            signals[['ts', 'val', 'px']],
            left_on='timestamp',
            right_on='ts',
            how='left'
        )

        # Forward fill signals so every bar carries the position in force
        df['signal'] = df['val'].ffill().fillna(0)
        df['position'] = df['signal']
        # diff() is NaN on the first bar; treat the state before the data as
        # flat so a position taken on the very first bar is still an entry.
        df['position_change'] = df['position'].diff().fillna(df['position'])

        cost_adjustment = execution_cost_bps / 10000  # Convert bps to decimal

        def _fill_price(row):
            # Prefer the price recorded with the signal, else the bar close
            return row['px'] if pd.notna(row['px']) else row['close']

        trades = []
        current_trade = None

        # Only bars where the position changes can open or close a trade
        for idx, row in df[df['position_change'] != 0].iterrows():
            if current_trade is not None:
                # Any change of position ends the open trade (exit or reversal)
                entry_price = current_trade['entry_price']

                # Avoid division by zero - check if entry price is valid
                if entry_price == 0 or pd.isna(entry_price):
                    print(f"Warning: Invalid entry price {entry_price} for trade at {current_trade['entry_time']}")
                else:
                    exit_price = _fill_price(row)
                    if current_trade['direction'] == 1:  # Long
                        raw_return = (exit_price - entry_price) / entry_price
                    else:  # Short
                        raw_return = (entry_price - exit_price) / entry_price

                    trades.append({
                        'strategy_hash': strategy_hash,
                        'entry_time': current_trade['entry_time'],
                        'exit_time': row['timestamp'],
                        'entry_price': entry_price,
                        'exit_price': exit_price,
                        'direction': current_trade['direction'],
                        'raw_return': raw_return,
                        'execution_cost': cost_adjustment,
                        'net_return': raw_return - cost_adjustment,
                        'duration_minutes': (row['timestamp'] - current_trade['entry_time']).total_seconds() / 60,
                        'entry_idx': current_trade['entry_idx'],
                        'exit_idx': idx
                    })
                current_trade = None

            if row['position'] != 0:
                # Fresh entry, or the opening leg of a reversal on the same bar
                current_trade = {
                    'entry_time': row['timestamp'],
                    'entry_price': _fill_price(row),
                    'direction': row['position'],
                    'entry_idx': idx
                }

        return pd.DataFrame(trades)
    except Exception as e:
        # Best-effort by design: one unreadable trace must not abort a sweep
        print(f"Error extracting trades for {str(strategy_hash)[:8]}: {e}")
        return pd.DataFrame()

In [5]:
def calculate_stop_loss_impact(trades_df, stop_loss_levels=None, market_data=None):
    """
    Re-run every trade under a range of stop loss levels using the bars it spanned.

    For each level, each trade's bars (entry_idx..exit_idx inclusive) are
    scanned in order: a long is stopped at the first bar whose 'low' reaches
    the stop price, a short at the first bar whose 'high' reaches it. A stopped
    trade exits at the stop price even if it originally finished as a winner;
    an untouched trade keeps its original exit price. Losses are never capped
    after the fact.

    Args:
        trades_df: DataFrame of trades (must include entry_idx and exit_idx,
            plus entry_price, exit_price, exit_time, direction, net_return
            and execution_cost)
        stop_loss_levels: List of stop loss percentages (default 0.05% to 1%)
        market_data: Bar data indexed positionally; needs 'low', 'high' and
            'timestamp' columns

    Returns:
        DataFrame with one summary row per stop loss level (levels for which
        no trade could be simulated are omitted)

    Raises:
        ValueError: if market_data is not supplied

    NOTE(review): the entry bar itself is part of the scan, so price action on
    that bar before the fill can trigger the stop - confirm this is intended.
    """
    if market_data is None:
        raise ValueError("Market data is required for proper stop loss analysis!")

    if stop_loss_levels is None:
        stop_loss_levels = [0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.75, 1.0]

    summary_rows = []

    for sl_pct in stop_loss_levels:
        stop_fraction = sl_pct / 100

        simulated = []
        n_stopped = 0
        n_stopped_winners = 0  # stopped trades whose original result was positive

        for _, trade in trades_df.iterrows():
            first_bar = int(trade['entry_idx'])
            last_bar = int(trade['exit_idx'])
            window = market_data.iloc[first_bar:last_bar + 1]

            # Trades without any bars cannot be simulated and are left out
            if not len(window):
                continue

            entry_price = trade['entry_price']
            original_return = trade['net_return']
            is_long = trade['direction'] == 1

            # Longs are stopped below the entry, shorts above it
            if is_long:
                stop_price = entry_price * (1 - stop_fraction)
                breached = (window['low'] <= stop_price).to_numpy()
            else:
                stop_price = entry_price * (1 + stop_fraction)
                breached = (window['high'] >= stop_price).to_numpy()

            was_stopped = bool(breached.any())
            if was_stopped:
                n_stopped += 1
                exit_price = stop_price
                # argmax on a boolean array gives the first True, i.e. the first breaching bar
                exit_time = window['timestamp'].iloc[int(breached.argmax())]
                if original_return > 0:
                    n_stopped_winners += 1
            else:
                exit_price = trade['exit_price']
                exit_time = trade['exit_time']

            price_move = (exit_price - entry_price) if is_long else (entry_price - exit_price)
            raw_return = price_move / entry_price

            outcome = trade.copy()
            outcome['raw_return'] = raw_return
            outcome['net_return'] = raw_return - trade['execution_cost']
            outcome['stopped_out'] = was_stopped
            if was_stopped:
                outcome['exit_price'] = exit_price
                outcome['exit_time'] = exit_time

            simulated.append(outcome)

        simulated_df = pd.DataFrame(simulated)
        n_trades = len(simulated_df)

        if n_trades > 0:
            net = simulated_df['net_return']
            winners = net[net > 0]
            losers = net[net <= 0]

            summary_rows.append({
                'stop_loss_pct': sl_pct,
                'total_return': net.sum(),
                'avg_return_per_trade': net.mean(),
                'win_rate': (net > 0).mean(),
                'stopped_out_count': n_stopped,
                'stopped_out_rate': n_stopped / n_trades,
                'stopped_winners': n_stopped_winners,
                'stopped_winners_pct': n_stopped_winners / n_trades * 100,
                'num_trades': n_trades,
                'avg_winner': winners.mean() if len(winners) > 0 else 0,
                'avg_loser': losers.mean() if len(losers) > 0 else 0
            })

    return pd.DataFrame(summary_rows)

# WARNING: This is the FLAWED implementation - DO NOT USE!
# Kept here only for educational purposes to show what NOT to do
def calculate_stop_loss_impact_WRONG(trades_df, stop_loss_levels):
    """
    Deliberately flawed stop loss model, retained only as a counter-example.

    Why it is wrong:
    - It looks only at each trade's final raw return
    - Any loss beyond the stop level is rewritten to exactly the stop level
    - Prices between entry and exit are never inspected
    - Winners that dipped through the stop on the way are left untouched
    - The resulting statistics are therefore better than anything achievable

    DO NOT USE THIS METHOD for real analysis.

    Args:
        trades_df: DataFrame with raw_return, net_return and execution_cost columns
        stop_loss_levels: Iterable of stop loss percentages

    Returns:
        DataFrame with one summary row per level, each flagged via a 'WARNING' column
    """
    summary_rows = []

    for sl_pct in stop_loss_levels:
        loss_floor = -(sl_pct / 100)

        capped = trades_df.copy()

        # WRONG on purpose: losses are clipped after the fact instead of simulated
        too_deep = capped['raw_return'] < loss_floor
        if too_deep.any():
            capped.loc[too_deep, 'net_return'] = loss_floor - capped.loc[too_deep, 'execution_cost']
            capped.loc[too_deep, 'raw_return'] = loss_floor
        n_capped = int(too_deep.sum())

        # These figures are artificially better than reality
        net = capped['net_return']
        n_trades = len(capped)

        summary_rows.append({
            'stop_loss_pct': sl_pct,
            'total_return': net.sum(),
            'avg_return_per_trade': net.mean(),
            'win_rate': (net > 0).mean(),
            'stopped_out_count': n_capped,
            'stopped_out_rate': n_capped / n_trades if n_trades > 0 else 0,
            'num_trades': n_trades,
            'WARNING': 'FLAWED METHOD - DO NOT USE!'
        })

    return pd.DataFrame(summary_rows)

In [6]:
def apply_stop_target(trades_df, stop_pct, target_pct, market_data):
    """
    Apply both stop loss and profit target to trades.

    Each trade's bars (entry_idx..exit_idx inclusive) are scanned in order and
    the trade exits at the first bar that touches the stop or the target. If
    the same bar touches both, the stop wins (the conservative assumption,
    since the order of events inside a bar is unknown). A trade that touches
    neither keeps its original signal exit.

    Args:
        trades_df: DataFrame of trades with entry_idx, exit_idx, entry_price,
            exit_price, direction, net_return and execution_cost columns
        stop_pct: Stop loss percentage (e.g., 0.1 for 0.1%); 0 disables the stop
        target_pct: Profit target percentage (e.g., 0.2 for 0.2%); 0 disables the target
        market_data: Market data with 'high' and 'low' columns, indexed positionally

    Returns:
        Tuple of (modified_returns_array, exit_types_dict). exit_types_dict
        always has the keys 'stop', 'target', 'signal' and 'stopped_winners'
        (stopped trades whose original net return was positive).
    """
    if stop_pct == 0 and target_pct == 0:
        # No modification: every trade keeps its signal exit. Return a copy so
        # callers cannot mutate trades_df through the array, and include
        # 'stopped_winners' so the dict has the same keys as every other path.
        unchanged = {'stop': 0, 'target': 0, 'signal': len(trades_df), 'stopped_winners': 0}
        return trades_df['net_return'].to_numpy(copy=True), unchanged

    def _first_hit(mask):
        # Position of the first True in a boolean array, or None if there is none
        hits = np.flatnonzero(mask)
        return int(hits[0]) if hits.size else None

    use_stop = stop_pct > 0
    use_target = target_pct > 0

    modified_returns = []
    exit_types = {'stop': 0, 'target': 0, 'signal': 0}
    stopped_winners = 0

    for _, trade in trades_df.iterrows():
        trade_prices = market_data.iloc[int(trade['entry_idx']):int(trade['exit_idx']) + 1]

        if len(trade_prices) == 0:
            # No bars to replay: keep the trade as it was
            modified_returns.append(trade['net_return'])
            exit_types['signal'] += 1
            continue

        entry_price = trade['entry_price']
        is_long = trade['direction'] == 1

        # Adverse moves show up in the low for longs and in the high for shorts;
        # favourable moves are the mirror image.
        stop_price = target_price = None
        stop_bar = target_bar = None
        if use_stop:
            if is_long:
                stop_price = entry_price * (1 - stop_pct / 100)
                stop_bar = _first_hit(trade_prices['low'].to_numpy() <= stop_price)
            else:
                stop_price = entry_price * (1 + stop_pct / 100)
                stop_bar = _first_hit(trade_prices['high'].to_numpy() >= stop_price)
        if use_target:
            if is_long:
                target_price = entry_price * (1 + target_pct / 100)
                target_bar = _first_hit(trade_prices['high'].to_numpy() >= target_price)
            else:
                target_price = entry_price * (1 - target_pct / 100)
                target_bar = _first_hit(trade_prices['low'].to_numpy() <= target_price)

        # Earliest bar wins; '<=' gives the stop priority on a shared bar
        if stop_bar is not None and (target_bar is None or stop_bar <= target_bar):
            exit_price = stop_price
            exit_type = 'stop'
            if trade['net_return'] > 0:
                stopped_winners += 1
        elif target_bar is not None:
            exit_price = target_price
            exit_type = 'target'
        else:
            exit_price = trade['exit_price']
            exit_type = 'signal'

        exit_types[exit_type] += 1

        # Calculate return from the (possibly replaced) exit price
        if is_long:
            raw_return = (exit_price - entry_price) / entry_price
        else:
            raw_return = (entry_price - exit_price) / entry_price

        modified_returns.append(raw_return - trade['execution_cost'])

    # Add stopped winners info
    exit_types['stopped_winners'] = stopped_winners

    return np.array(modified_returns), exit_types

## Load Strategy Index and Market Data

In [7]:
# Load strategy index (None when the run directory has no index file)
strategy_index_path = run_dir / 'strategy_index.parquet'
strategy_index = pd.read_parquet(strategy_index_path) if strategy_index_path.exists() else None
if strategy_index is None:
    print("❌ No strategy_index.parquet found")
else:
    print(f"✅ Loaded {len(strategy_index)} strategies")

# Load market data
# Add market data loading logic here
market_data = None

✅ Loaded 205 strategies


## Calculate Performance Metrics

In [8]:
# Calculate performance for all strategies
# Add performance calculation logic here
# TODO: until this is filled in, performance_df is an empty frame, so the stop
# loss analysis cell (guarded by len(performance_df) > 0) only prints
# "No data for stop loss analysis". That cell reads 'strategy_hash' and
# 'trace_path' from the first row, so the real frame must provide both columns.
performance_df = pd.DataFrame()  # Placeholder

## Stop Loss Analysis

This section demonstrates the CORRECT way to analyze stop losses:
1. Extract trades with entry/exit indices
2. Use intraday price data to check if stops are hit
3. Track trades that would have been winners but got stopped out
4. Compare with the flawed method to show the difference

In [9]:
# Example stop loss analysis for top strategy
# The simulation replays market bars, so it needs performance results AND
# market data; without either there is nothing to analyze.
if len(performance_df) > 0 and analyze_stop_losses and market_data is not None:
    top_strategy = performance_df.iloc[0]  # assumes performance_df is sorted best-first - TODO confirm

    # Extract trades
    trades = extract_trades(
        top_strategy['strategy_hash'],
        top_strategy['trace_path'],
        market_data,
        execution_cost_bps
    )

    if len(trades) > 0:
        print(f"Analyzing {len(trades)} trades for stop loss impact...")

        # CORRECT method
        sl_impact_correct = calculate_stop_loss_impact(trades, stop_loss_levels, market_data)

        # WRONG method (for comparison)
        sl_impact_wrong = calculate_stop_loss_impact_WRONG(trades, stop_loss_levels)

        if sl_impact_correct.empty or sl_impact_wrong.empty:
            # The correct method omits levels for which no trade could be simulated
            print("No stop loss results were produced for the extracted trades")
        else:
            # Index total returns by level once instead of re-filtering per level
            correct_by_level = (
                sl_impact_correct.drop_duplicates('stop_loss_pct')
                .set_index('stop_loss_pct')['total_return']
            )
            wrong_by_level = (
                sl_impact_wrong.drop_duplicates('stop_loss_pct')
                .set_index('stop_loss_pct')['total_return']
            )

            # Show the difference
            print("\n🚨 Comparison of Methods:")
            print("Stop Loss | Correct Method | Wrong Method | Difference")
            print("-" * 60)

            for sl in stop_loss_levels[:5]:  # Show first 5
                if sl not in correct_by_level.index or sl not in wrong_by_level.index:
                    continue  # level produced no result in one of the methods
                correct_return = correct_by_level.loc[sl]
                wrong_return = wrong_by_level.loc[sl]
                diff = wrong_return - correct_return

                print(f"{sl:>8.2f}% | {correct_return:>13.2%} | {wrong_return:>11.2%} | {diff:>+10.2%}")

            # Show stopped winners
            print(f"\n📊 Trades stopped out that would have been winners:")
            # itertuples keeps the count columns as integers (iterrows would print "3.0")
            for row in sl_impact_correct.itertuples(index=False):
                if row.stopped_winners > 0:
                    print(f"  {row.stop_loss_pct:.2f}% stop: {int(row.stopped_winners)} winners stopped ({row.stopped_winners_pct:.1f}% of all trades)")
    else:
        print("No trades extracted for stop loss analysis")
else:
    print("No data for stop loss analysis")

No data for stop loss analysis
