In [2]:
import sqlite3
import time
from typing import TypedDict

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.sqlite import SqliteSaver

# ===================== STATE =====================
class CrashState(TypedDict):
    input: str
    step1: str
    step2: str
    step3: str

# ===================== STEPS =====================
def step_1(state: CrashState):
    print("Step 1 executed.")
    return {"step1": "done"}

def step_2(state: CrashState):
    print("Step 2 executing (sleeping 10s). Press Ctrl+C to interrupt for testing fault tolerance.")
    time.sleep(10)
    print("Step 2 completed.")
    return {"step2": "done"}

def step_3(state: CrashState):
    print("Step 3 executed.")
    return {"step3": "done"}

# ===================== GRAPH =====================
builder = StateGraph(CrashState)

builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)

builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)

# ===================== CHECKPOINTER (MANUAL CONNECTION) =====================
# Create a persistent SQLite connection
# check_same_thread=False is required for thread safety in LangGraph
conn = sqlite3.connect("fault_tolerance.db", check_same_thread=False)

# Create the checkpointer from the connection
checkpointer = SqliteSaver(conn)

# Compile the workflow with persistent checkpointer
workflow = builder.compile(checkpointer=checkpointer)

# ===================== CONFIG =====================
config = {"configurable": {"thread_id": "1"}}

# ===================== RUN THE WORKFLOW =====================
print("Starting workflow execution...\n")

# You can change this initial state only on the VERY FIRST run
# On subsequent runs (after interruption), pass an empty dict {} to resume from checkpoint
initial_input = {"input": "start"}  # Only needed first time

try:
    result = workflow.invoke(initial_input, config=config)
    print("\nWorkflow completed successfully!")
    print("Final state:", result)
except KeyboardInterrupt:
    print("\n\nWorkflow interrupted during step_2 (as expected for testing).")
    print("Checkpoint saved automatically.")

# ===================== INSPECT STATE AT ANY TIME =====================
print("\n--- Current Checkpointed State ---")
current_state = workflow.get_state(config)
print("Values:", current_state.values)
print("Next node to execute:", current_state.next)
print("Checkpoint metadata:", current_state.metadata)

# Optional: List all checkpoint timestamps for this thread
print("\n--- Checkpoint History ---")
history = list(workflow.get_state_history(config))
print(history)
# Note: Do NOT close the connection unless you're completely done with the session
conn.close()  # Uncomment only at the very end of your script/session if needed

Starting workflow execution...

Step 1 executed.
Step 2 executing (sleeping 10s). Press Ctrl+C to interrupt for testing fault tolerance.
Step 2 completed.
Step 3 executed.

Workflow completed successfully!
Final state: {'input': 'start', 'step1': 'done', 'step2': 'done', 'step3': 'done'}

--- Current Checkpointed State ---
Values: {'input': 'start', 'step1': 'done', 'step2': 'done', 'step3': 'done'}
Next node to execute: ()
Checkpoint metadata: {'source': 'loop', 'step': 28, 'parents': {}}

--- Checkpoint History ---
[StateSnapshot(values={'input': 'start', 'step1': 'done', 'step2': 'done', 'step3': 'done'}, next=(), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f0ecc6b-cc0a-6bdd-801c-13085e675f9e'}}, metadata={'source': 'loop', 'step': 28, 'parents': {}}, created_at='2026-01-08T19:17:56.290863+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f0ecc6b-cc07-6061-801b-62bc38bf3d02'}}, tasks=(), interrupt