In [0]:
!pip install databricks-labs-dqx==0.12.0

In [0]:
!pip install 'databricks-labs-dqx[llm]'

In [0]:
!pip install 'databricks-labs-dqx[pii]'

In [0]:
dbutils.library.restartPython()

In [0]:
from pyspark.sql.functions import col, explode, lit, current_timestamp
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQRowRule
from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.pii.pii_detection_funcs import does_not_contain_pii

In [0]:
# Target table is supplied via notebook widgets (or job parameters); the empty defaults
# mean the caller must always provide all three parts.
dbutils.widgets.text("catalog", "")
dbutils.widgets.text("schema", "")
dbutils.widgets.text("table", "")

catalog = dbutils.widgets.get("catalog").strip()
schema  = dbutils.widgets.get("schema").strip()
table   = dbutils.widgets.get("table").strip()

# Fail fast: an empty part would otherwise build a malformed name such as "..my_table"
# and only surface later as a confusing table-not-found / no-rules error.
_missing_widgets = [
    name
    for name, value in (("catalog", catalog), ("schema", schema), ("table", table))
    if not value
]
if _missing_widgets:
    raise ValueError(f"Missing required widget value(s): {', '.join(_missing_widgets)}")

In [0]:
def map_severity(sev):
    """Map a rule's severity label from the rules table to a DQX criticality.

    Args:
        sev: Severity value read from the rule metadata (e.g. "HIGH"); may be None.

    Returns:
        "error" when the severity is HIGH (compared case- and whitespace-insensitively),
        otherwise "warn".
    """
    # Metadata is free text: normalise so "high" / " High " are not silently downgraded to "warn".
    normalized = str(sev).strip().upper() if sev is not None else ""
    return "error" if normalized == "HIGH" else "warn"

# Three-level name (catalog.schema.table) of the table under test, and the DataFrame the checks run on.
table_fqn = ".".join([catalog, schema, table])
df = spark.table(table_fqn)

In [0]:

def pincode_digits_only(df, column):
    """Flag whether ``column`` holds only ASCII digits.

    Adds a boolean ``__dq_pass__`` column that is true when the value matches
    ``^[0-9]+$`` (so an empty string fails); for a null value Spark's ``rlike``
    yields null.

    Args:
        df: Input Spark DataFrame.
        column: Name of the column to validate.

    Returns:
        ``df`` with the ``__dq_pass__`` column added.

    NOTE(review): this (df, column) -> DataFrame shape does not look like what the rule
    loop hands to ``DQRowRule`` as ``check_func`` (the builtin branch passes DQX a
    column name, not a DataFrame) — confirm against the DQX custom-check contract.
    """
    digits_only = r"^[0-9]+$"
    # Use the imported `col`; the previous `F.col` referenced an alias that is never imported.
    return df.withColumn("__dq_pass__", col(column).rlike(digits_only))

def email_valid_format(df, column):
    """Flag whether ``column`` looks like an e-mail address.

    Adds a boolean ``__dq_pass__`` column that is true when the value matches a
    simple ``local@domain.tld`` pattern whose final label is at least two letters.

    Args:
        df: Input Spark DataFrame.
        column: Name of the column to validate.

    Returns:
        ``df`` with the ``__dq_pass__`` column added.

    NOTE(review): returns a DataFrame rather than a column expression; confirm this
    matches what ``DQRowRule`` expects from a ``check_func``.
    """
    pattern = r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$"
    is_email = col(column).rlike(pattern)
    return df.withColumn("__dq_pass__", is_email)


In [0]:
# Central rules table; each matching row carries a `rules` array for one target table.
RULES_TABLE = "data_catalog.di_dq.dq_rules"

metadata_df = (
    spark.table(RULES_TABLE)
         .filter(
             (col("catalog_name") == catalog) &
             (col("schema_name") == schema) &
             (col("table_name") == table)
         )
)
# isEmpty() can stop at the first matching row instead of counting the whole result.
if metadata_df.isEmpty():
    raise ValueError(f"No DQ rules found for {table_fqn}")


# One row per rule struct; keep only enabled rules (a null `enabled` is filtered out too).
flat_rules = (
    metadata_df
    .select(explode("rules").alias("rule"))
    .filter(col("rule.enabled"))
)

rules = []

In [0]:
def _resolve_check_func(rule):
    """Return the check function referenced by one metadata rule.

    Args:
        rule: A rule struct from ``flat_rules`` (reads ``rule_type``, ``rule_name``, ``rule_id``).

    Returns:
        The callable passed to ``DQRowRule`` as ``check_func``.

    Raises:
        AttributeError: a ``builtin`` rule names something not found in ``check_funcs``.
        ValueError: a ``custom`` rule names no callable in the notebook's global
            namespace, or the ``rule_type`` is not one this notebook handles.
    """
    if rule.rule_type == "builtin":
        return getattr(check_funcs, rule.rule_name)
    if rule.rule_type == "pii":
        return does_not_contain_pii
    if rule.rule_type == "custom":
        # rule_name comes from table data: only accept a callable from this notebook's globals.
        func = globals().get(rule.rule_name)
        if not callable(func):
            raise ValueError(
                f"Custom rule {rule.rule_id!r} references unknown check function {rule.rule_name!r}"
            )
        return func
    # Skipping an unknown type silently would leave that check unapplied without any signal.
    raise ValueError(f"Unsupported rule_type {rule.rule_type!r} for rule {rule.rule_id!r}")


def _build_row_rule(rule):
    """Translate one enabled metadata rule struct into a DQX ``DQRowRule``."""
    rule_kwargs = {
        "name": rule.rule_id,
        "criticality": map_severity(rule.severity),
        "check_func": _resolve_check_func(rule),
        "check_func_kwargs": dict(rule.rule_args) if rule.rule_args else {},
    }
    # Builtin and PII checks target the rule's column; custom rules are built without one.
    # NOTE(review): the custom functions in this notebook take (df, column), yet no column is
    # passed here — confirm how DQX invokes them before relying on the custom path.
    if rule.rule_type != "custom":
        rule_kwargs["column"] = rule.column_name
    return DQRowRule(**rule_kwargs)


# Rebuilt from scratch so re-running this cell does not append duplicate rules.
rules = [_build_row_rule(row.rule) for row in flat_rules.collect()]

ws = WorkspaceClient()
dq_engine = DQEngine(ws)

valid_df, invalid_df = dq_engine.apply_checks_and_split(df, rules)

# Tag the rows DQX placed in the invalid split with the target table and the run time.
results_df = (
    invalid_df
    .withColumn("catalog_name", lit(catalog))
    .withColumn("schema_name", lit(schema))
    .withColumn("table_name", lit(table))
    .withColumn("run_timestamp", current_timestamp())
)

In [0]:
# Show the rows from the invalid split, tagged with the target table and run timestamp.
# NOTE(review): these rows presumably carry the original column values, including any that a
# PII check flagged — review/redact this output before sharing or committing the notebook.
results_df.display()

In [0]:
# TODO: audit columns to record for each run: pipeline_run_id, pipeline_start_time,
# pipeline_end_time, pipeline_name, and the file name of the run.

In [0]:
# TODO: keep separate log tables for ingestion and for data-quality results;
# join them on the audit columns.

In [0]:
# TODO: support two load types:
#   - append: run the checks only on newly arrived data
#   - overwrite