## **Name: ETL (Extract, Transform, Load)**

**Overview:** Connects to Gmail via the Gmail API, extracts all email data, and structures it into a pandas DataFrame for downstream unsupervised machine learning.
                
**Data Scientist:** Aaron Medina

**GitHub:**

**Creation Date:** 10/27/2022

**Instance:** Local

**References:** https://developers.google.com/gmail/api/quickstart/python

**Script Change Notes:**

x/x/xxxx: Aaron - Note

In [1]:
# Import required libraries
import re
import time
import pickle
import base64
import logging
import warnings
import pandas as pd
from bs4 import BeautifulSoup
from datetime import datetime
from typing import List

from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow

In [3]:
# Initialize parameters
credentials_file = 'credentials.json'
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
BATCH_SIZE = 100 # Number of message requests per batch (also the progress-report interval)
pdf_output_path = 'data/pdf.pkl'

In [4]:
# Initialize the Gmail API client (requires manual approval in the browser for now)
flow = InstalledAppFlow.from_client_secrets_file(credentials_file, SCOPES)
creds = flow.run_local_server(port=0)
service = build('gmail', 'v1', credentials=creds)

Please visit this URL to authorize this application: https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=1023023998484-uojj8dlutmfnuh59r0d76bd08261nppa.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A49147%2F&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fgmail.readonly&state=2ngUHuR5bxlO3tygTm8yhCui9Sp6Px&access_type=offline


In [5]:
# Function to collect the references (ids) of all emails matching the given labels
def list_messages_with_labels(service, user_id, label_ids=None):
    # Use None instead of a mutable default argument
    label_ids = label_ids or []
    response = (service
                .users()
                .messages()
                .list(userId=user_id,
                      labelIds=label_ids).execute())
    # A page may omit 'messages' when it has no results
    messages = list(response.get('messages', []))
    while 'nextPageToken' in response:
        page_token = response['nextPageToken']
        response = (service
                    .users()
                    .messages()
                    .list(userId=user_id,
                          labelIds=label_ids,
                          pageToken=page_token).execute())
        messages.extend(response.get('messages', []))
    return messages
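
The paging pattern above can be tried without credentials. The following is a minimal sketch, assuming a hypothetical `fetch_page` callable that stands in for `service.users().messages().list(...).execute()` and returns the same response shape (a `messages` list and an optional `nextPageToken`). The names `collect_all_messages` and `FAKE_PAGES` are illustrative only and are not part of the notebook or the Gmail client library.

```python
from typing import Callable, Dict, List, Optional


def collect_all_messages(fetch_page: Callable[[Optional[str]], Dict]) -> List[Dict]:
    """Follow nextPageToken until the listing is exhausted."""
    messages: List[Dict] = []
    page_token: Optional[str] = None
    while True:
        response = fetch_page(page_token)
        # A page may omit 'messages' (for example, when there are no results)
        messages.extend(response.get('messages', []))
        page_token = response.get('nextPageToken')
        if not page_token:
            return messages


# Hypothetical in-memory pages standing in for Gmail API responses
FAKE_PAGES = {
    None: {'messages': [{'id': 'a'}, {'id': 'b'}], 'nextPageToken': 't1'},
    't1': {'messages': [{'id': 'c'}], 'nextPageToken': 't2'},
    't2': {},
}

all_ids = [m['id'] for m in collect_all_messages(lambda token: FAKE_PAGES[token])]
print(all_ids)
```

The last fake page is deliberately empty to show why `response.get('messages', [])` is safer than indexing `response['messages']` directly.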

In [6]:
class Email(object):

    """Extracts the required fields and header metadata from a Gmail message resource."""

    def __init__(self, email: dict):
        self._logger = logging.getLogger('Email')
        self.id: str = email['id']
        self.label_ids: List[str] = email.get('labelIds', None)
        self.date: datetime = datetime.fromtimestamp(int(email['internalDate'])/1000)
        self.size: int = email['sizeEstimate']
        self.sender: str = None
        self.to: str = None
        self.subject: str = None
            
        if 'headers' in email['payload']:
            self._parse_headers(email)
        else:
            self._logger.warning(f'Headers not found for email with id: {self.id}')
            
        self.__dict__ = self._as_dict()
    
    def _parse_headers(self, email: dict):
        headers = email['payload']['headers']
        for header in headers:
            if header['name'] == 'From':
                self.sender = header['value']
            elif header['name'] == 'To':
                self.to = header['value']
            elif header['name'] == 'Subject':
                self.subject = header['value']
                
    def _as_dict(self):
        return {k: v for k, v in self.__dict__.items() if not k.startswith('_')}
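
Header field names in email are case-insensitive, so a message could carry `FROM` or `subject` instead of the exact spellings matched in `_parse_headers`. The sketch below shows one way to match names case-insensitively, assuming the payload has the same `headers` list of `name`/`value` dictionaries used above. `extract_headers` and `sample_payload` are hypothetical names introduced for illustration.

```python
from typing import Dict, Optional


def extract_headers(payload: Dict, wanted=('from', 'to', 'subject')) -> Dict[str, Optional[str]]:
    """Return the wanted headers from a payload, matching names case-insensitively."""
    found: Dict[str, Optional[str]] = {name: None for name in wanted}
    for header in payload.get('headers', []):
        name = header['name'].lower()
        # Keep the first occurrence if a header is repeated
        if name in found and found[name] is None:
            found[name] = header['value']
    return found


# Hypothetical payload shaped like the 'payload' field of a message resource
sample_payload = {
    'headers': [
        {'name': 'From', 'value': 'Sender <sender@example.com>'},
        {'name': 'TO', 'value': 'recipient@example.com'},
        {'name': 'subject', 'value': 'Hello'},
        {'name': 'X-Other', 'value': 'ignored'},
    ]
}

parsed = extract_headers(sample_payload)
print(parsed)
```

A payload without a `headers` key yields `None` for every field, which mirrors the defaults set in `Email.__init__`.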

In [7]:
# Pull emails in batches containing all tag data
emails = list() # List of Dictionaries with the emails
email_ids = list_messages_with_labels(service, 'me')

def add_emails(request_id, response, exception):

    """Callback function that handles the result of each request"""

    if exception is not None:
        # Do something with the exception
        raise ValueError(exception)
    else:

        # Convert the email to a dictionary using our Email class
        emails.append(vars(Email(response)))

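batch = service.new_batch_http_request()
pending = 0 # Requests added to the current batch but not yet executed

for i, msg_id in enumerate(email_ids):

    batch.add(service
               .users()
               .messages()
               .get(userId = 'me', id = msg_id['id'])
               , callback=add_emails)
    pending += 1

    if i % BATCH_SIZE == 0:
        
        batch.execute()
        batch = service.new_batch_http_request()
        pending = 0
        print(f'{i} out of {len(email_ids)} done')
        time.sleep(2)

# Execute the final partial batch so the last requests are not dropped
if pending:
    batch.execute()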

# Create a DataFrame from our list of emails
tags_pdf = pd.DataFrame(emails)

0 out of 2719 done
100 out of 2719 done
200 out of 2719 done
300 out of 2719 done
400 out of 2719 done
500 out of 2719 done
600 out of 2719 done
700 out of 2719 done
800 out of 2719 done
900 out of 2719 done
1000 out of 2719 done
1100 out of 2719 done
1200 out of 2719 done
1300 out of 2719 done
1400 out of 2719 done
1500 out of 2719 done
1600 out of 2719 done
1700 out of 2719 done
1800 out of 2719 done
1900 out of 2719 done
2000 out of 2719 done
2100 out of 2719 done
2200 out of 2719 done
2300 out of 2719 done
2400 out of 2719 done
2500 out of 2719 done
2600 out of 2719 done
2700 out of 2719 done
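
When requests are grouped into fixed-size batches, the last group is usually smaller than the rest and is easy to leave unexecuted. The following sketch shows the grouping step on its own, with no API calls. It assumes a hypothetical `chunked` helper and placeholder ids; only the total of 2719 comes from the output above.

```python
from typing import Iterator, List, Sequence


def chunked(items: Sequence, size: int) -> Iterator[List]:
    """Yield consecutive slices of at most `size` items, including the final partial slice."""
    if size < 1:
        raise ValueError('size must be at least 1')
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


# Placeholder ids; 2719 matches the mailbox size reported in the output above
fake_ids = [f'id{n}' for n in range(2719)]
batch_sizes = [len(chunk) for chunk in chunked(fake_ids, 100)]
print(len(batch_sizes), batch_sizes[0], batch_sizes[-1])
```

Each chunk could then be added to its own `service.new_batch_http_request()` and executed, so every id is requested exactly once regardless of whether the total is a multiple of the batch size.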


In [8]:
# Primary extraction loop to pull all message bodies
warnings.filterwarnings("ignore") # Suppress warnings during extraction

# Main variables
message_id_list = tags_pdf['id']
msg_pdf = pd.DataFrame()
start_time = datetime.now()

# Loop through each message id to find the message
for i, message_id in enumerate(message_id_list):

    # Reset the body so an unhandled mimeType cannot reuse the previous email's text
    msg_body = ""

    # Query main payload
    msg = service.users().messages().get(userId="me", id = message_id).execute()
    payload = msg['payload']

    # Conditions for different types of email messages (mimeType)
    # There are many types of email formats, so need to encode/decode carefully
    if payload['mimeType'] == "text/html":

        # Convert byte code to html code
        byte_code = payload["body"]["data"]
        msg_html = base64.urlsafe_b64decode(byte_code).decode("utf-8")

        # Error handling if tables or documents don't exist 
        try:
            html_stage_pdf = pd.read_html(msg_html, index_col=0)
            msg_body = html_stage_pdf[0].iloc[0].name
        
        except IndexError:
            html_stage_pdf = pd.read_html(msg_html, header=0, index_col=0)[0]

            # Special handling for html df output
            try:
                msg_body_list = html_stage_pdf['Unnamed: 1'].dropna().drop_duplicates().tolist()
                msg_body = " ".join(msg_body_list)
            
            # In case html df content is buried in the index section
            except KeyError:
                msg_body = html_stage_pdf.index.name

        # Extract plain text from within specified html context
        except ValueError:
            html_regex = 'serif">(.*?)</span>'
            msg_body = str(re.findall(html_regex, msg_html))

    # This section handles emails that are broken up into multiple sections
    # We need to walk through these sections to extract all the text data
    elif payload['mimeType'] in ["multipart/alternative", "multipart/mixed"]:

        msg_body = ""
        msg_body_part = ""
        
        # Step through each part of the payload, containing multiple pieces of text
        for part in payload["parts"]:

            # Some parts contain blank fields, so skip
            try:
                byte_code = part["body"]["data"]
                msg_body_part = base64.urlsafe_b64decode(byte_code).decode("utf-8")

                # Condition that only BeautifulSoup can decode
                if msg_body_part.find("DOCTYPE") != -1:

                    html_parsed = BeautifulSoup(msg_body_part, 'html.parser')
                    msg_body_part = ""

                    for para in html_parsed.find_all("p"):
                        msg_body_part = msg_body_part + " " + para.get_text()

            # Skip sections that don't contain 'data' tag
            except KeyError:
                pass
            
            msg_body = msg_body + " " + msg_body_part

            # Some message parts require additional html formatting to pandas
            try:
                msg_body = pd.read_html(msg_body, header=0, index_col=0)[0].columns[0]
            except Exception:
                # Keep the text as-is when no HTML table can be parsed
                pass

    else:
        print(payload['mimeType'])

    # Format all data into a structured pandas df
    stage_pdf = pd.DataFrame(columns = ['id', 'body'])
    stage_pdf.loc[0, 'id'] = message_id
    stage_pdf.loc[0, 'body'] = msg_body

    msg_pdf = pd.concat([msg_pdf, stage_pdf])

    # Provide status updates
    if i % BATCH_SIZE == 0:
        print(f'{i} out of {len(email_ids)} done')

print("process complete:", datetime.now() - start_time)

0 out of 2719 done
100 out of 2719 done
200 out of 2719 done
300 out of 2719 done
400 out of 2719 done
500 out of 2719 done
600 out of 2719 done
700 out of 2719 done
800 out of 2719 done
900 out of 2719 done
1000 out of 2719 done
1100 out of 2719 done
1200 out of 2719 done
1300 out of 2719 done
1400 out of 2719 done
1500 out of 2719 done
1600 out of 2719 done
1700 out of 2719 done
1800 out of 2719 done
1900 out of 2719 done
2000 out of 2719 done
2100 out of 2719 done
2200 out of 2719 done
2300 out of 2719 done
2400 out of 2719 done
2500 out of 2719 done
2600 out of 2719 done
2700 out of 2719 done
process complete: 0:15:02.732005
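
The loop above reads a single level of `payload["parts"]`. MIME messages can nest containers (for example, a `multipart/alternative` inside a `multipart/mixed`), in which case the text sits one level deeper. The sketch below walks the payload tree recursively. It is an illustration under stated assumptions, not the notebook's method: `decode_part`, `collect_text`, `encode`, and `sample_payload` are hypothetical names, and the missing-padding handling is a defensive assumption rather than documented Gmail behavior.

```python
import base64
from typing import Dict, List


def decode_part(data: str) -> str:
    """Decode a base64url string such as the one found in body.data."""
    # Restore '=' padding defensively in case it has been stripped
    padded = data + '=' * (-len(data) % 4)
    return base64.urlsafe_b64decode(padded).decode('utf-8', errors='replace')


def collect_text(payload: Dict, mime_type: str = 'text/plain') -> List[str]:
    """Walk a payload tree depth-first and return decoded bodies of the given MIME type."""
    texts: List[str] = []
    data = payload.get('body', {}).get('data')
    if payload.get('mimeType') == mime_type and data:
        texts.append(decode_part(data))
    for part in payload.get('parts', []):
        texts.extend(collect_text(part, mime_type))
    return texts


def encode(text: str) -> str:
    """Build unpadded base64url data for the sample payload below."""
    return base64.urlsafe_b64encode(text.encode('utf-8')).decode('ascii').rstrip('=')


# Hypothetical nested payload: text parts inside an inner multipart container
sample_payload = {
    'mimeType': 'multipart/mixed',
    'body': {'size': 0},
    'parts': [
        {
            'mimeType': 'multipart/alternative',
            'body': {'size': 0},
            'parts': [
                {'mimeType': 'text/plain', 'body': {'data': encode('Hello Stephen,')}},
                {'mimeType': 'text/html', 'body': {'data': encode('<p>Hello Stephen,</p>')}},
            ],
        },
        {'mimeType': 'application/pdf', 'body': {'attachmentId': 'hypothetical-id'}},
    ],
}

plain_text = ' '.join(collect_text(sample_payload))
print(plain_text)
```

Parts without a `data` field, such as the attachment entry, are skipped without needing a `try`/`except KeyError`.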


In [9]:
# Merge tag df with message df
pdf = tags_pdf.merge(msg_pdf, how = "left", on = ['id'])
pdf['sub_body'] = pdf['subject'] + " " + pdf['body']

In [10]:
# Save the file for downstream preprocessing
with open(pdf_output_path, "wb") as output_file:
    pickle.dump(pdf, output_file)

In [11]:
# Preview of the data output
pdf.head()

| | id | label_ids | date | size | sender | to | subject | body | sub_body |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 18423c04ca1ade5c | [CATEGORY_PROMOTIONS, UNREAD, INBOX] | 2022-10-29 12:38:35 | 220669 | Best Buy Black Friday <BestBuy@email.bestbuy.com> | AARONJMEDINA12@gmail.com | You'll LOVE this - the offers don't stop | | You'll LOVE this - the offers don't stop |
| 1 | 18420d6a61ff19e7 | [CATEGORY_PROMOTIONS, UNREAD, INBOX] | 2022-10-28 20:55:27 | 135637 | The Exchange <email@e.shopmyexchange.com> | aaronjmedina12@gmail.com | Your Weekly Ad is Here: 12 Weeks of Scream-Wor... | The Exchange. Tax Free Shopping.\r\nView Emai... | Your Weekly Ad is Here: 12 Weeks of Scream-Wor... |
| 2 | 184208a2e0b9f4b6 | [CATEGORY_PROMOTIONS, UNREAD, INBOX] | 2022-10-28 21:40:37 | 44129 | "Freddy's" <marketing@l.freddys.com> | aaronjmedina12@gmail.com | Fry-teningly Delicious 👻😋 | ( http://url7889.l.freddys.com/ls/click?upn=-... | Fry-teningly Delicious 👻😋 ( http://url7889.l.... |
| 3 | 18420731aded8f8a | [CATEGORY_PROMOTIONS, UNREAD, INBOX] | 2022-10-28 21:15:09 | 233275 | Best Buy Black Friday <BestBuy@email.bestbuy.com> | AARONJMEDINA12@gmail.com | Your SALE update is HERE! There are so many of... | | Your SALE update is HERE! There are so many of... |
| 4 | 184203652769a7fa | [SENT] | 2022-10-28 20:09:01 | 8155 | Aaron Medina <aaronjmedina12@gmail.com> | Stephen Lush from Charles River Laboratories I... | Re: REMOTE Sr Data Scientist Search-Help Lead ... | Hello Stephen,\r\n\r\nThank you for your inte... | Re: REMOTE Sr Data Scientist Search-Help Lead ... |
