In [1]:
import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('gmail_jobs')

# Reset the job definition so the next run starts from the baseline again.
# The key must be the job's own id ('gmail_ingest'); the table name
# 'gmail_jobs' was previously passed here, which never matched the item
# that is re-created below.
table.delete_item(Key={'job_id': 'gmail_ingest'})

# Fresh job definition without any 'job_run_bookmark_details', so the
# ingest starts 'baseline_days' days back.
item = {
    'job_id': 'gmail_ingest',
    'job_description': 'Ingest data from gmail to s3',
    'is_active': 'Y',
    'baseline_days': 45
}
table.put_item(Item=item)

{'ResponseMetadata': {'RequestId': '37L350HB28LD8TL1LR3AJR6ML3VV4KQNSO5AEMVJF66Q9ASUAAJG',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Sun, 15 May 2022 10:53:45 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': '37L350HB28LD8TL1LR3AJR6ML3VV4KQNSO5AEMVJF66Q9ASUAAJG',
   'x-amz-crc32': '2745614147'},
  'RetryAttempts': 0}}

In [2]:
# WARNING: destructive. Recursively deletes every object under the
# s3://itversitydata/messages prefix; only run this when the ingest is
# being restarted from scratch.
!aws s3 rm s3://itversitydata/messages --recursive

In [3]:
import boto3
import pickle


def get_creds():
    """Load the credentials object stored in AWS Secrets Manager.

    Reads the binary secret named ``gmail_token`` from the ``us-east-1``
    region and unpickles it.

    Returns:
        The object obtained by unpickling the secret's ``SecretBinary``
        payload.
    """
    secrets = boto3.client('secretsmanager', region_name='us-east-1')
    response = secrets.get_secret_value(SecretId='gmail_token')
    # NOTE(security): pickle.loads can execute arbitrary code embedded in the
    # payload. This is only safe while write access to the 'gmail_token'
    # secret is restricted to trusted principals.
    return pickle.loads(response['SecretBinary'])

In [4]:
from googleapiclient.discovery import build

def get_users():
    """Build a Gmail v1 API client and return its ``users()`` resource.

    Credentials are obtained from ``get_creds()`` on every call.
    """
    gmail_service = build('gmail', 'v1', credentials=get_creds())
    return gmail_service.users()

In [5]:
def get_job_details(job_name):
    """Fetch a job definition from the ``gmail_jobs`` DynamoDB table.

    Args:
        job_name: Value of the ``job_id`` key to look up.

    Returns:
        The ``Item`` entry of the ``get_item`` response.

    Raises:
        KeyError: If the response contains no ``Item`` entry.
    """
    jobs_table = boto3.resource('dynamodb').Table('gmail_jobs')
    response = jobs_table.get_item(Key={'job_id': job_name})
    return response['Item']

In [6]:
import datetime
import time

def get_job_run_time_range(job_details):
    """Work out the time window the next job run should ingest.

    The clock is read exactly once so that every value derived from "now"
    (the job start time, today's date and the open-ended window end) is
    mutually consistent, even when the call straddles a second or midnight
    boundary.

    Args:
        job_details: Mapping describing the job. If it carries a truthy
            ``job_run_bookmark_details`` entry, that entry's
            ``last_run_end_time_epoch`` is used as the window start;
            otherwise ``baseline_days`` is required.

    Returns:
        Tuple ``(job_start_time, start_time_epoch, end_time_epoch)`` of
        integer epoch seconds (computed with ``time.mktime``, i.e. based on
        the local time zone):

        * With a bookmark whose end date is more than one day before today,
          the window runs from the bookmark end to midnight of the day after
          the bookmark's end date.
        * With a more recent bookmark, the window runs from the bookmark end
          to the current time.
        * Without a bookmark, the window is the single day starting
          ``baseline_days`` days before today.
    """
    one_day = datetime.timedelta(days=1)

    # Single clock read shared by every "now"-based value below.
    now = datetime.datetime.now()
    today = now.date()
    job_start_time = int(time.mktime(now.timetuple()))

    bookmark = job_details.get('job_run_bookmark_details')
    if bookmark:
        last_run_end_time_epoch = int(bookmark['last_run_end_time_epoch'])
        last_run_end_date = datetime.datetime.fromtimestamp(last_run_end_time_epoch).date()

        # Always resume exactly where the previous run stopped.
        start_time_epoch = last_run_end_time_epoch
        if (today - last_run_end_date).days > 1:
            # Still catching up: stop at the next midnight after the bookmark.
            next_midnight = last_run_end_date + one_day
            end_time_epoch = int(time.mktime(next_midnight.timetuple()))
        else:
            # Caught up: take everything up to the moment this run started.
            end_time_epoch = job_start_time
    else:
        # First run: begin with the single day 'baseline_days' days ago.
        baseline_days = int(job_details['baseline_days'])
        start_date = today - datetime.timedelta(days=baseline_days)
        start_time_epoch = int(time.mktime(start_date.timetuple()))
        end_time_epoch = int(time.mktime((start_date + one_day).timetuple()))

    return job_start_time, start_time_epoch, end_time_epoch

In [7]:
def get_message_ids(start_time_epoch, end_time_epoch):
    """List the Gmail message references received within a time window.

    Pages through ``users().messages().list`` for the authenticated user
    (``userId='me'``) with the query ``after:<start> before:<end>`` and
    collects the ``messages`` entries of every page.

    Args:
        start_time_epoch: Lower bound of the window, interpolated into the
            ``after:`` search operator.
        end_time_epoch: Upper bound of the window, interpolated into the
            ``before:`` search operator.

    Returns:
        List of the entries found under ``messages`` across all pages; an
        empty list when no page contains any.
    """
    users = get_users()
    query = f'after:{start_time_epoch} before:{end_time_epoch}'

    message_ids = []
    next_page_token = None
    while True:
        request_args = {'userId': 'me', 'q': query}
        if next_page_token:
            print(f'Processing in range between {start_time_epoch} and {end_time_epoch} using token {next_page_token}')
            request_args['pageToken'] = next_page_token
        else:
            print(f'Processing in range between {start_time_epoch} and {end_time_epoch}')

        response = users.messages().list(**request_args).execute()
        # A page may carry no 'messages' key when nothing matches the query,
        # so fall back to an empty list instead of raising KeyError.
        message_ids.extend(response.get('messages', []))

        next_page_token = response.get('nextPageToken')
        # Stop on a missing *or* empty token; an empty-string token would
        # otherwise restart from the first page indefinitely.
        if not next_page_token:
            break
    return message_ids

In [8]:
import pandas as pd

def get_messages(message_ids):
    """Download the full message for every reference in ``message_ids``.

    Args:
        message_ids: Iterable of mappings, each exposing the message id under
            the ``'id'`` key.

    Returns:
        A ``pandas.DataFrame`` built from the list of fetched messages, in
        the same order as ``message_ids``.
    """
    users = get_users()
    fetched = [
        users.messages().get(userId='me', id=message_ref['id']).execute()
        for message_ref in message_ids
    ]
    return pd.DataFrame(fetched)

In [9]:
import uuid


def write_messages_to_s3(messages_df, s3_bucket, s3_prefix):
    """Write the messages as a JSON-lines file under an S3 prefix.

    The target path is generated once, so the path that is logged and
    returned is exactly the one that was written (previously a second
    ``uuid.uuid1()`` call made the log line point at a different name).

    Args:
        messages_df: Object exposing ``to_json`` (a pandas DataFrame in this
            notebook) holding the messages to persist.
        s3_bucket: Name of the destination bucket.
        s3_prefix: Key prefix under which the part file is created.

    Returns:
        The full ``s3://`` path of the file that was written, so callers can
        record it.
    """
    target_path = f's3://{s3_bucket}/{s3_prefix}/part-{uuid.uuid1()}.json'
    messages_df.to_json(target_path, orient='records', lines=True)
    print(f'Successfully saved messages to {target_path}')
    return target_path

In [10]:
def save_job_run_details(job_details, job_start_time, message_ids, start_time_epoch, end_time_epoch, file_name):
    """Record a finished run and advance the job's bookmark in DynamoDB.

    Two items are written:

    * a run record in ``gmail_job_run_details`` keyed by the job id and the
      run's start time, and
    * the job definition in ``gmail_jobs``, updated with a
      ``job_run_bookmark_details`` entry describing this run's window.

    Args:
        job_details: Job definition mapping; must contain ``job_id``. It is
            updated in place with the new bookmark before being written back.
        job_start_time: Epoch seconds at which the run started.
        message_ids: Sequence of mappings with an ``'id'`` key, one per
            processed message. May be empty.
        start_time_epoch: Start of the ingested window (epoch seconds).
        end_time_epoch: End of the ingested window (epoch seconds).
        file_name: Identifier of the file the messages were written to.
    """
    dynamodb = boto3.resource('dynamodb')
    message_count = len(message_ids)
    # default=None lets a window without any messages be recorded (and the
    # bookmark advanced) instead of failing with ValueError on max([]).
    max_message_id = max(
        (message_id['id'] for message_id in message_ids),
        default=None
    )

    job_run_details_item = {
        'job_id': job_details['job_id'],
        'job_run_time': job_start_time,
        'job_run_bookmark_details': {
            'max_message_id': max_message_id,
            'start_time_epoch': start_time_epoch,
            'end_time_epoch': end_time_epoch
        },
        'rows_processed': message_count,
        'file_name': file_name
    }
    job_run_details_table = dynamodb.Table('gmail_job_run_details')
    job_run_details_table.put_item(Item=job_run_details_item)

    # Advance the bookmark on the job definition itself so the next run
    # resumes from this window's end.
    job_details_table = dynamodb.Table('gmail_jobs')
    job_details['job_run_bookmark_details'] = {
        'last_run_max_message_id': max_message_id,
        'last_run_start_time_epoch': start_time_epoch,
        'last_run_end_time_epoch': end_time_epoch
    }
    job_details_table.put_item(Item=job_details)

In [12]:
%%time

# Catch-up driver: ingest one time window per iteration until the next
# window would extend past the start of today.
while True:
    # Re-read the job on every pass so the bookmark written by
    # save_job_run_details() in the previous iteration is picked up.
    job_details = get_job_details('gmail_ingest')
    job_start_time, start_time_epoch, end_time_epoch = get_job_run_time_range(job_details)
    # Epoch seconds of today's date at 00:00 (local time, via time.mktime).
    current_date_epoch = int(time.mktime(datetime.datetime.now().date().timetuple()))

    # Stop before processing a window that ends after today's midnight.
    if end_time_epoch > current_date_epoch:
        break
    message_ids = get_message_ids(start_time_epoch, end_time_epoch)
    messages = get_messages(message_ids)
    # Whatever write_messages_to_s3() returns is stored with the run record.
    file_name = write_messages_to_s3(messages, 'itversitydata', 'messages')
    save_job_run_details(job_details, job_start_time, message_ids, start_time_epoch, end_time_epoch, file_name)

Processing in range between 1648665000 and 1648751400
Successfully saved messages to s3://itversitydata/messages/part-94b6631a-d43d-11ec-b424-3e22fbd03f7b.json
Processing in range between 1648751400 and 1648837800
Successfully saved messages to s3://itversitydata/messages/part-b34aa7dc-d43d-11ec-b424-3e22fbd03f7b.json
Processing in range between 1648837800 and 1648924200
Successfully saved messages to s3://itversitydata/messages/part-c925e012-d43d-11ec-b424-3e22fbd03f7b.json
Processing in range between 1648924200 and 1649010600
Successfully saved messages to s3://itversitydata/messages/part-da42165e-d43d-11ec-b424-3e22fbd03f7b.json
Processing in range between 1649010600 and 1649097000
Successfully saved messages to s3://itversitydata/messages/part-f055c774-d43d-11ec-b424-3e22fbd03f7b.json
Processing in range between 1649097000 and 1649183400
Processing in range between 1649097000 and 1649183400 using token 10277798134644464037
Successfully saved messages to s3://itversitydata/messages/

In [None]:
# List the objects currently stored under the messages/ prefix to verify
# what the ingest loop wrote.
!aws s3 ls s3://itversitydata/messages/