# AskPoopQA

This notebook builds a langgraph that showcases the potential of a RAG agent that summarises scientific evidence from the literature and extracts statistical significant associations between pathogens and enviromental factors

#### Imports

In [26]:
import json
import os
from os.path import exists
import sqlite3
# LLM
from langchain_ollama import ChatOllama
from langchain.schema import Document, AIMessage
# to chunk the text
from langchain.text_splitter import RecursiveCharacterTextSplitter
# to make/store embeddings 
from langchain_community.vectorstores import SKLearnVectorStore
#from langchain_community.embeddings.spacy_embeddings import SpacyEmbeddings
from langchain_nomic.embeddings import NomicEmbeddings
from langchain_core.messages import HumanMessage, SystemMessage
# to build/display/run a langgraph
from langgraph.graph import StateGraph
from IPython.display import Image, display
import operator
from typing_extensions import TypedDict
from typing import List, Annotated
from langgraph.graph import END

#### Set-up the LLM

Do you have ollama running?

In [None]:
local_llm = "llama3.1:70b"
llm = ChatOllama(model=local_llm, temperature=0)
llm_json_mode = ChatOllama(model=local_llm, temperature=0, format="json")

## Note: Skip to the embeddings part if you don't have the `db` file as the embeddings are run on the text contained in the database

#### Load database `db` of papers

This is the path of the `db` database containing the text extracted from the relevant papers with llamaParse

In [28]:
database_path = '/projects/0/prjs1194/literature_relevant.db'

Define `extract_full_text_content` - this function below takes as input the SQL database and returns the full-text of the papers per page content. 

In [29]:
def extract_full_text_content(database_path):
    conn = sqlite3.connect(database_path)
    cursor = conn.cursor()

    # Retrieve all rows/papers from the table
    cursor.execute(f"SELECT fulltext, DOI FROM literature_fulltext;")
    rows = cursor.fetchall()

    documents = []
    # Add paper text and dict of metadata
    for row in rows:
        fulltext, doi = row
        if isinstance(fulltext, str) and fulltext is not None:
            documents.append({
                "content": fulltext,
                "metadata": {
                    "doi": doi
                }
            })

    conn.close()
    return documents


Extract the content below:

In [30]:
docs = extract_full_text_content(database_path)

`docs` is a list containing the full text per paper. This is not directly usable for `langchain` and it must be converted into a `Document` istance

We do this together with the splitting, by using the function `.create_documents` while splitting it, within the `RecursiveCharacterTextSplitter`

#### Chunk text and extract embeddings

In [32]:
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=2000, chunk_overlap=200
)

documents = text_splitter.create_documents([doc["content"] for doc in docs], 
                                           metadatas=[doc["metadata"] for doc in docs])

In [None]:
# check first document
documents[0].metadata

{'doi': 'paper 10.1371/journal.pone.0071616 44'}

In [34]:
print(len(documents))
print(documents[0].page_content[:600])

491
# Rotavirus Seasonality and Age Effects in a Birth Cohort Study of Southern India

#
# Rotavirus Seasonality and Age Effects in a Birth Cohort Study of Southern India

Rajiv Sarkar1, Gagandeep Kang1, Elena N. Naumova1,2*

1Department of Gastrointestinal Sciences, Christian Medical College, Vellore, TN, India

2Department of Civil and Environmental Engineering, Tufts University School of Engineering, Boston, Massachusetts, United States of America

# Abstract

# Introduction

Understanding the temporal patterns in disease occurrence is valuable for formulating effective disease preventive progr


In [35]:
# Spacy Embeddings -- a bit faster than the nomic ones
# we save the embeddings into a in-memory vector store from sklearn, that is SKLearnVectorStore
persist_path = f"/projects/0/prjs1194/embeddings/"+"union.bson"

def create_embeddings(out_path, docs):
    folder = os.path.dirname(out_path)
    if not os.path.exists(folder):
        os.makedirs(folder)
    
    if exists(out_path):
        vectorstore = SKLearnVectorStore(
            embedding=NomicEmbeddings(model="nomic-embed-text-v1.5", inference_mode="local"),
            persist_path=out_path,
            serializer="bson") 
        print("Vector store was loaded from", out_path)
        
    else:
        vectorstore = SKLearnVectorStore.from_texts(
            texts=[doc.page_content for doc in docs],
            embedding=NomicEmbeddings(model="nomic-embed-text-v1.5", inference_mode="local"),
            persist_path=out_path,
            serializer="bson",
            metadatas=[doc.metadata for doc in docs])
        vectorstore.persist()
        print("Vector store was persisted to", out_path)
        
    return vectorstore

In [36]:
vectorstore = create_embeddings(persist_path, docs=documents)

Vector store was loaded from /projects/0/prjs1194/embeddings/union.bson


In [37]:
# Create retriever
retriever = vectorstore.as_retriever()

You can check if the retriever works by asking to retrieve documents similar to your search

In [38]:
enviroment_docs = retriever.invoke("dracunculus")

In [39]:
enviroment_docs[0].metadata

{'id': 'fbaa1337-ec11-4a9a-b3ed-a05eaa2d821c',
 'doi': 'paper 10.1016/j.envres.2009.02.008 36'}

In [40]:
enviroment_docs[0].page_content[:200]

'Griffiths, J.K., 1998. Human cryptosporidiosis: epidemiology, transmission, clinical disease, treatment, and diagnosis. Adv. Parasitol. 40, 37–85.\n\nHarvell, C.D., Mitchell, C.E., Ward, J.R., Altizer, '

# build a langGraph

set up a state. A state is a schema that is going to survive across the steps above. All inputs/output across the prompting is going to survive across the steps.

In [41]:
# this is the class state 
class GraphState(TypedDict):
    """
    Graph state is a dictionary that contains information we want to propagate to, and modify in, each graph node.
    """
    question : str # User question
    generation : str # LLM generation
    max_retries : int # Max number of retries for answer generation 
    answers : int # Number of answers generated
    loop_step: Annotated[int, operator.add] 
    documents : List[str] # List of retrieved documents

Take each of the steps above, and wrapping those into individual functions (which are nodes). They take state as input, and modify it.

Some of the functions are 'edges', so they basically evaluate the input and decide where to go (which nodes) next.

In [42]:
# Post-processing
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

### Nodes
def retrieve(state):
    """
    Retrieve documents from vectorstore

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE---")
    question = state["question"]

    # Write retrieved documents to documents key in state
    documents = retriever.invoke(question)
    return {"documents": documents}
    

def generate(state):
    """
    Generate answer using RAG on retrieved documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]
    loop_step = state.get("loop_step", 0)
    
    rag_prompt = """You are an assistant for question-answering tasks. 

    Here is the context to use to answer the question:

    {context} 

    Think carefully about the above context. 

    Now, review the user question:

    {question}

    Provide an answer to this questions using only the above context. 

    Use 10 sentences maximum and keep the answer concise.

    Answer:"""
    
    
    # RAG generation
    docs_txt = format_docs(documents)
    rag_prompt_formatted = rag_prompt.format(context=docs_txt, question=question)
    generation = llm.invoke([HumanMessage(content=rag_prompt_formatted)])
    return {"generation": generation, "loop_step": loop_step+1}

def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question
    If any document is not relevant, we will set a flag AND THAT'S IT

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Filtered out irrelevant documents and updated web_search state
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]
    
    doc_grader_prompt = """Here is the retrieved document: \n\n {document} \n\n Here is the user question: \n\n {question}. 

    This carefully and objectively assess whether the document contains at least some information that is relevant to the question.

    Return JSON with single key, binary_score, that is 'yes' or 'no' score to indicate whether the document contains at least some information that is relevant to the question."""
    
    doc_grader_instructions = """You are a grader assessing relevance of a retrieved document to a user question.

    If the document contains keyword(s) or semantic meaning related to the question, grade it as relevant."""
    
    # Score each doc
    filtered_docs = []
    web_search = "No" 
    for d in documents:
        doc_grader_prompt_formatted = doc_grader_prompt.format(document=d.page_content, question=question)
        result = llm_json_mode.invoke([SystemMessage(content=doc_grader_instructions)] + [HumanMessage(content=doc_grader_prompt_formatted)])
        grade = json.loads(result.content)['binary_score']
        # Document relevant
        if grade.lower() == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        # Document not relevant
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            # We do not include the document in filtered_docs
            # We set a flag to indicate that we want to run web search
            web_search = "Yes"
            continue
    return {"documents": filtered_docs, "web_search": web_search}
    


### Edges

def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """

    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]
    max_retries = state.get("max_retries", 3) # Default to 3 if not provided

    hallucination_grader_prompt_formatted = hallucination_grader_prompt.format(documents=format_docs(documents), generation=generation.content)
    result = llm_json_mode.invoke([SystemMessage(content=hallucination_grader_instructions)] + [HumanMessage(content=hallucination_grader_prompt_formatted)])
    grade = json.loads(result.content)['binary_score']

    # Check hallucination
    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        # Check question-answering
        print("---GRADE GENERATION vs QUESTION---")
        # Test using question and generation from above 
        answer_grader_prompt_formatted = answer_grader_prompt.format(question=question, generation=generation.content)
        result = llm_json_mode.invoke([SystemMessage(content=answer_grader_instructions)] + [HumanMessage(content=answer_grader_prompt_formatted)])
        grade = json.loads(result.content)['binary_score']
        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        elif state["loop_step"] <= max_retries:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
        else:
            print("---DECISION: MAX RETRIES REACHED---")
            return "max retries"  
    elif state["loop_step"] <= max_retries:
        print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"
    else:
        print("---DECISION: MAX RETRIES REACHED---")
        return "max retries"

In [43]:
# Create the StateGraph
workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("retrieve", retrieve)  # retrieve
workflow.add_node("grade_documents", grade_documents)  # grade documents
workflow.add_node("generate", generate)  # generate

# Build graph
# Directly set "retrieve" as the entry point, skipping route_question and web_search
workflow.set_entry_point("retrieve")

# Set edges to transition between nodes
workflow.add_edge("retrieve", "grade_documents")  # After retrieving, grade documents
workflow.add_edge("grade_documents", "generate")  # After grading, generate the response

# Compile
graph = workflow.compile()
display(Image(graph.get_graph().draw_mermaid_png()))


<IPython.core.display.Image object>

In [44]:
import re 
def print_doi(doi):
    doi_pattern = r"10.\d{4,9}/[-._;()/:A-Za-z0-9]+"
    doi_match = re.search(doi_pattern, doi)
    
    if doi_match:
        return doi_match.group(0)
    return print("no valid DOI has been found")
    

In [45]:
question = {"question": "Which pathogens are mentioned in the texts?", 
            "max_retries": 1}

documents_list = []

for event in graph.stream(question, stream_mode="values"):
    if "generation" in event and isinstance(event["generation"], AIMessage):
        # Print only the content of the AIMessage
        print(event["generation"].content)

        # Get the unique references
        references = []
        print('================================ References ================================')
        for ref in event['documents']:
            if ref.metadata['doi'] not in references:
                doi = print_doi(ref.metadata['doi'])
                references.append(doi)
                print(f"DOI: {doi}")
    
    # event contains documents (which are stored under the key 'documents')
    if "documents" in event:
        # append the documents to the documents_list, but do not print them
        documents_list.extend(event["documents"])


---RETRIEVE---
---CHECK DOCUMENT RELEVANCE TO QUESTION---


KeyboardInterrupt: 