<a href="https://colab.research.google.com/github/canfielder/DSBA-6190_Proj4_Serverless-Pipeline/blob/master/notebooks/DSBA_6190_Project_4_Serverless_Data_Engineering_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import

## Installs

In [0]:
# Boto3
!pip -q install boto3

## Packages

In [0]:
# General
import os
import io
import json
import pandas as pd
import pprint
from IPython.display import display



# AWS Connection
from google.colab import drive
import boto3



# Set Up AWS Connection

## Mount Google Drive


In [3]:
drive.mount('/content/gdrive', force_remount=True)

Mounted at /content/gdrive


Next we verify that we can access the AWS credentials. We will copy the AWS credentials file from my personal Google Drive into the appropriate local folder.

In [4]:
path = "/content/gdrive/My Drive/aws/credentials/"

aws_dir = os.listdir(path)
print(aws_dir)

aws_credentials = aws_dir.pop(0)

['credentials']


Establish the source location of the AWS credential file.

In [5]:
aws_credentials_src = os.path.join(path, aws_credentials)
# Escape the space in "My Drive" so the path survives the shell commands
aws_credentials_src = aws_credentials_src.replace(" ", r"\ ")
print(aws_credentials_src)

/content/gdrive/My\ Drive/aws/credentials/credentials


Establish the destination path for the copied AWS credential file.

In [6]:
aws_credentials_dst = "~/.aws/credentials"
print(aws_credentials_dst)    

~/.aws/credentials


Copy the credentials from my mounted Google Drive to the local folder.

In [0]:
# Create the local AWS folder, then copy the credentials file into it
!mkdir -p ~/.aws && cp {aws_credentials_src} {aws_credentials_dst}
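
As an alternative to the shell commands, the copy can be done in pure Python, which avoids escaping the space in "My Drive" by hand. This is a minimal sketch, assuming you pass the *unescaped* source path (`os.path.join(path, aws_credentials)`, before the `replace` call); `copy_credentials` is a hypothetical helper, and the usage below uses temporary folders in place of Google Drive and `~/.aws`.

```python
import os
import shutil
import tempfile


def copy_credentials(src, dst):
    """Copy an AWS credentials file to dst, creating dst's folder if needed.

    Python handles the paths directly, so a space such as the one in
    "My Drive" needs no shell escaping. (Hypothetical helper.)
    """
    dst = os.path.expanduser(dst)  # expand "~" to the home directory
    os.makedirs(os.path.dirname(dst), exist_ok=True)
    shutil.copyfile(src, dst)
    return dst


# Usage: temporary folders stand in for Google Drive and ~/.aws
with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "My Drive", "aws", "credentials", "credentials")
    os.makedirs(os.path.dirname(src))
    with open(src, "w") as f:
        f.write("[dsba_6190_proj_4]\n")
    copied = copy_credentials(src, os.path.join(tmp, ".aws", "credentials"))
    with open(copied) as f:
        copied_text = f.read()

print(copied_text)
```

In the notebook this would be called as `copy_credentials(os.path.join(path, aws_credentials), "~/.aws/credentials")`.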

Verify the credentials were correctly copied.

In [8]:
# List the copied credentials file
!ls -R {aws_credentials_dst}

/root/.aws/credentials


## Establish Boto3 Session and Region
By creating a Boto3 session with a named profile and a region, every client and resource created from `session` inherits those values, so we do not have to define the region multiple times.

In [0]:
session = boto3.Session(profile_name='dsba_6190_proj_4', 
                        region_name="us-east-1")
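
Boto3 raises `ProfileNotFound` (from `botocore.exceptions`) when the named profile cannot be found. A quick pre-check with the standard library's `configparser` can confirm that the copied credentials file defines the expected profile. This is a sketch: `has_profile` is a hypothetical helper, and the sample keys are placeholders, not real credentials.

```python
import configparser


def has_profile(credentials_text, profile):
    """Return True if the INI-style credentials text defines profile with both keys.

    Hypothetical helper; in the notebook you could pass the contents of
    os.path.expanduser("~/.aws/credentials").
    """
    parser = configparser.ConfigParser()
    parser.read_string(credentials_text)
    return (parser.has_section(profile)
            and parser.has_option(profile, "aws_access_key_id")
            and parser.has_option(profile, "aws_secret_access_key"))


# Placeholder values only, not real keys
sample = """[dsba_6190_proj_4]
aws_access_key_id = AKIAEXAMPLE
aws_secret_access_key = exampleSecret
"""
print(has_profile(sample, "dsba_6190_proj_4"))  # True
print(has_profile(sample, "default"))           # False
```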

## API Call Test
Test that we have access to AWS Comprehend by running sentiment detection on a short sample sentence.

In [10]:
comprehend = session.client(service_name='comprehend')
text_sample = "I hope I don't have to be quarentined."
comprehend.detect_sentiment(Text=text_sample, LanguageCode='en')

{'ResponseMetadata': {'HTTPHeaders': {'content-length': '163',
   'content-type': 'application/x-amz-json-1.1',
   'date': 'Tue, 17 Mar 2020 00:37:23 GMT',
   'x-amzn-requestid': '2cdd058a-9ace-4e42-b6a8-ee6beea2aa33'},
  'HTTPStatusCode': 200,
  'RequestId': '2cdd058a-9ace-4e42-b6a8-ee6beea2aa33',
  'RetryAttempts': 0},
 'Sentiment': 'NEUTRAL',
 'SentimentScore': {'Mixed': 1.1915316463273484e-05,
  'Negative': 0.11930244415998459,
  'Neutral': 0.44884905219078064,
  'Positive': 0.4318366050720215}}
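
Comprehend labels the sample NEUTRAL, but the `Positive` score is close behind. A minimal sketch of reading the winning label, its score, and the runner-up from a `detect_sentiment` response; `summarize_sentiment` is a hypothetical helper, and it relies on the label-to-key mapping visible in the output above (`'NEUTRAL'` corresponds to the `'Neutral'` score key). The scores are copied from that output.

```python
def summarize_sentiment(response):
    """Return (label, score, runner_up, runner_up_score) from a detect_sentiment response."""
    label = response["Sentiment"]           # e.g. "NEUTRAL"
    scores = response["SentimentScore"]     # keys: Positive, Negative, Neutral, Mixed
    key = label.capitalize()                # "NEUTRAL" -> "Neutral"
    runner_up = max((k for k in scores if k != key), key=scores.get)
    return label, scores[key], runner_up, scores[runner_up]


# SentimentScore values copied from the test call above
response = {
    "Sentiment": "NEUTRAL",
    "SentimentScore": {"Mixed": 1.1915316463273484e-05,
                       "Negative": 0.11930244415998459,
                       "Neutral": 0.44884905219078064,
                       "Positive": 0.4318366050720215},
}
label, score, runner_up, runner_score = summarize_sentiment(response)
print(f"{label} ({score:.3f}), runner-up {runner_up} ({runner_score:.3f})")
# NEUTRAL (0.449), runner-up Positive (0.432)
```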

# Import DynamoDB Table
The following section will import the DynamoDB table we will use for this project and then create a list of items to be analyzed with AWS Comprehend.

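First we retrieve the name of the table we want to use as a string. This requires the DynamoDB client, since `list_tables` is a client method. The code below takes the first table name returned, so it assumes the project table is the first (or only) table in this account and region.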

In [11]:
dynamo_c = session.client(service_name = 'dynamodb')
table_proj4 = dynamo_c.list_tables()['TableNames'][0]
print("The table to be used for this project is named: {0}".format(table_proj4))

The table to be used for this project is named: Ultras


With the table name as a string, we switch to the DynamoDB resource interface to get a `Table` object and scan it. The scan returns the table's items as a list of dictionaries, which we will use to query the Wikipedia API.

In [12]:
dynamo_r = session.resource(service_name = 'dynamodb')
table_producer = dynamo_r.Table(table_proj4)
response = table_producer.scan()
items = response['Items']
items

[{'Races': 'Barkley Marathons'},
 {'Races': 'Leadville Trail 100'},
 {'Races': 'Vermont 100 Mile Endurance Run'},
 {'Races': 'Ultra-Trail du Mont-Blanc'},
 {'Races': 'Western States Endurance Run'},
 {'Races': 'Hardrock Hundred Mile Endurance Run'}]
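
A single DynamoDB `Scan` call returns at most 1 MB of data; when more remains, the response includes `LastEvaluatedKey`, which is passed back as `ExclusiveStartKey` to fetch the next page. Six items fit easily in one page, but a larger table would be silently truncated by one `scan()` call. A minimal sketch of a paginated scan; `scan_all` and `FakeTable` are hypothetical names, and the fake table exists only so the example runs without AWS.

```python
def scan_all(table):
    """Return every item from a DynamoDB Table, following LastEvaluatedKey pages."""
    items = []
    kwargs = {}
    while True:
        response = table.scan(**kwargs)
        items.extend(response["Items"])
        last_key = response.get("LastEvaluatedKey")
        if last_key is None:  # no more pages to read
            return items
        # Resume the next scan where this page stopped
        kwargs["ExclusiveStartKey"] = last_key


class FakeTable:
    """Hypothetical stand-in for a boto3 Table that serves pre-built pages."""

    def __init__(self, pages):
        self.pages = pages
        self.calls = []

    def scan(self, **kwargs):
        self.calls.append(kwargs)
        return self.pages[len(self.calls) - 1]


fake = FakeTable([
    {"Items": [{"Races": "Barkley Marathons"}],
     "LastEvaluatedKey": {"Races": "Barkley Marathons"}},
    {"Items": [{"Races": "Leadville Trail 100"}]},
])
all_items = scan_all(fake)
print(all_items)
# [{'Races': 'Barkley Marathons'}, {'Races': 'Leadville Trail 100'}]
```

In the notebook, `scan_all(table_producer)` would replace `table_producer.scan()['Items']`.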

# Verify Wikipedia API
The following code cells query the Wikipedia API with the values pulled from DynamoDB. This also verifies that every item in the DynamoDB table has a corresponding Wikipedia entry.


In [0]:
!pip -q install wikipedia

In [0]:
import wikipedia

In [0]:
def wiki_summary(item_list):
  for item in item_list:
    entry = item['Races']
    entry_summary = wikipedia.summary(entry)
    print("{0}:\n{1}\n".format(item['Races'], entry_summary))
  return 
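
`wiki_summary` stops at the first title that fails to resolve. To list every race without a usable entry in one pass, the lookup can be wrapped in a `try`/`except`. A minimal sketch; `missing_wikipedia_entries` is a hypothetical helper that takes the lookup function as a parameter (in the notebook, `wikipedia.summary`, whose package raises `wikipedia.exceptions.PageError` or `DisambiguationError` for unresolved titles), and `fake_fetch` stands in for it so the example runs offline.

```python
def missing_wikipedia_entries(item_list, fetch):
    """Return the race names for which fetch(name) raised an exception."""
    missing = []
    for item in item_list:
        name = item["Races"]
        try:
            fetch(name)
        except Exception:  # broad on purpose: any failure means "no usable entry"
            missing.append(name)
    return missing


known = {"Barkley Marathons": "The Barkley Marathons is an ultramarathon trail race."}


def fake_fetch(title):
    """Hypothetical stand-in for wikipedia.summary."""
    if title not in known:
        raise LookupError(title)
    return known[title]


checked = missing_wikipedia_entries(
    [{"Races": "Barkley Marathons"}, {"Races": "Not A Real Race"}], fake_fetch)
print(checked)  # ['Not A Real Race']
```

In the notebook this would be `missing_wikipedia_entries(items, wikipedia.summary)`.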

In [16]:
wiki_summary(items)

Barkley Marathons:
The Barkley Marathons is an ultramarathon trail race held in Frozen Head State Park near Wartburg, Tennessee. If runners complete 60 miles (97 km) this is known as a "fun run". The full course is 100 miles (160 km) (distances are approximate). The race is limited to a 60-hour period,  and takes place in late March or early April of each year.



Leadville Trail 100:
The Leadville Trail 100 Run (aka The Race Across The Sky or the LT100) is an ultramarathon held annually on rugged trails and dirt roads near Leadville, Colorado, through the heart of the Rocky Mountains. First run in 1983, the race course climbs and descends 15,600 feet (4,800 m), with elevations ranging from 9,200-12,620 feet (2,800-3,850 m). In most years, fewer than half the starters complete the race within the 30-hour time limit.

Vermont 100 Mile Endurance Run:
The Vermont 100 Mile Endurance Run ("Vermont 100") is a 100-mile (162 km) long ultramarathon held annually in July at Silver Hill Meadow in

# AWS Comprehend Testing
Now that we know we can import the DynamoDB table into this notebook, and that each entry in the table has a corresponding Wikipedia entry, we will test **AWS Comprehend** functions against these Wikipedia entries.

## Sentiment Analysis
With the Wikipedia entries available, we will test the sentiment analysis function, `detect_sentiment`.

In [0]:
def sentiment(item_list):
  for item in item_list:
    entry = item['Races']
    entry_summary = wikipedia.summary(entry)
    # Use a distinct name so the result does not shadow this function's name
    result = comprehend.detect_sentiment(Text=entry_summary,
                                         LanguageCode='en')
    pprint.pprint(result)
    print("\n\n\n")
  return
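
The Comprehend API reference documents a size limit on the `Text` parameter of `DetectSentiment` (5,000 bytes of UTF-8 at the time of writing; check the current limit), and a full Wikipedia summary could exceed it. A minimal sketch of trimming text to a byte budget without splitting a multi-byte character; `truncate_utf8` is a hypothetical helper and the 5,000 default is an assumption to verify against the documentation.

```python
def truncate_utf8(text, max_bytes=5000):
    """Trim text so its UTF-8 encoding fits in max_bytes, without splitting a character."""
    encoded = text.encode("utf-8")
    if len(encoded) <= max_bytes:
        return text
    # errors="ignore" drops a partial multi-byte character left at the cut point
    return encoded[:max_bytes].decode("utf-8", errors="ignore")


short = truncate_utf8("Barkley Marathons")
long_text = truncate_utf8("é" * 3000)  # "é" is 2 bytes in UTF-8, so 6,000 bytes
print(len(short), len(long_text.encode("utf-8")))  # 17 5000
```

Usage in the notebook: `comprehend.detect_sentiment(Text=truncate_utf8(entry_summary), LanguageCode='en')`.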

In [18]:
sentiment(items)

{'ResponseMetadata': {'HTTPHeaders': {'content-length': '162',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Tue, 17 Mar 2020 00:37:42 GMT',
                                      'x-amzn-requestid': '53599f5c-1b71-4adb-a607-056ab9610f97'},
                      'HTTPStatusCode': 200,
                      'RequestId': '53599f5c-1b71-4adb-a607-056ab9610f97',
                      'RetryAttempts': 0},
 'Sentiment': 'NEUTRAL',
 'SentimentScore': {'Mixed': 1.324443815065024e-06,
                    'Negative': 0.000236633321037516,
                    'Neutral': 0.9850009679794312,
                    'Positive': 0.01476116944104433}}




{'ResponseMetadata': {'HTTPHeaders': {'content-length': '165',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Tue, 17 Mar 2020 00:37:42 GMT',
                                      'x-amzn-

## Entity Recognition
We will now test the entity recognition function, `detect_entities`.

In [0]:
def create_entity(input_str):
  payload = comprehend.detect_entities(Text = input_str, LanguageCode = 'en')
  entity = payload['Entities']
  return entity

In [0]:
def entity_process(item_list):
  for item in item_list:
    entry = item['Races']
    entry_summary = wikipedia.summary(entry)
    entity_dict = create_entity(entry_summary)

    # Convert to DataFrame and re-order the columns
    df = pd.DataFrame.from_dict(entity_dict)
    df = df[['Text', 'Type', 'Score', 'BeginOffset', 'EndOffset']]
    print(entry)
    display(df)
    print("\n\n\n")
  return
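
Some detections in the output below carry low confidence (for example `UTMB` at 0.437 and `4WD` at 0.497). A minimal sketch of grouping entity text by type while dropping anything under a score threshold; the 0.9 cutoff and the `entities_by_type` name are illustrative assumptions, and the sample rows are taken from the Western States Endurance Run results below.

```python
from collections import defaultdict


def entities_by_type(entities, min_score=0.9):
    """Group Comprehend entity texts by Type, keeping only Score >= min_score."""
    grouped = defaultdict(list)
    for ent in entities:
        if ent["Score"] >= min_score:
            grouped[ent["Type"]].append(ent["Text"])
    return dict(grouped)


# A few rows from the Western States Endurance Run results
western_states = [
    {"Text": "Western States Endurance Run", "Type": "EVENT", "Score": 0.986855},
    {"Text": "Western States 100", "Type": "EVENT", "Score": 0.995623},
    {"Text": "California", "Type": "LOCATION", "Score": 0.997449},
    {"Text": "Placer High School", "Type": "ORGANIZATION", "Score": 0.871898},
]
print(entities_by_type(western_states))
# {'EVENT': ['Western States Endurance Run', 'Western States 100'], 'LOCATION': ['California']}
```

The function accepts the list returned by `create_entity` directly.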


In [21]:
entity_process(items)

Barkley Marathons


| | Text | Type | Score | BeginOffset | EndOffset |
|---|---|---|---|---|---|
| 0 | Barkley Marathons | ORGANIZATION | 0.592081 | 4 | 21 |
| 1 | Frozen Head State Park | LOCATION | 0.999226 | 61 | 83 |
| 2 | Wartburg, Tennessee | LOCATION | 0.953557 | 89 | 108 |
| 3 | 60 miles | QUANTITY | 0.999934 | 130 | 138 |
| 4 | 97 km | QUANTITY | 0.999859 | 140 | 145 |
| 5 | 100 miles | QUANTITY | 0.99991 | 196 | 205 |
| 6 | 160 km | QUANTITY | 0.999602 | 207 | 213 |
| 7 | 60-hour | QUANTITY | 0.999419 | 269 | 276 |
| 8 | late March | DATE | 0.990913 | 305 | 315 |
| 9 | early April | DATE | 0.980301 | 319 | 330 |

Leadville Trail 100


| | Text | Type | Score | BeginOffset | EndOffset |
|---|---|---|---|---|---|
| 0 | Leadville Trail 100 Run | EVENT | 0.963261 | 4 | 27 |
| 1 | Race Across The | EVENT | 0.598275 | 37 | 52 |
| 2 | LT100 | EVENT | 0.742535 | 64 | 69 |
| 3 | Leadville, Colorado | LOCATION | 0.919218 | 142 | 161 |
| 4 | Rocky Mountains | LOCATION | 0.984929 | 188 | 203 |
| 5 | First run | QUANTITY | 0.980282 | 205 | 214 |
| 6 | 1983 | DATE | 0.999746 | 218 | 222 |
| 7 | 15,600 feet | QUANTITY | 0.996622 | 260 | 271 |
| 8 | 4,800 m | QUANTITY | 0.998061 | 273 | 280 |
| 9 | 9,200-12,620 feet | QUANTITY | 0.814761 | 312 | 329 |

Vermont 100 Mile Endurance Run


| | Text | Type | Score | BeginOffset | EndOffset |
|---|---|---|---|---|---|
| 0 | Vermont 100 Mile Endurance Run | EVENT | 0.99882 | 4 | 34 |
| 1 | Vermont 100 | EVENT | 0.933523 | 37 | 48 |
| 2 | 100-mile | QUANTITY | 0.999911 | 56 | 64 |
| 3 | 162 km | QUANTITY | 0.998309 | 66 | 72 |
| 4 | July | DATE | 0.996884 | 110 | 114 |
| 5 | Silver Hill Meadow | LOCATION | 0.999326 | 118 | 136 |
| 6 | West Windsor | LOCATION | 0.774987 | 140 | 152 |
| 7 | Vermont | LOCATION | 0.743249 | 154 | 161 |
| 8 | one | QUANTITY | 0.942272 | 170 | 173 |
| 9 | five 100-mile races | QUANTITY | 0.82585 | 181 | 200 |

Ultra-Trail du Mont-Blanc


| | Text | Type | Score | BeginOffset | EndOffset |
|---|---|---|---|---|---|
| 0 | Ultra-Trail du Mont-Blanc | EVENT | 0.909135 | 4 | 29 |
| 1 | UTMB | EVENT | 0.436939 | 31 | 35 |
| 2 | single-stage | QUANTITY | 0.699018 | 42 | 54 |
| 3 | first | QUANTITY | 0.894652 | 78 | 83 |
| 4 | 2003 | DATE | 0.999281 | 92 | 96 |
| 5 | Ultra-Trail World Tour | EVENT | 0.980149 | 118 | 140 |

Western States Endurance Run


| | Text | Type | Score | BeginOffset | EndOffset |
|---|---|---|---|---|---|
| 0 | Western States Endurance Run | EVENT | 0.986855 | 4 | 32 |
| 1 | Western States 100 | EVENT | 0.995623 | 56 | 74 |
| 2 | 100-mile | QUANTITY | 0.999477 | 81 | 89 |
| 3 | 161 km | QUANTITY | 0.993769 | 91 | 97 |
| 4 | California | LOCATION | 0.997449 | 133 | 143 |
| 5 | Sierra Nevada Mountains | LOCATION | 0.969223 | 146 | 169 |
| 6 | each year | QUANTITY | 0.977766 | 177 | 186 |
| 7 | last full weekend of June | DATE | 0.912418 | 194 | 219 |
| 8 | Squaw Valley | LOCATION | 0.995626 | 256 | 268 |
| 9 | Placer High School | ORGANIZATION | 0.871898 | 300 | 318 |

Hardrock Hundred Mile Endurance Run


| | Text | Type | Score | BeginOffset | EndOffset |
|---|---|---|---|---|---|
| 0 | Hardrock Hundred Mile Endurance Run | EVENT | 0.997111 | 4 | 39 |
| 1 | 100.5 miles | QUANTITY | 0.999517 | 60 | 71 |
| 2 | 161.7 km | QUANTITY | 0.999533 | 73 | 81 |
| 3 | 33,000 feet | QUANTITY | 0.999701 | 99 | 110 |
| 4 | 10,000 m | QUANTITY | 0.998688 | 112 | 120 |
| 5 | over 11,000 feet | QUANTITY | 0.80107 | 158 | 174 |
| 6 | 3,400 m | QUANTITY | 0.998977 | 176 | 183 |
| 7 | 4WD | OTHER | 0.496832 | 223 | 226 |
| 8 | Southern Colorado | LOCATION | 0.983297 | 268 | 285 |
| 9 | San Juan Range, USA | LOCATION | 0.886801 | 288 | 307 |

# Lambda Function Test
Now that the basic functions work, we will look at how the Lambda function will actually process and output the data.

## Process Payload

The following payload is an example event with one race.

In [0]:
event_1 = {
  "Records": [
    {
      "messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
      "receiptHandle": "MessageReceiptHandle",
      "body": {
          "Races": "Western States Endurance Run"
      },
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1523232000000",
        "SenderId": "123456789012",
        "ApproximateFirstReceiveTimestamp": "1523232000001"
      },
      "messageAttributes": {},
      "md5OfBody": "7b270e59b47ff90a553787216d55d91d",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:MyQueue",
      "awsRegion": "us-east-1"
    }
  ]
}

The following payload is an example event with two races.

In [0]:
event_2 = {
  "Records": [
    {
      "messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
      "receiptHandle": "MessageReceiptHandle",
      "body": {
          "Races": "Western States Endurance Run"
      },
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1523232000000",
        "SenderId": "123456789012",
        "ApproximateFirstReceiveTimestamp": "1523232000001"
      },
      "messageAttributes": {},
      "md5OfBody": "7b270e59b47ff90a553787216d55d91d",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:MyQueue",
      "awsRegion": "us-east-1"
    },
    {
      "messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
      "receiptHandle": "MessageReceiptHandle",
      "body": {
          "Races": "Barkley Marathons"
      },
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1523232000000",
        "SenderId": "123456789012",
        "ApproximateFirstReceiveTimestamp": "1523232000001"
      },
      "messageAttributes": {},
      "md5OfBody": "7b270e59b47ff90a553787216d55d91d",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:MyQueue",
      "awsRegion": "us-east-1"
    }   
  ]
}

For downstream testing, we set `event` to either the one-race or the two-race event.

In [0]:
event = event_1

We check the type of the event object.

In [25]:
type(event)

dict

## Function Flow
We will now run through each function to verify the process flow. The logging calls are omitted.

### Lambda Handler For Loop
The following code chunk is the `for` loop at the start of the Lambda handler. This loop creates a list of the race names contained in the event.

In [26]:
race = []
for record in event['Records']:
  #print(record)
  body = record['body']
  print(body)
  race_name = body['Races']
  print(race_name)

  race.append(race_name)
  print(race)

{'Races': 'Western States Endurance Run'}
Western States Endurance Run
['Western States Endurance Run']
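
In a real SQS-triggered invocation, each record's `body` arrives as a string rather than the dictionary used in the hand-built test events above. A minimal sketch of the loop that accepts both forms, assuming the producer serializes each DynamoDB item with `json.dumps`; `race_names` is a hypothetical helper.

```python
import json


def race_names(event):
    """Collect race names from an SQS-triggered Lambda event (hypothetical helper).

    Assumes the producer sent each DynamoDB item as a JSON string such as
    '{"Races": "Barkley Marathons"}'. Dict bodies, as in the hand-built test
    events, are accepted unchanged.
    """
    names = []
    for record in event["Records"]:
        body = record["body"]
        if isinstance(body, str):
            body = json.loads(body)  # real SQS bodies arrive as strings
        names.append(body["Races"])
    return names


event_json_body = {"Records": [{"body": json.dumps({"Races": "Barkley Marathons"})}]}
event_dict_body = {"Records": [{"body": {"Races": "Western States Endurance Run"}}]}
print(race_names(event_json_body))  # ['Barkley Marathons']
print(race_names(event_dict_body))  # ['Western States Endurance Run']
```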


### Generate Wikipedia Snippet
The next code chunk takes the list of races as input. It fetches the first sentence of the Wikipedia summary for each race and populates a dictionary mapping each race name to its Wikipedia snippet.


In [0]:
def races_to_wikipedia(race_list):
    """Map each race name to the first sentence of its Wikipedia summary."""
    race_dict = {}
    for race in race_list:
        race_dict[race] = wikipedia.summary(race, sentences=1)
    return race_dict

In [28]:
race_dict = races_to_wikipedia(race)
pprint.pprint(race_dict)

{'Western States Endurance Run': 'The Western States Endurance Run, known '
                                 'commonly as the Western States 100, is a '
                                 '100-mile (161 km) ultramarathon that takes '
                                 "place on California's Sierra Nevada "
                                 'Mountains trails each year on the last full '
                                 'weekend of June.'}


### Perform Entity Analysis on the Wikipedia Snippet
The next code chunk takes the race/Wikipedia dictionary and performs entity analysis on each Wikipedia snippet. It then returns a dictionary mapping each race name to its list of detected entities.


In [0]:
def entity_detect(race_dict):
    entity_dict = {}
    # Extract Entities from Wikipedia Snippet
    for race in race_dict:
        entry_summary = race_dict[race]
        payload = comprehend.detect_entities(Text = entry_summary, 
                                             LanguageCode = 'en')
        entity = payload['Entities']
        entity_dict.update({race:entity})
        
    return entity_dict
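
With many races, one `detect_entities` call per snippet adds up. Comprehend also offers `batch_detect_entities`, which accepts up to 25 documents per request and returns per-document results in `ResultList` (each tagged with its `Index` in the request) plus failures in `ErrorList`. A sketch of a batched variant of `entity_detect`; `entity_detect_batch` is a hypothetical name, and `FakeComprehend` exists only so the example runs without AWS.

```python
def entity_detect_batch(race_dict, client, batch_size=25):
    """Batched variant of entity_detect (hypothetical helper).

    Returns (entity_dict, errors): race -> entity list, and race -> error message.
    """
    races = list(race_dict)
    entity_dict, errors = {}, {}
    for start in range(0, len(races), batch_size):
        chunk = races[start:start + batch_size]
        response = client.batch_detect_entities(
            TextList=[race_dict[race] for race in chunk], LanguageCode="en")
        # Index refers to the position of the document within this chunk
        for result in response["ResultList"]:
            entity_dict[chunk[result["Index"]]] = result["Entities"]
        for error in response["ErrorList"]:
            errors[chunk[error["Index"]]] = error["ErrorMessage"]
    return entity_dict, errors


class FakeComprehend:
    """Stand-in for a boto3 Comprehend client; tags each text as one EVENT."""

    def __init__(self):
        self.batch_sizes = []

    def batch_detect_entities(self, TextList, LanguageCode):
        self.batch_sizes.append(len(TextList))
        return {
            "ResultList": [{"Index": i, "Entities": [{"Text": text, "Type": "EVENT"}]}
                           for i, text in enumerate(TextList)],
            "ErrorList": [],
        }


fake = FakeComprehend()
snippets = {f"Race {n}": f"Race {n} is an ultramarathon." for n in range(30)}
entities, errors = entity_detect_batch(snippets, fake)
print(fake.batch_sizes, len(entities), errors)  # [25, 5] 30 {}
```

In the notebook this would be called as `entity_detect_batch(race_dict, comprehend)`.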

In [30]:
entity_dict = entity_detect(race_dict)
pprint.pprint(entity_dict)

{'Western States Endurance Run': [{'BeginOffset': 4,
                                   'EndOffset': 32,
                                   'Score': 0.9937021136283875,
                                   'Text': 'Western States Endurance Run',
                                   'Type': 'EVENT'},
                                  {'BeginOffset': 56,
                                   'EndOffset': 74,
                                   'Score': 0.9960070848464966,
                                   'Text': 'Western States 100',
                                   'Type': 'EVENT'},
                                  {'BeginOffset': 81,
                                   'EndOffset': 89,
                                   'Score': 0.999893844127655,
                                   'Text': '100-mile',
                                   'Type': 'QUANTITY'},
                                  {'BeginOffset': 91,
                                   'EndOffset': 97,
                             

### Write Entity Analysis to S3 Bucket
The following code chunk takes the dictionary of race/entity reports and writes each report to an S3 bucket as a CSV file.

In [0]:
def write_s3(race_dict_ent, bucket):
    """Write each race's entity report to the S3 bucket as a CSV file."""
    # Create the S3 resource once, outside the loop
    s3_resource = session.resource('s3')

    for race, entities in race_dict_ent.items():
        df = pd.DataFrame.from_dict(entities)

        # Re-order columns
        df = df[['Text', 'Type', 'Score', 'BeginOffset', 'EndOffset']]

        # Convert DataFrame to CSV in an in-memory buffer
        csv_buffer = io.StringIO()
        df.to_csv(csv_buffer, index=False)

        # Build the object key, e.g. "entity_western_states_endurance_run.csv"
        race_key = race.lower().replace(' ', '_')

        # Send to S3
        res = s3_resource.Object(bucket, f'entity_{race_key}.csv').put(
            Body=csv_buffer.getvalue())
        #LOG.info(f"result of write name: {race_key} to bucket: {bucket} with:\n {res}")


In [0]:
bucket = "dsba-6190-project4-serverless-data-engineering-pipeline"

write_s3(entity_dict, bucket)
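
For reference, the object key and CSV body that `write_s3` uploads can be reproduced with the standard library alone, which is handy for checking the output without touching S3. A sketch using `csv.DictWriter` in place of pandas; the helper names are hypothetical, and pandas' `to_csv` may differ in details such as the line terminator or float formatting. The sample entities are the first two from the Western States `entity_dict` output above.

```python
import csv
import io

COLUMNS = ["Text", "Type", "Score", "BeginOffset", "EndOffset"]


def entities_to_csv(entities):
    """Render a Comprehend entity list as CSV text with write_s3's column order."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=COLUMNS, lineterminator="\n")
    writer.writeheader()
    writer.writerows(entities)
    return buffer.getvalue()


def s3_key(race):
    """Build the object key the same way write_s3 does."""
    return f"entity_{race.lower().replace(' ', '_')}.csv"


# First two entities from the Western States entity_dict output above
entities = [
    {"BeginOffset": 4, "EndOffset": 32, "Score": 0.9937021136283875,
     "Text": "Western States Endurance Run", "Type": "EVENT"},
    {"BeginOffset": 56, "EndOffset": 74, "Score": 0.9960070848464966,
     "Text": "Western States 100", "Type": "EVENT"},
]
csv_text = entities_to_csv(entities)
print(s3_key("Western States Endurance Run"))  # entity_western_states_endurance_run.csv
print(csv_text)
```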