# Introduction to LangGraph Persistence

**Duration:** 60-90 minutes  
**Level:** Beginner to Intermediate

## Learning Objectives

By the end of this tutorial, you will be able to:

1. **Understand the mental model**: Thread (conversation) → Checkpoints (snapshots) → Replay/Update (time travel/edits) → Memory Store (cross-thread facts)
2. **Run a graph with persistence** using checkpointers
3. **Inspect state and history** to see saved checkpoints
4. **Time-travel** to replay from previous checkpoints
5. **Edit state** using update_state with reducers
6. **Store user memories** across multiple threads using Store

---

## Setup: Install Dependencies

First, let's install the required packages:

**Note**: Since LangGraph v0.2, the SqliteSaver requires a separate package installation.

In [None]:
# Uncomment and run if needed
# !pip install langgraph langgraph-checkpoint-sqlite langchain-core langchain-openai

In [None]:
import os
from getpass import getpass

# Set your OpenAI API key
if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("Enter your OpenAI API Key: ")

---

## Section 1: Mental Model — The Git Analogy (10 min)

Understanding LangGraph persistence is easiest through a **Git analogy**:

| LangGraph Concept | Git Equivalent | Description |
|------------------|----------------|-------------|
| **Thread** | Branch | A unique conversation or execution path |
| **Checkpoint** | Commit | A snapshot of state at a specific point in time |
| **Replay** | Checkout + re-run | Go back to a checkpoint and continue from there |
| **Update state** | Commit amend | Modify the state at a checkpoint (with reducers) |
| **Memory Store** | Shared repo | Cross-thread persistent storage (not tied to one thread) |

### Key Concepts

1. **Threads** identify unique conversation flows (like Git branches)
2. **Checkpoints** save state after each "super-step" (like Git commits)
3. **Time-travel** lets you replay from any checkpoint
4. **Reducers** control how state updates merge or overwrite
5. **Store** provides cross-thread memory for facts that transcend individual conversations

---

## Section 2: Minimal Working Example (15 min)

Let's build the smallest possible graph with persistence to understand the fundamentals.

### Step 1: Define a Simple State Schema

In [None]:
from typing import TypedDict

class State(TypedDict):
    """Simple state with a counter and a list of messages."""
    count: int
    messages: list[str]

### Step 2: Create Simple Nodes

In [None]:
def increment_node(state: State) -> State:
    """Increment the counter by 1."""
    return {"count": state["count"] + 1, "messages": state["messages"] + [f"Count is now {state['count'] + 1}"]}

def double_node(state: State) -> State:
    """Double the counter."""
    new_count = state["count"] * 2
    return {"count": new_count, "messages": state["messages"] + [f"Doubled to {new_count}"]}

### Step 3: Build the Graph with SqliteSaver

In [None]:
import sqlite3
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver

# Create a checkpointer (persists to SQLite in-memory database)
conn = sqlite3.connect(":memory:", check_same_thread=False)
checkpointer = SqliteSaver(conn)

# Build the graph
workflow = StateGraph(State)
workflow.add_node("increment", increment_node)
workflow.add_node("double", double_node)

# Define edges
workflow.set_entry_point("increment")
workflow.add_edge("increment", "double")
workflow.add_edge("double", END)

# Compile with checkpointer
graph = workflow.compile(checkpointer=checkpointer)

print("Graph compiled successfully with persistence!")

### Step 4: Run the Graph with a Thread ID

In [None]:
# Thread ID identifies this conversation
config = {"configurable": {"thread_id": "thread_1"}}

# Initial state
initial_state = {"count": 5, "messages": ["Starting"]}

# Run the graph
result = graph.invoke(initial_state, config)
print("\nFinal State:")
print(result)

### Step 5: Inspect Current State

In [None]:
# Get the current state
current_state = graph.get_state(config)

print("\nCurrent State Info:")
print(f"Values: {current_state.values}")
print(f"Next nodes: {current_state.next}")
print(f"Checkpoint ID: {current_state.config['configurable']['checkpoint_id']}")
print(f"Step: {current_state.metadata.get('step', 'N/A')}")

### Step 6: View State History

In [None]:
# Get all checkpoints in history
history = list(graph.get_state_history(config))

print(f"\nTotal checkpoints: {len(history)}\n")

for i, checkpoint in enumerate(history):
    print(f"Checkpoint {i}:")
    print(f"  ID: {checkpoint.config['configurable']['checkpoint_id']}")
    print(f"  Step: {checkpoint.metadata.get('step', 'N/A')}")
    print(f"  Values: {checkpoint.values}")
    print()

### Key Observations

- **`values`**: The actual state at this checkpoint
- **`next`**: Which nodes will execute next (empty list means done)
- **`checkpoint_id`**: Unique identifier for this snapshot
- **`step`**: The step number in the execution

---

## Section 3: Time-Travel & Forking (10 min)

One of the most powerful features is the ability to **replay** from any checkpoint.

### Understanding Replay Semantics

- **Before the checkpoint**: Steps are replayed without side effects (read from history)
- **After the checkpoint**: Execution continues normally (creates new checkpoints)

### Step 1: Pick a Checkpoint to Replay From

In [None]:
# Let's pick the second checkpoint (index 1)
checkpoint_to_replay = history[2]

print(f"Replaying from checkpoint: {checkpoint_to_replay.config['configurable']['checkpoint_id']}")
print(f"State at this checkpoint: {checkpoint_to_replay.values}")

### Step 2: Create a New Config with checkpoint_id

In [None]:
# Config with both thread_id and checkpoint_id
replay_config = {
    "configurable": {
        "thread_id": "thread_1",
        "checkpoint_id": checkpoint_to_replay.config["configurable"]["checkpoint_id"]
    }
}

# Get state at this checkpoint
state_at_checkpoint = graph.get_state(replay_config)
print(f"\nState at checkpoint: {state_at_checkpoint.values}")

### Step 3: Continue Execution (Creating a Fork)

In [None]:
# Resume from this checkpoint - this creates a new branch!
# Pass None as input to continue from where we left off
forked_result = graph.invoke(None, replay_config)

print("\nForked execution result:")
print(forked_result)

### Step 4: Compare Original vs Forked

In [None]:
# Original thread history
original_history = list(graph.get_state_history(config))
print(f"Original thread has {len(original_history)} checkpoints")

# The forked execution created new checkpoints from the replay point
print(f"\nForked result: {forked_result}")

---

## Section 4: Update State with Reducers (10 min)

Sometimes you need to **modify state** at a checkpoint without re-running nodes. This is where `update_state()` comes in.

### Understanding Reducers

- **Without reducer**: Values are **overwritten**
- **With reducer**: Values are **merged** according to the reducer function

### Step 1: Define State with Reducers

In [None]:
from typing import Annotated
from operator import add

class StateWithReducer(TypedDict):
    """State with a reducer on the messages list."""
    count: int  # No reducer - overwrites
    messages: Annotated[list[str], add]  # Reducer - appends

### Step 2: Create a New Graph with Reducers

In [None]:
def increment_with_reducer(state: StateWithReducer) -> StateWithReducer:
    return {"count": state["count"] + 1, "messages": [f"Incremented to {state['count'] + 1}"]}

def double_with_reducer(state: StateWithReducer) -> StateWithReducer:
    new_count = state["count"] * 2
    return {"count": new_count, "messages": [f"Doubled to {new_count}"]}

# Build new graph with reducer state
workflow_reducer = StateGraph(StateWithReducer)
workflow_reducer.add_node("increment", increment_with_reducer)
workflow_reducer.add_node("double", double_with_reducer)
workflow_reducer.set_entry_point("increment")
workflow_reducer.add_edge("increment", "double")
workflow_reducer.add_edge("double", END)

# Create checkpointer for reducer graph
conn_reducer = sqlite3.connect(":memory:", check_same_thread=False)
graph_reducer = workflow_reducer.compile(checkpointer=SqliteSaver(conn_reducer))

### Step 3: Run and Update State

In [None]:
config_reducer = {"configurable": {"thread_id": "thread_reducer"}}

# Run the graph
result_reducer = graph_reducer.invoke({"count": 3, "messages": ["Start"]}, config_reducer)
print("Initial result:")
print(result_reducer)

### Step 4: Update State with update_state()

In [None]:
# Update the state
# - count will be OVERWRITTEN (no reducer)
# - messages will be APPENDED (has reducer)
graph_reducer.update_state(
    config_reducer,
    {"count": 100, "messages": ["Manual update"]}
)

# Check updated state
updated_state = graph_reducer.get_state(config_reducer)
print("\nUpdated state:")
print(updated_state.values)

### Key Observations

- **`count`** was overwritten to 100 (no reducer)
- **`messages`** had "Manual update" appended to the existing list (has reducer)

### Exercise: Remove the Reducer

**Question**: How would you make `messages` overwrite instead of append?

**Answer**: Remove the `Annotated` and reducer:

```python
class StateWithReducer(TypedDict):
    count: int
    messages: list[str]  # No Annotated - will overwrite
```

---

## Section 5: Memory Across Threads — The Store (10 min)

**Checkpoints** are per-thread. But what if you need to store facts that span multiple conversations?

That's where **Store** comes in!

### Store vs Checkpoints

| Feature | Checkpoints | Store |
|---------|------------|-------|
| Scope | Single thread | Cross-thread |
| Use case | Conversation history | User preferences, facts |
| Analogy | Git commits | Shared database |

### Step 1: Create an InMemoryStore

In [None]:
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()

print("InMemoryStore created!")

### Step 2: Store User Memories

In [None]:
# Store memories with namespacing: (user_id, "memories")
user_id = "user_123"
namespace = (user_id, "memories")

# Store some memories
store.put(namespace, "memory_1", {"text": "User likes pizza", "timestamp": "2025-01-01"})
store.put(namespace, "memory_2", {"text": "User is a software engineer", "timestamp": "2025-01-02"})
store.put(namespace, "memory_3", {"text": "User lives in San Francisco", "timestamp": "2025-01-03"})

print(f"Stored 3 memories for {user_id}")

### Step 3: Search Memories (Cross-Thread)

In [None]:
# Search for all memories
memories = store.search(namespace)

print(f"\nAll memories for {user_id}:")
for item in memories:
    print(f"  - Key: {item.key}")
    print(f"    Value: {item.value}")
    print()

### Step 4: Use Store in a Different Thread

In [None]:
# This simulates a new conversation (different thread)
# but we can still access the same memories!

def simulate_new_thread():
    """Simulate accessing memories in a completely different thread."""
    user_memories = store.search((user_id, "memories"))
    
    print("Accessing memories in NEW thread (different conversation):")
    for item in user_memories:
        print(f"  - {item.value['text']}")

simulate_new_thread()

### Step 5: Integrate Store with Graph

In [None]:
from langgraph.store.base import BaseStore

class StateWithStore(TypedDict):
    messages: list[str]
    user_id: str

def node_with_store_access(state: StateWithStore, *, store: BaseStore) -> StateWithStore:
    """Node that can access the store via the store parameter."""
    # Access user memories from store
    user_id = state["user_id"]
    memories = list(store.search((user_id, "memories")))
    
    memory_texts = [m.value["text"] for m in memories]
    
    return {
        "messages": state["messages"] + [f"Found {len(memories)} memories"] + memory_texts,
        "user_id": user_id
    }

# Build graph with store
workflow_with_store = StateGraph(StateWithStore)
workflow_with_store.add_node("fetch_memories", node_with_store_access)
workflow_with_store.set_entry_point("fetch_memories")
workflow_with_store.add_edge("fetch_memories", END)

# Compile with both checkpointer AND store
conn_store = sqlite3.connect(":memory:", check_same_thread=False)
graph_with_store = workflow_with_store.compile(
    checkpointer=SqliteSaver(conn_store),
    store=store
)

# Run the graph
result_with_store = graph_with_store.invoke(
    {"messages": ["Starting"], "user_id": user_id},
    {"configurable": {"thread_id": "thread_with_store"}}
)

print("\nResult with store access:")
for msg in result_with_store["messages"]:
    print(f"  {msg}")

### Key Takeaways

- **Store is cross-thread**: Accessible from any thread
- **Namespacing**: Organize data with tuples like `(user_id, "memories")`
- **Integration**: Pass `store=store` when compiling the graph
- **Access in nodes**: Use `store` parameter in node functions

---

## Section 6: Human-in-the-Loop & Fault Tolerance (5 min)

Persistence enables two critical capabilities:

### 1. Human-in-the-Loop (HITL)

- **Inspect** state before continuing
- **Approve/reject** actions
- **Edit** state if needed
- **Resume** from where you left off

Example workflow:
```python
# Pause before a critical action
state = graph.get_state(config)
if state.next == ["dangerous_action"]:
    user_approval = input("Approve? (y/n): ")
    if user_approval == "y":
        graph.invoke(None, config)  # Resume
```

### 2. Fault Tolerance

- **Automatic checkpointing** preserves state
- **Restart from failure**: If a node crashes, restart from the last checkpoint
- **Pending writes preserved**: Uncommitted state changes are saved

Example:
```python
try:
    result = graph.invoke(input, config)
except Exception as e:
    print(f"Error: {e}")
    # State is preserved! Resume later:
    result = graph.invoke(None, config)
```

---

## Guided Lab (15-20 min)

Now it's your turn! Complete these exercises:

### Exercise 1: Run Graph and View State

Create a new graph, run it, and inspect the current state.

In [None]:
# TODO: Create a graph, run it with a thread_id, and view the state
# Your code here...


### Exercise 2: View History and Identify Checkpoints

Get the state history and print details about each checkpoint.

In [None]:
# TODO: Get state history and identify at least 4 checkpoints
# Your code here...


### Exercise 3: Time-Travel (Replay from Checkpoint)

Pick checkpoint 2 and replay from there.

In [None]:
# TODO: Replay from checkpoint 2 and observe the branch
# Your code here...


### Exercise 4: Update State

Use `update_state()` to fix a value and observe the change.

In [None]:
# TODO: Update state to fix a value, then re-run one step
# Your code here...


### Exercise 5: Cross-Thread Memory

Add a user memory, then read it in a completely different thread.

In [None]:
# TODO: Add a memory for a user, then access it in a new thread
# Your code here...


---

## Common Pitfalls (5 min)

### 1. Forgetting `thread_id` → Nothing Persists

```python
# ❌ Wrong - no thread_id
graph.invoke(input)  # Nothing is saved!

# ✅ Correct
graph.invoke(input, {"configurable": {"thread_id": "my_thread"}})
```

### 2. Expecting Overwrite Where a Reducer Merges

```python
# If messages has a reducer:
messages: Annotated[list[str], add]

# update_state will APPEND, not replace!
graph.update_state(config, {"messages": ["new"]})
```

### 3. Confusing Checkpoints (Per-Thread) with Store (Cross-Thread)

- **Checkpoints**: Thread-specific conversation history
- **Store**: Shared facts across all threads

### 4. Assuming Replay Re-Calls External APIs

- Replay **reads from history** for steps before the checkpoint
- It does **NOT** re-execute those steps or call APIs again

---

## Cheat Sheet

### Run with Persistence

```python
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

# Create checkpointer
conn = sqlite3.connect(":memory:", check_same_thread=False)
checkpointer = SqliteSaver(conn)

# Or for a file-based database:
# conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
# checkpointer = SqliteSaver(conn)

graph = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "thread_1"}}
result = graph.invoke(input, config)
```

### Inspect State

```python
# Get current state
current = graph.get_state(config)

# Get all history
history = list(graph.get_state_history(config))
```

### Time-Travel

```python
config_with_checkpoint = {
    "configurable": {
        "thread_id": "thread_1",
        "checkpoint_id": "checkpoint_id_here"
    }
}
graph.invoke(None, config_with_checkpoint)
```

### Update State

```python
graph.update_state(
    config,
    {"key": "value"},
    as_node="node_name"  # Optional
)
```

### Cross-Thread Memory (Store)

```python
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()
store.put((user_id, "memories"), "key", {"data": "value"})
results = store.search((user_id, "memories"))

# Compile with store
graph = workflow.compile(checkpointer=checkpointer, store=store)
```

---

## Assessment Questions

### Q1: When do you need a Store vs. Checkpoints?

**Answer**: 
- Use **checkpoints** for thread-specific conversation history and state
- Use **Store** for cross-thread facts (user preferences, memories) that need to be accessed from any conversation

### Q2: What happens to steps BEFORE the checkpoint_id during replay?

**Answer**: 
- Steps before the checkpoint are **replayed from history** without re-executing
- No side effects occur (APIs are not called again)
- Only steps **after** the checkpoint execute normally

---

## Optional Stretch Exercises

### 1. Swap In-Memory → File-Based SqliteSaver

Try changing from in-memory persistence to a real SQLite file:

```python
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

conn = sqlite3.connect("./my_checkpoints.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)
```

This will persist checkpoints to disk so they survive between program runs!

### 2. Using Context Manager Approach

Alternatively, you can use the context manager approach:

```python
from langgraph.checkpoint.sqlite import SqliteSaver

with SqliteSaver.from_conn_string("./checkpoints.db") as checkpointer:
    graph = workflow.compile(checkpointer=checkpointer)
    result = graph.invoke(input, config)
```

---

## Summary

Congratulations! You've learned:

✅ The **Git analogy** for persistence (threads, checkpoints, replay, store)  
✅ How to **run graphs with persistence** using checkpointers  
✅ How to **inspect state and history**  
✅ How to **time-travel** and create forks  
✅ How **reducers** control state updates (merge vs overwrite)  
✅ How to use **Store** for cross-thread memory  
✅ Common pitfalls to avoid  

### Next Steps

- Explore **human-in-the-loop** patterns
- Try **production checkpointers** (PostgreSQL, Redis)
- Build **multi-agent systems** with shared memory
- Implement **semantic search** in your Store

### Resources

- [LangGraph Persistence Docs](https://langchain-ai.github.io/langgraph/concepts/persistence/)
- [LangGraph Memory Docs](https://langchain-ai.github.io/langgraph/concepts/memory/)
- [State and Reducers](https://langchain-ai.github.io/langgraph/concepts/low_level/)

---

**Happy Building! 🎉**