Below, I install my own implementation of Professor Boonstra's "memoize DataFrame to disk" feature. The source code can be found at [github.com/ethho/memoize](https://github.com/ethho/memoize).
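
For readers unfamiliar with the idea, here is a minimal sketch of what memoizing a DataFrame to disk can look like. It is not the code in the `memoize` package: the decorator name `memoize_to_csv`, the file-naming scheme, and the toy function `slow_square` are all assumptions made for illustration.

```python
import functools
import hashlib
import os
import tempfile

import pandas as pd


def memoize_to_csv(cache_dir):
    """Hypothetical decorator: cache a DataFrame-returning function as CSV files."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            os.makedirs(cache_dir, exist_ok=True)
            # Key the cache file on the function name and a hash of its arguments.
            key = repr((args, sorted(kwargs.items())))
            digest = hashlib.sha1(key.encode()).hexdigest()[:7]
            fp = os.path.join(cache_dir, f"{func.__name__}_{digest}.csv")
            if os.path.exists(fp):
                return pd.read_csv(fp)  # cache hit: skip the expensive call
            df = func(*args, **kwargs)
            df.to_csv(fp, index=False)  # cache miss: compute, then persist
            return df
        return wrapper
    return decorator


calls = []  # records each time the function body actually runs


@memoize_to_csv(cache_dir=tempfile.mkdtemp())
def slow_square(n):
    calls.append(n)
    return pd.DataFrame({"x": range(n), "x2": [i * i for i in range(n)]})


first = slow_square(3)
second = slow_square(3)  # served from the CSV written by the first call
```

The second call reads the CSV instead of re-running the function body, which is the behavior I rely on below to avoid repeated API requests and repeated parsing of large trade files.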

In [1]:
!python3 -m pip install git+https://github.com/ethho/memoize.git

Collecting git+https://github.com/ethho/memoize.git
  Cloning https://github.com/ethho/memoize.git to /tmp/pip-req-build-zqs6fve2
  Running command git clone --filter=blob:none --quiet https://github.com/ethho/memoize.git /tmp/pip-req-build-zqs6fve2
  Resolved https://github.com/ethho/memoize.git to commit bef633bd22e4acde44cccb63399a176c6cef79b9
  Installing build dependencies ... done
  Getting requirements to build wheel ... done
  Preparing metadata (pyproject.toml) ... done

In [2]:
import json
import re
import os
from glob import glob
from dataclasses import dataclass
from typing import List, Dict, Tuple, Optional
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from scipy.stats import norm, probplot
import quandl
import functools
import plotly.express as px
import plotly.graph_objects as go
from joblib import Parallel, delayed
import multiprocessing
from multiprocessing import Pool
from src.ubacktester import (
    BacktestEngine, StrategyBase, PositionBase, FeedBase,
    PlotlyPlotter, FeedID, PriceFeed, px_plot, ClockBase
)
from memoize.dataframe import memoize_df

%matplotlib inline
pd.options.display.float_format = '{:,.4f}'.format

# 20230202_hw4_ho_ethan_12350006

@mpcs
@finm33550

Ethan Ho 2/2/2023

----

## Configuration & Helper Functions

The following cell contains helper functions and configuration options that I will use in this notebook.

In [3]:
def get_secrets(fp='./secrets.json'):
    """
    Reads secret values such as API keys from a JSON-formatted file at `fp`.
    """
    with open(fp, 'r') as f:
        data = json.load(f)
    return data

def get_quandl_api_key() -> str:
    """
    Returns Quandl API key stored in secrets.json.
    """
    secrets = get_secrets()
    key = secrets.get('NASTAQ_DATA_API_KEY')
    assert key, f"NASTAQ_DATA_API_KEY field in secrets.json is empty or does not exist"
    return key

def strip_str_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    """
    Given a DataFrame, strips values in columns with string or object
    dtype. I noticed that this was an issue when I saw some m_ticker values
    like "AAPL       " with trailing whitespace.
    """
    for col in df.columns:
        if pd.api.types.is_string_dtype(df[col]) or pd.api.types.is_object_dtype(df[col]):
            df[col] = df[col].str.strip()
    return df

@memoize_df(cache_dir='/tmp/memoize')
def fetch_quandl_quotemedia_prices(
    start_date, end_date, ticker
) -> pd.DataFrame:
    df = quandl.get_table(
        'QUOTEMEDIA/PRICES',
        date={'gte': start_date, 'lte': end_date},
        ticker=ticker,
        api_key=get_quandl_api_key(),
        paginate=True,
    )
    df['date'] = pd.to_datetime(df['date'])
    df.sort_values(by='date', inplace=True)
    return df

@memoize_df(cache_dir='/tmp/memoize')
def fetch_quandl_tbill_prices(
    start_date, end_date,
) -> pd.DataFrame:
    """Fetch table of treasury bill prices from Quandl."""
    df = quandl.get(
        ['USTREASURY/BILLRATES'],
        returns="pandas",
        start_date=start_date,
        end_date=end_date,
        api_key=get_quandl_api_key(),
    )
    df = df.reset_index().rename(columns={'Date': 'date'})
    df['date'] = pd.to_datetime(df['date'])
    df.sort_values(by='date', inplace=True)
    return df

def unique_index_keys(df, level=0) -> List[str]:
    return df.index.get_level_values(level=level).unique().tolist()

def risk_free_rate(**kw) -> float:
    """Calculates risk-free rate R_f from the 3-month T-bill rate."""
    tbill_prices = fetch_quandl_tbill_prices(**kw)
    tbill_returns = tbill_prices['USTREASURY/BILLRATES - 13 Wk Coupon Equiv']
    return tbill_returns.mean()

# Fetch High Frequency Trading Data

I started by unzipping the data using `gzip -d *.delim.gz`.
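
As an aside, unzipping is not strictly required for reading: pandas can usually read gzip-compressed delimited files directly. The sketch below writes a tiny toy file and reads it back; the file name `toy_trades.delim.gz` and the row values are made up, and only the column names mirror the homework data.

```python
import gzip
import os
import tempfile

import pandas as pd

# Write a tiny whitespace-delimited, gzip-compressed file with toy rows.
rows = (
    "timestamp_utc_nanoseconds PriceMillionths SizeBillionths Side\n"
    "1000 100000000 2000000000 1\n"
    "2000 101000000 1000000000 -1\n"
)
path = os.path.join(tempfile.mkdtemp(), "toy_trades.delim.gz")
with gzip.open(path, "wt") as f:
    f.write(rows)

# The .gz suffix lets pandas infer gzip compression (compression="infer" is the default).
toy = pd.read_csv(path, sep=r"\s+")
```

I kept the unzipped files anyway, since `!ls` and other shell tools are easier to use on plain text.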

In [4]:
!ls data/Crypto/2021/For_Homework/*.delim

data/Crypto/2021/For_Homework/book_narrow_BTC-USD_2021.delim
data/Crypto/2021/For_Homework/book_narrow_ETH-BTC_2021.delim
data/Crypto/2021/For_Homework/book_narrow_ETH-USD_2021.delim
data/Crypto/2021/For_Homework/trades_narrow_BTC-USD_2021.delim
data/Crypto/2021/For_Homework/trades_narrow_ETH-BTC_2021.delim
data/Crypto/2021/For_Homework/trades_narrow_ETH-USD_2021.delim


In [5]:
!ls data/Crypto/2022/*.delim

data/Crypto/2022/trades_narrow_BTC-USD_2022.delim
data/Crypto/2022/trades_narrow_ETH-BTC_2022.delim
data/Crypto/2022/trades_narrow_ETH-USD_2022.delim


I compiled almost all of the accumulation simulation logic into the class below. Since our simulation is fairly simple, I did not use my backtesting engine `ubacktester` to a significant degree, though I did use its plotting methods.

In [18]:
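from math import ceil  # needed by roundup below

def downsample_to_pow(val: int, pow10: int = 6) -> int:
    """
    Rounds the integer timestamp `val` up to a multiple of 10**pow10,
    looking only at the digit just below that power.
    """
    n = pow10 + 1
    hi, lo = str(val)[:-n], str(val)[-n:]
    # NOTE: if lo[:2] is 91..99, roundup gives 100 and only its leading '1'
    # is kept, so no carry reaches `hi`.
    roundup = lambda x: int(ceil(x / 10.0)) * 10
    suffix = str(roundup(int(lo[:2])))[0] + (pow10 * '0')
    final = int(hi + suffix)
    assert len(str(final)) == len(str(val))
    return final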

class InsufficientRowsError(Exception):
    pass

class AccumulateRunner(dict):

    def __init__(
        self, side: int = 1,
        downsample_rate: int = 6, # 1e6 ns, or 1 ms
    ):
        assert side in (1, -1)
        self.side = side
        self.downsample_rate = downsample_rate

    # @profiler()
    def mark_qualified_trades(self, df: pd.DataFrame) -> pd.DataFrame:
        df['dt_ds'] = (
            pd.Series(df.index, dtype=np.int64)
            .apply(downsample_to_pow, args=[self.downsample_rate])
            .values
        )
        grp = df.groupby('dt_ds', group_keys=False).apply(self._mark_qualified)
        grp.index.name = 'dt'
        # breakpoint()
        return grp

    def _mark_qualified(self, df):
        if len(df) == 1:
            df['is_qual'] = 1
            return df
        if self.side > 0:
            qual_price = df['PriceMillionths'].max()
        else:
            qual_price = df['PriceMillionths'].min()
        qualified_mask = df['PriceMillionths'] == qual_price
        df['is_qual'] = qualified_mask.astype(int)
        return df

    @memoize_df(cache_dir='data/memoize', cache_lifetime_days=None)
    def get_trades_data(
        self, fp, downsample_rate, side, start_date_ns,
        row_limit,
    ):
        df = pd.read_csv(fp, delim_whitespace=True)
        df.rename(columns={
            'timestamp_utc_nanoseconds': 'dt',
        }, inplace=True)
        df.sort_values(by='dt', inplace=True)
        df['Side'] = df['Side'].astype(int)
        print(
            f"Dates in trades data {fp=} range between "
            f"{df['dt'].min()} ({pd.to_datetime(df['dt'].min())}) and "
            f"{df['dt'].max()} ({pd.to_datetime(df['dt'].max())})"
        )
        df.drop(columns=['received_utc_nanoseconds'], inplace=True)
        # assert not (df['Side'] == 0).any()
        df = df[df['Side'] / side > 0]
        df.set_index('dt', inplace=True)
        df = df.loc[start_date_ns:].iloc[:int(row_limit)]
        df = self.mark_qualified_trades(df)
        df = df.convert_dtypes()
        # breakpoint()
        return df

    # @profiler()
    @memoize_df(cache_dir='data/memoize', cache_lifetime_days=None)
    def run_accumulate_strat(
        self, fp,
        start_date='1970-01-01', # trim data starting at this date
        target_prt_rate=0.01, # 1% of traded volume
        target_notional=1e6, # stop trading when notional has reached this
        fee_rate=50, # basis points on notional
        row_limit=1e5, # number of trades to pull from data
    ):
        start_date_ns = pd.to_datetime(start_date, unit='ns').value
        df = self.get_trades_data(
            fp=fp, downsample_rate=self.downsample_rate, side=self.side,
            start_date_ns=start_date_ns, row_limit=row_limit,
        )
        df = df.convert_dtypes()
        if 'dt' in df.columns:
            df.set_index('dt', inplace=True)

        # Define masks for same side and qualifying trades
        same_side = df['Side'] * self.side > 0
        qual_mask = same_side & df['is_qual']

        # Calculate cumulative volume over time for each side, for all trades,
        # and for qualifying trades.
        df.loc[same_side, 'cum_volm_side'] = df.loc[same_side, 'SizeBillionths'].cumsum()
        df.loc[~same_side, 'cum_volm_side'] = df.loc[~same_side, 'SizeBillionths'].cumsum()
        df['cum_volm_all'] = df.loc[:, 'SizeBillionths'].cumsum()
        df['cum_volm_qual'] = pd.NA
        df.loc[qual_mask, 'cum_volm_qual'] = df.loc[qual_mask, 'SizeBillionths'].cumsum()
        df = df.convert_dtypes()

        # Calculate target participation for each qualifying trade (billionths).
        # In theory, the below calculation should get us the same as
        # df['cum_volm_qual'] * target_prt_rate. They're not exactly equal
        # due to the rounding we do with astype(int)
        df['target_prt'] = (same_side.astype(int) * df['is_qual'] * target_prt_rate * df['SizeBillionths'])#.astype(int)
        df['target_prt_cumsum'] = df['target_prt'].cumsum()
        # Approximately equal:
        # df['target_prt_cumsum'] = (df['cum_volm_qual'] * target_prt_rate).astype(int)

        # Calculate notional (billionths), fees (billionths), and VWAP
        df['notional'] = (df['target_prt'] * (df['PriceMillionths'] / 1e6))#.astype(int)
        # df['notional_cumsum'] = df['notional'].cumsum().astype(int)
        df['vwap_cumsum'] = df['notional'].cumsum().astype(int).div(df['target_prt'].cumsum().astype(int))
        df['fees'] = (df['notional'] * fee_rate / 1e4).astype(int)
        df['market_vwap'] = (
            (df['SizeBillionths'] * (df['PriceMillionths'] / 1e6)).cumsum() /
            (df['SizeBillionths']).cumsum())

        df['since_arrival'] = df['dt_ds'] - df['dt_ds'].iloc[0]

        # DEBUG
        # df['market_vwap_side'] = (
        #     (df.loc[same_side, 'SizeBillionths'] * (df.loc[same_side, 'PriceMillionths'] / 1e6)).cumsum() /
        #     (df.loc[same_side, 'SizeBillionths']).cumsum())
        # assert not (df['market_vwap'] - df['vwap_cumsum'] > 2.).any()
        vwap = (
            (df['SizeBillionths'] * (df['PriceMillionths'] / 1e6)).sum() /
            (df['SizeBillionths']).sum())

        traded_notional = df['notional'].sum() / 1e9
        if traded_notional < target_notional:
            # Raise error if we haven't reached target_notional
            raise InsufficientRowsError(
                f"{traded_notional=} {target_notional=}"
            )
        else:
            # Trim dataframe once we've reached target_notional
            last_trade = df[df['notional'].cumsum() / 1e9 > target_notional].iloc[0]
            last_idx = int(last_trade.name)
            df = df.loc[:last_idx]
        return df
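
The string manipulation in `downsample_to_pow` above can also be written with integer arithmetic. The following is a minimal sketch, assuming the intended rule is "truncate to one digit below the target power, then round that digit up"; the name `downsample_int` is hypothetical. Unlike the string version, it carries into the higher digits when the rounded digit overflows (for example, a timestamp ending in `9500000`). The `dt`/`dt_ds` pairs in the check are taken from the `In [36]` output.

```python
def downsample_int(val: int, pow10: int = 6) -> int:
    """Hypothetical integer-arithmetic counterpart to downsample_to_pow (pow10 >= 1)."""
    step = 10 ** pow10
    # Drop everything below one digit under the target power...
    truncated = val // (step // 10)
    # ...then round that last digit up via ceiling division by 10.
    return -(-truncated // 10) * step


# (dt, dt_ds) pairs as they appear in the notebook output.
pairs = [
    (1618091183064028000, 1618091183064000000),
    (1618091183855674000, 1618091183856000000),
    (1618091185148458000, 1618091185149000000),
    (1618091185306019000, 1618091185306000000),
]
matches = [downsample_int(dt) == dt_ds for dt, dt_ds in pairs]
```

Because it works on plain integers, this form could also be applied to a whole NumPy array at once rather than through `Series.apply`.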

In [20]:
!rm data/memoize/run_accumulate_strat_468cc0c_20230202.csv

In [21]:
%%time

runner = AccumulateRunner(side=1, downsample_rate=6)
df = runner.run_accumulate_strat(
    fp='data/Crypto/2021/For_Homework/trades_narrow_BTC-USD_2021.delim',
    start_date='1970-01-01', # trim data before this date
    target_prt_rate=0.01, # 1% of traded volume
    target_notional=1e6, # stop trading when notional has reached this
    fee_rate=50, # basis points on notional
    row_limit=1e5, # number of trades to pull from data
)

Using cache fp='data/memoize/run_accumulate_strat_468cc0c_20230202.csv' to write results of function run_accumulate_strat
Using cache fp='data/memoize/get_trades_data_885f465_20230202.csv' to write results of function get_trades_data
Using cached call from data/memoize/get_trades_data_885f465_20230202.csv
CPU times: user 1.14 s, sys: 49 ms, total: 1.19 s
Wall time: 1.18 s


In [36]:
foo = df.head(2000).tail()
foo

| dt | PriceMillionths | SizeBillionths | Side | dt_ds | is_qual | cum_volm_side | cum_volm_all | cum_volm_qual | target_prt | target_prt_cumsum | notional | vwap_cumsum | fees | market_vwap | since_arrival |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1618091183064028000 | 58946360000 | 29417260 | 1 | 1618091183064000000 | 1 | 56024695730 | 56024695730 | 29094351790.0 | 294172.6 | 290943517.9 | 17340403981.736 | 59106.8037 | 86702019 | 59115.4844 | 1049593000000 |
| 1618091183064028000 | 58933440000 | 20260760 | 1 | 1618091183064000000 | 0 | 56044956490 | 56044956490 |  | 0.0 | 290943517.9 | 0.0 | 59106.8037 | 0 | 59115.4186 | 1049593000000 |
| 1618091183855674000 | 58942030000 | 8310870 | 1 | 1618091183856000000 | 1 | 56053267360 | 56053267360 | 29102662660.0 | 83108.7 | 291026626.6 | 4898595488.661 | 59106.7566 | 24492977 | 59115.3929 | 1050385000000 |
| 1618091185148458000 | 58942030000 | 14959600 | 1 | 1618091185149000000 | 1 | 56068226960 | 56068226960 | 29117622260.0 | 149596.0 | 291176222.6 | 8817491919.88 | 59106.672 | 44087459 | 59115.3466 | 1051678000000 |
| 1618091185306019000 | 58942030000 | 152160 | 1 | 1618091185306000000 | 1 | 56068379120 | 56068379120 | 29117774420.0 | 1521.6 | 291177744.2 | 89686192.848 | 59106.671 | 448430 | 59115.3462 | 1051835000000 |


In [37]:
809340 * 0.01

8093.400000000001

In [38]:
8093.40 * 59089900000 / 1e6

478238196.66

In [39]:
foo['notional'].cumsum().div(foo['target_prt'].cumsum())

dt
1618091183064028000   58,946.3600
1618091183064028000   58,946.3600
1618091183855674000   58,945.4062
1618091185148458000   58,944.4476
1618091185306019000   58,944.4406
dtype: object

In [25]:
vwap = (
    (df['SizeBillionths'] * (df['PriceMillionths'] / 1e6)).sum() /
    (df['SizeBillionths']).sum())
vwap

59838.52341688931

Note that I've excluded trades that are not on the same `side` as us. In this case, we specified `side = 1`, meaning that we intend to participate in buyer-initiated trades only. The DataFrame is also truncated to the window between our arrival time (`start_date`) and our last trade, i.e. the point at which our `target_notional` has been traded.

Descriptions of the columns are as follows:

- The index `dt` is simply the `timestamp_utc_nanoseconds` field.
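- The downsampled timestamp `dt_ds` is the ns timestamp downsampled to the power of 10 specified in `downsample_rate`. In the above case, `downsample_rate = 6`, so timestamps are downsampled to millisecond resolution (1e6 ns). This is meant to simulate latency in our networking systems.
- `is_qual` is a flag that is 1 for a qualified trade: among trades sharing the same `dt_ds`, the one(s) at the highest price when buying (`side = 1`) or at the lowest price when selling (`side = -1`). A trade that is alone in its `dt_ds` bucket always qualifies.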
- The `cum_volm_*` variables are the cumulative sum of trade volume on:
    - Same side as us
    - All trades, including opposite side
    - Qualified trades on the same side only
- `target_prt` is our targeted participation volume for this trade, i.e. some small percentage of the volume traded at this level.
- `target_prt_cumsum` is the cumulative sum of `target_prt` over time.
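- `notional` is `target_prt` multiplied by the trade price, and `fees` is `fee_rate` basis points of `notional`.
- `vwap_cumsum` is _our_ achieved VWAP: cumulative `notional` divided by cumulative `target_prt`.
- `market_vwap` is the cumulative VWAP over all trades in the DataFrame, for comparison with ours.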
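
The column definitions above can be traced on a toy example. This is a minimal sketch with made-up trades, assuming a buy-side run (`side = 1`) and a 1% participation rate; it uses `groupby(...).transform("max")` in place of the per-group `apply` in `_mark_qualified`, which should mark the same rows.

```python
import pandas as pd

# Toy buy-side trades: the first two share a downsampled timestamp (made-up values).
trades = pd.DataFrame({
    "dt_ds": [1000, 1000, 2000],
    "PriceMillionths": [100_000_000, 99_000_000, 101_000_000],
    "SizeBillionths": [2_000_000_000, 1_000_000_000, 1_000_000_000],
})
target_prt_rate = 0.01  # participate in 1% of qualifying volume

# A buy qualifies when it trades at the highest price in its dt_ds bucket.
bucket_max = trades.groupby("dt_ds")["PriceMillionths"].transform("max")
trades["is_qual"] = (trades["PriceMillionths"] == bucket_max).astype(int)

# Participation, notional, and running VWAP, in the same units as the notebook.
trades["target_prt"] = trades["is_qual"] * target_prt_rate * trades["SizeBillionths"]
trades["notional"] = trades["target_prt"] * trades["PriceMillionths"] / 1e6
trades["vwap_cumsum"] = trades["notional"].cumsum() / trades["target_prt"].cumsum()
```

Here the second trade does not qualify (it is below the bucket's best price), so it contributes nothing to `target_prt` or `notional`, and `vwap_cumsum` stays flat on that row.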

In [22]:
df['notional'].sum() / 1e9

1000096.6643073194
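
The total above slightly exceeds `target_notional = 1e6` because the trimming step in `run_accumulate_strat` keeps the first trade whose cumulative notional crosses the target. A minimal sketch of that trimming logic on made-up numbers (the index values and notionals are assumptions for illustration):

```python
import pandas as pd

# Toy per-trade notionals in billionths of USD, indexed by a made-up timestamp.
toy = pd.DataFrame(
    {"notional": [400e9, 500e9, 300e9, 200e9]},
    index=pd.Index([10, 20, 30, 40], name="dt"),
)
target_notional = 1e3  # USD

cum_usd = toy["notional"].cumsum() / 1e9  # running total in USD
# First trade at which the cumulative notional exceeds the target...
last_idx = cum_usd[cum_usd > target_notional].index[0]
# ...and keep everything up to and including that trade.
trimmed = toy.loc[:last_idx]
traded = trimmed["notional"].sum() / 1e9
```

In this toy case the third trade crosses the target, so three rows are kept and the traded notional overshoots the target by the remainder of that trade.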