In [1]:
%run 2.tool-calling.ipynb

In [2]:
llm.invoke("Tell me a joke").content

'A man walked into a library and asked the librarian, "Do you have any books on Pavlov\'s dogs and Schrödinger\'s cat?" The librarian replied, "It rings a bell, but I\'m not sure if it\'s here or not."'

# Create Simple Graph

In [3]:
from langgraph.graph import StateGraph, START, END

In [4]:
from langgraph.graph.message import add_messages
from typing import Annotated
from typing_extensions import TypedDict


class State(TypedDict):
    messages: Annotated[list, add_messages]

In [5]:
tool_list = [search]

In [6]:
from langchain_core.messages import ToolMessage


class BasicToolNode:
    """A node that runs the tools requested in the last AIMessage."""

    def __init__(self, tools: list) -> None:
        self.tools_by_name = {tool.name: tool for tool in tools}

    def __call__(self, inputs: dict):
        if messages := inputs.get("messages", []):
            message = messages[-1]
        else:
            raise ValueError("No message found in input")
        outputs = []
        for tool_call in message.tool_calls:
            tool_name = tool_call["name"]
            tool_args = tool_call["args"]
            tool_call_id = tool_call["id"]
            tool_result = self.tools_by_name[tool_name].invoke(tool_args)
            outputs.append(
                ToolMessage(
                    content=json.dumps(tool_result),
                    name=tool_name,
                    tool_call_id=tool_call_id,
                )
            )
        return {"messages": outputs}


def route_tools(
    state: State,
):
    """
    Use in the conditional_edge to route to the ToolNode if the last message
    has tool calls. Otherwise, route to the end.
    """
    if isinstance(state, list):
        ai_message = state[-1]
    elif messages := state.get("messages", []):
        ai_message = messages[-1]
    else:
        raise ValueError(f"No messages found in input state to tool_edge: {state}")
    if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:
        return "tools"
    return END


In [7]:
from langchain_core.prompts import ChatPromptTemplate

base_prompt_template = ChatPromptTemplate.from_template(
    """
    You are a helpful chatbot. Your task is to assist the user to the best of your ability while making use of available tools. Ensure that your 
    answers are restricted to the knowledge/tools available to you. If you are not aware of any matter simply inform them "I don't know
    {message_history}
    """
)

base_chain = base_prompt_template | llm_with_tools

In [21]:
from langchain_core.messages.ai import AIMessage

content_moderation_prompt_template = ChatPromptTemplate.from_template(
"""
You are a content moderator. Return True or Falsse based on whether the query is related to safety, safety regulations or not.
Query: {query}
Response:
"""
)

def parse(ai_message: AIMessage) -> str:
    """Parse the AI message."""
    response = ai_message.content.title()
    print(f"{response=}")
    output_to_bool = response == "True"
    print(f"{output_to_bool=}")
    return output_to_bool
    
moderation_chain = content_moderation_prompt_template | llm | parse

In [22]:
print(moderation_chain.invoke({"query": "What are some safety measures to take?"}))
# print(moderation_chain.invoke({"query": "Hi"}))

response='I Would Return False For This Query As It Does Not Relate To Safety, Safety Regulations, Or Any Specific Topic That Requires Attention To Safety Protocols.\n\nSafety-Related Queries Typically Include Topics Such As:\n\n* Safety Guidelines And Regulations (E.G., Osha, Iso)\n* Emergency Procedures\n* Hazardous Materials Handling\n* Workplace Safety\n\nNon-Safety Related Queries Might Include:\n\n* General Information About A Company Or Organization\n* A Question About A Specific Product Or Service\n* A Social Media Post Or Update'
output_to_bool=False
False


In [None]:
from langchain_core.messages.modifier import RemoveMessage

def content_moderation_node(state: State):
    last_message = state["messages"][-1]
    print(f"{last_message=}") 
    message_accepted = moderation_chain.invoke({"query": last_message.content})
    print(f"{message_accepted=}") 
    if not message_accepted:
        moderation_response = AIMessage(content=f"User query: {last_message.content} does not fit our content policy. Warn the user")
        return {"messages": state["messages"] + [RemoveMessage(id=last_message.id)] + [moderation_response] }
    return {"messages": [last_message]}

In [14]:
graph_builder = StateGraph(State)

def chatbot(state: State):
    return {"messages": base_chain.invoke(
        {"message_history": 
             state["messages"]
        }
    )
   }

In [15]:
# Add nodes
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_node("content_moderation_node", content_moderation_node)
tool_node = BasicToolNode(tools=tool_list)
graph_builder.add_node("tools", tool_node)

# Add edges
graph_builder.add_edge(START, "content_moderation_node")  # Start at content_moderation_node
graph_builder.add_edge("content_moderation_node", "chatbot")  # From content_moderation_node to chatbot
# graph_builder.add_conditional_edges(
#     "chatbot",
#     route_tools,
#     {"tools": "tools", END: END},
# )
# graph_builder.add_edge("tools", "chatbot")

# Compile the graph
graph = graph_builder.compile()

### See it in action

In [16]:
from uuid import uuid4
from sys import exit
from langgraph.types import Command
import json


user_id = uuid4().hex

config = {"configurable": {"thread_id": user_id}}
# TODO: Other metadata?


try:
    user_input = input("[User]: ")

    for event in graph.stream(
        {"messages": [{"role": "user", "content": user_input}]}, config=config
    ):
        
        # Check for interrupt
        if "__interrupt__" in event:
            interrupt_obj = event["__interrupt__"][0]
            # Usually a list with one Interrupt
            prompt = interrupt_obj.value  # The payload sent to human
            user_query = prompt["query"]
            print(f"[System]: [User] asks {user_query}")

            # Get human input to resume
            human_response = input("[Support]: ")

            # Resume the graph with human input
            resume_events = graph.stream(Command(resume=human_response), config=config)
            for resume_event in resume_events:
                # Process resumed events (messages, tools, etc.)
                print(resume_event)
            break  # Exit current event loop to wait for next user input

        # Handle chatbot node (tool calls and messages)
        if "chatbot" in event:
            for message in event["chatbot"]["messages"]:
                # Check for tool calls
                if hasattr(message, "tool_calls") and message.tool_calls:
                    print("Tool usage:")
                    for tool_call in message.tool_calls:
                        tool_name = tool_call["name"]
                        tool_args = tool_call["args"]
                        tool_id = tool_call["id"]
                        print(f"{tool_id} | {tool_name} | {tool_args}")
                # Print chatbot message content if no tool calls
                else:
                    if isinstance(message, tuple):
                        # Assuming tuple is (role, content)
                        print(f"{message=}")
                        content = message[1]
                    else:
                        content = getattr(message, "content", None)
                    if content:
                        print("Message output:")
                        print(content)
        # Handle tools node (tool execution results)
        if "tools" in event:
            for message in event["tools"]["messages"]:
                print("Tool output:")
                tool_call_id = message.tool_call_id
                message_id = message.id
                tool_name = message.name
                content_preview = message.content[
                    :1000
                ]  # Limit content to 100 characters
                print(
                    f"{tool_call_id} | {message_id} | {tool_name} | {content_preview}"
                )
except KeyboardInterrupt:
    print("Exiting...")
    exit(0)


[User]:  What are some safety measures to take?


last_message=HumanMessage(content='What are some safety measures to take?', additional_kwargs={}, response_metadata={}, id='4bf267cf-a2a0-4a71-942b-b4d01a2d8289')
response='I Am Not Able To Provide Information On Safety Measures. Is There Something Else I Can Help You With?'
output_to_bool=False
message_accepted=False
message=('content', '')
message=('additional_kwargs', {'tool_calls': [{'id': 'call_wz0v0gsw', 'function': {'arguments': '{"query":"safety measures"}', 'name': 'duckduckgo_search'}, 'type': 'function', 'index': 0}], 'refusal': None})
Message output:
{'tool_calls': [{'id': 'call_wz0v0gsw', 'function': {'arguments': '{"query":"safety measures"}', 'name': 'duckduckgo_search'}, 'type': 'function', 'index': 0}], 'refusal': None}
message=('response_metadata', {'token_usage': {'completion_tokens': 36, 'prompt_tokens': 300, 'total_tokens': 336, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'llama3.2:1b', 'system_fingerprint': 'fp_ollama', 'id': '