# Gmail API

In [2]:
%pip install -q --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


### Service setup

In [1]:
import os
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
import base64

In [5]:

# OAuth scopes requested when authorizing the Gmail client.
# If modifying these SCOPES, delete the file token.json: gmail_authenticate()
# loads cached credentials from that file whenever it exists, so a token
# obtained for the previous scopes would otherwise be picked up again.
SCOPES = ['https://www.googleapis.com/auth/gmail.modify']

def gmail_authenticate():
    """Build an authorized Gmail API client, reusing cached credentials when possible.

    Credentials are looked up in this order:
      1. ``token.json`` in the working directory, if it exists and is still valid.
      2. A silent refresh of the cached credentials when they are expired and
         carry a refresh token.
      3. The interactive installed-app OAuth flow driven by ``credentials.json``.

    Whenever new or refreshed credentials are obtained they are written back to
    ``token.json`` so later runs can skip the interactive step.

    Returns:
      A Gmail API ``v1`` service object created with ``build``.
    """
    # Imported locally so this cell stays runnable without touching the import cell.
    from google.auth.exceptions import RefreshError

    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)

    if not creds or not creds.valid:
        refreshed = False
        if creds and creds.expired and creds.refresh_token:
            try:
                creds.refresh(Request())
                refreshed = True
            except RefreshError:
                # The cached refresh token was rejected (e.g. revoked or expired).
                # Fall through to the interactive flow instead of failing until
                # token.json is deleted by hand.
                refreshed = False
        if not refreshed:
            flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        # Persist the new/refreshed credentials for subsequent runs.
        with open('token.json', 'w') as token:
            token.write(creds.to_json())

    return build('gmail', 'v1', credentials=creds)

# Client shared by every cell below. This call may start the interactive OAuth
# flow when no valid cached token is available.
service = gmail_authenticate()


### Get unread messages

In [13]:
def get_unread_messages(service, max_results=10, count_only=False):
    """
    Get unread messages from Gmail.

    Args:
      service: Authorized Gmail API service instance.
      max_results: Maximum number of messages to retrieve (ignored if count_only is True).
      count_only: If True, return only the count of unread messages.

    Returns:
      If count_only is True: integer count of unread messages. The count is
      obtained by paging through the full listing and counting message IDs,
      because the API's 'resultSizeEstimate' field is only an estimate. This
      costs one list request per 500 unread messages.
      Else: list of dicts of unread messages with keys:
      'id', 'subject', 'from', 'date', 'snippet'. Missing headers yield ''.
    """
    messages_api = service.users().messages()

    if count_only:
        total = 0
        page_token = None
        while True:
            params = {'userId': 'me', 'labelIds': ['UNREAD'], 'maxResults': 500}
            if page_token:
                params['pageToken'] = page_token
            page = messages_api.list(**params).execute()
            total += len(page.get('messages', []))
            page_token = page.get('nextPageToken')
            if not page_token:
                return total

    listing = messages_api.list(
        userId='me',
        labelIds=['UNREAD'],
        maxResults=max_results
    ).execute()

    unread_details = []
    for ref in listing.get('messages', []):
        detail = messages_api.get(
            userId='me',
            id=ref['id'],
            format='metadata',
            metadataHeaders=['Subject', 'From', 'Date']
        ).execute()

        # Header names are case-insensitive, so index them by lowercase name.
        # setdefault keeps the first occurrence of a repeated header.
        header_by_name = {}
        for header in detail.get('payload', {}).get('headers', []):
            header_by_name.setdefault(header['name'].lower(), header['value'])

        unread_details.append({
            'id': ref['id'],
            'subject': header_by_name.get('subject', ''),
            'from': header_by_name.get('from', ''),
            'date': header_by_name.get('date', ''),
            'snippet': detail.get('snippet', '')
        })
    return unread_details


In [None]:
# Example: fetch header details and snippets for up to three unread messages.
get_unread_messages(service, max_results=3)

In [14]:
# Example: return only the number of unread messages, without per-message details.
get_unread_messages(service, count_only=True)

201

### Create draft

In [None]:
import base64
from email.message import EmailMessage

def create_draft(service, to, subject, message_text):
    """Create a draft email through the Gmail API.

    Args:
      service: Authorized Gmail API service instance.
      to: Recipient email address as string.
      subject: Subject of the email.
      message_text: Body text of the email.

    Returns:
      The response returned by executing the drafts().create request.
    """
    email_msg = EmailMessage()
    email_msg.set_content(message_text)
    for header_name, header_value in (('To', to), ('From', 'me'), ('Subject', subject)):
        email_msg[header_name] = header_value

    # The Gmail API expects the whole message as a base64url-encoded string.
    encoded = base64.urlsafe_b64encode(email_msg.as_bytes())
    request = service.users().drafts().create(
        userId='me',
        body={'message': {'raw': encoded.decode()}},
    )
    return request.execute()


In [None]:
# Example: each run of this cell issues a drafts().create request for userId 'me',
# i.e. it creates a new draft every time.
create_draft(service, 'a@foodotcom', 'test draft', 'hello, world!')

### Send message

In [None]:
import base64
from email.mime.text import MIMEText
from googleapiclient.errors import HttpError

def send_message(service, to, subject, message_text):
    """Send a plain-text email through the Gmail API.

    Args:
      service: Authorized Gmail API service instance.
      to: Recipient email address as string.
      subject: Subject of the email.
      message_text: Body text of the email.

    Returns:
      The sent message object on success; None if the API call raised HttpError
      (the error is printed).
    """
    try:
        mime = MIMEText(message_text)
        for header_name, header_value in (('to', to), ('from', 'me'), ('subject', subject)):
            mime[header_name] = header_value

        # Gmail expects the full message as a base64url-encoded string.
        encoded = base64.urlsafe_b64encode(mime.as_bytes()).decode()
        request = service.users().messages().send(userId='me', body={'raw': encoded})
        response = request.execute()
    except HttpError as error:
        print(f"An error occurred: {error}")
        return None
    else:
        print(f"Message sent successfully, ID: {response['id']}")
        return response


In [None]:
# NOTE: each run of this cell issues a real messages().send request, so an email
# goes out to the address below every time (including on "Run All").
send_message(service, 'a@foodot.com', 'test', 'hello, world!')

Message sent successfully, ID: 19897038947a15cd


{'id': '19897038947a15cd',
 'threadId': '19897038947a15cd',
 'labelIds': ['SENT']}

### Read message by ID

In [17]:
import base64
def _header_value(headers, name):
    """Return the value of the first header called `name` (case-insensitive), or ''."""
    wanted = name.lower()
    return next((h['value'] for h in headers if h.get('name', '').lower() == wanted), '')


def _decode_body_data(data):
    """Decode a base64url body string to text, tolerating missing padding and bad bytes."""
    padded = data + '=' * (-len(data) % 4)
    return base64.urlsafe_b64decode(padded).decode('utf-8', errors='replace')


def _find_plain_text(part):
    """Depth-first search of a MIME part tree for the first inline text/plain body."""
    data = part.get('body', {}).get('data')
    # Parts with a filename are attachments, not the message body; skip them.
    if part.get('mimeType') == 'text/plain' and not part.get('filename') and data:
        return _decode_body_data(data)
    for child in part.get('parts', []):
        text = _find_plain_text(child)
        if text:
            return text
    return ''


def read_message_by_id(service, message_id):
    """Fetch one message by ID and return its main headers and plain-text body.

    Args:
      service: Authorized Gmail API service instance.
      message_id: ID of the message to fetch.

    Returns:
      Dict with keys 'subject', 'snippet', 'body', 'date', 'from', 'to'.
      Missing headers yield ''. For multipart messages 'body' is the first
      inline text/plain part found anywhere in the part tree ('' if none);
      for single-part messages it is the decoded payload body, whatever its
      MIME type.
    """
    message = service.users().messages().get(userId='me', id=message_id, format='full').execute()
    payload = message.get('payload', {})
    headers = payload.get('headers', [])

    body = ''
    if 'parts' in payload:
        # Multipart: the text may be nested (e.g. mixed -> alternative -> plain).
        body = _find_plain_text(payload)
    elif payload.get('body', {}).get('data'):
        body = _decode_body_data(payload['body']['data'])

    return {
        'subject': _header_value(headers, 'Subject'),
        'snippet': message.get('snippet'),
        'body': body,
        'date': _header_value(headers, 'Date'),
        'from': _header_value(headers, 'From'),
        'to': _header_value(headers, 'To'),
    }


In [None]:
# Example: fetch a single message by its ID. The ID below is hard-coded;
# substitute one returned by get_unread_messages() for your own mailbox.
read_message_by_id(service, '1989704dda818adb')