In [None]:
from typing import Annotated, TypedDict, List, Dict, Any, Optional
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import ToolNode
from langgraph.graph.message import add_messages
from pydantic import BaseModel, Field
from IPython.display import Image, display
import gradio as gr
import uuid
from dotenv import load_dotenv
from langchain.agents import Tool
from langchain_community.utilities import GoogleSerperAPIWrapper
import requests
import os

### Load environment variables from .env file

In [None]:
# Load key/value pairs from the .env file into the process environment.
# override=True lets values from .env replace variables that are already set.
load_dotenv(override=True)

### Tool 1: Search tool using Google Serper API

In [None]:
# Client wrapper for the Google Serper search API.
# NOTE(review): presumably picks up its API key from the environment loaded above — confirm.
serper = GoogleSerperAPIWrapper()
# Quick manual check (uncomment to try): serper.run("What is the capital of India?")
# Expose the search call as a named tool; the description is the text the LLM uses to decide when to call it.
tool_search =Tool(
        name="search",
        func=serper.run,
        description="Useful for when you need more information from an online search"
    )

### Tool 2: push notification tool using Pushover API

In [None]:

# Pushover credentials are read from the environment; make sure PUSHOVER_TOKEN and
# PUSHOVER_USER are set (for example in the .env file) before using the tool.
pushover_token = os.getenv("PUSHOVER_TOKEN")
pushover_user = os.getenv("PUSHOVER_USER")
pushover_url = "https://api.pushover.net/1/messages.json"

# Upper bound, in seconds, on how long a single notification request may take.
PUSHOVER_TIMEOUT_SECONDS = 10


def push(text: str) -> str:
    """Send a push notification to the user through the Pushover API.

    Args:
        text: The message body to deliver.

    Returns:
        A short status string. The string is what the calling LLM sees as the
        tool result, so it states whether the notification went out or why it
        did not.
    """
    payload = {"token": pushover_token, "user": pushover_user, "message": text}
    try:
        # A timeout keeps a stalled connection from hanging the whole graph run.
        response = requests.post(pushover_url, data=payload, timeout=PUSHOVER_TIMEOUT_SECONDS)
        # Surface HTTP-level failures (e.g. missing or invalid credentials) instead of ignoring them.
        response.raise_for_status()
    except requests.RequestException as exc:
        # Best-effort tool: report the failure to the caller rather than crashing the run.
        return f"Failed to send push notification: {exc}"
    return "Push notification sent"


tool_push = Tool(
    name="send_push_notification",
    func=push,
    description="useful for when you want to send a push notification",
)

### Define the tools to be used in the graph


In [None]:

# Tools bound to the worker LLM and executed by the graph's ToolNode.
tools= [tool_search, tool_push]

### Define a structured output

In [None]:

# Structured verdict produced by the evaluator LLM (passed to `with_structured_output` below).
class EvaluatorOutput(BaseModel):
    # Free-text critique of the assistant's latest reply; fed back to the worker on a retry.
    feedback: str = Field(description="Feedback on the assistant's response")
    # True when the reply satisfies the success criteria; ends the run.
    success_criteria_met: bool = Field(description="Whether the success criteria have been met")
    # True when the run should stop and hand control back to the user.
    user_input_needed: bool = Field(description="True if more input is needed from the user, or clarifications, or the assistant is stuck")

### Define the state of the graph

In [None]:

# State shared by every node in the graph: the messages, success criteria, feedback,
# and the flags for user input and success.
class State(TypedDict):
    # Conversation messages; `add_messages` is the reducer attached to this field.
    messages: Annotated[List[Any], add_messages]
    # What the final answer must satisfy; interpolated into the worker and evaluator prompts.
    success_criteria: str
    # Evaluator feedback on the latest attempt; empty or None until the evaluator has run.
    feedback_on_work: Optional[str]
    # Written by the evaluator; when True the evaluator router ends the run.
    success_criteria_met: bool
    # Written by the evaluator; when True the evaluator router ends the run so the user can respond.
    user_input_needed: bool

### Create a ChatOpenAI instance with the models for Worker/Assistant and Evaluator

In [None]:
# Worker/assistant LLM: generates the replies. Binding `tools` lets the model
# request a web search or a push notification through tool calls.

worker_llm = ChatOpenAI(model="gpt-4.1-mini")
worker_llm_with_tools = worker_llm.bind_tools(tools)

# Evaluator LLM: judges the assistant's reply against the success criteria.
# `with_structured_output` makes it answer in the EvaluatorOutput shape.

evaluator_llm = ChatOpenAI(model="gpt-4.1-mini")
evaluator_llm_with_output = evaluator_llm.with_structured_output(EvaluatorOutput)

### The Worker/Assistant node function 

This function prepares a system prompt for the assistant (including task criteria and feedback), ensures it’s included in the message history, invokes the LLM with tools, and returns the updated state with the assistant’s response.

In [None]:


def worker(state: State) -> Dict[str, Any]:
    """Run the tool-enabled assistant LLM on the conversation so far.

    Builds a system prompt from the success criteria (plus the evaluator's
    feedback when a previous attempt was rejected), makes sure that prompt is
    the system message the model sees, and invokes the LLM.

    Args:
        state: Current graph state. Reads ``success_criteria``, ``messages``
            and, when present, ``feedback_on_work``.

    Returns:
        A partial state update ``{"messages": [response]}`` holding only the
        model's new reply.
    """
    system_message = f"""You are a helpful assistant that can use tools to complete tasks.
You keep working on a task until either you have a question or clarification for the user, or the success criteria is met.
This is the success criteria:
{state['success_criteria']}
You should reply either with a question for the user about this assignment, or with your final response.
If you have a question for the user, you need to reply by clearly stating your question. An example might be:

Question: please clarify whether you want a summary or a detailed answer

If you've finished, reply with the final answer, and don't ask a question; simply reply with the answer.
"""
    # When the evaluator rejected an earlier attempt, tell the model why so it can correct course.
    feedback = state.get("feedback_on_work")
    if feedback:
        system_message += f"""
Previously you thought you completed the assignment, but your reply was rejected because the success criteria was not met.
Here is the feedback on why this was rejected:
{feedback}
With this feedback, please continue the assignment, ensuring that you meet the success criteria or have a question for the user."""

    history = state["messages"]
    if any(isinstance(message, SystemMessage) for message in history):
        # Swap every existing system message for a fresh one carrying the current prompt.
        # A new list with new objects is built so the messages stored in the graph
        # state are never modified in place.
        messages = [
            SystemMessage(content=system_message) if isinstance(message, SystemMessage) else message
            for message in history
        ]
    else:
        # No system message yet: put the prompt at the start of the conversation.
        messages = [SystemMessage(content=system_message)] + list(history)

    response = worker_llm_with_tools.invoke(messages)

    # Only the new reply is returned; it is combined with the existing messages by the state's reducer.
    return {
        "messages": [response],
    }

### Worker Router Function

This function decides the next step in a workflow based on the last message in the state. It acts as a router. If the last message requests tool usage, it returns "tools". Otherwise, it returns "evaluator". This helps the workflow decide what to do next based on the assistant’s output.

In [None]:
def worker_router(state: State) -> str:
    """Choose the node that runs after the worker.

    Returns "tools" when the newest message carries a non-empty ``tool_calls``
    attribute, and "evaluator" in every other case.
    """
    pending_calls = getattr(state["messages"][-1], "tool_calls", None)
    return "tools" if pending_calls else "evaluator"

### Converts a list of user and assistant messages into a readable chat transcript

In [None]:
def format_conversation(messages: List[Any]) -> str:
    """Render human and AI messages as a plain-text transcript.

    Each HumanMessage becomes a "User:" line and each AIMessage an
    "Assistant:" line; an AI message with empty content (e.g. a pure tool
    call) is shown as "[Tools use]". Any other message type is left out.
    """
    pieces = ["Conversation history:\n\n"]
    for entry in messages:
        if isinstance(entry, HumanMessage):
            pieces.append(f"User: {entry.content}\n")
            continue
        if isinstance(entry, AIMessage):
            shown = entry.content if entry.content else "[Tools use]"
            pieces.append(f"Assistant: {shown}\n")
    return "".join(pieces)

### Evaluator function

This function uses an LLM to evaluate the assistant’s last answer against the assignment’s success criteria, provides feedback, and decides if the task is complete or if more user input is needed. It updates the workflow state with this evaluation.

In [None]:


def evaluator(state: State) -> Dict[str, Any]:
    """Judge the assistant's latest reply against the success criteria.

    Sends the formatted conversation, the success criteria and the latest
    reply to the evaluator LLM and turns its structured verdict into a state
    update.

    Args:
        state: Current graph state. Reads ``messages``, ``success_criteria``
            and, when present, ``feedback_on_work``.

    Returns:
        A partial state update containing the feedback as an assistant
        message, plus ``feedback_on_work``, ``success_criteria_met`` and
        ``user_input_needed`` taken from the evaluator's verdict.
    """
    last_response = state["messages"][-1].content

    # Instructions for the evaluator LLM.
    system_message = """You are an evaluator that determines if a task has been completed successfully by an Assistant.
Assess the Assistant's last response based on the given criteria. Respond with your feedback, and with your decision on whether the success criteria has been met,
and whether more input is needed from the user."""

    # The evaluation task itself: full transcript, success criteria, and the reply under review.
    user_message = f"""You are evaluating a conversation between the User and Assistant. You decide what action to take based on the last response from the Assistant.

The entire conversation with the assistant, with the user's original request and all replies, is:
{format_conversation(state['messages'])}

The success criteria for this assignment is:
{state['success_criteria']}

And the final response from the Assistant that you are evaluating is:
{last_response}

Respond with your feedback, and decide if the success criteria is met by this response.
Also, decide if more user input is required, either because the assistant has a question, needs clarification, or seems to be stuck and unable to answer without help.
"""
    # `.get` (as in the worker) so a state without this key does not raise KeyError.
    previous_feedback = state.get("feedback_on_work")
    if previous_feedback:
        # Remind the evaluator of its earlier feedback so it can spot repeated mistakes.
        user_message += f"Also, note that in a prior attempt from the Assistant, you provided this feedback: {previous_feedback}\n"
        user_message += "If you're seeing the Assistant repeating the same mistakes, then consider responding that user input is required."

    evaluator_messages = [SystemMessage(content=system_message), HumanMessage(content=user_message)]

    # The LLM is configured for structured output, so the result exposes
    # `feedback`, `success_criteria_met` and `user_input_needed`.
    eval_result = evaluator_llm_with_output.invoke(evaluator_messages)

    return {
        "messages": [{"role": "assistant", "content": f"Evaluator Feedback on this answer: {eval_result.feedback}"}],
        "feedback_on_work": eval_result.feedback,
        "success_criteria_met": eval_result.success_criteria_met,
        "user_input_needed": eval_result.user_input_needed,
    }

### Router function: decide where the workflow goes after the Evaluator

If the task is done or needs user input, it ends the workflow.
Otherwise, it sends the state back to the worker for more work.

In [None]:
def route_based_on_evaluation(state: State) -> str:
    """Decide where the graph goes after the evaluator has run.

    Returns "worker" for another attempt unless the success criteria are met
    or user input is needed, in which case it returns "END".
    """
    # Guard clause: neither stop condition holds, so the worker tries again.
    if not (state["success_criteria_met"] or state["user_input_needed"]):
        return "worker"
    return "END"

### Build a LangGraph workflow 

This cell adds the three main nodes (`worker`, `tools`, `evaluator`),
sets up the logic for moving between them based on the state, and
compiles the workflow for execution with memory checkpointing.

In [None]:
# Graph builder parameterised with State, the schema shared by every node.
graph_builder = StateGraph(State)

# Nodes:
#   worker:    runs `worker` (the assistant LLM with tools bound).
#   tools:     a `ToolNode` over `tools` (web search and push notification).
#   evaluator: runs `evaluator` (checks the reply against the success criteria).
graph_builder.add_node("worker", worker)
graph_builder.add_node("tools", ToolNode(tools=tools))
graph_builder.add_node("evaluator", evaluator)

# Edges:
#   worker    -> tools or evaluator, chosen by `worker_router`.
#   tools     -> worker, so the assistant sees the tool results.
#   evaluator -> worker for another attempt, or END, chosen by `route_based_on_evaluation`.
#   START     -> worker, the entry point.
graph_builder.add_conditional_edges("worker", worker_router, {"tools": "tools", "evaluator": "evaluator"})
graph_builder.add_edge("tools", "worker")
graph_builder.add_conditional_edges("evaluator", route_based_on_evaluation, {"worker": "worker", "END": END})
graph_builder.add_edge(START, "worker")

# Compile into an executable workflow. MemorySaver is used as the checkpointer,
# so every invocation has to supply a thread id in its config.
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)

### Generate and display a visual diagram of the LangGraph workflow


In [None]:
# Draw the compiled graph as a Mermaid PNG and show it inline.
display(Image(graph.get_graph().draw_mermaid_png()))

#### Let's Test the workflow :)

In [None]:
# Smoke test: run one request through the compiled graph.
state = {
    "messages": [HumanMessage(content="What is the GDP of USA?")],
    "success_criteria": "The answer must include the latest GDP figure for the USA.",
    "feedback_on_work": "",
    "success_criteria_met": False,
    "user_input_needed": False
}
# The graph is compiled with a checkpointer, so the thread id has to be nested
# under "configurable" (the same shape `process_message` uses); a top-level
# "thread_id" key is not picked up by the checkpointer.
result = graph.invoke(
    state,
    config={"configurable": {"thread_id": "my-unique-thread-id"}},
)
print(result)

### Print the messages in the result

In [None]:

# Pretty-print every message in `result`: the user's input, the assistant's replies,
# and any tool calls made along the way.
for m in result["messages"]:
    m.pretty_print()

### Next comes the Gradio callback that kicks off a super-step

In [None]:

def make_thread_id() -> str:
    """Return a new random UUID (version 4) as a string.

    Used as the thread id that keeps each conversation's checkpoints separate.
    """
    fresh_id = uuid.uuid4()
    return str(fresh_id)

def format_flow(messages):
    """Render a sequence of messages as a sectioned plain-text flow.

    Accepts both message objects and ``{"role": ..., "content": ...}`` dicts:

    * human messages / dicts with role "user"     -> "Human Message" section
    * messages with non-empty ``tool_calls``       -> "Ai Message (Tool Calls)"
      section listing each call's name and args
    * AI messages / dicts with role "assistant"   -> "Ai Message" section
    * anything else                                -> its ``str()`` on one line

    Args:
        messages: Iterable of message objects and/or dicts.

    Returns:
        The concatenated text; an empty string for no messages.
    """
    human_header = "======== Human Message ========\n"
    ai_header = "======== Ai Message ========\n"

    parts = []
    for m in messages:
        if isinstance(m, dict):
            # Dict-style messages are what the Gradio callback builds for the UI.
            role = m.get("role")
            if role == "user":
                parts.append(f"{human_header}{m.get('content', '')}\n")
            elif role == "assistant":
                parts.append(f"{ai_header}{m.get('content', '')}\n")
            else:
                parts.append(f"{m}\n")
        elif isinstance(m, HumanMessage):
            # f-strings (rather than `+`) so non-string content cannot raise TypeError.
            parts.append(f"{human_header}{m.content}\n")
        elif getattr(m, "tool_calls", None):
            parts.append("======== Ai Message (Tool Calls) ========\n")
            for call in m.tool_calls:
                parts.append(f"Tool: {call['name']}\nArgs: {call['args']}\n")
        elif isinstance(m, AIMessage):
            parts.append(f"{ai_header}{m.content}\n")
        else:
            parts.append(f"{m}\n")
    return "".join(parts)


def format_for_gradio(messages):
    """Convert mixed message representations into ``[role, content]`` rows.

    Handles, in this order:

    * dicts with "role" and "content"      -> ``[role, content]``; assistant
      dicts whose content starts with the evaluator-feedback prefix are dropped
    * two-item lists/tuples                -> passed through unchanged, so rows
      produced by an earlier call survive being fed back in as chat history
    * objects with a ``content`` attribute -> ``[role, content]``, where role is
      the object's ``role`` attribute or its lower-cased class name
    * anything else                        -> ``["assistant", str(item)]``

    Args:
        messages: Iterable of dicts, rows, message objects or other values.

    Returns:
        A list of two-item lists.
    """
    # NOTE(review): rows are [role, content] pairs; a tuple-style gr.Chatbot shows each
    # row as [user text, bot text] — confirm this is the display that is wanted.
    feedback_prefix = "Evaluator Feedback on this answer:"

    rows = []
    for m in messages:
        if isinstance(m, dict):
            content = m.get("content")
            is_evaluator_feedback = (
                m.get("role") == "assistant"
                and isinstance(content, str)
                and content.startswith(feedback_prefix)
            )
            if is_evaluator_feedback:
                continue  # internal feedback is not shown in the chat window
            if "role" in m and "content" in m:
                rows.append([m["role"], m["content"]])
            else:
                rows.append(["assistant", str(m)])
        elif isinstance(m, (list, tuple)) and len(m) == 2:
            # Already a formatted row (e.g. history from a previous turn); keep it as is
            # instead of turning it into the string form of a list.
            rows.append(list(m))
        elif hasattr(m, "content"):
            role = getattr(m, "role", m.__class__.__name__.lower())
            rows.append([role, m.content])
        else:
            rows.append(["assistant", str(m)])
    return rows

async def process_message(message, success_criteria, history, thread):
    """Run one user turn through the graph and build the UI updates.

    Args:
        message: The text the user submitted.
        success_criteria: The criteria a successful answer must satisfy.
        history: Chat rows currently shown in the chatbot; may be empty or None.
        thread: Thread id identifying this conversation for the checkpointer.

    Returns:
        A pair ``(chat_rows, flow_text)``: the rows for the chatbot component
        and the text for the conversation-flow panel.
    """
    config = {"configurable": {"thread_id": thread}}

    # NOTE(review): `messages` is given the raw text; presumably the `add_messages`
    # reducer turns it into a human message — confirm.
    state = {
        "messages": message,
        "success_criteria": success_criteria,
        "feedback_on_work": None,
        "success_criteria_met": False,
        "user_input_needed": False
    }
    result = await graph.ainvoke(state, config=config)

    # The graph always finishes on the evaluator, so the last message is its
    # feedback and the one before it is the assistant's reply.
    user = {"role": "user", "content": message}
    reply = {"role": "assistant", "content": result["messages"][-2].content}
    feedback = {"role": "assistant", "content": result["messages"][-1].content}

    # The chatbot can hand back no history at all (e.g. right after a reset).
    transcript = (history or []) + [user, reply, feedback]

    # The flow panel is rendered from the graph's own message list so that tool
    # calls, which never appear in the chat transcript, show up as well.
    return format_for_gradio(transcript), format_flow(result["messages"])

async def reset():
    """Clear the UI and start a new conversation thread.

    Returns:
        One value per output wired to the reset button, in order: an empty
        message, empty success criteria, no chat history, a new thread id, and
        an empty conversation-flow panel. The handler is wired to five outputs,
        so five values must be returned.
    """
    return "", "", None, make_thread_id(), ""


### And now launch our UI

In [None]:

with gr.Blocks(theme=gr.themes.Default(primary_hue="emerald")) as demo:
    gr.Markdown("## Personal Co-worker")
    # Per-session thread id, so each browser session gets its own checkpointed conversation.
    thread = gr.State(make_thread_id())

    with gr.Row():
        chatbot = gr.Chatbot(label="How Can I Help You today ? ", height=300)
    with gr.Group():
        with gr.Row():
            message = gr.Textbox(show_label=False, placeholder="Your request to your sidekick")
        with gr.Row():
            success_criteria = gr.Textbox(show_label=False, placeholder="What are your success criteria?")
    with gr.Row():
        reset_button = gr.Button("Reset", variant="stop")
        go_button = gr.Button("Go!", variant="primary")
    with gr.Row():
        flow_window = gr.Code(label="Conversation Flow", language="python", interactive=False, lines=20)

    # Submitting either textbox or pressing "Go!" runs one turn; all three share the same wiring.
    run_inputs = [message, success_criteria, chatbot, thread]
    run_outputs = [chatbot, flow_window]
    message.submit(process_message, run_inputs, run_outputs)
    success_criteria.submit(process_message, run_inputs, run_outputs)
    go_button.click(process_message, run_inputs, run_outputs)

    # Reset clears every component and swaps in a new thread id.
    reset_button.click(reset, [], [message, success_criteria, chatbot, thread, flow_window])

demo.launch()

In [None]:
# Same loop as the earlier printing cell: pretty-prints every message held in `result`
# (the user's input, the assistant's replies, and any tool calls made).
for m in result["messages"]:
    m.pretty_print()

In [None]:
# Print the installed Gradio version.
import gradio as gr
print(gr.__version__)