# DQDL Wrappers for PyDeequ

This notebook expands upon the [basic_example](basic_example.ipynb) to define a class whose methods mirror the data-quality methods of the boto3 `Glue.Client`. It provides a custom implementation of DQDL-like functionality backed by alternate data stores in DynamoDB and S3, while supporting easy future migration to Glue ETL.


### Sample Scripts
First execute the Initialization cells and the Function and Class Definitions cells below.

In [None]:
# Store the sample ruleset (create it, or overwrite it if it already exists).
dq = SimpleDQ()

# The keys below are the keyword arguments read by SimpleDQ.create_data_quality_ruleset.
# "Ruleset" is a single string of rules; run_pydeequ_checks splits it on ", ",
# so do not put ", " inside a rule (e.g. inside an `in [...]` value list).
ruleset = {
 "Name": "cstg_agency_example",
 "ClientToken": "string",
 "Description": "just a simple sample ruleset",
 "Ruleset": "HasSize >= 50, HasMin \"penaltyassessmentdays\" >= 90, IsComplete \"agencyname\", IsUnique \"agencyname\", IsContainedIn \"applypenalty\" in [\"Y\",\"N\"], IsNonNegative \"interestrate\"",
 "Tags": {
  "optional": "stuff"
 },
 "TargetTable": {
  "CatalogId": "AwsDataCatalog",
  "DatabaseName": "dtl-prd-smpl0-g2",
  "TableName": "cstg_agency"
 }
}

dq.update_data_quality_ruleset(**ruleset)


In [122]:
# Read the stored ruleset back by name and display it as the cell output.
dq = SimpleDQ()

ruleset = dq.get_data_quality_ruleset( Name = 'cstg_agency_example' )
ruleset


Getting cstg_agency_example


{'TargetTable': {'DatabaseName': 'dtl-prd-smpl0-g2',
  'CatalogId': 'AwsDataCatalog',
  'TableName': 'cstg_agency'},
 'Description': 'just a simple sample ruleset',
 'Ruleset': 'HasSize >= 50, HasMin "penaltyassessmentdays" >= 90, IsComplete "agencyname", IsUnique "agencyname", IsContainedIn "applypenalty" in ["Y","N"], IsNonNegative "interestrate"',
 'ClientToken': 'string',
 'Tags': {'optional': 'stuff'},
 'Name': 'cstg_agency_example'}

In [168]:
# Run the evaluation, passing the ruleset fetched in the previous cell as keyword arguments.
# In the default PyDeeQu mode the return value is a pandas DataFrame of check results.
dq = SimpleDQ()

df_results = dq.start_data_quality_ruleset_evaluation_run( **ruleset)




s3://wc2h-dtl-prd-datalake/PARQUET/cstg_agency
Getting cstg_agency_example
{
  "Type": "HasSize",
  "ColName": "",
  "Expression": ">= 50",
  "Lambda": "<function parse_dqdl_rule.<locals>.<lambda> at 0x7f586ebcd480>",
  "Text": "HasSize >= 50"
}
{
  "Type": "HasMin",
  "ColName": "penaltyassessmentdays",
  "Expression": ">= 90",
  "Lambda": "<function parse_dqdl_rule.<locals>.<lambda> at 0x7f5831eb7eb0>",
  "Text": "HasMin \"penaltyassessmentdays\" >= 90"
}
{
  "Type": "IsComplete",
  "ColName": "agencyname",
  "Expression": "",
  "Lambda": null,
  "Text": "IsComplete \"agencyname\""
}
{
  "Type": "IsUnique",
  "ColName": "agencyname",
  "Expression": "",
  "Lambda": null,
  "Text": "IsUnique \"agencyname\""
}
{
  "Type": "IsContainedIn",
  "ColName": "applypenalty",
  "Expression": "in [\"Y\",\"N\"]",
  "Lambda": [
    "Y",
    "N"
  ],
  "Text": "IsContainedIn \"applypenalty\" in [\"Y\",\"N\"]"
}
{
  "Type": "IsNonNegative",
  "ColName": "interestrate",
  "Expression": "",
  "Lambda"



In [169]:
# Display the evaluation results produced by the previous cell.
df_results

Unnamed: 0,check,check_level,check_status,constraint,constraint_status,constraint_message,rule,dqrunid
0,cstg_agency_example,Error,Error,SizeConstraint(Size(None)),Success,,HasSize >= 50,230831-162949040150-interactive
1,cstg_agency_example,Error,Error,MinimumConstraint(Minimum(penaltyassessmentday...,Failure,Value: 0.0 does not meet the constraint requir...,"HasMin ""penaltyassessmentdays"" >= 90",230831-162949040150-interactive
2,cstg_agency_example,Error,Error,CompletenessConstraint(Completeness(agencyname...,Success,,"IsComplete ""agencyname""",230831-162949040150-interactive
3,cstg_agency_example,Error,Error,UniquenessConstraint(Uniqueness(List(agencynam...,Failure,Value: 0.0 does not meet the constraint requir...,"IsUnique ""agencyname""",230831-162949040150-interactive
4,cstg_agency_example,Error,Error,ComplianceConstraint(Compliance(applypenalty c...,Success,,"IsContainedIn ""applypenalty"" in [""Y"",""N""]",230831-162949040150-interactive
5,cstg_agency_example,Error,Error,ComplianceConstraint(Compliance(interestrate i...,Success,,"IsNonNegative ""interestrate""",230831-162949040150-interactive


### Initialization

In [19]:
%%bash
# cold start
# Installs the two third-party packages this notebook imports: pydeequ and awswrangler.
# NOTE(review): both installs are unpinned; the captured output of this cell shows
# awswrangler 3.3.0 being resolved -- consider pinning versions for reproducibility.
pip install pydeequ
pip install 'awswrangler[redshift]'

Collecting awswrangler[redshift]
  Obtaining dependency information for awswrangler[redshift] from https://files.pythonhosted.org/packages/eb/7f/3f0296b736de88a8b5c918e41053c69948a200939106a4e1fd64c5925c6a/awswrangler-3.3.0-py3-none-any.whl.metadata
  Using cached awswrangler-3.3.0-py3-none-any.whl.metadata (23 kB)
Collecting redshift-connector<3.0.0,>=2.0.0 (from awswrangler[redshift])
  Obtaining dependency information for redshift-connector<3.0.0,>=2.0.0 from https://files.pythonhosted.org/packages/63/86/fb94423bc8c385fdce1bfe2afe720bf415d9df56e137e4f321e13844c498/redshift_connector-2.0.913-py3-none-any.whl.metadata
  Downloading redshift_connector-2.0.913-py3-none-any.whl.metadata (61 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.5/61.5 kB[0m [31m933.1 kB/s[0m eta [36m0:00:00[0m [36m0:00:01[0m
Collecting scramp<1.5.0,>=1.2.0 (from redshift-connector<3.0.0,>=2.0.0->awswrangler[redshift])
  Downloading scramp-1.4.4-py3-none-any.whl (13 kB)
Collecting lx

In [None]:
# cold start for SageMaker on WC2H -- unzip dependencies from local file, since Maven is blocked
import zipfile
import os

S_rootdir = os.getcwd()

with zipfile.ZipFile( f"{S_rootdir}/common/ivy2cache.zip" ) as z:
    z.extractall( f"{os.environ['HOME']}/.ivy2" )


In [32]:
import os 
# Default AWS region for the AWS clients used by this notebook (GovCloud West).
os.environ['AWS_DEFAULT_REGION'] = 'us-gov-west-1'
# NOTE(review): presumably read by pydeequ when it is imported, to select the matching
# Deequ build -- keep this assignment ahead of the `import pydeequ` cell; confirm.
os.environ["SPARK_VERSION"] = '3.0'

import awswrangler as wr
import pandas as pd
# Re-create the `DataFrame.iteritems` alias on top of `DataFrame.items`.
# NOTE(review): presumably needed because Spark's pandas conversion still calls
# `iteritems` on a pandas version that no longer provides it -- see the linked question.
pd.DataFrame.iteritems = pd.DataFrame.items # (https://stackoverflow.com/questions/75926636/databricks-issue-while-creating-spark-data-frame-from-pandas)


In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession, Row, DataFrame

import sagemaker_pyspark
import pydeequ

# Jars shipped with sagemaker_pyspark, joined into one ':'-separated classpath string.
classpath = ":".join(sagemaker_pyspark.classpath_jars())

# Point S3A at the GovCloud West endpoint.
# NOTE(review): Hadoop options are usually handed to Spark with a `spark.hadoop.` prefix;
# confirm that this un-prefixed key is actually applied.
conf = SparkConf()
conf.set('fs.s3a.endpoint', 's3-us-gov-west-1.amazonaws.com')

# Build the session step by step: driver classpath, the Deequ package to resolve,
# the package to exclude, then the SparkConf settings defined above.
session_builder = SparkSession.builder
session_builder = session_builder.config("spark.driver.extraClassPath", classpath)
session_builder = session_builder.config("spark.jars.packages", pydeequ.deequ_maven_coord)
session_builder = session_builder.config("spark.jars.excludes", pydeequ.f2j_maven_coord)
session_builder = session_builder.config( conf=conf )

spark = session_builder.getOrCreate()



:: loading settings :: url = jar:file:/home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ec2-user/.ivy2/cache
The jars for the packages stored in: /home/ec2-user/.ivy2/jars
com.amazon.deequ#deequ added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3fbdd046-4fb5-4036-9175-b6f3ddfb10d5;1.0
	confs: [default]
	found com.amazon.deequ#deequ;1.2.2-spark-3.0 in central
	found org.scalanlp#breeze_2.12;0.13.2 in central
	found org.scalanlp#breeze-macros_2.12;0.13.2 in central
	found org.scala-lang#scala-reflect;2.12.1 in central
	found com.github.fommil.netlib#core;1.1.2 in central
	found net.sf.opencsv#opencsv;2.3 in central
	found com.github.rwl#jtransforms;2.4.0 in central
	found junit#junit;4.8.2 in central
	found org.apache.commons#commons-math3;3.2 in central
	found org.spire-math#spire_2.12;0.13.0 in central
	found org.spire-math#spire-macros_2.12;0.13.0 in central
	found org.typelevel#machinist_2.12;0.6.1 in central
	found com.chuusai#shapeless_2.12;2.3.2 in central
	found org.typelevel#macro-compat_2.12;1.1

23/08/30 13:11:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/08/30 13:11:26 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/08/30 13:11:26 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


### Function and Class Definitions
These definitions will be moved to a common module so they can be used from either a notebook or an ETL job.

In [98]:
def parse_dqdl_rule(rule_text): # ToDo move to common module
    """ Transform a DQDL-like rule from string to dict.

    A rule has the shape ``<Type> ["<column>"] [<expression>]``, for example
    ``HasSize >= 50``, ``IsComplete "agencyname"`` or
    ``IsContainedIn "applypenalty" in ["Y","N"]``.

    Parameters
    ----------
    rule_text : str
        One rule, without the ", " separator used between rules in a ruleset.

    Returns
    -------
    dict
        ``Type``       -- the rule type (first token).
        ``ColName``    -- the quoted column name without quotes, or ``''``.
        ``Expression`` -- the text after the type/column, or ``''``.
        ``Lambda``     -- ``None`` when there is no (parsable) expression;
                          a one-argument predicate for comparison and
                          ``between`` expressions; a list of strings for
                          ``in [...]`` expressions.
        ``Text``       -- ``rule_text`` unchanged.

    Raises
    ------
    ValueError
        If ``rule_text`` is blank, or a comparison / ``between`` bound is not
        a number.

    The parsed rule is also printed as JSON (same as before) for tracing.
    """
    import csv
    import json
    import re

    # <Type>, optional "<column>" (may contain spaces), then the remaining expression.
    header = re.match(r'\s*(\S+)(?:\s+"([^"]*)")?\s*(.*?)\s*$', rule_text, re.DOTALL)
    if header is None:
        raise ValueError(f"Empty DQDL rule: {rule_text!r}")

    rule = {
        'Type' : header.group(1),
        'ColName' : header.group(2) or '',
        'Expression' : header.group(3),
        'Lambda' : None,
        'Text' : rule_text
    }
    expression = rule['Expression']

    # Anchored at the start so operators inside an `in [...]` list are not
    # mistaken for a comparison. Whitespace after the operator is optional.
    comparison = re.match(r'(>=|<=|!=|=|>|<)\s*(\S+)', expression)

    # transform the Expression into a lambda assertion
    if expression == '':
        pass

    elif comparison:
        op = comparison.group(1)
        val = float(comparison.group(2))
        if op == "=":
            rule['Lambda'] = lambda x: x == val
        elif op == "!=":
            rule['Lambda'] = lambda x: x != val
        elif op == ">":
            rule['Lambda'] = lambda x: x > val
        elif op == "<":
            rule['Lambda'] = lambda x: x < val
        elif op == ">=":
            rule['Lambda'] = lambda x: x >= val
        elif op == "<=":
            rule['Lambda'] = lambda x: x <= val

    elif expression.startswith('between'):
        # Expected form: between <lo> and <hi>
        parts = expression.split()
        if len(parts) >= 4:
            # Bounds must be numeric: comparing a number with the raw
            # string tokens would raise TypeError when the lambda is called.
            lo = float(parts[1])
            hi = float(parts[3])
            rule['Lambda'] = lambda x: lo < x < hi
        else:
            print("Can't Parse Expression")

    elif expression.startswith('in'):
        value_list = re.match(r'in\s*\[(.*)\]$', expression)
        if value_list:
            # csv handles quoted values, commas inside quotes and spaces after commas.
            fields = next(csv.reader([value_list.group(1)], skipinitialspace=True), [])
            #rule['Lambda'] = lambda x: x in inlist
            rule['Lambda'] = [field.strip() for field in fields]
        else:
            print("Can't Parse Expression")

    else:
        print("Can't Parse Expression")

    print(json.dumps(rule, indent=2, default=str))

    return rule

In [96]:
def run_pydeequ_checks( df, ruleset_name ): # ToDo move to common module
    """Run every rule of a stored ruleset against ``df`` with PyDeequ.

    The ruleset is fetched by name through ``SimpleDQ``, its ``Ruleset`` string
    is split on ", " into individual rules, each rule is parsed with
    ``parse_dqdl_rule`` and added to a single Error-level ``Check``.
    Rule types without a mapping below are reported and skipped.

    Parameters
    ----------
    df : Spark DataFrame to verify.
    ruleset_name : name of the stored ruleset; also used as the check description.

    Returns
    -------
    The Spark DataFrame produced by ``VerificationResult.checkResultsAsDataFrame``.
    """
    from pydeequ.checks import Check,CheckLevel
    from pydeequ.verification import VerificationSuite,VerificationResult

    stored_ruleset = SimpleDQ().get_data_quality_ruleset( Name = ruleset_name )

    check = Check(spark, CheckLevel.Error, ruleset_name)

    # Rule types grouped by the arguments their Check method takes.
    takes_column = {
        'IsComplete': check.isComplete,
        'IsUnique': check.isUnique,
        'IsNonNegative': check.isNonNegative,
    }
    takes_column_and_assertion = {
        'HasMin': check.hasMin,
        'IsContainedIn': check.isContainedIn,
    }

    for rule_text in stored_ruleset['Ruleset'].split(', '):
        rule = parse_dqdl_rule(rule_text)
        rule_type = rule['Type']

        if rule_type == 'HasSize':
            check.hasSize( rule['Lambda'] )
        elif rule_type in takes_column_and_assertion:
            takes_column_and_assertion[rule_type]( rule['ColName'], rule['Lambda'] )
        elif rule_type in takes_column:
            takes_column[rule_type]( rule['ColName'] )
        else:
            print( f"Skipping Check {rule['Type']} -- Rule Type  is not implemented.")

    suite = VerificationSuite(spark).onData(df)
    verification = suite.addCheck(check).run()

    return VerificationResult.checkResultsAsDataFrame(spark, verification)

In [167]:
class SimpleDQ: # ToDo move to common module
    ''' Data Quality functionality similar to boto3 Glue.Client

    Two modes are supported:

    * ``'PyDeeQu'`` (default) -- rulesets are stored in a DynamoDB table and
      evaluated interactively with PyDeequ on the notebook's Spark session.
    * ``'GlueDQ'`` -- calls are passed through to the boto3 Glue client where
      that is implemented.

    The boto3 client, resource and tables below are created once, when the
    class body is executed, and are shared by all instances.
    Methods marked ``# ToDo`` are placeholders that return ``None``.
    '''
    import boto3
    glue_client = boto3.client('glue')
    ddb_resource = boto3.resource('dynamodb')
    ruleset_table = ddb_resource.Table('dtl-prd-SMPL0-dqrulesets') # dynamodb table for PyDeeQu rulesets
    ruleset_runs_table = ddb_resource.Table('dtl-prd-SMPL0-dqruleset-eval-runs') # dynamodb table for evaluation runs (not referenced by any method yet)
    mode='PyDeeQu'

    # Fallbacks for start_data_quality_ruleset_evaluation_run when the caller
    # passes neither DataSource/TargetTable nor RulesetNames/Name.
    _DEFAULT_DATA_SOURCE = {
        'GlueTable' : {
            'DatabaseName' : 'dtl-prd-smpl0-g2',
            'TableName' : 'cstg_agency'
        }
    }
    _DEFAULT_RULESET_NAMES = ['cstg_agency_example']

    def __init__(self, mode='PyDeeQu', **kwargs) -> None:
        """Select the backend: 'PyDeeQu' (default) or 'GlueDQ'. Extra kwargs are ignored."""
        if mode == 'GlueDQ':
            # pass-thru wrapper for boto3 class Glue.Client *data_quality* methods
            print('Glue Data Quality implementation pending availability on Govcloud')
        self.mode = mode


    @staticmethod
    def _ruleset_as_text(ruleset):
        """Return the ruleset as one ", "-separated string; lists/tuples of rules are joined."""
        if isinstance(ruleset, (list, tuple)):
            return ", ".join(ruleset)
        return ruleset

    def batch_get_data_quality_result(self, **kwargs): pass # ToDo 
    def cancel_data_quality_rule_recommendation_run(self, **kwargs): pass # ToDo 
    def cancel_data_quality_ruleset_evaluation_run(self, **kwargs): pass # ToDo

    def create_data_quality_ruleset( self, **kwargs):
        """Create (or, in PyDeeQu mode, overwrite) a ruleset.

        Required kwargs: ``Name`` (str) and ``Ruleset`` (str, or a list of rule
        strings which is joined with ", "). Optional: ``Description``, ``Tags``,
        ``TargetTable``, ``ClientToken``.

        Returns the Glue response in GlueDQ mode, ``{'Name': <name>}`` in
        PyDeeQu mode, and ``None`` for any other mode.
        Raises ``KeyError`` when ``Name`` or ``Ruleset`` is missing.
        """
        # The join result used to be discarded, so a list was stored/sent as-is.
        kwargs['Ruleset'] = self._ruleset_as_text(kwargs['Ruleset'])

        if self.mode == "GlueDQ":
            request = {
                'Name' : kwargs['Name'],       # str Reqd
                'Ruleset' : kwargs['Ruleset'], # str Reqd
            }
            # Only forward the optional arguments the caller actually supplied.
            for key in ('Description', 'Tags', 'TargetTable', 'ClientToken'):
                if key in kwargs:
                    request[key] = kwargs[key]
            response = self.glue_client.create_data_quality_ruleset(**request)
            return response   # { 'Name': 'string' }
        elif self.mode == "PyDeeQu":
            self.ruleset_table.put_item(
                Item = kwargs
            )
            return { 'Name' : kwargs['Name'] }


    def delete_data_quality_ruleset(self, **kwargs): pass # ToDo

    def get_data_quality_result(self, **kwargs): pass # ToDo
    
    def get_data_quality_rule_recommendation_run(self, **kwargs): pass # ToDo

    def get_data_quality_ruleset(self, **kwargs): 
        """Fetch a ruleset by ``Name``.

        Returns the Glue response in GlueDQ mode, or the stored DynamoDB item
        (a dict) in PyDeeQu mode. Raises ``KeyError`` when ``Name`` is missing
        or, in PyDeeQu mode, when no ruleset with that name is stored.
        """
        print(f"Getting {kwargs['Name']}")
        if self.mode == "GlueDQ":
            response = self.glue_client.get_data_quality_ruleset(
                Name=kwargs['Name']
            )
            return response
        elif self.mode == "PyDeeQu":
            response = self.ruleset_table.get_item(
                Key = { 'Name' : kwargs['Name'] }
            )
            if 'Item' not in response:
                # Same exception type as the bare response['Item'] lookup, with a usable message.
                raise KeyError(f"Ruleset {kwargs['Name']!r} not found")
            return response['Item']
       
    def get_data_quality_ruleset_evaluation_run(self, **kwargs): pass # ToDo
    def list_data_quality_results(self, **kwargs): pass # ToDo
    def list_data_quality_rule_recommendation_runs(self, **kwargs): pass # ToDo
    def list_data_quality_ruleset_evaluation_runs(self, **kwargs): pass # ToDo
    def list_data_quality_rulesets(self, **kwargs): pass # ToDo
    def start_data_quality_rule_recommendation_run(self, **kwargs): pass # ToDo
    
    
    def _resolve_run_parameters(self, kwargs):
        """Work out (data_source, ruleset_names) for an evaluation run from the caller's kwargs."""
        data_source = kwargs.get('DataSource')
        if not data_source:
            target_table = kwargs.get('TargetTable')
            if target_table:
                # A stored ruleset can be passed straight in: use its TargetTable.
                data_source = {
                    'GlueTable' : {
                        'DatabaseName' : target_table['DatabaseName'],
                        'TableName' : target_table['TableName']
                    }
                }
            else:
                data_source = self._DEFAULT_DATA_SOURCE

        ruleset_names = kwargs.get('RulesetNames')
        if not ruleset_names:
            if kwargs.get('Name'):
                ruleset_names = [kwargs['Name']]
            else:
                ruleset_names = list(self._DEFAULT_RULESET_NAMES)

        return data_source, ruleset_names

    def _load_data_source(self, data_source):
        """Load the data described by ``data_source`` into a Spark DataFrame."""
        # Three options for DataSource ...
        if 'GlueTable' in data_source:   # GlueDQ
            table = data_source['GlueTable']
            s3_url = wr.catalog.get_table_location( database=table['DatabaseName'], table=table['TableName'])

        elif 'S3Url' in data_source:     # Alt #1
            s3_url = data_source['S3Url']

        elif 'Athena' in data_source:    # Alt #2
            athena = data_source['Athena']
            df_pd = wr.athena.read_sql_query(athena['SQL'], database=athena['DataBase'])
            print('Convert Pandas to Spark')
            return spark.createDataFrame(df_pd)

        else:
            raise ValueError("DataSource must contain one of 'GlueTable', 'S3Url' or 'Athena'")

        if not s3_url:
            raise ValueError("Could not determine an S3 location for the DataSource")

        # 'lightweight' interactive option for logic that will be implemented in Glue ETL job
        print(s3_url)
        return spark.read.parquet(s3_url.replace( 's3://', 's3a://')+'/*/*')

    def start_data_quality_ruleset_evaluation_run(self, **kwargs): 
        """Evaluate one or more stored rulesets against a data source.

        Recognised kwargs (all optional):

        * ``DataSource`` -- dict with exactly one of ``GlueTable``
          (``DatabaseName``/``TableName``), ``S3Url`` (str) or ``Athena``
          (``SQL``/``DataBase``). When absent, ``TargetTable``
          (``DatabaseName``/``TableName``) is used as a Glue table; when that
          is absent too, the built-in sample table is used.
        * ``RulesetNames`` -- list of ruleset names. When absent, ``Name`` is
          used; when that is absent too, the built-in sample ruleset is used.

        Returns, in PyDeeQu mode, a pandas DataFrame with the PyDeequ check
        results of all rulesets plus two columns: ``rule`` (the rule text for
        the row) and ``dqrunid`` (timestamp-based id of this run).
        Returns ``None`` in GlueDQ mode, which is not implemented yet.
        Raises ``ValueError`` for an unsupported ``DataSource``.
        """
        
        if self.mode == "GlueDQ":
            print('Glue Data Quality implementation pending availability on Govcloud')
            # Intended pass-through once available:
            #   response = self.glue_client.start_data_quality_ruleset_evaluation_run(
            #       DataSource = kwargs['DataSource'],    # dict Reqd
            #       Role = kwargs['Role'],                # str Reqd
            #       RulesetNames = kwargs['RulesetNames'],# list [str] Reqd,
            #       NumberOfWorkers = kwargs['NumberOfWorkers'], # int default=5
            #       Timeout = kwargs['Timeout'],          # int default=2880
            #       ClientToken = kwargs['ClientToken'],
            #       AdditionalRunOptions = kwargs['AdditionalRunOptions'],   # dict
            #       AdditionalDataSources = kwargs['AdditionalDataSources']  # dict
            #   )
            #   return response # { 'RunId': 'string' }
        elif self.mode == "PyDeeQu":
            import datetime

            data_source, ruleset_names = self._resolve_run_parameters(kwargs)
            df = self._load_data_source(data_source)

            result_frames = []
            for ruleset_name in ruleset_names:
                results_pd = run_pydeequ_checks( df, ruleset_name ).toPandas()

                # Label each result row with its rule text. The texts come from the
                # stored ruleset, split the same way run_pydeequ_checks splits it.
                rule_texts = self.get_data_quality_ruleset( Name = ruleset_name )['Ruleset'].split(', ')
                if len(rule_texts) == len(results_pd):
                    results_pd['rule'] = rule_texts
                else:
                    # Happens when a rule type was skipped; positional labels would be wrong.
                    print(f"Cannot label results of {ruleset_name}: {len(rule_texts)} rules but {len(results_pd)} result rows")
                    results_pd['rule'] = None

                result_frames.append(results_pd)

            # Keep the results of every ruleset, not just the last one evaluated.
            df_pd1 = pd.concat(result_frames, ignore_index=True)

            dqrunid = f'{datetime.datetime.now().strftime("%y%m%d-%H%M%S%f")}-interactive'
            df_pd1['dqrunid'] = dqrunid
            
            return df_pd1
    
    def update_data_quality_ruleset(self, **kwargs): 
        """Update a ruleset.

        GlueDQ mode: ``Name`` is required; ``Description`` and ``Ruleset`` are
        forwarded when supplied. PyDeeQu mode: identical to
        ``create_data_quality_ruleset`` because ``put_item`` overwrites.
        Returns the backend response (``{'Name': <name>}`` in PyDeeQu mode).
        """
        if self.mode == "GlueDQ":
            request = { 'Name' : kwargs['Name'] }
            if 'Description' in kwargs:
                request['Description'] = kwargs['Description']
            if 'Ruleset' in kwargs:
                request['Ruleset'] = self._ruleset_as_text(kwargs['Ruleset'])
            response = self.glue_client.update_data_quality_ruleset(**request)
            return response
        elif self.mode == "PyDeeQu":
            # 'put_item' works for both create and update
            response = self.create_data_quality_ruleset(**kwargs)
            return response

