No flow control on read side #105

Closed
mehaase opened this Issue Apr 7, 2016 · 5 comments

@mehaase
Contributor

mehaase commented Apr 7, 2016

I had an issue this week where a websockets client was crashing. The server was sending messages too fast for the client to keep up, and the client's message buffer just kept growing and growing until the Linux kernel killed it. I've reproduced the issue with a simple server/client pair:

server.py:

import asyncio, os, websockets

port = 6789

async def send_a_lot(conn, path):
    print('Client connected: going to send lots of data...')
    i = 1
    while True:
        rand = os.urandom(2**16)
        id_ = '{:08}'.format(i)
        await conn.send(id_.encode('ascii') + rand)
        print(id_)
        i += 1

start_server = websockets.serve(send_a_lot, 'localhost', port)
print('Waiting for connection on {}.'.format(port))
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

client.py:

import asyncio, websockets

port = 6789
delay = 1

async def get_a_lot():
    conn = await websockets.connect('ws://localhost:{}'.format(port))
    last_id = 0
    while True:
        data = await conn.recv()
        id_ = data[:8].decode('ascii')
        print('received {}'.format(id_))
        this_id = int(id_)
        if this_id != last_id + 1:
            raise ValueError('Missed a packet between {} and {}'
                             .format(last_id, this_id))
        else:
            last_id = this_id
        await asyncio.sleep(delay)

asyncio.get_event_loop().run_until_complete(get_a_lot())

The server sends a lot of data really fast, and the client is intentionally slow in order to highlight the issue. After running for about 10-20 seconds, the client is using about 1G of memory.

[screenshot: output of top showing the client's memory usage]

Profiling the client shows that the WebSocketClientProtocol.messages queue is growing without limit. I believe the queue size should be fixed (ideally in terms of bytes, but if not, then fixed in terms of messages), and the TCP receive window should be adjusted when the queue is approaching capacity so that the server will know to slow down.

It looks like flow control was implemented on the writer side some time back, but there is nothing equivalent for the reader side.
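For comparison, write-side flow control in asyncio streams works through `drain()`: the writing coroutine is suspended while the transport's write buffer sits above its high-water mark, so a slow peer naturally throttles the sender. A minimal loopback sketch with plain asyncio streams (not the websockets API):

```python
import asyncio

async def main():
    received = []
    done = asyncio.Event()

    async def handle(reader, writer):
        # Server side: read until the client closes its end.
        while True:
            data = await reader.read(4096)
            if not data:
                break
            received.append(data)
        writer.close()
        done.set()

    server = await asyncio.start_server(handle, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]

    _, writer = await asyncio.open_connection('127.0.0.1', port)
    for _ in range(5):
        writer.write(b'x' * 100)
        # drain() suspends this coroutine while the transport's write
        # buffer is above its high-water mark (peer reading too slowly).
        await writer.drain()
    writer.close()
    await writer.wait_closed()

    await done.wait()  # wait until the server has consumed the EOF
    server.close()
    await server.wait_closed()
    return b''.join(received)

data = asyncio.run(main())
```

Nothing equivalent happens on the read side: incoming frames are parsed and queued as fast as they arrive.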

I'm not an expert in either TCP flow control or asyncio, so it's entirely possible that I'm completely wrong or misguided in filing a bug report here. I'm hoping one of the project maintainers can set me straight?

@mehaase mehaase changed the title Websocket client doesn't do any TCP flow control No flow control on read side Apr 7, 2016

@aaugustin
Owner
aaugustin commented Apr 7, 2016

Yes, that's an issue.

It looks like flow control should be implemented with transport.pause_reading() and transport.resume_reading().

https://docs.python.org/3/library/asyncio-protocol.html#readtransport

The idea would be:

  • to add a max_queue kwarg similar to max_size, defaulting to a reasonable value (perhaps 32) to bound the size of WebSocketCommonProtocol.messages
  • to call pause_reading() when that limit is reached and resume_reading() when it is no longer reached.

For efficiency, we may need high/low water marks to avoid calling resume/pause_reading() repeatedly and have the queue oscillate between max_queue - 1 and max_queue messages.
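The pause/resume idea with high/low water marks could be sketched like this (class name, thresholds, and methods are illustrative, not the actual websockets implementation):

```python
import asyncio

class BoundedReaderProtocol(asyncio.Protocol):
    """Sketch of read-side flow control with high/low water marks."""

    HIGH_WATER = 32  # pause reading once this many messages are queued
    LOW_WATER = 16   # resume once the queue drains back down to this

    def __init__(self):
        self.messages = []
        self.transport = None
        self._paused = False

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.messages.append(data)
        if not self._paused and len(self.messages) >= self.HIGH_WATER:
            # Stop reading the socket; the kernel's receive buffer fills
            # and the shrinking TCP window pushes back on the sender.
            self.transport.pause_reading()
            self._paused = True

    def pop_message(self):
        msg = self.messages.pop(0)
        if self._paused and len(self.messages) <= self.LOW_WATER:
            self.transport.resume_reading()
            self._paused = False
        return msg

# Exercise the logic with a stub transport (no real socket needed):
class _StubTransport:
    def __init__(self):
        self.reading = True
    def pause_reading(self):
        self.reading = False
    def resume_reading(self):
        self.reading = True

proto = BoundedReaderProtocol()
proto.connection_made(_StubTransport())
for _ in range(32):
    proto.data_received(b'frame')
paused_after_fill = not proto.transport.reading
while len(proto.messages) > BoundedReaderProtocol.LOW_WATER:
    proto.pop_message()
resumed_after_drain = proto.transport.reading
```

The gap between the two thresholds keeps pause_reading()/resume_reading() from toggling on every single message.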

@aaugustin
Owner
aaugustin commented Apr 7, 2016

StreamReader just keeps its buffer between limit and 2 * limit, this seems to be the most straightforward way to implement high/low water marks.

@mehaase
Contributor Author
mehaase commented Apr 7, 2016

I don't think the StreamReader exposes its underlying transport, making it difficult to call pause_reading() or resume_reading(). I was puzzling over this for a bit when the solution struck me: if the messages queue has a maxsize, then instead of using self.messages.put_nowait(msg), use yield from self.messages.put(msg), which blocks until the queue has free space.

It's a very small patch, and if max_queue defaults to 0, then it's a backwards-compatible change. My initial testing indicates that this does improve flow control. I'll do some more testing tonight and send a patch if I think it's working well.
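The bounded-queue idea can be demonstrated standalone with asyncio.Queue (names and sizes here are made up for illustration; this is not the actual patch):

```python
import asyncio

async def reader_task(queue, n):
    # Stand-in for the frame reader: with a bounded queue, put()
    # suspends whenever the consumer is more than maxsize behind,
    # which stops socket reads and lets TCP flow control kick in.
    for i in range(n):
        await queue.put(i)

async def app_task(queue, n, out):
    # Stand-in for the application calling recv().
    for _ in range(n):
        out.append(await queue.get())
        await asyncio.sleep(0)  # a deliberately slow consumer

async def main():
    queue = asyncio.Queue(maxsize=4)  # bounded messages queue
    out = []
    await asyncio.gather(reader_task(queue, 20), app_task(queue, 20, out))
    return out

messages = asyncio.run(main())
```

With maxsize=0 the queue is unbounded and put() never blocks, which is what makes a default of 0 backwards compatible.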

@aaugustin
Owner
aaugustin commented Apr 7, 2016

Ah yes, that looks much better!

I'd rather make max_queue default to something reasonable, consider this a bug fix for a performance and possibly a security issue, and document the change.

Even though you can always take a server offline by writing to it faster than it can read, in this case the problem manifests as excessive memory consumption, which isn't the most graceful way to handle the scenario. So I think this problem could be viewed as a DoS vector. That's why I don't want to default to an unbounded queue. Security by default and all that :-)

I'm also wondering whether the queue length should be hardcoded to 1, so that websockets doesn't acknowledge a ping until the application has fetched the previous message. Currently websockets acknowledges pings automatically even if some messages are queued and perhaps won't ever be seen by the application.

That said, one could argue that pings are a protocol level feature and that, as long as all earlier frames have been received and parsed, it's fine to acknowledge subsequent pings. Also, even if the application has fetched a message from the queue, an arbitrarily long async process may take place and the frame may not be fully processed until some later time.

I'll sleep on that question. It just changes whether the queue length is configurable or hardcoded to 1.

One last thing -- it's quite hard to write tests for websockets, partly because testing async stuff is hard, partly because the tests aren't as well structured and documented as they could be. Feel free to leave that part up to me. If you feel like updating the documentation (generated from docstrings) and adding a changelog entry, that's appreciated :-)

aaugustin added a commit that referenced this issue Apr 21, 2016

Add flow control for reading from a websocket (#105)
Add a configurable size limit on the reader's message queue.
When the queue is full, the reader will block. (A well-behaved server
will slow down transmission to avoid packet loss.)
@aaugustin
Owner
aaugustin commented Apr 21, 2016

Fixed, see #106.

@aaugustin aaugustin closed this Apr 21, 2016
