
Adds Support for Series.value_counts() #49

Merged: 13 commits, Nov 19, 2019
3 changes: 2 additions & 1 deletion docs/source/conf.py
@@ -59,7 +59,8 @@

extlinks = {
'pandas_api_docs': ('https://pandas.pydata.org/pandas-docs/version/0.25.1/reference/api/%s.html', ''),
'pandas_user_guide': ('https://pandas.pydata.org/pandas-docs/stable/user_guide/%s.html', 'Pandas User Guide/')
'pandas_user_guide': ('https://pandas.pydata.org/pandas-docs/stable/user_guide/%s.html', 'Pandas User Guide/'),
'es_api_docs': ('https://www.elastic.co/guide/en/elasticsearch/reference/current/%s.html', '')
}

numpydoc_attributes_as_param_list = False
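For readers unfamiliar with Sphinx extlinks: each entry maps a role name to a URL template, and the role target is substituted into the %s placeholder. A quick sketch of how the new es_api_docs role resolves, using the target referenced later in the Series.value_counts docstring (illustrative, not part of the diff):

# Illustrative only: how an es_api_docs extlink target expands to a URL
base = 'https://www.elastic.co/guide/en/elasticsearch/reference/current/%s.html'
target = 'search-aggregations-bucket-terms-aggregation'
print(base % target)
# -> https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-terms-aggregation.html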
3 changes: 2 additions & 1 deletion docs/source/index.rst
@@ -30,10 +30,11 @@ In general, the data resides in elasticsearch and not in memory, which allows el
* :doc:`reference/io`
* :doc:`reference/general_utility_functions`
* :doc:`reference/dataframe`
* :doc:`reference/series`
* :doc:`reference/index`
* :doc:`reference/indexing`

* :doc:`implementation/index`

* :doc:`implementation/details`
* :doc:`implementation/dataframe_supported`

6 changes: 6 additions & 0 deletions docs/source/reference/api/eland.Series.value_counts.rst
@@ -0,0 +1,6 @@
eland.Series.value_counts
===========================

.. currentmodule:: eland

.. automethod:: Series.value_counts
1 change: 1 addition & 0 deletions docs/source/reference/index.rst
@@ -13,4 +13,5 @@ methods. All classes and functions exposed in ``eland.*`` namespace are public.
io
general_utility_functions
dataframe
series
indexing
13 changes: 13 additions & 0 deletions docs/source/reference/series.rst
@@ -0,0 +1,13 @@
.. _api.series:

=========
Series
=========
.. currentmodule:: eland

Computations / descriptive stats
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. autosummary::
:toctree: api/

Series.value_counts
2 changes: 1 addition & 1 deletion eland/dataframe.py
@@ -720,7 +720,7 @@ def _getitem(self, key):

def _getitem_column(self, key):
if key not in self.columns:
raise KeyError("Requested column is not in the DataFrame {}".format(key))
raise KeyError("Requested column [{}] is not in the DataFrame.".format(key))
s = self._reduce_dimension(self._query_compiler.getitem_column_array([key]))
return s

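A small usage sketch of the clearer error message (assuming the flights demo index used throughout this PR):

# Selecting a missing column now names the offending key in the error:
import eland as ed

df = ed.DataFrame('localhost', 'flights')
df['not_a_column']
# KeyError: "Requested column [not_a_column] is not in the DataFrame."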
69 changes: 55 additions & 14 deletions eland/operations.py
@@ -130,12 +130,27 @@ def min(self, query_compiler):
return self._metric_aggs(query_compiler, 'min')

def nunique(self, query_compiler):
return self._terms_aggs(query_compiler, 'cardinality')
return self._metric_aggs(query_compiler, 'cardinality', field_types='aggregatable')

def value_counts(self, query_compiler, es_size):
return self._terms_aggs(query_compiler, 'terms', es_size)

def hist(self, query_compiler, bins):
return self._hist_aggs(query_compiler, bins)

def _metric_aggs(self, query_compiler, func):
def _metric_aggs(self, query_compiler, func, field_types=None):
"""
Parameters
----------
field_types: str, default None
If `aggregatable`, use only columns whose Elasticsearch fields are aggregatable.
If `None`, use only numeric fields.

Returns
-------
pandas.Series
Series containing results of `func` applied to the column(s)
"""
query_params, post_processing = self._resolve_tasks()

size = self._size(query_params, post_processing)
@@ -144,11 +159,17 @@ def _metric_aggs(self, query_compiler, func):

columns = self.get_columns()

numeric_source_fields = query_compiler._mappings.numeric_source_fields(columns)

body = Query(query_params['query'])

for field in numeric_source_fields:
# some metric aggs (including cardinality) work on all aggregatable fields,
# so operations that call _metric_aggs can pass the optional field_types parameter
if field_types == 'aggregatable':
source_fields = query_compiler._mappings.aggregatable_columns(columns)
else:
source_fields = query_compiler._mappings.numeric_source_fields(columns)

for field in source_fields:
body.metric_aggs(field, func, field)

response = query_compiler._client.search(
@@ -164,18 +185,32 @@ def _metric_aggs(self, query_compiler, func):
# }
results = {}

for field in numeric_source_fields:
results[field] = response['aggregations'][field]['value']
if field_types == 'aggregatable':
for key, value in source_fields.items():
results[value] = response['aggregations'][key]['value']
else:
for field in source_fields:
results[field] = response['aggregations'][field]['value']

# Return single value if this is a series
# if len(numeric_source_fields) == 1:
# return np.float64(results[numeric_source_fields[0]])

s = pd.Series(data=results, index=numeric_source_fields)
s = pd.Series(data=results, index=results.keys())

return s

def _terms_aggs(self, query_compiler, func):
def _terms_aggs(self, query_compiler, func, es_size=None):
"""
Parameters
----------
es_size: int, default None
Maximum number of term buckets to return; used by Series.value_counts()

Returns
-------
pandas.Series
Series containing results of `func` applied to the column(s)
"""
query_params, post_processing = self._resolve_tasks()

size = self._size(query_params, post_processing)
@@ -190,7 +225,7 @@ def _terms_aggs(self, query_compiler, func):
body = Query(query_params['query'])

for field in aggregatable_columns.keys():
body.metric_aggs(field, func, field)
body.terms_aggs(field, func, field, es_size=es_size)

response = query_compiler._client.search(
index=query_compiler._index_pattern,
@@ -200,9 +235,15 @@ def _terms_aggs(self, query_compiler, func):
results = {}

for key, value in aggregatable_columns.items():
results[value] = response['aggregations'][key]['value']
for bucket in response['aggregations'][columns[0]]['buckets']:
results[bucket['key']] = bucket['doc_count']

s = pd.Series(data=results, index=results.keys())
try:
name = columns[0]
except IndexError:
name = None

s = pd.Series(data=results, index=results.keys(), name=name)

return s

@@ -379,7 +420,7 @@ def aggs(self, query_compiler, pd_aggs):

"""
Results are like (for 'sum', 'min')

AvgTicketPrice DistanceKilometers DistanceMiles FlightDelayMin
sum 8.204365e+06 9.261629e+07 5.754909e+07 618150
min 1.000205e+02 0.000000e+00 0.000000e+00 0
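To make the new bucket handling in _terms_aggs concrete, here is a rough, self-contained sketch (not part of the diff) of how a terms aggregation response is folded into the Series returned by value_counts(); the response fragment is abbreviated and only illustrative:

import pandas as pd

# Abbreviated terms-aggregation response for the 'Carrier' field
response = {
    'aggregations': {
        'Carrier': {
            'buckets': [
                {'key': 'Logstash Airways', 'doc_count': 3331},
                {'key': 'JetBeats', 'doc_count': 3274},
            ]
        }
    }
}

# Mirror of the bucket loop above: bucket keys become the index,
# doc_count becomes the value
results = {}
for bucket in response['aggregations']['Carrier']['buckets']:
    results[bucket['key']] = bucket['doc_count']

s = pd.Series(data=results, name='Carrier')
# Logstash Airways    3331
# JetBeats            3274
# Name: Carrier, dtype: int64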
21 changes: 21 additions & 0 deletions eland/query.py
@@ -69,6 +69,27 @@ def terms(self, field, items, must=True):
else:
self._query = self._query & ~(IsIn(field, items))

def terms_aggs(self, name, func, field, es_size):
"""
Add a terms aggregation, e.g.

"aggs": {
"name": {
"terms": {
"field": "Airline",
"size": 10
}
}
}
"""
agg = {
func: {
"field": field,
"size": es_size
}
}
self._aggs[name] = agg

def metric_aggs(self, name, func, field):
"""
Add metric agg e.g
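For reference, the aggregation entry that Query.terms_aggs() assembles for a value_counts(es_size=10) call on an illustrative Carrier field looks roughly like this (values assumed, not taken from the diff):

# self._aggs['Carrier'] after Query.terms_aggs('Carrier', 'terms', 'Carrier', es_size=10)
aggs_entry = {
    'terms': {
        'field': 'Carrier',
        'size': 10
    }
}
# which serialises into the request body as:
# {"aggs": {"Carrier": {"terms": {"field": "Carrier", "size": 10}}}}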
3 changes: 3 additions & 0 deletions eland/query_compiler.py
@@ -411,6 +411,9 @@ def max(self):
def nunique(self):
return self._operations.nunique(self)

def value_counts(self, es_size):
return self._operations.value_counts(self, es_size)

def info_es(self, buf):
buf.write("index_pattern: {index_pattern}\n".format(index_pattern=self._index_pattern))

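For orientation, the delegation chain this PR adds can be summarised as follows (a sketch of the call flow, not code from the diff):

# eland/series.py            Series.value_counts(es_size=10)
#   -> eland/query_compiler.py   value_counts(es_size)
#     -> eland/operations.py     value_counts(query_compiler, es_size)
#                                  -> _terms_aggs(query_compiler, 'terms', es_size)
#       -> eland/query.py        terms_aggs(name, 'terms', field, es_size=es_size)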
42 changes: 42 additions & 0 deletions eland/series.py
@@ -104,6 +104,48 @@ def head(self, n=5):
def tail(self, n=5):
return Series(query_compiler=self._query_compiler.tail(n))

def value_counts(self, es_size=10):
"""
Return the value counts for the specified field.

**Note: we can only do this for aggregatable Elasticsearch fields (in general, numeric and keyword fields rather than text fields).**

TODO - implement remainder of pandas arguments

Parameters
----------
es_size: int, default 10
Number of buckets to return counts for; results are automatically sorted by count, descending.
This parameter is specific to `eland`, and determines how many term buckets
Elasticsearch should return out of the overall terms list.

Returns
-------
pandas.Series
Number of occurrences of each value in the column

See Also
--------
:pandas_api_docs:`pandas.Series.value_counts`
:es_api_docs:`search-aggregations-bucket-terms-aggregation`

Examples
--------
>>> df = ed.DataFrame('localhost', 'flights')
>>> df['Carrier'].value_counts()
Logstash Airways 3331
JetBeats 3274
Kibana Airlines 3234
ES-Air 3220
Name: Carrier, dtype: int64
"""
if not isinstance(es_size, int):
    raise TypeError("es_size must be a positive integer.")
if not es_size > 0:
    raise ValueError("es_size must be a positive integer.")

return self._query_compiler.value_counts(es_size)

Review comment (Contributor), on the isinstance check: whether we should do defensive programming here is probably something to discuss - this shouldn't block the PR though + we should use type hints to avoid this.

Reply (Contributor): I just thought it would be best to "fail fast" in the top level eland client code instead of passing something to the lower level transport mechanisms and waiting for a cryptic exception to bubble up from there or ES itself.

# ----------------------------------------------------------------------
# Rendering Methods
def __repr__(self):
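The review thread above leaves the defensive-programming question open. Purely as an illustration of the type-hint suggestion (a hypothetical variant, not code from this PR), a hinted signature that keeps the fail-fast checks might look like:

import pandas as pd

def value_counts(self, es_size: int = 10) -> pd.Series:
    # Hypothetical sketch: type hints document intent, runtime checks still fail fast
    if not isinstance(es_size, int):
        raise TypeError("es_size must be a positive integer.")
    if es_size <= 0:
        raise ValueError("es_size must be a positive integer.")
    return self._query_compiler.value_counts(es_size)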
47 changes: 47 additions & 0 deletions eland/tests/series/test_value_counts_pytest.py
@@ -0,0 +1,47 @@
# File called _pytest for PyCharm compatibility
import eland as ed
from eland.tests.common import TestData
from pandas.util.testing import assert_series_equal
import pytest


class TestSeriesValueCounts(TestData):

def test_value_counts(self):
pd_s = self.pd_flights()['Carrier']
ed_s = self.ed_flights()['Carrier']

pd_vc = pd_s.value_counts()
ed_vc = ed_s.value_counts()

assert_series_equal(pd_vc, ed_vc)

def test_value_counts_size(self):
pd_s = self.pd_flights()['Carrier']
ed_s = self.ed_flights()['Carrier']

pd_vc = pd_s.value_counts()[:1]
ed_vc = ed_s.value_counts(es_size=1)

assert_series_equal(pd_vc, ed_vc)

def test_value_counts_keyerror(self):
ed_f = self.ed_flights()
with pytest.raises(KeyError):
assert ed_f['not_a_column'].value_counts()

def test_value_counts_dataframe(self):
# value_counts() is a series method, should raise AttributeError if called on a DataFrame
ed_f = self.ed_flights()
with pytest.raises(AttributeError):
assert ed_f.value_counts()

def test_value_counts_non_int(self):
ed_s = self.ed_flights()['Carrier']
with pytest.raises(TypeError):
assert ed_s.value_counts(es_size='foo')

def test_value_counts_non_positive_int(self):
ed_s = self.ed_flights()['Carrier']
with pytest.raises(ValueError):
assert ed_s.value_counts(es_size=-9)