In [1]:
%env SPARK_VERSION=3.0.0

env: SPARK_VERSION=3.0.0


In [2]:
import pydeequ
import pretty_html_table
import datetime

from pyspark.sql import SparkSession, Row
from pydeequ.analyzers import *
from pydeequ.profiles import *
from pydeequ.suggestions import *
from pydeequ.checks import *
from pydeequ.verification import *
from datetime import timezone

In [3]:
# Connect to DB with Spark using JDBC connection to read the data
server_url = "jdbc:sqlserver://host.docker.internal:1433" # think what should be here
database_name = "AdventureWorks2019"
url = server_url + ";" + "databaseName=" + database_name + ";encrypt=true;trustServerCertificate=true;"

table = "Sales.SalesPerson"
user = "auto_test" # your user creds here
password  = "auto_test" # your user creds here

# Path to .jar file
classpath = "/usr/local/spark-3.0.0-bin-hadoop3.2/jars/mssql-jdbc-12.2.0.jre8.jar"

# Create SparkSession
spark = (SparkSession .builder \
         .config("spark.driver.extraClassPath", classpath) \
         .config("spark.jars.packages", pydeequ.deequ_maven_coord) \
         .config("spark.jars.excludes", pydeequ.f2j_maven_coord) .getOrCreate())
    
# Create Spark DataFrame
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", table) \
    .option("user", user) \
    .option("password", password) \
    .load()

# Show some of table metadata
jdbcDF.printSchema()

root
 |-- BusinessEntityID: integer (nullable = true)
 |-- TerritoryID: integer (nullable = true)
 |-- SalesQuota: decimal(19,4) (nullable = true)
 |-- Bonus: decimal(19,4) (nullable = true)
 |-- CommissionPct: decimal(10,4) (nullable = true)
 |-- SalesYTD: decimal(19,4) (nullable = true)
 |-- SalesLastYear: decimal(19,4) (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)



In [4]:
### Data Analyzers section
# Retrieve some information about data for analysis
analysisResult = AnalysisRunner(spark)\
                    .onData(jdbcDF)\
                    .addAnalyzer(Size('TerritoryID is not NULL'))\
                    .addAnalyzer(Completeness('SalesYTD'))\
                    .addAnalyzer(Uniqueness(['TerritoryID']))\
                    .addAnalyzer(MaxLength('SalesYTD'))\
                    .addAnalyzer(Maximum('SalesYTD'))\
                    .addAnalyzer(Mean('SalesYTD'))\
                    .addAnalyzer(Minimum('SalesYTD'))\
                    .run()
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)

# Show results
analysisResult_df.show()

+-------+-----------+--------------------+-----------------+
| entity|   instance|                name|            value|
+-------+-----------+--------------------+-----------------+
|Dataset|          *|Size (where: Terr...|             14.0|
| Column|   SalesYTD|        Completeness|              1.0|
| Column|   SalesYTD|             Maximum|     4251368.5497|
| Column|   SalesYTD|                Mean|2133975.994317647|
| Column|   SalesYTD|             Minimum|      172524.4512|
| Column|TerritoryID|          Uniqueness|              0.5|
+-------+-----------+--------------------+-----------------+



In [5]:
### Data profiling section
result = ColumnProfilerRunner(spark) \
    .onData(jdbcDF) \
    .run()

# Return results of profiling. This data could be used for creation of suggestions
for col, profile in result.profiles.items():
    print(f"Column: {col}")
    print(profile)

Column: ModifiedDate
StandardProfiles for column: ModifiedDate: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 7,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {},
    "histogram": null
}
Column: rowguid
StandardProfiles for column: rowguid: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 17,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 0,
        "Unknown": 0,
        "String": 17
    },
    "histogram": [
        [
            "48754992-9EE0-4C0E-8C94-9451604E3E02",
            1,
            0.058823529411764705
        ],
        [
            "4DD9EEE4-8E81-4F8C-AF97-683394C1F7C0",
            1,
            0.058823529411764705
        ],
        [
            "25F6838D-9DB4-4833-9DDC-7A24283AF1BA",
            1,
            0.058823529411764705
        ],
        [
            "F509E3D4-76C8-42AA-B353-90B7B8DB08DE",
 

In [6]:
### Constraint Suggestions section
suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(jdbcDF) \
             .addConstraintRule(DEFAULT()) \
             .run()

# Return provided constraint suggestion. This data will be used for verificationSuite creation
print(json.dumps(suggestionResult, indent=2))

{
  "constraint_suggestions": [
    {
      "constraint_name": "CompletenessConstraint(Completeness(ModifiedDate,None))",
      "column_name": "ModifiedDate",
      "current_value": "Completeness: 1.0",
      "description": "'ModifiedDate' is not null",
      "suggesting_rule": "CompleteIfCompleteRule()",
      "rule_description": "If a column is complete in the sample, we suggest a NOT NULL constraint",
      "code_for_constraint": ".isComplete(\"ModifiedDate\")"
    },
    {
      "constraint_name": "CompletenessConstraint(Completeness(rowguid,None))",
      "column_name": "rowguid",
      "current_value": "Completeness: 1.0",
      "description": "'rowguid' is not null",
      "suggesting_rule": "CompleteIfCompleteRule()",
      "rule_description": "If a column is complete in the sample, we suggest a NOT NULL constraint",
      "code_for_constraint": ".isComplete(\"rowguid\")"
    },
    {
      "constraint_name": "UniquenessConstraint(Uniqueness(List(rowguid),None))",
      "column

In [7]:
### Constraint Verification section
# Run constraing verification. 
check = Check(spark, CheckLevel.Warning, "Review Check")
checkResult = VerificationSuite(spark) \
    .onData(jdbcDF) \
    .addCheck(
        check \
        .isComplete("BusinessEntityID") \
        .isUnique("BusinessEntityID") \
        .isContainedIn("TerritoryID", ["1","2","3","4","5","6","7","8","9","10"])
        .isNonNegative("SalesQuota") \
        .isNonNegative("Bonus") \
        .isNonNegative("CommissionPct") \
        .isNonNegative("SalesYTD") \
        .isComplete("rowguid") \
        .isUnique("rowguid") \
        .isComplete("ModifiedDate") \
        )\
    .run()

# Add constraints verification into spark dataframe. 
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
# Transform spark dataframe into Pandas dataframe for report creation
pandasDF = checkResult_df.toPandas()

In [8]:
# Create .html report with results of constraints verification
html_body = pretty_html_table.build_table(pandasDF, 'orange_light', width='300')
html_header = \
f'''<div class="header">
  <h1 style="color: #3c1800;">Report with results of constraints verification</h1>
  <p style="color: #c65911;">State of table: {table}. UTC time: {datetime.datetime.now(timezone.utc)} </p>
</div>'''
html = html_header + html_body

# Write html to file
text_file = open("report.html", "w")
text_file.write(html)
text_file.close()