In [1]:
%matplotlib inline

import json
import pathlib
import sqlite3
import datetime
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import dask
import dask.bag as db

from pandas import DataFrame, Series
from typing import Dict, List, Tuple

# Path containing snapshots
DATA_PATH = pathlib.Path('../data/luno/')

## Limit Orderbook

To understand what an order book is you first need to have a basic understaning of orders. 

There are two types of orders:
- limit order
    - Persist until removed
- market order
    - Execute immediately against any available liquidity

A central limit order book is a central database of limit orders. It contains a list of buy orders sell orders. Exchanges have various ways to prioritize execution but most prioritize based on price and then time for tie breaking. Under price-time-priority the list of buy orders is sorted `desc` by price while the list of sell orders is sorted `asc` by price. When two orders on the same side have the same price, the order there first will receive priorty.

Markets are notorious for generating large data sets. For this talk we will investigate some market data obtained from the luno [streaming api](https://www.luno.com/en/api#streaming).

The data from the streaming API was used to generate the orderbook at each event using the price alone (The API does not provide a sequence number or timestamp, all timestamps within the data were generated upon recieving an event).

### The Data

The data were saved into separate sqlite databases based on the type of event. Given this there is a database for orders and trades as well as a single database which contains the snapshots of the orderbook everytime an event occurred. To be more specific the orderbook is generated each time:

- A new order is entered
- An existing order is deleted
- A transaction occurs

Given that the luno orderbook has depth and that most of the activity will be close to best price, each snapshot recorded only includes the orders up to 5% away from the best bid and ask price.

In [1]:
%ls ../data/luno/ -l  --block-size=M | head -n 5

total 497M
-rw-r--r-- 1 bradleyk bradleyk 1M Jul 27 18:17 snapshot-1532276774.299144.json
-rw-r--r-- 1 bradleyk bradleyk 1M Jul 27 18:17 snapshot-1532276776.11659.json
-rw-r--r-- 1 bradleyk bradleyk 1M Jul 27 18:17 snapshot-1532276776.373577.json
-rw-r--r-- 1 bradleyk bradleyk 1M Jul 27 18:17 snapshot-1532276777.798901.json
ls: write error: Broken pipe


## Exploration

Before jumping into dask let's first look at the data to get a better understanding of it.

In [4]:
snapshot_path = next(DATA_PATH.glob('*.json'))
        
with open(snapshot_path) as f:
    raw_snapshot = f.read()
    
raw_snapshot[:1000]

'{"bids": [{"order_id": "BXECYTEKYUHF2PW", "action": "create", "side": "bid", "price": "104299.00", "volume": "0.022506", "timestamp": 1532285495.707869}, {"order_id": "BXCNKHW4FZRJYD2", "action": "create", "side": "bid", "price": "104299.00", "volume": "0.002", "timestamp": 1532286078.979793}, {"order_id": "BXCA6UPG5EUDRQ2", "action": "create", "side": "bid", "price": "104299.00", "volume": "0.50", "timestamp": 1532286500.659069}, {"order_id": "BXCKFAPXCH9FS", "action": "create", "side": "bid", "price": "104299.00", "volume": "1.5732", "timestamp": 1532286607.122184}, {"order_id": "BXEDSYHK2G7H96R", "action": "create", "side": "bid", "price": "104299.00", "volume": "0.0005", "timestamp": 1532286945.417388}, {"order_id": "BXGYUZEF8M23BX3", "action": "create", "side": "bid", "price": "104299.00", "volume": "0.0015", "timestamp": 1532286954.199881}, {"order_id": "BXB84ZPQWM823JS", "action": "create", "side": "bid", "price": "104299.00", "volume": "3.75", "timestamp": 1532287048.34976}, {

In [5]:
json.loads(raw_snapshot).keys()

dict_keys(['bids', 'asks', 'timestamp'])

## Build the orderbook

In [6]:
orderbook_dict = json.loads(raw_snapshot)

bids_frame = DataFrame(orderbook_dict['bids']).apply(Series)
asks_frame = DataFrame(orderbook_dict['asks']).apply(Series)

bids_frame.price = bids_frame.price.astype('float64')
asks_frame.price = asks_frame.price.astype('float64')
bids_frame.volume = bids_frame.volume.astype('float64')
asks_frame.volume = asks_frame.volume.astype('float64')

bids_frame.columns = bids_frame.columns.map(lambda x: f"bid_{x}")
asks_frame.columns = asks_frame.columns.map(lambda x: f"ask_{x}")

snapshot_example = pd.concat(
    [
        bids_frame,
        asks_frame
    ],
    axis=1
)

snapshot_example.head(10)

Unnamed: 0,bid_action,bid_order_id,bid_price,bid_side,bid_timestamp,bid_volume,ask_action,ask_order_id,ask_price,ask_side,ask_timestamp,ask_volume
0,create,BXECYTEKYUHF2PW,104299.0,bid,1532285000.0,0.022506,create,BXEQM7GZ96SJ68N,104300.0,ask,1532287000.0,0.87248
1,create,BXCNKHW4FZRJYD2,104299.0,bid,1532286000.0,0.002,create,BXJBVCF3R72TKFD,104300.0,ask,1532286000.0,0.0563
2,create,BXCA6UPG5EUDRQ2,104299.0,bid,1532287000.0,0.5,create,BXBM2K59G36GWEH,104300.0,ask,1532286000.0,0.001
3,create,BXCKFAPXCH9FS,104299.0,bid,1532287000.0,1.5732,create,BXEW5F8WYSWNWWA,104300.0,ask,1532286000.0,0.0005
4,create,BXEDSYHK2G7H96R,104299.0,bid,1532287000.0,0.0005,create,BXC9JBBPP2KSGBZ,104300.0,ask,1532285000.0,1.901294
5,create,BXGYUZEF8M23BX3,104299.0,bid,1532287000.0,0.0015,create,BXD29RF3UPH76TY,104300.0,ask,1532285000.0,0.021694
6,create,BXB84ZPQWM823JS,104299.0,bid,1532287000.0,3.75,create,BXGVNTBQWK86QH6,104300.0,ask,1532285000.0,0.000998
7,create,BXKCVZ8UNBRQFJ5,104298.0,bid,1532286000.0,0.0015,create,BXCXHX6P6SENWBK,104300.0,ask,1532285000.0,0.002608
8,create,BXDSW66BG65HJBJ,104215.0,bid,1532287000.0,0.000586,create,BXF2KM26TUKFGDE,104300.0,ask,1532285000.0,0.019578
9,create,BXKDJG6297KTMJ9,104214.0,bid,1532286000.0,0.006048,create,BXMC9F4RW9VZF38,104300.0,ask,1532285000.0,0.02344


In [7]:
snapshot_example.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 449 entries, 0 to 448
Data columns (total 12 columns):
bid_action       331 non-null object
bid_order_id     331 non-null object
bid_price        331 non-null float64
bid_side         331 non-null object
bid_timestamp    331 non-null float64
bid_volume       331 non-null float64
ask_action       449 non-null object
ask_order_id     449 non-null object
ask_price        449 non-null float64
ask_side         449 non-null object
ask_timestamp    449 non-null float64
ask_volume       449 non-null float64
dtypes: float64(6), object(6)
memory usage: 42.2+ KB


### Descriptive Statistics

In [8]:
descriptive_statistics_bid = bids_frame[['bid_price', 'bid_volume']].describe()
descriptive_statistics_ask = asks_frame[['ask_price', 'ask_volume']].describe()

descriptive_statistics = (
    pd.concat([descriptive_statistics_bid, descriptive_statistics_ask], axis=1)
        .style
        .format({
            'bid_price': '{:,.0f}'.format,
            'bid_volume': '{:,.2f}'.format,
            'ask_price': '{:,.0f}'.format,
            'ask_volume': '{:,.2f}'.format
        })
)

descriptive_statistics

Unnamed: 0,bid_price,bid_volume,ask_price,ask_volume
count,331,331.0,449,449.0
mean,101422,0.17,106041,0.23
std,1412,0.47,1539,1.04
min,99102,0.0,104300,0.0
25%,100008,0.0,104999,0.0
50%,101197,0.01,105099,0.01
75%,102557,0.1,107000,0.1
max,104299,3.92,109502,11.73


# Extract Load Transform

The section below is concerned with summarizing each snapshot.

Specifically the following processes are applied to each snapshot:

1. Read the JSON file from the file system
2. Parse the JSON file
3. Create a pandas dataframe
4. Calculate the average price, sum of volume and total orders at 1 percent increments from the best price for both the bids and offers

In [26]:
def parse_json(snapshot_raw: str) -> Dict:
    """Parse a raw snapshot into a Python Dict
    
    Args:
        snapshot_raw: A json orderbook snapshot
    
    Returns:
        A Python dict
    """
    
    return json.loads(snapshot_raw)


def get_snapshot_bids_frame(snapshot: Dict) -> DataFrame:
    """Extracts the bids from the given snapshot and applies
    some type casting.

    Args:
        snapshot: A Python dict
    
    Returns:
        A pandas dataframe
    """
    
    frame = DataFrame(snapshot['bids'])
    frame.price = frame.price.astype('float64')
    frame.volume = frame.volume.astype('float64')
    frame.columns = frame.columns.map(lambda x: f"bid_{x}")
    return frame


def get_snapshot_asks_frame(snapshot: Dict) -> DataFrame:
    """Extracts the bids from the given snapshot and applies
    some type casting.

    Args:
        snapshot: A Python dict
    
    Returns:
        A pandas dataframe
    """
    
    frame = DataFrame(snapshot['asks'])
    frame.price = frame.price.astype('float64')
    frame.volume = frame.volume.astype('float64')
    frame.columns = frame.columns.map(lambda x: f"ask_{x}")
    return frame


def discretize_bids_frame(frame: DataFrame) -> DataFrame:
    """Applies to the specified aggregation to the bids frame
    grouped by the specified bins.

    Args:
        frame: pandas dataframe containing bids

    Returns:
        A pandas dataframe
    """
    
    bins = [0.0, 0.01, 0.02, 0.03, 0.04, 0.05]
    labels = [1, 2, 3, 4, 5]
    aggregate = {'bid_volume': np.sum, 'bid_price': np.mean, 'bid_order_id': 'count'}
    
    pct_from_best = (frame.bid_price.values[0] / frame.bid_price) - 1
    cut = pd.cut(pct_from_best, bins=bins, include_lowest=True, labels=labels)
    group = frame.groupby(cut)
    summary = group.aggregate(aggregate)
    return summary.rename(columns={'bid_order_id': 'bid_count'})


def discretize_asks_frame(frame: DataFrame) -> DataFrame:
    """Applies to the specified aggregation to the bids frame
    grouped by the specified bins.

    Args:
        frame: pandas dataframe containing asks
        bins: percentage bins for discretization
        labels: Bin labels
        aggregate: The fields and aggregates to apply to the grouped data
    
    Returns:
        A pandas dataframe
    """

    bins = [0.0, 0.01, 0.02, 0.03, 0.04, 0.05]
    labels = [1, 2, 3, 4, 5]
    aggregate = {'ask_volume': np.sum, 'ask_price': np.mean, 'ask_order_id': 'count'}
    
    pct_from_best = (frame.ask_price / frame.ask_price.values[0]) - 1
    cut = pd.cut(pct_from_best, bins=bins, include_lowest=True, labels=labels)
    group = frame.groupby(cut)
    summary = group.aggregate(aggregate)
    return summary.rename(columns={'ask_order_id': 'ask_count'})


def concat_result(
    bids_frame: DataFrame,
    asks_frame: DataFrame,
    timestamp: str
) -> Tuple[DataFrame, str]:
    """Joins the processed bids and asks frames together.
    
    Args:
        bids_frame: Processed bids data frame
        asks_frame: Processed asks data frame
        timestamp: The timestamp at which the orderbook was generated
        
    Returns:
        A tuple containing the processed dataframe and timestamp
    """
    
    frame = pd.concat([bids_frame, asks_frame], axis=1)
    return (frame, timestamp)


def process_orderbook(raw_snapshot) -> Tuple[DataFrame, float]:
    """Extract, load and transform orderbook.
    
    Args:
        raw_snapshot: JSON representation of the order book
    
    Returns:
        A tuple containing the summarized orderbook and timestamp of when it was generated
    """
    snapshot_dict = parse_json(raw_snapshot)
    snapshot_timestamp = snapshot_dict['timestamp']

    bids_frame = get_snapshot_bids_frame(snapshot_dict)
    asks_frame = get_snapshot_asks_frame(snapshot_dict)

    bids_frame_reduced = discretize_bids_frame(bids_frame)
    asks_frame_reduced = discretize_asks_frame(asks_frame)

    return concat_result(bids_frame_reduced, asks_frame_reduced, snapshot_timestamp)

In [27]:
path = next(DATA_PATH.glob('*.json'))

with open(path) as f:
    snapshot = f.read()
    orderbook, timestamp = process_orderbook(snapshot)

orderbook

Unnamed: 0,bid_volume,bid_price,bid_count,ask_volume,ask_price,ask_count
1,21.309799,103793.604651,43,73.564302,104841.064378,233
2,7.617693,102746.564516,62,3.942177,105820.854839,62
3,5.422405,101746.418182,55,8.734996,106805.849057,53
4,12.692156,100802.855072,69,11.937143,107923.928571,42
5,7.045386,99952.977528,89,3.619296,108986.338983,59


# Summarizing all Snapshots

In [34]:
# Total snapshots
len(list(DATA_PATH.glob('*.json')))

4567

## Python Code

In [22]:
%%time
orderbooks = []
timestamps = []

for filepath in list(DATA_PATH.glob('*.json')):
    with open(filepath) as f:
        raw_snapshot = f.read()
        
    orderbook, timestamp = process_orderbook(raw_snapshot)
    orderbooks.append(orderbook)
    timestamps.append(timestamp)

CPU times: user 1min 19s, sys: 357 ms, total: 1min 20s
Wall time: 1min 20s


In [33]:
orderbooks[0]

Unnamed: 0,bid_volume,bid_price,bid_count,ask_volume,ask_price,ask_count
1,21.309799,103793.604651,43,73.564302,104841.064378,233
2,7.617693,102746.564516,62,3.942177,105820.854839,62
3,5.422405,101746.418182,55,8.734996,106805.849057,53
4,12.692156,100802.855072,69,11.937143,107923.928571,42
5,7.045386,99952.977528,89,3.619296,108986.338983,59


## Dask Bag

Below the ETL process is run using the dask bag API.

In [15]:
@dask.delayed
def process_batch(filepaths):
    for filepath in filepaths:
        with open(filepath) as f:
            yield process_orderbook(f.read())


chunk_size = 200
filepaths_chunked = []
filepaths = [filepath.absolute() for filepath in DATA_PATH.glob('*.json')]

n = len(filepaths) // chunk_size
for i in range(n + 1):    
    filepaths_chunked.append(
        filepaths[i * chunk_size: i * chunk_size + chunk_size]
    )

batches = []
for bunch in filepaths_chunked:
    batches.append(
        process_batch(bunch)
    )
    
graph = db.from_delayed(batches)

In [16]:
%%time
processed_orderbooks = graph.compute()

CPU times: user 7.7 s, sys: 115 ms, total: 7.82 s
Wall time: 30.8 s


In [17]:
orderbook, timestamp = processed_orderbooks[0]
orderbook

Unnamed: 0,bid_volume,bid_price,bid_count,ask_volume,ask_price,ask_count
1,21.309799,103793.604651,43,73.564302,104841.064378,233
2,7.617693,102746.564516,62,3.942177,105820.854839,62
3,5.422405,101746.418182,55,8.734996,106805.849057,53
4,12.692156,100802.855072,69,11.937143,107923.928571,42
5,7.045386,99952.977528,89,3.619296,108986.338983,59


## Dask Distributed

In [18]:
# from dask.distributed import Client

# client = Client()
# client

In [19]:
# future = client.compute(graph)

In [20]:
# processed_orderbooks = future.result()
# orderbook, timestamp = processed_orderbooks[0]
# orderbook