In [0]:
import os
import time
from datetime import datetime, timedelta

import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import statsmodels.api as sm
from statsmodels.regression.rolling import RollingOLS
from statsmodels.tsa.stattools import coint, adfuller

## Fetch Prices

In [0]:
def fetch_crypto_prices(pairs, exchanges, start_date, end_date, api_key=None,
                        request_timeout=30, max_rate_limit_retries=5):
    """
    Download daily OHLCV records from the Amberdata spot API for every (pair, exchange) combination.

    The requested date range is split into windows of at most 730 days. Each window is paged
    by following the ``metadata.next`` URL returned by the API.

    Parameters
    ----------
    pairs : list of str
        Instrument identifiers as used in the API path, e.g. ``"btc_usdt"``.
    exchanges : list of str
        Exchange identifiers, e.g. ``"bitfinex"``.
    start_date, end_date : str
        Dates in ``DD-MM-YYYY`` format.
    api_key : str, optional
        Amberdata API key. When omitted, it is read from the ``AMBERDATA_API_KEY``
        environment variable (credentials must not be hard-coded in the notebook).
    request_timeout : float, default 30
        Seconds to wait for each HTTP response before giving up on it.
    max_rate_limit_retries : int, default 5
        How many times a page answered with HTTP 429 is retried (after a 60 s pause)
        before the current window is abandoned.

    Returns
    -------
    pd.DataFrame
        One row per record returned by the API. ``exchangeTimestamp`` is parsed to datetime,
        rows are sorted by it and de-duplicated on (instrument, exchange, exchangeTimestamp).
        An empty DataFrame is returned when nothing was downloaded.

    Raises
    ------
    ValueError
        If there is something to fetch but no API key is available.
    """
    start_dt = datetime.strptime(start_date, '%d-%m-%Y')
    end_dt = datetime.strptime(end_date, '%d-%m-%Y')

    if api_key is None:
        api_key = os.environ.get("AMBERDATA_API_KEY")
    if pairs and exchanges and not api_key:
        raise ValueError(
            "No Amberdata API key found: pass api_key=... or set the AMBERDATA_API_KEY environment variable"
        )

    headers = {
        "accept": "application/json",
        "Accept-Encoding": "gzip, deflate, br",
        "x-api-key": api_key,
    }

    all_data = []

    for pair in pairs:
        for exchange in exchanges:
            print(f"\nFetching {pair} on {exchange}")
            pair_records = 0
            current_start = start_dt

            while current_start < end_dt:
                chunk_end = min(current_start + timedelta(days=730), end_dt)
                print(f"Period: {current_start.date()} to {chunk_end.date()}")

                next_url = None
                page_count = 1
                rate_limit_retries = 0

                while True:
                    try:
                        if next_url:
                            print(f"Fetching page {page_count}...", end=" ")
                            response = requests.get(next_url, headers=headers, timeout=request_timeout)
                        else:
                            url = f"https://api.amberdata.com/markets/spot/ohlcv/{pair}"
                            params = {
                                'exchange': exchange,
                                'startDate': current_start.isoformat(),
                                'endDate': chunk_end.isoformat(),
                                'timeFormat': 'iso',
                                'timeInterval': 'days'
                            }
                            response = requests.get(url, headers=headers, params=params,
                                                    timeout=request_timeout)

                        if response.status_code == 429:
                            # Bounded retry: an unconditional `continue` could spin forever.
                            rate_limit_retries += 1
                            if rate_limit_retries > max_rate_limit_retries:
                                print(f"Failed! Status code: 429 (still rate limited after "
                                      f"{max_rate_limit_retries} retries - skipping period)")
                                break
                            print("Failed! Status code: 429 (Rate limit exceeded - waiting 60 seconds)")
                            time.sleep(60)
                            continue
                        if response.status_code != 200:
                            print(f"Failed! Status code: {response.status_code}")
                            break
                        rate_limit_retries = 0

                        result = response.json()
                        payload = result.get('payload') or {}
                        if 'data' not in payload:
                            print("No data available")
                            break

                        data = payload['data']
                        if not data:  # Empty data array
                            print("No records found")
                            break

                        records_in_page = len(data)
                        pair_records += records_in_page
                        all_data.extend(data)

                        # A response without metadata simply has no further page.
                        next_url = (payload.get('metadata') or {}).get('next')
                        if next_url:
                            print(f"Got {records_in_page} records", end=" ")
                            page_count += 1
                        else:
                            print(f"Got {records_in_page} records - Complete")
                            break

                    except Exception as e:
                        # Network / decoding problems abandon this window but keep the download going.
                        print(f"Error: {str(e)}")
                        break

                current_start = chunk_end

            print(f"Total records for {pair} on {exchange}: {pair_records}")

    print("\nProcessing downloaded data...")
    df = pd.DataFrame(all_data)

    if len(df) == 0:
        print("No data was downloaded!")
        return pd.DataFrame()

    df['exchangeTimestamp'] = pd.to_datetime(df['exchangeTimestamp'])
    # Adjacent windows share their boundary day, so duplicates are expected here.
    df = df.sort_values('exchangeTimestamp').drop_duplicates(
        subset=['instrument', 'exchange', 'exchangeTimestamp'],
        keep='first'
    )

    print(f"\nFinal dataset summary:")
    for instrument in df['instrument'].unique():
        instrument_data = df[df['instrument'] == instrument]
        date_range = instrument_data['exchangeTimestamp']
        print(f"{instrument:10} | {len(instrument_data):5} records | {date_range.min().date()} to {date_range.max().date()}")

    return df

## Calculate Returns

In [0]:
def calculate_returns(df):
    """Return the period-over-period percentage change of ``df['close']`` (first element is NaN)."""
    close_prices = df['close']
    return close_prices.pct_change()

## Test for Stationarity

In [0]:
def test_stationarity(series, threshold=0.05):
    """
    Test for stationarity using Augmented Dickey-Fuller test on the entire series.
    """
    if not isinstance(series, pd.Series):
        series = pd.Series(series)

    try:
        adf_test = adfuller(series.dropna(), regression='c', maxlag=5)
        return {
            'is_stationary': adf_test[1] < threshold,
            'adf_statistic': adf_test[0],
            'p_value': adf_test[1],
            'critical_values': adf_test[4],
            'confidence_level': 1 - threshold
        }
    except Exception as e:
        print(f"Error in stationarity test: {str(e)}")
        return {
            'is_stationary': False,
            'adf_statistic': None,
            'p_value': None,
            'critical_values': None,
            'confidence_level': None
        }


## Test for Cointegration

In [0]:
def test_cointegration(series1, series2, threshold=0.05, window=30):
    """
    Test for cointegration with a rolling regression.
    Uses log-transformed prices to stabilize regression.
    Returns a single float hedge_ratio derived from median rolling parameters.
    """
    # Align and drop NaN
    series1 = series1.dropna()
    series2 = series2.dropna()
    common_index = series1.index.intersection(series2.index)
    series1 = series1.loc[common_index]
    series2 = series2.loc[common_index]

    # Check for enough data
    if len(series1) < window or len(series2) < window:
        return None
    
    # Log transform prices
    y = np.log(series1)
    x = np.log(series2)
    df = pd.DataFrame({'y': y, 'x': x}).dropna()
    
    if len(df) < window:
        return None

    try:
        # Full sample cointegration test
        coint_t, p_value, critical_values = coint(df['y'], df['x'])

        # Add constant to x
        X = sm.add_constant(df['x'])

        if window:
            # Rolling regression using RollingOLS
            rols = RollingOLS(df['y'], X, window=window).fit()
            slope = rols.params['x']
            intercept = rols.params['const']
            
            # Calculate rolling R-squared
            y_pred = slope * df['x'] + intercept
            rolling_residuals = df['y'] - y_pred
            rolling_tss = ((df['y'] - df['y'].rolling(window=window).mean()) ** 2).rolling(window=window).sum()
            rolling_rss = (rolling_residuals ** 2).rolling(window=window).sum()
            r_squared = 1 - (rolling_rss / rolling_tss)
        else:
            model = sm.OLS(df['y'], X).fit()
            slope = pd.Series(model.params['x'], index=df.index)
            intercept = pd.Series(model.params['const'], index=df.index)
            r_squared = pd.Series(model.rsquared, index=df.index)

        # Derive a single hedge ratio from median parameters
        median_slope = slope.median()
        median_intercept = intercept.median()
        
        # Median price of second series
        median_p2 = np.median(series2)
        if median_p2 <= 0:
            # Avoid invalid median price
            return None

        median_ln_p2 = np.log(median_p2)

        # From log model: P1 ≈ exp(intercept + slope*ln(P2))
        # Hedge ratio ≈ (P1_median / median_p2)
        median_hedge_ratio = np.exp(median_intercept + median_slope * median_ln_p2) / median_p2

        # Test the spread based on this hedge ratio
        spread_test = series1 - (median_hedge_ratio * series2)
        spread_stationarity = test_stationarity(spread_test, threshold)

        return {
            'is_cointegrated': p_value < threshold and spread_stationarity['is_stationary'],
            'coint_t_stat': coint_t,
            'p_value': p_value,
            'critical_values': critical_values,
            'hedge_ratio': median_hedge_ratio,  # float hedge ratio
            'intercept': median_intercept,       # scalar intercept if needed
            'spread': spread_test,
            'spread_stationarity': spread_stationarity,
            'r_squared': r_squared
        }

    except Exception as e:
        print(f"Error in cointegration test: {str(e)}")
        return None

## Calculate Correlations

In [0]:
def calculate_correlations(returns1, returns2, short_window=30, long_window=180):
    """
    Rolling return correlations over a short and a long window.

    Both series are restricted to their shared index first. If fewer than ``long_window``
    shared points exist, every value in the result is None.

    Returns
    -------
    dict
        ``short_correlation`` and ``long_correlation`` (rolling Series that are NaN until the
        window is full) and ``correlation_divergence`` (long minus short).
    """
    shared = returns1.index.intersection(returns2.index)
    r1 = returns1[shared]
    r2 = returns2[shared]

    if len(r1) < long_window:
        return dict.fromkeys(('short_correlation', 'long_correlation', 'correlation_divergence'))

    def rolling_corr(win):
        # Require a full window so early values stay NaN rather than being noisy.
        return r1.rolling(window=win, min_periods=win).corr(r2)

    short_corr = rolling_corr(short_window)
    long_corr = rolling_corr(long_window)

    return {
        'short_correlation': short_corr,
        'long_correlation': long_corr,
        'correlation_divergence': long_corr - short_corr,
    }

## Mean Reversion

In [0]:
def test_mean_reversion(series):
    """Test for mean reversion characteristics"""
    if not isinstance(series, pd.Series):
        series = pd.Series(series)
    series = series.dropna()
    
    try:
        lags = range(2, min(20, len(series)//4))
        tau = []
        for lag in lags:
            lag_series = np.subtract(series[lag:].values, series[:-lag].values)
            if len(lag_series) > 0:
                lag_std = np.std(lag_series)
                if not np.isnan(lag_std) and lag_std != 0:
                    tau.append(lag_std)

        if len(tau) > 1 and all(t > 0 for t in tau):
            log_lags = np.log(list(lags)[:len(tau)])
            log_tau = np.log(tau)
            reg = np.polyfit(log_lags, log_tau, 1)
            hurst = reg[0] / 2.0
        else:
            hurst = 0.5

        df = pd.DataFrame({'y': series, 'y_lag': series.shift(1)}).dropna()
        X = sm.add_constant(df['y_lag'])
        model = sm.OLS(df['y'], X).fit()
        
        param = model.params['y_lag']
        if 0 < param < 1:
            half_life = -np.log(2) / np.log(param)
            mean_reversion_speed = 1 - param
        else:
            half_life = np.inf
            mean_reversion_speed = 0
        
        return {
            'is_mean_reverting': bool(hurst < 0.5),
            'hurst_exponent': float(hurst),
            'half_life': float(half_life),
            'mean_reversion_speed': float(mean_reversion_speed),
            'confidence': float(model.rsquared)
        }
    except Exception as e:
        print(f"Error in mean reversion test: {str(e)}")
        return {
            'is_mean_reverting': False,
            'hurst_exponent': 0.5,
            'half_life': np.inf,
            'mean_reversion_speed': 0,
            'confidence': 0
        }

## Calculate Z Score

In [0]:
def calculate_zscore(series, window=30):
    """
    Rolling z-score: (value - rolling mean) / rolling sample std over ``window`` points.

    Non-Series input is wrapped in a Series. Values are NaN until the window is full, and
    any ±inf from a zero standard deviation is replaced by NaN.
    """
    values = series if isinstance(series, pd.Series) else pd.Series(series)

    roller = values.rolling(window=window)
    z = (values - roller.mean()) / roller.std()
    return z.replace([np.inf, -np.inf], np.nan)

## Validate Pair Conditions

In [0]:
def _latest_metric_value(values):
    """Return the most recent value of a metric as a float, or None when it is missing or NaN."""
    if values is None:
        return None
    if isinstance(values, pd.Series):
        if values.empty:
            return None
        values = values.iloc[-1]
    elif np.ndim(values) > 0:
        flat = np.ravel(values)
        if flat.size == 0:
            return None
        values = flat[-1]
    if values is None or pd.isna(values):
        return None
    return float(values)


def _format_metric(value):
    """Format an optional float metric for the validation report."""
    return "n/a" if value is None else f"{value:.4f}"


def validate_pair_conditions(df1, metrics, min_correlation=0.4, min_cointegration_conf=0.8):
    """
    Decide whether a pair is tradable and print a short validation report.

    A pair passes when ``df1`` has at least 60 rows AND at least one of these holds:
      * Path 1: the latest long-window return correlation is >= ``min_correlation`` and the
        latest rolling R-squared of the log-price regression is >= 0.3.
      * Path 2: the cointegration p-value is < ``1 - min_cointegration_conf``, or the spread
        is stationary.

    A missing metric (None or NaN, e.g. when the series was shorter than the long correlation
    window) makes the corresponding check fail instead of raising.

    Parameters
    ----------
    df1 : sized
        Price data of the first leg; only its length is used.
    metrics : dict
        ``{'correlation': ..., 'cointegration': ...}`` as built by ``analyze_pair``.
    min_correlation : float, default 0.4
    min_cointegration_conf : float, default 0.8

    Returns
    -------
    (bool, dict)
        The overall verdict, and which conditions held.
    """
    print("\nValidation Analysis:")

    has_enough_data = len(df1) >= 60
    if not has_enough_data:
        print("❌ Failed: Insufficient data points")
        return False, {'enough_data': False}

    coint_metrics = metrics['cointegration']
    stationarity = coint_metrics.get('spread_stationarity') or {}

    correlation = _latest_metric_value(metrics['correlation'].get('long_correlation'))
    r_squared = _latest_metric_value(coint_metrics.get('r_squared'))
    r_squared_ok = r_squared is not None and r_squared >= 0.3
    correlation_path_valid = (correlation is not None and correlation >= min_correlation
                              and r_squared_ok)

    print(f"Path 1 - Statistical Relationship:")
    print(f"Correlation: {_format_metric(correlation)} ({'✅' if correlation_path_valid else '❌'})")
    print(f"R-squared: {_format_metric(r_squared)} ({'✅' if r_squared_ok else '❌'})")
    print(f"Path 1 Valid: {'✅' if correlation_path_valid else '❌'}")

    coint_p = _latest_metric_value(coint_metrics.get('p_value'))
    stationarity_p = _latest_metric_value(stationarity.get('p_value'))
    is_cointegrated = coint_p is not None and coint_p < (1 - min_cointegration_conf)
    is_stationary = bool(stationarity.get('is_stationary', False))
    cointegration_path_valid = is_cointegrated or is_stationary

    print(f"\nPath 2 - Mean Reversion Properties:")
    print(f"Cointegrated: {'✅' if is_cointegrated else '❌'} (p={_format_metric(coint_p)})")
    print(f"Spread Stationary: {'✅' if is_stationary else '❌'} (p={_format_metric(stationarity_p)})")
    print(f"Path 2 Valid: {'✅' if cointegration_path_valid else '❌'}")

    is_valid = has_enough_data and (correlation_path_valid or cointegration_path_valid)

    print(f"\nFinal Result: {'✅ PASS' if is_valid else '❌ FAIL'}")
    if is_valid:
        print("Passed via: " +
              ("Correlation/R-squared" if correlation_path_valid else "") +
              (" and " if correlation_path_valid and cointegration_path_valid else "") +
              ("Cointegration/Stationarity" if cointegration_path_valid else ""))

    conditions = {
        'enough_data': has_enough_data,
        'correlation_path': correlation_path_valid,
        'cointegration_path': cointegration_path_valid
    }

    return is_valid, conditions

## Analyze Pair

In [0]:
def analyze_pair(df1, df2, config):
    """
    Run the statistical screen for one pair and build the per-date series used by the backtest.

    Parameters
    ----------
    df1, df2 : pd.DataFrame
        Price frames for the two legs with ``close``, ``instrument`` and ``exchange`` columns,
        aligned on the same index by the caller.
    config : dict or None
        Optional keys read here:
        - ``min_data_points`` (default 60)
        - ``min_correlation`` (default 0.4) and ``min_cointegration_conf`` (default 0.8),
          both forwarded to ``validate_pair_conditions``
        - ``min_hedge_ratio`` (default 0.1) and ``max_hedge_ratio`` (default 10)

    Returns
    -------
    (pd.DataFrame, dict) or (None, None)
        Per-date prices, returns, spread, spread z-score and correlation, plus the metrics
        dict; ``(None, None)`` when the pair is rejected at any stage.
    """
    config = config or {}

    min_points = config.get('min_data_points', 60)
    if len(df1) < min_points or len(df2) < min_points:
        return None, None

    returns1 = calculate_returns(df1)
    returns2 = calculate_returns(df2)

    correlation_metrics = calculate_correlations(returns1, returns2)
    cointegration_metrics = test_cointegration(df1['close'], df2['close'])

    if cointegration_metrics is None or not cointegration_metrics['is_cointegrated']:
        return None, None

    # A NaN hedge ratio would pass plain comparisons, so require a finite positive number.
    hedge_ratio = cointegration_metrics['hedge_ratio']
    if hedge_ratio is None or not np.isfinite(hedge_ratio) or hedge_ratio <= 0:
        return None, None

    # If hedge ratio is extremely large or small, skip
    min_hedge_ratio = config.get('min_hedge_ratio', 0.1)
    max_hedge_ratio = config.get('max_hedge_ratio', 10)
    if hedge_ratio > max_hedge_ratio or hedge_ratio < min_hedge_ratio:
        print(f"Hedge ratio out of reasonable bounds: {hedge_ratio:.4f}, skipping this pair.")
        return None, None

    spread = cointegration_metrics['spread']
    mr_metrics = test_mean_reversion(spread)

    metrics = {
        'correlation': correlation_metrics,
        'cointegration': cointegration_metrics,
        'mean_reversion': mr_metrics
    }

    # Forward the configured thresholds; previously the function defaults were always used.
    is_valid, conditions = validate_pair_conditions(
        df1,
        metrics,
        min_correlation=config.get('min_correlation', 0.4),
        min_cointegration_conf=config.get('min_cointegration_conf', 0.8),
    )
    if not is_valid:
        return None, None

    series_data = pd.DataFrame({
        'pair': f"{df1['instrument'].iloc[0]}/{df2['instrument'].iloc[0]}",
        'exchange': df1['exchange'].iloc[0],
        'price1': df1['close'],
        'price2': df2['close'],
        'returns1': returns1,
        'returns2': returns2,
        'hedge_ratio': hedge_ratio,
        'spread': spread,
        'spread_zscore': calculate_zscore(spread),
        'correlation': correlation_metrics['long_correlation']
    })

    return series_data, metrics

## Generate Signals

In [0]:
def generate_signals(series_data, metrics, config):
    """
    Turn the spread z-score into entry/exit codes, tracking the position the signals imply.

    Entries (only while flat) fire when the z-score crosses the threshold from inside:
      -1  short the spread: z moves above +zscore_threshold
       1  long the spread:  z moves below -zscore_threshold
    Exits (only while in a position) fire on a strict crossing of zero in either direction:
      -2  close a long spread
       2  close a short spread
    Everything else is 0. Bars where the current or previous z-score is NaN are skipped.

    Parameters
    ----------
    series_data : pd.DataFrame
        Must contain ``spread_zscore``.
    metrics : object
        Unused; kept for interface compatibility.
    config : dict
        Must contain ``zscore_threshold``.

    Returns
    -------
    pd.DataFrame
        Indexed like ``series_data`` with columns ``signal`` (int64) and ``zscore``.
    """
    z = series_data['spread_zscore']
    threshold = config['zscore_threshold']
    codes = [0] * len(series_data.index)
    held = 0  # position implied by the signals so far: 1 long, -1 short, 0 flat

    for idx in range(1, len(codes)):
        now, before = z.iloc[idx], z.iloc[idx - 1]
        if pd.isna(now) or pd.isna(before):
            continue

        if held == 0:
            if now > threshold and before <= threshold:
                codes[idx], held = -1, -1
            elif now < -threshold and before >= -threshold:
                codes[idx], held = 1, 1
            continue

        crossed_zero = (now < 0 and before >= 0) or (now > 0 and before <= 0)
        if crossed_zero:
            codes[idx] = -2 if held == 1 else 2
            held = 0

    signals = pd.DataFrame(index=series_data.index)
    signals['signal'] = pd.Series(codes, index=series_data.index, dtype='int64')
    signals['zscore'] = z
    return signals

## Calculate Performance

In [0]:
def _trade_statistics(trades_df):
    """Win rate, average win/loss, profit factor and mean PnL taken from a trade log's ``return`` column."""
    if trades_df is None or len(trades_df) == 0:
        return {'win_rate': 0, 'avg_win': 0, 'avg_loss': 0,
                'profit_factor': 0, 'avg_return_per_trade': 0}

    pnl = trades_df['return']
    wins = pnl[pnl > 0]
    losses = pnl[pnl <= 0]

    gross_loss = losses.sum()
    if gross_loss != 0:
        profit_factor = abs(wins.sum() / gross_loss)
    elif len(wins) > 0:
        # Gross profit over zero gross loss: conventionally infinite, not 0.
        profit_factor = np.inf
    else:
        profit_factor = 0

    return {
        'win_rate': len(wins) / len(pnl),
        'avg_win': wins.mean() if len(wins) > 0 else 0,
        'avg_loss': losses.mean() if len(losses) > 0 else 0,
        'profit_factor': profit_factor,
        'avg_return_per_trade': pnl.mean(),
    }


def calculate_performance_metrics(returns, trades_df, cumulative_returns):
    """
    Summarise a backtest.

    Parameters
    ----------
    returns : pd.Series
        Per-bar returns of the equity curve (one bar per day; 365 bars per year).
    trades_df : pd.DataFrame
        Closed trades with a ``return`` column holding each trade's PnL.
    cumulative_returns : pd.Series
        Equity divided by initial capital (1.0 = break-even).

    Returns
    -------
    dict
        Equity-curve statistics:
        - ``total_return``, ``annual_return`` (CAGR)
        - ``annual_volatility`` (std * sqrt(365))
        - ``sharpe_ratio`` (annual_return / annual_volatility, no risk-free rate; 0 when
          volatility is zero or undefined)
        - ``max_drawdown`` (most negative relative drop from the running peak)
        Trade statistics: ``num_trades``, ``win_rate``, ``avg_win``, ``avg_loss``,
        ``profit_factor`` (inf when there are winners and no losses) and
        ``avg_return_per_trade``.
        With empty ``returns`` the equity-curve statistics are 0.0 instead of raising.
    """
    num_trades = 0 if trades_df is None else len(trades_df)
    trade_stats = _trade_statistics(trades_df)

    if len(returns) == 0 or len(cumulative_returns) == 0:
        total_return = annual_return = annual_volatility = sharpe_ratio = max_drawdown = 0.0
    else:
        total_return = cumulative_returns.iloc[-1] - 1
        n_years = len(returns) / 365
        if total_return > -1:
            annual_return = (1 + total_return) ** (1 / n_years) - 1
        else:
            # Equity wiped out: a geometric rate is undefined, fall back to a linear one.
            annual_return = total_return / n_years

        annual_volatility = returns.std() * np.sqrt(365)
        if annual_volatility and np.isfinite(annual_volatility):
            sharpe_ratio = annual_return / annual_volatility
        else:
            sharpe_ratio = 0

        rolling_max = cumulative_returns.expanding().max()
        drawdowns = (cumulative_returns - rolling_max) / rolling_max
        max_drawdown = drawdowns.min()

    return {
        'total_return': total_return,
        'annual_return': annual_return,
        'annual_volatility': annual_volatility,
        'sharpe_ratio': sharpe_ratio,
        'max_drawdown': max_drawdown,
        'num_trades': num_trades,
        **trade_stats,
    }

## Backtest Pair Strategy

In [0]:
def _spread_position_pnl(position, qty1, qty2, price1, price2, entry_price1, entry_price2):
    """Mark-to-market PnL of an open spread position; 0.0 when flat.

    A long spread (position=1) is long ``qty1`` of asset 1 and short ``qty2`` of asset 2;
    a short spread (position=-1) is the mirror image.
    """
    long_spread_pnl = qty1 * (price1 - entry_price1) - qty2 * (price2 - entry_price2)
    if position == 1:
        return long_spread_pnl
    if position == -1:
        return -long_spread_pnl
    return 0.0


def backtest_pair_strategy(series_data, signals, config):
    """
    Simulate the pair strategy bar by bar and return its equity curve and trade log.

    Each bar, in this order:
      1. Mark the open position to market.
      2. If a position is open, close it when any of these holds:
         (a) unrealized PnL / realized equity <= -stop_loss_pct   -> 'stop_loss'
         (b) unrealized PnL / realized equity >= take_profit_pct  -> 'take_profit'
         (c) the signal is the matching exit code (-2 closes a long, 2 closes a short).
      3. Otherwise, if flat and the signal is 1 (long spread) or -1 (short spread), open a position.
    At most one action happens per bar. A position still open on the last bar stays open; it
    appears only in the equity curve, not in the trade log.

    Sizing: leg 1 gets ``position_size_pct`` of realized equity. Leg 2 holds ``hedge_ratio``
    units per unit of leg 1, matching the spread definition ``price1 - hedge_ratio * price2``.
    Costs: ``transaction_cost`` times the notional of both legs, charged at entry (taken from
    equity immediately) and at exit (taken from the trade's PnL).

    Parameters
    ----------
    series_data : pd.DataFrame
        Columns ``price1``, ``price2`` and ``hedge_ratio``; rows positionally aligned with ``signals``.
    signals : pd.DataFrame
        Column ``signal`` holding codes as produced by ``generate_signals``.
    config : dict
        Keys ``initial_capital``, ``position_size_pct``, ``transaction_cost``,
        ``stop_loss_pct`` and ``take_profit_pct``.

    Returns
    -------
    result_df : pd.DataFrame
        Per-bar position, mark-to-market equity, return, unrealized PnL and leg quantities.
    returns : pd.Series
        Per-bar return of mark-to-market equity.
    performance : dict
        Output of ``calculate_performance_metrics``.
    trades_df : pd.DataFrame
        One row per closed trade. ``return`` is the trade's PnL in currency, net of the exit
        cost (the entry cost was already taken from equity). ``pnl_pct`` is the gross PnL as a
        percentage of realized equity just before the exit.
    cumulative_returns : pd.Series
        Mark-to-market equity divided by ``initial_capital``.
    """
    dtypes = {
        'position': 'int32',
        'equity': 'float64',
        'returns': 'float64',
        'unrealized_pnl': 'float64',
        'active_qty1': 'float64',
        'active_qty2': 'float64'
    }

    initial_capital = config['initial_capital']
    cost_rate = config['transaction_cost']
    stop_loss_pct = config['stop_loss_pct']
    take_profit_pct = config['take_profit_pct']
    position_size_pct = config['position_size_pct']

    position = 0
    running_equity = initial_capital  # realized equity
    qty1 = qty2 = 0.0
    entry_price1 = entry_price2 = 0.0
    entry_date = None
    prev_daily_equity = running_equity  # used to calculate daily returns

    rows, returns, equity_curve, trades = [], [], [], []

    for i, current_date in enumerate(signals.index):
        current_signal = signals['signal'].iloc[i]
        price1 = series_data['price1'].iloc[i]
        price2 = series_data['price2'].iloc[i]
        hedge_ratio = series_data['hedge_ratio'].iloc[i]

        current_pnl = _spread_position_pnl(position, qty1, qty2, price1, price2,
                                           entry_price1, entry_price2)

        if position != 0:
            pnl_pct = current_pnl / running_equity if running_equity != 0 else 0
            if pnl_pct <= -stop_loss_pct:
                exit_type = 'stop_loss'
            elif pnl_pct >= take_profit_pct:
                exit_type = 'take_profit'
            elif (current_signal == -2 and position == 1) or (current_signal == 2 and position == -1):
                # Previously unreachable: the stop/take-profit branch swallowed every open position.
                exit_type = current_signal
            else:
                exit_type = None

            if exit_type is not None:
                exit_cost = (abs(qty1 * price1) + abs(qty2 * price2)) * cost_rate
                trade_return = current_pnl - exit_cost
                trades.append({
                    'date': current_date,
                    'entry_date': entry_date,
                    'signal': exit_type,
                    'entry_price1': entry_price1,
                    'entry_price2': entry_price2,
                    'exit_price1': price1,
                    'exit_price2': price2,
                    'qty1': qty1,
                    'qty2': qty2,
                    'return': trade_return,
                    'costs': exit_cost,
                    'position': position,
                    'pnl_pct': pnl_pct * 100
                })
                running_equity += trade_return  # realize the trade
                current_pnl = 0.0
                position = 0
                qty1 = qty2 = 0.0
                entry_date = None

        elif current_signal in (1, -1):
            hedge_ok = hedge_ratio is not None and not pd.isna(hedge_ratio) and hedge_ratio > 0
            if price1 > 0 and price2 > 0 and hedge_ok:
                position_size = running_equity * position_size_pct
                candidate_qty1 = position_size / price1
                # h units of asset 2 per unit of asset 1, matching spread = P1 - h * P2.
                candidate_qty2 = candidate_qty1 * hedge_ratio

                if candidate_qty2 > 1e6:
                    print("Attempted qty2 too large, skipping trade.")
                else:
                    qty1, qty2 = candidate_qty1, candidate_qty2
                    entry_price1, entry_price2 = price1, price2
                    entry_date = current_date
                    running_equity -= (abs(qty1 * price1) + abs(qty2 * price2)) * cost_rate
                    position = 1 if current_signal == 1 else -1

        # Daily mark-to-market equity and its return
        daily_equity = running_equity + current_pnl
        if prev_daily_equity != 0:
            period_return = (daily_equity / prev_daily_equity) - 1
        else:
            period_return = 0.0
        prev_daily_equity = daily_equity

        rows.append({
            'position': position,
            'equity': daily_equity,
            'returns': period_return,
            'unrealized_pnl': current_pnl,
            'active_qty1': qty1,
            'active_qty2': qty2
        })
        returns.append(period_return)
        equity_curve.append(daily_equity)

    # Build the per-bar frame once instead of assigning row by row.
    result_df = pd.DataFrame(rows, index=signals.index, columns=list(dtypes)).astype(dtypes)
    returns = pd.Series(returns, index=signals.index, dtype='float64')
    trades_df = pd.DataFrame(trades)
    cumulative_returns = pd.Series(equity_curve, index=signals.index, dtype='float64') / initial_capital

    performance = calculate_performance_metrics(returns, trades_df, cumulative_returns)

    return result_df, returns, performance, trades_df, cumulative_returns





## Run Pair Trading System

In [0]:
def _instrument_closes(prices_df, instrument, exchange):
    """Rows of one instrument on one exchange, with missing closes removed, sorted by index."""
    mask = (prices_df['instrument'] == instrument) & (prices_df['exchange'] == exchange)
    return prices_df[mask].dropna(subset=['close']).sort_index()


def run_pair_trading_system(prices_df, pairs, exchanges, config):
    """
    Screen, signal and backtest every unordered pair of instruments on each exchange.

    Parameters
    ----------
    prices_df : pd.DataFrame
        Long-format prices with ``instrument``, ``exchange`` and ``close`` columns. It is
        indexed by timestamp, or carries an ``exchangeTimestamp`` column that becomes the index.
    pairs : list of str
        Instruments to combine; each unordered combination is analysed once.
    exchanges : list of str
        Exchanges analysed independently.
    config : dict
        Strategy configuration. ``min_data_points`` is read here; the whole dict is passed on
        to ``analyze_pair``, ``generate_signals`` and ``backtest_pair_strategy``.

    Returns
    -------
    list of dict
        One entry per pair that passed validation and produced at least one trade.
        Empty when ``prices_df`` is empty or lacks the required columns.
    """
    import traceback

    results = []

    required_columns = {'instrument', 'exchange', 'close'}
    if prices_df is None or len(prices_df) == 0 or not required_columns.issubset(prices_df.columns):
        print("No usable price data (empty frame or missing instrument/exchange/close columns)")
        return results

    if 'exchangeTimestamp' in prices_df.columns and not isinstance(prices_df.index, pd.DatetimeIndex):
        prices_df = prices_df.set_index('exchangeTimestamp')

    min_points = config['min_data_points']

    for exchange in exchanges:
        # Filter each instrument once per exchange rather than once per pair combination.
        frames = {pair: _instrument_closes(prices_df, pair, exchange) for pair in pairs}

        for i, pair1 in enumerate(pairs):
            for pair2 in pairs[i+1:]:
                print(f"\nAnalyzing {pair1}/{pair2} on {exchange}")

                # Missing closes are dropped BEFORE aligning, so both legs share exactly the same dates.
                common_index = frames[pair1].index.intersection(frames[pair2].index)
                df1 = frames[pair1].loc[common_index].copy()
                df2 = frames[pair2].loc[common_index].copy()

                if len(df1) < min_points or len(df2) < min_points:
                    print(f"Insufficient data: {pair1}: {len(df1)} days, {pair2}: {len(df2)} days")
                    continue

                try:
                    print(f"Data ranges:")
                    print(f"{pair1}: {df1.index.min()} to {df1.index.max()}")
                    print(f"{pair2}: {df2.index.min()} to {df2.index.max()}")
                    print(f"Number of records: {pair1}: {len(df1)}, {pair2}: {len(df2)}")

                    series_data, metrics = analyze_pair(df1, df2, config)
                    if series_data is None or metrics is None:
                        print("Pair failed statistical validation")
                        continue

                    print(f"Cointegration p-value: {metrics['cointegration']['p_value']:.4f}")
                    print(f"Hedge ratio: {metrics['cointegration']['hedge_ratio']:.4f}")
                    print(f"Mean reversion - Hurst: {metrics['mean_reversion']['hurst_exponent']:.4f}")

                    signals = generate_signals(series_data, metrics, config)

                    result_df, ret_series, performance, trades, equity_curve = backtest_pair_strategy(
                        series_data, signals, config
                    )

                    if performance['num_trades'] > 0:
                        print(f"Found {performance['num_trades']} trades")
                        print(f"Total return: {performance['total_return']:.2%}")
                        print(f"Sharpe ratio: {performance['sharpe_ratio']:.2f}")
                        print(f"Win rate: {performance['win_rate']:.2%}")

                        results.append({
                            'pair': f"{pair1}/{pair2}",
                            'exchange': exchange,
                            'series_data': series_data,
                            'metrics': metrics,
                            'signals': signals,
                            'returns': ret_series,
                            'performance': performance,
                            'trades': trades,
                            'equity_curve': equity_curve,
                            'result_df': result_df
                        })
                    else:
                        print("No trades executed")

                except Exception as e:
                    # Keep scanning the remaining pairs, but show the full traceback for this one.
                    print(f"Error analyzing pair {pair1}/{pair2}: {str(e)}")
                    print(traceback.format_exc())
                    continue

    return results

## Run

In [0]:
# Universe: USDT-quoted spot pairs, every unordered combination of which is screened later.
pairs = [
    f"{base}_usdt"
    for base in (
        "btc", "eth", "sol", "xrp", "ada", "avax", "dot", "doge",
        "link", "matic", "shib", "ltc", "uni", "atom", "etc", "fil",
        "near", "algo", "vet", "apt", "sand", "mana", "gala", "aave",
    )
]

exchanges = ['bitfinex']

# History from 2020-01-01 up to today, in the DD-MM-YYYY format fetch_crypto_prices expects.
today = datetime.today()
start_date = '01-01-2020'
end_date = today.strftime('%d-%m-%Y')

print("Fetching price data...")
prices_df = fetch_crypto_prices(pairs, exchanges, start_date, end_date)

## Configuration and Execute Search

In [0]:
# Strategy / backtest parameters, read by the analysis and backtest functions above.
config = dict(
    initial_capital=1000000,      # starting equity of each pair's backtest
    position_size_pct=0.05,       # fraction of realized equity put into leg 1 on each entry
    transaction_cost=0.001,       # proportional cost on both legs' notional, at entry and at exit
    stop_loss_pct=0.05,           # close when unrealized PnL <= -5% of realized equity
    take_profit_pct=0.30,         # close when unrealized PnL >= 30% of realized equity
    min_correlation=0.3,
    min_cointegration_conf=0.90,
    zscore_threshold=3.0,         # |z-score| level whose crossing opens a position
    min_data_points=60,           # minimum number of overlapping closes required per pair
)

results = run_pair_trading_system(prices_df, pairs, exchanges, config)

## Print Results

In [0]:
def _print_result_details(result):
    """Print the statistical-properties and trading-performance section for one result.

    Args:
        result: a record produced by ``run_pair_trading_system`` containing the
            'pair', 'exchange', 'metrics' and 'performance' keys. Return,
            drawdown and win-rate fields are multiplied by 100 for display.
    """
    metrics = result['metrics']
    perf = result['performance']
    coint = metrics['cointegration']

    print(f"\nPair: {result['pair']} on {result['exchange']}")

    print("\nStatistical Properties:")
    print(f"Cointegration p-value: {coint['p_value']:.4f}")
    print(f"Hedge ratio: {coint['hedge_ratio']:.4f}")
    # Only the most recent value of the rolling long-term correlation is reported.
    print(f"Long-term correlation: {metrics['correlation']['long_correlation'].iloc[-1]:.4f}")
    print(f"Hurst exponent: {metrics['mean_reversion']['hurst_exponent']:.4f}")

    print("\nTrading Performance:")
    print(f"Total Return: {perf['total_return']*100:.2f}%")
    print(f"Annual Return: {perf['annual_return']*100:.2f}%")
    print(f"Sharpe Ratio: {perf['sharpe_ratio']:.2f}")
    print(f"Max Drawdown: {perf['max_drawdown']*100:.2f}%")
    print(f"Number of Trades: {perf['num_trades']}")
    print(f"Win Rate: {perf['win_rate']*100:.2f}%")
    print(f"Profit Factor: {perf['profit_factor']:.2f}")
    print("\n" + "="*50)


def _is_winning_result(perf, min_win_rate=0.2):
    """Return True when a performance dict has a positive total return, a
    positive Sharpe ratio and a win rate strictly above ``min_win_rate``."""
    return (perf['total_return'] > 0
            and perf['sharpe_ratio'] > 0
            and perf['win_rate'] > min_win_rate)


def print_trading_summary(results):
    """Print the full statistics/performance report for every result.

    Args:
        results: list of records returned by ``run_pair_trading_system``.
    """
    print("\n=== Pair Trading Analysis Summary ===")
    if not results:
        print("\nNo results to summarize.")
        return
    for result in results:
        _print_result_details(result)


def print_winning_trading_summary(results, min_win_rate=0.2):
    """Print the report only for results that meet the "winner" criteria.

    A result is a winner when its total return and Sharpe ratio are both
    positive and its win rate is strictly above ``min_win_rate``. When no
    result qualifies, the criteria are printed instead.

    Args:
        results: list of records returned by ``run_pair_trading_system``.
        min_win_rate: minimum win rate (as a fraction, 0.2 == 20%) a winner
            must exceed. Defaults to 0.2, the previously hard-coded value.
    """
    print("\n=== Winning Pairs Trading Analysis Summary ===")

    winners = [r for r in results if _is_winning_result(r['performance'], min_win_rate)]
    for result in winners:
        _print_result_details(result)

    if not winners:
        print("\nNo winning pairs found matching the criteria.")
        print("Criteria for winners:")
        print("- Positive total return")
        print("- Positive Sharpe ratio")
        # Derived from the parameter so the text cannot drift from the filter.
        print(f"- Win rate above {min_win_rate:.0%}")
        print("\n" + "="*50)

In [0]:
# Full per-pair report covering every analysed pair.
print_trading_summary(results=results)

In [0]:
# Report restricted to pairs with positive return and Sharpe and a win rate above 20%.
print_winning_trading_summary(results=results)

## Plot Results

In [0]:
def _values_at_labels(series, labels):
    """Return the entries of ``series`` whose index label appears in ``labels``.

    Unlike ``series.loc[labels]`` this never raises KeyError for labels that
    are missing from ``series`` (they are skipped), and the returned index and
    values always have matching lengths for scatter plotting.
    """
    if isinstance(series.index, pd.DatetimeIndex):
        # Align the label type with the index so e.g. string dates still match.
        labels = pd.to_datetime(labels, errors='coerce')
    return series[series.index.isin(labels)]


def _risk_exit_dates(trades, exit_kind):
    """Return the 'date' values of trades whose 'signal' equals ``exit_kind``.

    Returns an empty list when ``trades`` is missing, empty, or lacks the
    'signal'/'date' columns, so the caller can skip marking those exits.
    """
    if trades is None or len(trades) == 0:
        return []
    if not {'signal', 'date'}.issubset(trades.columns):
        return []
    return trades.loc[trades['signal'] == exit_kind, 'date']


def _draw_spread_panel(ax, spread, long_entries, short_entries, long_exits, short_exits, trades):
    """Plot the spread with entry/exit markers and stop-loss / take-profit exits."""
    ax.plot(spread.index, spread, label='Spread', color='blue')
    for idx, marker, color, name in (
        (long_entries, '^', 'green', 'Long Entry'),
        (short_entries, 'v', 'red', 'Short Entry'),
        (long_exits, 'x', 'green', 'Long Exit'),
        (short_exits, 'x', 'red', 'Short Exit'),
    ):
        pts = _values_at_labels(spread, idx)
        ax.scatter(pts.index, pts, marker=marker, color=color, s=100, label=name)

    # Risk-management exits are identified by a string value in trades['signal'];
    # one scatter call per kind keeps each legend label unique.
    for exit_kind, marker, name in (('stop_loss', 'x', 'Stop Loss Exit'),
                                    ('take_profit', '+', 'Take Profit Exit')):
        pts = _values_at_labels(spread, _risk_exit_dates(trades, exit_kind))
        if not pts.empty:
            ax.scatter(pts.index, pts, marker=marker, color='black', s=100, label=name)

    ax.set_ylabel('Spread')
    ax.set_title('Spread with Trade Signals', fontsize=12)
    ax.legend(fontsize=9, loc='best')
    ax.grid(True)


def _draw_zscore_panel(ax, zscore, threshold, long_entries, short_entries, long_exits, short_exits):
    """Plot the spread z-score with +/- threshold bands and entry/exit markers."""
    ax.plot(zscore.index, zscore, label='Z-score', color='purple')
    ax.axhline(y=threshold, color='red', linestyle='--', linewidth=1, label=f'+{threshold} Threshold')
    ax.axhline(y=-threshold, color='red', linestyle='--', linewidth=1, label=f'-{threshold} Threshold')
    ax.axhline(y=0, color='black', linestyle='-', linewidth=1, label='Zero Line')
    for idx, marker, color in (
        (long_entries, '^', 'green'),
        (short_entries, 'v', 'red'),
        (long_exits, 'x', 'green'),
        (short_exits, 'x', 'red'),
    ):
        pts = _values_at_labels(zscore, idx)
        ax.scatter(pts.index, pts, marker=marker, color=color, s=100)

    # Stop-loss / take-profit markers are only drawn on the spread panel.
    ax.set_ylabel('Z-score')
    ax.set_title('Z-score of Spread', fontsize=12)
    ax.legend(loc='best', fontsize=9)
    ax.grid(True)


def _draw_optional_panel(ax, series_specs, ylabel, title, empty_message):
    """Plot each ``(series, label, color)`` in ``series_specs`` on ``ax``.

    If any series is None, a centred ``empty_message`` placeholder is drawn instead.
    """
    if any(series is None for series, _, _ in series_specs):
        ax.text(0.5, 0.5, empty_message, ha='center', va='center',
                transform=ax.transAxes, fontsize=10)
        return
    for series, label, color in series_specs:
        ax.plot(series.index, series, label=label, color=color)
    ax.set_ylabel(ylabel)
    ax.set_title(title, fontsize=12)
    ax.legend(loc='best', fontsize=9)
    ax.grid(True)


def plot_enhanced_result(result, config):
    """
    Plot a five-panel diagnostic dashboard (shared x-axis) for a single result:

    1. Spread with entry/exit signals; stop-loss exits are marked with a black
       'x' and take-profit exits with a black '+'.
    2. Z-score of the spread with +/- ``config['zscore_threshold']`` bands.
    3. Equity curve, from ``result_df['equity']`` when that column exists.
    4. Short- and long-term rolling correlations, when both are available.
    5. Rolling R-squared of the cointegration regression, when available.

    Args:
        result: a record produced by ``run_pair_trading_system``; reads the
            'pair', 'exchange', 'series_data', 'signals', 'trades',
            'result_df' and 'metrics' keys.
        config: strategy configuration; only 'zscore_threshold' is used here.

    The figure is shown and then closed, so calling this in a loop does not
    accumulate open figures.
    """
    series_data = result['series_data']
    signals = result['signals']
    trades = result['trades']
    result_df = result['result_df']
    metrics = result['metrics']

    spread = series_data['spread']
    zscore = series_data['spread_zscore']
    equity = result_df['equity'] if 'equity' in result_df.columns else None

    # Signal codes: 1 = long entry, -1 = short entry, -2 = long exit, 2 = short exit.
    long_entries = signals[signals['signal'] == 1].index
    short_entries = signals[signals['signal'] == -1].index
    long_exits = signals[signals['signal'] == -2].index
    short_exits = signals[signals['signal'] == 2].index

    # Optional diagnostics; each panel falls back to a placeholder when missing.
    short_corr = metrics['correlation'].get('short_correlation')
    long_corr = metrics['correlation'].get('long_correlation')
    r_squared = metrics['cointegration'].get('r_squared')

    fig, axes = plt.subplots(5, 1, figsize=(14, 16), sharex=True)
    fig.suptitle(f"{result['pair']} on {result['exchange']} - Comprehensive Analysis",
                 fontsize=14, y=0.98)

    _draw_spread_panel(axes[0], spread, long_entries, short_entries,
                       long_exits, short_exits, trades)
    _draw_zscore_panel(axes[1], zscore, config['zscore_threshold'],
                       long_entries, short_entries, long_exits, short_exits)
    _draw_optional_panel(axes[2], [(equity, 'Equity', 'blue')],
                         'Equity', 'Equity Curve', 'No Equity Data Available')
    _draw_optional_panel(axes[3],
                         [(short_corr, 'Short-term Correlation', 'orange'),
                          (long_corr, 'Long-term Correlation', 'green')],
                         'Correlation', 'Rolling Correlations',
                         'No Rolling Correlation Data Available')
    _draw_optional_panel(axes[4], [(r_squared, 'Rolling R-squared', 'magenta')],
                         'R-squared', 'Rolling R-squared of Cointegration Regression',
                         'No Rolling R-squared Data Available')

    for ax in axes:
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
        ax.tick_params(axis='x', rotation=45, labelsize=9)

    # Keep the top strip free for the suptitle so it does not overlap panel 1.
    fig.tight_layout(rect=(0, 0, 1, 0.96))
    plt.show()
    plt.close(fig)

    
# Draw the diagnostic dashboard for the first three results, in the order
# returned by run_pair_trading_system (not ranked by performance).
for r in results[0:3]:
    plot_enhanced_result(r, config=config)