diff --git a/unicorn_binance_websocket_api/unicorn_binance_websocket_api_manager.py b/unicorn_binance_websocket_api/unicorn_binance_websocket_api_manager.py index 86710b5..defdceb 100755 --- a/unicorn_binance_websocket_api/unicorn_binance_websocket_api_manager.py +++ b/unicorn_binance_websocket_api/unicorn_binance_websocket_api_manager.py @@ -228,7 +228,8 @@ def __init__(self, print(update_msg) logging.warn(update_msg) - def _add_socket_to_socket_list(self, stream_id, channels, markets, stream_label=None, stream_buffer_name=False): + def _add_socket_to_socket_list(self, stream_id, channels, markets, stream_label=None, stream_buffer_name=False, + symbol=False): """ Create a list entry for new sockets @@ -236,10 +237,17 @@ def _add_socket_to_socket_list(self, stream_id, channels, markets, stream_label= :type stream_id: uuid :param channels: provide the channels to create the URI :type channels: str, tuple, list, set - :param stream_label: provide a stream_label for the stream - :type stream_label: str :param markets: provide the markets to create the URI :type markets: str, tuple, list, set + :param stream_label: provide a stream_label for the stream + :type stream_label: str + :param stream_buffer_name: If `False` the data is going to get written to the default stream_buffer, + set to `True` to read the data via `pop_stream_data_from_stream_buffer(stream_id)` or + provide a string to create and use a shared stream_buffer and read it via + `pop_stream_data_from_stream_buffer('string')`. + :type stream_buffer_name: bool or str + :param symbol: provide the symbol for isolated_margin user_data streams + :type symbol: str """ self.stream_threading_lock[stream_id] = {'full_lock': threading.Lock(), 'receives_statistic_last_second_lock': threading.Lock()} @@ -250,6 +258,7 @@ def _add_socket_to_socket_list(self, stream_id, channels, markets, stream_label= 'markets': copy.deepcopy(markets), 'stream_label': copy.deepcopy(stream_label), 'stream_buffer_name': copy.deepcopy(stream_buffer_name), + 'symbol': copy.deepcopy(symbol), 'subscriptions': 0, 'payload': [], 'api_key': copy.deepcopy(self.api_key), @@ -275,11 +284,11 @@ def _add_socket_to_socket_list(self, stream_id, channels, markets, stream_label= 'processed_receives_statistic': {}, 'transfer_rate_per_second': {'bytes': {}, 'speed': 0}} logging.info("BinanceWebSocketApiManager->_add_socket_to_socket_list(" + - str(stream_id) + ", " + str(channels) + ", " + str(markets) + ", " + str(stream_label) - + str(stream_buffer_name) + ")") + str(stream_id) + ", " + str(channels) + ", " + str(markets) + ", " + str(stream_label) + ", " + + str(stream_buffer_name) + ", " + str(symbol) + ")") def _create_stream_thread(self, loop, stream_id, channels, markets, stream_label=None, stream_buffer_name=False, - restart=False): + symbol=False, restart=False): """ Co function of self.create_stream to create a thread for the socket and to manage the coroutine @@ -298,6 +307,8 @@ def _create_stream_thread(self, loop, stream_id, channels, markets, stream_label provide a string to create and use a shared stream_buffer and read it via `pop_stream_data_from_stream_buffer('string')`. :type stream_buffer_name: bool or str + :param symbol: provide the symbol for isolated_margin user_data streams + :type symbol: str :param restart: set to `True`, if its a restart! :type restart: bool :return: @@ -305,7 +316,7 @@ def _create_stream_thread(self, loop, stream_id, channels, markets, stream_label if self.is_stop_request(stream_id): return False if restart is False: - self._add_socket_to_socket_list(stream_id, channels, markets, stream_label, stream_buffer_name) + self._add_socket_to_socket_list(stream_id, channels, markets, stream_label, stream_buffer_name, symbol) if stream_buffer_name is not False: self.stream_buffer_locks[stream_buffer_name] = threading.Lock() self.stream_buffers[stream_buffer_name] = [] @@ -751,7 +762,7 @@ def create_payload(self, stream_id, method, channels=False, markets=False): str(markets) + ") finished ...") return payload - def create_stream(self, channels, markets, stream_label=None, stream_buffer_name=False): + def create_stream(self, channels, markets, stream_label=None, stream_buffer_name=False, symbol=False): """ Create a websocket stream @@ -809,6 +820,8 @@ def create_stream(self, channels, markets, stream_label=None, stream_buffer_name provide a string to create and use a shared stream_buffer and read it via `pop_stream_data_from_stream_buffer('string')`. :type stream_buffer_name: bool or str + :param symbol: provide the symbol for isolated_margin user_data streams + :type symbol: str :return: stream_id or 'False' """ # create a stream @@ -836,11 +849,12 @@ def create_stream(self, channels, markets, stream_label=None, stream_buffer_name elif self.is_exchange_type('cex'): markets_new.append(str(market).lower()) logging.info("BinanceWebSocketApiManager->create_stream(" + str(channels) + ", " + str(markets_new) + ", " - + str(stream_label) + str(stream_buffer_name) + ") with stream_id=" + str(stream_id)) + + str(stream_label) + ", " + str(stream_buffer_name) + ", " + str(symbol) + ") with stream_id=" + + str(stream_id)) loop = asyncio.new_event_loop() thread = threading.Thread(target=self._create_stream_thread, args=(loop, stream_id, channels, markets_new, stream_label, - stream_buffer_name)) + stream_buffer_name, symbol)) thread.start() return stream_id @@ -1299,7 +1313,7 @@ def get_number_of_free_subscription_slots(self, stream_id): free_slots = self.max_subscriptions_per_stream - self.stream_list[stream_id]['subscriptions'] return free_slots - def get_listen_key_from_restclient(self, stream_id, api_key, api_secret): + def get_listen_key_from_restclient(self, stream_id, api_key, api_secret, symbol=False): """ Get a new or cached (<30m) listen_key @@ -1309,6 +1323,8 @@ def get_listen_key_from_restclient(self, stream_id, api_key, api_secret): :type api_key: str :param api_secret: provide a valid Binance API secret :type api_secret: str + :param symbol: provide the symbol for isolated_margin user_data streams + :type symbol: str :return: str or False """ if (self.stream_list[stream_id]['start_time'] + self.stream_list[stream_id]['listen_key_cache_time']) > \ @@ -1321,7 +1337,8 @@ def get_listen_key_from_restclient(self, stream_id, api_key, api_secret): # no cached listen_key or listen_key is older than 30 min # acquire a new listen_key: binance_websocket_api_restclient = BinanceWebSocketApiRestclient(self.exchange, api_key, api_secret, - self.get_version(), self.binance_api_status) + self.get_version(), self.binance_api_status, + symbol) response = binance_websocket_api_restclient.get_listen_key() del binance_websocket_api_restclient if response: diff --git a/unicorn_binance_websocket_api/unicorn_binance_websocket_api_restclient.py b/unicorn_binance_websocket_api/unicorn_binance_websocket_api_restclient.py index 94fbf20..aa2fa52 100755 --- a/unicorn_binance_websocket_api/unicorn_binance_websocket_api_restclient.py +++ b/unicorn_binance_websocket_api/unicorn_binance_websocket_api_restclient.py @@ -45,10 +45,11 @@ class BinanceWebSocketApiRestclient(object): def __init__(self, exchange, binance_api_key, binance_api_secret, unicorn_binance_websocket_api_version, - binance_api_status): + binance_api_status, symbol): self.exchange = exchange self.api_key = copy.deepcopy(binance_api_key) self.api_secret = copy.deepcopy(binance_api_secret) + self.symbol = symbol self.unicorn_binance_websocket_api_version = unicorn_binance_websocket_api_version if self.exchange == "binance.com": self.restful_base_uri = "https://api.binance.com/" @@ -174,9 +175,12 @@ def get_listen_key(self): :return: listen_key :rtype: str or False """ - logging.info("BinanceWebSocketApiRestclient->get_listen_key()") + logging.info("BinanceWebSocketApiRestclient->get_listen_key() symbol=" + str(self.symbol)) method = "post" - response = self._request(method, self.path_userdata) + if self.exchange == "binance.com-isolated_margin": + response = self._request(method, self.path_userdata, False, {'symbol': str(self.symbol)}) + else: + response = self._request(method, self.path_userdata) try: self.listen_key = response['listenKey'] return response