Skip to content

Commit

Permalink
fixes #24
Browse files Browse the repository at this point in the history
  • Loading branch information
timkpaine committed Jul 2, 2019
1 parent f644b2d commit e2d32c9
Show file tree
Hide file tree
Showing 14 changed files with 346 additions and 382 deletions.
41 changes: 17 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class Callback(metaclass=ABCMeta):
'''onOpen'''

@abstractmethod
def onFill(self, data: MarketData):
def onFill(self, resp: TradeResponse):
'''onFill'''

@abstractmethod
Expand Down Expand Up @@ -97,34 +97,28 @@ class BuyAndHoldStrategy(TradingStrategy):
super(BuyAndHoldStrategy, self).__init__()
self.bought = None

def onBuy(self, res: TradeResponse) -> None:
def onFill(self, res: TradeResponse) -> None:
self.bought = res
slog.info('d->g:bought %.2f @ %.2f' % (res.volume, res.price))

def onSell(self, res: TradeResponse) -> None:
pass
log.info('d->g:bought %.2f @ %.2f' % (res.volume, res.price))

def onTrade(self, data: MarketData) -> bool:
if self.bought is None:
req = TradeRequest(side=Side.BUY,
volume=1.0,
volume=1,
instrument=data.instrument,
order_type=OrderType.MARKET,
exchange=data.exchange,
price=data.price,
time=data.time)
slog.info("requesting buy : %s", req)
self.requestBuy(self.onBuy, req)

log.info("requesting buy : %s", req)
self.requestBuy(req)
self.bought = 'pending'
def onError(self, e) -> None:
elog.critical(e)

def onChange(self, data: MarketData) -> None:
pass

def onFill(self, data: MarketData) -> None:
pass

def onCancel(self, data: MarketData) -> None:
pass

Expand All @@ -134,19 +128,18 @@ class BuyAndHoldStrategy(TradingStrategy):

Trading strategies have a number of required methods for handling messages:

- onBuy
- onSell
- onTrade
- onChange
- onFill
- onCancel
- onError
- onOpen
- onReceived
- onTrade: Called when a trade occurs
- onChange: Called when an order is modified
- onFill: Called when a strategy's trade executes
- onCancel: Called when an order is cancelled
- onError: Called when an error occurs
- onOpen: Called when a new order occurs

There are other optional callbacks for more granular processing:
- onHalt
- onContinue
- onStart: Called when the program starts
- onHalt: Called when trading is halted
- onContinue: Called when trading continues
- onExit: Called when the program shuts down

There are also several optional callbacks for backtesting:

Expand Down
15 changes: 6 additions & 9 deletions aat/callback.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABCMeta, abstractmethod
from .structs import MarketData
from .structs import MarketData, TradeResponse
from .logging import log


Expand All @@ -14,7 +14,7 @@ def onOpen(self, data: MarketData):
'''onOpen'''

@abstractmethod
def onFill(self, data: MarketData):
def onFill(self, resp: TradeResponse):
'''onFill'''

@abstractmethod
Expand Down Expand Up @@ -63,7 +63,7 @@ def onTrade(self, data: MarketData) -> None:
def onOpen(self, data: MarketData) -> None:
pass

def onFill(self, data: MarketData) -> None:
def onFill(self, resp: TradeResponse) -> None:
pass

def onCancel(self, data: MarketData) -> None:
Expand Down Expand Up @@ -92,7 +92,7 @@ def __init__(self,
if not onOpen:
setattr(self, 'onOpen', False)
if not onFill:
setattr(self, 'onFilled', False)
setattr(self, 'onFill', False)
if not onCancel:
setattr(self, 'onCancelled', False)
if not onChange:
Expand All @@ -103,14 +103,11 @@ def __init__(self,
def onTrade(self, data: MarketData) -> None:
log.info(str(data))

def onReceived(self, data: MarketData) -> None:
log.info(str(data))

def onOpen(self, data: MarketData) -> None:
log.info(str(data))

def onFill(self, data: MarketData) -> None:
log.info(str(data))
def onFill(self, resp: TradeResponse) -> None:
log.info(str(resp))

def onCancel(self, data: MarketData) -> None:
log.info(str(data))
Expand Down
55 changes: 41 additions & 14 deletions aat/exchanges/gemini.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import aiohttp
import base64
import json
import hashlib
import hmac
import time
from aiostream import stream
from functools import lru_cache
from .utils.gemini import GeminiMixins
Expand All @@ -20,11 +24,26 @@ def heartbeat(self):

async def run(self, engine) -> None:
options = self.options()
session = aiohttp.ClientSession()
# private events
gemini_api_key = self.oe_client().apiKey
gemini_api_secret = self.oe_client().secret.encode()

payload = {"request": "/v1/order/events", "nonce": int(time.time() * 1000)}
encoded_payload = json.dumps(payload).encode()
b64 = base64.b64encode(encoded_payload)
signature = hmac.new(gemini_api_secret, b64, hashlib.sha384).hexdigest()

session = aiohttp.ClientSession(headers={
'X-GEMINI-PAYLOAD': b64.decode(),
'X-GEMINI-APIKEY': gemini_api_key,
'X-GEMINI-SIGNATURE': signature
})

# startup and redundancy
log.info('Starting....')
self.ws = [await session.ws_connect(EXCHANGE_MARKET_DATA_ENDPOINT(self.exchange(), options.trading_type) % x) for x in self.subscription()]
private_events = await session.ws_connect("wss://api.gemini.com/v1/order/events?eventTypeFilter=fill&eventTypeFilter=closed&apiSessionFilter=UI")
self.ws.append(private_events)

# set subscription for each ws
for i, sub in enumerate(self.subscription()):
Expand All @@ -49,18 +68,26 @@ async def get_data_sub_pair(ws, sub=None):
async for ret in ws:
yield ret, sub

async for val in stream.merge(*[get_data_sub_pair(self.ws[i], sub) for i, sub in enumerate(self.subscription())]):
pair = json.loads(val[1]).get('product_id')
jsn = json.loads(val[0].data)
if jsn.get('type') == 'heartbeat':
pass
else:
for item in jsn.get('events'):
item['symbol'] = pair
res = self.tickToData(item)
# add one for private stream
async for val in stream.merge(*[get_data_sub_pair(self.ws[i], sub) for i, sub in enumerate(self.subscription() + [None])]):
if val[1]:
# data stream
pair = json.loads(val[1]).get('product_id')
jsn = json.loads(val[0].data)
if jsn.get('type') == 'heartbeat':
pass
else:
for item in jsn.get('events'):
item['symbol'] = pair
res = self.tickToData(item)

if not self._running:
pass
if not self._running:
pass

if res.type != TickType.HEARTBEAT:
self.callback(res.type, res)
if res.type != TickType.HEARTBEAT:
self.callback(res.type, res)
else:
event = json.loads(val[0].data)
if event.get('type', 'subscription_ack') in ('subscription_ack', 'heartbeat'):
continue
import ipdb; ipdb.set_trace()
8 changes: 4 additions & 4 deletions aat/exchanges/utils/coinbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ def tickToData(self, jsn: dict) -> MarketData:
if jsn.get('type') == 'received':
return
typ = self.strToTradeType(jsn.get('type'), jsn.get('reason', ''))
order_id = jsn.get('order_id', jsn.get('maker_order_id', ''))
time = parse_date(jsn.get('time')) if jsn.get('time') else datetime.now()
price = float(jsn.get('price', 'nan'))
volume = float(jsn.get('size', 'nan'))
Expand All @@ -21,7 +22,8 @@ def tickToData(self, jsn: dict) -> MarketData:
remaining_volume = float(jsn.get('remaining_size', 0.0))

sequence = int(jsn.get('sequence', -1))
ret = MarketData(time=time,
ret = MarketData(order_id=order_id,
time=time,
volume=volume,
price=price,
type=typ,
Expand All @@ -34,13 +36,11 @@ def tickToData(self, jsn: dict) -> MarketData:
return ret

def strToTradeType(self, s: str, reason: str = '') -> TickType:
if s == 'match':
if s == 'match' or (s == 'done' and reason == 'filled'):
return TickType.TRADE
elif s in ('open', 'done', 'change', 'heartbeat'):
if reason == 'canceled':
return TickType.CANCEL
elif reason == 'filled':
return TickType.FILL
return TickType_from_string(s.upper())
else:
return TickType.ERROR
Expand Down
6 changes: 5 additions & 1 deletion aat/exchanges/utils/gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

class GeminiMixins(object):
def tickToData(self, jsn: dict) -> MarketData:
order_id = jsn.get('order_id', '')
if order_id:
import ipdb; ipdb.set_trace()
time = datetime.now()
price = float(jsn.get('price', 'nan'))
volume = float(jsn.get('amount', 0.0))
Expand All @@ -27,7 +30,8 @@ def tickToData(self, jsn: dict) -> MarketData:
currency_pair = str_to_currency_pair_type(jsn.get('symbol'))
instrument = Instrument(underlying=currency_pair)

ret = MarketData(time=time,
ret = MarketData(order_id=order_id,
time=time,
volume=volume,
price=price,
type=typ,
Expand Down
1 change: 1 addition & 0 deletions aat/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class Execution(object):
def __init__(self, options: ExecutionConfig, exchanges: List[Exchange]) -> None:
self.trading_type = options.trading_type
self._exs = exchanges
self._pending = []

def requestBuy(self, req: TradeRequest) -> TradeResponse:
resp = self._exs[req.exchange].buy(req)
Expand Down
45 changes: 36 additions & 9 deletions aat/order_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from .enums import PairType, TradingType, CurrencyType, ExchangeType_to_string
from .exceptions import AATException
from .structs import TradeRequest, TradeResponse, Account, Instrument
from .utils import (get_keys_from_environment, str_to_currency_type,
from .utils import (get_keys_from_environment, str_to_currency_type, str_to_side,
exchange_type_to_ccxt_client, tradereq_to_ccxt_order,
findpath)
str_to_trade_result, parse_date, findpath)
# from .utils import elog as log


Expand Down Expand Up @@ -130,19 +130,46 @@ def orderBook(self, level=1):
def buy(self, req: TradeRequest) -> TradeResponse:
'''execute a buy order'''
params = tradereq_to_ccxt_order(req)
raise NotImplementedError()
self.oe_client().create_order(**params)
order = self.oe_client().create_order(**params)
resp = TradeResponse(request=req,
side=str_to_side(order['side']),
exchange=req.exchange,
volume=float(order['filled']),
price=float(order['price']),
instrument=req.instrument,
time=parse_date(order['datetime']),
status=str_to_trade_result(order['status']),
order_id=order['id'],
slippage=float(order['price']) - req.price,
transaction_cost=order['fee']['cost'],
remaining=order['remaining'])
return resp

def sell(self, req: TradeRequest) -> TradeResponse:
'''execute a sell order'''
params = tradereq_to_ccxt_order(req)
raise NotImplementedError()
self.oe_client().create_order(**params)
order = self.oe_client().create_order(**params)
resp = TradeResponse(request=req,
side=str_to_side(order['side']),
exchange=req.exchange,
volume=float(order['filled']),
price=float(order['price']),
instrument=req.instrument,
time=parse_date(order['datetime']),
status=str_to_trade_result(order['status']),
order_id=order['id'],
slippage=float(order['price']) - req.price,
transaction_cost=order['fee']['cost'],
remaining=order['remaining'])
return resp

def cancel(self, resp: TradeResponse):
params = tradereq_to_ccxt_order(resp)
raise NotImplementedError()
self.oe_client().cancel_order(**params)

def cancelAll(self, resp: TradeResponse):
return self.oe_client().cancel_all_orders()
def cancelAll(self, order_ids: List[str] = None):
if order_ids:
for order_id in order_ids:
self.oe_client().cancel_order(order_id)
else:
self.oe_client().cancel_all_orders()

0 comments on commit e2d32c9

Please sign in to comment.