In [1]:
from typing import Iterator, Dict
import os
import re 
import pandas as pd
from pprint import pprint

pd.options.display.max_columns = None

# ETL

## get_all_files → read_email → parser_email → clean_text

In [9]:
def get_all_files(main_path:str='/Users/hugoramirez/Downloads/maildir') -> Iterator[str]:
    for index, (path, currentDirectory, files) in enumerate(os.walk(main_path)):
        for file in files:
            if file not in ('.DS_Store'):
                yield os.path.join(path, file)

                
def read_email(file_path: str) -> str:
    with open(file_path, errors='ignore') as f: #, encoding='utf-16', ) as f:
        lines = f.readlines()
    return lines


def clean_text(text: str) -> str:
    if text is None:
        return ''
    
    text = text.lower()
    text = re.sub('re:', '', text)
    text = re.sub('-', '', text)
    text = re.sub('_', '', text)

    # removes punctuation
    text = re.sub(r'[^\w\s]', ' ', text)
    text = re.sub(r'\n', ' ', text)
    
    # complete contractions
    text = re.sub(r"\'ve", " have ", text)
    text = re.sub(r"can't", "cannot ", text)
    text = re.sub(r"n't", " not ", text)
    text = re.sub(r"I'm", "I am", text)
    text = re.sub(r" m ", " am ", text)
    text = re.sub(r"\'re", " are ", text)
    text = re.sub(r"\'d", " would ", text)
    text = re.sub(r"\'ll", " will ", text)

    text = text.strip()
    return text
    
    
headers = [
    'Message-ID',
    'Date',
    'From',
    'To',
    'Subject',
    'Cc',
    'Mime-Version',
    'Content-Type',
    'Content-Transfer-Encoding',
    'Bcc',
    'X-From',
    'X-To',
    'X-cc',
    'X-bcc',
    'X-Folder',
    'X-Origin',
    'X-FileName',
]


def parser_email(email_content: list[str]) -> Dict:  
    email_data = {}
    email_text = []
    current_header = 0
    
    for line in email_content:
        line = line.replace('\n', '')
        line = line.replace('\t', '')
        line = line.strip()
        
        if not line:
            continue
            
        if current_header < len(headers):
            header = headers[current_header]
            
            # search current header
            if line.startswith(f"{header}:"):
                email_data[header] = line[len(header) + 2:]
                current_header = current_header + 1
                
            # search next header
            elif line.startswith(f"{headers[current_header + 1]}:"):
                email_data[headers[current_header + 1]] = line[len(headers[current_header + 1]) + 2:]
                current_header = current_header + 2
            else:
                # add current line to current header
                if current_header < (len(headers) - 1) and not line.startswith(f"{headers[current_header + 1]}") and line:
                    email_data[headers[current_header - 1]] = email_data[headers[current_header - 1]] + line
        else:
            email_text.append(line)
        
    email_text = ' '.join(email_text)
    email_data['text'] = clean_text(email_text)
    return email_data


def rename_columns(df: pd.DataFrame) -> pd.DataFrame:
    return df.rename(
        columns={
            'Message-ID': 'Message_ID',
            'Mime-Version': 'Mime_Version',
            'Content-Type': 'Content_Type',
            'Content-Transfer-Encoding': 'Content_Transfer_Encoding',
            'X-From': 'X_From',
            'X-To': 'X_To',
            'X-cc': 'X_cc',
            'X-bcc': 'X_bcc',
            'X-Folder': 'X_Folder',
            'X-Origin': 'X_Origin',
            'X-FileName': 'X_FileName',
        })


# Sentiment Analysis

##  sentiment = {Negative, Positive, Neutral}

In [3]:
import vaderSentiment
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

analyser = SentimentIntensityAnalyzer()


def get_sentiment(compound: float) -> str:
    if compound >= 0.05:
        return "Positive"
    elif compound <= -0.05:
        return "Negative"
    else:
        return "Neutral"

    
def sentiment_analysis(email_data: Dict) -> None:
    polarity_score = analyser.polarity_scores(email_data['text'])
    
    email_data['neg'] = polarity_score['neg']
    email_data['neu'] = polarity_score['neu']
    email_data['pos'] = polarity_score['pos']
    email_data['compound'] = polarity_score['compound']
    email_data['sentiment'] = get_sentiment(polarity_score['compound'])


# Save to BigQuery

In [4]:
from google.cloud import bigquery

client = bigquery.Client.from_service_account_json('/Users/hugoramirez/Downloads/data-castle-bravo-40b8a58d089e.json')

def load_to_bigquery(df: pd.DataFrame):
    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_APPEND")
    job = client.load_table_from_dataframe(df, 'data-castle-bravo.bootcamp_mlops.emails', job_config=job_config)
    job.result()
    

In [10]:
BATCH_SIZE = 50_000
emails = []
df = None

for file in get_all_files():
    email_dict = {}
    email_content = read_email(file)
    email_dict = parser_email(email_content)
    sentiment_analysis(email_dict)
    emails.append(email_dict)

    if len(emails) % BATCH_SIZE == 0:
        df = pd.DataFrame.from_records(emails)
        df['Date'] = pd.to_datetime(df['Date'], utc=True)
        df = rename_columns(df)
        load_to_bigquery(df)
        print(f'batch until index {index}')
        
        emails = []
        df = None        
            

batch until index 49999
batch until index 99999
batch until index 149999
batch until index 199999
batch until index 249999
batch until index 299999
batch until index 349999
batch until index 399999
batch until index 449999
batch until index 499999
