# Notebook Bronze --> Silver

## Principais Tratamentos

### 1. Renomeação e padronização de nomes de colunas
Verificar se existe alguma coluna cujo nome destoe das demais.

### 2. Tipagem correta das colunas
Converter os tipos de dados brutos:
- Transformar datas em tipo `date` ou `timestamp`;
- Padronizar valores financeiros como `DECIMAL(12,2)`.

### 3. Padronização de formatos e textos
- Remover espaços extras (`trim`);
- Uniformizar letras maiúsculas/minúsculas ou primeira letra maiúscula nas colunas textuais.

### 4. Tratamento de valores nulos
Remover registros inválidos (principalmente quando a chave primária não está presente).

### 5. Deduplicação de registros
Verificar se há duplicatas, seja linhas inteiras ou chaves primárias repetidas.  
Antes de remover, analisar a situação; em alguns cenários, pode ser adequado manter o registro mais recente.

### 6. Criação de coluna de data de ingestão
Permite rastrear quando os dados foram processados.

### 7. Validação dos dados
Garantir a consistência e qualidade dos dados antes da carga:
- Validar ranges numéricos (ex.: preços > 0, datas válidas, valores dentro de limites esperados).
- Conferir se existem valores inesperados em colunas categóricas (ex.: status, categorias, tipos).
- Validar relações entre tabelas.




In [0]:
# Importando bibliotecas
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import LongType, IntegerType, DecimalType

spark = SparkSession.builder.appName("CaseV-Credit").getOrCreate()


In [0]:
# Definição de caminho da camada Silver
catalogo = "medalhao_case"
silver_db_name = "silver"

In [0]:
# Leitura das tabelas Bronze

bronze_base_atendentes_df = spark.table("medalhao_case.bronze.bronze_base_atendentes")
bronze_base_canais_df = spark.table("medalhao_case.bronze.bronze_base_canais")
bronze_base_motivos_df = spark.table("medalhao_case.bronze.bronze_base_motivos")
bronze_chamados_df = spark.table("medalhao_case.bronze.bronze_chamados")
bronze_chamados_hora_df = spark.table("medalhao_case.bronze.bronze_chamados_hora")
bronze_clientes_df = spark.table("medalhao_case.bronze.bronze_clientes")
bronze_custos_df = spark.table("medalhao_case.bronze.bronze_custos")
bronze_pesquisa_satisfacao_df = spark.table("medalhao_case.bronze.bronze_pesquisa_satisfacao")

##bronze_base_atendentes


###Tranformações Realizadas:
- Limpeza com uso do .trim e confirmação dos niveis de atendimento como inteiros
- Filtro de qualidade dos dados com dropDuplicates
- Verificação de nulos em ids e nomes do atendente
- Adição da hora de ingestão da tabela na camada silver
- Ordenação pelo nível de atendimento
- Validação do número de registros e de quantos atendentes são de nível 2(6) e de nível 1(14)

In [0]:

silver_base_atendentes_df = (
    bronze_base_atendentes_df
    # Limpeza dos dados
    .withColumn("nome_atendente", F.trim(F.col("nome_atendente")))
    .withColumn("nivel_atendimento", F.col("nivel_atendimento").cast(IntegerType()))

    # Filro de qualidade dos dados
    .dropDuplicates(["id_atendente"])
    .dropDuplicates(["nome_atendente"])
    .filter(F.col("id_atendente").isNotNull())
    .filter(F.col("nome_atendente").isNotNull())

    # Ordena de acordo com nível de atendimento do 1 ao 2
    .orderBy("nivel_atendimento")
)

# Validações
print({silver_base_atendentes_df.count()})
silver_base_atendentes_df.groupBy("nivel_atendimento").count().show()

# Salvamento
spark.sql(f'DROP TABLE IF EXISTS {catalogo}.{silver_db_name}.silver_base_atendentes')
silver_base_atendentes_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalogo}.{silver_db_name}.silver_base_atendentes")


## bronze_base_canais

###Tranformações Realizadas:
- Renomeação de colunas para snake_case (Nome_Canal -> nome_canal)
- Padronização de texto (Trim e Upper) nos nomes dos canais
- Correção de erros de digitação no status ('invativo' -> 'inativo')
- Deduplicação de registros por nome do canal

In [0]:
silver_base_canais_df = (
    bronze_base_canais_df
    .withColumnRenamed("Nome_Canal", "nome_canal")
    .withColumnRenamed("canal_status", "status_canal")
    
    # Padronização Adotada: Canais em MAIÚSCULO e sem espaços
    .withColumn("nome_canal", F.upper(F.trim(F.col("nome_canal")))) 
    
    # Correção de Status e normalização: tudo minúsculo e corrigindo a discrepância (invativo e inativo)
    .withColumn("status_canal", F.lower(F.trim(F.col("status_canal"))))
    .withColumn("status_canal", 
        F.when(F.col("status_canal") == "invativo", "inativo")
         .otherwise(F.col("status_canal"))
    )
    
    .dropDuplicates(["nome_canal"])
)
# Salvamento
spark.sql(f'DROP TABLE IF EXISTS {catalogo}.{silver_db_name}.silver_canais')
silver_base_canais_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalogo}.{silver_db_name}.silver_base_canais")

## bronze_base_motivos

###Tranformações Realizadas:
- Renomeação e tipagem correta (ID para Inteiro)
- Limpeza de caracteres indesejados (aspas) em nomes e categorias
- Tratamento de Categorização
- Padronização da criticidade (Initcap) para corrigir variações (baixa/Baixa)
- Deduplicação por ID

In [0]:
silver_base_motivos_df = (
    bronze_base_motivos_df
    .withColumnRenamed("ID_Motivo", "id_motivo")
    .withColumnRenamed("Nome_Motivo", "nome_motivo")
    .withColumnRenamed("Categoria", "categoria")
    .withColumnRenamed("Criticidade", "criticidade")
    
    .withColumn("id_motivo", F.col("id_motivo").cast(IntegerType()))
    
    # Limpeza de aspas e espaços
    .withColumn("nome_motivo", F.trim(F.regexp_replace(F.col("nome_motivo"), '"', '')))
    .withColumn("categoria", F.trim(F.regexp_replace(F.col("categoria"), '"', '')))
    .withColumn("nome_motivo", F.trim(F.regexp_replace(F.col("nome_motivo"), "no autorizada", "não autorizada")))
    
    # Categorização (a anterior informando que tava sem categoria não deu certo)
    .withColumn("nome_lower", F.lower(F.col("nome_motivo"))) # Temporária pra comparativo
    .withColumn("categoria", 
        F.when(
            (F.col("nome_lower").like("%fatura%")) | 
            (F.col("nome_lower").like("%limite%")) | 
            (F.col("nome_lower").like("%contrato%")) | 
            (F.col("nome_lower").like("%dívida%")) | 
            (F.col("nome_lower").like("%pagamento%")), 
            "Financeiro"
        )
        .when(
            (F.col("nome_lower").like("%cartão%")) | 
            (F.col("nome_lower").like("%compra%")), 
            "Cartão"
        )
        .when(
            (F.col("nome_lower").like("%cadastrais%")) | 
            (F.col("nome_lower").like("%agência%")) |
            (F.col("nome_lower").like("%email%")) |
            (F.col("nome_lower").like("%telefone%")), 
            "Cadastral"
        )
        .when(
            (F.col("nome_lower").like("%aplicativo%")) | 
            (F.col("nome_lower").like("%site%")) | 
            (F.col("nome_lower").like("%ura%")) | 
            (F.col("nome_lower").like("%chatbot%")) |
            (F.col("nome_lower").like("%problema%")), 
            "Atendimento"
        )
        .when(
            (F.col("nome_lower").like("%pontos%")) | 
            (F.col("nome_lower").like("%benefícios%")), 
            "Benefícios"
        )
        .otherwise("Outros") # Fallback caso algo não se encaixe
    )
    .drop("nome_lower") # Removendo a coluna temporária
    
    # Padronização Criticidade (Baixa, Média, Alta)
    .withColumn("criticidade", F.initcap(F.trim(F.col("criticidade"))))
    
    .dropDuplicates(["id_motivo"])
)
# Salvamento
spark.sql(f'DROP TABLE IF EXISTS {catalogo}.{silver_db_name}.silver_motivos')
silver_base_motivos_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalogo}.{silver_db_name}.silver_base_motivos")
display(silver_base_motivos_df)


## bronze_chamados_hora

%md
###Tranformações Realizadas:
- erros de caracteres especiais: �
- padronização do nome das colunas com as demais 
- transformação de string para timestamp

In [0]:
ts_format = "dd/MM/yyyyHH:mm:ss"

silver_chamados_hora_df = (
    bronze_chamados_hora_df
    .withColumnRenamed('ID_Chamado','id_chamado')
    .withColumnRenamed('ID_Cliente','id_cliente')
    .withColumn(
        "hora_abertura_chamado",
        F.try_to_timestamp(
            F.regexp_replace("Hora_Abertura_Chamado", " �s ", ""),F.lit(ts_format)
        )
    )
    .withColumn(
        "hora_inicio_atendimento",
        F.try_to_timestamp(
        F.regexp_replace("Hora_Inicio_Atendimento", " �s ", ""),F.lit(ts_format) 
        )
    )
    .withColumn(
        "hora_finalizacao_atendimento",
        F.try_to_timestamp(
            F.regexp_replace("Hora_Finalizacao_Atendimento", " �s ", ""),F.lit(ts_format)
        )
    )
)
# Salvamento
spark.sql(f'DROP TABLE IF EXISTS {catalogo}.{silver_db_name}.silver_chamados_hora')
silver_chamados_hora_df.write.saveAsTable(f'{catalogo}.{silver_db_name}.silver_chamados_hora')

## bronze_chamados

###Tranformações Realizadas:
- erros de caracteres especiais: �
- padronização da coluna "canal"
- colunas de horários incompletas

In [0]:

silver_chamados_df = (
    (
        bronze_chamados_df.drop('hora_abertura_chamado','hora_inicio_atendimento','hora_finalizacao_atendimento')

    # Substituição de erros de caracteres especiais
        .withColumn('resolvido',F.regexp_replace('resolvido','�','ã'))
        .withColumn('canal',F.regexp_replace('canal','[._ ]','%'))
        .withColumn('motivo',F.regexp_replace('motivo','�','%'))
    )
    .join(
        silver_base_motivos_df.select('nome_motivo'),
        silver_base_motivos_df['nome_motivo'].ilike(F.col('motivo')),
        'left'
    )
    .join(
        bronze_base_canais_df.select('nome_canal'),
        bronze_base_canais_df['nome_canal'].ilike(F.col('canal')),
        'left'
    )
    .join(
        silver_chamados_hora_df.select('id_chamado','hora_abertura_chamado','hora_inicio_atendimento','hora_finalizacao_atendimento'),
        'id_chamado',
        'left'
    )
    .withColumn('motivo',F.col('nome_motivo'))
    .withColumn('canal',F.col('nome_canal'))
    .drop('nome_motivo','nome_canal')
    #

    # Cálculo de tempo de espera/atendimento
    .withColumn('tempo_espera',F.timestamp_diff('SECOND',F.col('hora_abertura_chamado'),F.col('hora_inicio_atendimento')))
    .withColumn('tempo_atendimento',F.timestamp_diff('SECOND',F.col('hora_inicio_atendimento'),F.col('hora_finalizacao_atendimento')))
    #
    
)

# Correção de bug de tempo atendimento negativo (provavelmente ligado a anos bissextos)
silver_chamados_df = (
    silver_chamados_df
    .withColumn('hora_finalizacao_atendimento',
        F.when(
        (F.col('tempo_atendimento') < 0) & 
        (F.month(F.col("hora_finalizacao_atendimento")) == 2) & 
        (F.dayofmonth(F.col("hora_finalizacao_atendimento")) == 28),
        F.col("hora_finalizacao_atendimento") + F.expr("INTERVAL 1 DAY"))
        .otherwise(F.col("hora_finalizacao_atendimento"))
    )
    .withColumn('tempo_atendimento',F.timestamp_diff('SECOND',F.col('hora_inicio_atendimento'),F.col('hora_finalizacao_atendimento')))
)
#Salvamento
spark.sql(f'DROP TABLE IF EXISTS {catalogo}.{silver_db_name}.silver_chamados')
silver_chamados_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalogo}.{silver_db_name}.silver_chamados")

## bronze_clientes

###Tranformações Realizadas:
#### Normalização 
- Seleção das colunas relevantes vindas da camada Bronze
- Manutenção dos nomes originais em snake_case
- Padronização inicial do dataframe de trabalho em clientes_norm_df

#### Tipagem
- Conversão do campo id_cliente para LongType(), garantindo suporte a identificadores longos
- Conversão de idade para Int
- Resultado armazenado em clientes_typed_df

## Formatação
- Remoção de espaços extras
- Padronização de textos:
 - nome com InitCap (primeira letra maiúscula)
 - email: tudo em minúsculas
 - regiao: tudo em maiúsculas
- Resultado armazenado em clientes_fmt_df

#### Tratamento
- Remoção de registros com id_cliente nulo
- Validação da faixa etária aceitável
- Resultado armazenado em clientes_valid_df
- Validação do formato de e-mail com um regex
- Emails inválidos são substituídos por NULL

#### Criação da coluna de data de ingestão na Silver
- Inclusão da coluna dt_ingestao_silver utilizando current_timestamp
- Registra o momento de processamento da linha para rastreabilidade

#### Deduplicação de registros
- Utilização de janela particionada por id_cliente, ordenando pela coluna hora_da_ingestao em ordem decrescente.
- Retenção apenas do registro mais recente para cada cliente
- Resultado final armazenado em silver_clientes_df.

In [0]:
# Normalização
clientes_norm_df = ( bronze_clientes_df.select(
    F.col("id_cliente"),
    F.col("nome"),
    F.col("email"),
    F.col("regiao"),
    F.col("idade"),
    F.col("hora_da_ingestao")
    )
)

# Tipagem
clientes_typed_df = ( clientes_norm_df
                     .withColumn("id_cliente", F.col("id_cliente").cast(LongType()))
                     .withColumn("idade",      F.col("idade").cast(IntegerType()))
)

# Formatação
clientes_fmt_df = ( clientes_typed_df
                   .withColumn("nome",   F.initcap(F.trim(F.col("nome"))))
                   .withColumn("email",  F.lower(F.trim(F.col("email"))))
                   .withColumn("regiao", F.upper(F.trim(F.col("regiao"))))
)

# Validação
email_regex = r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$"

clientes_valid_df = ( clientes_fmt_df
                     .where(F.col("id_cliente").isNotNull())
                     .where((F.col("idade").isNull()) | ((F.col("idade") >= 18) & (F.col("idade") <= 120)))
                     .withColumn("email",
                                 F.when(F.col("email").rlike(email_regex), F.col("email"))
                                 .otherwise(None)
                                )
                    )


# Deduplicação
janela_clientes = ( Window
                   .partitionBy("id_cliente")
                   .orderBy(F.col("hora_da_ingestao").desc_nulls_last())
)

silver_clientes_df = ( clientes_valid_df
                      .withColumn("rn", F.row_number().over(janela_clientes))
                      .where(F.col("rn") == 1)
                      .drop("rn")
)

#Salvamento
spark.sql(f'drop table if exists {catalogo}.{silver_db_name}.silver_clientes')
silver_clientes_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalogo}.{silver_db_name}.silver_clientes")


## bronze_custos

###Tranformações Realizadas:
#### Normalização das colunas
- Seleção apenas das colunas relevantes da tabela Bronze.
- Conversão dos campos id_custo e id_chamado para o tipo Int

#### Padronização e limpeza dos campos
- Criação da coluna auxiliar custo_str, convertendo o valor original para texto
- Limpeza dos valores:
 - Remoção da palavra "reais"
 - Remoção do símbolo "R$"
 - Troca de vírgula decimal por ponto
- Conversão do valor limpo para número na coluna custo_num.

#### Tratamento de valores nulos e inválidos
- Remoção de registros onde:
 - id_custo é nulo;
 - custo_num é nulo ou menor ou igual a zero.

#### Criação da coluna de data de ingestão na Silver
- Inclusão da coluna dt_ingestao_silver utilizando current_timestamp, garantindo o rastreamento dos dados

#### Conversão final
- Criação da coluna definitiva custo, convertendo custo_num para o tipo decimal, mantendo o valor original
- Remoção das colunas auxiliares

#### Deduplicação
- Aplicação de janela particionada por id_custo, ordenando por hora_da_ingestao
- Retenção apenas do registro mais recente para cada id_custo

In [0]:
# Normalização
silver_custos_df = ( bronze_custos_df.select(
    "id_custo",
    "id_chamado",
    "custo",
    "hora_da_ingestao"
    )
    .withColumn("id_custo",   F.col("id_custo").cast(IntegerType()))
    .withColumn("id_chamado", F.col("id_chamado").cast(IntegerType()))
)

# Limpeza
silver_custos_df = ( silver_custos_df
                    .withColumn("custo_str", F.trim(F.col("custo").cast("string")))
                    .withColumn("custo_str", F.regexp_replace("custo_str", "reais", ""))
                    .withColumn("custo_str", F.regexp_replace("custo_str", "R\\$", ""))
                    .withColumn("custo_str", F.regexp_replace("custo_str", ",", "."))
                    .withColumn("custo_num", F.col("custo_str").cast("double"))
)

# Tratamento
silver_custos_df = ( silver_custos_df
                    .where(F.col("id_custo").isNotNull())
                    .where(F.col("custo_num").isNotNull())
                    .where(F.col("custo_num") > 0)
                    .withColumn("custo", F.col("custo_num").cast(DecimalType(12, 8)))
                    .drop("custo_str", "custo_num")
)

# Deduplicação
janela_custos = ( Window
                 .partitionBy("id_custo")
                 .orderBy(F.col("hora_da_ingestao").desc_nulls_last())
)

silver_custos_df = ( silver_custos_df
                    .withColumn("rn", F.row_number().over(janela_custos))
                    .where(F.col("rn") == 1)
                    .drop("rn")
)

#Salvamento
spark.sql(f'DROP TABLE IF EXISTS {catalogo}.{silver_db_name}.silver_custos')
silver_custos_df.write.format("delta").mode("overwrite").option('overwriteSchema','true').saveAsTable(f"{catalogo}.{silver_db_name}.silver_custos")


## bronze_pesquisa_satisfacao

In [0]:
silver_pesquisa_satisfacao_df = bronze_pesquisa_satisfacao_df

#Salvamento
spark.sql(f'DROP TABLE IF EXISTS {catalogo}.{silver_db_name}.silver_pesquisa_satisfacao')
silver_pesquisa_satisfacao_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalogo}.{silver_db_name}.silver_pesquisa_satisfacao")
display(silver_pesquisa_satisfacao_df)


## Validações

###Validações Realizadas
- Consistência entre clientes e chamados.
- Correspondência entre chamados e motivos.
- Verificação entre chamados e base de atendentes.
- Checagem entre chamados e custos.
- Consistência Temporal.
- Validação dos motivos.
- Validação da coluna de regiões em clientes.

### Consistência entre clientes e chamados

In [0]:
# Identifica se há chamados órfãos de consumidores
silver_chamados_df.join(
    silver_clientes_df,
    on="id_cliente",
    how="left_anti"
).count()

### Correspondência entre chamados e motivos

In [0]:
# Campo motivo em chamados deve existir em base_motivos
silver_chamados_df.join(
    silver_base_motivos_df, 
    silver_chamados_df.motivo == silver_base_motivos_df.nome_motivo,
    how="left_anti"
).count()

### Verificação entre chamados e base de atendentes

In [0]:
# Campo id_atendente em chamados deve existir em base_atendentes
silver_chamados_df \
    .where("id_atendente IS NOT NULL") \
    .join(
        silver_base_atendentes_df,
        "id_atendente",
        "left_anti") \
    .count()

### Checagem entre chamados e custos

In [0]:
# Campo id_chamado da tabela custos deve existir em chamados
silver_custos_df.join(
    silver_chamados_df,
    on="id_chamado",
    how="left_anti"
).count()

### Consistência Temporal - chamados_hora

In [0]:
#checar se existem casos em que o tempo de espera é negativo
display(silver_chamados_df.where(F.col("tempo_espera") < 0).count())

In [0]:
#checar se existem casos em que o tempo de atendimento é negativo
display(silver_chamados_df.where(F.col("tempo_atendimento") < 0).count())

### Validação dos motivos

In [0]:
# Checar se os motivos estão descritos corretamente
silver_base_motivos_df.groupby("nome_motivo").count().display()

### Validação da Coluna de regiões

In [0]:
# Checando se existe alguma região incorreta
silver_clientes_df.groupBy("regiao").count().display()