In [1]:
# Installing dependencies
import os, datetime
import numpy as np
import pandas as pd
from yfQuery import datareader

# Custom Class

In [2]:
class DataLoader:
    def __init__(self, symbol, years=10, dname='Database', need_update=False):
        self.symbol = symbol
        self.yrs = years
        self.directory = os.path.join(os.getcwd(), dname)
        self.filename = self.symbol + '.csv'
        # Create Database directory if doesn't exist
        if not os.path.isdir(self.directory):
            os.mkdir(self.directory)
        # Check if symbol in database or need_update
        if not os.path.exists(os.path.join(self.directory, self.filename)) or need_update:
            # Download data then load
            self.update_database()
        else:
            # Load data
            self.load_data()
    
    def get_data(self, start, end):
        assert isinstance(start, str) or isinstance(start, int), "Check start date or index"
        assert isinstance(end, str) or isinstance(end, int), "Check end date or index"
        # Get a set period of data
        if isinstance(start, str) and isinstance(end, str):
            m = self.data.index.isin(pd.date_range(start, end))
            return self.data.loc[m]
        elif isinstance(start, int) and isinstance(end, int):
            return self.data.iloc[start: end]
        else:
            print('DataType mismatch!')
        
    def load_data(self):
        # loading data from Database
        self.data = pd.read_csv(os.path.join(self.directory, self.filename), index_col=0, header=0)
        self.data.index = pd.to_datetime(self.data.index, format='%Y%m%d %H:%M:%S')
    
    def update_database(self):
        # Get today to use as ending date
        today = datetime.date.today()
        # Get start time base on lookback years 
        previous = today - datetime.timedelta(days=365*self.yrs)
        # Downloading price data
        df = datareader(self.symbol, str(previous), str(today))
        # Storing price data
        df.to_csv(os.path.join(self.directory, self.filename))
        self.data = df.dropna()

In [3]:
class BuySell:
    def __init__(self, capital=None, max_share=None):
        self.original_capital = capital
        self.capital = capital
        self.max_share = max_share
        self.share = 0
        self.is_holding = False
        self.buy_at = 0
        # Array for dollar gain per trade 
        self.dollar_gain = []
        # Array for percentage gain per trade
        self.pct_gain = []
    
    def buy(self, price):
        # Buy Action
        self.buy_at = price
        self.is_holding = True
        # Buy Power with capital
        if self.capital is not None:
            self.share = self.capital // self.buy_at
            if self.max_share is not None and self.share > self.max_share:
                self.share = self.max_share
            self.capital -= np.round(self.buy_at * self.share, 2)
    
    def sell(self, price):
        # Sell Action
        self.calculate_gain(price)
        if self.capital is not None:
            self.capital += np.round(price * self.share)
            self.share = 0
        self.buy_at = 0
        self.is_holding = False
    
    def calculate_gain(self, price):
        dollar_gain = price - self.buy_at
        pct_gain = price / self.buy_at - 1
        self.dollar_gain.append(dollar_gain)
        self.pct_gain.append(pct_gain)
    
    def show_results(self):
        self.results = self.result_Series()
        print('Average gain of ${:.2f} per share after {} trades'.format(self.results['Total-Dollar-Gain'], 
                                                                self.results['Num-Trades']))
        print('and have average gain percentage of {:.2f}%'.format(self.results['Total-Percentage-Gain'] * 100))
        print('Has a {:.2f}% of good trades'.format(self.results['Percentage-of-Good-Trades'] * 100))
        if self.capital is not None:
            print('Test Ending Capital: ${:.2f} base on original capital of ${:.2f}'.format(self.capital, self.original_capital))
            print('With {:.2f}% Capital Gain'.format(self.results['Percentage-Capital-Gain'] * 100))

    def result_Series(self):
        index = ['Num-Trades', 'Total-Dollar-Gain', 'Total-Percentage-Gain', 
                 'Percentage-of-Good-Trades','Percentage-Capital-Gain']
        num_trades = len(self.dollar_gain)
        total_dollar_gain = np.mean(self.dollar_gain)
        total_pct_gain = np.mean(self.pct_gain)
        pct_good_trades = (np.array(self.dollar_gain) > 0).sum() / num_trades
        array = [num_trades, total_dollar_gain, total_pct_gain, pct_good_trades]
        if self.capital is not None:
            capital_gain = self.capital / self.original_capital - 1
            array.append(capital_gain)
            return pd.Series(array, index=index)
        return pd.Series(array, index=index[:-1])
        

In [4]:
class Strategy:
    def __init__(self, fast=3, slow=6):
        self.a = fast
        self.b = slow
        self.apct = None
    
    def update_apct(self, df):
        # Calcualte Periodic High Low gap
        rolled_low = df['Low'].shift(self.a)
        pct = df['High'] / rolled_low - 1
        # Getting Average Gap base on data
        self.apct = pct.mean()
    
    def update_data(self, df):
        self.data = self.crossing_data(df)
        self.data = self.get_good_trades(self.data)
    
    def crossing_data(self, df):
        # MA Crossing or Conditional 
        fast_ma = df['Close'].rolling(self.a).mean()
        slow_ma = df['Low'].rolling(self.b).mean()
        above = fast_ma > slow_ma
        # Concatenate MA and Crossing
        data = pd.concat([fast_ma, slow_ma, above], axis=1)
        data.columns = ['fast_ma', 'slow_ma', 'above']
        # Concatenate DF with MA
        data = pd.concat([df.loc[:, ['High', 'Low', 'Close']], data], axis=1)
        return data.dropna()
    
    def get_good_trades(self, df):
        rolled_high = df['High'].shift(self.a)
        # Adjusted High is like a padding for first requirement of a good buy price
        adjusted_high = rolled_high * (1. - self.apct) 
        # Current Low Price against Adjusted High Price to get a percent different
        pricing = df['Low'] / adjusted_high - 1
        # Check if the percent different is lesser than the average gap
        # which would result in a good buy of the Low price
        df['Good-Buy'] = pricing <= self.apct
        return df.dropna()

In [5]:
def static_backtest(strategy, capital=1000, max_share=1000):
    # Static Backtest from data
    bs = BuySell(capital=capital, max_share=max_share)
    data = strategy.data
    length = len(data)

    # Backtest 
    for i in range(length):
        # Skipping 1st day
        if i == 0:
            continue
        is_above = data['above'].iloc[i - 1]
        is_good_buy = data['Good-Buy'].iloc[i]
        buying_price = data['Low'].iloc[i]
        selling_pirce = data['Close'].iloc[i]
        high = data['High'].iloc[i]
        # Buy Sell Logic
        if i + 1 == length:
            # Selling at the last day
            if bs.is_holding:
                bs.sell(selling_pirce)
        else:
            # Check for Holding or not
            if not bs.is_holding:
                if is_above and is_good_buy:
                    bs.buy(buying_price)
            else:
                pct = high / bs.buy_at - 1
                good_sell = pct >= strategy.apct
                if not is_above or good_sell:
                    if good_sell:
                        bs.sell(high)
                    else:
                        bs.sell(selling_pirce)
    
    print('#####################################################')
    bs.show_results()
    print('#####################################################')
    avg_days_per_trade = length / bs.results['Num-Trades']
    stock_momentum = data['Close'].iloc[-1] / data['Close'].iloc[0] - 1
    print('Average days per trade: {:.2f}'.format(avg_days_per_trade))
    print("Stock's momemtum: {:.2f}".format(stock_momentum))
    print('#####################################################')

# Back Testing

In [21]:
d = DataLoader('MSFT')

In [22]:
data = d.get_data('2018-01-01', '2019-12-31')
s = Strategy()
s.update_apct(data)
s.update_data(data)

In [23]:
static_backtest(s)

#####################################################
Average gain of $2.02 per share after 84.0 trades
and have average gain percentage of 1.85%
Has a 78.57% of good trades
Test Ending Capital: $4371.57 base on original capital of $1000.00
With 337.16% Capital Gain
#####################################################
Average days per trade: 5.93
Stock's momemtum: 0.79
#####################################################


# Testing without updating the average gap

In [24]:
data = d.get_data('2020-01-01', '2020-12-31')
s.update_data(data)

In [25]:
static_backtest(s)

#####################################################
Average gain of $4.77 per share after 52.0 trades
and have average gain percentage of 2.75%
Has a 82.69% of good trades
Test Ending Capital: $3733.04 base on original capital of $1000.00
With 273.30% Capital Gain
#####################################################
Average days per trade: 4.77
Stock's momemtum: 0.37
#####################################################
