In [None]:
import sys, os
import dotenv
from datetime import datetime, timedelta
from typing import Literal

dotenv.load_dotenv()

True

In [12]:
from pydantic import BaseModel, Field

from email_assistant.utils import parse_email, format_email_markdown
from email_assistant.prompts import triage_system_prompt, triage_user_prompt, default_triage_instructions, default_background

from langchain.chat_models import init_chat_model
from langchain_openai import AzureChatOpenAI
from langchain.tools import tool
from langgraph.graph import END
from langgraph.types import Command

from langgraph.graph import MessagesState

# Define tools

In [4]:
@tool
def write_email(to: str, subject: str, content: str) -> str:
    """Write and send an email"""
    # TODO: Implement email sending logic here
    return f"Email sent to {to} with subject '{subject}' and content: {content}."

@tool
def schedule_meeting(
    attendees: list[str], subject: str, duration_minutes: int, preferred_day: datetime, start_time: int
) -> str:
    """Schedule a calendar meeting."""
    # TODO: Implement meeting scheduling logic here
    date_str = preferred_day.strftime("%A, %B %d, %Y")
    return f"Meeting '{subject}' scheduled on {date_str} at {start_time} for {duration_minutes} minutes with attendees: {', '.join(attendees)}."

@tool
def check_calendar_availability(day: str) -> str:
    """Check calendar availability for a given day."""
    # TODO: Implement calendar availability checking logic here
    return f"Available times on {day}: 9:00 AM, 2:00 PM, 4:00 PM."

@tool
class Done(BaseModel):
    """E-mail has been sent."""
    done: bool


# Define Router

Router handles the user query triage.

It is important to consider the information to track over the agent graph. This information is the **State**. We use langchain prebuilt `MessageState`, which is a dictionary with a `messages` key that appends messages returned by nodes as its update logic. We extend the `MessageState` object and adds custom keys to suit this application.

In [None]:
class State(MessagesState):
    email_input: dict
    classification_decision: Literal['ignore', 'respond', 'notify']

 ## Initialize LLM

In [None]:
model_name = os.environ["AZURE_OPENAI_DEPLOYMENT"]

azure_model = init_chat_model(
    model=f"azure_openai:{model_name}",                  # target model family
    model_provider="azure_openai",               # tell LC to use Azure OpenAI
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    api_key=os.environ["AZURE_OPENAI_API_KEY"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    azure_deployment=model_name,  # deployment name in Azure
    temperature=0.5,
    timeout=10,
    max_tokens=1000,
)

# Test the model connection
response = azure_model.invoke([
    ("human", "Hello, Azure OpenAI")
])

print(response)

content='Hello! How can I assist you today?' additional_kwargs={'refusal': None} response_metadata={'token_usage': {'completion_tokens': 10, 'prompt_tokens': 12, 'total_tokens': 22, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_provider': 'openai', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_f97eff32c5', 'id': 'chatcmpl-D4DpVkBASAzvPUzqMJ45tsoyF7Oyg', 'prompt_filter_results': [{'prompt_index': 0, 'content_filter_results': {'hate': {'filtered': False, 'severity': 'safe'}, 'jailbreak': {'filtered': False, 'detected': False}, 'self_harm': {'filtered': False, 'severity': 'safe'}, 'sexual': {'filtered': False, 'severity': 'safe'}, 'violence': {'filtered': False, 'severity': 'safe'}}}], 'finish_reason': 'stop', 'logprobs': None, 'content_filter_results': {'hate': {'filtered': False, 'severity': 'safe'}, 'p

## Router LLM

Define the structured output of the router LLM, and assign the schema to the router LLM using `with_structured_output` function. The LLM output will be coerced to follow the output schema.

In [None]:
class RouterSchema(BaseModel):
    """Anallyze the unread email and route it according to its content."""
    reasoning: str = Field(
        description="Step-by-step reasoning behind the classification"
    )
    classification: Literal["ignore", "respond", "notify"] = Field(
        description="""
The classification of an email: ''
- 'ignore' for irrelevant emails.
- 'notify' for important information that doesn't need a response.
- 'respond' for emails that need a reply.
"""
    )

In [15]:
# Create a structured output LLM with the defined schema
router_llm = azure_model.with_structured_output(RouterSchema)

## Router Node

The router is a node in the LangGraph agent graph. We define the router as a function that takes the `MessagesState` defined earlier, and returns appropriate node to move onto next. In this work, the router node can optionally move to the `response_agent` node or the `END` node.

In [None]:
def triage_router(state: State) -> Command[Literal["response_agent", END]]:
    """Analyze email content to decide if we should respond, notify, or ignore."""

    author, to, subject, email_thread = parse_email(state["email_input"])
    system_prompt = triage_system_prompt.format(
        background=default_background,
        triage_instructions=default_triage_instructions
    )

    # construct the user prompt using the email details
    user_prompt = triage_user_prompt.format(
        author=author,
        to=to,
        subject=subject,
        email_thread=email_thread
    )
    # invoke the LLM with the system and user prompts
    result = router_llm.invoke_messages([
        ("system", system_prompt),
        ("user", user_prompt)
    ])
    # The result is automatically parsed into the RouterSchema
    if result.classification == "respond":
        print("Classification: RESPOND - This email requires a response.")
        # Next node to go to
        goto = "response_agent"
        # Update agent state
        update = {
            "messages": [
                {
                    "role": "user",
                    "content": f""
                }
            ]
        }

We look at the triage system prompt.

In [18]:
print(triage_system_prompt)



< Role >
Your role is to triage incoming emails based upon instructs and background information below.
</ Role >

< Background >
{background}. 
</ Background >

< Instructions >
Categorize each email into one of three categories:
1. IGNORE - Emails that are not worth responding to or tracking
2. NOTIFY - Important information that worth notification but doesn't require a response
3. RESPOND - Emails that need a direct response
Classify the below email into one of these categories.
</ Instructions >

< Rules >
{triage_instructions}
</ Rules >



We look at the email triage instruction.

In [19]:
print(default_triage_instructions)


Emails that are not worth responding to:
- Marketing newsletters and promotional emails
- Spam or suspicious emails
- CC'd on FYI threads with no direct questions

There are also other things that should be known about, but don't require an email response. For these, you should notify (using the `notify` response). Examples of this include:
- Team member out sick or on vacation
- Build system notifications or deployments
- Project status updates without action items
- Important company announcements
- FYI emails that contain relevant information for current projects
- HR Department deadline reminders
- Subscription status / renewal reminders
- GitHub notifications

Emails that are worth responding to:
- Direct questions from team members requiring expertise
- Meeting requests requiring confirmation
- Critical bug reports related to team's projects
- Requests from management requiring acknowledgment
- Client inquiries about project status or features
- Technical questions about documen