### Working with DynamoDB

Create a file named `words.csv` and list at least two words in it, separated by commas. For example: `mathematics,statistics,computer science`.

Since we do not have a user interface like the Google Ngram Viewer, we will pretend that the user has passed us a list of words in `words.csv`.


In [150]:
# Read in the file containing the words to be processed.
words = set()

with open('words.csv', 'r') as f:
    for line in f:
        for w in line.split(','):
            w = w.strip()  # drop surrounding whitespace and the trailing newline
            if w:          # skip empty fields, e.g. from a blank line
                words.add(w)

print(words)

set(['Google', 'Microsoft', 'Apple'])


The AWS CLI and the Python package boto3 should already be installed and configured. For example, the code below, which lists your S3 buckets, should run without error.

In [152]:
import boto3

s3 = boto3.resource('s3')
for bucket in s3.buckets.all():
    print(bucket.name)


seonghwan-jun-ngrams
ubcbucket123


We will start by creating a DynamoDB table. You can do this programmatically using boto3. Ensure that your user has access to DynamoDB services (you learned how to do this through the AWS IAM console as part of lab 3).

Refer to the boto3 documentation on DynamoDB [here](http://boto3.readthedocs.io/en/latest/reference/services/dynamodb.html) as you work through this tutorial. You may also find Amazon's [guide](http://docs.aws.amazon.com/amazondynamodb/latest/gettingstartedguide/GettingStarted.Python.03.html#GettingStarted.Python.03.03) on accessing DynamoDB services using Python helpful.

Set the table name to **unigrams**, with **word** as the partition (hash) key holding String values and **year** as the sort (range) key holding Number values. Together, the two attributes form the table's primary key.

In [153]:
def create_table(dynamodb, table_name):
    table = dynamodb.create_table(
        TableName=table_name, 
        AttributeDefinitions=[
            {
                'AttributeName': 'word',
                'AttributeType': 'S'
            },
            {
                'AttributeName': 'year',
                'AttributeType': 'N'
                
            },
        ],
        KeySchema=[
            {
                'AttributeName': 'word',
                'KeyType': 'HASH'
            },
            {
                'AttributeName': 'year',
                'KeyType': 'RANGE'
            },
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        })
    return table
        

In [154]:
# create the table if not created already
dynamodb = boto3.client('dynamodb')
response = dynamodb.list_tables()
table_name = 'unigrams'
if table_name not in response["TableNames"]:
    print("create table: " + table_name)
    create_table(dynamodb, table_name)
else:
    print("Table " + table_name + " already exists.")

Table unigrams already exists.
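
Note that `create_table` returns before the table is ready to use, because DynamoDB creates tables asynchronously. Below is a minimal sketch of how you might wait for a newly created table before writing to it. It assumes the low-level client from the cell above; the helper name `wait_for_table` and the default polling values are illustrative choices, not part of this tutorial.

```python
def wait_for_table(client, table_name, delay=5, max_attempts=24):
    """Block until `table_name` exists and is ready for use.

    `client` is expected to be a boto3 DynamoDB client, e.g.
    boto3.client('dynamodb'). The helper name and the default polling
    values (assumptions of this sketch) can be changed freely.
    """
    # 'table_exists' is a built-in boto3 waiter that polls DescribeTable
    waiter = client.get_waiter('table_exists')
    waiter.wait(
        TableName=table_name,
        WaiterConfig={'Delay': delay, 'MaxAttempts': max_attempts},
    )
    return table_name
```

You could call `wait_for_table(dynamodb, table_name)` right after `create_table(...)` in the cell above.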


We will add some data to our table first. Insert the following entry: {'word': 'Google', 'year': 2005, 'count': 19590}.


In [155]:
# region_name must match the region in which the table was created
dynamodb = boto3.resource('dynamodb', region_name='us-west-1')
table = dynamodb.Table(table_name)
response = table.put_item(Item={'word': 'Google', 'year': 2005, 'count': 19590})

Let's query this table and compare it to the `words` variable. The goal here is to find the words that are not present in the table.

In [156]:
from boto3.dynamodb.conditions import Key, Attr

#print(table.scan())

words_to_process = list()
for word in words:
    response = table.query(KeyConditionExpression=Key('word').eq(word))
    if len(response['Items']) == 0:
        words_to_process.append(word)
    else:
        print("The word '" + word + "' already exists in the table.")

# these words need to be processed 
print(words_to_process)

The word 'Google' already exists in the table.
['Microsoft', 'Apple']


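Your turn: delete the entry whose **word** is Google from the unigrams table. Note that the key passed to `delete_item` must include both **word** and **year**, because the table has a sort key.

In [None]:
# Your code to delete the entry with word = 'Google' goes here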
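
If you get stuck, here is one possible approach, offered as a sketch rather than the expected answer. It queries for every item that shares the given word and deletes each one by its full key. The helper name `delete_word` is hypothetical, and `table` is assumed to be the boto3 `Table` resource created earlier. The query uses a plain string expression with placeholders instead of `Key('word').eq(...)` only to keep the sketch free of imports.

```python
def delete_word(table, word):
    """Delete every (word, year) item for `word`; return how many were removed.

    `table` is assumed to be a boto3 DynamoDB Table resource whose key
    schema is word (partition key) and year (sort key).
    """
    deleted = 0
    query_kwargs = {
        'KeyConditionExpression': '#w = :w',
        'ExpressionAttributeNames': {'#w': 'word'},
        'ExpressionAttributeValues': {':w': word},
    }
    # batch_writer groups the deletes into batched requests for us
    with table.batch_writer() as batch:
        while True:
            page = table.query(**query_kwargs)
            for item in page['Items']:
                # both parts of the primary key are required to delete an item
                batch.delete_item(Key={'word': item['word'], 'year': item['year']})
                deleted += 1
            # a query result may be split over several pages
            last_key = page.get('LastEvaluatedKey')
            if not last_key:
                break
            query_kwargs['ExclusiveStartKey'] = last_key
    return deleted
```

For example, `delete_word(table, 'Google')` would remove the item inserted above along with any other years stored for that word.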




### Launching EMR cluster

Now we will learn how to launch an EMR cluster programmatically. We will refer to boto3's [documentation](http://boto3.readthedocs.io/en/latest/reference/services/emr.html#client) on EMR.

In [None]:
# List the existing clusters. There should be at least one (if you have completed Exercise 2).
client = boto3.client('emr')
clusters = client.list_clusters()['Clusters']

# view the details of the existing cluster, this will be helpful to launch EMR cluster programmatically
client.describe_cluster(ClusterId=clusters[0]["Id"])

In [193]:
# view the existing steps to help construct the steps object
step = client.list_steps(ClusterId=clusters[0]["Id"])
#step['Steps'][0]
config = step['Steps'][0]['Config']

# view the contents of config
config

{u'Args': [u'hadoop-streaming',
  u'-files',
  u's3://seonghwan-jun-ngrams/scripts/mapper.py,s3://seonghwan-jun-ngrams/scripts/reduce.py',
  u'-mapper',
  u'mapper.py',
  u'-reducer',
  u'reduce.py',
  u'-input',
  u's3://seonghwan-jun-ngrams/unigrams/',
  u'-output',
  u's3://seonghwan-jun-ngrams/output-ngrams2/',
  u'-numReduceTasks',
  u'1'],
 u'Jar': u'command-runner.jar',
 u'Properties': {}}

In [195]:
# we need the Jar and the Args
jar = config['Jar']
args = config['Args'][:]


u'command-runner.jar'

We will launch the cluster to run the job from exercise 2 (counting the word **google**). You do not have to modify the mapper or reducer for now, but before you launch the cluster, make sure that the output directory specified in `Args` has been deleted if it is left over from a previous run. Recall that Hadoop creates the output directory itself and fails if it already exists. You can easily write code to do this:

In [None]:
# your code to ensure that the output directory is deleted goes here (if needed)
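
As a starting point, the sketch below reads the `-output` location from `args` and deletes every object stored under it. The helper names `split_s3_uri` and `delete_output_dir` are hypothetical, and `s3` is assumed to be the `boto3.resource('s3')` object created earlier. The guard against an empty prefix is a design choice of this sketch, so that a mistyped URI cannot empty the whole bucket.

```python
def split_s3_uri(uri):
    """Split 's3://bucket/some/prefix/' into ('bucket', 'some/prefix/')."""
    if not uri.startswith('s3://'):
        raise ValueError('not an S3 URI: ' + uri)
    bucket_name, _, prefix = uri[len('s3://'):].partition('/')
    return bucket_name, prefix


def delete_output_dir(s3, args):
    """Delete all objects under the -output location named in `args`.

    `s3` is assumed to be boto3.resource('s3'); `args` is the list of
    step arguments, which contains '-output' followed by an S3 URI.
    """
    output_uri = args[args.index('-output') + 1]
    bucket_name, prefix = split_s3_uri(output_uri)
    if not prefix:
        raise ValueError('refusing to delete an entire bucket: ' + output_uri)
    if not prefix.endswith('/'):
        prefix += '/'  # avoid matching sibling prefixes such as output-ngrams20
    # delete() on a filtered collection removes every matching object
    s3.Bucket(bucket_name).objects.filter(Prefix=prefix).delete()
    return bucket_name, prefix
```

Calling `delete_output_dir(s3, args)` before `run_job_flow` leaves the output location empty; if nothing is stored there yet, no objects match and nothing is deleted.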


In [None]:
cluster_name = 'Ngrams-job'
# specify the key pair to be used for accessing the EC2 instances
keypair = 'your-key-pair'
# specify the path to your log -- ensure that you replace firstname-lastname with your own
log_uri = 's3://firstname-lastname-ngrams/logs' 
response = client.run_job_flow(
    Name=cluster_name,
    LogUri=log_uri,
    ReleaseLabel='emr-5.0.0',
    Instances={
        'MasterInstanceType': 'm3.xlarge',
        'SlaveInstanceType': 'm3.xlarge',
        'InstanceCount': 3,
        'Ec2KeyName': keypair,
        'KeepJobFlowAliveWhenNoSteps': True,
    },
    Steps=[
        {
            'Name': 'Ngrams-step',
            'ActionOnFailure': 'CONTINUE',
            #'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {'Args': args, 'Jar': jar},
        }
    ],
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole')

print(response)

# open the EMR console, check that your cluster is running

Now you know how to work with DynamoDB to query for, put, and delete items programmatically. You also know how to launch an EMR cluster programmatically (terminating it is covered in the next section). The materials covered in this tutorial form important steps in a data processing pipeline, allowing you to access and control Amazon's cloud computing services from your local machine.


### Completing the data processing

The data processing is not quite complete yet. We still need to terminate the cluster once it is done, download the output, merge the output files to get the counts for each word by year, delete the S3 output directory, and finally insert the data into the database. **Your turn**:

1. Wait for the job to finish, then terminate the cluster programmatically. Alternatively, modify the EMR launch code to terminate the cluster upon completion of the steps.
2. Download the output files from the S3 bucket that you specified when running the job. Refer to the boto3 [documentation on S3](http://boto3.readthedocs.io/en/latest/reference/services/s3.html).
3. Delete the S3 output directory.
4. Merge the output files (if needed) to obtain the (word, year, count) for each word. Insert the items into the DynamoDB table.


In [None]:
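# wait for the cluster launched above to come up

client = boto3.client('emr')

# `response` is the value returned by run_job_flow above and holds the ID of
# the new cluster; clusters[0] from the earlier listing may be an older cluster
cluster = {'Id': response['JobFlowId']}

# block until the cluster is running
waiter = client.get_waiter('cluster_running')
waiter.wait(ClusterId=cluster['Id'])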

The previous step blocks until the cluster is up and running, so the code that follows is not sent to a cluster that is still starting. Sending a job to a cluster that is not ready can cause errors. Now that we have a working cluster, we can do our analysis.

In [None]:
# Execute your code here, now that we know the cluster is running

# To check whether a step has completed, list the steps in the COMPLETED state:
client.list_steps(ClusterId=cluster['Id'], StepStates=['COMPLETED'])
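
Instead of checking the step list by hand, you can let boto3 block until the steps have finished. The following is a minimal sketch using the built-in `step_complete` waiter of the EMR client. The helper name `wait_for_steps` and the default polling values are assumptions made for illustration, and for brevity the sketch only looks at the first page of results returned by `list_steps`.

```python
def wait_for_steps(client, cluster_id, delay=30, max_attempts=120):
    """Block until every step on the cluster has completed; return the step IDs.

    `client` is assumed to be boto3.client('emr'). The waiter raises an
    exception if a step fails, is cancelled, or the attempts run out.
    """
    steps = client.list_steps(ClusterId=cluster_id)['Steps']
    step_ids = [step['Id'] for step in steps]
    waiter = client.get_waiter('step_complete')
    for step_id in step_ids:
        waiter.wait(
            ClusterId=cluster_id,
            StepId=step_id,
            WaiterConfig={'Delay': delay, 'MaxAttempts': max_attempts},
        )
    return step_ids
```

For example, `wait_for_steps(client, cluster['Id'])` returns once the steps are done, at which point it is safe to terminate the cluster.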

When the analysis on the cluster is finished, you can terminate it using the code in the following cell.

In [None]:
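# terminate the cluster: look only at clusters that are still active and
# terminate the first one whose name matches
active_states = ['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING']
clusters = client.list_clusters(ClusterStates=active_states)['Clusters']
for cluster in clusters:
    if cluster_name in cluster['Name']:
        response = client.terminate_job_flows(JobFlowIds=[cluster['Id']])
        break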

In [None]:
# code to download the output goes here


In [158]:
# code to delete the output directory in your S3 bucket goes here


In [159]:
# code to merge the output files to compute the counts for each word by year goes here


In [None]:
# code to put the items to the DB goes here
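
For the final step, one option is the table's `batch_writer`, which groups many `put_item` calls into batched requests. The sketch below assumes that your merge step produced an iterable of (word, year, count) tuples with at most one tuple per (word, year) pair. The helper name `put_counts` is hypothetical, and `table` is assumed to be the boto3 `Table` resource for **unigrams**.

```python
def put_counts(table, rows):
    """Insert (word, year, count) rows into the table; return how many were written.

    `table` is assumed to be a boto3 DynamoDB Table resource and `rows`
    an iterable of (word, year, count) tuples with unique (word, year).
    """
    written = 0
    # the batch writer buffers items and sends them in batches
    with table.batch_writer() as batch:
        for word, year, count in rows:
            batch.put_item(Item={
                'word': word,
                'year': int(year),    # year was declared as a Number attribute
                'count': int(count),
            })
            written += 1
    return written
```

For example, `put_counts(table, [('Google', 2005, 19590)])` writes the same item that was inserted by hand earlier in this tutorial.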
