Skip to content

Commit

Permalink
Merge pull request #87 from AsyncAlgoTrading/ibdata
Browse files Browse the repository at this point in the history
Ibdata
  • Loading branch information
timkpaine committed Sep 7, 2020
2 parents 226c3d9 + 3e13c46 commit c54646c
Show file tree
Hide file tree
Showing 22 changed files with 456 additions and 202 deletions.
4 changes: 1 addition & 3 deletions MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ include aat/*.dylib
graft aat/tests

# C++ build
graft cpp
graft cmake
include CMakeLists.txt
graft aat/cpp

# Patterns to exclude from any directory
global-exclude *~
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
PYTHON=python3.7
PYTHON=python
CONFIG=./config/synthetic.cfg


Expand Down Expand Up @@ -100,7 +100,7 @@ docs: ## Build the sphinx docs
dist: ## dist to pypi
rm -rf dist build
$(PYTHON) setup.py sdist bdist_wheel
twine check dist/* && twine upload dist/*
$(PYTHON) -m twine check dist/* && twine upload dist/*

clean: ## clean the repository
find . -name "__pycache__" | xargs rm -rf
Expand Down
16 changes: 13 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,22 +238,32 @@ There are several callbacks for order entry:
- onRejected: called when a strategy's order is rejected
- onCanceled: called when a strategy's order is canceled

There are several methods for order entry and data subscriptions:
There are several methods for order entry and data subscriptions...

- subscribe: subscribe to an instrument/exchange data
- instruments: get available instruments
- exchanges: get available exchanges
- lookup: lookup an instrument on the exchange
- newOrder: submit a new order
- buy (alias of newOrder): submit a new order
- sell (alias of newOrder): submit a new order
- orders: get open orders
- pastOrders: get past orders
- trades: get past trades
- positions: get position informatino

... several helpers for analyzing positions and risk ...

- accounts: get account information
- positions: get position information
- risk: get risk information

... and some general utility methods ...

- tradingType: get the trading type of the runtime
- now: get current time as of engine (`datetime.now` when running in realtime)
- loop: get the event loop for the engine

There are also several optional callbacks for backtesting:
... and some optional simulators for backtesting.

- slippage
- transactionCost
Expand Down
5 changes: 4 additions & 1 deletion aat/core/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,14 @@ async def run(self):
event = self._queued_events.popleft()
await self.tick(event)

# process any secondary events
# process any secondary callback-targeted events (e.g. order fills)
while self._queued_targeted_events:
strat, event = self._queued_targeted_events.popleft()
await self.tick(event, strat)

# process any periodics
await asyncio.gather(*(asyncio.create_task(p.execute(self._latest)) for p in self.manager._periodics))

await self.tick(Event(type=EventType.EXIT, target=None))

async def tick(self, event, strategy=None):
Expand Down
1 change: 1 addition & 0 deletions aat/core/engine/manager/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .manager import StrategyManager # noqa: F401
99 changes: 99 additions & 0 deletions aat/core/engine/manager/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import sys
import traceback

from typing import List

from .order_entry import StrategyManagerOrderEntryMixin
from .risk import StrategyManagerRiskMixin
from .utils import StrategyManagerUtilsMixin

from aat.core.handler import EventHandler
from aat.exchange import Exchange


class StrategyManager(StrategyManagerOrderEntryMixin, StrategyManagerRiskMixin, StrategyManagerUtilsMixin, EventHandler):
def __init__(self, trading_engine, trading_type, exchanges: List[Exchange]):
'''The Manager sits between the strategies and the engine and manages state'''
# store trading engine
self._engine = trading_engine

# store the exchanges
self._exchanges = exchanges

# pull from trading engine class
self._risk_mgr = self._engine.risk_manager
self._order_mgr = self._engine.order_manager

# install self for callbacks
self._risk_mgr._setManager(self)
self._order_mgr._setManager(self)

# add exchanges for order manager
for exc in exchanges:
self._order_mgr.addExchange(exc)

# initialize event subscriptions
self._data_subscriptions = {} # type: ignore

# initialize order and trade tracking
self._strategy_open_orders = {}
self._strategy_past_orders = {}
self._strategy_trades = {}

# internal use for synchronizing
self._alerted_events = {}

# internal use for periodics
self._periodics = []

# ********************* #
# EventHandler methods *
# **********************
async def onTrade(self, event):
await self._risk_mgr.onTrade(event)
await self._order_mgr.onTrade(event)

async def onOpen(self, event):
await self._risk_mgr.onOpen(event)
await self._order_mgr.onOpen(event)

async def onCancel(self, event):
await self._risk_mgr.onCancel(event)
await self._order_mgr.onCancel(event)

async def onChange(self, event):
await self._risk_mgr.onChange(event)
await self._order_mgr.onChange(event)

async def onFill(self, event):
await self._risk_mgr.onFill(event)
await self._order_mgr.onFill(event)

async def onHalt(self, event):
await self._risk_mgr.onHalt(event)
await self._order_mgr.onHalt(event)

async def onContinue(self, event):
await self._risk_mgr.onContinue(event)
await self._order_mgr.onContinue(event)

async def onData(self, event):
# TODO
await self._risk_mgr.onData(event)
await self._order_mgr.onData(event)

async def onError(self, event):
# TODO
print('\n\nA Fatal Error has occurred:')
traceback.print_exception(type(event.target.exception), event.target.exception, event.target.exception.__traceback__)
sys.exit(1)

async def onExit(self, event):
# TODO
await self._risk_mgr.onExit(event)
await self._order_mgr.onExit(event)

async def onStart(self, event):
# TODO
await self._risk_mgr.onStart(event)
await self._order_mgr.onStart(event)
167 changes: 17 additions & 150 deletions aat/core/engine/manager.py → aat/core/engine/manager/order_entry.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,25 @@
import sys
import traceback
from typing import List, TYPE_CHECKING

from ..models import Event, Order, Trade
from ..instrument import Instrument
from ..exchange import ExchangeType
from ..handler import EventHandler
from ...config import Side, InstrumentType
from aat.core import Instrument, ExchangeType, Event, Order, Trade
from aat.core.risk import RiskManager
from aat.core.execution import OrderManager
from aat.config import Side
from aat.exchange import Exchange

if TYPE_CHECKING:
from aat.core import TradingEngine

class StrategyManager(EventHandler):
def __init__(self, trading_engine, trading_type, exchanges):
'''The Manager sits between the strategies and the engine and manages state'''
# store trading engine
self._engine = trading_engine

# store the exchanges
self._exchanges = exchanges
class StrategyManagerOrderEntryMixin(object):
_engine: 'TradingEngine'
_exchanges: List[Exchange]
_strategy_trades: dict
_strategy_open_orders: dict
_strategy_past_orders: dict
_alerted_events: dict
_risk_mgr: RiskManager
_order_mgr: OrderManager

# pull from trading engine class
self._risk_mgr = self._engine.risk_manager
self._order_mgr = self._engine.order_manager

# install self for callbacks
self._risk_mgr._setManager(self)
self._order_mgr._setManager(self)

# add exchanges for order manager
for exc in exchanges:
self._order_mgr.addExchange(exc)

# initialize event subscriptions
self._data_subscriptions = {}

# initialize order and trade tracking
self._strategy_open_orders = {}
self._strategy_past_orders = {}
self._strategy_trades = {}

# internal use for synchronizing
self._alerted_events = {}

# ********************* #
# Engine facing methods #
# ********************* #

# *********************** #
# Strategy facing methods #
# *********************** #
#####################
# Order Entry Hooks #
#####################
Expand Down Expand Up @@ -208,70 +181,6 @@ def trades(self, strategy, instrument: Instrument = None, exchange: ExchangeType
ret = [r for r in ret if r.side == side]
return ret

# *********************
# Risk Methods *
# *********************
def positions(self, instrument=None, exchange=None, side=None):
return self._risk_mgr.positions(instrument=instrument, exchange=exchange, side=side)

def risk(self, position=None):
return self._risk_mgr.risk(position=position)

def priceHistory(self, instrument=None):
return self._risk_mgr.priceHistory(instrument=instrument)

# **********************
# EventHandler methods *
# **********************
async def onTrade(self, event):
await self._risk_mgr.onTrade(event)
await self._order_mgr.onTrade(event)

async def onOpen(self, event):
await self._risk_mgr.onOpen(event)
await self._order_mgr.onOpen(event)

async def onCancel(self, event):
await self._risk_mgr.onCancel(event)
await self._order_mgr.onCancel(event)

async def onChange(self, event):
await self._risk_mgr.onChange(event)
await self._order_mgr.onChange(event)

async def onFill(self, event):
await self._risk_mgr.onFill(event)
await self._order_mgr.onFill(event)

async def onHalt(self, event):
await self._risk_mgr.onHalt(event)
await self._order_mgr.onHalt(event)

async def onContinue(self, event):
await self._risk_mgr.onContinue(event)
await self._order_mgr.onContinue(event)

async def onData(self, event):
# TODO
await self._risk_mgr.onData(event)
await self._order_mgr.onData(event)

async def onError(self, event):
# TODO
print('\n\nA Fatal Error has occurred:')
traceback.print_exception(type(event.target.exception), event.target.exception, event.target.exception.__traceback__)
sys.exit(1)

async def onExit(self, event):
# TODO
await self._risk_mgr.onExit(event)
await self._order_mgr.onExit(event)

async def onStart(self, event):
# TODO
await self._risk_mgr.onStart(event)
await self._order_mgr.onStart(event)

#########################
# Order Entry Callbacks #
#########################
Expand Down Expand Up @@ -307,45 +216,3 @@ async def onCanceled(self, event: Event):
strategy, order = self._alerted_events[event]
# remove from list of open orders
self._strategy_open_orders[strategy].remove(order)

#################
# Other Methods #
#################
def now(self):
'''Return the current datetime. Useful to avoid code changes between
live trading and backtesting. Defaults to `datetime.now`'''
return self._engine.now()

def instruments(self, type: InstrumentType = None, exchange=None):
'''Return list of all available instruments'''
return Instrument._instrumentdb.instruments(type=type, exchange=exchange)

async def subscribe(self, instrument=None, strategy=None):
'''Subscribe to market data for the given instrument'''
if strategy not in self._data_subscriptions:
self._data_subscriptions[strategy] = []

self._data_subscriptions[strategy].append(instrument)

for exc in self._exchanges:
await exc.subscribe(instrument)

def dataSubscriptions(self, handler, event):
'''does handler subscribe to the data for event'''
if handler not in self._data_subscriptions:
# subscribe all by default
return True
return event.target.instrument in self._data_subscriptions[handler]

async def lookup(self, instrument: Instrument, exchange=None):
'''Return list of all available instruments that match the instrument given'''
if exchange in self._exchanges:
return await self._exchanges.lookup(instrument)
elif exchange is None:
ret = []
for exchange in self._exchanges:
ret.extend(await exchange.lookup(instrument))
return ret

# None implement
raise NotImplementedError()
22 changes: 22 additions & 0 deletions aat/core/engine/manager/periodic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from typing import Callable, Awaitable
from temporalcache.utils import should_expire # type: ignore


class Periodic(object):
def __init__(self, loop, last_ts, function, second, minute, hour):
self._loop = loop
self._function: Callable[Awaitable[None]] = function
self._second = second
self._minute = minute
self._hour = hour

self._last = last_ts
self._continue = True

def stop(self) -> None:
self._continue = False

async def execute(self, timestamp):
if should_expire(self._last, timestamp, self._second, self._minute, self._hour):
await self._function()
self._last = timestamp

0 comments on commit c54646c

Please sign in to comment.