Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mode to dataframe and series #323

Merged
merged 6 commits into from Jan 7, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -1,3 +1,4 @@
include LICENSE.txt
include README.md
include eland/py.typed
recursive-include eland
V1NAY8 marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions docs/sphinx/reference/dataframe.rst
Expand Up @@ -89,6 +89,7 @@ Computations / Descriptive Stats
DataFrame.var
DataFrame.sum
DataFrame.nunique
V1NAY8 marked this conversation as resolved.
Show resolved Hide resolved
DataFrame.mode

Reindexing / Selection / Label Manipulation
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
1 change: 1 addition & 0 deletions docs/sphinx/reference/series.rst
Expand Up @@ -79,6 +79,7 @@ Computations / Descriptive Stats
Series.var
Series.nunique
Series.value_counts
Series.mode

Reindexing / Selection / Label Manipulation
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
62 changes: 62 additions & 0 deletions eland/dataframe.py
Expand Up @@ -1628,6 +1628,68 @@ def groupby(
by=by, query_compiler=self._query_compiler.copy(), dropna=dropna
)

def mode(
    self,
    numeric_only: bool = False,
    dropna: bool = True,
    es_size: int = 10,
) -> pd.DataFrame:
    """
    Calculate mode of a DataFrame

    Parameters
    ----------
    numeric_only: {True, False} Default is False
        Which datatype to be returned
        - True: Returns all numeric or timestamp columns
        - False: Returns all columns
    dropna: {True, False} Default is True
        - True: Don't consider counts of NaN/NaT.
        - False: Consider counts of NaN/NaT.
        Only ``dropna=True`` is implemented at the moment. Passing ``False``
        emits a ``UserWarning`` and the result is computed as if
        ``dropna=True`` had been given.
    es_size: default 10
        number of rows to be returned if mode has multiple values

    See Also
    --------
    :pandas_api_docs:`pandas.DataFrame.mode`

    Examples
    --------
    >>> ed_ecommerce = ed.DataFrame('localhost', 'ecommerce')
    >>> ed_df = ed_ecommerce.filter(["total_quantity", "geoip.city_name", "customer_birth_date", "day_of_week", "taxful_total_price"])
    >>> ed_df.mode(numeric_only=False)
       total_quantity  geoip.city_name customer_birth_date  day_of_week  taxful_total_price
    0               2         New York                 NaT     Thursday               53.98

    >>> ed_df.mode(numeric_only=True)
       total_quantity  taxful_total_price
    0               2               53.98

    >>> ed_df = ed_ecommerce.filter(["products.tax_amount","order_date"])
    >>> ed_df.mode()
       products.tax_amount          order_date
    0                  0.0 2016-12-02 20:36:58
    1                  NaN 2016-12-04 23:44:10
    2                  NaN 2016-12-08 06:21:36
    3                  NaN 2016-12-08 09:38:53
    4                  NaN 2016-12-12 11:38:24
    5                  NaN 2016-12-12 19:46:34
    6                  NaN 2016-12-14 18:00:00
    7                  NaN 2016-12-15 11:38:24
    8                  NaN 2016-12-22 19:39:22
    9                  NaN 2016-12-24 06:21:36

    >>> ed_df.mode(es_size = 3)
       products.tax_amount          order_date
    0                  0.0 2016-12-02 20:36:58
    1                  NaN 2016-12-04 23:44:10
    2                  NaN 2016-12-08 06:21:36
    """
    # TODO dropna=False
    if not dropna:
        # The parameter used to be swallowed silently; tell the caller that
        # the requested behaviour is not applied instead of returning a
        # result that merely looks like it honoured dropna=False.
        import warnings

        warnings.warn(
            "dropna=False is not supported yet; NaN/NaT counts are ignored "
            "as if dropna=True",
            UserWarning,
            stacklevel=2,
        )
    return self._query_compiler.mode(
        numeric_only=numeric_only, dropna=True, is_dataframe=True, es_size=es_size
    )

def query(self, expr) -> "DataFrame":
"""
Query the columns of a DataFrame with a boolean expression.
Expand Down
8 changes: 6 additions & 2 deletions eland/field_mappings.py
Expand Up @@ -102,9 +102,13 @@ def is_es_agg_compatible(self, es_agg) -> bool:
# Except "median_absolute_deviation" which doesn't support bool
if es_agg == "median_absolute_deviation" and self.is_bool:
return False
# Cardinality and Count work for all types
# Cardinality, Count and mode work for all types
# Numerics and bools work for all aggs
if es_agg in ("cardinality", "value_count") or self.is_numeric or self.is_bool:
if (
es_agg in {"cardinality", "value_count", "mode"}
or self.is_numeric
or self.is_bool
):
return True
# Timestamps also work for 'min', 'max' and 'avg'
if es_agg in {"min", "max", "avg", "percentiles"} and self.is_timestamp:
Expand Down
3 changes: 3 additions & 0 deletions eland/groupby.py
Expand Up @@ -617,3 +617,6 @@ def count(self) -> "pd.DataFrame":
numeric_only=False,
is_dataframe_agg=False,
)

def mode(self) -> None:
    """
    Placeholder for the groupby variant of ``mode``.

    Raises
    ------
    NotImplementedError
        Always; mode is not available on groupby objects.
    """
    message = "Currently mode is not supported for groupby"
    raise NotImplementedError(message)
121 changes: 111 additions & 10 deletions eland/operations.py
Expand Up @@ -181,7 +181,7 @@ def _metric_agg_series(
dtype = "object"
return build_pd_series(results, index=results.keys(), dtype=dtype)

def value_counts(self, query_compiler: "QueryCompiler", es_size: int) -> pd.Series:
    """
    Run a ``"terms"`` aggregation through ``_terms_aggs`` and return its result.

    Parameters
    ----------
    query_compiler:
        Query compiler forwarded unchanged to ``_terms_aggs``.
    es_size:
        Size value forwarded unchanged to ``_terms_aggs``.
    """
    agg_func = "terms"
    return self._terms_aggs(query_compiler, agg_func, es_size)

def hist(self, query_compiler, bins):
Expand All @@ -195,12 +195,54 @@ def aggs(self, query_compiler, pd_aggs, numeric_only=None) -> pd.DataFrame:
results, index=pd_aggs, dtype=(np.float64 if numeric_only else None)
)

def mode(
    self,
    query_compiler: "QueryCompiler",
    pd_aggs: List[str],
    is_dataframe: bool,
    es_size: int,
    numeric_only: bool = False,
    dropna: bool = True,
) -> Union[pd.DataFrame, pd.Series]:
    """
    Build the pandas result for ``mode`` from the values returned by
    ``_metric_aggs``.

    Parameters
    ----------
    query_compiler:
        Query compiler forwarded to ``_metric_aggs``.
    pd_aggs:
        Aggregation names forwarded to ``_metric_aggs``.
    is_dataframe:
        True to build a DataFrame with one column per result key,
        False to build a Series from the first result entry.
    es_size:
        Forwarded to ``_metric_aggs`` as ``es_mode_size``.
    numeric_only:
        Forwarded to ``_metric_aggs``.
    dropna:
        Forwarded to ``_metric_aggs``.

    Returns
    -------
    pandas.DataFrame or pandas.Series
    """
    results = self._metric_aggs(
        query_compiler,
        pd_aggs=pd_aggs,
        numeric_only=numeric_only,
        dropna=dropna,
        es_mode_size=es_size,
    )

    if not is_dataframe:
        return pd.DataFrame(results.values()).iloc[0].rename()

    # Columns may carry a different number of mode values. Pad the shorter
    # ones up to the longest so that a rectangular DataFrame can be built.
    # default=0 keeps an empty result from raising inside max().
    rows_len = max((len(values) for values in results.values()), default=0)

    pd_dict: Dict[str, Any] = {}
    for key, values in results.items():
        # Convert np.ndarray to list
        values = list(values)
        row_diff = rows_len - len(values)
        if row_diff:
            # Timestamp columns are padded with NaT, everything else with NaN.
            # np.nan is used because the np.NaN alias was removed in NumPy 2.0.
            if values and isinstance(values[0], pd.Timestamp):
                filler: Any = pd.NaT
            else:
                filler = np.nan
            values.extend([filler] * row_diff)
        pd_dict[key] = values

    return pd.DataFrame(pd_dict)

def _metric_aggs(
self,
query_compiler: "QueryCompiler",
pd_aggs: List[str],
numeric_only: Optional[bool] = None,
is_dataframe_agg: bool = False,
es_mode_size: Optional[int] = None,
dropna: bool = True,
) -> Dict[str, Any]:
"""
Used to calculate metric aggregations
Expand All @@ -216,6 +258,10 @@ def _metric_aggs(
return either all numeric values or NaN/NaT
is_dataframe_agg:
know if this method is called from single-agg or aggregation method
es_mode_size:
number of rows to return when multiple mode values are present.
dropna:
drop NaN/NaT for a dataframe

Returns
-------
Expand Down Expand Up @@ -252,6 +298,15 @@ def _metric_aggs(
es_agg[0],
field.aggregatable_es_field_name,
)
elif es_agg == "mode":
# TODO for dropna=False, Check If field is timestamp or boolean or numeric,
# then use missing parameter for terms aggregation.
body.terms_aggs(
f"{es_agg}_{field.es_field_name}",
"terms",
field.aggregatable_es_field_name,
es_mode_size,
)
else:
body.metric_aggs(
f"{es_agg}_{field.es_field_name}",
Expand Down Expand Up @@ -280,7 +335,9 @@ def _metric_aggs(
is_dataframe_agg=is_dataframe_agg,
)

def _terms_aggs(self, query_compiler, func, es_size=None):
def _terms_aggs(
self, query_compiler: "QueryCompiler", func: str, es_size: int
) -> pd.Series:
"""
Parameters
----------
Expand Down Expand Up @@ -499,13 +556,43 @@ def _unpack_metric_aggs(
agg_value = np.sqrt(
(count / (count - 1.0)) * agg_value * agg_value
)
elif es_agg == "mode":
# For terms aggregation buckets are returned
# agg_value will be of type list
agg_value = response["aggregations"][
f"{es_agg}_{field.es_field_name}"
]["buckets"]
else:
agg_value = response["aggregations"][
f"{es_agg}_{field.es_field_name}"
]["value"]

if isinstance(agg_value, list):
# include top-terms in the result.
if not agg_value:
# If all the documents for a field are empty
agg_value = [field.nan_value]
else:
max_doc_count = agg_value[0]["doc_count"]
# We need only keys which are equal to max_doc_count
# lesser values are ignored
agg_value = [
V1NAY8 marked this conversation as resolved.
Show resolved Hide resolved
item["key"]
for item in agg_value
if item["doc_count"] == max_doc_count
]

# Maintain datatype by default because pandas does the same
# text are returned as-is
if field.is_bool or field.is_numeric:
agg_value = [
field.np_dtype.type(value) for value in agg_value
]

# Null usually means there were no results.
if agg_value is None or np.isnan(agg_value):
if not isinstance(agg_value, list) and (
agg_value is None or np.isnan(agg_value)
):
if is_dataframe_agg and not numeric_only:
agg_value = np.NaN
elif not is_dataframe_agg and numeric_only is False:
Expand All @@ -517,13 +604,22 @@ def _unpack_metric_aggs(

# If this is a non-null timestamp field convert to a pd.Timestamp()
elif field.is_timestamp:
agg_value = elasticsearch_date_to_pandas_date(
agg_value, field.es_date_format
)
if isinstance(agg_value, list):
# convert to timestamp results for mode
agg_value = [
elasticsearch_date_to_pandas_date(
value, field.es_date_format
)
for value in agg_value
]
else:
agg_value = elasticsearch_date_to_pandas_date(
agg_value, field.es_date_format
)
# If numeric_only is False | None then maintain column datatype
elif not numeric_only:
# we're only converting to bool for lossless aggs like min, max, and median.
if pd_agg in {"max", "min", "median", "sum"}:
if pd_agg in {"max", "min", "median", "sum", "mode"}:
V1NAY8 marked this conversation as resolved.
Show resolved Hide resolved
# 'sum' isn't representable with bool, use int64
if pd_agg == "sum" and field.is_bool:
agg_value = np.int64(agg_value)
Expand Down Expand Up @@ -791,10 +887,15 @@ def _map_pd_aggs_to_es_aggs(pd_aggs):
elif pd_agg == "median":
es_aggs.append(("percentiles", "50.0"))

# Not implemented
elif pd_agg == "mode":
# We could do this via top term
raise NotImplementedError(pd_agg, " not currently implemented")
if len(pd_aggs) != 1:
raise NotImplementedError(
"Currently mode is not supported in df.agg[...]. Try df.mode()"
V1NAY8 marked this conversation as resolved.
Show resolved Hide resolved
)
else:
es_aggs.append("mode")

# Not implemented
elif pd_agg == "quantile":
# TODO
raise NotImplementedError(pd_agg, " not currently implemented")
Expand Down
19 changes: 16 additions & 3 deletions eland/query.py
Expand Up @@ -101,20 +101,33 @@ def regexp(self, field: str, value: str) -> None:
else:
self._query = self._query & Rlike(field, value)

def terms_aggs(
    self,
    name: str,
    func: str,
    field: str,
    es_size: Optional[int] = None,
    missing: Optional[Any] = None,
) -> None:
    """
    Add terms agg e.g

    "aggs": {
        "name": {
            "terms": {
                "field": "Airline",
                "size": 10,
                "missing": "null"
            }
        }
    }

    Parameters
    ----------
    name:
        Key under which the aggregation is stored in ``self._aggs``.
    func:
        Aggregation type used as the inner key (e.g. ``"terms"``).
    field:
        Value stored under ``"field"``.
    es_size:
        Optional size; when truthy it is stored under ``"size"`` as a string.
    missing:
        Optional value stored under ``"missing"``. Any value other than
        ``None`` is kept, including falsy ones such as ``0``, ``False``
        or ``""``.
    """
    agg = {func: {"field": field}}
    if es_size:
        agg[func]["size"] = str(es_size)

    # Test against None rather than truthiness so that falsy substitutes
    # (0, False, "") are not silently dropped.
    if missing is not None:
        agg[func]["missing"] = missing
    self._aggs[name] = agg

def metric_aggs(self, name: str, func: str, field: str) -> None:
Expand Down
18 changes: 17 additions & 1 deletion eland/query_compiler.py
Expand Up @@ -621,6 +621,22 @@ def nunique(self):
self, ["nunique"], numeric_only=False
)

def mode(
    self,
    es_size: int,
    numeric_only: bool = False,
    dropna: bool = True,
    is_dataframe: bool = True,
) -> Union[pd.DataFrame, pd.Series]:
    """
    Forward a ``mode`` request to ``self._operations``.

    This object is passed as the query compiler and the aggregation list is
    fixed to ``["mode"]``; all other arguments are handed over unchanged.
    """
    operations = self._operations
    return operations.mode(
        self,
        pd_aggs=["mode"],
        es_size=es_size,
        is_dataframe=is_dataframe,
        dropna=dropna,
        numeric_only=numeric_only,
    )

def aggs_groupby(
self,
by: List[str],
Expand All @@ -638,7 +654,7 @@ def aggs_groupby(
numeric_only=numeric_only,
)

def value_counts(self, es_size: int) -> pd.Series:
    """
    Forward to ``self._operations.value_counts``, passing this object as the
    query compiler together with ``es_size``.
    """
    operations = self._operations
    return operations.value_counts(self, es_size)

def es_info(self, buf):
Expand Down