/
AggregationPath.java
361 lines (319 loc) · 16.9 KB
/
AggregationPath.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.support;
import org.elasticsearch.common.Strings;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.HasAggregations;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
import java.util.ArrayList;
import java.util.List;
/**
* A path that can be used to sort/order buckets (in some multi-bucket aggregations, e.g. terms and histogram) based on
* sub-aggregations. The path may point to either a single-bucket aggregation or a metrics aggregation. If the path
* points to a single-bucket aggregation, the sort will be applied based on the {@code doc_count} of the bucket. If the
* path points to a single-value metrics aggregation (e.g. avg, max, min, etc.), the sort will be applied on that
* single value. If it points to a multi-value metrics aggregation, the path must specify which metric should be used
* as the sort-by value.
* <p>
* The path has the following form:
* <center>{@code <aggregation_name>['>'<aggregation_name>*]['.'<metric_name>]}</center>
* <p>
* Examples:
*
* <ul>
* <li>
* {@code agg1>agg2>agg3} - where agg1, agg2 and agg3 are all single-bucket aggs (e.g. filter, nested, missing, etc.). In
* this case, the order will be based on the number of documents under {@code agg3}.
* </li>
* <li>
* {@code agg1>agg2>agg3} - where agg1 and agg2 are both single-bucket aggs and agg3 is a single-value metrics agg (e.g. avg, max, min, etc.).
* In this case, the order will be based on the value of {@code agg3}.
* </li>
* <li>
* {@code agg1>agg2>agg3.avg} - where agg1 and agg2 are both single-bucket aggs and agg3 is a multi-value metrics agg (e.g. stats, extended_stats, etc.).
* In this case, the order will be based on the avg value of {@code agg3}.
* </li>
* </ul>
*
*/
public class AggregationPath {
/** Delimiter placed between the individual elements of a path. */
private static final String AGG_DELIM = ">";

/**
 * Parses the textual form of a path into an {@link AggregationPath}.
 * <p>
 * The path is tokenized on {@code >}. Every element may carry a key in bracket form
 * ({@code name[key]}); the last element may alternatively carry it in dot form
 * ({@code name.key}). Elements without either form get a {@code null} key.
 *
 * @param path the textual path to parse
 * @return the parsed path
 * @throws AggregationExecutionException if an element has a malformed bracket or dot section
 */
public static AggregationPath parse(String path) {
    final String[] elements = Strings.tokenizeToStringArray(path, AGG_DELIM);
    final List<PathElement> tokens = new ArrayList<>(elements.length);
    final int lastPosition = elements.length - 1;

    for (int i = 0; i < elements.length; i++) {
        final String element = elements[i];

        // Bracket form "name[key]" is accepted at every position of the path.
        final int bracket = element.lastIndexOf('[');
        if (bracket >= 0) {
            // The name must be non-empty and there must be room for at least one key character plus ']'.
            final boolean misplaced = bracket == 0 || bracket > element.length() - 3;
            if (misplaced || element.charAt(element.length() - 1) != ']') {
                throw new AggregationExecutionException("Invalid path element [" + element + "] in path [" + path + "]");
            }
            final String name = element.substring(0, bracket);
            final String key = element.substring(bracket + 1, element.length() - 1);
            tokens.add(new PathElement(element, name, key));
            continue;
        }

        // Dot form "name.key" is only looked for on the last element of the path.
        final int dot = (i == lastPosition) ? element.lastIndexOf('.') : -1;
        if (dot < 0) {
            tokens.add(new PathElement(element, element, null));
            continue;
        }
        // Neither the name before the dot nor the key after it may be empty.
        if (dot == 0 || dot > element.length() - 2) {
            throw new AggregationExecutionException("Invalid path element [" + element + "] in path [" + path + "]");
        }
        final String[] nameAndKey = split(element, dot, new String[2]);
        tokens.add(new PathElement(element, nameAndKey[0], nameAndKey[1]));
    }
    return new AggregationPath(tokens);
}
/**
 * One element of an aggregation path: a name plus an optional key.
 * <p>
 * Equality and hashing consider only {@link #name} and {@link #key}; the full textual form the
 * element was created with is kept solely for {@link #toString()}.
 */
public static class PathElement {

    /** The element exactly as it should be rendered by {@link #toString()}. */
    private final String fullName;

    /** The name part of the element. */
    public final String name;

    /** The key part of the element, or {@code null} when none was given. */
    public final String key;

    /**
     * @param fullName the textual form returned by {@link #toString()}
     * @param name     the name part of the element
     * @param key      the key part of the element, may be {@code null}
     */
    public PathElement(String fullName, String name, String key) {
        this.fullName = fullName;
        this.name = name;
        this.key = key;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        final PathElement that = (PathElement) o;
        // Keys are compared first (null-safe), then the names.
        final boolean sameKey = (key == null) ? that.key == null : key.equals(that.key);
        return sameKey && name.equals(that.name);
    }

    @Override
    public int hashCode() {
        final int nameHash = name.hashCode();
        final int keyHash = (key == null) ? 0 : key.hashCode();
        return 31 * nameHash + keyHash;
    }

    @Override
    public String toString() {
        return fullName;
    }
}
/** The ordered elements of this path; never {@code null} or empty once construction succeeds. */
private final List<PathElement> pathElements;

/**
 * Creates a path from already-parsed elements. The given list is stored as-is (it is not copied).
 *
 * @param tokens the elements of the path, in order
 * @throws IllegalArgumentException if {@code tokens} is {@code null} or empty
 */
public AggregationPath(List<PathElement> tokens) {
    this.pathElements = tokens;
    if (tokens == null) {
        // The message must not be built from "this": toString() dereferences the element list
        // and would fail with a NullPointerException instead of reporting the invalid argument.
        throw new IllegalArgumentException("Invalid path [null]");
    }
    if (tokens.isEmpty()) {
        throw new IllegalArgumentException("Invalid path [" + this + "]");
    }
}
/**
 * Renders the path by joining the string form of each element with the path delimiter.
 */
@Override
public String toString() {
    final Object[] parts = pathElements.toArray();
    return Strings.arrayToDelimitedString(parts, AGG_DELIM);
}
/**
 * @return the final element of this path
 */
public PathElement lastPathElement() {
    final int lastIndex = pathElements.size() - 1;
    return pathElements.get(lastIndex);
}
/**
 * @return the elements of this path, in order; this is the internal list itself, not a copy
 */
public List<PathElement> getPathElements() {
    return pathElements;
}
/**
 * Flattens this path into a list of strings: for each element its name, immediately followed
 * by its key when the element has one.
 *
 * @return a new list holding the names and keys of all elements, in path order
 */
public List<String> getPathElementsAsStringList() {
    final List<String> flattened = new ArrayList<>();
    for (PathElement element : pathElements) {
        flattened.add(element.name);
        final String key = element.key;
        if (key == null) {
            continue;
        }
        flattened.add(key);
    }
    return flattened;
}
/**
 * Creates a new path from a contiguous run of this path's elements.
 *
 * @param offset index of the first element to include
 * @param length number of elements to include
 * @return a new path backed by its own copy of the selected elements
 */
public AggregationPath subPath(int offset, int length) {
    final int end = offset + length;
    final List<PathElement> slice = pathElements.subList(offset, end);
    return new AggregationPath(new ArrayList<>(slice));
}
/**
 * Resolves the value pointed by this path given an aggregations root.
 * <p>
 * Each element is looked up by name under the current parent. Single-bucket aggregations
 * contribute their document count and become the parent for the next element. A numeric
 * metrics aggregation is only accepted as the last element: a single-value metric yields its
 * value, a multi-value metric yields the value named by the element's key.
 *
 * @param root The root that serves as a point of reference for this path
 * @return The resolved value
 * @throws IllegalArgumentException if an element cannot be found, refers to an aggregation that
 *         is neither a single-bucket nor a numeric metrics aggregation, places a metrics
 *         aggregation before the end of the path, or carries a missing/unsupported key
 */
public double resolveValue(HasAggregations root) {
    HasAggregations parent = root;
    double value = Double.NaN;
    for (int i = 0; i < pathElements.size(); i++) {
        AggregationPath.PathElement token = pathElements.get(i);
        Aggregation agg = parent.getAggregations().get(token.name);

        if (agg == null) {
            throw new IllegalArgumentException("Invalid order path [" + this +
                    "]. Cannot find aggregation named [" + token.name + "]");
        }

        if (agg instanceof SingleBucketAggregation) {
            if (token.key != null && !token.key.equals("doc_count")) {
                throw new IllegalArgumentException("Invalid order path [" + this +
                        "]. Unknown value key [" + token.key + "] for single-bucket aggregation [" + token.name +
                        "]. Either use [doc_count] as key or drop the key all together");
            }
            // Descend into the bucket; its doc count is the value unless a later element overrides it.
            parent = (SingleBucketAggregation) agg;
            value = ((SingleBucketAggregation) agg).getDocCount();
            continue;
        }

        // Anything else must be a numeric metrics aggregation. Check explicitly rather than
        // relying on the casts below, which would otherwise fail with a ClassCastException.
        final boolean singleValue = agg instanceof InternalNumericMetricsAggregation.SingleValue;
        final boolean multiValue = agg instanceof InternalNumericMetricsAggregation.MultiValue;
        if (!singleValue && !multiValue) {
            throw new IllegalArgumentException("Invalid order path [" + this +
                    "]. Aggregation [" + token.name + "] is neither a single-bucket nor a numeric metrics aggregation");
        }

        // a metrics agg must be at the end of the path
        if (i != pathElements.size() - 1) {
            throw new IllegalArgumentException("Invalid order path [" + this +
                    "]. Metrics aggregations cannot have sub-aggregations (at [" + token + ">" + pathElements.get(i + 1) + "])");
        }

        if (singleValue) {
            if (token.key != null && !token.key.equals("value")) {
                throw new IllegalArgumentException("Invalid order path [" + this +
                        "]. Unknown value key [" + token.key + "] for single-value metric aggregation [" + token.name +
                        "]. Either use [value] as key or drop the key all together");
            }
            parent = null;
            value = ((InternalNumericMetricsAggregation.SingleValue) agg).value();
            continue;
        }

        // we're left with a multi-value metric agg
        if (token.key == null) {
            throw new IllegalArgumentException("Invalid order path [" + this +
                    "]. Missing value key in [" + token + "] which refers to a multi-value metric aggregation");
        }
        parent = null;
        value = ((InternalNumericMetricsAggregation.MultiValue) agg).value(token.key);
    }

    return value;
}
/**
 * Resolves the aggregator pointed by this path using the given root as a point of reference.
 * <p>
 * Walks the path one element at a time through {@code subAggregator(name)}. No validation is
 * performed here beyond an assertion on the aggregator types; the assertion message indicates
 * that invalid paths are expected to have been rejected by validation beforehand.
 *
 * @param root The point of reference of this path
 * @return The aggregator pointed by this path starting from the given aggregator as a point of reference
 */
public Aggregator resolveAggregator(Aggregator root) {
    Aggregator current = root;
    for (int position = 0; position < pathElements.size(); position++) {
        final String childName = pathElements.get(position).name;
        current = current.subAggregator(childName);
        assert (current instanceof SingleBucketAggregator && position <= pathElements.size() - 1)
                || (current instanceof NumericMetricsAggregator && position == pathElements.size() - 1) :
                "this should be picked up before aggregation execution - on validate";
    }
    return current;
}
/**
 * Resolves the topmost aggregator pointed by this path using the given root as a point of reference.
 * <p>
 * Only the first element of the path is consulted.
 *
 * @param root The point of reference of this path
 * @return The first child aggregator of the root pointed by this path
 */
public Aggregator resolveTopmostAggregator(Aggregator root) {
    final String topName = pathElements.get(0).name;
    final Aggregator topmost = root.subAggregator(topName);
    assert (topmost instanceof SingleBucketAggregator)
            || (topmost instanceof NumericMetricsAggregator) : "this should be picked up before aggregation execution - on validate";
    return topmost;
}
/**
 * Validates this path over the given aggregator as a point of reference.
 * <p>
 * Every element must resolve to a sub-aggregator. All elements before the last must be
 * single-bucket aggregators and must not carry a key. The last element must be either a
 * single-bucket aggregator (key absent or {@code doc_count}), a single-value numeric metrics
 * aggregator (key absent or {@code value}), or a multi-value numeric metrics aggregator with a
 * key naming one of its metrics.
 *
 * @param root The point of reference of this path
 * @throws AggregationExecutionException on validation error
 */
public void validate(Aggregator root) throws AggregationExecutionException {
    Aggregator aggregator = root;
    for (int i = 0; i < pathElements.size(); i++) {
        aggregator = aggregator.subAggregator(pathElements.get(i).name);
        if (aggregator == null) {
            throw new AggregationExecutionException("Invalid aggregator order path [" + this + "]. Unknown aggregation ["
                    + pathElements.get(i).name + "]");
        }
        if (i < pathElements.size() - 1) {

            // we're in the middle of the path, so the aggregator can only be a single-bucket aggregator

            if (!(aggregator instanceof SingleBucketAggregator)) {
                throw new AggregationExecutionException("Invalid aggregation order path [" + this +
                        "]. Buckets can only be sorted on a sub-aggregator path " +
                        "that is built out of zero or more single-bucket aggregations within the path and a final " +
                        "single-bucket or a metrics aggregation at the path end. Sub-path [" +
                        subPath(0, i + 1) + "] points to non single-bucket aggregation");
            }

            // a key is only meaningful on the last element; report that precisely instead of
            // claiming the sub-path points to a non single-bucket aggregation
            if (pathElements.get(i).key != null) {
                throw new AggregationExecutionException("Invalid aggregation order path [" + this +
                        "]. Buckets can only be sorted on a sub-aggregator path " +
                        "that is built out of zero or more single-bucket aggregations within the path and a " +
                        "final single-bucket or a metrics aggregation at the path end. Sub-path [" +
                        subPath(0, i + 1) + "] specifies a key [" + pathElements.get(i).key +
                        "], but a key is only allowed on the last element of the path");
            }
        }
    }
    boolean singleBucket = aggregator instanceof SingleBucketAggregator;
    if (!singleBucket && !(aggregator instanceof NumericMetricsAggregator)) {
        throw new AggregationExecutionException("Invalid aggregation order path [" + this +
                "]. Buckets can only be sorted on a sub-aggregator path " +
                "that is built out of zero or more single-bucket aggregations within the path and a final " +
                "single-bucket or a metrics aggregation at the path end.");
    }

    AggregationPath.PathElement lastToken = lastPathElement();

    if (singleBucket) {
        if (lastToken.key != null && !"doc_count".equals(lastToken.key)) {
            throw new AggregationExecutionException("Invalid aggregation order path [" + this +
                    "]. Ordering on a single-bucket aggregation can only be done on its doc_count. " +
                    "Either drop the key (a la \"" + lastToken.name + "\") or change it to \"doc_count\" (a la \"" + lastToken.name + ".doc_count\")");
        }
        return;   // perfectly valid to sort on single-bucket aggregation (will be sorted on its doc_count)
    }

    if (aggregator instanceof NumericMetricsAggregator.SingleValue) {
        if (lastToken.key != null && !"value".equals(lastToken.key)) {
            throw new AggregationExecutionException("Invalid aggregation order path [" + this +
                    "]. Ordering on a single-value metrics aggregation can only be done on its value. " +
                    "Either drop the key (a la \"" + lastToken.name + "\") or change it to \"value\" (a la \"" + lastToken.name + ".value\")");
        }
        return;   // perfectly valid to sort on single metric aggregation (will be sorted on its associated value)
    }

    // the aggregator must be of a multi-value metrics type
    if (lastToken.key == null) {
        throw new AggregationExecutionException("Invalid aggregation order path [" + this +
                "]. When ordering on a multi-value metrics aggregation a metric name must be specified");
    }

    if (!((NumericMetricsAggregator.MultiValue) aggregator).hasMetric(lastToken.key)) {
        throw new AggregationExecutionException("Invalid aggregation order path [" + this +
                "]. Unknown metric name [" + lastToken.key + "] on multi-value metrics aggregation [" + lastToken.name + "]");
    }
}
/**
 * Splits a string around the character at {@code index}, dropping that character.
 *
 * @param toSplit the string to split
 * @param index   position of the separator character
 * @param result  array receiving the part before the separator at slot 0 and the part after it at slot 1
 * @return the {@code result} array that was passed in
 */
private static String[] split(String toSplit, int index, String[] result) {
    result[0] = toSplit.substring(0, index);
    final int tailStart = index + 1;
    result[1] = toSplit.substring(tailStart, toSplit.length());
    return result;
}
}