In [6]:
import boto3
import json
from sqlalchemy import create_engine, Column, Integer, Text
from sqlalchemy.orm import sessionmaker, declarative_base
from pgvector.sqlalchemy import Vector


In [3]:
import boto3
bedrock_runtime = boto3.client(
    service_name='bedrock-runtime',
    region_name='us-east-1'
)

In [4]:
def embed_body(chunk_message: str):
    return json.dumps({
        'inputText': chunk_message,
    })

def embed_call(chunk_message: str):
    model_id = "amazon.titan-embed-text-v2:0"
    body = embed_body(chunk_message)

    response = bedrock_runtime.invoke_model(
        body=body,
        modelId=model_id,
        contentType='application/json',
        accept='application/json'
    )
    return json.loads(response['body'].read().decode('utf-8'))


In [7]:
DATABASE_URL = "postgresql://postgres:postgres72861001@sandbox-ia.ccnrq57mco3x.us-east-1.rds.amazonaws.com:5432/clau"
engine = create_engine(DATABASE_URL, connect_args={"connect_timeout": 1200})
Session = sessionmaker(bind=engine)
Base = declarative_base()

In [9]:
class Fragmented(Base):
    __tablename__ = 'fragmented'
    id = Column(Integer, primary_key=True)
    text_content = Column(Text, nullable=False)
    embedding = Column(Vector(1024), nullable=False)  

Base.metadata.create_all(engine)

In [48]:
def insert_fragment(text):
    session = Session()
    embedding = embed_call(text)['embedding']  
    fragment = Fragmented(text_content=text, embedding=embedding)
    session.add(fragment)
    session.commit()
    session.close()

In [13]:
from sqlalchemy.sql import text

def search_similar_fragments(query_text, top_k=3):
    session = Session()
    query_embedding = embed_call(query_text)['embedding']
    embedding_str = "ARRAY[" + ", ".join(map(str, query_embedding)) + "]::vector"
    query = text(f"""
        SELECT id, text_content, cosine_similarity(embedding, {embedding_str}) AS similarity
        FROM fragmented
        ORDER BY similarity DESC
        LIMIT :top_k
    """)

    results = session.execute(query, {"top_k": top_k}).fetchall()
    session.close()
    return results

In [50]:
import fitz

def extraer_texto_pdf(pdf_path):
    doc = fitz.open(pdf_path)
    texto = ""
    for pagina in doc:
        texto += pagina.get_text()  
    return texto


In [51]:
import re
def limpiar_y_unir_lineas(texto):
    texto = re.sub(r'-\n', '', texto) 
    texto = re.sub(r'\s*\n\s*', '\n', texto)  
    return texto


In [52]:
import re
from typing import List, Dict

def limpiar_texto(texto: str) -> str:
    return re.sub(r'\s+', ' ', texto).strip()

def detectar_y_dividir_secciones(texto: str) -> List[Dict[str, str]]:
    patron_principal = r'(\d+\.\s+[A-ZÁÉÍÓÚÑ][^\n]*)'
    matches = list(re.finditer(patron_principal, texto))
    
    secciones = []
    for i, match in enumerate(matches):
        header = limpiar_texto(match.group(1))  
        start = match.end()  
        end = matches[i + 1].start() if i + 1 < len(matches) else len(texto)  
        content = limpiar_texto(texto[start:end]) 
        cadena = header + content
        secciones.append(cadena)  
    
    return secciones



In [53]:
def dividir_seccion(seccion: str, longitud_maxima: int) -> List[str]:
    if len(seccion) <= longitud_maxima:
        return [seccion] 
    
    partes = []
    for i in range(0, len(seccion), longitud_maxima):
        partes.append(seccion[i:i + longitud_maxima])
    return partes

In [54]:
def insertar_chunks_pdf(pdf_path):
    texto = extraer_texto_pdf(pdf_path)
    texto_limpio = limpiar_y_unir_lineas(texto)
    secciones = detectar_y_dividir_secciones(texto_limpio)
    secciones_relevantes = secciones[31:]
    
    longitud_maxima_tokens = 8000
    secciones_finales = []
    for chunk in secciones_relevantes:
        secciones_finales.extend(dividir_seccion(chunk, longitud_maxima_tokens))
    
    for chunk in secciones_finales:
        insert_fragment(chunk)


In [55]:
pdf_path = "tdr_v4.pdf"
    insertar_chunks_pdf(pdf_path)

In [14]:
if __name__ == "__main__":
    query = "El servicio debe permitir establecer conexiones seguras entre sus redes en las instalaciones de la entidad, las oficinas remotas, los dispositivos y la red global del proveedor de nube."
    results = search_similar_fragments(query, top_k=3)

    print("Resultados más relevantes:")
    for result in results:
        print(f"Texto: {result.text_content}")


Resultados más relevantes:
Texto: fuente de manera segura en la nube. b. Debe ofrecer capacidades de control de versiones y seguimiento de cambios en el código. c. El servicio debe ser compatible con protocolos de acceso, para la colaboración en el desarrollo de software. d. Debe proporcionar opciones de seguridad, incluida la autenticación de dos factores y la integración con servicio de autenticación y autorización de proveedor cloud, para controlar el acceso. e. El servicio debe ser capaz de integrarse con herramientas de desarrollo y pipelines de entrega continua. Página 16 de 50 Gerencia Central de Tecnologías de Información y Comunicaciones Servicio de Infraestructura, Plataforma y Microservicios en Nube Pública para el despliegue de las Aplicaciones y Nuevos Servicios de la Gerencia Central de Tecnologías de Información y Comunicaciones de Essalud f. Debe ser compatible con las políticas de seguridad de la organización, incluida la gestión de acceso y permisos. g.
Texto: fabrica