
fix: Block writer thread if response output buffer is full #5386

Merged · 3 commits · May 19, 2020

Conversation

purplefox (Contributor):

Description

Previously there was no back pressure on writes from the legacy streaming API to the response. This meant that writes were buffered in memory if the client was slow, which could lead to unbounded memory usage.

This PR blocks the writer thread if the response is full and unblocks it when it is drained, providing some synchronous back pressure to the caller.
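
For illustration, a minimal sketch of the drain-handler pattern the description refers to, assuming a Vert.x HttpServerResponse; the class and method names below are illustrative, not the actual PR code:

    import io.vertx.core.buffer.Buffer;
    import io.vertx.core.http.HttpServerResponse;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    // Illustrative only: block the calling (writer) thread while the response's
    // write queue is full, and resume once Vert.x signals that it has drained.
    final class BlockingResponseWriter {

      private final HttpServerResponse response;

      BlockingResponseWriter(final HttpServerResponse response) {
        this.response = response;
      }

      // Must be called from a worker thread, never from the event loop.
      void writeBlocking(final Buffer chunk) throws Exception {
        if (response.writeQueueFull()) {
          final CompletableFuture<Void> cf = new CompletableFuture<>();
          response.drainHandler(v -> cf.complete(null));
          // Block until the queue drains, or give up after a timeout.
          cf.get(60, TimeUnit.SECONDS);
        }
        response.write(chunk);
      }
    }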

Testing done

Manually tested.

Reviewer checklist

  • Ensure docs are updated if necessary (e.g. if a user-visible feature is being added or changed).
  • Ensure relevant issues are linked (the description should include text like "Fixes #").

@purplefox requested a review from a team as a code owner · May 18, 2020 14:58
@vcrfxia (Contributor) left a comment:

Thanks @purplefox !

cf.get(60, TimeUnit.SECONDS);
} catch (Exception e) {
// Very slow consumers will result in a timeout, this will cause the push query to be closed
throw new KsqlException(e);
Contributor:

Should we add a more informative error message?

purplefox (Contributor, Author):

The wrapped exception should contain the important error messages. I don't usually like adding new messages on rethrows unless you're doing something extra.

response.drainHandler(v -> cf.complete(null));
try {
cf.get(60, TimeUnit.SECONDS);
} catch (Exception e) {
Contributor:

Should we check explicitly for TimeoutException? Since we don't expect ExecutionException to be thrown here.

purplefox (Contributor, Author):

We could, but I don't think it adds much as we want to rethrow on any exception we receive.

@@ -41,7 +45,7 @@ public void write(final int b) {
   }

   @Override
-  public void write(final @NotNull byte[] bytes, final int offset, final int length) {
+  public synchronized void write(final @NotNull byte[] bytes, final int offset, final int length) {
Contributor:

ResponseOutputStream is only written to by a single thread anyway, right? (Trying to check my understanding.)

purplefox (Contributor, Author):

Yes, but it's a sanity check.

@purplefox merged commit 0edda40 into confluentinc:master · May 19, 2020
@big-andy-coates (Contributor) left a comment:

Hey @purplefox,

Is it possible to move the back pressure so that we're utilising the blocking queue in TransientRowQueue? i.e. that we only read from the row queue if there is space in the response buffer?

I'm not sure what you think, but I don't think disconnecting any client that can't keep up is good UX. That certainly wasn't the previous functionality.

I've been playing around with KStreams settings and I've two approaches I'm angling towards for slow consumers:

  1. Ideally, see if using static group membership is sufficient to avoid consumer group instability when the row queue is full and the offer call blocks. This would mean clients can pull as slowly as they want, and the streams topology will just slow down its processing of data.
  2. Or, change the code posting to the queue to drop messages when the queue is full, tracking the dropped count and sending that information in the next accepted message. This would allow slow clients to receive a 'sample' of the data and be informed that they're running too slowly (see the sketch after this list).
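
Purely to illustrate option 2, a hypothetical drop-and-count queue; the class name, capacity, and how the dropped count is reported are all made up, not existing ksqlDB code:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical sketch: drop rows when the queue is full and report how many
    // were dropped alongside the next row that is accepted.
    final class DroppingRowQueue<R> {

      private final BlockingQueue<R> queue = new ArrayBlockingQueue<>(1000); // capacity is arbitrary
      private final AtomicLong dropped = new AtomicLong();

      // Returns the number of rows dropped since the last accepted row, or -1 if
      // this row was itself dropped. The producer could attach the returned count
      // to the row it just enqueued.
      long offer(final R row) {
        if (queue.offer(row)) {
          return dropped.getAndSet(0);
        }
        dropped.incrementAndGet();
        return -1;
      }

      R take() throws InterruptedException {
        return queue.take();
      }
    }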

Personally, I don't like the second approach as much, but I think it's better than just disconnecting the user.

To do these things, I need the blocking queue to, well, block! At the moment, it's instantly depleted and the blocking call is in the response.

Thanks,

Andy

@purplefox (Contributor, Author) commented May 19, 2020:

> Is it possible to move the back pressure so that we're utilising the blocking queue in TransientRowQueue? i.e. that we only read from the row queue if there is space in the response buffer?

That's how it should be working. None of the code around polling or use of the TransientRowQueue has changed during the porting.

A row is polled from the queue and written to the OutputStream. In Jetty, that OutputStream would block if the response was full, which means the thread doesn't poll any more messages from the queue until it unblocks.

Prior to this fix, the Vert.x-implemented output stream would never block on write. This resulted in messages being buffered in Vert.x as fast as they were received, leading to unbounded memory usage. With this fix, the OutputStream should behave like the Jetty one, and the overall behaviour should be more or less the same.
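
To make the mechanics concrete, a rough sketch of the write loop being described; the names are illustrative, not the actual ksqlDB code:

    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.BlockingQueue;

    // Illustrative write loop: rows are polled from the row queue and written to
    // the response OutputStream. If that stream blocks when the response buffer
    // is full (as Jetty's did, and as the Vert.x one now does), no further rows
    // are polled until the client drains the buffer, so the row queue fills up
    // and back pressure reaches the producer.
    final class RowWriterLoop implements Runnable {

      private final BlockingQueue<String> rowQueue; // rows assumed pre-serialized for simplicity
      private final OutputStream out;               // the (now blocking) response output stream

      RowWriterLoop(final BlockingQueue<String> rowQueue, final OutputStream out) {
        this.rowQueue = rowQueue;
        this.out = out;
      }

      @Override
      public void run() {
        try {
          while (!Thread.currentThread().isInterrupted()) {
            final String row = rowQueue.take();
            out.write((row + "\n").getBytes(StandardCharsets.UTF_8)); // may block here
            out.flush();
          }
        } catch (final Exception e) {
          // In the real code a failed or timed-out write ends the push query.
          throw new RuntimeException(e);
        }
      }
    }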

> I'm not sure what you think, but I don't think disconnecting any client that can't keep up is good UX. That certainly wasn't the previous functionality.

If we don't time out the write, the thread would block forever, causing the streams thread to block forever; this would most likely cause the KS consumer to be booted.

> I've been playing around with KStreams settings and I've two approaches I'm angling towards for slow consumers:

>   1. Ideally, see if using static group membership is sufficient to avoid consumer group instability when the row queue is full and the offer call blocks. This would mean clients can pull as slowly as they want, and the streams topology will just slow down its processing of data.
>   2. Or, change the code posting to the queue to drop messages when the queue is full, tracking the dropped count and sending that information in the next accepted message. This would allow slow clients to receive a 'sample' of the data and be informed that they're running too slowly.

I think booting the slow consumer is a better option. Consumers are unlikely to be able to cope with missing messages (e.g. if it's a changelog stream, missing messages make the stream inconsistent), so the client would have to disconnect anyway, and it just makes things more complex if they have to code that themselves.

> To do these things, I need the blocking queue to, well, block! At the moment, it's instantly depleted and the blocking call is in the response.

I don't think it's currently working that way. Could you elaborate on why you think that, in case I have missed something?

@big-andy-coates (Contributor) commented May 19, 2020:

You're right, the code is filling the row queue (sorry, brain not firing on all cylinders yet!), but it's also disconnecting clients after 60 seconds if they're not consuming rows. I don't think the old Jetty functionality did that, so this is still a change in functionality.

I'm not sure I agree we have to time out slow clients. We could choose to allow a client to consume results at whatever pace they want, though this comes at the cost of server resources, obviously.

It would be nice to be able to support use-cases where the consumer of the data is potentially performing some long-running task, where it's totally valid to consume rows slowly, rather than excluding ksqlDB from such a use case. We may even have such use cases already, in which case this will break them!

Ideally, we might have a default timeout and allow the client to override, or remove, it, giving us the best of both worlds. But until we do, I'm not sure disconnecting clients is the right way to go, though you may be able to convince me. After all, 60 seconds is a long time to process one row.

My ideal solution, as I stated in the previous reply, is to see if we can use static membership to avoid/reduce the group instability and allow clients to pull at whatever speed they like. To do this I need the back pressure to cause the row queue to fill up (as it now will) and for queries not to get terminated (as they currently are).

I agree the tracking of missed messages isn't much use for an application tier, though it may be of use for the CLI. However, I'm also not sure it's worth the effort.

Not terminating queries because of slow clients will put the functionality back as it was (I think), and allow me to investigate improving things.

At the very least, we should make the timeout configurable, e.g. through the server config, so that users can choose to set it high if their use-case requires it, or indeed low if they want to aggressively prune slow clients.
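
For example, a hypothetical sketch of what a configurable timeout might look like; the property name and default below are invented for illustration and are not existing ksqlDB config keys:

    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    // Hypothetical sketch: read a write timeout from the server config instead of
    // hard coding 60 seconds. The config key is made up for illustration.
    final class WriteTimeouts {

      static final String WRITE_TIMEOUT_MS_CONFIG = "ksql.streaming.write.timeout.ms"; // hypothetical key
      static final long WRITE_TIMEOUT_MS_DEFAULT = TimeUnit.SECONDS.toMillis(60);

      static long writeTimeoutMs(final Map<String, ?> serverConfig) {
        final Object value = serverConfig.get(WRITE_TIMEOUT_MS_CONFIG);
        return value == null ? WRITE_TIMEOUT_MS_DEFAULT : Long.parseLong(value.toString());
      }
    }

The blocking write would then wait for writeTimeoutMs(config) milliseconds rather than a fixed 60 seconds.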

@purplefox (Contributor, Author) commented May 19, 2020:

> You're right, the code is filling the row queue (sorry, brain not firing on all cylinders yet!), but it's also disconnecting clients after 60 seconds if they're not consuming rows. I don't think the old Jetty functionality did that, so this is still a change in functionality.

I haven't checked, but I'd be surprised if Jetty writes didn't time out too. If they didn't, it would be easy to DoS a Jetty-based web server by sending lots of requests and simply not reading the responses.

> I'm not sure I agree we have to time out slow clients. We could choose to allow a client to consume results at whatever pace they want, though this comes at the cost of server resources, obviously.

If a consumer can't keep up with messages, it's unlikely it can do something useful, e.g. updating a management console or report, showing the latest stock prices, or the latest taxi driver positions. Most use cases will require an up-to-date stream. I struggle to think of a use case where it's OK to always consume messages more slowly than the query produces them.

Not guarding against slow consumers makes it easy to DoS a server. Pretty much all messaging systems will, by default, boot slow consumers, as it's rarely useful to have a slow consumer and booting them is what the app wants in most cases.

> It would be nice to be able to support use-cases where the consumer of the data is potentially performing some long-running task, where it's totally valid to consume rows slowly, rather than excluding ksqlDB from such a use case. We may even have such use cases already, in which case this will break them!

I would love to know of such a use case.

> Ideally, we might have a default timeout and allow the client to override, or remove, it, giving us the best of both worlds. But until we do, I'm not sure disconnecting clients is the right way to go, though you may be able to convince me. After all, 60 seconds is a long time to process one row.

The timeout isn't the time to process one row.

Let's say the query is generating rows at a rate of 10 per second, but the client consumes them at a rate of 5 per second. What happens is the write buffer gets full pretty quickly, and the writer blocks. As the client consumes messages, the buffer eventually empties, and the writer unblocks. Emptying the buffer takes roughly the write buffer size divided by the client's consumption rate (5 rows per second here), which is typically much longer than the time to process one row. (It might be the time to process hundreds or thousands of rows.)
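
To put rough numbers on that (the buffer size here is purely hypothetical):

    time to drain ≈ buffered rows / client consumption rate
                  ≈ 100 rows / 5 rows per second
                  = 20 seconds (versus ~0.2 seconds to process a single row at that rate)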

I'd stick with a timeout, but perhaps make it configurable. Imho most apps won't want slow consumers so we should time them out. If there is a valid use case for continuously lagging consumers, then we can allow them to configure the timeout to be a large value, but that seems like a niche case to me and shouldn't be the default.

@purplefox (Contributor, Author):

As an aside, a much better solution to this is to have proper reactive back pressure back to the consumers. This is what we'd get with the lightweight topologies proposal for push queries.

Also, KS is currently considering making streams more reactive, so this might end up in streams too.

@purplefox (Contributor, Author):

The value of 60 seconds as a timeout was chosen empirically by looking at how long the write thread actually blocked for when running a streaming query in the client. I observed the write queue would get full, then the write thread (and consequently the streams thread) would block for up to 45 seconds. The client would eventually free up space and it would unblock.

@big-andy-coates (Contributor) commented May 19, 2020:

> If a consumer can't keep up with messages, it's unlikely it can do something useful.

Sure, to be useful the system needs to keep up with messages, but that could be defined over a long time period, e.g. a system that needs to process all the events over the course of a day. There can be spikes in the day that cause backlogs, and that's totally fine as far as the user is concerned, as they're using Kafka to buffer the requests.

> I would love to know of such a use case.

So you can't think of any business process that may take more than 1 minute to complete?

> The timeout isn't the time to process one row.

Oh, so you're saying that once the buffer is full the pending write will wait until the buffer is empty, rather than just until there is space for the write? Is there only one buffer, or many? If there's only one then the situation is even worse, as it's even more likely to disconnect clients: clients must be able to process a full buffer's worth of rows within the hard-coded 60-second timeout, is that right?

> Not guarding against slow consumers makes it easy to DoS a server. Pretty much all messaging systems will, by default, boot slow consumers, as it's rarely useful to have a slow consumer and booting them is what the app wants in most cases.

Absolutely, that's why I'm suggesting we make the timeout configurable so that users can tune it to their use case, rather than picking a value that matched a test you've done with some specific set of data, over a specific network, with a specific client setup.

> As an aside, a much better solution to this is to have proper reactive back pressure back to the consumers. This is what we'd get with the lightweight topologies proposal for push queries.
> Also, KS is currently considering making streams more reactive, so this might end up in streams too.

This is exactly what I'm wanting to play around with. I think we may be able to get somewhere close by using static group membership. I think your change here is sufficient for me to play around with this. It may or may not work; I need to have a play to find out.

In conclusion, all I'm really asking is that you make the timeout configurable rather than a hard-coded 60 seconds. I'm struggling to see why you think this is a bad idea, given the recently added 60-second hard limit will likely exclude ksqlDB from some use-cases, potentially pre-existing customer use-cases.

@purplefox (Contributor, Author):

> In conclusion, all I'm really asking is that you make the timeout configurable rather than a hard-coded 60 seconds. I'm struggling to see why you think this is a bad idea, given the recently added 60-second hard limit will likely exclude ksqlDB from some use-cases, potentially pre-existing customer use-cases.

What makes you think that I think this is a bad idea? In my earlier reply I suggested we should do that:

> I'd stick with a timeout, but perhaps make it configurable. Imho most apps won't want slow consumers so we should time them out. If there is a valid use case for continuously lagging consumers, then we can allow them to configure the timeout to be a large value, but that seems like a niche case to me and shouldn't be the default.

So, if you think a configurable timeout has value (and note, Jetty didn't have this), then I wouldn't object if you followed up with a PR that did that :)

@big-andy-coates (Contributor) commented May 19, 2020:

Must have missed that - awesome! Yes please, I'd like to order one of those, please.
