Skip to content

Commit

Permalink
Stopped spinning in CurrentThreadExecutor.run_until_future()
Browse files Browse the repository at this point in the history
Rather that spinning on
- check for work, non-blocking
- check the future
- sleep briefly

We can register a done callback on the future, which will be called
with the future itself as an arugment. Registering our task queue's
`put()` method lets us use the future itself as a sentinal value.

Now we can use `queue.get()` in blocking fashion. If we find the job
is the future we're waiting on, we're done getting tasks and we
return, otherwise we run the job and go back to blocking on
`queue.get()` for more tasks.

This noticeably cuts down on CPU usage in run_until_future().
  • Loading branch information
asedeno committed Nov 19, 2020
1 parent ee967d2 commit 3a0bdfb
Showing 1 changed file with 8 additions and 13 deletions.
21 changes: 8 additions & 13 deletions asgiref/current_thread_executor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import queue
import threading
import time
from concurrent.futures import Executor, Future


Expand Down Expand Up @@ -51,21 +50,17 @@ def run_until_future(self, future):
raise RuntimeError(
"You cannot run CurrentThreadExecutor from a different thread"
)
# Keep getting work items and checking the future
future.add_done_callback(self._work_queue.put)
# Keep getting and running work items until we get the future we're waiting for
# back via the future's done callback.
try:
while True:
# Get a work item and run it
try:
work_item = self._work_queue.get(block=False)
except queue.Empty:
# See if the future is done (we only exit if the work queue is empty)
if future.done():
return
# Prevent hot-looping on nothing
time.sleep(0.001)
else:
work_item.run()
del work_item
work_item = self._work_queue.get()
if work_item is future:
return
work_item.run()
del work_item
finally:
self._broken = True

Expand Down

0 comments on commit 3a0bdfb

Please sign in to comment.