In [None]:
import pandas as pd
import numpy as np
import ccxt
import pandas_ta as ta
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
import os
from IPython.display import display
import time

print(" 1. Libraries imported successfully.")

print("\n🔄 2. Preparing data...")
data_dir = './data'
if not os.path.exists(data_dir):
    os.makedirs(data_dir)
h4_file = os.path.join(data_dir, 'btc_h4_data.csv')
h1_file = os.path.join(data_dir, 'btc_h1_data.csv')
m15_file = os.path.join(data_dir, 'btc_m15_data.csv')

symbol_input = 'BTC/USD'

if os.path.exists(h4_file) and os.path.exists(h1_file) and os.path.exists(m15_file):
    print("   Loading data from local CSV files...")
    df_h4 = pd.read_csv(h4_file, index_col='timestamp', parse_dates=True)
    df_h1 = pd.read_csv(h1_file, index_col='timestamp', parse_dates=True)
    df_m15 = pd.read_csv(m15_file, index_col=0, parse_dates=['timestamp'])
    print("   ✅ Data loaded successfully from cache.")

else:
    print("   Local data not found. Fetching from exchange...")
    df_h4 = pd.DataFrame(); df_h1 = pd.DataFrame(); df_m15 = pd.DataFrame()
    try:
        exchange = ccxt.coinbaseadvanced()
        exchange.proxies = {'http': None, 'https': None}
        start_date = "2022-01-01"; ccxt_symbol = symbol_input.replace('/', '-')
        print("   Fetching H1 data..."); since = exchange.parse8601(f"{start_date} 00:00:00Z"); all_ohlcv_h1 = []
        while True:
            ohlcv_chunk = exchange.fetch_ohlcv(ccxt_symbol, '1h', since, limit=300);
            if not ohlcv_chunk: break
            all_ohlcv_h1.extend(ohlcv_chunk); since = ohlcv_chunk[-1][0] + 1; time.sleep(exchange.rateLimit / 1000)
        df_h1 = pd.DataFrame(all_ohlcv_h1, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']); df_h1['timestamp'] = pd.to_datetime(df_h1['timestamp'], unit='ms'); df_h1.set_index('timestamp', inplace=True)
        print("   Resampling H1 data to create H4 data (Layar 1)...")
        agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
        df_h4 = df_h1.resample('4H', origin='start_day').agg(agg_dict).dropna()
        print("   Fetching M15 data (Layar 3)..."); since = exchange.parse8601(f"{start_date} 00:00:00Z"); all_ohlcv_m15 = []
        while True:
            ohlcv_chunk = exchange.fetch_ohlcv(ccxt_symbol, '15m', since, limit=300)
            if not ohlcv_chunk: break
            all_ohlcv_m15.extend(ohlcv_chunk); since = ohlcv_chunk[-1][0] + 1; time.sleep(exchange.rateLimit / 1000)
        df_m15 = pd.DataFrame(all_ohlcv_m15, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']); df_m15['timestamp'] = pd.to_datetime(df_m15['timestamp'], unit='ms')
        print("\n    Successfully fetched data.")
        print("   Saving data to CSV files for future runs...")
        df_h4.to_csv(h4_file); df_h1.to_csv(h1_file); df_m15.to_csv(m15_file)
        print("   Data caching complete.")
    except Exception as e:
        print(f"    ERROR during data fetching: {type(e).__name__} - {e}")

if not df_h4.empty and not df_h1.empty and not df_m15.empty:
    print("\n🛠️  3. Calculating indicators for all timeframes...")
    df_h4['EMA_55'] = ta.ema(df_h4['close'], length=55)
    df_h4['EMA_200'] = ta.ema(df_h4['close'], length=200)
    
    df_h1['RSI_14'] = ta.rsi(df_h1['close'], length=14); df_h1['ATR_14'] = ta.atr(df_h1['high'], df_h1['low'], df_h1['close'], length=14)
    peak_indices, _ = find_peaks(df_h1['high'], distance=5, prominence=df_h1['ATR_14'].mean()); trough_indices, _ = find_peaks(-df_h1['low'], distance=5, prominence=df_h1['ATR_14'].mean())
    df_h1['swing_high'] = np.nan; df_h1.loc[df_h1.index[peak_indices], 'swing_high'] = df_h1['high'].iloc[peak_indices]
    df_h1['swing_low'] = np.nan; df_h1.loc[df_h1.index[trough_indices], 'swing_low'] = df_h1['low'].iloc[trough_indices]
    
    df_m15['EMA_9'] = ta.ema(df_m15['close'], length=9)
    df_m15['EMA_21'] = ta.ema(df_m15['close'], length=21)
    df_m15['RSI_14'] = ta.rsi(df_m15['close'], length=14)
    df_m15['ATR_14'] = ta.atr(df_m15['high'], df_m15['low'], df_m15['close'], length=14)
    df_m15.ta.macd(append=True)
    df_m15['volume_sma_20'] = df_m15['volume'].rolling(window=20).mean()
    
    df_h4.dropna(inplace=True); df_h1.dropna(inplace=True, subset=['RSI_14']); df_m15.dropna(inplace=True)
    print("   Indicators calculated successfully.")

    print("\n  4. Running Dual Strategy backtesting (w/ Momentum RSI Filter)...")
    trades = []; in_position = False
    position_type = None; entry_price = 0; stop_loss = 0; entry_timestamp = None; trade_reason = None
    highest_price_since_entry = 0; lowest_price_since_entry = float('inf'); TRAIL_ATR_MULTIPLIER = 2.5
    waiting_for_trigger = False; trigger_direction = None

    for i in range(1, len(df_m15)):
        current_time = df_m15['timestamp'].iloc[i]; current_atr = df_m15['ATR_14'].iloc[i]
        if in_position:
            if position_type == 'BUY':
                highest_price_since_entry = max(highest_price_since_entry, df_m15['high'].iloc[i])
                new_stop_loss = highest_price_since_entry - (TRAIL_ATR_MULTIPLIER * current_atr)
                stop_loss = max(stop_loss, new_stop_loss)
                if df_m15['low'].iloc[i] <= stop_loss:
                    result = 'win' if stop_loss > entry_price else 'loss'
                    trades.append({'entry_timestamp': entry_timestamp, 'exit_timestamp': current_time, 'type': position_type, 'entry_price': entry_price, 'exit_price': stop_loss, 'result': result, 'reason': trade_reason})
                    in_position = False; waiting_for_trigger = False
            elif position_type == 'SELL':
                lowest_price_since_entry = min(lowest_price_since_entry, df_m15['low'].iloc[i])
                new_stop_loss = lowest_price_since_entry + (TRAIL_ATR_MULTIPLIER * current_atr)
                stop_loss = min(stop_loss, new_stop_loss)
                if df_m15['high'].iloc[i] >= stop_loss:
                    result = 'win' if stop_loss < entry_price else 'loss'
                    trades.append({'entry_timestamp': entry_timestamp, 'exit_timestamp': current_time, 'type': position_type, 'entry_price': entry_price, 'exit_price': stop_loss, 'result': result, 'reason': trade_reason})
                    in_position = False; waiting_for_trigger = False
        
        if not in_position:
            h4_check = df_h4[df_h4.index < current_time]; h1_check = df_h1[df_h1.index < current_time]
            if h4_check.empty or h1_check.empty: continue
            
            is_uptrend = h4_check['close'].iloc[-1] > h4_check['EMA_55'].iloc[-1]
            pullback_signal = False
            if is_uptrend:
                last_highs = h1_check[h1_check['swing_high'].notna()]; last_lows = h1_check[h1_check['swing_low'].notna()]
                if not last_highs.empty and not last_lows.empty:
                    last_high_idx = last_highs.index[-1]; relevant_lows = last_lows[last_lows.index < last_high_idx]
                    if not relevant_lows.empty:
                        last_low_idx = relevant_lows.index[-1]
                        sh, sl = h1_check['swing_high'].loc[last_high_idx], h1_check['swing_low'].loc[last_low_idx]
                        if sh > sl:
                            f_0618 = sh - (sh - sl) * 0.618
                            if h1_check['close'].iloc[-1] <= f_0618 and h1_check['RSI_14'].iloc[-1] < 45:
                                pullback_signal = True; trigger_direction = 'BUY'
            else:
                last_highs = h1_check[h1_check['swing_high'].notna()]; last_lows = h1_check[h1_check['swing_low'].notna()]
                if not last_highs.empty and not last_lows.empty:
                    last_low_idx = last_lows.index[-1]; relevant_highs = last_highs[last_highs.index < last_low_idx]
                    if not relevant_highs.empty:
                        last_high_idx = relevant_highs.index[-1]
                        sh, sl = h1_check['swing_high'].loc[last_high_idx], h1_check['swing_low'].loc[last_low_idx]
                        if sh > sl:
                            f_0618 = sl + (sh - sl) * 0.618
                            if h1_check['close'].iloc[-1] >= f_0618 and h1_check['RSI_14'].iloc[-1] > 55:
                                pullback_signal = True; trigger_direction = 'SELL'
            
            if pullback_signal:
                ema_cross_up = df_m15['close'].iloc[i] > df_m15['EMA_21'].iloc[i] and df_m15['close'].iloc[i-1] <= df_m15['EMA_21'].iloc[i-1]
                macd_state_bullish = df_m15['MACD_12_26_9'].iloc[i] > df_m15['MACDs_12_26_9'].iloc[i]
                volume_confirm = df_m15['volume'].iloc[i] > df_m15['volume_sma_20'].iloc[i]
                if trigger_direction == 'BUY' and ema_cross_up and macd_state_bullish and volume_confirm:
                    in_position = True; position_type = 'BUY'; entry_price = df_m15['open'].iloc[i]; entry_timestamp = current_time; trade_reason = 'Pullback'
                    stop_loss = entry_price - (2.0 * current_atr); highest_price_since_entry = df_m15['high'].iloc[i]
                    continue

            strong_uptrend = h4_check['close'].iloc[-1] > h4_check['EMA_55'].iloc[-1] and h4_check['EMA_55'].iloc[-1] > h4_check['EMA_200'].iloc[-1]
            momentum_buy_trigger = df_m15['EMA_9'].iloc[i] > df_m15['EMA_21'].iloc[i] and df_m15['EMA_9'].iloc[i-1] <= df_m15['EMA_21'].iloc[i-1]
            rsi_momentum_confirm = df_m15['RSI_14'].iloc[i] > 60
            
            if strong_uptrend and momentum_buy_trigger and rsi_momentum_confirm:
                in_position = True; position_type = 'BUY'; entry_price = df_m15['open'].iloc[i]; entry_timestamp = current_time; trade_reason = 'Momentum'
                stop_loss = entry_price - (2.0 * current_atr); highest_price_since_entry = df_m15['high'].iloc[i]
                continue
            
            strong_downtrend = h4_check['close'].iloc[-1] < h4_check['EMA_55'].iloc[-1] and h4_check['EMA_55'].iloc[-1] < h4_check['EMA_200'].iloc[-1]
            momentum_sell_trigger = df_m15['EMA_9'].iloc[i] < df_m15['EMA_21'].iloc[i] and df_m15['EMA_9'].iloc[i-1] >= df_m15['EMA_21'].iloc[i-1]
            rsi_momentum_confirm_sell = df_m15['RSI_14'].iloc[i] < 40

            if strong_downtrend and momentum_sell_trigger and rsi_momentum_confirm_sell:
                in_position = True; position_type = 'SELL'; entry_price = df_m15['open'].iloc[i]; entry_timestamp = current_time; trade_reason = 'Momentum'
                stop_loss = entry_price + (2.0 * current_atr); lowest_price_since_entry = df_m15['low'].iloc[i]
                continue

    print("   Backtesting simulation complete.")

    print("\n 5. Analyzing performance and creating plot...")
    if not trades:
        print("   No trades were executed.")
    else:
        trades_df = pd.DataFrame(trades)
        total_trades = len(trades_df); wins = len(trades_df[trades_df['result'] == 'win'])
        win_rate = (wins / total_trades) * 100 if total_trades > 0 else 0
        trades_df['pnl'] = np.where(trades_df['type'] == 'BUY', trades_df['exit_price'] - trades_df['entry_price'], trades_df['entry_price'] - trades_df['exit_price'])
        gross_profit = trades_df[trades_df['pnl'] > 0]['pnl'].sum(); gross_loss = abs(trades_df[trades_df['pnl'] < 0]['pnl'].sum())
        profit_factor = gross_profit / gross_loss if gross_loss > 0 else np.inf
        
        print("\n--- Dual Strategy Backtest Results ---"); print(f"Total Trades: {total_trades}"); print(f"Win Rate: {win_rate:.2f}%"); print(f"Profit Factor: {profit_factor:.2f}"); print("------------------------\n")
        if 'reason' in trades_df.columns:
            print("Trades by Strategy:")
            print(trades_df['reason'].value_counts())
        
        plt.style.use('dark_background'); fig, ax = plt.subplots(figsize=(20, 10))
        ax.plot(df_m15['timestamp'], df_m15['close'], label='Close Price (M15)', color='deepskyblue', alpha=0.5)
        
        pullback_trades = trades_df[trades_df['reason'] == 'Pullback']
        momentum_trades = trades_df[trades_df['reason'] == 'Momentum']

        ax.plot(pullback_trades['entry_timestamp'], pullback_trades['entry_price'], 'o', color='lime', markersize=10, label='Pullback Entry (Win)' , linestyle = 'None')
        ax.plot(momentum_trades['entry_timestamp'], momentum_trades['entry_price'], '^', color='cyan', markersize=10, label='Momentum Entry (Win)' , linestyle = 'None')

        for _, trade in trades_df.iterrows():
            color = 'lime' if trade['result'] == 'win' else 'red'
            ax.plot(trade['exit_timestamp'], trade['exit_price'], 'x', color=color, markersize=7)
        
        handles, labels = plt.gca().get_legend_handles_labels(); by_label = dict(zip(labels, handles)); ax.legend(by_label.values(), by_label.keys())
        ax.set_title(f'Dual Strategy Backtest for {symbol_input} (w/ Momentum RSI Filter)'); ax.set_ylabel('Price (USD)'); ax.set_xlabel('Date'); plt.grid(alpha=0.2); plt.show()
else:
    print("\n Skipping backtest because data fetching failed.")