<img src="assets/persistence.png" align="left" width="900" style="margin-right:15px;"/>

LangGraph has a built-in persistence layer, implemented through checkpointers. When you compile a graph with a checkpointer, the checkpointer saves a checkpoint of the graph state at every super-step. Those checkpoints are saved to a thread, which can be accessed after graph execution. Because threads allow access to graph’s state after execution, several powerful capabilities including human-in-the-loop, memory, time travel, and fault-tolerance are all possible.

In [None]:
from dotenv import load_dotenv, find_dotenv
from env_utils import doublecheck_env
from langchain_anthropic import ChatAnthropic
from typing import Annotated, List
import operator
from pydantic import BaseModel, Field
from typing_extensions import TypedDict

path = find_dotenv()
print("Loaded env from:", path)

# Load environment variables from .env
load_dotenv(find_dotenv())

# Check and print results
doublecheck_env(path)

llm = ChatAnthropic(model="claude-sonnet-4-5-20250929")

In [None]:
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.runnables import RunnableConfig
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: str
    bar: Annotated[list[str], add]

def node_a(state: State):
    return {"foo": "a", "bar": ["a"]}

def node_b(state: State):
    return {"foo": "b", "bar": ["b"]}


workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)

checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

config: RunnableConfig = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": ""}, config)

In [None]:
def pretty_print_snapshot(snapshot):
    """Pretty print a StateSnapshot object with each attribute on a separate line"""
    print("StateSnapshot(")
    print(f"    values={snapshot.values},")
    print(f"    next={snapshot.next},")
    print(f"    config={snapshot.config},")
    print(f"    metadata={snapshot.metadata},")
    print(f"    created_at='{snapshot.created_at}',")
    print(f"    parent_config={snapshot.parent_config},")
    print(f"    tasks={snapshot.tasks},")
    if hasattr(snapshot, 'interrupts'):
        print(f"    interrupts={snapshot.interrupts}")
    print(")")

# get the latest state snapshot
config = {"configurable": {"thread_id": "1"}}
snapshot = graph.get_state(config)
pretty_print_snapshot(snapshot)

# get a state snapshot for a specific checkpoint_id
config = {"configurable": {"thread_id": "1", "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"}}
snapshot = graph.get_state(config)
pretty_print_snapshot(snapshot)


In [None]:
# Get state history
config = {"configurable": {"thread_id": "1"}}
state_history = list(graph.get_state_history(config))
for snapshot in state_history:
    pretty_print_snapshot(snapshot)

### Replay

It’s also possible to play-back a prior graph execution. If we invoke a graph with a thread_id and a checkpoint_id, then we will re-play the previously executed steps before a checkpoint that corresponds to the checkpoint_id, and only execute the steps after the checkpoint.

#### Real-life Example
Imagine you are watching a movie and you have recorded every scene (checkpoints) and you have labelled the movie session (thread). Now:
-   You say: “I want to start from scene 5 of session ‘A’.”
-   The system restores everything up to scene 4 (that’s the checkpoint) so you don’t watch scenes 1-4 again.
-   Then you begin watching from scene 5 onwards.
-   That way you skip replaying scenes 1-4 (they are already done / state restored) and only go through the remainder.

In [None]:
config = {"configurable": {"thread_id": "1", "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}}
graph.invoke(None, config=config)

### Update State
update_state is a method you can call on your graph to manually change the current saved state (or fork from an earlier state) of a thread. You usually use it when you want to adjust the state outside of the normal node-execution flow.

-   config: tells which thread you are updating (and optionally which checkpoint in that thread). If you only give the thread_id, you are updating the latest state in that thread. If you also give checkpoint_id, you are forking from that previous checkpoint.
-   values: the new values you want to merge into the state. These values will be applied to the state channels (keys) defined in your state schema. Important: some channels might have reducers defined (meaning updates will append/merge rather than directly overwrite).
-   as_node (optional): you can specify an as_node argument so that this update behaves as if it was emitted by that node in the graph. This affects which node is considered last executed and thus which next node(s) should run.
    -   When you call graph.update_state(config, values, as_node=NODE_NAME), you are telling the system: “Treat this state update as if it came from the node called NODE_NAME.”
    -   If you don’t explicitly provide as_node, the system will try to infer which node should be treated as the origin of the update — typically it uses the last node that updated the state, if that is unambiguous.

In [34]:
from typing import Annotated
from typing_extensions import TypedDict
from operator import add


graph.update_state(config, {"foo": 2, "bar": ["b"]})

{'configurable': {'thread_id': '1',
  'checkpoint_ns': '',
  'checkpoint_id': '1f0b8d8d-1a46-6690-8003-08ff31d0789c'}}

<img src="assets/persistence_2.png" align="left" width="900" style="margin-right:15px;"/>