In [1]:
import pandas as pd
import numpy as np
import os
from queue import PriorityQueue

%load_ext line_profiler

In [2]:
# Relative locations of the two CSV snapshots, both under play_data/.
quotes_filename, trades_filename = (
    os.path.join("play_data", "XBTUSD_quotes_191214_0434.csv"),
    os.path.join("play_data", "XBTUSD_trades_191214_0434.csv"),
)

In [3]:
# Read the quotes CSV: rows are indexed by the 'recorded' column, and both
# 'timestamp' and 'recorded' are parsed as datetimes while loading.
quotes_full = pd.read_csv(quotes_filename, parse_dates=['timestamp', 'recorded'], index_col='recorded')

# Show the first few rows as the cell output.
quotes_full.head()

Unnamed: 0_level_0,timestamp,bidSize,bidPrice,askPrice,askSize
recorded,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2019-12-14 04:34:40.731941+00:00,2019-12-14 04:34:40.024000+00:00,3840427,7251.5,7252.0,701477
2019-12-14 04:34:41.211565+00:00,2019-12-14 04:34:40.410000+00:00,3840391,7251.5,7252.0,701477
2019-12-14 04:34:42.210955+00:00,2019-12-14 04:34:41.165000+00:00,3840391,7251.5,7252.0,731477
2019-12-14 04:34:42.210955+00:00,2019-12-14 04:34:41.183000+00:00,3840187,7251.5,7252.0,731277
2019-12-14 04:34:42.210955+00:00,2019-12-14 04:34:41.431000+00:00,3840187,7251.5,7252.0,715277


In [4]:
# Read the trades CSV: rows are indexed by the 'received' column, and both
# 'timestamp' and 'received' are parsed as datetimes while loading.
trades_full = pd.read_csv(trades_filename, parse_dates=['timestamp', 'received'], index_col='received')

# Show the first few rows as the cell output.
trades_full.head()

Unnamed: 0_level_0,timestamp,side,size,price
received,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2019-12-14 04:34:40.742081+00:00,2019-12-14 04:34:38.713000+00:00,Sell,2500,7251.5
2019-12-14 04:34:42.499730+00:00,2019-12-14 04:34:42.378000+00:00,Buy,12000,7252.0
2019-12-14 04:34:48.168746+00:00,2019-12-14 04:34:48.044000+00:00,Buy,100000,7252.0
2019-12-14 04:34:48.332053+00:00,2019-12-14 04:34:48.213000+00:00,Buy,116,7252.0
2019-12-14 04:34:50.665331+00:00,2019-12-14 04:34:50.504000+00:00,Buy,2500,7252.0


In [5]:
class _QueueEvent(dict):
    """Event payload that supports ordering, so queue entries with equal times can be compared.

    ``PriorityQueue`` orders ``(time, event)`` tuples. When two times are
    equal the comparison falls through to the events, and plain dicts raise
    ``TypeError`` on ``<``. Ties are broken by ``(type, table)`` so the pop
    order is deterministic. Instances are still ordinary dicts for readers
    (``event["type"]``, ``event["table"]``).
    """

    def _sort_key(self):
        # Only uses .get(), so it also works when applied to a plain dict.
        return (str(self.get("type", "")), str(self.get("table", "")))

    def __lt__(self, other):
        if not isinstance(other, dict):
            return NotImplemented
        return _QueueEvent._sort_key(self) < _QueueEvent._sort_key(other)

    def __gt__(self, other):
        # Defined as well so that ``plain_dict < _QueueEvent`` resolves through
        # the reflected operation instead of raising.
        if not isinstance(other, dict):
            return NotImplemented
        return _QueueEvent._sort_key(self) > _QueueEvent._sort_key(other)


def _load_times_into_priority_queue(frame, pq, table):
    """Put one ``(time, event)`` entry on ``pq`` per distinct index value of ``frame``."""
    for curr_time in frame.index.drop_duplicates():
        event = _QueueEvent({"type": "data", "table": table})
        pq.put((curr_time, event))


def load_trade_times_into_priority_queue(trades_full, pq):
    """Queue a "trades" data event for every distinct index value of ``trades_full``.

    Parameters
    ----------
    trades_full : pandas.DataFrame
        Frame whose index values are used as the queue priorities.
    pq : queue.PriorityQueue
        Queue that receives ``(time, event)`` tuples; modified in place.
    """
    _load_times_into_priority_queue(trades_full, pq, "trades")


def load_quote_times_into_priority_queue(quotes_full, pq):
    """Queue a "quotes" data event for every distinct index value of ``quotes_full``.

    Parameters
    ----------
    quotes_full : pandas.DataFrame
        Frame whose index values are used as the queue priorities.
    pq : queue.PriorityQueue
        Queue that receives ``(time, event)`` tuples; modified in place.
    """
    _load_times_into_priority_queue(quotes_full, pq, "quotes")

In [40]:
# Fresh event queue seeded with every distinct trade and quote index time.
pq = PriorityQueue()
load_trade_times_into_priority_queue(trades_full, pq)
load_quote_times_into_priority_queue(quotes_full, pq)

# Independent copies of the complete tables, keyed by table name.
FULL_TABLES = {
    name: frame.copy()
    for name, frame in (("trades", trades_full), ("quotes", quotes_full))
}

# Row-less frames with the same columns; the append benchmark grows these.
CURR_TABLES = {
    name: pd.DataFrame(columns=frame.columns)
    for name, frame in (("trades", trades_full), ("quotes", quotes_full))
}


def append_method(pq, FULL_TABLES, CURR_TABLES, count=30):
    """Replay up to ``count`` queued events, growing each current table by concatenation.

    Parameters
    ----------
    pq : queue.PriorityQueue
        Queue of ``(time, event)`` tuples; entries are popped in priority order.
    FULL_TABLES : dict
        Maps a table name to the complete DataFrame, looked up by event time
        through ``.loc``.
    CURR_TABLES : dict
        Maps a table name to the DataFrame of rows replayed so far. The entry
        for a table is rebound to the extended frame on every data event.
    count : int, default 30
        Maximum number of events to pop; every popped event counts, whether or
        not it is a data event.

    Returns
    -------
    None
    """
    while not pq.empty() and count > 0:
        curr_time, event = pq.get()

        if event["type"] == "data":
            table = event["table"]
            # A list indexer always yields a DataFrame (one row per match),
            # even when the time occurs only once in the index.
            new_rows = FULL_TABLES[table].loc[[curr_time]]
            seen = CURR_TABLES[table]
            # DataFrame.append no longer exists in pandas 2.x; pd.concat is the
            # replacement. An empty accumulator is skipped so its placeholder
            # dtypes do not leak into the result.
            CURR_TABLES[table] = new_rows if seen.empty else pd.concat([seen, new_rows])

        count -= 1
    return None

%lprun -f append_method append_method(pq, FULL_TABLES, CURR_TABLES, 100)

In [42]:
# Fresh event queue seeded with every distinct trade and quote index time.
pq = PriorityQueue()
load_trade_times_into_priority_queue(trades_full, pq)
load_quote_times_into_priority_queue(quotes_full, pq)

# Independent copies of the complete tables, keyed by table name.
FULL_TABLES = {
    name: frame.copy()
    for name, frame in (("trades", trades_full), ("quotes", quotes_full))
}

# Frames built by reindexing an empty DataFrame to each full table's index
# and columns; the replace benchmark assigns rows into these.
CURR_TABLES = {
    name: pd.DataFrame().reindex_like(frame)
    for name, frame in (("trades", trades_full), ("quotes", quotes_full))
}

def replace_method(pq, FULL_TABLES, CURR_TABLES, count=30):
    """Replay up to ``count`` queued events by assigning rows into the current tables.

    Parameters
    ----------
    pq : queue.PriorityQueue
        Queue of ``(time, event)`` tuples; entries are popped in priority order.
    FULL_TABLES : dict
        Maps a table name to the complete DataFrame, looked up by event time
        through ``.loc``.
    CURR_TABLES : dict
        Maps a table name to the DataFrame that receives the replayed rows;
        these frames are modified in place via ``.loc[curr_time] = ...``.
    count : int, default 30
        Maximum number of events to pop; every popped event counts, whether or
        not it is a data event.

    Returns
    -------
    None
    """
    while not pq.empty() and count > 0:
        curr_time, event = pq.get()

        if event["type"] == "data":
            table = event["table"]
            new_data = FULL_TABLES[table].loc[curr_time]
            CURR_TABLES[table].loc[curr_time] = new_data
        # No dropna() call here: its result was never stored, so it only cost
        # a full-table copy per event, and because it sat outside the branch
        # above it failed on an unbound ``table`` for a leading non-data event.

        count -= 1
    return None

%lprun -f replace_method replace_method(pq, FULL_TABLES, CURR_TABLES, 100)