In [None]:
!pip install langchain==0.3.20
!pip install langchain-openai==0.3.9
!pip install langchain-community==0.3.20
!pip install langgraph==0.3.18

In [None]:
!pip install langchain-chroma==0.2.2

In [None]:
!pip install ipywidgets
!pip install jupyter-ui-poll==1.0.0

In [None]:
from getpass import getpass

# Prompt for the OpenAI API key interactively so the secret is never written
# into the notebook source. The value is exported to the environment in a later cell.
OPENAI_KEY = getpass('Enter Open AI API Key: ')

In [None]:
# Confirm that a key was captured without echoing any part of the secret into
# the saved notebook output. The original printed the first 10 characters of
# os.environ["OPENAI_API_KEY"], which also failed on a fresh kernel because
# `os` is imported and the variable is set only in the next cell.
print("OpenAI API key captured:", bool(OPENAI_KEY))

In [None]:
import os
import logging

# Opt out of Chroma's anonymous usage telemetry. ANONYMIZED_TELEMETRY is the
# variable Chroma documents for this; it must be set before the client is created.
os.environ["ANONYMIZED_TELEMETRY"] = "False"
# Kept from the original cell for backward compatibility.
# NOTE(review): this name does not appear in Chroma's telemetry docs - confirm it is needed.
os.environ["CHROMA_TELEMETRY"] = "FALSE"

# Optional alternative: silence the telemetry loggers instead.
#logging.getLogger("chromadb.telemetry").setLevel(logging.CRITICAL)
#logging.getLogger("chromadb.telemetry.product.posthog").setLevel(logging.CRITICAL)

# Expose the key captured via getpass under the environment variable name used
# throughout this notebook.
os.environ['OPENAI_API_KEY'] = OPENAI_KEY


In [None]:
# Download the knowledge-base file by its Google Drive file ID using gdown.
# Alternatively, download it manually from
# https://drive.google.com/file/d/1_bQj7VkXDMwwqJmspFgRzH2mgK1CVMUY/view?usp=sharing
# and upload it to Colab or to this notebook's working directory.
# NOTE(review): gdown is not installed by the cells above; presumably this relies
# on an environment (e.g. Colab) where it is already available - confirm.
!gdown 1_bQj7VkXDMwwqJmspFgRzH2mgK1CVMUY

In [None]:
import json

# Read the downloaded knowledge-base file and parse it into Python objects.
with open("./healthcare_db.json", "r") as kb_file:
    knowledge_base = json.loads(kb_file.read())

# Preview the first four records.
knowledge_base[:4]

In [None]:
# Preview the last three raw knowledge-base records.
knowledge_base[-3:]

**Store as LangChain Documents**

In [None]:
from langchain.docstore.document import Document
from tqdm import tqdm

# Wrap each knowledge-base record in a LangChain Document: the record's 'text'
# becomes the page content and its 'metadata' is carried over unchanged.
processed_docs = [
    Document(metadata=record['metadata'], page_content=record['text'])
    for record in tqdm(knowledge_base)
]

# Preview the first three documents.
processed_docs[:3]

In [None]:

# Preview the last three processed documents. The original slice `[:-3]`
# displayed every document except the last three.
processed_docs[-3:]

**Create Vector DB**

In [None]:
from langchain_openai import OpenAIEmbeddings
# Embedding model handed to the vector store below to vectorise the documents.
# NOTE(review): presumably authenticates via the OPENAI_API_KEY environment
# variable set in an earlier cell - confirm.
openai_embed_model = OpenAIEmbeddings(model='text-embedding-3-small')

In [None]:

import os, shutil
from langchain_chroma import Chroma

# Directory in which the Chroma collection is persisted.
PERSIST_DIR = "/content/knowledge_base_V3"

# Start from an empty directory so re-running this cell rebuilds the index
# instead of appending duplicate documents to a previous run's data.
if os.path.exists(PERSIST_DIR):
    shutil.rmtree(PERSIST_DIR)
os.makedirs(PERSIST_DIR, exist_ok=True)

# Embed all documents and store them in the collection.
# The original passed persist_directory="./knowledge_base", so the directory
# cleaned above was never the one actually written to.
kbase_db = Chroma.from_documents(
    documents=processed_docs,
    collection_name="knowledge_base_v3",
    embedding=openai_embed_model,
    # "hnsw:space" selects the distance metric of the HNSW (hierarchical
    # navigable small world) index, a graph-based approximate nearest
    # neighbour (ANN) algorithm; here cosine distance.
    collection_metadata={"hnsw:space": "cosine"},
    persist_directory=PERSIST_DIR)

#from langchain_chroma import Chroma
#Chroma.delete_collection()
#kbase_db = Chroma.from_documents(documents=processed_docs,
 #                               collection_name='knowledge_base',
  #                              embedding=openai_embed_model,
   #                             collection_metadata={"hnsw:space": "cosine"}, #indexing method hierarchial navigable small world. Graph based algorithm used in Approximate Nearest Neighbour (ANN)
    #                            persist_directory="./knowledge_base")


**Create Vector Database Retriever Strategy**

In [None]:
# Retriever over the vector DB: returns at most the top k=3 documents and drops
# any document whose relevance score is below the 0.3 threshold.
kbase_search = kbase_db.as_retriever(search_type="similarity_score_threshold", # keep only hits whose relevance score passes the threshold (collection uses cosine space)
                                     search_kwargs={"k": 3, "score_threshold": 0.3}) # k=3: top 3 documents; 0.3: minimum relevance score required to keep a document

**TESTING THE RETRIEVER**

In [None]:
# Smoke-test the retriever, restricted to documents whose metadata category is 'appointments'.
query= 'How to book an appointment?'
metadata_filter = {'category' : 'appointments'}
# NOTE: this mutates the shared retriever, so the filter remains in effect for
# later calls until it is overwritten.
kbase_search.search_kwargs["filter"] = metadata_filter
kbase_search.invoke(query)

In [None]:
# Read the metadata of every stored document straight from the underlying
# collection object (`_collection` is a private attribute of the wrapper).
results = kbase_db._collection.get(              # No filtering, fetches all documents
    include=["metadatas"]    # Include metadata in the result
)

In [None]:
# Collect the distinct 'category' values found in the stored metadata,
# skipping entries that have no 'category' key.
unique_categories = {
    entry["category"]
    for entry in results["metadatas"]
    if "category" in entry
}

print(unique_categories)

**BUILD ROUTER AGENTIC RAG**

Step 1 - Building the Agent Schema

In [None]:
from typing import TypedDict, Literal
from pydantic import BaseModel

class CustomerSupportAgentState(TypedDict):
    """Shared state passed between the nodes of the customer-support graph."""
    # Raw text of the customer's question (the graph input).
    customer_query: str
    # Department label written by categorize_inquiry.
    query_category: str
    # Sentiment label written by analyze_inquiry_sentiment.
    query_sentiment: str
    # Contact details written by accept_user_input_escalation.
    escalation_cust_info: dict
    # Contact details written by accept_user_input_oncall.
    oncall_cust_info: dict
    # Text finally shown to the customer.
    final_response: str
    # Set to True by the test wrappers so the form nodes return canned contact
    # details instead of rendering a widget form. It must be declared here:
    # keys that are not part of the state schema are not carried through the
    # graph, so the nodes' `support_state.get("test_mode")` would never see it.
    test_mode: bool

# Structured-output schema for the routing step: the answer must be exactly one
# of the four supported department labels.
# NOTE(review): documented with `#` comments rather than a class docstring,
# because a docstring would likely become part of the schema description sent
# to the model - confirm before adding one.
class QueryCategory(BaseModel):
    categorized_topic: Literal['Billing', 'Appointments', 'Records', 'Insurance']

# Structured-output schema for the sentiment step: the answer must be exactly
# one of the four sentiment labels.
# NOTE(review): documented with `#` comments rather than a class docstring,
# because a docstring would likely become part of the schema description sent
# to the model - confirm before adding one.
class QuerySentiment(BaseModel):
    sentiment: Literal['Positive', 'Neutral', 'Negative', 'Distress']

In [None]:
from langchain_openai import ChatOpenAI

# Shared chat model used by the classification and response-generation nodes
# below, configured with temperature 0.
llm = ChatOpenAI(model="gpt-4o", temperature=0)

In [None]:
def categorize_inquiry(support_state: CustomerSupportAgentState) -> CustomerSupportAgentState:
    """
    Route a customer query to one support department.

    Reads ``support_state["customer_query"]``, asks the LLM for a structured
    ``QueryCategory`` answer and returns a partial state update of the form
    ``{"query_category": <category>}``. The category is expected to be one of
    'Billing', 'Appointments', 'Records' or 'Insurance'; "Unknown" is returned
    when the model output has no recognisable shape.
    """

    def _unwrap_category(llm_output):
        # Reduce the structured-output result to a bare category value.
        # The shapes are probed in a fixed order; the first match wins.
        is_mapping = isinstance(llm_output, dict)
        if is_mapping and 'values' in llm_output:
            # {'values': ['Category']} - take the first list item
            return llm_output['values'][0]
        if hasattr(llm_output, 'categorized_topic'):
            # Pydantic object exposing `categorized_topic`
            return llm_output.categorized_topic
        if hasattr(llm_output, 'category'):
            # Pydantic object exposing `category`
            return llm_output.category
        if is_mapping and 'category' in llm_output:
            return llm_output['category']
        if not is_mapping:
            # Nothing recognisable to extract
            return "Unknown"
        # Unknown dict layout: fall back to its first value, unwrapping a list
        first_value = list(llm_output.values())[0]
        return first_value[0] if isinstance(first_value, list) else first_value

    customer_text = support_state["customer_query"]
    prompt_template = """Act as a customer support agent trying to best categorize the customer query.
                               You are a support agent for a healthcare company focusing on providing healthcare services to customers.
                               These services include:
                                - handling billing queries
                                - handling insurance queries
                                - handling appointment booking queries
                                - handling medical records queries

                               Please read the customer query below and
                               determine the best category from the following list:

                               'Billing', 'Appointments', 'Records' or 'Insurance'

                               Remember:
                                - Billing queries will focus more on payment and billing related aspects
                                - Appointments will focus more on booking, rescheduling, cancelling doctor appointments
                                - Records will focus more on updating, sharing, accessing medical records
                                - Insurance will focus more on insurance related queries like claims, updating insurance policy details

                                Return just the category name (from one of the above)

                                Query:
                                {customer_query}
                            """
    filled_prompt = prompt_template.format(customer_query=customer_text)
    classifier = llm.with_structured_output(QueryCategory)
    return {"query_category": _unwrap_category(classifier.invoke(filled_prompt))}

In [None]:
# Manual check of the category classifier on a sample query (calls the LLM).
categorize_inquiry({"customer_query": "What doctors are available?"})

In [None]:
# Manual check of the category classifier on an insurance-related query (calls the LLM).
categorize_inquiry({"customer_query": "What insurance is available?"})

In [None]:
def analyze_inquiry_sentiment(support_state: CustomerSupportAgentState) -> CustomerSupportAgentState:
    """
    Label the sentiment of a customer query.

    Reads ``support_state["customer_query"]``, asks the LLM for a structured
    ``QuerySentiment`` answer and returns a partial state update of the form
    ``{"query_sentiment": <sentiment>}``. The sentiment is expected to be one of
    'Positive', 'Neutral', 'Negative' or 'Distress'; "Neutral" is returned when
    the model output has no recognisable shape.
    """

    def _unwrap_sentiment(llm_output):
        # Reduce the structured-output result to a bare sentiment value.
        # The shapes are probed in a fixed order; the first match wins.
        is_mapping = isinstance(llm_output, dict)
        if is_mapping and 'values' in llm_output:
            return llm_output['values'][0]
        if hasattr(llm_output, 'sentiment'):
            return llm_output.sentiment
        if is_mapping and 'sentiment' in llm_output:
            return llm_output['sentiment']
        if not is_mapping:
            # Nothing recognisable to extract
            return "Neutral"
        # Unknown dict layout: fall back to its first value, unwrapping a list
        first_value = list(llm_output.values())[0]
        return first_value[0] if isinstance(first_value, list) else first_value

    customer_text = support_state["customer_query"]
    prompt_template = """Act as a customer support agent trying to best categorize the sentiment of the customer query.
                               You are a support agent for a healthcare company focusing on providing healthcare services to customers.
                               These services include:
                                - handling billing queries
                                - handling insurance queries
                                - handling appointment booking queries
                                - handling medical records queries

                               Please read the customer query below and
                               determine the best category from the following list:

                               'Positive', 'Neutral', 'Negative', 'Distress'


                                Remember these rules when finding the sentiment:
                                - 'Distress' happens only when the customer is facing a health emergency and might need the on-call emergency doctor
                                - 'Negative' happens only when the customer is not happy with certain products, services offered by the company


                                Return just the category name (from one of the above)

                                Query:
                                {customer_query}
                            """
    filled_prompt = prompt_template.format(customer_query=customer_text)
    classifier = llm.with_structured_output(QuerySentiment)
    return {"query_sentiment": _unwrap_sentiment(classifier.invoke(filled_prompt))}

In [None]:
# Manual check of the sentiment classifier on a query describing a health emergency (calls the LLM).
analyze_inquiry_sentiment({"customer_query": "I'm unable to breathe, need help"})

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from typing import Dict

def generate_department_response(support_state: CustomerSupportAgentState) -> CustomerSupportAgentState:
    """
    Provide a department support response by combining knowledge from the vector store and LLM.

    Reads ``query_category`` and ``customer_query`` from ``support_state``,
    retrieves matching knowledge-base documents (restricted to the department's
    metadata category) and asks the LLM to write the reply.

    Returns a partial state update ``{"final_response": <reply text>}``.

    Side effect: sets (or clears) the "filter" entry of the shared
    ``kbase_search`` retriever's ``search_kwargs``.
    """
    # Classifier label (lower-cased) -> (value of the `category` metadata field
    # in the vector store, department name shown in the prompt).
    department_routes = {
        "billing": ("billing", "Billing"),
        "appointments": ("appointments", "Appointments"),
        "records": ("medical_records", "Medical Records"),
        "insurance": ("insurance", "Insurance"),
    }

    categorized_topic = support_state["query_category"]
    query = support_state["customer_query"]

    route = department_routes.get(str(categorized_topic).strip().lower())
    if route is None:
        # Unrecognised category (e.g. the classifier's "Unknown" fallback).
        # Previously this left the filter and department unassigned and raised
        # UnboundLocalError; now search the whole knowledge base, making sure
        # no filter from an earlier call is left in place.
        kbase_search.search_kwargs.pop("filter", None)
        department = "General Support"
    else:
        kb_category, department = route
        # apply metadata filter
        kbase_search.search_kwargs["filter"] = {"category": kb_category}

    # Perform retrieval from VectorDB
    relevant_docs = kbase_search.invoke(query)
    retrieved_content = "\n\n".join(doc.page_content for doc in relevant_docs)

    # Combine retrieved information into the prompt
    prompt = ChatPromptTemplate.from_template(
        """
        Craft a clear and detailed support response for the following customer query about {department}.
        Use the provided knowledge base information to enrich your response.
        In case there is no knowledge base information or you do not know the answer just say:

        Apologies I was not able to answer your question, please reach out to +1-xxx-xxxx

        Customer Query:
        {customer_query}

        Relevant Knowledge Base Information:
        {retrieved_content}
        """
    )

    # Generate the final response using the LLM
    chain = prompt | llm
    reply = chain.invoke({
        "customer_query": query,
        "retrieved_content": retrieved_content,
        "department": department
    }).content

    # Return only the key this node updates
    return {
        "final_response": reply
    }

In [None]:
# Show the first five processed documents.
display(processed_docs[:5])

In [None]:
# Manual check of the RAG response node on a billing query (queries the vector store and calls the LLM).
generate_department_response({"customer_query": "what payment modes do you accept?", "query_category": "Billing"})

In [None]:
import ipywidgets as widgets
from IPython.display import display
from jupyter_ui_poll import ui_events
import time

def accept_user_input_escalation(support_state: CustomerSupportAgentState) -> CustomerSupportAgentState:
    """
    Collect the customer's contact details for escalation to a human agent.

    When ``support_state`` has a truthy ``test_mode`` entry, canned details are
    returned immediately and no form is shown. Otherwise a widget form (name,
    number, email) is rendered and this call keeps polling UI events until the
    Submit button is clicked.

    Returns a partial state update ``{"escalation_cust_info": {...}}`` with the
    keys 'name', 'number' and 'email'.
    """
    # ✅ CHECK FOR TEST MODE - Skip form if testing
    if support_state.get("test_mode", False):
        return {
            'escalation_cust_info': {
                'name': 'TEST_USER',
                'number': '000-000-0000',
                'email': 'test@example.com'
            }
        }
    # REMEMBER: You can always customize the way you accept user input by modifying the code below
    # here we use jupyter widgets so you don't have to install too many external dependencies

    global form_submitted  # module-level status flag tracking form submission
    form_submitted = False # initially form hasn't been submitted

    # UI Header: A simple HTML element to label the form
    header = widgets.HTML("<h3>Escalation Form - Please enter your details below:</h3>")

    # Text input fields to collect user information
    input1 = widgets.Text(description='Name:')     # User's full name
    input2 = widgets.Text(description='Number:')   # Contact number
    input3 = widgets.Text(description='Email:')    # Email address

    # Dictionary to store form responses after submission
    result = {}

    # Callback function to be triggered when the Submit button is clicked
    def on_submit(submit_button):
        global form_submitted
        form_submitted = True  # Mark the form as submitted
        # Store user inputs into the result dictionary
        result['name'] = input1.value
        result['number'] = input2.value
        result['email'] = input3.value
        # Provide visual feedback that form is submitted
        # (the label was a garbled byte sequence; restored to the intended emoji)
        submit_button.description = '👍'

    # Submit button widget setup
    submit_button = widgets.Button(description="Submit")
    submit_button.on_click(on_submit)  # Attach callback to button

    # Combine all widgets into a vertical layout box
    vbox = widgets.VBox([header, input1, input2, input3, submit_button])
    display(vbox)  # Render the form in the notebook interface

    # Keep polling UI events until the form is submitted; without this the
    # click callback would not run while this cell is still executing
    with ui_events() as poll:
        while form_submitted is False:
            poll(5)               # Listen for UI events
            print('.', end='')   # Show a dot to indicate waiting for input from user
            time.sleep(0.3)      # Slight delay to reduce CPU usage

    # Return updated agent state with captured user info for escalation
    return {
        'escalation_cust_info': result
    }


In [None]:
# Manual check of the escalation form. With an empty state (no test_mode) this
# renders the widget form and keeps running until Submit is clicked.
accept_user_input_escalation({})

In [None]:
def escalate_to_human_agent(support_state: CustomerSupportAgentState) -> CustomerSupportAgentState:
    """
    Build the customer-facing message confirming escalation to a human agent.

    Reads 'name', 'email' and 'number' from
    ``support_state['escalation_cust_info']`` (the details entered in the
    escalation form) and returns ``{"final_response": <message>}``.

    Raises:
        KeyError: if the escalation details or any of the three fields are missing.
    """
    # REMEMBER: You can always customize the way you notify the human support agent by adding custom code below.
    # This could include emailing them, paging them, sending them notifications using specific platform APIs like whatsapp etc.
    # Here we have kept it very simple: we only echo the submitted details back
    # to the user and tell them a human support agent will be in touch.

    # get the customer info from agent state which they entered in the form
    escalation_cust_info = support_state['escalation_cust_info']
    # Wording fixed: "reaching out to via email" -> "reaching out to you via email",
    # and the stray double space after the name was removed.
    response = (
        f"Apologies, {escalation_cust_info['name']}, we are really sorry! "
        f"Someone from our team will be reaching out to you via email shortly at "
        f"{escalation_cust_info['email']} and if needed we will also be calling you at: "
        f"{escalation_cust_info['number']} to help you out!"
    )

    # NOTE: You can always add custom code here to call specific APIs like whatsapp or email etc to notify your human support agents

    return {
        "final_response": response
    }

In [None]:
def accept_user_input_oncall(support_state: CustomerSupportAgentState) -> CustomerSupportAgentState:
    """
    Collect the customer's contact details for the emergency on-call team.

    When ``support_state`` has a truthy ``test_mode`` entry, canned details are
    returned immediately and no form is shown. Otherwise a widget form (name,
    number) is rendered and this call keeps polling UI events until the Submit
    button is clicked.

    Returns a partial state update ``{"oncall_cust_info": {...}}`` with the
    keys 'name' and 'number'.
    """
    # ✅ CHECK FOR TEST MODE - Skip form if testing
    if support_state.get("test_mode", False):
        # Must be stored under 'oncall_cust_info' (the original used
        # 'escalation_cust_info'): escalate_to_oncall_team reads that key and
        # would otherwise fail with KeyError in test mode.
        return {
            'oncall_cust_info': {
                'name': 'TEST_USER',
                'number': '000-000-0000'
            }
        }

    global form_submitted  # module-level status flag tracking form submission
    form_submitted = False # initially form hasn't been submitted

    # Header
    header = widgets.HTML("<h3>Emergency Form - Please enter your details below:</h3>")

    # Text input fields to collect critical user info
    input1 = widgets.Text(description='Name:')     # User's full name
    input2 = widgets.Text(description='Number:')   # Contact number (required for emergency callback)

    # Dictionary to store the captured input values
    result = {}

    def on_submit(submit_button):
        global form_submitted
        form_submitted = True  # Form is now submitted
        # Save form data to result dictionary
        result['name'] = input1.value
        result['number'] = input2.value
        # Visual confirmation of submission
        # (the label was a garbled byte sequence; restored to the intended emoji)
        submit_button.description = '👍'

    # Submit button widget setup
    submit_button = widgets.Button(description="Submit")
    submit_button.on_click(on_submit)  # Attach the callback function

    # Combine widgets vertically and render in notebook
    vbox = widgets.VBox([header, input1, input2, submit_button])
    display(vbox)

    # Keep polling for UI events until form is submitted; without this the
    # click callback would not run while this cell is still executing
    with ui_events() as poll:
        while form_submitted is False:
            poll(5)               # Monitor UI events
            print('.', end='')   # Show a dot to indicate waiting for input from user
            time.sleep(0.3)      # Slight delay to reduce CPU usage

    # Return updated agent state with emergency form details
    return {
        'oncall_cust_info': result
    }

In [None]:
# Manual check of the emergency form. With an empty state (no test_mode) this
# renders the widget form and keeps running until Submit is clicked.
accept_user_input_oncall({})

In [None]:
def escalate_to_oncall_team(support_state: CustomerSupportAgentState) -> CustomerSupportAgentState:
    """
    Build the customer-facing message confirming escalation to the on-call doctors.

    Reads 'name' and 'number' from ``support_state['oncall_cust_info']`` (the
    details entered in the emergency form) and returns
    ``{"final_response": <message>}``.

    Raises:
        KeyError: if the on-call details or either field is missing.
    """
    oncall_cust_info = support_state['oncall_cust_info']
    # the following response will be shown to the user and can also be sent (customer form inputs) to your on-call doctors
    # Wording fixed: "reaching out to your shortly" -> "reaching out to you shortly".
    response = (
        f"Don't worry {oncall_cust_info['name']}"
        "!, someone from our on-call expert doctors will be reaching out to you shortly at "
        f"{oncall_cust_info['number']}"
        " for assistance immediately!"
    )

    # NOTE: You can always add custom code here to call specific APIs like whatsapp to notify your on-call doctors

    return {
        "final_response": response
    }

In [None]:
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

# Workflow graph whose shared state follows the CustomerSupportAgentState schema
customer_support_graph = StateGraph(CustomerSupportAgentState)

# Every workflow step as (node name, node function), in registration order
workflow_nodes = [
    # Step 1: decide which department the query belongs to (billing, records, ...)
    ("categorize_inquiry", categorize_inquiry),
    # Step 2: label the sentiment (positive, neutral, negative, distress)
    ("analyze_inquiry_sentiment", analyze_inquiry_sentiment),
    # Step 3a: negative sentiment - collect contact details for human support
    ("accept_user_input_escalation", accept_user_input_escalation),
    # Step 3b: hand over to a human support agent using those details
    ("escalate_to_human_agent", escalate_to_human_agent),
    # Step 4a: distress sentiment - collect contact details for the on-call team
    ("accept_user_input_oncall", accept_user_input_oncall),
    # Step 4b: hand over to the on-call emergency doctor team
    ("escalate_to_oncall_team", escalate_to_oncall_team),
    # Step 5: positive or neutral sentiment - department-specific RAG response
    ("generate_department_response", generate_department_response),
]
for node_name, node_fn in workflow_nodes:
    customer_support_graph.add_node(node_name, node_fn)

# Define the router function that directs the flow based on sentiment and category
def determine_route(support_state: CustomerSupportAgentState) -> str:
    """
    Pick the node that follows sentiment analysis.

    - 'Negative' sentiment -> "accept_user_input_escalation" (form for human support)
    - 'Distress' sentiment -> "accept_user_input_oncall" (form for on-call doctors)
    - anything else        -> "generate_department_response" (department RAG answer)

    Always returns one of the three node names registered for the conditional
    edge. The original fell off the end and returned None when the category was
    not one of the four known departments, which is not a valid route.
    """
    sentiment = support_state["query_sentiment"]
    if sentiment == "Negative":
        return "accept_user_input_escalation"
    if sentiment == "Distress":
        return "accept_user_input_oncall"
    # Positive / Neutral sentiment, whatever the category: answer via RAG
    return "generate_department_response"

# Wire the nodes together

# Categorisation is always followed by sentiment analysis
customer_support_graph.add_edge("categorize_inquiry", "analyze_inquiry_sentiment")

# Sentiment analysis branches to one of three nodes, chosen by determine_route
route_targets = [
    "accept_user_input_escalation",
    "accept_user_input_oncall",
    "generate_department_response",
]
customer_support_graph.add_conditional_edges(
    "analyze_inquiry_sentiment", determine_route, route_targets
)

# Fixed transitions after the branch, as (from node, to node)
fixed_edges = [
    # escalation form -> human agent hand-over -> done
    ("accept_user_input_escalation", "escalate_to_human_agent"),
    ("escalate_to_human_agent", END),
    # emergency form -> on-call hand-over -> done
    ("accept_user_input_oncall", "escalate_to_oncall_team"),
    ("escalate_to_oncall_team", END),
    # department response -> done
    ("generate_department_response", END),
]
for source_node, target_node in fixed_edges:
    customer_support_graph.add_edge(source_node, target_node)

# The workflow starts with categorisation
customer_support_graph.set_entry_point("categorize_inquiry")

# Compile the graph with an in-memory checkpointer
memory = MemorySaver()
form_submitted = False # initially no form has been submitted
compiled_support_agent = customer_support_graph.compile(checkpointer=memory)

In [None]:
import inspect
# Print the source of the categorize_inquiry definition currently loaded in the
# kernel, to check which version of the function is live.
print(inspect.getsource(categorize_inquiry))

In [None]:
from IPython.display import display, Image, Markdown

# Render the compiled agent graph inline as a PNG of its Mermaid diagram.
# NOTE(review): draw_mermaid_png presumably relies on an external rendering
# service by default - confirm if this must run offline.
display(Image(compiled_support_agent.get_graph().draw_mermaid_png()))

In [None]:
def call_support_agent(agent, prompt, user_session_id, verbose=False):
    """
    Run one customer query through the agent and display the final response.

    Args:
        agent: compiled graph exposing ``stream(state, config, stream_mode=...)``.
        prompt: the customer's query text.
        user_session_id: used as the ``thread_id`` of the run configuration.
        verbose: when True, print every intermediate state yielded by the stream.

    Returns:
        None. The final response is rendered as Markdown; if the stream yields
        nothing or the last state has no 'final_response', a short notice is
        shown instead of raising NameError / KeyError.
    """
    events = agent.stream(
        {"customer_query": prompt}, # initial state of the agent
        {"configurable": {"thread_id": user_session_id}},
        stream_mode="values",
    )

    print('Running Agent. Please wait...')
    # With stream_mode="values" each event is the state so far, so the last
    # event holds the finished state.
    final_state = None
    for final_state in events:
        if verbose:
            print(final_state)

    print('\nFinal Response:')
    final_response = (final_state or {}).get('final_response')
    if final_response is None:
        display(Markdown('_No response was produced for this query._'))
    else:
        display(Markdown(final_response))

In [None]:
# List the 'text' field of every knowledge-base entry.
# NOTE(review): this renders the whole knowledge base in the cell output;
# consider slicing if the file grows.
[item['text'] for item in knowledge_base]

In [None]:
def test_agent_wrapper(query: str, user_session_id: str, verbose: bool = False):
    """
    Wrapper that bypasses interactive forms during testing
    """
    # Inject test mode into state
    result = compiled_support_agent.invoke(
        {
            "customer_query": query,
            "test_mode": True  # ‚Üê Tell agent we're testing
        },
        config={"configurable": {"thread_id": user_session_id}}
    )
    return result

In [None]:
from automated_agent_testing_framework import *

def test_agent_wrapper(query: str, user_session_id: str, verbose: bool = False):
    result = compiled_support_agent.invoke(
        {
            "customer_query": query,
            "test_mode": True  # ‚Üê Enables test mode
        },
        config={"configurable": {"thread_id": user_session_id}}
    )
    return result

# Run tests
# get_comprehensive_test_cases, AgentTestRunner and TestReporter are not defined
# in this notebook; presumably they come from the star import of
# automated_agent_testing_framework at the top of this cell - confirm.
test_cases = get_comprehensive_test_cases()
runner = AgentTestRunner(test_agent_wrapper)

# NOTE(review): `results` was used earlier for the vector-store metadata query;
# this assignment replaces that value.
results = runner.run_test_suite(
    test_cases,
    verbose=True,
    use_llm_judge=False  # Start with heuristics
)

# Print the summary report produced by the framework for this run.
print("\n" + TestReporter.generate_summary_report(results))