In [None]:
import os
import pandas as pd
from pysus.online_data.SIA import download
import time
from datetime import datetime
import gc
from retrying import retry

# Lista de estados para o teste
estados = ['MG', 'SP']
ano = 2023
meses = [str(month).zfill(2) for month in range(1, 13)]  # Todos os meses de 01 a 12
grupos = ['PA']  # Grupo de procedimentos
output_dir = 'dados_sia'  # Diretório para salvar arquivos de dados

# Criar diretório de saída se não existir
os.makedirs(output_dir, exist_ok=True)

# Função para fazer o download com repetição em caso de falha
@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000, stop_max_attempt_number=5)
def download_with_retry(estado, ano, meses, grupos):
    return download(estado, str(ano), meses, grupos)

# Gravar o horário inicial
hora_inicio = datetime.now()
print(f"Processo iniciado em: {hora_inicio}")

# Função para processar e salvar dados de um estado e mês específico
def process_and_save_data(estado, mes):
    filename = f"{output_dir}/{estado}_{ano}_{mes}.csv"
    if os.path.exists(filename):
        print(f"Dados já baixados para {estado}-{ano}-{mes}, pulando download.")
        return pd.read_csv(filename)

    print(f"Baixando dados para {estado}-{ano}-{mes}")
    try:
        data = download_with_retry(estado, str(ano), [mes], grupos)
        print(f"Dados baixados para {estado}-{ano}-{mes}")
        
        try:
            dfs = []
            if isinstance(data, list):
                for item in data:
                    if hasattr(item, 'to_dataframe'):
                        dfs.append(item.to_dataframe())
                    elif hasattr(item, 'read_parquet'):
                        dfs.append(item.read_parquet())
                    else:
                        print(f"Objeto desconhecido: {type(item)}")
            elif hasattr(data, 'to_dataframe'):
                dfs.append(data.to_dataframe())
            elif hasattr(data, 'read_parquet'):
                dfs.append(data.read_parquet())
            else:
                print(f"Objeto ParquetSet não possui métodos conhecidos para leitura: {type(data)}")
                return None

            if dfs:
                df = pd.concat(dfs, ignore_index=True)
                # Adicionar colunas ANO, MES e UF
                df['ANO'] = ano
                df['MES'] = mes
                df['UF'] = estado

                # Converter PA_VALAPR e PA_QTDAPR para float
                df['PA_VALAPR'] = df['PA_VALAPR'].str.replace(',', '.').astype(float)
                df['PA_QTDAPR'] = df['PA_QTDAPR'].astype(float)

                # Selecionar as colunas desejadas
                df = df[['ANO', 'MES', 'PA_UFMUN', 'PA_MUNPCN', 'PA_DOCORIG', 'PA_QTDAPR', 'PA_VALAPR', 'UF']]

                # Salvar o DataFrame em CSV para evitar baixar novamente no futuro
                df.to_csv(filename, index=False)
                return df
            else:
                print(f"Nenhum dado válido encontrado para {estado}-{ano}-{mes}")
                return None

        except Exception as e:
            print(f"Erro ao processar os dados para {estado}-{ano}-{mes}: {e}")
            return None

    except Exception as e:
        print(f"Erro ao baixar dados para {estado}-{ano}-{mes}: {e}")
        return None

# Função para verificar e carregar dados previamente processados
def load_previously_processed_data(output_dir):
    processed_files = [f for f in os.listdir(output_dir) if f.endswith('.csv')]
    return set((f.split('_')[0], f.split('_')[2].replace('.csv', '')) for f in processed_files)

# Carregar dados previamente processados
processed_data = load_previously_processed_data(output_dir)

# Processar os dados para cada estado
for estado in estados:
    for mes in meses:
        if (estado, mes) in processed_data:
            print(f"Dados já processados para {estado}-{ano}-{mes}, pulando processamento.")
            continue
        
        df = process_and_save_data(estado, mes)
        if df is not None:
            # Agrupar e salvar resultados intermediários por estado e mês
            df_agrupado = df.groupby(['ANO', 'MES', 'PA_UFMUN', 'PA_MUNPCN', 'PA_DOCORIG', 'UF']).agg({
                'PA_QTDAPR': 'sum',
                'PA_VALAPR': 'sum'
            }).reset_index()
            df_agrupado.to_csv(f"{output_dir}/{estado}_{ano}_{mes}_agrupado.csv", index=False)
            print(f"Dados agrupados e salvos para {estado}-{ano}-{mes}")

        # Coletar lixo e liberar memória
        gc.collect()
        time.sleep(1)  # Esperar 1 segundo entre os downloads

# Concatenar todos os arquivos agrupados por estado e mês
dfs_final = []
for estado in estados:
    for mes in meses:
        file_path = f"{output_dir}/{estado}_{ano}_{mes}_agrupado.csv"
        if os.path.exists(file_path):
            df = pd.read_csv(file_path)
            dfs_final.append(df)

# Exportar os dados finais
if dfs_final:
    df_final = pd.concat(dfs_final, ignore_index=True)
    try:
        df_final.to_excel('dados_agrupados.xlsx', index=False)
        print("Dados exportados para 'dados_agrupados.xlsx' com sucesso.")
    except ValueError as e:
        print(f"Erro ao exportar para Excel: {e}. Exportando para CSV.")
        df_final.to_csv('dados_agrupados.csv', index=False)
        print("Dados exportados para 'dados_agrupados.csv' com sucesso.")

# Gravar o horário final
hora_fim = datetime.now()
print(f"Processo finalizado em: {hora_fim}")

# Calcular e exibir o tempo total do processo
tempo_total = hora_fim - hora_inicio
print(f"Tempo total do processo: {tempo_total}")


Processo iniciado em: 2024-07-24 13:28:23.010772
Dados já processados para MG-2023-01, pulando processamento.
Dados já processados para MG-2023-02, pulando processamento.
Baixando dados para MG-2023-03


100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 176M/176M [00:00<00:00, 27.5GB/s]


Dados baixados para MG-2023-03
