In [18]:
import pandas as pd
import os
import numpy as np

from sklearn import ensemble
from multiprocessing import Pool
import time
import csv

import warnings
warnings.filterwarnings("ignore")


In [19]:
future2stock = {'JBF': 3443}
future_path = r'processedData_2023\futures'
stock_path = r'processedData_2023\stocks'

future_files = os.listdir(future_path)
stock_files = os.listdir(stock_path)

# Get all the future tickers
future_data = {}
stock_data = {}
for future_ticker, stock_ticker in future2stock.items():
    future_data[future_ticker] = pd.DataFrame()
    stock_data[stock_ticker] = pd.DataFrame()
    for file in future_files:
        if file.startswith(future_ticker):
            tmp = pd.read_csv(os.path.join(future_path, file), compression='gzip')    
            tmp = tmp[tmp['askPrice1'] > 0]
            tmp = tmp[tmp['bidPrice1'] > 0]
            tmp = tmp[tmp['askPrice1'] > tmp['bidPrice1']]
            future_data[future_ticker] = pd.concat([future_data[future_ticker], tmp])
            
    for file in stock_files:
        if file.startswith(str(stock_ticker)):
            tmp = pd.read_csv(os.path.join(stock_path, file), compression='gzip')
            tmp = tmp[tmp['SP1'] > 0]
            tmp = tmp[tmp['BP1'] > 0]
            tmp = tmp[tmp['SP1'] > tmp['BP1']]
            stock_data[stock_ticker] = pd.concat([stock_data[stock_ticker], tmp])

In [20]:
# Select common dates and indexing
for future_ticker, futureData in future_data.items():
    stockData = stock_data[future2stock[future_ticker]]
    stockData_dates = np.unique(stockData.date)
    stoD=pd.to_datetime(stockData_dates, format="%Y-%m-%d")
    qqqq=stoD.year*10000+stoD.month*100+stoD.day
    
    indexData_dates = np.unique(futureData.date)
    indD=pd.to_datetime(indexData_dates, format="%Y-%m-%d")
    pppp=indD.year*10000+indD.month*100+indD.day
    
    commonDays=pd.to_datetime(pppp.intersection(qqqq),format="%Y%m%d")
    
    d_futures=pd.to_datetime(futureData.date,format="%Y-%m-%d")
    futureData.date=d_futures
    futureData=futureData[futureData.date.isin(commonDays)]

    d_stock=pd.to_datetime(stockData.date,format="%Y-%m-%d")
    stockData.date=d_stock
    stockData=stockData[stockData.date.isin(commonDays)]

    stockData_DateTime = pd.to_datetime(stockData.date.astype(str) + ' ' + stockData.time.astype(str), format="%Y-%m-%d %H%M%S%f")
    futuresData_DateTime = pd.to_datetime(futureData.date.astype(str) + ' ' + futureData.time.astype(str), format="%Y-%m-%d %H%M%S%f")

    stockData.index = stockData_DateTime
    stockData = stockData[~stockData.index.duplicated(keep='last')]

    futureData.index = futuresData_DateTime
    futureData = futureData[~futureData.index.duplicated(keep='last')]
    
    stockData = stockData.sort_index()
    futureData = futureData.sort_index()
    
    stock_data[future2stock[future_ticker]] = stockData
    future_data[future_ticker] = futureData
    
# Downsampling
freq = '10s'
for future_ticker, futureData in future_data.items():
    stockData = stock_data[future2stock[future_ticker]]
    stockData = stockData.groupby(stockData.index.date).apply(lambda x: x.resample(freq, closed='left', label='right').last()).reset_index(level=0, drop=True)
    futureData = futureData.groupby(futureData.index.date).apply(lambda x: x.resample(freq, closed='left', label='right').last()).reset_index(level=0, drop=True)
    stockData.fillna(method='ffill', inplace=True)
    futureData.fillna(method='ffill', inplace=True)
    # get the common index
    comon_index = stockData.index.intersection(futureData.index)
    stock_data[future2stock[future_ticker]] = stockData.loc[comon_index]
    future_data[future_ticker] = futureData.loc[comon_index]

In [21]:
# Order Slope
def cal_slope(df):
    # Consider future and stock difference
    if 'bidSize1' in df.columns:
        bidSizes = [f'bidSize{i}' for i in range(1, 6)]
        bidPrices = [f'bidPrice{i}' for i in range(1, 6)]
        askSizes = [f'askSize{i}' for i in range(1, 6)]
        askPrices = [f'askPrice{i}' for i in range(1, 6)]
    else:
        bidSizes = [f'BV{i}' for i in range(1, 6)]
        bidPrices = [f'BP{i}' for i in range(1, 6)]
        askSizes = [f'SV{i}' for i in range(1, 6)]
        askPrices = [f'SP{i}' for i in range(1, 6)]

    df_bid = df[bidSizes + bidPrices].copy()
    df_ask = df[askSizes + askPrices].copy()
    df_bid.loc[:, bidPrices] = df_bid[bidPrices] / df[bidPrices[0]].values.reshape(-1, 1)
    df_ask.loc[:, askPrices] = df_ask[askPrices] / df[askPrices[0]].values.reshape(-1, 1)
    
    bid_data = df_bid.values
    ask_data = df_ask.values

    cum_bid_sizes = np.cumsum(bid_data[:, :5], axis=1) / bid_data[:, :5].sum(axis=1, keepdims=True)
    cum_ask_sizes = np.cumsum(ask_data[:, :5], axis=1) / ask_data[:, :5].sum(axis=1, keepdims=True)

    bid_price = bid_data[:, 5:]
    ask_price = ask_data[:, 5:]

    X_bid = cum_bid_sizes - cum_bid_sizes.mean(axis=1, keepdims=True)
    Y_bid = bid_price - bid_price.mean(axis=1, keepdims=True)
    slope_b = (X_bid * Y_bid).sum(axis=1) / ((X_bid ** 2).sum(axis=1) + 1e-10)

    X_ask = cum_ask_sizes - cum_ask_sizes.mean(axis=1, keepdims=True)
    Y_ask = ask_price - ask_price.mean(axis=1, keepdims=True)
    slope_a = (X_ask * Y_ask).sum(axis=1) / ((X_ask ** 2).sum(axis=1) + 1e-10)

    return -slope_b, slope_a

In [38]:
# Add features and labels' names to list
df_all = {}
basicCols = ['date', 'time', 'sAskPrice1','sBidPrice1','sMidQ', 'fAskPrice1','fBidPrice1', 'fMidQ', 'spreadRatio']
labelCols = []
featureCols = []

for i in range(1, 11):
    labelCols.extend(['Y_M_{}'.format(str(i))])
    
    featureCols.extend(['fLaggingRtn_{}'.format(str(i))])
    featureCols.extend(['spreadRatio_{}'.format(str(i))])
    featureCols.extend(['volumeImbalanceRatio_{}'.format(str(i))])
    featureCols.extend(['slope_b_{}'.format(str(i))])
    featureCols.extend(['slope_a_{}'.format(str(i))])
    featureCols.extend(['slope_ab_{}'.format(str(i))])
    featureCols.extend(['sLaggingRtn_{}'.format(str(i))])
    featureCols.extend(['stockSpreadRatio_{}'.format(str(i))])
    featureCols.extend(['stockVolumeImbalanceRatio_{}'.format(str(i))])
    featureCols.extend(['stockSlope_b_{}'.format(str(i))])
    featureCols.extend(['stockSlope_a_{}'.format(str(i))])
    featureCols.extend(['stockSlope_ab_{}'.format(str(i))])
    
    for j in range(1, 6):
        featureCols.extend(['fAskSize{}_{}'.format(str(j), str(i))])
        featureCols.extend(['fBidSize{}_{}'.format(str(j), str(i))])
        featureCols.extend(['sAskSize{}_{}'.format(str(j), str(i))])
        featureCols.extend(['sBidSize{}_{}'.format(str(j), str(i))])

# Generate features and labels
for future_ticker, futureData in future_data.items():
    stockData = stock_data[future2stock[future_ticker]]
    unique_dates = np.unique(stockData['date'])
    dfs = []
    for date in unique_dates:
        stockData_date = stockData[stockData['date'] == date]
        futureData_date = futureData[futureData['date'] == date]

        # Continue to next iteration if stockData_date or futureData_date is empty
        if stockData_date.empty or futureData_date.empty:
            continue

        df = pd.DataFrame(index=stockData_date.index, columns=basicCols+labelCols+featureCols)
        df['date'] = stockData_date['date']
        df['time'] = stockData_date['time']   
             
        # Normalize the size
        fAskSizeMax = futureData_date[['askSize1', 'askSize2', 'askSize3', 'askSize4', 'askSize5']].max(axis=1)
        fBidSizeMax = futureData_date[['bidSize1', 'bidSize2', 'bidSize3', 'bidSize4', 'bidSize5']].max(axis=1)
        sAskSizeMax = stockData_date[['SV1', 'SV2', 'SV3', 'SV4', 'SV5']].max(axis=1)
        sBidSizeMax = stockData_date[['BV1', 'BV2', 'BV3', 'BV4', 'BV5']].max(axis=1)
        
        for i in range(1, 6):
            df['fAskPrice{}'.format(str(i))] = futureData_date['askPrice{}'.format(str(i))]
            df['fBidPrice{}'.format(str(i))] = futureData_date['bidPrice{}'.format(str(i))]
            df['fAskSize{}'.format(str(i))] = futureData_date['askSize{}'.format(str(i))] / fAskSizeMax
            df['fBidSize{}'.format(str(i))] = futureData_date['bidSize{}'.format(str(i))] / fBidSizeMax
            
            df['sAskPrice{}'.format(str(i))] = stockData_date['SP{}'.format(str(i))]
            df['sBidPrice{}'.format(str(i))] = stockData_date['BP{}'.format(str(i))]
            df['sAskSize{}'.format(str(i))] = stockData_date['SV{}'.format(str(i))] / sAskSizeMax
            df['sBidSize{}'.format(str(i))] = stockData_date['BV{}'.format(str(i))] / sBidSizeMax
        
        df['fMidQ'] = (df['fAskPrice1'] + df['fBidPrice1']) / 2
        df['slope_b'], df['slope_a'] = cal_slope(futureData_date)
        df['slope_ab'] = df['slope_a'] - df['slope_b']
        
        # Order Imbalance Ratio
        ask = np.array([df['fAskPrice{}'.format(str(i))] * df['fAskSize{}'.format(str(i))] * (1 - (i - 1) / 5) for i in range(1, 6)]).sum(axis=0)
        bid = np.array([df['fBidPrice{}'.format(str(i))] * df['fBidSize{}'.format(str(i))] * (1 - (i - 1) / 5) for i in range(1, 6)]).sum(axis=0)
        df['spreadRatio'] = (ask - bid) / (ask + bid)

        delta_size_bid = np.where(df['fBidPrice1'] < df['fBidPrice1'].shift(1), 0, np.where(df['fBidPrice1'] == df['fBidPrice1'].shift(1), df['fBidSize1'] - df['fBidSize1'].shift(1), df['fBidSize1']))
        delta_size_ask = np.where(df['fAskPrice1'].shift(1), 0, np.where(df['fAskPrice1'] == df['fAskPrice1'].shift(1), df['fAskSize1'] - df['fAskSize1'].shift(1), df['fAskSize1']))
        df['fOrderImbalance'] = (delta_size_bid - delta_size_ask) / (delta_size_bid + delta_size_ask)

        for i in range(1, 11):
            df['fLaggingRtn_{}'.format(str(i))] = np.log(df['fMidQ']) - np.log(df['fMidQ'].shift(i))
            df['spreadRatio_{}'.format(str(i))] = df['spreadRatio'].shift(i)
            df['volumeImbalanceRatio_{}'.format(str(i))] = df['fOrderImbalance'].shift(i)
            df['slope_b_{}'.format(str(i))] = df['slope_b'].shift(i)
            df['slope_a_{}'.format(str(i))] = df['slope_a'].shift(i)
            df['slope_ab_{}'.format(str(i))] = df['slope_ab'].shift(i)
            
            for j in range(1, 6):
                df['fAskSize{}_{}'.format(str(j), str(i))] = df['fAskSize{}'.format(str(j))].shift(i)
                df['fBidSize{}_{}'.format(str(j), str(i))] = df['fBidSize{}'.format(str(j))].shift(i)
                df['sAskSize{}_{}'.format(str(j), str(i))] = df['sAskSize{}'.format(str(j))].shift(i)
                df['sBidSize{}_{}'.format(str(j), str(i))] = df['sBidSize{}'.format(str(j))].shift(i)

        # Add stock data
        df['sMidQ'] = (stockData_date['SP1'] + stockData_date['BP1'])/2
        df['sAskPrice1'] = stockData_date['SP1']
        df['sBidPrice1'] = stockData_date['BP1']
        df['sMidQ'] = (stockData_date['SP1'] + stockData_date['BP1'])/2
        df['stockSlope_b'], df['stockSlope_a'] = cal_slope(stockData_date)
        df['stockSlope_ab'] = df['stockSlope_a'] - df['stockSlope_b']
        
        ask = np.array([df['sAskPrice{}'.format(str(i))] * df['sAskSize{}'.format(str(i))] * (1 - (i - 1) / 5) for i in range(1, 6)]).sum(axis=0)
        bid = np.array([df['sBidPrice{}'.format(str(i))] * df['sBidSize{}'.format(str(i))] * (1 - (i - 1) / 5) for i in range(1, 6)]).sum(axis=0)
        df['stockSpreadRatio'] = (ask - bid) / (ask + bid)

        delta_size_bid = np.where(df['sBidPrice1'] < df['sBidPrice1'].shift(1), 0, np.where(df['sBidPrice1'] == df['sBidPrice1'].shift(1), df['sBidSize1'] - df['sBidSize1'].shift(1), df['sBidSize1']))
        delta_size_ask = np.where(df['sAskPrice1'] > df['sAskPrice1'].shift(1), 0, np.where(df['sAskPrice1'] == df['sAskPrice1'].shift(1), df['sAskSize1'] - df['sAskSize1'].shift(1), df['sAskSize1']))
        df['stockOrderImbalance'] = (delta_size_bid - delta_size_ask) / (delta_size_bid + delta_size_ask)

        for i in range(1, 11):
            df['Y_M_{}'.format(str(i))] = np.log(df['sMidQ'].shift(-i)) - np.log(df['sMidQ'])
            
            df['sLaggingRtn_{}'.format(str(i))] = np.log(df['sMidQ']) - np.log(df['sMidQ'].shift(i))
            df['stockSpreadRatio_{}'.format(str(i))] = df['stockSpreadRatio'].shift(i)
            df['stockVolumeImbalanceRatio_{}'.format(str(i))] = df['stockOrderImbalance'].shift(i)
            df['stockSlope_b_{}'.format(str(i))] = df['stockSlope_b'].shift(i)
            df['stockSlope_a_{}'.format(str(i))] = df['stockSlope_a'].shift(i)
            df['stockSlope_ab_{}'.format(str(i))] = df['stockSlope_ab'].shift(i)
            
            for j in range(1, 6):
                df['sAskSize{}_{}'.format(str(j), str(i))] = df['sAskSize{}'.format(str(j))].shift(i)
                df['sBidSize{}_{}'.format(str(j), str(i))] = df['sBidSize{}'.format(str(j))].shift(i)
        dfs.append(df)
    # convert inf to nan to 0
    df_all[future_ticker] = pd.concat(dfs, ignore_index=True).replace([np.inf, -np.inf], np.nan).fillna(0)

In [39]:
# Model Construction and Forecasting
def modelConstructionAndForecasting(numOfPastDays:int, numOfForwardDays:int, data:pd.DataFrame, numOfProcesses:int, featureColumns:list, labelColumns:list):
    date=data.date
    date_index=np.unique(date)
    date_num=date_index.size
    p=numOfPastDays
    
    #Output data
    columns1 = ['date', 'label', 'isR2', 'oosR2']
    outputData = pd.DataFrame(columns=columns1)
    
    for i in range(date_num-p):
        if i >= numOfForwardDays:
            break

        #Prepare training and testing data for the current testing day
        trainingDates=[date_index[i]]
        for j in range(i+1,i+p):
            trainingDates.append(date_index[j])        

        dataPd=data[data.date.isin(trainingDates)]
        data1d=data[data.date==date_index[i+p]]

        #Building models with gbrt models for each label
        gbrtModels = {}
        GBRTModels = {}
        for j in range(1, 11):
            gbrtModels['Y_M_{}'.format(str(j))] = ensemble.GradientBoostingRegressor()
        
        #Use a dictionry to hold training label data
        y_training = {}
        for j in range(1, 11):
            y_training['Y_M_{}'.format(str(j))] = dataPd['Y_M_{}'.format(str(j))]
        
        #Hold feature data for training
        x_training = pd.DataFrame()
        for j in range(len(featureColumns)):
            x_training[featureColumns[j]] = dataPd[featureColumns[j]]

        #Use a dictionry to hold testing label data
        y_testing = {}
        for j in range(1, 11):
            y_testing['Y_M_{}'.format(str(j))] = data1d['Y_M_{}'.format(str(j))]

        #Hold feature data for testing
        x_testing = pd.DataFrame()
        for j in range(len(featureColumns)):
            x_testing[featureColumns[j]] = data1d[featureColumns[j]]

        #Use a dictionry to hold predictions
        y_prediction = {}

        #timing the multi-Process prcessing
        t = time.time()
        
        #Construction of models
        ##Establish a number of Processs for the calculation tasks
        pool = Pool(processes=numOfProcesses)

        for j in range(1, 11):
            GBRTModels['Y_M_{}'.format(str(j))] = pool.apply_async(gbrtModels['Y_M_{}'.format(str(j))].fit, args=(x_training, y_training['Y_M_{}'.format(str(j))]))

        pool.close()
        pool.join()
        
        print(">>>Done with " + str(numOfProcesses) + " processes, and time taken is " + str(time.time() - t)) 

        #Use the models to predict
        for j in range(1, 11):
            y_prediction['Y_M_{}'.format(str(j))] = GBRTModels['Y_M_{}'.format(str(j))].get().predict(x_testing)
        
        #Now, we store modeling results
        for j in range(1, 11):
            oneLineModelResult = []
            oneLineModelResult.extend([str(date_index[i+p])])
            oneLineModelResult.extend(['Y_M_{}'.format(str(j))])
            oneLineModelResult.extend([GBRTModels['Y_M_{}'.format(str(j))].get().score(x_training, y_training['Y_M_{}'.format(str(j))])])
            oneLineModelResult.extend([GBRTModels['Y_M_{}'.format(str(j))].get().score(x_testing, y_testing['Y_M_{}'.format(str(j))])])            
            outputData = pd.concat([outputData, pd.DataFrame(data = [oneLineModelResult], columns = columns1)])

    return outputData

numOfPastDays = 10 
numOfForwardDays = 1
numOfProcesses = 8

for future_ticker, df in df_all.items():
    start_time = time.time()
    outputData = modelConstructionAndForecasting(numOfPastDays, numOfForwardDays, df, numOfProcesses, featureCols, labelCols)      
    end_time = time.time()
    print('Time taken: ', end_time - start_time, 'seconds')
    outputData.to_csv('./modelResultDataFile_{}.csv'.format(future_ticker), index=False)

>>>Done with 8 processes, and time taken is 591.2123265266418
Time taken:  593.971914768219 seconds
