In [1]:
%env SPARK_VERSION=3.5.5

env: SPARK_VERSION=3.5.5


In [2]:
### Spark Session Initialization with PyDeequ and JDBC Support
import pydeequ
from pyspark.sql import SparkSession

# Initialize Spark session 
spark = (
    SparkSession.builder
    .appName("DeequDataProfiling")
    .config("spark.jars", "/opt/spark/jars/mssql-jdbc-12.10.0.jre11.jar")
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate()
)
# use this part for clarity (optional)
print("✅ Spark session created!")
print("Spark version:", spark.version)

✅ Spark session created!
Spark version: 3.5.5


In [3]:
### JDBC connection properties
jdbc_url = "jdbc:sqlserver://host.docker.internal:1433;databaseName=TRN;encrypt=true;trustServerCertificate=true"
connection_properties = {
    "user": "robot",           # Replace with your actual username
    "password": "Vika_password123",  # Replace with your actual password
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Load the employees table from the TRN database
df_employees = spark.read.jdbc(
    url=jdbc_url,
    table="hr.employees",
    properties=connection_properties
)

# Show the first 20 rows to confirm it's loaded
df_employees.show(20)


+-----------+-----------+----------+--------------------+------------+----------+------+--------+----------+-------------+
|employee_id| first_name| last_name|               email|phone_number| hire_date|job_id|  salary|manager_id|department_id|
+-----------+-----------+----------+--------------------+------------+----------+------+--------+----------+-------------+
|        100|     Steven|      King|steven.king@sqltu...|515.123.4567|1987-06-17|     4|24000.00|      NULL|            9|
|        101|      Neena|   Kochhar|neena.kochhar@sql...|515.123.4568|1989-09-21|     5|17000.00|       100|            9|
|        102|        Lex|   De Haan|lex.de haan@sqltu...|515.123.4569|1993-01-13|     5|17000.00|       100|            9|
|        103|  Alexander|    Hunold|alexander.hunold@...|590.423.4567|1990-01-03|     9| 9000.00|       102|            6|
|        104|      Bruce|     Ernst|bruce.ernst@sqltu...|590.423.4568|1991-05-21|     9| 6000.00|       103|            6|
|        105|   

In [4]:
### Display the schema of the table to understand the column names and data types
df_employees.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- hire_date: date (nullable = true)
 |-- job_id: integer (nullable = true)
 |-- salary: decimal(8,2) (nullable = true)
 |-- manager_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)



In [6]:
### Data Analyzers section
from pydeequ.analyzers import *

analysis_result = (
    AnalysisRunner(spark)
    .onData(df_employees)

    # Dataset-level
    .addAnalyzer(Size())

    # Column-level: email
    .addAnalyzer(Completeness("email"))
    .addAnalyzer(Uniqueness(["email"]))
    .addAnalyzer(CountDistinct("email")) 

    # Column-level: employee_id
    .addAnalyzer(CountDistinct("employee_id"))
    .addAnalyzer(Uniqueness(["employee_id"]))

    # Column-level: salary
    .addAnalyzer(Minimum("salary"))
    .addAnalyzer(Maximum("salary"))
    .addAnalyzer(Mean("salary"))
    .addAnalyzer(StandardDeviation("salary"))

    # Column-level: hire_date
    .addAnalyzer(Minimum("hire_date"))
    .addAnalyzer(Maximum("hire_date"))

    .run()
)

# Convert results to DataFrame and show
analysis_result_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysis_result)
analysis_result_df.show(truncate=False)

+-------+-----------+-----------------+-----------------+
|entity |instance   |name             |value            |
+-------+-----------+-----------------+-----------------+
|Column |email      |Completeness     |1.0              |
|Column |email      |Uniqueness       |1.0              |
|Column |employee_id|Uniqueness       |1.0              |
|Column |email      |CountDistinct    |40.0             |
|Column |employee_id|CountDistinct    |40.0             |
|Dataset|*          |Size             |40.0             |
|Column |salary     |Minimum          |2500.0           |
|Column |salary     |Maximum          |24000.0          |
|Column |salary     |Mean             |8060.0           |
|Column |salary     |StandardDeviation|4528.068020690502|
+-------+-----------+-----------------+-----------------+



In [6]:
### Data profiling version 1 (raw)

from pydeequ.profiles import *

# creating a data profile for the Production.Product table
result = ColumnProfilerRunner(spark) \
    .onData(df_employees) \
    .run()

for col, profile in result.profiles.items():
    print(profile)

StandardProfiles for column: first_name: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 36,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 0,
        "Unknown": 0,
        "String": 40
    },
    "histogram": [
        [
            "Den",
            1,
            0.025
        ],
        [
            "John",
            2,
            0.05
        ],
        [
            "Karen",
            2,
            0.05
        ],
        [
            "William",
            1,
            0.025
        ],
        [
            "Alexander",
            2,
            0.05
        ],
        [
            "Pat",
            1,
            0.025
        ],
        [
            "Lex",
            1,
            0.025
        ],
        [
            "Nancy",
            1,
            0.025
        ],
        [
            "Shelli",
            1,
            0.025
        ],
  

In [None]:
### Data profiling version 2 
from pydeequ.profiles import ColumnProfilerRunner

# Run the column profiler
profile_result = ColumnProfilerRunner(spark) \
    .onData(df_employees) \
    .run()

# Show basic profiling output
for col, profile in profile_result.profiles.items():
    print(f"\nColumn: {col}")
    print(f" - Completeness: {profile.completeness}")
    print(f" - Data type: {profile.dataType}")
    print(f" - Approx. distinct count: {profile.approximateNumDistinctValues}")
    print(f" - Is nullable: {profile.isDataTypeInferred}")


Column: first_name
 - Completeness: 1.0
 - Data type: String
 - Approx. distinct count: 36
 - Is nullable: False

Column: employee_id
 - Completeness: 1.0
 - Data type: Integral
 - Approx. distinct count: 39
 - Is nullable: False

Column: hire_date
 - Completeness: 1.0
 - Data type: Unknown
 - Approx. distinct count: 38
 - Is nullable: False

Column: manager_id
 - Completeness: 0.975
 - Data type: Integral
 - Approx. distinct count: 10
 - Is nullable: False

Column: phone_number
 - Completeness: 0.85
 - Data type: String
 - Approx. distinct count: 34
 - Is nullable: False

Column: email
 - Completeness: 1.0
 - Data type: String
 - Approx. distinct count: 41
 - Is nullable: False

Column: department_id
 - Completeness: 1.0
 - Data type: Integral
 - Approx. distinct count: 11
 - Is nullable: False

Column: job_id
 - Completeness: 1.0
 - Data type: Integral
 - Approx. distinct count: 19
 - Is nullable: False

Column: salary
 - Completeness: 1.0
 - Data type: Fractional
 - Approx. distinc

In [22]:
### Data profiling version 3 (my favorite option)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col, length

# Define enhanced schema
schema = StructType([
    StructField("column", StringType(), True),
    StructField("data_type", StringType(), True),
    StructField("completeness", DoubleType(), True),
    StructField("null_count", DoubleType(), True),
    StructField("approx_distinct", DoubleType(), True),
    StructField("min", StringType(), True),
    StructField("max", StringType(), True),
    StructField("mean", StringType(), True),
    StructField("min_length", DoubleType(), True),
    StructField("max_length", DoubleType(), True),
    StructField("pattern_example", StringType(), True),
])

total_count = df_employees.count()
string_cols = [f.name for f in df_employees.schema.fields if f.dataType.simpleString() == "string"]

profile_rows = []

for col_name, profile in profile_result.profiles.items():
    nulls = total_count * (1 - profile.completeness)
    data_type = str(profile.dataType)
    
    # Default values
    min_len = max_len = None
    sample_pattern = None

    if col_name in string_cols:
        length_stats = df_employees.select(length(col(col_name)).alias("len")).agg(
            {"len": "min"}
        ).withColumnRenamed("min(len)", "min_len").join(
            df_employees.select(length(col(col_name)).alias("len")).agg({"len": "max"}).withColumnRenamed("max(len)", "max_len")
        ).collect()[0]

        min_len = float(length_stats["min_len"]) if length_stats["min_len"] is not None else None
        max_len = float(length_stats["max_len"]) if length_stats["max_len"] is not None else None

        sample_row = df_employees.select(col_name).filter(col(col_name).isNotNull()).limit(1).collect()
        sample_pattern = sample_row[0][0] if sample_row else None

    row = (
        col_name,
        data_type,
        float(profile.completeness),
        float(nulls),
        float(profile.approximateNumDistinctValues),
        str(getattr(profile, "minimum", getattr(profile, "min", None))),
        str(getattr(profile, "maximum", getattr(profile, "max", None))),
        str(getattr(profile, "mean", None)),
        min_len,
        max_len,
        sample_pattern
    )
    profile_rows.append(row)

# Create and show final profiling DataFrame
df_profile = spark.createDataFrame(profile_rows, schema=schema)
df_profile.show(truncate=False)
# ┌────────────────────────────────────────────────────────────────────┐
# │ ℹ️  Notes on Profiling Fields:                                     │
# │ 📊 min, max, mean              → Applicable to numeric columns only│
# │ 🔤 min_length, max_length      → Applicable to string columns only │
# │ 🧩 pattern_example             → Sample value to illustrate format │
# └────────────────────────────────────────────────────────────────────┘

+-------------+----------+------------+------------------+---------------+------+-------+------------------+----------+----------+---------------------------+
|column       |data_type |completeness|null_count        |approx_distinct|min   |max    |mean              |min_length|max_length|pattern_example            |
+-------------+----------+------------+------------------+---------------+------+-------+------------------+----------+----------+---------------------------+
|first_name   |String    |1.0         |0.0               |36.0           |None  |None   |None              |3.0       |11.0      |Steven                     |
|employee_id  |Integral  |1.0         |0.0               |39.0           |100.0 |206.0  |140.225           |NULL      |NULL      |NULL                       |
|hire_date    |Unknown   |1.0         |0.0               |38.0           |None  |None   |None              |NULL      |NULL      |NULL                       |
|manager_id   |Integral  |0.975       |1.00000

In [None]:
# Optional: Save profiling results to CSV. This will create a folder called profiling_results with a CSV file inside it 
df_profile.coalesce(1).write.option("header", True).csv("profiling_results")

In [None]:
### Constraint Suggestions section
from pydeequ.suggestions import *

# Generate constraint suggestions
suggestion_result = (
    ConstraintSuggestionRunner(spark)
    .onData(df_employees)
    .addConstraintRule(DEFAULT())
    .run()
)

# Display each suggestion with a fallback if no 'code'
#print("🔍 Suggested Constraints:\n")
#for s in suggestion_result['constraint_suggestions']:
    #print(f"✔️ {s['description']}")
    #print(f"Code: {s.get('code', '⚠️ No code suggestion available')}\n")

print("🔍 Suggested Constraints:\n")
for s in suggestion_result['constraint_suggestions']:
    print(f"✔️ {s['description']}")
    print(f"The corresponding Python code is: {s.get('code_for_constraint', '⚠️ No code suggestion available')}\n")

🔍 Suggested Constraints:

✔️ 'first_name' is not null
The corresponding Python code is: .isComplete("first_name")

✔️ 'employee_id' is not null
The corresponding Python code is: .isComplete("employee_id")

✔️ 'employee_id' has no negative values
The corresponding Python code is: .isNonNegative("employee_id")

✔️ 'employee_id' is unique
The corresponding Python code is: .isUnique("employee_id")

✔️ 'hire_date' is not null
The corresponding Python code is: .isComplete("hire_date")

✔️ 'hire_date' is unique
The corresponding Python code is: .isUnique("hire_date")

✔️ 'manager_id' has no negative values
The corresponding Python code is: .isNonNegative("manager_id")

✔️ 'manager_id' has less than 7% missing values
The corresponding Python code is: .hasCompleteness("manager_id", lambda x: x >= 0.92, "It should be above 0.92!")

✔️ 'phone_number' has less than 27% missing values
The corresponding Python code is: .hasCompleteness("phone_number", lambda x: x >= 0.73, "It should be above 0.73!")

In [12]:
### Constraint Verification section
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

check = (
    Check(spark, CheckLevel.Error, "Employee data verification")
    
    # employee_id checks
    .isComplete("employee_id")
    .isNonNegative("employee_id")
    .isUnique("employee_id")

    # first_name checks
    .isComplete("first_name")
    .hasMinLength("first_name", lambda x: x >= 2)
    .hasMaxLength("first_name", lambda x: x <= 30)
    .hasPattern("first_name", r"^[^\d]*$")  # no digits

    # last_name checks
    .isComplete("last_name")
    .hasMinLength("last_name", lambda x: x >= 2)
    .hasMaxLength("last_name", lambda x: x <= 30)
    .hasPattern("last_name", r"^[^\d]*$")  # no digits

    # email checks
    .isComplete("email")
    .isUnique("email")
    .hasPattern("email", r".+@.+\..+")

    # phone_number checks
    .hasCompleteness("phone_number", lambda x: x >= 0.70)
    .hasMinLength("phone_number", lambda x: x >= 10)
    .hasMaxLength("phone_number", lambda x: x <= 12)

    # hire_date checks
    .isComplete("hire_date")    

    # job_id checks
    .isComplete("job_id")
    .isNonNegative("job_id")
    .isContainedIn("job_id", [str(i) for i in range(1, 20)])

    # salary checks
    .isComplete("salary")
    .hasMin("salary", lambda x: x >= 2500)

    # manager_id checks
    .isNonNegative("manager_id")
    .hasCompleteness("manager_id", lambda x: x >= 0.95)
    .isContainedIn("manager_id", ['100', '101', '102', '103', '108', '114', '120', '123', '201', '205'])

    # department_id checks
    .isComplete("department_id")
    .hasCompleteness("department_id", lambda x: x >= 0.95)
    .isContainedIn("department_id", [str(i) for i in range(1, 12)])
)

# Run verification
verification_result = VerificationSuite(spark) \
    .onData(df_employees) \
    .addCheck(check) \
    .run()

# Show results
#verification_result_df = VerificationResult.checkResultsAsDataFrame(spark, verification_result)
#verification_result_df.show(truncate=False)

# Show overall check status
print(f"✅ Verification Run Status: {verification_result.status}")

# Convert result to Pandas DataFrame for a nice table format
verification_result_df = VerificationResult.checkResultsAsDataFrame(spark, verification_result, pandas=True)
verification_result_df


✅ Verification Run Status: Success




Unnamed: 0,check,check_level,check_status,constraint,constraint_status,constraint_message
0,Employee data verification,Error,Success,CompletenessConstraint(Completeness(employee_i...,Success,
1,Employee data verification,Error,Success,ComplianceConstraint(Compliance(employee_id is...,Success,
2,Employee data verification,Error,Success,UniquenessConstraint(Uniqueness(List(employee_...,Success,
3,Employee data verification,Error,Success,CompletenessConstraint(Completeness(first_name...,Success,
4,Employee data verification,Error,Success,"MinLengthConstraint(MinLength(first_name,None,...",Success,
5,Employee data verification,Error,Success,"MaxLengthConstraint(MaxLength(first_name,None,...",Success,
6,Employee data verification,Error,Success,"PatternMatchConstraint(first_name, ^[^\d]*$)",Success,
7,Employee data verification,Error,Success,"CompletenessConstraint(Completeness(last_name,...",Success,
8,Employee data verification,Error,Success,"MinLengthConstraint(MinLength(last_name,None,N...",Success,
9,Employee data verification,Error,Success,"MaxLengthConstraint(MaxLength(last_name,None,N...",Success,


In [16]:
### Create .csv Test Report (This will create a file "constraint_verification_report.csv" in the root folder)
from pydeequ.verification import VerificationResult
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, verification_result, pandas=True)
checkResult_df.to_csv("constraint_verification_report.csv", index=False)

In [17]:
### Create HTML Test Report (This will create a file "constraint_verification_report.html" in the root folder)
checkResult_df.to_html("constraint_verification_report.html", index=False)