# Generate DQX Checks

## Reference

### DQProfiler

#### [Profiling Options](https://databrickslabs.github.io/dqx/docs/reference/profiler/#profiling-options)

| Option            | Default Value | Description                                                                 |
|-------------------|---------------|-----------------------------------------------------------------------------|
| `round`           | `True`        | Round min/max values for cleaner rules                                     |
| `max_in_count`    | `10`          | Generate `is_in` rule if distinct values < this count                      |
| `distinct_ratio`  | `0.05`        | Generate `is_in` rule if distinct values < 5% of total                     |
| `max_null_ratio`  | `0.01`        | Generate `is_not_null` rule if null values < 1% of total                   |
| `remove_outliers` | `True`        | Enable outlier detection for min/max rules                                 |
| `outlier_columns` | `[]`          | Specific columns for outlier detection (empty = all numeric)               |
| `num_sigmas`      | `3`           | Number of standard deviations for outlier detection                        |
| `trim_strings`    | `True`        | Trim whitespace from strings before analysis                               |
| `max_empty_ratio` | `0.01`        | Generate `is_not_null_or_empty` rule if empty strings < 1% of total        |
| `sample_fraction` | `0.3`         | Sample 30% of the data for profiling                                       |
| `sample_seed`     | `None`        | Seed for sampling (`None` = random)                                        |
| `limit`           | `1000`        | Maximum number of records to analyze                                       |


#### [Summary Statistics Reference](https://databrickslabs.github.io/dqx/docs/guide/data_profiling/#summary-statistics-reference)

| Field            | Meaning                                                  | Notes                                                                 |
|------------------|----------------------------------------------------------|-----------------------------------------------------------------------|
| `count`          | Rows actually profiled (after sampling and limit)        | ≈ min(`limit`, `sample_fraction` × total_rows)                        |
| `mean`           | Arithmetic average of non-null numeric values            | N/A for non-numeric                                                   |
| `stddev`         | Sample standard deviation of non-null numeric values     | N/A for non-numeric                                                   |
| `min`            | Smallest non-null value                                  | String = lexicographic; Date/Timestamp = earliest; Numeric = minimum |
| `25 / 50 / 75`   | Approximate 25th/50th/75th percentiles of numeric values | Uses Spark approximate quantiles                                     |
| `max`            | Largest non-null value                                   | String = lexicographic; Date/Timestamp = latest; Numeric = maximum   |
| `count_non_null` | Number of non-null entries within the profiled rows      |                                                                       |
| `count_null`     | Number of null entries within the profiled rows          | `count_non_null` + `count_null` = `count`                            |


In [0]:
#NOTE: https://databrickslabs.github.io/dqx/docs/guide/data_profiling/#profiling-options 
from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.sdk import WorkspaceClient

# Custom profiling options
# The keys used here correspond to the options listed in the "Profiling Options" table above.
custom_options = {
    # Sampling options
    "sample_fraction": 0.2,       # Sample 20% of the data
    "sample_seed": 42,            # Seed for reproducible sampling
    "limit": 2000,                # Limit to 2000 records after sampling
    
    # Outlier detection options
    "remove_outliers": True,      # Enable outlier detection for min/max rules
    "outlier_columns": ["price", "age"],  # Only detect outliers in specific columns
    "num_sigmas": 2.5,            # Use 2.5 standard deviations for outlier detection
    
    # Null value handling
    "max_null_ratio": 0.05,       # Generate is_not_null rule if <5% nulls
    
    # String handling
    "trim_strings": True,         # Trim whitespace from strings before analysis
    "max_empty_ratio": 0.02,      # Generate is_not_null_or_empty rule if <2% empty strings
    
    # Distinct value analysis
    "distinct_ratio": 0.01,       # Generate is_in rule if <1% distinct values
    "max_in_count": 20,           # Maximum items in is_in rule list
    
    # Value rounding
    "round": True,                # Round min/max values for cleaner rules
}

ws = WorkspaceClient()
profiler = DQProfiler(ws)

# Apply custom options for profiling a DataFrame
# NOTE: `input_df` is not defined in this cell; it must already exist in the notebook session.
summary_stats, profiles = profiler.profile(input_df, options=custom_options)

# Apply custom options for profiling a table
# NOTE: this call rebinds `summary_stats` and `profiles`, replacing the DataFrame results above.
summary_stats, profiles = profiler.profile_table(
    table="catalog1.schema1.table1",
    columns=["col1", "col2", "col3"],
    options=custom_options
)

In [0]:
from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.sdk import WorkspaceClient

ws = WorkspaceClient()
profiler = DQProfiler(ws)

# Explicit list of fully qualified tables to profile.
tables = [
  "dqx.bronze.table_001",
  "dqx.silver.table_001",
  "dqx.silver.table_002",
]

# Custom options with wildcard patterns
# Entries are listed from the most generic pattern to the most specific one.
custom_table_options = [
  {
    "table": "*",  # matches all tables by pattern
    "options": {"sample_fraction": 0.5}
  },
  {
    "table": "dqx.silver.*",  # matches tables in the 'dqx.silver' schema by pattern
    "options": {"num_sigmas": 5}
  },
  {
    "table": "dqx.silver.table_*",  # matches tables in 'dqx.silver' schema and having 'table_' prefix
    "options": {"num_sigmas": 5}
  },
  {
    "table": "dqx.silver.table_002",  # matches a specific table, overrides generic option
    "options": {"sample_fraction": 0.1}
  },
]

# Profile multiple tables using custom options
results = profiler.profile_tables(tables=tables, options=custom_table_options)

# Profile multiple tables by wildcard patterns using custom options
# NOTE: this rebinds `results`, replacing the output of the explicit-list call above.
results = profiler.profile_tables(
  patterns=["dqx.*"],
  options=custom_table_options
)

### Future Enhancements

In [0]:
# NOTE: When profiling large datasets, use sampling or limits for best performance (https://databrickslabs.github.io/dqx/docs/guide/data_profiling/#performance-considerations)

# Three alternative option presets keyed by dataset size.
# They are only defined in this cell; none of them is passed to a profiler here.

# For large datasets, use aggressive sampling
large_dataset_opts = {
    "sample_fraction": 0.01,  # Sample only 1% for very large datasets
    "limit": 10000,          # Increase limit for better statistical accuracy
    "sample_seed": 42,       # Use consistent seed for reproducible results
}

# For medium datasets, use moderate sampling
medium_dataset_opts = {
    "sample_fraction": 0.1,   # Sample 10%
    "limit": 5000,           # Reasonable limit
}

# For small datasets, disable sampling
small_dataset_opts = {
    "sample_fraction": None,  # Use all data
    "limit": None,           # No limit
}

## Implementation

In [0]:
%pip install databricks-labs-dqx==0.8.0

In [0]:
dbutils.library.restartPython()

### Generate & Load

In [0]:
# generate_checks.py
# Generate DQX checks from tables or YAML table-lists and write to YAML, table, or both.

import os
import re
import io
import json
import hashlib
import datetime
from typing import List, Optional, Dict, Any, Literal, Tuple

import yaml
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.dlt_generator import DQDltGenerator
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.config import (
    FileChecksStorageConfig,
    WorkspaceFileChecksStorageConfig,
    TableChecksStorageConfig,
    VolumeFileChecksStorageConfig,
)
from pyspark.sql import SparkSession, types as T
from pyspark.sql import DataFrame  # for type hints in show_df()

# Notebook env helper (prints banner in the notebook and returns a dict we can reuse)
from utils.print import print_notebook_env, get_notebook_path as _nb_path

# --------------------------------------------------------------------------------------
# Documentation dictionary for the generated checks table (apply on first create)
# --------------------------------------------------------------------------------------

# Documentation payload for the generated-checks table:
#   "table"         -> default table FQN
#   "table_comment" -> markdown text used as the table-level comment
#   "columns"       -> column name -> comment text
# The column keys mirror the field names of DQX_GENERATED_CHECKS_CONFIG_SCHEMA.
DQX_GENERATED_CHECKS_CONFIG_METADATA: Dict[str, Any] = {
    "table": "dq_dev.dqx.checks_generated_config",  # will be overridden by the actual FQN you pass
    "table_comment": (
        "## **DQX *Generated* Checks Configuration**\n"
        "- Stores flattened rules generated by the profiler.\n"
        "- Each row is a rule; `check_id` is a stable hash of the canonical payload.\n"
        "- `generator_meta` captures the profiler options and generator settings used to create these rows.\n"
    ),
    "columns": {
        # Identity
        "check_id": "SHA-256 **hash** of canonical payload (stable rule identity).",
        "check_id_payload": "Canonical **JSON** used to compute `check_id`.",
        "table_name": "Fully qualified **target table** (`catalog.schema.table`).",

        # Rule definition
        "name": "Human-readable **rule name**.",
        "criticality": "Rule severity: `warn|warning|error`.",
        "check": "Structured **check** object: `{function, for_each_column, arguments}`.",
        "filter": "Optional row-level **filter** expression.",
        "run_config_name": "**Execution group/tag** for this rule.",
        "user_metadata": "User-provided **metadata** `map<string,string>`.",

        # Operational fields
        "yaml_path": "YAML **file path** that held this rule (or `<generated://...>` if ephemeral).",
        "active": "If **false**, the rule is ignored by runners.",

        # Generator provenance (positioned after `active`, before the audit columns)
        "generator_meta": (
            "Array of two items: "
            "`[{section:'profile_options', payload:map}, {section:'generator_settings', payload:map}]`."
        ),

        # Audit columns
        "created_by": "Audit: **creator** of the row.",
        "created_at": "Audit: **creation timestamp** (UTC ISO).",
        "updated_by": "Audit: **last updater**.",
        "updated_at": "Audit: **last update timestamp**.",
    },
}

# Alias bound to the very same dict object (not a copy).
DQXGeneratedChecksConfig = DQX_GENERATED_CHECKS_CONFIG_METADATA

# --------------------------------------------------------------------------------------
# Schema (unified) for the generated checks table (adds generator_meta)
# --------------------------------------------------------------------------------------

# Spark schema of the generated-checks table. The third argument of each
# StructField is the `nullable` flag: False marks a required column.
DQX_GENERATED_CHECKS_CONFIG_SCHEMA = T.StructType([
    # Identity
    T.StructField("check_id",            T.StringType(),  False),
    T.StructField("check_id_payload",    T.StringType(),  False),
    T.StructField("table_name",          T.StringType(),  False),

    # DQX fields
    T.StructField("name",                T.StringType(),  False),
    T.StructField("criticality",         T.StringType(),  False),
    T.StructField("check", T.StructType([
        T.StructField("function",        T.StringType(),  False),
        T.StructField("for_each_column", T.ArrayType(T.StringType()), True),
        # Argument values are stored as strings (map<string,string>).
        T.StructField("arguments",       T.MapType(T.StringType(), T.StringType()), True),
    ]), False),
    T.StructField("filter",              T.StringType(),  True),
    T.StructField("run_config_name",     T.StringType(),  False),
    T.StructField("user_metadata",       T.MapType(T.StringType(), T.StringType()), True),

    # Ops
    T.StructField("yaml_path",           T.StringType(),  False),
    T.StructField("active",              T.BooleanType(), False),

    # Generator provenance, positioned between the ops columns and the audit columns
    T.StructField("generator_meta", T.ArrayType(T.StructType([
        T.StructField("section", T.StringType(), False),  # "profile_options" | "generator_settings"
        T.StructField("payload", T.MapType(T.StringType(), T.StringType()), True),
    ])), True),

    # Audit
    T.StructField("created_by",          T.StringType(),  False),
    T.StructField("created_at",          T.StringType(),  False),  # ISO string; cast downstream if needed
    T.StructField("updated_by",          T.StringType(),  True),
    T.StructField("updated_at",          T.StringType(),  True),
])

# --------------------------------------------------------------------------------------
# Constants / helpers (kept intact)
# --------------------------------------------------------------------------------------

# Profiler option names; the same twelve keys appear in the "Profiling Options"
# reference table at the top of this notebook.
DOC_SUPPORTED_KEYS = {
    "sample_fraction", "sample_seed", "limit",
    "remove_outliers", "outlier_columns", "num_sigmas",
    "max_null_ratio", "trim_strings", "max_empty_ratio",
    "distinct_ratio", "max_in_count", "round",
}

# Matches a trailing ".yml" or ".yaml" extension, case-insensitively.
_YAML_PATH_RE = re.compile(r"\.(ya?ml)$", re.IGNORECASE)
# Captures the catalog identifier (letters, digits, underscore only) in
# "FROM <catalog>.information_schema.tables".
_FROM_INFOSCHEMA = re.compile(r"FROM\s+([A-Za-z0-9_]+)\.information_schema\.tables", re.IGNORECASE)
# Captures the single-quoted value in "table_schema = '<schema>'".
_TABLE_SCHEMA_EQ = re.compile(r"table_schema\s*=\s*'([^']+)'", re.IGNORECASE)

def _is_yaml_path(p: str) -> bool:
    """Return True when *p* ends with a ``.yml`` or ``.yaml`` extension (any case)."""
    return _YAML_PATH_RE.search(p) is not None

def _esc_sql_comment(s: str) -> str:
    """Double every single quote in *s* so it can sit inside a '...' SQL literal.

    NOTE(review): backslashes are passed through untouched; confirm that the
    target SQL dialect does not treat them as escape characters.
    """
    return "''".join(s.split("'"))

def _safe_json(obj: Any) -> str:
    """Serialise *obj* to compact JSON with sorted keys (deterministic text)."""
    compact_separators = (",", ":")
    return json.dumps(obj, separators=compact_separators, sort_keys=True)

def _resolve_local_like_path(path: str) -> Optional[str]:
    """Locate *path* on the local filesystem, searching upwards from the CWD.

    The path is tried as given first. Otherwise it is joined onto the current
    working directory and then onto each successive parent directory, for at
    most six directories in total. Returns the absolute path of the first hit,
    or None when nothing matches.
    """
    if os.path.exists(path):
        return os.path.abspath(path)

    current_dir = os.getcwd()
    attempts_left = 6
    while attempts_left > 0:
        attempts_left -= 1
        candidate = os.path.abspath(os.path.join(current_dir, path))
        if os.path.exists(candidate):
            return candidate
        parent_dir = os.path.dirname(current_dir)
        if parent_dir == current_dir:
            # Reached the filesystem root; nothing further to climb.
            return None
        current_dir = parent_dir
    return None

def _read_text_any(path: str) -> str:
    """Read a UTF-8 text file from DBFS/Volumes, Workspace Files, or local disk.

    Dispatch is by path prefix:
      * ``dbfs:/``, ``/dbfs/`` or ``/Volumes/`` -> ``dbutils.fs.head``
      * any other absolute path (``/...``)      -> Databricks SDK files API
      * everything else                         -> local / repo-relative file

    Raises:
        RuntimeError: ``dbutils`` cannot be imported for a DBFS/Volumes path.
        FileNotFoundError: a local / repo-relative path cannot be resolved.
    """
    # DBFS / Volumes
    if path.startswith(("dbfs:/", "/dbfs/", "/Volumes/")):
        try:
            from databricks.sdk.runtime import dbutils
        except Exception as e:
            raise RuntimeError("dbutils is required to read from DBFS/Volumes") from e
        if path.startswith("dbfs:"):
            target = path
        elif path.startswith("/dbfs/"):
            # "/dbfs/x" is the driver-local mount of "dbfs:/x"; drop the mount
            # prefix instead of producing "dbfs:/dbfs/x".
            target = "dbfs:" + path[len("/dbfs"):]
        else:
            target = f"dbfs:{path}"
        # head() returns at most the requested number of bytes (10 MiB here).
        # NOTE(review): larger files would presumably come back truncated - confirm.
        return dbutils.fs.head(target, 10 * 1024 * 1024)

    # Workspace Files (absolute)
    if path.startswith("/"):
        wc = WorkspaceClient()
        try:
            resp = wc.files.download(file_path=path)
        except TypeError:
            # Fallback for a client whose download() takes `path=` instead.
            resp = wc.files.download(path=path)
        # NOTE(review): databricks-sdk wraps the byte stream in a response
        # object exposing it as `.contents`; fall back to the response itself
        # when it is already file-like. Confirm against the pinned SDK version.
        contents = getattr(resp, "contents", None)
        stream = contents if contents is not None else resp
        data = stream.read()
        return data.decode("utf-8")

    # Local / repo-relative
    resolved = _resolve_local_like_path(path)
    if resolved and os.path.isfile(resolved):
        with open(resolved, "r", encoding="utf-8") as fh:
            return fh.read()

    raise FileNotFoundError(f"Could not read file: {path}")

def _ensure_parent_local(path: str) -> None:
    """Create the parent directory of *path* when it has one and it is missing."""
    parent_dir = os.path.dirname(path)
    if not parent_dir or os.path.exists(parent_dir):
        return
    os.makedirs(parent_dir, exist_ok=True)

def _to_dbfs_target(path: str) -> str:
    """Normalise a DBFS-like path to the ``dbfs:/...`` URI form.

    * ``dbfs:/...``    -> returned unchanged
    * ``/dbfs/...``    -> ``dbfs:/...`` (the ``/dbfs`` mount prefix is dropped;
      keeping it would point at a directory literally named ``dbfs``)
    * ``/Volumes/...`` -> ``dbfs:/Volumes/...``
    * anything else    -> returned unchanged
    """
    if path.startswith("dbfs:/"):
        return path
    if path.startswith("/dbfs/"):
        return "dbfs:" + path[len("/dbfs"):]
    if path.startswith("/Volumes/"):
        return "dbfs:" + path
    return path

def _write_text_any(path: str, payload: str) -> None:
    """Write *payload* as UTF-8 text to DBFS/Volumes, Workspace Files, or local disk.

    Dispatch is by path prefix:
      * ``dbfs:/``, ``/dbfs/`` or ``/Volumes/`` -> ``dbutils.fs.put`` (overwrite)
      * any other absolute path (``/...``)      -> Databricks SDK files upload
      * everything else                         -> local file, parents created

    Raises:
        RuntimeError: ``dbutils`` cannot be imported for a DBFS/Volumes path.
    """
    # DBFS / Volumes
    if path.startswith(("dbfs:/", "/dbfs/", "/Volumes/")):
        try:
            from databricks.sdk.runtime import dbutils
        except Exception as e:
            # Keep the original import failure attached for diagnosis.
            raise RuntimeError("dbutils is required to write to DBFS/Volumes.") from e
        if path.startswith("dbfs:/"):
            target = path
        elif path.startswith("/dbfs/"):
            # "/dbfs/x" is the driver-local mount of "dbfs:/x"; drop the mount
            # prefix instead of producing "dbfs:/dbfs/x".
            target = "dbfs:" + path[len("/dbfs"):]
        else:
            target = f"dbfs:{path}"
        parent = target.rsplit("/", 1)[0]
        if parent:
            dbutils.fs.mkdirs(parent)
        dbutils.fs.put(target, payload, True)  # True = overwrite
        return

    # Workspace Files
    if path.startswith("/"):
        wc = WorkspaceClient()
        try:
            wc.files.upload(file_path=path, contents=payload.encode("utf-8"), overwrite=True)
        except TypeError:
            # Fallback for a client whose upload() takes `path=` instead.
            wc.files.upload(path=path, contents=payload.encode("utf-8"), overwrite=True)
        return

    # Local (Repos/driver)
    full = os.path.abspath(path)
    _ensure_parent_local(full)
    with open(full, "w", encoding="utf-8") as fh:
        fh.write(payload)

def _parse_global_hints_from_comments(text: str) -> Tuple[Optional[str], Optional[str]]:
    """Extract ``(catalog, schema)`` hints from *text*.

    The catalog comes from the first ``FROM <catalog>.information_schema.tables``
    occurrence and the schema from the first ``table_schema = '<schema>'``
    occurrence. Either element is None when its pattern is absent.
    """
    def _first_capture(pattern) -> Optional[str]:
        hit = pattern.search(text)
        if hit is None:
            return None
        return hit.group(1)

    catalog_hint = _first_capture(_FROM_INFOSCHEMA)
    schema_hint = _first_capture(_TABLE_SCHEMA_EQ)
    return (catalog_hint, schema_hint)

def _ensure_fqns(names: List[str], hint_catalog: Optional[str], hint_schema: Optional[str]) -> List[str]:
    """Expand table names to ``catalog.schema.table`` form.

    Blank entries are skipped. Two-part names get *hint_catalog* prepended;
    one-part names get both hints prepended. The result is de-duplicated and
    sorted.

    Raises:
        ValueError: a required hint is missing, or a name has more than three
            dot-separated parts.
    """
    resolved = set()
    for raw in names:
        name = raw.strip()
        if not name:
            continue
        part_count = name.count(".") + 1
        if part_count == 3:
            resolved.add(name)
        elif part_count == 2:
            if not hint_catalog:
                raise ValueError(f"'{name}' lacks catalog; add it or provide a comment default.")
            resolved.add(f"{hint_catalog}.{name}")
        elif part_count == 1:
            if not (hint_catalog and hint_schema):
                raise ValueError(f"'{name}' needs catalog & schema; add comments or use dotted forms.")
            resolved.add(f"{hint_catalog}.{hint_schema}.{name}")
        else:
            raise ValueError(f"Unrecognized table format: {name}")
    return sorted(resolved)

def _discover_tables_from_yaml_file(yaml_path: str) -> List[str]:
    """Load a table list from a YAML file and return fully qualified names.

    The YAML document must be a mapping. The first of the keys ``table_name``,
    ``tables``, ``table_names``, ``list`` whose value is a list supplies the
    names; falsy items are dropped. Catalog/schema defaults are taken from
    hints found in the raw file text.

    Raises:
        ValueError: the document is not a mapping, or no non-empty table list
            is found.
    """
    raw_text = _read_text_any(yaml_path)
    cat_hint, sch_hint = _parse_global_hints_from_comments(raw_text)

    document = yaml.safe_load(io.StringIO(raw_text))
    if not isinstance(document, dict):
        raise ValueError(f"YAML must contain a mapping with a list; got: {type(document).__name__}")

    candidate_keys = ("table_name", "tables", "table_names", "list")
    first_list = next(
        (document[key] for key in candidate_keys if isinstance(document.get(key), list)),
        None,
    )
    names = None
    if first_list is not None:
        names = [str(item).strip() for item in first_list if item]

    if not names:
        raise ValueError(f"No table list found in YAML: {yaml_path}")
    return _ensure_fqns(names, cat_hint, sch_hint)

def _prefix_of(table_fqn: str) -> str:
    """Return the lower-cased text before the first underscore of the table name.

    The table name is whatever follows the last dot of *table_fqn*; an empty
    table name yields an empty string.
    """
    table_part = table_fqn.rpartition(".")[2]
    if not table_part:
        return ""
    head, _, _ = table_part.partition("_")
    return head.lower()

def _filter_by_prefix_regex(tables: List[str], exclude_prefix_regex: Optional[str]) -> List[str]:
    """Drop tables whose name prefix matches *exclude_prefix_regex*.

    The regex is applied case-insensitively, using ``search``, to the prefix
    computed by ``_prefix_of``. When no regex is supplied the input list is
    returned as-is.
    """
    if not exclude_prefix_regex:
        return tables
    excluded = re.compile(exclude_prefix_regex, re.IGNORECASE)
    return [name for name in tables if not excluded.search(_prefix_of(name))]

def _display_table_preview(spark: SparkSession, fqns: List[str], title: str = "Resolved Tables") -> None:
    """Print a titled preview of *fqns* split into fqn/catalog/schema/table columns.

    Uses the notebook ``display`` function when that name is defined and falls
    back to ``DataFrame.show`` otherwise.
    """
    preview_rows = [tuple([name, *name.split(".")]) for name in fqns]
    preview_df = spark.createDataFrame(
        preview_rows,
        "fqn string, catalog string, schema string, table string",
    )
    print(f"\n=== {title} ({len(fqns)}) ===")
    try:
        display(preview_df)
    except NameError:
        # `display` is not defined in this namespace.
        preview_df.show(len(fqns), truncate=False)

def _now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    # datetime.utcnow() is deprecated since Python 3.12; take an aware UTC
    # timestamp and drop tzinfo so isoformat() emits no "+00:00" before the "Z".
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    return now_utc.replace(microsecond=0, tzinfo=None).isoformat() + "Z"

def _stringify_map_values(d: Optional[Dict[str, Any]]) -> Dict[str, str]:
    """Return a copy of *d* with every value rendered as a string.

    Lists and dicts become compact JSON, booleans become ``"true"``/``"false"``,
    None becomes ``"null"``, and anything else goes through ``str()``.
    A None or empty input yields an empty dict.
    """
    def _render(value: Any) -> str:
        if isinstance(value, (list, dict)):
            return _safe_json(value)
        if isinstance(value, bool):
            return "true" if value else "false"
        if value is None:
            return "null"
        return str(value)

    source = d or {}
    return {key: _render(value) for key, value in source.items()}

def _compute_check_id_payload(table_name: str, check_dict: Dict[str, Any], filter_str: Optional[str]) -> str:
    """Build the canonical JSON text that identifies a rule.

    The payload has three keys:
      * ``table_name``: lower-cased (empty string when falsy)
      * ``filter``: whitespace runs collapsed to single spaces (empty string when falsy)
      * ``check``: ``function`` as given, ``for_each_column`` as a sorted list of
        strings (None when absent or empty), and ``arguments`` with sorted keys
        and stringified values

    NOTE(review): argument values are rendered with ``str()``, so a Python list
    or bool (e.g. ``['a']``, ``True``) canonicalises differently from the
    equivalent JSON text (``["a"]``, ``true``). Confirm callers always pass the
    same representation, otherwise the same rule can yield different payloads.
    """
    def _canon_filter(s: Optional[str]) -> str:
        # str.split() with no argument also strips leading/trailing whitespace.
        return "" if not s else " ".join(str(s).split())

    def _canon_check(chk: Dict[str, Any]) -> Dict[str, Any]:
        out = {"function": chk.get("function"), "for_each_column": None, "arguments": {}}
        fec = chk.get("for_each_column")
        if isinstance(fec, list):
            # An empty list collapses to None via `or`.
            out["for_each_column"] = sorted([str(x) for x in fec]) or None
        args = chk.get("arguments") or {}
        canon_args: Dict[str, str] = {}
        for k, v in args.items():
            sv = "" if v is None else str(v).strip()
            # Text that looks like a JSON object/array is re-serialised so that
            # key order and spacing do not affect the payload.
            if (sv.startswith("{") and sv.endswith("}")) or (sv.startswith("[") and sv.endswith("]")):
                try:
                    sv = _safe_json(json.loads(sv))
                except Exception:
                    # Not valid JSON: keep the stripped text as-is.
                    pass
            canon_args[str(k)] = sv
        out["arguments"] = {k: canon_args[k] for k in sorted(canon_args)}
        return out

    payload_obj = {
        "table_name": (table_name or "").lower(),
        "filter": _canon_filter(filter_str),
        "check": _canon_check(check_dict or {}),
    }
    return _safe_json(payload_obj)

def _compute_check_id(payload: str) -> str:
    """Return the SHA-256 hex digest of *payload* encoded as UTF-8."""
    hasher = hashlib.sha256()
    hasher.update(payload.encode("utf-8"))
    return hasher.hexdigest()

# --------------------------------------------------------------------------------------
# Public helper names (mostly thin wrappers around the private helpers above)
# --------------------------------------------------------------------------------------

def DisplayTablePreview(spark: SparkSession, fqns: List[str], title: str = "Resolved Tables") -> None:
    """Public alias for ``_display_table_preview``; forwards all arguments unchanged."""
    return _display_table_preview(spark, fqns, title)

def NowISO() -> str:
    """Public alias for ``_now_iso``."""
    return _now_iso()

def GetNotebookPath() -> str:
    """Return the result of ``get_notebook_path`` from ``utils.print`` (imported as ``_nb_path``)."""
    return _nb_path()

def PathStartsWith(s: str, *prefixes: str) -> bool:
    """Return True when *s* starts with at least one of *prefixes*.

    With no prefixes supplied the result is False.
    """
    # str.startswith accepts a tuple of candidates and tests them in order.
    return s.startswith(prefixes)

def ResolveLocalLikePath(p: str) -> Optional[str]:
    """Public alias for ``_resolve_local_like_path``."""
    return _resolve_local_like_path(p)

def DiscoverTablesFromYAML(yaml_path: str) -> List[str]:
    """Public alias for ``_discover_tables_from_yaml_file``."""
    return _discover_tables_from_yaml_file(yaml_path)

def ListYAMLPathsInFolder(folder: str) -> List[str]:
    """Recursively collect ``.yml`` / ``.yaml`` file paths under *folder*.

    DBFS and Volumes folders (``dbfs:/``, ``/dbfs/``, ``/Volumes/``) are walked
    with ``dbutils.fs.ls``. Every other folder, absolute or relative, is
    resolved on the local filesystem and walked with ``os.walk``. A folder that
    cannot be resolved yields an empty list.

    Raises:
        RuntimeError: ``dbutils`` cannot be imported for a DBFS/Volumes folder.
    """
    found: List[str] = []

    # DBFS/Volumes
    if folder.startswith(("dbfs:/", "/dbfs/", "/Volumes/")):
        try:
            from databricks.sdk.runtime import dbutils
        except Exception:
            raise RuntimeError("dbutils is required to traverse DBFS/Volumes.")

        def _collect_dbfs(dirpath: str):
            for entry in dbutils.fs.ls(dirpath):
                entry_path = entry.path
                if entry_path.endswith("/"):
                    # Trailing slash is treated as "directory": recurse into it.
                    _collect_dbfs(entry_path)
                elif _is_yaml_path(entry_path):
                    found.append(entry_path)

        _collect_dbfs(_to_dbfs_target(folder))
        return found

    # Workspace-absolute and local/repo folders share the same best-effort
    # local resolution and directory walk.
    local_root = _resolve_local_like_path(folder)
    if local_root and os.path.isdir(local_root):
        for dirpath, _, filenames in os.walk(local_root):
            found.extend(
                os.path.join(dirpath, filename)
                for filename in filenames
                if _is_yaml_path(filename)
            )
    return found

# --------------------------------------------------------------------------------------
# Pretty display helpers (as requested)
# --------------------------------------------------------------------------------------

def _can_display() -> bool:
    """Return True when a ``display`` name exists in this module's globals."""
    module_namespace = globals()
    return "display" in module_namespace

def show_df(df: DataFrame, n: int = 100, truncate: bool = False) -> None:
    """Render up to *n* rows of *df*.

    Uses ``display`` when ``_can_display()`` reports it is available and
    ``df.show`` otherwise; *truncate* only applies to the ``show`` path.
    """
    if not _can_display():
        df.show(n, truncate=truncate)
        return
    display(df.limit(n))

def display_section(title: str) -> None:
    """Print *title* framed by two 80-character rules, preceded by a blank line."""
    rule = "═" * 80
    print("\n" + rule, f"║ {title}", rule, sep="\n")

# --------------------------------------------------------------------------------------
# Column comment helper with robust multi-syntax fallback
# --------------------------------------------------------------------------------------

def _apply_column_comment_with_fallback(
    spark: SparkSession,
    cat: str,
    sch: str,
    tbl: str,
    col_name: str,
    comment_text: str,
    col_types_lower: Dict[str, str],
) -> bool:
    """Set a column comment, trying three SQL syntaxes in turn.

    Order of attempts:
      1. ``COMMENT ON COLUMN``
      2. ``ALTER TABLE ... ALTER COLUMN ... COMMENT``
      3. ``ALTER TABLE ... CHANGE COLUMN col col <type> COMMENT`` - needs the
         column type, looked up in *col_types_lower* by lower-cased name.

    Failures of the first two attempts are ignored. Returns True as soon as
    one statement succeeds and False (after printing a warning) when the type
    is unknown or the last attempt fails.
    """
    table_ref = f"`{cat}`.`{sch}`.`{tbl}`"
    column_ref = f"`{col_name}`"
    escaped = _esc_sql_comment(comment_text)

    # Attempts 1 and 2 need no type information.
    type_free_statements = (
        f"COMMENT ON COLUMN {table_ref}.{column_ref} IS '{escaped}'",
        f"ALTER TABLE {table_ref} ALTER COLUMN {column_ref} COMMENT '{escaped}'",
    )
    for statement in type_free_statements:
        try:
            spark.sql(statement)
        except Exception:
            continue  # fall through to the next syntax
        return True

    # Attempt 3 restates the column type alongside the comment.
    dtype = col_types_lower.get(col_name.lower())
    if not dtype:
        print(f"[WARN] Cannot determine data type for {cat}.{sch}.{tbl}.{col_name}; skipping column comment.")
        return False
    try:
        spark.sql(f"ALTER TABLE {table_ref} CHANGE COLUMN {column_ref} {column_ref} {dtype} COMMENT '{escaped}'")
    except Exception as err:
        print(f"[WARN] Failed to set comment for {cat}.{sch}.{tbl}.{col_name}: {err}")
        return False
    return True

# --------------------------------------------------------------------------------------
# Table documentation application
#  - Table comment is applied only when the table was just created
#  - Column comments are always applied, using the multi-syntax fallback helper
# --------------------------------------------------------------------------------------

def _apply_table_documentation_on_create(spark: SparkSession, table_fqn: str, doc: Dict[str, Any], just_created: bool):
    """Apply table and column comments from *doc* to *table_fqn*.

    *table_fqn* must have exactly three dot-separated parts; otherwise the
    function returns without doing anything. The table comment
    (``doc["table_comment"]``) is written only when *just_created* is true.
    Column comments (``doc["columns"]``) are written on every call, but only
    for columns that ``DESCRIBE TABLE`` reports (compared case-insensitively).
    """
    parts = table_fqn.split(".")
    if len(parts) != 3:
        return
    cat, sch, tbl = parts

    doc_map = doc or {}
    table_ref = f"`{cat}`.`{sch}`.`{tbl}`"

    table_comment = doc_map.get("table_comment") or ""
    if just_created and table_comment:
        spark.sql(
            f"COMMENT ON TABLE {table_ref} IS '{_esc_sql_comment(table_comment)}'"
        )

    cols_doc: Dict[str, str] = doc_map.get("columns") or {}
    if not cols_doc:
        return

    # Collect the existing column names and their types (for the fallback syntax).
    known_columns = set()
    column_types = {}
    for row in spark.sql(f"DESCRIBE TABLE {table_ref}").collect():
        name = row.col_name
        # Skip blank rows and "#"-prefixed section rows of the DESCRIBE output.
        if not name or name.startswith("#"):
            continue
        key = name.lower()
        known_columns.add(key)
        # Some rows may not carry a data_type; guard defensively.
        dtype = getattr(row, "data_type", None)
        if dtype:
            column_types[key] = dtype

    # Comment only the documented columns that actually exist.
    for col_name, cmt in cols_doc.items():
        if col_name.lower() in known_columns:
            _apply_column_comment_with_fallback(
                spark, cat, sch, tbl, col_name, cmt, col_types_lower=column_types
            )

# --------------------------------------------------------------------------------------
# Main generator
# --------------------------------------------------------------------------------------

class CheckGenerator:
    def __init__(
        self,
        scope: str,
        source: str,
        output_format: str,
        output_yaml: Optional[str],
        output_table: Optional[str],
        profile_options: Dict[str, Any],
        exclude_prefix_regex: Optional[str] = None,
        created_by: Optional[str] = "LMG",
        columns: Optional[List[str]] = None,
        run_config_name: str = "default",
        criticality: str = "warn",
        key_order: Literal["engine", "custom"] = "custom",
        include_table_name: bool = True,
        yaml_metadata: bool = False,
        table_doc: Optional[Dict[str, Any]] = None,
    ):
        """Store the configuration, bind a Spark session, then validate.

        Args:
            scope: "pipeline" | "catalog" | "schema" | "table" | "file" | "folder"
                (lower-cased and stripped before use).
            source: what to scan; its expected shape depends on `scope`.
            output_format: "yaml" | "table" | "both" (lower-cased and stripped).
            output_yaml: YAML destination (folder or file path).
            output_table: fully-qualified destination table.
            profile_options: profiler options; a falsy value becomes `{}`.
            exclude_prefix_regex: pattern handed to `_filter_by_prefix_regex`.
            created_by: value stored in the `created_by` field of table rows.
            columns: passed to the profiler as `cols` when not None.
            run_config_name: run configuration name stamped on every check.
            criticality: criticality stamped on every check.
            key_order: "engine" writes YAML through the DQX engine; any other
                value uses this class's own YAML writer.
            include_table_name: whether each check carries a `table_name` key.
            yaml_metadata: whether to prepend a commented header to each YAML.
            table_doc: documentation applied to the output table; defaults to
                `DQX_GENERATED_CHECKS_CONFIG_METADATA`.

        Raises:
            ValueError: from `_validate_top_level` on an invalid combination.
        """
        # What to scan.
        self.scope, self.source = scope.lower().strip(), source

        # Where the generated checks go.
        self.output_format = output_format.lower().strip()
        self.output_yaml, self.output_table = output_yaml, output_table

        # Profiling and discovery filters.
        self.profile_options = profile_options or {}
        self.exclude_prefix_regex = exclude_prefix_regex
        self.columns = columns

        # Values stamped on every generated check / row.
        self.created_by = created_by
        self.run_config_name = run_config_name
        self.criticality = criticality

        # Output shaping.
        self.key_order = key_order
        self.include_table_name = include_table_name
        self.yaml_metadata = yaml_metadata
        self.table_doc = table_doc or DQX_GENERATED_CHECKS_CONFIG_METADATA

        # The session is bound before validation, as in the original ordering.
        self.spark = SparkSession.getActiveSession() or SparkSession.builder.getOrCreate()
        self._validate_top_level()

    # -------------------
    # Validation
    # -------------------
    def _validate_top_level(self):
        allowed_scopes = {"pipeline", "catalog", "schema", "table", "file", "folder"}
        if self.scope not in allowed_scopes:
            raise ValueError(f"Invalid scope '{self.scope}'. Must be one of: {sorted(allowed_scopes)}.")

        allowed_formats = {"yaml", "table", "both"}
        if self.output_format not in allowed_formats:
            raise ValueError("output_format must be 'yaml' or 'table' or 'both'.")

        # Source expectations per scope (explicit notes)
        # scope="pipeline" -> source="pipeline_name1,pipeline_name2"
        # scope="catalog"  -> source="catalog"
        # scope="schema"   -> source="catalog.schema"
        # scope="table"    -> source="catalog.schema.table[,catalog.schema.table]"
        # scope="file"     -> source="<path to YAML file listing tables>"
        # scope="folder"   -> source="<path to folder with YAML table lists>"
        if self.scope == "catalog":
            if "." in self.source:
                raise ValueError("For scope='catalog', pass just the catalog name (no dots).")
        if self.scope == "schema":
            if self.source.count(".") != 1:
                raise ValueError("For scope='schema', pass 'catalog.schema'.")
        if self.scope == "table":
            for t in [x.strip() for x in self.source.split(",") if x.strip()]:
                if t.count(".") != 2:
                    raise ValueError(f"Invalid table FQN '{t}'. Use catalog.schema.table")
        if self.scope == "file":
            if not _is_yaml_path(self.source):
                raise ValueError("For scope='file', source must be a YAML path.")
        # folder: any path is ok; we'll scan recursively

        # Sinks
        if self.output_format == "yaml" and not self.output_yaml:
            raise ValueError("output_yaml is required when output_format='yaml'.")
        if self.output_format == "table" and not self.output_table:
            raise ValueError("output_table is required when output_format='table'.")
        if self.output_format == "both":
            if not self.output_yaml or not self.output_table:
                raise ValueError("When output_format='both', both output_yaml and output_table are required.")

    # -------------------
    # Discovery
    # -------------------
    def _walk_yaml_files(self, folder: str) -> List[str]:
        """Return the YAML paths that `ListYAMLPathsInFolder` reports for `folder`."""
        yaml_paths = ListYAMLPathsInFolder(folder)
        return yaml_paths

    def _discover_tables(self) -> List[str]:
        """Resolve `self.scope` / `self.source` into the table names to process.

        Prints the run parameters, collects candidate names for the scope,
        drops duplicates (keeping first-seen order), applies
        `_filter_by_prefix_regex` and previews the final list.

        Returns:
            The de-duplicated, filtered table names.

        Raises:
            RuntimeError: for scope="pipeline" when a named pipeline is not
                found or has no recorded update.
        """
        print("\n===== PARAMETERS PASSED THIS RUN =====")
        print(f"scope:            {self.scope}")
        print(f"source:           {self.source}")
        print(f"output_format:    {self.output_format}")
        print(f"output_yaml:      {self.output_yaml}")
        print(f"output_table:     {self.output_table}")
        print(f"exclude_prefix_rx:{self.exclude_prefix_regex}")
        print(f"created_by:       {self.created_by}")
        print(f"columns:          {self.columns}")
        print(f"run_config_name:  {self.run_config_name}")
        print(f"criticality:      {self.criticality}")
        print(f"key_order:        {self.key_order}")
        print(f"include_table_name: {self.include_table_name}")
        print(f"yaml_metadata:    {self.yaml_metadata}")
        print("profile_options:")
        for k, v in self.profile_options.items():
            print(f"  {k}: {v}")
        print("======================================\n")

        discovered: List[str] = []

        if self.scope == "pipeline":
            print("Searching for pipeline output tables...")
            ws = WorkspaceClient()
            pipeline_names = [p.strip() for p in self.source.split(",") if p.strip()]
            # The listing does not depend on the requested name, so fetch it once
            # instead of once per pipeline.
            all_pipelines = list(ws.pipelines.list_pipelines()) if pipeline_names else []
            for pipeline_name in pipeline_names:
                pl = next((p for p in all_pipelines if p.name == pipeline_name), None)
                if not pl:
                    raise RuntimeError(f"Pipeline '{pipeline_name}' not found via SDK.")
                if not pl.latest_updates:
                    # Without an update there is no event stream to read flow names from.
                    raise RuntimeError(
                        f"Pipeline '{pipeline_name}' has no updates; cannot determine its output tables."
                    )
                latest_update = pl.latest_updates[0].update_id
                events = ws.pipelines.list_pipeline_events(pipeline_id=pl.pipeline_id, max_results=250)
                for ev in events:
                    flow_name = getattr(ev.origin, "flow_name", None)
                    if flow_name and getattr(ev.origin, "update_id", None) == latest_update:
                        discovered.append(flow_name)

        elif self.scope == "catalog":
            print("Discovering all tables in catalog...")
            catalog = self.source.strip()
            # Read the schema name positionally: the output column of SHOW SCHEMAS
            # is not named the same on every runtime.
            schemas = [row[0] for row in self.spark.sql(f"SHOW SCHEMAS IN {catalog}").collect()]
            for s in schemas:
                tbls = self.spark.sql(f"SHOW TABLES IN {catalog}.{s}").collect()
                discovered += [f"{catalog}.{s}.{row.tableName}" for row in tbls]

        elif self.scope == "schema":
            print("Discovering all tables in schema...")
            catalog, schema = self.source.strip().split(".")
            tbls = self.spark.sql(f"SHOW TABLES IN {catalog}.{schema}").collect()
            discovered = [f"{catalog}.{schema}.{row.tableName}" for row in tbls]

        elif self.scope == "table":
            print("Using provided fully-qualified table(s)...")
            discovered = [t.strip() for t in self.source.split(",") if t.strip()]

        elif self.scope == "file":
            print("Reading table list from YAML file...")
            discovered = _discover_tables_from_yaml_file(self.source)

        else:  # folder
            print("Reading table lists from all YAML files in folder (recursive)...")
            yaml_files = self._walk_yaml_files(self.source)
            agg: List[str] = []
            for yp in yaml_files:
                try:
                    agg += _discover_tables_from_yaml_file(yp)
                except Exception as e:
                    print(f"[WARN] Skipping YAML '{yp}': {e}")
            discovered = sorted(set(agg))

        # A pipeline flow emits many events, so the same name can appear many
        # times; process every table once, in first-seen order.
        discovered = list(dict.fromkeys(discovered))

        discovered = _filter_by_prefix_regex(discovered, self.exclude_prefix_regex)
        _display_table_preview(self.spark, discovered, title="Final table list to generate DQX rules for")
        print("==========================================\n")
        return discovered

    # -------------------
    # Profiler call args
    # -------------------
    def _profile_call_kwargs(self) -> Dict[str, Any]:
        kwargs: Dict[str, Any] = {}
        if self.columns is not None:
            kwargs["cols"] = self.columns
        if self.profile_options:
            unknown = sorted(set(self.profile_options) - DOC_SUPPORTED_KEYS)
            if unknown:
                print(f"[INFO] Profiling options not in current docs (passing through anyway): {unknown}")
            kwargs["options"] = self.profile_options
        return kwargs

    # -------------------
    # Rule shaping
    # -------------------
    def _dq_constraint_to_check(self, rule_name: str, constraint_sql: str, table_name: str) -> Dict[str, Any]:
        d = {
            "name": rule_name,
            "criticality": self.criticality,
            "run_config_name": self.run_config_name,
            "check": {
                "function": "sql_expression",
                "arguments": {
                    # remove duplicate 'name' from arguments per your preference
                    "expression": constraint_sql,
                }
            },
        }
        if self.include_table_name:
            d = {"table_name": table_name, **d}  # keep table_name first
        return d

    # -------------------
    # YAML emission (header + list items with blank lines)
    # -------------------
    def _yaml_header_block(self, table_fqn: str, env_info: Dict[str, Any]) -> str:
        """Render the commented metadata header placed at the top of a YAML file.

        Args:
            table_fqn: table the file was generated for.
            env_info: environment details; missing keys render as empty text
                (`notebook_path` renders as "Unknown").

        Returns:
            The header lines joined with newlines, followed by one blank line.
        """
        banner = "#" * 76
        dashed = "-" * 81
        profile_json = _safe_json(self.profile_options)
        settings_json = _safe_json({
            "scope": self.scope,
            "source": self.source,
            "output_format": self.output_format,
            "output_yaml": self.output_yaml,
            "output_table": self.output_table,
            "criticality": self.criticality,
            "run_config_name": self.run_config_name,
            "include_table_name": self.include_table_name,
            "key_order": self.key_order,
            "exclude_prefix_regex": self.exclude_prefix_regex,
        })

        header_lines = [
            banner,
            "# GENERATED DQX CHECKS",
            f"# Table: {table_fqn}",
            f"# Generated at (UTC): {env_info.get('utc_time','')}",
            f"# Notebook: {env_info.get('notebook_path','Unknown')}",
            f"# Spark: {env_info.get('spark_version','')}  |  Python: {env_info.get('python_version','')}",
            f"# Cluster: {env_info.get('cluster_name','')} ({env_info.get('cluster_id','')})  |  Executor memory: {env_info.get('executor_memory','')}",
            banner,
            "",
            f"# {dashed}",
            "# Profile options:",
            "# " + profile_json,
            "# Generator settings:",
            "# " + settings_json,
        ]
        return "\n".join(header_lines) + "\n\n"

    def _dump_rules_as_yaml_stream(self, rules: List[Dict[str, Any]]) -> str:
        """Serialize `rules` as one YAML list with a blank line between items.

        Each rule is dumped on its own (key order preserved), its first line is
        prefixed with "- " and the remaining lines are indented two spaces so
        the pieces form a single top-level list:

            - table_name: ...
              name: ...

        Returns:
            The YAML text ending in a newline ("\\n" alone when nothing was emitted).
        """
        items: List[str] = []
        for rule in rules:
            dumped = yaml.safe_dump(rule, sort_keys=False, default_flow_style=False).rstrip()
            dumped_lines = dumped.splitlines()
            if not dumped_lines:
                continue
            item_lines = [f"- {dumped_lines[0]}"]
            item_lines.extend("  " + ln for ln in dumped_lines[1:])
            items.append("\n".join(item_lines))
        return "\n\n".join(items) + "\n"

    # -------------------
    # Table write helpers
    # -------------------
    def _ensure_schema_exists(self, fqn: str):
        """Create the schema of `fqn` (`catalog.schema.table`) if it is missing."""
        catalog_name, schema_name, _table = fqn.split(".")
        statement = f"CREATE SCHEMA IF NOT EXISTS `{catalog_name}`.`{schema_name}`"
        self.spark.sql(statement)

    def _write_rows_to_table(self, fqn: str, rows: List[Dict[str, Any]], mode: str = "append"):
        """Write `rows` to the Delta table `fqn`, creating and documenting it first.

        Args:
            fqn: destination table as `catalog.schema.table`.
            rows: row dicts matching `DQX_GENERATED_CHECKS_CONFIG_SCHEMA`.
            mode: Spark save mode used for the data write.
        """
        self._ensure_schema_exists(fqn)

        is_new_table = not self.spark.catalog.tableExists(fqn)
        if is_new_table:
            # Materialize an empty table with the target schema so the comments
            # can be attached before any data is written.
            placeholder_df = self.spark.createDataFrame([], DQX_GENERATED_CHECKS_CONFIG_SCHEMA)
            placeholder_df.write.format("delta").mode("overwrite").saveAsTable(fqn)

        # Table comment only for a new table; column comments on every call.
        documentation = {**self.table_doc, "table": fqn}
        _apply_table_documentation_on_create(self.spark, fqn, documentation, just_created=is_new_table)

        rows_df = self.spark.createDataFrame(rows, schema=DQX_GENERATED_CHECKS_CONFIG_SCHEMA)
        rows_df.write.format("delta").mode(mode).saveAsTable(fqn)
        print(f"[WRITE] {len(rows)} rows -> {fqn} ({mode})")

    # -------------------
    # Summary display
    # -------------------
    def _show_summary_table(self, summary: Dict[str, Dict[str, Any]]):
        """Display one line per processed table with its check and write counts.

        Args:
            summary: per-table stats keyed by table name; missing stats default
                to 0 / False / None.
        """
        if not summary:
            display_section("Checks written per table")
            print("(no tables processed)")
            return

        sink_table = self.output_table or ""
        records = [
            (
                stats.get("table_name", table_key),
                int(stats.get("checks_generated", 0)),
                bool(stats.get("wrote_yaml", False)),
                stats.get("yaml_path", None),
                int(stats.get("table_rows_written", 0)),
                sink_table,
            )
            for table_key, stats in summary.items()
        ]
        schema = "table_name string, checks_generated int, wrote_yaml boolean, yaml_path string, table_rows_written int, output_table string"
        summary_df = self.spark.createDataFrame(records, schema=schema)
        display_section("Checks written per table")
        show_df(summary_df.orderBy("table_name"))

    # -------------------
    # Main
    # -------------------
    def run(self):
        """Profile each discovered table, shape DQX checks and write the sinks.

        Per table: probe readability, profile, generate DLT rules, convert them
        to check dicts, then
          * "yaml"/"both": write one YAML file (the DQX engine writer when
            `key_order == "engine"`, otherwise this class's own writer);
          * "table": build table rows straight from the in-memory checks.
        For "both", the YAML files just written are read back and their
        contents become the table rows, so the table matches the files.
        Tables that cannot be read or profiled are skipped with a warning.
        A per-table summary is displayed at the end.
        """
        # One client is enough for all three DQX components.
        ws = WorkspaceClient()
        dq_engine = DQEngine(ws)
        profiler = DQProfiler(ws)
        generator = DQDltGenerator(ws)

        env_info = print_notebook_env(self.spark)  # prints banner and returns dict
        call_kwargs = self._profile_call_kwargs()
        tables = self._discover_tables()

        all_rows_for_table_sink: List[Dict[str, Any]] = []
        written_yaml_paths: List[str] = []
        # Which table each YAML file was written for; used on reload when a
        # rule carries no `table_name` (e.g. include_table_name=False).
        yaml_path_to_table: Dict[str, str] = {}
        per_table_summary: Dict[str, Dict[str, Any]] = {}

        writes_yaml = self.output_format in {"yaml", "both"}

        for fq in tables:
            if fq.count(".") != 2:
                print(f"[WARN] Skipping invalid table name: {fq}")
                continue

            tab = fq.split(".")[2]
            per_table_summary.setdefault(fq, {
                "table_name": fq,
                "checks_generated": 0,
                "wrote_yaml": False,
                "yaml_path": None,
                "table_rows_written": 0,
            })

            try:
                self.spark.table(fq).limit(1).collect()  # readability probe
            except Exception as e:
                print(f"[WARN] Table not readable: {fq} -> {e}")
                continue

            # Profile & generate DLT rules
            try:
                df = self.spark.table(fq)
                _, profiles = profiler.profile(df, **call_kwargs)
                rules_dict = generator.generate_dlt_rules(profiles, language="Python_Dict")
            except Exception as e:
                print(f"[WARN] Profiling/rule-gen failed for {fq}: {e}")
                continue

            checks: List[Dict[str, Any]] = [
                self._dq_constraint_to_check(rule_name, constraint_sql, fq)
                for rule_name, constraint_sql in (rules_dict or {}).items()
            ]
            per_table_summary[fq]["checks_generated"] = len(checks)

            if not checks:
                print(f"[INFO] No checks generated for {fq}.")
                continue

            # YAML sink
            yaml_path_for_rows: str = f"<generated://{fq}>"
            if writes_yaml:
                if self.output_yaml.endswith((".yaml", ".yml")):
                    path = self.output_yaml  # explicit file path (edge case)
                else:
                    path = f"{self.output_yaml.rstrip('/')}/{tab}.yaml"

                if self.key_order == "engine":
                    cfg = self._infer_file_storage_config(path)
                    dq_engine.save_checks(checks, config=cfg)  # writes list-of-dicts
                else:
                    header = self._yaml_header_block(fq, env_info) if self.yaml_metadata else ""
                    body = self._dump_rules_as_yaml_stream(checks)
                    _write_text_any(path, header + body)

                yaml_path_for_rows = path
                # Record each file once so the BOTH reload cannot insert the
                # same file's rules several times.
                if path not in yaml_path_to_table:
                    written_yaml_paths.append(path)
                yaml_path_to_table[path] = fq
                per_table_summary[fq]["wrote_yaml"] = True
                per_table_summary[fq]["yaml_path"] = path
                print(f"[RUN] Wrote {len(checks)} rule(s) to YAML: {path}")

            # TABLE-only path builds rows now; BOTH reloads the YAMLs below.
            if self.output_format == "table":
                all_rows_for_table_sink.extend(
                    self._check_to_sink_row(rule, fq, yaml_path_for_rows) for rule in checks
                )

        # BOTH → reload the exact YAMLs we wrote and write those rows (canonical)
        if self.output_format == "both":
            rows_from_yaml: List[Dict[str, Any]] = []
            for yp in written_yaml_paths:
                try:
                    txt = _read_text_any(yp)
                    docs = list(yaml.safe_load_all(io.StringIO(txt)))
                    rules: List[dict] = []
                    for d in docs:
                        if not d:
                            continue
                        if isinstance(d, dict):
                            rules.append(d)
                        elif isinstance(d, list):
                            # Our own writer emits a single list document.
                            rules.extend([x for x in d if isinstance(x, dict)])

                    for r in rules:
                        # Fall back to the table the file was generated for
                        # when the rule itself does not name one.
                        fq = r.get("table_name") or yaml_path_to_table.get(yp)
                        rows_from_yaml.append(self._check_to_sink_row(r, fq, yp))
                        entry = per_table_summary.setdefault(fq, {
                            "table_name": fq,
                            "checks_generated": 0,
                            "wrote_yaml": True,
                            "yaml_path": yp,
                            "table_rows_written": 0,
                        })
                        entry["table_rows_written"] = entry.get("table_rows_written", 0) + 1
                except Exception as e:
                    print(f"[WARN] Could not load back YAML '{yp}' for table sink: {e}")

            if self.output_table and rows_from_yaml:
                self._write_rows_to_table(self.output_table, rows_from_yaml, mode="append")
            print(f"[DONE] Wrote YAML files ({len(written_yaml_paths)}). Then loaded {len(rows_from_yaml)} rows into {self.output_table}.")

        elif self.output_format == "table":
            if self.output_table and all_rows_for_table_sink:
                # update per-table counts before write
                for r in all_rows_for_table_sink:
                    fq = r["table_name"]
                    per_table_summary.setdefault(fq, {
                        "table_name": fq,
                        "checks_generated": 0,
                        "wrote_yaml": False,
                        "yaml_path": None,
                        "table_rows_written": 0,
                    })
                    per_table_summary[fq]["table_rows_written"] = per_table_summary[fq].get("table_rows_written", 0) + 1

                self._write_rows_to_table(self.output_table, all_rows_for_table_sink, mode="append")
            print(f"[DONE] Wrote {len(all_rows_for_table_sink)} rows into {self.output_table}.")
        else:
            print(f"[DONE] Wrote YAML files ({len(written_yaml_paths)}).")

        # Print a nice per-table summary
        self._show_summary_table(per_table_summary)

    def _check_to_sink_row(self, rule: Dict[str, Any], table_fqn: Optional[str], yaml_path: str) -> Dict[str, Any]:
        """Convert one check dict into a row dict for the output table.

        Args:
            rule: check dict (freshly generated or reloaded from YAML).
            table_fqn: table the check applies to.
            yaml_path: value stored in the row's `yaml_path` field.
        """
        raw_check = rule.get("check") or {}
        run_config_name = rule.get("run_config_name", self.run_config_name)
        payload = _compute_check_id_payload(table_fqn, raw_check, rule.get("filter"))
        generator_meta = [
            {"section": "profile_options", "payload": _stringify_map_values(self.profile_options)},
            {"section": "generator_settings", "payload": _stringify_map_values({
                "scope": self.scope, "source": self.source, "output_format": self.output_format,
                "output_yaml": self.output_yaml or "", "output_table": self.output_table or "",
                "criticality": self.criticality,
                "run_config_name": run_config_name,
                "include_table_name": self.include_table_name, "key_order": self.key_order,
                "exclude_prefix_regex": self.exclude_prefix_regex or "",
            })},
        ]
        return {
            "check_id": _compute_check_id(payload),
            "check_id_payload": payload,
            "table_name": table_fqn,
            "name": rule.get("name"),
            "criticality": rule.get("criticality"),
            "check": {
                "function": raw_check.get("function"),
                "for_each_column": raw_check.get("for_each_column"),
                "arguments": _stringify_map_values(raw_check.get("arguments") or {}),
            },
            "filter": rule.get("filter"),
            "run_config_name": run_config_name,
            "user_metadata": _stringify_map_values(rule.get("user_metadata") or None) or None,
            "yaml_path": yaml_path,
            "active": True,
            "generator_meta": generator_meta,
            "created_by": self.created_by,
            "created_at": _now_iso(),
            "updated_by": None,
            "updated_at": None,
        }

    # Storage config passthrough (kept)
    @staticmethod
    def _infer_file_storage_config(file_path: str):
        """Pick the checks-storage config class from the shape of `file_path`.

        "/Volumes/..." selects the volume config, any other absolute path the
        workspace-file config, and everything else the plain file config.
        """
        if file_path.startswith("/Volumes/"):
            config_cls = VolumeFileChecksStorageConfig
        elif file_path.startswith("/"):
            config_cls = WorkspaceFileChecksStorageConfig
        else:
            config_cls = FileChecksStorageConfig
        return config_cls(location=file_path)

    @staticmethod
    def _table_storage_config(table_fqn: str, run_config_name: Optional[str] = None, mode: str = "append"):
        """Build a `TableChecksStorageConfig` for `table_fqn` with the given run config and mode."""
        config = TableChecksStorageConfig(
            location=table_fqn,
            run_config_name=run_config_name,
            mode=mode,
        )
        return config


# -------------------- Usage examples --------------------
if __name__ == "__main__":
    # Script entry point: runs Example A below. Examples B and C are kept in a
    # string literal at the end of this block and are never executed.
    profile_options = {
        "sample_fraction": 0.3,
        "sample_seed": 42,
        "limit": 1000,
        "remove_outliers": True,
        "outlier_columns": [],
        "num_sigmas": 3,
        "max_null_ratio": 0.01,
        "trim_strings": True,
        "max_empty_ratio": 0.01,
        "distinct_ratio": 0.05,
        "max_in_count": 10,
        "round": True,
        # other passthrough keys are fine
    }

    # Example A — scope="table": write YAML only. output_table is supplied but is
    # not written to, because output_format="yaml".
    CheckGenerator(
        scope="table",                                  # "pipeline" | "catalog" | "schema" | "table" | "file" | "folder"
        source="dq_prd.monitoring.job_run_audit",
        output_format="yaml",                           # "yaml" | "table" | "both"
        output_yaml="/Volumes/dq_dev/dqx/generated_checks/",  # Workspace Folder or Volume
        yaml_metadata=True,      #(yaml_modifier)      # True = Add run metadata to file | False = Don't add metadata to file
        key_order="custom",      #(yaml_modifier)      # "custom" = our ordered YAML with list items; "engine" = DQX default writer
        include_table_name=True, #(yaml_modifier)
        output_table="dq_dev.dqx.generated_checks_config",  # 'Table_fqn'  | 'None' if not writing to table
        profile_options=profile_options,
        exclude_prefix_regex=r"^tama",               # exclude tables whose prefix (before _) matches
        created_by="LMG",          #(table_modifier)      # populates 'created_by' column
        columns=None,              #(check_modifier)      # only valid when scope=="table"
        run_config_name="default", #(check_modifier)
        criticality="error",       #(check_modifier)       # "error" | "warn"
        table_doc=DQX_GENERATED_CHECKS_CONFIG_METADATA,   # used if/when we create the output table
    ).run()

    """
    # Example B — scope="catalog": write BOTH (YAMLs first, then load those YAMLs into table)
    CheckGenerator(
        scope="catalog",
        source="de_prd",
        output_format="both",
        output_yaml="dbfs:/mnt/dqx/generated_checks/de_prd",
        output_table="dq_dev.dqx.checks_generated_config",
        profile_options=profile_options,
        exclude_prefix_regex=r"^tamarack$",
        created_by="LMG",
        columns=None,
        run_config_name="default",
        criticality="error",
        key_order="custom",
        include_table_name=True,
        yaml_metadata=True,
        table_doc=DQX_GENERATED_CHECKS_CONFIG_METADATA,
    ).run()

    # Example C — scope="table": write directly to table (no YAML)
    CheckGenerator(
        scope="table",
        source="dq_prd.monitoring.job_run_audit",
        output_format="table",
        output_yaml=None,
        output_table="dq_dev.dqx.checks_generated_config",
        profile_options=profile_options,
        exclude_prefix_regex=None,
        created_by="LMG",
        columns=None,
        run_config_name="default",
        criticality="error",
        key_order="custom",
        include_table_name=True,
        yaml_metadata=False,
        table_doc=DQX_GENERATED_CHECKS_CONFIG_METADATA,
    ).run()
    """

### Run

In [0]:
# === DQX runner with table prefix filtering (glob) ===
from __future__ import annotations
from typing import Dict, Any, List, Optional, Tuple
import json, fnmatch

from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine
from pyspark.sql import SparkSession, DataFrame, Row, functions as F, types as T

from utils.color import Color

# -------------------
# Display helpers
# -------------------
def _can_display() -> bool:
    return "display" in globals()

def show_df(df: DataFrame, n: int = 100, truncate: bool = False) -> None:
    """Show up to `n` rows of `df`, using `display` when it is available.

    `truncate` only applies to the `df.show` fallback.
    """
    if not _can_display():
        df.show(n, truncate=truncate)
        return
    display(df.limit(n))

def display_section(title: str) -> None:
    """Print `title` framed above and below by an 80-character colored rule."""
    rule = f"{Color.b}{Color.light_seafoam}═{Color.r}" * 80
    heading = f"{Color.b}{Color.light_seafoam}║{Color.r} {Color.b}{Color.ghost_white}{title}{Color.r}"
    print("\n" + rule)
    print(heading)
    print(rule)

# -------------
# Result schema (row-level hits)
# -------------
def _issue_array_type() -> T.ArrayType:
    """Build the array<struct> type shared by the `_errors` and `_warnings` columns."""
    return T.ArrayType(T.StructType([
        T.StructField("name", T.StringType(), True),
        T.StructField("message", T.StringType(), True),
        T.StructField("columns", T.ArrayType(T.StringType()), True),
        T.StructField("filter", T.StringType(), True),
        T.StructField("function", T.StringType(), True),
        T.StructField("run_time", T.TimestampType(), True),
        T.StructField("user_metadata", T.MapType(T.StringType(), T.StringType()), True),
    ]))


# Schema of the row-level results log; the third argument of each field is its
# nullability.
ROW_LOG_SCHEMA = T.StructType([
    # Identity
    T.StructField("log_id", T.StringType(), False),
    T.StructField("check_id", T.ArrayType(T.StringType()), True),
    T.StructField("table_name", T.StringType(), False),
    T.StructField("run_config_name", T.StringType(), False),

    # Issues raised for the row, each with a fingerprint
    T.StructField("_errors", _issue_array_type(), False),
    T.StructField("_errors_fingerprint", T.StringType(), False),
    T.StructField("_warnings", _issue_array_type(), False),
    T.StructField("_warnings_fingerprint", T.StringType(), False),

    # Snapshot of the row as (column, value) pairs
    T.StructField("row_snapshot", T.ArrayType(T.StructType([
        T.StructField("column", T.StringType(), False),
        T.StructField("value", T.StringType(), True),
    ])), False),
    T.StructField("row_snapshot_fingerprint", T.StringType(), False),

    # Audit
    T.StructField("created_by", T.StringType(), False),
    T.StructField("created_at", T.TimestampType(), False),
    T.StructField("updated_by", T.StringType(), True),
    T.StructField("updated_at", T.TimestampType(), True),
])

# Documentation for the row-level results log table: a markdown table comment
# plus one comment per column. "table" is a placeholder to be overridden.
DQX_CHECKS_LOG_METADATA: Dict[str, Any] = dict(
    table="<override at create time>",
    table_comment="\n".join([
        "## **DQX Row-level Check Results Log**",
        "- One row per source row that triggered at least one rule (error or warn).",
        "- `check_id` contains the originating rule IDs from the config table.",
        "- Fingerprint columns are deterministic digests to aid de-duplication & rollups.",
    ]) + "\n",
    columns={
        # Identity
        "log_id": "Deterministic SHA-256 over table/run_config/row_snapshot/_errors/_warnings.",
        "check_id": "Originating rule IDs attached post-hoc via join.",
        "table_name": "Fully qualified source table (`catalog.schema.table`).",
        "run_config_name": "Run configuration tag/group under which checks were applied.",
        # Issues
        "_errors": "Array<struct> of error issues.",
        "_errors_fingerprint": "SHA-256 of a normalized view of `_errors`.",
        "_warnings": "Array<struct> of warning issues.",
        "_warnings_fingerprint": "SHA-256 of a normalized view of `_warnings`.",
        # Row snapshot
        "row_snapshot": "Array<struct{column:string, value:string}> for non-reserved columns.",
        "row_snapshot_fingerprint": "SHA-256 of JSON(row_snapshot).",
        # Audit
        "created_by": "Audit: user/process that wrote this record.",
        "created_at": "Audit: creation timestamp (UTC).",
        "updated_by": "Audit: last updater (nullable).",
        "updated_at": "Audit: last update timestamp (UTC, nullable).",
    },
)

# -------------------
# Comment + DDL helpers
# -------------------
def _esc_sql_comment(s: str) -> str:
    """Return `s` with every single quote doubled; a falsy `s` yields ""."""
    if not s:
        return ""
    # Splitting on the quote and re-joining with two quotes doubles each one.
    return "''".join(s.split("'"))

def _comment_on_table(spark: SparkSession, fqn: str, text: Optional[str]):
    """Set the table-level comment on `fqn`; do nothing when `text` is empty or None.

    `fqn` must split on "." into exactly three parts (catalog, schema, table);
    any other shape raises ValueError from the unpacking.
    """
    if text:
        catalog, schema, table = fqn.split(".")
        target = f"`{catalog}`.`{schema}`.`{table}`"
        spark.sql(f"COMMENT ON TABLE {target} IS '{_esc_sql_comment(text)}'")

def _set_column_comment_safe(spark: SparkSession, fqn: str, col_name: str, comment: str):
    """Attach `comment` to column `col_name` of the three-part table `fqn`.

    `COMMENT ON COLUMN` is tried first. If that statement raises, the column's
    data type is looked up via `DESCRIBE TABLE` and the comment is applied with
    `ALTER TABLE ... CHANGE COLUMN`. When the column is not listed by DESCRIBE,
    nothing is done. Errors raised by the fallback statements propagate.
    """
    catalog, schema, table = fqn.split(".")
    target = f"`{catalog}`.`{schema}`.`{table}`"
    quoted = _esc_sql_comment(comment)
    try:
        spark.sql(f"COMMENT ON COLUMN {target}.`{col_name}` IS '{quoted}'")
    except Exception:
        data_type = None
        for row in spark.sql(f"DESCRIBE TABLE {target}").collect():
            listed = row.col_name
            # Skip blank rows and "#"-prefixed section markers in DESCRIBE output.
            if listed and not listed.startswith("#") and listed == col_name:
                data_type = row.data_type  # last matching row wins
        if data_type:
            spark.sql(
                f"ALTER TABLE {target} CHANGE COLUMN `{col_name}` `{col_name}` {data_type} COMMENT '{quoted}'"
            )

def _apply_table_documentation_on_create(spark: SparkSession, table_fqn: str,
                                         doc: Dict[str, Any], just_created: bool):
    """Best-effort application of table and column comments from `doc`.

    The table comment (`doc["table_comment"]`) is only set when `just_created`
    is true. Column comments (`doc["columns"]`) are applied on every call, but
    only for columns that DESCRIBE TABLE reports. Any exception is caught and
    reported as a printed warning; nothing is raised.
    """
    if doc:
        try:
            if just_created:
                _comment_on_table(spark, table_fqn, doc.get("table_comment"))

            catalog, schema, table = table_fqn.split(".")
            described = spark.sql(f"DESCRIBE TABLE `{catalog}`.`{schema}`.`{table}`").collect()
            present = set()
            for row in described:
                # Ignore blank rows and "#"-prefixed section markers.
                if row.col_name and not str(row.col_name).startswith("#"):
                    present.add(row.col_name)

            column_docs = doc.get("columns") or {}
            for column in column_docs:
                if column in present:
                    _set_column_comment_safe(spark, table_fqn, column, column_docs[column])
        except Exception as exc:
            print(f"[WARN] Could not apply table/column comments to {table_fqn}: {exc}")

def ensure_table(full_name: str, doc: Optional[Dict[str, Any]] = None):
    """Create the Delta table `full_name` with `ROW_LOG_SCHEMA` if it is missing.

    When the table does not exist, its schema is created first (if needed) and
    an empty DataFrame is saved as the table. Documentation from `doc` is then
    applied; the table-level comment is only written for a newly created table.
    """
    spark = SparkSession.getActiveSession() or SparkSession.builder.getOrCreate()
    just_created = not spark.catalog.tableExists(full_name)
    if just_created:
        catalog, schema, _table = full_name.split(".")
        spark.sql(f"CREATE SCHEMA IF NOT EXISTS `{catalog}`.`{schema}`")
        empty_log = spark.createDataFrame([], ROW_LOG_SCHEMA)
        empty_log.write.format("delta").mode("overwrite").saveAsTable(full_name)
    _apply_table_documentation_on_create(spark, full_name, doc or {}, just_created=just_created)

def _ensure_schema_exists_for_fqn(fqn: str) -> None:
    """Create the schema of a `catalog.schema.table` name if it does not exist.

    An empty name, or one that does not split into exactly three dot-separated
    parts, is ignored.
    """
    if not fqn:
        return
    parts = fqn.split(".")
    if len(parts) != 3:
        return
    catalog, schema = parts[0], parts[1]
    spark = SparkSession.getActiveSession() or SparkSession.builder.getOrCreate()
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS `{catalog}`.`{schema}`")

# --------------------------
# Issue array helpers
# --------------------------
def _pick_col(df: DataFrame, *candidates: str) -> Optional[str]:
    """Return the first name in `candidates` that is a column of `df`, else None."""
    return next((name for name in candidates if name in df.columns), None)

def _empty_issues_array() -> F.Column:
    """Return a column expression for an empty, fully typed array of issue structs.

    The array is produced by parsing the JSON literal "[]" against the issue
    element schema, so the result carries the struct type rather than being untyped.
    """
    field_types = (
        ("name", T.StringType()),
        ("message", T.StringType()),
        ("columns", T.ArrayType(T.StringType())),
        ("filter", T.StringType()),
        ("function", T.StringType()),
        ("run_time", T.TimestampType()),
        ("user_metadata", T.MapType(T.StringType(), T.StringType())),
    )
    # Every field is nullable.
    issue_struct = T.StructType([T.StructField(name, dtype, True) for name, dtype in field_types])
    return F.from_json(F.lit("[]"), T.ArrayType(issue_struct))

def _normalize_issues_for_fp(arr_col: F.Column) -> F.Column:
    """Map each issue struct in `arr_col` to a reduced struct used for fingerprinting.

    Only `name`, `message`, `filter` and `function` are carried over, plus
    `columns_json`: the JSON of the sorted `columns` array ("[]" when that JSON
    is null). `run_time` and `user_metadata` are not included.
    """
    def _stable_view(issue):
        sorted_columns_json = F.to_json(F.array_sort(issue["columns"]))
        return F.struct(
            issue["name"].alias("name"),
            issue["message"].alias("message"),
            F.coalesce(sorted_columns_json, F.lit("[]")).alias("columns_json"),
            issue["filter"].alias("filter"),
            issue["function"].alias("function"),
        )

    return F.transform(arr_col, _stable_view)

# --------------------------
# JIT argument coercion (exec-time only)
# --------------------------
# Per-check-function argument spec: function name -> {argument name -> kind}.
# The kinds ("list", "str", "num", "bool") are the ones `_coerce_arguments`
# dispatches on; functions or arguments not listed here are passed through
# with only the generic scalar parsing applied.
_EXPECTED: Dict[str, Dict[str, str]] = {
    "is_unique": {"columns": "list"},
    "is_in_list": {"column": "str", "allowed": "list"},
    "is_in_range": {"column": "str", "min_limit": "num", "max_limit": "num",
                    "inclusive_min": "bool", "inclusive_max": "bool"},
    "regex_match": {"column": "str", "regex": "str"},
    "sql_expression": {"expression": "str"},
    "sql_query": {"query": "str", "limit": "num"},
    "is_not_null": {"column": "str"},
    "is_not_null_and_not_empty": {"column": "str"},
}

def _parse_scalar(s: Optional[str]):
    """Parse a string into None, bool, a JSON list/dict, int or float where possible.

    Rules, applied to the whitespace-stripped text:
      - None, "", "null", "none" (any case) -> None
      - "true" / "false" (any case)         -> True / False
      - "[...]" or "{...}"                  -> json.loads result, or the text if invalid JSON
      - optional sign followed by digits    -> int
      - anything float() accepts            -> float
      - everything else                     -> the stripped text
    """
    if s is None:
        return None

    text = s.strip()
    keywords = {"null": None, "none": None, "": None, "true": True, "false": False}
    lowered = text.lower()
    if lowered in keywords:
        return keywords[lowered]

    # A matching bracket/brace pair at both ends marks a JSON candidate.
    if (text[:1], text[-1:]) in (("[", "]"), ("{", "}")):
        try:
            return json.loads(text)
        except Exception:
            return text

    try:
        if text.lstrip("+-").isdigit():
            return int(text)
        return float(text)
    except Exception:
        return text

def _to_list(v):
    """Coerce `v` to a list.

    None -> []; a list is returned as-is; a string whose stripped form starts
    with "[" is parsed as JSON; anything else (including unparsable JSON) is
    wrapped in a single-element list.
    """
    if v is None:
        return []
    if isinstance(v, list):
        return v
    if isinstance(v, str) and v.strip().startswith("["):
        try:
            return json.loads(v)
        except Exception:
            pass  # fall through and wrap the raw string
    return [v]

def _to_num(v):
    """Coerce `v` to int or float when possible; otherwise return `v` unchanged.

    None and existing int/float values (bool included, as an int subclass) pass
    through. An optional sign followed by digits becomes int; anything else is
    tried with float().
    """
    if v is None or isinstance(v, (int, float)):
        return v
    try:
        looks_integral = str(v).lstrip("+-").isdigit()
        return int(v) if looks_integral else float(v)
    except Exception:
        return v

def _to_bool(v):
    """Coerce common textual booleans to bool; return anything else unchanged.

    Recognised (case-insensitive, whitespace-stripped) strings are
    "true"/"t"/"1" and "false"/"f"/"0". Non-string values, including real
    bools, are returned as-is.
    """
    if isinstance(v, str):
        lookup = {"true": True, "t": True, "1": True,
                  "false": False, "f": False, "0": False}
        return lookup.get(v.strip().lower(), v)
    return v

def _coerce_arguments(args_map: Optional[Dict[str, str]],
                      function_name: Optional[str],
                      mode: str = "permissive") -> Tuple[Dict[str, Any], List[str]]:
    """Coerce stringly-typed check arguments to the kinds listed in `_EXPECTED`.

    Args:
        args_map: argument name -> raw string value (may be None or empty).
        function_name: check function name used to look up the expected kinds.
        mode: "strict" raises on any coercion error; any other value returns
            the errors alongside the coerced arguments.

    Returns:
        `(coerced_arguments, errors)`. Arguments with no entry in `_EXPECTED`
        receive only the generic `_parse_scalar` treatment.

    Raises:
        ValueError: in "strict" mode when at least one error was recorded.
    """
    if not args_map:
        return {}, []

    fn = (function_name or "").strip()
    spec = _EXPECTED.get(fn, {})
    out: Dict[str, Any] = {}
    errs: List[str] = []

    for k, raw_value in args_map.items():
        v = _parse_scalar(raw_value)
        want = spec.get(k)
        if want == "list":
            out[k] = _to_list(v)
            if not isinstance(out[k], list):
                errs.append(f"key '{k}' expected list, got {type(out[k]).__name__}")
        elif want == "num":
            out[k] = _to_num(v)
        elif want == "bool":
            out[k] = _to_bool(v)
        elif want == "str":
            # Keep the original text for string arguments. Rebuilding it from the
            # parsed value would corrupt it: "007" -> "7", "true" -> "True",
            # '["a"]' -> "['a']". Null-like inputs still map to "".
            out[k] = "" if v is None else str(raw_value).strip()
        else:
            out[k] = v

    if fn == "sql_query":
        limit = out.get("limit")
        # bool is an int subclass, so exclude it explicitly.
        is_positive_number = (
            isinstance(limit, (int, float))
            and not isinstance(limit, bool)
            and limit > 0
        )
        if not is_positive_number:
            errs.append("sql_query requires a positive 'limit'")

    if mode == "strict" and errs:
        raise ValueError(f"Argument coercion failed for '{function_name}': {errs}")
    return out, errs

# --------------------------
# Table-name filtering helpers (glob -> Python & SQL LIKE)
# --------------------------
def _normalize_glob(p: str) -> str:
    """Expand a bare table pattern to three parts; leave dotted patterns untouched.

    A pattern without any "." (e.g. "crm_*") is prefixed with "*.*." so that it
    matches in any catalog and schema.
    """
    return p if "." in p else f"*.*.{p}"

def _glob_to_sql_like(p: str) -> str:
    """Translate a (normalized) glob into a SQL LIKE pattern.

    Naive mapping: "*" -> "%" and "?" -> "_". Literal "%" / "_" in the glob are
    not escaped, so they act as LIKE wildcards too.
    """
    wildcard_map = str.maketrans({"*": "%", "?": "_"})
    return _normalize_glob(p).translate(wildcard_map)

def _matches_any_glob(name: str, patterns: Optional[List[str]]) -> bool:
    """Return True when `name` matches at least one glob (case-sensitive).

    An empty or missing pattern list means "no filter", so everything matches.
    Each pattern is normalized with `_normalize_glob` before matching.
    """
    if not patterns:
        return True
    return any(fnmatch.fnmatchcase(name, _normalize_glob(glob)) for glob in patterns)

# --------------------------
# Load rules from table (with optional table LIKE filters)
# --------------------------
def _group_by_table(rules: List[dict]) -> Dict[str, List[dict]]:
    """Group rule dicts by their "table_name" key, preserving input order."""
    grouped: Dict[str, List[dict]] = {}
    for rule in rules:
        table = rule["table_name"]
        if table in grouped:
            grouped[table].append(rule)
        else:
            grouped[table] = [rule]
    return grouped

def _load_checks_from_table_as_dicts(
    spark: SparkSession,
    checks_table: str,
    run_config_name: str,
    coercion_mode: str = "permissive",
    include_tables_glob: Optional[List[str]] = None,   # pushdown filter
) -> Tuple[Dict[str, List[dict]], int, int]:
    """Load active rules for one run config and return them grouped by table.

    Args:
        spark: session used to read `checks_table`.
        checks_table: name of the rules/config table.
        run_config_name: only rows with this `run_config_name` are loaded.
        coercion_mode: forwarded to `_coerce_arguments` ("strict" makes it raise).
        include_tables_glob: optional globs; translated to LIKE predicates on
            `table_name` so filtering happens in Spark before collecting.

    Returns:
        `(rules_by_table, coerced_count, skipped_invalid_count)`. When the
        batch fails `DQEngine.validate_checks`, each rule is validated on its
        own and only the passing ones are kept.
    """
    from functools import reduce
    from operator import or_

    df = (
        spark.table(checks_table)
        .where((F.col("run_config_name") == run_config_name) & (F.col("active") == True))  # noqa: E712
        .select("table_name", "name", "criticality", "filter",
                "run_config_name", "user_metadata", "check")
    )

    if include_tables_glob:
        # Build the predicate from Column objects rather than interpolating the
        # patterns into SQL text: a quote in a glob can no longer break or
        # inject into the statement.
        like_predicates = [
            F.col("table_name").like(_glob_to_sql_like(g)) for g in include_tables_glob
        ]
        df = df.where(reduce(or_, like_predicates))

    rows = [r.asDict(recursive=True) for r in df.collect()]
    raw_rules: List[dict] = []
    coerced: int = 0

    for r in rows:
        chk = r.get("check") or {}
        fn = chk.get("function")
        fec = chk.get("for_each_column")
        args, _errs = _coerce_arguments(chk.get("arguments"), fn, mode=coercion_mode)
        coerced += 1
        raw_rules.append({
            "table_name":       r["table_name"],
            "name":             r["name"],
            "criticality":      r["criticality"],
            "run_config_name":  r["run_config_name"],
            "filter":           r.get("filter"),
            "user_metadata":    r.get("user_metadata"),
            "check": {
                "function":        fn,
                # Normalize an empty for_each_column to None.
                "for_each_column": fec if fec else None,
                "arguments":       args,
            },
        })

    status = DQEngine.validate_checks(raw_rules)
    if not getattr(status, "has_errors", False):
        return _group_by_table(raw_rules), coerced, 0

    # The batch is invalid: validate rule by rule and keep only the valid ones.
    keep = [
        rule for rule in raw_rules
        if not getattr(DQEngine.validate_checks([rule]), "has_errors", False)
    ]
    return _group_by_table(keep), coerced, len(raw_rules) - len(keep)

# --------------------------
# Apply with isolation and diagnostics
# --------------------------
def _force_eval(df: DataFrame) -> None:
    """Trigger a small Spark action on `df` so lazy failures surface now.

    Selects the size of `_errors` / `_warnings` when those columns exist (a
    literal otherwise), limits to one row and collects it. The result is discarded.
    """
    probes = [
        F.size(F.coalesce(F.col(column), F.array())).alias(alias)
        for column, alias in (("_errors", "e"), ("_warnings", "w"))
        if column in df.columns
    ]
    if not probes:
        probes = [F.lit(1).alias("one")]
    df.select(*probes).limit(1).collect()

def _apply_rules_isolating_failures(dq: DQEngine,
                                    src: DataFrame,
                                    table_name: str,
                                    tbl_rules: List[dict]) -> Tuple[Optional[DataFrame], List[Tuple[str, str]]]:
    """Apply `tbl_rules` to `src`, dropping rules that fail at apply/eval time.

    All rules are first applied together. If that raises, each rule is tried on
    its own, the failing ones are reported and excluded, and the remaining
    rules are applied together again.

    Returns:
        `(annotated_df, bad)`, where `bad` is a list of `(rule_name, error_text)`.
        `annotated_df` is None when no rule could be applied, or when the
        pruned set still fails.
    """
    try:
        df_all = dq.apply_checks_by_metadata(src, tbl_rules)
        _force_eval(df_all)
        return df_all, []
    except Exception as batch_ex:
        # Report why the batch failed instead of discarding it, then isolate.
        print(f"[{table_name}] Applying all rules together failed: {batch_ex}")

    bad: List[Tuple[str, str]] = []
    good: List[dict] = []
    for r in tbl_rules:
        try:
            _force_eval(dq.apply_checks_by_metadata(src, [r]))
        except Exception as ex:
            rule_name = r.get("name") or "<unnamed>"
            bad.append((rule_name, str(ex)))
            print(f"[{table_name}] rule '{rule_name}' failed: {ex}")
            try:
                print(f"    offending rule JSON: {json.dumps(r, indent=2)}")
            except (TypeError, ValueError):
                # Rule holds something JSON cannot serialize; show its repr instead.
                print(f"    offending rule: {r!r}")
        else:
            good.append(r)

    if bad:
        print(f"[{table_name}] Skipping {len(bad)} bad rule(s).")
    if not good:
        return None, bad

    try:
        df_good = dq.apply_checks_by_metadata(src, good)
        _force_eval(df_good)
        return df_good, bad
    except Exception as ex2:
        print(f"[{table_name}] Still failing after pruning bad rules: {ex2}")
        return None, bad

# --------------------------
# Projection & enrichment
# --------------------------
def _empty_issues_array_struct() -> T.ArrayType:
    """Return the Spark type of an issues array: array of nullable-field issue structs."""
    issue = (
        T.StructType()
        .add("name", T.StringType(), True)
        .add("message", T.StringType(), True)
        .add("columns", T.ArrayType(T.StringType()), True)
        .add("filter", T.StringType(), True)
        .add("function", T.StringType(), True)
        .add("run_time", T.TimestampType(), True)
        .add("user_metadata", T.MapType(T.StringType(), T.StringType()), True)
    )
    return T.ArrayType(issue)

def _project_row_hits(df_annot: DataFrame,
                      table_name: str,
                      run_config_name: str,
                      created_by: str,
                      exclude_cols: Optional[List[str]] = None) -> DataFrame:
    """Project annotated rows with at least one issue into the row-log layout.

    The error/warning arrays are taken from `_errors`/`_error` and
    `_warnings`/`_warning` (an empty typed array when neither exists). Rows
    where both arrays have size <= 0 are dropped. Every remaining column,
    except the issue columns and `exclude_cols`, is cast to string and stored
    in `row_snapshot`, ordered by column name. `check_id`, `updated_by` and
    `updated_at` are emitted as typed nulls.
    """
    errors_name = _pick_col(df_annot, "_errors", "_error")
    warnings_name = _pick_col(df_annot, "_warnings", "_warning")

    with_issues = (
        df_annot
        .withColumn("_errs", F.col(errors_name) if errors_name else _empty_issues_array())
        .withColumn("_warns", F.col(warnings_name) if warnings_name else _empty_issues_array())
    )
    flagged = with_issues.where((F.size("_errs") > 0) | (F.size("_warns") > 0))

    # Columns that must not appear in the snapshot.
    skip = {"_errs", "_warns"} | set(exclude_cols or [])
    skip.update(n for n in (errors_name, warnings_name) if n is not None)
    snapshot_cols = sorted(c for c in flagged.columns if c not in skip)

    snapshot = F.array(*[
        F.struct(F.lit(c).alias("column"), F.col(c).cast("string").alias("value"))
        for c in snapshot_cols
    ])
    snapshot_fp = F.sha2(F.to_json(snapshot), 256)

    def _issues_fp(column):
        # SHA-256 over the JSON of the sorted, normalized issue structs.
        normalized = _normalize_issues_for_fp(F.col(column))
        return F.sha2(F.to_json(F.array_sort(normalized)), 256)

    errors_fp = _issues_fp("_errs")
    warnings_fp = _issues_fp("_warns")

    log_id = F.sha2(
        F.concat_ws(
            "||",
            F.lit(table_name),
            F.lit(run_config_name),
            snapshot_fp,
            errors_fp,
            warnings_fp,
        ),
        256,
    )

    return flagged.select(
        log_id.alias("log_id"),
        F.lit(None).cast(T.ArrayType(T.StringType())).alias("check_id"),
        F.lit(table_name).alias("table_name"),
        F.lit(run_config_name).alias("run_config_name"),
        F.col("_errs").alias("_errors"),
        errors_fp.alias("_errors_fingerprint"),
        F.col("_warns").alias("_warnings"),
        warnings_fp.alias("_warnings_fingerprint"),
        snapshot.alias("row_snapshot"),
        snapshot_fp.alias("row_snapshot_fingerprint"),
        F.lit(created_by).alias("created_by"),
        F.current_timestamp().alias("created_at"),
        F.lit(None).cast(T.StringType()).alias("updated_by"),
        F.lit(None).cast(T.TimestampType()).alias("updated_at"),
    )

def _enrich_check_ids(row_log_df: DataFrame, checks_table: str) -> DataFrame:
    """Replace `check_id` in `row_log_df` with the IDs of the rules that fired.

    Rule names found in `_errors` / `_warnings` are matched against active
    rows of `checks_table` on (lower-cased table name, run config, lower-cased
    trimmed rule name). The matching `check_id` values are collected per
    `log_id` as a sorted, de-duplicated array. Rows with no match get an empty array.
    """
    spark = SparkSession.getActiveSession() or SparkSession.builder.getOrCreate()
    empty_names = F.array().cast(T.ArrayType(T.StringType()))

    cfg = (
        spark.table(checks_table)
        .where(F.col("active") == True)  # noqa: E712
        .select(
            F.lower(F.col("table_name")).alias("t_tbl_norm"),
            F.col("run_config_name").alias("t_rc"),
            # Trim here as well so both sides of the join normalize names identically.
            F.lower(F.trim(F.col("name"))).alias("t_name_norm"),
            F.col("check_id").alias("cfg_check_id"),
        )
        .dropDuplicates(["t_tbl_norm", "t_rc", "t_name_norm"])
    )

    names = (
        row_log_df
        .select(
            "log_id",
            F.lower(F.col("table_name")).alias("tbl_norm"),
            F.col("run_config_name").alias("rc"),
            # array_union returns NULL if either input is NULL. Coalesce so a row
            # with only errors (or only warnings) still contributes its names.
            F.coalesce(F.expr("transform(_errors, x -> x.name)"), empty_names).alias("e_names"),
            F.coalesce(F.expr("transform(_warnings, x -> x.name)"), empty_names).alias("w_names"),
        )
        .withColumn("all_names", F.array_union("e_names", "w_names"))
        .withColumn("name", F.explode_outer("all_names"))
        .where(F.col("name").isNotNull())
        .withColumn("name_norm", F.lower(F.trim(F.col("name"))))
        .select("log_id", "tbl_norm", "rc", "name_norm")
    )

    ids_per_log = (
        names.join(
            cfg,
            (names.tbl_norm == cfg.t_tbl_norm) &
            (names.rc == cfg.t_rc) &
            (names.name_norm == cfg.t_name_norm),
            "left"
        )
        .groupBy("log_id")
        # collect_list skips NULLs, so unmatched names add nothing.
        .agg(F.array_sort(F.array_distinct(F.collect_list("cfg_check_id"))).alias("check_id"))
    )

    return (
        row_log_df.drop("check_id")
        .join(ids_per_log, "log_id", "left")
        .withColumn("check_id", F.coalesce(F.col("check_id"), empty_names))
    )

# --------------------------
# Summaries
# --------------------------
def _summarize_table(annot: DataFrame, table_name: str) -> Row:
    """Return per-table counts for an annotated DataFrame.

    The returned Row has: `table_name`, `table_total_rows`,
    `table_total_error_rows`, `table_total_warning_rows`, `total_flagged_rows`
    (rows with at least one error or warning) and `distinct_rules_fired`
    (distinct rule names across both issue arrays).
    """
    err = "_errors" if "_errors" in annot.columns else "_error"
    wrn = "_warnings" if "_warnings" in annot.columns else "_warning"

    has_err = F.size(F.col(err)) > 0
    has_wrn = F.size(F.col(wrn)) > 0

    # One aggregation pass instead of four separate count() actions.
    # count(when(cond, 1)) counts only rows where cond is true and gives 0 on empty input.
    counts = annot.agg(
        F.count(F.lit(1)).alias("total_rows"),
        F.count(F.when(has_err, F.lit(1))).alias("error_rows"),
        F.count(F.when(has_wrn, F.lit(1))).alias("warning_rows"),
        F.count(F.when(has_err | has_wrn, F.lit(1))).alias("flagged_rows"),
    ).collect()[0]

    # array_union returns NULL when either side is NULL. Coalesce each side so
    # rows with only errors (or only warnings) are still counted.
    empty_names = F.array().cast(T.ArrayType(T.StringType()))
    fired_names = F.array_union(
        F.coalesce(F.expr(f"transform({err}, x -> x.name)"), empty_names),
        F.coalesce(F.expr(f"transform({wrn}, x -> x.name)"), empty_names),
    )
    rules_fired = (
        annot.select(F.explode(fired_names).alias("nm"))
        .where(F.col("nm").isNotNull())
        .agg(F.countDistinct("nm").alias("rules"))
        .collect()[0]["rules"]
    )

    return Row(table_name=table_name,
               table_total_rows=int(counts["total_rows"]),
               table_total_error_rows=int(counts["error_rows"]),
               table_total_warning_rows=int(counts["warning_rows"]),
               total_flagged_rows=int(counts["flagged_rows"]),
               distinct_rules_fired=int(rules_fired))

def _rules_hits_for_table(annot: DataFrame, table_name: str) -> DataFrame:
    """Count flagged rows per (rule name, severity) for one annotated table.

    Returns a DataFrame with columns `name`, `severity` ("error" or "warning"),
    `rows_flagged` and `table_name`.
    """
    err = "_errors" if "_errors" in annot.columns else "_error"
    wrn = "_warnings" if "_warnings" in annot.columns else "_warning"

    def _names_with_severity(issue_col, severity):
        # One output row per non-null rule name found in the issue array.
        exploded = annot.select(
            F.explode_outer(F.expr(f"transform({issue_col}, x -> x.name)")).alias("name")
        )
        return exploded.where(F.col("name").isNotNull()).withColumn("severity", F.lit(severity))

    combined = _names_with_severity(err, "error").unionByName(
        _names_with_severity(wrn, "warning"), allowMissingColumns=True
    )
    per_rule = combined.groupBy("name", "severity").agg(F.count(F.lit(1)).alias("rows_flagged"))
    return per_rule.withColumn("table_name", F.lit(table_name))

# ---------------
# Main entry point
# ---------------
def run_checks(
    generated_dqx_checks_config_table_name: str,
    generated_dqx_checks_log_table_name: str,
    *,
    created_by: str = "AdminUser",
    exclude_cols: Optional[List[str]] = None,
    coercion_mode: str = "permissive",  # or "strict"
    table_summary_output: Optional[str] = "dq_dev.dqx.generated_checks_log_summary_by_table",
    row_summary_output: Optional[str]   = "dq_dev.dqx.generated_checks_log_summary_by_rule",
    write_mode: str = "overwrite",               # <-- applies to ALL outputs
    write_options: Optional[Dict[str, str]] = None,
    run_configs: Optional[List[str]] = None,     # if None, auto-detect from checks table
    include_tables_glob: Optional[List[str]] = None,  # only process matching tables
):
    """
    Apply the active rules from the checks config table and write the results.

    Process only tables whose fully-qualified names match any glob in `include_tables_glob`.
      - Examples: ["de_prd.gold.crm_*"], ["dq_prd.monitoring.*"], ["crm_*"]  (# => *.*.crm_*)
    The `write_mode` applies to:
      - detailed row log (generated_checks_log),
      - per-rule summary (generated_checks_log_summary_by_rule),
      - per-table summary (generated_checks_log_summary_by_table).

    With `write_mode="overwrite"`, each output table is overwritten by the
    first write of this call and appended to by later writes in the same call,
    so results from earlier run configs are not discarded by later ones.

    Args:
        generated_dqx_checks_config_table_name: table holding the rules.
        generated_dqx_checks_log_table_name: row-level results table (created if missing).
        created_by: value stored in the `created_by` audit column.
        exclude_cols: source columns to leave out of `row_snapshot`.
        coercion_mode: "permissive" or "strict" (strict raises on argument coercion errors).
        table_summary_output: per-table summary table, or None to skip writing it.
        row_summary_output: per-rule summary table, or None to skip writing it.
        write_mode: Spark save mode used for all outputs.
        write_options: extra DataFrameWriter options for all outputs.
        run_configs: run config names to process; discovered from active rules when None.
        include_tables_glob: optional table-name globs restricting what is processed.
    """
    spark = SparkSession.builder.getOrCreate()
    dq = DQEngine(WorkspaceClient())

    checks_table  = generated_dqx_checks_config_table_name
    results_table = generated_dqx_checks_log_table_name
    write_opts = write_options or {}

    already_written: set = set()

    def _mode_for(target: str) -> str:
        # In overwrite mode only the first write to a target may replace it.
        # Later writes in this call append, otherwise each run config would
        # wipe the rows written for the previous one.
        if str(write_mode).lower() == "overwrite" and target in already_written:
            return "append"
        already_written.add(target)
        return write_mode

    # Ensure results table exists (row-log)
    ensure_table(results_table, {**DQX_CHECKS_LOG_METADATA, "table": results_table})

    # Discover run_configs if not provided
    if run_configs is None:
        discovered = (
            spark.table(checks_table)
            .where(F.col("active") == True)  # noqa: E712
            .select("run_config_name").distinct()
            .collect()
        )
        run_configs = sorted(str(r[0]) for r in discovered if r[0] is not None)

    # Count total active checks once (for logging only)
    try:
        checks_table_total = spark.table(checks_table).where(F.col("active") == True).count()  # noqa: E712
    except Exception:
        checks_table_total = -1

    grand_total = 0
    all_tbl_summaries: List[Row] = []

    # normalize patterns once
    globs_norm = [_normalize_glob(g) for g in (include_tables_glob or [])]

    for rc_name in run_configs:
        if rc_name is None or str(rc_name).lower() == "none":
            continue

        display_section(f"Run config: {rc_name}")
        by_tbl, coerced, skipped = _load_checks_from_table_as_dicts(
            spark,
            checks_table,
            rc_name,
            coercion_mode=coercion_mode,
            include_tables_glob=globs_norm,     # SQL pushdown
        )

        # Python-side filter (belt & suspenders)
        if globs_norm:
            by_tbl = {tbl: rules for tbl, rules in by_tbl.items() if _matches_any_glob(tbl, globs_norm)}

        checks_loaded = sum(len(v) for v in by_tbl.values())
        print(f"[{rc_name}] checks_in_table_total={checks_table_total}, loaded={checks_loaded}, coerced={coerced}, skipped_invalid={skipped}")
        if globs_norm:
            print(f"[{rc_name}] include_tables_glob={globs_norm}")
            if not by_tbl:
                print(f"[{rc_name}] No tables matched globs; skipping.")
                continue
            print(f"[{rc_name}] Tables to process ({len(by_tbl)}): {', '.join(sorted(by_tbl.keys()))}")

        if not checks_loaded:
            print(f"[{rc_name}] no checks loaded (active=TRUE & run_config_name='{rc_name}').")
            continue

        out_batches: List[DataFrame] = []
        rc_tbl_summaries: List[Row] = []
        rc_rule_hit_parts: List[DataFrame] = []
        table_row_counts: Dict[str, int] = {}
        processed_tables: List[str] = []

        for tbl, tbl_rules in by_tbl.items():
            try:
                src = spark.read.table(tbl)
                # Bad rules are already reported by the helper.
                annot, _bad = _apply_rules_isolating_failures(dq, src, tbl, tbl_rules)
                if annot is None:
                    continue
            except Exception as e:
                print(f"[{rc_name}] {tbl} failed: {e}")
                continue

            processed_tables.append(tbl)

            summary_row = _summarize_table(annot, tbl)
            # Reuse the total from the summary instead of running another count().
            table_row_counts[tbl] = summary_row["table_total_rows"]
            rc_tbl_summaries.append(summary_row)
            all_tbl_summaries.append(Row(run_config_name=rc_name, **summary_row.asDict()))

            rc_rule_hit_parts.append(_rules_hits_for_table(annot, tbl))

            row_hits = _project_row_hits(annot, tbl, rc_name, created_by, exclude_cols=exclude_cols)
            if row_hits.limit(1).count() > 0:
                out_batches.append(row_hits)

        if rc_tbl_summaries:
            summary_df = spark.createDataFrame(rc_tbl_summaries).orderBy("table_name")
            display_section(f"Row-hit summary by table (run_config={rc_name})")
            show_df(summary_df, n=200, truncate=False)

        if rc_rule_hit_parts:
            rules_all = rc_rule_hit_parts[0]
            for part in rc_rule_hit_parts[1:]:
                rules_all = rules_all.unionByName(part, allowMissingColumns=True)

            # Every configured rule for the processed tables, so rules with
            # zero hits still appear in the summary.
            cfg_rules = (
                spark.table(checks_table)
                .where((F.col("run_config_name") == rc_name) & (F.col("active") == True))  # noqa: E712
                .where(F.col("table_name").isin(processed_tables))
                .select(
                    F.col("table_name"),
                    F.col("name").alias("rule_name"),
                    F.when(F.lower("criticality").isin("warn", "warning"), F.lit("warning"))
                     .otherwise(F.lit("error")).alias("severity")
                )
                .dropDuplicates(["table_name", "rule_name", "severity"])
            )

            counts = (
                rules_all
                .groupBy("table_name", "name", "severity")
                .agg(F.sum("rows_flagged").alias("rows_flagged"))
                .withColumnRenamed("name", "rule_name")
            )

            full_rules = (
                cfg_rules.join(counts, on=["table_name", "rule_name", "severity"], how="left")
                .withColumn("rows_flagged", F.coalesce(F.col("rows_flagged"), F.lit(0)))
            )

            totals_df = spark.createDataFrame(
                [Row(table_name=k, table_total_rows=v) for k, v in table_row_counts.items()]
            )
            full_rules = (
                full_rules.join(totals_df, "table_name", "left")
                .withColumn(
                    "pct_of_table_rows",
                    F.when(F.col("table_total_rows") > 0,
                           F.col("rows_flagged") / F.col("table_total_rows"))
                     .otherwise(F.lit(0.0))
                )
                .select("table_name", "rule_name", "severity", "rows_flagged",
                        "table_total_rows", "pct_of_table_rows")
                .orderBy(F.desc("rows_flagged"), F.asc("table_name"), F.asc("rule_name"))
            )

            display_section(f"Row-hit summary by rule (run_config={rc_name})")
            show_df(full_rules, n=2000, truncate=False)

            if row_summary_output:
                _ensure_schema_exists_for_fqn(row_summary_output)
                rule_mode = _mode_for(row_summary_output)
                (full_rules
                 .withColumn("run_config_name", F.lit(rc_name))
                 .select("run_config_name", "table_name", "rule_name", "severity",
                         "rows_flagged", "table_total_rows", "pct_of_table_rows")
                 .write.format("delta")
                 .mode(rule_mode)
                 .options(**write_opts)
                 .saveAsTable(row_summary_output))
                print(f"[{rc_name}] per-rule summary ({rule_mode}) → {row_summary_output}")

        if not out_batches:
            print(f"[{rc_name}] no row-level hits.")
            continue

        out = out_batches[0]
        for b in out_batches[1:]:
            out = out.unionByName(b, allowMissingColumns=True)

        out = _enrich_check_ids(out, checks_table)

        # Align column order with the log table schema.
        out = out.select([f.name for f in ROW_LOG_SCHEMA.fields])
        rows = out.count()
        log_mode = _mode_for(results_table)
        out.write.format("delta").mode(log_mode).options(**write_opts).saveAsTable(results_table)
        grand_total += rows
        print(f"[{rc_name}] wrote {rows} rows ({log_mode}) → {results_table}")

    if all_tbl_summaries:
        grand_df = (
            spark.createDataFrame(all_tbl_summaries)
            .select(
                F.col("run_config_name"),
                F.col("table_name"),
                F.col("table_total_rows"),
                F.col("table_total_error_rows"),
                F.col("table_total_warning_rows"),
                F.col("total_flagged_rows"),
                F.col("distinct_rules_fired"),
            )
            .orderBy("run_config_name", "table_name")
        )
        display_section("Row-hit summary by table (ALL run_configs)")
        show_df(grand_df, n=500, truncate=False)

        if table_summary_output:
            _ensure_schema_exists_for_fqn(table_summary_output)
            table_mode = _mode_for(table_summary_output)
            grand_df.write.format("delta").mode(table_mode).options(**write_opts).saveAsTable(table_summary_output)
            print(f"[ALL RCs] table summary ({table_mode}) → {table_summary_output}")

    display_section("Grand total")
    print(f"TOTAL rows written: {grand_total}")

# ---- example run: ONLY audit_* tables under de_prd.gold, APPEND everywhere ----
# NOTE: this call executes whenever the file/cell is run.
run_checks(
    generated_dqx_checks_config_table_name="dq_dev.dqx.generated_checks_config",
    generated_dqx_checks_log_table_name="dq_dev.dqx.generated_checks_log",
    created_by="AdminUser",
    coercion_mode="strict",
    table_summary_output="dq_dev.dqx.generated_checks_log_summary_by_table",
    row_summary_output="dq_dev.dqx.generated_checks_log_summary_by_rule",
    write_mode="append",                           # <-- append across ALL outputs
    write_options={"mergeSchema": "true"},
    # run_configs=["default"],  # optional
    include_tables_glob=["de_prd.gold.audit_*"],
)