Código consolidado:

In [1]:
import pandas as pd
import geopandas as gpd
import numpy as np
import os
import glob
from pathlib import Path

# ==============================================================================
# 1. CONFIGURAÇÕES E CAMINHOS
# ==============================================================================

# Caminhos de Entrada
# Ajuste se necessário, considerando onde você roda o script
path_csvs_ibge = Path('../data/raw/ibge/censo/2022/agregados_por_setores/csv/')
input_gpkg_favelas = Path('../data/raw/ibge/censo/2022/setores_censitarios/br_setores.gpkg')
path_base_h3 = Path('../data/interim/norm_winsorized/br_h3_res9.parquet')

# Caminhos de Saída (Pasta única)
output_dir = Path('../data/interim/norm_winsorized')

# Chaves de Ligação
csv_key = 'cd_setor'  # Padronizado para minúsculo
geo_key = 'cd_setor'

# Lista de TODAS as variáveis necessárias (EM MINÚSCULO)
required_vars = [
    # Demografia básica
    'v00001', 'v01006', 
    # V1: Renda
    'v06004',
    # V3: Saneamento
    'v00201', 'v00238', 
    # V4: Educação
    'v00853', 'v00855', 'v00857',
    # I1: Gênero
    'v01042', 
    # I2: Faixa etária
    'v01063', 'v01031', 'v01041', 
    # I3: Cor e Raça
    'v01317', 'v01318', 'v01319', 'v01320', 'v01321', 
    # Comunidades Tradicionais
    'v01500', 'v03000'
]

# ==============================================================================
# 2. FUNÇÕES AUXILIARES
# ==============================================================================

def ensure_folder(file_path):
    """Cria a pasta do arquivo se ela não existir."""
    if file_path.suffix: # É arquivo
        file_path.parent.mkdir(parents=True, exist_ok=True)
    else: # É pasta
        file_path.mkdir(parents=True, exist_ok=True)

def normalize_direct(series, lower_limit=0.01, upper_limit=0.99):
    """
    Aplica Winsorization (corte de outliers) e depois normaliza de 0 a 1.
    """
    # Garante numérico e preenche NaN com 0
    series = pd.to_numeric(series, errors='coerce').fillna(0)
    
    # Calcula os limites (percentis 1% e 99%)
    lower_val = series.quantile(lower_limit)
    upper_val = series.quantile(upper_limit)
    
    # Faz o 'clip' (Winsorization)
    series_clipped = series.clip(lower=lower_val, upper=upper_val)
    
    # Normalização Min-Max nos dados já cortados
    min_val = series_clipped.min()
    max_val = series_clipped.max()
    
    if max_val == min_val:
        return 0
        
    return (series_clipped - min_val) / (max_val - min_val)

# ==============================================================================
# 3. EXECUÇÃO
# ==============================================================================

def main():
    print("--- INICIANDO PROCESSAMENTO INTEGRADO (SC + H3) ---")
    ensure_folder(output_dir)

    # --------------------------------------------------------------------------
    # A. PROCESSAR FAVELAS (Gera indicador v2)
    # --------------------------------------------------------------------------
    print(f"\n1. Processando Favelas a partir de: {input_gpkg_favelas.name}")
    
    try:
        # Lê apenas colunas essenciais do GPKG
        gdf_setores = gpd.read_file(input_gpkg_favelas, ignore_geometry=True)
        
        # Garante que as colunas existam e estejam em minúsculo
        gdf_setores.columns = [c.lower() for c in gdf_setores.columns]
        
        # Prepara dataframe de favelas
        # cd_fcu é o código da favela. Se existir, é favela.
        if 'cd_fcu' in gdf_setores.columns:
            df_fav = gdf_setores[[geo_key, 'cd_fcu']].copy()
            df_fav['v2_mor_norm'] = np.where(df_fav['cd_fcu'].notnull(), 1, 0)
        else:
            print("AVISO: Coluna 'cd_fcu' não encontrada. Criando v2_mor_norm zerado.")
            df_fav = gdf_setores[[geo_key]].copy()
            df_fav['v2_mor_norm'] = 0

        # Garante tipo string para merge
        df_fav[geo_key] = df_fav[geo_key].astype(str)
        df_fav = df_fav[[geo_key, 'v2_mor_norm']].drop_duplicates(subset=[geo_key])
        
        print(f"-> Favelas processadas. Linhas: {len(df_fav)}")
        
    except Exception as e:
        print(f"ERRO CRÍTICO ao processar favelas: {e}")
        return

    # --------------------------------------------------------------------------
    # B. LER E AGREGAR CSVs DO CENSO
    # --------------------------------------------------------------------------
    print("\n2. Lendo CSVs do Censo IBGE...")
    
    # Começamos o DataFrame base com os dados da Favela
    df_base = df_fav.copy()
    
    csv_files = glob.glob(str(path_csvs_ibge / "*.csv"))
    print(f"-> Encontrados {len(csv_files)} arquivos CSV.")

    for f in csv_files:
        try:
            # Lê cabeçalho para checar colunas
            # Tenta separador ; primeiro
            df_header = pd.read_csv(f, nrows=0, sep=';', encoding='utf-8')
            if len(df_header.columns) <= 1: # Fallback para vírgula
                df_header = pd.read_csv(f, nrows=0, sep=',', encoding='utf-8')
            
            # Normaliza nomes de colunas para minúsculo para comparação
            file_cols_lower = [c.lower() for c in df_header.columns]
            
            # Verifica interseção com variáveis requeridas
            vars_in_file = [v for v in required_vars if v in file_cols_lower]
            
            if vars_in_file and csv_key in file_cols_lower:
                # Mapeia colunas originais para leitura
                # Precisamos saber o nome original (maiúsculo/minúsculo) para o usecols
                orig_cols_map = {c.lower(): c for c in df_header.columns}
                cols_to_read = [orig_cols_map[csv_key]] + [orig_cols_map[v] for v in vars_in_file]
                
                # Lê o arquivo
                try:
                    df_temp = pd.read_csv(f, usecols=cols_to_read, sep=';', dtype=str)
                except:
                    df_temp = pd.read_csv(f, usecols=cols_to_read, sep=',', dtype=str)
                
                # Padroniza nomes para minúsculo
                df_temp.columns = [c.lower() for c in df_temp.columns]
                
                # Garante chave string
                df_temp[geo_key] = df_temp[geo_key].astype(str)
                
                # Remove duplicatas
                df_temp = df_temp.drop_duplicates(subset=[geo_key])
                
                # Converte dados para numérico (forçando erros a virar NaN)
                for col in vars_in_file:
                    df_temp[col] = pd.to_numeric(df_temp[col].str.replace(',', '.'), errors='coerce')
                
                # Merge no DataFrame base
                df_base = pd.merge(df_base, df_temp, on=geo_key, how='outer')
                
        except Exception as e:
            print(f"Erro ao ler {os.path.basename(f)}: {e}")

    # Preencher NAs com 0
    df_base = df_base.fillna(0)
    print(f"-> Consolidação concluída. Total colunas: {len(df_base.columns)}")

    # --------------------------------------------------------------------------
    # C. CÁLCULOS DOS ÍNDICES (SETORES)
    # --------------------------------------------------------------------------
    print("\n3. Calculando índices Setoriais...")

    total_dom = df_base['v00001'].replace(0, np.nan) # Domicílios
    total_pop = df_base['v01006'].replace(0, np.nan) # População

    # --- VULNERABILIDADE ---
    # v1: Renda (Invertido: Renda Alta = Vul Baixa)
    if 'v06004' in df_base.columns:
        df_base['v1_ren'] = df_base['v06004']
        v1_norm_calc = normalize_direct(df_base['v1_ren'])
        df_base['v1_ren_norm'] = 1.0 - v1_norm_calc
    
    # v2: Moradia (Já calculado na etapa A como v2_mor_norm)
    
    # v3: Infraestrutura (Água + Esgoto) / Domicílios
    if 'v00201' in df_base.columns and 'v00238' in df_base.columns:
        df_base['v3_inf'] = (df_base['v00201'] + df_base['v00238']) / total_dom
        df_base['v3_inf_norm'] = normalize_direct(df_base['v3_inf'].fillna(0))

    # v4: Educação (Analfabetismo) / População
    if 'v00853' in df_base.columns:
        df_base['v4_edu'] = (df_base['v00853'] + df_base['v00855'] + df_base['v00857']) / total_pop
        df_base['v4_edu_norm'] = normalize_direct(df_base['v4_edu'].fillna(0))

    # --- INTERSECCIONALIDADE ---
    # i1: Gênero (Chefe Mulher) / Domicílios
    if 'v01063' in df_base.columns:
        df_base['i1_gen'] = df_base['v01063'] / total_dom
        df_base['i1_gen_norm'] = normalize_direct(df_base['i1_gen'].fillna(0))

    # i2: Faixa Etária (Jovens + Idosos) / População
    if 'v01031' in df_base.columns:
        df_base['i2_fax'] = (df_base['v01031'] + df_base['v01041']) / total_pop
        df_base['i2_fax_norm'] = normalize_direct(df_base['i2_fax'].fillna(0))

    # i3: Cor (Não Brancos) / População
    if 'v01317' in df_base.columns:
        # v01317 são os brancos. (Total - Brancos) = Pretos, Pardos, Indígenas, Amarelos
        df_base['i3_cor'] = (total_pop - df_base['v01317']) / total_pop
        df_base['i3_cor_norm'] = normalize_direct(df_base['i3_cor'].fillna(0))

    # i4: Comunidades Tradicionais (Indigenas + Quilombolas) / Domicílios
    if 'v01500' in df_base.columns:
        df_base['i4_com'] = (df_base['v01500'] + df_base['v03000']) / total_dom
        df_base['i4_com_norm'] = normalize_direct(df_base['i4_com'].fillna(0))

    # --------------------------------------------------------------------------
    # D. SALVAR ARQUIVOS SETORIAIS (SC)
    # --------------------------------------------------------------------------
    print("\n4. Salvando arquivos de Setores Censitários...")

    # Definir colunas finais
    cols_vul_export = [geo_key, 'v1_ren_norm', 'v2_mor_norm', 'v3_inf_norm', 'v4_edu_norm']
    cols_int_export = [geo_key, 'i1_gen_norm', 'i2_fax_norm', 'i3_cor_norm', 'i4_com_norm']

    # Criar DataFrames finais (com dropna para limpar sujeira se houver, mas mantendo 0s)
    df_vul_final = df_base[[c for c in cols_vul_export if c in df_base.columns]].fillna(0)
    df_int_final = df_base[[c for c in cols_int_export if c in df_base.columns]].fillna(0)

    # Paths de Saída
    out_sc_vul_csv = output_dir / 'br_sc_vulnerabilidade.csv'
    out_sc_vul_parq = output_dir / 'br_sc_vulnerabilidade.parquet'
    
    out_sc_int_csv = output_dir / 'br_sc_interseccionalidade.csv'
    out_sc_int_parq = output_dir / 'br_sc_interseccionalidade.parquet'

    # Salvar Vulnerabilidade
    df_vul_final.to_csv(out_sc_vul_csv, index=False)
    df_vul_final.to_parquet(out_sc_vul_parq, index=False)
    print(f"-> SC Vulnerabilidade salvo (CSV/Parquet). Shape: {df_vul_final.shape}")

    # Salvar Interseccionalidade
    df_int_final.to_csv(out_sc_int_csv, index=False)
    df_int_final.to_parquet(out_sc_int_parq, index=False)
    print(f"-> SC Interseccionalidade salvo (CSV/Parquet). Shape: {df_int_final.shape}")

    # --------------------------------------------------------------------------
    # E. PROCESSAR E SALVAR H3 (HEXÁGONOS)
    # --------------------------------------------------------------------------
    print("\n5. Processando malha H3...")

    if not path_base_h3.exists():
        print(f"ERRO: Base H3 não encontrada em {path_base_h3}. Pule esta etapa.")
    else:
        # Lê base H3
        df_h3 = pd.read_parquet(path_base_h3)
        # Garante chave string para o merge
        df_h3['cd_setor'] = df_h3['cd_setor'].astype(str)

        # Merge H3 + Vulnerabilidade (Inner join: só hexágonos que têm setor)
        df_h3_vul = pd.merge(df_h3[['h3_id', 'cd_setor']], df_vul_final, on='cd_setor', how='inner')
        # Limpa colunas duplicadas ou desnecessárias se houver
        df_h3_vul = df_h3_vul.drop(columns=['cd_setor']) 
        
        # Merge H3 + Interseccionalidade
        df_h3_int = pd.merge(df_h3[['h3_id', 'cd_setor']], df_int_final, on='cd_setor', how='inner')
        df_h3_int = df_h3_int.drop(columns=['cd_setor'])

        # Paths de Saída H3
        out_h3_vul = output_dir / 'br_h3_vulnerabilidade.parquet'
        out_h3_int = output_dir / 'br_h3_interseccionalidade.parquet'

        # Salvar
        df_h3_vul.to_parquet(out_h3_vul, index=False)
        print(f"-> H3 Vulnerabilidade salvo: {out_h3_vul} | Shape: {df_h3_vul.shape}")

        df_h3_int.to_parquet(out_h3_int, index=False)
        print(f"-> H3 Interseccionalidade salvo: {out_h3_int} | Shape: {df_h3_int.shape}")

    print("\n--- PROCESSO CONCLUÍDO COM SUCESSO ---")

if __name__ == "__main__":
    main()

--- INICIANDO PROCESSAMENTO INTEGRADO (SC + H3) ---

1. Processando Favelas a partir de: br_setores.gpkg
-> Favelas processadas. Linhas: 468099

2. Lendo CSVs do Censo IBGE...
-> Encontrados 13 arquivos CSV.
-> Consolidação concluída. Total colunas: 21

3. Calculando índices Setoriais...

4. Salvando arquivos de Setores Censitários...
-> SC Vulnerabilidade salvo (CSV/Parquet). Shape: (468099, 5)
-> SC Interseccionalidade salvo (CSV/Parquet). Shape: (468099, 5)

5. Processando malha H3...
-> H3 Vulnerabilidade salvo: ..\data\interim\norm_winsorized\br_h3_vulnerabilidade.parquet | Shape: (4572203, 5)
-> H3 Interseccionalidade salvo: ..\data\interim\norm_winsorized\br_h3_interseccionalidade.parquet | Shape: (4572203, 5)

--- PROCESSO CONCLUÍDO COM SUCESSO ---
