Skip to content

Commit

Permalink
Merge pull request #157 from AsyncAlgoTrading/ibfix
Browse files Browse the repository at this point in the history
Fix for IB, periodic lockup
  • Loading branch information
timkpaine committed Feb 4, 2021
2 parents af6012a + 2dfc342 commit 0890ffb
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 66 deletions.
7 changes: 5 additions & 2 deletions aat/engine/dispatch/periodic.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
from datetime import datetime
from typing import Callable, Awaitable, List, Optional
from typing import Awaitable, Callable, List, Optional

from temporalcache.utils import should_expire # type: ignore


Expand Down Expand Up @@ -42,11 +43,13 @@ def stop(self) -> None:
self._continue = False

def expires(self, timestamp: datetime) -> bool:
if (timestamp - self._last).total_seconds() < 1:
return False
return should_expire(self._last, timestamp, self.second, self.minute, self.hour)

async def execute(self, timestamp: datetime) -> None:
if self.expires(timestamp):
await self._function(timestamp=timestamp)
asyncio.ensure_future(self._function(timestamp=timestamp))
self._last = timestamp


Expand Down
99 changes: 35 additions & 64 deletions aat/exchange/public/ib/ib.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import threading
from datetime import datetime
from queue import Queue
from queue import Empty, Queue
from random import randint
from typing import Any, AsyncGenerator, Dict, List, Set, Tuple, Union

Expand Down Expand Up @@ -312,44 +312,34 @@ async def lookup(self, instrument: Instrument) -> List[Instrument]:
async def subscribe(self, instrument: Instrument) -> None:
self._api.subscribeMarketData(instrument)

def _create_order_received(self, orderId: str) -> None:
# create new event
self._order_received_map[orderId] = asyncio.Event()

if orderId in self._order_received_res:
# already received a result, set immediately
self._order_received_map[orderId].set()

def _create_cancel_received(self, orderId: str) -> None:
# create new event
self._order_cancelled_map[orderId] = asyncio.Event()

if orderId in self._order_cancelled_res:
# already received a result, set immediately
self._order_cancelled_map[orderId].set()

def _send_order_received(self, order: Order, ret: bool) -> None:
def _send_order_received(self, orderId: str, ret: bool) -> None:
# set result
self._order_received_res[order.id] = ret

if order.id in self._order_received_map:
# if event waiting, set it
self._order_received_map[order.id].set()
self._order_received_res[orderId] = ret

def _send_cancel_received(self, order: Order, ret: bool) -> None:
def _send_cancel_received(self, orderId: str, ret: bool) -> None:
# set result
self._order_cancelled_res[order.id] = ret
self._order_cancelled_res[orderId] = ret

async def _consume_order_received(self, orderId: str) -> bool:
while orderId not in self._order_received_res:
await asyncio.sleep(0.1)
return self._order_received_res.pop(orderId)

if order.id in self._order_cancelled_map:
# if event waiting, set it
self._order_cancelled_map[order.id].set()
async def _consume_cancel_received(self, orderId: str) -> bool:
while orderId not in self._order_cancelled_res:
await asyncio.sleep(0.1)
return self._order_cancelled_res.pop(orderId)

async def tick(self) -> AsyncGenerator[Any, Event]: # type: ignore[override]
"""return data from exchange"""
while True:
# clear order events
while self._order_event_queue.qsize() > 0:
order_data = self._order_event_queue.get()
try:
order_data = self._order_event_queue.get_nowait()
except Empty:
await asyncio.sleep(0.1)
continue
status = order_data["status"]
order = self._orders[str(order_data["orderId"])]
if status in (
Expand All @@ -364,23 +354,21 @@ async def tick(self) -> AsyncGenerator[Any, Event]: # type: ignore[override]

elif status in ("Inactive",):
self._finished_orders.add(order.id)
self._send_order_received(order, False)
await asyncio.sleep(0)
self._send_cancel_received(order, False)
await asyncio.sleep(0)
self._send_order_received(order.id, False)
self._send_cancel_received(order.id, False)

elif status in ("Rejected",):
self._finished_orders.add(order.id)
self._send_order_received(order, False)
self._send_order_received(order.id, False)
await asyncio.sleep(0)

elif status in ("Submitted",):
self._send_order_received(order, True)
self._send_order_received(order.id, True)
await asyncio.sleep(0)

elif status in ("Cancelled",):
self._finished_orders.add(order.id)
self._send_cancel_received(order, True)
self._send_cancel_received(order.id, True)
await asyncio.sleep(0)

elif status in ("Filled",):
Expand All @@ -403,6 +391,9 @@ async def tick(self) -> AsyncGenerator[Any, Event]: # type: ignore[override]
if order.finished():
self._finished_orders.add(order.id)

# if it was cancelled but already executed, clear out the wait
self._send_cancel_received(order.id, False)

# create trade object
t = Trade(
volume=order_data["filled"], # type: ignore
Expand All @@ -417,18 +408,16 @@ async def tick(self) -> AsyncGenerator[Any, Event]: # type: ignore[override]
e = Event(type=EventType.TRADE, target=t)

# if submitted was skipped, clear out the wait
self._send_order_received(order, True)
await asyncio.sleep(0)

# if it was cancelled but already executed, clear out the wait
self._send_cancel_received(order, False)
await asyncio.sleep(0)

self._send_order_received(order.id, True)
yield e

# clear market data events
while self._market_data_queue.qsize() > 0:
market_data = self._market_data_queue.get()
try:
market_data = self._market_data_queue.get_nowait()
except Empty:
await asyncio.sleep(0.1)
continue
instrument: Instrument = market_data["instrument"] # type: ignore
price: float = market_data["price"] # type: ignore
o = AATOrder(
Expand Down Expand Up @@ -474,9 +463,7 @@ async def newOrder(self, order: AATOrder) -> bool:
# construct IB contract and order
ibcontract, iborder = _constructContractAndOrder(order)

# set event for later trigerring
_temp_id = str(self._api.nextOrderId)
self._create_order_received(_temp_id)

# send to IB
id = self._api.placeOrder(ibcontract, iborder)
Expand All @@ -485,14 +472,8 @@ async def newOrder(self, order: AATOrder) -> bool:
order.id = id
self._orders[order.id] = order

# wait for IB to respond
await self._order_received_map[_temp_id].wait()

# get result from IB
res = self._order_received_res[id]
del self._order_received_map[id]
del self._order_received_res[id]
return res
return await self._consume_order_received(_temp_id)

async def cancelOrder(self, order: AATOrder) -> bool:
"""cancel a previously submitted order to the exchange.
Expand All @@ -507,18 +488,8 @@ async def cancelOrder(self, order: AATOrder) -> bool:
if order.id and order.id in self._finished_orders:
return False

# set event for later trigerring
self._create_cancel_received(order.id)

# send to IB
self._api.cancelOrder(order)

# wait for IB to respond
await self._order_cancelled_map[order.id].wait()

# get result from IB
res = self._order_cancelled_res[order.id]

del self._order_cancelled_map[order.id]
del self._order_cancelled_res[order.id]
return res
return await self._consume_cancel_received(order.id)

0 comments on commit 0890ffb

Please sign in to comment.