Order by scripted_metric sub aggregation #8486

Closed
eryabitskiy opened this issue Nov 14, 2014 · 100 comments

@eryabitskiy

Since there is a new scripted metric aggregation (scripted_metric) in 1.4, it is possible to do a lot of amazing stuff.
For example, it is possible to implement a weighted average aggregation, which we were missing before.

What we are still missing is the ability to sort by scripted_metric results.

Live example:

We calculate weightedAvgVis with a scripted_metric and want to get the ids with the top 5 values of weightedAvgVis. Since the script returns a double, this seems logically possible.

{
   "from": 0,
   "size": 0,
   "query": {
      "match_all": {}
   },
   "aggregations": {
      "idNodes": {
         "terms": {
            "field": "id",
            "size": 5,
            "order": {
               "weightedAvgVis": "asc"
            }
         },
         "aggregations": {
            "weightedAvgVis": {
               "scripted_metric": {
                  "init_script": "_agg['weightedSum'] = 0d; _agg['countSum'] = 0L;",
                  "map_script": "_agg['weightedSum'] = _agg.weightedSum + _source['avgVis'] * _source['count']; _agg['countSum'] = _agg.countSum + _source['count'];",
                  "reduce_script": "weightedSum = 0d; countSum = 0L; for(a in _aggs) {weightedSum += a.weightedSum; countSum += a.countSum;}; if(countSum == 0L) {return 0d;} else {return weightedSum / countSum;}"
               }
            }
         }
      }
   }
}
@eryabitskiy eryabitskiy changed the title Bucket order by scripted_metric sub aggregation Order by scripted_metric sub aggregation Nov 14, 2014
@colings86
Contributor

@eryabitskiy this is something that I have been thinking about, but it requires a few outstanding issues to be resolved first, specifically #8421 and #8434. These would allow us to specify much more powerful order paths, and the getProperty method on the scripted metric aggregation could be used to retrieve arbitrary properties of the script's results.

@snikch
Contributor

snikch commented Dec 4, 2014

Hi there, I'm not sure if this is appropriate, but I thought you might want to gauge interest. We're very keen to see this as well. Currently we end up doing a lot of sorting on oversized result sets in Go, whereas being able to sort on scripted metrics would save us this hassle.

@vb3

vb3 commented May 8, 2015

+1 This would be great if solved!

@felipegs

felipegs commented May 8, 2015

+1

1 similar comment
@concordiadiscors

+1

@rmruano

rmruano commented May 12, 2015

👍 Sorting on scripted metric aggregations would be a killer feature. In the meantime we're also doing client-side sorting on oversized results.

@hendrik-schumacher

+1

@aaneja

aaneja commented May 18, 2015

+1

2 similar comments
@edigu

edigu commented May 20, 2015

+1

@brianstebar2

+1

@tekwiz

tekwiz commented Jun 15, 2015

+1 Aside from the other use-cases mentioned here, this feature would give the Kibana product some major strength, particularly for rapid prototyping.

@genme

genme commented Jun 26, 2015

+1

1 similar comment
@adimasuhid

+1

@eryabitskiy
Author

@colings86

Thank you for your answer!

I understand that there is always a trade-off between performance and accuracy, especially in an FTS engine. But if you fetch all buckets from each shard, you can always reach 100% accuracy.

And I suspect that most folks would actually be OK with all buckets being returned from every shard for the sake of 100% accuracy, at the cost of performance and memory consumption (we could run a vote to check). We already do this on the Java server side anyway.

Also, you could always provide a protective max-size setting that triggers an error when such a query consumes too much memory.

Could you consider such a feature?

@bpolaszek

+1.

For my use case, I actually need to fetch all buckets, then sort and slice them client-side.
This not only hurts performance, it also hurts the developer experience.

@clintongormley

And I suspect that most folks would actually be OK with all buckets being returned from every shard for the sake of 100% accuracy, at the cost of performance and memory consumption (we could run a vote to check).

We only add features to Elasticsearch which are horizontally scalable. Whatever we add should work when you're running one node on your laptop with 50GB of data or 1000 nodes in your data center with 50 PB of data.

Fetching all terms from all shards does not scale horizontally, and so we will not add it.

We already do this on the Java server side anyway.

Exactly. This is a problem that should be solved client side instead (where you know the limits of your data and how much you will need to scale).

@eryabitskiy
Author

I see...

Then I can mention one horizontally scalable use case: if each bucket is fully contained within one shard (by using the terms field as the routing value as well), you can fetch only the top buckets from each shard and still get accurate results. Unfortunately, it only works together with proper routing.
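
For illustration, a minimal sketch of that idea (the index name, document and routing value here are made up): index each document with its routing set to the same field the terms aggregation later groups on, so that every bucket lives entirely on one shard.

PUT /visits/_doc/1?routing=device-42
{
  "id": "device-42",
  "avgVis": 3.2,
  "count": 17
}

With all documents for a given id on the same shard, the shard-local top buckets for that field are already exact.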

@anhzhi

anhzhi commented Sep 19, 2017

@colings86

In order to be able to use an aggregation for sorting the terms aggregation it must currently be a numeric metric aggregation. Pipeline Aggregations are a different family of aggregations than the Metric Aggregations and cannot be used for sorting. You couldn't sort using a Pipeline aggregation because they are only executed in the reduce phase on the coordinating node and so you do not have the information on the shard in order to be able to sort the shards buckets to pick the top N to send to the coordinating node.

Is there any plan to implement this feature?
Is there any plan to remove the restriction to numeric metric aggregations?
Is there any plan to support pipeline aggregation metrics at the node or shard level?

In my opinion, aggregations that perform calculations like sum/min/max/avg/count etc. are all metrics, even if the result is not in numeric format. For example:
{ "first_student":{ "min": { "field": "first_name" } } }
It would help a lot if Elasticsearch supported different types of metrics more widely. That would make it more powerful than traditional SQL data-analysis tools.

@bpolaszek

I'm currently moving my analytics back to an SQL platform because of this.
A simple and frequent use case: compute progressions between aggregated metrics (e.g. CTR: +15%) and sort by progression. This is impossible to do with ES without storing the progression first.

@anhzhi

anhzhi commented Sep 19, 2017

@colings86

Currently, I will try to translate the various kinds of bucket_script metrics into map/reduce scripts in a general way, so that ordering on these metrics can be supported.
I think this requires a large amount of repeated calculation, which will hurt performance and efficiency.
Another solution is for the client side to execute the ordering phase. This requires retrieving huge result sets from Elasticsearch's aggregations (otherwise the ordering will be inaccurate).
Considering the amount of data to transfer and the lack of distributed-calculation support on the client side, this solution would be even less effective.

@bpolaszek

Another solution is for the client side to execute the ordering phase. This requires retrieving huge result sets from Elasticsearch's aggregations (otherwise the ordering will be inaccurate).

This requires knowing how many values the aggregation will return. You cannot ask ES to "return all buckets" for an aggregation; you have to specify an arbitrary limit, which makes no sense and kills performance.

@colings86
Contributor

@anhzhi I'm having trouble finding the text that you quoted for context (sorry if I've missed it in the comments above), could you paste a link to the comment or documentation you saw it in? The statement is, however, very true.

Is there any plan to implement this feature?

If you are talking about being able to sort the terms aggregation by a pipeline aggregation then I don't see how this would be possible to implement. The terms aggregation needs to sort the buckets collected on the shard so it can return only the shard_size number of them to the coordinating node. If it did not do this sorting on the shard it would have to return all the buckets it collected to the coordinating node which would not scale since the number of buckets can be very large for high cardinality fields.

So, given the fact that we have to sort on the shard the only other solution would be to 'push down' the calculation of pipeline aggregations onto the shards. But this would not work either since pipeline aggregations at their heart compare the final results of different aggregations (think derivative where you are comparing the final values in consecutive buckets, or dividing two sum aggregations). On the shard the final result is not known because that result depends on the results from all the shards not just one, so if you calculated the value just based on the information on that shard the result would almost certainly be wrong and worse it could be wildly affected by the results of the other shards so would not even be a good estimate of the final value.

So pushing down the calculation of pipeline aggregation onto the shards doesn't help with sorting the terms aggregation by pipeline aggregations either.

If you were instead referring to sorting the terms aggregation by scripted_metric aggregations, that was discussed in #8486 (comment): we decided we would rather not add more cases where the error in the terms aggregation is unbounded, and would instead like to try to solve the specific use cases for the scripted_metric aggregation.

Is there any plan to remove the restriction to numeric metric aggregations?

There are no plans at this time. We need a value to compare the buckets in order to sort them, and a numeric value gives us something that can always be compared without complicating the API. Arbitrary objects, on the other hand, do not compare naturally, so the API would need to become more complex and harder to use to support this.

Is there any plan to support pipeline aggregation metrics at the node or shard level?

See my comments on your first question above.

In my opinion, aggregations that perform calculations like sum/min/max/avg/count etc. are all metrics, even if the result is not in numeric format. For example:
{ "first_student":{ "min": { "field": "first_name" } } }

I'm not sure I understand this, since the output of the min aggregation is a single double value. For the REST API we do wrap this up in an object so you can navigate to it in the response, but for the purposes of comparing when sorting it is a single double value, and importantly we know that it will always be a single double value, so we can validate it in the request before we execute the request.

Another solution is for the client side to execute the ordering phase. This requires retrieving huge result sets from Elasticsearch's aggregations (otherwise the ordering will be inaccurate).
Considering the amount of data to transfer and the lack of distributed-calculation support on the client side, this solution would be even less effective.

Unfortunately this is also true for Elasticsearch itself, for the reasons I outlined in answering your first question: we need the information from all shards to be able to calculate the results of the pipeline aggregations in order to be able to do the sorting.

I would love to find a solution to this but I don't see how we could do so at this stage in a reliable, scalable way within the constraints of sharding.

@anhzhi

anhzhi commented Sep 20, 2017

@colings86 The quoted text is from
https://github.com/elastic/elasticsearch/issues/14101 or
https://discuss.elastic.co/t/ordering-terms-aggregation-based-on-pipeline-metric/31839/7
I am referring to both aspects you mentioned:

  • sorting the terms aggregation by a pipeline aggregation, such as bucket_script
  • sorting the terms aggregation by a scripted_metric aggregation,
    with an example as follows:
{
   "query": {
      "bool": {
         "filter": [
            {
               "query_string": {
                  "query": "*",
                  "analyze_wildcard": true
               }
            }
         ],
         "must": [
            {
               "range": {
                  "@timestamp": {
                     "to": 1505804143152,
                     "from": 1505800543152
                  }
               }
            }
         ]
      }
   },
   "from": 0,
   "aggs": {
      "http.client_ip.topn": {
         "terms": {
            "field": "http.client_ip",
            "order":[{
               # only normal metric aggregation supported
               # while script metric aggregation or bucket-script pipeline aggregation not supported
                "sum_bytes_total": "desc"
            }],
            "size": 10
         },
         "aggs": {
            "sum_bytes_total": {
               "sum": {
                  "script": "doc['http.bytes_in'].value+doc['http.bytes_out'].value"
               }
            },
            "bucket_script_metric": {
               "bucket_script": {
                  "buckets_path": {
                     "sum_bytes_total": "sum_bytes_total"
                  },
                  "script": {
                     "lang": "groovy",
                     "inline": "sum_bytes_total"
                  }
               }
            },
            "mr_script_metric": {
               "scripted_metric": {
                  "init_script": "params._agg.bytes_total = []",
                  "map_script": "params._agg.bytes_total.add(doc['http.bytes_in'].value+doc['http.bytes_out'].value)",
                  "combine_script": "double bytes_total = 0; for (item in params._agg.bytes_total) { bytes_total += item } return bytes_total",
                  "reduce_script": "double bytes_total = 0; for (a in params._aggs) { bytes_total += a } return bytes_total"
               }
            }
         }
      }
   },
   "size": 0
}

Now I get what you are saying. I think it needs further improvements to Elasticsearch's distributed-calculation algorithms and frameworks.
I'm still eagerly looking forward to your new versions, new innovations and new solutions for big-data analysis.

@tberne

tberne commented Dec 21, 2017

@anhzhi, totally agreed. Distributed calculation is a big miss for any big-data analytics. At this exact moment I need to do such a thing, but I was disappointed when I found and read this thread.

@bpolaszek

@tberne @anhzhi I have moved away from Elasticsearch because of this: I needed to sort on scripted metrics and create range facets. Since ES was unable to provide this, I used to retrieve all documents and do that logic on the PHP side, which was completely counterproductive and killed performance.

Last week I delivered a new version of this application, which I had been working on for several months, and the storage engine is now based on MariaDB ColumnStore (an open-source SQL engine that stores data in columns instead of rows). Performance is not as good as Elasticsearch's (and INSERT/UPDATE/DELETE take ages), but at least creating scripted metrics and sorting on them (SELECT SUM(bytes_in + bytes_out) AS sum_bytes_total [...] ORDER BY sum_bytes_total DESC) is a piece of cake, and performance is acceptable (queries take around 150 ms on tables with tens of millions of rows). Of course you don't get facets (aggregations) out of the box and you have to build them yourself, which is more complicated than in ES, but at least it works, and all the logic is done within the storage engine; no more tricks to work around an unsupported behavior.

I formerly thought Elasticsearch was the right choice as the main engine of a BI application, but I was wrong: Elasticsearch is great for full text, not for analytics.

@tberne

tberne commented Dec 22, 2017

I formerly thought Elasticsearch was the right choice as the main engine of a BI application, but I was wrong: Elasticsearch is great for full text, not for analytics.

I couldn't agree more. ES is just perfect when the task is full-text search. Even for simple aggregation scenarios it is a killer tool. But when we start doing serious analysis, it lacks some major things.

I don't know yet, but I think I will keep ES as big-data storage and perhaps introduce a stream processor (like Apache Flink) to consolidate some data back into ES.

@Vineeth-Mohan

Vineeth-Mohan commented Dec 24, 2017

@bpolaszek - Can't you get the conversion rate per referer per ad using the script support in the sum aggregation, and then order the referers by that sum?
Something like the following is what I have in mind:

{
   "aggs": {
      "referers": {
         "terms": {
            "field": "referer",
            "order": { "convertionSum": "desc" }
         },
         "aggs": {
            "convertionSum": {
               "sum": { "script": "doc['clicks'].value / doc['views'].value" }
            }
         }
      }
   }
}

@clintongormley

There's a solution with ES that still requires client-side sorting, but it should be more efficient for retrieving all the results you need in order to sort them: try the composite aggregation, which allows you to retrieve all buckets (with pagination), and then do a merge sort client side.
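
For illustration, a rough sketch of that approach against the weighted-average example from the opening post (the field names are assumptions carried over from that example, and the scripts are ported to the newer state/states syntax of scripted_metric):

{
  "size": 0,
  "aggs": {
    "idNodes": {
      "composite": {
        "size": 1000,
        "sources": [
          { "id": { "terms": { "field": "id" } } }
        ]
      },
      "aggs": {
        "weightedAvgVis": {
          "scripted_metric": {
            "init_script": "state.weightedSum = 0.0; state.countSum = 0L",
            "map_script": "state.weightedSum += doc['avgVis'].value * doc['count'].value; state.countSum += doc['count'].value",
            "combine_script": "return state",
            "reduce_script": "double ws = 0; long cs = 0; for (s in states) { ws += s.weightedSum; cs += s.countSum } return cs == 0 ? 0 : ws / cs"
          }
        }
      }
    }
  }
}

Each response carries an after_key; passing it back as "after" inside the composite object pages through all buckets, after which the client merges the pages, sorts them by weightedAvgVis.value and keeps the top 5.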

@rathko

rathko commented Jan 18, 2019

Very interesting discussion! I do wonder how Solr solves this efficiently.

Are there any new features that would allow this, without retrieving all results and sorting client side? Our use case is similar: calculate CTR (clicks / impressions) on aggregated fields and sort the results by the highest values.

@bpolaszek

Very interesting discussion! I do wonder how Solr solves this efficiently.

Are there any new features that would allow this, without retrieving all results and sorting client side? Our use case is similar: calculate CTR (clicks / impressions) on aggregated fields and sort the results by the highest values.

AFAIK Solr did not solve this either.

@rathko

rathko commented Jan 21, 2019

Thanks @bpolaszek

It appears that in our case we can shard/route the data, which means that the data for every search query will always live on the same server and there would be no need to shuffle it around the cluster.

We are considering writing a custom plugin to allow us to sort on calculated fields. Any suggestions on how this could be achieved, given the routing setup described above? I had a look at the existing plugins but couldn't find anything similar.

@pplant

pplant commented Feb 15, 2019

+1

@xiaojiu01

+1. Please add support for this.

@damianoporta

Are there any workarounds for the moment?

@hendrikmuhs
Contributor

Because there is still some activity on this issue, I'd like to mention a solution:

We added transforms in 7.2, which you can use to group data, aggregate it and write the result back into a secondary index. They provide a lot of freedom, e.g. scripted metrics. At least the original ask of the thread starter can be solved using a transform: in the first step you group the data and apply the custom aggregation (scripted metric); afterwards the destination index can be queried to get the top-n results.

This solution is scalable and works with large amounts of data. Starting with 7.3 a transform can run in continuous mode and keep the destination updated.
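
A rough sketch of what such a transform could look like for the weighted-average example from the opening post (the index names and fields are assumptions; the endpoint shown is the _transform API of 7.5+, while 7.2–7.4 exposed the same feature under _data_frame/transforms):

PUT _transform/weighted-avg-vis
{
  "source": { "index": "visits" },
  "dest": { "index": "visits_weighted_avg" },
  "pivot": {
    "group_by": {
      "id": { "terms": { "field": "id" } }
    },
    "aggregations": {
      "weightedAvgVis": {
        "scripted_metric": {
          "init_script": "state.weightedSum = 0.0; state.countSum = 0L",
          "map_script": "state.weightedSum += doc['avgVis'].value * doc['count'].value; state.countSum += doc['count'].value",
          "combine_script": "return state",
          "reduce_script": "double ws = 0; long cs = 0; for (s in states) { ws += s.weightedSum; cs += s.countSum } return cs == 0 ? 0 : ws / cs"
        }
      }
    }
  }
}

Once the transform has been started (POST _transform/weighted-avg-vis/_start) and has processed the source, the destination index holds one document per id with a weightedAvgVis field, so a plain search sorted on weightedAvgVis with size 5 returns the top buckets.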

@bpolaszek

Sounds cool! Do you have an example query? (I'm no longer using ES, but still curious 😀)

@AbbassFaytaroony

@colings86 Although this issue is very old, I ran into the same limitation,

but I was able to solve it (apparently) using the bucket_sort aggregation.
My query:

{
  "aggs": {
    "by-event-name": {
      "terms": {
        "field": "event_name.keyword"
      },
      "aggs": {
        "percentages_calc": {
          "scripted_metric": {
            "init_script": "state.errors = 0; state.t = 0;",
            "map_script": "if (doc['event_outcome.keyword'].value == 'ERROR') { state.errors += 1;state.t += 1} state.t += 1",
            "combine_script": "return state",
            "reduce_script": "double failureT = 0;double total = 0; for (s in states) {failureT += s.errors; total += s.t} return (failureT/total) * 100;"
          }
        },
        "sort_by_percentage": {
          "bucket_sort": {
            "sort": [
              {
                "percentages_calc.value": {
                  "order": "desc"
                }
              }
            ]
          }
        }
      }
    }
  },
  "size": 0
}

What am I missing?
It seems weird that it works, since reading this page suggests it is not possible.

@syntax42

syntax42 commented Feb 8, 2021

@AbbassFaytaroony Can confirm. The method of using "bucket_sort" solved the problem for me too.

@Stormtv

Stormtv commented Mar 2, 2021

@syntax42 @AbbassFaytaroony
Ran into an issue with bucket_sort: it executes after the terms aggregation has already selected its buckets, so unless you set the terms size to an extremely large value, the result will not be properly sorted across all entries.
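
To make that concrete, a sketch of the mitigation (10000 is an arbitrary value that has to exceed the number of distinct event names, and this only stays practical while the total number of buckets is manageable): inflate the terms size so that effectively all buckets reach the reduce phase, then let bucket_sort truncate to the top 10:

{
  "size": 0,
  "aggs": {
    "by-event-name": {
      "terms": {
        "field": "event_name.keyword",
        "size": 10000
      },
      "aggs": {
        "percentages_calc": {
          "scripted_metric": {
            "init_script": "state.errors = 0; state.t = 0;",
            "map_script": "if (doc['event_outcome.keyword'].value == 'ERROR') { state.errors += 1 } state.t += 1",
            "combine_script": "return state",
            "reduce_script": "double failureT = 0; double total = 0; for (s in states) { failureT += s.errors; total += s.t } return (failureT/total) * 100;"
          }
        },
        "sort_by_percentage": {
          "bucket_sort": {
            "sort": [
              { "percentages_calc.value": { "order": "desc" } }
            ],
            "size": 10
          }
        }
      }
    }
  }
}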

@syntax42

syntax42 commented Mar 2, 2021

@Stormtv Yes, I found that out too :-(

@igold94

igold94 commented Jun 18, 2021

It appears that you can now sort by numeric scripted_metrics but cannot sort by strings despite the fact that they are simple types and a viable return value. Does anyone have any insight on that?
buckets_path must reference either a number value or a single value numeric metric aggregation, got: [String] at aggregation

@bogdanul2003

@AbbassFaytaroony do you know if it is possible for the script to return a map and then to specify which key to sort on?
For example, let's say percentages_calc.value = {"total": 1111, "othertotal": 2222} and I want to sort by the value of the key "total". The values are all numbers.

I tried something like
"sort_by_percentage": { "bucket_sort": { "sort": [ { "percentages_calc.value['total']": { "order": "desc" } } ] } }

but I'm getting a request validation error: Validation Failed: 1: No aggregation found for path percentages_calc.value['total']
