In [4]:
# =================== IMPORTS ======================
import pandas as pd
import numpy as np
import os
from tqdm import tqdm
from math import isfinite

# =================== USER SETTINGS ======================
# CSV files (H1 and H4 already prepared)
H1_CSV = "EURUSDH1.csv"
H4_CSV = "EURUSDH4.csv"
OUTPUT_DIR = "C:/Users/yaman/OneDrive/سطح المكتب/project1/strategys/trend following +breakout&reset + swing trading/OUTPUT_DIR"
INITIAL_CAPITAL = 10000.0

# Position sizing
POSITION_SIZE_PCT = 0.02  # 2% of capital per trade
TP_PCT = 0.004  # 0.4%
SL_PCT = 0.002  # 0.2%
SPREAD_COST_PIPS = 0.5
PIP_SIZE = 0.0001
SPREAD_COST = SPREAD_COST_PIPS * PIP_SIZE

# Indicators
EMA_FAST = 50
EMA_SLOW = 200
DONCH_N = 20
RSI_PERIOD = 14
SWING_LEFT = 3
SWING_RIGHT = 3

# =================== HELPER FUNCTIONS ======================
def ema(series, period):
    return series.ewm(span=period, adjust=False).mean()

def donchian_high(series, n):
    return series.rolling(window=n, min_periods=1).max().shift(1)

def donchian_low(series, n):
    return series.rolling(window=n, min_periods=1).min().shift(1)

def rsi(series, period=14):
    delta = series.diff()
    up = delta.clip(lower=0)
    down = -delta.clip(upper=0)
    ma_up = up.ewm(com=(period-1), adjust=False).mean()
    ma_down = down.ewm(com=(period-1), adjust=False).mean()
    rs = ma_up / ma_down
    return 100 - (100 / (1 + rs))

def detect_pivot_lows(df, left=3, right=3):
    lows = df['Low'].values
    n = len(lows)
    res = np.zeros(n, dtype=bool)
    for i in range(left, n - right):
        if lows[i] < lows[i-left:i].min() and lows[i] < lows[i+1:i+1+right].min():
            res[i] = True
    return pd.Series(res, index=df.index)

def detect_pivot_highs(df, left=3, right=3):
    highs = df['High'].values
    n = len(highs)
    res = np.zeros(n, dtype=bool)
    for i in range(left, n - right):
        if highs[i] > highs[i-left:i].max() and highs[i] > highs[i+1:i+1+right].max():
            res[i] = True
    return pd.Series(res, index=df.index)

# =================== CREATE OUTPUT DIR ======================
if not os.path.exists(OUTPUT_DIR):
    os.makedirs(OUTPUT_DIR)

# =================== LOAD DATA ======================
print("Loading H1 and H4 data...")
df_h1 = pd.read_csv(H1_CSV, parse_dates=['DateTime']).sort_values('DateTime').set_index('DateTime')
df_h4 = pd.read_csv(H4_CSV, parse_dates=['DateTime']).sort_values('DateTime').set_index('DateTime')

# =================== CALCULATE INDICATORS ======================
# H1
df_h1['EMA_fast'] = ema(df_h1['Close'], EMA_FAST)
df_h1['EMA_slow'] = ema(df_h1['Close'], EMA_SLOW)
df_h1['DONCH_HIGH'] = donchian_high(df_h1['High'], DONCH_N)
df_h1['DONCH_LOW'] = donchian_low(df_h1['Low'], DONCH_N)
df_h1['RSI'] = rsi(df_h1['Close'], RSI_PERIOD)
df_h1['pivot_low'] = detect_pivot_lows(df_h1, SWING_LEFT, SWING_RIGHT)
df_h1['pivot_high'] = detect_pivot_highs(df_h1, SWING_LEFT, SWING_RIGHT)

# H4
df_h4['EMA_fast'] = ema(df_h4['Close'], EMA_FAST)
df_h4['EMA_slow'] = ema(df_h4['Close'], EMA_SLOW)

# =================== MERGE H4 ON H1 ======================
merged = pd.merge_asof(df_h1.reset_index(), df_h4[['EMA_fast','EMA_slow','Close']].reset_index(),
                       on='DateTime', direction='backward', suffixes=('','_H4')).set_index('DateTime')

# =================== ENTRY CONDITIONS ======================
# Long: H4 trend up, H1 breakout high, pivot low
merged['long_condition'] = ((merged['EMA_fast_H4'] > merged['EMA_slow_H4']) &
                            (merged['Close_H4'] > merged['EMA_fast_H4']) &
                            (merged['Close'] > merged['DONCH_HIGH']) &
                            (merged['pivot_low'])).astype(int)

# Short: H4 trend down, H1 breakout low, pivot high
merged['short_condition'] = ((merged['EMA_fast_H4'] < merged['EMA_slow_H4']) &
                             (merged['Close_H4'] < merged['EMA_fast_H4']) &
                             (merged['Close'] < merged['DONCH_LOW']) &
                             (merged['pivot_high'])).astype(int)

# =================== BACKTEST LOOP ======================
print("Starting backtest simulation...")
cash = INITIAL_CAPITAL
in_position = False
position_type = None
position = None
equity_curve, timestamps, trades = [], [], []

for i, (dt, row) in tqdm(enumerate(merged.iterrows()), total=len(merged)):
    o, h, l, c = row['Open'], row['High'], row['Low'], row['Close']

    if in_position:
        entry_price = position['entry_price']
        tp = position['tp']
        sl = position['sl']
        shares = position['shares']
        exit_reason = None
        exit_price = None

        if position_type == "long":
            if h >= tp:
                exit_price = tp
                exit_reason = 'TP'
            elif l <= sl:
                exit_price = sl
                exit_reason = 'SL'
        elif position_type == "short":
            if l <= tp:
                exit_price = tp
                exit_reason = 'TP'
            elif h >= sl:
                exit_price = sl
                exit_reason = 'SL'

        if exit_reason:
            pnl = (exit_price - entry_price) * shares if position_type == "long" else (entry_price - exit_price) * shares
            cash += shares * exit_price
            trades.append({
                'entry_date': position['entry_date'],
                'exit_date': dt,
                'type': position_type,
                'entry_price': entry_price,
                'exit_price': exit_price,
                'pnl': pnl,
                'pnl_pct': pnl / (entry_price * shares) * 100,
                'exit_reason': exit_reason
            })
            in_position = False
            position = None

    if not in_position:
        alloc = cash * POSITION_SIZE_PCT
        shares = alloc / o if alloc > 0 else 0

        if row['long_condition']:
            entry_price = o + SPREAD_COST
            tp = entry_price * (1 + TP_PCT)
            sl = entry_price * (1 - SL_PCT)
            cash -= shares * entry_price
            in_position = True
            position_type = "long"
            position = {'entry_date': dt, 'entry_price': entry_price, 'tp': tp, 'sl': sl, 'shares': shares}

        elif row['short_condition']:
            entry_price = o - SPREAD_COST
            tp = entry_price * (1 - TP_PCT)
            sl = entry_price * (1 + SL_PCT)
            cash -= shares * entry_price
            in_position = True
            position_type = "short"
            position = {'entry_date': dt, 'entry_price': entry_price, 'tp': tp, 'sl': sl, 'shares': shares}

    equity = cash + (position['shares'] * c if in_position else 0)
    equity_curve.append(equity)
    timestamps.append(dt)

# Close any remaining position at last bar
if in_position:
    final_price = merged['Close'].iloc[-1]
    shares = position['shares']
    pnl = (final_price - position['entry_price']) * shares if position_type=="long" else (position['entry_price'] - final_price) * shares
    cash += shares * final_price
    trades.append({
        'entry_date': position['entry_date'],
        'exit_date': merged.index[-1],
        'type': position_type,
        'entry_price': position['entry_price'],
        'exit_price': final_price,
        'pnl': pnl,
        'pnl_pct': pnl / (position['entry_price'] * shares) * 100,
        'exit_reason': 'EOF'
    })

# =================== RESULTS ======================
trades_df = pd.DataFrame(trades)
total_trades = len(trades_df)
wins = trades_df['pnl'].gt(0).sum() if total_trades>0 else 0
losses = total_trades - wins
win_rate = wins/total_trades*100 if total_trades>0 else 0
net_pnl = trades_df['pnl'].sum() if total_trades>0 else 0
final_equity = equity_curve[-1] if equity_curve else INITIAL_CAPITAL

print("\n=== Backtest Summary ===")
print(f"Total Trades: {total_trades}")
print(f"Winning Trades: {wins}")
print(f"Losing Trades: {losses}")
print(f"Win Rate: {win_rate:.2f}%")
print(f"Net P&L: {net_pnl:.2f}")
print(f"Final Equity: {final_equity:.2f}")

# Save results
if total_trades>0:
    trades_df.to_csv(os.path.join(OUTPUT_DIR, "eurusd_all_trades.csv"), index=False)
    trades_df[trades_df['pnl']>0].to_csv(os.path.join(OUTPUT_DIR, "eurusd_winners.csv"), index=False)
    trades_df[trades_df['pnl']<=0].to_csv(os.path.join(OUTPUT_DIR, "eurusd_losers.csv"), index=False)

pd.DataFrame({'DateTime': timestamps, 'Equity': equity_curve}).to_csv(os.path.join(OUTPUT_DIR, "eurusd_equity_curve.csv"), index=False)

print("Backtest completed successfully.")


Loading H1 and H4 data...
Starting backtest simulation...


100%|█████████████████████████████████████████████████████████████████████████| 17523/17523 [00:01<00:00, 15405.97it/s]



=== Backtest Summary ===
Total Trades: 30
Winning Trades: 24
Losing Trades: 6
Win Rate: 80.00%
Net P&L: 16.80
Final Equity: 9996.80
Backtest completed successfully.
