diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 786f06ef..73a507e7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,5 +49,5 @@ jobs: - uses: actions/setup-python@v4 with: python-version: 3.x - - run: pip install mkdocs-material + - run: pip install mkdocs-material mkdocstrings[python] - run: mkdocs gh-deploy --force diff --git a/cuallee/__init__.py b/cuallee/__init__.py index 27051478..597430b2 100644 --- a/cuallee/__init__.py +++ b/cuallee/__init__.py @@ -12,7 +12,7 @@ from toolz.curried import map as map_curried logger = logging.getLogger("cuallee") -__version__ = "0.10.1" +__version__ = "0.10.2" # Verify Libraries Available # ========================== try: @@ -57,11 +57,13 @@ class CheckLevel(enum.Enum): + """Level of verifications in cuallee""" WARNING = 0 ERROR = 1 class CheckDataType(enum.Enum): + """Accepted data types in checks""" AGNOSTIC = 0 NUMERIC = 1 STRING = 2 @@ -71,6 +73,7 @@ class CheckDataType(enum.Enum): class CheckStatus(enum.Enum): + """Validation result criteria""" PASS = "PASS" FAIL = "FAIL" NO_RUN = "NO_RUN" @@ -78,6 +81,7 @@ class CheckStatus(enum.Enum): @dataclass class Rule: + """Predicate definition holder""" method: str column: Union[str, List[str], Tuple[str, str]] value: Optional[Any] @@ -91,10 +95,12 @@ class Rule: @property def settings(self) -> dict: + """holds the additional settings for the predicate execution""" return dict(self.options) @property def key(self): + """blake2s hash of the rule, made of method, column, value, options and coverage""" return ( hashlib.blake2s( bytes( @@ -182,6 +188,7 @@ def evaluate(self, result: Any, rows: int): class ComputeEngine(Protocol): + """An interface for validatiosn to adhere to""" def compute(self, rules: Dict[str, Rule]) -> bool: """Returns compute instructions for each rule""" @@ -202,7 +209,17 @@ def __init__( table_name: str = None, session: Any = None, ): - """A container of data quality rules.""" + """ + A container of data quality rules. + + Args: + level (CheckLevel): [0-1] value to describe if its a WARNING or ERROR check + name (str): Normally the name of the dataset being verified, or a name for this check + execution_date (date): An automatically generated timestamp of the check in UTC + table_name (str): When using databases matches the table name of the source + session (Session): When operating in Session enabled environments like Databricks or Snowflake + + """ self._rule: Dict[str, Rule] = {} # TODO: Should be a compute engine protocol self.compute_engine: ModuleType @@ -233,7 +250,7 @@ def __repr__(self): @property def sum(self): - """Collect compute, unique and union type of rules""" + """Total number of rules in Check""" return len(self._rule.keys()) @property @@ -252,16 +269,32 @@ def empty(self): return len(self.rules) == 0 def _remove_rule_generic(self, key: str): - """Remove a key from rules and compute dictionaries""" + """ + Remove a key from rules and compute dictionaries + + Args: + key (str): the blake2s key of the rule + """ if key in self._rule: self._rule.pop(key) def add_rule(self, method: str, *arg): - """Add a new rule to the Check class.""" + """ + Add a new rule to the Check class. + + Args: + method (str): Check name + args (list): Parameters of the check + """ return operator.methodcaller(method, *arg)(self) def delete_rule_by_key(self, keys: Union[str, List[str]]): - """Delete rules from self._rule and self._compute dictionnary based on keys.""" + """ + Delete rules from check based on keys. 
+ + Args: + keys (List[str]): a single or list of keys to remove from the check + """ if isinstance(keys, str): keys = [keys] @@ -273,7 +306,13 @@ def delete_rule_by_attribute( rule_attribute: Literal["method", "column", "coverage"], values: Union[List[str], List[float]], ): - """Delete rule based on method(s) or column name(s) or coverage value(s).""" + """ + Delete rule based on method(s) or column name(s) or coverage value(s). + + Args: + rule_attribute (str): Finds a rule with by: method, column or coverage + values (List[str]): Deletes a rule that matches the rule_attribute equal to the value in this parameter + """ if not isinstance(values, List): values = [values] @@ -286,7 +325,15 @@ def delete_rule_by_attribute( return self def adjust_rule_coverage(self, rule_index: int, rule_coverage: float): - """Targeted for adjusting the predicate/rows ratio or making rules less strict""" + """ + Adjust the ratio predicate/rows for a rule. + It is intended to lower or increase tolerance without having to rewrite the entire check + + Args: + rule_index (int): The position of the rule in the check list + rule_coverage (float): New value between [0..1] for tolerance + + """ target_rule = self.rules[rule_index] old_key = target_rule.key target_rule = self._rule.pop(old_key) @@ -295,48 +342,105 @@ def adjust_rule_coverage(self, rule_index: int, rule_coverage: float): return self def is_complete(self, column: str, pct: float = 1.0): - """Validation for non-null values in column""" + """ + Validation for non-null values in column + + Args: + column (str): Column name in dataframe + pct (float): The threshold percentage required to pass + + """ Rule("is_complete", column, "N/A", CheckDataType.AGNOSTIC, pct) >> self._rule return self def are_complete(self, column: Union[List[str], Tuple[str, str]], pct: float = 1.0): - """Validation for non-null values in a group of columns""" + """ + Validation for non-null values in a group of columns + + Args: + column (List[str]): A tuple or list of column names in dataframe + pct (float): The threshold percentage required to pass + """ Rule("are_complete", column, "N/A", CheckDataType.AGNOSTIC, pct) >> self._rule return self def is_unique(self, column: str, pct: float = 1.0): - """Validation for unique values in column""" + """ + Validation for unique values in column + + Args: + column (str): Column name in dataframe + pct (float): The threshold percentage required to pass + """ Rule("is_unique", column, "N/A", CheckDataType.AGNOSTIC, pct) >> self._rule return self def is_primary_key(self, column: str, pct: float = 1.0): - """Validation for unique values in column""" + """ + Validation for unique values in column + + Args: + column (str): Column name in dataframe + pct (float): The threshold percentage required to pass + """ Rule("is_unique", column, "N/A", CheckDataType.AGNOSTIC, pct) >> self._rule return self def are_unique(self, column: Union[List[str], Tuple[str, str]], pct: float = 1.0): - """Validation for unique values in a group of columns""" + """ + Validation for unique values in a group of columns + + Args: + column (List[str]): A tuple or list of column names in dataframe + pct (float): The threshold percentage required to pass + """ Rule("are_unique", column, "N/A", CheckDataType.AGNOSTIC, pct) >> self._rule return self def is_composite_key( self, column: Union[List[str], Tuple[str, str]], pct: float = 1.0 ): - """Validation for unique values in a group of columns""" + """ + Validation for unique values in a group of columns + + Args: + column (str): 
Column name in dataframe + pct (float): The threshold percentage required to pass + """ Rule("are_unique", column, "N/A", CheckDataType.AGNOSTIC, pct) >> self._rule return self def is_greater_than(self, column: str, value: float, pct: float = 1.0): - """Validation for numeric greater than value""" + """ + Validation for numeric greater than value + + Args: + column (str): Column name in dataframe + value (number): The condition for the column to match + pct (float): The threshold percentage required to pass + """ Rule("is_greater_than", column, value, CheckDataType.NUMERIC, pct) >> self._rule return self def is_positive(self, column: str, pct: float = 1.0): - """Validation for numeric greater than zero""" + """ + Validation for numeric greater than zero + + Args: + column (str): Column name in dataframe + pct (float): The threshold percentage required to pass + """ return self.is_greater_than(column, 0, pct) def is_greater_or_equal_than(self, column: str, value: float, pct: float = 1.0): - """Validation for numeric greater or equal than value""" + """ + Validation for numeric greater or equal than value + + Args: + column (str): Column name in dataframe + value (number): The condition for the column to match + pct (float): The threshold percentage required to pass + """ ( Rule("is_greater_or_equal_than", column, value, CheckDataType.NUMERIC, pct) >> self._rule @@ -344,24 +448,56 @@ def is_greater_or_equal_than(self, column: str, value: float, pct: float = 1.0): return self def is_in_millions(self, column: str, pct: float = 1.0): - """Validates that a column has values greater than 1M""" + """ + Validates that a column has values greater than 1M (1e6) + + Args: + column (str): Column name in dataframe + pct (float): The threshold percentage required to pass + """ return self.is_greater_or_equal_than(column, 1e6, pct) def is_in_billions(self, column: str, pct: float = 1.0): - """Validates that a column has values greater than 1B""" + """ + Validates that a column has values greater than 1B (1e9) + + Args: + column (str): Column name in dataframe + pct (float): The threshold percentage required to pass + """ return self.is_greater_or_equal_than(column, 1e9, pct) def is_less_than(self, column: str, value: float, pct: float = 1.0): - """Validation for numeric less than value""" + """ + Validation for numeric less than value + + Args: + column (str): Column name in dataframe + value (number): The condition for the column to match + pct (float): The threshold percentage required to pass + """ Rule("is_less_than", column, value, CheckDataType.NUMERIC, pct) >> self._rule return self def is_negative(self, column: str, pct: float = 1.0): - """Validation for numeric less than zero""" + """ + Validation for numeric less than zero + + Args: + column (str): Column name in dataframe + pct (float): The threshold percentage required to pass + """ return self.is_less_than(column, 0, pct) def is_less_or_equal_than(self, column: str, value: float, pct: float = 1.0): - """Validation for numeric less or equal than value""" + """ + Validation for numeric less or equal than value + + Args: + column (str): Column name in dataframe + value (number): The condition for the column to match + pct (float): The threshold percentage required to pass + """ ( Rule("is_less_or_equal_than", column, value, CheckDataType.NUMERIC, pct) >> self._rule @@ -369,47 +505,108 @@ def is_less_or_equal_than(self, column: str, value: float, pct: float = 1.0): return self def is_equal_than(self, column: str, value: float, pct: float = 1.0): 
- """Validation for numeric column equal than value""" + """ + Validation for numeric column equal than value + + Args: + column (str): Column name in dataframe + value (number): The condition for the column to match + pct (float): The threshold percentage required to pass + """ Rule("is_equal_than", column, value, CheckDataType.NUMERIC, pct) >> self._rule return self def has_pattern(self, column: str, value: str, pct: float = 1.0): - """Validation for string type column matching regex expression""" + """ + Validation for string type column matching regex expression + + Args: + column (str): Column name in dataframe + value (regex): A regular expression used to match values in the `column` + pct (float): The threshold percentage required to pass + """ Rule("has_pattern", column, value, CheckDataType.STRING, pct) >> self._rule return self def is_legit(self, column: str, pct: float = 1.0): - """Validation for string type having none space chars. Useful for CSV reading""" + """ + Validation for string columns giving wrong signal about completeness due to empty strings. + + Useful for reading CSV files and preventing empty strings being reported as valid records. + This is an `alias` implementation of the `has_pattern` rule using `^\S+$` as the pattern + Which validates the presence of non-empty characters between the begining and end of a string. + + Args: + column (str): Column name in dataframe + pct (float): The threshold percentage required to pass + """ Rule("has_pattern", column, "^\S+$", CheckDataType.STRING, pct) >> self._rule return self def has_min(self, column: str, value: float): - """Validation of a column’s minimum value""" + """ + Validation of a column's minimum value + + Args: + column (str): Column name in dataframe + value (number): The condition for the column to match + """ Rule("has_min", column, value, CheckDataType.NUMERIC) >> self._rule return self def has_max(self, column: str, value: float): - """Validation of a column’s maximum value""" + """ + Validation of a column's maximum value + + Args: + column (str): Column name in dataframe + value (number): The condition for the column to match + """ Rule("has_max", column, value, CheckDataType.NUMERIC) >> self._rule return self def has_std(self, column: str, value: float): - """Validation of a column’s standard deviation""" + """ + Validation of a column's standard deviation + + Args: + column (str): Column name in dataframe + value (number): The condition for the column to match + """ Rule("has_std", column, value, CheckDataType.NUMERIC) >> self._rule return self def has_mean(self, column: str, value: float): - """Validation of a column's average/mean""" + """ + Validation of a column's average/mean + + Args: + column (str): Column name in dataframe + value (number): The condition for the column to match + """ Rule("has_mean", column, value, CheckDataType.NUMERIC) >> self._rule return self def has_sum(self, column: str, value: float): - """Validation of a sum of all values of a column""" + """ + Validation of a sum of all values of a column + + Args: + column (str): Column name in dataframe + value (number): The condition for the column to match + """ Rule("has_sum", column, value, CheckDataType.NUMERIC) >> self._rule return self def is_between(self, column: str, value: Tuple[Any], pct: float = 1.0): - """Validation of a column between a range""" + """ + Validation of a column between a range + + Args: + column (str): Column name in dataframe + value (List[str,number,date]): The condition for the column to match + pct 
(float): The threshold percentage required to pass + """ Rule("is_between", column, value, CheckDataType.AGNOSTIC, pct) >> self._rule return self @@ -419,7 +616,14 @@ def not_contained_in( value: Union[List, Tuple], pct: float = 1.0, ): - """Validation of column value not in set of given values""" + """ + Validation of column value not in set of given values + + Args: + column (str): Column name in dataframe + value (List[str,number,date]): The condition for the column to match + pct (float): The threshold percentage required to pass + """ ( Rule("not_contained_in", column, value, CheckDataType.AGNOSTIC, pct) >> self._rule @@ -428,7 +632,14 @@ def not_contained_in( return self def not_in(self, column: str, value: Tuple[str, int, float], pct: float = 1.0): - """Vaidation of column value not in set of given values""" + """ + Validation of column value not in set of given values + + Args: + column (str): Column name in dataframe + value (List[str,number,date]): The condition for the column to match + pct (float): The threshold percentage required to pass + """ return self.not_contained_in(column, value, pct) def is_contained_in( @@ -437,7 +648,14 @@ def is_contained_in( value: Union[List, Tuple], pct: float = 1.0, ): - """Validation of column value in set of given values""" + """ + Validation of column value in set of given values + + Args: + column (str): Column name in dataframe + value (List[str,number,date]): The condition for the column to match + pct (float): The threshold percentage required to pass + """ ( Rule("is_contained_in", column, value, CheckDataType.AGNOSTIC, pct) @@ -447,38 +665,91 @@ def is_contained_in( return self def is_in(self, column: str, value: Tuple[str, int, float], pct: float = 1.0): - """Vaidation of column value in set of given values""" + """ + Validation of column value in set of given values + + Args: + column (str): Column name in dataframe + value (List[str,number,date]): The condition for the column to match + pct (float): The threshold percentage required to pass + """ return self.is_contained_in(column, value, pct) def is_t_minus_n(self, column: str, value: int, pct: float = 1.0): - """Validate that date is yesterday""" + """ + Validate that date is `n` days before the current date + + Args: + column (str): Column name in dataframe + value (int): The number of days before the current date + pct (float): The threshold percentage required to pass + """ yesterday = datetime.utcnow() - timedelta(days=value) return self.is_in(column, tuple([yesterday.strftime("%Y-%m-%d")]), pct) def is_t_minus_1(self, column: str, pct: float = 1.0): - """Validate that date is yesterday""" + """ + Validate that date is yesterday + + Args: + column (str): Column name in dataframe + pct (float): The threshold percentage required to pass + """ return self.is_t_minus_n(column, 1, pct) def is_t_minus_2(self, column: str, pct: float = 1.0): - """Validate that date is 2 days ago""" + """ + Validate that date is 2 days ago + + Args: + column (str): Column name in dataframe + pct (float): The threshold percentage required to pass + """ return self.is_t_minus_n(column, 2, pct) def is_t_minus_3(self, column: str, pct: float = 1.0): - """Validate that date is 3 days ago""" + """ + Validate that date is 3 days ago + + Args: + column (str): Column name in dataframe + pct (float): The threshold percentage required to pass + """ return self.is_t_minus_n(column, 3, pct) def is_yesterday(self, column: str, pct: float = 1.0): - """Validate that date is yesterday""" + """ +
Validate that date is yesterday + + Args: + column (str): Column name in dataframe + pct (float): The threshold percentage required to pass + """ return self.is_t_minus_1(column, pct) def is_today(self, column: str, pct: float = 1.0): - """Validate that date is today""" + """ + Validate that date is today + + Args: + column (str): Column name in dataframe + pct (float): The threshold percentage required to pass + """ return self.is_t_minus_n(column, 0, pct) def has_percentile( self, column: str, value: float, percentile: float, precision: int = 10000 ): - """Validation of a column percentile value""" + """ + Validation of a column percentile value using approximation + + Args: + column (str): Column name in dataframe + value (List[str,number,date]): The condition for the column to match + percentile (float): Value between [0..1] i.e. `0.5` for median + precision (int): The precision to calculate percentiles + + """ ( Rule( "has_percentile", @@ -497,7 +768,14 @@ def has_percentile( def is_inside_interquartile_range( self, column: str, value: List[float] = [0.25, 0.75], pct: float = 1.0 ): - """Validates a number resides inside the Q3 - Q1 range of values""" + """ + Validates a number resides inside the first quartile (Q1) and third quartile (Q3) of the range of values + + Args: + column (str): Column name in dataframe + value (List[number]): The lower and upper quartile boundaries, each between 0 and 1 + pct (float): The threshold percentage required to pass + """ ( Rule( "is_inside_interquartile_range", @@ -513,7 +791,14 @@ def is_inside_interquartile_range( def has_max_by( self, column_source: str, column_target: str, value: Union[float, str] ): - """Validation of a column value based on another column maximum""" + """ + Validation of the correspondence of a column value based on another column's maximum + + Args: + column_source (str): Column used to obtain the row with the max value + column_target (str): Column used to verify the matching value + value (str,number): The value to match against + """ ( Rule( "has_max_by", @@ -528,7 +813,14 @@ def has_max_by( def has_min_by( self, column_source: str, column_target: str, value: Union[float, str] ): - """Validation of a column value based on another column minimum""" + """ + Validation of the correspondence of a column value based on another column's minimum + + Args: + column_source (str): Column used to obtain the row with the min value + column_target (str): Column used to verify the matching value + value (str,number): The value to match against + """ ( Rule( "has_min_by", @@ -541,7 +833,14 @@ def has_min_by( return self def has_correlation(self, column_left: str, column_right: str, value: float): - """Validates the correlation between 2 columns with some tolerance""" + """ + Validates the correlation in a range of [0..1] between 2 columns + + Args: + column_left (str): Column name in dataframe + column_right (str): Column name in dataframe + value (float): Value to match the correlation + """ ( Rule( "has_correlation", @@ -554,17 +853,38 @@ def has_correlation(self, column_left: str, column_right: str, value: float): return self def satisfies(self, column: str, predicate: str, pct: float = 1.0): - """Validation of a column satisfying a SQL-like predicate""" + """ + Validation of a column satisfying a SQL-like predicate + + Args: + column (str): Column name in the dataframe + predicate (str): A predicate written in SQL-like syntax + pct (float): The threshold percentage required to pass + """ Rule("satisfies", column, predicate, CheckDataType.AGNOSTIC, pct) >> self._rule
return self def has_cardinality(self, column: str, value: int): - """Validate cardinality in column""" + """ + Validates the number of distinct values in a column + + Args: + column (str): Column name in the dataframe + value (int): The number of expected distinct values on a column + """ Rule("has_cardinality", column, value, CheckDataType.AGNOSTIC) >> self._rule return self def has_infogain(self, column: str, pct: float = 1.0): - """Validate cardinality > 1""" + """ + Validate cardinality > 1. + Particularly useful when validating categorical data for Machine Learning + + Args: + column (str): Column name in the dataframe + pct (float): The threshold percentage required to pass + + """ ( Rule( method="has_infogain", @@ -578,7 +898,18 @@ def has_infogain(self, column: str, pct: float = 1.0): return self def has_entropy(self, column: str, value: float, tolerance: float = 0.01): - """Validation for entropy calculation on continuous values""" + """ + Validation for entropy calculation on continuous variables/features on `log2`. + Useful in Machine Learning classifications to test imbalanced datasets with low entropy. + + Args: + column (str): Column name in the dataframe + value (float): The expected entropy value + tolerance (float): The tolerance/precision used when comparing the actual and expected value + + Examples: + + """ ( Rule( "has_entropy", @@ -592,52 +923,113 @@ def has_entropy(self, column: str, value: float, tolerance: float = 0.01): return self def is_on_weekday(self, column: str, pct: float = 1.0): - """Validates a datetime column is in a Mon-Fri time range""" + """ + Validates a datetime column is in a Mon-Fri time range + + Args: + column (str): Column name in the dataframe + pct (float): The threshold percentage required to pass + """ Rule("is_on_weekday", column, "Mon-Fri", CheckDataType.DATE, pct) >> self._rule return self def is_on_weekend(self, column: str, pct: float = 1.0): - """Validates a datetime column is in a Sat-Sun time range""" + """ + Validates a datetime column is in a Sat-Sun time range + + Args: + column (str): Column name in the dataframe + pct (float): The threshold percentage required to pass + """ Rule("is_on_weekend", column, "Sat-Sun", CheckDataType.DATE, pct) >> self._rule return self def is_on_monday(self, column: str, pct: float = 1.0): - """Validates a datetime column is on Mon""" + """ + Validates a datetime column is on Monday + + Args: + column (str): Column name in the dataframe + pct (float): The threshold percentage required to pass + """ Rule("is_on_monday", column, "Mon", CheckDataType.DATE, pct) >> self._rule return self def is_on_tuesday(self, column: str, pct: float = 1.0): - """Validates a datetime column is on Tue""" + """ + Validates a datetime column is on Tuesday + + Args: + column (str): Column name in the dataframe + pct (float): The threshold percentage required to pass + """ Rule("is_on_tuesday", column, "Tue", CheckDataType.DATE, pct) >> self._rule return self def is_on_wednesday(self, column: str, pct: float = 1.0): - """Validates a datetime column is on Wed""" + """ + Validates a datetime column is on Wednesday + + Args: + column (str): Column name in the dataframe + pct (float): The threshold percentage required to pass + """ Rule("is_on_wednesday", column, "Wed", CheckDataType.DATE, pct) >> self._rule return self def is_on_thursday(self, column: str, pct: float = 1.0): - """Validates a datetime column is on Thu""" + """ + Validates a datetime column is on Thursday + + Args: + column (str): Column name in the dataframe + 
pct (float): The threshold percentage required to pass + """ Rule("is_on_thursday", column, "Thu", CheckDataType.DATE, pct) >> self._rule return self def is_on_friday(self, column: str, pct: float = 1.0): - """Validates a datetime column is on Fri""" + """ + Validates a datetime column is on Friday + + Args: + column (str): Column name in the dataframe + pct (float): The threshold percentage required to pass + """ Rule("is_on_friday", column, "Fri", CheckDataType.DATE, pct) >> self._rule return self def is_on_saturday(self, column: str, pct: float = 1.0): - """Validates a datetime column is on Sat""" + """ + Validates a datetime column is on Saturday + + Args: + column (str): Column name in the dataframe + pct (float): The threshold percentage required to pass + """ Rule("is_on_saturday", column, "Sat", CheckDataType.DATE, pct) >> self._rule return self def is_on_sunday(self, column: str, pct: float = 1.0): - """Validates a datetime column is on Sun""" + """ + Validates a datetime column is on Sunday + + Args: + column (str): Column name in the dataframe + pct (float): The threshold percentage required to pass + """ Rule("is_on_sunday", column, "Sun", CheckDataType.DATE, pct) >> self._rule return self def is_on_schedule(self, column: str, value: Tuple[Any], pct: float = 1.0): - """Validation of a datetime column between an hour interval""" + """ + Validation of a datetime column between an hour interval + + Args: + column (str): Column name in the dataframe + value (Tuple[int,int]): A tuple indicating a 24hr day interval. i.e. (9,17) for 9am to 5pm + pct (float): The threshold percentage required to pass + """ ( Rule("is_on_schedule", column, value, CheckDataType.TIMESTAMP, pct) >> self._rule @@ -647,7 +1039,19 @@ def is_on_schedule(self, column: str, value: Tuple[Any], pct: float = 1.0): def is_daily( self, column: str, value: Union[None, List[int]] = None, pct: float = 1.0 ): - """Validates that there is no missing dates using only week days in the date/timestamp column""" + """ + Validates that there is no missing dates using only week days in the date/timestamp column. + + An alternative day combination can be provided given that a user wants to validate only certain dates. + For example in PySpark to validate that time series are every Wednesday consecutively on a year + without any missing values, the value input should contain `[4]` as it represent the numeric + equivalence of the day of week Wednesday. + + Args: + column (str): Column name in the dataframe + value (List[int]): A list of numbers describing the days of the week to consider. i.e. Pyspark uses [2, 3, 4, 5, 6] for Mon-Fri + pct (float): The threshold percentage required to pass + """ (Rule("is_daily", column, value, CheckDataType.DATE, pct) >> self._rule) return self @@ -659,7 +1063,68 @@ def has_workflow( edges: List[Tuple[str]], pct: float = 1.0, ): - """Validates events in a group clause with order, followed a specific sequence. Similar to adjacency matrix validation""" + """ + Validates events in a group clause with order, followed a specific sequence. Similar to adjacency matrix validation. + + Args: + column_group (str): The dataframe column used to group events + column_event (str): The state of the event within the group + column_order (List[date,number,str]): The order within the group, should be deterministic and without collisions. 
+ edges (List[Tuple[str,str]]): The combinations of events expected in the data frame i.e `[("A","B"), ("B","C")]` + + + ???+ example "Example" + + Given the following fictitious dataset example: + + | date | ticket | status | + |------------|----------|-------------| + | 2024-01-01 | CASE-001 | New | + | 2024-01-02 | CASE-001 | In Progress | + | 2024-01-03 | CASE-001 | Closed | + + You can validate that events for each ticket follow certain sequence by using: + + ``` python + from cuallee import Check, CheckLevel + df = spark.createDataFrame( + [ + ["2024-01-01", "CASE-001", "New"], + ["2024-01-02", "CASE-001", "In Progress"], + ["2024-01-03", "CASE-001", "Closed"], + ], + ["date", "ticket", "status"], + ) + + + check = Check(CheckLevel.WARNING, "WorkflowValidation") + check.has_workflow( + column_group="ticket", + column_event="status", + column_order="date", + edges=[(None, "New"),("New", "In Progress"),("In Progress","Closed"), ("Closed", None)] + ) + + # Validate + check.validate(df).show(truncate=False) + + # Result + +---+-------------------+------------------+-------+----------------------------+------------+------------------------------------------------------------------------------------+----+----------+---------+--------------+------+ + |id |timestamp |check |level |column |rule |value |rows|violations|pass_rate|pass_threshold|status| + +---+-------------------+------------------+-------+----------------------------+------------+------------------------------------------------------------------------------------+----+----------+---------+--------------+------+ + |1 |2024-05-11 11:24:00|WorkflowValidation|WARNING|('ticket', 'status', 'date')|has_workflow|((None, 'New'), ('New', 'In Progress'), ('In Progress', 'Closed'), ('Closed', None))|3 |0 |1.0 |1.0 |PASS | + +---+-------------------+------------------+-------+----------------------------+------------+------------------------------------------------------------------------------------+----+----------+---------+--------------+------+ + + ``` + + The check validates that: + + - Nothing preceds a `New` state + - `In Progress` follows the `New` event + - `Closed` follows the `In Progress` event + - Nothing follows after `Closed` state + + """ ( Rule( "has_workflow", @@ -673,7 +1138,12 @@ def has_workflow( return self def validate(self, dataframe: Any): - """Compute all rules in this check for specific data frame""" + """ + Compute all rules in this check for specific data frame + + Args: + dataframe (Union[pyspark,snowpark,pandas,polars,duckdb,bigquery]): A dataframe object + """ # Stop execution if the there is no rules in the check assert not self.empty, "Check is empty. Try adding some rules?" 
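The `has_entropy` docstring added above states the calculation is on `log2`, and the `pyspark_validation.py` hunk that follows switches the Spark expressions from `F.log` to `F.log2` and normalizes by the number of labels, coalescing to `0.0` for single-label columns. A minimal, self-contained sketch of that quantity in plain Python (illustrative only; the helper name is hypothetical and not part of cuallee's API):

```python
import math
from collections import Counter

def normalized_entropy(values):
    """Base-2 Shannon entropy divided by log2(n_labels): 1.0 for a balanced binary column."""
    counts = Counter(values)
    n = len(values)
    probs = [c / n for c in counts.values()]
    h = -sum(p * math.log2(p) for p in probs)
    # Single-label columns have zero entropy; mirrors the coalesce-to-0.0 in the hunk below
    return h / math.log2(len(counts)) if len(counts) > 1 else 0.0

print(normalized_entropy([0, 1, 0, 1]))  # 1.0, the value a check like has_entropy(column, 1.0, 0.5) expects
```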
diff --git a/cuallee/pyspark_validation.py b/cuallee/pyspark_validation.py index 3711a816..e83c5f75 100644 --- a/cuallee/pyspark_validation.py +++ b/cuallee/pyspark_validation.py @@ -364,11 +364,11 @@ def _execute(dataframe: DataFrame, key: str): ) .withColumn("probs", F.transform("freq", lambda x: x / F.col("rows"))) .withColumn("n_labels", F.size("probs")) - .withColumn("log_labels", F.log("n_labels")) - .withColumn("log_prob", F.transform("probs", lambda x: F.log(x))) + .withColumn("log_labels", F.log2("n_labels")) + .withColumn("log_prob", F.transform("probs", lambda x: F.log2(x))) .withColumn( "log_classes", - F.transform("probs", lambda x: F.log((x / x) * F.col("n_labels"))), + F.transform("probs", lambda x: F.log2((x / x) * F.col("n_labels"))), ) .withColumn("entropy_vals", F.arrays_zip("probs", "log_prob")) .withColumn( @@ -379,13 +379,13 @@ def _execute(dataframe: DataFrame, key: str): ), ) .select( - ( + F.coalesce( F.aggregate( "product_prob", F.lit(0.0), lambda acc, x: acc + x ).alias("p") / F.col("log_labels") * -1 - ).alias("entropy") + , F.lit(0.0)).alias("entropy") ) .select( F.expr( diff --git a/docs/dependencies.md b/docs/dependencies.md index e482fab7..90c650cb 100644 --- a/docs/dependencies.md +++ b/docs/dependencies.md @@ -1 +1,7 @@ -# Dependencies \ No newline at end of file +# Dependencies + +`cuallee` relies on only `3` dependencies that are automatically installed with the package: + +- `toolz` for functional programming adoption +- `requests` for downloading the ISO standards required by the `check.iso` interface +- `pandas` as the default validation result data abstraction diff --git a/docs/index.md b/docs/index.md index d857bdd0..f5616ad2 100644 --- a/docs/index.md +++ b/docs/index.md @@ -36,60 +36,3 @@ graph LR ``` -## Installation - -`cuallee` is designed to work primarily with `pyspark==3.3.0` and this is its only dependency. -It uses the `Observation` API features in pyspark, to reduce the computation time for aggregations, and calculating summaries in one pass of the data frames being validated. - -## pip - -```bash -# Latest -pip install cuallee -``` - -## Check - -Validating data sets is about creating a `Check` and adding rules into it. -You can choose from different types: `numeric`, `date algebra`, `range of values`, `temporal`, and many others.
- -A `Check` provides a declarative interface to build a comprehensive validation on a dataframe as shown below: - -```python -# Imports -from cuallee import Check, CheckLevel -from cuallee import dataframe as D - -# Check -check = Check(CheckLevel.WARNING, "TaxiNYCheck") - -# Data -df = spark.read.parquet("temp/taxi/*.parquet") - -# Adding rules -# ============= - - # All fields are filled -[check.is_complete(name) for name in df.columns] - -# Verify taxi ride distance is positive -[check.is_greater_than(name, 0) for name in D.numeric_fields(df)] - -# Confirm that tips are not outliers -[check.is_less_than(name, 1e4) for name in D.numeric_fields(df)] - -# 70% of data is on weekdays -[check.is_on_weekday(name, .7) for name in D.timestamp_fields(df)] - -# Binary classification fields -[check.has_entropy(name, 1.0, 0.5) for name in D.numeric_fields(df)] - -# Percentage of big tips -[check.is_between(name, (1000,2000)) for name in D.numeric_fields(df)] - -# Confirm 22 years of data -[check.is_between(name, ("2000-01-01", "2022-12-31")) for name in D.timestamp_fields(df)] - -# Validation -check.validate(df) -``` \ No newline at end of file diff --git a/docs/installation.md b/docs/installation.md index f34d65bd..f944970c 100644 --- a/docs/installation.md +++ b/docs/installation.md @@ -1 +1,32 @@ -# Installation \ No newline at end of file +# Installation + +`cuallee` is developed with a `functional` programming style. Classes are only defined for compatibility and to ease the migration process from `pydeequ`. + +For better performance it requires `pyspark>=3.3.0` and the `Observation` API. + +## Using pip + +```bash +# Latest +pip install cuallee +``` + +## External frameworks +```bash +# For Pyspark +pip install cuallee[pyspark] + +# For Spark Connect +pip install cuallee[pyspark_connect] + +# For Snowflake/Snowpark +pip install cuallee[snowpark] + +# For DuckDb +pip install cuallee[duckdb] + +# For Polars +pip install cuallee[polars] +``` +Alternatively, you can have your own versions of the frameworks installed separately in your environment, and `cuallee` will simply rely on the installed version, provided it meets the minimum version and compatibility requirements. + diff --git a/docs/module/check.md b/docs/module/check.md new file mode 100644 index 00000000..55350820 --- /dev/null +++ b/docs/module/check.md @@ -0,0 +1,10 @@ + +::: cuallee.Check + handler: python + rendering: + show_bases: False + heading_level: 3 + + + + diff --git a/docs/module/rule.md b/docs/module/rule.md new file mode 100644 index 00000000..b9a0a8fe --- /dev/null +++ b/docs/module/rule.md @@ -0,0 +1,5 @@ +::: cuallee.Rule + handler: python + rendering: + show_bases: False + show_inheritance_diagram: True \ No newline at end of file diff --git a/docs/new-check.md b/docs/new-check.md deleted file mode 100644 index d16d4ed8..00000000 --- a/docs/new-check.md +++ /dev/null @@ -1 +0,0 @@ -# New Check \ No newline at end of file diff --git a/docs/pyspark/index.md b/docs/pyspark/index.md index 0edf184b..8dbf128d 100644 --- a/docs/pyspark/index.md +++ b/docs/pyspark/index.md @@ -1,8 +1,7 @@ -# Check +# PySpark -## pyspark -### is_complete +## is_complete This check is the most popular.
It validates the _completeness_ attribute of a d check.is_complete("id") # Validate - check.validate(spark, df).show(truncate=False) + check.validate(df).show(truncate=False) ``` __Result:__ ``` markdown - +---+-------------------+-----------------+-------+------+-----------+-----+----+----------+---------+--------------+--------+------+ - |id |timestamp |check |level |column|rule |value|rows|violations|pass_rate|pass_threshold|metadata|status| - +---+-------------------+-----------------+-------+------+-----------+-----+----+----------+---------+--------------+--------+------+ - |1 |2022-10-09 23:45:10|CompletePredicate|WARNING|id |is_complete|N/A |10 |0 |1.0 |1.0 |{} |PASS | - +---+-------------------+-----------------+-------+------+-----------+-----+----+----------+---------+--------------+--------+------+ + +---+-------------------+-----------------+-------+------+-----------+-----+----+----------+---------+--------------+------+ + |id |timestamp |check |level |column|rule |value|rows|violations|pass_rate|pass_threshold|status| + +---+-------------------+-----------------+-------+------+-----------+-----+----+----------+---------+--------------+------+ + |1 |2024-05-10 21:00:09|CompletePredicate|WARNING|id |is_complete|N/A |10 |0 |1.0 |1.0 |PASS | + +---+-------------------+-----------------+-------+------+-----------+-----+----+----------+---------+--------------+------+ ``` === "coverage" @@ -40,7 +39,7 @@ This check is the most popular. It validates the _completeness_ attribute of a d check.is_complete("id", .5) # Only 50% coverage # Validate - check.validate(spark, df).show(truncate=False) + check.validate(df).show(truncate=False) ``` __Result:__ diff --git a/docs/pyspark/observation.md b/docs/pyspark/observation.md deleted file mode 100644 index e69de29b..00000000 diff --git a/docs/usage.md b/docs/usage.md new file mode 100644 index 00000000..113b6cb9 --- /dev/null +++ b/docs/usage.md @@ -0,0 +1,164 @@ +# Usage + +Validating data sets is about creating a `Check` and adding rules into it. +You can choose from different types: `numeric`, `date algebra`, `range of values`, `temporal`, and many others. 
+ +A `Check` provides a declarative interface to build a comprehensive validation on a dataframe as shown below: + +## Libraries + +```python +# Imports +from pyspark.sql import SparkSession +import pyspark.sql.types as T +import pyspark.sql.functions as F + +# Cuallee +from cuallee import Check, CheckLevel +from cuallee.pyspark_validation import _field_type_filter +``` + +## Session + +```python + +# Spark Session +spark = SparkSession.builder.getOrCreate() + +``` + +## Check + +```python +# Check +check = Check(CheckLevel.WARNING, "TaxiNYCheck") + +``` + + +## Data +```python +# !wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet +# !wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-02.parquet +df = spark.read.parquet("*yellow*tripdata*.parquet") +df = df.withColumn("date", F.to_date("tpep_pickup_datetime")) +``` + + +## Checks +```python +# Confirm that all records are not null for all columns in dataframe +[check.is_complete(name) for name in df.columns] + +# Verify taxi ride distance is positive +[check.is_greater_than(name, 0) for name in _field_type_filter(df, T.NumericType)] + +# Confirm that tips are not outliers +[check.is_less_than(name, 1e4) for name in _field_type_filter(df, T.NumericType)] + +# 70% of data is on weekdays +check.is_on_weekday("date", .7) + +# Binary classification fields +[check.has_entropy(name, 1.0, 0.5) for name in _field_type_filter(df, T.NumericType)] + +# Percentage of big tips +[check.is_between(name, (1000,2000)) for name in _field_type_filter(df, T.NumericType)] + +# Confirm 22 years of data +check.is_between("date", ("2000-01-01", "2022-12-31")) +``` + + +## Validation +```python +check.validate(df).show() + +``` + +If you want to display full results use: + +`check.validate(df).show(n=check.sum, truncate=False)` + + +## Code + +??? 
example "Check" + + + + ``` python + # Imports + from pyspark.sql import SparkSession + import pyspark.sql.types as T + import pyspark.sql.functions as F + + # Cuallee + from cuallee import Check, CheckLevel + from cuallee.pyspark_validation import _field_type_filter + + # Session + spark = SparkSession.builder.getOrCreate() + + # Check + check = Check(CheckLevel.WARNING, "TaxiNYCheck") + + # !wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet + # !wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-02.parquet + df = spark.read.parquet("*yellow*tripdata*.parquet") + df = df.withColumn("date", F.to_date("tpep_pickup_datetime")) + + # Confirm that all records are not null for all columns in dataframe + [check.is_complete(name) for name in df.columns] + + # Verify taxi ride distance is positive + [check.is_greater_than(name, 0) for name in _field_type_filter(df, T.NumericType)] + + # Confirm that tips are not outliers + [check.is_less_than(name, 1e4) for name in _field_type_filter(df, T.NumericType)] + + # 70% of data is on weekdays + check.is_on_weekday("date", .7) + + # Binary classification fields + [check.has_entropy(name, 1.0, 0.5) for name in _field_type_filter(df, T.NumericType)] + + # Percentage of big tips + [check.is_between(name, (1000,2000)) for name in _field_type_filter(df, T.NumericType)] + + # Confirm 22 years of data + check.is_between("date", ("2000-01-01", "2022-12-31")) + + # Validate + check.validate(df).show() + ``` + + __Result:__ + + ``` markdown + # Result + +---+-------------------+-----------+-------+--------------------+-----------+-----+-------+----------+------------------+--------------+------+ + | id| timestamp| check| level| column| rule|value| rows|violations| pass_rate|pass_threshold|status| + +---+-------------------+-----------+-------+--------------------+-----------+-----+-------+----------+------------------+--------------+------+ + | 1|2024-05-10 21:00:09|TaxiNYCheck|WARNING| VendorID|is_complete| N/A|5972150| 0| 1.0| 1.0| PASS| + | 2|2024-05-10 21:00:09|TaxiNYCheck|WARNING|tpep_pickup_datetime|is_complete| N/A|5972150| 0| 1.0| 1.0| PASS| + | 3|2024-05-10 21:00:09|TaxiNYCheck|WARNING|tpep_dropoff_date...|is_complete| N/A|5972150| 0| 1.0| 1.0| PASS| + | 4|2024-05-10 21:00:09|TaxiNYCheck|WARNING| passenger_count|is_complete| N/A|5972150| 325772|0.9454514705759233| 1.0| FAIL| + | 5|2024-05-10 21:00:09|TaxiNYCheck|WARNING| trip_distance|is_complete| N/A|5972150| 0| 1.0| 1.0| PASS| + | 6|2024-05-10 21:00:09|TaxiNYCheck|WARNING| RatecodeID|is_complete| N/A|5972150| 325772|0.9454514705759233| 1.0| FAIL| + | 7|2024-05-10 21:00:09|TaxiNYCheck|WARNING| store_and_fwd_flag|is_complete| N/A|5972150| 325772|0.9454514705759233| 1.0| FAIL| + | 8|2024-05-10 21:00:09|TaxiNYCheck|WARNING| PULocationID|is_complete| N/A|5972150| 0| 1.0| 1.0| PASS| + | 9|2024-05-10 21:00:09|TaxiNYCheck|WARNING| DOLocationID|is_complete| N/A|5972150| 0| 1.0| 1.0| PASS| + | 10|2024-05-10 21:00:09|TaxiNYCheck|WARNING| payment_type|is_complete| N/A|5972150| 0| 1.0| 1.0| PASS| + | 11|2024-05-10 21:00:09|TaxiNYCheck|WARNING| fare_amount|is_complete| N/A|5972150| 0| 1.0| 1.0| PASS| + | 12|2024-05-10 21:00:09|TaxiNYCheck|WARNING| extra|is_complete| N/A|5972150| 0| 1.0| 1.0| PASS| + | 13|2024-05-10 21:00:09|TaxiNYCheck|WARNING| mta_tax|is_complete| N/A|5972150| 0| 1.0| 1.0| PASS| + | 14|2024-05-10 21:00:09|TaxiNYCheck|WARNING| tip_amount|is_complete| N/A|5972150| 0| 1.0| 1.0| PASS| + | 15|2024-05-10 21:00:09|TaxiNYCheck|WARNING| 
tolls_amount|is_complete| N/A|5972150| 0| 1.0| 1.0| PASS| + | 16|2024-05-10 21:00:09|TaxiNYCheck|WARNING|improvement_surch...|is_complete| N/A|5972150| 0| 1.0| 1.0| PASS| + | 17|2024-05-10 21:00:09|TaxiNYCheck|WARNING| total_amount|is_complete| N/A|5972150| 0| 1.0| 1.0| PASS| + | 18|2024-05-10 21:00:09|TaxiNYCheck|WARNING|congestion_surcharge|is_complete| N/A|5972150| 325772|0.9454514705759233| 1.0| FAIL| + | 19|2024-05-10 21:00:09|TaxiNYCheck|WARNING| Airport_fee|is_complete| N/A|5972150| 325772|0.9454514705759233| 1.0| FAIL| + | 20|2024-05-10 21:00:09|TaxiNYCheck|WARNING| date|is_complete| N/A|5972150| 0| 1.0| 1.0| PASS| + +---+-------------------+-----------+-------+--------------------+-----------+-----+-------+----------+------------------+--------------+------+ + ``` \ No newline at end of file diff --git a/mkdocs.yaml b/mkdocs.yaml index e830261a..50d2b585 100644 --- a/mkdocs.yaml +++ b/mkdocs.yaml @@ -4,20 +4,15 @@ nav: - Getting Started: - Installation: installation.md - Dependencies: dependencies.md - - New Checks: new-check.md - - PySpark: - - pyspark/index.md - - About observation: pyspark/observation.md - - Pandas: - - pandas/index.md - - Snowpark: - - snowpark/index.md - - Advanced: advanced.md - - Validate: validate.md + - Usage: usage.md + - Code Reference: + - Check: module/check.md + - Rule: module/rule.md theme: features: # - announce.dismiss - content.code.annotate + - content.code.copy # - content.tabs.link - content.tooltips # - header.autohide @@ -70,9 +65,13 @@ markdown_extensions: - pymdownx.tabbed: alternate_style: true - def_list - - attr_list - - pymdownx.emoji: - emoji_index: !!python/name:materialx.emoji.twemoji - emoji_generator: !!python/name:materialx.emoji.to_svg + - attr_list - pymdownx.tasklist: - custom_checkbox: true \ No newline at end of file + custom_checkbox: true +plugins: + - search + - mkdocstrings: + handlers: + python: + options: + docstring_style: google \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 6962feab..ea33f4b6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "cuallee" -version = "0.10.1" +version = "0.10.2" authors = [ { name="Herminio Vazquez", email="canimus@gmail.com"}, { name="Virginie Grosboillot", email="vestalisvirginis@gmail.com" } diff --git a/setup.cfg b/setup.cfg index 124422c1..02409a0b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [metadata] name = cuallee -version = 0.10.1 +version = 0.10.2 [options] packages = find: \ No newline at end of file
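None of the documentation pages in this diff exercise the rule-management methods that received docstrings above (`adjust_rule_coverage`, `delete_rule_by_attribute`, `sum`). A minimal sketch of how they compose, based only on those docstrings; the check name and column names are hypothetical:

```python
from cuallee import Check, CheckLevel

check = Check(CheckLevel.WARNING, "SensorCheck")
check.is_complete("sensor_id")            # rule_index 0
check.is_greater_than("temperature", 0)   # rule_index 1

# Relax the completeness rule to a 90% threshold instead of rewriting the check
check.adjust_rule_coverage(rule_index=0, rule_coverage=0.9)

# Drop every rule that targets the temperature column
check.delete_rule_by_attribute("column", ["temperature"])

print(check.sum)  # expected: 1, only the relaxed completeness rule remains
```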