Broker backpressure. #6313
Conversation
Adds a new property "druid.broker.http.maxQueuedBytes" and a new context parameter "maxQueuedBytes". Both represent a maximum number of bytes queued per query before exerting backpressure on the channel to the data server. Fixes apache#4933.
```java
synchronized (watermarkLock) {
  suspendWatermark = Math.max(suspendWatermark, currentChunkNum);
  if (suspendWatermark > resumeWatermark) {
    channel.setReadable(false);
```
so, this is the magic way of telling netty to stop reading data off of the socket without blocking any of the worker threads?
Yes, it is. It makes netty stop reading from the socket and stop sending new data up the channel. It means that the data servers (historicals etc) will block while trying to write data, which I think is ok, since the blocking happens there in an http server thread dedicated to one query.
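To make the watermark bookkeeping concrete, here is a minimal, self-contained sketch of the suspend/resume logic being discussed. This is an illustration, not Druid's actual code: `BackpressureGate` and its nested `Channel` interface are hypothetical stand-ins for the Netty 3 channel, whose real `setReadable()` method does the socket-level work.

```java
// Hypothetical sketch of the suspend/resume watermark bookkeeping discussed
// above. "Channel" is a stand-in for the Netty 3 channel, not the real API.
class BackpressureGate {
  interface Channel { void setReadable(boolean readable); }

  private final Object watermarkLock = new Object();
  private long suspendWatermark = -1;
  private long resumeWatermark = -1;
  private final Channel channel;

  BackpressureGate(Channel channel) { this.channel = channel; }

  // Called when the handler asks to stop reading at chunk "currentChunkNum".
  void suspend(long currentChunkNum) {
    synchronized (watermarkLock) {
      suspendWatermark = Math.max(suspendWatermark, currentChunkNum);
      if (suspendWatermark > resumeWatermark) {
        channel.setReadable(false); // ask Netty to stop reading from the socket
      }
    }
  }

  // Called by the resume path once queued data has been consumed.
  void resume(long chunkNum) {
    synchronized (watermarkLock) {
      resumeWatermark = Math.max(resumeWatermark, chunkNum);
      if (suspendWatermark <= resumeWatermark) {
        channel.setReadable(true); // safe to read again
      }
    }
  }
}
```

The watermarks guard against out-of-order suspend/resume calls: a stale `resume()` for an earlier chunk will not re-enable reads while a later suspension is still in force.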
```java
if (response.isFinished()) {
  retVal.set((Final) response.getObj());
}
```
```java
assert currentChunkNum == 0;
```
did you intentionally leave it?
I did, it's just a thing that I think should always be true, so I put in an assert to 'document' that.
```java
 * This handler can exert backpressure by returning a response with "continueReading" set to false from handleResponse()
 * or handleChunk(). In this case, the HTTP client will stop reading soon thereafter. It may not happen immediately, so
 * be prepared for more handleChunk() calls to happen. To resume reads, call resume() on the TrafficCop provided by
```
is this because netty's channel.setReadable(false) doesn't take effect immediately, and netty might deliver one or a few more chunks after the channel's readability is changed?
Yeah, it is. Any data that has already been read from the socket, but not yet delivered to our channel handler, will still be delivered.
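To illustrate that contract, here is a hypothetical handler that asks for backpressure once queued bytes exceed a limit, but still accepts chunks that were already in flight when it asked to pause. Names like `QueuedBytesHandler` are illustrative, not Druid's actual API.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch: a handler that requests backpressure once queued
// bytes exceed a limit, yet still accepts (never drops) chunks that were
// already in flight when it asked the client to pause.
class QueuedBytesHandler {
  private final long maxQueuedBytes;
  private long queuedBytes = 0;
  private final Queue<byte[]> queue = new ArrayDeque<>();

  QueuedBytesHandler(long maxQueuedBytes) { this.maxQueuedBytes = maxQueuedBytes; }

  // Returning false plays the role of "continueReading: false": it asks the
  // client to pause, but more chunks may still arrive and must be queued.
  boolean handleChunk(byte[] chunk) {
    queue.add(chunk);
    queuedBytes += chunk.length;
    return queuedBytes <= maxQueuedBytes;
  }

  // Consuming a chunk frees budget; the caller resumes reads once the
  // queue is back under the limit.
  byte[] poll() {
    byte[] chunk = queue.poll();
    if (chunk != null) {
      queuedBytes -= chunk.length;
    }
    return chunk;
  }

  boolean belowLimit() { return queuedBytes <= maxQueuedBytes; }
}
```

The key point from the discussion above is that `handleChunk()` keeps queueing even after it has returned false, since suspension is not instantaneous.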
This is awesome.
It would be nice if we could add some metrics around the number of times suspension happened, or the time period for which we suspended reading data. That way users can defensively set the queued-bytes setting and tune its value based on the metrics.
@himanshug The metric sounds useful, are you ok adding it yourself in a later patch? 😄
@gianm that isn't setting the right precedent :)
@himanshug, thanks for the review. I raised an issue in #6321 describing how the metric could work.
@gianm |
@hellobabygogo yes, this backpressure means the broker will pause reading response data from historicals while it is processing data it has already read from them. So backpressure would propagate all the way to the historicals.
I tested this by running on a modest cluster (6 historicals) and doing an unlimited, unfiltered Scan query via SQL (`select * from tbl`). The query ran for about half an hour and managed to fetch almost 20GB of results without OOMing the broker. Without this patch, the same query quickly OOMed the broker.
See HttpResponseHandler for a description of the API.
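For illustration, a query could opt into a per-query limit via the `maxQueuedBytes` context parameter described above, while the broker-wide default would be set with `druid.broker.http.maxQueuedBytes` in the broker's runtime.properties. The datasource name, interval, and byte value below are arbitrary examples, not recommendations:

```json
{
  "queryType": "scan",
  "dataSource": "tbl",
  "intervals": ["2018-01-01/2019-01-01"],
  "context": {
    "maxQueuedBytes": 25000000
  }
}
```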