-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
ElasticsearchJson.java
704 lines (585 loc) · 20.6 KB
/
ElasticsearchJson.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.elasticsearch;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;
import static java.util.Collections.unmodifiableMap;
/**
* Internal objects (and deserializers) used to parse Elasticsearch results
* (which are in JSON format).
*
* <p>Since we're using basic row-level rest client http response has to be
* processed manually using JSON (jackson) library.
*/
final class ElasticsearchJson {

  private ElasticsearchJson() {}

  /**
   * Visits leaves of the aggregation where all values are stored.
   *
   * <p>Leaf values are grouped by the keys of the buckets leading to them
   * ({@link RowKey}). For every distinct key one or more rows are emitted:
   * <ul>
   *   <li>if any leaf of that key is a {@link GroupValue} (a bucket without
   *   sub-aggregations), one row is emitted per leaf value;</li>
   *   <li>otherwise all leaf values are merged into a single row.</li>
   * </ul>
   *
   * @param aggregations aggregations to visit (not null)
   * @param consumer receives each produced row (column name to value), not null
   * @throws IllegalStateException if a leaf aggregation has no {@code value} field
   */
  static void visitValueNodes(Aggregations aggregations, Consumer<Map<String, Object>> consumer) {
    Objects.requireNonNull(aggregations, "aggregations");
    Objects.requireNonNull(consumer, "consumer");

    Map<RowKey, List<MultiValue>> rows = new LinkedHashMap<>();

    BiConsumer<RowKey, MultiValue> cons = (r, v) ->
        rows.computeIfAbsent(r, ignore -> new ArrayList<>()).add(v);
    aggregations.forEach(a -> visitValueNodes(a, new ArrayList<>(), cons));
    rows.forEach((k, v) -> {
      if (v.stream().anyMatch(val -> val instanceof GroupValue)) {
        // each group value becomes its own row (sharing the same bucket keys)
        v.forEach(tuple -> {
          Map<String, Object> groupRow = new LinkedHashMap<>(k.keys);
          groupRow.put(tuple.getName(), tuple.value());
          consumer.accept(groupRow);
        });
      } else {
        // all metric values for the same bucket keys are merged into one row
        Map<String, Object> row = new LinkedHashMap<>(k.keys);
        v.forEach(val -> row.put(val.getName(), val.value()));
        consumer.accept(row);
      }
    });
  }

  /**
   * Visits Elasticsearch
   * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html">mapping
   * properties</a> and calls consumer for each {@code field / type} pair.
   * Nested fields are represented as {@code foo.bar.qux}.
   *
   * @param mapping mapping definition (not null)
   * @param consumer receives {@code (dotted field path, type)} pairs (not null)
   */
  static void visitMappingProperties(ObjectNode mapping,
      BiConsumer<String, String> consumer) {
    Objects.requireNonNull(mapping, "mapping");
    Objects.requireNonNull(consumer, "consumer");
    visitMappingProperties(new ArrayDeque<>(), mapping, consumer);
  }

  /**
   * Recursive worker for {@link #visitMappingProperties(ObjectNode, BiConsumer)}.
   *
   * @param path names of the enclosing fields (used as a stack)
   * @param mapping current mapping node
   * @param consumer receives {@code (dotted field path, type)} pairs
   */
  private static void visitMappingProperties(Deque<String> path,
      ObjectNode mapping, BiConsumer<String, String> consumer) {
    Objects.requireNonNull(mapping, "mapping");
    if (mapping.isMissingNode()) {
      return;
    }

    if (mapping.has("properties")) {
      // recurse
      visitMappingProperties(path, (ObjectNode) mapping.get("properties"), consumer);
      return;
    }

    if (mapping.has("type")) {
      // this is leaf (register field / type mapping)
      consumer.accept(String.join(".", path), mapping.get("type").asText());
      return;
    }

    // otherwise continue visiting mapping(s)
    Iterable<Map.Entry<String, JsonNode>> iter = mapping::fields;
    for (Map.Entry<String, JsonNode> entry : iter) {
      final JsonNode value = entry.getValue();
      if (!(value instanceof ObjectNode)) {
        // scalar / array values (e.g. a setting such as "enabled": false) cannot
        // describe nested fields; skip them instead of failing with a ClassCastException
        continue;
      }
      path.add(entry.getKey());
      visitMappingProperties(path, (ObjectNode) value, consumer);
      path.removeLast();
    }
  }

  /**
   * Identifies a calcite row (as in relational algebra).
   *
   * <p>Two keys are equal when their (ordered) maps of bucket name to bucket
   * key are equal. The hash code is computed once at construction.
   */
  private static class RowKey {
    private final Map<String, Object> keys;
    private final int hashCode;

    private RowKey(final Map<String, Object> keys) {
      this.keys = Objects.requireNonNull(keys, "keys");
      this.hashCode = Objects.hashCode(keys);
    }

    private RowKey(List<Bucket> buckets) {
      this(toMap(buckets));
    }

    /** Maps each bucket name to its key, preserving bucket order. */
    private static Map<String, Object> toMap(Iterable<Bucket> buckets) {
      return StreamSupport.stream(buckets.spliterator(), false)
          .collect(LinkedHashMap::new,
              (m, v) -> m.put(v.getName(), v.key()),
              LinkedHashMap::putAll);
    }

    @Override public boolean equals(final Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      final RowKey rowKey = (RowKey) o;
      return hashCode == rowKey.hashCode
          && Objects.equals(keys, rowKey.keys);
    }

    @Override public int hashCode() {
      return this.hashCode;
    }
  }

  /**
   * Depth-first traversal of an aggregation tree.
   *
   * @param aggregation current node
   * @param parents buckets on the path from the root to {@code aggregation};
   *     temporarily extended while visiting a bucket's children
   * @param consumer receives every leaf value together with its row key
   */
  private static void visitValueNodes(Aggregation aggregation, List<Bucket> parents,
      BiConsumer<RowKey, MultiValue> consumer) {

    if (aggregation instanceof MultiValue) {
      // this is a leaf. publish value of the row.
      RowKey key = new RowKey(parents);
      consumer.accept(key, (MultiValue) aggregation);
      return;
    }

    if (aggregation instanceof Bucket) {
      Bucket bucket = (Bucket) aggregation;
      if (bucket.hasNoAggregations()) {
        // bucket with no aggregations is also considered a leaf node
        visitValueNodes(GroupValue.of(bucket.getName(), bucket.key()), parents, consumer);
        return;
      }
      parents.add(bucket);
      bucket.getAggregations().forEach(a -> visitValueNodes(a, parents, consumer));
      parents.remove(parents.size() - 1);
    } else if (aggregation instanceof HasAggregations) {
      HasAggregations children = (HasAggregations) aggregation;
      children.getAggregations().forEach(a -> visitValueNodes(a, parents, consumer));
    } else if (aggregation instanceof MultiBucketsAggregation) {
      MultiBucketsAggregation multi = (MultiBucketsAggregation) aggregation;
      multi.buckets().forEach(b -> visitValueNodes(b, parents, consumer));
    }
  }

  /**
   * Response from Elastic.
   */
  @JsonIgnoreProperties(ignoreUnknown = true)
  static class Result {
    private final SearchHits hits;
    private final Aggregations aggregations;
    private final String scrollId;
    private final long took;

    /**
     * Constructor for this instance.
     *
     * @param hits list of matched documents (required)
     * @param aggregations aggregation results, or {@code null} if none were returned
     * @param scrollId scroll id ({@code _scroll_id}), or {@code null} if absent
     * @param took time taken (in milliseconds) for this query to execute
     */
    @JsonCreator
    Result(@JsonProperty("hits") SearchHits hits,
        @JsonProperty("aggregations") Aggregations aggregations,
        @JsonProperty("_scroll_id") String scrollId,
        @JsonProperty("took") long took) {
      this.hits = Objects.requireNonNull(hits, "hits");
      this.aggregations = aggregations;
      this.scrollId = scrollId;
      this.took = took;
    }

    SearchHits searchHits() {
      return hits;
    }

    /** Returns aggregations of this response; may be {@code null}. */
    Aggregations aggregations() {
      return aggregations;
    }

    Duration took() {
      return Duration.ofMillis(took);
    }

    Optional<String> scrollId() {
      return Optional.ofNullable(scrollId);
    }
  }

  /**
   * Similar to {@code SearchHits} in ES. Container for {@link SearchHit}.
   */
  @JsonIgnoreProperties(ignoreUnknown = true)
  static class SearchHits {

    private final long total;
    private final List<SearchHit> hits;

    @JsonCreator
    SearchHits(@JsonProperty("total") final long total,
        @JsonProperty("hits") final List<SearchHit> hits) {
      this.total = total;
      this.hits = Objects.requireNonNull(hits, "hits");
    }

    public List<SearchHit> hits() {
      return this.hits;
    }

    public long total() {
      return total;
    }
  }

  /**
   * Concrete result record which matched the query. Similar to {@code SearchHit} in ES.
   *
   * <p>Exactly one of {@code _source} and {@code fields} must be present.
   */
  @JsonIgnoreProperties(ignoreUnknown = true)
  static class SearchHit {

    /**
     * ID of the document (not available in aggregations).
     */
    private final String id;
    private final Map<String, Object> source;
    private final Map<String, Object> fields;

    /**
     * Creates a hit.
     *
     * @param id document id (required)
     * @param source document {@code _source}, or {@code null} when {@code fields} is used
     * @param fields requested {@code fields}, or {@code null} when {@code _source} is used
     * @throws IllegalArgumentException if both or neither of {@code source} and
     *     {@code fields} are provided
     */
    @JsonCreator
    SearchHit(@JsonProperty(ElasticsearchConstants.ID) final String id,
        @JsonProperty("_source") final Map<String, Object> source,
        @JsonProperty("fields") final Map<String, Object> fields) {
      this.id = Objects.requireNonNull(id, "id");

      // both can't be null
      if (source == null && fields == null) {
        final String message = String.format(Locale.ROOT,
            "Both '_source' and 'fields' are missing for %s", id);
        throw new IllegalArgumentException(message);
      }

      // both can't be non-null
      if (source != null && fields != null) {
        final String message = String.format(Locale.ROOT,
            "Both '_source' and 'fields' are populated (non-null) for %s", id);
        throw new IllegalArgumentException(message);
      }

      this.source = source;
      this.fields = fields;
    }

    /**
     * Returns id of this hit (usually document id).
     *
     * @return unique id
     */
    public String id() {
      return id;
    }

    /**
     * Returns the value of a field, or {@code null} if absent.
     *
     * <p>{@code fields} is consulted first; multi-valued (iterable) entries yield
     * their first element (or {@code null} when empty). Otherwise the value is
     * looked up in {@code _source}, following dotted paths such as {@code a.b.c}.
     *
     * @param name field name (not null)
     * @return field value or {@code null}
     */
    Object valueOrNull(String name) {
      Objects.requireNonNull(name, "name");
      if (fields != null && fields.containsKey(name)) {
        Object field = fields.get(name);
        if (field instanceof Iterable) {
          // return first element (or null)
          Iterator<?> iter = ((Iterable<?>) field).iterator();
          return iter.hasNext() ? iter.next() : null;
        }

        return field;
      }

      return valueFromPath(source, name);
    }

    /**
     * Returns property from nested maps given a path like {@code a.b.c}.
     *
     * <p>An exact key match wins over a dotted traversal.
     *
     * @param map current map (may be {@code null})
     * @param path field path(s), optionally with dots ({@code a.b.c}).
     * @return value located at path {@code path} or {@code null} if not found.
     */
    @SuppressWarnings("unchecked")
    private static Object valueFromPath(Map<String, Object> map, String path) {
      if (map == null) {
        return null;
      }

      if (map.containsKey(path)) {
        return map.get(path);
      }

      // maybe pattern of type a.b.c
      final int index = path.indexOf('.');
      if (index == -1) {
        return null;
      }

      final String prefix = path.substring(0, index);
      final String suffix = path.substring(index + 1);

      Object maybeMap = map.get(prefix);
      if (maybeMap instanceof Map) {
        // JSON objects are deserialized with String keys
        return valueFromPath((Map<String, Object>) maybeMap, suffix);
      }

      return null;
    }

    Map<String, Object> source() {
      return source;
    }

    Map<String, Object> fields() {
      return fields;
    }

    /** Returns whichever of {@code _source} / {@code fields} is present. */
    Map<String, Object> sourceOrFields() {
      return source != null ? source : fields;
    }
  }

  /**
   * {@link Aggregation} container.
   */
  @JsonDeserialize(using = AggregationsDeserializer.class)
  static class Aggregations implements Iterable<Aggregation> {

    private final List<? extends Aggregation> aggregations;
    private Map<String, Aggregation> aggregationsAsMap;

    Aggregations(List<? extends Aggregation> aggregations) {
      this.aggregations = Objects.requireNonNull(aggregations, "aggregations");
    }

    /**
     * Iterates over the {@link Aggregation}s.
     */
    @Override public final Iterator<Aggregation> iterator() {
      return asList().iterator();
    }

    /**
     * The list of {@link Aggregation}s (unmodifiable view).
     */
    final List<Aggregation> asList() {
      return Collections.unmodifiableList(aggregations);
    }

    /**
     * Returns the {@link Aggregation}s keyed by aggregation name. Lazy init.
     * If several aggregations share a name, the last one wins.
     */
    final Map<String, Aggregation> asMap() {
      if (aggregationsAsMap == null) {
        Map<String, Aggregation> map = new LinkedHashMap<>(aggregations.size());
        for (Aggregation aggregation : aggregations) {
          map.put(aggregation.getName(), aggregation);
        }
        this.aggregationsAsMap = unmodifiableMap(map);
      }
      return aggregationsAsMap;
    }

    /**
     * Returns the aggregation that is associated with the specified name
     * (or {@code null} if there is none).
     */
    @SuppressWarnings("unchecked")
    public final <A extends Aggregation> A get(String name) {
      return (A) asMap().get(name);
    }

    @Override public final boolean equals(Object obj) {
      if (obj == null || getClass() != obj.getClass()) {
        return false;
      }
      return aggregations.equals(((Aggregations) obj).aggregations);
    }

    @Override public final int hashCode() {
      return Objects.hash(getClass(), aggregations);
    }
  }

  /**
   * Identifies all aggregations.
   */
  interface Aggregation {

    /**
     * Returns the name of this aggregation.
     *
     * @return The name of this aggregation.
     */
    String getName();

  }

  /**
   * Allows traversing aggregations tree.
   */
  interface HasAggregations {
    Aggregations getAggregations();
  }

  /**
   * An aggregation that returns multiple buckets.
   */
  static class MultiBucketsAggregation implements Aggregation {

    private final String name;
    private final List<Bucket> buckets;

    MultiBucketsAggregation(final String name,
        final List<Bucket> buckets) {
      this.name = Objects.requireNonNull(name, "name");
      this.buckets = Objects.requireNonNull(buckets, "buckets");
    }

    /**
     * Returns the buckets of this aggregation.
     *
     * @return The buckets of this aggregation.
     */
    List<Bucket> buckets() {
      return buckets;
    }

    @Override public String getName() {
      return name;
    }
  }

  /**
   * A bucket represents a criteria to which all documents that fall in it adhere to.
   * It is also uniquely identified
   * by a key, and can potentially hold sub-aggregations computed over all documents in it.
   */
  static class Bucket implements HasAggregations, Aggregation {
    private final Object key;
    private final String name;
    private final Aggregations aggregations;

    /**
     * Creates a bucket.
     *
     * @param key bucket key; {@code null} for a "missing" bucket
     * @param name name of the owning aggregation (not null)
     * @param aggregations sub-aggregations (not null, possibly empty)
     */
    Bucket(final Object key,
        final String name,
        final Aggregations aggregations) {
      this.key = key; // may be null (e.g. bucket of documents missing the field)
      this.name = Objects.requireNonNull(name, "name");
      this.aggregations = Objects.requireNonNull(aggregations, "aggregations");
    }

    /**
     * Returns the key associated with the bucket.
     *
     * @return The key associated with the bucket (may be {@code null})
     */
    Object key() {
      return key;
    }

    /**
     * Returns the key associated with the bucket as a string.
     *
     * @return The key associated with the bucket as a string
     *     ({@code "null"} for a null key)
     */
    String keyAsString() {
      return Objects.toString(key());
    }

    /**
     * Means current bucket has no aggregations.
     */
    boolean hasNoAggregations() {
      return aggregations.asList().isEmpty();
    }

    /**
     * Returns the sub-aggregations of this bucket.
     *
     * @return The sub-aggregations of this bucket
     */
    @Override public Aggregations getAggregations() {
      return aggregations;
    }

    @Override public String getName() {
      return name;
    }
  }

  /**
   * Multi value aggregation like
   * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-stats-aggregation.html">Stats</a>.
   */
  static class MultiValue implements Aggregation {
    private final String name;
    private final Map<String, Object> values;

    MultiValue(final String name, final Map<String, Object> values) {
      this.name = Objects.requireNonNull(name, "name");
      this.values = Objects.requireNonNull(values, "values");
    }

    @Override public String getName() {
      return name;
    }

    Map<String, Object> values() {
      return values;
    }

    /**
     * For single value. Returns single value represented by this leaf aggregation.
     *
     * @return value corresponding to {@code value}
     * @throws IllegalStateException if there is no {@code value} entry
     */
    Object value() {
      if (!values().containsKey("value")) {
        String message = String.format(Locale.ROOT, "'value' field not present in "
            + "%s aggregation", getName());

        throw new IllegalStateException(message);
      }

      return values().get("value");
    }

  }

  /**
   * Distinguishes from {@link MultiValue}.
   * In order that rows which have the same key can be put into result map.
   */
  static class GroupValue extends MultiValue {
    GroupValue(String name, Map<String, Object> values) {
      super(name, values);
    }

    /**
     * Constructs a {@link GroupValue} instance with a single value.
     */
    static GroupValue of(String name, Object value) {
      return new GroupValue(name, Collections.singletonMap("value", value));
    }
  }

  /**
   * Allows to de-serialize nested aggregation structures.
   */
  static class AggregationsDeserializer extends StdDeserializer<Aggregations> {

    /** Bucket / value attributes which are never treated as sub-aggregations. */
    private static final Set<String> IGNORE_TOKENS =
        ImmutableSet.of("meta", "buckets", "value", "values", "value_as_string",
            "doc_count", "key", "key_as_string");

    AggregationsDeserializer() {
      super(Aggregations.class);
    }

    @Override public Aggregations deserialize(final JsonParser parser,
        final DeserializationContext ctxt)
        throws IOException {

      ObjectNode node = parser.getCodec().readTree(parser);
      return parseAggregations(parser, node);
    }

    /**
     * Converts each object-valued attribute of {@code node} into an aggregation:
     * attributes with {@code buckets} become {@link MultiBucketsAggregation}s,
     * other objects (not in {@link #IGNORE_TOKENS}) become {@link MultiValue} leaves.
     */
    private static Aggregations parseAggregations(JsonParser parser, ObjectNode node)
        throws JsonProcessingException {

      List<Aggregation> aggregations = new ArrayList<>();

      Iterable<Map.Entry<String, JsonNode>> iter = node::fields;
      for (Map.Entry<String, JsonNode> entry : iter) {
        final String name = entry.getKey();
        final JsonNode value = entry.getValue();

        Aggregation agg = null;
        if (value.has("buckets")) {
          agg = parseBuckets(parser, name, (ArrayNode) value.get("buckets"));
        } else if (value.isObject() && !IGNORE_TOKENS.contains(name)) {
          // leaf
          agg = parseValue(parser, name, (ObjectNode) value);
        }

        if (agg != null) {
          aggregations.add(agg);
        }
      }

      return new Aggregations(aggregations);
    }

    private static MultiValue parseValue(JsonParser parser, String name, ObjectNode node)
        throws JsonProcessingException {

      return new MultiValue(name, parser.getCodec().treeToValue(node, Map.class));
    }

    private static Aggregation parseBuckets(JsonParser parser, String name, ArrayNode nodes)
        throws JsonProcessingException {

      List<Bucket> buckets = new ArrayList<>(nodes.size());
      for (JsonNode b : nodes) {
        buckets.add(parseBucket(parser, name, (ObjectNode) b));
      }

      return new MultiBucketsAggregation(name, buckets);
    }

    /**
     * Determines if current key is a missing field key. Missing key is returned when document
     * does not have pivoting attribute (example {@code GROUP BY _MAP['a.b.missing']}). It helps
     * grouping documents which don't have a field. In relational algebra this
     * would normally be {@code null}.
     *
     * <p>Please note that missing value is different for each type.
     *
     * @param key current {@code key} (usually string) as returned by ES
     * @return {@code true} if this value is the placeholder used for a missing field
     */
    private static boolean isMissingBucket(JsonNode key) {
      return ElasticsearchMapping.Datatype.isMissingValue(key);
    }

    /**
     * Parses a single bucket: its {@code key} plus any nested sub-aggregations.
     *
     * @throws IllegalArgumentException if the bucket has no {@code key} attribute
     */
    private static Bucket parseBucket(JsonParser parser, String name, ObjectNode node)
        throws JsonProcessingException {

      if (!node.has("key")) {
        throw new IllegalArgumentException("No 'key' attribute for " + node);
      }

      final JsonNode keyNode = node.get("key");
      final Object key;
      if (isMissingBucket(keyNode) || keyNode.isNull()) {
        key = null;
      } else if (keyNode.isTextual()) {
        key = keyNode.textValue();
      } else if (keyNode.isNumber()) {
        key = keyNode.numberValue();
      } else if (keyNode.isBoolean()) {
        key = keyNode.booleanValue();
      } else {
        // don't usually expect keys to be Objects (or arrays). Convert the key itself
        // (not the whole bucket, which also contains doc_count and sub-aggregations);
        // Object.class yields a Map for JSON objects and a List for JSON arrays.
        key = parser.getCodec().treeToValue(keyNode, Object.class);
      }

      return new Bucket(key, name, parseAggregations(parser, node));
    }

  }
}
// End ElasticsearchJson.java