In [None]:
%pip install databricks-labs-dqx

In [None]:
import os
from databricks. labs.dqx.profiler.profiler import DQProfiler
from databricks. labs.dqx.profiler.generator import DQGenerator
from databricks. labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
from pprint import pprint

In [None]:
import os
workspace_root_path = os.getcwd()
quality_rules_path = f"{workspace_root_path}/quality_rules/"

# Read Input Data
mntnc_bronze_df = spark.read.table("dqx_dais_demo.dqx_mfg_example.maintenar ce_data")

1. Instantiate DQX Engine

In [None]:
# Instantiate DQX engine
ws = WorkspaceClient()
dq_engine = DQEngine(ws)

#DQProfiler and DQGenerator are used for profiling and generating data quality rules.
#DQEngine applies those rules to data.
#WorkspaceClient connects to your Databricks workspace.

#WorkspaceClient() initializes a connection to your Databricks workspace.
#DQEngine(ws) creates an instance of the DQX engine using that workspace connection. This engine will be used to apply data quality checks to your DataFrame.

2. Profile Input Data

In [None]:

profiler = DQProfiler(ws)
summary_stats, profiles = profiler.profile(mntnc_bronze_df)

#DQProfiler analyzes the input DataFrame (mntnc_bronze_df) to generate:
#1. summary_stats: High-level statistics (e.g., null counts, distinct values, etc.)
#2. profiles: Detailed metadata for each column, which is used to generate data quality rules.

3. Generate Data Quality Rules

In [None]:
generator = DQGenerator(ws)
checks = generator.generate_cq_rules(profiles)  # default criticality: "error"

#DQGenerator uses the column profiles to automatically create data quality checks.
#All generated checks are marked with criticality level "error" by default.

4.A. Save the Data Quality Rules in a file to workspace

In [None]:
maintenance_quality_rules = f"{quality_rules_path}/maintenance_dq_rules.yml"
dq_engine.save_checks_in_workspace_file(checks, workspace_path=maintenance_quality_rules)


4.B. Save the Data Quality Rules in a delta table workspace

In [None]:
fq_tbl ="dqx_dais_demo.dqx_mfg_example.maintenance_quality_rules"
dq_engine.save_checks_in_table(table_name=fq_tbl, checks=checks)


# rules will be saved something like this in a table
| column_name | function      | arguments         | criticality | metadata |
|-------------|---------------|-------------------|-------------|----------|
| col1        | is_in_list    | {"allowed": [1,2]}| warn        | {...}    |
| col2        | is_not_null   | {}                | error       | {...}    |

# you can query it also
select * from dqx_dais_demo.dqx_mfg_example.maintenance_quality_rules


5.A. Apply saved checks to an input dataframe from a file

In [None]:
# Apply checks on input data
quality_checks = dq_engine. load_checks_from_workspace_file(workspace_path=maintenance_quality_rules)
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(mntnc_bronze_df, quality_checks)

print(" === Maintenance Bad Data Sample === ")
display(quarantined_df)

5.B. Apply saved checks to an input dataframe from delta table

In [None]:
# Load quality checks from Delta table
quality_checks_from_table = dq_engine.load_checks_from_table(table_name=fq_tbl)

# Apply checks and split valid and quarantined data
valid_df_tbl, quarantined_df_tbl = dq_engine.apply_checks_by_metadata_and_split(mntnc_bronze_df, quality_checks_from_table)

# Display quarantined data
print("=== Maintenance Bad Data Sample (from Delta table) ===")
display(quarantined_df_tbl)
