In [31]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# import ta  
from ta.momentum import RSIIndicator
from ta.trend import MACD
from ta.momentum import StochasticOscillator
from ta.trend import CCIIndicator
from ta.momentum import ROCIndicator
from scipy.optimize import brute
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

In [11]:
symbols = ["BTCUSDT", "ETHUSDT", "XRPUSDT"]
data_path = "../data/"

### 1. RSI

In [33]:
lookbacks = [5, 7, 10, 14, 21]
percentile = 95

# Store all results
results_rsi = []

# Loop over symbols and lookback windows
for symbol in symbols:
    df = pd.read_csv(f"{data_path}{symbol}_3min_7days.csv", parse_dates=["timestamp"])
    df.set_index("timestamp", inplace=True)
    df = df.sort_index()

    for lookback in lookbacks:
        df_temp = df.copy()
        df_temp["rsi"] = RSIIndicator(close=df_temp["close"], window=lookback).rsi()
        df_temp["rsi_momentum"] = df_temp["rsi"].diff()

        threshold = np.percentile(df_temp["rsi_momentum"].abs().dropna(), percentile)

        df_temp["signal"] = 0
        df_temp.loc[df_temp["rsi_momentum"] > threshold, "signal"] = 1
        df_temp.loc[df_temp["rsi_momentum"] < -threshold, "signal"] = -1

        df_temp["return"] = df_temp["close"].pct_change().shift(-1)
        df_temp["strategy_return"] = df_temp["signal"] * df_temp["return"]

        hit_rate = (np.sign(df_temp["signal"]) == np.sign(df_temp["return"])).mean()
        sharpe = df_temp["strategy_return"].mean() / df_temp["strategy_return"].std()
        total_signals = df_temp["signal"].abs().sum()

        results_rsi.append({
            "symbol": symbol,
            "lookback": lookback,
            "threshold": round(threshold, 2),
            "hit_rate": round(hit_rate, 3),
            "sharpe_ratio": round(sharpe, 2),
            "signals": int(total_signals)
        })

# Present results as a DataFrame
results_rsi_df = pd.DataFrame(results_rsi)
results_rsi_df

Unnamed: 0,symbol,lookback,threshold,hit_rate,sharpe_ratio,signals
0,BTCUSDT,5,25.5,0.028,0.03,168
1,BTCUSDT,7,18.58,0.026,0.01,168
2,BTCUSDT,10,13.09,0.025,-0.01,168
3,BTCUSDT,14,9.5,0.025,-0.01,168
4,BTCUSDT,21,6.39,0.026,-0.0,167
5,ETHUSDT,5,24.4,0.026,-0.02,168
6,ETHUSDT,7,17.82,0.024,-0.04,168
7,ETHUSDT,10,12.43,0.025,-0.03,168
8,ETHUSDT,14,8.89,0.025,-0.04,168
9,ETHUSDT,21,5.97,0.024,-0.03,167


### 2. MACD

In [15]:
results_macd = []

for symbol in symbols:
    df = pd.read_csv(f"{data_path}{symbol}_3min_7days.csv", parse_dates=["timestamp"])
    df.set_index("timestamp", inplace=True)
    df = df.sort_index()

    # Calculate MACD and MACD Signal Line
    macd = MACD(close=df["close"], window_slow=26, window_fast=12, window_sign=9)
    df["macd"] = macd.macd()
    df["macd_signal"] = macd.macd_signal()
    df["macd_diff"] = df["macd"] - df["macd_signal"]

    # Generate signal based on MACD crossover
    df["signal"] = 0
    df.loc[(df["macd_diff"] > 0) & (df["macd_diff"].shift(1) <= 0), "signal"] = 1
    df.loc[(df["macd_diff"] < 0) & (df["macd_diff"].shift(1) >= 0), "signal"] = -1

    # Simulate returns
    df["return"] = df["close"].pct_change().shift(-1)
    df["strategy_return"] = df["signal"] * df["return"]

    # Evaluate performance
    hit_rate = (np.sign(df["signal"]) == np.sign(df["return"])).mean()
    sharpe = df["strategy_return"].mean() / df["strategy_return"].std()
    total_signals = df["signal"].abs().sum()

    results_macd.append({
        "symbol": symbol,
        "hit_rate": round(hit_rate, 3),
        "sharpe_ratio": round(sharpe, 2),
        "signals": int(total_signals)
    })

# Present results
results_macd_df = pd.DataFrame(results_macd)
results_macd_df

Unnamed: 0,symbol,hit_rate,sharpe_ratio,signals
0,BTCUSDT,0.044,0.04,256
1,ETHUSDT,0.037,-0.04,280
2,XRPUSDT,0.045,-0.02,273


### 3. Stochastic Oscillator

In [19]:
results_stoch = []

for symbol in symbols:
    df = pd.read_csv(f"{data_path}{symbol}_3min_7days.csv", parse_dates=["timestamp"])
    df.set_index("timestamp", inplace=True)
    df = df.sort_index()

    # Calculate Stochastic Oscillator
    stoch = StochasticOscillator(high=df["high"], low=df["low"], close=df["close"], window=14, smooth_window=3)
    df["stoch_k"] = stoch.stoch()
    df["stoch_d"] = stoch.stoch_signal()

    # Generate signal: Buy when %K crosses above %D from oversold, Sell when %K crosses below %D from overbought
    df["signal"] = 0
    df.loc[(df["stoch_k"] < 20) & (df["stoch_k"] > df["stoch_d"]) & (df["stoch_k"].shift(1) <= df["stoch_d"].shift(1)), "signal"] = 1
    df.loc[(df["stoch_k"] > 80) & (df["stoch_k"] < df["stoch_d"]) & (df["stoch_k"].shift(1) >= df["stoch_d"].shift(1)), "signal"] = -1

    # Simulate returns
    df["return"] = df["close"].pct_change().shift(-1)
    df["strategy_return"] = df["signal"] * df["return"]

    # Evaluate
    hit_rate = (np.sign(df["signal"]) == np.sign(df["return"])).mean()
    sharpe = df["strategy_return"].mean() / df["strategy_return"].std()
    total_signals = df["signal"].abs().sum()

    results_stoch.append({
        "symbol": symbol,
        "hit_rate": round(hit_rate, 3),
        "sharpe_ratio": round(sharpe, 2),
        "signals": int(total_signals)
    })

# Display results
results_stoch_df = pd.DataFrame(results_stoch)
results_stoch_df

Unnamed: 0,symbol,hit_rate,sharpe_ratio,signals
0,BTCUSDT,0.045,0.01,266
1,ETHUSDT,0.032,-0.02,206
2,XRPUSDT,0.046,-0.01,239


### 4.CCI

In [23]:
results_cci = []

for symbol in symbols:
    df = pd.read_csv(f"{data_path}{symbol}_3min_7days.csv", parse_dates=["timestamp"])
    df.set_index("timestamp", inplace=True)
    df = df.sort_index()

    # Calculate Commodity Channel Index (CCI)
    cci = CCIIndicator(high=df["high"], low=df["low"], close=df["close"], window=20)
    df["cci"] = cci.cci()

    # Generate signals based on classic CCI strategy
    # Buy when CCI crosses above -100; Sell when CCI crosses below +100
    df["signal"] = 0
    df.loc[(df["cci"] > -100) & (df["cci"].shift(1) <= -100), "signal"] = 1
    df.loc[(df["cci"] < 100) & (df["cci"].shift(1) >= 100), "signal"] = -1

    # Simulate returns
    df["return"] = df["close"].pct_change().shift(-1)
    df["strategy_return"] = df["signal"] * df["return"]

    # Evaluate
    hit_rate = (np.sign(df["signal"]) == np.sign(df["return"])).mean()
    sharpe = df["strategy_return"].mean() / df["strategy_return"].std()
    total_signals = df["signal"].abs().sum()

    results_cci.append({
        "symbol": symbol,
        "hit_rate": round(hit_rate, 3),
        "sharpe_ratio": round(sharpe, 2),
        "signals": int(total_signals)
    })

# Display results
results_cci_df = pd.DataFrame(results_cci)
results_cci_df

Unnamed: 0,symbol,hit_rate,sharpe_ratio,signals
0,BTCUSDT,0.047,-0.02,323
1,ETHUSDT,0.042,-0.03,307
2,XRPUSDT,0.057,0.01,317


### 5. Momentum

In [26]:
results_momentum = []

for symbol in symbols:
    df = pd.read_csv(f"{data_path}{symbol}_3min_7days.csv", parse_dates=["timestamp"])
    df.set_index("timestamp", inplace=True)
    df = df.sort_index()

    # Compute Momentum (Rate of Change)
    roc = ROCIndicator(close=df["close"], window=20)
    df["momentum"] = roc.roc()

    # Generate signal
    df["signal"] = 0
    df.loc[df["momentum"] > 0.2, "signal"] = 1
    df.loc[df["momentum"] < -0.2, "signal"] = -1

    # Simulate returns
    df["return"] = df["close"].pct_change().shift(-1)
    df["strategy_return"] = df["signal"] * df["return"]

    # Evaluate
    hit_rate = (np.sign(df["signal"]) == np.sign(df["return"])).mean()
    sharpe = df["strategy_return"].mean() / df["strategy_return"].std()
    total_signals = df["signal"].abs().sum()

    results_momentum.append({
        "symbol": symbol,
        "hit_rate": round(hit_rate, 3),
        "sharpe_ratio": round(sharpe, 2),
        "signals": int(total_signals)
    })

# Display results
results_momentum_df = pd.DataFrame(results_momentum)
results_momentum_df

Unnamed: 0,symbol,hit_rate,sharpe_ratio,signals
0,BTCUSDT,0.26,0.02,1739
1,ETHUSDT,0.369,-0.01,2561
2,XRPUSDT,0.352,-0.0,2432


In [None]:

class BaseSignalStrategy(bt.Strategy):
    params = (('record_signals', True),)

    def __init__(self):
        self.signal = 0  # 当前信号
        self.signals = []  # 记录历史信号

    def next(self):
        if self.record_signals:
            self.signals.append({
                'date': self.data.datetime.date(0),
                'signal': self.signal
            })

    def get_signals_df(self):
        return pd.DataFrame(self.signals).set_index('date')



In [None]:

# --------------------------
# 独立指标策略（按需扩展）
# --------------------------
class RSIStrategy(BaseSignalStrategy):
    params = (('rsi_period', 14), ('oversold', 30), ('overbought', 70))

    def __init__(self):
        super().__init__()
        self.rsi = bt.indicators.RSI(self.data.close, period=self.p.rsi_period)

    def next(self):
        if self.rsi < self.p.oversold:
            self.signal = 1  # 买入信号
        elif self.rsi > self.p.overbought:
            self.signal = -1  # 卖出信号
        else:
            self.signal = 0
        super().next()



In [None]:

class CCIStrategy(BaseSignalStrategy):
    params = (('cci_period', 20), ('oversold', -100), ('overbought', 100))

    def __init__(self):
        super().__init__()
        self.cci = bt.indicators.CCI(self.data, period=self.p.cci_period)

    def next(self):
        if self.cci < self.p.oversold:
            self.signal = 1
        elif self.cci > self.p.overbought:
            self.signal = -1
        else:
            self.signal = 0
        super().next()


In [None]:


class MACDStrategy(BaseSignalStrategy):
    params = (('fast', 12), ('slow', 26), ('signal_period', 9))

    def __init__(self):
        super().__init__()
        self.macd = bt.indicators.MACD(
            self.data.close,
            period_me1=self.p.fast,
            period_me2=self.p.slow,
            period_signal=self.p.signal_period
        )
        self.crossover = bt.indicators.CrossOver(self.macd.macd, self.macd.signal)

    def next(self):
        if self.crossover > 0:
            self.signal = 1  # 金叉买入
        elif self.crossover < 0:
            self.signal = -1  # 死叉卖出
        else:
            self.signal = 0
        super().next()



In [None]:
class StochasticStrategy(bt.Strategy):
    params = (
        ('k_period', 14),    # %K计算周期
        ('d_period', 3),     # %D计算周期（%K的SMA）
        ('overbought', 80),  # 超买阈值
        ('oversold', 20),    # 超卖阈值
        ('use_trend_filter', True),  # 是否使用趋势过滤
    )

    def __init__(self):
        # 计算指标
        self.stoch = bt.indicators.Stochastic(
            self.data,
            period=self.p.k_period,
            period_dfast=self.p.d_period,
            upperband=self.p.overbought,
            lowerband=self.p.oversold
        )
        self.crossover = bt.indicators.CrossOver(self.stoch.lines.percK, self.stoch.lines.percD)
        
        # 趋势过滤（可选）
        if self.p.use_trend_filter:
            self.sma200 = bt.indicators.SMA(self.data.close, period=200)

    def next(self):
        # 生成信号（1=买入，-1=卖出，0=无信号）
        signal = 0
        
        # 多头条件
        if (self.stoch.lines.percK[0] < self.p.oversold and 
            self.crossover > 0 and 
            (not self.p.use_trend_filter or self.data.close[0] > self.sma200[0])):
            signal = 1
        
        # 空头条件
        elif (self.stoch.lines.percK[0] > self.p.overbought and 
              self.crossover < 0 and 
              (not self.p.use_trend_filter or self.data.close[0] < self.sma200[0])):
            signal = -1
        
        # 记录信号（用于后续机器学习组合）
        self.signal = signal

In [None]:

class MOMStrategy(BaseSignalStrategy):
    params = (('mom_period', 10), ('threshold', 0))

    def __init__(self):
        super().__init__()
        self.mom = bt.indicators.Momentum(self.data.close, period=self.p.mom_period)

    def next(self):
        if self.mom > self.p.threshold:
            self.signal = 1  # 动量向上
        elif self.mom < -self.p.threshold:
            self.signal = -1  # 动量向下
        else:
            self.signal = 0
        super().next()