# Bronze to Silver - Dimensões


In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    col, trim, upper, lower, regexp_replace, to_date,
    when, coalesce, lit, current_timestamp, row_number
)
from pyspark.sql.window import Window
from delta.tables import DeltaTable
from datetime import datetime

In [0]:
# Paths
STORAGE_ACCOUNT = "mystoacc"
BRONZE_PATH = f"abfss://bronze@{STORAGE_ACCOUNT}.dfs.core.windows.net"
SILVER_PATH = f"abfss://silver@{STORAGE_ACCOUNT}.dfs.core.windows.net"

# Catalog e Database
spark.sql("USE CATALOG hive_metastore")
spark.sql("CREATE DATABASE IF NOT EXISTS healthcare_silver LOCATION 'abfss://silver@mystoacc.dfs.core.windows.net/'")
spark.sql("USE healthcare_silver")

# Timestamp de processamento
PROCESSING_TIMESTAMP = datetime.now()
PROCESSING_DATE = PROCESSING_TIMESTAMP.strftime("%Y-%m-%d")

print(f"Processamento iniciado: {PROCESSING_TIMESTAMP}")

## Funções de Qualidade

In [0]:
def add_silver_metadata(df: DataFrame) -> DataFrame:
    """Adiciona colunas de metadados da Silver"""
    return df.withColumn("silver_processed_at", lit(PROCESSING_TIMESTAMP)) \
             .withColumn("silver_processing_date", lit(PROCESSING_DATE))

def remove_duplicates(df: DataFrame, key_columns: list, table_name: str) -> DataFrame:
    """Remove duplicatas mantendo o registro mais recente"""
    window_spec = Window.partitionBy(key_columns).orderBy(col("ingestion_timestamp").desc())
    
    df_dedup = df.withColumn("row_num", row_number().over(window_spec)) \
                 .filter(col("row_num") == 1) \
                 .drop("row_num")
    
    return df_dedup



## Transformações por Dimensão

In [0]:
def log_transformation(table_name: str, input_count: int, output_count: int):
    """Loga métricas de transformação"""
    rejected = input_count - output_count
    rejection_rate = (rejected / input_count * 100) if input_count > 0 else 0
    
    print(f"\n{table_name}:")
    print(f"  Input: {input_count} registros")
    print(f"  Output: {output_count} registros")
    print(f"  Rejeitados: {rejected} ({rejection_rate:.2f}%)")

# Transformações ajustadas para o schema real
def transform_dim_data(df: DataFrame) -> DataFrame:
    """
    Transformações dim_data:
    - Validar data_completa
    - Validar ano/mes/dia numéricos
    """
    df_transformed = df.select(
        col("sk_data").cast("int"),
        to_date(col("data_completa"), "yyyy-MM-dd").alias("data_completa"),
        col("ano").cast("int"),
        col("mes").cast("int"),
        col("dia").cast("int"),
        col("ingestion_timestamp")
    )
    
    # Remover registros com data inválida
    df_transformed = df_transformed.filter(
        col("data_completa").isNotNull() &
        col("ano").isNotNull() &
        col("mes").between(1, 12) &
        col("dia").between(1, 31)
    )
    
    return df_transformed

def transform_dim_clinica(df: DataFrame) -> DataFrame:
    """
    Transformações dim_clinica:
    - Padronizar tipo_clinica e estado (uppercase, trim)
    """
    df_transformed = df.select(
        col("sk_clinica").cast("int"),
        upper(trim(col("tipo_clinica"))).alias("tipo_clinica"),
        upper(trim(col("estado"))).alias("estado"),
        col("ingestion_timestamp")
    )
    
    return df_transformed

def transform_dim_medico(df: DataFrame) -> DataFrame:
    """
    Transformações dim_medico:
    - Padronizar especialidade e uf_crm (uppercase)
    - Validar ativo (0 ou 1)
    """
    df_transformed = df.select(
        col("sk_medico").cast("int"),
        upper(trim(col("especialidade"))).alias("especialidade"),
        upper(trim(col("uf_crm"))).alias("uf_crm"),
        coalesce(col("ativo").cast("int"), lit(1)).alias("ativo"),
        col("ingestion_timestamp")
    )
    
    # Ativo só pode ser 0 ou 1
    df_transformed = df_transformed.withColumn(
        "ativo",
        when(col("ativo").isin([0, 1]), col("ativo")).otherwise(1)
    )
    
    return df_transformed

def transform_dim_diagnostico(df: DataFrame) -> DataFrame:
    """
    Transformações dim_diagnostico:
    - Padronizar codigo_cid e descricao_cid (uppercase)
    - Limpar codigo_cid (sem espaços)
    """
    df_transformed = df.select(
        col("sk_diagnostico").cast("int"),
        upper(trim(regexp_replace(col("codigo_cid"), r"\s+", ""))).alias("codigo_cid"),
        upper(trim(col("descricao_cid"))).alias("descricao_cid"),
        col("ingestion_timestamp")
    )
    
    return df_transformed

def transform_dim_exame(df: DataFrame) -> DataFrame:
    """
    Transformações dim_exame:
    - Padronizar tipo_exame e categoria_exame (uppercase)
    """
    df_transformed = df.select(
        col("sk_exame").cast("int"),
        upper(trim(col("tipo_exame"))).alias("tipo_exame"),
        upper(trim(col("categoria_exame"))).alias("categoria_exame"),
        col("ingestion_timestamp")
    )
    
    return df_transformed

def transform_dim_paciente(df: DataFrame) -> DataFrame:
    """
    Transformações dim_paciente:
    - Validar token (obrigatório)
    - Padronizar sexo (M/F/O)
    - Validar ano_nascimento (1900-2026)
    - Padronizar cidade (uppercase)
    """
    current_year = datetime.now().year
    
    df_transformed = df.select(
        col("sk_paciente").cast("int"),
        trim(col("token_paciente")).alias("token_paciente"),
        upper(trim(col("sexo"))).alias("sexo_raw"),
        col("ano_nascimento").cast("int"),
        upper(trim(col("cidade"))).alias("cidade"),
        col("data_cadastro").cast("date"),
        col("ingestion_timestamp")
    )
    
    # Padronizar sexo
    df_transformed = df_transformed.withColumn(
        "sexo",
        when(col("sexo_raw").isin(["M", "MASCULINO", "MALE"]), "M")
        .when(col("sexo_raw").isin(["F", "FEMININO", "FEMALE"]), "F")
        .otherwise("O")
    ).drop("sexo_raw")
    
    # Validar ano de nascimento
    df_transformed = df_transformed.withColumn(
        "ano_nascimento",
        when(
            (col("ano_nascimento") >= 1900) & (col("ano_nascimento") <= current_year),
            col("ano_nascimento")
        ).otherwise(None)
    )
    
    # Token é obrigatório
    df_transformed = df_transformed.filter(col("token_paciente").isNotNull())
    
    return df_transformed

## Pipeline de Processamento

In [0]:
def process_dimension_to_silver(
    table_name: str,
    transformation_func,
    key_column: str
) -> dict:
    """
    Pipeline completo Bronze → Silver para uma dimensão
    
    Args:
        table_name: Nome da tabela
        transformation_func: Função de transformação
        key_column: Coluna chave para deduplicação e merge
    
    Returns:
        Dict com métricas do processamento
    """
    try:
        print(f"\n{'='*60}")
        print(f"Processando: {table_name}")
        print('='*60)
        
        # 1. Ler da Bronze
        bronze_path = f"{BRONZE_PATH}/{table_name}"
        df_bronze = spark.read.format("delta").load(bronze_path)
        
        input_count = df_bronze.count()
        print(f"Lidos da Bronze: {input_count} registros")
        
        # 2. Aplicar transformações
        df_transformed = transformation_func(df_bronze)
        
        # 3. Remover duplicatas
        df_transformed = remove_duplicates(df_transformed, [key_column], table_name)
        
        # 4. Adicionar metadados Silver
        df_transformed = add_silver_metadata(df_transformed)
        
        output_count = df_transformed.count()
        log_transformation(table_name, input_count, output_count)
        
        # 5. Escrever na Silver com MERGE (upsert)
        silver_path = f"{SILVER_PATH}/{table_name}"
        
        if DeltaTable.isDeltaTable(spark, silver_path):
            # Merge incremental
            delta_table = DeltaTable.forPath(spark, silver_path)
            
            delta_table.alias("target").merge(
                df_transformed.alias("source"),
                f"target.{key_column} = source.{key_column}"
            ).whenMatchedUpdateAll() \
             .whenNotMatchedInsertAll() \
             .execute()
            
            print(f"MERGE executado com sucesso")
            
        else:
            # Primeira carga
            df_transformed.write.format("delta") \
            .mode("overwrite") \
            .option("mergeSchema", "true") \
            .saveAsTable(f"healthcare_silver.{table_name}")
            
            # Registrar no catalog
            spark.sql(f"""
            CREATE TABLE IF NOT EXISTS {table_name}
            USING DELTA
            LOCATION '{silver_path}'
            """)
            
            print(f"Tabela criada com sucesso")
        
        return {
            "table": table_name,
            "status": "SUCCESS",
            "input_count": input_count,
            "output_count": output_count,
            "rejected": input_count - output_count
        }
        
    except Exception as e:
        print(f"ERRO: {str(e)}")
        return {
            "table": table_name,
            "status": "FAILED",
            "error": str(e)
        }



## Execução

In [0]:
# Configuração de dimensões
dimensions_config = [
    {"table": "dim_data", "transformation": transform_dim_data, "key": "sk_data"},
    {"table": "dim_clinica", "transformation": transform_dim_clinica, "key": "sk_clinica"},
    {"table": "dim_medico", "transformation": transform_dim_medico, "key": "sk_medico"},
    {"table": "dim_diagnostico", "transformation": transform_dim_diagnostico, "key": "sk_diagnostico"},
    {"table": "dim_exame", "transformation": transform_dim_exame, "key": "sk_exame"},
    {"table": "dim_paciente", "transformation": transform_dim_paciente, "key": "sk_paciente"}
]

# Processar todas as dimensões
results = []
for config in dimensions_config:
    result = process_dimension_to_silver(
        table_name=config["table"],
        transformation_func=config["transformation"],
        key_column=config["key"]
    )
    results.append(result)



In [0]:
## Sumário de Execução

In [0]:
import pandas as pd

# Exibir resultados
results_df = pd.DataFrame(results)
print("\n" + "="*80)
print("SUMÁRIO DE EXECUÇÃO - BRONZE TO SILVER")
print("="*80)
display(results_df)

# Estatísticas
total_tables = len(results)
success_count = len([r for r in results if r['status'] == 'SUCCESS'])
failed_count = len([r for r in results if r['status'] == 'FAILED'])

if success_count > 0:
    total_input = sum([r.get('input_count', 0) for r in results if r['status'] == 'SUCCESS'])
    total_output = sum([r.get('output_count', 0) for r in results if r['status'] == 'SUCCESS'])
    total_rejected = sum([r.get('rejected', 0) for r in results if r['status'] == 'SUCCESS'])
    
    print(f"\nTabelas processadas: {success_count}/{total_tables}")
    print(f"Total de registros input: {total_input:,}")
    print(f"Total de registros output: {total_output:,}")
    print(f"Total rejeitado: {total_rejected:,} ({total_rejected/total_input*100:.2f}%)")

if failed_count > 0:
    print(f"\nALERTA: {failed_count} tabelas falharam!")

print("\n" + "="*80)