### Parameterize catalog and schema

In [0]:
# Values pre-filled in the widgets until the user overrides them.
default_database = "main"
default_schema_name = "default"

# Text widgets let the catalog and schema be chosen without editing code.
dbutils.widgets.text("demo_database", default_database, "Catalog Name")
dbutils.widgets.text("demo_schema", default_schema_name, "Schema Name")

# Read back whatever is currently entered in the widgets.
database = dbutils.widgets.get("demo_database")
schema = dbutils.widgets.get("demo_schema")

print(f"Selected Catalog for Demo Dataset: {database}")
print(f"Selected Schema for Demo Dataset: {schema}")

# Create the catalog, switch to it, then create the schema inside it.
# The statements are order-dependent, so they run strictly in sequence.
setup_statements = (
    f"CREATE CATALOG IF NOT EXISTS {database}",
    f"USE CATALOG {database}",
    f"CREATE SCHEMA IF NOT EXISTS {schema}",
)
for setup_statement in setup_statements:
    spark.sql(setup_statement)

# Kept as the final bare expression so the cell's displayed output is unchanged.
spark.sql(f"USE SCHEMA {schema}")

### Define location to store the rules

In [0]:
import os
# Display the current working directory; the following cell uses the same value
# as the base folder for the quality-rule files.
os.getcwd()

In [0]:
import os

# Quality-rule files live in a "quality_rules" folder directly under the
# current working directory.
workspace_root_path = os.getcwd()
quality_rules_path = "/".join((workspace_root_path, "quality_rules"))

### DQX - Install as Library <br>


In [0]:
%pip install databricks-labs-dqx
# Restart the Python process so the newly installed package can be imported.
# NOTE(review): the restart is expected to discard Python variables defined in
# earlier cells (e.g. database, schema, quality_rules_path) — confirm that later
# cells do not rely on them, or move this cell to the top of the notebook.
# NOTE(review): the package version is not pinned; consider pinning for reproducibility.
dbutils.library.restartPython()

### Load source tables into DataFrames

In [0]:
import os

# The library-install cell restarts the Python process, which discards every
# Python variable defined before it. Rebuild the notebook parameters here so
# this cell and the ones after it work on a fresh "Run All". The widgets are
# notebook-level, so their values can simply be read again.
database = dbutils.widgets.get("demo_database")
schema = dbutils.widgets.get("demo_schema")
workspace_root_path = os.getcwd()
quality_rules_path = f"{workspace_root_path}/quality_rules"

# Fully qualified (catalog.schema.table) names of the source tables.
sensor_table = f"{database}.{schema}.sensor_data"
maintenance_table = f"{database}.{schema}.maintenance_data"

# Load both source tables as DataFrames for profiling and quality checks.
sensor_bronze_df = spark.read.table(sensor_table)
mntnc_bronze_df = spark.read.table(maintenance_table)


### Problem - Team doesn't know the Quality Rules for Maintenance Dataset
### Feature - Infer the Data Quality Rules using DQX

#### **Step-1**. Instantiate DQX engine

In [0]:
import os
from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.generator import DQGenerator
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
from pprint import pprint

# Instantiate DQX engine
# The workspace client is created with no explicit arguments.
# NOTE(review): presumably it picks up the notebook's own authentication — confirm.
ws = WorkspaceClient()
# The same client is reused later for the profiler and the rule generator.
dq_engine = DQEngine(ws)

#### Maintenance Data

**Step-2**. Run DQX Profiler and **Infer** Quality Rules

In [0]:
# Profile input data: the profiler returns summary statistics together with
# the profiles that the rule generator consumes below.
profiler = DQProfiler(ws)
summary_stats, profiles = profiler.profile(mntnc_bronze_df)

# Generate DQX quality rules/checks from the profiles; the result is reused by
# the following cells to print and save the rules.
generator = DQGenerator(ws)
checks = generator.generate_dq_rules(profiles)  # with default level "error"

In [0]:
# Pretty-print every generated check under a numbered banner (numbering starts at 0).
for position, rule in enumerate(checks):
    banner = f"\n========Check {position} =========="
    print(banner)
    pprint(rule)

**Step-3.1**. Save profiler generated rules in a file

In [0]:
# Workspace file that will hold the inferred maintenance rules.
maintenance_quality_rules = f"{quality_rules_path}/maintenance_dq_rules.yml"

# Persist the generated checks to that workspace file.
dq_engine.save_checks_in_workspace_file(
    checks,
    workspace_path=maintenance_quality_rules,
)

# Render a clickable link to the saved file in the cell output.
rules_link_html = f'<a href="/#workspace{maintenance_quality_rules}" target="_blank">Maintenance Data Quality Rules YAML</a>'
displayHTML(rules_link_html)

**Step-3.2**. Save profiler generated rules in a table

In [0]:
# or save in delta table
# The rules go to a fully qualified table in the selected catalog and schema,
# stored under the run configuration name "maintenance".
# NOTE(review): no write mode is passed, so re-running this cell may add the same
# rules again depending on the library's default — confirm.
fq_tbl = f"{database}.{schema}.maintenance_inferred_quality_rules"
dq_engine.save_checks_in_table(table_name=fq_tbl, checks=checks, run_config_name="maintenance")


### Data Quality Rules for Machine Sensor Data

| Rule Type             | Example Rule                                                                                           | Purpose / Impact                                                        |DQ Rule|Quality Error Level|
|-----------------------|-------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------|-|--|
| **Completeness**      | Required fields (`sensor_id`, `machine_id`) must not be null    | Ensures all critical data is present and usable     |`is_not_null_and_not_empty`                    |ERROR|
| **Range / Domain**    | `reading_value` (temperature): 0–100 | Detects outliers and sensor faults; ensures physical plausibility       |**FILTER quality Check + `is_in_range`**| WARN|
| **Format Standardization**  |  `machine_id` follows standard format                             | Standardizes data for integration and analysis                          |`regex_match` |WARN|
| **Timeliness**        | `reading_timestamp` is not in the future; beyond 3 days                                     | Prevents erroneous time-series data                            |`is_not_in_future` |ERROR|
| **Correctness**        | `calibration_date` is earlier than `reading_timestamp`| Prevents erroneous sensor readings data                            |`SQL Expression` |ERROR|





### Feature: Quality Rules as YAML 
Read quality rules from a YAML file and apply the checks on a DataFrame


In [0]:
# Re-read the sensor source table so this section starts from a fresh DataFrame.
sensor_bronze_df = spark.read.table(sensor_table)

# Load the sensor quality rules from the YAML file in the rules folder.
# NOTE(review): this notebook does not create sensor_dq_rules.yml — it must already exist.
sensor_rules_file = f"{quality_rules_path}/sensor_dq_rules.yml"
sensor_quality_checks = dq_engine.load_checks_from_workspace_file(
    workspace_path=sensor_rules_file,
)

### Features: Quarantine Bad Data & Perform Granular Issue Detection

In [0]:
# Apply the loaded rules to the sensor data; the call returns two DataFrames.
# NOTE(review): presumably the first holds rows that pass and the second the rows
# flagged by at least one check — confirm against the DQX documentation.
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(sensor_bronze_df, sensor_quality_checks)

# Show the quarantined rows for inspection.
print("=== Bad Data DF ===")
display(quarantined_df)

### Feature: Bring / Build Your own Rule (Check)
|Dataset| Rule Type             | Example Rule                                                                                           | Purpose / Impact                                                        |DQ Rule|
|-|-----------------------|-------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------|--|
|Sensor Data| **Standardization**          | `firmware_version` starts with "v"  | Ensures firmware version value is a standard value | Custom Rule Development| 


In [0]:
import pyspark.sql.functions as F
from pyspark.sql import Column as col
from databricks.labs.dqx.check_funcs import make_condition

def firmware_version_start_with_v(column: str) -> col:
    """Custom DQX check that flags values which do not start with "v".

    Args:
        column: Column name or SQL expression to validate; it is parsed with
            ``F.expr``.

    Returns:
        The column produced by ``make_condition`` from the failure condition,
        the error message and the result-column alias. (``col`` in the
        signature is the ``pyspark.sql.Column`` class, imported under that
        alias in this cell.)
    """
    column_expr = F.expr(column)

    # The condition describes the *failing* case: the value does not start with "v".
    # NOTE(review): for NULL values the negation is NULL as well, so such rows are
    # presumably not flagged — confirm this is the intended behaviour.
    quality_rule_expr = ~(column_expr.startswith("v"))

    # Report the column that was actually checked instead of a hard-coded name,
    # so the message stays accurate if the rule is applied to another column.
    quality_rule_err_msg = f"{column} doesn't start with 'v'"

    # Fixed alias: `column` may be an arbitrary expression and therefore is not
    # guaranteed to be usable inside a column name.
    quality_rule_err_col_name = "firmware_version_not_starts_with_v"

    return make_condition(quality_rule_expr, quality_rule_err_msg, quality_rule_err_col_name)


### Feature : YAML Way

[Sensor Data Quality Custom Rules YAML](https://e2-demo-field-eng.cloud.databricks.com/editor/files/2408509664666985?o=1444828305810485)



**Step-1** Read the input data and instantiate DQX Engine

In [0]:
# Instantiate the DQX engine first, so the rules below are loaded with the
# engine created in this step rather than one left over from an earlier cell.
dq_engine = DQEngine(WorkspaceClient())

# Read the sensor source table.
sensor_bronze_df = spark.read.table(sensor_table)

# Load the rule set that references the custom check from its YAML file.
sensor_quality_checks = dq_engine.load_checks_from_workspace_file(
    workspace_path=f"{quality_rules_path}/sensor_dq_rules_custom.yml")

In [0]:
# Define the custom check 
# Mapping from the function name to the Python callable that implements it.
# NOTE(review): presumably the key must match the function name used in the rules YAML — confirm.
custom_check_functions = {"firmware_version_start_with_v": firmware_version_start_with_v}  # mapping of custom check names to functions

# Apply the custom check on the bronze data
# The custom functions are passed as the third positional argument.
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(sensor_bronze_df, sensor_quality_checks, custom_check_functions)
display(quarantined_df)

# Persist the quarantined rows; "overwrite" replaces the table contents on every run.
sensor_quarantine_table = f"{database}.{schema}.sensor_quarantine"
quarantined_df.write.mode("overwrite").saveAsTable(sensor_quarantine_table)

### Feature: Visualize Quality on Pre-Configured Dashboard


https://e2-demo-field-eng.cloud.databricks.com/dashboardsv3/01f025eecf261a81996b8991df9e870b/published?o=1444828305810485

![dashboard.png](./images/dashboard.png "dashboard.png")
