# DEEQU:
Deequ is a library built on top of Apache Spark. It lets you define "unit tests for data" so that you can measure the quality of large datasets.

To use Deequ from a Python environment, there is a Python library called PyDeequ.

Deequ is made specifically for data quality testing and tracking.
Because it is built on top of Spark, it integrates with Spark directly.
It ships with a wide variety of built-in functions for data quality testing.
Deequ also provides a profiler and constraint suggestions that can propose tests for your dataset automatically.

## The six dimensions of data quality are:

### Accuracy:  
The accuracy of data is the degree to which the data represent a real-world event or object.

### Consistency:
Consistency is the absence of difference when comparing two or more representations of something against a reference. When data are recorded or captured in multiple places, consistency becomes very important: the same data point must not be recorded in different ways.
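As an illustration of comparing two representations of the same fact, the sketch below checks hypothetical retail records in which a store identifier embeds a state code that is also stored in a separate field. The field names `store_id` and `state_id`, the `STATE_NUMBER` format, and the sample rows are assumptions made for this example.

```python
def is_consistent(record):
    """True when the state encoded in store_id matches state_id."""
    # Assumption: store_id looks like "<STATE>_<NUMBER>", e.g. "CA_1".
    store_state = record["store_id"].split("_")[0]
    return store_state == record["state_id"]

# Hypothetical records; the last one is deliberately inconsistent.
records = [
    {"store_id": "CA_1", "state_id": "CA"},
    {"store_id": "WI_3", "state_id": "WI"},
    {"store_id": "TX_2", "state_id": "CA"},
]

inconsistent = [r for r in records if not is_consistent(r)]
```

Here `inconsistent` holds only the third record, because its two representations of the state disagree.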

### Timeliness:
Timeliness is the degree to which data represent reality as of the required point in time. It requires the data in your dataset to be sufficiently up to date. A useful question to ask is: how long is the delay between an event happening and the data point being recorded?
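One way to put a number on that delay is to subtract the event time from the recording time and compare the result with an allowed maximum. This is a minimal sketch in plain Python; the timestamps and the 24-hour threshold are hypothetical.

```python
from datetime import datetime, timedelta

def recording_delay(event_time, recorded_time):
    """Return how long after the event the data point was recorded."""
    return recorded_time - event_time

def is_timely(event_time, recorded_time, max_delay):
    """True when the record arrived within the allowed delay."""
    return recording_delay(event_time, recorded_time) <= max_delay

# Hypothetical example: an event at 01:00 that was recorded at 03:30.
event = datetime(2011, 1, 29, 1, 0)
recorded = datetime(2011, 1, 29, 3, 30)

delay = recording_delay(event, recorded)
timely = is_timely(event, recorded, max_delay=timedelta(hours=24))
```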

### Validity:
Validity is assessed per field: data is valid if it conforms to the syntax (format, type, range) of its definition. Each field has a property that makes it valid, such as an "@" symbol in an email address.
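The format and range rules mentioned above can be sketched as small predicate functions. The email pattern is deliberately loose and only an illustrative assumption, not a complete email validator; the month rule assumes calendar months numbered 1 to 12.

```python
import re

# Loose illustrative pattern: something@something.tld, with no spaces.
EMAIL_PATTERN = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

def is_valid_email(value):
    """Format rule: the value is a string that looks like an email address."""
    return isinstance(value, str) and EMAIL_PATTERN.match(value) is not None

def is_valid_month(value):
    """Type and range rule: the value is an integer between 1 and 12."""
    # bool is a subclass of int in Python, so it is excluded explicitly.
    if isinstance(value, bool) or not isinstance(value, int):
        return False
    return 1 <= value <= 12
```

For example, `is_valid_email("no-at-symbol.com")` fails the format rule, and `is_valid_month(13)` fails the range rule.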

### Completeness:
The completeness of data relates to how many values may be missing in your dataset.
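Completeness is commonly expressed as the fraction of values that are present. The sketch below computes that ratio for a plain Python list, treating `None` as missing; the sample values and the handling of an empty column are assumptions made for the example.

```python
def completeness(values):
    """Fraction of entries that are not None (1.0 means nothing is missing)."""
    if not values:
        return 1.0  # Assumption: an empty column counts as complete.
    present = sum(1 for v in values if v is not None)
    return present / len(values)

# Hypothetical column with one missing value out of four.
sell_prices = [0.46, None, 1.0, 2.5]
ratio = completeness(sell_prices)
```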

### Uniqueness:
Uniqueness means that a real-world object or event is represented only once in a particular dataset. In other words, nothing is recorded more than once, based on how that thing is identified. Uniqueness is the inverse of the level of duplication.

In [1]:
import os
import pyspark
print(pyspark.__file__)
path = os.path.abspath(pyspark.__file__)
print(path)

C:\Users\user\anaconda3\envs\deequ\lib\site-packages\pyspark\__init__.py
C:\Users\user\anaconda3\envs\deequ\lib\site-packages\pyspark\__init__.py


In [2]:
import findspark
findspark.init()
findspark.find()

'C:\\Spark\\spark-3.0.0-bin-hadoop2.7'

In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pydeequ
from pydeequ.analyzers import *
from pydeequ.profiles import *
from pydeequ.suggestions import *
from pydeequ.checks import *
from pydeequ.verification import *

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, DoubleType, IntegerType, DateType, NumericType, StructType, StringType, StructField

In [4]:
#memory = '6g'
#pyspark_submit_args = ' --driver-memory ' + memory + ' pyspark-shell'
#os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

In [4]:
spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

In [5]:
spark

In [6]:
model = spark.read.parquet('model_parquet.parquet')
#validation_df = spark.read.parquet('validation_parquet.parquet')
#evaluation_df = spark.read.parquet('evaluation_parquet.parquet')
#cal_df = spark.read.parquet('calendar_parquet.parquet')
#sellprice_df = spark.read.parquet('sellprice_parquet.parquet')

In [7]:
model.show(5)

+--------------------+-------------+---------+-------+--------+--------+---+------+-------------------+--------+--------+----+-----+----+------------+------------+------------+------------+-------+-------+-------+----------+---------+-----------------+
|                  id|      item_id|  dept_id| cat_id|store_id|state_id|  d|demand|               date|wm_yr_wk| weekday|wday|month|year|event_name_1|event_type_1|event_name_2|event_type_2|snap_CA|snap_TX|snap_WI|sell_price|     cost|__index_level_0__|
+--------------------+-------------+---------+-------+--------+--------+---+------+-------------------+--------+--------+----+-----+----+------------+------------+------------+------------+-------+-------+-------+----------+---------+-----------------+
|HOBBIES_1_008_CA_...|HOBBIES_1_008|HOBBIES_1|HOBBIES|    CA_1|      CA|  1|    12|2011-01-29 01:00:00|   11101|Saturday|   1|    1|2011|     NoEvent|     NoEvent|     NoEvent|     NoEvent|  false|  false|  false|0.45996094|5.5195312|       

In [8]:
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import desc

model = model.withColumn('index', monotonically_increasing_id())
model.orderBy(desc('index')).drop('index').show(5)

+--------------------+-----------+-------+------+--------+--------+----+------+-------------------+--------+-------+----+-----+----+------------+------------+------------+------------+-------+-------+-------+----------+---------+-----------------+
|                  id|    item_id|dept_id|cat_id|store_id|state_id|   d|demand|               date|wm_yr_wk|weekday|wday|month|year|event_name_1|event_type_1|event_name_2|event_type_2|snap_CA|snap_TX|snap_WI|sell_price|     cost|__index_level_0__|
+--------------------+-----------+-------+------+--------+--------+----+------+-------------------+--------+-------+----+-----+----+------------+------------+------------+------------+-------+-------+-------+----------+---------+-----------------+
|FOODS_3_827_WI_3_...|FOODS_3_827|FOODS_3| FOODS|    WI_3|      WI|1941|     1|2016-05-22 01:00:00|   11617| Sunday|   2|    5|2016|     NoEvent|     NoEvent|     NoEvent|     NoEvent|  false|  false|  false|       1.0|      1.0|         59181089|
|FOODS_3

## Data quality test: checking for completeness (model)

In [9]:
from pydeequ.verification import *
check = Check(spark, CheckLevel.Warning, "Null value Check")

checkResult = VerificationSuite(spark) \
    .onData(model) \
    .addCheck(
        check.isComplete("id") \
        .isComplete("item_id") \
        .isComplete("dept_id") \
        .isComplete("cat_id") \
        .isComplete("store_id") \
        .isComplete("state_id") \
        .isComplete("d") \
        .isComplete("demand") \
        .isComplete("date") \
        .isComplete("wm_yr_wk") \
        .isComplete("weekday") \
        .isComplete("wday") \
        .isComplete("month") \
        .isComplete("year") \
        .isComplete("event_name_1") \
        .isComplete("event_type_1") \
        .isComplete("event_name_2") \
        .isComplete("event_type_2") \
        .isComplete("snap_CA") \
        .isComplete("snap_TX") \
        .isComplete("snap_WI") \
        .isComplete("sell_price") \
        .isComplete("cost")) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

+----------------+-----------+------------+-------------------------------------------------------+-----------------+------------------+
|check           |check_level|check_status|constraint                                             |constraint_status|constraint_message|
+----------------+-----------+------------+-------------------------------------------------------+-----------------+------------------+
+----------------+-----------+------------+-------------------------------------------------------+-----------------+------------------+
only showing top 20 rows



## Checking whether value ranges are valid (minimum and maximum values)

In [10]:
from pydeequ.profiles import *

result = ColumnProfilerRunner(spark) \
    .onData(model.select('d','demand','wm_yr_wk','wday','month','year','sell_price','cost')) \
    .run()
for col, profile in result.profiles.items():
    print(f'Statistics of \'{col}\':')
    print('\t', f"Minimum value for {col} column is "+ str(profile.minimum))
    print('\t', f"Maximum value for {col} column is "+ str(profile.maximum))

Statistics of 'wday':
	 Minimum value for wday column is 1.0
	 Maximum value for wday column is 7.0
Statistics of 'year':
	 Minimum value for year column is 2011.0
	 Maximum value for year column is 2016.0
Statistics of 'cost':
	 Minimum value for cost column is 0.0
	 Maximum value for cost column is 2164.21875
Statistics of 'wm_yr_wk':
	 Minimum value for wm_yr_wk column is 11101.0
	 Maximum value for wm_yr_wk column is 11617.0
Statistics of 'sell_price':
	 Minimum value for sell_price column is 0.01000213623046875
	 Maximum value for sell_price column is 107.3125
Statistics of 'demand':
	 Minimum value for demand column is 0.0
	 Maximum value for demand column is 763.0
Statistics of 'month':
	 Minimum value for month column is 1.0
	 Maximum value for month column is 12.0
Statistics of 'd':
	 Minimum value for d column is 1.0
	 Maximum value for d column is 1941.0
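Once the minimum and maximum of each column are known, they can be compared with the bounds you expect. The sketch below does this in plain Python for four of the columns; the observed pairs are copied from the profiler output above, while the expected bounds are assumptions about what is valid for this dataset.

```python
# Observed (minimum, maximum) pairs, copied from the profiler output above.
observed = {
    "wday": (1.0, 7.0),
    "month": (1.0, 12.0),
    "year": (2011.0, 2016.0),
    "d": (1.0, 1941.0),
}

# Expected bounds. These are assumptions made for the example.
expected = {
    "wday": (1, 7),
    "month": (1, 12),
    "year": (2011, 2016),
    "d": (1, 1941),
}

def out_of_range(observed, expected):
    """Return the columns whose observed min or max leaves the expected bounds."""
    problems = []
    for column, (low, high) in expected.items():
        obs_min, obs_max = observed[column]
        if obs_min < low or obs_max > high:
            problems.append(column)
    return problems

violations = out_of_range(observed, expected)
```

With these bounds `violations` is empty, so every profiled range falls inside what the example expects.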


## Check for uniqueness, i.e. duplication (model)

In [11]:
# Check that each (date, id) combination is unique.
from pydeequ.verification import *
check = Check(spark, CheckLevel.Warning, "Duplication test Check")
checkResult = VerificationSuite(spark).onData(model).addCheck(check\
    .hasUniqueness(['date', 'id'], lambda x: x == 1)).run()
                                                           
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

Python Callback server started!
+----------------------+-----------+------------+------------------------------------------------------+-----------------+------------------+
|check                 |check_level|check_status|constraint                                            |constraint_status|constraint_message|
+----------------------+-----------+------------+------------------------------------------------------+-----------------+------------------+
+----------------------+-----------+------------+------------------------------------------------------+-----------------+------------------+
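To make the `x == 1` assertion above concrete, the sketch below computes a uniqueness ratio over a composite key in plain Python: the fraction of rows whose key combination occurs exactly once. This mirrors the idea behind the check as I understand it and is not Deequ's implementation; the rows are hypothetical.

```python
from collections import Counter

def uniqueness(rows, key_columns):
    """Fraction of rows whose key combination occurs exactly once."""
    if not rows:
        return 1.0  # Assumption: an empty dataset has no duplicates.
    keys = [tuple(row[c] for c in key_columns) for row in rows]
    counts = Counter(keys)
    unique_rows = sum(1 for k in keys if counts[k] == 1)
    return unique_rows / len(rows)

# Hypothetical rows: every (date, id) pair is distinct.
rows = [
    {"date": "2011-01-29", "id": "A"},
    {"date": "2011-01-29", "id": "B"},
    {"date": "2011-01-30", "id": "A"},
]
all_unique = uniqueness(rows, ["date", "id"])

# Repeating one row creates a duplicated (date, id) pair.
with_duplicate = uniqueness(rows + [rows[0]], ["date", "id"])
```

A ratio of exactly 1 therefore means that no `(date, id)` pair is repeated, which is what the assertion requires.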



## Checking for zero values (model)

In [12]:
# satisfies() measures the fraction of rows that match the SQL predicate.
# Asserting that this fraction equals 0 means no row may contain a zero.
# demand and cost are left out: their profiled minimum is 0.0.
check = Check(spark, CheckLevel.Warning, "Zero value Check")
result = VerificationSuite(spark)\
    .onData(model)\
    .addCheck(check
        .satisfies("d == 0", "Zero value check", lambda x: x == 0)\
        .satisfies("wm_yr_wk == 0", "Zero value check", lambda x: x == 0)\
        .satisfies("wday == 0", "Zero value check", lambda x: x == 0)\
        .satisfies("month == 0", "Zero value check", lambda x: x == 0)\
        .satisfies("year == 0", "Zero value check", lambda x: x == 0)\
        .satisfies("sell_price == 0", "Zero value check", lambda x: x == 0)\
    )\
    .run()

result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
result_df.show(truncate=False)

+----------------+-----------+------------+-----------------------------------------------------------------------+-----------------+------------------+
|check           |check_level|check_status|constraint                                                             |constraint_status|constraint_message|
+----------------+-----------+------------+-----------------------------------------------------------------------+-----------------+------------------+
+----------------+-----------+------------+-----------------------------------------------------------------------+-----------------+------------------+
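The zero-value check relies on a compliance ratio: the fraction of rows for which a predicate holds, followed by an assertion on that fraction. The plain Python sketch below reproduces the idea on hypothetical rows; it is not the library's implementation, and raising an error for an empty dataset is an assumption of the example.

```python
def compliance(rows, predicate):
    """Fraction of rows for which the predicate holds."""
    if not rows:
        # Assumption: an empty dataset is treated as an error in this sketch.
        raise ValueError("cannot compute compliance of an empty dataset")
    matching = sum(1 for row in rows if predicate(row))
    return matching / len(rows)

# Hypothetical rows; one of the three has a zero sell_price.
rows = [
    {"sell_price": 0.46},
    {"sell_price": 1.0},
    {"sell_price": 0.0},
]

zero_fraction = compliance(rows, lambda r: r["sell_price"] == 0)

# Same assertion as in the check above: the fraction of zeros must be 0.
no_zeros = zero_fraction == 0
```

Because one row in three has a zero price, `zero_fraction` is one third and the assertion fails, which is how a zero value would surface in the check.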



## Checking for negative values (model)

In [13]:
checkResult = VerificationSuite(spark) \
                    .onData(model) \
                    .addCheck(
                        Check(spark, CheckLevel.Warning, "Negative Values")\
                            .isNonNegative('d')\
                            .isNonNegative('demand')\
                            .isNonNegative('wm_yr_wk')\
                            .isNonNegative('wday')\
                            .isNonNegative('month')\
                            .isNonNegative('year')\
                            .isNonNegative('sell_price')\
                            .isNonNegative('cost')\
                            )\
                            .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

+---------------+-----------+------------+------------------------------------------------------------------------------------------------------------------------+-----------------+------------------+
|check          |check_level|check_status|constraint                                                                                                              |constraint_status|constraint_message|
+---------------+-----------+------------+------------------------------------------------------------------------------------------------------------------------+-----------------+------------------+
+---------------+-----------+------------+------------------------------------------------------------------------------------------------------------------------+-----------------+------------------+



In [14]:
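# Shut down the Py4J callback server started by PyDeequ before stopping Spark,
# so that no process is left hanging.
spark.sparkContext._gateway.shutdown_callback_server()
spark.stop()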