In [1]:
import pandas as pd
import numpy as np
import polars as pl

import matplotlib.pyplot as plt
from scipy import stats

# Price table: first CSV column becomes the (date-parsed) index, sorted ascending.
df = pd.read_csv("cryptos.csv", index_col=0, parse_dates=True).sort_index()

# Columns of `df` that the strategy trades.
tickers = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "BNBUSDT"]

# Strategy constants.
n_long = 1  # NOTE(review): not referenced anywhere in the visible cells
stop_loss = 0.05  # drawdown-from-peak threshold used by run_backtest
twenty_four_hours = 24  # rows per day in the walk-forward slicing; assumes hourly bars — TODO confirm

# Untouched copy of the full history; the walk-forward cell rebinds `df` to slices of it.
df_all = df.copy()

In [2]:
def max_drawdown(cum_returns):
    """Return the deepest fractional decline of ``cum_returns`` below its running peak.

    Args:
        cum_returns: pandas Series (or DataFrame) of cumulative equity values.

    Returns:
        The minimum of ``(value - running_peak) / running_peak``; ``0`` when the
        series never falls below a previous peak, negative otherwise.
    """
    peak_so_far = cum_returns.cummax()
    decline_from_peak = cum_returns.sub(peak_so_far).div(peak_so_far)
    return decline_from_peak.min()

In [3]:
def _simulate_equity(step_returns, stop_loss_pct):
    """Compound per-bar returns into an equity curve while tracking a trailing stop.

    Args:
        step_returns: 1-D numpy array of per-bar portfolio returns (no NaNs).
        stop_loss_pct: drawdown from the tracked peak that flips the state to inactive.

    Returns:
        numpy array of equity values; element 0 is always 1.0 (the first return
        only seeds the curve and is not compounded).

    NOTE(review): the bar that triggers the stop is still booked, and an inactive
    state re-activates on the very next non-zero return (also booked). The curve
    is therefore identical to plain compounding of ``step_returns[1:]``; the stop
    only resets the tracked peak. Confirm the intended re-entry rule before
    relying on ``stop_loss_pct`` to limit losses.
    """
    equity = np.ones(len(step_returns))
    peak = 1.0
    active = True
    for i in range(1, len(step_returns)):
        bar_return = step_returns[i]
        if active:
            equity[i] = equity[i - 1] * (1 + bar_return)
            peak = max(peak, equity[i])
            # Stay active unless the drawdown from the peak breaches the stop.
            active = not ((equity[i] / peak - 1) < -stop_loss_pct)
        elif bar_return != 0:
            # Re-entry: any non-zero return restarts tracking from a fresh peak.
            active = True
            equity[i] = equity[i - 1] * (1 + bar_return)
            peak = equity[i]
        else:
            equity[i] = equity[i - 1]
    return equity


def run_backtest(lookback, n_top, resample_hours, data=None, symbols=None, stop_loss_pct=None):
    """Backtest a volatility-scaled momentum rotation and report return and drawdown.

    Steps: compute momentum over ``lookback`` bars divided by rolling return
    volatility; keep it only where both the full and half-lookback momentum are
    positive and the price z-score (window ``2 * lookback``) is inside +/-2;
    average it into ``resample_hours``-hour bins; hold the ``n_top`` symbols with
    the largest positive binned momentum during the *following* bin, equally
    weighted; compound the resulting per-bar returns.

    Args:
        lookback: momentum horizon in bars (volatility window is ``max(lookback, 2)``).
        n_top: maximum number of symbols held at once.
        resample_hours: rebalancing bin width in hours.
        data: price DataFrame with a DatetimeIndex. Defaults to the notebook
            global ``df`` so existing three-argument calls behave as before.
        symbols: columns of ``data`` to trade. Defaults to the global ``tickers``.
        stop_loss_pct: trailing-stop threshold. Defaults to the global ``stop_loss``.

    Returns:
        ``(final_return, max_drawdown)`` as floats; ``(0.0, 0.0)`` when no row has
        a usable momentum value for every symbol.
    """
    # Fall back to the notebook globals only when the caller did not pass data explicitly.
    if data is None:
        data = df
    if symbols is None:
        symbols = tickers
    if stop_loss_pct is None:
        stop_loss_pct = stop_loss

    # Grid-search results hand these back as numpy integers; normalise once.
    lookback, n_top, resample_hours = int(lookback), int(n_top), int(resample_hours)

    prices = data[list(symbols)]
    returns = prices.pct_change()
    volatility = returns.rolling(window=max(lookback, 2)).std()
    risk_scale = volatility + 1e-8  # epsilon avoids division by zero on flat returns

    momentum = prices.pct_change(periods=lookback) / risk_scale
    short_momentum = prices.pct_change(periods=max(lookback // 2, 1)) / risk_scale
    persistence_filter = (momentum > 0) & (short_momentum > 0)

    z_window = lookback * 2
    z_score = (prices - prices.rolling(z_window).mean()) / (prices.rolling(z_window).std() + 1e-8)
    overextended_filter = z_score.abs() < 2

    momentum = momentum.where(persistence_filter & overextended_filter)

    # NOTE(review): dropna() runs on the *filtered* frame, so valid_start is the
    # first bar where every symbol passes both filters, not just indicator
    # warm-up. If that never happens the backtest reports zero. Confirm intent.
    momentum_clean = momentum.dropna()
    if len(momentum_clean) == 0:
        return 0.0, 0.0
    valid_start = momentum_clean.index[0]

    # Lowercase "h" is the hour alias; uppercase "H" is deprecated since pandas 2.2.
    resampled_momentum = momentum.resample(f"{resample_hours}h").mean()

    # Mark the n_top strongest positive-momentum symbols in each bin.
    positive_momentum = resampled_momentum.where(resampled_momentum > 0)
    signals = pd.DataFrame(0, index=resampled_momentum.index, columns=resampled_momentum.columns)
    for timestamp, row in positive_momentum.dropna(how="all").iterrows():
        signals.loc[timestamp, row.nlargest(n_top).index] = 1

    # Shift by one bin so a bin's signal is only traded in the next bin, then
    # expand back onto the bar-level index.
    signals = signals.shift(1).reindex(data.index).ffill()
    signals = signals.loc[signals.index >= valid_start]

    returns = returns.loc[returns.index >= valid_start]

    # Equal-weight the held symbols; bars with no position earn zero.
    active_positions = signals.sum(axis=1)
    portfolio_returns = (returns * signals).sum(axis=1) / active_positions.replace(0, np.nan)
    portfolio_returns = portfolio_returns.fillna(0)

    equity_values = _simulate_equity(portfolio_returns.to_numpy(), stop_loss_pct)

    cumulative_returns = pd.Series(equity_values, index=portfolio_returns.index)
    final_return = cumulative_returns.iloc[-1] - 1

    running_maximum = cumulative_returns.cummax()
    drawdown = ((cumulative_returns - running_maximum) / running_maximum).min()

    return final_return, drawdown

In [None]:
def run_grid_search(lookbacks=range(1, 13), n_tops=range(1, 2),
                    resample_hours_options=range(1, 25), backtest_fn=None):
    """Evaluate every parameter combination and rank them by final return.

    Args:
        lookbacks: iterable of ``lookback`` values to try (default 1..12).
        n_tops: iterable of ``n_top`` values to try (default just 1).
        resample_hours_options: iterable of ``resample_hours`` values (default 1..24).
        backtest_fn: callable ``(lookback, n_top, resample_hours) ->
            (final_return, max_drawdown)``. Defaults to ``run_backtest``.

    Returns:
        DataFrame with columns ``lookback``, ``n_top``, ``resample_hours``,
        ``final_return``, ``max_drawdown``, sorted by ``final_return`` descending
        with a fresh 0..n-1 index. Rows with equal ``final_return`` keep grid
        order, so row 0 is reproducible. An empty grid yields an empty frame
        with the same columns.
    """
    from itertools import product

    if backtest_fn is None:
        backtest_fn = run_backtest

    result_columns = ['lookback', 'n_top', 'resample_hours', 'final_return', 'max_drawdown']
    results = []
    # product() iterates in the same order as the original nested loops:
    # lookback outermost, resample_hours innermost.
    for lookback, n_top, resample_hours in product(lookbacks, n_tops, resample_hours_options):
        final_return, drawdown = backtest_fn(lookback, n_top, resample_hours)
        results.append({
            'lookback': lookback,
            'n_top': n_top,
            'resample_hours': resample_hours,
            'final_return': final_return,
            'max_drawdown': drawdown
        })

    # Explicit columns keep sort_values working when the grid is empty.
    results_df = pd.DataFrame(results, columns=result_columns)
    # A stable sort makes tie-breaking deterministic (first grid point wins).
    results_df = results_df.sort_values(by="final_return", ascending=False, kind="mergesort")
    return results_df.reset_index(drop=True)

In [None]:
# Walk-forward evaluation: fit parameters on a training window, apply the best
# combination to the window that follows, then slide both windows forward by
# the test length and compound the out-of-sample returns.
train_period_days = 7
test_period_days = 7
start_timestamp = 0            # offset of the current training window, in days
total = 1                      # compounded equity multiple across all test windows
previous_iteration_return = 0  # last test-window return (kept for inspection)
final_return = 0
print("train: ", train_period_days, "\ttest: ", test_period_days)

for i in range(0, 50):
    # Window bounds are row positions; twenty_four_hours rows are treated as one day.
    train_start = twenty_four_hours * start_timestamp
    train_end = train_start + twenty_four_hours * train_period_days
    test_start = train_end
    test_end = test_start + twenty_four_hours * test_period_days

    df_train = df_all.iloc[train_start:train_end]
    df_test = df_all.iloc[test_start:test_end]

    # Stop once the history is exhausted instead of grid-searching empty slices.
    if df_test.empty:
        break

    # run_grid_search/run_backtest read the global `df`, so rebind it per phase.
    # NOTE: after this cell `df` is left pointing at the last test slice.
    df = df_train
    results_df = run_grid_search()

    # Row 0 is the best in-sample combination (results are sorted by final_return).
    lookback = results_df.loc[0, 'lookback']
    n_top = results_df.loc[0, 'n_top']
    resample_hours = results_df.loc[0, 'resample_hours']

    df = df_test
    final_return, drawdown = run_backtest(lookback, n_top, resample_hours)

    # Both branches of the former if/else on previous_iteration_return were
    # identical, so the out-of-sample return is always compounded.
    total *= (1 + final_return)

    print(f"Lookback: {lookback}, n_top: {n_top}, Resample: {resample_hours}H | "
          f"Return: {final_return*100:.2f}%, Drawdown: {drawdown*100:.2f}%, Total: {total*100:.2f}%")

    previous_iteration_return = final_return
    start_timestamp += test_period_days

train:  7 	test:  7
