-
Notifications
You must be signed in to change notification settings - Fork 24.2k
/
TransportRollupSearchAction.java
489 lines (435 loc) · 25.4 KB
/
TransportRollupSearchAction.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.rollup.action;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.BoostingQueryBuilder;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.DisMaxQueryBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.rollup.RollupField;
import org.elasticsearch.xpack.core.rollup.action.RollupJobCaps;
import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig;
import org.elasticsearch.xpack.rollup.Rollup;
import org.elasticsearch.xpack.rollup.RollupJobIdentifierUtils;
import org.elasticsearch.xpack.rollup.RollupRequestTranslator;
import org.elasticsearch.xpack.rollup.RollupResponseTranslator;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
public class TransportRollupSearchAction extends TransportAction<SearchRequest, SearchResponse> {
private final Client client;
private final NamedWriteableRegistry registry;
private final BigArrays bigArrays;
private final ScriptService scriptService;
private final ClusterService clusterService;
private static final Logger logger = Loggers.getLogger(RollupSearchAction.class);
@Inject
public TransportRollupSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Client client, NamedWriteableRegistry registry, BigArrays bigArrays,
ScriptService scriptService, ClusterService clusterService) {
super(settings, RollupSearchAction.NAME, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
this.client = client;
this.registry = registry;
this.bigArrays = bigArrays;
this.scriptService = scriptService;
this.clusterService = clusterService;
// Note: in master this uses Writeable interface to get search request
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, false, true, in -> {
SearchRequest request = new SearchRequest();
request.readFrom(in);
return request;
},
new TransportRollupSearchAction.TransportHandler());
}
@Override
protected void doExecute(SearchRequest request, ActionListener<SearchResponse> listener) {
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(clusterService.getSettings());
String[] indices = resolver.concreteIndexNames(clusterService.state(), request.indicesOptions(), request.indices());
RollupSearchContext rollupSearchContext = separateIndices(indices, clusterService.state().getMetaData().indices());
MultiSearchRequest msearch = createMSearchRequest(request, registry, rollupSearchContext);
client.multiSearch(msearch, ActionListener.wrap(msearchResponse -> {
InternalAggregation.ReduceContext context
= new InternalAggregation.ReduceContext(bigArrays, scriptService, false);
listener.onResponse(processResponses(rollupSearchContext, msearchResponse, context));
}, listener::onFailure));
}
static SearchResponse processResponses(RollupSearchContext rollupContext, MultiSearchResponse msearchResponse,
InternalAggregation.ReduceContext reduceContext) {
if (rollupContext.hasLiveIndices() && rollupContext.hasRollupIndices()) {
// Both
return RollupResponseTranslator.combineResponses(msearchResponse.getResponses(), reduceContext);
} else if (rollupContext.hasLiveIndices()) {
// Only live
assert msearchResponse.getResponses().length == 1;
return RollupResponseTranslator.verifyResponse(msearchResponse.getResponses()[0]);
} else if (rollupContext.hasRollupIndices()) {
// Only rollup
return RollupResponseTranslator.translateResponse(msearchResponse.getResponses(), reduceContext);
}
throw new RuntimeException("MSearch response was empty, cannot unroll RollupSearch results");
}
static MultiSearchRequest createMSearchRequest(SearchRequest request, NamedWriteableRegistry registry, RollupSearchContext context) {
if (context.hasLiveIndices() == false && context.hasRollupIndices() == false) {
// Don't support _all on everything right now, for code simplicity
throw new IllegalArgumentException("Must specify at least one rollup index in _rollup_search API");
} else if (context.hasLiveIndices() && context.hasRollupIndices() == false) {
// not best practice, but if the user accidentally only sends "normal" indices we can support that
logger.debug("Creating msearch with only normal request");
final SearchRequest originalRequest = new SearchRequest(context.getLiveIndices(), request.source());
return new MultiSearchRequest().add(originalRequest);
}
// Rollup only supports a limited subset of the search API, validate and make sure
// nothing is set that we can't support
validateSearchRequest(request);
// The original request is added as-is (if normal indices exist), minus the rollup indices
final SearchRequest originalRequest = new SearchRequest(context.getLiveIndices(), request.source());
MultiSearchRequest msearch = new MultiSearchRequest();
if (context.hasLiveIndices()) {
msearch.add(originalRequest);
}
SearchSourceBuilder rolledSearchSource = new SearchSourceBuilder();
rolledSearchSource.size(0);
AggregatorFactories.Builder sourceAgg = request.source().aggregations();
// If there are no aggs in the request, our translation won't create any msearch.
// So just add an dummy request to the msearch and return. This is a bit silly
// but maintains how the regular search API behaves
if (sourceAgg == null || sourceAgg.count() == 0) {
// Note: we can't apply any query rewriting or filtering on the query because there
// are no validated caps, so we have no idea what job is intended here. The only thing
// this affects is doc count, since hits and aggs will both be empty it doesn't really matter.
msearch.add(new SearchRequest(context.getRollupIndices(), request.source()).types(request.types()));
return msearch;
}
// Find our list of "best" job caps
Set<RollupJobCaps> validatedCaps = new HashSet<>();
sourceAgg.getAggregatorFactories()
.forEach(agg -> validatedCaps.addAll(RollupJobIdentifierUtils.findBestJobs(agg, context.getJobCaps())));
List<String> jobIds = validatedCaps.stream().map(RollupJobCaps::getJobID).collect(Collectors.toList());
for (AggregationBuilder agg : sourceAgg.getAggregatorFactories()) {
List<QueryBuilder> filterConditions = new ArrayList<>(5);
// Translate the agg tree, and collect any potential filtering clauses
List<AggregationBuilder> translatedAgg = RollupRequestTranslator.translateAggregation(agg, filterConditions, registry);
BoolQueryBuilder boolQuery = new BoolQueryBuilder();
filterConditions.forEach(boolQuery::must);
FilterAggregationBuilder filterAgg = new FilterAggregationBuilder(RollupField.FILTER + "_" + agg.getName(),
boolQuery);
translatedAgg.forEach(filterAgg::subAggregation);
rolledSearchSource.aggregation(filterAgg);
}
// Rewrite the user's query to our internal conventions, checking against the validated job caps
QueryBuilder rewritten = rewriteQuery(request.source().query(), validatedCaps);
for (String id : jobIds) {
SearchSourceBuilder copiedSource;
try {
copiedSource = copyWriteable(rolledSearchSource, registry, SearchSourceBuilder::new);
} catch (IOException e) {
throw new RuntimeException("Encountered IO exception while trying to build rollup request.", e);
}
// filter the rewritten query by JobID
copiedSource.query(new BoolQueryBuilder()
.must(rewritten)
.filter(new TermQueryBuilder(RollupField.formatMetaField(RollupField.ID.getPreferredName()), id))
// Both versions are acceptable right now since they are compatible at search time
.filter(new TermsQueryBuilder(RollupField.formatMetaField(RollupField.VERSION_FIELD),
new long[]{Rollup.ROLLUP_VERSION_V1, Rollup.ROLLUP_VERSION_V2})));
// And add a new msearch per JobID
msearch.add(new SearchRequest(context.getRollupIndices(), copiedSource).types(request.types()));
}
return msearch;
}
/**
* Lifted from ESTestCase :s Don't reuse this anywhere!
*
* Create a copy of an original {@link SearchSourceBuilder} object by running it through a {@link BytesStreamOutput} and
* reading it in again using a {@link Writeable.Reader}. The stream that is wrapped around the {@link StreamInput}
* potentially need to use a {@link NamedWriteableRegistry}, so this needs to be provided too
*/
private static SearchSourceBuilder copyWriteable(SearchSourceBuilder original, NamedWriteableRegistry namedWriteableRegistry,
Writeable.Reader<SearchSourceBuilder> reader) throws IOException {
Writeable.Writer<SearchSourceBuilder> writer = (out, value) -> value.writeTo(out);
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setVersion(Version.CURRENT);
writer.write(output, original);
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
in.setVersion(Version.CURRENT);
return reader.read(in);
}
}
}
static void validateSearchRequest(SearchRequest request) {
// Rollup does not support hits at the moment
if (request.source().size() != 0) {
throw new IllegalArgumentException("Rollup does not support returning search hits, please try again " +
"with [size: 0].");
}
if (request.source().postFilter() != null) {
throw new IllegalArgumentException("Rollup search does not support post filtering.");
}
if (request.source().suggest() != null) {
throw new IllegalArgumentException("Rollup search does not support suggestors.");
}
if (request.source().highlighter() != null) {
throw new IllegalArgumentException("Rollup search does not support highlighting.");
}
if (request.source().profile()) {
throw new IllegalArgumentException("Rollup search does not support profiling at the moment.");
}
if (request.source().explain() != null && request.source().explain()) {
throw new IllegalArgumentException("Rollup search does not support explaining.");
}
}
static QueryBuilder rewriteQuery(QueryBuilder builder, Set<RollupJobCaps> jobCaps) {
if (builder == null) {
return new MatchAllQueryBuilder();
}
if (builder.getWriteableName().equals(BoolQueryBuilder.NAME)) {
BoolQueryBuilder rewrittenBool = new BoolQueryBuilder();
((BoolQueryBuilder)builder).must().forEach(query -> rewrittenBool.must(rewriteQuery(query, jobCaps)));
((BoolQueryBuilder)builder).mustNot().forEach(query -> rewrittenBool.mustNot(rewriteQuery(query, jobCaps)));
((BoolQueryBuilder)builder).should().forEach(query -> rewrittenBool.should(rewriteQuery(query, jobCaps)));
((BoolQueryBuilder)builder).filter().forEach(query -> rewrittenBool.filter(rewriteQuery(query, jobCaps)));
return rewrittenBool;
} else if (builder.getWriteableName().equals(ConstantScoreQueryBuilder.NAME)) {
return new ConstantScoreQueryBuilder(rewriteQuery(((ConstantScoreQueryBuilder)builder).innerQuery(), jobCaps));
} else if (builder.getWriteableName().equals(BoostingQueryBuilder.NAME)) {
return new BoostingQueryBuilder(rewriteQuery(((BoostingQueryBuilder)builder).negativeQuery(), jobCaps),
rewriteQuery(((BoostingQueryBuilder)builder).positiveQuery(), jobCaps));
} else if (builder.getWriteableName().equals(DisMaxQueryBuilder.NAME)) {
DisMaxQueryBuilder rewritten = new DisMaxQueryBuilder();
((DisMaxQueryBuilder) builder).innerQueries().forEach(query -> rewritten.add(rewriteQuery(query, jobCaps)));
return rewritten;
} else if (builder.getWriteableName().equals(RangeQueryBuilder.NAME)) {
RangeQueryBuilder range = (RangeQueryBuilder) builder;
String fieldName = range.fieldName();
// Many range queries don't include the timezone because the default is UTC, but the query
// builder will return null so we need to set it here
String timeZone = range.timeZone() == null ? DateTimeZone.UTC.toString() : range.timeZone();
String rewrittenFieldName = rewriteFieldName(jobCaps, RangeQueryBuilder.NAME, fieldName, timeZone);
RangeQueryBuilder rewritten = new RangeQueryBuilder(rewrittenFieldName)
.from(range.from())
.to(range.to())
.includeLower(range.includeLower())
.includeUpper(range.includeUpper());
if (range.timeZone() != null) {
rewritten.timeZone(range.timeZone());
}
if (range.format() != null) {
rewritten.format(range.format());
}
return rewritten;
} else if (builder.getWriteableName().equals(TermQueryBuilder.NAME)) {
TermQueryBuilder term = (TermQueryBuilder) builder;
String fieldName = term.fieldName();
String rewrittenFieldName = rewriteFieldName(jobCaps, TermQueryBuilder.NAME, fieldName, null);
return new TermQueryBuilder(rewrittenFieldName, term.value());
} else if (builder.getWriteableName().equals(TermsQueryBuilder.NAME)) {
TermsQueryBuilder terms = (TermsQueryBuilder) builder;
String fieldName = terms.fieldName();
String rewrittenFieldName = rewriteFieldName(jobCaps, TermQueryBuilder.NAME, fieldName, null);
return new TermsQueryBuilder(rewrittenFieldName, terms.values());
} else if (builder.getWriteableName().equals(MatchAllQueryBuilder.NAME)) {
// no-op
return builder;
} else {
throw new IllegalArgumentException("Unsupported Query in search request: [" + builder.getWriteableName() + "]");
}
}
private static String rewriteFieldName(Set<RollupJobCaps> jobCaps,
String builderName,
String fieldName,
String timeZone) {
List<String> incompatibleTimeZones = timeZone == null ? Collections.emptyList() : new ArrayList<>();
List<String> rewrittenFieldNames = jobCaps.stream()
// We only care about job caps that have the query's target field
.filter(caps -> caps.getFieldCaps().keySet().contains(fieldName))
.map(caps -> {
RollupJobCaps.RollupFieldCaps fieldCaps = caps.getFieldCaps().get(fieldName);
return fieldCaps.getAggs().stream()
// For now, we only allow filtering on grouping fields
.filter(agg -> {
String type = (String)agg.get(RollupField.AGG);
// If the cap is for a date_histo, and the query is a range, the timezones need to match
if (type.equals(DateHistogramAggregationBuilder.NAME) && timeZone != null) {
boolean matchingTZ = ((String)agg.get(DateHistogramGroupConfig.TIME_ZONE))
.equalsIgnoreCase(timeZone);
if (matchingTZ == false) {
incompatibleTimeZones.add((String)agg.get(DateHistogramGroupConfig.TIME_ZONE));
}
return matchingTZ;
}
// Otherwise just make sure it's one of the three groups
return type.equals(TermsAggregationBuilder.NAME)
|| type.equals(DateHistogramAggregationBuilder.NAME)
|| type.equals(HistogramAggregationBuilder.NAME);
})
// Rewrite the field name to our convention (e.g. "foo" -> "date_histogram.foo.timestamp")
.map(agg -> {
if (agg.get(RollupField.AGG).equals(DateHistogramAggregationBuilder.NAME)) {
return RollupField.formatFieldName(fieldName, (String)agg.get(RollupField.AGG), RollupField.TIMESTAMP);
} else {
return RollupField.formatFieldName(fieldName, (String)agg.get(RollupField.AGG), RollupField.VALUE);
}
})
.collect(Collectors.toList());
})
.distinct()
.collect(ArrayList::new, List::addAll, List::addAll);
if (rewrittenFieldNames.isEmpty()) {
if (incompatibleTimeZones.isEmpty()) {
throw new IllegalArgumentException("Field [" + fieldName + "] in [" + builderName
+ "] query is not available in selected rollup indices, cannot query.");
} else {
throw new IllegalArgumentException("Field [" + fieldName + "] in [" + builderName
+ "] query was found in rollup indices, but requested timezone is not compatible. Options include: "
+ incompatibleTimeZones);
}
} else if (rewrittenFieldNames.size() > 1) {
throw new IllegalArgumentException("Ambiguous field name resolution when mapping to rolled fields. Field name [" +
fieldName + "] was mapped to: [" + Strings.collectionToDelimitedString(rewrittenFieldNames, ",") + "].");
} else {
return rewrittenFieldNames.get(0);
}
}
static RollupSearchContext separateIndices(String[] indices, ImmutableOpenMap<String, IndexMetaData> indexMetaData) {
if (indices.length == 0) {
throw new IllegalArgumentException("Must specify at least one concrete index.");
}
List<String> rollup = new ArrayList<>();
List<String> normal = new ArrayList<>();
Set<RollupJobCaps> jobCaps = new HashSet<>();
Arrays.stream(indices).forEach(i -> {
if (i.equals(MetaData.ALL)) {
throw new IllegalArgumentException("Searching _all via RollupSearch endpoint is not supported at this time.");
}
Optional<RollupIndexCaps> caps = TransportGetRollupCapsAction.findRollupIndexCaps(i, indexMetaData.get(i));
if (caps.isPresent()) {
rollup.add(i);
jobCaps.addAll(caps.get().getJobCaps());
} else {
normal.add(i);
}
});
assert normal.size() + rollup.size() > 0;
if (rollup.size() > 1) {
throw new IllegalArgumentException("RollupSearch currently only supports searching one rollup index at a time. " +
"Found the following rollup indices: " + rollup);
}
return new RollupSearchContext(normal.toArray(new String[0]), rollup.toArray(new String[0]), jobCaps);
}
class TransportHandler implements TransportRequestHandler<SearchRequest> {
@Override
public final void messageReceived(SearchRequest request, TransportChannel channel) throws Exception {
throw new UnsupportedOperationException("the task parameter is required for this operation");
}
@Override
public final void messageReceived(final SearchRequest request, final TransportChannel channel, Task task) throws Exception {
// We already got the task created on the network layer - no need to create it again on the transport layer
execute(task, request, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse response) {
try {
channel.sendResponse(response);
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn(
(org.apache.logging.log4j.util.Supplier<?>)
() -> new ParameterizedMessage(
"Failed to send error response for action [{}] and request [{}]",
actionName,
request),
e1);
}
}
});
}
}
static class RollupSearchContext {
private final String[] liveIndices;
private final String[] rollupIndices;
private final Set<RollupJobCaps> jobCaps;
RollupSearchContext(String[] liveIndices, String[] rollupIndices, Set<RollupJobCaps> jobCaps) {
this.liveIndices = Objects.requireNonNull(liveIndices);
this.rollupIndices = Objects.requireNonNull(rollupIndices);
this.jobCaps = Objects.requireNonNull(jobCaps);
}
boolean hasLiveIndices() {
return liveIndices.length != 0;
}
boolean hasRollupIndices() {
return rollupIndices.length != 0;
}
String[] getLiveIndices() {
return liveIndices;
}
String[] getRollupIndices() {
return rollupIndices;
}
Set<RollupJobCaps> getJobCaps() {
return jobCaps;
}
}
}