/
conversion.clj
1482 lines (1335 loc) · 53.8 KB
/
conversion.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
;; Copyright (c) 2011-2019 Michael S. Klishin, Alex Petrov, and the ClojureWerkz Team
;;
;; Licensed under the Apache License, Version 2.0 (the "License");
;; you may not use this file except in compliance with the License.
;; You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns clojurewerkz.elastisch.native.conversion
(:refer-clojure :exclude [get merge flush])
(:require [clojure.walk :as wlk]
[cheshire.core :as json]
[clojurewerkz.elastisch.native.conversion-stats :as cnv-stats])
(:import (org.elasticsearch.common.settings Settings Settings$Builder)
(org.elasticsearch.common.transport TransportAddress InetSocketTransportAddress LocalTransportAddress)
java.util.Map
clojure.lang.IPersistentMap
java.net.InetAddress
org.elasticsearch.client.Client
org.elasticsearch.common.xcontent.XContentType
org.elasticsearch.index.VersionType
(org.elasticsearch.search.highlight HighlightBuilder HighlightBuilder$Field HighlightField)
org.elasticsearch.common.text.Text
;; Actions
org.elasticsearch.action.ShardOperationFailedException
(org.elasticsearch.action.index IndexRequest IndexRequest$OpType IndexResponse)
(org.elasticsearch.index.get GetResult)
(org.elasticsearch.action.get GetRequest GetResponse MultiGetRequest MultiGetResponse MultiGetItemResponse)
(org.elasticsearch.action.delete DeleteRequest DeleteResponse)
(org.elasticsearch.action.update UpdateRequest UpdateResponse)
(org.elasticsearch.action.count CountRequest)
(org.elasticsearch.action.search SearchRequest SearchResponse SearchScrollRequest
MultiSearchAction MultiSearchRequest MultiSearchResponse MultiSearchResponse$Item)
(org.elasticsearch.action.suggest SuggestResponse)
(org.elasticsearch.search.suggest.completion CompletionSuggestionBuilder
CompletionSuggestionFuzzyBuilder)
(org.elasticsearch.common.unit Fuzziness) ;;for CompletionSuggestionFuzzyBuilder
;(org.elastisearch.search.suggest.phrase PhraseSuggestionBuilder)
(org.elasticsearch.search.suggest.term TermSuggestionBuilder)
(org.elasticsearch.search.builder SearchSourceBuilder)
(org.elasticsearch.search.sort SortBuilder SortOrder FieldSortBuilder)
(org.elasticsearch.search SearchHits SearchHit)
(org.elasticsearch.action.percolate PercolateResponse PercolateResponse$Match)
;; Aggregations
org.elasticsearch.search.aggregations.Aggregations
org.elasticsearch.search.aggregations.metrics.avg.Avg
org.elasticsearch.search.aggregations.metrics.max.Max
org.elasticsearch.search.aggregations.metrics.min.Min
org.elasticsearch.search.aggregations.metrics.sum.Sum
org.elasticsearch.search.aggregations.metrics.tophits.TopHits
(org.elasticsearch.search.aggregations.metrics.percentiles Percentiles Percentile)
org.elasticsearch.search.aggregations.metrics.cardinality.Cardinality
org.elasticsearch.search.aggregations.metrics.valuecount.ValueCount
org.elasticsearch.search.aggregations.metrics.stats.Stats
org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStats
(org.elasticsearch.search.aggregations.bucket.histogram Histogram Histogram$Bucket)
(org.elasticsearch.search.aggregations.bucket.range Range Range$Bucket)
(org.elasticsearch.search.aggregations.bucket.terms Terms Terms$Bucket)
org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation
(org.elasticsearch.search.aggregations HasAggregations)
;; Administrative Actions
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest
org.elasticsearch.action.admin.indices.create.CreateIndexRequest
org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest
(org.elasticsearch.action.admin.indices.mapping.get GetMappingsRequest GetMappingsResponse)
org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest
(org.elasticsearch.action.admin.indices.stats IndicesStatsRequest IndicesStatsResponse)
org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest
org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse
org.elasticsearch.action.admin.indices.open.OpenIndexRequest
org.elasticsearch.action.admin.indices.close.CloseIndexRequest
(org.elasticsearch.action.admin.indices.forcemerge ForceMergeRequest)
org.elasticsearch.action.admin.indices.flush.FlushRequest
org.elasticsearch.action.admin.indices.refresh.RefreshRequest
org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest
(org.elasticsearch.action.admin.indices.segments IndicesSegmentsRequest IndicesSegmentResponse IndexSegments)
(org.elasticsearch.action.admin.indices.alias.get GetAliasesResponse GetAliasesRequest)
org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest
org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest
org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest
com.carrotsearch.hppc.cursors.ObjectObjectCursor
org.elasticsearch.common.collect.ImmutableOpenMap
org.elasticsearch.cluster.metadata.AliasMetaData
org.elasticsearch.cluster.metadata.MappingMetaData
org.elasticsearch.action.admin.indices.exists.types.TypesExistsRequest
org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest
org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest
org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest
org.elasticsearch.action.support.broadcast.BroadcastResponse
org.elasticsearch.action.support.master.AcknowledgedResponse
org.elasticsearch.search.fetch.source.FetchSourceContext
;; Bulk responses
(org.elasticsearch.action.bulk BulkResponse BulkItemResponse)))
;;
;; Implementation
;;
(defn ^{:tag "[Ljava.lang.String;"} ->string-array
  "Coerces argument to an array of strings.

  A collection yields one element per item, each passed through `name`, so
  strings, keywords and symbols are all accepted. Any other value is treated
  as a single name and yields a one-element array."
  [index-name]
  (let [names (if (coll? index-name) index-name [index-name])]
    (into-array String (map name names))))
(defprotocol DeepMapConversion
  "Recursive conversion of Java collections into immutable Clojure data."
  (deep-java-map->map [o]
    "Converts `o`. Maps become Clojure maps with keywordized keys, lists
  become vectors (converted element-wise), nil stays nil, and any other
  value is returned unchanged."))

(extend-protocol DeepMapConversion
  nil
  (deep-java-map->map [_] nil)

  java.lang.Object
  (deep-java-map->map [o] o)

  java.util.List
  (deep-java-map->map [o]
    (mapv deep-java-map->map o))

  java.util.Map
  (deep-java-map->map [o]
    (into {}
          (map (fn [[k v]]
                 [(keyword k) (deep-java-map->map v)]))
          (.entrySet o)))

  ImmutableOpenMap
  (deep-java-map->map [o]
    ;; ImmutableOpenMap iterates with a mutable cursor: every .next() hands
    ;; back the same cursor instance. Copy each key/value pair out eagerly
    ;; (`into` is eager), then convert the resulting Clojure map through the
    ;; java.util.Map case.
    (deep-java-map->map
     (into {}
           (map (fn [^ObjectObjectCursor cursor]
                  [(.key cursor) (.value cursor)]))
           o))))
(def ^:const default-content-type
  "Default XContent type (JSON)."
  XContentType/JSON)
(defprotocol XContentTypeConversion
  (^{:tag XContentType, :doc/format :markdown} to-content-type [input] "Picks a content type for given input"))

(extend-protocol XContentTypeConversion
  clojure.lang.Named
  (to-content-type [input]
    (to-content-type (name input)))

  String
  (to-content-type [^String s]
    ;; Lower-case with Locale/ROOT. With the JVM default locale the result can
    ;; differ; under Turkish, for example, "SMILE" becomes "smıle" (dotless i)
    ;; and would silently fall back to JSON.
    (case (.toLowerCase s java.util.Locale/ROOT)
      ("application/json" "text/json" "json") XContentType/JSON
      ("application/smile" "smile")           XContentType/SMILE
      ;; unrecognised values fall back to JSON
      XContentType/JSON))

  XContentType
  (to-content-type [input]
    input))
(defprotocol VersionTypeConversion
  (^{:tag VersionType} to-version-type [input] "Picks a version type for given input"))

(extend-protocol VersionTypeConversion
  clojure.lang.Named
  (to-version-type [input]
    (to-version-type (name input)))

  String
  (to-version-type [^String s]
    ;; Locale/ROOT keeps the match independent of the JVM default locale.
    ;; The accepted names mirror Elasticsearch's own version type names.
    (case (.toLowerCase s java.util.Locale/ROOT)
      "internal"                   VersionType/INTERNAL
      ("external" "external_gt")   VersionType/EXTERNAL
      "external_gte"               VersionType/EXTERNAL_GTE
      "force"                      VersionType/FORCE
      ;; unrecognised names keep the historical fallback
      VersionType/INTERNAL))

  VersionType
  (to-version-type [input]
    input))
;;
;; API
;;
;;
;; Settings
;;
(defn ^Settings ->settings
  "Converts a Clojure map into immutable Elasticsearch settings.

  Each key is converted with `name`, and its value is handed to
  `Settings$Builder#put` unchanged. A nil (or false) argument yields
  `Settings$Builder/EMPTY_SETTINGS`."
  [m]
  (if-not m
    Settings$Builder/EMPTY_SETTINGS
    (let [^Settings$Builder builder (Settings/builder)]
      (doseq [[setting-key setting-value] m]
        (.put builder ^String (name setting-key) setting-value))
      (.build builder))))
;;
;; Transports
;;
(defn ^TransportAddress ->socket-transport-address
  "Builds an InetSocketTransportAddress for `host`, resolved with
  InetAddress/getByName, and `port`."
  [^String host ^{:tag "long"} port]
  (let [address (InetAddress/getByName host)]
    (InetSocketTransportAddress. address port)))
(defn ^TransportAddress ->local-transport-address
  "Builds a LocalTransportAddress identified by `id`."
  [^String id]
  (new LocalTransportAddress id))
;;
;; Indexing
;;
(defn ^IndexRequest ->index-request
  "Builds an index action request.

  `doc` is encoded to a JSON string with cheshire and used as the request
  source. Supported options, each applied only when truthy:
  :id, :routing, :parent, :timestamp, :ttl, :refresh, :version,
  :op-type (name lower-cased and parsed by IndexRequest$OpType/fromString),
  :version-type (coerced with `to-version-type`) and
  :content-type (coerced with `to-content-type`)."
  ([index mapping-type ^Map doc]
   ;; default content type used by IndexRequest is JSON. MK.
   ;; With no options this is exactly the full arity with an empty map.
   (->index-request index mapping-type doc {}))
  ;; non-variadic because it is more convenient and efficient to
  ;; invoke this internal implementation fn this way. MK.
  ([index mapping-type ^Map doc {:keys [id
                                        routing
                                        parent
                                        timestamp
                                        ttl
                                        op-type
                                        refresh
                                        version
                                        version-type
                                        content-type]}]
   (let [^IndexRequest req (-> (IndexRequest. (name index) (name mapping-type))
                               (.source ^String (json/encode doc)))]
     (when id
       (.id req ^String id))
     (when content-type
       (.contentType req (to-content-type content-type)))
     (when routing
       (.routing req ^String routing))
     (when parent
       (.parent req ^String parent))
     (when timestamp
       (.timestamp req timestamp))
     (when ttl
       (.ttl req ttl))
     (when op-type
       (.opType req (IndexRequest$OpType/fromString (.toLowerCase (name op-type)))))
     (when refresh
       (.refresh req refresh))
     (when version
       (.version req version))
     (when version-type
       (.versionType req (to-version-type version-type)))
     req)))
(defn ^IPersistentMap index-response->map
  "Converts an index action response to a Clojure map.

  Every field appears twice: plain (:index, :id, :type, :version) and
  underscored (:_index, :_id, :_type, :_version). The underscored aliases
  match REST API responses."
  [^IndexResponse r]
  (let [index    (.getIndex r)
        id       (.getId r)
        doc-type (.getType r)
        version  (.getVersion r)]
    {:index   index    :_index   index
     :id      id       :_id      id
     :type    doc-type :_type    doc-type
     :version version  :_version version}))
;;
;; Actions
;;
(defn ^GetRequest ->get-request
  "Builds a get action request.

  Options:
  * :parent, :preference, :routing - passed through to the request.
  * :fields - field name(s) to load: a single name or a collection of
    strings/keywords (coerced with `->string-array`).
  * :_source - source filtering:
    - true/false enables or disables fetching the source;
    - a name or collection of names is used as the include list;
    - a map with :include and/or :exclude (name or collection of names).
      When only :exclude is given, no include list is set."
  ([index mapping-type ^String id]
   (GetRequest. (name index) (name mapping-type) id))
  ([index mapping-type ^String id {:keys [parent preference
                                          routing fields _source]}]
   (let [gr (GetRequest. (name index) (name mapping-type) id)]
     (when routing
       (.routing gr routing))
     (when parent
       (.parent gr parent))
     (when preference
       (.preference gr preference))
     (when fields
       (.fields gr (->string-array fields)))
     ;; `some?` rather than truthiness so that `:_source false` is honoured.
     (when (some? _source)
       (.fetchSourceContext
        gr
        (if (instance? Boolean _source)
          (FetchSourceContext. (boolean _source))
          (let [^{:tag "[Ljava.lang.String;"} exclude (when-let [ex (:exclude _source)]
                                                        (->string-array ex))
                ;; Include is either given explicitly under :include, or is the
                ;; whole `_source` value (e.g. a vector of field names) when no
                ;; :exclude is present.
                ^{:tag "[Ljava.lang.String;"} include (when (or (not exclude)
                                                               (:include _source))
                                                       (->string-array (:include _source _source)))]
            (FetchSourceContext. include exclude)))))
     gr)))
(defn- convert-source-result
  "Recursively converts a document source made of java.util.HashMap and
  java.util.ArrayList instances into Clojure maps with keyword keys and
  vectors. Any other value is returned unchanged.

  Copied from clj-elasticsearch. More performant than doing `wlk/keywordize-keys`."
  [src]
  (condp instance? src
    java.util.HashMap   (into {}
                              (map (fn [[k v]]
                                     [(keyword k) (convert-source-result v)]))
                              src)
    java.util.ArrayList (mapv convert-source-result src)
    src))
(defn- convert-fields-result
  "Gets fields from a search result, i.e. when the returned fields are filtered.

  A java.util.Map is converted into a Clojure map from keywordized field name
  to the value of `.getValues` on each entry's value. Anything else, including
  nil, is returned unchanged."
  [fields]
  (if-not (instance? java.util.Map fields)
    fields
    (into {}
          (map (fn [^java.util.Map$Entry e]
                 [(keyword (.getKey e))
                  (.. e getValue getValues)]))
          fields)))
(defn ^IPersistentMap get-response->map
  "Converts a GetResponse into a Clojure map.

  Fields appear both plain and underscored (:index/:_index, :type/:_type,
  :id/:_id, :version/:_version, :source/:_source, :exists?/:exists). The
  underscored aliases match REST API responses. :fields is the response's
  field map copied into a Clojure map with its original (string) keys and
  unconverted values."
  [^GetResponse r]
  (let [source   (convert-source-result (.getSourceAsMap r))
        exists?  (.isExists r)
        index    (.getIndex r)
        doc-type (.getType r)
        id       (.getId r)
        version  (.getVersion r)]
    {:exists?  exists?  :exists   exists?
     :index    index    :_index   index
     :type     doc-type :_type    doc-type
     :id       id       :_id      id
     :version  version  :_version version
     :empty?   (.isSourceEmpty r)
     :source   source   :_source  source
     ;; TODO: convert GetFields to maps
     :fields   (into {} (.getFields r))}))
(defn ^IPersistentMap get-result->map
  "Converts a GetResult, as embedded in an update response, into a Clojure
  map with the same keys as `get-response->map`: plain and underscored
  aliases for index, type, id, version and source, plus :exists?/:exists,
  :empty? and :fields (string keys, unconverted values)."
  [^GetResult r]
  (let [source   (convert-source-result (.sourceAsMap r))
        exists?  (.isExists r)
        index    (.getIndex r)
        doc-type (.getType r)
        id       (.getId r)
        version  (.getVersion r)]
    {:exists?  exists?  :exists   exists?
     :index    index    :_index   index
     :type     doc-type :_type    doc-type
     :id       id       :_id      id
     :version  version  :_version version
     :empty?   (.isSourceEmpty r)
     :source   source   :_source  source
     ;; TODO: convert GetFields to maps
     :fields   (into {} (.getFields r))}))
(defn ^IPersistentMap multi-get-item-response->map
  "Converts one multi-get item into a map shaped like a REST API item.

  A successful item yields :exists, :_index, :_type, :_id, :_version and
  :_source. A failed item (`.isFailed`) has no GetResponse to read from.
  Instead of dereferencing nil, it yields :exists false, the item's
  :_index, :_type and :_id, and the failure message under :error."
  [^MultiGetItemResponse i]
  (if (.isFailed i)
    {:exists false
     :_index (.getIndex i)
     :_type  (.getType i)
     :_id    (.getId i)
     :error  (.getMessage (.getFailure i))}
    (let [r (.getResponse i)
          s (convert-source-result (.getSourceAsMap r))]
      {:exists   (.isExists r)
       :_index   (.getIndex r)
       :_type    (.getType r)
       :_id      (.getId r)
       :_version (.getVersion r)
       :_source  s})))
(defn multi-get-response->seq
  "Converts a multi-get response into a lazy sequence of item maps (see
  `multi-get-item-response->map`), in the order of `.getResponses`."
  [^MultiGetResponse r]
  (->> (.getResponses r)
       (map multi-get-item-response->map)))
(defn ^MultiGetRequest ->multi-get-request
  "Builds a multi-get action request.

  `queries` is a sequence of maps with :_index, :_type and :_id entries. Each
  map is added as one item. Options: :preference, :refresh and :realtime.
  :realtime is applied whenever it is present, so an explicit `false` is
  honoured."
  ([queries]
   (->multi-get-request queries {}))
  ([queries {:keys [preference refresh realtime]}]
   (let [r (MultiGetRequest.)]
     (doseq [q queries]
       (.add r (:_index q) (:_type q) (:_id q)))
     (when preference
       (.preference r preference))
     (when refresh
       (.refresh r refresh))
     ;; `(when realtime ...)` would drop `false`. Realtime is on by default in
     ;; Elasticsearch, so `false` is the value that actually changes behaviour.
     (when (some? realtime)
       (.realtime r realtime))
     r)))
(defn ^CountRequest ->count-request
  "Builds a count request over `index-name` (a name or collection of names).

  The two-argument arity passes an empty mapping type list. Options:
  * :source - query map. Its keys are stringified and it is wrapped as
    {\"query\" ...} before being set as the request source.
  * :min-score
  * :routing - routing value(s), coerced with `->string-array`."
  ([index-name options]
   (->count-request index-name [] options))
  ([index-name mapping-type {:keys [^Map source min-score routing]}]
   (let [^CountRequest req (CountRequest. (->string-array index-name))]
     (.types req (->string-array mapping-type))
     (when source
       (let [^Map body {"query" (wlk/stringify-keys source)}]
         (.source req body)))
     (when min-score
       (.minScore req min-score))
     (when routing
       (.routing req (->string-array routing)))
     req)))
(defn ^DeleteRequest ->delete-request
  "Builds a delete action request for document `id`.

  Options: :routing, :refresh, :version, :parent and :version-type. As in
  `->index-request`, :version-type may be a VersionType or a string/keyword
  coerced with `to-version-type`."
  ([index-name mapping-type id]
   (DeleteRequest. (name index-name) (name mapping-type) id))
  ([index-name mapping-type id {:keys [routing refresh version version-type parent]}]
   (let [r (DeleteRequest. (name index-name) (name mapping-type) id)]
     (when routing
       (.routing ^DeleteRequest r ^String routing))
     (when refresh
       (.refresh ^DeleteRequest r ^{:tag "boolean"} refresh))
     (when version
       (.version ^DeleteRequest r ^{:tag "long"} version))
     (when version-type
       ;; .versionType only accepts a VersionType. Coerce so that "external"
       ;; or :external work here just as they do for index requests.
       (.versionType ^DeleteRequest r (to-version-type version-type)))
     (when parent
       (.parent ^DeleteRequest r parent))
     r)))
(defn ^IPersistentMap delete-response->map
  "Converts a delete action response to a map matching REST API responses:
  :found and :found? (same value), :_index, :_type, :_version and :_id."
  [^DeleteResponse r]
  (let [found? (.isFound r)]
    {:found    found?
     :found?   found?
     :_index   (.getIndex r)
     :_type    (.getType r)
     :_version (.getVersion r)
     :_id      (.getId r)}))
(defn ^UpdateRequest ->update-request
  "Builds an update request for document `id`.

  Without :script, `doc` (keys stringified) is applied as a partial document.
  With :script, the script is set, and `doc`, when given, becomes the upsert
  document. Other options:
  * :doc_as_upsert, :refresh - booleans passed through.
  * :fields - field name(s) to return, coerced with `->string-array`.
  * :parent, :routing.
  * :retry_on_conflict - number of retries, coerced to int.
  * :script_params - map whose keys are stringified.
  * :scripted_upsert - also sets `doc` as the upsert document."
  [index-name mapping-type ^String id ^Map doc {:keys [doc_as_upsert
                                                      fields
                                                      parent
                                                      refresh
                                                      retry_on_conflict
                                                      routing
                                                      script
                                                      script_params
                                                      scripted_upsert]}]
  (let [r (UpdateRequest. (name index-name) (name mapping-type) id)
        stringified-doc (wlk/stringify-keys doc)]
    (when (and doc (not script))
      (.doc ^UpdateRequest r ^Map stringified-doc))
    (when doc_as_upsert
      (.docAsUpsert ^UpdateRequest r ^Boolean doc_as_upsert))
    (when fields
      (.fields ^UpdateRequest r (->string-array fields)))
    (when parent
      (.parent ^UpdateRequest r parent))
    (when script
      (.script ^UpdateRequest r ^String script)
      (when doc
        (.upsert ^UpdateRequest r ^Map stringified-doc)))
    (when scripted_upsert
      (.upsert ^UpdateRequest r ^Map stringified-doc)
      (.scriptedUpsert ^UpdateRequest r ^Boolean scripted_upsert))
    (when script_params
      (.scriptParams ^UpdateRequest r ^Map (wlk/stringify-keys script_params)))
    (when retry_on_conflict
      ;; retryOnConflict takes an int retry count, not a Boolean.
      (.retryOnConflict ^UpdateRequest r (int retry_on_conflict)))
    (when refresh
      (.refresh ^UpdateRequest r ^Boolean refresh))
    (when routing
      (.routing ^UpdateRequest r ^String routing))
    r))
(defn ^UpdateRequest ->partial-update-request
  "Builds an update request that applies `partial-doc` (keys stringified) as
  a partial document to document `id`.

  Options: :routing, :refresh, :retry-on-conflict, :parent and :fields
  (coerced with `->string-array`)."
  [index-name mapping-type ^String id ^Map partial-doc {:keys [routing refresh retry-on-conflict fields parent]}]
  (let [^Map changes (wlk/stringify-keys partial-doc)
        ^UpdateRequest req (UpdateRequest. (name index-name) (name mapping-type) id)]
    (.doc req changes)
    (when refresh
      (.refresh req refresh))
    (when retry-on-conflict
      (.retryOnConflict req retry-on-conflict))
    (when routing
      (.routing req ^String routing))
    (when parent
      (.parent req parent))
    (when fields
      (.fields req (->string-array fields)))
    req))
(defn ^UpdateRequest ->upsert-request
  "Builds an update request for document `id` that applies `doc` (keys
  stringified) as a partial document. The upsert document is :upsert (keys
  stringified), or `doc` itself when :upsert is not given.

  Other options: :routing, :refresh, :retry-on-conflict, :parent and :fields
  (coerced with `->string-array`)."
  [index-name mapping-type ^String id ^Map doc {:keys [routing
                                                      refresh
                                                      retry-on-conflict
                                                      fields
                                                      parent
                                                      upsert]}]
  (let [partial-doc (wlk/stringify-keys doc)
        upsert-doc  (or (wlk/stringify-keys upsert) partial-doc)
        ^UpdateRequest req (UpdateRequest. (name index-name) (name mapping-type) id)]
    (.doc req ^Map partial-doc)
    (.upsert req ^Map upsert-doc)
    (when refresh
      (.refresh req refresh))
    (when retry-on-conflict
      (.retryOnConflict req retry-on-conflict))
    (when routing
      (.routing req ^String routing))
    (when parent
      (.parent req parent))
    (when fields
      (.fields req (->string-array fields)))
    req))
(defn ^IPersistentMap update-response->map
  "Converts an update action response to a Clojure map.

  Underscored keys (:_index, :_type, :_id, :_version) match REST API
  responses. :type is kept alongside :_type for backwards compatibility.
  :get-result holds the converted GetResult (see `get-result->map`) when the
  response carries one, and nil otherwise."
  [^UpdateResponse r]
  ;; matches REST API responses
  ;; example: {:_index "people", :_type "person", :_id "1", :_version 2, ...}
  (let [doc-type (.getType r)]
    {:_index     (.getIndex r)
     :_type      doc-type
     :type       doc-type
     :_id        (.getId r)
     :_version   (.getVersion r)
     :get-result (when-let [gr (.getGetResult r)]
                   (get-result->map gr))}))
(defn ^SortOrder ^:private ->sort-order
  "Coerces `s` to a SortOrder. SortOrder instances are returned as-is.
  Strings, keywords and symbols are upper-cased and looked up with
  SortOrder/valueOf (e.g. :asc, \"desc\")."
  [s]
  (if-not (instance? SortOrder s)
    (-> s name .toUpperCase SortOrder/valueOf)
    s))
(defn ^SortBuilder ^:private ->field-sort-builder
  "Builds a FieldSortBuilder for field `key`.

  `value` may be:
  * a map with optional :ignoreUnmapped and :order (see `->sort-order`) entries;
  * a sort order given as a string, keyword/symbol or SortOrder instance.
  Any other value leaves the builder with its default order."
  [key value]
  (let [fsb (FieldSortBuilder. (name key))]
    (cond
      (map? value)
      (do
        (when-let [iu (:ignoreUnmapped value)]
          (.ignoreUnmapped fsb iu))
        (when-let [order (:order value)]
          (.order fsb (->sort-order order))))

      ;; ->sort-order accepts all three forms directly.
      (or (instance? clojure.lang.Named value)
          (instance? String value)
          (instance? SortOrder value))
      (.order fsb (->sort-order value)))
    fsb))
(defn ^SearchSourceBuilder ^:private set-sort
  "Applies `sort` to search source builder `sb` and returns `sb`.

  `sort` may be a field name string, a SortBuilder instance (e.g. a
  GeoDistanceSortBuilder), or a map of field -> order/options where each
  entry becomes a field sort via `->field-sort-builder`."
  [^SearchSourceBuilder sb sort]
  (condp instance? sort
    String      (.sort sb ^String sort)
    SortBuilder (.sort sb ^SortBuilder sort)
    (doseq [[field-key order-spec] sort]
      (.sort sb (->field-sort-builder field-key order-spec))))
  sb)
;; TODO: remove this once Clojure 1.9 (which provides clojure.core/boolean?)
;; is the minimum required version
(defn ^Boolean boolean?
  "Returns true when `value` is a java.lang.Boolean (true or false)."
  [value]
  (instance? java.lang.Boolean value))
(defn ^:private add-partial-fields-to-builder
  "Applies `_source` filtering to search source builder `sb`.

  * a boolean enables or disables fetching the source;
  * a map with \"include\"/\"exclude\" (or keyword) entries sets both lists,
    each defaulting to empty;
  * a sequential collection is used as the include list.
  For nil or any other value, `sb` is returned untouched."
  [^SearchSourceBuilder sb _source]
  (cond
    (boolean? _source)
    (.fetchSource sb _source)

    (map? _source)
    (let [{:strs [include exclude]
           :or   {include [] exclude []}} (wlk/stringify-keys _source)]
      (.fetchSource sb (->string-array include) (->string-array exclude)))

    (sequential? _source)
    (.fetchSource sb (->string-array _source) (->string-array []))

    ;; covers nil as well
    :else sb))
(defn ^HighlightBuilder$Field make-field
  "Builds a highlight field for `field-name` from REST-style (snake_case)
  options.

  Applied options: :type, :pre_tags, :post_tags, :order, :highlight_filter,
  :fragment_size, :number_of_fragments, :require_field_match,
  :boundary_max_scan, :fragmenter and :force_source. The other destructured
  keys (:encoder, :boundary_chars, :highlight_query, :no_match_size,
  :phrase_limit) are accepted but not applied."
  [field-name {:keys [type pre_tags post_tags order
                      highlight_filter fragment_size number_of_fragments
                      encoder require_field_match boundary_max_scan
                      boundary_chars fragmenter highlight_query no_match_size
                      phrase_limit force_source] :as opts}]
  (let [field (HighlightBuilder$Field. (name field-name))]
    (when type                (.highlighterType field type))
    (when pre_tags            (.preTags field (->string-array pre_tags)))
    (when post_tags           (.postTags field (->string-array post_tags)))
    (when order               (.order field order))
    (when highlight_filter    (.highlightFilter field highlight_filter))
    (when fragment_size       (.fragmentSize field ^{:tag "int"} fragment_size))
    (when number_of_fragments (.numOfFragments field ^{:tag "int"} number_of_fragments))
    (when require_field_match (.requireFieldMatch field require_field_match))
    (when boundary_max_scan   (.boundaryMaxScan field boundary_max_scan))
    ;; TODO: boundary_chars
    (when fragmenter          (.fragmenter field fragmenter))
    (when force_source        (.forceSource field force_source))
    ;; TODO: highlight_query, no_match_size, phrase_limit and
    ;; custom highlighter options
    field))
(defn ^:private add-highlight-to-builder
  "Applies REST-style (snake_case) highlighting options to the highlighter of
  search source builder `sb`. It then adds one highlight field per entry of
  the :fields map (field name -> per-field options, built with `make-field`).
  Options marked TODO below are accepted but not applied. Returns nil."
  [^SearchSourceBuilder sb {:keys [fields type tags_schema pre_tags post_tags order
                                   highlight_filter fragment_size number_of_fragments
                                   encoder require_field_match boundary_max_scan
                                   boundary_chars fragmenter highlight_query no_match_size
                                   phrase_limit force_source] :as opts}]
  (let [^HighlightBuilder highlighter (.highlighter sb)]
    (when type                (.highlighterType highlighter type))
    (when tags_schema         (.tagsSchema highlighter tags_schema))
    (when pre_tags            (.preTags highlighter (->string-array pre_tags)))
    (when post_tags           (.postTags highlighter (->string-array post_tags)))
    (when order               (.order highlighter order))
    (when highlight_filter    (.highlightFilter highlighter highlight_filter))
    (when fragment_size       (.fragmentSize highlighter ^{:tag "int"} fragment_size))
    (when number_of_fragments (.numOfFragments highlighter ^{:tag "int"} number_of_fragments))
    (when encoder             (.encoder highlighter encoder))
    (when require_field_match (.requireFieldMatch highlighter require_field_match))
    (when boundary_max_scan   (.boundaryMaxScan highlighter boundary_max_scan))
    ;; TODO: boundary_chars
    (when fragmenter          (.fragmenter highlighter fragmenter))
    (when force_source        (.forceSource highlighter force_source))
    ;; TODO: highlight_query, no_match_size, phrase_limit and
    ;; custom highlighter options
    (doseq [[field-name field-opts] fields]
      (.field highlighter (make-field field-name field-opts)))))
(defn ^SearchRequest ->search-request
  "Builds a SearchRequest for the given index name(s) and mapping type(s).

  Query-related options (:query, :aggregations, :from, :size, :timeout,
  :filter/:post-filter, :fields, :_source, :min-score, :version, :sort,
  :stats, :highlight) go into a SearchSourceBuilder, which becomes the
  request source. :search-type (or :search_type), :routing, :preference,
  :scroll, :template (its :id is used as an indexed template name) and
  :params are set on the request itself. Map-valued options have their keys
  stringified first."
  [index-name mapping-type {:keys [search-type search_type scroll routing
                                   preference query aggregations from size timeout
                                   template params
                                   post-filter filter min-score version fields sort stats _source
                                   highlight] :as options}]
  (let [r (SearchRequest.)
        ^SearchSourceBuilder sb (SearchSourceBuilder.)]
    ;; source
    (when query
      (.query sb ^Map (wlk/stringify-keys query)))
    (when aggregations
      (.aggregations sb ^Map (wlk/stringify-keys aggregations)))
    (when from
      (.from sb from))
    (when size
      (.size sb size))
    (when timeout
      (.timeout sb ^String timeout))
    (when filter
      (.postFilter sb ^Map (wlk/stringify-keys filter)))
    ;; compatibility: :post-filter is applied after :filter, so it wins when
    ;; both are supplied
    (when post-filter
      (.postFilter sb ^Map (wlk/stringify-keys post-filter)))
    (when fields
      (.fields sb ^java.util.List fields))
    (when _source
      (add-partial-fields-to-builder sb _source))
    (when min-score
      (.minScore sb min-score))
    (when version
      (.version sb version))
    (when sort
      (set-sort sb sort))
    (when stats
      (.stats sb (->string-array stats)))
    (when highlight
      (add-highlight-to-builder sb highlight))
    (.source r sb)
    ;; non-source
    (when index-name
      (.indices r (->string-array index-name)))
    (when mapping-type
      (.types r (->string-array mapping-type)))
    (when-let [s (or search-type search_type)]
      (.searchType r ^String s))
    (when routing
      (.routing r ^String routing))
    ;; :preference is part of the documented option set; apply it rather than
    ;; silently dropping it.
    (when preference
      (.preference r ^String preference))
    (when scroll
      (.scroll r ^String scroll))
    (when template
      (.templateName r ^String (:id template))
      (.templateType r ^org.elasticsearch.script.ScriptService$ScriptType org.elasticsearch.script.ScriptService$ScriptType/INDEXED))
    (when params
      (.templateParams r ^Map (wlk/stringify-keys params)))
    r))
(defn ^MultiSearchRequest ->multi-search-request
  "Builds a multi-search request from `queries`, a flat sequence of
  [header search-options] pairs. Each pair becomes one request built with
  `->search-request`; a trailing unpaired element is ignored.

  * 3 args: index and type come from each header's :index and :type.
  * 4 args: index is fixed; type comes from each header's :type.
  * 5 args: index and type are fixed; headers are ignored.
  `opts` is accepted but not used."
  ([^Client conn queries opts]
   (let [builder (.newRequestBuilder MultiSearchAction/INSTANCE conn)]
     (doseq [[header search-opts] (partition 2 queries)]
       (.add builder (->search-request (:index header) (:type header) search-opts)))
     (.request builder)))
  ([^Client conn ^String index queries opts]
   (let [builder (.newRequestBuilder MultiSearchAction/INSTANCE conn)]
     (doseq [[header search-opts] (partition 2 queries)]
       (.add builder (->search-request index (:type header) search-opts)))
     (.request builder)))
  ([^Client conn ^String index ^String type queries opts]
   (let [builder (.newRequestBuilder MultiSearchAction/INSTANCE conn)]
     (doseq [search-opts (map second (partition 2 queries))]
       (.add builder (->search-request index type search-opts)))
     (.request builder))))
(defn ^SearchScrollRequest ->search-scroll-request
  "Builds a scroll request for `scroll-id`. The :scroll option (a keep-alive
  string) is set only when given."
  [^String scroll-id {:keys [scroll]}]
  (let [req (SearchScrollRequest. scroll-id)]
    (if scroll
      (doto req (.scroll ^String scroll))
      req)))
(defn ->string
  "Converts `text` to a string. Keywords yield their name (no leading colon
  or namespace); everything else goes through `str` (nil -> \"\")."
  [text]
  (if-not (keyword? text)
    (str text)
    (name text)))
(defn attach-suggestion-context
  "Attaches context to a suggestion query and returns the query.

  `context` maps a context field name to one of:
  * a category value: a string or a vector of strings;
  * a geo location: a map with :lat, :lon and optional :precision (a
    precision string or collection of them).
  Maps without :lat, and values of any other type, are ignored."
  [^CompletionSuggestionBuilder query context]
  (doseq [[field context-dt] context]
    (cond
      (or (string? context-dt) (vector? context-dt))
      (.addCategory query
                    ^String (->string field)
                    (->string-array context-dt))

      (and (map? context-dt) (contains? context-dt :lat))
      (let [{:keys [lat lon precision]} context-dt
            field-name (->string field)]
        (if (empty? precision)
          (.addGeoLocation query
                           field-name
                           (double lat)
                           (double lon)
                           nil)
          (.addGeoLocationWithPrecision query
                                        field-name
                                        (double lat) (double lon)
                                        (->string-array precision))))))
  query)
;; Suggestion query builders: methods below cover :completion, :fuzzy and :term.
(defmulti ->suggest-query
  "Builds a suggestion query object for `term`. Dispatches on `qtype`; the
  third argument is an options map."
  (fn [qtype _ _] qtype))
(defmethod ->suggest-query :completion
  [qtype term {:keys [field size analyzer context]
               :or {field "suggest"}}]
  ;; Builds a CompletionSuggestionBuilder named "hits" for simple autocomplete
  ;; of `term` against `field` (default "suggest"). :size and :analyzer are
  ;; applied when given, and :context goes through attach-suggestion-context.
  ;; defmethod takes no docstring, so the description lives in this comment
  ;; rather than in a string that would be evaluated and discarded.
  (let [query (doto (CompletionSuggestionBuilder. "hits")
                (.text ^String term)
                (.field ^String field))]
    (when size (.size query size))
    (when analyzer (.analyzer query analyzer))
    (when context (attach-suggestion-context query context))
    query))
(defmethod ->suggest-query :fuzzy
  [qtype term {:keys [field size analyzer fuzziness transpositions
                      min-length prefix-length unicode-aware context]
               :or {field "suggest"
                    transpositions true
                    min-length 3
                    prefix-length 1
                    unicode-aware false}}]
  ;; Builds a CompletionSuggestionFuzzyBuilder named "hits" for fuzzy
  ;; completion, which tolerates small typos in `term`.
  ;; :fuzziness may be a Fuzziness instance (used as-is) or 0, 1 or 2. Any
  ;; other value, including nil, selects Fuzziness/AUTO.
  ;; (defmethod takes no docstring, so this description is a comment.)
  (let [fuzz-level (if (instance? Fuzziness fuzziness)
                     fuzziness
                     (case fuzziness
                       0 Fuzziness/ZERO
                       1 Fuzziness/ONE
                       2 Fuzziness/TWO
                       Fuzziness/AUTO))
        query (doto (CompletionSuggestionFuzzyBuilder. "hits")
                (.text ^String term)
                (.field ^String field)
                (.setFuzziness ^Fuzziness fuzz-level)
                (.setFuzzyTranspositions ^Boolean transpositions)
                (.setFuzzyMinLength ^Integer min-length)
                (.setFuzzyPrefixLength ^Integer prefix-length)
                (.setUnicodeAware ^Boolean unicode-aware))]
    (when size (.size query size))
    (when analyzer (.analyzer query analyzer))
    (when context (attach-suggestion-context query context))
    query))
(defmethod ^TermSuggestionBuilder ->suggest-query :term
  [qtype term {:keys [field size analyzer context max-edits max-term-freq
                      min-doc-freq min-word-length prefix-length accuracy sort
                      string-distance suggest-mode]
               :or {field "suggest"}}]
  ;; Builds a TermSuggestionBuilder named "hits" for `term` against `field`
  ;; (default "suggest"). Every other option is applied only when truthy.
  ;; NOTE(review): attach-suggestion-context type-hints its argument as
  ;; CompletionSuggestionBuilder; confirm that passing a TermSuggestionBuilder
  ;; with a non-empty :context actually works.
  (let [query (doto (TermSuggestionBuilder. "hits")
                (.text ^String term)
                (.field ^String field))]
    (when accuracy        (.setAccuracy query accuracy))
    (when analyzer        (.analyzer query analyzer))
    (when context         (attach-suggestion-context query context))
    (when max-edits       (.maxEdits query max-edits))
    (when max-term-freq   (.maxTermFreq query max-term-freq))
    (when min-doc-freq    (.minDocFreq query min-doc-freq))
    (when min-word-length (.minWordLength query min-word-length))
    (when prefix-length   (.prefixLength query prefix-length))
    (when size            (.size query size))
    (when sort            (.sort query sort))
    (when string-distance (.stringDistance query string-distance))
    (when suggest-mode    (.suggestMode query suggest-mode))
    query))
(defn- highlight-field-to-map
  "Placeholder converter for a single HighlightField. It ignores its argument
  and always returns an empty map; hit highlights are converted inline by
  `add-highlight-from`."
  [^HighlightField hlf]
  (hash-map))
(defn ^:private add-highlight-from
  "Returns `m` with :highlight set to a map from keywordized field name to a
  vector of that field's highlight fragments (as strings) for search hit `sh`."
  [^SearchHit sh m]
  (let [fragments-of (fn [^HighlightField hlf]
                       (mapv (fn [^Text t] (.string t)) (.getFragments hlf)))]
    (assoc m :highlight
           (into {}
                 (map (fn [[^String k ^HighlightField hlf]]
                        [(keyword k) (fragments-of hlf)]))
                 (.highlightFields sh)))))
(defn- ^IPersistentMap search-hit->map
  "Converts a SearchHit into a map shaped like a REST API hit.

  Always present: :_index, :_type, :_id, :_score, :sort (vector of sort
  values), :_version and :highlight. :_source is added when the hit carries a
  source. :fields and :_fields (same value) are added when any field other
  than _source was returned."
  [^SearchHit sh]
  (let [src    (.getSource sh)
        fields (dissoc (convert-fields-result (.getFields sh)) :_source)
        hit    (add-highlight-from sh {:_index   (.getIndex sh)
                                       :_type    (.getType sh)
                                       :_id      (.getId sh)
                                       :_score   (.getScore sh)
                                       :sort     (vec (.getSortValues sh))
                                       :_version (.getVersion sh)})]
    (cond-> hit
      src          (assoc :_source (convert-source-result src))
      (seq fields) (assoc :_fields fields
                          :fields fields))))
(defn- search-hits->seq
  "Converts SearchHits into {:total n, :max_score s, :hits (lazy seq of hit
  maps built by `search-hit->map`)}."
  [^SearchHits hits]
  (let [total     (.getTotalHits hits)
        max-score (.getMaxScore hits)
        hit-maps  (map search-hit->map (.getHits hits))]
    {:total     total
     :max_score max-score
     :hits      hit-maps}))
(defprotocol AggregatorPresenter
  "Conversion of Elasticsearch aggregation results into Clojure data."
  (aggregation-value [agg]
    "Presents an aggregation as immutable Clojure map"))
(defn assoc-aggregation-value
  "Reducing function over `<String, Aggregation>` entries: adds the presented
  value of one aggregation to `acc`, keyed by its keywordized name."
  [acc [^String name agg]]
  (->> (aggregation-value agg)
       (assoc acc (keyword name))))
(defn aggregations-to-map
  "Converts Aggregations into a map of keywordized aggregation name ->
  presented value (see `aggregation-value`)."
  [^Aggregations aggs]
  (->> (.asMap aggs)
       (reduce assoc-aggregation-value {})))
(defn merge-sub-aggregations
  "Merges the converted sub-aggregations of `b` into map `m`. On key
  collisions the sub-aggregation entries win."
  [m ^HasAggregations b]
  (let [sub-aggs (aggregations-to-map (.getAggregations b))]
    (clojure.core/merge m sub-aggs)))
(defn histogram-bucket->map
  "Converts a histogram bucket into {:key k, :doc_count n} plus any
  sub-aggregations."
  [^Histogram$Bucket b]
  (let [bucket {:key       (.getKey b)
                :doc_count (.getDocCount b)}]
    (merge-sub-aggregations bucket b)))
(defn range-bucket->map
  "Converts a range bucket into a map with :doc_count, :from, :from_as_string,
  :to and :to_as_string, plus any sub-aggregations."
  [^Range$Bucket b]
  (let [bucket {:doc_count      (.getDocCount b)
                :from_as_string (.getFromAsString b)
                :from           (.getFrom b)
                :to_as_string   (.getToAsString b)
                :to             (.getTo b)}]
    (merge-sub-aggregations bucket b)))
(defn terms-bucket->map
  "Converts a terms bucket into {:doc_count n, :key k} plus any
  sub-aggregations."
  [^Terms$Bucket b]
  (let [bucket {:doc_count (.getDocCount b)
                :key       (.getKey b)}]
    (merge-sub-aggregations bucket b)))
(extend-protocol AggregatorPresenter
  ;; single-value metrics: {:value v}
  Avg
  (aggregation-value [^Avg a] {:value (.getValue a)})

  Max
  (aggregation-value [^Max a] {:value (.getValue a)})

  Min
  (aggregation-value [^Min a] {:value (.getValue a)})

  Sum
  (aggregation-value [^Sum a] {:value (.getValue a)})

  Cardinality
  (aggregation-value [^Cardinality a] {:value (.getValue a)})

  ValueCount
  (aggregation-value [^ValueCount a] {:value (.getValue a)})

  ;; percentiles: {:values {<keywordized percent> value}}, the percent being
  ;; rendered with `str` (e.g. 50.0 -> :50.0)
  Percentiles
  (aggregation-value [^Percentiles a]
    {:values (into {}
                   (map (fn [^Percentile p]
                          [(keyword (str (.getPercent p))) (.getValue p)]))
                   a)})

  ;; Missing, Global, etc: doc count plus sub-aggregations (which win on
  ;; key collisions)
  SingleBucketAggregation
  (aggregation-value [^SingleBucketAggregation a]
    (clojure.core/merge {:doc_count (.getDocCount a)}
                        (aggregations-to-map (.getAggregations a))))

  Stats
  (aggregation-value [^Stats a]
    {:count (.getCount a)
     :min   (.getMin a)
     :max   (.getMax a)
     :avg   (.getAvg a)
     :sum   (.getSum a)})

  ExtendedStats
  (aggregation-value [^ExtendedStats a]
    {:count          (.getCount a)
     :min            (.getMin a)
     :max            (.getMax a)
     :avg            (.getAvg a)
     :sum            (.getSum a)
     :sum_of_squares (.getSumOfSquares a)
     :variance       (.getVariance a)
     :std_deviation  (.getStdDeviation a)})

  TopHits
  (aggregation-value [^TopHits a]
    {:hits (search-hits->seq (.getHits a))})

  ;; multi-bucket aggregations: {:buckets [bucket-map ...]}
  Histogram
  (aggregation-value [^Histogram a]
    {:buckets (mapv histogram-bucket->map (.getBuckets a))})

  Range
  (aggregation-value [^Range a]
    {:buckets (mapv range-bucket->map (.getBuckets a))})

  Terms
  (aggregation-value [^Terms a]
    {:buckets (mapv terms-bucket->map (.getBuckets a))}))
(defn search-response->seq
[^SearchResponse r]
;; Example REST API response:
;;
;; {:took 18,
;; :timed_out false,
;; :_shards {:total 5, :successful 5, :failed 0},
;; :hits {
;; :total 4,
;; :max_score 1.0,
;; :hits [{:_index "articles",