/
ThingsSearchCursor.java
898 lines (813 loc) · 38.4 KB
/
ThingsSearchCursor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
/*
* Copyright (c) 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.thingsearch.starter.actors;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.model.base.exceptions.InvalidRqlExpressionException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.query.Query;
import org.eclipse.ditto.model.query.SortDirection;
import org.eclipse.ditto.model.query.criteria.Criteria;
import org.eclipse.ditto.model.query.criteria.CriteriaFactory;
import org.eclipse.ditto.model.rql.ParserException;
import org.eclipse.ditto.model.things.Thing;
import org.eclipse.ditto.model.thingsearch.CursorOption;
import org.eclipse.ditto.model.thingsearch.LimitOption;
import org.eclipse.ditto.model.thingsearch.Option;
import org.eclipse.ditto.model.thingsearch.OptionVisitor;
import org.eclipse.ditto.model.thingsearch.SearchResult;
import org.eclipse.ditto.model.thingsearch.SearchResultBuilder;
import org.eclipse.ditto.model.thingsearch.SizeOption;
import org.eclipse.ditto.model.thingsearch.SortOption;
import org.eclipse.ditto.model.thingsearch.SortOptionEntry;
import org.eclipse.ditto.model.thingsearchparser.RqlOptionParser;
import org.eclipse.ditto.services.thingsearch.common.model.ResultList;
import org.eclipse.ditto.services.thingsearch.persistence.write.mapping.JsonToBson;
import org.eclipse.ditto.signals.commands.thingsearch.exceptions.InvalidOptionException;
import org.eclipse.ditto.signals.commands.thingsearch.query.QueryThings;
import akka.NotUsed;
import akka.http.javadsl.coding.Coder;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
/**
* Package-private evaluator and generator of opaque cursors.
* The meaning of a cursor should be invisible to all users.
* This class provides the following services to other classes of this package:
* <ul>
* <li>{@code extractCursor(QueryThings, ActorMaterializer)}:
* Read any cursor given by a {@code QueryThings} command.
* </li>
* <li>{@code adjust(Optional<ThingsSearchCursor>, QueryThings)}:
* Augment a {@code QueryThings} command by information in the cursor.
* </li>
* <li>{@code adjust(Optional<ThingsSearchCursor>, Query, CriteriaFactory)}:
* Adjust a {@code Query} so that its results start from the location marked by the cursor.
* </li>
* <li>{@code processSearchResult(QueryThings, ThingsSearchCursor, SearchResult, ResultList)}:
* Compute a cursor pointing at the end of the search result if there are more results.
* </li>
* </ul>
*/
final class ThingsSearchCursor {
/**
 * Sort option entry ordering things by ascending thing ID. It is appended to the sort option of a new cursor
 * when the sort option of the command does not mention the thing ID.
 */
static final SortOptionEntry DEFAULT_SORT_OPTION_ENTRY =
SortOptionEntry.asc(Thing.JsonFields.ID.getPointer());
/**
* Delimiter between correlation IDs. The real correlation ID of a QueryThings command with cursor is
* "[correlation-id-of-cursor]:[correlation-id-of-command-headers]".
*/
private static final String CORRELATION_ID_DELIMITER = ":";
/** Description of the error reported when a command combines the options 'cursor' and 'limit'. */
private static final String LIMIT_OPTION_FORBIDDEN = "The options 'cursor' and 'limit' must not be used together.";
/** URL-safe Base64 encoder without padding; renders the compressed cursor as a string. */
private static final Base64.Encoder BASE64_URL_ENCODER_WITHOUT_PADDING = Base64.getUrlEncoder().withoutPadding();
/*
* Secret fields in JSON representation of a cursor.
* Each single-letter key stores the cursor field of the corresponding name.
*/
// "F": filter string; omitted from the JSON when null.
private static final JsonFieldDefinition<String> FILTER = JsonFactory.newStringFieldDefinition("F");
// "J": JSON field selector as string; omitted from the JSON when null.
private static final JsonFieldDefinition<String> JSON_FIELD_SELECTOR = JsonFactory.newStringFieldDefinition("J");
// "N": namespaces as JSON array; omitted from the JSON when null.
private static final JsonFieldDefinition<JsonArray> NAMESPACES = JsonFactory.newJsonArrayFieldDefinition("N");
// "C": correlation ID; omitted from the JSON when null.
private static final JsonFieldDefinition<String> CORRELATION_ID = JsonFactory.newStringFieldDefinition("C");
// "O": options rendered as a single string by RqlOptionParser; always present.
private static final JsonFieldDefinition<String> OPTIONS = JsonFactory.newStringFieldDefinition("O");
// "V": values of the sort fields marking the position of the cursor; always present.
private static final JsonFieldDefinition<JsonArray> VALUES = JsonFactory.newJsonArrayFieldDefinition("V");
/*
* Data encoded in a cursor.
*/
// Filter string of the original query, if any.
@Nullable private final String filter;
// Field selector to apply to the search results, if any.
@Nullable private final String jsonFieldSelector;
// Namespaces of the original query, if any.
@Nullable private final Set<String> namespaces;
// Correlation ID stored in the cursor, if any; visible to other classes of this package.
@Nullable final String correlationId;
// Options of the query; must contain exactly 1 sort option.
private final List<Option> options;
// Values of the sort fields at the position of the cursor; has as many elements as the sort option has entries.
private final JsonArray values;
// The unique sort option found in the options; not part of the JSON representation on its own.
private final SortOption sortOption;
ThingsSearchCursor(@Nullable final String jsonFieldSelector,
@Nullable final Set<String> namespaces, @Nullable final String correlationId,
final List<Option> options, @Nullable final String filter, final JsonArray values) {
this.namespaces = namespaces;
this.filter = filter;
this.jsonFieldSelector = jsonFieldSelector;
this.correlationId = correlationId;
this.options = options;
this.values = values;
this.sortOption = findUniqueSortOption(options);
if (sortOption.getSize() != values.getSize()) {
// Cursor corrupted. Offer no more information.
throw invalidCursorBuilder().build();
}
}
/**
 * @return the simple class name followed by the JSON representation of this cursor.
 */
@Override
public String toString() {
    final String simpleClassName = getClass().getSimpleName();
    return simpleClassName + toJson();
}
/**
 * @return hash code computed over all fields compared by {@code equals}.
 */
@Override
public int hashCode() {
    final Object[] comparedFields =
            {filter, jsonFieldSelector, namespaces, correlationId, options, values, sortOption};
    return Arrays.hashCode(comparedFields);
}
/**
 * Compare this cursor against another object field by field.
 *
 * @param that the object to compare against.
 * @return whether {@code that} is a cursor whose fields are all equal to the fields of this cursor.
 */
@Override
public boolean equals(final Object that) {
    if (!(that instanceof ThingsSearchCursor)) {
        return false;
    }
    final ThingsSearchCursor other = (ThingsSearchCursor) that;
    return Objects.equals(filter, other.filter) &&
            Objects.equals(jsonFieldSelector, other.jsonFieldSelector) &&
            Objects.equals(namespaces, other.namespaces) &&
            Objects.equals(correlationId, other.correlationId) &&
            Objects.equals(options, other.options) &&
            Objects.equals(values, other.values) &&
            Objects.equals(sortOption, other.sortOption);
}
/**
 * Create a copy of this cursor adjusted by a {@code QueryThings} command after checking that the cursor is valid
 * for the command.
 * The field selector of the command replaces the field selector of this cursor if the command has one.
 * The options of the copy are built by an {@code OptionsBuilder} visiting the options of this cursor first and
 * the options of the command second, with the sort option of this cursor set explicitly.
 * Namespaces, correlation ID, filter and values are retained from this cursor.
 *
 * @param queryThings the command.
 * @param inputOptions parsed options of the command.
 * @return a copy of this cursor with values from the command.
 * @throws DittoRuntimeException if this cursor is not valid for the command.
 */
private ThingsSearchCursor override(final QueryThings queryThings, final List<Option> inputOptions) {
    final Optional<DittoRuntimeException> validationError = checkCursorValidity(queryThings, inputOptions);
    if (validationError.isPresent()) {
        throw validationError.get();
    }

    final String newSelector = queryThings.getFields()
            .map(JsonFieldSelector::toString)
            .orElse(jsonFieldSelector);

    final List<Option> newOptions = new OptionsBuilder()
            .visitAll(options)
            .visitAll(inputOptions)
            .withSortOption(sortOption)
            .build();

    return new ThingsSearchCursor(newSelector, namespaces, correlationId, newOptions, filter, values);
}
/**
 * Check whether this cursor is valid for a {@code QueryThings} command.
 * The checks run in this order and the first failed check determines the error:
 * <ol>
 * <li>the command has no filter, or its filter is identical to the filter of this cursor,</li>
 * <li>the command has no limit option, and</li>
 * <li>every sort option of the command is compatible with the sort option of this cursor.</li>
 * </ol>
 *
 * @param queryThings the command.
 * @param commandOptions parsed options of the command.
 * @return reason why this cursor is invalid for the command, or an empty optional if it is valid.
 */
private Optional<DittoRuntimeException> checkCursorValidity(final QueryThings queryThings,
        final List<Option> commandOptions) {

    // when a cursor is present, the command may only have an additional size option and a field selector.
    final Optional<String> commandFilter = queryThings.getFilter();
    if (commandFilter.isPresent() && !Objects.equals(commandFilter.get(), filter)) {
        return Optional.of(invalidCursor(
                "The parameter 'filter' must not differ from the original query of the cursor.", queryThings));
    }

    if (commandOptions.stream().anyMatch(LimitOption.class::isInstance)) {
        return Optional.of(invalidCursor(LIMIT_OPTION_FORBIDDEN, queryThings));
    }

    if (hasIncompatibleSortOption(commandOptions)) {
        return Optional.of(invalidCursor(
                "The option 'sort' must not differ from the original query of the cursor.", queryThings));
    }

    return Optional.empty();
}
/**
 * Check if the options of a {@code QueryThings} command contain a sort option that is incompatible with the
 * sort option of this cursor. Compatibility of each sort option of the command is decided by
 * {@code areCompatible} starting from the first dimension; according to its documentation, a command sort option
 * is compatible if
 * <ul>
 * <li>both sort options are identical,</li>
 * <li>the cursor option is obtained from the command option by appending the default option, or</li>
 * <li>the cursor option is obtained from the command option by truncating after +thingId or -thingId.</li>
 * </ul>
 *
 * @param commandOptions options from a command.
 * @return whether the options contain an incompatible sort option.
 */
private boolean hasIncompatibleSortOption(final List<Option> commandOptions) {
    for (final SortOption commandSortOption : findAll(SortOption.class, commandOptions)) {
        final boolean compatible =
                areCompatible(this.sortOption.getEntries(), commandSortOption.getEntries(), 0);
        if (!compatible) {
            return true;
        }
    }
    return false;
}
/**
 * During paging, augment a search result by a cursor pointing at its end.
 * The next-page-offset is removed from the search result in all cases. A new cursor is set only if the result
 * list reports sort values of its last result; the new cursor differs from this cursor by those values only.
 *
 * @param searchResult the search result.
 * @param resultList items in the search result.
 * @return search result without next-page-offset and, if there are more results, with a new cursor.
 */
private SearchResult searchResultWithExistingCursor(final SearchResult searchResult,
        final ResultList<?> resultList) {

    final Optional<JsonArray> lastSortValues = resultList.lastResultSortValues();
    final SearchResultBuilder builder = searchResult.toBuilder();
    if (lastSortValues.isPresent()) {
        final ThingsSearchCursor nextCursor = new ThingsSearchCursor(jsonFieldSelector, namespaces,
                correlationId, options, filter, lastSortValues.get());
        builder.cursor(nextCursor.encode());
    }
    return builder.nextPageOffset(null).build();
}
/**
 * Compute a {@code QueryThings} command from the content of this cursor.
 * Filter, options, field selector and namespaces are taken from this cursor. The headers are those of the given
 * command except for the correlation ID, which is computed by {@code combineCorrelationIds} from the correlation
 * ID of this cursor and the headers of the command.
 *
 * @param queryThings the command whose headers are retained.
 * @return the command described by this cursor.
 */
private QueryThings adjustQueryThings(final QueryThings queryThings) {
    final List<String> optionStrings = new ArrayList<>(options.size());
    for (final Option option : options) {
        optionStrings.add(option.toString());
    }

    final JsonFieldSelector selector =
            jsonFieldSelector != null ? JsonFieldSelector.newInstance(jsonFieldSelector) : null;

    final DittoHeaders commandHeaders = queryThings.getDittoHeaders();
    final DittoHeaders headers = commandHeaders.toBuilder()
            .correlationId(combineCorrelationIds(correlationId, commandHeaders))
            .build();

    return QueryThings.of(filter, optionStrings, selector, namespaces, headers);
}
/**
 * Adjust a {@code Query} object so that its results start at the location of this cursor.
 * The criteria of the query are combined by logical AND with criteria generated from the sort options of the
 * query and the values of this cursor.
 *
 * @param query the query object.
 * @param cf a criteria factory.
 * @return a new query object starting at the location of this cursor.
 */
private Query adjustQuery(final Query query, final CriteriaFactory cf) {
    final Criteria originalCriteria = query.getCriteria();
    final Criteria nextPageFilter = getNextPageFilter(query.getSortOptions(), values, cf);
    final Criteria restrictedCriteria = cf.and(Arrays.asList(originalCriteria, nextPageFilter));
    return query.withCritera(restrictedCriteria);
}
/**
 * Render this cursor as JSON object using the secret field definitions of this class.
 * Filter, field selector, namespaces and correlation ID are written under a predicate rejecting fields with
 * null values; options and values are always written.
 *
 * @return Secret JSON representation of this cursor.
 */
private JsonObject toJson() {
    final java.util.function.Predicate<JsonField> hasNonNullValue = field -> {
        final JsonValue fieldValue = field.getValue();
        return !fieldValue.isNull();
    };
    final JsonArray namespacesArray = renderNamespaces();
    final String optionsString = RqlOptionParser.unparse(options);

    return JsonFactory.newObjectBuilder()
            .set(FILTER, filter, hasNonNullValue)
            .set(JSON_FIELD_SELECTOR, jsonFieldSelector, hasNonNullValue)
            .set(NAMESPACES, namespacesArray, hasNonNullValue)
            .set(CORRELATION_ID, correlationId, hasNonNullValue)
            .set(OPTIONS, optionsString)
            .set(VALUES, values)
            .build();
}
/**
 * @return namespaces of this cursor as JSON array, or {@code null} if this cursor has no namespaces.
 */
@Nullable
private JsonArray renderNamespaces() {
    if (namespaces == null) {
        return null;
    }
    return namespaces.stream()
            .map(JsonValue::of)
            .collect(JsonCollectors.valuesToArray());
}
/**
 * Encode this cursor as string: the JSON representation is converted to UTF-8 bytes, compressed by
 * {@code Coder.Deflate} and rendered as URL-safe Base64 without padding.
 *
 * @return Coded string representation of this cursor.
 */
String encode() {
    final String json = toJson().toString();
    final ByteString deflated = Coder.Deflate.encode(ByteString.fromString(json, StandardCharsets.UTF_8));
    final byte[] deflatedBytes = deflated.toByteBuffer().array();
    return BASE64_URL_ENCODER_WITHOUT_PADDING.encodeToString(deflatedBytes);
}
/**
 * Decode the string representation of a cursor by reversing the steps of {@code encode}: URL-safe Base64
 * decoding, decompression by {@code Coder.Deflate}, JSON parsing and construction of the cursor.
 * Base64 decoding happens on the calling thread; the remaining steps are part of the returned future.
 *
 * @param cursorString the string representation.
 * @param materializer materializer of actors that will decode the cursor.
 * @return future of the decoded cursor.
 * @throws IllegalArgumentException if {@code cursorString} is not valid Base64.
 */
static CompletionStage<ThingsSearchCursor> decode(final String cursorString,
        final ActorMaterializer materializer) {

    final byte[] compressedBytes = Base64.getUrlDecoder().decode(cursorString);
    final CompletionStage<ByteString> decompression =
            Coder.Deflate.decode(ByteString.fromArray(compressedBytes), materializer);
    return decompression.thenApply(decompressed -> {
        final JsonObject cursorJson = JsonFactory.newObject(decompressed.utf8String());
        return fromJson(cursorJson);
    });
}
/**
 * Adjust a {@code QueryThings} by the content of an optional cursor.
 *
 * @param cursor an optional cursor.
 * @param queryThings the command to adjust.
 * @return the command computed from the cursor if the cursor exists; the given command otherwise.
 */
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
static QueryThings adjust(final Optional<ThingsSearchCursor> cursor, final QueryThings queryThings) {
    final Optional<QueryThings> adjustedCommand =
            cursor.map(existingCursor -> existingCursor.adjustQueryThings(queryThings));
    return adjustedCommand.orElse(queryThings);
}
/**
 * Adjust a {@code Query} object so that its result starts with the location of an optional cursor.
 *
 * @param cursor an optional cursor.
 * @param query the query to adjust.
 * @param cf a criteria factory.
 * @return the query restricted to results after the cursor if the cursor exists; the given query otherwise.
 */
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
static Query adjust(final Optional<ThingsSearchCursor> cursor, final Query query, final CriteriaFactory cf) {
    final Optional<Query> adjustedQuery = cursor.map(existingCursor -> existingCursor.adjustQuery(query, cf));
    return adjustedQuery.orElse(query);
}
/**
 * Extract a cursor from a {@code QueryThings} command if any exists.
 * The outcome is decided in this order:
 * <ol>
 * <li>a size option error reported by {@code checkSizeOption} fails the source,</li>
 * <li>a command without cursor option yields an empty optional,</li>
 * <li>more than 1 cursor option, or a cursor option together with a limit option, fails the source with an
 * invalid-cursor error,</li>
 * <li>otherwise the cursor is decoded and overridden by values of the command.</li>
 * </ol>
 * Any failure while decoding the cursor asynchronously (decompression, JSON parsing, reconstruction of the
 * cursor) fails the source with an invalid-cursor error carrying the headers of the command.
 * A {@code ParserException} or {@code IllegalArgumentException} thrown synchronously, for example by a cursor
 * that is not valid Base64, fails the source with an {@code InvalidRqlExpressionException}.
 *
 * @param queryThings the command.
 * @param materializer materializer of actors that will extract the cursor.
 * @return source of an optional cursor if the command has no cursor or has a valid cursor; a failed source if the
 * command has an invalid cursor.
 */
static Source<Optional<ThingsSearchCursor>, NotUsed> extractCursor(final QueryThings queryThings,
        final ActorMaterializer materializer) {

    try {
        final List<Option> options = getOptions(queryThings);
        final List<CursorOption> cursorOptions = findAll(CursorOption.class, options);
        final List<LimitOption> limitOptions = findAll(LimitOption.class, options);
        final Optional<InvalidOptionException> sizeOptionError = checkSizeOption(options, queryThings);
        if (sizeOptionError.isPresent()) {
            return Source.failed(sizeOptionError.get());
        } else if (cursorOptions.isEmpty()) {
            return Source.single(Optional.empty());
        } else if (cursorOptions.size() > 1) {
            // there may not be 2 or more cursor options in 1 command.
            return Source.failed(invalidCursor("There may not be more than 1 'cursor' option.", queryThings));
        } else if (!limitOptions.isEmpty()) {
            return Source.failed(invalidCursor(LIMIT_OPTION_FORBIDDEN, queryThings));
        } else {
            final CompletionStage<ThingsSearchCursor> decodedCursor = decodeOrFailAsInvalidCursor(
                    cursorOptions.get(0).getCursor(), materializer, queryThings.getDittoHeaders());
            // override stays inside the stream so that its errors reach downstream unwrapped.
            return Source.fromCompletionStage(decodedCursor)
                    .map(cursor -> Optional.of(cursor.override(queryThings, options)));
        }
    } catch (final ParserException | IllegalArgumentException e) {
        return Source.failed(InvalidRqlExpressionException.newBuilder()
                .message(e.getMessage())
                .cause(e)
                .dittoHeaders(queryThings.getDittoHeaders())
                .build());
    } catch (final Throwable error) {
        return Source.failed(error);
    }
}

/**
 * Decode a cursor such that every asynchronous failure is reported as an invalid-cursor error.
 * Without this conversion a corrupted cursor would surface as whatever exception the decompressor, the JSON
 * parser or the cursor constructor happens to throw, possibly wrapped in a {@code CompletionException} and
 * without the headers of the command.
 *
 * @param cursorString the string representation of the cursor.
 * @param materializer materializer of actors that will decode the cursor.
 * @param dittoHeaders headers to attach to the error.
 * @return future of the decoded cursor; if it fails, then it fails with an {@code InvalidOptionException}.
 * @throws IllegalArgumentException if {@code cursorString} is not valid Base64.
 */
private static CompletionStage<ThingsSearchCursor> decodeOrFailAsInvalidCursor(final String cursorString,
        final ActorMaterializer materializer, final DittoHeaders dittoHeaders) {

    final java.util.concurrent.CompletableFuture<ThingsSearchCursor> result =
            new java.util.concurrent.CompletableFuture<>();
    decode(cursorString, materializer).whenComplete((cursor, error) -> {
        if (error == null) {
            result.complete(cursor);
        } else {
            // dependent stages wrap the original failure in a CompletionException; report the real cause.
            final boolean isWrapped =
                    error instanceof java.util.concurrent.CompletionException && error.getCause() != null;
            final Throwable cause = isWrapped ? error.getCause() : error;
            // Cursor corrupted. Offer no more information in the message.
            result.completeExceptionally(invalidCursorBuilder()
                    .dittoHeaders(dittoHeaders)
                    .cause(cause)
                    .build());
        }
    });
    return result;
}
/**
 * Augment a search result by the next cursor as needed.
 * <ul>
 * <li>If the command has a limit option, the search result is returned unchanged.</li>
 * <li>Otherwise, if the command was executed with a cursor, the search result is processed by that cursor.</li>
 * <li>Otherwise, the search result is processed as a fresh search result without cursor.</li>
 * </ul>
 *
 * @param queryThings the command that produced the results.
 * @param cursor cursor given by the command, if any.
 * @param searchResult the search result.
 * @param resultList items in the search result.
 * @return search result with cursor or next-page-offset or both as appropriate.
 */
static SearchResult processSearchResult(final QueryThings queryThings,
        @Nullable final ThingsSearchCursor cursor,
        final SearchResult searchResult,
        final ResultList<String> resultList) {

    final List<LimitOption> limitOptions = findAll(LimitOption.class, getOptions(queryThings));
    if (!limitOptions.isEmpty()) {
        // do not deliver cursor if "limit" is specified
        return searchResult;
    }
    if (cursor == null) {
        // compute new cursor, deliver both
        return searchResultWithNewCursor(queryThings, searchResult, resultList);
    }
    // adjust next cursor by search result, do not deliver nextPageOffset
    return cursor.searchResultWithExistingCursor(searchResult, resultList);
}
/**
 * Locate instances of a class within a collection. Elements are returned in the iteration order of the
 * collection; {@code null} elements are never instances of any class and are skipped.
 *
 * @param clazz the class.
 * @param collection the collection.
 * @param <T> type of the class.
 * @return list of all instances of the class in the collection.
 */
private static <T> List<T> findAll(final Class<T> clazz, final Collection<?> collection) {
    final List<T> instances = new ArrayList<>();
    for (final Object element : collection) {
        if (clazz.isInstance(element)) {
            instances.add(clazz.cast(element));
        }
    }
    return instances;
}
/**
 * Find the unique sort option within a list of options of a cursor. If the list does not contain exactly 1 sort
 * option, then the cursor is corrupted and an exception is thrown offering no insight into the cursor's
 * composition.
 *
 * @param options list of options of a cursor.
 * @return the unique sort option.
 * @throws org.eclipse.ditto.signals.commands.thingsearch.exceptions.InvalidOptionException if the list contains 0
 * or more than 1 sort option.
 */
private static SortOption findUniqueSortOption(final List<Option> options) {
    final List<SortOption> sortOptions = findAll(SortOption.class, options);
    if (sortOptions.size() != 1) {
        // Cursor corrupted. Offer no more information.
        throw invalidCursorBuilder().build();
    }
    return sortOptions.get(0);
}
/**
 * Create a builder for errors due to invalid cursors. The builder carries a fixed message only; callers may add
 * a description and headers.
 *
 * @return the exception builder.
 */
private static DittoRuntimeExceptionBuilder<InvalidOptionException> invalidCursorBuilder() {
    final String message = "The option 'cursor' is not valid for the search request.";
    return InvalidOptionException.newBuilder().message(message);
}
/**
 * Create an exception due to an invalid cursor retaining the headers of a signal.
 *
 * @param description why the cursor is invalid.
 * @param withDittoHeaders signal whose headers the exception should retain.
 * @return the exception.
 */
private static InvalidOptionException invalidCursor(final String description,
        final WithDittoHeaders withDittoHeaders) {

    final DittoHeaders headersOfSignal = withDittoHeaders.getDittoHeaders();
    return invalidCursor(description, headersOfSignal);
}
/**
 * Create an exception due to an invalid cursor with the given description and headers.
 *
 * @param description why the cursor is invalid.
 * @param dittoHeaders headers of the exception.
 * @return the exception.
 */
private static InvalidOptionException invalidCursor(final String description,
        final DittoHeaders dittoHeaders) {

    final DittoRuntimeExceptionBuilder<InvalidOptionException> describedBuilder =
            invalidCursorBuilder().description(description);
    return describedBuilder.dittoHeaders(dittoHeaders).build();
}
/**
 * Create a cursor from its secret JSON representation.
 * Field selector, namespaces, correlation ID and filter are optional and default to {@code null}; options and
 * values are read by {@code getValueOrThrow}.
 *
 * @param json the JSON representation.
 * @return the cursor.
 */
private static ThingsSearchCursor fromJson(final JsonObject json) {
    // fields are read in the order of the constructor parameters.
    final String selector = json.getValue(JSON_FIELD_SELECTOR).orElse(null);
    final Set<String> namespaceSet = json.getValue(NAMESPACES)
            .map(ThingsSearchCursor::readNamespaces)
            .orElse(null);
    final String cursorCorrelationId = json.getValue(CORRELATION_ID).orElse(null);
    final List<Option> parsedOptions = RqlOptionParser.parseOptions(json.getValueOrThrow(OPTIONS));
    final String filterString = json.getValue(FILTER).orElse(null);
    final JsonArray sortValues = json.getValueOrThrow(VALUES);

    return new ThingsSearchCursor(selector, namespaceSet, cursorCorrelationId, parsedOptions, filterString,
            sortValues);
}
/**
 * Read a set of namespaces from a JSON array by converting each element via {@code JsonValue.asString()}.
 *
 * @param array the array.
 * @return set of namespaces.
 */
private static Set<String> readNamespaces(final JsonArray array) {
    final Stream<String> namespaceStrings = array.stream().map(JsonValue::asString);
    return namespaceStrings.collect(Collectors.toSet());
}
/**
 * Parse options of a {@code QueryThings} command. The option strings of the command are joined by comma and
 * parsed as one option string.
 *
 * @param queryThings the command.
 * @return parsed options; an empty list if the command has no options.
 */
private static List<Option> getOptions(final QueryThings queryThings) {
    return queryThings.getOptions()
            .map(optionStrings -> String.join(",", optionStrings))
            .map(joinedOptions -> RqlOptionParser.parseOptions(joinedOptions))
            .orElse(Collections.emptyList());
}
/**
 * Augment a fresh search result (i. e., not obtained via any cursor) by a new cursor if appropriate.
 * <ul>
 * <li>Last page: next-page-offset is removed if the command has a size option; otherwise the search result is
 * returned as is.</li>
 * <li>More results, limit option present: the cursor is set to {@code null}.</li>
 * <li>More results, limit option absent: a new cursor is computed and set; next-page-offset is removed if the
 * command has a size option.</li>
 * </ul>
 *
 * @param queryThings the command that produced the search result.
 * @param searchResult the search result.
 * @param resultList items in the search result.
 * @return the augmented search result.
 */
private static SearchResult searchResultWithNewCursor(final QueryThings queryThings,
        final SearchResult searchResult, final ResultList<?> resultList) {

    final List<Option> commandOptions = getOptions(queryThings);
    final boolean limitRequested = !findAll(LimitOption.class, commandOptions).isEmpty();
    final boolean sizeRequested = !findAll(SizeOption.class, commandOptions).isEmpty();

    if (!hasNextPage(resultList)) {
        if (sizeRequested) {
            // This is the last page. Size option is present. Remove next-page-offset.
            return searchResult.toBuilder().nextPageOffset(null).build();
        }
        // This is the last page. Size option is absent. Retain next-page-offset.
        return searchResult;
    }

    // there are more results; append cursor and offset as appropriate
    final SearchResultBuilder builder = searchResult.toBuilder();
    if (limitRequested) {
        // limit option is present. Do not compute cursor.
        builder.cursor(null);
    } else {
        // limit option is absent. Compute cursor.
        builder.cursor(computeNewCursor(queryThings, resultList).encode());
        if (sizeRequested) {
            // using size option; do not deliver nextPageOffset
            builder.nextPageOffset(null);
        }
    }
    return builder.build();
}
/**
 * Compute cursor for a {@code QueryThings} without cursor.
 * Field selector, namespaces, correlation ID and filter are copied from the command; the values are the sort
 * values of the last result, or an empty array if the result list reports none.
 *
 * @param queryThings the command.
 * @param resultList search result produced by the command.
 * @return cursor at the end of the search result.
 */
private static ThingsSearchCursor computeNewCursor(final QueryThings queryThings, final ResultList<?> resultList) {
    final String selector = queryThings.getFields().map(JsonFieldSelector::toString).orElse(null);
    final Set<String> namespaceSet = queryThings.getNamespaces().orElse(null);
    final String commandCorrelationId = queryThings.getDittoHeaders().getCorrelationId().orElse(null);
    final List<Option> cursorOptions = computeOptionsForNewCursor(queryThings);
    final String filterString = queryThings.getFilter().orElse(null);
    final JsonArray lastSortValues = resultList.lastResultSortValues().orElse(JsonArray.empty());

    return new ThingsSearchCursor(selector, namespaceSet, commandCorrelationId, cursorOptions, filterString,
            lastSortValues);
}
/**
 * Compute options for a new cursor from the options of a command, with the sort option replaced by the one
 * computed by {@code sortOptionForNewCursor} so that at least one sort dimension is non-null for all things.
 *
 * @param queryThings the command.
 * @return the options for the new cursor.
 */
private static List<Option> computeOptionsForNewCursor(final QueryThings queryThings) {
    final List<Option> commandOptions = getOptions(queryThings);
    final SortOption cursorSortOption = sortOptionForNewCursor(queryThings);
    return new OptionsBuilder()
            .visitAll(commandOptions)
            .withSortOption(cursorSortOption)
            .build();
}
/**
 * Compute the sort option for a new cursor such that at least one dimension is non-null for all things.
 * Only the first sort option of the command is considered; a command without sort option is treated as having
 * no sort entries.
 *
 * @param queryThings the command.
 * @return the sort option for the new cursor.
 */
private static SortOption sortOptionForNewCursor(final QueryThings queryThings) {
    final List<SortOption> commandSortOptions = findAll(SortOption.class, getOptions(queryThings));
    final List<SortOptionEntry> commandEntries;
    if (commandSortOptions.isEmpty()) {
        commandEntries = Collections.emptyList();
    } else {
        commandEntries = commandSortOptions.get(0).getEntries();
    }
    return SortOption.of(ensureDefaultPropertyPath(commandEntries));
}
/**
 * Ensure that sort option entries end with an entry on the thing ID.
 * If an entry on the thing ID exists, the entries are truncated after the first such entry. Otherwise the
 * default sort entry is appended to a copy of the entries.
 *
 * @param entries sort option entries of a command.
 * @return augmented or truncated sort option entries.
 */
private static List<SortOptionEntry> ensureDefaultPropertyPath(final List<SortOptionEntry> entries) {
    final JsonPointer thingIdPath = DEFAULT_SORT_OPTION_ENTRY.getPropertyPath();
    for (int index = 0; index < entries.size(); ++index) {
        if (Objects.equals(thingIdPath, entries.get(index).getPropertyPath())) {
            // entries after the thing ID cannot influence the order; drop them.
            return entries.subList(0, index + 1);
        }
    }
    final List<SortOptionEntry> withDefaultEntry = new ArrayList<>(entries);
    withDefaultEntry.add(DEFAULT_SORT_OPTION_ENTRY);
    return withDefaultEntry;
}
/**
 * Generate criteria that filter out results before a cursor's position.
 *
 * @param sortOptions sort options of the parsed query.
 * @param previousValues values of the fields in the sort options of a cursor marking its position.
 * @param cf a criteria factory.
 * @return criteria to filter out results before a cursor's position.
 * @throws org.eclipse.ditto.signals.commands.thingsearch.exceptions.InvalidOptionException if the number of sort
 * options differs from the number of values.
 */
private static Criteria getNextPageFilter(final List<org.eclipse.ditto.model.query.SortOption> sortOptions,
        final JsonArray previousValues,
        final CriteriaFactory cf) {

    final boolean oneValuePerDimension = sortOptions.size() == previousValues.getSize();
    if (oneValuePerDimension) {
        return getNextPageFilterImpl(sortOptions, previousValues, cf, 0);
    }
    // this should not happen.
    throw invalidCursorBuilder().build();
}
/**
 * Recursive implementation of {@code getNextPageFilter}.
 * The criteria of the last dimension are the criteria of that dimension alone. The criteria of any other
 * dimension combine the criteria of that dimension with the criteria of all subsequent dimensions.
 *
 * @param sortOptionEntries sort options of the parsed query.
 * @param previousValues values of the fields in the sort options of a cursor marking its position.
 * @param cf a criteria factory.
 * @param i dimension to start generating criteria for.
 * @return criteria starting from the ith dimension.
 */
private static Criteria getNextPageFilterImpl(
        final List<org.eclipse.ditto.model.query.SortOption> sortOptionEntries,
        final JsonArray previousValues,
        final CriteriaFactory cf, final int i) {

    final org.eclipse.ditto.model.query.SortOption currentSortOption = sortOptionEntries.get(i);
    // a missing value is treated like a null value.
    final JsonValue currentValue = previousValues.get(i).orElse(JsonFactory.nullLiteral());
    final Criteria currentDimension = getDimensionLtCriteria(currentSortOption, currentValue, cf);

    final boolean isLastDimension = i + 1 >= sortOptionEntries.size();
    if (isLastDimension) {
        return currentDimension;
    }

    final Criteria subsequentDimensions = getNextPageFilterImpl(sortOptionEntries, previousValues, cf, i + 1);
    return getNextDimensionCriteria(currentDimension, subsequentDimensions, currentSortOption, currentValue,
            cf);
}
/**
 * Generate criteria to filter for things that come strictly after a cursor's position on one field according to
 * the ordering specified by a sort option.
 * Null values need special handling because comparison operators never match them:
 * <ul>
 * <li>ascending, previous value null: any thing for which the field exists,</li>
 * <li>ascending, previous value non-null: any thing whose value is greater than the previous value,</li>
 * <li>descending, previous value null: no thing at all,</li>
 * <li>descending, previous value non-null: any thing whose value is less than the previous value or for which
 * the field does not exist.</li>
 * </ul>
 *
 * @param entry sort option specifying an ordering on a field.
 * @param previousValue value of the field in the sort option marking the position of a cursor.
 * @param cf a criteria factory.
 * @return criteria to filter for things after a cursor's position on the specified field.
 */
private static Criteria getDimensionLtCriteria(final org.eclipse.ditto.model.query.SortOption entry,
        final JsonValue previousValue, final CriteriaFactory cf) {

    final boolean ascending = entry.getSortDirection() == SortDirection.ASC;
    final boolean previousIsNull = previousValue.isNull();

    if (ascending && previousIsNull) {
        // ASC null: any value is bigger than null
        return cf.existsCriteria(entry.getSortExpression());
    }
    if (ascending) {
        // ASC nonnull: null values cannot be bigger and can be ignored
        return cf.fieldCriteria(entry.getSortExpression(), cf.gt(JsonToBson.convert(previousValue)));
    }
    if (previousIsNull) {
        // DESC null: smaller than null means false
        return cf.nor(cf.any());
    }
    // DESC nonnull: null is smaller than any value
    return cf.or(Arrays.asList(
            cf.fieldCriteria(entry.getSortExpression(), cf.lt(JsonToBson.convert(previousValue))),
            cf.nor(cf.existsCriteria(entry.getSortExpression()))
    ));
}
/**
 * Combine the criteria of one dimension with the criteria of subsequent dimensions, taking null values into
 * account. A thing matches if it matches the criteria of this dimension, or if its value on this dimension
 * equals the previous value and it matches the criteria of the subsequent dimensions. For a null previous value,
 * "equals" means that the field does not exist or is equal to null.
 *
 * @param thisDimensionLt criteria generated by {@code getDimensionLtCriteria} for this dimension.
 * @param nextDimension criteria generated for the subsequent dimensions.
 * @param sortOption parsed sort option for this dimension.
 * @param previousValue value on this dimension marking the position of the cursor.
 * @param cf a criteria factory.
 * @return the combined criteria of this dimension and the subsequent dimensions.
 */
private static Criteria getNextDimensionCriteria(final Criteria thisDimensionLt, final Criteria nextDimension,
        final org.eclipse.ditto.model.query.SortOption sortOption, final JsonValue previousValue,
        final CriteriaFactory cf) {

    final Criteria equalOnThisDimension = previousValue.isNull()
            ? cf.or(Arrays.asList(
                    cf.nor(cf.existsCriteria(sortOption.getSortExpression())),
                    cf.fieldCriteria(sortOption.getSortExpression(), cf.eq(null))))
            : cf.fieldCriteria(sortOption.getSortExpression(), cf.eq(JsonToBson.convert(previousValue)));

    // ties on this dimension are broken by the subsequent dimensions.
    final Criteria tieBrokenBySubsequentDimensions = cf.and(Arrays.asList(equalOnThisDimension, nextDimension));
    return cf.or(Arrays.asList(thisDimensionLt, tieBrokenBySubsequentDimensions));
}
/**
 * Decide whether a search result is followed by further results.
 * The answer is taken from whether the result list carries the sort values of its last result.
 *
 * @param resultList items of a search result.
 * @return {@code true} if the sort values of the last result are present, {@code false} otherwise.
 */
private static boolean hasNextPage(final ResultList<?> resultList) {
    final boolean lastSortValuesPresent = resultList.lastResultSortValues().isPresent();
    return lastSortValuesPresent;
}
/**
 * Test whether the sort options of a cursor are compatible with the sort options of a command, starting from a
 * given dimension and walking towards higher dimensions.
 * <ul>
 * <li>If both lists are exhausted, they are compatible.</li>
 * <li>If only the command list is exhausted, the cursor list must have exactly one more entry and that entry
 * must equal the default sort option entry.</li>
 * <li>If only the cursor list is exhausted, they are incompatible.</li>
 * <li>Otherwise the entries at the dimension must be equal. If the entry sorts on the property path of the
 * default sort option entry, the lists are compatible without looking at further dimensions.</li>
 * </ul>
 *
 * @param cursorSortOptionEntries sort options of a cursor.
 * @param commandSortOptionEntries sort options of a command.
 * @param i the dimension to start the check from.
 * @return whether the sort options are compatible.
 */
private static boolean areCompatible(final List<SortOptionEntry> cursorSortOptionEntries,
        final List<SortOptionEntry> commandSortOptionEntries,
        final int i) {
    // every path through the loop body either returns or advances to the next dimension
    for (int dimension = i; ; dimension++) {
        final boolean cursorExhausted = dimension >= cursorSortOptionEntries.size();
        final boolean commandExhausted = dimension >= commandSortOptionEntries.size();

        if (cursorExhausted && commandExhausted) {
            // nothing left to compare on either side
            return true;
        }
        if (commandExhausted) {
            // the cursor may only exceed the command by a single trailing default entry
            return dimension + 1 == cursorSortOptionEntries.size() &&
                    DEFAULT_SORT_OPTION_ENTRY.equals(cursorSortOptionEntries.get(dimension));
        }
        if (cursorExhausted) {
            // the command sorts on dimensions the cursor does not know about
            return false;
        }

        final SortOptionEntry cursorEntry = cursorSortOptionEntries.get(dimension);
        if (!Objects.equals(cursorEntry, commandSortOptionEntries.get(dimension))) {
            return false;
        }
        // entries agree; sorting on the default property path ends the comparison here
        if (Objects.equals(DEFAULT_SORT_OPTION_ENTRY.getPropertyPath(), cursorEntry.getPropertyPath())) {
            return true;
        }
    }
}
/**
 * Merge the correlation ID stored in a cursor with the correlation ID found in the command headers.
 * Without a cursor correlation ID the result is {@code null}. Otherwise the command's correlation ID, if any,
 * is appended to the cursor's correlation ID separated by {@code CORRELATION_ID_DELIMITER}.
 *
 * @param cursorCorrelationId correlationId of a cursor.
 * @param commandHeaders headers from a {@code QueryThings} command.
 * @return the merged correlation ID, the cursor's correlation ID alone if the headers have none, or
 * {@code null} if the cursor has none.
 */
@Nullable
private static String combineCorrelationIds(@Nullable final String cursorCorrelationId,
        final DittoHeaders commandHeaders) {
    if (cursorCorrelationId == null) {
        // the headers are not consulted when the cursor carries no correlation ID
        return null;
    }
    return commandHeaders.getCorrelationId()
            .map(commandCorrelationId -> cursorCorrelationId + CORRELATION_ID_DELIMITER + commandCorrelationId)
            .orElse(cursorCorrelationId);
}
/**
 * Validate the size options of a command: there may be at most one, and its size must be greater than zero.
 *
 * @param options options of a {@code QueryThings} command.
 * @param withDittoHeaders headers of the command; attached to the returned exception, if any.
 * @return empty optional if the size options are valid, or an exception describing the violation.
 */
private static Optional<InvalidOptionException> checkSizeOption(final List<Option> options,
        final WithDittoHeaders withDittoHeaders) {
    final List<SizeOption> sizeOptions = findAll(SizeOption.class, options);

    // pick the violation message first; no violation means there is nothing to report
    final String violation;
    if (sizeOptions.size() > 1) {
        violation = "There may not be more than 1 'size' option.";
    } else if (!sizeOptions.isEmpty() && sizeOptions.get(0).getSize() <= 0) {
        violation = "The option 'size' must be a positive integer.";
    } else {
        return Optional.empty();
    }

    return Optional.of(invalidCursorBuilder()
            .message(violation)
            .dittoHeaders(withDittoHeaders.getDittoHeaders())
            .build());
}
/**
 * Option visitor that remembers the last visited sort option and size option and assembles them into a list.
 * Cursor options are skipped; visiting a limit option raises an invalid-cursor exception.
 */
private static final class OptionsBuilder implements OptionVisitor {

    @Nullable private SortOption sortOption;
    @Nullable private SizeOption sizeOption;

    /**
     * Let every given option accept this visitor, in list order.
     *
     * @param startingOptions the options to visit.
     * @return this builder.
     */
    private OptionsBuilder visitAll(final List<Option> startingOptions) {
        for (final Option startingOption : startingOptions) {
            startingOption.accept(this);
        }
        return this;
    }

    /**
     * Replace the remembered sort option.
     *
     * @param sortOption the sort option to use.
     * @return this builder.
     */
    private OptionsBuilder withSortOption(final SortOption sortOption) {
        this.sortOption = sortOption;
        return this;
    }

    @Override
    public void visit(final LimitOption limitOption) {
        // limit options are not expected to reach this visitor; reject them
        throw invalidCursorBuilder().build();
    }

    @Override
    public void visit(final SortOption sortOption) {
        this.sortOption = sortOption;
    }

    @Override
    public void visit(final CursorOption cursorOption) {
        // cursor options are deliberately dropped
    }

    @Override
    public void visit(final SizeOption sizeOption) {
        this.sizeOption = sizeOption;
    }

    /**
     * Assemble the remembered options.
     *
     * @return a list holding the sort option followed by the size option, leaving out whichever is unset.
     */
    private List<Option> build() {
        final Stream<Option> rememberedOptions = Stream.of(sortOption, sizeOption);
        return rememberedOptions
                .filter(option -> option != null)
                .collect(Collectors.toList());
    }
}
}