In [3]:
# Célula 1: Importações, Configurações e Funções Utilitárias

import requests
import json
import time
import os
import pymongo
from datetime import datetime, timezone, timedelta # Adicionado timedelta
import hashlib

print("Bibliotecas importadas.")

# --- Configurações Globais ---
API_BASE_URL = "https://api-publica.datajud.cnj.jus.br/api_publica_{tribunal}/_search"
TAMANHO_PAGINA_API = 10000
MAX_RETRIES_CONEXAO_API = 5
TIMEOUT_REQUEST_API = 240

ESTADO_BASE_PATH = r"C:\IFPB\bd\projeto\estado_mongo"
if not os.path.exists(ESTADO_BASE_PATH):
    os.makedirs(ESTADO_BASE_PATH)
    print(f"Diretório de estado '{ESTADO_BASE_PATH}' criado.")

SUA_API_KEY = "cDZHYzlZa0JadVREZDJCendQbXY6SkJlTzNjLV9TRENyQk1RdnFKZGRQdw==" # <--- SUBSTITUA

HEADERS_API = {
    "Content-Type": "application/json",
    "Authorization": f"ApiKey {SUA_API_KEY}"
}

if SUA_API_KEY == "COLOQUE_SUA_API_KEY_AQUI":
    print("!!! ATENÇÃO: API KEY NÃO CONFIGURADA. INTERROMPENDO. !!!")
    raise ValueError("API Key não configurada.")

print("Configurações da API carregadas.")

# --- FUNÇÕES UTILITÁRIAS GLOBAIS ---

def consultar_api_datajud_mongo(tribunal, query_payload):
    """Faz uma requisição à API do DataJud com retries, tratando 429."""
    url = API_BASE_URL.format(tribunal=tribunal.lower())
    response_data = None
    last_exception_details = ""
    for tentativa in range(MAX_RETRIES_CONEXAO_API + 3): # Aumentar um pouco para 429
        try:
            print(f"  API Call: Tribunal {tribunal}, Tentativa {tentativa + 1}...")
            response = requests.post(url, json=query_payload, headers=HEADERS_API, timeout=TIMEOUT_REQUEST_API)
            if response.status_code == 429:
                retry_after_str = response.headers.get("Retry-After")
                espera = 30 + (2 ** tentativa) * 5 
                if retry_after_str:
                    try:
                        espera_header = int(retry_after_str)
                        espera = max(espera, espera_header) # Usa o maior tempo de espera
                        print(f"    API Rate Limit (429). Cabeçalho Retry-After: {espera_header}s. Aguardando {espera}s.")
                    except ValueError:
                        print(f"    API Rate Limit (429). Retry-After inválido ('{retry_after_str}'). Usando espera de {espera}s.")
                else:
                    print(f"    API Rate Limit (429) sem cabeçalho Retry-After. Usando espera de {espera}s.")
                if tentativa < MAX_RETRIES_CONEXAO_API + 2:
                    time.sleep(espera)
                    continue
                else:
                    last_exception_details = f"Status 429: Too Many Requests. Máximo de retries atingido."
                    break
            response.raise_for_status()
            response_data = response.json()
            return response_data
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout, requests.exceptions.ChunkedEncodingError) as e_net:
            last_exception_details = str(e_net)
            print(f"    Erro de rede/timeout na tentativa {tentativa + 1}: {e_net}")
            if tentativa < MAX_RETRIES_CONEXAO_API + 2: time.sleep((2 ** tentativa) * 2)
        except requests.exceptions.RequestException as e_req:
            last_exception_details = f"Status {response.status_code if response else 'N/A'}: {response.text[:500] if response else str(e_req)}"
            print(f"    Erro HTTP não recuperável: {e_req}. Detalhes: {last_exception_details}")
            break 
        except json.JSONDecodeError as e_json:
            last_exception_details = f"JSONDecodeError: {str(e_json)}. Response: {response.text[:500] if response else 'N/A'}"
            print(f"    Erro ao decodificar JSON: {e_json}. Response: {response.text[:500] if response else 'N/A'}")
            break
        except Exception as e_geral:
            last_exception_details = f"Erro geral: {type(e_geral).__name__} - {str(e_geral)}"
            print(f"    Erro geral inesperado na API call: {last_exception_details}")
            break
    if response_data is None:
        print(f"    Falha final ao consultar API para {tribunal}. Último erro: {last_exception_details}")
    return response_data

def string_para_datetime_utc(date_string):
    if not date_string: return None
    try:
        # Lidar com formatos YYYY-MM-DDTHH:mm:ss.SSSSSSSSSZ e YYYY-MM-DDTHH:mm:ssZ
        # e também YYYY-MM-DDTHH:mm:ss.SSS+HH:MM
        original_date_string = date_string # Para log em caso de falha
        
        # Normalizar 'Z' para +00:00
        if date_string.endswith('Z'):
            date_string = date_string[:-1] + '+00:00'
        
        # Truncar nanossegundos para microssegundos (6 dígitos) se houver fração de segundo
        if '.' in date_string:
            parts = date_string.split('.')
            datetime_part = parts[0]
            fraction_and_tz_part = parts[1]

            # Encontrar o início do fuso horário, se houver
            tz_index_plus = fraction_and_tz_part.rfind('+')
            tz_index_minus = fraction_and_tz_part.rfind('-')
            
            tz_char_index = -1
            if tz_index_plus != -1 and tz_index_minus != -1:
                tz_char_index = min(tz_index_plus, tz_index_minus)
            elif tz_index_plus != -1:
                tz_char_index = tz_index_plus
            elif tz_index_minus != -1:
                tz_char_index = tz_index_minus

            if tz_char_index != -1:
                microseconds_part = fraction_and_tz_part[:tz_char_index][:6]
                tz_part = fraction_and_tz_part[tz_char_index:]
            else: # Sem fuso horário explícito após a fração
                microseconds_part = fraction_and_tz_part[:6]
                tz_part = '+00:00' # Assume UTC se não especificado e Z foi removido
            
            date_string = f"{datetime_part}.{microseconds_part}{tz_part}"
        elif not ('+' in date_string or date_string.count('-') > 2): # Sem fração e sem fuso explícito
             date_string += '+00:00'


        dt_obj = datetime.fromisoformat(date_string)
        # Garantir que seja UTC
        return dt_obj.astimezone(timezone.utc)
    except Exception as e:
        # print(f"    Aviso: Não foi possível converter a string de data '{original_date_string}' para datetime. Erro: {e}. Retornando None.")
        return None

def converter_datas_no_documento(doc):
    if isinstance(doc, dict):
        return {k: converter_datas_no_documento(v) for k, v in doc.items()}
    elif isinstance(doc, list):
        return [converter_datas_no_documento(elem) for elem in doc]
    elif isinstance(doc, str):
        # Tentar converter apenas campos que parecem ser datas para evitar conversões erradas
        # Heurística: tem 'T', tem 'Z' ou fuso, e tem pelo menos 19 chars
        if len(doc) >= 19 and 'T' in doc and (doc.endswith('Z') or '+' in doc or doc.count('-') > 2):
            converted_date = string_para_datetime_utc(doc)
            return converted_date if converted_date is not None else doc # Mantém string se falhar
    return doc

# Funções de estado (agora só para search_after da coleta inicial)
def salvar_estado_coleta_inicial_mongo(tribunal, search_after_param):
    estado = {"search_after_coleta_inicial": search_after_param}
    path_estado = os.path.join(ESTADO_BASE_PATH, f"coleta_inicial_mongo_{tribunal}_estado.json")
    try:
        with open(path_estado, 'w') as f_estado: json.dump(estado, f_estado)
    except Exception as e: print(f"  ERRO ao salvar estado de coleta inicial para {tribunal}: {e}")

def carregar_estado_coleta_inicial_mongo(tribunal):
    path_estado = os.path.join(ESTADO_BASE_PATH, f"coleta_inicial_mongo_{tribunal}_estado.json")
    if os.path.exists(path_estado):
        try:
            with open(path_estado, 'r') as f_estado:
                estado = json.load(f_estado)
                print(f"  Estado de COLETA INICIAL MongoDB para {tribunal} carregado: SA={estado.get('search_after_coleta_inicial')}")
                return estado.get("search_after_coleta_inicial")
        except Exception as e:
            print(f"  Erro ao carregar estado de COLETA INICIAL para {tribunal}: {e}. Iniciando do começo.")
    return None

print("Funções utilitárias definidas.")

Bibliotecas importadas.
Configurações da API carregadas.
Funções utilitárias definidas.


In [4]:
# Célula 2: Consultar Quantidade de Processos na API

# tribunais_para_verificar_qtd = ["tjpb", "trf5", "stj"] # Exemplo de lista
tribunais_para_verificar_qtd = ["tjpb"] # Para focar em um para a coleta principal

print("\n--- Verificando Quantidade de Processos (Estimativa da API) ---")
for tribunal_sigla_qtd in tribunais_para_verificar_qtd:
    print(f"\nConsultando tribunal: {tribunal_sigla_qtd.upper()}")
    query_contagem_api = {
        "query": {"match_all": {}},
        "size": 0,
        "track_total_hits": True
    }
    
    data_api_qtd = consultar_api_datajud_mongo(tribunal_sigla_qtd, query_contagem_api)
    
    if data_api_qtd and data_api_qtd.get('hits') and isinstance(data_api_qtd['hits'].get('total'), dict):
        total_value_api = data_api_qtd['hits']['total'].get('value', 'N/A')
        total_relation_api = data_api_qtd['hits']['total'].get('relation', 'N/A')
        print(f"  Estimativa de processos para {tribunal_sigla_qtd.upper()}: {total_value_api} (relação: {total_relation_api})")
    else:
        print(f"  Não foi possível obter a contagem para {tribunal_sigla_qtd.upper()}. Resposta: {str(data_api_qtd)[:200] if data_api_qtd else 'Nenhuma resposta'}")
    time.sleep(0.5)


--- Verificando Quantidade de Processos (Estimativa da API) ---

Consultando tribunal: TJPB
  API Call: Tribunal tjpb, Tentativa 1...
  Estimativa de processos para TJPB: 2694792 (relação: eq)


In [15]:
# Célula 3: Conexão com MongoDB e Definição de Coleções (CORRIGIDA NOVAMENTE)

MONGO_URI_CONEXAO = "mongodb://localhost:27017/"
NOME_BANCO_DADOS_MONGO = "base_cnj"
TRIBUNAL_ALVO_MONGO_COLETA = "tjpb" # Mude conforme necessário
COLECAO_PROCESSOS_MONGO_NOME = f"processos_{TRIBUNAL_ALVO_MONGO_COLETA.lower()}"
COLECAO_MOVIMENTACOES_MONGO_NOME = f"movimentacoes_{TRIBUNAL_ALVO_MONGO_COLETA.lower()}"

mongo_client = None
db_mongo = None
colecao_processos_mongo = None
colecao_movimentacoes_mongo = None

try:
    print(f"\n--- Conectando ao MongoDB em {MONGO_URI_CONEXAO} ---")
    mongo_client = pymongo.MongoClient(MONGO_URI_CONEXAO, serverSelectionTimeoutMS=5000)
    mongo_client.admin.command('ping')
    print("Conexão com MongoDB bem-sucedida!")

    db_mongo = mongo_client[NOME_BANCO_DADOS_MONGO]
    print(f"Usando/Criando banco de dados: '{NOME_BANCO_DADOS_MONGO}'")

    colecao_processos_mongo = db_mongo[COLECAO_PROCESSOS_MONGO_NOME]
    print(f"Usando/Criando coleção de processos: '{COLECAO_PROCESSOS_MONGO_NOME}'")
    print(f"  Índice padrão em '_id' para '{COLECAO_PROCESSOS_MONGO_NOME}' é automaticamente único.")
    try:
        # Índice para buscar o @timestamp mais recente da API
        colecao_processos_mongo.create_index([("@timestamp", pymongo.DESCENDING)], 
                                             name="idx_processo_api_timestamp", 
                                             background=True)
        print(f"  Índice em '@timestamp' para '{COLECAO_PROCESSOS_MONGO_NOME}' garantido.")
    except Exception as e_idx_ts:
        print(f"  Aviso ao criar/verificar índice em '@timestamp' para processos: {e_idx_ts}")


    colecao_movimentacoes_mongo = db_mongo[COLECAO_MOVIMENTACOES_MONGO_NOME]
    print(f"Usando/Criando coleção de movimentações: '{COLECAO_MOVIMENTACOES_MONGO_NOME}'")
    # O MongoDB cria automaticamente um índice único em _id para a coleção de movimentações também.
    # A linha abaixo foi REMOVIDA:
    # colecao_movimentacoes_mongo.create_index([("_id", pymongo.ASCENDING)], name="idx_mov_id_unico", background=True, unique=True) 
    print(f"  Índice padrão em '_id' para '{COLECAO_MOVIMENTACOES_MONGO_NOME}' é automaticamente único.")
    
    # Outros índices úteis para a coleção de movimentações
    try:
        colecao_movimentacoes_mongo.create_index([("processo_id", pymongo.ASCENDING)], 
                                                 name="idx_mov_processo_id", 
                                                 background=True)
        colecao_movimentacoes_mongo.create_index([("processo_id", pymongo.ASCENDING), ("dataHora", pymongo.ASCENDING)], 
                                                 name="idx_mov_proc_datahora", 
                                                 background=True)
        print(f"  Outros índices para '{COLECAO_MOVIMENTACOES_MONGO_NOME}' criados/verificados.")
    except Exception as e_idx_mov:
        print(f"  Aviso ao criar/verificar outros índices para movimentações: {e_idx_mov}")


except Exception as e:
    print(f"ERRO inesperado durante a configuração do MongoDB: {type(e).__name__} - {e}")
    if 'mongo_client' in locals() and mongo_client is not None: # Tentar fechar se foi aberto
        mongo_client.close()
    mongo_client = None # Garantir que está None se a conexão falhar

if mongo_client is None:
    print("!!! ATENÇÃO: Não foi possível conectar ou configurar o MongoDB corretamente. As próximas células de inserção não funcionarão. !!!")


--- Conectando ao MongoDB em mongodb://localhost:27017/ ---
Conexão com MongoDB bem-sucedida!
Usando/Criando banco de dados: 'base_cnj'
Usando/Criando coleção de processos: 'processos_tjpb'
  Índice padrão em '_id' para 'processos_tjpb' é automaticamente único.
  Índice em '@timestamp' para 'processos_tjpb' garantido.
Usando/Criando coleção de movimentações: 'movimentacoes_tjpb'
  Índice padrão em '_id' para 'movimentacoes_tjpb' é automaticamente único.
  Outros índices para 'movimentacoes_tjpb' criados/verificados.


In [5]:
# Célula 3: Coleta Inicial e Retomada para MongoDB (Ajustada)

BATCH_SIZE_MONGO_INSERT_C4 = 100000 # Ou o valor que você preferir

if 'mongo_client' not in locals() or mongo_client is None:
    print("!!! ATENÇÃO: MongoDB não está conectado. Pulando Célula 4. !!!")
else:
    print(f"\n--- Iniciando Coleta INICIAL/RETOMADA para MongoDB do Tribunal: {TRIBUNAL_ALVO_MONGO_COLETA.upper()} (Célula 4) ---")
    
    search_after_coleta_inicial_c4 = carregar_estado_coleta_inicial_mongo(TRIBUNAL_ALVO_MONGO_COLETA)
    
    total_api_lidos_nesta_sessao_c4 = 0
    primeira_pagina_coleta_c4 = not bool(search_after_coleta_inicial_c4)
    
    processos_batch_mongo_c4 = []
    movimentacoes_batch_mongo_c4 = []
    
    try:
        while True:
            query_api_c4 = {
                "size": TAMANHO_PAGINA_API,
                "query": {"match_all": {}},
                "sort": [{"@timestamp": "asc"}] 
            }
            if search_after_coleta_inicial_c4:
                query_api_c4["search_after"] = search_after_coleta_inicial_c4
            
            response_data_c4 = consultar_api_datajud_mongo(TRIBUNAL_ALVO_MONGO_COLETA, query_api_c4)

            if response_data_c4 is None:
                print(f"  Falha ao obter dados da API. Encerrando coleta (C4).")
                break

            hits_api_c4 = response_data_c4.get('hits', {}).get('hits', [])

            if primeira_pagina_coleta_c4:
                total_estimado_api_c4 = response_data_c4.get('hits', {}).get('total', {}).get('value', 0)
                print(f"  Total de processos estimados (API) para coleta inicial: {total_estimado_api_c4}")
                primeira_pagina_coleta_c4 = False
                if total_estimado_api_c4 == 0: break
            
            if not hits_api_c4:
                print(f"  Nenhum processo retornado nesta página. Fim da coleta (C4).")
                break

            # Processar os hits (lógica de transformação e append aos batches como antes)
            for hit_api in hits_api_c4:
                source_data_api = hit_api.get("_source", {})
                id_cnj_original_api = hit_api.get("_id")
                if not id_cnj_original_api: continue
                id_processo_mongo = id_cnj_original_api
                
                processo_doc = {"_id": id_processo_mongo}
                processo_doc.update(source_data_api)
                movs_do_processo_api = processo_doc.pop('movimentos', [])
                processo_doc = converter_datas_no_documento(processo_doc)
                processo_doc["_timestampColetaLocal"] = datetime.now(timezone.utc)
                processos_batch_mongo_c4.append(processo_doc)

                if movs_do_processo_api and isinstance(movs_do_processo_api, list):
                    for mov_idx, mov_api_data in enumerate(movs_do_processo_api):
                        if isinstance(mov_api_data, dict):
                            mov_doc = mov_api_data.copy()
                            mov_doc["processo_id"] = id_processo_mongo
                            mov_hash_parts = [
                                str(id_processo_mongo), str(mov_api_data.get("codigo")),
                                str(mov_api_data.get("dataHora")), 
                                json.dumps(mov_api_data.get("complementosTabelados"), sort_keys=True),
                                str(mov_idx) 
                            ]
                            mov_doc["_id"] = hashlib.sha1("".join(filter(None, mov_hash_parts)).encode('utf-8')).hexdigest()
                            mov_doc = converter_datas_no_documento(mov_doc)
                            mov_doc["_timestampColetaLocal"] = datetime.now(timezone.utc)
                            movimentacoes_batch_mongo_c4.append(mov_doc)
            
            total_api_lidos_nesta_sessao_c4 += len(hits_api_c4)
            # A contagem global agora é implícita pelo que está no banco
            print(f"    Lidos {len(hits_api_c4)} da API. Total sessão (C4): {total_api_lidos_nesta_sessao_c4}.")

            if len(processos_batch_mongo_c4) >= BATCH_SIZE_MONGO_INSERT_C4:
                print(f"    Inserindo/Atualizando {len(processos_batch_mongo_c4)} processos no MongoDB...")
                bulk_ops_proc = [pymongo.ReplaceOne({"_id": doc["_id"]}, doc, upsert=True) for doc in processos_batch_mongo_c4]
                if bulk_ops_proc: 
                    try: colecao_processos_mongo.bulk_write(bulk_ops_proc, ordered=False)
                    except pymongo.errors.BulkWriteError as bwe_proc: print(f"      Erro de BulkWrite (processos): {bwe_proc.details}")
                processos_batch_mongo_c4 = []
                
                print(f"    Inserindo/Atualizando {len(movimentacoes_batch_mongo_c4)} movimentações...")
                bulk_ops_mov = [pymongo.ReplaceOne({"_id": mov["_id"]}, mov, upsert=True) for mov in movimentacoes_batch_mongo_c4]
                if bulk_ops_mov:
                    try: colecao_movimentacoes_mongo.bulk_write(bulk_ops_mov, ordered=False)
                    except pymongo.errors.BulkWriteError as bwe_mov: print(f"      Erro de BulkWrite (movs): {bwe_mov.details}")
                movimentacoes_batch_mongo_c4 = []
                print(f"    Lotes inseridos/atualizados no MongoDB.")

            search_after_coleta_inicial_c4 = hits_api_c4[-1]['sort']
            salvar_estado_coleta_inicial_mongo(TRIBUNAL_ALVO_MONGO_COLETA, search_after_coleta_inicial_c4)
            time.sleep(0.1)

    except Exception as e_fatal_mongo_coleta:
        print(f"\nErro fatal durante a coleta inicial (C4): {type(e_fatal_mongo_coleta).__name__} - {e_fatal_mongo_coleta}")
        import traceback
        traceback.print_exc()
    except KeyboardInterrupt:
        print(f"\nColeta inicial (C4) interrompida.")
    finally:
        # Inserir batches restantes
        if processos_batch_mongo_c4:
            print(f"    Inserindo/Atualizando {len(processos_batch_mongo_c4)} processos restantes (C4)...")
            # ... (lógica de bulk_write como acima)
            bulk_ops_proc_final = [pymongo.ReplaceOne({"_id": doc["_id"]}, doc, upsert=True) for doc in processos_batch_mongo_c4]
            if bulk_ops_proc_final: 
                try: colecao_processos_mongo.bulk_write(bulk_ops_proc_final, ordered=False)
                except pymongo.errors.BulkWriteError as bwe_p_f: print(f"      Erro BW (final proc): {bwe_p_f.details}")
        if movimentacoes_batch_mongo_c4:
            print(f"    Inserindo/Atualizando {len(movimentacoes_batch_mongo_c4)} movimentações restantes (C4)...")
            # ... (lógica de bulk_write como acima)
            bulk_ops_mov_final = [pymongo.ReplaceOne({"_id": mov["_id"]}, mov, upsert=True) for mov in movimentacoes_batch_mongo_c4]
            if bulk_ops_mov_final:
                try: colecao_movimentacoes_mongo.bulk_write(bulk_ops_mov_final, ordered=False)
                except pymongo.errors.BulkWriteError as bwe_m_f: print(f"      Erro BW (final mov): {bwe_m_f.details}")
        
        print(f"\nColeta Inicial MongoDB (C4 - sessão) finalizada. {total_api_lidos_nesta_sessao_c4} processos lidos da API.")
        
        final_search_after_c4 = locals().get('search_after_coleta_inicial_c4')
        if final_search_after_c4 is not None:
            salvar_estado_coleta_inicial_mongo(TRIBUNAL_ALVO_MONGO_COLETA, final_search_after_c4)

!!! ATENÇÃO: MongoDB não está conectado. Pulando Célula 4. !!!


In [None]:
# Célula 5: Atualização Incremental de Processos e Movimentações (usando @timestamp da API e lendo do MongoDB)

# Funções utilitárias e variáveis globais (mongo_client, colecoes, etc.) devem estar definidas.

if 'mongo_client' not in locals() or mongo_client is None:
    print("!!! ATENÇÃO: MongoDB não está conectado. Pulando Célula 5. !!!")
else:
    print(f"\n--- Iniciando Atualização Incremental (API @timestamp) para: {TRIBUNAL_ALVO_MONGO_COLETA.upper()} (Célula 5) ---")
    
    ultimo_ts_api_processado_mongo_obj_c5 = None
    ultimo_ts_api_processado_mongo_str_c5 = None # String ISO para a query da API

    # 1. Buscar o @timestamp mais recente do MongoDB da coleção de processos
    try:
        # Certifique-se que o campo é "@timestamp" e que está indexado (Célula 3)
        documento_mais_recente_mongo = colecao_processos_mongo.find_one(
            filter={}, # Sem filtro específico, apenas ordenação
            sort=[("@timestamp", pymongo.DESCENDING)] 
        )
        
        if documento_mais_recente_mongo and "@timestamp" in documento_mais_recente_mongo and \
           isinstance(documento_mais_recente_mongo["@timestamp"], datetime):
            ultimo_ts_api_processado_mongo_obj_c5 = documento_mais_recente_mongo["@timestamp"]
            # Formatar para string ISO com fuso UTC e milissegundos para a query da API
            ultimo_ts_api_processado_mongo_str_c5 = ultimo_ts_api_processado_mongo_obj_c5.astimezone(timezone.utc).isoformat(timespec='milliseconds')
            print(f"  Último @timestamp da API encontrado no MongoDB: {ultimo_ts_api_processado_mongo_str_c5}")
        else:
            print("  Nenhum @timestamp válido encontrado no MongoDB para este tribunal. "
                  "Para atualização incremental, é esperado que a coleta inicial (Célula 4) já tenha rodado.")
            # Se for a primeira vez absoluta, talvez você queira um timestamp muito antigo.
            # Mas para uma atualização, é melhor parar se não houver base.
            # Ou usar um timestamp de "início dos tempos" se a coleção estiver vazia.
            if colecao_processos_mongo.count_documents({}) == 0:
                print("  Coleção de processos está vazia. Usando timestamp de início dos tempos para buscar tudo.")
                ultimo_ts_api_processado_mongo_obj_c5 = datetime(1970, 1, 1, tzinfo=timezone.utc)
                ultimo_ts_api_processado_mongo_str_c5 = ultimo_ts_api_processado_mongo_obj_c5.isoformat(timespec='milliseconds')
            else:
                 print("  A coleção de processos não está vazia, mas não foi possível determinar o último @timestamp.")
                 print("  Verifique se os documentos têm o campo '@timestamp' como objeto datetime.")
                 # Não prosseguir se não puder determinar o ponto de partida de forma segura
                 raise SystemExit("Não foi possível determinar o timestamp de partida para atualização.")


    except Exception as e_mongo_ts_c5:
        print(f"  ERRO ao buscar último @timestamp do MongoDB: {e_mongo_ts_c5}")
        raise SystemExit("Falha ao determinar o timestamp de partida para atualização.")


    print(f"  Buscando processos/atualizações na API com @timestamp > {ultimo_ts_api_processado_mongo_str_c5}")

    search_after_atualizacao_c5 = None 
    processos_api_lidos_nesta_sessao_c5 = 0
    
    # max_timestamp_api_visto_nesta_sessao_obj_c5 rastreia o mais recente DESTA atualização.
    # Inicia com o último conhecido do banco para garantir que só pegamos mais novos.
    max_timestamp_api_visto_nesta_sessao_obj_c5 = ultimo_ts_api_processado_mongo_obj_c5

    processos_batch_mongo_upsert_c5 = []
    movimentacoes_para_processar_c5 = {} # {processo_id: [lista de novas movs dict]}
    
    # BATCH_SIZE_MONGO_INSERT_C5 (pode ser menor que o da coleta inicial)
    BATCH_SIZE_MONGO_INSERT_C5 = 1000 # Ajuste conforme necessário

    try:
        while True:
            query_api_atualizacao = {
                "size": TAMANHO_PAGINA_API,
                "query": {
                    "range": {
                        "@timestamp": { 
                            "gt": ultimo_ts_api_processado_mongo_str_c5 
                        }
                    }
                },
                "sort": [{"@timestamp": "asc"}]
            }
            if search_after_atualizacao_c5:
                query_api_atualizacao["search_after"] = search_after_atualizacao_c5
            
            response_data_att = consultar_api_datajud_mongo(TRIBUNAL_ALVO_MONGO_COLETA, query_api_atualizacao)

            if response_data_att is None: break
            hits_api_att = response_data_att.get('hits', {}).get('hits', [])
            if not hits_api_att:
                print(f"  Nenhum novo processo ou atualização encontrado na API desde {ultimo_ts_api_processado_mongo_str_c5}.")
                break

            # Processar os hits (lógica de transformação e append aos batches como na Célula 4)
            for hit_api in hits_api_att:
                source_data_api = hit_api.get("_source", {})
                id_cnj_api = hit_api.get("_id") 
                if not id_cnj_api: continue
                id_processo_mongo = id_cnj_api 
                
                api_timestamp_str_doc = source_data_api.get("@timestamp")
                if api_timestamp_str_doc:
                    current_api_dt_doc = string_para_datetime_utc(api_timestamp_str_doc)
                    if current_api_dt_doc and \
                       (max_timestamp_api_visto_nesta_sessao_obj_c5 is None or \
                        current_api_dt_doc > max_timestamp_api_visto_nesta_sessao_obj_c5):
                        max_timestamp_api_visto_nesta_sessao_obj_c5 = current_api_dt_doc

                processo_doc_upsert = {"_id": id_processo_mongo}
                processo_doc_upsert.update(source_data_api)
                movs_api_para_processo = processo_doc_upsert.pop('movimentos', [])
                processo_doc_upsert = converter_datas_no_documento(processo_doc_upsert)
                processo_doc_upsert["_timestampColetaLocal"] = datetime.now(timezone.utc)
                processos_batch_mongo_upsert_c5.append(pymongo.ReplaceOne(
                    {"_id": id_processo_mongo}, processo_doc_upsert, upsert=True
                ))

                novas_movs_formatadas = []
                if movs_api_para_processo and isinstance(movs_api_para_processo, list):
                    for mov_idx, mov_api_data in enumerate(movs_api_para_processo):
                        if isinstance(mov_api_data, dict):
                            mov_doc = mov_api_data.copy()
                            mov_doc["processo_id"] = id_processo_mongo
                            mov_hash_parts = [
                                str(id_processo_mongo), str(mov_api_data.get("codigo")),
                                str(mov_api_data.get("dataHora")), 
                                json.dumps(mov_api_data.get("complementosTabelados"), sort_keys=True),
                                str(mov_idx) 
                            ]
                            mov_doc["_id"] = hashlib.sha1("".join(filter(None, mov_hash_parts)).encode('utf-8')).hexdigest()
                            mov_doc = converter_datas_no_documento(mov_doc)
                            mov_doc["_timestampColetaLocal"] = datetime.now(timezone.utc)
                            novas_movs_formatadas.append(mov_doc)
                movimentacoes_para_processar_c5[id_processo_mongo] = novas_movs_formatadas
            
            processos_api_lidos_nesta_sessao_c5 += len(hits_api_att)
            print(f"    Lidos {len(hits_api_att)} processos/atualizações da API. Total sessão: {processos_api_lidos_nesta_sessao_c5}")

            if len(processos_batch_mongo_upsert_c5) >= BATCH_SIZE_MONGO_INSERT_C5:
                if processos_batch_mongo_upsert_c5:
                    print(f"    Aplicando {len(processos_batch_mongo_upsert_c5)} upserts de processos...")
                    try: colecao_processos_mongo.bulk_write(processos_batch_mongo_upsert_c5, ordered=False)
                    except pymongo.errors.BulkWriteError as bwe_p: print(f"      Erro BW (proc): {bwe_p.details}")
                    processos_batch_mongo_upsert_c5 = []
                
                if movimentacoes_para_processar_c5:
                    print(f"    Atualizando movimentações para {len(movimentacoes_para_processar_c5)} processos...")
                    ops_mov_batch_c5 = []
                    for proc_id, movs_novas in movimentacoes_para_processar_c5.items():
                        ops_mov_batch_c5.append(pymongo.DeleteMany({"processo_id": proc_id}))
                        for mov_n in movs_novas:
                            ops_mov_batch_c5.append(pymongo.ReplaceOne({"_id": mov_n["_id"]}, mov_n, upsert=True))
                    if ops_mov_batch_c5:
                        try: colecao_movimentacoes_mongo.bulk_write(ops_mov_batch_c5, ordered=False)
                        except pymongo.errors.BulkWriteError as bwe_m: print(f"      Erro BW (mov): {bwe_m.details}")
                    movimentacoes_para_processar_c5 = {}
                print(f"    Lotes de atualização MongoDB processados.")

            search_after_atualizacao_c5 = hits_api_att[-1]['sort']
            # O estado da coleta inicial (search_after) não é salvo aqui.
            # Apenas o progresso desta atualização seria salvo se fosse um processo muito longo.
            # Como estamos buscando "novos desde o último X", o próximo run da Célula 5
            # vai buscar novamente o último @timestamp do Mongo.
            time.sleep(0.1)

    except Exception as e_fatal_att_c5:
        print(f"Erro fatal na atualização incremental (C5): {type(e_fatal_att_c5).__name__} - {e_fatal_att_c5}")
        import traceback
        traceback.print_exc()
    finally:
        # Processar batches restantes
        if processos_batch_mongo_upsert_c5:
            print(f"    Aplicando {len(processos_batch_mongo_upsert_c5)} upserts finais de processos...")
            # ... (lógica de bulk_write como acima)
            try: colecao_processos_mongo.bulk_write(processos_batch_mongo_upsert_c5, ordered=False)
            except pymongo.errors.BulkWriteError as bwe_p_f: print(f"      Erro BW (final proc): {bwe_p_f.details}")

        if movimentacoes_para_processar_c5:
            print(f"    Atualizando movimentações finais para {len(movimentacoes_para_processar_c5)} processos...")
            # ... (lógica de bulk_write como acima)
            ops_mov_batch_f_c5 = []
            for proc_id_f, movs_f_novas in movimentacoes_para_processar_c5.items():
                ops_mov_batch_f_c5.append(pymongo.DeleteMany({"processo_id": proc_id_f}))
                for mov_f_n in movs_f_novas:
                    ops_mov_batch_f_c5.append(pymongo.ReplaceOne({"_id": mov_f_n["_id"]}, mov_f_n, upsert=True))
            if ops_mov_batch_f_c5:
                try: colecao_movimentacoes_mongo.bulk_write(ops_mov_batch_f_c5, ordered=False)
                except pymongo.errors.BulkWriteError as bwe_m_f: print(f"      Erro BW (final mov): {bwe_m_f.details}")
            
        print(f"\nAtualização Incremental (C5 - sessão) finalizada. {processos_api_lidos_nesta_sessao_c5} processos/atualizações lidos da API.")
        
        # O estado da coleta inicial (search_after_coleta_inicial) não é modificado por esta célula.
        # O "estado" para a próxima execução da Célula 5 será lido diretamente do MongoDB.
        # Se você quisesse salvar o último @timestamp processado por esta célula em um arquivo,
        # poderia fazê-lo aqui, mas a ideia é depender do MongoDB.
        if max_timestamp_api_visto_nesta_sessao_obj_c5:
            print(f"  O @timestamp mais recente da API processado nesta sessão foi: {max_timestamp_api_visto_nesta_sessao_obj_c5.isoformat()}")

In [None]:
# Célula 6: Rotina para Atualizar Movimentações de Processos EXISTENTES no MongoDB

if mongo_client is None:
    print("MongoDB não está conectado. Pulando Célula 6.")
else:
    print(f"\n--- Iniciando Atualização de MOVIMENTAÇÕES em Processos Existentes no MongoDB para: {TRIBUNAL_ALVO_MONGO_COLETA.upper()} ---")

    # Data da última vez que esta rotina de atualização de movimentações rodou com sucesso
    # Isso deve ser armazenado de forma persistente (ex: em outra coleção de metadados ou arquivo)
    # Por agora, vamos simular ou buscar de um arquivo de estado específico para esta tarefa.
    path_estado_att_mov_mongo = os.path.join(ESTADO_BASE_PATH, f"att_mov_mongo_{TRIBUNAL_ALVO_MONGO_COLETA}_estado.json")
    
    data_ultima_varredura_mov_str = None
    if os.path.exists(path_estado_att_mov_mongo):
        with open(path_estado_att_mov_mongo, 'r') as f_att_mov:
            data_ultima_varredura_mov_str = json.load(f_att_mov).get("data_ultima_varredura_mov")
            if data_ultima_varredura_mov_str:
                 print(f"  Última varredura de movimentações em: {data_ultima_varredura_mov_str}")

    if not data_ultima_varredura_mov_str:
        print("  Nenhuma data de última varredura de movimentações encontrada. Verificando processos com base em um período recente ou todos.")
        # Para a primeira execução, você pode querer um critério diferente ou processar um subconjunto.
        # Aqui, vamos pegar processos atualizados na API desde o início dos tempos se não houver data.
        data_ultima_varredura_mov_str = datetime(1900,1,1,tzinfo=timezone.utc).isoformat()


    # 1. Buscar IDs de processos na API que foram atualizados desde a última varredura
    #    Usaremos o @timestamp do processo para isso.
    print(f"  Buscando processos na API atualizados desde: {data_ultima_varredura_mov_str}")
    ids_processos_para_rebuscar_api = []
    search_after_att_mov = None
    
    while True:
        query_api_att_mov = {
            "size": TAMANHO_PAGINA_API, # Pode ser maior aqui, já que só pegamos _id
            "_source": ["_id"], # Só precisamos do ID do processo da API nesta fase
            "query": {
                "range": {
                    "@timestamp": { # @timestamp do documento do processo na API
                        "gt": data_ultima_varredura_mov_str 
                    }
                }
            },
            "sort": [{"@timestamp": "asc"}]
        }
        if search_after_att_mov:
            query_api_att_mov["search_after"] = search_after_att_mov
        
        response_att_mov = consultar_api_datajud_mongo(TRIBUNAL_ALVO_MONGO_COLETA, query_api_att_mov)
        if response_att_mov is None or not response_att_mov.get('hits', {}).get('hits'):
            break
        
        hits_att_mov = response_att_mov['hits']['hits']
        for hit in hits_att_mov:
            ids_processos_para_rebuscar_api.append(hit.get("_id"))
        
        if len(hits_att_mov) < TAMANHO_PAGINA_API: break
        search_after_att_mov = hits_att_mov[-1]['sort']
        print(f"    Coletados {len(ids_processos_para_rebuscar_api)} IDs de processos atualizados na API...")

    print(f"  Total de {len(ids_processos_para_rebuscar_api)} processos identificados na API para re-verificação de movimentações.")

    # 2. Para cada ID de processo identificado, re-buscar o documento completo da API
    #    e atualizar no MongoDB (processo e suas movimentações)
    BATCH_SIZE_REBUSCA_MONGO = 50 # Quantos processos completos re-buscar da API por vez
    processos_atualizados_mov_sessao = 0
    
    for i in range(0, len(ids_processos_para_rebuscar_api), BATCH_SIZE_REBUSCA_MONGO):
        lote_ids_rebusca = ids_processos_para_rebuscar_api[i : i + BATCH_SIZE_REBUSCA_MONGO]
        print(f"\n  Rebuscando lote de {len(lote_ids_rebusca)} processos da API...")
        
        query_lote_rebusca = {
            "query": {"ids": {"values": lote_ids_rebusca}},
            "size": len(lote_ids_rebusca)
        }
        response_lote_completo = consultar_api_datajud_mongo(TRIBUNAL_ALVO_MONGO_COLETA, query_lote_rebusca)

        if response_lote_completo and response_lote_completo.get('hits', {}).get('hits'):
            hits_lote_completo = response_lote_completo['hits']['hits']
            
            processos_para_upsert_mongo = []
            movimentacoes_para_deletar_e_inserir_mongo = [] # Lista de (processo_id, lista_de_novas_movs)
            ids_processos_neste_lote_api = []

            for hit_completo in hits_lote_completo:
                source_completo = hit_completo.get("_source", {})
                id_cnj_completo = hit_completo.get("_id")
                if not id_cnj_completo: continue

                ids_processos_neste_lote_api.append(id_cnj_completo)
                id_processo_mongo_att = id_cnj_completo
                
                processo_doc_att = {"_id": id_processo_mongo_att}
                processo_doc_att.update(source_completo)
                movs_api_att = processo_doc_att.pop('movimentos', [])
                processo_doc_att = converter_datas_no_documento(processo_doc_att)
                processo_doc_att["timestampColetaLocal"] = datetime.now(timezone.utc) # Atualiza timestamp
                processos_para_upsert_mongo.append(pymongo.ReplaceOne({"_id": id_processo_mongo_att}, processo_doc_att, upsert=True))

                novas_movs_para_este_processo = []
                if movs_api_att:
                    for mov_idx_att, mov_api_att_data in enumerate(movs_api_att):
                        if isinstance(mov_api_att_data, dict):
                            mov_doc_att = mov_api_att_data.copy()
                            mov_doc_att["processo_id"] = id_processo_mongo_att
                            mov_doc_att = converter_datas_no_documento(mov_doc_att)
                            mov_doc_att["timestampColetaLocal"] = datetime.now(timezone.utc)
                            novas_movs_para_este_processo.append(mov_doc_att)
                if novas_movs_para_este_processo: # Mesmo se vazio, precisamos deletar as antigas
                     movimentacoes_para_deletar_e_inserir_mongo.append({"processo_id": id_processo_mongo_att, "novas_movs": novas_movs_para_este_processo})


            # Executar operações no MongoDB
            if processos_para_upsert_mongo:
                print(f"    Atualizando/Inserindo {len(processos_para_upsert_mongo)} processos no MongoDB...")
                colecao_processos_mongo.bulk_write(processos_para_upsert_mongo, ordered=False)
            
            if movimentacoes_para_deletar_e_inserir_mongo:
                print(f"    Atualizando movimentações para {len(movimentacoes_para_deletar_e_inserir_mongo)} processos...")
                for item_mov_att in movimentacoes_para_deletar_e_inserir_mongo:
                    pid = item_mov_att["processo_id"]
                    novas_movs = item_mov_att["novas_movs"]
                    # Deletar todas as movimentações antigas deste processo
                    colecao_movimentacoes_mongo.delete_many({"processo_id": pid})
                    # Inserir as novas movimentações (se houver)
                    if novas_movs:
                        colecao_movimentacoes_mongo.insert_many(novas_movs, ordered=False)
            
            processos_atualizados_mov_sessao += len(ids_processos_neste_lote_api)
            print(f"    Lote de {len(ids_processos_neste_lote_api)} processos teve movimentações atualizadas. Total sessão: {processos_atualizados_mov_sessao}")
        time.sleep(0.5) # Pausa entre lotes de rebusca

    # Salvar a data/hora atual como a última varredura
    with open(path_estado_att_mov_mongo, 'w') as f_att_mov:
        json.dump({"data_ultima_varredura_mov": datetime.now(timezone.utc).isoformat()}, f_att_mov)
    print(f"  Data da última varredura de movimentações atualizada.")
    print(f"\nAtualização de MOVIMENTAÇÕES (sessão) finalizada. {processos_atualizados_mov_sessao} processos verificados/atualizados.")

# --- Célula Final: Fechar Conexão MongoDB (se não for mais usar) ---
# if mongo_client:
#     mongo_client.close()
#     print("\nConexão com MongoDB fechada.")

In [None]:
# Célula Nova: Amostragem Estratificada de Processos da API vs MongoDB (COM AJUSTE PARA DATAS MIN/MAX)

import pandas as pd
from dateutil.relativedelta import relativedelta 

# Funções e variáveis globais (consultar_api_datajud_mongo, TRIBUNAL_ALVO_MONGO_COLETA, 
# HEADERS_API, mongo_client, colecao_processos_mongo, string_para_datetime_utc) devem estar definidas.

print(f"\n--- Iniciando Amostragem Estratificada para {TRIBUNAL_ALVO_MONGO_COLETA.upper()} (com ajuste de data min) ---")

if 'mongo_client' not in locals() or mongo_client is None:
    print("!!! ATENÇÃO: MongoDB não está conectado. Não é possível verificar o banco de dados. !!!")
    db_conectado_amostragem = False
else:
    db_conectado_amostragem = True

NUMERO_TOTAL_AMOSTRAS = 27000
# NUMERO_TOTAL_AMOSTRAS = 200 # Para teste rápido
NUMERO_DE_INTERVALOS_TEMPO = 100 
AMOSTRAS_POR_INTERVALO = NUMERO_TOTAL_AMOSTRAS // NUMERO_DE_INTERVALOS_TEMPO
if AMOSTRAS_POR_INTERVALO == 0: AMOSTRAS_POR_INTERVALO = 1 

processos_amostrados_api = []
processos_nao_encontrados_mongo_amostragem = []
total_consultados_api_amostragem = 0

try:
    # 1. Obter data de ajuizamento mínima (a partir do 100º registro) e máxima da API
    print("  Buscando data de ajuizamento mínima (a partir do 100º) e máxima da API...")
    
    # Para data mínima, pegar o 100º processo mais antigo
    # Fazemos uma query pedindo 1 processo, pulando os primeiros 99 (from: 99)
    query_min_data_offset = {
        "size": 1, 
        "from": 99, # Pula os primeiros 99 para pegar o 100º
        "sort": [{"dataAjuizamento": "asc"}], 
        "_source": ["dataAjuizamento", "numeroProcesso", "_id"] # Pegar ID para log se necessário
    }
    
    query_max_data = {
        "size": 1, 
        "sort": [{"dataAjuizamento": "desc"}], 
        "_source": ["dataAjuizamento"]
    }

    resp_min_data_offset = consultar_api_datajud_mongo(TRIBUNAL_ALVO_MONGO_COLETA, query_min_data_offset)
    resp_max_data = consultar_api_datajud_mongo(TRIBUNAL_ALVO_MONGO_COLETA, query_max_data)

    data_ajuizamento_min_str = None
    data_ajuizamento_max_str = None

    if resp_min_data_offset and resp_min_data_offset.get('hits', {}).get('hits'):
        hit_min = resp_min_data_offset['hits']['hits'][0]
        data_ajuizamento_min_str = hit_min['_source'].get('dataAjuizamento')
        print(f"    Data de ajuizamento do 100º processo mais antigo (usada como MÍNIMA): {data_ajuizamento_min_str}")
        print(f"      (ID do processo usado para data mínima: {hit_min.get('_id')})")
    else:
        print("    AVISO: Não foi possível obter o 100º processo mais antigo. Tentando o 1º.")
        # Fallback para o primeiro se não conseguir o 100º (ex: menos de 100 processos no total)
        query_min_data_primeiro = {"size": 1, "sort": [{"dataAjuizamento": "asc"}], "_source": ["dataAjuizamento"]}
        resp_min_data_primeiro = consultar_api_datajud_mongo(TRIBUNAL_ALVO_MONGO_COLETA, query_min_data_primeiro)
        if resp_min_data_primeiro and resp_min_data_primeiro.get('hits', {}).get('hits'):
            data_ajuizamento_min_str = resp_min_data_primeiro['hits']['hits'][0]['_source'].get('dataAjuizamento')
            print(f"    Data de ajuizamento do 1º processo mais antigo (usada como MÍNIMA): {data_ajuizamento_min_str}")


    if resp_max_data and resp_max_data.get('hits', {}).get('hits'):
        data_ajuizamento_max_str = resp_max_data['hits']['hits'][0]['_source'].get('dataAjuizamento')
        print(f"    Data de ajuizamento MÁXIMA: {data_ajuizamento_max_str}")


    if not data_ajuizamento_min_str or not data_ajuizamento_max_str:
        print("  ERRO: Não foi possível obter o intervalo de datas de ajuizamento da API.")
        raise SystemExit("Falha ao obter datas min/max.")

    data_min = string_para_datetime_utc(data_ajuizamento_min_str)
    data_max = string_para_datetime_utc(data_ajuizamento_max_str)
    
    if not data_min or not data_max or data_min > data_max: # Adicionada verificação de data_min > data_max
        print("  ERRO: Falha ao converter as strings de data min/max ou data mínima é maior que a máxima.")
        print(f"    Data min string: {data_ajuizamento_min_str} -> Convertida: {data_min}")
        print(f"    Data max string: {data_ajuizamento_max_str} -> Convertida: {data_max}")
        raise SystemExit("Falha na conversão ou lógica de datas.")

    print(f"  Intervalo de data de ajuizamento (ajustado) na API: {data_min.date()} a {data_max.date()}")

    # 2. Criar os intervalos de tempo (lógica mantida)
    delta_total_dias = (data_max - data_min).days
    if delta_total_dias <= 0 : delta_total_dias = 1
    
    tamanho_intervalo_dias = max(1, delta_total_dias // NUMERO_DE_INTERVALOS_TEMPO)
    
    print(f"  Delta total de dias (ajustado): {delta_total_dias}. Cada um dos {NUMERO_DE_INTERVALOS_TEMPO} intervalos terá aprox. {tamanho_intervalo_dias} dias.")
    print(f"  Tentando coletar {AMOSTRAS_POR_INTERVALO} amostras por intervalo.")

    data_inicio_intervalo = data_min
    
    for i in range(NUMERO_DE_INTERVALOS_TEMPO):
        if total_consultados_api_amostragem >= NUMERO_TOTAL_AMOSTRAS:
            print(f"  Limite de {NUMERO_TOTAL_AMOSTRAS} amostras atingido. Parando.")
            break

        data_fim_intervalo = data_inicio_intervalo + pd.Timedelta(days=tamanho_intervalo_dias)
        if data_fim_intervalo > data_max + pd.Timedelta(days=1):
            data_fim_intervalo = data_max + pd.Timedelta(days=1)
        
        if data_inicio_intervalo >= data_fim_intervalo:
            if data_inicio_intervalo <= data_max:
                 data_fim_intervalo = data_max + pd.Timedelta(days=1)
            else:
                break

        print(f"\n  Amostrando Intervalo {i+1}/{NUMERO_DE_INTERVALOS_TEMPO}: "
              f"{data_inicio_intervalo.strftime('%Y-%m-%dT%H:%M:%S.%fZ')} a "
              f"{data_fim_intervalo.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}")

        query_intervalo = {
            "size": AMOSTRAS_POR_INTERVALO,
            "query": {
                "range": {
                    "dataAjuizamento": {
                        "gte": data_inicio_intervalo.isoformat().replace('+00:00', 'Z'),
                        "lt": data_fim_intervalo.isoformat().replace('+00:00', 'Z')
                    }
                }
            },
            "sort": [{"@timestamp": "asc"}], 
            "_source": ["numeroProcesso", "@timestamp", "_id"] # Adicionado _id para consistência
        }
        
        response_intervalo = consultar_api_datajud_mongo(TRIBUNAL_ALVO_MONGO_COLETA, query_intervalo)
        
        if response_intervalo and response_intervalo.get('hits', {}).get('hits'):
            hits_intervalo = response_intervalo['hits']['hits']
            print(f"    API retornou {len(hits_intervalo)} processos para este intervalo.")
            
            for hit in hits_intervalo:
                if total_consultados_api_amostragem >= NUMERO_TOTAL_AMOSTRAS: break
                
                source = hit.get("_source", {})
                id_api_amostra = hit.get("_id") 
                num_proc_api_amostra = source.get("numeroProcesso")
                ts_api_amostra = source.get("@timestamp")
                
                id_mongo_check_amostra = id_api_amostra

                if id_mongo_check_amostra and ts_api_amostra:
                    processos_amostrados_api.append({
                        "id_mongo": id_mongo_check_amostra,
                        "numeroProcesso": num_proc_api_amostra,
                        "@timestamp_api": ts_api_amostra
                    })
                    total_consultados_api_amostragem += 1
        else:
            print(f"    Nenhum processo encontrado pela API para este intervalo.")

        data_inicio_intervalo = data_fim_intervalo 
        if data_inicio_intervalo > data_max:
            break 
        
        time.sleep(0.2)

    # 3. Verificar as amostras no MongoDB (lógica mantida)
    if processos_amostrados_api and db_conectado_amostragem:
        print(f"\n--- Verificando {len(processos_amostrados_api)} Processos Amostrados no MongoDB ---")
        encontrados_mongo_amostragem = 0
        
        for proc_amostra in processos_amostrados_api:
            doc_mongo = colecao_processos_mongo.find_one({"_id": proc_amostra["id_mongo"]})
            if doc_mongo:
                encontrados_mongo_amostragem += 1
            else:
                processos_nao_encontrados_mongo_amostragem.append(proc_amostra)
        
        print(f"\n  Resumo da Amostragem:")
        print(f"    - Total de amostras da API coletadas: {len(processos_amostrados_api)}")
        print(f"    - Amostras encontradas no MongoDB: {encontrados_mongo_amostragem}")
        print(f"    - Amostras NÃO encontradas no MongoDB: {len(processos_nao_encontrados_mongo_amostragem)}")

        if processos_nao_encontrados_mongo_amostragem:
            print("\n    Detalhe das amostras NÃO encontradas no MongoDB (primeiros 20):")
            for i_nao_enc, p_nao_enc in enumerate(processos_nao_encontrados_mongo_amostragem):
                if i_nao_enc >= 20:
                    print(f"      ... e mais {len(processos_nao_encontrados_mongo_amostragem) - 20} não listados aqui.")
                    break
                print(f"      - ID/Num: {p_nao_enc['id_mongo']} (@ts API: {p_nao_enc['@timestamp_api']})")
            
            if len(processos_nao_encontrados_mongo_amostragem) > 0:
                print("\n    CONCLUSÃO DA AMOSTRAGEM: Foram encontradas lacunas na sua base de dados local.")
            # (Resto da lógica de conclusão como antes)

        elif len(processos_amostrados_api) > 0 :
             print("\n    CONCLUSÃO DA AMOSTRAGEM: Todas as amostras coletadas da API foram encontradas no MongoDB.")
             print("    Isso é um bom indicativo, mas não uma garantia absoluta de 100% de integridade.")

    elif not db_conectado_amostragem:
        print("\n  Verificação no MongoDB não realizada (cliente não conectado).")
    elif not processos_amostrados_api:
        print("\n  Nenhuma amostra foi coletada da API para verificação.")

except SystemExit as se:
    print(f"Script interrompido: {se}")
except Exception as e_amostragem:
    print(f"ERRO GERAL durante a amostragem: {type(e_amostragem).__name__} - {e_amostragem}")
    import traceback
    traceback.print_exc()

In [None]:
# Célula Nova: Verificar Últimos Processos da API vs MongoDB

# As funções utilitárias (consultar_api_datajud_mongo) e variáveis globais 
# (TRIBUNAL_ALVO_MONGO_COLETA, HEADERS_API, mongo_client, colecao_processos_mongo)
# devem estar definidas e o mongo_client conectado.

print(f"\n--- Verificando os 20 Processos Mais Recentes da API para {TRIBUNAL_ALVO_MONGO_COLETA.upper()} vs MongoDB ---")

if 'mongo_client' not in locals() or mongo_client is None:
    print("!!! ATENÇÃO: MongoDB não está conectado. Não é possível verificar o banco de dados. !!!")
    # Você pode querer parar aqui ou apenas pular a verificação do MongoDB
    # raise SystemExit("MongoDB não conectado.")
    db_conectado = False
else:
    db_conectado = True

# 1. Consultar a API do CNJ pelos processos mais recentes
query_api_recentes = {
    "size": 20,
    "query": {"match_all": {}}, # Pegar todos, mas a ordenação trará os mais recentes
    "sort": [{"@timestamp": "desc"}], # Ordenar por @timestamp descendente para pegar os mais novos
    "_source": ["numeroProcesso", "@timestamp"] # Pedir apenas os campos que precisamos da API
                                            # Se você usa o hit['_id'] como ID no Mongo, peça ele também:
                                            # "_source": ["_id", "numeroProcesso", "@timestamp"] 
}

print(f"  Consultando API para obter os 20 processos mais recentes do {TRIBUNAL_ALVO_MONGO_COLETA.upper()}...")
response_data_recentes = consultar_api_datajud_mongo(TRIBUNAL_ALVO_MONGO_COLETA, query_api_recentes)

processos_recentes_api = []
if response_data_recentes and response_data_recentes.get('hits', {}).get('hits'):
    hits_recentes = response_data_recentes['hits']['hits']
    print(f"  API retornou {len(hits_recentes)} processos.")
    for hit in hits_recentes:
        source = hit.get("_source", {})
        id_api = hit.get("_id") # O ID completo da API, ex: TJPB_CLASSE_GRAU_ORGAO_NUMERO
        num_processo_api = source.get("numeroProcesso") # O número do processo CNJ formatado
        timestamp_api_str = source.get("@timestamp")
        
        # Decida qual ID você usa como chave primária (_id) no seu MongoDB
        # Se for o id_api (TJPB_CLASSE_...), use ele.
        # Se for o num_processo_api, use ele.
        # No seu código da Célula 4, você usou id_cnj_original_api (que é hit.get("_id")) como _id do Mongo.
        id_para_verificar_no_mongo = id_api 
        
        if id_para_verificar_no_mongo and timestamp_api_str:
            processos_recentes_api.append({
                "id_api_ou_numero": id_para_verificar_no_mongo,
                "numeroProcesso": num_processo_api, # Para referência
                "@timestamp_api": timestamp_api_str
            })
else:
    print("  Não foi possível obter os processos recentes da API ou nenhum foi retornado.")

# 2. Verificar se esses processos estão no MongoDB
if processos_recentes_api and db_conectado:
    print(f"\n  Verificando {len(processos_recentes_api)} processos recentes no MongoDB (Coleção: {COLECAO_PROCESSOS_MONGO_NOME}):")
    
    encontrados_no_mongo = 0
    nao_encontrados_no_mongo = []

    for proc_api in processos_recentes_api:
        id_check = proc_api["id_api_ou_numero"]
        # A query no MongoDB usa o campo que você definiu como _id na sua coleção de processos
        documento_mongo = colecao_processos_mongo.find_one({"_id": id_check}) 
        
        if documento_mongo:
            encontrados_no_mongo += 1
            print(f"    ENCONTRADO: Processo ID '{id_check}' (API @ts: {proc_api['@timestamp_api']}) existe no MongoDB.")
            # Você pode querer comparar o @timestamp da API com algum timestamp do seu documento Mongo
            # Por exemplo, se você armazena o @timestamp da API no seu documento Mongo:
            # ts_mongo_api = documento_mongo.get("@timestamp") # Supondo que você salvou o @timestamp original
            # if ts_mongo_api and ts_mongo_api != proc_api['@timestamp_api']:
            #     print(f"      AVISO: @timestamp diferente! API: {proc_api['@timestamp_api']}, MongoDB: {ts_mongo_api}")
        else:
            nao_encontrados_no_mongo.append(proc_api)
            print(f"    NÃO ENCONTRADO: Processo ID '{id_check}' (API @ts: {proc_api['@timestamp_api']}) NÃO existe no MongoDB.")

    print(f"\n  Resumo da Verificação:")
    print(f"    - Processos recentes da API analisados: {len(processos_recentes_api)}")
    print(f"    - Encontrados no MongoDB: {encontrados_no_mongo}")
    print(f"    - Não encontrados no MongoDB: {len(nao_encontrados_no_mongo)}")
    
    if nao_encontrados_no_mongo:
        print("\n    Detalhe dos processos NÃO encontrados no MongoDB:")
        for p_nao_enc in nao_encontrados_no_mongo:
            print(f"      - ID/Num: {p_nao_enc['id_api_ou_numero']}, API @timestamp: {p_nao_enc['@timestamp_api']}")
        print("\n    Se houver processos não encontrados, sua Célula 5 (atualização incremental)")
        print("    deveria pegá-los na próxima execução, assumindo que o 'ultimo_ts_api_processado_global_str_c5'")
        print("    no seu arquivo de estado é anterior ao @timestamp desses processos.")

elif not db_conectado:
    print("\n  Verificação no MongoDB não realizada pois o cliente não está conectado.")
else:
    print("\n  Nenhum processo recente obtido da API para verificar no MongoDB.")