Use tsdb's id in Engine tests (#85055)
This should be enough to detect if tsdb's `_id` field mapper changes
enough to cause trouble for `Engine`. I suspect that in the end we'll
need something more like the changes that #84996 made for
RecoverySourceHandlerTests, but that's a much bigger change that I'd
prefer to hold back until we need it.
nik9000 committed Mar 21, 2022
1 parent ee8ce9c commit 9fe31df
Showing 9 changed files with 135 additions and 53 deletions.
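The hunks below swap InternalEngineTests.createParsedDoc(id, null) for an EngineTestCase.createParsedDoc overload that takes an explicit id field type, usually obtained from EngineTestCase.randomIdFieldType(). That helper lives in EngineTestCase.java, one of the nine changed files not shown on this page; as a minimal sketch (an assumption, not the committed code), it presumably wraps the same ternary that appears inline in testLookupVersionWithPrunedAwayIds and testSeqNoGenerator below:

import org.apache.lucene.document.FieldType;
import org.elasticsearch.index.mapper.ProvidedIdFieldMapper;
import org.elasticsearch.index.mapper.TsidExtractingIdFieldMapper;

// Sketch only: pick the provided-id field type or tsdb's _id field type at
// random, so every Engine test exercises both mappers' Lucene field settings.
// randomBoolean() comes from ESTestCase, which EngineTestCase extends.
public static FieldType randomIdFieldType() {
    return randomBoolean()
        ? ProvidedIdFieldMapper.Defaults.FIELD_TYPE
        : TsidExtractingIdFieldMapper.FIELD_TYPE;
}

This would also explain why the first hunk widens TsidExtractingIdFieldMapper.FIELD_TYPE from private to public: the test framework now references it directly.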
TsidExtractingIdFieldMapper.java
@@ -36,7 +36,7 @@
* {@code _tsid} and {@code @timestamp}.
*/
public class TsidExtractingIdFieldMapper extends IdFieldMapper {
- private static final FieldType FIELD_TYPE = new FieldType();
+ public static final FieldType FIELD_TYPE = new FieldType();
/**
* Maximum length of the {@code _tsid} in the {@link #documentDescription}.
*/
IndexModuleTests.java
@@ -49,8 +49,8 @@
import org.elasticsearch.index.cache.query.IndexQueryCache;
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.engine.Engine;
+ import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.InternalEngineFactory;
- import org.elasticsearch.index.engine.InternalEngineTests;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.mapper.MapperRegistry;
import org.elasticsearch.index.mapper.ParsedDocument;
@@ -306,7 +306,7 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
assertEquals(IndexingSlowLog.class, indexService.getIndexOperationListeners().get(0).getClass());
assertSame(listener, indexService.getIndexOperationListeners().get(1));

- ParsedDocument doc = InternalEngineTests.createParsedDoc("1", null);
+ ParsedDocument doc = EngineTestCase.createParsedDoc("1", EngineTestCase.randomIdFieldType(), null);
Engine.Index index = new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), randomNonNegativeLong(), doc);
ShardId shardId = new ShardId(new Index("foo", "bar"), 0);
for (IndexingOperationListener l : indexService.getIndexOperationListeners()) {
IndexingSlowLogTests.java
@@ -26,7 +26,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexingSlowLog.IndexingSlowLogMessage;
import org.elasticsearch.index.engine.Engine;
- import org.elasticsearch.index.engine.InternalEngineTests;
+ import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.Uid;
@@ -74,7 +74,7 @@ public void testLevelPrecedence() {
IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY);
IndexingSlowLog log = new IndexingSlowLog(settings);

- ParsedDocument doc = InternalEngineTests.createParsedDoc("1", null);
+ ParsedDocument doc = EngineTestCase.createParsedDoc("1", EngineTestCase.randomIdFieldType(), null);
Engine.Index index = new Engine.Index(new Term("_id", Uid.encodeId("doc_id")), randomNonNegativeLong(), doc);
Engine.IndexResult result = Mockito.mock(Engine.IndexResult.class);// (0, 0, SequenceNumbers.UNASSIGNED_SEQ_NO, false);
Mockito.when(result.getResultType()).thenReturn(Engine.Result.Type.SUCCESS);
@@ -148,7 +148,7 @@ public void testTwoLoggersDifferentLevel() {
);
IndexingSlowLog log2 = new IndexingSlowLog(index2Settings);

- ParsedDocument doc = InternalEngineTests.createParsedDoc("1", null);
+ ParsedDocument doc = EngineTestCase.createParsedDoc("1", EngineTestCase.randomIdFieldType(), null);
Engine.Index index = new Engine.Index(new Term("_id", Uid.encodeId("doc_id")), randomNonNegativeLong(), doc);
Engine.IndexResult result = Mockito.mock(Engine.IndexResult.class);
Mockito.when(result.getResultType()).thenReturn(Engine.Result.Type.SUCCESS);
InternalEngineTests.java
@@ -17,6 +17,7 @@
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.filter.RegexFilter;
import org.apache.lucene.document.Field;
+ import org.apache.lucene.document.FieldType;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StoredField;
@@ -107,6 +108,7 @@
import org.elasticsearch.index.mapper.ProvidedIdFieldMapper;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
+ import org.elasticsearch.index.mapper.TsidExtractingIdFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
@@ -985,7 +987,7 @@ public void testSimpleOperations() throws Exception {

public void testGetWithSearcherWrapper() throws Exception {
engine.refresh("warm_up");
engine.index(indexForDoc(createParsedDoc("1", null)));
engine.index(indexForDoc(createParsedDoc("1", idFieldType, null)));
assertThat(engine.lastRefreshedCheckpoint(), equalTo(NO_OPS_PERFORMED));
MapperService mapperService = createMapperService();
MappingLookup mappingLookup = mapperService.mappingLookup();
@@ -1003,7 +1005,7 @@
// refresh triggered, as we did not track translog location until the first realtime get.
assertThat(engine.lastRefreshedCheckpoint(), equalTo(0L));

engine.index(indexForDoc(createParsedDoc("1", null)));
engine.index(indexForDoc(createParsedDoc("1", idFieldType, null)));
try (Engine.GetResult get = engine.get(new Engine.Get(true, true, "1"), mappingLookup, documentParser, searcher -> searcher)) {
assertTrue(get.exists());
assertEquals(++translogGetCountExpected, translogGetCount.getAsLong());
@@ -1012,7 +1014,7 @@
assertThat(engine.lastRefreshedCheckpoint(), equalTo(0L)); // no refresh; just read from translog

if (randomBoolean()) {
engine.index(indexForDoc(createParsedDoc("1", null)));
engine.index(indexForDoc(createParsedDoc("1", idFieldType, null)));
}
try (
Engine.GetResult get = engine.get(
@@ -1069,7 +1071,7 @@ public void testGetWithSearcherWrapper() throws Exception {
}
assertThat("no refresh, just read from translog or in-memory segment", engine.lastRefreshedCheckpoint(), equalTo(0L));

engine.index(indexForDoc(createParsedDoc("1", null)));
engine.index(indexForDoc(createParsedDoc("1", idFieldType, null)));
try (
Engine.GetResult get = engine.get(
new Engine.Get(true, true, "1"),
@@ -1517,6 +1519,7 @@ public void testVersioningNewIndex() throws IOException {
* we are testing an edge case here where we have a fully deleted segment that is retained but has all its IDs pruned away.
*/
public void testLookupVersionWithPrunedAwayIds() throws IOException {
+ FieldType idFieldType = randomBoolean() ? ProvidedIdFieldMapper.Defaults.FIELD_TYPE : TsidExtractingIdFieldMapper.FIELD_TYPE;
try (Directory dir = newDirectory()) {
IndexWriterConfig indexWriterConfig = new IndexWriterConfig(Lucene.STANDARD_ANALYZER);
indexWriterConfig.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
@@ -1533,7 +1536,7 @@ public void testLookupVersionWithPrunedAwayIds() throws IOException {
)
) {
org.apache.lucene.document.Document doc = new org.apache.lucene.document.Document();
- doc.add(new Field(IdFieldMapper.NAME, "1", ProvidedIdFieldMapper.Defaults.FIELD_TYPE));
+ doc.add(new Field(IdFieldMapper.NAME, "1", idFieldType));
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, -1));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, 1));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, 1));
@@ -1668,7 +1671,15 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc
int numDocs = scaledRandomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
boolean useRecoverySource = randomBoolean() || omitSourceAllTheTime;
- ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null, useRecoverySource);
+ ParsedDocument doc = testParsedDocument(
+     Integer.toString(i),
+     idFieldType,
+     null,
+     testDocument(),
+     B_1,
+     null,
+     useRecoverySource
+ );
engine.index(indexForDoc(doc));
liveDocs.add(doc.id());
if (useRecoverySource == false) {
@@ -1677,7 +1688,15 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc
}
for (int i = 0; i < numDocs; i++) {
boolean useRecoverySource = randomBoolean() || omitSourceAllTheTime;
- ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null, useRecoverySource);
+ ParsedDocument doc = testParsedDocument(
+     Integer.toString(i),
+     idFieldType,
+     null,
+     testDocument(),
+     B_1,
+     null,
+     useRecoverySource
+ );
if (randomBoolean()) {
engine.delete(new Engine.Delete(doc.id(), newUid(doc.id()), primaryTerm.get()));
liveDocs.remove(doc.id());
@@ -1737,7 +1756,7 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc
}
if (numSegments == 1) {
boolean useRecoverySource = randomBoolean() || omitSourceAllTheTime;
ParsedDocument doc = testParsedDocument("dummy", null, testDocument(), B_1, null, useRecoverySource);
ParsedDocument doc = testParsedDocument("dummy", idFieldType, null, testDocument(), B_1, null, useRecoverySource);
engine.index(indexForDoc(doc));
if (useRecoverySource == false) {
liveDocsWithSource.add(doc.id());
@@ -4626,7 +4645,7 @@ public void testLookupSeqNoByIdInLucene() throws Exception {
boolean isIndexing = randomBoolean();
int copies = frequently() ? 1 : between(2, 4);
for (int c = 0; c < copies; c++) {
- final ParsedDocument doc = EngineTestCase.createParsedDoc(id, null);
+ final ParsedDocument doc = EngineTestCase.createParsedDoc(id, idFieldType, null);
if (isIndexing) {
operations.add(
new Engine.Index(
@@ -5067,7 +5086,7 @@ public void testRandomOperations() throws Exception {
int numOps = between(10, 100);
for (int i = 0; i < numOps; i++) {
String id = Integer.toString(randomIntBetween(1, 10));
- ParsedDocument doc = createParsedDoc(id, null);
+ ParsedDocument doc = createParsedDoc(id, idFieldType, null);
Engine.Operation.TYPE type = randomFrom(Engine.Operation.TYPE.values());
switch (type) {
case INDEX -> {
@@ -5467,7 +5486,11 @@ public void testSeqNoGenerator() throws IOException {
)
) {
final String id = "id";
final Field uidField = new Field("_id", id, ProvidedIdFieldMapper.Defaults.FIELD_TYPE);
final Field uidField = new Field(
"_id",
id,
randomBoolean() ? ProvidedIdFieldMapper.Defaults.FIELD_TYPE : TsidExtractingIdFieldMapper.FIELD_TYPE
);
final Field versionField = new NumericDocValuesField("_version", 0);
final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
final LuceneDocument document = new LuceneDocument();
@@ -6465,7 +6488,7 @@ public void testTrackMaxSeqNoOfUpdatesOrDeletesOnPrimary() throws Exception {
int numOps = between(1, 500);
for (int i = 0; i < numOps; i++) {
long currentMaxSeqNoOfUpdates = engine.getMaxSeqNoOfUpdatesOrDeletes();
- ParsedDocument doc = createParsedDoc(Integer.toString(between(1, 100)), null);
+ ParsedDocument doc = createParsedDoc(Integer.toString(between(1, 100)), idFieldType, null);
if (randomBoolean()) {
Engine.IndexResult result = engine.index(indexForDoc(doc));
if (liveDocIds.add(doc.id()) == false) {
@@ -6772,7 +6795,7 @@ public void testRefreshAndCloseEngineConcurrently() throws Exception {
while (stopped.get() == false) {
String id = Integer.toString(randomIntBetween(1, 100));
try {
- engine.index(indexForDoc(createParsedDoc(id, null)));
+ engine.index(indexForDoc(createParsedDoc(id, idFieldType, null)));
indexedDocs.release();
} catch (IOException e) {
throw new AssertionError(e);
@@ -7021,7 +7044,7 @@ private void runTestDeleteFailure(final CheckedBiConsumer<InternalEngine, Engine
iw.set(new ThrowingIndexWriter(dir, iwc));
return iw.get();
}, null, null, config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))) {
engine.index(new Engine.Index(newUid("0"), primaryTerm.get(), InternalEngineTests.createParsedDoc("0", null)));
engine.index(new Engine.Index(newUid("0"), primaryTerm.get(), InternalEngineTests.createParsedDoc("0", idFieldType, null)));
final Engine.Delete op = new Engine.Delete("0", newUid("0"), primaryTerm.get());
consumer.accept(engine, op);
iw.get().setThrowFailure(() -> new IllegalArgumentException("fatal"));
@@ -7035,8 +7058,8 @@ private void runTestDeleteFailure(final CheckedBiConsumer<InternalEngine, Engine
}

public void testIndexThrottling() throws Exception {
- final Engine.Index indexWithThrottlingCheck = spy(indexForDoc(createParsedDoc("1", null)));
- final Engine.Index indexWithoutThrottlingCheck = spy(indexForDoc(createParsedDoc("2", null)));
+ final Engine.Index indexWithThrottlingCheck = spy(indexForDoc(createParsedDoc("1", idFieldType, null)));
+ final Engine.Index indexWithoutThrottlingCheck = spy(indexForDoc(createParsedDoc("2", idFieldType, null)));
doAnswer(invocation -> {
try {
assertTrue(engine.throttleLockIsHeldByCurrentThread());
@@ -7094,7 +7117,7 @@ public void afterRefresh(boolean didRefresh) {
Set<String> ids = new HashSet<>();
for (int i = 0; i < numDocs; i++) {
String id = Integer.toString(i);
- engine.index(indexForDoc(createParsedDoc(id, null)));
+ engine.index(indexForDoc(createParsedDoc(id, idFieldType, null)));
ids.add(id);
}
final int refreshCountBeforeGet = refreshCount.get();
@@ -7105,7 +7128,7 @@ public void afterRefresh(boolean didRefresh) {
phaser.arriveAndAwaitAdvance();
int iters = randomIntBetween(1, 10);
for (int i = 0; i < iters; i++) {
- ParsedDocument doc = createParsedDoc(randomFrom(ids), null);
+ ParsedDocument doc = createParsedDoc(randomFrom(ids), idFieldType, null);
try (
Engine.GetResult getResult = engine.get(
newGet(true, doc),
@@ -7123,7 +7146,7 @@ public void afterRefresh(boolean didRefresh) {
}
phaser.arriveAndAwaitAdvance();
for (int i = 0; i < numDocs; i++) {
engine.index(indexForDoc(createParsedDoc("more-" + i, null)));
engine.index(indexForDoc(createParsedDoc("more-" + i, idFieldType, null)));
}
for (Thread getter : getters) {
getter.join();
@@ -7167,7 +7190,7 @@ public void afterRefresh(boolean didRefresh) {
);
try (InternalEngine engine = createEngine(config)) {
if (randomBoolean()) {
engine.index(indexForDoc(createParsedDoc("id", null)));
engine.index(indexForDoc(createParsedDoc("id", idFieldType, null)));
}
threadPool.executor(ThreadPool.Names.REFRESH)
.execute(
@@ -7299,7 +7322,7 @@ public void testMaxDocsOnPrimary() throws Exception {
for (int i = 0; i < numDocs; i++) {
final String id = Integer.toString(randomInt(numDocs));
if (randomBoolean()) {
- operations.add(indexForDoc(createParsedDoc(id, null)));
+ operations.add(indexForDoc(createParsedDoc(id, idFieldType, null)));
} else {
operations.add(new Engine.Delete(id, newUid(id), primaryTerm.get()));
}
LuceneChangesSnapshotTests.java
@@ -60,7 +60,7 @@ public void testBasics() throws Exception {
int refreshedSeqNo = -1;
for (int i = 0; i < numOps; i++) {
String id = Integer.toString(randomIntBetween(i, i + 5));
- ParsedDocument doc = createParsedDoc(id, null, randomBoolean());
+ ParsedDocument doc = createParsedDoc(id, idFieldType, null, randomBoolean());
if (randomBoolean()) {
engine.index(indexForDoc(doc));
} else {
@@ -257,7 +257,7 @@ public void testUpdateAndReadChangesConcurrently() throws Exception {
int numOps = frequently() ? scaledRandomIntBetween(1, 1500) : scaledRandomIntBetween(5000, 20_000);
for (int i = 0; i < numOps; i++) {
String id = Integer.toString(randomIntBetween(0, randomBoolean() ? 10 : numOps * 2));
- ParsedDocument doc = createParsedDoc(id, randomAlphaOfLengthBetween(1, 5), randomBoolean());
+ ParsedDocument doc = createParsedDoc(id, idFieldType, randomAlphaOfLengthBetween(1, 5), randomBoolean());
final Engine.Operation op;
if (onPrimary) {
if (randomBoolean()) {
@@ -291,14 +291,14 @@ public void testAccessStoredFieldsSequentially() throws Exception {
int smallBatch = between(5, 9);
long seqNo = 0;
for (int i = 0; i < smallBatch; i++) {
- engine.index(replicaIndexForDoc(createParsedDoc(Long.toString(seqNo), null), 1, seqNo, true));
+ engine.index(replicaIndexForDoc(createParsedDoc(Long.toString(seqNo), idFieldType, null), 1, seqNo, true));
seqNo++;
}
- engine.index(replicaIndexForDoc(createParsedDoc(Long.toString(1000), null), 1, 1000, true));
+ engine.index(replicaIndexForDoc(createParsedDoc(Long.toString(1000), idFieldType, null), 1, 1000, true));
seqNo = 11;
int largeBatch = between(15, 100);
for (int i = 0; i < largeBatch; i++) {
- engine.index(replicaIndexForDoc(createParsedDoc(Long.toString(seqNo), null), 1, seqNo, true));
+ engine.index(replicaIndexForDoc(createParsedDoc(Long.toString(seqNo), idFieldType, null), 1, seqNo, true));
seqNo++;
}
// disable optimization for a small batch
