In [1]:
%env SPARK_VERSION=3.0.0

env: SPARK_VERSION=3.0.0


In [2]:
import pydeequ
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession, Row
import pymssql
import pandas as pd

In [3]:
# TODO create spark session with jdbc driver path
spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .enableHiveSupport()
    .getOrCreate())

In [4]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
###some workaround was used because of a lot of jdbc connection issues
server_url = "host.docker.internal:1433" # think what should be here
database_name = "TRN"
url = server_url + ";" + "databaseName=" + database_name + ";"

table = "hr.locations"
user = "DQELogin" # your user creds here
password  = "DQE2023*" # your user creds here

# Connect to the SQL Server 
conn = pymssql.connect(server=server_url, user=user, password=password, database=database_name)
query = f"SELECT * FROM {table}"
cursor = conn.cursor()
cursor.execute(query)

schema = StructType([
    StructField('location_id', IntegerType()),
    StructField('street_address', StringType()),
    StructField('postal_code', StringType()),
    StructField('city', StringType()),
    StructField('state_province', StringType()),
    StructField('country_id', StringType())
])


# Create a list of rows with appropriate types
rows = []
for row in cursor.fetchall():
    rows.append((
        int(row[0]),
        str(row[1]),
        str(row[2]),
        str(row[3]),
        str(row[4]),
        str(row[5])
    ))

# Create a Spark DataFrame
df = spark.createDataFrame(rows, schema=schema)

df.show()

+-----------+--------------------+-----------+-------------------+--------------+----------+
|location_id|      street_address|postal_code|               city|state_province|country_id|
+-----------+--------------------+-----------+-------------------+--------------+----------+
|       1400| 2014 Jabberwocky Rd|      26192|          Southlake|         Texas|        US|
|       1500| 2011 Interiors Blvd|      99236|South San Francisco|    California|        US|
|       1700|     2004 Charade Rd|      98199|            Seattle|    Washington|        US|
|       1800|     147 Spadina Ave|    M5V 2L7|            Toronto|       Ontario|        CA|
|       2400|      8204 Arthur St|       None|             London|          None|        UK|
|       2500|Magdalen Centre, ...|    OX9 9ZB|             Oxford|        Oxford|        UK|
|       2700|Schwanthalerstr. ...|      80925|             Munich|       Bavaria|        DE|
+-----------+--------------------+-----------+-------------------+----

In [11]:
### Data Analyzers section
from pydeequ.analyzers import *
analysisResult = AnalysisRunner(spark) \
                    .onData(df) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Mean("location_id")) \
                    .addAnalyzer(MaxLength("street_address")) \
                    .addAnalyzer(Distinctness("postal_code")) \
                    .addAnalyzer(Completeness("city")) \
                    .addAnalyzer(CountDistinct("country_id")) \
                    .run()

analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

+-------+--------------+-------------+------+
| entity|      instance|         name| value|
+-------+--------------+-------------+------+
| Column|   postal_code| Distinctness|   1.0|
| Column|          city| Completeness|   1.0|
| Column|   location_id|         Mean|2000.0|
| Column|street_address|    MaxLength|  40.0|
|Dataset|             *|         Size|   7.0|
| Column|    country_id|CountDistinct|   4.0|
+-------+--------------+-------------+------+



In [6]:
### Data profiling section
from pydeequ.profiles import *

result = ColumnProfilerRunner(spark) \
    .onData(df) \
    .run()

for col, profile in result.profiles.items():
    print(profile)

StandardProfiles for column: city: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 7,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 0,
        "Unknown": 0,
        "String": 7
    },
    "histogram": [
        [
            "Oxford",
            1,
            0.14285714285714285
        ],
        [
            "London",
            1,
            0.14285714285714285
        ],
        [
            "Munich",
            1,
            0.14285714285714285
        ],
        [
            "Southlake",
            1,
            0.14285714285714285
        ],
        [
            "Seattle",
            1,
            0.14285714285714285
        ],
        [
            "Toronto",
            1,
            0.14285714285714285
        ],
        [
            "South San Francisco",
            1,
            0.14285714285714285
        ]
    ]
}
StandardProfiles for column

In [7]:
### Constraint Suggestions section
from pydeequ.suggestions import *

suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(df) \
             .addConstraintRule(DEFAULT()) \
             .run()

for sugg in suggestionResult['constraint_suggestions']:
       print(f"Constraint suggestion for \'{sugg['column_name']}\': {sugg['description']}")

Constraint suggestion for 'city': 'city' is not null
Constraint suggestion for 'city': 'city' is unique
Constraint suggestion for 'state_province': 'state_province' is not null
Constraint suggestion for 'state_province': 'state_province' is unique
Constraint suggestion for 'location_id': 'location_id' is not null
Constraint suggestion for 'location_id': 'location_id' has no negative values
Constraint suggestion for 'location_id': 'location_id' is unique
Constraint suggestion for 'postal_code': 'postal_code' is not null
Constraint suggestion for 'postal_code': 'postal_code' is unique
Constraint suggestion for 'street_address': 'street_address' is not null
Constraint suggestion for 'street_address': 'street_address' is unique
Constraint suggestion for 'country_id': 'country_id' is not null


In [9]:
### Constraint Verification section
from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x <= 7) \
        .isComplete("location_id") \
        .isUnique("postal_code") \
        .isNonNegative("location_id")
        .hasMaxLength("street_address", lambda x: x == 40)) \
        .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

+------------+-----------+------------+--------------------+-----------------+------------------+
|       check|check_level|check_status|          constraint|constraint_status|constraint_message|
+------------+-----------+------------+--------------------+-----------------+------------------+
+------------+-----------+------------+--------------------+-----------------+------------------+



In [10]:
report = checkResult_df.select("*").toPandas()
html = report.to_html()
file = open("deequ_report.html", "w")
file.write(html)
file.close()