diff --git a/docs/changelog/137274.yaml b/docs/changelog/137274.yaml
new file mode 100644
index 0000000000000..c26c0940f4a51
--- /dev/null
+++ b/docs/changelog/137274.yaml
@@ -0,0 +1,5 @@
+pr: 137274
+summary: Use a new synthetic `_id` format for time-series datastreams
+area: TSDB
+type: enhancement
+issues: []
diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBSyntheticIdsIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBSyntheticIdsIT.java
index b0d14d0d80221..654051b9e13f5 100644
--- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBSyntheticIdsIT.java
+++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBSyntheticIdsIT.java
@@ -22,28 +22,35 @@
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.time.DateFormatter;
+import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.IdFieldMapper;
+import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
-import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import java.io.IOException;
import java.time.Instant;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Random;
import static org.elasticsearch.common.time.FormatNames.STRICT_DATE_OPTIONAL_TIME;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertCheckedResponse;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
+import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
@@ -94,54 +101,59 @@ public void testInvalidIndexMode() {
);
}
- @TestLogging(reason = "debug", value = "org.elasticsearch.index.engine.Engine:TRACE")
public void testSyntheticId() throws Exception {
assumeTrue("Test should only run with feature flag", IndexSettings.TSDB_SYNTHETIC_ID_FEATURE_FLAG);
- final var indexName = randomIdentifier();
- putDataStreamTemplate(random(), indexName);
+ final var dataStreamName = randomIdentifier();
+ putDataStreamTemplate(dataStreamName, randomIntBetween(1, 5));
+ final var docs = new HashMap();
+ final var unit = randomFrom(ChronoUnit.SECONDS, ChronoUnit.MINUTES);
final var timestamp = Instant.now();
+ logger.info("timestamp is " + timestamp);
- // Index 5 docs in datastream
+ // Index 10 docs in datastream
+ //
+ // For convenience, the metric value maps the index in the bulk response items
var results = createDocuments(
- indexName,
- document(timestamp, "vm-dev01", "cpu-load", 0), // will be updated
- document(timestamp.plusSeconds(2), "vm-dev01", "cpu-load", 1), // will be deleted
- document(timestamp, "vm-dev02", "cpu-load", 2),
- document(timestamp.plusSeconds(2), "vm-dev03", "cpu-load", 3),
- document(timestamp.plusSeconds(3), "vm-dev03", "cpu-load", 4)
+ dataStreamName,
+ // t + 0s
+ document(timestamp, "vm-dev01", "cpu-load", 0),
+ document(timestamp, "vm-dev02", "cpu-load", 1),
+ // t + 1s
+ document(timestamp.plus(1, unit), "vm-dev01", "cpu-load", 2),
+ document(timestamp.plus(1, unit), "vm-dev02", "cpu-load", 3),
+ // t + 0s out-of-order doc
+ document(timestamp, "vm-dev03", "cpu-load", 4),
+ // t + 2s
+ document(timestamp.plus(2, unit), "vm-dev01", "cpu-load", 5),
+ document(timestamp.plus(2, unit), "vm-dev02", "cpu-load", 6),
+ // t - 1s out-of-order doc
+ document(timestamp.minus(1, unit), "vm-dev01", "cpu-load", 7),
+ // t + 3s
+ document(timestamp.plus(3, unit), "vm-dev01", "cpu-load", 8),
+ document(timestamp.plus(3, unit), "vm-dev02", "cpu-load", 9)
);
- // Verify documents
- assertThat(results[0].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
- assertThat(results[0].getVersion(), equalTo(1L));
-
- assertThat(results[1].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
- assertThat(results[1].getVersion(), equalTo(1L));
-
- assertThat(results[2].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
- assertThat(results[2].getVersion(), equalTo(1L));
-
- assertThat(results[3].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
- assertThat(results[3].getVersion(), equalTo(1L));
-
- assertThat(results[4].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
- assertThat(results[4].getVersion(), equalTo(1L));
-
- final var docIndex = results[1].getIndex();
- final var docId = results[1].getId();
+ // Verify that documents are created
+ for (var result : results) {
+ assertThat(result.getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
+ assertThat(result.getVersion(), equalTo(1L));
+ docs.put(result.getId(), result.getIndex());
+ }
enum Operation {
FLUSH,
REFRESH,
NONE
}
+
+ // Random flush or refresh or nothing, so that the next GETs are executed on flushed segments or in memory segments.
switch (randomFrom(Operation.values())) {
case FLUSH:
- flush(indexName);
+ flush(dataStreamName);
break;
case REFRESH:
- refresh(indexName);
+ refresh(dataStreamName);
break;
case NONE:
default:
@@ -149,46 +161,225 @@ enum Operation {
}
// Get by synthetic _id
- // Note: before synthetic _id this would have required postings on disks
- var getResponse = client().prepareGet(docIndex, docId).setFetchSource(true).execute().actionGet();
- assertThat(getResponse.isExists(), equalTo(true));
- assertThat(getResponse.getVersion(), equalTo(1L));
- var source = asInstanceOf(Map.class, getResponse.getSourceAsMap().get("metric"));
- assertThat(asInstanceOf(Integer.class, source.get("value")), equalTo(1));
-
- // Update by synthetic _id
- // Note: it doesn't work, is that expected? Is is blocked by IndexRouting.ExtractFromSource.updateShard
- var exception = expectThrows(IllegalArgumentException.class, () -> {
- var doc = document(timestamp, "vm-dev01", "cpu-load", 10); // update
- client().prepareUpdate(docIndex, docId).setDoc(doc).get();
+ var randomDocs = randomSubsetOf(randomIntBetween(0, results.length), results);
+ for (var doc : randomDocs) {
+ boolean fetchSource = randomBoolean();
+ var getResponse = client().prepareGet(doc.getIndex(), doc.getId()).setFetchSource(fetchSource).get();
+ assertThat(getResponse.isExists(), equalTo(true));
+ assertThat(getResponse.getVersion(), equalTo(1L));
+
+ if (fetchSource) {
+ var source = asInstanceOf(Map.class, getResponse.getSourceAsMap().get("metric"));
+ assertThat(asInstanceOf(Integer.class, source.get("value")), equalTo(doc.getItemId()));
+ }
+ }
+
+ // Random flush or refresh or nothing, so that the next DELETEs are executed on flushed segments or in memory segments.
+ switch (randomFrom(Operation.values())) {
+ case FLUSH:
+ flush(dataStreamName);
+ break;
+ case REFRESH:
+ refresh(dataStreamName);
+ break;
+ case NONE:
+ default:
+ break;
+ }
+
+ // Delete by synthetic _id
+ var deletedDocs = randomSubsetOf(randomIntBetween(1, docs.size()), docs.keySet());
+ for (var docId : deletedDocs) {
+ var deletedDocIndex = docs.get(docId);
+ assertThat(deletedDocIndex, notNullValue());
+
+ // Delete
+ var deleteResponse = client().prepareDelete(deletedDocIndex, docId).get();
+ assertThat(deleteResponse.getId(), equalTo(docId));
+ assertThat(deleteResponse.getIndex(), equalTo(deletedDocIndex));
+ assertThat(deleteResponse.getResult(), equalTo(DocWriteResponse.Result.DELETED));
+ assertThat(deleteResponse.getVersion(), equalTo(2L));
+ }
+
+ // Index more random docs
+ if (randomBoolean()) {
+ int nbDocs = randomIntBetween(1, 100);
+ final var arrayOfDocs = new XContentBuilder[nbDocs];
+
+ var t = timestamp.plus(4, unit); // t + 4s, no overlap with previous docs
+ while (nbDocs > 0) {
+ var hosts = randomSubsetOf(List.of("vm-dev01", "vm-dev02", "vm-dev03"));
+ for (var host : hosts) {
+ if (--nbDocs < 0) {
+ break;
+ }
+ arrayOfDocs[nbDocs] = document(t, host, "cpu-load", randomInt(10));
+ }
+ // always use seconds, otherwise the doc might fell outside of the timestamps window of the datastream
+ t = t.plus(1, ChronoUnit.SECONDS);
+ }
+
+ results = createDocuments(dataStreamName, arrayOfDocs);
+
+ // Verify that documents are created
+ for (var result : results) {
+ assertThat(result.getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
+ assertThat(result.getVersion(), equalTo(1L));
+ docs.put(result.getId(), result.getIndex());
+ }
+ }
+
+ refresh(dataStreamName);
+
+ assertCheckedResponse(client().prepareSearch(dataStreamName).setTrackTotalHits(true).setSize(100), searchResponse -> {
+ assertHitCount(searchResponse, docs.size() - deletedDocs.size());
+
+ // Verify that search response does not contain deleted docs
+ for (var searchHit : searchResponse.getHits()) {
+ assertThat(
+ "Document with id [" + searchHit.getId() + "] is deleted",
+ deletedDocs.contains(searchHit.getId()),
+ equalTo(false)
+ );
+ }
});
- assertThat(
- exception.getMessage(),
- containsString("update is not supported because the destination index [" + docIndex + "] is in time_series mode")
+
+ // Search by synthetic _id
+ var otherDocs = randomSubsetOf(Sets.difference(docs.keySet(), Sets.newHashSet(deletedDocs)));
+ for (var docId : otherDocs) {
+ assertCheckedResponse(
+ client().prepareSearch(docs.get(docId))
+ .setSource(new SearchSourceBuilder().query(new TermQueryBuilder(IdFieldMapper.NAME, docId))),
+ searchResponse -> {
+ assertHitCount(searchResponse, 1L);
+ assertThat(searchResponse.getHits().getHits(), arrayWithSize(1));
+ assertThat(searchResponse.getHits().getHits()[0].getId(), equalTo(docId));
+ }
+ );
+ }
+
+ flush(dataStreamName);
+
+ // Check that synthetic _id field have no postings on disk
+ var indices = new HashSet<>(docs.values());
+ for (var index : indices) {
+ var diskUsage = diskUsage(index);
+ var diskUsageIdField = AnalyzeIndexDiskUsageTestUtils.getPerFieldDiskUsage(diskUsage, IdFieldMapper.NAME);
+ assertThat("_id field should not have postings on disk", diskUsageIdField.getInvertedIndexBytes(), equalTo(0L));
+ }
+ }
+
+ public void testGetFromTranslogBySyntheticId() throws Exception {
+ assumeTrue("Test should only run with feature flag", IndexSettings.TSDB_SYNTHETIC_ID_FEATURE_FLAG);
+ final var dataStreamName = randomIdentifier();
+ putDataStreamTemplate(dataStreamName, 1);
+
+ final var docs = new HashMap();
+ final var unit = randomFrom(ChronoUnit.SECONDS, ChronoUnit.MINUTES);
+ final var timestamp = Instant.now();
+
+ // Index 5 docs in datastream
+ //
+ // For convenience, the metric value maps the index in the bulk response items
+ var results = createDocuments(
+ dataStreamName,
+ // t + 0s
+ document(timestamp, "vm-dev01", "cpu-load", 0),
+ document(timestamp, "vm-dev02", "cpu-load", 1),
+ // t + 1s
+ document(timestamp.plus(1, unit), "vm-dev01", "cpu-load", 2),
+ document(timestamp.plus(1, unit), "vm-dev02", "cpu-load", 3),
+ // t + 0s out-of-order doc
+ document(timestamp, "vm-dev03", "cpu-load", 4)
);
- // Delete by synthetic _id
- var deleteResponse = client().prepareDelete(docIndex, docId).get();
- assertThat(deleteResponse.getId(), equalTo(docId));
- assertThat(deleteResponse.getResult(), equalTo(DocWriteResponse.Result.DELETED));
- assertThat(deleteResponse.getVersion(), equalTo(2L));
-
- // Index more docs
- // TODO Randomize this to have segments only composed of deleted docs
- createDocuments(
- indexName,
- document(timestamp.plusSeconds(4), "vm-dev03", "cpu-load", 5),
- document(timestamp.plusSeconds(5), "vm-dev03", "cpu-load", 6)
+ // Verify that documents are created
+ for (var result : results) {
+ assertThat(result.getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
+ assertThat(result.getVersion(), equalTo(1L));
+ docs.put(result.getId(), result.getIndex());
+ }
+
+ // Get by synthetic _id
+ //
+ // The documents are in memory buffers: the first GET will trigger the refresh of the internal reader
+ // (see InternalEngine.REAL_TIME_GET_REFRESH_SOURCE) to have an up-to-date searcher to resolve documents ids and versions. It will
+ // also enable the tracking of the locations of documents in the translog (see InternalEngine.trackTranslogLocation) so that next
+ // GETs will be resolved using the translog.
+ var randomDocs = randomSubsetOf(randomIntBetween(1, results.length), results);
+ for (var doc : randomDocs) {
+ var getResponse = client().prepareGet(doc.getIndex(), doc.getId()).setRealtime(true).setFetchSource(true).execute().actionGet();
+ assertThat(getResponse.isExists(), equalTo(true));
+ assertThat(getResponse.getVersion(), equalTo(1L));
+
+ var source = asInstanceOf(Map.class, getResponse.getSourceAsMap().get("metric"));
+ assertThat(asInstanceOf(Integer.class, source.get("value")), equalTo(doc.getItemId()));
+ }
+
+ int metricOffset = results.length;
+
+ // Index 5 more docs
+ results = createDocuments(
+ dataStreamName,
+ // t + 2s
+ document(timestamp.plus(2, unit), "vm-dev01", "cpu-load", metricOffset),
+ document(timestamp.plus(2, unit), "vm-dev02", "cpu-load", metricOffset + 1),
+ // t - 1s out-of-order doc
+ document(timestamp.minus(1, unit), "vm-dev01", "cpu-load", metricOffset + 2),
+ // t + 3s
+ document(timestamp.plus(3, unit), "vm-dev01", "cpu-load", metricOffset + 3),
+ document(timestamp.plus(3, unit), "vm-dev02", "cpu-load", metricOffset + 4)
);
- flushAndRefresh(indexName);
+ // Verify that documents are created
+ for (var result : results) {
+ assertThat(result.getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
+ assertThat(result.getVersion(), equalTo(1L));
+ docs.put(result.getId(), result.getIndex());
+ }
- // Check that synthetic _id field has no postings on disk
- var diskUsage = diskUsage(docIndex);
- var diskUsageIdField = AnalyzeIndexDiskUsageTestUtils.getPerFieldDiskUsage(diskUsage, IdFieldMapper.NAME);
- assertThat("_id field should not have postings on disk", diskUsageIdField.getInvertedIndexBytes(), equalTo(0L));
+ // Get by synthetic _id
+ //
+ // Documents ids and versions are resolved using the translog. Here we exercise the get-from-translog (that uses the
+ // TranslogDirectoryReader) and VersionsAndSeqNoResolver.loadDocIdAndVersionUncached paths.
+ randomDocs = randomSubsetOf(randomIntBetween(1, results.length), results);
+ for (var doc : randomDocs) {
+ var getResponse = client().prepareGet(doc.getIndex(), doc.getId()).setRealtime(true).setFetchSource(true).execute().actionGet();
+ assertThat(getResponse.isExists(), equalTo(true));
+ assertThat(getResponse.getVersion(), equalTo(1L));
+
+ var source = asInstanceOf(Map.class, getResponse.getSourceAsMap().get("metric"));
+ assertThat(asInstanceOf(Integer.class, source.get("value")), equalTo(metricOffset + doc.getItemId()));
+ }
+
+ flushAndRefresh(dataStreamName);
+
+ // Get by synthetic _id
+ //
+ // Here we exercise the get-from-searcher and VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion paths.
+ randomDocs = randomSubsetOf(randomIntBetween(1, results.length), results);
+ for (var doc : randomDocs) {
+ var getResponse = client().prepareGet(doc.getIndex(), doc.getId())
+ .setRealtime(randomBoolean())
+ .setFetchSource(true)
+ .execute()
+ .actionGet();
+ assertThat(getResponse.isExists(), equalTo(true));
+ assertThat(getResponse.getVersion(), equalTo(1L));
+
+ var source = asInstanceOf(Map.class, getResponse.getSourceAsMap().get("metric"));
+ assertThat(asInstanceOf(Integer.class, source.get("value")), equalTo(metricOffset + doc.getItemId()));
+ }
+
+ // Check that synthetic _id field have no postings on disk
+ var indices = new HashSet<>(docs.values());
+ for (var index : indices) {
+ var diskUsage = diskUsage(index);
+ var diskUsageIdField = AnalyzeIndexDiskUsageTestUtils.getPerFieldDiskUsage(diskUsage, IdFieldMapper.NAME);
+ assertThat("_id field should not have postings on disk", diskUsageIdField.getInvertedIndexBytes(), equalTo(0L));
+ }
- // TODO Search datastream and count hits
+ assertHitCount(client().prepareSearch(dataStreamName).setSize(0), 10L);
}
private static XContentBuilder document(Instant timestamp, String hostName, String metricField, Integer metricValue)
@@ -210,7 +401,7 @@ private static XContentBuilder document(Instant timestamp, String hostName, Stri
return source;
}
- private static BulkItemResponse[] createDocuments(String indexName, XContentBuilder... docs) throws IOException {
+ private static BulkItemResponse[] createDocuments(String indexName, XContentBuilder... docs) {
assertThat(docs, notNullValue());
final var client = client();
var bulkRequest = client.prepareBulk();
@@ -222,8 +413,8 @@ private static BulkItemResponse[] createDocuments(String indexName, XContentBuil
return bulkResponse.getItems();
}
- private static void putDataStreamTemplate(Random random, String indexPattern) throws IOException {
- final var settings = indexSettings(1, 0).put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES.getName())
+ private static void putDataStreamTemplate(String indexPattern, int shards) throws IOException {
+ final var settings = indexSettings(shards, 0).put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES.getName())
.put(IndexSettings.BLOOM_FILTER_ID_FIELD_ENABLED_SETTING.getKey(), false)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)
.put(IndexSettings.USE_SYNTHETIC_ID.getKey(), true);
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
index 38d0fada7d866..094626018d449 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
@@ -705,6 +705,8 @@ public Iterator> settings() {
@Nullable
private final IndexReshardingMetadata reshardingMetadata;
+ private final boolean useTimeSeriesSyntheticId;
+
private IndexMetadata(
final Index index,
final long version,
@@ -754,7 +756,8 @@ private IndexMetadata(
@Nullable final IndexMetadataStats stats,
@Nullable final Double writeLoadForecast,
@Nullable Long shardSizeInBytesForecast,
- @Nullable IndexReshardingMetadata reshardingMetadata
+ @Nullable IndexReshardingMetadata reshardingMetadata,
+ final boolean useTimeSeriesSyntheticId
) {
this.index = index;
this.version = version;
@@ -815,6 +818,7 @@ private IndexMetadata(
this.shardSizeInBytesForecast = shardSizeInBytesForecast;
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
this.reshardingMetadata = reshardingMetadata;
+ this.useTimeSeriesSyntheticId = useTimeSeriesSyntheticId;
}
IndexMetadata withMappingMetadata(MappingMetadata mapping) {
@@ -870,7 +874,8 @@ IndexMetadata withMappingMetadata(MappingMetadata mapping) {
this.stats,
this.writeLoadForecast,
this.shardSizeInBytesForecast,
- this.reshardingMetadata
+ this.reshardingMetadata,
+ this.useTimeSeriesSyntheticId
);
}
@@ -933,7 +938,8 @@ public IndexMetadata withInSyncAllocationIds(int shardId, Set inSyncSet)
this.stats,
this.writeLoadForecast,
this.shardSizeInBytesForecast,
- this.reshardingMetadata
+ this.reshardingMetadata,
+ this.useTimeSeriesSyntheticId
);
}
@@ -1004,7 +1010,8 @@ public IndexMetadata withSetPrimaryTerm(int shardId, long primaryTerm) {
this.stats,
this.writeLoadForecast,
this.shardSizeInBytesForecast,
- this.reshardingMetadata
+ this.reshardingMetadata,
+ this.useTimeSeriesSyntheticId
);
}
@@ -1066,7 +1073,8 @@ public IndexMetadata withTimestampRanges(IndexLongFieldRange timestampRange, Ind
this.stats,
this.writeLoadForecast,
this.shardSizeInBytesForecast,
- this.reshardingMetadata
+ this.reshardingMetadata,
+ this.useTimeSeriesSyntheticId
);
}
@@ -1123,7 +1131,8 @@ public IndexMetadata withIncrementedVersion() {
this.stats,
this.writeLoadForecast,
this.shardSizeInBytesForecast,
- this.reshardingMetadata
+ this.reshardingMetadata,
+ this.useTimeSeriesSyntheticId
);
}
@@ -1314,6 +1323,13 @@ public Instant getTimeSeriesEnd() {
return timeSeriesEnd;
}
+ /**
+ * @return whether the index is a time-series index that uses synthetic ids or not.
+ */
+ public boolean useTimeSeriesSyntheticId() {
+ return useTimeSeriesSyntheticId;
+ }
+
/**
* Return the concrete mapping for this index or {@code null} if this index has no mappings at all.
*/
@@ -2497,6 +2513,16 @@ IndexMetadata build(boolean repair) {
String indexModeString = settings.get(IndexSettings.MODE.getKey());
final IndexMode indexMode = indexModeString != null ? IndexMode.fromString(indexModeString.toLowerCase(Locale.ROOT)) : null;
final boolean isTsdb = indexMode == IndexMode.TIME_SERIES;
+ boolean useTimeSeriesSyntheticId = false;
+ if (isTsdb
+ && IndexSettings.TSDB_SYNTHETIC_ID_FEATURE_FLAG
+ && indexCreatedVersion.onOrAfter(IndexVersions.TIME_SERIES_USE_SYNTHETIC_ID)) {
+ var setting = settings.get(IndexSettings.USE_SYNTHETIC_ID.getKey());
+ if (setting != null && setting.equalsIgnoreCase(Boolean.TRUE.toString())) {
+ assert IndexSettings.TSDB_SYNTHETIC_ID_FEATURE_FLAG;
+ useTimeSeriesSyntheticId = true;
+ }
+ }
return new IndexMetadata(
new Index(index, uuid),
version,
@@ -2546,7 +2572,8 @@ IndexMetadata build(boolean repair) {
stats,
indexWriteLoadForecast,
shardSizeInBytesForecast,
- reshardingMetadata
+ reshardingMetadata,
+ useTimeSeriesSyntheticId
);
}
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java
index 12d45898bfba5..0cc8167727978 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java
@@ -29,6 +29,7 @@
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.mapper.TimeSeriesRoutingHashFieldMapper;
+import org.elasticsearch.index.mapper.TsidExtractingIdFieldMapper;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
@@ -321,6 +322,7 @@ public abstract static class ExtractFromSource extends IndexRouting {
protected final XContentParserConfiguration parserConfig;
private final IndexMode indexMode;
private final boolean trackTimeSeriesRoutingHash;
+ private final boolean useTimeSeriesSyntheticId;
private final boolean addIdWithRoutingHash;
private int hash = Integer.MAX_VALUE;
@@ -333,6 +335,7 @@ public abstract static class ExtractFromSource extends IndexRouting {
assert indexMode != null : "Index mode must be set for ExtractFromSource routing";
this.trackTimeSeriesRoutingHash = indexMode == IndexMode.TIME_SERIES
&& metadata.getCreationVersion().onOrAfter(IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID);
+ this.useTimeSeriesSyntheticId = metadata.useTimeSeriesSyntheticId();
addIdWithRoutingHash = indexMode == IndexMode.LOGSDB;
this.parserConfig = XContentParserConfiguration.EMPTY.withFiltering(null, Set.copyOf(includePaths), null, true);
}
@@ -417,10 +420,19 @@ private int idToHash(String id) {
if (idBytes.length < 4) {
throw new ResourceNotFoundException("invalid id [{}] for index [{}] in " + indexMode.getName() + " mode", id, indexName);
}
- // For TSDB, the hash is stored as the id prefix.
- // For LogsDB with routing on sort fields, the routing hash is stored in the range[id.length - 9, id.length - 5] of the id,
- // see IndexRequest#autoGenerateTimeBasedId.
- return hashToShardId(ByteUtils.readIntLE(idBytes, addIdWithRoutingHash ? idBytes.length - 9 : 0));
+ int hash;
+ if (addIdWithRoutingHash) {
+ // For LogsDB with routing on sort fields, the routing hash is stored in the range[id.length - 9, id.length - 5] of the id,
+ // see IndexRequest#autoGenerateTimeBasedId.
+ hash = ByteUtils.readIntLE(idBytes, idBytes.length - 9);
+ } else if (useTimeSeriesSyntheticId) {
+ // For TSDB with synthetic ids, the hash is stored as the id suffix.
+ hash = TsidExtractingIdFieldMapper.extractRoutingHashFromSyntheticId(new BytesRef(idBytes));
+ } else {
+ // For TSDB, the hash is stored as the id prefix.
+ hash = ByteUtils.readIntLE(idBytes, 0);
+ }
+ return hashToShardId(hash);
}
@Override
@@ -510,7 +522,6 @@ public static class ForIndexDimensions extends ExtractFromSource {
@Override
protected int hashSource(IndexRequest indexRequest) {
- // System.out.println("hashSource for tsid");
BytesRef tsid = indexRequest.tsid();
if (tsid == null) {
tsid = buildTsid(indexRequest.getContentType(), indexRequest.indexSource().bytes());
diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java
index 08a8e28457159..5307d5d933421 100644
--- a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java
+++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java
@@ -14,9 +14,9 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CloseableThreadLocal;
-import org.elasticsearch.common.util.ByteUtils;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.Assertions;
+import org.elasticsearch.index.mapper.TsidExtractingIdFieldMapper;
import java.io.IOException;
import java.util.Base64;
@@ -153,22 +153,30 @@ public static DocIdAndVersion timeSeriesLoadDocIdAndVersion(IndexReader reader,
* This allows this method to know whether there is no document with the specified id without loading the docid for
* the specified id.
*
- * @param reader The reader load docid, version and seqno from.
- * @param uid The term that describes the uid of the document to load docid, version and seqno for.
- * @param id The id that contains the encoded timestamp. The timestamp is used to skip checking the id for entire segments.
- * @param loadSeqNo Whether to load sequence number from _seq_no doc values field.
+ * @param reader The reader load docid, version and seqno from.
+ * @param uid The term that describes the uid of the document to load docid, version and seqno for.
+ * @param id The id that contains the encoded timestamp. The timestamp is used to skip checking the id for entire segments.
+ * @param loadSeqNo Whether to load sequence number from _seq_no doc values field.
+ * @param useSyntheticId Whether the id is a synthetic (true) or standard (false ) document id.
* @return the internal doc ID and version for the specified term from the specified reader or
* returning null if no document was found for the specified id
* @throws IOException In case of an i/o related failure
*/
- public static DocIdAndVersion timeSeriesLoadDocIdAndVersion(IndexReader reader, BytesRef uid, String id, boolean loadSeqNo)
- throws IOException {
- byte[] idAsBytes = Base64.getUrlDecoder().decode(id);
- assert idAsBytes.length == 20;
- // id format: [4 bytes (basic hash routing fields), 8 bytes prefix of 128 murmurhash dimension fields, 8 bytes
- // @timestamp)
- long timestamp = ByteUtils.readLongBE(idAsBytes, 12);
-
+ public static DocIdAndVersion timeSeriesLoadDocIdAndVersion(
+ IndexReader reader,
+ BytesRef uid,
+ String id,
+ boolean loadSeqNo,
+ boolean useSyntheticId
+ ) throws IOException {
+ final long timestamp;
+ if (useSyntheticId) {
+ assert uid.equals(new BytesRef(Base64.getUrlDecoder().decode(id)));
+ timestamp = TsidExtractingIdFieldMapper.extractTimestampFromSyntheticId(uid);
+ } else {
+ byte[] idAsBytes = Base64.getUrlDecoder().decode(id);
+ timestamp = TsidExtractingIdFieldMapper.extractTimestampFromId(idAsBytes);
+ }
PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, true);
List leaves = reader.leaves();
// iterate in default order, the segments should be sorted by DataStream#TIMESERIES_LEAF_READERS_SORTER
diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java
index 26f747d2d2315..81b4bbab69756 100644
--- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java
+++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java
@@ -690,7 +690,19 @@ public boolean isES87TSDBCodecEnabled() {
false,
new Setting.Validator<>() {
@Override
- public void validate(Boolean value) {}
+ public void validate(Boolean enabled) {
+ if (enabled) {
+ if (TSDB_SYNTHETIC_ID_FEATURE_FLAG == false) {
+ throw new IllegalArgumentException(
+ String.format(
+ Locale.ROOT,
+ "The setting [%s] is only permitted when the feature flag is enabled.",
+ USE_SYNTHETIC_ID.getKey()
+ )
+ );
+ }
+ }
+ }
@Override
public void validate(Boolean enabled, Map, Object> settings) {
@@ -983,7 +995,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
private final boolean recoverySourceEnabled;
private final boolean recoverySourceSyntheticEnabled;
private final boolean useDocValuesSkipper;
- private final boolean tsdbSyntheticId;
+ private final boolean useTimeSeriesSyntheticId;
/**
* The maximum number of refresh listeners allows on this shard.
@@ -1170,8 +1182,28 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
&& scopedSettings.get(RECOVERY_USE_SYNTHETIC_SOURCE_SETTING);
useDocValuesSkipper = DOC_VALUES_SKIPPER && scopedSettings.get(USE_DOC_VALUES_SKIPPER);
seqNoIndexOptions = scopedSettings.get(SEQ_NO_INDEX_OPTIONS_SETTING);
- tsdbSyntheticId = TSDB_SYNTHETIC_ID_FEATURE_FLAG && scopedSettings.get(USE_SYNTHETIC_ID);
- assert tsdbSyntheticId == false || mode == IndexMode.TIME_SERIES : mode;
+ final var useSyntheticId = IndexSettings.TSDB_SYNTHETIC_ID_FEATURE_FLAG && scopedSettings.get(USE_SYNTHETIC_ID);
+ if (indexMetadata.useTimeSeriesSyntheticId() != useSyntheticId) {
+ assert false;
+ throw new IllegalArgumentException(
+ String.format(
+ Locale.ROOT,
+ "The setting [%s] is set to [%s] but index metadata has a different value [%s].",
+ USE_SYNTHETIC_ID.getKey(),
+ useSyntheticId,
+ indexMetadata.useTimeSeriesSyntheticId()
+ )
+ );
+ }
+ if (useSyntheticId) {
+ assert TSDB_SYNTHETIC_ID_FEATURE_FLAG;
+ assert indexMetadata.useTimeSeriesSyntheticId();
+ assert indexMetadata.getIndexMode() == IndexMode.TIME_SERIES : indexMetadata.getIndexMode();
+ assert indexMetadata.getCreationVersion().onOrAfter(IndexVersions.TIME_SERIES_USE_SYNTHETIC_ID);
+ useTimeSeriesSyntheticId = true;
+ } else {
+ useTimeSeriesSyntheticId = false;
+ }
if (recoverySourceSyntheticEnabled) {
if (DiscoveryNode.isStateless(settings)) {
throw new IllegalArgumentException("synthetic recovery source is only allowed in stateful");
@@ -1907,8 +1939,8 @@ public boolean useDocValuesSkipper() {
/**
* @return Whether the index is a time-series index that use synthetic ids.
*/
- public boolean useTsdbSyntheticId() {
- return tsdbSyntheticId;
+ public boolean useTimeSeriesSyntheticId() {
+ return useTimeSeriesSyntheticId;
}
/**
diff --git a/server/src/main/java/org/elasticsearch/index/IndexVersions.java b/server/src/main/java/org/elasticsearch/index/IndexVersions.java
index e63b655e2ce8d..172bdc67e7872 100644
--- a/server/src/main/java/org/elasticsearch/index/IndexVersions.java
+++ b/server/src/main/java/org/elasticsearch/index/IndexVersions.java
@@ -192,6 +192,7 @@ private static Version parseUnchecked(String version) {
public static final IndexVersion REENABLED_TIMESTAMP_DOC_VALUES_SPARSE_INDEX = def(9_042_0_00, Version.LUCENE_10_3_1);
public static final IndexVersion SKIPPERS_ENABLED_BY_DEFAULT = def(9_043_0_00, Version.LUCENE_10_3_1);
+ public static final IndexVersion TIME_SERIES_USE_SYNTHETIC_ID = def(9_044_0_00, Version.LUCENE_10_3_1);
/*
* STOP! READ THIS FIRST! No, really,
diff --git a/server/src/main/java/org/elasticsearch/index/codec/CodecService.java b/server/src/main/java/org/elasticsearch/index/codec/CodecService.java
index 1e2fed61578a5..5d6e377d57db9 100644
--- a/server/src/main/java/org/elasticsearch/index/codec/CodecService.java
+++ b/server/src/main/java/org/elasticsearch/index/codec/CodecService.java
@@ -67,7 +67,7 @@ public CodecService(@Nullable MapperService mapperService, BigArrays bigArrays)
for (String codec : Codec.availableCodecs()) {
codecs.put(codec, Codec.forName(codec));
}
- final boolean useTsdbSyntheticId = mapperService != null && mapperService.getIndexSettings().useTsdbSyntheticId();
+ final boolean useTsdbSyntheticId = mapperService != null && mapperService.getIndexSettings().useTimeSeriesSyntheticId();
assert useTsdbSyntheticId == false || mapperService.getIndexSettings().getMode() == IndexMode.TIME_SERIES;
this.codecs = codecs.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> {
diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBSyntheticIdCodec.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBSyntheticIdCodec.java
index 970664844631a..aa6936cb65df9 100644
--- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBSyntheticIdCodec.java
+++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBSyntheticIdCodec.java
@@ -11,12 +11,19 @@
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.FieldInfosFormat;
+import org.apache.lucene.codecs.FieldsConsumer;
+import org.apache.lucene.codecs.FieldsProducer;
import org.apache.lucene.codecs.FilterCodec;
+import org.apache.lucene.codecs.NormsProducer;
+import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.codecs.perfield.PerFieldPostingsFormat;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.Fields;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.SegmentInfo;
+import org.apache.lucene.index.SegmentReadState;
+import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.elasticsearch.index.mapper.SyntheticIdField;
@@ -27,6 +34,7 @@
import static org.elasticsearch.index.codec.tsdb.TSDBSyntheticIdPostingsFormat.SYNTHETIC_ID;
import static org.elasticsearch.index.codec.tsdb.TSDBSyntheticIdPostingsFormat.TIMESTAMP;
import static org.elasticsearch.index.codec.tsdb.TSDBSyntheticIdPostingsFormat.TS_ID;
+import static org.elasticsearch.index.codec.tsdb.TSDBSyntheticIdPostingsFormat.TS_ROUTING_HASH;
/**
* Special codec for time-series datastreams that use synthetic ids.
@@ -35,21 +43,25 @@
* of terms and postings on the field (now called a "synthetic _id" field) as if it was backed by an in inverted index.
*
*
- * In order to do this, it enforces synthetic _id fields to be indexed with the {@link IndexOptions#NONE} option, hence preventing the
- * building of a term dictionary with postings lists. The codec also changes this {@link IndexOptions#NONE} option back to
- * {@link IndexOptions#DOCS} when reading the {@link FieldInfos} during the opening of a new segment core reader. This allows to use a
- * Lucene term dictionary on top of a synthetic _id field that does not have corresponding postings files on disk. Finally, the codec
- * injects additional {@link FieldInfos} attributes so that Lucene's {@link PerFieldPostingsFormat} correctly instantiates a
- * {@link TSDBSyntheticIdPostingsFormat} to access the term and postings of the synthetic _id field.
+ * In order to do this, it wraps the default postings format with an implementation that throws an {@link IllegalArgumentException} if
+ * a Lucene field with the name {@code _id} produces terms (ie, has postings) during indexing. It also overwrites the {@link FieldInfos}
+ * to ensure that the {@code _id} field information has the {@link IndexOptions#NONE} option when written to disk. It also changes this
+ * {@link IndexOptions#NONE} option back to {@link IndexOptions#DOCS} when reading the {@link FieldInfos} during the opening of a new
+ * segment core reader. This allows to use a Lucene term dictionary on top of a synthetic _id field that does not have corresponding
+ * postings files on disk. Finally, the codec injects additional {@link FieldInfos} attributes so that Lucene's
+ * {@link PerFieldPostingsFormat} correctly instantiates a {@link TSDBSyntheticIdPostingsFormat} to access the term and postings of the
+ * synthetic _id field.
*
*/
public class TSDBSyntheticIdCodec extends FilterCodec {
- private final TSDBSyntheticIdFieldInfosFormat fieldInfosFormat;
+ private final RewriteFieldInfosFormat fieldInfosFormat;
+ private final EnsureNoPostingsFormat postingsFormat;
public TSDBSyntheticIdCodec(String name, Codec delegate) {
super(name, delegate);
- this.fieldInfosFormat = new TSDBSyntheticIdFieldInfosFormat(delegate.fieldInfosFormat());
+ this.fieldInfosFormat = new RewriteFieldInfosFormat(delegate.fieldInfosFormat());
+ this.postingsFormat = new EnsureNoPostingsFormat(delegate.postingsFormat());
}
@Override
@@ -57,14 +69,19 @@ public final FieldInfosFormat fieldInfosFormat() {
return fieldInfosFormat;
}
+ @Override
+ public PostingsFormat postingsFormat() {
+ return postingsFormat;
+ }
+
/**
- * {@link FieldInfosFormat} that ensures the _id field is synthetic
+ * {@link FieldInfosFormat} that overwrites the {@link FieldInfos}.
*/
- private static class TSDBSyntheticIdFieldInfosFormat extends FieldInfosFormat {
+ private static class RewriteFieldInfosFormat extends FieldInfosFormat {
private final FieldInfosFormat delegate;
- private TSDBSyntheticIdFieldInfosFormat(FieldInfosFormat delegate) {
+ private RewriteFieldInfosFormat(FieldInfosFormat delegate) {
this.delegate = delegate;
}
@@ -83,6 +100,13 @@ private void ensureSyntheticIdFields(FieldInfos fieldInfos) {
assert false : message;
throw new IllegalArgumentException(message);
}
+ // Ensure _ts_routing_hash exists
+ fi = fieldInfos.fieldInfo(TS_ROUTING_HASH);
+ if (fi == null) {
+ var message = "Field [" + TS_ROUTING_HASH + "] does not exist";
+ assert false : message;
+ throw new IllegalArgumentException(message);
+ }
// Ensure _id exists and not indexed
fi = fieldInfos.fieldInfo(SYNTHETIC_ID);
if (fi == null) {
@@ -102,6 +126,49 @@ private void ensureSyntheticIdFields(FieldInfos fieldInfos) {
@Override
public void write(Directory directory, SegmentInfo segmentInfo, String segmentSuffix, FieldInfos fieldInfos, IOContext context)
throws IOException {
+
+ // Change the _id field index options from IndexOptions.DOCS to IndexOptions.NONE
+ final var infos = new FieldInfo[fieldInfos.size()];
+ int i = 0;
+ for (FieldInfo fi : fieldInfos) {
+ if (SYNTHETIC_ID.equals(fi.getName())) {
+ final var attributes = new HashMap<>(fi.attributes());
+
+ // Assert that PerFieldPostingsFormat are not present or have the expected format and suffix
+ assert attributes.get(PerFieldPostingsFormat.PER_FIELD_FORMAT_KEY) == null
+ || TSDBSyntheticIdPostingsFormat.FORMAT_NAME.equals(attributes.get(PerFieldPostingsFormat.PER_FIELD_FORMAT_KEY));
+ assert attributes.get(PerFieldPostingsFormat.PER_FIELD_SUFFIX_KEY) == null
+ || TSDBSyntheticIdPostingsFormat.SUFFIX.equals(attributes.get(PerFieldPostingsFormat.PER_FIELD_SUFFIX_KEY));
+
+ // Remove attributes if present
+ attributes.remove(PerFieldPostingsFormat.PER_FIELD_FORMAT_KEY);
+ attributes.remove(PerFieldPostingsFormat.PER_FIELD_SUFFIX_KEY);
+
+ fi = new FieldInfo(
+ fi.getName(),
+ fi.getFieldNumber(),
+ fi.hasTermVectors(),
+ true,
+ fi.hasPayloads(),
+ IndexOptions.NONE,
+ fi.getDocValuesType(),
+ fi.docValuesSkipIndexType(),
+ fi.getDocValuesGen(),
+ attributes,
+ fi.getPointDimensionCount(),
+ fi.getPointIndexDimensionCount(),
+ fi.getPointNumBytes(),
+ fi.getVectorDimension(),
+ fi.getVectorEncoding(),
+ fi.getVectorSimilarityFunction(),
+ fi.isSoftDeletesField(),
+ fi.isParentField()
+ );
+ }
+ infos[i++] = fi;
+ }
+
+ fieldInfos = new FieldInfos(infos);
ensureSyntheticIdFields(fieldInfos);
delegate.write(directory, segmentInfo, segmentSuffix, fieldInfos, context);
}
@@ -155,4 +222,46 @@ public FieldInfos read(Directory directory, SegmentInfo segmentInfo, String segm
return new FieldInfos(infos);
}
}
+
+ /**
+ * {@link PostingsFormat} that throws an {@link IllegalArgumentException} if a Lucene field with the name {@code _id} has postings
+ * produced during indexing.
+ */
+ private static class EnsureNoPostingsFormat extends PostingsFormat {
+
+ private final PostingsFormat delegate;
+
+ private EnsureNoPostingsFormat(PostingsFormat delegate) {
+ super(delegate.getName());
+ this.delegate = delegate;
+ }
+
+ @Override
+ public FieldsConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
+ final var consumer = delegate.fieldsConsumer(state);
+ return new FieldsConsumer() {
+ @Override
+ public void write(Fields fields, NormsProducer norms) throws IOException {
+ for (var field : fields) {
+ if (SYNTHETIC_ID.equalsIgnoreCase(field)) {
+ var message = "Field [" + SYNTHETIC_ID + "] has terms produced during indexing";
+ assert false : message;
+ throw new IllegalArgumentException(message);
+ }
+ }
+ consumer.write(fields, norms);
+ }
+
+ @Override
+ public void close() throws IOException {
+ consumer.close();
+ }
+ };
+ }
+
+ @Override
+ public FieldsProducer fieldsProducer(SegmentReadState state) throws IOException {
+ return delegate.fieldsProducer(state);
+ }
+ }
}
diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBSyntheticIdFieldsProducer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBSyntheticIdFieldsProducer.java
index 2f624fd2d9cd0..1431c10331d69 100644
--- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBSyntheticIdFieldsProducer.java
+++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBSyntheticIdFieldsProducer.java
@@ -12,6 +12,7 @@
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.codecs.FieldsProducer;
import org.apache.lucene.index.BaseTermsEnum;
+import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.ImpactsEnum;
import org.apache.lucene.index.PostingsEnum;
@@ -28,7 +29,6 @@
import org.elasticsearch.index.mapper.Uid;
import java.io.IOException;
-import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
@@ -37,6 +37,10 @@
import static org.elasticsearch.index.codec.tsdb.TSDBSyntheticIdPostingsFormat.TIMESTAMP;
import static org.elasticsearch.index.codec.tsdb.TSDBSyntheticIdPostingsFormat.TS_ID;
+/**
+ * Produces synthetic _id terms that are computed at runtime from the doc values of other fields like _tsid, @timestamp and
+ * _ts_routing_hash.
+ */
public class TSDBSyntheticIdFieldsProducer extends FieldsProducer {
private static final Set FIELDS_NAMES = Set.of(SYNTHETIC_ID);
@@ -85,7 +89,7 @@ public Terms terms(String field) throws IOException {
return new Terms() {
@Override
public TermsEnum iterator() {
- return new FakeTermsEnum();
+ return new SyntheticIdTermsEnum();
}
@Override
@@ -131,97 +135,371 @@ public boolean hasPayloads() {
}
/**
- * This is a fake TermsEnum that scans all documents for find docs matching a specific _id. This implementation is only here to show
- * that the synthetic _id terms is used when applying doc values updates during soft-updates. It is buggy and should not be used besides
- * some carefully crafted integration tests, because it relies on the current _id format for TSDB indices that has limitations:
- * - it is composed of a routing hash, a @timestamp and a tsid that cannot be un-hashed so all docs must be scanned to find matchings
- * - it is not sorted on _id in the Lucene segments so doc values updates stop too early when applying DV updates
- *
- * This fake terms enumeration will be changed to support a different _id format in a short future.
+ * Holds all the doc values used in the {@link TermsEnum} and {@link PostingsEnum} to lookup and to build synthetic _ids, along with
+ * some utility methods to access doc values.
+ *
+ * It holds the instance of {@link DocValuesProducer} used to create the sorted doc values for _tsid, @timestamp and
+ * _ts_routing_hash. Because doc values can only advance, they are re-created from the {@link DocValuesProducer} when we need to
+ * seek backward.
+ *
*/
- private class FakeTermsEnum extends BaseTermsEnum {
+ private static class DocValuesHolder {
+
+ private final FieldInfo tsIdFieldInfo;
+ private final FieldInfo timestampFieldInfo;
+ private final FieldInfo routingHashFieldInfo;
+ private final DocValuesProducer docValuesProducer;
+
+ private SortedNumericDocValues timestampDocValues; // sorted desc. order
+ private SortedDocValues routingHashDocValues; // sorted asc. order
+ private SortedDocValues tsIdDocValues; // sorted asc. order
+ // Keep around the latest tsId ordinal and value
+ private int cachedTsIdOrd = -1;
+ private BytesRef cachedTsId;
+
+ private DocValuesHolder(FieldInfos fieldInfos, DocValuesProducer docValuesProducer) {
+ this.tsIdFieldInfo = safeFieldInfo(fieldInfos, TSDBSyntheticIdPostingsFormat.TS_ID);
+ this.timestampFieldInfo = safeFieldInfo(fieldInfos, TSDBSyntheticIdPostingsFormat.TIMESTAMP);
+ this.routingHashFieldInfo = safeFieldInfo(fieldInfos, TSDBSyntheticIdPostingsFormat.TS_ROUTING_HASH);
+ this.docValuesProducer = docValuesProducer;
+ }
- private BytesRef term = null;
- private int docID = -1;
+ private FieldInfo safeFieldInfo(FieldInfos fieldInfos, String fieldName) {
+ var fi = fieldInfos.fieldInfo(fieldName);
+ if (fi == null) {
+ var message = "Field [" + fieldName + "] does not exist";
+ assert false : message;
+ throw new IllegalArgumentException(message);
+ }
+ return fi;
+ }
+
+ /**
+ * Returns the _tsid ordinal value for a given docID. The document ID must exist and must have a value for the field.
+ *
+ * @param docID the docID
+ * @return the _tsid ordinal value
+ * @throws IOException if any I/O exception occurs
+ */
+ private int docTsIdOrdinal(int docID) throws IOException {
+ if (tsIdDocValues == null || tsIdDocValues.docID() > docID) {
+ tsIdDocValues = docValuesProducer.getSorted(tsIdFieldInfo);
+ cachedTsIdOrd = -1;
+ cachedTsId = null;
+ }
+ boolean found = tsIdDocValues.advanceExact(docID);
+ assert found : "No value found for field [" + tsIdFieldInfo.getName() + " and docID " + docID;
+ return tsIdDocValues.ordValue();
+ }
+
+ /**
+ * Returns the timestamp value for a given docID. The document ID must exist and must have a value for the field.
+ *
+ * @param docID the docID
+ * @return the timestamp value
+ * @throws IOException if any I/O exception occurs
+ */
+ private long docTimestamp(int docID) throws IOException {
+ if (timestampDocValues == null || timestampDocValues.docID() > docID) {
+ timestampDocValues = docValuesProducer.getSortedNumeric(timestampFieldInfo);
+ }
+ boolean found = timestampDocValues.advanceExact(docID);
+ assert found : "No value found for field [" + timestampFieldInfo.getName() + " and docID " + docID;
+ assert timestampDocValues.docValueCount() == 1;
+ return timestampDocValues.nextValue();
+ }
+
+ /**
+ * Returns the routing hash value for a given docID. The document ID must exist and must have a value for the field.
+ *
+ * @param docID the docID
+ * @return the routing hash value
+ * @throws IOException if any I/O exception occurs
+ */
+ private BytesRef docRoutingHash(int docID) throws IOException {
+ if (routingHashDocValues == null || routingHashDocValues.docID() > docID) {
+ routingHashDocValues = docValuesProducer.getSorted(routingHashFieldInfo);
+ }
+ boolean found = routingHashDocValues.advanceExact(docID);
+ assert found : "No value found for field [" + routingHashFieldInfo.getName() + " and docID " + docID;
+ return routingHashDocValues.lookupOrd(routingHashDocValues.ordValue());
+ }
+
+ /**
+ * Lookup if a given _tsid exists, returning a positive ordinal if it exists otherwise it returns -insertionPoint-1.
+ *
+ * @param tsId the _tsid to look up
+ * @return a positive ordinal if the _tsid exists, else returns -insertionPoint-1.
+ * @throws IOException if any I/O exception occurs
+ */
+ private int lookupTsIdTerm(BytesRef tsId) throws IOException {
+ int compare = Integer.MAX_VALUE;
+ if (cachedTsId != null) {
+ compare = cachedTsId.compareTo(tsId);
+ if (compare == 0) {
+ return cachedTsIdOrd;
+ }
+ }
+ if (tsIdDocValues == null || compare > 0) {
+ tsIdDocValues = docValuesProducer.getSorted(tsIdFieldInfo);
+ cachedTsIdOrd = -1;
+ cachedTsId = null;
+ }
+ int ordinal = tsIdDocValues.lookupTerm(tsId);
+ if (0 <= ordinal) {
+ cachedTsIdOrd = ordinal;
+ cachedTsId = tsId;
+ }
+ return ordinal;
+ }
+
+ /**
+ * Lookup the _tsid value for the given ordinal.
+ *
+ * @param tsIdOrdinal the _tsid ordinal
+ * @return the _tsid value
+ * @throws IOException if any I/O exception occurs
+ */
+ private BytesRef lookupTsIdOrd(int tsIdOrdinal) throws IOException {
+ if (cachedTsIdOrd != -1 && cachedTsIdOrd == tsIdOrdinal) {
+ return cachedTsId;
+ }
+ if (tsIdDocValues == null || tsIdDocValues.ordValue() > tsIdOrdinal) {
+ tsIdDocValues = docValuesProducer.getSorted(tsIdFieldInfo);
+ cachedTsIdOrd = -1;
+ cachedTsId = null;
+ }
+ assert 0 <= tsIdOrdinal : tsIdOrdinal;
+ assert tsIdOrdinal < tsIdDocValues.getValueCount() : tsIdOrdinal;
+ var tsId = tsIdDocValues.lookupOrd(tsIdOrdinal);
+ if (tsId != null) {
+ cachedTsIdOrd = tsIdOrdinal;
+ cachedTsId = tsId;
+ }
+ return tsId;
+ }
+
+ /**
+ * Scan all documents to find the first document that has a _tsid equal or greater than the provided _tsid ordinal, returning its
+ * document ID. If no document is found, the method returns {@link DocIdSetIterator#NO_MORE_DOCS}.
+ *
+ * Warning: This method is very slow because it potentially scans all documents in the segment.
+ */
+ private int slowScanToFirstDocWithTsIdOrdinalEqualOrGreaterThan(int tsIdOrd) throws IOException {
+ // recreate even if doc values are already on the same ordinal, to ensure the method returns the first doc
+ if (tsIdDocValues == null || (cachedTsIdOrd != -1 && cachedTsIdOrd >= tsIdOrd)) {
+ tsIdDocValues = docValuesProducer.getSorted(tsIdFieldInfo);
+ cachedTsIdOrd = -1;
+ cachedTsId = null;
+ }
+ assert 0 <= tsIdOrd : tsIdOrd;
+ assert tsIdOrd < tsIdDocValues.getValueCount() : tsIdOrd;
+
+ for (int docID = 0; docID != DocIdSetIterator.NO_MORE_DOCS; docID = tsIdDocValues.nextDoc()) {
+ boolean found = tsIdDocValues.advanceExact(docID);
+ assert found : "No value found for field [" + tsIdFieldInfo.getName() + " and docID " + docID;
+ var ord = tsIdDocValues.ordValue();
+ if (ord == tsIdOrd || tsIdOrd < ord) {
+ if (ord != cachedTsIdOrd) {
+ cachedTsId = tsIdDocValues.lookupOrd(ord);
+ cachedTsIdOrd = ord;
+ }
+ return docID;
+ }
+ }
+ cachedTsIdOrd = -1;
+ cachedTsId = null;
+ return DocIdSetIterator.NO_MORE_DOCS;
+ }
+
+ /**
+ * Scan all documents to find the first document that has a _tsid equal to the provided _tsid ordinal, returning its
+ * document ID. If no document is found, the method returns {@link DocIdSetIterator#NO_MORE_DOCS}.
+ *
+ * Warning: This method is very slow because it potentially scans all documents in the segment.
+ */
+ private int slowScanToFirstDocWithTsIdOrdinalEqualTo(int tsIdOrd) throws IOException {
+ // recreate even if doc values are already on the same ordinal, to ensure the method returns the first doc
+ if (tsIdDocValues == null || (cachedTsIdOrd != -1 && cachedTsIdOrd >= tsIdOrd)) {
+ tsIdDocValues = docValuesProducer.getSorted(tsIdFieldInfo);
+ cachedTsIdOrd = -1;
+ cachedTsId = null;
+ }
+ assert 0 <= tsIdOrd : tsIdOrd;
+ assert tsIdOrd < tsIdDocValues.getValueCount() : tsIdOrd;
+
+ for (int docID = 0; docID != DocIdSetIterator.NO_MORE_DOCS; docID = tsIdDocValues.nextDoc()) {
+ boolean found = tsIdDocValues.advanceExact(docID);
+ assert found : "No value found for field [" + tsIdFieldInfo.getName() + " and docID " + docID;
+ var ord = tsIdDocValues.ordValue();
+ if (ord == tsIdOrd) {
+ if (ord != cachedTsIdOrd) {
+ cachedTsId = tsIdDocValues.lookupOrd(ord);
+ cachedTsIdOrd = ord;
+ }
+ return docID;
+ } else if (tsIdOrd < ord) {
+ break;
+ }
+ }
+ cachedTsIdOrd = -1;
+ cachedTsId = null;
+ assert false : "Method must be called with an existing _tsid ordinal: " + tsIdOrd;
+ return DocIdSetIterator.NO_MORE_DOCS;
+ }
+
+ private int getTsIdValueCount() throws IOException {
+ if (tsIdDocValues == null) {
+ tsIdDocValues = docValuesProducer.getSorted(tsIdFieldInfo);
+ }
+ return tsIdDocValues.getValueCount();
+ }
+ }
- private BytesRef latestTsId = null;
- private long latestTimestamp = -1L;
+ /**
+ * Represents the synthetic term the {@link TermsEnum} or {@link PostingsEnum} is positioned on. It points to a given docID and its
+ * corresponding _tsid, @timestamp and _ts_routing_hash values. The {@link #term()} method returns the synthetic _id of the document.
+ */
+ private record SyntheticTerm(int docID, int tsIdOrd, BytesRef tsId, long timestamp, BytesRef routingHash) {
+ private BytesRef term() {
+ assert docID >= 0 : docID;
+ assert tsIdOrd >= 0 : tsIdOrd;
+ return syntheticId(tsId, timestamp, routingHash);
+ }
+ }
- private FakeTermsEnum() {}
+ /**
+ * When returned by next(), seekCeil(), nextDoc() and docID() it means there are no more synthetic terms in the {@link TermsEnum}
+ * or {@link PostingsEnum}.
+ */
+ private static final SyntheticTerm NO_MORE_DOCS = new SyntheticTerm(DocIdSetIterator.NO_MORE_DOCS, -1, null, -1L, null);
+
+ /**
+ * {@link TermsEnum} to iterate over documents synthetic _ids.
+ */
+ private class SyntheticIdTermsEnum extends BaseTermsEnum {
+
+ /**
+ * Holds all doc values that composed the synthetic _id
+ */
+ private final DocValuesHolder docValues;
+
+ /**
+ * Current synthetic term the enum is positioned on. It points to 1 document.
+ */
+ private SyntheticTerm current;
+
+ private SyntheticIdTermsEnum() {
+ this.docValues = new DocValuesHolder(fieldInfos, docValuesProducer);
+ this.current = null;
+ }
+
+ private void ensurePositioned() {
+ if (current == null || current == NO_MORE_DOCS) {
+ assert false;
+ throw new IllegalStateException("Method should not be called when unpositioned");
+ }
+ }
@Override
public BytesRef next() throws IOException {
- if (docID == DocIdSetIterator.NO_MORE_DOCS) {
- assert term == null;
+ if (current == NO_MORE_DOCS) {
return null;
}
- docID += 1;
+
+ int docID = (current != null) ? current.docID + 1 : 0;
if (maxDocs <= docID) {
- docID = DocIdSetIterator.NO_MORE_DOCS;
- latestTimestamp = -1L;
- latestTsId = null;
- term = null;
+ current = NO_MORE_DOCS;
return null;
}
-
- // Retrieve _tsid
- SortedDocValues tsIdDocValues = docValuesProducer.getSorted(fieldInfos.fieldInfo(TS_ID));
- boolean found = tsIdDocValues.advanceExact(docID);
- assert found;
- int tsIdOrd = tsIdDocValues.ordValue();
- BytesRef tsId = tsIdDocValues.lookupOrd(tsIdOrd);
- assert tsId != null;
-
- // Retrieve timestamp
- SortedNumericDocValues timestampDocValues = docValuesProducer.getSortedNumeric(fieldInfos.fieldInfo(TIMESTAMP));
- found = timestampDocValues.advanceExact(docID);
- assert found;
- assert timestampDocValues.docValueCount() == 1;
- long timestamp = timestampDocValues.nextValue();
-
- // Retrieve routing hash
- var tsRoutingHash = fieldInfos.fieldInfo(TimeSeriesRoutingHashFieldMapper.NAME);
- assert tsRoutingHash != null;
- SortedDocValues routingHashDocValues = docValuesProducer.getSorted(tsRoutingHash);
- found = routingHashDocValues.advanceExact(docID);
- assert found;
- BytesRef routingHashBytes = routingHashDocValues.lookupOrd(routingHashDocValues.ordValue());
-
- int routingHash = TimeSeriesRoutingHashFieldMapper.decode(
- Uid.decodeId(routingHashBytes.bytes, routingHashBytes.offset, routingHashBytes.length)
+ int tsIdOrdinal = docValues.docTsIdOrdinal(docID);
+ current = new SyntheticTerm(
+ docID,
+ tsIdOrdinal,
+ docValues.lookupTsIdOrd(tsIdOrdinal),
+ docValues.docTimestamp(docID),
+ docValues.docRoutingHash(docID)
);
- term = Uid.encodeId(TsidExtractingIdFieldMapper.createId(routingHash, tsId, timestamp));
- latestTimestamp = timestamp;
- latestTsId = tsId;
- return term;
+ return current.term();
}
@Override
- public SeekStatus seekCeil(BytesRef id) {
+ public SeekStatus seekCeil(BytesRef id) throws IOException {
+
assert id != null;
- if (term != null && term.equals(id)) {
- return SeekStatus.FOUND;
+ assert Long.BYTES + Integer.BYTES < id.length : id.length;
+ if (id == null || id.length <= Long.BYTES + Integer.BYTES) {
+ return SeekStatus.NOT_FOUND;
}
- try {
- while (next() != null) {
- if (term.equals(id)) {
- return SeekStatus.FOUND;
+
+ // Extract the _tsid
+ final BytesRef tsId = TsidExtractingIdFieldMapper.extractTimeSeriesIdFromSyntheticId(id);
+ int tsIdOrd = docValues.lookupTsIdTerm(tsId);
+
+ // _tsid not found
+ if (tsIdOrd < 0) {
+ tsIdOrd = -tsIdOrd - 1;
+ // set the terms enum on the first non-matching document
+ if (tsIdOrd < docValues.getTsIdValueCount()) {
+ int docID = docValues.slowScanToFirstDocWithTsIdOrdinalEqualOrGreaterThan(tsIdOrd);
+ if (docID != DocIdSetIterator.NO_MORE_DOCS) {
+ current = new SyntheticTerm(
+ docID,
+ tsIdOrd,
+ docValues.lookupTsIdOrd(tsIdOrd),
+ docValues.docTimestamp(docID),
+ docValues.docRoutingHash(docID)
+ );
+ return SeekStatus.NOT_FOUND;
}
}
- } catch (IOException e) {
- throw new UncheckedIOException(e);
+ // no docs/terms to iterate on
+ current = NO_MORE_DOCS;
+ return SeekStatus.END;
+ }
+
+ // _tsid found, extract the timestamp
+ final long timestamp = TsidExtractingIdFieldMapper.extractTimestampFromSyntheticId(id);
+
+ // Slow scan to the first document matching the _tsid
+ final int startDocID = docValues.slowScanToFirstDocWithTsIdOrdinalEqualTo(tsIdOrd);
+ assert startDocID >= 0 : startDocID;
+
+ int docID = startDocID;
+ int docTsIdOrd = tsIdOrd;
+ long docTimestamp;
+
+ // Iterate over documents to find the first one matching the timestamp
+ for (; docID < maxDocs; docID++) {
+ docTimestamp = docValues.docTimestamp(docID);
+ if (startDocID < docID) {
+ // After the first doc, we need to check again if _tsid matches
+ docTsIdOrd = docValues.docTsIdOrdinal(docID);
+ }
+ if (docTsIdOrd == tsIdOrd && docTimestamp == timestamp) {
+ // It's a match!
+ current = new SyntheticTerm(docID, tsIdOrd, tsId, docTimestamp, docValues.docRoutingHash(docID));
+ return SeekStatus.FOUND;
+ }
+ // Remaining docs don't match, stop here
+ if (tsIdOrd < docTsIdOrd || docTimestamp < timestamp) {
+ break;
+ }
}
+ current = NO_MORE_DOCS;
return SeekStatus.END;
}
@Override
public BytesRef term() {
- return term;
+ ensurePositioned();
+ return current.term();
}
@Override
public PostingsEnum postings(PostingsEnum reuse, int flags) {
- return new FakePostingsEnum(docID, latestTsId, latestTimestamp, maxDocs);
+ ensurePositioned();
+ return new SyntheticIdPostingsEnum(current);
}
/**
@@ -258,23 +536,19 @@ public ImpactsEnum impacts(int flags) throws IOException {
}
}
- /**
- * Do not use in production. See {@link FakeTermsEnum}.
- */
- private class FakePostingsEnum extends PostingsEnum {
+ private class SyntheticIdPostingsEnum extends PostingsEnum {
- private final int startDocID;
- private final BytesRef latestTsId;
- private final long latestTimestamp;
- private final int maxDocs;
- private int docID;
+ private final DocValuesHolder docValues;
- private FakePostingsEnum(int docID, BytesRef latestTsId, long latestTimestamp, int maxDocs) {
- this.startDocID = docID;
- this.latestTsId = latestTsId;
- this.latestTimestamp = latestTimestamp;
- this.maxDocs = maxDocs;
- this.docID = -1;
+ /**
+ * Current synthetic term the postings is pinned on.
+ */
+ private final SyntheticTerm term;
+ private int docID = -1;
+
+ private SyntheticIdPostingsEnum(SyntheticTerm term) {
+ this.docValues = new DocValuesHolder(fieldInfos, docValuesProducer);
+ this.term = Objects.requireNonNull(term);
}
@Override
@@ -286,61 +560,27 @@ public int docID() {
public int nextDoc() throws IOException {
if (docID == DocIdSetIterator.NO_MORE_DOCS) {
return docID;
- } else if (docID == -1) {
- docID = startDocID;
- } else {
- docID = docID + 1;
- if (maxDocs <= docID) {
- docID = DocIdSetIterator.NO_MORE_DOCS;
- return docID;
- }
}
-
- // Retrieve _tsid
- SortedDocValues tsIdDocValues = docValuesProducer.getSorted(fieldInfos.fieldInfo(TS_ID));
- boolean found = tsIdDocValues.advanceExact(docID);
- assert found;
- int tsIdOrd = tsIdDocValues.ordValue();
- BytesRef tsId = tsIdDocValues.lookupOrd(tsIdOrd);
- assert tsId != null;
-
- if (latestTsId != null && latestTsId.equals(tsId) == false) {
- // Different _tsid, stop here
- docID = DocIdSetIterator.NO_MORE_DOCS;
- return docID;
- }
-
- // Retrieve timestamp
- SortedNumericDocValues timestampDocValues = docValuesProducer.getSortedNumeric(fieldInfos.fieldInfo(TIMESTAMP));
- found = timestampDocValues.advanceExact(docID);
- assert found;
- assert timestampDocValues.docValueCount() == 1;
- long timestamp = timestampDocValues.nextValue();
-
- if (latestTimestamp != -1L && latestTimestamp != timestamp) {
- // Different @timestamp, stop here
- docID = DocIdSetIterator.NO_MORE_DOCS;
- return docID;
+ int nextDocID = (docID == -1) ? term.docID() : docID + 1;
+ if (nextDocID < maxDocs) {
+ int tsIdOrd = docValues.docTsIdOrdinal(nextDocID);
+ if (tsIdOrd == term.tsIdOrd()) {
+ long timestamp = docValues.docTimestamp(nextDocID);
+ if (timestamp == term.timestamp()) {
+ assert Objects.equals(docValues.docRoutingHash(nextDocID), term.routingHash());
+ assert Objects.equals(docValues.lookupTsIdOrd(tsIdOrd), term.tsId());
+ docID = nextDocID;
+ return docID;
+ }
+ }
}
-
- // Retrieve routing hash
- var tsRoutingHash = fieldInfos.fieldInfo(TimeSeriesRoutingHashFieldMapper.NAME);
- assert tsRoutingHash != null;
- SortedDocValues routingHashDocValues = docValuesProducer.getSorted(tsRoutingHash);
- found = routingHashDocValues.advanceExact(docID);
- assert found;
- BytesRef routingHashBytes = routingHashDocValues.lookupOrd(routingHashDocValues.ordValue());
- assert routingHashBytes != null;
+ docID = DocIdSetIterator.NO_MORE_DOCS;
return docID;
}
@Override
public int advance(int target) throws IOException {
- int doc;
- while ((doc = nextDoc()) < target) {
- // Continue
- }
- return doc;
+ return slowAdvance(target);
}
@Override
@@ -374,6 +614,15 @@ public BytesRef getPayload() throws IOException {
}
}
+ private static BytesRef syntheticId(BytesRef tsId, long timestamp, BytesRef routingHashBytes) {
+ assert tsId != null;
+ assert timestamp > 0L;
+ assert routingHashBytes != null;
+ String routingHashString = Uid.decodeId(routingHashBytes.bytes, routingHashBytes.offset, routingHashBytes.length);
+ int routingHash = TimeSeriesRoutingHashFieldMapper.decode(routingHashString);
+ return TsidExtractingIdFieldMapper.createSyntheticIdBytesRef(tsId, timestamp, routingHash);
+ }
+
private static boolean assertFieldInfosExist(FieldInfos fieldInfos, String... fieldNames) {
assert fieldNames != null && fieldNames.length > 0 : "fieldNames should be > 0";
for (var fieldName : fieldNames) {
diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBSyntheticIdPostingsFormat.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBSyntheticIdPostingsFormat.java
index cfe9975f33a1b..66a6aa7151c6b 100644
--- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBSyntheticIdPostingsFormat.java
+++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/TSDBSyntheticIdPostingsFormat.java
@@ -19,6 +19,7 @@
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
import org.elasticsearch.index.mapper.SyntheticIdField;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
+import org.elasticsearch.index.mapper.TimeSeriesRoutingHashFieldMapper;
import java.io.IOException;
@@ -27,6 +28,7 @@ public class TSDBSyntheticIdPostingsFormat extends PostingsFormat {
public static final String SYNTHETIC_ID = SyntheticIdField.NAME;
public static final String TIMESTAMP = DataStreamTimestampFieldMapper.DEFAULT_PATH;
public static final String TS_ID = TimeSeriesIdFieldMapper.NAME;
+ public static final String TS_ROUTING_HASH = TimeSeriesRoutingHashFieldMapper.NAME;
static final String FORMAT_NAME = "TSDBSyntheticId";
static final String SUFFIX = "0";
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 47e9ab7803a84..d9bf37717edc5 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -245,7 +245,7 @@ public InternalEngine(EngineConfig engineConfig) {
InternalEngine(EngineConfig engineConfig, int maxDocs, BiFunction localCheckpointTrackerSupplier) {
super(engineConfig);
this.maxDocs = maxDocs;
- if (engineConfig.getIndexSettings().useTsdbSyntheticId()) {
+ if (engineConfig.getIndexSettings().useTimeSeriesSyntheticId()) {
logger.info("using TSDB with synthetic id");
useTsdbSyntheticId = true;
} else {
@@ -1067,7 +1067,13 @@ private VersionValue resolveDocVersion(final Operation op, boolean loadSeqNo) th
directoryReader -> {
if (engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES) {
assert engineConfig.getLeafSorter() == DataStream.TIMESERIES_LEAF_READERS_SORTER;
- return VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, op.uid(), op.id(), loadSeqNo);
+ return VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(
+ directoryReader,
+ op.uid(),
+ op.id(),
+ loadSeqNo,
+ useTsdbSyntheticId
+ );
} else {
return VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, op.uid(), loadSeqNo);
}
@@ -1442,7 +1448,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws I
index.parsedDoc().updateSeqID(index.seqNo(), index.primaryTerm());
index.parsedDoc().version().setLongValue(plan.versionForIndexing);
try {
- logDocumentsDetails(index.docs());
+ logDocumentsDetails(index.docs(), index.id(), index.uid());
if (plan.addStaleOpToLucene) {
addStaleDocs(index.docs(), indexWriter);
} else if (plan.useLuceneUpdateDocument) {
@@ -1478,10 +1484,10 @@ && treatDocumentFailureAsTragicError(index) == false) {
}
}
- private void logDocumentsDetails(List docs) {
+ private void logDocumentsDetails(List docs, String id, BytesRef uid) {
if (useTsdbSyntheticId && logger.isTraceEnabled()) {
for (var doc : docs) {
- logger.trace("indexing document fields [{}]", doc.getFields());
+ logger.trace("indexing document [id: {}, uid: {}]:\n{}\r\n", id, uid, doc.getFields());
}
}
}
@@ -1859,8 +1865,10 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws
try {
final ParsedDocument tombstone = ParsedDocument.deleteTombstone(
engineConfig.getIndexSettings().seqNoIndexOptions(),
+ engineConfig.getIndexSettings().useDocValuesSkipper(),
useTsdbSyntheticId,
- delete.id()
+ delete.id(),
+ delete.uid()
);
assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
tombstone.updateSeqID(delete.seqNo(), delete.primaryTerm());
@@ -1869,6 +1877,7 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null
: "Delete tombstone document but _tombstone field is not set [" + doc + " ]";
doc.add(softDeletesField);
+ logDocumentsDetails(List.of(doc), delete.id(), delete.uid());
if (plan.addStaleOpToLucene || plan.currentlyDeleted) {
indexWriter.addDocument(doc);
} else {
@@ -2806,7 +2815,7 @@ private IndexWriterConfig getIndexWriterConfig() {
new SoftDeletesRetentionMergePolicy(
Lucene.SOFT_DELETES_FIELD,
() -> softDeletesPolicy.getRetentionQuery(engineConfig.getIndexSettings().seqNoIndexOptions()),
- new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME)
+ useTsdbSyntheticId ? mergePolicy : new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME)
)
);
if (SHUFFLE_FORCE_MERGE) {
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/IdLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/IdLoader.java
index 9ceae1c750733..30407f8b4645f 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/IdLoader.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/IdLoader.java
@@ -38,8 +38,12 @@ static IdLoader fromLeafStoredFieldLoader() {
/**
* @return returns an {@link IdLoader} instance that syn synthesizes _id from routing, _tsid and @timestamp fields.
*/
- static IdLoader createTsIdLoader(IndexRouting.ExtractFromSource.ForRoutingPath indexRouting, List routingPaths) {
- return new TsIdLoader(indexRouting, routingPaths);
+ static IdLoader createTsIdLoader(
+ IndexRouting.ExtractFromSource.ForRoutingPath indexRouting,
+ List routingPaths,
+ boolean useSyntheticId
+ ) {
+ return new TsIdLoader(indexRouting, routingPaths, useSyntheticId);
}
Leaf leaf(LeafStoredFieldLoader loader, LeafReader reader, int[] docIdsInLeaf) throws IOException;
@@ -61,10 +65,12 @@ final class TsIdLoader implements IdLoader {
private final IndexRouting.ExtractFromSource.ForRoutingPath indexRouting;
private final List routingPaths;
+ private final boolean useSyntheticId;
- TsIdLoader(IndexRouting.ExtractFromSource.ForRoutingPath indexRouting, List routingPaths) {
+ TsIdLoader(IndexRouting.ExtractFromSource.ForRoutingPath indexRouting, List routingPaths, boolean useSyntheticId) {
this.routingPaths = routingPaths;
this.indexRouting = indexRouting;
+ this.useSyntheticId = useSyntheticId;
}
public IdLoader.Leaf leaf(LeafStoredFieldLoader loader, LeafReader reader, int[] docIdsInLeaf) throws IOException {
@@ -119,7 +125,11 @@ public IdLoader.Leaf leaf(LeafStoredFieldLoader loader, LeafReader reader, int[]
int routingHash = TimeSeriesRoutingHashFieldMapper.decode(
Uid.decodeId(routingHashBytes.bytes, routingHashBytes.offset, routingHashBytes.length)
);
- ids[i] = TsidExtractingIdFieldMapper.createId(routingHash, tsid, timestamp);
+ if (useSyntheticId) {
+ ids[i] = TsidExtractingIdFieldMapper.createSyntheticId(tsid, timestamp, routingHash);
+ } else {
+ ids[i] = TsidExtractingIdFieldMapper.createId(routingHash, tsid, timestamp);
+ }
}
}
return new TsIdLeaf(docIdsInLeaf, ids);
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java b/server/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java
index 61b26ca33b1ef..ef56ad180e32f 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java
@@ -10,6 +10,9 @@
package org.elasticsearch.index.mapper;
import org.apache.lucene.document.Field;
+import org.apache.lucene.document.LongField;
+import org.apache.lucene.document.SortedDocValuesField;
+import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesArray;
@@ -73,7 +76,7 @@ public static ParsedDocument noopTombstone(SeqNoFieldMapper.SeqNoIndexOptions se
* @param id the id of the deleted document
*/
public static ParsedDocument deleteTombstone(SeqNoFieldMapper.SeqNoIndexOptions seqNoIndexOptions, String id) {
- return deleteTombstone(seqNoIndexOptions, false, id);
+ return deleteTombstone(seqNoIndexOptions, false /* ignored */, false, id, null /* ignored */);
}
/**
@@ -82,7 +85,13 @@ public static ParsedDocument deleteTombstone(SeqNoFieldMapper.SeqNoIndexOptions
* @param useSyntheticId whether the id is synthetic or not
* @param id the id of the deleted document
*/
- public static ParsedDocument deleteTombstone(SeqNoFieldMapper.SeqNoIndexOptions seqNoIndexOptions, boolean useSyntheticId, String id) {
+ public static ParsedDocument deleteTombstone(
+ SeqNoFieldMapper.SeqNoIndexOptions seqNoIndexOptions,
+ boolean useDocValuesSkipper,
+ boolean useSyntheticId,
+ String id,
+ BytesRef uid
+ ) {
LuceneDocument document = new LuceneDocument();
SeqNoFieldMapper.SequenceIDFields seqIdFields = SeqNoFieldMapper.SequenceIDFields.tombstone(seqNoIndexOptions);
seqIdFields.addFields(document);
@@ -91,7 +100,21 @@ public static ParsedDocument deleteTombstone(SeqNoFieldMapper.SeqNoIndexOptions
if (useSyntheticId) {
// Use a synthetic _id field which is not indexed nor stored
document.add(IdFieldMapper.syntheticIdField(id));
- // TODO I think we also need to add the fields that compose the synthetic _id.
+
+ var timeSeriesId = TsidExtractingIdFieldMapper.extractTimeSeriesIdFromSyntheticId(uid);
+ var timestamp = TsidExtractingIdFieldMapper.extractTimestampFromSyntheticId(uid);
+ var routingHash = TsidExtractingIdFieldMapper.extractRoutingHashBytesFromSyntheticId(uid);
+
+ if (useDocValuesSkipper) {
+ document.add(SortedDocValuesField.indexedField(TimeSeriesIdFieldMapper.NAME, timeSeriesId));
+ document.add(SortedNumericDocValuesField.indexedField("@timestamp", timestamp));
+ } else {
+ document.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, timeSeriesId));
+ document.add(new LongField("@timestamp", timestamp, Field.Store.NO));
+ }
+ var field = new SortedDocValuesField(TimeSeriesRoutingHashFieldMapper.NAME, routingHash);
+ document.add(field);
+
} else {
// Use standard _id field (indexed and stored, some indices also trim the stored field at some point)
document.add(IdFieldMapper.standardIdField(id));
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/TsidExtractingIdFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/TsidExtractingIdFieldMapper.java
index 1bb7001b29890..f5cbfb5cd71cb 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/TsidExtractingIdFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/TsidExtractingIdFieldMapper.java
@@ -67,7 +67,11 @@ public static BytesRef createField(DocumentParserContext context, RoutingHashBui
|| id.equals(indexRouting.createId(context.sourceToParse().getXContentType(), context.sourceToParse().source(), suffix));
} else if (context.sourceToParse().routing() != null) {
int routingHash = TimeSeriesRoutingHashFieldMapper.decode(context.sourceToParse().routing());
- id = createId(routingHash, tsid, timestamp);
+ if (context.indexSettings().useTimeSeriesSyntheticId()) {
+ id = createSyntheticId(tsid, timestamp, routingHash);
+ } else {
+ id = createId(routingHash, tsid, timestamp);
+ }
} else {
if (context.sourceToParse().id() == null) {
throw new IllegalArgumentException(
@@ -94,7 +98,7 @@ public static BytesRef createField(DocumentParserContext context, RoutingHashBui
context.id(id);
final Field idField;
- if (context.indexSettings().useTsdbSyntheticId()) {
+ if (context.indexSettings().useTimeSeriesSyntheticId()) {
idField = syntheticIdField(context.id());
} else {
idField = standardIdField(context.id());
@@ -118,6 +122,13 @@ public static String createId(int routingHash, BytesRef tsid, long timestamp) {
return Strings.BASE_64_NO_PADDING_URL_ENCODER.encodeToString(bytes);
}
+ public static long extractTimestampFromId(byte[] id) {
+ assert id.length == 20;
+ // id format: [4 bytes (basic hash routing fields), 8 bytes prefix of 128 murmurhash dimension fields, 8 bytes
+ // @timestamp)
+ return ByteUtils.readLongBE(id, 12);
+ }
+
public static String createId(
boolean dynamicMappersExists,
RoutingHashBuilder routingBuilder,
@@ -141,6 +152,57 @@ public static String createId(
return id;
}
+ public static BytesRef createSyntheticIdBytesRef(BytesRef tsid, long timestamp, int routingHash) {
+ // A synthetic _id has the format: [_tsid (non-fixed length) + (Long.MAX_VALUE - timestamp) (8 bytes) + routing hash (4 bytes)].
+ // We dont' use hashing here because we need to be able to extract the concatenated values from the _id in various places, like
+ // when applying doc values updates in Lucene, or when routing GET or DELETE requests to the corresponding shard, or when replaying
+ // translog operations. Since the synthetic _id is not indexed and not really stored on disk we consider it fine if it is longer
+ // that standard ids.
+ //
+ // Also, when applying doc values updates Lucene expects _id to be sorted: it stops applying updates for a term "_id:ABC" if it
+ // seeks to a term "BCD" as it knows there won't be more documents matching "_id:ABC" past the term "BCD". So it is important to
+ // generate an _id as a byte array whose lexicographical order reflects the order of the documents in the segment. For this reason,
+ // the timestamp is stored in the synthetic _id as (Long.MAX_VALUE - timestamp).
+ byte[] bytes = new byte[tsid.length + Long.BYTES + Integer.BYTES];
+ System.arraycopy(tsid.bytes, tsid.offset, bytes, 0, tsid.length);
+ ByteUtils.writeLongBE(Long.MAX_VALUE - timestamp, bytes, tsid.length); // Big Endian as we want to most significant byte first
+ ByteUtils.writeIntBE(routingHash, bytes, tsid.length + Long.BYTES);
+ return new BytesRef(bytes);
+ }
+
+ public static String createSyntheticId(BytesRef tsid, long timestamp, int routingHash) {
+ BytesRef id = createSyntheticIdBytesRef(tsid, timestamp, routingHash);
+ return Strings.BASE_64_NO_PADDING_URL_ENCODER.encodeToString(id.bytes);
+ }
+
+ public static BytesRef extractTimeSeriesIdFromSyntheticId(BytesRef id) {
+ assert id.length > Long.BYTES + Integer.BYTES;
+ // See #createSyntheticId
+ byte[] tsId = new byte[Math.toIntExact(id.length - Long.BYTES - Integer.BYTES)];
+ System.arraycopy(id.bytes, id.offset, tsId, 0, tsId.length);
+ return new BytesRef(tsId);
+ }
+
+ public static long extractTimestampFromSyntheticId(BytesRef id) {
+ assert id.length > Long.BYTES + Integer.BYTES;
+ // See #createSyntheticId
+ long delta = ByteUtils.readLongBE(id.bytes, id.offset + id.length - Long.BYTES - Integer.BYTES);
+ long timestamp = Long.MAX_VALUE - delta;
+ assert timestamp >= 0 : delta;
+ return timestamp;
+ }
+
+ public static int extractRoutingHashFromSyntheticId(BytesRef id) {
+ assert id.length > Long.BYTES + Integer.BYTES;
+ // See #createSyntheticId
+ return ByteUtils.readIntBE(id.bytes, id.offset + id.length - Integer.BYTES);
+ }
+
+ public static BytesRef extractRoutingHashBytesFromSyntheticId(BytesRef id) {
+ int hash = extractRoutingHashFromSyntheticId(id);
+ return Uid.encodeId(TimeSeriesRoutingHashFieldMapper.encode(hash));
+ }
+
@Override
public String documentDescription(DocumentParserContext context) {
/*
diff --git a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java
index 475d2d1887563..0b94b3be3650f 100644
--- a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java
+++ b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java
@@ -971,7 +971,7 @@ public IdLoader newIdLoader() {
}
}
}
- return IdLoader.createTsIdLoader(indexRouting, routingPaths);
+ return IdLoader.createTsIdLoader(indexRouting, routingPaths, indexService.getIndexSettings().useTimeSeriesSyntheticId());
} else {
return IdLoader.fromLeafStoredFieldLoader();
}
diff --git a/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java b/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java
index 2e69987f29180..59c8a2028d98a 100644
--- a/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java
+++ b/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java
@@ -159,7 +159,7 @@ public void testLoadTimestampRangeWithDeleteTombstone() throws Exception {
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setMergePolicy(NoMergePolicy.INSTANCE));
var randomSeqNoIndexOptions = randomFrom(SeqNoFieldMapper.SeqNoIndexOptions.values());
- writer.addDocument(ParsedDocument.deleteTombstone(randomSeqNoIndexOptions, false, "_id").docs().get(0));
+ writer.addDocument(ParsedDocument.deleteTombstone(randomSeqNoIndexOptions, "_id").docs().get(0));
DirectoryReader reader = DirectoryReader.open(writer);
LeafReaderContext segment = reader.leaves().get(0);
PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader(), true);
diff --git a/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java b/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java
index 59c82195b7fce..141b865a8395b 100644
--- a/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java
+++ b/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java
@@ -200,7 +200,7 @@ public void testTimeSeriesLoadDocIdAndVersion() throws Exception {
DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "_na_", 1));
String id = createTSDBId(1000L);
assertThat(
- VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, new BytesRef("1"), id, randomBoolean()),
+ VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, new BytesRef("1"), id, randomBoolean(), false),
nullValue()
);
@@ -222,11 +222,23 @@ public void testTimeSeriesLoadDocIdAndVersion() throws Exception {
directoryReader = reopen(directoryReader);
id = createTSDBId(randomLongBetween(1000, 10000));
- assertThat(VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, new BytesRef("1"), id, true), notNullValue());
- assertThat(VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, new BytesRef("2"), id, true), notNullValue());
+ assertThat(
+ VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, new BytesRef("1"), id, true, false),
+ notNullValue()
+ );
+ assertThat(
+ VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, new BytesRef("2"), id, true, false),
+ notNullValue()
+ );
id = createTSDBId(randomBoolean() ? randomLongBetween(0, 999) : randomLongBetween(10001, Long.MAX_VALUE));
- assertThat(VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, new BytesRef("1"), id, true), nullValue());
- assertThat(VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, new BytesRef("2"), id, true), nullValue());
+ assertThat(
+ VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, new BytesRef("1"), id, true, false),
+ nullValue()
+ );
+ assertThat(
+ VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, new BytesRef("2"), id, true, false),
+ nullValue()
+ );
directoryReader.close();
writer.close();
diff --git a/server/src/test/java/org/elasticsearch/index/mapper/IdLoaderTests.java b/server/src/test/java/org/elasticsearch/index/mapper/IdLoaderTests.java
index 083efccceec16..bf595392108c3 100644
--- a/server/src/test/java/org/elasticsearch/index/mapper/IdLoaderTests.java
+++ b/server/src/test/java/org/elasticsearch/index/mapper/IdLoaderTests.java
@@ -46,7 +46,8 @@ public class IdLoaderTests extends ESTestCase {
private final int routingHash = randomInt();
public void testSynthesizeIdSimple() throws Exception {
- var idLoader = IdLoader.createTsIdLoader(null, null);
+ final boolean useSyntheticIds = randomBoolean();
+ var idLoader = IdLoader.createTsIdLoader(null, null, useSyntheticIds);
long startTime = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2023-01-01T00:00:00Z");
List docs = List.of(
@@ -60,15 +61,16 @@ public void testSynthesizeIdSimple() throws Exception {
assertThat(leafReader.numDocs(), equalTo(3));
var leaf = idLoader.leaf(null, leafReader, new int[] { 0, 1, 2 });
// NOTE: time series data is ordered by (tsid, timestamp)
- assertThat(leaf.getId(0), equalTo(expectedId(docs.get(2), routingHash)));
- assertThat(leaf.getId(1), equalTo(expectedId(docs.get(0), routingHash)));
- assertThat(leaf.getId(2), equalTo(expectedId(docs.get(1), routingHash)));
+ assertThat(leaf.getId(0), equalTo(expectedId(docs.get(2), routingHash, useSyntheticIds)));
+ assertThat(leaf.getId(1), equalTo(expectedId(docs.get(0), routingHash, useSyntheticIds)));
+ assertThat(leaf.getId(2), equalTo(expectedId(docs.get(1), routingHash, useSyntheticIds)));
};
prepareIndexReader(indexAndForceMerge(docs, routingHash), verify, false);
}
public void testSynthesizeIdMultipleSegments() throws Exception {
- var idLoader = IdLoader.createTsIdLoader(null, null);
+ final boolean useSyntheticIds = randomBoolean();
+ var idLoader = IdLoader.createTsIdLoader(null, null, useSyntheticIds);
long startTime = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2023-01-01T00:00:00Z");
List docs1 = List.of(
@@ -110,22 +112,22 @@ public void testSynthesizeIdMultipleSegments() throws Exception {
assertThat(leafReader.numDocs(), equalTo(docs1.size()));
var leaf = idLoader.leaf(null, leafReader, IntStream.range(0, docs1.size()).toArray());
for (int i = 0; i < docs1.size(); i++) {
- assertThat(leaf.getId(i), equalTo(expectedId(docs1.get(i), routingHash)));
+ assertThat(leaf.getId(i), equalTo(expectedId(docs1.get(i), routingHash, useSyntheticIds)));
}
}
{
LeafReader leafReader = indexReader.leaves().get(1).reader();
assertThat(leafReader.numDocs(), equalTo(docs2.size()));
var leaf = idLoader.leaf(null, leafReader, new int[] { 0, 3 });
- assertThat(leaf.getId(0), equalTo(expectedId(docs2.get(0), routingHash)));
- assertThat(leaf.getId(3), equalTo(expectedId(docs2.get(3), routingHash)));
+ assertThat(leaf.getId(0), equalTo(expectedId(docs2.get(0), routingHash, useSyntheticIds)));
+ assertThat(leaf.getId(3), equalTo(expectedId(docs2.get(3), routingHash, useSyntheticIds)));
}
{
LeafReader leafReader = indexReader.leaves().get(2).reader();
assertThat(leafReader.numDocs(), equalTo(docs3.size()));
var leaf = idLoader.leaf(null, leafReader, new int[] { 1, 2 });
- assertThat(leaf.getId(1), equalTo(expectedId(docs3.get(1), routingHash)));
- assertThat(leaf.getId(2), equalTo(expectedId(docs3.get(2), routingHash)));
+ assertThat(leaf.getId(1), equalTo(expectedId(docs3.get(1), routingHash, useSyntheticIds)));
+ assertThat(leaf.getId(2), equalTo(expectedId(docs3.get(2), routingHash, useSyntheticIds)));
}
{
LeafReader leafReader = indexReader.leaves().get(2).reader();
@@ -138,7 +140,8 @@ public void testSynthesizeIdMultipleSegments() throws Exception {
}
public void testSynthesizeIdRandom() throws Exception {
- var idLoader = IdLoader.createTsIdLoader(null, null);
+ final boolean useSyntheticIds = randomBoolean();
+ var idLoader = IdLoader.createTsIdLoader(null, null, useSyntheticIds);
long startTime = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2023-01-01T00:00:00Z");
Set expectedIDs = new HashSet<>();
@@ -161,7 +164,7 @@ public void testSynthesizeIdRandom() throws Exception {
for (int j = 0; j < numberOfSamples; j++) {
Doc doc = new Doc(startTime++, dimensions);
randomDocs.add(doc);
- expectedIDs.add(expectedId(doc, routingHash));
+ expectedIDs.add(expectedId(doc, routingHash, useSyntheticIds));
}
}
CheckedConsumer verify = indexReader -> {
@@ -240,7 +243,7 @@ private static void indexDoc(IndexWriter iw, Doc doc, int routingHash) throws IO
iw.addDocument(fields);
}
- private static String expectedId(Doc doc, int routingHash) throws IOException {
+ private static String expectedId(Doc doc, int routingHash, boolean useSyntheticIds) {
var routingFields = new RoutingPathFields(null);
for (Dimension dimension : doc.dimensions) {
if (dimension.value instanceof Number n) {
@@ -249,7 +252,11 @@ private static String expectedId(Doc doc, int routingHash) throws IOException {
routingFields.addString(dimension.field, dimension.value.toString());
}
}
- return TsidExtractingIdFieldMapper.createId(routingHash, routingFields.buildHash().toBytesRef(), doc.timestamp);
+ if (useSyntheticIds) {
+ return TsidExtractingIdFieldMapper.createSyntheticId(routingFields.buildHash().toBytesRef(), doc.timestamp, routingHash);
+ } else {
+ return TsidExtractingIdFieldMapper.createId(routingHash, routingFields.buildHash().toBytesRef(), doc.timestamp);
+ }
}
record Doc(long timestamp, List dimensions) {}
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 8b902ee44ed0b..25ee257c3eeb5 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -4571,7 +4571,7 @@ public void testOnCloseStats() throws IOException {
public void testSupplyTombstoneDoc() throws Exception {
IndexShard shard = newStartedShard();
String id = randomRealisticUnicodeOfLengthBetween(1, 10);
- ParsedDocument deleteTombstone = ParsedDocument.deleteTombstone(shard.indexSettings.seqNoIndexOptions(), randomBoolean(), id);
+ ParsedDocument deleteTombstone = ParsedDocument.deleteTombstone(shard.indexSettings.seqNoIndexOptions(), id);
assertThat(deleteTombstone.docs(), hasSize(1));
LuceneDocument deleteDoc = deleteTombstone.docs().get(0);
assertThat(