# Persistence and Streaming

Persistence lets you save the state of an agent at a particular point in time. This lets you return to that state later and resume execution from there.
This is especially important for long-running applications.



In [2]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator 
from langchain_core.messages import AnyMessage, SystemMessage,HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

In [3]:
# Web-search tool exposed to the agent; max_results caps how many results a query returns.
tool = TavilySearchResults(
    max_results=2,
)

In [4]:
class AgentState(TypedDict):
    """Graph state shared by every node: the running list of conversation messages."""

    # operator.add is the reducer: each node's returned messages are appended
    # to the existing list rather than replacing it.
    messages: Annotated[list[AnyMessage], operator.add]

In [16]:
#!pip install langgraph-checkpoint-sqlite
from langgraph.checkpoint.sqlite import SqliteSaver # in order for this to work we have to install langgraph-checkpoint-sqlite
import sqlite3
#memory = SqliteSaver.from_conn_string(":memory:")

from langgraph.checkpoint.sqlite import SqliteSaver

# Checkpoints are written to a SQLite file on disk (rather than ":memory:"),
# so saved agent state outlives the kernel session.
db_path = "checkpoints.db"
# check_same_thread=False allows the connection to be used from threads other
# than the one that created it.
conn = sqlite3.connect(database=db_path, check_same_thread=False)
memory = SqliteSaver(conn=conn)

In [26]:
class Agent:
    """Minimal tool-using agent built as a two-node LangGraph loop.

    The "llm" node calls the model; if the reply requests tool calls, the
    "action" node runs them and hands the results back to "llm". Otherwise the
    graph ends.

    Args:
        model: chat model; the tools are bound to it via ``bind_tools``.
        tools: tools the model may call, looked up by their ``name``.
        checkpointer: LangGraph checkpointer passed to ``compile`` to persist state.
        system: optional system prompt prepended to every model call.
    """

    def __init__(self, model, tools, checkpointer, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        # After each model turn: run tools if any were requested, otherwise stop.
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        """Invoke the model on the conversation so far and return its reply as a state update."""
        messages = state['messages']
        if self.system:
            # Prepended per call rather than returned, so the system prompt is
            # never appended to the accumulated message history.
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        """Return True if the most recent message requested at least one tool call."""
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        """Run every tool call requested by the last message.

        Returns a state update with one ToolMessage per requested call. A call
        naming a tool this agent does not have (e.g. a hallucinated name) is
        answered with an error string instead of raising KeyError. That way the
        model can retry, and every call id still gets a response.
        """
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            if t['name'] not in self.tools:
                print("\n ....bad tool name....")
                result = "bad tool name, retry"
            else:
                result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

In [27]:
# System prompt for the research agent, built by implicit string concatenation
# (one sentence per line, single trailing newline).
prompt = (
    "You are a smart research assistant. Use the search engine to look up information. "
    "You are allowed to make multiple calls (either together or in sequence). "
    "Only look up information when you are sure of what you want. "
    "If you need to look up some information before asking a follow up question, you are allowed to do that!\n"
)
# gpt-4o agent with the search tool, checkpointing through the `memory` saver defined earlier.
model = ChatOpenAI(model="gpt-4o")
abot = Agent(
    model,
    [tool],
    checkpointer=memory,
    system=prompt,
)

In [34]:
# pip install langgraph-checkpoint-sqlite

from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

In [23]:
# Streaming comes in two flavours:
#   1. streaming individual messages as the graph produces them;
#   2. streaming tokens, i.e. every token of the model's output.

messages = []
messages.append(HumanMessage(content="What is the weather in Kigali?"))

# The thread_id keys the checkpointed state: separate ids give separate
# conversations, so several conversations (or several users of the
# application) can run at the same time.
thread = {"configurable": {"thread_id": "1"}}



In [29]:
# Three questions on the SAME thread ("1"): the checkpointer keeps the earlier
# turns, so "What about in Kampala?" and "Which one is warmer?" are answered in
# context. The saver/agent get their own names so this cell does not rebind the
# global `memory`/`abot` to a checkpointer whose lifetime ends with the `with`.
with SqliteSaver.from_conn_string(":memory:") as saver:
    agent = Agent(model, [tool], system=prompt, checkpointer=saver)
    thread = {"configurable": {"thread_id": "1"}}

    for question in (
        "What is the weather in Kigali?",
        "What about in Kampala?",
        "Which one is warmer?",
    ):
        messages = [HumanMessage(content=question)]
        for event in agent.graph.stream({"messages": messages}, thread):
            for update in event.values():
                # Node updates look like {"messages": [...]}: show just the new
                # messages, and show anything else as-is instead of silently dropping it.
                if isinstance(update, dict) and "messages" in update:
                    print(update["messages"])
                else:
                    print(update)

[AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_LVja7TnAU7nM4sZGU0iTxy48', 'function': {'arguments': '{"query":"current weather in Kigali"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 151, 'total_tokens': 173, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_50cad350e4', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-4bb42f40-de2c-4779-85fd-5b0d9c831c1d-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Kigali'}, 'id': 'call_LVja7TnAU7nM4sZGU0iTxy48', 'type': 'tool_call'}], usage_metadata={'input_tokens': 151, 'output_tokens': 22, 'total_tokens': 173, 'input_token_details': {'audio': 0,

### Losing memory
When the thread ID changes, the agent no longer has access to the conversation stored under the previous thread ID, so it gets confused by follow-up questions that depend on that earlier context.

In [32]:
# Same conversation as before, except the last question is sent on a NEW
# thread ("2"). That thread has no stored history, so the agent lacks the
# Kigali/Kampala context needed for "Which one is warmer?". The saver/agent get
# their own names so the global `memory`/`abot` are not rebound to a
# checkpointer whose lifetime ends with the `with`.
with SqliteSaver.from_conn_string(":memory:") as saver:
    agent = Agent(model, [tool], system=prompt, checkpointer=saver)

    for question, thread_id in (
        ("What is the weather in Kigali?", "1"),
        ("What about in Kampala?", "1"),
        ("Which one is warmer?", "2"),
    ):
        messages = [HumanMessage(content=question)]
        thread = {"configurable": {"thread_id": thread_id}}
        for event in agent.graph.stream({"messages": messages}, thread):
            for update in event.values():
                # Node updates look like {"messages": [...]}: show just the new
                # messages, and show anything else as-is instead of silently dropping it.
                if isinstance(update, dict) and "messages" in update:
                    print(update["messages"])
                else:
                    print(update)

[AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_KedJ2CSLIsGXZJiMhQlXBrSx', 'function': {'arguments': '{"query":"Kigali weather current"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 23, 'prompt_tokens': 151, 'total_tokens': 174, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_50cad350e4', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-587e924c-b84e-40a4-84fb-2e0fe3d18861-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'Kigali weather current'}, 'id': 'call_KedJ2CSLIsGXZJiMhQlXBrSx', 'type': 'tool_call'}], usage_metadata={'input_tokens': 151, 'output_tokens': 23, 'total_tokens': 174, 'input_token_details': {'audio': 0, 'cach

## Streaming tokens

In [38]:
import asyncio
from contextlib import AsyncExitStack
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

stack = AsyncExitStack()
memory = await stack.enter_async_context(AsyncSqliteSaver.from_conn_string(":memory:"))

abot = Agent(model, [tool], system=prompt, checkpointer=memory)

messages = [HumanMessage(content="What is the weather in SF?")]
thread = {"configurable": {"thread_id": "4"}}
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                # Empty content in the context of OpenAI means
                # that the model is asking for a tool to be invoked.
                # So we only print non-empty content
                print(content, end="|")

await stack.aclose()

Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'San Francisco current weather'}, 'id': 'call_7TJzX9fqfkV7aI8dXF0khrOO', 'type': 'tool_call'}
Back to the model!
The| current| weather| in| San| Francisco| is| sunny| with| a| temperature| of| |15|.|6|°C| (|60|.|1|°F|).| The| wind| is| blowing| from| the| northeast| at| |7|.|2| mph| (|11|.|5| k|ph|),| and| the| humidity| is| |33|%.| There| is| no| precipitation|,| and| the| visibility| is| |16| km| (|9| miles|).|