Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TSDB: Implement downsampling on time-series indices #85708

Merged
merged 78 commits into from
May 25, 2022

Conversation

csoulios
Copy link
Contributor

@csoulios csoulios commented Apr 5, 2022

This PR implements downsampling operation on time series indices.

The PR creates a _rollup endpoint that allows users to downsample an index and can be
accessed by the following call:

POST /<source_index>/_rollup/<rollup_index>
{
    "fixed_interval": "1d"
}

Requirements

An index can be downsampled if all of the following requirements are met:

  • Must be a time series index (have the index.mode: time_series index setting)
  • Must not be writeable (have the index.blocks.write: true index setting)
  • Must have dimension fields marked with mapping parameter time_series_dimension: true
  • Must have metric fields marked with mapping parameter time_series_metric

Steps

The steps followed by the downsampling action are the following:

  1. Read the index mapping from the source index and extract the dimension and metric fields.
  2. Create a temporary rollup index with the following settings/mapping:
    1. The following index settings will be copied from the original index
      • index.mode
      • index.routing_path
      • index.time_series.start_time
      • index.time_series.end_time
    2. Set the following index settings to the values of the original index:
      • index.rollup.source.uuid
      • index.rollup.source.name
    3. All fields mapped as time_series_dimension will be created in the rollup index with exactly the same mapping.
    4. All fields mapped as time_series_metric will be created and their field type will change to aggregate_metric_double.
  3. Use the TimeSeriesIndexSearcher implementation to traverse over the (sorted by _tsid, @timestamp) source index and create the rollup index.
  4. Rename the temporary rollup index to its defined name.
  5. If the source index is a member of a data stream, we must replace the original index with the rollup index. So the rollup index is added and the source index is deleted. The addition and deletion of the indices should be an atomic action.

TODO

  • Implement logic for storing fields that are neither dimensions nor metrics (aka labels)
  • Make rollup task cancellable
  • More testing

Relates to #74660

Fixes #65769
Fixes #69799
Finally, this PR is based on the code written for #64900

Not needed for timeseries indices
Removed most of the configuration, which is extracted from
the index mapping.

Modified TransportRollupAction to extract the rollup config from
the field caps
Removed most of the configuration, which is extracted from
the index mapping.

Modified TransportRollupAction to extract the rollup config from
the field caps.
@csoulios csoulios added >feature :StorageEngine/Rollup Turn fine-grained time-based data into coarser-grained data :StorageEngine/Metrics You know, for Metrics v8.3.0 labels Apr 5, 2022
@elasticmachine elasticmachine added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label Apr 5, 2022
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-analytics-geo (Team:Analytics)

@csoulios csoulios marked this pull request as draft April 5, 2022 20:31
@elasticsearchmachine
Copy link
Collaborator

Hi @csoulios, I've created a changelog YAML for you.

@csoulios csoulios requested a review from nik9000 May 20, 2022 13:33
Copy link
Member

@nik9000 nik9000 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at it for a while and left some comments. I think I want to have another look on Monday too.

numSent.addAndGet(-items);
if (failure != null) {
long items = request.numberOfActions();
numSent.addAndGet(-items);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this one might still be worth looking at.

} else if (e.getValue().values().iterator().next().getMetricType() != null) {
metricFieldCaps.put(field, fieldCaps);
} else {
// TODO: Field is not a dimension or a metric. Treat it as a tag
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now we just throw tags away, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in a following PR we will add the tags. I left this there as a placeholder

if (resizeResponse.isAcknowledged()) {
// 6.
publishMetadata(originalIndexName, tmpIndexName, rollupIndexName, listener);
// 5. Add rollup index to data stream and publish rollup metadata
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we wait for all of the replicas to recover? I think a health check for green on the index is enough for that. I'm not sure that's a thing we always want, but it seems like a reasonable default.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can wait for a follow up if you'd like.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why we should wait for the replicas to recover. I was thinking that this can happen in parallel. Any concerns we may lose the primary shard while deleting the source index?

I am afraid that waiting for replicas to recover may take lots of time.

@weizijun
Copy link
Contributor

I added a PR(#82944) in January, and discuss with @csoulios about the downsampling design several times. Now, I'm waiting for this PR to be merged. Then I can contribute my code together to the downsampling requirement.

@csoulios csoulios requested a review from nik9000 May 23, 2022 13:18
Copy link
Member

@nik9000 nik9000 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's get this in. There's certainly more to do but I feel like this is a good start for rollups.

// 5. Refresh rollup index
refreshIndex(rollupIndexName, parentTask, ActionListener.wrap(refreshIndexResponse -> {
if (refreshIndexResponse.getFailedShards() == 0) {
// 6. Add rollup index to data stream and publish rollup metadata
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe label it 6-8?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 393d7c2

);
}
},
e -> deleteRollupIndex(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there is a good way to reduce the duplicate calls here. Oh well, in a follow up.

new ElasticsearchException(
"Failed to force-merge index [" + rollupIndexName + "]",
e
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this fail the request but not cause the destination index to be deleted? What will ILM think of that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about it. If we reach at this point, rollup will be successful even if force-merge fails. Also, at this point we have delete the source index, so there's no way to rollback and restart the operation.

On the other hand, if force-merge fails we end up with a perfectly healthy rollup index with multiple segments. So, do you think at this point we can log and swallow the exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In 393d7c2, I changed it so that it does not fail the rollup action.

* operation.
*/
logger.error(
"Failed to force-merge rollup index [" + rollupIndexName + "]",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

It's probably worth a test that fails here and makes sure we don't fall over. I think this will do the job but it's probably worth tracing this path. Both as a "for now" thing and to make sure that changes in the future don't cause us to lose this property.

@csoulios csoulios merged commit d539957 into elastic:master May 25, 2022
@csoulios csoulios deleted the tsdb-rollup branch May 25, 2022 14:16
salvatore-campagna pushed a commit to salvatore-campagna/elasticsearch that referenced this pull request May 26, 2022
This PR implements downsampling operation on time series indices.

The PR creates a _rollup endpoint that allows users to downsample an index and can be
accessed by the following call:

POST /<source_index>/_rollup/<rollup_index>
{
    "fixed_interval": "1d"
}

Requirements

An index can be downsampled if all of the following requirements are met:

    Must be a time series index (have the index.mode: time_series index setting)
    Must not be writeable (have the index.blocks.write: true index setting)
    Must have dimension fields marked with mapping parameter time_series_dimension: true
    Must have metric fields marked with mapping parameter time_series_metric

Relates to elastic#74660

Fixes elastic#65769
Fixes elastic#69799
Finally, this PR is based on the code written for elastic#64900
csoulios added a commit that referenced this pull request Jul 25, 2022
…88565)

This PR modified the rollup action so that all index settings are copied from
the source index to the rollup index.

Relates to #85708
csoulios added a commit that referenced this pull request Jul 26, 2022
This PR adds support for an ILM action that downsamples a time-series index
by invoking the _rollup endpoint (#85708)

A policy that includes the rollup action will look like the following

PUT _ilm/policy/my_policy
{
  "policy": {
    "phases": {
      "warm": {
        "actions": {
          "rollup": {
  	    "fixed_interval": "24h"
  	  }
  	}
      }
    }
  }
}

Relates to #74660
Fixes #68609
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
>feature :StorageEngine/Metrics You know, for Metrics :StorageEngine/Rollup Turn fine-grained time-based data into coarser-grained data Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v8.3.0
Projects
None yet
9 participants