# PyDeequ

+ Original Post: <https://aws.amazon.com/pt/blogs/big-data/test-data-quality-at-scale-with-deequ/>
+ Data Set: <s3://amazon-reviews-pds/parquet/product_category=Electronics/>

In [2]:
import pandas as pd
import json

df = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Electronics/")
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)



In [2]:
df.limit(10).toPandas()

Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date,year
0,US,51163966,R2RX7KLOQQ5VBG,B00000JBAT,738692522,Diamond Rio Digital Player,3,0,0,N,N,Why just 30 minutes?,"RIO is really great, but Diamond should increa...",1999-06-22,1999
1,US,30050581,RPHMRNCGZF2HN,B001BRPLZU,197287809,NG 283220 AC Adapter Power Supply for HP Pavil...,5,0,0,N,Y,Five Stars,Great quality for the price!!!!,2014-11-17,2014
2,US,52246039,R3PD79H9CTER8U,B00000JBAT,738692522,Diamond Rio Digital Player,5,1,2,N,N,The digital audio &quot;killer app&quot;,One of several first-generation portable MP3 p...,1999-06-30,1999
3,US,16186332,R3U6UVNH7HGDMS,B009CY43DK,856142222,HDE Mini Portable Capsule Travel Mobile Pocket...,5,0,0,N,Y,Five Stars,"I like it, got some for the Grandchilren",2014-11-17,2014
4,US,53068431,R3SP31LN235GV3,B00000JBSN,670078724,JVC FS-7000 Executive MicroSystem (Discontinue...,3,5,5,N,N,Design flaws ruined the better functions,I returned mine for a couple of reasons: The ...,1999-07-13,1999
5,US,23604361,R1IYAZPPTRJF7E,B005LQ83EI,503838146,BlueRigger High Speed HDMI Cable with Ethernet...,3,0,0,N,Y,Never got around to returning the 1 out of 2 ...,Never got around to returning the 1 out of 2 t...,2014-11-17,2014
6,US,52036200,RDYOBX7A6YZ2X,B00000JHYS,738692522,Rio PMP 300 Special-Edition MP3 Player,4,0,0,N,N,Might want to wait.,"I really like the Rio. doesn't skip, light we...",1999-07-16,1999
7,US,12029527,R3RDD9FILG1LSN,B00CVB12RG,587294791,Brookstone 2.4GHz Wireless TV Headphones,5,3,3,N,Y,"Saved my. marriage, I swear to god.","Saved my.marriage, I swear to god.",2014-11-17,2014
8,US,51618361,R31I45LN3W8UUM,B00000J1SI,505031276,GPX C3860 Portable CD Player with 22-Track Pro...,5,8,9,N,N,LOW PRICE GOOD VALUE,"Very Good Product, good enough if you're tight...",1999-07-17,1999
9,US,12246316,R2RVEE19EN94YK,B00JAYS824,995064901,ABLEGRID Trademarked Car Adapter For Cradlepoi...,5,0,0,N,Y,Five Stars,thanks to this i have my own personal mobile c...,2014-11-17,2014


## Data analysis

In [4]:
from pydeequ.analyzers import AnalyzerContext
from pydeequ.analyzers import AnalysisRunner

**ApproxCountDistinct**: Approximate number of distinct values, computed with HyperLogLogPlusPlus sketches.

In [5]:
from pydeequ.analyzers import ApproxCountDistinct
AnalyzerContext.successMetricsAsDataFrame(spark,AnalysisRunner(spark)\
                                          .onData(df)\
                                          .addAnalyzer(ApproxCountDistinct("customer_id"))\
                                          .addAnalyzer(ApproxCountDistinct("product_id"))\
                                          .addAnalyzer(ApproxCountDistinct("review_id"))\
                                          .run())\
.show(truncate=False)

+------+-----------+-------------------+--------+
|entity|instance   |name               |value   |
+------+-----------+-------------------+--------+
|Column|customer_id|ApproxCountDistinct|284020.0|
|Column|product_id |ApproxCountDistinct|58461.0 |
|Column|review_id  |ApproxCountDistinct|303650.0|
+------+-----------+-------------------+--------+



**ApproxQuantile**: Approximate quantile of a distribution.

In [5]:
from pydeequ.analyzers import ApproxQuantile
AnalyzerContext.successMetricsAsDataFrame(spark,AnalysisRunner(spark)\
                                          .onData(df)\
                                          .addAnalyzer(ApproxQuantile("star_rating", quantile = 0.5))\
                                          .run())\
.show(truncate=False)

+------+-----------+------------------+-----+
|entity|instance   |name              |value|
+------+-----------+------------------+-----+
|Column|star_rating|ApproxQuantile-0.5|5.0  |
+------+-----------+------------------+-----+



**ApproxQuantiles**: Approximate quantiles of a distribution.

In [6]:
from pydeequ.analyzers import ApproxQuantiles
AnalyzerContext.successMetricsAsDataFrame(spark,AnalysisRunner(spark)\
                                          .onData(df)\
                                          .addAnalyzer(ApproxQuantiles("star_rating", quantiles = [0.1, 0.5, 0.9]))\
                                          .run())\
.show(truncate=False)

+------+-----------+-------------------+-----+
|entity|instance   |name               |value|
+------+-----------+-------------------+-----+
|Column|star_rating|ApproxQuantiles-0.1|1.0  |
|Column|star_rating|ApproxQuantiles-0.5|5.0  |
|Column|star_rating|ApproxQuantiles-0.9|5.0  |
+------+-----------+-------------------+-----+



**Completeness**: Fraction of non-null values in a column.

In [7]:
from pydeequ.analyzers import Completeness
AnalyzerContext.successMetricsAsDataFrame(spark,AnalysisRunner(spark)\
                                          .onData(df)\
                                          .addAnalyzer(Completeness("review_id"))\
                                          .run())\
.show(truncate=False)

+------+---------+------------+-----+
|entity|instance |name        |value|
+------+---------+------------+-----+
|Column|review_id|Completeness|1.0  |
+------+---------+------------+-----+



**Compliance**: Fraction of rows that comply with the given column constraint.

In [8]:
from pydeequ.analyzers import Compliance
AnalyzerContext.successMetricsAsDataFrame(spark,AnalysisRunner(spark)\
                                          .onData(df)\
                                          .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0"))\
                                          .run())\
.show(truncate=False)

+------+---------------+----------+------------------+
|entity|instance       |name      |value             |
+------+---------------+----------+------------------+
|Column|top star_rating|Compliance|0.7494070692849394|
+------+---------------+----------+------------------+
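
The Compliance definition above can be reproduced by hand on a small list. The following is a minimal pure-Python sketch, not PyDeequ code: the helper name `compliance`, the toy ratings, and the choice to count null values as non-compliant are all assumptions made for illustration.

```python
from typing import Callable, Iterable, Optional


def compliance(values: Iterable[Optional[float]],
               predicate: Callable[[float], bool]) -> float:
    """Fraction of rows whose value satisfies the predicate.

    Assumption of this sketch: None (null) rows count as non-compliant.
    """
    rows = list(values)
    if not rows:
        raise ValueError("compliance is undefined for an empty column")
    hits = sum(1 for v in rows if v is not None and predicate(v))
    return hits / len(rows)


# Hypothetical toy column, not taken from the reviews dataset.
star_rating = [5, 4, 3, 5, 1, 4, 2, 5]

# Same predicate as the "top star_rating" analyzer: star_rating >= 4.0
top_rating_share = compliance(star_rating, lambda r: r >= 4.0)
print(top_rating_share)  # 5 of the 8 toy rows qualify
```

Read this way, the value of about 0.749 reported above says that roughly three out of four reviews have a rating of 4 or 5.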



**Correlation**: Pearson correlation coefficient, which measures the linear correlation between two columns. The result is in the range [-1, 1], where 1 means perfect positive linear correlation, -1 means perfect negative linear correlation, and 0 means no linear correlation.

In [9]:
from pydeequ.analyzers import Correlation
AnalyzerContext.successMetricsAsDataFrame(spark,AnalysisRunner(spark)\
                                          .onData(df)\
                                          .addAnalyzer(Correlation("total_votes", "star_rating"))\
                                          .run())\
.show(truncate=False)

+-----------+-----------------------+-----------+--------------------+
|entity     |instance               |name       |value               |
+-----------+-----------------------+-----------+--------------------+
|Mutlicolumn|total_votes,star_rating|Correlation|-0.03451097996538765|
+-----------+-----------------------+-----------+--------------------+
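
To make the range [-1, 1] concrete, here is a minimal pure-Python sketch of the Pearson coefficient. It illustrates the formula only; the function name `pearson` and the toy inputs are assumptions, and this is not how Deequ computes the metric on Spark.

```python
import math
from typing import Sequence


def pearson(xs: Sequence[float], ys: Sequence[float]) -> float:
    """Pearson correlation coefficient of two equally long sequences."""
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need two sequences of equal length >= 2")
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of co-deviations and sums of squared deviations.
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        raise ValueError("correlation is undefined for a constant column")
    return cov / math.sqrt(var_x * var_y)


# Hypothetical toy data covering the three reference cases.
rising = pearson([1, 2, 3, 4], [2, 4, 6, 8])      # perfectly increasing
falling = pearson([1, 2, 3, 4], [8, 6, 4, 2])     # perfectly decreasing
unrelated = pearson([1, 2, 3, 4], [1, -1, -1, 1])  # no linear trend
print(rising, falling, unrelated)
```

A value such as -0.0345 for `total_votes` and `star_rating` is therefore very close to the "no linear correlation" case.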



**CountDistinct**: Number of distinct values.

In [10]:
from pydeequ.analyzers import CountDistinct
AnalyzerContext.successMetricsAsDataFrame(spark,AnalysisRunner(spark)\
                                          .onData(df)\
                                          .addAnalyzer(CountDistinct("review_id"))\
                                          .run())\
.show(truncate=False)

+------+---------+-------------+---------+
|entity|instance |name         |value    |
+------+---------+-------------+---------+
|Column|review_id|CountDistinct|3109479.0|
+------+---------+-------------+---------+



**DataType**: Distribution of data types such as Boolean, Fractional, Integral, and String. The resulting histogram allows filtering by relative or absolute fractions.	

In [11]:
from pydeequ.analyzers import DataType
AnalyzerContext.successMetricsAsDataFrame(spark,AnalysisRunner(spark)\
                                          .onData(df)\
                                          .addAnalyzer(DataType("year"))\
                                          .run())\
.show(truncate=False)

+------+--------+--------------------------+---------+
|entity|instance|name                      |value    |
+------+--------+--------------------------+---------+
|Column|year    |Histogram.bins            |5.0      |
|Column|year    |Histogram.abs.Boolean     |0.0      |
|Column|year    |Histogram.ratio.Boolean   |0.0      |
|Column|year    |Histogram.abs.Fractional  |0.0      |
|Column|year    |Histogram.ratio.Fractional|0.0      |
|Column|year    |Histogram.abs.Integral    |3120938.0|
|Column|year    |Histogram.ratio.Integral  |1.0      |
|Column|year    |Histogram.abs.Unknown     |0.0      |
|Column|year    |Histogram.ratio.Unknown   |0.0      |
|Column|year    |Histogram.abs.String      |0.0      |
|Column|year    |Histogram.ratio.String    |0.0      |
+------+--------+--------------------------+---------+



**Distinctness**: Fraction of distinct values of a column over the number of all values of a column. Distinct values occur at least once. Example: [a, a, b] contains two distinct values a and b, so distinctness is 2/3.

In [12]:
from pydeequ.analyzers import Distinctness
AnalyzerContext.successMetricsAsDataFrame(spark,AnalysisRunner(spark)\
                                          .onData(df)\
                                          .addAnalyzer(Distinctness("review_id"))\
                                          .run())\
.show(truncate=False)

+------+---------+------------+------------------+
|entity|instance |name        |value             |
+------+---------+------------+------------------+
|Column|review_id|Distinctness|0.9963283474391352|
+------+---------+------------+------------------+
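
As a sanity check, the Distinctness value can be recomputed from two other metrics in this notebook: the CountDistinct result for `review_id` and the row count of the DataFrame (3120938, the figure reported by the Size analyzer). The sketch below is plain Python arithmetic; the helper name `distinctness` is an assumption for illustration.

```python
from typing import Hashable, Sequence


def distinctness(values: Sequence[Hashable]) -> float:
    """Number of distinct values divided by the number of values."""
    if not values:
        raise ValueError("distinctness is undefined for an empty column")
    return len(set(values)) / len(values)


# The example from the definition: [a, a, b] -> 2/3
toy = distinctness(["a", "a", "b"])

# Figures copied from the analyzer outputs in this notebook.
count_distinct_review_id = 3109479.0
row_count = 3120938.0
recomputed = count_distinct_review_id / row_count

print(toy, recomputed)
```

The recomputed ratio agrees with the 0.9963283474391352 shown in the output above.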



**Entropy**: Entropy is a measure of the level of information contained in an event (value in a column) when considering all possible events (values in a column). It is measured in nats (natural units of information). Entropy is estimated using observed value counts as the negative sum of (value_count/total_count) * log(value_count/total_count). Example: [a, b, b, c, c] has three distinct values with counts [1, 2, 2]. Entropy is then (-1/5*log(1/5)-2/5*log(2/5)-2/5*log(2/5)) = 1.055.

In [13]:
from pydeequ.analyzers import Entropy
AnalyzerContext.successMetricsAsDataFrame(spark,AnalysisRunner(spark)\
                                          .onData(df)\
                                          .addAnalyzer(Entropy("star_rating"))\
                                          .run())\
.show(truncate=False)

+------+-----------+-------+------------------+
|entity|instance   |name   |value             |
+------+-----------+-------+------------------+
|Column|star_rating|Entropy|1.2339328231223723|
+------+-----------+-------+------------------+
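
The worked example in the Entropy definition can be verified with a few lines of plain Python. This is a sketch of the formula using the natural logarithm (nats), not the PyDeequ implementation; the function name `entropy_nats` is an assumption.

```python
import math
from collections import Counter
from typing import Hashable, Iterable


def entropy_nats(values: Iterable[Hashable]) -> float:
    """Entropy in nats, estimated from observed value counts."""
    counts = Counter(values)
    total = sum(counts.values())
    # Negative sum of p * ln(p) over the observed values.
    return -sum((c / total) * math.log(c / total) for c in counts.values())


# The example from the definition: counts are [1, 2, 2].
example = entropy_nats(["a", "b", "b", "c", "c"])
print(round(example, 3))  # 1.055
```

For comparison, five equally frequent values would give ln(5), about 1.609, so the 1.234 reported for `star_rating` reflects a distribution that is skewed toward a few ratings.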



**Maximum**: Maximum value.

In [14]:
from pydeequ.analyzers import Maximum
AnalyzerContext.successMetricsAsDataFrame(spark,AnalysisRunner(spark)\
                                          .onData(df)\
                                          .addAnalyzer(Maximum("star_rating"))\
                                          .run())\
.show(truncate=False)

+------+-----------+-------+-----+
|entity|instance   |name   |value|
+------+-----------+-------+-----+
|Column|star_rating|Maximum|5.0  |
+------+-----------+-------+-----+



**Mean**: Mean value; null values are excluded.

In [15]:
from pydeequ.analyzers import Mean
AnalyzerContext.successMetricsAsDataFrame(spark,AnalysisRunner(spark)\
                                          .onData(df)\
                                          .addAnalyzer(Mean("star_rating"))\
                                          .run())\
.show(truncate=False)

+------+-----------+----+-----------------+
|entity|instance   |name|value            |
+------+-----------+----+-----------------+
|Column|star_rating|Mean|4.036143941340712|
+------+-----------+----+-----------------+



**Minimum**: Minimum value.

In [16]:
from pydeequ.analyzers import Minimum
AnalyzerContext.successMetricsAsDataFrame(spark,AnalysisRunner(spark)\
                                          .onData(df)\
                                          .addAnalyzer(Minimum("star_rating"))\
                                          .run())\
.show(truncate=False)

+------+-----------+-------+-----+
|entity|instance   |name   |value|
+------+-----------+-------+-----+
|Column|star_rating|Minimum|1.0  |
+------+-----------+-------+-----+



**MutualInformation**: Mutual information describes how much information about one column (one random variable) can be inferred from another column (another random variable). If the two columns are independent, mutual information is zero. If one column is a function of the other column, mutual information is the entropy of the column. Mutual information is symmetric and nonnegative.

In [17]:
from pydeequ.analyzers import MutualInformation
AnalyzerContext.successMetricsAsDataFrame(spark,AnalysisRunner(spark)\
                                          .onData(df)\
                                          .addAnalyzer(MutualInformation(["total_votes", "star_rating"]))\
                                          .run())\
.show(truncate=False)

+-----------+-----------------------+-----------------+--------------------+
|entity     |instance               |name             |value               |
+-----------+-----------------------+-----------------+--------------------+
|Mutlicolumn|total_votes,star_rating|MutualInformation|0.023555225905757905|
+-----------+-----------------------+-----------------+--------------------+
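
The two boundary cases named in the MutualInformation definition (independent columns, and one column being a function of the other) can be checked on toy data. The following pure-Python sketch estimates mutual information in nats from observed counts; the function names and inputs are assumptions for illustration, not PyDeequ internals.

```python
import math
from collections import Counter
from typing import Hashable, Sequence


def entropy_nats(values: Sequence[Hashable]) -> float:
    """Entropy in nats, estimated from observed value counts."""
    counts = Counter(values)
    total = sum(counts.values())
    return -sum((c / total) * math.log(c / total) for c in counts.values())


def mutual_information_nats(xs: Sequence[Hashable],
                            ys: Sequence[Hashable]) -> float:
    """Sum over observed pairs of p(x, y) * ln(p(x, y) / (p(x) * p(y)))."""
    if len(xs) != len(ys) or not xs:
        raise ValueError("need two non-empty sequences of equal length")
    n = len(xs)
    joint = Counter(zip(xs, ys))
    count_x = Counter(xs)
    count_y = Counter(ys)
    return sum(
        (c / n) * math.log(c * n / (count_x[x] * count_y[y]))
        for (x, y), c in joint.items()
    )


# Independent toy columns: every (x, y) combination is equally frequent.
independent = mutual_information_nats([0, 0, 1, 1], [0, 1, 0, 1])

# ys is a function of xs (parity), so the result equals the entropy of ys.
xs = [1, 2, 3, 4]
ys = [x % 2 for x in xs]
dependent = mutual_information_nats(xs, ys)

print(independent, dependent, entropy_nats(ys))
```

Swapping the arguments gives the same result, which is the symmetry mentioned in the definition.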



**PatternMatch**: Fraction of rows that comply with a given regular expression.

In [18]:
from pydeequ.analyzers import PatternMatch
AnalyzerContext.successMetricsAsDataFrame(spark,AnalysisRunner(spark)\
                                          .onData(df)\
                                          .addAnalyzer(PatternMatch("marketplace", pattern_regex=r"\w{2}"))\
                                          .run())\
.show(truncate=False)

+------+-----------+------------+-----+
|entity|instance   |name        |value|
+------+-----------+------------+-----+
|Column|marketplace|PatternMatch|0.0  |
+------+-----------+------------+-----+



**Size**: Number of rows in a DataFrame.

In [19]:
from pydeequ.analyzers import Size
AnalyzerContext.successMetricsAsDataFrame(spark,AnalysisRunner(spark)\
                                          .onData(df)\
                                          .addAnalyzer(Size())\
                                          .run())\
.show(truncate=False)

+-------+--------+----+---------+
|entity |instance|name|value    |
+-------+--------+----+---------+
|Dataset|*       |Size|3120938.0|
+-------+--------+----+---------+



**Sum**: Sum of all values of a column.

In [20]:
from pydeequ.analyzers import Sum
AnalyzerContext.successMetricsAsDataFrame(spark,AnalysisRunner(spark)\
                                          .onData(df)\
                                          .addAnalyzer(Sum("total_votes"))\
                                          .run())\
.show(truncate=False)

+------+-----------+----+---------+
|entity|instance   |name|value    |
+------+-----------+----+---------+
|Column|total_votes|Sum |7427283.0|
+------+-----------+----+---------+



**UniqueValueRatio**: Fraction of unique values over the number of all distinct values of a column. Unique values occur exactly once; distinct values occur at least once. Example: [a, a, b] contains one unique value b, and two distinct values a and b, so the unique value ratio is 1/2.

In [21]:
from pydeequ.analyzers import UniqueValueRatio
AnalyzerContext.successMetricsAsDataFrame(spark,AnalysisRunner(spark)\
                                          .onData(df)\
                                          .addAnalyzer(UniqueValueRatio(["star_rating"]))\
                                          .run())\
.show(truncate=False)

+------+-----------+----------------+-----+
|entity|instance   |name            |value|
+------+-----------+----------------+-----+
|Column|star_rating|UniqueValueRatio|0.0  |
+------+-----------+----------------+-----+



**Uniqueness**: Fraction of unique values over the number of all values of a column. Unique values occur exactly once. Example: [a, a, b] contains one unique value b, so uniqueness is 1/3.

In [22]:
from pydeequ.analyzers import Uniqueness
AnalyzerContext.successMetricsAsDataFrame(spark,AnalysisRunner(spark)\
                                          .onData(df)\
                                          .addAnalyzer(Uniqueness(["star_rating"]))\
                                          .run())\
.show(truncate=False)

+------+-----------+----------+-----+
|entity|instance   |name      |value|
+------+-----------+----------+-----+
|Column|star_rating|Uniqueness|0.0  |
+------+-----------+----------+-----+
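
Distinctness, Uniqueness, and UniqueValueRatio are easy to confuse, so the sketch below computes all three side by side on the `[a, a, b]` example used in the definitions. It is plain Python for illustration; the function name `value_ratios` and the toy ratings are assumptions.

```python
from collections import Counter
from typing import Dict, Hashable, Sequence


def value_ratios(values: Sequence[Hashable]) -> Dict[str, float]:
    """Compute the three ratio metrics from observed value counts."""
    if not values:
        raise ValueError("ratios are undefined for an empty column")
    counts = Counter(values)
    total = len(values)
    distinct = len(counts)                               # occur at least once
    unique = sum(1 for c in counts.values() if c == 1)   # occur exactly once
    return {
        "distinctness": distinct / total,
        "uniqueness": unique / total,
        "unique_value_ratio": unique / distinct,
    }


abc = value_ratios(["a", "a", "b"])
print(abc)  # distinctness 2/3, uniqueness 1/3, unique value ratio 1/2

# Hypothetical ratings column in which every value repeats.
ratings = value_ratios([5, 4, 5, 1, 4, 1])
print(ratings["uniqueness"], ratings["unique_value_ratio"])
```

The second call shows why both metrics are 0.0 for `star_rating` above: a low-cardinality column in which every value repeats has no values that occur exactly once.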



## Data Check
To write tests on data, we start with a VerificationSuite and add Checks on attributes of the data.
> Every Scala check is available in Python.

In [7]:
from pydeequ.checks import Check
from pydeequ.checks import CheckLevel
from pydeequ.verification import VerificationSuite
from pydeequ.verification import VerificationResult

In [24]:
check = Check(spark, CheckLevel.Error, "Review Check")

checkResult = VerificationSuite(spark).onData(df).addCheck(check\
                                                           .hasSize(lambda x: x >= 300000)
                                                           .hasMin("star_rating", lambda x: x == 1.0)
                                                           .hasMax("star_rating", lambda x: x == 5.0)
                                                           .isComplete("review_id")
                                                           .isUnique("review_id")
                                                           .isComplete("marketplace")
                                                           .isContainedIn("marketplace", ["US", "UK", "DE", "JP", "FR"])
                                                           .isNonNegative("year") 
                                                          ).run()
    
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.toPandas()

INFO:Callback Server Starting
INFO:Socket listening on ('127.0.0.1', 25334)


Python Callback server started!


INFO:Callback Connection ready to receive messages
INFO:Received command c on object id p0
INFO:Received command c on object id p1
INFO:Received command c on object id p2


| | check | check_level | check_status | constraint | constraint_status | constraint_message |
|---|---|---|---|---|---|---|
| 0 | Review Check | Error | Error | SizeConstraint(Size(None)) | Success | |
| 1 | Review Check | Error | Error | MinimumConstraint(Minimum(star_rating,None)) | Success | |
| 2 | Review Check | Error | Error | MaximumConstraint(Maximum(star_rating,None)) | Success | |
| 3 | Review Check | Error | Error | CompletenessConstraint(Completeness(review_id,... | Success | |
| 4 | Review Check | Error | Error | UniquenessConstraint(Uniqueness(List(review_id... | Failure | Value: 0.9926566948782706 does not meet the co... |
| 5 | Review Check | Error | Error | CompletenessConstraint(Completeness(marketplac... | Success | |
| 6 | Review Check | Error | Error | ComplianceConstraint(Compliance(marketplace co... | Success | |
| 7 | Review Check | Error | Error | ComplianceConstraint(Compliance(year is non-ne... | Success | |
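
One way to read the result above: each constraint pairs a metric with an assertion, and the check as a whole takes its level's status as soon as any constraint fails. The sketch below mimics that logic in plain Python for part of the check. The metric values are copied from outputs in this notebook; the dictionary keys, the tuple layout, and the `== 1.0` assertions for `isComplete` and `isUnique` are assumptions for illustration and not PyDeequ's internal representation.

```python
# Metric values reported elsewhere in this notebook.
metrics = {
    "Size": 3120938.0,
    "Minimum(star_rating)": 1.0,
    "Maximum(star_rating)": 5.0,
    "Completeness(review_id)": 1.0,
    "Uniqueness(review_id)": 0.9926566948782706,
}

# (label, metric key, assertion) triples; labels echo the Check methods.
constraints = [
    ("hasSize", "Size", lambda x: x >= 300000),
    ("hasMin", "Minimum(star_rating)", lambda x: x == 1.0),
    ("hasMax", "Maximum(star_rating)", lambda x: x == 5.0),
    ("isComplete", "Completeness(review_id)", lambda x: x == 1.0),
    ("isUnique", "Uniqueness(review_id)", lambda x: x == 1.0),
]

# Evaluate each assertion against its metric.
statuses = {
    label: "Success" if assertion(metrics[key]) else "Failure"
    for label, key, assertion in constraints
}

# A single failing constraint marks the whole check with its level ("Error").
check_status = "Success" if all(s == "Success" for s in statuses.values()) else "Error"

for label, status in statuses.items():
    print(f"{label:12s} {status}")
print("check_status:", check_status)
```

This matches the table: only the uniqueness constraint on `review_id` fails, yet `check_status` reads Error on every row because it describes the check as a whole.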


## Constraint Suggestions

In [25]:
from pydeequ.suggestions import ConstraintSuggestionRunner

In [26]:
from pydeequ.suggestions import CategoricalRangeRule
pd.DataFrame(ConstraintSuggestionRunner(spark).onData(df)\
             .addConstraintRule(CategoricalRangeRule()
).run()['constraint_suggestions'])

INFO:Received command  on object id 
INFO:Closing down callback connection


Unnamed: 0,constraint_name,column_name,current_value,description,suggesting_rule,rule_description,code_for_constraint
0,ComplianceConstraint(Compliance('star_rating' ...,star_rating,Compliance: 1,"'star_rating' has value range '5', '4', '1', '...",CategoricalRangeRule(),"If we see a categorical range for a column, we...",".isContainedIn(""star_rating"", [""5"", ""4"", ""1"", ..."
1,ComplianceConstraint(Compliance('year' has val...,year,Compliance: 1,"'year' has value range '2014', '2015', '2013',...",CategoricalRangeRule(),"If we see a categorical range for a column, we...",".isContainedIn(""year"", [""2014"", ""2015"", ""2013""..."
2,ComplianceConstraint(Compliance('vine' has val...,vine,Compliance: 1,"'vine' has value range 'N', 'Y'",CategoricalRangeRule(),"If we see a categorical range for a column, we...",".isContainedIn(""vine"", [""N"", ""Y""])"
3,ComplianceConstraint(Compliance('marketplace' ...,marketplace,Compliance: 1,"'marketplace' has value range 'US', 'UK', 'DE'...",CategoricalRangeRule(),"If we see a categorical range for a column, we...",".isContainedIn(""marketplace"", [""US"", ""UK"", ""DE..."
4,ComplianceConstraint(Compliance('verified_purc...,verified_purchase,Compliance: 1,"'verified_purchase' has value range 'Y', 'N'",CategoricalRangeRule(),"If we see a categorical range for a column, we...",".isContainedIn(""verified_purchase"", [""Y"", ""N""])"


In [27]:
from pydeequ.suggestions import CompleteIfCompleteRule
pd.DataFrame(ConstraintSuggestionRunner(spark).onData(df)\
             .addConstraintRule(CompleteIfCompleteRule()
).run()['constraint_suggestions'])

Unnamed: 0,constraint_name,column_name,current_value,description,suggesting_rule,rule_description,code_for_constraint
0,"CompletenessConstraint(Completeness(review_id,...",review_id,Completeness: 1.0,'review_id' is not null,CompleteIfCompleteRule(),"If a column is complete in the sample, we sugg...",".isComplete(""review_id"")"
1,CompletenessConstraint(Completeness(customer_i...,customer_id,Completeness: 1.0,'customer_id' is not null,CompleteIfCompleteRule(),"If a column is complete in the sample, we sugg...",".isComplete(""customer_id"")"
2,CompletenessConstraint(Completeness(review_dat...,review_date,Completeness: 1.0,'review_date' is not null,CompleteIfCompleteRule(),"If a column is complete in the sample, we sugg...",".isComplete(""review_date"")"
3,CompletenessConstraint(Completeness(helpful_vo...,helpful_votes,Completeness: 1.0,'helpful_votes' is not null,CompleteIfCompleteRule(),"If a column is complete in the sample, we sugg...",".isComplete(""helpful_votes"")"
4,CompletenessConstraint(Completeness(star_ratin...,star_rating,Completeness: 1.0,'star_rating' is not null,CompleteIfCompleteRule(),"If a column is complete in the sample, we sugg...",".isComplete(""star_rating"")"
5,"CompletenessConstraint(Completeness(year,None))",year,Completeness: 1.0,'year' is not null,CompleteIfCompleteRule(),"If a column is complete in the sample, we sugg...",".isComplete(""year"")"
6,CompletenessConstraint(Completeness(product_ti...,product_title,Completeness: 1.0,'product_title' is not null,CompleteIfCompleteRule(),"If a column is complete in the sample, we sugg...",".isComplete(""product_title"")"
7,CompletenessConstraint(Completeness(product_id...,product_id,Completeness: 1.0,'product_id' is not null,CompleteIfCompleteRule(),"If a column is complete in the sample, we sugg...",".isComplete(""product_id"")"
8,CompletenessConstraint(Completeness(total_vote...,total_votes,Completeness: 1.0,'total_votes' is not null,CompleteIfCompleteRule(),"If a column is complete in the sample, we sugg...",".isComplete(""total_votes"")"
9,CompletenessConstraint(Completeness(product_pa...,product_parent,Completeness: 1.0,'product_parent' is not null,CompleteIfCompleteRule(),"If a column is complete in the sample, we sugg...",".isComplete(""product_parent"")"


In [28]:
from pydeequ.suggestions import FractionalCategoricalRangeRule
pd.DataFrame(ConstraintSuggestionRunner(spark).onData(df)\
             .addConstraintRule(FractionalCategoricalRangeRule()
).run()['constraint_suggestions'])

Unnamed: 0,constraint_name,column_name,current_value,description,suggesting_rule,rule_description,code_for_constraint
0,ComplianceConstraint(Compliance('marketplace' ...,marketplace,Compliance: 0.9949982985884372,'marketplace' has value range 'US' for at leas...,FractionalCategoricalRangeRule(0.9),If we see a categorical range for most values ...,".isContainedIn(""marketplace"", [""US""], lambda x..."
1,ComplianceConstraint(Compliance('vine' has val...,vine,Compliance: 0.9939271462617969,'vine' has value range 'N' for at least 99.0% ...,FractionalCategoricalRangeRule(0.9),If we see a categorical range for most values ...,".isContainedIn(""vine"", [""N""], lambda x: x >= 0..."
2,ComplianceConstraint(Compliance('year' has val...,year,Compliance: 0.9286063997426415,"'year' has value range '2014', '2015', '2013',...",FractionalCategoricalRangeRule(0.9),If we see a categorical range for most values ...,".isContainedIn(""year"", [""2014"", ""2015"", ""2013""..."
3,ComplianceConstraint(Compliance('star_rating' ...,star_rating,Compliance: 0.942110993553861,"'star_rating' has value range '5', '4', '1', '...",FractionalCategoricalRangeRule(0.9),If we see a categorical range for most values ...,".isContainedIn(""star_rating"", [""5"", ""4"", ""1"", ..."


In [29]:
from pydeequ.suggestions import NonNegativeNumbersRule
pd.DataFrame(ConstraintSuggestionRunner(spark).onData(df)\
             .addConstraintRule(NonNegativeNumbersRule()
).run()['constraint_suggestions'])

Unnamed: 0,constraint_name,column_name,current_value,description,suggesting_rule,rule_description,code_for_constraint
0,ComplianceConstraint(Compliance('customer_id' ...,customer_id,Minimum: 10005.0,'customer_id' has no negative values,NonNegativeNumbersRule(),If we see only non-negative numbers in a colum...,".isNonNegative(""customer_id"")"
1,ComplianceConstraint(Compliance('helpful_votes...,helpful_votes,Minimum: 0.0,'helpful_votes' has no negative values,NonNegativeNumbersRule(),If we see only non-negative numbers in a colum...,".isNonNegative(""helpful_votes"")"
2,ComplianceConstraint(Compliance('star_rating' ...,star_rating,Minimum: 1.0,'star_rating' has no negative values,NonNegativeNumbersRule(),If we see only non-negative numbers in a colum...,".isNonNegative(""star_rating"")"
3,ComplianceConstraint(Compliance('year' has no ...,year,Minimum: 1999.0,'year' has no negative values,NonNegativeNumbersRule(),If we see only non-negative numbers in a colum...,".isNonNegative(""year"")"
4,ComplianceConstraint(Compliance('total_votes' ...,total_votes,Minimum: 0.0,'total_votes' has no negative values,NonNegativeNumbersRule(),If we see only non-negative numbers in a colum...,".isNonNegative(""total_votes"")"
5,ComplianceConstraint(Compliance('product_paren...,product_parent,Minimum: 6478.0,'product_parent' has no negative values,NonNegativeNumbersRule(),If we see only non-negative numbers in a colum...,".isNonNegative(""product_parent"")"


In [30]:
from pydeequ.suggestions import RetainCompletenessRule
pd.DataFrame(ConstraintSuggestionRunner(spark).onData(df)\
             .addConstraintRule(RetainCompletenessRule()
).run()['constraint_suggestions'])

Unnamed: 0,constraint_name,column_name,current_value,description,suggesting_rule,rule_description,code_for_constraint
0,CompletenessConstraint(Completeness(review_hea...,review_headline,Completeness: 0.9999987183340393,'review_headline' has less than 1% missing values,RetainCompletenessRule(),"If a column is incomplete in the sample, we mo...",".hasCompleteness(""review_headline"", lambda x: ..."
1,CompletenessConstraint(Completeness(review_bod...,review_body,Completeness: 0.9999724441818453,'review_body' has less than 1% missing values,RetainCompletenessRule(),"If a column is incomplete in the sample, we mo...",".hasCompleteness(""review_body"", lambda x: x >=..."


In [31]:
from pydeequ.suggestions import RetainTypeRule
pd.DataFrame(ConstraintSuggestionRunner(spark).onData(df)\
             .addConstraintRule(RetainTypeRule()
).run()['constraint_suggestions'])

Unnamed: 0,constraint_name,column_name,current_value,description,suggesting_rule,rule_description,code_for_constraint
0,"AnalysisBasedConstraint(DataType(customer_id,N...",customer_id,DataType: Integral,'customer_id' has type Integral,RetainTypeRule(),"If we detect a non-string type, we suggest a t...",".hasDataType(""customer_id"", ConstrainableDataT..."
1,AnalysisBasedConstraint(DataType(product_paren...,product_parent,DataType: Integral,'product_parent' has type Integral,RetainTypeRule(),"If we detect a non-string type, we suggest a t...",".hasDataType(""product_parent"", ConstrainableDa..."


In [32]:
from pydeequ.suggestions import UniqueIfApproximatelyUniqueRule
pd.DataFrame(ConstraintSuggestionRunner(spark).onData(df)\
             .addConstraintRule(UniqueIfApproximatelyUniqueRule()
).run()['constraint_suggestions'])

Unnamed: 0,constraint_name,column_name,current_value,description,suggesting_rule,rule_description,code_for_constraint
0,UniquenessConstraint(Uniqueness(List(review_id...,review_id,ApproxDistinctness: 0.9647650802419017,'review_id' is unique,UniqueIfApproximatelyUniqueRule(),If the ratio of approximate num distinct value...,".isUnique(""review_id"")"


## Anomaly detection

In [33]:
from pydeequ.repository import InMemoryMetricsRepository
from pydeequ.repository import ResultKey
from pydeequ.verification import VerificationSuite
from pydeequ.verification import RelativeRateOfChangeStrategy
from pyspark.sql import Row

metricsRepository = InMemoryMetricsRepository(spark)
tags = {'tag': 'pydeequ anomaly check'}

In [38]:
yesterdaysDF  = spark.sparkContext.parallelize(
    [
        Row(key=1, value="Thingy A", desc="awesome thing.", prop="high", var=0),
        Row(key=2, value="Thingy B", desc="available at http://thingb.com", prop=None, var=0)
    ]).toDF()

yesterdaysDF.show()
pd.DataFrame(
    VerificationSuite(spark) \
        .onData(yesterdaysDF) \
        .useRepository(metricsRepository)\
        .saveOrAppendResult(
            ResultKey(spark, ResultKey.current_milli_time() - 24 * 60 * 60 * 1000, tags)  # 24 hours ago, in milliseconds
        )\
        .addAnomalyCheck(
            RelativeRateOfChangeStrategy(maxRateIncrease=2.0),Size()) \
        .run()\
    .checkResults)

+---+--------+--------------------+----+---+
|key|   value|                desc|prop|var|
+---+--------+--------------------+----+---+
|  1|Thingy A|      awesome thing.|high|  0|
|  2|Thingy B|available at http...|null|  0|
+---+--------+--------------------+----+---+



| | check_status | check_level | constraint_status | check | constraint_message | constraint |
|---|---|---|---|---|---|---|
| 0 | Success | Warning | Success | Anomaly check for Size(None) | | AnomalyConstraint(Size(None)) |


In [39]:
todaysDF  = spark.sparkContext.parallelize(
    [
        Row(key=1, value="Thingy A", desc="awesome thing."                  , prop="high", var=0),
        Row(key=2, value="Thingy B", desc="available at http://thingb.com"  , prop=None  , var=0),
        Row(key=3, value=None      , desc=None                              , prop="low" , var=5),
        Row(key=4, value="Thingy D", desc="checkout https://thingd.ca"      , prop="low" , var=10),
        Row(key=5, value="Thingy E", desc=None                              , prop="high", var=12),
    ]).toDF()

todaysDF.show()
pd.DataFrame(
    VerificationSuite(spark) \
        .onData(todaysDF) \
        .useRepository(metricsRepository)\
        .saveOrAppendResult(
            ResultKey(spark, ResultKey.current_milli_time(), tags)
        )\
        .addAnomalyCheck(
            RelativeRateOfChangeStrategy(maxRateIncrease=2.0),Size()) \
        .run()\
    .checkResults)

+---+--------+--------------------+----+---+
|key|   value|                desc|prop|var|
+---+--------+--------------------+----+---+
|  1|Thingy A|      awesome thing.|high|  0|
|  2|Thingy B|available at http...|null|  0|
|  3|    null|                null| low|  5|
|  4|Thingy D|checkout https://...| low| 10|
|  5|Thingy E|                null|high| 12|
+---+--------+--------------------+----+---+



| | check_status | check_level | constraint_status | check | constraint_message | constraint |
|---|---|---|---|---|---|---|
| 0 | Warning | Warning | Failure | Anomaly check for Size(None) | Value: 5.0 does not meet the constraint requir... | AnomalyConstraint(Size(None)) |
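
The failure above follows from simple arithmetic: the DataFrame grew from 2 rows to 5, a factor of 2.5, which exceeds `maxRateIncrease=2.0`. The sketch below shows that comparison in plain Python. It assumes the strategy compares the ratio of the new value to the previous one against the configured bounds; the function name and parameter names are illustrative and not part of the PyDeequ API.

```python
from typing import Optional


def is_anomalous_change(previous: float,
                        current: float,
                        max_rate_decrease: Optional[float] = None,
                        max_rate_increase: Optional[float] = None) -> bool:
    """Flag a metric whose ratio current / previous leaves the allowed band."""
    if previous == 0:
        raise ValueError("relative change is undefined for a previous value of 0")
    rate = current / previous
    if max_rate_increase is not None and rate > max_rate_increase:
        return True
    if max_rate_decrease is not None and rate < max_rate_decrease:
        return True
    return False


# Size of yesterdaysDF (2 rows) versus todaysDF (5 rows).
flagged = is_anomalous_change(previous=2, current=5, max_rate_increase=2.0)
print(flagged)  # the ratio 2.5 is above the bound of 2.0

# Growing to exactly twice the size stays within the bound.
print(is_anomalous_change(previous=2, current=4, max_rate_increase=2.0))
```

The first run had no earlier metric in the repository to compare against, which is consistent with it reporting Success.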


## Profile

In [36]:
from pydeequ.profiles import ColumnProfilerRunner
result = ColumnProfilerRunner(spark).onData(df).run()

In [37]:
rows = list()
for col, profile in result.profiles.items():
    if profile.histogram is None:
        _histogram = [Row(value=str(), count=int(), ratio=float())]
    else:
        _histogram = profile.histogram
    rows.append(        
        Row(column=profile.column,
            distincts=profile.approximateNumDistinctValues,
            completeness=profile.completeness,
            dataType=profile.dataType,
            isInferred=profile.isDataTypeInferred,
            typeCounts=profile.typeCounts,
            histogram=_histogram,
        )
    )
spark.sparkContext.parallelize(rows).toDF() \
    .selectExpr(["*", "explode_outer(histogram)"]) \
    .select(["column","distincts","completeness","dataType","isInferred","typeCounts","col.value","col.count","col.ratio"]) \
    .orderBy(["column","col.value"]).show(100, truncate=False)


+-----------------+---------+------------------+--------+----------+----------------------------------------------------------------------------------+-----+-------+---------------------+
|column           |distincts|completeness      |dataType|isInferred|typeCounts                                                                        |value|count  |ratio                |
+-----------------+---------+------------------+--------+----------+----------------------------------------------------------------------------------+-----+-------+---------------------+
|customer_id      |2170036  |1.0               |Integral|false     |[Integral -> 3120938, Fractional -> 0, String -> 0, Unknown -> 0, Boolean -> 0]   |     |0      |0.0                  |
|helpful_votes    |872      |1.0               |Integral|false     |[]                                                                                |     |0      |0.0                  |
|marketplace      |5        |1.0               |String  |fal