# Basic Ingestion
El proposito de este notebook es establecers lo paso que voy a necesitar, las funciones a nivel atómico para lograr expresar los componentes del ETL.

In [1]:
def add_src_to_path(path_folder: str):
    ''' 
    Helper function for adding the "path_folder" directory to the path.
    in order to work on notebooks and scripts
    '''
    import sys
    import importlib.util
    from pathlib import Path

    base_path = Path().resolve()
    for parent in [base_path] + list(base_path.parents):
        candidate = parent / path_folder  # <-- fix: use string, not set
        if candidate.exists():
            if str(candidate) not in sys.path:
                sys.path.append(str(candidate))
                print(f"Path Folder {path_folder} added: {candidate}")
            return
    print(f"Not found '{path_folder}' folder on the hierarchy of directories")

add_src_to_path(path_folder="data")
add_src_to_path(path_folder="utils")

Path Folder data added: D:\projects\Ungraph\src\data
Path Folder utils added: D:\projects\Ungraph\src\utils


In [2]:
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.chat_models import ChatOllama
from langchain_community.embeddings import OllamaEmbeddings
from langchain_community.document_loaders import UnstructuredMarkdownLoader, UnstructuredWordDocumentLoader
from langchain_community.document_loaders import TextLoader
from langchain_docling import DoclingLoader
import logging
import unicodedata
from pathlib import Path
import re
import pandas as pd
from typing import List

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
from handlers import find_in_project
from IPython.display import display_markdown, Markdown
import uuid

In [4]:
data_path = find_in_project(
    target = "data",
    search_type = "folder",
    project_root = None)

data_path

2025-12-23 19:09:18,717 - INFO - Encontrado: D:\projects\Ungraph\src\data


WindowsPath('D:/projects/Ungraph/src/data')

In [5]:
# Set up logging configuration
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)

logger = logging.getLogger("Basic Structured Ingestion")


In [6]:
txt_path = data_path / "AnnyLetter.txt"
txt_path

WindowsPath('D:/projects/Ungraph/src/data/AnnyLetter.txt')

In [7]:
md_path = data_path  /"110225.md"
md_path

WindowsPath('D:/projects/Ungraph/src/data/110225.md')

## Pipeline


1. Cargar la data, txt, markdown, docs, pfds

- Detectar el encoding
- Convertir a Markdown


2. Seccionarla en sus debidos objetos Document Object que servirán para procesar el archivo.

In [8]:
def clean_text(text, allowed_characters=None, replacement=" ", remove_accents=True):
    """
    Cleans a string of text by removing unwanted characters.
    """
    logger.info("Starting text cleaning.")
    if remove_accents:
        # Remove accents from the text
        text = unicodedata.normalize("NFD", text)
        text = text.encode("ascii", "ignore").decode("utf-8")

    if allowed_characters is None:
        allowed_characters = "a-zA-Z0-9áéíóúÁÉÍÓÚñÑ.,;:!?'\"()\\[\\]{}<>\\-\\_@#%&/\\s"

    # Create a pattern for disallowed characters
    pattern = f"[^{allowed_characters}]"

    # Replace disallowed characters
    cleaned_text = re.sub(pattern, replacement, text)

    # Remove duplicate spaces
    cleaned_text = re.sub(r"\s+", " ", cleaned_text).strip()

    logger.info("Text cleaning completed.")
    return cleaned_text

In [9]:
def detect_encoding(file_path: Path, sample_size: int = 10000) -> str:
    """
    Detecta automáticamente la codificación de un archivo de texto.
    
    Parameters:
    -----------
    file_path : Path
        Ruta al archivo a analizar
    sample_size : int, optional
        Tamaño de la muestra a leer para detectar la codificación (default: 10000 bytes)
    
    Returns:
    --------
    str
        Nombre de la codificación detectada
    """
    # Lista de codificaciones comunes a probar como fallback
    fallback_encodings = ['utf-8', 'windows-1252', 'latin-1', 'iso-8859-1', 'cp1252']
    
    # Intentar usar chardet si está disponible
    try:
        import chardet
        with open(file_path, 'rb') as f:
            sample = f.read(sample_size)
            result = chardet.detect(sample)
            if result['encoding'] and result['confidence'] > 0.7:
                detected_encoding = result['encoding'].lower()
                logger.info(f"Codificación detectada por chardet: {detected_encoding} (confianza: {result['confidence']:.2%})")
                return detected_encoding
    except ImportError:
        logger.debug("chardet no está instalado, usando método de fallback")
    except Exception as e:
        logger.warning(f"Error al detectar codificación con chardet: {e}, usando método de fallback")
    
    # Método de fallback: probar codificaciones comunes
    logger.info("Intentando detectar codificación mediante prueba de lectura...")
    for enc in fallback_encodings:
        try:
            with open(file_path, 'r', encoding=enc) as f:
                f.read(sample_size)
            logger.info(f"Codificación detectada por prueba: {enc}")
            return enc
        except (UnicodeDecodeError, UnicodeError):
            continue
        except Exception:
            continue
    
    # Si ninguna funciona, retornar utf-8 como default
    logger.warning("No se pudo detectar la codificación, usando utf-8 como default")
    return 'utf-8'


In [10]:
def convert_file_to_markdown(text, llm_extractor = None):
    '''
    El propósito de esta función es ser un conversor para pasar de .txt a Markdown, buscando estructura en el documento.
    El resultado es un archivo .md con la estructura del documento original,
    y debe preservar el texto y sus palabras de manera exacta, sin ningún cambio,
    edición ni corrección sobre el contenido proporcionado.
    '''
    from langchain_core.output_parsers import StrOutputParser
    if llm_extractor is None:
        llm_extractor = ChatOllama(model="llama3.2:3b", base_url="http://127.0.0.1:11434", temperature=0.0)
    else:
        llm_extractor = llm_extractor

    # Definir el prompt garantizando estricto respeto y preservación textual
    prompt = ChatPromptTemplate.from_messages([
        (
            "system",
            """As a markdown writer, you are given a text.
    Your task is to write a markdown document based on the text.
    The document should be in the following format:

    - Title:
    - Body:
    - Footer:
    - Entities:

    You MUST STRICTLY preserve every word, spelling, accent and punctuation exactly as given in the input text. 
    DO NOT correct, modify or edit the original words under any circumstance, not even to fix grammar, accents or spelling.
    Do not summarize nor paraphrase. Absolutely all sentences, words and their structure must remain as in the provided input.

    Identify entities when possible (such as names, places, dates, etc.), but the main text content must remain unchanged and true to the input.
    Respect the above format and do not alter the integrity of the original wording.
    """,
        ),
        ("human", "{text}")
    ])

    llm_writter = prompt | llm_extractor | StrOutputParser()
    response = llm_writter.invoke({"text": text})
    return response


In [11]:
if 1 == 0:
    response = convert_file_to_markdown("""Pa, si fuera a decir que en 5 años tiene algo hecho, una sola cosa, ¿Qué sería esa única cosa?" Y si bien pensé primero en que esperaba que mi hijo fuera independiente en 5 años, pensándolo en una segunda mano estaría bien que viviéramos un poco más de tiempo juntos. Siento que me falta vida por compartirle, y que quiero hacer más cosas con mi hijo.""")
    Markdown(response)

In [12]:
def load_markdown_file(file_path: Path, clean: bool = True) -> List[Document]:
    """
    Carga un archivo Markdown (.md) usando LangChain UnstructuredMarkdownLoader.
    
    Parameters:
    -----------
    file_path : Path
        Ruta al archivo .md a cargar
    clean : bool, optional
        Si True, aplica limpieza de texto usando la función clean_text (default: True)
    
    Returns:
    --------
    List[Document]
        Lista de objetos Document de LangChain con el contenido del archivo
    """
    logger.info(f"Cargando archivo Markdown: {file_path}")
    
    try:
        # Verificar que el archivo existe
        if not file_path.exists():
            raise FileNotFoundError(f"El archivo no existe: {file_path}")
        
        # Verificar extensión
        if file_path.suffix.lower() not in ['.md', '.markdown']:
            logger.warning(f"El archivo no tiene extensión .md/.markdown: {file_path.suffix}")
        
        # Cargar el archivo usando UnstructuredMarkdownLoader
        loader = UnstructuredMarkdownLoader(str(file_path))
        documents = loader.load()
        
        # Aplicar limpieza si está habilitada
        if clean:
            logger.info("Aplicando limpieza de texto a los documentos")
            for doc in documents:
                doc.page_content = clean_text(doc.page_content)
        
        # Añadir metadatos adicionales
        for doc in documents:
            doc.metadata.update({
                'file_type': 'markdown',
                'filename': file_path.name,
                'file_path': str(file_path)
            })
        
        logger.info(f"Archivo cargado exitosamente. Documentos generados: {len(documents)}")
        return documents
        
    except Exception as e:
        logger.error(f"Error al cargar archivo Markdown {file_path}: {e}")
        raise


In [13]:
documents = load_markdown_file(md_path)
documents

2025-12-23 19:09:18,824 - INFO - Cargando archivo Markdown: D:\projects\Ungraph\src\data\110225.md
2025-12-23 19:09:21,192 - INFO - Aplicando limpieza de texto a los documentos
2025-12-23 19:09:21,193 - INFO - Starting text cleaning.
2025-12-23 19:09:21,196 - INFO - Text cleaning completed.
2025-12-23 19:09:21,198 - INFO - Archivo cargado exitosamente. Documentos generados: 1


[Document(metadata={'source': 'D:\\projects\\Ungraph\\src\\data\\110225.md', 'file_type': 'markdown', 'filename': '110225.md', 'file_path': 'D:\\projects\\Ungraph\\src\\data\\110225.md'}, page_content='Incluso es lo primero que se abrio, quiza lo que me falta en este momento es la palabra, y necesite encontrarme en ella para retomar mi camino, hoy mi hijo me hace pregunta ingenua pero solemne, como si supiera que debia de poner en mi lobulo frontal. "Pa, si fuera a decir que en 5 anos tiene algo hecho, una sola cosa, Que seria esa unica cosa?" Y si bien pense primero en que esperaba que mi hijo fuera independiente en 5 anos, pensandolo en una segunda mano estaria bien que vivieramos un poco mas de tiempo juntos. Siento que me falta vida por compartirle, y que quiero hacer mas cosas con mi hijo. Hoy estoy escribiendo en el silencio de mi habitacion mirando el cielo, y confirmo que a veces es el mismo cielo en todas partes, pero atardece diferente, anoche distinto. Me relato estas cosas 

### Splitting

In [14]:
from langchain_text_splitters import RecursiveCharacterTextSplitter

In [15]:
# Para documentos narrativos/personales (como tu diario):
chunk_size = 1000 # caracteres
chunk_overlap = 200 # ~20% del chunk_size

# Para documentos técnicos/estructurados:
#chunk_size = 1500-2000
#chunk_overlap = 300-400

# Para documentos con estructura markdown (con headers):
#chunk_size = 2000-3000
#chunk_overlap = 400-600

In [16]:
text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, 
chunk_overlap=chunk_overlap)

chunks = []
for doc in documents:
    texts = text_splitter.split_documents([doc])
    chunks.append(texts)

chunks

[[Document(metadata={'source': 'D:\\projects\\Ungraph\\src\\data\\110225.md', 'file_type': 'markdown', 'filename': '110225.md', 'file_path': 'D:\\projects\\Ungraph\\src\\data\\110225.md'}, page_content='Incluso es lo primero que se abrio, quiza lo que me falta en este momento es la palabra, y necesite encontrarme en ella para retomar mi camino, hoy mi hijo me hace pregunta ingenua pero solemne, como si supiera que debia de poner en mi lobulo frontal. "Pa, si fuera a decir que en 5 anos tiene algo hecho, una sola cosa, Que seria esa unica cosa?" Y si bien pense primero en que esperaba que mi hijo fuera independiente en 5 anos, pensandolo en una segunda mano estaria bien que vivieramos un poco mas de tiempo juntos. Siento que me falta vida por compartirle, y que quiero hacer mas cosas con mi hijo. Hoy estoy escribiendo en el silencio de mi habitacion mirando el cielo, y confirmo que a veces es el mismo cielo en todas partes, pero atardece diferente, anoche distinto. Me relato estas cosas

In [17]:
chunks[0]

[Document(metadata={'source': 'D:\\projects\\Ungraph\\src\\data\\110225.md', 'file_type': 'markdown', 'filename': '110225.md', 'file_path': 'D:\\projects\\Ungraph\\src\\data\\110225.md'}, page_content='Incluso es lo primero que se abrio, quiza lo que me falta en este momento es la palabra, y necesite encontrarme en ella para retomar mi camino, hoy mi hijo me hace pregunta ingenua pero solemne, como si supiera que debia de poner en mi lobulo frontal. "Pa, si fuera a decir que en 5 anos tiene algo hecho, una sola cosa, Que seria esa unica cosa?" Y si bien pense primero en que esperaba que mi hijo fuera independiente en 5 anos, pensandolo en una segunda mano estaria bien que vivieramos un poco mas de tiempo juntos. Siento que me falta vida por compartirle, y que quiero hacer mas cosas con mi hijo. Hoy estoy escribiendo en el silencio de mi habitacion mirando el cielo, y confirmo que a veces es el mismo cielo en todas partes, pero atardece diferente, anoche distinto. Me relato estas cosas 

In [18]:
len(chunks)

1

In [19]:
def load_doc_file(file_path: Path, clean: bool = True) -> List[Document]:
    """
    Carga un archivo Word (.doc, .docx) usando LangChain UnstructuredWordDocumentLoader.
    
    Parameters:
    -----------
    file_path : Path
        Ruta al archivo .doc o .docx a cargar
    clean : bool, optional
        Si True, aplica limpieza de texto usando la función clean_text (default: True)
    
    Returns:
    --------
    List[Document]
        Lista de objetos Document de LangChain con el contenido del archivo
    """
    logger.info(f"Cargando archivo Word: {file_path}")
    
    try:
        # Verificar que el archivo existe
        if not file_path.exists():
            raise FileNotFoundError(f"El archivo no existe: {file_path}")
        
        # Verificar extensión
        if file_path.suffix.lower() not in ['.doc', '.docx']:
            logger.warning(f"El archivo no tiene extensión .doc/.docx: {file_path.suffix}")
        
        # Cargar el archivo usando UnstructuredWordDocumentLoader
        loader = UnstructuredWordDocumentLoader(str(file_path))
        documents = loader.load()
        
        # Aplicar limpieza si está habilitada
        if clean:
            logger.info("Aplicando limpieza de texto a los documentos")
            for doc in documents:
                doc.page_content = clean_text(doc.page_content)
        
        # Añadir metadatos adicionales
        for doc in documents:
            doc.metadata.update({
                'file_type': 'word',
                'filename': file_path.name,
                'file_path': str(file_path)
            })
        
        logger.info(f"Archivo cargado exitosamente. Documentos generados: {len(documents)}")
        return documents
        
    except Exception as e:
        logger.error(f"Error al cargar archivo Word {file_path}: {e}")
        raise


In [None]:
# Función load_txt_file actualizada con detección automática de codificación
def load_txt_file(file_path: Path, encoding: str = None, clean: bool = True, auto_detect: bool = True) -> List[Document]:
    """
    Carga un archivo de texto (.txt) usando LangChain TextLoader.
    Incluye detección automática de codificación y fallback a codificaciones comunes.
    
    Parameters:
    -----------
    file_path : Path
        Ruta al archivo .txt a cargar
    encoding : str, optional
        Codificación del archivo. Si es None y auto_detect=True, se detecta automáticamente.
        Si se especifica, se usa esa codificación directamente (default: None)
    clean : bool, optional
        Si True, aplica limpieza de texto usando la función clean_text (default: True)
    auto_detect : bool, optional
        Si True, detecta automáticamente la codificación si no se especifica (default: True)
    
    Returns:
    --------
    List[Document]
        Lista de objetos Document de LangChain con el contenido del archivo
    """
    logger.info(f"Cargando archivo de texto: {file_path}")
    
    try:
        # Verificar que el archivo existe
        if not file_path.exists():
            raise FileNotFoundError(f"El archivo no existe: {file_path}")
        
        # Verificar extensión
        if file_path.suffix.lower() != '.txt':
            logger.warning(f"El archivo no tiene extensión .txt: {file_path.suffix}")
        
        # Determinar la codificación a usar
        if encoding is None and auto_detect:
            encoding = detect_encoding(file_path)
            logger.info(f"Usando codificación detectada: {encoding}")
        elif encoding is None:
            encoding = "utf-8"
            logger.info(f"Usando codificación por defecto: {encoding}")
        
        # Lista de codificaciones de fallback si la principal falla
        fallback_encodings = ['windows-1252', 'latin-1', 'iso-8859-1', 'cp1252']
        
        # Intentar cargar con la codificación especificada
        last_error = None
        try:
            loader = TextLoader(str(file_path), encoding=encoding)
            documents = loader.load()
        except (UnicodeDecodeError, RuntimeError) as e:
            last_error = e
            logger.warning(f"Error al cargar con codificación {encoding}: {e}")
            
            # Si auto_detect está activado y falló, intentar con fallback
            if auto_detect:
                logger.info("Intentando con codificaciones de fallback...")
                for fallback_enc in fallback_encodings:
                    if fallback_enc == encoding:
                        continue
                    try:
                        logger.info(f"Intentando con codificación: {fallback_enc}")
                        loader = TextLoader(str(file_path), encoding=fallback_enc)
                        documents = loader.load()
                        encoding = fallback_enc  # Actualizar la codificación usada
                        logger.info(f"Archivo cargado exitosamente con codificación: {encoding}")
                        break
                    except (UnicodeDecodeError, RuntimeError):
                        continue
                else:
                    # Si todas las codificaciones fallan, lanzar el último error
                    raise RuntimeError(f"No se pudo cargar el archivo con ninguna codificación. Último error: {last_error}") from last_error
            else:
                raise
        
        # Aplicar limpieza si está habilitada
        if clean:
            logger.info("Aplicando limpieza de texto a los documentos")
            for doc in documents:
                doc.page_content = clean_text(doc.page_content)
        
        # Añadir metadatos adicionales
        for doc in documents:
            doc.metadata.update({
                'file_type': 'txt',
                'filename': file_path.name,
                'file_path': str(file_path),
                'encoding': encoding
            })
        
        logger.info(f"Archivo cargado exitosamente. Documentos generados: {len(documents)}")
        return documents
        
    except Exception as e:
        logger.error(f"Error al cargar archivo de texto {file_path}: {e}")
        raise


#documents =load_txt_file(data_path / "AnnyLetter.txt", encoding="utf-8", clean=True)

#documents 
#txt_docs = load_txt_file(data_path / "AnnyLetter.txt", clean=True)

In [21]:
def document_to_dataframe(documents: List[Document], path_to_persist: str) -> pd.DataFrame:
    '''
    This function converts a list of Document objects into a pandas DataFrame 
    and saves the DataFrame to the specified path with a name that includes the filename
    of the first document.

    Parameters:
    documents (List[Document]): List of Document objects to be converted.
    path_to_persist (str): Path where the resulting DataFrame will be saved.

    Returns:
    pd.DataFrame: The resulting DataFrame containing the documents' data.
    '''
    # Prepare the data for the DataFrame
    data = []
    for doc in documents:
        data.append({
            'page_content': doc.page_content,
            'chunk_id': doc.metadata.get('chunk_id', None),
            'filename': doc.metadata.get('filename', None),
            'page_number': doc.metadata.get('page_number', None),
            'embeddings': doc.metadata.get('embeddings', None),
            'is_unitary': doc.metadata.get('is_unitary', None),
            'embeddings_dimensions': doc.metadata.get('embeddings_dimensions', None),
            'embedding_encoder_info': doc.metadata.get('embedding_encoder_info', None)
        })
    
    # Create the DataFrame
    df = pd.DataFrame(data)
    
    # Extract the filename of the first document
    if documents:
        filename = documents[0].metadata.get('filename', 'default')
    else:
        filename = 'default'
    
    # Construct the full path with the filename included
    full_path = f"{path_to_persist}/document_data_{filename}.csv"
    
    # Save the DataFrame to the specified path
    df.to_csv(full_path, index=False, escapechar='\\')

    return df



In [22]:
document_to_dataframe(chunks[0], path_to_persist=".")

Unnamed: 0,page_content,chunk_id,filename,page_number,embeddings,is_unitary,embeddings_dimensions,embedding_encoder_info
0,"Incluso es lo primero que se abrio, quiza lo q...",,110225.md,,,,,
1,quiero narrar como estoy desarrollando la visi...,,110225.md,,,,,
2,cobran sentido en la contradiccion de su natur...,,110225.md,,,,,
3,de verdad el otro dia. La mayoria de mis plane...,,110225.md,,,,,
4,"ser fundamental para fortalecer, crear, y derr...",,110225.md,,,,,
5,y testeo de aplicaciones desarrolladas fullsta...,,110225.md,,,,,
6,"enfocado, de descanso real, remover el ocio de...",,110225.md,,,,,
7,"literatura y venideros, pasando tiempo con mi ...",,110225.md,,,,,
8,conectarse a Neo4j local para que cualquier ID...,,110225.md,,,,,


In [23]:

def dataframe_to_documents(df: pd.DataFrame) -> List[Document]:
    ''' 
    La idea de esta función es añadir metadatos al objeto de Langchain que estamos creando.
    Para que sea compatible, el atributo 'source' va necesitar existir. La otra es heradarlo de la clase,
    sobreescribirlo y añadirle lo que queremos persistir del objeto.

    
    
    '''
    documents = []
    for index, row in df.iterrows():
        metadata = {
            'chunk_id': row['chunk_id'],
            'filename': row['filename'],
            'page_number': row['page_number'],
            'embeddings': row['embeddings'],
            'is_unitary': row['is_unitary'],
            'embeddings_dimensions': row['embeddings_dimensions'],
            'embedding_encoder_info': row['embedding_encoder_info']
        }
        document = Document(page_content=row['page_content'], metadata=metadata)
        documents.append(document)
    return documents


### Embedding

In [24]:
def get_embedding_encoder(model: str):
    """
    Retrieves the embedding encoder based on the model specified.
    """
    logger.info("Getting embedding encoder.")
    from langchain_community.embeddings import HuggingFaceEmbeddings
    import torch
    
    if model == "llama":
        logger.info(f"model selected for embedding -> {model}.")
        return OllamaEmbeddings(
            model="llama3.2:3b",
            base_url="http://0.0.0.0:11434",
            embed_instruction="passage: ",
            temperature=0.0,
            num_gpu=1,
            num_thread=8,
        )
    if model == "hf":
        logger.info(f"model selected for embedding -> {model}.")

        model_name = "sentence-transformers/all-MiniLM-L6-v2"
        
        # Detectar automáticamente si CUDA está disponible
        if torch.cuda.is_available():
            device = 'cuda'
            logger.info("CUDA disponible, usando GPU para embeddings.")
        else:
            device = 'cpu'
            logger.info("CUDA no disponible, usando CPU para embeddings.")
        
        model_kwargs = {'device': device}
        encode_kwargs = {'normalize_embeddings': False}
        return HuggingFaceEmbeddings(
            model_name=model_name,
            model_kwargs=model_kwargs,
            encode_kwargs=encode_kwargs
        )
    else:
        raise ValueError("The function needs a valid embedding model ('openai' or 'hf') to encode data")


In [25]:
embedding_encoder =  get_embedding_encoder("hf")
embedding_encoder

2025-12-23 19:09:21,352 - INFO - Getting embedding encoder.
2025-12-23 19:09:21,359 - INFO - model selected for embedding -> hf.
2025-12-23 19:09:21,360 - INFO - CUDA no disponible, usando CPU para embeddings.
  return HuggingFaceEmbeddings(
2025-12-23 19:09:21,362 - INFO - Load pretrained SentenceTransformer: sentence-transformers/all-MiniLM-L6-v2


HuggingFaceEmbeddings(client=SentenceTransformer(
  (0): Transformer({'max_seq_length': 256, 'do_lower_case': False, 'architecture': 'BertModel'})
  (1): Pooling({'word_embedding_dimension': 384, 'pooling_mode_cls_token': False, 'pooling_mode_mean_tokens': True, 'pooling_mode_max_tokens': False, 'pooling_mode_mean_sqrt_len_tokens': False, 'pooling_mode_weightedmean_tokens': False, 'pooling_mode_lasttoken': False, 'include_prompt': True})
  (2): Normalize()
), model_name='sentence-transformers/all-MiniLM-L6-v2', cache_folder=None, model_kwargs={'device': 'cpu'}, encode_kwargs={'normalize_embeddings': False}, multi_process=False, show_progress=False)

In [26]:
def create_document_object(documents: List[Document], embedding_encoder=None):
    """
    Crea objetos Document con embeddings a partir de una lista de documentos (chunks).
    Añade embeddings y metadatos necesarios para el procesamiento posterior.
    
    Parameters:
    -----------
    documents : List[Document]
        Una lista de objetos Document (chunks) que ya han sido divididos del documento original.
    embedding_encoder : object, optional
        Un objeto encoder usado para generar embeddings de texto. Si es None, no se generan embeddings.

    Returns:
    --------
    List[Document]
        Una lista de objetos Document con embeddings y metadatos añadidos.
    """
    logger.info("Creating document objects with embeddings.")
    document_objects = []

    if embedding_encoder is None:
        logger.warning("Embedding encoder not provided. Documents will be created without embeddings.")

    for doc in documents:
        try:
            # Generar embeddings si se proporciona un encoder
            text_embeddings = None
            if embedding_encoder:
                text_embeddings = embedding_encoder.embed_query(doc.page_content)
            
            # Crear o actualizar metadatos
            document_metadata = doc.metadata.copy() if doc.metadata else {}
            
            # Añadir o actualizar metadatos específicos
            document_metadata.update({
                "chunk_id": document_metadata.get("chunk_id", f"{document_metadata.get('filename', 'unknown')}_{uuid.uuid4()}"),
                "embeddings": str(text_embeddings) if text_embeddings else "",
                "embeddings_dimensions": len(text_embeddings) if text_embeddings else None,
                "embedding_encoder_info": str(embedding_encoder) if embedding_encoder else "",
            })
            
            # Crear el documento con el contenido y metadatos actualizados
            document = Document(page_content=doc.page_content, metadata=document_metadata)
            document_objects.append(document)
            
        except Exception as e:
            logger.error(f"Error creating document object: {e}")
            raise e

    logger.info(f"Document objects creation completed. Total documents: {len(document_objects)}")
    return document_objects



In [27]:
documents_with_embeddings = create_document_object(
    documents=chunks[0],  # Lista de chunks del documento
    embedding_encoder=embedding_encoder
)

2025-12-23 19:09:24,110 - INFO - Creating document objects with embeddings.
2025-12-23 19:09:24,599 - INFO - Document objects creation completed. Total documents: 9


In [28]:
# Convertir documentos con embeddings a DataFrame y guardar
df = document_to_dataframe(
    documents=documents_with_embeddings,  # Documentos CON embeddings
    path_to_persist="."
)

In [29]:
df

Unnamed: 0,page_content,chunk_id,filename,page_number,embeddings,is_unitary,embeddings_dimensions,embedding_encoder_info
0,"Incluso es lo primero que se abrio, quiza lo q...",110225.md_3da12fed-7e00-4ec6-bd0f-e8b889f37981,110225.md,,"[0.03818148747086525, 0.04162796959280968, 0.0...",,384,client=SentenceTransformer(\n (0): Transforme...
1,quiero narrar como estoy desarrollando la visi...,110225.md_2386e056-324b-470a-97f6-0b5a18a31d34,110225.md,,"[0.04164035990834236, 0.01807655394077301, -0....",,384,client=SentenceTransformer(\n (0): Transforme...
2,cobran sentido en la contradiccion de su natur...,110225.md_4436e309-c4d1-4229-b46a-4e7abe088efa,110225.md,,"[0.023161746561527252, 0.090914785861969, -0.0...",,384,client=SentenceTransformer(\n (0): Transforme...
3,de verdad el otro dia. La mayoria de mis plane...,110225.md_448a5c94-3ec0-45ac-a94a-ebd92037a163,110225.md,,"[0.02458001859486103, 0.0350651890039444, 0.00...",,384,client=SentenceTransformer(\n (0): Transforme...
4,"ser fundamental para fortalecer, crear, y derr...",110225.md_0b500597-bef4-43d7-9d6e-850c56a716b4,110225.md,,"[0.0082578519359231, 0.023625241592526436, 0.0...",,384,client=SentenceTransformer(\n (0): Transforme...
5,y testeo de aplicaciones desarrolladas fullsta...,110225.md_bef32d9b-a179-413b-800e-3bc4ca48b353,110225.md,,"[-0.02488844469189644, -0.02288469299674034, -...",,384,client=SentenceTransformer(\n (0): Transforme...
6,"enfocado, de descanso real, remover el ocio de...",110225.md_b0cd5a3b-4741-42eb-bdd9-d992678250a0,110225.md,,"[-0.009872610680758953, 0.016194215044379234, ...",,384,client=SentenceTransformer(\n (0): Transforme...
7,"literatura y venideros, pasando tiempo con mi ...",110225.md_9fa6b460-24aa-420c-a091-6e118908d590,110225.md,,"[-0.0019361716695129871, 0.008947310037910938,...",,384,client=SentenceTransformer(\n (0): Transforme...
8,conectarse a Neo4j local para que cualquier ID...,110225.md_b6b0a0b3-3122-43b5-b68b-d3e8a66de499,110225.md,,"[0.06550847738981247, 0.0676582008600235, -0.0...",,384,client=SentenceTransformer(\n (0): Transforme...


## Load To Grap

Aqui es donde creamos el proceso de carga de datos al grafo y los diferentes patrones.

In [30]:
import os
import ast
from neo4j import GraphDatabase
from neo4j.exceptions import ClientError
from dotenv import load_dotenv, find_dotenv


load_dotenv(find_dotenv())

True

In [31]:

# DB Connection
def graph_session() -> GraphDatabase:
    '''
    Creates and returns a connection session to the Neo4j database.

    This function uses the environment variables NEO4J_URI and NEO4J_PASSWORD to authenticate
    the connection. If either of these variables is not set, a ValueError is raised.

    Returns:
        GraphDatabase: A Neo4j database driver that allows performing operations
        on the database.

    Raises:
        ValueError: If NEO4J_URI or NEO4J_PASSWORD are not set in the environment variables.
        Exception: If an error occurs while trying to create the database session.
    '''
    URI = os.environ.get("NEO4J_URI")
    PASSWORD = os.environ.get("NEO4J_PASSWORD")

    if not URI or not PASSWORD:
        raise ValueError("NEO4J_URI and NEO4J_PASSWORD must be set in environment variables.")
    AUTH = ("neo4j", PASSWORD)

    try:
        # Retornar el driver sin usar 'with' para que el llamador controle el ciclo de vida
        driver = GraphDatabase.driver(URI, auth=AUTH)
        return driver
    except Exception as e:
        print(f"Failed to create a graph session: {e}")
        raise


In [32]:
driver = graph_session()
driver

<neo4j._sync.driver.Neo4jDriver at 0x16b51814f20>

In [33]:
# Configuración de índices avanzados, para la búsqueda por contenido y por vector.
def setup_advanced_indexes(session):
    """Configuración de índices avanzados"""
    try:
        # Índice vectorial mejorado
        vector_index_query = """
        CALL db.index.vector.createNodeIndex(
            'chunk_embeddings',           // nombre del índice
            'Chunk',                      // label del nodo
            'embeddings',                 // propiedad que contiene el vector
            384,                          // dimensiones del vector
            'cosine'                      // similitud por coseno
        )
        """
        
        # Índice de texto completo mejorado
        fulltext_index_query = """
        CREATE FULLTEXT INDEX chunk_content IF NOT EXISTS
        FOR (c:Chunk)
        ON EACH [c.page_content]
        OPTIONS {
            indexConfig: {
                `fulltext.analyzer`: 'spanish',
                `fulltext.eventually_consistent`: false
            }
        }
        """
        # Índice regular para búsquedas por chunk_id_consecutive
        regular_index_query = """
        CREATE INDEX chunk_consecutive_idx IF NOT EXISTS
        FOR (c:Chunk)
        ON (c.chunk_id_consecutive)
        """
        

        try:
            session.execute_write(lambda tx: tx.run(regular_index_query))
            print("Regular index created successfully")
        except Exception as e:
            print(f"Regular index creation message: {e}")
            
        try:
            session.execute_write(lambda tx: tx.run(vector_index_query))
            print("Vector index created successfully")
        except Exception as e:
            if "An equivalent index already exists" not in str(e):
                raise e
            print("Vector index already exists")

        try:
            session.execute_write(lambda tx: tx.run(fulltext_index_query))
            print("Full-text index created successfully")
        except Exception as e:
            print(f"Full-text index creation message: {e}")

    except Exception as e:
        print(f"Error in index setup: {e}")

In [34]:
with driver.session() as session:
    setup_advanced_indexes(session)

2025-12-23 19:09:24,878 - INFO - Received notification from DBMS server: {severity: INFORMATION} {code: Neo.ClientNotification.Schema.IndexOrConstraintAlreadyExists} {category: SCHEMA} {title: `CREATE RANGE INDEX chunk_consecutive_idx IF NOT EXISTS FOR (e:Chunk) ON (e.chunk_id_consecutive)` has no effect.} {description: `RANGE INDEX chunk_consecutive_idx FOR (e:Chunk) ON (e.chunk_id_consecutive)` already exists.} {position: None} for query: '\n        CREATE INDEX chunk_consecutive_idx IF NOT EXISTS\n        FOR (c:Chunk)\n        ON (c.chunk_id_consecutive)\n        '
2025-12-23 19:09:24,902 - INFO - Received notification from DBMS server: {severity: INFORMATION} {code: Neo.ClientNotification.Schema.IndexOrConstraintAlreadyExists} {category: SCHEMA} {title: `CREATE FULLTEXT INDEX chunk_content IF NOT EXISTS FOR (e:Chunk) ON EACH [e.page_content] OPTIONS {indexConfig: {`fulltext.analyzer`: "spanish", `fulltext.eventually_consistent`: false}}` has no effect.} {description: `FULLTEXT IND

Regular index created successfully
Vector index already exists
Full-text index created successfully


In [35]:
## PROCESAMIENTO DE DOCUMENT DATA OBJECT A GRAFO.
# Función para extraer estructura de documento
def extract_document_structure(tx, 
                               filename, 
                               page_number, 
                               chunk_id, 
                               page_content, 
                               is_unitary, 
                               embeddings, 
                               embeddings_dimensions, 
                               embedding_encoder_info,
                               chunk_id_consecutive):
    
    ''' 
    TODO:

    CREAR EL FUNCIONAMIENTO DE DOD PARA QUE SIRVA CON LO QUE SE LEE EN EL DCUMENTO DE DOCLING.
    
    '''
    try:
        query = """
                MERGE (f:File {filename: $filename})
                ON CREATE SET f.createdAt = timestamp()

                MERGE (p:Page {filename: $filename, page_number: toInteger($page_number)})

                MERGE (c:Chunk {chunk_id: $chunk_id})
                ON CREATE SET c.page_content = $page_content,
                              c.is_unitary = $is_unitary,
                              c.embeddings = $embeddings, 
                              c.embeddings_dimensions = toInteger($embeddings_dimensions),
                              c.embedding_encoder_info = $embedding_encoder_info,
                              c.chunk_id_consecutive = toInteger($chunk_id_consecutive)

                MERGE (f)-[:CONTAINS]->(p)
                MERGE (p)-[:HAS_CHUNK]->(c)

            """
        result = tx.run(query, 
                        filename=filename, 
                        page_number=page_number,
                        chunk_id=chunk_id,
                        page_content=page_content,
                        is_unitary=is_unitary,
                        embeddings=embeddings,
                        embeddings_dimensions=embeddings_dimensions,
                        embedding_encoder_info=embedding_encoder_info,
                        chunk_id_consecutive=chunk_id_consecutive)
        return result
    except ClientError as e:
        logger.error("Database error", exc_info=True)
        tx.rollback()
        raise

In [36]:
# Creo las relaciones entre chunks consecutivos.
def create_chunk_relationships(session):
    """Crear relaciones NEXT_CHUNK entre chunks consecutivos"""
    join_chunks_query = """
    MATCH (c1:Chunk),(c2:Chunk)
    WHERE c1.chunk_id_consecutive + 1 = c2.chunk_id_consecutive
    MERGE (c1)-[:NEXT_CHUNK]->(c2)
    """
    try:
        session.execute_write(lambda tx: tx.run(join_chunks_query))
        print("Chunk relationships created successfully")
    except Exception as e:
        print(f"Error creating chunk relationships: {e}")




In [37]:

#  Valido que el DataFrame tenga la estructura correcta.
def validate_dataframe(df, expected_dim=384):
    """Validar que el DataFrame tenga la estructura correcta"""
    import ast
    
    required_columns = [
        'filename', 'page_number', 'chunk_id', 'page_content',
        'is_unitary', 'embeddings', 'embeddings_dimensions',
        'embedding_encoder_info', 'chunk_id_consecutive'
    ]
    
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        raise ValueError(f"Missing required columns: {missing_columns}")
    
    # Parsear embeddings si son strings
    def parse_embedding(emb):
        if isinstance(emb, str):
            try:
                return ast.literal_eval(emb)
            except (ValueError, SyntaxError):
                return []
        return emb if emb is not None else []
    
    # Validar dimensiones de embeddings
    # Aqui podemos poner las demas dimensiones de los modelo, y ligarlos a la columna de encoder_info.
    parsed_embeddings = df['embeddings'].apply(parse_embedding)
    if not all(len(emb) == expected_dim for emb in parsed_embeddings if emb):
        # Mostrar qué embeddings tienen dimensiones incorrectas
        wrong_dims = [(i, len(emb)) for i, emb in enumerate(parsed_embeddings) if emb and len(emb) != expected_dim]
        raise ValueError(f"All embeddings must have {expected_dim} dimensions. Found: {wrong_dims[:5]}")
    
    # Validar que chunk_id_consecutive sea secuencial
    expected_range = range(1, len(df) + 1)
    if not all(df['chunk_id_consecutive'].values == list(expected_range)):
        raise ValueError("chunk_id_consecutive must be sequential starting from 1")
    
    return True


In [38]:

# Tratamiento de columnas para añadir secuencialidad y tipo de dato
def colummn_pretreatment(df):
    # Se les da el formato necesario, esto lo peudo llevar al momento en que se escribe la data en el modulo de ingestión
    df["embeddings"] = df["embeddings"].apply(ast.literal_eval)
    df['chunk_id_consecutive'] = range(1, len(df) + 1)
    return df

In [39]:
# Función para centralizar  el proceso de ingestión de datos al grafo.
def process_with_neo4j(df, batch_size=100, target_database="neo4j"):
    ''' 
    Función que busca:
    1. Configurar los índices en la base de datos.
    2. Validar la idoneidad del dataframe
    3. Procesar en lotes los chunks del texto.
    3.1 Cada lote procesarlo con el query que facilita extraer la extructura del texto.
    4. Una vez creado, populamos con relaciones consecutivas.
    
    '''
    import pandas as pd
    
    driver = graph_session()  # No usar 'with' aquí, solo obtener el driver
    
    try:
        with driver.session(database=target_database) as session:
            # Configurar índices
            setup_advanced_indexes(session)
            
            # Validar datos
            if validate_dataframe(df):
                # Procesar chunks en lotes
                for i in range(0, len(df), batch_size):
                    batch = df.iloc[i:i + batch_size]
                    print(f"Processing batch {i//batch_size + 1} of {(len(df)-1)//batch_size + 1}")
                    
                    batch.apply(
                        lambda row: session.execute_write(
                            extract_document_structure,
                            row['filename'],
                            row['page_number'] if pd.notna(row['page_number']) else 1,
                            row['chunk_id'],
                            row['page_content'],
                            row['is_unitary'] if pd.notna(row['is_unitary']) else False,
                            row['embeddings'],
                            row['embeddings_dimensions'] if pd.notna(row['embeddings_dimensions']) else 384,
                            row['embedding_encoder_info'] if pd.notna(row['embedding_encoder_info']) else 'unknown',
                            row['chunk_id_consecutive']
                        ),
                        axis=1
                    )
                
                # Crear relaciones entre chunks consecutivos
                create_chunk_relationships(session)
            else:
                print("Data validation failed. Exiting.")
    finally:
        driver.close()  # Cerrar el driver al final





In [40]:
# 3. Preparar datos faltantes antes de procesar
import ast
import pandas as pd

# Parsear embeddings de string a lista si es necesario
if df['embeddings'].dtype == 'object':
    df['embeddings'] = df['embeddings'].apply(
        lambda x: ast.literal_eval(x) if isinstance(x, str) and x.startswith('[') else x
    )

# Asegurar que chunk_id_consecutive esté correctamente numerado
if 'chunk_id_consecutive' not in df.columns or df['chunk_id_consecutive'].isna().any():
    df['chunk_id_consecutive'] = range(1, len(df) + 1)

# Llenar valores faltantes
df['page_number'] = df['page_number'].fillna(1)
df['is_unitary'] = df['is_unitary'].fillna(False)
df['embeddings_dimensions'] = df['embeddings_dimensions'].fillna(384)
df['embedding_encoder_info'] = df['embedding_encoder_info'].fillna('unknown')

print(f"DataFrame preparado: {len(df)} filas")
print(f"Embeddings parseados: {isinstance(df['embeddings'].iloc[0], list) if len(df) > 0 else 'N/A'}")


DataFrame preparado: 9 filas
Embeddings parseados: True


  df['page_number'] = df['page_number'].fillna(1)
  df['is_unitary'] = df['is_unitary'].fillna(False)


In [41]:
# Procesar el DataFrame en Neo4j
# Nota: process_with_neo4j() maneja la conexión internamente
process_with_neo4j(
    df=df,
    batch_size=100,  # Procesar en lotes de 100 chunks
    target_database="neo4j"
)

2025-12-23 19:09:25,017 - INFO - Received notification from DBMS server: {severity: INFORMATION} {code: Neo.ClientNotification.Schema.IndexOrConstraintAlreadyExists} {category: SCHEMA} {title: `CREATE RANGE INDEX chunk_consecutive_idx IF NOT EXISTS FOR (e:Chunk) ON (e.chunk_id_consecutive)` has no effect.} {description: `RANGE INDEX chunk_consecutive_idx FOR (e:Chunk) ON (e.chunk_id_consecutive)` already exists.} {position: None} for query: '\n        CREATE INDEX chunk_consecutive_idx IF NOT EXISTS\n        FOR (c:Chunk)\n        ON (c.chunk_id_consecutive)\n        '
2025-12-23 19:09:25,046 - INFO - Received notification from DBMS server: {severity: INFORMATION} {code: Neo.ClientNotification.Schema.IndexOrConstraintAlreadyExists} {category: SCHEMA} {title: `CREATE FULLTEXT INDEX chunk_content IF NOT EXISTS FOR (e:Chunk) ON EACH [e.page_content] OPTIONS {indexConfig: {`fulltext.analyzer`: "spanish", `fulltext.eventually_consistent`: false}}` has no effect.} {description: `FULLTEXT IND

Regular index created successfully
Vector index already exists
Full-text index created successfully
Processing batch 1 of 1


2025-12-23 19:09:29,706 - INFO - Received notification from DBMS server: {severity: INFORMATION} {code: Neo.ClientNotification.Statement.CartesianProduct} {category: PERFORMANCE} {title: This query builds a cartesian product between disconnected patterns.} {description: If a part of a query contains multiple disconnected patterns, this will build a cartesian product between all those parts. This may produce a large amount of data and slow down query processing. While occasionally intended, it may often be possible to reformulate the query that avoids the use of this cross product, perhaps by adding a relationship between the different parts or by using OPTIONAL MATCH (identifier is: (c2))} {position: line: 2, column: 5, offset: 5} for query: '\n    MATCH (c1:Chunk),(c2:Chunk)\n    WHERE c1.chunk_id_consecutive + 1 = c2.chunk_id_consecutive\n    MERGE (c1)-[:NEXT_CHUNK]->(c2)\n    '


Chunk relationships created successfully
