In [0]:
%pip install pydeequ

Collecting pydeequ
  Downloading pydeequ-1.5.0-py3-none-any.whl.metadata (9.7 kB)
Downloading pydeequ-1.5.0-py3-none-any.whl (37 kB)
Installing collected packages: pydeequ
Successfully installed pydeequ-1.5.0
Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.


In [0]:
%restart_python
# Restarts the Python process so the package installed by the preceding %pip
# cell can be imported (pip's own notice recommends this after an install).

In [0]:
import os

# Tell PyDeequ which Spark line it is running on. This is set before any
# `pydeequ` import — presumably because PyDeequ reads SPARK_VERSION when it is
# imported to choose the matching Deequ build; confirm against the PyDeequ README.
os.environ.update(SPARK_VERSION="3.5")

In [0]:
print("Reading processed IoT data...")

# --- Storage configuration ---------------------------------------------------
storage_account = "iotdatastoragebalu"
container_processed = "iot-processed"
container_reports = "iot-reports"

# The account key is fetched from a Databricks secret scope at run time so it is
# never stored in the notebook source, its outputs, or its revision history.
# NOTE(review): the scope and key names below are placeholders — point them at
# the secret that actually holds this storage account's access key.
secret_scope = "iot-pipeline"
secret_key_name = "storage-account-key"
storage_key = dbutils.secrets.get(scope=secret_scope, key=secret_key_name)

# Register the account key in the Spark configuration so abfss:// paths on this
# storage account can be read and written.
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
    storage_key,
)

# Load every Parquet file found under the root of the processed-data container.
df = spark.read.format("parquet").load(
    f"abfss://{container_processed}@{storage_account}.dfs.core.windows.net/"
)

print(f"Loaded {df.count()} records from processed IoT data")


from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult
import json

# Error-level check: the overall check fails if any constraint below fails.
check = Check(spark, CheckLevel.Error, "IoT Sensor Data Quality")

# Status must be one of the three known labels.
check = check.isContainedIn("Status", ["OK", "WARNING", "ALERT"])

# Both sensor readings must be >= 0.
check = check.isNonNegative("Temperature_C")
check = check.isNonNegative("Pressure_bar")

# NOTE: as the generated constraint SQL in the results shows, NULL values pass
# all three constraints (NULL Status is accepted; NULL readings are coalesced
# to 0), so these checks do not detect missing data.

# Evaluate the check against the loaded data.
result = VerificationSuite(spark).onData(df).addCheck(check).run()

# Destination folder for the JSON report inside the reports container.
result_path = f"abfss://{container_reports}@{storage_account}.dfs.core.windows.net/deequ_report"

print("Data Quality Check Results:")
# Convert the verification outcome to a Spark DataFrame: one row per constraint
# with its status and any failure message.
result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
display(result_df)

# Persist the report as JSON, replacing whatever a previous run left behind.
result_df.write.format("json").mode("overwrite").save(result_path)

print(f"Data Quality report saved to {result_path}")
print("DQ Check complete.")


Reading processed IoT data...
Loaded 20 records from processed IoT data
Data Quality Check Results:




check,check_level,check_status,constraint,constraint_status,constraint_message
IoT Sensor Data Quality,Error,Success,"ComplianceConstraint(Compliance(Status contained in OK,WARNING,ALERT,`Status` IS NULL OR `Status` IN ('OK','WARNING','ALERT'),None))",Success,
IoT Sensor Data Quality,Error,Success,"ComplianceConstraint(Compliance(Temperature_C is non-negative,COALESCE(CAST(Temperature_C AS DECIMAL(20,10)), 0.0) >= 0,None))",Success,
IoT Sensor Data Quality,Error,Success,"ComplianceConstraint(Compliance(Pressure_bar is non-negative,COALESCE(CAST(Pressure_bar AS DECIMAL(20,10)), 0.0) >= 0,None))",Success,


Data Quality report saved to abfss://iot-reports@iotdatastoragebalu.dfs.core.windows.net/deequ_report
DQ Check complete.
