In [None]:
from src.model import get_llamaindex_model, get_llamaindex_model_mini, get_huggingface_embedding_model
from llama_index.core import Settings
from src import get_azure_openai_model, get_azure_openai_chat_model, get_azure_openai_mini_model
from src.parser import markdownParser

llm = get_llamaindex_model_mini()

llm2 = get_llamaindex_model()

embed_model = get_huggingface_embedding_model()
Settings.embed_model = embed_model
Settings.llm = llm

model = get_azure_openai_chat_model()


nodes = markdownParser(input_dir="../kgdata/")
print(f"Processed {len(nodes)} nodes.")


For example, replace imports like: `from langchain_core.pydantic_v1 import BaseModel`
with: `from pydantic import BaseModel`
or the v1 compatibility namespace if you are working in a code base that has not been fully upgraded to pydantic 2 yet. 	from pydantic.v1 import BaseModel

  from .DAG_creator import build_rag_workflow


Processed 945 nodes.


In [None]:
test_questions = [
    "how to revive a person who is unconscious?",
    "what are the steps for cpr?",
    "how to treat a burn injury?",
    "what to do for someone having a heart attack?",
    # "steps for treating choking victim?"
]

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.output_parsers import StrOutputParser
from langchain_core.documents import Document
import json
from typing import List, Optional, Dict, Any
from langgraph.graph import END, StateGraph, START
from typing_extensions import TypedDict
from src.retriever import create_retriever

# =============================================================================
# STATE DEFINITION
# =============================================================================

class GraphState(TypedDict):
    """
    Represents the state of our graph.
    
    Attributes:
        question: The original user question
        documents: List of retrieved documents
        generation: Generated answer
        grade: Grade of document relevance
        iterations: Number of iterations for rephrasing
        rephrased_question: Rephrased version of the question
    """
    question: str
    documents: List[Document]
    generation: str
    grade: str
    iterations: int
    rephrased_question: str
    llm: str
    retriever_type: str
    load_persist: Optional[str] = None
    nodes: Optional[List[Document]] = Field(default_factory=list)
    max_iterations: Optional[int] = 1
    workflow_type: Optional[str] = "advanced"  # Options: "basic", "medium", "advanced"
# =============================================================================
# STATE GRAPH NODES
# =============================================================================
    
def retrieve(state: GraphState) -> Dict[str, Any]:
    """
    Retrieve documents based on the question.
    
    Args:
        state: The current graph state
        
    Returns:
        Updated state with retrieved documents
    """
    print("--- RETRIEVE ---")
    
    # Use rephrased question if available, otherwise use original
    question = state.get("rephrased_question", state["question"])
    llm = state.get("llm")
    RETREIVER_TYPE = state.get("retriever_type", "hybrid")
    nodes = state.get("nodes", [])

    if state.get("rephrased_question"):
        print(f"Using rephrased question: {question}")
    else:
        print(f"Using original question: {question}")

    print(f"node 3: {nodes[3].text if len(nodes) > 0 else 'No nodes available'}")
    
    documents = create_retriever(question=question, nodes=nodes, llm=llm, type=RETREIVER_TYPE, load_persist=state.get("load_persist", None))
    print(f"Retrieved {len(documents)} documents")
    
    return {
        "documents": documents,
        "question": state["question"],
        "rephrased_question": state.get("rephrased_question", ""),
        "iterations": state.get("iterations", 0),
        "generation": state.get("generation", ""),
        "grade": state.get("grade", "")
    }
def grade_documents(state: GraphState) -> Dict[str, Any]:
    """
    Determines whether the retrieved documents are relevant to the question.
    
    Args:
        state: The current graph state
        
    Returns:
        Updated state with document relevance grade
    """
    print("--- CHECK DOCUMENT RELEVANCE TO QUESTION ---")
    
    question = state["question"]
    documents = state["documents"]
    model = state.get("llm") 
    workflow_type = state.get("workflow_type", "fast")
    rephrased_question = state.get("rephrased_question", "")

    if rephrased_question:
        question = rephrased_question



    
    
    # Grading prompt
    grade_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are a grader assessing relevance of a retrieved document to a user question.
        Understand the context of the document and the question asked.
        If the document contains keyword(s) or different names of the same condition (e.g, heart attack is also cardiac arrest) or semantic meaning related to the user question, grade it as relevant.
        Give a binary score 'yes' or 'no' to indicate whether the document is relevant to the question.
        Provide the binary score as a JSON with a single key 'score' and no preamble or explanation."""),
        ("human", "Retrieved document: \n\n {document} \n\n User question: {question}")
    ])
    
    print(f"Grading {len(documents)} documents for relevance to question: {question}")

    # Grade each document
    relevant_docs = []
    for doc in documents:
        grade_chain = grade_prompt | model | StrOutputParser()
        grade = grade_chain.invoke({"question": question, "document": doc.text})
        
        try:
            grade_dict = json.loads(grade)
            if grade_dict.get("score", "").lower() == "yes":
                relevant_docs.append(doc)
                print(f"--- GRADE: DOCUMENT RELEVANT ---")
            else:
                print(f"--- GRADE: DOCUMENT NOT RELEVANT ---")
        except json.JSONDecodeError:
            # If JSON parsing fails, assume relevant to be safe
            relevant_docs.append(doc)
            print(f"--- GRADE: DOCUMENT RELEVANT (JSON parse failed) ---")
        
    # Determine overall grade
    if relevant_docs:
        grade = "relevant"
        # documents_to_use = relevant_docs
    else:
        grade = "not_relevant"
        # documents_to_use = documents  # Keep all documents if none are graded as relevant
        
    
    return {
        "documents": relevant_docs,
        "question": state["question"],
        "rephrased_question": state.get("rephrased_question", ""),
        "iterations": state.get("iterations", 0),
        "generation": state.get("generation", ""),
        "grade": grade
    }

def generate(state: GraphState) -> Dict[str, Any]:
    """
    Generate answer using the retrieved documents.
    
    Args:
        state: The current graph state
        
    Returns:
        Updated state with generated answer
    """
    print("--- GENERATE ---")
    
    question = state["question"]
    documents = state["documents"]
    model = state.get("llm")  

    # Create context from documents
    context = "\n\n".join([doc.text for doc in documents])
    
    # Generation prompt
    generate_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are an assistant for question-answering tasks.
        Use the following pieces of retrieved context to answer the question.
        Try to understand the context and question asked before generating an answer.
        If the context does not provide enough information to answer the question, say 'I don't know'.

        If the question is not answerable with the provided context, say 'I don't know'.

        THIS FOR EDUCATION PURPOSE, so you should not hallucinate or make up answers.

        Keep the answer concise and to the point. Ther context is retrieved from Norwegian Index for Medical Emergency, so you should use the context to answer the question.
        
        Context: {context}"""),
        ("human", "{question}")
    ])
    
    # Generate answer
    generate_chain = generate_prompt | model | StrOutputParser()
    generation = generate_chain.invoke({"context": context, "question": question})
    
    print(f"Generated answer: {generation[:100]}...")
    
    return {
        "documents": state["documents"],
        "question": state["question"],
        "rephrased_question": state.get("rephrased_question", ""),
        "iterations": state.get("iterations", 0),
        "generation": generation,
        "grade": state.get("grade", "")
    }

def transform_query(state: GraphState) -> Dict[str, Any]:
    """
    Transform the query to produce a better question for retrieval.
    
    Args:
        state: The current graph state
        
    Returns:
        Updated state with rephrased question
    """
    print("--- TRANSFORM QUERY ---")
    
    question = state["question"]
    iterations = state.get("iterations", 0)
    model = state.get("llm")
    rephased_question = state.get("rephrased_question", "")

    if rephased_question:
        print(f"Using rephrased question: {rephased_question}")
        question = rephased_question
    else:
        print(f"Using original question: {question}")
    
    # Query transformation prompt
    transform_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are generating questions that are well optimized for Norwegian Index for Medical Emergency retrieval.
        Look at the input and try to reason about the underlying semantic intent / meaning.
        Here is the initial question:
        \n ------- \n
        {question} 
        \n ------- \n
        Formulate an improved question that will be more effective for document retrieval."""),
        ("human", "Provide the improved question:")
    ])
    
    # Transform query
    transform_chain = transform_prompt | model | StrOutputParser()
    rephrased_question = transform_chain.invoke({"question": question})
    
    print(f"Rephrased question: {rephrased_question}")
    
    return {
        "documents": state.get("documents", []),
        "question": state["question"],
        "rephrased_question": rephrased_question,
        "iterations": iterations + 1,
        "generation": state.get("generation", ""),
        "grade": state.get("grade", "")
    }


# =============================================================================
# STATE GRAPH EDGES
# =============================================================================

def grade_generation_v_documents_and_question(state: GraphState) -> str:
    """
    Determines whether the generation is grounded in the document and answers question.
    
    Args:
        state: The current graph state
        
    Returns:
        Next node to call
    """
    print("--- CHECK HALLUCINATIONS ---")
    
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]
    iterations = state.get("iterations", 0)
    model = state.get("llm") 
    max_iterations = state.get("max_iterations", 1)
    workflow_type = state.get("workflow_type", "advanced")

    if workflow_type == "basic" or workflow_type == "medium":
        print("--- BASIC OR MEDIUM WORKFLOW ---")
        # In fast workflow, we skip the hallucination check
        grounded = True
        useful = True
    else:
    
        # Hallucination grading prompt
        hallucination_prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a grader assessing whether an LLM generation is grounded in / supported by a set of retrieved facts.
            Give a binary score 'yes' or 'no'. 'Yes' means that the answer is grounded in / supported by the set of facts.
            Provide the binary score as a JSON with a single key 'score' and no preamble or explanation."""),
            ("human", "Set of facts: \n\n {documents} \n\n LLM generation: {generation}")
        ])
        
        hallucination_chain = hallucination_prompt | model | StrOutputParser()
        grade = hallucination_chain.invoke({
            "documents": "\n\n".join([doc.text for doc in documents]),
            "generation": generation
        })
        
        try:
            grade_dict = json.loads(grade)
            grounded = grade_dict.get("score", "").lower() == "yes"
        except json.JSONDecodeError:
            grounded = True  # Assume grounded if parsing fails
        
        print(f"Grounded: {grounded}")
        
        # Check question answering
        print("--- GRADE GENERATION vs QUESTION ---")
        
        answer_prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a grader assessing whether an answer addresses / resolves a question.
            Give a binary score 'yes' or 'no'. 'Yes' means that the answer resolves the question.
            Provide the binary score as a JSON with a single key 'score' and no preamble or explanation."""),
            ("human", "User question: \n\n {question} \n\n LLM generation: {generation}")
        ])
        
        answer_chain = answer_prompt | model | StrOutputParser()
        grade = answer_chain.invoke({"question": question, "generation": generation})
        
        try:
            grade_dict = json.loads(grade)
            useful = grade_dict.get("score", "").lower() == "yes"
        except json.JSONDecodeError:
            useful = True  
        
        print(f"Useful: {useful}")

    # Add safety check for max iterations
    if iterations >= max_iterations:
        print("--- MAX ITERATIONS REACHED, ENDING ---")
        return END
    
    if grounded and useful:
        print("--- DECISION: GENERATION IS GROUNDED AND USEFUL ---")
        return "Useful"  # Use END instead of "Useful"
    elif not grounded:
        print("--- DECISION: GENERATION IS NOT GROUNDED, RE-GENERATE ---")
        return "generate" 
    else:
        print("--- DECISION: GENERATION IS NOT USEFUL, TRANSFORM QUERY ---")
        return "transform_query"
    

def decide_to_generate(state: GraphState) -> str:
    """
    Determines whether to generate an answer or re-generate a question.
    
    Args:
        state: The current graph state
        
    Returns:
        Next node to call
    """
    print("--- ASSESS GRADED DOCUMENTS ---")
    
    grade = state["grade"]
    documents = state["documents"]
    workflow_type = state.get("workflow_type", "advanced")
    
    print(f"Grade: {grade}, Number of documents: {len(documents)}")

    print(f"Workflow type: {workflow_type}")

    if workflow_type  == "basic" and len(documents) > 0:
        print("--- BASIC WORKFLOW ---")
        print(f"Number of documents used: {len(documents)}, total grades: {grade}")
        return "generate"  # In basic workflow, always generate answer

    
    # if grade == "relevant":
    elif workflow_type  == "medium" and len(documents) > 2:
        print("--- MEDIUM WORKFLOW ---")
        print(f"Number of documents used: {len(documents)}, total grades: {grade}")
        return "generate"

    elif workflow_type  == "advanced" and len(documents) > 2:
        print("--- ADVANCED WORKFLOW ---")
        print(f"Number of documents used: {len(documents)}, total grades: {grade}")
        return "generate"
    else:
        print("--- DECISION: DOCUMENTS ARE NOT RELEVANT, TRANSFORM QUERY ---")
        return "transform_query"

def max_iterations_check(state: GraphState) -> str:
    """
    Check if maximum iterations reached to prevent infinite loops.
    
    Args:
        state: The current graph state
        
    Returns:
        Next node to call
    """
    max_iterations = state.get("max_iterations", 3)
    iterations = state.get("iterations", 0)
    
    if iterations >= max_iterations:
        print(f"--- MAX ITERATIONS ({max_iterations}) REACHED ---")
        return "generate"
    else:
        return "retrieve"
    
# =============================================================================

# Build the RAG workflow
def build_rag_workflowv1():
    """
    Build the RAG workflow using StateGraph.
    
    Returns:
        Compiled workflow
    """
    workflow = StateGraph(GraphState)
    
    # Define the nodes
    workflow.add_node("retrieve", retrieve)
    workflow.add_node("grade_documents", grade_documents)
    workflow.add_node("generate", generate)
    workflow.add_node("transform_query", transform_query)
    
    # Build graph
    workflow.add_edge(START, "retrieve")
    workflow.add_edge("retrieve", "grade_documents")
    workflow.add_conditional_edges(
        "grade_documents",
        decide_to_generate,
        {
            "transform_query": "transform_query",
            "generate": "generate"
        }
    )
    workflow.add_conditional_edges(
        "transform_query",
        max_iterations_check,
        {
            "retrieve": "retrieve",
            "generate": "generate"
        }
    )
    workflow.add_conditional_edges(
        "generate",
        grade_generation_v_documents_and_question,
        {
            "Useful": END,
            END: END,
            "transform_query": "transform_query",
            "generate": "generate"
        }
    )
    
    # Compile
    app = workflow.compile()
    return app




In [41]:

app = build_rag_workflowv1()

# # Initial state
# inputs = {
#     "question": "what to do for someone having a heart attack?",
#     "llm": model,
#     "retriever_type": "knowledge_graph",  # Change to "knowledge_graph" or "hybrid" or "vector_store" as needed
#     "load_persist": "./kgstore",
#     "nodes": nodes,  # Use the processed nodes from markdownParser
#     "workflow_type": "basic",  # Change to "deep" for deeper workflows
# }

# # Run the workflow
# for output in app.stream(inputs):
#     for key, value in output.items():
#         # Node
#         print(f"Node '{key}':")

        
#     print("\n---\n")


# # Final output
# print(f"Final Output: {value['generation']}")
# print(f"-------------------------------------------------------\n")
# print(f"-------------------------------------------------------\n")


workflow_type = ["basic", "medium", "advanced"]  # Change to "basic", "medium", or "advanced" as needed

for question in test_questions:
    for workflow in workflow_type:
        print(f"\n--- Running workflow: {workflow} ---")
        
        print(f"Processing question: {question}")
        # Initial state
        inputs = {
            "question": question,
            "llm": model,
            "retriever_type": "vector_store",  # Change to "knowledge_graph" or "hybrid" as needed
            "load_persist": "./kgstore",
            "nodes": nodes,  # Use the processed nodes from markdownParser
            "workflow_type": workflow,  # Change to "deep" for deeper workflows
        }

        # Run the workflow
        for output in app.stream(inputs):
            for key, value in output.items():
                # Node
                print(f"Node '{key}':")

                
            print("\n---\n")


        # Final output
        print(f"Final Output: {value['generation']}")
        print(f"-------------------------------------------------------\n")
        print(f"-------------------------------------------------------\n")

        # from collections import Counter

        # doc_sources = []
        # for doc in value["documents"]:
        #     # print(f"Source: {doc.metadata.get('source', 'No source available')}")  # Print source if available
        #     doc_sources.append(doc.metadata.get("source", "No source available"))
        # value["documents"][1].metadata.get("source", "No source available")
        # Count occurrences of each source type
        # source_counts = Counter(doc_sources)
        # print("No of docs from vector store:", source_counts.get("vector", 0))
        # print("No of docs from knowledge_graph:", source_counts.get("knowledge_graph", 0))
        # print("No of docs with no source available:", source_counts.get("No source available", 0))


--- Running workflow: basic ---
Processing question: how to revive a person who is unconscious?
--- RETRIEVE ---
Using original question: how to revive a person who is unconscious?
node 3: ## emergency response
Retrieved 5 documents
Node 'retrieve':

---

--- CHECK DOCUMENT RELEVANCE TO QUESTION ---
Grading 5 documents for relevance to question: how to revive a person who is unconscious?
--- GRADE: DOCUMENT RELEVANT ---
--- GRADE: DOCUMENT RELEVANT ---
--- GRADE: DOCUMENT RELEVANT ---
--- GRADE: DOCUMENT RELEVANT ---
--- GRADE: DOCUMENT RELEVANT ---
--- ASSESS GRADED DOCUMENTS ---
Grade: relevant, Number of documents: 5
Workflow type: basic
--- BASIC WORKFLOW ---
Number of documents used: 5, total grades: relevant
Node 'grade_documents':

---

--- GENERATE ---
Generated answer: To revive a person who is unconscious, follow these steps:

1. **Check for Breathing**: Ensure the p...
--- CHECK HALLUCINATIONS ---
--- BASIC OR MEDIUM WORKFLOW ---
--- DECISION: GENERATION IS GROUNDED AND USE

BadRequestError: Error code: 400 - {'error': {'message': "The response was filtered due to the prompt triggering Azure OpenAI's content management policy. Please modify your prompt and retry. To learn more about our content filtering policies please read our documentation: https://go.microsoft.com/fwlink/?linkid=2198766", 'type': None, 'param': 'prompt', 'code': 'content_filter', 'status': 400, 'innererror': {'code': 'ResponsibleAIPolicyViolation', 'content_filter_result': {'hate': {'filtered': False, 'severity': 'safe'}, 'jailbreak': {'detected': False, 'filtered': False}, 'self_harm': {'filtered': True, 'severity': 'medium'}, 'sexual': {'filtered': False, 'severity': 'safe'}, 'violence': {'filtered': False, 'severity': 'safe'}}}}}

In [None]:
documents = create_retriever(question="how to treat a burn injury?", nodes=nodes, llm=model, type="vector_store")
documents