In [1]:
!pip install boto3

Defaulting to user installation because normal site-packages is not writeable


In [2]:
import os
import boto3
import json
import requests

# Utilitary build functions

In [3]:
def build_ecr_password_stdin(account_id_, region_):
    return f"{account_id_}.dkr.ecr.{region_}.amazonaws.com"

def build_tagged_image(image_name, tag):
    return f"{ecr_image_name}:{tag}"

def build_ecr_url(account_id_, region_, image_name, tag):    
    tagged_image_name=build_tagged_image(image_name, tag)
    password_stdin=build_ecr_password_stdin(account_id_, region_)
    
    return f"{password_stdin}/{tagged_image_name}"

def build_lambda_uri(region_, lambda_arn_):
    uri_host=f"arn:aws:apigateway:{region_}:lambda:path"
    uri_route=f"2015-03-31/functions/{lambda_arn_}/invocations"
    return f"{uri_host}/{uri_route}"

def build_source_arn(region_, account_id_, rest_api_id_):
    return f'arn:aws:execute-api:{region_}:{account_id_}:{rest_api_id_}/*'

def build_api_url(rest_api_id, region_, endpoint_, stage_):
    host=f"https://{rest_api_id}.execute-api.{region}.amazonaws.com"
    route=f"{stage}/{endpoint}/"
    return f"{host}/{route}"


# Information for communication protocols

In [4]:
# Account
account_id = '060004687794'

# Server
region = "sa-east-1"

# Platform
ecr_image_name = "serverless-example"
tag='latest'
target_platform="linux/amd64"

# API
endpoint = "predict"
method_verb='POST'
stage = "test"

# Ellaborate information
tagged_image_uri=f"{ecr_image_name}:latest"
password_stdin=f"{account_id}.dkr.ecr.{region}.amazonaws.com"

# Specify the role name and trust policy for the Lambda service
role_name = 'lambda-exec-role'

trust_policy = {
    'Version': '2012-10-17',
    'Statement': [
        {
            'Effect': 'Allow',
            'Principal': {'Service': 'lambda.amazonaws.com'},
            'Action': 'sts:AssumeRole'
        }
    ]
}

policy_arn = 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'

# Rate limits: Harsh since this will be public facing
# Quota: Low daily limits for the same reason
usage_constraints = {
    'rate_limits': {
        'burstLimit': 10,
        'rateLimit': 10.0
    },
    'quota': {
        'limit': 100,
        'period': 'DAY'
    }
}

# Function name (not public facing)
function_name = f'lambda-fn-{ecr_image_name}'

# Clients

In [5]:
# Set up the IAM client
iam_client = boto3.client('iam', region_name=region)

# Set up the Lambda client
lambda_client = boto3.client('lambda', region_name=region)

# Set up the API Gateway client
gateway_client = boto3.client('apigateway', region_name=region)

# Development steps

- IAM role image handling;
- Docker image
- Lambda function creation;
- API Gateway

## IAM Role

In [6]:
def try_get_role(i_client, role_name_, trust_policy):
    try:
        return i_client.get_role(
            RoleName=role_name_
        )
    except i_client.exceptions.NoSuchEntityException:
        response = client.create_role(
            RoleName=role_name_,
            AssumeRolePolicyDocument=json.dumps(trust_policy),
            Description='Execution role for Lambda function',
        )
        
def try_attach_role_policy(i_client, role_name_, policy_arn, trust_policy):
    # Just need to run it once, otherwise retrieve already existing role
    response=try_get_role(i_client, role_name_, trust_policy)

    # Get the role ARN
    role_arn = response['Role']['Arn']

    # Attach the AWSLambdaBasicExecutionRole policy to the role
    i_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
    
    return role_arn

# The id "role_arn" will be used on lambda deployment
role_arn = try_attach_role_policy(iam_client, role_name, policy_arn, trust_policy)

## Docker image

In [7]:
def run(command):
    os.system(command)

def login_ecr_docker(account_id, region):
    password_stdin=build_ecr_password_stdin(account_id, region)
    
    opts=f"get-login-password --region {region}"
    get_pwd_command=f"aws ecr {opts}"
    
    opts=f"--username AWS --password-stdin {password_stdin}"
    login_command=f"docker login {opts}"
    
    entry_command=f"{get_pwd_command} | {login_command}"
    
    run(entry_command)
    
def create_ecr_image(ecr_image_name_):
    args_1=f"--repository-name {ecr_image_name_}"
    args_2=f"--image-scanning-configuration scanOnPush=true"
    args_3=f"--image-tag-mutability MUTABLE"
    create_args=f"{args_1} {args_2} {args_3}"
    
    opts=f"create-repository {create_args}"
    create_comand=f"aws ecr {opts}"

    run(create_comand)

def build_docker_image(ecr_image_name):
    build_args=f"-t {ecr_image_name} ."
    build_command=f"docker build {build_args}"
    
    run(build_command)
    
def tag_docker_image(tagged_image_uri_, routed_url):
    tag_args=f"{tagged_image_uri_} {routed_url}"
    tag_command=f"docker tag {tag_args}"
    
    run(tag_command)
    
def push_docker_image(tagged_image_uri):
    push_command=f"docker push {tagged_image_uri}"

    run(push_command)
    
def pipe_push_image(account_id_, region_, ecr_image_name_, tag_):
    password_stdin=build_ecr_password_stdin(account_id_, region_)
    
    # 1. Log in to AWS ECR
    login_ecr_docker(account_id_, region_)
    
    # 2. Create ECR repo: only needs to be done once
    create_ecr_image(ecr_image_name_)

    # 3. Build Docker image using your local Dockerfile
    build_docker_image(ecr_image_name_)

    # 4. Tag you image
    tagged_image_uri=build_tagged_image(ecr_image_name_, tag_)
    routed_url=build_ecr_url(account_id_, region_, ecr_image_name_, tag_)
    tag_docker_image(tagged_image_uri, routed_url)

    # 5. Push your image to ECR
    push_docker_image(routed_url)

In [8]:
pipe_push_image(account_id, region, ecr_image_name, tag)

https://docs.docker.com/engine/reference/commandline/login/#credentials-store



Login Succeeded
{
    "repository": {
        "repositoryArn": "arn:aws:ecr:sa-east-1:060004687794:repository/serverless-example",
        "registryId": "060004687794",
        "repositoryName": "serverless-example",
        "repositoryUri": "060004687794.dkr.ecr.sa-east-1.amazonaws.com/serverless-example",
        "createdAt": "2023-06-15T18:00:06-03:00",
        "imageTagMutability": "MUTABLE",
        "imageScanningConfiguration": {
            "scanOnPush": true
        },
        "encryptionConfiguration": {
            "encryptionType": "AES256"
        }
    }
}


#1 [internal] load build definition from Dockerfile
#1 transferring dockerfile: 32B done
#1 DONE 0.0s

#2 [internal] load .dockerignore
#2 transferring context: 2B done
#2 DONE 0.0s

#3 [internal] load metadata for public.ecr.aws/lambda/python:3.9.2023.03.15.15-x86_64
#3 DONE 0.7s

#4 [1/4] FROM public.ecr.aws/lambda/python:3.9.2023.03.15.15-x86_64@sha256:d7dd8369b33bb0825b32269af61cab13e4c5db6200ad6350b6aee0c4682b5562
#4 DONE 0.0s

#5 [internal] load build context
#5 transferring context: 71B done
#5 DONE 0.0s

#6 [2/4] COPY requirements.txt  .
#6 CACHED

#7 [3/4] RUN  pip3 install -r requirements.txt --target "/var/task"
#7 CACHED

#8 [4/4] COPY predict_fun.py /var/task
#8 CACHED

#9 exporting to image
#9 exporting layers done
#9 writing image sha256:039dd11ec199acb5e1012a4769cfa713b33d797994fa2d018126e9da52926734 done
#9 naming to docker.io/library/serverless-example done
#9 DONE 0.0s


The push refers to repository [060004687794.dkr.ecr.sa-east-1.amazonaws.com/serverless-example]
94db58c2b42e: Preparing
328bd2e62eea: Preparing
8cd0f519a56d: Preparing
2d32c89b4065: Preparing
e5a063df86fb: Preparing
2d244e0816c6: Preparing
ef5cc9241c56: Preparing
65c2fe7d7702: Preparing
0d91c86bcea1: Preparing
2d244e0816c6: Waiting
ef5cc9241c56: Waiting
65c2fe7d7702: Waiting
0d91c86bcea1: Waiting
8cd0f519a56d: Pushed
94db58c2b42e: Pushed
2d32c89b4065: Pushed
ef5cc9241c56: Pushed
2d244e0816c6: Pushed
65c2fe7d7702: Pushed
e5a063df86fb: Pushed
328bd2e62eea: Pushed
0d91c86bcea1: Pushed
latest: digest: sha256:d7aadbefa1d1615b0ac711a2f0d0d201db4f2e2631446de520f29d1afe003959 size: 2207


# Lambda function 

In [9]:
def try_get_function(client, function_name, tagged_image_uri, role_arn):
    success_msg=f"Lambda function {function_name} already exists"
    failure_message=f"Lambda function {function_name} created!"
    
    code_payload={'ImageUri': tagged_image_uri}
    func_description='SKLearn predict Lambda function'
    
    try:
        return client.get_function(FunctionName=function_name)
        
    except lambda_client.exceptions.ResourceNotFoundException:
        return client.create_function(
            FunctionName=function_name,
            Role=role_arn,
            PackageType='Image',
            Code=code_payload,
            Description=func_description,
            Timeout=10,
            MemorySize=256,
            Publish=True,
        )
        print(failure_message)

In [10]:
# Retrieve (if already exists) or create a new Lambda function
routed_url=build_ecr_url(account_id, region, ecr_image_name, tag)
response = try_get_function(lambda_client, function_name, routed_url, role_arn)

response

{'ResponseMetadata': {'RequestId': 'e424027c-c6fd-41ef-bda0-89e4808a490e',
  'HTTPStatusCode': 201,
  'HTTPHeaders': {'date': 'Thu, 15 Jun 2023 21:00:58 GMT',
   'content-type': 'application/json',
   'content-length': '1110',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'e424027c-c6fd-41ef-bda0-89e4808a490e'},
  'RetryAttempts': 0},
 'FunctionName': 'lambda-fn-serverless-example',
 'FunctionArn': 'arn:aws:lambda:sa-east-1:060004687794:function:lambda-fn-serverless-example',
 'Role': 'arn:aws:iam::060004687794:role/lambda-exec-role',
 'CodeSize': 0,
 'Description': 'SKLearn predict Lambda function',
 'Timeout': 10,
 'MemorySize': 256,
 'LastModified': '2023-06-15T21:00:57.601+0000',
 'CodeSha256': 'd7aadbefa1d1615b0ac711a2f0d0d201db4f2e2631446de520f29d1afe003959',
 'Version': '2',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': 'a066e91a-c1ce-42c0-82c9-19f652e32ba3',
 'State': 'Pending',
 'StateReason': 'The function is being created.',
 'StateReasonCode': 'Creating'

In [12]:
# Prepare the event to pass to the Lambda function
example=[1, 2, 3, 4, 5]

# Transform into json format
payload=json.dumps({"body": example})

# Invoke the Lambda function
response = lambda_client.invoke(
    FunctionName=function_name,
    InvocationType='RequestResponse',
    Payload=payload
)

# Get the response from the Lambda function
result = json.loads(response['Payload'].read())

print(result)

{'isBase64Encoded': False, 'statusCode': 200, 'headers': {'Content-Type': 'application/json'}, 'body': '"42"'}


## API Gateway setup

In [13]:
def has_api(g_client, rest_api_name_):
    response = g_client.get_rest_apis()
    create_api_on_gateway = True
    
    for item in response['items']:
        if item['name'] == rest_api_name_:
            create_api_on_gateway = False
            
    return create_api_on_gateway

def create_resource(g_client, rest_api_id_):
    response = g_client.get_resources(restApiId=rest_api_id_)
    root_id = response['items'][0]['id']
    
    response = gateway_client.create_resource(
        restApiId=rest_api_id_,
        parentId=root_id,
        pathPart=endpoint,
    )
    
    resource_id = response['id']
    
    return resource_id

def create_rest_method(g_client, rest_api_id_, resource_id_, method_verb_):
    g_client.put_method(
        restApiId=rest_api_id_,
        resourceId=resource_id_,
        httpMethod=method_verb_,
        authorizationType='NONE', # WARNING: this will allow public access!
        apiKeyRequired=True,
    )

def create_rest_api(g_client, rest_api_name_):
    description='API Gateway that triggers a lambda function'
    response=g_client.create_rest_api(name=rest_api_name_, description=description) 
    
    rest_api_id = response['id']
    
    return rest_api_id

def get_lambda_arn(g_client, function_name_):
    response = g_client.get_function(FunctionName=function_name_)
    return response['Configuration']['FunctionArn']

def setup_integration(g_client, lambda_uri_, rest_api_id_, resource_id_, method_verb_):
    g_client.put_integration(
        restApiId=rest_api_id_,
        resourceId=resource_id_,
        httpMethod=method_verb_,
        type='AWS_PROXY',
        integrationHttpMethod=method_verb_,
        uri=lambda_uri_,
    )
    
def create_deployment(g_client, rest_api_id_, stage_):
    g_client.create_deployment(restApiId=rest_api_id_, stageName=stage_)
    
def create_api_key(g_client, rest_api_name_):
    response = g_client.create_api_key(
        name=rest_api_name_ + '-key',
        description='API key',
        enabled=True,
        generateDistinctId=True
    )
    
    api_key_id = response['id']
    api_key_value = response['value']
    
    return api_key_id, api_key_value

def create_usage_plan(g_client, rest_api_id_, stage_, usage_constraints_):
    name='API usage plan'
    description='Harsh rate limits and daily quota for public facing API'
    stages=[
        {
            'apiId': rest_api_id_,
            'stage': stage_,
        },
    ]
    
    response = g_client.create_usage_plan(
        name=name,
        description=description,
        apiStages=stages,
        throttle=usage_constraints_['rate_limits'],
        quota=usage_constraints_['quota']
    )
    
    usage_plan_id = response['id']
    
    return usage_plan_id

def create_usage_plan_key(g_client, usage_plan_id_, api_key_id_):
    g_client.create_usage_plan_key(
        usagePlanId=usage_plan_id_,
        keyId=api_key_id_,
        keyType='API_KEY'
    )
    
def add_apigateway_permission(l_client, function_name_, source_arn_):
    return l_client.add_permission(
        FunctionName=function_name_,
        StatementId='apigateway-lambda-invoke-permission',
        Action='lambda:InvokeFunction',
        Principal='apigateway.amazonaws.com',
        SourceArn=source_arn_
    )

def deploy_rest_api(\
        g_client, l_client, \
        account_id, region, \
        function_name_, rest_api_name_, endpoint_, method_verb_, \
        usage_constraints_, stage_ \
    ):
    # First, lets verify whether we already have an endpoint with this name.
    if not has_api(g_client, rest_api_name_):

        # 1. Create REST API
        rest_api_id = create_rest_api(g_client, rest_api_name_)

        # 2. Create resource
        resource_id=create_resource(g_client, rest_api_id)
        
        # 3. Create method
        create_rest_method(g_client, rest_api_id, resource_id, method_verb_)
        
        # 4. Get the Lambda function ARN
        lambda_arn = get_lambda_arn(l_client, function_name_)

        # 5. Set up integration with the Lambda function
        lambda_uri = build_lambda_uri(region, lambda_arn)

        setup_integration(g_client, lambda_uri, rest_api_id, resource_id, method_verb_)

        # 6. Deploy API
        create_deployment(g_client, rest_api_id, stage_)

        # 7. Create API key
        api_key_id, api_key_value = create_api_key(g_client, rest_api_name_)

        # 8. Create usage plan
        usage_plan_id = create_usage_plan(g_client, rest_api_id, stage_, usage_constraints_)
        
        # 9. Associate the usage plan with the API key
        create_usage_plan_key(g_client, usage_plan_id, api_key_id)
        
        # 10. Grant API Gateway permission to invoke the Lambda function
        source_arn = build_source_arn(region, account_id, rest_api_id)
        
        try: 
            add_apigateway_permission(l_client, function_name_, source_arn)
        except l_client.exceptions.ResourceConflictException:
            pass
        
        return {
            'url': build_api_url(rest_api_id, region, endpoint_, stage_),
            'api_key': api_key_value,
            'usage_plan_id': usage_plan_id,
            'rest_api_id': rest_api_id
        }
    
    else: 
        failure_msg=f"REST API name {rest_api_name} is already under usage!"
        print(failure_msg)
        
        return {}

In [14]:
# Define the name of the API (not public facing)
rest_api_name = function_name + '-api'

deployment_reponse = deploy_rest_api(\
    gateway_client, lambda_client, \
    account_id, region, \
    function_name, rest_api_name, endpoint, method_verb, \
    usage_constraints, stage \
)


In [None]:
rest_api_id=deployment_reponse['rest_api_id']

# The URL by default will follow this pattern:
api_url = build_api_url(rest_api_id, region, endpoint, stage)
print(api_url)


In [16]:
api_key=deployment_reponse['api_key']
rest_api_id=deployment_reponse['rest_api_id']

# The URL by default will follow this pattern:
api_url = build_api_url(rest_api_id, region, endpoint, stage)
print(api_key)
print(api_url)

RvnS3wdlCV5QbYAwYePRl4PkhmMN20Bn1pk8Co2f
https://spoixzdvhf.execute-api.sa-east-1.amazonaws.com/test/predict/


In [15]:
headers = {
    'Content-type': 'application/json', 
    'x-api-key': api_key,
}

resp = requests.post(api_url, headers=headers, json=example)
resp.json()


https://spoixzdvhf.execute-api.sa-east-1.amazonaws.com/test/predict/


'42'