# Solution to Tutorial 06

## Test Project &mdash; Algorithmic Trading
**Prediction-based Trading & Event-based Backtesting**

Implement a class that uses **event-based backtesting** to backtest the following prediction-based strategy:

* Data from `http://hilpisch.com/ref_eikon_eod_data.csv`.
* Select one symbol from the data set.
* Create the following features:
    * log return
    * direction (up or down)
    * log return as 5 categories
    * two SMAs (short and long window)
    * difference between the SMAs
    * two EWMAs (short and long window)
    * difference between the EWMAs
    * two rolling volatilities (short and long window)
* Split the data set into training (70%) and testing data.
* Normalize the training features data to have
    * zero mean and
    * standard deviation of one.
* Normalize the test features data by the same moment values as the training data.
* Create lagged features data for 5 lags.
* Train and (back-)test the following algorithms for directional (long/short) trading (from `scikit-learn`):
    * `GaussianNB()`
    * `LogisticRegression()`
    * `DecisionTreeClassifier()`
    * `SVC()`
    * `MLPClassifier()`
* Compare the performance of the different models numerically.

For the implementation, you can rely e.g. on the Python classes as presented in the PyAlgo class sessions and the resources.

## Partial Solution

The solution currently does only support returns. The other features as listed here are missing. 
* log return as 5 categories
* two SMAs (short and long window)
* difference between the SMAs
* two EWMAs (short and long window)
* difference between the EWMAs
* two rolling volatilities (short and long window)

The solution also support only a long strategy.

The amount of output can be adjusted with the log level. Currently the output is minimal.

It's possible to test a single strategy, yet the result is also added to the results of all runs.

I noticed a different performance with the MLPClassifier in the way I'm running the test. If I import the class instead of having it in the Jupyter notebook the percentage is higher. 

To put all parts of the homework together took longer than I anticipated, therefore there are some parts missing
* just a brief check of Python conventions and code guidelines
* missing of additional features
* import of classes in Colab, it was near the end of the deadline when I noticed that Colab could not import my classes.
# no further time to investigate the different MLPClassifier results due to th

# TradingDataHandler

In [1]:
# Implement basic trading functionalities

from typing import Optional, Union
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
import logging


class TradingDataHandler:
    """
    Base class for event-based backtesting of trading strategies.
    
    Attributes
    ----------
    symbol : str
        Financial instrument to be used (ticker symbol).
    amount : float
        Initial amount to be invested either once or per trade.
    ftc : float
        Fixed transaction costs per trade (buy or sell).
    ptc : float
        Proportional transaction costs per trade (buy or sell).
    ptrain : float
        Training data percentage vs. test data.
    data_source : str
        URL or file path to the data file.
    """
    
    logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(levelname)s - %(message)s')

    def __init__(self, symbol: str, amount: float, 
                 ftc: float = 0.0, ptc: float = 0.0, 
                 ptrain: float = 0.7, data_source: str = 'http://hilpisch.com/ref_eikon_eod_data.csv') -> None:
        self.symbol = symbol
        self.initial_amount = amount
        self.amount = amount
        self.ftc = ftc
        self.ptc = ptc
        self.ptrain = ptrain
        self.data_source = data_source
        self.units = 0
        self.position = 0
        self.trades = 0
        self.get_data()
        
    def get_data(self) -> None:
        """Retrieves and prepares the data from a source (URL or local file)."""
        try:
            # Try loading data from URL or file
            if self.data_source.startswith("http"):
                raw = pd.read_csv(self.data_source, index_col=0, parse_dates=True).dropna()
            else:
                raw = pd.read_csv(self.data_source, index_col=0, parse_dates=True).dropna()
            
            self.data = pd.DataFrame(raw[self.symbol])
            self.data.rename(columns={self.symbol: 'price'}, inplace=True)
            self.data['return'] = np.log(self.data['price'] / self.data['price'].shift(1))
            self.data['direction'] = np.where(self.data['return'] > 0, 1, -1)
            self.data.dropna(inplace=True)
            logging.info("Data successfully loaded and prepared.")
        except Exception as e:
            logging.error(f"Failed to load data: {e}")

    def plot_data(self, cols: Optional[list] = None, title: str = None, figsize: tuple = (10, 6)) -> None:
        """Plots specified data columns."""
        if cols is None:
            cols = ['price']
        self.data[cols].plot(figsize=figsize, title=title or self.symbol)
        plt.show()

    def get_date_price(self, bar: int) -> tuple[str, float]:
        """Returns the date and price for a given bar."""
        date = str(self.data.index[bar])[:10]
        price = self.data['price'].iloc[bar]
        return date, price

    def log_balance(self, bar: int) -> None:
        """Logs the current balance."""
        date, price = self.get_date_price(bar)
        logging.info(f"{date} | current balance: ${self.amount:.2f}")

    def log_net_wealth(self, bar: int) -> None:
        """Logs the current net wealth."""
        date, price = self.get_date_price(bar)
        net_wealth = self.units * price + self.amount
        logging.info(f"{date} | current net wealth: ${net_wealth:.2f}")

    def place_order(self, bar: int, units: int, is_buy: bool = True) -> None:
        """Places a buy or sell order."""
        date, price = self.get_date_price(bar)
        transaction_cost = (units * price) * (1 + self.ptc) + self.ftc if is_buy else (units * price) * (1 - self.ptc) - self.ftc
        self.amount += -transaction_cost if is_buy else transaction_cost
        self.units += units if is_buy else -units
        self.trades += 1
        order_type = 'buying' if is_buy else 'selling'
        logging.info(f"{date} | {order_type} {units} units at ${price:.2f}")
        if self.get_log_level() == logging.DEBUG:
            self.log_balance(bar)
            self.log_net_wealth(bar)
            
    def place_buy_order(self, bar: int, units: int):
        self.place_order(bar, units)
        
    def place_sell_order(self, bar: int, units: int):
        self.place_order(bar, units, False)

    def close_out(self, bar: int) -> tuple[str, float]:
        """Closes out the position, calculating and logging final balance and performance."""
        date, price = self.get_date_price(bar)
        self.amount += self.units * price  # Account for remaining units
        logging.info(f"{date} | Final inventory: {self.units} units at ${price:.2f}")
        performance = ((self.amount - self.initial_amount) / self.initial_amount) * 100
        logging.info(f"Final balance: ${self.amount:.2f}")
        logging.info(f"Net Performance [%]: {performance:.2f}")
        return date, performance


    def get_log_level(self) -> int:
        """Returns the current logging level as a logging constant (e.g., logging.DEBUG)."""
        logger = logging.getLogger()
        if logger.isEnabledFor(logging.DEBUG):
            return logging.DEBUG
        elif logger.isEnabledFor(logging.INFO):
            return logging.INFO
        elif logger.isEnabledFor(logging.WARNING):
            return logging.WARNING
        elif logger.isEnabledFor(logging.ERROR):
            return logging.ERROR
        else:
            return logging.NOTSET

# Test of the BacktestingStrategies class

In [2]:
# Implementation to backtest different trading strategies
#
# Currently only buying long is supported

from TradingDataHandler import *

from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier

class BacktestingStrategies(TradingDataHandler):
    
    def __init__(self, symbol: str, amount: float, 
                 ftc: float = 0.0, ptc: float = 0.0, 
                 ptrain: float = 0.7, data_source: str = 'http://hilpisch.com/ref_eikon_eod_data.csv', 
                 verbose: int = 3) -> None:
        super().__init__(symbol, amount, ftc, ptc, ptrain, data_source)
        self.result = pd.DataFrame()
        self.models = [GaussianNB(), LogisticRegression(), DecisionTreeClassifier(), SVC(), MLPClassifier()]

    def prepare_model(self, model, features: list) -> pd.DataFrame:
        """Prepares and trains a model; returns normalized test data."""
        train_norm, train_labels, test_norm = self.prepare_train_test_data(features)
        model.fit(train_norm, train_labels)
        return test_norm

    def prepare_train_test_data(self, features: list) -> tuple[pd.DataFrame, pd.Series, pd.DataFrame]:
        """Prepares the training and test data by normalizing specified features."""
        model_data = self.prepare_features(features)
        split_index = int(len(model_data) * self.ptrain)
        train = model_data.iloc[:split_index].copy()
        test = model_data.iloc[split_index:].copy()
        train_labels = train['direction']
        mu, std = train[self.feature_columns].mean(), train[self.feature_columns].std()
        train_norm = (train[self.feature_columns] - mu) / std
        test_norm = (test[self.feature_columns] - mu) / std
        return train_norm, train_labels, test_norm

    def prepare_features(self, features: list, lags: int = 5) -> pd.DataFrame:
        """Generates lagged features for the model training."""
        model_data = self.data.copy()
        self.feature_columns = features.copy()
        for feature in features:
            for lag in range(1, lags + 1):
                col_name = f'{feature}_lag_{lag}'
                model_data[col_name] = model_data[feature].shift(lag)
                self.feature_columns.append(col_name)
        model_data.dropna(inplace=True)
        return model_data

    def run_strategies(self) -> None:
        """Runs the strategy for each model in the models list."""
        for model in self.models:
            logging.info(f'-----------------------------------------------------------------')
            logging.info(f'Running strategy for {model.__class__.__name__}')
            self.run_strategy(model)
        print(f'--------------- TEST RESULTS -------------------------------')
        print(self.feature_columns)
        print(self.result)
        #print(data[['return']].loc[start_index:].sum().apply(np.exp))
        print(f'--------------- TEST RESULTS -------------------------------')

    def run_strategy(self, model, features=['return'], print_result=False) -> None:
        """Runs the strategy for a single model, handling predictions and order placement."""
        test_norm = self.prepare_model(model, features)
        self.position = 0
        self.trades = 0
        self.units = 0
        self.amount = self.initial_amount

        logging.info("Starting strategy execution...")
        for date_index, row in test_norm.iterrows():
            prediction = model.predict(row.to_frame().T)
            bar = self.data.index.get_loc(date_index)

            if prediction[0] == 1 and self.position <= 0:  # Buy condition
                trading_units = int(self.amount / self.data['price'].iloc[bar])
                self.place_buy_order(bar, units=trading_units)
                self.position = 1
            elif prediction[0] == -1 and self.position >= 0:  # Sell condition, but no short selling
                self.place_buy_order(bar, units=self.units)
                self.position = -1

            if self.get_log_level() == logging.DEBUG:
                self.log_balance(bar)
                self.log_net_wealth(bar)

        end_date, perf = self.close_out(bar)
        new_result = pd.DataFrame({'start_date': [self.data.index[0]], 'end_date': [end_date], 
                                   'model': [model.__class__.__name__], 'performance': [perf]})
        self.result = pd.concat([self.result, new_result], ignore_index=True)
        
        if (print_result):
            print(f'--------------- TEST RESULTS -------------------------------')
            print(self.feature_columns)
            print(new_result)
            print(f'--------------- TEST RESULTS -------------------------------')
            
    def add_features(self, features: str, lags: int = 5) -> pd.DataFrame:
        """Generates additional features for model training."""

        # 1. Log Return with Categories
        #model_data['log_return'] = np.log(model_data['price'] / model_data['price'].shift(1))
        #model_data['log_return_cat'] = pd.qcut(model_data['log_return'], 5, labels=False)  # 5 bins/categories

        # 2. Simple Moving Averages (SMA)
        if feature == 'SMA' or feature == 'difference_SMA':
            short_window = 5
            long_window = 42
            data['SMA_short'] = model_data['price'].rolling(short_window).mean()
            data['SMA_long'] = model_data['price'].rolling(long_window).mean()
            
        if feature == 'difference_SMA':
            data['mom_sma'] = np.where(data['sma_short'] > data['sma_long'], 1, -1)


        # 3. Exponentially Weighted Moving Averages (EWMA)
        #model_data['EWMA_short'] = model_data['price'].ewm(span=self.short_window, adjust=False).mean()
        #model_data['EWMA_long'] = model_data['price'].ewm(span=self.long_window, adjust=False).mean()
        #model_data['EWMA_diff'] = model_data['EWMA_short'] - model_data['EWMA_long']

        # 4. Rolling Volatility
        #model_data['vol_short'] = model_data['log_return'].rolling(window=self.short_window).std()
        #model_data['vol_long'] = model_data['log_return'].rolling(window=self.long_window).std()

        # Generate lagged features
        self.feature_columns = []
        for lag in range(1, lags + 1):
            col_name = f'{feature}_lag_{lag}'
            model_data[col_name] = model_data[feature].shift(lag)
            self.feature_columns.append(col_name)

        # Drop rows with NaN values
        model_data.dropna(inplace=True)
        
        # Add new feature columns
        self.feature_columns += ['log_return_cat', 'SMA_diff', 'EWMA_diff', 'vol_short', 'vol_long']
        
        return model_data

In [3]:
strategies = BacktestingStrategies('AAPL.O', 10000)

## Test all Strategies

In [5]:
strategies.run_strategies()



--------------- TEST RESULTS -------------------------------
['return', 'return_lag_1', 'return_lag_2', 'return_lag_3', 'return_lag_4', 'return_lag_5']
  start_date    end_date                   model  performance
0 2010-01-05  2019-07-01              GaussianNB      96.4507
1 2010-01-05  2019-07-01              GaussianNB      96.4507
2 2010-01-05  2019-07-01      LogisticRegression     108.4101
3 2010-01-05  2019-07-01  DecisionTreeClassifier     102.1810
4 2010-01-05  2019-07-01                     SVC     125.4817
5 2010-01-05  2019-07-01           MLPClassifier      96.5502
--------------- TEST RESULTS -------------------------------


## Test a Single Strategie with test result output

In [4]:
from sklearn.naive_bayes import GaussianNB
strategies.run_strategy(GaussianNB(), ['return'], True)

--------------- TEST RESULTS -------------------------------
['return', 'return_lag_1', 'return_lag_2', 'return_lag_3', 'return_lag_4', 'return_lag_5']
  start_date    end_date       model  performance
0 2010-01-05  2019-07-01  GaussianNB      96.4507
--------------- TEST RESULTS -------------------------------


## Todo: Result without Stategies