# Lab Week 6

## Today's Lab Agenda: 

1. cd to labs/week_6
2. Connect to AWS using the terminal
3. Set up DynamoDB, SQS, and Lambda
4. Build a Basic AWS Pipeline
5. Assignment 6 Overview and Hints

### Context: 
In this week’s lab, we continue working with our Twitter-related data. Instead of having each message directly trigger a Lambda function as we did last week, we introduce a new component: Amazon SQS (Simple Queue Service).


### What's New:
Last week, we sent individual tweets directly to a Lambda function, which wrote them into a DynamoDB table.

This week, we improve that design by using an SQS queue as a buffer. We send tweets to the queue first. Lambda then polls the queue and is invoked with batches of messages (up to 10 per invocation with the `BatchSize=10` setting used below) rather than once per tweet. Each invocation processes its batch and stores the relevant tweets in DynamoDB.
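
To make the batching idea concrete, here is a minimal sketch of a handler for SQS-triggered invocations. The contents of `lab_wk6.zip` are not shown in this lab, so the `#uchicago` relevance filter, the `uid` format, and the helper name `records_to_items` are assumptions for illustration, not the lab's actual code. What is standard is the event shape: Lambda delivers the SQS messages under `event['Records']`, with each message's text in the `body` field.

```python
import json


def records_to_items(event, keyword="#uchicago"):
    """Convert an SQS-triggered Lambda event into DynamoDB-ready items.

    The keyword filter and the uid format are assumptions for illustration.
    """
    items = []
    for record in event.get("Records", []):
        tweet = json.loads(record["body"])  # each SQS message body is a JSON string
        if keyword not in tweet["tweet"]:
            continue  # skip tweets this sketch treats as not relevant
        items.append({
            "uid": f"{tweet['username']}_{tweet['datetime']}",
            "username": tweet["username"],
            "datetime": tweet["datetime"],
            "tweet": tweet["tweet"],
        })
    return items


def lambda_handler(event, context):
    # Imported here so records_to_items can be tried locally without AWS access
    import boto3

    table = boto3.resource("dynamodb").Table("twitter")
    items = records_to_items(event)
    # batch_writer buffers the puts and sends them in batched requests
    with table.batch_writer() as batch:
        for item in items:
            batch.put_item(Item=item)
    return {"statusCode": 200, "stored": len(items)}


# Local check with a hand-built event shaped like an SQS batch
fake_event = {"Records": [
    {"body": json.dumps({"username": "john_doe", "datetime": "04252024120000",
                         "tweet": "this is a fun! #uchicago"})},
    {"body": json.dumps({"username": "jane_doe", "datetime": "05152023140100",
                         "tweet": "another day, another dollar"})},
]}
print(records_to_items(fake_event))
```

Keeping the parsing and filtering in a plain function separates the logic from the AWS calls, so it can be checked on a laptop before the zip file is uploaded.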


### Why SQS?
- 

In [1]:
import boto3
import time
import json

dynamodb = boto3.resource('dynamodb')
aws_lambda = boto3.client('lambda')
sqs = boto3.client('sqs')
iam_client = boto3.client('iam')
role = iam_client.get_role(RoleName='LabRole')

1. [Describe in words what is happening in the code block below]

In [2]:
table = dynamodb.create_table(
    TableName='twitter',
    KeySchema=[
        {
            'AttributeName': 'uid',
            'KeyType': 'HASH'
        }
    ],
    AttributeDefinitions=[
        {
            'AttributeName': 'uid',
            'AttributeType': 'S'
        }
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 1,
        'WriteCapacityUnits': 1
    }
)

table.meta.client.get_waiter('table_exists').wait(TableName='twitter')

print(table.item_count)
print(table.creation_date_time)

0
2024-04-25 17:27:29.201000-05:00


2. [Describe in words what is happening in the code block below (and what the lambda function that you're creating does)]

In [3]:
with open('lab_wk6.zip', 'rb') as f:
    lambda_zip = f.read()

try:
    response = aws_lambda.create_function(
        FunctionName='lab_wk6',
        Runtime='python3.9',
        Role=role['Role']['Arn'],
        Handler='lambda_function.lambda_handler',
        Code=dict(ZipFile=lambda_zip),
        Timeout=30
    )
except aws_lambda.exceptions.ResourceConflictException:
    response = aws_lambda.update_function_code(
        FunctionName='lab_wk6',
        ZipFile=lambda_zip
    )

3. [Describe in words what is happening in the code block below]

In [4]:
try:
    queue_url = sqs.create_queue(QueueName='lab_wk6')['QueueUrl']
except sqs.exceptions.QueueNameExists:
    # Look the queue up by its exact name instead of substring-matching every queue URL
    queue_url = sqs.get_queue_url(QueueName='lab_wk6')['QueueUrl']

sqs_info = sqs.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['QueueArn'])
sqs_arn = sqs_info['Attributes']['QueueArn']

4. [Describe in words what is happening in the code block below]

In [5]:
try:
    response = aws_lambda.create_event_source_mapping(
        EventSourceArn=sqs_arn,
        FunctionName='lab_wk6',
        Enabled=True,
        BatchSize=10
    )
except aws_lambda.exceptions.ResourceConflictException:
    es_id = aws_lambda.list_event_source_mappings(
        EventSourceArn=sqs_arn,
        FunctionName='lab_wk6'
    )['EventSourceMappings'][0]['UUID']
    response = aws_lambda.update_event_source_mapping(
        UUID=es_id,
        FunctionName='lab_wk6',
        Enabled=True,
        BatchSize=10
    )

5. [Describe in words what is happening in the code block below]

In [6]:
test_data = [
    {
        "username": "john_doe",
        "datetime": "04252024120000",
        "tweet": "this is a fun! #uchicago"
    },
    {
        "username": "jane_doe",
        "datetime": "05152023140100",
        "tweet": "another day, another dollar"
    },
    {
        "username": "jane_doe",
        "datetime": "05152023140200",
        "tweet": "went to the museum today #uchicago"
    }
]

for t in test_data:
    response = sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps(t))
    print(response)
    time.sleep(1)

{'MD5OfMessageBody': 'd3c3a03fccaabb647b29e9e6a42c434d', 'MessageId': 'f19b11d7-0bb0-41b5-8138-daff94495fb8', 'ResponseMetadata': {'RequestId': 'df22292d-26aa-5b1a-a401-7789fd851222', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'df22292d-26aa-5b1a-a401-7789fd851222', 'date': 'Thu, 25 Apr 2024 22:27:59 GMT', 'content-type': 'text/xml', 'content-length': '378', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}
{'MD5OfMessageBody': 'c881ef5c6dbf10f77d0c25ef970b0c4d', 'MessageId': 'aa3b0dab-56ec-4702-9ef7-81f88636a815', 'ResponseMetadata': {'RequestId': 'fb49ef21-2e3a-501a-bca0-b185f50ae922', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'fb49ef21-2e3a-501a-bca0-b185f50ae922', 'date': 'Thu, 25 Apr 2024 22:28:00 GMT', 'content-type': 'text/xml', 'content-length': '378', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}
{'MD5OfMessageBody': '7615a1360a21ac6618a9898736a4efc1', 'MessageId': 'dc8300aa-5c12-424c-8db9-51663a7fe65d', 'ResponseMetadata': {'RequestId': '2
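
As a side note, the loop in `In [6]` makes one API request per tweet. SQS also accepts up to 10 messages in a single `send_message_batch` request. The sketch below shows one way to use it. The helper names `build_batches` and `send_tweets_in_batches` are made up for this example, and it assumes each batch stays under the SQS payload size limit.

```python
import json


def build_batches(tweets, batch_size=10):
    """Group tweets into lists of send_message_batch entries (max 10 per request)."""
    batches = []
    for start in range(0, len(tweets), batch_size):
        chunk = tweets[start:start + batch_size]
        batches.append([
            # Id only needs to be unique within one batch request
            {"Id": str(i), "MessageBody": json.dumps(t)}
            for i, t in enumerate(chunk)
        ])
    return batches


def send_tweets_in_batches(sqs_client, queue_url, tweets):
    """Send one request per batch; return the entries SQS reported as failed."""
    failed = []
    for entries in build_batches(tweets):
        response = sqs_client.send_message_batch(QueueUrl=queue_url, Entries=entries)
        failed.extend(response.get("Failed", []))
    return failed
```

With the objects from the earlier cells, a call would look like `failed = send_tweets_in_batches(sqs, queue_url, test_data)`. An empty `failed` list means SQS accepted every message.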

6. Describe in words (or draw a picture of!) what happens in your AWS Architecture when you run the code above.
7. [Bonus] Let's imagine that your Lambda workers are not able to process the data in your SQS queue fast enough (you are no longer using the test data above; you have an EC2 instance that is connected to a streaming Twitter API and streaming a large number of messages into your queue). How might you make the above architecture more scalable? Implement your solution using `boto3`.
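
Before tearing anything down, it can help to confirm that tweets actually reached the table. `table.item_count` is only refreshed periodically (roughly every six hours), so it is not a reliable check right after sending messages. The sketch below polls with a `COUNT` scan instead. The function name `wait_for_item_count` and its defaults are assumptions for illustration, and a single scan call is assumed to cover the whole table, which holds for a small lab table but not for a large one.

```python
import time


def wait_for_item_count(table, expected, timeout=30, interval=2):
    """Poll the table until it holds at least `expected` items or time runs out.

    Returns the last count seen, so the caller can tell whether the wait succeeded.
    """
    deadline = time.monotonic() + timeout
    while True:
        # Select='COUNT' returns only the number of items, not the items themselves
        count = table.scan(Select="COUNT")["Count"]
        if count >= expected or time.monotonic() >= deadline:
            return count
        time.sleep(interval)
```

A call such as `wait_for_item_count(dynamodb.Table('twitter'), expected=1)` would work with the `dynamodb` resource from `In [1]`. The right value for `expected` depends on how many of the test tweets your Lambda function stores.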

After you're done, be sure to run a teardown script like the following:

In [7]:
# Delete each pipeline component if it still exists:
# Lambda
try:
    aws_lambda.delete_function(FunctionName='lab_wk6')
    print("Lambda Function Deleted")
except aws_lambda.exceptions.ResourceNotFoundException:
    print("AWS Lambda Function Already Deleted")

# SQS
try:
    sqs.delete_queue(QueueUrl=queue_url)
    print("SQS Queue Deleted")
except sqs.exceptions.QueueDoesNotExist:
    print("SQS Queue Already Deleted")

# DynamoDB (separate name so the `dynamodb` resource from In [1] is not overwritten)
dynamodb_client = boto3.client('dynamodb')
try:
    response = dynamodb_client.delete_table(TableName='twitter')
    print("DynamoDB Table Deleted")
except dynamodb_client.exceptions.ResourceNotFoundException:
    print("DynamoDB Table Already Deleted")

Lambda Function Deleted
SQS Queue Deleted
DynamoDB Table Deleted
