Skip to content

Commit

Permalink
TSDB: Implement downsampling on time-series indices (#85708)
Browse files Browse the repository at this point in the history
This PR implements downsampling operation on time series indices.

The PR creates a _rollup endpoint that allows users to downsample an index and can be
accessed by the following call:

POST /<source_index>/_rollup/<rollup_index>
{
    "fixed_interval": "1d"
}

Requirements

An index can be downsampled if all of the following requirements are met:

    Must be a time series index (have the index.mode: time_series index setting)
    Must not be writeable (have the index.blocks.write: true index setting)
    Must have dimension fields marked with mapping parameter time_series_dimension: true
    Must have metric fields marked with mapping parameter time_series_metric

Relates to #74660

Fixes #65769
Fixes #69799
Finally, this PR is based on the code written for #64900
  • Loading branch information
csoulios committed May 25, 2022
1 parent beadcaf commit d539957
Show file tree
Hide file tree
Showing 40 changed files with 2,060 additions and 2,603 deletions.
7 changes: 7 additions & 0 deletions docs/changelog/85708.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 85708
summary: "TSDB: Implement downsampling on time-series indices"
area: TSDB
type: feature
issues:
- 69799
- 65769
1 change: 0 additions & 1 deletion server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@
exports org.elasticsearch.rest.action.document;
exports org.elasticsearch.rest.action.ingest;
exports org.elasticsearch.rest.action.search;
exports org.elasticsearch.rollup;
exports org.elasticsearch.script;
exports org.elasticsearch.script.field;
exports org.elasticsearch.search;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,8 @@ public Index getResizeSourceIndex() {

public static final String INDEX_ROLLUP_SOURCE_UUID_KEY = "index.rollup.source.uuid";
public static final String INDEX_ROLLUP_SOURCE_NAME_KEY = "index.rollup.source.name";

public static final String INDEX_ROLLUP_STATUS_KEY = "index.rollup.status";
public static final Setting<String> INDEX_ROLLUP_SOURCE_UUID = Setting.simpleString(
INDEX_ROLLUP_SOURCE_UUID_KEY,
Property.IndexScope,
Expand All @@ -877,6 +879,24 @@ public Index getResizeSourceIndex() {
Property.PrivateIndex
);

public enum RollupTaskStatus {
STARTED,
SUCCESS;

@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}
}

public static final Setting<RollupTaskStatus> INDEX_ROLLUP_STATUS = Setting.enumSetting(
RollupTaskStatus.class,
INDEX_ROLLUP_STATUS_KEY,
RollupTaskStatus.SUCCESS,
Property.IndexScope,
Property.InternalIndex
);

// LIFECYCLE_NAME is here an as optimization, see LifecycleSettings.LIFECYCLE_NAME and
// LifecycleSettings.LIFECYCLE_NAME_SETTING for the 'real' version
public static final String LIFECYCLE_NAME = "index.lifecycle.name";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INDEX_FORMAT_SETTING,
IndexMetadata.INDEX_ROLLUP_SOURCE_NAME,
IndexMetadata.INDEX_ROLLUP_SOURCE_UUID,
IndexMetadata.INDEX_ROLLUP_STATUS,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,20 @@ public final class TimeSeriesParams {
private TimeSeriesParams() {}

public enum MetricType {
gauge,
counter,
histogram,
summary
gauge(new String[] { "max", "min", "value_count", "sum" }),
counter(new String[] { "last_value" }),
histogram(new String[] { "value_count" }), // TODO Add more aggs
summary(new String[] { "value_count", "sum", "min", "max" });

private final String[] supportedAggs;

MetricType(String[] supportedAggs) {
this.supportedAggs = supportedAggs;
}

public String[] supportedAggs() {
return supportedAggs;
}
}

public static FieldMapper.Parameter<MetricType> metricParam(Function<FieldMapper, MetricType> initializer, MetricType... values) {
Expand Down
20 changes: 0 additions & 20 deletions server/src/main/java/org/elasticsearch/rollup/RollupV2.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.license.DeleteLicenseAction;
import org.elasticsearch.license.GetBasicStatusAction;
import org.elasticsearch.license.GetLicenseAction;
Expand All @@ -27,7 +28,6 @@
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rollup.RollupV2;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ParseField;
Expand Down Expand Up @@ -412,8 +412,8 @@ public List<ActionType<? extends ActionResponse>> getClientActions() {
)
);

// rollupV2
if (RollupV2.isEnabled()) {
// TSDB Downsampling / Rollup
if (IndexSettings.isTimeSeriesModeEnabled()) {
actions.add(RollupIndexerAction.INSTANCE);
actions.add(RollupAction.INSTANCE);
}
Expand Down Expand Up @@ -566,7 +566,8 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
)
);

if (RollupV2.isEnabled()) {
// TSDB Downsampling / Rollup
if (IndexSettings.isTimeSeriesModeEnabled()) {
namedWriteables.add(new NamedWriteableRegistry.Entry(LifecycleAction.class, RollupILMAction.NAME, RollupILMAction::new));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rollup.RollupV2;
import org.elasticsearch.index.IndexSettings;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -56,7 +56,7 @@ public class TimeseriesLifecycleType implements LifecycleType {
UnfollowAction.NAME,
RolloverAction.NAME,
ReadOnlyAction.NAME,
RollupV2.isEnabled() ? RollupILMAction.NAME : null,
IndexSettings.isTimeSeriesModeEnabled() ? RollupILMAction.NAME : null,
ShrinkAction.NAME,
ForceMergeAction.NAME,
SearchableSnapshotAction.NAME
Expand All @@ -78,7 +78,7 @@ public class TimeseriesLifecycleType implements LifecycleType {
AllocateAction.NAME,
MigrateAction.NAME,
FreezeAction.NAME,
RollupV2.isEnabled() ? RollupILMAction.NAME : null
IndexSettings.isTimeSeriesModeEnabled() ? RollupILMAction.NAME : null
).filter(Objects::nonNull).toList();
public static final List<String> ORDERED_VALID_FROZEN_ACTIONS = List.of(UnfollowAction.NAME, SearchableSnapshotAction.NAME);
public static final List<String> ORDERED_VALID_DELETE_ACTIONS = List.of(WaitForSnapshotAction.NAME, DeleteAction.NAME);
Expand Down

0 comments on commit d539957

Please sign in to comment.