In [None]:
import zmq
import time
import threading
from queue import Queue

import dtp.dtp_api_id as dtp_api_id
import dtp.api_pb2 as dtp_struct

# config:
# python -m unittest
SYNC_CHANNEL_PORT = "tcp://192.168.221.52:9003"
# ASYNC_CHANNEL_PORT = "tcp://localhost:9001"     # connecct adapter
ASYNC_CHANNEL_PORT = "tcp://192.168.221.91:9101"     # connect compliance
SUB_COUNTER_PORT = "tcp://192.168.221.52:9002"
SUB_COMPLIANCE_PORT = "tcp://192.168.221.91:9102"




class DtpSyncChannel(object):
    def __init__(self):
        self.context = zmq.Context()
        self.socket_req = self.context.socket(zmq.REQ)

    def connect(self):
        print("connecting dtp sync channel...")
        self.socket_req.connect(SYNC_CHANNEL_PORT)

    def disconnect(self):
        print("disconnecting dtp sync channel...")
        self.socket_req.disconnect(SYNC_CHANNEL_PORT)

    def login_account(self, payload):
        return self._process_invoke(payload, dtp_api_id.LOGIN_ACCOUNT_REQUEST, dtp_api_id.LOGIN_ACCOUNT_RESPONSE)

    def logout_account(self, payload):
        return self._process_invoke(payload, dtp_api_id.LOGOUT_ACCOUNT_REQUEST, dtp_api_id.LOGOUT_ACCOUNT_RESPONSE)

    def query_orders(self, payload):
        return self._process_invoke(payload, dtp_api_id.QUERY_ORDERS_REQUEST, dtp_api_id.QUERY_ORDERS_RESPONSE)

    def query_fills(self, payload):
        return self._process_invoke(payload, dtp_api_id.QUERY_FILLS_REQUEST, dtp_api_id.QUERY_FILLS_RESPONSE)

    def query_capital(self, payload):
        return self._process_invoke(payload, dtp_api_id.QUERY_CAPITAL_REQUEST, dtp_api_id.QUERY_CAPITAL_RESPONSE)

    def query_position(self, payload):
        return self._process_invoke(payload, dtp_api_id.QUERY_POSITION_REQUEST, dtp_api_id.QUERY_POSITION_RESPONSE)

    def query_ration(self, payload):
        return self._process_invoke(payload, dtp_api_id.QUERY_RATION_REQUEST, dtp_api_id.QUERY_RATION_RESPONSE)

    def _process_invoke(self, payload, request_api_id, response_api_id):
        payload.header.api_id = request_api_id
        self.socket_req.send(payload.header.SerializeToString(), zmq.SNDMORE)
        self.socket_req.send(payload.body.SerializeToString())

        header = self.socket_req.recv()
        body = self.socket_req.recv()

        response_header = dtp_struct.ResponseHeader()
        response_header.ParseFromString(header)
        assert(response_header.api_id == response_api_id)
        if(response_header.api_id == dtp_api_id.LOGIN_ACCOUNT_RESPONSE):
            response_body = dtp_struct.LoginAccountResponse()
        elif(response_header.api_id == dtp_api_id.LOGOUT_ACCOUNT_RESPONSE):
            response_body = dtp_struct.LogoutAccountResponse()
        elif(response_header.api_id == dtp_api_id.QUERY_ORDERS_RESPONSE):
            response_body = dtp_struct.QueryOrdersResponse()
        elif(response_header.api_id == dtp_api_id.QUERY_FILLS_RESPONSE):
            response_body = dtp_struct.QueryFillsResponse()
        elif(response_header.api_id == dtp_api_id.QUERY_CAPITAL_RESPONSE):
            response_body = dtp_struct.QueryCapitalResponse()
        elif(response_header.api_id == dtp_api_id.QUERY_POSITION_RESPONSE):
            response_body = dtp_struct.QueryPositionResponse()
        elif(response_header.api_id == dtp_api_id.QUERY_RATION_RESPONSE):
            response_body = dtp_struct.QueryRationResponse()
        else:
            assert(False)
        response_body.ParseFromString(body)
        response_payload = Payload(response_header, response_body)
        return response_payload



class DtpAsyncChannel(object):
    def __init__(self):
        self.context = zmq.Context()
        self.socket_dealer = self.context.socket(zmq.DEALER)

    def connect(self):
        print("connecting dtp async channel...start")
        self.socket_dealer.connect(ASYNC_CHANNEL_PORT)
        print("connecting dtp async channel...finished")

    def disconnect(self):
        print("disconnecting dtp async channel...")
        self.socket_dealer.disconnect(ASYNC_CHANNEL_PORT)

    def place_order(self, payload):
        self._process_invoke(payload, dtp_api_id.PLACE_ORDER)

    def cancel_order(self, payload):
        self._process_invoke(payload, dtp_api_id.CANCEL_ORDER)

    def place_batch_order(self, payload):
        self._process_invoke(payload, dtp_api_id.PLACE_BATCH_ORDER)

    def _process_invoke(self, payload, async_request_api_id):
        payload.header.api_id = async_request_api_id
        self.socket_dealer.send(payload.header.SerializeToString(), zmq.SNDMORE)
        self.socket_dealer.send(payload.body.SerializeToString())



class DtpSubscribeChannel(object):
    def __init__(self, subcribe_interval=0.00001):
        # default subcribe_interval is 10um
        self.subcribe_interval = subcribe_interval
        self.context = zmq.Context()
        self.socket_sub_counter_report = self.context.socket(zmq.SUB)
        self.socket_sub_compliance_report = self.context.socket(zmq.SUB)

    def connect(self):
        print("connecting dtp subcribe channel...")
        self.socket_sub_counter_report.connect(SUB_COUNTER_PORT)
        self.socket_sub_compliance_report.connect(SUB_COMPLIANCE_PORT)

    def disconnect(self):
        print("disconnecting dtp subcribe channel...")
        self.stop_subscribe_report()
        self.socket_sub_counter_report.disconnect(SUB_COUNTER_PORT)
        self.socket_sub_compliance_report.disconnect(SUB_COMPLIANCE_PORT)

    def register_compliance_callback(self, compiance_failed_callback):
        self._compiance_failed_callback = compiance_failed_callback

    def register_counter_callback(self, place_report_callback, fill_report_callback, cancel_report_callback):
        self._place_report_callback = place_report_callback
        self._fill_report_callback = fill_report_callback
        self._cancel_report_callback = cancel_report_callback

    def start_subscribe_report(self, topic):
        self._running = True
        self.socket_sub_counter_report.setsockopt_string(zmq.SUBSCRIBE, topic)
        threading.Thread(target=self._subscribe_counter_report).start()
        self.socket_sub_compliance_report.setsockopt_string(zmq.SUBSCRIBE, topic)
        threading.Thread(target=self._subscribe_compliance_report).start()

    def stop_subscribe_report(self):
        self._running = False

    def _subscribe_compliance_report(self):
        print("subcribing compliance report...")
        while self._running:
            try:
                topic = self.socket_sub_compliance_report.recv(flags=zmq.NOBLOCK)
                report_header = self.socket_sub_compliance_report.recv()
                report_body = self.socket_sub_compliance_report.recv()
                print("subcribed compliance report...")
            except zmq.ZMQError as e:
                time.sleep(self.subcribe_interval)
                continue
            header = dtp_struct.ReportHeader()
            header.ParseFromString(report_header)
            body = dtp_struct.PlacedReport()
            body.ParseFromString(report_body)
            self._compiance_failed_callback(Payload(header, body))

    def _subscribe_counter_report(self):
        print("subcribing counter report...")
        while self._running:
            try:
                topic = self.socket_sub_counter_report.recv(flags=zmq.NOBLOCK)
                report_header = self.socket_sub_counter_report.recv()
                report_body = self.socket_sub_counter_report.recv()
                print("subcribed counter report...")
            except zmq.ZMQError as e:
                time.sleep(self.subcribe_interval)
                continue
            self._distribute_counter_report_by_header(report_header, report_body)

    def _distribute_counter_report_by_header(self, report_header, report_body):
        header = dtp_struct.ReportHeader()
        header.ParseFromString(report_header)
        if(header.api_id == dtp_api_id.PLACE_REPORT):
            body = dtp_struct.PlacedReport()
            body.ParseFromString(report_body)
            self._place_report_callback(Payload(header, body))
        elif(header.api_id == dtp_api_id.FILL_REPORT):
            body = dtp_struct.FillReport()
            body.ParseFromString(report_body)
            self._fill_report_callback(Payload(header, body))
        elif(header.api_id == dtp_api_id.CANCEL_REPORT):
            body = dtp_struct.CancellationReport()
            body.ParseFromString(report_body)
            self._cancel_report_callback(Payload(header, body))



class Payload(object):
    def __init__(self, header, body):
        self.header = header
        self.body = body

In [None]:
dtp_sync = DtpSyncChannel()

In [None]:
dtp_sync.socket_req.connect(SYNC_CHANNEL_PORT)

In [None]:
dtp_sync.socket_req.context

In [None]:
dtp_sync.connect()

In [1]:
import zmq
import time
import threading

import dtp.type_pb2 as dtp_type
import dtp.api_pb2 as dtp_struct
import dtp.dtp_api_id as dtp_api_id

import mock_data as my_data
import mock_dtp_pub_channel as dtp_pub_channel

class DtpServerAsyncChannelMocker:
    def __init__(self):
        self.context = zmq.Context()
        self.socket_dealer = self.context.socket(zmq.DEALER)
        self.socket_dealer.bind("tcp://192.168.221.91:9101")

    def start(self):
        self._running = True
        threading.Thread(target=self._start_asyc_channel).start()

    def terminate(self):
        self._running = False

    def _start_asyc_channel(self):
        while self._running:
            try:
                # Wait for next request from client
                frame1 = self.socket_dealer.recv(flags=zmq.NOBLOCK)
                frame2 = self.socket_dealer.recv()
            except zmq.ZMQError as e:
                print(e)
                # time.sleep(0.000001)
                time.sleep(0.1)
                continue

            request_header = dtp_struct.RequestHeader()
            request_header.ParseFromString(frame1)
            place_order_body = dtp_struct.PlaceOrder()
            place_order_body.ParseFromString(frame2)

            if(request_header.api_id == dtp_api_id.PLACE_BATCH_ORDER):
                # fake
                print(fake)
                continue

            if(place_order_body.order_original_id == my_data.unnormal_order_01.original_id):
                print(".............mocking report: compliance failed")
                report_header = dtp_struct.ReportHeader()
                report_header.api_id = dtp_api_id.PLACE_REPORT
                report_header.code = dtp_type.RESPONSE_CODE_FORBIDDEN
                report_header.message = "compliance failed."
                report_body = dtp_struct.PlacedReport()
                report_body.order_original_id = my_data.unnormal_order_01.original_id
                report_payload = my_data.Payload(report_header, report_body)

                topic = place_order_body.account_no
                dtp_pub_channel.publish_compliacne_failed_report(topic, report_payload)
            elif(request_header.token == my_data.account_no_token_dict[place_order_body.account_no]):
                print(".............mocking report: fill")
                report_header = dtp_struct.ReportHeader()
                report_header.api_id = dtp_api_id.PLACE_REPORT
                report_header.code = dtp_type.RESPONSE_CODE_OK
                report_header.message = "success."
                report_body = dtp_struct.PlacedReport()
                report_body.order_exchange_id = my_data.normal_order_01_placing.exchange_id
                report_body.status = my_data.normal_order_01_placing.status
                report_payload = my_data.Payload(report_header, report_body)

                topic = place_order_body.account_no
                dtp_pub_channel.publish_order_report(topic, report_payload)

                report_header = dtp_struct.ReportHeader()
                report_header.api_id = dtp_api_id.FILL_REPORT
                report_header.code = dtp_type.RESPONSE_CODE_OK
                report_header.message = "success."
                report_body = dtp_struct.FillReport()
                report_body.fill_exchange_id = my_data.normal_order_01_fill.fill_exchange_id
                report_body.order_exchange_id = my_data.normal_order_01_placing.exchange_id
                report_body.fill_status = my_data.normal_order_01_fill.fill_status
                report_payload = my_data.Payload(report_header, report_body)

                topic = place_order_body.account_no
                dtp_pub_channel.publish_order_report(topic, report_payload)
            else:
                print(".............mocking report: unknown token")
                report_header = dtp_struct.ReportHeader()
                report_header.api_id = dtp_api_id.PLACE_REPORT
                report_header.code = dtp_type.RESPONSE_CODE_UNAUTHORIZED
                report_header.message = "unknown token."
                report_body = dtp_struct.PlacedReport()
                report_payload = my_data.Payload(report_header, report_body)

                topic = place_order_body.account_no
                dtp_pub_channel.publish_order_report(topic, report_payload)


In [2]:
dtp_async_mocker = DtpServerAsyncChannelMocker()

ZMQError: Cannot assign requested address

In [None]:
dtp_async_mocker.start()