In [157]:
import json
import numpy as np
import pandas as pd
import datetime as dt
from pathlib import Path
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
from scipy.stats import pearsonr
from order_book import OrderBook
from decimal import Decimal
from probs import MidpriceUp, BidOrderExecution, MakingSpread, StopLossReaching

In [75]:
# Root folder holding the raw websocket dumps (relative to this notebook).
data_path = Path('..') / 'data'

### How much data do we have

In [8]:
# One row per raw websocket dump: exchange prefix of the file name and size in MiB.
files = pd.DataFrame(
    [
        (dump.name.split('.')[0], dump.stat().st_size / 1024**2)
        for dump in data_path.glob('*ws*')
    ],
    columns=['exchange', 'size'],
)

In [9]:
# Total dump size per exchange: sizes are in MiB, so dividing the sum by 1024 gives GiB.
files.groupby('exchange')[['size']].sum().div(1024)

Unnamed: 0_level_0,size
exchange,Unnamed: 1_level_1
BINANCE,45.642381
BYBIT,140.271621
HUOBI,54.971219


### What the data looks like

In [76]:
# Raw dump files per exchange. These are materialized as sorted lists because
# Path.glob returns a one-shot generator that is empty after the first pass.
binance = sorted(data_path.glob('BINANCE*'))
bybit = sorted(data_path.glob('BYBIT*'))
huobi = sorted(data_path.glob('HUOBI*'))  # was 'HUOOBI*', which matched nothing

In [100]:
from tqdm import tqdm
from src.parsers.bybit import BybitUpdateMessage
from src.parsers.binance import BinanceUpdateMessage, Trade as BinanceTrade, Depth as BinanceDepth


def is_trade(msg):
    """Return True when ``msg.payload`` is a Binance ``Trade`` (aggTrade events in this data)."""
    payload = msg.payload
    return isinstance(payload, BinanceTrade)

def is_depth(msg):
    """Return True when ``msg.payload`` is a Binance ``Depth`` (depthUpdate events in this data)."""
    payload = msg.payload
    return isinstance(payload, BinanceDepth)
    
    
class Parser:
    """Line-by-line parser for raw websocket dump files.

    Each line of a dump is one raw message. Two kinds of line are skipped:
    lines starting with ``wss`` (presumably the stream URL the recorder wrote)
    and blank lines.
    """

    def __init__(self, source):
        """
        Args:
            source: exchange name, ``'binance'`` or ``'bybit'`` (case-insensitive).

        Raises:
            ValueError: for any other exchange. Previously such sources were
                silently parsed with the Bybit parser.
        """
        normalized = source.lower()
        if normalized == 'binance':
            self.parser = BinanceUpdateMessage
        elif normalized == 'bybit':
            self.parser = BybitUpdateMessage
        else:
            raise ValueError(f"unsupported source {source!r}; expected 'binance' or 'bybit'")

    def parse(self, message):
        """Parse one raw line; return None for header (``wss...``) or blank lines."""
        if not message.strip() or message.startswith('wss'):
            return None

        return self.parser(raw=message)

    def read(self, path):
        """Parse every message in the dump at ``path``, dropping skipped lines.

        Returns:
            list of parsed messages, in file order.
        """
        with open(path, 'r', encoding='utf-8') as f:
            lines = f.readlines()  # materialized so tqdm can show a total

        results = []
        for line in tqdm(lines):
            parsed = self.parse(line)
            if parsed is not None:
                results.append(parsed)

        return results



#### Binance

In [95]:
p = Parser('binance')


def ws_dump_order(path):
    """Sort key for rotated dumps named like ``BINANCE.ws.3.0``.

    Returns every dot-separated part after ``ws`` as a tuple of ints. As a result,
    ``ws.3.1`` sorts before ``ws.4.0``, and ``ws.3.10`` sorts after ``ws.3.9``.
    NOTE(review): assumes these numbers encode chronological order — confirm.
    """
    return tuple(int(part) for part in path.name.split('.')[2:])


binance = sorted(data_path.glob('BINANCE.ws*'), key=ws_dump_order)
if not binance:
    raise FileNotFoundError(f'no BINANCE.ws* dumps found in {data_path}')

# Only the first dump for now; parsing one takes roughly 40 s.
# TODO: loop over all dumps and save trades/depth to processed/<name>.feather.
path = binance[0]
msgs = p.read(path)

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 260000/260000 [00:38<00:00, 6774.11it/s]


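We need:
1. Level indexes (position of the price level in the book) and sizes of ask/bid limit-order arrivals, i.e. size added at a level
2. Sizes of market-order arrivals (trades)
3. Level indexes and sizes of cancels, i.e. size removed from a level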

In [None]:
def _level_deltas(levels, book_side):
    """Yield ``(price, size_change)`` for each level of one side of a depth update.

    Args:
        levels: ``(price, size)`` pairs, e.g. ``msg.payload.data.asks``.
        book_side: current book side indexed by price, e.g. ``ob.asks``. A price
            that raises ``KeyError`` counts as zero resting size.
    """
    for price, size in levels:
        try:
            existing = book_side[price]
        except KeyError:
            existing = 0
        yield price, size - existing


def is_ask_arrive(msg, ob):
    """True if the depth update adds size at any ask price level of ``ob``."""
    return any(delta > 0 for _, delta in _level_deltas(msg.payload.data.asks, ob.asks))


def is_bid_arrive(msg, ob):
    """True if the depth update adds size at any bid price level of ``ob``."""
    return any(delta > 0 for _, delta in _level_deltas(msg.payload.data.bids, ob.bids))


def is_ask_cancel(msg, ob):
    """True if the depth update removes size at any ask price level of ``ob``.

    NOTE(review): a decrease may also come from a trade execution, not a cancel.
    """
    return any(delta < 0 for _, delta in _level_deltas(msg.payload.data.asks, ob.asks))


def is_bid_cancel(msg, ob):
    """True if the depth update removes size at any bid price level of ``ob``.

    NOTE(review): a decrease may also come from a trade execution, not a cancel.
    """
    return any(delta < 0 for _, delta in _level_deltas(msg.payload.data.bids, ob.bids))

In [197]:
# Second parsed message, which is the first depthUpdate (msgs[0] is an aggTrade).
s = msgs[1]

In [226]:
# NOTE(review): stale output. This cell (In [226]) ran before new_sizes was
# reassigned in In [227], which yields a list of [price, size] pairs, not the
# dict shown here.
new_sizes

{Decimal('1863.76000000'): Decimal('1.27200000'),
 Decimal('1863.63000000'): Decimal('14.32540000'),
 Decimal('1863.59000000'): Decimal('0.48340000'),
 Decimal('1677.60000000'): Decimal('0E-8')}

In [227]:
# Ask levels of that update: a list of [price, size] pairs (Decimal).
# NOTE(review): identical to s.payload.data.bids in the parsed output — check the parser.
new_sizes = s.payload.data.asks

In [229]:
# Classify each ask level of the update in `new_sizes` against the current book.
# Growth is an arrival and shrinkage is a cancel. Each is recorded as
# (price, size change, level), where level is the price's position in
# ob.asks.keys(). The book is updated as we go so it keeps tracking the feed.
# NOTE(review): shrinkage can also be a trade execution, not a cancel.
arrivals = []
cancels = []

for price, size in new_sizes:
    try:
        existing = ob.asks[price]
    except KeyError:
        existing = Decimal(0)  # new price level

    delta = size - existing
    if delta == 0:
        continue  # unchanged level, or removal of a level we never held

    if delta > 0:
        ob.asks[price] = size  # insert/grow first so the level has a position
        arrivals.append((price, delta, list(ob.asks.keys()).index(price)))
    else:
        cancels.append((price, -delta, list(ob.asks.keys()).index(price)))
        if size == 0:
            del ob.asks[price]  # Binance diff-depth: zero quantity removes the level
        else:
            ob.asks[price] = size
        

In [223]:
# Price -> position of that level in ob.asks iteration order.
distance_map = {price: position for position, price in enumerate(ob.asks.keys())}

In [236]:
from enum import Enum

# Order/trade side as spelled by the feed ('Buy'/'Sell').
# NONE has value None, so Side(None) resolves when a record carries no side.
Side = Enum('Side', [('BUY', 'Buy'), ('SELL', 'Sell'), ('NONE', None)])



In [237]:
# Check that the NONE member is found by its value, None.
Side(None)

<Side.NONE: None>

In [235]:
# (price, size) at position 0 of the bid side; here that is the highest bid price.
ob.bids.index(0)

(Decimal('1863.76000000'), Decimal('1.27200000'))

In [133]:
ob = OrderBook()

# Process one instrument at a time. The dump interleaves several symbols
# (ETHUSDT, BTCUSDT, EOSUSDT, ... — see `snaps` below), and mixing them
# would corrupt both the book and the trade sizes.
SYMBOL = 'ETHUSDT'

ask_arrivals = []  # (size added, level) for every ask increase
bid_arrivals = []  # (size added, level) for every bid increase
ask_cancels = []   # (size removed, level) for every ask decrease
bid_cancels = []   # (size removed, level) for every bid decrease
market = []        # quantity of every SYMBOL trade message


def _apply_depth_side(levels, book_side, arrivals, cancels):
    """Apply one side of a depth update to the book and record size changes.

    Args:
        levels: ``(price, size)`` pairs from the update (``data.asks``/``data.bids``).
        book_side: ``ob.asks`` or ``ob.bids``; mutated in place.
        arrivals: receives ``(size_added, level)`` for each increase.
        cancels: receives ``(size_removed, level)`` for each decrease.

    ``level`` is the price's position in ``book_side.keys()``, taken after an
    insert and before a removal. Presumably 0 is the best quote — confirm the
    key order of each side. A size of 0 removes the level (Binance diff-depth
    semantics). Computing the level is O(book depth) per changed level, which
    is fine for exploration.
    NOTE(review): a decrease may also be a trade execution, not a cancel.
    """
    for price, size in levels:
        try:
            existing = book_side[price]
        except KeyError:
            existing = Decimal(0)

        delta = size - existing
        if delta == 0:
            continue  # unchanged level, or removal of a level we never held

        if delta > 0:
            book_side[price] = size
            arrivals.append((delta, list(book_side.keys()).index(price)))
        else:
            cancels.append((-delta, list(book_side.keys()).index(price)))
            if size == 0:
                del book_side[price]
            else:
                book_side[price] = size


# NOTE(review): the book starts empty (no REST snapshot), so early updates all
# register as arrivals. Also, depthUpdate bids and asks are identical in the
# parsed messages — likely a parser bug; fix it before trusting the ask side.
for msg in tqdm(msgs):
    if is_trade(msg):
        if msg.payload.data.symbol == SYMBOL:
            market.append(msg.payload.data.quantity)
    elif is_depth(msg):
        data = msg.payload.data
        if data.symbol == SYMBOL:
            _apply_depth_side(data.asks, ob.asks, ask_arrivals, ask_cancels)
            _apply_depth_side(data.bids, ob.bids, bid_arrivals, bid_cancels)
        
        
        


  0%|                                                                                                                                                    | 1/259999 [00:00<00:41, 6241.52it/s]


In [147]:
# Seed the ask side from the depth update held in `msg`.
# NOTE(review): `msg` is presumably the leftover loop variable from the msgs loop
# (hidden state). Zero-size levels are stored here instead of being removed.
# An assignment displays nothing, so the `False` output below is stale.
ob.asks = dict(msg.payload.data.asks)

False

In [188]:
# Ask side as a plain dict; note the zero-size level stored at 1677.60.
ob.asks.to_dict()

{Decimal('1677.60000000'): Decimal('0E-8'),
 Decimal('1863.59000000'): Decimal('0.48340000'),
 Decimal('1863.63000000'): Decimal('14.32540000'),
 Decimal('1863.76000000'): Decimal('1.27200000')}

In [150]:
# Seed the bid side the same way. This ran after the ob.to_dict() check (In [149]),
# which is why that output shows an empty bid side.
ob.bids = dict(msg.payload.data.bids)

In [161]:
# First message: an ETHUSDT aggTrade event.
msgs[0].payload.data.dict()

{'event_type': 'aggTrade',
 'event_time': datetime.datetime(2023, 4, 6, 13, 34, 34, 489000, tzinfo=datetime.timezone.utc),
 'symbol': 'ETHUSDT',
 'agg_trade_id': 880496927,
 'price': 1864.1,
 'quantity': 0.0057,
 'first_trade_id': 1123675678,
 'last_trade_id': 1123675678,
 'trade_time': datetime.datetime(2023, 4, 6, 13, 34, 34, 489000, tzinfo=datetime.timezone.utc),
 'is_market_maker': False,
 'is_ignore': True}

In [170]:
# First depthUpdate event.
# NOTE(review): 'bids' and 'asks' below are identical. Read as asks, they would
# cross the bids (1863.59 < 1863.76), so the Binance parser likely copies one
# side into the other. Confirm in src.parsers.binance.
msgs[1].payload.data.dict()

{'event_type': 'depthUpdate',
 'event_time': datetime.datetime(2023, 4, 6, 13, 34, 34, 504000, tzinfo=datetime.timezone.utc),
 'symbol': 'ETHUSDT',
 'first_update_id': 24470108730,
 'last_update_id': 24470108745,
 'bids': [[Decimal('1863.76000000'), Decimal('1.27200000')],
  [Decimal('1863.63000000'), Decimal('14.32540000')],
  [Decimal('1863.59000000'), Decimal('0.48340000')],
  [Decimal('1677.60000000'), Decimal('0E-8')]],
 'asks': [[Decimal('1863.76000000'), Decimal('1.27200000')],
  [Decimal('1863.63000000'), Decimal('14.32540000')],
  [Decimal('1863.59000000'), Decimal('0.48340000')],
  [Decimal('1677.60000000'), Decimal('0E-8')]]}

In [174]:
# Next ETHUSDT aggTrade (same price, 1864.1).
msgs[5].payload.data.dict()

{'event_type': 'aggTrade',
 'event_time': datetime.datetime(2023, 4, 6, 13, 34, 34, 584000, tzinfo=datetime.timezone.utc),
 'symbol': 'ETHUSDT',
 'agg_trade_id': 880496928,
 'price': 1864.1,
 'quantity': 0.0054,
 'first_trade_id': 1123675679,
 'last_trade_id': 1123675679,
 'trade_time': datetime.datetime(2023, 4, 6, 13, 34, 34, 584000, tzinfo=datetime.timezone.utc),
 'is_market_maker': False,
 'is_ignore': True}

In [182]:
# Another ETHUSDT depthUpdate; bids and asks are again identical (see NOTE on msgs[1]).
msgs[6].payload.data.dict()

{'event_type': 'depthUpdate',
 'event_time': datetime.datetime(2023, 4, 6, 13, 34, 34, 604000, tzinfo=datetime.timezone.utc),
 'symbol': 'ETHUSDT',
 'first_update_id': 24470108746,
 'last_update_id': 24470108756,
 'bids': [[Decimal('1863.77000000'), Decimal('0E-8')],
  [Decimal('1863.27000000'), Decimal('5.22210000')],
  [Decimal('1804.09000000'), Decimal('0.05600000')],
  [Decimal('1677.60000000'), Decimal('0E-8')]],
 'asks': [[Decimal('1863.77000000'), Decimal('0E-8')],
  [Decimal('1863.27000000'), Decimal('5.22210000')],
  [Decimal('1804.09000000'), Decimal('0.05600000')],
  [Decimal('1677.60000000'), Decimal('0E-8')]]}

In [181]:
# Another ETHUSDT depthUpdate; bids and asks are again identical.
msgs[13].payload.data.dict()

{'event_type': 'depthUpdate',
 'event_time': datetime.datetime(2023, 4, 6, 13, 34, 34, 704000, tzinfo=datetime.timezone.utc),
 'symbol': 'ETHUSDT',
 'first_update_id': 24470108757,
 'last_update_id': 24470108774,
 'bids': [[Decimal('1864.09000000'), Decimal('62.20950000')],
  [Decimal('1864.07000000'), Decimal('0.10870000')],
  [Decimal('1863.86000000'), Decimal('0.27920000')],
  [Decimal('1863.85000000'), Decimal('0.24410000')],
  [Decimal('1863.56000000'), Decimal('12.88890000')],
  [Decimal('1863.11000000'), Decimal('1.20070000')]],
 'asks': [[Decimal('1864.09000000'), Decimal('62.20950000')],
  [Decimal('1864.07000000'), Decimal('0.10870000')],
  [Decimal('1863.86000000'), Decimal('0.27920000')],
  [Decimal('1863.85000000'), Decimal('0.24410000')],
  [Decimal('1863.56000000'), Decimal('12.88890000')],
  [Decimal('1863.11000000'), Decimal('1.20070000')]]}

In [156]:
# The zero-size level seeded from the update is still present in the book.
ob.asks[Decimal('1677.60000000')]

Decimal('0E-8')

In [149]:
# Whole book. The bid side is empty because ob.bids is only assigned later (In [150]).
ob.to_dict()

{'bid': {},
 'ask': {Decimal('1677.60000000'): Decimal('0E-8'),
  Decimal('1863.59000000'): Decimal('0.48340000'),
  Decimal('1863.63000000'): Decimal('14.32540000'),
  Decimal('1863.76000000'): Decimal('1.27200000')}}

In [138]:
# Bid levels of the leftover `msg`; they match msgs[1]'s bids.
msg.payload.data.bids

[[Decimal('1863.76000000'), Decimal('1.27200000')],
 [Decimal('1863.63000000'), Decimal('14.32540000')],
 [Decimal('1863.59000000'), Decimal('0.48340000')],
 [Decimal('1677.60000000'), Decimal('0E-8')]]

In [132]:
# First few trade quantities collected by the loop.
# NOTE(review): these likely span several symbols, since the dump interleaves them.
market[:5]

[0.0057, 0.01783, 0.00907, 0.0054, 0.00141]

In [129]:
# Quantity of the trade held in `msg` at that point (equals market[0]).
msg.payload.data.quantity

0.0057

In [120]:
# Raw payload object for the same message.
msg.payload

Trade(stream='ethusdt@aggTrade', data=TradeData(event_type='aggTrade', event_time=datetime.datetime(2023, 4, 6, 13, 34, 34, 489000, tzinfo=datetime.timezone.utc), symbol='ETHUSDT', agg_trade_id=880496927, price=1864.1, quantity=0.0057, first_trade_id=1123675678, last_trade_id=1123675678, trade_time=datetime.datetime(2023, 4, 6, 13, 34, 34, 489000, tzinfo=datetime.timezone.utc), is_market_maker=False, is_ignore=True))

In [106]:
# List the working directory (scratch check).
!ls -lh

total 29M
-rw-rw-r-- 1 danya danya 342K May  6 13:04 al.png
-rw-rw-r-- 1 danya danya 454K Apr  6 22:48 data-observation.ipynb
-rw-rw-r-- 1 danya danya  42K May  6 21:38 parsing-data-new.ipynb
-rw-rw-r-- 1 danya danya 826K May  6 19:29 parsing-raw.ipynb
-rw-rw-r-- 1 danya danya  17K Apr  7 02:25 probs.py
drwxrwxr-x 2 danya danya 4.0K May  6 20:30 processed
drwxrwxr-x 2 danya danya 4.0K May  6 20:17 __pycache__
-rw-rw-r-- 1 danya danya  27M May  6 21:37 test.feather


In [107]:
# Depth updates saved to test.feather earlier. No cell in this notebook writes it —
# TODO: document where it comes from.
snaps = pd.read_feather('test.feather')

In [110]:
# Bid levels of the first stored update, as an object array of [price, size] arrays.
snaps['bids'][0]

array([array([Decimal('1863.76000000'), Decimal('1.27200000')], dtype=object),
       array([Decimal('1863.63000000'), Decimal('14.32540000')], dtype=object),
       array([Decimal('1863.59000000'), Decimal('0.48340000')], dtype=object),
       array([Decimal('1677.60000000'), Decimal('0E-8')], dtype=object)],
      dtype=object)

In [112]:
# NOTE(review): stored rows mix ETHUSDT, BTCUSDT, EOSUSDT and LINKUSDT updates,
# so any per-book analysis must filter by symbol first.
snaps

Unnamed: 0,event_type,event_time,symbol,first_update_id,last_update_id,bids,asks
0,depthUpdate,2023-04-06 13:34:34.504000+00:00,ETHUSDT,24470108730,24470108745,"[[1863.76000000, 1.27200000], [1863.63000000, ...","[[1863.76000000, 1.27200000], [1863.63000000, ..."
1,depthUpdate,2023-04-06 13:34:34.504000+00:00,BTCUSDT,36116025446,36116025473,"[[27891.64000000, 5.63617000], [27891.58000000...","[[27891.64000000, 5.63617000], [27891.58000000..."
2,depthUpdate,2023-04-06 13:34:34.604000+00:00,ETHUSDT,24470108746,24470108756,"[[1863.77000000, 0E-8], [1863.27000000, 5.2221...","[[1863.77000000, 0E-8], [1863.27000000, 5.2221..."
3,depthUpdate,2023-04-06 13:34:34.604000+00:00,BTCUSDT,36116025474,36116025493,"[[27891.64000000, 5.61834000], [27891.60000000...","[[27891.64000000, 5.61834000], [27891.60000000..."
4,depthUpdate,2023-04-06 13:34:34.604000+00:00,EOSUSDT,6249308425,6249308425,"[[1.21700000, 49100.00000000]]","[[1.21700000, 49100.00000000]]"
...,...,...,...,...,...,...,...
142580,depthUpdate,2023-04-06 14:40:10.720000+00:00,ETHUSDT,24471440885,24471440903,"[[1865.49000000, 86.98320000], [1864.71000000,...","[[1865.49000000, 86.98320000], [1864.71000000,..."
142581,depthUpdate,2023-04-06 14:40:10.820000+00:00,ETHUSDT,24471440904,24471440923,"[[1865.49000000, 77.02210000], [1864.47000000,...","[[1865.49000000, 77.02210000], [1864.47000000,..."
142582,depthUpdate,2023-04-06 14:40:10.820000+00:00,BTCUSDT,36117323489,36117323504,"[[27929.46000000, 4.44046000], [27927.42000000...","[[27929.46000000, 4.44046000], [27927.42000000..."
142583,depthUpdate,2023-04-06 14:40:10.820000+00:00,LINKUSDT,7606632804,7606632804,[],[]


In [93]:
# Output name for this dump's trades. Parser has no `name` attribute, so the
# name is taken from the dump path read above.
path.name + '_trades'

'BINANCE.ws.3.0_trades'

In [61]:
# Output folder for processed data; exist_ok keeps re-runs from failing.
Path('processed').mkdir(exist_ok=True)