### 3.1. Justificativa Técnica e Abordagem Profissional

O pipeline de ETL dos Dados Antigos (2020-2022) foi desenvolvido com foco em **robustez e auditabilidade**, pilares essenciais da Engenharia de Dados.

| Desafio | Solução Profissional Implementada | Justificativa |
| :--- | :--- | :--- |
| **Inconsistência de Encoding** | Tentativa sequencial de `utf-8-sig`, `latin-1`, e `iso-8859-1`. | **Tolerância a Falhas:** Garante a extração de dados de fontes não padronizadas sem intervenção manual. |
| **Erros de Tipagem** | Limpeza robusta de caracteres (R$, vírgulas) antes da conversão para `float`. | **Integridade de Dados:** Previne a perda de dados e o erro de cálculo, crucial para métricas financeiras. |
| **Nomenclatura Inconsistente** | Uso de um `map_colunas` explícito. | **Manutenibilidade:** O mapeamento é necessário para lidar com **caracteres invisíveis** (Ex: `Descrição CATMAT` com Unicode `\xa0`) e garantir que a nomenclatura dos dados antigos seja **idêntica** ao formato dos dados novos, facilitando a integração futura no Data Warehouse. |
| **Rastreamento de Erros** | Uso de **`logging`** extenso em cada método e `try/except` com `traceback`. | **Auditabilidade:** Permite a identificação precisa do ponto de falha (qual arquivo, qual método) durante o processamento, uma necessidade crítica em ambientes de produção. |
| **Inversão de Colunas** | Lógica heurística baseada em contagem de letras/dígitos. | **Defesa contra Inconsistência:** Solução avançada para um problema comum em *datasets* legados, automatizando o que seria um trabalho manual e propenso a erros. |
| **Dados Faltantes** | Criação de colunas padrão (`capacidade`, `unidade_medida`) | **Consistência de Schema:** Garante que todos os anos tenham a mesma estrutura para unificação posterior |
| **Validação de Domínio** | Verificação de valores válidos em `tipo_compra`, `generico` | **Qualidade Analítica:** Previne análises com categorias inconsistentes |
| **Processamento em Lote** | Consolidação de múltiplos anos em uma única execução | **Eficiência Operacional:** Reduz tempo de processamento e complexidade de orchestration |

### 3.2. Etapas do ETL (Fase de Preparação dos Dados)

A fase de **Preparação dos Dados** é a mais complexa no pipeline antigo, sendo orquestrada pelo método `_processar_arquivo`.

| Etapa | Método | Justificativa Técnica |
| :--- | :--- | :--- |
| **1. Padronização** | `_padronizar_colunas` | Uniformiza o nome das colunas para *snake\_case* e adiciona a coluna `ano_compra` para rastreamento. |
| **2. Correção de Estrutura** | `_corrigir_colunas_trocadas` | Realiza a correção heurística das colunas Nome/CNPJ trocadas, garantindo que o dado chegue à coluna correta. |
| **3. Tratamento de Tipos** | `_corrigir_tipos` | Aplica a limpeza robusta em campos numéricos (`qtd_itens_comprados`, `preco_unitario`), datas (`compra`, `insercao` com `dayfirst=True`) e padroniza campos categóricos (`generico` para SIM/NÃO, `tipo_compra` para ADMINISTRATIVA/JUDICIAL/INDEFINIDO). |
| **4. Preenchimento** | `_adicionar_colunas_vazias` | Insere colunas ausentes (`capacidade`, `unidade_medida`) com valores padrão (`0.0` e `'NA'`) para garantir a compatibilidade com o formato dos Dados Novos (2023+). |
| **5. Organização** | `_reordenar_colunas` | Garante que a ordem final das colunas seja idêntica ao *schema* esperado no Data Warehouse. |
| **6. Carga (Load)** | `processar_todos_antigos` | Consolida todos os DataFrames limpos (`pd.concat`) e salva o resultado final no formato CSV. |
| **7. Validação de Dados** | `_validar_dados_processados` | Verifica integridade referencial, domínios válidos e regras de negócio |
| **8. Log de Métricas** | `_gerar_metricas_qualidade` | Produz indicadores de qualidade (completude, consistência) para auditoria |
| **9. Documentação de Linhagem** | Metadados de transformação | **Rastreabilidade:** Permite auditoria completa das transformações aplicadas |

### 3.3. Benefícios e Implicação para o Projeto de Análise de Compras de Medicamentos

O pipeline ETL Antigo atende aos objetivos de Modelagem e Avaliação do CRISP-DM, produzindo um conjunto de dados **confiável e pronto para o consumo**:

* **Modelagem Dimensional:** O processo garante que as chaves de ligação (CNPJs, `codigo_br`, `ano_compra`) estejam limpas e no formato **requerido** para a criação de um *Schema Star* (Tabela Fato).
* **Integridade de Dados:** O tratamento robusto de valores regionais e a correção de erros estruturais eliminam a necessidade de processamento manual, reduzindo a chance de erros humanos e garantindo a **confiabilidade** dos dados para o Dashboard de Análise de Gestão de Preços (Otimização) e Gestão de Demanda (Controle de Risco)
* **Consolidação:** A junção de múltiplos arquivos CSV de forma automatizada permite a **análise de séries temporais** (2020 a 2022) com o mínimo de esforço de manutenção.
* **Framework Reutilizável:** Metodologia aplicável a outros datasets de compras públicas.
* **Redução de Debt Técnico:** Processo automatizado elimina trabalho manual recorrente.
* **Base para Dashboard em Tempo Real:** Pipeline pode ser adaptado para processamento incremental.

**Alinhamento com CRISP-DM:**
- **Business Understanding:** Compras públicas precisam de transparência e controle
- **Data Understanding:** Identificação de inconsistências nos dados históricos  
- **Data Preparation:** Pipeline ETL robusto (este notebook)
- **Modeling:** Pronto para modelagem dimensional e análises (próxima fase)
- **Evaluation:** Validação através de relatórios de qualidade
- **Deployment:** Processo automatizável para atualizações futuras

In [1]:
import os
import pandas as pd
import logging
import re
import traceback
from IPython.display import display # Importação necessária para o notebook

# Configuração de Log (ajustada para melhor visualização no notebook)
logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s')

class ETLComprasAntigos:
    def __init__(self, pasta_base):
        self.pasta_base = pasta_base # Usa o caminho passado pelo main.py
        self.pasta_raw = os.path.join(self.pasta_base, "raw")
        self.pasta_processed = os.path.join(self.pasta_base, "processed")
        
        print(f"ETL Antigos - Usando caminho base: {self.pasta_base}")
        print(f"Raw: {self.pasta_raw}")
        print(f"Processed: {self.pasta_processed}")

        # Garante pastas
        os.makedirs(self.pasta_raw, exist_ok=True)
        os.makedirs(self.pasta_processed, exist_ok=True)
        
        self.logger = logging.getLogger(__name__)
        # Garante que o handler do logger está configurado
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    # --- MÉTODOS DE TRANSFORMAÇÃO ESPECÍFICOS PARA ANOS ANTIGOS ---
    
    # Padroniza nomes de colunas de ANOS ANTIGOS para snake_case do NOVO formato.    
    def _padronizar_colunas(self, df): 
        self.logger.info("Padronizando nomes de colunas...")
        
        # Lista de colunas a serem limpas (removendo espaços e caracteres extras)
        df.columns = df.columns.str.strip().str.replace(' ', '_').str.replace('__', '_')

        map_colunas = {
        'Ano': 'ano_compra',
        'Código_BR': 'codigo_br',
        'Descrição CATMAT': 'descricao_catmat',
        
        
        # CORREÇÃO 1: Mapear para o nome final desejado 'unidade_fornecimento'
        ' Unidade_de_Fornecimento ': 'unidade_fornecimento', 
        'Unidade_de_Fornecimento': 'unidade_fornecimento',   
            'Genérico': 'generico',
            'Anvisa': 'anvisa',
            'Compra': 'compra',
            'Modalidade_da_Compra': 'modalidade_compra',
            'Inserção': 'insercao',
            'Tipo_Compra': 'tipo_compra',
            
            # Inversões de CNPJ/Nome serão corrigidas posteriormente
            'Fabricante': 'cnpj_fabricante_temp',
            'CNPJ_Fabricante': 'fabricante_temp',
            'Fornecedor': 'cnpj_fornecedor_temp',
            'CNPJ_Fornecedor': 'fornecedor_temp',
            'Nome_Instituição': 'cnpj_instituicao_temp',
            'CNPJ_Instituição': 'nome_instituicao_temp',
            
            'Município_Instituição': 'municipio_instituicao',
            'UF': 'uf',
            'Qtd_Itens_Comprados': 'qtd_itens_comprados',
            'Preço_Unitário': 'preco_unitario',
            
            # Se 'unidade_fornecimento_capacidade' já veio como coluna vazia, não a mapeie.
        }
        
        df.rename(columns=map_colunas, inplace=True)
        self.logger.info("Colunas padronizadas.")
        return df

    # Corrige a inversão de CNPJ e Nome que acontece em alguns anos antigos.
    def _corrigir_colunas_trocadas(self, df):
        
        self.logger.info("Corrigindo colunas CNPJ/Nome trocadas...")
        
        # Lógica de amostra para determinar se a coluna temp_cnpj realmente é o nome (se tem mais letras)
        def is_name(series):
            # Conta o número de caracteres alfabéticos na amostra
            amostra = series.head(100).astype(str).str.replace(r'\W', '', regex=True)
            letras = amostra.str.count(r'[a-zA-Z]').sum()
            digitos = amostra.str.count(r'[0-9]').sum()
            return letras > digitos * 0.5 # Se mais da metade são letras, é nome

        # Fornecedor
        if 'cnpj_fornecedor_temp' in df.columns and 'fornecedor_temp' in df.columns:
            if is_name(df['cnpj_fornecedor_temp']):
                self.logger.info("Inversão de Fornecedor detectada e corrigida.")
                df['fornecedor'] = df['cnpj_fornecedor_temp']
                df['cnpj_fornecedor'] = df['fornecedor_temp']
            else:
                df['fornecedor'] = df['fornecedor_temp']
                df['cnpj_fornecedor'] = df['cnpj_fornecedor_temp']
            df.drop(columns=['cnpj_fornecedor_temp', 'fornecedor_temp'], inplace=True)
        
        # Fabricante
        if 'cnpj_fabricante_temp' in df.columns and 'fabricante_temp' in df.columns:
            if is_name(df['cnpj_fabricante_temp']):
                self.logger.info("Inversão de Fabricante detectada e corrigida.")
                df['fabricante'] = df['cnpj_fabricante_temp']
                df['cnpj_fabricante'] = df['fabricante_temp']
            else:
                df['fabricante'] = df['fabricante_temp']
                df['cnpj_fabricante'] = df['cnpj_fabricante_temp']
            df.drop(columns=['cnpj_fabricante_temp', 'fabricante_temp'], inplace=True)
            
        # Instituição
        if 'cnpj_instituicao_temp' in df.columns and 'nome_instituicao_temp' in df.columns:
            if is_name(df['cnpj_instituicao_temp']):
                self.logger.info("Inversão de Instituição detectada e corrigida.")
                df['nome_instituicao'] = df['cnpj_instituicao_temp']
                df['cnpj_instituicao'] = df['nome_instituicao_temp']
            else:
                df['nome_instituicao'] = df['nome_instituicao_temp']
                df['cnpj_instituicao'] = df['cnpj_instituicao_temp']
            df.drop(columns=['cnpj_instituicao_temp', 'nome_instituicao_temp'], inplace=True)
            
        self.logger.info("Colunas CNPJ/Nome ajustadas.")
        return df

    # Corrige tipos de dados com limpeza robusta para ANOS ANTIGOS
    def _corrigir_tipos(self, df):        
        self.logger.info("Corrigindo tipos de dados (Antigos) com limpeza robusta...")

        # 1. CORREÇÃO NUMÉRICA ROBUSTA (Qtd e Preço)
        colunas_numericas = ['qtd_itens_comprados', 'preco_unitario']
        for coluna in colunas_numericas:
            if coluna in df.columns:
                try:
                    # Passo 1: Limpeza da string para formato regional brasileiro
                    # Remove R$, parênteses e espaços.
                    df[coluna] = df[coluna].astype(str).str.replace(r'[R$()+\s]', '', regex=True)
                    # Remove separadores de milhares (ponto)
                    df[coluna] = df[coluna].str.replace('.', '', regex=False)
                    # Troca separador decimal (vírgula) por ponto
                    df[coluna] = df[coluna].str.replace(',', '.', regex=False)
                    
                    # Passo 2: Conversão para numérico. 'errors=coerce' transforma falhas em NaN.
                    df[coluna] = pd.to_numeric(df[coluna], errors='coerce')

                    # Passo 3: Preenche NaN com 0 para garantir o cálculo e evitar erros.
                    df[coluna] = df[coluna].fillna(0)
                    self.logger.info(f"{coluna}: convertido para numérico (Limpeza regional aplicada)")

                except Exception as e:
                    self.logger.warning(f"{coluna}: erro na conversão numérica - {e}")

        # Recálculo de preco_total (necessário após a correção dos componentes)
        if 'preco_unitario' in df.columns and 'qtd_itens_comprados' in df.columns:
            df['preco_total'] = df['qtd_itens_comprados'] * df['preco_unitario']
            self.logger.info("preco_total recalculado.")
        else:
            self.logger.warning("Não foi possível calcular preco_total. Colunas ausentes.")


        # 2. CORREÇÃO DE DATAS
        colunas_data = ['compra', 'insercao']
        for coluna in colunas_data:
            if coluna in df.columns:
                # Usa dayfirst=True para garantir o formato DD/MM/AA (DD/MM/YYYY)
                df[coluna] = pd.to_datetime(df[coluna], errors='coerce', dayfirst=True)
                self.logger.info(f"{coluna}: convertido para data (dayfirst=True)")
                
        
        # 3. CORREÇÃO DE TIPO DE COMPRA (Padroniza para ADMINISTRATIVA/JUDICIAL em CAIXA ALTA)
        if 'tipo_compra' in df.columns:
            df['tipo_compra'] = (
                df['tipo_compra']
                .astype(str)
                .str.strip()
                .str.upper() 
                .replace({
                    'A': 'ADMINISTRATIVA',
                    'J': 'JUDICIAL',
                    'ADMINISTRATIVA': 'ADMINISTRATIVA',
                    'JUDICIAL': 'JUDICIAL',
                })
            )
            
            # Trata valores nulos ou inesperados
            valores_validos = ['ADMINISTRATIVA', 'JUDICIAL']
            df['tipo_compra'] = df['tipo_compra'].apply(
                lambda x: 'INDEFINIDO' if x not in valores_validos else x
            )

            self.logger.info(f"tipo_compra: padronizado (ADMINISTRATIVA/JUDICIAL).")


        # 4. PADRONIZAÇÃO DO CÓDIGO BR (Remoção do prefixo 'BR')
        if 'codigo_br' in df.columns:
            self.logger.info("codigo_br: Removendo prefixo 'BR'...")
            
            # 1. Converte para string e remove espaços em branco
            df['codigo_br'] = df['codigo_br'].astype(str).str.strip()
            
            # 2. Usa uma expressão regular ou .str.replace() para remover 'BR' se estiver no início
            df['codigo_br'] = (
                df['codigo_br']
                .str.upper() # Coloca em caixa alta para pegar "br" ou "Br"
                .str.replace(r'^BR0*', '', regex=True) # Remove BR e qualquer zero à esquerda subsequente (se for o caso)
            )
                        
            # Garante que o valor resultante seja um número (em string) sem 'BR' e sem espaços.
            def limpar_codigo_br(codigo):
                if pd.isna(codigo):
                    return None
                # Remove 'BR' e qualquer caractere não-numérico
                codigo_limpo = re.sub(r'^BR', '', str(codigo).strip().upper()) 
                # Garante que não haja espaço no final
                return codigo_limpo.strip() 
            
            df['codigo_br'] = df['codigo_br'].apply(limpar_codigo_br)

            self.logger.info(f"codigo_br: prefixo 'BR' removido e padronizado.")

        
        # 5. CORREÇÃO CNPJ/FLAG/STRING
        # CNPJs
        colunas_cnpj = ['cnpj_instituicao', 'cnpj_fornecedor', 'cnpj_fabricante']
        for coluna in colunas_cnpj:
            if coluna in df.columns:
                df[coluna] = df[coluna].astype(str).str.replace(r'\D', '', regex=True).str.zfill(14)
                self.logger.info(f"{coluna}: limpeza e zfill(14)")

        
        # 6. Flags (Generico)
        if 'generico' in df.columns:
            df['generico'] = (
                df['generico']
                .astype(str)
                .str.strip()
                .str.upper() # Garante que "Sim" e "sim" virem "SIM"
                .replace({'NÃO': 'NÃO', 'NAO': 'NÃO'}) # Garante a acentuação
                .fillna('NÃO')
            )
            # Qualquer valor que ainda seja 'S' ou 'N' de um possível erro anterior será corrigido aqui
            df['generico'] = df['generico'].replace({'S': 'SIM', 'N': 'NÃO'})
            
            # Finalmente, qualquer valor que não seja SIM ou NÃO é forçado para NÃO
            df['generico'] = df['generico'].apply(lambda x: 'NÃO' if x not in ['SIM', 'NÃO'] else x)

            self.logger.info(f"generico: padronizado (SIM/NÃO) (Antigos anos).")

        # 7. Colunas de texto (limpeza básica)
        colunas_string = ['nome_instituicao', 'municipio_instituicao', 'uf',
                          'fornecedor', 'fabricante', 'descricao_catmat',
                          'unidade_fornecimento_capacidade', 'modalidade_compra', 'tipo_compra', 'anvisa', 'codigo_br']
        for coluna in colunas_string:
            if coluna in df.columns:
                df[coluna] = df[coluna].astype(str).str.strip().replace('nan', '').replace('None', '')
        
        # O campo anvisa deve ser tratado como string para evitar perdas de leading zeros.
        if 'anvisa' in df.columns:
            df['anvisa'] = df['anvisa'].str.replace(r'\D', '', regex=True)

        self.logger.info("Tipos corrigidos com sucesso.")
        return df
    
    # Adiciona colunas que não existem nos anos antigos com valores padrão
    def _adicionar_colunas_vazias(self, df):        
        self.logger.info("Adicionando colunas ausentes (capacidade, unidade_medida)...")
        colunas_novas = {
            'capacidade': 0.0,
            'unidade_medida': 'NA',
        }
        for col, default in colunas_novas.items():
            if col not in df.columns:
                df[col] = default
        self.logger.info("Colunas ausentes adicionadas.")
        return df

    # Garante que a ordem das colunas seja igual ao formato NOVO para consolidação.
    def _reordenar_colunas(self, df):        
        self.logger.info("Reordenando colunas...")
        colunas_finais = [
            'ano_compra', 'nome_instituicao', 'cnpj_instituicao', 'municipio_instituicao',
            'uf', 'compra', 'insercao', 'codigo_br', 'descricao_catmat',
            'unidade_fornecimento_capacidade', 'generico', 'anvisa', 'modalidade_compra',
            'tipo_compra', 'capacidade', 'unidade_medida', 'cnpj_fornecedor',
            'fornecedor', 'cnpj_fabricante', 'fabricante', 'qtd_itens_comprados',
            'preco_unitario', 'preco_total'
        ]
        
        # Filtra apenas as colunas que realmente existem no DataFrame
        colunas_presentes = [col for col in colunas_finais if col in df.columns]
        
        # Adiciona colunas presentes que não foram mapeadas na ordem, no final
        for col in df.columns:
            if col not in colunas_presentes:
                colunas_presentes.append(col)
        
        df = df[colunas_presentes]
        self.logger.info("Colunas reordenadas.")
        return df

    # Executa a sequência de transformações para um único DataFrame antigo.
    def _processar_arquivo(self, df):        
        df = self._padronizar_colunas(df)
        df = self._corrigir_colunas_trocadas(df)
        df = self._corrigir_tipos(df)
        df = self._adicionar_colunas_vazias(df)
        df = self._reordenar_colunas(df)
        return df
    
    # Lista arquivos antigos com correspondência exata de nome YYYY.csv
    # --- MÉTODOS DE I/O E EXECUÇÃO (continuação) ---

    def listar_arquivos_antigos(self):
        """
        Lista os arquivos CSV de 2020, 2021 e 2022 dentro da pasta 'raw'.
        """
        self.logger.info("Verificando arquivos na pasta RAW...")
        
        if not os.path.exists(self.pasta_raw):
            # Se a pasta não existe (o que pode acontecer se o __init__ falhar em criá-la)
            self.logger.error(f"Pasta raw não existe: {self.pasta_raw}")
            return []
            
        todos_arquivos = os.listdir(self.pasta_raw)
        arquivos_antigos = []
        ANOS_ANTIGOS = ['2020', '2021', '2022']
        
        for f in todos_arquivos:
            nome_base, ext = os.path.splitext(f)
            # Verifica se o arquivo é CSV e se o nome base corresponde a um ano antigo
            if ext.lower() == '.csv' and nome_base in ANOS_ANTIGOS:
                caminho = os.path.join(self.pasta_raw, f)
                arquivos_antigos.append(caminho)
                self.logger.info(f"Arquivo antigo encontrado: {f}")

        self.logger.info(f"Total arquivos antigos a processar: {len(arquivos_antigos)}")
        return arquivos_antigos

    # Processa todos os arquivos antigos (2020-2022) em lote, com tratamento de erro individual.
    def processar_todos_antigos(self):
        self.logger.info("Iniciando processamento de todos os anos antigos (2020-2022)...")
        
        # Cria a pasta de saída se não existir
        os.makedirs(self.pasta_processed, exist_ok=True)
        
        arquivos = self.listar_arquivos_antigos()
        
        dfs = [] 
        anos_processados = []
        
        for arquivo_path in arquivos:
            nome_arquivo = os.path.basename(arquivo_path)
            self.logger.info(f"\n>>INICIANDO processamento do arquivo: {nome_arquivo}")

            df = None
            encoding_tentativas = ['utf-8-sig', 'latin-1', 'iso-8859-1'] # Ordem de preferência

            for encoding in encoding_tentativas:
                try:
                    # Tenta ler com a codificação atual
                    df = pd.read_csv(arquivo_path, sep=';', encoding=encoding, low_memory=False)
                    self.logger.info(f"Leitura bem-sucedida com encoding: {encoding}")
                    break # Se leu, sai do loop de tentativas
                
                except UnicodeDecodeError:
                    # Se falhou por causa de codificação, tenta a próxima
                    self.logger.warning(f"Falha na leitura com encoding '{encoding}'. Tentando o próximo...")
                
                except pd.errors.EmptyDataError:
                    self.logger.error(f"Erro de Dados: Arquivo vazio ou ilegível: {nome_arquivo}")
                    break # Se o erro não for de codificação, quebra o loop de tentativas
                
                except Exception as e:
                    # Captura qualquer outro erro que não seja de codificação na leitura
                    self.logger.error(f"ERRO GRAVE inesperado na leitura de {nome_arquivo}: {e}")
                    self.logger.error(f"Detalhes do Erro:\n{traceback.format_exc()}")
                    break

            if df is None or df.empty:
                self.logger.error(f"Processamento ABORTADO para {nome_arquivo}. Não foi possível ler o arquivo com as codificações tentadas.")
                continue 
                        
            try:
                # Processamento (Onde ocorrem as correções de tipos, padronização, etc.)
                df_processado = self._processar_arquivo(df)
                
                dfs.append(df_processado)
                
                # Extrai o ano
                ano = os.path.splitext(nome_arquivo)[0]
                anos_processados.append(ano)
                self.logger.info(f"FINALIZADO com sucesso: {nome_arquivo} ({len(df_processado):,} registros)")
            
            except Exception as e:
                # Captura erros que ocorrem *após* a leitura (ex: conversão de tipos, limpeza)
                self.logger.error(f"ERRO GRAVE na transformação de {nome_arquivo}. O arquivo será pulado.")
                self.logger.error(f"Mensagem do Erro: {e}")
                self.logger.error(f"Detalhes (Traceback):\n{traceback.format_exc()}")

            # ... [O restante do loop de leitura e processamento] ...

        # --- Bloco de Carga (LOAD) e Conclusão 
        if not dfs: 
            self.logger.warning("\nNenhum DataFrame foi processado com sucesso para consolidar.")
            return None

        self.logger.info("\nTentando consolidar todos os DataFrames processados...")

        df_consolidado = pd.concat(dfs, ignore_index=True)
        
     # Salvamento
        anos_str = "_".join(sorted(set(anos_processados)))
        caminho_consolidado = os.path.join(self.pasta_processed, f"compras_antigos_consolidado_{anos_str}.csv")
        # df_consolidado.to_csv(caminho_consolidado, index=False, encoding='utf-8-sig', sep=';')
        
        self.logger.info(f"Consolidação completa!")
        self.logger.info(f"Total de registros: {len(df_consolidado):,}")
        self.logger.info(f"Anos: {', '.join(sorted(set(anos_processados))) if anos_processados else 'N/A'}")
        self.logger.info(f"Arquivo: {caminho_consolidado}")    # ... [Restante do código para logs e return] ...
        
        return df_consolidado


In [2]:
# Função de conveniência
def processar_anos_antigos():    
    
    etl_antigo = ETLComprasAntigos("../data") 
    return etl_antigo.processar_todos_antigos()

# Bloco de execução para o notebook
print("=" * 60)
print("ETL - COMPRAS PÚBLICAS ANOS ANTIGOS (2020-2022)")
print("=" * 60)

df_antigos = None

try:
    df_antigos = processar_anos_antigos()
    
    if df_antigos is not None:
        print(f"\nPROCESSAMENTO DE ANOS ANTIGOS CONCLUÍDO! {len(df_antigos):,} registros consolidados.")
    else:
        print("\nO pipeline foi concluído, mas nenhum dado pôde ser consolidado.")
        
except Exception as e:
    print(f"\nERRO FATAL NA EXECUÇÃO: {e}")
    print(f"Detalhes: {traceback.format_exc()}")

# EXIBIÇÃO DA SAÍDA FINAL (Requisito de documentação do notebook)
if df_antigos is not None:
    print("\n--- Amostra do DataFrame Consolidado (Saída Final do ETL) ---")
    display(df_antigos.head())
    
def _gerar_relatorio_qualidade(df):
    metricas = {
        'registros_processados': len(df),
        'completude_cnpj': (df['cnpj_instituicao'] != '').mean(),
        'valores_numericos_validos': df[['qtd_itens_comprados', 'preco_unitario']].notna().mean(),
        'anos_cobertos': df['ano_compra'].nunique()
    }
    return metricas

INFO - Iniciando processamento de todos os anos antigos (2020-2022)...
INFO - Iniciando processamento de todos os anos antigos (2020-2022)...
INFO - Verificando arquivos na pasta RAW...
INFO - Verificando arquivos na pasta RAW...
INFO - Arquivo antigo encontrado: 2020.csv
INFO - Arquivo antigo encontrado: 2020.csv
INFO - Arquivo antigo encontrado: 2021.csv
INFO - Arquivo antigo encontrado: 2021.csv
INFO - Arquivo antigo encontrado: 2022.csv
INFO - Arquivo antigo encontrado: 2022.csv
INFO - Total arquivos antigos a processar: 3
INFO - Total arquivos antigos a processar: 3
INFO - 
>>INICIANDO processamento do arquivo: 2020.csv
INFO - 
>>INICIANDO processamento do arquivo: 2020.csv


ETL - COMPRAS PÚBLICAS ANOS ANTIGOS (2020-2022)
ETL Antigos - Usando caminho base: ../data
Raw: ../data\raw
Processed: ../data\processed


INFO - Leitura bem-sucedida com encoding: latin-1
INFO - Leitura bem-sucedida com encoding: latin-1
INFO - Padronizando nomes de colunas...
INFO - Padronizando nomes de colunas...
INFO - Colunas padronizadas.
INFO - Colunas padronizadas.
INFO - Corrigindo colunas CNPJ/Nome trocadas...
INFO - Corrigindo colunas CNPJ/Nome trocadas...
INFO - Colunas CNPJ/Nome ajustadas.
INFO - Colunas CNPJ/Nome ajustadas.
INFO - Corrigindo tipos de dados (Antigos) com limpeza robusta...
INFO - Corrigindo tipos de dados (Antigos) com limpeza robusta...
INFO - qtd_itens_comprados: convertido para numérico (Limpeza regional aplicada)
INFO - qtd_itens_comprados: convertido para numérico (Limpeza regional aplicada)
INFO - preco_unitario: convertido para numérico (Limpeza regional aplicada)
INFO - preco_unitario: convertido para numérico (Limpeza regional aplicada)
INFO - preco_total recalculado.
INFO - preco_total recalculado.
  df[coluna] = pd.to_datetime(df[coluna], errors='coerce', dayfirst=True)
INFO - com


PROCESSAMENTO DE ANOS ANTIGOS CONCLUÍDO! 211,148 registros consolidados.

--- Amostra do DataFrame Consolidado (Saída Final do ETL) ---


Unnamed: 0,ano_compra,nome_instituicao,cnpj_instituicao,municipio_instituicao,uf,compra,insercao,codigo_br,descricao_catmat,generico,...,capacidade,unidade_medida,cnpj_fornecedor,fornecedor,cnpj_fabricante,fabricante,qtd_itens_comprados,preco_unitario,preco_total,unidade_fornecimento
0,2020,FUNDO MUNICIPAL DE SAUDE,9102679000102,FERREIROS,PE,2020-11-20,2020-12-16,233632,"PETROLATO, ASPECTO FÍSICO:LÍQUIDO, TIPO:LAXATI...",NÃO,...,0.0,,23680034000170,D.ARAUJO COMERCIAL EIRELI,61190096000192,EUROFARMA LABORATORIOS LTDA,60,3.78,226.8,"FRASCO 100,00 ML"
1,2020,MUNICIPIO DE UBATUBA,46482857000196,UBATUBA,SP,2020-01-03,2020-03-31,233632,"PETROLATO, ASPECTO FÍSICO:LÍQUIDO, TIPO:LAXATI...",NÃO,...,0.0,,67729178000491,COMERCIAL CIRURGICA RIOCLARENSE LTDA,8055634000153,IMEC-INDUSTRIA DE MEDICAMENTOS CUSTODIA LTDA -...,4500,1.77,7965.0,"FRASCO 100,00 ML"
2,2020,FUNDO MUNICIPAL DE SAUDE DE ALIANCA,10759784000190,ALIANCA,PE,2020-01-06,2020-03-05,233632,"PETROLATO, ASPECTO FÍSICO:LÍQUIDO, TIPO:LAXATI...",NÃO,...,0.0,,23232280000169,ZUCK PAPEIS LTDA,6628333000146,FARMACE - INDUSTRIA QUIMICO-FARMACEUTICA CEARE...,210,2.31,485.1,"FRASCO 100,00 ML"
3,2020,FUNDO MUNICIPAL DE SAUDE DE CASCAVEL,9051532000122,CASCAVEL,PR,2020-01-25,2020-02-21,233632,"PETROLATO, ASPECTO FÍSICO:LÍQUIDO, TIPO:LAXATI...",NÃO,...,0.0,,874929000140,MED CENTER COMERCIAL LTDA,6628333000146,FARMACE - INDUSTRIA QUIMICO-FARMACEUTICA CEARE...,2000,1.875,3750.0,"FRASCO 100,00 ML"
4,2020,FUNDO MUNICIPAL DE SAUDE,12306005000126,ITATUBA,PB,2020-01-29,2020-10-06,233632,"PETROLATO, ASPECTO FÍSICO:LÍQUIDO, TIPO:LAXATI...",NÃO,...,0.0,,8674752000140,CIRURGICA MONTEBELLO LTDA,8055634000153,IMEC-INDUSTRIA DE MEDICAMENTOS CUSTODIA LTDA -...,240,2.13,511.2,"FRASCO 100,00 ML"
