Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
john committed Mar 10, 2022
1 parent 9fb35f4 commit 53b8558
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 46 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Expand Up @@ -16,6 +16,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this p
- `process_stream_data` parameter for callback function to 'create_stream()'
- `high_performance` parameter to make `create_stream()` a non blocking
- `title` to `print_summary()` and `print_stream_info()`
### Fixed
- BinanceWebSocketApiManager.stop_stream() doesn't stop the stream immediately [issue#161](https://github.com/LUCIT-Systems-and-Development/unicorn-binance-websocket-api/issues/161)

## 1.39.0
### Changed
Expand All @@ -26,7 +28,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this p

## 1.38.1
### Fixed
- [issue#131](https://github.com/LUCIT-Systems-and-Development/unicorn-binance-websocket-api/issues/131)
- Websocket fails to reconnect [issue#131](https://github.com/LUCIT-Systems-and-Development/unicorn-binance-websocket-api/issues/131)

## 1.38.0
### Added
Expand Down
4 changes: 2 additions & 2 deletions tools/tidyup.sh
@@ -1,4 +1,4 @@
#!/bin/bash

rm ../*.log
rm ../.print*
rm *.log
rm .print*
21 changes: 12 additions & 9 deletions unicorn_binance_websocket_api/connection.py
Expand Up @@ -232,20 +232,23 @@ async def __aenter__(self):
self.manager.stream_is_crashing(self.stream_id, str(error_msg))
sys.exit(1)
else:
logger.critical("BinanceWebSocketApiConnection.await._conn.__aenter__(" + str(self.stream_id) +
", " + str(self.channels) + ", " + str(self.markets) + ") UnhandledException "
"ConnectionClosed" + str(error_msg))
logger.critical(f"BinanceWebSocketApiConnection.await._conn.__aenter__({self.stream_id}, "
f"{self.channels}, {self.markets}) - UnhandledException ConnectionClosed - {error_msg}")
return self

async def __aexit__(self, *args, **kwargs):
try:
await self._conn.__aexit__(*args, **kwargs)
except RuntimeError as error_msg:
logger.debug(f"BinanceWebSocketApiConnection.__aexit__({self.stream_id}) - RuntimeError - {error_msg}")
self.manager.stream_is_stopping(self.stream_id)
if self.manager.is_stop_request(self.stream_id) is False and \
self.manager.is_stop_as_crash_request is False:
self.manager.set_restart_request(self.stream_id)
except AttributeError as error_msg:
logger.error("BinanceWebSocketApiConnection.__aexit__(*args, **kwargs): "
"AttributeError - " + str(error_msg))
logger.error(f"BinanceWebSocketApiConnection.__aexit__({self.stream_id}) - AttributeError - {error_msg}")
except websockets.exceptions.ConnectionClosed as error_msg:
logger.error("BinanceWebSocketApiConnection.__aexit__(*args, **kwargs): "
"ConnectionClosed - " + str(error_msg))
logger.error(f"BinanceWebSocketApiConnection.__aexit__({self.stream_id}) - ConnectionClosed - {error_msg}")
self.manager.stream_is_stopping(self.stream_id)
if self.manager.is_stop_request(self.stream_id) is False and \
self.manager.is_stop_as_crash_request is False:
Expand All @@ -261,8 +264,7 @@ async def close(self):
except KeyError:
logger.error(f"BinanceWebSocketApiConnection.close({str(self.stream_id)}) - Stream not found!")
except RuntimeError as error_msg:
logger.error(f"BinanceWebSocketApiConnection.close({str(self.stream_id)}) - "
f"RuntimeError: {str(error_msg)}")
logger.error(f"BinanceWebSocketApiConnection.close({str(self.stream_id)}) - RuntimeError: {str(error_msg)}")
except ValueError as error_msg:
# ValueError: The future belongs to a different loop than the one specified as the loop argument
logger.error(f"BinanceWebSocketApiConnection.close({str(self.stream_id)}) socket_id="
Expand All @@ -276,6 +278,7 @@ async def receive(self):
self.manager.set_heartbeat(self.stream_id)
try:
received_data_json = await self.manager.websocket_list[self.stream_id].recv()
# received_data_json = await asyncio.wait_for(self.manager.websocket_list[self.stream_id].recv(), timeout=10)
try:
if self.manager.restart_requests[self.stream_id]['status'] == "restarted":
self.manager.increase_reconnect_counter(self.stream_id)
Expand Down
72 changes: 44 additions & 28 deletions unicorn_binance_websocket_api/manager.py
Expand Up @@ -339,8 +339,8 @@ def __init__(self,
self.restclient = BinanceWebSocketApiRestclient(self)
if warn_on_update and self.is_update_available():
update_msg = f"Release {self.name}_" + self.get_latest_version() + " is available, " \
"please consider updating! (Changelog: https://github.com/LUCIT-Systems-and-Development/unicorn-" \
"binance-websocket-api/blob/master/CHANGELOG.md)"
"please consider updating! (Changelog: https://github.com/LUCIT-Systems-and-Development/" \
"unicorn-binance-websocket-api/blob/master/CHANGELOG.md)"
print(update_msg)
logger.warning(update_msg)

Expand Down Expand Up @@ -518,7 +518,8 @@ def _create_stream_thread(self,
asyncio.set_event_loop(loop)
socket = BinanceWebSocketApiSocket(self, stream_id, channels, markets)
try:
loop.run_until_complete(socket.start_socket())
asyncio.ensure_future(socket.start_socket())
loop.run_forever()
except RuntimeError as error_msg:
if "cannot schedule new futures after interpreter shutdown" in str(error_msg):
logger.critical(f"BinanceWebSocketApiManager._create_stream_thread() stream_id={str(stream_id)} "
Expand Down Expand Up @@ -610,11 +611,11 @@ def _frequent_checks(self):
if timestamp_key < current_timestamp - self.keep_max_received_last_second_entries:
delete_index.append(timestamp_key)
except ValueError as error_msg:
logger.error(
"BinanceWebSocketApiManager._frequent_checks() timestamp_key=" + str(timestamp_key) +
" current_timestamp=" + str(current_timestamp) + " keep_max_received_last_second_"
"entries=" + str(self.keep_max_received_last_second_entries) + " error_msg=" +
str(error_msg))
logger.error("BinanceWebSocketApiManager._frequent_checks() timestamp_key=" +
str(timestamp_key) + " current_timestamp=" + str(current_timestamp) +
" keep_max_received_last_second_entries=" +
str(self.keep_max_received_last_second_entries) + " error_msg=" +
str(error_msg))
for timestamp_key in delete_index:
with self.stream_threading_lock[stream_id]['receives_statistic_last_second_lock']:
self.stream_list[stream_id]['receives_statistic_last_second']['entries'].pop(timestamp_key,
Expand Down Expand Up @@ -1447,20 +1448,20 @@ def create_websocket_uri(self, channels, markets, stream_id=False, api_key=False
for channel in channels:
if channel == "!userData":
logger.error("BinanceWebSocketApiManager.create_websocket_uri(" + str(channels) + ", " +
str(markets) + ", " + ", " + str(symbols) + ") - Can not create "
"'outboundAccountInfo' in a multi channel socket! "
"Unfortunately Binance only stream it in a single stream socket! ./"
"Use binance_websocket_api_manager.create_stream([\"arr\"], [\"!userData\"]) to "
"initiate an extra connection.")
str(markets) + ", " + ", " + str(symbols) + ") - Can not create "
"'outboundAccountInfo' in a multi channel socket! "
"Unfortunately Binance only stream it in a single stream socket! ./"
"Use create_stream([\"arr\"], [\"!userData\"]) to "
"initiate an extra connection.")
return False
for market in markets:
if market == "!userData":
logger.error("BinanceWebSocketApiManager.create_websocket_uri(" + str(channels) + ", " +
str(markets) + ", " + ", " + str(symbols) + ") - Can not create "
"'outboundAccountInfo' in a multi channel socket! "
"Unfortunatly Binance only stream it in a single stream socket! ./"
"Use binance_websocket_api_manager.create_stream([\"arr\"], [\"!userData\"]) to "
"initiate an extra connection.")
str(markets) + ", " + ", " + str(symbols) + ") - Can not create "
"'outboundAccountInfo' in a multi channel socket! "
"Unfortunatly Binance only stream it in a single stream socket! ./"
"Use create_stream([\"arr\"], [\"!userData\"]) to "
"initiate an extra connection.")
return False
if "!" in channel:
query += channel + final_market
Expand Down Expand Up @@ -1695,7 +1696,11 @@ def get_event_loop_by_stream_id(self, stream_id=False):
if stream_id is False:
return False
else:
return self.event_loops[stream_id]
try:
return self.event_loops[stream_id]
except KeyError as error_msg:
logger.debug(f"BinanceWebSocketApiManager.get_event_loop_by_stream_id() - KeyError - {str(error_msg)}")
return False

def get_exchange(self):
"""
Expand Down Expand Up @@ -3430,13 +3435,12 @@ def stop_manager_with_all_streams(self):
"""
logger.info("BinanceWebSocketApiManager.stop_manager_with_all_streams() - Stopping "
"unicorn_binance_websocket_api_manager " + self.version + " ...")
# send signal to all threads
self.stop_manager_request = True
# delete listenKeys
for stream_id in self.stream_list:
self.stop_stream(stream_id)
# stop monitoring API services
self.stop_monitoring_api()
# send signal to all threads
self.stop_manager_request = True

def stop_monitoring_api(self):
"""
Expand All @@ -3462,16 +3466,28 @@ def stop_stream(self, stream_id):
:return: bool
"""
# stop a specific stream by stream_id
logger.info("BinanceWebSocketApiManager.stop_stream(" + str(stream_id) + ")")
logger.info(f"BinanceWebSocketApiManager.stop_stream(" + str(stream_id) + ")")
self.stream_is_stopping(stream_id)
try:
self.stream_list[stream_id]['stop_request'] = True
except KeyError:
return False
try:
del self.restart_requests[stream_id]
except KeyError:
pass
self.delete_listen_key_by_stream_id(stream_id)
try:
self.stream_list[stream_id]['stop_request'] = True
except KeyError:
return False
loop = self.get_event_loop_by_stream_id(stream_id)
try:
if loop.is_running():
loop.stop()
except AttributeError as error_msg:
logger.debug(f"BinanceWebSocketApiManager.stop_stream({stream_id}) - AttributeError - {error_msg}")
except RuntimeError as error_msg:
logger.debug(f"BinanceWebSocketApiManager.stop_stream({stream_id}) - RuntimeError - {error_msg}")
except RuntimeWarning as error_msg:
logger.debug(f"BinanceWebSocketApiManager.stop_stream({stream_id}) - RuntimeWarning - {error_msg}")
return True

def stop_stream_as_crash(self, stream_id):
Expand Down Expand Up @@ -3548,7 +3564,7 @@ def subscribe_to_stream(self, stream_id, channels=[], markets=[]):
:return: bool
"""
logger.info("BinanceWebSocketApiManager.subscribe_to_stream(" + str(stream_id) + ", " + str(channels) +
", " + str(markets) + ") started ...")
", " + str(markets) + ") started ...")
try:
if type(channels) is str:
channels = [channels]
Expand All @@ -3560,7 +3576,7 @@ def subscribe_to_stream(self, stream_id, channels=[], markets=[]):
markets = list(markets)
except KeyError:
logger.error("BinanceWebSocketApiManager.subscribe_to_stream(" + str(stream_id) + ", " + str(channels) +
", " + str(markets) + ") KeyError: setting a restart request for this stream ...")
", " + str(markets) + ") KeyError: setting a restart request for this stream ...")
self.stream_is_stopping(stream_id)
self.set_restart_request(stream_id)
return False
Expand Down
3 changes: 1 addition & 2 deletions unicorn_binance_websocket_api/sockets.py
Expand Up @@ -210,11 +210,10 @@ async def start_socket(self):
self.manager.stream_is_crashing(self.stream_id, str(error_msg))
self.manager.set_restart_request(self.stream_id)
sys.exit(1)

except Exception as error_msg:
logger.error("BinanceWebSocketApiSocket.start_socket(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ") - Exception General Exception -"
" error_msg: " + str(error_msg))
" error_msg: " + str(error_msg))
self.manager.stream_is_crashing(self.stream_id, str(error_msg))
self.manager.set_restart_request(self.stream_id)
sys.exit(1)
Expand Down
9 changes: 5 additions & 4 deletions unittest_binance_websocket_api.py
Expand Up @@ -40,6 +40,10 @@
import unittest
import os
import time
#import tracemalloc


#tracemalloc.start(25)

BINANCE_COM_API_KEY = ""
BINANCE_COM_API_SECRET = ""
Expand Down Expand Up @@ -775,13 +779,10 @@ def test_live_run(self):
data = binance_rest_client.get_all_tickers()
for item in data:
markets.append(item['symbol'])
binance_websocket_api_manager.create_stream("trade", markets, stream_label="to much!")

binance_websocket_api_manager.create_stream("trade", markets, stream_label="too much!")
time.sleep(10)
binance_websocket_api_manager.stop_manager_with_all_streams()
time.sleep(2)


if __name__ == '__main__':
unittest.main()

0 comments on commit 53b8558

Please sign in to comment.