In [206]:
#Import and Settings
import numpy as np, pandas as pd, matplotlib.pyplot as plt, seaborn as sns
from collections import deque
from itertools import accumulate
from cprint import *
import bisect
import warnings

# Default figure size (width, height) applied to every matplotlib figure in the notebook.
plt.rcParams['figure.figsize'] = 17,9
# Use seaborn's 'whitegrid' style for all plots.
sns.set_style('whitegrid')
# Suppress every warning for the session.
# NOTE(review): this also hides pandas deprecation warnings that matter for the code below.
warnings.filterwarnings('ignore')

In [207]:
# 0. Data
# Load the two CSV inputs, using the first column of each file as the index.
# NOTE(review): both paths are absolute and machine-specific; the notebook will not
# run elsewhere without editing them.
kq150_data = pd.read_csv(r'C:\Users\John\OneDrive\바탕 화면\퀀트관련\퀀트스터디\afml Project\Advances_in_Financial_Engineering-main\KQ150.csv',index_col=0)
# NOTE(review): the variable is named kq150_dollar but the file read is
# 'Kp200F_volume_bars_500.csv' - confirm this is the intended input.
kq150_dollar = pd.read_csv(r'C:\Users\John\OneDrive\바탕 화면\퀀트관련\퀀트스터디\afml Project\Advances_in_Financial_Engineering-main\Kp200F_volume_bars_500.csv',index_col=0)
# Parse the index labels into timestamps so the frame carries a DatetimeIndex.
kq150_data.index = pd.to_datetime(kq150_data.index)
# The same conversion for kq150_dollar is currently disabled.
#kq150_dollar.index = pd.to_datetime(kq150_dollar.index)

In [208]:
# Display the loaded frame (Open/High/Low/Close columns indexed by timestamp).
kq150_data

Unnamed: 0_level_0,Open,High,Low,Close
Date-Time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2016-01-04 10:00:00,1090.0,1092.0,1090.0,1092.0
2016-01-04 10:01:00,1092.0,1093.0,1092.0,1093.0
2016-01-04 10:02:00,1093.0,1093.0,1093.0,1093.0
2016-01-04 10:03:00,1093.0,1093.0,1093.0,1093.0
2016-01-04 10:04:00,1092.7,1092.7,1092.5,1092.5
...,...,...,...,...
2023-04-25 15:31:00,1250.4,1251.2,1250.4,1250.6
2023-04-25 15:32:00,1250.5,1251.7,1250.2,1251.4
2023-04-25 15:33:00,1251.4,1251.9,1250.9,1251.5
2023-04-25 15:34:00,1251.5,1251.7,1249.8,1250.1


In [209]:
# Keep only the first 2000 rows (all columns) as the working sample for the cells below.
test_data = kq150_data.iloc[:2000,:]

In [210]:
# 1. Parkinson volatility
def parkinson_volatility(df, n):
    """Rolling Parkinson (high-low range) volatility over the trailing ``n`` rows.

    For every row the value is
    ``sqrt(sum(ln(High / Low) ** 2 over the window) / (4 * ln(2) * n))``,
    where the window holds the current row and up to ``n - 1`` preceding rows.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain 'High' and 'Low' columns. Its index is reused for the result.
    n : int
        Window length in rows; must be at least 1.

    Returns
    -------
    pandas.DataFrame
        A single column named 'parksinson_vol' (spelling kept so existing callers
        keep working), indexed like ``df``.

    Raises
    ------
    ValueError
        If ``n`` is smaller than 1.

    Notes
    -----
    While the window is still filling (the first ``n - 1`` rows) the sum is divided
    by ``n`` rather than by the number of rows seen so far, so those warm-up values
    are smaller than a fully populated window would give.
    """
    if n < 1:
        raise ValueError("n must be at least 1")

    # Compute the squared log range once for the whole frame instead of rebuilding
    # a DataFrame from the window on every iteration.
    high = df['High'].to_numpy(dtype=float)
    low = df['Low'].to_numpy(dtype=float)
    squared_log_range = np.log(high / low) ** 2

    normaliser = 4 * np.log(2) * n
    window = deque()
    parkinson_vol_lst = []
    for value in squared_log_range:
        window.append(value)
        if len(window) > n:
            window.popleft()
        # Summed oldest-to-newest, exactly as before, so the floats are unchanged.
        parkinson_vol_lst.append(np.sqrt(sum(window) / normaliser))

    return pd.DataFrame(data=parkinson_vol_lst, index=df.index, columns=['parksinson_vol'])

In [211]:
def getDailyVol(close,span0=100):
    '''
    Exponentially weighted standard deviation of roughly one-day returns.

    For each timestamp, the reference bar is found by searching the index for
    "timestamp minus one day" and stepping one position back. Timestamps with no
    such earlier bar are left out of the result.

    close : price Series with a sorted DatetimeIndex
    span0 : span of the exponentially weighted window
    Returns a Series of volatility estimates indexed by the retained timestamps.
    '''
    stamps = close.index
    # Insertion position of (t - 1 day) for every t in the index.
    lagged_pos = stamps.searchsorted(stamps - pd.Timedelta(days=1))
    # Position 0 means there is nothing earlier to compare against.
    lagged_pos = lagged_pos[lagged_pos > 0]
    first_kept = close.shape[0] - lagged_pos.shape[0]
    reference = pd.Series(stamps[lagged_pos - 1], index=stamps[first_kept:])
    current_px = close.loc[reference.index]
    reference_px = close.loc[reference.values].values
    returns = current_px / reference_px - 1
    return returns.ewm(span=span0).std()


In [212]:
# Parkinson volatility of the sample with a 10-row window; the cell displays the
# per-row series together with its mean.
parkinson_vol = parkinson_volatility(test_data,10)
parkinson_vol, parkinson_vol.mean()

(                     parksinson_vol
 Date-Time                          
 2016-01-04 10:00:00        0.000348
 2016-01-04 10:01:00        0.000389
 2016-01-04 10:02:00        0.000389
 2016-01-04 10:03:00        0.000389
 2016-01-04 10:04:00        0.000391
 ...                             ...
 2016-01-13 10:23:00        0.000191
 2016-01-13 10:24:00        0.000222
 2016-01-13 10:25:00        0.000248
 2016-01-13 10:26:00        0.000294
 2016-01-13 10:27:00        0.000285
 
 [2000 rows x 1 columns],
 parksinson_vol    0.000382
 dtype: float64)

In [213]:
# 2. Symmetric CUSUM filter
def getTEvents(gRaw,h): #gRaw: dollar['Close'], h:d_vol.mean()
    """Return the timestamps at which a symmetric CUSUM filter fires.

    Two running sums of the first differences of ``gRaw`` are kept: ``sPos`` is
    floored at 0 and ``sNeg`` is capped at 0. An event is recorded when ``sNeg``
    falls below ``-h`` or, otherwise, when ``sPos`` rises above ``h``; only the sum
    that triggered is reset to 0.

    Parameters
    ----------
    gRaw : pandas.Series
        Series to filter; its index labels become the event timestamps.
    h : float or single-element array/Series
        Threshold. A one-element Series (e.g. the result of ``DataFrame.mean()`` on
        a one-column frame) is accepted and reduced to its single value.

    Returns
    -------
    pandas.DatetimeIndex
        Event timestamps in the order they occurred.

    Raises
    ------
    ValueError
        If ``h`` does not hold exactly one value.
    """
    # float(Series) is deprecated in recent pandas, so reduce h through numpy.
    threshold = np.asarray(h, dtype=float)
    if threshold.size != 1:
        raise ValueError("h must be a scalar or contain exactly one value")
    h = float(threshold.reshape(-1)[0])

    tEvents,sPos,sNeg=[],0,0
    diff=gRaw.diff()
    # Walk labels and values positionally: label lookups would return a Series
    # (and break float()) if the index contained duplicate timestamps.
    for stamp,step in zip(diff.index[1:],diff.to_numpy()[1:]):
        sPos,sNeg=float(max(0,sPos+step)),float(min(0,sNeg+step))
        if sNeg<-h:
            sNeg=0;tEvents.append(stamp)
        elif sPos>h:
            sPos=0;tEvents.append(stamp)
    return pd.DatetimeIndex(tEvents)

In [231]:
# Run the CUSUM filter on the sample's Close column, using the mean Parkinson
# volatility as the threshold, then display the resulting event timestamps.
# NOTE(review): getTEvents differences its input, so the threshold (about 0.0004 per
# the output above) is compared with raw price changes, not returns; nearly every
# bar becomes an event (1664 of 2000). Confirm whether log prices were intended.
tEvents = getTEvents(test_data['Close'], parkinson_vol.mean())
tEvents

DatetimeIndex(['2016-01-04 10:01:00', '2016-01-04 10:04:00',
               '2016-01-04 10:05:00', '2016-01-04 10:06:00',
               '2016-01-04 10:07:00', '2016-01-04 10:09:00',
               '2016-01-04 10:11:00', '2016-01-04 10:16:00',
               '2016-01-04 10:18:00', '2016-01-04 10:19:00',
               ...
               '2016-01-13 10:15:00', '2016-01-13 10:16:00',
               '2016-01-13 10:20:00', '2016-01-13 10:21:00',
               '2016-01-13 10:22:00', '2016-01-13 10:23:00',
               '2016-01-13 10:24:00', '2016-01-13 10:25:00',
               '2016-01-13 10:26:00', '2016-01-13 10:27:00'],
              dtype='datetime64[ns]', length=1664, freq=None)

In [215]:
# 3. Triple Barrier Labeling
def applyPtSlOnT1(close,events,ptSl,molecule):
    '''
    Triple-barrier method: find when the profit-taking and stop-loss barriers are
    first touched, provided that happens no later than t1 (the end of the event).

    Input:
    ` close: Series of prices
    ` events: DataFrame indexed by event start time, with columns
      - t1: timestamp of the vertical barrier (NaT means "until the last price")
      - trgt: unit width of the horizontal barriers, in absolute returns
      - side: position side that multiplies the path returns
    ` ptSl: a list of two non-negative floats
      - ptSl[0]: factor multiplying trgt to set the upper barrier (0 disables it)
      - ptSl[1]: factor multiplying trgt to set the lower barrier (0 disables it)
    ` molecule: subset of event index labels to process in this call;
      None processes every event

    Output: a DataFrame indexed like the processed events with columns
    ` t1: the vertical barrier copied from events
    ` sl: earliest timestamp at which the stop-loss was breached (NaT if never)
    ` pt: earliest timestamp at which the profit target was breached (NaT if never)
    '''
    # Restrict to this job's subset so that parallel jobs do not each redo all events.
    events_=events if molecule is None else events.loc[molecule]
    out=events_[['t1']].copy(deep=True)
    if ptSl[0]>0:pt=ptSl[0]*events_['trgt']
    else:pt=pd.Series(np.nan,index=events.index,dtype=float) # NaNs: barrier disabled
    if ptSl[1]>0:sl=-ptSl[1]*events_['trgt']
    else:sl=pd.Series(np.nan,index=events.index,dtype=float) # NaNs: barrier disabled
    # Series.iteritems() was removed in pandas 2.0; items() works on old and new versions.
    for loc,t1 in events_['t1'].fillna(close.index[-1]).items():
        path=close.loc[loc:t1] # path prices
        path=(path/close.loc[loc]-1)*events_.at[loc,'side'] # path returns
        # index.min() of an empty selection is NaT, i.e. "barrier never touched".
        out.loc[loc,'sl']=path[path<sl[loc]].index.min() # earliest stop loss.
        out.loc[loc,'pt']=path[path>pt[loc]].index.min() # earliest profit taking.
    # With no events the loop never creates the columns; add them so the shape is stable.
    for col in ('sl','pt'):
        if col not in out.columns:out[col]=pd.NaT
    return out


In [216]:
# 20.4.1 - mpPandasObj
def linParts(numAtoms,numThreads):
    """Split ``numAtoms`` items into equally sized contiguous chunks.

    Returns an integer array of chunk boundaries running from 0 to ``numAtoms``.
    The number of chunks is the smaller of ``numThreads`` and ``numAtoms``.
    """
    chunk_count = min(numThreads, numAtoms)
    # Evenly spaced (possibly fractional) boundaries, rounded up to whole positions.
    boundaries = np.linspace(0, numAtoms, chunk_count + 1)
    return np.ceil(boundaries).astype(int)

In [217]:
# 20.4.1 - mpPandasObj
def nestedParts(numAtoms,numThreads,upperTriang=False):
    """Chunk boundaries for work whose cost grows with the row position.

    Each boundary solves a quadratic so that every chunk receives the same share of
    ``numAtoms*(numAtoms+1)``. The boundaries are then rounded to integers. With
    ``upperTriang=True`` the chunk widths are reversed, for the case where the first
    rows are the heaviest.
    """
    chunk_count = min(numThreads, numAtoms)
    edges = [0]
    for _ in range(chunk_count):
        last = edges[-1]
        discriminant = 1 + 4*(last**2 + last + numAtoms*(numAtoms+1.)/chunk_count)
        edges.append((-1 + discriminant**.5)/2.)
    edges = np.round(edges).astype(int)
    if not upperTriang:
        return edges
    # Reverse the chunk widths and rebuild the boundaries starting from 0.
    widths_reversed = np.diff(edges)[::-1]
    return np.append(np.array([0]), np.cumsum(widths_reversed))

In [218]:
# 20.8 - mpPandasObj
def processJobs_(jobs):
    """Run every job one after another (handy for debugging) and return the
    results as a list, in job order."""
    return [expandCall(job) for job in jobs]

In [219]:
# 20.10
def expandCall(kargs):
    """Call ``kargs['func']`` with the remaining entries of ``kargs`` as keyword arguments.

    The caller's dictionary is left untouched, so the same job can be executed
    again (for example after a failure). Raises KeyError when 'func' is missing.
    """
    call_args=dict(kargs) # shallow copy: never mutate the caller's job
    func=call_args.pop('func')
    return func(**call_args)

In [220]:
# 4. Multiprocessing Obj
def mpPandasObj(func,pdObj,numThreads=24,mpBatches=1,linMols=True,**kargs):
    '''
    Split a list of atoms into molecules, run func on each molecule and combine the results.

    + func: function to run on each molecule
    + pdObj[0]: name of the argument used to pass the molecule to func
    + pdObj[1]: list of atoms that will be grouped into molecules
    + numThreads: 1 runs the jobs sequentially via processJobs_; any other value
      dispatches to processJobs
    + mpBatches: multiplier on the number of molecules (numThreads*mpBatches)
    + linMols: True partitions with linParts, False with nestedParts
    + kargs: any other keyword argument needed by func

    Returns the job outputs concatenated and sorted by index when they are
    DataFrames or Series, otherwise the raw list of outputs. An empty DataFrame is
    returned when there are no atoms to process.

    Example: df1=mpPandasObj(func,('molecule',df0.index),24,**kwds)
    '''
    import pandas as pd
    argName,atoms=pdObj[0],pdObj[1]
    if linMols:parts=linParts(len(atoms),numThreads*mpBatches)
    else:parts=nestedParts(len(atoms),numThreads*mpBatches)

    jobs=[]
    for i in range(1,len(parts)):
        job={argName:atoms[parts[i-1]:parts[i]],'func':func}
        job.update(kargs)
        jobs.append(job)
    if numThreads==1:out=processJobs_(jobs)
    # NOTE(review): processJobs is not defined in the visible part of this notebook;
    # confirm it exists before calling with numThreads != 1.
    else: out=processJobs(jobs,numThreads=numThreads)
    # No atoms means no jobs and no outputs; out[0] would raise IndexError.
    if len(out)==0:return pd.DataFrame()
    if not isinstance(out[0],(pd.DataFrame,pd.Series)):return out
    # One concat instead of repeated .append (removed in pandas 2.0 and quadratic).
    return pd.concat(out).sort_index()

In [221]:
# 5. Get the First Time of Touch
def getEvents(close,tEvents,ptSl,trgt,minRet,numThreads,t1=False,side=None):
    '''
    Find the time of the first barrier touch for every seed event.

    Input:
    ` close: Series of prices
    ` tEvents: pandas time index with the timestamps that seed each triple barrier
      - the timestamps selected by the sampling procedure
    ` ptSl: barrier width factors; without side only ptSl[0] is used, for both barriers
    ` trgt: Series of target returns (a single-column DataFrame is also accepted)
    ` minRet: the minimum target return required for running a triple barrier search
    ` numThreads: the number of threads passed on to mpPandasObj
    ` t1: Series of vertical-barrier timestamps, or False for no vertical barrier
    ` side: optional Series with the position side of each event

    Output:
    ` events: a DataFrame
      - events.index: event's start time
      - events['t1']: event's end time (earliest barrier touched)
      - events['trgt']: event's target
      - events['side'] (only when side was given): the algo's position side

    Raises ValueError when trgt is a DataFrame with more than one column.
    '''
    #1) get target
    # Work on a Series so that the boolean filter drops rows and the concat below
    # yields flat column names whichever container the caller passed.
    if isinstance(trgt,pd.DataFrame):
        if trgt.shape[1]!=1:
            raise ValueError('trgt must be a Series or a single-column DataFrame')
        trgt=trgt.iloc[:,0]
    trgt=trgt.loc[tEvents]
    trgt=trgt[trgt>minRet] # minRet
    #2) get t1 (max holding period)
    if t1 is False:t1=pd.Series(pd.NaT,index=tEvents)
    #3) form events object, apply stop loss on t1
    # Without a side the barriers are symmetric: ptSl[0] is used for both.
    if side is None:side_,ptSl_=pd.Series(1.,index=trgt.index),[ptSl[0],ptSl[0]]
    else:side_,ptSl_=side.loc[trgt.index],ptSl[:2]
    # Rows present only in t1 get a NaN target and are removed by dropna.
    events=pd.concat({'t1':t1,'trgt':trgt,'side':side_}, axis=1).dropna(subset=['trgt'])
    df0=mpPandasObj(func=applyPtSlOnT1,pdObj=('molecule',events.index), numThreads=numThreads,close=close,events=events,ptSl=ptSl_)
    events['t1']=df0.dropna(how='all').min(axis=1) # pd.min ignores nan
    if side is None:events=events.drop('side',axis=1)
    return events


In [232]:
# Display the CUSUM event timestamps again.
tEvents

DatetimeIndex(['2016-01-04 10:01:00', '2016-01-04 10:04:00',
               '2016-01-04 10:05:00', '2016-01-04 10:06:00',
               '2016-01-04 10:07:00', '2016-01-04 10:09:00',
               '2016-01-04 10:11:00', '2016-01-04 10:16:00',
               '2016-01-04 10:18:00', '2016-01-04 10:19:00',
               ...
               '2016-01-13 10:15:00', '2016-01-13 10:16:00',
               '2016-01-13 10:20:00', '2016-01-13 10:21:00',
               '2016-01-13 10:22:00', '2016-01-13 10:23:00',
               '2016-01-13 10:24:00', '2016-01-13 10:25:00',
               '2016-01-13 10:26:00', '2016-01-13 10:27:00'],
              dtype='datetime64[ns]', length=1664, freq=None)

In [224]:
#Vertical Barrier
def addVerticalBarrier(close, events, numDays=1):
    """Timestamp of the vertical barrier for each event.

    For every event timestamp, picks the first bar of ``close`` at or after
    ``event + numDays`` days. Events whose barrier would fall beyond the last bar
    are omitted from the returned Series, which is indexed by the event timestamps.
    """
    horizon = pd.Timedelta(days=numDays)
    bar_pos = close.index.searchsorted(events + horizon)
    # Positions equal to len(close) point past the end of the data.
    bar_pos = bar_pos[bar_pos < close.shape[0]]
    kept_events = events[:bar_pos.shape[0]] # NaNs at end
    return pd.Series(close.index[bar_pos], index=kept_events)

In [233]:
# Vertical barrier one day after each CUSUM event, looked up on the sample's Close index.
t1 = addVerticalBarrier(test_data['Close'], tEvents, 1)
t1

2016-01-04 10:01:00   2016-01-05 10:01:00
2016-01-04 10:04:00   2016-01-05 10:04:00
2016-01-04 10:05:00   2016-01-05 10:05:00
2016-01-04 10:06:00   2016-01-05 10:06:00
2016-01-04 10:07:00   2016-01-05 10:07:00
                              ...        
2016-01-12 10:19:00   2016-01-13 10:20:00
2016-01-12 10:20:00   2016-01-13 10:20:00
2016-01-12 10:22:00   2016-01-13 10:22:00
2016-01-12 10:26:00   2016-01-13 10:26:00
2016-01-12 10:27:00   2016-01-13 10:27:00
Name: Date-Time, Length: 1449, dtype: datetime64[ns]

In [235]:
# create target series
ptsl = [1,1]
target=parkinson_vol
# select minRet
minRet = 0.01

# Run in single-threaded mode on Windows
import platform
from multiprocessing import cpu_count  # was used below without ever being imported
if platform.system() == "Windows":
    cpus = 1
else:
    # Leave one core free, but never drop to 0 workers on a single-core machine.
    # NOTE(review): any value other than 1 makes mpPandasObj call processJobs, which
    # is not defined in the visible part of this notebook.
    cpus = max(1, cpu_count() - 1)
    
events = getEvents(test_data['Close'],tEvents,ptsl,target,minRet,cpus,t1=t1)

# Final triple-barrier output
cprint(events) 

                                     t1      trgt
2016-01-04 14:42:00 2016-01-05 11:30:00  0.012681
2016-01-04 14:43:00 2016-01-05 12:05:00  0.012890
2016-01-04 14:44:00 2016-01-05 12:05:00  0.012894
2016-01-04 14:45:00 2016-01-05 12:05:00  0.012896
2016-01-04 14:46:00 2016-01-05 12:08:00  0.012899
2016-01-04 14:47:00 2016-01-05 12:05:00  0.012900
2016-01-04 14:48:00 2016-01-05 12:05:00  0.012902
2016-01-04 14:49:00 2016-01-05 12:05:00  0.012903
2016-01-04 14:50:00 2016-01-05 12:05:00  0.012904


<cprint.cprint.cprint at 0x26f522be5d0>