
Coordinate node memory checking during accumulating shard result response #47806

Open
wants to merge 5 commits into main
Conversation

howardhuanghua
Contributor

About this PR

Currently we don't have circuit breaker checking on the coordinate node. There are two main memory consumption parts on the coordinate node:

  1. Accumulating shard results from other data nodes.
    This PR adds memory checking for this part.
  2. Aggregation reduction phase.
    ES has search.max_buckets to limit memory, and we introduced a more convenient setting in #46751 (Bucket aggregation circuit breaker optimization) to enhance this part.

This PR focuses on the first part.
The coordinate node receives search result responses from other data nodes; these are handled in InboundHandler.java#handleResponse. We can get the exact message length in bytes before deserialization and add that length to the request breaker directly. The main purpose is to check the parent circuit breaker so that a continuous stream of incoming responses cannot blow up the coordinate node's memory. We only check large responses (>1kb) to avoid excessive breaker checking. The added request breaker bytes are released in the subsequent finally block, since it is hard to trace every point where the memory is reclaimed.
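A minimal sketch of that flow (simplified; the variable and method names stand in for the corresponding InboundHandler state and are not the exact diff):

```java
// Sketch only, not the exact PR diff: check the breaker with the raw message
// length before deserialization and release it again in the finally block.
long bytesNeedToRelease = 0;
try {
    if (messageLengthBytes > 1024) {
        // large responses only, to avoid checking the breaker for every tiny message
        breaker.addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_response>");
        bytesNeedToRelease = messageLengthBytes;
    }
    final T response = handler.read(stream); // deserialization happens here
    handler.handleResponse(response);
} finally {
    // released unconditionally; we cannot trace the real lifetime of the deserialized object
    if (bytesNeedToRelease > 0) {
        breaker.addWithoutBreaking(-bytesNeedToRelease);
    }
}
```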

More Considerations

Besides the implementation provided in this PR, we also have some other considerations:

1. Introduce a memory expansion ratio to estimate response object size based on received message length before deserialization.

After message deserialization, the real memory size of the response object is much larger than the original received message length in bytes. Taking aggregation buckets as an example, we have run some tests on the response object memory expansion ratio, and here are the results:

| Aggregation Type | Expansion Ratio (after deserialization / before deserialization) |
| --- | --- |
| 10-chars string terms agg | 19.5 |
| 20-chars string terms agg | 11.2 |
| 30-chars string terms agg | 8.2 |
| 50-chars string terms agg | 5.5 |
| 2-digits long terms agg | 14.7 |
| 5-digits long terms agg | 15 |
| 10-digits long terms agg | 15.2 |

We can see that the memory expansion ratio of string terms aggregations is about 10-20, and long terms aggregations stay stable at around 15.
So we could use a default expansion ratio such as 10 to estimate the response object size before deserialization, and make this ratio configurable for different data analysis scenarios.
Without accounting for this expansion, it is easy to exhaust the coordinate node's memory.
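A rough sketch of how such an estimate might be applied (the ratio value and its configurability are illustrative assumptions, not an existing Elasticsearch setting):

```java
// Hypothetical: scale the raw message length by a configurable expansion ratio
// so the breaker accounts for the estimated deserialized size, not the wire size.
double expansionRatio = 10.0; // assumed default, would be configurable per scenario
long estimatedObjectBytes = (long) (messageLengthBytes * expansionRatio);
breaker.addEstimateBytesAndMaybeBreak(estimatedObjectBytes, "<transport_response_estimate>");
```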

2. Release transport response object size from request breaker after object GC.

The current PR releases the added bytes in the subsequent finally block; however, the object has not been garbage collected at that stage. For the aggregation case, I tried releasing it in SearchPhaseController.java#reducedQueryPhase(), right after the aggregation results are consumed in result.consumeAggs(). But we still could not guarantee the added bytes are always released, especially in some exception cases. So we simply release them in the finally block; the main purpose is to make sure we have enough free memory, as checked through the parent breaker.

3. Defer deserializing response messages until all the shard results have been received.

We could hold all the shard results before deserialization; if the request breaker has already tripped during accumulation, we could fail the request directly without deserializing any of them.
If we deserialize each response immediately, all the deserialized objects are kept in memory until all the results have been received, which wastes memory.
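A rough sketch of this idea (the class and method names here are illustrative placeholders, not Elasticsearch code):

```java
import java.util.ArrayList;
import java.util.List;

import org.elasticsearch.common.breaker.CircuitBreaker;

// Sketch: buffer the raw shard responses and deserialize them only once every
// shard has replied; a tripped breaker fails the search before any deserialization.
class DeferredShardResponseBuffer {
    private final List<byte[]> rawShardResponses = new ArrayList<>();
    private final int expectedShards;
    private final CircuitBreaker breaker;

    DeferredShardResponseBuffer(int expectedShards, CircuitBreaker breaker) {
        this.expectedShards = expectedShards;
        this.breaker = breaker;
    }

    void onRawResponse(byte[] message) {
        // account the raw bytes first; this is where the parent breaker can trip
        breaker.addEstimateBytesAndMaybeBreak(message.length, "<buffered_shard_response>");
        rawShardResponses.add(message);
        if (rawShardResponses.size() == expectedShards) {
            // all shards have replied: deserialize everything together and move on
            rawShardResponses.forEach(this::deserializeAndConsume);
        }
    }

    private void deserializeAndConsume(byte[] raw) {
        // placeholder for the actual deserialization and next-phase handoff
    }
}
```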

We have tried several implementations of the considerations above; the current PR is the simplest one, and I hope we can discuss the details further. Please help to evaluate these ideas, thanks a lot.

@howardhuanghua howardhuanghua changed the title coordinate node memory checking during accumulating shard result response Coordinate node memory checking during accumulating shard result response Oct 11, 2019
@alpar-t alpar-t added the :Core/Infra/Circuit Breakers Track estimates of memory consumption to prevent overload label Oct 18, 2019
@elasticmachine
Collaborator

Pinging @elastic/es-core-infra (:Core/Infra/Circuit Breakers)

@howardhuanghua
Contributor Author

Hi Adrien @jpountz , would you please help to review this PR? Thanks a lot.

```java
final T response;
long bytesNeedToRelease = 0;
CircuitBreaker breaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST);
```

FYI the relevant circuit breaker is IN_FLIGHT_REQUESTS

@jpountz
Contributor

jpountz commented Oct 25, 2019

I'm not deep into how transport handles circuit breakers, and transport tends to be a bit tricky as there are some requests that should never fail to keep the transport stable. @tbrooks8 Could you help review this one?

@Tim-Brooks Tim-Brooks left a comment
Contributor

This is IMO not the correct approach for a cohesive solution on circuit breaking on responses.

The InboundHandler, which is a networking layer component, should be agnostic to how specific transport messages operate. That is lost when we start using instanceof checks and casting to verify what message we are receiving.

We have discussed circuit breaking on responses and I think it is something we want to do (#42318). However, it is obviously more tricky because we do not have the concept of a "response context" (some indicator telling us when we are done). This solution only works for the response in question because the Executor is SAME. If we dispatched, this would not really do much.

If we want to circuit break on this response, I would suggest one of two approaches:

  1. Put the circuit breaker in SearchTransportService and have this response do whatever it wants in regards to circuit breaking.

  2. Make this a team-discuss topic for the infra team and add a default method canTripCircuitBreaker to TransportResponseHandler. By default it will return false and this specific implementation can override it and return true. The network layer can then circuit break based on this method agnostic to response type.

Number 2 would obviously be the basis of the future circuit breaking on response work (although it obviously only would work with synchronous response handling for now).
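For illustration, option 2 might look roughly like this (a simplified sketch, not the actual Elasticsearch interface definition):

```java
// Sketch of option 2: the network layer consults a default method instead of
// inspecting the concrete response type with instanceof checks.
public interface TransportResponseHandler<T> {

    void handleResponse(T response);

    // Most handlers keep the current behaviour and never trip the breaker.
    // The shard-result handler used by SearchTransportService could override
    // this and return true to opt in to response circuit breaking.
    default boolean canTripCircuitBreaker() {
        return false;
    }
}
```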

@howardhuanghua
Contributor Author

Hi @tbrooks8, thanks for the comment. I have updated the PR to use canTripCircuitBreaker to decide whether to trigger circuit breaker checking, and also changed the REQUEST breaker to IN_FLIGHT_REQUESTS based on Adrien's suggestion. Would you please help to review again?

By the way, what do you think about memory expansion after shard-level result deserialization? Should we use the estimated response object size, as I described in the comment above?

> 1. Introduce a memory expansion ratio to estimate response object size based on received message length before deserialization.

(The expansion ratio test results are in the table in the PR description above.)

@Tim-Brooks
Contributor

@howardhuanghua I was out last week. I will take a look at this soon. I will also engage some other people about this to ensure we are all in agreement about the default method approach.

@howardhuanghua
Contributor Author

Thank you @tbrooks8. If you have free time, please help to check this approach again. :)

@Tim-Brooks
Contributor

I brought this up in the distributed sync today to discuss. To give some context, I mainly focus on the networking area of Elasticsearch, which is why @jpountz pinged me on this topic.

So the first thing is that as I mentioned we have discussed adding response circuit breaking before and it is something we are interested in. We definitely accumulate data at the network level and it would be nice to avoid buffering a ton of unaccounted data. But getting the maximum value out of that work requires additional work to the transport protocol (which is in progress but seems unlikely to be ready in the short-term).

This particular PR seems to be addressing a specific concern - accumulating a significant amount of memory on the coordinator node. The reason that the networking-oriented solution (IN_FLIGHT_REQUESTS) might be inadequate for the desired results is that it depends on the responses from other nodes all occurring at the same time. A lot of the response handlers here (my understanding) will simply set the results in some array and then the execution will be complete (immediately releasing the circuit breaker).

I guess it was not clear that this would provide much value for the accumulation concern? Is the expectation here that we would always be depending on the JMX memory stats circuit breaker to save us? And the hope is that we could get some type of essentially "deserialization estimate" prior to performing the deserialization?

On the topic of deserialization being larger than the network bytes, once again probably the best approach there is to have some type of TransportResponseHandler method which provides the actual deserialization size estimate. But that is also a level of circuit breaking logic that we have never done before. I want to ping @elastic/es-core-infra / @danielmitterdorfer here as I know that Daniel might have some thoughts about this.

I am personally okay with the additional response size circuit breaking. I just don't know how much value it provides in relation to what exists at the network level currently (plus the fact that releasing is clearly broken at the moment since it does not support async; it completely depends on the JMX real memory stuff). And maybe this should just be some internal logic inside the accumulation handler.

```java
if (handler.canTripCircuitBreaker() && messageLengthBytes > 1024) {
    // the main purpose is to check memory before deserialization for large size of response
    bytesNeedToRelease = messageLengthBytes;
    breaker.addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_response>");
```

On this particular approach (the network level approach), you will need to catch the circuit breaking exception and handle it. With this code path it is going to appear as a transport serialization exception.

I also think that we would want the approach to literally surround the execution. I understand you are primarily concerned about the increase in size from deserialization, but as a generic approach we would want to wrap the call to the response handler.
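A sketch of that suggestion, extending the flow sketched in the PR description above (the exception handling shown is an assumption about how a breaker trip might be surfaced, not code from this PR):

```java
// Sketch: surround the whole handler execution and handle the breaker trip
// explicitly so it does not surface as a transport deserialization failure.
long bytesNeedToRelease = 0;
try {
    if (handler.canTripCircuitBreaker() && messageLengthBytes > 1024) {
        breaker.addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_response>");
        bytesNeedToRelease = messageLengthBytes;
    }
    final T response = handler.read(stream);
    handler.handleResponse(response); // wrap the execution, not just the deserialization
} catch (CircuitBreakingException e) {
    // report the trip through the handler instead of failing the transport channel
    handler.handleException(new RemoteTransportException("response circuit breaker tripped", e));
} finally {
    if (bytesNeedToRelease > 0) {
        breaker.addWithoutBreaking(-bytesNeedToRelease);
    }
}
```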

@elasticcla

Hi @howardhuanghua, we have found your signature in our records, but it seems like you have signed with a different e-mail than the one used in your Git commit. Can you please add both of these e-mails into your Github profile (they can be hidden), so we can match your e-mails to your Github profile?

@howardhuanghua
Contributor Author

Hi @tbrooks8, thank you for your patient explanations.

> I guess it was not clear that this would provide much value for the accumulation concern? Is the expectation here that we would always be depending on the JMX memory stats circuit breaker to save us? And the hope is that we could get some type of essentially "deserialization estimate" prior to performing the deserialization?

Yes, you are right: this PR does not check the memory of the accumulated response results. The main purpose is to stop continuous deserialization of shard-level results and discard those responses if the total JVM heap is about to be used up (based on checking the parent circuit breaker). For more accurate prediction, we hope to add the estimated deserialized size to the breaker, since after deserialization the result size is several times the network message size in bytes.

The memory buffered at the network layer is one big part of memory usage on the coordinate node; another big part is the bucket reduce phase, for which I have already opened #46751. This PR could discard the continuous shard-level responses to protect the coordinate node heap.

Shard-level search response results are accumulated in memory by the following logic:

```java
void consumeResult(Result result) {
    assert results.get(result.getShardIndex()) == null : "shardIndex: " + result.getShardIndex() + " is already set";
    results.set(result.getShardIndex(), result);
}
```

This memory is not released until all the shard results have been received.

I previously tried adding the accumulated result memory to the REQUEST breaker in ArraySearchPhaseResults.java#consumeResult; the main issues are:

  1. The response result has already been deserialized, so we would be accounting real memory usage, not a prediction.
  2. It is hard to estimate the response result object size (Java has no sizeof to calculate an object's size; the expansion ratios in the test results above were calculated with external tools).
  3. It is hard to release the added size from the breaker completely, so we could potentially leak breaker memory accounting.
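For reference, that abandoned attempt would have looked roughly like this (a sketch only; estimateSizeInBytes is a hypothetical helper, not an existing method):

```java
// Sketch of the abandoned approach: account the already-deserialized result
// when it is stored, and release it again once it has been consumed.
void consumeResult(Result result) {
    long estimatedBytes = estimateSizeInBytes(result); // issue 2: Java has no reliable sizeof
    // issue 1: this accounts real usage after deserialization, it is not a prediction
    breaker.addEstimateBytesAndMaybeBreak(estimatedBytes, "<shard_result>");
    results.set(result.getShardIndex(), result);
    // issue 3: releasing estimatedBytes on every success and failure path proved error prone
}
```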

> We definitely accumulate data at the network level and it would be nice to avoid buffering a ton of unaccounted data.

One idea to optimize this a little: could we accumulate only the raw network messages without deserializing them? After all the shard results for a single search request have arrived on the coordinate node, we would deserialize them together and continue with the next phase. We could stop accumulating the original messages if the parent breaker has already been triggered. This would save much more memory compared to buffering deserialized result objects. I have described this in the comment above, in the following section:
3. Defer deserializing response messages until all the shard results have been received.

And I have updated the commit based on your suggestion; please let me know if something is wrong.
I know the coordinate node's circuit breaker is being continuously optimized; we also have problems in this area and are trying to improve it. We hope we can contribute to this part together with you, thank you. :)

@rjernst rjernst added the Team:Core/Infra Meta label for core/infra team label May 4, 2020
@elasticsearchmachine elasticsearchmachine changed the base branch from master to main July 22, 2022 23:14