In [1]:
import os
import base64
import json
import csv

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from bs4 import BeautifulSoup

In [3]:
# OAuth scopes requested for the Gmail API; the single entry is the read-only scope.
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

def get_gmail_service():
    """Return an authorized Gmail API v1 client.

    Credentials are loaded from ``token.json`` when that file exists. If they
    are missing or invalid they are refreshed, or, when a refresh is not
    possible, obtained again through the local-server OAuth flow driven by
    ``credentials.json``. Fresh credentials are written back to ``token.json``.

    Returns:
        The object produced by ``build('gmail', 'v1', credentials=...)``.
    """
    # Local import keeps this fix self-contained; google-auth is already a
    # dependency of the notebook's import cell.
    from google.auth.exceptions import RefreshError

    creds = None
    # token.json stores the access/refresh tokens from a previous run
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)

    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            try:
                creds.refresh(Request())
            except RefreshError:
                # The stored refresh token was revoked or has expired; discard
                # it and fall through to the interactive flow instead of
                # aborting the notebook.
                creds = None
        if not creds or not creds.valid:
            flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        # Save for next time
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    return build('gmail', 'v1', credentials=creds)

# Build the Gmail client once when this cell runs (this may launch the local OAuth flow);
# list_message_ids and fetch_raw_html read this module-level `service`.
service = get_gmail_service()

In [5]:
def list_message_ids(max_results=200, gmail_service=None):
    """Return up to ``max_results`` message ids from the authenticated mailbox.

    ``users.messages.list`` is paginated and caps ``maxResults`` at 500 per
    call, so a single request silently truncates larger requests. This follows
    ``nextPageToken`` until enough ids are collected or the mailbox is
    exhausted.

    Args:
        max_results: Maximum number of ids to return (an integer).
        gmail_service: Optional Gmail API client; defaults to the module-level
            ``service``.

    Returns:
        A list of message id strings, at most ``max_results`` long.
    """
    client = service if gmail_service is None else gmail_service
    page_cap = 500  # largest page size accepted by users.messages.list

    ids = []
    page_token = None
    while len(ids) < max_results:
        params = {
            'userId': 'me',
            'maxResults': min(max_results - len(ids), page_cap),
        }
        if page_token:
            params['pageToken'] = page_token
        resp = client.users().messages().list(**params).execute()

        batch = resp.get('messages', [])
        ids.extend(m['id'] for m in batch)

        page_token = resp.get('nextPageToken')
        # Stop when the API reports no further pages (or returns an empty page).
        if not page_token or not batch:
            break
    return ids[:max_results]

def _decode_body_data(data):
    """Decode a base64url body string to text, tolerating missing padding and bad bytes."""
    padded = data + '=' * (-len(data) % 4)
    return base64.urlsafe_b64decode(padded).decode('utf-8', errors='replace')


def _find_html_data(parts):
    """Return the body data of the first text/html part, checking each level before its children."""
    for part in parts:
        if part.get('mimeType') == 'text/html':
            data = part.get('body', {}).get('data')
            if data:
                return data
    for part in parts:
        nested = _find_html_data(part.get('parts') or [])
        if nested:
            return nested
    return None


def fetch_raw_html(msg_id, gmail_service=None):
    """Fetch a message and return its HTML body as text.

    The first ``text/html`` part that carries inline data is preferred. Parts
    are searched level by level, so HTML nested inside multipart containers
    (e.g. multipart/mixed wrapping multipart/alternative) is found as well.
    When no such part exists, the top-level body is used.

    Args:
        msg_id: Gmail message id.
        gmail_service: Optional Gmail API client; defaults to the module-level
            ``service``.

    Returns:
        The decoded body, or ``''`` when the message has no inline body data.
        Bytes that are not valid UTF-8 are replaced rather than raising, so one
        oddly encoded email cannot abort a whole export.
    """
    client = service if gmail_service is None else gmail_service
    msg = client.users().messages().get(userId='me', id=msg_id, format='full').execute()
    payload = msg.get('payload', {})

    # Prefer the text/html part, wherever it sits in the MIME tree
    data = _find_html_data(payload.get('parts') or [])
    if not data:
        # Fallback to top-level body
        data = payload.get('body', {}).get('data')
    return _decode_body_data(data) if data else ''

def html_to_text(html):
    """Convert an HTML string to plain text with one non-empty line per row.

    The markup is parsed with ``html.parser``; text is taken from ``<body>``
    when present, otherwise from the whole document. Each line is stripped of
    surrounding whitespace and blank lines are dropped.
    """
    document = BeautifulSoup(html, "html.parser")
    root = document.body or document
    stripped = (piece.strip() for piece in root.get_text(separator="\n").splitlines())
    # filter(None, ...) keeps only the non-empty strings
    return "\n".join(filter(None, stripped))

def fetch_and_clean_text(msg_id):
    """Fetch the message's HTML via ``fetch_raw_html`` and return it cleaned by ``html_to_text``."""
    return html_to_text(fetch_raw_html(msg_id))

In [7]:
# Export step: one CSV row per message, with the cleaned body flattened onto a single line.
message_ids = list_message_ids(200)  # adjust as needed
with open('raw_emails.csv', 'w', newline='', encoding='utf-8') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(['id', 'text'])  # header
    for msg_id in message_ids:
        cleaned = fetch_and_clean_text(msg_id)
        # Collapse the line breaks left by html_to_text so each email is one line of text.
        txt = cleaned.replace('\n', ' ').strip()
        writer.writerow([msg_id, txt])
print("Exported raw_emails.csv")

✅ Exported raw_emails.csv — add your `label` column next


In [9]:
# System prompt shared by every training example. A plain string (not an f-string):
# nothing is interpolated, and a literal brace added later must not be parsed as a field.
prompt = """Categorize the email below into exactly one of the following categories:

Internship — only if it contains an update about the status of my internship applications (e.g., interview invitations, rejections, offers). Application confirmation or acknowledgment emails such as "Thank you for applying", "Thank you for your interest", "Thank you for your application", or "Your application is under review" is considered Irrelevant.

Canvas — only if it announces a grade release for an assignment, quiz, exam, or important information regarding the courses.

Personal — only if the message is personally written to me and is NOT spam, automated, or promotional.

Irrelevant — anything else.

Respond with ONLY the category name."""

# Turn the hand-labelled CSV into chat-format examples, one JSON object per line.
# The input is read and validated first, so a CSV that has not been labelled yet
# fails with a clear message before train.jsonl is opened (and truncated).
with open('raw_emails.csv', newline='', encoding='utf-8') as f_in:
    reader = csv.DictReader(f_in)
    missing_cols = {"text", "label"} - set(reader.fieldnames or [])
    if missing_cols:
        raise KeyError(
            f"raw_emails.csv is missing column(s) {sorted(missing_cols)}; "
            "add a 'label' column before generating train.jsonl"
        )
    rows = list(reader)

skipped = 0
with open('train.jsonl', 'w', encoding='utf-8') as f_out:
    for row in rows:
        # DictReader yields None for cells missing from a short row.
        label = (row["label"] or "").strip()
        if not label:
            # An unlabelled row would become an example with an empty answer.
            skipped += 1
            continue
        example = {
            "messages": [
                {"role": "system",    "content": prompt},
                {"role": "user",      "content": row["text"] or ""},
                {"role": "assistant", "content": label}
            ]
        }
        f_out.write(json.dumps(example) + "\n")
if skipped:
    print(f"Skipped {skipped} row(s) with an empty label")
print("Generated train.jsonl")

Generated train.jsonl
