
How to restart a coroutine after a websocket stream stops receiving data? (1006 error code) #14086

Closed
Kinzowa opened this issue Jun 27, 2022 · 15 comments

@Kinzowa

Kinzowa commented Jun 27, 2022

Hello,

I'm writing an asyncio application to monitor prices and trade/order events, but for an unknown reason some streams stop receiving data after a few hours. I'm not familiar with the asyncio package and I would appreciate help in finding a solution.

Basically, the code below establishes websocket connections with Binance to listen to the streams of six symbols (ETH/USD, BTC/USD, BNB/USD for both spot and futures) and to trade/order events from two accounts (user1, user2). The application uses the ccxtpro library. The public method watch_ohlcv gets price streams, while the private methods watchMyTrades and watchOrders get new order and trade events at the account level.

The problem is that one or several streams are interrupted after a few hours, and the response object becomes empty or None. I would like to detect these streams and restart them after they stop working. How can I do that?

# tasks.py
@app.task(bind=True, name='Start websocket loops')
def start_ws_loops(self):
    ws_loops()

# methods.py
import asyncio
from asyncio import gather

def ws_loops():

    async def method_loop(client, exid, wallet, method, private, args):

        exchange = Exchange.objects.get(exid=exid)

        if private:
            account = args['account']
        else:
            symbol = args['symbol']

        while True:
            try:

                if private:
                    response = await getattr(client, method)()
                    if method == 'watchMyTrades':
                        do_stuff(response)

                    elif method == 'watchOrders':
                        do_stuff(response)

                else:
                    response = await getattr(client, method)(**args)  # <-------- Empty object after some time!?
                    if method == 'watch_ohlcv':
                        do_stuff(response)

                # await asyncio.sleep(3)

            except Exception as e:
                print(str(e))
                break
        
        await client.close()

    async def clients_loop(loop, dic):

        exid = dic['exid']
        wallet = dic['wallet']
        method = dic['method']
        private = dic['private']
        args = dic['args']

        exchange = Exchange.objects.get(exid=exid)
        parameters = {'enableRateLimit': True, 'asyncio_loop': loop, 'newUpdates': True}

        if private:
            log.info('Initialize private instance')
            account = args['account']
            client = exchange.get_ccxt_client_pro(parameters, wallet=wallet, account=account)

        else:
            log.info('Initialize public instance')
            client = exchange.get_ccxt_client_pro(parameters, wallet=wallet)

        mloop = method_loop(client, exid, wallet, method, private, args)
        await gather(mloop)
        await client.close()

    async def main(loop):

        lst = []
        private = ['watchMyTrades', 'watchOrders']
        public = ['watch_ohlcv']

        for exid in ['binance']:
            for wallet in ['spot', 'future']:
                
                # Private
                for method in private:
                    for account in ['user1', 'user2']:
                        lst.append(dict(exid=exid,
                                        wallet=wallet,
                                        method=method,
                                        private=True,
                                        args=dict(account=account)
                                        ))
                
                # Public
                for method in public:
                    for symbol in ['ETH/USD', 'BTC/USD', 'BNB/USD']:
                        lst.append(dict(exid=exid,
                                        wallet=wallet,
                                        method=method,
                                        private=False,
                                        args=dict(symbol=symbol,
                                                  timeframe='5m',
                                                  limit=1
                                                  )
                                        ))

        loops = [clients_loop(loop, dic) for dic in lst]
        await gather(*loops)

    loop = asyncio.new_event_loop()
    loop.run_until_complete(main(loop))
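In the loop above, a watch_* call that never resolves keeps its coroutine blocked indefinitely, and any exception ends the loop through `break`. One way to detect a public stream that has gone quiet is to bound each call with `asyncio.wait_for`, so that silence turns into an exception you can handle like a 1006. This is a minimal sketch: `watch_with_deadline`, `StreamStalled` and `fake_watch_ohlcv` are hypothetical names (the fake stands in for `client.watch_ohlcv`), and whether cancelling a pending ccxt pro watch call leaves the client in a clean state is an assumption; if in doubt, restart the whole client after a timeout. It only suits streams with regular updates, not private order/trade streams that can legitimately stay silent for hours.

```python
import asyncio


class StreamStalled(Exception):
    """Raised when a watch call produces nothing within the allowed time."""


async def watch_with_deadline(watch, deadline_s, **kwargs):
    # Bound one watch_* call; wait_for cancels the inner await on timeout.
    try:
        return await asyncio.wait_for(watch(**kwargs), timeout=deadline_s)
    except asyncio.TimeoutError:
        raise StreamStalled(f"no update within {deadline_s}s") from None


async def fake_watch_ohlcv(symbol, delay):
    # Hypothetical stand-in for client.watch_ohlcv: waits, then returns one candle.
    await asyncio.sleep(delay)
    return [[0, 1.0, 1.0, 1.0, 1.0, 0.0]]


async def demo():
    ok = await watch_with_deadline(fake_watch_ohlcv, 0.5, symbol="ETH/USD", delay=0.01)
    try:
        await watch_with_deadline(fake_watch_ohlcv, 0.05, symbol="ETH/USD", delay=1.0)
        stalled = False
    except StreamStalled:
        stalled = True
    return ok, stalled


ok, stalled = asyncio.run(demo())
print(len(ok), stalled)
```

Choose the deadline comfortably above the normal update interval of the stream, otherwise quiet but healthy markets will trigger false restarts.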
@Kinzowa Kinzowa changed the title How to restart a coroutine after a websocket stream stops receiving data? How to restart a coroutine after a websocket stream stops receiving data? (CCXT Pro) Jun 27, 2022
@carlosmiei carlosmiei self-assigned this Jun 27, 2022
@carlosmiei
Collaborator

Hello @Kinzowa,

First, stalled streams are not expected after running for some time; we will take a look to see whether it is related to our code or to the exchange's server.

Second, currently, there is no good way to restart specific streams (unsubscribe is not yet implemented) so the only thing you can do is shut down all the connections in the current instance upon detecting a stalled stream/invalid response/etc (by invoking await exchange.close()) and re-subscribe everything again.
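The close-and-resubscribe approach described above can be sketched as a small supervisor. This is a minimal sketch under stated assumptions: `make_client` is a hypothetical factory that builds a fresh ccxt pro instance (for example, a wrapper around the `exchange.get_ccxt_client_pro(...)` call from the question), each watcher is a coroutine that loops over one `watch_*` method, and `FakeClient`, `flaky_watcher` and `steady_watcher` are hypothetical stand-ins so the sketch runs without an exchange.

```python
import asyncio


async def run_client_with_restarts(make_client, watchers, max_restarts=3, backoff_s=1.0):
    """Hypothetical helper: run all watchers on one client; on the first failure,
    close that client and re-subscribe every watcher on a fresh instance."""
    restarts = 0
    while True:
        client = make_client()
        tasks = [asyncio.create_task(watch(client)) for watch in watchers]
        try:
            # Default gather(): the first exception propagates here.
            await asyncio.gather(*tasks)
            return restarts  # every watcher finished normally
        except Exception as exc:
            print(f"stream failed ({exc!r}); closing client and resubscribing everything")
        finally:
            for task in tasks:
                task.cancel()  # stop the watchers that are still running
            await asyncio.gather(*tasks, return_exceptions=True)
            await client.close()
        restarts += 1
        if restarts > max_restarts:
            raise RuntimeError(f"gave up after {max_restarts} restarts")
        await asyncio.sleep(backoff_s)


class FakeClient:
    """Hypothetical stand-in for a ccxt pro client; only close() is used."""
    instances = 0

    def __init__(self):
        FakeClient.instances += 1

    async def close(self):
        pass


async def flaky_watcher(client):
    # The first client instance "drops" with a 1006-style error; later ones succeed.
    await asyncio.sleep(0.01)
    if FakeClient.instances == 1:
        raise ConnectionError("1006")


async def steady_watcher(client):
    await asyncio.sleep(0.02)


restarts = asyncio.run(
    run_client_with_restarts(FakeClient, [flaky_watcher, steady_watcher], backoff_s=0)
)
print(restarts, FakeClient.instances)
```

Cancelling and awaiting the remaining tasks before `close()` keeps old watchers from running against a closed client.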

@Kinzowa
Author

Kinzowa commented Jun 27, 2022

Hello @carlosmiei, thank you for your response. Please let us know if you find anything in the code that could cause the stream to crash. On my end, I'll try to catch an exception or something that might give a clue as to what's going on.

@Kinzowa
Author

Kinzowa commented Jun 28, 2022

Hi @carlosmiei, just to let you know that I caught an exception with error code 1006. I will get the traceback next time.

@carlosmiei
Collaborator

@Kinzowa thank you

@Kinzowa
Author

Kinzowa commented Jun 28, 2022

Looks like the problem is discussed here and here.

In my case I have very few disconnections (one a day or less), probably because the code runs on a quality server in a datacenter. Also, the code is executed by a Celery worker with concurrency set to 4, whereas the server has only 2 CPUs.

It would be great if a fix could be implemented.

@Kinzowa
Author

Kinzowa commented Jun 29, 2022

Hi @carlosmiei, hi @kroitor,

Please find below the traceback of the exceptions that occurred this morning in the while loop above.

Six exceptions concerned the public method watch_ohlcv() (6 of the 12 markets monitored), and one concerned watch_orders, the only private method affected. The exceptions occurred within an 18-second window. The bot stopped working after this :(

Do you think there is something we could do? Yesterday I configured the Celery worker with only 2 processes instead of 4, to match the number of CPUs, but unfortunately it doesn't prevent the error from occurring.

Thanks!

Jun 29 07:51:17.232288 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] ***********************E********************** [trading.tasks] exid=binance task=f1d wallet=future worker=0
Jun 29 07:51:17.232790 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] 1006                                        [trading.tasks] exid=binance task=f1d wallet=future worker=0
Jun 29 07:51:17.232927 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] Traceback (most recent call last):
Jun 29 07:51:17.232927 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/var/www/myapp/trading/tasks.py", line 1042, in method_loop
Jun 29 07:51:17.232927 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     response = await getattr(client, method)(**args)
Jun 29 07:51:17.232927 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/home/username/.local/lib/python3.7/site-packages/ccxtpro/binance.py", line 542, in watch_ohlcv
Jun 29 07:51:17.232927 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     ohlcv = await self.watch(url, messageHash, self.extend(request, query), messageHash, subscribe)
Jun 29 07:51:17.232927 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
Jun 29 07:51:17.232927 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     future.result()
Jun 29 07:51:17.232927 ubuntu-2cpu-4gb-de-fra1 celery[48995]: ccxt.base.errors.NetworkError: 1006
Jun 29 07:51:17.233155 ubuntu-2cpu-4gb-de-fra1 celery[48995]:  [trading.tasks] exid=binance task=f1d wallet=future worker=0
Jun 29 07:51:20.517498 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] ***********************E********************** [trading.tasks] exid=binance task=f1d wallet=spot worker=0
Jun 29 07:51:20.517865 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] 1006                                        [trading.tasks] exid=binance task=f1d wallet=spot worker=0
Jun 29 07:51:20.518039 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] Traceback (most recent call last):
Jun 29 07:51:20.518039 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/var/www/myapp/trading/tasks.py", line 1042, in method_loop
Jun 29 07:51:20.518039 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     response = await getattr(client, method)(**args)
Jun 29 07:51:20.518039 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/home/username/.local/lib/python3.7/site-packages/ccxtpro/binance.py", line 542, in watch_ohlcv
Jun 29 07:51:20.518039 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     ohlcv = await self.watch(url, messageHash, self.extend(request, query), messageHash, subscribe)
Jun 29 07:51:20.518039 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
Jun 29 07:51:20.518039 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     future.result()
Jun 29 07:51:20.518039 ubuntu-2cpu-4gb-de-fra1 celery[48995]: ccxt.base.errors.NetworkError: 1006
Jun 29 07:51:20.518257 ubuntu-2cpu-4gb-de-fra1 celery[48995]:  [trading.tasks] exid=binance task=f1d wallet=spot worker=0
Jun 29 07:51:24.467232 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] ***********************E********************** [trading.tasks] exid=binance task=f1d wallet=spot worker=0
Jun 29 07:51:24.467689 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] 1006                                        [trading.tasks] exid=binance task=f1d wallet=spot worker=0
Jun 29 07:51:24.468076 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] Traceback (most recent call last):
Jun 29 07:51:24.468076 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/var/www/myapp/trading/tasks.py", line 1042, in method_loop
Jun 29 07:51:24.468076 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     response = await getattr(client, method)(**args)
Jun 29 07:51:24.468076 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/home/username/.local/lib/python3.7/site-packages/ccxtpro/binance.py", line 542, in watch_ohlcv
Jun 29 07:51:24.468076 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     ohlcv = await self.watch(url, messageHash, self.extend(request, query), messageHash, subscribe)
Jun 29 07:51:24.468076 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
Jun 29 07:51:24.468076 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     future.result()
Jun 29 07:51:24.468076 ubuntu-2cpu-4gb-de-fra1 celery[48995]: ccxt.base.errors.NetworkError: 1006
Jun 29 07:51:24.468298 ubuntu-2cpu-4gb-de-fra1 celery[48995]:  [trading.tasks] exid=binance task=f1d wallet=spot worker=0
Jun 29 07:51:29.878915 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] ***********************E********************** [trading.tasks] exid=binance task=f1d wallet=future worker=0
Jun 29 07:51:29.879351 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] 1006                                        [trading.tasks] exid=binance task=f1d wallet=future worker=0
Jun 29 07:51:29.879751 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] Traceback (most recent call last):
Jun 29 07:51:29.879751 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/var/www/myapp/trading/tasks.py", line 1042, in method_loop
Jun 29 07:51:29.879751 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     response = await getattr(client, method)(**args)
Jun 29 07:51:29.879751 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/home/username/.local/lib/python3.7/site-packages/ccxtpro/binance.py", line 542, in watch_ohlcv
Jun 29 07:51:29.879751 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     ohlcv = await self.watch(url, messageHash, self.extend(request, query), messageHash, subscribe)
Jun 29 07:51:29.879751 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
Jun 29 07:51:29.879751 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     future.result()
Jun 29 07:51:29.879751 ubuntu-2cpu-4gb-de-fra1 celery[48995]: ccxt.base.errors.NetworkError: 1006
Jun 29 07:51:29.880089 ubuntu-2cpu-4gb-de-fra1 celery[48995]:  [trading.tasks] exid=binance task=f1d wallet=future worker=0
Jun 29 07:51:35.176635 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] ***********************E********************** [trading.tasks] exid=binance task=f1d wallet=spot worker=0
Jun 29 07:51:35.176966 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] 1006                                        [trading.tasks] exid=binance task=f1d wallet=spot worker=0
Jun 29 07:51:35.177273 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] Traceback (most recent call last):
Jun 29 07:51:35.177273 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/var/www/myapp/trading/tasks.py", line 1019, in method_loop
Jun 29 07:51:35.177273 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     response = await getattr(client, method)()
Jun 29 07:51:35.177273 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/home/username/.local/lib/python3.7/site-packages/ccxtpro/binance.py", line 918, in watch_orders
Jun 29 07:51:35.177273 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     orders = await self.watch(url, messageHash, message, type)
Jun 29 07:51:35.177273 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
Jun 29 07:51:35.177273 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     future.result()
Jun 29 07:51:35.177273 ubuntu-2cpu-4gb-de-fra1 celery[48995]: ccxt.base.errors.NetworkError: 1006
Jun 29 07:51:35.177596 ubuntu-2cpu-4gb-de-fra1 celery[48995]:  [trading.tasks] exid=binance task=f1d wallet=spot worker=0
Jun 29 07:51:35.197866 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] ***********************E********************** [trading.tasks] exid=binance task=f1d wallet=spot worker=0
Jun 29 07:51:35.198235 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] 1006                                        [trading.tasks] exid=binance task=f1d wallet=spot worker=0
Jun 29 07:51:35.198540 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] Traceback (most recent call last):
Jun 29 07:51:35.198540 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/var/www/myapp/trading/tasks.py", line 1042, in method_loop
Jun 29 07:51:35.198540 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     response = await getattr(client, method)(**args)
Jun 29 07:51:35.198540 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/home/username/.local/lib/python3.7/site-packages/ccxtpro/binance.py", line 542, in watch_ohlcv
Jun 29 07:51:35.198540 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     ohlcv = await self.watch(url, messageHash, self.extend(request, query), messageHash, subscribe)
Jun 29 07:51:35.198540 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
Jun 29 07:51:35.198540 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     future.result()
Jun 29 07:51:35.198540 ubuntu-2cpu-4gb-de-fra1 celery[48995]: ccxt.base.errors.NetworkError: 1006
Jun 29 07:51:35.198778 ubuntu-2cpu-4gb-de-fra1 celery[48995]:  [trading.tasks] exid=binance task=f1d wallet=spot worker=0
Jun 29 07:51:35.199001 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] ***********************E********************** [trading.tasks] exid=binance task=f1d wallet=spot worker=0
Jun 29 07:51:35.199249 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] 1006                                        [trading.tasks] exid=binance task=f1d wallet=spot worker=0
Jun 29 07:51:35.199506 ubuntu-2cpu-4gb-de-fra1 celery[48995]: [error    ] Traceback (most recent call last):
Jun 29 07:51:35.199506 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/var/www/myapp/trading/tasks.py", line 1042, in method_loop
Jun 29 07:51:35.199506 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     response = await getattr(client, method)(**args)
Jun 29 07:51:35.199506 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/home/username/.local/lib/python3.7/site-packages/ccxtpro/binance.py", line 542, in watch_ohlcv
Jun 29 07:51:35.199506 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     ohlcv = await self.watch(url, messageHash, self.extend(request, query), messageHash, subscribe)
Jun 29 07:51:35.199506 ubuntu-2cpu-4gb-de-fra1 celery[48995]:   File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
Jun 29 07:51:35.199506 ubuntu-2cpu-4gb-de-fra1 celery[48995]:     future.result()
Jun 29 07:51:35.199506 ubuntu-2cpu-4gb-de-fra1 celery[48995]: ccxt.base.errors.NetworkError: 1006

@ShieldTrade

Hi @Kinzowa, let me share with you my experience since I am dealing with the same problem.

it is not expected to get stalled streams after some time running it.

Unfortunately, practice and theory are different, and error 1006 happens quite often. I am using Binance, OKX, Bitmex and BTSE (BTSE is not supported by CCXT), and my code runs on an AWS server, so I should not have any connection issues. Binance and OKX are the worst as far as error 1006 is concerned. Honestly, after researching it on Google, I have only understood that 1006 is a NetworkError and that CCXT tries to resubscribe to the channel automatically. All the other explanations I found online did not convince me. If someone from the CCXT staff could give us more info about this error, I think the community would appreciate it.
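For reference on what 1006 itself means: RFC 6455 (section 7.4.1) reserves 1006 for a connection that closed abnormally, that is, without a Close frame being sent or received. It is never sent on the wire; the WebSocket library reports it locally, and here it shows up as `ccxt.base.errors.NetworkError: 1006` in the traceback above. So the code tells you the connection dropped, not who dropped it or why. The lookup table below is a small illustrative sketch covering a subset of the RFC codes; `CLOSE_CODES`, `LOCAL_ONLY` and `describe_close_code` are hypothetical names, not part of ccxt.

```python
# Close codes defined in RFC 6455, section 7.4.1 (subset).
CLOSE_CODES = {
    1000: "normal closure",
    1001: "going away (e.g. server going down)",
    1002: "protocol error",
    1003: "unsupported data type",
    1005: "no status code present",
    1006: "abnormal closure: dropped without a Close frame",
    1007: "invalid frame payload data",
    1008: "policy violation",
    1009: "message too big",
    1010: "client expected an extension the server did not negotiate",
    1011: "server hit an unexpected condition",
    1015: "TLS handshake failure",
}

# Reserved codes that a library only reports locally; no peer ever sends them.
LOCAL_ONLY = {1005, 1006, 1015}


def describe_close_code(code: int) -> str:
    text = CLOSE_CODES.get(code, "unknown or application-defined code")
    origin = "reported locally" if code in LOCAL_ONLY else "sent in a Close frame"
    return f"{code}: {text} ({origin})"


print(describe_close_code(1006))
```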

In any case, every time an exception is raised, I put it in an exception_list as a dictionary containing info like the time in ms, method, exchange, description, etc. The exception_list is then passed to a handle_exception method. If the list contains two 1006 exceptions within X time, handle_exception reports that we are not in sync with market data and trading must stop. I then cancel all my limit orders and emit a beep (calling for human intervention).
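The "two 1006 errors within X time" rule above can be implemented as a sliding window over error timestamps. This is a minimal sketch, not ShieldTrade's actual code: `ErrorWindow` is a hypothetical class, the 60-second window is an arbitrary example value for X, and the caller is assumed to record only 1006 errors.

```python
import time
from collections import deque


class ErrorWindow:
    """Hypothetical helper: flags when `threshold` errors occur within `window_s` seconds."""

    def __init__(self, threshold=2, window_s=60.0, clock=time.monotonic):
        self.threshold = threshold
        self.window_s = window_s
        self.clock = clock
        self.events = deque()  # (timestamp, description) pairs, oldest first

    def record(self, description, now=None):
        now = self.clock() if now is None else now
        self.events.append((now, description))
        # Drop events that have fallen out of the window.
        while self.events and now - self.events[0][0] > self.window_s:
            self.events.popleft()
        return len(self.events) >= self.threshold  # True means "stop trading"


w = ErrorWindow(threshold=2, window_s=60.0)
print(w.record("binance watch_ohlcv 1006", now=0.0))    # False
print(w.record("binance watch_orders 1006", now=120.0))  # False, first event expired
print(w.record("okx watch_ohlcv 1006", now=150.0))       # True, two within 60 s
```

Passing `now` explicitly keeps the example deterministic; in production the monotonic clock is used.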

As for your second question:

restart these streams after they stops working, how can I do that

Remember that you are Running Tasks Concurrently:

If return_exceptions is False (default), the first raised exception is immediately propagated to the task that awaits on gather(). Other awaitables in the aws sequence won’t be cancelled and will continue to run.
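The quoted `gather()` behaviour can be checked directly with stdlib asyncio. In this sketch, `stream` is a hypothetical stand-in for a watch loop: with the default `return_exceptions=False` the first error reaches the awaiting coroutine while the other task keeps running, and with `return_exceptions=True` errors come back as ordinary results.

```python
import asyncio

finished = []


async def stream(name, delay, fail=False):
    # Hypothetical stand-in for a watch loop that either fails or completes.
    await asyncio.sleep(delay)
    if fail:
        raise ConnectionError(f"{name}: 1006")
    finished.append(name)
    return name


async def main():
    # Default: the first exception propagates to the awaiting coroutine...
    try:
        await asyncio.gather(stream("a", 0.01, fail=True), stream("b", 0.05))
    except ConnectionError as exc:
        first_error = str(exc)
    # ...but "b" was not cancelled and still finishes in the background.
    await asyncio.sleep(0.1)
    after_default = list(finished)

    # return_exceptions=True: exceptions are returned as results instead.
    results = await asyncio.gather(
        stream("c", 0.01, fail=True), stream("d", 0.02), return_exceptions=True
    )
    return first_error, after_default, results


first_error, after_default, results = asyncio.run(main())
print(first_error)                          # a: 1006
print(after_default)                        # ['b']
print([type(r).__name__ for r in results])  # ['ConnectionError', 'str']
```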

Here you can find info about restarting an individual task in a gather().

In your case, since you are using a single exchange (Binance) and unsubscribe is not implemented, as @carlosmiei pointed out you will have to close the connection and restart all the tasks. You can still use the above to automate it. If you are using more than one exchange, you can design your code so that you close and restart only the exchange that failed.

Another option would be to define the tasks with more granularity in main(), so that every task is tied to a single, well-defined exchange/user/method/symbol and subscribes to a single channel. This results in more verbose and less elegant code, but it helps you catch the exception and, if needed, restart only a specific coroutine.
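A per-task restart could look like the sketch below. It relies on the assumption discussed next (that a channel can be resubscribed after a 1006 without unsubscribing first), which @carlosmiei's comment above suggests may not hold, so treat it as an option to verify rather than a fix. `supervise`, `watch_eth` and `watch_btc` are hypothetical names; `supervise` takes a factory because a coroutine object can only be awaited once.

```python
import asyncio


async def supervise(name, make_coro, max_restarts=5, backoff_s=1.0):
    """Hypothetical helper: re-run one stream coroutine whenever it raises."""
    attempts = 0
    while True:
        try:
            return await make_coro()  # a fresh coroutine object on every attempt
        except Exception as exc:
            attempts += 1
            if attempts > max_restarts:
                raise
            print(f"{name} failed ({exc}); restart {attempts}/{max_restarts}")
            await asyncio.sleep(backoff_s * attempts)  # linear backoff


calls = {"ETH/USD": 0}


async def watch_eth():
    # Fails twice with a 1006-style error, then succeeds.
    calls["ETH/USD"] += 1
    if calls["ETH/USD"] < 3:
        raise ConnectionError("1006")
    return "ETH/USD ok"


async def watch_btc():
    return "BTC/USD ok"


async def main():
    return await asyncio.gather(
        supervise("ETH/USD", watch_eth, backoff_s=0),
        supervise("BTC/USD", watch_btc, backoff_s=0),
    )


results = asyncio.run(main())
print(results)  # ['ETH/USD ok', 'BTC/USD ok']
```

Because each symbol has its own supervisor, a failure in the ETH/USD stream never reaches the BTC/USD one.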

I am obviously assuming that after error 1006 you can resubscribe to a channel without unsubscribing from it first (does error 1006 unsubscribe a channel?).

final thought:

never leave a robot unattended

Professional market makers with a team of engineers working in London do not go to the pub while their algos (usually co-located with the exchange) execute thousands of trades.

I hope this can help you and I would be happy to continue the discussion.

@Kinzowa
Author

Kinzowa commented Jul 1, 2022

Hi @ShieldTrade, first, thank you for your suggestions and information. The SF link you pointed out for restarting an individual task is much appreciated, although I'm still a beginner with asyncio and it's not easy to tackle.

I must say that I implemented an automatic ping to the exchange, and since then I haven't had a 1006 disconnection. So far so good! This is the task I execute every 5 seconds with the Celery beat scheduler:

import subprocess
from bot.celery import app

@app.task(name='Ping host')
def ping():
    host = 'api.binance.com'
    # Send a single ICMP echo request and discard the output;
    # "-c" and its count are passed as separate arguments.
    subprocess.run(['/bin/ping', '-c', '1', host],
                   stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
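If you prefer to keep a periodic action inside the same process and event loop as the streams, rather than in a separate Celery beat task, a small scheduler coroutine is enough. This is a sketch: `every` and `fake_ping` are hypothetical names, and a real action could, for example, launch the same `ping` command with `asyncio.create_subprocess_exec`. Note that an ICMP echo travels outside the WebSocket connection, so it is not the same thing as the ping/pong keepalive frames exchanged on the socket itself.

```python
import asyncio


async def every(interval_s, action, stop):
    """Run `action()` every `interval_s` seconds until `stop` is set."""
    while not stop.is_set():
        await action()
        try:
            # Sleep for one interval, but wake up early if stop is set.
            await asyncio.wait_for(stop.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            pass


async def main():
    ticks = []
    stop = asyncio.Event()

    async def fake_ping():  # hypothetical placeholder for a real health check
        ticks.append(asyncio.get_running_loop().time())

    task = asyncio.create_task(every(0.01, fake_ping, stop))
    await asyncio.sleep(0.055)
    stop.set()
    await task
    return len(ticks)


n = asyncio.run(main())
print(n >= 3)  # True
```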

@ShieldTrade

@Kinzowa, CCXT has its own keepalive mechanism which sends ping-pong messages to the exchange. I do not think this is the problem. In any case, let me know if your ping solves the issue. I am also considering this malicious scenario: sometimes exchange servers are too busy and they simply drop a few non-VIP client connections.

@Kinzowa
Author

Kinzowa commented Jul 1, 2022


Hi, sure, I know there is a ping/pong mechanism to keep the connection alive, but maybe there is an issue with the timer. Since I implemented a 5-second ping, I have had no disconnections in the last 2 days. This solution is also discussed in several online forums.

I will post again if/when an occurence happens.

@dadas190

dadas190 commented Jul 2, 2022

How exactly did you implement the ping please?

@Kinzowa
Author

Kinzowa commented Jul 2, 2022

How exactly did you implement the ping please?

#14086 (comment)

I created a Celery task (see above) in my Django application and schedule it every 5 seconds with beat. It seems to solve the issue.

Hi @carlosmiei, do you think it could be implemented natively in ccxt? Or maybe the ping mechanism could be improved for Binance and some other exchanges?

@Kinzowa Kinzowa changed the title How to restart a coroutine after a websocket stream stops receiving data? (CCXT Pro) How to restart a coroutine after a websocket stream stops receiving data? (1006 error code) Jul 2, 2022
@ShieldTrade

@Kinzowa, CCXT has an integer keep-alive rate in milliseconds. You can see it using:

import ccxt

exchange = ccxt.okx()
print(exchange.has)

The default for OKX is 20000 ms. You can override it to 5000 so you would have the same effect as with your Celery task.
Binance does not have this property. I found
'listenKeyRefreshRate': 1200000, # 20 mins
but I honestly do not know what it does. You might be heading in the right direction. Let us know if the problem happens again.
Thanks

@Kinzowa
Author

Kinzowa commented Jul 5, 2022

Just to let you know that I have had zero disconnections since I implemented the periodic ping a few days ago, and I assume it's a working solution. I think this ticket can be closed now.

@Kinzowa Kinzowa closed this as completed Jul 5, 2022
@kroitor
Member

kroitor commented Jul 5, 2022

@Kinzowa thanks for your feedback!
