# Agentic RAG WITH MongoDB

In [34]:
import os
from dotenv import load_dotenv
load_dotenv()

os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY")
os.environ["GOOGLE_API_KEY"] = os.getenv("GOOGLE_API_KEY")
os.environ["MONGODB_URI"] = os.getenv("MONGODB_URI")

In [17]:
from langchain_groq import ChatGroq

llm = ChatGroq(model="llama3-8b-8192")

#Dataset Loading And Preparation

In [19]:
from datasets import load_dataset
import pandas as pd

# Load the healthcare conversations dataset
healthcare_conversations_dataset = load_dataset(
    "lavita/ChatDoctor-HealthCareMagic-100k", streaming=True, split="train"
)

healthcare_conversations_dataset = healthcare_conversations_dataset.take(1000)

In [20]:
# Load Drug Review dataset
drug_reviews_dataset = load_dataset(
    "Reboot87/drugs_reviews_dataset", streaming=True, split="train"
)

drug_reviews_dataset = drug_reviews_dataset.take(1000)

In [22]:
# Convert to pandas DataFrame
healthcare_conversations_dataset = pd.DataFrame(healthcare_conversations_dataset)
drug_reviews_dataset = pd.DataFrame(drug_reviews_dataset)

In [23]:
healthcare_conversation_dataset = healthcare_conversations_dataset.drop(
 columns=["instruction"]
)

In [24]:
drug_reviews_dataset = drug_reviews_dataset.drop(
 columns=["patientId", "date", "usefulCount", "review_length"]
)

In [25]:
healthcare_conversation_dataset.head()

Unnamed: 0,input,output
0,I woke up this morning feeling the whole room ...,"Hi, Thank you for posting your query. The most..."
1,My baby has been pooing 5-6 times a day for a ...,Hi... Thank you for consulting in Chat Doctor....
2,"Hello, My husband is taking Oxycodone due to a...","Hello, and I hope I can help you today.First, ..."
3,lump under left nipple and stomach pain (male)...,HI. You have two different problems. The lump ...
4,I have a 5 month old baby who is very congeste...,Thank you for using Chat Doctor. I would sugge...


In [26]:
drug_reviews_dataset.head()

Unnamed: 0,drugName,condition,review,rating
0,cyclosporine,keratoconjunctivitis sicca,"""I have used Restasis for about a year now and...",2
1,etonogestrel,birth control,"""My experience has been somewhat mixed. I have...",7
2,implanon,birth control,"""This is my second Implanon would not recommen...",1
3,hydroxyzine,anxiety,"""I recommend taking as prescribed, and the bot...",10
4,dalfampridine,multiple sclerosis,"""I have been on Ampyra for 5 days and have bee...",9


#HuggingFace Embedding generation

In [31]:
import nest_asyncio
nest_asyncio.apply()
import asyncio

In [35]:
from langchain_google_genai import GoogleGenerativeAIEmbeddings

async def get_embedding(text, task_prefix="document"):
        """
        Generate embeddings for a text string with a task-specific prefix using th
        Parameters:
        text (str): The input text to be embedded.
        task_prefix (str): A prefix describing the task; this is prepended to 
        Returns:
        list: The generated embeddings as a list of floats.
        """
        if not text.strip():
            print("Attempted to get embedding for empty text.")
            return []

        embedding_model = GoogleGenerativeAIEmbeddings(
            model="models/embedding-001",
            task_prefix=task_prefix,
        )
        return await embedding_model.aembed_query(text)


In [36]:
def generate_embedding_for_healthcare_dataset(row):
    """
    Generate an embedding for a conversation by concatenating the patient's input
    and the medical practitioner's response.
    Parameters:
    row (pd.Series): A row from the healthcare conversation dataset containing
    - 'input': The patient's message.
    - 'output': The practitioner's response.
    Returns:
    embedding: The embedding vector generated from the concatenated conversation.
    """
    # Concatenate the input and output with descriptive text.
    conversation_text = (
    f"This is the input from the patient: {row['input']}. "
    f"This is the response from the medical practitioner: {row['output']}"
    )
    # Generate and return the embedding using the get_embedding function.
    return asyncio.run(get_embedding(conversation_text))

In [37]:
from tqdm import tqdm

tqdm.pandas(desc="Generating healthcare embeddings")
healthcare_conversation_dataset["embedding"] = healthcare_conversation_dataset.progress_apply(
    generate_embedding_for_healthcare_dataset, axis=1
)

Generating healthcare embeddings: 100%|██████████| 1000/1000 [12:19<00:00,  1.35it/s]


In [38]:
len(healthcare_conversation_dataset["embedding"][0])

768

In [39]:
healthcare_conversation_dataset.head()

Unnamed: 0,input,output,embedding
0,I woke up this morning feeling the whole room ...,"Hi, Thank you for posting your query. The most...","[0.0020050371531397104, -0.05553682520985603, ..."
1,My baby has been pooing 5-6 times a day for a ...,Hi... Thank you for consulting in Chat Doctor....,"[-0.003632515901699662, -0.011232027783989906,..."
2,"Hello, My husband is taking Oxycodone due to a...","Hello, and I hope I can help you today.First, ...","[-0.03484662249684334, -0.014810346066951752, ..."
3,lump under left nipple and stomach pain (male)...,HI. You have two different problems. The lump ...,"[0.004324246663600206, -0.006830697879195213, ..."
4,I have a 5 month old baby who is very congeste...,Thank you for using Chat Doctor. I would sugge...,"[0.008385895751416683, -0.05430954694747925, -..."


In [50]:
def make_embeddings(txt):
    result = asyncio.run(get_embedding(txt))
    return result

In [51]:
drug_reviews_dataset["embedding"] = drug_reviews_dataset["review"].progress_apply(
    make_embeddings
)

Generating healthcare embeddings: 100%|██████████| 1000/1000 [14:15<00:00,  1.17it/s]


In [52]:
drug_reviews_dataset.head()

Unnamed: 0,drugName,condition,review,rating,embedding
0,cyclosporine,keratoconjunctivitis sicca,"""I have used Restasis for about a year now and...",2,"[0.057198747992515564, -0.03319098800420761, -..."
1,etonogestrel,birth control,"""My experience has been somewhat mixed. I have...",7,"[-0.007726987358182669, -0.022331252694129944,..."
2,implanon,birth control,"""This is my second Implanon would not recommen...",1,"[0.005060719326138496, -0.060500070452690125, ..."
3,hydroxyzine,anxiety,"""I recommend taking as prescribed, and the bot...",10,"[0.018926484510302544, -0.03552955016493797, -..."
4,dalfampridine,multiple sclerosis,"""I have been on Ampyra for 5 days and have bee...",9,"[0.03307104483246803, 0.04259384423494339, -0...."


#MongoDB Creation

In [62]:
os.environ["MONGODB_URI"] = os.getenv("MONGODB_URI")
os.environ["MONGODB_DATABASE"] = os.getenv("MONGODB_DATABASE")

In [77]:
import nest_asyncio
nest_asyncio.apply()
import asyncio

mongio_uri = os.getenv("MONGODB_URI")

In [79]:
import pymongo

async def get_mongo_client(mongo_uri=None):
    """Establish a connection to the MongoDB database."""
    client = pymongo.AsyncMongoClient(mongo_uri, appname="Agentic RAG Medical Bot")

    try:
        ping_result = await client.admin.command("ping")
        if ping_result.get("ok") == 1.0:
            print("✅ Connection to MongoDB is Successful")
            return client
        else:
            print("❌ Ping failed")
            return None
    except Exception as e:
        print(f"❌ Connection failed: {e}")
        return None


In [80]:
from pymongo.errors import CollectionInvalid
import getpass
from urllib.parse import quote_plus

mongo_client = asyncio.run(get_mongo_client(mongio_uri.format(PASSWORD=quote_plus(getpass.getpass("PASSWORD: ")))))
mongo_client

✅ Connection to MongoDB is Successful


AsyncMongoClient(host=['ac-tmlvw0p-shard-00-00.swxxwtn.mongodb.net:27017', 'ac-tmlvw0p-shard-00-01.swxxwtn.mongodb.net:27017', 'ac-tmlvw0p-shard-00-02.swxxwtn.mongodb.net:27017'], document_class=dict, tz_aware=False, connect=True, authsource='admin', replicaset='atlas-9pl5s9-shard-0', tls=True, appname='Agentic RAG Medical Bot')

In [81]:
print(type(mongo_client))  # <class 'pymongo.mongo_client.MongoClient'>
#print(db.list_collection_names())

<class 'pymongo.asynchronous.mongo_client.AsyncMongoClient'>


In [82]:
DB_NAME = "virtual_health_care_assistant"
DRUG_REVIEW_COLLECTION_NAME = "drug_reviews"
CONVERSATION_COLLECTION_NAME = "conversations"

db = mongo_client[DB_NAME]
db

AsyncDatabase(AsyncMongoClient(host=['ac-tmlvw0p-shard-00-00.swxxwtn.mongodb.net:27017', 'ac-tmlvw0p-shard-00-01.swxxwtn.mongodb.net:27017', 'ac-tmlvw0p-shard-00-02.swxxwtn.mongodb.net:27017'], document_class=dict, tz_aware=False, connect=True, authsource='admin', replicaset='atlas-9pl5s9-shard-0', tls=True, appname='Agentic RAG Medical Bot'), 'virtual_health_care_assistant')

In [89]:
collection_list = [
    DRUG_REVIEW_COLLECTION_NAME,
    CONVERSATION_COLLECTION_NAME
]

for collection_name in collection_list:
    if collection_name not in asyncio.run(db.list_collection_names()):
        try:
            await db.create_collection(collection_name)
            print(f"Collection '{collection_name}' created successfully.")
        except CollectionInvalid as e:
            print(f"Collection '{collection_name}' already exists.")

Collection 'drug_reviews' created successfully.
Collection 'conversations' created successfully.


In [90]:
drug_reviews_collection = db[DRUG_REVIEW_COLLECTION_NAME]
healthcare_conversation_collection = db[CONVERSATION_COLLECTION_NAME]
collections_list = [drug_reviews_collection, healthcare_conversation_collection]

#Index Creation

In [101]:
from pymongo.operations import SearchIndexModel

async def setup_vector_search_index(collection, index_definition, index_name="AgenticRAG"):
    """
    Setup a vector search index for a MongoDB collection and wait for 30 secon
    Args:
    collection: MongoDB collection object
    index_definition: Dictionary containing the index definition
    index_name: Name of the index (default: "vector_index")
    """
    new_vector_search_index = SearchIndexModel(
        definition=index_definition,
        name=index_name,
        type="vector_search",
    )
    try:
        result = collection.create_search_index(new_vector_search_index)
        print(f"Creating index {index_name} for {collection.name} collection: {result}")
        return result
    except Exception as e:
        print(f"Error creating index {index_name} for {collection.name} collection: {e}")
        return None

In [102]:
vector_index_definition_float32_ann = {
    "fields":[
        {
            "type": "vector",
            "path": "embedding",
            "dimension": 768,
            "similarity": "cosine"
        }
    ]
}

vector_search_float32_ann_index_name = "vector_index_float32_ann"

In [103]:
for specific_collection in collections_list:
    result = asyncio.run(setup_vector_search_index(
        specific_collection,
        vector_index_definition_float32_ann,
        vector_search_float32_ann_index_name
    ))
    if result:
        print(f"Index {vector_search_float32_ann_index_name} created successfully for {specific_collection.name} collection.")
    else:
        print(f"Failed to create index {vector_search_float32_ann_index_name} for {specific_collection.name} collection.")

Creating index vector_index_float32_ann for drug_reviews collection: <coroutine object AsyncCollection.create_search_index at 0x0000024D25DB06D0>
Index vector_index_float32_ann created successfully for drug_reviews collection.
Creating index vector_index_float32_ann for conversations collection: <coroutine object AsyncCollection.create_search_index at 0x0000024D25DB07C0>
Index vector_index_float32_ann created successfully for conversations collection.


  result = asyncio.run(setup_vector_search_index(


#Create Search Index

In [104]:
async def create_keyword_search_index(collection, definition, index_name="keyword_index"):
    """
    Create a keyword search index for a MongoDB collection.
    Args:
    collection: MongoDB collection object
    definition: Dictionary containing the index definition
    index_name: Name of the index (default: "keyword_index")
    """
    search_index_model = {
        "name": index_name, 
        "type": "search", 
        "definition": definition, 
        }
    try:
        result = collection.create_search_index(search_index_model)
        print(f"Creating index '{index_name}' for {collection.name} collection: {result}")
        return result
    except Exception as e:
        print(f"Error creating index '{index_name}' for {collection.name} collection: {e}")
        return None

In [105]:
drug_review_keyword_search_index = {
    "mappings": {
        "dynamic": False,
        "fields": {
            "drugName": {
                "type": "keyword",
            },
            "condition": {
                "type": "string",
            },
            "review":{
                "type": "string",
            },
        },
    }
}

In [106]:
conversations_keyword_search_index = {
    "mappings":{
        "dynamic": False,
        "fields":{
            "input": {
                "type": "string",
            },
        },
    }
}

In [107]:
asyncio.run(
    setup_vector_search_index(
        drug_reviews_collection,
        drug_review_keyword_search_index,
        "keyword_index"
    )
)

Creating index keyword_index for drug_reviews collection: <coroutine object AsyncCollection.create_search_index at 0x0000024D25DD63E0>


<coroutine object AsyncCollection.create_search_index at 0x0000024D25DD63E0>

In [108]:
asyncio.run(
    setup_vector_search_index(
        healthcare_conversation_collection,
        conversations_keyword_search_index,
        "keyword_index"
    )
)

Creating index keyword_index for conversations collection: <coroutine object AsyncCollection.create_search_index at 0x0000024D25DD7010>


<coroutine object AsyncCollection.create_search_index at 0x0000024D25DD7010>

#Data Ingestion

In [109]:
healthcare_conversation_dataset = healthcare_conversation_dataset.to_dict("records")
drug_reviews_dataset = drug_reviews_dataset.to_dict("records")

In [111]:
await healthcare_conversation_collection.insert_many(healthcare_conversation_dataset)
await drug_reviews_collection.insert_many(drug_reviews_dataset)
print("Data ingestion into MongoDB completed")

Data ingestion into MongoDB completed


#Text Search

In [199]:
async def text_search_with_mongodb(query, collection, top_k=5, paths="review"):
    """
    Perform a text search on a MongoDB collection using the provided query.
    
    Args:
        query (str): The search query.
        collection: Motor async MongoDB collection object.
        top_k (int): Number of top results to return (default: 5).
        paths (str or list): The field(s) to search in (default: "review").
    
    Returns:
        list: List of documents matching the search query.
    """
    if not isinstance(paths, list):
        paths = [paths]

    text_search_stage = {
        "$search": {
            "index": "keyword_index",  # Name of your Atlas Search index
            "text": {
                "query": query,
                "path": paths,
            },
        }
    }

    limit_stage = {"$limit": top_k}
    
    projection_stage = {
        "$project": {
            "_id": 0,
            "embedding": 0,
        }
    }

    pipeline = [text_search_stage, limit_stage, projection_stage]

    # Correct usage in Motor
    cursor =  await collection.aggregate(pipeline)
    results = [doc async for doc in cursor]

    return results

In [200]:
query = "cough"

 
get_knowledge_full_text_mdb = await text_search_with_mongodb(
    query,
    drug_reviews_collection,
    top_k=5,
    paths=["review", "drugName", "condition"]
)


In [201]:
get_knowledge_full_text_mdb

[]

In [202]:
pd.DataFrame(get_knowledge_full_text_mdb).head()

#Semantic Search

In [210]:
async def semantic_search_with_mongodb(query, collection, top_k=5, vector_search_index_name="vector_index"):
    """
    Perform a semantic search on a MongoDB collection using the provided query.
    
    Args:
        query (str): The search query.
        collection: Motor async MongoDB collection object.
        vector_search_index_name (str): Name of the vector search index to use (default: "vector_index").
        top_k (int): Number of top results to return (default: 5).
    Returns:
        list: List of documents matching the semantic search query.
    """
    query_embedding = await get_embedding(query, task_prefix="semantic_search")
    if not query_embedding:
        return "No embedding generated for the query."        
    
    vector_search_stage = {
        "$vectorSearch": {
            "index": vector_search_index_name,
            "queryVector": query_embedding,
            "path": "embedding",
            "numCandidates": 100,
            "limit": top_k,
        }
    }

    projection_stage = {
        "$project": {
            "_id": 0,
            "embedding": 0,
            "score": {"$meta": "vectorSearchScore"},
        }
    }

    pipeline = [vector_search_stage, projection_stage]
    cursor = await collection.aggregate(pipeline)
    results = [doc async for doc in cursor]
    return list(results)

In [211]:
query = "I have a cough, what drug can I use?"

get_knowledge_semantic_mdb = await semantic_search_with_mongodb(
    query,
    drug_reviews_collection,
    top_k=5,
    vector_search_index_name= vector_search_float32_ann_index_name
)

In [212]:
pd.DataFrame(get_knowledge_semantic_mdb).head()

#Hybrid Search

In [225]:
async  def hybrid_search_with_mongodb(
 user_query,
 collection,
 vector_search_index_name="vector_index",
 text_search_index_name="text_search_index",
 vector_weight=0.5,
 full_text_weight=0.5,
 top_k=10,
 text_search_paths=["review"],
):
    """
    Perform a hybrid search combining vector search and text search on a MongoDB collection.
    
    Args:
        user_query (str): The search query.
        collection: async MongoDB collection object.
        vector_search_index_name (str): Name of the vector search index to use (default: "vector_index").
        text_search_index_name (str): Name of the text search index to use (default: "text_search_index").
        vector_weight (float): Weight for the vector search results (default: 0.5).
        full_text_weight (float): Weight for the full-text search results (default: 0.5).
        top_k (int): Number of top results to return (default: 10).
        text_search_paths (list): List of fields to search in for full-text search.
    
    Returns:
        list: List of documents matching the hybrid search query.
    """
    collection_name = await collection.name
    query_embedding = await get_embedding(user_query, task_prefix="hybrid_search")

    pipeline = [
        {
            "vectorSearch": {
                "index": vector_search_index_name,
                "queryVector": query_embedding,
                "path": "embedding",
                "numCandidates": 100,
                "limit": top_k,
            }
        },
        {
            "$group": {
                "_id": None,
                "docs": {"push": "$$ROOT"},
            }
        },
        {
            "$unwind": {
                "path": "$docs",
                "IncludeArrayIndex": "rank",
            }
        },
        {
        "$addFields": {
            "vs_score": {
                "$multiply": [
                    vector_weight, # Apply configurable weight to vector 
                    {
                    "$divide": [1.0, {"$add": ["$rank", 60]}]
                    }, # Score formula: 1/(rank+60)
                ]
            }
        }
        },
        {
        "$project": {
            "vs_score": 1,
            "_id": "$docs._id",
            "review": "$docs.review",
            "drugName": "$docs.drugName",
            "condition": "$docs.condition",
            }
        },
        # PART 2: TEXT SEARCH
        # Combine with full-text search results using unionWith
        {
            "$unionWith": {
                "coll": collection_name, # Collection to search
                "pipeline": [
                # Perform full text search using Atlas Search
                    {
                        "$search": {
                        "index": text_search_index_name, # Name of the te
                        "text": {"query": user_query, "path": text_search_paths},
                        },
                    },   
                    {"$limit": top_k}, 
                    # Group text search results similar to vector search
                    {"$group": {"_id": None, "docs": {"$push": "$$ROOT"}}},
                    # Unwind and add ranking just like in vector search
                    {"$unwind": {"path": "$docs", "includeArrayIndex": "rank"}},
                    # Calculate a full-text search score based on rank
                    # Using the same formula as vector search
                    {
                        "$addFields": {
                            "fts_score": {
                                "$multiply": [
                                    full_text_weight,
                                    {"$divide": [1.0, {"$add": ["$rank", 60]}]}
                                ] # Score formula: 1/(rank+60)
                            },
                        },
                    },
                    # Project only the needed fields for text search results
                    {
                        "$project": {
                            "fts_score": 1,
                            "_id": "$docs._id",
                            "review": "$docs.review",
                            "drugName": "$docs.drugName",
                            "condition": "$docs.condition",
                        }
                    },
                ]
            },
        },
        # PART 3: COMBINING RESULTS
        # Group by document ID to handle duplicates from both searches
        # This ensures we don't return the same document twice
        {
            "$group": {
            "_id": "$_id",
            "review": {"$first": "$review"},
            "drugName": {"$first": "$drugName"},
            "condition": {"$first": "$condition"},
            "vs_score": {
                "$max": "$vs_score"
                }, # Take highest vector score if present in both
            "fts_score": {
                "$max": "$fts_score"
                }, # Take highest text score if present in both
            }
        },
        # Handle documents that only appeared in one search type
        # by setting missing scores to 0
        {
            "$project": {
                "_id": 1,
                "review": 1,
                "drugName": 1,
                "condition": 1,
                "vs_score": {
                    "$ifNull": ["$vs_score", 0]
                    }, # Default to 0 if not in vector results
                "fts_score": {
                    "$ifNull": ["$fts_score", 0]
                    }, # Default to 0 if not in text results
            }
        },
        # Calculate the final combined score and remove _id from results
        {
            "$project": {
            "score": {"$add": ["$fts_score", "$vs_score"]}, # Combined fi
            "_id": 0, # Exclude MongoDB ID
            "review": 1,
            "drugName": 1,
            "condition": 1,
            "vs_score": 1, # Keep individual scores for analysis
            "fts_score": 1,
            }
        },
        # Sort by the combined score in descending order
        {"$sort": {"score": -1}},
        # Return only the top k results based on combined score
        {"$limit": top_k},
    ]
 # Execute the aggregation pipeline and convert results to a list
    cursor = await collection.aggregate(pipeline)
    results = [doc async for doc in cursor]
    return list(results)

In [226]:
query_text = "I have a cough, what drug would be best?"

get_knowledge_hybrid_mdb =  hybrid_search_with_mongodb(
    query_text, 
    drug_reviews_collection, 
    vector_weight=0.5, 
    full_text_weight=0.5, 
    top_k=10, 
    text_search_paths=[
        "review",
        "condition",
        "drugName",
    ], 
)

In [227]:
pd.DataFrame(get_knowledge_hybrid_mdb).head()

ValueError: DataFrame constructor not properly called!

#Building Intelligent Search System

In [228]:
async def custom_rag_pipeline(user_query, collection):
    """
    Implements a custom Retrieval-Augmented Generation (RAG) pipeline.
    Args:
    user_query (str): The user's question or query.
    collection (MongoCollection): MongoDB collection to search for relevan
    Returns:
    str: The LLM-generated response with citations.
    """
    retrieved_docs = hybrid_search_with_mongodb(
        user_query,
        collection,
        vector_search_index_name=vector_search_float32_ann_index_name,
    )
    formatted_context = ""
    if retrieved_docs and len(retrieved_docs) > 0:
        formatted_context = "\n\nRelevant information from drug reviews:\n\n"

    async for i, doc in enumerate(retrieved_docs):
        review = doc.get("review", "No review available")
        condition = doc.get("condition", "No condition available")
        drug_name = doc.get("drugName", "No drug name available")
        formatted_context += f"[{i+1}] Review: {review}\nCondition: {condition}\nDrug Name: {drug_name}\n\n"

    prompt = f"""
        Based on the following information, please answer the user's question:
        User Question: {user_query}
        {formatted_context}
        Please provide a comprehensive answer based on the information above.
        If the provided information does not contain the answer, state that clearly.
        Include citation numbers [X] to indicate which sources were used for specific 
        """
    response = await llm.ainvoke(prompt.format(user_query=user_query, context=formatted_context))
    return response

In [229]:
user_query = "I have a cough, can you help me with some medications"

asyncio.run(custom_rag_pipeline(user_query, drug_reviews_collection))

TypeError: object of type 'coroutine' has no len()

#AI Agent Creation

In [None]:
instructions="""
 You are a virtual primary care assistant dedicated to providing reliable
 and evidence-based health guidance. Your role is to help patients understand
 their primary care needs, triage symptoms, answer common health question
 when to seek further medical care. Ensure that your responses are clear,
 and informed by current medical guidelines, always prioritizing patient care.
 Input: {input}
"""
virtual_primary_care_assistant = instructions | llm

run_result = virtual_primary_care_assistant.invoke({"input": "Get me information on cough medications and their reviews."})
print(run_result.content)