Skip to content

Commit

Permalink
Fix missing historyUUID in peer recovery when rolling upgrade 5.x to …
Browse files Browse the repository at this point in the history
…6.3 (#31506)

Today we make sure that a 5.x index commit should have all required
commit tags in RecoveryTarget#cleanFiles method. The reason we do this
in RecoveryTarget#cleanFiles method because this is only needed in a
file-based recovery and we assume that #cleanFiles should be called in a
file-based recovery. However, this assumption is not valid if the index
is sealed (.i.e synced-flush). This incorrect assumption would prevent
users from rolling upgrade from 5.x to 6.3 if their index were sealed.

Closes #31482
  • Loading branch information
dnhatn committed Jun 22, 2018
1 parent 65ce504 commit 9100a70
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Booleans;
Expand Down Expand Up @@ -713,8 +714,21 @@ public void testRecovery() throws Exception {

// make sure all recoveries are done
ensureGreen(index);
// Explicitly flush so we're sure to have a bunch of documents in the Lucene index
client().performRequest("POST", "/_flush");
// Recovering a synced-flush index from 5.x to 6.x might be subtle as a 5.x index commit does not have all 6.x commit tags.
if (randomBoolean()) {
// We have to spin synced-flush requests here because we fire the global checkpoint sync for the last write operation.
// A synced-flush request considers the global checkpoint sync as an going operation because it acquires a shard permit.
assertBusy(() -> {
Response resp = client().performRequest(new Request("POST", index + "/_flush/synced"));
assertOK(resp);
Map<String, Object> result = ObjectPath.createFromResponse(resp).evaluate("_shards");
assertThat(result.get("successful"), equalTo(result.get("total")));
assertThat(result.get("failed"), equalTo(0));
});
} else {
// Explicitly flush so we're sure to have a bunch of documents in the Lucene index
assertOK(client().performRequest(new Request("POST", "/_flush")));
}
if (shouldHaveTranslog) {
// Update a few documents so we are sure to have a translog
indexRandomDocuments(count / 10, false /* Flushing here would invalidate the whole thing....*/, false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.http.entity.StringEntity;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -285,4 +286,29 @@ public void testSearchGeoPoints() throws Exception {
}
}

public void testRecoverSyncedFlushIndex() throws Exception {
final String index = "recover_synced_flush_index";
if (CLUSTER_TYPE == ClusterType.OLD) {
Settings.Builder settings = Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
// if the node with the replica is the first to be restarted, while a replica is still recovering
// then delayed allocation will kick in. When the node comes back, the master will search for a copy
// but the recovering copy will be seen as invalid and the cluster health won't return to GREEN
// before timing out
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster
createIndex(index, settings.build());
indexDocs(index, 0, randomInt(5));
// We have to spin synced-flush requests here because we fire the global checkpoint sync for the last write operation.
// A synced-flush request considers the global checkpoint sync as an going operation because it acquires a shard permit.
assertBusy(() -> {
Response resp = client().performRequest(new Request("POST", index + "/_flush/synced"));
assertOK(resp);
Map<String, Object> result = ObjectPath.createFromResponse(resp).evaluate("_shards");
assertThat(result.get("successful"), equalTo(2));
});
}
ensureGreen(index);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,9 @@ private void ensureRefCount() {

@Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
if (fileBasedRecovery && indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0)) {
store.ensureIndexHas6xCommitTags();
}
state().getTranslog().totalOperations(totalTranslogOps);
indexShard().openEngineAndSkipTranslogRecovery();
}
Expand Down Expand Up @@ -438,9 +441,6 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa
store.incRef();
try {
store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData);
if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) {
store.ensureIndexHas6xCommitTags();
}
// TODO: Assign the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId, indexShard.getPrimaryTerm());
Expand Down

0 comments on commit 9100a70

Please sign in to comment.