### importando bibliotecas

In [None]:
import geopandas as gpd
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from geobr import read_municipality
from esda.moran import Moran, Moran_Local
from libpysal.weights import Queen
from splot.esda import moran_scatterplot
import sidrapy
import unicodedata
import locale

import libpysal as lps
import esda
from splot.esda import moran_scatterplot, lisa_cluster, plot_local_autocorrelation

In [None]:
def normalize_text(text):
    """Remove acentos e converte para caixa alta"""
    text = unicodedata.normalize('NFKD', text).encode('ASCII', 'ignore').decode('ASCII')
    return text.upper()

### funcao que combina dados do SIH com dados geografico -> gdf

In [None]:
def load_piaui_combined(year, admissions_filename, population_filename):
    """
    Filtra dados do Piauí de CSVs nacionais e retorna um GeoDataFrame com as métricas.
    
    Args:
        year: Ano de referência dos dados
        admissions_filename: Caminho para o CSV de internações
        population_filename: Caminho para o CSV de população
        
    Returns:
        GeoDataFrame com geometrias e métricas dos municípios do Piauí
    """
    # Passo 1: Carrega municípios do Piauí (códigos oficiais de 7 dígitos)
    piaui = read_municipality(code_muni="PI", year=2022)
    
    # Garantir que code_muni seja string para operações de texto
    piaui["code_muni"] = piaui["code_muni"].astype(str)
    
    # Criar versão de 6 dígitos do código para correspondência com os CSVs
    piaui["code_muni_6d"] = piaui["code_muni"].str[:6]
    
    # Normalizar nomes para casos onde seja necessário fazer correspondência por nome
    piaui["nome_norm"] = piaui["name_muni"].astype(str).apply(normalize_text)
    
    # Imprimir informações de diagnóstico
    print(f"Total de municípios do Piauí carregados: {len(piaui)}")
    print("Exemplos de códigos e nomes de municípios:")
    print(piaui[["code_muni", "code_muni_6d", "name_muni", "nome_norm"]].head(3))
    
    # Passo 2: Função para processar CSVs nacionais
    def process_csv(filename, metric_name):
        #print(f"\nProcessando arquivo {filename} para métrica {metric_name}...")
        df = pd.read_csv(filename, encoding="iso-8859-1")
        
        # Verificar as primeiras linhas do CSV para diagnóstico
        #print("Primeiras linhas do CSV:")
        #print(df.head(2))
        
        # Método 1: Extração do código IBGE dos primeiros dígitos da coluna Município
        # Usar regex para extrair o código no início da string
        df["code_muni_csv"] = df["Município"].astype(str).str.extract(r'^(\d{6})').fillna("")
        
        # Verificar se os códigos estão no formato esperado (6 dígitos)
        valid_codes = df["code_muni_csv"].str.match(r'^\d{6}$')
        #if not valid_codes.all():
        #    print(f"Aviso: Alguns códigos não estão no formato de 6 dígitos. Total: {(~valid_codes).sum()}")
        #    print("Exemplos de entradas problemáticas:")
        #    print(df[~valid_codes]["Município"].head())
        
        # Extrair nome do município para correspondência alternativa
        # Usar regex para remover o código e espaços no início
        df["nome_municipio"] = df["Município"].astype(str).str.replace(r'^\d+\s+', '', regex=True).str.strip()
        df["nome_norm"] = df["nome_municipio"].apply(normalize_text)
        
        # Diagnóstico - verificar extração de códigos e nomes
        #print("\nExemplos de extração de códigos e nomes:")
        #print(df[["Município", "code_muni_csv", "nome_municipio", "nome_norm"]].head(3))
        
        # Processa a coluna do ano
        col_year = str(year)
        if col_year in df.columns:
            df[metric_name] = (
                df[col_year]
                .replace("-", "0")
                .apply(pd.to_numeric, errors="coerce")
                .fillna(0)
            )
        else:
            #print(f"Aviso: Coluna {col_year} não encontrada em {filename}.")
            # Verificar as colunas disponíveis
            #print(f"Colunas disponíveis: {df.columns.tolist()}")
            df[metric_name] = 0
        
        # Filtrar apenas municípios do Piauí (código começa com 22)
        df_piaui = df[df["code_muni_csv"].str.startswith("22")]
        #print(f"Municípios filtrados para o Piauí: {len(df_piaui)} de {len(df)}")
        
        if len(df_piaui) == 0:
            print("ALERTA: Nenhum município do Piauí encontrado pelo código!")
            # Verificar se existem municípios com nomes que contêm Piauí
            piaui_by_name = df[df["nome_municipio"].str.contains("PIAUI", case=False)]
            print(f"Municípios com 'Piauí' no nome: {len(piaui_by_name)}")
            if len(piaui_by_name) > 0:
                print(piaui_by_name[["Município", "nome_municipio"]].head())
        
        # Correspondência com a base geobr usando as diferentes estratégias
        
        # 1. Tentar correspondência direta com código de 6 dígitos
        merged_df = piaui.copy()
        
        # Preparar um dicionário para mapear código → valor da métrica
        metric_dict = dict(zip(df_piaui["code_muni_csv"], df_piaui[metric_name]))
        
        # Aplicar o mapeamento para os municípios do Piauí
        merged_df[metric_name] = merged_df["code_muni_6d"].map(metric_dict)
        
        # 2. Para municípios não correspondidos, tentar pelo nome normalizado
        missing_munis = merged_df[merged_df[metric_name].isna()]
        if len(missing_munis) > 0:
            
            # Preparar um dicionário para mapear nome normalizado → valor da métrica
            name_dict = dict(zip(df_piaui["nome_norm"], df_piaui[metric_name]))
            
            # Aplicar apenas para municípios sem valor
            for idx, row in missing_munis.iterrows():
                if row["nome_norm"] in name_dict:
                    merged_df.loc[idx, metric_name] = name_dict[row["nome_norm"]]
            
            # Verificar quantos foram preenchidos
            still_missing = merged_df[merged_df[metric_name].isna()]
            filled_by_name = len(missing_munis) - len(still_missing)
            
        
        # Verificar resultados
        filled_count = merged_df[metric_name].notna().sum()
        
        # Se ainda existirem valores ausentes, preencher com 0
        merged_df[metric_name] = merged_df[metric_name].fillna(0)
        
        return merged_df[["code_muni", "geometry", "name_muni", metric_name]]
    
    # Passo 3: Processar dados de internações e população
    piaui_with_admissions = process_csv(admissions_filename, "internacoes")
    
    # Usar o resultado do processamento anterior para adicionar dados de população
    piaui_merged = process_csv(population_filename, "populacao")
    
    # Garantir que temos apenas um conjunto de dados consolidado
    final_data = piaui_with_admissions.merge(
        piaui_merged[["code_muni", "populacao"]],
        on="code_muni",
        how="left"
    )
    
    # Garantir que não existem valores ausentes
    final_data["internacoes"] = final_data["internacoes"].fillna(0)
    final_data["populacao"] = final_data["populacao"].fillna(0)
    
    # Calcular a taxa de internação (por 100.000 habitantes)
    final_data["taxa_internacao"] = np.where(
        final_data["populacao"] > 0,
        (final_data["internacoes"] / final_data["populacao"]) * 100000,
        0
    )
    
    # Adicionar o ano de referência
    final_data["ano"] = year
    
    return final_data

### funcao que plota e faz analise LISA e I moran

In [None]:
def plot_choropleth(gdf):
    fig, ax = plt.subplots(figsize=(10, 8))
    gdf.plot(column='taxa_internacao', 
             cmap='Blues', 
             legend=True,
             #scheme='quantiles',
             edgecolor='black', 
             linewidth=0.3,
             ax=ax)
    ax.set_title('Taxa de Internação por Município', fontsize=16)
    plt.tight_layout()
    plt.show()

# 2. Moran's I Calculation (Corrigido)
def calculate_moran(gdf):
    # Matriz de pesos espaciais
    #w = lps.weights.Queen.from_dataframe(gdf)
    w = lps.weights.Queen.from_dataframe(gdf, use_index=True)
    w.transform = 'r'
    
    # Cálculo do Moran's I
    y = gdf['taxa_internacao'].values
    moran = esda.Moran(y, w)
    
    print(f"Moran's I: {moran.I:.4f}")
    print(f"P-value: {moran.p_sim:.4f}")
    
    # Plot corrigido
    fig, ax = plt.subplots(figsize=(8, 6))
    moran_scatterplot(moran, ax=ax)
    ax.set_title("Diagrama de Dispersão de Moran", fontsize=14)
    plt.show()
    
    return moran, w

def lisa_analysis(gdf, w):
    lisa = esda.Moran_Local(gdf['taxa_internacao'].values, w)
    
    # Categorias de clusters
    categories = {
        1: 'HH',  # Alto-Alto
        2: 'LH',  # Baixo-Alto
        3: 'LL',  # Baixo-Baixo
        4: 'HL'   # Alto-Baixo
    }
    
    # Filtro de significância (correção: use p_sim em vez de p)
    sig = lisa.p_sim < 0.05  # <--- Alteração aqui!
    gdf['lisa_cluster'] = 'Non-significant'
    gdf.loc[sig, 'lisa_cluster'] = [categories[q] for q in lisa.q[sig]]
    
    # Plot LISA
    fig, ax = plt.subplots(figsize=(10, 8))
    lisa_cluster(lisa, gdf, p=0.05, ax=ax)
    ax.set_title('Clusters LISA', fontsize=16)
    plt.show()
    
    return lisa

# 4. Print Clusters (Mesma função)
def print_clusters(gdf):
    hh = gdf[gdf['lisa_cluster'] == 'HH']
    ll = gdf[gdf['lisa_cluster'] == 'LL']
    
    print("\nClusters HH (Alto-Alto):")
    print(hh[['name_muni', 'taxa_internacao']].to_string(index=False))
    
    print("\nClusters LL (Baixo-Baixo):")
    print(ll[['name_muni', 'taxa_internacao']].to_string(index=False))


In [9]:
# Execução completa
def full_analysis(year,filename):
    gdf = load_piaui_combined(year,filename, "data/P_residente.csv")
    plot_choropleth(gdf)
    moran, w = calculate_moran(gdf)
    lisa = lisa_analysis(gdf, w)
    print_clusters(gdf)

# Execute com seu DataFrame geoespacial


### analises

In [None]:
full_analysis(2023,"data/I_internacao.csv")

In [20]:
def analyze_multiple_years(anos, filename, pop_file="data/P_residente.csv"):
    """
    Realiza análise espacial (Moran I e LISA) para múltiplos anos e retorna uma tabela com os resultados.
    Utiliza a função load_piaui_combined para carregar os dados de cada ano.
    
    Parâmetros:
    -----------
    anos : list
        Lista com os anos a serem analisados
    filename : str
        Caminho para o arquivo CSV com os dados de taxa de internação
    pop_file : str, opcional
        Caminho para o arquivo CSV com os dados de população residente (padrão: "data/P_residente.csv")
        
    Retorno:
    --------
    pandas.DataFrame
        Tabela com os resultados das análises por ano
    """
    
    # Criar DataFrame para armazenar os resultados
    resultados = pd.DataFrame(
        index=anos,
        columns=[
            'Moran_I', 'Moran_p_value', 
            'HH_count', 'LL_count', 'LH_count', 'HL_count',
            'HH_mean', 'LL_mean', 'LH_mean', 'HL_mean'
        ]
    )
    municipios_por_cluster = {}
    # Para cada ano, carregar os dados e calcular as estatísticas
    for ano in anos:
        # Carregar os dados usando a função load_piaui_combined
        gdf = load_piaui_combined(ano, filename, pop_file)
        
        # Calcular Moran's I
        w = lps.weights.Queen.from_dataframe(gdf, use_index=True)
        w.transform = 'r'
        y = gdf['taxa_internacao'].values
        moran = esda.Moran(y, w)
        
        # Calcular LISA
        lisa = esda.Moran_Local(y, w)
        sig = lisa.p_sim < 0.05
        
        # Categorias de clusters
        gdf['lisa_cluster'] = 'Non-significant'
        categories = {1: 'HH', 2: 'LH', 3: 'LL', 4: 'HL'}
        gdf.loc[sig, 'lisa_cluster'] = [categories[q] for q in lisa.q[sig]]
        
        # Contagem de clusters
        hh = gdf[gdf['lisa_cluster'] == 'HH']
        ll = gdf[gdf['lisa_cluster'] == 'LL']
        lh = gdf[gdf['lisa_cluster'] == 'LH']
        hl = gdf[gdf['lisa_cluster'] == 'HL']

        municipios_por_cluster[ano] = {
            'HH': hh['name_muni'].tolist(),
            'LL': ll['name_muni'].tolist(),
            'LH': lh['name_muni'].tolist(),
            'HL': hl['name_muni'].tolist()
        }
        
        # Preencher resultados no DataFrame
        resultados.loc[ano, 'Moran_I'] = moran.I
        resultados.loc[ano, 'Moran_p_value'] = moran.p_sim
        
        # Contagem de clusters
        resultados.loc[ano, 'HH_count'] = len(hh)
        resultados.loc[ano, 'LL_count'] = len(ll)
        resultados.loc[ano, 'LH_count'] = len(lh)
        resultados.loc[ano, 'HL_count'] = len(hl)
        
        # Média da taxa de internação por tipo de cluster
        resultados.loc[ano, 'HH_mean'] = hh['taxa_internacao'].mean() if not hh.empty else np.nan
        resultados.loc[ano, 'LL_mean'] = ll['taxa_internacao'].mean() if not ll.empty else np.nan
        resultados.loc[ano, 'LH_mean'] = lh['taxa_internacao'].mean() if not lh.empty else np.nan
        resultados.loc[ano, 'HL_mean'] = hl['taxa_internacao'].mean() if not hl.empty else np.nan
    
    # Formatar o DataFrame para melhor visualização
    formato = {
        'Moran_I': '{:.4f}',
        'Moran_p_value': '{:.4f}',
        'HH_mean': '{:.2f}',
        'LL_mean': '{:.2f}',
        'LH_mean': '{:.2f}',
        'HL_mean': '{:.2f}'
    }
    
    for col, fmt in formato.items():
        resultados[col] = resultados[col].apply(lambda x: fmt.format(x) if not pd.isna(x) else 'N/A')
    
    return resultados, municipios_por_cluster

# Exemplo de uso:
# anos = [2010, 2011, 2012, 2013, 2014, 2015]
# resultados = analyze_multiple_years(anos, "data/taxas_internacao.csv")
# print(resultados)

In [27]:
anos = [2010, 2011, 2012, 2013, 2014, 2015,2016,2017,2018,2019,2020,2021,2022,2023]
resultados, municipios_cluster = analyze_multiple_years(anos, "data/I_residente.csv")

Total de municípios do Piauí carregados: 224
Exemplos de códigos e nomes de municípios:
   code_muni code_muni_6d     name_muni     nome_norm
0  2200053.0       220005         Acauã         ACAUA
1  2200103.0       220010  Agricolândia  AGRICOLANDIA
2  2200202.0       220020   Água Branca   AGUA BRANCA
Total de municípios do Piauí carregados: 224
Exemplos de códigos e nomes de municípios:
   code_muni code_muni_6d     name_muni     nome_norm
0  2200053.0       220005         Acauã         ACAUA
1  2200103.0       220010  Agricolândia  AGRICOLANDIA
2  2200202.0       220020   Água Branca   AGUA BRANCA
Total de municípios do Piauí carregados: 224
Exemplos de códigos e nomes de municípios:
   code_muni code_muni_6d     name_muni     nome_norm
0  2200053.0       220005         Acauã         ACAUA
1  2200103.0       220010  Agricolândia  AGRICOLANDIA
2  2200202.0       220020   Água Branca   AGUA BRANCA
Total de municípios do Piauí carregados: 224
Exemplos de códigos e nomes de municípios:


In [29]:
print(resultados)
print(municipios_cluster)

     Moran_I Moran_p_value HH_count LL_count LH_count HL_count HH_mean  \
2010  0.3113        0.0010       17       15        6        4  142.41   
2011  0.2604        0.0010       10       15        7        3  156.52   
2012  0.1747        0.0010       14        8        3        1  137.77   
2013  0.0861        0.0210       15        8        7        6  117.05   
2014  0.2346        0.0010       13       18        5        5  138.63   
2015  0.0663        0.0460        7       11       11        2   99.59   
2016  0.0524        0.0900        7        6        6        3  111.67   
2017  0.2404        0.0010       15       14        4        4  138.49   
2018  0.1926        0.0010       19       14        3        3  126.23   
2019  0.1563        0.0020       11       16        7        6  140.86   
2020  0.0798        0.0230        8       14       10        5   87.00   
2021  0.1514        0.0020       11       17        9        5   90.33   
2022  0.1313        0.0020       14   