From d8eab5053c4cc8017f6eb7893bfec1201447bcec Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Mon, 29 Apr 2019 11:30:18 -0400
Subject: [PATCH] Verify consistency of version and source in disruption tests
 (#41614)

With this change, we will verify the consistency of version and source
(besides id, seq_no, and term) of live documents between shard copies
at the end of disruption tests.
---
 .../discovery/ClusterDisruptionIT.java        |  8 +++--
 .../index/engine/InternalEngineTests.java     |  6 ++--
 .../engine/LuceneChangesSnapshotTests.java    | 10 ++++++-
 .../index/engine/ReadOnlyEngineTests.java     |  2 +-
 .../RecoveryDuringReplicationTests.java       |  6 ++--
 .../index/shard/IndexShardTests.java          |  4 +--
 ...oAndTerm.java => DocIdSeqNoAndSource.java} | 30 ++++++++++++++-----
 .../index/engine/EngineTestCase.java          | 23 +++++++++-----
 .../ESIndexLevelReplicationTestCase.java      |  4 +--
 .../index/shard/IndexShardTestCase.java       |  6 ++--
 .../test/InternalTestCluster.java             |  6 ++--
 .../elasticsearch/xpack/CcrIntegTestCase.java | 24 +++++++--------
 .../index/engine/FollowingEngineTests.java    |  6 ++--
 13 files changed, 85 insertions(+), 50 deletions(-)
 rename test/framework/src/main/java/org/elasticsearch/index/engine/{DocIdSeqNoAndTerm.java => DocIdSeqNoAndSource.java} (58%)

diff --git a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java
index f60e108c34ccd..5e86bd1bcbb1f 100644
--- a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java
+++ b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java
@@ -66,6 +66,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;

 import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
 import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
@@ -134,6 +135,7 @@ public void testAckedIndexing() throws Exception {
         final List<Exception> exceptedExceptions = new CopyOnWriteArrayList<>();

         final ConflictMode conflictMode = ConflictMode.randomMode();
+        final List<String> fieldNames = IntStream.rangeClosed(0, randomInt(10)).mapToObj(n -> "f" + n).collect(Collectors.toList());

         logger.info("starting indexers using conflict mode " + conflictMode);
         try {
@@ -156,7 +158,7 @@ public void testAckedIndexing() throws Exception {
                             int shard = Math.floorMod(Murmur3HashFunction.hash(id), numPrimaries);
                             logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard);
                             IndexRequestBuilder indexRequestBuilder = client.prepareIndex("test", "type", id)
-                                .setSource("{}", XContentType.JSON)
+                                .setSource(Collections.singletonMap(randomFrom(fieldNames), randomNonNegativeLong()), XContentType.JSON)
                                 .setTimeout(timeout);

                             if (conflictMode == ConflictMode.external) {
@@ -459,7 +461,9 @@ public void testRestartNodeWhileIndexing() throws Exception {
                 while (stopped.get() == false && docID.get() < 5000) {
                     String id = Integer.toString(docID.incrementAndGet());
                     try {
-                        IndexResponse response = client().prepareIndex(index, "_doc", id).setSource("{}", XContentType.JSON).get();
+                        IndexResponse response = client().prepareIndex(index, "_doc", id)
+                            .setSource(Collections.singletonMap("f" + randomIntBetween(1, 10), randomNonNegativeLong()), XContentType.JSON)
+                            .get();
                         assertThat(response.getResult(), isOneOf(CREATED, UPDATED));
                         logger.info("--> index id={} seq_no={}", response.getId(), response.getSeqNo());
                         ackedDocs.add(response.getId());
diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index da157f5866870..42f84186ae4cc 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -4403,7 +4403,7 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException {
         Randomness.shuffle(seqNos);
         final EngineConfig engineConfig;
         final SeqNoStats prevSeqNoStats;
-        final List<DocIdSeqNoAndTerm> prevDocs;
+        final List<DocIdSeqNoAndSource> prevDocs;
         final int totalTranslogOps;
         try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
             engineConfig = engine.config();
@@ -5515,7 +5515,7 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
         commits.add(new ArrayList<>());
         try (Store store = createStore()) {
             EngineConfig config = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get);
-            final List<DocIdSeqNoAndTerm> docs;
+            final List<DocIdSeqNoAndSource> docs;
             try (InternalEngine engine = createEngine(config)) {
                 List<Engine.Operation> flushedOperations = new ArrayList<>();
                 for (Engine.Operation op : operations) {
@@ -5563,7 +5563,7 @@ public void testOpenSoftDeletesIndexWithSoftDeletesDisabled() throws Exception {
         final IndexSettings softDeletesEnabled = IndexSettingsModule.newIndexSettings(
             IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(Settings.builder().
                 put(defaultSettings.getSettings()).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)).build());
-        final List<DocIdSeqNoAndTerm> docs;
+        final List<DocIdSeqNoAndSource> docs;
         try (InternalEngine engine = createEngine(
                 config(softDeletesEnabled, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get))) {
             List<Engine.Operation> ops = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean());
diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
index f179cd840c60e..d1840c4d97cff 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
@@ -36,6 +36,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;

 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -274,7 +275,14 @@ public void run() {
                     pullOperations(engine);
                 }
                 assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
-                assertThat(getDocIds(engine, true), equalTo(getDocIds(leader, true)));
+                // have to verify without source since we are randomly testing without _source
+                List<DocIdSeqNoAndSource> docsWithoutSourceOnFollower = getDocIds(engine, true).stream()
+                    .map(d -> new DocIdSeqNoAndSource(d.getId(), null, d.getSeqNo(), d.getPrimaryTerm(), d.getVersion()))
+                    .collect(Collectors.toList());
+                List<DocIdSeqNoAndSource> docsWithoutSourceOnLeader = getDocIds(leader, true).stream()
+                    .map(d -> new DocIdSeqNoAndSource(d.getId(), null, d.getSeqNo(), d.getPrimaryTerm(), d.getVersion()))
+                    .collect(Collectors.toList());
+                assertThat(docsWithoutSourceOnFollower, equalTo(docsWithoutSourceOnLeader));
             } catch (Exception ex) {
                 throw new AssertionError(ex);
             }
diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java
index e66094d7321a7..b689400601dc6 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java
@@ -44,7 +44,7 @@ public void testReadOnlyEngine() throws Exception {
             EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
             int numDocs = scaledRandomIntBetween(10, 1000);
             final SeqNoStats lastSeqNoStats;
-            final List<DocIdSeqNoAndTerm> lastDocIds;
+            final List<DocIdSeqNoAndSource> lastDocIds;
             try (InternalEngine engine = createEngine(config)) {
                 Engine.Get get = null;
                 for (int i = 0; i < numDocs; i++) {
diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
index de6ab82892f51..d499cf6e83f90 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
@@ -40,7 +40,7 @@ import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
-import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
+import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineConfig;
 import org.elasticsearch.index.engine.EngineFactory;
@@ -770,7 +770,7 @@ public void testRollbackOnPromotion() throws Exception {
             }
         }
         shards.refresh("test");
-        List<DocIdSeqNoAndTerm> docsBelowGlobalCheckpoint = EngineTestCase.getDocIds(getEngine(newPrimary), randomBoolean())
+        List<DocIdSeqNoAndSource> docsBelowGlobalCheckpoint = EngineTestCase.getDocIds(getEngine(newPrimary), randomBoolean())
             .stream().filter(doc -> doc.getSeqNo() <= newPrimary.getGlobalCheckpoint()).collect(Collectors.toList());
         CountDownLatch latch = new CountDownLatch(1);
         final AtomicBoolean done = new AtomicBoolean();
@@ -780,7 +780,7 @@ public void testRollbackOnPromotion() throws Exception {
             latch.countDown();
             while (done.get() == false) {
                 try {
-                    List<DocIdSeqNoAndTerm> exposedDocs = EngineTestCase.getDocIds(getEngine(randomFrom(replicas)), randomBoolean());
+                    List<DocIdSeqNoAndSource> exposedDocs = EngineTestCase.getDocIds(getEngine(randomFrom(replicas)), randomBoolean());
                     assertThat(docsBelowGlobalCheckpoint, everyItem(isIn(exposedDocs)));
                     assertThat(randomFrom(replicas).getLocalCheckpoint(), greaterThanOrEqualTo(initDocs - 1L));
                 } catch (AlreadyClosedException ignored) {
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 6d08de8cf856b..ce1e4ab586037 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -79,7 +79,7 @@ import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.engine.CommitStats;
-import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
+import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.Engine.DeleteResult;
 import org.elasticsearch.index.engine.EngineException;
@@ -3671,7 +3671,7 @@ public void testResetEngine() throws Exception {
                 while (done.get() == false) {
                     try {
                         List<String> exposedDocIds = EngineTestCase.getDocIds(getEngine(shard), rarely())
-                            .stream().map(DocIdSeqNoAndTerm::getId).collect(Collectors.toList());
+                            .stream().map(DocIdSeqNoAndSource::getId).collect(Collectors.toList());
                         assertThat("every operations before the global checkpoint must be reserved",
                             docBelowGlobalCheckpoint, everyItem(isIn(exposedDocIds)));
                     } catch (AlreadyClosedException ignored) {
diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/DocIdSeqNoAndTerm.java b/test/framework/src/main/java/org/elasticsearch/index/engine/DocIdSeqNoAndSource.java
similarity index 58%
rename from test/framework/src/main/java/org/elasticsearch/index/engine/DocIdSeqNoAndTerm.java
rename to test/framework/src/main/java/org/elasticsearch/index/engine/DocIdSeqNoAndSource.java
index b24a010c1a0d6..a48e813c1a68e 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/engine/DocIdSeqNoAndTerm.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/engine/DocIdSeqNoAndSource.java
@@ -20,24 +20,34 @@

 package org.elasticsearch.index.engine;

+import org.apache.lucene.util.BytesRef;
+
 import java.util.Objects;

-/** A tuple of document id, sequence number and primary term of a document */
-public final class DocIdSeqNoAndTerm {
+/** A tuple of document id, sequence number, primary term, source and version of a document */
+public final class DocIdSeqNoAndSource {
     private final String id;
+    private final BytesRef source;
     private final long seqNo;
     private final long primaryTerm;
+    private final long version;

-    public DocIdSeqNoAndTerm(String id, long seqNo, long primaryTerm) {
+    public DocIdSeqNoAndSource(String id, BytesRef source, long seqNo, long primaryTerm, long version) {
         this.id = id;
+        this.source = source;
         this.seqNo = seqNo;
         this.primaryTerm = primaryTerm;
+        this.version = version;
     }

     public String getId() {
         return id;
     }

+    public BytesRef getSource() {
+        return source;
+    }
+
     public long getSeqNo() {
         return seqNo;
     }
@@ -46,21 +56,27 @@ public long getPrimaryTerm() {
         return primaryTerm;
     }

+    public long getVersion() {
+        return version;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
-        DocIdSeqNoAndTerm that = (DocIdSeqNoAndTerm) o;
-        return Objects.equals(id, that.id) && seqNo == that.seqNo && primaryTerm == that.primaryTerm;
+        DocIdSeqNoAndSource that = (DocIdSeqNoAndSource) o;
+        return Objects.equals(id, that.id) && Objects.equals(source, that.source)
+            && seqNo == that.seqNo && primaryTerm == that.primaryTerm && version == that.version;
     }

     @Override
     public int hashCode() {
-        return Objects.hash(id, seqNo, primaryTerm);
+        return Objects.hash(id, source, seqNo, primaryTerm, version);
     }

     @Override
     public String toString() {
-        return "DocIdSeqNoAndTerm{" + "id='" + id + " seqNo=" + seqNo + " primaryTerm=" + primaryTerm + "}";
+        return "doc{" + "id='" + id + " seqNo=" + seqNo + " primaryTerm=" + primaryTerm
+            + " version=" + version + " source= " + (source != null ? source.utf8ToString() : null) + "}";
     }
 }
diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
index 7fb2d50302c11..4c613b81edf9c 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
@@ -63,6 +63,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
@@ -996,16 +997,17 @@ public static Engine.Result applyOperation(Engine engine, Engine.Operation opera
     /**
      * Gets a collection of tuples of docId, sequence number, and primary term of all live documents in the provided engine.
      */
-    public static List<DocIdSeqNoAndTerm> getDocIds(Engine engine, boolean refresh) throws IOException {
+    public static List<DocIdSeqNoAndSource> getDocIds(Engine engine, boolean refresh) throws IOException {
         if (refresh) {
             engine.refresh("test_get_doc_ids");
         }
         try (Engine.Searcher searcher = engine.acquireSearcher("test_get_doc_ids")) {
-            List<DocIdSeqNoAndTerm> docs = new ArrayList<>();
+            List<DocIdSeqNoAndSource> docs = new ArrayList<>();
             for (LeafReaderContext leafContext : searcher.reader().leaves()) {
                 LeafReader reader = leafContext.reader();
                 NumericDocValues seqNoDocValues = reader.getNumericDocValues(SeqNoFieldMapper.NAME);
                 NumericDocValues primaryTermDocValues = reader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
+                NumericDocValues versionDocValues = reader.getNumericDocValues(VersionFieldMapper.NAME);
                 Bits liveDocs = reader.getLiveDocs();
                 for (int i = 0; i < reader.maxDoc(); i++) {
                     if (liveDocs == null || liveDocs.get(i)) {
@@ -1014,20 +1016,25 @@ public static List<DocIdSeqNoAndTerm> getDocIds(Engine engine, boolean refresh)
                             continue;
                         }
                         final long primaryTerm = primaryTermDocValues.longValue();
-                        Document uuid = reader.document(i, Collections.singleton(IdFieldMapper.NAME));
-                        BytesRef binaryID = uuid.getBinaryValue(IdFieldMapper.NAME);
+                        Document doc = reader.document(i, Sets.newHashSet(IdFieldMapper.NAME, SourceFieldMapper.NAME));
+                        BytesRef binaryID = doc.getBinaryValue(IdFieldMapper.NAME);
                         String id = Uid.decodeId(Arrays.copyOfRange(binaryID.bytes, binaryID.offset, binaryID.offset + binaryID.length));
+                        final BytesRef source = doc.getBinaryValue(SourceFieldMapper.NAME);
                         if (seqNoDocValues.advanceExact(i) == false) {
                             throw new AssertionError("seqNoDocValues not found for doc[" + i + "] id[" + id + "]");
                         }
                         final long seqNo = seqNoDocValues.longValue();
-                        docs.add(new DocIdSeqNoAndTerm(id, seqNo, primaryTerm));
+                        if (versionDocValues.advanceExact(i) == false) {
+                            throw new AssertionError("versionDocValues not found for doc[" + i + "] id[" + id + "]");
+                        }
+                        final long version = versionDocValues.longValue();
+                        docs.add(new DocIdSeqNoAndSource(id, source, seqNo, primaryTerm, version));
                     }
                 }
             }
-            docs.sort(Comparator.comparingLong(DocIdSeqNoAndTerm::getSeqNo)
-                .thenComparingLong(DocIdSeqNoAndTerm::getPrimaryTerm)
-                .thenComparing((DocIdSeqNoAndTerm::getId)));
+            docs.sort(Comparator.comparingLong(DocIdSeqNoAndSource::getSeqNo)
+                .thenComparingLong(DocIdSeqNoAndSource::getPrimaryTerm)
+                .thenComparing((DocIdSeqNoAndSource::getId)));
             return docs;
         }
     }
diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
index e3fa935149763..d88cdf488fb70 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
@@ -65,7 +65,7 @@ import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
-import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
+import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
 import org.elasticsearch.index.engine.EngineFactory;
 import org.elasticsearch.index.engine.InternalEngineFactory;
 import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
@@ -479,7 +479,7 @@ public synchronized void close() throws Exception {
             if (closed == false) {
                 closed = true;
                 try {
-                    final List<DocIdSeqNoAndTerm> docsOnPrimary = getDocIdAndSeqNos(primary);
+                    final List<DocIdSeqNoAndSource> docsOnPrimary = getDocIdAndSeqNos(primary);
                     for (IndexShard replica : replicas) {
                         assertThat(replica.getMaxSeenAutoIdTimestamp(), equalTo(primary.getMaxSeenAutoIdTimestamp()));
                         assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(), greaterThanOrEqualTo(primary.getMaxSeqNoOfUpdatesOrDeletes()));
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
index 5b5ff8de01d03..6a39896199cf2 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
@@ -51,7 +51,7 @@ import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.cache.IndexCache;
 import org.elasticsearch.index.cache.query.DisabledQueryCache;
-import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
+import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineFactory;
 import org.elasticsearch.index.engine.EngineTestCase;
@@ -703,10 +703,10 @@ private Store.MetadataSnapshot getMetadataSnapshotOrEmpty(IndexShard replica) th
     }

     public static Set<String> getShardDocUIDs(final IndexShard shard) throws IOException {
-        return getDocIdAndSeqNos(shard).stream().map(DocIdSeqNoAndTerm::getId).collect(Collectors.toSet());
+        return getDocIdAndSeqNos(shard).stream().map(DocIdSeqNoAndSource::getId).collect(Collectors.toSet());
     }

-    public static List<DocIdSeqNoAndTerm> getDocIdAndSeqNos(final IndexShard shard) throws IOException {
+    public static List<DocIdSeqNoAndSource> getDocIdAndSeqNos(final IndexShard shard) throws IOException {
         return EngineTestCase.getDocIds(shard.getEngine(), true);
     }

diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
index c41a0fdcbef3a..679280ee2bd64 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
@@ -85,7 +85,7 @@ import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.engine.CommitStats;
-import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
+import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineTestCase;
 import org.elasticsearch.index.engine.InternalEngine;
@@ -1455,7 +1455,7 @@ public void assertSameDocIdsOnShards() throws Exception {
                 if (primaryShard == null) {
                     continue;
                 }
-                final List<DocIdSeqNoAndTerm> docsOnPrimary;
+                final List<DocIdSeqNoAndSource> docsOnPrimary;
                 try {
                     docsOnPrimary = IndexShardTestCase.getDocIdAndSeqNos(primaryShard);
                 } catch (AlreadyClosedException ex) {
@@ -1466,7 +1466,7 @@ public void assertSameDocIdsOnShards() throws Exception {
                     if (replicaShard == null) {
                         continue;
                     }
-                    final List<DocIdSeqNoAndTerm> docsOnReplica;
+                    final List<DocIdSeqNoAndSource> docsOnReplica;
                     try {
                         docsOnReplica = IndexShardTestCase.getDocIdAndSeqNos(replicaShard);
                     } catch (AlreadyClosedException ex) {
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java
index fd84725e4bd6e..dea3da2a3ba1b 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java
@@ -49,7 +49,7 @@ import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
-import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
+import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -490,13 +490,13 @@ public static ResumeFollowAction.Request resumeFollow(String followerIndex) {
     protected void assertIndexFullyReplicatedToFollower(String leaderIndex, String followerIndex) throws Exception {
         logger.info("--> asserting <> between {} and {}", leaderIndex, followerIndex);
         assertBusy(() -> {
-            Map<Integer, List<DocIdSeqNoAndTerm>> docsOnFollower = getDocIdAndSeqNos(clusterGroup.followerCluster, followerIndex);
-            Map<Integer, List<DocIdSeqNoAndTerm>> docsOnLeader = getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex);
-            Map<Integer, Set<DocIdSeqNoAndTerm>> mismatchedDocs = new HashMap<>();
-            for (Map.Entry<Integer, List<DocIdSeqNoAndTerm>> fe : docsOnFollower.entrySet()) {
-                Set<DocIdSeqNoAndTerm> d1 = Sets.difference(
+            Map<Integer, List<DocIdSeqNoAndSource>> docsOnFollower = getDocIdAndSeqNos(clusterGroup.followerCluster, followerIndex);
+            Map<Integer, List<DocIdSeqNoAndSource>> docsOnLeader = getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex);
+            Map<Integer, Set<DocIdSeqNoAndSource>> mismatchedDocs = new HashMap<>();
+            for (Map.Entry<Integer, List<DocIdSeqNoAndSource>> fe : docsOnFollower.entrySet()) {
+                Set<DocIdSeqNoAndSource> d1 = Sets.difference(
                     Sets.newHashSet(fe.getValue()), Sets.newHashSet(docsOnLeader.getOrDefault(fe.getKey(), Collections.emptyList())));
-                Set<DocIdSeqNoAndTerm> d2 = Sets.difference(
+                Set<DocIdSeqNoAndSource> d2 = Sets.difference(
                     Sets.newHashSet(docsOnLeader.getOrDefault(fe.getKey(), Collections.emptyList())), Sets.newHashSet(fe.getValue()));
                 if (d1.isEmpty() == false || d2.isEmpty() == false) {
                     mismatchedDocs.put(fe.getKey(), Sets.union(d1, d2));
@@ -525,11 +525,11 @@ protected void assertIndexFullyReplicatedToFollower(String leaderIndex, String f
         }, 120, TimeUnit.SECONDS);
     }

-    private Map<Integer, List<DocIdSeqNoAndTerm>> getDocIdAndSeqNos(InternalTestCluster cluster, String index) throws IOException {
+    private Map<Integer, List<DocIdSeqNoAndSource>> getDocIdAndSeqNos(InternalTestCluster cluster, String index) throws IOException {
         final ClusterState state = cluster.client().admin().cluster().prepareState().get().getState();
         List<ShardRouting> shardRoutings = state.routingTable().allShards(index);
         Randomness.shuffle(shardRoutings);
-        final Map<Integer, List<DocIdSeqNoAndTerm>> docs = new HashMap<>();
+        final Map<Integer, List<DocIdSeqNoAndSource>> docs = new HashMap<>();
         for (ShardRouting shardRouting : shardRoutings) {
             if (shardRouting == null || shardRouting.assignedToNode() == false) {
                 continue;
@@ -537,14 +537,14 @@ private Map<Integer, List<DocIdSeqNoAndTerm>> getDocIdAndSeqNos(InternalTestClus
             IndexShard indexShard = cluster.getInstance(IndicesService.class, state.nodes().get(shardRouting.currentNodeId()).getName())
                 .indexServiceSafe(shardRouting.index()).getShard(shardRouting.id());
             try {
-                final List<DocIdSeqNoAndTerm> docsOnShard = IndexShardTestCase.getDocIdAndSeqNos(indexShard);
+                final List<DocIdSeqNoAndSource> docsOnShard = IndexShardTestCase.getDocIdAndSeqNos(indexShard);
                 logger.info("--> shard {} docs {} seq_no_stats {}", shardRouting, docsOnShard, indexShard.seqNoStats());
                 docs.put(shardRouting.shardId().id(), docsOnShard.stream()
                     // normalize primary term as the follower use its own term
-                    .map(d -> new DocIdSeqNoAndTerm(d.getId(), d.getSeqNo(), 1L))
+                    .map(d -> new DocIdSeqNoAndSource(d.getId(), d.getSource(), d.getSeqNo(), 1L, d.getVersion()))
                     .collect(Collectors.toList()));
             } catch (AlreadyClosedException e) {
-                // Ignore this exception and try getting List<DocIdSeqNoAndTerm> from other IndexShard instance.
+                // Ignore this exception and try getting List<DocIdSeqNoAndSource> from other IndexShard instance.
             }
         }
         return docs;
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
index dfac5ef2654b8..e3d997886334b 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
@@ -24,7 +24,7 @@ import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.codec.CodecService;
-import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
+import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineConfig;
 import org.elasticsearch.index.engine.EngineTestCase;
@@ -623,7 +623,7 @@ public void testProcessOnceOnPrimary() throws Exception {
                 assertThat(failure.getExistingPrimaryTerm().getAsLong(), equalTo(operationWithTerms.get(op.seqNo())));
             }
         }
-        for (DocIdSeqNoAndTerm docId : getDocIds(followingEngine, true)) {
+        for (DocIdSeqNoAndSource docId : getDocIds(followingEngine, true)) {
             assertThat(docId.getPrimaryTerm(), equalTo(operationWithTerms.get(docId.getSeqNo())));
         }
         // Replica should accept duplicates
@@ -635,7 +635,7 @@ public void testProcessOnceOnPrimary() throws Exception {
             Engine.Result result = applyOperation(followingEngine, op, newTerm, nonPrimary);
             assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS));
         }
-        for (DocIdSeqNoAndTerm docId : getDocIds(followingEngine, true)) {
+        for (DocIdSeqNoAndSource docId : getDocIds(followingEngine, true)) {
             assertThat(docId.getPrimaryTerm(), equalTo(operationWithTerms.get(docId.getSeqNo())));
         }
     }
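Note (not part of the patch): the snippet below is a minimal, illustrative sketch of the kind of per-copy check this change strengthens. It reuses only names introduced or touched above (DocIdSeqNoAndSource, IndexShardTestCase.getDocIdAndSeqNos) and assumes a test that already has a primary and a replica IndexShard in scope; the class and helper names are made up for illustration.

    import java.io.IOException;
    import java.util.List;

    import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
    import org.elasticsearch.index.shard.IndexShard;
    import org.elasticsearch.index.shard.IndexShardTestCase;

    import static org.hamcrest.MatcherAssert.assertThat;
    import static org.hamcrest.Matchers.equalTo;

    public class DocCopyConsistencyExample {

        // Illustrative helper: because DocIdSeqNoAndSource now carries version and
        // source in addition to id, seq_no, and primary term, a plain equality
        // check between copies also fails when two copies agree on id/seq_no/term
        // but diverge in version or _source.
        public static void assertSameDocsOnCopies(IndexShard primary, IndexShard replica) throws IOException {
            List<DocIdSeqNoAndSource> docsOnPrimary = IndexShardTestCase.getDocIdAndSeqNos(primary);
            List<DocIdSeqNoAndSource> docsOnReplica = IndexShardTestCase.getDocIdAndSeqNos(replica);
            // EngineTestCase.getDocIds sorts by (seq_no, primary term, id), so the
            // two lists can be compared element-wise.
            assertThat(docsOnReplica, equalTo(docsOnPrimary));
        }
    }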