<a href="https://colab.research.google.com/github/JonatanPolanco/Data_Quality_Testing/blob/main/PyDeequ_Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Instalación de librerías**

In [1]:
!pip install pydeequ==1.0.1

Collecting pydeequ==1.0.1
  Downloading pydeequ-1.0.1-py3-none-any.whl (36 kB)
Installing collected packages: pydeequ
Successfully installed pydeequ-1.0.1


In [2]:
 !pip install pyspark==3.0.3

Collecting pyspark==3.0.3
  Downloading pyspark-3.0.3.tar.gz (209.1 MB)
[K     |████████████████████████████████| 209.1 MB 55 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 54.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.3-py2.py3-none-any.whl size=209435971 sha256=7af28fe1d02cad4d9f20f326be197df887829870fff78e66d050c80ab21b78f4
  Stored in directory: /root/.cache/pip/wheels/7e/6d/0a/6b0bf301bc056d9af03194b732b9f49ad2fceb205aab2984fd
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.3


**Configuración de sesión de PySpark**

In [None]:
import os

# The recorded output of this cell is "Please set env variable SPARK_VERSION".
# PyDeequ's setup instructions export SPARK_VERSION before the session is built,
# so do it before importing pydeequ. "3.0" corresponds to the pyspark==3.0.3 pin
# used by this notebook; setdefault keeps a value that is already exported.
os.environ.setdefault("SPARK_VERSION", "3.0")

from pyspark.sql import SparkSession, Row
import pydeequ
import pandas as pd

# Spark session with the Deequ package requested through spark.jars.packages
# and the f2j artifact excluded, using the coordinates exposed by pydeequ.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate()
)

Please set env variable SPARK_VERSION


**Cargando data**

In [None]:
# Three-row demo frame: `a` holds URLs, `b` small integers and `c` e-mail-like
# strings (two of the `c` values are not complete addresses).
sample_rows = [
    Row(a="https://www.vldb.org/pvldb/vol11/p1781-schelter.pdf", b=1, c="jobici8705@gmail"),
    Row(a="https://aws.amazon.com/blogs/big-data/test-data-quality-at-scale-with-deequ/", b=2, c="jonatan@outlook.es"),
    Row(a="https://pydeequ.readthedocs.io/_/downloads/en/latest/pdf/", b=3, c="jobici8705@"),
]
df = spark.sparkContext.parallelize(sample_rows).toDF()

**Visualizar data**

In [None]:
# Convert the Spark frame to pandas so the notebook renders it as a table,
# then show the first rows.
df__ = df.toPandas()
df__.head(n=5)

Unnamed: 0,a,b,c
0,https://www.vldb.org/pvldb/vol11/p1781-schelte...,1,jobici8705@gmail
1,https://aws.amazon.com/blogs/big-data/test-dat...,2,jonatan@outlook.es
2,https://pydeequ.readthedocs.io/_/downloads/en/...,3,jobici8705@


**Analizadores AWS Deequ**

In [None]:
from pydeequ.analyzers import *

# Metrics requested: dataset size plus completeness of columns `b` and `c`.
analysisResult = (
    AnalysisRunner(spark)
    .onData(df)
    .addAnalyzer(Size())
    .addAnalyzer(Completeness("b"))
    .addAnalyzer(Completeness("c"))
    .run()
)

# Turn the computed metrics into a Spark DataFrame and print it.
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()



+-------+--------+------------+------------------+
| entity|instance|        name|             value|
+-------+--------+------------+------------------+
|Dataset|       *|        Size|               3.0|
| Column|       b|Completeness|               1.0|
| Column|       c|Completeness|0.6666666666666666|
+-------+--------+------------+------------------+



**Perfilamiento**

In [None]:
from pydeequ.profiles import *

# Profile every column of the demo frame.
result = (
    ColumnProfilerRunner(spark)
    .onData(df)
    .run()
)

# Only the profile objects are printed; the column-name keys are not needed.
for profile in result.profiles.values():
    print(profile)

StandardProfiles for column: a: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 3,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 0,
        "Unknown": 0,
        "String": 3
    },
    "histogram": [
        [
            "https://aws.amazon.com/blogs/big-data/test-data-quality-at-scale-with-deequ/",
            1,
            0.3333333333333333
        ],
        [
            "https://www.vldb.org/pvldb/vol11/p1781-schelter.pdf",
            1,
            0.3333333333333333
        ],
        [
            "baz",
            1,
            0.3333333333333333
        ]
    ]
}
NumericProfiles for column: b: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 3,
    "dataType": "Integral",
    "isDataTypeInferred": false,
    "typeCounts": {},
    "histogram": [
        [
            "1",
            1,
            0.3333333333333333
        ],
        [
      

**Sugerencias de restricciones**

In [None]:
from pydeequ.suggestions import *
import json

# Ask Deequ for constraint suggestions on the demo frame using the default rule set.
suggestionResult = (
    ConstraintSuggestionRunner(spark)
    .onData(df)
    .addConstraintRule(DEFAULT())
    .run()
)

# Constraint suggestions in JSON format
# (column_name, description, rule_description, code_for_constraint):
# print(json.dumps(suggestionResult["constraint_suggestions"], indent=2))

# Read the suggestions by their key instead of looping over the dict: the loop
# rebound `suggestion` on every iteration (only the last value survived) and
# left it undefined when the dict was empty.
suggestion = pd.DataFrame(suggestionResult["constraint_suggestions"])
suggestion

Unnamed: 0,constraint_name,column_name,current_value,description,suggesting_rule,rule_description,code_for_constraint
0,"CompletenessConstraint(Completeness(b,None))",b,Completeness: 1.0,'b' is not null,CompleteIfCompleteRule(),"If a column is complete in the sample, we sugg...",".isComplete(""b"")"
1,ComplianceConstraint(Compliance('b' has no neg...,b,Minimum: 1.0,'b' has no negative values,NonNegativeNumbersRule(),If we see only non-negative numbers in a colum...,".isNonNegative(""b"")"
2,"UniquenessConstraint(Uniqueness(List(b),None))",b,ApproxDistinctness: 1.0,'b' is unique,UniqueIfApproximatelyUniqueRule(),If the ratio of approximate num distinct value...,".isUnique(""b"")"
3,"CompletenessConstraint(Completeness(a,None))",a,Completeness: 1.0,'a' is not null,CompleteIfCompleteRule(),"If a column is complete in the sample, we sugg...",".isComplete(""a"")"
4,"UniquenessConstraint(Uniqueness(List(a),None))",a,ApproxDistinctness: 1.0,'a' is unique,UniqueIfApproximatelyUniqueRule(),If the ratio of approximate num distinct value...,".isUnique(""a"")"
5,"CompletenessConstraint(Completeness(c,None))",c,Completeness: 1.0,'c' is not null,CompleteIfCompleteRule(),"If a column is complete in the sample, we sugg...",".isComplete(""c"")"
6,"UniquenessConstraint(Uniqueness(List(c),None))",c,ApproxDistinctness: 1.0,'c' is unique,UniqueIfApproximatelyUniqueRule(),If the ratio of approximate num distinct value...,".isUnique(""c"")"


**Verificación de restricciones**

In [None]:
from pyspark.sql.types import IntegerType
from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Warning, "Review Check")

# Constraints evaluated against the demo frame. Per the recorded output, the
# hasMin, isContainedIn and containsEmail constraints fail on this data.
review_constraints = (
    check
    .hasSize(lambda x: x >= 3)
    .hasMin("b", lambda x: x == 0)
    .hasDataType("b", ConstrainableDataTypes.Integral)
    .isComplete("c")
    .isComplete("b")
    .isUnique("a")
    .isContainedIn("a", ["foo", "bar", "baz"])
    .isNonNegative("b")
    .containsEmail("c")
    .containsURL("a")
)

checkResult = (
    VerificationSuite(spark)
    .onData(df)
    .addCheck(review_constraints)
    .run()
)

print(f"Verification Run Status: {checkResult.status}")
# One row per constraint, returned as a pandas DataFrame for display.
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult, pandas=True)
checkResult_df



Unnamed: 0,check,check_level,check_status,constraint,constraint_status,constraint_message
0,Review Check,Warning,Warning,SizeConstraint(Size(None)),Success,
1,Review Check,Warning,Warning,"MinimumConstraint(Minimum(b,None))",Failure,Value: 1.0 does not meet the constraint requir...
2,Review Check,Warning,Warning,"AnalysisBasedConstraint(DataType(b,None),<func...",Success,
3,Review Check,Warning,Warning,"CompletenessConstraint(Completeness(c,None))",Success,
4,Review Check,Warning,Warning,"CompletenessConstraint(Completeness(b,None))",Success,
5,Review Check,Warning,Warning,"UniquenessConstraint(Uniqueness(List(a),None))",Success,
6,Review Check,Warning,Warning,ComplianceConstraint(Compliance(a contained in...,Failure,Value: 0.0 does not meet the constraint requir...
7,Review Check,Warning,Warning,ComplianceConstraint(Compliance(b is non-negat...,Success,
8,Review Check,Warning,Warning,containsEmail(c),Failure,Value: 0.3333333333333333 does not meet the co...
9,Review Check,Warning,Warning,containsURL(a),Success,



**Cargando nueva data (100k de registros)**

In [243]:
from pyspark.sql import SparkSession, Row
import pydeequ
import pandas as pd
from pyspark import SparkContext
from pyspark.sql.types import *

# NOTE(review): if the session built earlier in this notebook is still alive,
# getOrCreate() is expected to hand back that same session, and Spark documents
# that driver memory has to be fixed before the driver JVM starts -- so the
# memory settings below may not take effect here. Confirm, or set them where
# the first session is created.
session_builder = (
    SparkSession.builder
    .appName("Example")
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .config("spark.driver.memory", "16g")
    .config("spark.executor.memory", "4g")
)
spark = session_builder.getOrCreate()

**Visualizar data**

In [244]:
# Load the CSV with pandas and keep its column labels, in file order.
# (Replaces a manual append loop wrapped in a no-op `range(1)` loop.)
DF2 = pd.read_csv("100000_Records.csv")
Headers = list(DF2.columns)

In [257]:
# Header-less read: columns get generated names (the suggestion output further
# down shows names such as _c29).
# NOTE(review): without a header option the file's first line is presumably
# ingested as a data row -- confirm this is intended for the suggestion run.
df2 = spark.read.csv("100000_Records.csv", header=False)

In [203]:
from functools import reduce
# NOTE(review): this star import also brings in names such as `col`, `sum`,
# `min` and `max`, which shadow Python builtins and local names.
from pyspark.sql.functions import *
from pyspark.sql.functions import monotonically_increasing_id as mi

# The id column expression is deliberately not bound to the name `id`: that
# would shadow the builtin id() for the rest of the session, and nothing in
# this cell uses it.
old_columns = df2.schema.names

# Disabled alternative for a header-less read: rename the generated columns to
# the pandas header names, then drop the row with index 0 (presumably the
# header line -- verify before re-enabling).
#df2 = reduce(lambda df2, idx: df2.withColumnRenamed(old_columns[idx], Headers[idx]), range(len(old_columns)), df2)
#indexed = df2.withColumn("index", mi())
#df2 = indexed.filter(col("index") >0 ).drop("index")

In [208]:
# Preview the first rows of the pandas copy of the dataset.
DF2.head(n=5)

Unnamed: 0,Emp ID,Name Prefix,First Name,Middle Initial,Last Name,Gender,E Mail,Father's Name,Mother's Name,Mother's Maiden Name,Date of Birth,Time of Birth,Age in Yrs.,Weight in Kgs.,Date of Joining,Quarter of Joining,Half of Joining,Year of Joining,Month of Joining,Month Name of Joining,Short Month,Day of Joining,DOW of Joining,Short DOW,Age in Company (Years),Salary,Last % Hike,SSN,Phone No.,Place Name,County,City,State,Zip,Region,User Name,Password
0,882966,Mrs.,Gwyn,E,Etzel,F,gwyn.etzel@aol.com,Sebastian Etzel,Joshua Etzel,Marden,5/20/1970,02:36:41 PM,47.22,60,3/1/2012,Q1,H1,2012,3,March,Mar,1,Thursday,Thu,5.41,120580,14%,640-29-9264,405-775-9884,Bromide,Johnston,Bromide,OK,74530,South,geetzel,c@sLdGgxI[sE|aJ
1,189028,Ms.,Ressie,I,Goodwyn,F,ressie.goodwyn@charter.net,Lucien Goodwyn,Sparkle Goodwyn,Lotts,5/2/1959,12:37:29 AM,58.28,43,12/6/1990,Q4,H2,1990,12,December,Dec,6,Thursday,Thu,26.66,74088,8%,375-37-8517,212-786-7533,Reading Center,Schuyler,Reading Center,NY,14876,Northeast,rigoodwyn,51CEPJ]r
2,479122,Mr.,Colton,B,Salzman,M,colton.salzman@hotmail.com,Dennis Salzman,Leesa Salzman,Slattery,9/30/1987,10:03:01 AM,29.85,64,9/29/2009,Q3,H2,2009,9,September,Sep,29,Tuesday,Tue,7.83,191052,29%,073-02-5563,385-775-0225,Ibapah,Tooele,Ibapah,UT,84034,West,cbsalzman,l!Hspr@#$Dk7sq
3,484002,Ms.,Marylynn,G,Ealey,F,marylynn.ealey@cox.net,Efren Ealey,Kayleen Ealey,Gandara,8/6/1971,09:47:31 PM,46.01,45,7/14/2011,Q3,H2,2011,7,July,Jul,14,Thursday,Thu,6.04,57691,15%,106-98-7919,216-984-8974,Wickliffe,Lake,Wickliffe,OH,44092,Midwest,mgealey,QeJJ:vj6
4,677207,Ms.,Bula,V,Reich,F,bula.reich@gmail.com,Dominic Reich,Daphne Reich,Lister,4/21/1971,12:48:20 AM,46.3,42,5/10/2008,Q2,H1,2008,5,May,May,10,Saturday,Sat,9.22,155184,4%,753-07-4655,217-307-8597,Forrest,Livingston,Forrest,IL,61741,Midwest,bvreich,6_<OFxu\]


**Sugerencias de restricciones**

In [258]:
from pydeequ.suggestions import *

# Ask Deequ for constraint suggestions on the 100k-row frame using the default rule set.
suggestionResult = (
    ConstraintSuggestionRunner(spark)
    .onData(df2)
    .addConstraintRule(DEFAULT())
    .run()
)

# Constraint suggestions in JSON format
# (column_name, description, rule_description, code_for_constraint):
# print(json.dumps(suggestionResult["constraint_suggestions"], indent=2))

# Read the suggestions by their key instead of looping over the dict: the loop
# rebound `suggestion` on every iteration (only the last value survived) and
# left it undefined when the dict was empty.
suggestion = pd.DataFrame(suggestionResult["constraint_suggestions"])

In [259]:
# Show the first suggested constraints.
suggestion.head(n=5)

Unnamed: 0,constraint_name,column_name,current_value,description,suggesting_rule,rule_description,code_for_constraint
0,"CompletenessConstraint(Completeness(_c29,None))",_c29,Completeness: 1.0,'_c29' is not null,CompleteIfCompleteRule(),"If a column is complete in the sample, we sugg...",".isComplete(""_c29"")"
1,"CompletenessConstraint(Completeness(_c11,None))",_c11,Completeness: 1.0,'_c11' is not null,CompleteIfCompleteRule(),"If a column is complete in the sample, we sugg...",".isComplete(""_c11"")"
2,ComplianceConstraint(Compliance('_c18' has val...,_c18,Compliance: 1,"'_c18' has value range '7', '5', '6', '4', '3'...",CategoricalRangeRule(),"If we see a categorical range for a column, we...",".isContainedIn(""_c18"", [""7"", ""5"", ""6"", ""4"", ""3..."
3,"CompletenessConstraint(Completeness(_c18,None))",_c18,Completeness: 1.0,'_c18' is not null,CompleteIfCompleteRule(),"If a column is complete in the sample, we sugg...",".isComplete(""_c18"")"
4,ComplianceConstraint(Compliance('_c18' has val...,_c18,Compliance: 0.923280767192328,"'_c18' has value range '7', '5', '6', '4', '3'...",FractionalCategoricalRangeRule(0.9),If we see a categorical range for most values ...,".isContainedIn(""_c18"", [""7"", ""5"", ""6"", ""4"", ""3..."


**Verificación de restricciones**

In [260]:
# Re-read the CSV with its first line used as column names, so the checks below
# can refer to columns such as "Emp ID" and "E Mail".
df2 = spark.read.option("header", True).csv("100000_Records.csv")

In [261]:
from pyspark.sql.types import IntegerType
from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Warning, "Review Check")

# Constraints evaluated against the employee dataset. Per the recorded output,
# only the uniqueness of "Emp ID" fails.
employee_constraints = (
    check
    .hasDataType("Month of Joining", ConstrainableDataTypes.Integral)
    .isComplete("Emp ID")
    .isUnique("Emp ID")
    .isContainedIn("Gender", ["M", "F"])
    .isNonNegative("Salary")
    .containsEmail("E Mail")
    .containsSocialSecurityNumber("SSN")
)

checkResult = (
    VerificationSuite(spark)
    .onData(df2)
    .addCheck(employee_constraints)
    .run()
)

print(f"Verification Run Status: {checkResult.status}")
# One row per constraint, returned as a pandas DataFrame for display.
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult, pandas=True)
checkResult_df



Unnamed: 0,check,check_level,check_status,constraint,constraint_status,constraint_message
0,Review Check,Warning,Warning,AnalysisBasedConstraint(DataType(Month of Join...,Success,
1,Review Check,Warning,Warning,"CompletenessConstraint(Completeness(Emp ID,None))",Success,
2,Review Check,Warning,Warning,"UniquenessConstraint(Uniqueness(List(Emp ID),N...",Failure,Value: 0.89945 does not meet the constraint re...
3,Review Check,Warning,Warning,ComplianceConstraint(Compliance(Gender contain...,Success,
4,Review Check,Warning,Warning,ComplianceConstraint(Compliance(Salary is non-...,Success,
5,Review Check,Warning,Warning,containsEmail(E Mail),Success,
6,Review Check,Warning,Warning,containsSocialSecurityNumber(SSN),Success,


In [85]:
# Convert the whole Spark frame to a pandas DataFrame for the duplicate-ID
# cross-check that follows.
dataF = df2.toPandas()

In [86]:
# Cross-check the failed isUnique("Emp ID") constraint with pandas: total rows
# versus number of distinct IDs.
# NOTE(review): Deequ documents Uniqueness as the share of values occurring
# exactly once, so its ratio need not equal distinct/total -- confirm before
# comparing the two numbers directly.
ID = dataF["Emp ID"]
total_ids = len(ID)
distinct_ids = ID.nunique()
print(f"Total de ID: {total_ids}", f"Distintos: {distinct_ids}")

Total de ID: 100000 Distintos: 94888
