In [None]:
from backtesting import Backtest, Strategy
import pandas as pd
from backtesting.lib import crossover, plot_heatmaps, resample_apply
import seaborn as sns
import matplotlib.pyplot as plt
import mpld3
import numpy as np
import time
from talib import CDLENGULFING, ADX, CCI, ATR
from DataPaths import data_paths

In [None]:
class EngulfingWithADX_CCI(Strategy):
    '''
    ******* STRAT *******
    Strategy Summary
    This is a mean-reversion strategy designed to trade in low-trend environments using technical indicators from TA-Lib. 
    It combines ADX, CCI, and Candlestick Engulfing Patterns to identify potential reversal points.

    ADX Filter: Only takes trades when ADX is below 20, signaling a weak trend (ranging market).

    Entry Signal (Long):

    CCI below -100 (indicating oversold conditions)

    Bullish Engulfing Pattern detected

    No current open position
    → Triggers a Buy

    Exit Signal:

    CCI above +100 (indicating overbought conditions)

    Bearish Engulfing Pattern detected

    Position is open
    → Closes the position
    '''

    # Class-level variables (default values for the strategy)
    adx_threshold = 26  # ADX threshold for low-trend environments
    cci_overbought = 50  # CCI overbought level
    cci_oversold = -65  # CCI oversold level
    SL = 50  # Fixed stop-loss in pips/handles
    TP_R = 3  # Risk-to-reward multiplier for take-profit

    def init(self):
        # Indicators
        self.engulfing = self.I(CDLENGULFING, self.data.Open, self.data.High, self.data.Low, self.data.Close)
        self.adx = self.I(ADX, self.data.High, self.data.Low, self.data.Close, timeperiod=14)
        self.cci = self.I(CCI, self.data.High, self.data.Low, self.data.Close, timeperiod=14)

        # Instance-level variables
        self.entry_price = None
        self.stop_loss = None
        self.take_profit = None
        self.trade_count = 0

    def next(self):
        adx_value = self.adx[-1]
        cci_value = self.cci[-1]
        engulf = self.engulfing[-1]
        price = self.data.Close[-1]

        # Only trade in low-trending conditions
        if adx_value < self.adx_threshold:

            # Long Entry
            if cci_value < self.cci_oversold and engulf == 100 and not self.position:
                self.buy()
                self.entry_price = price
                self.stop_loss = price - self.SL
                self.take_profit = price + self.SL * self.TP_R
                self.trade_count += 1  # Increment trade count

            # Short Entry
            elif cci_value > self.cci_overbought and engulf == -100 and not self.position:
                self.sell()
                self.entry_price = price
                self.stop_loss = price + self.SL
                self.take_profit = price - self.SL * self.TP_R
                self.trade_count += 1  # Increment trade count

        # If position is open, check exit conditions
        if self.position:
            if self.position.is_long:
                # Exit if price hits stop-loss or take-profit
                if price <= self.stop_loss or price >= self.take_profit:
                    self.position.close()
                    self.entry_price = None
                    self.stop_loss = None
                    self.take_profit = None

            elif self.position.is_short:
                # Exit if price hits stop-loss or take-profit
                if price >= self.stop_loss or price <= self.take_profit:
                    self.position.close()
                    self.entry_price = None
                    self.stop_loss = None
                    self.take_profit = None
    ()


In [None]:
def load_and_prepare_data(file_path, start_date, end_date):
    # Load CSV
    data = pd.read_csv(file_path, parse_dates=['Time'], index_col='Time')
    # Ensure index is datetime
    data.index = pd.to_datetime(data.index)

    # Print column names to verify
    print("Columns in the CSV file:", data.columns)

    # Select required columns
    data = data[['Open', 'High', 'Low', 'Close', 'Volume']].copy()

    # Remove duplicate indexes
    if data.index.duplicated().any():
        print("Duplicate indexes found. Removing duplicates.")
        data = data[~data.index.duplicated(keep='first')]

    # Check for NaN values
    print("Checking for NaN values in the data:")
    print(data.isna().sum())

    # Drop NaN values
    data = data.dropna()

    # Reduce the number of data points to a specific date range
    data = data.loc[start_date:end_date]

    print(f"Number of data points after reduction: {len(data)}")

    return data


In [None]:
if __name__ == '__main__':

    file_path = data_paths["XAU"]["M5"]
    
    start_date = '2024-01-01'
    end_date = '2025-01-01'

    data = load_and_prepare_data(file_path, start_date, end_date)

    print("loading backtest results.....")
    
    bt = Backtest(data, EngulfingWithADX_CCI, cash=1_000_000, commission=.0002)
    
    stats = bt.run()
    print(stats)
    bt.plot()
    

In [6]:
# Save the trades to a CSV file in a folder named "results"
import os

# Create the folder if it doesn't exist
output_folder = "results"
os.makedirs(output_folder, exist_ok=True)

# Define the full file path
output_file = os.path.join(output_folder, "trades.csv")

# Save the DataFrame to CSV
stats["_trades"].to_csv(output_file, index=False)  # index=False to skip row numbers in CSV
