<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Spectra" data-toc-modified-id="Spectra-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Spectra</a></span><ul class="toc-item"><li><ul class="toc-item"><li><span><a href="#Create-Orderbook-Spectrum-for-6-instrument-for-the-first-day" data-toc-modified-id="Create-Orderbook-Spectrum-for-6-instrument-for-the-first-day-1.0.1"><span class="toc-item-num">1.0.1&nbsp;&nbsp;</span>Create Orderbook Spectrum for 6 instrument for the first day</a></span></li></ul></li></ul></li></ul></div>

# Spectra

### Create Orderbook Spectrum for 6 instrument for the first day

1. A spectrum is created for each moment in time when the order book is updated
2. Compare the forms of spectrum for different instruments
    - where are the max quantity values (max density)?
    - can you approximate it with any distribution?
3. The result for each instrument should be in the following format:
    - timestamp, val1, val2, ..., val20

In [1]:
%load_ext Cython
%load_ext line_profiler

In [17]:
%%time
%%cython

cimport cython
cimport numpy as np
import numpy as np
from sortedcontainers import SortedDict
import pandas as pd
from math import ceil


# Per-instrument settings, keyed by security code.
#   PRICE_STEP  - minimum price increment; read by the functions below to turn
#                 price distances into ticks and to size the spectrum bands.
#   SCHEDULE    - not read anywhere in this cell; presumably the session cutoff
#                 encoded as HHMMSSffffff -- TODO confirm.
#   TRADING_SEC - not read anywhere in this cell; presumably the session length
#                 in seconds -- TODO confirm.
cdef dict instruments_info = {'USD000000TOD': {'SCHEDULE': 174500000000, 'PRICE_STEP': 0.0025, 'TRADING_SEC': 27900},
                              'USD000UTSTOM': {'SCHEDULE': 235000000000, 'PRICE_STEP': 0.0025, 'TRADING_SEC': 49800},
                              'EUR_RUB__TOD': {'SCHEDULE': 150000000000, 'PRICE_STEP': 0.0025, 'TRADING_SEC': 18000},
                              'EUR_RUB__TOM': {'SCHEDULE': 235000000000, 'PRICE_STEP': 0.0025, 'TRADING_SEC': 49800},
                              'EURUSD000TOM': {'SCHEDULE': 235000000000, 'PRICE_STEP': 0.00001, 'TRADING_SEC': 49800},
                              'EURUSD000TOD': {'SCHEDULE': 150000000000, 'PRICE_STEP': 0.00001, 'TRADING_SEC': 18000}}


cdef is_spectrum_affected(dict bids,
                          dict asks,
                          long timestamp,
                          np.int_t action,
                          str buysell,
                          str instrument,
                          np.float_t price):
    """Decide whether an order event can change the 20-band spectrum.

    The function is meant to be called after the books have already been
    updated with the event.

    Parameters
    ----------
    bids, asks : dict
        Per-instrument price -> volume books (indexable sorted key views).
    timestamp : long
        Event time; accepted for interface compatibility, not used.
    action : int
        Event code; ``0`` enables the extra "crossed" checks below.
    buysell : str
        ``'B'`` selects the bid side, anything else the ask side.
    instrument : str
        Security code, must be a key of ``instruments_info``.
    price : float
        Order price; ``0.0`` always counts as affecting the spectrum.

    Returns
    -------
    bool
        True when the event is priced inside the 50-tick window on its own
        side (best price included), when either book is empty, when the
        price is 0.0, or when an ``action == 0`` event lies beyond the
        opposite best price.
    """
    cdef float best_bid
    cdef float best_ask
    cdef float window
    cdef float order_price

    if price == 0.0 or len(asks[instrument]) == 0 or len(bids[instrument]) == 0:
        return True

    best_bid = bids[instrument].keys()[-1]
    best_ask = asks[instrument].keys()[0]
    window = instruments_info[instrument]['PRICE_STEP'] * 50

    # add_offer/delete_offer take the price as a C float, so the book keys are
    # single-precision values.  Round the incoming double the same way so that
    # "equal to the best price" is an exact, deterministic comparison.
    order_price = price

    if buysell == 'B':
        # The best bid itself belongs to the top bid band in get_volumes, so
        # the upper bound is inclusive.
        if order_price > best_bid - window and order_price <= best_bid:
            return True
        elif action == 0 and best_ask < order_price:
            return True
    else:
        # The best ask itself belongs to the first ask band in get_volumes, so
        # the lower bound is inclusive.
        if order_price >= best_ask and order_price < best_ask + window:
            return True
        elif action == 0 and best_bid > order_price:
            return True

    return False


@cython.wraparound(False)
@cython.boundscheck(False)
cdef get_volumes(dict offers,
                 str instrument,
                 float best_offer,
                 float step,
                 str buysell):
    """Sum resting volume in ten consecutive 5-tick bands on one book side.

    Parameters
    ----------
    offers : dict
        Per-instrument books; ``offers[instrument]`` must map price -> volume
        and provide ``irange(minimum, maximum, inclusive)``.
    instrument : str
        Security code selecting the book.
    best_offer : float
        Best price of the side (best bid for ``"B"``, best ask otherwise).
    step : float
        Price step (one tick) of the instrument.
    buysell : str
        ``"B"`` for the bid side, anything else for the ask side.

    Returns
    -------
    numpy.ndarray
        Ten floats.  For bids, element 0 is the band furthest below the best
        bid and element 9 ends at (and includes) the best bid.  For asks,
        element 0 starts at (and includes) the best ask and element 9 is the
        furthest band above it.
    """
    cdef np.ndarray volumes = np.zeros(10)
    cdef int subrange_id

    book = offers[instrument]
    is_bid_side = buysell == "B"

    for subrange_id in range(10):
        if is_bid_side:
            # (lower, upper]: the best bid lands in the last band.
            prices = book.irange(best_offer - step * 5 * (10 - subrange_id),
                                 best_offer - step * 5 * (9 - subrange_id),
                                 (False, True))
        else:
            # [lower, upper): the best ask lands in the first band.  Bands are
            # 5 ticks wide, mirroring the bid side, so both sides cover the
            # same 50 ticks that is_spectrum_affected watches.
            prices = book.irange(best_offer + step * 5 * subrange_id,
                                 best_offer + step * 5 * (subrange_id + 1),
                                 (True, False))

        for price in prices:
            volumes[subrange_id] += book[price]
    return volumes


@cython.boundscheck(False)
cdef create_spectrum(dict bids,
                     dict asks,
                     dict spectra,
                     long timestamp,
                     str instrument,
                     int action,
                     int index,
                     str foldername):
    """Compute the 20-band spectrum for ``instrument`` and flush buffers to disk.

    The spectrum is the concatenation of the ten bid bands and the ten ask
    bands returned by ``get_volumes``, each divided by 10,000,000.  It is
    stored in ``spectra[instrument][timestamp]``.

    When the flush condition holds, every buffered record of every instrument
    is appended to ``<foldername>/<instrument>.txt`` as
    ``action, timestamp, best_bid, best_ask, v1, ..., v20`` and the buffers are
    reset.  Note that ``action``, ``best_bid`` and ``best_ask`` written on
    each line are those of the current call.

    Returns
    -------
    False when either book of ``instrument`` is empty (nothing is computed),
    otherwise None.
    """
    cdef float best_bid
    cdef float best_ask
    cdef float step
    cdef np.ndarray bids_volumes
    cdef np.ndarray asks_volumes
    cdef str name

    if len(asks[instrument]) == 0 or len(bids[instrument]) == 0:
        return False

    best_bid = bids[instrument].keys()[-1]
    best_ask = asks[instrument].keys()[0]
    step = instruments_info[instrument]['PRICE_STEP']

    bids_volumes = get_volumes(bids, instrument, best_bid, step, "B")
    asks_volumes = get_volumes(asks, instrument, best_ask, step, "A")

    max_band = 10000000  # volumes are expressed in units of 10m

    spectra[instrument][timestamp] = np.concatenate((bids_volumes / max_band,
                                                     asks_volumes / max_band))

    # NOTE(review): `index % 250000` is truthy for every index that is NOT a
    # multiple of 250000, so this flushes on almost every call.  Because the
    # lines written below embed the current call's action/best prices, the
    # condition is left untouched -- confirm whether batching was intended.
    if index % 250000 or index > 5000000:
        # Iterate over a snapshot of the keys: the buffers are replaced inside
        # the loop, and mutating a dict while iterating it is unsafe.
        for name in list(spectra):
            with open(foldername + '/' + name + '.txt', 'a') as out:
                for t in spectra[name]:
                    out.write(f'{action}, {t}, {best_bid}, {best_ask}, ' + ", ".join(map(str, spectra[name][t])) + "\n")
            spectra[name] = SortedDict()


cdef calc_vwap(offers,
               best_bid,
               best_ask,
               timestamp,
               seccode,
               buysell,
               volume,
               foldername):
    """Append the tick-normalised VWAP of a hypothetical order to a file.

    The order of size ``volume`` is filled against ``offers`` starting from
    the best price: ascending for a buy (``buysell == "B"``), descending
    otherwise.  The volume-weighted average fill price is expressed as a
    distance from the midpoint, in price steps of ``seccode``, and is
    positive when the fill is worse than the midpoint.

    Parameters
    ----------
    offers : sorted mapping price -> volume
        The book the order trades against (asks for a buy, bids for a sell).
    best_bid, best_ask : float
        Current best prices, used for the midpoint.
    timestamp
        Written verbatim as the first column.
    seccode : str
        Security code; selects the price step and the output file name.
    buysell : str
        ``"B"`` for a buy; any other value is treated as a sell.
    volume : int
        Order size to fill.
    foldername : str
        Directory of the output file ``<seccode>_<B|A>_<volume>.txt``.

    Nothing is written when no volume can be filled (empty book).
    """
    is_buy = buysell == "B"
    price_levels = offers.items() if is_buy else reversed(offers.items())

    left_volume = volume
    filled_volume = 0
    filled_value = 0
    for offer_price, offer_volume in price_levels:
        taken = min(offer_volume, left_volume)
        filled_volume += taken
        filled_value += offer_price * taken
        left_volume -= taken
        if left_volume <= 0:
            # The order is fully filled; deeper levels must not contribute.
            break

    if filled_volume <= 0:
        return

    price_step = instruments_info[seccode]['PRICE_STEP']
    midpoint = (best_ask + best_bid) / 2
    vwap = filled_value / filled_volume
    if is_buy:
        normalized = (vwap - midpoint) / price_step
        side_label = "B"
    else:
        normalized = (midpoint - vwap) / price_step
        side_label = "A"

    with open(foldername + '/' + seccode + f"_{side_label}_{volume}" + '.txt', 'a') as out:
        out.write(f'{timestamp}, {normalized}\n')
        
        
@cython.wraparound(False)
@cython.boundscheck(False)
cdef bid_ask_spread(float best_bid,
                    float best_ask,
                    long timestamp,
                    str seccode,
                    str foldername):
    """Append ``timestamp, spread`` to ``<foldername>/<seccode>.txt``.

    The spread is ``best_ask - best_bid`` expressed in whole price steps of
    ``seccode``.
    """
    spread = (best_ask - best_bid) / instruments_info[seccode]['PRICE_STEP']

    # The prices arrive as single-precision floats, so the quotient is only
    # approximately an integer (e.g. 1.0015).  Rounding to the nearest tick
    # avoids `ceil` bumping such values up by a whole tick.
    spread_ticks = int(round(spread))

    with open(foldername + '/' + seccode + '.txt', 'a') as out:
        out.write(f'{timestamp}, {spread_ticks}\n')

        
@cython.wraparound(False)
@cython.boundscheck(False)
cdef inline delete_offer(dict offers, str seccode, float price, int volume):
    """Remove ``volume`` from the ``price`` level of ``offers[seccode]``.

    A level whose remaining volume would be zero or negative is dropped
    entirely; an unknown price is ignored.
    """
    book = offers[seccode]
    if price not in book:
        return

    remaining = book[price] - volume
    if remaining <= 0:
        book.pop(price)
    else:
        book[price] = remaining

            
@cython.wraparound(False)
@cython.boundscheck(False)
cdef inline add_offer(dict offers, str seccode, float price, int volume):
    """Add ``volume`` to the ``price`` level of ``offers[seccode]``.

    The level is created when it does not exist yet.
    """
    book = offers[seccode]
    if price not in book:
        book[price] = volume
        return
    book[price] = book[price] + volume
        
        
# Hypothetical order sizes for which row_scanner calls calc_vwap.
volumes = [100000, 200000, 500000, 1000000, 5000000, 10000000]        
# Window lengths added to bucket times (seconds since midnight, as computed in
# row_scanner) by aggressors_offer_volumes and liquidity_replenishment.
time_periods = [1, 5, 15, 30, 60]


cdef is_aggressive(offers, seccode, buysell, price):
    """Tell whether an order is priced through the given book.

    For ``buysell == "B"`` the order is aggressive when ``price`` is strictly
    above the lowest key of ``offers[seccode]``; otherwise when it is strictly
    below the highest key.  An empty book never yields an aggressive order.
    """
    quotes = offers[seccode].keys()
    if len(quotes) == 0:
        return False

    if buysell == "B":
        crosses = price > quotes[0]
    else:
        crosses = price < quotes[-1]
    return True if crosses else False


cdef aggressors_offer_volumes(time_bucket_offers, seccode, buysell, foldername):
    """Write per-period aggressor volumes for ``seccode`` and prune the buckets.

    ``time_bucket_offers[seccode]`` is a sorted mapping time -> volume and
    must be non-empty.  For every ``period`` in ``time_periods`` the volumes
    of the buckets in ``[first_time, first_time + period)`` are summed and
    divided by ``10000000 * period``.  The results are appended as one
    comma-separated line to ``<foldername>/<seccode>_<buysell>.txt``.
    """
    buckets = time_bucket_offers[seccode]
    volumes_per_period = []

    for period in time_periods:
        start = buckets.keys()[0]
        end = start + period

        volume = 0
        for bucket_time in buckets.irange(start, end, (True, False)):
            volume += buckets[bucket_time]
        volumes_per_period.append(volume / (10000000 * period))

    # delete old records
    # NOTE(review): this removes the buckets LATER than first_time + 60, i.e.
    # the newest ones, while the summing windows stay anchored at the oldest
    # bucket.  The comment suggests the opposite was intended -- confirm.
    start = buckets.keys()[0] + 60
    end = buckets.keys()[-1]
    # irange is lazy; take a snapshot of the keys before popping so the
    # mapping is not mutated while it is being iterated.
    expired = list(buckets.irange(start, end, (False, True)))
    for bucket_time in expired:
        buckets.pop(bucket_time)

    # store results
    with open(foldername + '/' + seccode + f"_{buysell}" + '.txt', 'a') as out:
        out.write(','.join(map(str, volumes_per_period)) + '\n')


# TODO: test this code
cdef liquidity_replenishment(time_bucket_offers, offers, seccode, buysell, time_seconds, foldername):
    """Append volumes placed near the best price to ``<seccode>_<buysell>.txt``.

    For every ``period`` in ``time_periods`` and every time bucket in
    ``[time_seconds, time_seconds + period)``, the volume stored at prices
    within 5 price steps of the best price of ``offers[seccode]`` is summed,
    divided by ``10000000 * period`` and appended to the output row.

    The best price is the highest key for ``buysell == "B"`` and the lowest
    key otherwise; ``offers[seccode]`` must be non-empty.
    """
    # NOTE(review): one value is emitted per matching time bucket, not per
    # period, so the row length varies -- confirm this is intended.
    # NOTE(review): the bid range excludes the best bid while the ask range
    # includes the best ask -- confirm the asymmetry is intended.
    tick = instruments_info[seccode]["PRICE_STEP"]
    buckets = time_bucket_offers[seccode]
    results = []

    for period in time_periods:
        quotes = offers[seccode].keys()
        if buysell == "B":
            best_bid = quotes[-1]
            low, high = best_bid - 5 * tick, best_bid
        else:
            best_ask = quotes[0]
            low, high = best_ask, best_ask + 5 * tick

        window = buckets.irange(time_seconds, time_seconds + period, (True, False))
        for bucket_time in window:
            levels = buckets[bucket_time]
            period_volume = 0
            for level_price in levels.irange(low, high, (True, False)):
                period_volume += levels[level_price]
            results.append(period_volume / (10000000 * period))

    with open(foldername + "/" + seccode + f"_{buysell}.txt", "a") as out:
        out.write(",".join(map(str, results)) + '\n')
        
        
cdef add_timebucket_offer(time_seconds, time_bucket_offers, seccode, price, volume):
    """Accumulate ``volume`` at ``price`` inside the ``time_seconds`` bucket.

    ``time_bucket_offers[seccode]`` maps a time to a price -> volume mapping;
    a missing time bucket is created as an empty ``SortedDict``.
    """
    buckets = time_bucket_offers[seccode]
    if time_seconds not in buckets.keys():
        buckets[time_seconds] = SortedDict()

    levels = buckets[time_seconds]
    if price in levels.keys():
        levels[price] += volume
    else:
        levels[price] = volume
    
    
    

@cython.boundscheck(False)
cpdef row_scanner(np.ndarray timestamp,
                  np.ndarray [np.int_t, ndim=1] action,
                  np.ndarray buysell,
                  np.ndarray seccode,
                  np.ndarray [np.float_t, ndim=1] price,
                  np.ndarray [np.int_t, ndim=1] volume,
                  str foldername):
    """Replay an order log row by row and write the derived statistics.

    The arguments are parallel arrays with one element per order-log row.
    Rows with ``action == 1`` add volume to the book of their side; every
    other action removes volume.  After each row that
    ``is_spectrum_affected`` accepts, the function writes:

    * the order-book spectrum (``create_spectrum``) into ``foldername``;
    * VWAPs for every size in ``volumes`` into ``foldername/vwap``;
    * aggressor volumes into ``foldername/trade_volumes``;
    * liquidity replenishment into ``foldername/liq_rep``;
    * the bid-ask spread into ``foldername/spread``.

    The sub-directories must already exist.  Every value in ``seccode`` must
    be a key of ``instruments_info``.
    """
    cdef dict bids = {}
    cdef dict asks = {}
    cdef dict spectra = {}
    cdef str instrument
    cdef int i

    # Volume of aggressive orders per second, keyed by the side they hit.
    time_bucket_bids = {}
    time_bucket_asks = {}

    # Volume added per second and price level, keyed by the order's own side.
    time_bucket2_bids = {}
    time_bucket2_asks = {}

    for instrument in instruments_info.keys():
        bids[instrument] = SortedDict()
        asks[instrument] = SortedDict()

        time_bucket_bids[instrument] = SortedDict()
        time_bucket_asks[instrument] = SortedDict()

        time_bucket2_bids[instrument] = SortedDict()
        time_bucket2_asks[instrument] = SortedDict()

        spectra[instrument] = {}

    for i in range(len(timestamp)):
        # Assumes timestamps are HHMMSS followed by six fractional digits (the
        # SCHEDULE values have that shape) -- TODO confirm.  Zero-padding keeps
        # the slicing correct for times before 10:00:00, which have fewer
        # than 12 digits.
        time_str = str(timestamp[i]).zfill(12)[:6]
        time_seconds = int(time_str[:2]) * 3600 + int(time_str[2:4]) * 60 + int(time_str[4:6])

        code = seccode[i]
        side = buysell[i]

        if action[i] == 1:
            if side == 'B':
                add_offer(bids, code, price[i], volume[i])
                if is_aggressive(asks, code, side, price[i]):
                    add_offer(time_bucket_asks, code, time_seconds, volume[i])

                add_timebucket_offer(time_seconds, time_bucket2_bids, code, price[i], volume[i])
            else:
                add_offer(asks, code, price[i], volume[i])
                if is_aggressive(bids, code, side, price[i]):
                    add_offer(time_bucket_bids, code, time_seconds, volume[i])

                add_timebucket_offer(time_seconds, time_bucket2_asks, code, price[i], volume[i])
        else:
            if side == 'B':
                delete_offer(bids, code, price[i], volume[i])
                delete_offer(time_bucket_asks, code, time_seconds, volume[i])
            else:
                delete_offer(asks, code, price[i], volume[i])
                delete_offer(time_bucket_bids, code, time_seconds, volume[i])

        if not is_spectrum_affected(bids, asks, timestamp[i], action[i], side, code, price[i]):
            continue

        create_spectrum(bids, asks, spectra, timestamp[i], code, action[i], i, foldername)

        # TODO: debug this code lines
        if len(bids[code].keys()) == 0 or len(asks[code].keys()) == 0:
            continue

        best_bid = bids[code].keys()[-1]
        best_ask = asks[code].keys()[0]

        # A buy trades against the asks, a sell against the bids.
        opposite_book = asks[code] if side == "B" else bids[code]
        for vol in volumes:
            calc_vwap(opposite_book,
                      best_bid,
                      best_ask,
                      timestamp[i],
                      code,
                      side,
                      vol,
                      foldername + '/vwap')

        if len(time_bucket_bids[code]) != 0 and len(time_bucket_asks[code]) != 0:
            # NOTE(review): both calls append to the same `<code>_<side>.txt`
            # file, so bid-side and ask-side rows are interleaved -- confirm.
            aggressors_offer_volumes(time_bucket_bids, code, side, foldername + '/trade_volumes')
            aggressors_offer_volumes(time_bucket_asks, code, side, foldername + '/trade_volumes')

            # liquidity_replenishment picks the best price by `side`, so it
            # must receive the book and buckets of that same side (previously
            # the bid structures were passed twice, whatever the side).
            if side == 'B':
                liquidity_replenishment(time_bucket2_bids, bids, code, side, time_seconds, foldername + '/liq_rep')
            else:
                liquidity_replenishment(time_bucket2_asks, asks, code, side, time_seconds, foldername + '/liq_rep')

        bid_ask_spread(best_bid, best_ask, timestamp[i], code, foldername + '/spread')

CPU times: user 710 ms, sys: 19.7 ms, total: 730 ms
Wall time: 12.9 s


In [None]:
%%time

import pandas as pd
import shutil
import os
from sortedcontainers import SortedDict


# Instruments kept from each order log.  For every instrument, rows whose TIME
# is at or after its SCHEDULE value are discarded by the loop below.
# NOTE(review): the table in the Cython cell also lists 'EURUSD000TOD'; it is
# absent here, so its rows are filtered out -- confirm this is intended.
instruments_info = {'USD000000TOD': {'SCHEDULE': 174500000000},
                    'USD000UTSTOM': {'SCHEDULE': 235000000000},
                    'EUR_RUB__TOD': {'SCHEDULE': 150000000000},
                    'EUR_RUB__TOM': {'SCHEDULE': 235000000000},
                    'EURUSD000TOM': {'SCHEDULE': 235000000000},
                   }

# Month numbers (as two-digit strings) of 2018 to process.
months = ["04"]  # , "04", "05"]


def make_folder(foldername):
    """Create ``foldername`` as an empty directory.

    An existing directory of that name is removed together with its contents
    first.  Missing parent directories are created as well, so the function
    also works on a fresh checkout where the results tree does not exist yet.
    """
    if os.path.exists(foldername):
        shutil.rmtree(foldername)
    os.makedirs(foldername)


# Replay every daily order log of the selected months through row_scanner.
# Results go to ../results/spectrums/2018-<month>/<day>/ and its sub-folders;
# existing results for the month are wiped first.
for month in months:
    foldername = "../results/spectrums/2018-" + month
    make_folder(foldername)

    folder_with_data = "../data/2018-" + month
    # Sorted so that days are processed in a reproducible order.
    files = sorted(os.listdir(folder_with_data))

    # Per-instrument cutoff, used to filter all instruments in one pass.
    schedule_by_instrument = {instrument: info['SCHEDULE']
                              for instrument, info in instruments_info.items()}

    for file in files:
        if not file.startswith("OrderLog2018"):
            continue

        # The day is taken from the two characters before the file extension.
        day = file[-6:-4]
        foldername = "../results/spectrums/2018-" + month + "/" + day

        make_folder(foldername)
        for subfolder in ("spread", "vwap", "trade_volumes", "liq_rep"):
            make_folder(foldername + "/" + subfolder)

        orderlog = pd.read_csv(folder_with_data + "/" + file, index_col='NO')

        # Keep only the configured instruments, then keep the rows strictly
        # before each instrument's SCHEDULE.  A single boolean mask replaces
        # the repeated in-place drops, which were the slow part of this cell.
        orderlog = orderlog[orderlog.SECCODE.isin(instruments_info.keys())]
        cutoff = orderlog.SECCODE.map(schedule_by_instrument)
        orderlog = orderlog[orderlog.TIME < cutoff]

        row_scanner(orderlog.TIME.values,
                    orderlog.ACTION.values,
                    orderlog.BUYSELL.values,
                    orderlog.SECCODE.values,
                    orderlog.PRICE.values,
                    orderlog.VOLUME.values,
                    foldername)

  mask |= (ar1 == a)


In [None]:
# import line_profiler
# #Print profiling statistics using the `line_profiler` API
# profile = line_profiler.LineProfiler(run)
# profile.runcall(run)
# profile.print_stats()


# # %lprun -f cumulative_sum cumulative_sum(100)

# # from Cython.Compiler.Options import get_directive_defaults
# # directive_defaults = get_directive_defaults() 

# # directive_defaults['linetrace'] = True
# # directive_defaults['binding'] = True





# # %lprun -f run run()

# !pip install line_profiler
# %load_ext line_profiler

# def func():
#     print('hi')

# %lprun -f func func()

In [None]:
# import pandas as pd
# orderlog = pd.read_csv("../data/2018-03/OrderLog20180301.txt",
#                        index_col='NO',
#                        nrows=1000,
#                        usecols=['NO', 'SECCODE', 'BUYSELL', 'TIME', 'ACTION', 'PRICE', 'VOLUME']
#                        )

In [None]:
import matplotlib.pyplot as plt
import re

# Draw one stored record of the USD000UTSTOM spectrum file as a bar chart.
# NOTE(review): the processing cell writes to 2018-<month>/<day>/ and prefixes
# each line with action, timestamp, best_bid, best_ask; this path and the
# [2:-1] slice look like they target an older layout -- confirm before use.
with open('../results/spectrums/2018-03/USD000UTSTOM.txt', 'r') as spectrum_file:
    records = spectrum_file.readlines()

    plt.figure(figsize=(10, 5))

    # Record 1002 is simply the sample chosen for display.
    fields = re.split(': |, |\n |    ', records[1002])
    # Skip the first two fields and the last one, as before.
    values = [float(field) for field in fields[2:-1]]

    plt.bar(range(len(values)),
            values)

In [None]:
# %%timeit
# import dask.dataframe

# data = dask.dataframe.read_csv("../data/2018-03/OrderLog20180301.txt")
# # data.set_index("NO")
# # data.values()