Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TSDB: Implement downsampling on time-series indices #85708

Merged
merged 78 commits into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
94fb431
Remove feature flag es.rollup_v2_feature_flag_enabled
csoulios Mar 24, 2022
b8e9d2b
RollupShardIndexer
csoulios Mar 30, 2022
7b7d06b
Cleanup
csoulios Mar 31, 2022
75ce70c
Test should not merge segments
csoulios Apr 3, 2022
98627b1
Refactoring code
csoulios Apr 3, 2022
3215864
Fix wrong timestamp order
csoulios Apr 3, 2022
34f42bd
Cleanup
csoulios Apr 3, 2022
29bc089
Cleanup
csoulios Apr 3, 2022
939a1c0
Removed CompressingOfflineSorter implementation
csoulios Apr 3, 2022
23d39a8
class to record
csoulios Apr 3, 2022
0f40815
Added more tests
csoulios Apr 4, 2022
7a44a68
Refactored rollup action config
csoulios Apr 4, 2022
b94e82d
Refactored rollup action config
csoulios Apr 5, 2022
e514fbb
Update docs/changelog/85708.yaml
csoulios Apr 5, 2022
219da73
Version updates
csoulios Apr 5, 2022
1a385e7
CI Fixes
csoulios Apr 6, 2022
633abac
checkstyle
csoulios Apr 6, 2022
d4daeb8
checkstyle
csoulios Apr 6, 2022
496e00f
Skip some failing tests
csoulios Apr 6, 2022
5a0642c
Fix broken test with wrong index sort order
csoulios Apr 6, 2022
d0222f0
Fix ILM test
csoulios Apr 6, 2022
b8bdcf9
Merge branch 'master' into tsdb-rollup
csoulios Apr 12, 2022
d9c6f6e
Enable test
csoulios Apr 12, 2022
ff864f2
Update docs/changelog/85708.yaml
csoulios Apr 15, 2022
0464806
Merge branch 'master' into tsdb-rollup
csoulios Apr 18, 2022
643962f
Pull timestamp from AggregationExecutionContext
csoulios Apr 18, 2022
d905038
Remove time_zone param from rollup config
csoulios Apr 18, 2022
29b77bc
Fix build failure
csoulios Apr 18, 2022
6237029
Merge branch 'master' into tsdb-rollup
csoulios Apr 18, 2022
a3e3e42
Added validations for rollup request parameters
csoulios Apr 18, 2022
0abece8
Update docs/changelog/85708.yaml
csoulios Apr 18, 2022
5dc065c
Set the number of shards and replicas
csoulios Apr 18, 2022
6a8976a
Validate that index is read-only before rolling up
csoulios Apr 19, 2022
d6adb3c
Delete source index after it has been downsampled
csoulios Apr 19, 2022
4492090
Added test and fix for rolling up data stream idx
csoulios Apr 19, 2022
fc255f6
minor change
csoulios Apr 19, 2022
2bc4600
More changes
csoulios Apr 19, 2022
b10254d
Set max as default_metric for gauge metrics
csoulios Apr 19, 2022
405915b
Merge branch 'master' into tsdb-rollup
csoulios Apr 20, 2022
d5de1aa
Minor change to address reviewer comments
csoulios Apr 20, 2022
bef7341
Tidy up code for data streams
csoulios Apr 20, 2022
af006e4
Changed hashCode
csoulios Apr 20, 2022
a1988a9
Merge branch 'master' into tsdb-rollup
csoulios Apr 26, 2022
cd23fd7
Return statement after error:
csoulios Apr 26, 2022
7973ca2
Merge branch 'master' into tsdb-rollup
csoulios Apr 26, 2022
98fd352
Merge branch 'master' into tsdb-rollup
csoulios Apr 29, 2022
c46b126
Fix compilation error
csoulios May 3, 2022
fb064c6
Merge branch 'master' into tsdb-rollup
csoulios May 3, 2022
3167f92
Merge branch 'master' into tsdb-rollup
csoulios May 9, 2022
ed94943
Fix broken test
csoulios May 9, 2022
48796c2
Merge branch 'master' into tsdb-rollup
csoulios May 9, 2022
78d110b
Merge branch 'master' into tsdb-rollup
csoulios May 10, 2022
f41343b
Compute last_value for metric fields
csoulios May 10, 2022
286cdbd
Removed temporary rollup index
csoulios May 12, 2022
ab1a4d8
Refresh the rollup index in the end
csoulios May 12, 2022
c22b6ba
Use randomValueOtherThan
csoulios May 12, 2022
4327b17
Do not serialize constants in RollupActionConfig
csoulios May 12, 2022
86c3347
Add tsdb feature flag
csoulios May 12, 2022
ed51704
Merge branch 'master' into tsdb-rollup
csoulios May 16, 2022
22a7589
minor change
csoulios May 16, 2022
ee42ef5
Perform the following actions in one state update:
csoulios May 17, 2022
d7d9b72
Perform batched cluster state update
csoulios May 19, 2022
332bef6
Cleanup the mess
csoulios May 19, 2022
17eb72b
Merge branch 'master' into tsdb-rollup
csoulios May 19, 2022
550a368
More cleanup
csoulios May 19, 2022
270ea69
Added feature flag to security tests
csoulios May 19, 2022
7197a5f
Added feature flag to build files
csoulios May 19, 2022
8b96364
Reverted wrong version bump
csoulios May 19, 2022
839a29d
Add the number of indexed docs to the shard response
csoulios May 20, 2022
de6b766
Added javadoc
csoulios May 20, 2022
0f3f827
Merge branch 'master' into tsdb-rollup
csoulios May 23, 2022
0ff80dc
Changes to rollup index:
csoulios May 23, 2022
7c4c441
Remove org.elasticsearch.rollup from module-info
csoulios May 23, 2022
5856f1d
Merge branch 'master' into tsdb-rollup
csoulios May 23, 2022
1dd28fd
Added more validation for source index
csoulios May 23, 2022
393d7c2
Do not fail rollup if force-merge fails
csoulios May 24, 2022
bfd9ce9
Merge branch 'master' into tsdb-rollup
csoulios May 24, 2022
0a06f41
Merge branch 'master' into tsdb-rollup
csoulios May 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/85708.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 85708
summary: "TSDB: Implement downsampling on time-series indices"
area: "TSDB"
type: feature
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,20 @@ public final class TimeSeriesParams {
private TimeSeriesParams() {}

public enum MetricType {
gauge,
counter,
histogram,
summary
gauge(new String[] { "value_count", "sum", "min", "max" }),
counter(new String[] { "max" }),
histogram(new String[] { "value_count" }), // TODO Add more aggs
csoulios marked this conversation as resolved.
Show resolved Hide resolved
summary(new String[] { "value_count", "sum", "min", "max" });

private final String[] supportedAggs;

MetricType(String[] supportedAggs) {
this.supportedAggs = supportedAggs;
}

public String[] supportedAggs() {
return supportedAggs;
}
}

public static FieldMapper.Parameter<MetricType> metricParam(Function<FieldMapper, MetricType> initializer, MetricType... values) {
Expand Down
20 changes: 0 additions & 20 deletions server/src/main/java/org/elasticsearch/rollup/RollupV2.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void search(Query query, BucketCollector bucketCollector) throws IOExcept
PriorityQueue<LeafWalker> queue = new PriorityQueue<>(searcher.getIndexReader().leaves().size()) {
@Override
protected boolean lessThan(LeafWalker a, LeafWalker b) {
return a.timestamp < b.timestamp;
return a.timestamp > b.timestamp;
csoulios marked this conversation as resolved.
Show resolved Hide resolved
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,26 +46,28 @@ public class TimeSeriesIndexSearcherTests extends ESTestCase {
// Open a searcher over a set of leaves
// Collection should be in order

private static final int THREADS = 5;
private static final int DOCS_PER_THREAD = 500;

public void testCollectInOrderAcrossSegments() throws IOException, InterruptedException {

Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setIndexSort(
new Sort(
new SortField(TimeSeriesIdFieldMapper.NAME, SortField.Type.STRING),
new SortField(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, SortField.Type.LONG)
new SortField(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, SortField.Type.LONG, true)
)
);
RandomIndexWriter iw = new RandomIndexWriter(random(), dir, iwc);

AtomicInteger clock = new AtomicInteger(0);

final int THREADS = 5;
ExecutorService indexer = Executors.newFixedThreadPool(THREADS);
for (int i = 0; i < THREADS; i++) {
indexer.submit(() -> {
Document doc = new Document();
for (int j = 0; j < 500; j++) {
for (int j = 0; j < DOCS_PER_THREAD; j++) {
String tsid = "tsid" + randomIntBetween(0, 30);
long time = clock.addAndGet(randomIntBetween(0, 10));
doc.clear();
Expand All @@ -90,8 +92,8 @@ public void testCollectInOrderAcrossSegments() throws IOException, InterruptedEx

BucketCollector collector = new BucketCollector() {

BytesRef currentTSID = null;
long currentTimestamp = 0;
BytesRef previousTSID = null;
long previousTimestamp = 0;
long total = 0;

@Override
Expand All @@ -107,16 +109,18 @@ public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx)
public void collect(int doc, long owningBucketOrd) throws IOException {
assertTrue(tsid.advanceExact(doc));
assertTrue(timestamp.advanceExact(doc));
BytesRef latestTSID = tsid.lookupOrd(tsid.ordValue());
long latestTimestamp = timestamp.longValue();
if (currentTSID != null) {
assertTrue(currentTSID + "->" + latestTSID.utf8ToString(), latestTSID.compareTo(currentTSID) >= 0);
if (latestTSID.equals(currentTSID)) {
assertTrue(currentTimestamp + "->" + latestTimestamp, latestTimestamp >= currentTimestamp);
BytesRef currentTSID = tsid.lookupOrd(tsid.ordValue());
assertEquals(aggCtx.getTsid(), currentTSID);
long currentTimestamp = timestamp.longValue();
logger.info("{} -> {} / {} -> {}", previousTSID, currentTSID, previousTimestamp, currentTimestamp);
if (previousTSID != null) {
assertTrue(previousTSID + "->" + currentTSID.utf8ToString(), currentTSID.compareTo(previousTSID) >= 0);
if (currentTSID.equals(previousTSID)) {
assertTrue(previousTimestamp + "->" + currentTimestamp, currentTimestamp <= previousTimestamp);
}
}
currentTimestamp = latestTimestamp;
currentTSID = BytesRef.deepCopyOf(latestTSID);
previousTimestamp = currentTimestamp;
previousTSID = BytesRef.deepCopyOf(currentTSID);
total++;
}
};
Expand All @@ -129,7 +133,7 @@ public void preCollection() throws IOException {

@Override
public void postCollection() throws IOException {
assertEquals(2500, total);
assertEquals(THREADS * DOCS_PER_THREAD, total);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rollup.RollupV2;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ParseField;
Expand Down Expand Up @@ -375,6 +374,9 @@ public List<ActionType<? extends ActionResponse>> getClientActions() {
DeleteRollupJobAction.INSTANCE,
GetRollupJobsAction.INSTANCE,
GetRollupCapsAction.INSTANCE,
// TSDB Downsampling / Rollup
RollupIndexerAction.INSTANCE,
RollupAction.INSTANCE,
// ILM
DeleteLifecycleAction.INSTANCE,
GetLifecycleAction.INSTANCE,
Expand Down Expand Up @@ -411,12 +413,6 @@ public List<ActionType<? extends ActionResponse>> getClientActions() {
)
);

// rollupV2
if (RollupV2.isEnabled()) {
csoulios marked this conversation as resolved.
Show resolved Hide resolved
actions.add(RollupIndexerAction.INSTANCE);
actions.add(RollupAction.INSTANCE);
}

return actions;
}

Expand Down Expand Up @@ -520,6 +516,7 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
new NamedWriteableRegistry.Entry(LifecycleAction.class, WaitForSnapshotAction.NAME, WaitForSnapshotAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, SearchableSnapshotAction.NAME, SearchableSnapshotAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, MigrateAction.NAME, MigrateAction::readFrom),
new NamedWriteableRegistry.Entry(LifecycleAction.class, RollupILMAction.NAME, RollupILMAction::new),
// Transforms
new NamedWriteableRegistry.Entry(Metadata.Custom.class, TransformMetadata.TYPE, TransformMetadata::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, TransformMetadata.TYPE, TransformMetadata.TransformMetadataDiff::new),
Expand Down Expand Up @@ -565,10 +562,6 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
)
);

if (RollupV2.isEnabled()) {
namedWriteables.add(new NamedWriteableRegistry.Entry(LifecycleAction.class, RollupILMAction.NAME, RollupILMAction::new));
}

return namedWriteables;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rollup.RollupV2;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -56,7 +55,7 @@ public class TimeseriesLifecycleType implements LifecycleType {
UnfollowAction.NAME,
RolloverAction.NAME,
ReadOnlyAction.NAME,
RollupV2.isEnabled() ? RollupILMAction.NAME : null,
RollupILMAction.NAME,
ShrinkAction.NAME,
ForceMergeAction.NAME,
SearchableSnapshotAction.NAME
Expand All @@ -68,7 +67,8 @@ public class TimeseriesLifecycleType implements LifecycleType {
AllocateAction.NAME,
MigrateAction.NAME,
ShrinkAction.NAME,
ForceMergeAction.NAME
ForceMergeAction.NAME,
RollupILMAction.NAME
);
public static final List<String> ORDERED_VALID_COLD_ACTIONS = Stream.of(
SetPriorityAction.NAME,
Expand All @@ -78,7 +78,7 @@ public class TimeseriesLifecycleType implements LifecycleType {
AllocateAction.NAME,
MigrateAction.NAME,
FreezeAction.NAME,
RollupV2.isEnabled() ? RollupILMAction.NAME : null
RollupILMAction.NAME
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be behind the tsdb feature flag?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rollup action will be available but it will not operate on indices that are not time series indices.

So, in a similar way as we did for TimeSeriesIdFieldMapper I did not put this behind a feature flag. I can add it though, if you think it makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 86c3347

).filter(Objects::nonNull).toList();
public static final List<String> ORDERED_VALID_FROZEN_ACTIONS = List.of(UnfollowAction.NAME, SearchableSnapshotAction.NAME);
public static final List<String> ORDERED_VALID_DELETE_ACTIONS = List.of(WaitForSnapshotAction.NAME, DeleteAction.NAME);
Expand Down
Loading