In [122]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import FormatStrFormatter

import os

In [123]:
from pathlib import Path

# Configurable parts
SYMBOL = "USDJPY"
SAMPLE_TYPE = "time"
MINUTES = 1
DOLLAR_THRESHOLD = "115009542m"
EVENT_NAME = "CUSUM"
EVENT_THRESHOLD = 1.69e-04
LABEL_NAME = "TB"

START_DATE = "20240101"
END_DATE = "20241231"

# Build base name
if SAMPLE_TYPE == "dollar":
    RESAMPLED_NAME = f"{SYMBOL}-{DOLLAR_THRESHOLD}-dollar-{START_DATE}-{END_DATE}"
else:
    RESAMPLED_NAME = f"{SYMBOL}-{MINUTES}m-{START_DATE}-{END_DATE}"

# Base directories
BASE_DIR = Path("../data")
RESAMPLED_DIR = BASE_DIR / "resampled"
PROCESSED_DIR = BASE_DIR / "processed"
EVENTS_DIR = BASE_DIR / "events"

# Final paths
RESAMPLED_FILE_PATH = RESAMPLED_DIR / f"{RESAMPLED_NAME}.pkl"
PROCESSED_FILE_PATH = PROCESSED_DIR / f"{RESAMPLED_NAME}-processed.pkl"
EVENT_FILE_PATH = EVENTS_DIR / f"{RESAMPLED_NAME}-{EVENT_NAME}.pkl"

In [124]:
DIRECTION_LABEL_DIR = BASE_DIR / "direction_labels"
DIRECTION_LABEL_DIR.mkdir(parents=True, exist_ok=True)
DIRECTION_LABEL_FILE_PATH = DIRECTION_LABEL_DIR / f"{RESAMPLED_NAME}-{EVENT_NAME}-{LABEL_NAME}.pkl"

In [125]:
%%time
df = pd.read_pickle(PROCESSED_FILE_PATH)
t_events = pd.read_pickle(EVENT_FILE_PATH)

CPU times: user 1.77 ms, sys: 46 ms, total: 47.7 ms
Wall time: 63.2 ms


In [126]:
df.columns

Index(['open', 'high', 'low', 'close', 'volume', 'spread', 'close_delta',
       'close_return', 'close_log_return', 'ret_mean_5', 'ret_mean_10',
       'ret_mean_15', 'ret_mean_20', 'log_volume', 'rv5', 'log_rv5',
       'sqrt_rv5', 'rv15', 'log_rv15', 'sqrt_rv15', 'rv50', 'log_rv50',
       'sqrt_rv50', 'ema5', 'ema5_slope', 'close_above_ema5', 'ema20',
       'ema20_slope', 'close_above_ema20', 'ema50', 'ema50_slope',
       'close_above_ema50', 'ema100', 'ema100_slope', 'close_above_ema100',
       'atr14', 'atr60', 'atr120', 'log_atr14', 'atr14_percent',
       'atr14_adjusted_return', 'log_atr60', 'atr60_percent',
       'atr60_adjusted_return', 'log_atr120', 'atr120_percent',
       'atr120_adjusted_return', 'adx14', 'plus_di14', 'minus_di14',
       'bb_upper', 'bb_lower', 'bb_mavg', 'bb_width', 'bb_position',
       'dc20_upper', 'dc20_lower', 'dc20_mid', 'dc20_width',
       'close_above_dc20_mid', 'dc20_breakout', 'dc20_breakdown', 'dc50_upper',
       'dc50_lower', 'dc50_mi

In [127]:
df.loc[t_events].head()

Unnamed: 0_level_0,open,high,low,close,volume,spread,close_delta,close_return,close_log_return,ret_mean_5,...,dom,month,hour_sin,hour_cos,dow_sin,dow_cos,dom_sin,dom_cos,month_sin,month_cos
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2024-01-02 01:57:00,1.103655,1.10366,1.10351,1.10351,143140000000.0,2.1e-05,-0.00014,-0.000127,-0.000127,-5.3e-05,...,2,1,0.258819,0.965926,0.781831,0.62349,0.394356,0.918958,0.5,0.866025
2024-01-02 02:08:00,1.10335,1.103355,1.103155,1.103225,290260000000.0,2.6e-05,-0.00013,-0.000118,-0.000118,-5e-05,...,2,1,0.5,0.866025,0.781831,0.62349,0.394356,0.918958,0.5,0.866025
2024-01-02 02:15:00,1.10318,1.10318,1.102985,1.102985,458130000000.0,2.4e-05,-0.00021,-0.00019,-0.00019,-5.4e-05,...,2,1,0.5,0.866025,0.781831,0.62349,0.394356,0.918958,0.5,0.866025
2024-01-02 02:25:00,1.10273,1.10274,1.102685,1.10269,166190000000.0,2.3e-05,-5e-05,-4.5e-05,-4.5e-05,-5.2e-05,...,2,1,0.5,0.866025,0.781831,0.62349,0.394356,0.918958,0.5,0.866025
2024-01-02 02:40:00,1.102585,1.102595,1.102475,1.102475,385650000000.0,3.1e-05,-0.000105,-9.5e-05,-9.5e-05,-2.9e-05,...,2,1,0.5,0.866025,0.781831,0.62349,0.394356,0.918958,0.5,0.866025


## Calculate Target Volatility (trgt)
This will be used for the calculation of dynamic TP/SL

In [128]:
def get_daily_vol(close,span0=100):
    # daily vol reindexed to close
    df0=close.index.searchsorted(close.index-pd.Timedelta(days=1))
    #bp()
    df0=df0[df0>0]
    #bp()
    df0=(pd.Series(close.index[df0-1],
                   index=close.index[close.shape[0]-df0.shape[0]:]))
    #bp()
    try:
        df0=close.loc[df0.index]/close.loc[df0.values].values-1 # daily rets
    except Exception as e:
        print(e)
        print('adjusting shape of close.loc[df0.index]')
        cut = close.loc[df0.index].shape[0] - close.loc[df0.values].shape[0]
        df0=close.loc[df0.index].iloc[:-cut]/close.loc[df0.values].values-1
    df0=df0.ewm(span=span0).std().rename('dailyVol')
    return df0

In [129]:
from ta.volatility import AverageTrueRange

def get_atr(df, window=60):
    atr = AverageTrueRange(high=df['high'], low=df['low'], close=df['close'], window=window)
    return atr.average_true_range().rename(f"atr{window}")

In [130]:
trgt = get_daily_vol(df['close']).reindex(t_events, method='ffill')

In [131]:
alpha = 1.5  # 你要的寬度倍數
atr = get_atr(df, window=20)                    # 價格單位
atr_rel = (atr / df['close']).shift(1)          # 轉成相對幅度，並用前一根避免洩漏
trgt = (alpha * atr_rel).reindex(t_events, method='ffill')  # 對齊事件

In [132]:
trgt.describe()

count    39038.000000
mean         0.000244
std          0.000151
min          0.000000
25%          0.000143
50%          0.000205
75%          0.000301
max          0.001517
dtype: float64

## Apply Triple Barrier
We use daily volatility to calculate TP/SL, and we set our T1 as 1day

In [133]:
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import pandas as pd

def applyPtSlOnT1(close,events,ptSl,molecule):
    # apply stop loss/profit taking, if it takes place before t1 (end of event)
    events_=events.loc[molecule]
    out=events_[['t1']].copy(deep=True)
    
    pt = ptSl[0] * events_['trgt'] if ptSl[0] > 0 else pd.Series(index=events.index)
    sl = -ptSl[1] * events_['trgt'] if ptSl[1] > 0 else pd.Series(index=events.index)

        
    for loc, t1 in events_['t1'].fillna(close.index[-1]).items():
        df0=close[loc:t1] # path prices
        df0=(df0/close[loc]-1)*events_.at[loc,'side'] # path returns
        out.loc[loc,'sl']=df0[df0<sl[loc]].index.min() # earliest stop loss
        out.loc[loc,'pt']=df0[df0>pt[loc]].index.min() # earliest profit taking
        
    return out


def parallel_apply(func, items, num_threads=4, **kwargs):
    
    def worker(molecule):
        return func(molecule=molecule, **kwargs)

    chunks = np.array_split(items, num_threads)
    
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        results = list(executor.map(worker, chunks))

    return pd.concat(results).sort_index()


def getEvents(close, tEvents, ptSl, trgt, minTrgt, numThreads=4, t1=False, side=None):
    # Step 1: Filter targets
    trgt = trgt.loc[tEvents]
    trgt = trgt[trgt > minTrgt]

    # Step 2: Set vertical barrier (t1)
    if t1 is False:
        t1 = pd.Series(pd.NaT, index=tEvents)

    # Step 3: Build events DataFrame
    if side is None:
        side_, ptSl_ = pd.Series(1., index=trgt.index), [ptSl[0], ptSl[0]]
    else:
        side_, ptSl_ = side.loc[trgt.index], ptSl[:2]

    events = pd.concat({'t1': t1, 'trgt': trgt, 'side': side_}, axis=1).dropna(subset=['trgt'])

    # Step 4: Apply barriers in parallel
    df0 = parallel_apply(
        func=applyPtSlOnT1,
        items=events.index,
        num_threads=numThreads,
        close=close,
        events=events,
        ptSl=ptSl_
    )

    # Step 5: Choose the first touched barrier
    events['t1'] = df0.dropna(how='all').min(axis=1)
    
    if side is None:
        events = events.drop('side', axis=1)
        
    return events


In [134]:
def get_vertical_barrier(t_events, close, delta=pd.Timedelta(minutes=60)):
    """
    For each event in t_events, find the timestamp in close.index 
    that is at least delta later. Returns a pd.Series of t1.
    """
    barrier_times = t_events + delta
    t1_idx = close.index.searchsorted(barrier_times)
    valid_idx = t1_idx[t1_idx < len(close)]
    t1 = pd.Series(close.index[valid_idx], index=t_events[:len(valid_idx)])
    return t1

In [135]:
t1 = get_vertical_barrier(t_events, df['close'], delta=pd.Timedelta(minutes=30))

In [136]:
t1

timestamp
2024-01-02 01:57:00   2024-01-02 02:27:00
2024-01-02 02:08:00   2024-01-02 02:38:00
2024-01-02 02:15:00   2024-01-02 02:45:00
2024-01-02 02:25:00   2024-01-02 02:55:00
2024-01-02 02:40:00   2024-01-02 03:10:00
                              ...        
2024-12-30 22:05:00   2024-12-30 22:35:00
2024-12-30 22:50:00   2024-12-30 23:20:00
2024-12-30 23:00:00   2024-12-30 23:30:00
2024-12-30 23:09:00   2024-12-30 23:39:00
2024-12-30 23:15:00   2024-12-30 23:45:00
Name: timestamp, Length: 39037, dtype: datetime64[ns]

In [137]:
trgt.describe()

count    39038.000000
mean         0.000244
std          0.000151
min          0.000000
25%          0.000143
50%          0.000205
75%          0.000301
max          0.001517
dtype: float64

In [138]:
minTrgt = trgt.quantile(.3)
minTrgt

np.float64(0.00015532707052898375)

In [139]:
%%time

events = getEvents(
    close=df['close'],
    tEvents=t_events, 
    ptSl=[1,1], 
    trgt=trgt, 
    minTrgt=minTrgt, 
    numThreads=20, 
    t1=t1, 
    side=None
)

CPU times: user 6.61 s, sys: 402 ms, total: 7.01 s
Wall time: 6.67 s


In [140]:
events.head()

Unnamed: 0_level_0,t1,trgt
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1
2024-01-02 07:07:00,2024-01-02 07:09:00,0.000179
2024-01-02 07:15:00,2024-01-02 07:24:00,0.000199
2024-01-02 07:23:00,2024-01-02 07:24:00,0.000213
2024-01-02 07:24:00,2024-01-02 07:41:00,0.000228
2024-01-02 07:40:00,2024-01-02 07:44:00,0.00024


In [141]:
events = events.dropna().copy()

# Keep only intraday events

In [142]:
events = events[events.index.date==events.t1.dt.date]

## Create classification labels

In [143]:
def getBins(events, close, minRet=0.0, t1=None):
    '''
    Compute event's outcome (including side information, if provided).
    events is a DataFrame where:
    -events.index is event's starttime
    -events['t1'] is event's endtime
    -events['trgt'] is event's target
    -events['side'] (optional) implies the algo's position side
    -t1 is original vertical barrier series
    Case 1: ('side' not in events): bin in (-1,1) <-label by price action
    Case 2: ('side' in events): bin in (0,1) <-label by pnl (meta-labeling)
    '''
    # 1) prices aligned with events
    events_ = events.dropna(subset=['t1'])
    px = events_.index.union(events_['t1'].values).drop_duplicates()
    px = close.reindex(px, method='bfill')
    # 2) create out object
    out = pd.DataFrame(index=events_.index)
    out['ret'] = px.loc[events_['t1'].values].values / px.loc[
        events_.index] - 1
    if 'side' in events_: out['ret'] *= events_['side']  # meta-labeling
    out['bin'] = np.sign(out['ret'])

    if 'side' not in events_:
        # only applies when not meta-labeling.
        # to update bin to 0 when vertical barrier is touched, we need the
        # original vertical barrier series since the events['t1'] is the time
        # of first touch of any barrier and not the vertical barrier
        # specifically. The index of the intersection of the vertical barrier
        # values and the events['t1'] values indicate which bin labels needs
        # to be turned to 0.
        vtouch_first_idx = events[events['t1'].isin(t1.values)].index
        out.loc[vtouch_first_idx, 'bin'] = 0.

    if 'side' in events_: out.loc[out['ret'] <= 0, 'bin'] = 0  # meta-labeling
    out.loc[out['ret'].abs() < minRet, 'bin'] = 0. # 不做獲利過少的交易 (設為 0)
    # out = out.loc[out['ret'].abs() >= minRet] # 過濾掉獲利過少的交易 (刪掉)
    return out

In [144]:
labels = getBins(events, df['close'], minRet=0.00018, t1=t1)

In [145]:
print(labels.ret.abs().describe())
print(labels.ret.abs().quantile(.3))

count    27312.000000
mean         0.000392
std          0.000237
min          0.000000
25%          0.000247
50%          0.000326
75%          0.000461
max          0.004929
Name: ret, dtype: float64
0.0002633689790202176


In [146]:
labels.bin.value_counts()

bin
 1.0    10189
-1.0    10184
 0.0     6939
Name: count, dtype: int64

In [147]:
labels["bin_class"] = labels["bin"] + 1

In [148]:
labeled_events = events.join(labels, how='inner')

In [149]:
labeled_events.head()

Unnamed: 0_level_0,t1,trgt,ret,bin,bin_class
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2024-01-02 07:07:00,2024-01-02 07:09:00,0.000179,-0.000186,-1.0,0.0
2024-01-02 07:15:00,2024-01-02 07:24:00,0.000199,0.000422,1.0,2.0
2024-01-02 07:23:00,2024-01-02 07:24:00,0.000213,0.000258,1.0,2.0
2024-01-02 07:24:00,2024-01-02 07:41:00,0.000228,-0.000272,-1.0,0.0
2024-01-02 07:40:00,2024-01-02 07:44:00,0.00024,-0.000349,-1.0,0.0


## Calculate Concurrency

def get_concurrency(events, price_index):
    """
    Calculate concurrency: number of overlapping events at each time.
    
    Args:
        events (pd.DataFrame): Must have 't1' column with index as start time.
        price_index (pd.DatetimeIndex): Full time index (e.g., from your OHLCV data).
        
    Returns:
        pd.Series: concurrency count indexed by time.
    """
    concurrency = pd.Series(0, index=price_index)

    for start, end in events['t1'].items():
        concurrency[start:end] += 1

    return concurrency


# Let's say your OHLCV data or close series has this index
price_index = df.index  # or df_ohlcv.index

concurrency = get_concurrency(events=labeled_events, price_index=price_index)

plt.hist(concurrency)

## Save events to disk

In [150]:
labeled_events.to_pickle(DIRECTION_LABEL_FILE_PATH)

In [151]:
DIRECTION_LABEL_FILE_PATH

PosixPath('../data/direction_labels/USDJPY-1m-20240101-20241231-CUSUM-TB.pkl')