From 8b19f71dc505ac73d24561293361dde29766d98e Mon Sep 17 00:00:00 2001 From: Atem18 Date: Tue, 29 Mar 2022 21:47:21 +0200 Subject: [PATCH] feat(websocket): add timeout and sleep time to prevent issues --- README.md | 7 ++++ kraky/ws.py | 85 +++++++++++++++++++++++++++++++++++------------ pyproject.toml | 2 +- tests/test_api.py | 1 - 4 files changed, 72 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index b4edbd4..ec0b914 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,13 @@ Python client for Kraken API REST and Kraken Websockets API using httpx and websockets. Supports both sync and async for API REST. +## Disclaimer +This software is for educational purposes only. Do not risk money which you are afraid to lose. USE THE SOFTWARE AT YOUR OWN RISK. THE AUTHORS AND ALL AFFILIATES ASSUME NO RESPONSIBILITY FOR YOUR TRADING RESULTS. + +Always start by running a trading bot in Dry-run and do not engage money before you understand how it works and what profit/loss you should expect. + +We strongly recommend you to have coding and Python knowledge. Do not hesitate to read the source code and understand the mechanism of this library. + ## Installation pip install kraky diff --git a/kraky/ws.py b/kraky/ws.py index e111f22..17488e8 100644 --- a/kraky/ws.py +++ b/kraky/ws.py @@ -13,7 +13,9 @@ class KrakyWsClient: """Kraken Websocket client implementation""" - def __init__(self, connection_env: str = "production", logging_level: str = "INFO") -> None: + def __init__( + self, connection_env: str = "production", logging_level: str = "INFO" + ) -> None: """ Initialize the object. @@ -25,13 +27,23 @@ def __init__(self, connection_env: str = "production", logging_level: str = "INF self.connections: dict = {} self.logger = get_module_logger(__name__, logging_level) - async def connect(self, handler: Callable, connection_name: str = "main") -> None: + async def connect( + self, + handler: Callable, + connection_name: str = "main", + reply_timeout: int = 10, + ping_timeout: int = 5, + sleep_time: int = 5, + ) -> None: """ Main function to be called. Arguments: handler: A function that will manage WS's messages connection_name: Give it a proper name to distinguish between the public and private WS + reply_timeout: Timeout after x seconds if no message on websocket + ping_timeout: Timeout after x seconds if no pong is received + sleep_time: Reconnects after x seconds when connection is dropped """ if self.connection_env == "production": ws_url = "wss://ws.kraken.com" @@ -60,24 +72,51 @@ async def connect(self, handler: Callable, connection_name: str = "main") -> Non connection_name=connection_name, ) else: - message = await websocket.recv() - if "errorMessage" in message: - error = json.loads(message) - self.logger.error(error["errorMessage"]) - else: - data = json.loads(message) - await handler(data) + try: + message = await asyncio.wait_for( + websocket.recv(), timeout=reply_timeout + ) + if "errorMessage" in message: + error = json.loads(message) + self.logger.error(error["errorMessage"]) + else: + data = json.loads(message) + await handler(data) + except asyncio.TimeoutError: + try: + pong = await websocket.ping() + await asyncio.wait_for(pong, timeout=ping_timeout) + self.logger.debug("Ping OK - keeping connection alive.") + continue + except: + self.logger.debug( + f"Ping error - retrying connection in {sleep_time} sec." + ) + await asyncio.sleep(sleep_time) + break except socket.gaierror: - self.logger.debug("Socket gaia error, let's reconnect anyway...") + self.logger.debug( + f"Socket gaia error - retrying connection in {sleep_time} sec." + ) + await asyncio.sleep(sleep_time) continue except websockets.exceptions.ConnectionClosedError: - self.logger.debug("WebSockets connection closed error, let's reconnect anyway...") + self.logger.debug( + f"WebSockets connection closed error - retrying connection in {sleep_time} sec." + ) + await asyncio.sleep(sleep_time) continue except websockets.exceptions.ConnectionClosedOK: - self.logger.debug("WebSockets connection closed ok, let's reconnect anyway...") + self.logger.debug( + f"WebSockets connection closed ok - retrying connection in {sleep_time} sec." + ) + await asyncio.sleep(sleep_time) continue except ConnectionResetError: - self.logger.debug("Connection reset error, let's reconnect anyway...") + self.logger.debug( + f"Connection reset error - retrying connection in {sleep_time} sec." + ) + await asyncio.sleep(sleep_time) continue async def disconnect(self, connection_name: str = "main") -> None: @@ -91,13 +130,17 @@ async def disconnect(self, connection_name: str = "main") -> None: try: await self.connections[connection_name]["websocket"].close() except socket.gaierror: - self.logger.debug("Socket gaia error, let's disconnect anyway...") + self.logger.debug("Socket gaia error - disconnecting anyway.") except websockets.exceptions.ConnectionClosedError: - self.logger.debug("WebSockets connection closed error, let's disconnect anyway...") + self.logger.debug( + "WebSockets connection closed error - disconnecting anyway." + ) except websockets.exceptions.ConnectionClosedOK: - self.logger.debug("WebSockets connection closed ok, let's disconnect anyway...") + self.logger.debug( + "WebSockets connection closed ok - disconnecting anyway." + ) except ConnectionResetError: - self.logger.debug("Connection reset error, let's disconnect anyway...") + self.logger.debug("Connection reset error - disconnecting anyway.") del self.connections[connection_name] async def _send(self, data: dict, connection_name: str = "main") -> None: @@ -106,13 +149,13 @@ async def _send(self, data: dict, connection_name: str = "main") -> None: try: await self.connections[connection_name]["websocket"].send(json.dumps(data)) except socket.gaierror: - self.logger.debug("Socket gaia error, message not sent...") + self.logger.debug("Socket gaia error - message not sent.") except websockets.exceptions.ConnectionClosedError: - self.logger.debug("WebSockets connection closed error, message not sent...") + self.logger.debug("WebSockets connection closed error - message not sent.") except websockets.exceptions.ConnectionClosedOK: - self.logger.debug("WebSockets connection closed ok, message not sent...") + self.logger.debug("WebSockets connection closed ok - message not sent.") except ConnectionResetError: - self.logger.debug("Connection reset error, message not sent...") + self.logger.debug("Connection reset error - message not sent.") async def ping(self, reqid: int = None, connection_name: str = "main") -> None: """ diff --git a/pyproject.toml b/pyproject.toml index d0a946c..983d3f3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "kraky" -version = "2022.03.26" +version = "2022.03.29" description = "Python client for Kraken API REST and Kraken Websockets API using httpx and websockets. Supports both sync and async for API REST." authors = ["Atem18 "] license = "MIT" diff --git a/tests/test_api.py b/tests/test_api.py index 623db06..b52ff6e 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -5,7 +5,6 @@ class TestKrakyApiClient: - def test_get_last_price(self): result = kraky_api_client.get_last_price("xbtusd") assert isinstance(result, float)