Skip to content

Commit

Permalink
Merge pull request #77 from AsyncAlgoTrading/fillroute
Browse files Browse the repository at this point in the history
fixes #70
  • Loading branch information
timkpaine committed Aug 11, 2020
2 parents df97c7f + bb420c0 commit b8e7314
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 41 deletions.
16 changes: 15 additions & 1 deletion aat/core/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,15 @@ def pushEvent(self, event):
'''push non-exchange event into the queue'''
self._queued_events.append(event)

def pushTargetedEvent(self, strategy, event):
'''push non-exchange event targeted to a specific strat into the queue'''
self._queued_targeted_events.append((strategy, event))

async def run(self):
'''run the engine'''
# setup future queue
self._queued_events = deque()
self._queued_targeted_events = deque()

# await all connections
await asyncio.gather(*(asyncio.create_task(exch.connect()) for exch in self.exchanges))
Expand All @@ -230,15 +235,24 @@ async def run(self):
event = self._queued_events.popleft()
await self.tick(event)

# process any secondary events
while self._queued_targeted_events:
strat, event = self._queued_targeted_events.popleft()
await self.tick(event, strat)

await self.tick(Event(type=EventType.EXIT, target=None))

async def tick(self, event):
async def tick(self, event, strategy=None):
'''send an event to all registered event handlers
Arguments:
event (Event): event to send
'''
for callback, handler in self._handler_subscriptions[event.type]:
# TODO make cleaner? move to somewhere not in critical path?
if strategy is not None and (handler not in (strategy, self.manager)):
continue

# TODO make cleaner? move to somewhere not in critical path?
if event.type in (EventType.TRADE, EventType.OPEN, EventType.CHANGE, EventType.CANCEL, EventType.DATA) and \
not self.manager.dataSubscriptions(handler, event):
Expand Down
11 changes: 6 additions & 5 deletions aat/core/engine/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async def _onBought(self, strategy, trade: Trade):

# push event to loop
ev = Event(type=Event.Types.BOUGHT, target=trade)
self._engine.pushEvent(ev)
self._engine.pushTargetedEvent(strategy, ev)

# synchronize state when engine processes this
self._alerted_events[ev] = (strategy, trade.my_order)
Expand All @@ -80,7 +80,7 @@ async def _onSold(self, strategy, trade: Trade):

# push event to loop
ev = Event(type=Event.Types.SOLD, target=trade)
self._engine.pushEvent(ev)
self._engine.pushTargetedEvent(strategy, ev)

# synchronize state when engine processes this
self._alerted_events[ev] = (strategy, trade.my_order)
Expand All @@ -95,7 +95,7 @@ async def _onRejected(self, strategy, order: Order):
'''
# push event to loop
ev = Event(type=Event.Types.REJECTED, target=order)
self._engine.pushEvent(ev)
self._engine.pushTargetedEvent(strategy, ev)

# synchronize state when engine processes this
self._alerted_events[ev] = (strategy, order)
Expand Down Expand Up @@ -320,14 +320,15 @@ def instruments(self, type: InstrumentType = None, exchange=None):
'''Return list of all available instruments'''
return Instrument._instrumentdb.instruments(type=type, exchange=exchange)

def subscribe(self, instrument=None, strategy=None):
async def subscribe(self, instrument=None, strategy=None):
'''Subscribe to market data for the given instrument'''
if strategy not in self._data_subscriptions:
self._data_subscriptions[strategy] = []

self._data_subscriptions[strategy].append(instrument)

for exc in self._exchanges:
exc.subscribe(instrument)
await exc.subscribe(instrument)

def dataSubscriptions(self, handler, event):
'''does handler subscribe to the data for event'''
Expand Down
24 changes: 13 additions & 11 deletions aat/core/risk/calculations.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,23 +279,25 @@ def plotStdDev(self, ax, **plot_kwargs):
self._df_notional = self._getNotional()
self._df_notional.columns = [c.replace('n:', '') for c in self._df_notional.columns]

total_returns = self._df_notional.sum(axis=1).pct_change(1).fillna(0.0)
total_returns_rolling = total_returns.rolling(10)
total_returns_rolling.std().plot(ax=ax)
ax.axhline(total_returns.std())
ax.set_ylabel('Std.')
if not self._df_notional.empty:
total_returns = self._df_notional.sum(axis=1).pct_change(1).fillna(0.0)
total_returns_rolling = total_returns.rolling(10)
total_returns_rolling.std().plot(ax=ax)
ax.axhline(total_returns.std())
ax.set_ylabel('Std.')

def plotSharpe(self, ax, **plot_kwargs):
self._df_notional = self._getNotional()
self._df_notional.columns = [c.replace('n:', '') for c in self._df_notional.columns]

total_returns = self._df_notional.sum(axis=1).pct_change(1).fillna(0.0)
if not self._df_notional.empty:
total_returns = self._df_notional.sum(axis=1).pct_change(1).fillna(0.0)

sharpe = total_returns.values.mean() / total_returns.values.std() * np.sqrt(252)
total_returns['sharpe'] = total_returns.rolling(10).mean() / total_returns.rolling(10).std() * np.sqrt(252)
total_returns['sharpe'].plot(ax=ax)
ax.axhline(sharpe)
ax.set_ylabel('Sharpe')
sharpe = total_returns.values.mean() / total_returns.values.std() * np.sqrt(252)
total_returns['sharpe'] = total_returns.rolling(10).mean() / total_returns.rolling(10).std() * np.sqrt(252)
total_returns['sharpe'].plot(ax=ax)
ax.axhline(sharpe)
ax.set_ylabel('Sharpe')

def performanceCharts(self):
if not CalculationsMixin.__perf_charts:
Expand Down
2 changes: 2 additions & 0 deletions aat/exchange/public/iex.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,13 @@ def _callback(record):
df = df[['close', 'volume']]
df.columns = ['close:{}'.format(i.name), 'volume:{}'.format(i.name)]
dfs.append(df)

data = pd.concat(dfs, axis=1)
data.sort_index(inplace=True)
data = data.groupby(data.index).last()
data.drop_duplicates(inplace=True)
data.fillna(method='ffill', inplace=True)

else:
for i in tqdm(self._subscriptions, desc="Fetching data..."):
date = self._start_date
Expand Down
43 changes: 20 additions & 23 deletions aat/strategy/sample/buy_and_hold_iex.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,43 @@


class BuyAndHoldIEXStrategy(Strategy):
def __init__(self, *args, **kwargs) -> None:
def __init__(self, symbol, *args, **kwargs) -> None:
super(BuyAndHoldIEXStrategy, self).__init__(*args, **kwargs)
self._symbol = symbol

async def onStart(self, event: Event) -> None:
# Get available instruments from exchange
insts = self.instruments()
print('Available Instruments:\n{}'.format(insts))
# Create an instrument
inst = Instrument(name=self._symbol, type=InstrumentType.EQUITY)

for name in ('FB', 'AMZN', 'NFLX', 'GOOG', 'MSFT', 'AAPL', 'NVDA'):
# Create an instrument
inst = Instrument(name=name, type=InstrumentType.EQUITY)
# Check that its available
if inst not in self.instruments():
raise Exception('Not available on exchange: {}'.format(self._symbol))

# Check that its available
if inst not in insts:
raise Exception('Not available on exchange: {}'.format(name))

# Subscribe
self.subscribe(inst)
print('Subscribing to {}'.format(inst))
# Subscribe
await self.subscribe(inst)
print('Subscribing to {}'.format(inst))

async def onTrade(self, event: Event) -> None:
'''Called whenever a `Trade` event is received'''
trade: Trade = event.target # type: ignore

# no past trades, no current orders
if not self.orders(trade.instrument) and not self.trades(trade.instrument):
req = Order(side=Side.BUY,
price=trade.price,
volume=5000 // trade.price,
instrument=trade.instrument,
order_type=Order.Types.MARKET,
exchange=trade.exchange)
self._order = Order(side=Side.BUY,
price=trade.price,
volume=5000 // trade.price,
instrument=trade.instrument,
order_type=Order.Types.MARKET,
exchange=trade.exchange)

print('requesting buy : {}'.format(req))
print('requesting buy : {}'.format(self._order))

await self.newOrder(req)
await self.newOrder(self._order)

async def onBought(self, event: Event) -> None:
trade: Trade = event.target # type: ignore
print('bought {:.2f} @ {:.2f}'.format(trade.volume, trade.price))
print('bought {} {:.2f} @ {:.2f}'.format(trade.instrument, trade.volume, trade.price))
assert trade.my_order == self._order

async def onRejected(self, event: Event) -> None:
print('order rejected')
Expand Down
8 changes: 7 additions & 1 deletion config/iex.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,10 @@ exchanges=

[strategy]
strategies =
aat.strategy.sample:BuyAndHoldIEXStrategy
aat.strategy.sample:BuyAndHoldIEXStrategy,FB
aat.strategy.sample:BuyAndHoldIEXStrategy,AMZN
aat.strategy.sample:BuyAndHoldIEXStrategy,NFLX
aat.strategy.sample:BuyAndHoldIEXStrategy,GOOG
aat.strategy.sample:BuyAndHoldIEXStrategy,MSFT
aat.strategy.sample:BuyAndHoldIEXStrategy,AAPL
aat.strategy.sample:BuyAndHoldIEXStrategy,NVDA

0 comments on commit b8e7314

Please sign in to comment.