## Order book data

Recall that realtime is sampled as a full snapshot every 5 minutes.

In [1]:
from aspidoceleon.bigquery import BigQuery
from aspidoceleon.dataframe import convert_timestamp_columns
from charadrius.const import Const
from loguru import logger
import matplotlib.pyplot as plt
import pandas as pd
import datetime
import re
import os
import glob
constants = Const()
bigquery = BigQuery()

# workdir
workdir = f"{constants.PROJECT_WORKDIR}/incremental_book_l2"
os.makedirs(workdir, exist_ok=True)
logger.info("workdir")

# This week, Sunday to today inclusive
TODAY = datetime.datetime.now().strftime(constants.DATE_FMT_NODASH)
WEEK_START = pd.to_datetime(datetime.datetime.now()).to_period("W-SAT").start_time.strftime(constants.DATE_FMT_NODASH)
logger.info(f"Date range: {WEEK_START}, {TODAY}")

[32m2024-07-06 16:58:18.105[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m17[0m - [1mworkdir[0m
[32m2024-07-06 16:58:18.108[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m22[0m - [1mDate range: 20240630, 20240706[0m


## Read in the data

In [2]:
"""
Read in incremental_book_l2 RT data
"""
fp_rt = f"{workdir}/incremental_book_l2_{WEEK_START}_{TODAY}.parquet"
# could skip caching by removing file
sql_rt = f"""
select
    *
from `fastitocalon.coinbase.rt_coinbase_incremental_book_l2_*`
where _table_suffix between '{WEEK_START}' and '{TODAY}'
"""
df_rt = bigquery.query_cache(sql=sql_rt, fp=fp_rt)
df_rt = convert_timestamp_columns(df_rt)
logger.info(f"fetched shape {df_rt.shape}")

[32m2024-07-06 16:58:48.022[0m | [1mINFO    [0m | [36maspidoceleon.bigquery[0m:[36mquery_cache[0m:[36m131[0m - [1mfetching -> /mnt/vol1/charadrius/incremental_book_l2/incremental_book_l2_20240630_20240706.parquet[0m
[32m2024-07-06 16:58:49.005[0m | [1mINFO    [0m | [36maspidoceleon.bigquery[0m:[36mestimate_query_cost[0m:[36m89[0m - [1mEstimated query cost: $0.02 (5.5 GB)[0m


In [None]:
"""
quick summary
"""
print(df_rt.shape)
print(df_rt['local_timestamp'].agg(['min','max']))
print(df_rt.dtypes)
df_rt.head()

## Read in batch data

In [None]:
"""
Read in batch data for given time period (all currencies)

Convert to same format as live data:
> remove exchange (redundant)
> convert symbols: lower, remove dash

and noting that the batch files are not snapshots, but the realtime data is (stream is too fast)
"""
df_batch = pd.concat([
    pd.read_parquet(f) for f in glob.glob("/mnt/vol1/coinbase_incremental_book_l2_*")
    if (
        (pd.to_datetime(re.findall(r"2[0-9]+", f)[0]) >= pd.to_datetime(WEEK_START)) &
        (pd.to_datetime(re.findall(r"2[0-9]+", f)[0]) <= pd.to_datetime(TODAY))
    )
]).drop(columns=['exchange','date']) # redundant
for timestamp_col in ['timestamp','local_timestamp']:
    df_batch[timestamp_col] = df_batch[timestamp_col].astype("datetime64[ns, UTC]")
df_batch['id'] = df_batch['id'].astype("Int64")
df_batch['is_snapshot'] = False # no snapshots, actually all real data
df_batch['is_snapshot'] = df_batch['is_snapshot'].astype("boolean")
df_batch['symbol'] = df_batch['symbol'].apply(lambda x: x.replace("-","").lower())
df_batch = df_batch[df_rt.columns]

In [None]:
"""
quick summary
"""
print(df_batch.shape)
print(df_batch['local_timestamp'].agg(['min','max']))
print(df_batch.dtypes)
df_batch.head()

### Compare indices

In [None]:
assert (df_rt.columns == df_batch.columns).all()
assert (df_rt.dtypes == df_batch.dtypes).all()