In [1]:
import pandas as pd
import numpy as np


In [20]:

# CSVファイルの読み込み
df = pd.read_csv('../data/bybit_btcusdt_20240901-20240930.csv',index_col=0)

# 'Timestamp'列を日時型に変換し、インデックスに設定
df['Timestamp'] = pd.to_datetime(df['Timestamp'])
df.set_index('Timestamp', inplace=True)

df_1m = df

# カラムを設定
# df_1m = df.rename(columns={'Open','High','Low','Close','Volume'})

In [21]:
# df_1m: 1分足データフレーム(OHLCV)
# カラム: ['Open','High','Low','Close','Volume']
# DateTimeIndexが正しく設定されていることが前提

# 1. 15分足データの生成
df_15m = df_1m.resample('15T').agg({
    'Open': 'first',
    'High': 'max',
    'Low': 'min',
    'Close': 'last',
    'Volume': 'sum'
}).dropna()

In [22]:
# VWAP計算用関数
def vwap(high, low, close, volume):
    tp = (high + low + close) / 3.0
    cumulative_vol = volume.cumsum()
    cumulative_tp_vol = (tp * volume).cumsum()
    return cumulative_tp_vol / cumulative_vol

In [23]:
# 2. VWAPと標準偏差バンドの計算（15分足）
df_15m['VWAP'] = vwap(df_15m['High'], df_15m['Low'], df_15m['Close'], df_15m['Volume'])
df_15m['TP'] = (df_15m['High'] + df_15m['Low'] + df_15m['Close']) / 3.0
df_15m['Diff'] = df_15m['TP'] - df_15m['VWAP']
window = 20
df_15m['Std'] = df_15m['Diff'].rolling(window).std()
sigma_factor = 1.5
df_15m['Upper_band'] = df_15m['VWAP'] + sigma_factor * df_15m['Std']
df_15m['Lower_band'] = df_15m['VWAP'] - sigma_factor * df_15m['Std']

In [24]:
# 上位足情報を1分足に結合
df_1m = df_1m.merge(df_15m[['VWAP','Upper_band','Lower_band']], left_index=True, right_index=True, how='left')
df_1m[['VWAP','Upper_band','Lower_band']] = df_1m[['VWAP','Upper_band','Lower_band']].ffill()

In [25]:
# 出来高急増判定
lookback_vol = 20
df_1m['Vol_ma'] = df_1m['Volume'].rolling(lookback_vol).mean()
volume_factor = 1.5
df_1m['Volume_spike'] = df_1m['Volume'] > df_1m['Vol_ma'] * volume_factor

In [26]:
# ローソク足パターン判定（エンゴルフィング例）
df_1m['Prev_Close'] = df_1m['Close'].shift(1)
df_1m['Prev_Open'] = df_1m['Open'].shift(1)

In [27]:
def is_bullish_engulfing(o, c, po, pc):
    # 各条件をビット演算子(&)で連結することで、Series同士の条件比較が可能となる
    return (c > o) & (c > pc) & (o < po)

def is_bearish_engulfing(o, c, po, pc):
    return (c < o) & (c < pc) & (o > po)


In [28]:
df_1m['Bullish_eng'] = is_bullish_engulfing(df_1m['Open'], df_1m['Close'], df_1m['Prev_Open'], df_1m['Prev_Close'])
df_1m['Bearish_eng'] = is_bearish_engulfing(df_1m['Open'], df_1m['Close'], df_1m['Prev_Open'], df_1m['Prev_Close'])

In [29]:
# シグナル生成
df_1m['Long_signal'] = (df_1m['Close'] < df_1m['Lower_band']) & df_1m['Volume_spike'] & df_1m['Bullish_eng']
df_1m['Short_signal'] = (df_1m['Close'] > df_1m['Upper_band']) & df_1m['Volume_spike'] & df_1m['Bearish_eng']

# エグジット条件
# ロング時: VWAP到達で利確
# ショート時: VWAP到達で利確
df_1m['Long_exit_cond'] = df_1m['Close'] >= df_1m['VWAP']
df_1m['Short_exit_cond'] = df_1m['Close'] <= df_1m['VWAP']

# 3. バックテストの実施
positions = []
current_position = None

for i, row in df_1m.iterrows():
    if current_position is None:
        # エントリー判定
        if row['Long_signal']:
            current_position = ('LONG', i, row['Close'])
        elif row['Short_signal']:
            current_position = ('SHORT', i, row['Close'])
    else:
        pos_type, entry_time, entry_price = current_position
        if pos_type == 'LONG':
            # ロングエグジット条件
            if row['Long_exit_cond']:
                positions.append((pos_type, entry_time, i, entry_price, row['Close']))
                current_position = None
        else:
            # ショートエグジット条件
            if row['Short_exit_cond']:
                positions.append((pos_type, entry_time, i, entry_price, row['Close']))
                current_position = None

# ポジション履歴DataFrame化
df_positions = pd.DataFrame(positions, columns=['Position','Entry_time','Exit_time','Entry_price','Exit_price'])

# 4. パフォーマンス評価
df_positions['PnL'] = np.where(df_positions['Position']=='LONG', 
                               df_positions['Exit_price'] - df_positions['Entry_price'], 
                               df_positions['Entry_price'] - df_positions['Exit_price'])

winning_trades = df_positions[df_positions['PnL'] > 0]
losing_trades = df_positions[df_positions['PnL'] <= 0]

win_rate = len(winning_trades) / len(df_positions) if len(df_positions) > 0 else 0
avg_win = winning_trades['PnL'].mean() if len(winning_trades) > 0 else 0
avg_loss = losing_trades['PnL'].mean() if len(losing_trades) > 0 else 0
total_pnl = df_positions['PnL'].sum()

print("Number of Trades:", len(df_positions))
print("Win Rate:", win_rate)
print("Avg Win:", avg_win)
print("Avg Loss:", avg_loss)
print("Total PnL:", total_pnl)

# 必要に応じて詳細出力
print(df_positions.head())

Number of Trades: 14
Win Rate: 0.8571428571428571
Avg Win: 373.225000000001
Avg Loss: -599.5999999999985
Total PnL: 3279.5000000000146
  Position          Entry_time           Exit_time  Entry_price  Exit_price  \
0     LONG 2024-08-31 20:07:00 2024-08-31 20:53:00      58787.3     58945.5   
1     LONG 2024-09-01 01:44:00 2024-09-01 08:00:00      58858.5     58482.4   
2     LONG 2024-09-01 09:05:00 2024-09-01 15:47:00      58154.6     58237.9   
3    SHORT 2024-09-01 18:28:00 2024-09-01 21:20:00      58545.1     58201.6   
4     LONG 2024-09-01 22:23:00 2024-09-02 08:20:00      57724.1     58063.7   

     PnL  
0  158.2  
1 -376.1  
2   83.3  
3  343.5  
4  339.6  
