Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add value_counts. #63

Merged
merged 4 commits into from Apr 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 35 additions & 3 deletions databricks/koala/structures.py
Expand Up @@ -23,7 +23,7 @@
import numpy as np
import pyspark.sql.functions as F
from pyspark.sql import DataFrame, Column
from pyspark.sql.types import StructType, to_arrow_type
from pyspark.sql.types import FloatType, DoubleType, StructType, to_arrow_type
from pyspark.sql.utils import AnalysisException

from . import namespace
Expand Down Expand Up @@ -260,6 +260,17 @@ def to_dataframe(self):
def toPandas(self):
return _col(self.to_dataframe().toPandas())

def isna(self):
    """Return a boolean Column that is True where this column's value is missing.

    A value counts as missing when it is SQL NULL, and additionally when it is
    NaN for floating-point columns (pandas treats NaN as missing, Spark does not).
    """
    # NaN only exists for float/double columns; everywhere else NULL is the
    # sole missing-value representation.
    column_type = self.schema[self.name].dataType
    null_mask = self.isNull()
    if isinstance(column_type, (FloatType, DoubleType)):
        return null_mask | F.isnan(self)
    return null_mask

# pandas-compatible alias
isnull = isna

def notna(self):
    """Return a boolean Column that is True where the value is NOT missing.

    Exact complement of :meth:`isna` (NULL, plus NaN for float/double columns).
    """
    missing = self.isna()
    return ~missing

@derived_from(pd.Series)
def dropna(self, axis=0, inplace=False, **kwargs):
col = _col(self.to_dataframe().dropna(axis=axis, inplace=False))
Expand All @@ -278,6 +289,28 @@ def unique(self):
# Pandas wants a series/array-like object
return _col(self.to_dataframe().unique())

@derived_from(pd.Series)
def value_counts(self, normalize=False, sort=True, ascending=False, bins=None, dropna=True):
    """Return a Series containing counts of unique values.

    The resulting object is in descending order of frequency by default, so
    the first element is the most frequently-occurring value.

    :param normalize: if True, return relative frequencies (counts divided by
        the number of rows considered) instead of absolute counts.
    :param sort: sort the result by count.
    :param ascending: sort ascending instead of descending (only meaningful
        when ``sort`` is True).
    :param bins: not supported; raises NotImplementedError when given.
    :param dropna: if True (default), exclude missing values (NULL/NaN) from
        the counts and from the normalization denominator.
    :return: Series of counts (or frequencies) indexed by the unique values.
    :raises NotImplementedError: if ``bins`` is not None.
    """
    if bins is not None:
        raise NotImplementedError("value_counts currently does not support bins")

    if dropna:
        df_dropna = self.to_dataframe()._spark_filter(self.notna())
    else:
        df_dropna = self.to_dataframe()
    df = df_dropna._spark_groupby(self).count()
    if sort:
        if ascending:
            df = df._spark_orderBy(F._spark_col('count'))
        else:
            df = df._spark_orderBy(F._spark_col('count')._spark_desc())

    if normalize:
        # Denominator matches pandas semantics: rows after the (optional)
        # dropna filter above.  ('total' rather than 'sum' — don't shadow
        # the builtin.)
        total = df_dropna._spark_count()
        df = df._spark_withColumn('count', F._spark_col('count') / F._spark_lit(total))

    return _col(df.set_index([self.name]))

@property
def _pandas_anchor(self) -> DataFrame:
"""
Expand Down Expand Up @@ -535,8 +568,7 @@ def dropna(self, axis=0, how='any', thresh=None, subset=None, inplace=False):
columns = list(self.columns)

cnt = reduce(lambda x, y: x + y,
[F._spark_when(F._spark_col(column)._spark_isNotNull(), 1)
._spark_otherwise(0)
[F._spark_when(self[column].notna(), 1)._spark_otherwise(0)
for column in columns],
F._spark_lit(0))
if thresh is not None:
Expand Down
18 changes: 18 additions & 0 deletions databricks/koala/tests/test_dataframe.py
Expand Up @@ -247,6 +247,24 @@ def test_dropna(self):
with self.assertRaisesRegex(NotImplementedError, msg):
ddf.dropna(axis='foo')

def test_value_counts(self):
    """value_counts matches pandas across option combinations; bins is rejected."""
    df = pd.DataFrame({'x': [1, 2, 1, 3, 3, np.nan, 1, 4]})
    ddf = self.spark.from_pandas(df)

    # Same option combinations as before, driven by a single loop.
    option_sets = [
        {},
        {'normalize': True},
        {'ascending': True},
        {'normalize': True, 'dropna': False},
        {'ascending': True, 'dropna': False},
    ]
    for kwargs in option_sets:
        self.assertPandasAlmostEqual(ddf.x.value_counts(**kwargs).toPandas(),
                                     df.x.value_counts(**kwargs))

    with self.assertRaisesRegex(NotImplementedError,
                                "value_counts currently does not support bins"):
        ddf.x.value_counts(bins=3)

def test_to_datetime(self):
df = pd.DataFrame({'year': [2015, 2016],
'month': [2, 3],
Expand Down