# Download packages

In [None]:
# !pip3 install gdown -q
# !pip3 install polars -q
# !pip3 install numba -q
# !pip3 install numba-progress -q

# Imports

In [11]:
import gdown
import numpy as np
import cupy as cp
import pandas as pd
import polars as pl
from tqdm import tqdm
import matplotlib.pyplot as plt
import numba as nb
from numba import njit
import time 

%load_ext autoreload

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [32]:
from numba_functions import numba_imb, numba_calculate_past_returns
from numpy_functions import numpy_imb, numpy_calculate_past_returns
from cupy_functions import cupy_imb, cupy_calculate_past_returns
%autoreload 2

# Download data

In [None]:
url = "https://drive.google.com/file/d/15E15XFVD8laDXNhw7uPpcPiWw-IGkYfv/view?usp=sharing"
output = 'book2h.parquet'
gdown.download(url=url, output=output, fuzzy=True, quiet=True)

url = "https://drive.google.com/file/d/1iTOy1bcQlgyuz-V2U-zgCFfzOuRm7NyN/view?usp=sharing"
output = "ticker2h.parquet"
gdown.download(url=url, output=output, fuzzy=True, quiet=True)

url = "https://drive.google.com/file/d/1Ro-3FbjQC2FDYIg1UZG7TIFEWOQkQ4uG/view?usp=sharing"
output = "trades2h.parquet"
gdown.download(url=url, output=output, fuzzy=True, quiet=True)

# Data Preparation

In [3]:
book = pl.read_parquet('book2h.parquet')
ticker = pl.read_parquet('ticker2h.parquet')
trades = pl.read_parquet('trades2h.parquet')

In [4]:
book = book.set_sorted('local_ts').with_columns(
    pl.from_epoch(pl.col("local_ts"), time_unit="ns"))

ticker = ticker.set_sorted('local_ts').with_columns(
    pl.from_epoch(pl.col("local_ts"), time_unit="ns"))

trades = trades.set_sorted('local_ts').with_columns(
    pl.from_epoch(pl.col("local_ts"), time_unit="ns"))

# Feature generation

### Improved order book imbalances

In [5]:
start = time.perf_counter()
tuples = np.array(numba_imb(book.to_numpy()))
end = time.perf_counter()

In [6]:
elapsed_time = end - start
print('numba time:', elapsed_time)

numba time: 4.000843027002702


In [7]:
start = time.perf_counter()
tuples = np.array(numba_imb(book.to_numpy()))
end = time.perf_counter()

In [8]:
elapsed_time = end - start
print('numba time after compilation:', elapsed_time)

numba time after compilation: 0.67423188900284


In [9]:
start = time.perf_counter()
tuples = np.array(numpy_imb(book.to_numpy()))
end = time.perf_counter()

In [10]:
elapsed_time = end - start
print('numpy time:', elapsed_time)

numpy time: 47.56558579999546


In [17]:
start = time.perf_counter()
tuples = np.array(cupy_imb(book.to_numpy()))
end = time.perf_counter()

  0%|          | 0/177479 [00:00<?, ?it/s]100%|██████████| 177479/177479 [08:50<00:00, 334.36it/s]


In [18]:
elapsed_time = end - start
print('cupy time:', elapsed_time)

cupy time: 531.0967542989965


In [19]:
past_returns_window = 100

averaged_trades = trades.with_columns(
    ((pl.col('amount') * pl.col('price')).rolling_sum(
        window_size=f"{past_returns_window}ms", by="local_ts", 
        closed="right") / pl.col('amount').rolling_sum(
        window_size=f"{past_returns_window}ms", by="local_ts", 
        closed="right")).alias(f"averaged_{past_returns_window}ms")).select(
    ["local_ts", f"averaged_{past_returns_window}ms"])

averaged_trades_numpy = averaged_trades.with_columns(
    pl.col('local_ts').dt.timestamp('ns')).to_numpy()

In [23]:
start = time.perf_counter()
res = numba_calculate_past_returns(averaged_trades_numpy, 500)
end = time.perf_counter()

In [24]:
elapsed_time = end - start
print('numba time:', elapsed_time)

numba time: 0.9871162130002631


In [25]:
start = time.perf_counter()
res = numba_calculate_past_returns(averaged_trades_numpy, 500)
end = time.perf_counter()

In [26]:
elapsed_time = end - start
print('numba time after compilation:', elapsed_time)

numba time after compilation: 0.06951532699895324


In [29]:
start = time.perf_counter()
res = numpy_calculate_past_returns(averaged_trades_numpy, 500)
end = time.perf_counter()

In [30]:
elapsed_time = end - start
print('numpy time:', elapsed_time)

numpy time: 0.8855756260018097


In [33]:
start = time.perf_counter()
res = cupy_calculate_past_returns(averaged_trades_numpy, 500)
end = time.perf_counter()

In [34]:
elapsed_time = end - start
print('cupy time:', elapsed_time)

cupy time: 27.944686636998085


### Autocorrelation

In [29]:
autocorrelation_time_window = 10**5 # ms

In [None]:
trades_log_returns = trades.select(['local_ts', 'log_return']).with_columns(
    pl.col('local_ts').dt.timestamp('ns')).to_numpy()

In [30]:
trades_log_returns_2h = trades_2h.select(['local_ts', 'log_return']).with_columns(
    pl.col('local_ts').dt.timestamp('ns')).to_numpy()

In [35]:
autocorrelation_time_window = 10**5 # ms


In [None]:
lags = np.array([1])
autocorrelations_per_lag = []

autocorrelations_per_lag = data_autocorrelation(
    trades_log_returns, lags, autocorrelation_time_window)

In [None]:
trades = trades.with_columns([pl.Series(autocorrelations_per_lag[i]).alias(
    f"auto_corr{v}lag").clip(-1, 1) for i, v in enumerate(lags)])

In [52]:
%%time
lags_2h = np.array([1])
autocorrelations_per_lag_2h = []

autocorrelations_per_lag_2h = data_autocorrelation(
    trades_log_returns_2h, lags_2h, autocorrelation_time_window)

CPU times: user 21.3 ms, sys: 7.02 ms, total: 28.3 ms
Wall time: 27.9 ms


In [40]:
%%time
lags_2h = np.array([1])
autocorrelations_per_lag_2h = []

autocorrelations_per_lag_2h = data_autocorrelation_numpy(
    trades_log_returns_2h, lags_2h, autocorrelation_time_window)

CPU times: user 20 s, sys: 4.95 ms, total: 20 s
Wall time: 20 s


In [50]:
%%time
lags_2h = cp.array([1])
autocorrelations_per_lag_2h = []

autocorrelations_per_lag_2h = data_autocorrelation_cupy(
    trades_log_returns_2h, lags_2h, autocorrelation_time_window)

AssertionError: 

In [53]:
trades_2h = trades_2h.with_columns([pl.Series(autocorrelations_per_lag_2h[i]).alias(
    f"auto_corr{v}lag").clip(-1, 1) for i, v in enumerate(lags_2h)])

In [None]:
trades.head(), trades_2h.head()

### Realized kernel

In [57]:
realized_kernel_time_window = 10**5 # ms

@njit
def shift(xs, n):
    if n == 0:
        return xs.copy()
    e = np.empty_like(xs, np.float64)
    e[:n] = 0.0
    e[n:] = xs[:-n]
    return e

@njit
def parzen_kernel(x):
    x = abs(x)
    if x >= 1:
        return 0
    elif x >= 0.5:
        return 2 * (1 - x)**3
    else:
        return 1 - 6 * x**2 * (1 - x)

@njit(nogil=True)
def data_realized_kernel(time_series, 
                         H, 
                         time_window,
                         progress_hook):

    autocorrelations = [0.0 for l in range(time_series.shape[0])]
    
    ts = time_series[:, 0]
    prices = time_series[:, 1]
    
    lag_prices_prod = [np.cumsum(prices * shift(prices, lag)) for lag in range(H+1)]
    kernel_values = [parzen_kernel(k / H) for k in range(1, H + 1)]

    start_index = 0
    delta_ms = time_window * 10**6

    for i, v in enumerate(ts):
        while (v - ts[start_index]) > delta_ms:
            start_index += 1
        
        if start_index == 0:
            autocorrelations[i] = 0
        else:
            kernel_range = min(i + 1 - start_index, H)
            res = lag_prices_prod[0][i] - lag_prices_prod[0][start_index - 1]
            for j in range(1, kernel_range+1):
                res += 2 * kernel_values[j - 1] * (lag_prices_prod[j][i] - \
                                            lag_prices_prod[j][start_index + j - 1])
            autocorrelations[i] = res
        
        progress_hook.update(1)
    
    return autocorrelations

In [61]:
@cp.fuse()
def shift(xs, n):
    if n == 0:
        return xs.copy()
    e = np.empty_like(xs, dtype=np.float64)
    e[:n] = 0.0
    e[n:] = xs[:-n]
    return e

@cp.fuse()
def parzen_kernel(x):
    x = np.abs(x)
    if x >= 1:
        return 0
    elif x >= 0.5:
        return 2 * (1 - x)**3
    else:
        return 1 - 6 * x**2 * (1 - x)
    
@cp.fuse()
def data_realized_kernel_numpy(time_series, H, time_window, progress_hook):
    autocorrelations = np.zeros(time_series.shape[0], dtype=np.float64)
    
    ts = time_series[:, 0]
    prices = time_series[:, 1]
    
    lag_prices_prod = [np.cumsum(prices * shift(prices, lag)) for lag in range(H + 1)]
    kernel_values = [parzen_kernel(k / H) for k in range(1, H + 1)]

    start_index = 0
    delta_ms = time_window * 10**6

    for i, v in enumerate(ts):
        while (v - ts[start_index]) > delta_ms:
            start_index += 1
        
        if start_index == 0:
            autocorrelations[i] = 0
        else:
            kernel_range = min(i + 1 - start_index, H)
            res = lag_prices_prod[0][i] - lag_prices_prod[0][start_index - 1]
            for j in range(1, kernel_range + 1):
                res += 2 * kernel_values[j - 1] * (lag_prices_prod[j][i] - lag_prices_prod[j][start_index + j - 1])
            autocorrelations[i] = res
        progress_hook.update(1)
    
    return autocorrelations

In [65]:
def shift_cupy(xs, n):
    if n == 0:
        return xs.copy()
    e = cp.empty_like(xs, dtype=cp.float64)
    e[:n] = 0.0
    e[n:] = xs[:-n]
    return e

def parzen_kernel_cupy(x):
    x = cp.abs(x)
    if x >= 1:
        return 0
    elif x >= 0.5:
        return 2 * (1 - x)**3
    else:
        return 1 - 6 * x**2 * (1 - x)
    
def data_realized_kernel_cupy(time_series, H, time_window, progress_hook):
    autocorrelations = cp.zeros(time_series.shape[0], dtype=cp.float64)
    
    ts = time_series[:, 0]
    prices = time_series[:, 1]
    
    lag_prices_prod = [cp.cumsum(prices * shift_cupy(prices, lag)) for lag in range(H + 1)]
    kernel_values = [parzen_kernel_cupy(k / H) for k in range(1, H + 1)]

    start_index = 0
    delta_ms = time_window * 10**6

    for i, v in enumerate(ts):
        while (v - ts[start_index]) > delta_ms:
            start_index += 1
        
        if start_index == 0:
            autocorrelations[i] = 0
        else:
            kernel_range = min(i + 1 - start_index, H)
            res = lag_prices_prod[0][i] - lag_prices_prod[0][start_index - 1]
            for j in range(1, kernel_range + 1):
                res += 2 * kernel_values[j - 1] * (lag_prices_prod[j][i] - lag_prices_prod[j][start_index + j - 1])
            autocorrelations[i] = res
        
        progress_hook.update(1)
    
    return cp.asnumpy(autocorrelations)

In [None]:
H = 3
realized_kernel_per_H = []
# realized_kernel_spot_per_H = []

with ProgressBar(total=len(trades_log_returns)) as numba_progress:
    realized_kernel_per_H = data_realized_kernel(
        trades_log_returns, H, realized_kernel_time_window, numba_progress)

In [None]:
trades = trades.with_columns(pl.Series(
    realized_kernel_per_H).alias(f"real_kernel_H{H}"))

In [58]:
H = 3
realized_kernel_per_H_2h = []

with ProgressBar(total=len(trades_log_returns_2h)) as numba_progress:
    realized_kernel_per_H_2h = data_realized_kernel(
        trades_log_returns_2h, H, realized_kernel_time_window, numba_progress)

  0%|          | 0/319851 [00:00<?, ?it/s]

In [63]:
H = 3
realized_kernel_per_H_2h = data_realized_kernel_numpy(
    trades_log_returns_2h, H, realized_kernel_time_window, ProgressBar(total=len(trades_log_returns_2h)))

  0%|          | 0/319851 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [66]:
H = 3
realized_kernel_per_H_2h = data_realized_kernel_cupy(
    trades_log_returns_2h, H, realized_kernel_time_window, ProgressBar(total=len(trades_log_returns_2h)))

  0%|          | 0/319851 [00:00<?, ?it/s]

TypeError: Argument 'a' has incorrect type (expected cupy._core.core._ndarray_base, got numpy.ndarray)