# Lab 4: Persistence and Streaming

## Environment Setup

We begin by establishing our agent environment. This process involves loading the necessary environment variables, importing required modules, initializing our Tavily search tool, defining the agent state, and finally, constructing our agent.


In [3]:
from dotenv import load_dotenv
import os
import sys
import json
import re
import pprint
import boto3
from botocore.client import Config
import warnings

warnings.filterwarnings("ignore")
import logging

# import local modules
dir_current = os.path.abspath("")
dir_parent = os.path.dirname(dir_current)
if dir_parent not in sys.path:
    sys.path.append(dir_parent)
from utils import utils

bedrock_config = Config(
    connect_timeout=120, read_timeout=120, retries={"max_attempts": 0}
)

# Set basic configs
logger = utils.set_logger()
pp = utils.set_pretty_printer()

# Load environment variables from .env file or Secret Manager
_ = load_dotenv("../.env")
aws_region = os.getenv("AWS_REGION")
tavily_ai_api_key = utils.get_tavily_api("TAVILY_API_KEY", aws_region)


# Create a bedrock runtime client
bedrock_rt = boto3.client(
    "bedrock-runtime", region_name=aws_region, config=bedrock_config
)

# Create a bedrock client to check available models
bedrock = boto3.client("bedrock", region_name=aws_region, config=bedrock_config)


[2025-10-02 10:59:56,207] p3563 {utils.py:46} INFO - TAVILY_API_KEY variable correctly retrieved from the .env file.


In [4]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_aws import ChatBedrockConverse
from langchain_community.tools.tavily_search import TavilySearchResults
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()

In [5]:
tool = TavilySearchResults(max_results=2)

## Implementing Persistence

We now turn our attention to implementing persistence. To achieve this, we introduce the concept of a checkpointer in LangGraph. A checkpointer saves a snapshot of the state after each node in our agent's processing graph runs, so a conversation can be resumed from its most recent state.

For a more comprehensive understanding of LangGraph's capabilities and usage, refer to the official LangGraph documentation.

In this implementation, we use `MemorySaver` as our checkpointer. It keeps checkpoints in memory, so saved state lives only as long as the Python process, which is sufficient for this demonstration. For production environments, it can be swapped for a checkpointer backed by an external database. LangGraph also supports other persistence solutions, including SQLite, Redis, and Postgres, for scenarios requiring more robust database systems.

After initializing the checkpointer, we pass it to the `graph.compile` method. We've enhanced our agent to accept a `checkpointer` parameter, which we set to our memory object.
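
To make the idea of a checkpointer concrete, here is a minimal sketch using only the standard library. `ToyCheckpointer`, `put`, and `latest` are hypothetical names invented for illustration; this is not LangGraph's implementation or API, only the general pattern of storing state snapshots keyed by a thread ID.

```python
import copy
from collections import defaultdict


class ToyCheckpointer:
    """Hypothetical stand-in for a checkpointer (not LangGraph's API)."""

    def __init__(self):
        # thread_id -> list of state snapshots, oldest first
        self._history = defaultdict(list)

    def put(self, thread_id: str, state: dict) -> None:
        # Deep-copy so later mutations do not alter stored snapshots.
        self._history[thread_id].append(copy.deepcopy(state))

    def latest(self, thread_id: str) -> dict:
        # A thread with no history starts from an empty message list.
        snapshots = self._history.get(thread_id)
        return copy.deepcopy(snapshots[-1]) if snapshots else {"messages": []}


saver = ToyCheckpointer()

# Thread "1": save a snapshot after each simulated node.
state = saver.latest("1")
state["messages"].append("human: What is the weather in SF?")
saver.put("1", state)
state["messages"].append("ai: <tool call>")
saver.put("1", state)

resumed = saver.latest("1")  # same thread: history is restored
fresh = saver.latest("2")    # different thread: no history
print(len(resumed["messages"]), len(fresh["messages"]))
```

Keying snapshots by thread ID is what lets one checkpointer serve many independent conversations.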


In [7]:
class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]
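
The `operator.add` annotation on `messages` acts as a reducer: when a node returns new messages, they are appended to the existing list instead of replacing it. The sketch below imitates that merge with the standard library only. `SketchState` and `apply_update` are hypothetical names, and the merge logic is an assumption about the general pattern, not LangGraph's internal code.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class SketchState(TypedDict):
    # Same shape as AgentState, with plain strings standing in for messages.
    messages: Annotated[list[str], operator.add]


def apply_update(state: dict, update: dict) -> dict:
    """Hypothetical helper: merge a node's update using each key's reducer."""
    hints = get_type_hints(SketchState, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        reducer = hints[key].__metadata__[0]  # operator.add for "messages"
        merged[key] = reducer(state[key], value)
    return merged


state = {"messages": ["human: hi"]}
state = apply_update(state, {"messages": ["ai: hello"]})
print(state["messages"])
```

This is why each node in the agent returns only its new messages in a list: the reducer takes care of extending the history.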

## The Agent Class: A Detailed Examination

The `Agent` class serves as the cornerstone of our implementation, orchestrating the interactions between the language model (Claude), tools (such as the Tavily search), and the overall conversation flow. Let's examine its key components:

1. `__init__` method: This initializer sets up the agent with a model, tools, checkpointer, and an optional system message. It constructs the state graph that defines the agent's behavior.

2. `call_bedrock` method: This method is responsible for invoking the Claude model via Amazon Bedrock. It processes the current state (messages) and returns the model's response.

3. `exists_action` method: This method evaluates whether the latest message from the model includes any tool calls (actions to be executed).

4. `take_action` method: This method executes any tool calls specified by the model and returns the results.

The `Agent` class utilizes a `StateGraph` to manage the conversation flow, enabling complex interactions while maintaining a clear and manageable structure. This design choice facilitates the implementation of persistence and streaming capabilities.
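
The control flow that the graph encodes can be sketched as a plain loop. In this sketch, `fake_model`, `fake_search`, and `run_agent` are hypothetical stand-ins using plain dictionaries. It involves no Bedrock or Tavily calls and is only meant to show the cycle of model call, action check, and tool execution.

```python
def fake_model(messages):
    """Hypothetical model: requests one search, then answers."""
    if not any(m["role"] == "tool" for m in messages):
        call = {"name": "search", "args": {"query": "weather in sf"}, "id": "call-1"}
        return {"role": "ai", "content": "", "tool_calls": [call]}
    return {"role": "ai", "content": "It is sunny.", "tool_calls": []}


def fake_search(query):
    """Hypothetical tool that echoes the query."""
    return f"results for {query!r}"


TOOLS = {"search": fake_search}


def run_agent(user_text):
    messages = [{"role": "human", "content": user_text}]
    while True:
        reply = fake_model(messages)       # plays the role of the "llm" node
        messages.append(reply)
        if not reply["tool_calls"]:        # exists_action is False: stop
            return messages
        for call in reply["tool_calls"]:   # plays the role of the "action" node
            result = TOOLS[call["name"]](**call["args"])
            messages.append(
                {"role": "tool", "content": result, "tool_call_id": call["id"]}
            )


history = run_agent("What is the weather in sf?")
print([m["role"] for m in history])
```

Expressing the same loop as a graph, rather than a `while` loop, is what gives the checkpointer well-defined points at which to save state.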

## Streaming Implementation

With our agent now configured, we can implement streaming functionality. There are two primary aspects of streaming to consider:

1. Message Streaming: This involves streaming individual messages, including the AI message that determines the next action and the observation message that represents the action's result.

2. Token Streaming: This involves streaming each token of the language model's response as it's generated.

We'll begin by implementing message streaming. We create a human message (e.g., "What is the weather in SF?") and introduce a thread config. This thread config is crucial for managing multiple conversations simultaneously within the persistent checkpointer, a necessity for production applications serving multiple users.

We invoke the graph using the `stream` method instead of `invoke`, passing our messages dictionary and thread config. This returns a stream of events representing real-time updates to the state.

Upon execution, we observe a stream of results: first, an AI message from Claude determining the action to take, followed by a tool message containing the Tavily search results, and finally, another AI message from Claude answering our initial query.
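
Each streamed event is a dictionary keyed by the name of the node that just ran, with that node's state update as the value. The sketch below uses a hypothetical generator, `fake_stream`, to mimic that shape without calling the model. The event contents are made up for illustration.

```python
def fake_stream():
    """Hypothetical generator mimicking the shape of streamed update events."""
    yield {"llm": {"messages": ["ai: <tool call: search>"]}}
    yield {"action": {"messages": ["tool: <search results>"]}}
    yield {"llm": {"messages": ["ai: final answer"]}}


seen = []
for event in fake_stream():
    # Iterating over items (not just values) lets us label each update
    # with the node that produced it.
    for node_name, update in event.items():
        seen.append(node_name)
        print(f"[{node_name}] {update['messages'][-1]}")
```

Printing the node name alongside the message makes it easier to follow which step of the agent produced each update.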


In [8]:
class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_bedrock)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges(
            "llm", self.exists_action, {True: "action", False: END}
        )
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_bedrock(self, state: AgentState):
        messages = state["messages"]
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {"messages": [message]}

    def exists_action(self, state: AgentState):
        result = state["messages"][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        tool_calls = state["messages"][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            result = self.tools[t["name"]].invoke(t["args"])
            results.append(
                ToolMessage(tool_call_id=t["id"], name=t["name"], content=str(result))
            )
        print("Back to the model!")
        return {"messages": results}

In [9]:
prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""


model = ChatBedrockConverse(
    client=bedrock_rt,
    model="anthropic.claude-3-haiku-20240307-v1:0",
    temperature=0,
    max_tokens=None,
)
abot = Agent(model, [tool], system=prompt, checkpointer=memory)



In [12]:
messages = [HumanMessage(content="What is the weather in sf?")]

In [11]:
thread = {"configurable": {"thread_id": "1"}}

In [14]:
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v["messages"])

[AIMessage(content=[{'type': 'tool_use', 'name': 'tavily_search_results_json', 'input': {'query': 'weather in san francisco'}, 'id': 'tooluse_7WYCoEagQTCH6RVnxIbxVg'}], response_metadata={'ResponseMetadata': {'RequestId': '80f157db-4874-4008-a2a3-4d4c11668aef', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Thu, 02 Oct 2025 07:05:14 GMT', 'content-type': 'application/json', 'content-length': '334', 'connection': 'keep-alive', 'x-amzn-requestid': '80f157db-4874-4008-a2a3-4d4c11668aef'}, 'RetryAttempts': 0}, 'stopReason': 'tool_use', 'metrics': {'latencyMs': 709}}, id='run-f6e0cf5c-0ec5-4244-9bd5-f60bcc69d73a-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'weather in san francisco'}, 'id': 'tooluse_7WYCoEagQTCH6RVnxIbxVg', 'type': 'tool_call'}], usage_metadata={'input_tokens': 1499, 'output_tokens': 61, 'total_tokens': 1560})]
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'weather in san francisco'}, 'id': 'tooluse_7WYCoEagQTCH6RVnxIbxVg', '

## Demonstrating Persistence

To illustrate the effectiveness of our persistence implementation, we continue the conversation with a follow-up question: "What about in LA?". By using the same thread ID, we ensure continuity from the previous interaction. Claude maintains context, understanding that we're still inquiring about weather conditions due to the persistence provided by our checkpoint system.

We can further emphasize the importance of the thread ID by asking "Which one is warmer?" twice: once with the original thread ID and once with a new one. With the original thread ID, Claude has the earlier turns available and can compare the two cities. With a different thread ID, Claude loses that context, because the checkpointer stores conversation history per thread.


In [15]:
messages = [HumanMessage(content="What about in la?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content=[{'type': 'tool_use', 'name': 'tavily_search_results_json', 'input': {'query': 'weather in los angeles'}, 'id': 'tooluse_ptdZY9o2RdaBYt6O0-8t7w'}], response_metadata={'ResponseMetadata': {'RequestId': '612141a9-2f52-4ee0-9f1c-a3985a5436af', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Thu, 02 Oct 2025 07:34:45 GMT', 'content-type': 'application/json', 'content-length': '333', 'connection': 'keep-alive', 'x-amzn-requestid': '612141a9-2f52-4ee0-9f1c-a3985a5436af'}, 'RetryAttempts': 0}, 'stopReason': 'tool_use', 'metrics': {'latencyMs': 1254}}, id='run-bf79a49a-37f8-4338-a6ea-2d420d810d4e-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'weather in los angeles'}, 'id': 'tooluse_ptdZY9o2RdaBYt6O0-8t7w', 'type': 'tool_call'}], usage_metadata={'input_tokens': 2556, 'output_tokens': 61, 'total_tokens': 2617})]}
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'weather in los angeles'}, 'id': 'tooluse_ptdZY9o2RdaBYt6O0

In [None]:
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

In [None]:
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "2"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

## Token-Level Streaming

For a more granular approach to streaming, we implement token-level updates using the `astream_events` method. This asynchronous method necessitates an async checkpointer, which we implement using `AsyncSqliteSaver`.

Asynchronous programming allows our application to handle multiple operations concurrently without blocking the main execution thread. In the context of streaming tokens from an AI model, this translates to processing and displaying tokens as they're generated, resulting in a more responsive user experience. The `astream_events` method leverages this asynchronous approach to efficiently stream token-level updates from Claude.
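
As a minimal illustration of the asynchronous pattern, the sketch below consumes tokens from an async generator with `async for`. `fake_token_stream` and `collect` are hypothetical names, and the short sleep merely simulates waiting on a model. In a notebook, where an event loop is already running, `await collect()` would replace `asyncio.run(collect())`.

```python
import asyncio


async def fake_token_stream(text, delay=0.01):
    """Hypothetical token source: yields one word at a time."""
    for token in text.split(" "):
        await asyncio.sleep(delay)  # simulate waiting for the next token
        yield token + " "


async def collect():
    tokens = []
    async for token in fake_token_stream("It is sunny in San Francisco"):
        # Display each token as soon as it arrives.
        print(token, end="|", flush=True)
        tokens.append(token)
    return tokens


# In a script; inside a notebook use `tokens = await collect()` instead.
tokens = asyncio.run(collect())
```

While the generator is waiting on the next token, the event loop is free to run other tasks, which is the property that keeps the application responsive.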

We initiate a new conversation with a fresh thread ID and iterate over the events, specifically looking for events of type "on_chat_model_stream". Upon encountering these events, we extract and display the content.

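When executed, we observe tokens streaming in real time. With `ChatBedrockConverse`, each chunk's content is a list of content blocks, so the printed stream includes the text fragments as well as the `tool_use` fragments that build up the tool call, followed by the final response streaming token by token.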


In [17]:
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver

# If you are using a newer version of LangGraph, the SQLite checkpointer
# was moved to a separate package:
# !pip install langgraph-checkpoint-sqlite
#
# from langgraph.checkpoint.memory import MemorySaver
# from langgraph.checkpoint.sqlite import SqliteSaver
# from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

async with AsyncSqliteSaver.from_conn_string("checkpoints.db") as memory:
    abot = Agent(model, [tool], system=prompt, checkpointer=memory)

    messages = [HumanMessage(content="What is the weather in SF?")]
    thread = {"configurable": {"thread_id": "4"}}
    async for event in abot.graph.astream_events(
        {"messages": messages}, thread, version="v1"
    ):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                # Skip chunks with empty content. With Amazon Bedrock,
                # content arrives as a list of blocks, so both text and
                # tool_use fragments are printed.
                print(content, end="|")

[{'type': 'text', 'text': 'Okay', 'index': 0}]|[{'type': 'text', 'text': ', let', 'index': 0}]|[{'type': 'text', 'text': ' me', 'index': 0}]|[{'type': 'text', 'text': ' look', 'index': 0}]|[{'type': 'text', 'text': ' up the current', 'index': 0}]|[{'type': 'text', 'text': ' weather in San Francisco', 'index': 0}]|[{'type': 'text', 'text': ':', 'index': 0}]|[{'index': 0}]|[{'type': 'tool_use', 'name': 'tavily_search_results_json', 'id': 'tooluse_dgVGYV6ZQD2obaFyoLcH_g', 'index': 1}]|[{'type': 'tool_use', 'input': '', 'id': None, 'index': 1}]|[{'type': 'tool_use', 'input': '{"query":', 'id': None, 'index': 1}]|[{'type': 'tool_use', 'input': ' "c', 'id': None, 'index': 1}]|[{'type': 'tool_use', 'input': 'urrent ', 'id': None, 'index': 1}]|[{'type': 'tool_use', 'input': 'we', 'id': None, 'index': 1}]|[{'type': 'tool_use', 'input': 'ather i', 'id': None, 'index': 1}]|[{'type': 'tool_use', 'input': 'n san franci', 'id': None, 'index': 1}]|[{'type': 'tool_use', 'input': 'sco"}', 'id': None, '
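
The output above mixes text blocks with `tool_use` blocks. If only the readable text is wanted, one option is to filter the blocks before printing. The helper below, `text_from_chunk`, is a hypothetical name, and the sample chunks are copied from the shape of the output shown above; the helper assumes content is either a string or a list of dictionaries.

```python
def text_from_chunk(content):
    """Return only the text carried by a chunk's content."""
    if isinstance(content, str):
        return content
    parts = []
    for block in content:
        # Keep text blocks; skip tool_use blocks and blocks with no type.
        if isinstance(block, dict) and block.get("type") == "text":
            parts.append(block.get("text", ""))
    return "".join(parts)


# Sample chunks in the same shape as the streamed output.
chunks = [
    [{"type": "text", "text": "Okay", "index": 0}],
    [{"type": "text", "text": ", let", "index": 0}],
    [{"index": 0}],
    [{"type": "tool_use", "input": '{"query":', "id": None, "index": 1}],
]
streamed = "".join(text_from_chunk(c) for c in chunks)
print(streamed)
```

In the streaming loop, a call such as `print(text_from_chunk(content), end="")` could then replace the raw `print(content, end="|")`.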

## Conclusion

This lab walked through implementing persistence and streaming with Anthropic's Claude model on Amazon Bedrock. While these concepts are straightforward to implement, they offer powerful capabilities for building production-grade AI applications.

The ability to manage multiple simultaneous conversations, coupled with a checkpointer that allows conversations to be resumed, is crucial for scalable AI solutions. Moreover, streaming both intermediate messages and final tokens gives visibility into the steps the agent takes on the way to its answer.

Persistence also plays a vital role in enabling human-in-the-loop interactions, a topic we will explore in greater depth in our subsequent lab.

To gain a deeper understanding of the practical implications of these concepts, we recommend exploring real-world case studies of persistence and streaming in production AI applications.
