# ⚙️ Fault Tolerance (Checkpointing) in LangGraph

### 🧩 **Concept Overview**

**Fault tolerance** refers to the capability of a workflow to continue functioning even when unexpected errors occur during execution.  
In **LangGraph**, this is achieved using **checkpointing**, which allows the workflow to **save intermediate states** and **resume** from the last successful step instead of restarting completely.

### 🔍 **Key Idea**

- **Checkpoint:** A saved snapshot of the workflow’s current state, including node outputs and memory.  
- **Purpose:** Ensures that failures such as network issues, API timeouts, or LLM errors do not force a full rerun.  
- **In LangGraph:** Checkpointing is supported via built-in or external backends like memory, Redis, or Postgres.

### ⚙️ **How It Works**

1. **Workflow Execution Begins:**  
   The LangGraph workflow starts and begins executing its nodes in sequence or parallel.

2. **Checkpoint Creation:**  
   After each node successfully executes, a **checkpoint** (snapshot) of the current state is saved.  
   This includes:
   - Node outputs  
   - Current state variables  
   - Memory context  

3. **Failure Handling:**  
   If a failure occurs (e.g., API failure, network drop, or timeout), LangGraph stops execution.

4. **State Recovery:**  
   Upon restarting, LangGraph **retrieves the last saved checkpoint** and resumes execution from that exact node — skipping already completed steps.

5. **Completion:**  
   The workflow continues until it reaches the end state (`END`), preserving both progress and results.

### 💡 **Benefits of Checkpointing**

| Benefit | Description |
|----------|-------------|
| 🧠 **Resilience** | Enables the workflow to recover from runtime errors or interruptions. |
| 💰 **Cost Efficiency** | Avoids re-running expensive LLM nodes unnecessarily. |
| ⚡ **Performance** | Reduces total runtime by resuming from the last successful node. |
| 🧩 **Scalability** | Suitable for large, multi-step pipelines where failure risk is higher. |
| 🔁 **Continuity** | Maintains consistent state across retries and system restarts. |




### 🧠 **Example Use Case**

**Use Case: Product Data Processing Pipeline**

A company processes product descriptions using multiple LLM-based steps:

1. **Summarize** — Condenses the product data.  
2. **Translate** — Converts it into multiple languages.  
3. **Categorize** — Classifies the product into appropriate categories.  

If the **translation step** fails due to an API timeout, checkpointing ensures that:
- The **summary output** is already saved.  
- When the system restarts, it **resumes from the translation step**.  

This avoids redundant API calls, reduces cost, and ensures reliability in production.

In [26]:
import json
import time
from typing import TypedDict, Dict, List
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

In [21]:
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

In [22]:
class ProductState(TypedDict, total=False):  # total=False makes all the fields optional
    """State for product processing with summary, enriched text, translations, and category."""
    product_text: str
    target_languages: List[str]
    summary: str
    enriched: str
    translations: Dict[str, str]
    category: str


In [None]:
# Simulate a transient failure on the first translate attempt
_TRANSLATION_FIRST_ATTEMPT = {"pending": True}

# Prompts
summarize_prompt = ChatPromptTemplate.from_messages([
    ("system", "You summarize product descriptions concisely."),
    ("human", "Summarize this product in 1-2 sentences. Return JSON {{\"summary\": str}}.\n\n{product_text}")
])

enrich_prompt = ChatPromptTemplate.from_messages([
    ("system", "You expand and enrich summaries with key features and benefits."),
    ("human", "Expand this summary into a richer product blurb with salient details. Return JSON {{\"enriched\": str}}.\n\n{summary}")
])

translate_prompt = ChatPromptTemplate.from_messages([
    ("system", "You translate text accurately to requested language codes."),
    ("human", "Translate the summary into each of these languages (ISO codes). Return JSON mapping code->translated string.\nSummary: {summary}\nLanguages: {languages_json}")
])

categorize_prompt = ChatPromptTemplate.from_messages([
    ("system", "You categorize products."),
    ("human", "Given the text, choose exactly one category from: ['Electronics','Apparel','Home & Kitchen','General']. Return JSON {{\"category\": str}}.\n\n{text}")
])


def summarize(state: ProductState) -> ProductState:
    """LLM summary of product text."""
    msgs = summarize_prompt.format_messages(product_text=state["product_text"]) 
    resp = llm.invoke(msgs)
    try:
        data = json.loads(resp.content)
        return {"summary": data["summary"]}
    except Exception:
        return {"summary": state["product_text"][:200]}


def enrich(state: ProductState) -> ProductState:
    """LLM enrichment; intentionally slow to simulate an expensive step."""
    time.sleep(3.0)
    msgs = enrich_prompt.format_messages(summary=state["summary"])
    resp = llm.invoke(msgs)
    try:
        data = json.loads(resp.content)
        return {"enriched": data["enriched"]}
    except Exception:
        return {"enriched": state["summary"]}


def translate(state: ProductState) -> ProductState:
    """LLM translation; fail once to demo checkpoint resume."""
    if _TRANSLATION_FIRST_ATTEMPT["pending"]:
        _TRANSLATION_FIRST_ATTEMPT["pending"] = False
        raise RuntimeError("Simulated translation API timeout")
    basis = state.get("enriched") or state["summary"]
    languages = state.get("target_languages", ["es", "fr"]) 
    msgs = translate_prompt.format_messages(
        summary=basis, languages_json=json.dumps(languages)
    )
    resp = llm.invoke(msgs)
    try:
        translations = json.loads(resp.content)
        return {"translations": translations}
    except Exception:
        return {"translations": {lang: basis for lang in languages}}


def categorize(state: ProductState) -> ProductState:
    """LLM categorization from text."""
    text = state.get("enriched") or state.get("summary") or state["product_text"]
    msgs = categorize_prompt.format_messages(text=text)
    resp = llm.invoke(msgs)
    try:
        data = json.loads(resp.content)
        return {"category": data["category"]}
    except Exception:
        return {"category": "General"}


In [24]:
# Build graph with checkpointing
workflow = StateGraph(ProductState)
workflow.add_node("summarize", summarize)
workflow.add_node("enrich", enrich)
workflow.add_node("translate", translate)
workflow.add_node("categorize", categorize)
workflow.set_entry_point("summarize")
workflow.add_edge("summarize", "enrich")
workflow.add_edge("enrich", "translate")
workflow.add_edge("translate", "categorize")
workflow.add_edge("categorize", END)

checkpointer = MemorySaver()
app = workflow.compile(checkpointer=checkpointer)


In [25]:
import time

# Reset simulated failure and input
_TRANSLATION_FIRST_ATTEMPT["pending"] = True
initial_state: ProductState = {
    "product_text": (
        "Wireless Bluetooth earbuds with noise cancellation, 24-hour battery life, "
        "fast USB-C charging, and ergonomic fit for workouts and commuting."
    ),
    "target_languages": ["es", "fr"],
}

# Create checkpoint by failing once at translate
tid = f"benchmark-{int(time.time()*1000)}"
try:
    app.invoke(initial_state, config={"configurable": {"thread_id": tid}})
except Exception:
    pass

# Resume from saved state (skips already-completed nodes)
t0 = time.time()
res_resume = app.invoke({}, config={"configurable": {"thread_id": tid}})
t1 = time.time()

# Restart from scratch with a fresh thread
t2 = time.time()
fresh_tid = f"{tid}-fresh"
res_restart = app.invoke(initial_state, config={"configurable": {"thread_id": fresh_tid}})
t3 = time.time()

resume_s = t1 - t0
restart_s = t3 - t2
saved_s = restart_s - resume_s

print("=== Resume from saved state ===")
print(f"Time: {resume_s:.3f}s")
print("Summary:", res_resume.get("summary"))
print("Translations:", res_resume.get("translations"))
print("Category:", res_resume.get("category"))

print("\n=== Restart from scratch ===")
print(f"Time: {restart_s:.3f}s")
print("Summary:", res_restart.get("summary"))
print("Translations:", res_restart.get("translations"))
print("Category:", res_restart.get("category"))

print(f"\nTime saved: {saved_s:.3f}s")


=== Resume from saved state ===
Time: 14.839s
Summary: Wireless Bluetooth earbuds featuring noise cancellation, a 24-hour battery life, fast USB-C charging, and an ergonomic design ideal for workouts and commuting.
Translations: {'es': "Experience unparalleled audio quality and convenience with our state-of-the-art Wireless Bluetooth Earbuds. Designed for the modern lifestyle, these earbuds feature advanced noise cancellation technology that immerses you in your favorite music or podcasts, blocking out distractions whether you're at the gym, commuting, or simply enjoying a moment of peace. With an impressive 24-hour battery life, you can enjoy extended listening sessions without the worry of running out of power. Plus, the fast USB-C charging capability ensures that you can quickly recharge your earbuds and get back to your day in no time. The ergonomic design provides a secure and comfortable fit, making them perfect for workouts or long commutes. Elevate your audio experience with ou