Skip to content

Commit

Permalink
Recover closed indices after a full cluster restart (#39249)
Browse files Browse the repository at this point in the history
Closing an index is a process that can be broken down into several steps:
1. first, the state of the cluster is updated to add a write block on the index 
to be closed
2. then, a transport replication action is executed on all shards of the index. 
This action checks that the maximum sequence number and the global 
checkpoint have identical values, indicating that all in flight writing operations 
have been completed on the shard.
3. finally, and if the previous steps were successful, the cluster state is 
updated again to change the state of the index from `OPEN`to `CLOSE`.

During the last step, the master node retrieves the minimum node version 
among all the nodes that compose the cluster:
* If a node is in pre 8.0 version, the index is closed and the index routing 
table is removed from the cluster state. This is the "old" way of closing 
indices and closed indices with no routing table are not replicated.
* If all nodes are in version 8.0 or higher, the index is closed and its routing 
table is reinitialized in cluster state. This is the new way of closing indices 
and such closed indices will be replicated in the cluster.

But routing tables are not persisted in the cluster state, so after a full cluster 
restart there is no way to make the distinction between an index closed in 
7.x and an index closed and replicated on 8.0.

This commit introduces a new private index settings named 
`index.verified_before_close` that is added to closed indices that are replicated 
at closing time. This setting serves as a marker to indicate that the index has 
been closed using the new Close Index API on a cluster that supports
 replication of closed indices.

This way, after a full cluster restart, the Gateway service can automatically 
recovers those closed indices as if they were opened indices. Closed indices 
that don't have this setting (because they were closed on a pre-8.0 cluster, 
or a cluster in mixed version) won't be recovered and will need to be reopened 
and closed again on a 8.0 cluster.

Note that reopening the index removes the private setting. 

Relates to #33888
  • Loading branch information
tlrx committed Feb 26, 2019
1 parent d0cf376 commit 71f5c34
Show file tree
Hide file tree
Showing 13 changed files with 418 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Strings;
Expand All @@ -41,6 +42,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -59,8 +61,11 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

/**
* Tests to run before and after a full cluster restart. This is run twice,
Expand Down Expand Up @@ -951,6 +956,97 @@ public void testSoftDeletes() throws Exception {
}
}

/**
* This test creates an index in the old cluster and then closes it. When the cluster is fully restarted in a newer version,
* it verifies that the index exists and is replicated if the old version supports replication.
*/
public void testClosedIndices() throws Exception {
if (isRunningAgainstOldCluster()) {
createIndex(index, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build());
ensureGreen(index);

int numDocs = 0;
if (randomBoolean()) {
numDocs = between(1, 100);
for (int i = 0; i < numDocs; i++) {
final Request request = new Request("POST", "/" + index + "/_doc/" + i);
request.setJsonEntity(Strings.toString(JsonXContent.contentBuilder().startObject().field("field", "v1").endObject()));
assertOK(client().performRequest(request));
if (rarely()) {
refresh();
}
}
refresh();
}

assertTotalHits(numDocs, entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
saveInfoDocument(index + "_doc_count", Integer.toString(numDocs));
closeIndex(index);
}

if (getOldClusterVersion().onOrAfter(Version.V_8_0_0)) {
ensureGreenLongWait(index);
assertClosedIndex(index, true);
} else {
assertClosedIndex(index, false);
}

if (isRunningAgainstOldCluster() == false) {
openIndex(index);
ensureGreen(index);

final int expectedNumDocs = Integer.parseInt(loadInfoDocument(index + "_doc_count"));
assertTotalHits(expectedNumDocs, entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
}
}

/**
* Asserts that an index is closed in the cluster state. If `checkRoutingTable` is true, it also asserts
* that the index has started shards.
*/
@SuppressWarnings("unchecked")
private void assertClosedIndex(final String index, final boolean checkRoutingTable) throws IOException {
final Map<String, ?> state = entityAsMap(client().performRequest(new Request("GET", "/_cluster/state")));

final Map<String, ?> metadata = (Map<String, Object>) XContentMapValues.extractValue("metadata.indices." + index, state);
assertThat(metadata, notNullValue());
assertThat(metadata.get("state"), equalTo("close"));

final Map<String, ?> blocks = (Map<String, Object>) XContentMapValues.extractValue("blocks.indices." + index, state);
assertThat(blocks, notNullValue());
assertThat(blocks.containsKey(String.valueOf(MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID)), is(true));

final Map<String, ?> settings = (Map<String, Object>) XContentMapValues.extractValue("settings", metadata);
assertThat(settings, notNullValue());

final Map<String, ?> routingTable = (Map<String, Object>) XContentMapValues.extractValue("routing_table.indices." + index, state);
if (checkRoutingTable) {
assertThat(routingTable, notNullValue());
assertThat(Booleans.parseBoolean((String) XContentMapValues.extractValue("index.verified_before_close", settings)), is(true));
final String numberOfShards = (String) XContentMapValues.extractValue("index.number_of_shards", settings);
assertThat(numberOfShards, notNullValue());
final int nbShards = Integer.parseInt(numberOfShards);
assertThat(nbShards, greaterThanOrEqualTo(1));

for (int i = 0; i < nbShards; i++) {
final Collection<Map<String, ?>> shards =
(Collection<Map<String, ?>>) XContentMapValues.extractValue("shards." + i, routingTable);
assertThat(shards, notNullValue());
assertThat(shards.size(), equalTo(2));
for (Map<String, ?> shard : shards) {
assertThat(XContentMapValues.extractValue("shard", shard), equalTo(i));
assertThat(XContentMapValues.extractValue("state", shard), equalTo("STARTED"));
assertThat(XContentMapValues.extractValue("index", shard), equalTo(index));
}
}
} else {
assertThat(routingTable, nullValue());
assertThat(XContentMapValues.extractValue("index.verified_before_close", settings), nullValue());
}
}

private void checkSnapshot(final String snapshotName, final int count, final Version tookOnVersion) throws IOException {
// Check the snapshot metadata, especially the version
Request listSnapshotRequest = new Request("GET", "/_snapshot/repo/" + snapshotName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,20 @@
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.rest.action.document.RestIndexAction;
import org.elasticsearch.test.rest.yaml.ObjectPath;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.function.Predicate;
Expand All @@ -43,7 +48,9 @@
import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

/**
* In depth testing of the recovery mechanism during a rolling restart.
Expand Down Expand Up @@ -310,4 +317,144 @@ public void testRecoveryWithSoftDeletes() throws Exception {
}
ensureGreen(index);
}

/**
* This test creates an index in the non upgraded cluster and closes it. It then checks that the index
* is effectively closed and potentially replicated (if the version the index was created on supports
* the replication of closed indices) during the rolling upgrade.
*/
public void testRecoveryClosedIndex() throws Exception {
final String indexName = "closed_index_created_on_old";
if (CLUSTER_TYPE == ClusterType.OLD) {
createIndex(indexName, Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
// if the node with the replica is the first to be restarted, while a replica is still recovering
// then delayed allocation will kick in. When the node comes back, the master will search for a copy
// but the recovering copy will be seen as invalid and the cluster health won't return to GREEN
// before timing out
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
.build());
ensureGreen(indexName);
closeIndex(indexName);
}

final Version indexVersionCreated = indexVersionCreated(indexName);
if (indexVersionCreated.onOrAfter(Version.V_8_0_0)) {
// index was created on a version that supports the replication of closed indices,
// so we expect the index to be closed and replicated
ensureGreen(indexName);
assertClosedIndex(indexName, true);
} else {
assertClosedIndex(indexName, false);
}
}

/**
* This test creates and closes a new index at every stage of the rolling upgrade. It then checks that the index
* is effectively closed and potentially replicated if the cluster supports replication of closed indices at the
* time the index was closed.
*/
public void testCloseIndexDuringRollingUpgrade() throws Exception {
final Version minimumNodeVersion = minimumNodeVersion();
final String indexName =
String.join("_", "index", CLUSTER_TYPE.toString(), Integer.toString(minimumNodeVersion.id)).toLowerCase(Locale.ROOT);

if (indexExists(indexName) == false) {
createIndex(indexName, Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.build());
ensureGreen(indexName);
closeIndex(indexName);
}

if (minimumNodeVersion.onOrAfter(Version.V_8_0_0)) {
// index is created on a version that supports the replication of closed indices,
// so we expect the index to be closed and replicated
ensureGreen(indexName);
assertClosedIndex(indexName, true);
} else {
assertClosedIndex(indexName, false);
}
}

/**
* Returns the version in which the given index has been created
*/
private static Version indexVersionCreated(final String indexName) throws IOException {
final Request request = new Request("GET", "/" + indexName + "/_settings");
final String versionCreatedSetting = indexName + ".settings.index.version.created";
request.addParameter("filter_path", versionCreatedSetting);

final Response response = client().performRequest(request);
return Version.fromId(Integer.parseInt(ObjectPath.createFromResponse(response).evaluate(versionCreatedSetting)));
}

/**
* Returns the minimum node version among all nodes of the cluster
*/
private static Version minimumNodeVersion() throws IOException {
final Request request = new Request("GET", "_nodes");
request.addParameter("filter_path", "nodes.*.version");

final Response response = client().performRequest(request);
final Map<String, Object> nodes = ObjectPath.createFromResponse(response).evaluate("nodes");

Version minVersion = null;
for (Map.Entry<String, Object> node : nodes.entrySet()) {
@SuppressWarnings("unchecked")
Version nodeVersion = Version.fromString((String) ((Map<String, Object>) node.getValue()).get("version"));
if (minVersion == null || minVersion.after(nodeVersion)) {
minVersion = nodeVersion;
}
}
assertNotNull(minVersion);
return minVersion;
}

/**
* Asserts that an index is closed in the cluster state. If `checkRoutingTable` is true, it also asserts
* that the index has started shards.
*/
@SuppressWarnings("unchecked")
private void assertClosedIndex(final String index, final boolean checkRoutingTable) throws IOException {
final Map<String, ?> state = entityAsMap(client().performRequest(new Request("GET", "/_cluster/state")));

final Map<String, ?> metadata = (Map<String, Object>) XContentMapValues.extractValue("metadata.indices." + index, state);
assertThat(metadata, notNullValue());
assertThat(metadata.get("state"), equalTo("close"));

final Map<String, ?> blocks = (Map<String, Object>) XContentMapValues.extractValue("blocks.indices." + index, state);
assertThat(blocks, notNullValue());
assertThat(blocks.containsKey(String.valueOf(MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID)), is(true));

final Map<String, ?> settings = (Map<String, Object>) XContentMapValues.extractValue("settings", metadata);
assertThat(settings, notNullValue());

final int numberOfShards = Integer.parseInt((String) XContentMapValues.extractValue("index.number_of_shards", settings));
final int numberOfReplicas = Integer.parseInt((String) XContentMapValues.extractValue("index.number_of_replicas", settings));

final Map<String, ?> routingTable = (Map<String, Object>) XContentMapValues.extractValue("routing_table.indices." + index, state);
if (checkRoutingTable) {
assertThat(routingTable, notNullValue());
assertThat(Booleans.parseBoolean((String) XContentMapValues.extractValue("index.verified_before_close", settings)), is(true));

for (int i = 0; i < numberOfShards; i++) {
final Collection<Map<String, ?>> shards =
(Collection<Map<String, ?>>) XContentMapValues.extractValue("shards." + i, routingTable);
assertThat(shards, notNullValue());
assertThat(shards.size(), equalTo(numberOfReplicas + 1));
for (Map<String, ?> shard : shards) {
assertThat(XContentMapValues.extractValue("shard", shard), equalTo(i));
assertThat(XContentMapValues.extractValue("state", shard), equalTo("STARTED"));
assertThat(XContentMapValues.extractValue("index", shard), equalTo(index));
}
}
} else {
assertThat(routingTable, nullValue());
assertThat(XContentMapValues.extractValue("index.verified_before_close", settings), nullValue());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
Expand Down Expand Up @@ -90,6 +92,8 @@ public class MetaDataIndexStateService {
public static final int INDEX_CLOSED_BLOCK_ID = 4;
public static final ClusterBlock INDEX_CLOSED_BLOCK = new ClusterBlock(4, "index closed", false,
false, false, RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE);
public static final Setting<Boolean> VERIFIED_BEFORE_CLOSE_SETTING =
Setting.boolSetting("index.verified_before_close", false, Setting.Property.IndexScope, Setting.Property.PrivateIndex);

private final ClusterService clusterService;
private final AllocationService allocationService;
Expand Down Expand Up @@ -402,15 +406,22 @@ static ClusterState closeRoutingTable(final ClusterState currentState,
continue;
}

logger.debug("closing index {} succeeded", index);
metadata.put(IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE));
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID);
blocks.addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
final IndexMetaData.Builder updatedMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE);
if (removeRoutingTable) {
metadata.put(updatedMetaData);
routingTable.remove(index.getName());
} else {
metadata.put(updatedMetaData
.settingsVersion(indexMetaData.getSettingsVersion() + 1)
.settings(Settings.builder()
.put(indexMetaData.getSettings())
.put(VERIFIED_BEFORE_CLOSE_SETTING.getKey(), true)));
routingTable.addAsFromOpenToClose(metadata.getSafe(index));
}

logger.debug("closing index {} succeeded", index);
closedIndices.add(index.getName());
} catch (final IndexNotFoundException e) {
logger.debug("index {} has been deleted since it was blocked before closing, ignoring", index);
Expand Down Expand Up @@ -490,7 +501,15 @@ ClusterState openIndices(final Index[] indices, final ClusterState currentState)
for (IndexMetaData indexMetaData : indicesToOpen) {
final Index index = indexMetaData.getIndex();
if (indexMetaData.getState() != IndexMetaData.State.OPEN) {
IndexMetaData updatedIndexMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.OPEN).build();
final Settings.Builder updatedSettings = Settings.builder().put(indexMetaData.getSettings());
updatedSettings.remove(VERIFIED_BEFORE_CLOSE_SETTING.getKey());

IndexMetaData updatedIndexMetaData = IndexMetaData.builder(indexMetaData)
.state(IndexMetaData.State.OPEN)
.settingsVersion(indexMetaData.getSettingsVersion() + 1)
.settings(updatedSettings)
.build();

// The index might be closed because we couldn't import it due to old incompatible version
// We need to check that this index can be upgraded to the current version
updatedIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(updatedIndexMetaData, minIndexCompatibilityVersion);
Expand Down Expand Up @@ -554,4 +573,9 @@ public static ClusterBlock createIndexClosingBlock() {
EnumSet.of(ClusterBlockLevel.WRITE));
}

public static boolean isIndexVerifiedBeforeClosed(final IndexMetaData indexMetaData) {
return indexMetaData.getState() == IndexMetaData.State.CLOSE
&& VERIFIED_BEFORE_CLOSE_SETTING.exists(indexMetaData.getSettings())
&& VERIFIED_BEFORE_CLOSE_SETTING.get(indexMetaData.getSettings());
}
}
Loading

0 comments on commit 71f5c34

Please sign in to comment.