# Deequ - Data Validation and Data Profiling

## Contents

- [Introduction](#introduction)
- [Installation](#installation)
- [Setup](#setup)
- [DataLoader](#dataloader)
- [Analyzers](#analyzers)
- [Data Profiling](#data-profiling)
- [Constraint Suggestion](#constraint-suggestion)
- [Constraint Verification](#constraint-verification)
- [Metrics Repository](#metrics-repository)
- [Anomaly Detection](#anomaly-detection)


## Introduction

Deequ is a library built on top of Apache Spark for defining "unit tests for data", which measure data quality in large datasets.

**Purpose:**
- Deequ's purpose is to "unit-test" data to find errors early, before the data gets fed to consuming systems or machine learning algorithms

**Capabilities:**
- Suitable for large data files
- It works on tabular data, e.g., CSV files, database tables, logs, or flattened JSON files: basically anything that fits into a Spark DataFrame.
- In Deequ we explicitly state our assumptions about the underlying data in the form of a "unit test", which can then be verified on the data at hand. If the data has errors, we can "quarantine" and fix it before feeding it to an application.
- **PyDeequ**, a Python API for Deequ, is available for use in PySpark environments.
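
To make the "unit test" and "quarantine" idea concrete, here is a minimal plain-Python sketch. It is not Deequ code: Deequ evaluates constraints as aggregate metrics over Spark DataFrames, whereas this toy works row by row. The function name, the sample rows, and the check names are all hypothetical.

```python
from typing import Callable, Dict, List, Tuple

Record = Dict[str, object]


def split_valid_and_quarantined(
    rows: List[Record], checks: Dict[str, Callable[[Record], bool]]
) -> Tuple[List[Record], List[Tuple[Record, List[str]]]]:
    """Return rows passing every check, plus failing rows with the names of the checks they failed."""
    valid, quarantined = [], []
    for row in rows:
        failed = [name for name, check in checks.items() if not check(row)]
        if failed:
            quarantined.append((row, failed))
        else:
            valid.append(row)
    return valid, quarantined


# Hypothetical rows; the column names mirror the ones used later in this notebook.
rows = [
    {"user_id": "a1", "lifetime_order_count": 3},
    {"user_id": None, "lifetime_order_count": 1},
    {"user_id": "c3", "lifetime_order_count": -2},
]
checks = {
    "user_id is not null": lambda r: r["user_id"] is not None,
    "lifetime_order_count is non-negative": lambda r: r["lifetime_order_count"] >= 0,
}

valid, quarantined = split_valid_and_quarantined(rows, checks)
print(len(valid), "valid;", len(quarantined), "quarantined")
```

Only the valid rows would be passed on to the consuming application; the quarantined rows keep the names of the failed checks so they can be inspected and fixed.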

**Components:**

There are four main components of Deequ:
- Metrics Computation:
    - `Profiles` leverages Analyzers to analyze each column of a dataset.
    - `Analyzers` serve here as a foundational module that computes metrics for data profiling and validation at scale.
- Constraint Suggestion:
    - Specify rules for various groups of Analyzers to run over a dataset; the result is a collection of suggested constraints that can be run in a Verification Suite.
- Constraint Verification:
    - Perform data validation on a dataset with respect to various constraints set by users.
    - Also supports pattern matching and other pattern (regex) related constraints.
- Metrics Repository:
    - Allows for persistence and tracking of Deequ runs over time.


## Installation

[GitHub - Source Code](https://github.com/awslabs/python-deequ)

Note that the following Maven library must be installed on the cluster itself in order to initialize the Spark session with the Deequ dependencies: `com.amazon.deequ:deequ:2.0.8-spark-3.5`. It is available via [Maven Central](http://mvnrepository.com/artifact/com.amazon.deequ/deequ).

Install this dependency through the cluster configuration. The artifact version depends on the Spark version installed on the cluster.

## Setup

In [None]:
!pip install pydeequ

Note: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.
Collecting pydeequ
  Downloading pydeequ-1.4.0-py3-none-any.whl (37 kB)
Installing collected packages: pydeequ
Successfully installed pydeequ-1.4.0


In [None]:
from pyspark.sql import SparkSession, Row, DataFrame
import pyspark
import os

#set the spark version installed on cluster
os.environ["SPARK_VERSION"] = "3.5"


In [None]:
import pydeequ

In [None]:
spark = (SparkSession
    .builder
    .config("spark.jars.packages", 'com.amazon.deequ:deequ:2.0.8-spark-3.5')
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate()
)

**Additional Imports**

In [None]:
from pydeequ.checks import *
from pydeequ.verification import *
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
from pydeequ.profiles import *

## DataLoader

In [None]:
table_name = 'sample_table'

In [None]:
# f-string so that {table_name} is substituted with the variable defined above
df = spark.sql(f"SELECT * from {table_name} where current_date = '2024-12-01'")

df.printSchema()

In [None]:
test_cols = ["user_id", "lifetime_order_count", "lifetime_gsv", "lifetime_gppo"]

## Analyzers

All available methods - [Analyzers](https://github.com/awslabs/python-deequ/blob/master/docs/analyzers.md)

In [None]:
from pydeequ.analyzers import *

analysisResult = AnalysisRunner(spark) \
                    .onData(df.select(test_cols)) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Correlation("lifetime_gsv", "lifetime_gppo")) \
                    .addAnalyzer(Completeness("user_id")) \
                    .addAnalyzer(MinLength("user_id")) \
                    .addAnalyzer(MaxLength("user_id")) \
                    .addAnalyzer(Maximum("lifetime_gsv")) \
                    .addAnalyzer(Maximum("lifetime_gppo")) \
                    .addAnalyzer(Minimum("lifetime_gsv")) \
                    .addAnalyzer(Minimum("lifetime_gppo")) \
                    .addAnalyzer(Distinctness("lifetime_order_count")) \
                    .addAnalyzer(Distinctness("user_id")) \
                    .addAnalyzer(StandardDeviation("lifetime_gsv")) \
                    .addAnalyzer(Sum("lifetime_gsv")) \
                    .addAnalyzer(Compliance("top users", "lifetime_gppo >= 0")) \
                    .addAnalyzer(ApproxCountDistinct("user_id")) \
                    .run()

analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.display()



entity,instance,name,value
Column,user_id,Distinctness,1.0
Column,user_id,ApproxCountDistinct,22857264.0
Column,top users,Compliance,0.60715486774642
Column,user_id,Completeness,1.0
Column,user_id,MinLength,36.0
Column,user_id,MaxLength,36.0
Column,lifetime_gsv,StandardDeviation,9756.243642500607
Column,lifetime_gsv,Sum,75135482086.8327
Dataset,*,Size,20832782.0
Column,lifetime_order_count,Distinctness,3.403290064668272e-05


The same result can also be returned as a pandas DataFrame:

In [None]:
analysisResult_pandas_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult, pandas=True)
analysisResult_pandas_df



Unnamed: 0,entity,instance,name,value
0,Column,user_id,Distinctness,1.0
1,Column,user_id,ApproxCountDistinct,22857260.0
2,Column,top users,Compliance,0.6071549
3,Column,user_id,Completeness,1.0
4,Column,user_id,MinLength,36.0
5,Column,user_id,MaxLength,36.0
6,Column,lifetime_gsv,StandardDeviation,9756.244
7,Column,lifetime_gsv,Sum,75135480000.0
8,Dataset,*,Size,20832780.0
9,Column,lifetime_order_count,Distinctness,3.40329e-05
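
The metric names in the output above have simple definitions. As a rough illustration in plain Python (not PyDeequ's implementation; the toy data and the treatment of nulls are assumptions), completeness is the fraction of non-null values, distinctness is the number of distinct values divided by the number of rows, uniqueness is the fraction of values that occur exactly once, and compliance is the fraction of rows satisfying a predicate.

```python
from collections import Counter
from typing import Callable, Sequence


def completeness(values: Sequence) -> float:
    """Fraction of values that are not None."""
    return sum(v is not None for v in values) / len(values)


def distinctness(values: Sequence) -> float:
    """Number of distinct values divided by the number of rows."""
    return len(set(values)) / len(values)


def uniqueness(values: Sequence) -> float:
    """Fraction of rows whose value occurs exactly once."""
    counts = Counter(values)
    return sum(1 for c in counts.values() if c == 1) / len(values)


def compliance(values: Sequence, predicate: Callable) -> float:
    """Fraction of rows for which the predicate holds."""
    return sum(1 for v in values if predicate(v)) / len(values)


# Hypothetical toy columns, named after the columns used in this notebook.
user_ids = ["u1", "u2", None, "u4"]
order_counts = [1, 1, 2, 3, 3, 3]
gppo = [5.0, -1.0, 0.0, 2.5]

print("Completeness(user_id):", completeness(user_ids))
print("Distinctness(lifetime_order_count):", distinctness(order_counts))
print("Uniqueness(lifetime_order_count):", uniqueness(order_counts))
print("Compliance(lifetime_gppo >= 0):", compliance(gppo, lambda x: x >= 0))
```

The difference between distinctness and uniqueness explains why a column with many repeated values, such as `lifetime_order_count`, scores close to zero on both while the two values still differ.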


## Data Profiling

[All functionalities available for profiling](https://github.com/awslabs/python-deequ/blob/48ed4420aa648a71ea34c75d39d0cc829f98abda/docs/profiles.md)

In [None]:
from pydeequ.profiles import *

result = ColumnProfilerRunner(spark) \
    .onData(df.select(test_cols)) \
    .run()

In [None]:
result.profiles.keys()

dict_keys(['user_id', 'lifetime_order_count', 'lifetime_gsv', 'lifetime_gppo'])

In [None]:
lifetimegsv_profile = result.profiles['lifetime_gsv']

print(f'Statistics of \'lifetime_gsv\':')
print('\t',f"minimum: {lifetimegsv_profile.minimum}")
print('\t',f"maximum: {lifetimegsv_profile.maximum}")
print('\t',f"mean: {lifetimegsv_profile.mean}")
print('\t',f"standard deviation: {lifetimegsv_profile.stdDev}")

Statistics of 'lifetime_gsv':
	 minimum: -2720.0
	 maximum: 1274423.0
	 mean: 3606.5985851929963
	 standard deviation: 9756.243642500594


In [None]:
result.profiles


{'user_id': <pydeequ.profiles.StandardColumnProfile at 0xffff20dc3160>,
 'lifetime_order_count': <pydeequ.profiles.NumericColumnProfile at 0xffff20dc16f0>,
 'lifetime_gsv': <pydeequ.profiles.NumericColumnProfile at 0xffff2096bcd0>,
 'lifetime_gppo': <pydeequ.profiles.NumericColumnProfile at 0xffff20968eb0>}
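
The output above shows two kinds of profile: numeric columns get summary statistics, while other columns get a value histogram. The following plain-Python sketch mimics that split on toy data. It is only an illustration, not PyDeequ's profiler: the function name, the returned dictionary layout, and the use of the population standard deviation are assumptions.

```python
import statistics
from collections import Counter
from typing import Any, Dict, List


def profile_column(values: List[Any]) -> Dict[str, Any]:
    """Return summary statistics for numeric data, otherwise a value histogram."""
    non_null = [v for v in values if v is not None]
    is_numeric = bool(non_null) and all(
        isinstance(v, (int, float)) and not isinstance(v, bool) for v in non_null
    )
    if is_numeric:
        return {
            "kind": "numeric",
            "minimum": min(non_null),
            "maximum": max(non_null),
            "mean": statistics.fmean(non_null),
            # Population standard deviation: an assumption made for this sketch.
            "stdDev": statistics.pstdev(non_null),
        }
    counts = Counter(non_null)
    return {
        "kind": "standard",
        # Ratio is taken over all rows, including nulls (also an assumption).
        "histogram": {
            v: {"count": c, "ratio": c / len(values)} for v, c in counts.items()
        },
    }


numeric = profile_column([2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0])
standard = profile_column(["a", "b", "a", "a"])
print(numeric)
print(standard)
```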

In [None]:
import json  # needed for json.dumps below

for col, profile in result.profiles.items():
    print(f'Column: {col}')
    
    if isinstance(profile, pydeequ.profiles.NumericColumnProfile):  
        d = {}
        d['minimum'] = profile.minimum
        d['maximum'] = profile.maximum
        d['mean'] = profile.mean
        d['standard_deviation'] = profile.stdDev
        d['distribution'] = {}
        d['distribution']['KLL'] = {}
        d['distribution']['KLL']['buckets'] = {}
        for b in range(len(profile.kll.buckets)): 
            d['distribution']['KLL']['buckets'][f'bucket_{b}'] = {
                'lowValue': profile.kll.buckets[b].lowValue,
                'highValue':profile.kll.buckets[b].highValue,
                'count': profile.kll.buckets[b].count
            }
        d['distribution']['KLL']['sketch'] = {
            'c': profile.kll.parameters[0],
            'k': profile.kll.parameters[1]
        }
        d['distribution']['KLL']['data'] = profile.kll.data

        print(json.dumps(d, indent=2))
 
    else: 
        for i in profile.histogram: 
            print(f"{i.value} occurred {i.count} times (ratio is: {i.ratio})")
        
    print('\n')


## Constraint Suggestion

Deequ's constraint suggestion functionality helps users find reasonable constraints for their data.

[All available constraint suggestions](https://github.com/awslabs/python-deequ/blob/48ed4420aa648a71ea34c75d39d0cc829f98abda/docs/suggestions.md)

In [None]:
from pydeequ.suggestions import *

suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(df.select(test_cols)) \
             .addConstraintRule(CompleteIfCompleteRule()) \
             .addConstraintRule(NonNegativeNumbersRule()) \
             .addConstraintRule(RetainTypeRule()) \
             .addConstraintRule(UniqueIfApproximatelyUniqueRule()) \
             .run()

In [None]:
for sugg in suggestionResult['constraint_suggestions']:
    print(f"Constraint suggestion for \'{sugg['column_name']}\': {sugg['description']}")
    print(f"The corresponding Python code is: {sugg['code_for_constraint']}\n")

Constraint suggestion for 'user_id': 'user_id' is not null
The corresponding Python code is: .isComplete("user_id")

Constraint suggestion for 'lifetime_order_count': 'lifetime_order_count' is not null
The corresponding Python code is: .isComplete("lifetime_order_count")

Constraint suggestion for 'lifetime_order_count': 'lifetime_order_count' has no negative values
The corresponding Python code is: .isNonNegative("lifetime_order_count")

Constraint suggestion for 'lifetime_gsv': 'lifetime_gsv' is not null
The corresponding Python code is: .isComplete("lifetime_gsv")

Constraint suggestion for 'lifetime_gppo': 'lifetime_gppo' is not null
The corresponding Python code is: .isComplete("lifetime_gppo")
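
The suggestions above follow from simple properties of each column: a column without nulls gets `.isComplete`, and a numeric column without negative values additionally gets `.isNonNegative`. The sketch below reproduces that reasoning in plain Python on hypothetical toy data. It is a simplification for illustration only and does not reflect the actual heuristics inside Deequ's rules; the function name is made up.

```python
from typing import Dict, List


def suggest_constraints(columns: Dict[str, List]) -> List[str]:
    """Derive constraint snippets from two simple column properties."""
    suggestions = []
    for name, values in columns.items():
        non_null = [v for v in values if v is not None]
        # No nulls observed: suggest a completeness constraint.
        if len(non_null) == len(values):
            suggestions.append(f'.isComplete("{name}")')
        # Numeric and no negative values observed: suggest non-negativity.
        numeric = bool(non_null) and all(
            isinstance(v, (int, float)) for v in non_null
        )
        if numeric and min(non_null) >= 0:
            suggestions.append(f'.isNonNegative("{name}")')
    return suggestions


# Hypothetical sample data; only the column names come from this notebook.
sample = {
    "user_id": ["u1", "u2", "u3"],
    "lifetime_order_count": [1, 4, 0],
    "lifetime_gsv": [120.5, -2720.0, 30.0],
}
suggestions = suggest_constraints(sample)
for s in suggestions:
    print(s)
```

As in the real output, the column containing a negative value (`lifetime_gsv`) receives no non-negativity suggestion.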



## Constraint Verification

[All available constraints](https://github.com/awslabs/python-deequ/blob/48ed4420aa648a71ea34c75d39d0cc829f98abda/docs/checks.md)

In [None]:
from pydeequ.checks import *
from pyspark.sql.types import DoubleType
from pydeequ.verification import *

check = Check(spark, CheckLevel.Error, "test_check")

checkResult = VerificationSuite(spark) \
    .onData(df.select(test_cols)) \
    .addCheck(
        check.hasSize(lambda x: x >= 3000000) \
        .hasMin("lifetime_gppo", lambda x: x < 0) \
        .hasMax("lifetime_gppo", lambda x: x > 0)  \
        .isComplete("user_id")  \
        .isUnique("user_id")  \
        .isUnique("lifetime_order_count")  \
        .isNonNegative("lifetime_gppo") \
        .isNonNegative("lifetime_gsv") \
        .isGreaterThan("lifetime_gsv", "lifetime_gppo") \
        .hasPattern(column='user_id', pattern=r"ba(r|z)", assertion=lambda x: x == 0/3)
    ) \
    .run()

print(f"Verification Run Status: {checkResult.status}")
# checkResultsAsJson is also available to get the results in JSON format
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
display(checkResult_df)

Verification Run Status: Error




check,check_level,check_status,constraint,constraint_status,constraint_message
test_check,Error,Error,SizeConstraint(Size(None)),Success,
test_check,Error,Error,"MinimumConstraint(Minimum(lifetime_gppo,None,None))",Success,
test_check,Error,Error,"MaximumConstraint(Maximum(lifetime_gppo,None,None))",Success,
test_check,Error,Error,"CompletenessConstraint(Completeness(user_id,None,None))",Success,
test_check,Error,Error,"UniquenessConstraint(Uniqueness(List(user_id),None,None))",Success,
test_check,Error,Error,"UniquenessConstraint(Uniqueness(List(lifetime_order_count),None,None))",Failure,Value: 5.232138463312293E-6 does not meet the constraint requirement!
test_check,Error,Error,"ComplianceConstraint(Compliance(lifetime_gppo is non-negative,COALESCE(CAST(lifetime_gppo AS DECIMAL(20,10)), 0.0) >= 0,None,List(lifetime_gppo),None))",Failure,Value: 0.60715486774642 does not meet the constraint requirement!
test_check,Error,Error,"ComplianceConstraint(Compliance(lifetime_gsv is non-negative,COALESCE(CAST(lifetime_gsv AS DECIMAL(20,10)), 0.0) >= 0,None,List(lifetime_gsv),None))",Failure,Value: 0.9999991359771345 does not meet the constraint requirement!
test_check,Error,Error,"ComplianceConstraint(Compliance(lifetime_gsv is greater than lifetime_gppo,lifetime_gsv > lifetime_gppo,None,List(lifetime_gsv, lifetime_gppo),None))",Failure,Value: 0.9965364683410982 does not meet the constraint requirement!
test_check,Error,Error,"PatternMatchConstraint(user_id, ba(r|z))",Success,
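
The table above can be read as follows: each constraint pairs a computed metric with an assertion, and the check as a whole fails as soon as one constraint fails. A minimal plain-Python sketch of that aggregation is shown here. The class and function names are hypothetical, the metric values are copied from the output above, and the sketch assumes a check at `Error` level only.

```python
from dataclasses import dataclass
from typing import Callable, List, Tuple


@dataclass
class Constraint:
    name: str
    metric_value: float
    assertion: Callable[[float], bool]


def evaluate_check(constraints: List[Constraint]) -> Tuple[str, List[Tuple[str, str]]]:
    """Evaluate every constraint; the check status is Error if any constraint fails."""
    results = [
        (c.name, "Success" if c.assertion(c.metric_value) else "Failure")
        for c in constraints
    ]
    status = "Success" if all(s == "Success" for _, s in results) else "Error"
    return status, results


constraints = [
    Constraint("Size", 20832782.0, lambda x: x >= 3000000),
    Constraint("Uniqueness(lifetime_order_count)", 5.232138463312293e-06, lambda x: x == 1.0),
    Constraint("Compliance(lifetime_gppo is non-negative)", 0.60715486774642, lambda x: x == 1.0),
]

status, results = evaluate_check(constraints)
print("Status:", status)
for name, outcome in results:
    print(f"{name}: {outcome}")
```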


## Metrics Repository

PyDeequ allows us to persist the metrics we computed on dataframes in a so-called MetricsRepository.

**Metrics Repository allows us to store the metrics in JSON format on the local disk (note that it also supports HDFS and S3).**

More about the metrics repository:
- [Metrics Repository](https://github.com/awslabs/python-deequ/blob/48ed4420aa648a71ea34c75d39d0cc829f98abda/tutorials/repository.ipynb)
- [Metrics Repository - DBFS](https://github.com/awslabs/python-deequ/blob/48ed4420aa648a71ea34c75d39d0cc829f98abda/tutorials/repository_file_dbfs.ipynb)

In [None]:
from pydeequ.repository import *

metrics_file = FileSystemMetricsRepository.helper_metrics_file(spark, 'metrics.json')
print(f'metrics_file path: {metrics_file}')
repository = FileSystemMetricsRepository(spark, metrics_file)

metrics_file path: /local_disk0/tmp/1733472396389-0/metrics.json


Each set of metrics that we compute needs to be indexed by a so-called ResultKey, which contains a timestamp and supports arbitrary tags in the form of key-value pairs.

In [None]:
key_tags = {'tag': 'gp-data-test'}
resultKey = ResultKey(spark, ResultKey.current_milli_time(), key_tags)



In [None]:
from pydeequ.analyzers import *

analysisResult = AnalysisRunner(spark) \
                    .onData(df.select(test_cols)) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Correlation("lifetime_gsv", "lifetime_gppo")) \
                    .addAnalyzer(Completeness("user_id")) \
                    .addAnalyzer(MinLength("user_id")) \
                    .addAnalyzer(MaxLength("user_id")) \
                    .addAnalyzer(Maximum("lifetime_gsv")) \
                    .addAnalyzer(Maximum("lifetime_gppo")) \
                    .addAnalyzer(Minimum("lifetime_gsv")) \
                    .addAnalyzer(Minimum("lifetime_gppo")) \
                    .addAnalyzer(Distinctness("lifetime_order_count")) \
                    .addAnalyzer(Distinctness("user_id")) \
                    .addAnalyzer(StandardDeviation("lifetime_gsv")) \
                    .addAnalyzer(Sum("lifetime_gsv")) \
                    .addAnalyzer(Compliance("top users", "lifetime_gppo >= 0")) \
                    .addAnalyzer(ApproxCountDistinct("user_id")) \
                    .useRepository(repository) \
                    .saveOrAppendResult(resultKey) \
                    .run()

analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.display()



entity,instance,name,value
Column,user_id,Distinctness,1.0
Column,user_id,ApproxCountDistinct,22857264.0
Column,top users,Compliance,0.60715486774642
Column,user_id,Completeness,1.0
Column,user_id,MinLength,36.0
Column,user_id,MaxLength,36.0
Column,lifetime_gsv,StandardDeviation,9756.243642500607
Column,lifetime_gsv,Sum,75135482086.8327
Dataset,*,Size,20832782.0
Column,lifetime_order_count,Distinctness,3.403290064668272e-05


**Load the above metrics from `metrics_file`**

We can load the metrics for all tags or only for specific tags, and we can also restrict the load by timestamp.

In [None]:
metrics = repository.load() \
                            .before(ResultKey.current_milli_time()) \
                            .withTagValues(key_tags) \
                            .getSuccessMetricsAsDataFrame()

metrics.display()

entity,instance,name,value,dataset_date,tag
Column,user_id,Distinctness,1.0,1733472446745,gp-data-test
Column,user_id,ApproxCountDistinct,22857264.0,1733472446745,gp-data-test
Column,top users,Compliance,0.60715486774642,1733472446745,gp-data-test
Column,user_id,Completeness,1.0,1733472446745,gp-data-test
Column,user_id,MinLength,36.0,1733472446745,gp-data-test
Column,user_id,MaxLength,36.0,1733472446745,gp-data-test
Column,lifetime_gsv,StandardDeviation,9756.243642500607,1733472446745,gp-data-test
Column,lifetime_gsv,Sum,75135482086.8327,1733472446745,gp-data-test
Dataset,*,Size,20832782.0,1733472446745,gp-data-test
Column,lifetime_order_count,Distinctness,3.403290064668272e-05,1733472446745,gp-data-test
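
The idea of a file-backed repository indexed by timestamp and tags can be sketched in a few lines of plain Python. This is a toy stand-in, not the format or behavior of `FileSystemMetricsRepository`: the function names, the JSON layout, the `other-run` tag, and the inclusive handling of the `before` bound are all assumptions, and the stored values are illustrative.

```python
import json
import os
import tempfile
from typing import Dict, List, Optional


def load_all(path: str) -> List[dict]:
    """Read every stored entry; a missing file means nothing was saved yet."""
    if not os.path.exists(path):
        return []
    with open(path) as f:
        return json.load(f)


def save_or_append(path: str, timestamp: int, tags: Dict[str, str], metrics: Dict[str, float]) -> None:
    """Append one run, keyed by timestamp and tags, to the JSON file."""
    entries = load_all(path)
    entries.append({"timestamp": timestamp, "tags": tags, "metrics": metrics})
    with open(path, "w") as f:
        json.dump(entries, f)


def load(path: str, before: Optional[int] = None, tag_values: Optional[Dict[str, str]] = None) -> List[dict]:
    """Return entries up to `before` (inclusive here, by assumption) that match all given tags."""
    selected = []
    for entry in load_all(path):
        if before is not None and entry["timestamp"] > before:
            continue
        if tag_values and any(entry["tags"].get(k) != v for k, v in tag_values.items()):
            continue
        selected.append(entry)
    return selected


path = os.path.join(tempfile.mkdtemp(), "metrics.json")
save_or_append(path, 1733388055998, {"tag": "gp-data-test"}, {"Size": 20832782.0})
save_or_append(path, 1733472446745, {"tag": "other-run"}, {"Size": 20935007.0})

matching = load(path, before=1733472446745, tag_values={"tag": "gp-data-test"})
print(matching)
```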


## Anomaly Detection

Anomaly detection checks how much a metric of the current data has changed relative to earlier runs, for example yesterday's data, and flags changes outside the expected range. The available functionality is listed in [anomaly_detection](https://github.com/awslabs/python-deequ/blob/48ed4420aa648a71ea34c75d39d0cc829f98abda/docs/anomaly_detection.md).

In [None]:
from pydeequ.anomaly_detection import *

In [None]:
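# table1 and table2 are placeholders for the tables holding the two daily snapshots;
# pointing both at table_name (defined in the DataLoader section) is an assumption
table1 = table2 = table_name

prev_df = spark.sql(f"SELECT * from {table1} where current_date = '2024-12-01'")

curr_df = spark.sql(f"SELECT * from {table2} where current_date = '2024-12-02'")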

prev_df.count(), curr_df.count()

(20832782, 20935007)

In [None]:
(20935007 - 20832782)/20832782

0.004906929856991735

In [None]:
from pydeequ.repository import *
from pydeequ.verification import *
metricsRepository = InMemoryMetricsRepository(spark)
metricsRepository

<pydeequ.repository.InMemoryMetricsRepository at 0xffff085df070>

In [None]:
prevKey = ResultKey(spark, ResultKey.current_milli_time() - 24 * 60 * 60 * 1000)

# maxRateIncrease -> upper bound on the ratio new value / old value of the Size metric (2.0 -> the size may at most double)
prev_Result = VerificationSuite(spark).onData(prev_df) \
    .useRepository(metricsRepository) \
    .saveOrAppendResult(prevKey) \
    .addAnomalyCheck(RelativeRateOfChangeStrategy(maxRateIncrease=0.0003), Size()) \
    .run()

In [None]:
currKey = ResultKey(spark, ResultKey.current_milli_time())

curr_Result = VerificationSuite(spark).onData(curr_df) \
    .useRepository(metricsRepository) \
    .saveOrAppendResult(currKey) \
    .addAnomalyCheck(RelativeRateOfChangeStrategy( maxRateIncrease=1.0), Size()) \
    .run()

In [None]:
if curr_Result.status != "Success":
    print("Anomaly detected in the Size() metric!")
    metricsRepository.load().forAnalyzers([Size()]).getSuccessMetricsAsDataFrame().show()

Anomaly detected in the Size() metric!
+-------+--------+----+-----------+-------------+
| entity|instance|name|      value| dataset_date|
+-------+--------+----+-----------+-------------+
|Dataset|       *|Size|2.0935007E7|1733474457310|
|Dataset|       *|Size|2.0832782E7|1733388055998|
+-------+--------+----+-----------+-------------+
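
Why is an anomaly reported for a growth of only about 0.49%? As far as the Deequ documentation describes it, the relative rate of change strategy bounds the ratio of the new value to the old value, so `maxRateIncrease=1.0` tolerates no increase at all. The plain-Python sketch below applies that rule to the two sizes from this notebook. The function name is hypothetical and the code is an illustration of the rule, not PyDeequ's implementation.

```python
from typing import Optional


def is_anomalous(
    previous: float,
    current: float,
    max_rate_increase: Optional[float] = None,
    max_rate_decrease: Optional[float] = None,
) -> bool:
    """Flag the current value if the ratio current / previous leaves the allowed bounds."""
    ratio = current / previous
    if max_rate_increase is not None and ratio > max_rate_increase:
        return True
    if max_rate_decrease is not None and ratio < max_rate_decrease:
        return True
    return False


prev_size, curr_size = 20832782, 20935007
ratio = curr_size / prev_size

print("ratio:", ratio)
print("maxRateIncrease=1.0:", is_anomalous(prev_size, curr_size, max_rate_increase=1.0))
print("maxRateIncrease=2.0:", is_anomalous(prev_size, curr_size, max_rate_increase=2.0))
```

Under this reading, a bound such as `1.01` would be the natural choice to allow daily growth of up to 1%.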



In [None]:
curr_Result.status



**More details and definitions of the Deequ methods can be found in [Automating large-scale data quality verification](https://www.vldb.org/pvldb/vol11/p1781-schelter.pdf)**