In [122]:
from pyspark.sql import SparkSession, Row
from pyspark.sql import SparkSession
from pyspark.mllib.stat import Statistics
from pydeequ.analyzers import *
from pydeequ.checks import *
from pydeequ.profiles import *
from pydeequ.suggestions import *
from pydeequ.verification import *
from pyspark.sql.functions import *
import pydeequ
import pandas as pd
from jinja2 import Template


# create a SparkSession object
spark = SparkSession.builder.appName("PyDeequ")\
.config("spark.jars", "/home/jovyan/work/enu/mssql-jdbc-12.2.0.jre8.jar")\
.config('spark.driver.extraClassPath', '/home/jovyan/work/enu/mssql-jdbc-12.2.0.jre8.jar')\
.config('spark.executor.extraClassPath', '/home/jovyan/work/enu/mssql-jdbc-12.2.0.jre8.jar')\
.config("spark.jars.packages", pydeequ.deequ_maven_coord)\
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)\
.getOrCreate()
jars = spark.conf.get("spark.driver.extraClassPath")

In [104]:
# connect to the SQL Server database
jdbcHostname = r"EPPLWARW01DC\SQLEXPRESS"
jdbcDatabase = "AdventureWorks2012"
jdbcPort = 1433
username = "test_user"
password = "test_user"
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
      "user" : username,
      "password" : password,
      "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "encrypt": "false",
}

# read data from a table
df = spark.read.jdbc(url=jdbcUrl, table="Production.Product", properties=connectionProperties)

In [105]:
# perform data analyzing
analysisResult = AnalysisRunner(spark) \
                .onData(df) \
                .addAnalyzer(Size()) \
                .addAnalyzer(Completeness("ProductID")) \
                .addAnalyzer(ApproxCountDistinct("ProductID")) \
                .addAnalyzer(Mean("ListPrice")) \
                .addAnalyzer(Compliance("top ListPrice", "ListPrice >= 10.0")) \
                .addAnalyzer(Correlation("SafetyStockLevel", "ReorderPoint")) \
.run()

analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()


+-----------+--------------------+-------------------+------------------+
|     entity|            instance|               name|             value|
+-----------+--------------------+-------------------+------------------+
|     Column|       top ListPrice|         Compliance|0.5773809523809523|
|     Column|           ListPrice|               Mean|         438.66625|
|    Dataset|                   *|               Size|             504.0|
|     Column|           ProductID|       Completeness|               1.0|
|     Column|           ProductID|ApproxCountDistinct|             510.0|
|Mutlicolumn|SafetyStockLevel,...|        Correlation|1.0000000000000009|
+-----------+--------------------+-------------------+------------------+



In [131]:
# perform data profiling
result = ColumnProfilerRunner(spark) \
    .onData(df) \
    .run()

# print readable profiling results 
for col, profile in result.profiles.items():
    print(f"Column: {col}")
    if hasattr(profile, 'dataType'):
        print(f"Data type: {profile.dataType}")
    if hasattr(profile, 'completeness'):
        print(f"Completeness: {profile.completeness}")
    if hasattr(profile, 'approximateNumDistinctValues') and hasattr(profile, 'numDistinctValues') :
        print(f"Distinctness: {profile.approximateNumDistinctValues}/{profile.numDistinctValues}")
    if hasattr(profile, 'minimum'):
        print(f"Min value: {profile.minimum}")
    if hasattr(profile, 'maximum'):
        print(f"Max value: {profile.maximum}")
    if hasattr(profile, 'mean'):
        print(f"Mean: {profile.mean}")
    if hasattr(profile, 'standardDeviation'):
        print(f"Standard deviation: {profile.standardDeviation}")
    if hasattr(profile, 'histogram'):
        print(f"Histogram: {profile.histogram}")
    if hasattr(profile, 'correlation'):
        print(f"Correlation: {profile.correlation}")
    print()

Column: ModifiedDate
Data type: String
Completeness: 1.0
Histogram: None

Column: ReorderPoint
Data type: Integral
Completeness: 1.0
Min value: 3.0
Max value: 750.0
Mean: 401.36309523809524
Histogram: [DistributionValue(value='45', count=5, ratio=0.00992063492063492), DistributionValue(value='750', count=156, ratio=0.30952380952380953), DistributionValue(value='375', count=167, ratio=0.33134920634920634), DistributionValue(value='75', count=97, ratio=0.19246031746031747), DistributionValue(value='600', count=25, ratio=0.0496031746031746), DistributionValue(value='3', count=54, ratio=0.10714285714285714)]

Column: ProductLine
Data type: String
Completeness: 0.5515873015873016
Histogram: [DistributionValue(value='NullValue', count=226, ratio=0.44841269841269843), DistributionValue(value='R ', count=100, ratio=0.1984126984126984), DistributionValue(value='M ', count=91, ratio=0.18055555555555555), DistributionValue(value='S ', count=35, ratio=0.06944444444444445), DistributionValue(value=

In [107]:
# suggest constraints
suggestionResult = ConstraintSuggestionRunner(spark) \
                        .onData(df) \
                        .addConstraintRule(DEFAULT()) \
                        .run()

#print(json.dumps(suggestionResult, indent=2))
# print constraint suggestions
for suggestion in suggestionResult['constraint_suggestions']:
    print(str(suggestion['constraint_name']) , str(suggestion['column_name']))

CompletenessConstraint(Completeness(ModifiedDate,None)) ModifiedDate
ComplianceConstraint(Compliance('ReorderPoint' has value range '375', '750', '75', '3', '600', '45',`ReorderPoint` IN ('375', '750', '75', '3', '600', '45'),None)) ReorderPoint
CompletenessConstraint(Completeness(ReorderPoint,None)) ReorderPoint
ComplianceConstraint(Compliance('ReorderPoint' has value range '375', '750', '75', '3' for at least 91.0% of values,`ReorderPoint` IN ('375', '750', '75', '3'),None)) ReorderPoint
ComplianceConstraint(Compliance('ReorderPoint' has no negative values,ReorderPoint >= 0,None)) ReorderPoint
ComplianceConstraint(Compliance('ProductLine' has value range 'R ', 'M ', 'T ', 'S ',`ProductLine` IN ('R ', 'M ', 'T ', 'S '),None)) ProductLine
ComplianceConstraint(Compliance('ProductLine' has value range 'R ', 'M ', 'T ' for at least 90.0% of values,`ProductLine` IN ('R ', 'M ', 'T '),None)) ProductLine
CompletenessConstraint(Completeness(ProductLine,None)) ProductLine
CompletenessConstrain

In [109]:
# verify constraints
checkResult = VerificationSuite(spark) \
                         .onData(df) \
                         .addCheck(Check(spark, 
                                   CheckLevel.Warning, 
                                   "ProductID check") \
                        .isComplete("ProductID")\
                        .isUnique("ProductID")  \
                        .hasMin("ProductID", lambda x: x >= 0.0)) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "Name check") \
                        .isComplete("Name")\
                        .isUnique("Name")) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "ProductNumber check") \
                        .isComplete("ProductNumber")\
                        .isUnique("ProductNumber")) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "MakeFlag check") \
                        .isComplete("MakeFlag")) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "FinishedGoodsFlag check") \
                        .isComplete("FinishedGoodsFlag")) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "Color check") \
                        .isComplete("Color")
                        .isContainedIn("Color", ['Black', 'Silver', 'Red', 'Yellow', 'Blue', 'Multi', 'Silver/Black', 'White', 'Grey'])) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "SafetyStockLevel check") \
                        .isComplete("SafetyStockLevel")
                        .hasMin("SafetyStockLevel", lambda x: x >= 0.0) \
                        .isContainedIn("SafetyStockLevel", ['500', '1000', '100', '4', '800', '60'])) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "ReorderPoint check") \
                        .isComplete("ReorderPoint")
                        .hasMin("ReorderPoint", lambda x: x >= 0.0) \
                        .isContainedIn("ReorderPoint", ['375', '750', '75', '3', '600', '45'])) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "StandardCost check") \
                        .isComplete("StandardCost")
                        .hasMin("StandardCost", lambda x: x >= 0.0)) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "ListPrice check") \
                        .isComplete("ListPrice")
                        .hasMin("ListPrice", lambda x: x >= 0.0)) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "Size check") \
                        .isComplete("Size")\
                        .isContainedIn("Size", ['44', '48', '52', '42', '58', '38', '60', 'L', 'M', '46', '40', '62', 'S', '54', '50', 'XL', '56', '70'])) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "SizeUnitMeasureCode check") \
                        .isComplete("SizeUnitMeasureCode")\
                        .isContainedIn("SizeUnitMeasureCode", ['CM '])) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "WeightUnitMeasureCode check") \
                        .isComplete("WeightUnitMeasureCode")\
                        .isContainedIn("WeightUnitMeasureCode", ['LB ', 'G  '])) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "Weight check") \
                        .isComplete("Weight")\
                        .hasMin("Weight", lambda x: x >= 0.0)) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "DaysToManufacture check") \
                        .isComplete("DaysToManufacture")
                        .hasMin("DaysToManufacture", lambda x: x >= 0.0) \
                        .isContainedIn("DaysToManufacture", ['0', '1', '4', '2'])) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "ProductLine check") \
                        .isComplete("ProductLine")\
                        .isContainedIn("ProductLine", ['R ', 'M ', 'T ', 'S '])) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "Class check") \
                        .isComplete("Class")\
                        .isContainedIn("Class", ['L ', 'H ', 'M '])) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "Style check") \
                        .isComplete("Style")\
                        .isContainedIn("Style", ['U ', 'W ', 'M '])) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "ProductSubcategoryID check") \
                        .isComplete("ProductSubcategoryID")
                        .hasMin("ProductSubcategoryID", lambda x: x >= 0.0) \
                        .isContainedIn("ProductSubcategoryID", ['2', '14', '1', '12', '3', '16', '17', '37', '15', '21', '4', '13', '22', '20'])) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "ProductModelID check") \
                        .isComplete("ProductModelID")
                        .hasMin("ProductModelID", lambda x: x >= 0.0)) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "SellStartDate check") \
                        .isComplete("SellStartDate")) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "ModifiedDate check") \
                        .isComplete("ModifiedDate")) \
                        .addCheck(Check(spark, 
                                  CheckLevel.Warning, 
                                  "rowguid check") \
                        .isComplete("rowguid")\
                        .isUnique("rowguid")) \
    .run()
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(50)

+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|               check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
+--------------------+-----------+------------+--------------------+-----------------+--------------------+



In [112]:
# Convert checkResult_df to a pandas DataFrame and sort it by check
checkResult_df_pd = checkResult_df.toPandas()
checkResult_df_pd = checkResult_df_pd.sort_values(by='check')
# Define the template for the HTML report also some style added for better cheking constraint_status
html_template = """
<!DOCTYPE html>
<html>
<head>
  <title>Verification Result Report</title>
  <style>
    table, th, td {
      border: 1px solid black;
      border-collapse: collapse;
    }
    th, td {
      padding: 5px;
      text-align: left;
    }
    th {
      background-color: #ddd;
    }
    .success {
      color: green;
    }
    .failure {
      color: red;
    }
  </style>
</head>
<body>
  <h1>Verification Result Report</h1>
  <p>{{ num_checks }} checks were performed.</p>
  <table>
    <thead>
      <tr>
        <th>Check</th>
        <th>Check Level</th>
        <th>Check Status</th>
        <th>Constraint</th>
        <th>Constraint Status</th>
        <th>Constraint Message</th>
      </tr>
    </thead>
    <tbody>
      {% for index, row in data.iterrows() %}
        <tr>
          <td>{{ row['check'] }}</td>
          <td>{{ row['check_level'] }}</td>
          <td>{{ row['check_status'] }}</td>
          <td>{{ row['constraint'] }}</td>
          <td class="{% if row['constraint_status'] == 'Success' %}success{% else %}failure{% endif %}">
            {{ row['constraint_status'] }}
          </td>
          <td>{{ row['constraint_message'] }}</td>
        </tr>
      {% endfor %}
    </tbody>
  </table>
</body>
</html>
"""

# Define the variables to render the template
template_vars = {
    'num_checks': len(checkResult_df_pd),
    'data': checkResult_df_pd
}

# Create a jinja2 template object
template = Template(html_template)

# Render the template with the variables
html_report = template.render(template_vars)

# Write the HTML report to a file
with open('verification_report.html', 'w') as f:
    f.write(html_report)