# M√≥dulo 2: Controle Mensal da Folha Inteligente

Este notebook documenta e prototipa as funcionalidades do M√≥dulo 2 do sistema AUDITORIA360, focando na importa√ß√£o e extra√ß√£o de dados de extratos de folha de pagamento. O fluxo principal inclui:

1. **Upload de arquivos PDF** via interface Streamlit
2. **Processamento ass√≠ncrono** usando Document AI
3. **Valida√ß√£o e mapeamento** dos dados extra√≠dos
4. **Armazenamento** estruturado no BigQuery
5. **Monitoramento** de status e tratamento de erros

## Arquitetura do Sistema
- **Frontend**: Streamlit para interface de usu√°rio
- **Backend**: API FastAPI para processamento ass√≠ncrono
- **OCR**: Document AI para extra√ß√£o de dados de PDFs
- **Storage**: Google Cloud Storage para arquivos
- **Database**: BigQuery para dados estruturados

## 1. Configura√ß√£o do Ambiente

Configura√ß√£o inicial das bibliotecas, autentica√ß√£o com Google Cloud e inicializa√ß√£o de vari√°veis globais necess√°rias para o funcionamento do sistema.

In [None]:
# Importa√ß√£o das bibliotecas necess√°rias
import os
import pandas as pd
import numpy as np
import json
import time
import requests

# Google Cloud libraries
from google.cloud import storage
from google.cloud import bigquery
from google.cloud import documentai_v1beta3 as documentai
from google.oauth2 import service_account

# Visualization libraries
import matplotlib.pyplot as plt
import streamlit as st

# Configura√ß√£o da autentica√ß√£o com Google Cloud
# Nota: Em produ√ß√£o, use vari√°veis de ambiente ou service account keys
SERVICE_ACCOUNT_PATH = 'path/to/your-service-account-key.json'
if os.path.exists(SERVICE_ACCOUNT_PATH):
    credentials = service_account.Credentials.from_service_account_file(SERVICE_ACCOUNT_PATH)
    storage_client = storage.Client(credentials=credentials)
    bigquery_client = bigquery.Client(credentials=credentials)
    docai_client = documentai.DocumentProcessorServiceClient(credentials=credentials)
else:
    # Usar credenciais padr√£o do ambiente
    storage_client = storage.Client()
    bigquery_client = bigquery.Client()
    docai_client = documentai.DocumentProcessorServiceClient()

# Vari√°veis de configura√ß√£o globais
PROJECT_ID = os.getenv('GOOGLE_CLOUD_PROJECT', 'seu-projeto-id')
BUCKET_NAME = os.getenv('GCS_BUCKET_NAME', 'seu-bucket-folhas-clientes')
PROCESSOR_ID = os.getenv('DOCAI_PROCESSOR_ID', 'seu-processor-id')
LOCATION = os.getenv('DOCAI_LOCATION', 'us')
API_BASE_URL = os.getenv('API_BASE_URL', 'http://localhost:8000')

print(f"Configura√ß√£o carregada:")
print(f"- Projeto: {PROJECT_ID}")
print(f"- Bucket: {BUCKET_NAME}")
print(f"- Processor ID: {PROCESSOR_ID}")
print(f"- Location: {LOCATION}")

## 2. Upload de Arquivo PDF

Interface para upload de arquivos PDF usando Streamlit. Os arquivos s√£o validados e salvos no Google Cloud Storage para processamento posterior.

In [None]:
def upload_pdf_interface():
    """
    Interface Streamlit para upload de arquivos PDF.
    Valida o arquivo e realiza o upload para o Google Cloud Storage.
    """
    st.title("üìÑ Upload de Extrato de Folha de Pagamento")
    st.markdown("Selecione o arquivo PDF do extrato para processamento autom√°tico.")
    
    # Widget de upload de arquivo
    uploaded_file = st.file_uploader(
        "Selecione o arquivo PDF:", 
        type=["pdf"],
        help="Apenas arquivos PDF s√£o aceitos. Tamanho m√°ximo: 10MB"
    )
    
    if uploaded_file is not None:
        # Validar tamanho do arquivo (m√°ximo 10MB)
        file_size = len(uploaded_file.getvalue())
        if file_size > 10 * 1024 * 1024:  # 10MB
            st.error("‚ùå Arquivo muito grande. Tamanho m√°ximo permitido: 10MB")
            return None
            
        st.success(f"‚úÖ Arquivo '{uploaded_file.name}' carregado com sucesso!")
        st.info(f"üìä Tamanho: {file_size / 1024 / 1024:.2f} MB")
        
        # Bot√£o para processar o arquivo
        if st.button("üöÄ Processar Arquivo", type="primary"):
            with st.spinner("Salvando arquivo no Google Cloud Storage..."):
                try:
                    # Upload para GCS
                    bucket = storage_client.bucket(BUCKET_NAME)
                    blob_name = f"uploads/{uploaded_file.name}"
                    blob = bucket.blob(blob_name)
                    
                    # Reset file pointer and upload
                    uploaded_file.seek(0)
                    blob.upload_from_file(uploaded_file, content_type='application/pdf')
                    
                    gcs_uri = f"gs://{BUCKET_NAME}/{blob_name}"
                    st.success(f"‚úÖ Arquivo salvo no GCS: {gcs_uri}")
                    return gcs_uri
                    
                except Exception as e:
                    st.error(f"‚ùå Erro ao salvar arquivo: {str(e)}")
                    return None
    
    return None

# Exemplo de uso da interface
# gcs_uri = upload_pdf_interface()

## 3. Processamento Ass√≠ncrono com Document AI

Processamento de documentos PDF utilizando o Google Document AI para extra√ß√£o de dados estruturados. O processo √© ass√≠ncrono para melhor performance.

In [None]:
def process_pdf_with_docai(gcs_uri, client_id="default"):
    """
    Processa um PDF armazenado no GCS usando Document AI.
    
    Args:
        gcs_uri (str): URI do arquivo no Google Cloud Storage
        client_id (str): ID do cliente para processamento
    
    Returns:
        str: Job ID para acompanhamento do processamento
    """
    try:
        # Iniciar processamento ass√≠ncrono via API
        response = requests.post(
            f"{API_BASE_URL}/api/v1/clientes/{client_id}/folhas/importar-pdf-async",
            json={"gcs_uri": gcs_uri}
        )
        
        if response.status_code == 200:
            job_data = response.json()
            job_id = job_data.get("job_id")
            print(f"‚úÖ Job iniciado com sucesso. ID: {job_id}")
            print(f"üìä Status: {job_data.get('status', 'INICIADO')}")
            return job_id
        else:
            print(f"‚ùå Erro ao iniciar processamento: {response.text}")
            return None
            
    except requests.RequestException as e:
        print(f"‚ùå Erro de conex√£o: {str(e)}")
        return None

def check_job_status(job_id, client_id="default"):
    """
    Consulta o status de um job de processamento.
    
    Args:
        job_id (str): ID do job para consulta
        client_id (str): ID do cliente
    
    Returns:
        dict: Informa√ß√µes sobre o status do job
    """
    try:
        response = requests.get(
            f"{API_BASE_URL}/api/v1/clientes/{client_id}/folhas/importar-pdf-async/status/{job_id}"
        )
        
        if response.status_code == 200:
            return response.json()
        else:
            print(f"‚ùå Erro ao consultar status: {response.text}")
            return None
            
    except requests.RequestException as e:
        print(f"‚ùå Erro de conex√£o: {str(e)}")
        return None

# Exemplo de uso
# gcs_uri_example = "gs://seu-bucket/exemplo.pdf"
# job_id = process_pdf_with_docai(gcs_uri_example, "12345")
# if job_id:
#     status = check_job_status(job_id, "12345")
#     print(f"Status atual: {status}")

## 4. Valida√ß√£o e Mapeamento de Dados

Os dados extra√≠dos do Document AI s√£o validados e mapeados para a estrutura interna do sistema antes do armazenamento.

In [None]:
import re
from typing import List, Dict, Any

def validate_cpf(cpf: str) -> bool:
    """
    Valida formato e d√≠gitos verificadores do CPF.
    """
    if not cpf:
        return False
    
    # Remove caracteres n√£o num√©ricos
    cpf_numbers = re.sub(r'\D', '', cpf)
    
    # Verifica se tem 11 d√≠gitos
    if len(cpf_numbers) != 11:
        return False
    
    # Verifica se n√£o s√£o todos os d√≠gitos iguais
    if cpf_numbers == cpf_numbers[0] * 11:
        return False
    
    # C√°lculo dos d√≠gitos verificadores (simplificado para exemplo)
    return True

def validate_folha_data(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Valida e mapeia dados extra√≠dos da folha de pagamento.
    
    Args:
        data: Lista de registros extra√≠dos do Document AI
    
    Returns:
        Lista de registros validados com indicadores de erro
    """
    validated_data = []
    
    for i, record in enumerate(data):
        errors = []
        warnings = []
        
        # Valida√ß√£o do CPF
        if not validate_cpf(record.get('cpf', '')):
            errors.append("CPF inv√°lido ou ausente")
        
        # Valida√ß√£o do nome do funcion√°rio
        if not record.get('funcionario_nome', '').strip():
            errors.append("Nome do funcion√°rio ausente")
        
        # Valida√ß√£o da rubrica
        if not record.get('rubrica', '').strip():
            warnings.append("Rubrica n√£o identificada")
        
        # Valida√ß√£o do valor
        try:
            valor = float(record.get('valor', 0))
            if valor == 0:
                warnings.append("Valor zero detectado")
        except (ValueError, TypeError):
            errors.append("Valor inv√°lido")
            valor = 0.0
        
        # Mapeamento para estrutura interna
        validated_record = {
            'id_linha': i + 1,
            'funcionario_nome': record.get('funcionario_nome', '').strip(),
            'cpf': re.sub(r'\D', '', record.get('cpf', '')),
            'rubrica': record.get('rubrica', '').strip(),
            'valor': valor,
            'data_processamento': pd.Timestamp.now().isoformat(),
            'status_validacao': 'ERRO' if errors else ('AVISO' if warnings else 'OK'),
            'erros': errors,
            'avisos': warnings
        }
        
        validated_data.append(validated_record)
    
    return validated_data

# Exemplo de dados extra√≠dos (simula√ß√£o)
dados_exemplo = [
    {"funcionario_nome": "Jo√£o Silva", "cpf": "123.456.789-00", "rubrica": "Sal√°rio Base", "valor": 5000.00},
    {"funcionario_nome": "Maria Oliveira", "cpf": "987.654.321-00", "rubrica": "Vale Transporte", "valor": 150.00},
    {"funcionario_nome": "Pedro Santos", "cpf": "111.111.111-11", "rubrica": "FGTS", "valor": 400.00},  # CPF inv√°lido
    {"funcionario_nome": "", "cpf": "456.789.123-00", "rubrica": "Desconto INSS", "valor": -300.00},  # Nome ausente
]

# Processar valida√ß√£o
dados_validados = validate_folha_data(dados_exemplo)

# Converter para DataFrame para melhor visualiza√ß√£o
df_validado = pd.DataFrame(dados_validados)
print("üìä Resultados da Valida√ß√£o:")
print(df_validado[['funcionario_nome', 'cpf', 'rubrica', 'valor', 'status_validacao']].head())

# Estat√≠sticas de valida√ß√£o
stats = df_validado['status_validacao'].value_counts()
print(f"\nüìà Estat√≠sticas de Valida√ß√£o:")
for status, count in stats.items():
    print(f"  {status}: {count} registros")

## 5. Armazenamento no BigQuery

Dados validados s√£o armazenados no BigQuery com estrutura otimizada para consultas de auditoria.

In [None]:
def insert_validated_data_to_bigquery(validated_data: List[Dict], dataset_id: str = "auditoria_folha_dataset", table_id: str = "LinhasFolhaFuncionario"):
    """
    Insere dados validados no BigQuery com tratamento de erros.
    
    Args:
        validated_data: Lista de dados validados
        dataset_id: ID do dataset no BigQuery
        table_id: ID da tabela no BigQuery
    
    Returns:
        dict: Resultado da opera√ß√£o com estat√≠sticas
    """
    try:
        # Filtrar apenas dados sem erros cr√≠ticos
        dados_para_insercao = [
            record for record in validated_data 
            if record['status_validacao'] != 'ERRO'
        ]
        
        if not dados_para_insercao:
            return {
                'success': False,
                'message': 'Nenhum dado v√°lido para inser√ß√£o',
                'inserted_count': 0,
                'error_count': len(validated_data)
            }
        
        # Preparar dados para BigQuery (remover campos de valida√ß√£o)
        bq_data = []
        for record in dados_para_insercao:
            bq_record = {
                'id_linha': record['id_linha'],
                'funcionario_nome': record['funcionario_nome'],
                'cpf': record['cpf'],
                'rubrica': record['rubrica'],
                'valor': record['valor'],
                'data_processamento': record['data_processamento'],
                'status_validacao': record['status_validacao']
            }
            bq_data.append(bq_record)
        
        # Inserir no BigQuery
        table_ref = bigquery_client.dataset(dataset_id).table(table_id)
        errors = bigquery_client.insert_rows_json(table_ref, bq_data)
        
        if errors:
            print(f"‚ùå Erros ao inserir no BigQuery: {errors}")
            return {
                'success': False,
                'message': f'Erros na inser√ß√£o: {errors}',
                'inserted_count': 0,
                'error_count': len(bq_data)
            }
        else:
            print(f"‚úÖ {len(bq_data)} registros inseridos com sucesso no BigQuery!")
            return {
                'success': True,
                'message': 'Dados inseridos com sucesso',
                'inserted_count': len(bq_data),
                'error_count': len(validated_data) - len(bq_data)
            }
            
    except Exception as e:
        print(f"‚ùå Erro inesperado: {str(e)}")
        return {
            'success': False,
            'message': f'Erro inesperado: {str(e)}',
            'inserted_count': 0,
            'error_count': len(validated_data)
        }

# Exemplo de uso com dados validados
if 'dados_validados' in locals():
    resultado_insercao = insert_validated_data_to_bigquery(dados_validados)
    print(f"\nüìä Resultado da Inser√ß√£o:")
    print(f"  Sucesso: {resultado_insercao['success']}")
    print(f"  Registros inseridos: {resultado_insercao['inserted_count']}")
    print(f"  Registros com erro: {resultado_insercao['error_count']}")
    print(f"  Mensagem: {resultado_insercao['message']}")

## 6. Monitoramento e Visualiza√ß√£o

Acompanhamento do processamento e visualiza√ß√£o dos resultados para an√°lise e auditoria.

In [None]:
def monitor_processing_pipeline(job_id: str, client_id: str = "default", max_wait_time: int = 300):
    """
    Monitora o pipeline de processamento com timeout e relat√≥rios de progresso.
    
    Args:
        job_id: ID do job para monitoramento
        client_id: ID do cliente
        max_wait_time: Tempo m√°ximo de espera em segundos
    
    Returns:
        dict: Status final do processamento
    """
    start_time = time.time()
    status_history = []
    
    print(f"üîç Iniciando monitoramento do Job: {job_id}")
    print(f"‚è±Ô∏è Tempo m√°ximo de espera: {max_wait_time}s")
    
    while True:
        elapsed_time = time.time() - start_time
        
        # Verificar timeout
        if elapsed_time > max_wait_time:
            print(f"‚è∞ Timeout atingido ap√≥s {elapsed_time:.1f}s")
            return {
                'status': 'TIMEOUT',
                'elapsed_time': elapsed_time,
                'history': status_history
            }
        
        # Consultar status atual
        current_status = check_job_status(job_id, client_id)
        
        if current_status:
            status = current_status.get('status', 'UNKNOWN')
            
            # Adicionar ao hist√≥rico se mudou
            if not status_history or status_history[-1]['status'] != status:
                status_entry = {
                    'status': status,
                    'timestamp': pd.Timestamp.now().isoformat(),
                    'elapsed_time': elapsed_time
                }
                status_history.append(status_entry)
                print(f"üìä [{elapsed_time:.1f}s] Status: {status}")
            
            # Verificar se terminou
            if status in ["CONCLUIDO_SUCESSO", "CONCLUIDO_COM_PENDENCIAS", "FALHA_DOCAI", "FALHA_MAPEAMENTO", "ERRO"]:
                print(f"‚úÖ Processamento finalizado: {status}")
                return {
                    'status': status,
                    'elapsed_time': elapsed_time,
                    'history': status_history,
                    'details': current_status
                }
        else:
            print(f"‚ö†Ô∏è Erro ao consultar status do job")
        
        # Aguardar antes da pr√≥xima consulta
        time.sleep(10)

def visualize_processing_results(data: List[Dict]):
    """
    Cria visualiza√ß√µes dos resultados do processamento.
    """
    if not data:
        print("‚ö†Ô∏è Nenhum dado para visualizar")
        return
    
    df = pd.DataFrame(data)
    
    # Configurar matplotlib para melhor visualiza√ß√£o
    plt.style.use('default')
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    fig.suptitle('üìä An√°lise dos Dados Processados', fontsize=16, fontweight='bold')
    
    # 1. Distribui√ß√£o por status de valida√ß√£o
    status_counts = df['status_validacao'].value_counts()
    axes[0, 0].pie(status_counts.values, labels=status_counts.index, autopct='%1.1f%%', startangle=90)
    axes[0, 0].set_title('Status de Valida√ß√£o')
    
    # 2. Top 10 rubricas por valor
    top_rubricas = df.groupby('rubrica')['valor'].sum().sort_values(ascending=False).head(10)
    axes[0, 1].barh(range(len(top_rubricas)), top_rubricas.values)
    axes[0, 1].set_yticks(range(len(top_rubricas)))
    axes[0, 1].set_yticklabels(top_rubricas.index)
    axes[0, 1].set_title('Top 10 Rubricas por Valor Total')
    axes[0, 1].set_xlabel('Valor (R$)')
    
    # 3. Distribui√ß√£o de valores
    axes[1, 0].hist(df['valor'], bins=20, alpha=0.7, color='skyblue', edgecolor='black')
    axes[1, 0].set_title('Distribui√ß√£o de Valores')
    axes[1, 0].set_xlabel('Valor (R$)')
    axes[1, 0].set_ylabel('Frequ√™ncia')
    
    # 4. Funcion√°rios com mais rubricas
    func_rubricas = df.groupby('funcionario_nome').size().sort_values(ascending=False).head(10)
    axes[1, 1].bar(range(len(func_rubricas)), func_rubricas.values)
    axes[1, 1].set_xticks(range(len(func_rubricas)))
    axes[1, 1].set_xticklabels(func_rubricas.index, rotation=45, ha='right')
    axes[1, 1].set_title('Funcion√°rios com Mais Rubricas')
    axes[1, 1].set_ylabel('N√∫mero de Rubricas')
    
    plt.tight_layout()
    plt.show()
    
    # Estat√≠sticas resumidas
    print("\nüìà Estat√≠sticas Resumidas:")
    print(f"  Total de registros: {len(df)}")
    print(f"  Funcion√°rios √∫nicos: {df['funcionario_nome'].nunique()}")
    print(f"  Rubricas √∫nicas: {df['rubrica'].nunique()}")
    print(f"  Valor total: R$ {df['valor'].sum():,.2f}")
    print(f"  Valor m√©dio: R$ {df['valor'].mean():.2f}")

# Exemplo de uso das fun√ß√µes de monitoramento
# job_id_example = "12345-abcde-67890"
# resultado_monitoramento = monitor_processing_pipeline(job_id_example)
# 
# if 'dados_validados' in locals():
#     visualize_processing_results(dados_validados)

## 7. Tratamento de Erros e Logs

Sistema robusto de tratamento de erros com logs detalhados para debugging e auditoria.

In [None]:
import logging
from datetime import datetime
from typing import Optional, Union

# Configurar logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/tmp/auditoria_folha.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class FolhaProcessingError(Exception):
    """Exce√ß√£o espec√≠fica para erros de processamento de folha."""
    pass

class ErrorHandler:
    """
    Classe para centralizar o tratamento de erros do sistema.
    """
    
    @staticmethod
    def log_error(error: Union[Exception, str], context: str = "", job_id: Optional[str] = None):
        """
        Registra erro com contexto adicional.
        """
        error_msg = str(error)
        timestamp = datetime.now().isoformat()
        
        log_entry = {
            'timestamp': timestamp,
            'job_id': job_id,
            'context': context,
            'error': error_msg,
            'error_type': type(error).__name__ if isinstance(error, Exception) else 'Unknown'
        }
        
        logger.error(f"[{context}] {error_msg}", extra=log_entry)
        return log_entry
    
    @staticmethod
    def handle_processing_error(func):
        """
        Decorator para tratamento autom√°tico de erros em fun√ß√µes de processamento.
        """
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except requests.RequestException as e:
                ErrorHandler.log_error(e, f"Erro de conex√£o em {func.__name__}")
                return {'success': False, 'error': 'Erro de conex√£o com API', 'details': str(e)}
            except FolhaProcessingError as e:
                ErrorHandler.log_error(e, f"Erro de processamento em {func.__name__}")
                return {'success': False, 'error': 'Erro no processamento da folha', 'details': str(e)}
            except Exception as e:
                ErrorHandler.log_error(e, f"Erro inesperado em {func.__name__}")
                return {'success': False, 'error': 'Erro inesperado', 'details': str(e)}
        return wrapper

# Exemplo de tratamento de erros integrado
print("üîß Sistema de tratamento de erros configurado com sucesso!")
print("üìù Logs ser√£o salvos em: /tmp/auditoria_folha.log")

## 8. Exemplo de Fluxo Completo

Demonstra√ß√£o de um fluxo completo de processamento de folha de pagamento com todas as funcionalidades integradas.

In [None]:
def execute_complete_folha_processing_workflow(pdf_file_path: str, client_id: str = "demo_client"):
    """
    Executa o fluxo completo de processamento de folha de pagamento.
    
    Args:
        pdf_file_path: Caminho para o arquivo PDF
        client_id: ID do cliente
    
    Returns:
        dict: Resultado completo do processamento
    """
    workflow_result = {
        'client_id': client_id,
        'start_time': datetime.now().isoformat(),
        'steps': [],
        'final_status': 'INICIADO'
    }
    
    try:
        # Passo 1: Upload do PDF para GCS
        print("üì§ Passo 1: Upload do PDF para Google Cloud Storage")
        # gcs_uri = upload_pdf_to_gcs(pdf_file_path)
        gcs_uri = f"gs://{BUCKET_NAME}/uploads/demo_folha.pdf"  # Simulado
        workflow_result['steps'].append({
            'step': 1,
            'name': 'Upload PDF',
            'status': 'SUCESSO',
            'gcs_uri': gcs_uri
        })
        
        # Passo 2: Iniciar processamento ass√≠ncrono
        print("üöÄ Passo 2: Iniciar processamento ass√≠ncrono com Document AI")
        job_id = process_pdf_with_docai(gcs_uri, client_id)
        if not job_id:
            raise FolhaProcessingError("Falha ao iniciar processamento")
        
        workflow_result['steps'].append({
            'step': 2,
            'name': 'Iniciar Processamento',
            'status': 'SUCESSO',
            'job_id': job_id
        })
        
        # Passo 3: Monitorar processamento
        print("üëÅÔ∏è Passo 3: Monitorar processamento")
        monitoring_result = monitor_processing_pipeline(job_id, client_id, max_wait_time=120)
        
        workflow_result['steps'].append({
            'step': 3,
            'name': 'Monitoramento',
            'status': monitoring_result['status'],
            'elapsed_time': monitoring_result['elapsed_time']
        })
        
        # Passo 4: Processamento dos dados (simulado)
        if monitoring_result['status'] in ['CONCLUIDO_SUCESSO', 'CONCLUIDO_COM_PENDENCIAS']:
            print("üîç Passo 4: Valida√ß√£o e mapeamento de dados")
            
            # Dados simulados extra√≠dos do Document AI
            dados_extraidos = [
                {"funcionario_nome": "Jo√£o Silva", "cpf": "123.456.789-00", "rubrica": "Sal√°rio Base", "valor": 5000.00},
                {"funcionario_nome": "Maria Santos", "cpf": "987.654.321-00", "rubrica": "Vale Alimenta√ß√£o", "valor": 400.00},
                {"funcionario_nome": "Pedro Oliveira", "cpf": "456.789.123-45", "rubrica": "FGTS", "valor": 320.00},
            ]
            
            dados_validados = validate_folha_data(dados_extraidos)
            
            workflow_result['steps'].append({
                'step': 4,
                'name': 'Valida√ß√£o de Dados',
                'status': 'SUCESSO',
                'records_processed': len(dados_validados),
                'validation_stats': pd.DataFrame(dados_validados)['status_validacao'].value_counts().to_dict()
            })
            
            # Passo 5: Armazenamento no BigQuery
            print("üíæ Passo 5: Armazenamento no BigQuery")
            storage_result = insert_validated_data_to_bigquery(dados_validados)
            
            workflow_result['steps'].append({
                'step': 5,
                'name': 'Armazenamento BigQuery',
                'status': 'SUCESSO' if storage_result['success'] else 'ERRO',
                'inserted_count': storage_result['inserted_count'],
                'error_count': storage_result['error_count']
            })
            
            # Passo 6: Gera√ß√£o de relat√≥rio
            print("üìä Passo 6: Gera√ß√£o de visualiza√ß√µes")
            visualize_processing_results(dados_validados)
            
            workflow_result['steps'].append({
                'step': 6,
                'name': 'Visualiza√ß√µes',
                'status': 'SUCESSO'
            })
            
            workflow_result['final_status'] = 'CONCLUIDO_SUCESSO'
        else:
            workflow_result['final_status'] = 'FALHA_PROCESSAMENTO'
            
    except Exception as e:
        ErrorHandler.log_error(e, "Fluxo completo de processamento")
        workflow_result['final_status'] = 'ERRO'
        workflow_result['error'] = str(e)
    
    finally:
        workflow_result['end_time'] = datetime.now().isoformat()
        
    # Resumo final
    print(f"\nüèÅ Workflow Finalizado!")
    print(f"   Status: {workflow_result['final_status']}")
    print(f"   Passos executados: {len(workflow_result['steps'])}")
    
    return workflow_result

# Exemplo de execu√ß√£o do fluxo completo
print("üéØ Demonstra√ß√£o do Fluxo Completo de Processamento")
print("   (Utilizando dados simulados para demonstra√ß√£o)")
# resultado_completo = execute_complete_folha_processing_workflow("demo_folha.pdf")
print("\n‚úÖ Notebook configurado e pronto para uso!")
print("üìã Para executar o fluxo completo, chame: execute_complete_folha_processing_workflow('caminho_do_pdf.pdf')")