# üèÜ Gold Layer - KPIs e Agrega√ß√µes CNES
 
## **Objetivo**: Criar datasets anal√≠ticos prontos para consumo
 
## üìä Agrega√ß√µes:
### 1. **Cobertura geogr√°fica** - Estabelecimentos por regi√£o/UF/munic√≠pio
### 2. **An√°lise de capacidade** - Distribui√ß√£o de servi√ßos
### 3. **Indicadores de qualidade** - Scores e classifica√ß√µes
### 4. **An√°lise de complexidade** - Estrutura da rede de sa√∫de
### 5. **Geolocaliza√ß√£o** - Dados para mapas interativos

## 1Ô∏è‚É£ Setup

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
from datetime import datetime
import time

print("‚úÖ Bibliotecas importadas!")

In [0]:
catalog_name = "datasus_project"
schema_silver = "silver"
schema_gold = "gold"

spark.sql(f"USE CATALOG {catalog_name}")
print(f"‚úÖ Catalog: {catalog_name}")

## 2Ô∏è‚É£ Ler Dados Silver

In [0]:
# Ler tabela Silver
df_silver = spark.table(f"{catalog_name}.{schema_silver}.cnes_estabelecimentos_clean")

print("‚úÖ Dados Silver carregados!")
print(f"   Registros: {df_silver.count():,}")

# View Tempor√°ria para Performance
df_silver.createOrReplaceTempView("silver_cnes_estabelecimentos")

## 3Ô∏è‚É£ Agrega√ß√£o 1 - Cobertura Geogr√°fica por Regi√£o


In [0]:
print("üìç CRIANDO: Agrega√ß√£o por Regi√£o")
print("="*70)

# Agrega√ß√£o por regi√£o
df_gold_regiao = df_silver.groupBy("REGIAO") \
    .agg(
        F.count("*").alias("total_estabelecimentos"),
        F.countDistinct("UF_SIGLA").alias("total_estados"),
        F.countDistinct("CO_IBGE").alias("total_municipios"),
        
        # Totais por complexidade
        F.sum(F.when(F.col("COMPLEXIDADE") == "ATEN√á√ÉO B√ÅSICA", 1).otherwise(0)).alias("total_atencao_basica"),
        F.sum(F.when(F.col("COMPLEXIDADE") == "M√âDIA COMPLEXIDADE", 1).otherwise(0)).alias("total_media_complexidade"),
        F.sum(F.when(F.col("COMPLEXIDADE") == "ALTA COMPLEXIDADE", 1).otherwise(0)).alias("total_alta_complexidade"),
        
        # Totais por flags de servi√ßos
        F.sum(F.when(F.col("FLAG_CENTRO_CIRURGICO") == True, 1).otherwise(0)).alias("total_centro_cirurgico"),
        F.sum(F.when(F.col("FLAG_CENTRO_OBSTETRICO") == True, 1).otherwise(0)).alias("total_centro_obstetrico"),
        F.sum(F.when(F.col("FLAG_CENTRO_NEONATAL") == True, 1).otherwise(0)).alias("total_centro_neonatal"),
        F.sum(F.when(F.col("FLAG_ATEND_HOSPITALAR") == True, 1).otherwise(0)).alias("total_atend_hospitalar"),
        
        # M√©dias
        F.round(F.avg("SCORE_CAPACIDADE"), 2).alias("score_medio_capacidade"),
        F.round(F.avg("score_qualidade"), 2).alias("score_medio_qualidade"),
        
        # Com localiza√ß√£o v√°lida
        F.sum(F.when(F.col("LOCALIZACAO_VALIDA") == True, 1).otherwise(0)).alias("total_com_localizacao")
    ) \
    .withColumn("percentual_com_localizacao", 
        F.round((F.col("total_com_localizacao") / F.col("total_estabelecimentos")) * 100, 2)
    ) \
    .withColumn("data_atualizacao", F.current_timestamp())

print("‚úÖ Agrega√ß√£o por regi√£o criada!")
df_gold_regiao.show(truncate=False)

In [0]:
# Salvar
table_gold_regiao = f"{catalog_name}.{schema_gold}.kpi_estabelecimentos_por_regiao"

df_gold_regiao.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(table_gold_regiao)

print(f"‚úÖ Salvo: {table_gold_regiao}")

## 4Ô∏è‚É£ Agrega√ß√£o 2 - Detalhamento por UF

In [0]:
print("\nüìç CRIANDO: Agrega√ß√£o por UF")
print("="*70)

df_gold_uf = df_silver.groupBy("REGIAO", "UF_SIGLA", "UF_NOME") \
    .agg(
        F.count("*").alias("total_estabelecimentos"),
        F.countDistinct("CO_IBGE").alias("total_municipios"),
        
        # Por complexidade
        F.sum(F.when(F.col("COMPLEXIDADE") == "ATEN√á√ÉO B√ÅSICA", 1).otherwise(0)).alias("atencao_basica"),
        F.sum(F.when(F.col("COMPLEXIDADE") == "M√âDIA COMPLEXIDADE", 1).otherwise(0)).alias("media_complexidade"),
        F.sum(F.when(F.col("COMPLEXIDADE") == "ALTA COMPLEXIDADE", 1).otherwise(0)).alias("alta_complexidade"),
        
        # Por categoria de capacidade
        F.sum(F.when(F.col("CATEGORIA_CAPACIDADE") == "ALTA CAPACIDADE", 1).otherwise(0)).alias("alta_capacidade"),
        F.sum(F.when(F.col("CATEGORIA_CAPACIDADE") == "M√âDIA CAPACIDADE", 1).otherwise(0)).alias("media_capacidade"),
        F.sum(F.when(F.col("CATEGORIA_CAPACIDADE") == "BAIXA CAPACIDADE", 1).otherwise(0)).alias("baixa_capacidade"),
        
        # Servi√ßos especializados
        F.sum(F.when(F.col("FLAG_CENTRO_CIRURGICO") == True, 1).otherwise(0)).alias("com_centro_cirurgico"),
        F.sum(F.when(F.col("FLAG_CENTRO_OBSTETRICO") == True, 1).otherwise(0)).alias("com_centro_obstetrico"),
        F.sum(F.when(F.col("FLAG_ATEND_HOSPITALAR") == True, 1).otherwise(0)).alias("com_atend_hospitalar"),
        F.sum(F.when(F.col("FLAG_ATEND_AMBULATORIAL") == True, 1).otherwise(0)).alias("com_atend_ambulatorial"),
        
        # Qualidade
        F.round(F.avg("score_qualidade"), 2).alias("score_medio_qualidade"),
        F.sum(F.when(F.col("CLASSIFICACAO_QUALIDADE") == "EXCELENTE", 1).otherwise(0)).alias("qualidade_excelente"),
        F.sum(F.when(F.col("CLASSIFICACAO_QUALIDADE") == "BOA", 1).otherwise(0)).alias("qualidade_boa"),
        
        # Localiza√ß√£o
        F.sum(F.when(F.col("LOCALIZACAO_VALIDA") == True, 1).otherwise(0)).alias("com_localizacao_valida")
    ) \
    .withColumn("percentual_localizacao_valida",
        F.round((F.col("com_localizacao_valida") / F.col("total_estabelecimentos")) * 100, 2)
    ) \
    .withColumn("data_atualizacao", F.current_timestamp()) \
    .orderBy("REGIAO", "UF_SIGLA")

print("‚úÖ Agrega√ß√£o por UF criada!")
df_gold_uf.show(10, truncate=False)

In [0]:
# Salvar
table_gold_uf = f"{catalog_name}.{schema_gold}.kpi_estabelecimentos_por_uf"

df_gold_uf.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(table_gold_uf)

print(f"‚úÖ Salvo: {table_gold_uf}")


## 5Ô∏è‚É£ Agrega√ß√£o 3 - Ranking de Munic√≠pios

In [0]:
print("\nüèôÔ∏è CRIANDO: Top Munic√≠pios com mais estabelecimentos")
print("="*70)

df_gold_municipios = df_silver.groupBy(
    "CO_IBGE", "UF_SIGLA", "UF_NOME", "REGIAO"
) \
    .agg(
        F.count("*").alias("total_estabelecimentos"),
        F.sum(F.when(F.col("FLAG_ATEND_HOSPITALAR") == True, 1).otherwise(0)).alias("com_internacao"),
        F.sum(F.when(F.col("COMPLEXIDADE") == "ALTA COMPLEXIDADE", 1).otherwise(0)).alias("alta_complexidade"),
        F.round(F.avg("SCORE_CAPACIDADE"), 2).alias("score_medio_capacidade")
    ) \
    .withColumn("data_atualizacao", F.current_timestamp())

# Adicionar ranking
window_brasil = Window.orderBy(F.desc("total_estabelecimentos"))
window_uf = Window.partitionBy("UF_SIGLA").orderBy(F.desc("total_estabelecimentos"))

df_gold_municipios = df_gold_municipios \
    .withColumn("ranking_brasil", F.row_number().over(window_brasil)) \
    .withColumn("ranking_uf", F.row_number().over(window_uf))

# Top 50 munic√≠pios
df_gold_top_municipios = df_gold_municipios.filter(F.col("ranking_brasil") <= 50)

print("‚úÖ Top 50 munic√≠pios:")
df_gold_top_municipios.show(20, truncate=False)

In [0]:
# Salvar ranking completo
table_gold_municipios = f"{catalog_name}.{schema_gold}.kpi_estabelecimentos_por_municipio"

df_gold_municipios.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("UF_SIGLA") \
    .saveAsTable(table_gold_municipios)

print(f"‚úÖ Salvo: {table_gold_municipios}")

## 6Ô∏è‚É£ Agrega√ß√£o 4 - An√°lise por Tipo de Estabelecimento

In [0]:
print("\nüè• CRIANDO: An√°lise por Tipo de Estabelecimento")
print("="*70)

df_gold_tipos = df_silver.groupBy("TIPO_ESTABELECIMENTO", "COMPLEXIDADE") \
    .agg(
        F.count("*").alias("total"),
        F.countDistinct("UF_SIGLA").alias("presente_em_ufs"),
        F.round(F.avg("SCORE_CAPACIDADE"), 2).alias("score_medio"),
        F.sum(F.when(F.col("FLAG_ATEND_HOSPITALAR") == True, 1).otherwise(0)).alias("com_internacao"),
        F.sum(F.when(F.col("LOCALIZACAO_VALIDA") == True, 1).otherwise(0)).alias("com_coordenadas")
    ) \
    .withColumn("percentual_com_coordenadas",
        F.round((F.col("com_coordenadas") / F.col("total")) * 100, 2)
    ) \
    .withColumn("data_atualizacao", F.current_timestamp()) \
    .orderBy(F.desc("total"))

print("‚úÖ An√°lise por tipo criada!")
df_gold_tipos.show(20, truncate=False)

In [0]:
# Salvar
table_gold_tipos = f"{catalog_name}.{schema_gold}.kpi_por_tipo_estabelecimento"

df_gold_tipos.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(table_gold_tipos)

print(f"‚úÖ Salvo: {table_gold_tipos}")

## 7Ô∏è‚É£ Agrega√ß√£o 5 - Dataset para Mapa (Geolocaliza√ß√£o)

In [0]:
print("\nüó∫Ô∏è CRIANDO: Dataset para visualiza√ß√£o em mapa")
print("="*70)

# Filtrar apenas estabelecimentos com localiza√ß√£o v√°lida
df_gold_mapa = df_silver.filter(F.col("LOCALIZACAO_VALIDA") == True) \
    .select(
        "CO_CNES",
        "NO_FANTASIA_LIMPO",
        "TIPO_ESTABELECIMENTO",
        "COMPLEXIDADE",
        "UF_SIGLA",
        "REGIAO",
        "NU_LATITUDE",
        "NU_LONGITUDE",
        "FLAG_ATEND_HOSPITALAR",
        "FLAG_CENTRO_CIRURGICO",
        "FLAG_CENTRO_OBSTETRICO",
        "SCORE_CAPACIDADE",
        "CATEGORIA_CAPACIDADE",
        "CLASSIFICACAO_QUALIDADE"
    ) \
    .withColumn("data_atualizacao", F.current_timestamp())

total_mapa = df_gold_mapa.count()
print(f"‚úÖ Estabelecimentos com coordenadas v√°lidas: {total_mapa:,}")

# Amostra
df_gold_mapa.show(5, truncate=True)

In [0]:
# Salvar
table_gold_mapa = f"{catalog_name}.{schema_gold}.dataset_mapa_estabelecimentos"

df_gold_mapa.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("REGIAO") \
    .saveAsTable(table_gold_mapa)

print(f"‚úÖ Salvo: {table_gold_mapa}")

## 8Ô∏è‚É£ Agrega√ß√£o 6 - KPIs Gerais (Dashboard Principal)

In [0]:
print("\nüìä CRIANDO: KPIs Gerais para Dashboard")
print("="*70)

# Calcular KPIs consolidados
kpis_gerais = df_silver.agg(
    F.count("*").alias("total_estabelecimentos"),
    F.countDistinct("UF_SIGLA").alias("total_ufs"),
    F.countDistinct("CO_IBGE").alias("total_municipios"),
    F.countDistinct("REGIAO").alias("total_regioes"),
    
    # Por complexidade
    F.sum(F.when(F.col("COMPLEXIDADE") == "ATEN√á√ÉO B√ÅSICA", 1).otherwise(0)).alias("total_atencao_basica"),
    F.sum(F.when(F.col("COMPLEXIDADE") == "M√âDIA COMPLEXIDADE", 1).otherwise(0)).alias("total_media_complexidade"),
    F.sum(F.when(F.col("COMPLEXIDADE") == "ALTA COMPLEXIDADE", 1).otherwise(0)).alias("total_alta_complexidade"),
    
    # Servi√ßos
    F.sum(F.when(F.col("FLAG_ATEND_HOSPITALAR") == True, 1).otherwise(0)).alias("total_com_internacao"),
    F.sum(F.when(F.col("FLAG_CENTRO_CIRURGICO") == True, 1).otherwise(0)).alias("total_com_cirurgia"),
    F.sum(F.when(F.col("FLAG_CENTRO_OBSTETRICO") == True, 1).otherwise(0)).alias("total_com_obstetricia"),
    F.sum(F.when(F.col("FLAG_CENTRO_NEONATAL") == True, 1).otherwise(0)).alias("total_com_neonatal"),
    
    # Qualidade
    F.round(F.avg("score_qualidade"), 2).alias("score_medio_qualidade"),
    F.round(F.avg("SCORE_CAPACIDADE"), 2).alias("score_medio_capacidade"),
    
    # Localiza√ß√£o
    F.sum(F.when(F.col("LOCALIZACAO_VALIDA") == True, 1).otherwise(0)).alias("total_com_coordenadas")
).collect()[0]

# Converter para DataFrame
kpis_data = [(
    "GERAL",
    kpis_gerais["total_estabelecimentos"],
    kpis_gerais["total_ufs"],
    kpis_gerais["total_municipios"],
    kpis_gerais["total_regioes"],
    kpis_gerais["total_atencao_basica"],
    kpis_gerais["total_media_complexidade"],
    kpis_gerais["total_alta_complexidade"],
    kpis_gerais["total_com_internacao"],
    kpis_gerais["total_com_cirurgia"],
    kpis_gerais["total_com_obstetricia"],
    kpis_gerais["total_com_neonatal"],
    kpis_gerais["score_medio_qualidade"],
    kpis_gerais["score_medio_capacidade"],
    kpis_gerais["total_com_coordenadas"],
    round((kpis_gerais["total_com_coordenadas"] / kpis_gerais["total_estabelecimentos"]) * 100, 2),
    datetime.now()
)]

schema_kpis = StructType([
    StructField("escopo", StringType()),
    StructField("total_estabelecimentos", LongType()),
    StructField("total_ufs", LongType()),
    StructField("total_municipios", LongType()),
    StructField("total_regioes", LongType()),
    StructField("total_atencao_basica", LongType()),
    StructField("total_media_complexidade", LongType()),
    StructField("total_alta_complexidade", LongType()),
    StructField("total_com_internacao", LongType()),
    StructField("total_com_cirurgia", LongType()),
    StructField("total_com_obstetricia", LongType()),
    StructField("total_com_neonatal", LongType()),
    StructField("score_medio_qualidade", DoubleType()),
    StructField("score_medio_capacidade", DoubleType()),
    StructField("total_com_coordenadas", LongType()),
    StructField("percentual_com_coordenadas", DoubleType()),
    StructField("data_atualizacao", TimestampType())
])

df_kpis_gerais = spark.createDataFrame(kpis_data, schema_kpis)

print("‚úÖ KPIs Gerais calculados!")
df_kpis_gerais.show(truncate=False)

In [0]:
# Salvar
table_kpis_gerais = f"{catalog_name}.{schema_gold}.kpis_gerais_dashboard"

df_kpis_gerais.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(table_kpis_gerais)

print(f"‚úÖ Salvo: {table_kpis_gerais}")

## 9Ô∏è‚É£ Criar View Consolidada

In [0]:
print("\nüìä CRIANDO: View consolidada para BI")
print("="*70)

# Criar view que une informa√ß√µes principais
spark.sql(f"""
    CREATE OR REPLACE VIEW {catalog_name}.{schema_gold}.vw_analise_completa_cnes AS
    SELECT 
        s.CO_CNES,
        s.NO_FANTASIA_LIMPO as nome_fantasia,
        s.NO_RAZAO_SOCIAL as razao_social,
        s.UF_SIGLA as uf,
        s.UF_NOME as estado,
        s.REGIAO as regiao,
        s.TIPO_ESTABELECIMENTO as tipo,
        s.COMPLEXIDADE as complexidade,
        s.NATUREZA_ORGANIZACAO_DESC as natureza,
        s.CATEGORIA_CAPACIDADE as categoria_capacidade,
        s.SCORE_CAPACIDADE as score_capacidade,
        s.CLASSIFICACAO_QUALIDADE as qualidade,
        s.FLAG_ATEND_HOSPITALAR as tem_internacao,
        s.FLAG_CENTRO_CIRURGICO as tem_cirurgia,
        s.FLAG_CENTRO_OBSTETRICO as tem_obstetricia,
        s.FLAG_CENTRO_NEONATAL as tem_neonatal,
        s.FLAG_ATEND_AMBULATORIAL as tem_ambulatorio,
        s.LOCALIZACAO_VALIDA as tem_coordenadas_validas,
        s.NU_LATITUDE as latitude,
        s.NU_LONGITUDE as longitude,
        s.data_processamento_silver as data_processamento
    FROM {catalog_name}.{schema_silver}.cnes_estabelecimentos_clean s
    WHERE s.CLASSIFICACAO_QUALIDADE IN ('EXCELENTE', 'BOA')
""")

print("‚úÖ View criada: vw_analise_completa_cnes")

# Testar view
spark.sql(f"SELECT * FROM {catalog_name}.{schema_gold}.vw_analise_completa_cnes LIMIT 5").show()

## üîü Otimizar Tabelas Gold

In [0]:
print("\n‚ö° OTIMIZANDO TABELAS GOLD")
print("="*70)

gold_tables = [
    table_gold_regiao,
    table_gold_uf,
    table_gold_municipios,
    table_gold_tipos,
    table_gold_mapa,
    table_kpis_gerais
]

for table in gold_tables:
    try:
        spark.sql(f"OPTIMIZE {table}")
        print(f"‚úÖ {table.split('.')[-1]} otimizada")
    except Exception as e:
        print(f"‚ö†Ô∏è Erro ao otimizar {table}: {e}")

print("\n‚úÖ Otimiza√ß√£o conclu√≠da!")


## üìä Resumo Final e Estat√≠sticas

In [0]:
print("\n" + "="*70)
print("üìä RESUMO DAS TABELAS GOLD CRIADAS")
print("="*70)

# Listar todas as tabelas gold
gold_tables_list = spark.sql(f"SHOW TABLES IN {catalog_name}.{schema_gold}").collect()

print(f"\n‚úÖ Total de tabelas/views: {len(gold_tables_list)}\n")

for table in gold_tables_list:
    table_name = table.tableName
    table_type = "VIEW" if table.isTemporary else "TABLE"
    
    try:
        if table_type == "TABLE":
            count = spark.table(f"{catalog_name}.{schema_gold}.{table_name}").count()
            print(f"üìä {table_name:50s} | Registros: {count:>10,}")
        else:
            print(f"üëÅÔ∏è  {table_name:50s} | VIEW")
    except:
        print(f"‚ö†Ô∏è  {table_name:50s} | Erro ao contar")

In [0]:
# Registrar log
def log_pipeline_execution(pipeline_name, status, records=0, exec_time=0, error=None):
    schema_control = StructType([
        StructField("pipeline_name", StringType(), False),
        StructField("execution_date", TimestampType(), False),
        StructField("status", StringType(), False),
        StructField("records_processed", LongType(), True),
        StructField("execution_time_seconds", DoubleType(), True),
        StructField("error_message", StringType(), True)
    ])
    log_data = [(pipeline_name, datetime.now(), status, records, exec_time, error)]
    df_log = spark.createDataFrame(log_data, schema_control)
    df_log.write.format("delta").mode("append").saveAsTable(f"{catalog_name}.bronze.pipeline_control")

log_pipeline_execution("gold_aggregations_cnes", "SUCCESS", 0, 0)
print("\n‚úÖ Pipeline registrado no log!")


## CONCLUS√ÉO

In [0]:
print("\n" + "="*70)
print("GOLD LAYER - CONCLU√çDA")
print("="*70)

print("\n‚úÖ AGREGA√á√ïES CRIADAS:")
print("   1Ô∏è‚É£ KPIs por Regi√£o (5 regi√µes)")
print("   2Ô∏è‚É£ Detalhamento por UF (27 estados)")
print("   3Ô∏è‚É£ Ranking de Munic√≠pios (todos + top 50)")
print("   4Ô∏è‚É£ An√°lise por Tipo de Estabelecimento")
print("   5Ô∏è‚É£ Dataset para Mapas (com coordenadas)")
print("   6Ô∏è‚É£ KPIs Gerais para Dashboard")
print("   7Ô∏è‚É£ View Consolidada para BI")

print("\nüìä M√âTRICAS DISPON√çVEIS:")
print("   ‚Ä¢ Total de estabelecimentos por regi√£o/UF")
print("   ‚Ä¢ Distribui√ß√£o por complexidade")
print("   ‚Ä¢ Cobertura de servi√ßos especializados")
print("   ‚Ä¢ Scores de capacidade e qualidade")
print("   ‚Ä¢ Rankings e comparativos")
print("   ‚Ä¢ Dados geoespaciais para visualiza√ß√£o")

print("\nüéØ PR√ìXIMOS PASSOS:")
print("   1. Conectar Power BI / Tableau")
print("   2. Criar dashboards interativos")
print("   3. An√°lises explorat√≥rias com SQL")
print("   4. Exportar dados para apresenta√ß√µes")

print("\nüöÄ SEU PROJETO EST√Å COMPLETO!")
print("   Bronze ‚úÖ ‚Üí Silver ‚úÖ ‚Üí Gold ‚úÖ")

print("\n" + "="*70)

In [0]:
%sql
-- Ver KPIs gerais
SELECT * FROM datasus_project.gold.kpis_gerais_dashboard;

-- Top 10 UFs com mais estabelecimentos
SELECT UF_NOME, total_estabelecimentos 
FROM datasus_project.gold.kpi_estabelecimentos_por_uf
ORDER BY total_estabelecimentos DESC
LIMIT 10;

-- Estabelecimentos de alta complexidade por regi√£o
SELECT REGIAO, total_alta_complexidade
FROM datasus_project.gold.kpi_estabelecimentos_por_regiao;

-- View completa para an√°lises
SELECT * FROM datasus_project.gold.vw_analise_completa_cnes
WHERE regiao = 'Nordeste' AND complexidade = 'ALTA COMPLEXIDADE';