In [28]:
from typing import Iterable, Tuple
import pandas as pd
import numpy as np
from torch import Tensor, tensor
import torch
from nn.data.sampler import DataFrameSampler
from operator import methodcaller
from functools import partial
from itertools import zip_longest
from fn import F
from tools import nor, unzip, nn, isiterable, Struct
from typing import *

from tree_tests import ensure_eq_sized



In [3]:
def eq_sized(*els):
   """Report whether every argument has the same ``len``.

   Returns True for zero or one argument. Each length is compared against
   the first argument's length, and the scan stops at the first mismatch.
   """
   sizes = map(len, els)
   expected = next(sizes, None)
   return all(size == expected for size in sizes)


def ensure_eq_sized(*els):
   """Validate that all arguments have the same length and hand them back.

   Returns:
      The tuple of arguments, unchanged, so the call can be used inline.

   Raises:
      AssertionError: if the arguments differ in length. Raised explicitly
         rather than through ``assert`` so the check is not stripped when
         Python runs with ``-O``.
   """
   if not eq_sized(*els):
      raise AssertionError(f'Inconsistent sizing: {list(map(len, els))}')
   return els

In [33]:
def ensure_equivalence(*args):
    """Check that all values compare equal and return the last one.

    Accepts either several positional values or a single iterable holding
    the values to compare (that iterable must support ``len`` and indexing).

    Returns:
        The last value, or None when there is nothing to compare.

    Raises:
        ValueError: if any value is not equal to the first one.
    """
    if len(args) == 1 and isiterable(args[0]):
        args = args[0]
    if len(args) == 0:
        return None
    first_arg = args[0]
    for i, arg in enumerate(args[1:], start=1):
        # Compare with the == operator instead of calling __eq__ directly:
        # a direct call can return NotImplemented (e.g. int vs float), which
        # is truthy and previously let unequal values pass as equivalent.
        if first_arg != arg and not (first_arg == arg):
            raise ValueError(f"Arguments at index {i} and 0 are not equivalent")
    return args[-1]

In [4]:

import pandas as pd
from coretools import list_stonks, load_frame
from faux.pgrid import PGrid
from faux.features.ta.loadout import Indicators, IndicatorBag

# Load the stored results table; each row is read below for its `indicators`
# and `hyperparams` fields.
# NOTE(review): unpickling can execute arbitrary code — only load trusted files.
winners = pd.read_pickle('minirocket_exp_results.pickle')

print(winners)
# Keep only the first 11 rows (positions 0 through 10).
# NOTE(review): confirm 11 (rather than 10) is the intended cut-off.
winners = winners.iloc[0:11]
# One (hyperparams, Indicators) pair per retained row.
setups = []
for _, winner in winners.iterrows():
   winning_analyzer = Indicators(*winner.indicators)
   winning_params   = winner.hyperparams
   setups.append((winning_params, winning_analyzer))
   

[4m[34m2578575264.py:8[0m 
      accuracy  min_accuracy  max_accuracy  ...  pl_ratio min_pl_ratio  max_pl_ratio
92   0.892575      0.859425      0.921725  ...  0.961260     0.838402      1.262191
185  0.872179      0.847426      0.891892  ...  0.901553     0.833254      1.005571
17   0.847754      0.812500      0.871176  ...  0.941185     0.860532      1.003483
91   0.871036      0.852391      0.884058  ...  0.908589     0.860489      0.988356
108  0.880802      0.851438      0.897764  ...  0.970835     0.933023      1.006400
..        ...           ...           ...  ...       ...          ...           ...
122  0.889387      0.879781      0.907407  ...  1.078652     0.869514      1.360723
149  0.852911      0.832721      0.869565  ...  1.081333     0.968003      1.210234
164  0.898219      0.872428      0.924920  ...  0.990776     0.840524      1.078695
131  0.866878      0.831601      0.888889  ...  1.166742     1.020722      1.608114
124  0.901176      0.872204      0.931310  ..

In [5]:

# Load price frames for the first 25 symbols listed under ./sp100.
symbols = list_stonks('./sp100')[:25]
frames = [load_frame(sym, './sp100') for sym in symbols]
for (sym, df) in zip(symbols, frames):
   # Tag the frame with its symbol; PolySymBacktester later reads `df.name`.
   # NOTE(review): this is a plain instance attribute, so frames derived from
   # `df` are not guaranteed to carry it.
   df.name = sym
   print(df)

[4m[34m2152724945.py:5[0m 
                                datetime   open   high    low  close  volume
datetime                                                                   
2017-07-17 15:00:00 2017-07-17 15:00:00  75.00  75.00  70.00  70.00   33982
2017-07-18 15:00:00 2017-07-18 15:00:00  70.50  72.00  69.92  70.00   35719
2017-07-19 15:00:00 2017-07-19 15:00:00  67.92  69.51  67.80  68.95    1805
2017-07-20 15:00:00 2017-07-20 15:00:00  69.75  70.00  69.00  69.00   28390
2017-07-21 15:00:00 2017-07-21 15:00:00  70.00  70.00  68.56  68.92    1562
...                                 ...    ...    ...    ...    ...     ...
2023-01-23 15:00:00 2023-01-23 15:00:00  52.96  54.28  52.28  53.82  303948
2023-01-24 15:00:00 2023-01-24 15:00:00  53.39  54.51  52.64  53.98  252975
2023-01-25 15:00:00 2023-01-25 15:00:00  53.62  55.17  53.62  55.00  243520
2023-01-26 15:00:00 2023-01-26 15:00:00  55.44  55.74  54.49  55.32  286006
2023-01-27 15:00:00 2023-01-27 15:00:00  55.26  55.85  55

In [6]:
import pandas as pd

def samples_from(df:pd.DataFrame, params, transform=None, date_begin=None, date_end=None):
   """Build model-ready samples for one frame, optionally limited to a date window.

   Args:
      df: frame handed to ``samples_for`` as its ``symbol`` argument.
      params: hyperparameter mapping; only ``params['seq_len']`` is read here.
      transform: object whose ``apply`` method becomes the ``analyze`` step.
         When None, an identity step is used instead.
      date_begin: keep samples whose timestamp is >= this value (None = no lower bound).
      date_end: keep samples whose timestamp is < this value (None = no upper bound).

   Returns:
      ``(ts_idx, X, y)``. ``X`` and ``y`` are torch tensors built from the arrays
      returned by ``samples_for``. A 2-D ``X`` gains a singleton axis at position 1;
      otherwise its axes 1 and 2 are swapped.
   """
   from faux.backtesting.common import samples_for

   # Previously the None case fell back to a bare lambda and then read `.apply`
   # off it, which raised AttributeError whenever no transform was supplied.
   if transform is None:
      analyze = F(lambda frame: frame)
   else:
      analyze = F(transform.apply)

   ts_idx, X, y = samples_for(
      symbol=df,
      analyze=analyze,
      xcols_not=['open', 'close', 'datetime'],
      x_timesteps=params['seq_len']
   )

   # NOTE(review): debug output, printed on every call — consider removing.
   idx:pd.DatetimeIndex = pd.DatetimeIndex([d.date() for d in ts_idx])
   print(idx)
   X = torch.from_numpy(X)
   y = torch.from_numpy(y)

   # Restrict to the requested window.
   # Assumes ts_idx is in ascending order — TODO confirm against samples_for.
   if date_begin is not None or date_end is not None:
      begin_index = None
      end_index = None

      if date_begin is not None:
         # Default to len(ts_idx): if no sample is as late as date_begin the
         # slice must be empty, not the whole series.
         begin_index = next(
            (i for i, ts in enumerate(ts_idx) if ts >= date_begin),
            len(ts_idx)
         )

      if date_end is not None:
         # None means "no sample reaches date_end", i.e. keep through the end.
         end_index = next(
            (i for i, ts in enumerate(ts_idx) if ts >= date_end),
            None
         )

      ts_idx = ts_idx[begin_index:end_index]
      X = X[begin_index:end_index]
      y = y[begin_index:end_index]

   # Put the axis at position 1 in place: add it for 2-D input, else swap axes 1 and 2.
   X = X.unsqueeze(1) if X.ndim == 2 else X.swapaxes(1, 2)

   return (ts_idx, X, y)

# samples_from(frames[0], setups[0][0], setups[0][1], date_begin=pd.Timestamp('01-01-2010'))

In [15]:
# Use the first setup's hyperparameters and indicator set for every frame.
params, transform = setups[0]

# One (ts_idx, X, y) triple per frame, in the same order as `frames`.
# NOTE(review): '01-01-2005' is an ambiguous date string; an ISO form such as
# '2005-01-01' would remove any day/month doubt.
samples = [samples_from(df, params, transform, date_begin=pd.Timestamp('01-01-2005')) for df in frames]

[4m[34m3570289256.py:17[0m DatetimeIndex(['2017-08-08', '2017-08-09', '2017-08-10', '2017-08-11',
               '2017-08-14', '2017-08-15', '2017-08-16', '2017-08-17',
               '2017-08-18', '2017-08-21',
               ...
               '2023-01-12', '2023-01-13', '2023-01-17', '2023-01-18',
               '2023-01-19', '2023-01-20', '2023-01-23', '2023-01-24',
               '2023-01-25', '2023-01-26'],
              dtype='datetime64[ns]', length=1377, freq=None)
[4m[34m3570289256.py:17[0m DatetimeIndex(['2018-07-20', '2018-07-23', '2018-07-24', '2018-07-25',
               '2018-07-26', '2018-07-27', '2018-07-30', '2018-07-31',
               '2018-08-01', '2018-08-02',
               ...
               '2023-01-12', '2023-01-13', '2023-01-17', '2023-01-18',
               '2023-01-19', '2023-01-20', '2023-01-23', '2023-01-24',
               '2023-01-25', '2023-01-26'],
              dtype='datetime64[ns]', length=1138, freq=None)
[4m[34m3570289256.py:17[0m Dateti

In [35]:
from collections import namedtuple
from tools import maxby, minby, flatten_dict

# One item produced by a single sub-backtester's loop.
BTSubSample = namedtuple('BTSubSample', ['step', 'time', 'X', 'ytrue', 'ypred'])
# The same fields, prefixed with the symbol the item belongs to.
BTSample = namedtuple('BTSample', ['symbol', 'step', 'time', 'X', 'ytrue', 'ypred'])

class PolySymBacktester:
   """Runs one sub-backtester per symbol and exposes them through a shared loop.

   After construction, ``self.subs`` holds one ``Backtester`` per symbol and
   ``self.loop`` is a ``ConcurrentBTLoopEnumeration`` over them.
   """
   def __init__(self, symbols=None, datasets=None, samples=None, models=None, init_balance=100.0, pos_type='long', on_sampler=None):
      """
      Args:
         symbols: iterable of symbols; when None, taken from ``df.name`` of each dataset.
         datasets: one frame per symbol, in symbol order.
         samples: pre-built samples, either a sequence in symbol order or a
            mapping keyed by symbol. Required.
         models: None (a default that returns zeros is used), a single callable
            shared by all symbols, or a sequence with one model per symbol.
         init_balance, pos_type, on_sampler: stored on the instance as given.

      Raises:
         AssertionError: if ``samples`` is None or its length differs from ``symbols``.
      """
      self.init_balance = init_balance
      self.pos_type = pos_type
      self.on_sampler = on_sampler

      self.wrong = 0
      self.right = 0
      self._has_run = False
      self._batched_samples = False
      self.datasets = datasets

      self.symbols = list(symbols) if symbols is not None else [df.name for df in datasets]

      # State the precondition before touching `samples`.
      assert samples is not None, f'In-house automatic sampling is not yet implemented because I\'m lazy'
      if isinstance(samples, Mapping):
         # Re-order mapping values to line up with self.symbols.
         samples = [samples[sym] for sym in self.symbols]
      self.samples = samples
      assert len(self.samples) == len(self.symbols)

      if models is None:
         def dfl(x:Tensor):
            return torch.zeros((len(x),))
         print('WARNING: No models specified, loss will be predicted for every sample')
         self.models = [dfl for sym in self.symbols]
      elif callable(models):
         self.models = [models for sym in self.symbols]
      else:
         self.models = models

      # Previously `symbols=self.models` was passed here, so the symbol list was
      # never part of the size check or the per-symbol zip.
      self.mount_subs(symbols=self.symbols, datasets=self.datasets, samples=self.samples, models=self.models)

   def mount_subs(self, symbols=(), datasets=(), samples=(), models=()):
      """Create one sub-backtester per symbol and build the shared loop.

      All four sequences must have the same length (checked by ``ensure_eq_sized``).
      Sets ``self.subs`` and ``self.loop``.
      """
      ensure_eq_sized(symbols, datasets, samples, models)
      # from .backtester import Backtester as Sub
      from faux.backtesting.backtester import Backtester as Sub

      subs = self.subs = []

      for (symbol, df, smpls, model) in zip(symbols, datasets, samples, models):
         sub = Sub(model=model, df=df, samples=smpls)
         sub.total = 0
         subs.append(sub)

      self.loop = ConcurrentBTLoopEnumeration(self)
      
   
      
class ConcurrentBTLoopEnumeration:
   """Steps through every sub-backtester's loop in lock-step.

   Iterating yields one ``Struct`` per step with ``time``, ``signals`` and
   ``samples`` (a list holding a ``BTSample`` or None for each symbol).
   """
   def __init__(self, owner:PolySymBacktester):
      self.owner = owner
      self.testers = owner.subs
      # Wrap each raw loop item in a BTSubSample.
      self.loops = [itertools.starmap(BTSubSample, t.loop()) for t in self.testers]
      # symbolId -> True for loops that have run out of samples.
      self.terminated = {}
      self.lastYields = {}

   def reshape_loops(self):
      """Yield one row per step: a list with a BTSample (or None) per symbol.

      A symbol's slot is None once its loop is exhausted. Iteration ends when
      every loop is exhausted; the all-None final row is not yielded.
      """
      while len(self.testers) > len(self.terminated):
         values = []

         for symbolId, loop in enumerate(self.loops):
            if symbolId in self.terminated:
               values.append(None)
               continue

            try:
               (loop_step, l_time, l_x, l_yt, l_yp) = next(loop)
            except StopIteration:
               self.terminated[symbolId] = True
               values.append(None)
            else:
               values.append(BTSample(symbolId, loop_step, l_time, l_x, l_yt, l_yp))

         if len(self.testers) > len(self.terminated):
            yield values

   def synchronize_loops(self):
      """Advance every loop to the latest of the loops' first timestamps.

      Loops that are empty, or that run out before reaching that timestamp,
      are marked terminated. Without this, the StopIteration from ``next``
      would surface inside ``__iter__`` as a RuntimeError.
      """
      lastsample:Dict[int, BTSubSample] = {}
      start_date = None
      for symId, loop in enumerate(self.loops):
         if symId in self.terminated:
            continue
         try:
            sample = next(loop)
         except StopIteration:
            self.terminated[symId] = True
            continue
         lastsample[symId] = sample
         if start_date is None or sample.time > start_date:
            start_date = sample.time
      print(f'Synchronizing loops to {start_date}')

      for symId, sample in lastsample.items():
         try:
            while sample.time < start_date:
               sample = next(self.loops[symId])
         except StopIteration:
            self.terminated[symId] = True
            continue
         # Put the already-consumed sample back at the front of its loop.
         self.loops[symId] = push(sample, self.loops[symId])

   def __iter__(self):
      self.synchronize_loops()

      for raw in self.reshape_loops():
         time = sample_time(raw)
         signals = sample_signals(raw)

         yield Struct(
            time=time,
            # Pass the computed mapping; the function object itself used to be
            # passed here by mistake.
            signals=signals,
            samples=raw
         )
   
def push(a, b):
   """Generator that yields ``a`` first and then every item of ``b``."""
   yield from itertools.chain((a,), b)
   
from frozendict import frozendict
from fn import _, F

def sample_signals(loop_sample):
   """Map each position in ``loop_sample`` to that entry's ``ypred``.

   Entries that are None (placeholders for symbols whose loop has ended) map
   to None, so the result always has one key per position. Previously a None
   entry raised AttributeError.

   Returns:
      A ``frozendict`` of ``{position: ypred or None}``.
   """
   return frozendict(
      (symId, None if e is None else e.ypred)
      for symId, e in enumerate(loop_sample)
   )

def sample_time(sample):
   """Return the shared ``time`` of the non-None entries in ``sample``.

   The collected times are handed to ``ensure_equivalence``, which decides
   the result and raises if they disagree.
   """
   times = []
   for entry in sample:
      if entry is not None:
         times.append(entry.time)
   return ensure_equivalence(times)
   
import itertools
from tools import isiterable

def joinits(a, b):
   """Chain ``a`` and ``b`` into one iterator.

   An argument that ``isiterable`` rejects is treated as a one-item list.
   """
   head = a if isiterable(a) else [a]
   tail = b if isiterable(b) else [b]
   return itertools.chain(head, tail)

# Build the multi-symbol backtester from the frames and samples prepared above.
# No models are passed, so the constructor falls back to its default model.
tester = PolySymBacktester(symbols, datasets=frames, samples=samples)
loop = tester.loop

# NOTE(review): `stop` and `steps` are not used anywhere in the visible code.
stop = False
steps = 0

# Walk the synchronized loop and show the timestamp of each step.
for sample in loop:
   print(sample.time)

[4m[34m1755497444.py:91[0m Synchronizing loops to 2018-07-20 15:00:00
[4m[34m1755497444.py:143[0m 2018-07-20 15:00:00
[4m[34m1755497444.py:143[0m 2018-07-23 15:00:00
[4m[34m1755497444.py:143[0m 2018-07-24 15:00:00
[4m[34m1755497444.py:143[0m 2018-07-25 15:00:00
[4m[34m1755497444.py:143[0m 2018-07-26 15:00:00
[4m[34m1755497444.py:143[0m 2018-07-27 15:00:00
[4m[34m1755497444.py:143[0m 2018-07-30 15:00:00
[4m[34m1755497444.py:143[0m 2018-07-31 15:00:00
[4m[34m1755497444.py:143[0m 2018-08-01 15:00:00
[4m[34m1755497444.py:143[0m 2018-08-02 15:00:00
[4m[34m1755497444.py:143[0m 2018-08-03 15:00:00
[4m[34m1755497444.py:143[0m 2018-08-06 15:00:00
[4m[34m1755497444.py:143[0m 2018-08-07 15:00:00
[4m[34m1755497444.py:143[0m 2018-08-08 15:00:00
[4m[34m1755497444.py:143[0m 2018-08-09 15:00:00
[4m[34m1755497444.py:143[0m 2018-08-10 15:00:00
[4m[34m1755497444.py:143[0m 2018-08-13 15:00:00
[4m[34m1755497444.py:143[0m 2018-08-14 15:00:00
[4m[34m