In [None]:
# import os
# from datetime import datetime, timedelta
# import pandas as pd
# from binance.client import Client
# from dotenv import load_dotenv

# load_dotenv()
# api_key = os.getenv("API_KEY")
# api_secret = os.getenv("API_SECRET")

# client = Client(api_key, api_secret)

# three_months_ago = datetime.now() - timedelta(days=90)

# klines = client.get_historical_klines(
#     symbol='BTCUSDT',
#     interval=Client.KLINE_INTERVAL_5MINUTE,
#     start_str=three_months_ago.strftime("%Y-%m-%d %H:%M:%S")
# )

# df = pd.DataFrame(klines, columns=[
#     'Open time', 'Open', 'High', 'Low', 'Close', 'Volume', 'Close time',
#     'Quote asset volume', 'Number of trades', 'Taker buy base asset volume',
#     'Taker buy quote asset volume', 'Ignore'
# ])

# df['Open time'] = pd.to_datetime(df['Open time'], unit='ms')
# price_df = df[['Open time', 'Close']].copy()
# price_df['Close'] = pd.to_numeric(price_df['Close'])

# print(price_df)

In [None]:
import os
import numpy as np
import pandas as pd
import pandas_ta as ta
import xgboost as xgb
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import pyarrow.parquet as pq

from sklearn.metrics import accuracy_score, classification_report
from binance.client import Client
from dotenv import load_dotenv
from warnings import simplefilter

import plotly.graph_objects as go
from plotly.subplots import make_subplots
from xgboost import plot_importance

simplefilter(action='ignore', category=FutureWarning)
simplefilter(action='ignore', category=DeprecationWarning)

load_dotenv()
api_key = os.getenv("API_KEY")
api_secret = os.getenv("API_SECRET")

In [None]:
client = Client(api_key, api_secret)
one_year_ago = datetime.now() - timedelta(days=365*5)
klines = client.get_historical_klines(
    symbol='BTCUSDT',
    interval=Client.KLINE_INTERVAL_5MINUTE,
    start_str=one_year_ago.strftime("%Y-%m-%d %H:%M:%S")
)

df = pd.DataFrame(klines, columns=[
    'Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Close time',
    'Quote asset volume', 'Number of trades', 'Taker buy base asset volume',
    'Taker buy quote asset volume', 'Ignore'
])

df['Date'] = pd.to_datetime(df['Date'], unit='ms')
numeric_cols = ['Open', 'High', 'Low', 'Close', 'Volume', 'Number of trades']
for col in numeric_cols:
    df[col] = pd.to_numeric(df[col], errors='coerce')
df = df[['Date', 'Open', 'High', 'Low', 'Close', 'Volume']].dropna()
print(f"{df.shape}")
df.to_parquet("btc_5m.parquet")

In [None]:
df = pd.read_parquet("btc_5m_5years.parquet")

def create_volume_bars(df, volume_threshold=1000):
    bars = []
    current_bar = None
    cumulative_volume = 0
    for _, row in df.iterrows():
        if current_bar is None:
            current_bar = {'Date': row['Date'], 'Open': row['Open'], 'High': row['High'], 'Low': row['Low']}
        cumulative_volume += row['Volume']
        current_bar['High'] = max(current_bar['High'], row['High'])
        current_bar['Low'] = min(current_bar['Low'], row['Low'])
        if cumulative_volume >= volume_threshold:
            current_bar['Close'] = row['Close']
            current_bar['Volume'] = cumulative_volume
            bars.append(current_bar)
            current_bar = None
            cumulative_volume = 0
    return pd.DataFrame(bars)

df_vb = create_volume_bars(df, volume_threshold=1000)

def get_weights_ffd(d, size):
    w = [1.]
    for k in range(1, size):
        w_ = -w[-1] / k * (d - k + 1)
        w.append(w_)
    return np.array(w[::-1]).reshape(-1, 1)

def frac_diff_ffd(series, d, thres=1e-5):
    w = get_weights_ffd(d, len(series))
    w_ = np.cumsum(np.abs(w))
    w_ /= w_[-1]
    skip = np.searchsorted(w_, thres)
    df = {}
    for name in series.columns:
        series_f = series[[name]].ffill().dropna()
        df_ = pd.Series(index=series.index, dtype=float)
        for iloc in range(skip, series_f.shape[0]):
            loc = series_f.index[iloc]
            if not np.isfinite(series.loc[loc, name]): continue
            df_[loc] = np.dot(w[-(iloc + 1):, :].T, series_f.iloc[:iloc + 1])[0, 0]
        df[name] = df_
    return pd.concat(df, axis=1)

print("Applying Fractional Differentiation...")
close_prices_vb = df_vb[['Close']].set_index(df_vb['Date'])
frac_close = frac_diff_ffd(close_prices_vb, d=0.4).dropna()
df_vb = df_vb.set_index('Date').loc[frac_close.index].reset_index()
df_vb['frac_close'] = frac_close['Close'].values

In [None]:
def calculate_features(df):
    print("Calculating features...")
    df.ta.adx(length=14, append=True)
    df.ta.aroon(length=25, append=True)
    df.ta.bbands(length=20, append=True)
    df.ta.atr(length=14, append=True)
    df.ta.obv(append=True)
    df['slope_24'] = df['Close'].rolling(window=24).apply(lambda x: np.polyfit(np.arange(len(x)), x, 1)[0] if x.notna().all() else np.nan, raw=False)
    df['bbw_pct'] = (df['BBU_20_2.0'] - df['BBL_20_2.0']) / df['BBM_20_2.0']
    df.ta.chop(length=14, append=True)
    df['clv'] = ((df['Close'] - df['Low']) - (df['High'] - df['Close'])) / (df['High'] - df['Low'] + 1e-12)
    df['clv_mean_12'] = df['clv'].rolling(window=12).mean()
    return df

df_features = calculate_features(df_vb.copy())

In [None]:
def get_regime_labels(close_series, look_forward_period=20):
    print("Creating regime labels...")
    future_vol = close_series.shift(-look_forward_period).rolling(look_forward_period).std()
    trend_cutoff = future_vol.quantile(0.66)
    sideway_cutoff = future_vol.quantile(0.33)
    labels = pd.Series(index=close_series.index, data=-1, dtype=int)
    labels[future_vol >= trend_cutoff] = 1
    labels[future_vol <= sideway_cutoff] = 0
    return labels

df_features['regime'] = get_regime_labels(df_features['Close'])
df_model_data = df_features[df_features['regime'] != -1].dropna()


In [None]:
print("Preparing data for the model...")
X = df_model_data.drop(columns=['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'clv', 'regime', 'frac_close'])
y = df_model_data['regime']

test_size = 0.2
split_index = int(len(X) * (1 - test_size))
X_train, X_test = X.iloc[:split_index], X.iloc[split_index:]
y_train, y_test = y.iloc[:split_index], y.iloc[split_index:]

print(f"Training data shape: {X_train.shape}")
print(f"Test data shape: {X_test.shape}")
print(f"Shape before dropping NaNs from features: {df_features.shape}")
df_features.dropna(inplace=True)
print(f"Shape after dropping NaNs: {df_features.shape}")

In [None]:
import os
import numpy as np
import pandas as pd
import pandas_ta as ta
import xgboost as xgb
import matplotlib.pyplot as plt
import optuna
from datetime import datetime, timedelta
from sklearn.model_selection import KFold
from sklearn.metrics import f1_score
from scipy.stats import linregress, kendalltau
from binance.client import Client
from dotenv import load_dotenv
from warnings import simplefilter
from xgboost import plot_importance

simplefilter(action='ignore', category=FutureWarning)
simplefilter(action='ignore', category=DeprecationWarning)
optuna.logging.set_verbosity(optuna.logging.WARNING)

load_dotenv()
api_key = os.getenv("API_KEY")
api_secret = os.getenv("API_SECRET")

def download_and_cache_data(file_path="btc_5m_5years.parquet", years=5):
    if os.path.exists(file_path):
        print(f"Loading data from cached file: {file_path}")
        return pd.read_parquet(file_path)
    
    print("Downloading historical data (this may take a while)...")
    client = Client(api_key, api_secret)
    start_date = datetime.now() - timedelta(days=365 * years)
    klines = client.get_historical_klines(
        symbol='BTCUSDT',
        interval=Client.KLINE_INTERVAL_5MINUTE,
        start_str=start_date.strftime("%Y-%m-%d %H:%M:%S")
    )
    
    df = pd.DataFrame(klines, columns=[
        'Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Close time',
        'Quote asset volume', 'Number of trades', 'Taker buy base asset volume',
        'Taker buy quote asset volume', 'Ignore'
    ])
    
    df['Date'] = pd.to_datetime(df['Date'], unit='ms')
    for col in ['Open', 'High', 'Low', 'Close', 'Volume']:
        df[col] = pd.to_numeric(df[col], errors='coerce')
        
    df = df[['Date', 'Open', 'High', 'Low', 'Close', 'Volume']].dropna()
    df.to_parquet(file_path)
    return df

def create_volume_bars(df, volume_threshold=2000):
    print(f"Creating Volume Bars with threshold: {volume_threshold} BTC...")
    bars = []
    current_bar = None
    cumulative_volume = 0
    for _, row in df.iterrows():
        if current_bar is None:
            current_bar = {'Date': row['Date'], 'Open': row['Open'], 'High': row['High'], 'Low': row['Low']}
        cumulative_volume += row['Volume']
        current_bar['High'] = max(current_bar['High'], row['High'])
        current_bar['Low'] = min(current_bar['Low'], row['Low'])
        if cumulative_volume >= volume_threshold:
            current_bar['Close'] = row['Close']
            current_bar['Volume'] = cumulative_volume
            bars.append(current_bar)
            current_bar = None
            cumulative_volume = 0
    return pd.DataFrame(bars).set_index('Date')

def get_weights_ffd(d, size):
    w = [1.]
    for k in range(1, size):
        w_ = -w[-1] / k * (d - k + 1)
        w.append(w_)
    w = np.array(w[::-1]).reshape(-1, 1)
    return w

def frac_diff_ffd(series, d, thres=1e-5):
    w = get_weights_ffd(d, len(series))
    w_ = np.cumsum(np.abs(w))
    w_ /= w_[-1]
    skip = np.searchsorted(w_, thres)
    df = {}
    for name in series.columns:
        series_f = series[[name]].ffill().dropna()
        df_ = pd.Series(index=series.index, dtype=float)
        for iloc in range(skip, series_f.shape[0]):
            loc = series_f.index[iloc]
            if not np.isfinite(series.loc[loc, name]): continue
            df_[loc] = np.dot(w[-(iloc + 1):, :].T, series_f.iloc[:iloc + 1])[0, 0]
        df[name] = df_
    return pd.concat(df, axis=1)

def get_daily_vol(close, lookback=100):
    ret = close.pct_change()
    vol = ret.ewm(span=lookback).std()
    return vol

def apply_pt_sl_on_t1(close, events, pt_sl, molecule):
    events_ = events.loc[molecule]
    out = events_[['t1']].copy(deep=True)
    pt_target = events_['trgt'] * pt_sl[0]
    sl_target = events_['trgt'] * pt_sl[1]
    for loc, t1 in events_['t1'].items():
        df0 = close[loc:t1]
        df0 = (df0 / close[loc] - 1)
        sl_val = -sl_target.loc[loc]
        pt_val = pt_target.loc[loc]
        out.loc[loc, 'sl'] = df0[df0 < sl_val].index.min()
        out.loc[loc, 'pt'] = df0[df0 > pt_val].index.min()
    return out

def get_events(close, t_events, pt_sl, trgt, min_ret, t1=None):
    trgt = trgt.loc[t_events]
    trgt = trgt[trgt > min_ret]
    if len(trgt) == 0: return pd.DataFrame()
    if t1 is None: t1 = pd.Series(pd.NaT, index=t_events)
    events = pd.concat({'t1': t1, 'trgt': trgt}, axis=1).dropna(subset=['trgt'])
    df0 = apply_pt_sl_on_t1(close, events, pt_sl, events.index)
    events['t1'] = df0.min(axis=1)
    return events

def get_bins(events, close):
    events_ = events.dropna(subset=['t1'])
    start_prices = close.loc[events_.index]
    end_prices = close.reindex(events_['t1'], method='ffill')
    out = pd.DataFrame(index=events_.index)
    out['ret'] = end_prices.values / start_prices.values - 1
    out['bin'] = np.sign(out['ret'])
    out.loc[out['bin'] == 0, 'bin'] = -1
    return out

def _higuchi_fd(x, k_max):
    n_times = x.size
    lk = np.empty(k_max)
    x_reg = np.empty(k_max)
    y_reg = np.empty(k_max)
    for k in range(1, k_max + 1):
        lm = np.empty(k)
        for m in range(k):
            ll = 0
            n_max = int(np.floor((n_times - m - 1) / k))
            n_max = max(1, n_max)
            for j in range(1, n_max):
                ll += np.abs(x[m + j * k] - x[m + (j - 1) * k])
            lm[m] = (ll * (n_times - 1) / (k * n_max))
        lk[k - 1] = np.log(np.mean(lm))
        x_reg[k - 1] = np.log(1. / k)
    (p, _) = np.polyfit(x_reg, lk, 1)
    return p

def _get_t_stat_slope(series):
    x = np.arange(len(series))
    log_y = np.log(series.dropna())
    if len(log_y) < 2:
        return 0
    res = linregress(x, log_y)
    return res.slope / res.stderr if res.stderr > 0 else 0

def _get_cusum_filter(series, h):
    s_pos, s_neg = 0, 0
    diff = series.diff().dropna()
    events = []
    for i in diff.index:
        s_pos = max(0, s_pos + diff.loc[i])
        s_neg = min(0, s_neg + diff.loc[i])
        if s_neg < -h:
            s_neg = 0; events.append(i)
        elif s_pos > h:
            s_pos = 0; events.append(i)
    if len(events) > 0:
        return pd.Series(1, index=pd.to_datetime(events))
    return pd.Series(dtype=int)

def calculate_all_features(df):
    print("Calculating extensive features...")
    df.ta.adx(length=14, append=True)
    df.ta.bbands(length=20, append=True)
    df.ta.atr(length=14, append=True)
    df.ta.chop(length=14, append=True)
    df.ta.obv(append=True)
    if 'Volume' in df.columns and not df['Volume'].isnull().all():
        frac_volume = frac_diff_ffd(df[['Volume']], d=0.4)
        df['frac_volume'] = frac_volume['Volume']
    if 'OBV' in df.columns and not df['OBV'].isnull().all():
        frac_obv = frac_diff_ffd(df[['OBV']], d=0.4)
        df['frac_obv'] = frac_obv['OBV']
    df['adx_over_atr'] = df['ADX_14'] / df['ATRr_14']
    df['volume_power'] = df['Volume'] * (df['Close'] - df['Open'])
    df['t_slope_24'] = df['Close'].rolling(24).apply(_get_t_stat_slope, raw=False)
    df['kendall_tau_24'] = df['Close'].rolling(24).apply(lambda s: kendalltau(np.arange(len(s)), s.dropna())[0] if s.notna().sum() > 1 else np.nan, raw=False)
    if 'frac_close' in df.columns:
        cusum_vol_threshold = df['frac_close'].std() * 2
        cusum_events = _get_cusum_filter(df['frac_close'], cusum_vol_threshold)
        df['cusum_flag'] = cusum_events.reindex(df.index).fillna(0)
    for lag in [1, 2, 3, 5]:
        if 'frac_close' in df.columns:
            df[f'frac_close_lag_{lag}'] = df['frac_close'].shift(lag)
        if 'volume_zscore_24' in df.columns:
            df[f'volume_zscore_lag_{lag}'] = df['volume_zscore_24'].shift(lag)
    df['atr_pct'] = df['ATRr_14'] / df['Close'] * 100
    df['volume_zscore_24'] = (df['Volume'] - df['Volume'].rolling(24).mean()) / df['Volume'].rolling(24).std()
    return df

class PurgedKFold(KFold):
    def __init__(self, n_splits=5, t1=None, pct_embargo=0.01):
        super().__init__(n_splits, shuffle=False)
        self.t1 = t1
        self.pct_embargo = pct_embargo

    def split(self, X, y=None, groups=None):
        indices = np.arange(X.shape[0])
        embargo = int(X.shape[0] * self.pct_embargo)
        test_splits = [(i[0], i[-1] + 1) for i in np.array_split(indices, self.n_splits)]
        
        self.t1 = self.t1.reindex(X.index).ffill()

        for i, j in test_splits:
            test_indices = indices[i:j]
            t0 = X.index[i]
            
            train_indices_1 = self.t1.index.searchsorted(self.t1[self.t1 < t0].index)
            
            t1_test_max = self.t1.iloc[test_indices].max() if len(test_indices) > 0 else t0
            
            train_indices_2 = self.t1.index.searchsorted(self.t1[self.t1 > t1_test_max].index)

            if len(test_indices) > 0:
                embargo_idx = self.t1.index.searchsorted(t1_test_max + pd.Timedelta(minutes=embargo*5))
                if embargo_idx < len(self.t1.index):
                    train_indices_2 = train_indices_2[train_indices_2 >= embargo_idx]

            train_indices = np.union1d(train_indices_1, train_indices_2)
            yield train_indices, test_indices

df_time_bars = download_and_cache_data()
df_vb = create_volume_bars(df_time_bars, volume_threshold=2000)
print("Applying Fractional Differentiation to 'Close' and other series...")
df_vb = df_vb.join(frac_diff_ffd(df_vb[['Close']], d=0.4).rename(columns={'Close': 'frac_close'}))
print("Fractional Differentiation complete.")
df_features = calculate_all_features(df_vb.copy())
barrier_results = []
pt_sl_options = [[1.5, 0.75], [2.0, 1.0], [2.5, 1.25]]
days_options = [3, 5, 7]
for pt_sl in pt_sl_options:
    for days in days_options:
        print(f"\n{'='*20} TESTING BARRIER: pt_sl={pt_sl}, days={days} {'='*20}")
        print("\nStarting Triple-Barrier Labeling...")
        volatility = get_daily_vol(df_vb['Close'], lookback=50)
        t_events = df_vb.index[50:]
        all_vertical_barriers = pd.Series(t_events + pd.Timedelta(days=days), index=t_events)
        vertical_barrier = all_vertical_barriers[all_vertical_barriers < df_vb.index[-1]]
        t_events = vertical_barrier.index
        events = get_events(df_vb['Close'], t_events, pt_sl, volatility, 0.001, vertical_barrier)
        labels = get_bins(events, df_vb['Close'])
        print("Labeling complete.")
        print("Label distribution:\n", labels['bin'].value_counts(normalize=True))
        data = df_features.join(labels['bin']).dropna()
        X = data.drop(columns=['Open', 'High', 'Low', 'Close', 'Volume', 'bin'])
        y = data['bin']
        y[y <= 0] = 0
        print(f"Final dataset shape for this barrier: {X.shape}")
        if len(X) < 500:
            print("Not enough labeled events for this barrier setting. Skipping.")
            barrier_results.append({'pt_sl': pt_sl, 'days': days, 'avg_f1_score': 0, 'num_events': len(data)})
            continue
        print("\nStarting Purged K-Fold Cross-Validation with Optuna...")
        cv_splitter = PurgedKFold(n_splits=5, t1=events['t1'], pct_embargo=0.01)
        oos_scores = []
        for fold, (train_idx, test_idx) in enumerate(cv_splitter.split(X)):
            print(f"\n--- Processing Fold {fold+1}/5 ---")
            if len(train_idx) < 50 or len(test_idx) < 50:
                print("Skipping fold due to insufficient data.")
                continue
            X_train, X_test = X.iloc[train_idx, :], X.iloc[test_idx, :]
            y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]
            def objective(trial):
                params = {
                    'objective': 'binary:logistic', 'eval_metric': 'logloss', 'use_label_encoder': False,
                    'n_estimators': 1000,
                    'learning_rate': trial.suggest_float('learning_rate', 1e-3, 1e-1, log=True),
                    'max_depth': trial.suggest_int('max_depth', 3, 8),
                    'subsample': trial.suggest_float('subsample', 0.6, 1.0),
                    'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
                    'gamma': trial.suggest_float('gamma', 1e-8, 1.0, log=True),
                    'min_child_weight': trial.suggest_int('min_child_weight', 1, 10),
                    'random_state': 42
                }
                scale_pos_weight = y_train.value_counts().get(0, 1) / y_train.value_counts().get(1, 1)
                params['scale_pos_weight'] = scale_pos_weight
                model = xgb.XGBClassifier(**params)
                model.fit(X_train, y_train, eval_set=[(X_test, y_test)], early_stopping_rounds=50, verbose=False)
                y_pred = model.predict(X_test)
                return f1_score(y_test, y_pred)
            study = optuna.create_study(direction="maximize")
            study.optimize(objective, n_trials=25)
            print(f"Best F1-Score for this fold: {study.best_value:.4f}")
            oos_scores.append(study.best_value)
        avg_f1_score = np.mean(oos_scores) if oos_scores else 0
        print(f"\nResult for barrier pt_sl={pt_sl}, days={days}: Avg F1 = {avg_f1_score:.4f}")
        barrier_results.append({'pt_sl': pt_sl, 'days': days, 'avg_f1_score': avg_f1_score, 'num_events': len(data)})
best_barrier = max(barrier_results, key=lambda x: x['avg_f1_score'])
print("\n\n" + "="*50)
print("BARRIER CALIBRATION COMPLETE")
print("="*50)
print(f"Best Barrier Settings Found:")
print(f"  - PT/SL Multipliers: {best_barrier['pt_sl']}")
print(f"  - Expiry Days: {best_barrier['days']}")
print(f"  - Achieved Average F1-Score: {best_barrier['avg_f1_score']:.4f}")
print(f"  - Number of Labeled Events: {best_barrier['num_events']}")
print("="*50)

In [None]:
labels['bin'].value_counts(normalize=True)

In [None]:
import os
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options

TARGET_URL = "https://data.binance.vision/?prefix=data/futures/um/daily/aggTrades/SOLUSDT/"
DOWNLOAD_DIR = os.path.join(os.getcwd(), "binance_downloads")
WEBDRIVER_WAIT_TIMEOUT = 20

if not os.path.exists(DOWNLOAD_DIR):
    os.makedirs(DOWNLOAD_DIR)

chrome_options = Options()
prefs = {
    "download.default_directory": DOWNLOAD_DIR,
    "download.prompt_for_download": False,
    "download.directory_upgrade": True,
    "safebrowsing.enabled": True
}
chrome_options.add_experimental_option("prefs", prefs)
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--window-size=1920x1080")
driver = webdriver.Chrome(options=chrome_options)
wait = WebDriverWait(driver, WEBDRIVER_WAIT_TIMEOUT)
driver.get(TARGET_URL)
wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "#listing tr")))
link_elements = driver.find_elements(By.TAG_NAME, "a")
zip_links = [
    link.get_attribute("href")
    for link in link_elements
    if link.get_attribute("href") and link.get_attribute("href").endswith(".zip")
]

if not zip_links:
    print("No .zip files found after checking all links on the page.")
else:
    print(f"Found {len(zip_links)} .zip files to download.")
    
    for url in zip_links:
        file_name = os.path.basename(url)
        print(f"Initiating download for: {file_name}")
        driver.get(url)
        time.sleep(1) 

    print("\nAll download requests have been sent. Waiting for them to complete...")
    total_wait_time_seconds = len(zip_links) * 5
    print(f"Waiting for an estimated {total_wait_time_seconds} seconds...")
    time.sleep(total_wait_time_seconds)

driver.quit()
print(f"\nAll downloads should be complete. Files are saved in: {DOWNLOAD_DIR}")

In [None]:
import os
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options

TARGET_URL = "https://data.binance.vision/?prefix=data/futures/um/daily/aggTrades/SOLUSDT/"
DOWNLOAD_DIR = os.path.join(os.getcwd(), "binance_downloads")
WEBDRIVER_WAIT_TIMEOUT = 20
START_FILE = "SOLUSDT-aggTrades-2025-08-29.zip"
END_FILE = "SOLUSDT-aggTrades-2020-09-14.zip"

if not os.path.exists(DOWNLOAD_DIR):
    os.makedirs(DOWNLOAD_DIR)

chrome_options = Options()
prefs = {
    "download.default_directory": DOWNLOAD_DIR,
    "download.prompt_for_download": False,
    "download.directory_upgrade": True,
    "safebrowsing.enabled": True
}
chrome_options.add_experimental_option("prefs", prefs)
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--window-size=1920x1080")
driver = webdriver.Chrome(options=chrome_options)
wait = WebDriverWait(driver, WEBDRIVER_WAIT_TIMEOUT)
driver.get(TARGET_URL)
wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "#listing tr")))
link_elements = driver.find_elements(By.TAG_NAME, "a")
all_zip_links = [
    link.get_attribute("href")
    for link in link_elements
    if link.get_attribute("href") and link.get_attribute("href").endswith(".zip")
]

all_file_names = [os.path.basename(link) for link in all_zip_links]
start_index = all_file_names.index(START_FILE)
end_index = all_file_names.index(END_FILE)
links_to_download = all_zip_links[start_index : end_index + 1]

print(f"Found {len(links_to_download)} .zip files in the specified range to download.")

for url in links_to_download:
    file_name = os.path.basename(url)
    print(f"Initiating download for: {file_name}")
    driver.get(url)
    time.sleep(1)

print("\nAll download requests have been sent. Waiting for them to complete...")
total_wait_time_seconds = len(links_to_download) * 5
print(f"Waiting for an estimated {total_wait_time_seconds} seconds...")
time.sleep(total_wait_time_seconds)

driver.quit()

print(f"\nAll downloads should be complete. Files are saved in: {DOWNLOAD_DIR}")

Found 1812 .zip files in the specified range to download.
Initiating download for: SOLUSDT-aggTrades-2025-08-29.zip
Initiating download for: SOLUSDT-aggTrades-2025-08-28.zip
Initiating download for: SOLUSDT-aggTrades-2025-08-27.zip
Initiating download for: SOLUSDT-aggTrades-2025-08-26.zip
Initiating download for: SOLUSDT-aggTrades-2025-08-25.zip
Initiating download for: SOLUSDT-aggTrades-2025-08-24.zip
Initiating download for: SOLUSDT-aggTrades-2025-08-23.zip
Initiating download for: SOLUSDT-aggTrades-2025-08-22.zip
Initiating download for: SOLUSDT-aggTrades-2025-08-21.zip
Initiating download for: SOLUSDT-aggTrades-2025-08-20.zip
Initiating download for: SOLUSDT-aggTrades-2025-08-19.zip
Initiating download for: SOLUSDT-aggTrades-2025-08-18.zip
Initiating download for: SOLUSDT-aggTrades-2025-08-17.zip
Initiating download for: SOLUSDT-aggTrades-2025-08-16.zip
Initiating download for: SOLUSDT-aggTrades-2025-08-15.zip
Initiating download for: SOLUSDT-aggTrades-2025-08-14.zip
Initiating dow

In [1]:
import os
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options

TARGET_URL = "https://data.binance.vision/?prefix=data/futures/um/daily/aggTrades/BTCUSDT/"
DOWNLOAD_DIR = os.path.join(os.getcwd(), "binance_downloads_btc")
WEBDRIVER_WAIT_TIMEOUT = 20

START_FILE = "BTCUSDT-aggTrades-2025-08-30.zip"
END_FILE = "BTCUSDT-aggTrades-2019-12-31.zip"

if not os.path.exists(DOWNLOAD_DIR):
    os.makedirs(DOWNLOAD_DIR)

chrome_options = Options()
prefs = {
    "download.default_directory": DOWNLOAD_DIR,
    "download.prompt_for_download": False,
    "download.directory_upgrade": True,
    "safebrowsing.enabled": True
}
chrome_options.add_experimental_option("prefs", prefs)
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--window-size=1920x1080")

driver = webdriver.Chrome(options=chrome_options)
wait = WebDriverWait(driver, WEBDRIVER_WAIT_TIMEOUT)

driver.get(TARGET_URL)

wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "#listing tr")))

link_elements = driver.find_elements(By.TAG_NAME, "a")

all_zip_links = [
    link.get_attribute("href")
    for link in link_elements
    if link.get_attribute("href") and link.get_attribute("href").endswith(".zip")
]

all_file_names = [os.path.basename(link) for link in all_zip_links]
start_index = all_file_names.index(START_FILE)
end_index = all_file_names.index(END_FILE)
links_to_download = all_zip_links[start_index : end_index + 1]

print(f"Found {len(links_to_download)} .zip files in the specified range to download.")

for url in links_to_download:
    file_name = os.path.basename(url)
    print(f"Initiating download for: {file_name}")
    driver.get(url)
    time.sleep(1)

print("\nAll download requests have been sent. Waiting for them to complete...")
total_wait_time_seconds = len(links_to_download) * 5
print(f"Waiting for an estimated {total_wait_time_seconds} seconds...")
time.sleep(total_wait_time_seconds)

driver.quit()

print(f"\nAll downloads should be complete. Files are saved in: {DOWNLOAD_DIR}")

Found 2070 .zip files in the specified range to download.
Initiating download for: BTCUSDT-aggTrades-2025-08-30.zip
Initiating download for: BTCUSDT-aggTrades-2025-08-29.zip
Initiating download for: BTCUSDT-aggTrades-2025-08-28.zip
Initiating download for: BTCUSDT-aggTrades-2025-08-27.zip
Initiating download for: BTCUSDT-aggTrades-2025-08-26.zip
Initiating download for: BTCUSDT-aggTrades-2025-08-25.zip
Initiating download for: BTCUSDT-aggTrades-2025-08-24.zip
Initiating download for: BTCUSDT-aggTrades-2025-08-23.zip
Initiating download for: BTCUSDT-aggTrades-2025-08-22.zip
Initiating download for: BTCUSDT-aggTrades-2025-08-21.zip
Initiating download for: BTCUSDT-aggTrades-2025-08-20.zip
Initiating download for: BTCUSDT-aggTrades-2025-08-19.zip
Initiating download for: BTCUSDT-aggTrades-2025-08-18.zip
Initiating download for: BTCUSDT-aggTrades-2025-08-17.zip
Initiating download for: BTCUSDT-aggTrades-2025-08-16.zip
Initiating download for: BTCUSDT-aggTrades-2025-08-15.zip
Initiating dow

In [16]:
import os
import re
import zipfile
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from tqdm import tqdm

DOWNLOAD_DIR = r"E:\IVF-IT-Solution\QuantTrade-Model\binance_downloads_SOL"
OUTPUT_PARQUET_FILE = r"E:\IVF-IT-Solution\QuantTrade-Model\SOLUSDT_aggTrades.parquet"

COLUMN_NAMES = [
    'agg_trade_id', 'price', 'quantity', 'first_trade_id', 
    'last_trade_id', 'transact_time', 'is_buyer_maker'
]

COLUMN_DTYPES = {
    'agg_trade_id': 'int64', 'price': 'float64', 'quantity': 'float64',
    'first_trade_id': 'int64', 'last_trade_id': 'int64'
}

zip_files = sorted([f for f in os.listdir(DOWNLOAD_DIR) if f.endswith('.zip')])
print(f"Found {len(zip_files)} .zip files to process.")

parquet_writer = None

for file_name in tqdm(zip_files, desc="Processing and Appending to Parquet"):
    file_path = os.path.join(DOWNLOAD_DIR, file_name)
    date_match = re.search(r'(\d{4}-\d{2}-\d{2})', file_name)
    if not date_match:
        continue
    trade_date = date_match.group(1)
    with zipfile.ZipFile(file_path, 'r') as z:
        csv_filename_in_zip = z.namelist()[0]
        with z.open(csv_filename_in_zip) as csv_file:
            daily_df = pd.read_csv(
                csv_file,
                header=0,
                names=COLUMN_NAMES,
                dtype=COLUMN_DTYPES,
                converters={
                    'transact_time': lambda x: int(float(x)),
                    'is_buyer_maker': lambda x: str(x).strip().lower() in ('true', '1', 't', 'yes', 'y')
                },
                low_memory=False
            )
            daily_df.insert(0, 'date', trade_date)
            table = pa.Table.from_pandas(daily_df)
            if parquet_writer is None:
                parquet_writer = pq.ParquetWriter(OUTPUT_PARQUET_FILE, table.schema)
            parquet_writer.write_table(table)

if parquet_writer:
    parquet_writer.close()
    print("\nParquet file has been finalized.")

print("\nData preparation complete!")
print(f"All data has been sequentially saved to '{OUTPUT_PARQUET_FILE}'")


Found 1812 .zip files to process.


Processing and Appending to Parquet: 100%|██████████| 1812/1812 [1:11:08<00:00,  2.36s/it]


Parquet file has been finalized.

Data preparation complete!
All data has been sequentially saved to 'E:\IVF-IT-Solution\QuantTrade-Model\SOLUSDT_aggTrades.parquet'





In [15]:
import pandas as pd
import pyarrow.parquet as pq
import pyarrow.compute as pc

PATH = r"E:\IVF-IT-Solution\QuantTrade-Model\SOLUSDT_aggTrades_combined_final.parquet"

pf = pq.ParquetFile(PATH)
print(f"rows: {sum(pf.metadata.row_group(i).num_rows for i in range(pf.num_row_groups))}")
print(f"row_groups: {pf.num_row_groups}")
print(f"columns: {pf.schema.names}")
print(pf.schema)

names = pf.schema.names

dmin = None
dmax = None
tmin = None
tmax = None

if "date" in names:
    j = names.index("date")
    mins = []
    maxs = []
    for i in range(pf.num_row_groups):
        s = pf.metadata.row_group(i).column(j).statistics
        if s and s.has_min_max:
            vmin = s.min
            vmax = s.max
            if isinstance(vmin, (bytes, bytearray)):
                vmin = vmin.decode()
            if isinstance(vmax, (bytes, bytearray)):
                vmax = vmax.decode()
            mins.append(vmin)
            maxs.append(vmax)
    if mins:
        dmin = min(mins)
        dmax = max(maxs)
        print(f"date_range_from_stats: {dmin} -> {dmax}")
    if dmin is None or dmax is None:
        for i in range(pf.num_row_groups):
            col = pf.read_row_group(i, columns=["date"])["date"]
            if col.length() > col.null_count:
                mm = pc.min_max(col).as_py()
                vmin = mm["min"]
                vmax = mm["max"]
                if isinstance(vmin, (bytes, bytearray)):
                    vmin = vmin.decode()
                if isinstance(vmax, (bytes, bytearray)):
                    vmax = vmax.decode()
                dmin = vmin if dmin is None or vmin < dmin else dmin
                dmax = vmax if dmax is None or vmax > dmax else dmax
        if dmin is not None:
            print(f"date_range_scanned: {dmin} -> {dmax}")

if "transact_time" in names:
    j = names.index("transact_time")
    mins = []
    maxs = []
    for i in range(pf.num_row_groups):
        s = pf.metadata.row_group(i).column(j).statistics
        if s and s.has_min_max:
            mins.append(int(s.min))
            maxs.append(int(s.max))
    if mins:
        tmin = min(mins)
        tmax = max(maxs)
        print(f"transact_time_ms_range_from_stats: {tmin} -> {tmax}")
        print("transact_time_range_from_stats:", pd.to_datetime([tmin, tmax], unit="ms").to_list())
    if tmin is None or tmax is None:
        for i in range(pf.num_row_groups):
            col = pf.read_row_group(i, columns=["transact_time"])["transact_time"]
            if col.length() > col.null_count:
                mm = pc.min_max(col).as_py()
                vmin = int(mm["min"])
                vmax = int(mm["max"])
                tmin = vmin if tmin is None or vmin < tmin else tmin
                tmax = vmax if tmax is None or vmax > tmax else tmax
        if tmin is not None:
            print(f"transact_time_ms_range_scanned: {tmin} -> {tmax}")
            print("transact_time_range_scanned:", pd.to_datetime([tmin, tmax], unit="ms").to_list())

head_tbl = pf.read_row_group(0)
print(head_tbl.slice(0, 20).to_pandas())

tail_tbl = pf.read_row_group(pf.num_row_groups - 1)
n = tail_tbl.num_rows
print(tail_tbl.slice(max(n - 20, 0)).to_pandas())

rows: 2319321186
row_groups: 3098
columns: ['date', 'agg_trade_id', 'price', 'quantity', 'first_trade_id', 'last_trade_id', 'transact_time', 'is_buyer_maker']
<pyarrow._parquet.ParquetSchema object at 0x000001D88AB4B640>
required group field_id=-1 schema {
  optional binary field_id=-1 date (String);
  optional int64 field_id=-1 agg_trade_id;
  optional double field_id=-1 price;
  optional double field_id=-1 quantity;
  optional int64 field_id=-1 first_trade_id;
  optional int64 field_id=-1 last_trade_id;
  optional int64 field_id=-1 transact_time;
  optional boolean field_id=-1 is_buyer_maker;
}

date_range_from_stats: 2019-12-31 -> 2025-08-30
transact_time_ms_range_from_stats: 1577750402710 -> 1756598399612
transact_time_range_from_stats: [Timestamp('2019-12-31 00:00:02.710000'), Timestamp('2025-08-30 23:59:59.612000')]
          date  agg_trade_id    price  quantity  first_trade_id  \
0   2019-12-31      18266803  7240.43     2.366        25090491   
1   2019-12-31      18266804  72

In [10]:
import os
import re
import zipfile
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from tqdm import tqdm

DOWNLOAD_DIR = r"E:\IVF-IT-Solution\QuantTrade-Model\binance_downloads_btc"
OUTPUT_PARQUET_FILE = r"E:\IVF-IT-Solution\QuantTrade-Model\BTCUSD_aggTrades_combined_final.parquet"

COLUMN_NAMES = [
    'agg_trade_id', 'price', 'quantity', 'first_trade_id', 
    'last_trade_id', 'transact_time', 'is_buyer_maker'
]

COLUMN_DTYPES = {
    'agg_trade_id': 'int64', 'price': 'float64', 'quantity': 'float64',
    'first_trade_id': 'int64', 'last_trade_id': 'int64'
}

zip_files = sorted([f for f in os.listdir(DOWNLOAD_DIR) if f.endswith('.zip')])
print(f"Found {len(zip_files)} .zip files to process.")

parquet_writer = None

for file_name in tqdm(zip_files, desc="Processing and Appending to Parquet"):
    file_path = os.path.join(DOWNLOAD_DIR, file_name)
    date_match = re.search(r'(\d{4}-\d{2}-\d{2})', file_name)
    if not date_match:
        continue
    trade_date = date_match.group(1)
    with zipfile.ZipFile(file_path, 'r') as z:
        csv_filename_in_zip = z.namelist()[0]
        with z.open(csv_filename_in_zip) as csv_file:
            daily_df = pd.read_csv(
                csv_file,
                header=0,
                names=COLUMN_NAMES,
                dtype=COLUMN_DTYPES,
                converters={
                    'transact_time': lambda x: int(float(x)),
                    'is_buyer_maker': lambda x: str(x).strip().lower() in ('true', '1', 't', 'yes', 'y')
                },
                low_memory=False
            )
            daily_df.insert(0, 'date', trade_date)
            table = pa.Table.from_pandas(daily_df)
            if parquet_writer is None:
                parquet_writer = pq.ParquetWriter(OUTPUT_PARQUET_FILE, table.schema)
            parquet_writer.write_table(table)

if parquet_writer:
    parquet_writer.close()
    print("\nParquet file has been finalized.")

print("\nData preparation complete!")
print(f"All data has been sequentially saved to '{OUTPUT_PARQUET_FILE}'")


Found 1739 .zip files to process.


Processing and Appending to Parquet:   0%|          | 0/1739 [00:00<?, ?it/s]

Processing and Appending to Parquet: 100%|██████████| 1739/1739 [2:33:46<00:00,  5.31s/it]  



Parquet file has been finalized.

Data preparation complete!
All data has been sequentially saved to 'E:\IVF-IT-Solution\QuantTrade-Model\BTCUSD_aggTrades_combined_final.parquet'


In [1]:
import pandas as pd
import pyarrow.parquet as pq
import pyarrow.compute as pc

PATH = r"E:\IVF-IT-Solution\QuantTrade-Model\SOLUSDT_aggTrades.parquet"

pf = pq.ParquetFile(PATH)
print(f"rows: {sum(pf.metadata.row_group(i).num_rows for i in range(pf.num_row_groups))}")
print(f"row_groups: {pf.num_row_groups}")
print(f"columns: {pf.schema.names}")
print(pf.schema)

names = pf.schema.names

dmin = None
dmax = None
tmin = None
tmax = None

if "date" in names:
    j = names.index("date")
    mins = []
    maxs = []
    for i in range(pf.num_row_groups):
        s = pf.metadata.row_group(i).column(j).statistics
        if s and s.has_min_max:
            vmin = s.min
            vmax = s.max
            if isinstance(vmin, (bytes, bytearray)):
                vmin = vmin.decode()
            if isinstance(vmax, (bytes, bytearray)):
                vmax = vmax.decode()
            mins.append(vmin)
            maxs.append(vmax)
    if mins:
        dmin = min(mins)
        dmax = max(maxs)
        print(f"date_range_from_stats: {dmin} -> {dmax}")
    if dmin is None or dmax is None:
        for i in range(pf.num_row_groups):
            col = pf.read_row_group(i, columns=["date"])["date"]
            if col.length() > col.null_count:
                mm = pc.min_max(col).as_py()
                vmin = mm["min"]
                vmax = mm["max"]
                if isinstance(vmin, (bytes, bytearray)):
                    vmin = vmin.decode()
                if isinstance(vmax, (bytes, bytearray)):
                    vmax = vmax.decode()
                dmin = vmin if dmin is None or vmin < dmin else dmin
                dmax = vmax if dmax is None or vmax > dmax else dmax
        if dmin is not None:
            print(f"date_range_scanned: {dmin} -> {dmax}")

if "transact_time" in names:
    j = names.index("transact_time")
    mins = []
    maxs = []
    for i in range(pf.num_row_groups):
        s = pf.metadata.row_group(i).column(j).statistics
        if s and s.has_min_max:
            mins.append(int(s.min))
            maxs.append(int(s.max))
    if mins:
        tmin = min(mins)
        tmax = max(maxs)
        print(f"transact_time_ms_range_from_stats: {tmin} -> {tmax}")
        print("transact_time_range_from_stats:", pd.to_datetime([tmin, tmax], unit="ms").to_list())
    if tmin is None or tmax is None:
        for i in range(pf.num_row_groups):
            col = pf.read_row_group(i, columns=["transact_time"])["transact_time"]
            if col.length() > col.null_count:
                mm = pc.min_max(col).as_py()
                vmin = int(mm["min"])
                vmax = int(mm["max"])
                tmin = vmin if tmin is None or vmin < tmin else tmin
                tmax = vmax if tmax is None or vmax > tmax else tmax
        if tmin is not None:
            print(f"transact_time_ms_range_scanned: {tmin} -> {tmax}")
            print("transact_time_range_scanned:", pd.to_datetime([tmin, tmax], unit="ms").to_list())

head_tbl = pf.read_row_group(0)
print(head_tbl.slice(0, 20).to_pandas())

tail_tbl = pf.read_row_group(pf.num_row_groups - 1)
n = tail_tbl.num_rows
print(tail_tbl.slice(max(n - 20, 0)).to_pandas())

rows: 952584666
row_groups: 2077
columns: ['date', 'agg_trade_id', 'price', 'quantity', 'first_trade_id', 'last_trade_id', 'transact_time', 'is_buyer_maker']
<pyarrow._parquet.ParquetSchema object at 0x000001AB19812AC0>
required group field_id=-1 schema {
  optional binary field_id=-1 date (String);
  optional int64 field_id=-1 agg_trade_id;
  optional double field_id=-1 price;
  optional double field_id=-1 quantity;
  optional int64 field_id=-1 first_trade_id;
  optional int64 field_id=-1 last_trade_id;
  optional int64 field_id=-1 transact_time;
  optional boolean field_id=-1 is_buyer_maker;
}

date_range_from_stats: 2020-09-14 -> 2025-08-29
transact_time_ms_range_from_stats: 1600066809427 -> 1756511999973
transact_time_range_from_stats: [Timestamp('2020-09-14 07:00:09.427000'), Timestamp('2025-08-29 23:59:59.973000')]
          date  agg_trade_id   price  quantity  first_trade_id  last_trade_id  \
0   2020-09-14             2  3.2319      31.0               2              2   
1   2

In [7]:
import pyarrow.parquet as pq

# ==============================================================================
# 1. CONFIGURATION
# ==============================================================================
TICK_DATA_PATH = r"E:\IVF-IT-Solution\QuantTrade-Model\SOLUSDT_aggTrades.parquet"

# ==============================================================================
# 2. SCHEMA INSPECTION FUNCTION
# ==============================================================================
def inspect_parquet_schema(file_path):
    """
    อ่านและแสดง Schema (ชื่อคอลัมน์และ Data Type) ของไฟล์ Parquet
    """
    print(f"--- Inspecting Schema for: {file_path} ---\n")
    
    parquet_file = pq.ParquetFile(file_path)
    schema = parquet_file.schema

    print(f"{'Column Name':<20} {'Physical Type'}") # เปลี่ยนหัวตารางเพื่อความชัดเจน
    print("-" * 40)
    
    for field in schema:
        # --- จุดที่แก้ไข ---
        # เปลี่ยนจาก field.type เป็น field.physical_type
        print(f"{field.name:<20} {field.physical_type}")
        
    print("\n--- Inspection Complete ---")

# ==============================================================================
# 3. EXECUTION
# ==============================================================================
if __name__ == "__main__":
    inspect_parquet_schema(TICK_DATA_PATH)

--- Inspecting Schema for: E:\IVF-IT-Solution\QuantTrade-Model\SOLUSDT_aggTrades.parquet ---

Column Name          Physical Type
----------------------------------------
date                 BYTE_ARRAY
agg_trade_id         INT64
price                DOUBLE
quantity             DOUBLE
first_trade_id       INT64
last_trade_id        INT64
transact_time        INT64
is_buyer_maker       BOOLEAN

--- Inspection Complete ---


In [None]:
import numpy as np
import pandas as pd
import time
import pyarrow.parquet as pq
import os

TICK_DATA_PATH = r"E:\IVF-IT-Solution\QuantTrade-Model\SOLUSDT_aggTrades.parquet"
OUTPUT_FOLDER = 'trend-label'
OUTPUT_DIB_FILE = 'SOLUSDT_dollar_imbalance_bars.parquet'
OUTPUT_VOL_FILE = 'SOLUSDT_volume_bars.parquet'

INITIAL_EWMA_WINDOW = 200000
VOLUME_PER_BAR = 1_000_000

VOLATILITY_LOOKBACK = 50
PT_SL_MULTIPLIERS = [1.5, 0.75]
VERTICAL_BARRIER_DAYS = 3
MIN_VOLATILITY_RET = 0.001

def get_daily_vol(close, lookback=50):
    ret = close.pct_change()
    vol = ret.ewm(span=lookback).std()
    return vol

def apply_pt_sl_on_t1(close, events, pt_sl, molecule):
    events_ = events.loc[molecule]
    out = events_[['t1']].copy(deep=True)
    
    pt_target = events_['trgt'].reindex(molecule) * pt_sl[0]
    sl_target = events_['trgt'].reindex(molecule) * pt_sl[1]
    
    out['sl'] = pd.NaT
    out['pt'] = pd.NaT

    for loc, t1 in events_['t1'].items():
        if loc not in close.index or (pd.notna(t1) and t1 not in close.index):
            continue

        df0 = close.loc[loc:t1]
        df0 = (df0 / close.loc[loc] - 1)
        
        sl_val = -sl_target.loc[loc]
        pt_val = pt_target.loc[loc]

        sl_hit_time = df0[df0 < sl_val].index.min()
        pt_hit_time = df0[df0 > pt_val].index.min()

        out.loc[loc, 'sl'] = sl_hit_time
        out.loc[loc, 'pt'] = pt_hit_time
        
    return out

def get_events(close, t_events, pt_sl, trgt, min_ret, t1=None):
    trgt = trgt.loc[t_events]
    trgt = trgt[trgt > min_ret]
    if len(trgt) == 0:
        return pd.DataFrame()
    if t1 is None:
        t1 = pd.Series(pd.NaT, index=t_events)
    events = pd.concat({'t1': t1, 'trgt': trgt}, axis=1).dropna(subset=['trgt'])
    df0 = apply_pt_sl_on_t1(close, events, pt_sl, events.index)
    events['t1'] = df0.min(axis=1)
    events = events.drop(columns='trgt')
    return events

def get_bins(events, close):
    events_ = events.dropna(subset=['t1'])
    px = close.reindex(events_.index)
    px_ = close.reindex(events_['t1'].values).values
    
    if len(px) != len(px_):
         raise ValueError("Index alignment issue in get_bins")
    
    ret = pd.Series(px_ / px.values - 1, index=events_.index)
    out = pd.DataFrame({'ret': ret})
    
    out['bin'] = np.sign(out['ret'])
    out.loc[out['bin'] == 0, 'bin'] = -1
    out['bin'] = out['bin'].astype('int8')
    out.loc[out['bin'] < 1, 'bin'] = 0 
    
    return out
    
def build_dollar_imbalance_bars(data_iterator, initial_ewma_window, total_groups):
    bars = []
    
    warmup_df = next(data_iterator, None)
    if warmup_df is None: return pd.DataFrame()
    warmup_df = warmup_df.to_pandas()
    
    warmup_df['b_t'] = np.where(warmup_df['is_buyer_maker'], -1.0, 1.0)
    warmup_df['bv'] = warmup_df['b_t'] * (warmup_df['price'] * warmup_df['quantity'])
    
    ewma_num_ticks = initial_ewma_window
    ewma_imbalance_per_tick = warmup_df['bv'].mean()
    expected_imbalance = ewma_num_ticks * abs(ewma_imbalance_per_tick)

    bar_state = {
        'cum_theta_dollar': 0.0, 'open_price': 0.0, 'high_price': 0.0, 'low_price': float('inf'),
        'cum_volume': 0.0, 'cum_dollar_volume': 0.0, 'num_ticks_in_bar': 0
    }
    
    def process_chunk(chunk_df, state):
        nonlocal ewma_num_ticks, ewma_imbalance_per_tick, expected_imbalance
        
        chunk_df['b_t'] = np.where(chunk_df['is_buyer_maker'], -1.0, 1.0)
        chunk_df['bv'] = chunk_df['b_t'] * (chunk_df['price'] * chunk_df['quantity'])
        
        for row in chunk_df.itertuples():
            if state['num_ticks_in_bar'] == 0:
                state['open_price'] = row.price
                state['high_price'] = row.price
                state['low_price'] = row.price
            
            state['high_price'] = max(state['high_price'], row.price)
            state['low_price'] = min(state['low_price'], row.price)
            state['cum_volume'] += row.quantity
            state['cum_dollar_volume'] += (row.price * row.quantity)
            state['num_ticks_in_bar'] += 1
            state['cum_theta_dollar'] += row.bv

            if abs(state['cum_theta_dollar']) > expected_imbalance:
                bars.append((
                    row.transact_time, state['open_price'], state['high_price'], state['low_price'], row.price,
                    state['cum_volume'], state['cum_dollar_volume'], state['num_ticks_in_bar']
                ))
                
                alpha_ticks = 2 / (ewma_num_ticks + 1)
                ewma_num_ticks = (1 - alpha_ticks) * ewma_num_ticks + alpha_ticks * state['num_ticks_in_bar']
                
                current_imbalance_per_tick = abs(state['cum_theta_dollar']) / state['num_ticks_in_bar']
                alpha_imbalance = 2 / (state['num_ticks_in_bar'] + 1)
                ewma_imbalance_per_tick = (1 - alpha_imbalance) * ewma_imbalance_per_tick + alpha_imbalance * current_imbalance_per_tick
                
                expected_imbalance = ewma_num_ticks * ewma_imbalance_per_tick
                
                state['cum_theta_dollar'], state['num_ticks_in_bar'], state['cum_volume'], state['cum_dollar_volume'] = 0.0, 0, 0.0, 0.0
    
    process_chunk(warmup_df, bar_state)
    for batch in data_iterator:
        chunk_df = batch.to_pandas()
        process_chunk(chunk_df, bar_state)
        
    return bars

def build_volume_bars(data_iterator, volume_per_bar, total_groups):
    bars = []
    bar_state = {
        'open_price': 0.0, 'high_price': 0.0, 'low_price': float('inf'),
        'cum_volume': 0.0, 'cum_dollar_volume': 0.0, 'num_ticks_in_bar': 0
    }

    def process_chunk(chunk_df, state):
        for row in chunk_df.itertuples():
            if state['num_ticks_in_bar'] == 0:
                state['open_price'] = row.price
                state['high_price'] = row.price
                state['low_price'] = row.price

            state['high_price'] = max(state['high_price'], row.price)
            state['low_price'] = min(state['low_price'], row.price)
            state['cum_volume'] += row.quantity
            state['cum_dollar_volume'] += (row.price * row.quantity)
            state['num_ticks_in_bar'] += 1

            if state['cum_volume'] >= volume_per_bar:
                bars.append((
                    row.transact_time, state['open_price'], state['high_price'], state['low_price'], row.price,
                    state['cum_volume'], state['cum_dollar_volume'], state['num_ticks_in_bar']
                ))
                state['cum_volume'], state['num_ticks_in_bar'], state['cum_dollar_volume'] = 0.0, 0, 0.0
    
    for batch in data_iterator:
        chunk_df = batch.to_pandas()
        process_chunk(chunk_df, bar_state)
        
    return bars

def process_data(bar_type):
    os.makedirs(OUTPUT_FOLDER, exist_ok=True)
    
    cols_to_read = ['transact_time', 'price', 'quantity', 'is_buyer_maker']
    parquet_file = pq.ParquetFile(TICK_DATA_PATH)
    total_groups = parquet_file.num_row_groups
    data_iterator = parquet_file.iter_batches(batch_size=1_000_000, columns=cols_to_read)
    
    if bar_type == 'dollar_imbalance':
        raw_bars = build_dollar_imbalance_bars(data_iterator, INITIAL_EWMA_WINDOW, total_groups)
        output_path = os.path.join(OUTPUT_FOLDER, OUTPUT_DIB_FILE)
    elif bar_type == 'volume':
        raw_bars = build_volume_bars(data_iterator, VOLUME_PER_BAR, total_groups)
        output_path = os.path.join(OUTPUT_FOLDER, OUTPUT_VOL_FILE)
    else:
        raise ValueError("Invalid bar_type")

    if not raw_bars:
        return

    bar_columns = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Dollar_Volume', 'Num_Ticks']
    bars = pd.DataFrame(raw_bars, columns=bar_columns).set_index('Date')
    bars.index = pd.to_datetime(bars.index, unit='ms')
    
    volatility = get_daily_vol(bars['Close'], lookback=VOLATILITY_LOOKBACK)
    t_events = bars.index[VOLATILITY_LOOKBACK:]
    
    if len(t_events) > 0:
        vertical_barrier_times = t_events + pd.Timedelta(days=VERTICAL_BARRIER_DAYS)
        vertical_barrier = vertical_barrier_times[vertical_barrier_times < bars.index[-1]]
        
        events = get_events(
            close=bars['Close'], t_events=vertical_barrier.index, pt_sl=PT_SL_MULTIPLIERS,
            trgt=volatility, min_ret=MIN_VOLATILITY_RET, t1=vertical_barrier
        )
        
        if not events.empty:
            labels = get_bins(events, bars['Close'])
            final_df = bars.join(labels[['bin', 'ret']])
            final_df.to_parquet(output_path)
    
if __name__ == "__main__":
    process_data(bar_type='dollar_imbalance')

Starting bar processing...


952it [38:50,  2.45s/it]

Bar creation complete. Total bars created: 17
Not enough bars to create labels. Exiting.

Total execution time: 38.88 minutes





In [None]:
volume_bars

In [5]:
import pyarrow.parquet as pq
import pandas as pd

TICK_DATA_PATH = r"E:\IVF-IT-Solution\QuantTrade-Model\SOLUSDT_aggTrades.parquet"

EXPECTED_SCHEMA = {
    'transact_time': ['int64'],
    'price': ['float64', 'object', 'double'],
    'quantity': ['float64', 'object', 'double'],
    'is_buyer_maker': ['bool']
}

if __name__ == "__main__":
    print(f"--- Starting Validation for: {TICK_DATA_PATH} ---\n")
    validation_passed = True
    error_summary = {
        'schema_errors': [],
        'null_values': 0,
        'negative_or_zero_price': 0,
        'negative_or_zero_quantity': 0,
        'timestamp_out_of_order': 0
    }

    print("[Step 1/3] Checking file readability and metadata...")
    parquet_file = pq.ParquetFile(TICK_DATA_PATH)
    metadata = parquet_file.schema.to_arrow_schema()
    print(f"  - Success: File opened. Contains {parquet_file.num_row_groups} row groups.")
    
    print("\n[Step 2/3] Validating schema...")
    for col, expected_types in EXPECTED_SCHEMA.items():
        actual_type = str(metadata.field(col).type)
        if actual_type not in expected_types:
            error = f"  - ERROR: Column '{col}' has type '{actual_type}', expected one of {expected_types}."
            print(error)
            error_summary['schema_errors'].append(error)
            validation_passed = False
        else:
            print(f"  - OK: Column '{col}' has correct type '{actual_type}'.")

    if validation_passed:
        print("\n[Step 3/3] Validating data content row group by row group...")
        last_timestamp = pd.Timestamp.min
        
        for i in range(parquet_file.num_row_groups):
            print(f"  - Processing row group {i + 1}/{parquet_file.num_row_groups}...")
            row_group = parquet_file.read_row_group(i)
            df = row_group.to_pandas()
            
            df['transact_time'] = pd.to_datetime(df['transact_time'], unit='ms')
            df['price'] = pd.to_numeric(df['price'], errors='coerce')
            df['quantity'] = pd.to_numeric(df['quantity'], errors='coerce')

            nulls_found = df[['price', 'quantity']].isnull().sum().sum()
            if nulls_found > 0:
                error_summary['null_values'] += nulls_found
                validation_passed = False

            error_summary['negative_or_zero_price'] += (df['price'] <= 0).sum()
            error_summary['negative_or_zero_quantity'] += (df['quantity'] <= 0).sum()
            
            if not df['transact_time'].is_monotonic_increasing:
                error_summary['timestamp_out_of_order'] += 1
                validation_passed = False
            
            if df['transact_time'].iloc[0] < last_timestamp:
                error_summary['timestamp_out_of_order'] += 1
                validation_passed = False
            
            last_timestamp = df['transact_time'].iloc[-1]
    else:
        print("  - Schema validation failed. Halting further checks.")

    print("\n--- Validation Summary ---")
    if validation_passed:
        print("✅ STATUS: PASSED - File appears to be valid and data quality is acceptable.")
    else:
        print("❌ STATUS: FAILED - Issues were found.")

    if error_summary['schema_errors']:
        print("\nSchema Issues:")
        for err in error_summary['schema_errors']:
            print(f"  {err}")
    
    print("\nData Quality Issues:")
    print(f"  - Null values in 'price' or 'quantity': {error_summary['null_values']}")
    print(f"  - Trades with zero or negative price: {error_summary['negative_or_zero_price']}")
    print(f"  - Trades with zero or negative quantity: {error_summary['negative_or_zero_quantity']}")
    print(f"  - Number of chunks with out-of-order timestamps: {error_summary['timestamp_out_of_order']}")
    print("--------------------------")

--- Starting Validation for: E:\IVF-IT-Solution\QuantTrade-Model\SOLUSDT_aggTrades.parquet ---

[Step 1/3] Checking file readability and metadata...
  - Success: File opened. Contains 2077 row groups.

[Step 2/3] Validating schema...
  - OK: Column 'transact_time' has correct type 'int64'.
  - OK: Column 'price' has correct type 'double'.
  - OK: Column 'quantity' has correct type 'double'.
  - OK: Column 'is_buyer_maker' has correct type 'bool'.

[Step 3/3] Validating data content row group by row group...
  - Processing row group 1/2077...
  - Processing row group 2/2077...
  - Processing row group 3/2077...
  - Processing row group 4/2077...
  - Processing row group 5/2077...
  - Processing row group 6/2077...
  - Processing row group 7/2077...
  - Processing row group 8/2077...
  - Processing row group 9/2077...
  - Processing row group 10/2077...
  - Processing row group 11/2077...
  - Processing row group 12/2077...
  - Processing row group 13/2077...
  - Processing row group 14/