In [31]:
# Sample S3 "ObjectCreated:Put" notification used to call the handlers locally.
# The handlers in this notebook only read Records[0].s3.bucket.name and
# Records[0].s3.object.key from it.
test_record = {
    "Records": [
        {
            "eventVersion": "2.0",
            "eventSource": "aws:s3",
            "awsRegion": "us-west-2",
            "eventTime": "1970-01-01T00:00:00.000Z",
            "eventName": "ObjectCreated:Put",
            "userIdentity": {"principalId": "AIDAJDPLRKLG7UEXAMPLE"},
            "requestParameters": {"sourceIPAddress": "127.0.0.1"},
            "responseElements": {
                "x-amz-request-id": "C3D13FE58DE4C810",
                "x-amz-id-2": "FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD",
            },
            "s3": {
                "s3SchemaVersion": "1.0",
                "configurationId": "testConfigRule",
                # Bucket the object "arrived" in.
                "bucket": {
                    "name": "darrin-testing-2",
                    "ownerIdentity": {"principalId": "A3NL1KOZZKExample"},
                    "arn": "arn:aws:s3:::sourcebucket",
                },
                # Object the handlers will download / copy.
                "object": {
                    "key": "Hardcore_Leveling_Warrior_Manga_parsed_2019-04-14_06_33_48.txt",
                    "size": 1024,
                    "eTag": "d41d8cd98f00b204e9800998ecf8427e",
                    "versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko",
                },
            },
        }
    ]
}

In [32]:
# Pull the object key and bucket name out of the sample event using the same
# lookups the handlers perform, then show both.
_s3_section = test_record['Records'][0]['s3']
filename = _s3_section['object']['key']
bucket_name = _s3_section['bucket']['name']
print(filename, bucket_name, sep='\n')

Hardcore_Leveling_Warrior_Manga_parsed_2019-04-14_06_33_48.txt
darrin-testing-2


In [12]:
# INGEST LAMBDA - SCRAPES THE SITE
import os
import time
import boto3
import logging
import requests
import pandas as pd
from bs4 import BeautifulSoup

# Root logger (getLogger() with no name), with its level lowered to INFO.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# boto3 S3 resource shared by the functions in this cell (used as s3.Bucket(...)).
s3 = boto3.resource('s3')

def scraper_handler(event, context):
    """Lambda entry point: scrape the configured manga pages into the raw bucket.

    Neither ``event`` nor ``context`` is read; the page URLs and the target
    bucket name are fixed inside the function.

    Returns:
        True after ``scrape_manga`` has finished.
    """
    destination = 'darrin-testing'
    pages = [
        'http://www.tenmanga.com/book/KINGDOM.html',
        'http://www.tenmanga.com/book/Hardcore+Leveling+Warrior',
    ]

    scrape_manga(pages, destination)
    message = 'Manga list scraped and sent to {bucket}.'.format(bucket=destination)
    logging.info(message)
    return True

def send_txt_to_s3(filename,data,bucket):
    """Upload ``str(data)`` to S3 as object ``filename`` in ``bucket``.

    The payload is sent straight from memory. Staging it in a local file named
    after the key (as this function used to) leaves the upload file handle
    unclosed, leaves the file behind when the upload fails, and cannot work
    for keys containing ``/`` or when the working directory is not writable.

    Args:
        filename: S3 object key to write.
        data: Any object; its ``str()`` representation is what gets stored.
        bucket: Name of the destination bucket.

    Returns:
        True once the object has been put.
    """
    # UTF-8 keeps the stored bytes independent of the host's locale encoding.
    body = str(data).encode('utf-8')
    s3.Bucket(bucket).put_object(Key=filename,Body=body)
    logging.info('{filename} saved to S3.'.format(filename=filename))
    return True

def scrape_manga(manga_list,raw_bucket,timeout=30):
    """Scrape each manga page and store its title and raw chapter list in S3.

    For every URL the page is fetched, the title is read from the ``h1`` of
    the first ``div.book-info`` and the first ``ul.chapter-box`` is kept as
    raw HTML. Both are passed to ``send_txt_to_s3`` as a two-element list under
    the key ``<Title_with_underscores>_raw_<timestamp>.txt``.

    Args:
        manga_list: Iterable of page URLs to scrape.
        raw_bucket: Name of the bucket that receives the raw files.
        timeout: Seconds to wait for each HTTP request, so that a stalled
            server cannot block the run indefinitely.

    Returns:
        True after every URL has been processed.

    Raises:
        requests.RequestException: If a request times out or the server
            answers with an HTTP error status.
        IndexError: If a page has no ``div.book-info`` or ``ul.chapter-box``.
    """
    for manga_url in manga_list:
        response = requests.get(manga_url, timeout=timeout)
        # Fail on 4xx/5xx here instead of trying to parse an error page.
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        # Manga title
        manga_name = soup.find_all('div',{'class': 'book-info'})[0].find('h1').text

        # Raw list of chapters
        chapter_list_raw = soup.find_all('ul',{'class': 'chapter-box'})

        # Send the manga name and raw chapter list to S3
        import_time = time.strftime("%Y-%m-%d_%H_%M_%S")
        filename = '_'.join([manga_name.replace(' ','_'),'raw',import_time]) + '.txt'
        data = [manga_name, str(chapter_list_raw[0])]
        send_txt_to_s3(filename,data,raw_bucket)
    return True

In [13]:
# Local smoke run. scraper_handler reads neither argument, so the sample event
# and the None context are placeholders only.
scraper_handler(test_record,None)

True

In [23]:
# INITIAL PROCESSING LAMBDA - FOR NOW, DOES NOTHING BUT COPY
# add code to copy the file from the raw bucket to the processed bucket
# you can use this example: https://medium.com/@stephinmon.antony/aws-lambda-with-python-examples-2eb227f5fafe
import boto3
import logging

# Root logger (getLogger() with no name), with its level lowered to INFO.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Low-level S3 client rather than the resource API: the handler below uses
# get_waiter and copy_object on it.
# NOTE(review): this rebinds the notebook-global `s3`, which the other cells
# create with boto3.resource and use as s3.Bucket(...) - re-run the matching
# cell before calling their handlers.
s3 = boto3.client('s3')

def copy_processing_handler(event,context):
    """Copy the object named in an S3 event into the 'darrin-testing-2' bucket.

    Waits until the object is visible in the source bucket, then copies it to
    the target bucket under the same key.

    Args:
        event: S3 notification; only ``Records[0].s3.bucket.name`` and
            ``Records[0].s3.object.key`` are read.
        context: Lambda context object; not used.

    Returns:
        True when the copy succeeded.

    Raises:
        Exception: Whatever the waiter or the copy raised, re-raised after
            being logged.
    """
    from urllib.parse import unquote_plus

    s3_event = event['Records'][0]['s3']
    source_bucket = s3_event['bucket']['name']
    # Object keys arrive URL-encoded in S3 notifications (e.g. a space is sent
    # as '+'), so decode before handing the key back to the S3 API.
    key = unquote_plus(s3_event['object']['key'])
    target_bucket = 'darrin-testing-2'
    copy_source = {'Bucket': source_bucket,
                   'Key': key}

    try:
        logging.info('Waiting for the file to persist in the source bucket.')
        waiter = s3.get_waiter('object_exists')
        waiter.wait(Bucket=source_bucket, Key=key)
        logging.info('Copying object from source s3 bucket to target s3 bucket.')
        s3.copy_object(Bucket=target_bucket, Key=key, CopySource=copy_source)
    except Exception as e:
        logging.error(e)
        logging.error('Error getting object {} from bucket {}.'.format(key,source_bucket))
        # Bare raise re-raises the active exception unchanged.
        raise

    return True

In [24]:
# Local smoke run against the sample event; this performs a real S3 wait and copy.
# NOTE(review): with test_record as defined at the top of the notebook, the
# source bucket and the handler's target bucket are both 'darrin-testing-2',
# i.e. the object is copied onto itself - confirm that is the intended input.
copy_processing_handler(test_record,None)

INFO:root:Waiting for the file to persist in the source bucket.
INFO:root:Copying object from source s3 bucket to target s3 bucket.


True

In [27]:
# TRANSFORMATION LAMBDA - TRANSFORMS INGESTED FILES INTO DFS
# https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html
#     Doc for using AWS Lambda with an S3 event as a trigger
import os
import ast 
import time
import json
import boto3
import logging
from bs4 import BeautifulSoup

# Root logger (getLogger() with no name), with its level lowered to INFO.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# boto3 S3 resource pinned to us-east-2; the functions below use it as s3.Bucket(...).
s3 = boto3.resource('s3',region_name='us-east-2')

def send_dict_to_s3(filename,data,bucket):
    """Serialize ``data`` as JSON and upload it to S3 as ``filename`` in ``bucket``.

    The JSON text is uploaded straight from memory. Staging it in a local file
    named after the key (as this function used to) leaves the upload file
    handle unclosed, leaves the file behind when the upload fails, and cannot
    work for keys containing ``/`` or when the working directory is not
    writable.

    Args:
        filename: S3 object key to write.
        data: JSON-serializable object.
        bucket: Name of the destination bucket.

    Returns:
        True once the object has been put.
    """
    body = json.dumps(data).encode('utf-8')
    s3.Bucket(bucket).put_object(Key=filename,Body=body)
    return True

def data_lengths_test(data):
    """Check that the chapter, date_uploaded and url columns line up.

    Args:
        data: Mapping with sized values under 'chapter', 'date_uploaded'
            and 'url'.

    Returns:
        True when all three have the same length.

    Raises:
        ValueError: If the lengths differ.
    """
    chapter_count = len(data['chapter'])
    date_count = len(data['date_uploaded'])
    # 'url' is only inspected once the first two columns agree.
    if chapter_count != date_count or date_count != len(data['url']):
        raise ValueError('Data Column Lengths are not equal')
    return True

# Download file that was just added
def manga_processing_handler(event,context):
    """Turn a raw scrape file into a parsed chapter table and store it in S3.

    Downloads the object named in the S3 event, reads it as a Python-literal
    list ``[title, chapter_list_html]``, extracts chapter number, upload date
    and link for every chapter, and uploads the result as JSON to the
    'darrin-testing-2' bucket.

    Args:
        event: S3 notification; only ``Records[0].s3.bucket.name`` and
            ``Records[0].s3.object.key`` are read.
        context: Lambda context object; not used.

    Returns:
        True once the parsed file has been uploaded.

    Raises:
        ValueError: From ``data_lengths_test`` when the parsed columns do not
            have equal lengths.
    """
    import tempfile
    from urllib.parse import unquote_plus

    s3_event = event['Records'][0]['s3']
    source_bucket = s3_event['bucket']['name']
    # Object keys arrive URL-encoded in S3 notifications; decode before use.
    filename_from_event = unquote_plus(s3_event['object']['key'])
    target_bucket = 'darrin-testing-2'

    # Download into a throwaway temp directory: the working directory may be
    # read-only, and a key containing '/' is not a usable local file name.
    with tempfile.TemporaryDirectory() as tmp_dir:
        local_path = os.path.join(tmp_dir, 'raw_manga.txt')
        s3.Bucket(source_bucket).download_file(filename_from_event,local_path)
        with open(local_path,'r') as raw_file:
            # literal_eval only accepts Python literals, so the file content
            # is never executed.
            raw = ast.literal_eval(raw_file.read())
    manga_name = raw[0]
    chapter_list_raw = BeautifulSoup(raw[1], 'html.parser')

    # Parse for chapter information
    chapter_names, date_uploads, chapter_links = _parse_chapters(chapter_list_raw)

    # Create dict and send data
    data = {'title':manga_name,
            'chapter':chapter_names,
            'date_uploaded':date_uploads,
            'url':chapter_links}
    logger.info('{} parsed'.format(manga_name))

    # Run Unit Tests
    data_lengths_test(data)

    # Swap only the trailing '_raw_' marker for '_parsed_'. A blanket
    # replace('raw', 'parsed') would also rewrite titles that contain "raw".
    # Keys without the marker keep their name.
    head, marker, tail = filename_from_event.rpartition('_raw_')
    filename = head + '_parsed_' + tail if marker else filename_from_event
    send_dict_to_s3(filename,data,target_bucket)
    logging.info('{filename} saved to {bucket}'.format(filename=filename,
                                                       bucket=target_bucket))
    return True


def _parse_chapters(chapter_list_raw):
    """Return parallel lists (chapter names, upload dates, links) from the chapter list."""
    chapter_names = []
    date_uploads = []
    chapter_links = []
    for chapter in chapter_list_raw.find_all('li',{'class':None}):
        name_div = chapter.find('div',{'class': 'chapter-name short'})
        chapter_links.append(name_div.a.get('href'))
        date_uploads.append(chapter.find('div',{'class': 'add-time page-hidden'}).text)
        # The chapter label is the last space-separated token of the name; a
        # trailing 'new' suffix on that token is dropped. Strip in both cases
        # so no stray whitespace ends up in the stored chapter name.
        label = name_div.text.split(' ')[-1].strip()
        if label.endswith('new'):
            label = label[:-3].strip()
        chapter_names.append(label)
    return chapter_names, date_uploads, chapter_links

In [28]:
# Local smoke run against the sample event; downloads from and uploads to real S3.
# NOTE(review): the handler reads the downloaded file as a list of
# [title, chapter-list HTML], but test_record as defined at the top of the
# notebook points at a '*_parsed_*' key - confirm which event this cell is
# meant to run against.
manga_processing_handler(test_record,None)

INFO:root:Hardcore Leveling Warrior Manga parsed
INFO:root:Hardcore_Leveling_Warrior_Manga_parsed_2019-04-14_06_33_48.txt saved to darrin-testing-2


True

In [38]:
# COMPARISON LAMBDA - ADD NEW LINES TO DYNAMODB TABLE
import os
import time
import json
import boto3
import logging
from decimal import Decimal, InvalidOperation
from boto3.dynamodb.conditions import Key

# Root logger (getLogger() with no name), with its level lowered to INFO.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Connect to S3 & DynamoDB table
# NOTE(review): S3 is addressed in us-east-2 while DynamoDB is addressed in
# us-east-1 - confirm the two regions are intentional.
s3 = boto3.resource('s3',region_name='us-east-2')
dynamodb = boto3.resource('dynamodb',region_name='us-east-1')
table = dynamodb.Table('manga_chapters')

def ix_to_remove(data,response):
    """Return the positions in ``data['chapter']`` that are already stored.

    Chapters are compared by numeric value where possible, so a scraped
    ``'12.0'`` or ``'12\\n'`` matches a stored ``Decimal('12')``. A plain string
    comparison would miss those and report them as new. Every matching
    position is returned, including repeated chapters in the new list.

    Args:
        data: Mapping whose 'chapter' entry is the list of scraped chapter
            labels.
        response: Query result whose 'Items' entry is a list of mappings, each
            with a 'chapter' value.

    Returns:
        Ascending list of unique indexes into ``data['chapter']``.
    """
    existing = {_chapter_key(item['chapter']) for item in response['Items']}
    return [index
            for index, chapter in enumerate(data['chapter'])
            if _chapter_key(chapter) in existing]


def _chapter_key(value):
    """Normalize a chapter label to a Decimal, or to stripped text if it is not a finite number."""
    text = str(value).strip()
    try:
        number = Decimal(text)
    except InvalidOperation:
        return text
    # NaN / Infinity are not usable as comparison keys; fall back to the text.
    return number if number.is_finite() else text

# Download file that was just added and load data
def manga_db_handler(event,context):
    """Insert chapters from a parsed manga file that DynamoDB does not hold yet.

    Downloads the JSON file named in the S3 event, queries the table for every
    chapter already stored under the manga's title, drops those from the new
    data and writes the remainder with ``put_item``.

    Args:
        event: S3 notification; only ``Records[0].s3.bucket.name`` and
            ``Records[0].s3.object.key`` are read.
        context: Lambda context object; not used.

    Returns:
        True when processing finished.
    """
    import tempfile
    from urllib.parse import unquote_plus

    s3_event = event['Records'][0]['s3']
    source_bucket = s3_event['bucket']['name']
    # Object keys arrive URL-encoded in S3 notifications; decode before use.
    filename_from_event = unquote_plus(s3_event['object']['key'])

    # Download into a throwaway temp directory: the working directory may be
    # read-only, and a key containing '/' is not a usable local file name.
    with tempfile.TemporaryDirectory() as tmp_dir:
        local_path = os.path.join(tmp_dir, 'parsed_manga.json')
        s3.Bucket(source_bucket).download_file(filename_from_event,local_path)
        logging.info('{filename} downloaded'.format(filename=filename_from_event))
        with open(local_path) as json_file:
            data = json.load(json_file)

    # Query the DynamoDB table for that title
    title = data['title']
    response = _query_all_chapters(title)

    if response['Count'] == 0:
        # If no records at present, insert all records as new
        logger.info('Ingesting a whole new manga: {}'.format(title))
    else:
        # If records exist, only insert new records
        logger.info('There are existing chapters for {}'.format(title))
        # Delete from the highest index down so earlier deletions do not shift
        # the positions still to be removed; set() guards against repeats.
        for index in sorted(set(ix_to_remove(data,response)),reverse=True):
            for my_list in (data['chapter'],data['date_uploaded'],data['url']):
                del my_list[index]

    cnt = _put_chapters(title,data)
    logger.info('{cnt} new chapters added to {title}.'.format(cnt=cnt,
                                                          title=title))
    return True


def _query_all_chapters(title):
    """Collect every stored item for ``title``, following query pagination."""
    query_kwargs = {'KeyConditionExpression': Key('title').eq(title)}
    items = []
    while True:
        page = table.query(**query_kwargs)
        items.extend(page.get('Items', []))
        # A query returns one page at a time; LastEvaluatedKey is present
        # while more pages remain.
        last_key = page.get('LastEvaluatedKey')
        if not last_key:
            break
        query_kwargs['ExclusiveStartKey'] = last_key
    return {'Items': items, 'Count': len(items)}


def _put_chapters(title,data):
    """Write each chapter in ``data`` to the table; return how many were written."""
    added = 0
    for i in range(len(data['chapter'])):
        try:
            table.put_item(
                Item = {
                    'title': title,
                    'chapter': Decimal(data['chapter'][i]),
                    'date_ingested': time.strftime("%Y-%m-%d"),
                    'date_uploaded': data['date_uploaded'][i],
                    'url': data['url'][i]
                }
            )
            added += 1
        except InvalidOperation:
            # Chapter label is not a number (Decimal() rejected it): skip it.
            logger.info('{} not added.'.format(data['chapter'][i]))
    return added

In [39]:
# Local smoke run: downloads the file named in test_record from S3 and writes
# any chapters not yet stored to the manga_chapters DynamoDB table.
manga_db_handler(test_record,None)

INFO:root:Hardcore_Leveling_Warrior_Manga_parsed_2019-04-14_06_33_48.txt downloaded
INFO:root:There are chapters for Hardcore Leveling Warrior Manga
INFO:root:0 chapters added to Hardcore Leveling Warrior Manga.


True

In [None]:
# Notes on Lambda logging: https://docs.aws.amazon.com/lambda/latest/dg/python-logging.html

In [None]:
# Example Lambda
# https://docs.aws.amazon.com/lambda/latest/dg/python-programming-model-handler-types.html
def lambda_handler(event, context):
    """Return the sum, product, absolute difference and quotient of two numbers.

    Args:
        event: Mapping carrying the operands under 'Number1' and 'Number2'.
        context: Lambda context object; not used.

    Returns:
        dict echoing both inputs plus 'Sum', 'Product', 'Difference' and
        'Quotient' (Number1 divided by Number2).
    """
    first, second = event['Number1'], event['Number2']
    return {
        "Number1": first,
        "Number2": second,
        "Sum": first + second,
        "Product": first * second,
        "Difference": abs(first - second),
        "Quotient": first / second
    }

In [None]:
# Using CloudWatch for scheduled triggering of Lambdas
# https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/RunLambdaSchedule.html