
Add factories for time series aggregation #107803

Merged · 4 commits · May 9, 2024
Conversation

@dnhatn (Member) commented Apr 24, 2024

This change introduces operator factories for time-series aggregations. A time-series aggregation executes in three stages, deviating from the typical two-stage aggregation.

For example: `sum(rate(write_requests)), avg(cpu) BY cluster, time-bucket`

**1. Initial Stage:**
A standard hash aggregation is executed, grouped by tsid and time-bucket. `values` aggregations are added to collect the values of the grouping keys (excluding the time-bucket), which are then used for the final result grouping.

```
rate[INITIAL](write_requests), avg[INITIAL](cpu), values[SINGLE](cluster) BY tsid, time-bucket
```

**2. Intermediate Stage:**
Equivalent to the final mode of a standard hash aggregation. This stage merges and reduces the results of the rate aggregations, but merges without reducing the results of non-rate aggregations: certain aggregations, such as count_distinct, cannot have their final results combined, so their intermediate states must be kept until the final grouping.

```
rate[FINAL](write_requests), avg[INTERMEDIATE](cpu), values[SINGLE](cluster) BY tsid, time-bucket
```

**3. Final Stage:**
This extra stage performs outer aggregations over the rate results and combines the intermediate results of non-rate aggregations using the user-specified grouping keys.

```
sum[SINGLE](rate_result), avg[FINAL](cpu) BY cluster, bucket
```
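The three stages above can be sketched on toy data with plain Java collections. This is an illustrative model only — `Sample`, `Partial`, and `run` are hypothetical names, not the Elasticsearch compute-engine API — and it assumes samples arrive sorted by time with at least two samples per (tsid, bucket):

```java
import java.util.*;

// Toy model of the three-stage plan; not the real operator/factory API.
public class ThreeStageSketch {

    public record Sample(String tsid, String cluster, long bucket, long timeMs, double counter, double cpu) {}

    /** Per (tsid, bucket) state: rate inputs plus avg's intermediate (sum, count). */
    static final class Partial {
        String cluster;                        // collected, as values[SINGLE](cluster) would
        Double firstCounter;                   // null until the first sample is seen
        double lastCounter;
        long firstMs, lastMs;
        double cpuSum;                         // avg[INITIAL] keeps sum and count
        long cpuCount;
    }

    /** Stages 1+2: hash-aggregate by (tsid, bucket); stage 3: regroup by (cluster, bucket). */
    public static Map<List<Object>, double[]> run(List<Sample> samples) {
        Map<List<Object>, Partial> byTsidBucket = new LinkedHashMap<>();
        for (Sample s : samples) {
            Partial p = byTsidBucket.computeIfAbsent(List.of(s.tsid(), s.bucket()), k -> new Partial());
            p.cluster = s.cluster();
            if (p.firstCounter == null) { p.firstCounter = s.counter(); p.firstMs = s.timeMs(); }
            p.lastCounter = s.counter();
            p.lastMs = s.timeMs();
            p.cpuSum += s.cpu();
            p.cpuCount++;
        }
        // Stage 3: finalize each rate, sum rates per (cluster, bucket), and merge avg partials.
        Map<List<Object>, double[]> acc = new LinkedHashMap<>();   // {rateSum, cpuSum, cpuCount}
        for (Map.Entry<List<Object>, Partial> e : byTsidBucket.entrySet()) {
            Partial p = e.getValue();
            double rate = (p.lastCounter - p.firstCounter) / ((p.lastMs - p.firstMs) / 1000.0);
            double[] a = acc.computeIfAbsent(List.of(p.cluster, e.getKey().get(1)), k -> new double[3]);
            a[0] += rate;
            a[1] += p.cpuSum;
            a[2] += p.cpuCount;
        }
        Map<List<Object>, double[]> out = new LinkedHashMap<>();   // {sum(rate), avg(cpu)}
        acc.forEach((k, v) -> out.put(k, new double[] { v[0], v[1] / v[2] }));
        return out;
    }
}
```

The real operators work on pages of blocks rather than row objects, but the data flow is the same: rates are finalized before the outer `sum`, while `avg` stays in intermediate form until the final grouping.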

@dnhatn dnhatn force-pushed the metrics-operator branch 3 times, most recently from b94607c to ca154b7 Compare April 24, 2024 03:53
@dnhatn (Member, Author) commented Apr 24, 2024

Before this change, I took a different approach, creating a MetricsAggregationOperator that extends the HashAggregationOperator and overrides the output of the final mode. However, I believe this current approach is cleaner, despite the alternative potentially being more efficient.

* Equivalent to the final mode of a standard hash aggregation.
* This stage merges and reduces the result of the rate aggregations,
* but merges (without reducing) the results of non-rate aggregations.
* Certain aggregations, such as count_distinct, cannot have their final results combined.
@kkrik-es (Contributor) commented Apr 24, 2024
Why is this the case? Don't we assume that the final grouping fields are included in the tsid? Or, we don't have the mapping between tsid values and the final groups?

In this particular case, avg(cpu) is tracked as sum and count, which can be reduced by [tsid, time-bucket] for each [tsid, cluster] combo?
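On the avg point above: in this model, avg's intermediate state is a (sum, count) pair, so per-[tsid, time-bucket] partials can indeed be merged by plain addition for each [cluster, bucket] combo and finished once at the end. A minimal sketch (`AvgPartial` is a hypothetical name, not a class in this PR):

```java
// Hypothetical stand-in for avg's intermediate state.
public class AvgPartial {
    public final double sum;
    public final long count;

    public AvgPartial(double sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    // Merging two partials is plain addition, so it is safe across tsids.
    public AvgPartial merge(AvgPartial other) {
        return new AvgPartial(sum + other.sum, count + other.count);
    }

    // Finishing divides once, after all partials for the group are merged.
    public double finish() {
        return sum / count;
    }
}
```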

A member replied:

Is this because the distinct counts per tsid and time bucket aren't a subset of the dimension groups?

@dnhatn (Member, Author) replied:

Ah, I think my wording is confusing. Just to clarify, this isn't a limitation; it's the reason why we still need to keep the intermediate states of non-rate aggregation after the second stage. I will reword this.

@martijnvg (Member) left a comment:

Thanks Nhat, the direction here looks good to me.

```java
List<GroupingAggregator.Factory> aggregators = new ArrayList<>(outerRates.size() + nonRates.size());
for (AggregatorFunctionSupplier f : outerRates) {
    aggregators.add(f.groupingAggregatorFactory(AggregatorMode.SINGLE));
}
```
A member commented:

It feels a little weird that you'd run these "outer" aggs on rates here and not as part of "something else" - like, they are just regular stats at that point. But you have to "split" the stream, right? The "outer" aggs get the incoming pages and the non-rate aggs get the incoming pages too. Those can't be separate Operators because they both want to consume the page.

I guess we could make an Operator that makes a shallowCopy of the page, passes it into a pipeline breaker, and then passes the result onwards. That'd split the stream. But the way I'd model the planning for this is pretty similar to what you've written here anyway, so maybe it doesn't make a difference.

Am I understanding the problem?

@dnhatn (Member, Author) replied:

After the second stage, the output page contains [tsid, time-bucket, final-rate, intermediate result of non-rates, grouping keys]. This page can be split into two separate pages: the first containing [grouping keys, time-bucket, final-rate], and the second containing [grouping keys, time-bucket, intermediate result of non-rates]. Two independent hash aggregations can then be run over these pages, although they will have to hash the grouping keys twice; this should not significantly impact performance.

My primary concern is the order of the grouping keys emitted by BlockHashes: the two hash operators must emit their grouping keys in the same order so the results can be merged back positionally. Currently, our BlockHash implementations guarantee this, although we have never documented that guarantee.

I will apply your suggestion to implement a SplitOperator, which will execute a series of operators sequentially (for now) and then merge the output pages into a single page. Thanks Nik!
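The split-and-merge idea in this comment can be sketched with ordered maps standing in for the ordered-grouping-keys behavior of the BlockHash implementations. This is purely illustrative - `Row`, `SplitAndZip`, and `run` are hypothetical names - and it uses `TreeMap` so both aggregations emit keys in the same order and can be zipped positionally:

```java
import java.util.*;

// Illustrative sketch: the same rows feed two independent aggregations
// (sum over one column, avg over another), each keyed by the grouping key.
public class SplitAndZip {

    public record Row(String key, double rate, double cpu) {}

    public static Map<String, double[]> run(List<Row> rows) {
        // "First page": sum(rate) per key. "Second page": avg(cpu) as (sum, count) per key.
        TreeMap<String, Double> sumRate = new TreeMap<>();
        TreeMap<String, double[]> avgCpu = new TreeMap<>();
        for (Row r : rows) {
            sumRate.merge(r.key(), r.rate(), Double::sum);
            double[] a = avgCpu.computeIfAbsent(r.key(), k -> new double[2]);
            a[0] += r.cpu();
            a[1]++;
        }
        // Zip: both maps emit keys in the same (sorted) order, so rows line up.
        Map<String, double[]> out = new LinkedHashMap<>();
        Iterator<Map.Entry<String, double[]>> it = avgCpu.entrySet().iterator();
        for (Map.Entry<String, Double> e : sumRate.entrySet()) {
            double[] a = it.next().getValue();
            out.put(e.getKey(), new double[] { e.getValue(), a[0] / a[1] });
        }
        return out;
    }
}
```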

A member replied:

I'm not sure that SplitOperator is the right thing. Maybe it is! I was mostly thinking of it as a building block for my understanding. But if you think it'll help, go for it!

@dnhatn (Member, Author) commented May 8, 2024

test this please

@dnhatn dnhatn added :StorageEngine/TSDB You know, for Metrics >non-issue labels May 9, 2024
@dnhatn dnhatn marked this pull request as ready for review May 9, 2024 01:45
@elasticsearchmachine (Collaborator) commented:

Pinging @elastic/es-storage-engine (Team:StorageEngine)

@dnhatn (Member, Author) commented May 9, 2024

@kkrik-es @martijnvg @nik9000 Thank you for your review and feedback. I am going to merge this PR as is. I think we will need some adjustments when integrating with the plans.

@dnhatn dnhatn merged commit 155e7c5 into elastic:main May 9, 2024
15 checks passed
@dnhatn dnhatn deleted the metrics-operator branch May 9, 2024 04:34
markjhoy pushed a commit to markjhoy/elasticsearch that referenced this pull request May 9, 2024
5 participants