In [1]:
%env SPARK_VERSION=3.0.0 # TODO PUT YOUR VALUE

env: SPARK_VERSION=3.0.0 # TODO PUT YOUR VALUE


In [2]:
import pydeequ

from pyspark.sql import SparkSession

# Location of the SQL Server JDBC driver jar that is added to the driver classpath.
MSSQL_JDBC_JAR = "/usr/local/spark-3.0.0-bin-hadoop3.2/jars/mssql-jdbc-10.2.1.jre8.jar"

# Spark settings applied to the session, in the order they are listed:
# the JDBC jar on the driver classpath, the Deequ package coordinate to pull in,
# and the coordinate that PyDeequ asks to be excluded from that resolution.
spark_settings = {
    "spark.driver.extraClassPath": MSSQL_JDBC_JAR,
    "spark.jars.packages": pydeequ.deequ_maven_coord,
    "spark.jars.excludes": pydeequ.f2j_maven_coord,
}

session_builder = SparkSession.builder.appName("PySpark SQL Server via JDBC")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

# Reuse an already running session if there is one, otherwise start a new one.
spark = session_builder.getOrCreate()

# Last expression of the cell, so the notebook renders the session.
spark

In [9]:

import os
from getpass import getpass

# Config variables: which SQL Server instance, database and table to read.
database = "TRN1"
table = "hr.countries"
ipv4 = "192.168.31.161"
port = "1433"

# Credentials are not stored in the notebook. The login can be overridden with
# MSSQL_USER; the password comes from MSSQL_PASSWORD, or from an interactive
# prompt when that variable is unset or empty (so a non-interactive run must
# set MSSQL_PASSWORD beforehand).
user = os.environ.get("MSSQL_USER", "loginfortest")
password = os.environ.get("MSSQL_PASSWORD") or getpass(
    f"SQL Server password for {user}: "
)

# NOTE(review): trustServerCertificate=true asks the driver to accept the
# server certificate without validating it - confirm this is only used
# against a test instance.
jdbc_url = (
    f"jdbc:sqlserver://{ipv4}:{port};databaseName={database};"
    "encrypt=true;trustServerCertificate=true;"
)

# Connection to DB: load `table` into a DataFrame using the settings above.
jdbcDF = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", table)
    .option("user", user)
    .option("password", password)
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .load()
)


# Display result (show() prints only the first rows of the table).
jdbcDF.show()


+----------+------------+---------+
|country_id|country_name|region_id|
+----------+------------+---------+
|        AR|   Argentina|        2|
|        AU|   Australia|        3|
|        BE|     Belgium|        1|
|        BR|      Brazil|        2|
|        CA|      Canada|        2|
|        CH| Switzerland|        1|
|        CN|       China|        3|
|        DE|     Germany|        1|
|        DK|     Denmark|        1|
|        EG|       Egypt|        4|
|        FR|      France|        1|
|        HK|    HongKong|        3|
|        IL|      Israel|        4|
|        IN|       India|        3|
|        IT|       Italy|        1|
|        JP|       Japan|        3|
|        KW|      Kuwait|        4|
|        MX|      Mexico|        2|
|        NG|     Nigeria|        4|
|        NL| Netherlands|        1|
+----------+------------+---------+
only showing top 20 rows



In [4]:
### Data Analyzers section
from pydeequ.analyzers import *

# Metrics to compute, listed in the order they are registered with the runner:
# the row count, then per-column distinct counts and completeness, then a few
# statistics of region_id.
country_analyzers = [
    Size(),
    ApproxCountDistinct("country_id"),
    ApproxCountDistinct("country_name"),
    ApproxCountDistinct("region_id"),
    Completeness("country_id"),
    Completeness("country_name"),
    Completeness("region_id"),
    Maximum("region_id"),
    Mean("region_id"),
    Entropy("region_id"),
]

# Register every analyzer on a single run over the JDBC DataFrame.
analysis_builder = AnalysisRunner(spark).onData(jdbcDF)
for analyzer in country_analyzers:
    analysis_builder = analysis_builder.addAnalyzer(analyzer)

analysisResult = analysis_builder.run()

# Turn the successfully computed metrics into a DataFrame and print it.
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()


+-------+------------+-------------------+-----------------+
| entity|    instance|               name|            value|
+-------+------------+-------------------+-----------------+
| Column|  country_id|ApproxCountDistinct|             25.0|
| Column|  country_id|       Completeness|              1.0|
|Dataset|           *|               Size|             25.0|
| Column|country_name|ApproxCountDistinct|             22.0|
| Column|country_name|       Completeness|              1.0|
| Column|   region_id|ApproxCountDistinct|              4.0|
| Column|   region_id|       Completeness|              1.0|
| Column|   region_id|            Maximum|              4.0|
| Column|   region_id|               Mean|              2.4|
| Column|   region_id|            Entropy|1.371522403814367|
+-------+------------+-------------------+-----------------+



In [5]:
### Data profiling section

from pydeequ.profiles import *

# Run the column profiler over the JDBC DataFrame without any extra options.
profiler_run = ColumnProfilerRunner(spark).onData(jdbcDF)
result = profiler_run.run()

# Print one profile per column; the mapping key is not needed for printing,
# so only the profile objects are iterated.
for column_profile in result.profiles.values():
    print(column_profile)


StandardProfiles for column: country_id: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 25,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 0,
        "Unknown": 0,
        "String": 25
    },
    "histogram": [
        [
            "IN",
            1,
            0.04
        ],
        [
            "KW",
            1,
            0.04
        ],
        [
            "EG",
            1,
            0.04
        ],
        [
            "SG",
            1,
            0.04
        ],
        [
            "IL",
            1,
            0.04
        ],
        [
            "US",
            1,
            0.04
        ],
        [
            "HK",
            1,
            0.04
        ],
        [
            "BE",
            1,
            0.04
        ],
        [
            "AU",
            1,
            0.04
        ],
        [
            "ZW",
       

In [6]:
### Constraint Suggestions section
# json is imported explicitly because this cell calls json.dumps; without this
# line the name is only available if one of the wildcard imports happens to
# expose it.
import json

# NOTE(review): the wildcard import is kept as-is because other cells may rely
# on the names it brings in; only ConstraintSuggestionRunner and DEFAULT are
# used in this cell.
from pydeequ.suggestions import *

# Run the suggestion engine on the JDBC DataFrame with the DEFAULT rule set.
suggestionResult = (
    ConstraintSuggestionRunner(spark).onData(jdbcDF).addConstraintRule(DEFAULT()).run()
)

# Constraint Suggestions in JSON format
print(json.dumps(suggestionResult, indent=2))


{
  "constraint_suggestions": [
    {
      "constraint_name": "CompletenessConstraint(Completeness(country_name,None))",
      "column_name": "country_name",
      "current_value": "Completeness: 1.0",
      "description": "'country_name' is not null",
      "suggesting_rule": "CompleteIfCompleteRule()",
      "rule_description": "If a column is complete in the sample, we suggest a NOT NULL constraint",
      "code_for_constraint": ".isComplete(\"country_name\")"
    },
    {
      "constraint_name": "ComplianceConstraint(Compliance('region_id' has value range '1', '3', '4', '2',`region_id` IN ('1', '3', '4', '2'),None))",
      "column_name": "region_id",
      "current_value": "Compliance: 1",
      "description": "'region_id' has value range '1', '3', '4', '2'",
      "suggesting_rule": "CategoricalRangeRule()",
      "rule_description": "If we see a categorical range for a column, we suggest an IS IN (...) constraint",
      "code_for_constraint": ".isContainedIn(\"region_id\", [\

In [7]:
### Constraint Verification section
from pydeequ.checks import *
from pydeequ.verification import *
import pandas as pd

# A warning-level check named "Review Check"; constraints are attached below.
check = Check(spark, CheckLevel.Warning, "Review Check")

# Constraints on the countries table: country_id must be unique and complete,
# region_id must be complete, non-negative and one of the listed values.
constrained_check = (
    check.isUnique("country_id")
    .isComplete("country_id")
    .isComplete("region_id")
    .isNonNegative("region_id")
    .isContainedIn("region_id", ["1", "3", "4", "2"])
)

# Evaluate the check against the JDBC DataFrame.
verification_builder = VerificationSuite(spark).onData(jdbcDF)
verification_builder = verification_builder.addCheck(constrained_check)
checkResult = verification_builder.run()

# Collect the per-constraint results, save them to results.csv, and print them.
# NOTE(review): the output recorded for this cell is a table with no rows -
# confirm that the verification actually produced results before relying on
# the CSV.
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_pandas = checkResult_df.toPandas()
checkResult_pandas.to_csv("results.csv")
checkResult_df.show()


+------------+-----------+------------+--------------------+-----------------+------------------+
|       check|check_level|check_status|          constraint|constraint_status|constraint_message|
+------------+-----------+------------+--------------------+-----------------+------------------+
+------------+-----------+------------+--------------------+-----------------+------------------+

