diff --git a/booker_api/booker_app.py b/booker_api/booker_app.py index 4f5f5d0..9e9e87c 100644 --- a/booker_api/booker_app.py +++ b/booker_api/booker_app.py @@ -23,6 +23,33 @@ def __init__(self): self.gateways_clients = {} self.order_processing: OrdersProcessor + async def check_connections(self): + log.info("Run connections checking...") + while True: + for coin, clients in self.gateways_clients.items(): + for side in clients: + client = clients.get(side) + if not client: + continue + try: + await client.connect( + client._host, client._port, client.ws_rpc_endpoint + ) + await client.disconnect() + if not client.is_successfully_connected: + log.info( + f"{client} client successfully connected to ws://{client._host}:{client._port}/{client.ws_rpc_endpoint}" + ) + client.is_successfully_connected = True + except Exception as ex: + if client.is_successfully_connected in (None, True): + log.warning( + f"{client} client unable to connect ws://{client._host}:{client._port}/{client.ws_rpc_endpoint}: {ex}" + ) + client.is_successfully_connected = False + + await asyncio.sleep(3) + def run(self): loop = asyncio.get_event_loop() signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT) @@ -56,25 +83,17 @@ def run(self): self.gateways_clients[name] = {} log.info(f"Setup {name} gateways clients connections...") for side, params in data.items(): - if params: gw_client = BookerSideClient(name, side, self, params[0], params[1]) self.gateways_clients[name][side] = gw_client - try: - loop.run_until_complete( - gw_client.connect(params[0], params[1], "/ws-rpc") - ) - log.info( - f"{side}{name} client created and ready to connect ws://{params[0]}:{params[1]}/ws-rpc" - ) - loop.run_until_complete(gw_client.disconnect()) - except Exception as ex: - log.warning( - f"{side}{name} created, but unable to connect ws://{params[0]}:{params[1]}/ws-rpc: {ex}" - ) + log.info( + f"{gw_client} created and ready to connect ws://{params[0]}:{params[1]}/ws-rpc" + ) else: log.info(f"{side}{name} client not specified") + loop.create_task(self.check_connections()) + self.order_processing = OrdersProcessor(self) loop.create_task(self.order_processing.run()) diff --git a/booker_api/booker_process_orders_api.py b/booker_api/booker_process_orders_api.py index e9e95be..8b987fe 100644 --- a/booker_api/booker_process_orders_api.py +++ b/booker_api/booker_process_orders_api.py @@ -73,30 +73,34 @@ async def run(self): _in_coin.replace(f"{self.ctx.cfg.exchange_prefix}.", "") ]["native"] - if gw: - log.info( - f"Try {gw} initialize out transaction for {order.order_id}..." - ) + if not gw: + continue - """Gateway must response with TransactionDTO filled max_confirmations and from_address params. - It's a signal that gateway ready to execute new transaction - """ - try: + if not gw.is_successfully_connected: + continue - # TODO take_fee, remove this mock - if order.out_tx.amount == 0 and order.in_tx.amount != 0: - order.out_tx.amount = order.in_tx.amount + log.info( + f"Try {gw} initialize out transaction for {order.order_id}..." + ) - new_tx = await gw.init_new_tx_request(order) + """Gateway must response with TransactionDTO filled max_confirmations and from_address params. + It's a signal that gateway ready to execute new transaction + """ + try: + # TODO take_fee, remove this mock + if order.out_tx.amount == 0 and order.in_tx.amount != 0: + order.out_tx.amount = order.in_tx.amount - assert new_tx.max_confirmations > 0 - assert new_tx.from_address is not None + new_tx = await gw.init_new_tx_request(order) - order.out_tx.max_confirmations = new_tx.max_confirmations - order.out_tx.from_address = new_tx.from_address + assert new_tx.max_confirmations > 0 + assert new_tx.from_address is not None - await safe_update_order(conn, order) - except Exception as ex: - log.info(f"Unable to init new transaction: {new_tx}, {ex}") + order.out_tx.max_confirmations = new_tx.max_confirmations + order.out_tx.from_address = new_tx.from_address + + await safe_update_order(conn, order) + except Exception as ex: + log.info(f"Unable to init new transaction: {new_tx}, {ex}") await asyncio.sleep(1) diff --git a/finteh_proto/client.py b/finteh_proto/client.py index 3d12976..3dfc242 100644 --- a/finteh_proto/client.py +++ b/finteh_proto/client.py @@ -18,6 +18,7 @@ def __init__(self, ctx=None, host="0.0.0.0", port=8080, ws_rpc_endpoint="/ws-rpc self._host = host self._port = port self.ws_rpc_endpoint = ws_rpc_endpoint + self.is_successfully_connected = None async def ping(self): """Just RPC string ping-pong"""