## Definitions

In [1]:
import os
import sys
import shutil
import random

import pandas as pd
pd.options.mode.chained_assignment = None  # default='warn'
pd.set_option('display.max_columns', None)
import pandas_ta as ta
import quantstats as qs
qs.extend_pandas()

import numpy as np
import math

from datetime import datetime, timedelta
from tqdm import tqdm
from matplotlib import pyplot as plt
%matplotlib inline

params = {'figure.facecolor': 'w'}
plt.rcParams.update(params)

from IPython.display import display

''' Import custom Library '''
lib_path = '/workspace/202205_idx-trading/lib'
sys.path.insert(0, lib_path)
# Read Imports
from utils import read_config
from strat_utils import LQ45BaseStrategy
from data_utils import gen_combined_df, extend_price_df, handle_nan, RandomPriceData
from backtest import Backtest, HistoricalScenarioBacktest, RandomizedBacktest
sys.path.remove(lib_path)

### Parameter and Directories Definition

In [2]:
# Parameters
date_start = '2009-01-01'
date_breakpoint = '2010-01-01'
strat_class = "Momentum"
std = 1

In [3]:
# Data Directory
data_dir = '/workspace/202205_idx-trading/_data/'
lq45_dir = '/workspace/202205_idx-trading/_data/20220525_lq45/'
lq45_index_file = data_dir + '20220525_lq45_index.csv'
lq45_list = '20220525_lq45-list.txt'

## Data Preparation

### Data Loading
Note: Only In sample data is loaded

In [4]:
# Prepare Stock Tickers
with open(data_dir + lq45_list, "r") as f:
    lq45_tickers = f.read().split('\n')

## Prepare active tickers for international codes
active_tickers = [f + '.JK' for f in lq45_tickers]
active_tickers.append('LQ45')

In [5]:
# Prepare Time Series Data
nan_handle_method = 'bfill'

df_dict = {}
for ticker in tqdm(active_tickers):
    if ticker == 'LQ45':
        df_dict[ticker] = pd.read_csv(lq45_index_file)
    else:
        df_dict[ticker] = pd.read_csv(lq45_dir + ticker + '.csv')
    
    ## Take Only Date and Adjusted Close
    df_dict[ticker] = df_dict[ticker][['Date', 'Adj Close']]
    df_dict['Date'] = pd.to_datetime(df_dict[ticker]['Date'])
    df_dict[ticker].set_index(pd.DatetimeIndex(df_dict[ticker]['Date']), inplace=True)
    
    df_dict[ticker].drop('Date', axis=1, inplace=True)
    
    ## Convert Adj Close to price
    df_dict[ticker]['price'] = df_dict[ticker]['Adj Close']
    df_dict[ticker].drop('Adj Close', axis=1, inplace=True)

100%|█████████████████████████████████████████████████████| 46/46 [00:00<00:00, 70.69it/s]


In [6]:
# Generate In Sample Dataset
nan_cnt_threshold = 252*2

in_df = {}
out_df = {}
rmv_tickers = []
for ticker in tqdm(active_tickers):
    ## Take In Sample and Out Sample Data
    in_df[ticker] = df_dict[ticker][(df_dict[ticker].index >= date_start) & 
                                                (df_dict[ticker].index < date_breakpoint)]
    
    ## Check if there are too many NaN values
    if in_df[ticker]['price'].isna().sum() > nan_cnt_threshold:
        rmv_tickers.append(ticker)
        continue
    
    ## Handle NaN Values
    in_df[ticker] = handle_nan(in_df[ticker], method=nan_handle_method)
    
    ## Extend price to other values
    in_df[ticker] = extend_price_df(in_df[ticker])

# Remove tickers that only have small amounts of data
active_tickers = [t for t in active_tickers if t not in rmv_tickers]

100%|████████████████████████████████████████████████████| 46/46 [00:00<00:00, 373.53it/s]


## Strategy Design
Done by the Robert Carver method: https://www.youtube.com/watch?app=desktop&v=-aT55uRJI8Q

In [4]:
class BinaryEWMACStrategy(LQ45BaseStrategy):
    '''
    Exponential Moving Average Crossover Strategy.
    
    Signal is generated in a binary manner (buy/sell) 
    '''
    def __init__(self, config=None, config_filepath=None, mode="paper_trade"):
        super().__init__(config=config, config_filepath=config_filepath, mode=mode)
        
        if config is not None:
            config_dict = config
        
        elif config_filepath is not None:
            config_dict = read_config(config_filepath)
            
        else:
            assert config is not None, "Either config or config_filepath must be available"
            
        # Strategy Parameters
        strat_params = config_dict['strat_params']
        
        self.long_only = strat_params['long_only']
        
        self.raw_tickers = strat_params['tickers']
        
        self.lookback_fast = strat_params['lookback_fast']
        self.lookback_slow = strat_params['lookback_slow']
        self.vol_lookback = strat_params['vol_lookback']
        
        lookback_indicator = str(self.lookback_fast) + "/" + str(self.lookback_slow)
        self.tickers = [t + "-" + lookback_indicator for t in self.raw_tickers]
    
    def prepare_indicators(self, df_dict, vol_adj=False, lookback_postfix=True):
        strat_df = gen_combined_df(df_dict, self.raw_tickers, ['price'], add_pfix=True)
        
        # Rename Tickers to contain lookback_postfix
        if lookback_postfix:
            for r_t, t in zip(self.raw_tickers, self.tickers):
                price_r_t = "price_" + r_t
                price_t = "price_" + t
                strat_df.rename(columns={price_r_t: price_t}, inplace=True)
        
        for t in self.tickers:
            price_t = "price_" + t
            ewmac_t = "ewmac_" + t
            vol_adj_ewmac_t = "vol-adj-ewmac_" + t
            
            fast_ewma = strat_df[price_t].ewm(span=self.lookback_fast).mean()
            slow_ewma = strat_df[price_t].ewm(span=self.lookback_slow).mean()
            strat_df[ewmac_t] = fast_ewma - slow_ewma
            
            if vol_adj:
                stdev_ret = (strat_df[price_t] - strat_df[price_t].shift(1)).ewm(span=self.vol_lookback).mean()
                strat_df[vol_adj_ewmac_t] = strat_df[ewmac_t] / stdev_ret
            
        return strat_df
    
    def gen_signals(self, strat_df, remove_overshoot_signal=False):
        # Signal Rules
        uptrend_signal = lambda ewmac: ewmac > 0
        downtrend_signal = lambda ewmac: ewmac <= 0
        
        # Prepare Signal
        last_signal = {}
        
        signal_tickers = ["signal_" + t for t in self.tickers]
        for signal_t in signal_tickers:
            strat_df[signal_t] = ''
            last_signal[signal_t] = ''
        
        # Generate Signals
        if not self.long_only:
            for t in self.tickers:
                signal_t = "signal_" + t
                ewmac_t = "ewmac_" + t

                for i in range(0, len(strat_df)):
                    if i == 0:
                        strat_df[signal_t][i] = ''

                    elif last_signal[signal_t] == '':
                        if uptrend_signal(strat_df[ewmac_t][i]):
                            strat_df[signal_t][i] = 'long_entry'
                        elif downtrend_signal(strat_df[ewmac_t][i]):
                            strat_df[signal_t][i] = 'short_entry'
                        else:
                            strat_df[signal_t][i] = ''

                    elif last_signal[signal_t] == 'long_entry':
                        if uptrend_signal(strat_df[ewmac_t][i]):
                            strat_df[signal_t][i] = ''
                        elif downtrend_signal(strat_df[ewmac_t][i]):
                            strat_df[signal_t][i] = 'long_close'
                        else:
                            strat_df[signal_t][i] = ''

                    elif last_signal[signal_t] == 'short_entry':
                        if uptrend_signal(strat_df[ewmac_t][i]):
                            strat_df[signal_t][i] = 'short_close'
                        elif downtrend_signal(strat_df[ewmac_t][i]):
                            strat_df[signal_t][i] = ''
                        else:
                            strat_df[signal_t][i] = ''

                    elif last_signal[signal_t] == 'long_close' or last_signal[signal_t] == 'short_close':
                        if uptrend_signal(strat_df[ewmac_t][i]):
                            strat_df[signal_t][i] = 'long_entry'
                        elif downtrend_signal(strat_df[ewmac_t][i]):
                            strat_df[signal_t][i] = 'short_entry'
                        else:
                            strat_df[signal_t][i] = ''
                            
                    if strat_df[signal_t][i] != "":
                        last_signal[signal_t] = strat_df[signal_t][i]
                            
        elif self.long_only:
            for t in self.tickers:
                signal_t = "signal_" + t
                ewmac_t = "ewmac_" + t

                for i in range(0, len(strat_df)):
                    if i == 0:
                        strat_df[signal_t][i] = ''
                    
                    elif last_signal[signal_t] == '':
                        if uptrend_signal(strat_df[ewmac_t][i]):
                            strat_df[signal_t][i] = 'long_entry'
                        elif downtrend_signal(strat_df[ewmac_t][i]):
                            strat_df[signal_t][i] = ''
                        else:
                            strat_df[signal_t][i] = ''

                    elif last_signal[signal_t] == 'long_entry':
                        if uptrend_signal(strat_df[ewmac_t][i]):
                            strat_df[signal_t][i] = ''
                        elif downtrend_signal(strat_df[ewmac_t][i]):
                            strat_df[signal_t][i] = 'long_close'
                        else:
                            strat_df[signal_t][i] = ''
                            
                    elif last_signal[signal_t] == 'long_close':
                        if uptrend_signal(strat_df[ewmac_t][i]):
                            strat_df[signal_t][i] = 'long_entry'
                        elif downtrend_signal(strat_df[ewmac_t][i]):
                            strat_df[signal_t][i] = ''
                        else:
                            strat_df[signal_t][i] = ''
                            
                    if strat_df[signal_t][i] != "":
                        last_signal[signal_t] = strat_df[signal_t][i]
        
        # Remove signals for tickers not within runtime
        if remove_overshoot_signal:
            for signal_t in signal_tickers:
                strat_df.loc[strat_df.index < self.run_date_start, signal_t] = ""
        
        return strat_df
    
    def run(self):
        self.df_data_dict = self.prepare_data()
        self.strat_df = self.prepare_indicators(self.df_data_dict)
        self.strat_df = self.gen_signals(self.strat_df)
        
        return self.strat_df

### Generate Random Data

In [5]:
r = RandomPriceData()
random_data_dict = {}

# Trend lengths are based on rob carver's method (gradually increase from a week up until a year)
t_lengths = [5, 10, 15, 21, 42, 64, 85, 107, 128, 150, 171, 192, 213, 235, 256]

In [6]:
# Prepare Sawtooth Data
for tl in t_lengths:
    sawtooth_data = r.random_oscillation(n_length=512, t_length=tl, amp=3, std_scale=0.1, mode='sawtooth')
    random_data_dict[str(tl)] = pd.DataFrame(sawtooth_data, columns=['price'], index=sawtooth_data.index)

In [None]:
# Prepare Uptrend Jumpy Data
r_up = RandomPriceData()
r_up.run_simulation(r_up.random_trend, num_iter=1000, movement="up", min_frac_change=0.15, stochastic_process=r.jump_diffusion, n_length=252, 
                 std=0.1, drift=0.05, jump_mean=0, jump_std=0.05, poisson_rate=0.05)

In [None]:
# Prepare Downtrend Jumpy Data
r_down = RandomPriceData()
r_down.run_simulation(r_down.random_trend, num_iter=1000, movement="down", min_frac_change=0.15, stochastic_process=r.jump_diffusion, n_length=252, 
                 std=0.1, drift=-0.05, jump_mean=0, jump_std=0.05, poisson_rate=0.05)

In [None]:
# TODO - Prepare Mean Reverting Noisy Data

### Setup Strategy

In [7]:
config = {
            "run_params":{
                "base_data_dir": "/workspace/202205_idx-trading/_data/",
                "lq45_dir": "/workspace/202205_idx-trading/_data/20220525_lq45/",
                "lq45_index_filename": "20220525_lq45_index.csv",
                "lq45_list_filename": "20220525_lq45-list.txt"
            },

            "backtest_params":{
                "run_date_start": date_breakpoint,
                "run_date_end": "full"
            },

            "strat_params":{
                "long_only": True,
                "tickers": None,
                "lookback_fast": None,
                "lookback_slow": None,
                "vol_lookback": 25
            }   
}

### Run Strategy Experiment

#### Sawtooth Data

In [8]:
# Fast and Slow Lookback Variations
lookback_pairs = [[2, 8], [3, 12], [4, 16], [5,20], [8,32], [10,40], [12,48], [16,64], [20,80], [25, 100], [28, 112], [32, 128]]
backtests = {}
experiment_list = []

# Iterate over every combination of lengths and lookback pairs
for tl in tqdm(t_lengths):
    buffer_dict = {}
    buffer_dict['tl'] = tl
    
    for lp in lookback_pairs:
        config["strat_params"]["tickers"] = [str(tl)]
        config["strat_params"]["lookback_fast"] = lp[0]
        config["strat_params"]["lookback_slow"] = lp[1]

        s = BinaryEWMACStrategy(config=config, mode="backtest")
        strat_df = s.prepare_indicators(random_data_dict)
        strat_df = s.gen_signals(strat_df)
        
        b = Backtest()
        b.init_signal(strat_df)
        b.calc_returns()
        sharpe = b.calc_sharpe()
        
        pair_str = str(lp[0]) + "_" + str(lp[1])
        backtests[pair_str + "_" + str(tl)] = b
        
        buffer_dict[pair_str] = sharpe
    
    experiment_list.append(buffer_dict)

experiments_df = pd.DataFrame(experiment_list)

100%|█████████████████████████████████████████████████████| 15/15 [01:02<00:00,  4.16s/it]


In [9]:
display(experiments_df)

Unnamed: 0,tl,2_8,3_12,4_16,5_20,8_32,10_40,12_48,16_64,20_80,25_100,28_112,32_128
0,5,-0.430847,-3.209808,-5.160323,-5.560877,-7.505877,-7.955234,-8.651845,-8.806924,-8.681029,-8.372763,-8.200024,-7.186247
1,10,2.795588,0.604659,-0.413639,-1.664977,-3.11822,-3.322206,-3.88034,-4.669878,-4.821329,-5.209523,-5.18753,-5.055178
2,15,2.804069,1.893787,1.351844,0.689161,-0.452187,-1.009745,-1.49619,-2.390734,-2.649439,-2.050381,-2.093049,-2.043749
3,21,2.146886,2.292301,1.743594,1.535623,0.122961,0.085072,-0.348548,-1.077344,-1.692452,-1.270212,-1.034604,-1.075289
4,42,-0.744494,1.740773,1.517684,1.39753,1.211553,1.030868,0.942785,0.751458,0.463914,0.14321,0.209316,0.126958
5,64,-1.557828,0.417042,1.379641,1.637728,1.52066,1.179486,1.347415,1.256195,1.056797,0.939835,0.992645,0.827511
6,85,-3.547816,-0.665484,0.875555,1.229454,1.382267,1.457545,1.323576,1.261054,1.18223,1.224925,1.244683,1.064511
7,107,-3.976335,-2.07893,0.467328,0.982364,1.32175,1.38199,1.331277,1.369864,1.289982,1.232732,1.302389,1.130469
8,128,-4.537099,-2.853408,-0.691788,0.332461,1.033616,1.146278,0.856901,1.084148,1.014568,0.99043,0.954188,0.872415
9,150,-4.155004,-2.566993,-0.903135,-0.18873,1.732408,1.717786,1.628508,1.629858,1.443748,1.452483,1.455821,1.436204


Eyeing the results, we will use: (2_8, 4_16, 5_20, 8_32, 16_64), because:
- 2_8 and 4_16 tends to be better for shorter term trends
- 5_20, 8_32 and 16_64 are for longer term trends

We will calculate correlations for:
- 2_8 and 4_16 on shorter term trends (5, 10, 15, 21)
- 4_16 and 5_20 on mid term trends (15, 21, 42, 68, 85)
- 5_20 and 8_32 + 16_62 for mid term trends (42, 64, 85)
- 8_32 + 16_64 for longer trends(85, 128, 150, 256)

In [10]:
# Generate Turnovers
turnover_list = []

for tl in tqdm(t_lengths):
    buffer_dict = {}
    buffer_dict['tl'] = tl
    
    for lp in lookback_pairs:
        pair_str = str(lp[0]) + "_" + str(lp[1])
        lpt_str = pair_str + "_" + str(tl)
        
        turnover = backtests[lpt_str].calc_turnover()
        
        buffer_dict[pair_str] = turnover
        
    turnover_list.append(buffer_dict)
    
turnover_df = pd.DataFrame(turnover_list)

100%|█████████████████████████████████████████████████████| 15/15 [00:00<00:00, 51.87it/s]


In [11]:
display(turnover_df)

Unnamed: 0,tl,2_8,3_12,4_16,5_20,8_32,10_40,12_48,16_64,20_80,25_100,28_112,32_128
0,5,51,51,51,51,51,51,51,51,51,51,51,51
1,10,28,27,26,26,26,26,26,26,26,26,26,26
2,15,19,19,17,17,17,17,17,17,17,17,17,17
3,21,24,14,16,12,12,12,12,12,12,12,12,12
4,42,47,11,11,10,8,8,7,7,7,7,7,7
5,64,65,24,10,6,5,8,4,4,4,4,4,4
6,85,84,35,14,9,5,4,4,4,3,3,3,3
7,107,94,52,23,12,5,5,4,3,2,2,2,2
8,128,97,59,34,17,6,4,6,3,3,3,3,3
9,150,96,66,41,29,6,3,4,4,3,3,3,3


Turnovers are pretty similar with each other, except between the short trends (2_8 and 4_16) and the upper trends.

In [12]:
# Generate Correlations
lookback_pairs_options = [["2_8", "4_16"], ["4_16", "5_20"], ["5_20", "8_32"], ["5_20", "16_64"], ["8_32", "16_64"]]
trends_options = [["5", "10", "15", "21"], ["15", "21", "42", "64", "85"], ["42", "64", "85"], ["42", "64", "85"], ["85", "128", "256"]]

correlations_list = []

for lpo, to in tqdm(zip(lookback_pairs_options, trends_options)):
    buffer_dict = {}
    buffer_dict["lp0"] = lpo[0]
    buffer_dict["lp1"] = lpo[1]
    
    for trend in to:
        lpt_0 = lpo[0] + "_" + trend
        lpt_1 = lpo[1] + "_" + trend
        
        corr = backtests[lpt_0].strat_df['return'].corr(backtests[lpt_1].strat_df['return'])
        
        buffer_dict[trend] = corr
    
    correlations_list.append(buffer_dict)

correlations_df = pd.DataFrame(correlations_list)

5it [00:00, 658.78it/s]


In [13]:
display(correlations_df)

Unnamed: 0,lp0,lp1,5,10,15,21,42,64,85,128,256
0,2_8,4_16,0.673929,0.713565,0.769833,0.717651,,,,,
1,4_16,5_20,,,0.896436,0.910733,0.933452,0.936151,0.935429,,
2,5_20,8_32,,,,,0.858525,0.936804,0.933503,,
3,5_20,16_64,,,,,0.673288,0.784475,0.787807,,
4,8_32,16_64,,,,,,,0.850467,0.858439,0.515168


Findings:
- 4_16 is too highly correlated with 5_20, so will drop 4_16
- Looks like 8_32 is too highly correlated to either 4_16 and to 16_64. So will drop 8_32 and keep the rest.

Remaining: 2_8, 5_20, 16_64

## Backtest

Universes are selected based on such rules:
- Selected a diversified set of sectors from LQ45
- "Real" sectors, based on seeing the current trend
- Reduced each sector to 4 arbitrary ticker selections

We will be utilizing three variations of lookbacks 2_8, 5_20, 16_64 for each stock groups.

In [None]:
# Determine Different Universes for Strategy
stock_groups = {
    "energy_and_mining": ['BRPT.JK', 'MEDC.JK', 'ADRO.JK','ANTM.JK'],
    "retail": ['AMRT.JK', 'UNVR.JK', 'ERAA.JK', 'ASII.JK'],
    "food_agri": ['CPIN.JK', 'JPFA.JK', 'ICBP.JK', 'INDF.JK'],
    "medical": ['KLBF.JK', 'MIKA.JK']
}

# Parameters
lookback_pairs_strat = [[2, 8], [5, 20], [16, 64]]

In [None]:
# Strategy Template Config
config = {
            "run_params":{
                "base_data_dir": "/workspace/202205_idx-trading/_data/",
                "lq45_dir": "/workspace/202205_idx-trading/_data/20220525_lq45/",
                "lq45_index_filename": "20220525_lq45_index.csv",
                "lq45_list_filename": "20220525_lq45-list.txt"
            },

            "backtest_params":{
                "run_date_start": date_breakpoint,
                "run_date_end": "full"
            },

            "strat_params":{
                "long_only": True,
                "tickers": None,
                "lookback_fast": None,
                "lookback_slow": None,
                "vol_lookback": 25
            }   
}

### Individual Sectors Testing

#### LQ45 Index

#### Energy and Mining

In [None]:
# Run Strategy
config["strat_params"]["tickers"] = stock_groups["food_agri"]

strat_df_list = []
for lp in lookback_pairs_strat:
    config["strat_params"]["lookback_fast"] = lp[0]
    config["strat_params"]["lookback_slow"] = lp[1]
    
    s = BinaryEWMACStrategy(config=config, mode="backtest")
    out_dict = s.prepare_data()
    strat_df = s.prepare_indicators(out_dict)
    strat_df = s.gen_signals(strat_df)
    
    strat_df_list.append(strat_df)

strat_df = pd.concat(strat_df_list, axis=1)

In [None]:
# Single Walk Forward Backtest
b = Backtest()
b.run(strat_df)

#### Retail

#### Food Agri

#### Medical

### Aggregate Strategy

In [None]:
# TODO - WF Backtest, Historical Backtest, Randomized Backtest