Skip to content

Commit

Permalink
v4.0.0rc1
Browse files Browse the repository at this point in the history
* v4.0.0rc1 Release
  • Loading branch information
jonte-z committed May 18, 2023
1 parent d23c9b9 commit c0472bb
Show file tree
Hide file tree
Showing 40 changed files with 555 additions and 520 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## 4.0.0rc1 - 2023-05-18

### Changed
- Redesign of Websocket part. Please consult `README.md` for details on its new usage.

## 3.3.1 - 2023-03-21

### Updated
Expand Down
66 changes: 48 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,36 +176,66 @@ There are 2 types of error returned from the library:

## Websocket

### Connector v4

WebSocket can be established through the following connections:
- USD-M WebSocket Stream (`https://binance-docs.github.io/apidocs/futures/en/#websocket-market-streams`)
- COIN-M WebSocket Stream (`https://binance-docs.github.io/apidocs/delivery/en/#websocket-market-streams`)

```python
# WebSocket Stream Client
import time
from binance.websocket.cm_futures.websocket_client import CMFuturesWebsocketClient
from binance.websocket.um_futures.websocket_client import UMFuturesWebsocketClient

def message_handler(message):
print(message)
def message_handler(_, message):
logging.info(message)

ws_client = CMFuturesWebsocketClient()
ws_client.start()
my_client = UMFuturesWebsocketClient(on_message=message_handler)

ws_client.mini_ticker(
symbol='bnbusdt',
id=1,
callback=message_handler,
)
# Subscribe to a single symbol stream
my_client.agg_trade(symbol="bnbusdt")
time.sleep(5)
logging.info("closing ws connection")
my_client.stop()
```

# Combine selected streams
ws_client.instant_subscribe(
stream=['bnbusdt@bookTicker', 'ethusdt@bookTicker'],
callback=message_handler,
)
#### Request Id

time.sleep(10)
Client can assign a request id to each request. The request id will be returned in the response message. Not mandatory in the library, it generates a uuid format string if not provided.

print("closing ws connection")
ws_client.stop()
```python
# id provided by client
my_client.agg_trade(symbol="bnbusdt", id="my_request_id")

# library will generate a random uuid string
my_client.agg_trade(symbol="bnbusdt")
```

#### Combined Streams
- If you set `is_combined` to `True`, `"/stream/"` will be appended to the `baseURL` to allow for Combining streams.
- `is_combined` defaults to `False` and `"/ws/"` (raw streams) will be appended to the `baseURL`.

More websocket examples are available in the `examples` folder

## Websocket < v4

```python
import time
from binance.websocket.um_futures.websocket_client import UMFuturesWebsocketClient

def message_handler(message):
print(message)

my_client = UMFuturesWebsocketClient(on_message=message_handler)

# Subscribe to a single symbol stream
my_client.agg_trade(symbol="bnbusdt")
time.sleep(5)
print("closing ws connection")
my_client.stop()

```

### Heartbeat

Once connected, the websocket server sends a ping frame every 3 minutes and requires a response pong frame back within
Expand Down
2 changes: 1 addition & 1 deletion binance/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "3.3.1"
__version__ = "4.0.0rc1"
46 changes: 0 additions & 46 deletions binance/websocket/binance_client_factory.py

This file was deleted.

45 changes: 0 additions & 45 deletions binance/websocket/binance_client_protocol.py

This file was deleted.

154 changes: 88 additions & 66 deletions binance/websocket/binance_socket_manager.py
Original file line number Diff line number Diff line change
@@ -1,82 +1,104 @@
import json
import logging
import threading
from urllib.parse import urlparse
from twisted.internet import reactor, ssl
from twisted.internet.error import ReactorAlreadyRunning
from autobahn.twisted.websocket import WebSocketClientFactory, connectWS
from binance.websocket.binance_client_protocol import BinanceClientProtocol
from binance.websocket.binance_client_factory import BinanceClientFactory
from websocket import (
ABNF,
create_connection,
WebSocketException,
WebSocketConnectionClosedException,
)


class BinanceSocketManager(threading.Thread):
def __init__(self, stream_url):
def __init__(
self,
stream_url,
on_message=None,
on_open=None,
on_close=None,
on_error=None,
on_ping=None,
on_pong=None,
logger=None,
):
threading.Thread.__init__(self)

self.factories = {}
self._connected_event = threading.Event()
if not logger:
logger = logging.getLogger(__name__)
self.logger = logger
self.stream_url = stream_url
self._conns = {}
self._user_callback = None

def _start_socket(
self, stream_name, payload, callback, is_combined=False, is_live=True
):
if stream_name in self._conns:
return False

if is_combined:
factory_url = self.stream_url + "/stream"
else:
factory_url = self.stream_url + "/ws"

if not is_live:
payload_obj = json.loads(payload.decode("utf8"))

if is_combined:
factory_url = factory_url + "?streams=" + payload_obj["params"]
else:
factory_url = factory_url + "/" + payload_obj["params"]
payload = None

logging.info("Connection with URL: {}".format(factory_url))
self.on_message = on_message
self.on_open = on_open
self.on_close = on_close
self.on_ping = on_ping
self.on_pong = on_pong
self.on_error = on_error
self.create_ws_connection()

factory = BinanceClientFactory(factory_url, payload=payload)
factory.base_client = self
factory.protocol = BinanceClientProtocol
factory.setProtocolOptions(
openHandshakeTimeout=5, autoPingInterval=300, autoPingTimeout=5
def create_ws_connection(self):
self.logger.debug(
"Creating connection with WebSocket Server: %s", self.stream_url
)
factory.callback = callback
self.factories[stream_name] = factory
reactor.callFromThread(self.add_connection, stream_name, self.stream_url)
self.ws = create_connection(self.stream_url)
self.logger.debug(
"WebSocket connection has been established: %s", self.stream_url
)
self._callback(self.on_open)

def add_connection(self, stream_name, url):
if not url.startswith("wss://"):
raise ValueError("expected wss:// URL prefix")
def run(self):
self.read_data()

factory = self.factories[stream_name]
options = ssl.optionsForClientTLS(hostname=urlparse(url).hostname)
self._conns[stream_name] = connectWS(factory, options)
def send_message(self, message):
self.logger.debug("Sending message to Binance WebSocket Server: %s", message)
self.ws.send(message)

def stop_socket(self, conn_key):
if conn_key not in self._conns:
return
def ping(self):
self.ws.ping()

# disable reconnecting if we are closing
self._conns[conn_key].factory = WebSocketClientFactory(self.stream_url)
self._conns[conn_key].disconnect()
del self._conns[conn_key]
def read_data(self):
data = ""
while True:
try:
op_code, frame = self.ws.recv_data_frame(True)
except WebSocketException as e:
if isinstance(e, WebSocketConnectionClosedException):
self.logger.error("Lost websocket connection")
else:
self.logger.error("Websocket exception: {}".format(e))
raise e
except Exception as e:
self.logger.error("Exception in read_data: {}".format(e))
raise e

def run(self):
try:
reactor.run(installSignalHandlers=False)
except ReactorAlreadyRunning:
# Ignore error about reactor already running
pass
if op_code == ABNF.OPCODE_CLOSE:
self.logger.warning(
"CLOSE frame received, closing websocket connection"
)
self._callback(self.on_close)
break
elif op_code == ABNF.OPCODE_PING:
self._callback(self.on_ping, frame.data)
self.ws.pong("")
self.logger.debug("Received Ping; PONG frame sent back")
elif op_code == ABNF.OPCODE_PONG:
self.logger.debug("Received PONG frame")
self._callback(self.on_pong)
else:
data = frame.data
if op_code == ABNF.OPCODE_TEXT:
data = data.decode("utf-8")
self._callback(self.on_message, data)

def close(self):
keys = set(self._conns.keys())
for key in keys:
self.stop_socket(key)
self._conns = {}
if not self.ws.connected:
self.logger.warn("Websocket already closed")
else:
self.ws.send_close()
return

def _callback(self, callback, *args):
if callback:
try:
callback(self, *args)
except Exception as e:
self.logger.error("Error from callback {}: {}".format(callback, e))
if self.on_error:
self.on_error(self, e)
Loading

0 comments on commit c0472bb

Please sign in to comment.