
Constant query time for (some) metrics aggregations #91743

Open

ruflin opened this issue Nov 21, 2022 · 25 comments

@ruflin
Member

ruflin commented Nov 21, 2022

The number of metric data points being collected keeps growing, especially in the context of k8s and containers. Elasticsearch aggregations, which are mostly used to visualise these metrics, return very precise results. But in many scenarios, this precision is not needed.

To show an example, below is a screenshot of the System dashboard where the CPU usage is reported as 15.632%, and I think Elasticsearch even returns more precision, as this is already rounded. But knowing that CPU load is 15% is more than enough.

[Screenshot: System dashboard showing CPU usage at 15.632%]

The longer the time range, the more documents Elasticsearch has to look at, and the longer the query takes. What if we could trade query accuracy against performance, so that no matter the size of the data we are querying, the result would always be returned in the same time? Let's imagine I could set a maximum of 1s on the query to get an approximate result back.

For reference, some existing features in Elasticsearch that can help with the above but not necessarily solve it:

One of the key features I see behind the constant query time feature is that a user should not have to understand what happens behind the scenes; it just works. I would not expect this to be available for all aggregations, but for a subset of them.

Links

Links to relevant / related discussions:

@elasticsearchmachine elasticsearchmachine added the needs:triage Requires assignment of a team area label label Nov 21, 2022
@thomasdullien
Contributor

thomasdullien commented Nov 21, 2022

Hey all,

we had to solve a similar (but slightly thornier) issue for dealing with profiling data. Profiling data is very voluminous, and needs to be accessible for drill-down at a pretty constant rate, irrespective of whether you are looking at profiling data from an individual machine or from 1000s of machines in aggregate. You also need the ability to slice / filter the data based on a variety of criteria, which makes pre-aggregating (https://github.com/elastic/apm-dev/issues/877) difficult, as it is unclear what the data needs to be pre-aggregated over.

We also need to retrieve tens of thousands of data points in each query.

The solution is (of course) some form of statistical sampling.

The random sampler aggregation does not work for us, because it tries to randomly skip around existing indices. In order to get tens of thousands of IID samples from a data stream, it will have to perform approximately that many file seeks and reads. Considering that a spinning disk only has ~hundreds of IOPS, this is going to be prohibitive. Even on a fast SSD (40k IOPS), one such query could eat up the entire IO budget.
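To make the IOPS argument concrete, here is a back-of-the-envelope calculation (the numbers below are illustrative assumptions, not measurements):

```python
# Rough cost of fetching N independent random samples when each sample
# requires roughly one random read (illustrative numbers only).
samples_needed = 30_000   # "tens of thousands" of IID samples
hdd_iops = 200            # spinning disk: ~hundreds of IOPS
ssd_iops = 40_000         # fast SSD

print(samples_needed / hdd_iops)  # ~150 s of pure seek time on a spinning disk
print(samples_needed / ssd_iops)  # ~0.75 s, i.e. most of the SSD's IO budget for that second
```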

Our solution is "sample on ingest": we create a hierarchy of indices, each subsampled to 20% of the previous level's samples:

  1. index_full -- all the data
  2. index_20p -- 20% of the data
  3. index_20_of_20p -- 20% of 20% of the data
  4. ...

After 5 layers you are down to 3% of the data, after 6 layers to 0.6%, and after 7 layers to 0.1%, meaning one TB of original data is now one GB.

When data is needed, one can start sampling from a "sparse" index and apply the relevant filters to get an estimate of how "dense" the filtered results are overall. This estimate is then used to identify which actual index to query to retrieve an IID sample of approximately the required size.
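As a rough illustration of that index-selection step (hypothetical function and index-tier names, not our actual implementation):

```python
# Pick the sparsest index tier that still yields roughly the desired number
# of IID samples after filtering. Tier i keeps 0.2**i of the original data.
def pick_index_tier(estimated_matching_docs_full, desired_samples, num_tiers=7):
    """estimated_matching_docs_full: estimated docs matching the filter in index_full,
    e.g. extrapolated from a quick probe against the sparsest tier."""
    for tier in range(num_tiers - 1, -1, -1):   # try the sparsest tier first
        expected = estimated_matching_docs_full * (0.2 ** tier)
        if expected >= desired_samples:
            return tier                         # sparsest tier that is still dense enough
    return 0                                    # fall back to index_full

# e.g. ~10M matching docs overall, we want ~20k samples -> tier 3 (0.8% of the data)
print(pick_index_tier(10_000_000, 20_000))
```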

The difference to the random sampler aggregation is mostly that we are trading disk space (a maximum of 25% more data) for latency/IOPS.

This was not a problem for us in the past, as our (highly optimized) columnar storage was almost an order of magnitude smaller than naive Elasticsearch storage -- obviously, if ES is less efficient at storing data, the 25% tax this method imposes is also 25% of that larger size.

Anyhow, this method seems to generalize well, and could be internalized by ES as a feature for an index.

@DJRickyB DJRickyB added >enhancement :Search/Search Search-related issues that do not fall into other categories and removed needs:triage Requires assignment of a team area label labels Nov 21, 2022
@elasticsearchmachine
Collaborator

Pinging @elastic/es-search (Team:Search)

@elasticsearchmachine elasticsearchmachine added the Team:Search Meta label for search team label Nov 21, 2022
@javanna javanna added :Analytics/Aggregations Aggregations and removed :Search/Search Search-related issues that do not fall into other categories Team:Search Meta label for search team labels Nov 22, 2022
@elasticsearchmachine elasticsearchmachine added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label Nov 22, 2022
@elasticsearchmachine
Collaborator

Pinging @elastic/es-analytics-geo (Team:Analytics)

@felixbarny
Member

For metrics use cases specifically, I think that continuously rolling up the raw data to reduce dimensions and to have different granularities that are appropriate for different time ranges is the way to go.

Random sampling is useful to some degree but it'll never be as efficient as pre-aggregated data. Let's say the raw metrics have a granularity of 1m and you're trying to populate a graph that shows 60 data points. For a time range of 1h, you'll need exactly one document per date_histogram bucket. If you want to display 60 days of data and you have pre-aggregated metrics at 1d granularity, you also just need a single doc per date_histogram bucket. If you're using the random sampler aggregation on the 1m raw metrics, you'll need substantially more documents per bucket in order to get a result with reasonable accuracy. You can't just select a single random metric doc for each day.

As alluded to before, another way to reduce the number of documents to aggregate over is to drop/collapse dimensions that aren't needed for overview pages. For example, we could create a derivative of the CPU utilization metric that doesn't have a host.hostname dimension. So when visualizing a CPU utilization graph over all hosts, instead of having to aggregate the individual CPU utilization of all hosts, we'd only need to look at a single time series that has pre-aggregated the CPU utilization of the individual hosts. The more individual hosts, the higher the query speedup (IINM, the query execution time scales linearly with the number of docs), and the lower the relative storage overhead we get from storing an additional metric. The downside is that we (or the user) need to choose specific dimensions that should be collapsed, which requires knowledge of how the data is queried and which dimensions are likely to have the highest cardinality. Also, for this to work transparently, the queries need to target the right metrics, depending on the selected filter.
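As a minimal sketch of what collapsing a dimension at ingest could look like (field names are just examples; a real implementation would live in an ingest pipeline, a transform, or the shipper rather than in application code):

```python
from collections import defaultdict

# Per-host CPU utilization documents (the raw, high-cardinality metric).
docs = [
    {"@timestamp": "2022-11-21T10:00:00Z", "host.hostname": "host-1", "system.cpu.total.pct": 0.15},
    {"@timestamp": "2022-11-21T10:00:00Z", "host.hostname": "host-2", "system.cpu.total.pct": 0.35},
    {"@timestamp": "2022-11-21T10:01:00Z", "host.hostname": "host-1", "system.cpu.total.pct": 0.20},
]

# Collapse the host.hostname dimension: one doc per timestamp that already
# carries the sum and count, so queries over "all hosts" touch far fewer docs.
collapsed = defaultdict(lambda: {"sum": 0.0, "count": 0})
for doc in docs:
    bucket = collapsed[doc["@timestamp"]]
    bucket["sum"] += doc["system.cpu.total.pct"]
    bucket["count"] += 1

for ts, agg in collapsed.items():
    print({"@timestamp": ts,
           "system.cpu.total.pct.sum": agg["sum"],
           "system.cpu.total.pct.count": agg["count"]})
```

Keeping the sum and count (rather than a pre-computed average) means averages can still be computed correctly across arbitrary time buckets later.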

Combining both approaches will yield the best results.

I think the next step is to align on the general strategy for observability. So far I have seen three approaches proposed in this thread:

  1. Random sampling at query time
  2. Random sampling at ingest time
  3. Continuous downsampling at ingest time

Are there more that are worth exploring? How can we get to a decision on which path we want to take?

While having a solution for this in the platform would be wonderful, within APM, we don't want to hold our breath on that and we're planning to execute on option 3. See this list of (internal) issues for more details: https://github.com/elastic/apm-dev/labels/query%20performance.

@felixbarny
Member

I'd also love to hear from @martijnvg and @giladgal if you have recommendations on the approach. I think you've already discussed adding continuous downsampling at ingest time. While you went with background downsampling as the approach to implement first, continuous downsampling is still an option, right? Is this just a matter of prioritization and capacity?

@martijnvg
Member

I think that the continuous downsampling at ingest time approach (both at the level of merging dimensions and/or the time interval) is a good approach to significantly reducing query time, whether implemented in APM Server or in ES.

Currently, the downsampling that exists in ES is done in the background after an index has been rolled over. This approach was chosen because saving disk space was also a goal. While both this type of downsampling and continuous downsampling reduce query time, the former doesn't improve query time for queries targeting recent data. I think continuous downsampling is something that can be implemented in ES. However, this currently isn't planned or prioritised.

@giladgal
Contributor

This is for brainstorming and not fully validated, but I think downsampling at ingest has a lot of advantages, especially regarding accuracy. I think the big disadvantage is that it is hard to develop in a general way, i.e. so that it is easy to operate and so that users can easily build their own queries and dashboards in Kibana that slice and dice the data as they see fit.

A solution based on random sampling may be easier to develop and more flexible. I suggest considering running the random sampler with various probabilities and using the most accurate one that returns within the defined query latency threshold, then either allowing the user to request an update to a more accurate result, or providing the ability to poll for a more accurate result (like with async search) and updating the results when the more accurate query returns. This could be wrapped in an API so the user doesn't need to pick the probability, and presented as an option in Kibana, so you could, for example, use a checkbox to request that the dashboard return initial estimated results.

@dgieselaar
Member

@giladgal That is close to what we have today:

[screenshot]

However, at least in the current implementation of the random_sampler aggregation, it still counts all the documents in the query phase, which defeats the goal of this issue, which is constant query time. Additionally, it's hard to come up with a good probability if the results are bucketed (e.g. we might miss some buckets in the long tail).

@ruflin
Member Author

ruflin commented Nov 28, 2022

I could see random sampler as a short-term win for us. I don't think in its current form it fulfils the requirement of "simple to use for any user", but we could make use of it in our dashboards, assuming Lens supports it.

The following is a brain dump on how I could see it working. The numbers I post below are example values and not the ones that should be used.

Let's assume Elasticsearch can aggregate very quickly over 10'000 docs. We always want to have 1000 buckets with at least 10 documents each. If we have 10'000 documents, we query all docs; if we have 100'000 docs, we sample 0.1; for 10^6, 0.01; and so on. The challenge, as @dgieselaar pointed out, is that counting time will keep increasing. Of course, this ignores the potential long tail issue, but I think different datasets might have different requirements here.
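A sketch of that scaling rule (the 10'000-doc budget is just the example number from above, not a recommendation):

```python
def sampling_probability(total_docs, doc_budget=10_000):
    """Probability for the random_sampler aggregation so that roughly
    doc_budget documents are sampled, regardless of total_docs."""
    if total_docs <= doc_budget:
        return 1.0                      # small enough: query everything
    return doc_budget / total_docs      # 100k docs -> 0.1, 1M docs -> 0.01, ...

print(sampling_probability(100_000))    # 0.1
print(sampling_probability(1_000_000))  # 0.01
```

The catch is that knowing total_docs requires a count, which is exactly the part that doesn't stay constant.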

All our data streams are using the data stream naming scheme. This means some of the queries run across all of the data stream's data without any filter except the timestamp range, or the filter is on a constant keyword. In these scenarios, I assume counting is as fast as it is for full indices (is this correct?). The filtering that is then often applied is based on dimensions of the metrics, like host.name. If it is a TSDS dimension, will this speed up counting in any way?

@felixbarny
Member

Let's assume Elasticsearch can aggregate very quickly over 10'000 docs. We always want to have 1000 buckets with at least 10 documents each. If we have 10'000 documents, we query all docs; if we have 100'000 docs, we sample 0.1; for 10^6, 0.01; and so on.

I'm not sure if we can scale down the sampling percentage linearly with the increase in doc count. For an increasing doc count, it would mean we just randomly sample a constant x (10 in your example) out of 100, 1k, 10k, etc. documents in each bucket. I suppose that the error will steadily increase with the number of documents. But I'm not sure. Maybe it's enough to define a sufficiently large number for x (min docs per bucket) to ensure a small enough error? Or do we have to scale x with the number of hits per bucket? If so, is it feasible to scale x sub-linearly? If not, it would just be the same as a constant probability and we're not keeping the query time constant.

@benwtrent could you chime in on that?

This is also related to

Additionally, it's hard to come up with a good probability if the results are bucketed (e.g. we might miss some buckets in the long tail).
Of course, this ignores the potential long tail issue

I didn't quite get what the long-tail issue is. Could you elaborate?

some of the queries run across all of the data stream data without a filter except timestamp range or the filter is a constant keyword.

I guess the count of hits is quite useless in these cases. It would just count the number of documents in the data stream rather than the number of documents that are actually aggregated, as there will be a lot of matching documents that don't have a value for the field we're trying to aggregate. To make the count more useful, we'd need to add an exists filter for the field that's used in the aggregation. Maybe that's a best practice anyway(?), but I'm sure not all queries do that.

@ruflin
Member Author

ruflin commented Nov 28, 2022

I guess the count of hits is quite useless in these cases. It would just count the number of documents in the data stream rather than the number of documents that are actually aggregated as there will be a lot of matching documents that don't have a value for the field we're trying to aggregate.

This should not be the case because of the data stream naming scheme. Ideally all the documents in a dataset look very similar and should have the required field. This works well for many integrations as we also control the collection. But I can see that in some scenarios, there might be gaps in the data.

@benwtrent
Member

Overall, for time series focused metrics, downsampling (meaning increasing the metric summarization window, e.g. 1s -> 1m) is preferred. When or how this happens, I do not know. Keep in mind, this is fundamentally different from random sampling. I am not sure the two are analogous at all, other than that they both have “sampling” in their names.

Random sampling as it is now is not optimized for a time series metric over various facets. I think this is possible to do quickly and well at query time. But certain guarantees would have to be known and accounted for. One key advantage is that (I think) all time series metrics need to be index sorted by time. It is a huge disadvantage to not have time series sorted by time and maybe even by their facets.

TSDB feels like a WAY better solution here.

Or, a "downsampling" aggregation that is optimized for indices that are sorted by time and aggregates statistics at different timeseries rates. I think this would be possible, but it wouldn't be the "random sampler". Downsampling at query time would be an interesting problem :)


#91743 (comment)

Random sampling is useful to some degree but it'll never be as efficient as pre-aggregated data:

I agree here. However, the trade-off is this: pre-aggregated data requires you to:

  • Know your facets ahead of time
  • Know your smallest granularity (in the time-metrics case)

I think there is probably some balance between the two if the facets (time-series terms) are known.

 I think the big disadvantage is that it is hard to develop in a general way, i.e. so that it is easy to operate and so that users can easily build their own queries and dashboards in Kibana that slice and dice the data as they see fit.


#91743 (comment)

I would argue that “generality” here is a loaded term. We want this to work for very specific metrics-focused scenarios. Also, if TSDB has shown us anything, it is that downsampling (aka “rolling up”) works really well when the facets and smallest time window are known.


#91743 (comment)

Additionally, it's hard to come up with a good probability if the results are bucketed (e.g. we might miss some buckets in the long tail).

This is correct. If there are VERY small buckets, they could be skipped altogether. So, very small facets or time windows wouldn’t be displayed. Arguably, this is acceptable as you may only care about “heavy hitters”.


#91743 (comment)

For an increasing doc count, it would mean we just randomly sample a constant x (10 in your example) out of 100, 1k, 10k, etc documents in each bucket. I suppose that the error will steadily increase with the number of documents

I don’t understand this exposition. Random sampling is at the top level. So, buckets with a larger number of docs will have a larger number of docs when sampled. The opposite is true for smaller buckets. This way, for global statistics (and honestly for bucket-level statistics too), we don’t end up seeing bias toward smaller buckets. If you standardize and require X docs for every bucket (where X is constant, no matter the bucket size), you end up with bias toward the smaller buckets. You increase their impact on statistics even though they have fewer documents. Another way of saying this is that you are changing the distribution of the statistics.
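A toy illustration of that bias, with made-up numbers:

```python
# Two buckets: a large one with mean 10 and a small one with mean 100.
large_bucket = [10.0] * 1000    # 1000 docs, mean 10
small_bucket = [100.0] * 10     # 10 docs,  mean 100

true_mean = (sum(large_bucket) + sum(small_bucket)) / 1010      # ~10.9

# Top-level sampling at p=0.1 keeps the proportions: ~100 large docs, ~1 small doc.
top_level_mean = (10.0 * 100 + 100.0 * 1) / 101                 # ~10.9, unbiased

# A fixed "10 docs per bucket" rule gives both buckets equal weight.
fixed_per_bucket_mean = (10.0 * 10 + 100.0 * 10) / 20           # 55.0, heavily biased

print(true_mean, top_level_mean, fixed_per_bucket_mean)
```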

For “I suppose that the error will steadily increase with the number of documents”, I think you mean the opposite. More docs sampled, the lower the error.

 If so, is it feasible to scale x sub-linearly? 

I am not sure what you mean by “sub-linearly”. It is possible to hit an acceptable sampling rate with 2 API calls: one at a lower probability; if the doc counts don’t match up to what you want, adjust the probability accordingly. There is a simple equation for this.

I didn't quite get what the long-tail issue is. Could you elaborate?

“Long-tail” here is when there are underrepresented facets or time buckets that contain very few docs. Think of a terms aggregation where there are 1000s of terms, but 900 of them have < 10 docs and 100 of them have > 1000 docs.

@felixbarny
Member

felixbarny commented Nov 28, 2022

Thanks a bunch for chiming in @benwtrent!

TSDB feels like a WAY better solution here.

While with TSDB, aggregations can leverage the sorting and on-disk structure and therefore make aggregations faster, the response times will still increase with the scale of the monitored infrastructure and with the selected time range.


For “I suppose that the error will steadily increase with the number of documents”, I think you mean the opposite. More docs sampled, the lower the error.

This was in response to this suggestion to ensure a min doc count rather than a top-level sampling percentage:

We always want to have 1000 buckets with at least 10 documents each.

If we only look at 10 docs in each bucket, the error would increase with the number of docs.

If you standardize and require X docs for every bucket (where X is constant, no matter the bucket size), you end up with bias for the smaller buckets.

Ack, good point.


It is possible to hit the correctly acceptable sampling rate with 2 API calls. One at a lower probability. If the doc counts don’t match up to what you want, adjust probability accordingly.

That approach sounds promising and is similar to what the profiling team is doing (#91743 (comment)), just with random sampling at query time rather than at ingest time.

There is a simple equation for this.

Could you expand on what the equation is?

I think this approach could be combined with the progressive loading approach we explored in APM:

  • Do the initial request with a low sampling probability. Like 1% or lower.
  • Optionally render the graph based on the results
  • Do another request if the doc count is under a threshold, and adjust the sampling probability accordingly
  • (Re-) render the graph

The benefit is that this can be applied generically. It could be an option in Lens, for example.
The tradeoff is that the query times aren't necessarily constant (#91743 (comment)). Also, even 1% may be too slow, depending on data volume and time range.
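A rough sketch of that progressive-loading loop around the existing random_sampler aggregation (run_search and render are placeholder callbacks; treat the adjustment logic as illustrative pseudocode, not a finalized design):

```python
def progressive_search(run_search, render, query, inner_aggs,
                       initial_probability=0.01, min_sampled_docs=100_000):
    """run_search(body) -> _search response dict; render(response) -> updates the chart."""
    probability = initial_probability
    while True:
        body = {
            "size": 0,
            "track_total_hits": False,   # counting defeats the purpose of sampling
            "query": query,
            "aggs": {
                "sampled": {
                    # NB: the real aggregation restricts which probability values
                    # are allowed; this sketch ignores that.
                    "random_sampler": {"probability": probability},
                    "aggs": inner_aggs,
                }
            },
        }
        response = run_search(body)
        render(response)                 # show an early, approximate result
        sampled_docs = response["aggregations"]["sampled"]["doc_count"]
        if sampled_docs >= min_sampled_docs or probability >= 1.0:
            return response
        # Re-estimate the total and scale the probability up for the next pass.
        estimated_total = sampled_docs / probability
        probability = min(1.0, min_sampled_docs / max(estimated_total, 1))
```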


For APM, where we have control over which dimensions are used for aggregation and at which interval we're creating the metrics, pre-aggregation/downsampling still seems like a better option.

@ruflin
Member Author

ruflin commented Dec 1, 2022

I filed elastic/kibana#146770 for random sampler support in Lens. It is not necessarily THE solution but I think it is part of the solution especially as the random sampler already exists.

@benwtrent
Member

Also, even 1% may be too slow, depending on data volume and time range.

For large data, 1% is really high. For billions of docs, sampling at 0.0001% works really well in some scenarios :).

I think @qn895 may have some input here as she implemented sampling in the ML visualizer.

Here is the equation:

new_p = desired_count/(sampled_count/old_p)

Basically, sampled_count/old_p gives us a decent approximation of the entire doc count (if this is not already known). Then the sampling probability would be the percentage of docs you want vs the total docs.

If the doc count is already known, then you can simply calculate this ahead of time. Keep in mind that calculating total_hits is way slower than simply running the random_sampler aggregation. And when running random_sampler you should NOT track total hits. The speed of the whole thing is predicated on the ability to randomly skip documents and not care :)
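A quick worked example of that equation (numbers invented for illustration):

```python
old_p = 0.01            # probe with a low probability first
sampled_count = 5_000   # docs that ended up in the random_sampler bucket
desired_count = 100_000 # docs we actually want to sample

estimated_total = sampled_count / old_p    # ~500,000 docs match the query
new_p = desired_count / estimated_total    # 0.2 for the second request
print(estimated_total, new_p)
```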

@felixbarny
Member

How is the desired_count determined? Say a user is fine with a 5% error margin, how can we automatically determine the appropriate sampling percentage?

@benwtrent
Member

Say a user is fine with a 5% error margin, how can we automatically determine the appropriate sampling percentage?

Having a % error margin is a more complicated problem. This is something that may be doable in the future but will require some significant work in the aggregation. It would require something similar to #83278 but per shard and slightly different calculations.

Effectively, if you have 1M+ docs sampled, your error rate is probably OK. The user can very easily see from the variance of the returned visualizations whether their error rate is too high.

@felixbarny
Member

1M+ docs sampled

If you're doing a date_histogram aggregation with 1000 buckets, does the 1M rule of thumb apply to each bucket (so 1k x 1M docs in total), or to the total number of docs?

@felixbarny
Member

There may be another challenge with random sampling on metrics data. Often metrics already represent a pre-aggregated version of event data.

For example, if service-a is doing 1k requests a minute, and we have a time series for the response time of that app, each metric data point actually represents 1k distinct events (they'll have the _doc_count set to 1000).

Say you have another service-b that only receives 1 request a minute. If we want to get the average latency across these two services and we randomly sample the metric documents, the metrics of the low-throughput service will be heavily over-represented as not sampling a single document from service-a equates to not considering 1000 events.
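To illustrate with made-up numbers (this shows what naive doc-level sampling would do; it is not a claim about the actual random_sampler internals):

```python
# One minute of pre-aggregated latency metrics: service-a summarizes 1000 events,
# service-b summarizes a single event.
docs = [
    {"service": "service-a", "avg_latency_ms": 20.0, "_doc_count": 1000},
    {"service": "service-b", "avg_latency_ms": 500.0, "_doc_count": 1},
]

# Event-weighted average latency across both services (the "true" answer).
weighted = (sum(d["avg_latency_ms"] * d["_doc_count"] for d in docs)
            / sum(d["_doc_count"] for d in docs))
print(weighted)  # ~20.5 ms -- dominated by the 1000 real events behind service-a

# If doc-level sampling happens to drop the service-a document, the remaining
# sample suggests ~500 ms: skipping one doc silently discarded 1000 events.
```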

@benwtrent
Member

@felixbarny, random sampler uses the _doc_count. So, your "sampled doc count" would take _doc_count into consideration.

If you're doing a date_histogram aggregation with 1000 buckets, does the 1M rule of thumb apply to each bucket (so 1k x 1M docs in total), or to the total number of docs?

Depends on what you care about here. Usually with date_histograms you care about the distribution of the values, and sampling 1M can give you that.

@felixbarny
Member

@felixbarny, random sampler uses the _doc_count. So, your "sampled doc count" would take _doc_count into consideration.

Does that mean that documents with a high _doc_count are less likely to be skipped than documents with a low count? Or do you mean that the total _doc_count gets extrapolated based on the sampled docs?

If it's just the latter, we might under-represent the values from docs with a high _doc_count in the sample set. As a document with a high _doc_count represents multiple events, it feels more consequential to drop it as opposed to dropping a document with a low _doc_count.

@benwtrent
Member

As a document with a high _doc_count represents multiple events, it feels more consequential to drop it as opposed to dropping a document with a low _doc_count.

This is very true. There are always outliers, and random_sampling does not pay attention to outliers.

@felixbarny
Member

This is not about outliers or edge cases. Random sampling works wonderfully for raw event data. But for APM, we already have pre-aggregated metrics. When randomly sampling a pre-aggregated time series, you don't end up with an independently and identically distributed sample.

@wchaparro wchaparro added :StorageEngine/Metrics You know, for Metrics and removed Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) labels Mar 20, 2024
@elasticsearchmachine elasticsearchmachine added Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) Team:StorageEngine labels Mar 20, 2024
@elasticsearchmachine
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@elasticsearchmachine
Collaborator

Pinging @elastic/es-storage-engine (Team:StorageEngine)

@wchaparro wchaparro removed :Analytics/Aggregations Aggregations Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) labels Jun 7, 2024