<a href="https://colab.research.google.com/github/AlejandroBeltre/AlejandroBeltre/blob/main/Procesador_OCR.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#**Celda #1: Instalación de dependencias y configuración de entorno**

In [1]:
print("Configurando entorno OCR para alto rendimiento...")

# Actualizar sistema
!apt-get update -qq

# Instalar dependencias del sistema con todos los idiomas necesarios
!apt-get install -y poppler-utils tesseract-ocr tesseract-ocr-spa tesseract-ocr-eng -qq

# Instalar paquetes Python necesarios
!pip install pdf2image pytesseract psutil gspread google-auth-oauthlib google-auth-httplib2 google-api-python-client -q

# Actualizar pillow si es necesario
!pip install --upgrade pillow -q

print("\nConfigurando Tesseract correctamente...")

import os
import glob

# Auto-detección y configuración de TESSDATA_PREFIX
def configurar_tesseract():
    """Detecta y configura automáticamente la ruta de Tesseract"""
    possible_paths = [
        '/usr/share/tesseract-ocr/4.00/tessdata/',
        '/usr/share/tesseract-ocr/5/tessdata/',
        '/usr/share/tessdata/',
        '/usr/local/share/tessdata/'
    ]

    tessdata_path = None
    for path in possible_paths:
        if os.path.exists(path):
            files = glob.glob(f"{path}*.traineddata")
            if files:
                tessdata_path = path
                print(f"   Tessdata encontrado en: {path}")
                print(f"   Archivos de idioma disponibles: {len(files)}")

                # Verificar que tenemos español
                spa_file = f"{path}spa.traineddata"
                eng_file = f"{path}eng.traineddata"

                if os.path.exists(spa_file):
                    print("   Español (spa): Disponible")
                else:
                    print("   Español (spa): No encontrado")

                if os.path.exists(eng_file):
                    print("   Inglés (eng): Disponible")
                else:
                    print("   Inglés (eng): No encontrado")

                break

    if not tessdata_path:
        print("   No se encontraron archivos de datos, descargando...")
        tessdata_path = '/usr/share/tessdata/'
        os.makedirs(tessdata_path, exist_ok=True)

        # Descargar archivos de datos si no existen
        if not os.path.exists(f"{tessdata_path}spa.traineddata"):
            print("   Descargando datos de español...")
            !wget -q https://github.com/tesseract-ocr/tessdata/raw/main/spa.traineddata -O /usr/share/tessdata/spa.traineddata

        if not os.path.exists(f"{tessdata_path}eng.traineddata"):
            print("   Descargando datos de inglés...")
            !wget -q https://github.com/tesseract-ocr/tessdata/raw/main/eng.traineddata -O /usr/share/tessdata/eng.traineddata

    return tessdata_path

# Configurar Tesseract
tessdata_path = configurar_tesseract()

# Configurar variables de entorno
os.environ['TESSDATA_PREFIX'] = tessdata_path
os.environ['OMP_THREAD_LIMIT'] = '2'

print(f"\nVariables de entorno configuradas:")
print(f"   TESSDATA_PREFIX: {os.environ.get('TESSDATA_PREFIX')}")
print(f"   OMP_THREAD_LIMIT: {os.environ.get('OMP_THREAD_LIMIT')}")

print("\nEntorno OCR configurado exitosamente")

Configurando entorno OCR para alto rendimiento...
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)

Configurando Tesseract correctamente...
   Tessdata encontrado en: /usr/share/tesseract-ocr/4.00/tessdata/
   Archivos de idioma disponibles: 3
   Español (spa): Disponible
   Inglés (eng): Disponible

Variables de entorno configuradas:
   TESSDATA_PREFIX: /usr/share/tesseract-ocr/4.00/tessdata/
   OMP_THREAD_LIMIT: 2

Entorno OCR configurado exitosamente


#**Celda #2: Importaciones, clases de datos y optimizador**

In [2]:
import os
import re
import gc
import psutil
import time
from typing import List, Optional, Tuple, Dict
from dataclasses import dataclass, field
import pytesseract
from pdf2image import convert_from_path
from datetime import datetime, timedelta
import warnings
import gspread
from google.colab import auth
from google.auth import default
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from concurrent.futures import ThreadPoolExecutor
import multiprocessing as mp
from PIL import Image
import threading

warnings.filterwarnings('ignore')

@dataclass
class DetalleProducto:
    """Producto individual en la factura"""
    ncf_factura_fk: str = ""
    pos: int = 0
    item_codigo: str = ""
    descripcion: str = ""
    cantidad: int = 1
    precio_unitario: float = 0.0
    total_linea_con_itbis: float = 0.0

@dataclass
class FacturaPrincipal:
    """Estructura de factura"""
    ncf_factura: str = "No encontrado"
    cliente_rnc: str = "No encontrado"
    cliente_nombre: str = "No encontrado"
    fecha_emision: str = "No encontrado"
    fecha_vencimiento: str = "No encontrado"
    tipo_pago: str = "No encontrado"
    valor_pagar: float = 0.0
    vendedor: str = "No encontrado"
    nombre_archivo: str = ""
    link_archivo_drive: str = ""

@dataclass
class MetadataProceso:
    """Metadatos del procesamiento"""
    nombre_archivo: str
    link_archivo_drive: str
    timestamp_proceso: str = field(default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    estado_ocr: str = "Iniciado"
    error: str = ""
    campos_extraidos: int = 0
    confianza_extraccion: str = "Baja"
    tiempo_procesamiento: float = 0.0

class PatternExtractor:
    """Extractor de patrones con compilación para rendimiento"""

    _COMPILED_PATTERNS = None

    @classmethod
    def _get_compiled_patterns(cls):
        """Lazy loading de patrones compilados"""
        if cls._COMPILED_PATTERNS is None:
            cls._COMPILED_PATTERNS = {
                'ncf': [
                    re.compile(r'(E31\d{10})'),
                    re.compile(r'(ES1\d{10})'),
                    re.compile(r'(E5l\d{10})'),
                    re.compile(r'(ESl\d{10})'),
                    re.compile(r'(E3l\d{10})'),
                    re.compile(r'(ESI\d{10})'),
                    re.compile(r'(E3I\d{10})'),
                ],
                'rnc': [
                    re.compile(r'RNC Cliente\s*:\s*([0-9\-]+)'),
                    re.compile(r'RNC.*?(\d{8,12})'),
                    re.compile(r'Cliente\s*:\s*([0-9\-]+)'),
                ],
                'fecha_emision': [
                    re.compile(r'FECHA FACTURA\s*:\s*(\d{2}\.\d{2}\.\d{4})'),
                    re.compile(r'FECHA.*?(\d{2}\.\d{2}\.\d{4})'),
                ],
                'tipo_pago': [
                    re.compile(r'(CREDITO|CONTADO)', re.IGNORECASE),
                ],
                'valor_pagar': [
                    re.compile(r'VALOR A PAGAR\s*\$?\s*([0-9]{1,3}(?:\.[0-9]{3})*,\d{2})'),
                    re.compile(r'Total a Pagar\s*\$?\s*([0-9]{1,3}(?:\.[0-9]{3})*,\d{2})'),
                    re.compile(r'PAGAR\s*\$\s*([0-9]{1,3}(?:\.[0-9]{3})*,\d{2})'),
                ],
                'nombre_cliente': [
                    re.compile(r'Razón Social\s*:\s*(.*?)(?:\s*Establecimiento|\s*$)'),
                ],
                'vendedor': [
                    re.compile(r'Vendedor\s*:\s*([A-Z\s]+?)(?:\s*Entrega|\s*$)'),
                ]
            }
        return cls._COMPILED_PATTERNS

    @staticmethod
    def extraer_campo(texto: str, campo: str) -> Optional[str]:
        """Extracción usando patrones compilados"""
        patterns = PatternExtractor._get_compiled_patterns().get(campo, [])

        for pattern in patterns:
            match = pattern.search(texto)
            if match:
                resultado = match.group(1).strip()

                if campo == 'ncf':
                    resultado = PatternExtractor._corregir_ncf(resultado)
                elif campo == 'tipo_pago':
                    resultado = resultado.upper()

                return resultado
        return None

    @staticmethod
    def _corregir_ncf(ncf: str) -> str:
        """Corrección de NCF con mapeo directo"""
        correcciones = {
            'ES1': 'E31', 'E5l': 'E31', 'ESl': 'E31',
            'E3l': 'E31', 'ESI': 'E31', 'E3I': 'E31'
        }
        prefijo = ncf[:3] if len(ncf) >= 3 else ncf
        return correcciones.get(prefijo, prefijo) + ncf[3:] if prefijo in correcciones else ncf

class ValueNormalizer:
    """Normalización de valores monetarios dominicanos"""

    _MONEY_PATTERN = re.compile(r'^([0-9]{1,3}(?:\.[0-9]{3})*),(\d{1,2})$')
    _CLEAN_PATTERN = re.compile(r'[^\d.,]')

    @staticmethod
    def normalizar_valor_monetario(valor_str: str) -> float:
        """Normalización para formato dominicano (ejemplo: "28.164,72" -> 28164.72)"""
        if not valor_str or not valor_str.strip():
            return 0.0

        try:
            valor_limpio = ValueNormalizer._CLEAN_PATTERN.sub('', valor_str.strip())

            if ',' in valor_limpio:
                match = ValueNormalizer._MONEY_PATTERN.match(valor_limpio)
                if match:
                    parte_entera = match.group(1).replace('.', '')
                    parte_decimal = match.group(2).ljust(2, '0')[:2]
                    return float(f"{parte_entera}.{parte_decimal}")
                else:
                    partes = valor_limpio.split(',')
                    if len(partes) == 2:
                        parte_entera = partes[0].replace('.', '')
                        parte_decimal = partes[1][:2].ljust(2, '0')
                        return float(f"{parte_entera}.{parte_decimal}")
            else:
                return float(valor_limpio.replace('.', ''))

        except (ValueError, AttributeError):
            return 0.0

        return 0.0

class OCRProcessor:
    """Procesador OCR con configuración de alto rendimiento"""

    FAST_CONFIG = '--psm 6 --oem 3 -c preserve_interword_spaces=1'

    @staticmethod
    def verificar_configuracion_tesseract():
        """Verifica que Tesseract esté configurado correctamente"""
        import os
        tessdata_prefix = os.environ.get('TESSDATA_PREFIX', '')

        if not tessdata_prefix:
            print("TESSDATA_PREFIX no configurado, aplicando auto-detección...")

            # Auto-detección de tessdata
            import glob
            possible_paths = [
                '/usr/share/tesseract-ocr/4.00/tessdata/',
                '/usr/share/tesseract-ocr/5/tessdata/',
                '/usr/share/tessdata/',
                '/usr/local/share/tessdata/'
            ]

            for path in possible_paths:
                if os.path.exists(path) and glob.glob(f"{path}*.traineddata"):
                    os.environ['TESSDATA_PREFIX'] = path
                    print(f"TESSDATA_PREFIX configurado automáticamente: {path}")
                    break

    @staticmethod
    def test_ocr_simple() -> bool:
        """Prueba básica de OCR para diagnóstico"""
        try:
            # Verificar configuración antes del test
            OCRProcessor.verificar_configuracion_tesseract()

            from PIL import Image, ImageDraw
            img = Image.new('RGB', (300, 150), color='white')
            draw = ImageDraw.Draw(img)
            draw.text((20, 50), "FACTURA TEST 123", fill='black')
            draw.text((20, 80), "NCF: E31000001234", fill='black')

            # Probar con español primero
            try:
                result_spa = pytesseract.image_to_string(img, lang='spa', config=OCRProcessor.FAST_CONFIG)
                if "FACTURA" in result_spa or "TEST" in result_spa or "E31" in result_spa:
                    return True
            except:
                pass

            # Fallback a inglés
            try:
                result_eng = pytesseract.image_to_string(img, lang='eng', config=OCRProcessor.FAST_CONFIG)
                if "FACTURA" in result_eng or "TEST" in result_eng or "E31" in result_eng:
                    return True
            except:
                pass

            return False

        except Exception as e:
            print(f"Test OCR falló: {e}")
            return False

    @staticmethod
    def extraer_texto_con_diagnostico(imagen: Image.Image, timeout: int = 30) -> Tuple[str, str]:
        """Extracción con diagnóstico detallado y fallback de idiomas"""
        diagnostico = ""

        try:
            # Verificar configuración primero
            OCRProcessor.verificar_configuracion_tesseract()

            if imagen is None:
                return "", "Imagen es None"

            if imagen.size[0] == 0 or imagen.size[1] == 0:
                return "", f"Imagen tiene tamaño inválido: {imagen.size}"

            diagnostico += f"Imagen OK: {imagen.size[0]}x{imagen.size[1]} "

            # Intentar con español primero (mejor para facturas dominicanas)
            try:
                texto = pytesseract.image_to_string(
                    imagen,
                    lang='spa',
                    config=OCRProcessor.FAST_CONFIG,
                    timeout=timeout
                )

                if texto and texto.strip():
                    diagnostico += f"OCR español exitoso: {len(texto)} caracteres"
                    return texto, diagnostico
                else:
                    diagnostico += "OCR español: texto vacío, probando inglés... "
            except Exception as e:
                diagnostico += f"OCR español falló: {str(e)[:50]}, probando inglés... "

            # Fallback a inglés
            try:
                texto = pytesseract.image_to_string(
                    imagen,
                    lang='eng',
                    config=OCRProcessor.FAST_CONFIG,
                    timeout=timeout
                )

                if texto and texto.strip():
                    diagnostico += f"OCR inglés exitoso: {len(texto)} caracteres"
                    return texto, diagnostico
                else:
                    diagnostico += "OCR inglés: texto vacío"
                    return "", diagnostico

            except Exception as e:
                diagnostico += f"OCR inglés falló: {str(e)}"
                return "", diagnostico

        except Exception as e:
            diagnostico += f"Error crítico OCR: {str(e)}"
            return "", diagnostico

    @staticmethod
    def procesar_pdf_con_diagnostico(ruta_pdf: str, dpi: int = 200) -> Tuple[str, List[str]]:
        """Procesamiento PDF con diagnóstico completo"""
        diagnosticos = []

        try:
            if not os.path.exists(ruta_pdf):
                diagnosticos.append(f"Archivo no existe: {ruta_pdf}")
                return "", diagnosticos

            file_size = os.path.getsize(ruta_pdf)
            if file_size == 0:
                diagnosticos.append(f"Archivo vacío: {ruta_pdf}")
                return "", diagnosticos

            diagnosticos.append(f"Archivo OK: {file_size} bytes")

            try:
                imagenes = convert_from_path(ruta_pdf, dpi=dpi)
                diagnosticos.append(f"Conversión exitosa: {len(imagenes)} página(s)")
            except Exception as e:
                diagnosticos.append(f"Error conversión PDF: {str(e)}")
                return "", diagnosticos

            if not imagenes:
                diagnosticos.append("No se generaron imágenes del PDF")
                return "", diagnosticos

            textos_paginas = []
            for i, imagen in enumerate(imagenes):
                texto_pagina, diag_pagina = OCRProcessor.extraer_texto_con_diagnostico(imagen)
                diagnosticos.append(f"Página {i+1}: {diag_pagina}")

                if texto_pagina and texto_pagina.strip():
                    textos_paginas.append(texto_pagina)

                del imagen

            texto_completo = ' '.join(textos_paginas)

            if texto_completo.strip():
                diagnosticos.append(f"Texto extraído: {len(texto_completo)} caracteres total")
                return texto_completo, diagnosticos
            else:
                diagnosticos.append("No se extrajo texto de ninguna página")
                return "", diagnosticos

        except Exception as e:
            diagnosticos.append(f"Error crítico: {str(e)}")
            return "", diagnosticos

class BatchSheetsManager:
    """Manejo por lotes de Google Sheets para reducir latencia"""

    def __init__(self, spreadsheet):
        self.spreadsheet = spreadsheet
        self._facturas_batch = []
        self._metadata_batch = []
        self._batch_size = 50

    def add_factura(self, factura_data: List):
        """Añade factura al lote"""
        self._facturas_batch.append(factura_data)

    def add_metadata(self, metadata_data: List):
        """Añade metadata al lote"""
        self._metadata_batch.append(metadata_data)

    def flush_batch(self):
        """Envía lote completo a Google Sheets"""
        try:
            if self._facturas_batch:
                facturas_sheet = self.spreadsheet.worksheet("Facturas_Digitalizadas")
                facturas_sheet.append_rows(self._facturas_batch, value_input_option='USER_ENTERED')
                print(f"Lote enviado: {len(self._facturas_batch)} facturas")
                self._facturas_batch.clear()

            if self._metadata_batch:
                metadata_sheet = self.spreadsheet.worksheet("Metadata")
                metadata_sheet.append_rows(self._metadata_batch, value_input_option='USER_ENTERED')
                print(f"Lote enviado: {len(self._metadata_batch)} metadata")
                self._metadata_batch.clear()

        except Exception as e:
            print(f"Error enviando lote: {e}")

    def should_flush(self) -> bool:
        """Verifica si debe enviar el lote"""
        return (len(self._facturas_batch) >= self._batch_size or
                len(self._metadata_batch) >= self._batch_size)

class PerformanceMonitor:
    """Monitor de rendimiento"""

    def __init__(self):
        self.reset_stats()

    def reset_stats(self):
        self.stats = {
            'total_archivos': 0,
            'exitosos': 0,
            'fallidos': 0,
            'tiempo_total': 0.0,
            'tiempo_promedio': 0.0,
            'archivos_por_minuto': 0.0,
            'memoria_maxima': 0.0
        }

    def record_file_processed(self, tiempo_archivo: float, exitoso: bool):
        """Registra estadísticas de archivo procesado"""
        self.stats['total_archivos'] += 1
        self.stats['tiempo_total'] += tiempo_archivo

        if exitoso:
            self.stats['exitosos'] += 1
        else:
            self.stats['fallidos'] += 1

        if self.stats['total_archivos'] > 0:
            self.stats['tiempo_promedio'] = self.stats['tiempo_total'] / self.stats['total_archivos']
            self.stats['archivos_por_minuto'] = 60.0 / self.stats['tiempo_promedio'] if self.stats['tiempo_promedio'] > 0 else 0

        memoria_actual = psutil.virtual_memory().percent
        self.stats['memoria_maxima'] = max(self.stats['memoria_maxima'], memoria_actual)

def test_sistema_ocr():
    """Prueba diagnóstica del sistema OCR completo"""
    print("EJECUTANDO DIAGNÓSTICO DEL SISTEMA OCR...")

    print("\n1. Probando configuración de Tesseract...")
    OCRProcessor.verificar_configuracion_tesseract()

    print("\n2. Probando OCR básico...")
    if OCRProcessor.test_ocr_simple():
        print("   OCR básico funciona correctamente")
    else:
        print("   OCR básico falla - revisar instalación de Tesseract")
        return False

    print("\n3. Probando conversión PDF...")
    try:
        from PIL import Image, ImageDraw
        img = Image.new('RGB', (500, 300), color='white')
        draw = ImageDraw.Draw(img)

        draw.text((50, 50), "FACTURA DE PRUEBA", fill='black')
        draw.text((50, 80), "NCF: E31000001234", fill='black')
        draw.text((50, 110), "RNC Cliente: 123456789", fill='black')
        draw.text((50, 140), "VALOR A PAGAR $ 1.500,00", fill='black')
        draw.text((50, 170), "FECHA FACTURA: 14.05.2025", fill='black')
        draw.text((50, 200), "CREDITO", fill='black')

        texto, diag = OCRProcessor.extraer_texto_con_diagnostico(img)
        print(f"   Diagnóstico: {diag}")

        if texto and any(word in texto.upper() for word in ["FACTURA", "NCF", "E31", "VALOR", "RNC"]):
            print("   Conversión y OCR funcionan correctamente")
            print(f"   Texto extraído: {texto[:100].replace(chr(10), ' ')}...")
            return True
        else:
            print("   OCR funciona pero no extrae texto esperado")
            print(f"   Texto completo: {texto}")
            return False

    except Exception as e:
        print(f"   Error en test de conversión: {e}")
        return False

    print("\nDIAGNÓSTICO COMPLETADO")

print("Clases OCR inicializadas")

Clases OCR inicializadas


#**Celda #3: Orquesta todo el proceso de OCR y guarda en Google Sheets**

In [3]:
class DocumentProcessor:
    """Procesador OCR con diagnóstico avanzado para resolver problemas"""

    def __init__(self, drive_base_path: str, sheet_name: str, enable_debug: bool = False):
        self.base_path = drive_base_path
        self.input_path = os.path.join(self.base_path, "Input_MVP")
        self.debug_path = os.path.join(self.base_path, "Debug_Output") if enable_debug else None
        self.enable_debug = enable_debug

        # Crear directorios necesarios
        os.makedirs(self.input_path, exist_ok=True)
        if self.enable_debug:
            os.makedirs(self.debug_path, exist_ok=True)

        # Inicializar monitor de rendimiento
        self.performance_monitor = PerformanceMonitor()

        # Configurar servicios de Google
        self._setup_google_services(sheet_name)

        # Inicializar manager de lotes
        self.batch_manager = BatchSheetsManager(self.spreadsheet)

        # Cache para registros existentes
        self._load_existing_records_cache()

        # Verificación diagnóstica del sistema
        # self._diagnostic_system_check()

    def _diagnostic_system_check(self):
        """Verificación diagnóstica del sistema OCR"""
        print("\nDIAGNÓSTICO DEL SISTEMA OCR:")

        # Verificar configuración de Tesseract primero
        OCRProcessor.verificar_configuracion_tesseract()

        if os.path.exists(self.input_path):
            archivos = [f for f in os.listdir(self.input_path) if f.lower().endswith('.pdf')]
            print(f"   Archivos PDF encontrados: {len(archivos)}")

            if archivos:
                archivo_prueba = archivos[0]
                ruta_prueba = os.path.join(self.input_path, archivo_prueba)
                print(f"   Probando con archivo: {archivo_prueba}")

                texto, diagnosticos = OCRProcessor.procesar_pdf_con_diagnostico(ruta_prueba, dpi=200)

                for diag in diagnosticos:
                    print(f"   {diag}")

                if texto and len(texto.strip()) > 10:
                    print(f"   Sistema OCR funcionando - extrajo {len(texto)} caracteres")

                    # Verificar si extrae contenido relevante de factura
                    texto_upper = texto.upper()
                    palabras_clave = ["FACTURA", "NCF", "RNC", "VALOR", "TOTAL", "PAGAR", "FECHA"]
                    palabras_encontradas = [p for p in palabras_clave if p in texto_upper]

                    if palabras_encontradas:
                        print(f"   Contenido relevante detectado: {', '.join(palabras_encontradas)}")
                    else:
                        print("   Texto extraído pero sin palabras clave de factura")

                    if self.enable_debug:
                        debug_file = os.path.join(self.debug_path, "diagnostic_output.txt")
                        with open(debug_file, 'w', encoding='utf-8') as f:
                            f.write(f"DIAGNÓSTICO - {archivo_prueba}\n")
                            f.write("=" * 50 + "\n")
                            for diag in diagnosticos:
                                f.write(f"{diag}\n")
                            f.write(f"\nPALABRAS CLAVE ENCONTRADAS: {', '.join(palabras_encontradas)}\n")
                            f.write("\nTEXTO EXTRAÍDO:\n")
                            f.write(texto)
                        print(f"   Debug guardado en: {debug_file}")
                else:
                    print(f"   Sistema OCR con problemas - revisar instalaciones")
        else:
            print(f"   Carpeta no existe: {self.input_path}")

    def _setup_google_services(self, sheet_name: str):
        """Configuración de servicios Google"""
        try:
            print("Autenticando servicios de Google...")
            auth.authenticate_user()
            creds, _ = default()

            gc = gspread.authorize(creds)
            self.spreadsheet = gc.open(sheet_name)
            self.drive_service = build('drive', 'v3', credentials=creds)

            self._ensure_sheets_exist()

            print("Servicios Google configurados exitosamente")

        except Exception as e:
            print(f"Error configurando servicios: {e}")
            raise e

    def _ensure_sheets_exist(self):
        """Asegurar que las hojas necesarias existen"""
        required_sheets = {
            "Metadata": ["nombre_archivo", "link_archivo_drive", "timestamp_proceso",
                        "estado_ocr", "error", "campos_extraidos", "confianza_extraccion", "tiempo_procesamiento"],
            "Facturas_Digitalizadas": ["ncf_factura", "cliente_rnc", "cliente_nombre", "fecha_emision",
                                     "fecha_vencimiento", "tipo_pago", "valor_pagar", "vendedor",
                                     "nombre_archivo", "link_archivo_drive"]
        }

        for sheet_name, headers in required_sheets.items():
            try:
                self.spreadsheet.worksheet(sheet_name)
            except:
                print(f"Creando hoja {sheet_name}...")
                new_sheet = self.spreadsheet.add_worksheet(sheet_name, 1000, len(headers))
                new_sheet.append_row(headers)

    def _load_existing_records_cache(self):
        """Cache de registros existentes"""
        self.existing_records = {}
        try:
            metadata_sheet = self.spreadsheet.worksheet("Metadata")
            records = metadata_sheet.get_all_records()

            for record in records:
                filename = record.get('nombre_archivo', '')
                if filename:
                    self.existing_records[filename] = {
                        'estado': record.get('estado_ocr', ''),
                        'confianza': record.get('confianza_extraccion', ''),
                        'tiempo': record.get('tiempo_procesamiento', 0)
                    }

            print(f"Cache cargado: {len(self.existing_records)} registros existentes")

        except Exception as e:
            print(f"Error cargando cache: {e}")
            self.existing_records = {}

    def _should_skip_file(self, filename: str) -> bool:
        """Determina si debe saltar el archivo basado en cache"""
        if filename not in self.existing_records:
            return False

        record = self.existing_records[filename]
        estado = record.get('estado', '')
        confianza = record.get('confianza', '')

        # Saltar solo si es exitoso con alta confianza
        return estado == "Exitoso" and confianza == "Alta"

    def _get_drive_files_info(self) -> dict:
        """Obtener info de archivos de Drive con menos llamadas API"""
        try:
            folder_name = os.path.basename(self.input_path)
            parent_folder_name = os.path.basename(os.path.dirname(self.input_path))

            query = f"name='{folder_name}' and mimeType='application/vnd.google-apps.folder' and trashed=false"
            response = self.drive_service.files().list(q=query, fields='files(id, name, parents)').execute()
            folders = response.get('files', [])

            if not folders:
                return {}

            folder_id = folders[0].get('id')

            files_query = f"'{folder_id}' in parents and trashed=false"
            files_response = self.drive_service.files().list(q=files_query, fields='files(id, name)').execute()
            files = files_response.get('files', [])

            print(f"{len(files)} archivos encontrados en Drive")
            return {file.get('name'): file.get('id') for file in files}

        except Exception as e:
            print(f"Error buscando archivos en Drive: {e}")
            return {}

    def _extraer_datos(self, texto_completo: str, nombre_archivo: str, link_drive: str) -> Tuple[FacturaPrincipal, int, str]:
        """Extracción de datos con mínimas operaciones"""
        inicio_extraccion = time.time()
        factura = FacturaPrincipal(nombre_archivo=nombre_archivo, link_archivo_drive=link_drive)

        texto_limpio = texto_completo.replace('\n', ' ').replace('  ', ' ').strip()

        campos_extraidos = 0

        # Extraer campos críticos
        ncf = PatternExtractor.extraer_campo(texto_limpio, 'ncf')
        if ncf:
            factura.ncf_factura = ncf
            campos_extraidos += 1

        rnc = PatternExtractor.extraer_campo(texto_limpio, 'rnc')
        if rnc:
            factura.cliente_rnc = rnc
            campos_extraidos += 1

        valor_raw = PatternExtractor.extraer_campo(texto_limpio, 'valor_pagar')
        if valor_raw:
            factura.valor_pagar = ValueNormalizer.normalizar_valor_monetario(valor_raw)
            if factura.valor_pagar > 0:
                campos_extraidos += 1

        fecha_emision = PatternExtractor.extraer_campo(texto_limpio, 'fecha_emision')
        if fecha_emision:
            factura.fecha_emision = fecha_emision
            campos_extraidos += 1

        # Campos secundarios
        tipo_pago = PatternExtractor.extraer_campo(texto_limpio, 'tipo_pago')
        if tipo_pago:
            factura.tipo_pago = tipo_pago
            campos_extraidos += 1

        nombre_cliente = PatternExtractor.extraer_campo(texto_limpio, 'nombre_cliente')
        if nombre_cliente:
            factura.cliente_nombre = nombre_cliente
            campos_extraidos += 1

        vendedor = PatternExtractor.extraer_campo(texto_limpio, 'vendedor')
        if vendedor:
            factura.vendedor = vendedor
            campos_extraidos += 1

        # Calcular fecha vencimiento
        if factura.fecha_emision != "No encontrado" and factura.tipo_pago != "No encontrado":
            factura.fecha_vencimiento = self._calcular_vencimiento(factura.fecha_emision, factura.tipo_pago)
            if factura.fecha_vencimiento != "No encontrado":
                campos_extraidos += 1

        # Calcular confianza
        campos_criticos = [factura.ncf_factura, factura.cliente_rnc, factura.valor_pagar, factura.fecha_emision]
        campos_criticos_exitosos = sum(1 for campo in campos_criticos if campo != "No encontrado" and campo != 0.0)

        if campos_criticos_exitosos >= 3:
            confianza = "Alta"
        elif campos_criticos_exitosos >= 2:
            confianza = "Media"
        else:
            confianza = "Baja"

        tiempo_extraccion = time.time() - inicio_extraccion

        return factura, campos_extraidos, confianza

    def _calcular_vencimiento(self, fecha_emision: str, tipo_pago: str) -> str:
        """Cálculo de vencimiento"""
        try:
            if tipo_pago.upper() == "CONTADO":
                return fecha_emision
            elif tipo_pago.upper() == "CREDITO":
                fecha_base = datetime.strptime(fecha_emision, "%d.%m.%Y")
                fecha_vencimiento = fecha_base + timedelta(days=30)
                return fecha_vencimiento.strftime("%d.%m.%Y")
        except:
            pass
        return "No encontrado"

    def _validar_factura(self, factura: FacturaPrincipal) -> Tuple[bool, str]:
        """Validación de factura"""
        # Validación NCF
        ncf_valido = (factura.ncf_factura != "No encontrado" and
                     factura.ncf_factura.startswith('E31') and
                     len(factura.ncf_factura) == 13)

        # Validación RNC
        rnc_limpio = re.sub(r'[-\s]', '', factura.cliente_rnc) if factura.cliente_rnc != "No encontrado" else ""
        rnc_valido = len(rnc_limpio) in [9, 11] and rnc_limpio.isdigit()

        # Validación valor
        valor_valido = factura.valor_pagar > 0

        # Lógica de identificación
        if ncf_valido and valor_valido:
            return True, "NCF_VALIDO"
        elif rnc_valido and valor_valido:
            fecha_limpia = factura.fecha_emision.replace('.', '') if factura.fecha_emision != "No encontrado" else "NOFECHA"
            factura.ncf_factura = f"RNC_{rnc_limpio}_{fecha_limpia}"
            return True, "RNC_ALTERNATIVO"
        else:
            errores = []
            if not ncf_valido and not rnc_valido:
                errores.append("Sin identificador válido")
            if not valor_valido:
                errores.append(f"Valor inválido: {factura.valor_pagar}")
            return False, " | ".join(errores)

    def procesar_lote(self, archivos_lote: List[str], drive_files_info: dict):
        """Procesamiento de lote con máxima eficiencia y diagnóstico"""
        print(f"Procesando lote de {len(archivos_lote)} archivos...")

        archivos_procesados = 0
        archivos_saltados = 0

        for nombre_archivo in archivos_lote:
            inicio_archivo = time.time()

            if self._should_skip_file(nombre_archivo):
                print(f"Saltando: {nombre_archivo} (ya procesado exitosamente)")
                archivos_saltados += 1
                continue

            file_id = drive_files_info.get(nombre_archivo)
            link_drive = f"https://drive.google.com/file/d/{file_id}/view" if file_id else "No encontrado"

            print(f"Procesando: {nombre_archivo}")

            try:
                ruta_completa = os.path.join(self.input_path, nombre_archivo)

                texto_completo, diagnosticos = OCRProcessor.procesar_pdf_con_diagnostico(ruta_completa, dpi=200)

                if self.enable_debug:
                    for diag in diagnosticos[-2:]:
                        print(f"   {diag}")

                if not texto_completo or not texto_completo.strip():
                    error_msg = "No se pudo extraer texto del PDF"
                    if diagnosticos:
                        error_msg += f" - {diagnosticos[-1]}"
                    raise Exception(error_msg)

                if self.enable_debug:
                    debug_file = os.path.join(self.debug_path, f"{os.path.splitext(nombre_archivo)[0]}_ocr.txt")
                    with open(debug_file, 'w', encoding='utf-8') as f:
                        f.write(f"DIAGNÓSTICO - {nombre_archivo}\n")
                        f.write("=" * 50 + "\n")
                        for diag in diagnosticos:
                            f.write(f"{diag}\n")
                        f.write("\nTEXTO EXTRAÍDO:\n")
                        f.write(texto_completo)

                factura, campos_extraidos, confianza = self._extraer_datos(
                    texto_completo, nombre_archivo, link_drive
                )

                es_valida, detalle = self._validar_factura(factura)

                tiempo_archivo = time.time() - inicio_archivo

                metadata_data = [
                    nombre_archivo, link_drive, datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    "Exitoso" if es_valida else "Fallido",
                    "" if es_valida else detalle,
                    campos_extraidos, confianza, round(tiempo_archivo, 2)
                ]

                if es_valida:
                    factura_data = [
                        factura.ncf_factura, factura.cliente_rnc, factura.cliente_nombre,
                        factura.fecha_emision, factura.fecha_vencimiento, factura.tipo_pago,
                        f"${factura.valor_pagar:,.2f}", factura.vendedor,
                        nombre_archivo, link_drive
                    ]
                    self.batch_manager.add_factura(factura_data)
                    print(f"   Exitoso: {detalle} - ${factura.valor_pagar:,.2f} ({tiempo_archivo:.1f}s)")
                else:
                    print(f"   Fallido: {detalle} ({tiempo_archivo:.1f}s)")

                self.batch_manager.add_metadata(metadata_data)

                if self.batch_manager.should_flush():
                    self.batch_manager.flush_batch()

                self.performance_monitor.record_file_processed(tiempo_archivo, es_valida)
                archivos_procesados += 1

            except Exception as e:
                tiempo_archivo = time.time() - inicio_archivo
                error_msg = str(e)[:200]

                metadata_data = [
                    nombre_archivo, link_drive, datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    "Fallido", error_msg, 0, "Baja", round(tiempo_archivo, 2)
                ]
                self.batch_manager.add_metadata(metadata_data)

                print(f"   Error: {nombre_archivo} - {error_msg} ({tiempo_archivo:.1f}s)")
                self.performance_monitor.record_file_processed(tiempo_archivo, False)
                archivos_procesados += 1

            if archivos_procesados % 5 == 0:
                gc.collect()

        self.batch_manager.flush_batch()

        print(f"Lote completado: {archivos_procesados} procesados, {archivos_saltados} saltados")

print("Procesador OCR inicializado")

Procesador OCR inicializado


#**Celda #4: Hace el despligue final del proceso**


In [4]:
def ejecutar_proceso_ocr(
    nombre_base_de_datos: str = "BaseDatos_Facturas",
    tamano_lote_inicial: int = 20,
    pausa_entre_lotes_seg: int = 1,
    enable_debug: bool = False,
    adaptive_batching: bool = True
):
    """Ejecutor principal"""

    print("SISTEMA OCR INICIADO")

    metricas_globales = {
        'inicio_proceso': time.time(),
        'total_archivos_encontrados': 0,
        'archivos_nuevos': 0,
        'archivos_saltados': 0,
        'archivos_reprocesados': 0,
        'exitosos_ncf': 0,
        'exitosos_rnc': 0,
        'fallidos': 0,
        'valor_total_extraido': 0.0,
        'tiempo_ahorrado_cache': 0.0,
        'lotes_procesados': 0,
        'memoria_promedio': 0.0,
        'velocidad_objetivo': 15.0
    }

    # Inicialización con diagnóstico
    try:
        from google.colab import drive
        print("\nMontando Google Drive...")
        drive.mount('/content/drive', force_remount=True)

        procesador = DocumentProcessor(
            drive_base_path="/content/drive/MyDrive/OCR_Facturas",
            sheet_name=nombre_base_de_datos,
            enable_debug=enable_debug
        )

        print(f"Procesador inicializado (Debug: {'ON' if enable_debug else 'OFF'})")

    except Exception as e:
        print(f"Error durante inicialización: {e}")
        return

    print(f"\nANÁLISIS INICIAL")

    drive_files_info = procesador._get_drive_files_info()
    archivos_locales = [f for f in os.listdir(procesador.input_path) if f.lower().endswith('.pdf')]

    if not archivos_locales:
        print("No se encontraron archivos PDF en la carpeta de entrada")
        return

    metricas_globales['total_archivos_encontrados'] = len(archivos_locales)

    # Análisis de cache
    archivos_nuevos = []
    archivos_saltados = []

    for archivo in archivos_locales:
        if procesador._should_skip_file(archivo):
            archivos_saltados.append(archivo)
            metricas_globales['archivos_saltados'] += 1
            metricas_globales['tiempo_ahorrado_cache'] += 20
        else:
            archivos_nuevos.append(archivo)
            metricas_globales['archivos_nuevos'] += 1

    print(f"   Total encontrados: {len(archivos_locales)}")
    print(f"   Nuevos a procesar: {len(archivos_nuevos)}")
    print(f"   Saltados (cache): {len(archivos_saltados)}")
    print(f"   Tiempo ahorrado: {metricas_globales['tiempo_ahorrado_cache']/60:.1f} min")

    if not archivos_nuevos:
        print("\nTODOS LOS ARCHIVOS YA PROCESADOS EXITOSAMENTE")
        print(f"Tiempo total ahorrado: {metricas_globales['tiempo_ahorrado_cache']/60:.1f} minutos")
        return

    # Configuración de lotes adaptativos
    tamano_lote_actual = tamano_lote_inicial
    lotes = [archivos_nuevos[i:i + tamano_lote_actual] for i in range(0, len(archivos_nuevos), tamano_lote_actual)]

    print(f"\nCONFIGURACIÓN DE PROCESAMIENTO")
    print(f"   Total lotes: {len(lotes)}")
    print(f"   Tamaño lote inicial: {tamano_lote_actual}")
    print(f"   Pausa entre lotes: {pausa_entre_lotes_seg}s")
    print(f"   Ajuste automático: {'ON' if adaptive_batching else 'OFF'}")
    print(f"   Debug mode: {'ON' if enable_debug else 'OFF'}")

    print(f"\n" + "=" * 80)
    print(f"INICIANDO PROCESAMIENTO")
    print(f"=" * 80)

    tiempo_inicio_procesamiento = time.time()

    for i, lote_actual in enumerate(lotes):
        tiempo_inicio_lote = time.time()
        metricas_globales['lotes_procesados'] += 1

        print(f"\nLOTE #{i+1}/{len(lotes)} - {len(lote_actual)} archivos")
        print(f"   Progreso global: {(i/len(lotes))*100:.1f}%")

        # Monitoreo de memoria pre-lote
        memoria_pre = psutil.virtual_memory().percent
        if memoria_pre > 80:
            print(f"   Liberando memoria (uso: {memoria_pre:.1f}%)...")
            gc.collect()
            memoria_post = psutil.virtual_memory().percent
            print(f"   Memoria optimizada: {memoria_pre:.1f}% -> {memoria_post:.1f}%")

        # Procesar lote con diagnóstico
        procesador.procesar_lote(lote_actual, drive_files_info)

        tiempo_lote = time.time() - tiempo_inicio_lote
        velocidad_lote = len(lote_actual) / (tiempo_lote / 60)

        print(f"   Tiempo lote: {tiempo_lote:.1f}s")
        print(f"   Velocidad: {velocidad_lote:.1f} archivos/min")

        # Ajuste adaptativo de lotes
        if adaptive_batching and i < len(lotes) - 1:
            if velocidad_lote > metricas_globales['velocidad_objetivo'] * 1.2:
                nuevo_tamano = min(tamano_lote_actual + 5, 30)
                if nuevo_tamano != tamano_lote_actual:
                    print(f"   Ajuste: Incrementando lote {tamano_lote_actual} -> {nuevo_tamano}")
                    tamano_lote_actual = nuevo_tamano
            elif velocidad_lote < metricas_globales['velocidad_objetivo'] * 0.8:
                nuevo_tamano = max(tamano_lote_actual - 3, 5)
                if nuevo_tamano != tamano_lote_actual:
                    print(f"   Ajuste: Reduciendo lote {tamano_lote_actual} -> {nuevo_tamano}")
                    tamano_lote_actual = nuevo_tamano

        # Pausa entre lotes
        if i < len(lotes) - 1:
            print(f"   Pausa: {pausa_entre_lotes_seg}s...")
            time.sleep(pausa_entre_lotes_seg)

    # Información de debug
    if enable_debug:
        print(f"\nARCHIVOS DE DEBUG GENERADOS:")
        if procesador.debug_path and os.path.exists(procesador.debug_path):
            debug_files = [f for f in os.listdir(procesador.debug_path) if f.endswith('.txt')]
            print(f"   Ubicación: {procesador.debug_path}")
            print(f"   Archivos generados: {len(debug_files)}")
            if debug_files:
                print(f"   Revisar estos archivos para análisis detallado de problemas")
                for i, debug_file in enumerate(debug_files[:3]):
                    print(f"      {i+1}. {debug_file}")
                if len(debug_files) > 3:
                    print(f"      ... y {len(debug_files) - 3} más")

# EJECUTAR PROCESO
print("Iniciando proceso OCR completo...")

metricas_resultado = ejecutar_proceso_ocr(
    nombre_base_de_datos="BaseDatos_Facturas",
    tamano_lote_inicial=20,
    pausa_entre_lotes_seg=1,
    enable_debug=False,
    adaptive_batching=True
)

print("\nPROCESO OCR FINALIZADO")

Iniciando proceso OCR completo...
SISTEMA OCR INICIADO

Montando Google Drive...
Mounted at /content/drive
Autenticando servicios de Google...
Servicios Google configurados exitosamente
Cache cargado: 0 registros existentes
Procesador inicializado (Debug: OFF)

ANÁLISIS INICIAL
6 archivos encontrados en Drive
   Total encontrados: 6
   Nuevos a procesar: 6
   Saltados (cache): 0
   Tiempo ahorrado: 0.0 min

CONFIGURACIÓN DE PROCESAMIENTO
   Total lotes: 1
   Tamaño lote inicial: 20
   Pausa entre lotes: 1s
   Ajuste automático: ON
   Debug mode: OFF

INICIANDO PROCESAMIENTO

LOTE #1/1 - 6 archivos
   Progreso global: 0.0%
Procesando lote de 6 archivos...
Procesando: Registro_Files__33802265.Anexo.135430.pdf
   Exitoso: NCF_VALIDO - $36,590.57 (25.4s)
Procesando: Registro_Files__33802279.Anexo.141147.pdf
   Exitoso: RNC_ALTERNATIVO - $91,376.37 (4.8s)
Procesando: Registro_Files__30802262.Anexo.135014.pdf
   Exitoso: NCF_VALIDO - $28,164.72 (13.7s)
Procesando: Registro_Files__33802283.An