WARNING - server - Application instance took too long to shut down and was killed #1119

Open
sohamnavadiya opened this issue Aug 9, 2018 · 37 comments

Comments

@sohamnavadiya

sohamnavadiya commented Aug 9, 2018

The first message succeeds, but further messages throw this exception:

2018-08-09 15:25:32,937 - WARNING - server - Application instance <Task pending coro=<__call__() running at /home/soham/PycharmProjects/lead-generation/venv/lib/python3.5/site-packages/channels/sessions.py:175> wait_for=<Future pending cb=[Task._wakeup()]>> for connection <WebSocketProtocol client=['127.0.0.1', 35996] path=b'/lead/'> took too long to shut down and was killed.

My consumer class

class LeadConsumer(AsyncWebsocketConsumer):
    async def websocket_connect(self, event):
        print("Connected", event)
        self.room_group_name = 'lead_list'

        # join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        await self.accept()

    async def websocket_receive(self, event):
        print("Receive", event)
        await self.send({
            "type": "websocket.send",
            "text": "From receive..."
        })

    async def websocket_disconnect(self, event):
        print("Disconnect", event)
        await self.send({
            "type": "websocket.close"
        })

    async def lead_list(self, event):
        _contact = event['contact']
        _campaign = event['campaign']
        _company = event['company']
        _contact_person = event['contact_person']

        await self.send(text_data=json.dumps({
            'contact': _contact,
            'campaign': _campaign,
            'contact_person': _contact_person,
            'company': _company
        }))

requirements.txt

aioredis==1.1.0
asgiref==2.3.2
async-timeout==2.0.1
attrs==18.1.0
autobahn==18.7.1
Automat==0.7.0
certifi==2018.4.16
channels==2.1.2
channels-redis==2.2.1
chardet==3.0.4
constantly==15.1.0
daphne==2.2.1
diff-match-patch==20121119
Django==2.0.7
django-import-export==1.0.1
django-rest-framework==0.1.0
djangorestframework==3.8.2
docutils==0.14
et-xmlfile==1.0.1
hiredis==0.2.0
hyperlink==18.0.0
idna==2.7
incremental==17.5.0
jdcal==1.4
msgpack==0.5.6
odfpy==1.3.6
openpyxl==2.5.4
PyHamcrest==1.9.0
python-decouple==3.1
pytz==2018.5
PyYAML==3.13
requests==2.19.1
six==1.11.0
tablib==0.12.1
Twisted==18.7.0
txaio==18.7.1
unicodecsv==0.14.1
urllib3==1.23
xlrd==1.1.0
xlwt==1.3.0
zope.interface==4.5.0

How can I solve this issue?
Thanks in advance.

@sohamnavadiya
Author

Here is my log:

System check identified no issues (0 silenced).
August 09, 2018 - 15:38:13
Django version 2.0.7, using settings 'leadgeneration.settings'
Starting ASGI/Channels version 2.1.2 development server at http://0.0.0.0:8082/
Quit the server with CONTROL-C.
2018-08-09 15:38:13,066 - INFO - server - HTTP/2 support not enabled (install the http2 and tls Twisted extras)
2018-08-09 15:38:13,066 - INFO - server - Configuring endpoint tcp:port=8082:interface=0.0.0.0
2018-08-09 15:38:13,067 - INFO - server - Listening on TCP address 0.0.0.0:8082
[2018/08/09 15:38:19] WebSocket HANDSHAKING /lead/ [127.0.0.1:45618]
Connected {'type': 'websocket.connect'}
[2018/08/09 15:38:19] WebSocket CONNECT /lead/ [127.0.0.1:45618]
[2018/08/09 15:38:21] HTTP GET /lead-call/XXXXXXXXXX/ 200 [0.03, 127.0.0.1:45650]
[2018/08/09 15:38:21] WebSocket DISCONNECT /lead/ [127.0.0.1:45618]
Disconnect {'type': 'websocket.disconnect', 'code': 1001}
2018-08-09 15:38:32,087 - WARNING - server - Application instance <Task pending coro=<__call__() running at /home/soham/PycharmProjects/lead-generation/venv/lib/python3.5/site-packages/channels/sessions.py:175> wait_for=<Future pending cb=[Task._wakeup()]>> for connection <WebSocketProtocol client=['127.0.0.1', 45618] path=b'/lead/'> took too long to shut down and was killed.

@andrewgodwin
Member

This means something in your consumer is not exiting after the disconnect (you don't need to send websocket.close, so get rid of that) - it's not a problem in Channels from what I can tell.

If you can trace it down to a clear problem in channels with steps to reproduce from a fresh project, please re-open the bug - you can see more about where to ask questions at http://channels.readthedocs.io/en/latest/support.html

@annshress

annshress commented Oct 3, 2019

In my case, the problem was with Redis.

Check whether there are TCP connections stuck in the CLOSE_WAIT state:

netstat | grep 6379 | fgrep -c CLOSE_WAIT

I had more than 50 connections in that state and had to update Redis.
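
The same check can be sketched in Python, for example for a health script. This is only an illustration: count_close_wait is a hypothetical helper, the sample text is made up, and netstat's column layout varies by platform. Unlike the grep above, it only matches addresses that end in the given port.

```python
def count_close_wait(netstat_output, port=6379):
    """Count lines with an address on `port` that are in CLOSE_WAIT state."""
    needle = ":%d" % port
    count = 0
    for line in netstat_output.splitlines():
        fields = line.split()
        if "CLOSE_WAIT" in fields and any(f.endswith(needle) for f in fields):
            count += 1
    return count


# Made-up sample in the usual Linux numeric netstat column layout.
SAMPLE = """\
tcp        1      0 127.0.0.1:51234         127.0.0.1:6379          CLOSE_WAIT
tcp        0      0 127.0.0.1:51240         127.0.0.1:6379          ESTABLISHED
tcp        1      0 127.0.0.1:51250         127.0.0.1:6379          CLOSE_WAIT
tcp        1      0 127.0.0.1:51260         127.0.0.1:8000          CLOSE_WAIT
"""

if __name__ == "__main__":
    print(count_close_wait(SAMPLE))
```

To run it against a live machine, the output of netstat could be captured with subprocess.run and passed in as text.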

@smohsensh

I'm going to say this here as it may help others. In my case the problem was just a silly mistake.
In the code I was maintaining, websocket_disconnect was defined without a call to super(), so StopConsumer() was never raised. The right way is to override the disconnect method, which AsyncWebsocketConsumer calls from inside websocket_disconnect. Simply not overriding websocket_disconnect solved my problem.

The example above has the same pattern. However, I am not sure whether that was already the case back in 2018 or whether this is a newer style in the Channels code.
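
To make the StopConsumer point concrete, here is a minimal stand-alone sketch. It does not import Channels: StopConsumer, BaseConsumer, BrokenConsumer, FixedConsumer and run_disconnect are hypothetical stand-ins. They only mimic the pattern described in this comment, where websocket_disconnect calls disconnect() and then raises StopConsumer.

```python
import asyncio


class StopConsumer(Exception):
    """Stand-in for the exception that signals the consumer is finished."""


class BaseConsumer:
    """Simplified stand-in for the base class, not the real Channels code."""

    async def websocket_disconnect(self, message):
        await self.disconnect(message["code"])
        raise StopConsumer()

    async def disconnect(self, code):
        pass


class BrokenConsumer(BaseConsumer):
    # Overrides the low-level handler without calling super(),
    # so StopConsumer is never raised and the instance keeps waiting.
    async def websocket_disconnect(self, message):
        print("Disconnect", message)


class FixedConsumer(BaseConsumer):
    # Overrides only the hook; the base class still raises StopConsumer.
    async def disconnect(self, code):
        self.close_code = code


async def run_disconnect(consumer):
    """Return True if the consumer signalled that it has stopped."""
    try:
        await consumer.websocket_disconnect(
            {"type": "websocket.disconnect", "code": 1001}
        )
    except StopConsumer:
        return True
    return False
```

In this model, run_disconnect(BrokenConsumer()) returns False because nothing ever tells the caller that the consumer is done, while the FixedConsumer variant returns True.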

@ReadMost

ReadMost commented Nov 4, 2020

I have a similar situation, but not the same
I have:

  • django== 2.2.9
  • channels == 2.4.0
  • daphne == 2.5.0

I am using Redis as backend with the code:

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [(CELERY_BROKER_URL)],
            'capacity': 300
        },
    },
}
ASGI_THREADS = 1000

I am using Daphne for both HTTP and WebSocket.

My asgi file:

import django
django.setup()
import chat.routing
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings')
from channels.http import AsgiHandler
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
application = ProtocolTypeRouter({
	'websocket': AuthMiddlewareStack(
	        URLRouter(
	            chat.routing.websocket_urlpatterns
	        )
	    ),
})

Here is the error message:
Application instance <Task pending coro=<AsgiHandler.__call__() running at /usr/local/lib/python3.6/site-packages/channels/http.py:192> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.6/asyncio/futures.py:403, <TaskWakeupMethWrapper object at 0x7f301c57f738>()]>> for connection <WebRequest at 0x7f301c56a710 method=POST uri=/api/v1/analytics/divided/meteo/ clientproto=HTTP/1.0> took too long to shut down and was killed.
This happens when the front end sends requests to Django. Sometimes the response comes back immediately, sometimes it takes a long time, and frequently the request fails with the error above. I suspected the concurrency of the worker that handles HTTP requests.

It is not the same case as mentioned above, because:

  1. I am using the generic "WebsocketConsumer" because I query data from the database via Django's ORM.
  2. The error comes from AsgiHandler.__call__() on an HTTP request. The WebSocket connection is not even established.

Can you explain why I have this issue? I suspect an incompatibility with the Django version.

@ThreshHNS

I'm facing the same issue in my project.

@MrVhek

MrVhek commented Nov 17, 2020

In my particular case the culprit was asgiref==3.3.1. I rolled back to 3.2.10 and it works well! I have daphne 2.5.0/channels 2.4.0/django 3.0.10/djangorestframework 3.11.1.

@ReadMost

I could not solve the problem the way I wanted. Instead, I run two containers:

  1. The first container runs Django without ASGI (in WSGI mode) via the command ./manage.py runserver --noasgi
  2. The second container runs Django for WebSocket traffic only, through Daphne, via the command daphne -u /tmp/daphne.sock -b 0.0.0.0 -p 8088 app.asgi:application -v 2

I then use Nginx as a reverse proxy to route all requests under the "ws/" path to the second container and the rest to the first container. Here is my conf.template:

map $http_upgrade $connection_upgrade {
    default upgrade;
    '' close;
}

upstream websocket {
    server app_websocket:8088;
}

server {
...............
    location / {
        proxy_pass ${PROXY_PASS};
        proxy_set_header Host ${DOMAIN_NAME};
        #client_max_body_size 100M;
        #proxy_buffering on;
        #proxy_buffers 256 4k;
        #proxy_busy_buffers_size 4k;
        #proxy_read_timeout 60s;
        #proxy_send_timeout 60s;
        #proxy_max_temp_file_size 1024m;
        #proxy_temp_file_write_size 4k;
    }

    location /ws/ {
        proxy_pass http://websocket;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
        proxy_set_header Host ${DOMAIN_NAME};
    }
............
}

@fbove

fbove commented Dec 23, 2020

Rolling back asgiref from 3.3.1 to 3.2.10, as @MrVhek described, solved the issue for us. The app had started to time out requests and emit those warnings.

Pinning dependency versions (pip freeze) seems to be a must.
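
One way to notice version drift early is to compare the pinned versions against what is actually installed. The sketch below is only an illustration: parse_pins, installed_versions and find_mismatches are hypothetical helpers, and the pins are the versions mentioned in this thread. It uses importlib.metadata, so it assumes Python 3.8 or newer.

```python
from importlib import metadata  # standard library since Python 3.8


def parse_pins(text):
    """Parse 'name==version' lines, ignoring blanks, comments and unpinned lines."""
    pins = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "==" not in line:
            continue
        name, version = line.split("==", 1)
        pins[name.strip().lower()] = version.strip()
    return pins


def installed_versions(names):
    """Look up installed versions; packages that are missing map to None."""
    found = {}
    for name in names:
        try:
            found[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            found[name] = None
    return found


def find_mismatches(pins, installed):
    """Return {name: (pinned, installed)} for every package that differs."""
    return {
        name: (wanted, installed.get(name))
        for name, wanted in pins.items()
        if installed.get(name) != wanted
    }


# Versions taken from the comments in this thread.
PINS = parse_pins("""
asgiref==3.2.10
channels==2.4.0
daphne==2.5.0
""")
```

Calling find_mismatches(PINS, installed_versions(PINS)) at startup or in CI would then list every package whose installed version differs from its pin.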

@hafid-d

hafid-d commented Jan 15, 2021

I have the same issue. I tried the solution from @MrVhek and unfortunately it didn't work. Any other hints? Thanks.

@tinylambda

tinylambda commented Jan 31, 2021

@andrewgodwin I can reproduce this by changing the chat example in the documentation:

import json
import asyncio
import logging
from channels.generic.websocket import AsyncWebsocketConsumer


class ChatConsumer(AsyncWebsocketConsumer):
    def __init__(self, *args, **kwargs):
        super(ChatConsumer, self).__init__(*args, **kwargs)
        self.room_name = None
        self.room_group_name = None
        self.disconnected = True

    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'

        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()
        self.disconnected = False  # connected, set flag to False

    async def disconnect(self, code):
        await self.channel_layer.group_discard(self.room_group_name, self.channel_name)
        self.disconnected = True  # disconnected, set flag to True

    async def receive(self, text_data=None, bytes_data=None):
        text_json = json.loads(text_data)
        message = text_json['message']

        await self.channel_layer.group_send(self.room_group_name, {
            'type': 'chat_message',
            'message': message,
        })

    async def chat_message(self, event):
        message = event['message']

        # now simulate subscribe to redis, get the newest state, and send to user
        # if disconnected, stop sending states
        i = 0
        while not self.disconnected:
            await asyncio.sleep(1)  # simulate sub to redis and wait new state
            i += 1
            logging.error('sending...')
            await self.send(text_data=json.dumps({
                'message': f'new state {i}'
            }))

Then:

  1. start the dev server
  2. start a client and connect to a chat room
  3. send a chat message
  4. the server starts to push new states
  5. close the client
  6. wait several seconds and the warning appears

It seems that the server push code (here chat_message) prevents disconnect() from being called, so the self.disconnected flag never gets a chance to be set to True.
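
A stripped-down model of this behaviour, using only asyncio, is sketched below. It assumes that the consumer handles its messages strictly one at a time, which is what the log output suggests. run_consumer, demo and the message names are hypothetical, and nothing here is Channels code.

```python
import asyncio


async def run_consumer(queue, state):
    """Handle queued messages strictly one at a time (simplified model)."""
    while True:
        message = await queue.get()
        if message == "push":
            # Long-running handler: loops until the flag flips.
            while not state["disconnected"]:
                state["sent"] += 1
                await asyncio.sleep(0.01)
        elif message == "disconnect":
            # Never reached while the "push" handler is still looping.
            state["disconnected"] = True
            return


async def demo():
    state = {"disconnected": False, "sent": 0}
    queue = asyncio.Queue()
    queue.put_nowait("push")
    queue.put_nowait("disconnect")
    try:
        # The timeout plays the role of the server's shutdown deadline.
        await asyncio.wait_for(run_consumer(queue, state), timeout=0.2)
        killed = False
    except asyncio.TimeoutError:
        killed = True
    return killed, state
```

In this model the "disconnect" message stays in the queue because the loop never returns to queue.get(), so the flag is never set and the only way out is the timeout.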

And here is my log output:

System check identified no issues (0 silenced).
January 31, 2021 - 17:48:26
Django version 3.1.5, using settings 'light.settings'
Starting ASGI/Channels version 3.0.3 development server at http://127.0.0.1:8000/
Quit the server with CONTROL-C.
WebSocket HANDSHAKING /ws/chat/cc/ [127.0.0.1:43922]
WebSocket CONNECT /ws/chat/cc/ [127.0.0.1:43922]
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
WebSocket DISCONNECT /ws/chat/cc/ [127.0.0.1:43922]          <<<<<=== Client closed here. but still sending states to client
INFO:django.channels.server:WebSocket DISCONNECT /ws/chat/cc/ [127.0.0.1:43922]
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
ERROR:root:sending...
WARNING:daphne.server:Application instance <Task pending name='Task-1' coro=<StaticFilesWrapper.__call__() running at /home/felix/PycharmProjects/light/venv/lib/python3.9/site-packages/channels/staticfiles.py:44> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f36dc9dea30>()]>> for connection <WebSocketProtocol client=['127.0.0.1', 43922] path=b'/ws/chat/cc/'> took too long to shut down and was killed.


@carltongibson
Member

carltongibson commented Jan 31, 2021

@tinylambda thanks for the extra info. Not sure yet if this is a usage issue we should document or something else, but I will reopen to investigate

@tinylambda

tinylambda commented Jan 31, 2021

@carltongibson Thanks for the reply. In my use case, I want to use django-channels as an access layer and keep sending new game state to the user, either at a fixed interval (every 1 second) or whenever the state changes on the Redis server. I think this is a typical use case for WebSockets.

@tinylambda

tinylambda commented Feb 2, 2021

If the coroutine that handles a message (here chat_message) takes too long to complete (broadcasting messages, for example), it blocks every later message of the same type until the previous coroutine completes (which may be never).

As a potential solution, how about dispatching every handler coroutine (chat_message) as a task (loop.create_task), tracking every task in a central place (per consumer instance), checking the tasks at some point (when and how?) to clear the completed ones, and, when the WebSocket disconnects, cancelling all tasks that are still active?

We can add a MAX_ACTIVE_TASKS setting to limit how many tasks can be created, to avoid piling up too many slow tasks.

With MAX_ACTIVE_TASKS = 1, we can use the django-channels server as a broadcast service.
With MAX_ACTIVE_TASKS > 1, we can process more messages of the same type concurrently.

The two are comparable:
creating more tasks to handle some type of message in a consumer instance (for one connection, with state)
and
creating more threads or processes to handle user requests in an HTTP web server (for all user requests, without state).

@tinylambda

tinylambda commented Feb 3, 2021

I implemented a consumer that dispatches message handlers as tasks (this only affects user-defined handlers):

import asyncio
import copy
import collections
import functools
import json
from channels.consumer import get_handler_name
from channels.generic.websocket import AsyncWebsocketConsumer


class ChatConsumer(AsyncWebsocketConsumer):
    MAX_ACTIVE_TASKS = 2

    def __init__(self, *args, **kwargs):
        super(ChatConsumer, self).__init__(*args, **kwargs)
        self.handler_tasks = collections.defaultdict(list)
        self.joined_groups = set()

        self.room_name = None
        self.room_group_name = None

    def complete_task(self, task_instance, handler_name):
        print(f'Complete task for handler {handler_name}, task instance {task_instance}')
        self.handler_tasks[handler_name].remove(task_instance)
        print(
            f'There are still {len(self.handler_tasks[handler_name])} active tasks for'
            f' handler {handler_name}'
        )

    async def dispatch(self, message):
        handler_name = get_handler_name(message)
        handler = getattr(self, handler_name, None)
        if handler:
            if handler_name.startswith('chat_'):
                # Create a task to process message
                loop = asyncio.get_event_loop()
                if len(self.handler_tasks[handler_name]) >= self.MAX_ACTIVE_TASKS:
                    await self.send(text_data=json.dumps({
                        'message': 'MAX_ACTIVE_TASKS reached'
                    }))
                else:
                    handler_task = loop.create_task(handler(message))
                    # don't forget to remove the task from self.handler_tasks
                    # when task completed
                    handler_task.add_done_callback(
                        functools.partial(self.complete_task, handler_name=handler_name)
                    )
                    self.handler_tasks[handler_name].append(handler_task)
            else:
                # The old way to process message
                await handler(message)
        else:
            raise ValueError("No handler for message type %s" % message["type"])

    async def clear_handler_tasks(self):
        for handler_name in self.handler_tasks:
            task_instances = self.handler_tasks[handler_name]
            for task_instance in task_instances:
                task_instance.cancel()
                # try:
                #     await task_instance
                # except asyncio.CancelledError:
                #     print('Cancelled handler task', task_instance)

    async def disconnect(self, code):
        joined_groups = copy.copy(self.joined_groups)
        for group_name in joined_groups:
            await self.leave_group(group_name)
        self.joined_groups.clear()
        await self.clear_handler_tasks()

    async def leave_group(self, group_name):
        await self.channel_layer.group_discard(
            group_name, self.channel_name
        )
        self.joined_groups.remove(group_name)

    async def join_group(self, group_name):
        await self.channel_layer.group_add(
            group_name, self.channel_name
        )
        self.joined_groups.add(group_name)

    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'

        await self.join_group(self.room_group_name)
        await self.accept()

    async def receive(self, text_data=None, bytes_data=None):
        text_json = json.loads(text_data)
        message = text_json['message'].strip()
        if message.endswith('1'):

            await self.channel_layer.group_send(self.room_group_name, {
                'type': 'chat_message',
                'message': message,
            })
        elif message.endswith('2'):
            await self.channel_layer.group_send(self.room_group_name, {
                'type': 'chat_message2',
                'message': message,
            })
        else:
            await self.send(text_data=json.dumps({
                'message': 'invalid data'
            }))

    async def chat_message(self, event):
        message = event['message']

        while True:
            print('sending...')
            await self.send(text_data=json.dumps({
                'message': message
            }))
            await asyncio.sleep(1)

    async def chat_message2(self, event):
        message = event['message']
        await self.send(text_data=json.dumps({
            'message': message
        }))

In this implementation, you can send a message such as "g1" to start the long-running handler (chat_message) and still send "g2" to start the chat_message2 handler. If you send anything that does not end with 1 or 2, you receive an "invalid data" reply.
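
The task bookkeeping in this consumer can be reduced to a small stand-alone sketch. TaskRegistry, spawn, cancel_all and push_forever are hypothetical names, not part of Channels, and the sketch assumes one registry per connection. As a variation on the commented-out await in clear_handler_tasks, it waits for the cancelled tasks with asyncio.gather(return_exceptions=True) so that nothing is still running once disconnect returns.

```python
import asyncio


class TaskRegistry:
    """Hypothetical helper: tracks handler tasks for one connection."""

    def __init__(self, max_active):
        self.max_active = max_active
        self.tasks = set()

    def spawn(self, coro):
        """Start coro as a task; return None if the limit is reached."""
        if len(self.tasks) >= self.max_active:
            coro.close()  # avoid a "coroutine was never awaited" warning
            return None
        task = asyncio.ensure_future(coro)
        self.tasks.add(task)
        # Finished or cancelled tasks remove themselves from the set.
        task.add_done_callback(self.tasks.discard)
        return task

    async def cancel_all(self):
        """Cancel every active task and wait until all have stopped."""
        pending = list(self.tasks)
        for task in pending:
            task.cancel()
        # return_exceptions=True collects CancelledError instead of raising.
        return await asyncio.gather(*pending, return_exceptions=True)


async def push_forever(sent):
    """Stand-in for a long-running handler such as chat_message."""
    while True:
        sent.append("state")
        await asyncio.sleep(0.01)


async def demo():
    registry = TaskRegistry(max_active=1)
    sent = []
    first = registry.spawn(push_forever(sent))
    second = registry.spawn(push_forever(sent))  # rejected: limit is 1
    await asyncio.sleep(0.05)
    results = await registry.cancel_all()
    return first, second, results, len(registry.tasks), len(sent)
```

Keeping the tasks in a set with a discard callback answers the "when and how" of clearing completed tasks: each task removes itself as soon as it finishes or is cancelled.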

Client input and output:

(venv) localhost:light Felix$ python manage.py chat_client
connected
g1
Received:  {"message": "g1"}
gReceived:  {"message": "g1"}
Received:  {"message": "g1"}
2
Received:  {"message": "g2"}
Received:  {"message": "g1"}
g2
Received:  {"message": "g2"}
Received:  {"message": "g1"}
g2
Received:  {"message": "g2"}
Received:  {"message": "g1"}
g1
Received:  {"message": "MAX_ACTIVE_TASKS reached"}
Received:  {"message": "g1"}
gReceived:  {"message": "g1"}
Received:  {"message": "g1"}
g1
Received:  {"message": "MAX_ACTIVE_TASKS reached"}
Received:  {"message": "g1"}
gReceived:  {"message": "g1"}
x
Received:  {"message": "invalid data"}
Received:  {"message": "g1"}
gxReceived:  {"message": "g1"}

Received:  {"message": "invalid data"}
Received:  {"message": "g1"}
Received:  {"message": "g1"}
^CDone

Server input and output:


Django version 3.1.5, using settings 'light.settings'
Starting ASGI/Channels version 3.0.3 development server at http://127.0.0.1:8000/
Quit the server with CONTROL-C.
WebSocket HANDSHAKING /ws/chat/cc/ [127.0.0.1:57667]
WebSocket CONNECT /ws/chat/cc/ [127.0.0.1:57667]
sending...
sending...
sending...
Complete task for handler chat_message2, task instance <Task finished name='Task-17' coro=<ChatConsumer.chat_message2() done, defined at /Users/Felix/PycharmProjects/light/chat/consumers.py:116> result=None>
There are still 0 active tasks for handler chat_message2
sending...
Complete task for handler chat_message2, task instance <Task finished name='Task-23' coro=<ChatConsumer.chat_message2() done, defined at /Users/Felix/PycharmProjects/light/chat/consumers.py:116> result=None>
There are still 0 active tasks for handler chat_message2
sending...
Complete task for handler chat_message2, task instance <Task finished name='Task-29' coro=<ChatConsumer.chat_message2() done, defined at /Users/Felix/PycharmProjects/light/chat/consumers.py:116> result=None>
There are still 0 active tasks for handler chat_message2
sending...
sending...
sending...
sending...
sending...
sending...
sending...
sending...
sending...
sending...
WebSocket DISCONNECT /ws/chat/cc/ [127.0.0.1:57667]
Complete task for handler chat_message, task instance <Task cancelled name='Task-11' coro=<ChatConsumer.chat_message() done, defined at /Users/Felix/PycharmProjects/light/chat/consumers.py:106>>
There are still 0 active tasks for handler chat_message
Cancelled handler task <Task cancelled name='Task-11' coro=<ChatConsumer.chat_message() done, defined at /Users/Felix/PycharmProjects/light/chat/consumers.py:106>>

It's a full-duplex WebSocket consumer now: we can send data to the WebSocket server at any time and get a response, and the server can send data at any time too. In the new implementation, a long-running handler no longer blocks the consumer.

So, are you interested in implementing this at the Channels framework level? @carltongibson @andrewgodwin

@jahanzeb-pixarsart

The asgiref rollback from 3.3.1 to 3.2.10 described by @MrVhek saved me from a lot of trouble. May I ask why asgiref is the issue and how you found it?

@codingfarhan

Thanks, @MrVhek! Rolling back asgiref from 3.3.1 to 3.2.10 worked for me.

@belongwqz

Regarding the asgiref rollback described by @MrVhek: I ran into the same issue, see #205. Maybe it has something to do with the combination of djangorestframework, channels, django, and a new version of asgiref.

@lgelmi

lgelmi commented Jul 22, 2021

We're having the same problem using GraphQL so I'd remove djangorestframework from the picture 🤔

@benedictkioko

benedictkioko commented Jul 28, 2021

Experiencing the same with GraphQL 😭

@husam-lune

Has this been solved? I have the same issue and haven't been able to fix it.

@mohsenkamini

In my case the disk was full. I freed up some space and the problem was solved!

@neethan

neethan commented Dec 1, 2021

For anyone else with this error: my solution was to downgrade aioredis to 1.3.0. The newest version (2.0) is a complete rewrite and requires restructuring code, which I believe channels-redis has not taken into account in its aioredis requirement.

@Avramo

Avramo commented Dec 5, 2021

I have a Django project.
I'm getting CPU spikes to 100% and a large number of these log entries at the same time:

2021-12-05 13:18:34,912 WARNING  Application instance <Task pending coro=<AsgiHandler.__call__() running at /usr/local/lib/python3.6/site-packages/channels/http.py:192> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.6/asyncio/futures.py:403, <TaskWakeupMethWrapper object at 0x7fe6bf3625e8>()]>> for connection <WebRequest at 0x7fe6bf361898 method=GET uri=/**********  
 clientproto=HTTP/1.1> took too long to shut down and was killed.


2021-12-05 13:18:39,921 WARNING  Application instance <Task pending coro=<AsgiHandler.__call__() running at /usr/local/lib/python3.6/site-packages/channels/http.py:192> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.6/asyncio/futures.py:403, <TaskWakeupMethWrapper object at 0x7fe6be55b6a8>()]>> for connection <WebRequest at 0x7fe6be55a898 method=GET uri=/********** 
 clientproto=HTTP/1.1> took too long to shut down and was killed.


2021-12-05 13:18:39,922 WARNING  Application instance <Task pending coro=<AsgiHandler.__call__() running at /usr/local/lib/python3.6/site-packages/channels/http.py:192> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.6/asyncio/futures.py:403, <TaskWakeupMethWrapper object at 0x7fe6be300ac8>()]>> for connection <WebRequest at 0x7fe6be30fd68 method=GET uri=/********** 
 clientproto=HTTP/1.1> took too long to shut down and was killed.

requirements.txt

django==2.2
djangorestframework
mysqlclient==1.3.13
django-push-notifications
markdown
python-dotenv
twilio==6.10.0
gunicorn==19.8.1
django-modeladmin-reorder
django-cors-headers==3.2.1
python-dateutil==2.8.1
channels==2.4.0
channels-redis==2.4.2
daphne==2.4.1
redis
pycountry==19.8.18
country-list==1.0.0
django-admin-rangefilter==0.6.0
django-import-export==2.1.0
django-storages==1.9.
Pillow==8.2.0
boto3==1.17.102
apns2==0.5.0

pip freeze

aioredis==1.3.1
apns2==0.5.0
asgiref==3.4.0
async-timeout==3.0.1
attrs==21.2.0
autobahn==21.2.1
Automat==20.2.0
boto3==1.17.102
botocore==1.20.102
certifi==2021.5.30
cffi==1.14.5
channels==2.4.0
channels-redis==2.4.2
chardet==4.0.0
constantly==15.1.0
country-list==1.0.0
cryptography==3.4.7
daphne==2.4.1
defusedxml==0.7.1
diff-match-patch==20200713
Django==2.2
django-admin-rangefilter==0.6.0
django-cors-headers==3.2.1
django-import-export==2.1.0
django-modeladmin-reorder==0.3.1
django-push-notifications==2.0.0
django-storages==1.9
djangorestframework==3.12.4
et-xmlfile==1.1.0
gunicorn==19.8.1
h2==2.6.2
hiredis==2.0.0
hpack==3.0.0
http-ece==1.1.0
hyper==0.7.0
hyperframe==3.2.0
hyperlink==21.0.0
idna==2.10
importlib-metadata==4.6.0
incremental==21.3.0
jmespath==0.10.0
Markdown==3.3.4
MarkupPy==1.14
msgpack==0.6.2
mysqlclient==1.3.13
odfpy==1.4.1
openpyxl==3.0.7
Pillow==8.2.0
py-vapid==1.8.2
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycountry==19.8.18
pycparser==2.20
PyJWT==2.1.0
pyOpenSSL==20.0.1
PySocks==1.7.1
python-dateutil==2.8.1
python-dotenv==0.18.0
pytz==2021.1
pywebpush==1.13.0
PyYAML==5.4.1
redis==3.5.3
requests==2.25.1
s3transfer==0.4.2
service-identity==21.1.0
six==1.16.0
sqlparse==0.4.1
tablib==3.0.0
twilio==6.10.0
Twisted==21.2.0
txaio==21.2.1
typing-extensions==3.10.0.0
urllib3==1.26.6
xlrd==2.0.1
xlwt==1.3.0
zipp==3.4.1
zope.interface==5.4.0

Please help me out over here,
Thanks!

@alv-around

I have a similar situation, but not the same
I have:

django== 2.2.9
channels == 2.4.0
daphne == 2.5.0
I am using Redis as backend with the code:

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [(CELERY_BROKER_URL)],
            'capacity': 300
        },
    },
}
ASGI_THREADS = 1000
I am using Daphne for both HTTP and WebSocket.

My asgi file:

import os

import django

# The settings module must be configured before django.setup() is called.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings')
django.setup()

import chat.routing
from channels.http import AsgiHandler
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter

application = ProtocolTypeRouter({
    'websocket': AuthMiddlewareStack(
        URLRouter(
            chat.routing.websocket_urlpatterns
        )
    ),
})
Here is the error message:
Application instance <Task pending coro=<AsgiHandler.__call__() running at /usr/local/lib/python3.6/site-packages/channels/http.py:192> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.6/asyncio/futures.py:403, <TaskWakeupMethWrapper object at 0x7f301c57f738>()]>> for connection <WebRequest at 0x7f301c56a710 method=POST uri=/api/v1/analytics/divided/meteo/ clientproto=HTTP/1.0> took too long to shut down and was killed.
This happens when the front-end sends requests to Django. Sometimes the response comes back immediately, sometimes it takes a long time, and frequently it throws the above error. I thought it was because of the concurrency of the workers that handle HTTP requests.

It is not the same case as mentioned above, because:

  1. I am using the generic "WebsocketConsumer" because I query data from the database via Django's ORM.
  2. The error comes from AsgiHandler.__call__() on an HTTP request. The WebSocket connection is not even established.

Can you explain why I have this issue? I suspected a Django version incompatibility.

@ReadMost I have been facing the same problem for some weeks. How did you solve it?

@ReadMost

ReadMost commented Dec 7, 2021

@alvaro-alonso I have written my solution further up in this thread, or here is link

@Saran33

Saran33 commented Mar 24, 2022

I managed to resolve this issue in local dev without needing to use both wsgi and asgi servers together (channels==3.0.4, asgiref==3.5.0, Django==4.0.3). I tried a combination of:

  1. using Reconnecting WebSockets on the client side, as suggested by destelio. For example:
const options = {
  WebSocket: WebSocket,
  connectionTimeout: 10000,
  maxRetries: 3,
  minUptime: 2000,
  maxEnqueuedMessages: 1000000,
};

const newSocket = new ReconnectingWebSocket(
  'ws://' +
  window.location.host +
  '/ws/somepath/' +
  roomName +
  '/' +
  '?' +
  queryString,
  [],
  options
);
  2. Setting thread_sensitive to False globally, and setting it to True manually for ORM operations, i.e. decorating sync functions that write to the database with @sync_to_async(thread_sensitive=True).

  3. If you have a connect() method that involves long running tasks, simply call await self.accept() as early as possible within the method, right below await self.channel_layer.group_add().
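
To illustrate why the thread_sensitive and early-accept points above can matter: a blocking call made directly inside a coroutine stalls the whole event loop, while the same call moved to a worker thread does not. The sketch below uses only the standard library (`asyncio.to_thread`, Python 3.9+) as a stand-in for asgiref's `sync_to_async`. The names `slow_orm_call`, `heartbeat` and `run` are hypothetical, and this is not how Channels itself is implemented.

```python
import asyncio
import time


def slow_orm_call(seconds: float) -> str:
    """Hypothetical stand-in for a blocking database query."""
    time.sleep(seconds)
    return "rows"


async def heartbeat(ticks: list, interval: float, count: int) -> None:
    """Record a tick every `interval` seconds; this only runs while the loop is free."""
    for _ in range(count):
        await asyncio.sleep(interval)
        ticks.append(time.monotonic())


async def run(offload: bool) -> int:
    """Return how many heartbeat ticks happened while the slow call was running."""
    ticks: list = []
    beat = asyncio.create_task(heartbeat(ticks, 0.05, 100))
    await asyncio.sleep(0)  # let the heartbeat task start
    if offload:
        # Blocking work runs in a worker thread; the loop keeps serving other tasks.
        await asyncio.to_thread(slow_orm_call, 0.5)
    else:
        # Blocking work runs on the loop thread; nothing else can make progress.
        slow_orm_call(0.5)
    seen = len(ticks)
    beat.cancel()
    return seen


if __name__ == "__main__":
    print("ticks while blocking the loop:", asyncio.run(run(offload=False)))
    print("ticks while offloading:", asyncio.run(run(offload=True)))
```

In the blocking case the heartbeat never gets a chance to tick, which is roughly what a consumer looks like to the server when a slow synchronous call runs on the event loop.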

Initially, this worked fine for connection timeouts. However, in my particular case, my disconnect method requires a relatively lengthy ORM operation involving celery workers. Therefore, after switching to a remote database, the disconnect method became an issue once again.

Fortunately, there is a simple workaround.

Instead of using the development server — i.e. python manage.py runserver — run it in production mode with Daphne. That way, you can specify a timeout interval via the --application-close-timeout flag.

Simply run:

daphne -t 60 --application-close-timeout 60 your_project.asgi:application

The -t flag is for --http-timeout. It may not be necessary.
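
To make the effect of a close timeout concrete, here is a minimal sketch of the general idea: once the client has gone away, the server waits a fixed number of seconds for the application task to finish and cancels it otherwise. It uses plain `asyncio` and is an illustration only, not Daphne's actual code. `app_instance` and `shut_down` are hypothetical names.

```python
import asyncio


async def app_instance(cleanup_seconds: float) -> str:
    """Hypothetical application coroutine whose disconnect cleanup takes a while."""
    await asyncio.sleep(cleanup_seconds)
    return "clean exit"


async def shut_down(cleanup_seconds: float, close_timeout: float) -> str:
    """Give the application `close_timeout` seconds to exit, then cancel it."""
    task = asyncio.create_task(app_instance(cleanup_seconds))
    try:
        return await asyncio.wait_for(task, timeout=close_timeout)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the task by the time we get here.
        return "took too long to shut down and was killed"


if __name__ == "__main__":
    print(asyncio.run(shut_down(0.2, close_timeout=0.05)))
    print(asyncio.run(shut_down(0.05, close_timeout=0.5)))
```

Raising the timeout only buys the cleanup more time. If the cleanup can take arbitrarily long, it will eventually hit the limit again.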

Of course, with this approach, you will need to serve any local static files through `STATIC_ROOT`. WhiteNoise can be used to serve static files. It will work with an ASGI server (within Django anyway; outside of Django I don't think it supports ASGI). If you don't want file caching, replace their suggested STATICFILES_STORAGE with:

STATICFILES_STORAGE = 'whitenoise.storage.CompressedStaticFilesStorage'

Hope this helps someone. Thanks to everyone for such an awesome project🙏 💯

@Davelyb

Davelyb commented Jun 23, 2022

We're having the same problem using GraphQL so I'd remove djangorestframework from the picture 🤔

Experiencing the same with GraphQL 😭

@benedictkioko I had the same problem using graphene_django + channels. I solved it with the command "daphne -b 0.0.0.0 -p 8000 --application-close-timeout 60 --proxy-headers core.asgi:application". Hope this can help someone.

daphne help doc:
--application-close-timeout APPLICATION_CLOSE_TIMEOUT The number of seconds an ASGI application has to exit after client disconnect before it is killed

@razzaghi

In my particular case the culprit was asgiref==3.3.1. I rolled back to 3.2.10 and it works well! I have daphne 2.5.0/channels 2.4.0/django 3.0.10/djangorestframework 3.11.1

Changing from asgiref==3.2.7 to asgiref==3.2.10 worked for me on Django 2.2.

@jessamynsmith

I am not sure if my issue is the same, but it results in a similar error. In my case, I am running my Django Channels ASGI app using Daphne, and I have a couple API calls that call an external API. I found that when that external API errored out and I did not have a timeout set on the requests call, it would hang the Daphne server. While I did have a flaw in my code, I do not think that such a flaw should render the entire server unresponsive and not able to be restarted through systemctl. Note that when I switched to uvicorn, I no longer had an issue even with the misbehaving code. In my real application, I have of course fixed my external API calls to have an explicit timeout. Here is a simplified repro case based on my real application:
https://github.com/jessamynsmith/daphne_hang/

Here is my exact error:
2023-02-14 05:01:42,735 WARNING Application instance <Task pending name='Task-6' coro=<ProtocolTypeRouter.__call__() running at /Users/jessamynsmith/Development/daphne_hang/venv/lib/python3.11/site-packages/channels/routing.py:62> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /Users/jessamynsmith/.pyenv/versions/3.11.0/lib/python3.11/asyncio/futures.py:387, Task.task_wakeup()]>> for connection <WebRequest at 0x108603890 method=GET uri=/api/v1/hang/ clientproto=HTTP/1.1> took too long to shut down and was killed.
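
For readers hitting the same thing, here is a small sketch of what an explicit timeout on an outbound call buys you. It uses the standard library's `http.client` instead of `requests` so that it runs without extra packages (with `requests`, the equivalent is passing `timeout=` to the call). `fetch` and `make_silent_server` are hypothetical helpers, and the silent server only stands in for a hung external API.

```python
import http.client
import socket
from typing import Optional


def fetch(host: str, port: int, path: str, timeout_seconds: float) -> Optional[bytes]:
    """Return the response body, or None on a network error or timeout."""
    conn = http.client.HTTPConnection(host, port, timeout=timeout_seconds)
    try:
        conn.request("GET", path)
        return conn.getresponse().read()
    except OSError:
        # Socket timeouts and connection errors are OSError subclasses.
        return None
    finally:
        conn.close()


def make_silent_server() -> socket.socket:
    """Listening socket that never answers, standing in for a hung external API."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    return server


if __name__ == "__main__":
    server = make_silent_server()
    port = server.getsockname()[1]
    # Without the timeout this call would wait indefinitely for a response.
    print(fetch("127.0.0.1", port, "/api/v1/hang/", timeout_seconds=0.5))
    server.close()
```

With a bounded call the view returns (here with `None`) instead of holding its thread forever, so the server has something it can shut down cleanly.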

@jrosebr1

@jessamynsmith Thank you so much for posting this!! I thought I was losing my mind 😆 I'm in a similar situation -- my API calls are taking too long, so if a user navigates away from the page where the websocket was instantiated, there's no chance for it to cleanly exit, leading to the "took too long to shut down and was killed" error message.

@Saran33

Saran33 commented Mar 20, 2023

@jessamynsmith Thank you so much for posting this!! I thought I was losing my mind 😆 I'm in a similar situation -- my API calls are taking too long, so if a user navigates away from the page where the websocket was instantiated, there's no chance for it to cleanly exit, leading to the "took too long to shut down and was killed" error message.

Looking back on this thread, in the case of blocking I/O or CPU intensive operations during a disconnect method, it might be worth offloading those tasks to a worker, with something like celery or RQ.

@jrosebr1

jrosebr1 commented Mar 20, 2023

Looking back on this thread, in the case of blocking I/O or CPU intensive operations during a disconnect method, it might be worth offloading those tasks to a worker, with something like celery or RQ.

That was my thought too. I was considering using Celery, but Celery isn't meant to be a real-time queue, and more importantly, if you offload the task to Celery you would have a hard(er) time pushing the result back to the channels consumer.

I guess you could submit the task and then continually poll it, waiting for it to be complete, but I didn't like that approach.

Basically, what I did was create a Queue and Thread:

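import json
from queue import Queue
from threading import Event, Thread

from channels.generic.websocket import WebsocketConsumer


class EditorConsumer(WebsocketConsumer):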

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        ...
        self.requests_queue = Queue()
        self.requests_thread = None
        self.stop_event = Event()

    def connect(self):
        self.requests_thread = Thread(
            target=self.process_requests,
            daemon=True
        )
        self.requests_thread.start()

        ...

        self.accept() 

    def disconnect(self, code):
        self.stop_event.set()

    def receive(self, text_data=None, bytes_data=None):
        self.requests_queue.put(text_data)

    def process_requests(self):
        while not self.stop_event.is_set():
            text_data = self.requests_queue.get()

            ...

            # NOTE: It's worth checking if the `stop_event` is set here. If you try to call
            # `self.send` afterwards, you're likely going to error out
            if not self.stop_event.is_set():
                # send the response back to the client
                self.send(text_data=json.dumps(output_data))

Using this setup, disconnect is allowed to run and raise the StopConsumer exception. The process_requests daemon thread is left to finish whatever item it is currently working on, then it exits, regardless of whether the queue is empty.

It's probably not the most "elegant" way to solve the problem, but it keeps the solution entirely inside the consumers.py file without having to involve Celery, Redis, etc., which has additional overhead (and again, Celery isn't meant to be real-time, which IMO, doesn't make it the right choice when working with websockets).

Edit: If my logic/thought process about not using Celery here is incorrect I would love to know.

@jrosebr1

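Update: I refactored the process_requests method to ensure the daemon thread properly shuts down. (In this version the event appears as self.disconnected rather than self.stop_event, QUEUE_TIMEOUT is presumably a class-level number of seconds, and queue.Empty requires import queue.)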

    def process_requests(self):
        # loop indefinitely
        while True:
            # attempt to grab the next task from the queue
            try:
                # grab the next task
                text_data = self.requests_queue.get(
                    timeout=self.QUEUE_TIMEOUT)

                ...

                # only send the response if the connection is open
                if not self.disconnected.is_set():
                    # send the response back to the client
                    self.send(text_data=json.dumps(output_data))

                # otherwise, the connection is closed, so break from the loop
                else:
                    break

            # the queue is empty, which can naturally happen if there are no
            # requests for us to process
            except queue.Empty:
                # check if the connection has been closed, and if so, break
                # from the loop, so we can exit the thread
                if self.disconnected.is_set():
                    break

        print("SHUTTING DOWN THREAD")
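
For anyone who wants to try the pattern outside of Channels, below is a self-contained sketch of the same queue-plus-thread idea. It is an approximation under stated assumptions, not the exact consumer above: `RequestWorker`, `handler` and `results` are hypothetical names, and `results.append` stands in for `self.send(...)`.

```python
import queue
import threading


class RequestWorker:
    """Hypothetical stand-alone version of the queue-plus-thread pattern."""

    QUEUE_TIMEOUT = 0.1  # seconds to wait for a task before re-checking the flag

    def __init__(self, handler):
        self.handler = handler  # callable applied to each queued item
        self.requests_queue = queue.Queue()
        self.disconnected = threading.Event()
        self.results = []  # stands in for self.send(...) in a consumer
        self.thread = threading.Thread(target=self.process_requests, daemon=True)

    def connect(self):
        self.thread.start()

    def receive(self, text_data):
        self.requests_queue.put(text_data)

    def disconnect(self):
        # Returns immediately; the worker notices the flag on its next check.
        self.disconnected.set()

    def process_requests(self):
        while True:
            try:
                text_data = self.requests_queue.get(timeout=self.QUEUE_TIMEOUT)
            except queue.Empty:
                # Nothing to do right now; exit only if the connection is closed.
                if self.disconnected.is_set():
                    break
                continue
            output = self.handler(text_data)
            if self.disconnected.is_set():
                break  # connection is gone, so drop the result
            self.results.append(output)


if __name__ == "__main__":
    worker = RequestWorker(str.upper)
    worker.connect()
    worker.receive("hello")
    worker.thread.join(timeout=0.5)  # give the worker a moment to process
    worker.disconnect()
    worker.thread.join(timeout=5)
    print(worker.results, worker.thread.is_alive())
```

The timeout on `get` is what lets the thread notice a disconnect while the queue is empty. A plain blocking `get()` would wait there indefinitely.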

@Saran33

Saran33 commented Mar 20, 2023

@jrosebr1 Nice! It depends on the use case I guess, but if you need to send a result to the client from outside the consumer class, you could send it via a channel layer. You could also use something like python-socketio instead of channels to emit a message from an external process.

@jrosebr1

Thanks @Saran33, I'll take a look at those.

@yan0f

yan0f commented May 24, 2024

@jessamynsmith saved my life
