Skip to content

Commit

Permalink
Reduce memory required for search responses when many shards are unav…
Browse files Browse the repository at this point in the history
…ailable (#91365) (#92907)

When there are many shards unavailable, we repeatedly store the exact same stack trace and exception. The only difference is the exception message.

This commit fixes this by slightly modifying the created exception so that, when a shard is unavailable, it neither provides a stack trace nor prints its stack trace as the "reason".


closes #90622
  • Loading branch information
benwtrent committed Jan 13, 2023
1 parent 404d575 commit c60a418
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 24 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/91365.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 91365
summary: Reduce memory required for search responses when many shards are unavailable
area: Search
type: bug
issues:
- 90622
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -67,6 +68,9 @@ public void testClusterAllowPartialsWithRedState() throws Exception {
assertThat("Expected total shards", searchResponse.getTotalShards(), equalTo(numShards));
for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
assertThat(failure.getCause(), instanceOf(NoShardAvailableActionException.class));
assertThat(failure.getCause().getStackTrace(), emptyArray());
// We don't write out the entire, repetitive stacktrace in the reason
assertThat(failure.reason(), equalTo("org.elasticsearch.action.NoShardAvailableActionException\n"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,36 @@
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
import java.io.PrintWriter;

public class NoShardAvailableActionException extends ElasticsearchException {

private static final StackTraceElement[] EMPTY_STACK_TRACE = new StackTraceElement[0];

// This is set so that no StackTrace is serialized in the scenario when we wrap other shard failures.
// It isn't necessary to serialize this field over the wire as the empty stack trace is serialized instead.
private final boolean onShardFailureWrapper;

/**
 * Creates an exception flagged as an on-shard-failure wrapper.
 * <p>
 * The instance is built with a {@code null} shard id and a {@code null} cause. Because the wrapper flag is set,
 * {@link #getStackTrace()} returns an empty array and {@link #printStackTrace(PrintWriter)} prints only the
 * exception itself.
 *
 * @param msg the exception message
 * @return a new exception with the wrapper flag set
 */
public static NoShardAvailableActionException forOnShardFailureWrapper(String msg) {
return new NoShardAvailableActionException(null, msg, null, true);
}

public NoShardAvailableActionException(ShardId shardId) {
this(shardId, null);
this(shardId, null, null, false);
}

public NoShardAvailableActionException(ShardId shardId, String msg) {
this(shardId, msg, null);
this(shardId, msg, null, false);
}

/**
 * Creates a regular (non-wrapper) exception for the given shard.
 *
 * @param shardId the shard id passed on to {@code setShard}
 * @param msg     the exception message
 * @param cause   the underlying cause
 */
public NoShardAvailableActionException(ShardId shardId, String msg, Throwable cause) {
// The wrapper flag is false here, so stack trace handling is left to the superclass.
this(shardId, msg, cause, false);
}

/**
 * Common constructor that all other non-deserializing constructors and the static factory delegate to.
 *
 * @param shardId               the shard id passed on to {@code setShard}
 * @param msg                   the exception message
 * @param cause                 the underlying cause
 * @param onShardFailureWrapper when {@code true}, {@link #getStackTrace()} returns an empty array and
 *                              {@link #printStackTrace(PrintWriter)} prints only the exception itself
 */
private NoShardAvailableActionException(ShardId shardId, String msg, Throwable cause, boolean onShardFailureWrapper) {
super(msg, cause);
setShard(shardId);
this.onShardFailureWrapper = onShardFailureWrapper;
}

@Override
Expand All @@ -37,5 +53,22 @@ public RestStatus status() {

/**
 * Deserializing constructor: reads the exception state from the given stream via the superclass.
 *
 * @param in the stream to read from
 * @throws IOException declared by the superclass stream constructor
 */
public NoShardAvailableActionException(StreamInput in) throws IOException {
super(in);
// The wrapper flag is not read from the stream; a deserialized instance always defers to the
// superclass for getStackTrace() and printStackTrace(PrintWriter).
onShardFailureWrapper = false;
}

/**
 * Returns the shared empty stack trace when this exception was created as an on-shard-failure wrapper;
 * otherwise returns whatever the superclass reports.
 */
@Override
public StackTraceElement[] getStackTrace() {
    if (onShardFailureWrapper) {
        return EMPTY_STACK_TRACE;
    }
    return super.getStackTrace();
}

/**
 * Prints this exception to the given writer.
 * <p>
 * For an on-shard-failure wrapper only the first line of the trace (the exception itself) is printed;
 * otherwise the superclass prints the full stack trace.
 */
@Override
public void printStackTrace(PrintWriter s) {
    if (onShardFailureWrapper) {
        // The repetitive stack trace is not serialized for the wrapper, so it is not printed either:
        // emit just the current exception.
        s.println(this);
        return;
    }
    super.printStackTrace(s);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ public final void onShardFailure(final int shardIndex, SearchShardTarget shardTa
if (TransportActions.isShardNotAvailableException(e)) {
// Groups shard not available exceptions under a generic exception that returns a SERVICE_UNAVAILABLE(503)
// temporary error.
e = new NoShardAvailableActionException(shardTarget.getShardId(), e.getMessage());
e = NoShardAvailableActionException.forOnShardFailureWrapper(e.getMessage());
}
// we don't aggregate shard on failures due to the internal cancellation,
// but do keep the header counts right
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public ShardSearchFailure(Exception e, @Nullable SearchShardTarget shardTarget)

/**
* The search shard target the failure occurred on.
* @return The shardTarget, may be null
*/
@Nullable
public SearchShardTarget shard() {
Expand All @@ -95,7 +96,6 @@ public String toString() {

/**
 * Reads a {@link ShardSearchFailure} from the given stream by delegating to the stream constructor.
 *
 * @param in the stream to read from
 * @return the failure read from the stream
 * @throws IOException declared by the stream constructor
 */
public static ShardSearchFailure readShardSearchFailure(StreamInput in) throws IOException {
return new ShardSearchFailure(in);

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.action.search;

import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -28,17 +29,17 @@

public class ShardSearchFailureTests extends ESTestCase {

/**
 * Builds a random {@link SearchShardTarget} for an index with the given UUID.
 * The node id and index name are random strings, the cluster alias is randomly either a random string
 * or {@code null}, and the shard number is a random int.
 */
private static SearchShardTarget randomShardTarget(String indexUuid) {
    final String node = randomAlphaOfLengthBetween(5, 10);
    final String name = randomAlphaOfLengthBetween(5, 10);
    final String alias;
    if (randomBoolean()) {
        alias = randomAlphaOfLengthBetween(5, 10);
    } else {
        alias = null;
    }
    // Random draws happen in the same order as before: node, name, alias choice, alias, shard number.
    final Index index = new Index(name, indexUuid);
    final ShardId shardId = new ShardId(index, randomInt());
    return new SearchShardTarget(node, shardId, alias);
}

public static ShardSearchFailure createTestItem(String indexUuid) {
String randomMessage = randomAlphaOfLengthBetween(3, 20);
Exception ex = new ParsingException(0, 0, randomMessage, new IllegalArgumentException("some bad argument"));
SearchShardTarget searchShardTarget = null;
if (randomBoolean()) {
String nodeId = randomAlphaOfLengthBetween(5, 10);
String indexName = randomAlphaOfLengthBetween(5, 10);
String clusterAlias = randomBoolean() ? randomAlphaOfLengthBetween(5, 10) : null;
searchShardTarget = new SearchShardTarget(nodeId, new ShardId(new Index(indexName, indexUuid), randomInt()), clusterAlias);
}
return new ShardSearchFailure(ex, searchShardTarget);
return new ShardSearchFailure(ex, randomBoolean() ? randomShardTarget(indexUuid) : null);
}

public void testFromXContent() throws IOException {
Expand Down Expand Up @@ -110,6 +111,22 @@ public void testToXContent() throws IOException {
}"""), xContent.utf8ToString());
}

/**
 * Checks the XContent rendering of a {@link ShardSearchFailure} whose cause is a
 * {@link NoShardAvailableActionException} created through {@code forOnShardFailureWrapper}.
 * The expected JSON holds the shard number, index name, node id and a {@code reason} object
 * consisting only of the exception type and message.
 */
public void testToXContentForNoShardAvailable() throws IOException {
ShardId shardId = new ShardId(new Index("indexName", "indexUuid"), 123);
ShardSearchFailure failure = new ShardSearchFailure(
NoShardAvailableActionException.forOnShardFailureWrapper("shard unassigned"),
new SearchShardTarget("nodeId", shardId, null)
);
// The third argument is randomized; the expected output below is the same either way.
BytesReference xContent = toXContent(failure, XContentType.JSON, randomBoolean());
// Whitespace is stripped from the expected text block before comparing.
assertEquals(XContentHelper.stripWhitespace("""
{
"shard": 123,
"index": "indexName",
"node": "nodeId",
"reason":{"type":"no_shard_available_action_exception","reason":"shard unassigned"}
}"""), xContent.utf8ToString());
}

public void testToXContentWithClusterAlias() throws IOException {
ShardSearchFailure failure = new ShardSearchFailure(
new ParsingException(0, 0, "some message", null),
Expand All @@ -131,17 +148,25 @@ public void testToXContentWithClusterAlias() throws IOException {
}

public void testSerialization() throws IOException {
ShardSearchFailure testItem = createTestItem(randomAlphaOfLength(12));
ShardSearchFailure deserializedInstance = copyWriteable(
testItem,
writableRegistry(),
ShardSearchFailure::new,
VersionUtils.randomVersion(random())
);
assertEquals(testItem.index(), deserializedInstance.index());
assertEquals(testItem.shard(), deserializedInstance.shard());
assertEquals(testItem.shardId(), deserializedInstance.shardId());
assertEquals(testItem.reason(), deserializedInstance.reason());
assertEquals(testItem.status(), deserializedInstance.status());
for (int runs = 0; runs < 25; runs++) {
final ShardSearchFailure testItem;
if (randomBoolean()) {
testItem = createTestItem(randomAlphaOfLength(12));
} else {
SearchShardTarget target = randomShardTarget(randomAlphaOfLength(12));
testItem = new ShardSearchFailure(NoShardAvailableActionException.forOnShardFailureWrapper("unavailable"), target);
}
ShardSearchFailure deserializedInstance = copyWriteable(
testItem,
writableRegistry(),
ShardSearchFailure::new,
VersionUtils.randomVersion(random())
);
assertEquals(testItem.index(), deserializedInstance.index());
assertEquals(testItem.shard(), deserializedInstance.shard());
assertEquals(testItem.shardId(), deserializedInstance.shardId());
assertEquals(testItem.reason(), deserializedInstance.reason());
assertEquals(testItem.status(), deserializedInstance.status());
}
}
}

0 comments on commit c60a418

Please sign in to comment.