Skip to content

Commit

Permalink
fixes #38, fixes #39
Browse files Browse the repository at this point in the history
  • Loading branch information
timkpaine committed Jul 2, 2019
1 parent bb87a0c commit a626ab8
Show file tree
Hide file tree
Showing 19 changed files with 682 additions and 769 deletions.
61 changes: 59 additions & 2 deletions aat/exchanges/coinbase.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,71 @@
import json
from functools import lru_cache
from .utils.coinbase import CoinbaseMixins
from datetime import datetime
from ..enums import OrderSubType, PairType, TickType, TickType_from_string
from ..exchange import Exchange
from ..structs import MarketData, Instrument
from ..utils import parse_date, str_to_currency_pair_type, str_to_side, str_to_order_type


class CoinbaseExchange(CoinbaseMixins, Exchange):
class CoinbaseExchange(Exchange):
@lru_cache(None)
def subscription(self):
return [json.dumps({"type": "subscribe", "product_id": self.currencyPairToString(x)}) for x in self.options().currency_pairs]

@lru_cache(None)
def heartbeat(self):
return json.dumps({"type": "heartbeat", "on": True})

def tickToData(self, jsn: dict) -> MarketData:
if jsn.get('type') == 'received':
return

s = jsn.get('type')
reason = jsn.get('reason')
if s == 'match' or (s == 'done' and reason == 'filled'):
typ = TickType.TRADE
elif s in ('open', 'done', 'change', 'heartbeat'):
if reason == 'canceled':
typ = TickType.CANCEL
typ = TickType_from_string(s.upper())
else:
typ = TickType.ERROR

order_id = jsn.get('order_id', jsn.get('maker_order_id', ''))
time = parse_date(jsn.get('time')) if jsn.get('time') else datetime.now()
price = float(jsn.get('price', 'nan'))
volume = float(jsn.get('size', 'nan'))
currency_pair = str_to_currency_pair_type(jsn.get('product_id')) if typ != TickType.ERROR else PairType.NONE

instrument = Instrument(underlying=currency_pair)

order_type = str_to_order_type(jsn.get('order_type', ''))
side = str_to_side(jsn.get('side', ''))
remaining_volume = float(jsn.get('remaining_size', 0.0))

sequence = int(jsn.get('sequence', -1))
ret = MarketData(order_id=order_id,
time=time,
volume=volume,
price=price,
type=typ,
instrument=instrument,
remaining=remaining_volume,
side=side,
exchange=self.exchange(),
order_type=order_type,
sequence=sequence)
return ret

def tradeReqToParams(self, req) -> dict:
p = {}
p['price'] = str(req.price)
p['size'] = str(req.volume)
p['product_id'] = req.instrument.currency_pair.value[0].value + '-' + req.instrument.currency_pair.value[1].value
p['type'] = req.order_type.value.lower()

if req.order_sub_type == OrderSubType.FILL_OR_KILL:
p['time_in_force'] = 'FOK'
elif req.order_sub_type == OrderSubType.POST_ONLY:
p['post_only'] = '1'
return p
112 changes: 94 additions & 18 deletions aat/exchanges/gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
import time
from aiostream import stream
from functools import lru_cache
from .utils.gemini import GeminiMixins
from ..define import EXCHANGE_MARKET_DATA_ENDPOINT
from ..enums import TickType
from ..enums import TickType, OrderType, OrderSubType
from ..structs import MarketData
from ..exchange import Exchange
from ..logging import log


class GeminiExchange(GeminiMixins, Exchange):
class GeminiExchange(Exchange):
@lru_cache(None)
def subscription(self):
return [json.dumps({"type": "subscribe", "product_id": self.currencyPairToString(x)}) for x in self.options().currency_pairs]
Expand Down Expand Up @@ -42,7 +42,7 @@ async def run(self, engine) -> None:
# startup and redundancy
log.info('Starting....')
self.ws = [await session.ws_connect(EXCHANGE_MARKET_DATA_ENDPOINT(self.exchange(), options.trading_type) % x) for x in self.subscription()]
private_events = await session.ws_connect("wss://api.gemini.com/v1/order/events?eventTypeFilter=fill&eventTypeFilter=closed&apiSessionFilter=UI")
private_events = await session.ws_connect("wss://api.gemini.com/v1/order/events")
self.ws.append(private_events)

# set subscription for each ws
Expand Down Expand Up @@ -70,24 +70,100 @@ async def get_data_sub_pair(ws, sub=None):

# add one for private stream
async for val in stream.merge(*[get_data_sub_pair(self.ws[i], sub) for i, sub in enumerate(self.subscription() + [None])]):
jsn = json.loads(val[0].data)

if isinstance(jsn, dict) and 'events' in jsn:
events = jsn.get('events', [])
elif not isinstance(jsn, list):
events = [jsn]
else:
events = jsn

if val[1]:
# data stream
pair = json.loads(val[1]).get('product_id')
jsn = json.loads(val[0].data)
if jsn.get('type') == 'heartbeat':
else:
# private events
pair = None

for item in events:
if item.get('type', 'subscription_ack') in ('subscription_ack', 'heartbeat'):
continue
if item.get('type') == 'accepted':
# can ignore these as well
continue

if pair is None:
# private events
import ipdb; ipdb.set_trace()
pair = item['symbol']

item['symbol'] = pair
res = self.tickToData(item)

if not self._running:
pass
else:
for item in jsn.get('events'):
item['symbol'] = pair
res = self.tickToData(item)

if not self._running:
pass
if res.type != TickType.HEARTBEAT:
self.callback(res.type, res)

def tickToData(self, jsn: dict) -> MarketData:
order_id = jsn.get('order_id', '') or str(jsn.get('tid', ''))
time = datetime.now()
price = float(jsn.get('price', 'nan'))
volume = float(jsn.get('amount', 0.0))

if res.type != TickType.HEARTBEAT:
self.callback(res.type, res)
s = jsn.get('type')
if s in ('BLOCK_TRADE', ):
typ = TickType.TRADE
elif s in ('AUCTION_INDICATIVE', 'AUCTION_OPEN'):
typ = TickType.OPEN
else:
typ = TickType_from_string(s)
delta = float(jsn.get('delta', 0.0))

if typ == TickType.CHANGE and not volume:
delta = float(jsn.get('delta', 'nan'))
volume = delta
# typ = self.reasonToTradeType(reason)

side = str_to_side(jsn.get('side', ''))
remaining_volume = float(jsn.get('remaining', 'nan'))
sequence = -1

if 'symbol' not in jsn:
return

currency_pair = str_to_currency_pair_type(jsn.get('symbol'))
instrument = Instrument(underlying=currency_pair)

ret = MarketData(order_id=order_id,
time=time,
volume=volume,
price=price,
type=typ,
instrument=instrument,
remaining=remaining_volume,
side=side,
exchange=self.exchange(),
sequence=sequence)
return ret

def tradeReqToParams(self, req) -> dict:
p = {}
p['price'] = str(req.price)
p['size'] = str(req.volume)
p['product_id'] = req.instrument.currency_pair.value[0].value + req.instrument.currency_pair.value[1].value
p['type'] = req.order_type.value.lower()

if p['type'] == OrderType.MARKET:
if req.side == Side.BUY:
p['price'] = 100000000.0
else:
event = json.loads(val[0].data)
print(event)
if event.get('type', 'subscription_ack') in ('subscription_ack', 'heartbeat'):
continue
p['price'] = .00000001

if req.order_sub_type == OrderSubType.FILL_OR_KILL:
p['time_in_force'] = 'FOK'
elif req.order_sub_type == OrderSubType.POST_ONLY:
p['post_only'] = '1'
return p
14 changes: 2 additions & 12 deletions aat/exchanges/kraken.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import json
from functools import lru_cache
from .utils.kraken import KrakenMixins
from ..config import ExchangeConfig
from ..enums import ExchangeType, OrderType, PairType, TickType
from ..enums import ExchangeType
from ..exchange import Exchange
from ..structs import MarketData


class KrakenExchange(KrakenMixins, Exchange):
class KrakenExchange(Exchange):
def __init__(self, exchange_type: ExchangeType, options: ExchangeConfig) -> None:
super(KrakenExchange, self).__init__(exchange_type, options)
self._last = None
Expand All @@ -32,14 +31,5 @@ def heartbeat(self):
def tickToData(self, jsn: dict) -> MarketData:
raise NotImplementedError()

def strToTradeType(self, s: str) -> TickType:
raise NotImplementedError()

def tradeReqToParams(self, req) -> dict:
raise NotImplementedError()

def currencyPairToString(self, cur: PairType) -> str:
return cur.value[0].value + '/' + cur.value[1].value

def orderTypeToString(self, typ: OrderType) -> str:
raise NotImplementedError()

0 comments on commit a626ab8

Please sign in to comment.