Skip to content

Commit

Permalink
Closed index replica allocation
Browse files Browse the repository at this point in the history
Added integration test validating that fast recovery is made for closed
indices when multiple shard copies can be chosen from.

Fixed InternalTestCluster to allow doing operations inside onStopped()
when using restartXXXNode().

Relates elastic#41400 and elastic#33888
  • Loading branch information
henningandersen committed May 3, 2019
1 parent 8e14aba commit d9376ee
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand All @@ -36,6 +37,7 @@
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -338,6 +340,53 @@ public void testCloseIndexWaitForActiveShards() throws Exception {
assertIndexIsClosed(indexName);
}

/**
* Verify that if we have two shard copies around, we prefer one with identical sequence numbers and do
* a noop recovery.
*/
public void testClosedIndexRecoversFast() throws Exception {
final String indexName = "closed-index-fast-recovery";
internalCluster().ensureAtLeastNumDataNodes(3);
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build());

indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(1, 50))
.mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList()));
ensureGreen(indexName);

internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(1, 50))
.mapToObj(i -> client().prepareIndex(indexName, "_doc", "Extra" + i).setSource("num", i)).collect(toList()));
ensureGreen();

internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
ensureYellow();

assertAcked(client().admin().indices().prepareClose(indexName).get());

assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")).get());
return super.onNodeStopped(nodeName);
}
});
return super.onNodeStopped(nodeName);
}
});

assertThat(client().admin().cluster().prepareHealth(indexName).get().getStatus(), is(ClusterHealthStatus.YELLOW));
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String) null)).get());
ensureGreen();
// needs merge of #41400 before we can check this.
// assertNoFileBasedRecovery(indexName);
}

static void assertIndexIsClosed(final String... indices) {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
for (String index : indices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_TYPE_SETTING;
import static org.elasticsearch.discovery.DiscoveryModule.ZEN2_DISCOVERY_TYPE;
import static org.elasticsearch.node.Node.INITIAL_STATE_TIMEOUT_SETTING;
import static org.elasticsearch.discovery.FileBasedSeedHostsProvider.UNICAST_HOSTS_FILE;
import static org.elasticsearch.node.Node.INITIAL_STATE_TIMEOUT_SETTING;
import static org.elasticsearch.test.ESTestCase.assertBusy;
import static org.elasticsearch.test.ESTestCase.awaitBusy;
import static org.elasticsearch.test.ESTestCase.getTestTransportType;
Expand Down Expand Up @@ -564,7 +564,8 @@ private NodeAndClient getRandomNodeAndClient() {

private synchronized NodeAndClient getRandomNodeAndClient(Predicate<NodeAndClient> predicate) {
ensureOpen();
List<NodeAndClient> values = nodes.values().stream().filter(predicate).collect(Collectors.toList());
List<NodeAndClient> values = nodes.values().stream().filter(nc -> nc.isClosed() == false).filter(predicate)
.collect(Collectors.toList());
if (values.isEmpty() == false) {
return randomFrom(random, values);
}
Expand Down Expand Up @@ -1003,6 +1004,10 @@ public void close() throws IOException {
}
}

public boolean isClosed() {
return closed.get();
}

private void markNodeDataDirsAsPendingForWipe(Node node) {
assert Thread.holdsLock(InternalTestCluster.this);
NodeEnvironment nodeEnv = node.getNodeEnvironment();
Expand Down Expand Up @@ -1178,10 +1183,11 @@ public synchronized void validateClusterFormed() {

/** ensure a cluster is formed with all published nodes, but do so by using the client of the specified node */
private synchronized void validateClusterFormed(String viaNode) {
Set<DiscoveryNode> expectedNodes = new HashSet<>();
for (NodeAndClient nodeAndClient : nodes.values()) {
expectedNodes.add(getInstanceFromNode(ClusterService.class, nodeAndClient.node()).localNode());
}
Set<DiscoveryNode> expectedNodes =
nodes.values().stream()
.filter(nc -> nc.isClosed() == false)
.map(nc -> getInstanceFromNode(ClusterService.class, nc.node()).localNode())
.collect(Collectors.toSet());
logger.trace("validating cluster formed via [{}], expecting {}", viaNode, expectedNodes);
final Client client = client(viaNode);
try {
Expand Down Expand Up @@ -1533,7 +1539,7 @@ private static <T> T getInstanceFromNode(Class<T> clazz, Node node) {

@Override
public int size() {
return nodes.size();
return Math.toIntExact(nodes.values().stream().filter(nc -> nc.isClosed() == false).count());
}

@Override
Expand Down Expand Up @@ -2085,7 +2091,10 @@ private static int getMinMasterNodes(int eligibleMasterNodes) {
}

private int getMasterNodesCount() {
return (int) nodes.values().stream().filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())).count();
return (int) nodes.values().stream()
.filter(n -> n.isClosed() == false)
.filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings()))
.count();
}

public String startMasterOnlyNode() {
Expand Down

0 comments on commit d9376ee

Please sign in to comment.