In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType

In [0]:
# Mapeamento do Mês
mes_map = { 1: "january", 2: "february", 3: "march", 4: "april", 5: "may", 6: "june", 7: "july", 8: "august", 9: "september", 10: "october", 11: "november", 12: "december" }
mes_map_inv = {v: k for k, v in mes_map.items()} # Inverte para Mês (string) -> Número (int)

# Cria mapa Spark a partir do dicionário invertido
MES_SPARK_MAP = F.create_map([F.lit(x) for row in mes_map_inv.items() for x in row])

In [0]:
# Definindo o catálogo e schemas
CATALOGO = "catalogo_energia"
SCHEMA_BRONZE = "bronze"
SCHEMA_SILVER = "silver"

In [0]:
# Lendo as tabelas Delta da camada Bronze
df_geral_bronze = spark.read.table(f"{CATALOGO}.{SCHEMA_BRONZE}.consumo_geral")
df_limpa_bronze = spark.read.table(f"{CATALOGO}.{SCHEMA_BRONZE}.energia_limpa")

print(f"Total de linhas (Geral): {df_geral_bronze.count()}")
print(f"Total de linhas (Limpa): {df_limpa_bronze.count()}")

In [0]:
# 1. FILTRAGEM E SEPARAÇÃO DE DADOS
# O termo '2025' é usado para isolar os dados dos EUA.

termo_exclusao = "2025"

# 1.1 Dados Nacionais (2023 e 2024) - Excluindo '2025'
df_geral_nacional = df_geral_bronze.filter(~F.col("nome_arquivo_origem").contains(termo_exclusao))
df_limpa_nacional = df_limpa_bronze.filter(~F.col("nome_arquivo_origem").contains(termo_exclusao))
print(f"Linhas Nacionais (Geral): {df_geral_nacional.count()}")

# 1.2 Dados EUA/2025 (Para Comparativo) - Incluindo '2025'
df_geral_us = df_geral_bronze.filter(F.col("nome_arquivo_origem").contains(termo_exclusao))
df_limpa_us = df_limpa_bronze.filter(F.col("nome_arquivo_origem").contains(termo_exclusao))
print(f"Linhas EUA (Geral): {df_geral_us.count()}")

In [0]:
#Implementação da Limpeza e Padronização (Função Principal)
def limpar_e_padronizar(df):
    
    # --- 1. Padronização de Colunas de Texto Simples ---
    colunas_texto_chave = ["regiao", "cidade", "bairro"]
    for col in colunas_texto_chave:
        # F.trim: remove espaços antes/depois; F.initcap: 'sao paulo' -> 'Sao Paulo'.
        df = df.withColumn(col, F.initcap(F.trim(F.col(col))))

    # --- 2. Correção Mista e Conversão de Mês ---

    # Garante que a coluna mes_de_referencia seja STRING e lowercase
    df = df.withColumn("mes_de_referencia", F.lower(F.col("mes_de_referencia").cast("string")))

    df = df.withColumn(
        "mes_de_referencia_corrigido",
        F.coalesce(
            # Tenta 1: Se for nome do mês (ex: 'july'), usa o mapa spark para converter para número.
            MES_SPARK_MAP[F.col("mes_de_referencia")],
            
            # Tenta 2: Se o mapa retornar NULL (ou seja, já é um número '12'), tenta converter diretamente a string numérica para Integer.
            F.col("mes_de_referencia").cast(IntegerType())
        )
    )

    # Renomeia e Drop da Coluna Temporária
    df = df.drop("mes_de_referencia")
    df = df.withColumnRenamed("mes_de_referencia_corrigido", "mes_de_referencia")

    # --- 3. Tratamento de Nulos e Conversão para Numérico (DoubleType) ---

    colunas_numericas = ["consumo_em_kwh", "valor_da_conta", "valor_de_imposto"]
    df = df.fillna(0, subset=colunas_numericas)
    
    for col in colunas_numericas:
        # Conversão de Tipo (Cast) - Garantindo DoubleType para precisão
        df = df.withColumn(col, F.col(col).cast(DoubleType()))

    # --- 4. Padronização de Nomes de Cliente (Tradução de Negócio) ---

    df = df.withColumn("tipo_de_cliente", 
             F.when(F.col("tipo_de_cliente") == "Residential", "Residencial")
             .when(F.col("tipo_de_cliente") == "Commercial", "Comercial")
             .otherwise(F.col("tipo_de_cliente")))
        
    return df

# Aplicando a função em ambos os DataFrames (Nacional e EUA)
df_geral_nacional_limpo = limpar_e_padronizar(df_geral_nacional)
df_limpa_nacional_limpo = limpar_e_padronizar(df_limpa_nacional)

df_geral_us_limpo = limpar_e_padronizar(df_geral_us)
df_limpa_us_limpo = limpar_e_padronizar(df_limpa_us)

In [0]:
# --- Criação do Indicador e União ---

# 1. Adicionar o indicador de tipo de consumo (necessário para a análise comparativa)
df_geral_com_tipo = df_geral_limpo.withColumn("tipo_de_consumo", F.lit("Geral"))
df_limpa_com_tipo = df_limpa_limpo.withColumn("tipo_de_consumo", F.lit("Limpa"))

# 2. Unificar os DataFrames (Union).
df_unificado_silver = df_geral_com_tipo.unionByName(df_limpa_com_tipo)

print(f"Total de linhas Unificado (Silver): {df_unificado_silver.count()}")

In [0]:
# US-2.2: Criação da Chave de Negócio Única (BK)
# Combinação de localização, período e tipo de cliente.
df_silver_final = df_unificado_silver.withColumn(
    "chave_negocio_bk",
    F.sha2(
        # Concatena os campos chave com um separador '_'
        F.concat_ws("_", 
            F.col("regiao"), 
            F.col("cidade"), 
            F.col("bairro"), 
            F.col("mes_de_referencia"), 
            F.col("tipo_de_cliente"),
            F.col("tipo_de_consumo")
        ), 
        256 # Algoritmo SHA-256
    )
)

In [0]:
# Persistência na Camada Silver
df_silver_final.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{CATALOGO}.{SCHEMA_SILVER}.energia_consumo_silver")

print(f"Dados salvos com sucesso na tabela {CATALOGO}.{SCHEMA_SILVER}.energia_consumo_silver")

# 02 - Processamento da Camada SILVER (Limpeza e Unificação)

## Objetivo
Transformar os dados brutos (Bronze) em um conjunto de dados limpo, padronizado, unificado e rastreável, pronto para ser agregado na camada Gold.

## User Stories Concluídas
* **US-2.1:** Limpeza e Padronização.
* **US-2.2:** Criação da Chave de Negócio e Persistência.
* **US-2.3:** Unificação e Enriquecimento.
* **US-2.4:** Documentação do Processo Silver.

## Transformações Aplicadas
1.  **Leitura:** Dados lidos do catálogo `catalogo_energia.bronze` (tabelas `consumo_geral` e `energia_limpa`).
2.  **Limpeza e Qualidade (US-2.1):**
    * Tratamento de valores `NULL` e uso do campo correto `consumo_em_kwh`.
    * **Tratamento de Mês:** Implementação da lógica `F.coalesce` com `F.create_map` para converter nomes de mês (`'january'`) e números (`'12'`) para **INTEGER** (1 a 12).
    * Padronização de *strings* e tradução de `tipo_de_cliente` (e.g., Residential -> Residencial).
3.  **Unificação e Enriquecimento (US-2.3):**
    * Criação da coluna `tipo_de_consumo` (`Geral` ou `Limpa`) e união dos DataFrames (`unionByName`).
4.  **Chave de Negócio (US-2.2):**
    * Criação da **Chave de Negócio (BK)** via *hashing* (SHA-256) para garantir a rastreabilidade de cada registro unificado.
5.  **Gravação:** Persistência no formato Delta na tabela `catalogo_energia.silver.energia_consumo_silver`.