In [1]:
from utils import *

basicConfig(filename = '_Preprocessing.log', level = INFO)


def get_signed_volume(df):
    tmp = copy(df)

    tmp['Bid Price'] = tmp['Bid Price'].ffill()
    tmp['Ask Price'] = tmp['Ask Price'].ffill()

    trades = tmp[tmp['Type'].isin(['Trade'])]
    trades['Mid Price']     = (trades['Bid Price'] + trades['Ask Price']) / 2
    trades['buy_sell_ind']  = np.where(trades['Price'] > trades['Mid Price'],1,-1)
    trades['signed Volume'] = trades['Volume'] * trades['buy_sell_ind']
    
    return trades.set_index('t')['signed Volume']


def extract(df):
    """Extract only the necessary information"""

    trade  = df[df['Type'].isin(['Trade'])].set_index('t')[['Price','Volume']]
    trade['Signed Volume'] = get_signed_volume(df)
    assert np.array_equal(trade['Volume'], np.abs(trade['Signed Volume']),
                          equal_nan=True)
    quote  = df[~df['Type'].isin(['Trade'])].set_index('t')[['Bid Price','Ask Price']]

    return {'trade': trade, 'quote': quote}

## Data Converting

Since `csv` file size exceeds the memory of the sever, we have to read chunk by chunks.

In [None]:
%%time
now_time  = 1999.5
CHUNKSIZE = 500000

PATH_TO_CSV_ARCHIVES = 'data/E-mini_TickDataMarket'

for now_time in np.arange(2020.25, 2022.75, .25):
    print(str(now_time), end=':')
    
    t1 = time()
    extrs = {}
    df0 = pd.DataFrame()
    for df in pd.read_csv(
        # 'data/E-mini_TickDataMarket/ES%s.csv'%get_futures_code(now_time),
        #                   chunksize=CHUNKSIZE):
        # results saving
        PATH_TO_CSV_ARCHIVES + '/ES%s.csv'%get_futures_code(now_time),
                          chunksize=CHUNKSIZE):
        df0 = pd.concat([df0,df])
        dates = pd.to_datetime(pd.unique(df['Date'])).sort_values()
        if len(dates) > 1:
            dates = pd.to_datetime(pd.unique(df0['Date'])).sort_values()
            if len(df) < CHUNKSIZE:
                df_now = df0
            else:
                df_now = df0[df0['Date'].isin(dates[:-1].strftime("%m/%d/%Y"))]
            df_now['t'] = pd.to_timedelta(df_now['Time']).dt.total_seconds()
            extr        = dict([(k,extract(v)) for k,v in df_now.groupby('Date')])
            extrs.update(extr)
            df0 = copy(df[df['Date']==dates[-1].strftime("%m/%d/%Y")])
            
    # results saving
    joblib.dump(extrs, 'save/E-mini/ES%s.dat'%get_futures_code(now_time))
    
    t2 = time()
    print('%.3f sec'%(t2 - t1))
    info(str(now_time) + ': %.3f sec'%(t2 - t1))

2022.25:2052.533 sec
2022.5:698.347 sec
CPU times: total: 43min 38s
Wall time: 45min 51s


## Calculations for days

In [2]:
from utils import *
from main  import *
from other import *

basicConfig(filename = 'Empirics.log', level = INFO)

In [3]:
START =  8.5*3600 + 15 * 60
END   = 15.0*3600 - 15 * 60
def filter_time(t, unit='s',
                start=START, end=END):
    """Filter regular trading hour (8:30 a.m. – 3:00 p.m.)
    
    For discarding open and close large trading, we remove first and last 15 
    minutes.
    
    References
    ----------
    .. [1] https://www.cmegroup.com/education/files/eq-trading-hours.pdf
    """
    if unit == 's':
        return t[(t >= start) & (t <= end)]
    if unit == 'm':
        return t[(t >= start/60) & (t <= end/60)]
    else:
        raise NotImplementedError("unit %s is not implemented yet."%unit)


def get_min_value(x, interval=1, start=START, end=END):
    min_ranges = pd.Series(index=np.arange(int(start/60), 
                                           int(end  /60) + interval,
                                           interval),
                           name=x.index.name)
    last = x.groupby(level=0).last()
    out  = pd.merge(min_ranges, last, how='left', 
                    left_index=True, right_index=True)
    out  = out[x.name]
    
    return out.ffill().bfill()


def get_redist(data, min_offset=.001):
    """For the same time points, randomly distribute them."""
    if isinstance(data, pd.DataFrame):
        tmp = data
    else:
        tmp = pd.DataFrame({'t':data})
        
    flag_dup = (tmp.groupby('t')['t'].count() > 1)    
    if flag_dup.sum() == 0:
        tmp['t_redist'] = tmp['t']
        if isinstance(data, pd.DataFrame):
            return tmp
        return tmp['t_redist']
    if len(flag_dup) == 1:
        tmp['t_redist'] = tmp['t']\
                            + np.sort(np.random.uniform(-min_offset * len(tmp),
                                                         min_offset * len(tmp),
                                                        size = len(tmp)))
        if isinstance(data, pd.DataFrame):
            return tmp
        return tmp['t_redist']
    
    flag_dup.name = '1_dup'
    flag_dup = flag_dup.sort_index()
    flag_dup = flag_dup.reset_index()
    flag_dup['st'] = (flag_dup['t'] + flag_dup['t'].shift(1)) / 2
    flag_dup['en'] = (flag_dup['t'] + flag_dup['t'].shift(-1)) / 2
    offset = min(np.nan_to_num(
        flag_dup['st'].iloc[:min(5,len(data))].diff().mean(), nan=1),
                 min_offset)
    flag_dup['st'] = flag_dup['st'].fillna(flag_dup['st'].iloc[1] - offset)
    offset = min(np.nan_to_num(
        flag_dup['en'].iloc[-min(5,len(data)):].diff().mean(), nan=1),
                 min_offset)
    flag_dup['en'] = flag_dup['en'].fillna(flag_dup['en'].iloc[-2] + offset)
    tmp = pd.merge(tmp, flag_dup.reset_index(), how='left', on='t')
    tmp['t_redist'] = (np.random.uniform(size=len(tmp)) * (tmp['en'] - tmp['st'])\
                       + tmp['st'])*tmp['1_dup'] + tmp['t'] * (~tmp['1_dup'])
    
    if isinstance(data, pd.DataFrame):
        return tmp.sort_values('t_redist')
    
    return np.sort(tmp['t_redist'])


def get_VPIN(trade):
    n = 50
    V = trade['Volume'].sum() / n
    if V <= 1:
        return np.nan

    trade['V_i'] = trade['Volume'].cumsum() // V
    VPIN = trade.groupby('V_i')['Signed Volume'].sum().abs().sum() \
            / trade['Volume'].sum()
    
    return VPIN


def estimate_Hawkes(points):
    t1 = time()
    ress = {}
    for estimator in estimators:
        ress[estimator.__name__] = estimator(points)
    if VERBOSE:
        print("\t %.3f sec"%(time() - t1))
        
    return ress


def estimate(date, seed=1):
    np.random.seed(seed)
    
    dat = data[date]
    
    out = {}
    trade = dat['trade']
    quote = dat['quote']
    quote['Bid Price'] = quote['Bid Price'].ffill().bfill()
    quote['Ask Price'] = quote['Ask Price'].ffill().bfill()
    quote['Mid Price'] = (quote['Bid Price'] + quote['Ask Price']) / 2
    
    # Summary stats
    out['price(init)'] = trade['Price'][~np.isnan(trade['Price'])].iloc[0]
    out['price(high)'] = np.nanmax(trade['Price'])
    out['price(low)' ] = np.nanmin(trade['Price'])
    out['price(last)'] = trade['Price'][~np.isnan(trade['Price'])].iloc[-1]
    out['price(mean)'] = np.nanmean(trade['Price'])
    out['price(std)' ] = np.nanstd(trade['Price'])
    
    # Liquidity measure
    out['volume'] = np.nansum(trade['Volume'])
    out['dollar_volume'] = np.nansum(trade['Price'] * trade['Volume'])
    out['VPIN'] = get_VPIN(trade[(START <= trade.index) & (trade.index <= END)])
    
    # RV caluclation
    trade['min'] = (trade.index // 60).astype(int)
    price_1min = get_min_value(trade.set_index('min')['Price'], interval=1)
    price_5min = get_min_value(trade.set_index('min')['Price'], interval=5)
    out['RV(1min)'] = RVEstimator(np.log(price_1min))
    out['RV(5min)'] = RVEstimator(np.log(price_5min))
    
    # Hawkes estimation
    mid_mask = (quote['Mid Price'] - quote['Mid Price'].shift(1)).fillna(1) != 0
    
    trade_points = to_0_1(filter_time(get_redist(trade.index)))
    quote_points = to_0_1(filter_time(get_redist(quote.index)))
    mid_points   = to_0_1(filter_time(get_redist(quote[mid_mask].index)))
    
    out['trade']      = estimate_Hawkes(trade_points)
    out['quote']      = estimate_Hawkes(quote_points)
    out['mid_change'] = estimate_Hawkes(mid_points)
    
    del price_1min, price_5min, trade, quote, mid_mask
    del trade_points, quote_points, mid_points
    gc.collect()

    return out


def log_out(i, out):
    pool_out[i] = out

In [4]:
POTENTIAL_EXCEPTIONS_FILE_PATH = "C:/YOUR_WAY_YO_FILE_WITH_EXCEPTIONS/Exceptions.txt"

# new code for functionalyty check
import traceback

## Estimation

For each day, determine futures product based on volume.
After that, estimate some statistics for that futures data.

    ESH2019        ▼ Maturity of ESH2019
 └─────────────┴─────────┐

        BUFFER ┖──┚    ESM2019  ▲ Maturity of ESM2019
        
                ▲ Choose the next date using volume within this BUFFER

In [5]:
BUFFER  = 14
MIN_NUM = 100
VERBOSE = True


now_quarter = 2020.25

prev_date   = pd.to_datetime("09/09/1999", format='%m/%d/%Y')
next_date   = pd.to_datetime("09/09/2199", format='%m/%d/%Y')


estimators = [BR_estimation, Hardiman, AVG_Hardiman_4, AVG_Hardiman_30min,
              AVG_Hardiman_5min, Exp_estimation, EM_estimation,
             ]


stop = False
while now_quarter < 2022.5 or stop:
    t1 = time()
    
    # Load data
    data   = joblib.load('save/E-mini/ES%s.dat'\
                         %get_futures_code(now_quarter))
    dates  = np.array(pd.to_datetime(list(data.keys()), format='%m/%d/%Y')\
                      .sort_values().strftime('%m/%d/%Y').to_list())
    daily_volume = pd.Series([np.nansum(data[date]['trade']['Volume'])
                              for date in dates], 
                             index=pd.to_datetime(dates))

    # Filtering
    bdates    = pd.bdate_range(prev_date, next_date).strftime('%m/%d/%Y')
    liq_dates = np.array([date for date in dates \
                          if len(data[date]['trade']) > MIN_NUM])
    flt_dates = intersect(dates, bdates, liq_dates)
    
    # Estimation
    pool_out = {}

    # modified algorithm
    try:
        print("The dates are: " + str(flt_dates))
        print("There are " + str(len(flt_dates)))


        for date in flt_dates:
            print("Current date is " + str(date))
            
            if date not in pool_out:
                # step by step version
                pool_out[date] = estimate(date)
    except KeyboardInterrupt:
        print('INTERRUPTED')
    pool_out = dict(sorted(pool_out.items()))
    
    # Choose a next date: A date choosing the next futures product.
    try:
        data2  = joblib.load('save/E-mini/ES%s.dat'\
                             %get_futures_code(now_quarter + .25))
        dates2 = np.array(pd.to_datetime(list(data2.keys()), format='%m/%d/%Y')\
                          .sort_values().strftime('%m/%d/%Y').to_list())
        daily_volume2 = pd.Series([np.nansum(data2[date]['trade']['Volume'])
                                   for date in dates2], 
                                  index=pd.to_datetime(dates2))
    
        next_date = daily_volume.index[-1]  # initialize with the last date.
        for date, vol in daily_volume[-BUFFER:].iteritems():
            if vol < daily_volume2[date]:
                next_date = date
                break
    except Exception as e:
        with open(POTENTIAL_EXCEPTIONS_FILE_PATH, "a") as f:
            f.write(str(e))
            f.write(traceback.format_exc())
        stop = True
    
    out_dates = np.array(list(pool_out.keys()))
    for date in out_dates[pd.to_datetime(out_dates) > pd.to_datetime(next_date)]:
        del pool_out[date]
    joblib.dump(pool_out, 'save/emp/%.2f.dat'%now_quarter)

    msg = str(now_quarter) + '\t' + next_date.strftime('%m/%d/%Y')\
          + '  (%5.3f s)'%(time()-t1)
    print(msg); info(msg)
    
    # Go to next step
    prev_date    = next_date + pd.to_timedelta(1, unit='d')
    next_date   += pd.to_timedelta(999, unit='d')
    now_quarter += 0.25

The dates are: ['09/15/2021' '06/18/2021' '09/10/2021' '09/08/2021' '09/03/2021'
 '07/14/2021' '02/25/2021' '03/22/2021' '06/28/2021' '07/19/2021'
 '03/03/2021' '05/12/2021' '07/27/2021' '09/16/2021' '09/02/2021'
 '07/01/2021' '03/16/2021' '06/01/2021' '07/23/2021' '03/24/2021'
 '08/27/2021' '07/05/2021' '06/10/2021' '07/15/2021' '08/19/2021'
 '03/26/2021' '05/24/2021' '08/03/2021' '04/16/2021' '05/31/2021'
 '07/02/2021' '07/16/2021' '09/09/2021' '04/26/2021' '04/08/2021'
 '04/13/2021' '07/09/2021' '05/11/2021' '06/16/2021' '04/05/2021'
 '06/15/2021' '06/22/2021' '08/06/2021' '04/19/2021' '05/05/2021'
 '06/09/2021' '05/14/2021' '08/31/2021' '07/06/2021' '07/20/2021'
 '08/05/2021' '09/14/2021' '04/06/2021' '04/01/2021' '05/18/2021'
 '05/28/2021' '04/20/2021' '07/13/2021' '04/21/2021' '06/14/2021'
 '08/17/2021' '06/07/2021' '08/11/2021' '06/24/2021' '04/29/2021'
 '03/10/2021' '05/04/2021' '08/09/2021' '06/17/2021' '06/29/2021'
 '04/23/2021' '07/30/2021' '05/07/2021' '05/21/2021' '05/20/2