# Basic Example
Here is a basic example of running a `VerificationSuite` with a couple of `checks` and then filtering the results by their status.

This notebook, copied from the [PyDeequ tutorial](https://github.com/awslabs/python-deequ/blob/master/tutorials/basic_example.ipynb), has been enhanced to produce the same result using a DQDL-like syntax to dynamically generate and run a list of 'check' constraints.

We'll start by creating a Spark session and a small sample dataframe.

In [1]:
%%bash
pip install pydeequ

Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com


In [2]:
# unzip dependencies from local file, since Maven is blocked from within WC2(H) VPC
import zipfile
import os

S_rootdir = os.getcwd()

with zipfile.ZipFile( f"{S_rootdir}/common/ivy2cache.zip" ) as z:
    z.extractall( f"{os.environ['HOME']}/.ivy2" )


In [3]:
import os 
os.environ['AWS_DEFAULT_REGION'] = 'us-gov-west-1'
os.environ["SPARK_VERSION"] = '3.0'

In [4]:
from pyspark.sql import SparkSession, Row, DataFrame

import sagemaker_pyspark
import pydeequ

classpath = ":".join(sagemaker_pyspark.classpath_jars())

from pyspark import SparkConf
conf = (SparkConf()
        .set('fs.s3a.endpoint', 's3-us-gov-west-1.amazonaws.com')
       )

spark = (SparkSession
    .builder
    .config("spark.driver.extraClassPath", classpath)
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .config( conf=conf )
    .getOrCreate())



:: loading settings :: url = jar:file:/home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ec2-user/.ivy2/cache
The jars for the packages stored in: /home/ec2-user/.ivy2/jars
com.amazon.deequ#deequ added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-21ad6e41-07ab-4404-b42b-4e0f95bfa383;1.0
	confs: [default]
	found com.amazon.deequ#deequ;1.2.2-spark-3.0 in central
	found org.scalanlp#breeze_2.12;0.13.2 in central
	found org.scalanlp#breeze-macros_2.12;0.13.2 in central
	found org.scala-lang#scala-reflect;2.12.1 in central
	found com.github.fommil.netlib#core;1.1.2 in central
	found net.sf.opencsv#opencsv;2.3 in central
	found com.github.rwl#jtransforms;2.4.0 in central
	found junit#junit;4.8.2 in central
	found org.apache.commons#commons-math3;3.2 in central
	found org.spire-math#spire_2.12;0.13.0 in central
	found org.spire-math#spire-macros_2.12;0.13.0 in central
	found org.typelevel#machinist_2.12;0.6.1 in central
	found com.chuusai#shapeless_2.12;2.3.2 in central
	found org.typelevel#macro-compat_2.12;1.1

23/08/25 17:25:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/08/25 17:25:33 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
# create a trivial dataframe to play with ...
df = spark.sparkContext.parallelize([
    Row(a="foo", b=1, c=5),
    Row(a="bar", b=2, c=6),
    Row(a="baz", b=3, c=None)]).toDF()
df.show()

                                                                                

+---+---+----+
|  a|  b|   c|
+---+---+----+
|foo|  1|   5|
|bar|  2|   6|
|baz|  3|null|
+---+---+----+



Next, we import the `PyDeequ` modules needed to run a `VerificationSuite` with checks. We will check the following:

- does `df` have a size of at least 3? 
- does the `b` column have a minimum value of 0? 
- is the `c` column complete? 
- is the `a` column unique? 
- are all values in the `a` column contained in the set "foo", "bar", "baz"? 
- are the values in the `b` column non-negative? 

Once these checks have run, we'll display the results as a dataframe.
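
To make the expected outcome concrete, the following pure-Python sketch computes by hand the metrics these checks assert on, using the three sample rows. It does not call PyDeequ: the names `rows`, `metrics`, and `assertions` are illustrative, and the formulas (completeness as the fraction of non-null values, uniqueness as the fraction of values that occur exactly once) are assumed to mirror how Deequ defines these metrics.

```python
from collections import Counter

# The same three rows as the sample dataframe, as plain dictionaries.
rows = [
    {"a": "foo", "b": 1, "c": 5},
    {"a": "bar", "b": 2, "c": 6},
    {"a": "baz", "b": 3, "c": None},
]

a_values = [r["a"] for r in rows]
b_values = [r["b"] for r in rows]
c_values = [r["c"] for r in rows]
a_counts = Counter(a_values)

# Metrics the checks are evaluated against (assumed definitions, see above).
metrics = {
    "size": len(rows),
    "min_b": min(b_values),
    "completeness_c": sum(v is not None for v in c_values) / len(rows),
    "uniqueness_a": sum(1 for v in a_values if a_counts[v] == 1) / len(rows),
}

# One boolean per check in the list above.
assertions = {
    "size >= 3": metrics["size"] >= 3,
    "min(b) == 0": metrics["min_b"] == 0,
    "c is complete": metrics["completeness_c"] == 1.0,
    "a is unique": metrics["uniqueness_a"] == 1.0,
    "a in {foo, bar, baz}": all(v in {"foo", "bar", "baz"} for v in a_values),
    "b is non-negative": all(v >= 0 for v in b_values),
}

for name, passed in assertions.items():
    print(f"{name}: {'Success' if passed else 'Failure'}")
```

Under these definitions two assertions do not hold: the minimum of `b` is 1 rather than 0, and `c` is only two-thirds complete.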


In [6]:
from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Error, "Integrity checks")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x >= 3) \
        .hasMin("b", lambda x: x == 0) \
        .isComplete("c")  \
        .isUnique("a")  \
        .isContainedIn("a", ["foo", "bar", "baz"]) \
        .isNonNegative("b")) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

Python Callback server started!
+----------------+-----------+------------+-----------------------------------------------------------------------------------------------------------+-----------------+-------------------------------------------------------------------+
|check           |check_level|check_status|constraint                                                                                                 |constraint_status|constraint_message                                                 |
+----------------+-----------+------------+-----------------------------------------------------------------------------------------------------------+-----------------+-------------------------------------------------------------------+
|Integrity checks|Error      |Error       |SizeConstraint(Size(None))                                                                                 |Success          |                                                                   |
|Integrity check



It's nice to see those as a dataframe, but we noticed a couple of **Failures** in the `constraint_status` column! Let's list them by iterating over the `checkResults` property of our run.

In [7]:
if checkResult.status == "Success": 
    print('The data passed the test, everything is fine!')

else:
    print('We found errors in the data, the following constraints were not satisfied:')
    
    for check_json in checkResult.checkResults:
        if check_json['constraint_status'] != "Success": 
            print(f"\t{check_json['constraint']} failed because: {check_json['constraint_message']}")

We found errors in the data, the following constraints were not satisfied:
	MinimumConstraint(Minimum(b,None)) failed because: Value: 1.0 does not meet the constraint requirement!
	CompletenessConstraint(Completeness(c,None)) failed because: Value: 0.6666666666666666 does not meet the constraint requirement!
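
The same filtering can be factored into a small helper that returns the failures instead of printing them, which is easier to reuse in a pipeline. This is only a sketch: `failed_constraints` is a hypothetical helper name, and `sample_results` is a hand-written stand-in that reuses the keys and messages shown above rather than a real `checkResult.checkResults` value. The `"Failure"` status string is an assumption.

```python
def failed_constraints(check_results):
    """Return (constraint, message) pairs for every result that is not a success."""
    return [
        (result["constraint"], result["constraint_message"])
        for result in check_results
        if result["constraint_status"] != "Success"
    ]


# Hand-written stand-in for checkResult.checkResults (same keys as used above).
sample_results = [
    {
        "constraint": "SizeConstraint(Size(None))",
        "constraint_status": "Success",
        "constraint_message": "",
    },
    {
        "constraint": "MinimumConstraint(Minimum(b,None))",
        "constraint_status": "Failure",  # assumed status string
        "constraint_message": "Value: 1.0 does not meet the constraint requirement!",
    },
    {
        "constraint": "CompletenessConstraint(Completeness(c,None))",
        "constraint_status": "Failure",  # assumed status string
        "constraint_message": "Value: 0.6666666666666666 does not meet the constraint requirement!",
    },
]

failures = failed_constraints(sample_results)
for constraint, message in failures:
    print(f"{constraint} failed because: {message}")
```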


## Rules-based Dynamic Check
Now we will reproduce the same result using a 'metadata' layer of rules written in a [DQDL-like syntax](https://docs.aws.amazon.com/glue/latest/dg/dqdl.html).

In [1]:
# DQDL-like Rule List
rules_list = [
    'HasSize >= 3',
    'HasMin "b" = 0',
    'IsComplete "c"',
    'IsUnique "a"',
    'IsContainedIn "a" in ["foo","bar","baz"] ',
    'IsNonNegative "b"'
]


In [9]:
def parse_dqdl_rule(rule_text):
    import json
    import re
    
    s = rule_text.split(' ', 2)

    rule = {
        'Type' : s[0],
        'ColName' : '',
        'Expression' : '',
        'Lambda' : None,
        'Text' : rule_text
    }
    if '"' in s[1]:
        rule['ColName'] = s[1].replace('"','')
        if len(s) == 3:
            rule['Expression'] = s[2]
    else:
        rule['Expression'] = f"{s[1]} {s[2]}"

    if re.search("[<=>]", rule['Expression']):
        xpr = rule['Expression'].split()
        op =  xpr[0]
        val = float(xpr[1])
        if op == "=":
            rule['Lambda'] = lambda x: x == val
        elif op == ">":
            rule['Lambda'] = lambda x: x > val
        elif op == "<":
            rule['Lambda'] = lambda x: x < val
        elif op == ">=":
            rule['Lambda'] = lambda x: x >= val
        elif op == "<=":
            rule['Lambda'] = lambda x: x <= val
        
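    elif rule['Expression'].startswith('between'):
        # Expected form: between <lo> and <hi>
        xpr = rule['Expression'].split()
        lo = float(xpr[1])  # convert the bounds so the comparison is numeric
        hi = float(xpr[3])
        rule['Lambda'] = lambda x: lo < x < hi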
        
    #elif rule['Expression'].startswith('in'):  # ToDo
    else:
        print("Can't Parse Expression")
    
    print(json.dumps(rule, indent=2, default=str))

    return rule
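
The `in [...]` branch is left as a ToDo above. One possible way to fill it in, sketched here under the assumption that the list is always written as a JSON-style array (double-quoted strings or numbers), is to hand the bracketed part to `json.loads`. The helper name `parse_in_expression` is hypothetical and is not part of the parser above.

```python
import json


def parse_in_expression(expression):
    """Parse an expression such as 'in ["foo","bar"]' into a Python list.

    Returns None when the expression is not an 'in [...]' clause or the
    list cannot be decoded as JSON.
    """
    text = expression.strip()
    if not text.startswith("in"):
        return None
    list_text = text[len("in"):].strip()
    if not list_text.startswith("["):
        return None
    try:
        values = json.loads(list_text)
    except json.JSONDecodeError:
        return None
    return values if isinstance(values, list) else None


print(parse_in_expression('in ["foo","bar","baz"] '))
```

The returned list could then be passed as the second argument of `check.isContainedIn(rule['ColName'], values)`, the same call used in the first example.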

In [10]:
    
check = Check(spark, CheckLevel.Error, "DQDL Rules")

for rule_text in rules_list:
    rule = parse_dqdl_rule(rule_text)

    if rule['Type'] == 'HasSize':
        check.hasSize( rule['Lambda'] )
    elif rule['Type'] == 'HasMin':
        check.hasMin( rule['ColName'], rule['Lambda'] )
    elif rule['Type'] == 'IsComplete':
        check.isComplete( rule['ColName'] )        
    elif rule['Type'] == 'IsUnique':
        check.isUnique( rule['ColName'] )        
    #elif rule['Type'] == 'IsContainedIn':   # ToDo
    #    check.isContainedIn(rule['ColName'], ["foo","bar","baz"])
    elif rule['Type'] == 'IsNonNegative':
        check.isNonNegative( rule['ColName'] )
    else:
        print( f"Skipping Check {rule['Type']} -- Rule Type  is not implemented.")
    #break

checkResult = VerificationSuite(spark).onData(df).addCheck(check).run()
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)


{
  "Type": "HasSize",
  "ColName": "",
  "Expression": ">= 3",
  "Lambda": "<function parse_dqdl_rule.<locals>.<lambda> at 0x7f3c422ace50>",
  "Text": "HasSize >= 3"
}
{
  "Type": "HasMin",
  "ColName": "b",
  "Expression": "= 0",
  "Lambda": "<function parse_dqdl_rule.<locals>.<lambda> at 0x7f3c42476e60>",
  "Text": "HasMin \"b\" = 0"
}
Can't Parse Expression
{
  "Type": "IsComplete",
  "ColName": "c",
  "Expression": "",
  "Lambda": null,
  "Text": "IsComplete \"c\""
}
Can't Parse Expression
{
  "Type": "IsUnique",
  "ColName": "a",
  "Expression": "",
  "Lambda": null,
  "Text": "IsUnique \"a\""
}
Can't Parse Expression
{
  "Type": "IsContainedIn",
  "ColName": "a",
  "Expression": "in [\"foo\", \"bar\", \"baz\"] ",
  "Lambda": null,
  "Text": "IsContainedIn \"a\" in [\"foo\", \"bar\", \"baz\"] "
}
Skipping Check IsContainedIn -- Rule Type  is not implemented.
Can't Parse Expression
{
  "Type": "IsNonNegative",
  "ColName": "b",
  "Expression": "",
  "Lambda": null,
  "Text": "IsNo