In [16]:
%load_ext autoreload
%autoreload 2
# import standard libs
from IPython.display import display
from IPython.core.debugger import set_trace as bp
from pathlib import PurePath, Path
import sys
import time
import datetime as dt
from datetime import timedelta
import multiprocessing as mp
from datetime import datetime
from collections import OrderedDict as od
import re
import os
import json

os.environ['THEANO_FLAGS'] = 'device=cpu,floatX=float32'

# import python scientific stack
import pandas as pd
# import pandas_datareader.data as web
from pandas import Timestamp
pd.set_option('display.max_rows', 100)
# from dask import dataframe as dd
from dask.diagnostics import ProgressBar
pbar = ProgressBar()
pbar.register()
import numpy as np
# import scipy.stats as stats
# import statsmodels.api as sm
# from numba import jit
import math
# import pymc3 as pm
# from theano import shared, theano as tt
from sklearn.utils import resample
from sklearn.utils import shuffle
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor, BaggingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score
from sklearn.metrics import roc_curve, auc

from itertools import cycle
# from scipy import interp

# import visual tools
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
%matplotlib inline
import seaborn as sns
import plotly
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# plt.style.use('seaborn-talk')
plt.style.use('bmh')

#plt.rcParams['font.family'] = 'DejaVu Sans Mono'
#plt.rcParams['font.size'] = 9.5
plt.rcParams['font.weight'] = 'medium'
#plt.rcParams['figure.figsize'] = 10,7
blue, green, red, purple, gold, teal = sns.color_palette('colorblind', 6)

# import util libs
# import pyarrow as pa
# import pyarrow.parquet as pq
from tqdm import tqdm, tqdm_notebook
import warnings
warnings.filterwarnings("ignore")
# import missingno as msno
# from google.colab import drive

In [56]:
class MultiProcessingFunctions:
    """ This static functions in this class enable multi-processing """

    def __init__(self):
        pass

    @staticmethod
    def lin_parts(num_atoms, num_threads):
        """Partition a list of atoms into subsets (molecules) of equal size."""
        parts = np.linspace(0, num_atoms, min(num_threads, num_atoms) + 1)
        parts = np.ceil(parts).astype(int)
        return parts

    @staticmethod
    def nested_parts(num_atoms, num_threads, upper_triangle=False):
        """Partition of atoms for nested loops (enables parallelization)."""
        # parts = [] [] 빈 리스트로 하면 첫번째 실행할 때, 빈 리스트에 [-1]인덱스를 참조하면 out of range
        parts = [0.0]
        num_threads_ = min(num_threads, num_atoms)

        for num in range(num_threads_):
            part = 1 + 4 * (
                parts[-1] ** 2 + parts[-1] + num_atoms * (num_atoms + 1.) / num_threads_
            )
            part = (-1 + part ** 0.5) / 2.
            parts.append(part)

        parts = np.round(parts).astype(int)

        if upper_triangle:  # the first rows are heaviest
            parts = np.cumsum(np.diff(parts)[::-1])
            parts = np.append(np.array([0]), parts)
        return parts

    @staticmethod
    def mp_pandas_obj(func, pd_obj, num_threads=24, mp_batches=1, lin_mols=True, **kargs):
        """
        Parallelize a function across pandas objects.
        :param func: function to be parallelized
        :param pd_obj: ('arg_name', atoms) where atoms is an index-like sequence
        :param num_threads: number of worker processes
        :param mp_batches: multiplier for number of jobs
        :param lin_mols: linear vs nested partitioning
        :return: pandas Series/DataFrame or list of results
        """
        atoms = pd_obj[1]
        n_jobs = max(1, int(num_threads) * int(mp_batches))
        if lin_mols:
            parts = MultiProcessingFunctions.lin_parts(len(atoms), n_jobs)
        else:
            parts = MultiProcessingFunctions.nested_parts(len(atoms), n_jobs)

        jobs = []
        for i in range(1, len(parts)):
            mol = atoms[parts[i - 1]:parts[i]]
            if len(mol) == 0:
                continue
            job = {pd_obj[0]: mol, 'func': func}
            job.update(kargs)
            jobs.append(job)

        if len(jobs) == 0:
            return pd.Series(dtype=float)

        out = (
            MultiProcessingFunctions.process_jobs_(jobs)
            if num_threads == 1
            else MultiProcessingFunctions.process_jobs(jobs, num_threads=num_threads)
        )

        out = [o for o in out if o is not None]
        if len(out) == 0:
            return pd.Series(dtype=float)

        first = out[0]
        if isinstance(first, pd.DataFrame):
            out = [o.to_frame().T if isinstance(o, pd.Series) else o for o in out]
            return pd.concat(out, axis=0, copy=False).sort_index()
        elif isinstance(first, pd.Series):
            out = [o if isinstance(o, pd.Series) else pd.Series(o) for o in out]
            return pd.concat(out, axis=0, copy=False).sort_index()
        else:
            return out

    @staticmethod
    def process_jobs_(jobs):
        """Run jobs sequentially (debugging)."""
        out = []
        for job in jobs:
            out_ = MultiProcessingFunctions.expand_call(job)
            out.append(out_)
        return out

    @staticmethod
    def expand_call(kargs):
        """Expand arguments of a callback function."""
        func = kargs['func']
        del kargs['func']
        return func(**kargs)

    @staticmethod
    def report_progress(job_num, num_jobs, time0, task):
        """Report progress of async jobs."""
        msg = [float(job_num) / num_jobs, (time.time() - time0) / 60.]
        msg.append(msg[1] * (1 / msg[0] - 1))
        time_stamp = str(dt.datetime.fromtimestamp(time.time()))

        msg = (
            time_stamp + " " + str(round(msg[0] * 100, 2)) + "% " + task +
            " done after " + str(round(msg[1], 2)) +
            " minutes. Remaining " + str(round(msg[2], 2)) + " minutes."
        )

        if job_num < num_jobs:
            sys.stderr.write(msg + "\r")
        else:
            sys.stderr.write(msg + "\n")

    @staticmethod
    def process_jobs(jobs, task=None, num_threads=24):
        """Run jobs in parallel."""
        if task is None:
            task = jobs[0]['func'].__name__

        pool = mp.Pool(processes=num_threads)
        outputs = pool.imap_unordered(MultiProcessingFunctions.expand_call, jobs)
        out = []
        time0 = time.time()

        for i, out_ in enumerate(outputs, 1):
            out.append(out_)
            MultiProcessingFunctions.report_progress(i, len(jobs), time0, task)

        pool.close()
        pool.join()
        return out


In [71]:
## plot for grid search
def plot_search_results(grid):
    """
    Params: 
        grid: A trained GridSearchCV object.
    """
    ## Results from grid search
    results = grid.cv_results_
    means_test = results['mean_test_score']
    stds_test = results['std_test_score']

    ## Getting indexes of values per hyper-parameter
    masks=[]
    masks_names= list(grid.best_params_.keys())
    for p_k, p_v in grid.best_params_.items():
        masks.append(list(results['param_'+p_k].data==p_v))

    params=grid.param_grid

    ## Ploting results
    fig, ax = plt.subplots(1,len(params),sharex='none', sharey='all',figsize=(20,5))
    fig.suptitle('Score per parameter')
    fig.text(0.04, 0.5, 'MEAN SCORE', va='center', rotation='vertical')
    pram_preformace_in_best = {}
    for i, p in enumerate(masks_names):
        m = np.stack(masks[:i] + masks[i+1:])
        pram_preformace_in_best
        best_parms_mask = m.all(axis=0)
        best_index = np.where(best_parms_mask)[0]
        x = np.array(params[p])
        y_1 = np.array(means_test[best_index])
        e_1 = np.array(stds_test[best_index])
        ax[i].errorbar(x, y_1, e_1, linestyle='--', marker='o', label='test')
        ax[i].set_xlabel(p.upper())

    plt.legend()
    plt.show()

In [58]:
def apply_pt_sl_on_t1(close, events, pt_sl, molecule):
    """
    :param close: (series) close prices
    :param events: (series) of indices that signify "events" 
    :param pt_sl: (array) element 0, indicates the profit taking level; 
                          element 1 is stop loss level
    :param molecule: (an array) a set of datetime index values for processing
    :return: (dataframe) timestamps at which each barrier was touched
    """
    # apply stop loss/profit taking, if it takes place before t1 (end of event)
    events_ = events.loc[molecule]
    out = events_[['t1']].copy(deep=True)
    if pt_sl[0] > 0:
        pt = pt_sl[0] * events_['trgt']
    else:
        pt = pd.Series(index=events.index)  # NaNs

    if pt_sl[1] > 0:
        sl = -pt_sl[1] * events_['trgt']
    else:
        sl = pd.Series(index=events.index)  # NaNs

    for loc, t1 in events_['t1'].fillna(close.index[-1]).items():
        df0 = close[loc:t1]  # path prices
        df0 = (df0 / close[loc] - 1) * events_.at[loc, 'side']  # path returns 수익률(사이드 반영)
        out.loc[loc, 'sl'] = df0[df0 < sl[loc]].index.min()  # earliest stop loss
        out.loc[loc, 'pt'] = df0[df0 > pt[loc]].index.min()  # earliest profit taking

    return out

In [1]:
def lin_parts(num_atoms, num_threads):
    """Partition a list of atoms into subsets (molecules) of equal size."""
    parts = np.linspace(0, num_atoms, min(num_threads, num_atoms) + 1)
    parts = np.ceil(parts).astype(int)
    return parts

In [3]:
import numpy as np

a = 10
b = 3

part = lin_parts(a , b)

In [4]:
part

array([ 0,  4,  7, 10])

In [11]:
def nested_parts(num_atoms, num_threads, upper_triangle=False):   #[part-1]
        """Partition of atoms for nested loops (enables parallelization)."""
        parts = [0.0]
        num_threads_ = min(num_threads, num_atoms)

        for num in range(num_threads_):
            part = 1 + 4 * (
                parts[-1] ** 2 + parts[-1] + num_atoms * (num_atoms + 1.) / num_threads_
            )
            part = (-1 + part ** 0.5) / 2.
            parts.append(part)

        parts = np.round(parts).astype(int)

        if upper_triangle:  # the first rows are heaviest
            parts = np.cumsum(np.diff(parts)[::-1])
            parts = np.append(np.array([0]), parts)
        return parts

In [12]:
parts = nested_parts(a,b, False)

In [13]:
parts

array([ 0,  6,  8, 10])

---

In [19]:
def bbands(close_prices, window, no_of_stdev):
    # rolling_mean = close_prices.rolling(window=window).mean()
    # rolling_std = close_prices.rolling(window=window).std()
    rolling_mean = close_prices.ewm(span=window).mean()
    rolling_std = close_prices.ewm(span=window).std()

    upper_band = rolling_mean + (rolling_std * no_of_stdev)
    lower_band = rolling_mean - (rolling_std * no_of_stdev)

    return rolling_mean, upper_band, lower_band

In [26]:
def get_dollar_bars(time_bars, dollar_threshold):

    # initialize an empty list of dollar bars
    dollar_bars = []

    # initialize the running dollar volume at zero
    running_volume = 0

    # initialize the running high and low with placeholder values
    running_high, running_low = 0, math.inf

    # for each time bar...
    for i in range(len(time_bars)):

        # get the timestamp, open, high, low, close, and volume of the next bar
        next_close, next_high, next_low, next_open, next_timestamp, next_volume = [time_bars[i][k] for k in ['close', 'high', 'low', 'open', 'timestamp', 'vol']]

        # get the midpoint price of the next bar (the average of the open and the close)
        midpoint_price = ((next_open) + (next_close))/2

        # get the approximate dollar volume of the bar using the volume and the midpoint price
        dollar_volume = next_volume * midpoint_price

        # update the running high and low
        running_high, running_low = max(running_high, next_high), min(running_low, next_low)

        # if the next bar's dollar volume would take us over the threshold...
        if dollar_volume + running_volume >= dollar_threshold:

            # set the timestamp for the dollar bar as the timestamp at which the bar closed (i.e. one minute after the timestamp of the last minutely bar included in the dollar bar)
            bar_timestamp = next_timestamp + timedelta(minutes=1)
            
            # add a new dollar bar to the list of dollar bars with the timestamp, running high/low, and next close
            dollar_bars += [{'timestamp': bar_timestamp, 'open': next_open, 'high': running_high, 'low': running_low, 'close': next_close}]

            # reset the running volume to zero
            running_volume = 0

            # reset the running high and low to placeholder values
            running_high, running_low = 0, math.inf

        # otherwise, increment the running volume
        else:
            running_volume += dollar_volume

    # return the list of dollar bars
    return dollar_bars

In [14]:
def get_daily_vol(close, lookback=100):
    """
    :param close: (data frame) Closing prices
    :param lookback: (int) lookback period to compute volatility
    :return: (series) of daily volatility value
    """
    print('Calculating daily volatility for dynamic thresholds')
    
    df0 = close.index.searchsorted(close.index - pd.Timedelta(days=1))
    df0 = df0[df0 > 0]
    df0 = (pd.Series(close.index[df0 - 1], index=close.index[close.shape[0] - df0.shape[0]:]))
        
    df0 = close.loc[df0.index] / close.loc[df0.values].values - 1  # daily returns
    df0 = df0.ewm(span=lookback).std()
    return df0

In [17]:
#download the data into your google drive to use the following code
infp=PurePath('BTCUSDT_1min.csv')

#import data and set timestamps as index
data = pd.read_csv(infp)   
data['open_time']= pd.to_datetime((data['open_time']))

data = data[['open_time', 'open', 'high', 'low', 'close', 'volume']]
data.rename(columns = {'open_time':'timestamp', 'open':'open', 'high':'high', 'low':'low','close':'close', 'volume':'vol'},
            inplace = True)

data.tail() #  ~ 2022-09-30 20:25:00

Unnamed: 0,timestamp,open,high,low,close,vol
4259707,2025-09-28 06:59:00,109410.0,109410.0,109409.99,109410.0,0.73075
4259708,2025-09-28 07:00:00,109410.0,109410.0,109409.99,109409.99,0.99541
4259709,2025-09-28 07:01:00,109409.99,109410.0,109409.99,109410.0,0.68273
4259710,2025-09-28 07:02:00,109410.0,109410.0,109409.99,109409.99,1.10861
4259711,2025-09-28 07:03:00,109409.99,109410.0,109409.99,109410.0,0.59584


In [22]:
#download the data into your google drive to use the following code
infp=PurePath('ETHBTC_1min.csv')

#import data and set timestamps as index
data2 = pd.read_csv(infp)   
data2['open_time']= pd.to_datetime((data2['open_time']))

data2 = data2[['open_time', 'open', 'high', 'low', 'close', 'volume']]
data2.rename(columns = {'open_time':'timestamp', 'open':'open', 'high':'high', 'low':'low','close':'close', 'volume':'vol'},
            inplace = True)

data2.head()

Unnamed: 0,timestamp,open,high,low,close,vol
0,2017-07-14 04:00:00,0.08,0.08,0.08,0.08,0.043
1,2017-07-14 04:01:00,0.08,0.08,0.08,0.08,0.0
2,2017-07-14 04:02:00,0.08,0.08,0.08,0.08,0.306
3,2017-07-14 04:03:00,0.08,0.08,0.08,0.08,0.212
4,2017-07-14 04:04:00,0.08,0.08,0.08,0.08,0.165


In [23]:
# dollar bar : 5 million threshold
data_dict = data.to_dict('records') # 각 row를 딕셔너리로 해서 리스트 변환
data_dict[0]

{'timestamp': Timestamp('2017-08-17 04:00:00'),
 'open': 4261.48,
 'high': 4261.48,
 'low': 4261.48,
 'close': 4261.48,
 'vol': 1.775183}

In [27]:
dollar_bars = get_dollar_bars(data_dict, 1000000) #5,000,000 is an arbitrarily selected threshold
data_db = pd.DataFrame(dollar_bars)
data_db = data_db.set_index('timestamp')
data_db.head()

Unnamed: 0_level_0,open,high,low,close
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2017-08-17 14:39:00,4484.0,4485.39,4261.32,4484.0
2017-08-17 18:09:00,4231.87,4485.39,4218.68,4231.87
2017-08-17 22:39:00,4328.24,4369.69,4200.74,4341.96
2017-08-18 01:27:00,4202.75,4359.13,4134.61,4219.88
2017-08-18 05:39:00,4330.82,4330.82,4200.6,4295.4


In [28]:
data = data.set_index('timestamp')
data2 = data2.set_index('timestamp')
data['eth_close'] = data['close'].mul(data2['close']).dropna()
data.head()

Unnamed: 0_level_0,open,high,low,close,vol,eth_close
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2017-08-17 04:00:00,4261.48,4261.48,4261.48,4261.48,1.775183,300.783781
2017-08-17 04:01:00,4261.48,4261.48,4261.48,4261.48,0.0,300.677244
2017-08-17 04:02:00,4280.56,4280.56,4280.56,4280.56,0.261074,302.126205
2017-08-17 04:03:00,4261.48,4261.48,4261.48,4261.48,0.012008,300.178651
2017-08-17 04:04:00,4261.48,4261.48,4261.48,4261.48,0.140796,299.778072


In [29]:
data_db = pd.concat([data_db, data['eth_close']], join = 'inner', axis = 1)
data_db.head()

Unnamed: 0_level_0,open,high,low,close,eth_close
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2017-08-17 14:39:00,4484.0,4485.39,4261.32,4484.0,308.956568
2017-08-17 18:09:00,4231.87,4485.39,4218.68,4231.87,299.675642
2017-08-17 22:39:00,4328.24,4369.69,4200.74,4341.96,307.189328
2017-08-18 01:27:00,4202.75,4359.13,4134.61,4219.88,297.10059
2017-08-18 05:39:00,4330.82,4330.82,4200.6,4295.4,306.080515


In [30]:
import copy
data_w = copy.deepcopy(data)

# compute bands
window = 50
data_w['avg'], data_w['upper'], data_w['lower'] = bbands(data_w['close'], window, no_of_stdev=2)

# compute sides
data_w['side'] = np.nan
long_signals = (data_w['close'] <= data_w['lower'])
short_signals = (data_w['close'] >= data_w['upper'])
data_w.loc[long_signals, 'side'] = 1
data_w.loc[short_signals, 'side'] = -1

print(data_w.side.value_counts())

# Remove Look ahead biase by lagging the signal
data_w['side'] = data_w['side'].shift(1) #다음 1분이 숏이다 롱이다. 편의상 1분이라는 기간을 뒀다. 그 다음 1분동안이 신호다.

# Drop the NaN values from our data set
data_w.dropna(axis=0, how='any', inplace=True)  

side
 1.0    57944
-1.0    55498
Name: count, dtype: int64


In [32]:
close = data_w['close']

daily_vol = get_daily_vol(close=close, lookback=50)

Calculating daily volatility for dynamic thresholds


In [40]:
daily_vol

timestamp
2017-08-18 05:18:00         NaN
2017-08-18 05:19:00    0.004795
2017-08-18 06:35:00    0.004079
2017-08-18 06:42:00    0.007376
2017-08-18 06:46:00    0.010103
                         ...   
2025-09-28 06:28:00    0.007829
2025-09-28 06:29:00    0.007675
2025-09-28 06:30:00    0.007525
2025-09-28 06:31:00    0.007377
2025-09-28 06:32:00    0.007232
Name: close, Length: 113397, dtype: float64

In [39]:
daily_vol.shape

(113397,)

In [49]:
close

timestamp
2017-08-17 04:02:00      4280.56
2017-08-17 04:31:00      4274.67
2017-08-17 04:35:00      4300.38
2017-08-17 04:36:00      4300.38
2017-08-17 04:37:00      4300.38
                         ...    
2025-09-28 06:28:00    109304.86
2025-09-28 06:29:00    109280.01
2025-09-28 06:30:00    109269.91
2025-09-28 06:31:00    109266.15
2025-09-28 06:32:00    109287.99
Name: close, Length: 113442, dtype: float64

In [38]:
close.shape

(113442,)

In [44]:
def get_t_events(raw_price, threshold):
    """
    :param raw_price: (series) of close prices.
    :param threshold: (float) when the abs(change) is larger than the threshold, the
    function captures it as an event.
    :return: (datetime index vector) vector of datetimes when the events occurred. This is used later to sample.
    """
    print('Applying Symmetric CUSUM filter.')

    t_events = []
    s_pos = 0
    s_neg = 0

    # log returns
    diff = np.log(raw_price).diff().dropna()

    # Get event time stamps for the entire series
    for i in tqdm(diff.index[1:]):
        pos = float(s_pos + diff.loc[i])
        neg = float(s_neg + diff.loc[i])
        s_pos = max(0.0, pos)
        s_neg = min(0.0, neg)

        if s_neg < -threshold:
            s_neg = 0
            t_events.append(i)

        elif s_pos > threshold:
            s_pos = 0
            t_events.append(i)

    event_timestamps = pd.DatetimeIndex(t_events)
    print('get_t_event over.')
    return event_timestamps

In [45]:
cusum_events = get_t_events(close, threshold=daily_vol.mean()*0.1)

Applying Symmetric CUSUM filter.


100%|███████████████████████████████████████████████████████████████████████| 113440/113440 [00:01<00:00, 88913.92it/s]

get_t_event over.





In [46]:
cusum_events

DatetimeIndex(['2017-08-17 04:35:00', '2017-08-17 06:36:00',
               '2017-08-17 07:30:00', '2017-08-17 08:06:00',
               '2017-08-17 09:32:00', '2017-08-17 09:34:00',
               '2017-08-17 15:25:00', '2017-08-17 15:33:00',
               '2017-08-17 15:36:00', '2017-08-17 16:09:00',
               ...
               '2025-09-26 14:27:00', '2025-09-26 16:20:00',
               '2025-09-26 17:27:00', '2025-09-26 21:15:00',
               '2025-09-27 01:42:00', '2025-09-27 03:33:00',
               '2025-09-27 07:34:00', '2025-09-27 21:39:00',
               '2025-09-28 00:26:00', '2025-09-28 01:09:00'],
              dtype='datetime64[ns]', length=49468, freq=None)

In [48]:
print(cusum_events)

DatetimeIndex(['2017-08-17 04:35:00', '2017-08-17 06:36:00',
               '2017-08-17 07:30:00', '2017-08-17 08:06:00',
               '2017-08-17 09:32:00', '2017-08-17 09:34:00',
               '2017-08-17 15:25:00', '2017-08-17 15:33:00',
               '2017-08-17 15:36:00', '2017-08-17 16:09:00',
               ...
               '2025-09-26 14:27:00', '2025-09-26 16:20:00',
               '2025-09-26 17:27:00', '2025-09-26 21:15:00',
               '2025-09-27 01:42:00', '2025-09-27 03:33:00',
               '2025-09-27 07:34:00', '2025-09-27 21:39:00',
               '2025-09-28 00:26:00', '2025-09-28 01:09:00'],
              dtype='datetime64[ns]', length=49468, freq=None)


In [47]:
cusum_events.shape

(49468,)

In [42]:
def get_events(close, t_events, pt_sl, target, min_ret, num_threads, 
              vertical_barrier_times=False, side=None):
    """
    :param close: (series) Close prices
    :param t_events: (series) of t_events. 
                     These are timestamps that will seed every triple barrier.
    :param pt_sl: (2 element array) element 0, indicates the profit taking level; 
                  element 1 is stop loss level.
                  A non-negative float that sets the width of the two barriers. 
                  A 0 value means that the respective horizontal barrier will be disabled.
    :param target: (series) of values that are used (in conjunction with pt_sl)
                   to determine the width of the barrier.
    :param min_ret: (float) The minimum target return required for running a triple barrier search.
    :param num_threads: (int) The number of threads concurrently used by the function.
    :param vertical_barrier_times: (series) A pandas series with the timestamps of the vertical barriers.
    :param side: (series) Side of the bet (long/short) as decided by the primary model
    :return: (data frame) of events
            -events.index is event's starttime
            -events['t1'] is event's endtime
            -events['trgt'] is event's target
            -events['side'] (optional) implies the algo's position side
    """
    print('get_events start')
    # 1) Get target
    target = target.loc[target.index.intersection(t_events)]
    target = target[target > min_ret]  # min_ret

    # 2) Get vertical barrier (max holding period)
    if vertical_barrier_times is False:
        vertical_barrier_times = pd.Series(pd.NaT, index=t_events)

    # 3) Form events object, apply stop loss on vertical barrier
    if side is None:
        side_ = pd.Series(1., index=target.index)
        pt_sl_ = [pt_sl[0], pt_sl[0]]
    else:
        side_ = side.loc[target.index]
        pt_sl_ = pt_sl[:2]

    events = pd.concat({'t1': vertical_barrier_times, 'trgt': target, 'side': side_},
                        axis=1)
    events = events.dropna(subset=['trgt'])

    print('MultiProcessing mp_pandas start')
    # Apply Triple Barrier
    df0 = MultiProcessingFunctions.mp_pandas_obj(func=apply_pt_sl_on_t1,
                                                 pd_obj=('molecule', events.index),
                                                 num_threads=num_threads,
                                                 close=close,
                                                 events=events,
                                                 pt_sl=pt_sl_)

    print('MultiProcessing mp_pandas stop')
    events['t1'] = df0.dropna(how='all').min(axis=1)  # pd.min ignores nan

    if side is None:
        events = events.drop('side', axis=1)
    print('get_events stop')
    return events

In [77]:
def add_vertical_barrier(t_events, close, num_days=1):
    """
    :param t_events: (series) series of events (symmetric CUSUM filter)
    :param close: (series) close prices
    :param num_days: (int) maximum number of days a trade can be active
    :return: (series) timestamps of vertical barriers
    """
    print( 'vertical_barrier start')
    t1 = close.index.searchsorted(t_events + pd.Timedelta(days=num_days))
    print(t1)
    t1 = t1[t1 < close.shape[0]]
    print(t1)
    t1 = pd.Series(close.index[t1], index=t_events[:t1.shape[0]])  # NaNs at end
    print('vertical_barrier over')
    return t1

In [78]:
vertical_barriers = add_vertical_barrier(t_events=cusum_events,
                                         close=close, num_days=0.5)

vertical_barrier start
[    26     28     28 ... 113442 113442 113442]
[    26     28     28 ... 113408 113413 113422]
vertical_barrier over


In [53]:
vertical_barriers

2017-08-17 04:35:00   2017-08-17 17:58:00
2017-08-17 06:36:00   2017-08-17 21:56:00
2017-08-17 07:30:00   2017-08-17 21:56:00
2017-08-17 08:06:00   2017-08-17 21:56:00
2017-08-17 09:32:00   2017-08-17 21:56:00
                              ...        
2025-09-26 17:27:00   2025-09-27 05:40:00
2025-09-26 21:15:00   2025-09-27 10:37:00
2025-09-27 01:42:00   2025-09-27 14:47:00
2025-09-27 03:33:00   2025-09-27 15:38:00
2025-09-27 07:34:00   2025-09-27 20:17:00
Name: timestamp, Length: 49465, dtype: datetime64[ns]

In [63]:
vertical_barriers.shape

(49465,)

In [59]:
pt_sl = [1, 2] # setting profit-take and stop-loss at 1% and 2%          2% 떨어지면 손절 , 1% 이익이 나면 손절 사용자에게 직접 입력받음
min_ret = 0.0005 # setting a minimum return of 0.05%                     0.05% 최소 수익률..

triple_barrier_events = get_events(close=close,
                                  t_events=cusum_events,
                                  pt_sl=pt_sl,
                                  target=daily_vol,
                                  min_ret=min_ret,
                                  num_threads=1,
                                  vertical_barrier_times=vertical_barriers,
                                  side=data_w['side'])

get_events start
MultiProcessing mp_pandas start
MultiProcessing mp_pandas stop
get_events stop


In [61]:
triple_barrier_events.head()

Unnamed: 0,t1,trgt,side
2017-08-18 05:19:00,2017-08-18 06:46:00,0.004795,-1.0
2017-08-18 06:46:00,2017-08-18 08:32:00,0.010103,1.0
2017-08-18 08:32:00,2017-08-18 08:48:00,0.009257,-1.0
2017-08-18 08:33:00,2017-08-18 08:48:00,0.008787,-1.0
2017-08-18 08:48:00,2017-08-18 10:14:00,0.009836,1.0


In [62]:
triple_barrier_events.shape

(49444, 3)

In [64]:
def get_bins(triple_barrier_events, close):
    """
    :param triple_barrier_events: (data frame)
                -events.index is event's starttime
                -events['t1'] is event's endtime
                -events['trgt'] is event's target
                -events['side'] (optional) implies the algo's position side
                Case 1: ('side' not in events): bin in (-1,1) <-label by price action
                Case 2: ('side' in events): bin in (0,1) <-label by pnl (meta-labeling)
    :param close: (series) close prices
    :return: (data frame) of meta-labeled events
    """

    # 1) Align prices with their respective events
    events_ = triple_barrier_events.dropna(subset=['t1'])
    prices = events_.index.union(events_['t1'].values)
    prices = prices.drop_duplicates()
    prices = close.reindex(prices, method='bfill')
    
    # 2) Create out DataFrame
    out_df = pd.DataFrame(index=events_.index)
    # Need to take the log returns, else your results will be skewed for short positions
    out_df['ret'] = np.log(prices.loc[events_['t1'].values].values) - np.log(prices.loc[events_.index])
    out_df['trgt'] = events_['trgt']

    # Meta labeling: Events that were correct will have pos returns
    if 'side' in events_:
        out_df['ret'] = out_df['ret'] * events_['side']  # meta-labeling

    # Added code: label 0 when vertical barrier reached
    out_df = barrier_touched(out_df)

    # Meta labeling: label incorrect events with a 0
    if 'side' in events_:
        out_df.loc[out_df['ret'] <= 0, 'bin'] = 0
    
    # Transform the log returns back to normal returns.
    out_df['ret'] = np.exp(out_df['ret']) - 1
    
    # Add the side to the output. This is useful for when a meta label model must be fit
    tb_cols = triple_barrier_events.columns
    if 'side' in tb_cols:
        out_df['side'] = triple_barrier_events['side']
        
    out_df


In [66]:
def barrier_touched(out_df):
    """
    :param out_df: (DataFrame) containing the returns and target
    :return: (DataFrame) containing returns, target, and labels
    """
    store = []
    for i in np.arange(len(out_df)):
        date_time = out_df.index[i]
        ret = out_df.loc[date_time, 'ret']
        target = out_df.loc[date_time, 'trgt']

        if ret > 0.0 and ret > target:
            # Top barrier reached
            store.append(1)
        elif ret < 0.0 and ret < -target:
            # Bottom barrier reached
            store.append(-1)
        else:
            # Vertical barrier reached
            store.append(0)

    out_df['bin'] = store

    return out_df

In [72]:
labels = get_bins(triple_barrier_events, data_w['close'])
labels.side.value_counts()
labels

AttributeError: 'NoneType' object has no attribute 'side'

In [69]:
print(labels)

None
