In [8]:
# langgraph_resume_example.py

import time
from typing import TypedDict

from langgraph.func import task
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver

# 1) Task: expensive work
@task
def expensive_compute(x: int) -> int:
    print("-> Running expensive_compute…")
    time.sleep(2)  # simulate slow/external work
    return x * 10

# 2) State definition
class MyState(TypedDict):
    value: int
    result: int

graph = StateGraph(MyState)

# 3) First node: run the expensive compute and write to state
def compute_node(state: MyState) -> dict:
    val = expensive_compute(state["value"]).result()
    print("-> compute_node stored", val)
    return {"result": val}

# 4) “No-op” second node: just exists to checkpoint after compute_node
def noop_node(state: MyState) -> dict:
    print("-> noop_node (already has result):", state["result"])
    return {}

# Add nodes & edges
graph.add_node("compute_node", compute_node)
graph.add_node("noop_node", noop_node)

graph.add_edge(START, "compute_node")
graph.add_edge("compute_node", "noop_node")
graph.add_edge("noop_node", END)

# 5) Compile with an in-memory checkpointer
checkpointer = InMemorySaver()
compiled = graph.compile(checkpointer=checkpointer)




In [9]:
thread_id = "thread_two_nodes"

print("\n=== First invocation ===")
out1 = compiled.invoke(
    {"value": 7, "result": 0},
    config={"configurable": {"thread_id": thread_id}},
)
print("First run final state:", out1)

print("\n=== Second (resume) invocation ===")
out2 = compiled.invoke(
    {},  # *no* new input — resume same thread
    config={"configurable": {"thread_id": thread_id}},
)
print("Resume final state:", out2)


=== First invocation ===
-> Running expensive_compute…
-> compute_node stored 70
-> noop_node (already has result): 70
First run final state: {'value': 7, 'result': 70}

=== Second (resume) invocation ===
-> Running expensive_compute…
-> compute_node stored 70
-> noop_node (already has result): 70
Resume final state: {'value': 7, 'result': 70}


In [11]:
import time
from typing import TypedDict

from langgraph.func import task
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command

# 1) A slow task we want to avoid rerunning:
@task
def expensive_compute(x: int) -> int:
    print("-> Running expensive_compute…")
    time.sleep(2)
    return x * 10

# 2) Define state for the graph
class MyState(TypedDict):
    value: int
    result: int
    approved: bool

graph = StateGraph(MyState)

# 3) Node that performs work and then interrupts for approval
def approval_node(state: MyState) -> dict:
    # Expensive compute
    val = expensive_compute(state["value"]).result()
    print("-> compute result:", val)
    
    # Pause and ask for approval
    # The value passed to interrupt() will appear in result["__interrupt__"]
    approved = interrupt("Do you approve saving result? (True/False)")
    
    print("-> resumed with approval =", approved)
    
    # Save state based on approval and return
    return {"approved": approved}

graph.add_node("approval_node", approval_node)
graph.add_edge(START, "approval_node")
graph.add_edge("approval_node", END)

# 4) Compile with InMemorySaver
checkpointer = InMemorySaver()
compiled = graph.compile(checkpointer=checkpointer)

# ——————————————————————
# Step 1: Initial run (should hit interrupt)
# ——————————————————————
thread_id = "interrupt_test"

print("\n--- Initial run → expecting interrupt ---")
result = compiled.invoke(
    {"value": 7, "result": 0, "approved": False},
    config={"configurable": {"thread_id": thread_id}},
)

print("Graph invoke result:", result)
print("Interrupt payload:", result.get("__interrupt__"))

# Now we should see result["__interrupt__"] supplying the interrupt text.
# At this point nothing after `interrupt()` in the node has run yet.
# ——————————————————————
# Step 2: Resume with approval
# ——————————————————————
print("\n--- Resuming with approval=True ---")
resume_value = True  # simulate user approval

resumed = compiled.invoke(
    Command(resume=resume_value),
    config={"configurable": {"thread_id": thread_id}},
)

print("Resumed final state:", resumed)


--- Initial run → expecting interrupt ---
-> Running expensive_compute…
-> compute result: 70
Graph invoke result: {'value': 7, 'result': 0, 'approved': False, '__interrupt__': [Interrupt(value='Do you approve saving result? (True/False)', id='9a66f91cd97efbb7f6f0d1f46724514f')]}
Interrupt payload: [Interrupt(value='Do you approve saving result? (True/False)', id='9a66f91cd97efbb7f6f0d1f46724514f')]

--- Resuming with approval=True ---
-> compute result: 70
-> resumed with approval = True
Resumed final state: {'value': 7, 'result': 0, 'approved': True}


In [None]:
# 原来如此，所以如果不使用interrupt的话，task decorator几乎没什么用
# 如果和interrupt搭配使用，interrupt之前的task是不会重复执行的，不知道存在哪里了，可能是pgsql，或者是memory
# task的结果也不会自动的保存到graph state中