Skip to content

Commit

Permalink
implementation of #10 and fixed reconnect bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
oliver-zehentleitner committed Apr 28, 2019
1 parent 5396a34 commit 5490eaf
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 69 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
Expand Up @@ -4,7 +4,15 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/).

## 1.2.0.dev (development stage/unreleased)
## 1.2.1.dev (development stage/unreleased)
## 1.2.1
### Added
- handling for status_code and used_weight from the binance REST Api (used for listen_key) - see `get_binance_api_status()`
### Fixing
- reconnect issues
### Changing
- log levels

## 1.2.0
### Changed
- if no method is provided to BinanceWebSocketApiManager when creating the instance, then all data will be written to
Expand Down
62 changes: 40 additions & 22 deletions dev_test_full_non_stop.py
Expand Up @@ -50,32 +50,50 @@
except Exception:
logging.critical("ATTENTION! Unexpected error", exc_info=True)

# set api key and secret for userData stream
binance_api_key = ""
binance_api_secret = ""
binance_websocket_api_manager.set_private_api_config(binance_api_key, binance_api_secret)
userdata_stream_id = binance_websocket_api_manager.create_stream(["arr"], ["!userData"])

ticker_all_stream_id = binance_websocket_api_manager.create_stream(["arr"], ["!ticker"])
miniticker_stream_id = binance_websocket_api_manager.create_stream(["arr"], ["!miniTicker"])
userdata_stream_id = binance_websocket_api_manager.create_stream(["arr"], ["!userData"])
markets = {'bnbbtc'}
aggtrade_stream_id = binance_websocket_api_manager.create_stream(["aggTrade"], markets)
trade_stream_id = binance_websocket_api_manager.create_stream(["trade"], markets)
kline_1m_stream_id = binance_websocket_api_manager.create_stream(["kline_1m"], markets)
ticker_bnbbtc_stream_id = binance_websocket_api_manager.create_stream(["ticker"], markets)
miniticker_stream_id = binance_websocket_api_manager.create_stream(["miniTicker"], markets)
kline_5m_stream_id = binance_websocket_api_manager.create_stream(["kline_5m"], markets)
depth5_stream_id = binance_websocket_api_manager.create_stream(["depth10"], markets)
depth_stream_id = binance_websocket_api_manager.create_stream(["depth"], markets)
markets = {'xrpusdt', 'rvnbtc', 'ltcusdt', 'adausdt', 'eosusdt', 'neousdt'}
aggtrade_stream_id = binance_websocket_api_manager.create_stream(["aggTrade"], markets)
trade_stream_id = binance_websocket_api_manager.create_stream(["trade"], markets)
kline_1m_stream_id = binance_websocket_api_manager.create_stream(["kline_1m"], markets)
ticker_bnbbtc_stream_id = binance_websocket_api_manager.create_stream(["ticker"], markets)
miniticker_stream_id = binance_websocket_api_manager.create_stream(["miniTicker"], markets)
kline_5m_stream_id = binance_websocket_api_manager.create_stream(["kline_5m"], markets)
depth5_stream_id = binance_websocket_api_manager.create_stream(["depth5"], markets)
depth_stream_id = binance_websocket_api_manager.create_stream(["depth"], markets)
channels = {'trade', 'kline_1', 'kline_5', 'kline_15', 'kline_30', 'kline_1h', 'kline_12h', 'kline_1w',
'miniTicker', 'depth20', '!miniTicker', '!ticker'}
multi_multi_stream_id = binance_websocket_api_manager.create_stream(channels, markets)

markets = {'bnbbtc', 'ethbtc', 'btcusdt', 'bchabcusdt', 'xrpusdt', 'rvnbtc', 'ltcusdt', 'adausdt', 'eosusdt',
'neousdt', 'bnbusdt', 'adabtc', 'ethusdt', 'trxbtc', 'trxbtc', 'bchabcbtc', 'ltcbtc', 'xrpbtc',
'ontbtc', 'bttusdt', 'eosbtc', 'xlmbtc', 'bttbtc', 'tusdusdt', 'xlmusdt', 'qkcbtc', 'zrxbtc',
'neobtc', 'adaeth', 'icxusdt', 'btctusd', 'icxbtc', 'btcusdc', 'wanbtc', 'zecbtc', 'wtcbtc',
'batbtc', 'adabnb', 'etcusdt', 'qtumusdt', 'xmrbtc', 'trxeth', 'adatusd', 'trxxrp', 'trxbnb',
'dashbtc', 'rvnbnb', 'bchabctusd', 'etcbtc', 'bnbeth', 'ethpax', 'nanobtc', 'xembtc', 'xrpbnb',
'bchabcpax', 'xrpeth', 'bttbnb', 'ltcbnb', 'agibtc', 'zrxusdt', 'xlmbnb', 'ltceth', 'eoseth',
'ltctusd', 'polybnb', 'scbtc', 'steembtc', 'trxtusd', 'npxseth', 'kmdbtc', 'polybtc', 'gasbtc',
'engbtc', 'zileth', 'xlmeth', 'eosbnb', 'xrppax', 'lskbtc', 'npxsbtc', 'xmrusdt', 'ltcpax', 'xmrusdt',
'ethtusd', 'batusdt', 'mcobtc', 'neoeth', 'bntbtc', 'eostusd', 'lrcbtc', 'funbtc', 'zecusdt',
'bnbpax', 'linkusdt', 'hceth', 'zrxeth', 'icxeth', 'xmreth', 'neobnb', 'etceth', 'zeceth', 'xmrbnb',
'wanbnb', 'zrxbnb', 'agibnb', 'funeth', 'arketh', 'engeth'}

binance_websocket_api_manager.create_stream(["aggTrade"], markets)
binance_websocket_api_manager.create_stream(["trade"], markets)
binance_websocket_api_manager.create_stream(["kline_1m"], markets)
binance_websocket_api_manager.create_stream(["kline_5m"], markets)
binance_websocket_api_manager.create_stream(["kline_15m"], markets)
binance_websocket_api_manager.create_stream(["kline_1h"], markets)
binance_websocket_api_manager.create_stream(["kline_12h"], markets)
binance_websocket_api_manager.create_stream(["kline_1w"], markets)
binance_websocket_api_manager.create_stream(["ticker"], markets)
binance_websocket_api_manager.create_stream(["miniTicker"], markets)
binance_websocket_api_manager.create_stream(["depth"], markets)
binance_websocket_api_manager.create_stream(["depth5"], markets)
binance_websocket_api_manager.create_stream(["depth10"], markets)
binance_websocket_api_manager.create_stream(["depth20"], markets)
binance_websocket_api_manager.create_stream(["aggTrade"], markets)

markets = {'bnbbtc', 'ethbtc', 'btcusdt', 'bchabcusdt', 'xrpusdt', 'rvnbtc', 'ltcusdt', 'adausdt', 'eosusdt',
'neousdt', 'bnbusdt', 'adabtc', 'ethusdt', 'trxbtc', 'trxbtc', 'bchabcbtc', 'ltcbtc', 'xrpbtc',
'bnbpax', 'linkusdt', 'hceth', 'zrxeth', 'icxeth', 'xmreth', 'neobnb', 'etceth', 'zeceth', 'xmrbnb'}
channels = {'trade', 'kline_1m', 'kline_5m', 'kline_15m', 'kline_30m', 'kline_1h', 'kline_12h', 'kline_1w',
'miniTicker', 'depth20', '!miniTicker', '!ticker'}
binance_websocket_api_manager.create_stream(channels, markets)

def print_stream_data_from_stream_buffer(binance_websocket_api_manager):
print("waiting 30 seconds, then we start flushing the stream_buffer")
Expand Down
3 changes: 3 additions & 0 deletions example_userdata_stream.py
Expand Up @@ -59,6 +59,9 @@
# create the userData stream
user_data_stream_id = binance_websocket_api_manager.create_stream('arr', '!userData')

# get the binance api status (used_weight, last status_code and timestamp of the last update
time.sleep(3)

# monitor the stream
while True:
binance_websocket_api_manager.print_stream_info(user_data_stream_id)
Expand Down
38 changes: 38 additions & 0 deletions pypi_remove_files.sh
@@ -0,0 +1,38 @@
#!/usr/bin/env bash
# -*- coding: utf-8 -*-
#
# File: pypi_remove_files.sh
#
# Part of ‘UNICORN Binance WebSocket API’
# Project website: https://github.com/unicorn-data-analysis/unicorn-binance-websocket-api
# Documentation: https://www.unicorn-data.com/unicorn-binance-websocket-api.html
# PyPI: https://pypi.org/project/unicorn-binance-websocket-api/
#
# Author: UNICORN Data Analysis
# https://www.unicorn-data.com/
#
# Copyright (c) 2019, UNICORN Data Analysis
# All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.

rm ./build -r
rm ./dist -r
rm ./unicorn_binance_websocket_api.egg-info
2 changes: 1 addition & 1 deletion pypi_upload_wheel.sh
Expand Up @@ -32,7 +32,7 @@
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.

#
# create this file:
# ~/.pypirc
#[distutils]
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -40,7 +40,7 @@

setuptools.setup(
name='unicorn-binance-websocket-api',
version='1.2.0',
version='1.2.1',
author="UNICORN Data Analysis",
url="https://www.unicorn-data.com",
scripts=['unicorn_binance_websocket_api.py'],
Expand Down
Expand Up @@ -66,7 +66,6 @@ async def __aenter__(self):
logging.critical("BinanceWebSocketApiConnection->await._conn.__aenter__(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ") - " + str(error_msg))
self.handler_binance_websocket_api_manager.stream_is_crashing(self.stream_id, str(error_msg))
#time.sleep(0.5)
self.handler_binance_websocket_api_manager.set_restart_request(self.stream_id)
sys.exit(1)
try:
Expand All @@ -86,7 +85,7 @@ async def __aenter__(self):
pass
logging.debug("BinanceWebSocketApiConnection->__enter__(" + str(self.stream_id) + ", " + str(self.channels) +
", " + str(self.markets) + ")" + " connecting to " + str(uri))
self._conn = connect(uri, ping_interval=10, ping_timeout=10, close_timeout=5,
self._conn = connect(uri, ping_interval=None, close_timeout=5,
extra_headers={'User-Agent': 'unicorn-data-analysis/unicorn-binance-websocket-api/' +
self.handler_binance_websocket_api_manager.version})
try:
Expand All @@ -100,21 +99,13 @@ async def __aenter__(self):
except KeyError:
pass
except ConnectionResetError as error_msg:
logging.critical("BinanceWebSocketApiConnection->await._conn.__aenter__(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ")" + " - ConnectionResetError "
"- " + str(error_msg))
#self.handler_binance_websocket_api_manager.stream_is_crashing(self.stream_id, (str(error_msg) +
# " - ConnectionResetError"))
#self.handler_binance_websocket_api_manager.set_restart_request(self.stream_id)
#sys.exit(1)
logging.error("BinanceWebSocketApiConnection->await._conn.__aenter__(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ")" + " - ConnectionResetError - " +
str(error_msg))
except OSError as error_msg:
logging.critical("BinanceWebSocketApiConnection->await._conn.__aenter__(" + str(self.stream_id) + ", " +
logging.error("BinanceWebSocketApiConnection->await._conn.__aenter__(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ")" + " - OSError "
"- " + str(error_msg))
#self.handler_binance_websocket_api_manager.stream_is_crashing(self.stream_id, (str(error_msg) +
# " - OSError"))
#self.handler_binance_websocket_api_manager.set_restart_request(self.stream_id)
#sys.exit(1)
except socket.gaierror as error_msg:
logging.critical("BinanceWebSocketApiConnection->await._conn.__aenter__(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ")" + " - No internet connection? "
Expand All @@ -137,12 +128,8 @@ async def __aenter__(self):
pass
sys.exit(1)
elif "Status code not 101: 400" in str(error_msg):
logging.critical("BinanceWebSocketApiConnection->await._conn.__aenter__(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ") " + str(error_msg))
# Test!!! This block is deactivated and `async def __aexit__(self, *args, **kwargs):` is now doing the job...
#self.handler_binance_websocket_api_manager.stream_is_crashing(self.stream_id, str (error_msg))
#self.handler_binance_websocket_api_manager.set_restart_request(self.stream_id)
#sys.exit(1)
logging.error("BinanceWebSocketApiConnection->await._conn.__aenter__(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ") " + str(error_msg))
elif "Status code not 101: 500" in str(error_msg):
logging.critical("BinanceWebSocketApiConnection->await._conn.__aenter__(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ") " + str(error_msg))
Expand Down Expand Up @@ -177,18 +164,14 @@ async def __aexit__(self, *args, **kwargs):
"ConnectionClosed - " + str(error_msg))
finally:
self.handler_binance_websocket_api_manager.stream_is_stopping(self.stream_id)
#try:
# self.handler_binance_websocket_api_manager.websocket_list[self.stream_id].close()
#except KeyError:
# pass
if self.handler_binance_websocket_api_manager.is_stop_request(self.stream_id) is False:
self.handler_binance_websocket_api_manager.set_restart_request(self.stream_id)
sys.exit(0)

def close(self):
# used to close the stream
self.handler_binance_websocket_api_manager.stream_is_stopping(self.stream_id)
logging.debug("binance_websocket_api_connection->close(" + str(self.stream_id) + ")")
logging.info("binance_websocket_api_connection->close(" + str(self.stream_id) + ")")
self.handler_binance_websocket_api_manager.websocket_list[self.stream_id].close()
sys.exit(0)

Expand All @@ -215,4 +198,3 @@ async def receive(self):
if self.handler_binance_websocket_api_manager.is_stop_request(self.stream_id) is False:
self.handler_binance_websocket_api_manager.set_restart_request(self.stream_id)
sys.exit(0)

0 comments on commit 5490eaf

Please sign in to comment.