In [1]:
import import_ipynb
import pandas as pd
import numpy as np
import torch
from datetime import datetime
from datetime import datetime as dt
from torch.utils.data import Dataset
from sklearn.preprocessing import StandardScaler
import pickle

In [1]:
from statsmodels.tsa.stattools import adfuller

In [None]:
class AdaMomCMOADF():
    # Adaptive momentum strategy using CMO and ADF for non-stationarity
    def __init__(self,high=80,low=20,mid=50):
        self.logL=[]
        self.high=high
        self.low=low
        self.mid=mid
        self.model_type='rule_based'
        self.data_cols=['Close_n','CMO_14','datetime']
        self.regime={}
        self.entry_val={}
        self.exit_val={}
    def set_alt_data(self,alt_data_func,remote=False):
        if remote: self.gdata=anvil.server.call(alt_data_func)['gdata']
        else: self.gdata=alt_data_func()['gdata']
    def check_entry_batch(self,dfD):
        # return always_buy(dfD)
        decisionsD={t:0 for t in dfD}
        stopD={t:5 for t in dfD}
        targetD={t:5 for t in dfD}
        dataD={}
        log_entry={}
        high=self.high
        low=self.low
        mid=self.mid
        for t in dfD.keys():
            data=dfD[t]
            row=dfD[t].iloc[-1]
            if data.shape[0]>2:
                adf=adfuller(data['Close_n'],maxlag=30,autolag=None)
                if adf[0]>adf[4]['1%']: self.regime[t]='tr'
                else: self.regime[t]='mr'
                regime=self.regime[t]
                if regime=='tr' and row['CMO_14']>low and row['CMO_14']<mid: decisionsD[t]=1
                elif regime=='tr' and row['CMO_14']<-low and row['CMO_14']>-mid: decisionsD[t]=-1
                elif regime=='mr' and row['CMO_14']>high: decisionsD[t]=-1
                elif regime=='mr' and row['CMO_14']<-high: decisionsD[t]=1
                else: decisionsD[t]=0
                self.entry_val[t]=row['CMO_14']
                self.exit_val[t]='not_set'
        return decisionsD,stopD,targetD
    def save_func(self,episode_state):
        ticker=[t for t in episode_state][0]
        return ticker,self.entry_val[ticker],self.exit_val[ticker]
    def check_exit_batch(self,dfD,posf):
        def exit_fn(row):
            return self.exit_predicate(row,dfD[row.ticker])
        posf['to_exit']=posf.apply(exit_fn,axis=1).values
        return posf
    def exit_predicate(self,row,df):
        # return False
        data=df
        dfrow=df.iloc[-1]
        high=self.high
        low=self.low
        mid=self.mid
        regime=self.regime[row['ticker']]
        self.exit_val[row['ticker']]=dfrow['CMO_14']
        if regime=='tr' and row['quant']>0 and dfrow['CMO_14']>high: return True
        elif regime=='tr' and row['quant']<0 and dfrow['CMO_14']<-high: return True
        elif regime=='mr' and row['quant']>0 and dfrow['CMO_14']>-low: return True
        elif regime=='mr' and row['quant']<0 and dfrow['CMO_14']<low: return True
        # exit cases for detecting a trade on incorrect trend direction
        # elif regime=='tr' and row['quant']>0 and dfrow['CMO_14']<=-mid: return True
        # elif regime=='tr' and row['quant']<0 and dfrow['CMO_14']>=mid: return True
        else: return False
    def Check(strat,dfD):
        return strat.check_entry_batch(dfD)
    def Exit(strat,dfD,posf):
        return strat.check_exit_batch(dfD,posf)

In [2]:
class MomStrat():
    def __init__(self,th=.00025):
        self.logL=[]
        self.th=th
        self.model_type='rule_based'
        self.data_cols=['Close','datetime']
    def set_alt_data(self,alt_data_func,remote=False):
        if remote: self.gdata=anvil.server.call(alt_data_func)['gdata']
        else: self.gdata=alt_data_func()['gdata']
    def check_entry_batch(self,dfD):
        # return always_buy(dfD)
        decisionsD={t:0 for t in dfD}
        stopD={t:0 for t in dfD}
        targetD={t:0 for t in dfD}
        dataD={}
        log_entry={}
        for t in dfD.keys():
            data=dfD[t]
            if data.shape[0]>2:
                r=np.log(data['Close']/data['Close'].shift(1)).dropna()
                m=r.rolling(30).mean().values
                if m[-1]>self.th: decisionsD[t]=-1
                elif m[-1]<-self.th: decisionsD[t]=1
                else: decisionsD[t]=0
        return decisionsD,stopD,targetD
    def check_exit_batch(self,dfD,posf):
        def exit_fn(row):
            return self.exit_predicate(row,dfD[row.ticker])
        posf['to_exit']=posf.apply(exit_fn,axis=1).values
        return posf
    def exit_predicate(self,row,df):
        return False
        data=df
        r=np.log(data['Close']/data['Close'].shift(1)).dropna()
        m=r.rolling(20).mean().values
        if row['quant']>0 and m[-1]<self.th/3: return True
        elif row['quant']<0 and m[-1]>-self.th/3: return True
        else: return False
    def Check(strat,dfD):
        return strat.check_entry_batch(dfD)
    def Exit(strat,dfD,posf):
        return strat.check_exit_batch(dfD,posf)

In [None]:
def do_nothing(dfD):
    empty={t:0 for t in dfD}
    return empty,empty,empty
def always_buy(dfD):
    buy={t:1 for t in dfD}
    empty={t:0 for t in dfD}
    return buy,empty,empty
def always_sell(dfD):
    sell={t:-1 for t in dfD}
    empty={t:0 for t in dfD}
    return sell,empty,empty