# Example of how to connect from Python to a Gmail Account

Connect to a test gmail account in order to read email data.

Different guide found [here](https://www.directedignorance.com/blog/gmail-with-python) (with Perplexity [example](https://www.perplexity.ai/search/on-databricks-how-can-i-create-ZLspdEp0Slm5Xoh4yZsOTA#0))

## Gmail authentication (won't work on serverless)

In [0]:
!pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib

In [0]:
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow, Flow
from google.auth.transport.requests import Request
import json
import os
import email.utils
from datetime import datetime, timezone, timedelta
import base64
from googleapiclient.discovery import build

In [0]:
catalog_ = 'users'
schema_ = 'gabriele_albini'
volume_ = 'ec_volume'
spark.sql('USE CATALOG ' + catalog_)
spark.sql('USE SCHEMA ' + schema_)
spark.sql('CREATE VOLUME IF NOT EXISTS ' + volume_)

CREDENTIALS_PATH = "/Volumes/"+catalog_+'/'+schema_+'/'+volume_+"/credentials_ga_webapp.json"
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
TMP_TOKEN_FILE = "/Volumes/"+catalog_+'/'+schema_+'/'+volume_+"/ga_tmp_token.json"

In [0]:
# Read from credentials file and return as JSON
def load_credentials(path_ = CREDENTIALS_PATH):
    with open(path_, 'r') as f:
        return json.load(f)

# Authenticate to Google Manually
def gmail_authenticate_manual(CREDENTIALS_PATH, TMP_TOKEN_FILE):
  existing_creds_ = False

  # Try to retrieve credentials from token file
  try:
    creds = Credentials.from_authorized_user_file(TMP_TOKEN_FILE, SCOPES)
    if creds and not creds.expired:
      existing_creds_ = True
      print(":: Reading credentials from token file")
  except:
    pass

  if not existing_creds_:
    #Load credentials from Google Json file
    print(":: Re-generating credentials")
    credentials_ = load_credentials()

    #Manual OAuth flow
    flow = Flow.from_client_config(credentials_, scopes=SCOPES)
    flow.redirect_uri = 'http://localhost:8080/'  # Must match your OAuth client redirect URI
    auth_url, _ = flow.authorization_url(prompt='consent')

    print("Go to this URL in your browser; after signing in, the URL will fail, however extract the &code= from the generated URL:\n")
    print(auth_url)

    code = input("---> URL code: ")
    flow.fetch_token(code=code)

    #Generate credentials
    creds = flow.credentials

    # Save credentials to token file
    os.makedirs(os.path.dirname(TMP_TOKEN_FILE), exist_ok=True)
    with open(TMP_TOKEN_FILE, 'w') as token:
      token.write(creds.to_json())
  
  return creds

In [0]:
access_ = gmail_authenticate_manual(CREDENTIALS_PATH, TMP_TOKEN_FILE)

## Fetch data from Gmail

In [0]:
def get_email_messages_since(service, since_datetime, max_results=100):
    # Convert datetime to Unix timestamp in seconds
    since_ts = int(since_datetime.timestamp())
    query = f"after:{since_ts}"

    emails = []
    response = service.users().messages().list(userId='me', q=query, maxResults=max_results).execute()

    messages = response.get('messages', [])

In [0]:
# Read Emails
def get_email_messages_since(service, since_day, since_month, since_year = 2025, max_results=100):
    # Convert datetime to Unix timestamp in seconds
    since_datetime = datetime(since_year, since_month, since_day, 0, 0, 0, tzinfo=timezone(timedelta(hours=2)))
    since_ts = int(since_datetime.timestamp())
    query = f"after:{since_ts}"
    response = service.users().messages().list(userId='me', q=query, maxResults=max_results).execute()
    messages = response.get('messages', [])
    emails = []

    def extract_email_address(from_header):
    # Parse the email address from the From header 'John Doe <john.doe@example.com>' -> 'john.doe@example.com'
      parsed = email.utils.parseaddr(from_header)
      return parsed[1]  # parsed is (name, email)

    def get_body(parts):
        for part in parts:
            if part['mimeType'] == 'text/plain' and 'data' in part['body']:
                data = part['body']['data']
                return base64.urlsafe_b64decode(data).decode('utf-8')
            elif part['mimeType'].startswith('multipart'):
                return get_body(part.get('parts', []))
        return None

    for msg in messages:
        msg_data = service.users().messages().get(userId='me', id=msg['id'], format='full').execute()
        payload = msg_data['payload']
        headers = payload.get('headers', [])

        email_data = {'received_timestamp': None, 'from': None, 'subject': None, 'body': None}

        # Extract headers
        for header in headers:
            if header['name'] == 'From':
                email_data['from'] = extract_email_address(header['value'])
            elif header['name'] == 'Subject':
                email_data['subject'] = header['value']
            elif header['name'] == 'Date':
                # Parse date string to datetime object
                try:
                    parsed_date = email.utils.parsedate_to_datetime(header['value'])
                    email_data['received_timestamp'] = parsed_date.isoformat()
                except Exception:
                    email_data['received_timestamp'] = None

        # Extract body (handle multipart or plain)
        if 'parts' in payload:
            body = get_body(payload['parts'])
        else:
            body_data = payload.get('body', {}).get('data')
            body = base64.urlsafe_b64decode(body_data).decode('utf-8') if body_data else None

        email_data['body'] = body
        emails.append(email_data)

    return emails

In [0]:
# From above access, Build Gmail API service
service_ = build('gmail', 'v1', credentials=access_)
emails = get_email_messages_since(service_, since_day=25, since_month=3, since_year = 2025)

if emails:
  spark_emails = spark.createDataFrame(emails)
  display(spark_emails)
else:
  spark_emails = None
  print("No emails found.")