In [None]:
import logging, os
from datetime import datetime

# Module logger that writes DEBUG-and-above records to a per-day file under ./logs.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
log_dir = 'logs'
os.makedirs(log_dir, exist_ok=True)

current_date = datetime.now().strftime('%Y-%m-%d')
log_file = os.path.join(log_dir, f'rag-api-{current_date}.log')
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Re-running this cell must not stack a second FileHandler on the same logger,
# otherwise every record is written once per execution of the cell.
# FileHandler stores its target as an absolute path in `baseFilename`.
file_handler = next(
    (
        handler
        for handler in logger.handlers
        if isinstance(handler, logging.FileHandler)
        and handler.baseFilename == os.path.abspath(log_file)
    ),
    None,
)
if file_handler is None:
    file_handler = logging.FileHandler(log_file)
    logger.addHandler(file_handler)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)

In [None]:
from typing import Union, Literal, List
import pathlib
from abc import abstractmethod
from pydantic import BaseModel, field_validator

class DocumentProcessorInterface(BaseModel):
    """
    Provides abstract interface and shared functionality for document of different types

    Subclasses must implement `cleanup` and `extract_text`.

    Attributes are declared as pydantic fields:
        file_path: Location of the document on disk. Strings are converted to `pathlib.Path`.
    """
    file_path: pathlib.Path

    # mode="before" makes this run on the raw input, ahead of pydantic's own Path
    # validation. With the default "after" mode the value is already a Path and the
    # str branch below could never be reached.
    @field_validator("file_path", mode="before")
    @classmethod
    def validate_file_path(cls, value: Union[str, pathlib.Path]) -> pathlib.Path:
        """Convert a string path to `pathlib.Path`; any other value is passed through unchanged."""
        if isinstance(value, str):
            return pathlib.Path(value)
        return value

    @property
    def filename(self) -> str:
        """Final path component of `file_path` (file name including its extension)."""
        return self.file_path.name

    @abstractmethod
    def cleanup(self) -> None:
        """Closes documents manually"""
        raise NotImplementedError

    @abstractmethod
    def extract_text(self, format: str) -> str:
        """Return the document's text in the requested `format`; implemented by subclasses."""
        raise NotImplementedError

In [None]:
from enum import Enum

class TextExtractor(Enum):
    """Identifiers for the text-extraction backends a document processor can select."""

    # Text layer read through PyMuPDF (imported as `fitz` in this notebook).
    FITZ = "fitz"
    # Optical character recognition backend.
    OCR = "ocr"


In [None]:
import fitz
from PIL import Image

class PDFDocumentProcessorInterface(DocumentProcessorInterface):
    """
    Provides abstract interface and shared functionality for PDF Documents Processors

    The PyMuPDF document is opened lazily on first use and released by `cleanup()`.
    After `cleanup()` the processor can be used again; the file is simply reopened.
    """

    # Lazily opened PyMuPDF handle: None until first use and again after cleanup().
    fitz_doc: Union[fitz.Document, None] = None

    # Skip validation for unsupported types like fitz.Document, fitz.Page
    model_config = {"arbitrary_types_allowed": True}

    def open_fitz_doc(self) -> None:
        """Open `file_path` with PyMuPDF unless a document handle is already held."""
        if self.fitz_doc is None:
            self.fitz_doc = fitz.open(self.file_path)

    def cleanup(self) -> None:
        """Close the PyMuPDF document, if one is open, and drop the handle."""
        if self.fitz_doc is not None:
            self.fitz_doc.close()
            # Forget the closed handle so later calls reopen the file instead of
            # operating on a closed document.
            self.fitz_doc = None

    def get_fitz_page(self, page_number: int) -> fitz.Page:
        """Return the page at `page_number`, opening the document first if necessary."""
        self.open_fitz_doc()
        return self.fitz_doc.load_page(page_number)

    def get_page_image(self, page_number: int) -> Image.Image:
        """Render the page at `page_number` and return it as an RGB PIL image."""
        pixmap = self.get_fitz_page(page_number).get_pixmap()
        return Image.frombytes("RGB", [pixmap.width, pixmap.height], pixmap.samples)

    def get_text_extractor(self) -> TextExtractor:
        """Select the extraction backend; currently always `TextExtractor.FITZ`."""
        # TODO OCR Implementation
        return TextExtractor.FITZ

    def extract_text(self, format: Literal['raw', 'md']='raw', pagewise: bool=False) -> Union[List[str], str]:
        """
        Extract the text of the PDF.

        Args:
            format: 'raw' for the plain text reported by PyMuPDF, 'md' for Markdown
                produced by `pymupdf4llm.to_markdown`.
            pagewise: Only honoured for 'raw'. When True a list with one string per page
                is returned; when False the pages are concatenated, each one followed by
                a "-<PAGE_BREAK>-" marker. The 'md' format always returns one string.

        Returns:
            A list of page texts ('raw' with `pagewise=True`) or a single string.

        Raises:
            AssertionError: If the selected text extractor is not FITZ.
            ValueError: If `format` is neither 'raw' nor 'md'.
        """
        text_extractor = self.get_text_extractor()
        assert text_extractor == TextExtractor.FITZ, "Only Fitz Text extractor is supported as of now!! Thank you for your patience."

        if format == 'raw':
            self.open_fitz_doc()
            page_texts = [page.get_text() for page in self.fitz_doc.pages()]
            if pagewise:
                return page_texts
            # Every page, including the last one, is followed by the break marker.
            return "".join(page_text + "\n\n-<PAGE_BREAK>-\n\n" for page_text in page_texts)

        if format == "md":
            # Local import: the Markdown converter is only needed on this path.
            import pymupdf4llm
            return pymupdf4llm.to_markdown(self.file_path)

        raise ValueError("Only 'raw' and 'md' formats are supported as of now!!")
        
        
        
    
        
        
        

In [None]:
# Sample PDF used by the demo cells in this notebook. Set the FASTRAG_SAMPLE_PDF
# environment variable to point at another file instead of editing this
# machine-specific default.
input_pdf = os.environ.get(
    "FASTRAG_SAMPLE_PDF", "/home/bibekyess/yolo/plain_rag/FastRAG/tests/llama2.pdf"
)

pdf_processor = PDFDocumentProcessorInterface(file_path=input_pdf)
try:
    page_texts = pdf_processor.extract_text(pagewise=True)
finally:
    # Release the PyMuPDF handle even when extraction fails.
    pdf_processor.cleanup()
page_texts


In [None]:
import pymupdf4llm

In [None]:
from llama_index.core.readers.base import BaseReader
from typing import Any, List, Optional, Dict
from pathlib import Path
from llama_index.core.schema import Document


class FitzPDFReader(BaseReader):
    """
    LlamaIndex reader that extracts PDF text page by page and returns it as `Document` chunks.

    Args:
        chunk_size (int): Target chunk length, measured in whitespace-separated words.
            Non-positive values fall back to 256.
        chunk_stride (int): Number of words shared by consecutive chunks. Values that are
            negative or larger than `chunk_size` fall back to `chunk_size // 2`.
        pagenum (int): When not -1, every chunk is labelled with this page number instead
            of the index of the page it came from.
        split_documents (bool): When False the whole PDF is returned as one document.
        page_wise (bool): When True each page becomes exactly one chunk.
        overlap (bool): When False the stride is forced to 0 and no page-boundary
            overlap documents are produced.
    """

    def __init__(
        self, chunk_size: int=512, chunk_stride:int = 256, pagenum: int=-1, 
        split_documents: bool=True, page_wise: bool=False, overlap: bool=True, 
        *args: Any, **kwargs: Any
    ) -> None:
        super().__init__(*args, **kwargs)
        if chunk_size <= 0:
            chunk_size = 256
        if chunk_stride < 0 or chunk_stride > chunk_size:
            chunk_stride = chunk_size // 2
        self.chunk_size = chunk_size
        self.pagenum = pagenum
        self.split_documents = split_documents
        self.page_wise = page_wise
        self.overlap = overlap
        # Disabling overlap also disables the in-page stride.
        self.chunk_stride = chunk_stride if overlap else 0

    @staticmethod
    def _split_into_chunks(text: str, chunk_size: int, chunk_stride: int) -> List[str]:
        """Split `text` into chunks targeting `chunk_size` words with `chunk_stride` words of overlap."""
        # Local import: langchain is only required when text actually gets split.
        from langchain_text_splitters import RecursiveCharacterTextSplitter

        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_stride,
            length_function=lambda segment: len(segment.split()),
            is_separator_regex=False,
        )
        return [piece.page_content for piece in splitter.create_documents([text])]

    def _make_page_boundary_document(
        self, previous_doc: Document, current_doc: Document
    ) -> Optional[Document]:
        """Build a document joining the tail of `previous_doc` with the head of `current_doc`."""
        half_chunk = max(1, self.chunk_size // 2)
        tail_pieces = self._split_into_chunks(previous_doc.text, half_chunk, 0)
        head_pieces = self._split_into_chunks(current_doc.text, half_chunk, 0)
        if not tail_pieces or not head_pieces:
            # One side holds no splittable text (e.g. a blank page): nothing to bridge.
            return None
        # Copy the metadata so the flag is not written onto the previous document too.
        boundary_info = {**previous_doc.metadata, "overlapping": True}
        return Document(text=tail_pieces[-1] + head_pieces[0], extra_info=boundary_info)

    def load_data(
        self, file: Union[str, Path], extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """
        Load data and extract document chunks from corresponding file contents.
        Please don't modify the arguments passed here as LlamaIndex's SimpleDirectoryReader expects these arguments only.
        ALIRAG uses SimpleDirectoryReader to process the files/list of files and directory.

        Args:
            file (str): A url or file path pointing to the document
            extra_info (Optional[Dict]): Additional information that might be needed for loading the data, by default None.
                LlamaIndex SimpleDirectoryReader by default provides some basic File metadatas with this parameter.
                The dictionary is copied; the caller's object is never modified.
        Returns:
            List[Document]: List of documents. When overlap is enabled, each page's chunks
                are followed by one extra document (metadata `overlapping=True`) that joins
                the end of the previous page with the start of the current one.
        """
        # Work on a copy: the original mutated a shared default dict and handed the
        # same object to every Document.
        base_info: Dict[str, Any] = dict(extra_info) if extra_info else {}

        pdf_processor = PDFDocumentProcessorInterface(file_path=str(file))
        try:
            page_wise_texts = pdf_processor.extract_text(pagewise=True)
        finally:
            # Always release the PyMuPDF handle.
            pdf_processor.cleanup()

        if not self.split_documents:
            # FIXME '----' represents page splits
            entire_text = "".join(
                page_text.strip() + "\n\n----\n\n" for page_text in page_wise_texts
            )
            return [Document(text=entire_text, extra_info=base_info)]

        results: List[Document] = []
        previous_doc: Optional[Document] = None

        for page_index, page_text in enumerate(page_wise_texts):
            if self.page_wise:
                texts = [page_text]
            else:
                texts = self._split_into_chunks(page_text, self.chunk_size, self.chunk_stride)
            if not texts:
                continue

            # An explicit pagenum means the caller asked about one specific page.
            page_label = self.pagenum if self.pagenum != -1 else page_index
            page_docs = [
                Document(text=text, extra_info={**base_info, "page_label": page_label})
                for text in texts
            ]
            results.extend(page_docs)

            if previous_doc is not None:
                # Bridge the page boundary with the FIRST chunk of this page; the
                # original used the last chunk, which is not adjacent to the previous page.
                boundary_doc = self._make_page_boundary_document(previous_doc, page_docs[0])
                if boundary_doc is not None:
                    results.append(boundary_doc)

            if self.overlap:
                previous_doc = page_docs[-1]

        return results
    

In [None]:
# Chunk the sample PDF with overlap disabled and display the resulting documents.
pdf_reader = FitzPDFReader(overlap=False)
documents = pdf_reader.load_data(file=input_pdf)
documents

In [None]:
# Number of documents returned by FitzPDFReader.load_data for the sample PDF.
len(documents)

In [None]:
# NOTE(review): a bare `len` only displays the builtin function object; this looks
# like an unfinished cell. Complete the expression or delete the cell.
len

### Markdown Splitter

In [None]:
import pymupdf4llm
# Load the same sample PDF through pymupdf4llm's Markdown reader for comparison.
# Reuses `input_pdf` rather than repeating the machine-specific path.
llama_reader = pymupdf4llm.LlamaMarkdownReader()
llama_docs = llama_reader.load_data(input_pdf)

In [None]:
# Display the documents produced by LlamaMarkdownReader.
llama_docs

In [None]:
# Number of documents produced by LlamaMarkdownReader.
len(llama_docs)

### Add documents to the vector index

In [None]:
from llama_index.core.schema import TextNode
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from qdrant_client import QdrantClient


# Connection settings for the Qdrant server and the embedding model shared by the helpers below.
qdrant_url = "http://0.0.0.0:6333"
model_name = "BAAI/bge-m3"
qdrant_client = QdrantClient(url=qdrant_url, timeout=20)


embed_model = HuggingFaceEmbedding(model_name=model_name, max_length=512) # FIXME Change max_length to account for the high memory usage of bge-m3


def initialize_text_node_from_document(doc, embed_model) -> TextNode:
    """
    Build a `TextNode` that mirrors `doc` and carries an embedding of its text.

    Args:
        doc: Source document; its id, text, metadata and template fields are copied.
        embed_model: Object exposing `get_text_embedding(text)`, used to embed `doc.text`.

    Returns:
        TextNode: Node with the same id as `doc` and the computed embedding attached.
    """
    # Fields carried over from the document to the node without modification.
    mirrored_fields = (
        "metadata",
        "excluded_embed_metadata_keys",
        "excluded_llm_metadata_keys",
        "relationships",
        "text",
        "mimetype",
        "start_char_idx",
        "end_char_idx",
        "text_template",
        "metadata_template",
        "metadata_seperator",
    )
    node_kwargs = {
        "id_": doc.id_,
        "embedding": embed_model.get_text_embedding(doc.text),
    }
    for field_name in mirrored_fields:
        node_kwargs[field_name] = getattr(doc, field_name)

    return TextNode(**node_kwargs)

def add_documents_to_index(
    documents: List[Document],
    index_id: str,
):
    """
    Embed `documents` and store them in the Qdrant collection named `index_id`.

    Uses the module-level `embed_model` and `qdrant_client`.

    Returns:
        dict: `{"document_ids": ...}` holding whatever the vector store's `add` call returned.
    """
    # Embed first, so the vector store is only created once every node is ready.
    embedded_nodes = []
    for document in documents:
        embedded_nodes.append(initialize_text_node_from_document(document, embed_model))

    vector_store = QdrantVectorStore(client=qdrant_client, collection_name=index_id)
    stored_ids = vector_store.add(embedded_nodes)
    return {"document_ids": stored_ids}




In [None]:
# Compare how many documents each reader produced (Markdown reader vs. FitzPDFReader).
len(llama_docs), len(documents)

In [None]:
# Inspect the id of the first Markdown document; add_documents_to_index reuses it as the node id.
llama_docs[0].id_

In [None]:
# Embed the Markdown documents and upload them to the Qdrant collection named 'delete'.
add_documents_to_index(llama_docs, 'delete')