Skip to content

Commit

Permalink
Backport support for replicating closed indices to 7.x (#39506)(#39499)
Browse files Browse the repository at this point in the history
Backport support for replicating closed indices (#39499)
    
    Before this change, closed indexes were simply not replicated. It was therefore
    possible to close an index and then decommission a data node without knowing
    that this data node contained shards of the closed index, potentially leading to
    data loss. Shards of closed indices were not completely taken into account when
    balancing the shards within the cluster, or automatically replicated through shard
    copies, and they were not easily movable from node A to node B using APIs like
    Cluster Reroute without being fully reopened and closed again.
    
    This commit changes the logic executed when closing an index, so that its shards
    are not just removed and forgotten but are instead reinitialized and reallocated on
    data nodes using an engine implementation which does not allow searching or
     indexing, which has a low memory overhead (compared with searchable/indexable
    opened shards) and which allows shards to be recovered from peer or promoted
    as primaries when needed.
    
    This new closing logic is built on top of the new Close Index API introduced in
    6.7.0 (#37359). Some pre-closing sanity checks are executed on the shards before
    closing them, and closing an index on a 8.0 cluster will reinitialize the index shards
    and therefore impact the cluster health.
    
    Some APIs have been adapted to make them work with closed indices:
    - Cluster Health API
    - Cluster Reroute API
    - Cluster Allocation Explain API
    - Recovery API
    - Cat Indices
    - Cat Shards
    - Cat Health
    - Cat Recovery
    
    This commit contains all the following changes (most recent first):
    * c6c42a1 Adapt NoOpEngineTests after #39006
    * 3f9993d Wait for shards to be active after closing indices (#38854)
    * 5e7a428 Adapt the Cluster Health API to closed indices (#39364)
    * 3e61939 Adapt CloseFollowerIndexIT for replicated closed indices (#38767)
    * 71f5c34 Recover closed indices after a full cluster restart (#39249)
    * 4db7fd9 Adapt the Recovery API for closed indices (#38421)
    * 4fd1bb2 Adapt more tests suites to closed indices (#39186)
    * 0519016 Add replica to primary promotion test for closed indices (#39110)
    * b756f6c Test the Cluster Shard Allocation Explain API with closed indices (#38631)
    * c484c66 Remove index routing table of closed indices in mixed versions clusters (#38955)
    * 00f1828 Mute CloseFollowerIndexIT.testCloseAndReopenFollowerIndex()
    * e845b0a Do not schedule Refresh/Translog/GlobalCheckpoint tasks for closed indices (#38329)
    * cf9a015 Adapt testIndexCanChangeCustomDataPath for replicated closed indices (#38327)
    * b9becdd Adapt testPendingTasks() for replicated closed indices (#38326)
    * 02cc730 Allow shards of closed indices to be replicated as regular shards (#38024)
    * e53a9be Fix compilation error in IndexShardIT after merge with master
    * cae4155 Relax NoOpEngine constraints (#37413)
    * 54d110b [RCI] Adapt NoOpEngine to latest FrozenEngine changes
    * c63fd69 [RCI] Add NoOpEngine for closed indices (#33903)
    
    Relates to #33888
  • Loading branch information
tlrx committed Mar 1, 2019
1 parent 06d0e0e commit e005eeb
Show file tree
Hide file tree
Showing 75 changed files with 2,800 additions and 418 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.WarningFailureException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Strings;
Expand All @@ -47,6 +48,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -65,8 +67,11 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;

/**
Expand Down Expand Up @@ -1022,8 +1027,98 @@ public void testSoftDeletes() throws Exception {
}
}

private void checkSnapshot(final String snapshotName, final int count, final Version tookOnVersion)
throws IOException {
/**
* This test creates an index in the old cluster and then closes it. When the cluster is fully restarted in a newer version,
* it verifies that the index exists and is replicated if the old version supports replication.
*/
public void testClosedIndices() throws Exception {
if (isRunningAgainstOldCluster()) {
createIndex(index, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build());
ensureGreen(index);

int numDocs = 0;
if (randomBoolean()) {
numDocs = between(1, 100);
for (int i = 0; i < numDocs; i++) {
final Request request = new Request("POST", "/" + index + "/_doc/" + i);
request.setJsonEntity(Strings.toString(JsonXContent.contentBuilder().startObject().field("field", "v1").endObject()));
assertOK(client().performRequest(request));
if (rarely()) {
refresh();
}
}
refresh();
}

assertTotalHits(numDocs, entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
saveInfoDocument(index + "_doc_count", Integer.toString(numDocs));
closeIndex(index);
}

if (getOldClusterVersion().onOrAfter(Version.V_7_1_0)) {
ensureGreenLongWait(index);
assertClosedIndex(index, true);
} else {
assertClosedIndex(index, false);
}

if (isRunningAgainstOldCluster() == false) {
openIndex(index);
ensureGreen(index);

final int expectedNumDocs = Integer.parseInt(loadInfoDocument(index + "_doc_count"));
assertTotalHits(expectedNumDocs, entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
}
}

/**
* Asserts that an index is closed in the cluster state. If `checkRoutingTable` is true, it also asserts
* that the index has started shards.
*/
@SuppressWarnings("unchecked")
private void assertClosedIndex(final String index, final boolean checkRoutingTable) throws IOException {
final Map<String, ?> state = entityAsMap(client().performRequest(new Request("GET", "/_cluster/state")));

final Map<String, ?> metadata = (Map<String, Object>) XContentMapValues.extractValue("metadata.indices." + index, state);
assertThat(metadata, notNullValue());
assertThat(metadata.get("state"), equalTo("close"));

final Map<String, ?> blocks = (Map<String, Object>) XContentMapValues.extractValue("blocks.indices." + index, state);
assertThat(blocks, notNullValue());
assertThat(blocks.containsKey(String.valueOf(MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID)), is(true));

final Map<String, ?> settings = (Map<String, Object>) XContentMapValues.extractValue("settings", metadata);
assertThat(settings, notNullValue());

final Map<String, ?> routingTable = (Map<String, Object>) XContentMapValues.extractValue("routing_table.indices." + index, state);
if (checkRoutingTable) {
assertThat(routingTable, notNullValue());
assertThat(Booleans.parseBoolean((String) XContentMapValues.extractValue("index.verified_before_close", settings)), is(true));
final String numberOfShards = (String) XContentMapValues.extractValue("index.number_of_shards", settings);
assertThat(numberOfShards, notNullValue());
final int nbShards = Integer.parseInt(numberOfShards);
assertThat(nbShards, greaterThanOrEqualTo(1));

for (int i = 0; i < nbShards; i++) {
final Collection<Map<String, ?>> shards =
(Collection<Map<String, ?>>) XContentMapValues.extractValue("shards." + i, routingTable);
assertThat(shards, notNullValue());
assertThat(shards.size(), equalTo(2));
for (Map<String, ?> shard : shards) {
assertThat(XContentMapValues.extractValue("shard", shard), equalTo(i));
assertThat(XContentMapValues.extractValue("state", shard), equalTo("STARTED"));
assertThat(XContentMapValues.extractValue("index", shard), equalTo(index));
}
}
} else {
assertThat(routingTable, nullValue());
assertThat(XContentMapValues.extractValue("index.verified_before_close", settings), nullValue());
}
}

private void checkSnapshot(final String snapshotName, final int count, final Version tookOnVersion) throws IOException {
// Check the snapshot metadata, especially the version
Request listSnapshotRequest = new Request("GET", "/_snapshot/repo/" + snapshotName);
Map<String, Object> listSnapshotResponse = entityAsMap(client().performRequest(listSnapshotRequest));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,21 @@
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.document.RestIndexAction;
import org.elasticsearch.test.rest.yaml.ObjectPath;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.function.Predicate;
Expand All @@ -43,7 +49,9 @@
import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

/**
* In depth testing of the recovery mechanism during a rolling restart.
Expand Down Expand Up @@ -310,4 +318,148 @@ public void testRecoveryWithSoftDeletes() throws Exception {
}
ensureGreen(index);
}

/**
* This test creates an index in the non upgraded cluster and closes it. It then checks that the index
* is effectively closed and potentially replicated (if the version the index was created on supports
* the replication of closed indices) during the rolling upgrade.
*/
public void testRecoveryClosedIndex() throws Exception {
final String indexName = "closed_index_created_on_old";
if (CLUSTER_TYPE == ClusterType.OLD) {
createIndex(indexName, Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
// if the node with the replica is the first to be restarted, while a replica is still recovering
// then delayed allocation will kick in. When the node comes back, the master will search for a copy
// but the recovering copy will be seen as invalid and the cluster health won't return to GREEN
// before timing out
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
.build());
ensureGreen(indexName);
closeIndex(indexName);
}

final Version indexVersionCreated = indexVersionCreated(indexName);
if (indexVersionCreated.onOrAfter(Version.V_7_1_0)) {
// index was created on a version that supports the replication of closed indices,
// so we expect the index to be closed and replicated
ensureGreen(indexName);
assertClosedIndex(indexName, true);
} else {
assertClosedIndex(indexName, false);
}
}

/**
* This test creates and closes a new index at every stage of the rolling upgrade. It then checks that the index
* is effectively closed and potentially replicated if the cluster supports replication of closed indices at the
* time the index was closed.
*/
public void testCloseIndexDuringRollingUpgrade() throws Exception {
final Version minimumNodeVersion = minimumNodeVersion();
final String indexName =
String.join("_", "index", CLUSTER_TYPE.toString(), Integer.toString(minimumNodeVersion.id)).toLowerCase(Locale.ROOT);

final Request indexExistsRequest = new Request("HEAD", "/" + indexName);
indexExistsRequest.setOptions(allowTypesRemovalWarnings());

final Response indexExistsResponse = client().performRequest(indexExistsRequest);
if (RestStatus.OK.getStatus() != indexExistsResponse.getStatusLine().getStatusCode()) {
createIndex(indexName, Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.build());
ensureGreen(indexName);
closeIndex(indexName);
}

if (minimumNodeVersion.onOrAfter(Version.V_7_1_0)) {
// index is created on a version that supports the replication of closed indices,
// so we expect the index to be closed and replicated
ensureGreen(indexName);
assertClosedIndex(indexName, true);
} else {
assertClosedIndex(indexName, false);
}
}

/**
* Returns the version in which the given index has been created
*/
private static Version indexVersionCreated(final String indexName) throws IOException {
final Request request = new Request("GET", "/" + indexName + "/_settings");
final String versionCreatedSetting = indexName + ".settings.index.version.created";
request.addParameter("filter_path", versionCreatedSetting);

final Response response = client().performRequest(request);
return Version.fromId(Integer.parseInt(ObjectPath.createFromResponse(response).evaluate(versionCreatedSetting)));
}

/**
* Returns the minimum node version among all nodes of the cluster
*/
private static Version minimumNodeVersion() throws IOException {
final Request request = new Request("GET", "_nodes");
request.addParameter("filter_path", "nodes.*.version");

final Response response = client().performRequest(request);
final Map<String, Object> nodes = ObjectPath.createFromResponse(response).evaluate("nodes");

Version minVersion = null;
for (Map.Entry<String, Object> node : nodes.entrySet()) {
@SuppressWarnings("unchecked")
Version nodeVersion = Version.fromString((String) ((Map<String, Object>) node.getValue()).get("version"));
if (minVersion == null || minVersion.after(nodeVersion)) {
minVersion = nodeVersion;
}
}
assertNotNull(minVersion);
return minVersion;
}

/**
* Asserts that an index is closed in the cluster state. If `checkRoutingTable` is true, it also asserts
* that the index has started shards.
*/
@SuppressWarnings("unchecked")
private void assertClosedIndex(final String index, final boolean checkRoutingTable) throws IOException {
final Map<String, ?> state = entityAsMap(client().performRequest(new Request("GET", "/_cluster/state")));

final Map<String, ?> metadata = (Map<String, Object>) XContentMapValues.extractValue("metadata.indices." + index, state);
assertThat(metadata, notNullValue());
assertThat(metadata.get("state"), equalTo("close"));

final Map<String, ?> blocks = (Map<String, Object>) XContentMapValues.extractValue("blocks.indices." + index, state);
assertThat(blocks, notNullValue());
assertThat(blocks.containsKey(String.valueOf(MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID)), is(true));

final Map<String, ?> settings = (Map<String, Object>) XContentMapValues.extractValue("settings", metadata);
assertThat(settings, notNullValue());

final int numberOfShards = Integer.parseInt((String) XContentMapValues.extractValue("index.number_of_shards", settings));
final int numberOfReplicas = Integer.parseInt((String) XContentMapValues.extractValue("index.number_of_replicas", settings));

final Map<String, ?> routingTable = (Map<String, Object>) XContentMapValues.extractValue("routing_table.indices." + index, state);
if (checkRoutingTable) {
assertThat(routingTable, notNullValue());
assertThat(Booleans.parseBoolean((String) XContentMapValues.extractValue("index.verified_before_close", settings)), is(true));

for (int i = 0; i < numberOfShards; i++) {
final Collection<Map<String, ?>> shards =
(Collection<Map<String, ?>>) XContentMapValues.extractValue("shards." + i, routingTable);
assertThat(shards, notNullValue());
assertThat(shards.size(), equalTo(numberOfReplicas + 1));
for (Map<String, ?> shard : shards) {
assertThat(XContentMapValues.extractValue("shard", shard), equalTo(i));
assertThat(XContentMapValues.extractValue("state", shard), equalTo("STARTED"));
assertThat(XContentMapValues.extractValue("index", shard), equalTo(index));
}
}
} else {
assertThat(routingTable, nullValue());
assertThat(XContentMapValues.extractValue("index.verified_before_close", settings), nullValue());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
}
},
"params": {
"expand_wildcards": {
"type" : "enum",
"options" : ["open","closed","none","all"],
"default" : "all",
"description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both."
},
"level": {
"type" : "enum",
"options" : ["cluster","indices","shards"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
"options" : ["open","closed","none","all"],
"default" : "open",
"description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both."
},
"wait_for_active_shards": {
"type" : "string",
"description" : "Sets the number of active shards to wait for before the operation returns."
}
}
},
Expand Down
Loading

0 comments on commit e005eeb

Please sign in to comment.