In [1]:
from langchain_docling import DoclingLoader
from langchain_docling.loader import ExportType
def pass_to_markdown(file_path):

    file_name = os.path.basename(file_path)
    
    file_name = os.path.join("data_md", file_name)

    loader = DoclingLoader(
            file_path=file_path,
            export_type=ExportType.MARKDOWN
            )

    docs = loader.load()

    clean_doc = docs[0].page_content.replace("glyph&lt;c=3,font=/CIDFont+F8&gt;", "").replace("glyph&lt;c=3,font=/CIDFont+F5&gt;", "").replace("glyph<c=3,font=/CIDFont+F5>", " ").replace("glyph<c=3,font=/CIDFont+F8>", " ")

    with open(f"{file_name}_md.md", "w", encoding="utf-8") as markdown_file:
        markdown_file.write(clean_doc)
    return f"{file_name}_md.md"

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
import json
def clean_docs(file_path):

    def process_row(line):
        cells = [cell.strip() for cell in line.split('|')[1:-1]]
        return '| ' + ' | '.join(cells) + ' |'
    
    # Carregar o JSON com as correções
    with open('miss_words.json', 'r', encoding='utf-8') as f:
        correcoes = json.load(f)["replacements"]

    # Ler o documento original
    with open(file_path, 'r', encoding='utf-8') as f:
        texto = f.readlines()

    texto_corrigido = ''

    in_table = False

    for line in texto:
        for erro, correcao in correcoes.items():
            line = line.replace(erro, correcao)

        stripped_line = line.strip()


        if stripped_line.startswith('|') and stripped_line.endswith('|'):
            if not in_table:
                in_table = True
            processed_line = process_row(stripped_line)
            texto_corrigido += processed_line + "\n"
        else:
            texto_corrigido += stripped_line + "\n"
            if in_table:
                in_table = False
        

    file_name = os.path.basename(file_path)
    
    file_name = os.path.join("data_md_correct", file_name)

    # Salvar o documento corrigido
    with open(f"{file_name}_correct.md", 'w', encoding='utf-8') as f:
        f.write(texto_corrigido)
    return file_name

In [3]:
def extract_markdown_tables(file_path):
    def process_row(line):
        cells = [cell.strip() for cell in line.split('|')[1:-1]]  
        return '| ' + ' | '.join(cells) + ' |'  # Reconstroi a linha com 1 espaço entre texto e "|"
    
    with open(file_path, 'r', encoding='utf-8') as file:
        lines = file.readlines()
    
    tables = []
    current_table = []
    in_table = False
    
    for line in lines:
        stripped_line = line.strip()
        # Verifica se é uma linha de tabela
        if stripped_line.startswith('|') and stripped_line.endswith('|'):
            if not in_table:
                in_table = True
            processed_line = process_row(stripped_line)
            current_table.append(processed_line)
        else:
            if in_table:
                if current_table:
                    tables.append(current_table)
                    current_table = []
                in_table = False
    
    # Adiciona a última tabela se o arquivo terminar com uma tabela
    if in_table and current_table:
        tables.append(current_table)
    
    return tables


In [4]:
import os

for i, file in enumerate(os.listdir('data')):
    
    start_file_path = 'data/'+file
    print(start_file_path)

    markdown_path = pass_to_markdown(start_file_path)
    print(markdown_path)

    end_file_path = clean_docs(markdown_path)
    print(end_file_path)


data/citros.pdf
data_md/citros.pdf_md.md
data_md_correct/citros.pdf_md.md


In [7]:
#Chunking
from typing import List, Tuple, Optional
import re

class MarkdownHeaderRecursiveSplitter:
    def __init__(
        self,
        chunk_size: int = 5000,
        chunk_overlap: int = 400,
        separators: List[str] = None,
        header_pattern: str = r"^#{1,6}\s.+"
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.header_regex = re.compile(header_pattern, re.MULTILINE)
        self.separators = separators or ["\n\n", "\n", " ", ""]
        
    def split_text(self, text: str) -> List[str]:
        """Splits markdown text into chunks with header context"""
        header_sections = self._split_by_headers(text)
        chunks = []
        
        for header, content in header_sections:
            if not content:
                continue
                
            
            section_chunks = self._recursive_split(
                text=content,
                separators=self.separators,
                chunk_size=self.chunk_size,
                chunk_overlap=self.chunk_overlap
            )
            
            
            for chunk in section_chunks:
                if header:
                    chunks.append(f"{header}\n{chunk}")
                else:
                    chunks.append(chunk)
        
        return chunks

    def _split_by_headers(self, text: str) -> List[Tuple[Optional[str], str]]:
        """Splits text into (header, content) sections"""
        lines = text.split('\n')
        sections = []
        current_header = None
        current_content = []

        for line in lines:
            if self.header_regex.match(line):
                if current_header or current_content:
                    sections.append((current_header, '\n'.join(current_content)))
                    current_content = []
                current_header = line
            else:
                current_content.append(line)
                
        if current_header or current_content:
            sections.append((current_header, '\n'.join(current_content)))
            
        return sections

    def _recursive_split(
        self,
        text: str,
        separators: List[str],
        chunk_size: int,
        chunk_overlap: int
    ) -> List[str]:
        """Recursively splits text using separators"""
        chunks = []
        remaining = text
        
        while len(remaining) > chunk_size:
            split_pos = -1
            for sep in separators:
                pos = remaining.find(sep, chunk_size - chunk_overlap)
                if pos != -1 and pos < chunk_size + chunk_overlap:
                    split_pos = pos + len(sep)
                    break
                    
            if split_pos == -1:
                split_pos = chunk_size
                
            chunk = remaining[:split_pos]
            chunks.append(chunk)
            remaining = remaining[split_pos - chunk_overlap:]
            
        if remaining:
            chunks.append(remaining)
            
        return chunks


splitter = MarkdownHeaderRecursiveSplitter(separators=["\n\n", "\n", ".", " "])
with open('data_md_correct/citros.pdf_md.md_correct.md', 'r') as f:
    text = f.read()
    chunks = splitter.split_text(text)

print(len(chunks))
for chunk in chunks:
    print(len(chunk))

11
168
906
897
581
3164
1427
4824
4832
5206
4790
1692


In [None]:
#Load data in Qdrant

from qdrant_client import QdrantClient, models
import os
from sentence_transformers import SentenceTransformer
from typing import Literal, List, Optional
from pydantic import BaseModel

# client = QdrantClient(url=os.getenv("QDRANT_URL"), api_key=os.getenv("QDRANT_API_KEY"))

client = QdrantClient(url="http://localhost:6333")

client

class QdrantCollection(BaseModel):
    client: QdrantClient
    name: str
    model_name: str
    distance: Literal["Cosine", "Euclid", "Dot"]
    model: SentenceTransformer = None  

    def __init__(self, **data):
        super().__init__(**data)
        # Initialize the model and store it as an instance attribute
        self.model = SentenceTransformer(self.model_name)


    def create(self):
      if not self.client.collection_exists(collection_name=self.name):
          self.client.create_collection(
              collection_name=self.name,
              vectors_config=models.VectorParams(
                  size=self.model.get_sentence_embedding_dimension(),
                  distance=self.distance
              )
          )
          print("Collection created")
      else:
          print("Collection already exists")

    def add_points(self, docs, metadata):
      embeddings = self.model.encode(docs)

      self.client.upsert(
          collection_name=self.name,
          points=[
        models.PointStruct(
            id=client.count(collection_name=self.name).count+idx,
            vector=emb,
            payload=metadata[idx]
        )
          for idx, emb in enumerate(embeddings)
          ]
      )
      print("Data added")

    def delete_collection(self):
      self.client.delete_collection(collection_name=self.name)
      print("Collection deleted")

    class Config:
        arbitrary_types_allowed = True # Allow arbitrary types like QdrantClient

collections = QdrantCollection(client=client, name="test_splitter", model_name="BAAI/bge-m3", distance="Cosine")
collections.delete_collection()
collections.create()

for idx, chunks in enumerate(chunks):
    print("="*200)
    collections.add_points(chunks, metadata=[{"content": f"{chunk}"} for chunk in chunks])