In [49]:
import os
from dotenv import load_dotenv
load_dotenv()

# Environment variables this notebook expects to be defined (typically through
# the .env file loaded above) before the Spark session is created.
REQUIRED_ENV_VARS = (
    'SPARK_VERSION',
    'PYSPARK_PYTHON',
    'SPARK_HOME',
    'HADOOP_HOME',
    'JAVA_HOME',
)

# os.getenv() returns None for an unset variable, and assigning None into
# os.environ fails with an unhelpful "str expected, not NoneType" TypeError.
# Check everything up front and report all missing names in one message.
missing_env_vars = [name for name in REQUIRED_ENV_VARS if os.getenv(name) is None]
if missing_env_vars:
    raise RuntimeError(
        'Missing required environment variable(s): '
        + ', '.join(missing_env_vars)
        + '. Define them in the environment or in the .env file.'
    )

# No copy back into os.environ is needed: os.getenv() reads from os.environ,
# so every variable that passed the check above is already present there.

In [50]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pydeequ
from pydeequ.analyzers import *

In [51]:
# Configure the session one setting at a time: UI port, pydeequ's Deequ Maven
# coordinate as spark.jars.packages, pydeequ's f2j coordinate as
# spark.jars.excludes, then the application name.
session_builder = SparkSession.builder
session_builder = session_builder.config('spark.ui.port', '4050')
session_builder = session_builder.config('spark.jars.packages', pydeequ.deequ_maven_coord)
session_builder = session_builder.config('spark.jars.excludes', pydeequ.f2j_maven_coord)
session_builder = session_builder.appName('SparkSQL')

# Reuse an already-running session if there is one, otherwise start a new one.
spark = session_builder.getOrCreate()

In [52]:
# Read the Spark web UI address from the session's SparkContext and print it.
spark_context = spark.sparkContext
ui_url = spark_context.uiWebUrl
print("URL da Spark UI:", ui_url)

URL da Spark UI: http://DESKTOP-E8QPPRO:4050


In [53]:
# Struct used for both the `remetente` and `destinatario` objects of a
# transaction record: three string fields.
schema_remetente_destinatario = (
    StructType()
    .add('nome', StringType())
    .add('banco', StringType())
    .add('tipo', StringType())
)


# Explicit schema for one transaction record in the JSON source. The two party
# columns (`remetente`, `destinatario`) are nested structs that share a schema.
schema_base_pix = (
    StructType()
    .add('id_transacao', IntegerType())
    .add('valor', DoubleType())
    .add('remetente', schema_remetente_destinatario)
    .add('destinatario', schema_remetente_destinatario)
    .add('transaction_date', TimestampType())
    .add('chave_pix', StringType())
    .add('categoria', StringType())
    .add('fraude', IntegerType())
)

PATH_JSON = './data/case_final.json'

# Read the JSON source with the explicit schema (no inference) and parse
# `transaction_date` using the given timestamp pattern.
json_reader = spark.read.schema(schema_base_pix)
json_reader = json_reader.option('timestampFormat', 'yyyy-MM-dd HH:mm:ss')
df = json_reader.json(PATH_JSON)

# Flatten the two nested party structs into top-level columns, appended after
# the existing columns with destinatario_* first and remetente_* second.
party_columns = [
    col('destinatario').getField('nome').alias('destinatario_nome'),
    col('destinatario').getField('banco').alias('destinatario_banco'),
    col('destinatario').getField('tipo').alias('destinatario_tipo'),
    col('remetente').getField('nome').alias('remetente_nome'),
    col('remetente').getField('banco').alias('remetente_banco'),
    col('remetente').getField('tipo').alias('remetente_tipo'),
]

# Keep every existing column, add the flattened ones, then remove the structs.
df = df.select('*', *party_columns).drop('remetente', 'destinatario')

In [54]:
# Preview the flattened DataFrame using show()'s default arguments.
# NOTE(review): the rendered rows include person names; consider clearing this
# output before sharing if the data is not synthetic.
df.show()

+------------+------------------+-------------------+---------+-------------+------+--------------------+------------------+-----------------+------------------+---------------+--------------+
|id_transacao|             valor|   transaction_date|chave_pix|    categoria|fraude|   destinatario_nome|destinatario_banco|destinatario_tipo|    remetente_nome|remetente_banco|remetente_tipo|
+------------+------------------+-------------------+---------+-------------+------+--------------------+------------------+-----------------+------------------+---------------+--------------+
|        1000|            588.08|2021-07-16 05:00:55|aleatoria|       outros|     0|         Calebe Melo|             Caixa|               PF|Jonathan Gonsalves|            BTG|            PF|
|        1001|           80682.5|2022-04-20 12:34:01|  celular|transferencia|     1|  Davi Lucas Pereira|             Caixa|               PJ|Jonathan Gonsalves|            BTG|            PF|
|        1002|             549.9|20

In [55]:
# Profile the DataFrame with three Deequ analyzers: the dataset size, the
# completeness of `id_transacao`, and the compliance of rows with `valor > 0`.
analysis_builder = AnalysisRunner(spark).onData(df)
analysis_builder = analysis_builder.addAnalyzer(Size())
analysis_builder = analysis_builder.addAnalyzer(Completeness('id_transacao'))
analysis_builder = analysis_builder.addAnalyzer(Compliance('valor', 'valor > 0'))
analysisResult = analysis_builder.run()

In [56]:
# Turn the metrics computed by the analyzer run into a DataFrame so they can be shown.
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)

In [57]:
# Display the metrics table (columns: entity, instance, name, value).
analysisResult_df.show()

+-------+------------+------------+--------+
| entity|    instance|        name|   value|
+-------+------------+------------+--------+
|Dataset|           *|        Size|100000.0|
| Column|id_transacao|Completeness|     1.0|
| Column|       valor|  Compliance| 0.99972|
+-------+------------+------------+--------+



In [58]:
from pydeequ.suggestions import ConstraintSuggestionRunner, DEFAULT

# Ask Deequ to propose constraints for the DataFrame using the DEFAULT rule set.
suggestion_builder = ConstraintSuggestionRunner(spark).onData(df)
suggestion_builder = suggestion_builder.addConstraintRule(DEFAULT())
suggestionResult = suggestion_builder.run()

In [59]:
# Print each suggested constraint: first its column and description, then the
# check snippet that implements it, followed by a blank line.
constraint_suggestions = suggestionResult['constraint_suggestions']
for sugg in constraint_suggestions:
    headline = f'Sugestão de constraint: \"{sugg["column_name"]}\": {sugg["description"]}'
    print(headline)
    snippet = f'PySpark Code: {sugg["code_for_constraint"]}\n'
    print(snippet)


Sugestão de constraint: "destinatario_nome": 'destinatario_nome' is not null
PySpark Code: .isComplete("destinatario_nome")

Sugestão de constraint: "remetente_nome": 'remetente_nome' has value range 'Jonathan Gonsalves'
PySpark Code: .isContainedIn("remetente_nome", ["Jonathan Gonsalves"])

Sugestão de constraint: "remetente_nome": 'remetente_nome' is not null
PySpark Code: .isComplete("remetente_nome")

Sugestão de constraint: "id_transacao": 'id_transacao' is not null
PySpark Code: .isComplete("id_transacao")

Sugestão de constraint: "id_transacao": 'id_transacao' has no negative values
PySpark Code: .isNonNegative("id_transacao")

Sugestão de constraint: "id_transacao": 'id_transacao' is unique
PySpark Code: .isUnique("id_transacao")

Sugestão de constraint: "remetente_banco": 'remetente_banco' has value range 'BTG'
PySpark Code: .isContainedIn("remetente_banco", ["BTG"])

Sugestão de constraint: "remetente_banco": 'remetente_banco' is not null
PySpark Code: .isComplete("remetente_

In [60]:
from pydeequ.checks import Check, CheckLevel, ConstrainableDataTypes
from pydeequ.verification import VerificationResult, VerificationSuite

# Two Check objects that later cells attach constraints to: one at Warning
# level and one at Error level. The third argument is the label that appears
# in the `check` column of the verification results.
check = Check(spark, CheckLevel.Warning, 'Review CHeck')
error = Check(spark, CheckLevel.Error, 'Error')

In [61]:
# Verify `id_transacao` with the Warning-level check: integral data type,
# no negative values, and no nulls.
# NOTE(review): `check` is created in an earlier cell and reused here; if
# pydeequ Check objects accumulate constraints in place, re-running this cell
# would stack duplicate constraints — confirm.
verification_builder = VerificationSuite(spark).onData(df)
id_transacao_check = check.hasDataType('id_transacao', ConstrainableDataTypes.Integral)
id_transacao_check = id_transacao_check.isNonNegative('id_transacao')
id_transacao_check = id_transacao_check.isComplete('id_transacao')
checkResult = verification_builder.addCheck(id_transacao_check).run()

In [62]:
# Convert the verification outcome into a DataFrame with one row per constraint.
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
# truncate=False keeps the long constraint descriptions fully visible.
checkResult_df.show(truncate=False)

+------------+-----------+------------+----------------------------------------------------------------------------------------------------------------------------+-----------------+------------------+
|check       |check_level|check_status|constraint                                                                                                                  |constraint_status|constraint_message|
+------------+-----------+------------+----------------------------------------------------------------------------------------------------------------------------+-----------------+------------------+
+------------+-----------+------------+----------------------------------------------------------------------------------------------------------------------------+-----------------+------------------+



In [63]:
# Verify with the Error-level check that `remetente_tipo` only holds 'CNPJ'.
# NOTE(review): the rows previewed earlier in this notebook carry 'PF' in
# `remetente_tipo`, so this constraint fails on them; presumably a deliberate
# failing example — confirm the intended allowed values.
verification_builder = VerificationSuite(spark).onData(df)
remetente_tipo_check = error.isContainedIn('remetente_tipo', ['CNPJ'])
checkResult = verification_builder.addCheck(remetente_tipo_check).run()

In [64]:
# Convert the Error-level verification outcome into a DataFrame of constraint results.
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
# truncate=False keeps the constraint text and failure message fully visible.
checkResult_df.show(truncate=False)

+-----+-----------+------------+--------------------------------------------------------------------------------------------------------------------------------+-----------------+----------------------------------------------------+
|check|check_level|check_status|constraint                                                                                                                      |constraint_status|constraint_message                                  |
+-----+-----------+------------+--------------------------------------------------------------------------------------------------------------------------------+-----------------+----------------------------------------------------+
|Error|Error      |Error       |ComplianceConstraint(Compliance(remetente_tipo contained in CNPJ,`remetente_tipo` IS NULL OR `remetente_tipo` IN ('CNPJ'),None))|Failure          |Value: 0.0 does not meet the constraint requirement!|
+-----+-----------+------------+------------------------------------