In [1]:
import pandas as pd
import numpy as np
import os
import pyarrow as pa
import pyarrow.parquet as pq

# Funções de tratamento otimizadas
def tratar_num_filhos(df):
    """Trata NUM_FILHOS com arredondamento da média para melhor precisão"""
    if 'NUM_FILHOS' in df.columns:
        df['NUM_FILHOS'] = pd.to_numeric(df['NUM_FILHOS'], errors='coerce').fillna(0)
        # Calcula média dos valores válidos e arredonda (melhor da V1)
        media_filhos = df.loc[df['NUM_FILHOS'] <= 20, 'NUM_FILHOS'].mean()
        media_filhos = int(round(media_filhos)) if not pd.isna(media_filhos) else 0
        df.loc[df['NUM_FILHOS'] > 20, 'NUM_FILHOS'] = media_filhos
        df['NUM_FILHOS'] = df['NUM_FILHOS'].astype(int)
    return df

def tratar_instrucao(df):
    """Trata INSTRU com verificação de coluna existente"""
    if 'INSTRU' not in df.columns:
        return df
    df['INSTRU'] = df['INSTRU'].astype(str).str.zfill(2)
    df['INSTRU'] = df['INSTRU'].replace(['00', 'nan'], '0')
    return df

def padronizar_cids(df, colunas_cid):
    """Padroniza CIDs usando mask para evitar warnings"""
    colunas_existentes = [col for col in colunas_cid if col in df.columns]
    if not colunas_existentes:
        return df
    
    print(f"Tratando {len(colunas_existentes)} variáveis CID...")
    for col in colunas_existentes:
        df[col] = df[col].astype(str).str.upper().str.strip()
        # Usa mask para evitar FutureWarning (melhor da V1)
        mask = df[col].isin(['0000', 'NAN', ''])
        df.loc[mask, col] = np.nan
    return df

def tratar_cids(df):
    """Aplica padronização CID em todas as colunas relevantes"""
    colunas_cid = [
        'DIAG_PRINC', 'DIAG_SECUN', 'CID_NOTIF', 'CID_ASSO', 'CID_MORTE'
    ] + [f'DIAGSEC{i}' for i in range(1, 10)]
    return padronizar_cids(df, colunas_cid)

def tratar_idade(df):
    """Trata idade mantendo como float para preservar precisão decimal"""
    if 'IDADE' in df.columns:
        # Mantém como float para melhor precisão 
        df['IDADE'] = pd.to_numeric(df['IDADE'], errors='coerce').fillna(0).astype(float)

    if 'COD_IDADE' in df.columns:
        df['COD_IDADE'] = pd.to_numeric(df['COD_IDADE'], errors='coerce').fillna(0).astype(int)
        
        # Conversões de unidades de idade
        df.loc[df['COD_IDADE'] == 1, 'IDADE'] = 0  
        df.loc[df['COD_IDADE'] == 2, 'IDADE'] = (df.loc[df['COD_IDADE'] == 2, 'IDADE'] / 365).round(1) 
        df.loc[df['COD_IDADE'] == 3, 'IDADE'] = (df.loc[df['COD_IDADE'] == 3, 'IDADE'] / 12).round(1)   
    return df

def tratar_sexo(df):
    """Trata SEXO usando nullable integer para lidar com NaNs"""
    if 'SEXO' not in df.columns:
        return df
    print("Tratando coluna SEXO...")
    # Usa Int64 nullable para lidar melhor com NaNs (melhor da V1)
    df['SEXO'] = pd.to_numeric(df['SEXO'], errors='coerce').astype('Int64')
    return df

def tratar_datas(df):
    """Converte colunas de data para datetime"""
    colunas_datas = ['DT_INTER', 'DT_SAIDA', 'NASC']
    colunas_existentes = [col for col in colunas_datas if col in df.columns]
    
    for col in colunas_existentes:
        df[col] = pd.to_datetime(df[col], errors='coerce', format='%Y%m%d')
    return df

def tratar_valores(df):
    """Converte colunas de valores monetários para float"""
    colunas_valores = ['VAL_SH', 'VAL_SP', 'VAL_TOT', 'VAL_UTI']
    
    for col in colunas_valores:
        if col in df.columns:
            print(f"Tratando coluna {col}...")
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(float)
    return df

def tratar_inteiros(df):
    """Converte colunas inteiras para int"""
    colunas_inteiras = ['UTI_MES_TO', 'UTI_INT_TO', 'DIAR_ACOM', 'QT_DIARIAS', 'DIAS_PERM']
    
    for col in colunas_inteiras:
        if col in df.columns:
            print(f"Tratando coluna {col}...")
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int)
    return df

# Função para aplicar tratamentos sequencialmente
def aplicar_tratamentos(df, tratamentos):
    """Aplica lista de funções de tratamento ao DataFrame"""
    for func in tratamentos:
        df = func(df)
    return df

# Processamento em chunks para arquivos grandes
def processar_chunks(parquet_entrada, parquet_saida, tratamentos, batch_size=500_000):
    """Processa arquivo em chunks para economia de memória"""
    print(f"Processando em chunks de {batch_size:,} registros...")
    
    parquet_file = pq.ParquetFile(parquet_entrada)
    total_registros = 0
    writer = None
    schema_unificado = None
    
    try:
        for i, batch in enumerate(parquet_file.iter_batches(batch_size=batch_size)):
            df_chunk = batch.to_pandas()
            print(f"Processando chunk {i+1} ({len(df_chunk):,} registros)...")

            df_chunk = aplicar_tratamentos(df_chunk, tratamentos)

            table = pa.Table.from_pandas(df_chunk, preserve_index=False)
            
            if writer is None:
                # Usar schema do primeiro chunk como referência
                schema_unificado = table.schema
                writer = pq.ParquetWriter(parquet_saida, schema_unificado, compression="snappy")
            else:
                # Garantir que o table tenha o mesmo schema do primeiro chunk
                table = table.cast(schema_unificado)
            
            writer.write_table(table)
            total_registros += len(df_chunk)
            
    finally:
        if writer is not None:
            writer.close()
    
    print(f"Arquivo tratado salvo: {parquet_saida}")
    print(f"Total de registros processados: {total_registros:,}")

# Processamento simples para arquivos menores
def processar_simples(parquet_entrada, parquet_saida, tratamentos):
    """Processa arquivo inteiro na memória para melhor performance"""
    print("Processando arquivo completo na memória...")
    
    df = pd.read_parquet(parquet_entrada)
    print(f"Arquivo carregado: {len(df):,} registros")
    
    for tratamento in tratamentos:
        df = tratamento(df)
    
    df.to_parquet(parquet_saida, index=False, engine="pyarrow", compression="snappy")
    print(f"Arquivo tratado salvo: {parquet_saida}")
    print(f"Total de registros: {len(df):,}")

# Função principal com detecção automática do método
def processar_tratamento(parquet_entrada, parquet_saida, limite_mb=500, batch_size=500_000):
    """
    Processa tratamento dos dados com detecção automática do melhor método
    
    Args:
        parquet_entrada: Caminho do arquivo de entrada
        parquet_saida: Caminho do arquivo de saída
        limite_mb: Limite em MB para escolha do método (padrão: 500MB)
        batch_size: Tamanho dos chunks para processamento em lotes
    """
    
    # Lista de tratamentos
    tratamentos = [
        tratar_num_filhos,
        tratar_instrucao,
        tratar_cids,
        tratar_idade,
        tratar_sexo,
        tratar_datas,
        tratar_valores,
        tratar_inteiros
    ]
    
    # Verificar se arquivo existe
    if not os.path.exists(parquet_entrada):
        print(f"ERRO: Arquivo {parquet_entrada} não encontrado!")
        return
    
    # Detectar tamanho do arquivo
    tamanho_mb = os.path.getsize(parquet_entrada) / (1024 * 1024)
    print(f"Tamanho do arquivo: {tamanho_mb:.1f} MB")
    
    # Criar pasta de saída se necessário
    os.makedirs(os.path.dirname(parquet_saida), exist_ok=True)
    
    # Escolher método baseado no tamanho
    if tamanho_mb > limite_mb:
        print(f"Arquivo grande detectado (>{limite_mb}MB) - usando processamento em chunks")
        processar_chunks(parquet_entrada, parquet_saida, tratamentos, batch_size)
    else:
        print(f"Arquivo pequeno detectado (<={limite_mb}MB) - processamento na memória")
        processar_simples(parquet_entrada, parquet_saida, tratamentos)
    
    print("\nTratamento concluído com sucesso!")
    
    # Informações finais do arquivo
    tamanho_final_mb = os.path.getsize(parquet_saida) / (1024 * 1024)
    print(f"Arquivo final: {tamanho_final_mb:.1f} MB")

if __name__ == "__main__":
    # Caminhos (ajustados para estrutura correta)
    entrada = "../../banco/parquet_unificado/sih_rs.parquet"
    saida = "../../banco/parquet_unificado/sih_rs_tratado.parquet"
        processar_tratamento(entrada, saida, limite_mb=500, batch_size=500_000)

Tamanho do arquivo: 792.4 MB
Arquivo grande detectado (>500MB) - usando processamento em chunks
Processando em chunks de 500,000 registros...
Processando chunk 1 (500,000 registros)...
Tratando 14 variáveis CID...
Tratando coluna SEXO...
Tratando coluna VAL_SH...
Tratando coluna VAL_SP...
Tratando coluna VAL_TOT...
Tratando coluna VAL_UTI...
Tratando coluna UTI_MES_TO...
Tratando coluna UTI_INT_TO...
Tratando coluna DIAR_ACOM...
Tratando coluna QT_DIARIAS...
Tratando coluna DIAS_PERM...
Processando chunk 2 (500,000 registros)...
Tratando 14 variáveis CID...
Tratando coluna SEXO...
Tratando coluna VAL_SH...
Tratando coluna VAL_SP...
Tratando coluna VAL_TOT...
Tratando coluna VAL_UTI...
Tratando coluna UTI_MES_TO...
Tratando coluna UTI_INT_TO...
Tratando coluna DIAR_ACOM...
Tratando coluna QT_DIARIAS...
Tratando coluna DIAS_PERM...
Processando chunk 3 (500,000 registros)...
Tratando 14 variáveis CID...
Tratando coluna SEXO...
Tratando coluna VAL_SH...
Tratando coluna VAL_SP...
Tratando c