Skip to content

Commit

Permalink
Noop peer recoveries on closed index (#41400)
Browse files Browse the repository at this point in the history
If users close an index to change some non-dynamic index settings, then the current implementation forces replicas of that closed index to copy over segment files from the primary. With this change, we make peer recoveries of a closed index skip both phases.

Relates #33888

Co-authored-by: Yannick Welsch <yannick@welsch.lu>
  • Loading branch information
dnhatn and ywelsch committed May 3, 2019
1 parent 2864e98 commit c7df2b8
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2559,6 +2559,10 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS
@Override
public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint();
// avoid scanning translog if not necessary
if (startingSeqNo > currentLocalCheckpoint) {
return true;
}
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
Translog.Operation operation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,8 @@ public int estimateNumberOfHistoryOperations(String source, MapperService mapper

/**
 * Reports whether an operation-based recovery may start from {@code startingSeqNo}.
 * <p>
 * Returns {@code true} only when {@code startingSeqNo} is strictly greater than
 * {@code seqNoStats.getMaxSeqNo()}, i.e. when no operation would have to be replayed.
 *
 * @param source        not used by this implementation
 * @param mapperService not used by this implementation
 * @param startingSeqNo the sequence number the recovery would start from
 * @return {@code true} if {@code startingSeqNo > seqNoStats.getMaxSeqNo()}, {@code false} otherwise
 */
@Override
public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
    // we can do operation-based recovery if we don't have to replay any operation.
    return startingSeqNo > seqNoStats.getMaxSeqNo();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,31 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static java.util.Collections.emptySet;
Expand All @@ -50,9 +56,11 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;

public class CloseIndexIT extends ESIntegTestCase {
Expand Down Expand Up @@ -338,6 +346,81 @@ public void testCloseIndexWaitForActiveShards() throws Exception {
assertIndexIsClosed(indexName);
}

/**
 * Closes and reopens a replicated single-shard index a few times and, after every transition, checks that
 * no replica recovery reports file details and that all shard copies contain the same doc ids.
 */
public void testNoopPeerRecoveriesWhenIndexClosed() throws Exception {
    final String indexName = "noop-peer-recovery-test";
    final int numberOfReplicas = between(1, 2);
    // ask for one or two data nodes more than the number of replicas
    final int minDataNodes = numberOfReplicas + between(1, 2);
    internalCluster().ensureAtLeastNumDataNodes(minDataNodes);

    final Settings indexSettings = Settings.builder()
        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
        .put("index.routing.rebalance.enable", "none")
        .build();
    createIndex(indexName, indexSettings);

    for (int remaining = between(1, 3); remaining > 0; remaining--) {
        indexRandom(
            randomBoolean(),
            randomBoolean(),
            randomBoolean(),
            IntStream.range(0, randomIntBetween(0, 50))
                .mapToObj(docNum -> client().prepareIndex(indexName, "_doc").setSource("num", docNum))
                .collect(toList()));
        ensureGreen(indexName);

        // Closing the index must not trigger a file-based recovery on any replica
        assertAcked(client().admin().indices().prepareClose(indexName).get());
        assertIndexIsClosed(indexName);
        ensureGreen(indexName);
        assertNoFileBasedRecovery(indexName);
        internalCluster().assertSameDocIdsOnShards();

        // Reopening the closed index must not trigger a file-based recovery either
        assertAcked(client().admin().indices().prepareOpen(indexName).get());
        assertIndexIsOpened(indexName);
        ensureGreen(indexName);
        assertNoFileBasedRecovery(indexName);
        internalCluster().assertSameDocIdsOnShards();
    }
}

/**
 * Checks that a replica of a closed index whose content differs from the primary's is recovered with a
 * file-based recovery: documents are indexed and the index is closed while the replica's node is stopped,
 * and after the restart every non-primary recovery must report file details.
 */
public void testRecoverExistingReplica() throws Exception {
    final String indexName = "test-recover-existing-replica";
    internalCluster().ensureAtLeastNumDataNodes(2);
    final List<String> dataNodes = randomSubsetOf(2, Sets.newHashSet(
        clusterService().state().nodes().getDataNodes().valuesIt()).stream().map(DiscoveryNode::getName).collect(Collectors.toSet()));

    // restrict allocation of the index to the two selected nodes
    final Settings indexSettings = Settings.builder()
        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
        .put("index.routing.allocation.include._name", String.join(",", dataNodes))
        .build();
    createIndex(indexName, indexSettings);

    indexRandom(
        randomBoolean(),
        randomBoolean(),
        randomBoolean(),
        IntStream.range(0, randomIntBetween(0, 50))
            .mapToObj(docNum -> client().prepareIndex(indexName, "_doc").setSource("num", docNum))
            .collect(toList()));
    ensureGreen(indexName);

    if (randomBoolean()) {
        client().admin().indices().prepareFlush(indexName).get();
    } else {
        client().admin().indices().prepareSyncedFlush(indexName).get();
    }

    // while the second node is down, index more documents through the first node and close the index
    internalCluster().restartNode(dataNodes.get(1), new InternalTestCluster.RestartCallback() {
        @Override
        public Settings onNodeStopped(String nodeName) throws Exception {
            final Client remainingNodeClient = client(dataNodes.get(0));
            final int extraDocs = randomIntBetween(1, 50);
            for (int doc = 0; doc < extraDocs; doc++) {
                remainingNodeClient.prepareIndex(indexName, "_doc").setSource("num", doc).get();
            }
            assertAcked(remainingNodeClient.admin().indices().prepareClose(indexName));
            return super.onNodeStopped(nodeName);
        }
    });

    assertIndexIsClosed(indexName);
    ensureGreen(indexName);
    internalCluster().assertSameDocIdsOnShards();

    final List<RecoveryState> recoveries =
        client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName);
    for (RecoveryState recovery : recoveries) {
        if (recovery.getPrimary()) {
            continue; // only replica recoveries are checked
        }
        assertThat(recovery.getIndex().fileDetails(), not(empty()));
    }
}

static void assertIndexIsClosed(final String... indices) {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
for (String index : indices) {
Expand Down Expand Up @@ -383,4 +466,12 @@ static void assertException(final Throwable throwable, final String indexName) {
fail("Unexpected exception: " + t);
}
}

/**
 * Asserts that every non-primary recovery reported for {@code indexName} has an empty list of file details.
 *
 * @param indexName the index whose shard recovery states are inspected
 */
void assertNoFileBasedRecovery(String indexName) {
    final List<RecoveryState> recoveries =
        client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName);
    recoveries.stream()
        .filter(recovery -> recovery.getPrimary() == false)
        .forEach(replicaRecovery -> assertThat(replicaRecovery.getIndex().fileDetails(), empty()));
}
}

0 comments on commit c7df2b8

Please sign in to comment.