In [1]:
import glob
import vaex
import dask

import numpy as np
import pandas as pd
import itertools
import matplotlib.pyplot as plt

from typing import List
from datetime import datetime
from src.preprocessing import symmetrize_data, get_micro_adjustment

In [2]:
@dask.delayed
def extract_features(orderbook_file_path:str) -> pd.DataFrame:
    """Build 1-second top-of-book features from one order-book CSV file.

    In this notebook the function is wrapped with ``dask.delayed``, so calling
    it there yields a lazy task; the undecorated function returns the frame
    directly.

    Parameters
    ----------
    orderbook_file_path : str
        Path to a CSV file (plain or compressed) that contains at least the
        columns ``timestamp``, ``asks[0].price``, ``bids[0].price``,
        ``asks[0].amount`` and ``bids[0].amount``.

    Returns
    -------
    pd.DataFrame
        Indexed by ``timestamp`` on a regular 1-second grid with the columns
        ``mid_price``, ``ba_spread`` and ``imbalance``. Each second holds the
        last observation inside it; seconds without an observation are
        forward-filled.
    """
    book_cols = ['timestamp', 'asks[0].price', 'bids[0].price', 'asks[0].amount', 'bids[0].amount']

    # Parse only the level-0 columns instead of loading every column of the
    # file and discarding most of them afterwards.
    df = pd.read_csv(orderbook_file_path, usecols=book_cols)[book_cols]

    # mid price, bid-ask spread (rounded to 5 decimals) and volume imbalance
    df['mid_price'] = (df['asks[0].price'] + df['bids[0].price'])/2
    df['ba_spread'] = np.round((df['asks[0].price'] - df['bids[0].price']),5)
    df['imbalance'] = df['bids[0].amount']/(df['bids[0].amount'] + df['asks[0].amount'])

    # The raw column is handled as epoch microseconds. Parsing it directly with
    # unit='us' avoids a float division that can smear sub-microsecond noise
    # into the resulting timestamps.
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='us')

    # keep only the derived features, indexed by time
    df = df[['timestamp','mid_price', 'ba_spread', 'imbalance']].set_index('timestamp')

    # resample by 1second frequency
    df = df.resample('1s').last().ffill()
    return df

@dask.delayed
def extract_quotes(trade_file_path:str) -> pd.DataFrame:
    """Load best bid/ask quotes from one CSV file, indexed by timestamp.

    In this notebook the function is wrapped with ``dask.delayed``, so calling
    it there yields a lazy task; the undecorated function returns the frame
    directly.

    Parameters
    ----------
    trade_file_path : str
        Path to a quotes CSV file (plain or compressed) that contains at least
        the columns ``timestamp``, ``ask_price``, ``bid_price``,
        ``ask_amount`` and ``bid_amount``.

    Returns
    -------
    pd.DataFrame
        Columns ``ask_price``, ``bid_price``, ``ask_amount``, ``bid_amount``
        (in that order) with a datetime index named ``timestamp``.
    """
    quote_cols = ['timestamp', 'ask_price', 'bid_price', 'ask_amount', 'bid_amount']

    # Parse only the needed columns; the trailing selection restores the
    # documented column order regardless of the order in the file.
    df = pd.read_csv(trade_file_path, usecols=quote_cols)[quote_cols]

    # The raw column is handled as epoch microseconds. Parsing it directly with
    # unit='us' keeps the timestamps exact instead of routing them through a
    # float division.
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='us')
    return df.set_index('timestamp')

@dask.delayed
def extract_trades(trade_file_path:str) -> pd.DataFrame:
    """Load trades from one CSV file, indexed by timestamp.

    In this notebook the function is wrapped with ``dask.delayed``, so calling
    it there yields a lazy task; the undecorated function returns the frame
    directly.

    Parameters
    ----------
    trade_file_path : str
        Path to a trades CSV file (plain or compressed) that contains at least
        the columns ``timestamp``, ``side``, ``price`` and ``amount``.

    Returns
    -------
    pd.DataFrame
        Columns ``side``, ``price``, ``amount`` (in that order) with a
        datetime index named ``timestamp``.
    """
    trade_cols = ['timestamp', 'side', 'price', 'amount']

    # Parse only the needed columns; the trailing selection restores the
    # documented column order regardless of the order in the file.
    df = pd.read_csv(trade_file_path, usecols=trade_cols)[trade_cols]

    # The raw column is handled as epoch microseconds. Parsing it directly with
    # unit='us' keeps the timestamps exact instead of routing them through a
    # float division.
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='us')
    return df.set_index('timestamp')

In [16]:
# Root folder of the raw datasets.
# NOTE(review): this is an absolute, machine-specific path; consider making it configurable.
DATA_PATH = '/Users/mac/Desktop/Repos/FBD_Project/datasets/'

# Sorted lists of the compressed CSV files for each data kind.
# NOTE(review): the order book comes from adausdt while quotes and trades come
# from btcusdt; confirm that mixing symbols is intended.
orderbook_list, quote_list, trade_list = (
    sorted(glob.glob(DATA_PATH + pattern))
    for pattern in (
        'adausdt/orderbook/*.csv.gz',
        'btcusdt/quotes/*.csv.gz',
        'btcusdt/trades/*.csv.gz',
    )
)

In [17]:
# process raw data to get features for calculation.
%time
all_features = [extract_features(path) for path in orderbook_list[:5]] 
df_feat = dask.compute(all_features)[0]
df_feat = pd.concat(df_feat)

# symmetrized data
df_sig = symmetrize_data(df_feat)

CPU times: user 2 µs, sys: 1 µs, total: 3 µs
Wall time: 5.25 µs


In [18]:
# get_micro_adjustment is imported from src.preprocessing; its result is
# unpacked into g1 and B, which the next cell combines via a matrix power of B.
# NOTE(review): B is presumably a square matrix and g1 a conformable
# vector/array; the two shapes shown in the cell output appear to be printed
# by the helper itself. Confirm against src.preprocessing.
g1, B = get_micro_adjustment(df_sig)

(732856, 6) (125710, 6)


In [19]:
# Adjustment = g1 plus (B raised to the 6th matrix power) applied to g1.
# NOTE(review): only the single 6th-power term is added. If a truncated series
# sum_{k=0..6} B^k @ g1 was intended, the intermediate powers are missing.
# Confirm against the definition used by get_micro_adjustment.
micro_adj = g1 + np.linalg.matrix_power(B, 6) @  g1

In [20]:
# Tick size is taken as the absolute value of the second entry returned by
# mid_chg.unique().
# NOTE(review): this depends on the order in which values appear in the column
# (presumably a pandas frame, where unique() keeps order of appearance);
# verify that entry 1 is always a non-zero one-tick move.
tick_size = abs(df_sig.mid_chg.unique()[1])
# Show the range of the adjustment next to the tick size for comparison.
micro_adj.max(), micro_adj.min(), tick_size

(5.120014751155482e-05, -5.120014751155481e-05, 0.0001)

To do 
- check bid-ask imbalance and buy and sell volume (pre-trades only)
- eth, btc, ada spread distribution
- upgrade microprice prediction (bitcoin too much volatility)

In [4]:
# # (disabled draft) load trades and quotes, then join them on timestamp.
# %time
# trades = [extract_trades(path) for path in trade_list[:5]] 
# quotes = [extract_quotes(path) for path in quote_list[:5]] 

# df_trade = dask.compute(trades)[0]
# df_trade = pd.concat(df_trade)

# df_quote = dask.compute(quotes)[0]
# df_quote = pd.concat(df_quote)

# # join trade and quotes
# df_tq = pd.merge(df_quote.reset_index(), df_trade.reset_index(),how='outer', on='timestamp')
# df_tq = df_tq.set_index('timestamp').sort_index()

# df_tq['ba_spread'] = df_tq['ask_price'] - df_tq['bid_price']
# df_tq['imbalance'] = df_tq['bid_amount']/(df_tq['bid_amount'] + df_tq['ask_amount'])
# df_tq[['ba_spread', 'imbalance']] = df_tq[['ba_spread', 'imbalance']].ffill()