In [1]:
import os
import json
from typing import List, Dict
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
import base64

from langchain_community.llms import Ollama

In [2]:
# =========================
# Config / constants
# =========================
CREDENTIALS_FILE = "credentials.json"
TOKEN_FILE = "token.json"
SCOPES = ['https://www.googleapis.com/auth/gmail.modify']

In [3]:
# Job Email Categories
JOB_CATEGORIES = {
    "application_confirmed": {"label": "Applied ‚úì"},
    "interview_request": {"label": "Interview üìÖ"},
    "interview_reminder": {"label": "Interview Reminder ‚è∞"},
    "offer": {"label": "Job Offer üéâ"},
    "rejected": {"label": "Rejected ‚ùå"},
    "assessment": {"label": "Assessment üìù"},
    "follow_up": {"label": "Follow-up üí¨"},
    "job_alert": {"label": "Job Alert üîî"},
    "newsletter": {"label": "Newsletter üì∞"},
    "spam": {"label": "Spam üóëÔ∏è"},
    "uncategorized": {"label": "Other üìß"}
}

In [None]:
# =========================
# Gmail Handler
# =========================


class GmailHandler:
    """
    Gmail API integration with LLM categorization.
    This Class handles authentication, fetching unread emails based on numbers of emails specified,
    and applying labels to emails.
    """

    def __init__(self):
        self.service = None
        self.authenticate()

    def authenticate(self):
        creds = None
        if os.path.exists(TOKEN_FILE):
            creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES)

        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES)
                creds = flow.run_local_server(port=0)
            with open(TOKEN_FILE, 'w') as token:
                token.write(creds.to_json())

        self.service = build('gmail', 'v1', credentials=creds)

    def get_unread_emails(self, max_results=50) -> List[Dict]:
        """Fetch unread emails"""
        try:
            results = self.service.users().messages().list(
                userId='me',
                labelIds=['INBOX'],
                q='is:unread',
                maxResults=max_results
            ).execute()

            messages = results.get('messages', [])
            emails = []
            for msg in messages:
                details = self.get_email_details(msg['id'])
                if details:
                    emails.append(details)
            return emails

        except Exception as e:
            print(f"‚ùå Error fetching emails: {e}")
            return []

    def get_email_details(self, msg_id: str) -> Dict:
        """Fetch full email content"""
        try:
            msg = self.service.users().messages().get(userId='me', id=msg_id, format='full').execute()
            headers = msg['payload']['headers']
            subject = next((h['value'] for h in headers if h['name'] == 'Subject'), '(No Subject)')
            sender = next((h['value'] for h in headers if h['name'] == 'From'), '(Unknown)')

            # Extract body
            body = ""
            if 'parts' in msg['payload']:
                for part in msg['payload']['parts']:
                    if part['mimeType'] == 'text/plain' and 'data' in part['body']:
                        body = base64.urlsafe_b64decode(part['body']['data']).decode('utf-8')
                        break
            elif 'body' in msg['payload'] and 'data' in msg['payload']['body']:
                body = base64.urlsafe_b64decode(msg['payload']['body']['data']).decode('utf-8')

            return {"id": msg_id, "from": sender, "subject": subject, "body": body[:2000]}

        except Exception as e:
            print(f"‚ùå Error reading email: {e}")
            return None

    def apply_label(self, msg_id: str, label_name: str) -> bool:
        """Apply label to an email, creating it if needed"""
        try:
            label_id = self._get_or_create_label(label_name)
            self.service.users().messages().modify(
                userId='me',
                id=msg_id,
                body={'addLabelIds': [label_id], 'removeLabelIds': ['UNREAD']}
            ).execute()
            return True
        except Exception as e:
            print(f"‚ùå Error applying label: {e}")
            return False

    def _get_or_create_label(self, label_name: str) -> str:
        try:
            labels = self.service.users().labels().list(userId='me').execute().get('labels', [])
            for label in labels:
                if label['name'] == label_name:
                    return label['id']
            created = self.service.users().labels().create(
                userId='me',
                body={'name': label_name, 'labelListVisibility': 'labelShow', 'messageListVisibility': 'show'}
            ).execute()
            return created['id']
        except Exception as e:
            print(f"‚ùå Error creating label: {e}")
            return None

    # =========================
    # LLM Categorization
    # =========================
    def categorize_email_llm(self, email: Dict) -> str:
        """Use LLM to determine category key"""
        prompt = f"""
You are a job email classifier. Categorize this email into one of these categories: {list(JOB_CATEGORIES.keys())}.
Email Subject: {email['subject']}
Email Body: {email['body'][:1000]}

Respond ONLY with the category key (no explanations).
"""
        try:
            response = self.llm.invoke(prompt).strip()
            if response in JOB_CATEGORIES:
                return response
        except Exception as e:
            print(f"‚ùå LLM categorization error: {e}")
        return "uncategorized"

    def process_last_emails(self, max_results=50):
        """Fetch last emails, categorize via LLM, and apply labels"""
        emails = self.get_unread_emails(max_results)
        print(f"üì¨ Processing {len(emails)} emails...\n")
        for email in emails:
            category_key = self.categorize_email_llm(email)
            label_name = JOB_CATEGORIES[category_key]['label']
            success = self.apply_label(email['id'], label_name)
            print(f"{'‚úÖ' if success else '‚ùå'} {email['subject'][:50]} -> {label_name}")


In [None]:
# =========================
# RUN
# =========================
if __name__ == "__main__":
    gmail = GmailHandler()
    gmail.process_last_emails(max_results=5)

  self.llm = Ollama(model="llama3.2:3b", base_url="http://localhost:11434", temperature=0)


üì¨ Processing 50 emails...

‚ùå LLM categorization error: HTTPConnectionPool(host='localhost', port=11434): Max retries exceeded with url: /api/generate (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000001B4BA4EAF90>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))
‚ùå Error creating label: <HttpError 403 when requesting https://gmail.googleapis.com/gmail/v1/users/me/labels?alt=json returned "Request had insufficient authentication scopes.". Details: "[{'message': 'Insufficient Permission', 'domain': 'global', 'reason': 'insufficientPermissions'}]">
‚ùå Error applying label: <HttpError 403 when requesting https://gmail.googleapis.com/gmail/v1/users/me/messages/19a74bb98d65a368/modify?alt=json returned "Request had insufficient authentication scopes.". Details: "[{'message': 'Insufficient Permission', 'domain': 'global', 'reason': 'insufficientPermissions'}]">
‚ùå OpenA

KeyboardInterrupt: 