In [0]:
# Smoke test: prints a fixed message to confirm the notebook executes code.
greeting = "this is a testing code for notebook"
print(greeting)

this is a testing code for notebook


In [0]:
%pip install pydeequ

Collecting pydeequ
  Downloading pydeequ-1.5.0-py3-none-any.whl.metadata (9.7 kB)
Downloading pydeequ-1.5.0-py3-none-any.whl (37 kB)
Installing collected packages: pydeequ
Successfully installed pydeequ-1.5.0
Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.


In [0]:
%restart_python
# Restart the Python process so the freshly installed pydeequ package can be
# imported, as the note printed by the %pip install step advises.

In [0]:
import os

# Declare the Spark version through the SPARK_VERSION environment variable.
# NOTE(review): pydeequ is understood to read this variable when it is
# imported, so this cell has to run before the pydeequ imports - confirm
# against the pydeequ documentation.
os.environ.update({"SPARK_VERSION": "3.5"})

In [0]:
from pyspark.sql import SparkSession
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

# Demo: run one Deequ constraint against a tiny in-memory DataFrame.
# Uses the `spark` session that Databricks already provides to the notebook.

# Four sample transactions; the last one has no sales amount.
sample_rows = [(1, 100), (2, 200), (3, 300), (4, None)]
sample_columns = ["transaction_id", "sales_amount"]
data = spark.createDataFrame(sample_rows, sample_columns)

# A single Error-level check: sales_amount must be non-negative.
# The constraint shown in the results table wraps the column in
# COALESCE(..., 0.0), so the row with a missing amount is evaluated as 0
# and does not fail this check.
check = (
    Check(spark, CheckLevel.Error, "Data quality checks")
    .isNonNegative("sales_amount")
)

# Attach the check to the data and execute the verification run.
suite = VerificationSuite(spark).onData(data)
result = suite.addCheck(check).run()

# Show the per-constraint outcome as a DataFrame, without truncating columns.
result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
result_df.show(truncate=False)

# Overall outcome of the whole run.
print("Verification status:", result.status)


+-------------------+-----------+------------+----------------------------------------------------------------------------------------------------------------------------+-----------------+------------------+
|check              |check_level|check_status|constraint                                                                                                                  |constraint_status|constraint_message|
+-------------------+-----------+------------+----------------------------------------------------------------------------------------------------------------------------+-----------------+------------------+
|Data quality checks|Error      |Success     |ComplianceConstraint(Compliance(sales_amount is non-negative,COALESCE(CAST(sales_amount AS DECIMAL(20,10)), 0.0) >= 0,None))|Success          |                  |
+-------------------+-----------+------------+----------------------------------------------------------------------------------------------------------------------



In [0]:
import json
import uuid
from datetime import date

import pandas as pd
from pydeequ.checks import Check, CheckLevel
# VerificationResult is imported here as well: this cell calls it directly, and
# previously relied on an earlier demo cell having imported it.
from pydeequ.verification import VerificationResult, VerificationSuite
from pyspark.sql import SparkSession
# Star import kept on purpose: later cells use StructType, StructField,
# StringType, BooleanType and DoubleType through it.
from pyspark.sql.types import *
from scipy.stats import ks_2samp

# NOTE(review): on Databricks a session already exists, so getOrCreate() is
# expected to return that session rather than build a new one - confirm.
spark = SparkSession.builder.appName("DataQualityChecks").getOrCreate()

# Input: the sample sales table registered in the Hive metastore.
# (An earlier revision read a sample_sales_data CSV upload from DBFS instead.)
data = spark.table("hive_metastore.default.sample_sales_data")

# Quality rules, both at Error level:
#   * the table must contain at least 10 rows
#   * sales_amount must be non-negative
check = (
    Check(spark, CheckLevel.Error, "Data quality checks")
    .hasSize(lambda x: x >= 10)
    .isNonNegative("sales_amount")
)

# Run the verification and flatten the per-constraint results to a DataFrame.
result = VerificationSuite(spark).onData(data).addCheck(check).run()
result_df = VerificationResult.checkResultsAsDataFrame(spark, result)

# Persist the results as JSON; the random UUID gives every run its own
# output directory under dbfs:/FileStore.
unique_id = str(uuid.uuid4())
result_df.write.mode("overwrite").json(f"dbfs:/FileStore/deequ_report_{unique_id}")

# Report the overall outcome. This only prints a message - it does NOT stop
# the notebook, so the cells below still run when a check fails.
if result.status != "Success":
    print("Deequ checks failed.")
else:
    print("Deequ checks passed")




Deequ checks passed


In [0]:
# Drift check: compare the current sales_amount distribution against a fixed
# baseline sample with a two-sample Kolmogorov-Smirnov test.

# Significance level: drift is flagged when the KS p-value falls below this.
DRIFT_ALPHA = 0.05

# Hand-written reference sample that the current data is compared against.
baseline_pdf = pd.DataFrame([
    {"transaction_id": 9001, "sales_amount": 250, "product_id": 101, "store_id": 1},
    {"transaction_id": 9002, "sales_amount": 300, "product_id": 102, "store_id": 2},
    {"transaction_id": 9003, "sales_amount": 150, "product_id": 103, "store_id": 1},
    {"transaction_id": 9004, "sales_amount": 400, "product_id": 104, "store_id": 3},
])

# Only sales_amount is tested, so collect just that column to the driver
# instead of pulling the whole table into pandas.
current_pdf = data.select("sales_amount").toPandas()

# Missing amounts must not reach the KS test: a NaN input can yield a NaN
# p-value, which would silently report "no drift" and later serialise as
# non-standard JSON. Casting to float also normalises Decimal/object columns.
baseline_sales = baseline_pdf["sales_amount"].dropna().astype(float)
current_sales = current_pdf["sales_amount"].dropna().astype(float)
if current_sales.empty:
    raise ValueError("No non-null sales_amount values available for the drift test.")

stat, p_value = ks_2samp(baseline_sales, current_sales)
drift_detected = p_value < DRIFT_ALPHA

# Schema of the one-row drift summary.
drift_schema = StructType([
    StructField("date", StringType(), True),
    StructField("column", StringType(), True),
    StructField("drift_detected", BooleanType(), True),
    StructField("p_value", DoubleType(), True)
])

# One-row summary DataFrame; the tuple's values are listed in the same order
# as the fields of drift_schema.
drift_df = spark.createDataFrame(
    [(str(date.today()), "sales_amount", bool(drift_detected), float(p_value))],
    schema=drift_schema,
)

In [0]:
# Assemble a single JSON document holding the Deequ constraint results and
# the outcome of the KS drift test, then return it as the notebook's result.

# Deequ results: one dict per constraint row.
deequ_results_pdf = result_df.toPandas()
deequ_results_json = deequ_results_pdf.to_dict(orient="records")

# Drift results: a single record for the sales_amount column, converted to
# plain Python types so it can be serialised.
report_date = str(date.today())
ks_statistic = float(stat)
ks_p_value = float(p_value)
drift_flag = bool(drift_detected)
drift_results_json = [
    {
        "date": report_date,
        "column": "sales_amount",
        "ks_statistic": ks_statistic,
        "p_value": ks_p_value,
        "drift_detected": drift_flag,
    }
]

final_output = {"deequ_results": deequ_results_json}
final_output["drift_results"] = drift_results_json

final_output_json = json.dumps(final_output, indent=2)

# Echo the report so it shows up in the run logs.
print(final_output_json)

# End the notebook run, handing the report back to the caller (per the
# original note, a GitHub Actions job).
dbutils.notebook.exit(final_output_json)