<h1>Validação Ingestão Raw</h1>

In [None]:
# Import do projeto
import sys
import os

PROJECT_ROOT = "/repositorio/projeto"

if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

In [None]:
# Imports das configurações
from pyspark.sql import functions as F
from pyspark.sql.types import *
from src.utils.spark_session import criar_spark_session
from src.config.settings import settings

# Cria uma sessão do Spark
spark = criar_spark_session()

# Reduzir ruídos
spark.sparkContext.setLogLevel("ERROR")

In [None]:
# Leitura dos dados raw clientes
raw_clientes_path = (
    f"s3a://{settings.s3_bucket}/"
    f"{settings.s3_raw_path}/clientes/"
)

df_clientes = spark.read.parquet(raw_clientes_path)
df_clientes.show(5)
print(f'Quantidade de registros: {df_clientes.count()}')

In [None]:
# Leitura dos dados raw enderecos
raw_enderecos_path = (
    f"s3a://{settings.s3_bucket}/"
    f"{settings.s3_raw_path}/enderecos/"
)

df_enderecos = spark.read.parquet(raw_enderecos_path)
df_enderecos.show(5)
print(f'Quantidade de registros: {df_enderecos.count()}')

<h3>Validação tabela Clientes

In [None]:
# Verificando a quantidade de id_cliente nulos
df_clientes.filter(
    F.col("id_cliente").isNull()
).count()

In [None]:
# Verificando cpf invalido
df_clientes.filter(
    F.col("cpf") == "12345678910"
).count()

In [None]:
# Conferir data por partição
df_clientes.select("data_processamento").distinct().show()

<h3>Validação tabela Endereços

In [None]:
# Verificando a quantidade de id_endereco nulos
df_enderecos.filter(
    F.col("id_endereco").isNull()
).count()

In [None]:
# Verificando a quantidade de id_cliente em enderecos nulos
df_enderecos.filter(
    F.col("id_cliente").isNull()
).count()

In [None]:
# Verificando o se existe CEP com formato inválido
df_enderecos.filter(
    ~F.col("cep").rlike("^[0-9]{5}-[0-9]{3}$")
).select("id_endereco", "cep").show()

In [None]:
# verificando a data do evento nula
df_enderecos.filter(
    F.to_timestamp("data_evento", "yyyy-MM-dd HH:mm:ss").isNull()
).select("id_endereco", "data_evento").show()

In [None]:
# Validando integridade referencial
enderecos_sem_cliente = (
    df_enderecos
    .join(
        df_clientes
            .select("id_cliente").distinct(),
        on="id_cliente",
        how="left_anti"
    )
)

enderecos_sem_cliente.show()


In [None]:
# Conferir data por partição
df_enderecos.select("data_processamento").distinct().show()