#### Cocereales 

In [108]:
pip install pymupdf pdfplumber tabula-py camelot-py[cv] pytesseract Pillow pandas camelot

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [109]:
# Lectura de pdf
import fitz  
import pdfplumber
import tabula
import pytesseract
from PIL import Image
import pandas as pd
import io
import re
import pandas as pd
import unicodedata

In [110]:
def limpiar_columnas(df):
    """
    Limpia los nombres de las columnas de un DataFrame:
    - Elimina espacios al inicio y final
    - Convierte a minúsculas
    - Quita tildes y acentos
    - Reemplaza espacios y caracteres especiales por guiones bajos
    - Elimina caracteres no alfanuméricos excepto guiones bajos
    - Evita guiones bajos duplicados
    """
    def normalizar_texto(texto):
        # Quitar tildes y acentos usando unicodedata
        texto_sin_acentos = unicodedata.normalize('NFD', texto)
        texto_sin_acentos = ''.join(c for c in texto_sin_acentos if unicodedata.category(c) != 'Mn')
        return texto_sin_acentos
    
    df.columns = (
        df.columns
        .str.strip()                                    # Quitar espacios al inicio y final
        .str.lower()                                    # Convertir a minúsculas
        .map(normalizar_texto)                          # Quitar tildes y acentos
        .str.replace(r'[\s\-]+', '_', regex=True)       # Espacios y guiones por guiones bajos
        .str.replace(r'[^\w]', '_', regex=True)         # Caracteres especiales por guiones bajos
        .str.replace(r'_+', '_', regex=True)            # Múltiples guiones bajos por uno solo
        .str.strip('_')                                 # Quitar guiones bajos al inicio y final
    )
    return df

In [111]:


class PDFExtractorAdaptativo:
    def __init__(self):
        self.metodos_texto = ['pdfplumber', 'pymupdf', 'ocr']
        self.metodos_tablas = ['tabula', 'pdfplumber']  # Quitamos 'camelot'
    
    def detectar_tipo_contenido(self, archivo_pdf):
        """Detecta qué tipo de contenido tiene el PDF"""
        try:
            doc = fitz.open(archivo_pdf)
            primera_pagina = doc[0]
            
            texto = primera_pagina.get_text().strip()
            imagenes = primera_pagina.get_images()
            
            with pdfplumber.open(archivo_pdf) as pdf:
                page = pdf.pages[0]
                tablas_detectadas = page.find_tables()
            
            resultado = {
                'tiene_texto': len(texto) > 50,
                'tiene_imagenes': len(imagenes) > 0,
                'posibles_tablas': len(tablas_detectadas) > 0,
                'es_escaneado': len(texto) < 50 and len(imagenes) > 0,
                'calidad_texto': 'alta' if len(texto) > 200 else 'baja'
            }
            
            doc.close()
            return resultado
            
        except Exception as e:
            return {'error': str(e)}
    
    def extraer_texto_robusto(self, archivo_pdf):
        """Extrae texto probando diferentes métodos"""
        metodos_resultados = {}
        
        # pdfplumber
        try:
            with pdfplumber.open(archivo_pdf) as pdf:
                texto = ""
                for page in pdf.pages:
                    texto += page.extract_text() + "\n"
                metodos_resultados['pdfplumber'] = texto
        except Exception as e:
            metodos_resultados['pdfplumber'] = f"Error: {e}"
        
        # pymupdf
        try:
            doc = fitz.open(archivo_pdf)
            texto = ""
            for page in doc:
                texto += page.get_text() + "\n"
            doc.close()
            metodos_resultados['pymupdf'] = texto
        except Exception as e:
            metodos_resultados['pymupdf'] = f"Error: {e}"
        
        # OCR
        try:
            doc = fitz.open(archivo_pdf)
            texto = ""
            for page_num in range(len(doc)):
                page = doc.load_page(page_num)
                pix = page.get_pixmap()
                img_data = pix.tobytes("png")
                img = Image.open(io.BytesIO(img_data))
                texto += pytesseract.image_to_string(img, lang='spa') + "\n"
            doc.close()
            metodos_resultados['ocr'] = texto
        except Exception as e:
            metodos_resultados['ocr'] = f"Error: {e}"
        
        # Seleccionar mejor resultado
        mejor_resultado = ""
        max_longitud = 0
        for metodo, resultado in metodos_resultados.items():
            if not resultado.startswith("Error") and len(resultado) > max_longitud:
                mejor_resultado = resultado
                max_longitud = len(resultado)
        
        return {
            'texto_final': mejor_resultado,
            'metodos_intentados': metodos_resultados,
            'metodo_exitoso': max([k for k, v in metodos_resultados.items() 
                                 if not v.startswith("Error") and len(v) == max_longitud], 
                                default="ninguno")
        }
    
    def extraer_tablas_robusto(self, archivo_pdf):
        """Extrae tablas probando diferentes métodos (sin camelot)"""
        resultados_tablas = {}
        
        # tabula-py
        try:
            tablas = tabula.read_pdf(archivo_pdf, pages='all', multiple_tables=True)
            resultados_tablas['tabula'] = {
                'tablas': tablas,
                'cantidad': len(tablas),
                'exito': True
            }
        except Exception as e:
            resultados_tablas['tabula'] = {'error': str(e), 'exito': False}
        
        # pdfplumber
        try:
            with pdfplumber.open(archivo_pdf) as pdf:
                todas_tablas = []
                for page in pdf.pages:
                    tablas_pagina = page.extract_tables()
                    for tabla in tablas_pagina:
                        df = pd.DataFrame(tabla[1:], columns=tabla[0])
                        todas_tablas.append(df)
                
                resultados_tablas['pdfplumber'] = {
                    'tablas': todas_tablas,
                    'cantidad': len(todas_tablas),
                    'exito': True
                }
        except Exception as e:
            resultados_tablas['pdfplumber'] = {'error': str(e), 'exito': False}
        
        return resultados_tablas
    
    def procesar_pdf_completo(self, archivo_pdf):
        """Proceso completo adaptativo"""
        print(f"Analizando: {archivo_pdf}")
        
        info_contenido = self.detectar_tipo_contenido(archivo_pdf)
        print(f"Análisis de contenido: {info_contenido}")
        
        resultado_final = {
            'archivo': archivo_pdf,
            'analisis_contenido': info_contenido,
            'texto': None,
            'tablas': None
        }
        
        if info_contenido.get('tiene_texto') or info_contenido.get('es_escaneado'):
            print("Extrayendo texto...")
            resultado_texto = self.extraer_texto_robusto(archivo_pdf)
            resultado_final['texto'] = resultado_texto
        
        if info_contenido.get('posibles_tablas'):
            print("Extrayendo tablas...")
            resultado_tablas = self.extraer_tablas_robusto(archivo_pdf)
            resultado_final['tablas'] = resultado_tablas
        
        return resultado_final

# Ejemplo de uso
if __name__ == "__main__":
    extractor = PDFExtractorAdaptativo()
    resultado = extractor.procesar_pdf_completo("resources/COCEREALES.pdf")

CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox


Analizando: resources/COCEREALES.pdf
Análisis de contenido: {'tiene_texto': True, 'tiene_imagenes': True, 'posibles_tablas': True, 'es_escaneado': False, 'calidad_texto': 'alta'}
Extrayendo texto...
Extrayendo tablas...


CropBox missing from /Page, defaulting to MediaBox


In [112]:
#Acceder al texto 
#extraer del diccionario final tablas y texto
textos = resultado.get('texto', {}).get('texto_final', '')
print(type(textos))

#Escritura .txt
with open("COCEREALES.txt", "w", encoding="utf-8") as f:
    f.write(textos)

<class 'str'>


In [113]:

texto =  textos

# Regex para capturar líneas de productos
patron = re.compile(r"""
    (\d+)\s*                      # Item (número inicial)
    (\d+)\s*                      # Ref
    ([A-ZÁÉÍÓÚÑ\s]+?)\s+          # Descripción (texto en mayúsculas, incluyendo espacios y acentos)
    ([A-Z]{3})\s*                # Unidad (3 letras como KGM)
    ([\d.,]+)\s*                 # Cantidad (con coma o punto)
    (\d+)\s*                     # IVA
    ([\d.,]+)\s*                 # Vr Unitario
    ([\d.,]+)                    # Vr Total
""", re.VERBOSE)

# Extraer coincidencias
datos = patron.findall(texto)

# Crear DataFrame
df = pd.DataFrame(datos, columns=[
    "Item", "Ref", "Descripción", "Unidad", "Cantidad", "IVA", "Vr Unitario", "Vr Total"
])

# Opcional: convertir números a formato numérico
df["Cantidad"] = df["Cantidad"].str.replace(".", "", regex=False).str.replace(",", ".").astype(float)
df["Vr Unitario"] = df["Vr Unitario"].str.replace(".", "", regex=False).str.replace(",", ".").astype(float)
df["Vr Total"] = df["Vr Total"].str.replace(".", "", regex=False).str.replace(",", ".").astype(float)
df["IVA"] = df["IVA"].astype(int)

df

Unnamed: 0,Item,Ref,Descripción,Unidad,Cantidad,IVA,Vr Unitario,Vr Total
0,1,157,TOCINO EN TROZOS,KGM,40.0,5,77238.0,3089520.0


#### Escoger Columnas Productos 
1. Numero de item
2. descripcion
3. Cantidad
4. Unidad de medida
5. Fecha de entrega
6. Costo Unitario
7. importe
8. id falso (Unir para realizar insersion de datos)

In [114]:
df_cocereales = limpiar_columnas(df)

In [115]:
#rename columns
df_cocereales.rename(columns={
    'ref': 'NUMERO_DE_ITEM',
    'descripcion': 'DESCRIPCION',
    'cantidad': 'CANTIDAD',
    'unidad': 'UNIDAD_DE_MEDIDA',
    'vr_unitario': 'COSTO_UNITARIO',
    'vr_total': 'IMPORTE'
}, inplace=True)

# Create a falsa Column
df_cocereales["id"] = 1


In [116]:

#select columns
df_cocereales = df_cocereales[['id', 'NUMERO_DE_ITEM', 'DESCRIPCION', 'CANTIDAD', 'UNIDAD_DE_MEDIDA',  'COSTO_UNITARIO', 'IMPORTE']]

In [117]:
df_cocereales

Unnamed: 0,id,NUMERO_DE_ITEM,DESCRIPCION,CANTIDAD,UNIDAD_DE_MEDIDA,COSTO_UNITARIO,IMPORTE
0,1,157,TOCINO EN TROZOS,40.0,KGM,77238.0,3089520.0


In [118]:
# 2. Regex encabezado
encabezado = {
    "Empresa": re.search(r"^(.*?)\n-COCEREALES-", texto).group(1).strip(),
    "NIT Empresa": re.search(r"NIT\s*:\s*([\d, -]+)", texto).group(1).strip(),
    "Dirección Empresa": re.search(r"NIT\s*:\s*[\d, -]+\n(.*?)\n", texto).group(1).strip(),
    "Tel Empresa": re.search(r"\n(\d{7,})\s+-", texto).group(1),
    "Cliente": re.search(r"Señores\s*\n(.*?)\n", texto).group(1).strip(),
    "Dirección Cliente": re.search(r"NIT.*?\n(.*?)\n", texto).group(1).strip(),
    "Ciudad": re.search(r"Teléfono\s*\n(.*?)\n", texto).group(1),
    "Forma de Pago": re.search(r"(CREDITO|CONTADO)", texto).group(1),
    "O.C.": re.search(r"O\.C\. MATERIA PRIMA\s*\n(.*?)\n", texto).group(1).strip()
}

df_encabezado = pd.DataFrame([encabezado]) 

In [119]:
try:
    fecha_comprobante ={
    "Fecha Comprobante": re.search(r"Fecha Comprobante\s*\n(\d{4}-\d{2}-\d{2})", texto).group(1)}
except AttributeError:
    fecha_comprobante = {"Fecha Comprobante": None}

In [120]:
encabezado

{'Empresa': 'COMPAÑIA INDUSTRIAL DE CEREALES S.A.',
 'NIT Empresa': '860,501,848 - 9',
 'Dirección Empresa': 'CR 18A 1G 49',
 'Tel Empresa': '6013334354',
 'Cliente': 'NIT',
 'Dirección Cliente': 'CR 18A 1G 49',
 'Ciudad': 'Ciudad',
 'Forma de Pago': 'CREDITO',
 'O.C.': 'Y - 090 - 5233'}

In [121]:
# Extraemos todas las fechas tipo YYYY-MM-DD
fechas = re.findall(r"\d{4}-\d{2}-\d{2}", texto)

# Buscamos dónde está cada texto asociado
fecha_comprobante = re.search(r"Fecha Comprobante.*?(\d{4}-\d{2}-\d{2})", texto)
fecha_entrega = re.search(r"Fecha Entrega.*?(\d{4}-\d{2}-\d{2})", texto)

# Asignamos si se encuentra, o None
encabezado["Fecha Comprobante"] = fecha_comprobante.group(1) if fecha_comprobante else (fechas[0] if len(fechas) > 0 else None)
encabezado["Fecha Entrega"] = fecha_entrega.group(1) if fecha_entrega else (fechas[1] if len(fechas) > 1 else None)

In [122]:
encabezado

{'Empresa': 'COMPAÑIA INDUSTRIAL DE CEREALES S.A.',
 'NIT Empresa': '860,501,848 - 9',
 'Dirección Empresa': 'CR 18A 1G 49',
 'Tel Empresa': '6013334354',
 'Cliente': 'NIT',
 'Dirección Cliente': 'CR 18A 1G 49',
 'Ciudad': 'Ciudad',
 'Forma de Pago': 'CREDITO',
 'O.C.': 'Y - 090 - 5233',
 'Fecha Comprobante': '2023-10-26',
 'Fecha Entrega': '2023-10-31'}

In [123]:
df_encabezado = pd.DataFrame([encabezado])

### Escoger Columnas -Ordenar dataframe
1. Cliente
2. Direccion Cliente
3. Destino
4. Numero de Pedido
5. Fecha de Orden
6. Fecha de Entrega
7. Id Falso (Para Cruce de Orden de compra )


#### Columns encabezado

In [124]:
df_encabezado = limpiar_columnas(df_encabezado)

In [125]:
df_encabezado["id"] = 1

In [126]:
#Seleccion de columnas 
df_encabezado = df_encabezado[['empresa', 'direccion_empresa','direccion_cliente' ,  'o_c' ,  'fecha_comprobante'    , 'fecha_entrega' , 'id']]

In [127]:
df_encabezado

Unnamed: 0,empresa,direccion_empresa,direccion_cliente,o_c,fecha_comprobante,fecha_entrega,id
0,COMPAÑIA INDUSTRIAL DE CEREALES S.A.,CR 18A 1G 49,CR 18A 1G 49,Y - 090 - 5233,2023-10-26,2023-10-31,1


In [128]:


df_cocereales_info = df_encabezado.copy()
# Renombrar columnas del encabezado
df_cocereales_info.rename(columns={
    'empresa': 'CLIENTE',
    'direccion_empresa': 'DIRECCION',
    'direccion_cliente': 'DESTINO',
    'o_c': 'NUMERO_DE_PEDIDO',
    'fecha_comprobante': 'FECHA_DE_ORDEN',
    'fecha_entrega': 'FECHA_DE_ENTREGA'
    
    
}, inplace=True)

# Añadir columna de ID al encabezado
df_cocereales_info["id"] = 1


In [129]:
df_cocereales_info

Unnamed: 0,CLIENTE,DIRECCION,DESTINO,NUMERO_DE_PEDIDO,FECHA_DE_ORDEN,FECHA_DE_ENTREGA,id
0,COMPAÑIA INDUSTRIAL DE CEREALES S.A.,CR 18A 1G 49,CR 18A 1G 49,Y - 090 - 5233,2023-10-26,2023-10-31,1


#### Join de Datos 

In [130]:
df_join  = df_cocereales_info.merge(df_cocereales, on='id', how='inner')

# Drop Duplicates 
df_join  = df_join.drop_duplicates()

# Reordenar columnas
df_join = df_join[['id', 'CLIENTE', 'DIRECCION', 'DESTINO', 'NUMERO_DE_PEDIDO', 'FECHA_DE_ORDEN', 'FECHA_DE_ENTREGA',
                   'NUMERO_DE_ITEM', 'DESCRIPCION', 'CANTIDAD', 'UNIDAD_DE_MEDIDA', 'COSTO_UNITARIO', 'IMPORTE']]

In [132]:
df_join

Unnamed: 0,id,CLIENTE,DIRECCION,DESTINO,NUMERO_DE_PEDIDO,FECHA_DE_ORDEN,FECHA_DE_ENTREGA,NUMERO_DE_ITEM,DESCRIPCION,CANTIDAD,UNIDAD_DE_MEDIDA,COSTO_UNITARIO,IMPORTE
0,1,COMPAÑIA INDUSTRIAL DE CEREALES S.A.,CR 18A 1G 49,CR 18A 1G 49,Y - 090 - 5233,2023-10-26,2023-10-31,157,TOCINO EN TROZOS,40.0,KGM,77238.0,3089520.0


In [None]:
#Destino Bogota , se despacha en 250 GRAMOS --> 40 * 4 UNIDADES = 40 (CADA KILO CORRESPONDE A 4 UNIDADES DE 250 GRAMOS), Valor total / 4 