In [0]:
from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.generator import DQGenerator
from databricks.labs.dqx.profiler.dlt_generator import DQDltGenerator
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
import yaml
from databricks.labs.dqx.contexts.workspace import WorkspaceContext



In [0]:
# Create the Databricks SDK client once and share it with every DQX helper below.
# (Previously this cell built each object twice; the second set simply replaced the first.)
ws = WorkspaceClient()

# Profiler: produces summary statistics and profiles for a DataFrame (used in the profiling section).
profiler = DQProfiler(ws)
# Generators: turn profiles into DQX check definitions and DLT expectations.
generator = DQGenerator(ws)
dlt_generator = DQDltGenerator(ws)
# Engine: validates and applies data-quality checks.
dq_engine = DQEngine(ws)

# Databricks sample dataset with flight departure delays.
FLIGHTS_CSV_PATH = "dbfs:/databricks-datasets/flights/departuredelays.csv"
input_df = spark.read.csv(FLIGHTS_CSV_PATH, header=True, inferSchema=True)



In [0]:
# Load the user-authored check definitions (presumably a list of DQX check dicts — verify the YAML).
with open("custom_dqx_rules.yaml", "r", encoding="utf-8") as rules_file:
    check_dict = yaml.safe_load(rules_file)

# yaml.safe_load returns None for an empty file; fail with a clear message instead of deep in DQX.
assert check_dict, "custom_dqx_rules.yaml did not contain any checks"

# DQEngine is constructed from a WorkspaceClient, not a SparkSession; reuse the setup client.
dq_engine = DQEngine(ws)

# Fail fast on malformed check definitions before touching any data.
validation_result = dq_engine.validate_checks(check_dict)
assert not validation_result.has_errors, f"Validation failed: {validation_result.errors}"

# Split rows into those that passed the checks (silver) and those that were quarantined.
silver_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, check_dict)
print(f'Total row count: {input_df.count()}, Good row count: {silver_df.count()}, Quarantined row count: {quarantine_df.count()}')


In [0]:
from pyspark.sql.functions import when, col

# Fault injection for the demo: null out `origin` on every flight whose destination is ANC,
# so the not-null checks on the airport columns later in the notebook have rows to flag.
input_df = input_df.withColumn(
    "origin",
    when(col("destination") == "ANC", None).otherwise(col("origin")),
)

In [0]:
import pyspark.sql.functions as F
from pyspark.sql import Column
from databricks.labs.dqx.row_checks import make_condition


def distance_gt_check(col_name: str, limit: int = 3500) -> Column:
    """Custom DQX row check: flag rows where `col_name` is greater than `limit`.

    The condition given to `make_condition` describes the failing rows; it is
    paired with the message reported for them and an alias for the result column.

    NOTE: a later cell redefines this function; keep the two in sync or drop one.

    Args:
        col_name: Name of the numeric column to check.
        limit: Largest allowed value (defaults to the original hard-coded 3500).

    Returns:
        The condition column built by `make_condition`.
    """
    column = F.col(col_name)
    # make_condition also requires an alias; omitting it fails as soon as the check is built.
    return make_condition(column > limit, f"Column {col_name} exceeds {limit}", f"{col_name}_exceeds_limit")

In [0]:
from pyspark.sql import Column
import pyspark.sql.functions as F
from databricks.labs.dqx import row_checks
from databricks.labs.dqx.row_checks import make_condition
from databricks.labs.dqx.rule import DQColRule, DQColSetRule


def distance_gt_check(col_name: str, limit: int = 3500) -> Column:
    """Custom DQX row check: flag rows where `col_name` is greater than `limit`.

    The condition given to `make_condition` describes the failing rows; it is
    paired with the message reported for them and an alias for the result column.

    Args:
        col_name: Name of the numeric column to check.
        limit: Largest allowed value (defaults to the original hard-coded 3500).

    Returns:
        The condition column built by `make_condition`.
    """
    column = F.col(col_name)
    return make_condition(column > limit, f"Column {col_name} exceeds {limit}", f"{col_name}_exceeds_limit")


# Class-based rules: two built-in single-column checks, the custom distance check,
# and a not-null check that DQColSetRule expands over both airport columns.
Inline_rule_checks = [
    DQColRule(name="destination_blanks", col_name="destination", check_func=row_checks.is_not_null_and_not_empty, criticality="error"),
    DQColRule(name="delay_range", col_name="delay", check_func=row_checks.is_in_range, criticality="warn", check_func_kwargs={"min_limit": -85, "max_limit": 1050}),
    DQColRule(name="distance_gt", col_name="distance", check_func=distance_gt_check, criticality="warn"),
] + DQColSetRule(columns=["origin", "destination"], criticality="error", check_func=row_checks.is_not_null).get_rules()

# The checked frame carries `_errors` / `_warnings` columns; count rows with and without each.
# F.col is imported in this cell, so the cell no longer depends on `col` leaking from elsewhere.
valid_and_quarantined_df = dq_engine.apply_checks(input_df, Inline_rule_checks)
display(valid_and_quarantined_df.groupBy(F.col("_errors").isNotNull().alias("has_errors")).count())
display(valid_and_quarantined_df.groupBy(F.col("_warnings").isNotNull().alias("has_warnings")).count())

In [0]:
# Cross-tabulate rows by (has any error, has any warning) after applying the inline rules.
error_warning_breakdown = (
    valid_and_quarantined_df
    .withColumn("has_errors", col("_errors").isNotNull())
    .withColumn("has_warnings", col("_warnings").isNotNull())
    .groupBy("has_errors", "has_warnings")
    .count()
)
display(error_warning_breakdown)

In [0]:
# Split input_df with the class-based rules defined above. Like apply_checks_by_metadata_and_split
# used earlier, this returns a (valid, quarantined) pair of DataFrames.
# (The previous version referenced an undefined `DQColRule_checks` and a non-existent
# `run_quality_checks` / `get_valid_data` API, so it could never run.)
valid_df_alt, quarantined_df_alt = dq_engine.apply_checks_and_split(input_df, Inline_rule_checks)

In [0]:
# Peek at a single quarantined row to see how DQX annotates it.
display(quarantine_df.limit(1))

In [0]:
# Rebuild the engine from the shared setup client instead of constructing yet another WorkspaceClient.
dq_engine = DQEngine(ws)

# Re-apply the YAML-defined checks to the current input_df (which now has null origins for
# ANC-bound flights) and split it into silver and quarantine DataFrames.
silver_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, check_dict)

In [0]:
# Report the three counts with labels (three bare numbers are indistinguishable), using the
# same wording as the earlier metadata-check summary so the two runs can be compared directly.
print(f'Total row count: {input_df.count()}, Good row count: {silver_df.count()}, Quarantined row count: {quarantine_df.count()}')

In [0]:
# Build a link to the `dashboards/` folder under the DQX installation folder of this workspace.
# workspace_link() takes a path relative to the installation folder and prefixes the workspace
# host itself. Passing the host URL here produced a broken link and pinned one specific workspace.
ctx = WorkspaceContext(ws)
dashboards_folder_link = f"{ctx.installation.workspace_link('')}dashboards/"
print("Open a dashboard from the following folder and refresh it:")
print(dashboards_folder_link)

In [0]:
# Profiling section: reuses the `ws` client and `profiler` created in the setup cell.

# Re-read the raw CSV (not a Delta table), so profiling sees the original data rather than the
# copy whose `origin` values were nulled for the data-quality demo.
FLIGHTS_CSV_PATH = "dbfs:/databricks-datasets/flights/departuredelays.csv"
input_df = spark.read.csv(FLIGHTS_CSV_PATH, header=True, inferSchema=True)

# Display a small sample of the input data.
display(input_df.limit(20))

In [0]:
# Profile every row (sample_fraction=1.0) of the raw input loaded by the previous cell.
# Re-reading the CSV here only triggered another full scan with schema inference.
# `profiles` feeds the rule generators in the following cells.
summary_stats, profiles = profiler.profile(input_df, opts={"sample_fraction": 1.0})
print(yaml.safe_dump(summary_stats))

In [0]:
# List each profile on its own bullet line.
for candidate_profile in profiles:
    print(f"* {candidate_profile}")

In [0]:
# Derive DQX check definitions from the profiles (criticality defaults to "error") and show them as YAML.
generator = DQGenerator(ws)
checks = generator.generate_dq_rules(profiles)
checks_yaml = yaml.safe_dump(checks)
print(checks_yaml)

In [0]:
dlt_generator = DQDltGenerator(ws)

# Render the same profiles as DLT expectations in each output flavour, labelling every block.
# A loop replaces three copy-pasted generate/print pairs; afterwards `dlt_expectations` holds
# the last ("Python_Dict") result, as before.
for language in ("SQL", "Python", "Python_Dict"):
    dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language=language)
    print(f"--- {language} ---")
    print(dlt_expectations)