In [3]:
!pip install pydeequ

Collecting pydeequ
  Downloading pydeequ-1.0.1-py3-none-any.whl (36 kB)
Installing collected packages: pydeequ
Successfully installed pydeequ-1.0.1


In [11]:
!wget https://s3-sa-east-1.amazonaws.com/ckan.saude.gov.br/dados-go-1.csv

--2021-09-06 23:34:18--  https://s3-sa-east-1.amazonaws.com/ckan.saude.gov.br/dados-go-1.csv
Resolving s3-sa-east-1.amazonaws.com (s3-sa-east-1.amazonaws.com)... 52.95.165.74
Connecting to s3-sa-east-1.amazonaws.com (s3-sa-east-1.amazonaws.com)|52.95.165.74|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 310178081 (296M) [application/octet-stream]
Saving to: ‘dados-go-1.csv.1’


2021-09-06 23:38:03 (1.32 MB/s) - ‘dados-go-1.csv.1’ saved [310178081/310178081]



In [12]:
!wget https://s3-sa-east-1.amazonaws.com/ckan.saude.gov.br/dados-go-2.csv

--2021-09-06 23:38:04--  https://s3-sa-east-1.amazonaws.com/ckan.saude.gov.br/dados-go-2.csv
Resolving s3-sa-east-1.amazonaws.com (s3-sa-east-1.amazonaws.com)... 52.95.164.30
Connecting to s3-sa-east-1.amazonaws.com (s3-sa-east-1.amazonaws.com)|52.95.164.30|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 316133278 (301M) [application/octet-stream]
Saving to: ‘dados-go-2.csv.1’


2021-09-06 23:40:31 (2.06 MB/s) - ‘dados-go-2.csv.1’ saved [316133278/316133278]



In [13]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col,when
import pydeequ

spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [2]:
#importação do primeiro dataset de Goias
df1 = (spark
       .read
       .format("csv")
       .option("header", "true")
       .option("encoding", "ISO-8859-1")
       .load("dados-go-1.csv", sep = ';'))

df2 = (spark
       .read
       .format("csv")
       .option("header", "true")
       .option("encoding", "ISO-8859-1")
       .load("dados-go-2.csv", sep = ';'))

# juntando os dois datasets
df = df1.union(df2)   

In [3]:
# realizando algumas limpezas prévia nos dados
df = df.withColumnRenamed('ÿid', 'id')
df = df.withColumn("resultadoTeste", when(
       col("resultadoTeste") == "null", None).otherwise(col("resultadoTeste")))
df = df.withColumn("sintomas", when(
       col("sintomas") == "null", None).otherwise(col("sintomas")))
df = df.withColumn("dataTeste", when(
       col("dataTeste") == "null", None).otherwise(col("dataTeste")))
df = df.withColumn("tipoTeste", when(
       col("tipoTeste") == "null", None).otherwise(col("tipoTeste")))
df = df.withColumn("idade", when(col("idade") == "null", 0).otherwise(col("idade")))
df = df.withColumn("dataNascimento", when(
       col("dataNascimento") == "undefined", None).otherwise(col("dataNascimento")))
df = df.withColumn("sexo", when(col("sexo") == "Feminino", 0).when(
  col("sexo") == "Masculino", 1).when(col("sexo") == "Indefinido", 2).otherwise(col("sexo")))
df = df.withColumn("idade", col("idade").cast('int'))
df = df.withColumn("sexo", col("sexo").cast('int')) 

In [4]:
from pydeequ.analyzers import Completeness, Size, ApproxCountDistinct, Correlation, AnalysisRunner, Mean, AnalyzerContext

analysisResult = (AnalysisRunner(spark)
                    .onData(df)
                    .addAnalyzer(Size())
                    .addAnalyzer(Completeness("dataNascimento"))
                    .addAnalyzer(Completeness("idade"))
                    .addAnalyzer(Completeness("sintomas"))
                    .addAnalyzer(Completeness("dataTeste"))
                    .addAnalyzer(Completeness("tipoTeste"))
                    .addAnalyzer(Completeness("resultadoTeste"))
                    .addAnalyzer(Mean("idade"))
                    .run())

analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

+-------+--------------+------------+------------------+
| entity|      instance|        name|             value|
+-------+--------------+------------+------------------+
| Column|         idade|Completeness|0.9999990438033677|
| Column|         idade|        Mean| 37.38176282667294|
| Column|      sintomas|Completeness|0.9998871687973915|
| Column|resultadoTeste|Completeness|0.7253707652441648|
| Column|     dataTeste|Completeness|0.7616000994444497|
|Dataset|             *|        Size|         1045810.0|
| Column|dataNascimento|Completeness|               0.0|
| Column|     tipoTeste|Completeness|0.8788718792132414|
+-------+--------------+------------+------------------+



In [5]:

from pydeequ.checks import *
from pydeequ.verification import *


check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult = (VerificationSuite(spark)
    .onData(df)
    .addCheck(
        check.isUnique("id")
        .hasMin("idade", _ == 0)
        .hasMax("idade", _ == 110)
        .isContainedIn("resultadoTeste", ["Negativo", "Positivo", "Inconclusivo ou Indeterminado"])
    )
    .run())

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

Python Callback server started!
+------------+-----------+------------+--------------------+-----------------+--------------------+
|       check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+------------+-----------+------------+--------------------+-----------------+--------------------+
+------------+-----------+------------+--------------------+-----------------+--------------------+



In [14]:
from pydeequ.repository import *
from pydeequ.analyzers import *

repository = FileSystemMetricsRepository(spark, 's3://deequ-testes/reports/metrics.json')
key_tags = {'tag': 'pydeequ medium'}
resultKey = ResultKey(spark, ResultKey.current_milli_time(), key_tags)

analysisResult = (AnalysisRunner(spark)
                    .onData(df)
                    .addAnalyzer(Size())
                    .addAnalyzer(Completeness("dataNascimento"))
                    .addAnalyzer(Completeness("idade"))
                    .addAnalyzer(Completeness("sintomas"))
                    .addAnalyzer(Completeness("dataTeste"))
                    .addAnalyzer(Completeness("tipoTeste"))
                    .addAnalyzer(Completeness("resultadoTeste"))
                    .addAnalyzer(Mean("idade"))
                    .useRepository(repository)
                    .saveOrAppendResult(resultKey)
                    .run())