# Understanding LangGraph Checkpointing

This notebook explores how LangGraph stores conversation state in PostgreSQL using checkpointing.

## Overview

LangGraph saves the complete **State** object at each graph execution step in PostgreSQL. The conversation history is embedded in the `messages` field of the State.

**Key Tables:**
- `checkpoints`: Main table with metadata AND the checkpoint data itself
  - `checkpoint` column: Contains JSONB/serialized State data
  - `checkpoint_id`, `thread_id`, `type`, `parent_checkpoint_id`, `checkpoint_ns`
- `checkpoint_blobs`: Stores channel-specific data (messages stored here!)
  - `blob` column: Contains msgpack-serialized channel data
  - Keyed by `thread_id`, `checkpoint_ns`, `channel`, and `version`
- `checkpoint_writes`: Stores pending writes, i.e. the channel updates produced by individual tasks within a step

**The flow:** 
1. User message → Graph executes
2. State updated (with new messages) 
3. Metadata serialized to `checkpoint` column in `checkpoints` table
4. Channel-specific data (messages) goes to `checkpoint_blobs` as msgpack-serialized blobs

---

## Key Discovery

**Messages are stored as FULL SNAPSHOTS**, not incremental deltas!
- Each checkpoint blob contains ALL messages from the start of the conversation
- Total storage therefore grows as O(N²) in the number of messages, but restoring the latest state is a single lookup with no replay of earlier checkpoints

## Setup: Database Connection

We'll explore the PostgreSQL checkpoint storage by connecting directly to the database and querying the checkpoint tables.

In [12]:
import pickle  # fallback deserializer used below; never unpickle untrusted data
import psycopg
import msgpack
from langgraph.checkpoint.postgres import PostgresSaver

In [6]:
# Connect to the database
conn = psycopg.connect("postgresql://langgraph_user:langgraph_password@localhost:5433/langgraph_db")
cur = conn.cursor()

# Query what's in the checkpoints table
print("=" * 80)
print("CHECKPOINT METADATA")
print("=" * 80)

cur.execute("""
    SELECT thread_id, checkpoint_id, type, parent_checkpoint_id
    FROM checkpoints
    WHERE thread_id IN ('test000001', 'test000002')
    ORDER BY thread_id, checkpoint_id
""")

checkpoints = cur.fetchall()
for thread_id, checkpoint_id, checkpoint_type, parent_id in checkpoints:
    print(f"Thread: {thread_id} | Checkpoint: {checkpoint_id} | Type: {checkpoint_type} | Parent: {parent_id}")

print(f"\nTotal checkpoints: {len(checkpoints)}")

CHECKPOINT METADATA
Thread: test000001 | Checkpoint: 1f0e71bf-55d7-65ba-bfff-dce8037a7852 | Type: None | Parent: None
Thread: test000001 | Checkpoint: 1f0e71bf-55da-6b70-8000-90fb362c7cdc | Type: None | Parent: 1f0e71bf-55d7-65ba-bfff-dce8037a7852
Thread: test000001 | Checkpoint: 1f0e71bf-68ee-6efa-8001-4f59ac69283a | Type: None | Parent: 1f0e71bf-55da-6b70-8000-90fb362c7cdc
Thread: test000001 | Checkpoint: 1f0e71bf-920a-619a-8002-43f1186e7b6b | Type: None | Parent: 1f0e71bf-68ee-6efa-8001-4f59ac69283a
Thread: test000001 | Checkpoint: 1f0e71bf-a35f-66b6-8003-37cf3d5707f0 | Type: None | Parent: 1f0e71bf-920a-619a-8002-43f1186e7b6b
Thread: test000001 | Checkpoint: 1f0e71c0-014a-6f14-8004-7d827d6f9200 | Type: None | Parent: 1f0e71bf-a35f-66b6-8003-37cf3d5707f0
Thread: test000001 | Checkpoint: 1f0e71c6-dfc2-6c14-8005-cc7756456f1e | Type: None | Parent: 1f0e71c0-014a-6f14-8004-7d827d6f9200
Thread: test000001 | Checkpoint: 1f0e71c6-dfc4-6ae6-8006-abcfb1d8e1a4 | Type: None | Parent: 1f0e71c6-
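
The `parent_checkpoint_id` column links each checkpoint to its predecessor, so a thread's history can be read as a chain. The following is a minimal sketch of walking that chain, assuming rows shaped like the `(checkpoint_id, parent_checkpoint_id)` pairs shown in the output above. `lineage` is a hypothetical helper, not part of LangGraph.

```python
from typing import Optional


def lineage(rows: list[tuple[str, Optional[str]]], start: str) -> list[str]:
    """Follow parent pointers from `start` back to the root checkpoint."""
    parent_of = dict(rows)  # checkpoint_id -> parent_checkpoint_id
    chain: list[str] = []
    seen: set[str] = set()
    current: Optional[str] = start
    # Stop at the root (parent is None) or if a cycle is detected.
    while current is not None and current not in seen:
        chain.append(current)
        seen.add(current)
        current = parent_of.get(current)
    return chain


# First three rows from the output above.
rows = [
    ("1f0e71bf-55d7-65ba-bfff-dce8037a7852", None),
    ("1f0e71bf-55da-6b70-8000-90fb362c7cdc", "1f0e71bf-55d7-65ba-bfff-dce8037a7852"),
    ("1f0e71bf-68ee-6efa-8001-4f59ac69283a", "1f0e71bf-55da-6b70-8000-90fb362c7cdc"),
]
for checkpoint_id in lineage(rows, "1f0e71bf-68ee-6efa-8001-4f59ac69283a"):
    print(checkpoint_id)
```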

## Exploring Checkpoint Structure

The `checkpoints` table contains metadata and state channel values (but NOT the actual messages).

In [7]:
# Get the actual checkpoint data (deserialize from the checkpoint column)
print("\n" + "=" * 80)
print("CHECKPOINT DATA WITH DESERIALIZATION")
print("=" * 80)

# Query to get checkpoint data from checkpoints table
cur.execute("""
    SELECT thread_id, checkpoint_id, checkpoint, checkpoint_ns
    FROM checkpoints
    WHERE thread_id = 'test000001'
    ORDER BY checkpoint_id DESC
    LIMIT 3
""")

checkpoints_data = cur.fetchall()

for thread_id, checkpoint_id, checkpoint_data, cp_ns in checkpoints_data:
    print(f"\n{'='*60}")
    print(f"Thread: {thread_id}")
    print(f"Checkpoint ID: {checkpoint_id}")
    print(f"Namespace: {cp_ns}")
    
    try:
        # The checkpoint column might be JSONB (already deserialized) or pickled bytes
        if checkpoint_data:
            # Check if it's already a dict (JSONB) or needs unpickling
            if isinstance(checkpoint_data, dict):
                deserialized = checkpoint_data
                print(f"Checkpoint structure: dict (already deserialized from JSONB)")
            elif isinstance(checkpoint_data, bytes):
                deserialized = pickle.loads(checkpoint_data)
                print(f"Checkpoint structure: {type(deserialized)} (unpickled from bytes)")
            else:
                deserialized = checkpoint_data
                print(f"Checkpoint structure: {type(deserialized)} (raw)")
            
            if isinstance(deserialized, dict):
                print(f"Keys: {list(deserialized.keys())}")
                
                # LangGraph typically stores state in 'channel_values'
                if 'channel_values' in deserialized:
                    channel_vals = deserialized['channel_values']
                    print(f"\n✓ Found 'channel_values' - exploring structure...")
                    print(f"Channel values type: {type(channel_vals)}")
                    
                    if isinstance(channel_vals, dict):
                        print(f"Channel names: {list(channel_vals.keys())}")
                        print(f"\n⚠ Note: 'messages' channel is NOT in channel_values!")
                        print(f"   Messages are stored separately in checkpoint_blobs table")
        else:
            print("No checkpoint data found")
            
    except Exception as e:
        print(f"Error processing checkpoint: {e}")
        import traceback
        traceback.print_exc()

print("\n" + "=" * 80)


CHECKPOINT DATA WITH DESERIALIZATION

Thread: test000001
Checkpoint ID: 1f0e71c7-2d48-6a9c-8008-caeed9f2f3f7
Namespace: 
Checkpoint structure: dict (already deserialized from JSONB)
Keys: ['v', 'id', 'ts', 'versions_seen', 'channel_values', 'channel_versions', 'updated_channels']

✓ Found 'channel_values' - exploring structure...
Channel values type: <class 'dict'>
Channel names: ['answer', 'iteration', 'final_answer', 'question_relevant']

⚠ Note: 'messages' channel is NOT in channel_values!
   Messages are stored separately in checkpoint_blobs table

Thread: test000001
Checkpoint ID: 1f0e71c7-014c-63d0-8007-89203ea9a7ea
Namespace: 
Checkpoint structure: dict (already deserialized from JSONB)
Keys: ['v', 'id', 'ts', 'versions_seen', 'channel_values', 'channel_versions', 'updated_channels']

✓ Found 'channel_values' - exploring structure...
Channel values type: <class 'dict'>
Channel names: ['answer', 'iteration', 'final_answer', 'question_relevant', 'branch:to:agent_node']

⚠

## Finding the Messages: checkpoint_blobs Table

**Key Discovery**: Messages are stored in the `checkpoint_blobs` table with `channel='messages'`, not in the main `checkpoints` table!

In [8]:
# Check checkpoint_blobs table for messages channel
print("\n" + "=" * 80)
print("CHECKPOINT_BLOBS TABLE - LOOKING FOR MESSAGES")
print("=" * 80)

# Import msgpack for deserialization
import msgpack

# Get the latest checkpoint_id for the thread
cur.execute("""
    SELECT checkpoint_id
    FROM checkpoints
    WHERE thread_id = 'test000001'
    ORDER BY checkpoint_id DESC
    LIMIT 1
""")

latest_checkpoint = cur.fetchone()

if latest_checkpoint:
    checkpoint_id = latest_checkpoint[0]
    print(f"Latest checkpoint_id: {checkpoint_id}\n")
    
    # checkpoint_blobs has no checkpoint_id column; rows are keyed by channel and version.
    # ORDER BY version makes LIMIT deterministic (assumes zero-padded version strings).
    cur.execute("""
        SELECT thread_id, checkpoint_ns, channel, type, blob
        FROM checkpoint_blobs
        WHERE thread_id = 'test000001' AND channel = 'messages'
        ORDER BY version
        LIMIT 5
    """)
    
    blobs_data = cur.fetchall()
    
    if blobs_data:
        print(f"Found {len(blobs_data)} message blob entries\n")
        
        for thread_id, cp_ns, channel, blob_type, blob in blobs_data:
            print(f"{'='*60}")
            print(f"Channel: '{channel}' | Type: {blob_type}")
            
            try:
                if blob:
                    # LangGraph uses msgpack for serialization
                    if isinstance(blob, bytes):
                        try:
                            # Deserialize using msgpack
                            deserialized = msgpack.unpackb(blob, raw=False, strict_map_key=False)
                            print(f"  ✓ Successfully deserialized with msgpack")
                            print(f"  Structure: {type(deserialized).__name__}")
                        except Exception as msgpack_err:
                            # Fallback to pickle
                            try:
                                deserialized = pickle.loads(blob)
                                print(f"  ✓ Successfully deserialized with pickle")
                                print(f"  Structure: {type(deserialized).__name__}")
                            except Exception as pickle_err:
                                print(f"  ✗ msgpack error: {msgpack_err}")
                                print(f"  ✗ pickle error: {pickle_err}")
                                continue
                    elif isinstance(blob, (dict, list)):
                        deserialized = blob
                        print(f"  Structure: {type(deserialized).__name__} (already deserialized)")
                    else:
                        print(f"  Unexpected blob type: {type(blob)}")
                        continue
                    
                    # Display messages
                    print(f"\n  🎯 MESSAGES CHANNEL DATA:")
                    if isinstance(deserialized, list):
                        print(f"  ✓ Contains {len(deserialized)} messages\n")
                        for i, msg in enumerate(deserialized[:3], 1):  # Show first 3
                            print(f"  Message {i}:")
                            if hasattr(msg, 'content'):
                                role = getattr(msg, 'type', 'unknown')
                                content = msg.content[:150]
                                print(f"    Role: {role}")
                                print(f"    Content: {content}...")
                            else:
                                print(f"    {str(msg)[:200]}...")
                            print()
                    elif isinstance(deserialized, dict):
                        print(f"  Dict with keys: {list(deserialized.keys())}")
                        print(f"  Preview: {str(deserialized)[:300]}...")
                    else:
                        print(f"  Type: {type(deserialized)}")
                        print(f"  Preview: {str(deserialized)[:300]}...")
                            
            except Exception as e:
                print(f"  ✗ Error processing blob: {e}")
    else:
        print("⚠ No message blobs found for thread 'test000001'")
else:
    print("No checkpoints found for thread 'test000001'")


CHECKPOINT_BLOBS TABLE - LOOKING FOR MESSAGES
Latest checkpoint_id: 1f0e71c7-2d48-6a9c-8008-caeed9f2f3f7

Found 5 message blob entries

Channel: 'messages' | Type: msgpack
  ✓ Successfully deserialized with msgpack
  Structure: list

  🎯 MESSAGES CHANNEL DATA:
  ✓ Contains 1 messages

  Message 1:
    {'role': 'user', 'content': 'Can I get earphones for myself, a laptop bag for my wife and something cool for my kids?'}...

Channel: 'messages' | Type: msgpack
  ✓ Successfully deserialized with msgpack
  Structure: list

  🎯 MESSAGES CHANNEL DATA:
  ✓ Contains 2 messages

  Message 1:
    {'role': 'user', 'content': 'Can I get earphones for myself, a laptop bag for my wife and something cool for my kids?'}...

  Message 2:
    ExtType(code=5, data=b'\x94\xbalangchain_core.messages.ai\xa9AIMessage\x89\xa7content\xd9bI will look for earphones, laptop bags, and cool items for kids in the available products for you.\xb1addition...

Channel: 'messages' | Type: msgpack
  ✓ Suc
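
The `ExtType(code=5, ...)` entries in the output above appear because plain `msgpack.unpackb` does not rebuild LangChain message objects; it only exposes the raw extension payload. LangGraph's own serializer can decode them. The sketch below assumes the installed LangGraph version exposes `JsonPlusSerializer` in `langgraph.checkpoint.serde.jsonplus` with a `loads_typed((type, bytes))` method. `TypedSerializer` and `decode_blob_rows` are hypothetical names, and the serializer is passed in so the helper itself has no LangGraph dependency.

```python
from typing import Any, Iterable, Optional, Protocol


class TypedSerializer(Protocol):
    """Anything with a loads_typed((type, payload)) method."""

    def loads_typed(self, data: tuple[str, bytes]) -> Any: ...


def decode_blob_rows(
    rows: Iterable[tuple[str, Optional[bytes]]],
    serde: TypedSerializer,
) -> list[Any]:
    """Decode (type, blob) rows from checkpoint_blobs with the given serializer."""
    decoded = []
    for blob_type, blob in rows:
        if blob is None:
            # Rows without a payload carry nothing to decode.
            continue
        # bytes(...) also accepts memoryview values returned by some drivers.
        decoded.append(serde.loads_typed((blob_type, bytes(blob))))
    return decoded
```

With LangGraph installed, the call would look something like `decode_blob_rows(cur.fetchall(), JsonPlusSerializer())` for a query that selects `type, blob`; treat that as an assumption to verify against your installed version.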

## Storage Pattern Analysis: Snapshot vs Incremental

Let's verify whether LangGraph stores messages incrementally (only new messages) or as full snapshots (all messages each time).

In [9]:
# DIAGNOSTIC: Check if LangGraph stores messages incrementally or as snapshots
print("\n" + "=" * 80)
print("MESSAGE STORAGE PATTERN ANALYSIS")
print("=" * 80)
print("Question: Does LangGraph add 1 message per row, or store full history snapshots?\n")

import msgpack

# Get all message blobs for the thread, ordered chronologically
cur.execute("""
    SELECT checkpoint_ns, channel, type, blob
    FROM checkpoint_blobs
    WHERE thread_id = 'test000001' AND channel = 'messages'
    ORDER BY version  -- checkpoint_ns is empty for this thread, so sort by channel version instead
""")

blobs_data = cur.fetchall()

if blobs_data:
    print(f"Found {len(blobs_data)} message blob entries for thread 'test000001'\n")
    
    for idx, (cp_ns, channel, blob_type, blob) in enumerate(blobs_data, 1):
        print(f"{'='*70}")
        print(f"Blob #{idx} | Namespace: {cp_ns}")
        
        try:
            if blob and isinstance(blob, bytes):
                # Deserialize using msgpack
                deserialized = msgpack.unpackb(blob, raw=False, strict_map_key=False)
                
                if isinstance(deserialized, list):
                    num_messages = len(deserialized)
                    print(f"  📊 Contains: {num_messages} message(s)")
                    
                    # Show first 2 messages preview
                    for i, msg in enumerate(deserialized[:2], 1):
                        if hasattr(msg, 'content'):
                            role = getattr(msg, 'type', getattr(msg, '__class__', 'unknown'))
                            content_preview = str(msg.content)[:80] if hasattr(msg, 'content') else str(msg)[:80]
                            print(f"    Message {i}: [{role}] {content_preview}...")
                        elif isinstance(msg, dict):
                            role = msg.get('type', msg.get('role', 'unknown'))
                            content_preview = str(msg.get('content', msg))[:80]
                            print(f"    Message {i}: [{role}] {content_preview}...")
                        else:
                            print(f"    Message {i}: {str(msg)[:80]}...")
                    
                    if num_messages > 2:
                        print(f"    ... ({num_messages - 2} more messages)")
                else:
                    print(f"  ⚠ Unexpected structure: {type(deserialized)}")
                    
        except Exception as e:
            print(f"  ✗ Error: {e}")
    
    print("\n" + "=" * 80)
    print("CONCLUSION:")
    print("If each blob has increasing counts (1, 2, 4, 6...), LangGraph stores FULL SNAPSHOTS")
    print("If each blob has ~1 message, LangGraph stores INCREMENTAL messages")
    print("=" * 80)
else:
    print("No message blobs found")


MESSAGE STORAGE PATTERN ANALYSIS
Question: Does LangGraph add 1 message per row, or store full history snapshots?

Found 6 message blob entries for thread 'test000001'

Blob #1 | Namespace: 
  📊 Contains: 1 message(s)
    Message 1: [user] Can I get earphones for myself, a laptop bag for my wife and something cool for ...
Blob #2 | Namespace: 
  📊 Contains: 2 message(s)
    Message 1: [user] Can I get earphones for myself, a laptop bag for my wife and something cool for ...
    Message 2: ExtType(code=5, data=b'\x94\xbalangchain_core.messages.ai\xa9AIMessage\x89\xa7co...
Blob #3 | Namespace: 
  📊 Contains: 5 message(s)
    Message 1: [user] Can I get earphones for myself, a laptop bag for my wife and something cool for ...
    Message 2: ExtType(code=5, data=b'\x94\xbalangchain_core.messages.ai\xa9AIMessage\x89\xa7co...
    ... (3 more messages)
Blob #4 | Namespace: 
  📊 Contains: 6 message(s)
    Message 1: [user] Can I get earphones for myself, a laptop bag for my wife a

## Storage Overhead Analysis: Snapshot vs Incremental

### Is Storing Full Snapshots Wasteful?

**Yes, it is a real overhead.** Storing the complete message history at each checkpoint means every earlier message is written again each time the `messages` channel changes:

#### Storage Growth Pattern:
```
Blob 1: [msg1]                    → 1 message stored
Blob 2: [msg1, msg2]              → 2 messages stored (msg1 duplicated)
Blob 3: [msg1, msg2, msg3, msg4, msg5]  → 5 messages stored (msg1-msg2 duplicated)
Blob N: [msg1...msgN]             → N messages stored
```

**Total storage = 1 + 2 + 5 + 6 + 7 + 8 = 29 messages** for a conversation with only 8 unique messages!
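
The total above can be reproduced from the per-blob message counts. This is a small sketch, assuming the message list is append-only so that the largest blob holds every unique message. `snapshot_totals` is a hypothetical helper name.

```python
def snapshot_totals(blob_sizes: list[int]) -> tuple[int, int, float]:
    """Return (stored_copies, unique_messages, duplication_factor)."""
    stored = sum(blob_sizes)             # every blob repeats the earlier messages
    unique = max(blob_sizes, default=0)  # append-only: the largest blob has them all
    factor = stored / unique if unique else 0.0
    return stored, unique, factor


# Message counts of the six blobs observed for thread 'test000001'.
stored, unique, factor = snapshot_totals([1, 2, 5, 6, 7, 8])
print(f"{stored} stored copies for {unique} unique messages ({factor:.3f}x)")
```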

---

### Why LangGraph Chose Snapshots (Design Trade-offs)

#### ✅ Advantages:
1. **Simplicity**: Read latest checkpoint → get complete state (no complex joins or reconstructions)
2. **Fast retrieval**: Single database query to restore full conversation context
3. **Time-travel debugging**: Each checkpoint is self-contained and independently usable
4. **Consistency**: No risk of missing incremental updates or corrupted state chains
5. **Easy rollback**: Can fork conversations from any checkpoint without complex state merging

#### ❌ Disadvantages:
1. **Storage cost**: O(N²) growth for N messages (each message stored multiple times)
2. **Write overhead**: Larger payloads to serialize/deserialize on each checkpoint
3. **Network cost**: More data transferred between application and database
4. **Backup size**: Database backups grow faster due to duplication

---

### When Does This Matter?

#### 🟢 **Acceptable Overhead** (LangGraph's typical use case):
- **Short-to-medium conversations** (< 50 messages): Storage is negligible
- **Development/prototyping**: Fast iteration matters more than storage optimization
- **Small-scale deployments**: Hundreds of conversations, not millions
- **Rich debugging needs**: Want easy time-travel and state inspection

**Example**: 100 conversations × 20 messages each × 500 bytes/message, with one snapshot per message (1 + 2 + ... + 20 = 210 message copies per conversation), comes to about 10 MB (trivial)

#### 🔴 **Significant Overhead** (When to optimize):
- **Very long conversations** (100+ messages): Quadratic growth becomes expensive
- **High-volume production** (millions of threads): Storage costs multiply
- **Frequent checkpoints**: Graph with many nodes = many snapshots per turn
- **Large message payloads**: Images, audio, long documents in messages

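**Example**: 1M conversations × 100 messages each × 2 KB/message, with one snapshot per message (5,050 message copies per conversation), comes to about 10 TB, versus roughly 200 GB if each message were stored once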
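
Here is a back-of-the-envelope calculator for estimates like these. It is a sketch under two stated assumptions: one snapshot is written per message (so a conversation of N messages stores 1 + 2 + ... + N copies), and 1 KB is 1,000 bytes. Real graphs may checkpoint more or less often, and both function names are hypothetical.

```python
def snapshot_bytes(conversations: int, messages: int, bytes_per_message: int) -> int:
    """Bytes stored when every message triggers a full snapshot of the history."""
    copies_per_conversation = messages * (messages + 1) // 2  # 1 + 2 + ... + N
    return conversations * copies_per_conversation * bytes_per_message


def linear_bytes(conversations: int, messages: int, bytes_per_message: int) -> int:
    """Bytes stored if each message were written exactly once."""
    return conversations * messages * bytes_per_message


small = snapshot_bytes(100, 20, 500)
large = snapshot_bytes(1_000_000, 100, 2_000)
print(f"small deployment: {small / 1e6:.1f} MB")
print(f"large deployment: {large / 1e12:.1f} TB "
      f"(vs {linear_bytes(1_000_000, 100, 2_000) / 1e9:.0f} GB stored once)")
```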

---

### Alternative Strategies (If Overhead Becomes a Problem)

#### 1. **Incremental Storage** (Custom Implementation)
Store only deltas (new messages) and reconstruct on read:
```python
# Instead of: [msg1, msg2, msg3]
# Store: [msg3] with pointer to previous checkpoint
```
- ✅ O(N) storage instead of O(N²)
- ❌ O(N) read time (must traverse checkpoint chain)
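
To make the trade-off concrete, here is an in-memory sketch of delta storage. It is not a LangGraph checkpointer: `DeltaStore` and its methods are hypothetical, and a real implementation would persist rows in a database and implement LangGraph's checkpointer interface.

```python
from typing import Optional


class DeltaStore:
    """Stores only the new messages per checkpoint, plus a parent pointer."""

    def __init__(self) -> None:
        # checkpoint_id -> (parent_id, messages added at this checkpoint)
        self._rows: dict[str, tuple[Optional[str], list[dict]]] = {}

    def put(self, checkpoint_id: str, parent_id: Optional[str], new_messages: list[dict]) -> None:
        self._rows[checkpoint_id] = (parent_id, list(new_messages))

    def load(self, checkpoint_id: str) -> list[dict]:
        """Rebuild the full history by walking the parent chain (O(N) reads)."""
        deltas: list[list[dict]] = []
        current: Optional[str] = checkpoint_id
        while current is not None:
            parent_id, new_messages = self._rows[current]
            deltas.append(new_messages)
            current = parent_id
        # Deltas were collected newest-first; replay them oldest-first.
        return [msg for delta in reversed(deltas) for msg in delta]


store = DeltaStore()
store.put("c1", None, [{"role": "user", "content": "hi"}])
store.put("c2", "c1", [{"role": "assistant", "content": "hello"}])
print(store.load("c2"))
```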

#### 2. **Periodic Compaction**
Keep only recent snapshots + occasional full snapshots:
```python
# Keep last 10 snapshots + snapshot every 50 messages
```
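
One way to express that retention rule is a function that decides which snapshots survive. This is a sketch only: `snapshots_to_keep` is a hypothetical helper, the thresholds mirror the comment above, and the input is assumed to be the per-snapshot message counts ordered oldest first.

```python
def snapshots_to_keep(message_counts: list[int], keep_last: int = 10, every: int = 50) -> set[int]:
    """Return indices of snapshots to retain.

    Keeps the `keep_last` most recent snapshots, plus the first snapshot that
    reaches each multiple of `every` messages.
    """
    total = len(message_counts)
    keep = set(range(max(0, total - keep_last), total))
    next_milestone = every
    for index, count in enumerate(message_counts):
        if count >= next_milestone:
            keep.add(index)
            # Jump past any milestones this snapshot already covers.
            next_milestone = (count // every + 1) * every
    return keep


# 120 snapshots holding 1..120 messages.
kept = snapshots_to_keep(list(range(1, 121)))
print(sorted(kept))
```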

#### 3. **Message Deduplication**
Use content-addressable storage (hash messages, store once):
```python
# checkpoint_blobs: [msg_hash1, msg_hash2, msg_hash3]
# message_store: {msg_hash1: actual_message_content}
```
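
A minimal in-memory sketch of this layout follows, assuming messages are JSON-serializable dicts. `DedupStore` is a hypothetical class; the hash is taken over a canonical JSON encoding so that identical messages map to the same key.

```python
import hashlib
import json


class DedupStore:
    """Checkpoints hold message hashes; message bodies are stored once."""

    def __init__(self) -> None:
        self.message_store: dict[str, dict] = {}   # hash -> message body
        self.checkpoints: dict[str, list[str]] = {}  # checkpoint_id -> ordered hashes

    @staticmethod
    def _digest(message: dict) -> str:
        canonical = json.dumps(message, sort_keys=True, separators=(",", ":"))
        return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

    def put(self, checkpoint_id: str, messages: list[dict]) -> None:
        hashes = []
        for message in messages:
            digest = self._digest(message)
            self.message_store.setdefault(digest, message)  # store body only once
            hashes.append(digest)
        self.checkpoints[checkpoint_id] = hashes

    def load(self, checkpoint_id: str) -> list[dict]:
        return [self.message_store[h] for h in self.checkpoints[checkpoint_id]]


m1 = {"role": "user", "content": "hi"}
m2 = {"role": "assistant", "content": "hello"}
dedup = DedupStore()
dedup.put("c1", [m1])
dedup.put("c2", [m1, m2])
print(len(dedup.message_store), "bodies stored for 2 checkpoints")
```

Each checkpoint still records one hash per message, so the reference lists grow quadratically; only the message bodies are deduplicated.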

#### 4. **External Message Store**
Store messages separately and only keep references in checkpoints:
```python
# S3/blob storage for messages
# PostgreSQL only stores: [msg_id_1, msg_id_2, msg_id_3]
```

#### 5. **Automatic Pruning**
Delete old checkpoints, keep only:
- Latest checkpoint (current state)
- Branch points (for conversation forking)
- Periodic snapshots (for time-travel)
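
The selection rule can be sketched as a pure function over `(checkpoint_id, parent_checkpoint_id)` rows. Assumptions: rows are ordered oldest first, a branch point is any checkpoint with more than one child, and "periodic" means every `every`-th row. `select_checkpoints_to_keep` is a hypothetical helper; actually deleting rows would also have to clean up the related blob and write tables.

```python
from collections import Counter
from typing import Optional


def select_checkpoints_to_keep(
    rows: list[tuple[str, Optional[str]]],
    every: int = 10,
) -> set[str]:
    """Return the checkpoint ids that a pruning job should retain."""
    if not rows:
        return set()
    ids = [checkpoint_id for checkpoint_id, _ in rows]
    child_counts = Counter(parent for _, parent in rows if parent is not None)

    keep = {ids[-1]}                                          # latest checkpoint
    keep.update(cid for cid in ids if child_counts[cid] > 1)  # branch points
    keep.update(ids[::every])                                 # periodic snapshots
    return keep


# "b" has two children ("c" and "d"), so it is a branch point.
history = [("a", None), ("b", "a"), ("c", "b"), ("d", "b"), ("e", "d")]
print(sorted(select_checkpoints_to_keep(history, every=4)))
```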

---

### LangGraph's Philosophy

**LangGraph optimizes for developer experience and correctness over storage efficiency.**

This is appropriate because:
- Most AI apps have moderate conversation lengths
- Storage is cheap compared to developer time debugging state issues
- PostgreSQL handles this data size efficiently
- Users can implement custom checkpointers if needed (LangGraph provides the interface)

**When to worry**: If you're building a high-scale production system with long-running conversations, you may need to implement a custom checkpointer with incremental storage or add pruning logic.

---

### Practical Recommendations

1. **For most apps**: Use LangGraph's default - the overhead is negligible
2. **Monitor your data**: Track `checkpoint_blobs` table size in production
3. **Set retention policies**: Auto-delete checkpoints older than 30 days
4. **Consider conversation limits**: Cap max conversation length (e.g., 100 messages, then start new thread)
5. **For high-scale**: Implement custom checkpointer with delta storage when you hit real cost issues
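
For the monitoring recommendation, a small helper can report a table's on-disk size using PostgreSQL's built-in `pg_total_relation_size` function. This is a sketch assuming a DB-API style cursor such as a psycopg cursor; `table_size_bytes` is a hypothetical name.

```python
from typing import Any

# pg_total_relation_size includes the table's indexes and TOAST data.
SIZE_QUERY = "SELECT pg_total_relation_size(%s::regclass)"


def table_size_bytes(cur: Any, table: str = "checkpoint_blobs") -> int:
    """Return the total on-disk size of `table` in bytes."""
    cur.execute(SIZE_QUERY, (table,))
    row = cur.fetchone()
    return int(row[0])
```

Called periodically with an open cursor, for example `table_size_bytes(cur)`, the result can be logged or exported to whatever metrics system is already in place.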

## Using LangGraph's Official Checkpointer API

Instead of querying the database directly, we can use LangGraph's `PostgresSaver` API which handles all the complexity of loading messages from `checkpoint_blobs`.

In [10]:
# Better way: Use LangGraph's checkpointer to retrieve state
print("\n" + "=" * 80)
print("RETRIEVING STATE USING LANGGRAPH'S CHECKPOINTER API")
print("=" * 80)

from langgraph.checkpoint.postgres import PostgresSaver

try:
    with PostgresSaver.from_conn_string("postgresql://langgraph_user:langgraph_password@localhost:5433/langgraph_db") as checkpointer:
        # Get all checkpoint IDs for a thread
        config = {"configurable": {"thread_id": "test000001"}}
        
        # List all checkpoint tuples for this thread
        checkpoints_list = list(checkpointer.list(config))
        
        print(f"\nTotal checkpoints for thread 'test000001': {len(checkpoints_list)}")
        
        if checkpoints_list:
            # Get the latest (most recent) checkpoint
            latest = checkpoints_list[0]
            print(f"\nLatest checkpoint structure: {type(latest)}")
            
            # CheckpointTuple fields: (config, checkpoint, metadata, parent_config, pending_writes)
            if isinstance(latest, tuple) and len(latest) >= 2:
                checkpoint_config, checkpoint_data = latest[0], latest[1]
                print(f"Checkpoint data keys: {checkpoint_data.keys() if isinstance(checkpoint_data, dict) else 'N/A'}")
            
            print("\n" + "=" * 70)
            for idx, checkpoint_tuple in enumerate(checkpoints_list[:3]):  # Show first 3 checkpoints
                try:
                    # Unpack the CheckpointTuple
                    checkpoint_config = checkpoint_tuple[0]
                    checkpoint_data = checkpoint_tuple[1]  # This is the actual checkpoint dict
                    
                    print(f"\n📍 Checkpoint #{idx + 1}")
                    print(f"   Config: {checkpoint_config}")
                    
                    # Messages are stored in checkpoint_blobs, but we can also use .get_tuple()
                    # The checkpointer API automatically loads messages from blobs
                    if isinstance(checkpoint_data, dict):
                        # Try to get messages using the checkpointer's get_tuple() method
                        full_state = checkpointer.get_tuple(checkpoint_config)
                        
                        if full_state and len(full_state) >= 2:
                            state_data = full_state[1]  # The checkpoint dict
                            
                            # Check if channel_values has messages
                            if 'channel_values' in state_data:
                                channel_vals = state_data['channel_values']
                                
                                # LangGraph loads messages into channel_values from checkpoint_blobs
                                if isinstance(channel_vals, dict) and 'messages' in channel_vals:
                                    messages = channel_vals['messages']
                                    print(f"   ✅ Found {len(messages)} messages in conversation")
                                    
                                    # Show first 2 messages
                                    for i, msg in enumerate(messages[:2], 1):
                                        print(f"\n   Message {i}:")
                                        if hasattr(msg, 'content'):
                                            msg_type = getattr(msg, 'type', type(msg).__name__)
                                            print(f"     Type: {msg_type}")
                                            print(f"     Content: {str(msg.content)[:100]}...")
                                        else:
                                            print(f"     {str(msg)[:150]}...")
                                    
                                    if len(messages) > 2:
                                        print(f"   ... and {len(messages) - 2} more messages")
                                else:
                                    print(f"   ⚠ No messages in channel_values")
                                    print(f"   Available channels: {list(channel_vals.keys()) if isinstance(channel_vals, dict) else 'N/A'}")
                            else:
                                print(f"   ⚠ No 'channel_values' in checkpoint data")
                                print(f"   Available keys: {list(state_data.keys())}")
                        
                except Exception as e:
                    print(f"   ✗ Error processing checkpoint {idx + 1}: {e}")
                    import traceback
                    traceback.print_exc()
            
            print("\n" + "=" * 70)
        else:
            print("No checkpoints found for this thread.")
            
except Exception as e:
    print(f"Error: {e}")
    import traceback
    traceback.print_exc()


RETRIEVING STATE USING LANGGRAPH'S CHECKPOINTER API

Total checkpoints for thread 'test000001': 10

Latest checkpoint structure: <class 'langgraph.checkpoint.base.CheckpointTuple'>
Checkpoint data keys: dict_keys(['v', 'id', 'ts', 'versions_seen', 'channel_values', 'channel_versions', 'updated_channels'])


📍 Checkpoint #1
   Config: {'configurable': {'thread_id': 'test000001', 'checkpoint_ns': '', 'checkpoint_id': '1f0e71c7-2d48-6a9c-8008-caeed9f2f3f7'}}
   ✅ Found 8 messages in conversation

   Message 1:
     {'role': 'user', 'content': 'Can I get earphones for myself, a laptop bag for my wife and something cool for my kids?'}...

   Message 2:
     Type: ai
     Content: I will look for earphones, laptop bags, and cool items for kids in the available products for you....
   ... and 6 more messages

📍 Checkpoint #2
   Config: {'configurable': {'thread_id': 'test000001', 'checkpoint_ns': '', 'checkpoint_id': '1f0e71c7-014c-63d0-8007-89203ea9a7ea'}}
   ✅ Found 7 messages in

## Reusable Helper Function

Here's a clean helper function to retrieve conversation history for any thread.

In [11]:
# Helper function to retrieve and display conversation history cleanly
def get_conversation_history(thread_id: str) -> list:
    """Retrieve the full conversation history for a thread using LangGraph's checkpointer API"""
    try:
        with PostgresSaver.from_conn_string("postgresql://langgraph_user:langgraph_password@localhost:5433/langgraph_db") as checkpointer:
            config = {"configurable": {"thread_id": thread_id}}
            
            # Get the latest checkpoint tuple
            checkpoint_tuple = checkpointer.get_tuple(config)
            
            if not checkpoint_tuple:
                print(f"No checkpoints found for thread: {thread_id}")
                return []
            
            # Unpack the CheckpointTuple (config, checkpoint, metadata, parent_config, pending_writes)
            if isinstance(checkpoint_tuple, tuple) and len(checkpoint_tuple) >= 2:
                checkpoint_data = checkpoint_tuple[1]
            else:
                print(f"Unexpected checkpoint structure: {type(checkpoint_tuple)}")
                return []
            
            # Extract messages from channel_values (where LangGraph loads messages from checkpoint_blobs)
            if isinstance(checkpoint_data, dict):
                if 'channel_values' in checkpoint_data:
                    channel_vals = checkpoint_data['channel_values']
                    if isinstance(channel_vals, dict) and 'messages' in channel_vals:
                        return channel_vals['messages']
                    else:
                        print(f"No messages in channel_values. Available channels: {list(channel_vals.keys()) if isinstance(channel_vals, dict) else 'N/A'}")
                        return []
                else:
                    print(f"No 'channel_values' in checkpoint. Available keys: {list(checkpoint_data.keys())}")
                    return []
            
            print(f"Checkpoint data is not a dict: {type(checkpoint_data)}")
            return []
            
    except Exception as e:
        print(f"Error retrieving conversation history: {e}")
        import traceback
        traceback.print_exc()
        return []

# Test retrieving conversation history
print("\n" + "=" * 80)
print("FULL CONVERSATION HISTORY FOR THREAD 'test000001'")
print("=" * 80)

messages = get_conversation_history("test000001")

if messages:
    for i, msg in enumerate(messages, 1):
        print(f"\n{i}. {msg}")
    print(f"\n\nTotal messages in conversation: {len(messages)}")
else:
    print("No messages retrieved or thread has no data.")


FULL CONVERSATION HISTORY FOR THREAD 'test000001'

1. {'role': 'user', 'content': 'Can I get earphones for myself, a laptop bag for my wife and something cool for my kids?'}

2. content='I will look for earphones, laptop bags, and cool items for kids in the available products for you.' additional_kwargs={} response_metadata={} tool_calls=[{'name': 'get_formatted_context', 'args': {'query': 'earphones', 'top_k': 5}, 'id': 'call_0', 'type': 'tool_call'}, {'name': 'get_formatted_context', 'args': {'query': 'laptop bag', 'top_k': 5}, 'id': 'call_1', 'type': 'tool_call'}, {'name': 'get_formatted_context', 'args': {'query': 'cool kids items', 'top_k': 5}, 'id': 'call_2', 'type': 'tool_call'}]

3. content="- ID: B09VB5M3L5, rating: 4.3, description: Empsun Wired Earbuds Headphones with Microphone Stereo Bass Earphones Noise Isolation in-Ear Headset Compatible with All Smartphones Tablets iPod IPad MP3 Player That with 3.5 mm Interface(Black) . [In-line microphone]: With in-line microphone ca

## Key Takeaways

1. **Messages are stored separately** from other state channels in the `checkpoint_blobs` table
2. **msgpack serialization** is used for efficient binary storage
3. **Full snapshots** are stored at each checkpoint (not incremental deltas)
4. **Storage overhead is O(N²)** but provides O(1) retrieval and simplifies the architecture
5. **LangGraph's checkpointer API** handles all the complexity of loading messages from blobs into `channel_values`
6. Use `checkpointer.get_tuple(config)` to retrieve the latest state with all messages loaded
7. Access messages via `checkpoint_data['channel_values']['messages']`
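
As the history output shows, the `messages` list can mix plain dicts (`{'role': ..., 'content': ...}`) with LangChain message objects that expose `.type` and `.content`. A small normalizer makes downstream code simpler. This is a sketch: `to_role_and_content` is a hypothetical helper, and `SimpleNamespace` stands in for a real message object so the example runs without LangChain.

```python
from types import SimpleNamespace
from typing import Any


def to_role_and_content(msg: Any) -> tuple[str, str]:
    """Normalize a dict-style or object-style message to (role, content)."""
    if isinstance(msg, dict):
        role = msg.get("role") or msg.get("type") or "unknown"
        return str(role), str(msg.get("content", ""))
    # Object-style messages: fall back to the class name if `.type` is missing.
    role = getattr(msg, "type", type(msg).__name__)
    return str(role), str(getattr(msg, "content", ""))


history = [
    {"role": "user", "content": "Can I get earphones for myself?"},
    SimpleNamespace(type="ai", content="I will look for earphones."),  # stand-in object
]
for role, content in map(to_role_and_content, history):
    print(f"[{role}] {content}")
```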