# An√°lise de Leitos e Hospitais (DataSUS)

---

**Fonte dos Dados:** [DataSUS - Hospitais e Leitos](https://opendatasus.saude.gov.br/dataset/hospitais-e-leitos)

### Descri√ß√£o do Dataset
Este conjunto de dados apresenta informa√ß√µes detalhadas sobre a capacidade hospitalar no Brasil, extra√≠da do Cadastro Nacional de Estabelecimentos de Sa√∫de (CNES). O foco principal √© o monitoramento da quantidade de **leitos existentes** e **leitos SUS** (incluindo UTIs de diversas especialidades) ao longo do tempo.

A an√°lise abaixo realiza um processo de ETL (Extra√ß√£o, Transforma√ß√£o e Carga) automatizado que:
1.  Baixa os dados brutos do DataSUS.
2.  Normaliza e limpa os dados (tratamento de datas, tipos num√©ricos e textos).
3.  Armazena em um banco de dados local otimizado (DuckDB).
4.  Gera visualiza√ß√µes geogr√°ficas (mapas de calor) e temporais.

### Dicion√°rio de Dados ‚Äî Mapeamento e Tipos

A tabela abaixo descreve como as colunas originais do arquivo CSV/ZIP foram renomeadas e tipadas para an√°lise neste notebook.

| **Coluna Original** | **Nome Final (Normalizado)** | **Tipo SQL** |
|-------------------|------------------------------|--------------|
| `regiao` | `regiao_brasil_hospital` | `VARCHAR(12)` |
| `uf` | `uf_hospital` | `CHAR(2)` |
| `co_ibge` | `codigo_ibge` | `VARCHAR(7)` |
| `municipio` | `municipio_hospital` | `VARCHAR(60)` |
| `cnes` | `cnes` | `VARCHAR(7)` |
| `nome_estabelecimento` | `nome_hospital` | `VARCHAR(200)` |
| `tp_gestao` | `tipo_gestao_do_hospital` | `CHAR(1)` |
| `natureza_juridica` | `natureza_juridica_do_hospital` | `VARCHAR(4)` |
| `leitos_sus` | `leitos_sus` | `INT` |
| `leitos_existentes` | `leitos_geral` | `INT` |
| `uti_total_existente` | `uti_total` | `INT` |
| `uti_total_sus` | `uti_sus_total` | `INT` |
| `comp` | `data_competencia_info` | `DATE` |
*(...e demais colunas de UTI espec√≠ficas)*

In [None]:
# --- 1. DATA SOURCES ---
DATASET_URLS = {
    "hospitais_leitos": "https://opendatasus.saude.gov.br/dataset/hospitais-e-leitos"
}

# --- 2. FINAL TABLE SCHEMAS ---
SCHEMA_MAPS = {
    "hospitais_leitos": {
        'regiao': {
            'nome_final': 'regiao_brasil_hospital',
            'tipo_sql': 'VARCHAR(12)'
        },
        'uf': {
            'nome_final': 'uf_hospital',
            'tipo_sql': 'CHAR(2)'
        },
        'co_ibge': {
            'nome_final': 'codigo_ibge',
            'tipo_sql': 'VARCHAR(7)'
        },
        'municipio': {
            'nome_final': 'municipio_hospital',
            'tipo_sql': 'VARCHAR(60)'
        },
        'cnes': {
            'nome_final': 'cnes',
            'tipo_sql': 'VARCHAR(7)'
        },
        'no_logradouro': {
            'nome_final': 'endereco_hospital',
            'tipo_sql': 'VARCHAR(60)'
        },
        'nu_endereco': {
            'nome_final': 'numero_endereco_hospital',
            'tipo_sql': 'VARCHAR(10)'
        },
        'no_complemento': {
            'nome_final': 'complemento_endereco_hospital',
            'tipo_sql': 'VARCHAR(20)'
        },
        'no_bairro': {
            'nome_final': 'bairro_hospital',
            'tipo_sql': 'VARCHAR(60)'
        },
        'co_cep': {
            'nome_final': 'cep_hospital',
            'tipo_sql': 'CHAR(8)'
        },
        'nome_estabelecimento': {
            'nome_final': 'nome_hospital',
            'tipo_sql': 'VARCHAR(200)'
        },
        'razao_social': {
            'nome_final': 'nome_razao_social_hospital',
            'tipo_sql': 'VARCHAR(60)'
        },
        'tp_gestao': {
            'nome_final': 'tipo_gestao_do_hospital',
            'tipo_sql': 'CHAR(1)'
        },
        'co_tipo_unidade': {
            'nome_final': 'codigo_tipo_da_unidade',
            'tipo_sql': 'VARCHAR(2)'
        },
        'ds_tipo_unidade': {
            'nome_final': 'descricao_do_tipo_da_unidade',
            'tipo_sql': 'VARCHAR(60)'
        },
        'natureza_juridica': {
            'nome_final': 'natureza_juridica_do_hospital',
            'tipo_sql': 'VARCHAR(4)'
        },
        'desc_natureza_juridica': {
            'nome_final': 'descricao_da_natureza_juridica_do_hospital',
            'tipo_sql': 'VARCHAR(60)'
        },
        'motivo_desabilitacao': {
            'nome_final': 'motivo_desabilitacao_hospital',
            'tipo_sql': 'VARCHAR(60)'
        },
        'no_email': {
            'nome_final': 'email',
            'tipo_sql': 'VARCHAR(60)'
        },
        'nu_telefone': {
            'nome_final': 'telefone',
            'tipo_sql': 'VARCHAR(40)'
        },
        'leitos_sus': {
            'nome_final': 'leitos_sus',
            'tipo_sql': 'INT'
        },
        'leitos_existentes': {
            'nome_final': 'leitos_geral',
            'tipo_sql': 'INT',
            'aliases': ['leitos_existente']
        },
        'uti_total_existente': {
            'nome_final': 'uti_total',
            'tipo_sql': 'INT',
            'aliases': ['uti_total___exist', 'uti_total_exist']
        },
        'uti_total_sus': {
            'nome_final': 'uti_sus_total',
            'tipo_sql': 'INT',
            'aliases': ['uti_total___sus']
        },
        'uti_adulto_existente': {
            'nome_final': 'uti_adulto',
            'tipo_sql': 'INT',
            'aliases': ['uti_adulto___exist', 'uti_adulto_exist']
        },
        'uti_adulto_sus': {
            'nome_final': 'uti_sus_adulto',
            'tipo_sql': 'INT',
            'aliases': ['uti_adulto___sus']
        },
        'uti_pediatrico_existente': {
            'nome_final': 'uti_pediatrico',
            'tipo_sql': 'INT',
            'aliases': ['uti_pediatrico___exist', 'uti_pediatrico_exist']
        },
        'uti_pediatrico_sus': {
            'nome_final': 'uti_sus_pediatrico',
            'tipo_sql': 'INT',
            'aliases': ['uti_pediatrico___sus']
        },
        'uti_neonatal_existente': {
            'nome_final': 'uti_neonatal',
            'tipo_sql': 'INT',
            'aliases': ['uti_neonatal___exist', 'uti_neonatal_exist']
        },
        'uti_neonatal_sus': {
            'nome_final': 'uti_sus_neonatal',
            'tipo_sql': 'INT',
            'aliases': ['uti_neonatal___sus']
        },
        'uti_queimado_existente': {
            'nome_final': 'uti_queimado',
            'tipo_sql': 'INT',
            'aliases': ['uti_queimado___exist', 'uti_queimado_exist']
        },
        'uti_queimado_sus': {
            'nome_final': 'uti_sus_queimado',
            'tipo_sql': 'INT',
            'aliases': ['uti_queimado___sus']
        },
        'uti_coronariana_existente': {
            'nome_final': 'uti_coronariana',
            'tipo_sql': 'INT',
            'aliases': ['uti_coronariana___exist', 'uti_coronariana_exist']
        },
        'uti_coronariana_sus': {
            'nome_final': 'uti_sus_coronariana',
            'tipo_sql': 'INT',
            'aliases': ['uti_coronariana___sus']
        },
        'comp': {
            'nome_final': 'data_competencia_info',
            'tipo_sql': 'DATE',
            'date_format': '%Y%m'
        }
    }
}

In [None]:
# TEMPO DE EXECU√á√ÉO ~5 minutos

import pandas as pd
import duckdb
import warnings
import requests
from bs4 import BeautifulSoup as soup
import zipfile
import io
import time
import logging
from urllib.parse import urljoin
import re
import unicodedata

# ==============================================================================
# --- 1. CONFIGURA√á√ïES GERAIS E LOGGING ---
# ==============================================================================

DB_FILENAME = "datasus.db"
BASE_URL = "https://opendatasus.saude.gov.br"

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
warnings.filterwarnings('ignore', category=pd.errors.ParserWarning)


# ==============================================================================
# --- FUN√á√ÉO DE NORMALIZA√á√ÉO ---
# ==============================================================================

def normalize_name(name):
    if not isinstance(name, str):
        name = str(name)

    nfkd_form = unicodedata.normalize('NFKD', name)
    name_sem_acentos = "".join([c for c in nfkd_form if not unicodedata.combining(c)])

    name_lower = name_sem_acentos.lower()

    name_clean = re.sub(r'[^a-z0-9]+', '_', name_lower).strip('_')

    return name_clean

# ==============================================================================
# --- 2. FUN√á√ïES AUXILIARES DE REQUISI√á√ÉO E LEITURA ---
# ==============================================================================
def fetch_page_with_retries(url, max_retries=5, delay_seconds=10):
    for attempt in range(1, max_retries + 1):
        try:
            logging.info(f"Acessando p√°gina: {url} (tentativa {attempt}/{max_retries})")
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            return soup(response.content, "html.parser")
        except requests.exceptions.RequestException as e:
            logging.warning(f"Tentativa {attempt} falhou: {e}")
            if attempt < max_retries:
                time.sleep(delay_seconds)
    logging.error(f"Falha ao acessar a p√°gina {url} ap√≥s {max_retries} tentativas.")
    return None

def read_csv_with_detection(file_bytes_io):
    possible_separators = [';', ',']
    possible_encodings = ['utf-8-sig', 'latin-1', 'cp1252']

    for enc in possible_encodings:
        file_bytes_io.seek(0)
        raw_text = file_bytes_io.read().decode(enc, errors='replace')

        raw_text = raw_text.replace('\x00', '')

        sep_counts = {sep: raw_text.count(sep) for sep in possible_separators}
        sep = max(sep_counts, key=sep_counts.get)

        lines = raw_text.splitlines()

        issue_detected = any(
            line and not line.startswith('"') and f'{sep}"' in line
            for line in lines[:10]
        )

        if issue_detected:
            logging.warning("Detectado CSV malformado (primeiro campo sem aspas). Aplicando corre√ß√£o em todo o arquivo.")
            corrected_lines = []
            for line in lines:
                if line and not line.startswith('"') and f'{sep}"' in line:
                    sep_index = line.find(sep)
                    if sep_index != -1:
                        first_field = line[:sep_index]
                        corrected_line = f'"{first_field}"{line[sep_index:]}'
                        corrected_lines.append(corrected_line)
                    else:
                        corrected_lines.append(line)
                else:
                    corrected_lines.append(line)

            raw_text = "\n".join(corrected_lines)
        else:
            raw_text = "\n".join(lines)

        cleaned_bytes = io.BytesIO(raw_text.encode('utf-8'))

        try:
            df = pd.read_csv(
                cleaned_bytes,
                sep=sep,
                engine='python',
                dtype=str,
                on_bad_lines='skip',
                quotechar='"',
            )

            if df.shape[1] > 2 and df.columns.notna().all():
                logging.info(f"SUCESSO: encoding={enc}, separador='{sep}', colunas={len(df.columns)}")
                return df
        except Exception as e:
            logging.warning(f"Falha com encoding={enc}, sep='{sep}': {e}")

    logging.error("Nenhuma combina√ß√£o de encoding/separador funcionou para este arquivo.")
    return None

def download_and_read_data_from_url(url):
    logging.info(f"Baixando dados de: {url}")
    try:
        response = requests.get(url, timeout=180)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logging.error(f"Falha ao baixar {url}: {e}")
        return None
    file_bytes_io = io.BytesIO(response.content)
    if url.lower().endswith('.zip'):
        logging.info("Arquivo ZIP detectado. Descompactando em mem√≥ria...")
        try:
            with zipfile.ZipFile(file_bytes_io) as zf:
                csv_filenames = [f for f in zf.namelist() if f.lower().endswith('.csv')]
                if not csv_filenames:
                    logging.error("Nenhum .csv encontrado dentro do ZIP.")
                    return None
                logging.info(f"Extraindo: {csv_filenames[0]}")
                csv_bytes = zf.read(csv_filenames[0])
                return read_csv_with_detection(io.BytesIO(csv_bytes))
        except Exception as e:
            logging.error(f"ERRO ao processar o arquivo ZIP: {e}")
            return None
    elif url.lower().endswith('.csv'):
        logging.info("Arquivo CSV direto detectado.")
        return read_csv_with_detection(file_bytes_io)
    else:
        file_extension = url.split('.')[-1].upper()
        logging.warning(f"Formato de arquivo n√£o reconhecido ({file_extension}), pulando: {url}")
        return None

# ==============================================================================
# --- 3. L√ìGICA PRINCIPAL DE ETL ---
# ==============================================================================

def main_etl_process():
    logging.info("--- Iniciando ETL com recarga completa dos dados ---")
    with duckdb.connect(database=DB_FILENAME, read_only=False) as con:
        for dataset_key, dataset_page_url in DATASET_URLS.items():
            logging.info(f"\n{'='*60}\nProcessando dataset: '{dataset_key}'\n{'='*60}")
            schema_map = SCHEMA_MAPS.get(dataset_key)
            if not schema_map:
                logging.error(f"Esquema n√£o encontrado para o dataset '{dataset_key}'. Pulando.")
                continue

            table_name = normalize_name(dataset_key)
            try:
                sql_columns = [f'"{normalize_name(prop["nome_final"])}" {prop["tipo_sql"]}' for prop in schema_map.values()]
                create_table_sql = f"CREATE OR REPLACE TABLE {table_name} ({', '.join(sql_columns)});"
                con.execute(create_table_sql)
                logging.info(f"Tabela '{table_name}' criada/substitu√≠da com o esquema final.")
            except Exception as e:
                logging.error(f"ERRO FATAL ao criar a tabela '{table_name}': {e}")
                continue

            main_page_html = fetch_page_with_retries(dataset_page_url)
            if not main_page_html: continue
            resources = main_page_html.find_all("li", class_="resource-item")
            if not resources:
                logging.error("Nenhum recurso para download encontrado na p√°gina do dataset.")
                continue
            logging.info(f"Encontrados {len(resources)} arquivos/recursos potenciais para processar.")
            for resource in resources:
                link_tag = resource.find("a", class_="resource-url-analytics")
                if not link_tag or not link_tag.has_attr('href'): continue
                href = link_tag['href']
                final_download_url = None
                if href.lower().endswith(('.csv', '.zip')):
                    final_download_url = urljoin(BASE_URL, href)
                elif href.lower().endswith(('.pdf', 'xml', 'json')):
                    logging.info(f"Recurso do tipo PDF encontrado e ignorado: {href}")
                    continue
                else:
                    resource_page_url = urljoin(BASE_URL, href)
                    resource_page_html = fetch_page_with_retries(resource_page_url)
                    if resource_page_html:
                        download_button = resource_page_html.find("a", class_="btn-primary")
                        if download_button and download_button.has_attr('href'):
                            final_download_url = urljoin(BASE_URL, download_button['href'])
                        else:
                            logging.warning(f"Link n√£o √© um arquivo direto e bot√£o de download n√£o foi encontrado em {resource_page_url}. Pulando.")
                if not final_download_url: continue
                logging.info(f"\n--- Processando arquivo: {final_download_url.split('/')[-1]} ---")
                raw_df = download_and_read_data_from_url(final_download_url)
                if raw_df is None or raw_df.empty:
                    logging.warning("Arquivo pulado devido a falha no download ou leitura.")
                    continue

                try:
                    source_to_final_map = {}
                    final_to_schema_map = {}
                    for canonical_name, properties in schema_map.items():
                        final_name = normalize_name(properties['nome_final'])
                        if final_name not in final_to_schema_map:
                            final_to_schema_map[final_name] = properties
                        source_to_final_map[normalize_name(canonical_name)] = final_name
                        if 'aliases' in properties:
                            for alias in properties['aliases']:
                                source_to_final_map[normalize_name(alias)] = final_name

                    rename_map = {}
                    final_names_used = set()
                    for source_col in raw_df.columns:
                        norm_source_col = normalize_name(source_col)
                        if norm_source_col in source_to_final_map:
                            final_name = source_to_final_map[norm_source_col]
                            if final_name not in final_names_used:
                                rename_map[source_col] = final_name
                                final_names_used.add(final_name)
                            else:
                                logging.warning(
                                    f"Mapeamento duplicado para a coluna final '{final_name}'. "
                                    f"A coluna de origem '{source_col}' ser√° ignorada."
                                )

                    processing_df = raw_df[list(rename_map.keys())].rename(columns=rename_map)

                    final_df = pd.DataFrame()
                    for final_name, column_data in processing_df.items():
                        properties = final_to_schema_map.get(final_name)
                        if not properties:
                            final_df[final_name] = column_data.astype(str)
                            continue

                        sql_type = properties['tipo_sql'].upper()
                        clean_column = column_data.replace({'NULL': None, '': None})

                        if 'CHAR' in sql_type:
                            match = re.search(r'\((\d+)\)', sql_type)
                            length = int(match.group(1)) if match else None
                            final_df[final_name] = clean_column.astype(str).str.slice(0, length)

                        elif 'INT' in sql_type or 'BIGINT' in sql_type:
                            numeric_series = pd.to_numeric(
                                clean_column.astype(str).str.replace(',', '.', regex=False),
                                errors='coerce'
                            )
                            potential_loss_mask = (numeric_series.notna()) & ((numeric_series.fillna(0) % 1) != 0)
                            if potential_loss_mask.any():
                                examples = numeric_series[potential_loss_mask].head(3).tolist()
                                logging.warning(
                                    f"Perda de precis√£o ao for√ßar INT na coluna '{final_name}'. "
                                    f"Valores decimais ser√£o truncados. Exemplos: {examples}"
                                )
                            is_null_mask = numeric_series.isnull()
                            series_as_int = numeric_series.fillna(0).astype(int)
                            final_df[final_name] = series_as_int.astype('Int64').where(~is_null_mask, pd.NA)

                        elif 'DATE' in sql_type:
                            date_format = properties.get('date_format')
                            converted_dates = pd.to_datetime(
                                clean_column,
                                format=date_format,
                                errors='coerce'
                            )

                            if converted_dates.isnull().all() and date_format:
                                logging.warning(f"Formato de data '{date_format}' falhou para a coluna '{final_name}'. Tentando infer√™ncia autom√°tica.")
                                converted_dates = pd.to_datetime(clean_column, errors='coerce')

                            final_df[final_name] = converted_dates.dt.date

                        elif 'DECIMAL' in sql_type:
                            final_df[final_name] = pd.to_numeric(clean_column.astype(str).str.replace(',', '.', regex=False), errors='coerce')
                        else:
                            final_df[final_name] = clean_column.astype(str)

                    if not final_df.empty:
                        view_name = "novos_dados_temp"
                        con.register(view_name, final_df)
                        insert_sql = f"INSERT INTO {table_name} BY NAME SELECT * FROM {view_name};"
                        con.execute(insert_sql)
                        logging.info(f"-> Inseridas {len(final_df)} linhas na tabela '{table_name}'.")
                        con.unregister(view_name)
                    else:
                        logging.info("Nenhuma coluna mapeada encontrada no arquivo. Nada a inserir.")

                except Exception as e:
                    logging.error(f"ERRO GERAL ao transformar ou carregar dados de {final_download_url}: {e}", exc_info=True)

    logging.info(f"\n--- Processo finalizado! Dados em '{DB_FILENAME}' ---")

if __name__ == "__main__":
    main_etl_process()

In [None]:
import duckdb

DB_FILENAME = 'datasus.db'
TABLE_NAME = 'hospitais_leitos'

try:
    con = duckdb.connect(database=DB_FILENAME, read_only=True)

    print(f"--- Colunas encontradas na tabela '{TABLE_NAME}' ---")

    schema_info = con.execute(f'DESCRIBE "{TABLE_NAME}";').fetchdf()

    if not schema_info.empty:
        for col_name in schema_info['column_name']:
            print(col_name)
    else:
        print(f"N√£o foi poss√≠vel encontrar a tabela '{TABLE_NAME}'.")

except duckdb.Error as e:
    print(f"\nERRO DuckDB: N√£o foi poss√≠vel ler a tabela.")
    print(f"Verifique se o arquivo '{DB_FILENAME}' existe e se a tabela '{TABLE_NAME}' est√° correta.")
    print(f"Detalhe do erro: {e}")
except Exception as e:
    print(f"\nERRO inesperado: {e}")

finally:
    if 'con' in locals() and con:
        con.close()

In [None]:
import duckdb
import pandas as pd

DB_FILENAME = 'datasus.db'
TABLE_NAME = 'hospitais_leitos'

try:
    con = duckdb.connect(database=DB_FILENAME, read_only=True)

    sql_query = f"""
    SELECT DISTINCT ON (cnes) *
    FROM "{TABLE_NAME}"
    ORDER BY cnes, data_competencia_info DESC;
    """

    print(f"Executando a consulta SQL na tabela '{TABLE_NAME}'...")

    result_df = con.execute(sql_query).fetchdf()

    output_csv_filename = 'hospitais_leitos_latest.csv'

    result_df.to_csv(output_csv_filename, index=False)

    print(f"Dados exportados com sucesso para '{output_csv_filename}'.")
    print(f"N√∫mero de linhas exportadas: {len(result_df)}")

except duckdb.Error as e:
    print(f"\nERRO DuckDB ao executar a consulta ou exportar para CSV:")
    print(f"Verifique se o arquivo '{DB_FILENAME}' existe e se a tabela '{TABLE_NAME}' est√° correta.")
    print(f"Detalhe do erro: {e}")
except Exception as e:
    print(f"\nERRO inesperado: {e}")

finally:
    if 'con' in locals() and con:
        con.close()

In [None]:
import duckdb
import pandas as pd

DB_FILENAME = 'datasus.db'
TABLE_NAME = 'hospitais_leitos'

try:
    con = duckdb.connect(database=DB_FILENAME, read_only=True)

    # SQL query to select all records
    sql_query = f"""
    SELECT *
    FROM "{TABLE_NAME}";
    """

    print(f"Executando a consulta SQL para todos os registros na tabela '{TABLE_NAME}'...")

    result_df = con.execute(sql_query).fetchdf()

    # Output filename for the complete data
    output_csv_filename = 'hospitais_leitos_completo.csv'

    result_df.to_csv(output_csv_filename, index=False)

    print(f"Dados exportados com sucesso para '{output_csv_filename}'.")
    print(f"N√∫mero de linhas exportadas: {len(result_df)}")

except duckdb.Error as e:
    print(f"\nERRO DuckDB ao executar a consulta ou exportar para CSV:")
    print(f"Verifique se o arquivo '{DB_FILENAME}' existe e se a tabela '{TABLE_NAME}' est√° correta.")
    print(f"Detalhe do erro: {e}")
except Exception as e:
    print(f"\nERRO inesperado: {e}")

finally:
    if 'con' in locals() and con:
        con.close()

## Visualiza√ß√£o 1: Mapas de Calor por Estado (UF)

In [None]:
# -*- coding: utf-8 -*-
"""
Gerador de Heatmaps de Leitos por Estado (UF) no Brasil (Plotly)
Este script gera m√∫ltiplos mapas (um para cada tipo de leito)
baseado no script de UFs.
"""

import pandas as pd
import plotly.express as px
import requests
import plotly.io as pio # Import plotly.io

def criar_mapa_para_coluna_uf(df_base, brazil_states_geojson, col_leitos, coluna_estado):
    """
    Fun√ß√£o auxiliar que gera um mapa de calor de UF para uma coluna de dados espec√≠fica.
    """
    print(f"\n--- üó∫Ô∏è  Processando UF: {col_leitos} ---")

    # 1. Copiar e processar a coluna de dados
    df = df_base.copy()

    df[col_leitos] = pd.to_numeric(df[col_leitos], errors='coerce').fillna(0)

    # 2. Agregar dados
    dados_agregados = df.groupby(coluna_estado)[col_leitos].sum().reset_index()

    # Renomear para o plot
    dados_agregados = dados_agregados.rename(columns={col_leitos: "valor"})

    # Filtrar UFs sem dados
    dados_agregados = dados_agregados[dados_agregados["valor"] > 0]

    if dados_agregados.empty:
        print(f"‚ÑπÔ∏è  Sem dados v√°lidos (> 0) para '{col_leitos}'. Mapa n√£o ser√° gerado.")
        return

    print(f"üìä Total de UFs com dados para '{col_leitos}': {len(dados_agregados)}")

    # 3. Preparar Nomes
    titulo_grafico = col_leitos.replace('_', ' ').replace('uti', 'UTI').title()

    # 4. Gerar Mapa
    fig = px.choropleth(
        dados_agregados,
        geojson=brazil_states_geojson,
        locations=coluna_estado,        # Coluna com as siglas (ex: "MG")
        featureidkey="properties.sigla",  # Chave no GeoJSON com a sigla
        color="valor",                  # Coluna com os valores num√©ricos
        color_continuous_scale="YlOrRd",
        scope="south america",
        labels={"valor": titulo_grafico},
        title=f"Heatmap de {titulo_grafico} por Estado no Brasil",
        # Removed template argument due to error
        # template=pio.templates["plotly_white"] # Use pio.templates instead of px.templates
    )

    fig.update_geos(fitbounds="locations", visible=False)
    fig.update_layout(margin={"r":0,"t":50,"l":0,"b":0})

    # ALTERA√á√ÉO 2: Substituir fig.write_html() por fig.show()
    # 5. Exibir Resultado
    fig.show()
    print(f"‚úÖ Mapa para '{col_leitos}' exibido acima.")


def gerar_todos_heatmaps_uf(csv_filename='hospitais_leitos_latest.csv'):
    """
    Fun√ß√£o principal que carrega os dados uma vez e depois gera
    um mapa de calor de UF para cada coluna de leitos.
    """

    # ==========================================================
    # 1. LISTA DE COLUNAS PARA PROCESSAR
    # ==========================================================
    colunas_leitos_lista = [
        'leitos_sus',
        'leitos_geral',
        'uti_total',
        'uti_sus_total',
        'uti_adulto',
        'uti_sus_adulto',
        'uti_pediatrico',
        'uti_sus_pediatrico',
        'uti_neonatal',
        'uti_sus_neonatal',
        'uti_queimado',
        'uti_sus_queimado',
        'uti_coronariana',
        'uti_sus_coronariana'
    ]

    # Coluna de Estado (UF) do seu CSV
    coluna_estado = 'uf_hospital'

    # ==========================================================
    # 2. BAIXAR GEOJSON (APENAS UMA VEZ)
    # ==========================================================
    geojson_url = "https://raw.githubusercontent.com/codeforamerica/click_that_hood/master/public/data/brazil-states.geojson"

    print("Baixando o arquivo GeoJSON dos estados do Brasil...")
    try:
        response = requests.get(geojson_url)
        response.raise_for_status()
        brazil_states_geojson = response.json()
        print("‚úÖ Download do GeoJSON conclu√≠do com sucesso.")
    except Exception as e:
        print(f"‚ùå Erro ao baixar o GeoJSON: {e}")
        return

    # ==========================================================
    # 3. CARREGAR CSV
    # ==========================================================
    try:
        df = pd.read_csv(csv_filename, encoding='utf-8')
        print(f"‚úÖ Arquivo '{csv_filename}' carregado com sucesso.")
    except FileNotFoundError:
        print(f"‚ùå Arquivo '{csv_filename}' n√£o encontrado.")
        return

    # Checar se a coluna de UF existe
    if coluna_estado not in df.columns:
        print(f"‚ùå Coluna de estado '{coluna_estado}' n√£o encontrada no CSV.")
        print("Colunas dispon√≠veis:", list(df.columns))
        return

    # ==========================================================
    # 4. PR√â-PROCESSAMENTO
    # ==========================================================

    # Garantir que as siglas da UF estejam limpas e mai√∫sculas
    df[coluna_estado] = df[coluna_estado].astype(str).str.strip().str.upper()

    # Corrigir siglas no GeoJSON
    print("Processando GeoJSON para adicionar siglas...")
    for feature in brazil_states_geojson["features"]:
        props = feature["properties"]
        name = props.get("name", "").upper()
        # Mapeamento manual para garantir correspond√™ncia
        nome_para_sigla = {
            "ACRE": "AC", "ALAGOAS": "AL", "AMAP√Å": "AP", "AMAPA": "AP",
            "AMAZONAS": "AM", "BAHIA": "BA", "CEAR√Å": "CE", "CEARA": "CE",
            "DISTRITO FEDERAL": "DF", "ESP√çRITO SANTO": "ES", "ESPIRITO SANTO": "ES",
            "GOI√ÅS": "GO", "GOIAS": "GO", "MARANH√ÉO": "MA", "MARANHAO": "MA",
            "MATO GROSSO": "MT", "MATO GROSSO DO SUL": "MS", "MINAS GERAIS": "MG",
            "PAR√Å": "PA", "PARA": "PA", "PARA√çBA": "PB", "PARAIBA": "PB",
            "PARAN√Å": "PR", "PARANA": "PR", "PERNAMBUCO": "PE", "PIAU√ç": "PI",
            "PIAUI": "PI", "RIO DE JANEIRO": "RJ", "RIO GRANDE DO NORTE": "RN",
            "RIO GRANDE DO SUL": "RS", "ROND√îNIA": "RO", "RONDONIA": "RO",
            "RORAIMA": "RR", "SANTA CATARINA": "SC", "S√ÉO PAULO": "SP", "SAO PAULO": "SP",
            "SERGIPE": "SE", "TOCANTINS": "TO"
        }
        props["sigla"] = nome_para_sigla.get(name, None)

    # ==========================================================
    # 5. LOOP DE GERA√á√ÉO DOS MAPAS
    # ==========================================================
    print("\nIniciando gera√ß√£o dos mapas de UF em lote...")

    colunas_encontradas = 0
    for coluna in colunas_leitos_lista:
        if coluna not in df.columns:
            print(f"‚ö†Ô∏è  Coluna '{coluna}' n√£o encontrada no CSV. Pulando...")
            continue

        criar_mapa_para_coluna_uf(
            df,
            brazil_states_geojson,
            coluna,
            coluna_estado
        )
        colunas_encontradas += 1

    print(f"\nüéâ Processo conclu√≠do! {colunas_encontradas} mapas de UF foram exibidos.")

# ==========================================================
# Execu√ß√£o direta
# ==========================================================
if __name__ == "__main__":
    gerar_todos_heatmaps_uf(csv_filename='hospitais_leitos_latest.csv')

## Visualiza√ß√£o 2: Mapas de Calor por Munic√≠pio

In [None]:
import pandas as pd
import plotly.graph_objects as go
import requests
from unidecode import unidecode
import os
import numpy as np

def create_municipality_heatmap_go(df_base, municipios_geojson, col_leitos, col_municipio):
    """
    Generates a municipality-level heatmap using plotly.graph_objects.
    """
    print(f"\n--- üó∫Ô∏è  Processing column (go.Choropleth): {col_leitos} ---")

    df = df_base.copy()
    df[col_leitos] = pd.to_numeric(df[col_leitos], errors='coerce').fillna(0)

    # Aggregate data
    dados_agregados = (
        df.groupby(["MUN_KEY", col_municipio], as_index=False)[col_leitos]
        .sum()
        .rename(columns={col_leitos: "valor"})
    )

    # Filter municipalities with no data (optional, depending on desired visualization)
    # dados_agregados = dados_agregados[dados_agregados["valor"] > 0]

    if dados_agregados.empty or dados_agregados["valor"].sum() == 0:
        print(f"‚ÑπÔ∏è  No valid data (> 0) found for '{col_leitos}'. Map will not be generated.")
        return

    print(f"üìä Total municipalities with data for '{col_leitos}': {len(dados_agregados)}")

    # Prepare Names
    titulo_grafico = col_leitos.replace('_', ' ').replace('uti', 'UTI').title()

    # Create the figure
    fig = go.Figure()

    # Add Choropleth trace
    fig.add_trace(go.Choropleth(
        geojson=municipios_geojson,
        locations=dados_agregados["MUN_KEY"],
        z=dados_agregados["valor"],
        featureidkey="properties.NAME_KEY",
        colorscale="YlOrRd",
        colorbar_title=titulo_grafico.replace(" ", "<br>"),
        hovertext=dados_agregados[col_municipio], # Use original municipality name for hover
        hoverinfo='text+z', # Show municipality name and value on hover
        marker_line_width=0.5,
        marker_opacity=0.8,
    ))

    fig.update_layout(
        title_text=f"Heatmap of {titulo_grafico} by Municipality in Brazil",
        geo_scope="south america", # Focus on South America
        geo=dict(
            showcoastlines=False,
            showland=True,
            landcolor="white",
            showcountries=False,
            showsubunits=False,
            showframe=False,
            fitbounds="locations", # Adjust map bounds to fit the data locations
        ),
        margin={"r":0,"t":50,"l":0,"b":0},
        paper_bgcolor="white",
        plot_bgcolor="white",
    )

    # Display the figure
    fig.show()
    print(f"‚úÖ Map for '{col_leitos}' displayed above.")


def generate_all_municipality_heatmaps_go(csv_filename='hospitais_leitos_latest.csv'):
    """
    Main function to load data and generate municipality heatmaps using go.Choropleth.
    """

    # ==========================================================
    # 1. LIST OF COLUMNS TO PROCESS
    # ==========================================================
    columns_leitos_list = [
        'leitos_sus',
        'leitos_geral',
        'uti_total',
        'uti_sus_total',
        'uti_adulto',
        'uti_sus_adulto',
        'uti_pediatrico',
        'uti_sus_pediatrico',
        'uti_neonatal',
        'uti_sus_neonatal',
        'uti_queimado',
        'uti_sus_queimado',
        'uti_coronariana',
        'uti_sus_coronariana'
    ]

    col_municipio = 'municipio_hospital'

    # ==========================================================
    # 2. DOWNLOAD GEOJSON (only once)
    # ==========================================================
    geojson_url = "https://raw.githubusercontent.com/tbrugz/geodata-br/master/geojson/geojs-100-mun.json"
    print("Downloading simplified GeoJSON of Brazilian municipalities (~5MB)...")
    try:
        response = requests.get(geojson_url)
        response.raise_for_status()
        municipios_geojson = response.json()
        print("‚úÖ GeoJSON download successful.")
    except Exception as e:
        print(f"‚ùå Error downloading GeoJSON: {e}")
        return

    # ==========================================================
    # 3. LOAD CSV
    # ==========================================================
    try:
        df = pd.read_csv(csv_filename, encoding='utf-8')
        print(f"‚úÖ File '{csv_filename}' loaded successfully.")
    except FileNotFoundError:
        print(f"‚ùå File '{csv_filename}' not found.")
        return

    # Check if essential columns exist
    if col_municipio not in df.columns:
        print(f"‚ùå Municipality column '{col_municipio}' not found.")
        print("Please check if the column name in the CSV is correct.")
        print("Available columns:", list(df.columns))
        return

    # ==========================================================
    # 4. CLEANING AND NORMALIZATION
    # ==========================================================
    print("Normalizing municipality names...")
    df[col_municipio] = df[col_municipio].astype(str).str.strip().replace(["", "nan", "None"], np.nan)
    df.dropna(subset=[col_municipio], inplace=True) # Drop rows where municipality is null

    # Apply unidecode and uppercase for key creation
    df["MUN_KEY"] = df[col_municipio].apply(lambda x: unidecode(x).upper())

    # Manual replacements for known mismatches
    substituicoes = {
        "CEILANDIA": "BRASILIA", "SAMAMBAIA": "BRASILIA", "LAGO NORTE": "BRASILIA",
        "LAGO SUL": "BRASILIA", "NUCLEO BANDEIRANTE": "BRASILIA",
        "AUGUSTO SEVERO": "CAMPO GRANDE", "JANUARIO CICCO": "BOA SAUDE",
        "MOJI MIRIM": "MOGI MIRIM", "PARATI": "PARATY", "POXOREO": "POXOREU",
        "BRASOPOLIS": "BRAZOPOLIS", "IGUARACI": "IGUARACY",
        "BELEM DE SAO FRANCISCO": "BELEM DO SAO FRANCISCO",
        "LAGOA DO ITAENGA": "LAGOA DE ITAENGA",
        "SAO LUIS DO PARAITINGA": "SAO LUIZ DO PARAITINGA",
        "SAO VALERIO DA NATIVIDADE": "SAO VALERIO", "SERIDO": "CARNAUBA DOS DANTAS",
        "TRAJANO DE MORAIS": "TRAJANO DE MORAES"
    }
    df["MUN_KEY"] = df["MUN_KEY"].replace(substituicoes)

    # Prepare GeoJSON keys
    for feat in municipios_geojson["features"]:
        nome = feat["properties"].get("name", "")
        feat["properties"]["NAME_KEY"] = unidecode(nome).upper().strip()

    # Filter out municipalities from df whose MUN_KEY is not in the GeoJSON keys
    geojson_keys = {feat["properties"]["NAME_KEY"] for feat in municipios_geojson["features"]}
    df = df[df["MUN_KEY"].isin(geojson_keys)].copy()
    print(f"Filtered data: {len(df)} rows after matching with GeoJSON.")


    # ==========================================================
    # 5. LOOP FOR GENERATING MAPS
    # ==========================================================
    print("\nStarting batch map generation (using go.Choropleth)...")

    columns_found = 0
    for column in columns_leitos_list:
        if column not in df.columns:
            print(f"‚ö†Ô∏è  Column '{column}' not found in CSV. Skipping...")
            continue

        create_municipality_heatmap_go(df, municipios_geojson, column, col_municipio)
        columns_found += 1

    print(f"\nüéâ Process complete! {columns_found} municipality maps attempted.")

# ==========================================================
# Direct execution
# ==========================================================
if __name__ == "__main__":
    # Using the latest CSV file generated previously
    generate_all_municipality_heatmaps_go(csv_filename='hospitais_leitos_latest.csv')

## Visualiza√ß√£o 3: Evolu√ß√£o Temporal das Categorias

In [None]:
import pandas as pd
import duckdb
import plotly.express as px
import os
import shutil # Usado para criar/apagar a pasta temp
import re # Usaremos uma busca simples por <body>

# ==============================================================================
# 1. CONFIGURA√á√ïES
# ==============================================================================
DB_FILENAME = 'datasus.db'
TABLE_NAME = 'hospitais_leitos'

# Diret√≥rio para salvar os arquivos tempor√°rios
TEMP_DIR = "temp_graficos_v20"

# Colunas de Categoria
CATEGORIAS_PRINCIPAIS = [
    'tipo_gestao_do_hospital',
    'descricao_do_tipo_da_unidade',
    'descricao_da_natureza_juridica_do_hospital'
]

# Colunas de M√©trica
METRICAS_ANALISE = [
    'leitos_sus',
    'leitos_geral',
    'uti_total',
    'uti_sus_total',
    'uti_adulto',
    'uti_sus_adulto',
    'uti_pediatrico',
    'uti_sus_pediatrico',
    'uti_neonatal',
    'uti_sus_neonatal',
    'uti_queimado',
    'uti_sus_queimado',
    'uti_coronariana',
    'uti_sus_coronariana'
]

# ==============================================================================
# 2. FUN√á√ÉO AUXILIAR PARA EXTRAIR O CONTE√öDO DO <body>
# ==============================================================================

def extract_body_content(html_content):
    """
    Extrai todo o conte√∫do entre as tags <body> e </body>.
    """
    try:
        # Encontra o in√≠cio do conte√∫do (logo ap√≥s <body ...>)
        body_start_match = re.search(r'<body.*?>', html_content, re.IGNORECASE | re.DOTALL)
        if not body_start_match:
            print("AVISO: Tag <body> n√£o encontrada no arquivo tempor√°rio.")
            return ""

        start_index = body_start_match.end()

        # Encontra o fim do conte√∫do (logo antes de </body>)
        body_end_match = re.search(r'</body\s*>', html_content, re.IGNORECASE | re.DOTALL)
        if not body_end_match:
            print("AVISO: Tag </body> n√£o encontrada no arquivo tempor√°rio.")
            return ""

        end_index = body_end_match.start()

        return html_content[start_index:end_index]

    except Exception as e:
        print(f"Erro ao extrair conte√∫do do body: {e}")
        return ""

# ==============================================================================
# 3. PROCESSAMENTO E GERA√á√ÉO DE GR√ÅFICOS
# ==============================================================================

print(f"Iniciando gera√ß√£o (v20 - Agrega√ß√£o Robusta)...")

# Limpa e cria o diret√≥rio tempor√°rio
if os.path.exists(TEMP_DIR):
    shutil.rmtree(TEMP_DIR)
os.makedirs(TEMP_DIR, exist_ok=True)
print(f"Diret√≥rio tempor√°rio criado em: {os.path.abspath(TEMP_DIR)}")

try:
    con = duckdb.connect(database=DB_FILENAME, read_only=True)

    # === LOOP EXTERNO (pelas 3 categorias) ===
    for cat_col in CATEGORIAS_PRINCIPAIS:
        print(f"\n========================================================")
        print(f"Processando categoria: {cat_col}")
        print(f"========================================================")

        temp_files_list = [] # Lista para guardar os NOMES dos arquivos tempor√°rios

        # --- PARTE 1: Gerar os 14 arquivos HTML separados (L√≥gica v17) ---
        for met_col in METRICAS_ANALISE:
            print(f"  -> Gerando arquivo tempor√°rio para: {met_col}")
            try:
                # 1. Query DENTRO DO LOOP
                # Certifica-se que a coluna de data √© CAST para VARCHAR ANTES de agrupar para evitar erros de tipo
                query = f"""
                SELECT
                    CAST("data_competencia_info" AS VARCHAR) AS ano_mes,
                    "{cat_col}",
                    SUM(CAST("{met_col}" AS BIGINT)) AS "{met_col}"
                FROM "{TABLE_NAME}"
                WHERE
                    "data_competencia_info" IS NOT NULL
                    AND "{cat_col}" IS NOT NULL
                GROUP BY ano_mes, "{cat_col}"
                ORDER BY ano_mes, "{cat_col}"
                """
                df_agregado = con.execute(query).fetchdf()

                if df_agregado.empty:
                    print(f"    AVISO: Query vazia para '{met_col}'. Pulando.")
                    continue

                # Garante que 'ano_mes' √© string antes de usar para pivot
                df_agregado['ano_mes'] = df_agregado['ano_mes'].astype(str)
                df_agregado = df_agregado.dropna(subset=['ano_mes', cat_col])

                # Handle potential non-string categories before unique()
                df_agregado[cat_col] = df_agregado[cat_col].astype(str)

                todos_os_meses = sorted(df_agregado['ano_mes'].unique())

                # Ensure that the pivot table uses string types for columns
                df_pivot = df_agregado.pivot(index='ano_mes', columns=cat_col, values=met_col)
                df_pivot = df_pivot.reindex(todos_os_meses)

                # Convert index to datetime after reindexing
                df_pivot.index = pd.to_datetime(df_pivot.index, format='%Y-%m-%d', errors='coerce') # Ajustado formato para YYYY-MM-DD
                # Corrigido: Filtrar linhas onde a convers√£o da data falhou (√≠ndice √© NaT)
                df_pivot = df_pivot[df_pivot.index.notna()]
                df_pivot.index.name = 'Data'


                # 5. Plotagem
                title = f"Evolu√ß√£o Mensal de {met_col.replace('_', ' ').title()} por {cat_col.replace('_', ' ').title()}"
                fig = px.line(df_pivot, title=title, markers=True, labels={'value': 'Total', cat_col: f'Categoria ({cat_col})'})
                fig.update_traces(hovertemplate='<b>%{x|%Y-%m}</b><br>%{full_name}<br>Total: %{y}')

                # 6. Salva o arquivo HTML COMPLETO
                temp_filename = os.path.join(TEMP_DIR, f"temp_grafico_{cat_col}_{met_col}.html")
                fig.write_html(temp_filename, include_plotlyjs='cdn') # Usa CDN para ser mais leve
                temp_files_list.append(temp_filename)

            except Exception as e_plot:
                print(f"    ERRO ao plotar '{met_col}': {e_plot}")
                import traceback
                traceback.print_exc()


        # --- PARTE 2: O "Agregador" (L√≥gica Robusta) ---
        if temp_files_list:
            final_filename = f"evolucao_por_{cat_col}_v20_FINAL.html"
            print(f"\n  -> Agregando {len(temp_files_list)} arquivos em '{final_filename}'...")

            with open(final_filename, 'w', encoding='utf-8') as f_out:
                # Escreve o cabe√ßalho 1x
                f_out.write(f"""<html>
                <head>
                    <title>Evolu√ß√£o por {cat_col.replace('_', ' ').title()}</title>
                    <script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
                </head>
                <body style="font-family: sans-serif; padding: 20px; background-color: #f4f4f4;">
                    <h1 style="text-align: center;">Relat√≥rio de Evolu√ß√£o por {cat_col.replace('_', ' ').title()}</h1>
                """)

                # Loop para ler cada arquivo tempor√°rio
                for i, temp_file in enumerate(temp_files_list):
                    if i > 0:
                        f_out.write('<hr style="margin-top: 40px; margin-bottom: 40px;">')

                    try:
                        with open(temp_file, 'r', encoding='utf-8') as f_in:
                            content = f_in.read()

                            # Usa a fun√ß√£o auxiliar para extrair o conte√∫do do body
                            body_content = extract_body_content(content)

                            if body_content:
                                f_out.write(body_content) # Cola o conte√∫do do body
                            else:
                                f_out.write(f"<p>ERRO: N√£o foi poss√≠vel parsear o gr√°fico de {temp_file}</p>")
                    except Exception as e_read:
                         f_out.write(f"<p>ERRO ao ler o arquivo tempor√°rio {temp_file}: {e_read}</p>")


                # Escreve o rodap√© 1x
                f_out.write("</body></html>")

            print(f"  -> ‚úÖ Arquivo final salvo com sucesso em '{os.path.abspath(final_filename)}'")
        else:
            print(f"  -> Nenhum gr√°fico foi gerado para '{cat_col}'.")

except duckdb.Error as e:
    print(f"\nERRO DuckDB: {e}")
except KeyError as e:
    print(f"\nERRO KeyError: A coluna {e} n√£o foi encontrada no DataFrame.")
except Exception as e:
    print(f"\nERRO inesperado: {e}")
    import traceback
    traceback.print_exc()


finally:
    if 'con' in locals() and con:
        con.close()
        print("\nConex√£o DuckDB fechada.")

# Limpa a pasta tempor√°ria
try:
    if os.path.exists(TEMP_DIR):
        shutil.rmtree(TEMP_DIR)
        print(f"\nDiret√≥rio tempor√°rio '{TEMP_DIR}' removido.")
except Exception as e:
    print(f"\nAVISO: N√£o foi poss√≠vel remover o diret√≥rio tempor√°rio '{TEMP_DIR}': {e}")

print("\nProcessamento conclu√≠do.")

In [None]:
import pandas as pd
import numpy as np
import plotly.graph_objects as go
import os

def analise_evolucao_temporal(caminho_csv):
    print(f"üìÇ Lendo arquivo: {caminho_csv}")
    df = pd.read_csv(caminho_csv)

    # --- Padroniza nomes ---
    colunas = [c.lower().strip() for c in df.columns]
    mapa = dict(zip(colunas, df.columns))

    if "data_competencia_info" not in colunas:
        print("‚ùå Coluna 'data_competencia_info' n√£o encontrada.")
        print("‚úÖ Colunas dispon√≠veis:", df.columns.tolist())
        return

    df.rename(columns={mapa["data_competencia_info"]: "data_competencia_info"}, inplace=True)

    # --- Convers√£o robusta das datas ---
    print("üïê Convertendo coluna de datas...")
    df["data_competencia_info"] = (
        df["data_competencia_info"].astype(str).str.strip().replace(["", "nan", "None"], np.nan)
    )

    df["data_competencia_info"] = pd.to_datetime(df["data_competencia_info"], errors="coerce")

    # Se mais da metade for NaT, tenta formato alternativo
    if df["data_competencia_info"].isna().mean() > 0.5:
        df["data_competencia_info"] = pd.to_datetime(
            df["data_competencia_info"], format="%Y%m", errors="coerce"
        )

    print("Datas v√°lidas:", df["data_competencia_info"].notna().sum())
    print("Datas inv√°lidas:", df["data_competencia_info"].isna().sum())

    df.dropna(subset=["data_competencia_info"], inplace=True)
    if df.empty:
        print("‚ùå Nenhuma linha v√°lida com data encontrada!")
        return

    print("üìÖ Per√≠odo dos dados:",
          df["data_competencia_info"].min().strftime("%Y-%m-%d"),
          "‚Üí",
          df["data_competencia_info"].max().strftime("%Y-%m-%d"))

    # --- Detecta colunas de leitos ---
    colunas_leitos = {
        'leitos_geral': 'Leitos Gerais',
        'leitos_sus': 'Leitos SUS',
        'uti_total': 'UTI Total',
        'uti_sus_total': 'UTI SUS Total'
    }

    colunas_encontradas = {}
    for nome_col, titulo in colunas_leitos.items():
        if nome_col in colunas:
            colunas_encontradas[mapa[nome_col]] = titulo

    if not colunas_encontradas:
        print("‚ùå Nenhuma coluna de leitos encontrada!")
        print("‚úÖ Colunas dispon√≠veis:", df.columns.tolist())
        return

    print(f"üìä Colunas de leitos encontradas: {list(colunas_encontradas.keys())}")

    # Converte colunas para num√©rico
    for col in colunas_encontradas.keys():
        df[col] = pd.to_numeric(df[col], errors="coerce")

    # --- Verifica duplica√ß√µes por hospital e data ---
    if "cnes" not in colunas:
        print("‚ö†Ô∏è Coluna 'cnes' n√£o encontrada ‚Äì agregando diretamente por data.")
        df_agg = df.copy()
    else:
        col_cnes = mapa["cnes"]
        duplicadas = df.duplicated(subset=[col_cnes, "data_competencia_info"], keep=False)
        n_dup = duplicadas.sum()
        print(f"üîç Registros duplicados por hospital/m√™s: {n_dup}")

        # Agrupa por hospital e compet√™ncia ‚Üí 1 linha por hospital/m√™s
        agg_dict = {col: 'max' for col in colunas_encontradas.keys()}
        df_agg = (
            df.groupby([col_cnes, "data_competencia_info"])
            .agg(agg_dict)
            .reset_index()
        )

    # --- Soma total de leitos no Brasil por m√™s ---
    agg_dict = {col: 'sum' for col in colunas_encontradas.keys()}
    df_total = df_agg.groupby("data_competencia_info").agg(agg_dict).reset_index()
    df_total = df_total.sort_values("data_competencia_info")

    # --- Cria diret√≥rio de sa√≠da ---
    os.makedirs("analises_leitos", exist_ok=True)

    # --- Gera UM √öNICO GR√ÅFICO com todas as curvas ---
    print(f"\nüìà Gerando gr√°fico com {len(colunas_encontradas)} curvas...")

    # Cores para cada tipo de leito
    cores = {
        'Leitos Gerais': '#1f77b4',
        'Leitos SUS': '#ff7f0e',
        'UTI Total': '#2ca02c',
        'UTI SUS Total': '#d62728'
    }

    fig = go.Figure()

    for col_original, titulo in colunas_encontradas.items():
        fig.add_trace(go.Scatter(
            x=df_total["data_competencia_info"],
            y=df_total[col_original],
            mode='lines+markers',
            name=titulo,
            line=dict(width=2, color=cores.get(titulo, '#333')),
            marker=dict(size=4),
            hovertemplate='<b>%{x|%Y-%m}</b><br>%{fullData.name}<br>Total: %{y:,.0f}<extra></extra>'
        ))

    fig.update_layout(
        title='Evolu√ß√£o Mensal de Leitos Hospitalares no Brasil',
        xaxis_title='Data',
        yaxis_title='Total',
        hovermode='x unified',
        template='plotly_white',
        height=600,
        legend=dict(
            orientation="v",
            yanchor="top",
            y=0.99,
            xanchor="left",
            x=0.01,
            bgcolor="rgba(255, 255, 255, 0.8)"
        )
    )

    # --- Exibe o gr√°fico em vez de salvar como HTML ---
    fig.show()

    print("\nüéâ An√°lise temporal conclu√≠da com sucesso!")


# --- Execu√ß√£o direta ---
if __name__ == "__main__":
    analise_evolucao_temporal("hospitais_leitos_completo.csv")

## Visualiza√ß√£o 4: Outras visualiza√ß√µes

In [None]:
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import requests
from pathlib import Path
import numpy as np

# ==============================================================================
# CONFIGURA√á√ïES
# ==============================================================================

OUTPUT_DIR = "analises_leitos"
Path(OUTPUT_DIR).mkdir(exist_ok=True)

# Dados de popula√ß√£o por UF (IBGE 2022 - estimativa)
POPULACAO_UF = {
    'SP': 44411238, 'MG': 20539989, 'RJ': 16055174, 'BA': 14141626,
    'PR': 11444380, 'RS': 10882965, 'PE': 9058931, 'CE': 8794957,
    'PA': 8120131, 'MA': 6776699, 'SC': 7610361, 'GO': 7056495,
    'PB': 3974687, 'AM': 3941613, 'ES': 3833712, 'RN': 3302729,
    'AL': 3127683, 'MT': 3658649, 'PI': 3271199, 'DF': 2817381,
    'MS': 2757013, 'SE': 2210004, 'RO': 1581196, 'TO': 1511460,
    'AC': 830018, 'AP': 733759, 'RR': 636707
}

# Regi√µes do Brasil
REGIOES = {
    'Norte': ['AC', 'AP', 'AM', 'PA', 'RO', 'RR', 'TO'],
    'Nordeste': ['AL', 'BA', 'CE', 'MA', 'PB', 'PE', 'PI', 'RN', 'SE'],
    'Centro-Oeste': ['DF', 'GO', 'MT', 'MS'],
    'Sudeste': ['ES', 'MG', 'RJ', 'SP'],
    'Sul': ['PR', 'RS', 'SC']
}

# Inverte o dicion√°rio para mapear UF -> Regi√£o
UF_PARA_REGIAO = {}
for regiao, ufs in REGIOES.items():
    for uf in ufs:
        UF_PARA_REGIAO[uf] = regiao

# ==============================================================================
# 1. LEITOS PER CAPITA
# ==============================================================================

def analise_leitos_per_capita(csv_filename='last.csv'):
    """
    Analisa leitos por 1.000 habitantes por estado.
    """
    print("\n" + "="*70)
    print("üë• AN√ÅLISE 1: LEITOS PER CAPITA")
    print("="*70 + "\n")

    print(f"üìÇ Carregando arquivo '{csv_filename}'...")
    try:
        df = pd.read_csv(csv_filename, encoding='utf-8')
        print(f"‚úÖ Carregado: {len(df):,} registros")
    except FileNotFoundError:
        print(f"‚ùå Arquivo '{csv_filename}' n√£o encontrado.")
        return

    # üîç QUERY SQL PARA VALIDA√á√ÉO
    print("\n" + "="*70)
    print("üîç QUERY SQL PARA VALIDA√á√ÉO DOS DADOS:")
    print("="*70)
    sql_validacao = """
-- Leitos totais por UF (√∫ltimos registros)
WITH ultimos_registros AS (
    SELECT DISTINCT ON (cnes) *
    FROM hospitais_leitos
    ORDER BY cnes, data_competencia_info DESC
)
SELECT
    uf_hospital,
    SUM(leitos_geral) as total_leitos_geral,
    SUM(leitos_sus) as total_leitos_sus,
    SUM(uti_total) as total_uti,
    SUM(uti_sus_total) as total_uti_sus,
    COUNT(*) as num_hospitais
FROM ultimos_registros
GROUP BY uf_hospital
ORDER BY total_leitos_geral DESC;
"""
    print(sql_validacao)
    print("="*70 + "\n")

    # Agregar por UF
    df['uf_hospital'] = df['uf_hospital'].str.strip().str.upper()

    leitos_por_uf = df.groupby('uf_hospital').agg({
        'leitos_geral': 'sum',
        'leitos_sus': 'sum',
        'uti_total': 'sum',
        'uti_sus_total': 'sum'
    }).reset_index()

    # Adicionar popula√ß√£o
    leitos_por_uf['populacao'] = leitos_por_uf['uf_hospital'].map(POPULACAO_UF)
    leitos_por_uf = leitos_por_uf.dropna(subset=['populacao'])

    # Calcular leitos per capita (por 1.000 habitantes)
    leitos_por_uf['leitos_geral_per_capita'] = (leitos_por_uf['leitos_geral'] / leitos_por_uf['populacao']) * 1000
    leitos_por_uf['leitos_sus_per_capita'] = (leitos_por_uf['leitos_sus'] / leitos_por_uf['populacao']) * 1000
    leitos_por_uf['uti_total_per_capita'] = (leitos_por_uf['uti_total'] / leitos_por_uf['populacao']) * 1000
    leitos_por_uf['uti_sus_per_capita'] = (leitos_por_uf['uti_sus_total'] / leitos_por_uf['populacao']) * 1000

    # Adicionar regi√£o
    leitos_por_uf['regiao'] = leitos_por_uf['uf_hospital'].map(UF_PARA_REGIAO)

    # Ordenar
    leitos_por_uf = leitos_por_uf.sort_values('leitos_geral_per_capita', ascending=True)

    print("üìä Dados calculados (amostra):")
    print(leitos_por_uf[['uf_hospital', 'leitos_geral', 'populacao', 'leitos_geral_per_capita']].tail(10).to_string(index=False))
    print()

    # Gr√°fico de barras horizontais
    fig = go.Figure()

    fig.add_trace(go.Bar(
        y=leitos_por_uf['uf_hospital'],
        x=leitos_por_uf['leitos_geral_per_capita'],
        name='Leitos Gerais',
        orientation='h',
        marker_color='#e74c3c'
    ))

    fig.add_trace(go.Bar(
        y=leitos_por_uf['uf_hospital'],
        x=leitos_por_uf['leitos_sus_per_capita'],
        name='Leitos SUS',
        orientation='h',
        marker_color='#3498db'
    ))

    fig.update_layout(
        title='üë• Leitos por 1.000 Habitantes - Por Estado',
        xaxis_title='Leitos por 1.000 habitantes',
        yaxis_title='Estado',
        barmode='group',
        template='plotly_white',
        height=800,
        showlegend=True
    )

    # --- Exibe o gr√°fico em vez de salvar como HTML ---
    fig.show()
    print(f"‚úÖ Gr√°fico de Leitos per Capita exibido acima.")


    # --- Gr√°fico 2: M√©dia por regi√£o com desvio padr√£o ---
    print(f"\nüìç Gerando gr√°fico de m√©dia por regi√£o...")

    media_por_regiao = leitos_por_uf.groupby('regiao').agg({
        'leitos_geral_per_capita': ['mean', 'std'],
        'leitos_geral': 'sum'
    }).reset_index()

    media_por_regiao.columns = ['regiao', 'media', 'desvio', 'total']

    fig_regional = go.Figure()

    fig_regional.add_trace(go.Bar(
        x=media_por_regiao['regiao'],
        y=media_por_regiao['media'],
        error_y=dict(type='data', array=media_por_regiao['desvio']),
        marker_color=['#e74c3c', '#3498db', '#2ecc71', '#f39c12', '#9b59b6'],
        text=media_por_regiao['media'].round(2),
        textposition='outside'
    ))

    fig_regional.update_layout(
        title='üìç M√©dia de Leitos per Capita por Regi√£o (com desvio padr√£o)',
        xaxis_title='Regi√£o',
        yaxis_title='Leitos por 1.000 habitantes (m√©dia)',
        template='plotly_white',
        height=600
    )

    # --- Exibe o gr√°fico regional em vez de salvar como HTML ---
    fig_regional.show()
    print(f"‚úÖ Gr√°fico regional exibido acima.")


    # Estat√≠sticas
    print(f"\nüìä Estat√≠sticas Nacionais:")
    print(f"   M√©dia: {leitos_por_uf['leitos_geral_per_capita'].mean():.2f} leitos/1000 hab")
    print(f"   Mediana: {leitos_por_uf['leitos_geral_per_capita'].median():.2f} leitos/1000 hab")
    print(f"\nüèÜ Top 3 Estados:")
    for i, row in leitos_por_uf.tail(3).iterrows():
        print(f"   {row['uf_hospital']}: {row['leitos_geral_per_capita']:.2f} leitos/1000 hab")

    print(f"\n‚ö†Ô∏è  Bottom 3 Estados:")
    for i, row in leitos_por_uf.head(3).iterrows():
        print(f"   {row['uf_hospital']}: {row['leitos_geral_per_capita']:.2f} leitos/1000 hab\n")


# ==============================================================================
# 2. PROPOR√á√ÉO SUS vs PRIVADO
# ==============================================================================

def analise_proporcao_sus(csv_filename='last.csv'):
    """
    Analisa a propor√ß√£o de leitos SUS vs privados.
    """
    print("\n" + "="*70)
    print("üè• AN√ÅLISE 2: PROPOR√á√ÉO SUS vs PRIVADO")
    print("="*70 + "\n")

    print(f"üìÇ Carregando arquivo '{csv_filename}'...")
    try:
        df = pd.read_csv(csv_filename, encoding='utf-8')
        print(f"‚úÖ Carregado: {len(df):,} registros")
    except FileNotFoundError:
        print(f"‚ùå Arquivo '{csv_filename}' n√£o encontrado.")
        return

    # üîç QUERY SQL PARA VALIDA√á√ÉO
    print("\n" + "="*70)
    print("üîç QUERY SQL PARA VALIDA√á√ÉO DOS DADOS:")
    print("="*70)
    sql_validacao = """
-- Propor√ß√£o SUS vs Privado por UF
WITH ultimos_registros AS (
    SELECT DISTINCT ON (cnes) *
    FROM hospitais_leitos
    ORDER BY cnes, data_competencia_info DESC
)
SELECT
    uf_hospital,
    SUM(leitos_geral) as total_leitos,
    SUM(leitos_sus) as total_sus,
    SUM(leitos_geral) - SUM(leitos_sus) as total_privado,
    ROUND((SUM(leitos_sus)::numeric / NULLIF(SUM(leitos_geral), 0) * 100), 2) as percentual_sus,
    SUM(uti_total) as total_uti,
    SUM(uti_sus_total) as total_uti_sus,
    ROUND((SUM(uti_sus_total)::numeric / NULLIF(SUM(uti_total), 0) * 100), 2) as percentual_uti_sus
FROM ultimos_registros
GROUP BY uf_hospital
ORDER BY percentual_sus DESC;
"""
    print(sql_validacao)
    print("="*70 + "\n")

    # Agregar por UF
    df['uf_hospital'] = df['uf_hospital'].str.strip().str.upper()

    proporcao_uf = df.groupby('uf_hospital').agg({
        'leitos_geral': 'sum',
        'leitos_sus': 'sum',
        'uti_total': 'sum',
        'uti_sus_total': 'sum'
    }).reset_index()

    # Calcular propor√ß√µes
    proporcao_uf['proporcao_sus_geral'] = (proporcao_uf['leitos_sus'] / proporcao_uf['leitos_geral']) * 100
    proporcao_uf['proporcao_sus_uti'] = (proporcao_uf['uti_sus_total'] / proporcao_uf['uti_total']) * 100
    proporcao_uf['leitos_privados'] = proporcao_uf['leitos_geral'] - proporcao_uf['leitos_sus']
    proporcao_uf['uti_privada'] = proporcao_uf['uti_total'] - proporcao_uf['uti_sus_total']

    # Adicionar regi√£o
    proporcao_uf['regiao'] = proporcao_uf['uf_hospital'].map(UF_PARA_REGIAO)

    # Ordenar
    proporcao_uf = proporcao_uf.sort_values('proporcao_sus_geral', ascending=False)

    print("üìä Dados calculados (amostra):")
    print(proporcao_uf[['uf_hospital', 'leitos_geral', 'leitos_sus', 'proporcao_sus_geral']].head(10).to_string(index=False))
    print()

    # Gr√°fico: Percentual SUS
    fig = go.Figure()

    fig.add_trace(go.Bar(
        x=proporcao_uf['uf_hospital'],
        y=proporcao_uf['proporcao_sus_geral'],
        name='% Leitos SUS',
        marker_color='#2ecc71'
    ))

    fig.add_trace(go.Bar(
        x=proporcao_uf['uf_hospital'],
        y=proporcao_uf['proporcao_sus_uti'],
        name='% UTI SUS',
        marker_color='#f39c12'
    ))

    fig.add_hline(y=50, line_dash="dash", line_color="red",
                   annotation_text="50%", annotation_position="right")

    fig.update_layout(
        title='üìä Percentual de Leitos SUS por Estado',
        xaxis_title='Estado',
        yaxis_title='Percentual (%)',
        barmode='group',
        template='plotly_white',
        height=600
    )

    # --- Exibe o gr√°fico em vez de salvar como HTML ---
    fig.show()
    print(f"‚úÖ Gr√°fico de Percentual SUS exibido acima.")

    # Estat√≠sticas
    print(f"\nüìä Estat√≠sticas Nacionais:")
    total_geral = proporcao_uf['leitos_geral'].sum()
    total_sus = proporcao_uf['leitos_sus'].sum()
    print(f"   Propor√ß√£o SUS (Leitos Gerais): {(total_sus/total_geral)*100:.1f}%")

    total_uti = proporcao_uf['uti_total'].sum()
    total_uti_sus = proporcao_uf['uti_sus_total'].sum()
    print(f"   Propor√ß√£o SUS (UTI): {(total_uti_sus/total_uti)*100:.1f}%\n")


# ==============================================================================
# FUN√á√ÉO PRINCIPAL
# ==============================================================================

def gerar_todas_analises(csv_completo='hospitais_leitos_completo.csv',
                         csv_ultimo='last.csv'):
    """
    Executa todas as an√°lises recomendadas.

    Args:
        csv_completo: CSV com todos os registros hist√≥ricos (para an√°lise temporal)
        csv_ultimo: CSV com √∫ltimos registros por hospital (para an√°lises atuais)
    """

    print("\n" + "="*70)
    print("üöÄ GERADOR DE AN√ÅLISES AVAN√áADAS DE LEITOS HOSPITALARES")
    print("="*70)

    # An√°lise 1: Leitos per capita
    analise_leitos_per_capita(csv_ultimo)

    # An√°lise 2: Propor√ß√£o SUS
    analise_proporcao_sus(csv_ultimo)


    print("\n" + "="*70)
    print("‚úÖ TODAS AS AN√ÅLISES CONCLU√çDAS!")
    print("="*70)
    # Removed the output directory and file names as the plots are now displayed directly
    print("\nüí° Os gr√°ficos foram exibidos acima.")


# ==============================================================================
# EXECU√á√ÉO
# ==============================================================================

if __name__ == "__main__":

    gerar_todas_analises(
        csv_completo='hospitais_leitos_completo.csv',  # Todos os registros
        csv_ultimo='hospitais_leitos_latest.csv'  # √öltimos registros por hospital
    )