In [1]:
import boto3
import dataset
import json

## Step1: Setting Up RDS instance

In [2]:
rds = boto3.client('rds')
iam_client = boto3.client('iam')
role = iam_client.get_role(RoleName='LabRole')


try:
    response = rds.create_db_instance(
        DBInstanceIdentifier='relational-db',
        DBName='books',
        MasterUsername='username',
        MasterUserPassword='password',
        DBInstanceClass='db.t3.micro',
        Engine='MySQL',
        AllocatedStorage=5
    )
    print("RDS instance created successfully:", response)
    
    # Wait until DB is available to continue
    rds.get_waiter('db_instance_available').wait(DBInstanceIdentifier='relational-db')

    # Describe where DB is available and on what port
    db = rds.describe_db_instances()['DBInstances'][0]
    ENDPOINT = db['Endpoint']['Address']
    PORT = db['Endpoint']['Port']
    DBID = db['DBInstanceIdentifier']
    
    USERNAME = 'username'
    PASSWORD = 'password'
    
    print(DBID,
        "is available at", ENDPOINT,
        "on Port", PORT,
        )   
except Exception as e:
    print("Error creating RDS instance:", e)

RDS instance created successfully: {'DBInstance': {'DBInstanceIdentifier': 'relational-db', 'DBInstanceClass': 'db.t3.micro', 'Engine': 'mysql', 'DBInstanceStatus': 'creating', 'MasterUsername': 'username', 'DBName': 'books', 'AllocatedStorage': 5, 'PreferredBackupWindow': '06:09-06:39', 'BackupRetentionPeriod': 1, 'DBSecurityGroups': [], 'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-0ebcfd71f93ad1599', 'Status': 'active'}], 'DBParameterGroups': [{'DBParameterGroupName': 'default.mysql8.0', 'ParameterApplyStatus': 'in-sync'}], 'DBSubnetGroup': {'DBSubnetGroupName': 'default', 'DBSubnetGroupDescription': 'default', 'VpcId': 'vpc-03fe4a0e60b95c95e', 'SubnetGroupStatus': 'Complete', 'Subnets': [{'SubnetIdentifier': 'subnet-06c5f1ed345b03ce5', 'SubnetAvailabilityZone': {'Name': 'us-east-1d'}, 'SubnetOutpost': {}, 'SubnetStatus': 'Active'}, {'SubnetIdentifier': 'subnet-023920fe191952e92', 'SubnetAvailabilityZone': {'Name': 'us-east-1a'}, 'SubnetOutpost': {}, 'SubnetStatus': 'Active'}, {'

In [3]:
# Get Name of Security Group
SGNAME = db['VpcSecurityGroups'][0]['VpcSecurityGroupId']

# Adjust Permissions for that security group so that we can access it on Port 3306
# If already SG is already adjusted, print this out
try:
    ec2 = boto3.client('ec2')
    data = ec2.authorize_security_group_ingress(
            GroupId=SGNAME,
            IpPermissions=[
                {'IpProtocol': 'tcp',
                 'FromPort': PORT,
                 'ToPort': PORT,
                 'IpRanges': [{'CidrIp': '0.0.0.0/0'}]}
            ]
    )
    print("Permissions adjusted successfully.")
    
except ec2.exceptions.ClientError as e:
    if e.response["Error"]["Code"] == 'InvalidPermission.Duplicate':
        print("Permissions already adjusted.")
    else:
        print(e)

Permissions already adjusted.


In [1]:
'''
import mysql.connector

# Connect to the MySQL database
conn =  mysql.connector.connect(host=ENDPOINT, 
                                user="username", 
                                passwd="password", 
                                port=PORT, 
                                database="books")
cur = conn.cursor()

# Create a new table
create_table_query = """
    CREATE TABLE IF NOT EXISTS books_table (
        book_id VARCHAR(255) PRIMARY KEY,
        title VARCHAR(255),
        price VARCHAR(255),
        stock VARCHAR(255),
        rating VARCHAR(255),
        img TEXT,
        description TEXT,
        UPC VARCHAR(255),
        Product_Type VARCHAR(255),
        Price_excl_tax VARCHAR(255),
        Price_incl_tax VARCHAR(255),
        Tax VARCHAR(255),
        Availability VARCHAR(255),
        Number_of_reviews INT
    );
    """
cur.execute(create_table_query)
print("Table 'books_table' created successfully.")
'''

NameError: name 'ENDPOINT' is not defined

In [56]:
# Query the table

'''
USERNAME = 'username'
PASSWORD = 'password'

db_url = \
    'mysql+mysqlconnector://{}:{}@{}:{}/books'.format(
        USERNAME,
        PASSWORD,
        ENDPOINT,
        PORT)

db = dataset.connect(db_url)
'''





## Step2: Get list of books to scrape & Get Sublists Considering Number of Concurrency Workers

In [107]:
import requests
import dataset
import re
from datetime import datetime
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse

db_url = \
    'mysql+mysqlconnector://{}:{}@{}:{}/books'.format(
        USERNAME,
        PASSWORD,
        ENDPOINT,
        PORT)

db = dataset.connect(db_url)

base_url = 'http://books.toscrape.com/'

book_ids_urls = []

def scrape_books(html_soup, url):
    for book in html_soup.select('article.product_pod'):
        # For now, we'll only store the books url
        book_url = book.find('h3').find('a').get('href')
        book_url = urljoin(url, book_url)
        path = urlparse(book_url).path
        book_id = path.split('/')[2]
        book_ids_urls.append({"url": book_url, "book_id": book_id})
        # Upsert tries to update first and then insert instead
        db['books'].upsert({'book_id' : book_id,
                            'last_seen' : datetime.now()
                            }, ['book_id'])
        
        
# Scrape the pages in the catalogue
url = base_url
inp = input('Do you wish to re-scrape the catalogue (y/n)? ')
while True and inp == 'y':
    print('Now scraping page:', url)
    r = requests.get(url)
    html_soup = BeautifulSoup(r.text, 'html.parser')
    scrape_books(html_soup, url)
    # Is there a next page?
    next_a = html_soup.select('li.next > a')
    if not next_a or not next_a[0].get('href'):
        break
    url = urljoin(url, next_a[0].get('href'))

Now scraping page: http://books.toscrape.com/
Now scraping page: http://books.toscrape.com/catalogue/page-2.html
Now scraping page: http://books.toscrape.com/catalogue/page-3.html
Now scraping page: http://books.toscrape.com/catalogue/page-4.html
Now scraping page: http://books.toscrape.com/catalogue/page-5.html
Now scraping page: http://books.toscrape.com/catalogue/page-6.html
Now scraping page: http://books.toscrape.com/catalogue/page-7.html
Now scraping page: http://books.toscrape.com/catalogue/page-8.html
Now scraping page: http://books.toscrape.com/catalogue/page-9.html
Now scraping page: http://books.toscrape.com/catalogue/page-10.html
Now scraping page: http://books.toscrape.com/catalogue/page-11.html
Now scraping page: http://books.toscrape.com/catalogue/page-12.html
Now scraping page: http://books.toscrape.com/catalogue/page-13.html
Now scraping page: http://books.toscrape.com/catalogue/page-14.html
Now scraping page: http://books.toscrape.com/catalogue/page-15.html
Now scrapi

## Step3: Deploying Lambda Functions

In [10]:
import boto3

aws_lambda = boto3.client('lambda')


with open('q2-deployment-package.zip', 'rb') as f:
    lambda_zip = f.read()

# Define your environment variables here
env_variables = {
    'Variables': {
        'DB_ENDPOINT': ENDPOINT,
        'DB_USER': USERNAME,
        'DB_PASSWORD': PASSWORD,
        'DB_PORT': str(PORT), 
    }
}

try:
    response = aws_lambda.create_function(
        FunctionName='a2q2_lambda',
        Runtime='python3.9',
        Role=role['Role']['Arn'],
        Handler='lambda_function.lambda_handler',
        Code=dict(ZipFile=lambda_zip),
        Timeout=300,
        Environment=env_variables  # Add environment variables here
    )
    print("Lambda function created successfully:", response)
except aws_lambda.exceptions.ResourceConflictException:
    # Update the function code
    response = aws_lambda.update_function_code(
        FunctionName='a2q2_lambda',
        ZipFile=lambda_zip
    )
    print("Lambda function updated successfully:", response)

    # Optionally, update the environment variables if needed
    response = aws_lambda.update_function_configuration(
        FunctionName='a2q2_lambda',
        Environment=env_variables
    )
    print("Environment variables updated successfully:", response)

response = aws_lambda.put_function_concurrency(
    FunctionName='a2q2_lambda',
    ReservedConcurrentExecutions=10
)

print(response)


Lambda function created successfully: {'ResponseMetadata': {'RequestId': '2bc0a675-dcc6-45ec-8bbf-7ce502ba8188', 'HTTPStatusCode': 201, 'HTTPHeaders': {'date': 'Thu, 02 May 2024 02:05:21 GMT', 'content-type': 'application/json', 'content-length': '1475', 'connection': 'keep-alive', 'x-amzn-requestid': '2bc0a675-dcc6-45ec-8bbf-7ce502ba8188'}, 'RetryAttempts': 0}, 'FunctionName': 'a2q2_lambda', 'FunctionArn': 'arn:aws:lambda:us-east-1:102168828713:function:a2q2_lambda', 'Runtime': 'python3.9', 'Role': 'arn:aws:iam::102168828713:role/LabRole', 'Handler': 'lambda_function.lambda_handler', 'CodeSize': 45451548, 'Description': '', 'Timeout': 300, 'MemorySize': 128, 'LastModified': '2024-05-02T02:05:21.220+0000', 'CodeSha256': 'gAScAX+DgviNBhP/5H1Sa1oIVIWcwinkCzp8q2tqBL4=', 'Version': '$LATEST', 'Environment': {'Variables': {'DB_PORT': '3306', 'DB_ENDPOINT': 'relational-db.cfxyluhsb7bh.us-east-1.rds.amazonaws.com', 'DB_USER': 'username', 'DB_PASSWORD': 'password'}}, 'TracingConfig': {'Mode': 

# Step4: Set Up Step Functions

In [85]:
import boto3
import json

def make_def(lambda_arn):
    definition = {
      "Comment": "Q2 State Machine",
      "StartAt": "Map",
      "States": {
        "Map": {
          "Type": "Map",
          "End": True,
          "MaxConcurrency": 10,
          "Iterator": {
            "StartAt": "Lambda Invoke",
            "States": {
              "Lambda Invoke": {
                "Type": "Task",
                "Resource": "arn:aws:states:::lambda:invoke",
                "OutputPath": "$.Payload",
                "Parameters": {
                  "Payload.$": "$",
                  "FunctionName": lambda_arn
                },
                "Retry": [
                  {
                    "ErrorEquals": [
                      "Lambda.ServiceException",
                      "Lambda.AWSLambdaException",
                      "Lambda.SdkClientException",
                      "Lambda.TooManyRequestsException",
                      "States.TaskFailed",
                      "Lambda.Unknown"                      
                    ],
                    "IntervalSeconds": 2,
                    "MaxAttempts": 6,
                    "BackoffRate": 2
                  }
                ],
                "End": True
              }
            }
          }
        }
      }
    }
    return definition

if __name__ == '__main__':
    iam = boto3.client('iam')
    sfn = boto3.client('stepfunctions')
    aws_lambda = boto3.client('lambda')
    role = iam.get_role(RoleName='LabRole')

    lambda_function_name = "a2q2_lambda"

    # Get Lambda Function ARN and Role ARN
    # Assumes Lambda function already exists
    lambda_arn = [f['FunctionArn']
                  for f in aws_lambda.list_functions()['Functions']
                  if f['FunctionName'] == lambda_function_name][0]
    
    # Throttle concurrent executions to 10
    response = aws_lambda.put_function_concurrency(
            FunctionName=lambda_function_name,
            ReservedConcurrentExecutions=10
        )

    sfn_function_name = "a2q2_stepfunctions"

    # Use Lambda ARN to create State Machine Definition
    sf_def = make_def(lambda_arn)

    # Create Step Function State Machine if doesn't already exist
    try:
        response = sfn.create_state_machine(
            name=sfn_function_name,
            definition=json.dumps(sf_def),
            roleArn=role['Role']['Arn'],
            type='EXPRESS' 
        )
    except sfn.exceptions.StateMachineAlreadyExists:
        response = sfn.list_state_machines()
        state_machine_arn = [sm['stateMachineArn'] 
                            for sm in response['stateMachines'] 
                            if sm['name'] == sfn_function_name][0]
        response = sfn.update_state_machine(
            stateMachineArn=state_machine_arn,
            definition=json.dumps(sf_def),
            roleArn=role['Role']['Arn']
        )



In [103]:
db_url = \
    'mysql+mysqlconnector://{}:{}@{}:{}/books'.format(
        USERNAME,
        PASSWORD,
        ENDPOINT,
        PORT)

db = dataset.connect(db_url)

# Get arn for Step Function state machine
response = sfn.list_state_machines()
state_machine_arn = [sm['stateMachineArn'] 
                     for sm in response['stateMachines'] 
                     if sm['name'] == sfn_function_name][0]

# generate test data to pass as input
# "Map" will automatically invoke a separate Lambda function
# to process each dictionary in the list (10 concurrently)
records = list(db['books'].find())
data = [{'book_id': record['book_id']} for record in records]
json_input = json.dumps(data)

[{'book_id': 'a-light-in-the-attic_1000'}, {'book_id': 'tipping-the-velvet_999'}, {'book_id': 'soumission_998'}, {'book_id': 'sharp-objects_997'}, {'book_id': 'sapiens-a-brief-history-of-humankind_996'}, {'book_id': 'the-requiem-red_995'}, {'book_id': 'the-dirty-little-secrets-of-getting-your-dream-job_994'}, {'book_id': 'the-coming-woman-a-novel-based-on-the-life-of-the-infamous-feminist-victoria-woodhull_993'}, {'book_id': 'the-boys-in-the-boat-nine-americans-and-their-epic-quest-for-gold-at-the-1936-berlin-olympics_992'}, {'book_id': 'the-black-maria_991'}, {'book_id': 'starving-hearts-triangular-trade-trilogy-1_990'}, {'book_id': 'shakespeares-sonnets_989'}, {'book_id': 'set-me-free_988'}, {'book_id': 'scott-pilgrims-precious-little-life-scott-pilgrim-1_987'}, {'book_id': 'rip-it-up-and-start-again_986'}, {'book_id': 'our-band-could-be-your-life-scenes-from-the-american-indie-underground-1981-1991_985'}, {'book_id': 'olio_984'}, {'book_id': 'mesaerion-the-best-science-fiction-stori

In [104]:
response = sfn.start_sync_execution(
    stateMachineArn=state_machine_arn,
    input=json_input
)
print(response)

{'executionArn': 'arn:aws:states:us-east-1:102168828713:express:a2q2_stepfunctions:813b4a42-a889-4911-baae-115849807818:e9ed57d3-22f4-432f-9bc0-9ea615f8115c', 'stateMachineArn': 'arn:aws:states:us-east-1:102168828713:stateMachine:a2q2_stepfunctions', 'name': '813b4a42-a889-4911-baae-115849807818', 'startDate': datetime.datetime(2024, 5, 1, 23, 29, 46, 198000, tzinfo=tzlocal()), 'stopDate': datetime.datetime(2024, 5, 1, 23, 30, 32, 910000, tzinfo=tzlocal()), 'status': 'SUCCEEDED', 'input': '[{"book_id": "a-light-in-the-attic_1000"}, {"book_id": "tipping-the-velvet_999"}, {"book_id": "soumission_998"}, {"book_id": "sharp-objects_997"}, {"book_id": "sapiens-a-brief-history-of-humankind_996"}, {"book_id": "the-requiem-red_995"}, {"book_id": "the-dirty-little-secrets-of-getting-your-dream-job_994"}, {"book_id": "the-coming-woman-a-novel-based-on-the-life-of-the-infamous-feminist-victoria-woodhull_993"}, {"book_id": "the-boys-in-the-boat-nine-americans-and-their-epic-quest-for-gold-at-the-19

In [105]:
db_url = \
    'mysql+mysqlconnector://{}:{}@{}:{}/books'.format(
        USERNAME,
        PASSWORD,
        ENDPOINT,
        PORT)

db = dataset.connect(db_url)

tables = db.tables
print("Tables in the database:", tables)


Tables in the database: ['book_info', 'books']


In [106]:
table = db['books']

# Fetch the first 10 records
records = table.find()
for record in records:
    print(record)


OrderedDict([('id', 1), ('book_id', 'a-light-in-the-attic_1000'), ('last_seen', datetime.datetime(2024, 5, 2, 4, 29, 49))])
OrderedDict([('id', 2), ('book_id', 'tipping-the-velvet_999'), ('last_seen', datetime.datetime(2024, 5, 2, 4, 29, 49))])
OrderedDict([('id', 3), ('book_id', 'soumission_998'), ('last_seen', datetime.datetime(2024, 5, 2, 4, 29, 49))])
OrderedDict([('id', 4), ('book_id', 'sharp-objects_997'), ('last_seen', datetime.datetime(2024, 5, 2, 4, 29, 49))])
OrderedDict([('id', 5), ('book_id', 'sapiens-a-brief-history-of-humankind_996'), ('last_seen', datetime.datetime(2024, 5, 2, 4, 29, 49))])
OrderedDict([('id', 6), ('book_id', 'the-requiem-red_995'), ('last_seen', datetime.datetime(2024, 5, 2, 4, 29, 49))])
OrderedDict([('id', 7), ('book_id', 'the-dirty-little-secrets-of-getting-your-dream-job_994'), ('last_seen', datetime.datetime(2024, 5, 2, 4, 29, 49))])
OrderedDict([('id', 8), ('book_id', 'the-coming-woman-a-novel-based-on-the-life-of-the-infamous-feminist-victoria-wo

In [33]:
'''
# Synchronous Execution
response = sfn.start_sync_execution(
    stateMachineArn=state_machine_arn,
    name='sync_test',
    input=json.dumps(data)
)

print(response['output'])
'''

[null,null,null,null,null]


In [None]:
'''
# Async; perhaps writing results to db and don't need to wait for execution to finish before moving on with code
response = sfn.start_execution(
    stateMachineArn=state_machine_arn,
    name='async_test',
    input=json.dumps(data)
)
'''


print(response) # no results returned for async option
# Can go into logs in Cloud Watch and see execution results (Express SF workflow)
# Note that Standard Step Function workflow allows us to audit results via Boto3)