In [7]:
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt
from langgraph.graph import START, END, StateGraph

# Define a simple state
class State(TypedDict):
    message: str


In [8]:
def step_1(state: State) -> State:
    print("→ Step 1 running")
    return state

def step_2(state: State) -> State:
    print("→ Step 2 running")
    # Dynamic breakpoint: stop if message contains "pause"
    if "pause" in state["message"].lower():
        raise NodeInterrupt("Message asked to pause — stopping dynamically.")
    return state

def step_3(state: State) -> State:
    print("→ Step 3 running")
    return {"message": "Process complete "}


In [9]:
# Build the graph
builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)

builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)

graph = builder.compile(checkpointer=MemorySaver())

# Run with a message that triggers the dynamic breakpoint
input_state = {"message": "Please pause here"}
thread = {"configurable": {"thread_id": "dynamic-1"}}

for event in graph.stream(input_state, thread, stream_mode="values"):
    print(event)


{'message': 'Please pause here'}
→ Step 1 running
{'message': 'Please pause here'}
→ Step 2 running
{'__interrupt__': (Interrupt(value='Message asked to pause — stopping dynamically.', id='placeholder-id'),)}


/var/folders/6d/nrcfk1zj04jb6zj0_p75ydvw0000gn/T/ipykernel_26435/1647178422.py:9: LangGraphDeprecatedSinceV10: NodeInterrupt is deprecated. Please use `langgraph.types.interrupt` instead. Deprecated in LangGraph V1.0 to be removed in V2.0.
  raise NodeInterrupt("Message asked to pause — stopping dynamically.")


In [10]:
# Get and inspect the current state
state = graph.get_state(thread)
print("Stopped at:", state.next)

# Resume the graph
for event in graph.stream(None, thread, stream_mode="values"):
    print(event)


Stopped at: ('step_2',)
{'message': 'Please pause here'}
→ Step 2 running
{'__interrupt__': (Interrupt(value='Message asked to pause — stopping dynamically.', id='placeholder-id'),)}


/var/folders/6d/nrcfk1zj04jb6zj0_p75ydvw0000gn/T/ipykernel_26435/1647178422.py:9: LangGraphDeprecatedSinceV10: NodeInterrupt is deprecated. Please use `langgraph.types.interrupt` instead. Deprecated in LangGraph V1.0 to be removed in V2.0.
  raise NodeInterrupt("Message asked to pause — stopping dynamically.")


In [11]:
# Get the interrupted state
state = graph.get_state(thread)
print("Stopped at:", state.next)
print("Before edit:", state.values["message"])

# Update the message to continue
state.values["message"] = "Continue process"
graph.update_state(thread, state.values)

print("After edit:", graph.get_state(thread).values["message"])



Stopped at: ('step_2',)
Before edit: Please pause here
After edit: Continue process


In [12]:
# Continue execution after the edit
for event in graph.stream(None, thread, stream_mode="values"):
    print(event)



{'message': 'Continue process'}
→ Step 2 running
{'message': 'Continue process'}
→ Step 3 running
{'message': 'Process complete '}
