# Assignment: Build a Robust Multi-Node LangGraph App with Retry, Fallback, and Tracing

---

### Objective:
In this assignment you will design and implement a **resilient multi-node LangGraph application** with three production-oriented features: **retry mechanisms, fallback strategies, and tracing**. You'll build a workflow that simulates a content generation and review process, handles failures gracefully, and exposes its execution for monitoring. The project should deepen your understanding of building robust, observable LLM-powered applications.

---

### Instructions:
1.  **LLM Access**: You'll need access to an LLM API (e.g., Google's Gemini, OpenAI's GPT-4). For this assignment, we'll primarily use **Google's Gemini Pro model** via the `langchain-google-genai` integration.
2.  **Environment Setup**: Install the necessary Python libraries: `pip install langchain-google-genai langgraph langsmith pydantic tenacity`.
3.  **API Keys**: Securely handle your LLM API key and your LangSmith API key. It's best practice to load them from environment variables.
4.  **Jupyter Notebook**: All your code, outputs, workflow diagrams (if you generate them), observations, and analysis must be documented in this Jupyter Notebook.
5.  **Simulated Failures**: You will need to introduce *simulated failures* (e.g., random exceptions, LLM outputting incorrect formats) to test your retry and fallback mechanisms.
6.  **Tracing**: Use LangSmith for tracing your workflow executions. Provide screenshots or links to your LangSmith traces.
7.  **Analysis**: Critically evaluate your LangGraph workflow's resilience and observability.

---

## Part 1: Setup and Workflow State Definition
Begin by configuring your LLM and LangSmith, then define the state that your LangGraph workflow will manage.

### Task 1.1: API and LangSmith Configuration
Set up your Google Generative AI API key and configure LangSmith for tracing. Initialize the `ChatGoogleGenerativeAI` model.
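
If you prefer to fail fast when a key is missing instead of sending a placeholder string to the API, a small check like the one below can help. This is a minimal sketch, not part of the required solution: `require_env` is a hypothetical helper name, and treating values that start with `YOUR_` as "unset" is an assumption based on the placeholder strings used in this notebook.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or raise a clear error.

    Hypothetical helper: values that are empty or still look like the
    notebook's "YOUR_..." placeholders are treated as missing.
    """
    value = os.environ.get(name, "").strip()
    if not value or value.startswith("YOUR_"):
        raise RuntimeError(f"Environment variable {name} is not set.")
    return value


# Demonstration with a throwaway variable name (not a real credential).
os.environ["DEMO_SERVICE_KEY"] = "abc123"
demo_value = require_env("DEMO_SERVICE_KEY")
```

Calling `require_env("GOOGLE_API_KEY")` before initializing the model would surface a missing key as a readable error at setup time.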

In [None]:
import os
import random
import time
from typing import Literal, TypedDict
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type

from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field, ValidationError

from langgraph.graph import StateGraph, END

# --- YOUR API KEYS HERE ---
# It's highly recommended to load API keys from environment variables for security.
# On Linux/macOS: export GOOGLE_API_KEY='your_key' ; export LANGCHAIN_API_KEY='your_key' ; export LANGCHAIN_TRACING_V2='true' ; export LANGCHAIN_PROJECT='your_project_name'
# On Windows (cmd): set GOOGLE_API_KEY=your_key & set LANGCHAIN_API_KEY=your_key & set LANGCHAIN_TRACING_V2=true & set LANGCHAIN_PROJECT=your_project_name

# setdefault keeps any value already exported in your shell; the placeholder is used only if the variable is unset.
os.environ.setdefault("GOOGLE_API_KEY", "YOUR_GOOGLE_API_KEY_HERE") # Replace the placeholder, or export the variable instead

# Configure LangSmith
os.environ.setdefault("LANGCHAIN_TRACING_V2", "true")
os.environ.setdefault("LANGCHAIN_API_KEY", "YOUR_LANGCHAIN_API_KEY_HERE") # Replace the placeholder, or export the variable instead
os.environ.setdefault("LANGCHAIN_PROJECT", "LangGraph_Retry_Fallback_Assignment") # Choose a project name for LangSmith

# Initialize the LLM (Gemini Pro)
llm = ChatGoogleGenerativeAI(model="gemini-pro", temperature=0.7) # A bit of temperature for creativity

print("LLM and LangSmith configured!")

### Task 1.2: Define Workflow State
Define a `TypedDict` to represent the state of your LangGraph workflow for content generation and review.

* **Requirements for State**:
    * `topic`: The subject of the content to be generated.
    * `generated_content`: The text content generated by the LLM (optional, as it's generated later).
    * `review_feedback`: Feedback from the review process (optional).
    * `status`: Current status of the content (e.g., "Draft", "Needs Review", "Approved", "Rejected", "Failed").
    * `retries_left`: An integer to track how many retries are available for a failed step.
    * `error_message`: An optional string to store details of any error that occurred.
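
The nodes in this assignment return only the keys they change, and LangGraph merges each returned dict into the current state (for keys without a custom reducer, the new value overwrites the old one). The plain-Python sketch below mimics that merge so you can reason about state transitions without running a graph. `DemoState` and `apply_update` are hypothetical names used only for illustration.

```python
from typing import Literal, TypedDict


class DemoState(TypedDict, total=False):
    # total=False makes every key optional, which matches partial updates.
    topic: str
    generated_content: str
    review_feedback: str
    status: Literal["Draft", "Needs Review", "Approved", "Rejected", "Failed"]
    retries_left: int
    error_message: str


def apply_update(state: DemoState, update: DemoState) -> DemoState:
    """Return a new state in which keys from `update` overwrite `state`."""
    merged = dict(state)
    merged.update(update)
    return merged


initial: DemoState = {"topic": "demo", "status": "Draft", "retries_left": 3}
after_generate = apply_update(
    initial, {"generated_content": "some text", "status": "Needs Review"}
)
```

Keys that the update does not mention (here `topic` and `retries_left`) carry over unchanged.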

In [None]:
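# total=False marks every key as optional, so nodes can return partial updates containing only the keys they change.
class ContentWorkflowState(TypedDict, total=False):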
    topic: str
    generated_content: str
    review_feedback: str
    status: Literal["Draft", "Needs Review", "Approved", "Rejected", "Failed"]
    retries_left: int
    error_message: str

print("ContentWorkflowState defined!")

---

## Part 2: Define Nodes with Retry and Fallback
Create the individual nodes (functions) that represent steps in your workflow, incorporating retry logic and fallback mechanisms.

### Task 2.1: `generate_content` Node with Retry
This node will generate content based on the `topic`. Implement a **retry mechanism** using `tenacity` for transient failures (e.g., API errors, rate limits).

* **Simulated Failure**: Introduce a `random.random() < 0.3` check to raise a `RuntimeError` (simulating a transient API error) in about 30% of calls.
* **Retry Strategy**: Use the `@retry` decorator with `stop_after_attempt` (e.g., 3 attempts) and `wait_fixed` (e.g., 2 seconds).
* **Output**: Update `generated_content` and `status` to "Needs Review" upon success.
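
To see what a fixed-wait retry does before wiring in `tenacity`, it can help to write the loop by hand. The sketch below is a standard-library stand-in for the decorator, not tenacity's implementation: `retry_fixed` and `flaky_generate` are hypothetical names, and the failure is made deterministic (fail twice, then succeed) so the behavior is easy to follow.

```python
import functools
import time


def retry_fixed(attempts: int, wait_seconds: float, exc_type: type = RuntimeError):
    """Retry the wrapped function up to `attempts` times with a fixed wait."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exc_type:
                    if attempt == attempts:
                        raise  # Attempts exhausted: re-raise the last error.
                    time.sleep(wait_seconds)
        return wrapper
    return decorator


calls = {"count": 0}


@retry_fixed(attempts=3, wait_seconds=0.0)
def flaky_generate(topic: str) -> str:
    """Fail on the first two calls, then succeed (deterministic for the demo)."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("simulated transient error")
    return f"article about {topic}"


result = flaky_generate("testing")
```

With an independent 30% failure chance per attempt, all three attempts fail with probability 0.3³ = 0.027, so the decorated function itself raises in under 3% of calls.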

In [None]:
generation_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a content writer. Generate a short article (around 150-200 words) on the following topic."),
    ("user", "Topic: {topic}")
])

generation_chain = generation_prompt | llm

@retry(
    stop=stop_after_attempt(3),
    wait=wait_fixed(2),
    retry=retry_if_exception_type(RuntimeError),
    reraise=True # Re-raise the exception after retries are exhausted
)
def _generate_content_inner(topic: str) -> str:
    # Simulate a transient failure
    if random.random() < 0.3: # 30% chance of failure
        raise RuntimeError("Simulated API error during content generation!")

    print(f"  Generating content for: {topic}...")
    response = generation_chain.invoke({"topic": topic})
    return response.content

def generate_content_node(state: ContentWorkflowState) -> ContentWorkflowState:
    print("\n--- Node: Generating Content ---")
    topic = state["topic"]
    retries_left = state.get("retries_left", 3) # Initial retries

    if retries_left <= 0:
        print("  No retries left for content generation. Falling back...")
        return {"status": "Failed", "error_message": "Content generation failed after multiple retries."}

    try:
        content = _generate_content_inner(topic)
        print("  Content generated successfully.")
        return {"generated_content": content, "status": "Needs Review", "retries_left": 3, "error_message": ""}
    except RuntimeError as e:
        print(f"  Content generation failed: {e}. Retries left: {retries_left - 1}.")
        return {"status": "Draft", "retries_left": retries_left - 1, "error_message": str(e)}
    except Exception as e:
        print(f"  An unexpected error occurred during content generation: {e}. Escalating...")
        return {"status": "Failed", "error_message": f"Unexpected error in content generation: {e}"}

print("generate_content_node defined!")

### Task 2.2: `review_content` Node with Fallback
This node will review the generated content. Implement a **fallback mechanism** if the LLM fails to provide a structured review (e.g., using Pydantic output).

* **Simulated Failure**: Introduce a `random.random() < 0.2` check to raise a `ValidationError` (simulating the LLM returning malformed data) in about 20% of calls. Also occasionally simulate a 'Rejected' status even when the content is acceptable, to test the loop back to `generate_content`.
* **Fallback Strategy**: If a `ValidationError` occurs, the node should catch it and set `status` to "Failed" so that the graph routes the content to the `escalate_human_review` node instead of crashing the workflow.
* **Output**: Update `review_feedback` and set `status` to "Approved" or "Rejected".
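
The fallback pattern described above can be sketched without an LLM or Pydantic: validate the raw output, and convert any validation failure into a state update that routes to escalation. This is an illustrative stand-in under stated assumptions; `parse_review` and `review_with_fallback` are hypothetical names, and plain `ValueError` plays the role that `ValidationError` plays in the notebook.

```python
import json

ALLOWED_STATUSES = {"Approved", "Rejected"}


def parse_review(raw: str) -> dict:
    """Parse and validate a raw JSON review; raise ValueError if malformed."""
    data = json.loads(raw)  # json.JSONDecodeError is a ValueError subclass.
    if not isinstance(data, dict):
        raise ValueError("review must be a JSON object")
    if data.get("status") not in ALLOWED_STATUSES:
        raise ValueError(f"invalid status: {data.get('status')!r}")
    if not isinstance(data.get("feedback"), str):
        raise ValueError("feedback must be a string")
    return {"status": data["status"], "feedback": data["feedback"]}


def review_with_fallback(raw: str) -> dict:
    """Return a state update; on validation failure, mark it for escalation."""
    try:
        review = parse_review(raw)
        return {
            "review_feedback": review["feedback"],
            "status": review["status"],
            "error_message": "",
        }
    except ValueError as exc:
        # Fallback: do not crash; hand the problem to the escalation path.
        return {
            "status": "Failed",
            "error_message": f"Review output validation failed: {exc}",
        }


good = review_with_fallback('{"status": "Approved", "feedback": "Clear and on topic."}')
bad = review_with_fallback('{"status": "Maybe", "feedback": "Unsure."}')
```

The important design point is that the validation error is caught inside the node, so the graph keeps running and the router decides what happens next.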

In [None]:
class ReviewOutput(BaseModel):
    status: Literal["Approved", "Rejected"]
    feedback: str = Field(description="Detailed feedback for approval or rejection.")

review_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a content reviewer. Review the following content for quality, relevance, and adherence to the topic. Provide feedback and categorize the content as 'Approved' or 'Rejected'."),
    ("user", "Topic: {topic}\n\nContent: {content}")
])
# No format instructions are injected here: the prompt has no {format_instructions} placeholder,
# and with_structured_output below binds the ReviewOutput schema to the model call.

review_chain = review_prompt | llm.with_structured_output(ReviewOutput)

def review_content_node(state: ContentWorkflowState) -> ContentWorkflowState:
    print("\n--- Node: Reviewing Content ---")
    topic = state["topic"]
    content = state["generated_content"]

    try:
        # Simulate the LLM returning malformed data in about 20% of calls.
        # Validating a deliberately invalid payload raises a genuine ValidationError, and doing so
        # inside the try block means the except branch below catches it and triggers the fallback.
        if random.random() < 0.2: # 20% chance of ValidationError
            print("  Simulating ValidationError during review (malformed output).")
            ReviewOutput(status="Maybe", feedback="Simulated malformed review output.")

        # Occasionally simulate a 'Rejected' output even if content is good, to test retry to generation
        if random.random() < 0.1: # 10% chance of a 'false negative' review
            print("  Simulating a 'Rejected' review to trigger re-generation.")
            return {
                "review_feedback": "Content was unexpectedly rejected during review process, needs re-generation.",
                "status": "Rejected" # Will lead back to generate_content
            }

        review_result = review_chain.invoke({"topic": topic, "content": content})
        print(f"  Review result: {review_result.status} - {review_result.feedback}")
        return {
            "review_feedback": review_result.feedback,
            "status": review_result.status,
            "error_message": ""
        }
    except ValidationError as e:
        print(f"  Review failed validation: {e}. Falling back to human review.")
        return {
            "status": "Failed", # Mark as failed to trigger escalation
            "error_message": f"Review output validation failed: {e}"
        }
    except Exception as e:
        print(f"  An unexpected error occurred during content review: {e}. Falling back to human review.")
        return {
            "status": "Failed", # Mark as failed to trigger escalation
            "error_message": f"Unexpected error in content review: {e}"
        }

print("review_content_node defined!")

### Task 2.3: `escalate_human_review` Node
This node handles situations where automated steps fail or a human needs to intervene. It should capture the reason for escalation.

* **Purpose**: Marks the workflow for human intervention.
* **Output**: Updates `status` to "Failed" and sets `error_message` with a clear reason.

In [None]:
def escalate_human_review_node(state: ContentWorkflowState) -> ContentWorkflowState:
    print("\n--- Node: Escalating to Human Review ---")
    reason = state.get("error_message", "Unknown reason for escalation.")
    if state["status"] == "Rejected":
        reason = "Content rejected by automated review. Needs human re-evaluation."
    elif state["status"] == "Draft" and "retries_left" in state and state["retries_left"] <= 0:
        reason = "Content generation failed after multiple retries. Needs human intervention."

    print(f"  Escalated. Reason: {reason}")
    return {
        "status": "Failed",
        "error_message": reason
    }

print("escalate_human_review_node defined!")

---

## Part 3: Constructing the LangGraph Workflow
Combine your nodes into a directed graph using LangGraph's `StateGraph`, defining conditional logic for retries and fallbacks.

### Task 3.1: Define Edges and Conditional Logic
Define the graph's nodes, entry point, and the conditional edges that determine the flow based on the `status` and `retries_left`.

* **Graph Flow**:
    1.  **Entry Point**: `generate_content`.
    2.  From `generate_content`:
        * If `status` is "Needs Review", go to `review_content`.
        * If `status` is "Draft" and `retries_left > 0`, loop back to `generate_content` (retry).
        * If `status` is "Failed" or `retries_left == 0`, go to `escalate_human_review` (fallback).
    3.  From `review_content`:
        * If `status` is "Approved", `END` the workflow.
        * If `status` is "Rejected", loop back to `generate_content` (re-generation).
        * If `status` is "Failed" (due to review validation error), go to `escalate_human_review` (fallback).
    4.  From `escalate_human_review`: `END` the workflow.
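
Because routing functions are plain Python, you can check the flow above against a table of sample states before building the graph. The sketch below is self-contained and does not import LangGraph: `END_NODE` is a hypothetical stand-in for the `END` constant, and the `demo_` functions mirror the rules in the list rather than any official API.

```python
END_NODE = "__end__"  # Hypothetical stand-in for langgraph's END constant.


def demo_route_after_generate(state: dict) -> str:
    """Route after generation: review, retry, or escalate."""
    if state["status"] == "Needs Review":
        return "review"
    if state["status"] == "Draft" and state["retries_left"] > 0:
        return "generate"
    return "escalate"


def demo_route_after_review(state: dict) -> str:
    """Route after review: finish, re-generate, or escalate."""
    if state["status"] == "Approved":
        return END_NODE
    if state["status"] == "Rejected":
        return "generate"
    return "escalate"


# Each row: (router, sample state, expected next node).
routing_cases = [
    (demo_route_after_generate, {"status": "Needs Review", "retries_left": 3}, "review"),
    (demo_route_after_generate, {"status": "Draft", "retries_left": 2}, "generate"),
    (demo_route_after_generate, {"status": "Draft", "retries_left": 0}, "escalate"),
    (demo_route_after_generate, {"status": "Failed", "retries_left": 3}, "escalate"),
    (demo_route_after_review, {"status": "Approved"}, END_NODE),
    (demo_route_after_review, {"status": "Rejected"}, "generate"),
    (demo_route_after_review, {"status": "Failed"}, "escalate"),
]

routing_results = [router(state) for router, state, _ in routing_cases]
```

Comparing `routing_results` with the expected column catches routing mistakes without spending any LLM calls.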

In [None]:
# Define the graph
workflow = StateGraph(ContentWorkflowState)

# Add nodes
workflow.add_node("generate", generate_content_node)
workflow.add_node("review", review_content_node)
workflow.add_node("escalate", escalate_human_review_node)

# Set entry point
workflow.set_entry_point("generate")

# Define conditional edges from 'generate'
def route_after_generate(state: ContentWorkflowState) -> Literal["review", "generate", "escalate"]:
    if state["status"] == "Needs Review":
        return "review"
    elif state["status"] == "Draft" and state["retries_left"] > 0:
        return "generate" # Retry generation
    else: # status is 'Failed' or retries_left == 0
        return "escalate" # Fallback to human review

workflow.add_conditional_edges(
    "generate",
    route_after_generate,
    {
        "review": "review",
        "generate": "generate",
        "escalate": "escalate",
    },
)

# Define conditional edges from 'review'
def route_after_review(state: ContentWorkflowState) -> str: # Returns "generate", "escalate", or END (a variable such as END is not a valid Literal member for type checkers)
    if state["status"] == "Approved":
        return END
    elif state["status"] == "Rejected":
        return "generate" # Re-generate content
    else: # status is 'Failed' (due to validation error or unexpected error)
        return "escalate" # Fallback to human review

workflow.add_conditional_edges(
    "review",
    route_after_review,
    {
        "generate": "generate",
        "escalate": "escalate",
        END: END,
    },
)

# Define edge from 'escalate' to END
workflow.add_edge("escalate", END)

# Compile the graph
app = workflow.compile()

print("LangGraph workflow compiled!")

# Optional: Visualize the graph (draw_png requires the 'pygraphviz' package and a Graphviz installation)
# try:
#     from IPython.display import Image, display
#     display(Image(app.get_graph().draw_png()))
# except ImportError:
#     print("Install pygraphviz for visualization: pip install pygraphviz")

---

## Part 4: Testing and Tracing the Workflow
Test your workflow with various topics, observing its behavior, retries, fallbacks, and utilizing LangSmith for tracing.

### Task 4.1: Test Cases and Observation
Run the workflow with the following types of inputs. For each run, observe the console output and then examine the corresponding trace in LangSmith.

* **Test Topics**:
    1.  `'The benefits of sustainable agriculture'` (Expected: Approved, if no simulated failure fires)
    2.  `'The future of quantum computing'` (Watch for retries or fallback caused by `generate_content` failures)
    3.  `'Tips for healthy eating'` (Watch for review rejection or fallback caused by `review_content` failures)
    4.  `'The history of artificial intelligence'` (Run multiple times to see different paths due to random failures)

The simulated failures are random and do not depend on the topic, so any run can take any path. Rerun a topic until you observe the behavior you want to document.

For each test case:
1.  Run the workflow by calling `run_workflow_and_print_state`.
2.  **Paste the console output (steps and final state)**.
3.  **Provide a screenshot or a direct link to the LangSmith trace** for that specific run.
4.  **Describe** what happened during the execution (e.g., "Retried 2 times for generation, then approved", "Generated successfully, but review failed validation and escalated").
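
Since the simulated failures come from Python's `random` module, a fixed seed makes the failure pattern repeatable, which helps when you want to reproduce a run for a screenshot. The sketch below shows the idea with an isolated generator; `failure_pattern` is a hypothetical helper, and the 0.3 rate is taken from the generation node's simulated failure.

```python
import random
from typing import List


def failure_pattern(seed: int, calls: int, failure_rate: float = 0.3) -> List[bool]:
    """Return which of `calls` simulated calls would fail for a given seed."""
    rng = random.Random(seed)  # Isolated generator; global state is untouched.
    return [rng.random() < failure_rate for _ in range(calls)]


first_run = failure_pattern(seed=7, calls=10)
second_run = failure_pattern(seed=7, calls=10)
```

In the notebook itself, calling `random.seed(7)` immediately before a run should fix the sequence drawn by the nodes' `random.random()` checks, provided nothing else draws from the module-level generator in between. The LLM's own output remains nondeterministic.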

In [None]:
def run_workflow_and_print_state(topic: str):
    print(f"\n\n=========== Processing Topic: '{topic}' ===========")
    initial_state = {
        "topic": topic,
        "generated_content": "",
        "review_feedback": "",
        "status": "Draft", # Initial status
        "retries_left": 3, # Max retries for generation
        "error_message": ""
    }

    final_state = None
    try:
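        # stream_mode="values" yields the full state after each step; the default mode
        # yields only each node's partial update, keyed by node name.
        for s in app.stream(initial_state, stream_mode="values"):
            print(f"Current state: {s}")
            final_state = s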
        print(f"Final state for '{topic}': {final_state}")
    except Exception as e:
        print(f"An unhandled error occurred during workflow execution: {e}")

# --- Run your test cases ---
test_topics = [
    'The benefits of sustainable agriculture', # Expected: Approved, if no simulated failures
    'The future of quantum computing',          # Expected: Should trigger retry/fallback
    'Tips for healthy eating',                  # Expected: Should trigger review rejection/fallback
    'The history of artificial intelligence'    # Test multiple times to see different paths
]

# Uncomment and run each test case individually to analyze its LangSmith trace clearly
# run_workflow_and_print_state(test_topics[0])
# run_workflow_and_print_state(test_topics[1])
# run_workflow_and_print_state(test_topics[2])
# run_workflow_and_print_state(test_topics[3])

print("Uncomment the 'run_workflow_and_print_state' calls above to execute tests.")
print("Remember to observe console output and check LangSmith for traces.")

### Analysis for Task 4.1:

**Test Case 1: 'The benefits of sustainable agriculture'**

* **Console Output:**
    ```
    [Paste console output here]
    ```
* **LangSmith Trace Link/Screenshot:**
    [Paste LangSmith Trace Link or embed screenshot here]
* **Execution Description:**
    [Describe what happened during this run.]

**Test Case 2: 'The future of quantum computing'**

* **Console Output:**
    ```
    [Paste console output here]
    ```
* **LangSmith Trace Link/Screenshot:**
    [Paste LangSmith Trace Link or embed screenshot here]
* **Execution Description:**
    [Describe what happened during this run, specifically focusing on retries/fallbacks.]

**Test Case 3: 'Tips for healthy eating'**

* **Console Output:**
    ```
    [Paste console output here]
    ```
* **LangSmith Trace Link/Screenshot:**
    [Paste LangSmith Trace Link or embed screenshot here]
* **Execution Description:**
    [Describe what happened during this run, specifically focusing on review failures/fallbacks.]

**Test Case 4: 'The history of artificial intelligence' (Run multiple times)**

* **Run 1 Console Output:**
    ```
    [Paste console output here]
    ```
* **Run 1 LangSmith Trace Link/Screenshot:**
    [Paste LangSmith Trace Link or embed screenshot here]
* **Run 1 Execution Description:**
    [Describe what happened during this run.]

* **Run 2 Console Output:**
    ```
    [Paste console output here]
    ```
* **Run 2 LangSmith Trace Link/Screenshot:**
    [Paste LangSmith Trace Link or embed screenshot here]
* **Run 2 Execution Description:**
    [Describe what happened during this run.]

---

## Part 5: Conclusion and Reflection
In this markdown cell, provide a comprehensive summary of your findings and reflections based on this assignment.

* **Effectiveness of Resilience Patterns**: How effective were the retry and fallback mechanisms in making your LangGraph workflow more robust? Provide concrete examples from your traces.
* **Value of Tracing**: How did LangSmith tracing aid in debugging, understanding, and evaluating your complex LangGraph workflow, especially when failures occurred?
* **Challenges in Implementation**: What were the main challenges you faced in implementing retry, fallback, and integrating tracing?
* **Future Enhancements**: If you were to extend this workflow, what other resilience patterns or observability features would you add (e.g., circuit breakers, alerts, more sophisticated error handling, different fallback options)?
* **Production Readiness**: Discuss what it means for an LLM application to be "production-ready" from the perspective of resilience and observability, based on this assignment.

---

### Submission:
* Ensure all code cells have been executed and their outputs are visible.
* Write all analysis and reflections clearly in markdown cells.
* Provide screenshots or direct links to your LangSmith traces for each test case.
* Save your Jupyter Notebook as `[YourName]_LangGraph_Resilience_Assignment.ipynb`.