In [1]:
%env SPARK_VERSION=3.2.0  # TODO PUT YOUR VALUE

env: SPARK_VERSION=3.2.0  # TODO PUT YOUR VALUE


In [2]:
# TODO create spark session with jdbc driver path
import pydeequ
from pyspark.sql import SparkSession, Row
from pydeequ.analyzers import *

jdbc_driver_path = "mssql-jdbc-12.6.1.jre11.jar:deequ-2.0.1-spark-3.2.jar"

# Initializing SparkSession
sparkdf = SparkSession.builder \
    .appName("myApp") \
    .config("spark.driver.extraClassPath", jdbc_driver_path) \
    .getOrCreate()

In [3]:
# TODO connect to DB with Spark using JDBC connection to read the data

db_url = "jdbc:sqlserver://127.0.0.1:1433;databaseName=AdventureWorks2012;encrypt=false"

table_name = "Production.Product"
user = "test" # your user creds here
password  = "test" # your user creds here

db_properties = {
    "user": "test",
    "password": "test",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Read data from the database table into a DataFrame
df = sparkdf.read.jdbc(url=db_url, table=table_name, properties=db_properties)

df.printSchema()

#spark.sql("SHOW TABLES").show()

root
 |-- ProductID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- ProductNumber: string (nullable = true)
 |-- MakeFlag: boolean (nullable = true)
 |-- FinishedGoodsFlag: boolean (nullable = true)
 |-- Color: string (nullable = true)
 |-- SafetyStockLevel: short (nullable = true)
 |-- ReorderPoint: short (nullable = true)
 |-- StandardCost: decimal(19,4) (nullable = true)
 |-- ListPrice: decimal(19,4) (nullable = true)
 |-- Size: string (nullable = true)
 |-- SizeUnitMeasureCode: string (nullable = true)
 |-- WeightUnitMeasureCode: string (nullable = true)
 |-- Weight: decimal(8,2) (nullable = true)
 |-- DaysToManufacture: integer (nullable = true)
 |-- ProductLine: string (nullable = true)
 |-- Class: string (nullable = true)
 |-- Style: string (nullable = true)
 |-- ProductSubcategoryID: integer (nullable = true)
 |-- ProductModelID: integer (nullable = true)
 |-- SellStartDate: timestamp (nullable = true)
 |-- SellEndDate: timestamp (nullable = true)
 |-- Disco

In [4]:
### Data Analyzers section

# Analyze data using PyDeequ
analysis_result = AnalysisRunner(sparkdf) \
    .onData(df) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Completeness("ProductID")) \
    .addAnalyzer(Distinctness("ProductID")) \
    .addAnalyzer(Uniqueness(["ProductID", "Name", "ProductNumber"])) \
    .addAnalyzer(Mean("SafetyStockLevel")) \
    .addAnalyzer(Compliance("top SafetyStockLevel", "SafetyStockLevel >= 500")) \
    .addAnalyzer(Sum("ReorderPoint")) \
    .addAnalyzer(Correlation("StandardCost", "ListPrice")) \
    .run()

# Print analysis result
analysis_result_df = AnalyzerContext.successMetricsAsDataFrame(sparkdf, analysis_result)
analysis_result_df.show()

+-----------+--------------------+------------+------------------+
|     entity|            instance|        name|             value|
+-----------+--------------------+------------+------------------+
|     Column|    SafetyStockLevel|        Mean| 535.1507936507936|
|     Column|           ProductID|Distinctness|               1.0|
|     Column|top SafetyStockLevel|  Compliance|0.6904761904761905|
|Mutlicolumn|StandardCost,List...| Correlation|0.9974159499736837|
|Mutlicolumn|ProductID,Name,Pr...|  Uniqueness|               1.0|
|    Dataset|                   *|        Size|             504.0|
|     Column|           ProductID|Completeness|               1.0|
|     Column|        ReorderPoint|         Sum|          202287.0|
+-----------+--------------------+------------+------------------+



In [5]:
### Data profiling section
# TODO profile data here

from pydeequ.profiles import ColumnProfilerRunner

# Run data profiling using PyDeequ
profile_result = ColumnProfilerRunner(sparkdf) \
    .onData(df) \
    .restrictToColumns(['ProductID', 'Name', 'ProductNumber', 'MakeFlag', 'FinishedGoodsFlag', 'StandardCost', 'ListPrice', 'SellStartDate', 'SellEndDate'])\
    .run()

# Print the profiling result
for col, profile in profile_result.profiles.items():
    print(col)
    print(profile)

Name
StandardProfiles for column: Name: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 511,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 0,
        "Unknown": 0,
        "String": 504
    },
    "histogram": null
}
ListPrice
NumericProfiles for column: ListPrice: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 104,
    "dataType": "Fractional",
    "isDataTypeInferred": false,
    "typeCounts": {},
    "histogram": null,
    "kll": "None",
    "mean": 438.66624999999937,
    "maximum": 3578.27,
    "minimum": 0.0,
    "sum": 221087.7899999997,
    "stdDev": 772.8349984287421,
    "approxPercentiles": []
}
SellStartDate
StandardProfiles for column: SellStartDate: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 4,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {},
    "histogram": null
}
MakeFlag
StandardProfiles 

In [6]:
### Constraint Suggestions section
# TODO find meaninful constraints here

from pydeequ.suggestions import *

suggestion_result = ConstraintSuggestionRunner(sparkdf) \
             .onData(df) \
             .addConstraintRule(DEFAULT()) \
             .run()

# Constraint Suggestions:
for sugg in suggestion_result['constraint_suggestions']:
    print(f"Constraint suggestion for \'{sugg['column_name']}\': {sugg['description']}")
    print(f"The corresponding Python code is: {sugg['code_for_constraint']}\n")

Constraint suggestion for 'ModifiedDate': 'ModifiedDate' is not null
The corresponding Python code is: .isComplete("ModifiedDate")

Constraint suggestion for 'ReorderPoint': 'ReorderPoint' has value range '375', '750', '75', '3', '600', '45'
The corresponding Python code is: .isContainedIn("ReorderPoint", ["375", "750", "75", "3", "600", "45"])

Constraint suggestion for 'ReorderPoint': 'ReorderPoint' is not null
The corresponding Python code is: .isComplete("ReorderPoint")

Constraint suggestion for 'ReorderPoint': 'ReorderPoint' has value range '375', '750', '75', '3' for at least 91.0% of values
The corresponding Python code is: .isContainedIn("ReorderPoint", ["375", "750", "75", "3"], lambda x: x >= 0.91, "It should be above 0.91!")

Constraint suggestion for 'ReorderPoint': 'ReorderPoint' has no negative values
The corresponding Python code is: .isNonNegative("ReorderPoint")

Constraint suggestion for 'ProductLine': 'ProductLine' has value range 'R ', 'M ', 'T ', 'S '
The correspo

In [7]:
### Constraint Verification section
# TODO check selected constraints here and make beautify the report

from pydeequ.checks import *
from pydeequ.verification import *


check = Check(sparkdf, CheckLevel.Warning, "Review Check")


check_result = VerificationSuite(sparkdf) \
    .onData(df) \
    .addCheck(
        check.isUnique("ProductID") \
        .isUnique("Name") \
        .isUnique("ProductNumber") \
        .isComplete("Name") \
        .isComplete("ProductNumber") \
        .isComplete("ListPrice") \
        .isComplete("StandardCost") \
        .isComplete("FinishedGoodsFlag") \
        .isComplete("ModifiedDate") \
        .isComplete("SellStartDate") \
        .isContainedIn("Color", ["Black", "Silver", "Red", "Yellow", "Blue", "Multi", "Silver/Black", "White", "Grey"]) \
        .isContainedIn("Class", ["L ", "H ", "M "]) \
        .isContainedIn("Style", ["U ", "W ", "M "]) \
        .isContainedIn("ProductLine", ["R ", "M ", "T ", "S "]) \
        .isContainedIn("WeightUnitMeasureCode", ["LB ", "G  "]) \
        .hasCompleteness("Size", lambda x: x >= 0.37, "It should be above 0.37!") \
        .isNonNegative("ProductModelID") \
        .isNonNegative("ProductSubcategoryID") \
        .isNonNegative("Weight") \
        .isNonNegative("StandardCost") \
        .isNonNegative("ListPrice")) \
     .run()

print(f"Verification Run Status: {check_result.status}")

check_result_df = VerificationResult.checkResultsAsDataFrame(sparkdf, check_result)
check_result_df.show()


Python Callback server started!
Verification Run Status: Success
+------------+-----------+------------+--------------------+-----------------+------------------+
|       check|check_level|check_status|          constraint|constraint_status|constraint_message|
+------------+-----------+------------+--------------------+-----------------+------------------+
+------------+-----------+------------+--------------------+-----------------+------------------+
only showing top 20 rows



In [8]:
# Save Constraint Verification results in the check_results.csv file

check_result_df.coalesce(1).write.mode('overwrite').option("header", "true").csv('check_results.csv')


In [9]:
# Save Constraint Verification results in the check_results.html file

import pandas as pd
import webbrowser
import os

# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = check_result_df.select("*").toPandas()

# Convert the Pandas DataFrame to HTML
result_html = result_pdf.to_html()

#Write HTML String to file.html
with open('check_results.html', 'w') as f:
    f.write(result_html)

# Open the result_file.html in the browser
webbrowser.open('file://' + os.path.realpath('check_results.html'))


# Stop SparkSession
sparkdf.sparkContext._gateway.shutdown_callback_server()
sparkdf.stop()