In [0]:
pip install --force-reinstall --no-cache-dir "git+https://github.com/STEFANOVIVAS/dqx.git@profile_subset_dataframe"

In [0]:
# Restart the Python process after the install above so that the following cells
# import the freshly installed package.
dbutils.library.restartPython() 

In [0]:
# NOTE(review): the two wildcard imports below pull every exported pyspark type and
# function into the notebook namespace; pyspark.sql.functions exports names such as
# `sum`, `min`, `max` and `filter`, which shadow the Python built-ins. The cells in
# this notebook reach Spark functions through the `sf` alias imported last.
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import datetime
import pyspark.sql.functions as sf

NYC TAXI GREEN DATASET

In [0]:

# Read the March 2019 green-taxi trip file: first row is the header, column
# types are inferred from the data.
green_trips_reader = spark.read.option("header", True).option("inferSchema", "true")
df_taxi_green = green_trips_reader.csv(
    '/databricks-datasets/nyctaxi/tripdata/green/green_tripdata_2019-03.csv.gz'
)

# Preview the rows, then show the total row count.
display(df_taxi_green)
display(df_taxi_green.count())

In [0]:
# Reference subset: the same predicate is later passed to the profiler through
# its "filter" option, so this DataFrame is what the filtered profile should describe.
vendor_predicate = "vendorId=1"
df_taxi_green_filter = df_taxi_green.filter(vendor_predicate)

NYC TAXI ZONE LOOKUP

In [0]:
# Read the taxi-zone lookup CSV (header row, inferred column types) and preview it.
taxi_zone_reader = spark.read.option("header", True).option("inferSchema", "true")
df_taxi_zone = taxi_zone_reader.csv(
    '/databricks-datasets/nyctaxi/taxizone/taxi_zone_lookup.csv'
)
display(df_taxi_zone)


In [0]:
from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.profiler.generator import DQGenerator
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.profiler.dlt_generator import DQDltGenerator

In [0]:
# Profile the full green-taxi DataFrame while asking the profiler to restrict
# itself to vendorId=1 rows through the "filter" option.
ws = WorkspaceClient()
profiler = DQProfiler(ws)

# sample_fraction and limit are explicitly None — presumably to switch off
# sampling and row limiting so every matching row is profiled (confirm against
# the DQProfiler options documentation).
custom_options = dict(sample_fraction=None, limit=None, filter="vendorId=1")

(
    summary_stats_green_taxi_filter,
    profiles_green_taxi_filter,
) = profiler.profile(df_taxi_green, options=custom_options)

In [0]:
# Summary statistics returned by the profiler run that used the vendorId=1 filter.
summary_stats_green_taxi_filter

In [0]:
# Column profiles returned by the profiler run that used the vendorId=1 filter.
profiles_green_taxi_filter

In [0]:
# Baseline run: profile the same DataFrame with no "filter" option, so the
# result can be compared against the filtered profile.
ws = WorkspaceClient()
profiler = DQProfiler(ws)

custom_options = dict(sample_fraction=None, limit=None)

(
    summary_stats_green_taxi,
    profiles_green_taxi,
) = profiler.profile(df_taxi_green, options=custom_options)

In [0]:
# Summary statistics returned by the profiler run without a filter.
summary_stats_green_taxi

In [0]:
# Column profiles returned by the profiler run without a filter.
profiles_green_taxi

#### Test filter works

In [0]:
# Cross-check the profiler's summary statistics against values computed directly
# with Spark, for both the manually filtered DataFrame and the full DataFrame.
# Each printed line is: label, directly computed value, profiler value.
#
# All statistics for one DataFrame are computed in a single aggregation, so the
# CSV is scanned once per DataFrame instead of once per statistic.

def _direct_stats(frame, columns):
    """Return ``{column: {"stddev": ..., "mean": ...}}`` computed with one Spark job.

    Args:
        frame: Spark DataFrame to aggregate.
        columns: names of the numeric columns to summarise.

    Returns:
        A dict keyed by column name; each value holds that column's sample
        standard deviation (``sf.stddev``) and mean (``sf.mean``).
    """
    aggregations = []
    for column in columns:
        aggregations.append(sf.stddev(column).alias(f"{column}_stddev"))
        aggregations.append(sf.mean(column).alias(f"{column}_mean"))
    # A global aggregation always yields exactly one row.
    stats_row = frame.select(*aggregations).collect()[0]
    return {
        column: {
            "stddev": stats_row[f"{column}_stddev"],
            "mean": stats_row[f"{column}_mean"],
        }
        for column in columns
    }


compared_columns = ("passenger_count", "fare_amount")
compared_metrics = ("stddev", "mean")

# (label, DataFrame to measure directly, profiler summary to compare against)
comparisons = (
    ("filtered_dataset:", df_taxi_green_filter, summary_stats_green_taxi_filter),
    ("dataset:", df_taxi_green, summary_stats_green_taxi),
)

for label, frame, profiled_stats in comparisons:
    direct_stats = _direct_stats(frame, compared_columns)
    for column in compared_columns:
        for metric in compared_metrics:
            print(label, direct_stats[column][metric], profiled_stats[column][metric])

TEST GENERATING CHECKS AND SAVING THEM TO FILES AND TABLES

In [0]:

from databricks.labs.dqx.config import (
  FileChecksStorageConfig,
  WorkspaceFileChecksStorageConfig,
  InstallationChecksStorageConfig,
  TableChecksStorageConfig,
  VolumeFileChecksStorageConfig
)
ws = WorkspaceClient()

# Generate DQX quality rules/checks from the profile of the filtered data
# (default level "error").
generator = DQGenerator(ws)
checks = generator.generate_dq_rules(profiles_green_taxi_filter)

dq_engine = DQEngine(ws)

# 1) Save checks as a YAML file in the local filesystem (overwrites the file), using a
#    relative or absolute path. Also works with absolute and relative workspace paths
#    if invoked from a Databricks notebook or job.
dq_engine.save_checks(checks, config=FileChecksStorageConfig(location="checks_filter.yml"))

# 2) Save checks as a file in an arbitrary workspace location (overwrites the file),
#    using an absolute path. The path is built from the current user's name instead
#    of a hardcoded email so the notebook is runnable by whoever executes it.
workspace_checks_path = f"/Workspace/Users/{ws.current_user.me().user_name}/quality_rules/check_filter_ws"
dq_engine.save_checks(checks, config=WorkspaceFileChecksStorageConfig(location=workspace_checks_path))

# 3) Save checks in a Delta table under the "default" run config.
#    NOTE(review): mode="append" means every re-run of this cell appends the same
#    checks again — confirm duplicates in the table are acceptable for this test.
dq_engine.save_checks(checks, config=TableChecksStorageConfig(location="main.default.checks_table", mode="append"))

In [0]:
%sql
-- Inspect the checks stored in the Delta checks table.
SELECT * FROM main.default.checks_table

TEST LOAD CHECKS FROM FILES AND TABLES

In [0]:
# Materialise the green-taxi DataFrame as the table that the end-to-end check run
# reads as its input (InputConfig location "main.default.taxi_green").
# mode("ignore") turns the write into a no-op when the table already exists, so the
# cell can stay enabled across re-runs instead of being commented out after first use.
df_taxi_green.write.mode("ignore").saveAsTable("main.default.taxi_green")

In [0]:
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.config import InputConfig, OutputConfig
from databricks.labs.dqx.config import (
  FileChecksStorageConfig,
  WorkspaceFileChecksStorageConfig,
  InstallationChecksStorageConfig,
  TableChecksStorageConfig,
  VolumeFileChecksStorageConfig
)
from databricks.sdk import WorkspaceClient

ws = WorkspaceClient()
dq_engine = DQEngine(ws)

# 1) Load checks from a local file using a relative or absolute path. Also works for
#    absolute and relative workspace paths if invoked from a Databricks notebook or job.
quality_checks_file: list[dict] = dq_engine.load_checks(config=FileChecksStorageConfig(location="checks_filter.yml"))

# 2) Load checks from an arbitrary workspace location using an absolute path. The path
#    is built from the current user's name instead of a hardcoded email so the notebook
#    is runnable by whoever executes it.
workspace_checks_path = f"/Workspace/Users/{ws.current_user.me().user_name}/quality_rules/check_filter_ws"
quality_checks_workspace: list[dict] = dq_engine.load_checks(config=WorkspaceFileChecksStorageConfig(location=workspace_checks_path))

# 3) Load checks from a Delta table using the default run config name.
quality_checks_table: list[dict] = dq_engine.load_checks(config=TableChecksStorageConfig(location="main.default.checks_table"))

Applying the metadata-defined checks (generated from the filtered profile) to the full dataset

In [0]:
# Option 1: apply quality checks on the DataFrame and output results as a single DataFrame
valid_and_invalid_df = dq_engine.apply_checks_by_metadata(df_taxi_green, checks)
dq_engine.save_results_in_table(
  output_df=valid_and_invalid_df,
  output_config=OutputConfig(location="main.default.output_checks"),
)

# Option 2: apply quality checks on the DataFrame and provide valid and invalid (quarantined) DataFrames
valid_df, invalid_df = dq_engine.apply_checks_by_metadata_and_split(df_taxi_green, checks)
dq_engine.save_results_in_table(
  output_df=valid_df,
  quarantine_df=invalid_df,
  output_config=OutputConfig(location="main.default.output_valid_checks"),
  quarantine_config=OutputConfig(location="main.default.quarantine_checks"),
)

# Option 3 End-to-End approach: apply quality checks on the input table and save results to valid and invalid (quarantined) tables
dq_engine.apply_checks_by_metadata_and_save_in_table(
    checks=checks,
    input_config=InputConfig(location="main.default.taxi_green"),
    output_config=OutputConfig(location="main.default.valid_checks_e2e"),
    quarantine_config=OutputConfig(location="main.default.quarantine_checks_e2e"),
)

In [0]:
%sql
-- Inspect the single-table output written by Option 1.
SELECT * FROM main.default.output_checks

In [0]:
%sql
-- Inspect the quarantined rows written by Option 2; swap to the commented query
-- below to look at the valid rows instead.
-- SELECT * FROM main.default.output_valid_checks
SELECT * FROM  main.default.quarantine_checks

TEST INVALID FILTER

In [0]:
# Profile the DataFrame with an empty string as the "filter" option.
# NOTE(review): whether the profiler rejects an empty filter or treats it as
# "no filter" is not visible from this notebook — confirm the expected outcome.
ws = WorkspaceClient()
profiler = DQProfiler(ws)

custom_options = dict(sample_fraction=None, limit=None, filter="")

(
    summary_stats_green_taxi_invalid_filter,
    profiles_green_taxi_invalid_filter,
) = profiler.profile(df_taxi_green, options=custom_options)

In [0]:
# Summary statistics returned by the profiler run that used an empty filter string.
summary_stats_green_taxi_invalid_filter