-
Notifications
You must be signed in to change notification settings - Fork 24.3k
/
CanMatchNodeRequest.java
282 lines (257 loc) · 11.2 KB
/
CanMatchNodeRequest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.builder.SubSearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
 * Node-level request used during the can-match phase. It carries the settings shared by every
 * shard of the search together with one {@link Shard} entry per shard, and can expand each entry
 * into a full {@link ShardSearchRequest}.
 */
public class CanMatchNodeRequest extends TransportRequest implements IndicesRequest {

    // Request-wide state, shared by every shard-level request built from this object.
    private final SearchSourceBuilder source;
    private final List<Shard> shards;
    private final SearchType searchType;
    private final Boolean requestCache;
    private final boolean allowPartialSearchResults;
    private final Scroll scroll;
    private final int numberOfShards;
    private final long nowInMillis;
    @Nullable
    private final String clusterAlias;
    // Distinct union of the original indices of all shards, in first-seen order.
    private final String[] indices;
    private final IndicesOptions indicesOptions;
    private final TimeValue waitForCheckpointsTimeout;

    /**
     * The per-shard portion of a {@link CanMatchNodeRequest}.
     */
    public static class Shard implements Writeable {
        private final String[] indices;
        private final ShardId shardId;
        private final int shardRequestIndex;
        private final AliasFilter aliasFilter;
        private final float indexBoost;
        private final ShardSearchContextId readerId;
        private final TimeValue keepAlive;
        private final long waitForCheckpoint;

        /**
         * Creates a shard entry from explicit values.
         * With assertions enabled, a non-null {@code keepAlive} requires a non-null {@code readerId}.
         */
        public Shard(
            String[] indices,
            ShardId shardId,
            int shardRequestIndex,
            AliasFilter aliasFilter,
            float indexBoost,
            ShardSearchContextId readerId,
            TimeValue keepAlive,
            long waitForCheckpoint
        ) {
            this.indices = indices;
            this.shardId = shardId;
            this.shardRequestIndex = shardRequestIndex;
            this.aliasFilter = aliasFilter;
            this.indexBoost = indexBoost;
            this.readerId = readerId;
            this.keepAlive = keepAlive;
            this.waitForCheckpoint = waitForCheckpoint;
            assert (keepAlive != null && readerId == null) == false : "readerId: " + readerId + " keepAlive: " + keepAlive;
        }

        /**
         * Reads a shard entry from the stream; the field order mirrors {@link #writeTo(StreamOutput)}.
         */
        public Shard(StreamInput in) throws IOException {
            this.indices = in.readStringArray();
            this.shardId = new ShardId(in);
            this.shardRequestIndex = in.readVInt();
            this.aliasFilter = AliasFilter.readFrom(in);
            this.indexBoost = in.readFloat();
            this.readerId = in.readOptionalWriteable(ShardSearchContextId::new);
            this.keepAlive = in.readOptionalTimeValue();
            this.waitForCheckpoint = in.readLong();
            assert (keepAlive != null && readerId == null) == false : "readerId: " + readerId + " keepAlive: " + keepAlive;
        }

        /**
         * Writes this shard entry to the stream in the order the stream constructor reads it back.
         */
        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeStringArray(indices);
            shardId.writeTo(out);
            out.writeVInt(shardRequestIndex);
            aliasFilter.writeTo(out);
            out.writeFloat(indexBoost);
            out.writeOptionalWriteable(readerId);
            out.writeOptionalTimeValue(keepAlive);
            out.writeLong(waitForCheckpoint);
        }

        /** Returns the shard request index this entry was created with. */
        public int getShardRequestIndex() {
            return shardRequestIndex;
        }

        /** Returns the original indices array held by this entry (the array itself, not a copy). */
        public String[] getOriginalIndices() {
            return indices;
        }

        /** Returns the id of the shard this entry refers to. */
        public ShardId shardId() {
            return shardId;
        }
    }

    /**
     * Builds the node-level request from a search request and the shard entries it covers.
     *
     * @param searchRequest  supplies the source, search type, request cache flag, partial-results flag,
     *                       scroll and wait-for-checkpoints timeout
     * @param indicesOptions indices options reported by {@link #indicesOptions()} and used for every shard request
     * @param shards         shard entries; copied into a new list
     * @param numberOfShards value passed through to every created {@link ShardSearchRequest}
     * @param nowInMillis    value passed through to every created {@link ShardSearchRequest}
     * @param clusterAlias   cluster alias, may be {@code null}
     */
    public CanMatchNodeRequest(
        SearchRequest searchRequest,
        IndicesOptions indicesOptions,
        List<Shard> shards,
        int numberOfShards,
        long nowInMillis,
        @Nullable String clusterAlias
    ) {
        this.source = getCanMatchSource(searchRequest);
        this.indicesOptions = indicesOptions;
        this.shards = new ArrayList<>(shards);
        this.searchType = searchRequest.searchType();
        this.requestCache = searchRequest.requestCache();
        // An unset (null) allowPartialSearchResults should already have been replaced by the
        // cluster-level default; a null here points at a bug in request preparation.
        final Boolean partialResultsAllowed = searchRequest.allowPartialSearchResults();
        assert partialResultsAllowed != null;
        this.allowPartialSearchResults = partialResultsAllowed;
        this.scroll = searchRequest.scroll();
        this.numberOfShards = numberOfShards;
        this.nowInMillis = nowInMillis;
        this.clusterAlias = clusterAlias;
        this.waitForCheckpointsTimeout = searchRequest.getWaitForCheckpointsTimeout();
        this.indices = distinctOriginalIndices(shards);
    }

    /**
     * Flattens the original indices of all given shards into one array without duplicates,
     * keeping the order in which each index name is first encountered.
     */
    private static String[] distinctOriginalIndices(List<Shard> shardEntries) {
        return shardEntries.stream().flatMap(entry -> Arrays.stream(entry.getOriginalIndices())).distinct().toArray(String[]::new);
    }

    /**
     * Recursively appends the non-null query of every aggregation, and of all of its
     * sub-aggregations, to {@code aggregationQueries}.
     */
    private static void collectAggregationQueries(Collection<AggregationBuilder> aggregations, List<QueryBuilder> aggregationQueries) {
        aggregations.forEach(builder -> {
            final QueryBuilder query = builder.getQuery();
            if (query != null) {
                aggregationQueries.add(query);
            }
            collectAggregationQueries(builder.getSubAggregations(), aggregationQueries);
        });
    }

    /**
     * Returns the source to use for can-match evaluation: the request's own source when no
     * aggregation contributes a query, otherwise a shallow copy whose sub-searches are extended
     * with one entry per aggregation query.
     */
    private static SearchSourceBuilder getCanMatchSource(SearchRequest searchRequest) {
        final SearchSourceBuilder requestSource = searchRequest.source();
        if (requestSource == null || requestSource.aggregations() == null) {
            return requestSource;
        }
        // Aggregations may use a different query than the top-level search query. An example is
        // the significant terms aggregation, which also collects data over a background that is
        // typically much larger than the search query. To accommodate this, we take the union
        // of all queries to determine whether a request can match.
        final List<QueryBuilder> aggregationQueries = new ArrayList<>();
        collectAggregationQueries(requestSource.aggregations().getAggregatorFactories(), aggregationQueries);
        if (aggregationQueries.isEmpty()) {
            return requestSource;
        }
        final List<SubSearchSourceBuilder> combinedSubSearches = new ArrayList<>(requestSource.subSearches());
        aggregationQueries.forEach(query -> combinedSubSearches.add(new SubSearchSourceBuilder(query)));
        return requestSource.shallowCopy().subSearches(combinedSubSearches);
    }

    /**
     * Reads the request from the stream; the field order mirrors {@link #writeTo(StreamOutput)}.
     *
     * @throws IllegalStateException if a stream older than 8.0.0 carries a non-empty types array
     */
    public CanMatchNodeRequest(StreamInput in) throws IOException {
        super(in);
        this.source = in.readOptionalWriteable(SearchSourceBuilder::new);
        this.indicesOptions = IndicesOptions.readIndicesOptions(in);
        this.searchType = SearchType.fromId(in.readByte());
        if (in.getTransportVersion().before(TransportVersions.V_8_0_0)) {
            // Streams older than 8.0.0 still carry a types array; anything but an empty one is rejected.
            final String[] legacyTypes = in.readStringArray();
            if (legacyTypes.length != 0) {
                throw new IllegalStateException(
                    "types are no longer supported in search requests but found [" + Arrays.toString(legacyTypes) + "]"
                );
            }
        }
        this.scroll = in.readOptionalWriteable(Scroll::new);
        this.requestCache = in.readOptionalBoolean();
        this.allowPartialSearchResults = in.readBoolean();
        this.numberOfShards = in.readVInt();
        this.nowInMillis = in.readVLong();
        this.clusterAlias = in.readOptionalString();
        this.waitForCheckpointsTimeout = in.readTimeValue();
        this.shards = in.readCollectionAsList(Shard::new);
        this.indices = distinctOriginalIndices(this.shards);
    }

    /**
     * Writes the request to the stream in the order the stream constructor reads it back.
     */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeOptionalWriteable(source);
        indicesOptions.writeIndicesOptions(out);
        out.writeByte(searchType.id());
        if (out.getTransportVersion().before(TransportVersions.V_8_0_0)) {
            // Receivers older than 8.0.0 expect a types array, so an empty one is sent.
            out.writeStringArray(Strings.EMPTY_ARRAY);
        }
        out.writeOptionalWriteable(scroll);
        out.writeOptionalBoolean(requestCache);
        out.writeBoolean(allowPartialSearchResults);
        out.writeVInt(numberOfShards);
        out.writeVLong(nowInMillis);
        out.writeOptionalString(clusterAlias);
        out.writeTimeValue(waitForCheckpointsTimeout);
        out.writeCollection(shards);
    }

    /** Returns the shard entries held by this request (the internal list, not a copy). */
    public List<Shard> getShardLevelRequests() {
        return shards;
    }

    /** Creates one {@link ShardSearchRequest} per shard entry, in the order of the shard entries. */
    public List<ShardSearchRequest> createShardSearchRequests() {
        return shards.stream().map(entry -> createShardSearchRequest(entry)).toList();
    }

    /**
     * Combines the request-wide settings with the given shard entry into a {@link ShardSearchRequest}
     * and assigns this request's parent task to it.
     */
    public ShardSearchRequest createShardSearchRequest(Shard r) {
        final OriginalIndices shardOriginalIndices = new OriginalIndices(r.indices, indicesOptions);
        final ShardSearchRequest request = new ShardSearchRequest(
            shardOriginalIndices,
            r.shardId,
            r.shardRequestIndex,
            numberOfShards,
            searchType,
            source,
            requestCache,
            r.aliasFilter,
            r.indexBoost,
            allowPartialSearchResults,
            scroll,
            nowInMillis,
            clusterAlias,
            r.readerId,
            r.keepAlive,
            r.waitForCheckpoint,
            waitForCheckpointsTimeout,
            false
        );
        request.setParentTask(getParentTask());
        return request;
    }

    /** Returns the distinct original indices of all shard entries. */
    @Override
    public String[] indices() {
        return indices;
    }

    /** Returns the indices options this request was created with. */
    @Override
    public IndicesOptions indicesOptions() {
        return indicesOptions;
    }

    /** Creates a {@link SearchShardTask} described by {@link #getDescription()}. */
    @Override
    public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
        final String description = getDescription();
        return new SearchShardTask(id, type, action, description, parentTaskId, headers);
    }

    /** Returns a description listing the shard ids covered by this request. */
    @Override
    public String getDescription() {
        // The shard ids are sufficient: the request itself can be found via the parent task description.
        final List<ShardId> shardIds = shards.stream().map(entry -> entry.shardId).toList();
        return "shardIds[" + shardIds + "]";
    }
}