<a href="https://colab.research.google.com/github/Saifullah785/Agentic_AI_LangGraph_Learning_Journey/blob/main/Persistence_in_LangGraph/persistence.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Import necessary libraries and modules
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
from langgraph.checkpoint.memory import InMemorySaver

In [None]:
# Load environment variables and initialize the ChatOpenAI model
load_dotenv()  # take environment variables from .env.

llm = ChatOpenAI()

In [None]:
# Define the state for the joke generation graph
class JokeState(TypedDict):
    topic: str
    joke: str
    explanation: str

In [None]:
# Define a function to generate a joke based on the topic in the state
def generate_joke(state: JokeState):

    prompt = f'generate a joke on the topic of {state["topic"]}'

    response = llm.invoke(prompt).content

    return {"joke": response}

In [None]:
# Define a function to generate an explanation for the joke in the state
def generate_explanation(state: JokeState):

    prompt= f'write an explanation for the joke: {state["joke"]}'

    response = llm.invoke(prompt).content

    return {"explanation": response}

In [None]:
# Build the StateGraph for joke generation and explanation
graph = StateGraph(JokeState)

graph.add_node('generate_joke', generate_joke)
graph.add_node('generate_explanation', generate_explanation)

graph.add_edge(START, 'generate_joke')
graph.add_edge('generate_joke', 'generate_explanation')
graph.add_edge('generate_explanation', END)

checkpointer = InMemorySaver()

workflow = graph.compile(checkpointer=checkpointer)

In [None]:
# Invoke the joke generation workflow with the topic "chickens" for the first thread
config1 = {"configurable": {'thread_id': '1'}}
workflow.invoke({"topic": "chickens"}, config=config1)

{'topic': 'chickens',
 'joke': 'Why did the chicken join a band? Because it had great drumsticks!',
 'explanation': 'This joke plays on the double meaning of the word "drumsticks." In this context, "drumsticks" can refer to both the chicken\'s actual legs (which are commonly referred to as drumsticks) and the drumsticks used by a drummer in a band. The joke humorously suggests that the chicken joined a band because it had great drumsticks, implying that it had good legs for drumming. The pun adds a playful and silly element to the joke.'}

In [None]:
# Get the current state of the first thread after the workflow execution
workflow.get_state(config1)

StateSnapshot(values={'topic': 'chickens', 'joke': 'Why did the chicken join a band? Because it had great drumsticks!', 'explanation': 'This joke plays on the double meaning of the word "drumsticks." In this context, "drumsticks" can refer to both the chicken\'s actual legs (which are commonly referred to as drumsticks) and the drumsticks used by a drummer in a band. The joke humorously suggests that the chicken joined a band because it had great drumsticks, implying that it had good legs for drumming. The pun adds a playful and silly element to the joke.'}, next=(), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f087d39-e5cc-6fb5-8002-86171280ebc0'}}, metadata={'source': 'loop', 'step': 2, 'parents': {}}, created_at='2025-09-02T08:05:41.321669+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f087d39-d251-6e73-8001-dc803c6e014b'}}, tasks=(), interrupts=())

In [None]:
# Get the state history for the first thread
list(workflow.get_state_history(config1))

[StateSnapshot(values={'topic': 'chickens', 'joke': 'Why did the chicken join a band? Because it had great drumsticks!', 'explanation': 'This joke plays on the double meaning of the word "drumsticks." In this context, "drumsticks" can refer to both the chicken\'s actual legs (which are commonly referred to as drumsticks) and the drumsticks used by a drummer in a band. The joke humorously suggests that the chicken joined a band because it had great drumsticks, implying that it had good legs for drumming. The pun adds a playful and silly element to the joke.'}, next=(), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f087d39-e5cc-6fb5-8002-86171280ebc0'}}, metadata={'source': 'loop', 'step': 2, 'parents': {}}, created_at='2025-09-02T08:05:41.321669+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f087d39-d251-6e73-8001-dc803c6e014b'}}, tasks=(), interrupts=()),
 StateSnapshot(values={'topic': 'chickens', 

In [None]:
# Invoke the joke generation workflow with the topic "computers" for a second thread
config2 = {'configurable': {'thread_id': '2'}}
workflow.invoke({"topic": "computers"}, config=config2)

{'topic': 'computers',
 'joke': 'Why did the computer go to the doctor?\nBecause it had a virus!',
 'explanation': 'This joke plays on the dual meaning of the word "virus." In the context of the joke, "virus" refers to a harmful program that can infect and damage a computer system. However, the word "virus" also commonly refers to a type of illness that requires medical attention from a doctor. The punchline humorously suggests that the computer "went to the doctor" because it had a "virus" in the form of a computer bug, making it a clever play on words.'}

In [None]:
# Get the current state of the first thread again
workflow.get_state(config1)

StateSnapshot(values={'topic': 'chickens', 'joke': 'Why did the chicken join a band? Because it had great drumsticks!', 'explanation': 'This joke plays on the double meaning of the word "drumsticks." In this context, "drumsticks" can refer to both the chicken\'s actual legs (which are commonly referred to as drumsticks) and the drumsticks used by a drummer in a band. The joke humorously suggests that the chicken joined a band because it had great drumsticks, implying that it had good legs for drumming. The pun adds a playful and silly element to the joke.'}, next=(), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f087d39-e5cc-6fb5-8002-86171280ebc0'}}, metadata={'source': 'loop', 'step': 2, 'parents': {}}, created_at='2025-09-02T08:05:41.321669+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f087d39-d251-6e73-8001-dc803c6e014b'}}, tasks=(), interrupts=())

In [None]:
# Get the state history of the first thread again
list(workflow.get_state_history(config1))

[StateSnapshot(values={'topic': 'chickens', 'joke': 'Why did the chicken join a band? Because it had great drumsticks!', 'explanation': 'This joke plays on the double meaning of the word "drumsticks." In this context, "drumsticks" can refer to both the chicken\'s actual legs (which are commonly referred to as drumsticks) and the drumsticks used by a drummer in a band. The joke humorously suggests that the chicken joined a band because it had great drumsticks, implying that it had good legs for drumming. The pun adds a playful and silly element to the joke.'}, next=(), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f087d39-e5cc-6fb5-8002-86171280ebc0'}}, metadata={'source': 'loop', 'step': 2, 'parents': {}}, created_at='2025-09-02T08:05:41.321669+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f087d39-d251-6e73-8001-dc803c6e014b'}}, tasks=(), interrupts=()),
 StateSnapshot(values={'topic': 'chickens', 

# Time Travel

In [None]:
# Get the state of the first thread at a specific checkpoint
workflow.get_state({"configurable": {"thread_id": "1", "checkpoint_id": "1f06cc6e-7232-6cb1-8000-f71609e6cec5"}})

StateSnapshot(values={}, next=(), config={'configurable': {'thread_id': '1', 'checkpoint_id': '1f06cc6e-7232-6cb1-8000-f71609e6cec5'}}, metadata=None, created_at=None, parent_config=None, tasks=(), interrupts=())

In [None]:
# Invoke the workflow from a specific checkpoint for the first thread
workflow.invoke(None, {"configurable": {"thread_id": "1", "checkpoint_id": "1f06cc6e-7232-6cb1-8000-f71609e6cec5"}})

In [None]:
# Get the state history of the first thread after invoking from a checkpoint
list(workflow.get_state_history(config1))

# Updating State

In [None]:
# Update the state of the first thread at a specific checkpoint
workflow.update_state({"configurable": {"thread_id": "1", "checkpoint_id": "1f06cc6e-7232-6cb1-8000-f71609e6cec5", "checkpoint_ns": ""}}, {'topic':'samosa'})

In [None]:
# Get the state history of the first thread after updating the state
list(workflow.get_state_history(config1))

In [None]:
# Invoke the workflow from another checkpoint for the first thread
workflow.invoke(None, {"configurable": {"thread_id": "1", "checkpoint_id": "1f06cc72-ca16-6359-8001-7eea05e07dd2"}})

In [None]:
# Get the state history of the first thread after invoking from the second checkpoint
list(workflow.get_state_history(config1))

# Fault Tolerance

In [None]:
# Import necessary libraries for the fault tolerance example
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import InMemorySaver
from typing import TypedDict
import time

In [None]:
# Define the state for the fault tolerance example
class CrashState(TypedDict):
    input: str
    setp1: str
    step2: str

In [None]:
# Define the steps for the fault tolerance example, including a simulated hang
def step_1(state: CrashState) -> CrashState:
    print("✅ Step 1 executed")
    return {"step1": "done", "input": state["input"]}

def step_2(state: CrashState) -> CrashState:
    print("⏳ Step 2 hanging... now manually interrupt from the notebook toolbar (STOP button)")
    time.sleep(1000)  # Simulate long-running hang
    return {"step2": "done"}

def step_3(state: CrashState) -> CrashState:
    print("✅ Step 3 executed")
    return {"done": True}

In [None]:
# Build the StateGraph for the fault tolerance example
builder = StateGraph(CrashState)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)

builder.set_entry_point("step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)

checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

In [None]:
# Run the graph to simulate a crash during Step 2 and catch the KeyboardInterrupt
try:
    print("▶️ Running graph: Please manually interrupt during Step 2...")
    graph.invoke({"input": "start"}, config={"configurable": {"thread_id": 'thread-1'}})
except KeyboardInterrupt:
    print("❌ Kernel manually interrupted (crash simulated).")

In [None]:
# Re-run the graph to demonstrate fault tolerance and resume from the last saved state
print("\n🔁 Re-running the graph to demonstrate fault tolerance...")
final_state = graph.invoke(None, config={"configurable": {"thread_id": 'thread-1'}})
print("\n✅ Final State:", final_state)

In [None]:
# Get the state history of the fault tolerance example thread
list(graph.get_state_history({"configurable": {"thread_id": 'thread-1'}}))