Skip to content

Commit

Permalink
Relax error handling in transport disk usage API (#84774)
Browse files Browse the repository at this point in the history
Relax the failure handling to cover other exceptions such as
BroadcastShardOperationFailedException.
  • Loading branch information
dnhatn committed Mar 26, 2022
1 parent fbe7cf7 commit 53606ba
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 2 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/84774.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 84774
summary: Increase store ref before analyzing disk usage
area: Search
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,65 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.junit.Before;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

public class IndexDiskUsageAnalyzerIT extends ESIntegTestCase {

@Override
protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), EngineTestPlugin.class);
}

private static final Set<ShardId> failOnFlushShards = Sets.newConcurrentHashSet();

public static class EngineTestPlugin extends Plugin implements EnginePlugin {
@Override
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
return Optional.of(config -> new InternalEngine(config) {
@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
final ShardId shardId = config.getShardId();
if (failOnFlushShards.contains(shardId)) {
throw new EngineException(shardId, "simulated IO");
}
super.flush(force, waitIfOngoing);
}
});
}
}

@Before
public void resetFailOnFlush() throws Exception {
failOnFlushShards.clear();
}

public void testSimple() throws Exception {
final XContentBuilder mapping = XContentFactory.jsonBuilder();
mapping.startObject();
Expand Down Expand Up @@ -152,6 +202,43 @@ public void testGeoShape() throws Exception {
assertMetadataFields(stats);
}

public void testFailOnFlush() throws Exception {
final String indexName = "test-index";
int numberOfShards = between(1, 5);
client().admin()
.indices()
.prepareCreate(indexName)
.setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 1))
)
.get();
ensureYellow(indexName);
int numDocs = randomIntBetween(1, 10);
for (int i = 0; i < numDocs; i++) {
int value = randomIntBetween(1, 10);
final XContentBuilder doc = XContentFactory.jsonBuilder()
.startObject()
.field("english_text", English.intToEnglish(value))
.field("value", value)
.endObject();
client().prepareIndex(indexName).setId("id-" + i).setSource(doc).get();
}
Index index = clusterService().state().metadata().index(indexName).getIndex();
List<ShardId> failedShards = randomSubsetOf(
between(1, numberOfShards),
IntStream.range(0, numberOfShards).mapToObj(n -> new ShardId(index, n)).toList()
);
failOnFlushShards.addAll(failedShards);
AnalyzeIndexDiskUsageResponse resp = client().execute(
AnalyzeIndexDiskUsageAction.INSTANCE,
new AnalyzeIndexDiskUsageRequest(new String[] { indexName }, AnalyzeIndexDiskUsageRequest.DEFAULT_INDICES_OPTIONS, true)
).actionGet();
assertThat(resp.getTotalShards(), equalTo(numberOfShards));
assertThat(resp.getFailedShards(), equalTo(failedShards.size()));
}

void assertMetadataFields(IndexDiskUsageStats stats) {
final IndexDiskUsageStats.PerFieldDiskUsage sourceField = stats.getFields().get("_source");
assertThat(sourceField.getInvertedIndexBytes(), equalTo(0L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.action.admin.indices.diskusage;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.ActionFilters;
Expand Down Expand Up @@ -109,8 +110,10 @@ protected AnalyzeIndexDiskUsageResponse newResponse(
if (r instanceof AnalyzeDiskUsageShardResponse resp) {
++successfulShards;
combined.compute(resp.getIndex(), (k, v) -> v == null ? resp.stats : v.add(resp.stats));
} else if (r instanceof DefaultShardOperationFailedException) {
shardFailures.add((DefaultShardOperationFailedException) r);
} else if (r instanceof DefaultShardOperationFailedException e) {
shardFailures.add(e);
} else if (r instanceof Exception e) {
shardFailures.add(new DefaultShardOperationFailedException(ExceptionsHelper.convertToElastic(e)));
} else {
assert false : "unknown response [" + r + "]";
throw new IllegalStateException("unknown response [" + r + "]");
Expand Down

0 comments on commit 53606ba

Please sign in to comment.