# Lesson 4: Persistence and Streaming

In [1]:
from dotenv import load_dotenv

# Load variables from a local .env file into the process environment (the Groq
# and Tavily clients created below presumably read their API keys from there).
# Binding the return value to `_` keeps the cell from echoing it as output.
_ = load_dotenv()

In [2]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_groq import ChatGroq
from langchain_community.tools.tavily_search import TavilySearchResults

In [3]:
# Web-search tool offered to the model; it shows up as `tavily_search_results_json`
# in the tool calls printed further down. max_results=2 caps the hits per query.
tool = TavilySearchResults(max_results=2)

In [4]:
class AgentState(TypedDict):
    """Graph state shared by every node of the agent graph.

    ``messages`` carries ``operator.add`` as its reducer annotation, so the list a
    node returns under ``messages`` is appended to the accumulated history rather
    than replacing it (nodes below return only their new messages).
    """
    messages: Annotated[list[AnyMessage], operator.add]

In [6]:
!pip install langgraph-checkpoint-sqlite

Collecting langgraph-checkpoint-sqlite
  Downloading langgraph_checkpoint_sqlite-2.0.3-py3-none-any.whl.metadata (3.0 kB)
Collecting aiosqlite<0.21.0,>=0.20.0 (from langgraph-checkpoint-sqlite)
  Downloading aiosqlite-0.20.0-py3-none-any.whl.metadata (4.3 kB)
Downloading langgraph_checkpoint_sqlite-2.0.3-py3-none-any.whl (12 kB)
Downloading aiosqlite-0.20.0-py3-none-any.whl (15 kB)
Installing collected packages: aiosqlite, langgraph-checkpoint-sqlite
Successfully installed aiosqlite-0.20.0 langgraph-checkpoint-sqlite-2.0.3


In [7]:
import sqlite3

from langgraph.checkpoint.sqlite import SqliteSaver

# In langgraph-checkpoint-sqlite 2.x `SqliteSaver.from_conn_string` is a context
# manager (later cells use it with `with`), so assigning its return value yields a
# context-manager object, not a checkpointer. Build the saver from an explicit
# connection instead. check_same_thread=False follows the LangGraph docs: the
# graph may touch the saver from a thread other than the one that opened it.
memory = SqliteSaver(sqlite3.connect(":memory:", check_same_thread=False))

In [19]:
class Agent:
    """Tool-calling agent built as a two-node LangGraph loop.

    The "llm" node calls the model; if its reply requests tool calls, the
    "action" node runs them and control returns to "llm". The loop ends when
    the model replies without tool calls.

    Args:
        model: chat model exposing ``bind_tools`` (``ChatGroq`` in this notebook).
        tools: tools the model may call, looked up by their ``name``.
        checkpointer: LangGraph checkpoint saver that persists state per
            ``thread_id`` (e.g. a ``SqliteSaver``).
        system: optional system prompt prepended to every model call.
    """

    def __init__(self, model, tools, checkpointer, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        # Compile with the caller-supplied saver so each Agent persists to the
        # checkpointer it was given, independent of any global variable.
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        """Invoke the bound chat model on the conversation so far.

        Works with any model passed to ``__init__``, not only OpenAI. The system
        prompt, when set, is prepended for this call only and is not stored in
        the state. Returns the reply wrapped for the ``messages`` reducer.
        """
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        """Routing predicate: True when the latest message requests a tool call."""
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        """Run every tool call in the latest message and return one ToolMessage each.

        An unknown tool name yields an error ToolMessage instead of raising
        KeyError. Every tool_call_id still gets a response, and the model can
        correct itself on the next turn.
        """
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            tool_fn = self.tools.get(t['name'])
            if tool_fn is None:
                result = f"Unknown tool name '{t['name']}'; choose one of {sorted(self.tools)} and retry."
            else:
                result = tool_fn.invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

In [27]:
prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""
model = ChatGroq(model="llama3-70b-8192")


def stream_turn(graph, question, thread_id):
    """Send one user message to `graph` on `thread_id` and print each node's new messages.

    The checkpointer compiled into `graph` stores state per ``thread_id``: calls
    that reuse an id continue the same conversation, and a new id starts empty.
    """
    thread = {"configurable": {"thread_id": thread_id}}
    for event in graph.stream({"messages": [HumanMessage(content=question)]}, thread):
        for update in event.values():
            print(update["messages"])


with SqliteSaver.from_conn_string(":memory:") as memory:
    abot = Agent(model, [tool], system=prompt, checkpointer=memory)

    stream_turn(abot.graph, "What is the weather in sf?", "1")
    # Same thread: these follow-ups can only be answered from the saved history.
    stream_turn(abot.graph, "What about in la?", "1")
    stream_turn(abot.graph, "Which one is warmer?", "1")
    # New thread: no saved history, so "which one" has no earlier cities to compare.
    stream_turn(abot.graph, "Which one is warmer?", "2")

[AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_r287', 'function': {'arguments': '{"query":"current weather in san francisco"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 52, 'prompt_tokens': 1014, 'total_tokens': 1066, 'completion_time': 0.154603588, 'prompt_time': 0.085667677, 'queue_time': 0.069323831, 'total_time': 0.240271265}, 'model_name': 'llama3-70b-8192', 'system_fingerprint': 'fp_2f30b0b571', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-70d15780-515e-4f82-b3a0-233b094d2e09-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in san francisco'}, 'id': 'call_r287', 'type': 'tool_call'}], usage_metadata={'input_tokens': 1014, 'output_tokens': 52, 'total_tokens': 1066})]
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in san francisco'}, 'id': 'call_r287', 'type': 'tool_call'}
Back to the model!
[To

In [31]:
# Each ":memory:" SQLite connection opens a brand-new, empty database, so thread
# "1" has no saved history here even though the earlier cell used the same id.
with SqliteSaver.from_conn_string(":memory:") as memory:
    abot = Agent(model, [tool], system=prompt, checkpointer=memory)

    thread = {"configurable": {"thread_id": "1"}}
    messages = [HumanMessage(content="what was my last question?")]
    for event in abot.graph.stream({"messages": messages}, thread):
        for node_output in event.values():
            print(node_output["messages"])

[AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_a0xn', 'function': {'arguments': '{"query":"what was my last question"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 51, 'prompt_tokens': 1013, 'total_tokens': 1064, 'completion_time': 0.152530951, 'prompt_time': 0.075628695, 'queue_time': 0.23889469100000002, 'total_time': 0.228159646}, 'model_name': 'llama3-70b-8192', 'system_fingerprint': 'fp_2f30b0b571', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-647c878f-3fd4-4701-bcc5-d015ee9bd13a-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'what was my last question'}, 'id': 'call_a0xn', 'type': 'tool_call'}], usage_metadata={'input_tokens': 1013, 'output_tokens': 51, 'total_tokens': 1064})]
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'what was my last question'}, 'id': 'call_a0xn', 'type': 'tool_call'}
Back to the model!
[ToolMessage(con

## Streaming tokens

In [28]:
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver

memory = AsyncSqliteSaver.from_conn_string(":memory:")
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

ModuleNotFoundError: No module named 'langgraph.checkpoint.aiosqlite'

In [None]:
messages = [HumanMessage(content="What is the weather in SF?")]
thread = {"configurable": {"thread_id": "4"}}
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")