Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions django/core/handlers/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,7 @@ async def handle(self, scope, receive, send):
return
# Request is complete and can be served.
set_script_prefix(self.get_script_prefix(scope))
await sync_to_async(signals.request_started.send, thread_sensitive=True)(
sender=self.__class__, scope=scope
)
await signals.request_started.asend(sender=self.__class__, scope=scope)
# Get the request and check for basic issues.
request, error_response = self.create_request(scope, body_file)
if request is None:
Expand Down
148 changes: 125 additions & 23 deletions django/dispatch/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import asyncio
import logging
import threading
import weakref

from asgiref.sync import async_to_sync, sync_to_async

from django.utils.inspect import func_accepts_kwargs

logger = logging.getLogger("django.dispatch")
Expand Down Expand Up @@ -94,6 +97,8 @@ def connect(self, receiver, sender=None, weak=True, dispatch_uid=None):
else:
lookup_key = (_make_id(receiver), _make_id(sender))

is_async = asyncio.iscoroutinefunction(receiver)

if weak:
ref = weakref.ref
receiver_object = receiver
Expand All @@ -106,8 +111,8 @@ def connect(self, receiver, sender=None, weak=True, dispatch_uid=None):

with self.lock:
self._clear_dead_receivers()
if not any(r_key == lookup_key for r_key, _ in self.receivers):
self.receivers.append((lookup_key, receiver))
if not any(r_key == lookup_key for r_key, _, _ in self.receivers):
self.receivers.append((lookup_key, receiver, is_async))
self.sender_receivers_cache.clear()

def disconnect(self, receiver=None, sender=None, dispatch_uid=None):
Expand Down Expand Up @@ -138,7 +143,7 @@ def disconnect(self, receiver=None, sender=None, dispatch_uid=None):
with self.lock:
self._clear_dead_receivers()
for index in range(len(self.receivers)):
(r_key, _) = self.receivers[index]
(r_key, _, _) = self.receivers[index]
if r_key == lookup_key:
disconnected = True
del self.receivers[index]
Expand All @@ -147,7 +152,8 @@ def disconnect(self, receiver=None, sender=None, dispatch_uid=None):
return disconnected

def has_listeners(self, sender=None):
return bool(self._live_receivers(sender))
sync_receivers, async_receivers = self._live_receivers(sender)
return bool(sync_receivers) or bool(async_receivers)

def send(self, sender, **named):
"""
Expand All @@ -172,11 +178,84 @@ def send(self, sender, **named):
or self.sender_receivers_cache.get(sender) is NO_RECEIVERS
):
return []
responses = []
sync_receivers, async_receivers = self._live_receivers(sender)
for receiver in sync_receivers:
response = receiver(signal=self, sender=sender, **named)
responses.append((receiver, response))
if async_receivers:

async def asend():
async_responses = await asyncio.gather(
*[
receiver(signal=self, sender=sender, **named)
for receiver in async_receivers
]
)
return zip(async_receivers, async_responses)

responses.extend(async_to_sync(asend)())
return responses

async def asend(self, sender, **named):
"""
Send signal from sender to all connected receivers in async mode.

All sync receivers will be wrapped by sync_to_async.
If any receiver raises an error, the error propagates back through send,
terminating the dispatch loop. So it's possible that all receivers
won't be called if an error is raised.

Arguments:

sender
The sender of the signal. Either a specific object or None.

named
Named arguments which will be passed to receivers.

Return a list of tuple pairs [(receiver, response), ... ].
"""
if (
not self.receivers
or self.sender_receivers_cache.get(sender) is NO_RECEIVERS
):
return []

sync_receivers, async_receivers = self._live_receivers(sender)

return [
(receiver, receiver(signal=self, sender=sender, **named))
for receiver in self._live_receivers(sender)
]
if sync_receivers:

@sync_to_async
def sync_send():
responses = []
for receiver in sync_receivers:
response = receiver(signal=self, sender=sender, **named)
responses.append((receiver, response))
return responses

else:
sync_send = list

responses, async_responses = await asyncio.gather(
sync_send(),
asyncio.gather(
*[
receiver(signal=self, sender=sender, **named)
for receiver in async_receivers
]
),
)
responses.extend(zip(async_receivers, async_responses))
return responses

def _log_robust_failure(self, receiver, err):
logger.error(
"Error calling %s in Signal.send_robust() (%s)",
receiver.__qualname__,
err,
exc_info=err,
)

def send_robust(self, sender, **named):
"""
Expand Down Expand Up @@ -206,19 +285,35 @@ def send_robust(self, sender, **named):
# Call each receiver with whatever arguments it can accept.
# Return a list of tuple pairs [(receiver, response), ... ].
responses = []
for receiver in self._live_receivers(sender):
sync_receivers, async_receivers = self._live_receivers(sender)
for receiver in sync_receivers:
try:
response = receiver(signal=self, sender=sender, **named)
except Exception as err:
logger.error(
"Error calling %s in Signal.send_robust() (%s)",
receiver.__qualname__,
err,
exc_info=err,
)
self._log_robust_failure(receiver, err)
responses.append((receiver, err))
else:
responses.append((receiver, response))
if async_receivers:

async def asend_and_wrap_exception(receiver):
try:
response = await receiver(signal=self, sender=sender, **named)
except Exception as err:
self._log_robust_failure(receiver, err)
return err
return response

async def asend():
async_responses = await asyncio.gather(
*[
asend_and_wrap_exception(receiver)
for receiver in async_receivers
]
)
return zip(async_receivers, async_responses)

responses.extend(async_to_sync(asend)())
return responses

def _clear_dead_receivers(self):
Expand All @@ -244,31 +339,38 @@ def _live_receivers(self, sender):
# We could end up here with NO_RECEIVERS even if we do check this case in
# .send() prior to calling _live_receivers() due to concurrent .send() call.
if receivers is NO_RECEIVERS:
return []
return ([], [])
if receivers is None:
with self.lock:
self._clear_dead_receivers()
senderkey = _make_id(sender)
receivers = []
for (receiverkey, r_senderkey), receiver in self.receivers:
for (_receiverkey, r_senderkey), receiver, is_async in self.receivers:
if r_senderkey == NONE_ID or r_senderkey == senderkey:
receivers.append(receiver)
receivers.append((receiver, is_async))
if self.use_caching:
if not receivers:
self.sender_receivers_cache[sender] = NO_RECEIVERS
else:
# Note, we must cache the weakref versions.
self.sender_receivers_cache[sender] = receivers
non_weak_receivers = []
for receiver in receivers:
non_weak_sync_receivers = []
non_weak_async_receivers = []
for receiver, is_async in receivers:
if isinstance(receiver, weakref.ReferenceType):
# Dereference the weak reference.
receiver = receiver()
if receiver is not None:
non_weak_receivers.append(receiver)
if is_async:
non_weak_async_receivers.append(receiver)
else:
non_weak_sync_receivers.append(receiver)
else:
non_weak_receivers.append(receiver)
return non_weak_receivers
if is_async:
non_weak_async_receivers.append(receiver)
else:
non_weak_sync_receivers.append(receiver)
return (non_weak_sync_receivers, non_weak_async_receivers)

def _remove_receiver(self, receiver=None):
# Mark that the self.receivers list has dead weakrefs. If so, we will
Expand Down
4 changes: 1 addition & 3 deletions django/test/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,7 @@ async def __call__(self, scope):
body_file = FakePayload("")

request_started.disconnect(close_old_connections)
await sync_to_async(request_started.send, thread_sensitive=False)(
sender=self.__class__, scope=scope
)
await request_started.asend(sender=self.__class__, scope=scope)
request_started.connect(close_old_connections)
# Wrap FakePayload body_file to allow large read() in test environment.
request = ASGIRequest(scope, LimitedStream(body_file, len(body_file)))
Expand Down
2 changes: 1 addition & 1 deletion django/test/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def complex_setting_changed(*, enter, setting, **kwargs):
# this stacklevel shows the line containing the override_settings call.
warnings.warn(
f"Overriding setting {setting} can lead to unexpected behavior.",
stacklevel=6,
stacklevel=5,
)


Expand Down
4 changes: 3 additions & 1 deletion docs/releases/5.0.txt
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ Serialization
Signals
~~~~~~~

* ...
* The new :meth:`django.dispatch.Signal.asend` allows asynchronous signal
dispatch. Signal receivers may be synchronous or asynchronous, and will be
automatically adapted to the correct calling style.

Templates
~~~~~~~~~
Expand Down
3 changes: 3 additions & 0 deletions docs/topics/async.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ synchronous function and call it using :func:`sync_to_async`.

Asynchronous model and related manager interfaces were added.


.. _async_performance:

Performance
-----------

Expand Down
59 changes: 51 additions & 8 deletions docs/topics/signals.txt
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,21 @@ This would be wrong -- in fact, Django will throw an error if you do so. That's
because at any point arguments could get added to the signal and your receiver
must be able to handle those new arguments.

Receivers may also be asynchronous functions, with the same signature but
declared using ``async def``::

async def my_callback(sender, **kwargs):
await asyncio.sleep(5)
print("Request finished!")

Signals can be sent either synchronously or asynchronously, and receivers will
automatically be adapted to the correct call-style. See :ref:`sending signals
<sending-signals>` for more information.

.. versionchanged:: 5.0

Support for asynchronous receivers was added.

.. _connecting-receiver-functions:

Connecting receiver functions
Expand Down Expand Up @@ -243,18 +258,24 @@ For example::

This declares a ``pizza_done`` signal.

.. _sending-signals:

Sending signals
---------------

There are two ways to send signals in Django.
There are two ways to send signals synchronously in Django.

.. method:: Signal.send(sender, **kwargs)
.. method:: Signal.send_robust(sender, **kwargs)

To send a signal, call either :meth:`Signal.send` (all built-in signals use
this) or :meth:`Signal.send_robust`. You must provide the ``sender`` argument
(which is a class most of the time) and may provide as many other keyword
arguments as you like.
Signals may also be sent asynchronously.

.. method:: Signal.asend(sender, **kwargs)

To send a signal, call either :meth:`Signal.send`, :meth:`Signal.send_robust`,
or ``await`` :meth:`Signal.asend`. You must provide the ``sender``
argument (which is a class most of the time) and may provide as many other
keyword arguments as you like.

For example, here's how sending our ``pizza_done`` signal might look::

Expand All @@ -265,9 +286,8 @@ For example, here's how sending our ``pizza_done`` signal might look::
pizza_done.send(sender=self.__class__, toppings=toppings, size=size)
...

Both ``send()`` and ``send_robust()`` return a list of tuple pairs
``[(receiver, response), ... ]``, representing the list of called receiver
functions and their response values.
All three methods return a list of tuple pairs ``[(receiver, response), ... ]``,
representing the list of called receiver functions and their response values.

``send()`` differs from ``send_robust()`` in how exceptions raised by receiver
functions are handled. ``send()`` does *not* catch any exceptions raised by
Expand All @@ -281,6 +301,29 @@ error instance is returned in the tuple pair for the receiver that raised the er
The tracebacks are present on the ``__traceback__`` attribute of the errors
returned when calling ``send_robust()``.

``asend()`` is similar as ``send()``, but it is coroutine that must be
awaited::

async def asend_pizza(self, toppings, size):
await pizza_done.asend(
sender=self.__class__, toppings=toppings, size=size
)
...

Whether synchronous or asynchronous, receivers will be correctly adapted to
whether ``send()`` or ``asend()`` is used. Synchronous receivers will be
called using :func:`sync_to_async` when invoked via ``asend()``.
Asynchronous receivers will be called using :func:`async_to_sync` when invoked
via ``sync()``. Similar to the :ref:`case for middleware <async_performance>`,
there is a small performance cost to adapting receivers in this way.

All built-in signals, except those in the async request-response pathway, are
dispatched using :meth:`Signal.send`.

.. versionchanged:: 5.0

Support for asynchronous signals was added.

Disconnecting signals
=====================

Expand Down
Loading