Skip to content

Commit

Permalink
Merge pull request #155 from AsyncAlgoTrading/ibfix
Browse files Browse the repository at this point in the history
fix some race conditions in ib
  • Loading branch information
timkpaine committed Feb 3, 2021
2 parents 1f73548 + 1869026 commit af6012a
Showing 1 changed file with 64 additions and 27 deletions.
91 changes: 64 additions & 27 deletions aat/exchange/public/ib/ib.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,18 @@
from random import randint
from typing import Any, AsyncGenerator, Dict, List, Set, Tuple, Union

from aat.config import EventType, Side, TradingType
from aat.core import Event, ExchangeType, Instrument
from aat.core import Order as AATOrder
from aat.core import Position, Trade
from aat.exchange import Exchange
from ibapi.client import EClient # type: ignore
from ibapi.commission_report import CommissionReport # type: ignore
from ibapi.contract import Contract # type: ignore
from ibapi.execution import Execution, ExecutionFilter # type: ignore
from ibapi.order import Order # type: ignore
from ibapi.wrapper import EWrapper # type: ignore

from aat.exchange import Exchange
from aat.config import EventType, TradingType, Side
from aat.core import (
ExchangeType,
Event,
Trade,
Order as AATOrder,
Position,
Instrument,
)

from .utils import _constructContract, _constructContractAndOrder, _constructInstrument


Expand Down Expand Up @@ -160,6 +154,23 @@ def execDetailsEnd(self, reqId: int) -> None:
super().execDetailsEnd(reqId)
# TODO?

def error(self, reqId: int, errorCode: int, errorString: str) -> None:
super().error(reqId, errorCode, errorString)
if errorCode in (201,):
self._order_event_queue.put(
dict(
orderId=reqId,
status="Rejected",
)
)
elif errorCode in (202,):
self._order_event_queue.put(
dict(
orderId=reqId,
status="Cancelled",
)
)

def tickPrice(self, reqId: int, tickType: int, price: float, attrib: str) -> None:
# TODO implement more of order book

Expand Down Expand Up @@ -301,16 +312,36 @@ async def lookup(self, instrument: Instrument) -> List[Instrument]:
async def subscribe(self, instrument: Instrument) -> None:
self._api.subscribeMarketData(instrument)

def _create_order_received(self, orderId: str) -> None:
# create new event
self._order_received_map[orderId] = asyncio.Event()

if orderId in self._order_received_res:
# already received a result, set immediately
self._order_received_map[orderId].set()

def _create_cancel_received(self, orderId: str) -> None:
# create new event
self._order_cancelled_map[orderId] = asyncio.Event()

if orderId in self._order_cancelled_res:
# already received a result, set immediately
self._order_cancelled_map[orderId].set()

def _send_order_received(self, order: Order, ret: bool) -> None:
# set result
self._order_received_res[order.id] = ret

if order.id in self._order_received_map:
# cannot place order, return false
self._order_received_res[order.id] = ret
# if event waiting, set it
self._order_received_map[order.id].set()

def _send_cancel_received(self, order: Order, ret: bool) -> None:
# set result
self._order_cancelled_res[order.id] = ret

if order.id in self._order_cancelled_map:
# cannot cancel order, return false
self._order_cancelled_res[order.id] = False
# if event waiting, set it
self._order_cancelled_map[order.id].set()

async def tick(self) -> AsyncGenerator[Any, Event]: # type: ignore[override]
Expand All @@ -321,7 +352,6 @@ async def tick(self) -> AsyncGenerator[Any, Event]: # type: ignore[override]
order_data = self._order_event_queue.get()
status = order_data["status"]
order = self._orders[str(order_data["orderId"])]

if status in (
"ApiPending",
"PendingSubmit",
Expand All @@ -333,12 +363,17 @@ async def tick(self) -> AsyncGenerator[Any, Event]: # type: ignore[override]
continue

elif status in ("Inactive",):
self._finished_orders.add(order.id)
self._send_order_received(order, False)
await asyncio.sleep(0)

self._send_cancel_received(order, False)
await asyncio.sleep(0)

elif status in ("Rejected",):
self._finished_orders.add(order.id)
self._send_order_received(order, False)
await asyncio.sleep(0)

elif status in ("Submitted",):
self._send_order_received(order, True)
await asyncio.sleep(0)
Expand All @@ -361,14 +396,6 @@ async def tick(self) -> AsyncGenerator[Any, Event]: # type: ignore[override]
pass

elif status in ("Execution",):
# if submitted was skipped, clear out the wait
self._send_order_received(order, False)
await asyncio.sleep(0)

# if it was cancelled but already executed, clear out the wait
self._send_cancel_received(order, False)
await asyncio.sleep(0)

# set filled
order.filled = order_data["filled"]

Expand All @@ -388,6 +415,15 @@ async def tick(self) -> AsyncGenerator[Any, Event]: # type: ignore[override]
t.my_order = order

e = Event(type=EventType.TRADE, target=t)

# if submitted was skipped, clear out the wait
self._send_order_received(order, True)
await asyncio.sleep(0)

# if it was cancelled but already executed, clear out the wait
self._send_cancel_received(order, False)
await asyncio.sleep(0)

yield e

# clear market data events
Expand All @@ -401,6 +437,7 @@ async def tick(self) -> AsyncGenerator[Any, Event]: # type: ignore[override]
side=Side.BUY,
instrument=instrument,
exchange=self.exchange(),
filled=1,
)
t = Trade(volume=1, price=float(price), taker_order=o, maker_orders=[])
yield Event(type=EventType.TRADE, target=t)
Expand Down Expand Up @@ -439,7 +476,7 @@ async def newOrder(self, order: AATOrder) -> bool:

# set event for later trigerring
_temp_id = str(self._api.nextOrderId)
self._order_received_map[_temp_id] = asyncio.Event()
self._create_order_received(_temp_id)

# send to IB
id = self._api.placeOrder(ibcontract, iborder)
Expand Down Expand Up @@ -471,7 +508,7 @@ async def cancelOrder(self, order: AATOrder) -> bool:
return False

# set event for later trigerring
self._order_cancelled_map[order.id] = asyncio.Event()
self._create_cancel_received(order.id)

# send to IB
self._api.cancelOrder(order)
Expand Down

0 comments on commit af6012a

Please sign in to comment.