### **Check spark version**

In [None]:
import pyspark
from pyspark.sql import SparkSession

In [None]:
# Start (or reuse) a plain Spark session just to find out which Spark version
# this runtime ships with; the version decides which Deequ build is needed later.
spark = SparkSession.builder.getOrCreate()

print(spark.version)

3.5.3


### **Install pydeequ**

In [None]:
pip install pydeequ

Collecting pydeequ
  Downloading pydeequ-1.4.0-py3-none-any.whl.metadata (9.4 kB)
Downloading pydeequ-1.4.0-py3-none-any.whl (37 kB)
Installing collected packages: pydeequ
Successfully installed pydeequ-1.4.0


### NOTE: Restart the notebook kernel before proceeding. Click on **Runtime** -> **Restart Session**



### **Kick off the exercise**

In [None]:
import os
import pyspark
from pyspark.sql import SparkSession

# SPARK_VERSION is set *before* pydeequ is imported; keep this order.
# The Deequ coordinate printed a few cells below ends in "-spark-3.5", i.e. it
# matches this value — presumably pydeequ reads the variable at import time to
# choose the matching Deequ build (confirm against the pydeequ docs).
os.environ["SPARK_VERSION"] = '3.5'
import pydeequ

In [None]:
# Build the Spark session used for the rest of the notebook.
#   spark.jars.packages -> Maven coordinate of the Deequ JAR exposed by pydeequ
#   spark.jars.excludes -> Maven coordinate pydeequ asks to be excluded (f2j)
# NOTE(review): getOrCreate() hands back an already-running session if there is
# one, in which case these JAR settings presumably have no effect — this looks
# like the reason the notebook asks for a kernel restart before this section.
spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate()
    )

In [None]:
# Read back the two JAR-related settings from the live session so we can see
# which Deequ build was resolved and which dependency is being excluded.
deequ_jar_coord = spark.conf.get("spark.jars.packages")
excluded_jar_coord = spark.conf.get("spark.jars.excludes")

print("Deequ JAR: {}".format(deequ_jar_coord))
print("JAR to exclude: {}".format(excluded_jar_coord))

Deequ JAR: com.amazon.deequ:deequ:2.0.7-spark-3.5
JAR to exclude: net.sourceforge.f2j:arpack_combined_all


In [None]:
# Hand-written sample of 20 sales transactions (one dict per row) used as the
# input for every data-quality step below.
# NOTE(review): row T00020 pairs market "IN" with currency "USD", and the US
# rows T00011 / T00015 carry discounts above 12 — presumably deliberate, so the
# compliance analyzers and checks further down have violations to report.
data = [
    {"transactionID": "T00001", "transactionDateTime": "2024-09-01 10:00:00", "market": "US", "currency": "USD", "customerID": "CUST1001", "productID": "PROD200", "productQty": 2, "productPrice": 50.00, "discount": 5.00, "subtotal": 95.00},
    {"transactionID": "T00002", "transactionDateTime": "2024-09-02 11:15:00", "market": "IN", "currency": "INR", "customerID": "CUST1002", "productID": "PROD201", "productQty": 1, "productPrice": 30.00, "discount": 0.00, "subtotal": 30.00},
    {"transactionID": "T00003", "transactionDateTime": "2024-09-03 12:30:00", "market": "US", "currency": "USD", "customerID": "CUST1003", "productID": "PROD202", "productQty": 3, "productPrice": 20.00, "discount": 2.00, "subtotal": 58.00},
    {"transactionID": "T00004", "transactionDateTime": "2024-09-04 13:45:00", "market": "IN", "currency": "INR", "customerID": "CUST1004", "productID": "PROD203", "productQty": 4, "productPrice": 15.00, "discount": 1.50, "subtotal": 56.50},
    {"transactionID": "T00005", "transactionDateTime": "2024-09-05 14:00:00", "market": "US", "currency": "USD", "customerID": "CUST1005", "productID": "PROD204", "productQty": 1, "productPrice": 100.00, "discount": 10.00, "subtotal": 90.00},
    {"transactionID": "T00006", "transactionDateTime": "2024-09-06 15:15:00", "market": "IN", "currency": "INR", "customerID": "CUST1006", "productID": "PROD205", "productQty": 5, "productPrice": 12.00, "discount": 0.00, "subtotal": 60.00},
    {"transactionID": "T00007", "transactionDateTime": "2024-09-07 16:30:00", "market": "US", "currency": "USD", "customerID": "CUST1007", "productID": "PROD206", "productQty": 2, "productPrice": 25.00, "discount": 3.00, "subtotal": 47.00},
    {"transactionID": "T00008", "transactionDateTime": "2024-09-08 17:45:00", "market": "IN", "currency": "INR", "customerID": "CUST1008", "productID": "PROD207", "productQty": 3, "productPrice": 10.00, "discount": 0.00, "subtotal": 30.00},
    {"transactionID": "T00009", "transactionDateTime": "2024-09-09 18:00:00", "market": "US", "currency": "USD", "customerID": "CUST1009", "productID": "PROD208", "productQty": 4, "productPrice": 20.00, "discount": 2.00, "subtotal": 76.00},
    {"transactionID": "T00010", "transactionDateTime": "2024-09-10 19:15:00", "market": "IN", "currency": "INR", "customerID": "CUST1010", "productID": "PROD209", "productQty": 2, "productPrice": 35.00, "discount": 5.00, "subtotal": 65.00},
    {"transactionID": "T00011", "transactionDateTime": "2024-09-11 20:30:00", "market": "US", "currency": "USD", "customerID": "CUST1011", "productID": "PROD210", "productQty": 1, "productPrice": 150.00, "discount": 15.00, "subtotal": 135.00},
    {"transactionID": "T00012", "transactionDateTime": "2024-09-12 21:45:00", "market": "IN", "currency": "INR", "customerID": "CUST1012", "productID": "PROD211", "productQty": 3, "productPrice": 18.00, "discount": 0.00, "subtotal": 54.00},
    {"transactionID": "T00013", "transactionDateTime": "2024-09-13 22:00:00", "market": "US", "currency": "USD", "customerID": "CUST1013", "productID": "PROD212", "productQty": 2, "productPrice": 45.00, "discount": 4.00, "subtotal": 86.00},
    {"transactionID": "T00014", "transactionDateTime": "2024-09-14 23:15:00", "market": "IN", "currency": "INR", "customerID": "CUST1014", "productID": "PROD213", "productQty": 4, "productPrice": 20.00, "discount": 2.50, "subtotal": 75.50},
    {"transactionID": "T00015", "transactionDateTime": "2024-09-15 00:30:00", "market": "US", "currency": "USD", "customerID": "CUST1015", "productID": "PROD214", "productQty": 1, "productPrice": 200.00, "discount": 20.00, "subtotal": 180.00},
    {"transactionID": "T00016", "transactionDateTime": "2024-09-16 01:45:00", "market": "IN", "currency": "INR", "customerID": "CUST1016", "productID": "PROD215", "productQty": 5, "productPrice": 14.00, "discount": 1.00, "subtotal": 66.00},
    {"transactionID": "T00017", "transactionDateTime": "2024-09-17 02:00:00", "market": "US", "currency": "USD", "customerID": "CUST1017", "productID": "PROD216", "productQty": 3, "productPrice": 30.00, "discount": 3.00, "subtotal": 81.00},
    {"transactionID": "T00018", "transactionDateTime": "2024-09-18 03:15:00", "market": "IN", "currency": "INR", "customerID": "CUST1018", "productID": "PROD217", "productQty": 2, "productPrice": 50.00, "discount": 0.00, "subtotal": 100.00},
    {"transactionID": "T00019", "transactionDateTime": "2024-09-19 04:30:00", "market": "US", "currency": "USD", "customerID": "CUST1019", "productID": "PROD218", "productQty": 4, "productPrice": 25.00, "discount": 2.00, "subtotal": 98.00},
    {"transactionID": "T00020", "transactionDateTime": "2024-09-20 05:45:00", "market": "IN", "currency": "USD", "customerID": "CUST1020", "productID": "PROD219", "productQty": 3, "productPrice": 28.00, "discount": 3.00, "subtotal": 78.60},
]

# Let Spark infer the schema from the dicts, then pin the column order
# explicitly so the DataFrame reads in the same order as the records above.
df = spark.createDataFrame(data)
df = df.select("transactionID", "transactionDateTime", "market", "currency", "customerID", "productID", "productQty", "productPrice", "discount", "subtotal")

In [None]:
# Print the transaction DataFrame as a text table to eyeball the input.
df.show()

+-------------+-------------------+------+--------+----------+---------+----------+------------+--------+--------+
|transactionID|transactionDateTime|market|currency|customerID|productID|productQty|productPrice|discount|subtotal|
+-------------+-------------------+------+--------+----------+---------+----------+------------+--------+--------+
|       T00001|2024-09-01 10:00:00|    US|     USD|  CUST1001|  PROD200|         2|        50.0|     5.0|    95.0|
|       T00002|2024-09-02 11:15:00|    IN|     INR|  CUST1002|  PROD201|         1|        30.0|     0.0|    30.0|
|       T00003|2024-09-03 12:30:00|    US|     USD|  CUST1003|  PROD202|         3|        20.0|     2.0|    58.0|
|       T00004|2024-09-04 13:45:00|    IN|     INR|  CUST1004|  PROD203|         4|        15.0|     1.5|    56.5|
|       T00005|2024-09-05 14:00:00|    US|     USD|  CUST1005|  PROD204|         1|       100.0|    10.0|    90.0|
|       T00006|2024-09-06 15:15:00|    IN|     INR|  CUST1006|  PROD205|        

## **Profiler**

In [None]:
# Import only the name this cell needs instead of star-importing the module.
from pydeequ.profiles import ColumnProfilerRunner

# Profile every column of the transaction data (completeness, approximate
# distinct count, inferred data type, histogram, ...).
result = (
    ColumnProfilerRunner(spark)
    .onData(df)
    .run()
)

# Only the profile objects are printed, so iterate over the values directly.
# This also avoids binding a loop variable called `col`: later cells use `col`
# as pyspark.sql.functions.col, and re-running this cell would otherwise
# replace that function with a plain column-name string.
for profile in result.profiles.values():
    print(profile)

StandardProfiles for column: customerID: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 20,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 0,
        "Unknown": 0,
        "String": 20
    },
    "histogram": [
        [
            "CUST1008",
            1,
            0.05
        ],
        [
            "CUST1017",
            1,
            0.05
        ],
        [
            "CUST1019",
            1,
            0.05
        ],
        [
            "CUST1006",
            1,
            0.05
        ],
        [
            "CUST1013",
            1,
            0.05
        ],
        [
            "CUST1018",
            1,
            0.05
        ],
        [
            "CUST1002",
            1,
            0.05
        ],
        [
            "CUST1003",
            1,
            0.05
        ],
        [
            "CUST1010",
            1,
         

## **Analyzer**

In [None]:
from pydeequ.analyzers import *

# Metrics to compute, listed in the order they are registered with the runner.
metric_analyzers = [
    # Dataset-level: number of records
    Size(),
    # Share of non-NULL values in each key column
    Completeness("transactionID"),
    Completeness("customerID"),
    Completeness("productID"),
    # transactionID acts as the primary key, so it should be fully unique
    Uniqueness(["transactionID"]),
    # market may only be IN or US
    Compliance("market is from predefined values", "market IN ('US', 'IN')"),
    # currency must match the market (the third argument restricts the rows considered)
    Compliance("market-currency match for IN", "currency = 'INR'", "market = 'IN'"),
    Compliance("market-currency match for US", "currency = 'USD'", "market = 'US'"),
    # per-market discount ceilings
    Compliance("maximum discount is 10 for IN", "discount <= 10", "market = 'IN'"),
    Compliance("maximum discount is 12 for US", "discount <= 12", "market = 'US'"),
]

# Register every analyzer on a single run builder, then execute them together.
run_builder = AnalysisRunner(spark).onData(df)
for metric in metric_analyzers:
    run_builder = run_builder.addAnalyzer(metric)
analysisResult = run_builder.run()

# Flatten the computed metrics into a Spark DataFrame for display.
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)

In [None]:
from pyspark.sql.functions import col
# Sort by entity descending (so the Dataset-level row comes before Column rows),
# then by metric name, and print without truncating the long instance names.
analysisResult_df.orderBy(col("entity").desc(), "name").show(truncate=False)

+-------+--------------------------------+---------------------------------+-----+
|entity |instance                        |name                             |value|
+-------+--------------------------------+---------------------------------+-----+
|Dataset|*                               |Size                             |20.0 |
|Column |productID                       |Completeness                     |1.0  |
|Column |customerID                      |Completeness                     |1.0  |
|Column |transactionID                   |Completeness                     |1.0  |
|Column |market is from predefined values|Compliance                       |1.0  |
|Column |maximum discount is 10 for IN   |Compliance (where: market = 'IN')|1.0  |
|Column |market-currency match for IN    |Compliance (where: market = 'IN')|0.9  |
|Column |maximum discount is 12 for US   |Compliance (where: market = 'US')|0.8  |
|Column |market-currency match for US    |Compliance (where: market = 'US')|1.0  |
|Col

## **Constraint Suggestion**

In [None]:
import json
from pydeequ.suggestions import *

# Ask Deequ to propose constraints for the transaction data using its default
# rule set.
suggestionResult = (
    ConstraintSuggestionRunner(spark)
    .onData(df)
    .addConstraintRule(DEFAULT())
    .run()
)

# Pretty-print the suggestions as indented JSON.
print(json.dumps(suggestionResult, indent=2))

{
  "constraint_suggestions": [
    {
      "constraint_name": "CompletenessConstraint(Completeness(customerID,None,None))",
      "column_name": "customerID",
      "current_value": "Completeness: 1.0",
      "description": "'customerID' is not null",
      "suggesting_rule": "CompleteIfCompleteRule()",
      "rule_description": "If a column is complete in the sample, we suggest a NOT NULL constraint",
      "code_for_constraint": ".isComplete(\"customerID\")"
    },
    {
      "constraint_name": "UniquenessConstraint(Uniqueness(List(customerID),None,None))",
      "column_name": "customerID",
      "current_value": "ApproxDistinctness: 1.0",
      "description": "'customerID' is unique",
      "suggesting_rule": "UniqueIfApproximatelyUniqueRule()",
      "rule_description": "If the ratio of approximate num distinct values in a column is close to the number of records (within the error of the HLL sketch), we suggest a UNIQUE constraint",
      "code_for_constraint": ".isUnique(\"cust

## **Constraint Verification**

In [None]:
from pydeequ.checks import *
from pydeequ.verification import *

# One Warning-level check bundling all constraints on the transaction data.
checks = (
    Check(spark, CheckLevel.Warning, "Transaction Data Quality")
    .hasSize(lambda row_count: row_count >= 20)   # at least 20 records
    .isComplete("transactionID")                  # no NULLs in the key columns
    .isComplete("customerID")
    .isComplete("productID")
    .isUnique("transactionID")                    # primary key must be unique
    .satisfies("market IN ('US', 'IN')", "market is from predefined values")
)

# Evaluate the check against the data.
checkResult = VerificationSuite(spark).onData(df).addCheck(checks).run()

# One row per constraint: check name/level/status plus constraint status/message.
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)

In [None]:
# Print the per-constraint results without truncating the long constraint text.
checkResult_df.show(truncate=False)

+------------------------+-----------+------------+------------------------------------------------------------------------------------------------------------------+-----------------+-----------------------------------------------------+
|check                   |check_level|check_status|constraint                                                                                                        |constraint_status|constraint_message                                   |
+------------------------+-----------+------------+------------------------------------------------------------------------------------------------------------------+-----------------+-----------------------------------------------------+
+------------------------+-----------+------------+------------------------------------------------------------------------------------------------------------------+-----------------+-----------------------------------------------------+



In [None]:
from pydeequ.checks import *
from pydeequ.verification import *

# Split the constraints by severity: Warning-level violations are informational,
# Error-level violations are the ones acted upon further down.
warning_checks = Check(spark, CheckLevel.Warning, "Transaction Data Quality")
error_checks = Check(spark, CheckLevel.Error, "Transaction Data Quality")

checkResult = (VerificationSuite(spark)
    .onData(df)
    .addCheck(
         warning_checks.hasSize(lambda x: x >= 20)
        .isComplete("transactionID")
        .isComplete("customerID")
        .isComplete("productID")
        .satisfies("market IN ('US', 'IN')", "market is from predefined values")
    )
    .addCheck(
        error_checks.isUnique("transactionID")
        # The rule is conditional: *if* market is IN, currency must be INR.
        # Written as an implication ("not IN, or INR") so that US rows satisfy
        # it; the previous "currency = 'INR' AND market = 'IN'" counted every
        # US row as a violation.
        .satisfies("market != 'IN' OR currency = 'INR'", "market-currency match for IN")
    )
    .run()
  )

# One row per constraint: check name/level/status plus constraint status/message.
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)

In [None]:
# Print the per-constraint results for both checks, untruncated.
checkResult_df.show(truncate=False)

+------------------------+-----------+------------+------------------------------------------------------------------------------------------------------------------+-----------------+-----------------------------------------------------+
|check                   |check_level|check_status|constraint                                                                                                        |constraint_status|constraint_message                                   |
+------------------------+-----------+------------+------------------------------------------------------------------------------------------------------------------+-----------------+-----------------------------------------------------+
|Transaction Data Quality|Error      |Error       |UniquenessConstraint(Uniqueness(List(transactionID),None,None))                                                   |Success          |                                                     |
|Transaction Data Quality|Error      |Error 

In [None]:
from pyspark.sql.functions import col, regexp_extract, split

# Keep only constraints that failed inside an Error-level check, then break the
# constraint's string form "<Type>(<expression>)" into its two parts.
failed_conditions = (checkResult_df
                     .filter((col("check_status") == "Error") & (col("constraint_status") == "Failure"))
                     # limit=2 splits at the first "(" only, so element 0 is the
                     # outer constraint type (e.g. "ComplianceConstraint").
                     .withColumn("failed_constraint_type", split(col("constraint"), '\\(', 2)[0])
                     # Everything between that first "(" and the closing ")" —
                     # nested parentheses such as "List()" stay intact.
                     .withColumn("failed_constraint_expression", regexp_extract(col("constraint"), r'^[^(]+\((.*)\)$', 1))
                     .select("failed_constraint_type", "failed_constraint_expression")
                     )

In [None]:
# Print the parsed failing constraints without truncating the expression text.
failed_conditions.show(truncate=False)

+----------------------+--------------------------------------------------------------------------------------------+
|failed_constraint_type|failed_constraint_expression                                                                |
+----------------------+--------------------------------------------------------------------------------------------+
|ComplianceConstraint  |Compliance(market-currency match for IN,currency = 'INR' AND market = 'IN',None,List(),None)|
+----------------------+--------------------------------------------------------------------------------------------+



In [None]:
# `col` is imported here as well so this cell does not depend on an import made
# by a different cell.
from pyspark.sql.functions import col, regexp_extract

# First run of letters followed by "(" -> the outer constraint type.
constraint_type_pattern = r'([A-Za-z]+)\('
# Text between the first and second comma after the first "(" -> for a
# Compliance constraint this is the predicate.
# NOTE(review): a predicate that itself contains a comma would be cut off at
# that comma by this pattern.
constraint_expression_pattern = r'\(.*?,([^,]+)'

# Extract constraint type and expression using regex, for constraints that
# failed inside an Error-level check.
failed_conditions = (
    checkResult_df
    .filter((col("check_status") == "Error") & (col("constraint_status") == "Failure"))
    .withColumn("failed_constraint_type", regexp_extract(col("constraint"), constraint_type_pattern, 1))
    .withColumn("failed_constraint_expression", regexp_extract(col("constraint"), constraint_expression_pattern, 1))
    .select("failed_constraint_type", "failed_constraint_expression")
)

# Show the parsed results
failed_conditions.show(truncate=False)


+----------------------+----------------------------------+
|failed_constraint_type|failed_constraint_expression      |
+----------------------+----------------------------------+
|ComplianceConstraint  |currency = 'INR' AND market = 'IN'|
+----------------------+----------------------------------+



In [None]:
# Collect the failing constraints to the driver as {constraint type: expression}.
# Rows sharing a constraint type overwrite each other; the last one wins.
result_dict = {failed_row[0]: failed_row[1] for failed_row in failed_conditions.collect()}

In [None]:
# Display the mapping of failed constraint type -> failed expression.
result_dict

{'ComplianceConstraint': "currency = 'INR' AND market = 'IN'"}

In [None]:
# Small second dataset with a few NULLs, used to demonstrate a completeness
# threshold. Rows are written as tuples and zipped with the column names.
got_columns = ("id", "firstName", "middleName", "lastName", "kingdom")
got_rows = [
    ("JSNOW001", "John", None, "Snow", "Winterfell"),
    ("DANY0059", "Daenerys", None, "Targaryen", "Kings Landing"),
    (None, "Oberyn", "TheRedViper", "Martell", "Dorne"),
    ("CLGN0089", "Sandor", "TheHound", "Clegane", None),
]
newData = [dict(zip(got_columns, got_row)) for got_row in got_rows]

newDF = spark.createDataFrame(newData)
# Display with an explicit column order; newDF itself is left as inferred.
newDF.select(*got_columns).show()

+--------+---------+-----------+---------+-------------+
|      id|firstName| middleName| lastName|      kingdom|
+--------+---------+-----------+---------+-------------+
|JSNOW001|     John|       NULL|     Snow|   Winterfell|
|DANY0059| Daenerys|       NULL|Targaryen|Kings Landing|
|    NULL|   Oberyn|TheRedViper|  Martell|        Dorne|
|CLGN0089|   Sandor|   TheHound|  Clegane|         NULL|
+--------+---------+-----------+---------+-------------+



In [None]:
from pydeequ.checks import *
from pydeequ.verification import *

# Warning-level check: at least 90% of the `id` values must be non-NULL.
checks = (
    Check(spark, CheckLevel.Warning, "GoT Realm")
    .hasCompleteness("id", lambda completeness: completeness >= 0.9)
)

# Evaluate the check against the GoT sample data.
checkResult = VerificationSuite(spark).onData(newDF).addCheck(checks).run()

# One row per constraint: check name/level/status plus constraint status/message.
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)

In [None]:
# Print the result of the completeness check, untruncated.
checkResult_df.show(truncate=False)

+---------+-----------+------------+--------------------------------------------------+-----------------+-----------------------------------------------------+
|check    |check_level|check_status|constraint                                        |constraint_status|constraint_message                                   |
+---------+-----------+------------+--------------------------------------------------+-----------------+-----------------------------------------------------+
+---------+-----------+------------+--------------------------------------------------+-----------------+-----------------------------------------------------+



In [None]:
from pydeequ.checks import *
from pydeequ.verification import *

# Two checks with different severities: the Warning-level check holds the
# low-priority rules, the Error-level check holds the high-priority rules.
warning_checks = Check(spark, CheckLevel.Warning, "Transaction DQ - Low Priority Check")
error_checks = Check(spark, CheckLevel.Error, "Transaction DQ - High Priority Check")

checkResult = (VerificationSuite(spark)
    .onData(df)
    .addCheck(
         warning_checks.isComplete("productID")
         .hasCompleteness("customerID", lambda x: x >= 0.9)   # at most 10% NULLs
    )
    .addCheck(
        error_checks.isComplete("transactionID")
        # The rule is conditional: *if* market is IN, currency must be INR.
        # Written as an implication ("not IN, or INR") so that US rows satisfy
        # it; the previous "currency = 'INR' AND market = 'IN'" counted every
        # US row as a violation.
        .satisfies("market != 'IN' OR currency = 'INR'", "market-currency match for IN")
    )
    .run()
  )

# One row per constraint, printed without truncating the constraint text.
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

+------------------------------------+-----------+------------+------------------------------------------------------------------------------------------------------------------+-----------------+-----------------------------------------------------+
|check                               |check_level|check_status|constraint                                                                                                        |constraint_status|constraint_message                                   |
+------------------------------------+-----------+------------+------------------------------------------------------------------------------------------------------------------+-----------------+-----------------------------------------------------+
|Transaction DQ - High Priority Check|Error      |Error       |CompletenessConstraint(Completeness(transactionID,None,None))                                                     |Success          |                                                   

#### Interpretation

* transactionID should not have any NULL - Not restrictive, even if the check fails let the record flow

* transactionID should be 100% unique - Not restrictive, even if the check fails let the record flow

* customerID should not have more than 10% NULL - Not restrictive, even if the check fails let the record flow

* For market = IN, the currency must be INR - Restrictive, if the check fails quarantine such records

