# Project Prototype: Agentic RAG System with Nursing Handbooks and Transes as Knowledge Base

The goal of this project is to create an **Agentic RAG-based system** that helps nursing students retrieve relevant information from nursing handbooks and their personal study notes. This system will augment the responses with context from the personal notes, making it more personalized and adaptive to the user's learning.

## Flow Overview

The project involves two primary stages:
1. **Prepopulating the Vector DB** (embedding the nursing handbooks and personal notes into a database)
2. **RAG Modeling** (retrieving relevant information and augmenting responses using both the handbooks and personal notes)

Additionally, there will be an **Agentic Layer** that intelligently routes queries to the appropriate source (nursing handbooks or personal study notes).

### Technologies to use

- **Docling**: For parsing the nursing handbooks and personal notes.
- **OpenAI Embedding Model (large)**: For embedding both nursing handbooks and personal study notes.
- **ChromaDB**: For storing and querying the embeddings.
- **Deepseek LLM**: For augmenting responses based on the retrieved content.
- **Pydantic AI**: For the AI agents that route queries between the nursing handbooks and personal notes.
- **Pydantic**: For type safety and data validation.
- **FastAPI**: For building the API.
- **Docker**: For containerization and deployment.
- **Pydantic Graph** (`pydantic_graph`): For workflow pipelines.

## Initialization

In [74]:
import os
import joblib
import threading
import asyncio
import chromadb
import nanoid
import json
from pathlib import Path
import concurrent.futures
from rich import print
from docling.document_converter import DocumentConverter
from abc import ABC, abstractmethod
import chromadb.utils.embedding_functions as embedding_functions
from docling_core.transforms.chunker.hierarchical_chunker import DocChunk
from dataclasses import dataclass, field
from pydantic import BaseModel
from pydantic_ai import Agent, RunContext
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.models import Model
from pydantic_ai.format_as_xml import format_as_xml
from pydantic_ai.messages import ModelMessage
from pydantic_graph import BaseNode, Edge, End, Graph, GraphRunContext, HistoryStep
from docling.chunking import HybridChunker
from enum import Enum
from typing import List, Dict, Annotated

from dotenv import load_dotenv

load_dotenv()
knowledge_base_raw_path = './data/knowledge-base/raw/'
knowledge_base_pickled_path = './data/knowledge-base/pickled/'

### Stage 1: Prepopulate the Vector DB

In this stage, we process nursing handbooks and personal study notes, embedding them into a vector database for efficient retrieval during RAG modeling.

#### Steps:

1. **Upload Nursing Handbooks**:
   - Upload nursing handbooks in popular formats like PDF or DOCX.
   - These documents may include textbooks on topics like **medical-surgical nursing, pharmacology, pediatric nursing**, and other specialized nursing areas.

2. **Parse the documents and retain the page metadata**:
   - Use **Docling** to convert the nursing handbooks and personal notes into Docling documents.
   - The conversion preserves the PDF metadata (such as page numbers), so retrieved passages can later be cited by page.

3. **Generate Embeddings**:
   - Use **OpenAI's large embedding model** to convert the nursing handbooks and personal notes into embeddings. These embeddings will capture the semantic meaning of each section, allowing for efficient similarity-based searches.
   - Both the nursing handbooks and the personal study notes will be embedded into the vector database.

4. **Save to Vector DB (ChromaDB)**:
   - Store the generated embeddings in **ChromaDB** for fast retrieval during RAG modeling.
   - The vector database will allow the system to quickly access the most relevant information when a query is made.

**Note**: Converting and embedding long documents like nursing handbooks may take some time (e.g., **15-30 minutes** per document).



In [75]:
# List the raw files to convert
os.listdir(knowledge_base_raw_path)

['Handbook of Clinical Nursing_ Medical-Surgical Nursing -- Joyce Fitzpatrick -- 2018 -- Springer Publishing Company -- 9780826130785 -- 26f2533f396508e653d45e0e76aadc53 -- Anna’s Archive.pdf',
 'NRG 304 LEC_ WEEK 2_CHAPTER 1 ASSESSMENT OF THE DIGESTIVE AND GASTROINTESTINAL FUNCTION.pdf']

In [76]:
document_converter = DocumentConverter()

knowledge_base_raw_files = os.listdir(knowledge_base_raw_path)
knowledge_base_pickled_files = os.listdir(knowledge_base_pickled_path)

for file in knowledge_base_raw_files:
	file_path = os.path.join(knowledge_base_raw_path, file)
	file_name = os.path.splitext(file)[0]

	# Skip anything that is not a regular file (e.g. subdirectories)
	if not os.path.isfile(file_path):
		continue

	# Check if file is already pickled
	if f"{file_name}.docling" in knowledge_base_pickled_files:
		print(f"File `{file[:97]}`... is already pickled.")
		continue

	# Pickling the document to retain the metadata (there is currently no way to get the paging metadata from exported md)
	print(f"Pickling file: `{file[:97]}`...")
	conversion_result = document_converter.convert(file_path)
	
	joblib.dump(
		conversion_result.document, 
		os.path.join(knowledge_base_pickled_path, f"{file_name}.docling"),
		compress=3
	)

#### Create the collections

In [77]:
chroma_client = await chromadb.AsyncHttpClient(host="localhost", port=8001)  # port is an integer

Using the OpenAI embedding model named by the `OPENAI_EMBEDDING_MODEL` environment variable (for example, `text-embedding-3-small`)

In [78]:
embed_fn_openai = embedding_functions.OpenAIEmbeddingFunction(
	api_key=os.getenv("OPENAI_API_KEY"),
	model_name=os.getenv("OPENAI_EMBEDDING_MODEL"),
)

Creating the collections

In [79]:
collection_handbooks = await chroma_client.get_or_create_collection(name="handbooks", embedding_function=embed_fn_openai)
collection_transes = await chroma_client.get_or_create_collection(name="transes", embedding_function=embed_fn_openai)

Chunking the NRG 304 document as a test

In [80]:
document = joblib.load(os.path.join(
	knowledge_base_pickled_path, 
	"NRG 304 LEC_ WEEK 2_CHAPTER 1 ASSESSMENT OF THE DIGESTIVE AND GASTROINTESTINAL FUNCTION.docling"
))

### Stage 2: RAG Modeling

Once the vector database is populated, the system will retrieve relevant information from both nursing handbooks and personal study notes. The retrieval will be augmented using an LLM (Large Language Model) for more accurate and contextually rich responses.

#### Steps:

1. **Retrieve Relevant Information**:
   - When a query is input by the user (e.g., "What are the symptoms of diabetes?"), the system will retrieve the most relevant content from the vector database.
   - ChromaDB will find the sections of the nursing handbooks or personal study notes that are most similar to the query.

2. **Augment Response Using LLM**:
   - The retrieved sections will be passed through an **LLM**, which will generate a response based on the content of the handbooks and notes.
   - The LLM will combine the relevant information and format it into a coherent, accurate response tailored to the question.
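
The two steps above can be illustrated without any external service. The following is a minimal sketch, not the notebook's implementation: the three-dimensional embeddings, the `TOY_STORE` records, and the helper names `retrieve` and `build_prompt` are all made up for illustration, and cosine similarity stands in for whatever distance metric the ChromaDB collection is configured with.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def retrieve(query_embedding: list[float], store: list[dict], k: int = 2) -> list[dict]:
    """Step 1: return the k records whose embeddings are most similar to the query."""
    ranked = sorted(
        store,
        key=lambda record: cosine_similarity(query_embedding, record["embedding"]),
        reverse=True,
    )
    return ranked[:k]


def build_prompt(question: str, chunks: list[dict]) -> str:
    """Step 2: place the retrieved chunks, with their sources, in front of the question."""
    context = "\n".join(
        f"[{chunk['filename']}, p. {chunk['page_no']}] {chunk['text']}" for chunk in chunks
    )
    return f"Answer using only the references below.\n\n{context}\n\nQuestion: {question}"


# Hypothetical records; real embeddings have hundreds or thousands of dimensions.
TOY_STORE = [
    {"text": "NPO after midnight before an upper GI study.",
     "filename": "notes.pdf", "page_no": 3, "embedding": [0.9, 0.1, 0.0]},
    {"text": "Insulin doses are adjusted while the patient is NPO.",
     "filename": "notes.pdf", "page_no": 4, "embedding": [0.7, 0.3, 0.1]},
    {"text": "Pediatric vital sign ranges.",
     "filename": "handbook.pdf", "page_no": 120, "embedding": [0.0, 0.2, 0.9]},
]

top_chunks = retrieve([1.0, 0.0, 0.0], TOY_STORE, k=2)
prompt = build_prompt("How is a patient prepared for an upper GI study?", top_chunks)
print(prompt)
```

The prompt string is what would be handed to the LLM; the unrelated pediatric record is ranked last and never reaches it.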

In [81]:
chunker = HybridChunker(max_tokens=100)
chunk_iter = chunker.chunk(dl_doc=document)

for i, chunk in enumerate(chunk_iter):
	print(f"=== {i} ===")
	print(chunk.meta.export_json_dict())
	enriched_text = chunker.serialize(chunk=chunk)
	print(enriched_text)
	break

In [82]:
class Metadata(BaseModel):
	user_id: str
	filename: str
	heading: str
	page_no: int

class Records(BaseModel):
	documents: list[str] = []
	metadatas: list[dict] = []  # holds Metadata.model_dump() dicts, the form ChromaDB expects
	ids: list[str] = []

In [83]:
chunker = HybridChunker(max_tokens=1000)
chunk_iter = chunker.chunk(dl_doc=document)
document_chunks = list(chunk_iter)

records = Records()

# Lock for thread-safe appends
lock = threading.Lock()

def extract_and_store(chunk: DocChunk):
	enriched_text = chunker.serialize(chunk=chunk)
	chunk_id = nanoid.generate()
	doc_chunk_metadata = chunk.meta.export_json_dict()
	metadata = Metadata(
		user_id="jiya",
		filename=doc_chunk_metadata['origin']['filename'],
		heading=doc_chunk_metadata['headings'][0],
		page_no=doc_chunk_metadata['doc_items'][0]['prov'][0]['page_no']
	)

	with lock:
		records.documents.append(enriched_text)
		records.metadatas.append(metadata.model_dump())
		records.ids.append(chunk_id)

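with concurrent.futures.ThreadPoolExecutor() as executor:
	# Consume the iterator so exceptions raised in worker threads surface here
	# instead of being silently dropped.
	list(executor.map(extract_and_store, document_chunks))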
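
The extraction above indexes directly into `headings[0]` and `doc_items[0]['prov'][0]`, which raises if a chunk has no heading or no provenance. A defensive variant might look like the sketch below. The helper name `extract_chunk_metadata` and the fallback values (`""` and `-1`) are assumptions for illustration; the dictionary shape is the one the cell above reads from `chunk.meta.export_json_dict()`.

```python
def extract_chunk_metadata(meta: dict, user_id: str) -> dict:
    """Build a flat metadata record, tolerating missing headings or provenance."""
    headings = meta.get("headings") or []
    heading = headings[0] if headings else ""  # assumed fallback: empty heading

    page_no = -1  # assumed sentinel meaning "page unknown"
    for item in meta.get("doc_items") or []:
        prov = item.get("prov") or []
        if prov:
            page_no = prov[0].get("page_no", -1)
            break

    return {
        "user_id": user_id,
        "filename": (meta.get("origin") or {}).get("filename", ""),
        "heading": heading,
        "page_no": page_no,
    }


# A complete chunk and one with no headings at all.
full = {
    "origin": {"filename": "week2.pdf"},
    "headings": ["Nursing Interventions"],
    "doc_items": [{"prov": [{"page_no": 5}]}],
}
no_heading = {"origin": {"filename": "week2.pdf"}, "doc_items": [{"prov": [{"page_no": 7}]}]}

print(extract_chunk_metadata(full, "jiya"))
print(extract_chunk_metadata(no_heading, "jiya"))
```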

In [None]:
# await collection_transes.add(
# 	documents=records.documents,
# 	metadatas=records.metadatas,
# 	ids=records.ids
# )
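
Because `nanoid.generate()` returns a fresh random ID each time, re-running the `add` call above would store the same chunks again under new IDs, which may be why it is commented out. One possible alternative, not used in this notebook, is to derive each ID from the chunk's content so that re-ingesting a document produces the same IDs. The helper name `stable_chunk_id` and the choice of SHA-256 truncated to 32 hex characters are assumptions for illustration.

```python
import hashlib


def stable_chunk_id(user_id: str, filename: str, text: str) -> str:
    """Derive a repeatable ID from the owner, the source file, and the chunk text."""
    digest = hashlib.sha256()
    for part in (user_id, filename, text):
        digest.update(part.encode("utf-8"))
        digest.update(b"\x00")  # separator so ("ab", "c") and ("a", "bc") differ
    return digest.hexdigest()[:32]


first = stable_chunk_id("jiya", "week2.pdf", "NPO after midnight.")
again = stable_chunk_id("jiya", "week2.pdf", "NPO after midnight.")
other = stable_chunk_id("jiya", "week2.pdf", "Low-residue diet.")
print(first == again, first == other)
```

With repeatable IDs, the records could be written with the collection's `upsert` method instead of `add`, so a second run overwrites rather than duplicates.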

In [85]:
query_result = await collection_transes.query(
    query_texts=["upper gastrointestinal tract study nursing interventions"],
    n_results=3,
    where={"user_id": "jiya"},
)

In [86]:
documents = query_result["documents"][0]
metadatas = query_result["metadatas"][0]

for doc, metadata in zip(documents, metadatas):
	print(doc)
	print(f"Source: {metadata['filename']}, Page: {metadata['page_no']}")

#### Response Augmenting With OpenAI

In [87]:
class Reference(BaseModel):
	content: str
	page_number: int
	filename: str
	heading: str

In [88]:
query_result['documents']

[["Nursing Interventions :\n1. Clear  liquid  diet,  with  nothing  by  mouth  (NPO)  from midnight the night before the study.\n2. Patient is advised to not smoke or chew gum during the NPO period because these can increase gastric secretions and salivation.\n3. Polyethylene glycol (PEG)-based solutions are considered the most effective bowel cleansing preparatory agent.\n4. Oral  medications  are  withheld  on  the  morning  of  the study  and  resumed  that  evening,  but  each  patient's medication regimen should be evaluated on an individual basis.\n5. When a patient with insulin dependent diabetes is NPO, their insulin requirements will need  to be  adjusted accordingly.\n6. Instruct pt to increase OFI after the procedure  to facilitate evacuation of stool and barium.\n1. Low-residue diet 1 to 2 days before the test, a clear liquid diet and  a laxative  the  evening  before,  NPO  after midnight,  and  cleansing  enemas  until  returns  are  clear the following morning.\n2. Enema

In [89]:
references: list[Reference] = []
for metadata, document in zip(query_result["metadatas"][0], query_result["documents"][0]):
	references.append(
		Reference(
			content=document,
			page_number=metadata['page_no'],
			filename=metadata['filename'],
			heading=metadata['heading']
		)
	)

In [90]:
class AgentDependencies(BaseModel):
	references: list[Reference]

model = OpenAIModel(
	'google/gemini-2.0-flash-thinking-exp:free',
	base_url=os.getenv("OPEN_ROUTER_BASE_URL"),
	api_key=os.getenv("OPEN_ROUTER_API_KEY"),
)

agent = Agent(
	model=model,
	deps_type=AgentDependencies,
	result_type=str,
	system_prompt="""
		# Role
		You are an AI assistant designed to retrieve and summarize information from a comprehensive database of nursing transes. 
		Your goal is to provide accurate, relevant, and concise responses to user queries while citing the original documents and their page numbers.
		
		# Task
		1. Analyze the user's input to identify key topics, specific questions, or requests for information.
		2. Summarize the main points from the retrieved documents, ensuring that you capture essential details related to the user’s request.
		3. For every piece of information provided, include citations that reference the original document along with specific page numbers.
		4. Present the information in a clear and organized manner, making it easy for the user to understand and utilize.

		When a task requires using one or more of the tools, make sure to identify which tool is the most appropriate, 
		pass along relevant details and execute the actions needed to complete the task. 
		Your goal is to be proactive, precise, and organized in managing these resources.
	""",
)


@agent.system_prompt
async def add_references(
	ctx: RunContext[AgentDependencies],
) -> str:
	references = ctx.deps.references

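	# Serialize every reference (not only the first) as one JSON line.
	# Joining outside the f-string keeps this valid on Python versions before 3.12.
	references_json = '\n'.join(json.dumps(reference.model_dump()) for reference in references)

	return f"""
		# References
		{references_json}
	"""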

In [91]:
result = await agent.run(
	"can you explain me how the nursing interventions for upper gastrointestinal tract study are done",
	deps = AgentDependencies(
		references = references
	)
)

In [92]:
print(result.data)

## Services

### Reference Database Service

In [93]:
class ReferenceDatabaseService(ABC):
	# Declared async so the interface matches the awaitable implementations below.
	@abstractmethod
	async def query_handbooks(self, query: str, user_id: str) -> List[Reference]:
		pass

	@abstractmethod
	async def query_transes(self, query: str, user_id: str) -> List[Reference]:
		pass

In [94]:
class ChromaReferenceDatabaseService(ReferenceDatabaseService):
	def __init__(self, chroma_client, embedding_function):
		self._chroma_client = chroma_client
		self._embedding_function = embedding_function

	@classmethod
	async def create(cls):
		chroma_client = await chromadb.AsyncHttpClient(host="localhost", port=8001)  # port is an integer
		
		embedding_function = embedding_functions.OpenAIEmbeddingFunction(
			api_key=os.getenv("OPENAI_API_KEY"),
			model_name=os.getenv("OPENAI_EMBEDDING_MODEL"),
		)

		return cls(chroma_client, embedding_function)
	
	def _parse_references_from_query_result(self, query_result) -> List[Reference]:
		references: list[Reference] = []
		for metadata, document in zip(query_result["metadatas"][0], query_result["documents"][0]):
			references.append(
				Reference(
					content=document,
					page_number=metadata['page_no'],
					filename=metadata['filename'],
					heading=metadata['heading']
				)
			)
		return references
	
	async def _get_collection(self, collection_name: str):
		return await self._chroma_client.get_or_create_collection(
			name=collection_name, 
			embedding_function=self._embedding_function
		)

	async def query_handbooks(self, query: str, user_id: str) -> List[Reference]:
		collection = await self._get_collection('handbooks')

		query_result = await collection.query(
			query_texts=[query],
			n_results=3,
			where={"user_id": user_id}
		)

		references = self._parse_references_from_query_result(query_result)
		return references

	async def query_transes(self, query: str, user_id: str | None = None) -> List[Reference]:
		collection = await self._get_collection('transes')

		query_result = await collection.query(
			query_texts=[query],
			n_results=3,
			where={"user_id": user_id}
		)

		references = self._parse_references_from_query_result(query_result)
		return references
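
`query_transes` accepts `user_id=None`, yet the filter `{"user_id": None}` is always sent; a `None` value in a `where` clause is likely to be rejected by ChromaDB or to match nothing. A small helper could build the filter only when a user ID is present. This is a sketch under that assumption; `build_where` is a hypothetical name, not part of the service above.

```python
from typing import Optional


def build_where(user_id: Optional[str]) -> Optional[dict]:
    """Return a metadata filter for one user, or None to search without a filter."""
    if not user_id:
        return None
    return {"user_id": user_id}


print(build_where("jiya"))
print(build_where(None))
```

The result would be passed as `where=build_where(user_id)` in the `collection.query(...)` calls, since `where` is optional there.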

In [95]:
rds: ReferenceDatabaseService = await ChromaReferenceDatabaseService.create()

In [96]:
await rds.query_transes(user_id="jiya", query="upper gastrointestinal tract study nursing interventions")

[Reference(content="Nursing Interventions :\n1. Clear  liquid  diet,  with  nothing  by  mouth  (NPO)  from midnight the night before the study.\n2. Patient is advised to not smoke or chew gum during the NPO period because these can increase gastric secretions and salivation.\n3. Polyethylene glycol (PEG)-based solutions are considered the most effective bowel cleansing preparatory agent.\n4. Oral  medications  are  withheld  on  the  morning  of  the study  and  resumed  that  evening,  but  each  patient's medication regimen should be evaluated on an individual basis.\n5. When a patient with insulin dependent diabetes is NPO, their insulin requirements will need  to be  adjusted accordingly.\n6. Instruct pt to increase OFI after the procedure  to facilitate evacuation of stool and barium.\n1. Low-residue diet 1 to 2 days before the test, a clear liquid diet and  a laxative  the  evening  before,  NPO  after midnight,  and  cleansing  enemas  until  returns  are  clear the following m

## Agents

In [97]:
class ReferenceBasis(Enum):
	TRANSES="TRANSES"
	HANDBOOKS="HANDBOOKS"

In [98]:
geminiModel = OpenAIModel(
	'google/gemini-2.0-flash-thinking-exp:free',
	base_url=os.getenv("OPEN_ROUTER_BASE_URL"),
	api_key=os.getenv("OPEN_ROUTER_API_KEY"),
)

In [99]:
class TransesAgentDependencies(BaseModel):
	references: List[Reference]

class TransesAgent:
	def __init__(self, model: Model):
		self._agent = Agent(
			model=model,
			deps_type=TransesAgentDependencies,
			result_type=str,
			system_prompt="""
				# Role
				You are an AI assistant designed to retrieve and summarize information from a personal comprehensive database of nursing transes. 
				Your goal is to provide accurate, relevant, and concise responses to user queries while citing the original documents and their page numbers.
				
				# Task
				1. Analyze the user's input to identify key topics, specific questions, or requests for information.
				2. Summarize the main points from the retrieved documents, ensuring that you capture essential details related to the user’s request.
				3. For every piece of information provided, include citations that reference the original document name along with specific page numbers.
				4. Present the information in a clear and organized manner, making it easy for the user to understand and utilize.

				When a task requires using one or more of the tools, make sure to identify which tool is the most appropriate, 
				pass along relevant details and execute the actions needed to complete the task. 
				Your goal is to be proactive, precise, and organized in managing these resources.
			""",
		)

		@self._agent.system_prompt
		async def add_references(
			ctx: RunContext[TransesAgentDependencies],
		) -> str:
			return format_as_xml(root_tag='references', obj=ctx.deps.references)

	async def run(self, query: str, references: List[Reference]) -> str:
		result = await self._agent.run(query, deps=TransesAgentDependencies(references=references))
		return result.data

In [100]:
class HandbooksAgentDependencies(BaseModel):
	references: List[Reference]

class HandbooksAgent:
	def __init__(self, model: Model):
		self._agent = Agent(
			model=model,
			deps_type=HandbooksAgentDependencies,
			result_type=str,
			system_prompt="""
				# Role
				You are an AI assistant designed to retrieve and summarize information from a comprehensive database of nursing handbooks. 
				Your goal is to provide accurate, relevant, and concise responses to user queries while citing the original documents and their page numbers.
				
				# Task
				1. Analyze the user's input to identify key topics, specific questions, or requests for information.
				2. Summarize the main points from the retrieved documents, ensuring that you capture essential details related to the user’s request.
				3. For every piece of information provided, include citations that reference the original document name along with specific page numbers.
				4. Present the information in a clear and organized manner, making it easy for the user to understand and utilize.

				When a task requires using one or more of the tools, make sure to identify which tool is the most appropriate, 
				pass along relevant details and execute the actions needed to complete the task. 
				Your goal is to be proactive, precise, and organized in managing these resources.
			""",
		)

		@self._agent.system_prompt
		async def add_references(
			ctx: RunContext[HandbooksAgentDependencies],
		) -> str:
			return format_as_xml(root_tag='references', obj=ctx.deps.references)

	async def run(self, query: str, references: List[Reference]) -> str:
		result = await self._agent.run(query, deps=HandbooksAgentDependencies(references=references))
		return result.data

In [101]:
class ComposerAgentDependencies(BaseModel):
	# dataclasses.field() does not apply to a Pydantic BaseModel; Pydantic copies a plain mutable default per instance.
	agent_response_by_basis_map: Dict[str, str] = {}

class ComposerAgent:
	def __init__(self, model: Model):
		self._agent = Agent(
			model=model,
			deps_type=ComposerAgentDependencies,
			result_type=str,
			system_prompt="""
				# Role
				You are a Composer Agent responsible for synthesizing information from multiple sources to create comprehensive and coherent responses for users. 
				Your role involves integrating insights from the Transes Agent and Handbook Agent to provide accurate, relevant, and well-structured answers.
				
				# Task
				1. Combine the outputs from the Transes Agent and Handbook Agent to create a unified response. Ensure that the information is logically organized and easy to follow.
				2. Focus on presenting the most relevant information first, based on the user's query.
				3. Use clear and concise language to explain complex concepts, ensuring that the response is accessible to users with varying levels of expertise.
				4. Include citations for any referenced information, ensuring that users can easily locate the original sources.
				5. Use headings, bullet points, or numbered lists as necessary to enhance readability and comprehension.
				6. If necessary, provide additional context or explanations to help users understand the relevance and application of the information.

				When a task requires using one or more of the tools, make sure to identify which tool is the most appropriate, 
				pass along relevant details and execute the actions needed to complete the task. 
				Your goal is to be proactive, precise, and organized in managing these resources.
			""",
		)

		@self._agent.system_prompt
		async def add_agent_response_by_basis(
			ctx: RunContext[ComposerAgentDependencies],
		) -> str:
			return format_as_xml(root_tag='agent_response_by_basis', obj=ctx.deps.agent_response_by_basis_map)

	async def run(self, query: str, agent_response_by_basis_map: Dict[str, str]) -> str:
		result = await self._agent.run(query, deps=ComposerAgentDependencies(agent_response_by_basis_map=agent_response_by_basis_map))
		return result.data

In [105]:
query = "what are the nursing interventions for upper gastrointestinal tract study"
user_id = "jiya"

references = await rds.query_transes(query=query, user_id=user_id)
transesResponse = await TransesAgent(geminiModel).run(query=query, references=references)
composerResponse = await ComposerAgent(geminiModel).run(query=query, agent_response_by_basis_map={"transes": transesResponse})

print(transesResponse)
print(composerResponse)

## Graph

In [102]:
class WorkflowRag:
	# Called on the class itself (WorkflowRag.run(...)), so it is a static method.
	@staticmethod
	async def run(query: str, user_id: str, include_reference_basis: List[ReferenceBasis]) -> tuple:
		state = State(query=query, user_id=user_id, include_reference_basis=include_reference_basis)
		graph = Graph(nodes=[ReferenceStaging, ComposeResponses], state_type=State)
		# graph.run returns the End payload together with the run history.
		result = await graph.run(ReferenceStaging(), state=state)
		return result

@dataclass
class State:
	query: str
	user_id: str
	include_reference_basis: List[ReferenceBasis] = field(default_factory=list)
	references_by_basis_map: Dict[str, List[Reference]] = field(default_factory=dict)
	agent_response_by_basis_map: Dict[str, str] = field(default_factory=dict)

@dataclass
class ComposeResponses(BaseNode[State]):
	async def run(self, ctx: GraphRunContext[State]) -> End:
		composed_responses = await ComposerAgent(geminiModel).run(
			query=ctx.state.query,
			agent_response_by_basis_map=ctx.state.agent_response_by_basis_map
		)

		return End({
			"response": composed_responses,
			"references": ctx.state.references_by_basis_map
		})

@dataclass
class ReferenceStaging(BaseNode[State]):
	async def run(self, ctx: GraphRunContext[State]) -> ComposeResponses: 
		if ReferenceBasis.TRANSES in ctx.state.include_reference_basis:
			references = await rds.query_transes(user_id=ctx.state.user_id, query=ctx.state.query)
			ctx.state.references_by_basis_map.update({ReferenceBasis.TRANSES.value: references})
			result = await TransesAgent(geminiModel).run(ctx.state.query, references)
			ctx.state.agent_response_by_basis_map[ReferenceBasis.TRANSES.value] = result
		
		return ComposeResponses()
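
`ReferenceStaging` currently handles only `ReferenceBasis.TRANSES`; a `HANDBOOKS` entry in `include_reference_basis` is ignored. One way both sources could be dispatched, and queried concurrently, is sketched below. Everything here is hypothetical: `SourceBasis` mirrors `ReferenceBasis` so the block is self-contained, and the two `fake_*` coroutines stand in for the retrieve-then-agent calls.

```python
import asyncio
from enum import Enum
from typing import Awaitable, Callable, Dict, List


class SourceBasis(Enum):
    """Stand-in for ReferenceBasis, redefined so this sketch runs on its own."""
    TRANSES = "TRANSES"
    HANDBOOKS = "HANDBOOKS"


Handler = Callable[[str, str], Awaitable[str]]


async def answer_from_each_basis(
    query: str,
    user_id: str,
    bases: List[SourceBasis],
    handlers: Dict[SourceBasis, Handler],
) -> Dict[str, str]:
    """Run the handler of every requested basis concurrently and key answers by basis name."""
    selected = [basis for basis in bases if basis in handlers]
    answers = await asyncio.gather(*(handlers[basis](query, user_id) for basis in selected))
    return {basis.value: answer for basis, answer in zip(selected, answers)}


# Placeholder handlers; real ones would query ChromaDB and then run an agent.
async def fake_transes(query: str, user_id: str) -> str:
    return f"transes answer for {user_id}: {query}"


async def fake_handbooks(query: str, user_id: str) -> str:
    return f"handbooks answer for {user_id}: {query}"


HANDLERS = {SourceBasis.TRANSES: fake_transes, SourceBasis.HANDBOOKS: fake_handbooks}

responses = asyncio.run(
    answer_from_each_basis("upper GI study", "jiya", list(SourceBasis), HANDLERS)
)
print(responses)
```

Inside a notebook, where an event loop is already running, the call would be awaited directly instead of wrapped in `asyncio.run`. The returned dictionary has the same shape as `agent_response_by_basis_map`.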
		

In [103]:
result = await WorkflowRag.run(
	query="can you explain me how the nursing interventions for upper gastrointestinal tract study are done", 
	user_id="jiya",
	include_reference_basis=[ReferenceBasis.TRANSES]
)

In [107]:
for item in result[1][:-1]:
	print(item.state)

In [104]:
print(result[0]['response'])