In [1]:
# The complete guide is available at:
# http://srome.github.io/Build-Your-Own-Event-Based-Backtester-In-Python/

# file 1: algorithm.py

import numpy as np

class Algorithm:
    '''
    Example algorithm for trading. Must implement a "generate_orders" function which returns a list of orders.
    Each order is a tuple of the form
        ( Stock Ticker str, Current Price float, Order Amount in shares float)
    Algorithm trades for stocks via a rolling window strategy, and randomly liquidates positions
    '''
    def __init__(self):
        # stock -> {'History': price array, 'Index': last filled slot, 'Length': window size}
        self._averages = {}
        self._lambda = .5  # not read anywhere in this class
        self._updates = 0  # incremented once per generate_orders call
        self._price_window = 20
        self._trend = np.zeros(self._price_window)  # rolling window of portfolio values
        self._minimum_wait_between_trades = 5 # Must be less than price window
        self._last_trade = 0
        self._last_date = None

    def _fee_estimate(self, x):
        '''
        Estimate the fee for a trade of value x (4% plus a flat 10).
        Defined as a method rather than a lambda stored on the instance so that
        the object can be pickled (e.g. when handed to a multiprocessing.Process).
        '''
        return x*.04+10

    def _new_window(self, price):
        '''Build a fresh rolling-window record whose first slot holds price.'''
        length = self._price_window
        history = np.zeros(length)
        history[0] = price
        return {'History' : history, 'Index' : 0, 'Length' : length}

    def add_stock(self, stock, price):
        '''
        Start (or restart) tracking stock with price as its first observation.
        The entry uses the same record layout that update/add_price/get_price
        read, so later calls to update() work on it.
        '''
        self._averages[stock] = self._new_window(price)

    def _determine_if_trading(self, date, portfolio_value, cash_balance):
        '''
        Decide whether orders should be generated on this update.

        date must support subtraction with the previously stored trade date and
        yield an object exposing ".days". Returns False while the last trade is
        at most _minimum_wait_between_trades days old; otherwise returns True
        when the update counter has just passed the price window, when the
        trend average exceeds the current portfolio value by more than 5%, or
        when cash is more than 3% of the portfolio value.
        '''
        trade = False
        override = False
        self._updates += 1

        if self._last_date is not None:
            if (date - self._last_date).days <= self._minimum_wait_between_trades:
                # Too soon after the previous trade
                return False

        if self._updates == self._price_window+1:
            trade = True

        if (np.mean(self._trend)-portfolio_value)/portfolio_value > 0.05:
            override = True

        if cash_balance > portfolio_value*.03:
            override = True

        return trade or override

    def generate_orders(self, timestamp, portfolio):
        '''
        Return a list of (stock, price, amount) orders for this update.

        portfolio must provide "balance", get_total_value(), get_update_count(stock)
        and get_shares(stock). A positive amount is a buy, a negative amount a sell.
        An empty list is returned when no trading is due or no stock has been
        updated more than _price_window times.
        '''
        orders = []
        cash_balance = portfolio.balance
        portfolio_value = portfolio.get_total_value()
        self.add_trend_value(portfolio_value)

        if not self._determine_if_trading(timestamp,portfolio_value,cash_balance):
            return orders

        # Only stocks with more updates than the window size have a full history
        valid_stocks = [stock for stock in self._averages
                        if portfolio.get_update_count(stock) > self._price_window]

        if not valid_stocks:
            return orders

        cash_per_stock = cash_balance / len(valid_stocks) # Spend available cash

        # Visit the stocks in random order
        for stock in np.random.choice(valid_stocks, replace=False, size=len(valid_stocks)):
            price = self.get_price(stock)
            relative_change = (self.get_window_average(stock=stock) - price)/price

            if abs(relative_change) > .03:
                # Positive is buy, negative is sell
                order_type = np.sign(relative_change)
                if order_type > 0 and np.random.uniform(0,1,size=1)[0] < .9:
                    amt = np.round(cash_per_stock/price,0)
                else:
                    amt = - portfolio.get_shares(stock) # Liquidate! Why not?

                if abs(amt) < .01:
                    # Stop small trades
                    continue

                orders.append((stock, price, amt))

        self._last_trade = self._updates
        self._last_date = timestamp

        return orders

    def get_window_average(self, stock):
        '''Return the mean of the stored price window for stock (unfilled slots count as 0).'''
        return np.mean(self._averages[stock]['History'])

    def update(self, stock, price):
        '''Record a new price for stock, creating its window on first sight.'''
        if stock in self._averages:
            self.add_price(stock, price)
        else:
            self._averages[stock] = self._new_window(price)

    def get_price(self, stock):
        '''Return the last slot of the price window for stock.'''
        # Assumes history is full
        return self._averages[stock]['History'][-1]

    def add_price(self, stock, price):
        '''Append price to the window of stock, shifting out the oldest value once full.'''
        record = self._averages[stock]
        history = record['History']
        ind = record['Index']
        length = record['Length']
        if ind < length-1:
            # Still filling the window
            history[ind+1] = price
            record['Index'] = ind + 1
        elif ind == length-1:
            # Window full: drop the oldest value and append the newest
            history[:-1] = history[1:]
            history[-1] = price

    def add_trend_value(self, value):
        '''Store value in the portfolio trend window, shifting once the window is full.'''
        history = self._trend
        if self._updates <= self._price_window - 1:
            history[self._updates] = value
        else:
            history[:-1] = history[1:]
            history[-1] = value

In [2]:
# file 2: domain.py

class Portfolio:
    '''
    Holdings book keyed by ticker. Each entry is a dict with 'Shares', 'Price'
    and 'Updates'. Cash is stored as the pseudo-ticker '**CASH**' with a price of 1.0.
    '''
    def __init__(self, balance=1000000):
        self._portfolio = {}
        self._portfolio['**CASH**'] = {'Shares' : balance, 'Price' : 1.0, 'Updates' : 1}

    def update(self, price, ticker):
        '''Record the latest price for ticker, creating an empty position on first sight.'''
        if ticker in self._portfolio:
            entry = self._portfolio[ticker]
            entry['Price'] = price
            entry['Updates'] += 1
        else:
            self._portfolio[ticker] = {'Price' : price, 'Shares' : 0, 'Updates' : 1}

    @property
    def balance(self):
        '''Cash balance (the 'Shares' of the '**CASH**' entry).'''
        return self._portfolio['**CASH**']['Shares']

    @balance.setter
    def balance(self, balance):
        self._portfolio['**CASH**']['Shares'] = balance

    def adjust_balance(self, delta):
        '''Add delta (may be negative) to the cash balance.'''
        self._portfolio['**CASH**']['Shares'] = self.balance + delta

    def __contains__(self, item):
        return (item in self._portfolio)

    def value_summary(self, date):
        '''Return a one-line summary of stock value, cash and total, prefixed with date.'''
        total = self.get_total_value()
        return '%s : Stock value: %s, Cash: %s, Total %s' % (date, total-self.balance, self.balance, total)

    def get_total_value(self):
        '''Return the sum of shares * price over every entry, cash included.'''
        return sum(entry['Shares'] * entry['Price'] for entry in self._portfolio.values())

    def get_value(self, ticker):
        '''Return the market value of the position in ticker (shares * last price).'''
        return self.get_shares(ticker) * self.get_price(ticker)

    def get_price(self, ticker):
        return self._portfolio[ticker]['Price']

    def get_shares(self, ticker):
        return self._portfolio[ticker]['Shares']

    def get_update_count(self, ticker):
        '''Return how many price updates have been recorded for ticker.'''
        return self._portfolio[ticker]['Updates']

    def set_shares(self, ticker, shares):
        self._portfolio[ticker]['Shares'] = shares

    def update_shares(self, ticker, share_delta):
        '''Add share_delta (may be negative) to the position in ticker.'''
        self.set_shares(ticker, self.get_shares(ticker) + share_delta)

    def update_trade(self, ticker, share_delta, price, fee):
        '''Apply an executed trade: adjust the position, then charge cost plus fee to cash.'''
        # Assumes negative shares are sells, requires validation from Controller
        self.update_shares(ticker, share_delta)
        self.adjust_balance(-(price*share_delta + fee))

    def __str__(self):
        return str(self._portfolio)

In [4]:
# file 3. backtester.py

from multiprocessing import Process, Queue
import numpy as np
import pandas as pd
import datetime as dt
from pandas_datareader import DataReader
from algorithm import Algorithm
from domain import Portfolio
import logging


class OrderApi:
    '''
    Simulated order execution: applies random price slippage, occasionally
    fails an order, and charges a proportional fee plus a fixed fee.
    '''
    def __init__(self):
        self._slippage_std = .01       # std. dev. of the relative price slippage
        self._prob_of_failure = .0001  # chance that an order is not executed
        self._fee = .02                # fee rate applied to the absolute trade value
        self._fixed_fee = 10           # flat fee added to every trade

    def _calculate_fee(self, x):
        '''
        Return the total fee for a trade whose signed value is x.
        Defined as a method rather than a lambda stored on the instance so that
        the object can be pickled (e.g. when handed to a multiprocessing.Process).
        '''
        return self._fee*abs(x) + self._fixed_fee

    def process_order(self, order):
        '''
        Try to execute order, a (ticker, price, shares) sequence.

        Returns a receipt (ticker, executed price, shares, fee) on success, or
        None when the simulated execution fails.
        '''
        slippage = np.random.normal(0, self._slippage_std, size=1)[0]

        executed = np.random.choice([False, True], p=[self._prob_of_failure, 1 -self._prob_of_failure],size=1)[0]
        if not executed:
            return None

        fill_price = order[1]*(1+slippage)
        # _calculate_fee already applies the fee rate, so pass it the raw trade value
        trade_value = fill_price*order[2]
        return (order[0], fill_price, order[2], self._calculate_fee(trade_value))


class DataSource:
    '''
    Data source for the backtester. Must implement a "get_data" function
    which streams data from the data source.
    '''
    def __init__(self, source='yahoo', tickers=None, start = dt.datetime(2016,1,1), end=None):
        '''
        Load closing prices for tickers from source between start and end.

        tickers defaults to ['GOGL','AAPL'] and end defaults to the time of the
        call; both are resolved here rather than in the signature so that no
        list is shared between instances and "today" is not frozen at import time.
        '''
        self._logger = logging.getLogger(__name__)
        if tickers is None:
            tickers = ['GOGL','AAPL']
        if end is None:
            end = dt.datetime.today()
        self.set_source(source = source, tickers= tickers, start=start, end=end)

    @classmethod
    def process(cls, queue, source = None):
        '''
        Push every item returned by source.get_data() onto queue, stopping
        after the 'POISON' sentinel has been forwarded. A default-constructed
        instance is used when source is None.
        '''
        source = cls() if source is None else source
        while True:
            data = source.get_data()
            if data is not None:
                queue.put(data)
                if data == 'POISON':
                    break

    def set_source(self, source, tickers, start, end):
        '''
        Download the 'Close' column for each ticker and flatten it into a list
        of (timestamp, ticker, price) events stored on the instance. Tickers
        that fail to load are logged and skipped; non-finite prices are dropped.
        '''
        prices = pd.DataFrame()
        counter = 0.
        for ticker in tickers:
            try:
                self._logger.info('Loading ticker %s' % (counter / len(tickers)))
                prices[ticker] = DataReader(ticker, source, start, end).loc[:, 'Close']
            except Exception as e:
                # Best effort: keep loading the remaining tickers
                self._logger.error(e)
            counter+=1

        events = []
        for timestamp, series in prices.iterrows():
            vals = series.values
            indx = series.index
            # Shuffle the order in which tickers are emitted within one timestamp
            for k in np.random.choice(len(vals),replace=False, size=len(vals)): # Shuffle!
                if np.isfinite(vals[k]):
                    events.append((timestamp, indx[k], vals[k]))

        self._source = events

        self._logger.info('Loaded data!')

    def get_data(self):
        '''Return the next event, or the string 'POISON' once the events are exhausted.'''
        try:
            return self._source.pop(0)
        except IndexError:
            return 'POISON'

class Controller:
    '''
    Drives a backtest: consumes price events from a queue, keeps the portfolio
    and algorithm up to date, and routes the algorithm's orders through the order API.
    '''
    def __init__(self, portfolio = None, algorithm = None):
        self._logger = logging.getLogger(__name__)
        self._portfolio = Portfolio() if portfolio is None else portfolio
        self._algorithm = Algorithm() if algorithm is None else algorithm
        self._order_api = OrderApi()

    @classmethod
    def backtest(cls, queue, controller = None):
        '''
        Consume (timestamp, ticker, price) events from queue until the
        'POISON' sentinel arrives. A default-constructed controller is used
        when controller is None. Any exception ends the loop and is reported;
        the final portfolio summary is logged in every case.
        '''
        controller = cls() if controller is None else controller
        try:
            while True:
                # Block until an event is available instead of polling empty()
                o = queue.get()
                controller._logger.debug(o)

                if o == 'POISON':
                    break

                timestamp = o[0]
                ticker = o[1]
                price = o[2]

                # Update pricing
                controller.process_pricing(ticker = ticker, price = price)

                # Generate Orders
                orders = controller._algorithm.generate_orders(timestamp, controller._portfolio)

                # Process orders
                if len(orders) > 0:
                    # Randomize the order execution
                    final_orders = [orders[k] for k in np.random.choice(len(orders), replace=False, size=len(orders))]

                    for order in final_orders:
                        controller.process_order(order)

                    controller._logger.info(controller._portfolio.value_summary(timestamp))

        except Exception as e:
            print(e)
            # Also keep the traceback in the log
            controller._logger.exception('Backtest loop aborted')
        finally:
            controller._logger.info(controller._portfolio.value_summary(None))

    def process_order(self, order):
        '''
        Send order, a (ticker, price, shares) tuple, to the order API and apply
        the receipt. Logs a failure message when the API returns no receipt or
        the receipt is rejected.
        '''
        receipt = self._order_api.process_order(order)
        success = receipt is not None and self.process_receipt(receipt)

        if not success:
            order_type = 'Sell' if order[2] < 0 else 'Buy'
            # Single %-format so braces in the order text cannot break the message
            self._logger.info('%s failed: %s at $%s for %s shares' % ((order_type,) + tuple(order)))

    def process_receipt(self,receipt):
        '''
        Apply a (ticker, price, share_delta, fee) receipt to the portfolio.

        Returns False when the cash balance after the trade would not be
        positive, or when an oversized sell is clamped to the held shares and
        the recalculated fee exceeds the proceeds. Returns True once the trade
        has been booked.
        '''
        ticker = receipt[0]
        price = receipt[1]
        share_delta = receipt[2]
        fee = receipt[3]
        remaining_cash = self._portfolio.balance - (price * share_delta + fee)
        if remaining_cash <= 0:
            return False

        held = self._portfolio.get_shares(ticker)
        if share_delta < 0 and -share_delta > held:
            # Liquidate: cannot sell more than is held
            share_delta = -held
            fee = self._order_api._calculate_fee(share_delta*price)
            if fee > abs(share_delta*price):
                return False

        self._portfolio.update_trade(ticker=ticker, price=price, share_delta=share_delta, fee=fee)
        self._logger.debug('Trade on %s for %s shares at %s with fee %s' % (ticker,share_delta,price, fee))
        return True

    def process_pricing(self, ticker, price):
        '''Forward a new price to both the portfolio and the algorithm.'''
        self._portfolio.update(price=price, ticker = ticker)
        self._algorithm.update(stock=ticker, price = price)

class Backtester:
    '''
    Configures and runs a backtest. Settings that were not set explicitly
    fall back to the defaults created in __init__.
    '''
    def __init__(self):
        self._logger = logging.getLogger(__name__)
        self._settings = {}

        self._default_settings = {
            'Portfolio' : Portfolio(),
            'Algorithm' : Algorithm(),
            'Source' : 'yahoo',
            'Start_Day' : dt.datetime(2016,1,1),
            'End_Day' : dt.datetime.today(),
            'Tickers' : ['AAPL','GOGL','MSFT','AA','APB']
        }

    def set_portfolio(self, portfolio):
        self._settings['Portfolio'] = portfolio

    def set_algorithm(self, algorithm):
        self._settings['Algorithm'] = algorithm

    def set_source(self, source):
        self._settings['Source'] = source

    def set_start_date(self, date):
        self._settings['Start_Day'] = date

    def set_end_date(self, date):
        self._settings['End_Day'] = date

    def set_stock_universe(self, stocks):
        self._settings['Tickers'] = stocks

    def get_setting(self, setting):
        '''Return the explicitly set value for setting, or its default. Raises KeyError for unknown names.'''
        if setting in self._settings:
            return self._settings[setting]
        return self._default_settings[setting]

    def backtest(self):
        '''
        Run one backtest: log to a freshly truncated 'run.log', start one
        process that feeds price events into a queue and one that consumes
        them, and wait for both to finish.
        '''
        #Setup Logger
        root = logging.getLogger()
        root.setLevel(level=logging.DEBUG)
        filepath = 'run.log'

        # mode='w' starts the log file empty on every run
        handler = logging.FileHandler(filename=filepath, mode='w')
        root.addHandler(handler)

        try:
            # Initiate run
            q = Queue()

            ds = DataSource(
                source=self.get_setting('Source'),
                start=self.get_setting('Start_Day'),
                end=self.get_setting('End_Day'),
                tickers=self.get_setting('Tickers')
            )
            c = Controller(
                portfolio=self.get_setting('Portfolio'),
                algorithm=self.get_setting('Algorithm')
            )

            p = Process(target=DataSource.process, args=(q, ds))
            p1 = Process(target=Controller.backtest, args=(q, c))

            p.start()
            p1.start()
            p.join()
            p1.join()
        finally:
            # Detach and close the handler so repeated runs do not stack
            # handlers on the root logger or leak the open file
            root.removeHandler(handler)
            handler.close()


if __name__ == '__main__':
    # Script entry point: run a single backtest with the default settings.
    # The guard keeps the run from starting when this module is merely imported.
    backtester = Backtester()
    backtester.backtest()

BrokenPipeError: [Errno 32] Broken pipe