# Lesson 4: Persistence and Streaming

In [2]:
from dotenv import load_dotenv

# Read variables (e.g. API keys) from a local .env file into the process
# environment. The returned bool is bound to a name only so the notebook
# does not echo it as cell output.
env_file_loaded = load_dotenv()

In [3]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

In [4]:
# Web-search tool handed to the agent; each search is capped at this many results.
MAX_SEARCH_RESULTS = 2
tool = TavilySearchResults(max_results=MAX_SEARCH_RESULTS)

In [5]:
class AgentState(TypedDict):
    """Graph state shared by every node of the agent."""

    # Conversation history. The `operator.add` metadata in `Annotated` is the
    # reducer LangGraph applies to node outputs: a node returning
    # {'messages': [...]} has that list appended to the existing history
    # instead of replacing it.
    messages: Annotated[list[AnyMessage], operator.add]

In [6]:
import sqlite3

from langgraph.checkpoint.sqlite import SqliteSaver

# Build the checkpointer from an explicit connection rather than
# `SqliteSaver.from_conn_string(...)`: newer LangGraph releases turned that
# helper into a context manager, which `graph.compile(checkpointer=...)`
# cannot use, whereas passing `conn=` to the constructor works across releases.
# check_same_thread=False lets the graph use the connection from threads other
# than the one that opened it. ":memory:" keeps checkpoints only for the
# lifetime of this kernel.
memory = SqliteSaver(conn=sqlite3.connect(":memory:", check_same_thread=False))

In [7]:
class Agent:
    """Tool-using agent built as a two-node LangGraph loop.

    The "llm" node calls the chat model; if its reply requests tool calls, the
    "action" node runs them and control returns to "llm". Otherwise the graph
    ends. State is persisted through the given checkpointer.

    Args:
        model: chat model exposing ``bind_tools`` (e.g. ``ChatOpenAI``).
        tools: tools exposing ``.name`` and ``.invoke(args)``.
        checkpointer: LangGraph checkpointer passed to ``graph.compile``.
        system: optional system prompt prepended to every model call.
    """

    def __init__(self, model, tools, checkpointer, system=""):
        # Materialize once: `tools` is consumed twice below (lookup table and
        # bind_tools), which would silently break for a one-shot iterable.
        tools = list(tools)
        self.system = system
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        # Route to "action" when the last AI message requested tools, else stop.
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)

    def call_openai(self, state: AgentState):
        """Invoke the tool-bound model on the history (plus system prompt, if any)."""
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        """Return True if the most recent message requests at least one tool call."""
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        """Run every tool call from the last AI message and return their results.

        Calls execute sequentially in one step (no LLM turn in between). Each
        call yields exactly one ToolMessage so every tool_call_id is answered.
        """
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            selected_tool = self.tools.get(t['name'])
            if selected_tool is None:
                # The model asked for a tool that is not registered: report it
                # back as the tool output so the LLM can retry, instead of
                # raising KeyError and aborting the whole graph run.
                print("\n ....bad tool name....")
                result = "bad tool name, retry"
            else:
                result = selected_tool.invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

In [8]:
# System prompt for the research agent. Adjacent string literals are joined
# by the parser; the only newline in the text is the trailing one.
prompt = (
    "You are a smart research assistant. Use the search engine to look up information. "
    "You are allowed to make multiple calls (either together or in sequence). "
    "Only look up information when you are sure of what you want. "
    "If you need to look up some information before asking a follow up question, "
    "you are allowed to do that!\n"
)
MODEL_NAME = "gpt-4o"
model = ChatOpenAI(model=MODEL_NAME)
# Compile the agent with the SQLite checkpointer so each thread_id keeps its history.
abot = Agent(model, [tool], checkpointer=memory, system=prompt)

In [12]:
first_question = "What is the weather in hyd? what should I wear today"
messages = [HumanMessage(content=first_question)]

In [13]:
# Run config: the checkpointer stores and restores state per thread_id.
thread = dict(configurable=dict(thread_id="1"))

In [14]:
# Each streamed item maps the node that just ran to the state update it
# returned; show only the new messages from each update.
for step_update in abot.graph.stream({"messages": messages}, thread):
    for _node, node_update in step_update.items():
        print(node_update['messages'])

[AIMessage(content="The current weather in Hyderabad is misty with a temperature of 18.4°C (65.1°F). The temperature throughout the day is expected to range between 17.46°C and 27.68°C. \n\nGiven these conditions, here's what you might consider wearing today:\n\n1. **Morning:** Since it's cooler in the morning with mist, a light jacket or sweater would be comfortable.\n\n2. **Afternoon:** As the temperature rises, you can switch to lighter clothing, such as a t-shirt or a light long-sleeve shirt. Comfortable jeans or trousers would be appropriate.\n\n3. **Evening:** If you're out in the evening, having a light layer handy might be useful as temperatures drop slightly.\n\nAdditionally, since the UV index is low, you don't have to worry much about sun exposure, but it's always good to have sunglasses if you're sensitive to light.", response_metadata={'token_usage': {'completion_tokens': 180, 'prompt_tokens': 825, 'total_tokens': 1005, 'prompt_tokens_details': {'cached_tokens': 0, 'audio_

In [15]:
# Same thread_id as before: the checkpointer restores the earlier exchange,
# so "What about in goa?" is answered in the context of the weather question.
thread = {"configurable": {"thread_id": "1"}}
messages = [HumanMessage(content="What about in goa?")]
for step_update in abot.graph.stream({"messages": messages}, thread):
    for _node, node_update in step_update.items():
        print(node_update)

{'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_buXLdi4NDGMXGsc8otdsxkwM', 'function': {'arguments': '{"query":"Goa weather today"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 1018, 'total_tokens': 1040, 'prompt_tokens_details': {'cached_tokens': 0, 'audio_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 0, 'audio_tokens': 0, 'accepted_prediction_tokens': 0, 'rejected_prediction_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_d28bcae782', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-5b14d44d-b209-4b51-a504-54800a476b73-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'Goa weather today'}, 'id': 'call_buXLdi4NDGMXGsc8otdsxkwM'}])]}
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'Goa weather today'}, 'id': 'call_buXLdi4NDGMXGsc8otdsxkwM'}
Back to the model!
{'messages': [To

In [16]:
# Still thread "1": the model can compare the two cities it already looked up.
thread = {"configurable": {"thread_id": "1"}}
messages = [HumanMessage(content="Which one is warmer?")]
for step_update in abot.graph.stream({"messages": messages}, thread):
    for _node, node_update in step_update.items():
        print(node_update)

{'messages': [AIMessage(content="Comparing the current temperatures, Goa is warmer than Hyderabad. Goa's temperature is 23.4°C (74.2°F) with sunny conditions, while Hyderabad has a misty 18.4°C (65.1°F).", response_metadata={'token_usage': {'completion_tokens': 50, 'prompt_tokens': 1730, 'total_tokens': 1780, 'prompt_tokens_details': {'cached_tokens': 1664, 'audio_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 0, 'audio_tokens': 0, 'accepted_prediction_tokens': 0, 'rejected_prediction_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_d28bcae782', 'finish_reason': 'stop', 'logprobs': None}, id='run-422043be-1091-4f76-9535-ca214d3e72ca-0')]}


In [17]:
# Same question on a fresh thread "2": no prior history is restored, so the
# model has nothing to compare and asks for clarification.
thread = {"configurable": {"thread_id": "2"}}
messages = [HumanMessage(content="Which one is warmer?")]
for step_update in abot.graph.stream({"messages": messages}, thread):
    for _node, node_update in step_update.items():
        print(node_update)

{'messages': [AIMessage(content='Could you please specify the two options or locations you are comparing in terms of warmth?', response_metadata={'token_usage': {'completion_tokens': 18, 'prompt_tokens': 149, 'total_tokens': 167, 'prompt_tokens_details': {'cached_tokens': 0, 'audio_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 0, 'audio_tokens': 0, 'accepted_prediction_tokens': 0, 'rejected_prediction_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_831e067d82', 'finish_reason': 'stop', 'logprobs': None}, id='run-df57605a-8e17-4231-a2f7-d31796d3278e-0')]}


## Streaming tokens

In [18]:
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver

# The token-streaming demo below drives the graph through the async API, so
# rebuild the agent on the async SQLite checkpointer. This is a new in-memory
# store, so threads from the sync section are not carried over.
# NOTE(review): newer LangGraph releases move this class to
# `langgraph.checkpoint.sqlite.aio` and make `from_conn_string` an async
# context manager — confirm against the installed version.
memory = AsyncSqliteSaver.from_conn_string(":memory:")
abot = Agent(model, [tool], checkpointer=memory, system=prompt)

In [21]:
messages = [HumanMessage(content="What is the weather in SF?")]
thread = {"configurable": {"thread_id": "4"}}
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="  ")

Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_GSH3cPbktcADlPzJpuDpTCPy'}
Back to the model!
The   current   weather   in   San   Francisco   is   partly   cloudy   with   a   temperature   of     61  °F   (  16  .  1  °C  ).   The   wind   is   blowing   from   the   north   at     9  .  2   mph   (  14  .  8   k  ph  ),   and   the   humidity   is   at     50  %.   The   visibility   is     16   km   (  9   miles  ).  