# **DATA3888 Project: Optiver**

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import dask.dataframe as dd
import polars as pl
from glob import glob

In [2]:
# Collect every per-stock order-book CSV; sorting makes the file order stable.
csv_files = sorted(glob("Data/individual_book_train/*.csv"))

# Lazily scan all files as one frame. Dtypes for the known columns are pinned
# explicitly (integers for ids / sizes / seconds, floats for prices) and
# `infer_schema_length=0` means no rows are sampled for schema inference.
ldf = pl.scan_csv(
    csv_files,
    schema_overrides={
        **dict.fromkeys(
            [
                "time_id",
                "seconds_in_bucket",
                "bid_size1",
                "ask_size1",
                "bid_size2",
                "ask_size2",
                "stock_id",
            ],
            pl.Int64,
        ),
        **dict.fromkeys(
            ["bid_price1", "ask_price1", "bid_price2", "ask_price2"],
            pl.Float64,
        ),
    },
    infer_schema_length=0,
)

# Materialise the lazy query, then persist the combined table as parquet.
df = ldf.collect()
df.write_parquet("Data/Combined_book_train.parquet", compression="snappy")

In [2]:
# Reload the combined order-book table written by the previous cell,
# delegating the read to the PyArrow parquet reader.
df = pl.read_parquet("Data/Combined_book_train.parquet", use_pyarrow=True)

In [4]:
import polars as pl
import numpy as np

# ──────────────────────────────────────────────────────────────────────────────
# 1. Feature builder ─ all vectorised Polars expressions
# ──────────────────────────────────────────────────────────────────────────────
class MakeFeaturesPolars:
    """Derive order-book microstructure and rolling-volatility features.

    The transformer expects the level-1/level-2 book columns
    (`bid_price1`, `ask_price1`, `bid_size1`, `bid_size2`, `ask_size1`,
    `ask_size2`) plus `time_id` and `seconds_in_bucket`, and appends, in
    this order: `mid_price`, `spread`, `rel_spread`, `imbalance`,
    `book_pressure`, `microprice`, `normalized_spread`, `OBI_L2`,
    `LOB_entropy`, `LOB_entropy_normalized`, `log_return`,
    `realized_volatility`, `bipower_var`, `rolling_integrated_variance`.

    Parameters
    ----------
    window:
        Length (in rows) of the rolling window used for the volatility
        measures.
    """

    def __init__(self, window: int = 30) -> None:
        self.window = window            # rolling window length

    # sklearn compatibility
    def fit(self, df: pl.DataFrame, y=None):
        """No-op fit; returns ``self`` so the object can sit in a pipeline."""
        return self

    @staticmethod
    def _entropy_expr(sizes) -> pl.Expr:
        """Return the row-wise Shannon entropy (natural log) of the size shares.

        Each size column is divided by the horizontal total to obtain a share
        p_i; terms whose share is not positive contribute 0, and the result is
        -sum(p_i * log(p_i)).
        """
        total = pl.sum_horizontal(sizes)
        terms = []
        for name in sizes:
            share = pl.col(name) / total
            terms.append(
                pl.when(share > 0)
                  .then(share * share.log())
                  .otherwise(0.0)
            )
        return -pl.sum_horizontal(terms)

    def transform(self, df: pl.DataFrame) -> pl.DataFrame:
        """Return `df` sorted by time with all feature columns appended.

        Rows that contain a null in any column are removed at the end; this
        includes the leading rows of each `time_id` for which the log return
        (and therefore the rolling measures) is undefined.

        NOTE(review): every windowed expression is keyed by `time_id` only,
        so this presumably expects a single stock per call - confirm with
        the caller.
        """
        w     = self.window
        sizes = ["bid_size1", "bid_size2", "ask_size1", "ask_size2"]

        bid_px, ask_px = pl.col("bid_price1"), pl.col("ask_price1")
        bid_sz, ask_sz = pl.col("bid_size1"), pl.col("ask_size1")
        bid_depth = pl.col("bid_size1") + pl.col("bid_size2")
        ask_depth = pl.col("ask_size1") + pl.col("ask_size2")
        depth     = pl.sum_horizontal(sizes)

        # Built once and aliased twice: expressions inside a single
        # `with_columns` call are evaluated against the *input* frame, so
        # `normalized_spread` cannot refer to the `rel_spread` column that is
        # being created in that same call.
        rel_spread = pl.col("spread") / pl.col("mid_price")

        entropy = self._entropy_expr(sizes)

        log_ret = pl.col("log_return")
        abs_ret = log_ret.abs()
        # Shared by realized_volatility (its square root) and
        # rolling_integrated_variance.
        # NOTE(review): newer Polars releases rename `min_periods` to
        # `min_samples` - confirm against the pinned Polars version.
        sq_ret_sum = log_ret.pow(2).rolling_sum(w, min_periods=1)

        out = (
            df.sort(["time_id", "seconds_in_bucket"])      # guarantee order
            # ───────────────────── basic microstructure columns ───────────────────
              .with_columns([
                  ((bid_px + ask_px) / 2).alias("mid_price"),
                  (ask_px - bid_px).alias("spread"),
              ])
              .with_columns([
                  rel_spread.alias("rel_spread"),
                  ((bid_sz - ask_sz) / (bid_sz + ask_sz)).alias("imbalance"),
                  ((bid_depth - ask_depth) / depth).alias("book_pressure"),
                  ((ask_px * bid_sz + bid_px * ask_sz) /
                   (bid_sz + ask_sz)).alias("microprice"),
                  # keep if you still need it; identical to rel_spread
                  rel_spread.alias("normalized_spread"),
                  (bid_depth / depth).alias("OBI_L2"),
              ])
            # ───────────────────── LOB entropy over the four size columns ─────────
              .with_columns([
                  entropy.alias("LOB_entropy"),
                  # divide by the maximum entropy log(#levels) to land in [0, 1]
                  (entropy / np.log(len(sizes))).alias("LOB_entropy_normalized"),
              ])
            # ───────────────────── returns & realised-vol measures ───────────────
              .with_columns([
                  log_ret_src.log().diff().over("time_id").alias("log_return")
                  for log_ret_src in [pl.col("mid_price")]
              ])
              .with_columns([
                  sq_ret_sum.sqrt().over("time_id").alias("realized_volatility"),
                  (abs_ret * abs_ret.shift(1))
                      .rolling_mean(w, min_periods=1)
                      .over("time_id")
                      .alias("bipower_var"),
                  sq_ret_sum.over("time_id").alias("rolling_integrated_variance"),
              ])
              .drop_nulls()                             # tidy up
        )
        return out


# ──────────────────────────────────────────────────────────────────────────────
# 2. Re-index & forward-fill each 10-minute window to 0…n-1 seconds
# ──────────────────────────────────────────────────────────────────────────────
class ReindexFillPolars:
    """Expand every `time_id` onto a dense 0…n_seconds-1 grid and forward-fill.

    Parameters
    ----------
    n_seconds:
        Number of `seconds_in_bucket` slots generated for each `time_id`.
    """

    def __init__(self, n_seconds: int = 600) -> None:
        self.n_seconds = n_seconds

    def fit(self, df: pl.DataFrame, y=None):
        """No-op fit; returns ``self`` for sklearn-style chaining."""
        return self

    def transform(self, df: pl.DataFrame) -> pl.DataFrame:
        """Return `df` re-indexed to one row per (time_id, second).

        Seconds with no observation take the most recent earlier values from
        the same `time_id`; slots before the first observation of a window
        have nothing to fill from and stay null.
        """
        keys = ["time_id", "seconds_in_bucket"]

        # One row per second in the bucket.
        second_axis = pl.DataFrame({"seconds_in_bucket": range(self.n_seconds)})
        # Every distinct time_id paired with every second (Cartesian product).
        grid = df.select("time_id").unique().join(second_axis, how="cross")
        # Attach the observed rows to the grid and put it in time order.
        aligned = grid.join(df, on=keys, how="left").sort(keys)

        # Forward-fill each column independently inside its own time_id window.
        carry_forward = pl.all().fill_null(strategy="forward").over("time_id")
        return aligned.with_columns(carry_forward)


# ──────────────────────────────────────────────────────────────────────────────
# 3. MapTimeID – optional stock_id reconciliation + dense re-indexing
# ──────────────────────────────────────────────────────────────────────────────
class MapTimeIDPolars:
    """Drop incomplete rows (when `stock_id` is present) and densify `time_id`.

    After `transform`, `time_id` holds consecutive Int64 labels 1, 2, …
    assigned in ascending order of the original ids, and `stock_id` (if it
    was present) has been removed.
    """

    def fit(self, df: pl.DataFrame, y=None):
        """No-op fit; returns ``self`` for sklearn-style chaining."""
        return self

    def transform(self, df: pl.DataFrame) -> pl.DataFrame:
        """Return a new frame with cleaned rows and re-labelled `time_id`.

        When a `stock_id` column exists, rows are removed if their `time_id`
        has no non-null `stock_id` anywhere in the frame or if any column of
        the row is null; `stock_id` itself is then dropped. Without a
        `stock_id` column no rows are removed.
        """
        # Polars operations return new frames, so no defensive copy is needed.
        out = df

        # 3 a. fill missing stock_id (if present) via one-to-one mapping
        if "stock_id" in out.columns:
            # One (time_id, stock_id) pair per time_id that has a known stock.
            stock_map = (
                out.drop_nulls(subset=["stock_id"])
                   .unique(subset=["time_id"])
                   .select(["time_id", "stock_id"])
            )
            out = (
                out.drop("stock_id")
                   .join(stock_map, on="time_id", how="left")
                   .drop_nulls()                           # any null column drops the row
                   .drop("stock_id")                       # mirror original logic
            )

        # 3 b. compress time_id to {1,2,…}
        # A dense rank gives the same labels as enumerating the sorted unique
        # ids from 1, without a Python-side dict and without `Expr.map_dict`,
        # which current Polars releases no longer provide. The rank comes back
        # as an unsigned index type, so cast to keep the column Int64.
        return out.with_columns(
            pl.col("time_id").rank(method="dense").cast(pl.Int64)
        )


# ──────────────────────────────────────────────────────────────────────────────
# 4. Convenience runner (same order as your original Pipeline)
# ──────────────────────────────────────────────────────────────────────────────
def polars_pipeline(df: pl.DataFrame,
                    n_seconds: int = 600,
                    window: int = 30) -> pl.DataFrame:
    """Run feature building, grid re-indexing and time_id remapping in order.

    Parameters
    ----------
    df:
        Raw order-book frame handed to the first stage.
    n_seconds:
        Grid length forwarded to `ReindexFillPolars`.
    window:
        Rolling window forwarded to `MakeFeaturesPolars`.

    Returns
    -------
    The frame produced by the last stage.
    """
    stages = (
        MakeFeaturesPolars(window),
        ReindexFillPolars(n_seconds),
        MapTimeIDPolars(),
    )
    # Each stage consumes the output of the one before it.
    for stage in stages:
        df = stage.transform(df)
    return df