In [25]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


# Procesamiento de Pliegos de Licitación

Este notebook demuestra el flujo completo para:
1. Extraer texto de PDFs de pliegos de licitación
2. Procesar y analizar el contenido
3. Identificar secciones clave y entidades importantes
4. Generar un reporte estructurado

### Configuración Inicial

In [26]:
import fitz  # PyMuPDF
import pdfplumber
import pytesseract # OCR
from PIL import Image
import io
import re
import os
import sys
from pathlib import Path
import json
import datetime
from typing import Dict, List


In [27]:
from src.utils.pdf_extractor import PDFTextExtractor, extract_text_from_pdf
from src.utils.text_processor import TextProcessor, clean_contract_text, analyze_contract_sections


In [28]:
# Agregar src al path para importar nuestros módulos
module_path = str(Path.cwd().parent / "src")
if module_path not in sys.path:
    sys.path.append(module_path)

### Extracción de Texto desde PDF

In [29]:
def process_pdf_file(pdf_path: str, use_ocr: bool = False) -> dict:
    """
    Procesa un archivo PDF de pliegos y devuelve un diccionario estructurado con los resultados
    
    Args:
        pdf_path: Ruta al archivo PDF
        use_ocr: Si se debe forzar el uso de OCR
        
    Returns:
        Diccionario con los resultados del procesamiento
    """

    # Validar que el archivo existe
    if not os.path.exists(pdf_path):
        print(f"Error: El archivo {pdf_path} no existe")
        return None
    
    results = {
        "file_name": os.path.basename(pdf_path),
        "raw_text": "",
        "sections": [],
        "entities": {},
        "processing_errors": []
    }
    
    try:
        # 1. Extraer texto del PDF
        extractor = PDFTextExtractor()
        raw_text = extractor.extract_text(pdf_path, use_ocr)
        results["raw_text"] = raw_text
        
        # 2. Limpiar y procesar el texto
        processor = TextProcessor(language='es')
        clean_text = processor.clean_text(raw_text)
        
        # 3. Identificar secciones
        sections = processor.detect_sections(clean_text)
        results["sections"] = [{
            "name": section.name,
            "content": section.content,
            "start_pos": section.start_pos,
            "end_pos": section.end_pos
        } for section in sections]
        
        # 4. Extraer entidades clave
        results["entities"] = processor.extract_key_entities(clean_text)
        
    except Exception as e:
        error_msg = f"Error procesando {pdf_path}: {str(e)}"
        print(error_msg)
        results["processing_errors"].append(error_msg)
    return results
    

### Ejemplo de uso

In [30]:
pdf_path = "../data/PLIEGO-LICO-V-2023-001.pdf"

# Procesar el PDF
analysis_results = process_pdf_file(pdf_path)

# Mostrar resultados preliminares
print(f"Procesado: {analysis_results['file_name']}")
print(f"Secciones identificadas: {len(analysis_results['sections'])}")
print(f"Entidades encontradas:")
for entity_type, values in analysis_results["entities"].items():
    print(f"- {entity_type}: {len(values)}")


Procesado: PLIEGO-LICO-V-2023-001.pdf
Secciones identificadas: 3
Entidades encontradas:
- amounts: 1006
- clauses: 68


### Analisis Detallado 

In [31]:
def analyze_sections(sections: list) -> dict:
    """
    Realiza un análisis detallado de cada sección del contrato
    
    Args:
        sections: Lista de secciones identificadas
        
    Returns:
        Diccionario con análisis por sección
    """
    section_analysis = {}
    processor = TextProcessor(language='es')
    
    for section in sections:
        section_name = section["name"]
        content = section["content"]
        
        # Extraer entidades específicas por sección
        entities = processor.extract_key_entities(content)
        
        # Análisis especial para secciones clave
        special_flags = []
        
        # Manejo seguro de montos
        if "monto" in section_name.lower():
            amount_msg = "No encontrado"
            if "amounts" in entities and entities["amounts"]:
                try:
                    amount_msg = max(entities["amounts"], 
                                   key=lambda x: float(re.sub(r'[^\d,]', '', x).replace(',', '.')))
                except:
                    amount_msg = entities["amounts"][0]  # Usar el primero si no se puede calcular max
            special_flags.append(("Monto principal", amount_msg))
        
        # Manejo seguro de fechas
        if "plazo" in section_name.lower():
            date_msg = "No encontrado"
            if "dates" in entities and entities["dates"]:
                try:
                    # Convertir a objetos date para encontrar el máximo
                    date_objects = []
                    for date_str in entities["dates"]:
                        try:
                            # Probar formato dd/mm/aaaa
                            date_objects.append(datetime.datetime.strptime(date_str, '%d/%m/%Y'))
                        except:
                            try:
                                # Probar formato dd-mm-aaaa
                                date_objects.append(datetime.datetime.strptime(date_str, '%d-%m-%Y'))
                            except:
                                pass
                    
                    if date_objects:
                        date_msg = max(date_objects).strftime('%d/%m/%Y')
                except:
                    date_msg = entities["dates"][0]  # Usar el primero si no se puede calcular max
            special_flags.append(("Fecha límite", date_msg))
        
        # Crear estructura de resultados para la sección
        section_analysis[section_name] = {
            "length": len(content),
            "entity_counts": {k: len(v) for k, v in entities.items()},
            "special_flags": special_flags,
            "sample_content": content[:200] + "..." if len(content) > 200 else content,
            "all_entities": entities  # Opcional: incluir todas las entidades para depuración
        }
    
    return section_analysis

In [32]:
# Ejecutar análisis de secciones

DEBUG_MODE = True

if analysis_results.get("sections"):  # Usar .get() para evitar KeyError
    try:
        section_analysis = analyze_sections(analysis_results["sections"])
        
        # Mostrar resultados
        print("\nAnálisis por sección:")
        for section_name, analysis in section_analysis.items():
            print(f"\n=== {section_name.upper()} ===")
            print(f"Longitud: {analysis['length']} caracteres")
            
            # Mostrar conteo de entidades de manera segura
            entity_counts = analysis.get('entity_counts', {})
            print("Entidades encontradas:")
            for entity_type, count in entity_counts.items():
                print(f"- {entity_type}: {count}")
            
            # Mostrar datos destacados si existen
            if analysis.get('special_flags'):
                print("\nDatos destacados:")
                for flag in analysis['special_flags']:
                    print(f"- {flag[0]}: {flag[1]}")
            
            print(f"\nMuestra de contenido:\n{analysis.get('sample_content', 'No disponible')}")
            
            # Opcional: mostrar todas las entidades para depuración
            if DEBUG_MODE:
                print("\nTodas las entidades:")
                for entity_type, values in analysis.get('all_entities', {}).items():
                    print(f"{entity_type.upper()}:")
                    for value in values[:5]:  # Mostrar solo las primeras 5 de cada tipo
                        print(f"- {value}")
                    if len(values) > 5:
                        print(f"- ...({len(values)-5} más)")
    
    except Exception as e:
        print(f"\nError durante el análisis de secciones: {str(e)}")
        if DEBUG_MODE:
            import traceback
            traceback.print_exc()


Análisis por sección:

=== OBJETO ===
Longitud: 1991 caracteres
Entidades encontradas:
- amounts: 29

Muestra de contenido:
objeto del contrato), durante el período que dure la ejecución de la obra, con (nombre del oferente) en el caso de que suscriba el contrato de ejecución de las obras. Declaro bajo juramento que no ten...

Todas las entidades:
AMOUNTS:
- 023
- 001
- 023
- 001
- 2
- ...(24 más)

=== PLAZO ===
Longitud: 45 caracteres
Entidades encontradas:
- amounts: 5

Datos destacados:
- Fecha límite: No encontrado

Muestra de contenido:
Plazo de ejecución 3.6.​ Forma de pago 3.6.1.

Todas las entidades:
AMOUNTS:
- 3
- 6
- 3
- 6
- 1

=== ANTICIPO ===
Longitud: 10698 caracteres
Entidades encontradas:
- amounts: 100
- clauses: 9

Muestra de contenido:
Anticipo: 3.6.2. Valor restante de la obra SECCIÓN IV VERIFICACIÓN Y EVALUACIÓN DE LAS OFERTAS 4.1. Verificación de las ofertas 4.1.1. Integridad de las ofertas 4.1.2. Equipo mínimo 4.1.3. Personal té...

Todas las entidades:
AMOUNTS:


### Visualización de Resultados

In [33]:
import pandas as pd
from IPython.display import display, HTML

#### Tabla de Resumen

In [34]:
def generate_summary_table(analysis_results: dict) -> pd.DataFrame:
    """
    Genera un resumen tabular de los resultados del análisis
    
    Args:
        analysis_results: Resultados del procesamiento
        
    Returns:
        DataFrame con el resumen
    """
    summary_data = []
    
    # Resumen general
    summary_data.append({
        "Metrica": "Archivo procesado",
        "Valor": analysis_results["file_name"]
    })
    
    summary_data.append({
        "Metrica": "Secciones identificadas",
        "Valor": len(analysis_results["sections"])
    })
    
    # Conteo de entidades
    for entity_type, values in analysis_results["entities"].items():
        summary_data.append({
            "Metrica": f"Entidades ({entity_type})",
            "Valor": len(values)
        })
    
    # Errores de procesamiento
    if analysis_results["processing_errors"]:
        summary_data.append({
            "Metrica": "Errores encontrados",
            "Valor": len(analysis_results["processing_errors"])
        })
    
    return pd.DataFrame(summary_data)

In [35]:
# Generar y mostrar tabla de resumen
summary_df = generate_summary_table(analysis_results)
display(HTML(summary_df.to_html(index=False)))

Metrica,Valor
Archivo procesado,PLIEGO-LICO-V-2023-001.pdf
Secciones identificadas,3
Entidades (amounts),1006
Entidades (clauses),68


#### Tabla de Entidades

In [36]:
def generate_entities_table(entities: dict) -> pd.DataFrame:
    """
    Genera una tabla con las entidades encontradas
    
    Args:
        entities: Diccionario de entidades
        
    Returns:
        DataFrame con las entidades
    """
    entities_list = []
    
    for entity_type, values in entities.items():
        for value in values:
            entities_list.append({
                "Tipo": entity_type,
                "Valor": value
            })
    
    return pd.DataFrame(entities_list)


In [37]:
# Generar y mostrar tabla de entidades
if analysis_results["entities"]:
    entities_df = generate_entities_table(analysis_results["entities"])
    display(HTML(entities_df.to_html(index=False)))

Tipo,Valor
amounts,023
amounts,001
amounts,023
amounts,001
amounts,2
amounts,1
amounts,2
amounts,2
amounts,2
amounts,3


### Exportación de Resultados

In [38]:
def save_analysis_results(results: dict, output_dir: str = "results"):
    """
    Guarda los resultados del análisis en archivos JSON y TXT
    
    Args:
        results: Diccionario con los resultados
        output_dir: Directorio de salida
    """
    os.makedirs(output_dir, exist_ok=True)
    base_name = os.path.splitext(results["file_name"])[0]
    
    # Guardar JSON completo
    json_path = os.path.join(output_dir, f"{base_name}_analysis.json")
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    
    # Guardar texto estructurado
    txt_path = os.path.join(output_dir, f"{base_name}_sections.txt")
    with open(txt_path, 'w', encoding='utf-8') as f:
        f.write(f"=== ANÁLISIS DE {results['file_name'].upper()} ===\n\n")
        
        if results["sections"]:
            for section in results["sections"]:
                f.write(f"\n\n=== SECCIÓN: {section['name'].upper()} ===\n")
                f.write(section["content"])
        
        if results["entities"]:
            f.write("\n\n=== ENTIDADES CLAVE ===\n")
            for entity_type, values in results["entities"].items():
                f.write(f"\n{entity_type.upper()}:\n")
                f.write("\n".join(f"- {v}" for v in values))
    
    print(f"Resultados guardados en:\n- {json_path}\n- {txt_path}")


In [39]:
# Ejemplo de guardado de resultados
save_analysis_results(analysis_results)

Resultados guardados en:
- results\PLIEGO-LICO-V-2023-001_analysis.json
- results\PLIEGO-LICO-V-2023-001_sections.txt


#### Análisis Comparativo

In [43]:
def compare_bid_with_rfp(bid_pdf_path: str, rfp_pdf_path: str) -> dict:
    """
    Compara una oferta (bid) con el pliego de condiciones (RFP)
    
    Args:
        bid_pdf_path: Ruta al PDF de la oferta
        rfp_pdf_path: Ruta al PDF del pliego
        
    Returns:
        Diccionario con resultados de la comparación o None si hay error
    """
    # Validar que los archivos existen
    if not os.path.exists(bid_pdf_path):
        print(f"❌ Error: Archivo de oferta no encontrado: {bid_pdf_path}")
        return None
    if not os.path.exists(rfp_pdf_path):
        print(f"❌ Error: Archivo de pliego no encontrado: {rfp_pdf_path}")
        return None
    
    # Diccionario para resultados
    comparison_results = {
        "bid_file": os.path.basename(bid_pdf_path),
        "rfp_file": os.path.basename(rfp_pdf_path),
        "missing_sections": [],
        "inconsistent_amounts": [],
        "date_discrepancies": [],
        "warnings": [],
        "errors": []
    }
    
    try:
        # Procesar ambos documentos
        print(f"\n🔍 Procesando pliego: {rfp_pdf_path}")
        rfp_results = process_pdf_file(rfp_pdf_path)
        if not rfp_results:
            comparison_results["errors"].append("No se pudo procesar el archivo del pliego")
            return comparison_results
        
        print(f"\n🔍 Procesando oferta: {bid_pdf_path}")
        bid_results = process_pdf_file(bid_pdf_path)
        if not bid_results:
            comparison_results["errors"].append("No se pudo procesar el archivo de oferta")
            return comparison_results
        
        # Comparar usando el TextProcessor
        processor = TextProcessor(language='es')
        
        # Verificar que tenemos texto para comparar
        if not rfp_results.get("raw_text"):
            comparison_results["warnings"].append("El pliego no contiene texto extraído")
        if not bid_results.get("raw_text"):
            comparison_results["warnings"].append("La oferta no contiene texto extraído")
        
        # Solo comparar si hay texto en ambos
        if rfp_results.get("raw_text") and bid_results.get("raw_text"):
            comparison = processor.compare_with_bid(
                rfp_results["raw_text"],
                bid_results["raw_text"]
            )
            
            # Consolidar resultados
            comparison_results.update({
                "missing_sections": comparison.get("missing_sections", []),
                "inconsistent_amounts": comparison.get("inconsistent_amounts", []),
                "date_discrepancies": comparison.get("date_discrepancies", [])
            })
        
        # Comparar secciones específicas
        rfp_sections = {s["name"] for s in rfp_results.get("sections", [])}
        bid_sections = {s["name"] for s in bid_results.get("sections", [])}
        comparison_results["missing_sections"] = list(rfp_sections - bid_sections)
        
        return comparison_results
    
    except Exception as e:
        error_msg = f"Error durante la comparación: {str(e)}"
        print(f"\n❌ {error_msg}")
        comparison_results["errors"].append(error_msg)
        if DEBUG_MODE:
            import traceback
            traceback.print_exc()
        return comparison_results

def print_comparison_results(comparison: dict):
    """Muestra los resultados de la comparación de forma estructurada"""
    if not comparison:
        print("No hay resultados de comparación para mostrar")
        return
    
    print(f"\n📊 COMPARACIÓN ENTRE:")
    print(f"• Pliego: {comparison['rfp_file']}")
    print(f"• Oferta: {comparison['bid_file']}")
    
    if comparison.get("errors"):
        print("\nErrores encontrados:")
        for error in comparison["errors"]:
            print(f"- {error}")
    
    if comparison.get("warnings"):
        print("\nAdvertencias:")
        for warning in comparison["warnings"]:
            print(f"- {warning}")
    
    if comparison.get("missing_sections"):
        print("\nSecciones faltantes en la oferta:")
        for section in comparison["missing_sections"]:
            print(f"- {section}")
    else:
        print("\nLa oferta contiene todas las secciones principales")
    
    if comparison.get("inconsistent_amounts"):
        print("\nDiferencias en montos:")
        for diff in comparison["inconsistent_amounts"]:
            print(f"- {diff}")
    else:
        print("\nLos montos coinciden con el pliego")
    
    if comparison.get("date_discrepancies"):
        print("\nDiscrepancias en fechas:")
        for date_diff in comparison["date_discrepancies"]:
            print(f"- {date_diff}")
    else:
        print("\nLas fechas coinciden con el pliego")

In [44]:
# Configurar rutas (ajustar según tu estructura de archivos)
bid_path = "../data/PLIEGO-LICO-V-2023-001.pdf"
rfp_path = "../data/PLIEGO-LICO-V-2023-001.pdf"

# Ejecutar comparación
comparison_results = compare_bid_with_rfp(bid_path, rfp_path)

# Mostrar resultados
if comparison_results:
    print_comparison_results(comparison_results)
else:
    print("No se pudo realizar la comparación")


🔍 Procesando pliego: ../data/PLIEGO-LICO-V-2023-001.pdf

🔍 Procesando oferta: ../data/PLIEGO-LICO-V-2023-001.pdf

📊 COMPARACIÓN ENTRE:
• Pliego: PLIEGO-LICO-V-2023-001.pdf
• Oferta: PLIEGO-LICO-V-2023-001.pdf

La oferta contiene todas las secciones principales

Los montos coinciden con el pliego

Las fechas coinciden con el pliego
