CSCI E-599 Setup of *PubMiner* backend application
=====


`setup.ipynb` 5 May 2018
===


---

<a id='back-to-top'></a>


### [*Preliminaries*](#preliminaries)

### [*Creation of the DynamoDB databases*](#creation)
- [Creation of the DynamoDB table called `demographics`](#create_table)
- [Creation of the metadata table called `demographics_meta`](#create_meta)  

### [*Creation of S3 buckets*](#management)
- [Bucket `pubmedcentral_oa`](#create_pubmedcentral_oa)
- [Bucket `pubminer-upload`](#create_pubminer_upload)

### [*(optional) Population of the DynamoDB and S3*](#populate)
- [Get the list of PMCIDs and PMIDs](#getPMCIDs)   
- [Batch populate the table with batch write](#batch_from_web)   
- [Populate the table from JSON](#populate_from_json)  

### [*Creation of Lambda function packages*](#lambda)
- [GetPMCUpdatesFromCSV](#getupdatePMC)   
- [DownloadPMC-OAFileToS3](#download)
- [SentenceMinerOnEC2Instance](#sentence)
- [TruncateTable](#truncate)
- [UpdateStatsInDemographicsMeta](#stats)

### [*Cloud Formation template*](#formation)
- [Create the template](#CFtemplate)   
- [Upload Lambda packages to `pubminer-upload`](#uploadLambdas)
- [Launch the Cloud Formation stack](#launch_stack)
- [Tear down the Cloud Formation stack](#tear_down_stack)

### [*Testing the installation*](#testing)
- [Test events for AWS Lambda functions](#Testlambda)
- [BeautifulSoup tests](#crummytest)   

-------

Backend services on AWS:  DynamoDB, S3 buckets, and AWS Lambda setup
-----

The backend runs on AWS and uses the services AWS Lambda, DynamoDB, IAM, S3, EC2, Cloud Formation and CloudWatch. The DynamoDB tables and the S3 buckets are installed and configured with the boto3 library in Python 3.6. The IAM roles and policies, the AWS Lambda functions and the associated event triggers are created with an AWS Cloud Formation template. The entire backend can be installed from scratch using Python and boto3: first create the database tables and S3 buckets (or reuse the ones in place, as “pubmedcentral_oa” is about 40 GB), then launch the Cloud Formation template with boto3. Instructions for every step are provided in the Jupyter notebook setup.ipynb.

Currently all of the AWS services run from a single account, and the AWS IAM service is used to configure access for all team members through a user Group and Role permissions. Access management for our customer is out of scope for the project, but a similar Group/Role approach can be taken after handover. The Region and AccountID references in the relevant sections of the Cloud Formation template would then need adjusting. Consequently, the code in this notebook only works when it is run with the appropriate access to the account.



[back-to-top](#back-to-top)
<a id='preliminaries'></a>

*Preliminaries - libraries to load*  
=====


In [1]:
import os
import sys
import json
from datetime import datetime
from urllib.request import urlopen 
from io import StringIO
import csv
import decimal


import boto3
from boto3 import resource
from boto3.dynamodb.conditions import Key

import bs4 as bs
import glob
import xml.etree.ElementTree as ET
from lxml import etree
from lxml import html
from unidecode import unidecode
from lxml.etree import tostring

In [2]:
## Markdown CSS
from IPython.core.display import HTML
HTML("""
<style>

div.cell { 
    margin-top:1em;
    margin-bottom:1em;
}

div.text_cell_render h1 {
    font-size: 1.8em;
    line-height:1.2em;
    text-align:center;
}

div.text_cell_render h2 {
margin-bottom: -0.2em;
}

table tbody tr td:first-child, 
table tbody tr th:first-child, 
table thead tr th:first-child, 
table tbody tr td:nth-child(4), 
table thead tr th:nth-child(4) {
    background-color: #edf4e8;
}

div.text_cell_render { 
    font-family: 'Garamond';
    font-size:1.4em;
    line-height:1.3em;
    padding-left:3em;
    padding-right:3em;
}

div#notebook-container    { width: 95%; }
div#menubar-container     { width: 65%; }
div#maintoolbar-container { width: 99%; }

</style>
""")

[back-to-top](#back-to-top)
<a id='creation'></a>

*Creation of the DynamoDB databases*  
=====

The DynamoDB tables “demographics” and “demographics_meta” are created with only their primary key elements and throughput capacities. The remaining attributes are created and populated by the Lambda functions. The tables are created in setup.ipynb using boto3. There is also a full set of maintenance functions in the Jupyter notebook create_table.ipynb (batch populate, inquiring about the tables, database queries, updating, deleting items, etc.).




<a id='create_table'></a>


Creation of the DynamoDB table called `demographics`
----


------

This cell creates the main DynamoDB table that will hold the demographic information on the articles. The table will be created in the **us-east-1** AWS Region. 

In [6]:
dynamodb_client = boto3.client('dynamodb', region_name='us-east-1')

try:
    table = dynamodb_client.create_table(
        TableName='demographics',
        KeySchema=[
            {
                'AttributeName': 'pmcid', 
                'KeyType': 'HASH'
            }
        ], 
        AttributeDefinitions=[
            {
                'AttributeName': 'pmcid', 
                'AttributeType': 'S'
            }
        ], 
        ProvisionedThroughput={
            'ReadCapacityUnits': 50, 
            'WriteCapacityUnits': 50
        },
        StreamSpecification={
        'StreamEnabled': True,
        'StreamViewType': 'NEW_AND_OLD_IMAGES'
        }
    )

    dynamodb_client.get_waiter('table_exists').wait(TableName='demographics')
    print("Table status:",  table['TableDescription']['TableStatus'])
    print("Item count:", table['TableDescription']['ItemCount'])
    
except dynamodb_client.exceptions.ResourceInUseException:
    # raised when the table already exists or is still being created
    print("Table in use error - do you really want to recreate the table?")
except OSError as err:
    print("OS error: {0}".format(err))
except:
    # anything else is reported and re-raised
    print("Unexpected error:", sys.exc_info()[0])
    raise

Table status: CREATING
Item count: 0


[back-to-top](#back-to-top)
<a id='create_meta'></a>

Creation of the metadata table called `demographics_meta`
----

-----

This cell creates the DynamoDB table that will hold the metadata concerning the database updates. The table will be created in the us-east-1 region. 

In [7]:
dynamodb_client = boto3.client('dynamodb', region_name='us-east-1')

try:
    table = dynamodb_client.create_table(
        TableName='demographics_meta',
        KeySchema=[
            {
                'AttributeName': 'source', 
                'KeyType': 'HASH'
            }
        ], 
        AttributeDefinitions=[
            {
                'AttributeName': 'source', 
                'AttributeType': 'S'
            },
        ], 
        ProvisionedThroughput={
            'ReadCapacityUnits': 5, 
            'WriteCapacityUnits': 5
        },
        StreamSpecification={
        'StreamEnabled': True,
        'StreamViewType': 'NEW_AND_OLD_IMAGES'
        }
    )

    dynamodb_client.get_waiter('table_exists').wait(TableName='demographics_meta')
    print("Table status:",  table['TableDescription']['TableStatus'])
    print("Item count:", table['TableDescription']['ItemCount'])
    
except dynamodb_client.exceptions.ResourceInUseException:
    # raised when the table already exists or is still being created
    print("Table in use error - do you really want to recreate the table?")
except OSError as err:
    print("OS error: {0}".format(err))
except:
    # anything else is reported and re-raised
    print("Unexpected error:", sys.exc_info()[0])
    raise

Table status: CREATING
Item count: 0
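
The two table-creation cells above differ only in the table name, the hash key and the throughput values. As a minimal sketch, the shared keyword arguments could be built once and then passed to `create_table`. The helper name `table_spec` is hypothetical and not part of the notebook; the block only builds the dictionaries and makes no AWS call.

```python
def table_spec(name, hash_key, read_units, write_units):
    """Build the keyword arguments for DynamoDB create_table (hypothetical helper)."""
    return {
        'TableName': name,
        'KeySchema': [{'AttributeName': hash_key, 'KeyType': 'HASH'}],
        'AttributeDefinitions': [{'AttributeName': hash_key, 'AttributeType': 'S'}],
        'ProvisionedThroughput': {
            'ReadCapacityUnits': read_units,
            'WriteCapacityUnits': write_units,
        },
        'StreamSpecification': {
            'StreamEnabled': True,
            'StreamViewType': 'NEW_AND_OLD_IMAGES',
        },
    }

# Same values as in the two cells above
demographics_spec = table_spec('demographics', 'pmcid', 50, 50)
meta_spec = table_spec('demographics_meta', 'source', 5, 5)

# With AWS credentials in place, each spec would be used as:
#   dynamodb_client.create_table(**demographics_spec)
print(demographics_spec['TableName'], meta_spec['KeySchema'])
```

Keeping the specification in one place makes it harder for the two tables to drift apart, for example in their stream settings.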


[back-to-top](#back-to-top)
<a id='management'></a>

*Creation of S3 buckets*  
=====


The S3 bucket “pubmedcentral_oa” holds the XML files (.nxml) of the Open Access articles, which are the sources of information in the database. The S3 bucket “pubminer-upload” contains the configuration files, such as the Cloud Formation template, the Sentence Miner application jar file, and the source packages for the AWS Lambda functions. Both of these S3 buckets are created using boto3 in the cells below. 

<a id='create_pubmedcentral_oa'></a>

S3 Bucket `pubmedcentral_oa`
------

----


In [10]:
s3_resource = boto3.resource('s3')
s3_resource.create_bucket(Bucket='pubmedcentral_oa')
s3_resource.Bucket('pubmedcentral_oa').Acl().put(ACL='public-read')

{'ResponseMetadata': {'HTTPHeaders': {'content-length': '0',
   'date': 'Thu, 10 May 2018 18:54:23 GMT',
   'server': 'AmazonS3',
   'x-amz-id-2': 'UcUsj4SUWmU0Pve7CcpLVnE344u+wLy36j1Lz2656Bu18mKM8wkKN+U9kKCVMHFiEMP9F+W0Spw=',
   'x-amz-request-id': '3030D16D51FBBA7A'},
  'HTTPStatusCode': 200,
  'HostId': 'UcUsj4SUWmU0Pve7CcpLVnE344u+wLy36j1Lz2656Bu18mKM8wkKN+U9kKCVMHFiEMP9F+W0Spw=',
  'RequestId': '3030D16D51FBBA7A',
  'RetryAttempts': 0}}

<a id='create_pubminer_upload'></a>

S3 Bucket `pubminer-upload`
------

----


In [None]:
s3_resource = boto3.resource('s3')
s3_resource.create_bucket(Bucket='pubminer-upload')
s3_resource.Bucket('pubminer-upload').Acl().put(ACL='public-read')

### Delete the contents and the S3 bucket

In [43]:
s3_resource = boto3.resource('s3')
for key in s3_resource.Bucket('pubmedcentral_oa').objects.all():
    key.delete()
s3_resource.Bucket('pubmedcentral_oa').delete()

{'ResponseMetadata': {'HTTPHeaders': {'date': 'Thu, 10 May 2018 19:17:24 GMT',
   'server': 'AmazonS3',
   'x-amz-id-2': 'HjfaPUYJkDly7ZneaAKB3a5IpYNd5sJSRhVST3UMU2WwqZYB7PgscabkX65APa94AZy1IH/2vgk=',
   'x-amz-request-id': 'A0B20433B3BB39C7'},
  'HTTPStatusCode': 204,
  'HostId': 'HjfaPUYJkDly7ZneaAKB3a5IpYNd5sJSRhVST3UMU2WwqZYB7PgscabkX65APa94AZy1IH/2vgk=',
  'RequestId': 'A0B20433B3BB39C7',
  'RetryAttempts': 0}}

In [None]:
s3_resource = boto3.resource('s3')
for key in s3_resource.Bucket('pubminer-upload').objects.all():
    key.delete()
s3_resource.Bucket('pubminer-upload').delete()
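
Deleting objects one at a time, as in the cells above, issues one request per key, which is slow for a bucket the size of “pubmedcentral_oa”. S3 also offers a multi-object delete that accepts up to 1000 keys per request. The sketch below only builds the request payloads; the helper name `delete_batches` and the sample keys are hypothetical, and the commented call shows how each payload would be sent with a boto3 S3 client.

```python
def delete_batches(keys, batch_size=1000):
    """Yield 'Delete' payloads of at most batch_size keys each (hypothetical helper)."""
    keys = list(keys)
    for start in range(0, len(keys), batch_size):
        chunk = keys[start:start + batch_size]
        yield {'Objects': [{'Key': key} for key in chunk], 'Quiet': True}

# Made-up object names in the PMC<id>.nxml format used by the bucket
sample_keys = ['PMC{}.nxml'.format(i) for i in range(2500)]
payloads = list(delete_batches(sample_keys))
print([len(p['Objects']) for p in payloads])  # [1000, 1000, 500]

# With credentials in place, each payload would be sent as:
#   s3_client.delete_objects(Bucket='pubmedcentral_oa', Delete=payload)
```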

[back-to-top](#back-to-top)
<a id='populate'></a>

*(optional) Population of the DynamoDB tables, and the necessary S3 files*  
=====


**The following sections are optional** and serve to initially populate the `demographics` DynamoDB table. These steps are only necessary if the existing database is not being used.    


If the entire backend needs to be installed from scratch, the demographics table can be populated for the entire set of PMCIDs using the Python functions in the cell below (an invocation of the AWS Lambda function GetPMCUpdatesFromCSV over a set of that magnitude would time out). Once the primary key items for “pmcid” are populated, the rest of the backend is generated automatically by the sequence of Lambda functions, triggered initially from the stream for the demographics DynamoDB table.


[back-to-top](#back-to-top)
<a id='getPMCIDs'></a>

Get the list of articles to populate the database table
----------

-----

Prior to populating the database, the full set of PMCIDs and PMIDs needs to be created, in the following cell (derived from the GetPMCUpdatesFromCSV Lambda function).

In [None]:
# Get the JSON data for the PMCIDs and PMIDs
NUM_DAYS = 2
def grabfrompubmed(num_days_delay):

    ## Query pubmed with e-search  - returns JSON
    ##
    # build the query string
    query = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils//esearch.fcgi?db=pubmed&retmax=4659616&retmode=json"
    query += "&datetype=edat&reldate=" + str(num_days_delay)
    query += "&term=((randomized+controlled+trial%5Bpt%5D)+OR+(controlled+clinical+trial%5Bpt%5D)+OR+(randomized%5Btiab%5D+OR+randomised%5Btiab%5D)+OR+(placebo%5Btiab%5D)+OR+(drug+therapy%5Bsh%5D)+OR+(randomly%5Btiab%5D)+OR+(trial%5Btiab%5D)+OR+(groups%5Btiab%5D))+NOT+(animals%5Bmh%5D+NOT+humans%5Bmh%5D)"

    # grab the content from pubmed with e-search
    print("Querying PubMed with e-search ...")
    r = urlopen(query) 
    resp = json.loads(r.read().decode(r.info().get_param('charset') or 'utf-8'))
    
    # extract the list of target pubmed ids (pmids)
    target_pmids = resp['esearchresult']['idlist']
    print("\nSample of the first 10 pmids returned by search:")
    print(target_pmids[:10])
    print("Search of PubMed returned {} results".format(len(target_pmids)))
    
    ## Grab the OA file list from the NCBI FTP site
    ##
    print("\nGetting the OA CSV from NCBI FTP site ...")
    url = 'ftp://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_file_list.csv'
    data = urlopen(url).read().decode('ascii', 'ignore')
    dataFile = StringIO(data)
    csvReader = csv.reader(dataFile)
    
    targs = set(target_pmids)
    merged_pmids = []
    merged_pmcids = []
    
    # scan through the pairs of pubmed ids (pmids) and pubmed pmc oa ids (pmcids)  
    # from the csv file and keep only those in the target list
    for row in csvReader:
        pmid = row[4]
        if pmid in targs:
            merged_pmids.append(pmid)
            merged_pmcids.append(row[2][3:])
    
    # get the number of articles to update
    num_updates = len(merged_pmcids)
    print("\nNumber of PMC OpenAccess updates: {}".format(num_updates))
    
    # some variables to hold today's date - both long and short format
    now = str(datetime.now().strftime("%Y-%m-%d"))
    nowlong = str(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    ## Create the JSON objects - one for a JSON file and the other for immediately populating the tables
    ##
    ##############################################
    
    # create a JSON object with the status elements for the demographics_meta table
    # it will also be saved to the S3 bucket, for good measure
    jsondata = {
        "date_updated": nowlong, 
        "pmcids": merged_pmcids,
        "pmids": merged_pmids,
        "items": str(num_updates)
    }

    # create a list that contains the items to update in the demographics table
    jsonitem = []
    for i in range(num_updates):
        jsonitem.append(
            {
            "pmcid": merged_pmcids[i],
            "pmid":  merged_pmids[i],
            "date_processed":  now
            }) 
    return jsondata, jsonitem
    
def puts3item(jsondata, filename):
    # function to put the JSON file of newRCTs.json to the S3 bucket for sentence miner
    # and also to put the full JSON file of metadata to the S3 bucket for safekeeping
    s3bucketfile = filename
    s3bucket = "pubminer-upload"
    s3_resource = boto3.resource('s3')
    obj = s3_resource.Object(s3bucket,s3bucketfile)
    obj.put(Body=json.dumps(jsondata), ACL='public-read')
    return "Success"

def putdbitem(jsonitem):
    # function to put the items from the list to the dynamodb table 
    dynamodb_resource = boto3.resource('dynamodb', region_name='us-east-1')
    
    table = dynamodb_resource.Table('demographics')
    for i in range(len(jsonitem)):
        table.put_item(
            Item=jsonitem[i]
        )
    return "Success"

def putdbmetaitem(jsondata):
    # function to update the attributes of the demographics item in the dynamodb meta table 
    dynamodb_resource = boto3.resource('dynamodb', region_name='us-east-1')
    table = dynamodb_resource.Table('demographics')
    total_count = table.item_count
    table = dynamodb_resource.Table('demographics_meta')
    
    table.update_item(
        Key={'source': 'demographics'},
        UpdateExpression="set date_updated=:u, items_updated=:i, items_downloaded=:d, items_total=:m, with_tables=:t, with_sentences=:s, pmcids=:c, pmids=:p", 
        ExpressionAttributeValues={
            ':u': str(jsondata['date_updated']),
            ':i': decimal.Decimal(jsondata['items']),
            ':d': decimal.Decimal(0),
            ':m': total_count,
            ':t': decimal.Decimal(0),
            ':s': decimal.Decimal(0),       
            ':c': jsondata['pmcids'],
            ':p': jsondata['pmids'],            
            },
        ReturnValues="UPDATED_NEW"
    )
    return "Success"

try:
    jsondata, jsonitem = grabfrompubmed(NUM_DAYS)
    num_updates = len(jsondata['pmcids'])
    if num_updates > 0:
        if not putdbitem(jsonitem):
            raise Exception('Writing updates to demographics DB failed')
        if not putdbmetaitem(jsondata):
            raise Exception('Writing updates to demographics_meta failed')
        if not puts3item(jsondata['pmcids'], "newRCTs.json"):
            raise Exception('Writing newRCTs.json to S3 failed')
        # put the full meta item in the S3 bucket
        if not puts3item(jsondata, "new_item.updates"):
            raise Exception('Writing new_item.updates to S3 failed')

except:
    print('\nGrab updates function failed!')
    raise
else:
    print('\nGrab updates function passed!')
    print(str(datetime.now()))
finally:
    print('Grab updates function complete at {}'.format(datetime.now()))
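
The core of `grabfrompubmed` is the intersection of the PMIDs returned by the search with the rows of the Open Access file list. That step can be sketched in isolation, without any network access. The function name `intersect_oa` and the sample rows are made up for illustration; the column positions (accession id in column 2, PMID in column 4) are the ones the cell above relies on.

```python
import csv
from io import StringIO

def intersect_oa(csv_text, target_pmids):
    """Return (pmids, pmcids) for rows of the OA file list whose PMID is a target."""
    targets = set(target_pmids)
    pmids, pmcids = [], []
    for row in csv.reader(StringIO(csv_text)):
        # column 2 holds the accession id (e.g. PMC100), column 4 the PMID
        if len(row) > 4 and row[4] in targets:
            pmids.append(row[4])
            pmcids.append(row[2][3:])  # strip the leading "PMC"
    return pmids, pmcids

# Made-up rows in the column layout assumed above
sample_csv = (
    "File,Article Citation,Accession ID,Last Updated,PMID,License\n"
    "oa_package/aa/bb/PMC100.tar.gz,Journal A,PMC100,2018-01-01,111,CC BY\n"
    "oa_package/cc/dd/PMC200.tar.gz,Journal B,PMC200,2018-01-02,222,CC BY\n"
    "oa_package/ee/ff/PMC300.tar.gz,Journal C,PMC300,2018-01-03,,CC BY\n"
)
pmids, pmcids = intersect_oa(sample_csv, ['222', '999'])
print(pmids, pmcids)  # ['222'] ['200']
```

Holding the targets in a `set` keeps each membership test constant-time, which matters because the file list is scanned row by row against a potentially large search result.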

### Dump the JSON file to disk

In [None]:
class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            if o % 1 > 0:
                return float(o)
            else:
                return int(o)
        return super(DecimalEncoder, self).default(o)
    
data_to_dump = json.dumps(jsonitem, cls=DecimalEncoder)
# the context manager closes the file even if the write fails
with open("output_dict.json", "w") as f:
    f.write(data_to_dump)

[back-to-top](#back-to-top)
<a id='batch_from_web'></a>


Batch populate the table with batch write
------

----


### Batch put from the JSON object `jsonitem`

#### *This is what has populated the current table*

In [None]:
dynamodb_resource = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb_resource.Table('demographics')

with table.batch_writer() as batch:
    for i in range(len(jsonitem)):
        batch.put_item(
            Item=jsonitem[i]
        )
        if (i % 1000 == 0):
            print ("The item_count is: %s " % (i))
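
`batch_writer` buffers the puts and sends them to DynamoDB in batches, but a batch write request that contains the same primary key twice is rejected. If the list of items could contain a PMCID more than once (an assumption, not verified against the OA file list), the items can be de-duplicated before writing. The helper name `dedupe_by_key` and the sample items are hypothetical.

```python
def dedupe_by_key(items, key='pmcid'):
    """Keep the last item seen for each key value (hypothetical helper)."""
    unique = {}
    for item in items:
        unique[item[key]] = item
    return list(unique.values())

# Made-up items with one repeated pmcid
sample_items = [
    {'pmcid': '100', 'pmid': '1a'},
    {'pmcid': '200', 'pmid': '2'},
    {'pmcid': '100', 'pmid': '1b'},
]
deduped = dedupe_by_key(sample_items)
print(len(sample_items), '->', len(deduped))
```

boto3 can also do this inside the writer itself: `table.batch_writer(overwrite_by_pkeys=['pmcid'])` drops an earlier buffered request when a later one has the same key.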

[back-to-top](#back-to-top)
<a id='populate_from_json'></a>


Populate the table from JSON
------


----


### Normal  put from the JSON object

In [None]:
dynamodb_resource = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb_resource.Table('demographics')

for i in range(len(jsonitem)):
    table.put_item(
        Item=jsonitem[i]
    )
#    if (i % 10 == 0):
#        print ("The item_count is: %s " % (i))

### Normal  put from a JSON file

In [None]:
dynamodb_resource = boto3.resource('dynamodb', region_name='us-east-1')
input_file = "output_dict.json"

try:
    table = dynamodb_resource.Table('demographics')
    print("Instantiate a table: ",table.creation_date_time)
    print("Ready to load data\n")
    incr = 0
    with open(input_file) as json_file:
        # each entry is a dict with the keys pmcid, pmid and date_processed
        itemset = json.load(json_file, parse_float = decimal.Decimal)
        for item in itemset:
            incr += 1
            table.put_item(
               Item={
                   'pmcid': item['pmcid'],
                   'pmid': item['pmid'],
                   'date_processed': str(datetime.now().strftime("%Y-%m-%d"))
                }
            )
    print("Items loaded:", incr)
            
except dynamodb_resource.meta.client.exceptions.ResourceNotFoundException:
    print("Table does not exist")
    pass            
except OSError as err:
    print("OS error: {0}".format(err))
except ValueError:
    print("Could not convert data to an integer.")
except:
    print("Unexpected error:", sys.exc_info()[0])
    raise

[back-to-top](#back-to-top)
<a id='lambda'></a>

*Creation of Lambda function packages*  
=====

The Lambda functions can be set up using an AWS Cloud Formation template. The template and the Lambda function packages are created using Python 3.6 in the cells below. These cells create (and, if necessary, adjust) the .py files for the Lambda functions, which can then be uploaded to S3. Similarly, the Cloud Formation template is created and uploaded to S3 from the notebook, and the Cloud Formation stack is then launched using boto3.   

Each Lambda function is described in a dedicated section, along with the trigger events, and the example ARN for the running platform. A brief overview of the Lambda functions follows:   

- “GetPMCUpdatesFromCSV” does the search, intersection of PMIDs and PMCIDs for the Open Access subset, and the initial population of items in the demographics and demographics_meta tables.    
- “DownloadPMC-OAFileToS3” does the download of files from PMC-OA and the upload to S3.    
- “TruncateTable” does the table truncation. The AWS Lambda package includes the *BeautifulSoup4* and *lxml* packages, which are not included by default on Lambda.    
- The sentence extraction is performed on an EC2 instance, which is launched with a “user-data” script to perform the installation of the packages and necessary .jar file to run. The file “SentenceMinerOnEC2Instance” does these steps.    
- The function "UpdateStatsInDemographicsMeta" does a final update to the demographics_meta DynamoDB table with the statistics of the latest update to the demographics table.
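
Each function's `lambda_function.py` has to be packaged as a zip archive before it can be uploaded to S3, with the handler file at the root of the archive. A minimal sketch of that packaging step using only the standard library follows. The helper name `build_lambda_zip`, the archive name and the placeholder handler are assumptions for illustration; the directory name `lambda_update_project` is the one used by the cells in this notebook.

```python
import os
import tempfile
import zipfile

def build_lambda_zip(source_dir, zip_path):
    """Zip every file under source_dir, with paths relative to source_dir."""
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as archive:
        for root, _dirs, files in os.walk(source_dir):
            for name in files:
                full_path = os.path.join(root, name)
                archive.write(full_path, os.path.relpath(full_path, source_dir))
    return zip_path

# Demonstration in a temporary directory with a placeholder handler file
work_dir = tempfile.mkdtemp()
project_dir = os.path.join(work_dir, 'lambda_update_project')
os.makedirs(project_dir)
with open(os.path.join(project_dir, 'lambda_function.py'), 'w') as handle:
    handle.write("def lambda_handler(event, context):\n    return 'ok'\n")

# The archive is written next to, not inside, the project directory
zip_path = build_lambda_zip(project_dir, os.path.join(work_dir, 'lambda_update_project.zip'))
with zipfile.ZipFile(zip_path) as archive:
    names = archive.namelist()
print(names)  # ['lambda_function.py']
```

For a function that bundles third-party packages, such as the one that needs *BeautifulSoup4* and *lxml*, those packages would be installed into the project directory before zipping so that they end up in the same archive.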

[back-to-top](#back-to-top)
<a id='getupdatePMC'></a>

GetPMCUpdatesFromCSV
------

----


Does the search, intersection of PMIDs and PMCIDs for the Open Access subset, and the initial population of items in the demographics and demographics_meta tables.   

### Associated Role & Policy
`PM_lambda_dynamo_S3_role`


### trigger:     
`ScheduleS3AndDynamodbUpdateEvent`  
`Schedule expression: cron(00 6 * * ? *)`  
`Description: ScheduleS3AndDynamodbUpdateEvent`  
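
The schedule expression uses the six-field AWS cron format (minutes, hours, day of month, month, day of week, year), evaluated in UTC, so `cron(00 6 * * ? *)` fires every day at 06:00 UTC. The sketch below splits such an expression into named fields as a quick sanity check; the helper name `parse_schedule` is hypothetical and it does not validate the field values themselves.

```python
import re

CRON_FIELDS = ('minutes', 'hours', 'day_of_month', 'month', 'day_of_week', 'year')

def parse_schedule(expression):
    """Split an AWS cron(...) schedule expression into its six named fields."""
    match = re.fullmatch(r'cron\((.+)\)', expression.strip())
    if match is None:
        raise ValueError('not a cron() schedule expression: ' + expression)
    parts = match.group(1).split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError('expected 6 fields, got {}'.format(len(parts)))
    return dict(zip(CRON_FIELDS, parts))

schedule = parse_schedule('cron(00 6 * * ? *)')
print(schedule)
```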

### test event:
`myTestEvent`  (Vanilla trigger)
{
  "key3": "value3",
  "key2": "value2",
  "key1": "value1"
}

In [11]:
%%writefile ./lambda_update_project/lambda_function.py
import os
import json
from datetime import datetime
from urllib.request import urlopen 
from io import StringIO
import csv
import boto3
import decimal

NUM_DAYS = os.environ['number_days'] 

def grabfrompubmed(num_days_delay):
    ##############################################
    ##
    ## Query pubmed with e-search  - returns JSON
    ##
    ##############################################
    
    # build the query string
    query = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils//esearch.fcgi?db=pubmed&retmax=4659616&retmode=json"
    query += "&datetype=edat&reldate=" + str(num_days_delay)
    query += "&term=((randomized+controlled+trial%5Bpt%5D)+OR+(controlled+clinical+trial%5Bpt%5D)+OR+(randomized%5Btiab%5D+OR+randomised%5Btiab%5D)+OR+(placebo%5Btiab%5D)+OR+(drug+therapy%5Bsh%5D)+OR+(randomly%5Btiab%5D)+OR+(trial%5Btiab%5D)+OR+(groups%5Btiab%5D))+NOT+(animals%5Bmh%5D+NOT+humans%5Bmh%5D)"

    # grab the content from pubmed with e-search
    print("Querying PubMed with e-search ...")
    r = urlopen(query) 
    resp = json.loads(r.read().decode(r.info().get_param('charset') or 'utf-8'))
    
    # extract the list of target pubmed ids (pmids)
    target_pmids = resp['esearchresult']['idlist']
    print("\nSample of the first 10 pmids returned by search:")
    print(target_pmids[:10])
    print("Search of PubMed returned {} results".format(len(target_pmids)))
    
    ##############################################
    ##
    ## Grab the OA file list from the NCBI FTP site
    ##
    ##############################################
    print("\nGetting the OA CSV from NCBI FTP site ...")
    url = 'ftp://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_file_list.csv'
    data = urlopen(url).read().decode('ascii', 'ignore')
    dataFile = StringIO(data)
    csvReader = csv.reader(dataFile)
    
    targs = set(target_pmids)
    merged_pmids = []
    merged_pmcids = []
    
    # scan through the pairs of pubmed ids (pmids) and pubmed pmc oa ids (pmcids)  
    # from the csv file and keep only those in the target list
    for row in csvReader:
        pmid = row[4]
        if pmid in targs:
            merged_pmids.append(pmid)
            merged_pmcids.append(row[2][3:])
    
    # get the number of articles to update
    num_updates = len(merged_pmcids)
    print("\nNumber of PMC OpenAccess updates: {}".format(num_updates))
    
    # some variables to hold today's date - both long and short format
    now = str(datetime.now().strftime("%Y-%m-%d"))
    nowlong = str(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    
    ##############################################
    ##
    ## Create the objects for the DB tables and S3
    ##
    ##############################################
    
    # create a JSON object with the status elements for the demographics_meta table
    # it will also be saved to the S3 bucket, for good measure
    jsondata = {
        "date_updated": nowlong, 
        "pmcids": merged_pmcids,
        "pmids": merged_pmids,
        "items": str(num_updates)
    }

    # create a list that contains the items to update in the demographics table
    jsonitem = []
    for i in range(num_updates):
        jsonitem.append(
            {
            "pmcid": merged_pmcids[i],
            "pmid":  merged_pmids[i],
            "date_processed":  now
            }) 
    return jsondata, jsonitem
    
def puts3item(jsondata, filename):
    # function to put the JSON file of newRCTs.json to the S3 bucket for sentence miner
    # and also to put the full JSON file of metadata to the S3 bucket for safekeeping
    s3bucketfile = filename
    s3bucket = "pubminer-upload"
    s3_resource = boto3.resource('s3')
    obj = s3_resource.Object(s3bucket,s3bucketfile)
    obj.put(Body=json.dumps(jsondata), ACL='public-read')
    return "Success"

def putdbitem(jsonitem):
    # function to put the items from the list to the dynamodb table 
    dynamodb_resource = boto3.resource('dynamodb', region_name='us-east-1')
    
    table = dynamodb_resource.Table('demographics')
    for i in range(len(jsonitem)):
        table.put_item(
            Item=jsonitem[i]
        )
    return "Success"

def putdbmetaitem(jsondata):
    # function to update the attributes of the demographics item in the dynamodb meta table 
    dynamodb_resource = boto3.resource('dynamodb', region_name='us-east-1')
    table = dynamodb_resource.Table('demographics')
    total_count = table.item_count
    table = dynamodb_resource.Table('demographics_meta')
    
    table.update_item(
        Key={'source': 'demographics'},
        UpdateExpression="set date_updated=:u, items_updated=:i, items_downloaded=:d, items_total=:m, with_tables=:t, with_sentences=:s, pmcids=:c, pmids=:p", 
        ExpressionAttributeValues={
            ':u': str(jsondata['date_updated']),
            ':i': decimal.Decimal(jsondata['items']),
            ':d': decimal.Decimal(0),
            ':m': total_count,
            ':t': decimal.Decimal(0),
            ':s': decimal.Decimal(0),       
            ':c': jsondata['pmcids'],
            ':p': jsondata['pmids'],            
            },
        ReturnValues="UPDATED_NEW"
    )
    return "Success"

def lambda_handler(event, context):
    print('\nChecking for new articles at PubMed...')

    try:
        jsondata, jsonitem = grabfrompubmed(NUM_DAYS)
        num_updates = len(jsondata['pmcids'])
        if num_updates > 0:
            if not putdbitem(jsonitem):
                raise Exception('Writing updates to demographics DB failed')
            if not putdbmetaitem(jsondata):
                raise Exception('Writing updates to demographics_meta failed')
            if not puts3item(jsondata['pmcids'], "newRCTs.json"):
                raise Exception('Writing newRCTs.json to S3 failed')
            # put the full meta item in the S3 bucket
            if not puts3item(jsondata, "new_item.updates"):
                raise Exception('Writing new_item.updates to S3 failed')
                
    except:
        print('\nGrab updates function failed!')
        raise
    else:
        print('\nGrab updates function passed!')
        return str(datetime.now())
    finally:
        print('Grab updates function complete at {}'.format(datetime.now()))

Overwriting ./lambda_update_project/lambda_function.py
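
The function above reads `number_days` from the environment when the module is imported, so a missing variable raises a `KeyError` before the handler runs, and the value arrives as a string. A more defensive variant is sketched below. The helper name `read_num_days` is hypothetical, and the default of 2 is an assumption that mirrors the `NUM_DAYS = 2` used in the notebook cell earlier.

```python
import os

def read_num_days(environ=os.environ, default=2):
    """Return number_days from the environment as a positive int (hypothetical helper)."""
    raw = environ.get('number_days', str(default))
    try:
        value = int(raw)
    except ValueError:
        raise ValueError('number_days must be an integer, got {!r}'.format(raw))
    if value < 1:
        raise ValueError('number_days must be at least 1, got {}'.format(value))
    return value

# Plain dicts stand in for os.environ in this demonstration
print(read_num_days({'number_days': '7'}))  # 7
print(read_num_days({}))                    # 2
```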


[back-to-top](#back-to-top)
<a id='download'></a>

DownloadPMC-OAFileToS3
------

----


Does the download of files from PMC-OA and the upload to S3.     

### Associated Role & Policy
`PM_lambda_dynamodb_execution_role`


### trigger:     
`DynamodbUpdateEvent`  
`demographics`  table stream for updates


### test event:
`UpdateToDynamoDB`  (see testing)
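
A function triggered by a DynamoDB stream receives an event with a `Records` list, where each record carries an `eventName` (`INSERT`, `MODIFY` or `REMOVE`) and the item keys in DynamoDB's typed JSON form. As a minimal sketch of how the PMCIDs could be pulled out of such an event: the helper name `pmcids_from_stream` is hypothetical, the sample event is hand-written, and reacting only to `INSERT` records is an assumption rather than the behaviour of the function in this notebook.

```python
def pmcids_from_stream(event, event_names=('INSERT',)):
    """Collect the pmcid key of every stream record with a matching event name."""
    pmcids = []
    for record in event.get('Records', []):
        if record.get('eventName') in event_names:
            pmcids.append(record['dynamodb']['Keys']['pmcid']['S'])
    return pmcids

# Hand-written event in the shape delivered by a DynamoDB stream
sample_event = {
    'Records': [
        {'eventName': 'INSERT',
         'dynamodb': {'Keys': {'pmcid': {'S': '5838818'}}}},
        {'eventName': 'REMOVE',
         'dynamodb': {'Keys': {'pmcid': {'S': '2842548'}}}},
    ]
}
print(pmcids_from_stream(sample_event))  # ['5838818']
```

Filtering on the event name matters here because the download function itself removes items from the table on failure, and those removals also appear on the stream.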


In [12]:
%%writefile ./lambda_download_project/lambda_function.py
import os
from datetime import datetime
import xml.etree.ElementTree as ET
from urllib.request import urlopen
import boto3
import glob
import tarfile
import json
import decimal

s3_client = boto3.client('s3')
save_bucket_name = 'pubmedcentral_oa'
dynamodb_resource = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb_resource.Table('demographics')
status_table = dynamodb_resource.Table('demographics_meta')

#s3bucketfile = "new_updates.json"

# test tar.gz to see if the unzipping from the website works
#thetarfile = "ftp://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_package/ca/f2/PMC5855623.tar.gz"

# the url for getting the file references
url = 'http://www.ncbi.nlm.nih.gov/pmc/utils/oa/oa.fcgi?id=PMC'

# test list of pmcids to test the download function outside the dynamodb stream
#pmcidlist = ['3778263', '4473156', '5771285', '4264693', '4210730', '4578256', '5424460', '5541470', '4634690', '4548150']
#pmcidlist = ['5838818', '5869227', '5869228', '2842548']

def do_the_download(pmcidlist):
    print("Downloading the articles from the website...")
    for pmcid in pmcidlist:
        try:
            if not download_file_from_ftp(url, pmcid):
                raise Exception('Download from FTP failed - decrementing counter and deleting item')
            else:
                if not upload_file_to_s3(pmcid):
                    raise Exception('Upload to S3 failed')
                else:
                    # increment the items_downloaded counter on the metadata table
                    if not increment_counter(status_table):
                        raise Exception('Incrementing metadata table failed')
        except Exception as inst:
            print(inst)
            # remove item and decrement the items_updated counter on the metadata table
            if not decrement_counter(status_table, pmcid):
                print('decrementing metadata table failed')
            if not remove_from_database(table, pmcid):
                print('Removal from database table failed')
    return "Success"
                
def download_file_from_ftp(url, pmcid):
    # assemble the query url
    query = url + pmcid
    # grab the page with urlopen
    page = urlopen(query).read()
    #print(page)
    # get the xml tree from the returned page
    tree = ET.fromstring(page)
    #print(tree)
    # find the link element for the tar.gz file
    link = tree.find(".//link[@format='tgz']")
    if link is not None:
        thetarfile = link.get('href')
        print(thetarfile)
        # use urlopen again to get the file from the FTP site as a ftpstream
        ftpstream = urlopen(thetarfile)
        thetarfile = tarfile.open(fileobj=ftpstream, mode="r|gz")
        # get the .nxml file out of the zipped file, and save to the /tmp folder
        for tarinfo in thetarfile:
            if (tarinfo.name[-4:] == 'nxml'):
                thetarfile.extract(tarinfo, path='/tmp/')
        thetarfile.close()
        print("FTP download success")
        return "Success"
    else:
        print("FTP download failed")
                
def upload_file_to_s3(pmcid):
    # get the full path and name of the file from the /tmp folder
    filename_in = str(glob.glob('/tmp/PMC'+str(pmcid)+'/*.nxml')[0])
    print(filename_in)
    # set the format of the filename for the S3 bucket
    filename_out = 'PMC'+str(pmcid)+ '.nxml'
    # upload the file to S3
    try:
        s3_client.upload_file(filename_in, save_bucket_name, filename_out, ExtraArgs={'ACL': 'public-read'})
        print('S3 upload passed!')
        return "Success"
    except:
        print('S3 upload failed!')
        
def puts3item(jsondata, filename):
    # function to put the JSON file of newRCTs.json to the S3 bucket for sentence miner
    # and also to put the full JSON file of metadata to the S3 bucket for safekeeping
    s3bucketfile = filename
    s3bucket = "pubminer-upload"
    s3_resource = boto3.resource('s3')
    obj = s3_resource.Object(s3bucket,s3bucketfile)
    obj.put(Body=json.dumps(jsondata), ACL='public-read')
    return "Success"

def increment_counter(table):
    # increment the counter on items_downloaded in the demographics_meta table
    response = table.update_item(
        Key={
            'source': 'demographics'
        },
        UpdateExpression="set items_downloaded = items_downloaded + :val",
        ExpressionAttributeValues={
            ':val': decimal.Decimal(1)
        },
        ReturnValues="UPDATED_NEW"
    )
    print("Incremented download counter")
    return "Success"
    
def decrement_counter(table, pmcid):
    # function to make modification to the demographics_meta table
    # first get the current demographics item and see if there are updates
    print("Decrementing counter...")
    response = table.get_item(
                    Key={
                        'source': 'demographics'
                    }
                )
    current_status = response['Item']
    #print("response", current_status['pmcids'])
    if (current_status['items_updated'] > 0 ):

        # first get the list of PMCIDs and PMIDs
        pmcids = current_status['pmcids']
        pmids = current_status['pmids']
        try:
            # make modification to the demographics_meta table
            popind =  pmcids.index(pmcid)
            # take out the PMCID
            pmcids.remove(pmcid)
            # take out the PMID
            pmids.pop(popind)
            # decrement the counter on items_updated
            response = table.update_item(
                Key={
                    'source': 'demographics'
                },
                UpdateExpression="set items_updated=items_updated-:val, pmcids=:l, pmids=:m",
                ExpressionAttributeValues={
                    ':val': decimal.Decimal(1),
                    ':l': pmcids,
                    ':m': pmids
                },
                ReturnValues="UPDATED_NEW"
            )
            jsondata = {
                "time_processed": current_status['time_processed'], 
                "pmcids": pmcids,
                "pmids": pmids,
                "items": str(len(pmcids))
            }
            if not puts3item(jsondata, "new_item.updates"):
                raise Exception('Writing new_item.updates to S3 failed')
            if not puts3item(jsondata['pmcids'], "newRCTs.json"):
                raise Exception('Writing newRCTs.json to S3 failed')
            print("Decremented upload counter")
        except ValueError:
            print("Decrement upload counter not necessary")
        except Exception as inst:
            print(inst)
    else: 
        print("No changes to demographics_meta table necessary")
    return "Success"

def remove_from_database(table, pmcid):
    print("Removing from database...")
    try:
        response = table.get_item(Key={'pmcid': pmcid})["Item"]
        print(response)
    except KeyError:
        print("PMCID not in database - OK")
    except:
        print("Error deleting PMCID from database")
    else: 
        print("Deleting unavailable PMCID from database")
        response = table.delete_item(Key={'pmcid': pmcid})
        #print(response)
    finally:
        print("Delete PMCID from DB finished")
        return "Success"
    
def lambda_handler(event, context):
    print('Receiving {} records ...'.format(len(event['Records'])))
    pmcidlist = []
    for record in event['Records']:
        print(record)
        # add to the list if the record is an INSERT
        if (record['eventName'] == 'INSERT') :
            print("Update from Insert")
            print("PMC"+record['dynamodb']['Keys']['pmcid']['S'])
            pmcidlist.append(record['dynamodb']['Keys']['pmcid']['S'])
        # add to the list if the record is a MODIFY that lost information
        elif (record['eventName'] == 'MODIFY') :
            if (len(record['dynamodb']['NewImage']) < len(record['dynamodb']['OldImage'])):
                print("Update from Modify")
                print("PMC"+record['dynamodb']['Keys']['pmcid']['S'])
                pmcidlist.append(record['dynamodb']['Keys']['pmcid']['S'])
        else:
            print('No update of table')
    # download the articles from the list of pmcids
    try:
        if (len(pmcidlist) > 0):
            if not do_the_download(pmcidlist):
                raise Exception('Download failed')
        else: 
            return
    except:
        print('\nDownloading of article to S3 failed!')
        raise
    else:
        print('\nDownloadPMC-OAFileToS3 passed!')
    finally:
        print('\nDownloadPMC-OAFileToS3 complete at {}'.format(str(datetime.now())))
        

Overwriting ./lambda_download_project/lambda_function.py
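
The link lookup in `download_file_from_ftp` can be tried offline against a canned response. The sketch below is only an illustration, not the deployed function: the sample XML is a hand-written stand-in for what the PMC OA service returns (all values are made up), and `find_tgz_link` is a hypothetical helper name.

```python
import xml.etree.ElementTree as ET

# Hand-written stand-in for an OA service response (illustrative values only)
SAMPLE_RESPONSE = """<OA>
  <records returned-count="1" total-count="1">
    <record id="PMC0000000" citation="Example citation" license="CC BY">
      <link format="tgz" updated="2018-01-01 00:00:00"
            href="ftp://example.org/oa_package/00/00/PMC0000000.tar.gz"/>
      <link format="pdf" updated="2018-01-01 00:00:00"
            href="ftp://example.org/oa_pdf/00/00/example.pdf"/>
    </record>
  </records>
</OA>"""


def find_tgz_link(page):
    """Return the href of the tar.gz package, or None if the record has none."""
    tree = ET.fromstring(page)
    # same XPath expression as in download_file_from_ftp
    link = tree.find(".//link[@format='tgz']")
    return link.get('href') if link is not None else None


print(find_tgz_link(SAMPLE_RESPONSE))
print(find_tgz_link("<OA><error>not found</error></OA>"))
```

Returning `None` when no `tgz` link exists mirrors the falsy return that `do_the_download` relies on to detect a failed download.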


[back-to-top](#back-to-top)
<a id='sentence'></a>

SentenceMinerOnEC2Instance
------

----


The sentence extraction runs on an EC2 instance. The Lambda function `SentenceMinerOnEC2Instance` launches the instance with a “user-data” script that installs the required packages, downloads the necessary .jar file, runs the extraction, and then shuts the instance down.

### Associated Role & Policy
`PM_lambda_start_stop_ec2`


### trigger:     
`DynamodbUpdateEvent`  
`demographics_meta`  table stream for updates
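
As a rough sketch of the condition applied to each stream record, the helper below decides whether the last pending download has just completed. The name `all_downloads_finished` and the sample record are hypothetical; the record is trimmed to the fields the check reads. Stream images encode numbers as strings, so the sketch converts them to `int` before comparing (as strings, `'10' > '9'` is `False`).

```python
def all_downloads_finished(record):
    """Return True when a MODIFY record shows the last pending download completing."""
    if record.get('eventName') != 'MODIFY':
        return False
    new_image = record['dynamodb']['NewImage']
    old_image = record['dynamodb']['OldImage']
    # stream images encode numbers as strings, e.g. {'N': '10'}
    new_downloaded = int(new_image['items_downloaded']['N'])
    old_downloaded = int(old_image['items_downloaded']['N'])
    updated = int(new_image['items_updated']['N'])
    return new_downloaded > old_downloaded and new_downloaded == updated


# Hypothetical record, trimmed to the fields the check reads
record = {
    'eventName': 'MODIFY',
    'dynamodb': {
        'OldImage': {'items_downloaded': {'N': '9'}, 'items_updated': {'N': '10'}},
        'NewImage': {'items_downloaded': {'N': '10'}, 'items_updated': {'N': '10'}},
    },
}
print(all_downloads_finished(record))  # True
```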


### test event:
`DynamoTriggerExtraction`  (see testing)


**NOTE: Replace the placeholder `XXXXXXXXXXXX` below with your AWS account ID.**

In [13]:
%%writefile ./lambda_sentences_project/lambda_function.py
import boto3
from datetime import datetime

REGION = 'us-east-1' 
AMI = 'ami-467ca739'
INSTANCE_TYPE = 't2.xlarge'
instance_profile_arn = 'arn:aws:iam::XXXXXXXXXXXX:instance-profile/PubMinerInstanceWithSSM'

ec2 = boto3.resource('ec2', region_name=REGION)

def lambda_handler(event, context):

    print('Receiving {} records ...'.format(len(event['Records'])))

    for record in event['Records']:
        print(record)
        if (record['eventName'] == 'MODIFY') :
            # DynamoDB stream images carry numbers as strings; convert before comparing
            new_image = record['dynamodb']['NewImage']
            old_image = record['dynamodb']['OldImage']
            new_downloaded = int(new_image['items_downloaded']['N'])
            if (new_downloaded > int(old_image['items_downloaded']['N'])):
                if (new_downloaded == int(new_image['items_updated']['N'])):
                    print("update from Modify: items_downloaded =  items_updated")
                    print(new_downloaded)
                    print('Extracting the sentences...')
                    try:
                        user_data_string = """#!/bin/bash
yum update -y
yum install java-1.8.0 -y
yum remove java-1.7.0-openjdk -y
cd /home/ec2-user/
pip install boto3
wget http://pubminer-upload.s3.amazonaws.com/sentences_status.py
wget http://pubminer-upload.s3.amazonaws.com/newRCTs.json
wget http://pubminer-upload.s3.amazonaws.com/PubMiner_s3.jar
wait
java -cp PubMiner_s3.jar ReadXMLFile
wait
python sentences_status.py
shutdown -H +1
"""

                        print('Running script:')
                        print(user_data_string)
                        
                        subnet = ec2.Subnet('subnet-a5899b8a')
                        instance = subnet.create_instances(
                            ImageId=AMI,
                            InstanceType=INSTANCE_TYPE,
                            MinCount=1,
                            MaxCount=1,
                            KeyName = 'pubminerinstance',
                            IamInstanceProfile = {
                                'Arn': instance_profile_arn
                            },
                            SecurityGroupIds = [
                            'sg-d023d698',
                            ],
                            InstanceInitiatedShutdownBehavior='terminate', 
                            UserData=user_data_string 
                        )
                        
                        print("New instance created.")
                        print(instance)
                    except:
                        print('Extract sentences failed!')
                        raise
                    else:
                        print('Extract sentences passed!')
                        return str(datetime.now())
                    finally:
                        print('Extract sentences is launching at {}'.format(str(datetime.now())))

Overwriting ./lambda_sentences_project/lambda_function.py


[back-to-top](#back-to-top)
<a id='truncate'></a>

TruncateTable
------

----


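Truncates Table 1 of each article to the rows that carry demographic information (age, sex, race or ethnicity) and stores the result in the `table1` attribute of the `demographics` table. The AWS Lambda package bundles the BeautifulSoup4 and lxml packages, which are not available by default on Lambda.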
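
The row filter can be illustrated without BeautifulSoup by working on the text of each row's first cell. This is a simplified sketch, not the packaged function: the three patterns are shortened stand-ins for the regular expressions in `lambda_function.py`, and `rows_to_keep` is a hypothetical helper name.

```python
import re

# Simplified stand-ins for the patterns in lambda_function.py (assumption)
DEMO = re.compile(r'\b(age[ds]?|sex(es)?|genders?|(fe)?males?|race|ethnicity)\b', re.I)
AGE = re.compile(r'\bage[ds]?\b', re.I)
TIME = re.compile(r'\b(years?|months?|weeks?|mean)\b', re.I)


def rows_to_keep(first_cells):
    """Return the indexes of rows whose first cell looks demographic.

    A row is kept when its first cell matches DEMO, or when it directly
    follows a kept age row and matches TIME (e.g. a "Mean, years" sub-row).
    """
    kept = []
    age_row = None  # index of the most recent age row or its continuation
    for i, text in enumerate(first_cells):
        if DEMO.search(text):
            kept.append(i)
            if AGE.search(text):
                age_row = i
        elif age_row is not None and age_row == i - 1 and TIME.search(text):
            kept.append(i)
            age_row = i
    return kept


cells = ["Age", "Mean, years", "Systolic BP", "Sex", "Male", "Smoker"]
print(rows_to_keep(cells))  # [0, 1, 3, 4]
```

Tracking the index of the last age row is what lets unlabeled sub-rows such as "Mean, years" survive, while unrelated rows further down are dropped.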

### Associated Role & Policy
`PM_lambda_dynamo_S3_role`


### trigger:     
S3 bucket: `pubmedcentral_oa`  
ARN: `arn:aws:s3:::pubmedcentral_oa`  
Suffix: `.nxml`  
Event type: `ObjectCreated`  
Notification name: `TriggerTruncateTableEventOnCreate`

### test event:
`TestPutToBucket`  (see testing)
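
For orientation, the handler reads only a few fields of the S3 notification. The sketch below uses a hypothetical, heavily trimmed record (not the actual `TestPutToBucket` event) and a hypothetical helper `pmcid_from_key` to show how the PMCID is derived from the object key.

```python
def pmcid_from_key(key):
    """Strip the 'PMC' prefix and '.nxml' suffix from an S3 object key."""
    if not (key.startswith('PMC') and key.endswith('.nxml')):
        raise ValueError('unexpected key: {}'.format(key))
    return key[3:-5]


# Hypothetical record, trimmed to the fields lambda_handler reads
record = {
    'eventName': 'ObjectCreated:Put',
    's3': {
        'bucket': {'name': 'pubmedcentral_oa'},
        'object': {'key': 'PMC0000000.nxml'},
    },
}
print(pmcid_from_key(record['s3']['object']['key']))  # 0000000
```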


In [14]:
%%writefile ./lambda_truncate_project/lambda_function.py
import re
import boto3
import glob
import bs4 as bs
from datetime import datetime
import decimal

regexTable1inTable = '(tab(:?.|le|le.|le-|leau|ela)? *?(1[.]?|I[.]?)$)'
regexInTable = re.compile(regexTable1inTable, re.I)

regexAge = '\\b(age[d|s]?)\\b'
regexTime1 = '(\\S+\\s*\\d+(.\\d+)?)$'
regexTime2 = '\\b(year[s]?|week[s]?|month[s]?|mean|(max|min)(imum)?)\\b'
regexTime = re.compile('|'.join([regexTime1, regexTime2]), re.I)

regexSex = '\\b(sex(es)?|gender[s]?|male[s]?|female[s]?|(wo)?men[s]?|boy[s]?|girl[s]?)\\b'
regexRace1 = '\\b(race[s]?|ethnic(?:ity|ities)?|asian[s]?|white[s]?|black[s]?|aboriginal[s]?|native[s]?|african\\samerican[s]?|american\\sindian[s]?)\\b'
regexRace2 = '\\b(hispanic[s]?|caucasian[s]?|latin[ao][s]?|chican[ao][s]?|spanish|puerto\\srica[n]?|cuban[s]?)\\b'
regexRace3 = '\\b(indian[s]?|chinese|korean|vietnamese|dutch|guamanian[s]?|chamorro|samoan|pacific\\sislander[s]?|han|japanese|filipino[s]?|hawaiian[s]?)\\b'

regexFull = re.compile('|'.join([regexAge, regexSex, regexRace1, regexRace2, regexRace3]), re.I)

#trunk_tables = []
def truncate_table(fh, pmcid):
    dynamodb_resource = boto3.resource('dynamodb', region_name='us-east-1')
    demographic_table = dynamodb_resource.Table('demographics')
    status_table = dynamodb_resource.Table('demographics_meta')
    
    soup = bs.BeautifulSoup(fh,'lxml')

    if soup.find("article-title") is not None:
        title = soup.find("article-title").get_text()
        demographic_table.update_item(
            Key={'pmcid': pmcid},
            UpdateExpression="set title = :t",
            ExpressionAttributeValues={
                ':t': str(title)
            },
            ReturnValues="UPDATED_NEW"
        ) 
        
    table = soup.find("table-wrap")
    if table is not None:
        tlabel = table.find("label")

        if tlabel is not None:
            if (len(tlabel.find_all(text=regexInTable)) != 0):
                tlabel.string = "Table 1 Extract"

                # drop the id attribute if present (avoids a KeyError when it is missing)
                table.attrs.pop('id', None)

                table_id = table.find("object-id")
                if table_id is not None:
                    table_id.decompose()

                # remove every table footer
                for foot in table.find_all("table-wrap-foot"):
                    foot.decompose()

                tpermissions = table.find("permissions")
                if tpermissions is not None:
                    tpermissions.decompose()

                tabletables = table.find_all("table")
                if (len(tabletables) != 0):
                    # initialise once so a match in any sub-table is remembered
                    tablehasdemo = False
                    for tabletable in tabletables:
                        tbody = tabletable.find("tbody")
                        if tbody is not None:
                            rows = tbody.find_all("tr")
                            bodyhasdemo = False
                            hasage = False
                            agerow = 0
                            for i, row in enumerate(rows):
                                firsttd = row.find("td")
                                if firsttd is not None:
                                    if (len(firsttd.find_all(text=regexFull)) > 0):
                                        bodyhasdemo = True
                                        if (len(firsttd.find_all(text=re.compile(regexAge, re.I))) > 0):
                                            hasage = True
                                            agerow = i
                                    else:
                                        if i > 0:
                                            if (hasage and (len(firsttd.find_all(text=regexTime)) > 0) and (agerow == (i - 1)) ):
                                                agerow = i
                                                bodyhasdemo = True
                                            else:
                                                row.decompose()
                            if bodyhasdemo:
                                tablehasdemo = bodyhasdemo
                    if tablehasdemo :
                        #trunk_tables.append({'Item': {'pmcid': pmcid, 'table': str(table)} })
                        demographic_table.update_item(
                            Key={'pmcid': pmcid},
                            UpdateExpression="set table1 = :tab",
                            ExpressionAttributeValues={
                                ':tab': str(table)
                            },
                            ReturnValues="UPDATED_NEW"
                        ) 
                        status_table.update_item(
                            Key={
                                'source': 'demographics'
                            },
                            UpdateExpression="set with_tables = with_tables + :val",
                            ExpressionAttributeValues={
                                ':val': decimal.Decimal(1)
                            },
                            ReturnValues="UPDATED_NEW"
                        )
    return "Success"

def lambda_handler(event, context):
    print('Receiving {} records ...'.format(len(event['Records'])))
    for record in event['Records']:
        print(record)
        if record['eventName'] == 'ObjectCreated:Put':
            BUCKET = record['s3']['bucket']['name']
            KEY = record['s3']['object']['key']
            pmcid = KEY[3:-5]
            print("Grabbing the file for PMC{} from the S3 bucket".format(pmcid))
            res = boto3.client("s3").get_object(Bucket=BUCKET, Key=KEY)
            fh = res["Body"].read().decode('utf-8')
            try:
                print('Trying table update...')
                if not truncate_table(fh, pmcid):
                    raise Exception('Table update failed')
            except:
                print('Table update failed!')
                raise
            else:
                print('Table update passed!')
            finally:
                print('Table update complete at {}'.format(str(datetime.now())))

Overwriting ./lambda_truncate_project/lambda_function.py


[back-to-top](#back-to-top)
<a id='stats'></a>

UpdateStatsInDemographicsMeta
------

----


Performs a final update of the `demographics_meta` DynamoDB table with the statistics of the latest update to the `demographics` table, and writes the same statistics to `update_stats.json` in the `pubminer-upload` bucket.
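
boto3 returns DynamoDB numbers as `decimal.Decimal`, which the standard `json` module cannot serialize, hence the custom encoder in the function. The snippet below is a compact sketch of that idea with made-up values.

```python
import decimal
import json


class DecimalEncoder(json.JSONEncoder):
    """Serialize Decimal values as int when whole, otherwise as float."""

    def default(self, o):
        if isinstance(o, decimal.Decimal):
            return float(o) if o % 1 else int(o)
        return super().default(o)


# Made-up values standing in for an item read from DynamoDB
stats = {'total_items': decimal.Decimal(120), 'ratio': decimal.Decimal('0.25')}
print(json.dumps(stats, cls=DecimalEncoder, sort_keys=True))
# {"ratio": 0.25, "total_items": 120}
```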

### Associated Role & Policy
`PM_lambda_dynamo_S3_role`


### trigger:     
`TriggerForUpdateStatsWhenSentencesComplete`  
S3 bucket: `pubminer-upload`  
ARN: `arn:aws:s3:::pubminer-upload`  
Suffix: `.status`  
Event type: `ObjectCreatedByPut`

### test event:
`TestUpdateStats`  (see testing)

In [15]:
%%writefile ./lambda_statistics_project/lambda_function.py
import boto3
import json
from datetime import datetime, timedelta
from boto3.dynamodb.conditions import Key, Attr
import decimal

class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            if o % 1 > 0:
                return float(o)
            else:
                return int(o)
        return super(DecimalEncoder, self).default(o)

dynamodb_resource = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb_resource.Table('demographics')
status_table = dynamodb_resource.Table('demographics_meta')

#now = datetime.now().strftime("%Y-%m-%d")
#date_1 = datetime.strptime(now, "%Y-%m-%d")
#now = str(now)
#since_date = date_1 + timedelta(days=-1)
#since_date = str(since_date.strftime("%Y-%m-%d"))

def get_latest_stats(pmcids):
    numwithsentences = 0 
    for dd in pmcids:
        response = table.get_item(
            Key={
                'pmcid': dd
            }
        )
        # get_item omits 'Item' when the PMCID is not in the table
        if 'sentences' in response.get('Item', {}):
            numwithsentences += 1
    return numwithsentences
    
def puts3item(jsondata):
    s3bucketfile = "update_stats.json"
    s3bucket = "pubminer-upload"
    s3_resource = boto3.resource('s3')
    obj = s3_resource.Object(s3bucket,s3bucketfile)
    obj.put(Body=json.dumps(jsondata, cls=DecimalEncoder), ACL='public-read')
    return "Success"

def lambda_handler(event, context):
    print('Receiving {} records ...'.format(len(event['Records'])))
    for record in event['Records']:
        print(record)
        if record['eventName'] == 'ObjectCreated:Put':
            BUCKET = record['s3']['bucket']['name']
            KEY = record['s3']['object']['key']
            if (KEY == 'sentences.status' and BUCKET == 'pubminer-upload'):
                print('Updating the update_stats.json...')
                try:
                    response = status_table.get_item(
                        Key={
                            'source': 'demographics'
                        }
                    )
                    current_status = response['Item']
                    print("response", current_status['pmcids'])
                    numwithsentences = get_latest_stats(current_status['pmcids'])
                    status_table.update_item(
                        Key={
                            'source': 'demographics'
                        },
                        UpdateExpression="set with_sentences=:val",
                        ExpressionAttributeValues={
                            ':val': decimal.Decimal(numwithsentences)
                        },
                        ReturnValues="UPDATED_NEW"
                    )
                    jsondata = {
                        "update": current_status['date_updated'], 
                        "total_items": current_status['items_total'],
                        "total_updates": current_status['items_updated'],
                        "with_sentences": decimal.Decimal(numwithsentences),
                        "with_tables": current_status['with_tables']
                    }
                    if not puts3item(jsondata):
                        raise Exception('Writing to S3 failed')
                except:
                    print('UpdateStatsInDemographicsMeta function failed!')
                    raise
                else:
                    print('UpdateStatsInDemographicsMeta function passed!')
                    return str(datetime.now())
                finally:
                    print('UpdateStatsInDemographicsMeta function complete at {}'.format(str(datetime.now())))


Overwriting ./lambda_statistics_project/lambda_function.py


In [16]:
%%writefile ./sentences_status.py
import json
from datetime import datetime
import boto3


s3bucketfile = "sentences.status"
# must match the bucket that UpdateStatsInDemographicsMeta checks for
s3bucket = "pubminer-upload"
obj = boto3.resource('s3').Object(s3bucket,s3bucketfile)
obj.put(Body=json.dumps({"update": str(datetime.now().strftime("%Y-%m-%d"))}), ACL='public-read')

Overwriting ./sentences_status.py


[back-to-top](#back-to-top)
<a id='formation'></a>

*Cloud Formation template*  
=====

The Cloud Formation template is created in the cell below. The subsequent cell uploads it to S3 together with the Lambda function packages, and the Cloud Formation stack is then launched using boto3.

### First, get the ARN references for the two DynamoDB streams, which will be used in the template configuration

In [17]:
streams_client = boto3.client('dynamodbstreams')
demographics_stream_arn = streams_client.list_streams(TableName='demographics')['Streams'][0]['StreamArn']
demographics_meta_stream_arn = streams_client.list_streams(TableName='demographics_meta')['Streams'][0]['StreamArn']

#print(demographics_stream_arn)
#print(demographics_meta_stream_arn)
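
Indexing `['Streams'][0]` fails with an `IndexError` when a table has no stream. A slightly more defensive lookup could look like the sketch below; `first_stream_arn` is a hypothetical helper, and the response dictionary is a hand-written stand-in with an illustrative ARN.

```python
def first_stream_arn(response, table_name):
    """Return the first StreamArn in a list_streams response, or raise a clear error."""
    streams = response.get('Streams', [])
    if not streams:
        raise RuntimeError(
            'No stream found for table {}; is the stream enabled?'.format(table_name))
    return streams[0]['StreamArn']


# Hand-written stand-in for a list_streams response (illustrative ARN)
fake_response = {
    'Streams': [{
        'StreamArn': 'arn:aws:dynamodb:us-east-1:XXXXXXXXXXXX:table/demographics/stream/2018-05-05T00:00:00.000',
        'TableName': 'demographics',
        'StreamLabel': '2018-05-05T00:00:00.000',
    }]
}
print(first_stream_arn(fake_response, 'demographics'))
```

With a real client, the call would be `first_stream_arn(boto3.client('dynamodbstreams').list_streams(TableName='demographics'), 'demographics')`.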

[back-to-top](#back-to-top)
<a id='CFtemplate'></a>

Create the template
------

----



In [18]:
pubminer_template_object = {
    "Resources": {
        "DynamoDBExecutionRole":{
          "Type": "AWS::IAM::Role",
          "Properties": {
            "AssumeRolePolicyDocument": {
               "Version" : "2012-10-17",
               "Statement": [ {
                  "Effect": "Allow",
                  "Principal": {
                     "Service": [ "lambda.amazonaws.com" ]
                  },
                  "Action": [ "sts:AssumeRole" ]
               } ]
            },
            "Path": "/",
            "Policies": [ { 
                "PolicyName": "PM_lambda_dynamodb_execution_policy",
                "PolicyDocument": {
                    "Version": "2012-10-17",
                    "Statement": [
                        {
                            "Effect": "Allow",
                            "Action": "lambda:InvokeFunction",
                             "Resource":  {"Fn::Join": [":", ["arn:aws:lambda:us-east-1", {"Ref":"AWS::AccountId"}, "function:DownloadPMC-OAFileToS3"]]}
                        },
                        {
                            "Sid": "AllowAccessToDynamoDBs",
                            "Effect": "Allow",
                            "Action": "dynamodb:*",
                            "Resource": [
                                {"Fn::Join": [":", ["arn:aws:dynamodb:us-east-1", {"Ref":"AWS::AccountId"}, "table/demographics"]]},
                                {"Fn::Join": [":", ["arn:aws:dynamodb:us-east-1", {"Ref":"AWS::AccountId"}, "table/demographics_meta"]]}
                            ]
                        },
                        {
                            "Effect": "Allow",
                            "Action": [
                                "logs:CreateLogGroup",
                                "logs:CreateLogStream",
                                "logs:PutLogEvents"
                            ],
                            "Resource": {"Fn::Join": [":", ["arn:aws:logs:us-east-1", {"Ref":"AWS::AccountId"}, "*"]]}
                        },
                        {
                            "Effect": "Allow",
                            "Action": [
                                "dynamodb:DescribeStream",
                                "dynamodb:GetRecords",
                                "dynamodb:GetShardIterator",
                                "dynamodb:ListStreams"
                            ],
                            "Resource": demographics_stream_arn
                        },
                        {
                            "Effect": "Allow",
                            "Action": [
                                "s3:GetObject",
                                "s3:PutObject",
                                "s3:PutObjectAcl"
                            ],
                            "Resource": [
                                "arn:aws:s3:::pubminer-upload/*",
                                "arn:aws:s3:::pubmedcentral_oa/*"
                            ]
                        }
                    ]
                }
            }],
            "RoleName": "PM_lambda_dynamodb_execution_role"
          }
        },
        "DynamoDBS3Role":{
          "Type": "AWS::IAM::Role",
          "Properties": {
            "AssumeRolePolicyDocument": {
               "Version" : "2012-10-17",
               "Statement": [ {
                  "Effect": "Allow",
                  "Principal": {
                     "Service": [ "lambda.amazonaws.com" ]
                  },
                  "Action": [ "sts:AssumeRole" ]
               } ]
            },
            "Path": "/",
            "Policies": [ { 
                    "PolicyName": "PM_lambda_dynamodb_s3_policy",
                    "PolicyDocument": {
                        "Version": "2012-10-17",
                        "Statement": [
                            {
                                "Effect": "Allow",
                                "Action": "iam:PassRole",
                                "Resource": "*"
                            },
                            {
                                "Effect": "Allow",
                                "Action": [
                                    "logs:CreateLogGroup",
                                    "logs:CreateLogStream",
                                    "logs:PutLogEvents"
                                ],
                                "Resource": {"Fn::Join": [":", ["arn:aws:logs:us-east-1", {"Ref":"AWS::AccountId"}, "*"]]}
                            },
                            {
                                "Sid": "AllowAccessToDynamoDBs",
                                "Effect": "Allow",
                                "Action": "dynamodb:*",
                                "Resource": [
                                    {"Fn::Join": [":", ["arn:aws:dynamodb:us-east-1", {"Ref":"AWS::AccountId"}, "table/demographics"]]},
                                    {"Fn::Join": [":", ["arn:aws:dynamodb:us-east-1", {"Ref":"AWS::AccountId"}, "table/demographics_meta"]]}
                                ]
                            },
                            {
                                "Effect": "Allow",
                                "Action": [
                                    "s3:GetObject",
                                    "s3:PutObject",
                                    "s3:PutObjectAcl"
                                ],
                                "Resource": [
                                    "arn:aws:s3:::pubminer-upload/*",
                                    "arn:aws:s3:::pubmedcentral_oa/*"
                                ]
                            }
                        ]
                    }
            }],
            "RoleName": "PM_lambda_dynamodb_s3_role"
          }
        },
        "StartStopEC2Role":{
          "Type": "AWS::IAM::Role",
          "Properties": {
            "AssumeRolePolicyDocument": {
               "Version" : "2012-10-17",
               "Statement": [ {
                  "Effect": "Allow",
                  "Principal": {
                     "Service": [ "lambda.amazonaws.com" ]
                  },
                  "Action": [ "sts:AssumeRole" ]
               } ]
            },
            "Path": "/",
            "Policies": [ { 
                "PolicyName": "PM_lambda_start_stop_ec2_policy",
                "PolicyDocument": {
                    "Version": "2012-10-17",
                    "Statement": [
                        {
                            "Effect": "Allow",
                            "Action": "iam:PassRole",
                            "Resource": "*"
                        },
                        {
                            "Effect": "Allow",
                            "Action": "iam:ListInstanceProfiles",
                            "Resource": "*"
                        },
                        {
                            "Effect": "Allow",
                            "Action": "lambda:InvokeFunction",
                            "Resource": {"Fn::Join": [":", ["arn:aws:lambda:us-east-1", {"Ref":"AWS::AccountId"}, "function:SentenceMinerOnEC2Instance"]]}
                        },
                        {
                            "Effect": "Allow",
                            "Action": [
                                "logs:CreateLogGroup",
                                "logs:CreateLogStream",
                                "logs:PutLogEvents"
                            ],
                            "Resource": {"Fn::Join": [":", ["arn:aws:logs:us-east-1", {"Ref":"AWS::AccountId"}, "*"]]}
                        },
                        {
                            "Effect": "Allow",
                            "Action": "ec2:*",
                            "Resource": "*"
                        },
                        {
                            "Sid": "AllowAccessToPubminerDynamoDBToEC2",
                            "Effect": "Allow",
                            "Action": "dynamodb:*",
                            "Resource": {"Fn::Join": [":", ["arn:aws:dynamodb:us-east-1", {"Ref":"AWS::AccountId"}, "table/demographics_meta"]]}
                        },
                        {
                            "Effect": "Allow",
                            "Action": [
                                "dynamodb:DescribeStream",
                                "dynamodb:GetRecords",
                                "dynamodb:GetShardIterator",
                                "dynamodb:ListStreams"
                            ],
                            "Resource": demographics_meta_stream_arn
                        },
                        {
                            "Effect": "Allow",
                            "Action": [
                                "s3:GetObject",
                                "s3:PutObject",
                                "s3:PutObjectAcl"
                            ],
                            "Resource": [
                                "arn:aws:s3:::pubminer-upload/*",
                                "arn:aws:s3:::pubmedcentral_oa/*"
                            ]
                        }
                    ]
                }
            }],
            "RoleName": "PM_lambda_start_stop_ec2_role"
          }
        },
        "SentenceMinerInstanceRole": {
          "Type": "AWS::IAM::Role",
          "Properties": {
             "AssumeRolePolicyDocument": {
                "Version" : "2012-10-17",
                "Statement": [ {
                   "Effect": "Allow",
                   "Principal": {
                      "Service": [ "ec2.amazonaws.com" ]
                   },
                   "Action": [ "sts:AssumeRole" ]
                } ]
             },
          }
        },
        "SentenceMinerInstanceProfile":  {
           "Type": "AWS::IAM::InstanceProfile",
           "Properties": {
              "Path": "/",
              "Roles": [  { "Ref": "SentenceMinerInstanceRole" } ],
              "InstanceProfileName": "PM_SentenceMinerInstanceProfile"
           }
        },
        "GettingUpdates": {
          "Type" : "AWS::Lambda::Function",
          "Properties" : {
            "Code" : {
              "S3Bucket" : "pubminer-upload",
              "S3Key" : "lambda_update_project.zip"
            },
            "Description" : "Lambda function to update DynamoDB table demographics from PMC-OA",
            "Environment" : { "Variables" : { "number_days":"2" } },
            "FunctionName" : "GetPMCUpdatesFromCSV",
            "Handler" : "lambda_function.lambda_handler",
            "MemorySize" : 2048,
            "Role" :  { "Fn::GetAtt": ["DynamoDBS3Role", "Arn"] },
            "Runtime" : "python3.6",
            "Timeout" : 180
          }
        },
        "DownloadingFiles": {
          "Type" : "AWS::Lambda::Function",
          "Properties" : {
            "Code" : {
              "S3Bucket" : "pubminer-upload",
              "S3Key" : "lambda_download_project.zip"
            },
            "Description" : "Lambda function download files from PMC-OA and upload to S3",
            "FunctionName" : "DownloadPMC-OAFileToS3",
            "Handler" : "lambda_function.lambda_handler",
            "MemorySize" : 128,
            "Role" : { "Fn::GetAtt": ["DynamoDBExecutionRole", "Arn"] },
            "Runtime" : "python3.6",
            "Timeout" : 180
          }
        },        
        "ExtractSentences": {
          "Type" : "AWS::Lambda::Function",
          "Properties" : {
            "Code" : {
              "S3Bucket" : "pubminer-upload",
              "S3Key" : "lambda_sentences_project.zip"
            },
            "Description" : "Lambda function to launch Sentence Miner EC2 instance",
            "FunctionName" : "SentenceMinerOnEC2Instance",
            "Handler" : "lambda_function.lambda_handler",
            "MemorySize" : 128,
            "Role" :  { "Fn::GetAtt": ["StartStopEC2Role", "Arn"] },
            "Runtime" : "python3.6",
            "Timeout" : 180
          }
        },
        "TruncateTables": {
          "Type" : "AWS::Lambda::Function",
          "Properties" : {
            "Code" : {
              "S3Bucket" : "pubminer-upload",
              "S3Key" : "lambda_truncate_project.zip"
            },
            "Description" : "Lambda function to truncate the tables",
            "FunctionName" : "TruncateTable",
            "Handler" : "lambda_function.lambda_handler",
            "MemorySize" : 128,
            "Role" :  { "Fn::GetAtt": ["DynamoDBS3Role", "Arn"] },
            "Runtime" : "python3.6",
            "Timeout" : 180
          }
        },
        "UpdateStatistics": {
          "Type" : "AWS::Lambda::Function",
          "Properties" : {
            "Code" : {
              "S3Bucket" : "pubminer-upload",
              "S3Key" : "lambda_statistics_project.zip"
            },
            "Description" : "Lambda function to update the statistics on the demographics table",
            "FunctionName" : "UpdateStatsInDemographicsMeta",
            "Handler" : "lambda_function.lambda_handler",
            "MemorySize" : 128,
            "Role" :  { "Fn::GetAtt": ["DynamoDBS3Role", "Arn"] },
            "Runtime" : "python3.6",
            "Timeout" : 180
          }
        },
        "ScheduledRule": {
          "Type": "AWS::Events::Rule",
          "Properties": {
            "Description": "ScheduledRule",
            "ScheduleExpression": "cron(00 6 * * ? *)",
            "State": "ENABLED",
            "Targets": [{
              "Arn": { "Fn::GetAtt": ["GettingUpdates", "Arn"] },
              "Id": "TargetFunctionV1"
            }]
          }
        },
        "PermissionForEventsToInvokeLambda": {
          "Type": "AWS::Lambda::Permission",
          "Properties": {
            "FunctionName": { "Ref": "GettingUpdates" },
            "Action": "lambda:InvokeFunction",
            "Principal": "events.amazonaws.com",
            "SourceArn": { "Fn::GetAtt": ["ScheduledRule", "Arn"] }
          }
        },
        "DownloadingEvent": {
          "Type" : "AWS::Lambda::EventSourceMapping",
          "Properties" : {
            "BatchSize" : 1,
            "Enabled" : "true",
            "EventSourceArn" : demographics_stream_arn, 
            "FunctionName" : { "Fn::GetAtt": ["DownloadingFiles", "Arn"] },
            "StartingPosition" : "LATEST"
          }
        },
        "ExtractingEvent": {
          "Type" : "AWS::Lambda::EventSourceMapping",
          "Properties" : {
            "BatchSize" : 1,
            "Enabled" : "true",
            "EventSourceArn" : demographics_meta_stream_arn, 
            "FunctionName" : { "Fn::GetAtt": ["ExtractSentences", "Arn"] },
            "StartingPosition" : "LATEST"
          }
        },
        "PermissionForS3UploadToInvokeTruncateLambda": {
          "Type": "AWS::Lambda::Permission",
          "Properties": {
            "FunctionName":  { "Fn::GetAtt": ["TruncateTables", "Arn"] },
            "Action": "lambda:InvokeFunction",
            "Principal": "s3.amazonaws.com",
            "SourceAccount": { "Ref": "AWS::AccountId" },
            "SourceArn": "arn:aws:s3:::pubmedcentral_oa"
          }
        },
        "PermissionForS3UploadToInvokeStatisticsLambda": {
          "Type": "AWS::Lambda::Permission",
          "Properties": {
            "FunctionName": { "Fn::GetAtt": ["UpdateStatistics", "Arn"] },
            "Action": "lambda:InvokeFunction",
            "Principal": "s3.amazonaws.com",
            "SourceAccount": { "Ref": "AWS::AccountId" },
            "SourceArn": "arn:aws:s3:::pubminer-upload"
          }
        },
    }
}

### Dump the JSON of the template configuration to a file

In [19]:
# Write the template to disk; the context manager closes the file even on error
with open("pubminer_template.json", "w") as f:
    json.dump(pubminer_template_object, f)
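
Before launching the stack, it can help to confirm that every `Ref` and `Fn::GetAtt` in the template points at a resource that is actually defined. The following is a minimal sketch of such a check, not part of the original setup; the helper name `find_dangling_references` and the small `example_template` are hypothetical and stand in for `pubminer_template_object`.

```python
def find_dangling_references(template):
    """Return the sorted logical IDs that are referenced but never defined."""
    defined = set(template.get("Resources", {})) | set(template.get("Parameters", {}))
    missing = set()

    def walk(node):
        if isinstance(node, dict):
            for key, value in node.items():
                if key == "Ref" and isinstance(value, str):
                    target = value
                elif key == "Fn::GetAtt" and isinstance(value, list) and value:
                    target = value[0]
                else:
                    walk(value)
                    continue
                # Pseudo parameters such as AWS::AccountId are always available
                if not target.startswith("AWS::") and target not in defined:
                    missing.add(target)
        elif isinstance(node, list):
            for item in node:
                walk(item)

    walk(template.get("Resources", {}))
    return sorted(missing)


# Hypothetical miniature template used only to illustrate the check
example_template = {
    "Resources": {
        "DemoRole": {"Type": "AWS::IAM::Role", "Properties": {}},
        "DemoFunction": {
            "Type": "AWS::Lambda::Function",
            "Properties": {
                "Role": {"Fn::GetAtt": ["DemoRole", "Arn"]},
                "Tags": [{"Key": "Account", "Value": {"Ref": "AWS::AccountId"}}],
            },
        },
        "DemoPermission": {
            "Type": "AWS::Lambda::Permission",
            "Properties": {"FunctionName": {"Ref": "MissingFunction"}},
        },
    }
}

print(find_dangling_references(example_template))
```

Calling `find_dangling_references(pubminer_template_object)` in the notebook should return an empty list if all references resolve.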

[back-to-top](#back-to-top)
<a id='uploadLambdas'></a>

Upload AWS Lambda packages and template to `pubminer-upload`
------

----



In [20]:
import shutil

list_of_zip_files = ["lambda_update_project", "lambda_download_project", "lambda_sentences_project", "lambda_truncate_project", "lambda_statistics_project"]

for folder in list_of_zip_files:
    shutil.make_archive(folder, 'zip', folder)

list_of_zip_files = [ i + ".zip" for i in list_of_zip_files]
files_to_upload = list_of_zip_files + ["sentences_status.py", "PubMiner_s3.jar"]

s3_client = boto3.client('s3')
save_bucket_name = 'pubminer-upload'

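for filename in files_to_upload:
    # Upload each file to S3, using the file name as the object key
    try:
        s3_client.upload_file(filename, save_bucket_name, filename, ExtraArgs={'ACL': 'public-read'})
        print('S3 upload passed!')
    except Exception as err:
        # Report which file failed and why instead of hiding the cause
        print('S3 upload failed!', filename, err)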

S3 upload passed!
S3 upload passed!
S3 upload passed!
S3 upload passed!
S3 upload passed!
S3 upload passed!
S3 upload passed!
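
Each function in the template uses the handler `lambda_function.lambda_handler`, so AWS Lambda expects a file named `lambda_function.py` at the root of the corresponding zip archive. The sketch below shows one way to check a package for this; the helper name `archive_has_handler` and the throwaway archives are assumptions made for illustration.

```python
import os
import tempfile
import zipfile


def archive_has_handler(zip_path, module_file="lambda_function.py"):
    """Return True if module_file sits at the root of the zip archive."""
    with zipfile.ZipFile(zip_path) as archive:
        return module_file in archive.namelist()


# Self-contained demonstration with two throwaway archives
with tempfile.TemporaryDirectory() as tmp:
    good_zip = os.path.join(tmp, "good.zip")
    with zipfile.ZipFile(good_zip, "w") as archive:
        archive.writestr(
            "lambda_function.py",
            "def lambda_handler(event, context):\n    return None\n",
        )

    # Here the module is nested one directory down, so Lambda would not find it
    nested_zip = os.path.join(tmp, "nested.zip")
    with zipfile.ZipFile(nested_zip, "w") as archive:
        archive.writestr("project/lambda_function.py", "")

    good_result = archive_has_handler(good_zip)
    nested_result = archive_has_handler(nested_zip)

print(good_result, nested_result)
```

In the notebook, the same helper could be applied to every entry of `list_of_zip_files` before the upload.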


[back-to-top](#back-to-top)
<a id='launch_stack'></a>

Launch the Cloud Formation stack
------

----



In [21]:
cf_client = boto3.client('cloudformation')
response = cf_client.create_stack(
    StackName="PubMinerStack",
    TemplateBody=json.dumps(pubminer_template_object),
    DisableRollback=False,
    TimeoutInMinutes=10,
    Capabilities = ['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM'],
    Tags=[{'Key': 'Name', 'Value': 'PubMiner CF Stack'}]
)
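
`create_stack` returns as soon as the request is accepted, while the resources are still being created. The cells that follow look up the Lambda functions by name, so it is safer to wait until the stack is complete. A minimal sketch using the boto3 `stack_create_complete` waiter follows; the helper name `wait_for_stack` and its default polling values are assumptions, chosen to roughly match the 10-minute timeout above.

```python
def wait_for_stack(cf_client, stack_name, delay=15, max_attempts=40):
    """Block until stack creation finishes, then return the final stack status.

    The waiter raises an exception if the stack fails or the attempts run out.
    """
    waiter = cf_client.get_waiter('stack_create_complete')
    waiter.wait(
        StackName=stack_name,
        WaiterConfig={'Delay': delay, 'MaxAttempts': max_attempts}
    )
    stacks = cf_client.describe_stacks(StackName=stack_name)['Stacks']
    return stacks[0]['StackStatus']


# Usage with the CloudFormation client created in the cell above:
# print(wait_for_stack(cf_client, "PubMinerStack"))
```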

### Add the event notification configuration to the two S3 buckets

In [22]:
truncateTableARN = boto3.client('lambda').get_function_configuration( FunctionName='TruncateTable' )['FunctionArn']
updateStatsARN = boto3.client('lambda').get_function_configuration( FunctionName='UpdateStatsInDemographicsMeta' )['FunctionArn']

#print(truncateTableARN)
#print(updateStatsARN)

### First, for the TruncateTable Lambda function triggered by uploads to `pubmedcentral_oa`

In [23]:
s3_resource = boto3.resource('s3')

config_data = {         
    'LambdaFunctionConfigurations': [
        {
            'Id': 'TriggerTruncateTableEventOnUpload',
            'LambdaFunctionArn': truncateTableARN,
            'Events': [
                's3:ObjectCreated:*',
            ],
            'Filter': {
                'Key': {
                    'FilterRules': [
                        {
                            'Name': 'suffix',
                            'Value': '.nxml'
                        },
                    ]
                }
            }
        }
    ]}
bucket_notification = s3_resource.BucketNotification('pubmedcentral_oa')
response = bucket_notification.put(NotificationConfiguration=config_data)
print('Bucket notification updated successfully')

Bucket notification updated successfully


### Then, for the Lambda function for updating of statistics, triggered by completion of sentence extraction

In [24]:
config_data = {         
    'LambdaFunctionConfigurations': [
        {
            'Id': 'TriggerUpdateStatsWhenSentencesComplete',
            'LambdaFunctionArn': updateStatsARN,
            'Events': [
                's3:ObjectCreated:Put',
            ],
            'Filter': {
                'Key': {
                    'FilterRules': [
                        {
                            'Name': 'suffix',
                            'Value': '.status'
                        },
                    ]
                }
            }
        }
    ]}
bucket_notification = s3_resource.BucketNotification('pubminer-upload')
response = bucket_notification.put(NotificationConfiguration=config_data)
print('Bucket notification updated successfully')

Bucket notification updated successfully
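
To confirm that both triggers are in place, the notification configuration can be read back from each bucket with the S3 client call `get_bucket_notification_configuration`. The following is a sketch only; the helper name `summarize_lambda_triggers` is hypothetical.

```python
def summarize_lambda_triggers(s3_client, bucket_name):
    """Return (id, function ARN, events, suffixes) for each Lambda trigger on a bucket."""
    response = s3_client.get_bucket_notification_configuration(Bucket=bucket_name)
    summary = []
    for config in response.get('LambdaFunctionConfigurations', []):
        rules = config.get('Filter', {}).get('Key', {}).get('FilterRules', [])
        # Compare rule names case-insensitively ('suffix' vs 'Suffix')
        suffixes = [rule['Value'] for rule in rules if rule['Name'].lower() == 'suffix']
        summary.append(
            (config.get('Id'), config['LambdaFunctionArn'], config['Events'], suffixes)
        )
    return summary


# Usage in the notebook:
# s3_client = boto3.client('s3')
# for bucket in ('pubmedcentral_oa', 'pubminer-upload'):
#     print(bucket, summarize_lambda_triggers(s3_client, bucket))
```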


[back-to-top](#back-to-top)
<a id='tear_down_stack'></a>

Tear down the Cloud Formation stack
------

----

**This will remove the PubMiner backend system of AWS Lambda functions.**

Do this only when you want to redo the setup.

In [42]:
cf_client = boto3.client('cloudformation')
response = cf_client.delete_stack( StackName="PubMinerStack" )

[back-to-top](#back-to-top)
<a id='testing'></a>

*Testing the installation*  
=====

The following cells contain the test examples for the Lambda functions and the test suite for the BeautifulSoup parsing. 

[back-to-top](#back-to-top)
<a id='Testlambda'></a>

Test events for AWS Lambda functions
------

----

The next cells contain the JSON code for the test events of the individual Lambda functions. Each can be copied and pasted into the Test Event box in the AWS console to test the corresponding Lambda function.

### Vanilla  test event for  `GetPMCUpdatesFromCSV`

Triggers the entire update workflow from a vanilla test event.

In [25]:
# Create CloudWatchEvents client
cloudwatch_events = boto3.client('events')

# Get the ARN of the function
getUpdatesARN = boto3.client('lambda').get_function_configuration( FunctionName='GetPMCUpdatesFromCSV' )['FunctionArn']

# Put an event
response = cloudwatch_events.put_events(
    Entries=[
        {
            'Detail': json.dumps({'key1': 'value1', 'key2': 'value2'}),
            'DetailType': 'appRequestSubmitted',
            'Resources': [
                getUpdatesARN,
            ],
            'Source': 'Trigger'
        }
    ]
)
print(response)

{'FailedEntryCount': 0, 'Entries': [{'EventId': '97f7e81f-d74b-c1cd-9def-0890c97e7125'}], 'ResponseMetadata': {'RequestId': '274b9b36-5485-11e8-8327-413b3b94c1b0', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '274b9b36-5485-11e8-8327-413b3b94c1b0', 'content-type': 'application/x-amz-json-1.1', 'content-length': '85', 'date': 'Thu, 10 May 2018 19:05:49 GMT'}, 'RetryAttempts': 0}}


#### Invoke the Lambda function with the test event and check the tail of the log for errors

In [26]:
lambda_client = boto3.client('lambda')
response = lambda_client.invoke(
    FunctionName=getUpdatesARN,
    InvocationType='RequestResponse',
    LogType='Tail',
    Payload=json.dumps({'key1': 'value1', 'key2': 'value2'})
)

print(json.loads(response['Payload'].read().decode("utf-8")))

2018-05-10 19:06:07.577959
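
The invocation cells request `LogType='Tail'` but print only the payload. With that setting, the response also carries the tail of the execution log, base64-encoded under `LogResult`, and a `FunctionError` key when the function raised an error. A small sketch for inspecting all three is given below; the helper name `summarize_invocation` and the stand-in response are assumptions made for illustration.

```python
import base64
import io
import json


def summarize_invocation(response):
    """Return (payload, function_error, log_tail) from a Lambda invoke response."""
    payload = json.loads(response['Payload'].read().decode('utf-8'))
    # 'FunctionError' is only present when the function raised an error
    function_error = response.get('FunctionError')
    # 'LogResult' holds the base64-encoded log tail when LogType='Tail' was requested
    log_tail = base64.b64decode(response.get('LogResult', '')).decode('utf-8')
    return payload, function_error, log_tail


# Stand-in shaped like the dictionary returned by lambda_client.invoke
fake_response = {
    'StatusCode': 200,
    'LogResult': base64.b64encode(b'START RequestId: demo\nEND RequestId: demo\n').decode('ascii'),
    'Payload': io.BytesIO(json.dumps('2018-05-10 19:06:07.577959').encode('utf-8')),
}

payload, function_error, log_tail = summarize_invocation(fake_response)
print(payload, function_error)
print(log_tail)
```

In the notebook, `summarize_invocation(response)` can replace the final `print` of any of the invocation cells.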


### UpdateToDynamoDB  test event for  `DownloadPMC-OAFileToS3`

Includes 3 files that are in the OA subset and 1 file (the last) that is not. Tests the downloading and the increment and decrement/deletion operations.

In [27]:
UpdateToDynamoDBtestevent = {
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "pmcid": {
            "S": "5920940"
          }
        },
        "StreamViewType": "KEYS",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-east-1",
      "eventName": "INSERT",
      "eventSourceARN": demographics_stream_arn,
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "2",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "pmcid": {
            "S": "5935860"
          }
        },
        "StreamViewType": "KEYS",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-east-1",
      "eventName": "INSERT",
      "eventSourceARN": demographics_stream_arn,
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "3",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "pmcid": {
            "S": "5916111"
          }
        },
        "StreamViewType": "KEYS",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-east-1",
      "eventName": "INSERT",
      "eventSourceARN": demographics_stream_arn,
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "4",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "pmcid": {
            "S": "2842548"
          }
        },
        "StreamViewType": "KEYS",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-east-1",
      "eventName": "INSERT",
      "eventSourceARN": demographics_stream_arn,
      "eventSource": "aws:dynamodb"
    }
  ]
}
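
The four records above differ only in `eventID` and `pmcid`, so a test event of this shape can also be generated from a list of PMCIDs. The sketch below is one way to do so; the helper name `make_insert_records` and the placeholder stream ARN are hypothetical (the notebook would pass `demographics_stream_arn`).

```python
def make_insert_records(pmcids, stream_arn, region="us-east-1"):
    """Build a DynamoDB stream test event with one INSERT record per PMCID."""
    records = []
    for index, pmcid in enumerate(pmcids, start=1):
        records.append({
            "eventID": str(index),
            "eventVersion": "1.0",
            "dynamodb": {
                "Keys": {"pmcid": {"S": pmcid}},
                "StreamViewType": "KEYS",
                "SequenceNumber": "111",
                "SizeBytes": 26,
            },
            "awsRegion": region,
            "eventName": "INSERT",
            "eventSourceARN": stream_arn,
            "eventSource": "aws:dynamodb",
        })
    return {"Records": records}


# Placeholder ARN for illustration only
example_stream_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/demographics/stream/example"
example_event = make_insert_records(
    ["5920940", "5935860", "5916111", "2842548"], example_stream_arn
)
print(len(example_event["Records"]))
```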

In [28]:
# Get the ARN of the function
downloadPMCtoS3ARN = boto3.client('lambda').get_function_configuration( FunctionName='DownloadPMC-OAFileToS3' )['FunctionArn']

# Put an event
response = cloudwatch_events.put_events(
    Entries=[
        {
            'Detail': json.dumps(UpdateToDynamoDBtestevent),
            'DetailType': 'UpdateToDynamoDB',
            'Resources': [
                downloadPMCtoS3ARN,
            ],
            'Source': 'Trigger'
        }
    ]
)
print(response)

{'FailedEntryCount': 0, 'Entries': [{'EventId': 'f6fd6a01-223e-6164-d0e7-3a1f987b7343'}], 'ResponseMetadata': {'RequestId': '314bec3a-5486-11e8-a85e-b3d04ecf237e', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '314bec3a-5486-11e8-a85e-b3d04ecf237e', 'content-type': 'application/x-amz-json-1.1', 'content-length': '85', 'date': 'Thu, 10 May 2018 19:13:15 GMT'}, 'RetryAttempts': 0}}


#### Invoke the Lambda function with the test event and check the tail of the log for errors

In [29]:
lambda_client = boto3.client('lambda')
response = lambda_client.invoke(
    FunctionName=downloadPMCtoS3ARN,
    InvocationType='RequestResponse',
    LogType='Tail',
    Payload=json.dumps(UpdateToDynamoDBtestevent)
)

print(json.loads(response['Payload'].read().decode("utf-8")))

None


### DynamoTriggerExtraction  test event for  `SentenceMinerOnEC2Instance`

Simulates the stream event in which `items_downloaded` increases and reaches the total given by `items_updated`.

In [30]:
DynamoTriggerExtractiontestevent = {
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "OldImage": {
          "items_downloaded": {
            "N": "99"
          },
          "items_updated": {
            "N": "100"
          }
        },
        "SequenceNumber": "222",
        "Keys": {
          "source": {
            "S": "demographics"
          }
        },
        "SizeBytes": 59,
        "NewImage": {
          "items_downloaded": {
            "N": "100"
          },
          "items_updated": {
            "N": "100"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-east-1",
      "eventName": "MODIFY",
      "eventSourceARN": demographics_meta_stream_arn,
      "eventSource": "aws:dynamodb"
    }
  ]
}
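
For orientation, the condition this test event exercises can be expressed as a small predicate over a stream record. This is a sketch of one possible check, not the code of the `SentenceMinerOnEC2Instance` function itself; the helper name `download_just_completed` is hypothetical. DynamoDB streams deliver `N` values as strings, and `int()` accepts both strings and integers.

```python
def download_just_completed(record):
    """Return True when a MODIFY record shows items_downloaded reaching items_updated."""
    if record.get("eventName") != "MODIFY":
        return False
    stream_data = record.get("dynamodb", {})
    try:
        old_downloaded = int(stream_data["OldImage"]["items_downloaded"]["N"])
        new_downloaded = int(stream_data["NewImage"]["items_downloaded"]["N"])
        new_updated = int(stream_data["NewImage"]["items_updated"]["N"])
    except KeyError:
        # Records without both images or counters cannot signal completion
        return False
    return old_downloaded < new_downloaded and new_downloaded == new_updated


example_record = {
    "eventName": "MODIFY",
    "dynamodb": {
        "OldImage": {"items_downloaded": {"N": "99"}, "items_updated": {"N": "100"}},
        "NewImage": {"items_downloaded": {"N": "100"}, "items_updated": {"N": "100"}},
    },
}
print(download_just_completed(example_record))
```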

In [31]:
sentenceMinerARN = boto3.client('lambda').get_function_configuration( FunctionName='SentenceMinerOnEC2Instance' )['FunctionArn']
response = cloudwatch_events.put_events(
    Entries=[
        {
            'Detail': json.dumps(DynamoTriggerExtractiontestevent),
            'DetailType': 'DynamoTriggerExtraction',
            'Resources': [
                sentenceMinerARN,
            ],
            'Source': 'Trigger'
        }
    ]
)
print(response)

{'FailedEntryCount': 0, 'Entries': [{'EventId': '55d8df5d-a66c-72bb-06c1-de52902ccbe3'}], 'ResponseMetadata': {'RequestId': '395da836-5486-11e8-a395-e1c090485db8', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '395da836-5486-11e8-a395-e1c090485db8', 'content-type': 'application/x-amz-json-1.1', 'content-length': '85', 'date': 'Thu, 10 May 2018 19:13:29 GMT'}, 'RetryAttempts': 0}}


#### Invoke the Lambda function with the test event and check the tail of the log for errors

In [32]:
lambda_client = boto3.client('lambda')
response = lambda_client.invoke(
    FunctionName=sentenceMinerARN,
    InvocationType='RequestResponse',
    LogType='Tail',
    Payload=json.dumps(DynamoTriggerExtractiontestevent)
)

print(json.loads(response['Payload'].read().decode("utf-8")))

2018-05-10 19:13:33.819428


### S3 upload  test event for  `TruncateTable`

Provides the event that an XML file of an article is uploaded to S3.

In [33]:
S3Uploadtestevent = {
  "Records": [
    {
      "s3": {
        "object": {
          "key": "PMC5935858.nxml"
        },
        "bucket": {
          "arn": "arn:aws:s3:::pubmedcentral_oa",
          "name": "pubmedcentral_oa"
        },
        "s3SchemaVersion": "1.0"
      },
      "awsRegion": "us-east-1",
      "eventName": "ObjectCreated:Put",
      "eventSource": "aws:s3"
    },
    {
      "s3": {
        "object": {
          "key": "PMC5934515.nxml"
        },
        "bucket": {
          "arn": "arn:aws:s3:::pubmedcentral_oa",
          "name": "pubmedcentral_oa"
        },
        "s3SchemaVersion": "1.0"
      },
      "awsRegion": "us-east-1",
      "eventName": "ObjectCreated:Put",
      "eventSource": "aws:s3"
    }
  ]
}
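
A handler receiving an event of this shape typically starts by extracting the bucket name and object key from each record. The sketch below illustrates that step; the helper name `uploaded_objects` is hypothetical and is not taken from the `TruncateTable` package. Object keys in S3 event notifications are URL-encoded, hence the call to `unquote_plus`.

```python
from urllib.parse import unquote_plus


def uploaded_objects(event):
    """Return a list of (bucket, key) pairs for the S3 records in an event."""
    objects = []
    for record in event.get("Records", []):
        if record.get("eventSource") != "aws:s3":
            continue
        bucket = record["s3"]["bucket"]["name"]
        # Keys arrive URL-encoded, e.g. spaces as '+'
        key = unquote_plus(record["s3"]["object"]["key"])
        objects.append((bucket, key))
    return objects


example_s3_event = {
    "Records": [
        {
            "s3": {
                "object": {"key": "PMC5935858.nxml"},
                "bucket": {"arn": "arn:aws:s3:::pubmedcentral_oa", "name": "pubmedcentral_oa"},
            },
            "eventName": "ObjectCreated:Put",
            "eventSource": "aws:s3",
        }
    ]
}
print(uploaded_objects(example_s3_event))
```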

In [34]:
truncateTableARN = boto3.client('lambda').get_function_configuration( FunctionName='TruncateTable' )['FunctionArn']
response = cloudwatch_events.put_events(
    Entries=[
        {
            'Detail': json.dumps(S3Uploadtestevent),
            'DetailType': 'TruncateTable',
            'Resources': [
                truncateTableARN,
            ],
            'Source': 'Trigger'
        }
    ]
)
print(response)


{'FailedEntryCount': 0, 'Entries': [{'EventId': '9dcba488-2ab8-cf49-05db-11490ad83ea7'}], 'ResponseMetadata': {'RequestId': '509a2484-5486-11e8-a395-e1c090485db8', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '509a2484-5486-11e8-a395-e1c090485db8', 'content-type': 'application/x-amz-json-1.1', 'content-length': '85', 'date': 'Thu, 10 May 2018 19:14:08 GMT'}, 'RetryAttempts': 0}}


#### Invoke the Lambda function with the test event and check the tail of the log for errors

In [35]:
lambda_client = boto3.client('lambda')
response = lambda_client.invoke(
    FunctionName=truncateTableARN,
    InvocationType='RequestResponse',
    LogType='Tail',
    Payload=json.dumps(S3Uploadtestevent)
)

print(json.loads(response['Payload'].read().decode("utf-8")))

None


### S3 upload test event for  `UpdateStatsInDemographicsMeta`

Provides the event that the `sentences.status` file from the SentenceMiner instance is uploaded to S3.

In [36]:
UpdateStatstestevent = {
  "Records": [
    {
      "s3": {
        "object": {
          "key": "sentences.status"
        },
        "bucket": {
          "arn": "arn:aws:s3:::pubminer-upload",
          "name": "pubminer-upload"
        },
        "s3SchemaVersion": "1.0"
      },
      "awsRegion": "us-east-1",
      "eventName": "ObjectCreated:Put",
      "eventSource": "aws:s3"
    }
  ]
}

In [37]:
updateStatsARN = boto3.client('lambda').get_function_configuration( FunctionName='UpdateStatsInDemographicsMeta' )['FunctionArn']
response = cloudwatch_events.put_events(
    Entries=[
        {
            'Detail': json.dumps(UpdateStatstestevent),
            'DetailType': 'UpdateStats',
            'Resources': [
                updateStatsARN,
            ],
            'Source': 'Trigger'
        }
    ]
)
print(response)

{'FailedEntryCount': 0, 'Entries': [{'EventId': 'dd0b5229-9a62-19d6-6173-983c0dbf800e'}], 'ResponseMetadata': {'RequestId': '60453812-5486-11e8-8dca-6156cc8dd389', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '60453812-5486-11e8-8dca-6156cc8dd389', 'content-type': 'application/x-amz-json-1.1', 'content-length': '85', 'date': 'Thu, 10 May 2018 19:14:35 GMT'}, 'RetryAttempts': 0}}


#### Invoke the Lambda function with the test event and check the tail of the log for errors

In [38]:
lambda_client = boto3.client('lambda')
response = lambda_client.invoke(
    FunctionName=updateStatsARN,
    InvocationType='RequestResponse',
    LogType='Tail',
    Payload=json.dumps(UpdateStatstestevent)
)

print(json.loads(response['Payload'].read().decode("utf-8")))

2018-05-10 19:14:53.722902


[back-to-top](#back-to-top)
<a id='crummytest'></a>

BeautifulSoup tests
------

----

The set of BeautifulSoup tests can be run from the notebook to confirm that the table mining functions are working as expected. The tests are based on the installation unit tests of BeautifulSoup 3. The source tests can be found [here](https://www.crummy.com/software/BeautifulSoup/bs3/download/3.x/BeautifulSoup-3.0.8/BeautifulSoupTests.py).

In [39]:
#%%writefile PubMinerSoupTests.py
import unittest
import re
from bs4 import BeautifulSoup
import warnings
with warnings.catch_warnings():
    warnings.filterwarnings("ignore",category=DeprecationWarning)


'''
Tests for BeautifulSoup based on the package tests using Python unittest 
found here : https://www.crummy.com/software/BeautifulSoup/bs3/download/3.x/BeautifulSoup-3.0.8/BeautifulSoupTests.py
'''

class SoupTest(unittest.TestCase):

    def assertSoupEquals(self, toParse, rep=None, c=BeautifulSoup):
        """Parse the given text and make sure its string rep is the other
        given text."""
        if rep is None:
            rep = toParse
        self.assertEqual(str(c(toParse)), rep)


class FollowThatTag(SoupTest):

    "Tests the various ways of fetching tags from a soup."

    def setUp(self):
        ml = """
        <front>
            <article-meta>
                <title-group>
                    <article-title>A good title</article-title>
                </title-group>
            </article-meta>
        </front>
        <table-wrap id="T1" orientation="portrait" position="float">
            <label>Table 1.</label>
            <caption>
                <p>Demographics</p>
            </caption>
            <table frame="vsides" rules="groups">
                <thead>
                    <tr>
                        <th align="left" valign="bottom" rowspan="1" colspan="1"/>
                        <th align="center" valign="bottom" rowspan="1" colspan="1">Sham (n=11)</th>
                        <th align="center" valign="bottom" rowspan="1" colspan="1">ECT (n=14)</th>
                        <th align="center" valign="bottom" rowspan="1" colspan="1"><italic>P</italic> value</th>
                    </tr>
                </thead>
                <tbody>
                    <tr>
                        <td rowspan="1" colspan="1">Gender, n/% males</td>
                        <td align="center" rowspan="1" colspan="1">7/63</td>
                        <td align="center" rowspan="1" colspan="1">8/53</td>
                        <td align="center" rowspan="1" colspan="1">.7</td>
                    </tr>
                    <tr>
                        <td rowspan="1" colspan="1">Age, median (IQR)</td>
                        <td align="center" rowspan="1" colspan="1">37 (32)</td>
                        <td align="center" rowspan="1" colspan="1">40 (29)</td>
                        <td align="center" rowspan="1" colspan="1">.9</td>
                    </tr>
                    <tr>
                        <td rowspan="1" colspan="1">Age at onset, median (IQR)</td>
                        <td align="center" rowspan="1" colspan="1">22 (24)</td>
                        <td align="center" rowspan="1" colspan="1">18 (30)</td>
                        <td align="center" rowspan="1" colspan="1">.5</td>
                    </tr>
                    <tr>
                        <td rowspan="1" colspan="1">First hospitalization, median (IQR)</td>
                        <td align="center" rowspan="1" colspan="1">24 (30)</td>
                        <td align="center" rowspan="1" colspan="1">39 (30)</td>
                        <td align="center" rowspan="1" colspan="1">.9</td>
                    </tr>
                    <tr>
                        <td rowspan="1" colspan="1">Number of episodes, median (IQR)</td>
                        <td align="center" rowspan="1" colspan="1">5 (8)</td>
                        <td align="center" rowspan="1" colspan="1">9 (20)</td>
                        <td align="center" rowspan="1" colspan="1">.6</td>
                    </tr>
                    <tr>
                        <td rowspan="1" colspan="1">Education (yrs.), mean (SD)</td>
                        <td align="center" rowspan="1" colspan="1">16 (2)</td>
                        <td align="center" rowspan="1" colspan="1">14 (2)</td>
                        <td align="center" rowspan="1" colspan="1">.04</td>
                    </tr>
                </tbody>
            </table>
            <table>This is another table.</table>
            <table-wrap-foot>
                <fn id="fn-01">
                    <p>Abbreviations: SD, standard deviation; STAI, Yrs, years.</p>
                </fn>
            </table-wrap-foot>
            <table-wrap-foot>
                <fn id="fn-02">
                    <p>Another great table footer.</p>
                </fn>
            </table-wrap-foot>
            <permissions>
                <license license-type="cc-by-nc" href="http://creativecommons.org/licenses/by-nc/4.0/">
                    <license-p>This is an Open Access article</license-p>
                    <license-q></license-q>
                </license>
            </permissions>"""
        self.soup = BeautifulSoup(ml,'lxml')

    def testFindAllByName(self):
        # Run tests on finding the elements used in the table truncation
        matching = self.soup('article-title')
        self.assertEqual(len(matching), 1)
        self.assertEqual(matching[0].name, 'article-title')
        self.assertEqual(matching, self.soup.find_all('article-title'))

        matching2 = self.soup('table-wrap')
        self.assertEqual(len(matching2), 1)
        self.assertEqual(matching2[0].name, 'table-wrap')
        self.assertEqual(matching2, self.soup.find_all('table-wrap'))

        matching3 = self.soup('label')
        self.assertEqual(len(matching3), 1)
        self.assertEqual(matching3[0].name, 'label')
        self.assertEqual(matching3, self.soup.find_all('label'))

        matching4 = self.soup('table')
        self.assertEqual(len(matching4), 2)
        self.assertEqual(matching4[0].name, 'table')
        self.assertEqual(matching4[1].name, 'table')
        self.assertEqual(matching4, self.soup.find_all('table'))

        matching5 = self.soup('table-wrap-foot')
        self.assertEqual(len(matching5), 2)
        self.assertEqual(matching5[0].name, 'table-wrap-foot')
        self.assertEqual(matching5[1].name, 'table-wrap-foot')
        self.assertEqual(matching5, self.soup.find_all('table-wrap-foot'))        

        matching6 = self.soup('permissions')
        self.assertEqual(len(matching6), 1)
        self.assertEqual(matching6[0].name, 'permissions')
        self.assertEqual(matching6, self.soup.find_all('permissions'))    
        
        matching7 = self.soup('tbody')
        self.assertEqual(len(matching7), 1)
        self.assertEqual(matching7[0].name, 'tbody')
        self.assertEqual(matching7, self.soup.find_all('tbody'))
        
        matching8 = self.soup('tr')
        self.assertEqual(len(matching8), 7)
        self.assertEqual(matching8[0].name, 'tr')
        self.assertEqual(matching8, self.soup.find_all('tr'))
        
    def testFindAllByAttribute(self):
        # Run tests on finding some elements by attribute
        matching = self.soup.findAll(id='T1')
        self.assertEqual(len(matching), 1)
        self.assertEqual(matching[0].name, 'table-wrap')

        matching2 = self.soup.findAll(attrs={'id' : 'T1'})
        self.assertEqual(matching, matching2)
        
        matching3 = self.soup.findAll(id='fn-01')
        self.assertEqual(len(matching3), 1)
        self.assertEqual(matching3[0].name, 'fn')
        
        matching4 = self.soup.findAll(attrs={'license-type' : 'cc-by-nc'})
        self.assertEqual(len(matching4), 1)
        self.assertEqual(matching4[0].name, 'license')
        
        self.assertEqual(len(self.soup.find_all(id=None)), 57)

        self.assertEqual(len(self.soup.find_all(rowspan="1")), 28)
        self.assertEqual(len(self.soup.find_all(colspan="1")), 28)
        self.assertEqual(len(self.soup.find_all(align="center")), 21)
        self.assertEqual(len(self.soup.find_all(junk=None)), 60)
        self.assertEqual(len(self.soup.find_all(junk=[1, None])), 0)

        self.assertEqual(len(self.soup.find_all(junk=re.compile('.*'))), 0)
        self.assertEqual(len(self.soup.find_all(junk=True)), 0)
        self.assertEqual(len(self.soup.find_all(href=True)), 1)

    def testFindAllByList(self):
        matching = self.soup(['table-wrap', 'label', 'table', 'tbody', 'table-wrap-foot'])
        self.assertEqual(len(matching), 7)
        
        
    def testFindAllText(self):
        soup = BeautifulSoup("<html>\xbb</html>", 'lxml')
        self.assertEqual(soup.findAll(text=re.compile('.*')),
                         [u'\xbb'])
        
        matching = self.soup('article-title')
        self.assertEqual(len(matching), 1)
        self.assertEqual(matching[0].get_text(), 'A good title')
        
    def testFindAllByRE(self):
        regexTable1inTable = '(tab(:?.|le|le.|le-|leau|ela)? *?(1[.]?|I[.]?)$)'
        regexInTable = re.compile(regexTable1inTable, re.I)
        self.assertEqual(len(self.soup(text=regexInTable)), 1)

        regexAge = '\\b(age[d|s]?)\\b'
        regexTime1 = '(\\S+\\s*\\d+(.\\d+)?)$'
        regexTime2 = '\\b(year[s]?|week[s]?|month[s]?|mean|(max|min)(imum)?)\\b'
        regexTime = re.compile('|'.join([regexTime1, regexTime2]), re.I)
        self.assertEqual(len(self.soup(text=regexTime)), 10)

        regexSex = '\\b(sex(es)?|gender[s]?|male[s]?|female[s]?|(wo)?men[s]?|boy[s]?|girl[s]?)\\b'
        regexSex = re.compile(regexSex, re.I)
        self.assertEqual(len(self.soup(text=regexSex)), 1)
        

    def testLen(self):
        matching = self.soup('table')
        self.assertEqual(len(matching[0].tbody), 13)
        
        matching2 = self.soup('permissions')
        self.assertEqual(len(matching2[0].license), 5)

    def testString(self):
        matching = self.soup('label')
        self.assertEqual(matching[0].string, 'Table 1.')

    def testLackOfString(self):
        matching = self.soup('license-q')
        self.assertTrue(not matching[0].string)

    def testStringAssign(self):
        matching = self.soup('label')
        matching[0].string = 'Table 1.'
        string = matching[0].string
        self.assertEqual(string, 'Table 1.')

    def testText(self):
        matching = self.soup('table-wrap')
        self.assertEqual(matching[0].label.getText(), 'Table 1.')

    
    def testTagReplacement(self):
        # Make sure you can replace an element with itself.
        matching = self.soup.findAll(id='T1')
        label = matching[0].label
        matching[0].label.replaceWith(label)
        self.assertEqual(matching[0].label,  self.soup('label')[0])
        
        
    def testDecompose(self):
        matching = self.soup
        self.assertEqual(len(matching('table')[0].tbody), 13)
        regexSex = '\\b(sex(es)?|gender[s]?|male[s]?|female[s]?|(wo)?men[s]?|boy[s]?|girl[s]?)\\b'
        regexSex = re.compile(regexSex, re.I)
        tbody = matching.find("tbody")
        tbody.find_all(text=regexSex)[0].parent.parent.decompose()
        self.assertEqual(len(matching('table')[0].tbody), 12)
        
        matching2 = self.soup('table-wrap-foot')
        self.assertEqual(len(matching2), 2)
        matching2 = self.soup.find_all(attrs={'id' : 'fn-02'})
        matching2[0].parent.decompose()
        matching2 = self.soup('table-wrap-foot')
        self.assertEqual(len(matching2), 1)
        
if __name__ == '__main__':
    unittest.main(argv=['first-arg-is-ignored'], exit=False)

  self.parser.feed(markup)
............
----------------------------------------------------------------------
Ran 12 tests in 0.036s

OK


Delete the database tables
--------


**Warning: Do this only to tear down the system**

In [40]:
dynamodb_client = boto3.client('dynamodb', region_name='us-east-1')

try:
    dynamodb_client.delete_table(TableName='demographics')
    print('Deleted table')
except dynamodb_client.exceptions.ResourceNotFoundException:
    print("Table does not exist - cannot delete it")
except Exception as err:
    # Report anything unexpected, then re-raise so the failure stays visible
    print("Unexpected error:", err)
    raise

Deleted table


In [41]:
dynamodb_client = boto3.client('dynamodb', region_name='us-east-1')

try:
    dynamodb_client.delete_table(TableName='demographics_meta')
    print('Deleted table')
except dynamodb_client.exceptions.ResourceNotFoundException:
    print("Table does not exist - cannot delete it")
except Exception as err:
    # Report anything unexpected, then re-raise so the failure stays visible
    print("Unexpected error:", err)
    raise

Deleted table
