In [None]:
# !pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib
# !pip install --upgrade langchain langchain-google-genai langgraph pydantic
# !pip install --upgrade python-dotenv

In [1]:
import base64
import getpass
import os
import pickle
from email.message import EmailMessage
from typing import Annotated, Dict, List, Literal, Optional, TypedDict

from dotenv import load_dotenv

# LangChain / LangGraph imports
# These let us build the AI agent and the workflow graph
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate
from langgraph.graph import StateGraph, END
from pydantic import BaseModel, Field

# Google API imports
# These are for the authentication flow and building the Gmail service
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build

# Load environment variables
# This looks for a .env file to load API keys (we will create this next if needed)
load_dotenv()

print("Libraries imported successfully!")

  from .autonotebook import tqdm as notebook_tqdm


Libraries imported successfully!


In [2]:
# OAuth scope requested from Google during the login flow.
SCOPES = ['https://www.googleapis.com/auth/gmail.modify']

def get_gmail_service(token_path='token.pickle', credentials_path='credentials.json'):
    """
    Build an authenticated Gmail API service object.

    Cached credentials are loaded from ``token_path`` when that file exists.
    If they are missing or invalid they are either refreshed (when expired and
    a refresh token is available) or obtained through the browser-based OAuth
    flow, and the result is written back to ``token_path``.

    Args:
        token_path: Pickle file used to cache the credentials between runs.
        credentials_path: OAuth client secrets file used to start the login flow.

    Returns:
        The object returned by ``build('gmail', 'v1', credentials=creds)``.

    Raises:
        FileNotFoundError: If a login is required and ``credentials_path``
            does not exist.
    """
    creds = None
    # 1. Check if we have already logged in before (cached token file)
    if os.path.exists(token_path):
        # SECURITY NOTE: pickle.load can execute arbitrary code. Only load a
        # token file that this notebook wrote itself; never one from elsewhere.
        with open(token_path, 'rb') as token:
            creds = pickle.load(token)

    # 2. If not logged in (or token expired), refresh or start the login flow
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            if not os.path.exists(credentials_path):
                raise FileNotFoundError(
                    f"Could not find {credentials_path}! Make sure it is in the folder."
                )

            flow = InstalledAppFlow.from_client_secrets_file(credentials_path, SCOPES)
            # This opens a browser window for you to click "Allow"
            creds = flow.run_local_server(port=0)

        # 3. Save the login so we don't have to do this next time
        with open(token_path, 'wb') as token:
            pickle.dump(creds, token)

    return build('gmail', 'v1', credentials=creds)

# Run the authentication immediately to test it.
# On failure the error is reported and then re-raised: swallowing it would
# leave `gmail_service` undefined and make later cells fail with a confusing
# NameError instead of the real authentication error.
try:
    gmail_service = get_gmail_service()
except Exception as e:
    print(f"Authentication Failed: {e}")
    raise
else:
    print("Gmail Service Authenticated Successfully!")

Please visit this URL to authorize this application: https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=1050417952816-2pebbltou3q6f5htfercljcapi45voh8.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A50075%2F&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fgmail.modify&state=iAFAK6XMIN67A91liHKLTYkjtnHI6p&access_type=offline
Gmail Service Authenticated Successfully!


In [3]:
def fetch_latest_emails(service, max_results=3):
    """
    Fetch the latest unread inbox emails.

    Args:
        service: Gmail API service object (as returned by ``get_gmail_service``).
        max_results: Maximum number of messages to request from the API.

    Returns:
        A list of dicts with the keys ``id``, ``sender``, ``subject`` and
        ``body``. ``body`` holds the message ``snippet`` returned by the API,
        not the full message text. ``sender`` defaults to "Unknown" and
        ``subject`` to "No Subject" when the header is absent.
    """
    # Request list of messages with label 'INBOX' and 'UNREAD'
    results = service.users().messages().list(
        userId='me',
        labelIds=['INBOX', 'UNREAD'],
        maxResults=max_results
    ).execute()

    messages = results.get('messages', [])
    email_data = []

    print(f"Found {len(messages)} unread emails.")

    for msg in messages:
        # Get full details for each message ID
        msg_detail = service.users().messages().get(userId='me', id=msg['id']).execute()

        # Header names are case-insensitive, so index them lower-cased.
        # setdefault keeps the FIRST occurrence of a repeated header.
        header_values = {}
        for header in msg_detail.get('payload', {}).get('headers', []):
            header_values.setdefault(header.get('name', '').lower(), header.get('value', ''))

        email_data.append({
            "id": msg['id'],
            "sender": header_values.get('from', "Unknown"),
            "subject": header_values.get('subject', "No Subject"),
            "body": msg_detail.get('snippet', '')
        })

    return email_data

def create_draft(service, user_id, message_body, recipient, subject):
    """
    Create a reply draft in the user's Gmail account.

    The message is assembled with ``email.message.EmailMessage`` so that
    non-ASCII headers and bodies are encoded correctly, rather than by
    concatenating raw header strings.

    Args:
        service: Gmail API service object.
        user_id: Gmail user id passed to the API (e.g. ``'me'``).
        message_body: Plain-text body of the draft.
        recipient: Value for the ``To`` header.
        subject: Subject of the email being answered. ``"Re: "`` is prepended
            unless the subject already starts with ``re:`` (any case).

    Returns:
        The draft resource returned by the Gmail API.
    """
    # Collapse all whitespace runs (including CR/LF) to single spaces so that
    # text taken from an incoming email cannot inject extra headers.
    clean_recipient = " ".join(str(recipient).split())
    clean_subject = " ".join(str(subject).split())
    if not clean_subject.lower().startswith("re:"):
        clean_subject = f"Re: {clean_subject}"

    message = EmailMessage()
    message["To"] = clean_recipient
    message["Subject"] = clean_subject
    message.set_content(message_body)

    # The Gmail API expects the full RFC 2822 message, base64url-encoded.
    encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode("ascii")

    create_message = {'message': {'raw': encoded_message}}

    # Call the Gmail API to create the draft
    draft = service.users().drafts().create(userId=user_id, body=create_message).execute()

    print(f"Draft created successfully! Draft ID: {draft['id']}")
    return draft

print("Gmail Tools (Fetch & Draft) defined successfully!")

Gmail Tools (Fetch & Draft) defined successfully!


In [23]:
import os
import time
from langchain_google_genai import ChatGoogleGenerativeAI
from pydantic import BaseModel, Field
from typing import TypedDict, Optional

# --- 1. Define the Structure for the AI's Decision ---
class TriageDecision(BaseModel):
    """
    Structured result of classifying one email.

    ``action`` is restricted to the three values the router understands, so
    an unexpected label is rejected when the model output is validated rather
    than silently routed as "ignore".
    """
    action: Literal["reply", "archive", "ignore"] = Field(
        ..., description="The action to take: 'reply', 'archive', or 'ignore'."
    )
    reasoning: str = Field(..., description="Why this action was chosen.")

# --- 2. Define the Graph State ---
class EmailState(TypedDict):
    """
    This dictionary holds all the data that passes through our workflow.

    It is used as the state schema of the StateGraph and as the type of the
    ``state`` argument received by every node and by the router.
    """
    # Inputs (filled in by the caller before the graph is invoked)
    email_id: str  # Gmail message id
    sender: str  # value of the message's From header
    subject: str  # value of the message's Subject header
    body: str  # email text; the run loop passes the Gmail snippet here
    gmail_service: object  # Gmail API service handed to create_draft
    
    # Outputs (written by the nodes)
    triage_decision: Optional[TriageDecision]  # set by node_triage_email
    draft_reply: Optional[str]  # set by node_draft_reply

# --- 3. Initialize the Gemini Models ---

# Read the API key from the environment; load_dotenv() in the first cell
# loads it from a local .env file when one exists.
# SECURITY: never hardcode the key in the notebook. A key that has ever been
# saved or shared inside a notebook must be revoked and regenerated.
if not os.environ.get("GOOGLE_API_KEY"):
    print("⚠️ WARNING: GOOGLE_API_KEY not found. Please set it!")
    # Prompt without echoing, so the key never ends up in the cell output.
    os.environ["GOOGLE_API_KEY"] = getpass.getpass("Enter your GOOGLE_API_KEY: ")

# Single place to change the model alias used by both LLM instances.
# "gemini-flash-latest" was chosen by the original author for free-tier use.
GEMINI_MODEL = "gemini-flash-latest"

# temperature=0 for triage, where the same email should get the same decision.
llm_triage = ChatGoogleGenerativeAI(
    model=GEMINI_MODEL,
    temperature=0
)

# Higher temperature for the writer, to allow more varied reply wording.
llm_writer = ChatGoogleGenerativeAI(
    model=GEMINI_MODEL,
    temperature=0.7
)

print("AI Models and State defined successfully! (Using Flash Latest)")

AI Models and State defined successfully! (Using Flash Latest)


In [24]:
# --- Node 1: The Analyzer (Triage) ---
def node_triage_email(state: EmailState):
    """
    Classify the email held in ``state`` and record the verdict.

    A prompt is built from the state's ``sender``, ``subject`` and ``body``
    and sent to ``llm_triage``, whose output is constrained to the
    ``TriageDecision`` structure.

    Returns:
        ``{"triage_decision": <TriageDecision>}`` as the state update.
    """
    print(f"\n--- Triaging Email from {state['sender']} ---")

    # Instructions plus the email fields, exactly as sent to the model.
    triage_prompt = f"""
    You are an expert email assistant. Your job is to categorize this email.
    
    Sender: {state['sender']}
    Subject: {state['subject']}
    Body: {state['body']}
    
    Decide between these three options:
    1. 'reply': If the email requires a response (questions, meeting requests, direct communication).
    2. 'archive': If it is a newsletter, receipt, or notification that doesn't need a reply.
    3. 'ignore': If it is spam or irrelevant.
    
    Provide a brief reasoning for your choice.
    """

    # Ask for a TriageDecision object instead of free text.
    verdict = llm_triage.with_structured_output(TriageDecision).invoke(triage_prompt)

    print(f"Decision: {verdict.action.upper()} | Reason: {verdict.reasoning}")

    return {"triage_decision": verdict}


# --- Node 2: The Writer (Drafting) ---
def node_draft_reply(state: EmailState):
    """
    Generate a reply with ``llm_writer`` and store it as a Gmail draft.

    Saving the draft is best-effort: if ``create_draft`` raises, the error is
    printed and the generated text is still returned.

    Returns:
        ``{"draft_reply": <generated text>}`` as the state update.
    """
    print("--- Drafting Reply ---")

    # Writer instructions plus the original email, exactly as sent to the model.
    writer_prompt = f"""
    You are a professional email assistant. Write a polite and concise reply to this email:
    
    Sender: {state['sender']}
    Subject: {state['subject']}
    Original Email: {state['body']}
    
    Instructions:
    - Address the sender by name if possible.
    - Be professional but friendly.
    - Sign off with 'Best regards, [Your AI Assistant]'.
    - Do NOT include the subject line in your output, just the body.
    """

    reply_text = llm_writer.invoke(writer_prompt).content

    # Store the reply as a draft via create_draft; a failure here is reported
    # but must not abort the workflow.
    try:
        draft_kwargs = {
            "service": state['gmail_service'],
            "user_id": 'me',
            "message_body": reply_text,
            "recipient": state['sender'],
            "subject": state['subject'],
        }
        create_draft(**draft_kwargs)
        print("Draft saved to Gmail!")
    except Exception as err:
        print(f"Failed to save draft: {err}")

    return {"draft_reply": reply_text}


# --- Node 3: The Archiver ---
def node_archive_email(state: EmailState):
    """
    Placeholder archive step: only logs, nothing is changed in Gmail.

    Returns:
        An empty dict, i.e. no state update.
    """
    archived_id = state['email_id']
    print(f"--- Archiving Email ID: {archived_id} ---")
    print("(Action simulated: In a real app, this would apply a label or move to Trash)")
    return dict()

print("Workflow Nodes defined successfully!")

Workflow Nodes defined successfully!


In [25]:
# 1. Create the graph; EmailState is its state schema
workflow = StateGraph(EmailState)

# 2. Register the three nodes under the names the router refers to
for node_name, node_fn in (
    ("triage", node_triage_email),
    ("draft", node_draft_reply),
    ("archive", node_archive_email),
):
    workflow.add_node(node_name, node_fn)

# 3. Every run starts at the triage node
workflow.set_entry_point("triage")

# 4. Define the Logic (The "Traffic Cop")
def route_email(state: EmailState):
    """
    Pick the next node from the triage decision stored in ``state``.

    "reply" leads to the "draft" node and "archive" to the "archive" node;
    any other action (including "ignore") ends the run.
    """
    next_node_by_action = {"reply": "draft", "archive": "archive"}
    chosen_action = state["triage_decision"].action.lower()
    # Unlisted actions fall through to END, stopping immediately.
    return next_node_by_action.get(chosen_action, END)

# 5. After triage, route_email chooses the branch; the mapping translates its
#    return value into the node to run next (or END).
branch_targets = {"draft": "draft", "archive": "archive", END: END}
workflow.add_conditional_edges("triage", route_email, branch_targets)

# 6. Drafting and archiving are both terminal steps
for finished_node in ("draft", "archive"):
    workflow.add_edge(finished_node, END)

# 7. Compile the graph into a runnable app
app = workflow.compile()

print("Graph Compiled! The AI Email Assistant is ready to run.")

Graph Compiled! The AI Email Assistant is ready to run.


In [27]:

# 1. Pull the latest unread emails from the inbox
print("Checking Inbox...")
emails = fetch_latest_emails(gmail_service, max_results=3)

if not emails:
    print("No unread emails found! Send yourself a test email and run this cell again.")

# 2. Run every email through the compiled graph, one at a time
for email in emails:
    # Inputs come from the fetched email; both outputs start out empty
    initial_state = dict(
        email_id=email['id'],
        sender=email['sender'],
        subject=email['subject'],
        body=email['body'],
        gmail_service=gmail_service,
        triage_decision=None,
        draft_reply=None,
    )

    # recursion_limit caps the number of graph steps as a guard against loops
    final_state = app.invoke(initial_state, config={"recursion_limit": 10})

    print("-" * 50)  # visual separator between emails

Checking Inbox...
Found 3 unread emails.

--- Triaging Email from Tamil_ Baskar <bstamil226@gmail.com> ---
Decision: IGNORE | Reason: The email has a generic subject ('hai') and an empty body, suggesting it is spam or highly irrelevant communication that does not require a response or archiving.
--------------------------------------------------

--- Triaging Email from GitHub <noreply@github.com> ---
Decision: ARCHIVE | Reason: This is an informational notification from GitHub about the features included in the free Copilot plan and does not require a reply.
--- Archiving Email ID: 19bf900cf91f6634 ---
(Action simulated: In a real app, this would apply a label or move to Trash)
--------------------------------------------------

--- Triaging Email from LinkedIn <linkedin@em.linkedin.com> ---
Decision: ARCHIVE | Reason: This is a promotional email from LinkedIn offering a free trial of Sales Navigator. It is a notification that does not require a reply.
--- Archiving Email ID: 19bf8934