# Test data quality at scale with PyDeequ

Authors: Calvin Wang (calviwan@), Chris Ghyzel (cghyzel@), Joan Aoanan (jaoanan@), Veronika Megler (meglerv@) 

You generally write unit tests for your code, but do you also test your data? Incoming data quality can make or break your machine learning application. Incorrect, missing or malformed data can have a large impact on production systems. Examples of data quality issues are:

* Missing values can lead to failures in production systems that require non-null values (NullPointerException).
* Changes in the distribution of data can lead to unexpected outputs of machine learning models.
* Aggregations of incorrect data can lead to wrong business decisions.

In this post, we introduce PyDeequ, an open source Python wrapper over [Deequ](https://aws.amazon.com/blogs/big-data/test-data-quality-at-scale-with-deequ/) (an open source tool developed and used at Amazon). While Deequ is written in Scala, PyDeequ allows you to use its data quality and testing capabilities from Python and PySpark, the language of choice of many data scientists. PyDeequ democratizes and extends the power of Deequ by allowing you to use it alongside the many data science libraries that are available in Python. Furthermore, PyDeequ can return results as [Pandas](https://pandas.pydata.org/) DataFrames, rather than restricting you to Spark DataFrames.

Deequ allows you to calculate data quality metrics on your dataset, define and verify data quality constraints, and be informed about changes in the data distribution. Instead of implementing checks and verification algorithms on your own, you can focus on describing how your data should look. Deequ can also suggest checks for you. Deequ is implemented on top of [Apache Spark](https://spark.apache.org/) and is designed to scale with large datasets (think billions of rows) that typically live in a distributed filesystem or a data warehouse. PyDeequ gives you access to this capability and lets you use it from the familiar environment of your Python Jupyter notebook.

## Deequ at Amazon 

Deequ is being used internally at Amazon for verifying the quality of many large production datasets. Dataset producers can add and edit data quality constraints. The system computes data quality metrics on a regular basis (with every new version of a dataset), verifies constraints defined by dataset producers, and publishes datasets to consumers in case of success. In error cases, dataset publication can be stopped, and producers are notified to take action. Data quality issues do not propagate to consumer data pipelines, reducing their blast radius. 

Deequ is also used within [Amazon SageMaker Model Monitor](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor.html#model-monitor-how-it-works). Now with the availability of PyDeequ, it is finding its way into a broader set of environments, including SageMaker Notebooks, AWS Glue, and more.

## Overview of PyDeequ

Let’s look at PyDeequ’s main components, and how they relate to Deequ (shown in Figure 1). 

* Metrics Computation — Deequ computes data quality metrics, that is, statistics such as completeness, maximum, or correlation. Deequ uses Spark to read from sources such as Amazon S3, and to compute metrics through an optimized set of aggregation queries. You have direct access to the raw metrics computed on the data.
* Constraint Verification — As a user, you focus on defining a set of data quality constraints to be verified. Deequ takes care of deriving the required set of metrics to be computed on the data. Deequ generates a data quality report, which contains the result of the constraint verification.
* Constraint Suggestion — You can choose to define your own custom data quality constraints, or use the automated constraint suggestion methods that profile the data to infer useful constraints.
* Python wrappers — You can call each of the Deequ functions using Python syntax. The wrappers translate the commands to the underlying Deequ calls, and return their response.
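To make the split between metrics computation and constraint verification concrete, here is a minimal plain-Python sketch. It is not Deequ's implementation (Deequ runs these as Spark aggregation queries); the `METRICS` table, the `verify` helper, and the sample rows are hypothetical and exist only for illustration. The point it shows is that constraints only name the metric they need, and the distinct set of required metrics is derived and computed once.

```python
# Toy model of "derive the required metrics, then verify constraints".
# All names here are hypothetical; none of this is Deequ or PyDeequ code.

rows = [
    {"id": 1, "name": "a"},
    {"id": 2, "name": None},
    {"id": 3, "name": "c"},
    {"id": 4, "name": "d"},
]

# Hypothetical metric implementations, keyed by metric name.
METRICS = {
    "Size": lambda data, column: float(len(data)),
    "Completeness": lambda data, column: (
        sum(1 for row in data if row.get(column) is not None) / len(data)
    ),
}

# Each constraint names the metric it needs plus an assertion on its value.
constraints = [
    ("Size", None, lambda v: v >= 3),
    ("Completeness", "id", lambda v: v == 1.0),
    ("Completeness", "name", lambda v: v == 1.0),
    ("Completeness", "name", lambda v: v >= 0.5),
]


def verify(data, constraints):
    """Compute every required metric once, then evaluate each constraint."""
    # Two constraints on the same metric share a single computation.
    required = {(metric, column) for metric, column, _ in constraints}
    computed = {key: METRICS[key[0]](data, key[1]) for key in required}

    report = []
    for metric, column, assertion in constraints:
        value = computed[(metric, column)]
        status = "Success" if assertion(value) else "Failure"
        report.append((metric, column, value, status))
    return computed, report


computed, report = verify(rows, constraints)
for metric, column, value, status in report:
    print(f"{metric}({column}): {value} -> {status}")
```

Four constraints are declared here, but only three metrics are computed, because both `name` constraints reuse the same completeness value.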



## Example 

As a running example, we use NYC TLC Trip Record Data on Amazon S3. We begin the way many data science projects do: with initial data exploration and assessment in a Jupyter notebook. 

During the data exploration phase, you’d like to easily answer some basic questions about the data: 

* Are the fields that are supposed to contain unique values, really unique? Are there fields that are missing values? 
* How many distinct categories are there in the categorical fields?
* Are there correlations between some key features?
* If there are two supposedly similar datasets (different categories, or different time periods, say), are they really similar?

Then, we’ll show you how to scale this approach to large-scale datasets, using the same code on an EMR cluster. This is likely how you would run your ML training, and later your production workloads.

### Setup: Start a PySpark Session in a SageMaker Notebook

In [35]:
%%bash 

# install PyDeequ via pip 
pip install pydeequ 




In [36]:
%%bash 

pip install sagemaker-pyspark pyspark




In [44]:
# Create a directory for Java installation
!mkdir -p /root/java

# Download the Java JDK
!wget -P /root/java https://download.java.net/openjdk/jdk11/ri/openjdk-11+28_linux-x64_bin.tar.gz

# Extract the downloaded JDK
!tar --no-same-owner -xvzf /root/java/openjdk-11+28_linux-x64_bin.tar.gz -C /root/java/



In [45]:
import os

# Set JAVA_HOME to the extracted JDK directory
java_home = "/root/java/jdk-11"
if not os.path.exists(java_home):
    raise RuntimeError(f"Java home directory does not exist: {java_home}")

os.environ["JAVA_HOME"] = java_home
os.environ["PATH"] = os.path.join(java_home, "bin") + ":" + os.environ["PATH"]

# Verify that java is in the PATH
java_path = os.popen('which java').read().strip()
print(f"Java path: {java_path}")

# Check if the Java command works
# (java -version writes to stderr, so redirect it to stdout to capture it)
java_version_output = os.popen('java -version 2>&1').read()
print(f"Java version output: {java_version_output}")

# Set the SPARK_VERSION environment variable
os.environ["SPARK_VERSION"] = "3.3.0"


Java path: /root/java/jdk-11/bin/java
Java version output: 


openjdk version "11" 2018-09-25
OpenJDK Runtime Environment 18.9 (build 11+28)
OpenJDK 64-Bit Server VM 18.9 (build 11+28, mixed mode)


In [48]:
import sagemaker_pyspark
from pyspark.sql import SparkSession, Row, DataFrame
import json
import pandas as pd

import pydeequ

classpath = ":".join(sagemaker_pyspark.classpath_jars())

spark = (SparkSession
    .builder
    .config("spark.driver.extraClassPath", classpath)
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())


### We will be using the NYC TLC Trip Record Dataset

In [78]:
s3_url = "s3a://nyc-tlc/trip data/yellow_tripdata_2022-01.parquet"

# Read the Parquet file into a Spark DataFrame
df = spark.read.parquet(s3_url)

# Print the schema of the DataFrame
df.printSchema()

# Show the first few rows of the DataFrame
# df.show(5)

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



## Data Analysis 

Before we define checks on the data, we want to calculate some statistics on the dataset; we call them metrics. 

In [79]:
from pydeequ.analyzers import *

# Run the analysis with PyDeequ
analysisResult = AnalysisRunner(spark) \
                    .onData(df) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("VendorID")) \
                    .addAnalyzer(ApproxCountDistinct("VendorID")) \
                    .addAnalyzer(Mean("trip_distance")) \
                    .addAnalyzer(Compliance("long trips", "trip_distance >= 10.0")) \
                    .addAnalyzer(Correlation("fare_amount", "trip_distance")) \
                    .addAnalyzer(Correlation("total_amount", "fare_amount")) \
                    .run()
                    
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()



+-----------+--------------------+-------------------+--------------------+
|     entity|            instance|               name|               value|
+-----------+--------------------+-------------------+--------------------+
|     Column|          long trips|         Compliance| 0.06526116194000563|
|     Column|       trip_distance|               Mean|    5.37275119311366|
|    Dataset|                   *|               Size|           2463931.0|
|     Column|            VendorID|       Completeness|                 1.0|
|Multicolumn|total_amount,fare...|        Correlation|  0.9998747962656196|
|Multicolumn|fare_amount,trip_...|        Correlation|4.057810518341101...|
|     Column|            VendorID|ApproxCountDistinct|                 4.0|
+-----------+--------------------+-------------------+--------------------+





### You can also get that result in a Pandas DataFrame!

Passing `pandas=True` to any call that returns metrics as a DataFrame gives you a Pandas DataFrame instead of a Spark DataFrame. We use this option again later in the post.

In [81]:
analysisResult_pd_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult, pandas=True)
analysisResult_pd_df



| | entity | instance | name | value |
|---|---|---|---|---|
| 0 | Column | long trips | Compliance | 0.06526116 |
| 1 | Column | trip_distance | Mean | 5.372751 |
| 2 | Dataset | * | Size | 2463931.0 |
| 3 | Column | VendorID | Completeness | 1.0 |
| 4 | Multicolumn | total_amount,fare_amount | Correlation | 0.9998748 |
| 5 | Multicolumn | fare_amount,trip_distance | Correlation | 0.0004057811 |
| 6 | Column | VendorID | ApproxCountDistinct | 4.0 |


From the data metrics, we learned the following:

- **Compliance of Long Trips**: Only 6.53% of trips are classified as long trips.
- **Mean Trip Distance**: The average trip distance is approximately 5.37 miles.
- **Dataset Size**: The dataset contains 2,463,931 records.
- **VendorID Completeness**: The `VendorID` column is 100% complete, with no missing values.
- **Correlation between Total Amount and Fare Amount**: There is a very high correlation (0.9999) between `total_amount` and `fare_amount`, indicating that these two variables are almost perfectly correlated.
- **Correlation between Fare Amount and Trip Distance**: There is a very low correlation (0.0004) between `fare_amount` and `trip_distance`, indicating little to no linear relationship between these variables.
- **Approximate Count of Distinct VendorIDs**: There are approximately 4 distinct values in the `VendorID` column.
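For intuition about what these metrics measure, the following sketch recomputes four of them in plain Python on a handful of made-up rows. The sample values and the helper functions (`completeness`, `compliance`, `mean`, `pearson`) are assumptions for illustration only; Deequ computes the same kinds of quantities with Spark aggregations, and the numbers on the real dataset are the ones reported above.

```python
import math

# Made-up sample rows for illustration (not taken from the NYC TLC dataset).
trips = [
    {"VendorID": 1, "trip_distance": 2.0, "fare_amount": 8.0},
    {"VendorID": 2, "trip_distance": 12.0, "fare_amount": 38.0},
    {"VendorID": 2, "trip_distance": 4.0, "fare_amount": 14.0},
    {"VendorID": None, "trip_distance": 6.0, "fare_amount": 20.0},
]


def completeness(rows, column):
    """Fraction of rows with a non-null value in `column`."""
    return sum(1 for r in rows if r[column] is not None) / len(rows)


def compliance(rows, predicate):
    """Fraction of rows for which `predicate` holds."""
    return sum(1 for r in rows if predicate(r)) / len(rows)


def mean(rows, column):
    """Arithmetic mean of `column`."""
    return sum(r[column] for r in rows) / len(rows)


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


metrics = {
    "Completeness(VendorID)": completeness(trips, "VendorID"),
    "Compliance(long trips)": compliance(trips, lambda r: r["trip_distance"] >= 10.0),
    "Mean(trip_distance)": mean(trips, "trip_distance"),
    "Correlation(fare_amount, trip_distance)": pearson(
        [r["fare_amount"] for r in trips],
        [r["trip_distance"] for r in trips],
    ),
}

for name, value in metrics.items():
    print(f"{name}: {value:.4f}")
```

In this toy sample the fare is an exact linear function of the distance, so the correlation is 1. A correlation near zero, as in the real data, means only that there is little linear relationship between the two columns as recorded.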



## Define and Run Tests for Data

After analyzing and understanding the data, we want to verify that the properties we have derived also hold for new versions of the dataset. By defining assertions on the data distribution as part of a data pipeline, we can ensure that every processed dataset is of high quality, and that any application consuming the data can rely on it.

For writing tests on data, we start with the [VerificationSuite](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/VerificationSuite.scala) and add [Checks](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/checks/Check.scala) on attributes of the data.

In this example, we test for the following properties of our data:

* There are at least 2 million rows in total.
* `VendorID` is never NULL.
* `VendorID` is unique.
* `VendorID` only contains the values "2", "1", "6", or "5".
* `payment_type` is never NULL.
* `payment_type` only contains the values "1" or "2" in at least 96% of the rows.
* `DOLocationID` is never NULL and does not contain negative values.
* `improvement_surcharge` is never NULL.
* `tpep_dropoff_datetime` is never NULL.
* `PULocationID` is never NULL and does not contain negative values.
* `trip_distance` is never NULL and does not contain negative values.
* `tolls_amount` is never NULL.

This is the code that reflects the previous statements. For information about all available checks, see [this GitHub repository](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/checks/Check.scala). You can run this directly in the Spark session created during setup:


In [90]:
from pydeequ.checks import *
from pydeequ.verification import *

# Define the check
check = Check(spark, CheckLevel.Warning, "NYC TLC Trip Record Data")

# Run the verification suite with the defined checks
checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x >= 2000000) \
        .isComplete("VendorID") \
        .isUnique("VendorID") \
        .isContainedIn("VendorID", ["2", "1", "6", "5"]) \
        .isComplete("payment_type") \
        .isContainedIn("payment_type", ["1", "2"], lambda x: x >= 0.96, "It should be above 0.96!") \
        .isComplete("DOLocationID") \
        .isNonNegative("DOLocationID") \
        .isComplete("improvement_surcharge") \
        .isComplete("tpep_dropoff_datetime") \
        .isComplete("PULocationID") \
        .isNonNegative("PULocationID") \
        .isComplete("trip_distance") \
        .isNonNegative("trip_distance") \
        .isComplete("tolls_amount")) \
    .run()


# Print the verification run status
print(f"Verification Run Status: {checkResult.status}")

# Convert the check results to a DataFrame and display it
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult, pandas=True)
print(checkResult_df)




                       check check_level check_status  \

                                           constraint constraint_status  \
0                          SizeConstraint(Size(None))           Success   
1   CompletenessConstraint(Completeness(VendorID,N...           Success   
2   UniquenessConstraint(Uniqueness(List(VendorID)...           Failure   
3   ComplianceConstraint(Compliance(VendorID conta...           Success   
4   CompletenessConstraint(Completeness(payment_ty...           Success   
5   ComplianceConstraint(Compliance(payment_type c...           Success   
6   CompletenessConstraint(Completeness(DOLocation...           Success   
7   ComplianceConstraint(Compliance(DOLocationID i...           Success   
8   CompletenessConstraint(Completeness(improvemen...           Success   
9   CompletenessConstraint(Completeness(tpep_dropo...           Success   
10  CompletenessConstraint(Completeness(PULocation...           Success   
11  ComplianceConstraint(Compliance(PULoca



After executing the verification suite, PyDeequ translated the test descriptions into a series of Spark jobs to compute metrics on the NYC TLC Trip Record Data. The verification run status was "Warning," indicating some checks did not fully pass.

Here's a concise summary of the results:

- **Size Check**: Passed, with the data having at least 2,000,000 records.
- **Completeness Checks**: All columns (`VendorID`, `payment_type`, `DOLocationID`, `improvement_surcharge`, `tpep_dropoff_datetime`, `PULocationID`, `trip_distance`, `tolls_amount`) passed their completeness checks.
- **Uniqueness Check**: Failed for `VendorID`, indicating the values are not unique.
- **Value Containment Check**: Passed for both `VendorID` and `payment_type` (with the latter requiring at least 96% of values within ["1", "2"]).
- **Non-Negativity Checks**: Passed for `DOLocationID`, `PULocationID`, and `trip_distance`.

The failed uniqueness check on `VendorID` is expected: the earlier analysis found only about 4 distinct `VendorID` values across roughly 2.46 million rows, so the column cannot serve as a unique key. All other constraints were met.
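The way a single failed constraint turns into an overall "Warning" status can be sketched as follows. This is a simplified plain-Python model, not PyDeequ code: the helpers `check_status` and `overall_status` are hypothetical, and the rule they encode (a check with any failed constraint takes the status of its level, and the run reports the most severe check status) is our reading of the behavior observed above.

```python
# Simplified model of status roll-up (hypothetical helpers, not PyDeequ API).

SEVERITY = {"Success": 0, "Warning": 1, "Error": 2}


def check_status(level, constraint_statuses):
    """A check succeeds only if every constraint succeeds; otherwise it
    reports its own level ("Warning" or "Error")."""
    if all(status == "Success" for status in constraint_statuses):
        return "Success"
    return level


def overall_status(check_statuses):
    """The run reports the most severe status among its checks."""
    return max(check_statuses, key=SEVERITY.get, default="Success")


# Mirrors the run above: one check at Warning level with one failed
# constraint (uniqueness of VendorID) among otherwise successful ones.
constraint_statuses = ["Success", "Success", "Failure", "Success"]
status = check_status("Warning", constraint_statuses)
run_status = overall_status([status])
print(run_status)
```

Had the same check been declared with an error level instead, this model would report "Error", which is the kind of signal a pipeline could use to stop publishing a dataset. On the pandas DataFrame returned by the verification step, a filter such as `checkResult_df[checkResult_df["constraint_status"] != "Success"]` should list only the failing constraints, assuming the column names shown in the output.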


In [88]:
checkResult_df = VerificationResult.successMetricsAsDataFrame(spark, checkResult, pandas=True)
checkResult_df



| | entity | instance | name | value |
|---|---|---|---|---|
| 0 | Column | improvement_surcharge | Completeness | 1.0 |
| 1 | Column | PULocationID is non-negative | Compliance | 1.0 |
| 2 | Column | tpep_dropoff_datetime | Completeness | 1.0 |
| 3 | Column | payment_type | Completeness | 1.0 |
| 4 | Column | DOLocationID | Completeness | 1.0 |
| 5 | Column | tolls_amount | Completeness | 1.0 |
| 6 | Column | trip_distance is non-negative | Compliance | 1.0 |
| 7 | Column | VendorID contained in 2,1,6,5 | Compliance | 1.0 |
| 8 | Column | trip_distance | Completeness | 1.0 |
| 9 | Dataset | * | Size | 2463931.0 |


## Automated Constraint Suggestion 

If you own a large number of datasets or if your dataset has many columns, it may be challenging for you to manually define appropriate constraints. Deequ can automatically suggest useful constraints based on the data distribution. Deequ first runs a data profiling method and then applies a set of rules on the result. For more information about how to run a data profiling method, see [this GitHub repository](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/examples/data_profiling_example.md).

In [74]:
from pydeequ.suggestions import *

suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(df) \
             .addConstraintRule(DEFAULT()) \
             .run()

# Constraint Suggestions in JSON format
print(json.dumps(suggestionResult, indent=2))

                                                                                

{
  "constraint_suggestions": [
    {
      "constraint_name": "CompletenessConstraint(Completeness(DOLocationID,None,None))",
      "column_name": "DOLocationID",
      "current_value": "Completeness: 1.0",
      "description": "'DOLocationID' is not null",
      "suggesting_rule": "CompleteIfCompleteRule()",
      "rule_description": "If a column is complete in the sample, we suggest a NOT NULL constraint",
      "code_for_constraint": ".isComplete(\"DOLocationID\")"
    },
    {
      "constraint_name": "ComplianceConstraint(Compliance('DOLocationID' has no negative values,DOLocationID >= 0,None,List(DOLocationID),None))",
      "column_name": "DOLocationID",
      "current_value": "Minimum: 1.0",
      "description": "'DOLocationID' has no negative values",
      "suggesting_rule": "NonNegativeNumbersRule()",
      "rule_description": "If we see only non-negative numbers in a column, we suggest a corresponding constraint",
      "code_for_constraint": ".isNonNegative(\"DOLocationID

The above result contains a list of suggested constraints, each with a description and the Python code for it, so that you can apply them directly in your data quality checks.
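As a rough sketch of how you might collect the suggested snippets, the following plain-Python code walks a dictionary shaped like the JSON output above and joins the `code_for_constraint` strings into one chained expression. The first sample entry is copied from the output; the second entry, the helper names, and the `check` variable name are hypothetical. The result is only a string for you to review and paste, not something to execute blindly.

```python
# A small sample shaped like the suggestion result printed above.
suggestion_result = {
    "constraint_suggestions": [
        {
            "column_name": "DOLocationID",
            "description": "'DOLocationID' is not null",
            "code_for_constraint": '.isComplete("DOLocationID")',
        },
        {
            # Hypothetical entry, added only to show more than one column.
            "column_name": "VendorID",
            "description": "'VendorID' is not null",
            "code_for_constraint": '.isComplete("VendorID")',
        },
    ]
}


def suggested_code_by_column(result):
    """Group the suggested code snippets by the column they apply to."""
    grouped = {}
    for suggestion in result["constraint_suggestions"]:
        grouped.setdefault(suggestion["column_name"], []).append(
            suggestion["code_for_constraint"]
        )
    return grouped


def render_check_chain(result, check_var="check"):
    """Join all snippets into one chained expression, as source text."""
    snippets = [s["code_for_constraint"] for s in result["constraint_suggestions"]]
    return check_var + "".join(snippets)


by_column = suggested_code_by_column(suggestion_result)
chain = render_check_chain(suggestion_result)
print(chain)
```

Reviewing the grouped snippets per column before adopting them is worthwhile, because a suggestion reflects only the data that was profiled.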

## More Examples on GitHub

You can find examples of more advanced features at [Deequ’s GitHub page](https://github.com/awslabs/deequ):

* Deequ provides more than data quality checks with fixed thresholds. Learn how to use [anomaly detection on data quality metrics](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/examples/anomaly_detection_example.md) to apply tests on metrics that change over time.
* Deequ offers support for storing and loading metrics. Learn how to use the [MetricsRepository](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/examples/metrics_repository_example.md) for this use case.
* If your dataset grows over time or is partitioned, you can use Deequ’s [incremental metrics computation](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/examples/algebraic_states_example.md) capability. For each partition, Deequ stores a state for each computed metric. To compute metrics for the union of partitions, Deequ can use these states to efficiently derive overall metrics without reloading the data.
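The idea behind incremental computation can be illustrated with a small plain-Python sketch. It assumes a metric whose state can be merged, here a mean kept as a running sum and count; the `MeanState` class, the `mean_state` helper, and the partition values are hypothetical and are not Deequ's actual state classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MeanState:
    """Mergeable state for a mean: keep the sum and the count, not the rows."""
    total: float
    count: int

    def merge(self, other):
        # States from two partitions combine by simple addition.
        return MeanState(self.total + other.total, self.count + other.count)

    def metric(self):
        return self.total / self.count if self.count else float("nan")


def mean_state(values):
    """Compute the state for a single partition."""
    return MeanState(sum(values), len(values))


# Made-up partitions, for example one per month of data.
partition_jan = [2.0, 4.0, 6.0]
partition_feb = [10.0, 8.0]

state_jan = mean_state(partition_jan)
state_feb = mean_state(partition_feb)

# The overall mean comes from the stored states alone, without
# reading the January rows again.
combined = state_jan.merge(state_feb)
print(combined.metric())
```

Merging the two states gives the same mean as recomputing over all five values, which is why only the small per-partition states need to be kept when a new partition arrives.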

## Additional Resources

Learn more about the inner workings of Deequ in the VLDB 2018 paper “[Automating large-scale data quality verification](http://www.vldb.org/pvldb/vol11/p1781-schelter.pdf).”

## Conclusion

This blog post showed you how to use PyDeequ for calculating data quality metrics, verifying data quality constraints, and profiling data to automate the configuration of data quality checks. PyDeequ is available for you now to build your own data quality management pipeline.
