diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 1f2c9a0f578cc..b3c6d12ab96e3 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -27,6 +27,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; @@ -38,6 +39,7 @@ import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLeases; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardNotRecoveringException; import org.elasticsearch.index.shard.IndexShardState; @@ -298,11 +300,19 @@ public void finalizeRecovery(final long globalCheckpoint, ActionListener l // Persist the global checkpoint. indexShard.sync(); indexShard.persistRetentionLeases(); + if (hasUncommittedOperations()) { + indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true)); + } indexShard.finalizeRecovery(); return null; }); } + private boolean hasUncommittedOperations() throws IOException { + long localCheckpointOfCommit = Long.parseLong(indexShard.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + return indexShard.estimateNumberOfHistoryOperations("peer-recovery", localCheckpointOfCommit + 1) > 0; + } + @Override public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) { indexShard.activateWithPrimaryContext(primaryContext); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 4196472334ca9..3130cebad7097 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -31,6 +31,7 @@ import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; @@ -52,9 +53,12 @@ import org.elasticsearch.index.analysis.TokenFilterFactory; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.recovery.RecoveryStats; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.analysis.AnalysisModule; +import org.elasticsearch.indices.flush.SyncedFlushUtil; import org.elasticsearch.indices.recovery.RecoveryState.Stage; import org.elasticsearch.node.RecoverySettingsChunkSizePlugin; import org.elasticsearch.plugins.AnalysisPlugin; @@ -84,14 +88,19 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; import static java.util.Collections.singletonMap; +import static java.util.stream.Collectors.toList; import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -910,6 +919,39 @@ public void testDoNotInfinitelyWaitForMapping() { assertHitCount(client().prepareSearch().get(), numDocs); } + public void testRecoveryFlushReplica() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(3); + String indexName = "test-index"; + createIndex(indexName, Settings.builder().put("index.number_of_replicas", 0).put("index.number_of_shards", 1).build()); + int numDocs = randomIntBetween(0, 10); + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, numDocs) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + assertAcked(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put("index.number_of_replicas", 1))); + ensureGreen(indexName); + ShardId shardId = null; + for (ShardStats shardStats : client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()) { + shardId = shardStats.getShardRouting().shardId(); + if (shardStats.getShardRouting().primary() == false) { + assertThat(shardStats.getCommitStats().getNumDocs(), equalTo(numDocs)); + SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit( + shardStats.getCommitStats().getUserData().entrySet()); + assertThat(commitInfo.localCheckpoint, equalTo(shardStats.getSeqNoStats().getLocalCheckpoint())); + assertThat(commitInfo.maxSeqNo, equalTo(shardStats.getSeqNoStats().getMaxSeqNo())); + } + } + SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId); + assertBusy(() -> assertThat(client().admin().indices().prepareSyncedFlush(indexName).get().failedShards(), equalTo(0))); + assertAcked(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put("index.number_of_replicas", 2))); + ensureGreen(indexName); + // Recovery should keep syncId if no indexing activity on the primary after synced-flush. + Set syncIds = Stream.of(client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()) + .map(shardStats -> shardStats.getCommitStats().syncId()) + .collect(Collectors.toSet()); + assertThat(syncIds, hasSize(1)); + } + public static final class TestAnalysisPlugin extends Plugin implements AnalysisPlugin { final AtomicBoolean throwParsingError = new AtomicBoolean(); @Override