Skip to content

Commit

Permalink
feat(websocket): add timeout and sleep time to prevent issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Atem18 committed Mar 29, 2022
1 parent 0be997c commit 8b19f71
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 23 deletions.
7 changes: 7 additions & 0 deletions README.md
Expand Up @@ -5,6 +5,13 @@
Python client for Kraken API REST and Kraken Websockets API using httpx and websockets.
Supports both sync and async for API REST.

## Disclaimer
This software is for educational purposes only. Do not risk money which you are afraid to lose. USE THE SOFTWARE AT YOUR OWN RISK. THE AUTHORS AND ALL AFFILIATES ASSUME NO RESPONSIBILITY FOR YOUR TRADING RESULTS.

Always start by running a trading bot in Dry-run and do not engage money before you understand how it works and what profit/loss you should expect.

We strongly recommend you to have coding and Python knowledge. Do not hesitate to read the source code and understand the mechanism of this library.

## Installation
pip install kraky

Expand Down
85 changes: 64 additions & 21 deletions kraky/ws.py
Expand Up @@ -13,7 +13,9 @@
class KrakyWsClient:
"""Kraken Websocket client implementation"""

def __init__(self, connection_env: str = "production", logging_level: str = "INFO") -> None:
def __init__(
self, connection_env: str = "production", logging_level: str = "INFO"
) -> None:
"""
Initialize the object.
Expand All @@ -25,13 +27,23 @@ def __init__(self, connection_env: str = "production", logging_level: str = "INF
self.connections: dict = {}
self.logger = get_module_logger(__name__, logging_level)

async def connect(self, handler: Callable, connection_name: str = "main") -> None:
async def connect(
self,
handler: Callable,
connection_name: str = "main",
reply_timeout: int = 10,
ping_timeout: int = 5,
sleep_time: int = 5,
) -> None:
"""
Main function to be called.
Arguments:
handler: A function that will manage WS's messages
connection_name: Give it a proper name to distinguish between the public and private WS
reply_timeout: Timeout after x seconds if no message on websocket
ping_timeout: Timeout after x seconds if no pong is received
sleep_time: Reconnects after x seconds when connection is dropped
"""
if self.connection_env == "production":
ws_url = "wss://ws.kraken.com"
Expand Down Expand Up @@ -60,24 +72,51 @@ async def connect(self, handler: Callable, connection_name: str = "main") -> Non
connection_name=connection_name,
)
else:
message = await websocket.recv()
if "errorMessage" in message:
error = json.loads(message)
self.logger.error(error["errorMessage"])
else:
data = json.loads(message)
await handler(data)
try:
message = await asyncio.wait_for(
websocket.recv(), timeout=reply_timeout
)
if "errorMessage" in message:
error = json.loads(message)
self.logger.error(error["errorMessage"])
else:
data = json.loads(message)
await handler(data)
except asyncio.TimeoutError:
try:
pong = await websocket.ping()
await asyncio.wait_for(pong, timeout=ping_timeout)
self.logger.debug("Ping OK - keeping connection alive.")
continue
except:
self.logger.debug(
f"Ping error - retrying connection in {sleep_time} sec."
)
await asyncio.sleep(sleep_time)
break
except socket.gaierror:
self.logger.debug("Socket gaia error, let's reconnect anyway...")
self.logger.debug(
f"Socket gaia error - retrying connection in {sleep_time} sec."
)
await asyncio.sleep(sleep_time)
continue
except websockets.exceptions.ConnectionClosedError:
self.logger.debug("WebSockets connection closed error, let's reconnect anyway...")
self.logger.debug(
f"WebSockets connection closed error - retrying connection in {sleep_time} sec."
)
await asyncio.sleep(sleep_time)
continue
except websockets.exceptions.ConnectionClosedOK:
self.logger.debug("WebSockets connection closed ok, let's reconnect anyway...")
self.logger.debug(
f"WebSockets connection closed ok - retrying connection in {sleep_time} sec."
)
await asyncio.sleep(sleep_time)
continue
except ConnectionResetError:
self.logger.debug("Connection reset error, let's reconnect anyway...")
self.logger.debug(
f"Connection reset error - retrying connection in {sleep_time} sec."
)
await asyncio.sleep(sleep_time)
continue

async def disconnect(self, connection_name: str = "main") -> None:
Expand All @@ -91,13 +130,17 @@ async def disconnect(self, connection_name: str = "main") -> None:
try:
await self.connections[connection_name]["websocket"].close()
except socket.gaierror:
self.logger.debug("Socket gaia error, let's disconnect anyway...")
self.logger.debug("Socket gaia error - disconnecting anyway.")
except websockets.exceptions.ConnectionClosedError:
self.logger.debug("WebSockets connection closed error, let's disconnect anyway...")
self.logger.debug(
"WebSockets connection closed error - disconnecting anyway."
)
except websockets.exceptions.ConnectionClosedOK:
self.logger.debug("WebSockets connection closed ok, let's disconnect anyway...")
self.logger.debug(
"WebSockets connection closed ok - disconnecting anyway."
)
except ConnectionResetError:
self.logger.debug("Connection reset error, let's disconnect anyway...")
self.logger.debug("Connection reset error - disconnecting anyway.")
del self.connections[connection_name]

async def _send(self, data: dict, connection_name: str = "main") -> None:
Expand All @@ -106,13 +149,13 @@ async def _send(self, data: dict, connection_name: str = "main") -> None:
try:
await self.connections[connection_name]["websocket"].send(json.dumps(data))
except socket.gaierror:
self.logger.debug("Socket gaia error, message not sent...")
self.logger.debug("Socket gaia error - message not sent.")
except websockets.exceptions.ConnectionClosedError:
self.logger.debug("WebSockets connection closed error, message not sent...")
self.logger.debug("WebSockets connection closed error - message not sent.")
except websockets.exceptions.ConnectionClosedOK:
self.logger.debug("WebSockets connection closed ok, message not sent...")
self.logger.debug("WebSockets connection closed ok - message not sent.")
except ConnectionResetError:
self.logger.debug("Connection reset error, message not sent...")
self.logger.debug("Connection reset error - message not sent.")

async def ping(self, reqid: int = None, connection_name: str = "main") -> None:
"""
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "kraky"
version = "2022.03.26"
version = "2022.03.29"
description = "Python client for Kraken API REST and Kraken Websockets API using httpx and websockets. Supports both sync and async for API REST."
authors = ["Atem18 <contact@atemlire.io>"]
license = "MIT"
Expand Down
1 change: 0 additions & 1 deletion tests/test_api.py
Expand Up @@ -5,7 +5,6 @@


class TestKrakyApiClient:

def test_get_last_price(self):
result = kraky_api_client.get_last_price("xbtusd")
assert isinstance(result, float)
Expand Down

0 comments on commit 8b19f71

Please sign in to comment.