In [8]:
# Connect LangChain to Anthropic
import os
from dotenv import load_dotenv

# Load environment variables from a .env file in the project root if present
load_dotenv()

api_key = os.getenv("ANTHROPIC_API_KEY")
if not api_key:
    raise RuntimeError(
        "Set ANTHROPIC_API_KEY in your environment or in a .env file."
    )

from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage

# Initialize the Anthropic chat model via LangChain
llm = ChatAnthropic(model="claude-3-5-sonnet-latest", temperature=0)

# Minimal test call
response = llm.invoke([HumanMessage(content="Reply with 'pong' if you can hear me.")])
print(response.content)

pong


In [9]:
!pip install --upgrade google-auth google-auth-oauthlib google-auth-httplib2 google-api-python-client



In [10]:
import os
import pickle
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build

# Gmail API scope
SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]

def get_gmail_service():
    creds = None
    # Reuse saved token if available
    if os.path.exists("token.pickle"):
        with open("token.pickle", "rb") as token:
            creds = pickle.load(token)

    # If no valid creds, go through login flow
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                "client_secret.json", SCOPES
            )
            creds = flow.run_local_server(port=0)

        # Save creds for next run
        with open("token.pickle", "wb") as token:
            pickle.dump(creds, token)

    return build("gmail", "v1", credentials=creds)

In [12]:
service = get_gmail_service()

# Get last 5 emails
results = service.users().messages().list(userId="me", maxResults=5).execute()
messages = results.get("messages", [])

for msg in messages:
    msg_data = service.users().messages().get(userId="me", id=msg["id"]).execute()
    snippet = msg_data.get("snippet", "")
    print(f"Message ID: {msg['id']}")
    print(f"Snippet: {snippet}")
    print("="*50)

Message ID: 198bfc95b6f0aff8
Snippet: Hi
Message ID: 198bfa32087d8dda
Snippet: You&#39;ve taken the first step! Complete your setup to start appearing on Google Search and Maps today. ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍


In [None]:
def extract_email_data(message):
    """Extract key information from a Gmail message."""
    headers = message['payload']['headers']
    
    # Extract key headers
    subject = next((h['value'] for h in headers if h['name'] == 'Subject'), 'No Subject')
    from_email = next((h['value'] for h in headers if h['name'] == 'From'), 'Unknown Sender')
    to_email = next((h['value'] for h in headers if h['name'] == 'To'), 'Unknown Recipient')
    date = next((h['value'] for h in headers if h['name'] == 'Date'), 'Unknown Date')
    
    # Extract message content
    content = extract_message_part(message['payload'])
    
    # Create email data object
    email_data = {
        "from_email": from_email,
        "to_email": to_email,
        "subject": subject,
        "page_content": content,
        "id": message['id'],
        "thread_id": message['threadId'],
        "send_time": date
    }
    
    return email_data

In [40]:
import base64
import re
from email.utils import parsedate_to_datetime

def get_email_data(service, message_id):
    """Extract email data from Gmail API message"""
    message = service.users().messages().get(userId="me", id=message_id).execute()
    headers = message['payload'].get('headers', [])
    
    # Helper functions
    def get_header(name):
        return next((h['value'] for h in headers if h['name'].lower() == name.lower()), "")
    
    def decode_body(data):
        return base64.urlsafe_b64decode(data + '==').decode('utf-8', errors='ignore')
    
    def extract_content(payload):
        if payload.get('body', {}).get('data'):
            return decode_body(payload['body']['data'])
        
        # Prefer plain text over HTML to avoid duplicates
        plain_content = ""
        html_content = ""
        
        for part in payload.get('parts', []):
            if part.get('mimeType') == 'text/plain' and part.get('body', {}).get('data'):
                plain_content = decode_body(part['body']['data'])
            elif part.get('mimeType') == 'text/html' and part.get('body', {}).get('data'):
                html_content = decode_body(part['body']['data'])
        
        # Return plain text if available, otherwise HTML
        return plain_content if plain_content else html_content
    
    def clean_email(email_str):
        match = re.search(r'<([^>]+)>|([^\s<>]+@[^\s<>]+)', email_str)
        return match.group(1) or match.group(2) if match else email_str.strip()
    
    def parse_date(date_str):
        try:
            return parsedate_to_datetime(date_str).strftime('%Y-%m-%d %H:%M:%S')
        except:
            return date_str
    
    # Extract data
    content = extract_content(message['payload'])
    content = re.sub(r'<[^>]+>', '', content)  # Remove HTML tags
    content = re.sub(r'\s+', ' ', content).strip()  # Clean whitespace
    
    return {
        "from_email": clean_email(get_header('From')),
        "to_email": clean_email(get_header('To')),
        "subject": get_header('Subject'),
        "page_content": content,
        "id": message['id'],
        "thread_id": message['threadId'],
        "send_time": parse_date(get_header('Date'))
    }

def print_email(email_data, max_content_length=200):
    """Print email data in a nice formatted way"""
    print("=" * 60)
    print(f"📧 EMAIL ID: {email_data['id']}")
    print("=" * 60)
    print(f"From:    {email_data['from_email']}")
    print(f"To:      {email_data['to_email']}")
    print(f"Subject: {email_data['subject']}")
    print(f"Date:    {email_data['send_time']}")
    print(f"Thread:  {email_data['thread_id']}")
    print("-" * 60)
    print("Content:")
    content = email_data['page_content']
    if max_content_length and len(content) > max_content_length:
        print(f"{content[:max_content_length]}...")
        print(f"[Content truncated - {len(content)} total characters]")
    else:
        print(content)
    print("=" * 60)
    print()

def print_all_emails(emails_array, max_emails=None):
    """Print all emails in the array with optional limit"""
    total_emails = len(emails_array)
    emails_to_print = emails_array[:max_emails] if max_emails else emails_array
    
    print(f"📬 SHOWING {len(emails_to_print)} OF {total_emails} EMAILS")
    print()
    
    for i, email_data in enumerate(emails_to_print, 1):
        print(f"[EMAIL {i}/{len(emails_to_print)}]")
        print_email(email_data, max_content_length=None)

In [41]:
service = get_gmail_service()

In [None]:
# Get list of messages
messages = service.users().messages().list(userId="me").execute()

# Create empty array to store email data
emails_array = []

# Loop through each message and extract data
for message_info in messages.get('messages', []):
    email_data = get_email_data(service, message_info["id"])
    emails_array.append(email_data)

print_all_emails(emails_array)

# Start of creating agent tools

In [43]:
from typing import Literal
from datetime import datetime
from pydantic import BaseModel
from langchain_core.tools import tool

In [45]:
# remember the agent is filling in the arguements 
@tool
def write_emails(to: str, subject: str, content: str) -> str:
    # for an agent to use a tool it needs a description
    """Write and send an email."""
    # place holder for a real app requeset

    return f"Email sent to {to} with subject '{subject} and content '{content}''"

@tool
def schedule_meeting(attendees: list[str], subject: str, duration_minutes: int, preferred_day: datetime, start_time: int) -> str:
    """ Schedule a calendar meeting """
    # real agent will check the calendar and schedule
    date_str = preferred_day.strftime("%A, %B %d, %Y")

    return f"Meeting '{subject}' scheduled on {date_str} at {start_time} for {duration_minutes} minutes with {len(attendees)} attendees"

@tool
# a day will be passed in by the agent 
def check_calendar_availability(day: str) -> str:
    """Check calendar availability for a given day."""
    # Placeholder response - in real app would check actual calendar
    return f"Available times on {day}: 9:00 AM, 2:00 PM, 4:00 PM" # the agent passes in the day and then we will return the needed
                                                                  # information back to the agent.

# Building the Agent assistant

Using the combination of a deterministic router and undeterministic agent with tools to create the workflow.

- e-mail input -> e-mail_router["ignore", "notify", "respond"] -> respond -> ReAct Agent

Creating the router agent triage. The router only focuses on the decision on what to do based on the content of the e-mail.

While the agent handles the response.

In [47]:
from langgraph.graph import MessagesState

# Here we are defining the state of our agent. MessagesState contains the messages: key value with its reducer to add messages to the
# array.
# also added keys email to keep track of the current email sent through the workflow
# and lastly the decision of the router
class State(MessagesState):
    # add a specific dict to handle the single email
    email:dict
    classification_decision: Literal["ignore", "respond", "notify"]

# The Triage Router Node

defined by a python function, we will use Pydanic structured outputs that gets passed as json to the llm to get the correct structured output

In [52]:
from pydantic import BaseModel, Field
from langchain.chat_models import init_chat_model
from langgraph.graph import END
from langgraph.types import Command
from typing import List, Any
import json
import html2text

# Utils to parse the email 

In [51]:
def parse_email(email_input: dict) -> dict:
    """Parse an email input dictionary.

    Args:
        email_input (dict): Dictionary containing email fields:
            - author: Sender's name and email
            - to: Recipient's name and email
            - subject: Email subject line
            - email_thread: Full email content

    Returns:
        tuple[str, str, str, str]: Tuple containing:
            - author: Sender's name and email
            - to: Recipient's name and email
            - subject: Email subject line
            - email_thread: Full email content
    """
    return (
        email_input["author"],
        email_input["to"],
        email_input["subject"],
        email_input["email_thread"],
    )

def format_email_markdown(subject, author, to, email_thread, email_id=None):
    """Format email details into a nicely formatted markdown string for display
    
    Args:
        subject: Email subject
        author: Email sender
        to: Email recipient
        email_thread: Email content
        email_id: Optional email ID (for Gmail API)
    """
    id_section = f"\n**ID**: {email_id}" if email_id else ""
    
    return f"""

**Subject**: {subject}
**From**: {author}
**To**: {to}{id_section}

{email_thread}

---
"""

In [55]:
# Email assistant triage prompt 
triage_system_prompt = """

< Role >
Your role is to triage incoming emails based upon instructs and background information below.
</ Role >

< Background >
{background}. 
</ Background >

< Instructions >
Categorize each email into one of three categories:
1. IGNORE - Emails that are not worth responding to or tracking
2. NOTIFY - Important information that worth notification but doesn't require a response
3. RESPOND - Emails that need a direct response
Classify the below email into one of these categories.
</ Instructions >

< Rules >
{triage_instructions}
</ Rules >
"""

# Email assistant triage user prompt 
triage_user_prompt = """
Please determine how to handle the below email thread:

From: {author}
To: {to}
Subject: {subject}
{email_thread}"""

# Default triage instructions 
default_triage_instructions = """
Emails that are not worth responding to:
- Marketing newsletters and promotional emails
- Spam or suspicious emails
- CC'd on FYI threads with no direct questions

There are also other things that should be known about, but don't require an email response. For these, you should notify (using the `notify` response). Examples of this include:
- Team member out sick or on vacation
- Build system notifications or deployments
- Project status updates without action items
- Important company announcements
- FYI emails that contain relevant information for current projects
- HR Department deadline reminders
- Subscription status / renewal reminders
- GitHub notifications

Emails that are worth responding to:
- Direct questions from team members requiring expertise
- Meeting requests requiring confirmation
- Critical bug reports related to team's projects
- Requests from management requiring acknowledgment
- Client inquiries about project status or features
- Technical questions about documentation, code, or APIs (especially questions about missing endpoints or features)
- Personal reminders related to family (wife / daughter)
- Personal reminder related to self-care (doctor appointments, etc)
"""

# Default background information 
default_background = """ 
I'm Jovanny, part of the Q-Start automation company.
"""

In [56]:
from rich.markdown import Markdown
Markdown(triage_system_prompt)

In [57]:
Markdown(triage_user_prompt)

In [58]:
Markdown(default_background)

In [59]:
Markdown(default_triage_instructions)

In [61]:
class RouterSchema(BaseModel):
    """Analyze the unread email and route it according to its content."""

    reasoning: str = Field(
        description="Step-by-step reasoning behind the classification."
    )
    classification: Literal["ignore", "respond", "notify"] = Field(
        description="The classification of an email: 'ignore' for irrelevant emails, "
        "'notify' for important information that doesn't need a response, "
        "'respond' for emails that need a reply",
    )

# Initialize the LLM for use with router / structured output
llm = init_chat_model("openai:gpt-4.1", temperature=0.0)
llm_router = llm.with_structured_output(RouterSchema) 

def triage_router(state: State) -> Command[Literal["response_agent", "__end__"]]:
    """Analyze email content to decide if we should respond, notify, or ignore."""
    
    author, to, subject, email_thread = parse_email(state["email_input"])
    system_prompt = triage_system_prompt.format(
        background=default_background,
        triage_instructions=default_triage_instructions
    )

    user_prompt = triage_user_prompt.format(
        author=author, to=to, subject=subject, email_thread=email_thread
    )

    result = llm_router.invoke(
        [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
    )
    
    if result.classification == "respond":
        print("📧 Classification: RESPOND - This email requires a response")
        goto = "response_agent"
        update = {
            "messages": [
                {
                    "role": "user",
                    "content": f"Respond to the email: \n\n{format_email_markdown(subject, author, to, email_thread)}",
                }
            ],
            "classification_decision": result.classification,
        }
        
    elif result.classification == "ignore":
        print("🚫 Classification: IGNORE - This email can be safely ignored")
        goto = END
        update =  {
            "classification_decision": result.classification,
        }
        
    elif result.classification == "notify":
        print("🔔 Classification: NOTIFY - This email contains important information")
        # For now, we go to END. But we will add to this later!
        goto = END
        update = {
            "classification_decision": result.classification,
        }
        
    else:
        raise ValueError(f"Invalid classification: {result.classification}")
    return Command(goto=goto, update=update)

OpenAIError: The api_key client option must be set either by passing api_key to the client or by setting the OPENAI_API_KEY environment variable