In [14]:
%env SPARK_VERSION='v3.3.1'

env: SPARK_VERSION='v3.3.1'


In [15]:
import pydeequ

from pyspark.sql import SparkSession, Row

# Assemble the Spark session one setting at a time:
#   * spark.jars.packages / spark.jars.excludes use the Maven coordinates
#     exposed by pydeequ;
#   * sqljdbc42.jar is added to the driver classpath for the JDBC read below.
session_builder = SparkSession.builder
session_builder = session_builder.config("spark.jars.packages", pydeequ.deequ_maven_coord)
session_builder = session_builder.config("spark.jars.excludes", pydeequ.f2j_maven_coord)
session_builder = session_builder.config(
    "spark.driver.extraClassPath", "/home/jovyan/work/sqljdbc42.jar"
)
spark = session_builder.getOrCreate()

# Last expression of the cell: display the session summary.
spark

In [16]:
import os
from getpass import getpass

# Connection settings. Each value can be overridden through an environment
# variable; the defaults reproduce the original target server and database.
server_name = os.environ.get("MSSQL_SERVER", "jdbc:sqlserver://192.168.0.185:1433")
database_name = os.environ.get("MSSQL_DATABASE", "AdventureWorks2012")
url = server_name + ";" + "databaseName=" + database_name + ";"

table = "Person.Address"
user = os.environ.get("MSSQL_USER", "roboto")
# The password is never stored in the notebook: it is taken from the
# environment, or asked for interactively when the variable is not set.
password = os.environ.get("MSSQL_PASSWORD") or getpass(f"Password for {user}: ")

# Read the whole table through the SQL Server JDBC driver.
data_frame = (
    spark.read
    .format("jdbc")
    .option("url", url)
    .option("dbtable", table)
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("user", user)
    .option("password", password)
    .load()
)

# Quick sanity check: print the first rows.
data_frame.show(5)

+---------+--------------------+------------+-------+---------------+----------+--------------------+--------------------+-------------------+
|AddressID|        AddressLine1|AddressLine2|   City|StateProvinceID|PostalCode|     SpatialLocation|             rowguid|       ModifiedDate|
+---------+--------------------+------------+-------+---------------+----------+--------------------+--------------------+-------------------+
|        1|       1970 Napa Ct.|        null|Bothell|             79|     98011|[E6 10 00 00 01 0...|9AADCB0D-36CF-483...|2007-12-04 00:00:00|
|        2|  9833 Mt. Dias Blv.|        null|Bothell|             79|     98011|[E6 10 00 00 01 0...|32A54B9E-E034-4BF...|2008-11-30 00:00:00|
|        3|7484 Roundtree Drive|        null|Bothell|             79|     98011|[E6 10 00 00 01 0...|4C506923-6D1B-452...|2013-03-07 00:00:00|
|        4|    9539 Glenside Dr|        null|Bothell|             79|     98011|[E6 10 00 00 01 0...|E5946C78-4BCC-477...|2009-02-03 00:00:00|

In [17]:
from pydeequ.analyzers import *

# Metrics to compute, in the order they are registered with the runner.
# NOTE(review): the recorded output of this cell contains no Correlation row;
# confirm whether that analyzer can be computed for these two columns.
analyzers = [
    Size(),
    Completeness("AddressID"),
    Completeness("AddressLine2"),
    Correlation("StateProvinceID", "PostalCode"),
    ApproxCountDistinct("rowguid"),
    CountDistinct("rowguid"),
]

# Register every analyzer on a single run builder, then execute it once.
analysis_builder = AnalysisRunner(spark).onData(data_frame)
for analyzer in analyzers:
    analysis_builder = analysis_builder.addAnalyzer(analyzer)
analysisResult = analysis_builder.run()

# Convert the computed metrics to a Spark DataFrame and print them.
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

+-------+------------+-------------------+--------------------+
| entity|    instance|               name|               value|
+-------+------------+-------------------+--------------------+
| Column|     rowguid|      CountDistinct|             19614.0|
| Column|AddressLine2|       Completeness|0.018456204751707964|
|Dataset|           *|               Size|             19614.0|
| Column|     rowguid|ApproxCountDistinct|             19107.0|
| Column|   AddressID|       Completeness|                 1.0|
+-------+------------+-------------------+--------------------+



In [18]:
from pydeequ.profiles import *

# Profile every column of the loaded table; no extra profiler options are set.
profiler = ColumnProfilerRunner(spark).onData(data_frame)
result = profiler.run()

# Only the profile objects are printed, so iterate over the values directly.
for column_profile in result.profiles.values():
    print(column_profile)

StandardProfiles for column: ModifiedDate: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 1367,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {},
    "histogram": null
}
StandardProfiles for column: SpatialLocation: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 17450,
    "dataType": "Unknown",
    "isDataTypeInferred": false,
    "typeCounts": {},
    "histogram": null
}
StandardProfiles for column: rowguid: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 19107,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 0,
        "Unknown": 0,
        "String": 19614
    },
    "histogram": null
}
StandardProfiles for column: AddressLine1: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 14696,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Frac

In [19]:
from pydeequ.suggestions import *

# Run the constraint suggestion engine with the DEFAULT() rule set.
suggestion_builder = ConstraintSuggestionRunner(spark).onData(data_frame)
suggestion_builder = suggestion_builder.addConstraintRule(DEFAULT())
suggestionResult = suggestion_builder.run()

# Print each suggestion, followed by the check code that would enforce it.
for suggestion in suggestionResult["constraint_suggestions"]:
    column_name = suggestion["column_name"]
    description = suggestion["description"]
    print(f"Constraint suggestion for '{column_name}': {description}")
    constraint_code = suggestion["code_for_constraint"]
    print(f"The corresponding Python code is: {constraint_code}\n")

# To dump the raw suggestions as JSON instead (needs `import json`):
# print(json.dumps(suggestionResult))

Constraint suggestion for 'ModifiedDate': 'ModifiedDate' is not null
The corresponding Python code is: .isComplete("ModifiedDate")

Constraint suggestion for 'SpatialLocation': 'SpatialLocation' is not null
The corresponding Python code is: .isComplete("SpatialLocation")

Constraint suggestion for 'rowguid': 'rowguid' is not null
The corresponding Python code is: .isComplete("rowguid")

Constraint suggestion for 'rowguid': 'rowguid' is unique
The corresponding Python code is: .isUnique("rowguid")

Constraint suggestion for 'AddressLine1': 'AddressLine1' is not null
The corresponding Python code is: .isComplete("AddressLine1")

Constraint suggestion for 'AddressID': 'AddressID' is not null
The corresponding Python code is: .isComplete("AddressID")

Constraint suggestion for 'AddressID': 'AddressID' has no negative values
The corresponding Python code is: .isNonNegative("AddressID")

Constraint suggestion for 'StateProvinceID': 'StateProvinceID' has value range '9', '79', '14', '50', '7'

In [23]:
from pydeequ.checks import *
from pydeequ.verification import *

# Warning-level check: a failed constraint is reported with Warning severity.
check = Check(spark, CheckLevel.Warning, "Review Check")

# Allowed values must be passed as a list. A bare string such as "88" is
# consumed element by element, which would allow "8" rather than "88".
allowed_state_province_ids = ["88"]

# Attach all constraints to the check and evaluate them against the table.
checkResult = (
    VerificationSuite(spark)
    .onData(data_frame)
    .addCheck(
        check.hasSize(lambda x: x >= 3)
        .hasMin("AddressID", lambda x: x == 1)
        .isComplete("AddressID")
        .isUnique("rowguid")
        .isContainedIn("StateProvinceID", allowed_state_province_ids)
        .isNonNegative("PostalCode")
    )
    .run()
)

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
# truncate=False keeps the long constraint descriptions and messages readable.
checkResult_df.show(truncate=False)




+------------+-----------+------------+-----------------------------------------------------------------------------------------------------------------------------------+-----------------+---------------------------------------------------------------------+
|check       |check_level|check_status|constraint                                                                                                                         |constraint_status|constraint_message                                                   |
+------------+-----------+------------+-----------------------------------------------------------------------------------------------------------------------------------+-----------------+---------------------------------------------------------------------+
+------------+-----------+------------+-----------------------------------------------------------------------------------------------------------------------------------+-----------------+-------------------------------

In [25]:
import pandas
import pretty_html_table

# Get the check results of the verification run in JSON form.
checkResult_json = VerificationResult.checkResultsAsJson(spark, checkResult)

# Build a pandas DataFrame from the verification results.
df = pandas.DataFrame(checkResult_json)

# HTML template; the {table} placeholder is filled with the rendered DataFrame.
html_string = '''
<html>
  <head><title>HTML Pandas Dataframe with CSS</title></head>
  <link rel="stylesheet" type="text/css" href="df_style.css"/>
  <body>
    {table}
  </body>
</html>
'''


# Render the DataFrame as an HTML table carrying the 'mystyle' CSS class.
html = html_string.format(table=df.to_html(classes='mystyle'))
# Alternative renderer: html = pretty_html_table.build_table(df, 'blue_light')

# Write the report; the context manager closes the file even if the write fails.
with open("verification_report.html", "w") as text_file:
    text_file.write(html)
