In [None]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import os
import numpy as np
import pandas as pd
from quantfreedom.enums import *
from quantfreedom.helper_funcs import dl_ex_candles
from quantfreedom.simulate import run_df_backtest, or_backtest
import plotly.graph_objects as go
from MACDandEMA import MACDandEMA
import re
from logging import getLogger

# Data

In [None]:
# Download 10,000 candles of BTCUSDT on the 15m timeframe from the
# 'binance_us' exchange; every backtest below slices this one dataset.
candles = dl_ex_candles(
    exchange='binance_us',
    symbol='BTCUSDT',
    timeframe='15m',
    candles_to_dl=10_000,
)

In [None]:
# Backtest-wide settings, left at the library defaults.
backtest_settings = BacktestSettings()

# Exchange-side constraints: tick steps, fees, size and leverage limits.
exchange_settings = ExchangeSettings(
    asset_tick_step=3,
    leverage_mode=1,
    leverage_tick_step=2,
    limit_fee_pct=0.0003,
    market_fee_pct=0.0006,
    max_asset_size=100,
    max_leverage=150,
    min_asset_size=0.001,
    min_leverage=1.0,
    mmr_pct=0.004,
    position_mode=3,
    price_tick_step=1,
)

# Order settings that stay fixed for every combination that is tested.
static_os = StaticOrderSettings(
    increase_position_type=IncreasePositionType.RiskPctAccountEntrySize,
    leverage_strategy_type=LeverageStrategyType.Dynamic,
    pg_min_max_sl_bcb="min",
    sl_strategy_type=StopLossStrategyType.SLBasedOnCandleBody,
    sl_to_be_bool=False,
    starting_bar=50,
    starting_equity=1000.0,
    static_leverage=None,
    tp_fee_type="limit",
    tp_strategy_type=TakeProfitStrategyType.RiskReward,
    trail_sl_bool=True,
    z_or_e_type=None,
)

# Order settings given as arrays of candidate values; entries with more than
# one value offer alternatives (presumably combined by the backtester into the
# settings addressed by `dos_index` -- confirm against quantfreedom).
dos_arrays = DynamicOrderSettingsArrays(
    # Shown in the video, but not needed here:
    # max_equity_risk_pct=np.array([12]),
    max_trades=np.array([4]),
    # The values below differ from the video.
    account_pct_risk_per_trade=np.array([3]),
    risk_reward=np.array([5]),
    sl_based_on_add_pct=np.array([0.1, 0.25]),
    sl_based_on_lookback=np.array([20, 50]),
    sl_bcb_type=np.array([CandleBodyType.Low]),
    sl_to_be_cb_type=np.array([CandleBodyType.Nothing]),
    sl_to_be_when_pct=np.array([0]),
    trail_sl_bcb_type=np.array([CandleBodyType.Low]),
    trail_sl_by_pct=np.array([0.5, 1.0]),
    trail_sl_when_pct=np.array([1, 2]),
)

In [None]:
# Long-only MACD + EMA strategy; each argument is a range of candidate values.
macd_strat = MACDandEMA(
    long_short='long',
    ema_length=np.arange(10, 101, 20),        # 10, 30, 50, 70, 90
    fast_length=np.arange(10, 61, 10),        # 10, 20, 30, 40, 50, 60
    macd_below=np.arange(0, 51, 25),          # 0, 25, 50
    signal_smoothing=np.arange(5, 16, 5),     # 5, 10, 15
    slow_length=np.arange(90, 121, 30),       # 90, 120
)

# Walk Forward

In [None]:
def _duration_to_bars(duration, bars_per_unit):
    """Convert a duration string such as ``'4W'`` or ``'15H'`` into a bar count.

    The first run of digits is the amount and the first upper-case letter is
    the unit; the unit must be a key of ``bars_per_unit``.

    Raises:
        ValueError: if the amount or the unit is missing, or the unit is unknown.
    """
    amount = re.search(r'\d+', duration)
    unit = re.search(r'[A-Z]', duration)
    if amount is None or unit is None or unit.group() not in bars_per_unit:
        raise ValueError(
            f"Invalid duration {duration!r}: expected '<number><unit>' "
            f"with unit in {sorted(bars_per_unit)}"
        )
    return int(int(amount.group()) * bars_per_unit[unit.group()])


def _run_single_backtest(logger, **or_backtest_kwargs):
    """Run ``or_backtest`` once (with ``logger_bool=False``), logging any error before re-raising it."""
    try:
        return or_backtest(logger_bool=False, **or_backtest_kwargs)
    except IndexError as e:
        logger.error(f"IndexError during backtest: {e}")
        raise
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        raise


def walk_forward(
    strategy,
    candles,
    warmup_bars,
    lookback_bars,
    validation_bars,
    practice_bars,
    backtest_settings,
    dos_arrays,
    exchange_settings,
    static_os,
    pct_optimize,
    variation_numbers,
):
    """Run a rolling train / validate / practice evaluation and plot the equity curve.

    The candle series is stepped through in increments of ``practice_bars``.
    For every step:

    1. ``run_df_backtest`` is run on the training window and the rows whose
       ``gains_pct`` exceeds ``pct_optimize * weeks_in_training`` are kept.
    2. Each kept combination is re-run with ``or_backtest`` on the validation
       window; those gaining more than ``pct_optimize * weeks_in_testing``
       percent are kept, with at most two entries per ``ind_set_idx``, and the
       best ``variation_numbers`` by percent gain are selected.
    3. The selected combinations are run on the practice window. The equity
       carried from the previous step is split into ``variation_numbers``
       equal slices and each slice is scaled by its combination's percent gain.

    A step that cannot be evaluated (window too short, no training result, no
    combination passing a filter) carries the previous equity forward.

    Args:
        strategy: Strategy object forwarded to ``run_df_backtest`` / ``or_backtest``.
        candles: Sliceable sequence of candle rows with six columns
            (timestamp in milliseconds, open, high, low, close, volume).
        warmup_bars: Duration string (e.g. ``'15H'``) prepended to every window.
        lookback_bars: Duration string for the training window.
        validation_bars: Duration string for the validation window.
        practice_bars: Duration string for the practice window and the step size.
        backtest_settings: Forwarded unchanged to the backtest functions.
        dos_arrays: Forwarded unchanged to the backtest functions.
        exchange_settings: Forwarded unchanged to the backtest functions.
        static_os: Forwarded unchanged; its ``starting_equity`` is the baseline
            against which every percent gain is measured.
        pct_optimize: Required percent gain per week of window length.
        variation_numbers: Number of combinations traded in parallel, and the
            number of slices the equity is divided into.

    Duration strings are ``'<number><unit>'`` with unit ``Y`` (365 days),
    ``M`` (30 days), ``W``, ``D`` or ``H``.

    Returns:
        None. The per-iteration results, the combined results table and the
        equity-curve figure are displayed as side effects.

    Raises:
        ValueError: if fewer than two candles are supplied or a duration
            string cannot be parsed.
    """
    logger = getLogger(__name__)

    # Every or_backtest run uses static_os, so it starts from this equity.
    starting_equity = static_os.starting_equity

    shared_kwargs = dict(
        backtest_settings=backtest_settings,
        dos_arrays=dos_arrays,
        exchange_settings=exchange_settings,
        static_os=static_os,
        strategy=strategy,
    )
    result_columns = ['dos_index', 'ind_set_idx', 'equity', 'pct_change']

    # Rows for the combined results table (one per practice run).
    master_records = []

    # Equity after each iteration; key 0 is the initial pot, one
    # starting_equity-sized slice per parallel combination.
    equity_dict = {0: starting_equity * variation_numbers}

    # Infer the candle interval from the first two timestamps.
    candle_frame = pd.DataFrame(candles, columns=['Timestamp', 'Open', 'High', 'Low', 'Close', 'Volume'])
    if len(candle_frame) < 2:
        raise ValueError("At least two candles are required to infer the bar interval")
    timestamps = pd.to_datetime(candle_frame['Timestamp'], unit='ms')
    interval = timestamps.iloc[1] - timestamps.iloc[0]

    seconds_in_a_day = 24 * 60 * 60
    intervals_in_a_day = seconds_in_a_day / interval.total_seconds()

    # Number of bars in one unit of each supported duration suffix.
    time_units = {
        'Y': 365 * intervals_in_a_day,
        'M': 30 * intervals_in_a_day,
        'W': 7 * intervals_in_a_day,
        'D': intervals_in_a_day,
        'H': intervals_in_a_day / 24,
    }

    # Convert the duration strings to bar counts.
    warmup_bars = _duration_to_bars(warmup_bars, time_units)
    lookback_bars = _duration_to_bars(lookback_bars, time_units)
    validation_bars = _duration_to_bars(validation_bars, time_units)
    practice_bars = _duration_to_bars(practice_bars, time_units)

    bars_in_a_week = 7 * intervals_in_a_day
    weeks_in_training = lookback_bars / bars_in_a_week
    weeks_in_testing = validation_bars / bars_in_a_week

    # Gain thresholds scale with the length of the window they apply to.
    optimize_pct_training = pct_optimize * weeks_in_training
    optimize_pct_testing = pct_optimize * weeks_in_testing

    for iteration, i in enumerate(range(lookback_bars + warmup_bars, len(candles) - practice_bars, practice_bars)):
        previous_equity = equity_dict[iteration]
        # Default outcome: carry the equity forward. It is overwritten below
        # only when the practice phase actually runs.
        equity_dict[iteration + 1] = previous_equity

        training_data = candles[i - lookback_bars - warmup_bars:i]
        validation_data = candles[i - warmup_bars: i + validation_bars]
        practice_data = candles[i + validation_bars - warmup_bars: i + validation_bars + practice_bars]

        # Near the end of the series the slices come back short; skip those steps.
        if len(validation_data) != (validation_bars + warmup_bars) or len(practice_data) != (practice_bars + warmup_bars):
            continue

        # --- Training: keep every combination that beats the training threshold.
        backtest_training = run_df_backtest(candles=training_data, **shared_kwargs)
        if backtest_training.empty:
            continue

        best_training = backtest_training[backtest_training['gains_pct'] > optimize_pct_training]
        training_pairs = list(zip(best_training['dos_index'].tolist(), best_training['ind_set_idx'].tolist()))
        if not training_pairs:
            continue

        # --- Validation: re-test the survivors on unseen data.
        validation_records = []
        kept_per_ind_set = {}
        for dos_idx, ind_idx in training_pairs:
            dos_idx, ind_idx = int(dos_idx), int(ind_idx)
            validation_stats = _run_single_backtest(
                logger,
                candles=validation_data,
                dos_index=dos_idx,
                ind_set_index=ind_idx,
                plot_results=False,
                **shared_kwargs,
            )
            if validation_stats.empty:
                continue

            equity = validation_stats['equity'].iloc[-1]
            pct_change = (equity - starting_equity) / starting_equity * 100
            if pct_change <= optimize_pct_testing:
                continue

            # Keep at most two entries per indicator-settings index so the
            # selection is not dominated by one indicator configuration.
            if kept_per_ind_set.get(ind_idx, 0) < 2:
                kept_per_ind_set[ind_idx] = kept_per_ind_set.get(ind_idx, 0) + 1
                validation_records.append({
                    'dos_index': dos_idx,
                    'ind_set_idx': ind_idx,
                    'equity': equity,
                    'pct_change': pct_change,
                })

        validation_df = pd.DataFrame(validation_records, columns=result_columns)
        top_df = validation_df.sort_values(by='pct_change', ascending=False).head(variation_numbers)
        selected_pairs = list(zip(top_df['dos_index'].tolist(), top_df['ind_set_idx'].tolist()))
        if not selected_pairs:
            continue

        # --- Practice: trade the selected combinations, one equity slice each.
        equity_per_combination = previous_equity / variation_numbers
        real_records = []
        for dos_idx, ind_idx in selected_pairs:
            dos_idx, ind_idx = int(dos_idx), int(ind_idx)
            practice_stats = _run_single_backtest(
                logger,
                candles=practice_data,
                dos_index=dos_idx,
                ind_set_index=ind_idx,
                plot_results=True,
                **shared_kwargs,
            )

            # An empty result or a NaN final equity means the combination made
            # no trade, so its backtest equity is still the backtest's own
            # starting equity. Using the slice size here instead would report
            # a non-zero gain whenever the slice differs from starting_equity.
            if practice_stats.empty:
                equity = starting_equity
            else:
                equity = practice_stats['equity'].iloc[-1]
                if pd.isna(equity):
                    equity = starting_equity

            pct_change = (equity - starting_equity) / starting_equity * 100
            updated_equity = equity_per_combination + (equity_per_combination * (pct_change / 100))

            record = {
                'dos_index': dos_idx,
                'ind_set_idx': ind_idx,
                'equity': updated_equity,
                'pct_change': pct_change,
            }
            real_records.append(record)
            master_records.append({'iteration': iteration, **record})

        real_df = pd.DataFrame(real_records, columns=result_columns).sort_values(by='pct_change', ascending=False)
        display(real_df)

        # Slices that were not assigned to a combination (fewer than
        # variation_numbers passed validation) stay in cash rather than
        # disappearing from the account.
        idle_equity = (variation_numbers - len(selected_pairs)) * equity_per_combination
        equity_dict[iteration + 1] = real_df['equity'].sum() + idle_equity

    master_df = pd.DataFrame(
        master_records,
        columns=['iteration', 'dos_index', 'ind_set_idx', 'equity', 'pct_change'],
    )
    display(master_df)

    # Plot total equity against iteration number.
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=list(equity_dict.keys()),
            y=list(equity_dict.values()),
            mode='lines',
            name='Equity Curve'
        )
    )
    fig.update_layout(height=1000, xaxis_rangeslider_visible=False)
    fig.update_yaxes(tickformat="$,")
    fig.show()


In [None]:
# Window lengths use the '<number><unit>' format parsed by walk_forward,
# where the unit is one of Y, M, W, D or H.
warmup_bars = '15H'
lookback_bars = '4W'
validation_bars = '2W'
practice_bars = '1W'
# Multiplied by a window's length in weeks inside walk_forward to obtain the
# percent gain a combination must exceed on that window.
pct_optimize = 5
# Number of top validated combinations kept per iteration; the equity is
# split into this many slices.
variation_numbers = 5

In [None]:
# Run the walk-forward evaluation with the settings defined above.
walk_forward(
    strategy=macd_strat,
    candles=candles,
    warmup_bars=warmup_bars,
    lookback_bars=lookback_bars,
    validation_bars=validation_bars,
    practice_bars=practice_bars,
    backtest_settings=backtest_settings,
    dos_arrays=dos_arrays,
    exchange_settings=exchange_settings,
    static_os=static_os,
    pct_optimize=pct_optimize,
    variation_numbers=variation_numbers,
)

In [None]:
# Changed line 453 of simulate.py to the following:
# if strategy.entries[bar_index] and strategy.entries[bar_index + 1] == total_bars: