In [1]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [2]:
from datetime import datetime
from typing import Union

import logging

import numpy as np
import pandas as pd

BUY = 0
SELL = 1
CME_datetime_format = "%Y%m%d%H%M%S%f"

In [3]:
symbol = 'ESH2'
month = '01'
day = '03'
datestr = "2022"+month+day
data_path = 'C:/data/CME/2022/%s/%s/MBO/XCME/'%(month, day)
order_file = data_path + symbol + '_orders.csv'
prefix_file = data_path + symbol + '_prefix.csv'
suffix_file = data_path + symbol + '_suffix.csv'
print(order_file, prefix_file, suffix_file)

C:/data/CME/2022/01/03/MBO/XCME/ESH2_orders.csv C:/data/CME/2022/01/03/MBO/XCME/ESH2_prefix.csv C:/data/CME/2022/01/03/MBO/XCME/ESH2_suffix.csv


In [4]:
def load_lit_orders(order_file, prefix=None, suffix=None):
    orders = pd.read_csv(order_file, parse_dates=['TRANSACTTIME'], date_parser=lambda x: pd.to_datetime(x, format=CME_datetime_format))
    if prefix is not None:
        prefix_df = pd.read_csv(prefix, parse_dates=['TRANSACTTIME'], date_parser=lambda x: pd.to_datetime(x, format=CME_datetime_format))
        orders = pd.concat([prefix_df, orders])
    if suffix is not None:
        suffix_df = pd.read_csv(suffix, parse_dates=['TRANSACTTIME'], date_parser=lambda x: pd.to_datetime(x, format=CME_datetime_format))
        orders = pd.concat([orders, suffix_df])
    columns = ['TRANSACTTIME', 'PUBLIC_ORDER_ID', 'EXECTYPE', 'SIDE', 'PRICE', 'VISIBLEQTY']
    df = orders[columns].reset_index().sort_values(['TRANSACTTIME', 'index']).set_index('TRANSACTTIME').iloc[:, 1:]
    df['EXECTYPE'] = df['EXECTYPE'].apply(lambda x: 'Insert' if x ==0 else ('Amend' if x == 1 else 'Cancel'))
    df['SIDE'] = df['SIDE'].apply(lambda x: 'Buy' if x == 0 else 'Sell')
    df['PUBLIC_ORDER_ID'] = df['PUBLIC_ORDER_ID'].astype(str)
    return df

In [7]:
# tmp_df = load_lit_orders(order_file, prefix=prefix_file, suffix=suffix_file)
md = parser(symbol, datestr, data_path)

order_file: C:/data/CME/2022/01/03/MBO/XCME/ESH2_orders.csv
prefix_file: C:/data/CME/2022/01/03/MBO/XCME/ESH2_prefix.csv
suffix_file: C:/data/CME/2022/01/03/MBO/XCME/ESH2_suffix.csv


In [8]:
md.lob

Unnamed: 0_level_0,PUBLIC_ORDER_ID,SIDE,PRICE,VISIBLEQTY,START,END
PUBLIC_ORDER_ID_,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
648870237966,648870237966,Buy,430500.0,1,2022-01-02 22:30:00.000000001,2022-01-03 22:30:00.000000001
649192975015,649192975015,Buy,474550.0,3,2022-01-02 22:30:00.000000001,2022-01-03 14:50:26.899534383
649192975001,649192975001,Buy,474575.0,3,2022-01-02 22:30:00.000000001,2022-01-03 14:50:26.650294035
649192974981,649192974981,Buy,474600.0,3,2022-01-02 22:30:00.000000001,2022-01-03 22:30:00.000000001
649192974967,649192974967,Buy,474625.0,3,2022-01-02 22:30:00.000000001,2022-01-03 22:30:00.000000001
...,...,...,...,...,...,...
649201576091,649201576091,Sell,478475.0,3,2022-01-03 21:59:59.859090789,2022-01-03 22:00:00.064072945
649201576093,649201576093,Buy,478450.0,1,2022-01-03 21:59:59.859304457,2022-01-03 22:00:00.064072945
649201576094,649201576094,Buy,478450.0,1,2022-01-03 21:59:59.859513401,2022-01-03 22:00:00.064072945
649201576095,649201576095,Buy,478450.0,1,2022-01-03 21:59:59.859534181,2022-01-03 22:00:00.064072945


In [10]:
%%time
# Try to process in Parallel to accelerate
ts = pd.to_datetime('20220103140030000000000', format='%Y%m%d%H%M%S%f').to_numpy()
tmp_df = md.get_l1(ts, True)

Wall time: 8.62 s


In [214]:
tmp_df

buy_depth            [6180]
total_bid_vol       [34646]
best_bid_vol           [15]
best_bid_prc     [477475.0]
best_ask_prc     [477500.0]
best_ask_vol           [42]
total_ask_vol       [33709]
sell_depth           [6049]
Name: TRANSACTTIME, dtype: object

In [6]:
BUY = 0
SELL = 1
turquoise_datetime_format = "%d-%b-%y %H.%M.%S.%f"
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)

def _get_side(side):
    return BUY if side == 'Buy' else SELL

def _decode_side(side):
    return 'Buy' if side == BUY else 'Sell'


class parser:
    def __init__(self, symbol: str, datestr: str, data_path: str):
        self.symbol = symbol
        self.datestr = datestr
        self.data_path = data_path
        order_file = data_path + symbol + '_orders.csv'
        prefix_file = data_path + symbol + '_prefix.csv'
        suffix_file = data_path + symbol + '_suffix.csv'
        print('order_file: %s' % order_file)
        print('prefix_file: %s' % prefix_file)
        print('suffix_file: %s' % suffix_file)
        
        self.litOrders = load_lit_orders(order_file, prefix=prefix_file, suffix=suffix_file)
        # self.litTrades = load_lit_trades(lit_path, date, symbol, self.mic)
        self.t0 = pd.to_datetime('{0} 22:00:00'.format(datestr)) - pd.Timedelta(1, 'day')
        self.lob, self.lobvv = self.get_durations()
        # self.validate()

    def validate(self):
        _l1 = self.get_l1(self.litTrades.index.values, post_clearing=True)
        _spread = _l1['best_ask_prc'] - _l1['best_bid_prc']
        if np.any(_spread <= 1e-6):
            logging.error('Invalid market state (negative or zero spread) at \'{0}\' ...'.format(', '.join([i.strftime('%Y-%m-%d') for i in _spread[_spread <= 1e-8].index[:3]])))

    def get_durations(self):
        lit_orders = self.litOrders.copy().reset_index()
        lit_orders['EXECTYPE_'] = lit_orders['EXECTYPE']
        lit_orders.loc[:, 'PUBLIC_ORDER_ID_'] = lit_orders['PUBLIC_ORDER_ID'].values
        lit_orders.loc[lit_orders['EXECTYPE'] != 'Insert', 'PUBLIC_ORDER_ID_'] = np.nan

        x0 = lit_orders[lit_orders['EXECTYPE'] == 'Amend'].copy()
        x0['EXECTYPE'] = 'Cancel'
        x1 = lit_orders[lit_orders['EXECTYPE'] == 'Amend'].copy()
        x1['EXECTYPE'] = 'Insert'
        x1['PUBLIC_ORDER_ID_'] = ['{0}_{1}'.format(i, 100000000+idx) for idx, i in enumerate(x1['PUBLIC_ORDER_ID'])]

        x2 = lit_orders[lit_orders['EXECTYPE'] == 'Fill'].copy()
        x2['EXECTYPE'] = 'Cancel'
        x3 = lit_orders[lit_orders['EXECTYPE'] == 'Fill'].copy()
        x3['EXECTYPE'] = 'Insert'
        x3['PUBLIC_ORDER_ID_'] = ['{0}_{1}'.format(i, 200000000+idx) for idx, i in enumerate(x3['PUBLIC_ORDER_ID'])]

        x = pd.concat([lit_orders[(lit_orders['EXECTYPE'] == 'Insert') |
                                  (lit_orders['EXECTYPE'] == 'Cancel')], x0, x1, x2, x3]).reset_index()
        x = x.sort_values(['TRANSACTTIME', 'index'], ascending=[True, True]).set_index('TRANSACTTIME').iloc[:, 1:] \
            .reset_index()

        x = x.drop(x[(x['EXECTYPE'] == 'Insert') & (x['VISIBLEQTY'] == 0)].index)
        x = x.reset_index().sort_values(['PUBLIC_ORDER_ID', 'TRANSACTTIME', 'index']).iloc[:, 1:]

        err_oids = x.groupby(['PUBLIC_ORDER_ID', 'EXECTYPE']).size().unstack(fill_value=0.0).diff(axis=1)['Insert'].replace(0, np.nan).dropna().index
        assert len(err_oids) == 0
        x = x.drop(x[x['PUBLIC_ORDER_ID'].isin(err_oids)].index)
        x['PUBLIC_ORDER_ID_'] = x['PUBLIC_ORDER_ID_'].fillna(method='ffill')

        xi = x[x['EXECTYPE'] == 'Insert'].reset_index().set_index('PUBLIC_ORDER_ID_').copy()
        xc = x[x['EXECTYPE'] == 'Cancel'].reset_index().set_index('PUBLIC_ORDER_ID_').copy()
        assert (xi.index == xc.index).all()
        assert xc[xc.index.duplicated()].empty

        xi['START'] = xi['TRANSACTTIME']
        xc['END'] = xc['TRANSACTTIME']

        lob = pd.concat([xi[['PUBLIC_ORDER_ID', 'SIDE', 'PRICE', 'VISIBLEQTY', 'START']],
                         xc.reindex(xi.index)[['PUBLIC_ORDER_ID', 'SIDE', 'END']]], axis=1)
        assert (np.all(lob.iloc[:, 0] == lob.iloc[:, 5]))
        assert (np.all(lob.iloc[:, 1] == lob.iloc[:, 6]))
        lob = lob.iloc[:, [0, 1, 2, 3, 4, 7]].sort_values('START')

        # Pre-process file for numpy handling.
        lobv = lob.copy()
        lobv['SIDE'] = lobv['SIDE'].replace({'Buy': 0, 'Sell': 1})
        _lobvv = lobv.values
        df = np.zeros(len(_lobvv), dtype={'names': ('SIDE', 'PRICE', 'VISIBLEQTY', 'START', 'END', 'PUBLIC_ORDER_ID'),
                                          'formats': ('u1', 'f8', 'u8', 'f8', 'f8', 'U12')})
        df['SIDE'] = _lobvv[:, 1]
        df['PRICE'] = _lobvv[:, 2]
        df['VISIBLEQTY'] = _lobvv[:, 3]
        df['START'] = [self._get_duration(i) for i in _lobvv[:, 4]]
        df['END'] = [self._get_duration(i) for i in _lobvv[:, 5]]
        df['PUBLIC_ORDER_ID'] = _lobvv[:, 0]
        return lob, df

    def get_l1(self, t: Union[np.datetime64, list, np.ndarray], post_clearing=True):
        if type(t) == np.datetime64:
            _l1vb = self._get_l1v_by_side([t], BUY, post_clearing)
            _l1va = self._get_l1v_by_side([t], SELL, post_clearing)
            df = pd.Series({'buy_depth': _l1vb['depth'], 'total_bid_vol': _l1vb['total_vol'],
                            'best_bid_vol': _l1vb['best_vol'], 'best_bid_prc': _l1vb['best_prc'],
                            'best_ask_prc': _l1va['best_prc'], 'best_ask_vol': _l1va['best_vol'],
                            'total_ask_vol': _l1va['total_vol'], 'sell_depth': _l1va['depth']})
            df.name = 'TRANSACTTIME'
            return df

        if type(t) == list or type(t) == np.ndarray:
            _l1vb = self._get_l1v_by_side(t, BUY, post_clearing)
            _l1va = self._get_l1v_by_side(t, SELL, post_clearing)
            df = pd.concat([pd.DataFrame(_l1vb), pd.DataFrame(_l1va)], axis=1)
            df.index = t
            df.columns = ['best_bid_prc', 'best_bid_vol', 'total_bid_vol', 'buy_depth', 'best_ask_prc', 'best_ask_vol',
                          'total_ask_vol', 'sell_depth']
            df.index.name = 'TRANSACTTIME'
            return df.astype({'buy_depth': int, 'total_bid_vol': int, 'best_bid_vol': int, 'best_bid_prc': float,
                              'best_ask_prc': float, 'best_ask_vol': int, 'total_ask_vol': int, 'sell_depth': int})

        logging.error('Unknown type {0}'.format(type(t)))
        raise TypeError

    def get_l2(self, t, post_clearing=True, axis=1):
        cols = ['PRICE', 'VISIBLEQTY', 'NUM']
        df = pd.concat({_decode_side(side): pd.DataFrame(self._get_l2v_by_side(t, side, post_clearing)[cols],
                                                         columns=cols) for side in [BUY, SELL]}, axis=axis)
        df.index.name = 'LEVEL'
        df.name = t
        return df

    def get_l3(self, t, axis=1, post_clearing=True):
        cols = ['PRICE', 'VISIBLEQTY', 'PUBLIC_ORDER_ID']
        df = pd.concat(
            {_decode_side(side): pd.DataFrame(self._get_l3v_by_side(t, side, post_clearing)[cols], columns=cols)
             for side in [BUY, SELL]}, axis=axis)
        df.index.name = 'LEVEL'
        df.name = t
        return df

    def _get_l1v_by_side(self, t, side, post_clearing):
        j = [self._filter_lobvv(_t, side, post_clearing) for _t in t]
        lj = len(j)
        df = np.zeros(max([1, lj]), dtype={'names': ('best_prc', 'best_vol', 'total_vol', 'depth'),
                                           'formats': ('f8', 'u8', 'u8', 'u8')})
        if lj > 0:
            bb = [np.nan if np.all(np.isnan(_J['PRICE'])) else
                  (1 - side) * np.nanmax(_J['PRICE']) + side * np.nanmin(_J['PRICE']) for _J in j]
            df['best_prc'] = bb
            df['best_vol'] = [j[i][j[i]['PRICE'] == bb[i]]['VISIBLEQTY'].sum() for i in np.arange(lj)]
            df['total_vol'] = [np.sum(_J['VISIBLEQTY']) for _J in j]
            df['depth'] = [len(_J) if np.sum(_J['VISIBLEQTY']) > 0 else 0 for _J in j]
        else:
            df['best_prc'] = None
            df['best_vol'] = 0
            df['total_vol'] = 0
            df['depth'] = 0

        return df

    def _get_l2v_by_side(self, t, side, post_clearing):
        _J = self._get_l3v_by_side(t, side, post_clearing)
        a, b = np.unique(_J['PRICE'], return_counts=True)
        ii = np.argsort(a)[::-1] if side == BUY else np.argsort(a)
        df = np.zeros(len(a), dtype={'names': ('PRICE', 'VISIBLEQTY', 'NUM'), 'formats': ('f8', 'u8', 'u8')})
        df['PRICE'] = a[ii]
        df['VISIBLEQTY'] = [np.sum(_J[_J['PRICE'] == i]['VISIBLEQTY']) for i in a[ii]]
        df['NUM'] = b[ii]
        return df

    def _get_l3v_by_side(self, t, side, post_clearing):
        _J = self._filter_lobvv(t, side, post_clearing)
        return _J[_J['PRICE'].argsort()[::-1]] if side == BUY else _J[_J['PRICE'].argsort()]

    def _filter_lobvv(self, t, side, post_clearing):
        t_ = self._get_duration(t)
        t_end_ = t_ + float(post_clearing) * 1e-9
        df = self.lobvv[(self.lobvv['START'] <= t_) & (self.lobvv['END'] >= t_end_) & (self.lobvv['SIDE'] == side)]

        _ids = np.unique(df['PUBLIC_ORDER_ID'])
        assert len(_ids) == len(df)
        _df = np.zeros(max([1, len(_ids)]), dtype={'names': ('SIDE', 'PRICE', 'VISIBLEQTY', 'PUBLIC_ORDER_ID'),
                                                   'formats': ('u1', 'f8', 'u8', 'U12')})
        if len(_ids) > 0:
            _df['SIDE'] = side
            _df['VISIBLEQTY'] = np.array([np.sum(df[df['PUBLIC_ORDER_ID'] == i]['VISIBLEQTY']) for i in _ids])
            _df['PRICE'] = np.array([df[df['PUBLIC_ORDER_ID'] == i]['PRICE'][0] for i in _ids])
            _df['PUBLIC_ORDER_ID'] = _ids
        else:
            _df['SIDE'] = side
            _df['PRICE'] = None
            _df['VISIBLEQTY'] = 0
            _df['PUBLIC_ORDER_ID'] = None

        return _df

    def _get_duration(self, t):
        t_diff = t - self.t0
        return t_diff.days * 24 * 3600 + t_diff.seconds + t_diff.microseconds * 1e-6 + t_diff.nanoseconds * 1e-9



In [180]:
lit_orders = load_lit_orders(order_file, prefix=None, suffix=None)
lit_orders = lit_orders.reset_index()
lit_orders = lit_orders[lit_orders['EXECTYPE'] != 'Amend']

insert = lit_orders[lit_orders['EXECTYPE'] == 'Insert'].copy()
cancel = lit_orders[lit_orders['EXECTYPE'] == 'Cancel'].copy()
insert.rename(columns={'TRANSACTTIME': 'START'}, inplace=True)
cancel.rename(columns={'TRANSACTTIME': 'END'}, inplace=True)

duration = pd.merge(insert, cancel[['PUBLIC_ORDER_ID', 'END']], how='inner', on='PUBLIC_ORDER_ID')
duration = duration[['PUBLIC_ORDER_ID', 'SIDE', 'PRICE', 'VISIBLEQTY', 'START', 'END']]

In [208]:
duration.to_pickle('../data_analysis/order_duration.pkl')