<a href="https://colab.research.google.com/github/baileysmoko/Fabric/blob/main/Mean_Reversion_Pairs_Trading.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Colab-only: mount the user's Google Drive at /content/drive so the dataset
# folder referenced in the next cell can be read from the notebook.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import pandas as pd
import os

folder = '/content/drive/MyDrive/top1000_tokens_20251008_154804'

# Load combined datasets: the four CSV exports are read from `folder` in the
# order listed and bound, in that same order, to the names on the left.
combined_prices, combined_caps, combined_volumes, tokens = (
    pd.read_csv(os.path.join(folder, csv_name))
    for csv_name in (
        'combined_prices_daily.csv',
        'combined_market_caps_daily.csv',
        'combined_total_volumes_daily.csv',
        'selected_tokens.csv',
    )
)


In [None]:
# Convert timestamp to datetime and set as index.
# The guard makes the cell safe to re-run: once 'timestamp' has become the
# index it is no longer a column, and an unguarded second run would raise
# KeyError: 'timestamp'.
for df in (combined_prices, combined_caps, combined_volumes):
    if 'timestamp' in df.columns:
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df.set_index('timestamp', inplace=True)

# Assume all token columns are identical across the three DataFrames
# (not verified here; a token missing from caps/volumes raises KeyError below).
# NOTE: this rebinds `tokens`, replacing the value read from selected_tokens.csv.
tokens = combined_prices.columns

# Build a wide DataFrame where each entry is a tuple (price, cap, volume).
# All columns are assembled in a dict and handed to the constructor in one go
# rather than inserted one at a time, which avoids fragmenting the frame.
# Rows are paired positionally by zip(), so the three frames are assumed to
# share the same row order.
tuple_df = pd.DataFrame(
    {
        t: list(zip(combined_prices[t], combined_caps[t], combined_volumes[t]))
        for t in tokens
    },
    index=combined_prices.index,
)

print(f"‚úÖ Tuple-wide dataset shape: {tuple_df.shape}")
print(tuple_df.head())

‚úÖ Tuple-wide dataset shape: (4547, 1000)
                                bitcoin         ethereum      binancecoin  \
timestamp                                                                   
2013-04-28   (135.3, 1500517590.0, 0.0)  (nan, nan, nan)  (nan, nan, nan)   
2013-04-29  (141.96, 1575032004.0, 0.0)  (nan, nan, nan)  (nan, nan, nan)   
2013-04-30   (135.3, 1501657493.0, 0.0)  (nan, nan, nan)  (nan, nan, nan)   
2013-05-01   (117.0, 1298951550.0, 0.0)  (nan, nan, nan)  (nan, nan, nan)   
2013-05-02  (103.43, 1148667722.0, 0.0)  (nan, nan, nan)  (nan, nan, nan)   

                     ripple           solana         dogecoin  \
timestamp                                                       
2013-04-28  (nan, nan, nan)  (nan, nan, nan)  (nan, nan, nan)   
2013-04-29  (nan, nan, nan)  (nan, nan, nan)  (nan, nan, nan)   
2013-04-30  (nan, nan, nan)  (nan, nan, nan)  (nan, nan, nan)   
2013-05-01  (nan, nan, nan)  (nan, nan, nan)  (nan, nan, nan)   
2013-05-02  (nan, nan, nan)

In [None]:
import pandas as pd
import numpy as np
import re

# Assume tuple_df is your DataFrame with tuples: (price, market_cap, volume)
# Example: tuple_df['bitcoin'][0] -> (135.3, 1500517590.0, 0.0)

# --- PARAMETERS ---
# Minimum price history, in years, a token needs to survive the filters below.
MIN_YEARS = 5
# Market-cap threshold in USD used by the market-cap filter below.
MIN_MARKET_CAP = 100_000_000  # Shorya suggested $100M

# --- 1. Remove staked/wrapped/variant tokens ---
def is_base_token(name):
    """Return True when `name` does not look like a derivative token id.

    A name is rejected when it contains one of the hyphen-prefixed markers
    ``-staked``, ``-wrapped``, ``-vault`` or ``-v<digits>`` anywhere, or when
    it starts with ``staked-``, ``wrapped-`` or ``vault-``. The leading forms
    were previously missed, so ids such as ``wrapped-bitcoin`` and
    ``staked-ether`` passed as base tokens.

    Args:
        name: token identifier string (a column name of the price table).

    Returns:
        bool: True for a base token, False for a staked/wrapped/vault/versioned one.
    """
    # First alternative: the original suffix/infix markers (unchanged).
    # Second alternative: the same words as a leading component; the trailing
    # hyphen is required so plain names that merely begin with e.g. "vault"
    # are not dropped.
    variant_pattern = r'(-staked|-wrapped|-vault|-v\d+)|^(staked|wrapped|vault)-'
    return re.search(variant_pattern, name) is None

# Keep only the columns whose name passes is_base_token().
base_tokens = list(filter(is_base_token, tuple_df.columns))
tuple_df = tuple_df.loc[:, base_tokens]
print(f"‚úÖ Tokens after removing staked/wrapped variants: {len(tuple_df.columns)}")

# --- 2. Remove tokens with less than MIN_YEARS of data ---
# Row threshold for the history filter: 365 rows per year (leap days ignored).
min_rows_required = 365 * MIN_YEARS
def has_enough_data(series):
    """Return True when `series` holds at least `min_rows_required` priced rows.

    `series` is an iterable of tuples; a row counts when its first element
    (the price) is not NaN/None.
    """
    priced_rows = sum(1 for entry in series if pd.notna(entry[0]))
    return priced_rows >= min_rows_required

# Walk the columns once and keep the names whose column passes has_enough_data().
tokens_with_enough_data = [
    name for name in tuple_df.columns if has_enough_data(tuple_df[name])
]
tuple_df = tuple_df.loc[:, tokens_with_enough_data]
print(f"‚úÖ Tokens after removing those with < {MIN_YEARS} years of data: {len(tuple_df.columns)}")

# --- 3. Remove tokens with low market cap ---
def meets_market_cap(series, min_cap=MIN_MARKET_CAP):
    """Return True if any row's market cap (tuple element 1) is >= `min_cap`.

    Rows whose market cap is NaN/None never count. The check is "at least
    once over the whole history", not "currently".
    """
    return any(pd.notna(entry[1]) and entry[1] >= min_cap for entry in series)

# Keep the columns that reached the market-cap threshold at least once.
tokens_high_cap = [
    name for name in tuple_df.columns if meets_market_cap(tuple_df[name])
]
tuple_df = tuple_df.loc[:, tokens_high_cap]
print(f"‚úÖ Tokens after removing low market cap: {len(tuple_df.columns)}")

# tuple_df is now restricted to the tokens that passed all three filters


‚úÖ Tokens after removing staked/wrapped variants: 985
‚úÖ Tokens after removing those with < 5 years of data: 226
‚úÖ Tokens after removing low market cap: 217


In [None]:
# Extract prices from tuple_df
# Each cell is a (price, market_cap, volume) tuple; keep element 0 and
# normalise missing prices to NaN. The element-wise step goes through
# Series.map per column instead of DataFrame.applymap, which is deprecated
# in current pandas releases.
prices_df = tuple_df.apply(
    lambda column: column.map(lambda cell: cell[0] if pd.notna(cell[0]) else np.nan)
)
print(f"‚úÖ Prices DataFrame shape: {prices_df.shape}")


‚úÖ Prices DataFrame shape: (4547, 217)


In [None]:
import itertools
import pandas as pd

def pairwise_corr(df):
    """Correlate every unordered pair of columns of `df` over their shared rows.

    For each pair, rows where either column is NaN are dropped; pairs left
    with fewer than 30 rows are skipped. The remaining pairs are returned as
    a DataFrame with columns 'Token A', 'Token B' and 'Correlation', in
    itertools.combinations order.
    """
    records = []
    for left, right in itertools.combinations(df.columns.tolist(), 2):
        shared = df[[left, right]].dropna()
        if len(shared) >= 30:  # need at least 30 overlapping days
            records.append((left, right, shared[left].corr(shared[right])))
    return pd.DataFrame(records, columns=['Token A', 'Token B', 'Correlation'])

# Correlate all pairs, discard undefined correlations, then order the rows by
# absolute correlation (strongest first) while keeping their original labels.
corr_pairs_df = pairwise_corr(prices_df).dropna()
corr_pairs_df = corr_pairs_df.reindex(
    corr_pairs_df['Correlation'].abs().sort_values(ascending=False).index
)

# Extract top N correlated pairs
TOP_N_PAIRS = 100
top_pairs_df = corr_pairs_df.head(TOP_N_PAIRS)
print(f"=== Top {TOP_N_PAIRS} Correlated Tokens ===")
print(top_pairs_df.head(20))


=== Top 100 Correlated Tokens ===
                   Token A          Token B  Correlation
393               ethereum             seth     0.997627
7862                  gala      the-sandbox     0.968389
3629              polkadot             icon     0.967378
8387           the-sandbox     decentraland     0.962386
8069                 tezos             celo     0.961867
21078                 orbs         metadium     0.960456
16379            holotoken         medibloc     0.960010
21344  mass-vehicle-ledger         metadium     0.959509
21037                 orbs           dkargo     0.959470
22105               dkargo    thunder-token     0.959362
13777                iotex        chromaway     0.959251
23253            moviebloc         metadium     0.959197
6536              fetch-ai   singularitynet     0.958558
7673                  iota           kusama     0.958419
22283      tokamak-network  origin-protocol     0.957627
18626        band-protocol         dia-data     0.9566

In [None]:
# Persist the selected pairs to the working directory (index labels omitted).
top_pairs_df.to_csv(path_or_buf="top_correlated_tokens_filtered.csv", index=False)
print("‚úÖ Top correlated pairs saved")


‚úÖ Top correlated pairs saved


In [None]:
import pandas as pd
import numpy as np
from statsmodels.tsa.stattools import adfuller, coint, kpss
from statsmodels.regression.linear_model import OLS
from statsmodels.tools.tools import add_constant
from scipy.stats import spearmanr
import warnings
warnings.filterwarnings('ignore')

class PairsTradingAnalyzer:
    """
    Comprehensive pairs trading analysis including cointegration,
    mean reversion, and spread stability tests.

    Every statistic is computed on price levels over the rows where both
    tokens of a pair have a price.
    """

    def __init__(self, prices_df, min_overlap=365):
        """
        Args:
            prices_df: DataFrame with dates as index and token prices as columns
            min_overlap: Minimum number of overlapping data points required
        """
        self.prices_df = prices_df
        self.min_overlap = min_overlap

    def calculate_hedge_ratio(self, y, x):
        """Calculate hedge ratio using OLS regression: y = alpha + beta*x

        Returns:
            (beta, fitted_model)
        """
        X = add_constant(x)
        model = OLS(y, X).fit()
        # With pandas inputs `params` is label-indexed, so `params[1]` would
        # lean on the deprecated positional fallback; take the slope by
        # position explicitly (works for ndarray params as well).
        beta = np.asarray(model.params)[1]
        return beta, model

    def calculate_spread(self, y, x, beta):
        """Calculate spread: y - beta*x"""
        return y - beta * x

    def test_cointegration(self, y, x):
        """
        Engle-Granger cointegration test
        Returns: (is_cointegrated, p_value, critical_value)
                 critical_value is element 1 of the values returned by coint()
                 (the 5% level); (False, nan, nan) if the test raises.
        """
        try:
            score, pvalue, crit_values = coint(y, x)
            is_cointegrated = pvalue < 0.05
            return is_cointegrated, pvalue, crit_values[1]  # 5% critical value
        except Exception:
            # Best effort: a failed test counts as "not cointegrated".
            # `Exception` (not a bare except) so Ctrl-C still interrupts.
            return False, np.nan, np.nan

    def test_stationarity_adf(self, series):
        """
        Augmented Dickey-Fuller test for stationarity
        Returns: (is_stationary, p_value, test_statistic)
                 (False, nan, nan) if the test raises.
        """
        try:
            result = adfuller(series.dropna(), autolag='AIC')
            is_stationary = result[1] < 0.05  # p-value < 0.05
            return is_stationary, result[1], result[0]
        except Exception:
            return False, np.nan, np.nan

    def test_stationarity_kpss(self, series):
        """
        KPSS test for stationarity (null = stationary)
        Returns: (is_stationary, p_value, test_statistic)
                 (False, nan, nan) if the test raises.
        """
        try:
            result = kpss(series.dropna(), regression='c', nlags='auto')
            is_stationary = result[1] > 0.05  # p-value > 0.05 means stationary
            return is_stationary, result[1], result[0]
        except Exception:
            return False, np.nan, np.nan

    def calculate_half_life(self, spread):
        """
        Calculate half-life of mean reversion using AR(1) model

        Regresses the one-step change of the spread on its lagged level and
        converts the slope (lambda) to a half-life of -ln(2)/lambda.

        Returns: half_life in days, or NaN when there are fewer than 30
                 usable rows, lambda is not below -0.0001, the half-life
                 falls outside (0, 1000], or the regression raises.
        """
        try:
            spread_clean = spread.dropna()
            if len(spread_clean) < 30:
                return np.nan

            # Lagged level and one-step change; the first row is NaN in both
            df_temp = pd.DataFrame({
                'lag': spread_clean.shift(1),
                'diff': spread_clean.diff(),
            }).dropna()

            if len(df_temp) < 30:
                return np.nan

            # Run regression: diff = const + lambda * lag
            X = add_constant(df_temp['lag'])
            model = OLS(df_temp['diff'], X).fit()
            lambda_param = np.asarray(model.params)[1]

            # Mean reversion needs a clearly negative lambda; values at or
            # above -0.0001 (including all non-negative ones) are rejected
            if lambda_param >= -0.0001:
                return np.nan

            half_life = -np.log(2) / lambda_param

            # Sanity check: half-life should be positive and reasonable
            if half_life <= 0 or half_life > 1000:
                return np.nan

            return half_life
        except Exception:
            return np.nan

    def calculate_hurst_exponent(self, series):
        """
        Calculate Hurst exponent
        H < 0.5: mean reverting
        H = 0.5: random walk
        H > 0.5: trending

        Estimated as the slope of log(std of lagged differences) against
        log(lag). Returns NaN for fewer than 100 points, fewer than 10 usable
        lags, an estimate outside [0, 1], or any exception.
        """
        try:
            ts = series.dropna().values
            if len(ts) < 100:
                return np.nan

            # Use first 500 points or all if less
            ts = ts[:min(500, len(ts))]

            lags = np.array(list(range(2, min(100, len(ts) // 2))))

            # Standard deviation of the lag-k differences, one value per lag
            tau = np.array([np.std(np.subtract(ts[lag:], ts[:-lag])) for lag in lags])

            # Remove any zeros or invalid values (log of them is undefined)
            valid = (tau > 0) & np.isfinite(tau)
            if np.sum(valid) < 10:
                return np.nan

            # Linear regression on log-log plot; the slope is the estimate
            poly = np.polyfit(np.log(lags[valid]), np.log(tau[valid]), 1)
            hurst = poly[0]

            # Sanity check
            if hurst < 0 or hurst > 1:
                return np.nan

            return hurst
        except Exception:
            return np.nan

    def rolling_correlation(self, s1, s2, window=90):
        """Calculate rolling correlation"""
        return s1.rolling(window).corr(s2)

    def calculate_spread_volatility(self, spread, window=90):
        """Calculate rolling volatility of spread"""
        return spread.rolling(window).std()

    def analyze_pair(self, token_a, token_b, rolling_window=90):
        """
        Comprehensive analysis of a token pair

        Args:
            token_a: column used as the dependent series (y)
            token_b: column used as the regressor (x)
            rolling_window: window for rolling correlation and spread volatility

        Returns dict with all metrics, or None when the two tokens share
        fewer than `min_overlap` rows.
        """
        # Get overlapping data
        pair_data = self.prices_df[[token_a, token_b]].dropna()

        if len(pair_data) < self.min_overlap:
            return None

        y = pair_data[token_a]
        x = pair_data[token_b]

        results = {
            'token_a': token_a,
            'token_b': token_b,
            'n_observations': len(pair_data),
            'date_range': f"{pair_data.index[0].date()} to {pair_data.index[-1].date()}"
        }

        # 1. Correlation metrics
        results['pearson_corr'] = y.corr(x)
        results['spearman_corr'], _ = spearmanr(y, x)

        # Calculate rolling correlations
        roll_corr = self.rolling_correlation(y, x, rolling_window)
        results['rolling_corr_mean'] = roll_corr.mean()
        results['rolling_corr_std'] = roll_corr.std()

        # 2. Cointegration test
        is_coint, coint_pvalue, coint_crit = self.test_cointegration(y, x)
        results['is_cointegrated'] = is_coint
        results['coint_pvalue'] = coint_pvalue
        results['coint_critical_5pct'] = coint_crit

        # 3. Hedge ratio and spread (beta is fit on the full overlapping sample)
        beta, ols_model = self.calculate_hedge_ratio(y, x)
        results['hedge_ratio'] = beta
        results['ols_rsquared'] = ols_model.rsquared

        spread = self.calculate_spread(y, x, beta)

        # 4. Spread stationarity tests (first tuple element is the boolean verdict)
        adf_is_stationary, adf_pval, _adf_stat = self.test_stationarity_adf(spread)
        results['spread_stationary_adf'] = adf_is_stationary
        results['adf_pvalue'] = adf_pval

        kpss_is_stationary, kpss_pval, _kpss_stat = self.test_stationarity_kpss(spread)
        results['spread_stationary_kpss'] = kpss_is_stationary
        results['kpss_pvalue'] = kpss_pval

        # 5. Mean reversion metrics
        results['half_life_days'] = self.calculate_half_life(spread)
        results['hurst_exponent'] = self.calculate_hurst_exponent(spread)

        # 6. Spread volatility
        spread_vol = self.calculate_spread_volatility(spread, rolling_window)
        vol_mean = spread_vol.mean()
        vol_std = spread_vol.std()
        results['spread_vol_mean'] = vol_mean
        results['spread_vol_std'] = vol_std
        results['spread_vol_cv'] = vol_std / vol_mean if vol_mean != 0 else np.nan

        # 7. Z-score metrics
        spread_zscore = (spread - spread.mean()) / spread.std()
        results['spread_zscore_mean'] = spread_zscore.mean()
        results['spread_zscore_std'] = spread_zscore.std()
        results['max_zscore'] = spread_zscore.abs().max()

        # 8. Trading opportunity metrics
        # Count how often spread crosses mean (more crossings = more opportunities)
        spread_centered = spread - spread.mean()
        sign_changes = np.diff(np.sign(spread_centered))
        results['mean_crossings'] = np.sum(sign_changes != 0)
        results['crossings_per_year'] = results['mean_crossings'] / (len(pair_data) / 365)

        return results

    def analyze_top_pairs(self, top_pairs_df, n_pairs=None):
        """
        Analyze all pairs in the top_pairs_df

        Args:
            top_pairs_df: DataFrame with columns ['Token A', 'Token B', 'Correlation']
            n_pairs: Number of pairs to analyze (None = all)

        Returns:
            DataFrame with one row per pair for which analyze_pair() returned
            a result.
        """
        pairs_to_analyze = top_pairs_df.head(n_pairs) if n_pairs else top_pairs_df

        results = []
        total = len(pairs_to_analyze)

        # Progress is reported by position in the frame. The frame's index
        # labels are whatever survived earlier sorting, so using them as the
        # counter printed lines such as "Analyzing pair 10441/100...".
        for position, (_, row) in enumerate(pairs_to_analyze.iterrows()):
            if position % 10 == 0:
                print(f"Analyzing pair {position + 1}/{total}...")

            result = self.analyze_pair(row['Token A'], row['Token B'])
            if result:
                results.append(result)

        return pd.DataFrame(results)

    def rank_pairs(self, analysis_df):
        """
        Rank pairs by trading suitability

        Scoring criteria:
        - Cointegrated (required)
        - Stationary spread (required)
        - Low half-life (faster mean reversion)
        - Hurst < 0.5 (mean reverting)
        - Stable hedge ratio (low rolling corr std)
        - Reasonable volatility (not too high or low)
        - Frequent mean crossings (trading opportunities)

        Returns:
            The qualifying rows of `analysis_df` with a 'composite_score'
            column and the six component score columns added, sorted by
            composite score (highest first). When no row qualifies, the empty
            filtered frame is returned without the score columns.
        """
        # Filter: must be cointegrated and stationary. `.copy()` after the
        # filter so the column assignments below act on an independent frame.
        qualifies = (
            (analysis_df['is_cointegrated'] == True) &
            (analysis_df['spread_stationary_adf'] == True)
        )
        scored_df = analysis_df[qualifies].copy()

        if len(scored_df) == 0:
            print("‚ö†Ô∏è No pairs meet basic requirements (cointegration + stationarity)")
            return scored_df

        print(f"‚úÖ {len(scored_df)} pairs are cointegrated and stationary")

        # Check how many have valid half-life and Hurst
        valid_hl = scored_df['half_life_days'].notna().sum()
        valid_hurst = scored_df['hurst_exponent'].notna().sum()
        print(f"   - {valid_hl} pairs have valid half-life")
        print(f"   - {valid_hurst} pairs have valid Hurst exponent")

        # Scoring components (normalize to 0-100)
        scores = pd.DataFrame(index=scored_df.index)

        # 1. Half-life score (prefer 5-50 days, penalize too fast or too slow)
        hl = scored_df['half_life_days'].fillna(100).clip(1, 100)
        scores['halflife_score'] = 100 - np.abs(hl - 20) * 2  # Optimal around 20 days
        scores['halflife_score'] = scores['halflife_score'].clip(0, 100)
        # Zero out score if half-life is NaN
        scores.loc[scored_df['half_life_days'].isna(), 'halflife_score'] = 0

        # 2. Hurst exponent score (prefer < 0.5)
        hurst = scored_df['hurst_exponent'].fillna(0.5)
        scores['hurst_score'] = (0.5 - hurst) * 200  # Max score at H=0
        scores['hurst_score'] = scores['hurst_score'].clip(0, 100)
        # Zero out score if Hurst is NaN
        scores.loc[scored_df['hurst_exponent'].isna(), 'hurst_score'] = 0

        # 3. Correlation stability score (low std is better)
        corr_std = scored_df['rolling_corr_std'].fillna(1)
        scores['corr_stability_score'] = 100 * (1 - corr_std.clip(0, 1))

        # 4. R-squared score (how well hedge ratio fits)
        scores['rsquared_score'] = scored_df['ols_rsquared'].fillna(0) * 100

        # 5. Trading opportunity score (crossings per year)
        crossings = scored_df['crossings_per_year'].fillna(0).clip(0, 50)
        scores['opportunity_score'] = (crossings / 50) * 100

        # 6. Cointegration strength (lower p-value is better)
        coint_p = scored_df['coint_pvalue'].fillna(1)
        scores['coint_score'] = (1 - coint_p) * 100

        # Calculate composite score with adjusted weights for missing data
        # If half-life/Hurst are missing, rely more on other metrics
        has_mean_reversion_metrics = (
            scored_df['half_life_days'].notna() &
            scored_df['hurst_exponent'].notna()
        )

        # Standard weights
        weights_with_mr = {
            'halflife_score': 0.25,
            'hurst_score': 0.20,
            'corr_stability_score': 0.15,
            'rsquared_score': 0.15,
            'opportunity_score': 0.15,
            'coint_score': 0.10
        }

        # Adjusted weights when MR metrics missing
        weights_without_mr = {
            'halflife_score': 0.0,
            'hurst_score': 0.0,
            'corr_stability_score': 0.25,
            'rsquared_score': 0.25,
            'opportunity_score': 0.30,
            'coint_score': 0.20
        }

        # Both weightings are computed column-wise and the applicable one is
        # picked per row. This yields a float column directly instead of
        # writing floats row by row into a column initialised with int 0.
        composite_with_mr = sum(
            scores[col] * weight for col, weight in weights_with_mr.items()
        )
        composite_without_mr = sum(
            scores[col] * weight for col, weight in weights_without_mr.items()
        )
        scored_df['composite_score'] = composite_with_mr.where(
            has_mean_reversion_metrics, composite_without_mr
        )

        # Add individual scores for transparency
        for col in scores.columns:
            scored_df[col] = scores[col]

        # Sort by composite score
        scored_df = scored_df.sort_values('composite_score', ascending=False)

        return scored_df


# ==== USAGE EXAMPLE ====

# Build the analyzer over the filtered price table
analyzer = PairsTradingAnalyzer(prices_df, min_overlap=365)

# Run the per-pair diagnostics on the correlated candidates
print("üîç Analyzing pairs for trading suitability...")
analysis_results = analyzer.analyze_top_pairs(top_pairs_df, n_pairs=100)

print(f"\n‚úÖ Analyzed {len(analysis_results)} pairs")

# Score and order the pairs that pass the cointegration/stationarity filter
print("\nüìä Ranking pairs by trading quality...")
ranked_pairs = analyzer.rank_pairs(analysis_results)

print(f"\nüéØ Found {len(ranked_pairs)} suitable pairs for trading")

if len(ranked_pairs) == 0:
    # Nothing qualified: print the hints as one block
    print("\n".join([
        "\n‚ö†Ô∏è No pairs meet the requirements for pairs trading.",
        "Consider:",
        "- Relaxing minimum overlap period",
        "- Looking at different token combinations",
        "- Checking data quality",
    ]))
else:
    # Columns shown in the on-screen table (the CSV below keeps every column)
    display_cols = [
        'token_a', 'token_b', 'composite_score',
        'pearson_corr', 'is_cointegrated', 'half_life_days',
        'hurst_exponent', 'spread_stationary_adf',
        'crossings_per_year', 'ols_rsquared'
    ]

    print("\n" + "="*80, "TOP 20 PAIRS FOR PAIRS TRADING", "="*80, sep="\n")
    print(ranked_pairs[display_cols].head(20).to_string())

    # Write the full ranking to the working directory
    ranked_pairs.to_csv('ranked_pairs_for_trading.csv', index=False)
    print("\n‚úÖ Full results saved to 'ranked_pairs_for_trading.csv'")

    # Column means over the ranked pairs
    print("\n" + "="*80, "SUMMARY STATISTICS", "="*80, sep="\n")
    print(
        f"Mean half-life: {ranked_pairs['half_life_days'].mean():.1f} days",
        f"Mean Hurst exponent: {ranked_pairs['hurst_exponent'].mean():.3f}",
        f"Mean correlation: {ranked_pairs['pearson_corr'].mean():.3f}",
        f"Mean crossings/year: {ranked_pairs['crossings_per_year'].mean():.1f}",
        sep="\n",
    )

üîç Analyzing pairs for trading suitability...
Analyzing pair 10441/100...
Analyzing pair 17091/100...
Analyzing pair 11581/100...
Analyzing pair 7851/100...
Analyzing pair 5901/100...
Analyzing pair 12581/100...
Analyzing pair 21071/100...
Analyzing pair 21981/100...
Analyzing pair 21011/100...
Analyzing pair 13081/100...
Analyzing pair 16811/100...
Analyzing pair 8471/100...
Analyzing pair 13131/100...

‚úÖ Analyzed 100 pairs

üìä Ranking pairs by trading quality...
‚úÖ 97 pairs are cointegrated and stationary
   - 97 pairs have valid half-life
   - 95 pairs have valid Hurst exponent

üéØ Found 97 suitable pairs for trading

TOP 20 PAIRS FOR PAIRS TRADING
            token_a            token_b  composite_score  pearson_corr  is_cointegrated  half_life_days  hurst_exponent  spread_stationary_adf  crossings_per_year  ols_rsquared
0          ethereum               seth        81.372616      0.997627             True        1.660258        0.059605                   True           29.

In [None]:
import pandas as pd
import numpy as np
from statsmodels.regression.linear_model import OLS
from statsmodels.tools.tools import add_constant
from itertools import product
import warnings
warnings.filterwarnings('ignore')

# ==============================================================================
# PAIRS TRADING STRATEGY CLASS
# ==============================================================================

class PairsTradingStrategy:
    """
    Comprehensive pairs trading strategy with multiple signal types

    Holds the price (and optional volume) tables for one token pair and
    builds the indicator frame used for backtesting.
    """

    def __init__(self, prices_df, token_a, token_b, volumes_df=None):
        """
        Args:
            prices_df: DataFrame of prices with one column per token
            token_a: column used as the dependent series (y)
            token_b: column used as the regressor (x)
            volumes_df: optional DataFrame of volumes with the same token columns
        """
        self.prices_df = prices_df
        self.token_a = token_a
        self.token_b = token_b
        self.volumes_df = volumes_df

    def prepare_data(self, lookback_period=60, recalc_hedge_ratio=False,
                     recalc_window=None):
        """
        Prepare spread and calculate indicators

        Args:
            lookback_period: Window for calculating rolling statistics
            recalc_hedge_ratio: If True, recalculate hedge ratio periodically
            recalc_window: How often to recalculate hedge ratio (in days)

        Returns:
            DataFrame of prices, hedge ratio, spread and indicators with all
            rows containing NaN dropped, or None when the pair has fewer than
            2 * lookback_period overlapping rows, or when a rolling hedge
            ratio is requested with a window that leaves no row to fit.
        """
        pair_data = self.prices_df[[self.token_a, self.token_b]].dropna()

        if len(pair_data) < lookback_period * 2:
            return None

        y = pair_data[self.token_a]
        x = pair_data[self.token_b]

        # Calculate hedge ratio
        if recalc_hedge_ratio and recalc_window:
            # With a window at least as long as the data there is nothing to
            # fit; treat it like insufficient data instead of failing on an
            # empty list below.
            if recalc_window >= len(pair_data):
                return None

            # Rolling hedge ratio: row i uses the `recalc_window` rows before it
            hedge_ratios = []
            for i in range(recalc_window, len(pair_data)):
                subset_y = y.iloc[i-recalc_window:i]
                subset_x = x.iloc[i-recalc_window:i]
                X_subset = add_constant(subset_x)
                model = OLS(subset_y, X_subset).fit()
                # Slope taken by position; `params` is label-indexed here
                hedge_ratios.append(np.asarray(model.params)[1])

            # Pad beginning with first calculated ratio
            hedge_ratios = [hedge_ratios[0]] * recalc_window + hedge_ratios
            beta_series = pd.Series(hedge_ratios, index=pair_data.index)
            spread = y - beta_series * x
        else:
            # Static hedge ratio using all data
            X = add_constant(x)
            model = OLS(y, X).fit()
            beta = np.asarray(model.params)[1]
            spread = y - beta * x
            beta_series = pd.Series(beta, index=pair_data.index)

        # Calculate indicators
        df = pd.DataFrame({
            'price_a': y,
            'price_b': x,
            'hedge_ratio': beta_series,
            'spread': spread
        })

        # Rolling statistics
        df['spread_mean'] = spread.rolling(lookback_period).mean()
        df['spread_std'] = spread.rolling(lookback_period).std()
        df['zscore'] = (spread - df['spread_mean']) / df['spread_std']

        # Bollinger Bands on spread
        df['bb_upper'] = df['spread_mean'] + 2 * df['spread_std']
        df['bb_lower'] = df['spread_mean'] - 2 * df['spread_std']

        # Moving average crossovers on spread
        df['spread_sma_fast'] = spread.rolling(10).mean()
        df['spread_sma_slow'] = spread.rolling(30).mean()

        # Spread momentum
        df['spread_momentum'] = spread.diff(5)

        # Percentile rank: position of the latest value within the window's
        # min-max range (0.5 for a flat window). raw=True passes each window
        # as an ndarray, so `window[-1]` is a plain positional lookup rather
        # than an integer key on a date-indexed Series.
        df['spread_percentile'] = spread.rolling(lookback_period).apply(
            lambda window: (window[-1] - window.min()) / (window.max() - window.min())
            if (window.max() - window.min()) > 0 else 0.5,
            raw=True
        )

        # Add volume data if available
        if self.volumes_df is not None:
            vol_data = self.volumes_df[[self.token_a, self.token_b]].reindex(df.index)
            df['volume_a'] = vol_data[self.token_a]
            df['volume_b'] = vol_data[self.token_b]
            # A zero volume_b gives inf here, which dropna() below does not remove
            df['volume_ratio'] = df['volume_a'] / df['volume_b']

        return df.dropna()


class StrategyBacktester:
    """
    Backtest different pairs trading strategies on a prepared indicator frame.

    Every public ``*_strategy`` method builds a set of entry / exit / stop-loss
    predicates and hands them to ``_execute_trades``, which walks the frame one
    row at a time and returns ``(trades_df, stats, equity_curve)``.

    The frame passed as ``strategy_data`` must contain a ``'spread'`` column and
    whichever indicator columns the chosen strategy reads (``'zscore'``,
    ``'bb_upper'``, ``'bb_lower'``, ``'spread_mean'``, ``'spread_std'``,
    ``'spread_percentile'``, ``'spread_sma_fast'``, ``'spread_sma_slow'``,
    ``'spread_momentum'``). Index values are subtracted from each other and
    ``.days`` is read from the result, so the index must be datetime-like.
    """

    # Per-trade returns are expressed relative to |entry spread|. A spread that
    # sits at (or next to) zero makes that ratio meaningless, so it is guarded
    # and bounded with the same limits WalkForwardPairsTrading uses.
    MIN_ENTRY_SPREAD = 1e-10
    MIN_PNL_PCT = -100.0
    MAX_PNL_PCT = 200.0

    def __init__(self, strategy_data, initial_capital=10000):
        """
        Args:
            strategy_data: Indicator DataFrame to trade on (see class docstring).
            initial_capital: Starting equity used for compounding.
        """
        self.data = strategy_data
        self.initial_capital = initial_capital

    def zscore_strategy(self, entry_z=2.0, exit_z=0.5, stop_loss_z=4.0):
        """
        Classic z-score mean reversion strategy.

        Goes long the spread when ``zscore < -entry_z`` and short when
        ``zscore > entry_z``; closes when the z-score comes back inside
        ``±exit_z`` or runs beyond ``±stop_loss_z``.
        """
        return self._execute_trades(
            entry_long=lambda df, i: df['zscore'].iloc[i] < -entry_z,
            entry_short=lambda df, i: df['zscore'].iloc[i] > entry_z,
            exit_long=lambda df, i: df['zscore'].iloc[i] > -exit_z,
            exit_short=lambda df, i: df['zscore'].iloc[i] < exit_z,
            stop_loss_long=lambda df, i: df['zscore'].iloc[i] < -stop_loss_z,
            stop_loss_short=lambda df, i: df['zscore'].iloc[i] > stop_loss_z,
            strategy_name='Z-Score'
        )

    def bollinger_strategy(self):
        """
        Bollinger Bands strategy.

        Enters when the spread is outside ``bb_lower`` / ``bb_upper``, exits when
        it crosses ``spread_mean``, and stops out two further standard
        deviations beyond the band.
        """
        return self._execute_trades(
            entry_long=lambda df, i: df['spread'].iloc[i] < df['bb_lower'].iloc[i],
            entry_short=lambda df, i: df['spread'].iloc[i] > df['bb_upper'].iloc[i],
            exit_long=lambda df, i: df['spread'].iloc[i] > df['spread_mean'].iloc[i],
            exit_short=lambda df, i: df['spread'].iloc[i] < df['spread_mean'].iloc[i],
            stop_loss_long=lambda df, i: df['spread'].iloc[i] < df['bb_lower'].iloc[i] - 2*df['spread_std'].iloc[i],
            stop_loss_short=lambda df, i: df['spread'].iloc[i] > df['bb_upper'].iloc[i] + 2*df['spread_std'].iloc[i],
            strategy_name='Bollinger'
        )

    def percentile_strategy(self, entry_pct=0.1, exit_pct=0.5):
        """
        Percentile-based strategy on ``spread_percentile``.

        Long below ``entry_pct``, short above ``1 - entry_pct``; exits once the
        percentile passes ``exit_pct`` (long) or ``1 - exit_pct`` (short); stops
        out at the fixed 0.01 / 0.99 extremes.
        """
        return self._execute_trades(
            entry_long=lambda df, i: df['spread_percentile'].iloc[i] < entry_pct,
            entry_short=lambda df, i: df['spread_percentile'].iloc[i] > (1 - entry_pct),
            exit_long=lambda df, i: df['spread_percentile'].iloc[i] > exit_pct,
            exit_short=lambda df, i: df['spread_percentile'].iloc[i] < (1 - exit_pct),
            stop_loss_long=lambda df, i: df['spread_percentile'].iloc[i] < 0.01,
            stop_loss_short=lambda df, i: df['spread_percentile'].iloc[i] > 0.99,
            strategy_name='Percentile'
        )

    def ma_crossover_strategy(self):
        """
        Moving average crossover on the spread.

        Enters long when the fast SMA crosses below the slow SMA and short when
        it crosses above; exits when the fast SMA crosses ``spread_mean``; uses
        a fixed ``|zscore| > 4`` stop.
        """
        return self._execute_trades(
            entry_long=lambda df, i: (df['spread_sma_fast'].iloc[i] < df['spread_sma_slow'].iloc[i] and
                                      df['spread_sma_fast'].iloc[i-1] >= df['spread_sma_slow'].iloc[i-1]),
            entry_short=lambda df, i: (df['spread_sma_fast'].iloc[i] > df['spread_sma_slow'].iloc[i] and
                                       df['spread_sma_fast'].iloc[i-1] <= df['spread_sma_slow'].iloc[i-1]),
            exit_long=lambda df, i: df['spread_sma_fast'].iloc[i] > df['spread_mean'].iloc[i],
            exit_short=lambda df, i: df['spread_sma_fast'].iloc[i] < df['spread_mean'].iloc[i],
            stop_loss_long=lambda df, i: df['zscore'].iloc[i] < -4,
            stop_loss_short=lambda df, i: df['zscore'].iloc[i] > 4,
            strategy_name='MA-Crossover'
        )

    def hybrid_strategy(self, entry_z=2.0, exit_z=0.5, stop_loss_z=4.0,
                        use_momentum=True):
        """
        Hybrid strategy combining z-score with a momentum filter.

        Same thresholds as ``zscore_strategy``; when ``use_momentum`` is true an
        entry additionally requires ``spread_momentum`` to point back towards
        the mean (positive for longs, negative for shorts).
        """
        def entry_long_cond(df, i):
            zscore_signal = df['zscore'].iloc[i] < -entry_z
            if use_momentum:
                # Only enter if momentum is turning up (spread stopped falling)
                momentum_ok = df['spread_momentum'].iloc[i] > 0
                return zscore_signal and momentum_ok
            return zscore_signal

        def entry_short_cond(df, i):
            zscore_signal = df['zscore'].iloc[i] > entry_z
            if use_momentum:
                # Only enter if momentum is turning down (spread stopped rising)
                momentum_ok = df['spread_momentum'].iloc[i] < 0
                return zscore_signal and momentum_ok
            return zscore_signal

        return self._execute_trades(
            entry_long=entry_long_cond,
            entry_short=entry_short_cond,
            exit_long=lambda df, i: df['zscore'].iloc[i] > -exit_z,
            exit_short=lambda df, i: df['zscore'].iloc[i] < exit_z,
            stop_loss_long=lambda df, i: df['zscore'].iloc[i] < -stop_loss_z,
            stop_loss_short=lambda df, i: df['zscore'].iloc[i] > stop_loss_z,
            strategy_name='Hybrid-Momentum'
        )

    def _trade_return_pct(self, pnl, entry_spread):
        """
        Convert a spread P&L into a bounded percentage of ``|entry_spread|``.

        Returns 0.0 when the entry spread is too close to zero (or NaN) to act
        as a denominator; otherwise the ratio is clipped to
        ``[MIN_PNL_PCT, MAX_PNL_PCT]`` so a single trade can neither push
        capital below zero nor compound it by an artefact of a tiny denominator.
        """
        denominator = abs(entry_spread)
        # `not (x > eps)` rather than `x <= eps` so that NaN is rejected as well.
        if not denominator > self.MIN_ENTRY_SPREAD:
            return 0.0
        raw_pct = (pnl / denominator) * 100
        return float(np.clip(raw_pct, self.MIN_PNL_PCT, self.MAX_PNL_PCT))

    def _execute_trades(self, entry_long, entry_short, exit_long, exit_short,
                       stop_loss_long, stop_loss_short, strategy_name=''):
        """
        Execute trades based on entry/exit conditions.

        Each predicate is called as ``predicate(df, i)`` with the full frame and
        the current row position. Rows are visited from position 1 onward (row 0
        is reserved so predicates may look at ``i - 1``). At most one position
        is open at a time; a position opened on a row is not evaluated for exit
        until the next row. The take-profit predicate is checked before the
        stop-loss predicate.

        Returns:
            (trades_df, stats, equity_curve) where ``trades_df`` has one row per
            closed trade, ``stats`` is the dict from ``_calculate_statistics``
            (or a two-key dict when no trade closed) and ``equity_curve`` holds
            the compounded capital after every visited row, preceded by the
            starting capital.
        """
        df = self.data
        spreads = df['spread'].to_numpy()
        dates = df.index

        # position -> (label, take-profit predicate, stop-loss predicate)
        exit_rules = {
            1: ('long', exit_long, stop_loss_long),
            -1: ('short', exit_short, stop_loss_short),
        }

        position = 0  # 1 = long spread, -1 = short spread
        entry_spread = 0
        entry_date = None
        trades = []

        capital = self.initial_capital
        equity_curve = [capital]

        for i in range(1, len(df)):
            date = dates[i]
            spread = spreads[i]

            if position == 0:
                # Entry signals (long takes precedence over short)
                if entry_long(df, i):
                    position = 1
                elif entry_short(df, i):
                    position = -1

                if position != 0:
                    entry_spread = spread
                    entry_date = date
            else:
                side, take_profit, stop_loss = exit_rules[position]

                if take_profit(df, i):
                    exit_reason = "take_profit"
                elif stop_loss(df, i):
                    exit_reason = "stop_loss"
                else:
                    exit_reason = None

                if exit_reason is not None:
                    # Long earns when the spread rises, short when it falls.
                    pnl = position * (spread - entry_spread)
                    pnl_pct = self._trade_return_pct(pnl, entry_spread)
                    capital += capital * (pnl_pct / 100)

                    trades.append({
                        'entry_date': entry_date,
                        'exit_date': date,
                        'position': side,
                        'entry_spread': entry_spread,
                        'exit_spread': spread,
                        'pnl': pnl,
                        'pnl_pct': pnl_pct,
                        'exit_reason': exit_reason,
                        'days_held': (date - entry_date).days
                    })
                    position = 0

            equity_curve.append(capital)

        trades_df = pd.DataFrame(trades)

        # Calculate performance metrics
        if len(trades_df) > 0:
            stats = self._calculate_statistics(trades_df, equity_curve, strategy_name)
        else:
            stats = {'strategy': strategy_name, 'total_trades': 0}

        return trades_df, stats, equity_curve

    def _calculate_statistics(self, trades_df, equity_curve, strategy_name):
        """
        Calculate comprehensive performance statistics.

        Args:
            trades_df: Non-empty frame of closed trades as built by
                ``_execute_trades``.
            equity_curve: Sequence of capital values; the last element is the
                final equity.
            strategy_name: Label stored under ``'strategy'``.

        Returns:
            dict of metrics. Percent-valued entries (``win_rate``,
            ``total_return``, ``max_drawdown``, ``*_pct``) are on a 0-100 scale.
            ``profit_factor`` is ``inf`` when there are no losing trades.
        """
        winning_trades = trades_df[trades_df['pnl'] > 0]
        losing_trades = trades_df[trades_df['pnl'] < 0]

        total_return = (equity_curve[-1] - self.initial_capital) / self.initial_capital * 100

        # Calculate maximum drawdown
        equity_series = pd.Series(equity_curve)
        running_max = equity_series.expanding().max()
        drawdown = (equity_series - running_max) / running_max * 100
        max_drawdown = drawdown.min()

        # Calculate Sharpe ratio (annualized by the observed trade frequency)
        returns = trades_df['pnl_pct'].values
        span_days = (trades_df['exit_date'].max() - trades_df['entry_date'].min()).days
        # span_days == 0 would make the trade frequency infinite; report 0 instead.
        if len(returns) > 0 and returns.std() > 0 and span_days > 0:
            avg_trades_per_year = len(trades_df) / (span_days / 365)
            sharpe = (returns.mean() / returns.std()) * np.sqrt(avg_trades_per_year)
        else:
            sharpe = 0

        # Profit factor
        gross_profit = winning_trades['pnl_pct'].sum() if len(winning_trades) > 0 else 0
        gross_loss = abs(losing_trades['pnl_pct'].sum()) if len(losing_trades) > 0 else 0
        profit_factor = gross_profit / gross_loss if gross_loss > 0 else np.inf

        stats = {
            'strategy': strategy_name,
            'total_trades': len(trades_df),
            'winning_trades': len(winning_trades),
            'losing_trades': len(losing_trades),
            'win_rate': len(winning_trades) / len(trades_df) * 100 if len(trades_df) > 0 else 0,
            'total_return': total_return,
            'avg_pnl_pct': trades_df['pnl_pct'].mean(),
            'median_pnl_pct': trades_df['pnl_pct'].median(),
            'avg_win_pct': winning_trades['pnl_pct'].mean() if len(winning_trades) > 0 else 0,
            'avg_loss_pct': losing_trades['pnl_pct'].mean() if len(losing_trades) > 0 else 0,
            'max_win_pct': trades_df['pnl_pct'].max(),
            'max_loss_pct': trades_df['pnl_pct'].min(),
            'avg_days_held': trades_df['days_held'].mean(),
            'max_drawdown': max_drawdown,
            'sharpe_ratio': sharpe,
            'profit_factor': profit_factor,
            'expectancy': trades_df['pnl_pct'].mean(),
            'take_profit_pct': (trades_df['exit_reason'] == 'take_profit').sum() / len(trades_df) * 100,
            'stop_loss_pct': (trades_df['exit_reason'] == 'stop_loss').sum() / len(trades_df) * 100
        }

        return stats


# ==============================================================================
# STRATEGY OPTIMIZER
# ==============================================================================

class StrategyOptimizer:
    """
    Optimize strategy parameters using grid search.

    Each ``optimize_*`` method runs one ``StrategyBacktester`` per valid
    parameter combination over the same indicator frame and returns the
    per-combination statistics ranked by ``sharpe_ratio``. Parameters are
    selected on the same data they are evaluated on, so the ranking is
    in-sample.
    """

    def __init__(self, strategy_data):
        """
        Args:
            strategy_data: Indicator DataFrame forwarded unchanged to
                ``StrategyBacktester``.
        """
        self.data = strategy_data

    @staticmethod
    def _rank_results(results):
        """Build the results frame and order it by Sharpe ratio, best first.

        An empty ``results`` list yields an empty, column-less DataFrame.
        """
        results_df = pd.DataFrame(results)
        if len(results_df) > 0:
            results_df = results_df.sort_values('sharpe_ratio', ascending=False)
        return results_df

    def optimize_zscore_strategy(self, initial_capital=10000):
        """
        Optimize z-score strategy parameters.

        Args:
            initial_capital: Starting equity for every backtest.

        Returns:
            DataFrame with one row per combination that produced at least one
            trade (the backtest statistics plus ``entry_z``, ``exit_z`` and
            ``stop_loss_z``), sorted by ``sharpe_ratio`` descending. Empty when
            no combination traded.
        """
        # Parameter grid
        entry_z_values = [1.5, 2.0, 2.5, 3.0]
        exit_z_values = [0.0, 0.25, 0.5, 0.75, 1.0]
        stop_loss_z_values = [3.5, 4.0, 4.5, 5.0]

        # The exit band must sit strictly inside the entry band; drop the rest
        # up front so the progress denominator only counts runs that happen.
        combinations = [
            (entry_z, exit_z, stop_z)
            for entry_z, exit_z, stop_z in product(entry_z_values, exit_z_values, stop_loss_z_values)
            if exit_z < entry_z
        ]

        results = []

        print("🔍 Optimizing Z-Score Strategy...")

        for count, (entry_z, exit_z, stop_z) in enumerate(combinations, start=1):
            if count % 10 == 0:
                print(f"  Testing combination {count}/{len(combinations)}...")

            backtester = StrategyBacktester(self.data, initial_capital)
            _, stats, _ = backtester.zscore_strategy(entry_z, exit_z, stop_z)

            if stats['total_trades'] > 0:
                stats['entry_z'] = entry_z
                stats['exit_z'] = exit_z
                stats['stop_loss_z'] = stop_z
                results.append(stats)

        return self._rank_results(results)

    def optimize_percentile_strategy(self, initial_capital=10000):
        """
        Optimize percentile strategy parameters.

        Args:
            initial_capital: Starting equity for every backtest.

        Returns:
            DataFrame with one row per combination that produced at least one
            trade (the backtest statistics plus ``entry_pct`` and
            ``exit_pct``), sorted by ``sharpe_ratio`` descending. Empty when no
            combination traded.
        """
        entry_pct_values = [0.05, 0.10, 0.15, 0.20]
        exit_pct_values = [0.40, 0.45, 0.50, 0.55, 0.60]

        results = []

        print("🔍 Optimizing Percentile Strategy...")

        for entry_pct, exit_pct in product(entry_pct_values, exit_pct_values):
            # The exit level must lie strictly between both entry tails.
            if not entry_pct < exit_pct < (1 - entry_pct):
                continue

            backtester = StrategyBacktester(self.data, initial_capital)
            _, stats, _ = backtester.percentile_strategy(entry_pct, exit_pct)

            if stats['total_trades'] > 0:
                stats['entry_pct'] = entry_pct
                stats['exit_pct'] = exit_pct
                results.append(stats)

        return self._rank_results(results)


# ==============================================================================
# COMPREHENSIVE STRATEGY COMPARISON
# ==============================================================================

def compare_all_strategies(prices_df, token_a, token_b, initial_capital=10000,
                           lookback_period=60, min_observations=100):
    """
    Test all strategies on a pair and compare results.

    Builds the indicator frame with ``PairsTradingStrategy.prepare_data``, runs
    every ``StrategyBacktester`` strategy (several parameter sets for some of
    them), prints a summary table and the best row, and returns the table.

    Args:
        prices_df: Price DataFrame handed to ``PairsTradingStrategy``.
        token_a: Name of the first leg.
        token_b: Name of the second leg.
        initial_capital: Starting equity for every backtest.
        lookback_period: Rolling window forwarded to ``prepare_data``.
        min_observations: Minimum number of prepared rows required to backtest.

    Returns:
        DataFrame of per-strategy statistics sorted by ``sharpe_ratio``
        descending (strategies without trades removed), or ``None`` when there
        is too little data or no strategy traded.
    """
    banner = '=' * 80

    print(f"\n{banner}")
    print(f"STRATEGY COMPARISON: {token_a} / {token_b}")
    print(f"{banner}\n")

    # Prepare data
    strategy = PairsTradingStrategy(prices_df, token_a, token_b)
    data = strategy.prepare_data(lookback_period=lookback_period)

    if data is None or len(data) < min_observations:
        print("❌ Insufficient data for backtesting")
        return None

    print(f"📊 Data period: {data.index[0].date()} to {data.index[-1].date()}")
    print(f"📊 Total days: {len(data)}\n")

    backtester = StrategyBacktester(data, initial_capital)

    # Test all strategies
    strategies_results = []

    def record(backtest_result, params_label):
        """Tag the stats of one backtest with its parameter label and keep it."""
        _, stats, _ = backtest_result
        stats['params'] = params_label
        strategies_results.append(stats)

    # 1. Z-Score Strategy (multiple parameter sets)
    print("Testing Z-Score strategies...")
    for entry_z, exit_z, stop_z in [(2.0, 0.5, 4.0), (2.5, 0.5, 4.5), (1.5, 0.25, 3.5)]:
        record(backtester.zscore_strategy(entry_z, exit_z, stop_z),
               f"entry={entry_z}, exit={exit_z}, stop={stop_z}")

    # 2. Bollinger Strategy
    print("Testing Bollinger strategy...")
    record(backtester.bollinger_strategy(), "2 std bands")

    # 3. Percentile Strategy
    print("Testing Percentile strategies...")
    for entry_pct, exit_pct in [(0.1, 0.5), (0.15, 0.5), (0.05, 0.45)]:
        record(backtester.percentile_strategy(entry_pct, exit_pct),
               f"entry={entry_pct}, exit={exit_pct}")

    # 4. MA Crossover
    print("Testing MA Crossover strategy...")
    record(backtester.ma_crossover_strategy(), "10/30 SMA")

    # 5. Hybrid Strategy
    print("Testing Hybrid strategies...")
    for use_momentum in [True, False]:
        record(backtester.hybrid_strategy(use_momentum=use_momentum),
               f"momentum_filter={use_momentum}")

    # Create comparison DataFrame
    comparison_df = pd.DataFrame(strategies_results)

    # Filter out strategies with no trades
    comparison_df = comparison_df[comparison_df['total_trades'] > 0]

    if len(comparison_df) == 0:
        print("❌ No strategies generated trades")
        return None

    # Sort by Sharpe ratio
    comparison_df = comparison_df.sort_values('sharpe_ratio', ascending=False)

    # Display results
    print("\n" + banner)
    print("RESULTS SUMMARY")
    print(banner)

    display_cols = ['strategy', 'params', 'total_trades', 'win_rate', 'total_return',
                    'sharpe_ratio', 'max_drawdown', 'profit_factor', 'avg_days_held']

    print(comparison_df[display_cols].to_string(index=False))

    # Highlight best strategy (first row after the Sharpe sort)
    best_strategy = comparison_df.iloc[0]
    print(f"\n🏆 BEST STRATEGY: {best_strategy['strategy']} ({best_strategy['params']})")
    print(f"   Sharpe Ratio: {best_strategy['sharpe_ratio']:.2f}")
    print(f"   Total Return: {best_strategy['total_return']:.2f}%")
    print(f"   Win Rate: {best_strategy['win_rate']:.1f}%")
    print(f"   Max Drawdown: {best_strategy['max_drawdown']:.2f}%")
    print(f"   Profit Factor: {best_strategy['profit_factor']:.2f}")

    return comparison_df


# ==============================================================================
# USAGE EXAMPLE
# ==============================================================================

# Load your data
# `prices_df` is expected to already hold the token price DataFrame when this
# cell runs; `ranked_pairs` is read from disk and must provide the columns
# 'token_a' and 'token_b'.

# Example: Test top pairs
ranked_pairs = pd.read_csv('ranked_pairs_for_trading.csv')
top_pairs = ranked_pairs.head(10)

print("\n" + "="*80)
print("BACKTESTING TOP 10 PAIRS WITH ALL STRATEGIES")
print("="*80)

all_results = []

for token_a, token_b in zip(top_pairs['token_a'], top_pairs['token_b']):
    # Run comprehensive comparison
    results = compare_all_strategies(prices_df, token_a, token_b, initial_capital=10000)

    if results is not None:
        results['token_a'] = token_a
        results['token_b'] = token_b
        all_results.append(results)

# Combine all results
if all_results:
    combined_results = pd.concat(all_results, ignore_index=True)
    combined_results = combined_results.sort_values('sharpe_ratio', ascending=False)

    print("\n" + "="*80)
    print("TOP 20 PAIR-STRATEGY COMBINATIONS")
    print("="*80)

    display_cols = ['token_a', 'token_b', 'strategy', 'total_return', 'sharpe_ratio',
                    'win_rate', 'max_drawdown', 'total_trades']
    print(combined_results[display_cols].head(20).to_string(index=False))

    # Save results
    combined_results.to_csv('strategy_backtest_results.csv', index=False)
    print("\n✅ Full results saved to 'strategy_backtest_results.csv'")
else:
    print("\n❌ No successful backtests completed")

# ==============================================================================
# OPTIMIZE SPECIFIC PAIR
# ==============================================================================

# Example: Optimize the best pair
print("\n" + "="*80)
print("OPTIMIZING PARAMETERS FOR BEST PAIR")
print("="*80)

if ranked_pairs.empty:
    # `.iloc[0]` on an empty ranking would raise IndexError.
    print("\n❌ No ranked pairs available to optimize")
else:
    best_pair = ranked_pairs.iloc[0]
    token_a = best_pair['token_a']
    token_b = best_pair['token_b']

    print(f"\nOptimizing: {token_a} / {token_b}")

    strategy = PairsTradingStrategy(prices_df, token_a, token_b)
    data = strategy.prepare_data(lookback_period=60)

    if data is not None:
        optimizer = StrategyOptimizer(data)

        # Optimize Z-Score strategy
        zscore_results = optimizer.optimize_zscore_strategy(initial_capital=10000)

        print("\n📈 TOP 10 Z-SCORE PARAMETER COMBINATIONS:")
        if zscore_results.empty:
            # The optimizer returns a column-less frame when nothing traded;
            # selecting columns from it would raise KeyError.
            print("  (no parameter combination produced trades)")
        else:
            print(zscore_results[['entry_z', 'exit_z', 'stop_loss_z', 'total_return',
                                  'sharpe_ratio', 'win_rate', 'total_trades']].head(10).to_string(index=False))

        # Optimize Percentile strategy
        percentile_results = optimizer.optimize_percentile_strategy(initial_capital=10000)

        print("\n📈 TOP 10 PERCENTILE PARAMETER COMBINATIONS:")
        if percentile_results.empty:
            print("  (no parameter combination produced trades)")
        else:
            print(percentile_results[['entry_pct', 'exit_pct', 'total_return',
                                      'sharpe_ratio', 'win_rate', 'total_trades']].head(10).to_string(index=False))


BACKTESTING TOP 10 PAIRS WITH ALL STRATEGIES

STRATEGY COMPARISON: ethereum / seth

üìä Data period: 2019-09-01 to 2025-10-08
üìä Total days: 2230

Testing Z-Score strategies...
Testing Bollinger strategy...
Testing Percentile strategies...
Testing MA Crossover strategy...
Testing Hybrid strategies...

RESULTS SUMMARY
       strategy                         params  total_trades  win_rate  total_return  sharpe_ratio  max_drawdown  profit_factor  avg_days_held
        Z-Score  entry=2.5, exit=0.5, stop=4.5            70 87.142857 -1.603722e+18      2.056963 -8.777750e+03       4.578969       3.414286
     Percentile           entry=0.15, exit=0.5           166 46.987952  2.413962e+26      0.855029 -4.304308e+12       3.027184       6.469880
     Percentile          entry=0.05, exit=0.45           125 48.000000  6.694790e+09      0.769405 -5.197765e+03       2.414945       4.984000
Hybrid-Momentum           momentum_filter=True             9 77.777778 -2.277800e+02      0.596176 -1.549

In [None]:
import pandas as pd
import numpy as np
from statsmodels.regression.linear_model import OLS
from statsmodels.tools.tools import add_constant
from statsmodels.tsa.stattools import coint
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

"""
WALK-FORWARD PAIRS TRADING BACKTEST
Following the methodology from "The Modern 70/30" paper
"""

class WalkForwardPairsTrading:
    """
    Walk-forward backtesting framework for pairs trading strategies
    Avoids look-ahead bias by training on past data and testing on future data
    """

    def __init__(self, prices_df, volumes_df=None):
        self.prices_df = prices_df
        self.volumes_df = volumes_df

    def split_train_test(self, train_years=3, test_years=1, step_years=1):
        """
        Create rolling train/test windows

        Args:
            train_years: Years of data for training (parameter optimization)
            test_years: Years of data for testing (out-of-sample)
            step_years: How much to roll forward between folds

        Returns:
            List of (train_start, train_end, test_start, test_end) tuples
        """
        dates = self.prices_df.index
        start_date = dates[0]
        end_date = dates[-1]

        folds = []
        current_train_start = start_date

        while True:
            train_end = current_train_start + timedelta(days=365*train_years)
            test_start = train_end
            test_end = test_start + timedelta(days=365*test_years)

            if test_end > end_date:
                break

            folds.append({
                'train_start': current_train_start,
                'train_end': train_end,
                'test_start': test_start,
                'test_end': test_end,
                'fold_id': len(folds) + 1
            })

            current_train_start += timedelta(days=365*step_years)

        return folds

    def train_pair(self, token_a, token_b, train_data):
        """
        Train on historical data to find optimal parameters

        Returns:
            dict with trained parameters
        """
        pair_data = train_data[[token_a, token_b]].dropna()

        if len(pair_data) < 365:
            return None

        y = pair_data[token_a]
        x = pair_data[token_b]

        # Calculate hedge ratio
        X = add_constant(x)
        model = OLS(y, X).fit()
        beta = model.params[1]

        # Calculate spread
        spread = y - beta * x

        # Test cointegration
        try:
            _, pvalue, _ = coint(y, x)
            is_cointegrated = pvalue < 0.05
        except:
            is_cointegrated = False

        if not is_cointegrated:
            return None

        # Optimize parameters on training data
        best_params = self._optimize_parameters(pair_data, token_a, token_b, beta)

        if best_params is None:
            return None

        return {
            'hedge_ratio': beta,
            'rsquared': model.rsquared,
            **best_params
        }

    def _optimize_parameters(self, pair_data, token_a, token_b, beta, lookback=60):
        """
        Grid search for best parameters on training data
        """
        y = pair_data[token_a]
        x = pair_data[token_b]
        spread = y - beta * x

        # Calculate indicators
        spread_mean = spread.rolling(lookback).mean()
        spread_std = spread.rolling(lookback).std()
        zscore = (spread - spread_mean) / spread_std
        spread_momentum = spread.diff(5)

        data = pd.DataFrame({
            'spread': spread,
            'zscore': zscore,
            'momentum': spread_momentum
        }).dropna()

        # Parameter grid
        entry_z_values = [1.5, 2.0, 2.5]
        exit_z_values = [0.25, 0.5, 0.75]
        stop_z_values = [3.5, 4.0, 4.5]

        best_sharpe = -np.inf
        best_params = None

        for entry_z in entry_z_values:
            for exit_z in exit_z_values:
                for stop_z in stop_z_values:
                    if exit_z >= entry_z:
                        continue

                    # Backtest with these parameters
                    trades = self._simple_backtest(data, entry_z, exit_z, stop_z)

                    if len(trades) >= 5:  # Minimum trades required
                        sharpe = self._calculate_sharpe(trades)

                        if sharpe > best_sharpe:
                            best_sharpe = sharpe
                            best_params = {
                                'entry_z': entry_z,
                                'exit_z': exit_z,
                                'stop_z': stop_z,
                                'train_sharpe': sharpe,
                                'train_trades': len(trades)
                            }

        return best_params

    def _simple_backtest(self, data, entry_z, exit_z, stop_z):
        """Quick backtest for parameter optimization"""
        position = 0
        entry_spread = 0
        trades = []

        for i in range(1, len(data)):
            zscore = data['zscore'].iloc[i]
            spread = data['spread'].iloc[i]
            momentum = data['momentum'].iloc[i]

            if position == 0:
                # Entry logic with momentum filter
                if zscore < -entry_z and momentum > 0:
                    position = 1
                    entry_spread = spread
                elif zscore > entry_z and momentum < 0:
                    position = -1
                    entry_spread = spread

            elif position != 0:
                # Exit logic
                exit_trade = False

                if position == 1:
                    if zscore > -exit_z or zscore < -stop_z:
                        exit_trade = True
                else:
                    if zscore < exit_z or zscore > stop_z:
                        exit_trade = True

                if exit_trade:
                    pnl = position * (spread - entry_spread)
                    pnl_pct = (pnl / abs(entry_spread)) * 100 if abs(entry_spread) > 1e-10 else 0
                    pnl_pct = np.clip(pnl_pct, -100, 200)

                    trades.append(pnl_pct)
                    position = 0

        return trades

    def _calculate_sharpe(self, trades):
        """Calculate Sharpe ratio from trade returns"""
        if len(trades) < 2:
            return 0

        returns = np.array(trades)
        if returns.std() == 0:
            return 0

        # Annualize assuming average trade takes 14 days
        trades_per_year = 365 / 14
        sharpe = (returns.mean() / returns.std()) * np.sqrt(trades_per_year)
        return sharpe

    def test_pair(self, token_a, token_b, test_data, trained_params):
        """
        Test pair on out-of-sample data using trained parameters
        """
        pair_data = test_data[[token_a, token_b]].dropna()

        if len(pair_data) < 60:
            return None

        y = pair_data[token_a]
        x = pair_data[token_b]

        beta = trained_params['hedge_ratio']
        spread = y - beta * x

        # Calculate indicators
        lookback = 60
        spread_mean = spread.rolling(lookback).mean()
        spread_std = spread.rolling(lookback).std()
        zscore = (spread - spread_mean) / spread_std
        spread_momentum = spread.diff(5)

        data = pd.DataFrame({
            'date': pair_data.index,
            'price_a': y,
            'price_b': x,
            'spread': spread,
            'zscore': zscore,
            'momentum': spread_momentum
        }).dropna()

        # Execute strategy with trained parameters
        trades = self._execute_strategy(data, trained_params)

        if len(trades) == 0:
            return None

        # Calculate performance metrics
        metrics = self._calculate_metrics(trades, data)

        return metrics

    def _execute_strategy(self, data, params):
        """Execute trading strategy and track equity curve"""
        position = 0
        entry_spread = 0
        entry_date = None
        trades = []
        equity = [10000]  # Start with $10k

        for i in range(1, len(data)):
            zscore = data['zscore'].iloc[i]
            spread = data['spread'].iloc[i]
            momentum = data['momentum'].iloc[i]
            date = data['date'].iloc[i]

            if position == 0:
                # Entry with momentum filter (Hybrid-Momentum strategy)
                if zscore < -params['entry_z'] and momentum > 0:
                    position = 1
                    entry_spread = spread
                    entry_date = date
                elif zscore > params['entry_z'] and momentum < 0:
                    position = -1
                    entry_spread = spread
                    entry_date = date

            elif position != 0:
                # Exit logic
                exit_trade = False
                exit_reason = None

                if position == 1:
                    if zscore > -params['exit_z']:
                        exit_trade = True
                        exit_reason = 'take_profit'
                    elif zscore < -params['stop_z']:
                        exit_trade = True
                        exit_reason = 'stop_loss'
                else:
                    if zscore < params['exit_z']:
                        exit_trade = True
                        exit_reason = 'take_profit'
                    elif zscore > params['stop_z']:
                        exit_trade = True
                        exit_reason = 'stop_loss'

                if exit_trade:
                    pnl = position * (spread - entry_spread)
                    pnl_pct = (pnl / abs(entry_spread)) * 100 if abs(entry_spread) > 1e-10 else 0
                    pnl_pct = np.clip(pnl_pct, -100, 200)

                    # Update equity
                    equity.append(equity[-1] * (1 + pnl_pct/100))

                    trades.append({
                        'entry_date': entry_date,
                        'exit_date': date,
                        'position': 'long' if position == 1 else 'short',
                        'pnl_pct': pnl_pct,
                        'exit_reason': exit_reason,
                        'days_held': (date - entry_date).days
                    })
                    position = 0
                else:
                    equity.append(equity[-1])
            else:
                equity.append(equity[-1])

        return {'trades': trades, 'equity': equity}

    def _calculate_metrics(self, result, data):
        """Calculate comprehensive performance metrics"""
        trades = result['trades']
        equity = result['equity']

        if len(trades) == 0:
            return None

        trades_df = pd.DataFrame(trades)
        returns = trades_df['pnl_pct'].values / 100

        # Basic stats
        total_trades = len(trades)
        winning_trades = (returns > 0).sum()
        losing_trades = (returns < 0).sum()
        win_rate = winning_trades / total_trades if total_trades > 0 else 0

        # Returns
        total_return = (equity[-1] - equity[0]) / equity[0]
        days_in_test = (data['date'].iloc[-1] - data['date'].iloc[0]).days
        years = days_in_test / 365
        cagr = (1 + total_return) ** (1/years) - 1 if years > 0 else 0

        # Drawdown
        equity_series = pd.Series(equity)
        running_max = equity_series.expanding().max()
        drawdown = (equity_series - running_max) / running_max
        max_drawdown = drawdown.min()

        # Volatility (annualized)
        daily_returns = equity_series.pct_change().dropna()
        ann_vol = daily_returns.std() * np.sqrt(252)

        # Win rate (daily)
        win_rate_days = (daily_returns > 0).sum() / len(daily_returns) if len(daily_returns) > 0 else 0

        # Sharpe & Sortino
        if ann_vol > 0 and years > 0:
            sharpe = (cagr) / ann_vol
        else:
            sharpe = 0

        downside_returns = daily_returns[daily_returns < 0]
        downside_vol = downside_returns.std() * np.sqrt(252) if len(downside_returns) > 0 else 0.0001
        sortino = (cagr) / downside_vol if downside_vol > 0 else 0

        # Profit factor
        gross_profit = returns[returns > 0].sum() if (returns > 0).any() else 0
        gross_loss = abs(returns[returns < 0].sum()) if (returns < 0).any() else 0.0001
        profit_factor = gross_profit / gross_loss

        return {
            'total_trades': total_trades,
            'win_rate': win_rate * 100,
            'total_return': total_return * 100,
            'cagr': cagr * 100,
            'max_drawdown': max_drawdown * 100,
            'ann_vol': ann_vol * 100,
            'win_rate_days': win_rate_days * 100,
            'sharpe': sharpe,
            'sortino': sortino,
            'profit_factor': profit_factor,
            'avg_days_held': trades_df['days_held'].mean(),
            'equity_curve': equity
        }

    def run_walk_forward_test(self, pair_config, train_years=3, test_years=1):
        """
        Run complete walk-forward test for a pair

        Args:
            pair_config: dict with 'token_a' and 'token_b'
            train_years: Years for training
            test_years: Years for testing
        """
        token_a = pair_config['token_a']
        token_b = pair_config['token_b']

        print(f"\n{'='*80}")
        print(f"Walk-Forward Test: {token_a} / {token_b}")
        print(f"{'='*80}")

        # Create folds
        folds = self.split_train_test(train_years, test_years, test_years)

        print(f"Total folds: {len(folds)}")
        print(f"Training period: {train_years} years")
        print(f"Testing period: {test_years} years\n")

        results = []

        for fold in folds:
            print(f"Fold {fold['fold_id']}: Train {fold['train_start'].date()} to {fold['train_end'].date()}, "
                  f"Test {fold['test_start'].date()} to {fold['test_end'].date()}")

            # Get train/test data
            train_data = self.prices_df[fold['train_start']:fold['train_end']]
            test_data = self.prices_df[fold['test_start']:fold['test_end']]

            # Train
            trained_params = self.train_pair(token_a, token_b, train_data)

            if trained_params is None:
                print(f"  ‚ùå Not cointegrated in training period")
                continue

            print(f"  ‚úì Trained params: entry_z={trained_params['entry_z']}, "
                  f"exit_z={trained_params['exit_z']}, stop_z={trained_params['stop_z']}")
            print(f"  ‚úì Train Sharpe: {trained_params['train_sharpe']:.2f}")

            # Test
            test_metrics = self.test_pair(token_a, token_b, test_data, trained_params)

            if test_metrics is None:
                print(f"  ‚ùå No trades in test period")
                continue

            print(f"  ‚úì Test trades: {test_metrics['total_trades']}, "
                  f"Return: {test_metrics['total_return']:.1f}%, "
                  f"Sharpe: {test_metrics['sharpe']:.2f}")

            results.append({
                'fold_id': fold['fold_id'],
                'test_start': fold['test_start'],
                'test_end': fold['test_end'],
                **trained_params,
                **test_metrics
            })

        return results


def aggregate_results(results_list):
    """
    Aggregate results across folds (like the paper's table format)

    Args:
        results_list: list of per-fold dicts. Each must provide 'test_start'
            and 'test_end' (values whose difference has a ``.days``
            attribute) and the metric keys read below ('total_trades',
            'max_drawdown', 'total_return', 'cagr', 'ann_vol', 'win_rate',
            'win_rate_days', 'sharpe', 'sortino', 'profit_factor',
            'avg_days_held').

    Returns:
        None for an empty list, otherwise a dict with the fold count, the
        summed trade count, trades per 365 test days, the minimum fold
        drawdown and the unweighted fold means of the remaining metrics.
    """
    if len(results_list) == 0:
        return None

    df = pd.DataFrame(results_list)

    # Total tested time across all folds. Dividing total trades by it gives
    # the right rate when folds differ in length, and matches the old
    # "mean trades x 365 / first fold length" when they are all equal.
    total_days = sum((r['test_end'] - r['test_start']).days for r in results_list)
    total_trades = df['total_trades'].sum()

    # Zero-length test windows would otherwise divide by zero
    if total_days > 0:
        avg_trades_per_year = total_trades * 365 / total_days
    else:
        avg_trades_per_year = 0.0

    agg = {
        'Folds': len(results_list),
        'TotalTrades': total_trades,
        'AvgTradesPerYear': avg_trades_per_year,
        'MaxDrawdown': df['max_drawdown'].min(),
        'TotalReturn': df['total_return'].mean(),
        'CAGR': df['cagr'].mean(),
        'AnnVol': df['ann_vol'].mean(),
        'WinRate': df['win_rate'].mean(),
        'WinRate_days': df['win_rate_days'].mean(),
        'Sharpe': df['sharpe'].mean(),
        'Sortino': df['sortino'].mean(),
        'ProfitFactor': df['profit_factor'].mean(),
        'AvgDaysHeld': df['avg_days_held'].mean()
    }

    return agg


def compare_strategies(prices_df, pair_configs, test_years=1, train_years=3):
    """
    Compare multiple pairs with walk-forward testing
    Output format similar to the paper

    Args:
        prices_df: price table handed to WalkForwardPairsTrading.
        pair_configs: iterable of dicts with 'token_a' and 'token_b'.
        test_years: length of each test window, forwarded to
            run_walk_forward_test.
        train_years: length of each training window, forwarded to
            run_walk_forward_test (default 3, the previously fixed value).

    Returns:
        Dict mapping "token_a/token_b" to that pair's list of fold results.
        Pairs that produced no fold results are left out.
    """
    backtester = WalkForwardPairsTrading(prices_df)

    all_results = {}

    for config in pair_configs:
        pair_name = f"{config['token_a']}/{config['token_b']}"
        print(f"\n{'#'*80}")
        print(f"# TESTING PAIR: {pair_name}")
        print(f"{'#'*80}")

        results = backtester.run_walk_forward_test(
            config, train_years=train_years, test_years=test_years
        )

        if len(results) > 0:
            all_results[pair_name] = results

    return all_results


def format_results_table(all_results, test_years=1):
    """
    Format results in a table like the paper

    Args:
        all_results: dict mapping a pair name to its list of fold results,
            as returned by compare_strategies.
        test_years: test-window length shown in the printed header. The
            aggregated rows carry no such field, so it has to be supplied by
            the caller; the default of 1 reproduces the previous header.

    Returns:
        None (after printing a notice) when no pair aggregates to a result,
        otherwise the unformatted summary DataFrame with one row per pair.
        A formatted subset of the columns is printed as a side effect.
    """
    summary = {}

    for pair_name, results in all_results.items():
        agg = aggregate_results(results)
        if agg:
            summary[pair_name] = agg

    if len(summary) == 0:
        print("No results to display")
        return None

    # Create comparison table
    df = pd.DataFrame(summary).T

    # Format like the paper. The fold count in the header is the first
    # pair's; other pairs may have completed a different number of folds.
    header_folds = df.iloc[0]['Folds']
    print("\n" + "="*80)
    print(f"WALK-FORWARD BACKTEST RESULTS ({header_folds:.0f} Folds √ó {test_years:g} Year Test)")
    print("="*80)

    display_df = pd.DataFrame({
        'Pair': df.index,
        'Trades': df['TotalTrades'].astype(int),
        'MaxDrawdown': df['MaxDrawdown'].apply(lambda x: f"{x:.0f}%"),
        'TotalReturn': df['TotalReturn'].apply(lambda x: f"{x:.0f}%"),
        'CAGR': df['CAGR'].apply(lambda x: f"{x:.0f}%"),
        'AnnVol': df['AnnVol'].apply(lambda x: f"{x:.0f}%"),
        'WinRate': df['WinRate'].apply(lambda x: f"{x:.0f}%"),
        'Sharpe': df['Sharpe'].apply(lambda x: f"{x:.2f}"),
        'Sortino': df['Sortino'].apply(lambda x: f"{x:.2f}"),
    })

    print(display_df.to_string(index=False))

    return df


# ------------------------------------------------------------------------------
# Usage example: print the methodology banner, declare a sample pair list and
# finish with run instructions. Nothing is backtested here.
# ------------------------------------------------------------------------------

print("""
WALK-FORWARD PAIRS TRADING BACKTEST
====================================

This framework follows the methodology from "The Modern 70/30" paper:

1. Split data into rolling train/test windows (no look-ahead bias)
2. Train parameters on historical data (3 years)
3. Test on out-of-sample data (1 year)
4. Roll forward and repeat
5. Aggregate results across all folds

Example usage:
""")

# Sample pairs (picked from the earlier analysis), written as (token_a, token_b)
# tuples and expanded into the dict shape that compare_strategies() expects.
PAIR_CONFIGS = [
    {'token_a': first, 'token_b': second}
    for first, second in (
        ('steem', 'tokamak-network'),
        ('gala', 'the-sandbox'),
        ('orbs', 'dkargo'),
        ('tokamak-network', 'origin-protocol'),
        ('medibloc', 'moviebloc'),
    )
]

# Left disabled on purpose; enable once prices_df is loaded.
# all_results = compare_strategies(prices_df, PAIR_CONFIGS, test_years=1)
# results_table = format_results_table(all_results)

print("""
To run the backtest:
1. Load your prices_df
2. Uncomment the lines above
3. Run: all_results = compare_strategies(prices_df, PAIR_CONFIGS, test_years=1)
4. View: results_table = format_results_table(all_results)

Output will show:
- Multiple folds (avoiding overfitting)
- Out-of-sample performance metrics
- Comparison table like the paper
- Max drawdown, Sharpe, Sortino, etc.
""")


WALK-FORWARD PAIRS TRADING BACKTEST

This framework follows the methodology from "The Modern 70/30" paper:

1. Split data into rolling train/test windows (no look-ahead bias)
2. Train parameters on historical data (3 years)
3. Test on out-of-sample data (1 year)
4. Roll forward and repeat
5. Aggregate results across all folds

Example usage:


To run the backtest:
1. Load your prices_df
2. Uncomment the lines above
3. Run: all_results = compare_strategies(prices_df, PAIR_CONFIGS, test_years=1)
4. View: results_table = format_results_table(all_results)

Output will show:
- Multiple folds (avoiding overfitting)
- Out-of-sample performance metrics
- Comparison table like the paper
- Max drawdown, Sharpe, Sortino, etc.



In [None]:
"""
Simple function to extract top N pairs from ranked_pairs_df
and format them for walk-forward testing
"""

def get_pair_configs(ranked_pairs_df, n_pairs=20):
    """
    Turn the first rows of a ranked-pairs table into pair config dicts.

    Args:
        ranked_pairs_df: DataFrame with 'token_a' and 'token_b' columns
            (e.g. the output of PairsTradingAnalyzer.rank_pairs()).
        n_pairs: how many rows to take from the top of the table.

    Returns:
        List of {'token_a': ..., 'token_b': ...} dicts, in table order,
        ready to be passed to compare_strategies().
    """
    return [
        {'token_a': pair_row['token_a'], 'token_b': pair_row['token_b']}
        for _, pair_row in ranked_pairs_df.head(n_pairs).iterrows()
    ]


# ==============================================================================
# USAGE
# ==============================================================================

# This cell reads `ranked_pairs` and `prices_df`, which it does not define;
# presumably they come from the PairsTradingAnalyzer steps run beforehand:
# analyzer = PairsTradingAnalyzer(prices_df, min_overlap=365)
# analysis_results = analyzer.analyze_top_pairs(top_pairs_df, n_pairs=100)
# ranked_pairs = analyzer.rank_pairs(analysis_results)

# Automatically get top 20 pairs
PAIR_CONFIGS = get_pair_configs(ranked_pairs, n_pairs=20)

# Announce the selection BEFORE the backtest runs, so "Ready for walk-forward
# testing!" is not printed after the test and its results table.
print(f"\n‚úÖ Extracted {len(PAIR_CONFIGS)} pairs from ranked_pairs")
print("Ready for walk-forward testing!")
print("\nTop 5 pairs:")
for i, config in enumerate(PAIR_CONFIGS[:5], 1):
    print(f"  {i}. {config['token_a']} / {config['token_b']}")

# Now run walk-forward test
all_results = compare_strategies(prices_df, PAIR_CONFIGS, test_years=1)

# Format results
results_table = format_results_table(all_results)


################################################################################
# TESTING PAIR: ethereum/seth
################################################################################

Walk-Forward Test: ethereum / seth
Total folds: 9
Training period: 3 years
Testing period: 1 years

Fold 1: Train 2013-04-28 to 2016-04-27, Test 2016-04-27 to 2017-04-27
  ‚ùå Not cointegrated in training period
Fold 2: Train 2014-04-28 to 2017-04-27, Test 2017-04-27 to 2018-04-27
  ‚ùå Not cointegrated in training period
Fold 3: Train 2015-04-28 to 2018-04-27, Test 2018-04-27 to 2019-04-27
  ‚ùå Not cointegrated in training period
Fold 4: Train 2016-04-27 to 2019-04-27, Test 2019-04-27 to 2020-04-26
  ‚ùå Not cointegrated in training period
Fold 5: Train 2017-04-27 to 2020-04-26, Test 2020-04-26 to 2021-04-26
  ‚ùå Not cointegrated in training period
Fold 6: Train 2018-04-27 to 2021-04-26, Test 2021-04-26 to 2022-04-26
  ‚ùå Not cointegrated in training period
Fold 7: Train 2019-04-27 to 2022-0