## bitMEX trading test

Goal: get trading on BitMEX working end to end.

Strategy: RandomTargetPostionStrategy, a random strategy made specifically for testing the trading system. It emits a random SignalEvent every 30 s.

In [13]:
import logging
import time
import queue

In [None]:
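class NaivePortfolio(object):
    """Portfolio manager for simple CTA strategies.
    
    Aggregates the signals of the CTA sub-strategies into one target_position (currently a single identifier only).
    Emits an OrderEvent when the actual position differs from the target position, and handles FillEvent.
    """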
    
    def __init__(self, event_queue, is_test=True):
        
        self.is_test = is_test   # whether this is a test run
        self.logger = logging.getLogger()   # logger
        
        self.event_queue = event_queue
        
        self.symbol = 'XBTUSD'
        self.lots = 20
        
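        # Target position.
        # When the actual position differs from the target position, the order logic is triggered.
        self.target_position = 0
        
        # Actual position.
        # 1. Queried explicitly at initialization via self.query_positon()
        # 2. Updated on every fill report.  -> If an order is placed manually while the program is running, the program receives the fill report and trades the position back to the target.
        self.actual_position = None         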
        
    def onSignalEvent(self, signalEvent):
        """
        signalEvent Handler
        
        signalEvent format:
        {
            identifier: 'Turtle_XBTUSD_1m_9999',
            symbol: 'XBTUSD',
            direction: -1,
        }
        """
        self.logger.info('signalEvent: %s' % signalEvent)
        self._signal_event_handler(signalEvent)   # handle the signal event
        
    def _signal_event_handler(self, signalEvent):
        if signalEvent.get('symbol') != self.symbol:
            self.logger.error('signalEvent.symbol != bitmexTrader.symbol')
            return 
        if signalEvent.get('direction') not in (0, -1, 1):
            self.logger.error('invalid signalEvent.direction: %s' % signalEvent.get('direction'))
            return
        self.target_position = signalEvent['direction'] * self.lots   # set target_position
        self._trade_to_target()   # trade
        
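    def _trade_to_target(self):
        # NOTE: self.ask1 and self.bid1 are not set anywhere in this class yet.
        if self.actual_position is None:
            self.logger.error('actual_position is unknown, cannot trade')
            return
        position_diff = self.target_position - self.actual_position
        if position_diff > 0:
            direction = 'buy'
            limit_price = self.ask1   # price at the opposite side of the book: buy at best ask
        elif position_diff < 0:
            direction = 'sell'
            limit_price = self.bid1   # price at the opposite side of the book: sell at best bid
        else:
            self.logger.info('target==actual, no need to trade')
            return 0
        symbol = self.symbol
        volume = abs(position_diff)
        order_event = orderEvent(symbol, direction, volume, limit_price)   # build the order (orderEvent is not defined in this notebook yet)
        
        # TODO: what should happen if there are still unfilled orders?
        self.event_queue.put(order_event)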

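Rethinking the design:

The portfolio is only responsible for maintaining target_position. It emits a target_position_event to signal that target_position may have changed.
Later on, its main job should be to aggregate the positions of the individual sub-strategies.

The executor is responsible for maintaining actual_position. It handles placing orders, cancelling orders, and querying positions.

Phase one: get live trading on BitMEX working in the simplest possible way:
- portfolio thread: randomly generate a target_position and put it on the queue
- executor thread: loop, taking events off the queue and trading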
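As a rough illustration of the two-thread split, here is a minimal sketch using only the standard library. The function names `run_random_targets` and `run_executor`, the `stop_event` shutdown flag, and the `handler` callback are hypothetical and not part of this notebook's code; the 30-second default interval comes from the strategy description at the top.

```python
import queue
import random
import threading


def run_random_targets(event_queue, stop_event, interval=30.0):
    """Put a random TARGET_POSITION event on the queue every `interval` seconds."""
    while not stop_event.is_set():
        event_queue.put({
            'etype': 'TARGET_POSITION',
            'target_position': random.choice((0, -1, 1)),
        })
        # wait() returns early once stop_event is set, so shutdown is prompt
        stop_event.wait(interval)


def run_executor(event_queue, stop_event, handler, poll=0.5):
    """Pass queued events to `handler` until stopped and the queue is drained."""
    while not stop_event.is_set() or not event_queue.empty():
        try:
            event = event_queue.get(timeout=poll)
        except queue.Empty:
            continue
        if event['etype'] == 'TARGET_POSITION':
            handler(event)
```

Each function would run in its own `threading.Thread`; setting `stop_event` ends both loops without killing the kernel.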

In [16]:
import threading
import random
import queue
import time


event_q = queue.Queue()  # event queue


def generate_random_target_positon(event_queue):
    """Put one random target_position event on the queue"""
    e = {
        'etype': 'TARGET_POSITION',
        'target_position': random.sample((0, -1, 1), k=1)[0]
    }
    event_queue.put(e)


# One thread dedicated to generating target_position events
portfolio_td = threading.Thread(target=generate_random_target_positon, args=(event_q,))   # args must be a tuple
portfolio_td.start()

# Another thread runs the executor
bitmex_executor = bitmexExecutor()   # defined in the next cell; run that cell first
executor_td = threading.Thread(target=bitmex_executor.run)
executor_td.start()

# Main loop
while True:
    try:
        event = event_q.get_nowait()
    except queue.Empty:
        time.sleep(0.5)
    else:
        if event['etype'] == 'TARGET_POSITION':   # events are plain dicts
            bitmex_executor.onTargetPositionEvent(event)   # trade

In [40]:
class bitmexExecutor(object):
    """BitMEX trade executor"""
    
    def __init__(self):
        print('init bitmexExecutor')
        self.actual_position = self._query_actual_position()   # to-write   
        
    def run(self):
        pass
        
    def onTargetPositionEvent(self, event):
        pass
    
    def _query_actual_position(self):
        pass    ### 
    
    def _order(self, symbol, side, volume, limit_price=None):
        pass    ### 
        

## Demo of placing an order

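**BitMEX does not support submitting or cancelling orders over WebSocket; these operations can only be done over HTTP.**

Our servers support HTTP keep-alive connections and cached SSL sessions. If you keep a connection alive, you get latency similar to a websocket without having to communicate over a websocket.

Our keep-alive timeout is 90 seconds.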
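To make the keep-alive point concrete, here is a minimal sketch using only the standard library's `http.client`. It reuses one HTTPS connection and reopens it once it has been idle for the 90 seconds quoted above. The class name `KeepAliveConnection` and its parameters are hypothetical, and request signing for authenticated endpoints is left out on purpose.

```python
import http.client
import time


class KeepAliveConnection:
    """Reuse one HTTPS connection; reopen it if it may have idled out."""

    def __init__(self, host, idle_timeout=90.0, clock=time.monotonic):
        self.host = host
        self.idle_timeout = idle_timeout   # seconds, taken from the note above
        self._clock = clock                # injectable so the logic can be tested
        self._conn = None
        self._last_used = None

    def is_stale(self):
        """True if there is no connection yet or it has been idle too long."""
        if self._conn is None or self._last_used is None:
            return True
        return self._clock() - self._last_used >= self.idle_timeout

    def _get_conn(self):
        if self.is_stale():
            if self._conn is not None:
                self._conn.close()
            # HTTPSConnection connects lazily, on the first request
            self._conn = http.client.HTTPSConnection(self.host, timeout=10)
        self._last_used = self._clock()
        return self._conn

    def request(self, method, path, body=None, headers=None):
        """Send one request over the shared connection; return (status, bytes)."""
        conn = self._get_conn()
        conn.request(method, path, body=body, headers=headers or {})
        resp = conn.getresponse()
        data = resp.read()   # read fully, otherwise the connection cannot be reused
        self._last_used = self._clock()
        return resp.status, data
```

As long as consecutive `request()` calls are less than `idle_timeout` apart, they share one TCP/TLS session; after a longer pause the next call pays for a fresh handshake.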

## bitmexREST

## BitMEX WebSocket

Copied from https://github.com/BitMEX/api-connectors/ and https://github.com/BitMEX/sample-market-maker

## bitmex TargetPosition-Based OMS

TODO: separate the bitmex gateway from the TargetPosition-Based OMS

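- bitmexWS
    + Market data (no authentication required)
    + Trading: real-time positions, order reports, fill reports
- bitmexREST
    + Placing orders
    + Active position queries, used only for periodic verification.
    + Historical market data queries (no authentication required), used only for look-back when the strategy program starts.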
    

The OMS has three key members:

- `bm_ws_market`: subclass of `bitmexWS`. [Market data] Receives market data, including trades, orderbook (funding, markprice, etc.)
- `bm_ws_trading`: subclass of `bitmexWS`. [Trading] Listens for order reports, fill reports, and position changes (margin)
- `bm_rest`: instance of `bitmexREST`. Order function `place_order()`, query function `get_positions()`


`bm_ws_market` subscribes to `trade:XBTUSD` and overrides `onData()`, which does two things: 
1. updates its members (last_price, bar, prev_bar, order_book, etc.)
2. generates a MARKET_EVENT when needed (e.g. on_bar_close)

`bm_ws_trading` subscribes to `order, position, execution` and overrides `onData()`, which 
1. updates two members (unfilled_order, actual_position)
2. generates ORDER_EVENT and FILL_EVENT

`bm_rest` is used to `place_order()`; it does not generate, listen to, or handle any events. Its other job is to query positions and check them against the positions received over WS.
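The position check could look roughly like the sketch below. It assumes both sides have already been reduced to plain `{symbol: qty}` dicts; the function name `diff_positions` is hypothetical, and nothing here depends on the actual shape of the REST response.

```python
def diff_positions(ws_positions, rest_positions):
    """Return {symbol: (ws_qty, rest_qty)} for every symbol where the two disagree."""
    mismatches = {}
    for symbol in set(ws_positions) | set(rest_positions):
        ws_qty = ws_positions.get(symbol, 0)       # a missing symbol counts as flat
        rest_qty = rest_positions.get(symbol, 0)
        if ws_qty != rest_qty:
            mismatches[symbol] = (ws_qty, rest_qty)
    return mismatches
```

An empty result means the WS-derived view agrees with REST; anything else is worth logging before the next order goes out.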

The OMS also has one important function, `trade_to_target()`, which decides which orders to place and/or cancel.

> How about this:     
> bundle bm_ws_market and one bm_rest instance into `bitmexDataHandler`    
> bundle bm_ws_trading and another bm_rest instance into `bitmexTrader`

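The `trade_to_target()` decision can be pictured as a pure function, kept apart from the REST calls so it can be tested without an exchange connection. This is only a sketch of a simple cancel-everything-then-place approach; the function name `plan_trade_to_target` and the action tuples are hypothetical.

```python
def plan_trade_to_target(target_pos, actual_pos, has_unfilled, bid1, ask1):
    """Return a list of actions: ('cancel_all',) and/or ('place', side, qty, price)."""
    actions = []
    if target_pos == actual_pos:
        # Already on target: only stale open orders need cleaning up.
        if has_unfilled:
            actions.append(('cancel_all',))
        return actions

    actions.append(('cancel_all',))                # cancel everything first
    diff = target_pos - actual_pos
    side = 'Buy' if diff > 0 else 'Sell'
    price = ask1 if side == 'Buy' else bid1        # buy at best ask, sell at best bid
    actions.append(('place', side, abs(diff), price))
    return actions
```

The caller would then map each action onto the corresponding REST call, which keeps the decision logic free of network side effects.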
In [4]:
from bitmexWS import bitmexWS
from bitmexREST import bitmexREST
from utils import generate_logger

In [5]:
#from qsEvent import MarketEvent
#from qsObject import Bar
#from utils import calculate_ts

class MarketEvent(object):
    def __init__(self, data):
        self.etype='MARKET'
        self.data = data
       
    
class Bar(object):
    
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)
    
    def __repr__(self):
        return self.__dict__.__repr__()
    
    
def calculate_ts(timestamp, bar_type='1m'):
    """bitmex-timestamp to ts, truncated to the minute. eg 2018-09-29T06:00:17.271Z -> '2018-09-29 06:00'"""
    return timestamp[:16].replace('T', ' ')  # TODO: now only allow '1m'

In [6]:
class bitmexWSMarket(bitmexWS):
    """bitmexWS subscribing market data"""
    
    def addEventQueue(self, q):
        self.eventQueue = q

    def subscribe(self, symbol='XBTUSD', bar_type='1m', subscribe_tick=True):
        self.symbol = symbol
        self.bar_type = bar_type
        self.subscribe_tick = subscribe_tick
        # initialize state before subscribing, so onData() never sees it missing
        self._got_partial = False
        self.last_price = None
        self.subscribe_topic('trade:%s' % symbol)  # TODO: subscribe more topic, such as order book
        
    def onData(self, msg):
        if self._got_partial:
            self.logger.debug('get new data: \n%s' % msg)
            self._handle_data(msg['data'])   # msg['data'] is a list of dict
        else:
            if msg['action'] == 'partial':
                self._got_partial = True
                self._handle_init_data(msg['data'])
                self.logger.debug('Got partial.')
            elif msg['action'] == 'insert':
                self.logger.debug('drop data before partial')
    
    def _handle_init_data(self, data):
        tick_d = data[-1]
        ts = calculate_ts(tick_d['timestamp'], self.bar_type)
        tick_price = tick_d['price']
        init_bar = Bar(open=tick_price, high=tick_price, low=tick_price, ts=ts)
        self.current_bar = init_bar
        self.last_price = tick_price   # so the first bar's close is never None
        
        self.logger.debug('init_bar: %s' % init_bar)
    
    def _handle_data(self, data):
        # data is list of dict: [{}, {}]
        # eg 'data': [{'symbol': 'XBTUSD', 'tickDirection': 'PlusTick', 'timestamp': '2018-09-29T06:00:17.271Z', 'price': 6484.5, 'trdMatchID': '87bd9b19-6747-c804-3eae-7a84fc7abcf8', 'foreignNotional': 30, 'grossValue': 462630, 'homeNotional': 0.0046263, 'side': 'Buy', 'size': 30}]
        for tick_d in data:
            self._on_tick(tick_d)
    
    def _on_tick(self, tick_d):
        tick_price = tick_d['price']
        
        # update last_price
        self.last_tick_price = self.last_price   # move:self.last_price  -> self.last_tick_price
        self.last_price = tick_price   # **current**_tick_price
        
        # bar-generator
        ts = calculate_ts(tick_d['timestamp'], self.bar_type)  # 'timestamp': '2018-09-29T06:00:17.271Z'
        if ts > self.current_bar.ts:
            # bar_close event
            self.current_bar.close = self.last_tick_price            
            self.prev_bar = self.current_bar
            self.current_bar = Bar(open=tick_price, high=tick_price, low=tick_price, ts=ts)
            
            bar_close_event = MarketEvent(data={'type': 'BAR_CLOSE'})
            self.eventQueue.put(bar_close_event)
            self.logger.debug('bar_close event. prev_bar is %s' % self.prev_bar)
            
            
            # bar_open event
            bar_open_event = MarketEvent(data={'type': 'BAR_OPEN'})
            self.eventQueue.put(bar_open_event)
            self.logger.debug('bar_open event. current_bar is %s' % self.current_bar)
        else:
            self.current_bar.high = max(self.current_bar.high, tick_price)
            self.current_bar.low = min(self.current_bar.low, tick_price)
        # tick event    
        if self.subscribe_tick:
            tick_event = MarketEvent(data={'type':'TICK'})
            self.eventQueue.put(tick_event)
            self.logger.debug('tick event: %s' % tick_d)
        
import queue

events = queue.Queue()
        
bm_ws_market = bitmexWSMarket()
bm_ws_market.addEventQueue(events)
bm_ws_market.connect()
bm_ws_market.subscribe('XBTUSD')

[2018-09-29 16:21:02 bitmexWS INFO] ws thread start
[2018-09-29 16:21:03 bitmexWS DEBUG] Calling ws.__on_open()
[2018-09-29 16:21:03 bitmexWS INFO] Successful connected to BitMEX WebSocket API
[2018-09-29 16:21:03 bitmexWS DEBUG] >>> send ping
[2018-09-29 16:21:04 bitmexWS INFO] Subscribe to trade:XBTUSD
[2018-09-29 16:21:04 bitmexWS DEBUG] init_bar: {'high': 6521.5, 'open': 6521.5, 'low': 6521.5, 'ts': '2018-09-29 08:21'}
[2018-09-29 16:21:04 bitmexWS DEBUG] Got partial.
[2018-09-29 16:21:06 bitmexWS DEBUG] get new data: 
{'action': 'insert', 'data': [{'homeNotional': 0.0030672, 'side': 'Buy', 'tickDirection': 'MinusTick', 'foreignNotional': 20, 'symbol': 'XBTUSD', 'grossValue': 306720, 'trdMatchID': 'd77a6456-b56f-fde9-4d88-8079bec47e59', 'timestamp': '2018-09-29T08:21:05.160Z', 'price': 6520.5, 'size': 20}, {'homeNotional': 0.0122672, 'side': 'Buy', 'tickDirection': 'PlusTick', 'foreignNotional': 80, 'symbol': 'XBTUSD', 'grossValue': 1226720, 'trdMatchID': 'cf9438d9-b950-73ca-aff8-b

In [1]:
events.qsize()

NameError: name 'events' is not defined

In [5]:
e = events.get()
e

NameError: name 'events' is not defined

In [3]:
print(bm_ws_market.current_bar)
print(bm_ws_market.prev_bar)

NameError: name 'bm_ws_market' is not defined

In [4]:
bm_ws_market.last_price   # take a snapshot once per second and put it on eventQueue as a marketEvent; otherwise events are too frequent

NameError: name 'bm_ws_market' is not defined
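One way to read the once-per-second idea in the comment above is a throttle that lets through at most one price per interval. The sketch below is an assumption about how that might be done, not code from this notebook; `PriceSampler` and its `clock` parameter are hypothetical names.

```python
import time


class PriceSampler:
    """Let through at most one price snapshot per `interval` seconds."""

    def __init__(self, interval=1.0, clock=time.monotonic):
        self.interval = interval
        self._clock = clock          # injectable so the logic can be tested
        self._last_emit = None

    def on_price(self, price):
        """Return a TICK payload if the interval has elapsed, else None."""
        now = self._clock()
        if self._last_emit is not None and now - self._last_emit < self.interval:
            return None              # too soon after the previous snapshot: drop
        self._last_emit = now
        return {'type': 'TICK', 'price': price}
```

Inside `_on_tick()`, a non-`None` result would be wrapped in a `MarketEvent` and put on the queue, so the consumer sees roughly one tick event per second however busy the trade feed is.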

### OMS class

In [None]:
class bitmexTargetPositionOMS(object):
    """bitmex TargetPosition-Based Order Management System"""
    
    def __init__(self, bm_ws_market, eventQueue):
        self.eventQueue = eventQueue
        self.logger = logging.getLogger('bitmexTargetPositionOMS')   # `logging` is imported in the first cell
        self.target_position = {}   # {symbol: target position}
        
        # websocket-market
        self.bm_ws_market = bm_ws_market   # passed in from outside, because the DataHandler also uses it (or it is the DataHandler)
        
        # websocket-trading
        self.bm_ws_trading = bitmexWSTrading(apiKey=apiKey, apiSecret=apiSecret)  # TODO. subscribes to order, position, execution (to-add: margin)
        self.bm_ws_trading.connect()
        self.bm_ws_trading.subscribe()
        self.bm_ws_trading.wait_for_initial_status()  # wait for the initial snapshot
        self.actual_position = self.bm_ws_trading.actual_position  # actual position computed from websocket messages: {symbol: pos}
        self.unfilled_qty = self.bm_ws_trading.unfilled_qty  # unfilled order quantity computed from websocket messages: {symbol: {'Buy': qty, 'Sell': qty}}
        
        # rest
        self.bm_rest = bitmexREST(apiKey, apiSecret)
        
    def set_target_position(self, symbol, position):
        self.target_position[symbol] = position      
        
    def trade_to_target(self, symbol):
        target_pos = self.target_position.get(symbol)  # int
        actual_pos = self.actual_position.get(symbol, 0)  # int
    
        if target_pos is None:
            self.logger.warning('Calling `trade_to_target()` but arg `symbol` is not in self.target_position\n'
                                'symbol=%s\n'
                                'self.target_position=%s' % (symbol, self.target_position))
            return
        
        # Brute-force approach: cancel all orders, then place one order for the difference between target and actual position.
        # There is room for optimization, e.g. BitMEX supports amending orders.
        has_unfilled = any(self.unfilled_qty.get(symbol, {}).values())
        if target_pos == actual_pos:
            if not has_unfilled:
                self.logger.info('target_pos == actual_pos && no unfilled orders, nothing to do')
            else:
                self.bm_rest.cancel_all_order(symbol)
        else:
            self.bm_rest.cancel_all_order(symbol)  # cancel everything first
            pos_diff = target_pos - actual_pos
            direction = 'Buy' if pos_diff > 0 else 'Sell'
            price = self.bm_ws_market.ask1 if direction == 'Buy' else self.bm_ws_market.bid1  # buy at best ask, sell at best bid (TODO: bid1/ask1 on bitmexWSMarket)
            order = Order(symbol, direction, abs(pos_diff), price)   # TODO: class Order in qsObject.py
            self.bm_rest.place_order(order)
            
    def _check_actual_position_with_rest(self):
        """use REST api to query actual_position, check it with self.actual_position. Use a Thread to do this"""
        pass
    
    

In [None]:
bm_ws_market = bitmexWSMarket()

oms = bitmexTargetPositionOMS(bm_ws_market, events)   # events: the queue.Queue created earlier

## Oct 4th

In [5]:
# summarize TODOS:
# class bitmexWSTrading   # Done
#   subscribe()
#   wait_for_initial_status()
#   actual_position
#   unfilled_qty
# class bitmexWSMarket
#   bid1, ask1
# qsObject.py
#   class Order

In [6]:
from bitmexWS import bitmexWS
from bitmexREST import bitmexREST
from utils import generate_logger

In [7]:
# import copy
# g_msg = 'aaaaa'

class bitmexWSTrading(bitmexWS):
    """bitmex WebSocket subscribing to topics related to live trading"""
    
    def subscribe(self, symbols=('XBTUSD', 'ETHUSD')):
        self.symbols = symbols
        
        self.actual_position = {s:0 for s in symbols}                    # {symbol: pos}   str:int
        self.unfilled_qty = {s:{'Buy': 0, 'Sell': 0} for s in symbols}    # {symbol: {'Buy': qty, 'Sell': qty}}
        
        self._got_position_partial = False
        
        self.subscribe_topic('order')
        self.subscribe_topic('position')
        self.subscribe_topic('execution')
        
    def onData(self, msg):
        if msg.get('table') == 'position':
            self._on_position_msg(msg)
        elif msg.get('table') == 'order':
            self.__on_order_msg(msg)
        elif msg.get('table') == 'execution':
            self.__on_execution_msg(msg)
    
    def _on_position_msg(self, msg):
        self.logger.info('Got position msg:')
        #print('========================position==================\n' + msg.__str__())
        
        #global g_msg
        #g_msg = copy.deepcopy(msg)  ###### store the first msg seen in a global variable, for debugging
        
        if self._got_position_partial and msg['action'] == 'update':

            if msg['data']:
                
                for d in msg['data']:
                    symbol = d.get('symbol')
                    if symbol not in self.symbols:
                        self.logger.warning('Got position subscription of symbol: %s, not in self.symbols' % symbol)
                        continue
                    currentQty = d.get('currentQty')
                    openOrderBuyQty = d.get('openOrderBuyQty', None)
                    openOrderSellQty = d.get('openOrderSellQty', None)
                    
                    old_pos = self.actual_position.get(symbol)
                    old_buy_qty = self.unfilled_qty.get(symbol, {}).get('Buy')
                    old_sell_qty = self.unfilled_qty.get(symbol, {}).get('Sell')
                    
                    if old_pos != currentQty:
                        self.actual_position[symbol] = currentQty   
                        self.logger.info('█████ Position update (actual_position) █████ %s: %s -> %s' % (symbol, old_pos, currentQty))
                    if openOrderBuyQty is not None:
                        self.unfilled_qty[symbol]['Buy'] = openOrderBuyQty
                        self.logger.info('███ Position update (unfilled_qty, Buy) ███ %s: %s -> %s' % (symbol, old_buy_qty, openOrderBuyQty))
                    if openOrderSellQty is not None:
                        self.unfilled_qty[symbol]['Sell'] = openOrderSellQty
                        self.logger.info('███ Position update (unfilled_qty, Sell) ███ %s: %s -> %s' % (symbol, old_sell_qty, openOrderSellQty))
            else:
                self.logger.debug('#### Position update #### is []')
            
        elif msg['action'] == 'partial':
            self._got_position_partial = True
            
            if msg['data']:
                for d in msg['data']:
                    symbol = d.get('symbol')
                    if symbol not in self.symbols:
                        self.logger.warning('Got position subscription of symbol: %s, not in self.symbols' % symbol)
                        continue
                    currentQty = d.get('currentQty')
                    openOrderBuyQty = d.get('openOrderBuyQty', 0)                    
                    openOrderSellQty = d.get('openOrderSellQty', 0)
                    
                    self.actual_position[symbol] = currentQty
                    self.unfilled_qty[symbol]['Buy'] = openOrderBuyQty
                    self.unfilled_qty[symbol]['Sell'] = openOrderSellQty
                    
                    txt = '%s  pos: %s, buy: %s, sell: %s' % (symbol, currentQty, openOrderBuyQty, openOrderSellQty)
                    self.logger.debug('██████████ Position partial ██████████ %s' % txt)
            else:
                self.logger.debug('██████████  Position partial ██████████  is []')
            
        
        
    def __on_order_msg(self, msg):
        self.logger.info('Got order msg')
        #print('========================order==================\n' + msg.__str__())
        
    def __on_execution_msg(self, msg):
        self.logger.info('Got execution msg')
        #print('========================execution==================\n' + msg.__str__())
            
        
    def wait_for_initial_status(self):
        """Block until the position partial has been received."""
        while not self._got_position_partial:
            time.sleep(1)

In [8]:
#import pdb; pdb.set_trace()            
import json
import time


with open('accounts.json') as f:
    acc = json.load(f)

apiKey = acc[0]['apiKey']
apiSecret = acc[0]['apiSecret']


a = bitmexWSTrading(apiKey=apiKey, apiSecret=apiSecret)
a.connect()
a.subscribe()
a.wait_for_initial_status()

print(a.actual_position)
print(a.unfilled_qty)

[2018-10-04 19:56:34 bitmexWS INFO] ws thread start
[2018-10-04 19:56:35 bitmexWS DEBUG] Calling ws.__on_open()
[2018-10-04 19:56:35 bitmexWS INFO] Successful connected to BitMEX WebSocket API
[2018-10-04 19:56:35 bitmexWS DEBUG] >>> send ping
[2018-10-04 19:56:36 bitmexWS INFO] Subscribe to order
[2018-10-04 19:56:36 bitmexWS INFO] Subscribe to position
[2018-10-04 19:56:36 bitmexWS INFO] Subscribe to execution
[2018-10-04 19:56:36 bitmexWS INFO] Got order msg
[2018-10-04 19:56:36 bitmexWS INFO] Got position msg:
[2018-10-04 19:56:36 bitmexWS DEBUG] ██████████ Position partial ██████████ ETHUSD  pos: 0, buy: 0, sell: 0
[2018-10-04 19:56:36 bitmexWS DEBUG] ██████████ Position partial ██████████ XBTUSD  pos: 26, buy: 0, sell: 0
[2018-10-04 19:56:36 bitmexWS INFO] Got execution msg


{'ETHUSD': 0, 'XBTUSD': 26}
{'ETHUSD': {'Buy': 0, 'Sell': 0}, 'XBTUSD': {'Buy': 0, 'Sell': 0}}


[2018-10-04 19:56:40 bitmexWS DEBUG] >>> send ping
[2018-10-04 19:56:45 bitmexWS DEBUG] >>> send ping
[2018-10-04 19:56:47 bitmexWS INFO] Got position msg:
[2018-10-04 19:56:50 bitmexWS DEBUG] >>> send ping
[2018-10-04 19:56:51 bitmexWS INFO] Got order msg
[2018-10-04 19:56:51 bitmexWS INFO] Got execution msg
[2018-10-04 19:56:51 bitmexWS INFO] Got order msg
[2018-10-04 19:56:51 bitmexWS INFO] Got position msg:
[2018-10-04 19:56:51 bitmexWS INFO] ███ Position update (unfilled_qty, Sell) ███ XBTUSD: 0 -> 3
[2018-10-04 19:56:51 bitmexWS INFO] Got execution msg
[2018-10-04 19:56:51 bitmexWS INFO] Got order msg
[2018-10-04 19:56:51 bitmexWS INFO] Got position msg:
[2018-10-04 19:56:51 bitmexWS INFO] █████ Position update (actual_position) █████ XBTUSD: 26 -> 23
[2018-10-04 19:56:51 bitmexWS INFO] ███ Position update (unfilled_qty, Sell) ███ XBTUSD: 3 -> 0
[2018-10-04 19:56:55 bitmexWS DEBUG] >>> send ping
[2018-10-04 19:56:57 bitmexWS INFO] Got position msg:
[2018-10-04 19:56:58 bitmexWS I

In [9]:
a.exit()

[2018-10-04 19:58:03 bitmexWS INFO] Exiting ...
[2018-10-04 19:58:05 bitmexWS INFO] ping thread end.
[2018-10-04 19:58:06 bitmexWS DEBUG] Calling ws.__on_close()
[2018-10-04 19:58:06 bitmexWS INFO] Exit bitmexWS (intended)


In [10]:
a

<__main__.bitmexWSTrading at 0x1079425f8>

In [11]:
a.actual_position

{'ETHUSD': 0, 'XBTUSD': 23}

In [12]:
a.unfilled_qty

{'ETHUSD': {'Buy': 0, 'Sell': 0}, 'XBTUSD': {'Buy': 0, 'Sell': 0}}