Trim stored fields for _id field in tsdb (#97409)
In the fetch phase, the _id is then synthesized on the fly.

The _id is composed of a hash of the routing fields, the tsid, and the timestamp. These are all properties that can be retrieved from doc values and used to generate the _id on the fly.
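For illustration, a minimal Java sketch of that idea (the class name, field layout, and encoding below are assumptions for this example, not the commit's actual implementation):

import java.nio.ByteBuffer;
import java.util.Base64;

// Hypothetical sketch: rebuild an id from values that are already available as
// doc values, so the stored _id field itself becomes redundant.
final class SyntheticIdSketch {

    static String synthesizeId(int routingHash, long tsidHash, long timestampMillis) {
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + 2 * Long.BYTES);
        buffer.putInt(routingHash);      // hash of the routing (dimension) fields
        buffer.putLong(tsidHash);        // hash identifying the time series (_tsid)
        buffer.putLong(timestampMillis); // the document's @timestamp
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buffer.array());
    }
}

Because the same doc-values inputs always produce the same id, the stored _id can be pruned during merges and reconstructed whenever the fetch phase needs it.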
martijnvg committed Aug 25, 2023
1 parent 4b48a80 commit 0ba4e75
Showing 15 changed files with 815 additions and 25 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/97409.yaml
@@ -0,0 +1,5 @@
pr: 97409
summary: Trim stored fields for `_id` field in tsdb
area: TSDB
type: enhancement
issues: []
TSDBIndexingIT.java
@@ -8,25 +8,36 @@
package org.elasticsearch.datastreams;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.diskusage.AnalyzeIndexDiskUsageAction;
import org.elasticsearch.action.admin.indices.diskusage.AnalyzeIndexDiskUsageRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.indices.InvalidIndexTemplateException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.xcontent.XContentType;

@@ -36,8 +47,14 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;

import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

public class TSDBIndexingIT extends ESSingleNodeTestCase {

@@ -76,7 +93,7 @@ public class TSDBIndexingIT extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
-return List.of(DataStreamsPlugin.class);
+return List.of(DataStreamsPlugin.class, InternalSettingsPlugin.class);
}

@Override
@@ -436,6 +453,136 @@ public void testSkippingShards() throws Exception {
}
}

public void testTrimId() throws Exception {
String dataStreamName = "k8s";
var putTemplateRequest = new PutComposableIndexTemplateAction.Request("id");
putTemplateRequest.indexTemplate(
new ComposableIndexTemplate(
List.of(dataStreamName + "*"),
new Template(
Settings.builder()
.put("index.mode", "time_series")
.put("index.number_of_replicas", 0)
// Reduce the sync interval to speed up this integration test;
// otherwise, by default, it takes 30 seconds before the minimum retained seqno is updated:
.put("index.soft_deletes.retention_lease.sync_interval", "100ms")
.build(),
new CompressedXContent(MAPPING_TEMPLATE),
null
),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(false, false),
null
)
);
client().execute(PutComposableIndexTemplateAction.INSTANCE, putTemplateRequest).actionGet();

// index some data
int numBulkRequests = 32;
int numDocsPerBulk = 256;
String indexName = null;
{
Instant time = Instant.now();
for (int i = 0; i < numBulkRequests; i++) {
BulkRequest bulkRequest = new BulkRequest(dataStreamName);
for (int j = 0; j < numDocsPerBulk; j++) {
var indexRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE);
indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
bulkRequest.add(indexRequest);
time = time.plusMillis(1);
}
var bulkResponse = client().bulk(bulkRequest).actionGet();
assertThat(bulkResponse.hasFailures(), is(false));
indexName = bulkResponse.getItems()[0].getIndex();
}
client().admin().indices().refresh(new RefreshRequest(dataStreamName)).actionGet();
}

// Check whether there are multiple segments:
var getSegmentsResponse = client().admin().indices().segments(new IndicesSegmentsRequest(dataStreamName)).actionGet();
assertThat(
getSegmentsResponse.getIndices().get(indexName).getShards().get(0).shards()[0].getSegments(),
hasSize(greaterThanOrEqualTo(2))
);

// Pre-check that the _id stored field uses disk space:
var diskUsageResponse = client().execute(
AnalyzeIndexDiskUsageAction.INSTANCE,
new AnalyzeIndexDiskUsageRequest(new String[] { dataStreamName }, AnalyzeIndexDiskUsageRequest.DEFAULT_INDICES_OPTIONS, true)
).actionGet();
var map = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(diskUsageResponse), false);
assertMap(
map,
matchesMap().extraOk()
.entry(
indexName,
matchesMap().extraOk()
.entry(
"fields",
matchesMap().extraOk()
.entry("_id", matchesMap().extraOk().entry("stored_fields_in_bytes", greaterThanOrEqualTo(1)))
)
)
);

// Check that the minimum retaining seqno has advanced; otherwise the _id (and recovery source) doesn't get trimmed away.
var finalIndexName = indexName;
assertBusy(() -> {
var r = client().admin().indices().stats(new IndicesStatsRequest().indices(dataStreamName).all()).actionGet();
var retentionLeasesStats = r.getIndices().get(finalIndexName).getIndexShards().get(0).getShards()[0].getRetentionLeaseStats();
assertThat(retentionLeasesStats.retentionLeases().leases(), hasSize(1));
assertThat(
retentionLeasesStats.retentionLeases().leases().iterator().next().retainingSequenceNumber(),
equalTo((long) numBulkRequests * numDocsPerBulk)
);
});

// Force merge should trim the _id stored field away for all segments:
var forceMergeResponse = client().admin().indices().forceMerge(new ForceMergeRequest(dataStreamName).maxNumSegments(1)).actionGet();
assertThat(forceMergeResponse.getTotalShards(), equalTo(1));
assertThat(forceMergeResponse.getSuccessfulShards(), equalTo(1));
assertThat(forceMergeResponse.getFailedShards(), equalTo(0));

// Check whether we really end up with 1 segment:
getSegmentsResponse = client().admin().indices().segments(new IndicesSegmentsRequest(dataStreamName)).actionGet();
assertThat(getSegmentsResponse.getIndices().get(indexName).getShards().get(0).shards()[0].getSegments(), hasSize(1));

// Check the _id stored field uses no disk space:
diskUsageResponse = client().execute(
AnalyzeIndexDiskUsageAction.INSTANCE,
new AnalyzeIndexDiskUsageRequest(new String[] { dataStreamName }, AnalyzeIndexDiskUsageRequest.DEFAULT_INDICES_OPTIONS, true)
).actionGet();
map = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(diskUsageResponse), false);
assertMap(
map,
matchesMap().extraOk()
.entry(
indexName,
matchesMap().extraOk()
.entry(
"fields",
matchesMap().extraOk().entry("_id", matchesMap().extraOk().entry("stored_fields_in_bytes", equalTo(0)))
)
)
);

// Check that the search API can synthesize the _id:
var searchRequest = new SearchRequest(dataStreamName);
searchRequest.source().trackTotalHits(true);
var searchResponse = client().search(searchRequest).actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) numBulkRequests * numDocsPerBulk));
String id = searchResponse.getHits().getHits()[0].getId();
assertThat(id, notNullValue());

// Check that the _id is gettable:
var getResponse = client().get(new GetRequest(indexName).id(id)).actionGet();
assertThat(getResponse.isExists(), is(true));
assertThat(getResponse.getId(), equalTo(id));
}

static String formatInstant(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
}
InternalEngine.java
@@ -2608,6 +2608,7 @@ private IndexWriterConfig getIndexWriterConfig() {
iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
mergePolicy = new RecoverySourcePruneMergePolicy(
SourceFieldMapper.RECOVERY_SOURCE_NAME,
engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES,
softDeletesPolicy::getRetentionQuery,
new SoftDeletesRetentionMergePolicy(
Lucene.SOFT_DELETES_FIELD,
RecoverySourcePruneMergePolicy.java
@@ -31,6 +31,7 @@
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.BitSetIterator;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.search.internal.FilterStoredFieldVisitor;

import java.io.IOException;
@@ -39,18 +40,27 @@
import java.util.function.Supplier;

final class RecoverySourcePruneMergePolicy extends OneMergeWrappingMergePolicy {
-RecoverySourcePruneMergePolicy(String recoverySourceField, Supplier<Query> retainSourceQuerySupplier, MergePolicy in) {
+RecoverySourcePruneMergePolicy(
+String recoverySourceField,
+boolean pruneIdField,
+Supplier<Query> retainSourceQuerySupplier,
+MergePolicy in
+) {
super(in, toWrap -> new OneMerge(toWrap.segments) {
@Override
public CodecReader wrapForMerge(CodecReader reader) throws IOException {
CodecReader wrapped = toWrap.wrapForMerge(reader);
-return wrapReader(recoverySourceField, wrapped, retainSourceQuerySupplier);
+return wrapReader(recoverySourceField, pruneIdField, wrapped, retainSourceQuerySupplier);
}
});
}

-private static CodecReader wrapReader(String recoverySourceField, CodecReader reader, Supplier<Query> retainSourceQuerySupplier)
-throws IOException {
+private static CodecReader wrapReader(
+String recoverySourceField,
+boolean pruneIdField,
+CodecReader reader,
+Supplier<Query> retainSourceQuerySupplier
+) throws IOException {
NumericDocValues recoverySource = reader.getNumericDocValues(recoverySourceField);
if (recoverySource == null || recoverySource.nextDoc() == DocIdSetIterator.NO_MORE_DOCS) {
return reader; // early terminate - nothing to do here since none of the docs have a recovery source anymore.
@@ -66,20 +76,22 @@ private static CodecReader wrapReader(String recoverySourceField, CodecReader re
if (recoverySourceToKeep.cardinality() == reader.maxDoc()) {
return reader; // keep all source
}
-return new SourcePruningFilterCodecReader(recoverySourceField, reader, recoverySourceToKeep);
+return new SourcePruningFilterCodecReader(recoverySourceField, pruneIdField, reader, recoverySourceToKeep);
} else {
-return new SourcePruningFilterCodecReader(recoverySourceField, reader, null);
+return new SourcePruningFilterCodecReader(recoverySourceField, pruneIdField, reader, null);
}
}

private static class SourcePruningFilterCodecReader extends FilterCodecReader {
private final BitSet recoverySourceToKeep;
private final String recoverySourceField;
private final boolean pruneIdField;

-SourcePruningFilterCodecReader(String recoverySourceField, CodecReader reader, BitSet recoverySourceToKeep) {
+SourcePruningFilterCodecReader(String recoverySourceField, boolean pruneIdField, CodecReader reader, BitSet recoverySourceToKeep) {
super(reader);
this.recoverySourceField = recoverySourceField;
this.recoverySourceToKeep = recoverySourceToKeep;
this.pruneIdField = pruneIdField;
}

@Override
@@ -125,7 +137,12 @@ public boolean advanceExact(int target) {

@Override
public StoredFieldsReader getFieldsReader() {
-return new RecoverySourcePruningStoredFieldsReader(super.getFieldsReader(), recoverySourceToKeep, recoverySourceField);
+return new RecoverySourcePruningStoredFieldsReader(
+super.getFieldsReader(),
+recoverySourceToKeep,
+recoverySourceField,
+pruneIdField
+);
}

@Override
@@ -212,11 +229,18 @@ private static class RecoverySourcePruningStoredFieldsReader extends FilterStore

private final BitSet recoverySourceToKeep;
private final String recoverySourceField;

-RecoverySourcePruningStoredFieldsReader(StoredFieldsReader in, BitSet recoverySourceToKeep, String recoverySourceField) {
+private final boolean pruneIdField;
+
+RecoverySourcePruningStoredFieldsReader(
+StoredFieldsReader in,
+BitSet recoverySourceToKeep,
+String recoverySourceField,
+boolean pruneIdField
+) {
super(in);
this.recoverySourceToKeep = recoverySourceToKeep;
this.recoverySourceField = Objects.requireNonNull(recoverySourceField);
this.pruneIdField = pruneIdField;
}

@Override
@@ -230,6 +254,9 @@ public Status needsField(FieldInfo fieldInfo) throws IOException {
if (recoverySourceField.equals(fieldInfo.name)) {
return Status.NO;
}
if (pruneIdField && IdFieldMapper.NAME.equals(fieldInfo.name)) {
return Status.NO;
}
return super.needsField(fieldInfo);
}
});
@@ -238,12 +265,17 @@ public Status needsField(FieldInfo fieldInfo) throws IOException {

@Override
public StoredFieldsReader getMergeInstance() {
-return new RecoverySourcePruningStoredFieldsReader(in.getMergeInstance(), recoverySourceToKeep, recoverySourceField);
+return new RecoverySourcePruningStoredFieldsReader(
+in.getMergeInstance(),
+recoverySourceToKeep,
+recoverySourceField,
+pruneIdField
+);
}

@Override
public StoredFieldsReader clone() {
-return new RecoverySourcePruningStoredFieldsReader(in.clone(), recoverySourceToKeep, recoverySourceField);
+return new RecoverySourcePruningStoredFieldsReader(in.clone(), recoverySourceToKeep, recoverySourceField, pruneIdField);
}

}