In [1]:
#%%
# Bloco 1: Importações e Configuração do Ambiente
# --------------------------------------------------------------------------
# Objetivo: Importar as bibliotecas necessárias para o pipeline e
# definir as configurações de exibição e logging.

# Certifique-se de que as bibliotecas estão instaladas:
# pip install pandas openpyxl pyarrow tqdm

import pandas as pd
import pathlib
import json
from datetime import datetime
from multiprocessing import Pool, cpu_count
from functools import partial
from tqdm.auto import tqdm

# IMPORTANTE: Importa as funções do nosso novo arquivo .py
import processador as proc

# Configura o pandas para não truncar a exibição de colunas em DataFrames
pd.set_option('display.max_columns', None)

print("Ambiente configurado e bibliotecas importadas.")

Ambiente configurado e bibliotecas importadas.


In [2]:
# Bloco 2: Definição de Caminhos e Carregamento de Configurações
# --------------------------------------------------------------------------
PASTA_DADOS = pathlib.Path("dados_catalogo")
PASTA_RAW_XLSX = PASTA_DADOS / "raw_xlsx"
PASTA_STAGING  = PASTA_DADOS / "staging"
PASTA_LOGS     = PASTA_DADOS / "logs"
ARQ_MANIFESTO = PASTA_DADOS / "manifesto_xlsx.json"
CAMINHO_MAPA_COLUNAS = pathlib.Path('mapeamento_colunas.json')

# Cria as pastas se não existirem
for pasta in (PASTA_RAW_XLSX, PASTA_STAGING, PASTA_LOGS):
    pasta.mkdir(parents=True, exist_ok=True)

# Carrega o mapa de colunas
if CAMINHO_MAPA_COLUNAS.exists():
    with open(CAMINHO_MAPA_COLUNAS, 'r', encoding='utf-8') as f:
        mapa_colunas = json.load(f)
    print(f"Arquivo de mapeamento '{CAMINHO_MAPA_COLUNAS}' carregado.")
else:
    mapa_colunas = {}
    print(f"AVISO: Arquivo de mapeamento '{CAMINHO_MAPA_COLUNAS}' não encontrado.")

# Carrega o manifesto
if ARQ_MANIFESTO.exists():
    with open(ARQ_MANIFESTO, "r", encoding="utf-8") as f:
        manifesto = json.load(f)
else:
    manifesto = {}
    
print(f"Estrutura de diretórios e configurações carregadas.")


Arquivo de mapeamento 'mapeamento_colunas.json' carregado.
Estrutura de diretórios e configurações carregadas.


In [3]:
# Bloco 3: Execução Principal (Orquestrador)
# --------------------------------------------------------------------------

if __name__ == '__main__':
    # 1. Descobrir arquivos a serem processados
    todos_arquivos = sorted(list(PASTA_RAW_XLSX.glob("*.xlsx")))
    arquivos_para_processar = [arq for arq in todos_arquivos if arq.name not in manifesto]
    
    print(f"Encontrados {len(todos_arquivos)} arquivos. Destes, {len(arquivos_para_processar)} serão processados.")

    relatorio_processamento = []
    
    # 2. Executar o processamento em paralelo
    if arquivos_para_processar:
        num_processos = cpu_count()
        print(f"\nIniciando o processo de conversão em lote com {num_processos} núcleos...")

        func_parcial = partial(proc.processar_arquivo, mapa_colunas=mapa_colunas, pasta_staging=PASTA_STAGING)
                
        with Pool(processes=num_processos) as pool:
            # --- MUDANÇA NO TQDM ---
            # O unit="arquivo" torna a barra de progresso mais clara para esta tarefa
            for resultado in tqdm(pool.imap_unordered(func_parcial, arquivos_para_processar), total=len(arquivos_para_processar), unit="arquivo"):
                relatorio_processamento.append(resultado)
                if resultado["status"] == "SUCESSO":
                    manifesto[resultado["arquivo"]] = {
                        "processado_em": resultado["timestamp"],
                        "linhas": resultado["linhas"], "colunas": resultado["colunas"]
                    }

    print("\nProcesso de conversão concluído.")
    
    # 3. Salvar manifesto e gerar relatório em CSV
    with open(ARQ_MANIFESTO, "w", encoding="utf-8") as f:
        json.dump(manifesto, f, indent=4, ensure_ascii=False)
    print(f"Manifesto de controle foi atualizado em: '{ARQ_MANIFESTO}'")
    
    caminho_relatorio = None
    if relatorio_processamento:
        df_relatorio = pd.DataFrame(relatorio_processamento)
        caminho_relatorio = PASTA_LOGS / f"relatorio_conversao_{datetime.now():%Y-%m-%d_%H-%M-%S}.csv"
        df_relatorio.to_csv(caminho_relatorio, index=False, sep=';', encoding='utf-8-sig')
        print(f"Relatório de execução detalhado salvo em: '{caminho_relatorio}'")

    # --- NOVO RELATÓRIO FINAL NA TELA ---
    print("\n" + "="*50)
    print("--- RELATÓRIO FINAL DA EXECUÇÃO ---")
    print("="*50)
    
    if not relatorio_processamento:
        print("Nenhum arquivo novo foi processado nesta execução.")
    else:
        sucessos = sum(1 for r in relatorio_processamento if r['status'] == 'SUCESSO')
        erros = sum(1 for r in relatorio_processamento if r['status'] == 'ERRO')
        
        print(f"\nResumo:")
        print(f"  - Arquivos Processados com SUCESSO: {sucessos}")
        print(f"  - Arquivos com ERRO: {erros}")

        if erros > 0:
            print("\nDetalhes dos Erros:")
            for resultado in relatorio_processamento:
                if resultado['status'] == 'ERRO':
                    # Pega apenas a última linha do erro para um relatório mais limpo
                    erro_resumido = resultado['detalhes'].strip().split('\n')[-1]
                    print(f"  - Arquivo: {resultado['arquivo']}")
                    print(f"    Erro: {erro_resumido}")
                    print(f"    (Consulte o arquivo CSV em {caminho_relatorio} para o erro completo)")
    
    colunas_nao_mapeadas = set()
    for res in relatorio_processamento:
        colunas_nao_mapeadas.update(res.get("colunas_nao_mapeadas", []))
            
    if colunas_nao_mapeadas:
        print("\n--- ATENÇÃO: Colunas encontradas mas não importadas no mapeamento ---")
        for col in sorted(list(colunas_nao_mapeadas)):
            print(f" - {col}")
        print(f"Considere adicionar estas colunas ao arquivo '{CAMINHO_MAPA_COLUNAS}'.")

Encontrados 57 arquivos. Destes, 57 serão processados.

Iniciando o processo de conversão em lote com 8 núcleos...


  0%|          | 0/57 [00:00<?, ?arquivo/s]


Processo de conversão concluído.
Manifesto de controle foi atualizado em: 'dados_catalogo\manifesto_xlsx.json'
Relatório de execução detalhado salvo em: 'dados_catalogo\logs\relatorio_conversao_2025-09-17_10-28-42.csv'

--- RELATÓRIO FINAL DA EXECUÇÃO ---

Resumo:
  - Arquivos Processados com SUCESSO: 57
  - Arquivos com ERRO: 0

--- ATENÇÃO: Colunas encontradas mas não no mapeamento ---
 - BB Age
 - Douban ID
 - EIDR ID
 - Platform Rating
 - Unnamed: 0
Considere adicionar estas colunas ao arquivo 'mapeamento_colunas.json'.
