Skip to content

Commit

Permalink
Fix call_soon_threadsafe thread safety
Browse files Browse the repository at this point in the history
Don't start the idle handler in other threads or signal handlers,
leaving the job to `_on_wake()`.

Co-authored-by: hexin02 <hexin02@megvii.com>
  • Loading branch information
fantix and hexin02 committed Jul 13, 2021
1 parent 2e71c4c commit 6387a4e
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 3 deletions.
28 changes: 28 additions & 0 deletions tests/test_base.py
Expand Up @@ -2,6 +2,7 @@
import fcntl
import logging
import os
import random
import sys
import threading
import time
Expand Down Expand Up @@ -702,6 +703,33 @@ async def foo():
self.loop.run_until_complete(
self.loop.shutdown_default_executor())

def test_call_soon_threadsafe_safety(self):
ITERATIONS = 4096
counter = [0]

def cb():
counter[0] += 1
if counter[0] < ITERATIONS - 512:
h = self.loop.call_later(0.01, lambda: None)
self.loop.call_later(
0.0005 + random.random() * 0.0005, h.cancel
)

def scheduler():
loop = self.loop
for i in range(ITERATIONS):
if loop.is_running():
loop.call_soon_threadsafe(cb)
time.sleep(0.001)
loop.call_soon_threadsafe(loop.stop)

thread = threading.Thread(target=scheduler)

self.loop.call_soon(thread.start)
self.loop.run_forever()
thread.join()
self.assertEqual(counter[0], ITERATIONS)


class TestBaseUV(_TestBase, UVTestCase):

Expand Down
1 change: 1 addition & 0 deletions uvloop/loop.pxd
Expand Up @@ -145,6 +145,7 @@ cdef class Loop:
cdef _exec_queued_writes(self)

cdef inline _call_soon(self, object callback, object args, object context)
cdef inline _append_ready_handle(self, Handle handle)
cdef inline _call_soon_handle(self, Handle handle)

cdef _call_later(self, uint64_t delay, object callback, object args,
Expand Down
13 changes: 10 additions & 3 deletions uvloop/loop.pyx
Expand Up @@ -427,7 +427,7 @@ cdef class Loop:
if handle._cancelled:
self.remove_signal_handler(sig) # Remove it properly.
else:
self._call_soon_handle(handle)
self._append_ready_handle(handle)
self.handler_async.send()

cdef _on_wake(self):
Expand Down Expand Up @@ -667,10 +667,13 @@ cdef class Loop:
self._call_soon_handle(handle)
return handle

cdef inline _call_soon_handle(self, Handle handle):
cdef inline _append_ready_handle(self, Handle handle):
self._check_closed()
self._ready.append(handle)
self._ready_len += 1

cdef inline _call_soon_handle(self, Handle handle):
self._append_ready_handle(handle)
if not self.handler_idle.running:
self.handler_idle.start()

Expand Down Expand Up @@ -1281,7 +1284,11 @@ cdef class Loop:
"""Like call_soon(), but thread-safe."""
if not args:
args = None
handle = self._call_soon(callback, args, context)
cdef Handle handle = new_Handle(self, callback, args, context)
self._append_ready_handle(handle) # deque append is atomic
# libuv async handler is thread-safe while the idle handler is not -
# we only set the async handler here, which will start the idle handler
# in _on_wake() from the loop and eventually call the callback.
self.handler_async.send()
return handle

Expand Down

0 comments on commit 6387a4e

Please sign in to comment.