Persistence and Streaming

In [129]:
from dotenv import load_dotenv, find_dotenv

_ = load_dotenv(find_dotenv())

In [130]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

In [131]:
tool = TavilySearchResults(max_results=2)

In [132]:
class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]

In [133]:
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()

In [134]:
class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

In [137]:
prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""
model = ChatOpenAI(model="gpt-4o")

abot = Agent(model, [tool], system=prompt, checkpointer=memory)
messages = [HumanMessage(content="What is the weather in Ankara?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

[AIMessage(content="The current weather in Ankara is partly cloudy with a temperature of 4.3°C (39.7°F). There's a southeast wind blowing at 3.6 kph (2.2 mph), and the humidity is 100%.", additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 50, 'prompt_tokens': 793, 'total_tokens': 843, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_0f8c83e59b', 'finish_reason': 'stop', 'logprobs': None}, id='run-fd035d83-4a16-40bd-9b2d-30b6c9665fab-0', usage_metadata={'input_tokens': 793, 'output_tokens': 50, 'total_tokens': 843, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})]


In [138]:
messages = [HumanMessage(content="What about in Ankara?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

[AIMessage(content="It seems there might be some confusion, as you've repeated the question about the weather in Ankara. Could you please specify if there's anything else you're looking for regarding Ankara's weather or any other information about the city?", additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 44, 'prompt_tokens': 854, 'total_tokens': 898, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_0f8c83e59b', 'finish_reason': 'stop', 'logprobs': None}, id='run-b677a79e-f1f1-44f0-991a-a43320bf7cbd-0', usage_metadata={'input_tokens': 854, 'output_tokens': 44, 'total_tokens': 898, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})]


In [140]:
messages = [HumanMessage(content="What is the weather in Ankara?")]
thread = {"configurable": {"thread_id": "2"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

[AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_zh6GInUeKj1WMRf0RHbT5VyZ', 'function': {'arguments': '{"query":"Ankara weather forecast October 7 2023"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 28, 'prompt_tokens': 151, 'total_tokens': 179, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_0f8c83e59b', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-f739c39a-003e-434c-b082-889bcb0bda74-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'Ankara weather forecast October 7 2023'}, 'id': 'call_zh6GInUeKj1WMRf0RHbT5VyZ', 'type': 'tool_call'}], usage_metadata={'input_tokens': 151, 'output_tokens': 28, 'total_tokens': 179, 'input_to

In [141]:
messages = [HumanMessage(content="What about in Ankara?")]
thread = {"configurable": {"thread_id": "2"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

[AIMessage(content="Could you please clarify what specific information you are looking for about Ankara? Whether it's additional weather details or another aspect of the city you're interested in, let me know so I can assist you further.", additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 41, 'prompt_tokens': 722, 'total_tokens': 763, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_0f8c83e59b', 'finish_reason': 'stop', 'logprobs': None}, id='run-05acfe11-cfa7-4341-a120-4a14494dc0b8-0', usage_metadata={'input_tokens': 722, 'output_tokens': 41, 'total_tokens': 763, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})]


In [142]:
messages = [HumanMessage(content="Which questions did I ask you about Ankara? List them all.")]
thread = {"configurable": {"thread_id": "2"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

[AIMessage(content='Here are the questions you asked about Ankara:\n\n1. "What is the weather in Ankara?"\n2. "What about in Ankara?" (This was a bit unclear, so I asked for clarification.)\n\nIf you meant something specific with your second question, please let me know!', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 57, 'prompt_tokens': 782, 'total_tokens': 839, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_0f8c83e59b', 'finish_reason': 'stop', 'logprobs': None}, id='run-f95b9ef7-8b7f-4ef2-803c-6db2101ebf66-0', usage_metadata={'input_tokens': 782, 'output_tokens': 57, 'total_tokens': 839, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})]


## Streaming tokens

In [150]:
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
import asyncio

memory = AsyncSqliteSaver(":memory:")

async with AsyncSqliteSaver.from_conn_string(":memory:") as checkpointer:
    abot = Agent(model, [tool], system=prompt, checkpointer=memory)
    messages = [HumanMessage(content="What is the weather in Ankara?")]
    thread = {"configurable": {"thread_id": "4"}}
    async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
        kind = event.get("event", "")
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                # Empty content in the context of OpenAI means
                # that the model is asking for a tool to be invoked.
                # So we only print non-empty content
                print(content, end="|")


AttributeError: 'str' object has no attribute 'is_alive'