In [1]:
import pyarrow.dataset as ds
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json
import math
import itertools

In [2]:
from statsmodels.stats import stattools
from scipy import stats
import seaborn as sns

In [3]:
from RiskLabAI.controller import Controller
# initialize controller
controller = Controller()
from RiskLabAI.data.structures.data_structures_lopez import *

In [4]:
import torch
print(torch.backends.mps.is_available())

True


# Load Datasets

In [5]:
#Asset under study
ticker = 'BTCUSDT'

# define dataset
dataset = ds.dataset(
    "/Users/bobet/Documents/Code Repository/Trading-Systems/_datasets",
    format="parquet")

# push filter into Arrow scan (faster, uses partition pruning if possible)
table = dataset.to_table(filter=ds.field("symbol") == ticker)

# convert to pandasssss
df = table.to_pandas()
df.tail()

Unnamed: 0,symbol,ts_ms,iso_utc,ohlc_ts_open,ohlc_open,ohlc_high,ohlc_low,ohlc_close,ohlc_volume,ohlc_ts_close,...,tr_volume_base,tr_volume_quote,tr_vwap,tr_buy_sell_imbalance,spot_price,perp_mark_price,basis_abs,basis_pct,funding_rate,next_funding_time_ms
40288,BTCUSDT,1757889323802,2025-09-14T22:35:23.802648+00:00,1757889300000,115882.22,115882.23,115882.22,115882.23,0.03231,1757889359999,...,2.42399,280905.729038,115885.679825,-0.335138,115882.23,115830.237257,-51.992743,-0.000449,7.1e-05,1757894400000
40289,BTCUSDT,1757889383842,2025-09-14T22:36:23.842662+00:00,1757889360000,115870.85,115870.85,115854.57,115854.58,1.17028,1757889419999,...,3.39239,393129.957709,115885.837922,-0.152379,115854.58,115801.2,-53.38,-0.000461,7.1e-05,1757894400000
40290,BTCUSDT,1757889443882,2025-09-14T22:37:23.882745+00:00,1757889420000,115854.57,115854.58,115839.69,115839.7,3.11556,1757889479999,...,3.90965,453069.854949,115885.016548,-0.01363,115839.7,115793.191441,-46.508559,-0.000401,7.1e-05,1757894400000
40291,BTCUSDT,1757889503902,2025-09-14T22:38:23.902651+00:00,1757889480000,115835.21,115835.21,115830.95,115830.96,1.35011,1757889539999,...,3.89453,451272.417425,115873.396129,-0.029808,115830.96,115777.1,-53.86,-0.000465,7.1e-05,1757894400000
40292,BTCUSDT,1757889563962,2025-09-14T22:39:23.962900+00:00,1757889540000,115825.44,115825.45,115825.44,115825.44,1.72079,1757889599999,...,4.71695,546509.222917,115860.719939,-0.759722,115825.45,115777.1,-48.35,-0.000417,7.1e-05,1757894400000


In [6]:
#features
df.columns

Index(['symbol', 'ts_ms', 'iso_utc', 'ohlc_ts_open', 'ohlc_open', 'ohlc_high',
       'ohlc_low', 'ohlc_close', 'ohlc_volume', 'ohlc_ts_close', 'ohlc_trades',
       'ohlc_taker_base', 'ohlc_taker_quote', 'l1_bid', 'l1_ask', 'l1_mid',
       'l1_spread', 'l1_bid_qty', 'l1_ask_qty', 'l1_imbalance', 'l2_bid_depth',
       'l2_ask_depth', 'l2_depth_asymmetry', 'l2_bid_vwap', 'l2_ask_vwap',
       'l2_bid_slope', 'l2_ask_slope', 'tr_volume_base', 'tr_volume_quote',
       'tr_vwap', 'tr_buy_sell_imbalance', 'spot_price', 'perp_mark_price',
       'basis_abs', 'basis_pct', 'funding_rate', 'next_funding_time_ms'],
      dtype='object')

In [7]:
sample_size = df.count()[0]
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 40293 entries, 0 to 40292
Data columns (total 37 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   symbol                 40293 non-null  object 
 1   ts_ms                  40293 non-null  int64  
 2   iso_utc                40293 non-null  object 
 3   ohlc_ts_open           40293 non-null  int64  
 4   ohlc_open              40293 non-null  float64
 5   ohlc_high              40293 non-null  float64
 6   ohlc_low               40293 non-null  float64
 7   ohlc_close             40293 non-null  float64
 8   ohlc_volume            40293 non-null  float64
 9   ohlc_ts_close          40293 non-null  int64  
 10  ohlc_trades            40293 non-null  int64  
 11  ohlc_taker_base        40293 non-null  float64
 12  ohlc_taker_quote       40293 non-null  float64
 13  l1_bid                 40293 non-null  float64
 14  l1_ask                 40293 non-null  float64
 15  l1

## Raw Features

### General
- **symbol**: Trading pair identifier (e.g., BTCUSDT).  
- **ts_ms**: Data timestamp in milliseconds (epoch time).  
- **iso_utc**: Data timestamp in human-readable UTC format.  

### OHLC Data (Candlestick)
- **ohlc_ts_open**: Opening timestamp for the candlestick period.  
- **ohlc_open**: Opening price of the candlestick.  
- **ohlc_high**: Highest price within the candlestick.  
- **ohlc_low**: Lowest price within the candlestick.  
- **ohlc_close**: Closing price of the candlestick.  
- **ohlc_volume**: Trading volume during the candlestick (base asset units).  
- **ohlc_ts_close**: Closing timestamp for the candlestick period.  
- **ohlc_trades**: Number of trades in the candlestick.  
- **ohlc_taker_base**: Base asset volume traded by takers (aggressors).  
- **ohlc_taker_quote**: Quote asset volume traded by takers.  

### Level 1 Order Book (Top of Book)
- **l1_bid**: Best bid price (highest buy order).  
- **l1_ask**: Best ask price (lowest sell order).  
- **l1_mid**: Midpoint price between bid and ask.  
- **l1_spread**: Difference between best ask and bid (ask - bid).  
- **l1_bid_qty**: Quantity available at best bid.  
- **l1_ask_qty**: Quantity available at best ask.  
- **l1_imbalance**: Order book imbalance at Level 1 = (bid_qty – ask_qty) / (bid_qty + ask_qty).  

### Level 2 Order Book (Depth of Market)
- **l2_bid_depth**: Total buy-side liquidity across multiple bid levels.  
- **l2_ask_depth**: Total sell-side liquidity across multiple ask levels.  
- **l2_depth_asymmetry**: Relative difference between bid and ask depth.  
- **l2_bid_vwap**: Volume-weighted average bid price across order book levels.  
- **l2_ask_vwap**: Volume-weighted average ask price across order book levels.  
- **l2_bid_slope**: Measure of how steeply bid prices rise with quantity (liquidity gradient).  
- **l2_ask_slope**: Measure of how steeply ask prices rise with quantity.  

### Trade Data
- **tr_volume_base**: Total traded volume in base asset.  
- **tr_volume_quote**: Total traded volume in quote asset.  
- **tr_vwap**: Trade volume-weighted average price.  
- **tr_buy_sell_imbalance**: Difference between buy-initiated and sell-initiated trade volumes.  

### Derived Prices
- **spot_price**: Current spot market price.  
- **perp_mark_price**: Mark price used in perpetual futures to avoid manipulation.  
- **basis_abs**: Absolute difference between perpetual mark price and spot price.  
- **basis_pct**: Percentage difference between perpetual mark price and spot price.  
- **funding_rate**: Periodic payment rate between long and short positions in perpetual contracts.  
- **next_funding_time_ms**: Timestamp (ms) of the next funding event.  

## Data hygiene & storage

✔ Why: storage efficiency + ordering. A 10GB dataset may shrink to ~3-4GB when optimized.

In [8]:
# Ensure correct dtypes (saves memory on 10GB dataset)
dtype_map = {
    "symbol": "category",
    "ohlc_open": "float32", "ohlc_high": "float32", "ohlc_low": "float32", "ohlc_close": "float32",
    "ohlc_volume": "float32", "ohlc_trades": "int32",
    "ohlc_taker_base": "float32", "ohlc_taker_quote": "float32",
    "l1_bid": "float32", "l1_ask": "float32", "l1_mid": "float32", "l1_spread": "float32",
    "l1_bid_qty": "float32", "l1_ask_qty": "float32", "l1_imbalance": "float32",
    "l2_bid_depth": "float32", "l2_ask_depth": "float32", "l2_depth_asymmetry": "float32",
    "l2_bid_vwap": "float32", "l2_ask_vwap": "float32",
    "l2_bid_slope": "float32", "l2_ask_slope": "float32",
    "tr_volume_base": "float32", "tr_volume_quote": "float32", "tr_vwap": "float32",
    "tr_buy_sell_imbalance": "float32",
    "spot_price": "float32", "perp_mark_price": "float32",
    "basis_abs": "float32", "basis_pct": "float32", "funding_rate": "float32"
}

df = df.astype(dtype_map)

# Make sure timestamp is datetime
df["iso_utc"] = pd.to_datetime(df["iso_utc"])
df = df.set_index("iso_utc").sort_index()


#Intergrity Check
# Drop duplicates, check ordering
df = df[~df.index.duplicated(keep="first")].sort_index()

# Sanity checks for OHLC
mask = (
    (df["ohlc_low"] <= df["ohlc_open"]) &
    (df["ohlc_low"] <= df["ohlc_close"]) &
    (df["ohlc_high"] >= df["ohlc_open"]) &
    (df["ohlc_high"] >= df["ohlc_close"])
)
df = df[mask]

# Check non-negative volumes
df = df[df["ohlc_volume"] >= 0]

## Convert DataFrame to RiskLA AI Input Format

# Parameter Search

In [10]:
# ---- Wrapper using your imported controller ----
def tick_run_bars(run_bars: str, wi: int, wn: int, seed: int, data: np.ndarray):
    
    runbars= controller.handle_input_command(
        method_name=run_bars,
        method_arguments={
            "window_size_for_expected_n_ticks_estimation": wn,
            "window_size_for_expected_imbalance_estimation": wi,
            "initial_estimate_of_expected_n_ticks_in_bar": seed,
        },
        input_data=data,
        batch_size=1_000_000,
    )

    return runbars

# ---- Sweep function ----
import itertools
import numpy as np
import pandas as pd
from scipy import stats

def sweep(run_bars, data, wn_range, wi_range, seed_range):
    rows = []
    n_input = len(data)

    for wn, wi, seed in itertools.product(wn_range, wi_range, seed_range):
        try:
            result = tick_run_bars(run_bars, wi, wn, seed, data)
            n_output = len(result)
            pct_change = (n_output - n_input) / n_input

            if n_output >= 4:  # need at least 3 diffs
                log_ret = np.log(result['Close']).diff().dropna()
                stat, pval = stats.shapiro(log_ret)
            else:
                stat, pval = None, None

            rows.append({
                "run_bars": run_bars,
                "wn": wn,
                "wi": wi,
                "seed": seed,
                "n_input": n_input,
                "n_output": n_output,
                "% change": pct_change,
                "statistic": stat,
                "p_value": pval,
            })

        except Exception as e:
            rows.append({
                "run_bars": run_bars,
                "wn": wn,
                "wi": wi,
                "seed": seed,
                "n_input": n_input,
                "n_output": None,
                "% change": None,
                "statistic": None,
                "p_value": None,
                "error": str(e),
            })

    df = pd.DataFrame(rows)
    df = df.sort_values(
        ["n_output", "p_value", "statistic"],
        ascending=[True, True, True],
        na_position="last"
    )

    df = df[(abs(df["% change"]) < 0.98)]
    return df

In [11]:
df_riskAI = df.copy()
df_riskAI = df_riskAI.loc[:, ['symbol', 'ohlc_close', 'ohlc_volume']]
#rename column
df_riskAI.reset_index(inplace=True) 
df_riskAI.set_index('symbol', inplace=True)
df_riskAI.columns = ['date', 'price', 'volume']
df_riskAI.head()



Unnamed: 0_level_0,date,price,volume
symbol,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
BTCUSDT,2025-08-17 15:45:47.575950+00:00,118251.351562,0.7558
BTCUSDT,2025-08-17 15:46:47.606265+00:00,118234.53125,5.14589
BTCUSDT,2025-08-17 15:47:47.643040+00:00,118234.53125,2.35213
BTCUSDT,2025-08-17 15:48:47.662644+00:00,118234.523438,6.03409
BTCUSDT,2025-08-17 15:49:47.702649+00:00,118234.523438,1.28289


In [12]:
rag_wn= range(1, 1001)
rag_wi = range(1, 500)
rag_seed= range(1, 500)

In [None]:
tick_imbalance_params = sweep(run_bars="expected_tick_imbalance_bars",
                            data=df_riskAI,
                            wn_range=rag_wn,
                            wi_range=rag_wi,
                            seed_range=rag_seed,
                        )
tick_imbalance_params = tick_imbalance_params.tail(1)
tick_imbalance_params

Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 with size 40293
Processing batch 0 w

In [None]:
vol_imbalance_params = sweep(run_bars="expected_volume_imbalance_bars",
                            data=df_riskAI,
                            wn_range=rag_wn,
                            wi_range=rag_wi,
                            seed_range=rag_seed,
                        )
vol_imbalance_params = vol_imbalance_params.tail(1)
vol_imbalance_params

In [None]:
dollar_imbalance_params = sweep(run_bars="expected_dollar_imbalance_bars",
                            data=df_riskAI,
                            wn_range=rag_wn,
                            wi_range=rag_wi,
                            seed_range=rag_seed,
                        )
dollar_imbalance_params = dollar_imbalance_params.tail(1)
dollar_imbalance_params

In [None]:
tick_runbars_params = sweep(run_bars="expected_tick_run_bars",
                            data=df_riskAI,
                            wn_range=rag_wn,
                            wi_range=rag_wi,
                            seed_range=rag_seed,
                        )
tick_runbars_params = tick_runbars_params.tail(1)
tick_runbars_params

In [None]:
vol_runbars_params = sweep(run_bars="expected_volume_run_bars",
                            data=df_riskAI,
                            wn_range=rag_wn,
                            wi_range=rag_wi,
                            seed_range=rag_seed,
                            )

vol_runbars_params = vol_runbars_params.tail(1)
vol_runbars_params 

In [None]:
dollar_runabars_params = sweep(run_bars="expected_dollar_run_bars",
                            data=df_riskAI,
                            wn_range=rag_wn,
                            wi_range=rag_wi,
                            seed_range=rag_seed,
                        )
dollar_runabars_params = dollar_runabars_params.tail(1)
dollar_runabars_params

# Sampling schemes

In financial time series, sampling schemes determine how raw tick-level data (individual trades) are aggregated into bars (OHLC structures). Traditional time bars sample at fixed calendar intervals, but these often distort statistical properties by oversampling quiet periods and undersampling volatile ones.

To address this, López de Prado (2018) introduced alternative, `event-driven` bars that adapt to market activity. In this work, the focus is on:

- `Expected Imbalance Bars (EIBs)`
EIBs close a bar when the accumulated buy–sell volume imbalance exceeds an expected threshold, estimated dynamically from historical data. This produces bars of variable length that contain approximately equal amounts of information, improving stationarity and normality of returns. EIBs are particularly well suited for machine learning tasks that rely on balanced and stable input data.

- `Expected Run Bars (ERBs)`
ERBs close a bar when the number of consecutive buy or sell trades (a “run”) surpasses an expected run length, again estimated adaptively. This highlights periods of persistent order flow, often associated with informed trading or liquidity grabs. ERBs are especially valuable for detecting market microstructure patterns, such as those studied in Smart Money Concepts (SMC).

In [None]:
def generate_information_driven_bars(run_bars, wi, wn,seed,data):

    run_bars= controller.handle_input_command(
        method_name=run_bars,
        method_arguments={
            "window_size_for_expected_n_ticks_estimation": wn,
            "window_size_for_expected_imbalance_estimation": wi,
            "initial_estimate_of_expected_n_ticks_in_bar": seed,
        },
        input_data=data,
        batch_size=1_000_000,
    )

    return run_bars

## Expected Imbalance Bars

### Imbalance Tick Bars

In [None]:
seed = 0

In [None]:
imbalance_tick_bar = generate_information_driven_bars(run_bars="expected_tick_imbalance_bars", 
                                                      wi=tick_imbalance_params.wi, 
                                                      wn=tick_imbalance_params.wn,
                                                      seed=tick_imbalance_params.seed,
                                                      data=df_riskAI)
print(imbalance_tick_bar.shape)
imbalance_tick_bar.head()

### Imbalance Volume Bars

In [None]:
imbalance_volume_bar=generate_information_driven_bars(run_bars="expected_volume_imbalance_bars", 
                                                      wi=vol_imbalance_params.wi, 
                                                      wn=vol_imbalance_params.wn,
                                                      seed=vol_imbalance_params.seed,
                                                      data=df_riskAI)

print(imbalance_volume_bar.shape)
imbalance_volume_bar.head()

### Imbalance Dollar Bars

In [None]:
imbalance_dollar_bar = generate_information_driven_bars(run_bars="expected_dollar_imbalance_bars", 
                                                      wi=dollar_imbalance_params.wi, 
                                                      wn=dollar_imbalance_params.wn,
                                                      seed=dollar_imbalance_params.seed,
                                                      data=df_riskAI)

print(imbalance_dollar_bar.shape)
imbalance_dollar_bar.head()

### Statistical Test

#### Log Return

In [None]:
#Compute log returns from a price series.l
log_return = lambda s: np.log(s).diff().dropna()

time_returns = log_return(df['ohlc_close'])
ticks_EIB_returns = log_return(imbalance_dollar_bar['close'])
volume_EIB_returns = log_return(imbalance_volume_bar['close'])
dollars_EIB_returns = log_return(imbalance_dollar_bar['close'])

####  Jarque–Bera test statistic 

The `Jarque–Bera (JB) test` is used to check whether data follow a normal distribution by looking at skewness and kurtosis. In this test, smaller values are desirable because they indicate the data are closer to being normally distributed. For example, a statistic around 1 suggests the data are reasonably consistent with normality. A very large value, such as 6,633,374, strongly signals that the data deviate from normality, often due to heavy tails or asymmetry. In rare cases, the statistic can be 0, which occurs if the data have exactly zero skewness and a normal level of kurtosis, or if the dataset has no variation at all.

In [None]:
print("Jarque-Bera test statistic for time returns:", int(stats.jarque_bera(time_returns)[0]))
print("Jarque-Bera test statistic for EIB tick returns:", int(stats.jarque_bera(ticks_EIB_returns)[0]))
print("Jarque-Bera test statistic for EIB volume returns:", int(stats.jarque_bera(volume_EIB_returns)[0]))
print("Jarque-Bera test statistic for EIB dollar returns:", int(stats.jarque_bera(dollars_EIB_returns)[0]))

#### Shapiro-Wilk Test

The `Shapiro–Wilk` test is a statistical method used to check whether a dataset follows a normal distribution. Unlike the Jarque–Bera test, which looks at skewness and kurtosis, the Shapiro–Wilk test directly compares the data to a perfectly normal shape. The test produces a statistic between 0 and 1, where values closer to 1 indicate the data are more consistent with normality. For example, a statistic of 0.98 would suggest the data are likely normal, while a much smaller value, such as 0.70, would indicate a strong departure from normality. The test also provides a p-value: if it is larger than 0.05, the data are considered roughly normal; if smaller, the data are unlikely to be normally distributed.

In [None]:
print("Shapiro-Wilk test statistic for time returns:", stats.shapiro(time_returns))
print("Shapiro-Wilk test statistic for EIB tick returns:", stats.shapiro(ticks_EIB_returns))
print("Shapiro-Wilk test statistic for EIB volume returns:", stats.shapiro(volume_EIB_returns))
print("Shapiro-Wilk test statistic for EIB dollar returns:", stats.shapiro(dollars_EIB_returns))

#### Kernel Density Estimate (KDE) plot

A `Kernel Density Estimate (KDE) plot` is a smooth curve that shows the probability distribution of a dataset. It can be thought of as a smoothed version of a histogram, where the peaks indicate where the data are most concentrated and the shape of the curve shows how the values are distributed. KDE plots are often used to visually assess whether data resemble a normal distribution or display skewness, heavy tails, or multiple peaks.

In [None]:
#Standardize Data 
time_standard = (time_returns - time_returns.mean()) / time_returns.std()
EIB_tick_standard = (ticks_EIB_returns - ticks_EIB_returns.mean()) / ticks_EIB_returns.std()
EIB_volume_standard = (volume_EIB_returns  - volume_EIB_returns.mean()) / volume_EIB_returns.std()
EIB_dollar_standard = (dollars_EIB_returns - dollars_EIB_returns.mean()) / dollars_EIB_returns.std()

In [None]:
#Distribution Plot
plt.figure(figsize=(16, 12))
sns.kdeplot(time_standard, label="Time", color="red")
sns.kdeplot(EIB_tick_standard, label="Tick", color="blue")
sns.kdeplot(EIB_volume_standard, label="Volume", color="green")
sns.kdeplot(EIB_dollar_standard, label="Dollar", color="purple", linestyle="-.")
sns.kdeplot(np.random.normal(size=1000000), label="Normal", linestyle="dotted")
plt.xticks(range(-4, +4))

#labels
plt.xlabel("Standardized Log Returns")
plt.ylabel("Density")
plt.title(
    'Partial Recovery of Normality for Expected Imbalance Bars',
    loc='center', 
)
plt.xlim(-5, 5)
plt.legend()
plt.show()

## Run Bars

### Tick Run Bars

In [None]:
tick_run_bars  = generate_information_driven_bars(run_bars="expected_dollar_imbalance_bars", 
                                                      wi=tick_runbars_params.wi, 
                                                      wn=tick_runbars_params.wn,
                                                      seed=tick_runbars_params.seed,
                                                      data=df_riskAI)

print(tick_run_bars.shape)
tick_run_bars

### Volume Run Bars

In [None]:
volume_run_bars = generate_information_driven_bars(run_bars="expected_volume_run_bars", 
                                                      wi=vol_runbars_params.wi, 
                                                      wn=vol_runbars_params.wn,
                                                      seed=vol_runbars_params.seed,
                                                      data=df_riskAI)
print(volume_run_bars.shape)
volume_run_bars.head()

### Dollar Run Bars

In [None]:
dollar_run_bars = generate_information_driven_bars(run_bars="expected_dollar_run_bars", 
                                                      wi=dollar_runabars_params.wi, 
                                                      wn=dollar_runabars_params.wn,
                                                      seed=dollar_runabars_params.seed,
                                                      data=df_riskAI)
print(dollar_run_bars.shape)
dollar_run_bars.head()

### Statistical Test

#### Log Return

In [None]:
tick_run_bars_returns = log_return(tick_run_bars['Close'])
volume_run_bars_returns = log_return(volume_run_bars['Close'])
dollar_run_bars_returns = log_return(dollar_run_bars['Close'])

####  Jarque–Bera test statistic 

In [None]:
#print("Jarque-Bera test statistic for time returns:", int(stats.jarque_bera(time_returns)[0]))
#print("Jarque-Bera test statistic for tick run bars returns:", int(stats.jarque_bera(tick_run_bars_returns)[0]))
#print("Jarque-Bera test statistic for volume run bars returns:", int(stats.jarque_bera(volume_run_bars_returns)[0]))
#print("Jarque-Bera test statistic for dollar run bars returns:", int(stats.jarque_bera(dollar_run_bars_returns)[0]))

#### Shapiro-Wilk Test

In [None]:
print("Shapiro-Wilk test statistic for time returns:", stats.shapiro(time_returns))
print("Shapiro-Wilk test statistic for tick run bars returns:", stats.shapiro(tick_run_bars_returns))
print("Shapiro-Wilk test statistic for volume run bars returns:", stats.shapiro(volume_run_bars_returns))
print("Shapiro-Wilk test statistic for dollar run bars returns:", stats.shapiro(dollar_run_bars_returns))

#### Kernel Density Estimate (KDE) plot

In [None]:
Standardize Data 
tick_run_bars_standard = (tick_run_bars_returns - tick_run_bars_returns.mean()) / tick_run_bars_returns.std()
volume_run_bars_standard = (volume_run_bars_returns  - volume_run_bars_returns.mean()) / volume_run_bars_returns.std()
dollar_run_bars_standard = (dollar_run_bars_returns - dollar_run_bars_returns.mean()) / dollar_run_bars_returns.std()

In [None]:
#Distribution Plot
plt.figure(figsize=(16, 12))
sns.kdeplot(time_standard, label="Time", color="red")
#sns.kdeplot(tick_run_bars_standard, label="Tick", color="blue")
#sns.kdeplot(volume_run_bars_standard, label="Volume", color="green")
sns.kdeplot(dollar_run_bars_standard , label="Dollar", color="purple", linestyle="-.")
sns.kdeplot(np.random.normal(size=1000000), label="Normal", linestyle="dotted")
plt.xticks(range(-4, +4))

#labels
plt.xlabel("Standardized Log Returns")
plt.ylabel("Density")
plt.title(
    'Partial Recovery of Normality for Run Bars',
    loc='center', 
)
plt.xlim(-5, 5)
plt.legend()
plt.show()

# Additional Features

In [None]:
def grouped_features(raw_df, resumpling_df, feature_name, agg_func=np.sum):
    """
    Aggregate a feature between Tick Number ranges using a specified aggregation function.

    Parameters
    ----------
    raw_df : pd.DataFrame
        Full DataFrame with tick-by-tick data.
    resumpling_df : pd.DataFrame
        DataFrame with "Tick Number" boundaries (breakpoints).
    feature_name : str
        Column name in raw_df to aggregate (e.g., "ohlc_trades").
    agg_func : function, default=np.sum
        Aggregation function (e.g., np.sum, np.mean, np.max).

    Returns
    -------
    pd.DataFrame
        resumpling_df with the aggregated feature filled.
    """

    resumpling_df[feature_name] = np.nan  

    m = raw_df[feature_name]
    idx_list = resumpling_df.index.astype(int).to_list()

    if idx_list:
        idx_start = idx_list[0]
        idx_end = idx_list[0] + 1
        resumpling_df.loc[idx_start, feature_name] = agg_func(m.iloc[0:idx_end])

    for start, end in zip(idx_list, idx_list[1:]):
        resumpling_df.loc[end, feature_name] = agg_func(m.iloc[start:(end+1)])

    return resumpling_df

def last_state(raw_df,resumpling_df, feature_name):
    """
    Align the latest state of a feature from the raw dataframe
    onto the resampled dataframe at matching indices.

    Parameters
    ----------
    raw_df : pandas.DataFrame
        Original/raw dataset containing the feature of interest.
    resampling_df : pandas.DataFrame
        Resampled dataset whose index is aligned to raw_df.
    feature_name : str
        Column name (feature) to propagate from raw_df to resampling_df.

    Returns
    -------
    pandas.DataFrame
        Updated resampling_df with a new column `feature_name`
        containing the values from raw_df at matching indices. 

    """

    resumpling_df[feature_name] = np.nan  
    idx_list = resumpling_df.index.to_list()
    for i in idx_list:
        resumpling_df.loc[i,feature_name] = raw_df[feature_name][i]
    return resumpling_df

def price_vwap(raw_df, resumpling_df, feature_name, vol_colum_name ='ohlc_volume'):
    """
    Compute VWAP (Volume-Weighted Average Price) for each resampled bar.

    Parameters
    ----------
    raw_df : pandas.DataFrame
        Original dataframe with price and volume data.
    resampling_df : pandas.DataFrame
        Resampled dataframe that defines the bar boundaries (by index).
    feature_name : str
        Column name in raw_df containing prices to weight.
    vol_column_name : str, default="ohlc_volume"
        Column name in raw_df containing volume weights.

    Returns
    -------
    pandas.DataFrame
        Updated resampling_df with an extra column containing VWAP values
        for each resampled interval.
    """
    
    resumpling_df[feature_name] = np.nan  

    m = raw_df[feature_name]
    vol = raw_df[vol_colum_name]

    idx_list = resumpling_df.index.astype(int).to_list()

    if idx_list:
        idx_start = idx_list[0]
        idx_end = idx_list[0] + 1

        resumpling_df.loc[idx_start, feature_name] = np.average(m[0:idx_end], weights=vol[0:idx_end])

    for start, end in zip(idx_list, idx_list[1:]):
        resumpling_df.loc[end, feature_name] = np.average(m[start:(end+1)], weights=vol[start:(end+1)])

    return resumpling_df

In [None]:
def aggregate_features(input_df, sampling_df):

    """
    Aggregate raw features into resampled bars using different strategies.

    Parameters
    ----------
    input_df : pandas.DataFrame
        Raw tick-level dataframe containing all features.
    sampling_df : pandas.DataFrame
        DataFrame with resampled bar boundaries.
        Must contain a 'Tick Number' column to align with input_df.

    Returns
    -------
    pandas.DataFrame
        Updated sampling_df containing aggregated features.
    """

    sampling_df =sampling_df.set_index("Tick Number")

    vol_sum = ['ohlc_trades','ohlc_taker_base','ohlc_taker_quote',
            'tr_volume_base','tr_volume_quote']

    last_states = ['l1_bid','l1_ask','l1_mid','l1_spread',
                'l1_bid_qty','l1_ask_qty','l1_imbalance',
                'l2_bid_depth','l2_ask_depth','l2_depth_asymmetry',
                'l2_bid_vwap','l2_ask_vwap','l2_bid_slope','l2_ask_slope',
                'spot_price','perp_mark_price','basis_abs','basis_pct',
                'funding_rate','next_funding_time_ms']

    mean_bar= ['tr_buy_sell_imbalance']

    weighted_mean = ['tr_vwap']

    #total
    for i in vol_sum:
        sampling_df  = grouped_features(raw_df=input_df, 
                                        resumpling_df=sampling_df , 
                                        feature_name=i, 
                                        agg_func=np.sum)
    #last value
    for j in last_states:
        sampling_df  = last_state(raw_df=input_df,
                                    resumpling_df=sampling_df ,
                                    feature_name=j)
    #average value   
    for k in mean_bar:
        sampling_df  = grouped_features(raw_df=input_df, 
                                        resumpling_df=sampling_df , 
                                        feature_name=k, 
                                        agg_func=np.mean)
    #weighted average
    for _ in weighted_mean:
        sampling_df  = price_vwap(raw_df=input_df, 
                                    resumpling_df=sampling_df , 
                                    feature_name= _,
                                    vol_colum_name ='ohlc_volume')
        
    return sampling_df

In [None]:
df_features = aggregate_features(input_df=df, sampling_df=dollar_run_bars)
df_features