In [None]:
from core.paths import HIVE_TRADES
from core.currency import CurrencyPair
from core.columns import SYMBOL, TRADE_TIME
from core.time_utils import Bounds
from datetime import datetime

import polars as pl

<h4>In this notebook we refine our features and check that each one is calculated correctly</h4>

In [None]:
# Lazy scan of the hive-partitioned trades dataset; nothing is read until `.collect()`.
hive = pl.scan_parquet(HIVE_TRADES, hive_partitioning=True)

In [None]:
currency_pair: CurrencyPair = CurrencyPair.from_string("ADA-USDT")

# Analysis window (midnight to midnight).
# NOTE(review): the features below are suffixed "30d" but this spans 2025-01-01 -> 2025-01-30;
# confirm whether Bounds treats the second datetime as exclusive.
bounds: Bounds = Bounds(datetime(2025, 1, 1), datetime(2025, 1, 30))

str(bounds)

In [None]:
# Load the ticks of `currency_pair` inside `bounds`.
# - The hive partition column "date" limits which partition folders are read. Its bounds stay
#   inclusive on both ends, so the partition filter is a superset of the time window.
# - TRADE_TIME then trims rows inside each parquet file. closed="left" makes the window
#   [start_inclusive, end_exclusive) as the Bounds names say. polars' is_between defaults to
#   closed="both", which would also keep ticks stamped exactly at end_exclusive.
df_ticks: pl.DataFrame = hive.filter(
    (pl.col(SYMBOL) == currency_pair.name)
    & pl.col("date").is_between(bounds.day0, bounds.day1)
    & pl.col(TRADE_TIME).is_between(bounds.start_inclusive, bounds.end_exclusive, closed="left")
).collect()

In [None]:
# Sanity check: the loaded trade_time range should fall inside `bounds`.
df_ticks.select(
    min_time=pl.col("trade_time").min(),
    max_time=pl.col("trade_time").max(),
)

In [None]:
# Chronological order. maintain_order=True makes the sort stable: several ticks can share one
# trade_time, and the trade aggregation below takes price.first()/price.last() within each
# timestamp. An unstable sort (polars' default) could shuffle those ticks and make
# price_first/price_last (and hence the slippage) arbitrary.
df_ticks = df_ticks.sort("trade_time", descending=False, maintain_order=True)
df_ticks.tail()

<h4>Compute tick-level features</h4>

In [None]:
# Tick-level features:
#   quote_abs     : price * quantity, the unsigned quote-asset amount of the tick
#   side_sign     : +1 for a taker BUY, -1 for a taker SELL.
#                   is_buyer_maker=True  -> the buyer's resting bid was hit    -> SELL
#                   is_buyer_maker=False -> the seller's resting ask was lifted -> BUY
#   quantity_sign : quantity signed by side_sign
#   quote_sign    : quote_abs signed by side_sign
df_ticks = (
    df_ticks
    .with_columns(
        quote_abs=pl.col("price") * pl.col("quantity"),
        side_sign=1 - 2 * pl.col("is_buyer_maker"),
    )
    .with_columns(
        quantity_sign=pl.col("side_sign") * pl.col("quantity"),
        quote_sign=pl.col("side_sign") * pl.col("quote_abs"),
    )
)

df_ticks.head()

<h4>Group ticks into trades by trade_time</h4>

In [None]:
# All ticks that share a trade_time are merged into one trade. With maintain_order=True the
# groups keep the order in which they first appear in df_ticks.
df_trades: pl.DataFrame = (
    df_ticks
    .group_by("trade_time", maintain_order=True)
    .agg(
        # First and last tick price; they differ when the trade had price impact.
        price_first=pl.col("price").first(),
        price_last=pl.col("price").last(),
        # Quote-asset amount of the trade: unsigned and direction-signed.
        quote_abs=pl.col("quote_abs").sum(),
        quote_sign=pl.col("quote_sign").sum(),
        # Base-asset amount of the trade: unsigned and direction-signed.
        quantity_abs=pl.col("quantity").sum(),
        quantity_sign=pl.col("quantity_sign").sum(),
        # Number of ticks in the trade.
        num_ticks=pl.col("price").count(),
    )
    # Long when the net signed base quantity is non-negative (a net of exactly 0 counts as long).
    .with_columns(is_long=pl.col("quantity_sign") >= 0)
)

df_trades.head()

In [None]:
# The hourly group_by_dynamic below requires trade_time sorted ascending, so enforce the check
# instead of just displaying it.
assert df_trades["trade_time"].is_sorted(), "df_trades must be sorted by trade_time"

<h4>Compute slippages</h4>

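<p>Compute slippage as the difference between the quote amount actually spent and the amount that would have been
    spent had the whole quantity been executed at price_first. quote_slippage_abs holds the absolute difference;
    quote_slippage_sign attaches the sign of the trade's net base quantity.</p>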

In [None]:
# quote_slippage_abs : |quote actually paid - price_first * quantity_abs|
# quote_slippage_sign: that slippage signed by the direction of the net base quantity
#                      (sign() is 0 when quantity_sign is 0)
df_trades = (
    df_trades
    .with_columns(
        quote_slippage_abs=(
            pl.col("quote_abs") - pl.col("price_first") * pl.col("quantity_abs")
        ).abs()
    )
    .with_columns(
        quote_slippage_sign=pl.col("quote_slippage_abs") * pl.col("quantity_sign").sign()
    )
)

In [None]:
# Preview the first trades together with their slippage columns.
df_trades.head(5)

<h4>Log return features</h4>

In [None]:
# Hourly candles (left-closed, left-labelled) used to measure the dynamics of hourly log returns,
# volume and slippage over several look-back windows.
# NOTE(review): each candle's return runs from the hour's first trade to its last trade, so the
# move between consecutive candles is not captured. Confirm this is intended.
df_hourly: pl.DataFrame = (
    df_trades
    .group_by_dynamic(
        index_column="trade_time",
        every="1h",
        period="1h",
        closed="left",
        label="left",
    )
    .agg(
        # Natural log return ln(p_last / p_first). The simple return (p_last - p_first) / p_first
        # would not match the column name.
        log_return=(pl.col("price_last").last() / pl.col("price_first").first()).log(),
        quote_volume=pl.col("quote_abs").sum(),
        # Quote volume of the trades flagged is_long within the hour.
        long_quote_volume=pl.col("quote_abs").filter(pl.col("is_long")).sum(),
        quote_slippage_abs=pl.col("quote_slippage_abs").sum(),
    )
)

df_hourly.tail()

In [None]:
from core.time_utils import TimeOffset
from datetime import timedelta
from tqdm import tqdm
from typing import Dict, List, Optional

# Look-back windows, in hours ending at bounds.end_exclusive. Hourly candles inside each window
# are summarised and compared with statistics over the whole loaded sample (suffix "30d").
hour_offsets: List[int] = [1, 2, 4, 8, 12, 24, 24*3, 24*7, 24*14]
hourly_features: Dict[str, Optional[float]] = {}


def _scaled_deviation(
    value: Optional[float], baseline_mean: Optional[float], baseline_std: Optional[float]
) -> Optional[float]:
    """Return (value - baseline_mean) / baseline_std.

    Returns None instead of raising when value or baseline_mean is missing (e.g. an empty
    window, or the std of a single candle) or when baseline_std is missing or zero.
    """
    if value is None or baseline_mean is None or not baseline_std:
        return None
    return (value - baseline_mean) / baseline_std


# Whole-sample ("long run") statistics of the hourly candles.
log_return_std_30d: Optional[float] = df_hourly["log_return"].std()

quote_volume_mean_30d: Optional[float] = df_hourly["quote_volume"].mean()
quote_volume_std_30d: Optional[float] = df_hourly["quote_volume"].std()

quote_long_volume_mean_30d: Optional[float] = df_hourly["long_quote_volume"].mean()
quote_long_volume_std_30d: Optional[float] = df_hourly["long_quote_volume"].std()

# A z-score needs the mean hourly slippage, not its total over the sample.
quote_slippage_abs_mean_30d: Optional[float] = df_hourly["quote_slippage_abs"].mean()
quote_slippage_abs_std_30d: Optional[float] = df_hourly["quote_slippage_abs"].std()

for offset in tqdm(hour_offsets):
    # Candles whose left label lies in [end_exclusive - offset hours, end_exclusive).
    df_interval: pl.DataFrame = df_hourly.filter(
        pl.col("trade_time").is_between(
            bounds.end_exclusive - timedelta(hours=offset), bounds.end_exclusive, closed="left"
        )
    )
    # Mean hourly log return in the window, relative to the long-run std of hourly log returns.
    # NOTE(review): the std of a mean of n candles is ~std/sqrt(n); consider scaling by sqrt(offset).
    hourly_features[f"hourly_log_return_zscore{offset}h_30d"] = _scaled_deviation(
        df_interval["log_return"].mean(), 0.0, log_return_std_30d
    )
    # Mean hourly quote volume in the window, standardised by the long-run mean and std.
    hourly_features[f"hourly_quote_volume_zscore{offset}h_30d"] = _scaled_deviation(
        df_interval["quote_volume"].mean(), quote_volume_mean_30d, quote_volume_std_30d
    )
    # Same standardisation for the long-side quote volume.
    hourly_features[f"hourly_long_quote_volume_zscore{offset}h_30d"] = _scaled_deviation(
        df_interval["long_quote_volume"].mean(), quote_long_volume_mean_30d, quote_long_volume_std_30d
    )
    # Std of hourly log returns in the window, relative to the long-run std. Only computed for
    # windows of 4h or more, presumably because the std of 1-2 candles is not meaningful.
    if offset >= 4:
        hourly_features[f"hourly_log_return_std{offset}h_30d"] = _scaled_deviation(
            df_interval["log_return"].std(), 0.0, log_return_std_30d
        )

    # Mean hourly absolute slippage, standardised by its long-run mean and std
    # (a proxy for how liquid the market is).
    hourly_features[f"hourly_quote_slippage_abs_zscore{offset}h_30d"] = _scaled_deviation(
        df_interval["quote_slippage_abs"].mean(), quote_slippage_abs_mean_30d, quote_slippage_abs_std_30d
    )

In [None]:
# Display the computed feature dict (feature name -> value) for inspection.
hourly_features