In [2]:
!ls ../core

[34m__pycache__[m[m          backtest_engine.py   portfolio_manager.py


## 1. Backtest Engine

### 1.1 Original `BacktestEngine` class

In [None]:
class BacktestEngine:
    
    def __init__(self, data, strategy, initial_cash = 1000):
        """
        Initialize Backtest Engine Object
        
        @param data: a data frame with columns of "Date" and "Close", it's genrated by csv_loader in data_loader directory
        @param strategy: a strategy object (e.g. RSIStrategy object)
        @param initial_cash: default 1000
        """
        
        self.data = data
        self.strategy = strategy
        self.initial_cash = initial_cash
        
        
        self.cash_stack = [initial_cash * 0.2] * 5
        self.position_stack = []
        self.portfolio_value_list = [] #Track portfolio value over time
        
        
    def run(self):
        """
        Generate trading signals and Execute trades based on signals.
        Protfolio value will be recorded over time.
        """
        
        # Generate the trading signals for each close price
        signal_list = self.strategy.generate_signals(self.data)
        
        for i in range(len(self.data)):
            signal = signal_list[i]
            price = self.data["Close"].iloc[i]
            
            if signal == 1 and len(self.cash_stack) > 0: # buy signal
                cash = self.cash_stack.pop()
                shares_to_buy = cash / price
                
                self.position_stack.append(shares_to_buy)
                
            elif signal == -1 and len(self.position_stack) > 0: #sell signal
                shares_to_sell = self.position_stack.pop()
                cash = price * shares_to_sell
                
                self.cash_stack.append(cash)
                
            # calculate current portfolio value   
            current_portfolio_value = sum(self.cash_stack) + sum(self.position_stack) * price
            self.portfolio_value_list.append(current_portfolio_value)
            
            
    def get_results(self):
        """
        retrieve the portfolio values over the backtesting period
        
        @return: A list of portfolio values at each time step
        """
        return self.portfolio_value_list
        
        
    
    def calculate_performance(self):
        """
        calculate the total return

        @return: a dictionary that contains total return information
        """
        
        final_value = self.portfolio_value_list[-1]
        
        total_return = (final_value - self.initial_cash) / self.initial_cash
        
        return {"Total Return": total_return}

### 1.2 `BacktestEngine` class after introducing `PortfolioManager` Object in `portfolio_manager.py`

As **`PortfolioManager()`** object is introduced, inside the **`BacktestEngine.run()`** method, the **BUY**, **SELL** and **GET PORTFOLIO VALUE** operations are handled by **`PortfolioManager().buy()`**, **`PortfolioManager().sell()`** and **`PortfolioManager().get_portfolio_value()`** methods

In [None]:
from portfolio_manager import PortfolioManager


class BacktestEngine:
    
    def __init__(self, data, strategy, initial_cash = 1000):
        """
        Initialize Backtest Engine Object
        
        @param data: a data frame with columns of "Date" and "Close", it's genrated by csv_loader in data_loader directory
        @param strategy: a strategy object (e.g. RSIStrategy object)
        @param initial_cash: default 1000
        """
        
        self.data = data
        self.strategy = strategy
        self.initial_cash = initial_cash

        # Initialize the portfolio manager object, it will hand buy, sell trading action and calculate portfolio value at each time step
        self.portfolio_manager = PortfolioManager(initial_cash)

        self.portfolio_value_list = [] #Track portfolio value over time
        
        
    def run(self):
        """
        Generate trading signals and Execute trades based on signals.
        Protfolio value will be recorded over time, at the self.portfolio_value_list
        """
        
        # Generate the trading signals for each close price
        signal_list = self.strategy.generate_signals(self.data)
        
        for i in range(len(self.data)):
            signal = signal_list[i]
            price = self.data["Close"].iloc[i]
            
            if signal == 1: # buy signal
                success = self.portfolio_manager.buy(price)   #returns if the buy action is successful or not
                
                if not success:
                    print(f"Buy failed at {self.data.index[i]}: Insufficient cash")
                
            elif signal == -1: #sell signal
                success = self.portfolio_manager.sell(price)  # returns if the sell action is successful or not
                
                if not success:
                    print(f"Sell failed at {self.data.index[i]}: Insufficient position")
   
            # calculate current portfolio value   
            current_portfolio_value = self.portfolio_manager.get_portfolio_value(price)
            self.portfolio_value_list.append(current_portfolio_value)
            
            
    def get_results(self):
        """
        retrieve the portfolio values over the backtesting period
        
        @return: A list of portfolio values at each time step
        """
        return self.portfolio_value_list
        
        
    
    def calculate_performance(self):
        """
        calculate the total return

        @return: a dictionary that contains total return information
        """
        
        final_value = self.portfolio_value_list[-1]
        
        total_return = (final_value - self.initial_cash) / self.initial_cash
        
        return {"Total Return": total_return}

### 1.3 `BacktestEngine` class after introducing `Performance` Object in `performance.py`

In [None]:
from portfolio_manager import PortfolioManager


class BacktestEngine:
    
    def __init__(self, data, strategy, initial_cash = 1000):
        """
        Initialize Backtest Engine Object
        
        @param data: a data frame with columns of "Date" and "Close", it's genrated by csv_loader in data_loader directory
        @param strategy: a strategy object (e.g. RSIStrategy object)
        @param initial_cash: default 1000
        """
        
        self.data = data
        self.strategy = strategy
        self.initial_cash = initial_cash

        # Initialize the portfolio manager object, it will hand buy, sell trading action and calculate portfolio value at each time step
        self.portfolio_manager = PortfolioManager(initial_cash)

        self.portfolio_value_list = [] #Track portfolio value over time
        
        
    def run(self):
        """
        Generate trading signals and Execute trades based on signals.
        Protfolio value will be recorded over time, at the self.portfolio_value_list
        """
        
        # Generate the trading signals for each close price
        signal_list = self.strategy.generate_signals(self.data)
        
        for i in range(len(self.data)):
            signal = signal_list[i]
            price = self.data["Close"].iloc[i]
            
            if signal == 1: # buy signal
                success = self.portfolio_manager.buy(price)   #returns if the buy action is successful or not
                
                if not success:
                    print(f"Buy failed at {self.data.index[i]}: Insufficient cash")
                
            elif signal == -1: #sell signal
                success = self.portfolio_manager.sell(price)  # returns if the sell action is successful or not
                
                if not success:
                    print(f"Sell failed at {self.data.index[i]}: Insufficient position")
   
            # calculate current portfolio value   
            current_portfolio_value = self.portfolio_manager.get_portfolio_value(price)
            self.portfolio_value_list.append(current_portfolio_value)
        
        
    
    def get_performance_metrics(self):
        """
        get performance metrics using performance.py mehtods
        
        @return: a dictionary that contains performance metrics
        """
        
        return Performance.calculate_performance(self.portfolio_value_list)

## 2. Portfolio Manager

### `PorfolioManager` class in `portfolio_manager.py`

1. `__init__(initial_cash, transaction_fee, n_splits)`:
   - take in the initial cash and number of splits, initialize the cash stack
   - initialize an empty position stack
   - take in the transaction fee
2. `buy(current_price)`:
3. `sell(current_price)`:
4. `get_portfolio_value(current_price)`

In [None]:
class PortfolioManager:

    def __init__(self, initial_cash, transaction_fee = 0.001, n_splits = 10):
        """
        Initialize the PortfolioManager object instance
        
        @param initial_cash: initial cash
        @param transaction_fee: transaction cost rate, default is 0.1%
        @param n_splits: the number of portions to split the initial cash into for cash stack, default is 10
        """
        
        # split the initial cash into equal portions for cash stack
        self.cash_stack = [initial_cash / n_splits] * n_splits
        self.position_stack = []  #Initialize an empty position stack
        self.transaction_fee = 0.001


    def buy(self, current_price):
        """
        Execute a buy operation given current price, update the cash stack and the position stack
        
        @param current_price: current buy price
        
        @return: True if buy successful, False otherwise
        """

        if len(self.cash_stack) > 0: # check if cash is enough for buy action
            
            cash = self.cash_stack.pop() # pop one portion of cash from the cash stack
            shares_to_buy = cash / (current_price * (1 + self.transaction_fee))  #calculate how many shares can be bought
            
            self.position_stack.append(shares_to_buy) # update the position stack by pushing the newly bought shares
            return True
        return False


    def sell(self, current_price):
        """
        Execute sell operation given current price, update the cash stack and the position stack
        
        @param current_price: current sell price
        
        @return: True if sell successful, False otherwise
        """

        if len(self.position_stack) > 0:  # check if there's enough position to sell
            
            shares_to_sell = self.position_stack.pop()   #pop one portion of the position from the position stack
            cash = shares_to_sell * current_price * (1 - self.transaction_fee  # sell the position and get the cash

            self.cash_stack.append(cash)  #update the cash stack by pushing the cash into it
            return True
        return False


    def get_portfolio_value(self, current_price):
        """
        Calculate current portfolio value given current price
        
        @return: current portfolio value
        """

        return sum(self.cash_stack) + sum(self.position_stack) * current_price
        

## 3. Performance static method class

### `Performance` class in performance.py
It provides a collection of **independent methods** related to performance evaluation **without an `__init__` methond** and **without needing to instantiate an object**.

1. `calculate_total_return(portfolio_value_list)`
2. `calculate_max_drawdown(portfolio_value_list)`

In [None]:
import pandas as pd


class Performance:

    @staticmethod
    def calculate_total_return(portfolio_value_list):
        """
        Calculate the total return given the portfolio values over time steps

        @param portfolio_value_list: a list of portfolio values over time

        @return: total return
        """
        initial_value = portfolio_value_list[0]
        final_value = portfolio_value_list[-1]

        return (final_value - initial_value) / initial_value


    @staticmethod
    def max_drawdown(portfolio_value_list):
        """
        Calculate the max drawdown based on the portfolio values over time

        @param portfolio_value_list: a list of portfolio values over time

        @return: Maximum Drawn
        """
        portfolio_value_series = pd.Series(portfolio_value_list)

        cumulative_max_series = portfolio_value_series.cummax()

        drawdown_series = (portfo_value_series - cumulative_max_series) / cumulative_max_series

        max_drawdown_value = drawdown_series.min()

        return max_drawdown_value


    @staticmethod
    def calculate_performance(portfolio_value_list):
        """
        Calculate all the performance methods and return a dictionary

        @param portfolio_value_list: a list of portfolio values over time

        @return: a dictionary including Total Return, Max Drawdown, etc...
        """
        return {
            "Total Return": Performance.calculate_total_return(portforlio_value_list),
            "Maximum Drawdown": Performance.calculate_max_drawdown(portfolio_value_list)
        }
        

In [4]:
import pandas as pd

portfolio_value_list = [100, 120, 115, 130, 90, 140, 105]
portfolio_value_list = pd.Series(portfolio_value_list)
portfolio_value_list

0    100
1    120
2    115
3    130
4     90
5    140
6    105
dtype: int64

In [5]:
cumulative_max_list = portfolio_value_list.cummax()
cumulative_max_list

0    100
1    120
2    120
3    130
4    130
5    140
6    140
dtype: int64

In [6]:
drawdown_list = (portfolio_value_list - cumulative_max_list) / cumulative_max_list
drawdown_list

0    0.000000
1    0.000000
2   -0.041667
3    0.000000
4   -0.307692
5    0.000000
6   -0.250000
dtype: float64

In [8]:
max_drawdown = drawdown_list.min()
max_drawdown

-0.3076923076923077