Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify consistency of version and source in disruption tests #41661

Merged
merged 6 commits into from
May 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
Expand Down Expand Up @@ -134,6 +135,7 @@ public void testAckedIndexing() throws Exception {
final List<Exception> exceptedExceptions = new CopyOnWriteArrayList<>();

final ConflictMode conflictMode = ConflictMode.randomMode();
final List<String> fieldNames = IntStream.rangeClosed(0, randomInt(10)).mapToObj(n -> "f" + n).collect(Collectors.toList());

logger.info("starting indexers using conflict mode " + conflictMode);
try {
Expand All @@ -156,7 +158,7 @@ public void testAckedIndexing() throws Exception {
int shard = Math.floorMod(Murmur3HashFunction.hash(id), numPrimaries);
logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard);
IndexRequestBuilder indexRequestBuilder = client.prepareIndex("test", "type", id)
.setSource("{}", XContentType.JSON)
.setSource(Collections.singletonMap(randomFrom(fieldNames), randomNonNegativeLong()), XContentType.JSON)
.setTimeout(timeout);

if (conflictMode == ConflictMode.external) {
Expand Down Expand Up @@ -459,7 +461,9 @@ public void testRestartNodeWhileIndexing() throws Exception {
while (stopped.get() == false && docID.get() < 5000) {
String id = Integer.toString(docID.incrementAndGet());
try {
IndexResponse response = client().prepareIndex(index, "_doc", id).setSource("{}", XContentType.JSON).get();
IndexResponse response = client().prepareIndex(index, "_doc", id)
.setSource(Collections.singletonMap("f" + randomIntBetween(1, 10), randomNonNegativeLong()), XContentType.JSON)
.get();
assertThat(response.getResult(), isOneOf(CREATED, UPDATED));
logger.info("--> index id={} seq_no={}", response.getId(), response.getSeqNo());
ackedDocs.add(response.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4384,7 +4384,7 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException {
Randomness.shuffle(seqNos);
final EngineConfig engineConfig;
final SeqNoStats prevSeqNoStats;
final List<DocIdSeqNoAndTerm> prevDocs;
final List<DocIdSeqNoAndSource> prevDocs;
final int totalTranslogOps;
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
engineConfig = engine.config();
Expand Down Expand Up @@ -5491,7 +5491,7 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
commits.add(new ArrayList<>());
try (Store store = createStore()) {
EngineConfig config = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get);
final List<DocIdSeqNoAndTerm> docs;
final List<DocIdSeqNoAndSource> docs;
try (InternalEngine engine = createEngine(config)) {
List<Engine.Operation> flushedOperations = new ArrayList<>();
for (Engine.Operation op : operations) {
Expand Down Expand Up @@ -5538,7 +5538,7 @@ public void testOpenSoftDeletesIndexWithSoftDeletesDisabled() throws Exception {
final IndexSettings softDeletesEnabled = IndexSettingsModule.newIndexSettings(
IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(Settings.builder().
put(defaultSettings.getSettings()).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)).build());
final List<DocIdSeqNoAndTerm> docs;
final List<DocIdSeqNoAndSource> docs;
try (InternalEngine engine = createEngine(
config(softDeletesEnabled, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get))) {
List<Engine.Operation> ops = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -274,7 +275,14 @@ public void run() {
pullOperations(engine);
}
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
assertThat(getDocIds(engine, true), equalTo(getDocIds(leader, true)));
// have to verify without source since we are randomly testing without _source
List<DocIdSeqNoAndSource> docsWithoutSourceOnFollower = getDocIds(engine, true).stream()
.map(d -> new DocIdSeqNoAndSource(d.getId(), null, d.getSeqNo(), d.getPrimaryTerm(), d.getVersion()))
.collect(Collectors.toList());
List<DocIdSeqNoAndSource> docsWithoutSourceOnLeader = getDocIds(leader, true).stream()
.map(d -> new DocIdSeqNoAndSource(d.getId(), null, d.getSeqNo(), d.getPrimaryTerm(), d.getVersion()))
.collect(Collectors.toList());
assertThat(docsWithoutSourceOnFollower, equalTo(docsWithoutSourceOnLeader));
} catch (Exception ex) {
throw new AssertionError(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void testReadOnlyEngine() throws Exception {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
int numDocs = scaledRandomIntBetween(10, 1000);
final SeqNoStats lastSeqNoStats;
final List<DocIdSeqNoAndTerm> lastDocIds;
final List<DocIdSeqNoAndSource> lastDocIds;
try (InternalEngine engine = createEngine(config)) {
Engine.Get get = null;
for (int i = 0; i < numDocs; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
Expand Down Expand Up @@ -770,7 +770,7 @@ public void testRollbackOnPromotion() throws Exception {
}
}
shards.refresh("test");
List<DocIdSeqNoAndTerm> docsBelowGlobalCheckpoint = EngineTestCase.getDocIds(getEngine(newPrimary), randomBoolean())
List<DocIdSeqNoAndSource> docsBelowGlobalCheckpoint = EngineTestCase.getDocIds(getEngine(newPrimary), randomBoolean())
.stream().filter(doc -> doc.getSeqNo() <= newPrimary.getGlobalCheckpoint()).collect(Collectors.toList());
CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean done = new AtomicBoolean();
Expand All @@ -780,7 +780,7 @@ public void testRollbackOnPromotion() throws Exception {
latch.countDown();
while (done.get() == false) {
try {
List<DocIdSeqNoAndTerm> exposedDocs = EngineTestCase.getDocIds(getEngine(randomFrom(replicas)), randomBoolean());
List<DocIdSeqNoAndSource> exposedDocs = EngineTestCase.getDocIds(getEngine(randomFrom(replicas)), randomBoolean());
assertThat(docsBelowGlobalCheckpoint, everyItem(isIn(exposedDocs)));
assertThat(randomFrom(replicas).getLocalCheckpoint(), greaterThanOrEqualTo(initDocs - 1L));
} catch (AlreadyClosedException ignored) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.Engine.DeleteResult;
import org.elasticsearch.index.engine.EngineException;
Expand Down Expand Up @@ -3664,7 +3664,7 @@ public void testResetEngine() throws Exception {
while (done.get() == false) {
try {
List<String> exposedDocIds = EngineTestCase.getDocIds(getEngine(shard), rarely())
.stream().map(DocIdSeqNoAndTerm::getId).collect(Collectors.toList());
.stream().map(DocIdSeqNoAndSource::getId).collect(Collectors.toList());
assertThat("every operations before the global checkpoint must be reserved",
docBelowGlobalCheckpoint, everyItem(isIn(exposedDocIds)));
} catch (AlreadyClosedException ignored) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,34 @@
package org.elasticsearch.index.engine;


import org.apache.lucene.util.BytesRef;

import java.util.Objects;

/** A tuple of document id, sequence number and primary term of a document */
public final class DocIdSeqNoAndTerm {
/** A tuple of document id, sequence number, primary term, source and version of a document */
public final class DocIdSeqNoAndSource {
private final String id;
private final BytesRef source;
private final long seqNo;
private final long primaryTerm;
private final long version;

public DocIdSeqNoAndTerm(String id, long seqNo, long primaryTerm) {
public DocIdSeqNoAndSource(String id, BytesRef source, long seqNo, long primaryTerm, long version) {
this.id = id;
this.source = source;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.version = version;
}

public String getId() {
return id;
}

public BytesRef getSource() {
return source;
}

public long getSeqNo() {
return seqNo;
}
Expand All @@ -46,21 +56,27 @@ public long getPrimaryTerm() {
return primaryTerm;
}

public long getVersion() {
return version;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DocIdSeqNoAndTerm that = (DocIdSeqNoAndTerm) o;
return Objects.equals(id, that.id) && seqNo == that.seqNo && primaryTerm == that.primaryTerm;
DocIdSeqNoAndSource that = (DocIdSeqNoAndSource) o;
return Objects.equals(id, that.id) && Objects.equals(source, that.source)
&& seqNo == that.seqNo && primaryTerm == that.primaryTerm && version == that.version;
}

@Override
public int hashCode() {
return Objects.hash(id, seqNo, primaryTerm);
return Objects.hash(id, source, seqNo, primaryTerm, version);
}

@Override
public String toString() {
return "DocIdSeqNoAndTerm{" + "id='" + id + " seqNo=" + seqNo + " primaryTerm=" + primaryTerm + "}";
return "doc{" + "id='" + id + " seqNo=" + seqNo + " primaryTerm=" + primaryTerm
+ " version=" + version + " source= " + (source != null ? source.utf8ToString() : null) + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
Expand Down Expand Up @@ -995,16 +996,17 @@ public static Engine.Result applyOperation(Engine engine, Engine.Operation opera
/**
* Gets a collection of tuples of docId, sequence number, and primary term of all live documents in the provided engine.
*/
public static List<DocIdSeqNoAndTerm> getDocIds(Engine engine, boolean refresh) throws IOException {
public static List<DocIdSeqNoAndSource> getDocIds(Engine engine, boolean refresh) throws IOException {
if (refresh) {
engine.refresh("test_get_doc_ids");
}
try (Engine.Searcher searcher = engine.acquireSearcher("test_get_doc_ids")) {
List<DocIdSeqNoAndTerm> docs = new ArrayList<>();
List<DocIdSeqNoAndSource> docs = new ArrayList<>();
for (LeafReaderContext leafContext : searcher.reader().leaves()) {
LeafReader reader = leafContext.reader();
NumericDocValues seqNoDocValues = reader.getNumericDocValues(SeqNoFieldMapper.NAME);
NumericDocValues primaryTermDocValues = reader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
NumericDocValues versionDocValues = reader.getNumericDocValues(VersionFieldMapper.NAME);
Bits liveDocs = reader.getLiveDocs();
for (int i = 0; i < reader.maxDoc(); i++) {
if (liveDocs == null || liveDocs.get(i)) {
Expand All @@ -1013,20 +1015,25 @@ public static List<DocIdSeqNoAndTerm> getDocIds(Engine engine, boolean refresh)
continue;
}
final long primaryTerm = primaryTermDocValues.longValue();
Document uuid = reader.document(i, Collections.singleton(IdFieldMapper.NAME));
BytesRef binaryID = uuid.getBinaryValue(IdFieldMapper.NAME);
Document doc = reader.document(i, Sets.newHashSet(IdFieldMapper.NAME, SourceFieldMapper.NAME));
BytesRef binaryID = doc.getBinaryValue(IdFieldMapper.NAME);
String id = Uid.decodeId(Arrays.copyOfRange(binaryID.bytes, binaryID.offset, binaryID.offset + binaryID.length));
final BytesRef source = doc.getBinaryValue(SourceFieldMapper.NAME);
if (seqNoDocValues.advanceExact(i) == false) {
throw new AssertionError("seqNoDocValues not found for doc[" + i + "] id[" + id + "]");
}
final long seqNo = seqNoDocValues.longValue();
docs.add(new DocIdSeqNoAndTerm(id, seqNo, primaryTerm));
if (versionDocValues.advanceExact(i) == false) {
throw new AssertionError("versionDocValues not found for doc[" + i + "] id[" + id + "]");
}
final long version = versionDocValues.longValue();
docs.add(new DocIdSeqNoAndSource(id, source, seqNo, primaryTerm, version));
}
}
}
docs.sort(Comparator.comparingLong(DocIdSeqNoAndTerm::getSeqNo)
.thenComparingLong(DocIdSeqNoAndTerm::getPrimaryTerm)
.thenComparing((DocIdSeqNoAndTerm::getId)));
docs.sort(Comparator.comparingLong(DocIdSeqNoAndSource::getSeqNo)
.thenComparingLong(DocIdSeqNoAndSource::getPrimaryTerm)
.thenComparing((DocIdSeqNoAndSource::getId)));
return docs;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
Expand Down Expand Up @@ -479,7 +479,7 @@ public synchronized void close() throws Exception {
if (closed == false) {
closed = true;
try {
final List<DocIdSeqNoAndTerm> docsOnPrimary = getDocIdAndSeqNos(primary);
final List<DocIdSeqNoAndSource> docsOnPrimary = getDocIdAndSeqNos(primary);
for (IndexShard replica : replicas) {
assertThat(replica.getMaxSeenAutoIdTimestamp(), equalTo(primary.getMaxSeenAutoIdTimestamp()));
assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(), greaterThanOrEqualTo(primary.getMaxSeqNoOfUpdatesOrDeletes()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.query.DisabledQueryCache;
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.EngineTestCase;
Expand Down Expand Up @@ -703,10 +703,10 @@ private Store.MetadataSnapshot getMetadataSnapshotOrEmpty(IndexShard replica) th
}

public static Set<String> getShardDocUIDs(final IndexShard shard) throws IOException {
return getDocIdAndSeqNos(shard).stream().map(DocIdSeqNoAndTerm::getId).collect(Collectors.toSet());
return getDocIdAndSeqNos(shard).stream().map(DocIdSeqNoAndSource::getId).collect(Collectors.toSet());
}

public static List<DocIdSeqNoAndTerm> getDocIdAndSeqNos(final IndexShard shard) throws IOException {
public static List<DocIdSeqNoAndSource> getDocIdAndSeqNos(final IndexShard shard) throws IOException {
return EngineTestCase.getDocIds(shard.getEngine(), true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.InternalEngine;
Expand Down Expand Up @@ -1455,7 +1455,7 @@ public void assertSameDocIdsOnShards() throws Exception {
if (primaryShard == null) {
continue;
}
final List<DocIdSeqNoAndTerm> docsOnPrimary;
final List<DocIdSeqNoAndSource> docsOnPrimary;
try {
docsOnPrimary = IndexShardTestCase.getDocIdAndSeqNos(primaryShard);
} catch (AlreadyClosedException ex) {
Expand All @@ -1466,7 +1466,7 @@ public void assertSameDocIdsOnShards() throws Exception {
if (replicaShard == null) {
continue;
}
final List<DocIdSeqNoAndTerm> docsOnReplica;
final List<DocIdSeqNoAndSource> docsOnReplica;
try {
docsOnReplica = IndexShardTestCase.getDocIdAndSeqNos(replicaShard);
} catch (AlreadyClosedException ex) {
Expand Down