Skip to content

Commit

Permalink
feat: Dask backend for NotNull stmt
Browse files: browse the repository at this point in the history
  • Loading branch information
bressanmarcos committed Jul 19, 2022
1 parent a320201 commit b3fb691
Showing 1 changed file with 31 additions and 14 deletions.
45 changes: 31 additions & 14 deletions deirokay/statements/builtin/not_null.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Statement to check the number of not-null rows in a scope.
"""
import dask.dataframe # lazy module
import pandas # lazy module

from deirokay._typing import DeirokayStatement
Expand Down Expand Up @@ -95,7 +96,7 @@ class NotNull(BaseStatement):

name = 'not_null'
expected_parameters = ['at_least_%', 'at_most_%', 'multicolumn_logic']
supported_backends = [Backend.PANDAS]
supported_backends = [Backend.PANDAS, Backend.DASK]

def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
Expand All @@ -106,21 +107,29 @@ def __init__(self, *args, **kwargs) -> None:

assert self.multicolumn_logic in ('any', 'all')

@report(Backend.PANDAS)
def _report_pandas(self, df: 'pandas.DataFrame') -> dict:
def _report_common(self, df):
if self.multicolumn_logic == 'all':
# REMINDER: ~all == any
not_nulls = ~df.isnull().any(axis=1)
else:
not_nulls = ~df.isnull().all(axis=1)

report = {
'null_rows': int((~not_nulls).sum()),
'null_rows_%': float(100.0*(~not_nulls).sum()/len(not_nulls)),
'not_null_rows': int(not_nulls.sum()),
'not_null_rows_%': float(100.0*not_nulls.sum()/len(not_nulls)),
null_rows = int(sum(~not_nulls))
not_null_rows = int(sum(not_nulls))
return {
'null_rows': null_rows,
'null_rows_%': 100.0*null_rows/len(df),
'not_null_rows': not_null_rows,
'not_null_rows_%': 100.0*not_null_rows/len(df),
}
return report

@report(Backend.PANDAS)
def _report_pandas(self, df: 'pandas.DataFrame') -> dict:
    """Build the report for a pandas frame via `_report_common`."""
    pandas_report = self._report_common(df)
    return pandas_report

@report(Backend.DASK)
def _report_dask(self, df: 'dask.dataframe.DataFrame') -> dict:
    """Build the report for a Dask frame via `_report_common`."""
    dask_report = self._report_common(df)
    return dask_report

# docstr-coverage:inherited
def result(self, report: dict) -> bool:
Expand All @@ -130,16 +139,14 @@ def result(self, report: dict) -> bool:
return False
return True

@profile(Backend.PANDAS)
@staticmethod
def _profile_pandas(df: 'pandas.DataFrame') -> DeirokayStatement:
not_nulls = ~df.isnull().all(axis=1)

def _profile_common(df):
statement = {
'type': 'not_null'
} # type: DeirokayStatement

at_least_perc = float(100.0*not_nulls.sum()/len(not_nulls))
not_nulls = ~df.isnull().all(axis=1)
at_least_perc = float(100.0*sum(not_nulls)/len(not_nulls))

if at_least_perc == 0.0:
raise NotImplementedError(
Expand All @@ -150,3 +157,13 @@ def _profile_pandas(df: 'pandas.DataFrame') -> DeirokayStatement:
statement['at_least_%'] = at_least_perc

return statement

@profile(Backend.PANDAS)
@staticmethod
def _profile_pandas(df: 'pandas.DataFrame') -> DeirokayStatement:
    """Profile a pandas frame by delegating to `_profile_common`."""
    statement = NotNull._profile_common(df)
    return statement

@profile(Backend.DASK)
@staticmethod
def _profile_dask(df: 'dask.dataframe.DataFrame') -> DeirokayStatement:
    """Profile a Dask frame by delegating to `_profile_common`."""
    statement = NotNull._profile_common(df)
    return statement

0 comments on commit b3fb691

Please sign in to comment.