In [88]:
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.pydantic_v1 import BaseModel, Field
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END

import pickle
import os
import json
import time
from pathlib import Path
from dotenv import load_dotenv
from typing import Any, Dict, List, Optional, Literal, TypedDict
from dataclasses import dataclass, asdict

from langchain_huggingface import HuggingFaceEmbeddings
from langchain_astradb import AstraDBVectorStore

from langchain_community.document_loaders import JSONLoader
from langchain.document_loaders import PyPDFLoader
from langchain.vectorstores import FAISS


from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory


from flask import Flask, request, jsonify
load_dotenv()

True

In [89]:
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
ASTRA_API_KEY = os.getenv("ASTRA_API_KEY")
DB_ENDPOINT = os.getenv("DB_ENDPOINT")

In [83]:
llm = ChatGoogleGenerativeAI(model = "gemini-2.0-flash", api_key=GEMINI_API_KEY, temperature= 0.2)

E0000 00:00:1758285378.407604  142308 alts_credentials.cc:93] ALTS creds ignored. Not running on GCP and untrusted ALTS is not enabled.


In [84]:
llm.invoke("hello").content

'Hello! How can I help you today?'

In [21]:
class DiskConversationMemory:
    def __init__(self, filename: str = "chat_memory.pkl"):
        self.filename = Path(filename)
        self.memory = ConversationBufferMemory(return_messages=True)
        self._load()


    def _load(self):
        if self.filename.exists():
            try:
                with open(self.filename, "rb") as f:
                    self.memory = pickle.load(f)
                print(f"Loaded memory from {self.filename}")
            except Exception as e:
                print("Failed to load memory, starting fresh:", e)


    def persist(self):
        try:
            with open(self.filename, "wb") as f:
                pickle.dump(self.memory, f)
                print(f"Persisted memory to {self.filename}")
        except Exception as e:
            print("Failed to persist memory:", e)

In [23]:
def node_twilio_ingress(state: Dict[str, Any], config, runtime):
    payload = state.get("twilio_payload")
    if not payload:
        return state


    text = payload.get("Body") or payload.get("Message") or payload.get("text")
    sender = payload.get("From") or payload.get("from")

    state["user_message"] = text
    state["user_meta"] = {"sender": sender, "raw_payload": payload}
    return state

In [26]:
def main():
    state1 = {
        "twilio_payload": {
            "Body": "Hello from Twilio!",
            "From": "+911234567890"
        }
    }
    print("Test 1 Output:", node_twilio_ingress(state1))

    state2 = {
        "twilio_payload": {
            "Message": "Hi, using Message field",
            "from": "+919876543210"
        }
    }
    print("Test 2 Output:", node_twilio_ingress(state2))

    state3 = {
        "twilio_payload": {
            "text": "Hello using text field",
            "From": "+1111111111"
        }
    }
    print("Test 3 Output:", node_twilio_ingress(state3))

    state4 = {}
    print("Test 4 Output:", node_twilio_ingress(state4))


if __name__ == "__main__":
    main()

Test 1 Output: {'twilio_payload': {'Body': 'Hello from Twilio!', 'From': '+911234567890'}, 'user_message': 'Hello from Twilio!', 'user_meta': {'sender': '+911234567890', 'raw_payload': {'Body': 'Hello from Twilio!', 'From': '+911234567890'}}}
Test 2 Output: {'twilio_payload': {'Message': 'Hi, using Message field', 'from': '+919876543210'}, 'user_message': 'Hi, using Message field', 'user_meta': {'sender': '+919876543210', 'raw_payload': {'Message': 'Hi, using Message field', 'from': '+919876543210'}}}
Test 3 Output: {'twilio_payload': {'text': 'Hello using text field', 'From': '+1111111111'}, 'user_message': 'Hello using text field', 'user_meta': {'sender': '+1111111111', 'raw_payload': {'text': 'Hello using text field', 'From': '+1111111111'}}}
Test 4 Output: {}


In [28]:
def node_load_vaccination_json(state: Dict[str, Any], config, runtime):
    path = state.get(r"/Users/aashutoshkumar/Documents/Projects/healthgraph-assistant/data/vaccination_schedule.json", r"data/vaccination_schedule.json")
    loader = JSONLoader(file_path=path)
    docs = loader.load()
    state["vaccination_docs"] = docs
    return state

In [32]:
def node_load_outbreak_pdf(state: Dict[str, Any], config: Any = None, runtime: Any = None):
    path = state.get("outbreak_pdf_path", "data/outbreak_report.pdf")
    loader = PyPDFLoader(path)
    docs = loader.load_and_split()
    state["outbreak_docs"] = docs
    return state

In [36]:
def main():
    state1 = {
        "outbreak_pdf_path": r"/Users/aashutoshkumar/Documents/Projects/healthgraph-assistant/latest_weekly_outbreak/31st_weekly_outbreak.pdf" 
    }
    try:
        result1 = node_load_outbreak_pdf(state1)
        print(f"✅ Test 1: Loaded {len(result1['outbreak_docs'])} docs from {state1['outbreak_pdf_path']}")
    except Exception as e:
        print(f"❌ Test 1 failed: {e}")

    state2 = {
        "outbreak_pdf_path": r"/Users/aashutoshkumar/Documents/Projects/healthgraph-assistant/latest_weekly_outbreak/31st_weekly_outbreak.pdf" 
    }
    try:
        
        result2 = node_load_outbreak_pdf(state2)
        print(f"✅ Test 2: Loaded {len(result2['outbreak_docs'])} docs from default path")
    except Exception as e:
        print(f"❌ Test 2 failed: {e}")


if __name__ == "__main__":
    main()

✅ Test 1: Loaded 32 docs from /Users/aashutoshkumar/Documents/Projects/healthgraph-assistant/latest_weekly_outbreak/31st_weekly_outbreak.pdf
✅ Test 2: Loaded 32 docs from default path


In [37]:
def node_build_faiss_index(state: Dict[str, Any], config, runtime):
    emb = GoogleGenerativeAIEmbeddings(model="gemini-embedding-001")
    docs = []
    docs.extend(state.get("outbreak_docs", []))
    docs.extend(state.get("vaccination_docs", []))


    if not docs:
        print("Nothing found")
        return state


    index_dir = Path(state.get("index_dir", "./faiss_index"))
    index_dir.mkdir(parents=True, exist_ok=True)


    vectorstore = FAISS.from_documents(docs, embedding=emb)
    vectorstore.save_local(index_dir)
    state["local_vectorstore"] = vectorstore
    return state

In [69]:
def node_router(state: Dict[str, Any], config: Any = None, runtime: Any = None):

    message = state.get("user_message", "")
    if not message:
        state["route_decision"] = {"route": "general_query", "reason": "empty_message"}
        return state


    lower = message.lower()
    if any(w in lower for w in ["urgent", "emergency", "outbreak", "hospital", "clinic", "immediate"]):
        state["route_decision"] = {"route": "emergency_outbreak", "reason": "keyword_match"}
        return state


    if any(w in lower for w in ["symptom", "fever", "cough", "vomit", "rash", "pain"]):
        state["route_decision"] = {"route": "symptom", "reason": "keyword_match"}
        return state


    if any(w in lower for w in ["vaccine", "vaccination", "due", "schedule", "immunize"]):
        state["route_decision"] = {"route": "vaccination_schedule", "reason": "keyword_match"}
        return state


    prompt = (
        "You are a router deciding how to handle user messages. "
        "Choose one of: emergency_outbreak, symptom, vaccination_schedule, general_query. "
        "Respond with a JSON object {'route':'...', 'reason':'...'} only.\n"
        f"Message: {message}"
    )


    decision_text = llm.invoke([{"role": "user", "content": prompt}]).content
    import json
    try:
        decision = json.loads(decision_text)
        state["route_decision"] = decision
    except Exception:
        state["route_decision"] = {"route": "general_query", "reason": "llm_parse_failed"}


    return state

In [70]:
def main():
    test_states = [
        {"user_message": ""},  # empty
        {"user_message": "This is an urgent hospital emergency"},  # emergency_outbreak
        {"user_message": "I have fever and cough"},  # symptom
        {"user_message": "When is my next vaccination due?"},  # vaccination
        {"user_message": "Tell me about health services"},  # fallback -> mock LLM
    ]

    for i, state in enumerate(test_states, 1):
        result = node_router(state)  # use_mock=True avoids real API calls
        print(f"Test {i} Input: {state['user_message']!r}")
        print(f"Test {i} Output: {result['route_decision']}")
        print("-" * 60)


if __name__ == "__main__":
    main()

Test 1 Input: ''
Test 1 Output: {'route': 'general_query', 'reason': 'empty_message'}
------------------------------------------------------------
Test 2 Input: 'This is an urgent hospital emergency'
Test 2 Output: {'route': 'emergency_outbreak', 'reason': 'keyword_match'}
------------------------------------------------------------
Test 3 Input: 'I have fever and cough'
Test 3 Output: {'route': 'symptom', 'reason': 'keyword_match'}
------------------------------------------------------------
Test 4 Input: 'When is my next vaccination due?'
Test 4 Output: {'route': 'vaccination_schedule', 'reason': 'keyword_match'}
------------------------------------------------------------
Test 5 Input: 'Tell me about health services'
Test 5 Output: {'route': 'general_query', 'reason': 'llm_parse_failed'}
------------------------------------------------------------


In [72]:
def node_emergency_outbreak(state: Dict[str, Any], config: Any = None, runtime: Any = None):
    message = state.get("user_message")
    if not message:
        state["response"] = "No message provided."
        return state

    vectorstore: Optional["FAISS"] = state.get("local_vectorstore")
    if vectorstore is None:
        state["response"] = "Outbreak data not indexed. Please run ingestion first."
        return state

    try:
        retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

        conv = ConversationalRetrievalChain.from_llm(
            llm=llm,
            retriever=retriever,
            memory=state.get("disk_memory").memory if state.get("disk_memory") else None
        )

        result = conv.run(question=message)
        state["response"] = result

        if state.get("disk_memory"):
            state["disk_memory"].persist()

    except Exception as e:
        state["response"] = f"Error while handling outbreak query: {e}"

    return state

In [112]:
from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA
from typing import Dict, Any

def node_symptom(state: Dict[str, Any], config, runtime):
    message = state.get("user_message")

    hf_embedding = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2"
    )

    vector_store = AstraDBVectorStore(
        embedding=hf_embedding,
        api_endpoint=DB_ENDPOINT,
        namespace="default_keyspace",
        token=ASTRA_API_KEY,
        collection_name="medical_v2",
    )

    retriever = vector_store.as_retriever()
    if retriever is None:
        state["response"] = "Symptom database not connected (Astra DB retriever missing)."
        return state

    # ✅ Fix: include both "context" and "question"
    llm_prompt = PromptTemplate(
        input_variables=["context", "question"],
        template="""
You are a knowledgeable medical assistant. Use the following retrieved content to answer the user’s question accurately.

Context:
{context}

Question:
{question}

Instructions:
1. Base your answer only on the retrieved content; do not make unsupported guesses.
2. Keep explanations clear and professional, suitable for medical guidance.
3. If the retrieved information does not answer the question, politely state that the information is unavailable instead of fabricating an answer.
4. Avoid unnecessary technical jargon unless the user asks for detailed medical terms.

Answer:
"""
    )

    # Use RetrievalQA for single-query RAG
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        retriever=retriever,
        chain_type="stuff",
        chain_type_kwargs={"prompt": llm_prompt}
    )

    result = qa_chain.run(message)
    state["response"] = result
    return state


In [113]:
if __name__ == "__main__":
    test_state = {
        "user_message": "What are the common symptoms of diabetes?",
        "disk_memory": None
    }
    config = {}
    runtime = {}

    updated_state = node_symptom(test_state, config, runtime)

    print("User Question:", test_state["user_message"])
    print("RAG Response:", updated_state["response"])

User Question: What are the common symptoms of diabetes?
RAG Response: The retrieved content mentions that common symptoms of diabetes can include polyuria (frequent urination), polydipsia (excessive thirst), lethargy, and anorexia (loss of appetite).
