# Data Quality Monitoring Framework 
- Covers: null checks, integer checks, and date checks.

- Classifies records as good or bad based on each rule's action: a Reject violation makes a record bad, while a Flag violation keeps it good but marks it as flagged.

In [7]:
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StringType, IntegerType, LongType, ShortType, ByteType,
    DoubleType, FloatType, DateType, TimestampType
)
import json
import sys
from typing import List, Dict, Any, Optional

StatementMeta(sparkpool1, 514, 2, Finished, Available, Finished)

In [42]:
# -------------------------------
# Utility helpers
# -------------------------------

# Spark integral types that INTEGER_CHECK accepts natively (no string parsing).
INTEGER_TYPES = (
    IntegerType,
    LongType,
    ShortType,
    ByteType,
)
# Integral types followed by the floating-point types.
NUMERIC_TYPES = (*INTEGER_TYPES, DoubleType, FloatType)

def _col_exists(df: DataFrame, col: str) -> bool:
    """Return True when ``col`` is one of ``df``'s top-level column names."""
    names = df.columns
    return col in names

def _dtype(df: DataFrame, col: str):
    """Return the Spark DataType of ``col`` in ``df``, or None if the column is absent."""
    if not _col_exists(df, col):
        return None
    return df.schema[col].dataType

def _null_invalid_expr(df: DataFrame, col: str, treat_blank_as_null: bool = True):
    """
    Build a boolean Column flagging values that fail a NULL_CHECK.

    A value is flagged when:
      - the column is absent from ``df`` (every row is flagged);
      - it is null (any column type);
      - it is NaN (DoubleType / FloatType columns);
      - it has length 0 after ``F.trim`` (StringType columns, only when
        ``treat_blank_as_null`` is True).
    """
    data_type = _dtype(df, col)
    if data_type is None:
        # Column not present: flag every row.
        return F.lit(True)

    column = F.col(col)
    is_null = column.isNull()

    if isinstance(data_type, (DoubleType, FloatType)):
        return is_null | F.isnan(column)

    if isinstance(data_type, StringType):
        blank_check = (F.length(F.trim(column)) == 0) if treat_blank_as_null else F.lit(False)
        return is_null | blank_check

    return is_null

def _integer_invalid_expr(
    df: DataFrame,
    col: str,
    min_value: Optional[int] = None,
    max_value: Optional[int] = None,
    allow_string_integers: bool = True
):
    """
    Returns a boolean Column that is True when the value is invalid as an integer.

    A value is invalid when:
      - the column is missing from ``df``;
      - it is null;
      - (StringType with ``allow_string_integers``) after trimming it is blank,
        is not an optionally signed run of ASCII digits, or cannot be cast to
        a 64-bit long;
      - it lies outside ``[min_value, max_value]`` (each bound optional);
      - the column has any other type (e.g. Float/Double), or is a string
        column while ``allow_string_integers`` is False.

    The returned Column never evaluates to SQL NULL: a NULL outcome is
    coalesced to True, so a row can't silently pass a ``F.when(invalid, ...)``.
    """
    dt = _dtype(df, col)
    if dt is None:
        return F.lit(True)

    c = F.col(col)

    if isinstance(dt, INTEGER_TYPES):
        value = c
        format_invalid = c.isNull()
    elif isinstance(dt, StringType) and allow_string_integers:
        s = F.trim(c)
        # Blank strings also fail this pattern.
        is_int_str = s.rlike(r'^[+-]?\d+$')
        # Cast only strings that look like integers, so non-numeric text is never
        # cast (relevant under ANSI mode). With ANSI off, digit strings too large
        # for a long cast to null and are caught by value.isNull() below.
        # NOTE(review): with spark.sql.ansi.enabled=true an overflowing value
        # raises instead — confirm the pool setting.
        value = F.when(is_int_str, s.cast("long"))
        # c.isNull() is explicit: on a null input the other operands are all
        # SQL NULL, which previously let null strings pass as valid.
        format_invalid = c.isNull() | ~is_int_str | value.isNull()
    else:
        # Float/Double or other types -> treat as invalid for integer check
        return F.lit(True)

    bounds_invalid = F.lit(False)
    if min_value is not None:
        bounds_invalid = bounds_invalid | (value < F.lit(min_value))
    if max_value is not None:
        bounds_invalid = bounds_invalid | (value > F.lit(max_value))

    return F.coalesce(format_invalid | bounds_invalid, F.lit(True))

def _date_invalid_expr(
    df: DataFrame,
    col: str,
    fmt: str = "yyyy-MM-dd",
    min_date: Optional[str] = None,  # string formatted according to fmt
    max_date: Optional[str] = None   # string formatted according to fmt
):
    """
    Build a boolean Column flagging values that fail a DATE_CHECK.

    A value is flagged when:
      - the column is absent from ``df``, or its type is not String,
        Timestamp or Date (every row is flagged);
      - it is null;
      - (StringType) it has length 0 after trimming, or ``F.to_date`` with
        ``fmt`` returns null for it;
      - its date is before ``min_date`` or after ``max_date`` (both optional
        strings, parsed with the same ``fmt`` for every column type).

    Timestamp columns are compared by their date part only.

    NOTE(review): if a bound string itself fails to parse with ``fmt``, that
    comparison is SQL NULL, so the bound is not enforced for otherwise-valid rows.
    """
    data_type = _dtype(df, col)
    if data_type is None:
        return F.lit(True)

    column = F.col(col)

    if isinstance(data_type, StringType):
        trimmed = F.trim(column)
        is_blank = F.length(trimmed) == 0
        as_date = F.to_date(trimmed, fmt)
    elif isinstance(data_type, TimestampType):
        is_blank = F.lit(False)
        as_date = F.to_date(column)
    elif isinstance(data_type, DateType):
        is_blank = F.lit(False)
        as_date = column
    else:
        # Unsupported column type: flag every row.
        return F.lit(True)

    out_of_range = F.lit(False)
    if min_date is not None:
        out_of_range = out_of_range | (as_date < F.to_date(F.lit(min_date), fmt))
    if max_date is not None:
        out_of_range = out_of_range | (as_date > F.to_date(F.lit(max_date), fmt))

    return is_blank | as_date.isNull() | out_of_range

# -------------------------------
# Rule application
# -------------------------------

def _row_to_rule_dict(row) -> Dict[str, Any]:
    """
    Convert a rule table/dataset row to a normalized dict.
    Expected columns in rule table/dataset:
      - rule_id
      - rule_description
      - dataset_name
      - field_name
      - action   (e.g., Reject, Flag)
      - error_description
      - rule_type ('NULL_CHECK' | 'INTEGER_CHECK' | 'DATE_CHECK')
      - params (JSON string), optional:
          For NULL_CHECK:
            {"treat_blank_as_null": true}
          For INTEGER_CHECK:
            {"min": 0, "max": 100, "allow_string_integers": true}
          For DATE_CHECK:
            {"format": "yyyy-MM-dd", "min_date": "2020-01-01", "max_date": "2030-12-31"}
    """
    d = row.asDict(recursive=True)
    params = d.get("params")
    try:
        d["params"] = json.loads(params) if isinstance(params, str) and params else {}
    except Exception:
        d["params"] = {}
    return d

_VIOLATION_COLUMNS = ("violations_all", "violations_reject", "violations_flag")


def _invalid_expr_for_rule(df: DataFrame, rtype: str, field: Optional[str], p: Dict[str, Any]):
    """Return the boolean 'invalid' Column for one rule; unknown rule types flag every row."""
    if rtype == "NULL_CHECK":
        return _null_invalid_expr(df, field, p.get("treat_blank_as_null", True))
    if rtype == "INTEGER_CHECK":
        return _integer_invalid_expr(
            df, field,
            min_value=p.get("min"),
            max_value=p.get("max"),
            allow_string_integers=p.get("allow_string_integers", True)
        )
    if rtype == "DATE_CHECK":
        return _date_invalid_expr(
            df, field,
            fmt=p.get("format", "yyyy-MM-dd"),
            min_date=p.get("min_date"),
            max_date=p.get("max_date")
        )
    # Unknown rule -> consider invalid to draw attention
    return F.lit(True)


def _with_violations_column(df: DataFrame, name: str, msg_cols: List[Any]) -> DataFrame:
    """Add array<string> column ``name`` holding the non-null entries of ``msg_cols``."""
    if not msg_cols:
        # On Spark 3+ a bare F.array() has element type NULL; give it a concrete type.
        return df.withColumn(name, F.array().cast("array<string>"))
    # SQL filter() keeps compatibility with Spark versions lacking F.filter.
    return df.withColumn(name, F.array(*msg_cols)) \
             .withColumn(name, F.expr(f"filter(`{name}`, x -> x is not null)"))


def _join_violation_arrays(df: DataFrame, sep: str = "|") -> DataFrame:
    """Replace each violations_* array column with a ``sep``-joined string (CSV-writable)."""
    for name in _VIOLATION_COLUMNS:
        # Each column is joined from its OWN array, not from violations_all.
        df = df.withColumn(name, F.concat_ws(sep, F.col(name)))
    return df


def apply_dq_rules(
    df: DataFrame,
    rules_df: DataFrame,
    dataset_name: Optional[str] = None
):
    """
    Apply data quality rules to a DataFrame and classify records.

    Args:
      df: Data to validate.
      rules_df: Rule definitions, one row per rule; collected to the driver.
      dataset_name: If given, only rules whose ``dataset_name`` equals it are applied.

    Rule ``rule_type`` and ``action`` values are trimmed and matched
    case-insensitively. Any action other than REJECT (including a null one)
    is treated as Flag. Null ``severity``/``action``/``error_description``
    cells fall back to the same defaults as missing columns.

    Returns:
      result_df: df + violations_all / violations_reject / violations_flag
        (array<string>), violation_summary ('; '-joined), is_bad, is_flagged
      good_df: result_df rows without Reject violations; violations_* columns
        are '|'-joined strings so the frame can be written as CSV
      bad_df:  result_df rows with at least one Reject violation; same string
        columns as good_df
      summary_df: (violation, count) over all violations, most frequent first
    """
    # Filter and collect rules to driver
    filtered = rules_df
    if dataset_name:
        filtered = filtered.filter(F.col("dataset_name") == F.lit(dataset_name))

    rules = [_row_to_rule_dict(r) for r in filtered.collect()]

    viol_all_exprs: List[Any] = []
    viol_reject_exprs: List[Any] = []
    viol_flag_exprs: List[Any] = []

    for r in rules:
        rid = r.get("rule_id")
        rtype = str(r.get("rule_type") or "").strip().upper()
        field = r.get("field_name")
        # `or` rather than .get() defaults so that null cells also get the fallback.
        severity = r.get("severity") or "Warning"
        action = r.get("action") or "Flag"
        err = r.get("error_description") or r.get("rule_description") or "Rule violation"
        p = r.get("params") or {}

        invalid = _invalid_expr_for_rule(df, rtype, field, p)

        msg = f"{rid} | {field} | {err} | severity={severity} | action={action}"
        msg_col = F.when(invalid, F.lit(msg)).otherwise(F.lit(None))

        viol_all_exprs.append(msg_col)
        if str(action).strip().upper() == "REJECT":
            viol_reject_exprs.append(msg_col)
        else:
            viol_flag_exprs.append(msg_col)

    result_df = df
    for name, exprs in zip(_VIOLATION_COLUMNS, (viol_all_exprs, viol_reject_exprs, viol_flag_exprs)):
        result_df = _with_violations_column(result_df, name, exprs)

    # Summaries and classification
    result_df = result_df.withColumn("violation_summary", F.concat_ws("; ", F.col("violations_all"))) \
                         .withColumn("is_bad", F.size(F.col("violations_reject")) > F.lit(0)) \
                         .withColumn("is_flagged", F.size(F.col("violations_flag")) > F.lit(0))

    bad_df = _join_violation_arrays(result_df.filter(F.col("is_bad")))
    good_df = _join_violation_arrays(result_df.filter(~F.col("is_bad")))

    summary_df = result_df.select(F.explode(F.col("violations_all")).alias("violation")) \
                          .groupBy("violation").agg(F.count("*").alias("count")) \
                          .orderBy(F.desc("count"))

    return result_df, good_df, bad_df, summary_df

# -------------------------------
# ADLS write helpers
# -------------------------------

def write_records_to_adls(bad_df: DataFrame, invalid_rec_adls_path: str,good_df: DataFrame, valid_rec_adls_path: str, mode: str = "append", header: bool = False):
    """
    Write bad and good records (including violation metadata) to ADLS as CSV.

    Bad records are written first. Good records are written only if that
    succeeds, so a failed bad-record write never leaves only the good half loaded.
    Failures are reported on stderr rather than raised.

    Args:
      bad_df: Records to write to ``invalid_rec_adls_path``.
      invalid_rec_adls_path: Target path for bad records, e.g.
        "abfss://dq@myaccount.dfs.core.windows.net/bad/my_dataset/my_table/"
      good_df: Records to write to ``valid_rec_adls_path``.
      valid_rec_adls_path: Target path for good records.
      mode: Spark save mode passed to ``DataFrameWriter.mode``.
      header: Write a header row with column names (default False, the
        previous behavior).

    Returns:
      True if both writes succeeded, False otherwise.
    """
    targets = (
        ("bad", bad_df, invalid_rec_adls_path),
        ("good", good_df, valid_rec_adls_path),
    )
    for label, frame, path in targets:
        try:
            frame.write.mode(mode).option("header", header).csv(path)
        except Exception as e:  # top-level boundary: report which write failed, don't crash
            print(f"Failed to write {label} records to ADLS path {path}: {e}", file=sys.stderr)
            return False
        print(f"Wrote {label} records to {path}")
    return True



StatementMeta(sparkpool1, 514, 37, Finished, Available, Finished)

In [24]:
# ADLS path of the DQ rules CSV (read with a header row), e.g.
# "abfss://<container>@<account>.dfs.core.windows.net/dq/rules/dq_rules.csv".
# The placeholder must be a string literal; a bare <...> is a Python syntax error.
dqm_file_path = "<provide your adls path here>"
if dqm_file_path.startswith("<"):
    # Fail fast with a clear message instead of an opaque Spark path error.
    raise ValueError("Set dqm_file_path to the ADLS path of the DQ rules file before running this cell.")
rules_df = spark.read.option('header','true').option('inferSchema','true').csv(dqm_file_path)

StatementMeta(sparkpool1, 514, 19, Finished, Available, Finished)