Skip to content

Commit

Permalink
Add support for DataFrame.groupby() with aggregations
Browse files Browse the repository at this point in the history
  • Loading branch information
V1NAY8 committed Oct 15, 2020
1 parent adafeed commit abc5ca9
Show file tree
Hide file tree
Showing 9 changed files with 877 additions and 100 deletions.
1 change: 1 addition & 0 deletions eland/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
# NOTE(review): presumably the number of rows written per batch when exporting CSV — confirm at the call site.
DEFAULT_CSV_BATCH_OUTPUT_SIZE = 10000
# NOTE(review): presumably the row interval between progress reports — confirm at the call site.
DEFAULT_PROGRESS_REPORTING_NUM_ROWS = 10000
# Named after the Elasticsearch index setting noted in the trailing comment.
DEFAULT_ES_MAX_RESULT_WINDOW = 10000 # index.max_result_window
# Size used when paging through composite aggregations (the groupby support
# added in this change); NOTE(review): exact use lives outside this view.
DEFAULT_PAGINATION_SIZE = 5000 # for composite aggregations


with warnings.catch_warnings():
Expand Down
81 changes: 80 additions & 1 deletion eland/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import warnings
from io import StringIO
import re
from typing import Optional, Sequence, Union, Tuple, List
from typing import List, Optional, Sequence, Union, Tuple

import numpy as np
import pandas as pd
Expand All @@ -39,6 +39,7 @@
from eland.common import DEFAULT_NUM_ROWS_DISPLAYED, docstring_parameter
from eland.filter import BooleanFilter
from eland.utils import deprecated_api, is_valid_attr_name
from eland.groupby import GroupByDataFrame


class DataFrame(NDFrame):
Expand Down Expand Up @@ -1430,6 +1431,84 @@ def aggregate(

# Bind ``gfx.ed_hist_frame`` to the name ``hist``.
# NOTE(review): presumably this makes it available as ``DataFrame.hist``; the
# enclosing class body is not fully visible here — confirm.
hist = gfx.ed_hist_frame

def groupby(
    self, by: Optional[Union[str, List[str]]] = None, dropna: bool = True
) -> "GroupByDataFrame":
    """
    Used to perform groupby operations

    Parameters
    ----------
    by:
        column or list of columns used to groupby
        Currently accepts column or list of columns
        TODO Implement other combinations of by similar to pandas
    dropna: default True
        If True, and if group keys contain NA values, NA values together with row/column will be dropped.
        TODO Implement False
    TODO Implement remainder of pandas arguments

    Returns
    -------
    GroupByDataFrame

    Raises
    ------
    TypeError
        If ``by`` is not given (``None``).
    ValueError
        If ``by`` is an empty list/tuple, i.e. there is nothing to group on.
    KeyError
        If any requested column is not a column of this DataFrame.

    See Also
    --------
    :pandas_api_docs:`pandas.DataFrame.groupby`

    Examples
    --------
    >>> ed_flights = ed.DataFrame('localhost', 'flights', columns=["AvgTicketPrice", "Cancelled", "dayOfWeek", "timestamp", "DestCountry"])
    >>> ed_flights.groupby(["DestCountry", "Cancelled"]).agg(["min", "max"], numeric_only=True) # doctest: +NORMALIZE_WHITESPACE
                          AvgTicketPrice              dayOfWeek
                                     min          max       min  max
    DestCountry Cancelled
    AE          False         110.799911  1126.148682       0.0  6.0
                True          132.443756   817.931030       0.0  6.0
    AR          False         125.589394  1199.642822       0.0  6.0
                True          251.389603  1172.382568       0.0  6.0
    AT          False         100.020531  1181.835815       0.0  6.0
    ...                              ...          ...       ...  ...
    TR          True          307.915649   307.915649       0.0  0.0
    US          False         100.145966  1199.729004       0.0  6.0
                True          102.153069  1192.429932       0.0  6.0
    ZA          False         102.002663  1196.186157       0.0  6.0
                True          121.280296  1175.709961       0.0  6.0
    <BLANKLINE>
    [63 rows x 4 columns]
    >>> ed_flights.groupby(["DestCountry", "Cancelled"]).mean(numeric_only=True) # doctest: +NORMALIZE_WHITESPACE
                           AvgTicketPrice  dayOfWeek
    DestCountry Cancelled
    AE          False          643.956793   2.717949
                True           388.828809   2.571429
    AR          False          673.551677   2.746154
                True           682.197241   2.733333
    AT          False          647.158290   2.819936
    ...                               ...        ...
    TR          True           307.915649   0.000000
    US          False          598.063146   2.752014
                True           579.799066   2.767068
    ZA          False          636.998605   2.738589
                True           677.794078   2.928571
    <BLANKLINE>
    [63 rows x 2 columns]
    """
    if by is None:
        raise TypeError("by parameter should be specified to groupby")
    if isinstance(by, str):
        by = [by]
    if isinstance(by, (list, tuple)):
        # Always hand a list downstream: tuples are accepted here, but the
        # groupby object is typed (and documented) as taking List[str].
        by = list(by)
        # Fail early, with the same error pandas raises, instead of
        # forwarding a groupby that has no keys at all.
        if not by:
            raise ValueError("No group keys passed!")
        remaining_columns = set(by) - set(self._query_compiler.columns)
        if remaining_columns:
            raise KeyError(
                f"Requested columns {remaining_columns} not in the DataFrame."
            )

    return GroupByDataFrame(
        by=by, query_compiler=self._query_compiler, dropna=dropna
    )

def query(self, expr) -> "DataFrame":
"""
Query the columns of a DataFrame with a boolean expression.
Expand Down
41 changes: 39 additions & 2 deletions eland/field_mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
Mapping,
Dict,
Any,
Tuple,
TYPE_CHECKING,
List,
Set,
Expand Down Expand Up @@ -697,14 +698,50 @@ def numeric_source_fields(self):
pd_dtypes, es_field_names, es_date_formats = self.metric_source_fields()
return es_field_names

def all_source_fields(self):
source_fields = []
def all_source_fields(self) -> List[Field]:
    """
    Build a ``Field`` for every row of the mappings-capabilities table.

    Returns
    -------
    A list of Field Mappings, one per row, in the table's iteration order
    """
    # Each row's columns become keyword arguments of ``Field``; the row label
    # is supplied as ``index`` (replacing any column of that name).
    return [
        Field(**{**capabilities.to_dict(), "index": label})
        for label, capabilities in self._mappings_capabilities.iterrows()
    ]

def groupby_source_fields(self, by: List[str]) -> Tuple[List[Field], List[Field]]:
    """
    Split the Field Mappings into groupby fields and all remaining fields

    Parameters
    ----------
    by:
        A list of groupby fields

    Returns
    -------
    A Tuple of two lists of field mappings: the groupby fields, ordered as
    they appear in ``by``, followed by the non-groupby fields in the
    mappings' iteration order
    """
    keyed_by_name: Dict[str, Field] = {}
    remaining: List[Field] = []

    for label, capabilities in self._mappings_capabilities.iterrows():
        # Row columns become Field keyword arguments; the row label is
        # passed as ``index``.
        field = Field(**{**capabilities.to_dict(), "index": label})
        if label in by:
            keyed_by_name[label] = field
        else:
            remaining.append(field)

    # The mappings are iterated in their own order, so re-read the groupby
    # fields in the order the caller asked for.
    ordered_groupby = [keyed_by_name[column] for column in by]
    return ordered_groupby, remaining

def metric_source_fields(self, include_bool=False, include_timestamp=False):
"""
Returns
Expand Down
169 changes: 169 additions & 0 deletions eland/groupby.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import TYPE_CHECKING, List, Union

from eland.query_compiler import QueryCompiler

if TYPE_CHECKING:
import pandas as pd # type: ignore


class GroupBy:
    """
    Base class with the aggregations shared by all groupby objects.

    Every public method forwards one aggregation name to
    ``QueryCompiler.groupby`` together with the stored group keys and
    ``dropna`` flag.

    Parameters
    ----------
    by:
        List of columns to groupby
    query_compiler:
        Query compiler object
    dropna:
        default is true, drop None/NaT/NaN values while grouping
    """

    def __init__(
        self,
        by: List[str],
        query_compiler: "QueryCompiler",
        dropna: bool = True,
    ) -> None:
        # A new QueryCompiler is built from the one passed in (``to_copy=``),
        # so this object holds its own compiler instance.
        self._query_compiler: "QueryCompiler" = QueryCompiler(to_copy=query_compiler)
        self._dropna: bool = dropna
        self._by: List[str] = by

    def _single_agg(self, pd_agg: str, numeric_only: bool) -> "pd.DataFrame":
        """Run exactly one named aggregation over the stored group keys."""
        return self._query_compiler.groupby(
            by=self._by,
            pd_aggs=[pd_agg],
            dropna=self._dropna,
            numeric_only=numeric_only,
        )

    # numeric_only=True by default for all aggs because pandas does the same
    def mean(self, numeric_only: bool = True) -> "pd.DataFrame":
        """Apply the ``"mean"`` aggregation to each group."""
        return self._single_agg("mean", numeric_only)

    def var(self, numeric_only: bool = True) -> "pd.DataFrame":
        """Apply the ``"var"`` aggregation to each group."""
        return self._single_agg("var", numeric_only)

    def std(self, numeric_only: bool = True) -> "pd.DataFrame":
        """Apply the ``"std"`` aggregation to each group."""
        return self._single_agg("std", numeric_only)

    def mad(self, numeric_only: bool = True) -> "pd.DataFrame":
        """Apply the ``"mad"`` aggregation to each group."""
        return self._single_agg("mad", numeric_only)

    def median(self, numeric_only: bool = True) -> "pd.DataFrame":
        """Apply the ``"median"`` aggregation to each group."""
        return self._single_agg("median", numeric_only)

    def sum(self, numeric_only: bool = True) -> "pd.DataFrame":
        """Apply the ``"sum"`` aggregation to each group."""
        return self._single_agg("sum", numeric_only)

    def min(self, numeric_only: bool = True) -> "pd.DataFrame":
        """Apply the ``"min"`` aggregation to each group."""
        return self._single_agg("min", numeric_only)

    def max(self, numeric_only: bool = True) -> "pd.DataFrame":
        """Apply the ``"max"`` aggregation to each group."""
        return self._single_agg("max", numeric_only)

    def nunique(self) -> "pd.DataFrame":
        """Apply the ``"nunique"`` aggregation; ``numeric_only`` is always False."""
        return self._single_agg("nunique", False)


class GroupByDataFrame(GroupBy):
    """
    This holds all the groupby methods for DataFrame

    Parameters
    ----------
    by:
        List of columns to groupby
    query_compiler:
        Query compiler object
    dropna:
        default is true, drop None/NaT/NaN values while grouping
    """

    def aggregate(
        self, func: Union[str, List[str]], numeric_only: bool = False
    ) -> "pd.DataFrame":
        """
        Used to groupby and aggregate

        Parameters
        ----------
        func:
            Functions to use for aggregating the data.

            Accepted combinations are:
            - function name (a single str, treated as a one-element list)
            - list of function names
        numeric_only: {True, False, None} Default is False
            Which datatype to be returned
            - True: returns all values with float64, NaN/NaT are ignored.
            - False: returns all values with float64.
            - None: returns all values with default datatype.

        Returns
        -------
        pandas.DataFrame
            The result of ``QueryCompiler.groupby`` called with
            ``is_agg=True`` for the requested aggregations.
        """
        # A bare name is shorthand for a single-element list of aggregations.
        if isinstance(func, str):
            func = [func]
        # numeric_only is by default False because pandas does the same
        return self._query_compiler.groupby(
            by=self._by,
            pd_aggs=func,
            dropna=self._dropna,
            numeric_only=numeric_only,
            is_agg=True,
        )

    agg = aggregate

0 comments on commit abc5ca9

Please sign in to comment.