In [1]:
import numpy as np
import polars as pl
from tqdm import tqdm
import matplotlib.pyplot as plt
import numba as nb
from numba import njit

In [2]:
book = pl.read_csv('./exchange_data/book.csv')
trades = pl.read_csv('./exchange_data/trades.csv')

In [3]:
len(book), len(trades)

(9904, 7977)

In [4]:
def compute_imbalances(ask_prices, ask_amounts, bid_prices, bid_amounts):
    ask_eligible = [ask_amounts[i] for i, v in enumerate(ask_prices) \
                    if v < (ask_prices[0] * 1.0005)]
    median_index = len(ask_eligible) // 2
    if len(ask_eligible) % 2:
        ask_median = ask_eligible[median_index]
    else:
        ask_median = (ask_eligible[median_index] + ask_eligible[median_index - 1]) / 2
    
    bid_eligible = [bid_amounts[i] for i, v in enumerate(bid_prices) \
                    if v > (bid_prices[0] * 0.9995)]
    median_index = len(bid_eligible) // 2
    if len(bid_eligible) % 2:
        bid_median = bid_eligible[median_index]
    else:
        bid_median = (bid_eligible[median_index] + bid_eligible[median_index - 1]) / 2

    median = (ask_median + bid_median) / 2

    size = median
    money = 0
    for i, amount in enumerate(ask_amounts):
        if np.isclose(size, 0):
            break
        else:
            if amount < size:
                size -= amount
                money += ask_prices[i] * amount
            else:
                money += ask_prices[i] * size
                size = 0
    ask_imbalance = ((money / median) / ask_prices[0] - 1) * 10**5

    size = median
    money = 0
    for i, amount in enumerate(bid_amounts):
        if np.isclose(size, 0):
            break
        else:
            if amount < size:
                size -= amount
                money += bid_prices[i] * amount
            else:
                money += bid_prices[i] * size
                size = 0
                
    bid_imbalance = (bid_prices[0] / (money / median) - 1) * 10**5

    return (ask_imbalance, bid_imbalance)

def compute_improved_imbalance(ob_snapshot):
    ts, data = ob_snapshot[0], ob_snapshot[1:]
    ask_prices = data[::4]
    ask_amounts = data[1::4]
    bid_prices = data[2::4]
    bid_amounts = data[3::4]
    ask_imbalance, bid_imbalance = compute_imbalances(ask_prices, ask_amounts, 
                                                        bid_prices, bid_amounts)
    
    return ts, ask_imbalance, bid_imbalance

def numba_imb(dataset: np.ndarray):
    tuples = [(0, 0.0, 0.0)] * dataset.shape[0]
    for i, row in enumerate(dataset):
        tuples[i] = compute_improved_imbalance(row)
    
    return tuples

In [5]:
tuples = numba_imb(book.to_numpy())

In [6]:
improved_imbalance = pl.DataFrame(data=np.array(tuples)).rename(
    {'column_0': 'timestamp', 'column_1': 'imbalance_ask', 'column_2': 'imbalance_bid'}
).with_columns(pl.from_epoch(pl.col("timestamp"), time_unit="ms")).set_sorted('timestamp')

In [7]:
trades = trades.set_sorted('timestamp').with_columns(
    pl.from_epoch(pl.col("timestamp"), time_unit="ms"))

trades = trades.with_columns(side=2*pl.col('side')-1)

In [8]:
trades_windows = [1000] #[100, 250, 500, 1000, 2000] # in ms

for window in tqdm(trades_windows):
    trades = trades.with_columns(trades.rolling(period=f"{window}ms", index_column="timestamp", 
               closed="right").agg(
               pl.sum('side').alias(f"trade_imbalance_{window}ms")))

100%|██████████| 1/1 [00:00<00:00, 100.12it/s]


In [9]:
def calculate_past_returns(trades_avg, delta):
    past_returns = [0.0 for i in range(trades_avg.shape[0])]
    
    start_index = 0
    delta_ms = delta
    
    for i, v in enumerate(trades_avg):
        while (v[0] - trades_avg[start_index][0]) > delta_ms:
            start_index += 1

        if np.isclose(trades_avg[start_index][1], 0):
            past_returns[i] = 0
        else:
            past_returns[i] = (v[1] / trades_avg[start_index][1] - 1) * 10**5
    
    return past_returns

In [10]:
past_returns_window = 1000 # in ms

averaged_trades = trades.with_columns(
    ((pl.col('amount') * pl.col('price')).rolling_sum(
        window_size=f"{past_returns_window}ms", by="timestamp", 
        closed="right") / pl.col('amount').rolling_sum(
        window_size=f"{past_returns_window}ms", by="timestamp", 
        closed="right")).alias(f"averaged_{past_returns_window}ms")).select(
    ["timestamp", f"averaged_{past_returns_window}ms"])

averaged_trades_numpy = averaged_trades.with_columns(
    pl.col('timestamp').dt.timestamp('ms')).to_numpy()


trades_windows = [5000] #[100, 250, 500, 1000, 2000]

past_returns_array = []
for window in tqdm(trades_windows):
    past_returns_array.append(calculate_past_returns(averaged_trades_numpy, window))

  0%|          | 0/1 [00:00<?, ?it/s]

100%|██████████| 1/1 [00:00<00:00,  3.12it/s]


In [11]:
trades = trades.with_columns(
    [pl.Series(past_returns_array[i]).alias(
        f"past_returns_{window}ms") for i, window in enumerate(trades_windows)])

In [12]:
trades = trades.with_columns(
    pl.col('price').log().diff().fill_null(0).alias('log_return'))

In [13]:
trades_log_returns = trades.select(['timestamp', 'log_return']).with_columns(
    pl.col('timestamp').dt.timestamp('ms')).to_numpy()

In [14]:
autocorrelation_time_window = 10**4 # ms

def shift(xs, n):
    e = np.empty_like(xs, np.float64)
    e[:n] = 0.0
    e[n:] = xs[:-n]
    return e

def data_autocorrelation(time_series, 
                         lags, 
                         time_window):
    autocorrelations = [[0.0 for i in range(time_series.shape[0])] for j in lags]
    ts = time_series[:, 0]
    prices = time_series[:, 1]
    lag_prices_prod = [np.cumsum(prices * shift(prices, lag)) for lag in lags]

    cum_prices = np.cumsum(prices)
    cum_prices_2 = np.cumsum(prices**2)
    
    start_index = 0
    delta_ms = time_window
    
    for i, v in enumerate(ts):
        while (v - ts[start_index]) > delta_ms:
            start_index += 1
        
        for j, lag in enumerate(lags):
            n = i - start_index + 1 - lag
            if n <= 1 or start_index == 0:
                autocorrelations[j][i] = 0
            else:
                sum_x_2 = cum_prices_2[i] - cum_prices_2[start_index + lag - 1]
                sum_x = cum_prices[i] - cum_prices[start_index + lag - 1]
                sum_y_2 = cum_prices_2[i - lag] - cum_prices_2[start_index - 1]
                sum_y = cum_prices[i - lag] - cum_prices[start_index - 1]
                denominator = (n * sum_x_2 - sum_x**2) * (n * sum_y_2 - sum_y**2)
                
                sum_xy = lag_prices_prod[j][i] - lag_prices_prod[j][start_index + lag - 1]
                numerator = n * sum_xy - sum_x * sum_y
                
                if np.isclose(numerator, 0):
                    autocorrelations[j][i] = 0.0
                elif denominator > 0:
                    autocorrelations[j][i] = np.divide(numerator, np.sqrt(denominator))
                else:
                    autocorrelations[j][i] = 0.0
    
    return autocorrelations

In [15]:
lags = np.array([1])

autocorrelations_per_lag = data_autocorrelation(
    trades_log_returns, lags, autocorrelation_time_window)

autocorrelations_per_lag = np.array(autocorrelations_per_lag)

trades = trades.with_columns([pl.Series(autocorrelations_per_lag[i]).alias(
    f"auto_corr{v}lag").clip(-1, 1) for i, v in enumerate(lags)])

In [16]:
realized_volatility_time_window = 10**5 # ms

trades = trades.with_columns(pl.col('log_return').pow(2).rolling_sum(
    window_size=f"{realized_volatility_time_window}ms", by="timestamp", closed="right"
).pow(0.5).fill_nan(0).alias('realized_volatility'))

In [17]:
realized_kernel_time_window = 10**4 # ms

def shift(xs, n):
    if n == 0:
        return xs.copy()
    e = np.empty_like(xs, np.float64)
    e[:n] = 0.0
    e[n:] = xs[:-n]
    return e


def parzen_kernel(x):
    x = abs(x)
    if x >= 1:
        return 0
    elif x >= 0.5:
        return 2 * (1 - x)**3
    else:
        return 1 - 6 * x**2 * (1 - x)


def data_realized_kernel(time_series, 
                         H, 
                         time_window):

    autocorrelations = [0.0 for l in range(time_series.shape[0])]
    
    ts = time_series[:, 0]
    prices = time_series[:, 1]
    
    lag_prices_prod = [np.cumsum(prices * shift(prices, lag)) for lag in range(H+1)]
    kernel_values = [parzen_kernel(k / H) for k in range(1, H + 1)]

    start_index = 0
    delta_ms = time_window

    for i, v in enumerate(ts):
        while (v - ts[start_index]) > delta_ms:
            start_index += 1
        
        if start_index == 0:
            autocorrelations[i] = 0
        else:
            kernel_range = min(i + 1 - start_index, H)
            res = lag_prices_prod[0][i] - lag_prices_prod[0][start_index - 1]
            for j in range(1, kernel_range+1):
                res += 2 * kernel_values[j - 1] * (lag_prices_prod[j][i] - \
                                            lag_prices_prod[j][start_index + j - 1])
            autocorrelations[i] = res
    
    return autocorrelations

In [18]:
H = 3
realized_kernel_per_H = []

realized_kernel_per_H = np.array(data_realized_kernel(
    trades_log_returns, H, realized_kernel_time_window))



In [19]:
trades = trades.with_columns(pl.Series(
    realized_kernel_per_H).alias(f"real_kernel_H{H}"))

In [20]:
book = book.with_columns(midprice_w = (pl.col('ask[0].price') * pl.col('bid[0].amount') + 
                     pl.col('bid[0].price') * pl.col('ask[0].amount')) / 
                   (pl.col('bid[0].amount') + pl.col('ask[0].amount')))

In [21]:
book = book.with_columns((pl.col('ask[0].price') - pl.col('bid[0].price')).alias('spread'))

In [22]:
book = book.with_columns(((pl.col('ask[0].price') + pl.col('bid[0].price')) / 2).alias('mid_price'))

In [23]:
book = book.select(['timestamp', 'midprice_w', 'spread', 'mid_price']).with_columns(pl.from_epoch(pl.col("timestamp"), time_unit="ms")).set_sorted('timestamp')

In [24]:
features = book.join_asof(improved_imbalance, on='timestamp', strategy='backward')

In [25]:
features = features.join_asof(trades.drop(['price', 'amount', 'side']), 
                              on='timestamp', strategy='backward').fill_null(0
                                                                             ).with_columns(pl.col('timestamp').dt.timestamp('ms'))

In [27]:
features.write_csv('features.csv')

In [26]:
features

timestamp,midprice_w,spread,mid_price,imbalance_ask,imbalance_bid,trade_imbalance_1000ms,past_returns_5000ms,log_return,auto_corr1lag,realized_volatility,real_kernel_H3
i64,f64,f64,f64,f64,f64,i64,f64,f64,f64,f64,f64
1700251145100,1938.499671,0.01,1938.495,1.778173,-1.1102e-11,0,0.0,0.0,0.0,0.0,0.0
1700251145201,1938.493591,0.01,1938.495,0.0,-1.1102e-11,0,0.0,0.0,0.0,0.0,0.0
1700251145301,1938.492478,0.01,1938.495,0.0,-1.1102e-11,2,0.0,0.0,0.0,0.0,0.0
1700251145401,1938.492249,0.01,1938.495,0.0,-1.1102e-11,2,0.0,0.0,0.0,0.0,0.0
1700251145501,1938.49186,0.01,1938.495,0.0,-1.1102e-11,2,0.0,0.0,0.0,0.0,0.0
1700251145601,1938.491446,0.01,1938.495,0.0,-1.1102e-11,2,0.0,0.0,0.0,0.0,0.0
1700251145701,1938.491446,0.01,1938.495,0.0,-1.1102e-11,2,0.0,0.0,0.0,0.0,0.0
1700251145801,1938.490995,0.01,1938.495,0.0,0.0,2,0.0,0.0,0.0,0.0,0.0
1700251145901,1938.490985,0.01,1938.495,0.0,0.0,2,0.0,0.0,0.0,0.0,0.0
1700251146001,1938.490906,0.01,1938.495,0.0,2.2204e-11,2,0.0,0.0,0.0,0.0,0.0
