In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("test_session").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/10/13 22:45:05 WARN Utils: Your hostname, tatiane-Inspiron-3583, resolves to a loopback address: 127.0.1.1; using 192.168.1.52 instead (on interface wlo1)
25/10/13 22:45:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/13 22:45:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
import sys, os, time, getpass

sys.path.append("/home/tatiane/lib/")

import pessoal
from pessoal import *
#print('AppID: ', sc.applicationId)

# Pedir ao spark para exibir só erros, escondendo avisos (WARN) e infos
# O correto é resolver a causa como, por exemplo: ordenados = Window.partitionBy().orderBy(dfs[i].columns[0])
spark.sparkContext.setLogLevel("ERROR")

Tempo inicial da execucao: 2025-10-13 22:45:18.326977
User: tatiane
Node: tatiane-Inspiron-3583


## Leitura dos dataFrames

In [3]:
padronizado_path = '/home/tatiane/Documentos/tratamento_dados/tratamento_dados.csv'
original_path = '/home/tatiane/Downloads/base_dados_suja.csv'

In [4]:
padronizado = spark.read.csv(padronizado_path, header=True, inferSchema=True)
original = spark.read.csv(original_path, header=True, inferSchema=True)

                                                                                

In [5]:
print('Base original ')
pessoal.completudeSchema(original)
print('Base tratada ')
pessoal.completudeSchema(padronizado)

Base original 
Qtd. registros: 100 | Quantidade de colunas:  5
root
 |-- nome: string (nullable = true)
 |-- data_nascimento: string (nullable = true)
 |-- municipio_nascimento: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- id_unico: integer (nullable = true)

Base tratada 
Qtd. registros: 100 | Quantidade de colunas:  5
root
 |-- municipio_nascimento: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- id_unico: integer (nullable = true)
 |-- data_nascimento: string (nullable = true)
 |-- nome: string (nullable = true)



## Validação de variáveis

In [6]:
def validar_colunas(df):
    """
    Retorna um DataFrame com métricas de validação para cada coluna:
    - nulos
    - preenchidos
    - únicos
    - duplicados
    """
    total = df.count()
    resultado = []

    for col in df.columns:
        nulos = df.filter(F.col(col).isNull()).count()
        preenchidos = total - nulos
        unicos = df.select(col).distinct().count()
        duplicados = total - unicos
        
        resultado.append((col, nulos, preenchidos, unicos, duplicados))
    
    # Criar DataFrame PySpark com resultados
    schema = ["coluna", "nulos", "preenchidos", "unicos", "duplicados"]
    return spark.createDataFrame(resultado, schema)

In [7]:
print('Base original ')
original_completude = validar_colunas(original)
original_completude.show(truncate=False)

Base original 


                                                                                

+--------------------+-----+-----------+------+----------+
|coluna              |nulos|preenchidos|unicos|duplicados|
+--------------------+-----+-----------+------+----------+
|nome                |0    |100        |80    |20        |
|data_nascimento     |0    |100        |100   |0         |
|municipio_nascimento|0    |100        |10    |90        |
|sexo                |0    |100        |2     |98        |
|id_unico            |0    |100        |100   |0         |
+--------------------+-----+-----------+------+----------+



In [8]:
print('Base tratada ')
padronizado_completude = validar_colunas(padronizado)
padronizado_completude.show(truncate=False)

Base tratada 
+--------------------+-----+-----------+------+----------+
|coluna              |nulos|preenchidos|unicos|duplicados|
+--------------------+-----+-----------+------+----------+
|municipio_nascimento|0    |100        |10    |90        |
|sexo                |0    |100        |2     |98        |
|id_unico            |0    |100        |100   |0         |
|data_nascimento     |0    |100        |100   |0         |
|nome                |0    |100        |20    |80        |
+--------------------+-----+-----------+------+----------+



## Validação das datas
1. Dividir variaveis por ano, mes e dia
2. Contabilizar distribuição de cada um, antes e após padronização
3. Máximo e minimo de cada uma das colunas (ano, mes e dia)

In [9]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import DataFrame
import re

# Dicionários de meses por extenso
meses_pt = {
    "janeiro": "01",
    "fevereiro": "02",
    "março": "03",
    "abril": "04",
    "maio": "05",
    "junho": "06",
    "julho": "07",
    "agosto": "08",
    "setembro": "09",
    "outubro": "10",
    "novembro": "11",
    "dezembro": "12"
}

meses_en = {
    "january": "01",
    "february": "02",
    "march": "03",
    "april": "04",
    "may": "05",
    "june": "06",
    "july": "07",
    "august": "08",
    "september": "09",
    "october": "10",
    "november": "11",
    "december": "12"
}

In [10]:
# Função Python para extrair dia, mês e ano originais
def extrai_data_original(data_str):
    if not data_str:
        return None, None, None
    s = str(data_str).strip()
    
    # DD/MM/YYYY
    m1 = re.match(r"(\d{1,2})/(\d{1,2})/(\d{4})$", s)
    if m1:
        return m1.group(1), m1.group(2), m1.group(3)
    
    # YYYY/MM/DD
    m2 = re.match(r"(\d{4})/(\d{1,2})/(\d{1,2})$", s)
    if m2:
        return m2.group(3), m2.group(2), m2.group(1)
    
    # DD/mês/AAAA (pt ou en)
    m3 = re.match(r"(\d{1,2})/([a-zA-Z]+)/(\d{4})$", s)
    if m3:
        dia = m3.group(1)
        mes_str = m3.group(2).lower()
        ano = m3.group(3)
        if mes_str in meses_pt:
            mes = meses_pt[mes_str]
        elif mes_str in meses_en:
            mes = meses_en[mes_str]
        else:
            mes = mes_str  # se não reconhecido, mantém como veio
        return dia, mes, ano
    
    return None, None, None

In [11]:
# Definir schema do struct
schema = StructType([
    StructField("dia", StringType(), True),
    StructField("mes", StringType(), True),
    StructField("ano", StringType(), True)
])

# Registrar UDF
extrai_udf = F.udf(extrai_data_original, schema)

# Aplicar ao DataFrame
def separar_data_original(df: DataFrame, coluna: str) -> DataFrame:
    df = df.withColumn("data_sep", extrai_udf(F.col(coluna)))
    df = df.withColumn("dia", F.col("data_sep.dia")) \
           .withColumn("mes", F.col("data_sep.mes")) \
           .withColumn("ano", F.col("data_sep.ano")) \
           .drop("data_sep")
    return df


In [12]:
original = separar_data_original(original, "data_nascimento")
original.select("data_nascimento", "dia", "mes", "ano").show(truncate=False)

[Stage 112:>                                                        (0 + 1) / 1]

+-----------------+---+---+----+
|data_nascimento  |dia|mes|ano |
+-----------------+---+---+----+
|15/fevereiro/1997|15 |02 |1997|
|2001/07/16       |16 |07 |2001|
|12/fevereiro/1973|12 |02 |1973|
|1990/06/15       |15 |06 |1990|
|08/junho/2010    |08 |06 |2010|
|08/junho/1976    |08 |06 |1976|
|20/11/2003       |20 |11 |2003|
|13/outubro/2008  |13 |10 |2008|
|07/01/1971       |07 |01 |1971|
|20/10/1997       |20 |10 |1997|
|1985/11/09       |09 |11 |1985|
|06/fevereiro/2002|06 |02 |2002|
|16/janeiro/1978  |16 |01 |1978|
|04/08/1997       |04 |08 |1997|
|15/fevereiro/2010|15 |02 |2010|
|24/dezembro/1976 |24 |12 |1976|
|10/setembro/1974 |10 |09 |1974|
|1994/12/27       |27 |12 |1994|
|2009/08/26       |26 |08 |2009|
|26/05/1978       |26 |05 |1978|
+-----------------+---+---+----+
only showing top 20 rows


                                                                                

In [13]:
padronizado = separar_data_original(padronizado, "data_nascimento")
padronizado.select("data_nascimento", "dia", "mes", "ano").show(truncate=False)

+---------------+---+---+----+
|data_nascimento|dia|mes|ano |
+---------------+---+---+----+
|1997/02/15     |15 |02 |1997|
|2001/07/16     |16 |07 |2001|
|1973/02/12     |12 |02 |1973|
|1990/06/15     |15 |06 |1990|
|2010/06/08     |08 |06 |2010|
|1976/06/08     |08 |06 |1976|
|2003/11/20     |20 |11 |2003|
|2008/10/13     |13 |10 |2008|
|1971/01/07     |07 |01 |1971|
|1997/10/20     |20 |10 |1997|
|1985/11/09     |09 |11 |1985|
|2002/02/06     |06 |02 |2002|
|1978/01/16     |16 |01 |1978|
|1997/08/04     |04 |08 |1997|
|2010/02/15     |15 |02 |2010|
|1976/12/24     |24 |12 |1976|
|1974/09/10     |10 |09 |1974|
|1994/12/27     |27 |12 |1994|
|2009/08/26     |26 |08 |2009|
|1978/05/26     |26 |05 |1978|
+---------------+---+---+----+
only showing top 20 rows


In [14]:
print('Base original ')
pessoal.completudeSchema(original)
print('Base tratada ')
pessoal.completudeSchema(padronizado)

Base original 
Qtd. registros: 100 | Quantidade de colunas:  8
root
 |-- nome: string (nullable = true)
 |-- data_nascimento: string (nullable = true)
 |-- municipio_nascimento: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- id_unico: integer (nullable = true)
 |-- dia: string (nullable = true)
 |-- mes: string (nullable = true)
 |-- ano: string (nullable = true)

Base tratada 
Qtd. registros: 100 | Quantidade de colunas:  8
root
 |-- municipio_nascimento: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- id_unico: integer (nullable = true)
 |-- data_nascimento: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- dia: string (nullable = true)
 |-- mes: string (nullable = true)
 |-- ano: string (nullable = true)



2. Contabilizar distribuição de cada um, antes e após padronização

In [15]:
lista = ['ano', 'mes', 'dia']

for col in lista:
    dist_orig = original.groupBy(col).count().withColumnRenamed("count", "qtd_original")
    dist_novo = padronizado.groupBy(col).count().withColumnRenamed("count", "qtd_padronizada")

    comparacao = (
        dist_orig.join(dist_novo, col, "outer")
        .select(col, "qtd_original", "qtd_padronizada")
        .orderBy(F.asc(col))
    )

    comparacao.show()

                                                                                

+----+------------+---------------+
| ano|qtd_original|qtd_padronizada|
+----+------------+---------------+
|NULL|           2|           NULL|
|1970|           1|              1|
|1971|           1|              1|
|1973|           4|              5|
|1974|           4|              4|
|1975|           1|              1|
|1976|           5|              5|
|1977|           1|              1|
|1978|           3|              3|
|1979|           2|              2|
|1980|           2|              2|
|1981|           2|              2|
|1982|           4|              4|
|1984|           3|              3|
|1985|           8|              8|
|1986|           2|              2|
|1987|           2|              2|
|1988|           1|              1|
|1989|           2|              2|
|1990|           3|              3|
+----+------------+---------------+
only showing top 20 rows
+----+------------+---------------+
| mes|qtd_original|qtd_padronizada|
+----+------------+---------------+
|NU

- Analisando os nulos

In [16]:
print('Base original ')
original.filter(F.col('dia').isNull()).show()

Base original 
+----------------+---------------+--------------------+----+--------+----+----+----+
|            nome|data_nascimento|municipio_nascimento|sexo|id_unico| dia| mes| ano|
+----------------+---------------+--------------------+----+--------+----+----+----+
|Ricardo Teixeira|  04/março/1993|              Recife|   F|      21|NULL|NULL|NULL|
|     Pedro Gomes|  13/março/1973|              Manaus|   M|      47|NULL|NULL|NULL|
+----------------+---------------+--------------------+----+--------+----+----+----+



In [17]:
print('Base tratada ')
padronizado.filter((F.col('id_unico') == '21') | (F.col('id_unico') == '47')).show()

Base tratada 
+--------------------+----+--------+---------------+----------------+---+---+----+
|municipio_nascimento|sexo|id_unico|data_nascimento|            nome|dia|mes| ano|
+--------------------+----+--------+---------------+----------------+---+---+----+
|              Recife|   F|      21|     1993/03/04|Ricardo Teixeira| 04| 03|1993|
|              Manaus|   M|      47|     1973/03/13|     Pedro Gomes| 13| 03|1973|
+--------------------+----+--------+---------------+----------------+---+---+----+



3. Máximo e minimo de cada uma das colunas (ano, mes e dia)

In [18]:
lista = ['ano', 'mes', 'dia']

# Cria uma lista de dicionários para armazenar resultados
resultados = []

for col in lista:
    # Seleciona min e max de cada coluna no dataframe original
    min_orig, max_orig = original.select(
        F.min(F.col(col)).alias(f"min_{col}_antes"),
        F.max(F.col(col)).alias(f"max_{col}_antes")
    ).first()

    # Seleciona min e max de cada coluna no dataframe padronizado
    min_pad, max_pad = padronizado.select(
        F.min(F.col(col)).alias(f"min_{col}_depois"),
        F.max(F.col(col)).alias(f"max_{col}_depois")
    ).first()

    # Adiciona os resultados à lista
    resultados.append({
        "variavel": col,
        "min_antes": min_orig,
        "min_depois": min_pad,
        "max_antes": max_orig,
        "max_depois": max_pad
    })

# Converte a lista em DataFrame do Spark para exibir bonitinho
schema = ["min_antes", "min_depois", "max_antes",  "max_depois", "variavel"]
resultado_df = spark.createDataFrame(resultados, schema)

resultado_df.show()

+---------+----------+---------+----------+--------+
|min_antes|min_depois|max_antes|max_depois|variavel|
+---------+----------+---------+----------+--------+
|     2010|      2010|     1970|      1970|     ano|
|       12|        12|       01|        01|     mes|
|       27|        27|       01|        01|     dia|
+---------+----------+---------+----------+--------+



### Validação da limpeza de nomes

In [19]:
df_padronizado = padronizado.withColumnRenamed('nome', 'nome_limpo')
df_original = original.withColumnRenamed('nome', 'nome_original')

In [20]:
comparacao_nomes = (
    df_original
    .join(df_padronizado, on="id_unico", how="inner")  # use a coluna chave (ex: id)
    .withColumn("nome_igual", F.col("nome_original") == F.col("nome_limpo"))
    .withColumn("tam_antes", F.length("nome_original"))
    .withColumn("tam_depois", F.length("nome_limpo"))
    .withColumn("dif_tamanho", F.col("tam_depois") - F.col("tam_antes"))
)

resumo = (
    comparacao_nomes.agg(
        F.count("*").alias("total_registros"),
        F.sum(F.when(F.col("nome_igual"), 1).otherwise(0)).alias("iguais"),
        F.sum(F.when(~F.col("nome_igual"), 1).otherwise(0)).alias("alterados"),
        F.avg(F.col("dif_tamanho")).alias("media_dif_tamanho")
    )
    .withColumn("perc_alterados", (F.col("alterados") / F.col("total_registros")) * 100)
)

resumo.show(truncate=False)

+---------------+------+---------+-----------------+--------------+
|total_registros|iguais|alterados|media_dif_tamanho|perc_alterados|
+---------------+------+---------+-----------------+--------------+
|100            |35    |65       |-2.77            |65.0          |
+---------------+------+---------+-----------------+--------------+



In [21]:
### Amostra de alterações
comparacao_nomes.filter(~F.col("nome_igual")).select("nome_original", "nome_limpo").show(20, truncate=False)

+----------------------+----------------+
|nome_original         |nome_limpo      |
+----------------------+----------------+
|_ok Pedro Gomes       |Pedro Gomes     |
|Camila _ok Monteiro   |Camila Monteiro |
|_ok Ricardo Teixeira  |Ricardo Teixeira|
|Paula Azevedo NULL    |Paula Azevedo   |
|Fernanda Martins NULL |Fernanda Martins|
|NULL Sofia Melo       |Sofia Melo      |
|  André Pinto         |André Pinto     |
|123 Bruno Santos      |Bruno Santos    |
|@ Carla Nogueira      |Carla Nogueira  |
|Maria Silva temp      |Maria Silva     |
|temp Beatriz Costa    |Beatriz Costa   |
|teste Camila Monteiro |Camila Monteiro |
|### Maria Silva       |Maria Silva     |
|Paula sem dado Azevedo|Paula Azevedo   |
|Rafael @ Lima         |Rafael Lima     |
|Aline Barbosa sem dado|Aline Barbosa   |
|XX Ricardo Teixeira   |Ricardo Teixeira|
|Paula sem dado Azevedo|Paula Azevedo   |
|Fernanda Martins temp |Fernanda Martins|
|Carlos Pereira XX     |Carlos Pereira  |
+----------------------+----------

In [22]:
### Comparação de similaridade
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from difflib import SequenceMatcher

similaridade_udf = udf(lambda a, b: float(SequenceMatcher(None, a or "", b or "").ratio()), DoubleType())

df_sim = comparacao_nomes.withColumn("similaridade", similaridade_udf("nome_original", "nome_limpo"))

df_sim.select(F.avg("similaridade").alias("media_similaridade")).show()

+------------------+
|media_similaridade|
+------------------+
|0.9108842158080719|
+------------------+



In [23]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

In [24]:
def comparar_distribuicao_datas(
    df_original,
    df_padronizado,
    col_original: str,
    col_padronizada: str,
    titulo_geral: str = "Comparação de distribuição de datas (antes x depois)",
    caminho_html: str = None
):
    """
    Gera visualizações lado a lado da distribuição de ano, mês e dia
    entre data original e padronizada.

    Args:
        df_original: DataFrame PySpark com a coluna de data original
        df_padronizado: DataFrame PySpark com a coluna de data padronizada
        col_original: nome da coluna de data original
        col_padronizada: nome da coluna de data padronizada
        titulo_geral: título do gráfico (default: Comparação de distribuição de datas)
        caminho_html: caminho opcional para salvar o HTML (ex: 'validacao_datas.html')

    Returns:
        fig: objeto Plotly Figure
    """

    # ===== Converter para Pandas =====
    pdf1 = original.select("ano", "mes", "dia").toPandas()
    pdf2 = padronizado.select("ano", "mes", "dia").toPandas()

    # ===== Criar figura 3 linhas × 2 colunas =====
    fig = make_subplots(
        rows=3, cols=2,
        subplot_titles=(
            "Ano - Antes", "Ano - Depois",
            "Mês - Antes", "Mês - Depois",
            "Dia - Antes", "Dia - Depois",
        )
    )

    # ===== Histogramas =====
    componentes = ["ano", "mes", "dia"]
    cores = ["indianred", "seagreen"]

    for i, comp in enumerate(componentes, start=1):
        # Antes
        fig.add_trace(
            go.Histogram(x=pdf1[comp], name=f"{comp.capitalize()} antes", marker_color=cores[0], opacity=0.75),
            row=i, col=1
        )
        # Depois
        fig.add_trace(
            go.Histogram(x=pdf2[comp], name=f"{comp.capitalize()} depois", marker_color=cores[1], opacity=0.75),
            row=i, col=2
        )

    # ===== Layout =====
    fig.update_layout(
        title_text=titulo_geral,
        height=1000,
        width=1000,
        bargap=0.2,
        showlegend=False
    )

    for i, label in enumerate(["Ano", "Mês", "Dia"], start=1):
        fig.update_xaxes(title_text=label, row=i, col=1)
        fig.update_xaxes(title_text=label, row=i, col=2)
        fig.update_yaxes(title_text="Contagem", row=i, col=1)
        fig.update_yaxes(title_text="Contagem", row=i, col=2)

    # ===== Exportar opcionalmente =====
    if caminho_html:
        fig.write_html(caminho_html)

    return fig

In [25]:
fig = comparar_distribuicao_datas(
    df_original = original,
    df_padronizado = padronizado,
    col_original = "data_nascimento",
    col_padronizada = "data_nascimento",
    titulo_geral = "Distribuição de datas antes e depois da padronização",
    caminho_html = "comparacao_datas.html"  # opcional
)


In [26]:
# import plotly.io as pio
# pio.renderers.default = "iframe"
# fig.show()

In [27]:
from IPython.display import HTML
HTML(fig.to_html(include_plotlyjs='cdn', full_html=False))

In [28]:
#download HTML
!jupyter nbconvert --to html --no-input validando_padronizacao.ipynb

[NbConvertApp] Converting notebook validando_padronizacao.ipynb to html
[NbConvertApp] Writing 307115 bytes to validando_padronizacao.html
