In [4]:
'''
STRATEGY 1: Literature based
'''

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from backtesting import Backtest, Strategy
from datetime import datetime
import talib
import joblib
import matplotlib.pyplot as plt

# Load data
data = pd.read_csv('./EURUSD_D1.csv')
data['Time'] = pd.to_datetime(data['Time'], format='%Y-%m-%d %H:%M:%S')
data.set_index('Time', inplace=True)


#### Feature Engineering ####
def multivariateFeatureEngineering(data):
    
    #Trend following Indicators:

    #SMA - identofy long term trend
    data['50_sma'] = data['Close'].rolling(window=50).mean() 
    data['200_sma'] = data['Close'].rolling(window=200).mean() 

    #EMA - trend analysis: more weight applied to recent points
    data['50_ema'] = data['Close'].ewm(span=50, adjust=False).mean()
    data['100_ema'] = data['Close'].ewm(span=100, adjust=False).mean()

    #MACD
    data['12_ema'] = data['Close'].ewm(span=12, adjust=False).mean()
    data['26_ema'] = data['Close'].ewm(span=26, adjust=False).mean()

    data['MACD_line'] = data['12_ema']-data['26_ema'] # calculate the MACD line
    data['Signal_line'] = data['MACD_line'].ewm(span=9, adjust=False).mean() # 9-preiod ema signal calculated from the Macdline

    #ADX
    # Calculate ADX using TA-Lib (14-period by default)
    data['ADX'] = talib.ADX(data['High'], data['Low'], data['Close'], timeperiod=14)

    #Momentum indicators:

    #RSI - 14-period
    data['RSI'] = talib.RSI(data['Close'], timeperiod=14)
 
    #Stochastic Oscillator
    data['stoch_k'], data['stoch_d'] = talib.STOCH(data['High'], data['Low'], data['Close'], 
                                                fastk_period=14, slowk_period=3, slowd_period=3)

    #Volatility indicators#:

    #ATR -Default period for ATR is 14
    data['ATR'] = talib.ATR(data['High'], data['Low'], data['Close'], timeperiod=14)
  
    data = data.dropna() # drop rows that have NA

    #drop certain featires
    data = data.drop(columns=['12_ema', '26_ema'])

    return data

###### Generate lag feaures #######
def multivariateFeatureLagMultiStep(data, n_past, future_steps, target_column):
    features = []
    response = []

    max_future_step = max(future_steps)
    num_features = data.shape[1]
    group_feature_lags =  1 # change grouping of lagged features

    # Adjust the loop to prevent index out of bounds
    for i in range(n_past, len(data) - max_future_step + 1):

        if group_feature_lags==1:
                
            lagged_features = []

            for feature_idx in range(num_features):
                feature_lags = data.iloc[i - n_past:i, feature_idx].values 
                lagged_features.extend(feature_lags) 

        elif group_feature_lags==0:
            features.append(data.iloc[i - n_past:i, :].values)  # Take all columns as features

        # Use .iloc for integer-based indexing and .values to get a NumPy array

        if group_feature_lags==1:
            features.append(lagged_features)

        # Extract the target values at specified future steps using .iloc
        response.append([data.iloc[i + step - 1, target_column] for step in future_steps])

    # Convert lists to NumPy arrays after the loop
    features = np.array(features)  # Shape: (num_samples, n_past, num_features)
    response = np.array(response)  # Shape: (num_samples, len(future_steps))

    # Flatten the features to 2D array: (num_samples, n_past * num_features)
    features_flat = features.reshape(features.shape[0], -1)

    return features_flat, response


############################# Load saved Best model information ##################################################

best_model_info= [1, 1, ['Open', 'Low', 'sma_50', 'sma_200', 'ema_50', 'MACD_line', 'Signal_line', 'RSI', 'ATR', 'ema_100', 'High', 'Close']]

best_model_data = joblib.load('./model _weights/best_model_weights_and_scaler.pkl')
scaler = best_model_data['scaler']
weights = best_model_data['weights']
bias = best_model_data['bias']

lookback_window = best_model_info[0]
features = best_model_info[2]

data = multivariateFeatureEngineering(data) # generate additional features
#rename columns 
data = data.rename(columns={
    '50_sma': 'sma_50',
    '200_sma': 'sma_200',
    '50_ema': 'ema_50',
    '100_ema': 'ema_100'

})

################################# # Backtesting Strategy using Linear Regression #########################################

class LinearRegressionStrategy(Strategy):
    #############  invoked before the strategy is run #################
    '''
    one ideally precomputes in efficient, vectorized manner whatever indicators and signals the strategy depends on
    # This would be where the forecast are done (Unless want it to be done on each candles avalailability)
    
    '''
    position_size = 0.1
    short_sl_multiplier =1.015 
    short_tp_multiplier = 0.97 
    long_tp_multiplier = 1.02
    long_sl_multiplier = 0.99

    def init(self):
        self.index = 1         # Track positions and predictions
        self.test_data_subset = self.data.df[features]        # Prepare test data subset with the best selected features
        self.Trend = 0
        self.forecast_data = []  # Store forecasts for plotting

    ''' Issue is that this is not dynamic '''
    def forecast_prices(self, data_window):

        """Forecast prices for 1-day, 3-day, and 5-day horizons."""
        num_features = data_window.shape[1]         # Get the number of features

        lagged_features = []         # Flatten the data with grouping by feature type

        for feature_idx in range(num_features):
            feature_lags = data_window.iloc[:, feature_idx].values             # Extract all lagged values for the current feature

            lagged_features.extend(feature_lags)

        X = np.array(lagged_features).reshape(1, -1)         # Convert to a NumPy array and reshape to fit the model's input (flattening)


        X_scaled = scaler.transform(X)         # Scale the flattened features using the trained scaler

        # Forecast using the model's weights and bias
        predictions = np.dot(X_scaled, weights) + bias  # Shape: (1, 3)

        return predictions[0]  #    
    
    ######### Strategy Exectution #############
    def validate_order(self, trade_type, order_price, sl, tp):
        """
        Validates the order to ensure the correct relationship between TP, order price, and SL.
        - For long: TP > order price > SL
        - For short: TP < order price < SL
        """

        adjust_order =  0
        if trade_type == 'long':
            adjust_order = 1
            if not (tp > order_price > sl):
                print(f"CHECK: Long orders require: TP ({tp}) > Order ({order_price}) > SL ({sl})")
                return adjust_order 

        elif trade_type == 'short':
            adjust_order = 2
            if not (tp < order_price < sl):
                print(f"CHECK: Short orders require: TP ({tp}) < Order ({order_price}) < SL ({sl})")

                return adjust_order

    
    def adjust_order_limits(self, trade_type, order_price, sl, tp):
        """
        Adjusts SL or TP to ensure the correct relationship between TP, order price, and SL.
        - For long: TP > order price > SL
        - For short: TP < order price < SL
        """
        if trade_type == 'long':
            if tp > order_price:
                tp  =tp 
            elif tp < order_price:
                tp = order_price + 0.016  # Small buffer to meet the criteria
            
            if sl < order_price:
                sl =  sl
            elif sl > order_price:
                sl =  order_price - 0.008
          
        elif trade_type == 'short':
            if sl > order_price:
                sl  = sl 
            elif sl < order_price:
                sl = order_price + 0.008

            if tp < order_price:
                tp = tp - 0.01

            elif tp > order_price:
                tp  =  order_price - 0.016
           
        return sl, tp
    
    '''
    iteratively called by the Backtest instance, once for each data point (data frame row), simulating the incremental availability 
    each new full candlestick
    - orders are executed on the current candle's close (trade_on_close= True) 
    - self.data loads individual candles iteratively (not a normal dataframe), but an internal object
    - each object should be accessed individually
    '''
    def next(self):
        if self.index >= lookback_window:
            # Get the latest data window
            data_window = self.test_data_subset.iloc[self.index - lookback_window + 1 : self.index + 1]
         
            # Forecast prices for 1-day, 3-day, and 5-day horizons
            forecast_1d, forecast_3d, forecast_5d = self.forecast_prices(data_window)
            current_close = self.data.Close[self.index]

             # Store the forecasts for plotting later
            self.forecast_data.append({
                'index': self.index,
                'forecast_1d': forecast_1d,
                'forecast_3d': forecast_3d,
                'forecast_5d': forecast_5d,
                'current_close': current_close
            })

            # Condition for long trade: Forecasts show upward trend
            if   forecast_1d > current_close:
                if not self.position.is_long:
                    self.position.close()
                    self.buy(size=self.position_size, sl=self.long_sl_multiplier*current_close, tp=self.long_tp_multiplier*current_close)
 
            else:
                if not self.position.is_short:
                    self.position.close()
                    self.sell(size=self.position_size, sl=self.short_sl_multiplier*current_close, tp=self.short_tp_multiplier*current_close)

        self.index += 1

    def plot_forecasts(self):
            """Plot the forecasted values alongside actual prices."""
            indexes = [f['index'] for f in self.forecast_data]
            actual_prices = [f['current_close'] for f in self.forecast_data]
            forecast_1d = [f['forecast_1d'] for f in self.forecast_data]
            forecast_3d = [f['forecast_3d'] for f in self.forecast_data]
            forecast_5d = [f['forecast_5d'] for f in self.forecast_data]

            plt.figure(figsize=(10, 6))
            plt.plot(indexes, actual_prices, label='Actual Price', color='black', linestyle='--')
            plt.plot(indexes, forecast_1d, label='Forecast 1D', color='blue')
            plt.plot(indexes, forecast_3d, label='Forecast 3D', color='green')
            plt.plot(indexes, forecast_5d, label='Forecast 5D', color='red')

            plt.xlabel('Index')
            plt.ylabel('Price')
            plt.title('Forecasted Values vs Actual Prices')
            plt.legend()
            plt.show()

test_data = data.iloc[-481:]  # This assumes X_test is at the end of the dataset
bt = Backtest(test_data, LinearRegressionStrategy, cash =10000, commission=0, margin=.05, trade_on_close=True)
stats = bt.run()
# stats = bt.optimize(short_sl_multiplier=[1.01, 1.015, 1.017,1.02], long_sl_multiplier= [0.98,0.985, 0.99, 0.995], maximize='Equity Final [$]')

strategy_instance = stats._strategy  # Retrieve the strategy object

best_params = stats._strategy._params
print(best_params)

bt.plot() # trade execution

stats # trade statistics



{}


Start                     2023-03-30 00:00:00
End                       2024-10-11 00:00:00
Duration                    561 days 00:00:00
Exposure Time [%]                     99.5842
Equity Final [$]                 11959.685385
Equity Peak [$]                  12377.812798
Return [%]                          19.596854
Buy & Hold Return [%]                0.286131
Return (Ann.) [%]                    9.829256
Volatility (Ann.) [%]               12.194345
Sharpe Ratio                          0.80605
Sortino Ratio                        1.357474
Calmar Ratio                         1.059638
Max. Drawdown [%]                   -9.276052
Avg. Drawdown [%]                   -1.821305
Max. Drawdown Duration      318 days 00:00:00
Avg. Drawdown Duration       30 days 00:00:00
# Trades                                  143
Win Rate [%]                        53.846154
Best Trade [%]                            2.0
Worst Trade [%]                          -1.5
Avg. Trade [%]                    

# old code

In [None]:
 def next(self):
        if self.index >= lookback_window:
            # Print current P&L for debugging
            if self.position.is_long:
                print(f'Current long position P&L: {self.position.pl}')
            elif self.position.is_short:
                print(f'Current short position P&L: {self.position.pl}')

            # Get the latest data window
            data_window = self.test_data_subset.iloc[self.index - lookback_window + 1 : self.index + 1]
         
            # Forecast prices for 1-day, 3-day, and 5-day horizons
            forecast_1d, forecast_3d, forecast_5d = self.forecast_prices(data_window)
            current_close = self.data.Close[self.index]

             # Store the forecasts for plotting later
            self.forecast_data.append({
                'index': self.index,
                'forecast_1d': forecast_1d,
                'forecast_3d': forecast_3d,
                'forecast_5d': forecast_5d,
                'current_close': current_close
            })

            # Define stop loss and take profit distances
            stop_loss_distance = 0.008  # 80 pips

            # Long trade parameters
            sl_long = current_close - stop_loss_distance
            tp_long = max(forecast_1d, current_close + 0.016)  # TP above current close

            # Short trade parameters
            sl_short = current_close + stop_loss_distance
            tp_short = min(forecast_1d, current_close - 0.016)  # TP below current close

            # Condition for long trade: Forecasts show upward trend
            if   forecast_1d > current_close:
                if not self.position.is_long:
                    self.position.close()
                    # #check validaty of new order
                    # adapt_order = self.validate_order('long', current_close, sl_long, tp_long)
                    # if adapt_order == 1:
                    #     sl_long, tp_long = self.adjust_order_limits('long', current_close, sl_long, tp_long)
                    # self.buy(size=0.1, sl=sl_long, tp=tp_long)
                    self.buy(size=self.position_size, sl=self.long_sl_multiplier*current_close, tp=self.long_tp_multiplier*current_close)

     
            else:
                if not self.position.is_short:
                    self.position.close()
                    # #check validaty of new order
                    # adapt_order = self.validate_order('short', current_close, sl_short, tp_short)
                    # if adapt_order == 2:
                    #     sl_short, tp_short = self.adjust_order_limits('short', current_close, sl_short, tp_short)

                    # self.sell(size=0.1, sl=sl_short, tp=tp_short)
                    self.sell(size=self.position_size, sl=self.short_sl_multiplier*current_close, tp=self.short_tp_multiplier*current_close)

        self.index += 1

## Strategy 1 Analysis:

* Strategy reacts to most current trend
* Disadvantage:
    - increased trade frequency >> incures more transaction costs >> increased risk, due to increased market exposure
    - if transaction costs included, profitability decreases

* A note is that the amount of paper profits realized depend also on the position size.

* Not optimal because condiions won't be met due to the volatile nature of the markets
    - as result trades being executed will not be correct
* R:R -> 1:2 