Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add idx to DataFrame #353

Merged
merged 2 commits into from Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/sphinx/reference/api/eland.DataFrame.idxmax.rst
@@ -0,0 +1,6 @@
eland.DataFrame.idxmax
========================

.. currentmodule:: eland

.. automethod:: DataFrame.idxmax
6 changes: 6 additions & 0 deletions docs/sphinx/reference/api/eland.DataFrame.idxmin.rst
@@ -0,0 +1,6 @@
eland.DataFrame.idxmin
========================

.. currentmodule:: eland

.. automethod:: DataFrame.idxmin
2 changes: 2 additions & 0 deletions docs/sphinx/reference/dataframe.rst
Expand Up @@ -101,6 +101,8 @@ Computations / Descriptive Stats
DataFrame.nunique
DataFrame.mode
DataFrame.quantile
DataFrame.idxmax
DataFrame.idxmin

Reindexing / Selection / Label Manipulation
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
64 changes: 64 additions & 0 deletions eland/dataframe.py
Expand Up @@ -1738,6 +1738,70 @@ def quantile(
"""
return self._query_compiler.quantile(quantiles=q, numeric_only=numeric_only)

def idxmax(self, axis: int = 0) -> pd.Series:
    """
    Return the index label of the row holding the maximum of each column.

    Only numeric columns are considered. NA/null values are excluded.

    Parameters
    ----------
    axis : {0, 1}, default 0
        Axis along which the maximum is searched. Only ``0`` is
        currently implemented; ``1`` raises ``NotImplementedError``.

    Returns
    -------
    pandas.Series
        Index labels keyed by column name.

    See Also
    --------
    :pandas_api_docs:`pandas.DataFrame.idxmax`

    Examples
    --------
    >>> ed_df = ed.DataFrame('localhost', 'flights')
    >>> ed_flights = ed_df.filter(["AvgTicketPrice", "FlightDelayMin", "dayOfWeek", "timestamp"])
    >>> ed_flights.idxmax()
    AvgTicketPrice    1843
    FlightDelayMin     109
    dayOfWeek         1988
    dtype: object

    """
    # A descending sort places the largest value first.
    compiler = self._query_compiler
    return compiler.idx(axis=axis, sort_order="desc")

def idxmin(self, axis: int = 0) -> pd.Series:
    """
    Return the index label of the row holding the minimum of each column.

    Only numeric columns are considered. NA/null values are excluded.

    Parameters
    ----------
    axis : {0, 1}, default 0
        Axis along which the minimum is searched. Only ``0`` is
        currently implemented; ``1`` raises ``NotImplementedError``.

    Returns
    -------
    pandas.Series
        Index labels keyed by column name.

    See Also
    --------
    :pandas_api_docs:`pandas.DataFrame.idxmin`

    Examples
    --------
    >>> ed_df = ed.DataFrame('localhost', 'flights')
    >>> ed_flights = ed_df.filter(["AvgTicketPrice", "FlightDelayMin", "dayOfWeek", "timestamp"])
    >>> ed_flights.idxmin()
    AvgTicketPrice    5454
    FlightDelayMin       0
    dayOfWeek            0
    dtype: object

    """
    # An ascending sort places the smallest value first.
    compiler = self._query_compiler
    return compiler.idx(axis=axis, sort_order="asc")

def query(self, expr) -> "DataFrame":
"""
Query the columns of a DataFrame with a boolean expression.
Expand Down
50 changes: 50 additions & 0 deletions eland/operations.py
Expand Up @@ -187,6 +187,56 @@ def value_counts(self, query_compiler: "QueryCompiler", es_size: int) -> pd.Seri
def hist(self, query_compiler, bins):
    """
    Compute histogram data by delegating to ``self._hist_aggs``.

    Both arguments are forwarded unchanged, and whatever
    ``_hist_aggs`` returns is returned to the caller.
    """
    histogram = self._hist_aggs(query_compiler, bins)
    return histogram

def idx(
    self, query_compiler: "QueryCompiler", axis: int, sort_order: str
) -> pd.Series:
    """
    Find, for every numeric field, the ``_id`` of the top document when
    documents are sorted by that field.

    A single search request is issued that carries one ``top_hits``
    aggregation (size 1) per numeric source field.

    Parameters
    ----------
    query_compiler : QueryCompiler
        Supplies the field mappings, the client and the index pattern.
    axis : {0, 1}
        ``0`` searches along the index. ``1`` is not implemented yet.
    sort_order : str
        Sort direction handed to each ``top_hits`` aggregation
        (callers use ``"desc"`` for idxmax and ``"asc"`` for idxmin).

    Returns
    -------
    pandas.Series
        Document ``_id`` values keyed by field name. Fields whose top
        hit has an empty ``_source`` are left out.

    Raises
    ------
    ValueError
        If ``axis`` is neither 0 nor 1, or if an aggregation reports
        zero matching documents.
    NotImplementedError
        If ``axis`` is 1.
    """
    # Reject unknown axes up front; previously any value other than 1
    # silently fell through to the axis-0 code path.
    if axis not in (0, 1):
        raise ValueError(f"No axis named {axis} for object type DataFrame")

    if axis == 1:
        # Fetch idx on Columns
        raise NotImplementedError(
            "This feature is not implemented yet for 'axis = 1'"
        )

    # Fetch idx on Index
    # NOTE(review): the post-processing tasks returned by _resolve_tasks
    # are not applied to this result -- confirm that is intended.
    query_params, _ = self._resolve_tasks(query_compiler)

    # Consider only Numeric fields
    numeric_fields = [
        field
        for field in query_compiler._mappings.all_source_fields()
        if field.is_numeric
    ]

    body = Query(query_params.query)
    for field in numeric_fields:
        body.top_hits_agg(
            name=f"top_hits_{field.es_field_name}",
            source_columns=[field.es_field_name],
            sort_order=sort_order,
            size=1,
        )

    # size=0: only the aggregation results are needed, not the hits.
    response = query_compiler._client.search(
        index=query_compiler._index_pattern, size=0, body=body.to_search_body()
    )
    aggregations = response["aggregations"]

    results = {}
    for field in numeric_fields:
        hits = aggregations[f"top_hits_{field.es_field_name}"]["hits"]

        if hits["total"]["value"] <= 0:
            raise ValueError("Empty Index with no rows")

        top_hit = hits["hits"][0]
        if not top_hit["_source"]:
            # This means there are NaN Values, we skip them
            # Implement this when skipna is implemented
            continue

        results[field.es_field_name] = top_hit["_id"]

    return pd.Series(results)

def aggs(self, query_compiler, pd_aggs, numeric_only=None) -> pd.DataFrame:
results = self._metric_aggs(
query_compiler, pd_aggs, numeric_only=numeric_only, is_dataframe_agg=True
Expand Down
16 changes: 16 additions & 0 deletions eland/query.py
Expand Up @@ -163,6 +163,22 @@ def percentile_agg(self, name: str, field: str, percents: List[float]) -> None:
agg = {"percentiles": {"field": field, "percents": percents}}
self._aggs[name] = agg

def top_hits_agg(
    self,
    name: str,
    source_columns: List[str],
    sort_order: str,
    size: int = 1,
) -> None:
    """
    Register a ``top_hits`` aggregation under ``name``.

    Parameters
    ----------
    name : str
        Key under which the aggregation is stored in ``self._aggs``.
    source_columns : List[str]
        Columns used both as sort keys and as the ``_source`` include
        list. When empty, no ``_source`` clause is emitted.
    sort_order : str
        Sort direction applied to every column in ``source_columns``.
        When falsy, no ``sort`` clause is emitted.
    size : int, default 1
        Number of hits requested from the aggregation.
    """
    spec: Any = {}

    if sort_order:
        # One sort clause per column, all sharing the same direction.
        spec.update(
            sort=[{column: {"order": sort_order}} for column in source_columns]
        )

    if source_columns:
        spec.update(_source={"includes": source_columns})

    spec.update(size=size)
    self._aggs[name] = {"top_hits": spec}

def composite_agg_bucket_terms(self, name: str, field: str) -> None:
"""
Add terms agg for composite aggregation
Expand Down
3 changes: 3 additions & 0 deletions eland/query_compiler.py
Expand Up @@ -685,6 +685,9 @@ def aggs_groupby(
numeric_only=numeric_only,
)

def idx(self, axis: int, sort_order: str) -> pd.Series:
    """
    Forward an idxmax/idxmin request to the operations layer.

    This compiler is passed as the first argument so the operations
    object can use it; ``axis`` and ``sort_order`` are passed through
    unchanged as keyword arguments.
    """
    operations = self._operations
    return operations.idx(self, axis=axis, sort_order=sort_order)

def value_counts(self, es_size: int) -> pd.Series:
    """
    Forward a value-counts request to the operations layer.

    This compiler and ``es_size`` are passed positionally, and the
    operations result is returned unchanged.
    """
    counts = self._operations.value_counts(self, es_size)
    return counts

Expand Down
11 changes: 6 additions & 5 deletions noxfile.py
Expand Up @@ -72,16 +72,17 @@ def lint(session):
for typed_file in TYPED_FILES:
if not os.path.isfile(typed_file):
session.error(f"The file {typed_file!r} couldn't be found")
popen = subprocess.Popen(
f"mypy --strict {typed_file}",
process = subprocess.run(
["mypy", "--strict", typed_file],
env=session.env,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
popen.wait()
# Ensure that mypy itself ran successfully
assert process.returncode in (0, 1)

errors = []
for line in popen.stdout.read().decode().split("\n"):
for line in process.stdout.decode().split("\n"):
filepath = line.partition(":")[0]
if filepath in TYPED_FILES:
errors.append(line)
Expand Down
24 changes: 24 additions & 0 deletions tests/dataframe/test_metrics_pytest.py
Expand Up @@ -498,3 +498,27 @@ def test_flights_agg_quantile(self, numeric_only):
assert_frame_equal(
pd_quantile, ed_quantile, check_exact=False, rtol=4, check_dtype=False
)

def test_flights_idx_on_index(self):
    """idxmax/idxmin along the index must match pandas on numeric columns."""
    columns = ["AvgTicketPrice", "FlightDelayMin", "dayOfWeek"]
    pd_flights = self.pd_flights().filter(columns)
    ed_flights = self.ed_flights().filter(columns)

    # Same order as before: idxmax first, then idxmin; pandas side first.
    for method in ("idxmax", "idxmin"):
        expected = getattr(pd_flights, method)()
        actual = getattr(ed_flights, method)()
        assert_series_equal(expected, actual)

def test_flights_idx_on_columns(self):
    """idxmax/idxmin with ``axis=1`` must raise NotImplementedError."""
    match = "This feature is not implemented yet for 'axis = 1'"

    # Build the frame outside the raises-block so that only the call
    # under test can satisfy the expectation.
    ed_flights = self.ed_flights().filter(
        ["AvgTicketPrice", "FlightDelayMin", "dayOfWeek"]
    )

    with pytest.raises(NotImplementedError, match=match):
        ed_flights.idxmax(axis=1)

    with pytest.raises(NotImplementedError, match=match):
        ed_flights.idxmin(axis=1)