-
Notifications
You must be signed in to change notification settings - Fork 215
/
SearchSourceBuilder.java
380 lines (346 loc) · 13.1 KB
/
SearchSourceBuilder.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.search;
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.exceptions.InvalidRqlExpressionException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.rql.model.ParserException;
import org.eclipse.ditto.rql.parser.RqlPredicateParser;
import org.eclipse.ditto.rql.parser.thingsearch.RqlOptionParser;
import org.eclipse.ditto.rql.query.expression.ThingsFieldExpressionFactory;
import org.eclipse.ditto.rql.query.things.ModelBasedThingsFieldExpressionFactory;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.thingsearch.model.Option;
import org.eclipse.ditto.thingsearch.model.SizeOption;
import org.eclipse.ditto.thingsearch.model.SortOption;
import org.eclipse.ditto.thingsearch.model.SortOptionEntry;
import org.eclipse.ditto.thingsearch.model.signals.commands.exceptions.InvalidOptionException;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.StreamThings;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
/**
* Builder for {@link SearchSource}.
*/
public final class SearchSourceBuilder {
private static final String COMMA = ",";
private static final SortOption DEFAULT_SORT_OPTION =
SortOption.of(List.of(SortOptionEntry.asc(Thing.JsonFields.ID.getPointer())));
private ActorRef pubSubMediator;
private ActorSelection conciergeForwarder;
private JsonFieldSelector fields;
private JsonFieldSelector sortFields;
private String filter;
private JsonArray namespaces;
private String sort;
private JsonArray sortValues;
private DittoHeaders dittoHeaders;
private Duration thingsAskTimeout = Duration.ofSeconds(10L);
private Duration searchAskTimeout = Duration.ofSeconds(60L);
private String lastThingId = "";
/**
* Create a search-source from this builder.
*
* @return the search-source.
* @throws java.lang.NullPointerException if required fields are not set.
* @throws org.eclipse.ditto.thingsearch.model.signals.commands.exceptions.InvalidOptionException if the sort option is
* invalid.
* @throws org.eclipse.ditto.base.model.exceptions.InvalidRqlExpressionException if the filter expression is
* invalid.
*/
public SearchSource build() {
final StreamThings streamThings = constructStreamThings();
return new SearchSource(
checkNotNull(pubSubMediator, "pubSubMediator"),
checkNotNull(conciergeForwarder, "conciergeForwarder"),
thingsAskTimeout,
searchAskTimeout,
fields,
sortFields,
streamThings,
lastThingId);
}
/**
* Set the pub-sub mediator for error reporting.
*
* @param pubSubMediator the pub-sub mediator.
* @return this builder.
*/
public SearchSourceBuilder pubSubMediator(final ActorRef pubSubMediator) {
this.pubSubMediator = pubSubMediator;
return this;
}
/**
* Set the concierge forwarder to send commands.
*
* @param conciergeForwarder the concierge forwarder.
* @return this builder.
*/
public SearchSourceBuilder conciergeForwarder(final ActorSelection conciergeForwarder) {
this.conciergeForwarder = conciergeForwarder;
return this;
}
/**
* Set the selected fields of search results.
*
* @param fields the selected fields.
* @return this builder.
*/
public SearchSourceBuilder fields(@Nullable final JsonFieldSelector fields) {
this.fields = fields;
return this;
}
/**
* Set the selected fields of search results from a comma-separated string.
*
* @param fieldsString selected fields as comma-separated string.
* @return this builder.
*/
public SearchSourceBuilder fields(@Nullable final String fieldsString) {
if (fieldsString == null) {
fields = null;
} else {
List<String> pointers = Arrays.stream(fieldsString.split(COMMA))
.filter(pointer -> !pointer.isBlank())
.toList();
if (pointers.isEmpty()) {
fields = null;
} else {
fields = JsonFieldSelector.newInstance(pointers.get(0),
pointers.stream().skip(1L).toArray(String[]::new));
}
}
return this;
}
/**
* Set the filter string of the search command.
*
* @param filter the filter.
* @return this builder.
*/
public SearchSourceBuilder filter(@Nullable final String filter) {
this.filter = validateFilter(filter);
return this;
}
/**
* Set the namespaces of the search command.
*
* @param namespaces the namespaces.
* @return this builder.
*/
public SearchSourceBuilder namespaces(@Nullable final JsonArray namespaces) {
this.namespaces = namespaces;
return this;
}
/**
* Set the namespaces of the search command from a comma-separated string.
*
* @param namespacesString namespaces as a comma-separated string.
* @return this builder.
*/
public SearchSourceBuilder namespaces(@Nullable final String namespacesString) {
if (namespacesString == null || namespacesString.isEmpty()) {
namespaces = null;
} else {
namespaces = Arrays.stream(namespacesString.split(COMMA))
.map(JsonValue::of)
.collect(JsonCollectors.valuesToArray());
}
return this;
}
/**
* Set the sort option as a string with consideration to the default sort option.
*
* @param sort the sort string.
* @return this builder.
*/
public SearchSourceBuilder sort(@Nullable final String sort) {
return validateAndSetSortOption(getEffectiveSortOption(sort));
}
/**
* Set the sort option and sort fields from the value of the options parameter.
*
* @param options options as comma-separated string.
* @return this builder.
*/
public SearchSourceBuilder options(@Nullable final String options) {
return validateAndSetSortOption(getEffectiveSortOption(options));
}
private SearchSourceBuilder validateAndSetSortOption(final SortOption sortOption) {
sort = sortOptionAsString(validateSortOption(sortOption));
sortFields = getSortFields(sortOption);
return this;
}
/**
* Set the sort values for cursor computation.
*
* @param sortValues values of the sort fields of the last result of a previous search.
* @return this builder.
*/
public SearchSourceBuilder sortValues(@Nullable final JsonArray sortValues) {
this.sortValues = sortValues;
return this;
}
/**
* Set the Ditto headers to use including authorization context and correlation ID.
*
* @param dittoHeaders the Ditto headers to use.
* @return this builder.
*/
public SearchSourceBuilder dittoHeaders(final DittoHeaders dittoHeaders) {
this.dittoHeaders = dittoHeaders;
return this;
}
/**
* Set the timeout when asking the things-shard-region.
*
* @param thingsAskTimeout the timeout.
* @return this builder.
*/
public SearchSourceBuilder thingsAskTimeout(final Duration thingsAskTimeout) {
this.thingsAskTimeout = thingsAskTimeout;
return this;
}
/**
* Set the timeout when asking the search actor.
*
* @param searchAskTimeout the timeout.
* @return this builder.
*/
public SearchSourceBuilder searchAskTimeout(final Duration searchAskTimeout) {
this.searchAskTimeout = searchAskTimeout;
return this;
}
/**
* Set the last thing ID to resume from.
*
* @param lastThingId the last thing ID.
* @return this builder.
*/
public SearchSourceBuilder lastThingId(final String lastThingId) {
this.lastThingId = checkNotNull(lastThingId, "lastThingId");
return this;
}
private String sortOptionAsString(final SortOption sortOption) {
return sortOption.getEntries()
.stream()
.map(SortOptionEntry::toString)
.collect(Collectors.joining(COMMA, "sort(", ")"));
}
private JsonFieldSelector getSortFields(final SortOption sortOption) {
return JsonFieldSelector.newInstance(
sortOption.getEntries().get(0).getPropertyPath(),
sortOption.getEntries()
.stream()
.skip(1L)
.map(SortOptionEntry::getPropertyPath)
.toArray(CharSequence[]::new)
);
}
private SortOption getEffectiveSortOption(@Nullable final String optionString) {
if (optionString == null || optionString.isEmpty()) {
return DEFAULT_SORT_OPTION;
} else {
return appendIdField(findUniqueSortOption(optionString));
}
}
private SortOption appendIdField(final SortOption parsedSortOption) {
final boolean hasId = parsedSortOption.stream()
.anyMatch(entry -> entry.getPropertyPath().equals(Thing.JsonFields.ID.getPointer()));
if (hasId) {
return parsedSortOption;
} else {
return SortOption.of(
Stream.concat(parsedSortOption.stream(), DEFAULT_SORT_OPTION.stream())
.toList()
);
}
}
private SortOption findUniqueSortOption(final String optionString) {
final List<Option> parsedOptions;
try {
parsedOptions = RqlOptionParser.parseOptions(optionString);
} catch (final ParserException e) {
throw InvalidOptionException.newBuilder()
.message("Invalid options: " + optionString)
.build();
}
checkForUnsupportedOptions(parsedOptions);
final List<SortOption> sortOptions = parsedOptions.stream()
.flatMap(option -> option instanceof SortOption
? Stream.of((SortOption) option)
: Stream.empty()
)
.toList();
if (sortOptions.isEmpty()) {
return DEFAULT_SORT_OPTION;
} else if (sortOptions.size() == 1) {
return sortOptions.get(0);
} else {
throw InvalidOptionException.newBuilder()
.message("Too many sort options.")
.description("0 or 1 sort option is expected.")
.build();
}
}
private StreamThings constructStreamThings() {
return StreamThings.of(filter, namespaces, sort, sortValues, checkNotNull(dittoHeaders, "dittoHeaders"));
}
@Nullable
private String validateFilter(@Nullable final String filter) {
if (filter != null) {
try {
RqlPredicateParser.getInstance().parse(filter);
} catch (final ParserException e) {
throw InvalidRqlExpressionException.newBuilder()
.message("Invalid filter expression: " + filter)
.build();
}
}
return filter;
}
private SortOption validateSortOption(final SortOption sort) {
// check sort expressions
try {
final ThingsFieldExpressionFactory fieldExpressionFactory =
ModelBasedThingsFieldExpressionFactory.getInstance();
for (final SortOptionEntry entry : sort) {
fieldExpressionFactory.sortBy(entry.getPropertyPath().toString());
}
} catch (final IllegalArgumentException e) {
throw InvalidOptionException.newBuilder()
.message(e.getMessage())
.description("The sort option is invalid.")
.build();
}
return sort;
}
private static void checkForUnsupportedOptions(final List<Option> options) {
for (final Option option : options) {
if (!(option instanceof SortOption || option instanceof SizeOption)) {
throw InvalidOptionException.newBuilder()
.message("The option " + option + " is not supported at this endpoint.")
.build();
}
}
}
}