Skip to content

Commit

Permalink
Merge pull request #90 from AsyncAlgoTrading/livetweaks
Browse files Browse the repository at this point in the history
Fix spread construction, Add heartbeat, updated C++, start work on order book "lite"
  • Loading branch information
timkpaine committed Sep 10, 2020
2 parents c54646c + 83bc062 commit 48c15cd
Show file tree
Hide file tree
Showing 17 changed files with 365 additions and 105 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

Like [Zipline](https://github.com/quantopian/zipline) and [Lean](https://github.com/QuantConnect/Lean), `aat` exposes a single strategy class which is utilized for both live trading and backtesting. The strategy class is simple enough to write and test algorithms quickly, but extensible enough to allow for complex slippage and transaction cost modeling, as well as mid- and post- trade analysis.

`aat` is active use for live algorithmic trading on equities, commodity futures contracts, and commodity futures spreads by undisclosed funds.

# Overview
## Internals
Expand All @@ -21,6 +22,7 @@ Like [Zipline](https://github.com/quantopian/zipline) and [Lean](https://github.
- execution engine
- backtest engine


### Trading Engine
The trading engine initializes all exchanges and strategies, then martials data, trade requests, and trade responses between the strategy, risk, execution, and exchange objects, while keeping track of high-level statistics on the system

Expand Down Expand Up @@ -604,7 +606,7 @@ We can run any number of strategies against any number of exchanges, including c
|---|---|---|---|---|
| Synthetic | Yes | Yes | Simulation,Backtest | Equity |
| IEX | Yes | Fake | Live, Simulation, Sandbox, Backtest | Equity |
| InteractiveBrokers | In Progress | Yes | Live, Simulation, Sandbox | Equity, Option, Future, Commodities, Spreads, Pair |
| InteractiveBrokers | In Progress | Yes | Live, Simulation, Sandbox | Equity, Option, Future, Commodities, Spreads, Pairs |
| TD Ameritrade | In Progress | In Progress | Equity, Option |
| Alpaca | In Progress | In Progress | |
| Coinbase | In Progress | In Progress | |
Expand Down
5 changes: 4 additions & 1 deletion aat/config/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ class OptionType(BaseEnum):


class EventType(BaseEnum):
# Heartbeat events
HEARTBEAT = 'HEARTBEAT'

# Trade events
TRADE = 'TRADE'

Expand Down Expand Up @@ -81,7 +84,7 @@ class InstrumentType(BaseEnum):

MUTUALFUND = 'MUTUALFUND'

COMMODITIES = 'COMMODITIES'
COMMODITY = 'COMMODITY'

# TODO Warrant?

Expand Down
38 changes: 27 additions & 11 deletions aat/core/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def __init__(self, **config):
self._handler_subscriptions = {m: [] for m in EventType.__members__.values()}

# setup `now` handler for backtest
self._latest = datetime.fromtimestamp(0)
self._latest = datetime.fromtimestamp(0) if self.trading_type in (TradingType.BACKTEST, TradingType.SIMULATION) else datetime.now()

# register internal management event handler before all strategy handlers
self.registerHandler(self.manager)
Expand Down Expand Up @@ -219,38 +219,47 @@ async def run(self):
await asyncio.gather(*(asyncio.create_task(exch.instruments()) for exch in self.exchanges))

# send start event to all callbacks
await self.tick(Event(type=EventType.START, target=None))
await self.processEvent(Event(type=EventType.START, target=None))

async with merge(*(exch.tick() for exch in self.exchanges if inspect.isasyncgenfunction(exch.tick))).stream() as stream:
async with merge(*(exch.tick() for exch in self.exchanges + [self] if inspect.isasyncgenfunction(exch.tick))).stream() as stream:
# stream through all events
async for event in stream:
# tick exchange event to handlers
await self.tick(event)
await self.processEvent(event)

# TODO move out of critical path
self._latest = event.target.timestamp if hasattr(event, 'target') and hasattr(event.target, 'timestamp') else self._latest
if self.trading_type in (TradingType.BACKTEST, TradingType.SIMULATION):
# use time of last event
self._latest = event.target.timestamp if hasattr(event, 'target') and hasattr(event.target, 'timestamp') else self._latest
else:
# use now
self._latest = datetime.now()

# process any secondary events
while self._queued_events:
event = self._queued_events.popleft()
await self.tick(event)
await self.processEvent(event)

# process any secondary callback-targeted events (e.g. order fills)
while self._queued_targeted_events:
strat, event = self._queued_targeted_events.popleft()
await self.tick(event, strat)
await self.processEvent(event, strat)

# process any periodics
await asyncio.gather(*(asyncio.create_task(p.execute(self._latest)) for p in self.manager._periodics))

await self.tick(Event(type=EventType.EXIT, target=None))
await self.processEvent(Event(type=EventType.EXIT, target=None))

async def tick(self, event, strategy=None):
async def processEvent(self, event, strategy=None):
'''send an event to all registered event handlers
Arguments:
event (Event): event to send
'''
if event.type == EventType.HEARTBEAT:
# ignore heartbeat
return

for callback, handler in self._handler_subscriptions[event.type]:
# TODO make cleaner? move to somewhere not in critical path?
if strategy is not None and (handler not in (strategy, self.manager)):
Expand All @@ -271,9 +280,16 @@ async def tick(self, event, strategy=None):
if event.type == EventType.ERROR:
# don't infinite error
raise
await self.tick(Event(type=EventType.ERROR, target=Error(target=event, handler=handler, callback=callback, exception=e)))
await self.processEvent(Event(type=EventType.ERROR, target=Error(target=event, handler=handler, callback=callback, exception=e)))
await asyncio.sleep(1)

async def tick(self):
'''helper method to ensure periodic methods execute periodically in absence
of market data'''
while True:
yield Event(type=EventType.HEARTBEAT, target=None)
await asyncio.sleep(1)

def now(self):
'''Return the current datetime. Useful to avoid code changes between
live trading and backtesting. Defaults to `datetime.now`'''
Expand All @@ -289,4 +305,4 @@ def start(self):
except KeyboardInterrupt:
pass
# send exit event to all callbacks
asyncio.ensure_future(self.tick(Event(type=EventType.EXIT, target=None)))
asyncio.ensure_future(self.processEvent(Event(type=EventType.EXIT, target=None)))
47 changes: 47 additions & 0 deletions aat/core/order_book/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from abc import ABC, abstractmethod
from typing import List, Mapping, Optional

from ...config import Side
from ..models import Order


class OrderBookBase(ABC):
@abstractmethod
def reset(self):
pass

@abstractmethod
def add(self, order: Order) -> None:
pass

@abstractmethod
def cancel(self, order: Order) -> None:
pass

@abstractmethod
def change(self, order: Order) -> None:
pass

@abstractmethod
def find(self, order: Order) -> Optional[Order]:
pass

@abstractmethod
def topOfBook(self) -> Mapping[Side, List[float]]:
pass

@abstractmethod
def spread(self) -> float:
pass

@abstractmethod
def level(self, level: int = 0, price: float = None):
pass

@abstractmethod
def levels(self, levels=0):
pass

@abstractmethod
def __iter__(self):
pass
66 changes: 38 additions & 28 deletions aat/core/order_book/order_book.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from typing import Callable, List, Mapping, Optional

from .base import OrderBookBase
from .collector import _Collector
from .price_level import _PriceLevel
from .utils import _insort
from ..exchange import ExchangeType
from ..models import Order
from ...config import Side, OrderFlag, OrderType
from ...common import _in_cpp

Expand All @@ -17,7 +21,7 @@ def _make_cpp_orderbook(instrument, exchange_name='', callback=lambda x: print(x
return OrderBookCpp(instrument, exchange_name or ExchangeType(''), callback)


class OrderBook(object):
class OrderBook(OrderBookBase):
'''A limit order book.
Supports the following order types:
Expand Down Expand Up @@ -68,33 +72,40 @@ def __init__(self,

self._instrument = instrument
self._exchange_name = exchange_name or ExchangeType('')
self._callback = callback

# reset levels and collector
self.reset()

def reset(self) -> None:
'''reset the order book to its base state'''
# levels look like [10, 10.5, 11, 11.5]
self._buy_levels = []
self._sell_levels = []
self._buy_levels: List[float] = []
self._sell_levels: List[float] = []

# look like {price level: PriceLevel}
self._buys = {}
self._sells = {}
self._buys: Mapping[float, _PriceLevel] = {}
self._sells: Mapping[float, _PriceLevel] = {}

# setup collector for conditional orders
self._collector = _Collector(callback)
self._collector = _Collector(self._callback)

def setCallback(self, callback):
def setCallback(self, callback: Callable) -> None:
self._callback = callback
self._collector.setCallback(callback)

def _clearOrders(self, order, amount):
def _clearOrders(self, order: Order, amount: int) -> None:
'''internal'''
if order.side == Side.BUY:
self._sell_levels = self._sell_levels[amount:]
else:
self._buy_levels = self._buy_levels[:-amount] if amount else self._buy_levels

def _getTop(self, side, cleared):
def _getTop(self, side: Side, cleared: int):
'''internal'''
return (self._sell_levels[cleared] if len(self._sell_levels) > cleared else None) if side == Side.BUY else (self._buy_levels[-1 - cleared] if len(self._buy_levels) > cleared else None)

def add(self, order):
def add(self, order: Order) -> None:
'''add a new order to the order book, potentially triggering events:
EventType.TRADE: if this order crosses the book and fills orders
EventType.FILL: if this order crosses the book and fills orders
Expand Down Expand Up @@ -198,7 +209,7 @@ def add(self, order):
# limit order, put on books
if _insort(levels, order.price):
# new price level
prices[order.price] = _PriceLevel(order.price, collector=self._collector)
prices[order.price] = _PriceLevel(order.price, collector=self._collector) # type: ignore

# add order to price level
prices[order.price].add(order)
Expand Down Expand Up @@ -228,7 +239,7 @@ def add(self, order):
# limit order, put on books
if _insort(levels, order.price):
# new price level
prices[order.price] = _PriceLevel(order.price, collector=self._collector)
prices[order.price] = _PriceLevel(order.price, collector=self._collector) # type: ignore

# add order to price level
prices[order.price].add(order)
Expand Down Expand Up @@ -261,7 +272,7 @@ def add(self, order):
# limit order, put on books
if _insort(levels, order.price):
# new price level
prices[order.price] = _PriceLevel(order.price, collector=self._collector)
prices[order.price] = _PriceLevel(order.price, collector=self._collector) # type: ignore

# add order to price level
prices[order.price].add(order)
Expand All @@ -281,7 +292,7 @@ def add(self, order):
# limit order, put on books
if _insort(levels, order.price):
# new price level
prices[order.price] = _PriceLevel(order.price, collector=self._collector)
prices[order.price] = _PriceLevel(order.price, collector=self._collector) # type: ignore

# add order to price level
prices[order.price].add(order)
Expand Down Expand Up @@ -309,7 +320,7 @@ def add(self, order):
# clear the collector
self._collector.clear()

def change(self, order):
def change(self, order: Order) -> None:
'''modify an order on the order book, potentially triggering events:
EventType.CHANGE: the change event for this
Args:
Expand All @@ -328,7 +339,7 @@ def change(self, order):
# modify order in price level
prices[price].modify(order)

def cancel(self, order):
def cancel(self, order: Order) -> None:
'''remove an order from the order book, potentially triggering events:
EventType.CANCEL: the cancel event for this
Args:
Expand All @@ -352,7 +363,7 @@ def cancel(self, order):
if not prices[price]:
levels.remove(price)

def find(self, order):
def find(self, order: Order) -> Optional[Order]:
'''find an order in the order book
Args:
order (Data): order to find in orderbook
Expand All @@ -368,7 +379,7 @@ def find(self, order):
# find order from price level
return prices[price].find(order)

def topOfBook(self):
def topOfBook(self) -> Mapping[Side, List[float]]:
'''return top of both sides
Args:
Expand All @@ -379,7 +390,7 @@ def topOfBook(self):
return {Side.BUY: [self._buy_levels[-1], self._buys[self._buy_levels[-1]].volume()] if len(self._buy_levels) > 0 else [0, 0],
Side.SELL: [self._sell_levels[0], self._sells[self._sell_levels[0]].volume()] if len(self._sell_levels) > 0 else [float('inf'), 0]}

def spread(self):
def spread(self) -> float:
'''return the spread
Args:
Expand All @@ -388,7 +399,7 @@ def spread(self):
value (float): spread between bid and ask
'''
tob = self.topOfBook()
return tob[Side.SELL] - tob[Side.BUY]
return tob[Side.SELL][0] - tob[Side.BUY][0]

def level(self, level: int = 0, price: float = None):
'''return book level
Expand All @@ -397,16 +408,15 @@ def level(self, level: int = 0, price: float = None):
level (int): depth of book to return
price (float): price level to look for
Returns:
value (tuple): returns ask or bid if Side specified, otherwise ask,bid
value (tuple): returns ask, bid
'''
# collect bids and asks at `level`
if price is not None:
bid = self._buys[price] if price in self._buy_levels else None
ask = self._sells[price] if price in self._sell_levels else None
else:
bid = [self._buy_levels[-level - 1], self._buys[self._buy_levels[-level - 1]].volume()] if len(self._buy_levels) > level else [0.0, 0.0]
ask = [self._sell_levels[level], self._sells[self._sell_levels[level]].volume()] if len(self._sell_levels) > level else [0.0, 0.0]
return ask, bid
return self._sells[price] if price in self._sell_levels else None, \
self._buys[price] if price in self._buy_levels else None

return [self._sell_levels[level], self._sells[self._sell_levels[level]].volume()] if len(self._sell_levels) > level else [0.0, 0.0], \
[self._buy_levels[-level - 1], self._buys[self._buy_levels[-level - 1]].volume()] if len(self._buy_levels) > level else [0.0, 0.0]

def levels(self, levels=0):
'''return book levels starting at top
Expand All @@ -419,7 +429,7 @@ def levels(self, levels=0):
if levels <= 0:
return self.topOfBook()

ret = {}
ret: Mapping[Side, List[List[float]]] = {}
ret[Side.BUY] = []
ret[Side.SELL] = []
for _ in range(levels):
Expand Down

0 comments on commit 48c15cd

Please sign in to comment.