In [41]:
import gzip
import json

def messages_parse(data):
    data_with_no_binary = str(data).split("'")[1:-1][0]
    data_list =[i for i in data_with_no_binary.split('\\n') if i != '']
    messages_data = [json.loads(D) for D in data_list]
    return messages_data 

def messages_read(file="../data/coinbase_BTC-USD_20_10_06_000000-010000.json.gz"):
    file_object = gzip.open(file, "r")
    data = file_object.read()
    messages_data = messages_parse(data)
    return messages_data 

def snapshot_read(file="../data/coinbase_BTC-USD_20_10_06_00_00.json"):
    snaphsot_data = json.loads(open(file,'r').read())
    return snaphsot_data

def messages_filter(messages_data,initial_clob,final_clob):
    messages_data_filtered = [message_dict for message_dict in messages_data if message_dict['sequence'] >= initial_clob['sequence'] and message_dict['sequence'] <= final_clob['sequence']]
    return messages_data_filtered
            
messages_data = messages_read()
initial_clob = snapshot_read(file="../data/coinbase_BTC-USD_20_10_06_00_00.json")
final_clob = snapshot_read(file="../data/coinbase_BTC-USD_20_10_06_00_15.json")
messages_data_filtered = messages_filter(messages_data,initial_clob,final_clob)



lists are quite slow for this purpose because inserting or deleting an element at the beginning requires shifting all of the other elements by one, requiring O(n) time.

“First in First Out”

Sell = Asks
Buy = Bids

Prior Values: 
initial_asks: %s 22670
final_asks: %s 12677
initial_bids: %s 40436
final_bids: %s 28413
done_msgs: %s 84084
open_msgs: %s 83274

MATCH

Taker Order is removed off the Order Book
Maker Order is kept with a down or an up

A trade occurred between two orders.
The aggressor or taker order is the one executing immediately after being received and the maker order is a resting order on the book.
The side field indicates the maker order side.
If the side is sell this indicates the maker was a sell order and the match is considered an up-tick.
A buy side match is a down-tick.

In [60]:
import logging
import queue

logging.basicConfig(level=20)
#logging.basicConfig(level=2)

logger = logging.getLogger()

class OrderDictTable:
    def __init__(self, order_items):
        self.order_dict = {} #
        self.assign_orders_dict(order_items)
        
    def assign_orders_dict(self,order_items):
        keys = [subli[2] for subli in order_items]#map(lambda price,size,order_id: order_id,order_items)
        self.order_dict = dict([(order_id,order_item) for (order_id,order_item) in zip(keys,order_items) ])
    
    def get_values(self):
        return self.order_dict.values()

    def get_obj(self):
        return self.order_dict

class CLOB:
    def __init__(self,clob):
        self.bids_obj = OrderDictTable(clob['bids']).get_obj()
        self.asks_obj = OrderDictTable(clob['asks']).get_obj()
        
    def order_remove(self,clob_bids_or_ask_dict,order_id):
        #logger.info('order_id: %s', str(order_id))
        removed_order = clob_bids_or_ask_dict.pop(order_id,None)
        #logger.info('clob_bids_or_ask_dict: %s', str(removed_order))
        
    def order_add(self,clob_bids_or_ask_dict,price,size,order_id):
        clob_bids_or_ask_dict[order_id] = [price,size,order_id]
        
    def update_from_message(self,message_obj):
        #logger.info('message_obj: %s', str(message_obj))
        message_type = message_obj['type']
        message_side = message_obj['side']
        clob_bids_or_ask_dict = self.asks_obj if message_side == 'sell' else self.bids_obj 
        #logger.info('message_side: %s', str(message_side))
        #logger.info('message_type: %s', str(message_type))

        if message_type == 'done':
            self.order_remove(clob_bids_or_ask_dict,message_obj['order_id'])
        elif message_type == 'open':
            self.order_add(clob_bids_or_ask_dict,message_obj['price'],message_obj['remaining_size'],message_obj['order_id'])
        
    def get_clob(self):
        return {
            'bids':self.bids_obj.values(),
            'asks':self.asks_obj.values()
        }
    
class CLOBValidate:
    def validate(self,initial_clob,final_clob,messages_queue_data):
        initial_bids = len(initial_clob['bids'])
        initial_asks = len(initial_clob['asks'])
        
        final_bids = len(final_clob['bids'])
        final_asks = len(final_clob['asks'])
        
        done_msgs = len([message_obj for message_obj in messages_queue_data if message_obj['type'] == 'done'])
        open_msgs = len([message_obj for message_obj in messages_queue_data if message_obj['type'] == 'open'])

        print ('initial_asks: %s', str(initial_asks))
        print ('final_asks: %s', str(final_asks))

        print ('initial_bids: %s', str(initial_bids))
        print ('final_bids: %s', str(final_bids))
        
        print ('done_msgs: %s', str(done_msgs))
        print ('open_msgs: %s', str(open_msgs))
        
        
clob = CLOB(initial_clob)
q = queue.Queue()
messages_queue_data = sorted(messages_data_filtered,key=lambda i: i['sequence'])

list(map(q.put,messages_queue_data ))
while not q.empty():
    message_obj = q.get()
    clob.update_from_message(message_obj)

final_clob_processed = clob.get_clob()
CLOBValidate().validate(final_clob_processed,final_clob,messages_queue_data)

#bids_dict_table = OrderDictTable(initial_clob['bids'])
#asks_dict_table = OrderDictTable(initial_clob['asks'])

initial_asks: %s 12677
final_asks: %s 12677
initial_bids: %s 28413
final_bids: %s 28413
done_msgs: %s 84084
open_msgs: %s 83274


In [58]:
messages_queue_data_sorted = sorted(messages_queue_data,key=lambda i: i['sequence'])
print (messages_queue_data_sorted[:2])

[{'type': 'done', 'side': 'sell', 'product_id': 'BTC-USD', 'time': '2020-10-06T00:00:05.810671Z', 'sequence': 16712804989, 'order_id': 'd98e69bd-aa7c-40ec-8c34-5087c20b86bd', 'reason': 'canceled', 'price': '10863.19', 'remaining_size': '0.04602702'}, {'type': 'received', 'side': 'sell', 'product_id': 'BTC-USD', 'time': '2020-10-06T00:00:05.826905Z', 'sequence': 16712804990, 'order_id': 'a2387f8d-3d86-41ed-b64e-8a610e4982fe', 'order_type': 'limit', 'size': '0.04607584', 'price': '10851.68', 'client_oid': '9deea56f-c4ad-476c-9afe-5c240da7a01c'}]


In [61]:
messages_queue_data_sorted_changed = [i for i in messages_queue_data_sorted if i['type'] == 'match']
print (len(messages_queue_data_sorted_changed))



1077


In [63]:
print (messages_queue_data_sorted_changed[0])

{'type': 'match', 'side': 'sell', 'product_id': 'BTC-USD', 'time': '2020-10-06T00:00:06.373746Z', 'sequence': 16712805059, 'trade_id': 105017229, 'maker_order_id': 'a80cad71-1631-4e5f-964b-5d61eaa67dc0', 'taker_order_id': '1f080093-08f6-44df-86c8-bddf077673e6', 'size': '0.289', 'price': '10798'}


In [62]:
len(list(set([i['order_id'] for i in messages_queue_data_sorted_changed])))

KeyError: 'order_id'

In [56]:
def list_difference(l1,l2):
    l1= list(l1)
    l2=list(l2)
    l1 = [i[2] for i in l1]
    l2 = [i[2] for i in l2]

    return list(set(l1) - set(l2))

#l1 = ['a','b']
#l2 = ['b','c']
#list_difference(l1,l2)
#list(final_clob_processed['asks'])
#list(final_clob['asks'])

asks_difference = list_difference(final_clob_processed['asks'],final_clob['asks'])
print (asks_difference[0])
#final_clob_processed['asks']
#final_clob['asks']


4fae0472-05b0-4c72-a99e-0158185683e4


In [65]:
filter_order_id_message = lambda messages_data,order_id: [D for D in messages_data if 'order_id' in D.keys() and D['order_id'] ==order_id]
filter_order_id_clob = lambda clob,asks_or_bids,order_id: [subli for subli in clob[asks_or_bids] if subli[2] == order_id]



filter_order_id_message(messages_data,'1f080093-08f6-44df-86c8-bddf077673e6')
#maker_order_id

#filter_order_id_message(messages_data,'4fae0472-05b0-4c72-a99e-0158185683e4')

[{'type': 'received',
  'side': 'buy',
  'product_id': 'BTC-USD',
  'time': '2020-10-06T00:00:06.373746Z',
  'sequence': 16712805058,
  'order_id': '1f080093-08f6-44df-86c8-bddf077673e6',
  'order_type': 'limit',
  'size': '0.289',
  'price': '10798',
  'client_oid': ''},
 {'type': 'done',
  'side': 'buy',
  'product_id': 'BTC-USD',
  'time': '2020-10-06T00:00:06.373746Z',
  'sequence': 16712805060,
  'order_id': '1f080093-08f6-44df-86c8-bddf077673e6',
  'reason': 'filled',
  'price': '10798',
  'remaining_size': '0'}]

In [11]:
list(clob.get_clob()['bids'])[0]

['10797.99', '2', 'a41dfd8c-41e0-4864-b848-6d30ccb1faa4']

In [7]:
print (len(bids_dict_table.get_values()))
print (len(initial_clob['bids']))

28370
28370


In [None]:
class Dicttable:
    def __init__(self, bids,asks):
        dict([(order_id,order_data) for (order_id,order_data) in bids ])
        self.bucket_size = len(elements)
        self.buckets = [[] for i in range(self.bucket_size)]
        self._assign_buckets(elements)


In [12]:

# Python program to
# demonstrate queue implementation
# using list
 
# Initializing a queue
queue = []
 
# Adding elements to the queue
queue.append('a')
queue.append('b')
queue.append('c')
 
print("Initial queue")
print(queue)
 
# Removing elements from the queue
print("\nElements dequeued from queue")
print(queue.pop(0))
print(queue.pop(0))
print(queue.pop(0))
 
print("\nQueue after removing elements")
print(queue)
 
# Uncommenting print(queue.pop(0))
# will raise and IndexError
# as the queue is now empty

Initial queue
['a', 'b', 'c']

Elements dequeued from queue
a
b
c

Queue after removing elements
[]


In [None]:
import queue
q = queue.Queue()
list(map(q.put, messages_data_filtered))


In [15]:
q.get()

{'type': 'done',
 'side': 'sell',
 'product_id': 'BTC-USD',
 'time': '2020-10-06T00:00:05.845766Z',
 'sequence': 16712804995,
 'order_id': 'ab1bbc24-25da-4034-baad-fd36d1a047a2',
 'reason': 'canceled',
 'price': '10845.82',
 'remaining_size': '0.09220147'}

In [16]:
with q.mutex:
    q.queue.clear()

In [17]:
q.get()

KeyboardInterrupt: 

In [None]:
messages_data_filtered[0]