In [8]:
from typing import Annotated
from langgraph.graph import StateGraph


# 1. State Definition
class State:
    def __init__(self, user_query, chat_history=None, request_report=False):
        self.user_query = user_query  # Current query from the user
        self.route = None             # 'sql', 'search', or 'reporting'
        self.sql_result = None        # Result from SQL Agent
        self.search_result = None     # Result from Search Agent
        self.final_response = None    # Aggregated final response
        self.report = None            # Structured report from Reporting Agent
        self.request_report = request_report  # True if user requested a report
        self.chat_history = chat_history or []  # Tracks previous interactions
    
    def __repr__(self):
        return (f"State(user_query={self.user_query}, route={self.route}, "
                f"sql_result={self.sql_result}, search_result={self.search_result}, "
                f"final_response={self.final_response}, report={self.report}, "
                f"request_report={self.request_report}, chat_history={self.chat_history})")
    
    def validate(self):
        if not isinstance(self.user_query, str):
            raise TypeError("user_query must be a string")
        if not isinstance(self.chat_history, list):
            raise TypeError("chat_history must be a list")
        if not isinstance(self.request_report, bool):
            raise TypeError("request_report must be a boolean")


# 2. Orchestrator Agent
def orchestrator_agent(state: State) -> Annotated[State, "orchestrator_output"]:
    state.validate()
    print("[DEBUG] Entering orchestrator_agent")
    
    if state.route is None:
        if "database" in state.user_query or "SQL" in state.user_query:
            state.route = "sql"
        else:
            state.route = "search"
    
    if state.route == "search":
        state.final_response = f"Search Result: {state.search_result}"
    elif state.route == "sql":
        state.final_response = f"Database Result: {state.sql_result}"
    
    state.chat_history.append({
        "user_query": state.user_query,
        "agent": state.route,
        "response": state.final_response
    })
    
    if state.request_report:
        state.route = "reporting"
    
    return state


# 3. SQL Agent
def sql_agent(state: State) -> Annotated[State, "sql_output"]:
    state.validate()
    print("[DEBUG] Entering sql_agent")
    
    context = "\n".join([
        f"User: {entry['user_query']}, Agent: {entry['agent']}, Response: {entry['response']}" 
        for entry in state.chat_history
    ])
    state.sql_result = f"SQL Query Result for '{state.user_query}' with context:\n{context}"
    state.route = "orchestrator"
    
    return state


# 4. Enterprise Search Agent
def search_agent(state: State) -> Annotated[State, "search_output"]:
    state.validate()
    print("[DEBUG] Entering search_agent")
    
    context = "\n".join([
        f"User: {entry['user_query']}, Agent: {entry['agent']}, Response: {entry['response']}" 
        for entry in state.chat_history
    ])
    state.search_result = f"Search Query Result for '{state.user_query}' with context:\n{context}"
    state.route = "orchestrator"
    
    return state


# 5. Reporting Agent
def reporting_agent(state: State) -> Annotated[State, "report_output"]:
    state.validate()
    print("[DEBUG] Entering reporting_agent")
    
    state.report = f"Aggregated Report:\n{state.final_response}\nGenerated using the final aggregated results."
    
    state.chat_history.append({
        "user_query": state.user_query,
        "agent": "reporting",
        "response": state.report
    })
    
    state.final_response = state.report
    state.route = "orchestrator"
    
    return state


# 6. Workflow Graph
graph = StateGraph(State)

# Add nodes with explicit annotations
graph.add_node("orchestrator_agent", orchestrator_agent)
graph.add_node("sql_agent", sql_agent)
graph.add_node("search_agent", search_agent)
graph.add_node("reporting_agent", reporting_agent)

# Define edges with annotated routing
graph.add_edge("orchestrator_agent", "sql_agent")
graph.add_edge("orchestrator_agent", "search_agent")
graph.add_edge("orchestrator_agent", "reporting_agent")
graph.add_edge("sql_agent", "orchestrator_agent")
graph.add_edge("search_agent", "orchestrator_agent")
graph.add_edge("reporting_agent", "orchestrator_agent")

# Entry and finish points
graph.set_entry_point("orchestrator_agent")
graph.set_finish_point("orchestrator_agent")

# Compile the workflow
app = graph.compile()


# 7. Test Workflow Scenarios

# Step 1: Initial SQL Query
state = State(user_query="Get customer sales data from the database.")
state = app.invoke(state)
print("\nStep 1 - Final Response:", state.final_response)

# Step 2: Follow-Up Query to Search Agent
state = State(
    user_query="Are there any documents about these sales?", 
    chat_history=state.chat_history
)
state = app.invoke(state)
print("\nStep 2 - Final Response:", state.final_response)

# Step 3: Another Follow-Up Query to Search Agent
state = State(
    user_query="Do these documents include customer feedback?", 
    chat_history=state.chat_history
)
state = app.invoke(state)
print("\nStep 3 - Final Response:", state.final_response)

# Step 4: Generate Report
state = State(
    user_query="Summarize these findings into a report.", 
    request_report=True, 
    chat_history=state.chat_history
)
state = app.invoke(state)
print("\nStep 4 - Final Report:", state.final_response)


[DEBUG] Entering orchestrator_agent
[DEBUG] Entering sql_agent
[DEBUG] Entering search_agent
[DEBUG] Entering reporting_agent


InvalidUpdateError: At key '__root__': Can receive only one value per step. Use an Annotated key to handle multiple values.
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/INVALID_CONCURRENT_GRAPH_UPDATE