For some actions, we may need to require human approval before running them, to ensure that everything is running as intended.

In [None]:
from typing import Annotated
import operator, json
from typing import TypedDict, Sequence
from typing_extensions import TypedDict
from langchain_core.messages import BaseMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END, START
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.tools import tool
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_groq import ChatGroq
from dotenv import load_dotenv
from IPython.display import Image, display
from langchain_core.messages import AIMessage, ToolMessage

In [None]:
load_dotenv()

In [None]:
llm = ChatGroq(model_name="Gemma2-9b-It")

In [None]:
llm.invoke("hi").content

In [None]:
@tool
def multiply(first_number: int, second_number: int) -> int:
    """Multiply two integer numbers and return their product.

    Args:
        first_number: The first factor.
        second_number: The second factor.

    Returns:
        The product of ``first_number`` and ``second_number``.
    """
    # The parameter is spelled ``second_number`` so that it matches the key
    # the callers in this notebook pass in their argument dictionaries.
    return first_number * second_number

In [None]:
# NOTE(review): calling the tool object directly goes through BaseTool.__call__,
# which recent langchain-core releases mark as deprecated in favour of
# .invoke() — confirm against the installed version.
multiply({"first_number": 20, "second_number": 10})

In [None]:
multiply.invoke({"first_number": 20, "second_number": 10})

In [None]:
# Expensive tool: every call runs a Tavily web search.
@tool
def web_search(query: str):
    """ Perform the web search on the user query """
    # The docstring above is left untouched on purpose: it is presumably what
    # the @tool decorator exposes to the model as the tool description.
    return TavilySearchResults().invoke(query)

In [None]:
# NOTE(review): calling the tool object directly goes through BaseTool.__call__,
# which recent langchain-core releases mark as deprecated in favour of
# .invoke() — confirm against the installed version.
web_search("Who is the current president of USA?")

In [None]:
web_search.invoke("Who is the current president of USA?")

In [None]:
tools_kit_1 = [web_search, multiply]

In [None]:
model_with_tools = llm.bind_tools(tools_kit_1)

In [None]:
tool_mapping = {tool.name : tool for tool in tools_kit_1}

In [None]:
tool_mapping

In [None]:
response = model_with_tools.invoke("Who is the current president of USA?")

In [None]:
tool_details = response.additional_kwargs.get("tool_calls")

In [None]:
tool_details

In [None]:
tool_details[0]["function"]["name"]

In [None]:
tool_details[0]["function"]["arguments"]

In [None]:
# Manually dispatch the first tool call the model asked for: look the tool up
# by name, decode its JSON-encoded arguments, and run it.
first_call = tool_details[0]["function"]
selected_tool = tool_mapping[first_call["name"]]
selected_tool.invoke(json.loads(first_call["arguments"]))


In [None]:
class AgentState(TypedDict):
    """Graph state holding the running sequence of conversation messages."""
    # operator.add is attached as annotation metadata; presumably LangGraph uses
    # it as the reducer that concatenates each node's returned "messages" onto
    # the existing sequence instead of replacing it — confirm against the docs.
    messages : Annotated[Sequence[BaseMessage], operator.add]

In [None]:
def invoke_model(state: AgentState):
    """Send the most recent message to the tool-enabled model.

    Returns a state update whose ``messages`` list contains only the
    model's reply.
    """
    latest = state["messages"][-1]  # the newest entry is the user question
    reply = model_with_tools.invoke(latest)
    return {"messages": [reply]}

In [None]:
def invoke_tool(state:AgentState):
    """Run the tool requested by the last message, with human approval for web search.

    The first entry of the last message's ``additional_kwargs["tool_calls"]``
    is looked up in ``tool_mapping`` and invoked with its JSON-decoded
    arguments. Before the expensive ``web_search`` tool runs, the user is
    asked on stdin whether to continue.

    Args:
        state: Graph state; ``state["messages"][-1]`` must expose
            ``additional_kwargs``.

    Returns:
        A state update whose ``messages`` list contains the tool's output.

    Raises:
        Exception: If the last message carries no tool call, or if the user
            answers "n" to the web-search confirmation.
    """
    # ``or []`` also covers a key that is present but set to None; checking for
    # emptiness before indexing avoids an IndexError on an empty list.
    tool_calls = state["messages"][-1].additional_kwargs.get("tool_calls") or []
    if not tool_calls:
        raise Exception("no tool call found")

    function_call = tool_calls[0]["function"]
    tool_name = function_call["name"]
    print(f"Selected tool: {tool_name}")

    # The gate must use the registered tool name ("web_search"), otherwise the
    # approval prompt is never shown.
    if tool_name == "web_search":
        # The builtin input() only accepts its prompt positionally.
        answer = input("[y/n] continue with expensive web search?")
        if answer.strip().lower() == "n":
            raise Exception("web search was discarded")

    # The model sends the arguments as a JSON string under "arguments".
    tool_args = json.loads(function_call["arguments"])
    tool_output = tool_mapping[tool_name].invoke(tool_args)
    return {"messages": [tool_output]}
    

In [None]:
def router(state):
    """Choose the next graph step from the last message.

    Returns "tool" when the last message lists at least one tool call in its
    ``additional_kwargs``, otherwise "end".
    """
    last_message = state["messages"][-1]
    pending_calls = last_message.additional_kwargs.get("tool_calls", [])
    return "tool" if len(pending_calls) > 0 else "end"

In [None]:
# Graph: ai_assistant -> (tool | END), tool -> END

graph = StateGraph(AgentState) # StateGraph with AgentState

# The node name must match the name used by the conditional edge and the
# entry point below ("ai_assistant").
graph.add_node("ai_assistant", invoke_model)
graph.add_node("tool", invoke_tool)

# router returns "tool" or "end"; map those labels onto the next node.
graph.add_conditional_edges("ai_assistant", router, {"tool":"tool", "end":END})

graph.add_edge("tool", END)

#graph.add_edge("tool", "ai_assistant")

graph.set_entry_point("ai_assistant")

app_1 = graph.compile()

In [None]:
display(Image(app_1.get_graph().draw_mermaid_png()))

In [None]:
# Web-search question: we expect to be asked for approval before the search runs.
for step in app_1.stream({"messages": ["Who is the current president of USA?"]}):
    step_outputs = list(step.values())
    print(step_outputs[0])
    print("-----")

In [None]:
# Multiplication question: no approval prompt is expected for this tool.
for step in app_1.stream({"messages": ["what is the result of multiplication of 15 and 62?"]}):
    step_outputs = list(step.values())
    print(step_outputs[0])
    print("-----")

LangGraph supports human-in-the-loop workflows in a number of ways. In this section, I will use LangGraph's `interrupt_before` functionality to always pause before the tool node runs.

In [None]:
class AgentState1(TypedDict):
    """Graph state for the interrupt-based workflow: a single list of messages."""
    # add_messages (imported from langgraph.graph.message) is attached as
    # annotation metadata; presumably it is the reducer that merges newly
    # returned messages into the list rather than overwriting it — confirm
    # against the LangGraph docs.
    messages : Annotated[list, add_messages]

In [None]:
tavily = TavilySearchResults()

In [None]:
tools = [tavily]

In [None]:
llm_with_tool = llm.bind_tools(tools)

In [None]:
def ai_assistant(state: AgentState1):
    """Call the tool-bound model on the whole message history.

    Returns a state update whose ``messages`` list contains only the
    model's reply.
    """
    reply = llm_with_tool.invoke(state["messages"])
    return {"messages": [reply]}

In [None]:
memory = MemorySaver()

In [None]:
# Graph: START -> ai_assistant -> (tools | END), tools -> ai_assistant
graph_builder = StateGraph(AgentState1)

graph_builder.add_node("ai_assistant", ai_assistant)
tool_node = ToolNode(tools=tools)
graph_builder.add_node("tools", tool_node)

graph_builder.add_edge(START, "ai_assistant")
graph_builder.add_conditional_edges(
    "ai_assistant",
    tools_condition
)
# After the tools run, hand control back to the assistant node; the target
# must be the registered node name "ai_assistant".
graph_builder.add_edge("tools", "ai_assistant")

app_2 = graph_builder.compile(
    checkpointer=memory,

    # This is new: pause the run every time the "tools" node is about to execute.
    interrupt_before=["tools"],
    # Note: can also interrupt after the tools node, if desired.
    # interrupt_after=["tools"]
)

In [None]:
display(Image(app_2.get_graph().draw_mermaid_png()))

In [None]:
user_input = "What is the capital of USA?"
config = {"configurable": {"thread_id": "1"}}

In [None]:
# The config is the **second positional argument** to stream() or invoke()

events = app_2.stream(
    {"messages": [("user", user_input)]},
    config=config,
    stream_mode="values"
)

In [None]:
# Pretty-print the newest message of every streamed state that has messages.
for event in events:
    if "messages" not in event:
        continue
    event["messages"][-1].pretty_print()

In [None]:
snapshot = app_2.get_state(config=config)

In [None]:
snapshot.next

In [None]:
last_message = snapshot.values["messages"][-1]

In [None]:
last_message.tool_calls

In [None]:
# 'None' will append nothing new to the current state, letting it resume as if it had never been interrupted
events = app_2.stream(None, config=config, stream_mode="values")

In [None]:
# Pretty-print the newest message of every streamed state that has messages.
for event in events:
    if "messages" not in event:
        continue
    event["messages"][-1].pretty_print()

In [None]:
user_input = "What is the weather there?"
config = {"configurable": {"thread_id": "1"}}

In [None]:
events = app_2.stream(
    {"messages": [("user", user_input)]},
    config=config,
    stream_mode="values"
)

In [None]:
# Pretty-print the newest message of every streamed state that has messages.
for event in events:
    if "messages" not in event:
        continue
    event["messages"][-1].pretty_print()

In [None]:
snapshot_1 = app_2.get_state(config=config)

In [None]:
snapshot_1.next

In [None]:
last_message_1 = snapshot_1.values["messages"][-1]

In [None]:
last_message_1.tool_calls

In [None]:
# 'None' will append nothing new to the current state, letting it resume as if it had never been interrupted
events = app_2.stream(None, config=config, stream_mode="values")

In [None]:
# Pretty-print the newest message of every streamed state that has messages.
for event in events:
    if "messages" not in event:
        continue
    event["messages"][-1].pretty_print()

In [None]:
snapshot_2 = app_2.get_state(config=config)

In [None]:
snapshot_2.next

In [None]:
user_input = "give me the recent news of it?"
config = {"configurable": {"thread_id": "1"}}

In [None]:
events = app_2.stream(
    {"messages": [("user", user_input)]},
    config=config,
    stream_mode="values"
)

In [None]:
# Pretty-print the newest message of every streamed state that has messages.
for event in events:
    if "messages" not in event:
        continue
    event["messages"][-1].pretty_print()

In [None]:
snapshot_3 = app_2.get_state(config=config)

In [None]:
current_message = snapshot_3.values["messages"][-1]

In [None]:
current_message.pretty_print()

In [None]:
tool_call_id = current_message.tool_calls[0]["id"]

In [None]:
tool_call_id

In [None]:
answer = "it is just related to raining which is happening on daily basis"

In [None]:
new_messages = [
    ToolMessage(content=answer, tool_call_id=tool_call_id),
    AIMessage(content=answer),
]

In [None]:
# Write the hand-made tool result and assistant answer into the checkpointed
# thread. Both arguments are passed positionally (config first, then the state
# values): a positional argument after a keyword argument is a SyntaxError.
app_2.update_state(
    config,
    {"messages": new_messages},
)

In [None]:
print(app_2.get_state(config=config).values["messages"][-1:])