# LangGraph Essentials V1: Build A Workflow
<div style="display:flex; align-items:flex-start;">
  <img src="./Assets/EmailWorkflow.png" width="600" style="margin-right:15px;"/>
</div>

In [1]:
import uuid
from typing import Literal, TypedDict

## Step 1: Define state schema

In [2]:
# Define the structure for email classification.
# classify_intent hands this class to llm.with_structured_output(), so the
# model's reply comes back as a dict with these keys.
# NOTE(review): documented with `#` comments rather than a docstring because a
# docstring may be forwarded to the model as part of the schema — confirm
# before converting.
class EmailClassification(TypedDict):
    # Kind of request; "complex" sends the draft to human_review (see write_response)
    intent: Literal["question", "bug", "billing", "feature", "complex"]
    # Priority; "high" and "critical" send the draft to human_review (see write_response)
    urgency: Literal["low", "medium", "high", "critical"]
    # Subject of the email; combined with intent when building the search query
    topic: str
    # Summary of the email, as requested in the classification prompt
    summary: str

# Define the state schema for the application
class EmailAgentState(TypedDict):
    """State shared by the nodes of the email workflow graph.

    The raw-email fields are supplied when the graph is invoked; the other
    fields are written by nodes as the workflow runs.
    """
    # Raw email data (provided by the caller at invoke time)
    email_content: str
    sender_email: str
    email_id: str

    # Classification result, written by classify_intent
    classification: EmailClassification | None

    # Bug Tracking: ticket identifier written by bug_tracking
    ticket_id: str | None

    # Raw search/API results (kept unformatted; write_response formats them on demand)
    search_results: list[str] | None  # List of raw document chunks
    customer_history: dict | None  # Raw customer data from CRM; write_response reads its 'tier' key

    # Generated content: draft written by write_response, optionally replaced in human_review
    draft_response: str | None


## Step 2: Define Nodes and Edges

In [6]:
from typing import Literal

from langchain_openai import ChatOpenAI
from langgraph.graph import END, START, StateGraph
from langgraph.types import Command, interrupt

# Shared chat model: classify_intent uses it for structured classification,
# write_response uses it to draft the reply
llm = ChatOpenAI(model="gpt-5-mini")

def read_email(state: EmailAgentState) -> dict:
    """Extract and parse email content.

    Placeholder node: the email fields are already present in ``state``
    because they are passed in when the graph is invoked.

    Returns:
        An empty state update (nothing is changed).
    """
    # In production, this would connect to your email service
    # email_content is being passed in when the graph is invoked
    # Return an explicit empty update so the result matches the declared
    # ``dict`` return type, the same way send_reply does.
    return {}
    
def classify_intent(state: EmailAgentState) -> dict:
    """Ask the LLM for the email's intent, urgency, topic, and summary.

    Returns:
        A state update holding the structured result under ``classification``.
    """
    # Bind the EmailClassification schema so the reply is a dict, not free text
    classifier = llm.with_structured_output(EmailClassification)

    # The prompt is assembled per call from state; it is never stored in state
    prompt = f"""
    Analyze this customer email and classify it:

    Email: {state['email_content']}
    From: {state['sender_email']}

    Provide classification, including intent, urgency, topic, and summary.
    """

    # Keep the whole classification together as one dict in state
    return {"classification": classifier.invoke(prompt)}

class SearchAPIError(Exception):
    """Recoverable failure raised by the knowledge-base search backend."""


def _search_knowledge_base(query: str) -> list[str]:
    """Return raw document chunks matching *query* (placeholder backend)."""
    # Implement your search logic here; raise SearchAPIError for failures the
    # workflow should survive.
    return [
        "--Search_Result_1--",
        "--Search_Result_2--",
        "--Search_Result_3--",
    ]


def search_documentation(state: EmailAgentState) -> dict:
    """Search knowledge base for relevant information.

    Builds a query from the classification's intent and topic and stores the
    raw (unformatted) results in state.

    Returns:
        A state update with ``search_results``: the raw document chunks, or a
        single explanatory string when the search backend reports a
        recoverable error.
    """
    # The schema allows classification to be None, and `.get(key, {})` would
    # hand that None straight back; treat "missing" and "None" alike.
    classification = state.get('classification') or {}
    query = f"{classification.get('intent', '')} {classification.get('topic', '')}".strip()

    try:
        # Store raw search results, not formatted text
        search_results = _search_knowledge_base(query)
    except SearchAPIError as e:
        # For recoverable search errors, store error and continue
        search_results = [f"Search temporarily unavailable: {e}"]

    return {"search_results": search_results}  # Store raw results or error


def bug_tracking(state: EmailAgentState) -> dict:
    """Create or update bug tracking ticket.

    Returns:
        A state update with ``ticket_id`` set to ``"BUG_"`` followed by a
        freshly generated UUID.
    """
    # Create ticket in your bug tracking system
    # uuid4 has to be *called*: interpolating the function object itself
    # produces "BUG_<function uuid4 at 0x...>" rather than a unique id.
    ticket_id = f"BUG_{uuid.uuid4()}"  # Would be created via API

    return {"ticket_id": ticket_id}

def write_response(state: EmailAgentState) -> Command[Literal["human_review", "send_reply"]]:
    """Generate a draft reply and route based on urgency and intent.

    The prompt is built from the email, its classification, and whichever of
    ``search_results`` / ``customer_history`` are present in state.

    Returns:
        A Command that stores the draft under ``draft_response`` and goes to
        ``human_review`` when urgency is high/critical or intent is complex,
        otherwise to ``send_reply``.
    """
    # The schema allows classification to be None, and `.get(key, {})` would
    # hand that None straight back; treat "missing" and "None" alike.
    classification = state.get('classification') or {}

    # Format context from raw state data on-demand
    context_sections = []

    if state.get('search_results'):
        # Format search results for the prompt
        formatted_docs = "\n".join(f"- {doc}" for doc in state['search_results'])
        context_sections.append(f"Relevant documentation:\n{formatted_docs}")

    if state.get('customer_history'):
        # Format customer data for the prompt
        context_sections.append(f"Customer tier: {state['customer_history'].get('tier', 'standard')}")

    # Build the prompt with formatted context
    # (chr(10) is "\n"; a backslash is not allowed inside an f-string
    # expression before Python 3.12)
    draft_prompt = f"""
    Draft a response to this customer email:
    {state['email_content']}

    Email intent: {classification.get('intent', 'unknown')}
    Urgency level: {classification.get('urgency', 'medium')}

    {chr(10).join(context_sections)}

    Guidelines:
    - Be professional and helpful
    - Address their specific concern
    - Use the provided documentation when relevant
    - Be brief
    """

    response = llm.invoke(draft_prompt)

    # Determine if human review is needed based on urgency and intent
    needs_review = (
        classification.get('urgency') in ['high', 'critical'] or
        classification.get('intent') == 'complex'
    )

    # Route to the appropriate next node
    if needs_review:
        goto = "human_review"
        print("Needs approval")
    else:
        goto = "send_reply"

    return Command(
        update={"draft_response": response.content},  # Store only the raw response
        goto=goto
    )

def human_review(state: EmailAgentState) -> Command[Literal["send_reply", END]]:
    """Pause for human review using interrupt and route based on decision.

    The interrupt payload carries the email, the current draft, and the
    classification's urgency and intent for the reviewer.

    Returns:
        A Command going to ``send_reply`` when the decision has a truthy
        ``approved`` value (using ``edited_response`` as the draft if the
        reviewer supplied one); otherwise a Command going to END.
    """
    # Everything above interrupt() runs again when the graph resumes, so keep
    # it limited to side-effect-free reads of state.
    # `or {}` also covers a classification explicitly set to None.
    classification = state.get('classification') or {}
    # Read the draft once and defensively, so a missing key cannot raise while
    # building the fallback for an approval that carries no edited text.
    current_draft = state.get('draft_response', "")

    human_decision = interrupt({
        "email_id": state['email_id'],
        "original_email": state['email_content'],
        "draft_response": current_draft,
        "urgency": classification.get('urgency'),
        "intent": classification.get('intent'),
        "action": "Please review and approve/edit this response"
    })

    # Now process the human's decision
    if human_decision.get("approved"):
        return Command(
            update={"draft_response": human_decision.get("edited_response", current_draft)},
            goto="send_reply"
        )

    # Rejection means human will handle directly
    return Command(update={}, goto=END)

def send_reply(state: EmailAgentState) -> dict:
    """Deliver the drafted reply (placeholder: prints a preview).

    Returns:
        An empty state update.
    """
    # Integrate with email service; for now show the first 60 characters
    preview = state['draft_response'][:60]
    print(f"Sending reply: {preview}...")
    return {}


## Step 3: Build the graph

In [9]:
from langgraph.checkpoint.memory import MemorySaver

# Create the graph
workflow = StateGraph(EmailAgentState)

# Register each node under the name that edges and Command(goto=...) refer to
for _node_name, _node_fn in (
    ("read_email", read_email),
    ("classify_intent", classify_intent),
    ("search_documentation", search_documentation),
    ("bug_tracking", bug_tracking),
    ("write_response", write_response),
    ("human_review", human_review),
    ("send_reply", send_reply),
):
    workflow.add_node(_node_name, _node_fn)

# Static edges only: write_response and human_review choose their successor at
# run time by returning Command(goto=...), so they get no outgoing edge here
for _source, _target in (
    (START, "read_email"),
    ("read_email", "classify_intent"),
    ("classify_intent", "search_documentation"),
    ("classify_intent", "bug_tracking"),
    ("search_documentation", "write_response"),
    ("bug_tracking", "write_response"),
    ("send_reply", END),
):
    workflow.add_edge(_source, _target)

# Compile with a checkpointer so an interrupted thread can be resumed later
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

In [None]:

# Render the compiled graph inline. The compiled graph in this notebook is
# `app` (there is no `graph` variable), and Image/display must be imported.
from IPython.display import Image, display

display(Image(app.get_graph().draw_mermaid_png()))

<div style="display:flex; align-items:flex-start;">
  <img src="./Assets/NunoGraph.png" width="300" style="margin-right:15px;"/>
</div>

In [17]:
# Test with an urgent billing issue
# Only the raw-email fields are supplied; the nodes fill in the rest of the state
initial_state = {
    "email_content": "I was charged twice for my subscription! This is urgent!",
    "sender_email": "customer@example.com",
    "email_id": "email_123",
}

# Run with a thread_id for persistence
# (the same config is reused in a later cell to resume this thread)
config = {"configurable": {"thread_id": "customer_123"}}
result = app.invoke(initial_state, config)

Needs approval


In [18]:
# Inspect which keys came back; '__interrupt__' is present when the run paused for review
result.keys()

dict_keys(['email_content', 'sender_email', 'email_id', 'classification', 'ticket_id', 'search_results', 'draft_response', '__interrupt__'])

In [19]:
# The graph will pause at human_review
print(f"Draft ready for review: {result['draft_response'][:60]}...\n")

# When ready, provide human input to resume
# ("approved" is the key human_review checks; no "edited_response" is given,
# so the existing draft is kept)
human_response = Command(
    resume={
        "approved": True,
    }
)

# Resume execution on the same thread by reusing the config from the first invoke
final_result = app.invoke(human_response, config)
print("Email sent successfully!")


Draft ready for review: Subject: Re: Duplicate charge — we’ll resolve this ASAP

Hi,...

Sending reply: Subject: Re: Duplicate charge — we’ll resolve this ASAP

Hi,...
Email sent successfully!


In [20]:
# Sample emails to push through the workflow, one thread per email
email_content = [
    "I was charged two times for my subscription! This is urgent!",
    "I was wondering if this was available in blue?",
    "Can you tell me how long the sale is on?",
    "The tire won't stay on the car!",
    "My subscription is going to end in a few months, what is the new rate?"
]
# Results of runs that paused for human review, each tagged with its thread_id
needs_approval = []

for i, content in enumerate(email_content):

    initial_state = {
        "email_content": content,
        "sender_email": "customer@example.com",
        "email_id": f"email_{i}",
    }
    print(f"{initial_state['email_id']}: ", end="")

    # Fresh thread per email. Use the string form of the UUID: thread ids are
    # conventionally strings, which keeps the config usable with checkpointers
    # other than the in-memory one.
    thread_id = str(uuid.uuid4())
    config = {"configurable": {"thread_id": thread_id}}
    result = app.invoke(initial_state, config)
    if "__interrupt__" in result:
        # Remember the thread so the paused run can be resumed later
        result['thread_id'] = thread_id
        needs_approval.append(result)


email_0: Needs approval
email_1: Sending reply: Hello — thanks for reaching out.

Could you tell me which pr...
email_2: Sending reply: Hi,

Thanks for reaching out. Which sale do you mean — the o...
email_3: Needs approval
email_4: Sending reply: Hi [Name],

Thanks for checking — I can help with that. To c...


>LangSmith Trace - [Start-to-End](https://smith.langchain.com/public/3898d0d0-c934-4681-b325-7c4e1e88a826/r)  
>LangSmith Trace - [Interrupt](https://smith.langchain.com/public/c23a3aed-cfa8-42aa-8f1e-78f58941aecd/r)

In [21]:
# Number of runs that paused for human review in the batch above
len(needs_approval)

2

In [22]:
# Approve every paused run, resuming each one on the thread it was started on
for pending in needs_approval:
    print(f"{pending['email_id']}: ", end="")

    config = {"configurable": {"thread_id": pending["thread_id"]}}
    human_response = Command(resume={"approved": True})
    return_result = app.invoke(human_response, config)

email_0: Sending reply: Hi — I’m sorry about the duplicate charge and I’ll get this ...
email_3: Sending reply: Thanks — I’m very sorry and I understand how urgent this is....


[LangSmith Trace](https://smith.langchain.com/public/36e1b840-69ca-4b45-b31f-c71a598e92d2/r)

In [119]:
# Scratch cell: check what type uuid.uuid4() returns
foo= uuid.uuid4()
type(foo)

uuid.UUID

In [120]:
# Scratch cell: last four characters of the UUID's hex string
print(foo.hex[-4:])

c45a
