In [None]:
# LWVB: Larry Williams Volatility Breakout

import cryptocompare
import time
import datetime
import pandas as pd
import numpy as np
import os
import shutil
import plotly.express as px
import pprint

import itertools
import tqdm

In [None]:
# Path of the hourly candle CSV read by the loading cell.
# NOTE(review): the file name suggests Upbit BTC/KRW candles timestamped in GMT - confirm.
targetDataFile = 'data/Upbit_BTC_KRW_hour_gmt.csv'
# Trading-cost components, expressed as fractions (0.003 == 0.3 %).
rate_fee = 0.003
rate_slippage = 0.000
# Combined cost; the TR_LWVB* rules subtract it from the gross return ratio of each executed trade.
rate_cost = rate_fee + rate_slippage

In [None]:
def dt_parser(x):
    """Parse a timestamp string such as ``2021-01-01 00:00:00+0000`` into an aware datetime.

    Kept for callers that want an explicit parser; the loading code below
    relies on pandas' own date parsing and does not call it.
    """
    return datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S%z")


# Load the hourly candles; the first CSV column becomes the index.
hist_hour = pd.read_csv(
    targetDataFile,
    index_col=0,
    parse_dates=True)

# Convert the timestamps to Korean time (UTC+9).
# pd.to_datetime(..., utc=True) first normalises the index to tz-aware UTC
# whatever read_csv produced (already aware, naive, or unparsed strings), so
# the tz_convert below cannot fail with "Cannot convert tz-naive timestamps".
# NOTE(review): naive timestamps are interpreted as UTC here - confirm this
# matches the data file.
hist_hour.index = pd.to_datetime(hist_hour.index, utc=True).tz_convert('Asia/Seoul')

To restrict the data to a date range, use the following code.

In [None]:
# hist_hour = hist_hour[hist_hour.index > datetime.datetime(2021, 12, 1).astimezone()]

In [None]:
def clean_nanmean(s):
    """Return the mean of ``s`` ignoring NaNs, or NaN when ``s`` has no non-NaN value.

    The all-NaN case (which includes an empty input) is answered directly
    instead of being passed to ``np.nanmean``.
    """
    only_nan = np.all(np.isnan(s))
    return np.nan if only_nan else np.nanmean(s)

def clean_nanmax(s):
    """Return the maximum of ``s`` ignoring NaNs, or NaN when ``s`` has no non-NaN value.

    The all-NaN case (which includes an empty input) is answered directly
    instead of being passed to ``np.nanmax``.
    """
    only_nan = np.all(np.isnan(s))
    return np.nan if only_nan else np.nanmax(s)


In [None]:
def do_LWVB_backtest(hist_hour, name, begin_hour, func_MT, param_MT, func_TR, param_TR):
    """Backtest one (market-timing filter, trading rule) pair on daily bars.

    The hourly candles are resampled into 1-day bars that start ``begin_hour``
    hours after midnight. ``func_MT`` and then ``func_TR`` are called on the
    daily frame and are expected to add columns to it in place; the equity
    curve, drawdown and summary statistics are derived from those columns.

    Parameters
    ----------
    hist_hour : pandas.DataFrame
        Hourly candles with a datetime index and the columns ``open``,
        ``close``, ``high``, ``low``, ``volume`` and ``value``.
    name : str
        Label copied verbatim into the statistics.
    begin_hour : int
        Offset, in hours, passed to ``resample`` as the start of each daily bar.
    func_MT : callable
        Called as ``func_MT(hist_day, param_MT)`` before the trading rule.
    param_MT : dict
        Parameters forwarded to ``func_MT`` and recorded in the statistics.
    func_TR : callable
        Called as ``func_TR(hist_day, param_TR)``; must add the columns
        ``rate_of_return`` (per-day return ratio, 1 == flat) and ``trade``
        (1 on days with a trade, else 0), which are read below.
    param_TR : dict
        Parameters forwarded to ``func_TR`` and recorded in the statistics.

    Returns
    -------
    tuple
        ``(statistic, hist_day)``: a dict of summary figures and the daily
        frame including every intermediate column.

    Raises
    ------
    IndexError
        If no daily bar is left after the first one has been dropped.
    """
    hist_day = hist_hour.resample('1D', offset=datetime.timedelta(hours=begin_hour)).agg({
        'open': 'first',
        'close': 'last',
        'high': 'max',
        'low': 'min',
        'volume': 'sum',
        'value': 'sum',
    })
    # Drop the first daily bar: the head of the data may cover only part of a
    # day after resampling, so its aggregates are not reliable.
    hist_day = hist_day.iloc[1:].copy()

    func_MT(hist_day, param_MT)
    func_TR(hist_day, param_TR)

    # Equity curve (start value 1), its running maximum and the drawdown from it.
    hist_day['assets_after_sell'] = hist_day['rate_of_return'].cumprod()
    hist_day['assets_max'] = hist_day['assets_after_sell'].cummax()
    hist_day['dd'] = 1 - hist_day['assets_after_sell'] / hist_day['assets_max']
    # Placeholder column: the per-day "days under water" counter is not
    # computed, so no recovery-time statistic is reported.
    hist_day['underwater_times'] = 0
    hist_day['rate_of_return_percent'] = (hist_day['rate_of_return'] - 1) * 100
    # Split the daily returns into gains and (positive-valued) losses; days
    # that do not belong to a side are NaN so they are ignored by the nan-aware helpers.
    hist_day['rate_of_return_percent_profit'] = np.where(
        hist_day['rate_of_return_percent'] > 0, hist_day['rate_of_return_percent'], np.nan)
    hist_day['rate_of_return_percent_loss'] = np.where(
        hist_day['rate_of_return_percent'] < 0, -hist_day['rate_of_return_percent'], np.nan)

    # .iloc[-1] is explicit positional access. A bare [-1] on a Series with a
    # datetime index depends on the deprecated integer-key positional fallback.
    final_asset = hist_day['assets_after_sell'].iloc[-1]
    # Annualise assuming one row per calendar day (365 rows per year).
    cagr_percent = final_asset ** (1/(len(hist_day.index)/365)) * 100 - 100
    mdd_percent = np.max(hist_day['dd']) * 100
    cagr_over_mdd = cagr_percent / mdd_percent if mdd_percent != 0 else None

    num_trades = np.sum(hist_day['trade'])
    if num_trades > 0:
        ratio_win = np.sum(np.where(hist_day['rate_of_return'] > 1, 1, 0)) / num_trades * 100
    else:
        ratio_win = None

    statistic = {
        'name': name,
        'begin_hour': begin_hour,
        'func_MT': func_MT.__name__,
        'param_MT': param_MT,
        'func_TR': func_TR.__name__,
        'param_TR': param_TR,
        ########################################################################
        'cagr(%)': cagr_percent,
        'finalAsset': final_asset,
        # Do not subtract 100 here: the standard deviation of a*x + b is
        # |a| * sigma(x), so the constant offset drops out.
        'stdev(%)': np.std(hist_day['rate_of_return']) * 100,
        'mdd(%)' : mdd_percent,
        'cagr(%)/mdd(%)' : cagr_over_mdd,
        'ratio_win(%)' : ratio_win,
        'mean_profit(%)' : clean_nanmean(hist_day['rate_of_return_percent_profit']),
        'mean_loss(%)' : clean_nanmean(hist_day['rate_of_return_percent_loss']),
        'max_profit(%)' : clean_nanmax(hist_day['rate_of_return_percent_profit']),
        'max_loss(%)' : clean_nanmax(hist_day['rate_of_return_percent_loss']),
        'Num Of Trade' : num_trades,
    }
    return statistic, hist_day



def MT_NoMT(hist_day, param_MT):
    """Market-timing filter that filters nothing: mark every day as bullish.

    Sets the column ``is_bull`` of ``hist_day`` to ``True`` in place.
    ``param_MT`` is accepted so the signature matches the other ``MT_*``
    functions; it is not read.
    """
    hist_day['is_bull'] = True

def MT_maopen_and(hist_day, param_MT):
    """Market-timing filter: bullish only while the previous open is above its moving averages.

    For every window ``n`` in ``param_MT['open_dcount_set']`` a column
    ``maopen_<n>`` is added holding the ``n``-row rolling mean of the
    previous row's open (so the current row's own open is not used).
    ``is_bull`` is AND-ed with "previous open > that average" for each
    window. An existing ``is_bull`` column is narrowed further; otherwise
    it starts as all ``True``. ``hist_day`` is modified in place.
    """
    if 'is_bull' not in hist_day.columns:
        hist_day['is_bull'] = True

    prev_open = hist_day['open'].shift(1)
    for window in param_MT['open_dcount_set']:
        ma_col = f'maopen_{window}'
        hist_day[ma_col] = prev_open.rolling(window).mean()
        above_average = prev_open > hist_day[ma_col]
        hist_day['is_bull'] = hist_day['is_bull'] & above_average

def MT_mavolume_and(hist_day, param_MT):
    """Market-timing filter: bullish only while the previous volume is above its moving averages.

    For every window ``n`` in ``param_MT['vol_dcount_set']`` a column
    ``mavolume_<n>`` is added holding the ``n``-row rolling mean of the
    previous row's volume. ``is_bull`` is AND-ed with "previous volume >
    that average" for each window. An existing ``is_bull`` column is
    narrowed further; otherwise it starts as all ``True``. ``hist_day`` is
    modified in place.
    """
    if 'is_bull' not in hist_day.columns:
        hist_day['is_bull'] = True

    prev_volume = hist_day['volume'].shift(1)
    for window in param_MT['vol_dcount_set']:
        ma_col = f'mavolume_{window}'
        hist_day[ma_col] = prev_volume.rolling(window).mean()
        above_average = prev_volume > hist_day[ma_col]
        hist_day['is_bull'] = hist_day['is_bull'] & above_average

def MT_maopen_mavolume_and(hist_day, param_MT):
    """Market-timing filter combining the open-price and the volume moving-average filters.

    Runs ``MT_maopen_and`` and then ``MT_mavolume_and`` on the same frame
    with the same ``param_MT``, so ``is_bull`` ends up True only where both
    filters agree.
    """
    for apply_filter in (MT_maopen_and, MT_mavolume_and):
        apply_filter(hist_day, param_MT)




def TR_NaiveHold(hist_day, param_TR):
    """Trading rule: hold from open to close on every bullish day.

    Adds ``rate_of_return`` (close / open on bullish days, 1 otherwise) and
    ``trade`` (1 on bullish days, 0 otherwise) to ``hist_day`` in place.
    ``param_TR`` is not read.

    NOTE(review): unlike the TR_LWVB* rules, no trading cost is deducted
    here - confirm this is intended for the baseline.
    """
    bullish = hist_day['is_bull']
    open_to_close = hist_day['close'] / hist_day['open']
    hist_day['rate_of_return'] = np.where(bullish, open_to_close, 1)
    hist_day['trade'] = np.where(bullish, 1, 0)

def TR_LWVB(hist_day, param_TR):
    """Trading rule: volatility breakout with a fixed breakout factor.

    The breakout price is the day's open plus ``param_TR['break_rate']``
    times the previous day's high-low range. A day trades when it is
    bullish and its high exceeds the breakout price; the return is then
    close / breakout price minus the module-level ``rate_cost``, otherwise 1.

    Columns added in place: ``range_prev``, ``breakout_price``,
    ``breakout``, ``rate_of_return`` and ``trade``.
    """
    hist_day['range_prev'] = (hist_day['high'] - hist_day['low']).shift(1)
    hist_day['breakout_price'] = hist_day['open'] + hist_day['range_prev'] * param_TR['break_rate']
    hist_day['breakout'] = hist_day['high'] > hist_day['breakout_price']

    entered = hist_day['is_bull'] & hist_day['breakout']
    net_return = hist_day['close'] / hist_day['breakout_price'] - rate_cost
    hist_day['rate_of_return'] = np.where(entered, net_return, 1)
    hist_day['trade'] = np.where(entered, 1, 0)

def TR_LWVB_V(hist_day, param_TR):
    """Trading rule: volatility breakout with volatility-scaled position size.

    Entry is the same as a fixed-factor breakout: open plus
    ``param_TR['break_rate']`` times the previous day's range. The invested
    fraction is 1 while the previous day's relative range
    (``range_prev`` / previous open) is at most
    ``param_TR['target_volatility']``, and target / volatility above it.
    The daily return blends the un-invested part (return 1) with the
    invested part (close / breakout price minus the module-level
    ``rate_cost`` on traded days, 1 otherwise).

    Columns added in place: ``range_prev``, ``breakout_price``,
    ``breakout``, ``volatility``, ``invest_ratio``, ``rate_of_return`` and
    ``trade``. Rows without a previous day get NaN ``invest_ratio`` and
    ``rate_of_return``.
    """
    target = param_TR['target_volatility']

    hist_day['range_prev'] = (hist_day['high'] - hist_day['low']).shift(1)
    hist_day['breakout_price'] = hist_day['open'] + hist_day['range_prev'] * param_TR['break_rate']
    hist_day['breakout'] = hist_day['high'] > hist_day['breakout_price']
    hist_day['volatility'] = hist_day['range_prev'] / hist_day['open'].shift(1)
    hist_day['invest_ratio'] = np.where(
        hist_day['volatility'] <= target, 1, target / hist_day['volatility'])

    entered = hist_day['is_bull'] & hist_day['breakout']
    invested_return = np.where(
        entered, hist_day['close'] / hist_day['breakout_price'] - rate_cost, 1)
    ratio = hist_day['invest_ratio']
    hist_day['rate_of_return'] = (1 - ratio) + ratio * invested_return
    hist_day['trade'] = np.where(entered, 1, 0)

def TR_LWVB_VN(hist_day, param_TR):
    """Trading rule: volatility breakout with a noise-based factor and volatility-scaled size.

    The breakout factor is the previous day's "noise",
    ``1 - |close - open| / (high - low)``, so the breakout price is the open
    plus previous range times previous noise. The invested fraction is 1
    while the previous day's relative range (``range_prev`` / previous
    open) is at most ``param_TR['target_volatility']``, and target /
    volatility above it. The daily return blends the un-invested part
    (return 1) with the invested part (close / breakout price minus the
    module-level ``rate_cost`` on traded days, 1 otherwise).

    Columns added in place: ``range_prev``, ``noise_prev``,
    ``breakout_price``, ``breakout``, ``volatility``, ``invest_ratio``,
    ``rate_of_return`` and ``trade``.
    """
    target = param_TR['target_volatility']
    # Previous day's candle, aligned on the current row.
    prev = hist_day[['open', 'high', 'low', 'close']].shift(1)
    prev_span = prev['high'] - prev['low']

    hist_day['range_prev'] = prev_span
    hist_day['noise_prev'] = 1 - (prev['close'] - prev['open']).abs() / prev_span
    hist_day['breakout_price'] = hist_day['open'] + hist_day['range_prev'] * hist_day['noise_prev']
    hist_day['breakout'] = hist_day['high'] > hist_day['breakout_price']
    hist_day['volatility'] = hist_day['range_prev'] / prev['open']
    hist_day['invest_ratio'] = np.where(
        hist_day['volatility'] <= target, 1, target / hist_day['volatility'])

    entered = hist_day['is_bull'] & hist_day['breakout']
    invested_return = np.where(
        entered, hist_day['close'] / hist_day['breakout_price'] - rate_cost, 1)
    ratio = hist_day['invest_ratio']
    hist_day['rate_of_return'] = (1 - ratio) + ratio * invested_return
    hist_day['trade'] = np.where(entered, 1, 0)



# Fields that identify one backtest configuration.
results_index = ['name', 'begin_hour', 'func_MT', 'param_MT', 'func_TR', 'param_TR']
# Empty result table indexed by those fields (it has no data columns yet).
results = pd.DataFrame(columns=results_index).set_index(results_index)


# Market-timing grid: (filter function, parameter dict) pairs, in evaluation order.
#   1. no filter
#   2. open-price moving-average filter, for several window combinations
#   3. volume moving-average filter, for the same window combinations
#   4. both filters together: for each volume window, every open window
_MT_WINDOW_COMBOS = ([3], [5], [10], [3, 5], [5, 10], [3, 5, 10])
MT_Set = (
    [(MT_NoMT, {})]
    + [(MT_maopen_and, {'open_dcount_set': list(windows)}) for windows in _MT_WINDOW_COMBOS]
    + [(MT_mavolume_and, {'vol_dcount_set': list(windows)}) for windows in _MT_WINDOW_COMBOS]
    + [
        (MT_maopen_mavolume_and, {'open_dcount_set': [open_window], 'vol_dcount_set': [vol_window]})
        for vol_window in (2, 3, 5, 10)
        for open_window in (3, 5, 10)
    ]
)
# Trading-rule grid: (rule function, parameter dict) pairs, in evaluation order.
#   1. naive open-to-close hold
#   2. fixed-factor breakout for break rates 0.1 .. 0.9
#   3. volatility-scaled breakout: break rates 0.3 / 0.5 / 0.7, each with every target volatility
#   4. noise-factor breakout with every target volatility
_TR_TARGET_VOLATILITIES = (0.005, 0.01, 0.02, 0.03, 0.05, 0.07, 0.10, 0.15)
TR_Set = (
    [(TR_NaiveHold, {})]
    + [
        (TR_LWVB, {'break_rate': break_rate})
        for break_rate in (0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)
    ]
    + [
        (TR_LWVB_V, {'break_rate': break_rate, 'target_volatility': target})
        for break_rate in (0.3, 0.5, 0.7)
        for target in _TR_TARGET_VOLATILITIES
    ]
    + [(TR_LWVB_VN, {'target_volatility': target}) for target in _TR_TARGET_VOLATILITIES]
)

# Exhaustive grid search: every start hour x market-timing filter x trading rule.
strategies = itertools.product(range(0, 24), MT_Set, TR_Set)

# Collect one statistics dict per run and build the table once at the end.
# DataFrame.append was removed in pandas 2.0, and appending row by row
# re-copies the whole frame on every iteration.
records = []
for begin_hour, MT_Pair, TR_Pair in tqdm.tqdm(strategies, total=24*len(MT_Set)*len(TR_Set)):
    ret, _ = do_LWVB_backtest(hist_hour, '', begin_hour, *MT_Pair, *TR_Pair)
    records.append(ret)

# One row per configuration, default integer index, columns in statistic-key order.
results = pd.DataFrame(records)

results.to_csv('results.csv')

The code below is for inspecting the details of one specific backtest result.

In [None]:
# Re-run a single configuration (day starts at hour 13, open-price MA filter
# with windows 3/5/10, noise-factor breakout with 5 % target volatility) and
# keep its daily frame for plotting.
ret, hist_day = do_LWVB_backtest(
    hist_hour, '', 13,
    MT_maopen_and, {'open_dcount_set': [3, 5, 10]},
    TR_LWVB_VN, {'target_volatility': 0.05})

# Column prefixes ("assets." / "dd.") are what the regex filters below select on.
graph = pd.DataFrame({
    'assets.13_maopn_lwvb_vn': hist_day['assets_after_sell'],
    'dd.13_maopn_lwvb_vn': hist_day['dd'],
})

# Equity curve on a log scale.
assets_fig = px.line(graph.filter(regex='assets.*'), log_y=True)
assets_fig.update_layout(showlegend=True)
assets_fig.show()

# Drawdown curve.
drawdown_fig = px.line(graph.filter(regex='dd.*'))
drawdown_fig.update_layout(showlegend=True)
drawdown_fig.show()

In [None]:
# Write the current daily frame (hist_day), with all its computed columns, to temp.csv.
hist_day.to_csv('temp.csv')