
async_q.get() can block the event loop under high load #82

Closed
hanikesn opened this issue Mar 16, 2018 · 3 comments

Comments

@hanikesn

hanikesn commented Mar 16, 2018

When many items are added to the queue, the async `get()` blocks the event loop for some time. Adding a `sleep(0.001)` helps, but it decreases throughput substantially, especially with multiple threads adding to the queue.

```python
import asyncio
import time
import janus


async def get(q):
    # Consumer: drain the async side of the queue forever.
    while True:
        await q.get()


def add(q):
    # Producer (runs in a worker thread): every 0.5 s, push 500 items.
    # With maxsize=100, put() blocks whenever the queue is full.
    while True:
        time.sleep(0.5)
        for i in range(0, 500):
            q.put(i)


loop = asyncio.get_event_loop()
queue = janus.Queue(maxsize=100, loop=loop)
# Debug mode logs every callback that runs longer than
# loop.slow_callback_duration (0.1 s by default).
loop.set_debug(True)
loop.run_in_executor(None, add, queue.sync_q)
loop.run_until_complete(get(queue.async_q))
```

```
Executing <Handle <TaskWakeupMethWrapper object at 0x7f9f377b3078>(<Future finis...events.py:275>) created at /usr/lib64/python3.6/asyncio/locks.py:383> took 0.250 seconds
Executing <Handle <TaskWakeupMethWrapper object at 0x7f9f343d0498>(<Future finis...events.py:275>) created at /usr/lib64/python3.6/asyncio/locks.py:383> took 0.364 seconds
Executing <Handle <TaskWakeupMethWrapper object at 0x7f9f3454ef78>(<Future finis...events.py:275>) created at /usr/lib64/python3.6/asyncio/locks.py:383> took 0.341 seconds
```
@asvetlov
Member

asvetlov commented Mar 16, 2018

I don't think this is the problem.
janus doesn't yield control to the event loop on `await async_q.get()` if the queue is not empty.
Your example fetches many items one by one in a `while` loop until the queue becomes empty, and only then waits for new data.
Usually asyncio code does I/O operations: it performs HTTP requests, communicates with a database, etc.
Once I/O is not ready (e.g. the socket read buffer has no data to fetch, the write buffer is overloaded, and so on), the task is suspended.
I expect you do have I/O in your real `while` loop; `q.get()` is not the only possible yield point.
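A minimal sketch of this behavior, using the standard library's `asyncio.Queue` as a stand-in for janus's async side (an assumption: like janus, its `get()` returns without suspending when an item is already available). The `drain_without_yield` helper and the `ticker` task are hypothetical; the ticker only records when it first gets to run.

```python
import asyncio


async def drain_without_yield(n):
    """Drain n pre-filled items and record the order in which things happened."""
    # asyncio.Queue stands in for janus.Queue().async_q here (assumption).
    q = asyncio.Queue()
    for i in range(n):
        q.put_nowait(i)

    events = []

    async def ticker():
        # Hypothetical "other task": it only logs that it got scheduled.
        events.append("tick")

    t = asyncio.create_task(ticker())  # scheduled, but not running yet
    for _ in range(n):
        # The queue is never empty here, so get() returns immediately
        # without handing control back to the event loop.
        await q.get()
    events.append("drained")
    await t  # first real suspension: only now can the ticker run
    return events


print(asyncio.run(drain_without_yield(1000)))  # ['drained', 'tick']
```

The ticker is scheduled before the drain starts, yet it runs only after all 1000 items have been consumed, because none of those `await q.get()` calls suspended the consumer.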

P.S.
`await asyncio.sleep(0)` should be enough.
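A sketch of the suggested fix, again with the stdlib `asyncio.Queue` standing in for janus (assumption). The `count_ticks` helper and its `ticker` task are hypothetical: the ticker counts how often it gets a turn on the loop while the consumer drains pre-filled items. Without `await asyncio.sleep(0)` it never runs during the drain; with it, the two tasks interleave.

```python
import asyncio


async def count_ticks(n, yield_each_item):
    """Drain n pre-filled items; return how often another task ran meanwhile."""
    # asyncio.Queue stands in for janus.Queue().async_q here (assumption).
    q = asyncio.Queue()
    for i in range(n):
        q.put_nowait(i)

    ticks = 0
    done = False

    async def ticker():
        # Hypothetical competing task: counts each turn it gets on the loop.
        nonlocal ticks
        while not done:
            ticks += 1
            await asyncio.sleep(0)

    t = asyncio.create_task(ticker())
    for _ in range(n):
        await q.get()  # does not suspend while items are available
        if yield_each_item:
            await asyncio.sleep(0)  # forces a trip through the event loop
    done = True
    await t
    return ticks


print(asyncio.run(count_ticks(1000, yield_each_item=False)))  # 0
print(asyncio.run(count_ticks(1000, yield_each_item=True)))   # non-zero
```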

@hanikesn
Author

You're perfectly right. Calling `await asyncio.sleep(0)` fixes the issue. To be honest, I expected any `await` to always yield to the scheduler to give other tasks time, but it seems the priorities get messed up. In fact, I don't do any I/O there right now, as I use the queue as a synchronization point between other tasks.

@asvetlov
Member

asvetlov commented Mar 16, 2018

In fact, `await` doesn't force a context switch, precisely for performance reasons.
It is a possible switch point, not a mandatory one. `await asyncio.sleep(0)` forces the switch.
It's like good old `file.read()`, which doesn't imply an OS thread switch, whereas `time.sleep(0)` does.
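The difference between a possible and a mandatory switch point can be sketched with two plain coroutines run concurrently via `asyncio.gather`, assuming CPython's default event loop, which runs ready tasks in FIFO order. `no_suspend`, `worker`, and `run` are hypothetical helpers: awaiting a coroutine that finishes without suspending never gives the other task a turn, while `await asyncio.sleep(0)` does, so the workers interleave only in the second run.

```python
import asyncio


async def no_suspend():
    # Completes synchronously: awaiting it never reaches the event loop.
    return None


async def worker(name, order, force_switch):
    for i in range(3):
        if force_switch:
            await asyncio.sleep(0)  # mandatory switch point
        else:
            await no_suspend()      # possible switch point that never switches
        order.append(f"{name}{i}")


async def run(force_switch):
    order = []
    await asyncio.gather(
        worker("a", order, force_switch),
        worker("b", order, force_switch),
    )
    return order


print(asyncio.run(run(force_switch=False)))  # ['a0', 'a1', 'a2', 'b0', 'b1', 'b2']
print(asyncio.run(run(force_switch=True)))   # ['a0', 'b0', 'a1', 'b1', 'a2', 'b2']
```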
