Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/138515.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 138515
summary: Integrate stored fields format bloom filter with synthetic `_id`
area: Codec
type: enhancement
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/138678.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 138678
summary: Ensure that synthetic `_id` is usable after restarts/relocations
area: Distributed
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,31 @@
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;

import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -53,7 +59,11 @@
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

/**
* Test suite for time series indices that use synthetic ids for documents.
Expand Down Expand Up @@ -140,6 +150,7 @@ public void testSyntheticId() throws Exception {
assertThat(result.getVersion(), equalTo(1L));
docs.put(result.getId(), result.getIndex());
}
final int initialNumberOfDocs = results.length;

enum Operation {
FLUSH,
Expand Down Expand Up @@ -260,12 +271,77 @@ enum Operation {

flush(dataStreamName);

// Check that synthetic _id field have no postings on disk
var indices = new HashSet<>(docs.values());
for (var index : indices) {
var diskUsage = diskUsage(index);
var diskUsageIdField = AnalyzeIndexDiskUsageTestUtils.getPerFieldDiskUsage(diskUsage, IdFieldMapper.NAME);
assertThat("_id field should not have postings on disk", diskUsageIdField.getInvertedIndexBytes(), equalTo(0L));
if (randomBoolean()) {
logger.info("--> restarting the cluster");
internalCluster().rollingRestart(new InternalTestCluster.RestartCallback());
} else {
// Move all the shards to a new node to force relocations
var newNodeName = internalCluster().startDataOnlyNode();
logger.info("--> relocating all shards to {}", newNodeName);

var dataStream = client().admin()
.cluster()
.prepareState(TEST_REQUEST_TIMEOUT)
.get()
.getState()
.getMetadata()
.getProject(ProjectId.DEFAULT)
.dataStreams()
.get(dataStreamName);
assertThat(dataStream, notNullValue());
for (Index index : dataStream.getIndices()) {
updateIndexSettings(Settings.builder().put("index.routing.allocation.require._name", newNodeName), index.getName());
ensureGreen(index.getName());
}
}

// After the restart/relocation we'll try to index the same set of initial metrics
// to ensure that the version lookup works as expected. Additionally, some of the
// docs might have been deleted, so those should go through without issues.
var bulkResponses = createDocumentsWithoutValidatingTheResponse(
dataStreamName,
// t + 0s
document(timestamp, "vm-dev01", "cpu-load", 0),
document(timestamp, "vm-dev02", "cpu-load", 1),
// t + 1s
document(timestamp.plus(1, unit), "vm-dev01", "cpu-load", 2),
document(timestamp.plus(1, unit), "vm-dev02", "cpu-load", 3),
// t + 0s out-of-order doc
document(timestamp, "vm-dev03", "cpu-load", 4),
// t + 2s
document(timestamp.plus(2, unit), "vm-dev01", "cpu-load", 5),
document(timestamp.plus(2, unit), "vm-dev02", "cpu-load", 6),
// t - 1s out-of-order doc
document(timestamp.minus(1, unit), "vm-dev01", "cpu-load", 7),
// t + 3s
document(timestamp.plus(3, unit), "vm-dev01", "cpu-load", 8),
document(timestamp.plus(3, unit), "vm-dev02", "cpu-load", 9)
);

var successfulRequests = Arrays.stream(bulkResponses).filter(response -> response.isFailed() == false).toList();
assertThat(successfulRequests, hasSize(deletedDocs.size()));

var failedRequests = Arrays.stream(bulkResponses).filter(BulkItemResponse::isFailed).toList();
assertThat(failedRequests, hasSize(initialNumberOfDocs - deletedDocs.size()));
for (BulkItemResponse failedRequest : failedRequests) {
assertThat(failedRequest.getFailure().getCause(), is(instanceOf(VersionConflictEngineException.class)));
}

// TODO: fix IndexDiskUsageStats to take into account synthetic _id terms
var checkDiskUsage = false;
if (checkDiskUsage) {
// Check that synthetic _id field have no postings on disk
var indices = new HashSet<>(docs.values());
for (var index : indices) {
var diskUsage = diskUsage(index);
var diskUsageIdField = AnalyzeIndexDiskUsageTestUtils.getPerFieldDiskUsage(diskUsage, IdFieldMapper.NAME);
// When _id's are only used to populate the bloom filter,
// IndexDiskUsageStats won't account for anything since
// the bloom filter it's not exposed through the Reader API and
// the analyzer expects to get documents with fields to do the
// disk usage accounting.
assertThat(diskUsageIdField, nullValue());
}
}
}

Expand Down Expand Up @@ -371,12 +447,21 @@ public void testGetFromTranslogBySyntheticId() throws Exception {
assertThat(asInstanceOf(Integer.class, source.get("value")), equalTo(metricOffset + doc.getItemId()));
}

// Check that synthetic _id field have no postings on disk
var indices = new HashSet<>(docs.values());
for (var index : indices) {
var diskUsage = diskUsage(index);
var diskUsageIdField = AnalyzeIndexDiskUsageTestUtils.getPerFieldDiskUsage(diskUsage, IdFieldMapper.NAME);
assertThat("_id field should not have postings on disk", diskUsageIdField.getInvertedIndexBytes(), equalTo(0L));
// TODO: fix IndexDiskUsageStats to take into account synthetic _id terms
var checkDiskUsage = false;
if (checkDiskUsage) {
// Check that synthetic _id field have no postings on disk
var indices = new HashSet<>(docs.values());
for (var index : indices) {
var diskUsage = diskUsage(index);
var diskUsageIdField = AnalyzeIndexDiskUsageTestUtils.getPerFieldDiskUsage(diskUsage, IdFieldMapper.NAME);
// When _id's are only used to populate the bloom filter,
// IndexDiskUsageStats won't account for anything since
// the bloom filter it's not exposed through the Reader API and
// the analyzer expects to get documents with fields to do the
// disk usage accounting.
assertThat(diskUsageIdField, nullValue());
}
}

assertHitCount(client().prepareSearch(dataStreamName).setSize(0), 10L);
Expand All @@ -402,14 +487,24 @@ private static XContentBuilder document(Instant timestamp, String hostName, Stri
}

private static BulkItemResponse[] createDocuments(String indexName, XContentBuilder... docs) {
return createDocuments(indexName, true, docs);
}

private static BulkItemResponse[] createDocumentsWithoutValidatingTheResponse(String indexName, XContentBuilder... docs) {
return createDocuments(indexName, false, docs);
}

private static BulkItemResponse[] createDocuments(String indexName, boolean validateResponse, XContentBuilder... docs) {
assertThat(docs, notNullValue());
final var client = client();
var bulkRequest = client.prepareBulk();
for (var doc : docs) {
bulkRequest.add(client.prepareIndex(indexName).setOpType(DocWriteRequest.OpType.CREATE).setSource(doc));
}
var bulkResponse = bulkRequest.get();
assertNoFailures(bulkResponse);
if (validateResponse) {
assertNoFailures(bulkResponse);
}
return bulkResponse.getItems();
}

Expand Down
6 changes: 5 additions & 1 deletion server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@
exports org.elasticsearch.index.codec;
exports org.elasticsearch.index.codec.tsdb;
exports org.elasticsearch.index.codec.bloomfilter;
exports org.elasticsearch.index.codec.storedfields;
exports org.elasticsearch.index.codec.zstd;
exports org.elasticsearch.index.engine;
exports org.elasticsearch.index.fielddata;
Expand Down Expand Up @@ -478,7 +479,10 @@
org.elasticsearch.index.codec.Elasticsearch816Codec,
org.elasticsearch.index.codec.Elasticsearch900Codec,
org.elasticsearch.index.codec.Elasticsearch900Lucene101Codec,
org.elasticsearch.index.codec.Elasticsearch92Lucene103Codec;
org.elasticsearch.index.codec.Elasticsearch92Lucene103Codec,
org.elasticsearch.index.codec.tsdb.ES93TSDBDefaultCompressionLucene103Codec,
org.elasticsearch.index.codec.tsdb.ES93TSDBZSTDCompressionLucene103Codec,
org.elasticsearch.index.codec.tsdb.ES93TSDBLuceneDefaultCodec;

provides org.apache.logging.log4j.core.util.ContextDataProvider with org.elasticsearch.common.logging.DynamicContextDataProvider;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ private static Version parseUnchecked(String version) {
public static final IndexVersion TIME_SERIES_ALL_FIELDS_USE_SKIPPERS = def(9_046_0_00, Version.LUCENE_10_3_1);
public static final IndexVersion UPGRADE_TO_LUCENE_10_3_2 = def(9_047_0_00, Version.LUCENE_10_3_2);
public static final IndexVersion SECURITY_MIGRATIONS_METADATA_FLATTENED_UPDATE = def(9_048_0_00, Version.LUCENE_10_3_2);
public static final IndexVersion TIME_SERIES_USE_STORED_FIELDS_BLOOM_FILTER_FOR_ID = def(9_049_0_00, Version.LUCENE_10_3_2);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.FeatureFlag;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.codec.tsdb.TSDBSyntheticIdCodec;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.codec.tsdb.ES93TSDBDefaultCompressionLucene103Codec;
import org.elasticsearch.index.codec.tsdb.ES93TSDBLuceneDefaultCodec;
import org.elasticsearch.index.codec.tsdb.ES93TSDBZSTDCompressionLucene103Codec;
import org.elasticsearch.index.codec.zstd.Zstd814StoredFieldsFormat;
import org.elasticsearch.index.mapper.MapperService;

Expand Down Expand Up @@ -48,27 +50,56 @@ public class CodecService implements CodecProvider {
public CodecService(@Nullable MapperService mapperService, BigArrays bigArrays) {
final var codecs = new HashMap<String, Codec>();

Codec legacyBestSpeedCodec = new LegacyPerFieldMapperCodec(Lucene103Codec.Mode.BEST_SPEED, mapperService, bigArrays);
boolean useSyntheticId = mapperService != null
&& mapperService.getIndexSettings().useTimeSeriesSyntheticId()
&& mapperService.getIndexSettings()
.getIndexVersionCreated()
.onOrAfter(IndexVersions.TIME_SERIES_USE_STORED_FIELDS_BLOOM_FILTER_FOR_ID);

var legacyBestSpeedCodec = new LegacyPerFieldMapperCodec(Lucene103Codec.Mode.BEST_SPEED, mapperService, bigArrays);
if (ZSTD_STORED_FIELDS_FEATURE_FLAG) {
codecs.put(DEFAULT_CODEC, new PerFieldMapperCodec(Zstd814StoredFieldsFormat.Mode.BEST_SPEED, mapperService, bigArrays));
PerFieldMapperCodec defaultZstdCodec = new PerFieldMapperCodec(
Zstd814StoredFieldsFormat.Mode.BEST_SPEED,
mapperService,
bigArrays
);
codecs.put(
DEFAULT_CODEC,
useSyntheticId ? new ES93TSDBZSTDCompressionLucene103Codec(defaultZstdCodec, bigArrays) : defaultZstdCodec
);
} else {
codecs.put(DEFAULT_CODEC, legacyBestSpeedCodec);
codecs.put(
DEFAULT_CODEC,
useSyntheticId ? new ES93TSDBDefaultCompressionLucene103Codec(legacyBestSpeedCodec, bigArrays) : legacyBestSpeedCodec
);
}
codecs.put(LEGACY_DEFAULT_CODEC, legacyBestSpeedCodec);

codecs.put(
LEGACY_DEFAULT_CODEC,
useSyntheticId ? new ES93TSDBDefaultCompressionLucene103Codec(legacyBestSpeedCodec, bigArrays) : legacyBestSpeedCodec
);

var bestCompressionCodec = new PerFieldMapperCodec(Zstd814StoredFieldsFormat.Mode.BEST_COMPRESSION, mapperService, bigArrays);
codecs.put(
BEST_COMPRESSION_CODEC,
new PerFieldMapperCodec(Zstd814StoredFieldsFormat.Mode.BEST_COMPRESSION, mapperService, bigArrays)
useSyntheticId ? new ES93TSDBZSTDCompressionLucene103Codec(bestCompressionCodec, bigArrays) : bestCompressionCodec
);

var legacyBestCompressionCodec = new LegacyPerFieldMapperCodec(Lucene103Codec.Mode.BEST_COMPRESSION, mapperService, bigArrays);
codecs.put(
LEGACY_BEST_COMPRESSION_CODEC,
useSyntheticId
? new ES93TSDBDefaultCompressionLucene103Codec(legacyBestCompressionCodec, bigArrays)
: legacyBestCompressionCodec
);
Codec legacyBestCompressionCodec = new LegacyPerFieldMapperCodec(Lucene103Codec.Mode.BEST_COMPRESSION, mapperService, bigArrays);
codecs.put(LEGACY_BEST_COMPRESSION_CODEC, legacyBestCompressionCodec);

codecs.put(LUCENE_DEFAULT_CODEC, Codec.getDefault());
codecs.put(
LUCENE_DEFAULT_CODEC,
useSyntheticId ? new ES93TSDBLuceneDefaultCodec(Codec.getDefault(), bigArrays) : Codec.getDefault()
);
for (String codec : Codec.availableCodecs()) {
codecs.put(codec, Codec.forName(codec));
}
final boolean useTsdbSyntheticId = mapperService != null && mapperService.getIndexSettings().useTimeSeriesSyntheticId();
assert useTsdbSyntheticId == false || mapperService.getIndexSettings().getMode() == IndexMode.TIME_SERIES;

this.codecs = codecs.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> {
Codec codec;
Expand All @@ -77,9 +108,6 @@ public CodecService(@Nullable MapperService mapperService, BigArrays bigArrays)
} else {
codec = new DeduplicateFieldInfosCodec(e.getValue().getName(), e.getValue());
}
if (useTsdbSyntheticId && codec instanceof TSDBSyntheticIdCodec == false) {
codec = new TSDBSyntheticIdCodec(codec.getName(), codec);
}
return codec;
}));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.codec.bloomfilter;

import org.apache.lucene.util.BytesRef;

import java.io.Closeable;
import java.io.IOException;

public interface BloomFilter extends Closeable {
/**
* Tests whether the given term may exist in the specified field.
*
* @param field the field name to check
* @param term the term to test for membership
* @return true if term may be present, false if definitely absent
*/
boolean mayContainTerm(String field, BytesRef term) throws IOException;

boolean isFilterAvailable();
}
Loading