Skip to content

Commit

Permalink
Handle partial failure retrieving segments in SegmentCountStep (#46556)
Browse files Browse the repository at this point in the history
Since the `IndicesSegmentsRequest` scatters to all shards for the index,
it's possible that some of the shards may fail. This adds failure
handling and logging (since this is a best-effort step in the first
place) for this case.
  • Loading branch information
dakrone committed Sep 11, 2019
1 parent 0963e78 commit 09a9cef
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.admin.indices.segments.IndexSegments;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -50,19 +51,32 @@ public int getMaxNumSegments() {
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) {
getClient().admin().indices().segments(new IndicesSegmentsRequest(indexMetaData.getIndex().getName()),
ActionListener.wrap(response -> {
IndexSegments segments = response.getIndices().get(indexMetaData.getIndex().getName());
List<ShardSegments> unmergedShards = segments.getShards().values().stream()
.flatMap(iss -> Arrays.stream(iss.getShards()))
.filter(shardSegments -> shardSegments.getSegments().size() > maxNumSegments)
.collect(Collectors.toList());
if (unmergedShards.size() > 0) {
Map<ShardRouting, Integer> unmergedShardCounts = unmergedShards.stream()
.collect(Collectors.toMap(ShardSegments::getShardRouting, ss -> ss.getSegments().size()));
logger.info("[{}] best effort force merge to [{}] segments did not succeed for {} shards: {}",
indexMetaData.getIndex().getName(), maxNumSegments, unmergedShards.size(), unmergedShardCounts);
IndexSegments idxSegments = response.getIndices().get(indexMetaData.getIndex().getName());
if (idxSegments == null || (response.getShardFailures() != null && response.getShardFailures().length > 0)) {
final DefaultShardOperationFailedException[] failures = response.getShardFailures();
logger.info("[{}] retrieval of segment counts after force merge did not succeed, " +
"there were {} shard failures. " +
"failures: {}",
indexMetaData.getIndex().getName(),
response.getFailedShards(),
failures == null ? "n/a" : Strings.collectionToDelimitedString(Arrays.stream(failures)
.map(Strings::toString)
.collect(Collectors.toList()), ","));
listener.onResponse(true, new Info(-1));
} else {
List<ShardSegments> unmergedShards = idxSegments.getShards().values().stream()
.flatMap(iss -> Arrays.stream(iss.getShards()))
.filter(shardSegments -> shardSegments.getSegments().size() > maxNumSegments)
.collect(Collectors.toList());
if (unmergedShards.size() > 0) {
Map<ShardRouting, Integer> unmergedShardCounts = unmergedShards.stream()
.collect(Collectors.toMap(ShardSegments::getShardRouting, ss -> ss.getSegments().size()));
logger.info("[{}] best effort force merge to [{}] segments did not succeed for {} shards: {}",
indexMetaData.getIndex().getName(), maxNumSegments, unmergedShards.size(), unmergedShardCounts);
}
// Force merging is best effort, so always return true that the condition has been met.
listener.onResponse(true, new Info(unmergedShards.size()));
}
// Force merging is best effort, so always return true that the condition has been met.
listener.onResponse(true, new Info(unmergedShards.size()));
}, listener::onFailure));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
Expand Down Expand Up @@ -130,6 +131,7 @@ public void onResponse(boolean conditionMet, ToXContentObject info) {

@Override
public void onFailure(Exception e) {
logger.warn("unexpected onFailure call", e);
throw new AssertionError("unexpected method call");
}
});
Expand Down Expand Up @@ -187,6 +189,7 @@ public void onResponse(boolean conditionMet, ToXContentObject info) {

@Override
public void onFailure(Exception e) {
logger.warn("unexpected onFailure call", e);
throw new AssertionError("unexpected method call");
}
});
Expand All @@ -195,6 +198,67 @@ public void onFailure(Exception e) {
assertEquals(new SegmentCountStep.Info(0L), conditionInfo.get());
}

public void testFailedToRetrieveSomeSegments() {
int maxNumSegments = randomIntBetween(3, 10);
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
Client client = Mockito.mock(Client.class);
AdminClient adminClient = Mockito.mock(AdminClient.class);
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
IndicesSegmentResponse indicesSegmentResponse = Mockito.mock(IndicesSegmentResponse.class);
IndexSegments indexSegments = Mockito.mock(IndexSegments.class);
IndexShardSegments indexShardSegments = Mockito.mock(IndexShardSegments.class);
Map<Integer, IndexShardSegments> indexShards = Collections.singletonMap(0, indexShardSegments);
ShardSegments shardSegmentsOne = Mockito.mock(ShardSegments.class);
ShardSegments[] shardSegmentsArray = new ShardSegments[] { shardSegmentsOne };
Spliterator<IndexShardSegments> iss = indexShards.values().spliterator();
List<Segment> segments = new ArrayList<>();
for (int i = 0; i < maxNumSegments + randomIntBetween(1, 3); i++) {
segments.add(null);
}
Mockito.when(indicesSegmentResponse.getStatus()).thenReturn(RestStatus.OK);
Mockito.when(indicesSegmentResponse.getIndices()).thenReturn(Collections.singletonMap(index.getName(), null));
Mockito.when(indicesSegmentResponse.getShardFailures())
.thenReturn(new DefaultShardOperationFailedException[]{new DefaultShardOperationFailedException(index.getName(),
0, new IllegalArgumentException("fake"))});
Mockito.when(indexSegments.spliterator()).thenReturn(iss);
Mockito.when(indexShardSegments.getShards()).thenReturn(shardSegmentsArray);
Mockito.when(shardSegmentsOne.getSegments()).thenReturn(segments);

Mockito.when(client.admin()).thenReturn(adminClient);
Mockito.when(adminClient.indices()).thenReturn(indicesClient);

Step.StepKey stepKey = randomStepKey();
StepKey nextStepKey = randomStepKey();

Mockito.doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<IndicesSegmentResponse> listener = (ActionListener<IndicesSegmentResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(indicesSegmentResponse);
return null;
}).when(indicesClient).segments(any(), any());

SetOnce<Boolean> conditionMetResult = new SetOnce<>();
SetOnce<ToXContentObject> conditionInfo = new SetOnce<>();

SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments);
step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() {
@Override
public void onResponse(boolean conditionMet, ToXContentObject info) {
conditionMetResult.set(conditionMet);
conditionInfo.set(info);
}

@Override
public void onFailure(Exception e) {
logger.warn("unexpected onFailure call", e);
throw new AssertionError("unexpected method call: " + e);
}
});

assertTrue(conditionMetResult.get());
assertEquals(new SegmentCountStep.Info(-1L), conditionInfo.get());
}

public void testThrowsException() {
Exception exception = new RuntimeException("error");
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
Expand Down

0 comments on commit 09a9cef

Please sign in to comment.