diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 21c4d81fe4b62..1a15837c6891c 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -497,8 +497,10 @@ protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot sna throw new IndexShardClosedException(request.shardId()); } cancellableThreads.checkForCancel(); - // we have to send older ops for which no sequence number was assigned, and any ops after the starting sequence number - if (operation.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO || operation.seqNo() < startingSeqNo) continue; + // if we are doing a sequence-number-based recovery, we have to skip older ops for which no sequence number was assigned, and + // any ops before the starting sequence number + final long seqNo = operation.seqNo(); + if (startingSeqNo >= 0 && (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) continue; operations.add(operation); ops++; size += operation.estimateSize(); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index e304ff46b5527..038cfa2e63768 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -20,6 +20,7 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.StringField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.CorruptIndexException; @@ -27,6 +28,7 @@ import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.index.Term; import org.apache.lucene.store.BaseDirectoryWrapper; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; @@ -35,13 +37,23 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lucene.store.IndexOutputOutputStream; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.SegmentsStats; +import org.elasticsearch.index.mapper.Mapping; +import org.elasticsearch.index.mapper.ParseContext; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.IndexShard; @@ -60,6 +72,7 @@ import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -136,6 +149,72 @@ public void close() throws IOException { IOUtils.close(reader, store, targetStore); } + public void testSendSnapshotSendsOps() throws IOException { + final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service); + final int fileChunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt(); + final long startingSeqNo = randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : randomIntBetween(0, 16); + final StartRecoveryRequest request = new StartRecoveryRequest( + shardId, + new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), + new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), + null, + randomBoolean(), + randomNonNegativeLong(), + randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : randomNonNegativeLong()); + final IndexShard shard = mock(IndexShard.class); + when(shard.state()).thenReturn(IndexShardState.STARTED); + final RecoveryTargetHandler recoveryTarget = mock(RecoveryTargetHandler.class); + final RecoverySourceHandler handler = + new RecoverySourceHandler(shard, recoveryTarget, request, () -> 0L, e -> () -> {}, fileChunkSizeInBytes, Settings.EMPTY); + final List operations = new ArrayList<>(); + final int initialNumberOfDocs = randomIntBetween(16, 64); + for (int i = 0; i < initialNumberOfDocs; i++) { + final Engine.Index index = getIndex(Integer.toString(i)); + operations.add(new Translog.Index(index, new Engine.IndexResult(1, SequenceNumbersService.UNASSIGNED_SEQ_NO, true))); + } + final int numberOfDocsWithValidSequenceNumbers = randomIntBetween(16, 64); + for (int i = initialNumberOfDocs; i < initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers; i++) { + final Engine.Index index = getIndex(Integer.toString(i)); + operations.add(new Translog.Index(index, new Engine.IndexResult(1, i - initialNumberOfDocs, true))); + } + operations.add(null); + int totalOperations = handler.sendSnapshot(startingSeqNo, new Translog.Snapshot() { + private int counter = 0; + + @Override + public int totalOperations() { + return operations.size() - 1; + } + + @Override + public Translog.Operation next() throws IOException { + return operations.get(counter++); + } + }); + if (startingSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) { + assertThat(totalOperations, equalTo(initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers)); + } else { + assertThat(totalOperations, equalTo(Math.toIntExact(numberOfDocsWithValidSequenceNumbers - startingSeqNo))); + } + } + + private Engine.Index getIndex(final String id) { + final String type = "test"; + final ParseContext.Document document = new ParseContext.Document(); + document.add(new TextField("test", "test", Field.Store.YES)); + final Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE); + final Field versionField = new NumericDocValuesField("_version", Versions.MATCH_ANY); + final SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID(); + document.add(uidField); + document.add(versionField); + document.add(seqID.seqNo); + document.add(seqID.seqNoDocValue); + document.add(seqID.primaryTerm); + final BytesReference source = new BytesArray(new byte[] { 1 }); + final ParsedDocument doc = new ParsedDocument(versionField, seqID, id, type, null, Arrays.asList(document), source, null); + return new Engine.Index(new Term("_uid", doc.uid()), doc); + } + public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable { Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1). put("indices.recovery.concurrent_small_file_streams", 1).build(); diff --git a/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java index b5bd18aba5f14..353c65484a69b 100644 --- a/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -27,7 +27,9 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbersService; @@ -35,6 +37,7 @@ import org.elasticsearch.test.rest.yaml.ObjectPath; import java.io.IOException; +import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; @@ -93,7 +96,8 @@ protected int indexDocs(String index, final int idStart, final int numDocs) thro public void testSeqNoCheckpoints() throws Exception { Nodes nodes = buildNodeAndVersions(); logger.info("cluster discovered: {}", nodes.toString()); - final String bwcNames = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.joining(",")); + final List bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList()); + final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(",")); Settings.Builder settings = Settings.builder() .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2) @@ -109,26 +113,60 @@ public void testSeqNoCheckpoints() throws Exception { createIndex(index, settings.build()); try (RestClient newNodeClient = buildClient(restClientSettings(), nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { - int numDocs = indexDocs(index, 0, randomInt(5)); + int numDocs = 0; + final int numberOfInitialDocs = 1 + randomInt(5); + logger.info("indexing [{}] docs initially", numberOfInitialDocs); + numDocs += indexDocs(index, 0, numberOfInitialDocs); assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient); - logger.info("allowing shards on all nodes"); updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); ensureGreen(); - logger.info("indexing some more docs"); - numDocs += indexDocs(index, numDocs, randomInt(5)); + assertOK(client().performRequest("POST", index + "/_refresh")); + for (final String bwcName : bwcNamesList) { + assertCount(index, "_only_nodes:" + bwcName, numDocs); + } + final int numberOfDocsAfterAllowingShardsOnAllNodes = 1 + randomInt(5); + logger.info("indexing [{}] docs after allowing shards on all nodes", numberOfDocsAfterAllowingShardsOnAllNodes); + numDocs += indexDocs(index, numDocs, numberOfDocsAfterAllowingShardsOnAllNodes); assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient); - logger.info("moving primary to new node"); Shard primary = buildShards(nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); + logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName()); updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName())); ensureGreen(); - logger.info("indexing some more docs"); - int numDocsOnNewPrimary = indexDocs(index, numDocs, randomInt(5)); - numDocs += numDocsOnNewPrimary; + int numDocsOnNewPrimary = 0; + final int numberOfDocsAfterMovingPrimary = 1 + randomInt(5); + logger.info("indexing [{}] docs after moving primary", numberOfDocsAfterMovingPrimary); + numDocsOnNewPrimary += indexDocs(index, numDocs, numberOfDocsAfterMovingPrimary); + numDocs += numberOfDocsAfterMovingPrimary; + assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient); + /* + * Dropping the number of replicas to zero, and then increasing it to one triggers a recovery thus exercising any BWC-logic in + * the recovery code. + */ + logger.info("setting number of replicas to 0"); + updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 0)); + final int numberOfDocsAfterDroppingReplicas = 1 + randomInt(5); + logger.info("indexing [{}] docs after setting number of replicas to 0", numberOfDocsAfterDroppingReplicas); + numDocsOnNewPrimary += indexDocs(index, numDocs, numberOfDocsAfterDroppingReplicas); + numDocs += numberOfDocsAfterDroppingReplicas; + logger.info("setting number of replicas to 1"); + updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 1)); + ensureGreen(); + assertOK(client().performRequest("POST", index + "/_refresh")); + // the number of documents on the primary and on the recovered replica should match the number of indexed documents + assertCount(index, "_primary", numDocs); + assertCount(index, "_replica", numDocs); assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient); } } + private void assertCount(final String index, final String preference, final int expectedCount) throws IOException { + final Response response = client().performRequest("GET", index + "/_count", Collections.singletonMap("preference", preference)); + assertOK(response); + final int actualCount = Integer.parseInt(objectPath(response).evaluate("count").toString()); + assertThat(actualCount, equalTo(expectedCount)); + } + private void assertSeqNoOnShards(Nodes nodes, boolean checkGlobalCheckpoints, int numDocs, RestClient client) throws Exception { assertBusy(() -> { try {