In [4]:
# deltix TimeBase client
import tbapi
from tbapi import InstrumentMessage

# FINOS Perspective
import perspective
from perspective import Table, PerspectiveWidget, Plugin

# OrderBook
from orderbook import Book, to_dict

# Other libs
import threading
import asyncio
from datetime import datetime
import time
from datetime import datetime
import ipywidgets as widgets
import logging
from typing import List, Iterable
from sortedcollections import SortedDict, ItemSortedDict
import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

In [5]:
tb_info_url = 'dxtick://localhost:8011'
schema = {
    'key': str,
    'symbol': str,
    'side': str,
    'size': float,
    'price': float,
    'numberOfOrders': int
}

key = 'coinbase'
symbol = 'BTC/USD'
record_type = 'com.epam.deltix.timebase.messages.universal.PackageHeader'
table = Table(schema, limit=1000, index='key')
book = Book(symbol)
booksize = 20
time_widget = widgets.Text(
    disabled=True,
    value=str(datetime.now())
)

In [8]:
last_updated = 0

def current_milli_time():
    return round(time.time() * 1000)

        
def process_entry_update(symbol, entry: InstrumentMessage) -> None:
    global last_updated
    if entry.action == 'DELETE':
        book.remove(entry.side, entry.price)
        t = time.time()
        if t - last_updated >= 0.5:
            last_updated = t
            table.update(book.get_bids(size=booksize))
            table.update(book.get_asks(size=booksize))
    elif entry.action == 'UPDATE':
        e = to_dict(symbol, entry)
        book.update(e)
        t = time.time()
        if t - last_updated >= 0.5:
            last_updated = t
            table.update(book.get_bids(size=booksize))
            table.update(book.get_asks(size=booksize))
    else:
        raise Exception(f'Unknown action type: {entry.action}')

        
def process_entry_new(symbol, entry: InstrumentMessage) -> None:
    global last_updated
    e = to_dict(symbol, entry)
    book.update(e)
    t = time.time()
    if t - last_updated >= 0.5:
        last_updated = t
        table.update(book.get_bids(size=booksize))
        table.update(book.get_asks(size=booksize))
            

def process_snapshot(symbol, entries) -> None:
    global last_updated
    book.clear()
    book.update(*map(lambda e: to_dict(symbol, e), entries))
    t = time.time()
    if t - last_updated >= 0.5:
        last_updated = t
        table.update(book.get_bids(size=booksize))
        table.update(book.get_asks(size=booksize))
#     logging.info('bids: ' + str(list(map(lambda e: e, book.bids))))
#     logging.info('asks: ' + str(list(map(lambda e: e, book.asks))))
        
        
def initial_book():
    db = tbapi.TickDb.createFromUrl(tb_info_url)
    try:
        db.open(True)
        stream = db.getStream(key)
        options = tbapi.SelectionOptions()
        try:
            cursor = db.select(current_milli_time() - 10000, [stream], options, 
                               [record_type], 
                               [symbol])
            while cursor.next():
                msg = cursor.getMessage()
                if msg.packageType == 'PERIODICAL_SNAPSHOT':
                    return msg
        finally:
            cursor.close()
    finally:
        db.close()

        
async def read_cursor():
    db = tbapi.TickDb.createFromUrl(tb_info_url)
    try:
        db.open(True)
        stream = db.getStream(key)
        options = tbapi.SelectionOptions()
        options.live = True
        try:
            cursor = db.select(current_milli_time(), [stream], options, 
                               [record_type], 
                               [symbol])
            global stop_reading
            initialized = False
            while cursor.next() and not stop_reading and not initialized:
                msg = cursor.getMessage()
                if msg.packageType == 'PERIODICAL_SNAPSHOT' or msg.packageType == 'VENDOR_SNAPSHOT':
                    logging.info('received snapshot')
                    process_snapshot(msg.symbol, msg.entries)
                    initialized = True
                    time_widget.value = str(datetime.fromtimestamp(msg.timestamp / 10 ** 9))
            while cursor.next() and not stop_reading:
                msg = cursor.getMessage()
                if msg.packageType == 'INCREMENTAL_UPDATE':
                    for entry in msg.entries:
                        if entry.typeName.endswith('L2EntryUpdate'):
                            process_entry_update(msg.symbol, entry)
                        elif entry.typeName.endswith('L2EntryNew'):
                            process_entry_new(msg.symbol, entry)
                elif msg.packageType == 'PERIODICAL_SNAPSHOT' or msg.packageType == 'VENDOR_SNAPSHOT':
                    process_snapshot(msg.symbol, msg.entries)
                time_widget.value = str(datetime.fromtimestamp(msg.timestamp / 10 ** 9))
        finally:
            cursor.close()
    finally:
        db.close()

        
def update_table():
    logging.info('Started streaming!')
    loop = asyncio.new_event_loop()
    task = loop.create_task(read_cursor())
    loop.call_later(60, task.cancel)
    
    try:
        loop.run_until_complete(task)
    except asyncio.CancelledError:
        logging.info("Stopped streaming!")
        pass

In [9]:
initial = initial_book()

In [10]:
process_snapshot(initial.symbol, initial.entries)

In [None]:
grid = PerspectiveWidget(
    table, 
    sort=[], 
    group_by=['symbol', 'price'], 
    split_by=['side'], 
    aggregates={'price': 'avg', 'numberOfOrders': 'sum'},
    columns=['size', 'price', 'numberOfOrders'],
    plugin=Plugin.GRID
)
xbar = PerspectiveWidget(
    table, 
    sort=[], 
    columns=['size'],
    group_by=['symbol', 'price'], 
    split_by=['side'], 
    plugin=Plugin.XBAR
)
ybar = PerspectiveWidget(
    table, 
    sort=[], 
    columns=['size'],
    group_by=['symbol', 'price'], 
    split_by=['side'], 
    plugin=Plugin.YBAR
)
tabs = widgets.Tab()
tabs.children = [grid, xbar, ybar, time_widget]
tabs.set_title(0, 'Grid')
tabs.set_title(1, 'Horizontal Depth Chart')
tabs.set_title(2, 'Vertical Depth Chart')
tabs.set_title(3, 'Timestamp')

TypeError: __init__() got an unexpected keyword argument 'row_pivots'

In [7]:
tabs

Tab(children=(PerspectiveWidget(aggregates={'price': 'avg', 'numberOfOrders': 'sum'}, column_pivots=['side'], …

In [1]:
# Execute to start reading messages
stop_reading = False
thread = threading.Thread(target=update_table)
thread.start()

NameError: name 'threading' is not defined

In [9]:
# Execute to stop reading messages
stop_reading = True
thread.join()

In [10]:
table.clear()
grid.clear()
book.clear()