diff --git a/cuallee/__init__.py b/cuallee/__init__.py
index ccfc1bd8..24fed90a 100644
--- a/cuallee/__init__.py
+++ b/cuallee/__init__.py
@@ -353,6 +353,18 @@ def is_complete(self, column: str, pct: float = 1.0):
         Rule("is_complete", column, "N/A", CheckDataType.AGNOSTIC, pct) >> self._rule
         return self
 
+    def is_empty(self, column: str, pct: float = 1.0):
+        """
+        Validation for null values in column
+
+        Args:
+            column (str): Column name in dataframe
+            pct (float): The threshold percentage required to pass
+
+        """
+        Rule("is_empty", column, "N/A", CheckDataType.AGNOSTIC, pct) >> self._rule
+        return self
+
     def are_complete(self, column: Union[List[str], Tuple[str, str]], pct: float = 1.0):
         """
         Validation for non-null values in a group of columns
diff --git a/cuallee/bigquery_validation.py b/cuallee/bigquery_validation.py
index 8e7cf383..c2cef63d 100644
--- a/cuallee/bigquery_validation.py
+++ b/cuallee/bigquery_validation.py
@@ -44,6 +44,16 @@ def is_complete(self, rule: Rule):
         )
         return self.compute_instruction
 
+    def is_empty(self, rule: Rule):
+        """Verify the presence of null values in a column"""
+        predicate = f"{rule.column} IS NULL"
+        self.compute_instruction = ComputeInstruction(
+            predicate,
+            self._sum_predicate_to_integer(predicate),
+            ComputeMethod.SQL,
+        )
+        return self.compute_instruction
+
     def are_complete(self, rule: Rule):
         """Verify the absence of null values in a column"""
         predicate = [f"{c} IS NOT NULL" for c in rule.column]
diff --git a/cuallee/daft_validation.py b/cuallee/daft_validation.py
index abf2713d..57f26583 100644
--- a/cuallee/daft_validation.py
+++ b/cuallee/daft_validation.py
@@ -23,6 +23,11 @@ def is_complete(self, rule: Rule, dataframe: daft.DataFrame) -> Union[bool, int]:
         perdicate =col_name.not_null().cast(daft.DataType.int64())
         return dataframe.select(perdicate).sum(col_name).to_pandas().iloc[0, 0]
 
+    def is_empty(self, rule: Rule, dataframe: daft.DataFrame) -> Union[bool, int]:
+        col_name = daft.col(rule.column)
+        predicate = col_name.is_null().cast(daft.DataType.int64())
+        return dataframe.select(predicate).sum(col_name).to_pandas().iloc[0, 0]
+
     def are_complete(self, rule: Rule, dataframe: daft.DataFrame) -> Union[bool, int]:
         col_names = rule.column
         perdicate = [
diff --git a/cuallee/duckdb_validation.py b/cuallee/duckdb_validation.py
index 61632f49..c9369a2f 100644
--- a/cuallee/duckdb_validation.py
+++ b/cuallee/duckdb_validation.py
@@ -19,6 +19,10 @@ def is_complete(self, rule: Rule) -> str:
         """Verify the absence of null values in a column"""
         return f"SUM(CAST({rule.column} IS NOT NULL AS INTEGER))"
 
+    def is_empty(self, rule: Rule) -> str:
+        """Verify the presence of null values in a column"""
+        return f"SUM(CAST({rule.column} IS NULL AS INTEGER))"
+
     def are_complete(self, rule: Rule) -> str:
         """Verify the abscence of null values on groups of columns"""
         return (
diff --git a/cuallee/pandas_validation.py b/cuallee/pandas_validation.py
index 4ad8beef..8a6be21a 100644
--- a/cuallee/pandas_validation.py
+++ b/cuallee/pandas_validation.py
@@ -14,6 +14,9 @@ class Compute:
     def is_complete(self, rule: Rule, dataframe: pd.DataFrame) -> Union[bool, int]:
         return dataframe.loc[:, rule.column].notnull().sum()
 
+    def is_empty(self, rule: Rule, dataframe: pd.DataFrame) -> Union[bool, int]:
+        return dataframe.loc[:, rule.column].isnull().sum()
+
     def are_complete(self, rule: Rule, dataframe: pd.DataFrame) -> Union[bool, int]:
         return dataframe.loc[:, rule.column].notnull().astype(int).sum().sum() / len(
             rule.column
diff --git a/cuallee/polars_validation.py b/cuallee/polars_validation.py
index d0ede333..f8cb0ff2 100644
--- a/cuallee/polars_validation.py
+++ b/cuallee/polars_validation.py
@@ -33,6 +33,14 @@ def is_complete(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
             .to_series()
         )
 
+    def is_empty(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
+        """Validate presence of null in column"""
+        return Compute._result(
+            dataframe.select(pl.col(rule.column).is_null().cast(pl.Int8))
+            .sum()
+            .to_series()
+        )
+
     def are_complete(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
         """Validate absence of null in group of columns"""
         return Compute._result(
diff --git a/cuallee/pyspark_validation.py b/cuallee/pyspark_validation.py
index 9d491515..52356a83 100644
--- a/cuallee/pyspark_validation.py
+++ b/cuallee/pyspark_validation.py
@@ -59,6 +59,16 @@ def is_complete(self, rule: Rule):
         )
         return self.compute_instruction
 
+    def is_empty(self, rule: Rule):
+        """Validation for null values in column"""
+        predicate = F.col(f"`{rule.column}`").isNull().cast("integer")
+        self.compute_instruction = ComputeInstruction(
+            predicate,
+            F.sum(predicate),
+            ComputeMethod.OBSERVE,
+        )
+        return self.compute_instruction
+
     def are_complete(self, rule: Rule):
         """Validation for non-null values in a group of columns"""
         predicate = (
diff --git a/cuallee/snowpark_validation.py b/cuallee/snowpark_validation.py
index 50ef94c2..1f0c116c 100644
--- a/cuallee/snowpark_validation.py
+++ b/cuallee/snowpark_validation.py
@@ -74,6 +74,16 @@ def is_complete(self, rule: Rule):
         )
         return self.compute_instruction
 
+    def is_empty(self, rule: Rule):
+        """Validation for null values in column"""
+        predicate = F.col(rule.column).isNull()
+        self.compute_instruction = ComputeInstruction(
+            predicate,
+            self._sum_predicate_to_integer(predicate),
+            ComputeMethod.SELECT,
+        )
+        return self.compute_instruction
+
     def are_complete(self, rule: Rule):
         """Validation for non-null values in a group of columns"""
         predicate = [F.col(c).isNotNull() for c in rule.column]
diff --git a/docs/catalogue.md b/docs/catalogue.md
index 4c1a4417..46692509 100644
--- a/docs/catalogue.md
+++ b/docs/catalogue.md
@@ -7,6 +7,7 @@ The following table contains the list of all available checks in `cuallee`:
 Check | Description | DataType
 ------- | ----------- | ----
 `is_complete` | Zero `nulls` | _agnostic_
+`is_empty` | All `nulls` | _agnostic_
 `is_unique` | Zero `duplicates` | _agnostic_
 `is_primary_key` | Zero `duplicates` | _agnostic_
 `are_complete` | Zero `nulls` on group of columns | _agnostic_
diff --git a/test/unit/daft/test_is_empty.py b/test/unit/daft/test_is_empty.py
new file mode 100644
index 00000000..322ae313
--- /dev/null
+++ b/test/unit/daft/test_is_empty.py
@@ -0,0 +1,27 @@
+import daft
+from cuallee import Check
+
+
+def test_positive(check: Check):
+    check.is_empty("id")
+    df = daft.from_pydict({"id": [None, None], "id2": [None, None]})
+    result = check.validate(df)
+    assert result.select(daft.col("status").str.match("PASS")).to_pandas().status.all()
+
+
+def test_negative(check: Check):
+    check.is_empty("id")
+    df = daft.from_pydict({"id": [10, None], "id2": [300, 500]})
+    result = check.validate(df)
+    assert result.select(daft.col("status").str.match("FAIL")).to_pandas().status.all()
+
+
+def test_coverage(check: Check):
+    check.is_empty("id", 0.5)
+    df = daft.from_pydict({"id": [10, None], "id2": [300, 500]})
+    result = check.validate(df)
+    assert result.select(daft.col("status").str.match("PASS")).to_pandas().status.all()
+    col_pass_rate = daft.col("pass_rate")
+    assert (
+        result.agg(col_pass_rate.max()).select(col_pass_rate == 0.50).to_pandas().pass_rate.all()
+    )
diff --git a/test/unit/duckdb_dataframe/test_is_empty.py b/test/unit/duckdb_dataframe/test_is_empty.py
new file mode 100644
index 00000000..5ed86483
--- /dev/null
+++ b/test/unit/duckdb_dataframe/test_is_empty.py
@@ -0,0 +1,27 @@
+import pandas as pd
+from cuallee import Check
+import pytest
+import duckdb
+
+
+def test_positive(check: Check, db: duckdb.DuckDBPyConnection):
+    check.is_empty("id")
+    df = pd.DataFrame({"id": [None, None], "id2": [None, None]})
+    check.table_name = "df"
+    assert check.validate(db).status.str.match("PASS").all()
+
+
+def test_negative(check: Check, db: duckdb.DuckDBPyConnection):
+    check.is_empty("id")
+    df = pd.DataFrame({"id": [10, None], "id2": [300, 500]})
+    check.table_name = "df"
+    assert check.validate(db).status.str.match("FAIL").all()
+
+
+def test_coverage(check: Check, db: duckdb.DuckDBPyConnection):
+    check.is_empty("id", 0.5)
+    df = pd.DataFrame({"id": [10, None], "id2": [300, 500]})
+    check.table_name = "df"
+    result = check.validate(db)
+    assert result.status.str.match("PASS").all()
+    assert result.pass_rate.max() == 0.5
diff --git a/test/unit/pandas_dataframe/test_is_empty.py b/test/unit/pandas_dataframe/test_is_empty.py
new file mode 100644
index 00000000..acd0d8c6
--- /dev/null
+++ b/test/unit/pandas_dataframe/test_is_empty.py
@@ -0,0 +1,23 @@
+import pandas as pd
+from cuallee import Check
+import pytest
+
+
+def test_positive(check: Check):
+    check.is_empty("id")
+    df = pd.DataFrame({"id": [None, None], "id2": [None, None]})
+    assert check.validate(df).status.str.match("PASS").all()
+
+
+def test_negative(check: Check):
+    check.is_empty("id")
+    df = pd.DataFrame({"id": [10, None], "id2": [300, 500]})
+    assert check.validate(df).status.str.match("FAIL").all()
+
+
+def test_coverage(check: Check):
+    check.is_empty("id", 0.5)
+    df = pd.DataFrame({"id": [10, None], "id2": [300, 500]})
+    result = check.validate(df)
+    assert result.status.str.match("PASS").all()
+    assert result.pass_rate.max() == 0.5
diff --git a/test/unit/polars_dataframe/test_is_empty.py b/test/unit/polars_dataframe/test_is_empty.py
new file mode 100644
index 00000000..344eed32
--- /dev/null
+++ b/test/unit/polars_dataframe/test_is_empty.py
@@ -0,0 +1,25 @@
+import polars as pl
+from cuallee import Check
+import pytest
+
+
+def test_positive(check: Check):
+    check.is_empty("id")
+    df = pl.DataFrame({"id": [None, None], "id2": [None, None]})
+    result = check.validate(df).select(pl.col("status")) == "PASS"
+    assert all(result.to_series().to_list())
+
+
+def test_negative(check: Check):
+    check.is_empty("id")
+    df = pl.DataFrame({"id": [10, None], "id2": [300, 500]})
+    result = check.validate(df).select(pl.col("status")) == "FAIL"
+    assert all(result.to_series().to_list())
+
+
+def test_coverage(check: Check):
+    check.is_empty("id", 0.5)
+    df = pl.DataFrame({"id": [10, None], "id2": [300, 500]})
+
+    result = check.validate(df).select(pl.col("status")) == "PASS"
+    assert all(result.to_series().to_list())
diff --git a/test/unit/pyspark_dataframe/test_is_empty.py b/test/unit/pyspark_dataframe/test_is_empty.py
new file mode 100644
index 00000000..a11dad37
--- /dev/null
+++ b/test/unit/pyspark_dataframe/test_is_empty.py
@@ -0,0 +1,46 @@
+import pytest
+
+from cuallee import Check, CheckLevel
+
+
+def test_positive(spark):
+    df = spark.createDataFrame([[None], [None], [None], [None], [None]], ["id"])
+    check = Check(CheckLevel.WARNING, "pytest")
+    check.is_empty("id")
+    rs = check.validate(df)
+    assert rs.first().status == "PASS"
+    assert rs.first().violations == 0
+    assert rs.first().pass_threshold == 1.0
+
+
+@pytest.mark.parametrize(
+    "data, violation, pass_rate",
+    [
+        [[[0], [1], [None], [4], [5]], 4, 1 / 5],
+        [[[0], [1], [None], [4], [None]], 3, 2 / 5],
+    ],
+    ids=("one_null_value", "two_null_values"),
+)
+def test_negative(spark, data, violation, pass_rate):
+    df = spark.createDataFrame(data, ["id"])
+    check = Check(CheckLevel.WARNING, "pytest")
+    check.is_empty("id")
+    rs = check.validate(df)
+    assert rs.first().status == "FAIL"
+    assert rs.first().violations == violation
+    assert rs.first().pass_threshold == 1.0
+    assert rs.first().pass_rate >= pass_rate
+
+
+def test_parameters():
+    return "😅 No parameters to be tested!"
+
+
+def test_coverage(spark):
+    df = spark.createDataFrame([[0], [1], [None], [4], [5]], ["id"])
+    check = Check(CheckLevel.WARNING, "pytest")
+    check.is_empty("id", 0.1)
+    rs = check.validate(df)
+    assert rs.first().status == "PASS"
+    assert rs.first().pass_threshold == 0.1
+    assert rs.first().pass_rate >= 1 / 5
diff --git a/test/unit/pyspark_dataframe/test_spark_validation.py b/test/unit/pyspark_dataframe/test_spark_validation.py
index 314bb607..b8203156 100644
--- a/test/unit/pyspark_dataframe/test_spark_validation.py
+++ b/test/unit/pyspark_dataframe/test_spark_validation.py
@@ -34,6 +34,7 @@ def test_compute():
 
     assert hasattr(compute, "compute_instruction")
     assert hasattr(compute, "is_complete")
+    assert hasattr(compute, "is_empty")
     assert hasattr(compute, "are_complete")
     assert hasattr(compute, "is_unique")
     assert hasattr(compute, "are_unique")
diff --git a/test/unit/snowpark_dataframe/test_is_empty.py b/test/unit/snowpark_dataframe/test_is_empty.py
new file mode 100644
index 00000000..46e292ee
--- /dev/null
+++ b/test/unit/snowpark_dataframe/test_is_empty.py
@@ -0,0 +1,46 @@
+import pytest
+
+from cuallee import Check, CheckLevel
+
+
+def test_positive(snowpark):
+    df = snowpark.createDataFrame([[None], [None]], ["ID"])
+    check = Check(CheckLevel.WARNING, "pytest")
+    check.is_empty("ID")
+    rs = check.validate(df)
+    assert rs.first().STATUS == "PASS"
+    assert rs.first().VIOLATIONS == 0
+    assert rs.first().PASS_THRESHOLD == 1.0
+
+
+@pytest.mark.parametrize(
+    "data, violation, pass_rate",
+    [
+        [[[0], [1], [None], [4], [5]], 4, 1 / 5],
+        [[[0], [1], [None], [4], [None]], 3, 2 / 5],
+    ],
+    ids=("one_null_value", "two_null_values"),
+)
+def test_negative(snowpark, data, violation, pass_rate):
+    df = snowpark.createDataFrame(data, ["ID"])
+    check = Check(CheckLevel.WARNING, "pytest")
+    check.is_empty("ID")
+    rs = check.validate(df)
+    assert rs.first().STATUS == "FAIL"
+    assert rs.first().VIOLATIONS == violation
+    assert rs.first().PASS_THRESHOLD == 1.0
+    assert rs.first().PASS_RATE == pass_rate
+
+
+def test_parameters():
+    return "😅 No parameters to be tested!"
+
+
+def test_coverage(snowpark):
+    df = snowpark.createDataFrame([[0], [1], [None], [4], [5]], ["ID"])
+    check = Check(CheckLevel.WARNING, "pytest")
+    check.is_empty("ID", 0.1)
+    rs = check.validate(df)
+    assert rs.first().STATUS == "PASS"
+    assert rs.first().PASS_THRESHOLD == 0.1
+    assert rs.first().PASS_RATE == 1 / 5
diff --git a/test/unit/snowpark_dataframe/test_snowpark_validation.py b/test/unit/snowpark_dataframe/test_snowpark_validation.py
index 56d80acd..7c5ef57f 100644
--- a/test/unit/snowpark_dataframe/test_snowpark_validation.py
+++ b/test/unit/snowpark_dataframe/test_snowpark_validation.py
@@ -35,6 +35,7 @@ def test_compute():
     assert hasattr(compute, "_single_value_rule")
     assert hasattr(compute, "_stats_fn_rule")
    assert hasattr(compute, "is_complete")
+    assert hasattr(compute, "is_empty")
     assert hasattr(compute, "are_complete")
     assert hasattr(compute, "is_unique")
     assert hasattr(compute, "are_unique")
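For reviewers, a minimal usage sketch of the new check (not part of the diff). It assumes the patch above is applied and uses the pandas backend; the demo dataframe, the check name "is_empty_demo", and the printed column selection are illustrative, while the Check/CheckLevel API and the status/pass_rate output columns follow the tests above.

    # Usage sketch, assuming cuallee with this patch applied (pandas backend).
    import pandas as pd
    from cuallee import Check, CheckLevel

    df = pd.DataFrame({"id": [10, None], "id2": [None, None]})

    check = Check(CheckLevel.WARNING, "is_empty_demo")
    check.is_empty("id2")      # every value in id2 is null -> PASS at the default 100% threshold
    check.is_empty("id", 0.5)  # half of id is null -> PASS at the 50% threshold

    result = check.validate(df)  # one row per rule, with pass_rate and status columns
    print(result[["rule", "column", "pass_rate", "status"]])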