Added is_empty rule (#246)
minzastro committed Jun 18, 2024
1 parent e4c1242 commit b4eeb3e
Showing 17 changed files with 258 additions and 0 deletions.
12 changes: 12 additions & 0 deletions cuallee/__init__.py
@@ -353,6 +353,18 @@ def is_complete(self, column: str, pct: float = 1.0):
Rule("is_complete", column, "N/A", CheckDataType.AGNOSTIC, pct) >> self._rule
return self

def is_empty(self, column: str, pct: float = 1.0):
"""
Validation for null values in column
Args:
column (str): Column name in dataframe
pct (float): The threshold percentage required to pass
"""
Rule("is_empty", column, "N/A", CheckDataType.AGNOSTIC, pct) >> self._rule
return self

def are_complete(self, column: Union[List[str], Tuple[str, str]], pct: float = 1.0):
"""
Validation for non-null values in a group of columns
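
For orientation, here is a minimal usage sketch of the new rule through the public `Check` API, mirroring the pandas tests added below (the data and check name are illustrative):

```python
import pandas as pd
from cuallee import Check, CheckLevel

# is_empty passes when the column is entirely null (pct defaults to 1.0).
df = pd.DataFrame({"id": [None, None, None]})
check = Check(CheckLevel.WARNING, "demo").is_empty("id")
assert check.validate(df).status.str.match("PASS").all()
```
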
10 changes: 10 additions & 0 deletions cuallee/bigquery_validation.py
@@ -44,6 +44,16 @@ def is_complete(self, rule: Rule):
)
return self.compute_instruction

def is_empty(self, rule: Rule):
"""Verify the presence of null values in a column"""
predicate = f"{rule.column} IS NULL"
self.compute_instruction = ComputeInstruction(
predicate,
self._sum_predicate_to_integer(predicate),
ComputeMethod.SQL,
)
return self.compute_instruction

def are_complete(self, rule: Rule):
"""Verify the absence of null values in a column"""
predicate = [f"{c} IS NOT NULL" for c in rule.column]
5 changes: 5 additions & 0 deletions cuallee/daft_validation.py
@@ -23,6 +23,11 @@ def is_complete(self, rule: Rule, dataframe: daft.DataFrame) -> Union[bool, int]
predicate = col_name.not_null().cast(daft.DataType.int64())
return dataframe.select(predicate).sum(col_name).to_pandas().iloc[0, 0]

def is_empty(self, rule: Rule, dataframe: daft.DataFrame) -> Union[bool, int]:
col_name = daft.col(rule.column)
predicate = col_name.is_null().cast(daft.DataType.int64())
return dataframe.select(predicate).sum(col_name).to_pandas().iloc[0, 0]

def are_complete(self, rule: Rule, dataframe: daft.DataFrame) -> Union[bool, int]:
col_names = rule.column
predicate = [
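
A quick standalone check of the corrected daft predicate above — a sketch assuming daft's `is_null`/`cast` expression API as used in this file:

```python
import daft

# Null -> 1, non-null -> 0; summing yields the null count the rule reports.
df = daft.from_pydict({"id": [10, None, None]})
nulls = df.select(daft.col("id").is_null().cast(daft.DataType.int64())).sum("id")
print(nulls.to_pandas().iloc[0, 0])  # 2
```
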
4 changes: 4 additions & 0 deletions cuallee/duckdb_validation.py
@@ -19,6 +19,10 @@ def is_complete(self, rule: Rule) -> str:
"""Verify the absence of null values in a column"""
return f"SUM(CAST({rule.column} IS NOT NULL AS INTEGER))"

def is_empty(self, rule: Rule) -> str:
"""Verify the presence of null values in a column"""
return f"SUM(CAST({rule.column} IS NULL AS INTEGER))"

def are_complete(self, rule: Rule) -> str:
"""Verify the abscence of null values on groups of columns"""
return (
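
Because the duckdb compute only emits a SQL fragment, the rule effectively runs an aggregate like the one below (a sketch; it assumes duckdb's pandas replacement scan to resolve the local DataFrame `df`):

```python
import duckdb
import pandas as pd

df = pd.DataFrame({"id": [10, None]})
# The rule's fragment wrapped in a query: each NULL casts to 1, everything else to 0.
print(duckdb.sql("SELECT SUM(CAST(id IS NULL AS INTEGER)) FROM df").fetchone()[0])  # 1
```
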
3 changes: 3 additions & 0 deletions cuallee/pandas_validation.py
@@ -14,6 +14,9 @@ class Compute:
def is_complete(self, rule: Rule, dataframe: pd.DataFrame) -> Union[bool, int]:
return dataframe.loc[:, rule.column].notnull().sum()

def is_empty(self, rule: Rule, dataframe: pd.DataFrame) -> Union[bool, int]:
return dataframe.loc[:, rule.column].isnull().sum()

def are_complete(self, rule: Rule, dataframe: pd.DataFrame) -> Union[bool, int]:
return dataframe.loc[:, rule.column].notnull().astype(int).sum().sum() / len(
rule.column
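
The pandas compute is a plain null count; this equivalent sketch shows the number `is_empty` tallies before cuallee turns it into a pass rate:

```python
import pandas as pd

df = pd.DataFrame({"id": [10, None, None]})
# Two of three rows satisfy the predicate -> pass rate 2/3 against the pct threshold.
assert df.loc[:, "id"].isnull().sum() == 2
```
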
8 changes: 8 additions & 0 deletions cuallee/polars_validation.py
@@ -33,6 +33,14 @@ def is_complete(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
.to_series()
)

def is_empty(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
"""Validate null"""
return Compute._result(
dataframe.select(pl.col(rule.column).is_null().cast(pl.Int8))
.sum()
.to_series()
)

def are_complete(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
"""Validate absence of null in group of columns"""
return Compute._result(
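
The polars branch mirrors the pandas logic with an expression; a minimal sketch of the aggregation that `Compute._result` receives above:

```python
import polars as pl

df = pl.DataFrame({"id": [10, None, None]})
# is_null -> boolean, cast to Int8, summed: the null count behind the pass rate.
print(df.select(pl.col("id").is_null().cast(pl.Int8)).sum().to_series()[0])  # 2
```
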
10 changes: 10 additions & 0 deletions cuallee/pyspark_validation.py
@@ -59,6 +59,16 @@ def is_complete(self, rule: Rule):
)
return self.compute_instruction

def is_empty(self, rule: Rule):
"""Validation for null values in column"""
predicate = F.col(f"`{rule.column}`").isNull().cast("integer")
self.compute_instruction = ComputeInstruction(
predicate,
F.sum(predicate),
ComputeMethod.OBSERVE,
)
return self.compute_instruction

def are_complete(self, rule: Rule):
"""Validation for non-null values in a group of columns"""
predicate = (
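
For PySpark, the predicate is an integer-cast null flag that the OBSERVE machinery sums; the same expression can be sanity-checked directly (a sketch assuming a local SparkSession):

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([[10], [None], [None]], schema="id int")
# Identical predicate to the rule: backtick-quoted column, isNull, cast to integer, summed.
df.select(F.sum(F.col("`id`").isNull().cast("integer")).alias("nulls")).show()  # nulls == 2
```
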
10 changes: 10 additions & 0 deletions cuallee/snowpark_validation.py
@@ -74,6 +74,16 @@ def is_complete(self, rule: Rule):
)
return self.compute_instruction

def is_empty(self, rule: Rule):
"""Validation for null values in column"""
predicate = F.col(rule.column).isNull()
self.compute_instruction = ComputeInstruction(
predicate,
self._sum_predicate_to_integer(predicate),
ComputeMethod.SELECT,
)
return self.compute_instruction

def are_complete(self, rule: Rule):
"""Validation for non-null values in a group of columns"""
predicate = [F.col(c).isNotNull() for c in rule.column]
1 change: 1 addition & 0 deletions docs/catalogue.md
@@ -7,6 +7,7 @@ The following table contains the list of all available checks in `cuallee`:
Check | Description | DataType
------- | ----------- | ----
`is_complete` | Zero `nulls` | _agnostic_
`is_empty` | All `nulls` | _agnostic_
`is_unique` | Zero `duplicates` | _agnostic_
`is_primary_key` | Zero `duplicates` | _agnostic_
`are_complete` | Zero `nulls` on group of columns | _agnostic_
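
A short illustration of the new catalogue entry — `is_empty` is the mirror image of `is_complete` (a sketch with the pandas engine; column names are illustrative):

```python
import pandas as pd
from cuallee import Check, CheckLevel

df = pd.DataFrame({"filled": [1, 2], "blank": [None, None]})
# Zero nulls -> is_complete passes; all nulls -> is_empty passes.
assert Check(CheckLevel.WARNING, "docs").is_complete("filled").validate(df).status.str.match("PASS").all()
assert Check(CheckLevel.WARNING, "docs").is_empty("blank").validate(df).status.str.match("PASS").all()
```
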
27 changes: 27 additions & 0 deletions test/unit/daft/test_is_empty.py
@@ -0,0 +1,27 @@
import daft
from cuallee import Check


def test_positive(check: Check):
check.is_complete("id")
df = daft.from_pydict({"id": [None, None], "id2": [None, None]})
result = check.validate(df)
assert result.select(daft.col("status").str.match("PASS")).to_pandas().status.all()


def test_negative(check: Check):
check.is_empty("id")
df = daft.from_pydict({"id": [10, None], "id2": [300, 500]})
result = check.validate(df)
assert result.select(daft.col("status").str.match("FAIL")).to_pandas().status.all()


def test_coverage(check: Check):
check.is_empty("id", 0.5)
df = daft.from_pydict({"id": [10, None], "id2": [300, 500]})
result = check.validate(df)
assert result.select(daft.col("status").str.match("PASS")).to_pandas().status.all()
col_pass_rate = daft.col("pass_rate")
assert (
result.agg(col_pass_rate.max()).select(col_pass_rate == 0.50).to_pandas().pass_rate.all()
)
27 changes: 27 additions & 0 deletions test/unit/duckdb_dataframe/test_is_empty.py
@@ -0,0 +1,27 @@
import pandas as pd
from cuallee import Check
import pytest
import duckdb


def test_positive(check: Check, db: duckdb.DuckDBPyConnection):
check.is_complete("id")
df = pd.DataFrame({"id": [None, None], "id2": [None, None]})
check.table_name = "df"
assert check.validate(db).status.str.match("PASS").all()


def test_negative(check: Check, db: duckdb.DuckDBPyConnection):
check.is_complete("id")
df = pd.DataFrame({"id": [10, None], "id2": [300, 500]})
check.table_name = "df"
assert check.validate(db).status.str.match("FAIL").all()


def test_coverage(check: Check, db: duckdb.DuckDBPyConnection):
check.is_complete("id", 0.5)
df = pd.DataFrame({"id": [10, None], "id2": [300, 500]})
check.table_name = "df"
result = check.validate(db)
assert result.status.str.match("PASS").all()
assert result.pass_rate.max() == 0.5
22 changes: 22 additions & 0 deletions test/unit/pandas_dataframe/test_is_empty.py
@@ -0,0 +1,22 @@
import pandas as pd
from cuallee import Check
import pytest


def test_positive(check: Check):
check.is_empty("id")
df = pd.DataFrame({"id": [None, None], "id2": [None, None]})
assert check.validate(df).status.str.match("PASS").all()


def test_negative(check: Check):
check.is_empty("id")
df = pd.DataFrame({"id": [10, None], "id2": [300, 500]})
assert check.validate(df).status.str.match("FAIL").all()


def test_coverage(check: Check):
check.is_empty("id2", 0.5)
df = pd.DataFrame({"id": [10, None], "id2": [300, 500]})
assert result.status.str.match("PASS").all()
assert result.pass_rate.max() == 0.5
25 changes: 25 additions & 0 deletions test/unit/polars_dataframe/test_is_empty.py
@@ -0,0 +1,25 @@
import polars as pl
from cuallee import Check
import pytest


def test_positive(check: Check):
check.is_empty("id")
df = pl.DataFrame({"id": [None, None], "id2": [None, None]})
result = check.validate(df).select(pl.col("status")) == "PASS"
assert all(result.to_series().to_list())


def test_negative(check: Check):
check.is_empty("id")
df = pl.DataFrame({"id": [10, None], "id2": [300, 500]})
result = check.validate(df).select(pl.col("status")) == "FAIL"
assert all(result.to_series().to_list())


def test_coverage(check: Check):
check.is_empty("id2", 0.5)
df = pl.DataFrame({"id": [10, None], "id2": [300, 500]})

result = check.validate(df).select(pl.col("status")) == "PASS"
assert all(result.to_series().to_list())
46 changes: 46 additions & 0 deletions test/unit/pyspark_dataframe/test_is_empty.py
@@ -0,0 +1,46 @@
import pytest

from cuallee import Check, CheckLevel


def test_positive(spark):
df = spark.createDataFrame([[None], [None], [None], [None], [None]], schema="id string")
check = Check(CheckLevel.WARNING, "pytest")
check.is_empty("id")
rs = check.validate(df)
assert rs.first().status == "PASS"
assert rs.first().violations == 0
assert rs.first().pass_threshold == 1.0


@pytest.mark.parametrize(
"data, violation, pass_rate",
[
[[[0], [1], [None], [4], [5]], 4, 1 / 5],
[[[0], [1], [None], [4], [None]], 3, 2 / 5],
],
ids=("one_null_value", "two_null_value"),
)
def test_negative(spark, data, violation, pass_rate):
df = spark.createDataFrame(data, ["id"])
check = Check(CheckLevel.WARNING, "pytest")
check.is_empty("id")
rs = check.validate(df)
assert rs.first().status == "FAIL"
assert rs.first().violations == violation
assert rs.first().pass_threshold == 1.0
assert rs.first().pass_rate >= pass_rate


def test_parameters():
return "😅 No parameters to be tested!"


def test_coverage(spark):
df = spark.createDataFrame([[0], [1], [None], [4], [5]], ["id"])
check = Check(CheckLevel.WARNING, "pytest")
check.is_empty("id", 0.1)
rs = check.validate(df)
assert rs.first().status == "PASS"
assert rs.first().pass_threshold == 0.1
assert rs.first().pass_rate >= 1 / 5
1 change: 1 addition & 0 deletions test/unit/pyspark_dataframe/test_spark_validation.py
@@ -34,6 +34,7 @@ def test_compute():
assert hasattr(compute, "compute_instruction")

assert hasattr(compute, "is_complete")
assert hasattr(compute, "is_empty")
assert hasattr(compute, "are_complete")
assert hasattr(compute, "is_unique")
assert hasattr(compute, "are_unique")
46 changes: 46 additions & 0 deletions test/unit/snowpark_dataframe/test_is_empty.py
@@ -0,0 +1,46 @@
import pytest

from cuallee import Check, CheckLevel


def test_positive(snowpark):
df = snowpark.createDataFrame([[None], [None]], ["ID"])
check = Check(CheckLevel.WARNING, "pytest")
check.is_empty("ID")
rs = check.validate(df)
assert rs.first().STATUS == "PASS"
assert rs.first().VIOLATIONS == 0
assert rs.first().PASS_THRESHOLD == 1.0


@pytest.mark.parametrize(
"data, violation, pass_rate",
[
[[[0], [1], [None], [4], [5]], 4, 1 / 5],
[[[0], [1], [None], [4], [None]], 3, 2 / 5],
],
ids=("one_null_value", "two_null_value"),
)
def test_negative(snowpark, data, violation, pass_rate):
df = snowpark.createDataFrame(data, ["ID"])
check = Check(CheckLevel.WARNING, "pytest")
check.is_empty("ID")
rs = check.validate(df)
assert rs.first().STATUS == "FAIL"
assert rs.first().VIOLATIONS == violation
assert rs.first().PASS_THRESHOLD == 1.0
assert rs.first().PASS_RATE == pass_rate


def test_parameters():
return "😅 No parameters to be tested!"


def test_coverage(snowpark):
df = snowpark.createDataFrame([[0], [1], [None], [4], [5]], ["ID"])
check = Check(CheckLevel.WARNING, "pytest")
check.is_empty("ID", 0.1)
rs = check.validate(df)
assert rs.first().STATUS == "PASS"
assert rs.first().PASS_THRESHOLD == 0.1
assert rs.first().PASS_RATE == 1 / 5
1 change: 1 addition & 0 deletions test/unit/snowpark_dataframe/test_snowpark_validation.py
@@ -35,6 +35,7 @@ def test_compute():
assert hasattr(compute, "_single_value_rule")
assert hasattr(compute, "_stats_fn_rule")
assert hasattr(compute, "is_complete")
assert hasattr(compute, "is_empty")
assert hasattr(compute, "are_complete")
assert hasattr(compute, "is_unique")
assert hasattr(compute, "are_unique")
