Mass Email Application for Event Communication
===========================================

This script sends personalized mass emails to event participants while keeping a
personal touch. It is particularly useful for event organizers who need to
communicate with large groups of participants while reducing the risk of tripping
spam filters and keeping communication professional.

Key Features:
------------
- Reads participant data from a CSV file (First Name, Last Name, Email)
- Converts DOCX into HTML email templates that support personalized greetings
- Handles multiple participants from the same family/email address
- Includes attachment support for documents and images
- Uses Gmail API for reliable email delivery
- Uses exponential backoff with random jitter to handle rate limits
- Validates email addresses and logs failed deliveries
- Automatic retry logic for failed sends
- Handles Google OAuth2 authentication with token persistence

Dependencies:
------------
- pandas: Data processing
- google-auth-oauthlib: Google API authentication
- google-auth: Google authentication
- google-api-python-client: Gmail API interface
- pypandoc: Document conversion (DOCX to HTML); requires the pandoc program to be installed
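Before running the notebook, a short check can confirm that the packages above are importable. This is a sketch: the import names below are assumed from the package list above (pip names such as google-api-python-client differ from the module you import), and the pandoc check assumes pypandoc uses a pandoc found on the PATH rather than a bundled copy.

```python
import importlib.util
import shutil

# Assumed import names for the pip packages listed above
REQUIRED_MODULES = ["pandas", "google_auth_oauthlib", "google.auth",
                    "googleapiclient", "pypandoc"]


def missing_modules(names):
    """Return the names in `names` that cannot be imported in this environment."""
    missing = []
    for name in names:
        try:
            if importlib.util.find_spec(name) is None:
                missing.append(name)
        except ModuleNotFoundError:
            # find_spec("a.b") imports "a" first and raises if "a" is absent
            missing.append(name)
    return missing


print("Missing modules:", missing_modules(REQUIRED_MODULES) or "none")
# pypandoc shells out to the pandoc executable, which is installed separately
print("pandoc on PATH:", shutil.which("pandoc") is not None)
```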

Required Files:
-------------
- credentials.json: Google API credentials
    * If needed, obtain it from the Google Cloud Console (console.cloud.google.com) by creating an OAuth client ID for a desktop app
- CSV file with participant data:
    * Should NOT contain a header row
    * Must contain exactly 3 columns in this order:
        1. First Name
        2. Last Name
        3. Email Address
    * Example: "John,Smith,john.smith@email.com"
- DOCX template for email content:
    * Must contain the placeholder "{person or persons}" where names should be inserted
    * This placeholder will be replaced with either a single name or a comma-separated
      list of names for family groups
    * Example: "Hi {person or persons}, Thank you for participating..."
- Any attachments to be included
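Because the CSV has no header and a fixed column order, a quick pre-flight check can catch malformed rows before they reach pandas. `check_participant_csv` is a hypothetical helper built on the standard `csv` module, and treating a first row whose third field has no '@' as a header is a heuristic assumption, not a rule from the script.

```python
import csv
import io


def check_participant_csv(lines):
    """Return (row_number, problem) pairs for a headerless First,Last,Email CSV."""
    problems = []
    for row_no, row in enumerate(csv.reader(lines), start=1):
        if not row:
            continue  # skip blank lines
        if len(row) != 3:
            problems.append((row_no, f"expected 3 columns, found {len(row)}"))
            continue
        first, _last, email = (field.strip() for field in row)
        if row_no == 1 and "@" not in email:
            # Heuristic: a first row without an address is probably a header
            problems.append((row_no, "looks like a header row"))
        elif not first or not email:
            problems.append((row_no, "missing first name or email"))
    return problems


sample = io.StringIO("John,Smith,john.smith@email.com\nMary,Smith\n")
print(check_participant_csv(sample))  # [(2, 'expected 3 columns, found 2')]
```

To run it on the real file, open it with `newline=''` and pass the file object, e.g. `with open(csv_file_path, newline='') as f: print(check_participant_csv(f))`.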

Usage:
-----
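1. Configure the variables in the configuration cell:
   - custom_subject: Email subject line (set to None to derive it from the DOCX filename)
   - last_name_in_subject_line: Whether to append each family's last name(s) to the subject
   - csv_file_path: Path to participant data
   - docx_template_path: Path to email template
   - attachments: List of files to attach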

2. The script will:
   - Process the participant data
   - Convert the DOCX template to HTML
   - Authenticate with Gmail
   - Send personalized emails to all participants:
     * Personalizes greeting with individual or family names (e.g., "Hi John" or "Hi Mary, Bob, and Jane")
     * Optionally appends the family's last name(s) to the subject line (e.g., "Ada's Forever 5K Thank You - Smith" or "Ada's Forever 5K Thank You - Smith and Smith-Johnson")
     * Groups multiple participants sharing the same email address into a single email
   - Handle rate limiting automatically
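The greeting and subject formats described above can be sketched as two small functions. They are illustrative (`format_salutation` and `format_subject` are hypothetical names) and follow the examples given: a serial comma for three or more first names, and distinct last names sorted and joined with "and".

```python
def format_salutation(first_names):
    """Join first names as in the examples: 'John', 'Mary and Bob', 'Mary, Bob, and Jane'."""
    if len(first_names) == 1:
        return first_names[0]
    if len(first_names) == 2:
        return f"{first_names[0]} and {first_names[1]}"
    # Three or more names: serial (Oxford) comma before the final "and"
    return f"{', '.join(first_names[:-1])}, and {first_names[-1]}"


def format_subject(base_subject, last_names, include_last_names):
    """Return base_subject, optionally followed by the family's distinct last names."""
    if not include_last_names:
        return base_subject
    return f"{base_subject} - {' and '.join(sorted(set(last_names)))}"


print("Hi " + format_salutation(["Mary", "Bob", "Jane"]))  # Hi Mary, Bob, and Jane
print(format_subject("Ada's Forever 5K Thank You",
                     ["Smith", "Smith-Johnson", "Smith"], True))
# Ada's Forever 5K Thank You - Smith and Smith-Johnson
```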

Note: The script includes safety features like:
- Email validation
- Rate limiting
- Error logging
- Test email capability

In [None]:
import pandas as pd
from collections import defaultdict
import os
import pickle
import pypandoc
import base64
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import mimetypes
import time
import random
import re

Note that formatting can be a little tricky when converting DOCX to HTML. For lists, either use real bullets in Google Docs or type spaces and '-' yourself, and don't let autocorrect change them. In Word, use Shift+Enter instead of Enter to create a "soft line break" rather than a new paragraph; this tells Word (and subsequently pypandoc) that you want a line break but not a new paragraph.
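One way to check how these choices survived conversion is to count tags in the generated HTML. The sketch below uses only Python's standard `html.parser`; `TagCounter` is a hypothetical helper, and it assumes that pandoc turns soft line breaks into `<br />` and real bullets into `<li>` items, while a bullet typed or autocorrected as plain text shows up as a bullet glyph at the start of a `<p>`.

```python
from collections import Counter
from html.parser import HTMLParser


class TagCounter(HTMLParser):
    """Count opening tags and paragraphs that start with a typed bullet glyph."""

    BULLET_CHARS = ("•", "◦", "▪")  # assumed glyphs for typed/autocorrected bullets

    def __init__(self):
        super().__init__()  # convert_charrefs=True turns "&bull;" into "•"
        self.counts = Counter()
        self.typed_bullets = 0
        self._at_paragraph_start = False

    def handle_starttag(self, tag, attrs):
        # Self-closing tags such as <br /> also arrive here via handle_startendtag
        self.counts[tag] += 1
        if tag == "p":
            self._at_paragraph_start = True

    def handle_endtag(self, tag):
        if tag == "p":
            self._at_paragraph_start = False

    def handle_data(self, data):
        text = data.strip()
        if self._at_paragraph_start and text:
            if text.startswith(self.BULLET_CHARS):
                self.typed_bullets += 1
            self._at_paragraph_start = False  # only the first text in a <p> counts


sample = ("<p>Hi {person or persons},<br />\nThanks!</p>\n"
          "<ul>\n<li>One</li>\n<li>Two</li>\n</ul>\n<p>• typed bullet</p>")
counter = TagCounter()
counter.feed(sample)
counter.close()
print(dict(counter.counts), counter.typed_bullets)
```

In the notebook, the same counter could be fed `email_template` once the conversion cell has produced it.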

In [None]:
def mask_email(email):
    """Mask email address for privacy in cell outputs."""
    if '@' not in email:
        return "***invalid***"
    return f"{email[:3]}***@{email.split('@')[1]}"

In [None]:
# Files to Load and Variables to check

# Paths are plain Python strings, so don't backslash-escape spaces as you would in a shell

# CSV file with all email addresses: three columns (First Name, Last Name,
#    Email Address) with NO header row.
#    Get a file's full path in a terminal with: realpath foo.bar
csv_file_path = '/Users/joelswenson/Documents/Adas_spark/email_app/listserv_cleaned_4-22-25.csv'

# Path to template email in docx format
docx_template_path = '/Users/joelswenson/Documents/Adas_spark/email_app/Deadline_Extended.docx'

# Use custom subject if provided, otherwise use filename
custom_subject = "Deadline Extended - Sign-up Today!"  # Overrides the filename-based subject; set custom_subject = None to use the filename
base_subject = (custom_subject 
                if custom_subject 
                else os.path.splitext(os.path.basename(docx_template_path))[0])
if base_subject.startswith('Adas'):  # Optional automatic fix
    base_subject = base_subject.replace('Adas', "Ada's", 1)

# Use last names in subject line? E.g. Should it say "base_subject - last_name" 
#     like "Ada's 2024 5K - Swenson"
last_name_in_subject_line = False  # True appends the family last name(s) to the subject line

# Specify the paths to the attachments
attachments = []

# To add attachments, uncomment the block below. You can attach multiple files by
# listing multiple filepaths, separated by commas
#attachments = [
#    '/Users/joelswenson/Documents/Adas_spark/email_app/2025_Ada_5K_flyer.png'
#]
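A pre-flight check on the configured paths can fail fast instead of partway through a run. `missing_files` is a hypothetical helper; it checks each path exactly as written, which also catches shell habits such as `~` or backslash-escaped spaces that Python's `open()` does not interpret.

```python
import os


def missing_files(paths):
    """Return the entries of `paths` that are not existing files.

    Paths are checked exactly as written: "~" is not expanded and backslash
    escapes are not undone, matching how open() will treat them.
    """
    return [p for p in paths if not os.path.isfile(p)]


# In the notebook (names from the configuration cell above):
# problems = missing_files([csv_file_path, docx_template_path, *attachments])
# if problems:
#     raise FileNotFoundError(f"Check these paths: {problems}")
```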

In [None]:
print(f"Current subject format: '{base_subject}{' - [Last Name]' if last_name_in_subject_line else ''}'")

In [None]:
print("Remember that the Ada's Spark signature is not automatically added to emails sent through the API, so put it in the Google Doc if you want it included. There is a way to add it programmatically, but it isn't worth the lift and is hard to make look good.")

In [None]:
attachments

In [None]:
def is_valid_email(email):
    """Basic email format validation."""
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return bool(re.match(pattern, email))

In [None]:
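# Load the CSV file of email addresses. Reading every column as text (with empty
# cells kept as "" instead of NaN) keeps .strip() below from failing on blank cells.
data = pd.read_csv(csv_file_path, header=None, names=["First Name", "Last Name", "Email"],
                   dtype=str, keep_default_na=False)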

In [None]:
# Group by email address
# Use a list rather than a set so two family members can share a first name
families = defaultdict(list)
last_names = defaultdict(set)  # Use a set to avoid duplicate last names in one family
invalid_emails = []

for _, row in data.iterrows():
    email_clean = row["Email"].strip().lower()
    if not is_valid_email(email_clean):
        invalid_emails.append({
            'email': row["Email"],
            'name': f"{row['First Name']} {row['Last Name']}"
        })
        continue
    families[email_clean].append(row["First Name"])
    last_names[email_clean].add(row["Last Name"])  # Collect all last names

# Deals with the somewhat common case of a family having the same email address but different last names
# Convert sets to formatted strings like "Smith and Johnson"
last_names = {email: " and ".join(sorted(names)) for email, names in last_names.items()}

In [None]:
# After processing all emails, report any invalid ones
if invalid_emails:
    print("\nWarning: The following emails were invalid and will be skipped:")
    for entry in invalid_emails:
        print(f"- {entry['name']}: {mask_email(entry['email'])}")
    
    # Also save the full (unmasked) list to a file for follow-up
    with open('invalid_emails.txt', 'w') as f:
        f.write("Invalid emails found:\n")
        for entry in invalid_emails:
            f.write(f"{entry['name']}: {entry['email']}\n")

In [None]:
# Automatically generate the HTML template path from the DOCX path (same name, .html extension)
template_path = os.path.splitext(docx_template_path)[0] + '.html'

print(f"Docx Template Path: {docx_template_path}")
print(f"HTML Template Path: {template_path}")

# Convert docx template email to HTML
pypandoc.convert_file(docx_template_path, 'html', outputfile=template_path)

# Read the HTML email template
with open(template_path, 'r', encoding='utf-8') as file:  # pandoc writes UTF-8
    email_template = file.read()

# Check for required placeholder
if "{person or persons}" not in email_template:
    raise ValueError("Error: Template is missing required placeholder "
                     "'{person or persons}'. Please check the template file.")


In [None]:
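# Generate personalized emails
import html  # escape names so characters like '&' or '<' don't break the HTML

personalized_emails = {}
for email_clean, names in families.items():
    names = [html.escape(str(name).strip()) for name in names]
    if len(names) == 1:
        salutation = names[0]
    elif len(names) == 2:
        salutation = f"{names[0]} and {names[1]}"
    else:
        # Serial comma, e.g. "Mary, Bob, and Jane"
        salutation = f"{', '.join(names[:-1])}, and {names[-1]}"

    personalized_email = email_template.replace("{person or persons}", salutation)
    personalized_emails[email_clean] = personalized_email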

In [None]:
'''
# Display an example of the personalized email
example_email = list(personalized_emails.items())[-1]
masked_example = (mask_email(example_email[0]), example_email[1])
masked_example
'''

In [None]:
# Authenticate and authorize with the Gmail API

In [None]:
# If modifying these SCOPES, delete the file token.pickle.
SCOPES = ['https://www.googleapis.com/auth/gmail.send']

In [None]:
# Function to handle token refresh error
def handle_refresh_error(token_path):
    if os.path.exists(token_path):
        os.remove(token_path)
        print(f"Deleted the expired or invalid token file: {token_path}")
    else:
        print(f"Token file {token_path} does not exist.")

In [None]:
def authenticate_gmail_api(max_retries=3, current_retry=0):
    """Authenticates and returns a Gmail API service instance."""
    creds = None
    token_path = 'token.pickle'
    try:
        # The file token.pickle stores the user's access and refresh tokens, and is
        # created automatically when the authorization flow completes for the first
        # time.
        if os.path.exists(token_path):
            with open(token_path, 'rb') as token:
                creds = pickle.load(token)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    'credentials.json', SCOPES)
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run
            with open(token_path, 'wb') as token:
                pickle.dump(creds, token)
    except Exception as e:
        print(f"An error occurred during authentication: {e}")
        if current_retry < max_retries:
            handle_refresh_error(token_path)
            return authenticate_gmail_api(max_retries, current_retry + 1)
        else:
            raise RuntimeError(
                f"Failed to authenticate after {max_retries} retries") from e

    service = build('gmail', 'v1', credentials=creds)
    return service

In [None]:
# Authenticate and authorize
service = authenticate_gmail_api()

In [None]:
def include_attachments(message, attachments):
    """Function to attach files to the email message."""
    for attachment in attachments:
        content_type, encoding = mimetypes.guess_type(attachment)
        if content_type is None or encoding is not None:
            content_type = 'application/octet-stream'
        main_type, sub_type = content_type.split('/', 1)
        
        with open(attachment, 'rb') as file:
            msg = MIMEBase(main_type, sub_type)
            msg.set_payload(file.read())
            encoders.encode_base64(msg)
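            # Passing filename as a keyword lets the email package quote it
            # (needed for names with spaces) and encode non-ASCII names
            msg.add_header(
                'Content-Disposition', 'attachment',
                filename=os.path.basename(attachment)
            )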
            message.attach(msg)
    return message
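The sketch below isolates how a single attachment part is built and shows why passing `filename=` as a keyword to `add_header` matters: the `email` package then quotes names containing spaces (and RFC 2231-encodes non-ASCII names), whereas a hand-built f-string header leaves them unquoted. `make_attachment_part` is a hypothetical name, and the example file name is made up.

```python
import os
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart


def make_attachment_part(path, payload, main_type="application", sub_type="octet-stream"):
    """Build a base64-encoded attachment part named after the file at `path`."""
    part = MIMEBase(main_type, sub_type)
    part.set_payload(payload)
    encoders.encode_base64(part)
    # Keyword form: the email package adds quotes (or RFC 2231 encoding) as needed
    part.add_header("Content-Disposition", "attachment",
                    filename=os.path.basename(path))
    return part


message = MIMEMultipart()
part = make_attachment_part("/tmp/2025 Ada 5K flyer.png", b"\x89PNG", "image", "png")
message.attach(part)
print(part["Content-Disposition"])  # attachment; filename="2025 Ada 5K flyer.png"
print(part.get_filename())          # 2025 Ada 5K flyer.png
```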

In [None]:
def send_email(service, to, subject, body, attachments=None):
    try:
        # Create the email message
        message = MIMEMultipart()
        message['to'] = to
        message['subject'] = subject

        # Attach the email body
        msg = MIMEText(body, 'html')
        message.attach(msg)

        # Attach any files
        if attachments:
            message = include_attachments(message, attachments)

        raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode()

        # Send the email
        message = (service.users().messages().send(userId="me", body={'raw': raw_message})
                   .execute())
        print('Message Id: %s' % message['id'])
        return message
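    except HttpError:
        # Re-raise so send_emails_with_backoff can tell retryable HTTP errors
        # (403/429/500/503) from permanent ones instead of retrying everything
        raise
    except Exception as e: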
        print(f'An error occurred while sending email to {mask_email(to)}: {e}')
        # Log failed email to file
        with open('failed_emails.txt', 'a') as f:
            f.write(f"{to}: {str(e)}\n")
        return None
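To inspect exactly what `send_email` would hand to the Gmail API without sending anything, the message can be built and decoded locally. This is a sketch with hypothetical helper names (`build_raw_message`, `preview_raw_message`); it mirrors the MIME construction above without attachments and uses only the standard library.

```python
import base64
from email import message_from_bytes
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


def build_raw_message(to, subject, html_body):
    """Build the base64url string passed to the Gmail API as body={'raw': ...}."""
    message = MIMEMultipart()
    message["to"] = to
    message["subject"] = subject
    message.attach(MIMEText(html_body, "html"))
    return base64.urlsafe_b64encode(message.as_bytes()).decode()


def preview_raw_message(raw):
    """Decode a raw string back into (to, subject, html_body) without sending it."""
    parsed = message_from_bytes(base64.urlsafe_b64decode(raw))
    html_part = next(p for p in parsed.walk() if p.get_content_type() == "text/html")
    charset = html_part.get_content_charset() or "utf-8"
    body = html_part.get_payload(decode=True).decode(charset)
    return parsed["to"], parsed["subject"], body


raw = build_raw_message("john.smith@email.com", "Ada's Forever 5K Thank You",
                        "<p>Hi Mary, Bob, and Jané</p>")
print(preview_raw_message(raw))
```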

In [None]:
###### Email sending is below, pay attention!

In [None]:
# Constants for the exponential backoff strategy used to avoid hitting rate limits
BASE_DELAY = 5  # Initial delay in seconds
MAX_DELAY = 300  # Maximum backoff time
MAX_RETRIES = 10  # Maximum retries before giving up

In [None]:
def get_backoff_time(attempt, base_delay=BASE_DELAY, max_delay=MAX_DELAY):
    """Calculate exponential backoff time with jitter."""
    delay = min(base_delay * (2 ** attempt), max_delay)
    jitter = random.uniform(-0.1 * delay, 0.1 * delay)
    return delay + jitter

def log_failed_email(email, error):
    """Log failed email attempts to a file."""
    with open('failed_emails.log', 'a') as f:
        timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
        f.write(f"{timestamp} - {email}: {str(error)}\n")
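With BASE_DELAY = 5, MAX_DELAY = 300 and MAX_RETRIES = 10, the nominal delays are easy to tabulate. `send_emails_with_backoff` increments `attempt` before calling `get_backoff_time`, so the first wait corresponds to attempt 1 (10 seconds, not 5). The sketch below reproduces that formula without jitter; `backoff_schedule` is a hypothetical helper whose defaults copy those constants.

```python
def backoff_schedule(max_retries=10, base_delay=5, max_delay=300):
    """Jitter-free delays for attempts 1..max_retries, using get_backoff_time's formula."""
    return [min(base_delay * (2 ** attempt), max_delay)
            for attempt in range(1, max_retries + 1)]


schedule = backoff_schedule()
print(schedule)       # [10, 20, 40, 80, 160, 300, 300, 300, 300, 300]
print(sum(schedule))  # 1810
```

So a recipient that keeps failing is retried for roughly 1,810 seconds (about 30 minutes, give or take up to 10% from jitter) before being skipped, which is worth keeping in mind when choosing a `caffeinate` timeout.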

In [None]:
def send_emails_with_backoff(service, personalized_emails, last_names, base_subject, attachments=None):
    """Send emails with exponential backoff for rate limits and transient failures."""
    for email_clean, personalized_email in personalized_emails.items():
        attempt = 0
        while attempt < MAX_RETRIES:
            try:
                print(f"Sending email to: {mask_email(email_clean)}")
                last_name = last_names[email_clean]
                if last_name_in_subject_line:
                    subject = f"{base_subject} - {last_name}"
                else:
                    subject = base_subject
                
                result = send_email(service, email_clean, subject, personalized_email, attachments)
                
                if result:  # Success
                    print(f"Email sent successfully to {mask_email(email_clean)}")
                    time.sleep(random.uniform(1, 3))  # Small random delay between sends
                    break  # Exit loop on success
                else:
                    raise Exception("Unknown email sending failure")
            except HttpError as e:
                if e.resp.status in [403, 429, 500, 503]:  # Handle rate limits and server errors
                    attempt += 1
                    delay = get_backoff_time(attempt)
                    print(f"Rate limit or server error ({e.resp.status}) for {mask_email(email_clean)},"
                        f" backing off for {delay:.2f} seconds.")
                    time.sleep(delay)
                else:
                    print(f"Non-retryable error for {mask_email(email_clean)}: {e}")
                    log_failed_email(email_clean, f"Non-retryable error: {e}")
                    break  # Do not retry non-retryable errors
            except Exception as e:
                print(f"Unexpected error sending to {mask_email(email_clean)}: {str(e)}")
                attempt += 1
                delay = get_backoff_time(attempt)
                print(f"Backing off for {delay:.2f} seconds.")
                time.sleep(delay)
            if attempt >= MAX_RETRIES:
                print(f"Max retries reached for {mask_email(email_clean)}. Skipping.")
                log_failed_email(email_clean, "Max retries exceeded")
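The retry structure above can also be factored into a reusable helper that takes the operation and a sleep function as arguments, which makes the backoff logic testable without actually waiting. This is a hypothetical sketch (`RetryableError` and `call_with_backoff` are illustrative names), not the notebook's code; in the notebook you would raise `RetryableError` for the HTTP statuses treated as retryable (403, 429, 500, 503). Unlike the loop above, it does not sleep after the final failed attempt.

```python
import random
import time


class RetryableError(Exception):
    """Hypothetical marker for failures worth retrying (rate limits, 5xx errors)."""


def call_with_backoff(func, max_retries=10, base_delay=5, max_delay=300,
                      sleep=time.sleep):
    """Call func() until it succeeds, retrying RetryableError with jittered backoff.

    Other exceptions propagate immediately; after max_retries failed attempts the
    last RetryableError is re-raised.
    """
    for attempt in range(1, max_retries + 1):
        try:
            return func()
        except RetryableError:
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            sleep(delay + random.uniform(-0.1 * delay, 0.1 * delay))  # +/-10% jitter
```

Passing `sleep=waits.append` (or any callable) in place of `time.sleep` records the delays instead of waiting, which is how the behavior can be checked quickly.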

In [None]:
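# Find the test recipient (keys are lowercased, so normalize the target too)
# [mask_email(email) for email in personalized_emails.keys()]
email_list = list(personalized_emails.keys())
target_email = "example_1@gmail.com".strip().lower()
#target_email = "example_2@gmail.com"
for i, email in enumerate(email_list):
    if email == target_email:
        print(f"Found email {mask_email(email)} at index {i}")
        break
else:
    # Without this guard, `i` would point at the last recipient and the
    # "test" email would go to a real participant
    raise ValueError(f"{mask_email(target_email)} is not in the recipient list")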

In [None]:
# Assign the email to test_email
test_email = email_list[i]

In [None]:
# Send personalized HTML emails (only to a specific email address for testing)
if len(email_list) > 1:
    # Create a single-email dictionary for testing
    test_emails = {test_email: personalized_emails[test_email]}
    # Use the new backoff function with just one email
    send_emails_with_backoff(service, test_emails, last_names, base_subject, attachments)
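Besides the single test send, each personalized email can be written to disk and opened in a browser for review, so nothing reaches participants. `write_previews` is a hypothetical helper; its filenames embed a sanitized copy of each address, so treat the output folder as private.

```python
import os
import re


def write_previews(personalized_emails, out_dir, limit=None):
    """Write personalized HTML emails to out_dir for review; return the paths written."""
    os.makedirs(out_dir, exist_ok=True)
    paths = []
    for index, (address, html_body) in enumerate(personalized_emails.items()):
        if limit is not None and index >= limit:
            break
        # Replace characters such as '@' so the address is safe in a filename
        safe_name = re.sub(r"[^A-Za-z0-9._-]", "_", address)
        path = os.path.join(out_dir, f"{index:04d}_{safe_name}.html")
        with open(path, "w", encoding="utf-8") as f:
            f.write(html_body)
        paths.append(path)
    return paths
```

In the notebook, something like `write_previews(personalized_emails, "email_previews", limit=5)` would write the first five emails for a visual check.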


In [None]:
# Note that on the command line you can run something like

# caffeinate -t 1800 &

# which launches macOS’s caffeinate utility in the
# background (&) and tells it to prevent idle sleep for
# 1,800 seconds (30 minutes). In other words, it keeps
# your Mac from going to sleep due to inactivity for the
# next half hour, then exits automatically when the timer
# expires. Add -d if you also want to keep the display awake.
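As an alternative to a fixed `-t 1800` timeout, `caffeinate -w <pid>` waits for a given process to exit, so it can be tied to the notebook's own kernel process and last exactly as long as the kernel. This sketch assumes macOS (it returns None where `caffeinate` isn't on the PATH); `caffeinate_command` and `start_caffeinate` are hypothetical names, and `-i` asks caffeinate to prevent idle system sleep.

```python
import os
import shutil
import subprocess


def caffeinate_command(pid):
    """Arguments for caffeinate: prevent idle sleep (-i) until process `pid` exits (-w)."""
    return ["caffeinate", "-i", "-w", str(pid)]


def start_caffeinate():
    """Start caffeinate tied to this process; return None if caffeinate is unavailable."""
    if shutil.which("caffeinate") is None:
        return None  # not macOS, or caffeinate not on PATH
    return subprocess.Popen(caffeinate_command(os.getpid()))
```

Call `proc = start_caffeinate()` before the send cell; afterwards `proc.terminate()` stops it early, or it exits on its own when the kernel does.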

In [None]:

# Uncomment the last line of this cell to send all the emails.
# It is commented out so that I don't accidentally email a bunch of people!

# Send personalized HTML emails

# send_emails_with_backoff(service, personalized_emails, last_names, base_subject, attachments)
