# üß™ Test Mistral Document Annotation / OCR

Este notebook permite probar r√°pidamente el Document Annotation de Mistral sobre documentos PDF/DOCX.

**Caracter√≠sticas:**
- Soporte para documentos de m√°s de 8 p√°ginas (chunking autom√°tico)
- Consolidaci√≥n inteligente de chunks
- M√∫ltiples modelos de datos predefinidos
- Visualizaci√≥n de resultados

## 1. Setup e Imports

In [1]:
import os
import sys
import json
import base64
import tempfile
import shutil
import logging
from pathlib import Path
from typing import Type, Optional, Any, Dict, List
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import contextmanager

from pydantic import BaseModel, Field
from PyPDF2 import PdfReader, PdfWriter
from mistralai import Mistral
from mistralai.extra import response_format_from_pydantic_model
from dotenv import load_dotenv
from IPython.display import display, JSON, Markdown

# Agregar el directorio ra√≠z al path para importar modelos
ROOT_DIR = Path(os.getcwd()).parent
if str(ROOT_DIR) not in sys.path:
    sys.path.insert(0, str(ROOT_DIR))

# Cargar variables de entorno
load_dotenv(ROOT_DIR / ".env")

# Configurar logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

print(f"‚úÖ Root directory: {ROOT_DIR}")
print(f"‚úÖ MISTRAL_API_KEY configurada: {'S√≠' if os.getenv('MISTRAL_API_KEY') else 'No'}")

‚úÖ Root directory: d:\Users\iromero\OneDrive - Grupo Procaps\Previos\Documents\pharma_ai\ma_change_control_agent
‚úÖ MISTRAL_API_KEY configurada: S√≠


## 2. Importar Modelos de Datos Disponibles

In [2]:
# Importar modelos existentes del proyecto
try:
    from src.models.change_control import ChangeControlModel
    from src.models.analytical_method_models import MetodoAnaliticoDA, MetodoAnaliticoCompleto
    from src.models.side_by_side_model import SideBySideModel
    from src.models.structured_test_model import TestSolution, TestSolutions
    print("‚úÖ Modelos importados correctamente")
except ImportError as e:
    print(f"‚ö†Ô∏è Error importando modelos: {e}")
    print("Definiendo modelos b√°sicos de fallback...")

# Diccionario de modelos disponibles para selecci√≥n r√°pida
AVAILABLE_MODELS = {
    "change_control": ChangeControlModel,
    "analytical_method": MetodoAnaliticoDA,
    "analytical_method_complete": MetodoAnaliticoCompleto,
    "side_by_side": SideBySideModel,
    "test_solutions": TestSolutions,
}

print("\nüìã Modelos disponibles:")
for name, model in AVAILABLE_MODELS.items():
    print(f"   - {name}: {model.__name__}")

‚úÖ Modelos importados correctamente

üìã Modelos disponibles:
   - change_control: ChangeControlModel
   - analytical_method: MetodoAnaliticoDA
   - analytical_method_complete: MetodoAnaliticoCompleto
   - side_by_side: SideBySideModel
   - test_solutions: TestSolutions


## 3. Funciones de Utilidad para Procesamiento de PDFs

In [3]:
@contextmanager
def prepare_pdf_document(document_path: str):
    """Asegura que el documento est√© disponible como PDF, convirtiendo DOCX si es necesario."""
    if not document_path:
        raise ValueError("No se proporcion√≥ la ruta del documento a procesar.")

    resolved_path = Path(document_path)
    if not resolved_path.exists():
        raise FileNotFoundError(f"El documento {document_path} no existe.")

    suffix = resolved_path.suffix.lower()
    if suffix == ".pdf":
        yield str(resolved_path)
        return

    if suffix == ".docx":
        temp_dir = tempfile.mkdtemp(prefix="doc_annotation_")
        pdf_output_path = os.path.join(temp_dir, resolved_path.with_suffix(".pdf").name)
        try:
            from docx2pdf import convert as docx_to_pdf_convert
            logger.info(f"Convirtiendo DOCX a PDF: {document_path}")
            docx_to_pdf_convert(str(resolved_path), pdf_output_path)

            if not os.path.exists(pdf_output_path):
                raise RuntimeError(f"No se gener√≥ el archivo PDF convertido para {document_path}.")

            yield pdf_output_path
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)
        return

    raise ValueError(f"Formato de archivo no soportado: {suffix}. Solo PDF o DOCX.")


def get_pdf_page_count(pdf_path: str) -> int:
    """Obtiene el n√∫mero de p√°ginas de un PDF."""
    try:
        with open(pdf_path, "rb") as pdf_file:
            reader = PdfReader(pdf_file)
            return len(reader.pages)
    except Exception as e:
        logger.error(f"Error contando p√°ginas en {pdf_path}: {e}")
        return 0


def split_pdf_into_chunks(
    pdf_path: str, 
    max_pages_per_chunk: int = 8, 
    chunk_overlap_pages: int = 2
) -> list[str]:
    """Divide un PDF en chunks con overlap."""
    chunk_files: list[str] = []
    try:
        reader = PdfReader(pdf_path)
        total_pages = len(reader.pages)
        if total_pages == 0:
            return []

        overlap = max(chunk_overlap_pages, 0)
        chunk_size = max(max_pages_per_chunk, 1)
        step = max(chunk_size - overlap, 1)

        for start in range(0, total_pages, step):
            end = min(start + chunk_size, total_pages)
            chunk_writer = PdfWriter()
            for page_idx in range(start, end):
                chunk_writer.add_page(reader.pages[page_idx])

            with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as chunk_pdf:
                chunk_writer.write(chunk_pdf)
                chunk_files.append(chunk_pdf.name)

        logger.info(f"üìÑ PDF dividido en {len(chunk_files)} chunks (p√°ginas: {total_pages}, chunk_size: {chunk_size}, overlap: {overlap})")
        return chunk_files
    except Exception as e:
        logger.error(f"Error dividiendo PDF: {e}")
        for chunk_file in chunk_files:
            try:
                os.unlink(chunk_file)
            except OSError:
                pass
        return []


def encode_pdf(pdf_path: str) -> str:
    """Codifica un PDF a base64."""
    try:
        with open(pdf_path, "rb") as pdf_file:
            return base64.b64encode(pdf_file.read()).decode('utf-8')
    except Exception as e:
        logger.error(f"Error codificando PDF {pdf_path}: {e}")
        return None

print("‚úÖ Funciones de utilidad cargadas")

‚úÖ Funciones de utilidad cargadas


## 4. Funciones de Procesamiento con Mistral OCR

In [4]:
def process_chunk(
    pdf_path: str, 
    extraction_model: Type[BaseModel] = None,
    chunk_retry_backoff_seconds: int = 5, 
    chunk_retry_attempts: int = 3,
    include_image_base64: bool = False
):
    """Procesa un chunk de PDF con Mistral OCR."""
    import time
    
    base64_pdf = encode_pdf(pdf_path)
    if not base64_pdf:
        return None
    
    api_key = os.getenv("MISTRAL_API_KEY")
    if not api_key:
        raise EnvironmentError("Defina MISTRAL_API_KEY en el entorno o en el archivo .env")
    
    ocr_client = Mistral(api_key=api_key, timeout_ms=300000)

    request_params = {
        "model": "mistral-ocr-latest",
        "document": {
            "type": "document_url",
            "document_url": f"data:application/pdf;base64,{base64_pdf}"
        },
        "include_image_base64": include_image_base64,
    }

    if extraction_model:
        try:
            request_params["document_annotation_format"] = response_format_from_pydantic_model(extraction_model)
            logger.info(f"üéØ Usando modelo de extracci√≥n: {extraction_model.__name__}")
        except Exception as exc:
            logger.warning(f"No se pudo generar schema pydantic: {exc}")

    last_exception: Optional[Exception] = None
    total_attempts = max(chunk_retry_attempts, 1)

    for attempt in range(1, total_attempts + 1):
        try:
            return ocr_client.ocr.process(**request_params)
        except Exception as exc:
            last_exception = exc
            if attempt >= total_attempts:
                break
            wait_seconds = chunk_retry_backoff_seconds * attempt
            logger.warning(f"Reintentando chunk despu√©s de error: {exc}. Intento {attempt}/{chunk_retry_attempts} en {wait_seconds}s")
            time.sleep(wait_seconds)

    logger.error(f"Error procesando chunk {pdf_path}: {last_exception}")
    return None


def process_document(
    pdf_path: str,
    extraction_model: Type[BaseModel] = None,
    max_pages_per_chunk: int = 8,
    chunk_overlap_pages: int = 2,
    max_workers: int = 4,
    include_image_base64: bool = False
) -> list:
    """Procesa un PDF con chunking autom√°tico si es necesario."""
    total_pages = get_pdf_page_count(pdf_path)
    logger.info(f"üìÑ Procesando PDF con {total_pages} p√°ginas")
    
    if total_pages == 0:
        logger.error(f"No se pudieron leer p√°ginas del PDF")
        return []
    
    # Si el documento es peque√±o, procesar directamente
    if total_pages <= max_pages_per_chunk:
        logger.info(f"‚úÖ Documento peque√±o ({total_pages} p√°ginas), procesando directamente")
        result = process_chunk(pdf_path, extraction_model, include_image_base64=include_image_base64)
        return [result] if result else []
    
    # Dividir en chunks y procesar en paralelo
    logger.info(f"üì¶ Documento grande ({total_pages} p√°ginas), dividiendo en chunks...")
    chunk_files = split_pdf_into_chunks(
        pdf_path,
        max_pages_per_chunk=max_pages_per_chunk,
        chunk_overlap_pages=chunk_overlap_pages,
    )
    
    if not chunk_files:
        return []

    indexed_results: list[tuple[int, Any]] = []
    
    try:
        workers = max(1, min(max_workers, len(chunk_files)))
        logger.info(f"üöÄ Procesando {len(chunk_files)} chunks con {workers} workers...")
        
        with ThreadPoolExecutor(max_workers=workers) as executor:
            future_map = {
                executor.submit(
                    process_chunk,
                    chunk_file,
                    extraction_model,
                    5,
                    3,
                    include_image_base64
                ): (idx, chunk_file)
                for idx, chunk_file in enumerate(chunk_files)
            }

            for future in as_completed(future_map):
                idx, chunk_file = future_map[future]
                try:
                    result = future.result()
                    if result:
                        indexed_results.append((idx, result))
                        logger.info(f"‚úÖ Chunk {idx + 1}/{len(chunk_files)} procesado")
                except Exception as exc:
                    logger.error(f"‚ùå Error procesando chunk {idx + 1}: {exc}")
    finally:
        # Limpiar archivos temporales
        for chunk_file in chunk_files:
            try:
                os.unlink(chunk_file)
            except Exception:
                pass
    
    # Ordenar por √≠ndice
    indexed_results.sort(key=lambda item: item[0])
    return [result for _, result in indexed_results]

print("‚úÖ Funciones de procesamiento Mistral cargadas")

‚úÖ Funciones de procesamiento Mistral cargadas


## 5. Funciones de Consolidaci√≥n de Chunks

In [5]:
def _merge_list_items(target_list: list, source_list: list):
    """Mergea listas cuidando duplicados y combinando elementos dict similares."""
    for item in source_list:
        if item in (None, [], {}, ""):
            continue

        if isinstance(item, dict):
            existing = next((t for t in target_list if isinstance(t, dict) and t == item), None)
            if existing is not None:
                _merge_chunk_data(existing, item)
                continue

        if item not in target_list:
            target_list.append(item)


def _merge_chunk_data(target: dict, source: dict):
    """Mergea datos de un chunk con el diccionario consolidado."""
    for key, value in source.items():
        if value in (None, [], {}, ""):
            continue

        if key not in target or target[key] in (None, [], {}):
            target[key] = value
            continue

        target_value = target[key]

        if isinstance(target_value, list) and isinstance(value, list):
            _merge_list_items(target_value, value)
        elif isinstance(target_value, dict) and isinstance(value, dict):
            _merge_chunk_data(target_value, value)
        elif isinstance(target_value, str) and isinstance(value, str):
            # Preferir el texto m√°s largo para mantener contexto adicional
            if len(value.strip()) > len(target_value.strip()):
                target[key] = value
        else:
            target[key] = value


def consolidate_chunks_data(
    chunk_responses: list, 
    extraction_model: type[BaseModel] = None,
    return_raw: bool = False
) -> Dict[str, Any]:
    """Consolida los document_annotation de todos los chunks."""
    if not chunk_responses:
        logger.warning("No hay chunks para procesar")
        return None
    
    all_chunk_data = {}
    all_markdown = []
    
    for i, response in enumerate(chunk_responses):
        if not response:
            continue
        
        # Extraer markdown de cada p√°gina
        if hasattr(response, 'pages'):
            for page in response.pages:
                if hasattr(page, 'markdown') and page.markdown:
                    all_markdown.append(page.markdown)
        
        # Extraer document_annotation del chunk
        annotation_data = None
        if hasattr(response, 'document_annotation'):
            annotation_data = response.document_annotation
        elif isinstance(response, dict) and 'document_annotation' in response:
            annotation_data = response['document_annotation']
        
        if annotation_data:
            try:
                if isinstance(annotation_data, str):
                    chunk_data = json.loads(annotation_data)
                elif isinstance(annotation_data, dict):
                    chunk_data = annotation_data
                else:
                    chunk_data = json.loads(str(annotation_data))
                
                _merge_chunk_data(all_chunk_data, chunk_data)
                logger.debug(f"Merged chunk {i+1} data")
                
            except (json.JSONDecodeError, TypeError) as e:
                logger.warning(f"Error parsing chunk {i+1} annotation: {e}")
    
    result = {
        "consolidated_data": all_chunk_data,
        "markdown_combined": "\n\n---\n\n".join(all_markdown),
        "chunks_processed": len(chunk_responses),
    }
    
    # Intentar crear instancia del modelo Pydantic
    if all_chunk_data and extraction_model and not return_raw:
        try:
            model_instance = extraction_model(**all_chunk_data)
            result["model_instance"] = model_instance
            result["model_dict"] = model_instance.model_dump()
            logger.info(f"‚úÖ Creada instancia de {extraction_model.__name__}")
        except Exception as e:
            logger.warning(f"‚ö†Ô∏è No se pudo crear instancia del modelo: {e}")
            result["model_instance"] = None
            result["model_dict"] = all_chunk_data
    else:
        result["model_instance"] = None
        result["model_dict"] = all_chunk_data
    
    return result

print("‚úÖ Funciones de consolidaci√≥n cargadas")

‚úÖ Funciones de consolidaci√≥n cargadas


## 6. Funci√≥n Principal de Test

In [6]:
def test_document_annotation(
    document_path: str,
    model_name: str = None,
    custom_model: Type[BaseModel] = None,
    max_pages_per_chunk: int = 8,
    chunk_overlap_pages: int = 2,
    max_workers: int = 4,
    include_image_base64: bool = False,
    save_output: bool = True,
    output_dir: str = None
) -> Dict[str, Any]:
    """
    Funci√≥n principal para testear Document Annotation de Mistral.
    
    Args:
        document_path: Ruta al documento PDF o DOCX
        model_name: Nombre del modelo predefinido ('change_control', 'analytical_method', etc.)
        custom_model: Modelo Pydantic personalizado (tiene prioridad sobre model_name)
        max_pages_per_chunk: M√°ximo de p√°ginas por chunk
        chunk_overlap_pages: P√°ginas de overlap entre chunks
        max_workers: Workers para procesamiento paralelo
        include_image_base64: Incluir im√°genes en base64 en la respuesta
        save_output: Guardar resultados en archivos JSON
        output_dir: Directorio para guardar outputs
    
    Returns:
        Dict con resultados consolidados
    """
    import time
    start_time = time.time()
    
    # Seleccionar modelo
    extraction_model = None
    if custom_model:
        extraction_model = custom_model
    elif model_name and model_name in AVAILABLE_MODELS:
        extraction_model = AVAILABLE_MODELS[model_name]
    
    print(f"\n{'='*60}")
    print(f"üî¨ TEST MISTRAL DOCUMENT ANNOTATION")
    print(f"{'='*60}")
    print(f"üìÑ Documento: {document_path}")
    print(f"üéØ Modelo: {extraction_model.__name__ if extraction_model else 'Sin modelo (solo OCR)'}")
    print(f"üì¶ Config: {max_pages_per_chunk} p√°ginas/chunk, {chunk_overlap_pages} overlap")
    print(f"{'='*60}\n")
    
    results = {
        "document_path": document_path,
        "model_used": extraction_model.__name__ if extraction_model else None,
        "config": {
            "max_pages_per_chunk": max_pages_per_chunk,
            "chunk_overlap_pages": chunk_overlap_pages,
            "max_workers": max_workers
        },
        "success": False,
        "error": None
    }
    
    try:
        with prepare_pdf_document(document_path) as pdf_path:
            results["total_pages"] = get_pdf_page_count(pdf_path)
            
            # Procesar documento
            chunk_responses = process_document(
                pdf_path=pdf_path,
                extraction_model=extraction_model,
                max_pages_per_chunk=max_pages_per_chunk,
                chunk_overlap_pages=chunk_overlap_pages,
                max_workers=max_workers,
                include_image_base64=include_image_base64
            )
            
            results["chunks_processed"] = len(chunk_responses)
            
            # Consolidar resultados
            consolidated = consolidate_chunks_data(chunk_responses, extraction_model)
            
            results["consolidated_data"] = consolidated["model_dict"]
            results["markdown_combined"] = consolidated["markdown_combined"]
            results["model_instance"] = consolidated["model_instance"]
            results["success"] = True
            
    except Exception as e:
        results["error"] = str(e)
        logger.error(f"‚ùå Error: {e}")
    
    elapsed_time = time.time() - start_time
    results["elapsed_time_seconds"] = round(elapsed_time, 2)
    
    # Guardar resultados
    if save_output and results["success"]:
        output_dir = output_dir or str(ROOT_DIR / "notebooks" / "outputs")
        os.makedirs(output_dir, exist_ok=True)
        
        doc_name = Path(document_path).stem
        
        # Guardar JSON estructurado
        json_path = os.path.join(output_dir, f"{doc_name}_structured.json")
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(results["consolidated_data"], f, indent=2, ensure_ascii=False, default=str)
        
        # Guardar Markdown
        md_path = os.path.join(output_dir, f"{doc_name}_ocr.md")
        with open(md_path, "w", encoding="utf-8") as f:
            f.write(results["markdown_combined"])
        
        print(f"\nüíæ Resultados guardados en:")
        print(f"   - JSON: {json_path}")
        print(f"   - Markdown: {md_path}")
    
    # Resumen
    print(f"\n{'='*60}")
    print(f"üìä RESUMEN")
    print(f"{'='*60}")
    print(f"‚úÖ √âxito: {results['success']}")
    print(f"üìÑ P√°ginas totales: {results.get('total_pages', 'N/A')}")
    print(f"üì¶ Chunks procesados: {results.get('chunks_processed', 'N/A')}")
    print(f"‚è±Ô∏è Tiempo: {results['elapsed_time_seconds']}s")
    if results.get('error'):
        print(f"‚ùå Error: {results['error']}")
    print(f"{'='*60}\n")
    
    return results

print("‚úÖ Funci√≥n principal de test cargada")

‚úÖ Funci√≥n principal de test cargada


## 7. Funciones de Visualizaci√≥n

In [7]:
def show_results(results: Dict[str, Any], show_markdown: bool = False, max_json_depth: int = 3):
    """Muestra los resultados de forma visual."""
    if not results.get("success"):
        print(f"‚ùå El procesamiento fall√≥: {results.get('error')}")
        return
    
    print("\nüìã DATOS ESTRUCTURADOS EXTRA√çDOS:")
    print("-" * 40)
    
    # Mostrar JSON bien indentado
    print(json.dumps(results["consolidated_data"], indent=2, ensure_ascii=False, default=str))
    
    if show_markdown and results.get("markdown_combined"):
        print("\nüìù MARKDOWN OCR (primeros 2000 caracteres):")
        print("-" * 40)
        display(Markdown(results["markdown_combined"][:2000] + "..."))


def show_model_fields(model_name: str = None, custom_model: Type[BaseModel] = None):
    """Muestra los campos de un modelo de datos."""
    model = custom_model or AVAILABLE_MODELS.get(model_name)
    if not model:
        print(f"‚ùå Modelo no encontrado: {model_name}")
        return
    
    print(f"\nüìã Campos del modelo {model.__name__}:")
    print("-" * 40)
    
    for field_name, field_info in model.model_fields.items():
        field_type = str(field_info.annotation).replace("typing.", "")
        required = "‚úÖ" if field_info.is_required() else "‚¨ú"
        desc = field_info.description[:80] + "..." if field_info.description and len(field_info.description) > 80 else (field_info.description or "")
        print(f"{required} {field_name}: {field_type}")
        if desc:
            print(f"   ‚îî‚îÄ {desc}")

print("‚úÖ Funciones de visualizaci√≥n cargadas")

‚úÖ Funciones de visualizaci√≥n cargadas


---

# üß™ ZONA DE PRUEBAS

Modifica las celdas siguientes para probar tus documentos.

## Ejemplo 1: Ver campos de un modelo

In [8]:
# Ver campos disponibles de un modelo
show_model_fields("change_control")


üìã Campos del modelo ChangeControlModel:
----------------------------------------
‚¨ú codigo_solicitud: Optional[str]
   ‚îî‚îÄ C√≥digo de la solicitud de cambio, normalmente se encuentra en el encabezado del ...
‚¨ú fecha_solicitud: Optional[str]
   ‚îî‚îÄ Fecha de la solicitud.
‚¨ú nombre: Optional[str]
   ‚îî‚îÄ Nombre de la persona que presenta el cambio.
‚¨ú cargo: Optional[str]
   ‚îî‚îÄ Cargo de la persona que presenta el cambio. Puede ser Analistas, Jefes, Coordina...
‚¨ú titulo: Optional[str]
   ‚îî‚îÄ T√≠tulo del cambio. Puede ser el nombre de un producto o declarar el nombre del m...
‚¨ú fecha_aprobacion: Optional[str]
   ‚îî‚îÄ Fecha de aprobaci√≥n del cambio.
‚úÖ descripcion_cambio: List[src.models.change_control.DescripcionCambio]
   ‚îî‚îÄ Listado de descripciones de los diferentes cambios en las pruebas del m√©todo ana...
‚¨ú cliente: Optional[str]
   ‚îî‚îÄ Nombre del cliente. Se encuentra cerca del string 'CLIENTE'.
‚¨ú centro: Optional[str]
   ‚îî‚îÄ Nombre del ce

In [9]:
show_model_fields("analytical_method")


üìã Campos del modelo MetodoAnaliticoDA:
----------------------------------------
‚¨ú apis: Optional[List[str]]
   ‚îî‚îÄ Listado de ingredientes activos (APIs) del producto, si se indican en el m√©todo.
‚¨ú tipo_metodo: Optional[src.models.analytical_method_models.TipoMetodo]
   ‚îî‚îÄ Tipo de m√©todo seg√∫n el encabezado o portada. Seleccionar una de las opciones de...
‚¨ú nombre_producto: Optional[str]
   ‚îî‚îÄ Nombre comercial/t√©cnico del producto en el encabezado. Mantener potencia si est...
‚¨ú numero_metodo: Optional[str]
   ‚îî‚îÄ Identificador del m√©todo indicado como 'M√©todo No', 'C√ìDIGO', 'M√âTODO', etc. Ej....
‚¨ú version_metodo: Optional[str]
   ‚îî‚îÄ Versi√≥n del m√©todo indicada en el encabezado o portada. Ej.: '01'. Solo el valor...
‚¨ú codigo_producto: Optional[str]
   ‚îî‚îÄ C√≥digo del producto en el encabezado o tabla de alcance (columna 'C√≥digo'/'C√≥dig...
‚¨ú tabla_de_contenidos: Optional[List[str]]
   ‚îî‚îÄ Extracci√≥n EXHAUSTIVA y LITERAL de la estruct

## Ejemplo 2: Probar un documento con modelo predefinido

In [10]:
# ‚ö†Ô∏è MODIFICA ESTA RUTA con tu documento
DOCUMENT_PATH = r"D:/Users/iromero/OneDrive - Grupo Procaps/Portafolio NTF/16 - I&D 4.0/21. Template migration workflow/Caso Gestavit DHA/ANEXO NAPROXENO SODICO 100000346 -rotated-real.pdf"

# Modelos disponibles: 'change_control', 'analytical_method', 'analytical_method_complete', 'side_by_side', 'test_solutions'
MODEL_NAME = "side_by_side"

# Ejecutar test
results = test_document_annotation(
    document_path=DOCUMENT_PATH,
    model_name=MODEL_NAME,
    max_pages_per_chunk=8,
    chunk_overlap_pages=2,
    save_output=True
)

2025-12-11 15:29:35,162 - INFO - üìÑ Procesando PDF con 30 p√°ginas



üî¨ TEST MISTRAL DOCUMENT ANNOTATION
üìÑ Documento: D:/Users/iromero/OneDrive - Grupo Procaps/Portafolio NTF/16 - I&D 4.0/21. Template migration workflow/Caso Gestavit DHA/ANEXO NAPROXENO SODICO 100000346 -rotated-real.pdf
üéØ Modelo: SideBySideModel
üì¶ Config: 8 p√°ginas/chunk, 2 overlap



2025-12-11 15:29:35,163 - INFO - üì¶ Documento grande (30 p√°ginas), dividiendo en chunks...
2025-12-11 15:29:35,239 - INFO - üìÑ PDF dividido en 5 chunks (p√°ginas: 30, chunk_size: 8, overlap: 2)
2025-12-11 15:29:35,240 - INFO - üöÄ Procesando 5 chunks con 4 workers...
2025-12-11 15:29:35,461 - INFO - üéØ Usando modelo de extracci√≥n: SideBySideModel
2025-12-11 15:29:35,470 - INFO - üéØ Usando modelo de extracci√≥n: SideBySideModel
2025-12-11 15:29:35,476 - INFO - üéØ Usando modelo de extracci√≥n: SideBySideModel
2025-12-11 15:29:35,482 - INFO - üéØ Usando modelo de extracci√≥n: SideBySideModel
2025-12-11 15:30:01,373 - INFO - HTTP Request: POST https://api.mistral.ai/v1/ocr "HTTP/1.1 200 OK"
2025-12-11 15:30:01,386 - INFO - ‚úÖ Chunk 4/5 procesado
2025-12-11 15:30:01,453 - INFO - üéØ Usando modelo de extracci√≥n: SideBySideModel
2025-12-11 15:30:16,120 - INFO - HTTP Request: POST https://api.mistral.ai/v1/ocr "HTTP/1.1 200 OK"
2025-12-11 15:30:16,305 - INFO - ‚úÖ Chunk 2/5 pr


üíæ Resultados guardados en:
   - JSON: d:\Users\iromero\OneDrive - Grupo Procaps\Previos\Documents\pharma_ai\ma_change_control_agent\notebooks\outputs\ANEXO NAPROXENO SODICO 100000346 -rotated-real_structured.json
   - Markdown: d:\Users\iromero\OneDrive - Grupo Procaps\Previos\Documents\pharma_ai\ma_change_control_agent\notebooks\outputs\ANEXO NAPROXENO SODICO 100000346 -rotated-real_ocr.md

üìä RESUMEN
‚úÖ √âxito: True
üìÑ P√°ginas totales: 30
üì¶ Chunks procesados: 5
‚è±Ô∏è Tiempo: 85.52s



In [11]:
# Visualizar resultados
show_results(results, show_markdown=True)


üìã DATOS ESTRUCTURADOS EXTRA√çDOS:
----------------------------------------
{
  "extract_tests_sbs": {
    "control_cambio": "SC-25-777",
    "nombre_anexo": "ANEXO 1",
    "codigo_producto": 100000346,
    "pagina": 26,
    "pruebas_metodo_actual": [
      {
        "test_name": "Descripci√≥n (Interna)",
        "test_type": "Descripci√≥n",
        "condiciones_cromatograficas": null,
        "soluciones": null,
        "procedimiento": {
          "texto": "Tomar una porci√≥n de la muestra, extenderla sobre una caja de Petri y observar sus caracter√≠sticas. Debe estar libre de material extra√±o.",
          "notas": null,
          "tiempo_retencion": null
        },
        "criterio_aceptacion": {
          "texto": "Polvo de color blanco o crema, untuoso al tacto, libre de material extra√±o.",
          "notas": null,
          "tabla_criterios": null
        },
        "equipos": null,
        "reactivos": null,
        "procedimiento_sst": null
      },
      {
        "test_

![img-0.jpeg](img-0.jpeg)

P√°gina 1 de 31

# ANEXO I

## CONTROL DE CAMBIO SC-25-777

### CUADRO COMPARATIVO DE PRUEBAS PARA LA MATERIA PRIMA

#### NAPROXENO SODICO 100000346

|  M√âTODO DE AN√ÅLISIS | MODIFICACI√ìN PROPUESTA  |
| --- | --- |
|  **M√âTODO DE AN√ÅLISIS 100000346**
10-0514 Versi√≥n 01
(VERSI√ìN ACTUAL) | **M√âTODO DE AN√ÅLISIS 100000346**
10-0514 Versi√≥n 02
(NUEVA VERSI√ìN)  |
|  **DESARROLLO** |   |
|  **NOMBRE DEL MATERIAL:** NAPROXEN SODICO. | **NOMBRE DEL MATERIAL:** NAPROXENO SODICO.  |
|  **DESCRIPCI√ìN (INTERNA)** | **DESCRIPCI√ìN (USP)**  |
|  Tomar una porci√≥n de la muestra, extenderla sobre una caja de Petri y observar sus caracter√≠sticas. Debe estar libre de material extra√±o. | Tomar una porci√≥n de la muestra, extenderla sobre una caja de Petri y observar sus caracter√≠sticas. Debe estar libre de material extra√±o.  |
|  Criterio de aceptaci√≥n: Polvo de color blanco o crema, untuoso al tacto, libre de material extra√±o. | Criterio de aceptaci√≥n: Polvo cristalino de color blanco a cremoso.  |
|  **PUNTO DE FUSI√ìN (USP)** | **PUNTO DE FUSI√ìN (USP)**  |
|  Colocar un poco de la muestra en un tubo capilar, llevar a un fusi√≥metro debidamente calibrado y determinar el punto de fusi√≥n. Funde cerca de 255¬∞C con descomposici√≥n. | Colocar un poco de la muestra en un tubo capilar, llevar a un fusi√≥metro debidamente calibrado y determinar el punto de fusi√≥n. Funde cerca de 255¬∞C con descomposici√≥n.  |
|  Criterio de aceptaci√≥n: Funde aproximadamente a 255¬∞C. | Criterio de aceptaci√≥n: Funde aproximadamente a 255¬∞C, con descomposici√≥n.  |
|  **IDENTIFICACI√ìN A (IR) (Cofa)** | **IDENTIFICACI√ìN A (IR) (USP)**  |
|  Colocar una peque√±a cantidad de la muestra, previamente pulverizada y mezclada con bromuro de potasio en un mortero, en el dispositivo de Reflectancia Total Atenuada (ATR). Obtener el espectro IR de la muestra y compararlo con el espectro obtenido a partir de un est√°ndar de referencia bajo las mismas condiciones de tratamiento y equipo. Verificar que el espect...

## Ejemplo 3: Probar solo OCR (sin modelo estructurado)

In [None]:
# ‚ö†Ô∏è MODIFICA ESTA RUTA con tu documento
DOCUMENT_PATH = r"D:/Users/iromero/OneDrive - Grupo Procaps/Portafolio NTF/16 - I&D 4.0/21. Template migration workflow/Caso Gestavit DHA/ANEXO NAPROXENO SODICO 100000346 -rotated-real.pdf"

# Sin modelo = solo OCR a Markdown
results_ocr = test_document_annotation(
    document_path=DOCUMENT_PATH,
    model_name=None,  # Sin modelo
    max_pages_per_chunk=50,
    save_output=True
)

In [None]:
# Ver el markdown extra√≠do
if results_ocr.get("success"):
    display(Markdown(results_ocr["markdown_combined"])) #display(Markdown(results_ocr["markdown_combined"][:5000]))

## Ejemplo 4: Definir un modelo personalizado

In [None]:
# Define tu propio modelo Pydantic
class MiModeloPersonalizado(BaseModel):
    """Modelo personalizado para extracci√≥n."""
    titulo: Optional[str] = Field(None, description="T√≠tulo del documento")
    fecha: Optional[str] = Field(None, description="Fecha del documento")
    autor: Optional[str] = Field(None, description="Autor del documento")
    resumen: Optional[str] = Field(None, description="Resumen del contenido")
    secciones: Optional[List[str]] = Field(None, description="Lista de secciones principales")
    conclusiones: Optional[str] = Field(None, description="Conclusiones del documento")

# Ver campos
show_model_fields(custom_model=MiModeloPersonalizado)

In [None]:
# ‚ö†Ô∏è MODIFICA ESTA RUTA con tu documento
DOCUMENT_PATH = r"C:\ruta\a\tu\documento.pdf"

# Usar modelo personalizado
results_custom = test_document_annotation(
    document_path=DOCUMENT_PATH,
    custom_model=MiModeloPersonalizado,
    max_pages_per_chunk=8,
    save_output=True
)

In [None]:
show_results(results_custom)

## Ejemplo 5: Comparar diferentes configuraciones de chunking

In [None]:
# ‚ö†Ô∏è MODIFICA ESTA RUTA con tu documento
DOCUMENT_PATH = r"C:\ruta\a\tu\documento.pdf"

configs = [
    {"max_pages_per_chunk": 4, "chunk_overlap_pages": 1},
    {"max_pages_per_chunk": 8, "chunk_overlap_pages": 2},
    {"max_pages_per_chunk": 12, "chunk_overlap_pages": 3},
]

comparison_results = []

for config in configs:
    print(f"\nüîÑ Probando config: {config}")
    result = test_document_annotation(
        document_path=DOCUMENT_PATH,
        model_name="analytical_method",
        save_output=False,
        **config
    )
    comparison_results.append({
        "config": config,
        "chunks": result.get("chunks_processed"),
        "time": result.get("elapsed_time_seconds"),
        "success": result.get("success")
    })

print("\nüìä COMPARACI√ìN DE CONFIGURACIONES:")
print("-" * 60)
for r in comparison_results:
    print(f"Config: {r['config']} | Chunks: {r['chunks']} | Tiempo: {r['time']}s | √âxito: {r['success']}")

## Ejemplo 6: Acceder a datos espec√≠ficos del modelo

In [None]:
# Despu√©s de ejecutar un test, puedes acceder a campos espec√≠ficos
if 'results' in dir() and results.get("success"):
    data = results["consolidated_data"]
    
    # Ejemplo para ChangeControlModel
    if results.get("model_used") == "ChangeControlModel":
        print("üìã C√≥digo de solicitud:", data.get("codigo_solicitud"))
        print("üìÖ Fecha:", data.get("fecha_solicitud"))
        print("üìù T√≠tulo:", data.get("titulo"))
        
        if data.get("descripcion_cambio"):
            print(f"\nüîÑ Cambios encontrados: {len(data['descripcion_cambio'])}")
            for i, cambio in enumerate(data["descripcion_cambio"][:3], 1):
                print(f"   {i}. {cambio.get('prueba', 'N/A')}: {cambio.get('texto', 'N/A')[:100]}...")
    
    # Ejemplo para MetodoAnaliticoDA
    elif results.get("model_used") == "MetodoAnaliticoDA":
        print("üìã Nombre producto:", data.get("nombre_producto"))
        print("üìÑ N√∫mero m√©todo:", data.get("numero_metodo"))
        print("üìå Versi√≥n:", data.get("version_metodo"))
        print("üéØ Objetivo:", data.get("objetivo", "")[:200] + "..." if data.get("objetivo") else "N/A")
else:
    print("‚ö†Ô∏è Ejecuta primero un test para ver los datos")

---

## üìö Referencia R√°pida

### Modelos disponibles:
- `change_control`: Para documentos de Control de Cambios
- `analytical_method`: Para m√©todos anal√≠ticos (sin markdown)
- `analytical_method_complete`: Para m√©todos anal√≠ticos (con markdown completo)
- `side_by_side`: Para comparativos lado a lado
- `test_solutions`: Para pruebas/tests anal√≠ticos

### Par√°metros importantes:
- `max_pages_per_chunk`: M√°ximo de p√°ginas por chunk (default: 8)
- `chunk_overlap_pages`: P√°ginas de solapamiento entre chunks (default: 2)
- `max_workers`: N√∫mero de workers para procesamiento paralelo (default: 4)
- `include_image_base64`: Incluir im√°genes en base64 (default: False)

### Tips:
1. Para documentos muy largos, aumenta `max_pages_per_chunk` para reducir el n√∫mero de llamadas API
2. El `chunk_overlap_pages` ayuda a no perder informaci√≥n en los bordes de los chunks
3. Usa `save_output=True` para guardar los resultados y revisarlos despu√©s
4. Define modelos personalizados para extraer exactamente lo que necesitas