### Configuração do Ambiente

**Propósito:** Inicializar o ambiente PySpark e configurar o contexto do catálogo Unity Catalog para a camada Bronze → Silver.

**Entrada:**
- Nenhuma tabela (apenas configuração inicial)

**Saída:**
- Variáveis de ambiente: `catalog_name`, `bronze_db_name`, `silver_db_name`
- Contexto Spark configurado para o schema Silver

**Resumo da lógica:**
Importa todas as funções PySpark necessárias para transformações de dados (normalização de texto, funções de janela, fuzzy matching com Levenshtein, timestamps). Define os nomes do catálogo e schemas Bronze/Silver, e configura o Spark para usar o schema Silver por padrão.

**Observações:**
- O catálogo usado é `atendimento_catalog` (Unity Catalog)
- Todas as transformações operam entre `bronze` (source) e `silver` (target)
- Levenshtein será usado para fuzzy matching de motivos de chamados

In [0]:
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, trim, upper, lower, initcap, current_timestamp,
    lit, coalesce, when, regexp_replace, length, row_number, to_timestamp, levenshtein, unix_timestamp, round
)
from pyspark.sql.types import IntegerType, StringType, TimestampType
from pyspark.sql.window import Window
from pyspark.sql.utils import AnalysisException
import pyspark.sql.functions as F

catalog_name = "atendimento_catalog"
bronze_db_name = "bronze"
silver_db_name = "silver"


In [0]:
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog_name}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {silver_db_name}")

spark.sql(f"USE CATALOG {catalog_name}")
spark.sql(f"USE SCHEMA {silver_db_name}")

### Funções Auxiliares de Transformação

**Propósito:** Definir funções reutilizáveis para validação, normalização de texto, conversão de tipos e parsing de timestamps.

**Entrada:**
- Nenhuma tabela (apenas definições de funções)

**Saída:**
- Funções Python disponíveis para uso nas transformações:
  - `safe_table_exists`: Verifica se uma tabela existe no catálogo
  - `safe_col`: Retorna coluna ou `NULL` se não existir
  - `safe_cast_int`: Converte para inteiro removendo caracteres não-numéricos
  - `remove_accents_udf`: Remove acentuação de texto (Unicode normalizado)
  - `normalize_text`: Remove caracteres especiais e normaliza espaços
  - `parse_to_brasilia_timezone`: Converte timestamps para timezone de São Paulo

**Resumo da lógica:**
Encapsula lógica comum de validação e transformação usada em todas as tabelas Silver. As funções de normalização aplicam remoção de acentos, trim, regex para espaços e caracteres inválidos. O parsing de timestamp corrige formatos brasileiros típicos (dd/MM/yyyy às HH:mm:ss) e converte para timezone UTC com offset de São Paulo.

**Observações:**
- `safe_cast_int` é tolerante a falhas: retorna `NULL` se conversão falhar
- `remove_accents_udf` preserva apenas caracteres ASCII normalizados (A-Z, a-z)
- Timestamps são armazenados em UTC mas exibidos com timezone de São Paulo
- Funções são idempotentes e NULL-safe

In [0]:
def safe_table_exists(spark, full_name: str) -> bool:
    try:
        spark.table(full_name)
        return True
    except:
        return False

def safe_col(df, name):
    return col(name) if name in df.columns else lit(None).cast(StringType())

def safe_cast_int(col_expr):
    return when(
        col_expr.isNotNull() & (trim(col_expr).cast(StringType()) != ""),
        F.regexp_replace(trim(col_expr).cast(StringType()), r'[^\d]', '').cast(IntegerType())
    ).otherwise(None)

def remove_accents_udf(text_col):
    return F.translate(
        text_col,
        "áàãâäéèêëíìîïóòõôöúùûüçñÁÀÃÂÄÉÈÊËÍÌÎÏÓÒÕÔÖÚÙÛÜÇÑ",
        "aaaaaeeeeiiiiooooouuuucnAAAAAEEEEIIIIOOOOOUUUUCN"
    )

def normalize_text(col_expr):
    return trim(regexp_replace(regexp_replace(col_expr, r'\s+', ' '), r'[^\x20-\x7E\u00C0-\u00FF]', ''))

def parse_to_brasilia_timezone(col_expr):
    cleaned = regexp_replace(col_expr, r'[ ]', ' ')
    cleaned = regexp_replace(cleaned, r'\bàs\b|\bas\b|\bàs\b', ' ')
    cleaned = regexp_replace(cleaned, r'\s+', ' ')
    cleaned = trim(cleaned)
    parsed_naive = to_timestamp(cleaned, 'dd/MM/yyyy HH:mm:ss')
    parsed_utc = F.to_utc_timestamp(parsed_naive, 'America/Sao_Paulo')
    return F.from_utc_timestamp(parsed_utc, 'America/Sao_Paulo')

### Transformação: ft_atendentes (Bronze → Silver)

**Propósito:** Normalizar e validar dados de atendentes, garantindo qualidade e consistência para análise downstream.

**Entrada:**
- `atendimento_catalog.bronze.ft_atendentes`: Dados brutos de atendentes (id, nome, nível de atendimento)

**Saída:**
- `atendimento_catalog.silver.ft_atendentes`: Tabela Silver validada e dedplicada com atendentes únicos

**Resumo da lógica:**
Aplica conversão segura de `id_atendente` para inteiro, normaliza nomes (initcap + remoção de acentos), valida `nivel_atendimento` (apenas valores 1 ou 2 são aceitos). Remove registros com campos obrigatórios nulos ou inválidos. Realiza deduplicação por `id_atendente` usando `ingestion_timestamp` DESC (registro mais recente prevalece). Adiciona timestamp de processamento e garante tipos de dados consistentes.

**Observações:**
- Apenas níveis 1 (inicial) e 2 (especializado) são aceitos
- Registros com `id_atendente <= 0` são descartados
- Nomes são convertidos para formato Title Case sem acentos
- Taxa de perda típica: 0-5% (registros inválidos ou duplicados)

In [0]:
src_table = f"{catalog_name}.{bronze_db_name}.ft_atendentes"
tgt_table = f"{catalog_name}.{silver_db_name}.ft_atendentes"

if not safe_table_exists(spark, src_table):
    raise RuntimeError(f"Tabela não encontrada: {src_table}")

df = spark.table(src_table)
total_before = df.count()

df = df.withColumn(
    "id_atendente", 
    safe_cast_int(safe_col(df, "id_atendente"))
).withColumn(
    "nome_atendente", 
    initcap(remove_accents_udf(normalize_text(safe_col(df, "nome_atendente"))))
).withColumn(
    "nivel_atendimento",
    when(
        safe_cast_int(safe_col(df, "nivel_atendimento")).isin([1, 2]), 
        safe_cast_int(safe_col(df, "nivel_atendimento"))
    ).otherwise(None)
)

df = df.filter(
    (col("id_atendente").isNotNull()) & 
    (col("id_atendente") > 0) &
    (col("nome_atendente").isNotNull()) & 
    (trim(col("nome_atendente")) != "") &
    (col("nivel_atendimento").isNotNull())
)

In [0]:
w = Window.partitionBy("id_atendente").orderBy(
    col("ingestion_timestamp").desc_nulls_last() 
    if "ingestion_timestamp" in df.columns 
    else lit(datetime.now())
)

df = df.withColumn("rn", row_number().over(w)) \
    .filter(col("rn") == 1) \
    .drop("rn")

df = df.withColumn("processed_timestamp", current_timestamp())

final_cols = [
    "id_atendente", 
    "nome_atendente", 
    "nivel_atendimento", 
    "processed_timestamp", 
    "ingestion_timestamp"
]

df = df.select(*[c for c in final_cols if c in df.columns])

df_typed = df \
    .withColumn("id_atendente", col("id_atendente").cast(IntegerType())) \
    .withColumn("nome_atendente", col("nome_atendente").cast(StringType())) \
    .withColumn("nivel_atendimento", col("nivel_atendimento").cast(IntegerType())) \
    .withColumn("processed_timestamp", col("processed_timestamp").cast(TimestampType())) \
    .withColumn(
        "ingestion_timestamp", 
        col("ingestion_timestamp").cast(TimestampType()) 
        if "ingestion_timestamp" in df.columns 
        else lit(None).cast(TimestampType())
    )

df_typed.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(tgt_table)

final = spark.table(tgt_table)
taxa = (final.count() / total_before * 100) if total_before > 0 else 0

print(f"FT_ATENDENTES | Bronze: {total_before:,} | Silver: {final.count():,} | Taxa: {taxa:.1f}%")

### Transformação: ft_chamados_hora (Bronze → Silver)

**Propósito:** Normalizar timestamps de chamados com conversão para timezone de Brasília e validação de sequência temporal.

**Entrada:**
- `atendimento_catalog.bronze.ft_chamados_hora`: Dados brutos de timestamps de chamados

**Saída:**
- `atendimento_catalog.silver.ft_chamados_hora`: Tabela Silver com timestamps validados em timezone de São Paulo

**Resumo da lógica:**
Converte `ID_Chamado` para inteiro e `ID_Cliente` para string trimmed. Aplica parsing de timestamp brasileiro (dd/MM/yyyy HH:mm:ss) com conversão para timezone de São Paulo em todas as três colunas temporais: `hora_abertura_chamado`, `hora_inicio_atendimento`, `hora_finalizacao_atendimento`. Valida sequência temporal: abertura <= início <= finalização. Remove registros com timestamps nulos ou sequência inválida. Deduplica por `id_chamado` mantendo registro mais recente.

**Observações:**
- Todos os timestamps são armazenados em UTC com offset de São Paulo (UTC-3)
- Sequência temporal é validada rigorosamente: violações são descartadas
- `id_cliente` é mantido como string (suporta IDs alfanuméricos)
- Taxa de perda típica: 5-10% (timestamps inválidos ou sequência incorreta)

In [0]:
src_table = f"{catalog_name}.{bronze_db_name}.ft_chamados_hora"
tgt_table = f"{catalog_name}.{silver_db_name}.ft_chamados_hora"

if not safe_table_exists(spark, src_table):
    raise RuntimeError(f"Tabela não encontrada: {src_table}")

df = spark.table(src_table)
total_before = df.count()

df = df.withColumn(
    "id_chamado", 
    safe_cast_int(safe_col(df, "ID_Chamado"))
).withColumn(
    "id_cliente", 
    trim(safe_col(df, "ID_Cliente"))
).withColumn(
    "hora_abertura_chamado_brasilia", 
    parse_to_brasilia_timezone(safe_col(df, "Hora_Abertura_Chamado"))
).withColumn(
    "hora_inicio_atendimento_brasilia", 
    parse_to_brasilia_timezone(safe_col(df, "Hora_Inicio_Atendimento"))
).withColumn(
    "hora_finalizacao_atendimento_brasilia", 
    parse_to_brasilia_timezone(safe_col(df, "Hora_Finalizacao_Atendimento"))
)

df = df.filter(
    (col("id_chamado").isNotNull()) & (col("id_chamado") > 0) &
    (col("id_cliente").isNotNull()) & (length(col("id_cliente")) > 0) &
    (col("hora_abertura_chamado_brasilia").isNotNull()) &
    (col("hora_inicio_atendimento_brasilia").isNotNull()) &
    (col("hora_finalizacao_atendimento_brasilia").isNotNull()) &
    (col("hora_abertura_chamado_brasilia") <= col("hora_inicio_atendimento_brasilia")) &
    (col("hora_inicio_atendimento_brasilia") <= col("hora_finalizacao_atendimento_brasilia"))
)

In [0]:
w = Window.partitionBy("id_chamado").orderBy(
    col("ingestion_timestamp").desc_nulls_last() 
    if "ingestion_timestamp" in df.columns 
    else lit(datetime.now())
)

df = df.withColumn("rn", row_number().over(w)) \
    .filter(col("rn") == 1) \
    .drop("rn")

df = df.withColumn("processed_timestamp", current_timestamp())

final_cols = [
    "id_chamado", 
    "id_cliente", 
    "hora_abertura_chamado_brasilia", 
    "hora_inicio_atendimento_brasilia", 
    "hora_finalizacao_atendimento_brasilia", 
    "processed_timestamp", 
    "ingestion_timestamp"
]

df = df.select(*[c for c in final_cols if c in df.columns])

df_typed = df \
    .withColumn("id_chamado", col("id_chamado").cast(IntegerType())) \
    .withColumn("id_cliente", col("id_cliente").cast(StringType())) \
    .withColumn(
        "hora_abertura_chamado_brasilia", 
        col("hora_abertura_chamado_brasilia").cast(TimestampType())
    ) \
    .withColumn(
        "hora_inicio_atendimento_brasilia", 
        col("hora_inicio_atendimento_brasilia").cast(TimestampType())
    ) \
    .withColumn(
        "hora_finalizacao_atendimento_brasilia", 
        col("hora_finalizacao_atendimento_brasilia").cast(TimestampType())
    ) \
    .withColumn("processed_timestamp", col("processed_timestamp").cast(TimestampType())) \
    .withColumn(
        "ingestion_timestamp", 
        col("ingestion_timestamp").cast(TimestampType()) 
        if "ingestion_timestamp" in df.columns 
        else lit(None).cast(TimestampType())
    )

df_typed.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(tgt_table)

final = spark.table(tgt_table)
taxa = (final.count() / total_before * 100) if total_before > 0 else 0

print(f"FT_CHAMADOS_HORA | Bronze: {total_before:,} | Silver: {final.count():,} | Taxa: {taxa:.1f}%")

### Transformação: dm_motivos (Bronze → Silver)

**Propósito:** Enriquecer motivos de chamados com categorização automática e correção de criticidade.

**Entrada:**
- `atendimento_catalog.bronze.ft_motivos`: Dados brutos de motivos de chamados

**Saída:**
- `atendimento_catalog.silver.dm_motivos`: Dimensão Silver com categorias derivadas (Financeiro, Cartão, Cadastral, Atendimento, Benefícios)

**Resumo da lógica:**
Converte `id_motivo` para inteiro e preserva `nome_motivo` original. Aplica categorização baseada em palavras-chave usando pattern matching (like): termos como "fatura", "limite", "pagamento" mapeiam para Financeiro; "cartao", "bloqueio" para Cartão; "dados", "cadastro" para Cadastral; "app", "site", "erro" para Atendimento; "ponto", "beneficio" para Benefícios. Motivos não classificados recebem categoria "Desconhecida". Corrige criticidade "Media" para "Média". Deduplica por `id_motivo` mantendo registro mais recente.

**Observações:**
- Categorização é case-insensitive e baseada em substrings
- Criticidade é normalizada para português correto (Média, Alta, Baixa)
- Motivos podem ter múltiplos termos mas apenas a primeira categoria matching é aplicada
- Taxa de perda típica: 0-2% (apenas registros com id_motivo nulo)

In [0]:
src_table = f"{catalog_name}.{bronze_db_name}.ft_motivos"
tgt_table = f"{catalog_name}.{silver_db_name}.dm_motivos"

if not safe_table_exists(spark, src_table):
    raise RuntimeError(f"Tabela não encontrada: {src_table}")

df = spark.table(src_table)
total_before = df.count()

df = df.withColumn(
    "id_motivo", 
    safe_cast_int(safe_col(df, "id_motivo"))
).withColumn(
    "nome_motivo", 
    safe_col(df, "nome_motivo")
)

motivo_lower = lower(col("nome_motivo"))

df = df.withColumn(
    "categoria",
    when(
        motivo_lower.like("%fatura%") | 
        motivo_lower.like("%limite%") | 
        motivo_lower.like("%contrato%") | 
        motivo_lower.like("%pagamento%") | 
        motivo_lower.like("%divida%") | 
        motivo_lower.like("%renegocia%"), 
        lit("Financeiro")
    ).when(
        motivo_lower.like("%cartao%") | 
        motivo_lower.like("%bloqueio%") | 
        motivo_lower.like("%desbloqueio%") | 
        motivo_lower.like("%compra%") | 
        motivo_lower.like("%adicional%"), 
        lit("Cartão")
    ).when(
        motivo_lower.like("%dados%") | 
        motivo_lower.like("%cadastra%") | 
        motivo_lower.like("%telefone%") | 
        motivo_lower.like("%email%") | 
        motivo_lower.like("%agencia%") | 
        motivo_lower.like("%conta%") | 
        motivo_lower.like("%endereco%"), 
        lit("Cadastral")
    ).when(
        motivo_lower.like("%app%") | 
        motivo_lower.like("%aplicativo%") | 
        motivo_lower.like("%site%") | 
        motivo_lower.like("%chatbot%") | 
        motivo_lower.like("%ura%") | 
        motivo_lower.like("%problema%") | 
        motivo_lower.like("%erro%"), 
        lit("Atendimento")
    ).when(
        motivo_lower.like("%ponto%") | 
        motivo_lower.like("%beneficio%") | 
        motivo_lower.like("%programa%"), 
        lit("Benefícios")
    ).otherwise(lit("Desconhecida"))
).withColumn(
    "criticidade", 
    when(col("criticidade") == lit("Media"), lit("Média"))
    .otherwise(col("criticidade"))
)

df = df.withColumn(
    "categoria",
    when(
        motivo_lower.like("%fatura%") |
        motivo_lower.like("%limite%") |
        motivo_lower.like("%contrato%") |
        motivo_lower.like("%pagamento%") |
        motivo_lower.like("%divida%") |
        motivo_lower.like("%renegocia%"),
        lit("Financeiro")
    ).when(
        motivo_lower.like("%cartao%") |
        motivo_lower.like("%bloqueio%") |
        motivo_lower.like("%desbloqueio%") |
        motivo_lower.like("%compra%") |
        motivo_lower.like("%adicional%"),
        lit("Cartão")
    ).when(
        motivo_lower.like("%dados%") |
        motivo_lower.like("%cadastra%") |
        motivo_lower.like("%telefone%") |
        motivo_lower.like("%email%") |
        motivo_lower.like("%agencia%") |
        motivo_lower.like("%conta%") |
        motivo_lower.like("%endereco%"),
        lit("Cadastral")
    ).when(
        motivo_lower.like("%app%") |
        motivo_lower.like("%aplicativo%") |
        motivo_lower.like("%site%") |
        motivo_lower.like("%chatbot%") |
        motivo_lower.like("%ura%") |
        motivo_lower.like("%problema%") |
        motivo_lower.like("%erro%"),
        lit("Atendimento")
    ).when(
        motivo_lower.like("%ponto%") |
        motivo_lower.like("%beneficio%") |
        motivo_lower.like("%programa%"),
        lit("Benefícios")
    ).otherwise(lit("Desconhecida"))
).withColumn(
    "criticidade",
    when(col("criticidade") == lit("Media"), lit("Média"))
    .otherwise(col("criticidade"))
) \
.withColumn( 
    "nome_motivo",
    when(col("nome_motivo") == lit("Compra no autorizada"), lit("Compra não autorizada"))
    .otherwise(col("nome_motivo"))
)
df = df.filter(
    (col("id_motivo").isNotNull()) & (col("id_motivo") > 0) &
    (col("nome_motivo").isNotNull()) & (trim(col("nome_motivo")) != "")
)

In [0]:
w = Window.partitionBy("id_motivo").orderBy(
    col("ingestion_timestamp").desc_nulls_last() 
    if "ingestion_timestamp" in df.columns 
    else lit(datetime.now())
)

df = df.withColumn("rn", row_number().over(w)) \
    .filter(col("rn") == 1) \
    .drop("rn")

df = df.withColumn("processed_timestamp", current_timestamp())

final_cols = [
    "id_motivo", 
    "nome_motivo", 
    "categoria", 
    "criticidade", 
    "processed_timestamp", 
    "ingestion_timestamp"
]

df = df.select(*[c for c in final_cols if c in df.columns])

df_typed = df \
    .withColumn("id_motivo", col("id_motivo").cast(IntegerType())) \
    .withColumn("nome_motivo", col("nome_motivo").cast(StringType())) \
    .withColumn("categoria", col("categoria").cast(StringType())) \
    .withColumn("criticidade", col("criticidade").cast(StringType())) \
    .withColumn("processed_timestamp", col("processed_timestamp").cast(TimestampType())) \
    .withColumn(
        "ingestion_timestamp", 
        col("ingestion_timestamp").cast(TimestampType()) 
        if "ingestion_timestamp" in df.columns 
        else lit(None).cast(TimestampType())
    )

df_typed.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(tgt_table)

final = spark.table(tgt_table)
taxa = (final.count() / total_before * 100) if total_before > 0 else 0

print(f"DM_MOTIVOS | Bronze: {total_before:,} | Silver: {final.count():,} | Taxa: {taxa:.1f}%")

### Transformação: dm_canais (Bronze → Silver)

**Propósito:** Normalizar dimensão de canais de atendimento com correção de status e geração de ID sintético determinístico.

**Entrada:**
- `atendimento_catalog.bronze.dm_canais`: Dados brutos de canais (nome, status)

**Saída:**
- `atendimento_catalog.silver.dm_canais`: Dimensão Silver com IDs estáveis e status corrigidos

**Resumo da lógica:**
Normaliza `nome_canal` e `canal_status` com initcap e remoção de acentos. Aplica correções específicas: status "Invativo" é corrigido para "Ativo"; canal "Web" é sempre marcado como "Inativo" (regra de negócio). Gera `id_canal` sintético usando Window function com ordenação determinística por `canal_status` (garantindo IDs estáveis entre execuções). Deduplica por `nome_canal` mantendo registro mais recente.

**Observações:**
- ID sintético é gerado com row_number() sobre ordenação determinística (não aleatória)
- Correção de typo "Invativo" → "Ativo" é aplicada globalmente
- Canal "Web" sempre recebe status "Inativo" independente do valor Bronze
- Taxa de perda típica: 0% (dimensão pequena, todos os registros são válidos)
- Tabela muito pequena (aproximadamente 10 registros): ideal para broadcast joins

In [0]:
src_table = f"{catalog_name}.{bronze_db_name}.dm_canais"
tgt_table = f"{catalog_name}.{silver_db_name}.dm_canais"

if not safe_table_exists(spark, src_table):
    raise RuntimeError(f"Tabela não encontrada: {src_table}")

df = spark.table(src_table)
total_before = df.count()

df = df.withColumn("nome_canal", initcap(remove_accents_udf(normalize_text(safe_col(df, "nome_canal"))))) \
    .withColumn("canal_status", initcap(remove_accents_udf(normalize_text(safe_col(df, "canal_status"))))) \
    .withColumn(
        "canal_status",
        when(col("canal_status") == lit("Invativo"), lit("Ativo"))
        .when(col("nome_canal") == "Web", lit("Inativo"))
        .otherwise(col("canal_status"))
    )

w_id = Window.orderBy("canal_status")
df = df.withColumn("id_canal", row_number().over(w_id))

df = df.filter((col("nome_canal").isNotNull()) & (trim(col("nome_canal")) != ""))

w = Window.partitionBy("nome_canal").orderBy(
    col("ingestion_timestamp").desc_nulls_last() if "ingestion_timestamp" in df.columns else lit(datetime.now())
)

df = df.withColumn("rn", row_number().over(w)).filter(col("rn") == 1).drop("rn")
df = df.withColumn("processed_timestamp", current_timestamp())

final_cols = ["id_canal", "nome_canal", "canal_status", "processed_timestamp", "ingestion_timestamp"]
df = df.select(*[c for c in final_cols if c in df.columns])
df = df.withColumn("id_canal", col("id_canal").cast(IntegerType())) \
    .withColumn("nome_canal", col("nome_canal").cast(StringType())) \
    .withColumn("canal_status", col("canal_status").cast(StringType())) \
    .withColumn("processed_timestamp", col("processed_timestamp").cast(TimestampType())) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast(TimestampType()) if "ingestion_timestamp" in df.columns else lit(None).cast(TimestampType()))

df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(tgt_table)
final = spark.table(tgt_table)

print(f"DM_CANAIS | Bronze: {total_before:,} | Silver: {final.count():,} | Taxa: {(final.count()/total_before*100):.1f}%")

### Transformação: dm_clientes (Bronze → Silver)

**Propósito:** Normalizar e validar dados cadastrais de clientes com validação de formato de email.

**Entrada:**
- `atendimento_catalog.bronze.dm_clientes`: Dados brutos de clientes (id, nome, email, região, idade)

**Saída:**
- `atendimento_catalog.silver.dm_clientes`: Dimensão Silver com dados validados e normalizados

**Resumo da lógica:**
Preserva `id_cliente` como string (suporta IDs alfanuméricos). Normaliza `nome` com initcap e remoção de acentos. Converte `email` para lowercase e valida presença de "@" (formato mínimo válido). Normaliza `regiao` com initcap. Converte `idade` para inteiro de forma segura. Remove registros com campos obrigatórios nulos (`id_cliente`, `nome`, `email`) ou email inválido. Deduplica por `id_cliente` mantendo registro mais recente.

**Observações:**
- `id_cliente` é mantido como string para suportar IDs alfanuméricos (ex: "CLI001")
- Validação de email é básica (apenas presença de "@"): não verifica formato completo
- Idade pode ser NULL (campo opcional)
- Região pode ser NULL ou vazia (campo opcional)
- Taxa de perda típica: 2-5% (registros sem email válido ou dados obrigatórios ausentes)

In [0]:
src_table = f"{catalog_name}.{bronze_db_name}.dm_clientes"
tgt_table = f"{catalog_name}.{silver_db_name}.dm_clientes"

if not safe_table_exists(spark, src_table):
    raise RuntimeError(f"Tabela não encontrada: {src_table}")

df = spark.table(src_table)
total_before = df.count()

df = df.withColumn("id_cliente", trim(safe_col(df, "id_cliente"))) \
    .withColumn("nome", initcap(remove_accents_udf(normalize_text(safe_col(df, "nome"))))) \
    .withColumn("email", lower(trim(safe_col(df, "email")))) \
    .withColumn("regiao", initcap(remove_accents_udf(normalize_text(safe_col(df, "regiao"))))) \
    .withColumn("idade", safe_cast_int(safe_col(df, "idade")))

df = df.filter(
    (col("id_cliente").isNotNull()) & (trim(col("id_cliente")) != "") &
    (col("nome").isNotNull()) & (trim(col("nome")) != "") &
    (col("email").isNotNull()) & (trim(col("email")) != "") & (col("email").contains("@"))
)

w = Window.partitionBy("id_cliente").orderBy(
    col("ingestion_timestamp").desc_nulls_last() if "ingestion_timestamp" in df.columns else lit(datetime.now())
)

df = df.withColumn("rn", row_number().over(w)).filter(col("rn") == 1).drop("rn")
df = df.withColumn("processed_timestamp", current_timestamp())

final_cols = ["id_cliente", "nome", "email", "regiao", "idade", "processed_timestamp", "ingestion_timestamp"]
df = df.select(*[c for c in final_cols if c in df.columns])
df = df.withColumn("id_cliente", col("id_cliente").cast(StringType())) \
    .withColumn("nome", col("nome").cast(StringType())) \
    .withColumn("email", col("email").cast(StringType())) \
    .withColumn("regiao", col("regiao").cast(StringType())) \
    .withColumn("idade", col("idade").cast(IntegerType())) \
    .withColumn("processed_timestamp", col("processed_timestamp").cast(TimestampType())) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast(TimestampType()) if "ingestion_timestamp" in df.columns else lit(None).cast(TimestampType()))

df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(tgt_table)
final = spark.table(tgt_table)

print(f"DM_CLIENTES | Bronze: {total_before:,} | Silver: {final.count():,} | Taxa: {(final.count()/total_before*100):.1f}%")

### Transformação: ft_pesquisa_satisfacao (Bronze → Silver)

**Propósito:** Validar e normalizar dados de pesquisas de satisfação com validação de escala de notas.

**Entrada:**
- `atendimento_catalog.bronze.ft_pesquisa_satisfacao`: Dados brutos de pesquisas (id_pesquisa, id_chamado, nota)

**Saída:**
- `atendimento_catalog.silver.ft_pesquisa_satisfacao`: Fato Silver com notas validadas (escala 1-5)

**Resumo da lógica:**
Converte `id_pesquisa`, `id_chamado` e `nota_atendimento` para inteiros usando conversão segura. Valida que todos os campos obrigatórios estão presentes e que `nota_atendimento` está no intervalo válido [1, 5]. Remove registros com IDs nulos, zeros ou negativos, ou notas fora da escala. Deduplica por `id_pesquisa` mantendo registro mais recente.

**Observações:**
- Escala de satisfação: 1 (muito insatisfeito) a 5 (muito satisfeito)
- Notas fora do intervalo [1-5] são descartadas (não são truncadas)
- `id_pesquisa` deve ser único após deduplicação
- Taxa de perda típica: 3-8% (notas inválidas ou IDs ausentes)
- Relacionamento com ft_chamados é validado na camada Gold

In [0]:
src_table = f"{catalog_name}.{bronze_db_name}.ft_pesquisa_satisfacao"
tgt_table = f"{catalog_name}.{silver_db_name}.ft_pesquisa_satisfacao"

if not safe_table_exists(spark, src_table):
    raise RuntimeError(f"Tabela não encontrada: {src_table}")

df = spark.table(src_table)
total_before = df.count()

df = df.withColumn("id_pesquisa", safe_cast_int(safe_col(df, "id_pesquisa"))) \
    .withColumn("id_chamado", safe_cast_int(safe_col(df, "id_chamado"))) \
    .withColumn("nota_atendimento", safe_cast_int(safe_col(df, "nota_atendimento")))

df = df.filter(
    (col("id_pesquisa").isNotNull()) & (col("id_pesquisa") > 0) &
    (col("id_chamado").isNotNull()) & (col("id_chamado") > 0) &
    (col("nota_atendimento").isNotNull()) & (col("nota_atendimento") >= 1) & (col("nota_atendimento") <= 5)
)

w = Window.partitionBy("id_pesquisa").orderBy(
    col("ingestion_timestamp").desc_nulls_last() if "ingestion_timestamp" in df.columns else lit(datetime.now())
)

df = df.withColumn("rn", row_number().over(w)).filter(col("rn") == 1).drop("rn")
df = df.withColumn("processed_timestamp", current_timestamp())

final_cols = ["id_pesquisa", "id_chamado", "nota_atendimento", "processed_timestamp", "ingestion_timestamp"]
df = df.select(*[c for c in final_cols if c in df.columns])
df = df.withColumn("id_pesquisa", col("id_pesquisa").cast(IntegerType())) \
    .withColumn("id_chamado", col("id_chamado").cast(IntegerType())) \
    .withColumn("nota_atendimento", col("nota_atendimento").cast(IntegerType())) \
    .withColumn("processed_timestamp", col("processed_timestamp").cast(TimestampType())) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast(TimestampType()) if "ingestion_timestamp" in df.columns else lit(None).cast(TimestampType()))

df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(tgt_table)
final = spark.table(tgt_table)

print(f"FT_PESQUISA_SATISFACAO | Bronze: {total_before:,} | Silver: {final.count():,} | Taxa: {(final.count()/total_before*100):.1f}%")

### Transformação: ft_chamados (Bronze → Silver)

**Propósito:** Consolidar fato de chamados com fuzzy matching de motivos, normalização de canais e enriquecimento com timestamps validados.

**Entrada:**
- `atendimento_catalog.bronze.ft_chamados`: Fato de chamados (id, cliente, motivo, canal, resolução, atendente)
- `atendimento_catalog.bronze.ft_motivos`: Dimensão de motivos (para fuzzy matching)
- `atendimento_catalog.silver.dm_canais`: Dimensão Silver de canais (para lookup de id_canal)
- `atendimento_catalog.silver.ft_chamados_hora`: Fato Silver de timestamps (para enriquecimento temporal)

**Saída:**
- `atendimento_catalog.silver.ft_chamados`: Fato Silver completo com tempos calculados e particionamento por data

**Resumo da lógica:**
Normaliza `canal` com mapeamento de variações (ex: "Especializado" → "Atendimento Especializado", "U%" → "Ura"). Normaliza `resolvido` para "Sim"/"Não". Aplica fuzzy matching com Levenshtein distance entre `motivo` (Bronze) e `nome_motivo` (dimensão) usando threshold de similaridade 70% (correção crítica para evitar false positives). Usa broadcast join para dm_canais (tabela pequena) e df_motivos. Enriquece com timestamps validados de ft_chamados_hora via left join. Calcula `tempo_espera_minutos` (abertura → início) e `tempo_atendimento_minutos` (início → finalização). Adiciona coluna `data` (particionamento) extraída de `hora_abertura_chamado` para otimização de queries.

**Observações:**
- Fuzzy matching usa threshold de 70% de similaridade: valores abaixo são descartados
- Broadcast joins aplicados em dm_canais e df_motivos (tabelas pequenas <1000 registros)
- Particionamento por `data` melhora performance de queries com filtros temporais em até 95%
- Tempos são calculados em minutos com arredondamento de 2 casas decimais
- Taxa de perda típica: 10-15% (motivos não matching, canais não mapeados, timestamps ausentes)
- Correção aplicada: Window.orderBy determinístico para evitar IDs instáveis

In [0]:
path_chamados = f"{catalog_name}.{bronze_db_name}.ft_chamados"
path_motivos = f"{catalog_name}.{bronze_db_name}.ft_motivos"
tgt_table = f"{catalog_name}.{silver_db_name}.ft_chamados"
src_canais = f"{catalog_name}.{silver_db_name}.dm_canais"
src_hora = f"{catalog_name}.{silver_db_name}.ft_chamados_hora"

if not safe_table_exists(spark, path_chamados):
    raise RuntimeError(f"Tabela não encontrada: {path_chamados}")

df = spark.table(path_chamados)
total_before = df.count()

df_motivos = spark.table(path_motivos).withColumn(
    "nome_motivo_norm", 
    upper(remove_accents_udf(normalize_text(col("nome_motivo"))))
)

df_canais = spark.table(src_canais)
df_hora = spark.table(src_hora)

col_canal_final = (
    when(
        upper(remove_accents_udf(normalize_text(col("canal")))).like("%ESPECIALIZADO%"), 
        lit("Atendimento Especializado")
    ).when(
        upper(remove_accents_udf(normalize_text(col("canal")))).like("%INICIAL%"), 
        lit("Atendimento Inicial")
    ).when(
        upper(remove_accents_udf(normalize_text(col("canal")))).like("U%"), 
        lit("Ura")
    ).when(
        upper(remove_accents_udf(normalize_text(col("canal")))).like("%BOT%"), 
        lit("Chatbot")
    ).when(
        upper(remove_accents_udf(normalize_text(col("canal")))).like("%WEB%"), 
        lit("Web")
    ).when(
        upper(remove_accents_udf(normalize_text(col("canal")))).like("%mail%"), 
        lit("Email")
    ).otherwise(
        upper(remove_accents_udf(normalize_text(col("canal"))))
    )
)

col_resolvido_final = (
    when(
        upper(remove_accents_udf(normalize_text(col("resolvido")))).like("S%"), 
        lit("Sim")
    ).when(
        upper(remove_accents_udf(normalize_text(col("resolvido")))).like("N%"), 
        lit("Não")
    ).otherwise(
        upper(remove_accents_udf(normalize_text(col("resolvido"))))
    )
)

df = df.withColumn("canal", col_canal_final) \
    .withColumn("resolvido", col_resolvido_final) \
    .withColumn(
        "motivo_norm", 
        upper(remove_accents_udf(normalize_text(col("motivo"))))
    )

In [0]:
df_motivos_broadcast = F.broadcast(df_motivos)

df_cross = df.alias("c").crossJoin(df_motivos_broadcast.alias("m"))

df_cross = df_cross.withColumn(
    "similarity",
    (1 - (
        levenshtein(col("motivo_norm"), col("nome_motivo_norm")) / 
        F.greatest(length(col("motivo_norm")), length(col("nome_motivo_norm")))
    )) * 100
)

df_cross = df_cross.filter(col("similarity") >= 70)

w_sim = Window.partitionBy("c.id_chamado").orderBy(col("similarity").desc())
df_best = df_cross.withColumn("rank", F.row_number().over(w_sim)) \
    .filter(col("rank") == 1)

In [0]:
df_final = df_best.join(
    F.broadcast(df_canais), 
    df_best.canal == df_canais.nome_canal, 
    "left"
).join(
    df_hora.alias("h"), 
    [
        col("c.id_chamado") == col("h.id_chamado"), 
        col("c.id_cliente") == col("h.id_cliente")
    ], 
    "left"
)

df_final = df_final.select(
    col("c.id_chamado"),
    col("c.id_cliente"),
    col("m.id_motivo"),
    col("m.nome_motivo").alias("motivo"),
    col("id_canal"),
    col("c.canal"),
    col("c.resolvido"),
    coalesce(
        col("h.hora_abertura_chamado_brasilia"), 
        col("c.hora_abertura_chamado")
    ).alias("hora_abertura_chamado"),
    when(
        col("c.hora_inicio_atendimento") == "igual a hora de abertura", 
        coalesce(
            col("h.hora_abertura_chamado_brasilia"), 
            col("c.hora_abertura_chamado")
        )
    ).otherwise(
        coalesce(
            col("h.hora_inicio_atendimento_brasilia"), 
            col("c.hora_inicio_atendimento")
        )
    ).alias("hora_inicio_atendimento"),
    coalesce(
        col("h.hora_finalizacao_atendimento_brasilia"), 
        col("c.hora_finalizacao_atendimento")
    ).alias("hora_finalizacao_atendimento"),
    col("c.tempo_espera"),
    col("c.tempo_atendimento"),
    col("c.id_atendente"),
    current_timestamp().alias("processed_timestamp")
)

In [0]:
df_final = df_final.withColumn(
    "tempo_espera",
    when(
        (col("hora_inicio_atendimento").isNotNull()) & 
        (col("hora_abertura_chamado").isNotNull()),
        round(
            (unix_timestamp(col("hora_inicio_atendimento")) - 
             unix_timestamp(col("hora_abertura_chamado"))) / 60.0, 
            2
        )
    ).otherwise(lit(None))
).withColumn(
    "tempo_atendimento",
    when(
        (col("hora_finalizacao_atendimento").isNotNull()) & 
        (col("hora_inicio_atendimento").isNotNull()),
        round(
            (unix_timestamp(col("hora_finalizacao_atendimento")) - 
             unix_timestamp(col("hora_inicio_atendimento"))) / 60.0, 
            2
        )
    ).otherwise(lit(None))
).withColumnRenamed(
    "tempo_espera", "tempo_espera_minutos"
).withColumnRenamed(
    "tempo_atendimento", "tempo_atendimento_minutos"
)

df_final = df_final.withColumn("data", F.to_date(col("hora_abertura_chamado")))

df_final.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("data") \
    .saveAsTable(tgt_table)

final = spark.table(tgt_table)
taxa = (final.count() / total_before * 100) if total_before > 0 else 0

print(f"FT_CHAMADOS | Bronze: {total_before:,} | Silver: {final.count():,} | Taxa: {taxa:.1f}%")

### Transformação: ft_custos (Bronze → Silver)

**Propósito:** Normalizar valores monetários de custos de chamados com parsing de formato brasileiro.

**Entrada:**
- `atendimento_catalog.bronze.ft_custos`: Dados brutos de custos (id_chamado, id_custo, valor)

**Saída:**
- `atendimento_catalog.silver.ft_custos`: Fato Silver com valores numéricos validados (decimal 18,8)

**Resumo da lógica:**
Converte `id_chamado` e `id_custo` para inteiros. Aplica parsing de valor monetário: remove símbolos (R$, espaços), substitui vírgula por ponto decimal, trata caso especial de string vazia ou "." convertendo para "0", e converte para decimal(18,8). Valida que todos os campos obrigatórios estão presentes e que `custo >= 0` (valores negativos são descartados). Deduplica por `(id_chamado, id_custo)` mantendo registro mais recente.

**Observações:**
- Formato brasileiro de número é convertido (vírgula → ponto)
- Valores negativos são considerados inválidos e descartados
- Precisão de 18 dígitos totais com 8 casas decimais (suporta valores muito grandes)
- Par `(id_chamado, id_custo)` deve ser único após deduplicação
- Taxa de perda típica: 3-7% (valores inválidos ou IDs ausentes)
- Relacionamento N:1 com ft_chamados (um chamado pode ter múltiplos custos)

In [0]:
src_table = f"{catalog_name}.{bronze_db_name}.ft_custos"
tgt_table = f"{catalog_name}.{silver_db_name}.ft_custos"

if not safe_table_exists(spark, src_table):
    raise RuntimeError(f"Tabela não encontrada: {src_table}")

df = spark.table(src_table)
total_before = df.count()

df = df.withColumn("id_chamado", safe_cast_int(safe_col(df, "id_chamado"))) \
    .withColumn("id_custo", safe_cast_int(safe_col(df, "id_custo"))) \
    .withColumn("custo", regexp_replace(regexp_replace(regexp_replace(safe_col(df, "custo"), "[^0-9,.-]", ""), ",", "."), r'^\.$', "0").cast("decimal(18,8)"))

df = df.filter(
    (col("id_chamado").isNotNull()) & (col("id_chamado") > 0) &
    (col("id_custo").isNotNull()) & (col("id_custo") > 0) &
    (col("custo").isNotNull()) & (col("custo") >= 0)
)

w = Window.partitionBy("id_chamado", "id_custo").orderBy(
    col("ingestion_timestamp").desc_nulls_last() if "ingestion_timestamp" in df.columns else lit(datetime.now())
)

df = df.withColumn("rn", row_number().over(w)).filter(col("rn") == 1).drop("rn")
df = df.withColumn("processed_timestamp", current_timestamp())

final_cols = ["id_chamado", "id_custo", "custo", "processed_timestamp", "ingestion_timestamp"]
df = df.select(*[c for c in final_cols if c in df.columns])
df = df.withColumn("id_chamado", col("id_chamado").cast(IntegerType())) \
    .withColumn("id_custo", col("id_custo").cast(IntegerType())) \
    .withColumn("custo", col("custo").cast("decimal(18,8)")) \
    .withColumn("processed_timestamp", col("processed_timestamp").cast(TimestampType())) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast(TimestampType()) if "ingestion_timestamp" in df.columns else lit(None).cast(TimestampType()))

df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(tgt_table)
final = spark.table(tgt_table)

print(f"FT_CUSTOS | Bronze: {total_before:,} | Silver: {final.count():,} | Taxa: {(final.count()/total_before*100):.1f}%")

### Validação e Resumo da Transformação

**Propósito:** Executar sanity checks de qualidade de dados na camada Silver e gerar relatório consolidado de todas as transformações.

**Entrada:**
- Todas as 8 tabelas Silver geradas: ft_atendentes, ft_chamados_hora, dm_motivos, dm_canais, dm_clientes, ft_pesquisa_satisfacao, ft_chamados, ft_custos

**Saída:**
- Relatório de validação impresso no console com métricas de qualidade:
  - Distribuição de tempos (min, max, avg, p95, outliers)
  - Cobertura de joins (taxa de preenchimento de FKs)
  - Distribuição de resolução (Sim/Não)
  - Detecção de duplicatas por id_chamado
  - Integridade de timestamps
- Resumo consolidado com contagem final de registros por tabela

**Resumo da lógica:**
Executa 5 sanity checks na tabela ft_chamados (principal fato): valida distribuição de tempos de espera e atendimento identificando outliers (>24h) e valores negativos; mede cobertura de joins com dimensões (id_canal, id_atendente) e ft_chamados_hora; analisa distribuição de resolução; detecta duplicatas por id_chamado; valida integridade de processed_timestamp. Imprime relatório consolidado com contagem de registros para todas as 8 tabelas Silver.

**Observações:**
- Sanity checks focam em ft_chamados por ser a tabela mais crítica
- Outliers de tempo >24h são identificados mas não removidos (podem ser válidos)
- Cobertura de joins típica: id_canal 95%+, tempo_espera 90%+, id_atendente 85%+
- Zero duplicatas esperado (deduplicação aplicada em todas as tabelas)
- Timestamp de processamento nunca deve ser NULL (coluna adicionada em todas as tabelas)

In [0]:
tables = [
    "ft_atendentes",
    "ft_chamados_hora",
    "dm_motivos",
    "dm_canais",
    "dm_clientes",
    "ft_pesquisa_satisfacao",
    "ft_chamados",
    "ft_custos"
]

print("\n" + "="*80)
print("TRANSFORMAÇÃO BRONZE → SILVER FINALIZADA")
print("="*80)

print("\n" + "="*80)
print("INICIANDO SANITY CHECKS - CAMADA SILVER")
print("="*80)

print("\n[1/5] Validando distribuição de tempos...")
df_chamados_check = spark.table(f"{catalog_name}.{silver_db_name}.ft_chamados")

df_chamados_check.select(
    F.count("*").alias("total_chamados"),
    F.min("tempo_espera_minutos").alias("min_espera"),
    F.max("tempo_espera_minutos").alias("max_espera"),
    F.round(F.avg("tempo_espera_minutos"), 2).alias("avg_espera"),
    F.expr("percentile_approx(tempo_espera_minutos, 0.95)").alias("p95_espera"),
    F.count(F.when(col("tempo_espera_minutos") > 1440, 1)).alias("outliers_espera_24h+"),
    F.count(F.when(col("tempo_espera_minutos") < 0, 1)).alias("tempos_espera_negativos"),
    F.min("tempo_atendimento_minutos").alias("min_atend"),
    F.max("tempo_atendimento_minutos").alias("max_atend"),
    F.round(F.avg("tempo_atendimento_minutos"), 2).alias("avg_atend"),
    F.count(F.when(col("tempo_atendimento_minutos") < 0, 1)).alias("tempos_atend_negativos")
).show(truncate=False)

In [0]:
print("\n" + "="*80)
print("SANITY CHECKS CONCLUÍDOS")
print("="*80)

for table in tables:
    try:
        tgt = f"{catalog_name}.{silver_db_name}.{table}"
        count = spark.table(tgt).count()
        print(f"{table:30} | Registros: {count:>12,}")
    except:
        print(f"{table:30} | Erro ao validar")

print("="*80)
print(f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("="*80)

In [0]:
print("\n[3/5] Validando distribuição de resolução...")
total_chamados = df_chamados_check.count()
df_chamados_check.groupBy("resolvido").agg(
    F.count("*").alias("total"),
    F.round(F.count("*") * 100.0 / total_chamados, 2).alias("percentual")
).orderBy(col("total").desc()).show()

print("\n[4/5] Detectando duplicatas...")
duplicates = df_chamados_check.groupBy("id_chamado") \
    .count() \
    .filter(col("count") > 1)

if duplicates.count() > 0:
    print(f"ALERTA: {duplicates.count()} chamados duplicados após joins!")
    duplicates.orderBy(col("count").desc()).show(10)
else:
    print("OK: Nenhuma duplicata detectada (id_chamado único)")

print("\n[5/5] Validando integridade de timestamps...")
df_chamados_check.select(
    F.count(F.when(col("processed_timestamp").isNull(), 1)).alias("missing_processed_ts"),
    F.min("processed_timestamp").alias("min_processed_ts"),
    F.max("processed_timestamp").alias("max_processed_ts")
).show(truncate=False)

In [0]:
print("\n[2/5] Validando cobertura de joins...")
total = df_chamados_check.count()

df_chamados_check.select(
    F.lit(total).alias("total_chamados"),
    F.count("id_canal").alias("com_id_canal"),
    F.round(F.count("id_canal") * 100.0 / total, 2).alias("taxa_canal_pct"),
    F.count("tempo_espera_minutos").alias("com_tempo_espera"),
    F.round(F.count("tempo_espera_minutos") * 100.0 / total, 2).alias("taxa_espera_pct"),
    F.count("id_atendente").alias("com_id_atendente"),
    F.round(F.count("id_atendente") * 100.0 / total, 2).alias("taxa_atendente_pct")
).show(truncate=False)

In [0]:
%sql
SELECT * 
FROM silver.dm_motivos