In [None]:
import os
import PyPDF2
from langchain.text_splitter import CharacterTextSplitter
pdf_folder = "/content/drive/My Drive/clase 3 carrera/PROY3/Archivos/"
import pdfplumber
from pdfplumber.table import TableFinder
from langchain.schema import Document

In [None]:
async def load_pdf_by_language(file_path: str) -> list[Document]:
    """
    Carga un PDF separando contenido en valenciano y castellano,
    agrupando todo el contenido por idioma en documentos únicos
    """
    valencia_content = []
    castellano_content = []

    with pdfplumber.open(file_path) as pdf:
        for page_num, page in enumerate(pdf.pages):
            # Configuración para detección de tablas
            table_settings = {
                "vertical_strategy": "lines",
                "horizontal_strategy": "lines",
                "snap_tolerance": 4,
                "join_tolerance": 10
            }

            # Procesar tablas primero
            tf = TableFinder(page, table_settings)
            table_bboxes = []
            width = page.width
            mid_x = width / 2

            for table in tf.tables:
                # Extraer contenido de tabla manejando None
                table_data = []
                for row in table.extract():
                    cleaned_row = [str(cell) if cell is not None else "" for cell in row]
                    table_data.append("|".join(cleaned_row))

                table_content = "\n".join(table_data)
                table_center = (table.bbox[0] + table.bbox[2]) / 2

                # Determinar idioma por posición
                if table_center < mid_x:
                    valencia_content.append(f"Tabla página {page_num+1}:\n{table_content}")
                else:
                    castellano_content.append(f"Tabla página {page_num+1}:\n{table_content}")

                table_bboxes.append(table.bbox)

            # Procesar texto normal excluyendo tablas
            words = page.extract_words()
            non_table_words = [
                word for word in words
                if not any(
                    (word['x0'] >= t[0] and word['x1'] <= t[2] and
                    word['top'] >= t[1] and word['bottom'] <= t[3])
                    for t in table_bboxes
                )
            ]

            # Separar columnas con márgenes
            left_col = []
            right_col = []
            for word in non_table_words:
                word_center = (word['x0'] + word['x1']) / 2
                if word_center < mid_x - 15:
                    left_col.append(word)
                elif word_center > mid_x + 15:
                    right_col.append(word)

            # Construir textos
            def build_text(col_words):
                return " ".join([w['text'] for w in sorted(col_words, key=lambda x: (x['top'], x['x0']))])

            valencia_content.append(build_text(left_col))
            castellano_content.append(build_text(right_col))

    # Crear documentos finales por idioma
    return [
        Document(
            page_content="\n\n".join(valencia_content),
            metadata={"source": file_path, "language": "valencia"}
        ),
        Document(
            page_content="\n\n".join(castellano_content),
            metadata={"source": file_path, "language": "castellano"}
        )
    ]

In [None]:
file_path = "/content/drive/My Drive/clase 3 carrera/PROY3/Archivos/U0957354.pdf"
documents = await load_pdf_by_language(file_path)

# Acceder a los documentos por idioma
doc_valencia = documents[0]
doc_castellano = documents[1]

print(f"Contenido en Valenciano ({len(doc_valencia.page_content)} caracteres):")
print(json.dumps(doc_valencia, indent=4, default=lambda o: o.__dict__))
print(f"\nContenido en Castellano ({len(doc_castellano.page_content)} caracteres):")
print(json.dumps(doc_castellano, indent=4, default=lambda o: o.__dict__))

In [1]:
from rag_functions import RAGSystem
rag = RAGSystem()
print("✅ Sistema RAG inicializado correctamente")

  from .autonotebook import tqdm as notebook_tqdm
None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.


✅ Sistema RAG inicializado correctamente


In [None]:
import os
import pandas as pd
import pdfplumber
import traceback
from tkinter import Tk, filedialog
from IPython.display import display, FileLink

def seleccionar_archivo():
    """Abre un diálogo para seleccionar archivo PDF"""
    root = Tk()
    root.withdraw()  # Ocultar la ventana principal
    root.wm_attributes('-topmost', 1)  # Mantener la ventana encima de otras
    archivo = filedialog.askopenfilename(
        title="Selecciona un archivo PDF",
        filetypes=[("Archivos PDF", "*.pdf"), ("Todos los archivos", "*.*")]
    )
    return archivo

def es_documento_bilingue(texto):
    """Detecta si el documento es bilingüe castellano/valenciano"""
    lineas = texto.split('\n')
    if len(lineas) < 2:
        return False
    
    palabras_valenciano = {'valencià', 'valenciana', 'nosaltres', 'vosaltres', 'davant', 'ací'}
    palabras_castellano = {'castellano', 'español', 'nosotros', 'vosotros', 'delante', 'aquí'}
    
    muestras = min(20, len(lineas)//2)
    detecciones_val = 0
    detecciones_cast = 0
    
    for i in range(muestras):
        linea_cast = lineas[i*2].lower()
        linea_val = lineas[i*2+1].lower() if i*2+1 < len(lineas) else ""
        
        if any(pal in linea_cast for pal in palabras_castellano):
            detecciones_cast += 1
        if any(pal in linea_val for pal in palabras_valenciano):
            detecciones_val += 1
    
    return detecciones_val > muestras/2 and detecciones_cast > muestras/2

def dividir_tabla_bilingue(tabla):
    """Divide una tabla bilingüe en castellano y valenciano"""
    tabla_cast = []
    tabla_val = []
    
    for fila in tabla:
        fila_cast = []
        fila_val = []
        for celda in fila:
            partes = str(celda).split('\n')
            fila_cast.append(partes[0] if len(partes) > 0 else "")
            fila_val.append(partes[1] if len(partes) > 1 else partes[0] if len(partes) > 0 else "")
        
        tabla_cast.append(fila_cast)
        tabla_val.append(fila_val)
    
    return tabla_cast, tabla_val

def procesar_pdf(pdf_path):
    """Procesa el PDF y devuelve los resultados"""
    try:
        if not pdf_path or not os.path.exists(pdf_path):
            raise FileNotFoundError(f"El archivo {pdf_path} no existe o no se especificó")
        
        if not pdf_path.lower().endswith('.pdf'):
            raise ValueError("El archivo debe ser un PDF (extensión .pdf)")
        
        texto_completo = ""
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                texto = page.extract_text()
                if texto:
                    texto_completo += texto + "\n"
        
        es_bilingue = es_documento_bilingue(texto_completo)
        resultados = {
            "archivo": pdf_path,
            "es_bilingue": es_bilingue,
            "tablas_castellano": [],
            "tablas_valenciano": [],
            "texto_castellano": "",
            "texto_valenciano": "",
            "error": None
        }
        
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                tables = page.extract_tables()
                for table in tables:
                    if len(table) > 1:
                        if es_bilingue:
                            tabla_cast, tabla_val = dividir_tabla_bilingue(table)
                            df_cast = pd.DataFrame(tabla_cast[1:], columns=tabla_cast[0])
                            df_val = pd.DataFrame(tabla_val[1:], columns=tabla_val[0])
                            resultados["tablas_castellano"].append(df_cast)
                            resultados["tablas_valenciano"].append(df_val)
                        else:
                            df = pd.DataFrame(table[1:], columns=table[0])
                            resultados["tablas_castellano"].append(df)
            
            if not resultados["tablas_castellano"]:
                lineas = texto_completo.split('\n')
                if es_bilingue:
                    resultados["texto_castellano"] = "\n".join(lineas[::2])
                    resultados["texto_valenciano"] = "\n".join(lineas[1::2])
                else:
                    resultados["texto_castellano"] = texto_completo
        
        return resultados
        
    except Exception as e:
        return {
            "archivo": pdf_path,
            "error": str(e),
            "traceback": traceback.format_exc()
        }

def guardar_resultados(resultados):
    """Guarda los resultados en el mismo directorio que el PDF"""
    if resultados.get("error"):
        return
    
    directorio = os.path.dirname(resultados["archivo"])
    nombre_base = os.path.splitext(os.path.basename(resultados["archivo"]))[0]
    archivos_creados = []
    
    # Guardar tablas
    for i, tabla in enumerate(resultados["tablas_castellano"], 1):
        archivo_cast = os.path.join(directorio, f"{nombre_base}_tabla_{i}_cast.csv")
        tabla.to_csv(archivo_cast, index=False, encoding='utf-8-sig')
        archivos_creados.append(archivo_cast)
        
        if resultados["es_bilingue"]:
            archivo_val = os.path.join(directorio, f"{nombre_base}_tabla_{i}_val.csv")
            resultados["tablas_valenciano"][i-1].to_csv(archivo_val, index=False, encoding='utf-8-sig')
            archivos_creados.append(archivo_val)
    
    # Guardar texto
    archivo_cast = os.path.join(directorio, f"{nombre_base}_cast.txt")
    with open(archivo_cast, "w", encoding="utf-8") as f:
        f.write(resultados["texto_castellano"])
    archivos_creados.append(archivo_cast)
    
    if resultados["es_bilingue"]:
        archivo_val = os.path.join(directorio, f"{nombre_base}_val.txt")
        with open(archivo_val, "w", encoding="utf-8") as f:
            f.write(resultados["texto_valenciano"])
        archivos_creados.append(archivo_val)
    
    return archivos_creados

# Versión para Jupyter Notebook
def procesar_pdf_interactivo():
    """Versión interactiva para Jupyter Notebook"""
    print("🔍 Selecciona un archivo PDF para procesar")
    pdf_path = seleccionar_archivo()
    
    if not pdf_path:
        print("❌ No se seleccionó ningún archivo")
        return
    
    print(f"\n📄 Procesando archivo: {os.path.basename(pdf_path)}")
    
    resultados = procesar_pdf(pdf_path)
    
    if resultados.get("error"):
        print(f"\n❌ Error al procesar el archivo:")
        print(resultados["error"])
    else:
        archivos_creados = guardar_resultados(resultados)
        
        print("\n✅ Procesamiento completado:")
        print(f"- Documento {'bilingüe' if resultados['es_bilingue'] else 'monolingüe'}")
        
        if resultados["tablas_castellano"]:
            print(f"- {len(resultados['tablas_castellano'])} tablas encontradas")
        else:
            print("- Texto extraído (sin tablas)")
        
        print("\n📂 Archivos creados:")
        for archivo in archivos_creados:
            display(FileLink(archivo, result_html_prefix="📄 "))

# Ejecutar en Jupyter Notebook
procesar_pdf_interactivo()

🔍 Selecciona un archivo PDF para procesar


In [3]:
%pip install database_faiss

Note: you may need to restart the kernel to use updated packages.


ERROR: Could not find a version that satisfies the requirement database_faiss (from versions: none)

[notice] A new release of pip is available: 25.0.1 -> 25.1.1
[notice] To update, run: C:\Users\ojeem\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip
ERROR: No matching distribution found for database_faiss


In [1]:
# otro_script.py

from poli_gpt import PoliGPT

def main():
    # Inicializar una vez
    poligpt_client = PoliGPT()
    
    # Consultar múltiples veces
    preguntas = [
        "¿Cuántas horas de docencia presencial tiene un crédito?",
        "¿Qué normativa regula el reconocimiento de horas?",
        "¿Existe un máximo de horas no presenciales por crédito?"
    ]
    
    for pregunta in preguntas:
        respuesta = poligpt_client.query_poligpt(pregunta)
        print(f"\nPregunta: {pregunta}")
        print(f"Respuesta: {respuesta}")
        
if __name__ == "__main__":
    main()

  from .autonotebook import tqdm as notebook_tqdm


FAISS inicializado - Vectores: 79044 | Dimensión: 768

Pregunta: ¿Cuántas horas de docencia presencial tiene un crédito?
Respuesta: {'query': '¿Cuántas horas de docencia presencial tiene un crédito?', 'context_used': '39052', 'response': 'No hay información suficiente para determinar el número de horas de docencia presencial que tiene un crédito.', 'contexts': [{'id': '39052', 'content': 'c. Número de horas de formación  realizadas  \nhace dos años. \n \n                                                            \n3 Es comptabilitzen  les hores de formació concedides  d’activitats  voluntàries  pertanyents  al Pla de \nFormació;  les hores certificades  dels cursos d’autoformació  i de proves de certificació  de la Unitat de \nFormació,  i les hores concedides  de les activitats externes,  així com les de formació fetes en el Centre de', 'score': np.float32(0.9164475)}, {'id': '10166', 'content': '- Descripció  de les accions que s’han de desenvolupar  (títol de l’activitat,  quantita