# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import copy
import warnings
from collections import defaultdict
from typing import (
TYPE_CHECKING,
Any,
Dict,
Generator,
List,
Optional,
Sequence,
Tuple,
Union,
)
import numpy as np
import pandas as pd
from elasticsearch.helpers import scan
from eland.actions import PostProcessingAction, SortFieldAction
from eland.common import (
DEFAULT_CSV_BATCH_OUTPUT_SIZE,
DEFAULT_ES_MAX_RESULT_WINDOW,
DEFAULT_PAGINATION_SIZE,
SortOrder,
build_pd_series,
elasticsearch_date_to_pandas_date,
)
from eland.index import Index
from eland.query import Query
from eland.tasks import (
RESOLVED_TASK_TYPE,
ArithmeticOpFieldsTask,
BooleanFilterTask,
HeadTask,
QueryIdsTask,
QueryTermsTask,
SampleTask,
SizeTask,
TailTask,
)
if TYPE_CHECKING:
from eland.field_mappings import Field
from eland.query_compiler import QueryCompiler
class QueryParams:
def __init__(self):
self.query = Query()
self.sort_field: Optional[str] = None
self.sort_order: Optional[SortOrder] = None
self.size: Optional[int] = None
self.fields: Optional[List[str]] = None
self.script_fields: Optional[Dict[str, Dict[str, Any]]] = None
class Operations:
"""
    A collector of the queries and selectors we apply to return the appropriate results.
For example,
- a list of the field_names in the DataFrame (a subset of field_names in the index)
- a size limit on the results (e.g. for head(n=5))
- a query to filter the results (e.g. df.A > 10)
This is maintained as a 'task graph' (inspired by dask)
(see https://docs.dask.org/en/latest/spec.html)
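    For example (illustrative), head(n=5) appends a HeadTask and a boolean
    filter appends a BooleanFilterTask; the accumulated tasks are later
    resolved into a single Elasticsearch query by _resolve_tasks().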
"""
def __init__(self, tasks=None, arithmetic_op_fields_task=None):
if tasks is None:
self._tasks = []
else:
self._tasks = tasks
self._arithmetic_op_fields_task = arithmetic_op_fields_task
def __constructor__(self, *args, **kwargs):
return type(self)(*args, **kwargs)
def copy(self):
return self.__constructor__(
tasks=copy.deepcopy(self._tasks),
arithmetic_op_fields_task=copy.deepcopy(self._arithmetic_op_fields_task),
)
def head(self, index, n):
# Add a task that is an ascending sort with size=n
task = HeadTask(index, n)
self._tasks.append(task)
def tail(self, index, n):
# Add a task that is descending sort with size=n
task = TailTask(index, n)
self._tasks.append(task)
def sample(self, index, n, random_state):
task = SampleTask(index, n, random_state)
self._tasks.append(task)
def arithmetic_op_fields(self, display_name, arithmetic_series):
if self._arithmetic_op_fields_task is None:
self._arithmetic_op_fields_task = ArithmeticOpFieldsTask(
display_name, arithmetic_series
)
else:
self._arithmetic_op_fields_task.update(display_name, arithmetic_series)
def get_arithmetic_op_fields(self) -> Optional[ArithmeticOpFieldsTask]:
# get an ArithmeticOpFieldsTask if it exists
return self._arithmetic_op_fields_task
def __repr__(self):
return repr(self._tasks)
def count(self, query_compiler):
query_params, post_processing = self._resolve_tasks(query_compiler)
        # Elasticsearch _count is very efficient, so it is used to return results here. This means
        # that data frames with restricted size or sort params will not return valid results
        # (_count doesn't support size).
        # Longer term we may fall back to pandas, but this may mean loading the entire index into memory.
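        # For each field we issue a per-field _count request filtered on the field
        # existing, roughly (illustrative sketch of the generated request):
        #   GET <index>/_count  {"query": {"exists": {"field": "A"}}}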
if self._size(query_params, post_processing) is not None:
raise NotImplementedError(
f"Requesting count with additional query and processing parameters "
f"not supported {query_params} {post_processing}"
)
# Only return requested field_names
fields = query_compiler.get_field_names(include_scripted_fields=False)
counts = {}
for field in fields:
body = Query(query_params.query)
body.exists(field, must=True)
field_exists_count = query_compiler._client.count(
index=query_compiler._index_pattern, body=body.to_count_body()
)["count"]
counts[field] = field_exists_count
return build_pd_series(data=counts, index=fields)
def _metric_agg_series(
self,
query_compiler: "QueryCompiler",
agg: List,
numeric_only: Optional[bool] = None,
) -> pd.Series:
results = self._metric_aggs(query_compiler, agg, numeric_only=numeric_only)
if numeric_only:
return build_pd_series(results, index=results.keys(), dtype=np.float64)
else:
# If all results are float convert into float64
if all(isinstance(i, float) for i in results.values()):
dtype = np.float64
# If all results are int convert into int64
elif all(isinstance(i, int) for i in results.values()):
dtype = np.int64
            # If a single result is present, let pandas infer the dtype instead of forcing object
elif len(results) <= 1:
dtype = None
else:
dtype = "object"
return build_pd_series(results, index=results.keys(), dtype=dtype)
def value_counts(self, query_compiler: "QueryCompiler", es_size: int) -> pd.Series:
return self._terms_aggs(query_compiler, "terms", es_size)
def hist(self, query_compiler, bins):
return self._hist_aggs(query_compiler, bins)
def aggs(self, query_compiler, pd_aggs, numeric_only=None) -> pd.DataFrame:
results = self._metric_aggs(
query_compiler, pd_aggs, numeric_only=numeric_only, is_dataframe_agg=True
)
return pd.DataFrame(
results, index=pd_aggs, dtype=(np.float64 if numeric_only else None)
)
def mode(
self,
query_compiler: "QueryCompiler",
pd_aggs: List[str],
is_dataframe: bool,
es_size: int,
numeric_only: bool = False,
dropna: bool = True,
) -> Union[pd.DataFrame, pd.Series]:
results = self._metric_aggs(
query_compiler,
pd_aggs=pd_aggs,
numeric_only=numeric_only,
dropna=dropna,
es_mode_size=es_size,
)
pd_dict: Dict[str, Any] = {}
row_diff: Optional[int] = None
if is_dataframe:
            # If multiple mode values are returned for a particular column,
            # find the maximum length and use it to pad the dataframe with NaN/NaT
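            # e.g. (illustrative) results = {"a": [1, 2], "b": [5]} pads "b" to
            # [5, NaN] so that every column has the same number of rows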
rows_len = max([len(value) for value in results.values()])
for key, values in results.items():
row_diff = rows_len - len(values)
# Convert np.ndarray to list
values = list(values)
if row_diff:
if isinstance(values[0], pd.Timestamp):
values.extend([pd.NaT] * row_diff)
else:
values.extend([np.NaN] * row_diff)
pd_dict[key] = values
return pd.DataFrame(pd_dict)
else:
return pd.DataFrame(results.values()).iloc[0].rename()
def _metric_aggs(
self,
query_compiler: "QueryCompiler",
pd_aggs: List[str],
numeric_only: Optional[bool] = None,
is_dataframe_agg: bool = False,
es_mode_size: Optional[int] = None,
dropna: bool = True,
) -> Dict[str, Any]:
"""
Used to calculate metric aggregations
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics.html
Parameters
----------
query_compiler:
Query Compiler object
pd_aggs:
aggregations that are to be performed on dataframe or series
numeric_only:
return either all numeric values or NaN/NaT
is_dataframe_agg:
            whether this method is called from a single-agg method or from an aggregation method
es_mode_size:
number of rows to return when multiple mode values are present.
dropna:
drop NaN/NaT for a dataframe
Returns
-------
A dictionary which contains all aggregations calculated.
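        e.g. (illustrative, for pd_aggs=["sum", "min"], using the sample values below)
        {"AvgTicketPrice": [8.204365e+06, 1.000205e+02], "FlightDelayMin": [618150, 0], ...}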
"""
query_params, post_processing = self._resolve_tasks(query_compiler)
size = self._size(query_params, post_processing)
if size is not None:
raise NotImplementedError(
f"Can not count field matches if size is set {size}"
)
fields = query_compiler._mappings.all_source_fields()
if numeric_only:
            # Keep only numeric (int/float) and boolean fields
fields = [field for field in fields if (field.is_numeric or field.is_bool)]
body = Query(query_params.query)
# Convert pandas aggs to ES equivalent
es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs)
for field in fields:
for es_agg in es_aggs:
# NaN/NaT fields are ignored
if not field.is_es_agg_compatible(es_agg):
continue
# If we have multiple 'extended_stats' etc. here we simply NOOP on 2nd call
if isinstance(es_agg, tuple):
body.metric_aggs(
f"{es_agg[0]}_{field.es_field_name}",
es_agg[0],
field.aggregatable_es_field_name,
)
elif es_agg == "mode":
# TODO for dropna=False, Check If field is timestamp or boolean or numeric,
# then use missing parameter for terms aggregation.
body.terms_aggs(
f"{es_agg}_{field.es_field_name}",
"terms",
field.aggregatable_es_field_name,
es_mode_size,
)
else:
body.metric_aggs(
f"{es_agg}_{field.es_field_name}",
es_agg,
field.aggregatable_es_field_name,
)
response = query_compiler._client.search(
index=query_compiler._index_pattern, size=0, body=body.to_search_body()
)
"""
Results are like (for 'sum', 'min')
AvgTicketPrice DistanceKilometers DistanceMiles FlightDelayMin
sum 8.204365e+06 9.261629e+07 5.754909e+07 618150
min 1.000205e+02 0.000000e+00 0.000000e+00 0
"""
return self._unpack_metric_aggs(
fields=fields,
es_aggs=es_aggs,
pd_aggs=pd_aggs,
response=response,
numeric_only=numeric_only,
is_dataframe_agg=is_dataframe_agg,
)
def _terms_aggs(
self, query_compiler: "QueryCompiler", func: str, es_size: int
) -> pd.Series:
"""
Parameters
----------
        es_size: int
            Maximum number of terms buckets to return; set by Series.value_counts()
Returns
-------
pandas.Series
Series containing results of `func` applied to the field_name(s)
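        e.g. (illustrative) Series.value_counts(es_size=10) resolves here with
        func="terms", building one terms aggregation per aggregatable field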
"""
query_params, post_processing = self._resolve_tasks(query_compiler)
size = self._size(query_params, post_processing)
if size is not None:
raise NotImplementedError(
f"Can not count field matches if size is set {size}"
)
# Get just aggregatable field_names
aggregatable_field_names = query_compiler._mappings.aggregatable_field_names()
body = Query(query_params.query)
for field in aggregatable_field_names.keys():
body.terms_aggs(field, func, field, es_size=es_size)
response = query_compiler._client.search(
index=query_compiler._index_pattern, size=0, body=body.to_search_body()
)
results = {}
for key in aggregatable_field_names.keys():
# key is aggregatable field, value is label
# e.g. key=category.keyword, value=category
for bucket in response["aggregations"][key]["buckets"]:
results[bucket["key"]] = bucket["doc_count"]
try:
# get first value in dict (key is .keyword)
name = list(aggregatable_field_names.values())[0]
except IndexError:
name = None
return build_pd_series(results, name=name)
def _hist_aggs(self, query_compiler, num_bins):
# Get histogram bins and weights for numeric field_names
query_params, post_processing = self._resolve_tasks(query_compiler)
size = self._size(query_params, post_processing)
if size is not None:
raise NotImplementedError(
f"Can not count field matches if size is set {size}"
)
numeric_source_fields = query_compiler._mappings.numeric_source_fields()
body = Query(query_params.query)
results = self._metric_aggs(query_compiler, ["min", "max"], numeric_only=True)
min_aggs = {}
max_aggs = {}
for field, (min_agg, max_agg) in results.items():
min_aggs[field] = min_agg
max_aggs[field] = max_agg
for field in numeric_source_fields:
body.hist_aggs(field, field, min_aggs[field], max_aggs[field], num_bins)
response = query_compiler._client.search(
index=query_compiler._index_pattern, size=0, body=body.to_search_body()
)
# results are like
# "aggregations" : {
# "DistanceKilometers" : {
# "buckets" : [
# {
# "key" : 0.0,
# "doc_count" : 2956
# },
# {
# "key" : 1988.1482421875,
# "doc_count" : 768
# },
# ...
bins = {}
weights = {}
        # There is one more bin than weights:
        # len(bins) == len(weights) + 1
        # bins = [ 0. 36. 72. 108. 144. 180. 216. 252. 288. 324. 360.]
        # len(bins) == 11
        # weights = [10066., 263., 386., 264., 273., 390., 324., 438., 261., 394.]
        # len(weights) == 10
        # ES returns one extra bucket:
        # weights = [10066., 263., 386., 264., 273., 390., 324., 438., 261., 252., 142.]
        # so we sum the last 2 buckets
for field in numeric_source_fields:
            # in the case of a series, let plotting.ed_hist_series throw an exception
            if not response.get("aggregations"):
                continue
            # in the case of a dataframe, warn that the field is excluded
if not response["aggregations"].get(field):
warnings.warn(
f"{field} has no meaningful histogram interval and will be excluded. "
f"All values 0.",
UserWarning,
)
continue
buckets = response["aggregations"][field]["buckets"]
bins[field] = []
weights[field] = []
for bucket in buckets:
bins[field].append(bucket["key"])
if bucket == buckets[-1]:
weights[field][-1] += bucket["doc_count"]
else:
weights[field].append(bucket["doc_count"])
df_bins = pd.DataFrame(data=bins)
df_weights = pd.DataFrame(data=weights)
return df_bins, df_weights
def _unpack_metric_aggs(
self,
fields: List["Field"],
es_aggs: Union[List[str], List[Tuple[str, str]]],
pd_aggs: List[str],
response: Dict[str, Any],
numeric_only: Optional[bool],
is_dataframe_agg: bool = False,
):
"""
This method unpacks metric aggregations JSON response.
This can be called either directly on an aggs query
or on an individual bucket within a composite aggregation.
Parameters
----------
fields:
a list of Field Mappings
es_aggs:
            Elasticsearch equivalents of the pandas aggs
pd_aggs:
a list of aggs
response:
            a dict containing the response from Elasticsearch
numeric_only:
return either numeric values or NaN/NaT
Returns
-------
        a dictionary of the calculated aggregation values
"""
results: Dict[str, Any] = {}
for field in fields:
values = []
for es_agg, pd_agg in zip(es_aggs, pd_aggs):
                # is_dataframe_agg differentiates agg() from a single aggregation
                # called through e.g. .mean(): if the field and agg aren't compatible,
                # agg() appends NaN/NaT, while a single aggregation appends nothing
if not field.is_es_agg_compatible(es_agg):
if is_dataframe_agg and not numeric_only:
values.append(field.nan_value)
elif not is_dataframe_agg and numeric_only is False:
values.append(field.nan_value)
# Explicit condition for mad to add NaN because it doesn't support bool
elif is_dataframe_agg and numeric_only:
if pd_agg == "mad":
values.append(field.nan_value)
continue
if isinstance(es_agg, tuple):
agg_value = response["aggregations"][
f"{es_agg[0]}_{field.es_field_name}"
]
# Pull multiple values from 'percentiles' result.
if es_agg[0] == "percentiles":
agg_value = agg_value["values"]
agg_value = agg_value[es_agg[1]]
                    # Need to convert 'population' stddev and variance
                    # from Elasticsearch into 'sample' stddev and variance,
                    # which is what pandas uses.
                    if es_agg[1] in ("std_deviation", "variance"):
                        # Neither transformation works with count <= 1
                        count = response["aggregations"][
                            f"{es_agg[0]}_{field.es_field_name}"
                        ]["count"]
                        # All of the below calculations result in NaN if count <= 1
                        if count <= 1:
                            agg_value = np.NaN
                        elif es_agg[1] == "variance":
                            # sample_var = \frac{N}{N-1} \cdot population_var
                            # e.g. N=3, population_var=2.0 -> sample_var=3.0
                            agg_value *= count / (count - 1.0)
                        else:  # es_agg[1] == "std_deviation"
                            # sample_std = \sqrt{\frac{1}{N-1}\sum_{i=1}^N(x_i-\bar{x})^2}
                            # population_std = \sqrt{\frac{1}{N}\sum_{i=1}^N(x_i-\bar{x})^2}
                            # => sample_std = \sqrt{\frac{N}{N-1}} \cdot population_std
                            agg_value = np.sqrt(
                                (count / (count - 1.0)) * agg_value * agg_value
                            )
elif es_agg == "mode":
# For terms aggregation buckets are returned
# agg_value will be of type list
agg_value = response["aggregations"][
f"{es_agg}_{field.es_field_name}"
]["buckets"]
else:
agg_value = response["aggregations"][
f"{es_agg}_{field.es_field_name}"
]["value"]
if isinstance(agg_value, list):
# include top-terms in the result.
if not agg_value:
                        # If all the documents for a field are empty
agg_value = [field.nan_value]
else:
max_doc_count = agg_value[0]["doc_count"]
# We need only keys which are equal to max_doc_count
# lesser values are ignored
agg_value = [
item["key"]
for item in agg_value
if item["doc_count"] == max_doc_count
]
# Maintain datatype by default because pandas does the same
                    # text values are returned as-is
if field.is_bool or field.is_numeric:
agg_value = [
field.np_dtype.type(value) for value in agg_value
]
# Null usually means there were no results.
if not isinstance(agg_value, list) and (
agg_value is None or np.isnan(agg_value)
):
if is_dataframe_agg and not numeric_only:
agg_value = np.NaN
elif not is_dataframe_agg and numeric_only is False:
agg_value = np.NaN
# Cardinality is always either NaN or integer.
elif pd_agg in ("nunique", "count"):
agg_value = int(agg_value)
# If this is a non-null timestamp field convert to a pd.Timestamp()
elif field.is_timestamp:
if isinstance(agg_value, list):
# convert to timestamp results for mode
agg_value = [
elasticsearch_date_to_pandas_date(
value, field.es_date_format
)
for value in agg_value
]
else:
agg_value = elasticsearch_date_to_pandas_date(
agg_value, field.es_date_format
)
# If numeric_only is False | None then maintain column datatype
elif not numeric_only:
                    # only convert back to the column dtype for lossless aggs like min, max, and median
if pd_agg in {"max", "min", "median", "sum", "mode"}:
# 'sum' isn't representable with bool, use int64
if pd_agg == "sum" and field.is_bool:
agg_value = np.int64(agg_value)
else:
agg_value = field.np_dtype.type(agg_value)
values.append(agg_value)
            # Skip fields that produced no values (e.g. no compatible aggs when numeric_only is True)
if values:
results[field.column] = values if len(values) > 1 else values[0]
return results
def aggs_groupby(
self,
query_compiler: "QueryCompiler",
by: List[str],
pd_aggs: List[str],
dropna: bool = True,
is_dataframe_agg: bool = False,
numeric_only: Optional[bool] = True,
) -> pd.DataFrame:
"""
This method is used to construct groupby aggregation dataframe
Parameters
----------
query_compiler:
A Query compiler
by:
a list of columns on which groupby operations have to be performed
pd_aggs:
a list of aggregations to be performed
dropna:
Drop None values if True.
TODO Not yet implemented
is_dataframe_agg:
Know if groupby with aggregation or single agg is called.
numeric_only:
return either numeric values or NaN/NaT
Returns
-------
        A dataframe containing the groupby data
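        e.g. (illustrative)
        df.groupby("Carrier").agg(["min", "max"]) resolves here with
        by=["Carrier"], pd_aggs=["min", "max"], is_dataframe_agg=True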
"""
query_params, post_processing = self._resolve_tasks(query_compiler)
size = self._size(query_params, post_processing)
if size is not None:
raise NotImplementedError(
f"Can not count field matches if size is set {size}"
)
by_fields, agg_fields = query_compiler._mappings.groupby_source_fields(by=by)
        # Use a defaultdict to avoid initializing each column's list by hand
results: Dict[str, List[Any]] = defaultdict(list)
if numeric_only:
agg_fields = [
field for field in agg_fields if (field.is_numeric or field.is_bool)
]
body = Query(query_params.query)
# To return for creating multi-index on columns
headers = [agg_field.column for agg_field in agg_fields]
# Convert pandas aggs to ES equivalent
es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs)
# Construct Query
for by_field in by_fields:
if by_field.aggregatable_es_field_name is None:
raise ValueError(
f"Cannot use {by_field.column!r} with groupby() because "
f"it has no aggregatable fields in Elasticsearch"
)
# groupby fields will be term aggregations
body.composite_agg_bucket_terms(
name=f"groupby_{by_field.column}",
field=by_field.aggregatable_es_field_name,
)
for agg_field in agg_fields:
for es_agg in es_aggs:
# Skip if the field isn't compatible or if the agg is
# 'value_count' as this value is pulled from bucket.doc_count.
if not agg_field.is_es_agg_compatible(es_agg):
continue
# If we have multiple 'extended_stats' etc. here we simply NOOP on 2nd call
if isinstance(es_agg, tuple):
body.metric_aggs(
f"{es_agg[0]}_{agg_field.es_field_name}",
es_agg[0],
agg_field.aggregatable_es_field_name,
)
else:
body.metric_aggs(
f"{es_agg}_{agg_field.es_field_name}",
es_agg,
agg_field.aggregatable_es_field_name,
)
# Composite aggregation
body.composite_agg_start(
size=DEFAULT_PAGINATION_SIZE, name="groupby_buckets", dropna=dropna
)
for buckets in self.bucket_generator(query_compiler, body):
            # We receive the response row-wise
            for bucket in buckets:
                # groupby columns are added to the result in the same order they are returned
for by_field in by_fields:
bucket_key = bucket["key"][f"groupby_{by_field.column}"]
# Datetimes always come back as integers, convert to pd.Timestamp()
if by_field.is_timestamp and isinstance(bucket_key, int):
bucket_key = pd.to_datetime(bucket_key, unit="ms")
results[by_field.column].append(bucket_key)
agg_calculation = self._unpack_metric_aggs(
fields=agg_fields,
es_aggs=es_aggs,
pd_aggs=pd_aggs,
response={"aggregations": bucket},
numeric_only=numeric_only,
# We set 'True' here because we want the value
# unpacking to always be in 'dataframe' mode.
is_dataframe_agg=True,
)
# Process the calculated agg values to response
for key, value in agg_calculation.items():
if not isinstance(value, list):
results[key].append(value)
continue
for pd_agg, val in zip(pd_aggs, value):
results[f"{key}_{pd_agg}"].append(val)
agg_df = pd.DataFrame(results).set_index(by)
if is_dataframe_agg:
# Convert header columns to MultiIndex
agg_df.columns = pd.MultiIndex.from_product([headers, pd_aggs])
else:
# Convert header columns to Index
agg_df.columns = pd.Index(headers)
return agg_df
@staticmethod
def bucket_generator(
query_compiler: "QueryCompiler", body: "Query"
) -> Generator[List[str], None, List[str]]:
"""
This can be used for all groupby operations.
e.g.
"aggregations": {
"groupby_buckets": {
"after_key": {"total_quantity": 8},
"buckets": [
{
"key": {"total_quantity": 1},
"doc_count": 87,
"taxful_total_price_avg": {"value": 48.035978536496216},
}
],
}
}
Returns
-------
        A generator which yields each page of composite aggregation buckets.
        If an after_key is present, it is used to fetch the next page of buckets.
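        Typical usage (illustrative, mirroring aggs_groupby above):
            for buckets in self.bucket_generator(query_compiler, body):
                for bucket in buckets:
                    ...  # unpack each composite bucket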
"""
while True:
res = query_compiler._client.search(
index=query_compiler._index_pattern,
size=0,
body=body.to_search_body(),
)
# Pagination Logic
composite_buckets = res["aggregations"]["groupby_buckets"]
if "after_key" in composite_buckets:
# yield the bucket which contains the result
yield composite_buckets["buckets"]
body.composite_agg_after_key(
name="groupby_buckets",
after_key=composite_buckets["after_key"],
)
else:
return composite_buckets["buckets"]
@staticmethod
def _map_pd_aggs_to_es_aggs(pd_aggs):
"""
Args:
pd_aggs - list of pandas aggs (e.g. ['mad', 'min', 'std'] etc.)
Returns:
        es_aggs - list of corresponding es aggs (e.g. ['median_absolute_deviation', 'min', ('extended_stats', 'std_deviation')] etc.)
Pandas supports a lot of options here, and these options generally work on text and numerics in pandas.
Elasticsearch has metric aggs and terms aggs so will have different behaviour.
Pandas aggs that return field_names (as opposed to transformed rows):
all
any
count
mad
max
mean
median
min
mode
quantile
rank
sem
skew
sum
std
var
nunique
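    Examples (illustrative, derived from the mapping below):
        ['mean']        -> ['avg']
        ['nunique']     -> ['cardinality']
        ['mean', 'std'] -> [('extended_stats', 'avg'), ('extended_stats', 'std_deviation')]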
"""
# pd aggs that will be mapped to es aggs
# that can use 'extended_stats'.
extended_stats_pd_aggs = {"mean", "min", "max", "sum", "var", "std"}
extended_stats_es_aggs = {"avg", "min", "max", "sum"}
extended_stats_calls = 0
es_aggs = []
for pd_agg in pd_aggs:
if pd_agg in extended_stats_pd_aggs:
extended_stats_calls += 1
# Aggs that are 'extended_stats' compatible
if pd_agg == "count":
es_aggs.append("value_count")
elif pd_agg == "max":
es_aggs.append("max")
elif pd_agg == "min":
es_aggs.append("min")
elif pd_agg == "mean":
es_aggs.append("avg")
elif pd_agg == "sum":
es_aggs.append("sum")
elif pd_agg == "std":
es_aggs.append(("extended_stats", "std_deviation"))
elif pd_agg == "var":
es_aggs.append(("extended_stats", "variance"))
# Aggs that aren't 'extended_stats' compatible
elif pd_agg == "nunique":
es_aggs.append("cardinality")
elif pd_agg == "mad":
es_aggs.append("median_absolute_deviation")
elif pd_agg == "median":
es_aggs.append(("percentiles", "50.0"))
elif pd_agg == "mode":
if len(pd_aggs) != 1:
raise NotImplementedError(
"Currently mode is not supported in df.agg[...]. Try df.mode()"
)
else:
es_aggs.append("mode")
# Not implemented
elif pd_agg == "quantile":
# TODO
raise NotImplementedError(pd_agg, " not currently implemented")
elif pd_agg == "rank":
# TODO
raise NotImplementedError(pd_agg, " not currently implemented")
elif pd_agg == "sem":
# TODO
raise NotImplementedError(pd_agg, " not currently implemented")
else:
raise NotImplementedError(pd_agg, " not currently implemented")
        # If two or more aggs compatible with 'extended_stats' are requested,
        # we can piggy-back on a single 'extended_stats' aggregation.
if extended_stats_calls >= 2:
es_aggs = [
("extended_stats", es_agg)
if es_agg in extended_stats_es_aggs
else es_agg
for es_agg in es_aggs
]
return es_aggs
def filter(
self,
query_compiler: "QueryCompiler",
items: Optional[Sequence[str]] = None,
like: Optional[str] = None,
regex: Optional[str] = None,
) -> None:
# This function is only called for axis='index',
# DataFrame.filter(..., axis="columns") calls .drop()
if items is not None:
self.filter_index_values(
query_compiler, field=query_compiler.index.es_index_field, items=items
)
return
elif like is not None:
arg_name = "like"
else:
assert regex is not None
arg_name = "regex"
raise NotImplementedError(
f".filter({arg_name}='...', axis='index') is currently not supported due "
f"to substring and regex operations not being available for Elasticsearch document IDs."
)
def describe(self, query_compiler):
query_params, post_processing = self._resolve_tasks(query_compiler)
size = self._size(query_params, post_processing)
if size is not None:
raise NotImplementedError(
f"Can not count field matches if size is set {size}"
)
numeric_source_fields = query_compiler._mappings.numeric_source_fields()
# for each field we compute:
# count, mean, std, min, 25%, 50%, 75%, max
body = Query(query_params.query)
for field in numeric_source_fields:
body.metric_aggs("extended_stats_" + field, "extended_stats", field)
body.metric_aggs("percentiles_" + field, "percentiles", field)
response = query_compiler._client.search(
index=query_compiler._index_pattern, size=0, body=body.to_search_body()
)
results = {}
for field in numeric_source_fields:
values = list()
values.append(response["aggregations"]["extended_stats_" + field]["count"])
values.append(response["aggregations"]["extended_stats_" + field]["avg"])
values.append(
response["aggregations"]["extended_stats_" + field]["std_deviation"]
)
values.append(response["aggregations"]["extended_stats_" + field]["min"])
values.append(
response["aggregations"]["percentiles_" + field]["values"]["25.0"]
)
values.append(
response["aggregations"]["percentiles_" + field]["values"]["50.0"]
)
values.append(
response["aggregations"]["percentiles_" + field]["values"]["75.0"]
)
values.append(response["aggregations"]["extended_stats_" + field]["max"])
            # keep the field only if at least one value is not None
if values.count(None) < len(values):
results[field] = values
df = pd.DataFrame(
data=results,
index=["count", "mean", "std", "min", "25%", "50%", "75%", "max"],
)