From 9bf19bf7f911359f001e71ca25264c2a3c712c78 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 21 Jun 2018 09:52:19 -0400 Subject: [PATCH 1/4] Fix missing historyUUID in peer recovery when rolling upgrade 5.x to 6.3 Today we make sure that a 5.x index commit has all required commit tags in the RecoveryTarget#cleanFiles method. We do this in RecoveryTarget#cleanFiles because it is only needed in a file-based recovery, and we assumed that #cleanFiles is always called in a file-based recovery. However, this assumption is not valid if the index is sealed (i.e., synced-flushed). This incorrect assumption would prevent users from performing a rolling upgrade from 5.x to 6.3 if their indices were sealed. --- .../upgrades/FullClusterRestartIT.java | 41 +++++++++++++++++++ .../elasticsearch/upgrades/RecoveryIT.java | 24 +++++++++++ .../indices/recovery/RecoveryTarget.java | 6 +-- 3 files changed, 68 insertions(+), 3 deletions(-) diff --git a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java index 54e2d88dde769..c82507d0bb301 100644 --- a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java +++ b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java @@ -24,6 +24,7 @@ import org.apache.http.entity.StringEntity; import org.apache.http.util.EntityUtils; import org.elasticsearch.Version; +import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Booleans; @@ -788,6 +789,45 @@ public void testRecovery() throws Exception { } } + /** + * Tests that a synced-flushed index is correctly recovered. + * This might be an edge-case from 5.x to 6.x since a 5.x index commit does not have all required 6.x commit tags. 
+ */ + public void testRecoverySealedIndex() throws Exception { + int count; + if (runningAgainstOldCluster) { + count = randomInt(10); + } else { + count = countOfIndexedRandomDocuments(); + } + if (runningAgainstOldCluster) { + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + // if the node with the replica is the first to be restarted, while a replica is still recovering + // then delayed allocation will kick in. When the node comes back, the master will search for a copy + // but the recovering copy will be seen as invalid and the cluster health won't return to GREEN + // before timing out + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms") + .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster + createIndex(index, settings.build()); + indexRandomDocuments(count, randomBoolean(), randomBoolean(), + n -> JsonXContent.contentBuilder().startObject().field("key", "value").endObject()); + assertBusy(() -> { + Response resp = client().performRequest(new Request("POST", index + "/_flush/synced")); + assertOK(resp); + Map result = ObjectPath.createFromResponse(resp).evaluate("_shards"); + assertThat(result.get("successful"), equalTo(2)); + }); + } + ensureGreen(index); + refresh(); + Request countRequest = new Request("GET", "/" + index + "/_search"); + countRequest.addParameter("size", "0"); + String countResponse = toStr(client().performRequest(countRequest)); + assertThat(countResponse, containsString("\"total\":" + count)); + } + /** * Tests snapshot/restore by creating a snapshot and restoring it. 
It takes * a snapshot on the old cluster and restores it on the old cluster as a @@ -892,6 +932,7 @@ public void testHistoryUUIDIsAdded() throws Exception { mappingsAndSettings.endObject(); client().performRequest("PUT", "/" + index, Collections.emptyMap(), new StringEntity(Strings.toString(mappingsAndSettings), ContentType.APPLICATION_JSON)); + } else { Response response = client().performRequest("GET", index + "/_stats", singletonMap("level", "shards")); List shardStats = ObjectPath.createFromResponse(response).evaluate("indices." + index + ".shards.0"); diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index f7bbe4847b959..7c8dce1aeedff 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -22,6 +22,7 @@ import org.apache.http.entity.StringEntity; import org.elasticsearch.Version; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.Settings; @@ -285,4 +286,27 @@ public void testSearchGeoPoints() throws Exception { } } + public void testRecoverySealedIndex() throws Exception { + final String index = "recovery_a_sealed_index"; + if (CLUSTER_TYPE == ClusterType.OLD) { + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + // if the node with the replica is the first to be restarted, while a replica is still recovering + // then delayed allocation will kick in. 
When the node comes back, the master will search for a copy + // but the recovering copy will be seen as invalid and the cluster health won't return to GREEN + // before timing out + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms") + .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster + createIndex(index, settings.build()); + indexDocs(index, 0, randomInt(5)); + assertBusy(() -> { + Response resp = client().performRequest(new Request("POST", index + "/_flush/synced")); + assertOK(resp); + Map result = ObjectPath.createFromResponse(resp).evaluate("_shards"); + assertThat(result.get("successful"), equalTo(2)); + }); + } + ensureGreen(index); + } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index e2004eda17fc1..7ef8534f93d02 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -362,6 +362,9 @@ private void ensureRefCount() { @Override public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException { + if (fileBasedRecovery && indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0)) { + store.ensureIndexHas6xCommitTags(); + } state().getTranslog().totalOperations(totalTranslogOps); indexShard().openEngineAndSkipTranslogRecovery(); } @@ -438,9 +441,6 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa store.incRef(); try { store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData); - if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) { - store.ensureIndexHas6xCommitTags(); - } // TODO: Assign the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2 final String translogUUID = Translog.createEmptyTranslog( 
indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId, indexShard.getPrimaryTerm()); From b93cc20062337a83899c484da4379ad357fd31d4 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 21 Jun 2018 11:31:22 -0400 Subject: [PATCH 2/4] Yannick feedback --- .../upgrades/FullClusterRestartIT.java | 15 +++++++-------- .../org/elasticsearch/upgrades/RecoveryIT.java | 6 ++++-- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java index c82507d0bb301..d3a46d7ddbace 100644 --- a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java +++ b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java @@ -793,13 +793,8 @@ public void testRecovery() throws Exception { * Tests that a synced-flushed index is correctly recovered. * This might be an edge-case from 5.x to 6.x since a 5.x index commit does not have all required 6.x commit tags. 
*/ - public void testRecoverySealedIndex() throws Exception { - int count; - if (runningAgainstOldCluster) { - count = randomInt(10); - } else { - count = countOfIndexedRandomDocuments(); - } + public void testRecoverSyncedFlushIndex() throws Exception { + final int count; if (runningAgainstOldCluster) { Settings.Builder settings = Settings.builder() .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -811,14 +806,19 @@ public void testRecoverySealedIndex() throws Exception { .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms") .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster createIndex(index, settings.build()); + count = randomInt(10); indexRandomDocuments(count, randomBoolean(), randomBoolean(), n -> JsonXContent.contentBuilder().startObject().field("key", "value").endObject()); + // We have to spin synced-flush requests here because we fire the global checkpoint sync for the last write operation. + // A synced-flush request considers the global checkpoint sync as an going operation because it acquires a shard permit. assertBusy(() -> { Response resp = client().performRequest(new Request("POST", index + "/_flush/synced")); assertOK(resp); Map result = ObjectPath.createFromResponse(resp).evaluate("_shards"); assertThat(result.get("successful"), equalTo(2)); }); + } else { + count = countOfIndexedRandomDocuments(); } ensureGreen(index); refresh(); @@ -932,7 +932,6 @@ public void testHistoryUUIDIsAdded() throws Exception { mappingsAndSettings.endObject(); client().performRequest("PUT", "/" + index, Collections.emptyMap(), new StringEntity(Strings.toString(mappingsAndSettings), ContentType.APPLICATION_JSON)); - } else { Response response = client().performRequest("GET", index + "/_stats", singletonMap("level", "shards")); List shardStats = ObjectPath.createFromResponse(response).evaluate("indices." 
+ index + ".shards.0"); diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index 7c8dce1aeedff..eea8b915fb40e 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -286,8 +286,8 @@ public void testSearchGeoPoints() throws Exception { } } - public void testRecoverySealedIndex() throws Exception { - final String index = "recovery_a_sealed_index"; + public void testRecoverSyncedFlushIndex() throws Exception { + final String index = "recover_synced_flush_index"; if (CLUSTER_TYPE == ClusterType.OLD) { Settings.Builder settings = Settings.builder() .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) @@ -300,6 +300,8 @@ public void testRecoverySealedIndex() throws Exception { .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster createIndex(index, settings.build()); indexDocs(index, 0, randomInt(5)); + // We have to spin synced-flush requests here because we fire the global checkpoint sync for the last write operation. + // A synced-flush request considers the global checkpoint sync as an going operation because it acquires a shard permit. 
assertBusy(() -> { Response resp = client().performRequest(new Request("POST", index + "/_flush/synced")); assertOK(resp); From 814bcfaf72241c1709c676f4d1e2e0a1c261d001 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 21 Jun 2018 16:47:38 -0400 Subject: [PATCH 3/4] fold to the recovery test --- .../upgrades/FullClusterRestartIT.java | 55 +++++-------------- 1 file changed, 14 insertions(+), 41 deletions(-) diff --git a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java index d3a46d7ddbace..a079fd664208d 100644 --- a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java +++ b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java @@ -714,8 +714,20 @@ public void testRecovery() throws Exception { // make sure all recoveries are done ensureGreen(index); - // Explicitly flush so we're sure to have a bunch of documents in the Lucene index - client().performRequest("POST", "/_flush"); + // Recovering a synced-flush index from 5.x to 6.x might be subtle as a 5.x index commit does not have all 6.x commit tags. + if (randomBoolean()) { + // We have to spin synced-flush requests here because we fire the global checkpoint sync for the last write operation. + // A synced-flush request considers the global checkpoint sync as an going operation because it acquires a shard permit. 
+ assertBusy(() -> { + Response resp = client().performRequest(new Request("POST", index + "/_flush/synced")); + assertOK(resp); + Map result = ObjectPath.createFromResponse(resp).evaluate("_shards"); + assertThat(result.get("successful"), equalTo(2)); + }); + } else { + // Explicitly flush so we're sure to have a bunch of documents in the Lucene index + assertOK(client().performRequest(new Request("POST", "/_flush"))); + } if (shouldHaveTranslog) { // Update a few documents so we are sure to have a translog indexRandomDocuments(count / 10, false /* Flushing here would invalidate the whole thing....*/, false, @@ -789,45 +801,6 @@ public void testRecovery() throws Exception { } } - /** - * Tests that a synced-flushed index is correctly recovered. - * This might be an edge-case from 5.x to 6.x since a 5.x index commit does not have all required 6.x commit tags. - */ - public void testRecoverSyncedFlushIndex() throws Exception { - final int count; - if (runningAgainstOldCluster) { - Settings.Builder settings = Settings.builder() - .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) - .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) - // if the node with the replica is the first to be restarted, while a replica is still recovering - // then delayed allocation will kick in. When the node comes back, the master will search for a copy - // but the recovering copy will be seen as invalid and the cluster health won't return to GREEN - // before timing out - .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms") - .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster - createIndex(index, settings.build()); - count = randomInt(10); - indexRandomDocuments(count, randomBoolean(), randomBoolean(), - n -> JsonXContent.contentBuilder().startObject().field("key", "value").endObject()); - // We have to spin synced-flush requests here because we fire the global checkpoint sync for the last write operation. 
- // A synced-flush request considers the global checkpoint sync as an going operation because it acquires a shard permit. - assertBusy(() -> { - Response resp = client().performRequest(new Request("POST", index + "/_flush/synced")); - assertOK(resp); - Map result = ObjectPath.createFromResponse(resp).evaluate("_shards"); - assertThat(result.get("successful"), equalTo(2)); - }); - } else { - count = countOfIndexedRandomDocuments(); - } - ensureGreen(index); - refresh(); - Request countRequest = new Request("GET", "/" + index + "/_search"); - countRequest.addParameter("size", "0"); - String countResponse = toStr(client().performRequest(countRequest)); - assertThat(countResponse, containsString("\"total\":" + count)); - } - /** * Tests snapshot/restore by creating a snapshot and restoring it. It takes * a snapshot on the old cluster and restores it on the old cluster as a From fd89c62ba12e801a254e4cb680228c6a7c22a2a8 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 21 Jun 2018 20:36:06 -0400 Subject: [PATCH 4/4] Correct number_of_shards --- .../java/org/elasticsearch/upgrades/FullClusterRestartIT.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java index a079fd664208d..a205d21f33775 100644 --- a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java +++ b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java @@ -722,7 +722,8 @@ public void testRecovery() throws Exception { Response resp = client().performRequest(new Request("POST", index + "/_flush/synced")); assertOK(resp); Map result = ObjectPath.createFromResponse(resp).evaluate("_shards"); - assertThat(result.get("successful"), equalTo(2)); + assertThat(result.get("successful"), equalTo(result.get("total"))); + 
assertThat(result.get("failed"), equalTo(0)); }); } else { // Explicitly flush so we're sure to have a bunch of documents in the Lucene index