In [7]:
from fastcore.all import *
from IPython.display import clear_output
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from polygon import RESTClient
from utils import view_source_code, get_dollars
from datetime import datetime, timedelta, date
import math, time

# Testing Framework

Ok so now we want to build a minimal testing framework.  This will by no means be a complete comprehensive framework, but it will be a starting point that handles what we have run across so far in a flexible way.  The goal is to create something we can build on as we come across more and more examples.  There's a few thing we need to be able to do:

1. Split our data into train/val/test sets
1. Conduct bootstrapping on a model
1. Get metrics and statistics about test such return, p values, and others
1. Store results and details of experiments

We could just use what we did last chapter and copy that code, but we want it to be flexible.  We don't know what format data will come in and we don't know exactly what we want to do yet.  We need an easy way to accomplish what we need while keeping it easy to build unknown stuff we may need in the future.

:::{note} It is very likely that if you work for a firm they will have a testing framework for you to use.  This is not meant to replace that, but by understanding how they work you will be able to effectively learn and use any framework

### Split our Data

Let's start from the beginning!  Let's say we have a dataset and we need to read in the csv and split it into train/test/valid.

Let's create a `CsvGetter` class can be used to get a file from a csv.

In [8]:
path = Path('../data')
fpath = path/'eod-quotemedia.csv'

In [9]:
class CsvGetter:
    def __init__(self,fpath): store_attr()
    def __call__(self): return pd.read_csv(self.fpath, parse_dates=['date'])

In [10]:
df = CsvGetter(fpath)().head(6)
df

Unnamed: 0,date,ticker,adj_close
0,2013-07-01,A,29.994186
1,2013-07-02,A,29.650137
2,2013-07-03,A,29.705185
3,2013-07-05,A,30.434568
4,2013-07-08,A,30.524021
5,2013-07-09,A,30.689164


Now that we have the csv let's create a `SizeSplitter` class to split the data based on relative sizes in a column.

It's important to add in tests of inputs with good clear error messages.

In [11]:
class SizeSplitter:
    def __init__(self,sizes=None): 
        if sizes is None:
            self.sizes = L(('train',0.5),('valid',0.25),('test',0.25))
        else: 
            sizes_sum = np.array([o[1] for o in sizes]).sum()
            
            try:test_close(sizes_sum, 1.)
            except AssertionError: raise Exception('Your sizes must sum to 1')
                
            try:test_eq(['train','valid','test'],sizes.keys())
            except AssertionError: raise Exception('You must have train, valid, and test sets')
                
            self.sizes = sizes

        
    def __call__(self,df):        
        sizes = self.sizes
        unique_dates = L(*df.date.unique()).sorted()

        out = AttrDict()
        
        break_sz = int(len(unique_dates)*sizes[0][1])
        out[sizes[0][0]] = df.loc[df.date <= unique_dates[break_sz]]
        
        _remainder_df = df.loc[df.date > unique_dates[break_sz]]
        _remainder_dates = unique_dates[break_sz+1:]

        break_sz = int(len(unique_dates)*sizes[1][1])
        out[sizes[1][0]] = _remainder_df.loc[_remainder_df.date <= _remainder_dates[break_sz]]
        out[sizes[2][0]] = _remainder_df.loc[_remainder_df.date > _remainder_dates[break_sz]]
        return out

In [12]:
df = CsvGetter(fpath)().head(100)
dataset = SizeSplitter()(df)

We can see that it's a dictionary like object with 3 sets, each of which are dataframes.  Exactly what we want.

In [13]:
print(dataset.keys())
display(dataset.valid.head(2))

dict_keys(['train', 'valid', 'test'])


Unnamed: 0,date,ticker,adj_close
51,2013-09-12,A,33.455318
52,2013-09-13,A,33.345222


And that's the core of it.  A getter + a splitter gives us our data splits.  Let's put that into 1 module for simplicity.

In [14]:
class DataModule:
    def __init__(self, getter, splitter): 
        store_attr()
        df = getter()
        self.datasets = splitter(df)

In [97]:
dm = DataModule(CsvGetter(fpath),SizeSplitter())
dm.datasets.keys() # verify we still have our data

dict_keys(['train', 'valid', 'test'])

The nice thing is by following this same format we can define over types of getters or splitters and just pass them in for a consistent format.

# Models

In [89]:
def get_next_trading_day(dte,unique_dates,dates_dict):
    for i in range(10):
        out = dates_dict.get(dte+timedelta(i),False) 
        if out != False: return unique_dates[out]
    return None
    
class RandomModel:
    def __init__(self,action_probs): store_attr()
        
    def __call__(self, df,hold_time=28):
        out = self.open_positions(df)
        out = self.close_positions(out,hold_time)
        
        f = bind(self.get_next_trading_day,unique_dates=L(*df.date))   
        out['open_date']  = pd.to_datetime(trans.open_date.apply(f))
        out['close_date'] = pd.to_datetime(trans.close_date.apply(f))
        return out
        
    def open_positions(self,df):
        out_cols = ['open_date','ticker','action']
        out = pd.DataFrame(columns=out_cols)
        in_df = df[['date','ticker']]
        in_df.columns = out_cols[:-1]

        for action,prob in action_probs.items():
            _tmp = in_df.sample(frac=.1)
            _tmp['action'] = action
            out = pd.concat([out,_tmp])
        out.sort_values('open_date',inplace=True)
        
        out['open_date'] = out.open_date.astype(date)
        return out  
    
    def close_positions(self,df,hold_time):
        df['close_date'] = df.open_date + timedelta(hold_time)
        return df

 Now it's important to note that you should take time to optimize your code some, especially if things are running slow.  Iteration speed is critical and doing this will allow you to work faster.  At this stage you don't need to try to super optimize everything, but you should have reasonably performance code.  For example, this was the first function I wrote for `get_next_trading_day`.  It make look simpler but it's about 12 times slower!

In [90]:
def get_next_trading_day_old(dte,unique_dates):
    return unique_dates.filter(lambda x: x >= dte)[0]

In [91]:
unique_dates = L(*dm.datasets.train.date).unique().sorted()
dates_dict = unique_dates.val2idx()

In [92]:
f = bind(get_next_trading_day_old,unique_dates=unique_dates)   
%timeit _ = dm.datasets.train.iloc[:10000].date.apply(f)

554 ms ± 4.12 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [93]:
f = bind(get_next_trading_day,unique_dates=unique_dates,dates_dict=dates_dict)   
%timeit _ = dm.datasets.train.iloc[:10000].date.apply(f)

44.8 ms ± 528 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [18]:
dm = DataModule(CsvGetter(fpath),SizeSplitter())
h0 = RandomModel({'Buy':.5,'Short':.1})
h0(dm.datasets.train)

NameError: name 'action_probs' is not defined

# Metrics

# Tester

In [None]:
class Tester
    def __init__(self, data_module, metric):
    
    def run_bootstrap(self, h0, h1, samples, sample_size):
        # run h0 
        
    
    def plot_bootstrap(self):
    
    def get_results(self,last_n=3):
        
        return h0_mean, h1, p_value

In [None]:
dm = DataModule(CsvGetter(path/'eod-quotemedia.csv'),
                SizeSplitter())

Tester(dm, h0, h1, log_return)

In [341]:
action_probs = {'buy':.5,'short':.5}
RandomModel(action_probs)(df).sample(5)

<class 'pandas.core.frame.DataFrame'>


Unnamed: 0,open_date,ticker,action,close_date
282719,2017-05-05 00:00:00,LUV,buy,2017-06-02
144324,2013-09-05 00:00:00,DVA,buy,2013-10-03
49148,2016-09-22 00:00:00,ARE,buy,2016-10-20
78889,2014-08-18 00:00:00,CAT,short,2014-09-15
1371,2014-12-05 00:00:00,AAL,buy,2015-01-02


In [None]:

# Cell
import shelve
import matplotlib.pyplot as plt
import os
from fastcore.foundation import *
import numpy as np

# Cell
def create(filename,keys):
  with shelve.open(filename) as d:
    if type(keys) == str: d[keys] = L()
    else:
        for key in keys: d[key] = L()

# Cell
def append(filename,new_dict,key='exp'):
    '''Append a new_dict to list store in key - create db if needed'''
    if not os.path.exists(filename): create(filename,key)
    with shelve.open(filename) as d:
        if key not in list(d.keys()): d[key] = L()
        tmp = d[key]
        tmp.append(new_dict)
        d['exp'] = tmp

# Cell
def delete(filename,exp_num,key='exp'):
    '''delete an item from list stored in key'''
    with shelve.open(filename) as d:
        tmp = d[key]
        if type(exp) == int:
            tmp.pop(exp_num)
        if type(exp) == str:
            for i,e in eumerate(tmp):
              if e['name'] == name: tmp.pop(i)
        d[key] = tmp

# Cell
def print_keys(filename,last_only=True, with_type=False, key='exp'):
    with shelve.open(filename) as d:
        if last_only and not with_type: print(list(d[key][-1].keys()))
        if last_only and with_type: print({k:type(v) for k,v in d['exp'][-1].items()})
        if not last_only and not with_type:
            a = L()
            for o in d['exp']: a = a + L(o.keys())
            a = a.unique()
            print(a)
        if not last_only and with_type:
            a = {}
            for o in d['exp']: a.update({k:type(v) for k,v in o.items()})
            print(a)

# Cell
def get_stat(filename,exp_num,stat,key='exp',display=True):
    '''get a specific stat (ie loss) from key for a given expirament
    Goes well with partial
    '''
    with shelve.open(filename) as d:
      if display: print(f'd[{key}][{exp_num}][{stat}]: {d[key][exp_num][stat]}')
      return (f'd[{key}][{exp_num}][{stat}]',d[key][exp_num][stat])

# Cell
def get_stats(filename,exp_num,stats,key='exp',display=True):
    return [get_stat(filename,exp_num,stat,key,display) for stat in stats]

# Cell
def print_best(filename,stat,best='min', key='exp'):
    with shelve.open(filename) as d: exps = len(d[key])
    if best == 'min':
        out = (np.inf,None)
        for i in range(exps):
          a,b = get_stat(filename,i,stat,display=False)
          if min(b) < out[0]: out = (min(b),i)
    if best == 'max':
        out = (-np.inf,None)
        for i in range(exps):
          a,b = get_stat(filename,i,stat,display=False)
          if max(b) > out[0]: out = (max(b),i)

    print(f'{stat} {best} value = {out[0]} | best idx = {out[1]-exps}')

# Cell
def graph_stat(filename,stat,idxs=[-1,-2,-3], key='exp',name='name',figsize=(12,6)):
    with shelve.open(filename) as d:
        fig,ax = plt.subplots(figsize=figsize)
        for e in L(d[key])[idxs]:
            try:
              vals = e[stat]
              ax.plot(range(len(e[stat])),e[stat],label=e[name])
              ax.legend();ax.set_title(f'{stat}')
            except:
              print(f'Unable to plot {stat} for {e[name]}')

def graph_stats(filename,stats,idxs=[-1,-2,-3],key='exp',name='name',figsize=(12,6)):
    for stat in stats:
        graph_stat(filename,stat,idxs,key,name,figsize)