In [28]:
from langchain_core.documents import Document as BaseDocument
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
from typing import Any, Dict, Literal, List, Union
from langchain_chroma import Chroma
from langchain.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever

from langchain.prompts import ChatPromptTemplate
from langchain_community.llms.ollama import Ollama

from dotenv import load_dotenv
import os
import sys
import time
import json
import logging


logging.basicConfig(
    level=logging.DEBUG,  # Capture everything, including third-party debug messages
    format='%(asctime)s - %(levelname)s - %(message)s',  # Log format
    handlers=[
        # Log to file only; add logging.StreamHandler() here to also echo to the notebook.
        logging.FileHandler("output_log.txt"),
    ],
    # basicConfig does nothing if the root logger already has handlers, so without
    # force=True re-running this cell (or running it after anything else configured
    # logging) silently keeps the old configuration. force=True replaces existing
    # root handlers so this cell always yields the configuration written here.
    force=True,
)



In [None]:
load_dotenv()

# Prepend PATH_TO_LLMSHERPA (if set) to sys.path so the llmsherpa import in the
# parser cell can be resolved from that location.
llmsherpa_path = os.getenv('PATH_TO_LLMSHERPA')

if llmsherpa_path:
    sys.path.insert(0, llmsherpa_path)
    print(f"Added {llmsherpa_path} to sys.path")
else:
    print("Environment variable PATH_TO_LLMSHERPA is not set.")

# Folder containing the PDFs to ingest; override with the PDF_FOLDER environment variable.
folder_path = os.getenv("PDF_FOLDER", "../data")

# Persist directory of the Chroma store used by both ingestion and retrieval.
CHROMA_PATH = os.getenv("CHROMA_PATH")
if not CHROMA_PATH:
    # The value is handed to Chroma(persist_directory=...) as-is; without a directory the
    # store presumably is not written to disk — confirm against langchain_chroma docs.
    print("Environment variable CHROMA_PATH is not set; the Chroma vector store may not be persisted.")

In [3]:
# LOAD PARSER
# Imported here rather than in the top import cell because llmsherpa may only become
# importable after the previous cell prepends PATH_TO_LLMSHERPA to sys.path.
from llmsherpa.readers import LayoutPDFReader

# Endpoint of the llmsherpa layout-parsing service; override with LLMSHERPA_API_URL.
parser_api_url = os.getenv(
    "LLMSHERPA_API_URL",
    "http://localhost:5010/api/parseDocument?renderFormat=all",
)
pdf_reader = LayoutPDFReader(parser_api_url)

In [4]:
class Document(BaseDocument):
    """LangChain ``Document`` subclass exposing parsed-PDF metadata as read-only properties.

    The helper properties read the metadata keys ``source``, ``page_idx``, ``block_idx``,
    ``tag`` and ``type``; any missing key is reported as the string ``'unknown'``.

    NOTE(review): text splitters such as ``RecursiveCharacterTextSplitter`` presumably build
    their output chunks with LangChain's own ``Document`` class, so chunks derived from
    instances of this class may not have these helper properties — confirm before relying
    on them downstream.

    Example:
        .. code-block:: python
            document = Document(
                page_content="Hello, world!",
                metadata={"source": "https://example.com", "page_idx": 1, "block_idx": 0, "tag": "greeting"}
            )
    """

    page_content: str
    """String text."""
    # NOTE(review): the ``type`` property defined further down reuses this field's name and,
    # being defined later in the class body, replaces this default in the class namespace.
    # How pydantic resolves this field/property clash depends on the installed
    # pydantic/langchain-core versions — verify ``doc.type`` and serialization before
    # relying on either.
    type: Literal["Document"] = "Document"

    def __init__(self, page_content: str, metadata: Union[Dict[str, Any], None] = None) -> None:
        """Initialize a Document instance.

        Only ``page_content`` and ``metadata`` are accepted by this initializer; other
        base-class fields cannot be passed through it.

        Args:
            page_content (str): The content of the document.
            metadata (dict, optional): Metadata about the document. ``None`` is replaced
                by a new empty dict, so instances never share a default mapping.
        """
        if metadata is None:
            metadata = {}
        super().__init__(page_content=page_content, metadata=metadata)  # Call the base class initializer

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Return whether this class is serializable (always ``True``)."""
        return True

    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        """Get the namespace of the langchain object.

        NOTE(review): this is the same namespace path used for LangChain's base document
        schema, so serialized instances may deserialize as the base class — confirm if
        round-tripping this subclass matters.
        """
        return ["langchain", "schema", "document"]

    def format_metadata(self) -> Dict[str, Any]:
        """Format the metadata of the document.

        Returns:
            dict: The five predefined keys ``source``, ``page_idx``, ``block_idx``, ``tag``
            and ``type``, each mapped to its metadata value (unchanged, not stringified)
            or to ``'unknown'`` when absent. Other metadata keys are omitted.
        """
        attributes = ['source', 'page_idx', 'block_idx', 'tag', 'type']
        formatted_metadata = {key: self.metadata.get(key, 'unknown') for key in attributes}
        return formatted_metadata
    
    @property
    def source(self) -> str:
        """Get the source of the document (``metadata['source']`` or ``'unknown'``)."""
        return self.metadata.get('source', 'unknown')

    @property
    def page_idx(self) -> Any:
        """Get the page index: whatever is stored under ``metadata['page_idx']``, or ``'unknown'``."""
        return self.metadata.get('page_idx', 'unknown')

    @property
    def block_idx(self) -> Any:
        """Get the block index: whatever is stored under ``metadata['block_idx']``, or ``'unknown'``."""
        return self.metadata.get('block_idx', 'unknown')

    @property
    def tag(self) -> str:
        """Get the tag of the document (``metadata['tag']`` or ``'unknown'``)."""
        return self.metadata.get('tag', 'unknown')

    @property
    def type(self) -> str:
        """Get the chunk type stored in ``metadata['type']`` (or ``'unknown'``)."""
        return self.metadata.get('type', 'unknown')

    def __repr__(self) -> str:
        """Return ``Document(key=value, ..., content=...)`` built from ``format_metadata``."""
        metadata_str = ', '.join(f"{key}={value}" for key, value in self.format_metadata().items())
        return f"Document({metadata_str}, content={self.page_content})"

    def __str__(self) -> str:
        """Return the page content, followed by the full metadata dict when it is non-empty."""
        if self.metadata:
            return f"page_content='{self.page_content}' metadata={self.metadata}"
        else:
            return f"page_content='{self.page_content}'"

In [5]:
def process_pdf(file_path: str, pdf_reader: LayoutPDFReader) -> List[Document]:
    """
    Parse one PDF with llmsherpa and wrap every layout chunk in a ``Document``.

    Args:
        file_path (str): Location of the PDF on disk.
        pdf_reader (LayoutPDFReader): Reader used to send the PDF to the parsing service.

    Returns:
        List[Document]: One Document per chunk, in the order yielded by ``chunks()``. Each
        carries ``source`` (the file's basename), ``page_idx``, ``block_idx``, ``tag`` and
        ``type`` (the chunk's class name, e.g. 'Paragraph' or 'Table') metadata.
    """
    parsed = pdf_reader.read_pdf(file_path)
    source_name = os.path.basename(file_path)

    def _to_document(layout_chunk):
        # Metadata is assembled before the text is extracted, matching the chunk access order.
        return Document(
            metadata={
                'source': source_name,
                'page_idx': layout_chunk.page_idx,
                'block_idx': layout_chunk.block_idx,
                'tag': layout_chunk.tag,
                'type': type(layout_chunk).__name__,
            },
            page_content=layout_chunk.to_text(),
        )

    return [_to_document(layout_chunk) for layout_chunk in parsed.chunks()]

In [6]:
def process_folder(folder_path: str, pdf_reader: LayoutPDFReader) -> List[Document]:
    """
    Process all PDF files in a folder and extract their content into Document instances.

    Files are visited in sorted name order, because ``os.listdir`` alone returns entries in
    an arbitrary, platform-dependent order, and a reproducible document list matters for
    re-runs. The ``.pdf`` extension is matched case-insensitively, so files such as
    ``REPORT.PDF`` are not silently skipped.

    Args:
        folder_path (str): The path to the folder containing PDF files.
        pdf_reader (LayoutPDFReader): An instance of LayoutPDFReader to read the PDFs.

    Returns:
        List[Document]: Documents from all successfully parsed PDFs, file by file.

    Note:
        Best-effort: a file that fails to parse is reported and skipped, and processing
        continues with the remaining files.
    """
    all_documents = []
    for file_name in sorted(os.listdir(folder_path)):
        if not file_name.lower().endswith('.pdf'):
            continue

        file_path = os.path.join(folder_path, file_name)
        print(f"Processing file: {file_name}")
        try:
            documents = process_pdf(file_path, pdf_reader)
        except Exception as e:
            # Report in the notebook and keep the full traceback in the log file.
            print(f"Error processing file {file_name}: {e}")
            logging.exception(f"Error processing file {file_name}")
            continue
        all_documents.extend(documents)

    return all_documents

In [7]:
class DocumentChunker:
    """
    Split documents into token-bounded chunks using a sentence-transformer's tokenizer.

    Chunk sizes are counted in tokens of ``model_name``'s tokenizer, so they correspond to
    what that embedding model sees.

    Attributes:
        model (SentenceTransformer): The sentence transformer whose tokenizer measures chunk length.
        splitter (RecursiveCharacterTextSplitter): The text splitter instance.
    """

    def __init__(self, model_name: str, chunk_size: int, chunk_overlap: int):
        """
        Initialize a DocumentChunker instance.

        Args:
            model_name (str): The name of the sentence-transformer model whose tokenizer is used.
            chunk_size (int): The maximum number of tokens in each chunk.
            chunk_overlap (int): The number of overlapping tokens between consecutive chunks.
        """
        self.model = SentenceTransformer(model_name)
        self.splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer(
            tokenizer=self.model.tokenizer,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )

        # A SentenceTransformer truncates inputs longer than its max_seq_length, so chunks
        # larger than that are only partially represented when embedded with this model.
        max_seq_length = getattr(self.model, "max_seq_length", None)
        if max_seq_length is not None and chunk_size > max_seq_length:
            print(
                f"⚠️ chunk_size={chunk_size} exceeds max_seq_length={max_seq_length} of {model_name}; "
                "text beyond that many tokens will be truncated when embedding with this model."
            )

    def chunk_documents(self, documents: List["Document"]) -> List["Document"]:
        """
        Chunk a list of Document instances.

        Args:
            documents (List[Document]): The documents to split; metadata is carried over to
                each chunk by the splitter.

        Returns:
            List[Document]: The chunks (presumably LangChain base ``Document`` objects, as
            produced by the splitter — verify if the subclass's properties are needed), or
            an empty list if splitting failed.
        """
        try:
            return self.splitter.split_documents(documents)
        except Exception as e:
            # Best-effort: report and return nothing instead of aborting the notebook;
            # the traceback goes to the log file for debugging.
            print(f"Error during chunking: {e}")
            logging.exception("Error during chunking")
            return []

In [8]:
def calculate_chunk_ids(chunks: List["Document"]) -> List["Document"]:
    """
    Assign each chunk a deterministic ID of the form ``"<source>:<page_idx>:<chunk_index>"``.

    ``chunk_index`` counts how many chunks with the same ``source``/``page_idx`` pair have
    been seen so far. For example, ``"monopoly.pdf:6:2"`` is the third chunk of page 6 of
    ``monopoly.pdf``; ``source`` is the file's basename as stored by ``process_pdf``.

    The counter is kept per page rather than per consecutive run. A page whose chunks are
    not contiguous in ``chunks`` therefore still gets unique IDs, instead of restarting at
    0 and colliding with IDs already issued; Chroma rejects duplicate IDs within one insert.
    When every page's chunks are contiguous, the IDs equal those of consecutive-run
    numbering, so IDs already stored in an existing database stay valid.

    Args:
        chunks (List[Document]): Chunks whose ``metadata`` will receive an ``"id"`` key
            (modified in place). Missing ``source``/``page_idx`` are treated as ``"unknown"``.

    Returns:
        List[Document]: The same list object, for convenient chaining.
    """
    chunks_seen_per_page: Dict[str, int] = {}

    for chunk in chunks:
        source = chunk.metadata.get("source", "unknown")
        page = chunk.metadata.get("page_idx", "unknown")
        page_id = f"{source}:{page}"

        chunk_index = chunks_seen_per_page.get(page_id, 0)
        chunks_seen_per_page[page_id] = chunk_index + 1

        chunk.metadata["id"] = f"{page_id}:{chunk_index}"

    return chunks

In [9]:
class TransformerEmbeddingFunction:
    """Embedding adapter around a ``SentenceTransformer`` model.

    Instances are callable (a string or a list of strings -> list of vectors) and also
    provide the ``embed_documents`` / ``embed_query`` methods used by LangChain vector stores.
    """

    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        """Load the sentence-transformer named ``model_name``."""
        self.model = SentenceTransformer(model_name)

    def __call__(self, input: Union[str, List[str]]) -> List[List[float]]:
        """Embed one string or a list of strings; the result is always a list of vectors."""
        texts = [input] if isinstance(input, str) else input
        vectors = self.model.encode(texts, show_progress_bar=False)
        return vectors.tolist()

    def embed_documents(self, documents: List[str]) -> List[List[float]]:
        """Embed several texts, returning one vector per text."""
        return self.__call__(documents)

    def embed_query(self, query: str) -> List[float]:
        """Embed a single query string and return its vector."""
        vectors = self.__call__([query])
        return vectors[0]

In [10]:
def add_to_chroma(chunks: List[Document], CHROMA_PATH: str) -> None:
    """
    Add new document chunks to the Chroma database if they don't already exist.

    Every chunk first gets a deterministic ID from ``calculate_chunk_ids``, written to
    ``chunk.metadata["id"]`` in place. A chunk is skipped when its ID is already stored in
    the database or was already queued earlier in this call. Re-running ingestion therefore
    adds nothing new, and one insert never carries duplicate IDs.

    Args:
        chunks (List[Document]): A list of Document instances to be added.
        CHROMA_PATH (str): The persist directory of the Chroma database.

    Note:
        Best-effort: errors are printed (and logged with a traceback) instead of raised.
    """
    try:
        # Open (or create) the store with the sentence-transformer embedding function.
        embedding_function = TransformerEmbeddingFunction()
        db = Chroma(
            persist_directory=CHROMA_PATH,
            embedding_function=embedding_function
        )

        chunks_with_ids = calculate_chunk_ids(chunks)

        existing_items = db.get(include=[])  # IDs are always included by default
        existing_ids = set(existing_items["ids"])
        print(f"Number of existing doc segments in DB: {len(existing_ids)}")

        # Only add chunks whose ID is neither stored yet nor already queued in this batch.
        queued_ids = set(existing_ids)
        new_chunks = []
        for chunk in chunks_with_ids:
            chunk_id = chunk.metadata["id"]
            if chunk_id not in queued_ids:
                queued_ids.add(chunk_id)
                new_chunks.append(chunk)

        if new_chunks:
            print(f"👉 Adding new doc segments: {len(new_chunks)}")
            new_chunk_ids = [chunk.metadata["id"] for chunk in new_chunks]
            db.add_documents(new_chunks, ids=new_chunk_ids)
            print(f"✅ Successfully added {len(new_chunks)} new doc segments.")
        else:
            print("✅ No new doc segments to add")

    except Exception as e:
        print(f"❌ Error while adding to Chroma: {e}")
        logging.exception("Error while adding to Chroma")

In [11]:
# Ingestion: parse every PDF, split the parsed blocks into token-bounded chunks,
# and store any chunks the Chroma database does not have yet.
all_docs = process_folder(folder_path, pdf_reader)
chunker = DocumentChunker(
    model_name='sentence-transformers/all-MiniLM-L6-v2',
    chunk_size=1024,
    chunk_overlap=128,
)
chunks = chunker.chunk_documents(all_docs)
add_to_chroma(chunks=chunks, CHROMA_PATH=CHROMA_PATH)

Processing file: ShowcaseUserGuide.pdf
Processing file: access_019-access-management-system-user-guide-v4-0.pdf
Processing file: nihms-1769170.pdf
Processing file: Bookshelf_NBK5295.pdf




Number of existing doc segments in DB: 8793
✅ No new doc segments to add


RETRIEVAL

In [12]:
def load_documents(pdf_directory: str, parser_api_url: str) -> List[BaseDocument]:
    """
    Load documents from a directory of PDF files and convert them into Document instances.

    Files are visited in sorted name order, and ``.pdf`` is matched case-insensitively.
    Parsing errors are NOT caught: a PDF that fails to parse aborts the whole load.

    Args:
        pdf_directory (str): The directory containing the PDF files.
        parser_api_url (str): URL of the llmsherpa parsing endpoint.

    Returns:
        List[Document]: One Document per parsed chunk, across all PDFs in the directory.
    """
    pdf_files = sorted(f for f in os.listdir(pdf_directory) if f.lower().endswith('.pdf'))

    # A single reader is reused for every file instead of constructing one per PDF.
    pdf_reader = LayoutPDFReader(parser_api_url=parser_api_url)

    documents = []
    for pdf_file in pdf_files:
        file_path = os.path.join(pdf_directory, pdf_file)
        # Process the PDF file and extract content into Document instances
        documents.extend(process_pdf(file_path, pdf_reader=pdf_reader))

    return documents

In [29]:
class ChromaRetrieval:
    """Hybrid retriever (BM25 keyword search + Chroma vector search) with an Ollama answer step.

    Attributes:
        embedding_function (TransformerEmbeddingFunction): Embeds texts for the vector store.
        db (Chroma): The vector store persisted at ``CHROMA_PATH``.
        vector_retriever: Similarity retriever over ``db`` returning ``num_queries`` documents.
        keyword_retriever (BM25Retriever): In-memory BM25 index over the given documents,
            returning ``num_queries`` documents.
        ensemble_retriever (EnsembleRetriever): Fuses both retrievers using ``ENSEMBLE_WEIGHTS``.
    """

    # Relative weights of the [keyword, vector] retrievers in the ensemble.
    ENSEMBLE_WEIGHTS = (0.8, 0.2)

    def __init__(self, CHROMA_PATH: str, documents: List["BaseDocument"], num_queries: int):
        """
        Open the Chroma store, make sure ``documents`` are in it, and build the retrievers.

        Args:
            CHROMA_PATH (str): Persist directory of the Chroma database.
            documents (List[BaseDocument]): Documents indexed for keyword and vector search.
            num_queries (int): Number of documents each underlying retriever returns
                (the ``k`` of each search).
        """
        self.embedding_function = TransformerEmbeddingFunction()
        self.db = Chroma(
            persist_directory=CHROMA_PATH,
            embedding_function=self.embedding_function
        )

        # Load documents into the vector database under deterministic IDs, skipping those
        # already stored. Re-creating this object therefore does not duplicate them in
        # the persistent store.
        self._add_missing_documents(documents)

        # Set up the retrievers
        self.vector_retriever = self.db.as_retriever(search_type="similarity", search_kwargs={"k": num_queries})
        # from_documents (not from_texts) keeps each document's metadata, so keyword hits
        # report their real source/id instead of "unknown".
        self.keyword_retriever = BM25Retriever.from_documents(documents)
        self.keyword_retriever.k = num_queries  # Set the number of top documents to return

        # Built once here instead of on every retrieve() call.
        self.ensemble_retriever = EnsembleRetriever(
            retrievers=[self.keyword_retriever, self.vector_retriever],
            weights=list(self.ENSEMBLE_WEIGHTS),
        )

    @staticmethod
    def _document_id(doc) -> str:
        """Return a deterministic ID for ``doc``: its ``metadata["id"]`` if set, else a content hash.

        The hash covers source, page_idx, block_idx and page_content, so the same parsed
        block maps to the same ID on every run as long as the parser output is unchanged.
        """
        import hashlib

        metadata = doc.metadata or {}
        if metadata.get("id"):
            return str(metadata["id"])
        key = "\x1f".join(
            str(part)
            for part in (
                metadata.get("source", "unknown"),
                metadata.get("page_idx", "unknown"),
                metadata.get("block_idx", "unknown"),
                doc.page_content,
            )
        )
        return "doc:" + hashlib.sha256(key.encode("utf-8")).hexdigest()

    def _add_missing_documents(self, documents) -> None:
        """Add to ``self.db`` only those ``documents`` whose ID is not stored (or queued) yet."""
        known_ids = set(self.db.get(include=[])["ids"])
        new_docs, new_ids = [], []
        for doc in documents:
            doc_id = self._document_id(doc)
            if doc_id in known_ids:
                continue
            known_ids.add(doc_id)
            new_docs.append(doc)
            new_ids.append(doc_id)
        if new_docs:
            self.db.add_documents(new_docs, ids=new_ids)

    def retrieve(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """
        Run the hybrid search for ``query`` and return at most ``top_k`` results.

        Each result is a dict with keys ``score`` (the document's ``metadata["score"]`` if
        present, else 0), ``document``, ``id`` and ``source`` (from metadata, defaulting to
        ``"unknown"``), and ``type`` (always ``"combined"``).

        The ensemble already returns documents in fused-rank order. The stable sort by
        ``score`` only reorders documents that actually carry a ``score`` in their metadata.
        """
        ensemble_results = self.ensemble_retriever.invoke(query)

        combined_results = [
            {
                "score": result.metadata.get("score", 0),
                "document": result,
                "id": result.metadata.get("id", "unknown"),
                "source": result.metadata.get("source", "unknown"),
                "type": "combined",
            }
            for result in ensemble_results
        ]
        combined_results.sort(key=lambda x: x['score'], reverse=True)

        return combined_results[:top_k]

    def generate_response(self, query_text: str, llm_model_name: str, prompt_template: str, top_k: int = 5) -> str:
        """
        Answer ``query_text`` with an Ollama model, grounded in retrieved context.

        Args:
            query_text (str): The user question.
            llm_model_name (str): Ollama model name, e.g. ``"llama3.1"``.
            prompt_template (str): Template with ``{context}`` and ``{question}`` placeholders.
            top_k (int): Number of retrieved chunks placed in the context.

        Returns:
            str: The model's raw response text (requested in JSON format). The text is
            checked with ``json.loads`` and a parse failure is logged, but the raw text is
            returned either way. If any step raises, the exception message is returned instead.
        """
        try:
            results = self.retrieve(query_text, top_k=top_k)
            logging.debug(f"Retrieved results: {results}")

            context_text = "\n\n---\n\n".join(result['document'].page_content for result in results)
            logging.debug(f"Combined context text: {context_text}")

            # Separate name so the prompt_template argument (a str) is not shadowed.
            chat_prompt = ChatPromptTemplate.from_template(prompt_template)
            prompt = chat_prompt.format(context=context_text, question=query_text)
            logging.debug(f"Formatted prompt: {prompt}")

            start_time = time.perf_counter()
            model = Ollama(model=llm_model_name, format="json")
            elapsed_time = time.perf_counter() - start_time
            logging.info(f"{llm_model_name} model initialization time: {elapsed_time:.2f} seconds")

            inference_start_time = time.perf_counter()
            response_text = model.invoke(prompt)
            elapsed_time = time.perf_counter() - inference_start_time
            logging.info(f"{llm_model_name} inference time: {elapsed_time:.2f} seconds")

            # Validate that the model honoured the JSON format; the raw text is returned regardless.
            try:
                response_json = json.loads(response_text)
                logging.debug(f"Parsed response: {response_json}")
            except json.JSONDecodeError as e:
                logging.error(f"Failed to parse response as JSON: {e}")

            return response_text

        except Exception as e:
            # logging.exception also records the traceback in the log file.
            logging.exception(f"Error generating response: {e}")
            return str(e)

In [30]:
# Parse the PDFs again for the retrieval stage: the parsed blocks feed the BM25 index and
# are also handed to the Chroma store at CHROMA_PATH when the retriever is built.
documents = load_documents(folder_path, parser_api_url)

chroma_retrieval = ChromaRetrieval(CHROMA_PATH=CHROMA_PATH, documents=documents, num_queries=5)

2024-09-24 19:53:51,116 - DEBUG - Starting new HTTP connection (1): localhost:5010
2024-09-24 19:53:52,023 - DEBUG - http://localhost:5010 "POST /api/parseDocument?renderFormat=all HTTP/11" 200 0
2024-09-24 19:53:52,028 - DEBUG - Starting new HTTP connection (1): localhost:5010
2024-09-24 19:53:53,528 - DEBUG - http://localhost:5010 "POST /api/parseDocument?renderFormat=all HTTP/11" 200 0
2024-09-24 19:53:53,535 - DEBUG - Starting new HTTP connection (1): localhost:5010
2024-09-24 19:53:55,301 - DEBUG - http://localhost:5010 "POST /api/parseDocument?renderFormat=all HTTP/11" 200 0
2024-09-24 19:53:55,309 - DEBUG - Starting new HTTP connection (1): localhost:5010
2024-09-24 19:53:58,692 - DEBUG - http://localhost:5010 "POST /api/parseDocument?renderFormat=all HTTP/11" 200 0
2024-09-24 19:53:58,707 - INFO - Use pytorch device_name: mps
2024-09-24 19:53:58,709 - INFO - Load pretrained SentenceTransformer: sentence-transformers/all-MiniLM-L6-v2
2024-09-24 19:53:58,834 - DEBUG - https://hug

GENERATE RESPONSES

In [31]:
# Prompt template for generate_response: {context} and {question} are filled in at query time.
# Literal braces in the JSON example are doubled ({{ }}) so the template engine keeps them as-is.
# The example must itself be valid JSON, since the model is told to answer only in valid JSON.
prompt_template = """
    Context:
    {context}

    Question:
    {question}

    Instructions:
    - Use the provided context to answer the question.
    - Be as thorough and precise as possible.
    - If the context does not provide enough information to answer the question, state that clearly.
    - If relevant, also include the organism the information refers to.

    Example JSON output:
    {{
        "question": "What is the recommended input mass of stool for the DNA extraction protocol?",
        "organism": "Human",
        "context": "Keeping the frozen stool sample on dry ice as much as possible (to maintain sample integrity), place the sample tube in a tube rack and use a biopsy punch to distribute a 150-mg aliquot of stool into a 2-ml microcentrifuge tube.",
        "answer": "150 mg"
    }}

    Answer only in valid JSON format:
"""

In [32]:
# Ask the RAG pipeline a first test question and print the model's raw (JSON) reply.
query_text = "What is the recommended input mass of stool for the DNA extraction protocol?"
response_text = chroma_retrieval.generate_response(
    query_text=query_text, prompt_template=prompt_template, llm_model_name="llama3.1"
)
print(response_text)

2024-09-24 19:54:42,969 - DEBUG - Retrieved results: [{'score': 0, 'document': Document(page_content='Experimental design Sample lysis and contaminant digestion (Steps 1–7)—We recommend using samples that have been divided into aliquots and frozen as soon after collection as possible, because extended time at room temperature before freezing can lead to altered microbial abundance, as some taxa may die while others continue to divide17,18.\nWhen dividing stool samples into aliquots for DNA extraction, we prefer to use biopsy punches with plungers, which can precisely produce frozen stool aliquots and limit freeze-thaw cycles.\nCare should be taken to avoid injury when using these sharp tools, and we recommend placing the sample tube in a rack rather than holding the tube by hand during the punching procedure.\nWhen preparing aliquots, one should consider the biomass of the sample.\nIf a stool sample has a lower biomass and is more watery in consistency, a greater total mass is recommen

{'question': 'What is the recommended input mass of stool for the DNA extraction protocol?', 'organism': 'Human', 'context': 'When preparing aliquots, one should consider the biomass of the sample. If a stool sample has a lower biomass and is more watery in consistency, a greater total mass is recommended for extraction input.', 'answer': 'A greater total mass (no specific amount mentioned) for samples with lower biomass and are more watery in consistency'}


In [17]:
# Ask a second test question (enzymatic lysis) and print the model's raw (JSON) reply.
query_text = "Which enzymes are used in the enzymatic lysis step?"
response_text = chroma_retrieval.generate_response(
    query_text=query_text, prompt_template=prompt_template, llm_model_name="llama3.1"
)
print(response_text)

llama3.1 response generation time: 122.15 seconds
{
    "question": "Which enzymes are used in the enzymatic lysis step?",
    "context": "In general, we recommend enzymatic lysis using a combination of lytic enzyme solution (Qiagen 158928) and MetaPolyzyme (Millipore Sigma MAC4L-5MG) for effective lysis of a range of microbes with minimal shearing.",
    "answer": {
        "lyticase and chitinase": "disrupt glucan and chitin in cell walls",
        "lysozyme, mutanolysin and lysostaphin": "disrupt linkages in peptidoglycans",
        "achromopeptidase": "a lysyl endopeptidase that is effective in lysing Gram-positive bacteria"
    }
}


- Use the official Ollama Docker image to improve inference time by running on the GPU (https://ollama.com/blog/ollama-is-now-available-as-an-official-docker-image)

TODO:

- Improve the query function
- Prompt Engineering

- async ops for DB generation

- Knowledge graph integration 

- Multi-Modality with Llava embeddings

- GUI: Streamlit (2 step installation with Docker?)