Skip to content

Commit

Permalink
Merge pull request #150 from AsyncAlgoTrading/ibcancel
Browse files Browse the repository at this point in the history
synchronize cancel events for ib
  • Loading branch information
timkpaine committed Jan 27, 2021
2 parents 1dee11a + 4751eb7 commit 3186bd6
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 12 deletions.
15 changes: 12 additions & 3 deletions aat/engine/dispatch/order_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,10 @@ async def onTraded(self, event: Event) -> None:
strategy, order = self._alerted_events[event]
# remove from list of open orders if done
if order.filled >= order.volume:
self._strategy_open_orders[strategy].remove(order)
try:
self._strategy_open_orders[strategy].remove(order)
except ValueError:
...
else:
strategy = None

Expand All @@ -244,7 +247,10 @@ async def onRejected(self, event: Event) -> None:
if event in self._alerted_events:
strategy, order = self._alerted_events[event]
# remove from list of open orders
self._strategy_open_orders[strategy].remove(order)
try:
self._strategy_open_orders[strategy].remove(order)
except ValueError:
...
else:
strategy = None

Expand All @@ -257,7 +263,10 @@ async def onCanceled(self, event: Event) -> None:
if event in self._alerted_events:
strategy, order = self._alerted_events[event]
# remove from list of open orders
self._strategy_open_orders[strategy].remove(order)
try:
self._strategy_open_orders[strategy].remove(order)
except ValueError:
...
else:
strategy = None

Expand Down
74 changes: 65 additions & 9 deletions aat/exchange/public/ib/ib.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datetime import datetime
from queue import Queue
from random import randint
from typing import Dict, List, Tuple, AsyncGenerator, Any, Union
from typing import Any, AsyncGenerator, Dict, List, Set, Tuple, Union

from ibapi.client import EClient # type: ignore
from ibapi.commission_report import CommissionReport # type: ignore
Expand Down Expand Up @@ -227,6 +227,9 @@ def __init__(
self._order_cancelled_map: Dict[str, asyncio.Event] = {}
self._order_cancelled_res: Dict[str, bool] = {}

# track "finished" orders so we can ignore them
self._finished_orders: Set[str] = set()

# IB TWS gateway
self._order_event_queue: Queue[Dict[str, Union[str, int, float]]] = Queue()
self._market_data_queue: Queue[
Expand Down Expand Up @@ -298,6 +301,18 @@ async def lookup(self, instrument: Instrument) -> List[Instrument]:
async def subscribe(self, instrument: Instrument) -> None:
self._api.subscribeMarketData(instrument)

def _send_order_received(self, order: Order, ret: bool) -> None:
if order.id in self._order_received_map:
# cannot place order, return false
self._order_received_res[order.id] = False
self._order_received_map[order.id].set()

def _send_cancel_received(self, order: Order, ret: bool) -> None:
if order.id in self._order_cancelled_map:
# cannot cancel order, return false
self._order_cancelled_res[order.id] = False
self._order_cancelled_map[order.id].set()

async def tick(self) -> AsyncGenerator[Any, Event]: # type: ignore[override]
"""return data from exchange"""
while True:
Expand All @@ -313,19 +328,24 @@ async def tick(self) -> AsyncGenerator[Any, Event]: # type: ignore[override]
"PendingCancel",
"PreSubmitted",
"ApiCancelled",
"Inactive",
):
# ignore
continue

elif status in ("Inactive",):
self._send_order_received(order, False)
await asyncio.sleep(0)

self._send_cancel_received(order, False)
await asyncio.sleep(0)

elif status in ("Submitted",):
self._order_received_res[order.id] = True
self._order_received_map[order.id].set()
self._send_order_received(order, False)
await asyncio.sleep(0)

elif status in ("Cancelled",):
self._order_cancelled_res[order.id] = True
self._order_cancelled_map[order.id].set()
self._finished_orders.add(order.id)
self._send_cancel_received(order, False)
await asyncio.sleep(0)

elif status in ("Filled",):
Expand All @@ -341,9 +361,21 @@ async def tick(self) -> AsyncGenerator[Any, Event]: # type: ignore[override]
pass

elif status in ("Execution",):
# if submitted was skipped, clear out the wait
self._send_order_received(order, False)
await asyncio.sleep(0)

# if it was cancelled but already executed, clear out the wait
self._send_cancel_received(order, False)
await asyncio.sleep(0)

# set filled
order.filled = order_data["filled"]

# finish order if fully filled
if order.finished():
self._finished_orders.add(order.id)

# create trade object
t = Trade(
volume=order_data["filled"], # type: ignore
Expand Down Expand Up @@ -398,21 +430,28 @@ async def newOrder(self, order: AATOrder) -> bool:
For MarketData-only, can just return None
"""
# ignore if already finished
if order.id and order.id in self._finished_orders:
return False

# construct IB contract and order
ibcontract, iborder = _constructContractAndOrder(order)

# set event for later trigerring
_temp_id = str(self._api.nextOrderId)
self._order_received_map[_temp_id] = asyncio.Event()

# send to IB
id = self._api.placeOrder(ibcontract, iborder)

# update order id
order.id = id
self._orders[order.id] = order

# set event for later trigerring
self._order_received_map[id] = asyncio.Event()
await self._order_received_map[id].wait()
# wait for IB to respond
await self._order_received_map[_temp_id].wait()

# get result from IB
res = self._order_received_res[id]
del self._order_received_map[id]
del self._order_received_res[id]
Expand All @@ -423,7 +462,24 @@ async def cancelOrder(self, order: AATOrder) -> bool:
For MarketData-only, can just return None
"""
# ignore if order not sujbmitted yet
if not order.id:
return False

# ignore if already finished
if order.id and order.id in self._finished_orders:
return False

# set event for later trigerring
self._order_cancelled_map[order.id] = asyncio.Event()

# send to IB
self._api.cancelOrder(order)

# wait for IB to respond
await self._order_cancelled_map[order.id].wait()

# get result from IB
res = self._order_cancelled_res[order.id]

del self._order_cancelled_map[order.id]
Expand Down

0 comments on commit 3186bd6

Please sign in to comment.