In [None]:
import json
import numpy as np
import pandas as pd
import vectorbt as vbt
import optuna
from numba import njit
from sklearn.model_selection import TimeSeriesSplit
import warnings

# Suppress every warning for the whole process (no category or module filter
# is given).  NOTE(review): this also hides deprecation and numerical
# warnings raised by the libraries imported above -- confirm that is intended.
warnings.filterwarnings("ignore")


class Strategy:
    """Plain container describing one strategy to optimise.

    Attributes are stored exactly as passed in:

    * ``name`` -- label used as the key for this strategy's results.
    * ``param_space`` -- mapping of parameter name to a ``(kind, low, high)``
      style sequence.
    * ``signal_generator`` -- callable invoked as
      ``signal_generator(df, params)``.
    """

    def __init__(self, name, param_space, signal_generator):
        # No validation or copying: the object only holds references.
        (self.name, self.param_space, self.signal_generator) = (
            name,
            param_space,
            signal_generator,
        )

@njit
def compute_atr_numba(high, low, close, length):
    """Average True Range with Wilder smoothing.

    Parameters
    ----------
    high, low, close : 1-D float arrays of equal length
        Bar highs, lows and closes.
    length : int
        Smoothing window; must be at least 1.

    Returns
    -------
    numpy.ndarray
        Array of ``len(close)`` values.  Entries before index ``length - 1``
        are NaN; the value at ``length - 1`` is the simple mean of the first
        ``length`` true ranges and later values follow
        ``atr[k] = (atr[k-1] * (length - 1) + tr[k]) / length``.  The result
        is all-NaN when there are fewer than ``length`` bars.

    Raises
    ------
    ValueError
        If ``length`` is smaller than 1.
    """
    # A non-positive window would divide by zero (length == 0) or silently
    # wrap negative indices (length < 0), so reject it up front.
    if length < 1:
        raise ValueError("length must be >= 1")

    n = len(close)
    atr = np.full(n, np.nan)
    # Numba does not bounds-check array indexing by default, so touching
    # element 0 of an empty array below would be an out-of-bounds access.
    if n == 0:
        return atr

    tr = np.empty(n)
    # The first bar has no previous close; its true range is just high - low.
    tr[0] = high[0] - low[0]
    for i in range(1, n):
        prev_close = close[i - 1]
        hl = high[i] - low[i]
        hc = abs(high[i] - prev_close)
        lc = abs(low[i] - prev_close)
        tr[i] = max(hl, hc, lc)

    if n >= length:
        # Seed the recursion with a simple average of the first window.
        seed = 0.0
        for k in range(length):
            seed += tr[k]
        atr[length - 1] = seed / length
        for k in range(length, n):
            atr[k] = (atr[k - 1] * (length - 1) + tr[k]) / length
    return atr


def build_backtest(df, signals, params=None):
    """Build a vectorbt long/short portfolio with ATR-scaled exit masks.

    Parameters
    ----------
    df : pandas.DataFrame
        Bars with ``High``, ``Low`` and ``Close`` columns.
    signals : pandas.DataFrame
        ``long_signal`` and ``short_signal`` columns with one row per row of
        ``df``; values are matched to ``df`` by position, not by label.
    params : dict, optional
        Accepted so callers can forward a strategy's parameter dict.  Only
        the optional keys ``tp_mult`` (default 2.5), ``sl_mult`` (default
        1.0) and ``stop_atr_len`` (default 14) are read; every other key is
        ignored.

    Returns
    -------
    The object returned by ``vbt.Portfolio.from_signals``.

    Notes
    -----
    The exit distances are multiples of the ATR.  Scaling them by the close
    itself puts the long stop at ``close - 1.0 * close == 0`` and the short
    target below zero, so those exits can never trigger.

    NOTE(review): the exit masks compare each bar's own High/Low with levels
    derived from that same bar's Close, not from the entry price.  If true
    per-trade stops are wanted, vectorbt's ``sl_stop`` / ``tp_stop`` options
    are probably the better tool -- confirm the intended semantics.
    """
    params = params or {}
    tp_mult = params.get("tp_mult", 2.5)
    sl_mult = params.get("sl_mult", 1.0)
    stop_atr_len = int(params.get("stop_atr_len", 14))

    high = np.ascontiguousarray(df["High"].to_numpy(dtype=np.float64))
    low = np.ascontiguousarray(df["Low"].to_numpy(dtype=np.float64))
    close = np.ascontiguousarray(df["Close"].to_numpy(dtype=np.float64))
    # Skip the jitted helper for an empty frame; there is nothing to smooth.
    if len(close) > 0:
        atr = compute_atr_numba(high, low, close, stop_atr_len)
    else:
        atr = np.empty(0)

    # Levels are NaN while the ATR is still warming up; comparisons against
    # NaN are False, so no exit is emitted during that period.
    long_tp = close + tp_mult * atr
    long_sl = close - sl_mult * atr
    short_tp = close - tp_mult * atr
    short_sl = close + sl_mult * atr

    def _next_bar(mask):
        # Delay by one bar so a condition observed on bar t acts on bar t+1.
        return mask.shift(1).fillna(False).astype(bool)

    entries = _next_bar(pd.Series(signals["long_signal"].values, index=df.index))
    exits = _next_bar((df["Low"] <= long_sl) | (df["High"] >= long_tp))
    short_entries = _next_bar(
        pd.Series(signals["short_signal"].values, index=df.index)
    )
    short_exits = _next_bar((df["High"] >= short_sl) | (df["Low"] <= short_tp))

    # NOTE(review): `fees` is doubled here; if vectorbt already charges fees
    # on every order this may double-count a round trip -- confirm.
    pf = vbt.Portfolio.from_signals(
        close=df["Close"],
        entries=entries,
        exits=exits,
        short_entries=short_entries,
        short_exits=short_exits,
        init_cash=10_000,
        fees=0.0005 * 2,
        slippage=0.0002,
        size=100,
        direction="both",
    )
    return pf


def define_objective(df, strategy):
    """Create the Optuna objective for one strategy on one data set.

    Parameters
    ----------
    df : pandas.DataFrame
        Bars to evaluate on; split with ``TimeSeriesSplit(n_splits=5)``.
    strategy : Strategy
        Supplies ``param_space`` (name -> ``(kind, low, high)``) and
        ``signal_generator(df, params)``.

    Returns
    -------
    callable
        ``objective(trial)`` returning the mean per-fold score, or
        ``-np.inf`` as soon as any fold produces no trades.

    Notes
    -----
    Only the *test* indices of each split are used: signals are generated
    and backtested on every test fold independently.  The objective reports
    no intermediate values to the trial.
    """

    def objective(trial):
        # Any kind other than "int" is sampled as a float.
        params = {}
        for name, spec in strategy.param_space.items():
            if spec[0] == "int":
                params[name] = trial.suggest_int(name, spec[1], spec[2])
            else:
                params[name] = trial.suggest_float(name, spec[1], spec[2])

        tscv = TimeSeriesSplit(n_splits=5)
        scores = []
        for _, test_idx in tscv.split(df):
            df_test = df.iloc[test_idx]
            signals = strategy.signal_generator(df_test, params)
            pf = build_backtest(df_test, signals, params)

            n_trades = pf.trades.count()
            if n_trades == 0:
                # A fold without trades disqualifies the whole trial.
                return -np.inf

            wins = pf.trades.winning.count()
            # n_trades is non-zero here, so the division is always defined.
            win_rate = wins / n_trades
            if pf.sharpe_ratio() > 0:
                scores.append(((1 + win_rate) ** 2) * wins)
            else:
                # Non-positive (or NaN) Sharpe: penalise by the win count.
                scores.append(-wins)
        return np.mean(scores)

    return objective


def run_optimization(tokens, timeframes, strategies, n_trials=400, checkpoint_path=None):
    """Optimise every strategy for every token and timeframe.

    Parameters
    ----------
    tokens : iterable of str
        For each token, ``"<token>.csv"`` is read from the working directory
        with its first column as a datetime index.
    timeframes : iterable of str
        Pandas resample rules; the raw bars are aggregated to each of them.
    strategies : iterable of Strategy
        Strategies to optimise.
    n_trials : int, default 400
        Trials per (token, timeframe, strategy) study.
    checkpoint_path : str or path-like, optional
        When given, the results collected so far are written to this file as
        JSON after every finished study, so a crash late in a long sweep does
        not lose earlier studies.  Nothing is written when ``None`` (default).

    Returns
    -------
    dict
        ``results[token][timeframe][strategy.name]`` holds ``best_params``
        and ``best_metric`` of the corresponding study.
    """
    ohlcv_rules = {
        "Open": "first",
        "High": "max",
        "Low": "min",
        "Close": "last",
        "Volume": "sum",
    }

    results = {}
    for token in tokens:
        df_raw = pd.read_csv(f"{token}.csv", index_col=0, parse_dates=True)
        # Normalise the index to timezone-aware UTC timestamps.
        df_raw.index = pd.to_datetime(df_raw.index, utc=True)

        results[token] = {}
        for tf in timeframes:
            # Drop buckets with missing values (e.g. gaps in the raw data).
            df = df_raw.resample(tf).agg(ohlcv_rules).dropna()

            results[token][tf] = {}
            for strat in strategies:
                # NOTE(review): pruners act on intermediate values reported
                # through trial.report(); the objective built by
                # define_objective does not appear to report any, so this
                # pruner is likely inert -- confirm.
                study = optuna.create_study(
                    direction="maximize",
                    sampler=optuna.samplers.TPESampler(),
                    pruner=optuna.pruners.HyperbandPruner(
                        min_resource=2, max_resource=6, reduction_factor=3
                    ),
                )
                study.optimize(define_objective(df, strat), n_trials=n_trials, n_jobs=5)
                results[token][tf][strat.name] = {
                    "best_params": study.best_params,
                    "best_metric": study.best_value,
                }
                print(
                    f"{token}-{tf}-{strat.name}: {study.best_params}, SCORE={study.best_value:.2f}"
                )

                if checkpoint_path is not None:
                    # Rewrite the whole file each time so it is always a
                    # complete, loadable snapshot of the sweep so far.
                    with open(checkpoint_path, "w") as f:
                        json.dump(results, f, indent=4)

    return results

In [None]:
from data_preprocessing.strategies import (
    _compute_williams_r_signals,
    _enhanced_lh_tick_jit,
    adaptive_t3_signals,
    bwab_signals,
    compute_two_pole_oscillator,
    ema_pivot_signals,
    generate_market_bottom_signals,
    harsi_strategy,
    market_sentiment_signals,
    next_pivot_signals,
    triple_ma_signals,
)


def bwab_signal_wrapper(df, params):
    """Run ``bwab_signals`` on ``df`` and return 0/1 long/short columns.

    ``df`` must provide ``High``, ``Low`` and ``Close``; ``params`` must hold
    ``ma_len``, ``range_len``, ``atr_len``, ``atr_mult`` and
    ``entry_buildup``.  The input frame is not modified.
    """
    prices = {
        "high": df["High"].values,
        "low": df["Low"].values,
        "close": df["Close"].values,
    }
    tuning = {
        key: params[key]
        for key in ("ma_len", "range_len", "atr_len", "atr_mult", "entry_buildup")
    }
    long_mask, short_mask = bwab_signals(**prices, **tuning)

    # assign() works on a copy, so the caller's frame stays untouched.
    flagged = df.assign(
        long_signal=long_mask.astype(int),
        short_signal=short_mask.astype(int),
    )
    return flagged[["long_signal", "short_signal"]]


def triple_ma_signal_wrapper(df, params):
    """Run ``triple_ma_signals`` on the closes and return 0/1 signal columns.

    ``params`` must hold ``s1``, ``s2`` and ``s3``.  The input frame is not
    modified.
    """
    # NOTE(review): the result is unpacked as (short, long), the reverse of
    # the other array-based wrappers -- confirm against triple_ma_signals.
    short_mask, long_mask = triple_ma_signals(
        close=df["Close"].values,
        s1=params["s1"],
        s2=params["s2"],
        s3=params["s3"],
    )
    flagged = df.assign(
        long_signal=long_mask.astype(int),
        short_signal=short_mask.astype(int),
    )
    return flagged[["long_signal", "short_signal"]]


def wvf_bottom_signal_wrapper(df, params):
    """Turn ``generate_market_bottom_signals`` output into signal columns.

    The ``bottom_signal`` column of the helper's result becomes
    ``long_signal`` (cast to int); ``short_signal`` is always 0.  ``params``
    must hold ``length``, ``bbl`` and ``mult``.
    """
    bottoms = generate_market_bottom_signals(
        original_df=df,
        length=params["length"],
        bbl=params["bbl"],
        mult=params["mult"],
    )
    # This strategy is long-only, hence the constant zero short column.
    flagged = bottoms.assign(
        long_signal=bottoms["bottom_signal"].astype(int),
        short_signal=0,
    )
    return flagged[["long_signal", "short_signal"]]


def williams_r_signal_wrapper(df, params):
    """Run ``_compute_williams_r_signals`` and return persisted 0/1 signals.

    A helper signal of ``1`` marks a long bar and ``-1`` a short bar.  Each
    raised flag is then carried forward for up to ``params["persist"]``
    additional bars.  ``params`` must also hold ``period``.  The input frame
    is not modified.
    """
    ohlc = {
        name: df[column].to_numpy(dtype=np.float64)
        for name, column in (("high", "High"), ("low", "Low"), ("close", "Close"))
    }
    direction, wr_values = _compute_williams_r_signals(
        period=params["period"], **ohlc
    )

    # The indicator values are attached to the working copy but are not part
    # of the returned columns.
    frame = df.assign(
        williams_r=wr_values,
        long_signal=(direction == 1).astype(int),
        short_signal=(direction == -1).astype(int),
    )
    for column in ("long_signal", "short_signal"):
        # Blank out the zeros so a limited forward-fill can extend each flag,
        # then turn the remaining gaps back into zeros.
        frame[column] = (
            frame[column]
            .replace(0, np.nan)
            .ffill(limit=params["persist"])
            .fillna(0)
            .astype(int)
        )
    return frame[["long_signal", "short_signal"]]


def lht_signal_wrapper(df, params):
    """Run ``_enhanced_lh_tick_jit`` and return persisted 0/1 signals.

    Only the first two of the helper's six return values (long and short
    flags) are used.  Each raised flag is carried forward for up to
    ``params["persist"]`` additional bars.  ``params`` must also hold
    ``lookback``, ``ma_period``, ``atr_period`` and ``atr_mult``.  The input
    frame is not modified.
    """
    if "Volume" in df.columns:
        volumes = df["Volume"].to_numpy()
    else:
        # No volume data: hand the helper a zero-length array instead.
        volumes = np.empty(0)

    outputs = _enhanced_lh_tick_jit(
        highs=df["High"].to_numpy(),
        lows=df["Low"].to_numpy(),
        closes=df["Close"].to_numpy(),
        volumes=volumes,
        lookback=params["lookback"],
        ma_period=params["ma_period"],
        atr_period=params["atr_period"],
        atr_mult=params["atr_mult"],
    )
    long_mask, short_mask, _, _, _, _ = outputs

    frame = df.assign(
        long_signal=long_mask.astype(int),
        short_signal=short_mask.astype(int),
    )
    for column in ("long_signal", "short_signal"):
        # Blank out the zeros so a limited forward-fill can extend each flag,
        # then turn the remaining gaps back into zeros.
        frame[column] = (
            frame[column]
            .replace(0, np.nan)
            .ffill(limit=params["persist"])
            .fillna(0)
            .astype(int)
        )
    return frame[["long_signal", "short_signal"]]


def two_pole_signal_wrapper(df, params):
    """Map ``compute_two_pole_oscillator`` output to 0/1 signal columns.

    ``two_pole_signal == 1`` marks a long bar and ``== -1`` a short bar.
    ``params`` must hold ``filt_len``, ``sma_len``, ``threshold`` and
    ``persist``.
    """
    settings = {
        key: params[key] for key in ("filt_len", "sma_len", "threshold", "persist")
    }
    oscillator = compute_two_pole_oscillator(df=df, **settings)
    direction = oscillator["two_pole_signal"]
    # assign() returns a new frame, leaving the helper's result untouched.
    flagged = oscillator.assign(
        long_signal=direction.eq(1).astype(int),
        short_signal=direction.eq(-1).astype(int),
    )
    return flagged[["long_signal", "short_signal"]]


def adaptive_t3_signal_wrapper(df, params):
    """Map ``adaptive_t3_signals`` output to 0/1 signal columns.

    ``t3_signal == 1`` marks a long bar and ``== -1`` a short bar.
    ``params`` must hold ``rsi_len``, ``min_len``, ``max_len``, ``v``,
    ``volat``, ``persist`` and ``lag``.
    """
    required = ("rsi_len", "min_len", "max_len", "v", "volat", "persist", "lag")
    t3_frame = adaptive_t3_signals(df=df, **{key: params[key] for key in required})
    direction = t3_frame["t3_signal"]
    # assign() returns a new frame, leaving the helper's result untouched.
    flagged = t3_frame.assign(
        long_signal=direction.eq(1).astype(int),
        short_signal=direction.eq(-1).astype(int),
    )
    return flagged[["long_signal", "short_signal"]]


def next_pivot_signal_wrapper(df, params):
    """Map ``next_pivot_signals`` output to 0/1 signal columns.

    ``next_pivot_signal == 1`` marks a long bar and ``== -1`` a short bar.
    ``params`` must hold ``hist_len``, ``fore_len``, ``lookback``,
    ``method`` and ``persist``.
    """
    required = ("hist_len", "fore_len", "lookback", "method", "persist")
    pivots = next_pivot_signals(df=df, **{key: params[key] for key in required})
    direction = pivots["next_pivot_signal"]
    # assign() returns a new frame, leaving the helper's result untouched.
    flagged = pivots.assign(
        long_signal=direction.eq(1).astype(int),
        short_signal=direction.eq(-1).astype(int),
    )
    return flagged[["long_signal", "short_signal"]]


def market_sentiment_signal_wrapper(df, params):
    """Map ``market_sentiment_signals`` output to 0/1 signal columns.

    ``market_sentiment_signal == 1`` marks a long bar and ``== -1`` a short
    bar.  ``params`` must hold every key listed in ``required`` below.
    """
    required = (
        "rsi_len",
        "stoch_k_len",
        "stoch_k_smooth",
        "cci_len",
        "bbp_len",
        "ma_len",
        "st_len",
        "st_mult",
        "reg_len",
        "ms_len",
        "persist",
    )
    sentiment = market_sentiment_signals(
        df=df, **{key: params[key] for key in required}
    )
    direction = sentiment["market_sentiment_signal"]
    # assign() returns a new frame, leaving the helper's result untouched.
    flagged = sentiment.assign(
        long_signal=direction.eq(1).astype(int),
        short_signal=direction.eq(-1).astype(int),
    )
    return flagged[["long_signal", "short_signal"]]

def ema_pivot_signal_wrapper(df, params):
    """Map ``ema_pivot_signals`` output to 0/1 signal columns.

    ``EMASignal == 2`` marks a long bar and ``EMASignal == 1`` a short bar.
    ``params`` must hold ``ema_length``, ``backcandles`` and
    ``pivot_window``.
    """
    required = ("ema_length", "backcandles", "pivot_window")
    pivots = ema_pivot_signals(df=df, **{key: params[key] for key in required})
    code = pivots["EMASignal"]
    # assign() returns a new frame, leaving the helper's result untouched.
    flagged = pivots.assign(
        long_signal=code.eq(2).astype(int),
        short_signal=code.eq(1).astype(int),
    )
    return flagged[["long_signal", "short_signal"]]



def harsi_signal_wrapper(df, params):
    """Map ``harsi_strategy`` output to 0/1 signal columns.

    ``harsi_signal == 1`` marks a long bar and ``== -1`` a short bar.
    ``params`` must hold every key listed in ``required`` below.
    """
    required = (
        "use_ema",
        "use_vol",
        "use_adx",
        "use_vortex",
        "ost",
        "rsi_period",
        "fastk_period",
        "slowk_period",
        "slowk_matype",
        "slowd_period",
        "slowd_matype",
        "ema_period",
        "volume_sma_slow",
        "volume_sma_fast",
        "adx_p_period",
        "adx_m_period",
    )
    harsi_frame = harsi_strategy(df=df, **{key: params[key] for key in required})
    direction = harsi_frame["harsi_signal"]
    # assign() returns a new frame, leaving the helper's result untouched.
    flagged = harsi_frame.assign(
        long_signal=direction.eq(1).astype(int),
        short_signal=direction.eq(-1).astype(int),
    )
    return flagged[["long_signal", "short_signal"]]


In [None]:
# Each search-space entry is a (kind, low, high) triple; define_objective
# samples "int" entries with suggest_int and everything else with
# suggest_float.
bwab_strategy = Strategy(
    name="mark_bwab_signals",
    param_space=dict(
        ma_len=("int", 5, 50),
        range_len=("int", 5, 30),
        atr_len=("int", 5, 30),
        atr_mult=("float", 0.5, 3.0),
        entry_buildup=("int", 3, 10),
    ),
    signal_generator=bwab_signal_wrapper,
)

# NOTE(review): the three ranges overlap, so a trial is not guaranteed to
# sample s1 < s2 < s3 -- confirm triple_ma_signals tolerates that.
triple_ma_strategy = Strategy(
    name="triple_ma",
    param_space=dict(
        s1=("int", 3, 15),
        s2=("int", 10, 30),
        s3=("int", 20, 50),
    ),
    signal_generator=triple_ma_signal_wrapper,
)

# Search space for the wrapper around generate_market_bottom_signals, which
# only ever emits long signals.
wvf_bottom_strategy = Strategy(
    name="generate_market_bottom_signals",
    param_space=dict(
        length=("int", 3, 30),
        bbl=("int", 3, 30),
        mult=("float", 1.0, 3.0),
    ),
    signal_generator=wvf_bottom_signal_wrapper,
)

# `persist` is consumed by the wrapper itself (forward-fill limit), not by
# the indicator function.
williams_r_strategy = Strategy(
    name="williams_r",
    param_space=dict(
        period=("int", 10, 30),
        persist=("int", 2, 10),
    ),
    signal_generator=williams_r_signal_wrapper,
)

# `persist` is consumed by the wrapper itself (forward-fill limit); the
# remaining entries are forwarded to _enhanced_lh_tick_jit.
lht_strategy = Strategy(
    name="lowest_highest_tick",
    param_space=dict(
        lookback=("int", 2, 50),
        ma_period=("int", 2, 50),
        atr_period=("int", 5, 30),
        atr_mult=("float", 0.2, 2.0),
        persist=("int", 2, 10),
    ),
    signal_generator=lht_signal_wrapper,
)

# All four entries are forwarded to compute_two_pole_oscillator.
two_pole_strategy = Strategy(
    name="two_pole_oscillator",
    param_space=dict(
        filt_len=("int", 5, 30),
        sma_len=("int", 10, 50),
        threshold=("float", 0.05, 0.5),
        persist=("int", 2, 10),
    ),
    signal_generator=two_pole_signal_wrapper,
)

# min_len (3..10) and max_len (12..60) do not overlap, so min_len < max_len
# holds for every sampled combination.
adaptive_t3_strategy = Strategy(
    name="adaptive_t3",
    param_space=dict(
        rsi_len=("int", 10, 40),
        min_len=("int", 3, 10),
        max_len=("int", 12, 60),
        v=("float", 0.3, 0.8),
        volat=("int", 20, 60),
        persist=("int", 2, 10),
        lag=("int", 1, 5),
    ),
    signal_generator=adaptive_t3_signal_wrapper,
)

# NOTE(review): `method` is sampled as an integer code 0..2 -- confirm that
# next_pivot_signals expects an integer here.
next_pivot_strategy = Strategy(
    name="next_pivot",
    param_space=dict(
        hist_len=("int", 5, 20),
        fore_len=("int", 10, 30),
        lookback=("int", 50, 200),
        method=("int", 0, 2),
        persist=("int", 2, 10),
    ),
    signal_generator=next_pivot_signal_wrapper,
)

# NOTE(review): `st_mult` is sampled as a whole number (1..5) -- confirm a
# fractional multiplier is not wanted.
market_sentiment_strategy = Strategy(
    name="market_sentiment",
    param_space=dict(
        rsi_len=("int", 5, 30),
        stoch_k_len=("int", 5, 30),
        stoch_k_smooth=("int", 1, 5),
        cci_len=("int", 5, 30),
        bbp_len=("int", 5, 30),
        ma_len=("int", 5, 30),
        st_len=("int", 5, 20),
        st_mult=("int", 1, 5),
        reg_len=("int", 5, 50),
        ms_len=("int", 3, 10),
        persist=("int", 2, 10),
    ),
    signal_generator=market_sentiment_signal_wrapper,
)

# All three entries are forwarded to ema_pivot_signals.
ema_pivot_strategy = Strategy(
    name="ema_pivot",
    param_space=dict(
        ema_length=("int", 10, 60),
        backcandles=("int", 2, 20),
        pivot_window=("int", 2, 20),
    ),
    signal_generator=ema_pivot_signal_wrapper,
)

# The use_* entries are sampled as 0/1 integers.
# NOTE(review): volume_sma_fast (3..10) and volume_sma_slow (10..30) can both
# be sampled as 10 -- confirm harsi_strategy tolerates fast == slow.
harsi_strategy_obj = Strategy(
    name="harsi",
    param_space=dict(
        use_ema=("int", 0, 1),
        use_vol=("int", 0, 1),
        use_adx=("int", 0, 1),
        use_vortex=("int", 0, 1),
        ost=("int", 70, 90),
        rsi_period=("int", 10, 30),
        fastk_period=("int", 10, 30),
        slowk_period=("int", 3, 10),
        slowk_matype=("int", 0, 3),
        slowd_period=("int", 3, 10),
        slowd_matype=("int", 0, 3),
        ema_period=("int", 10, 50),
        volume_sma_slow=("int", 10, 30),
        volume_sma_fast=("int", 3, 10),
        adx_p_period=("int", 10, 30),
        adx_m_period=("int", 10, 30),
    ),
    signal_generator=harsi_signal_wrapper,
)



In [None]:
# Strategies to optimise, in the order their studies are run.
strategies = [
    harsi_strategy_obj, ema_pivot_strategy, market_sentiment_strategy,
    next_pivot_strategy, adaptive_t3_strategy, two_pole_strategy,
    lht_strategy, williams_r_strategy, wvf_bottom_strategy,
    triple_ma_strategy, bwab_strategy,
]

In [None]:
# Full sweep: one study per (token, timeframe, strategy) with 250 trials
# each.  Every token needs a "<token>.csv" file in the working directory.
results = run_optimization(
    tokens=("eth", "sol", "doge", "ada", "tao", "xrp", "trx"),
    timeframes=("30min", "1h", "2h", "3h", "4h", "5h"),
    strategies=strategies,
    n_trials=250,
)
results