## Lambda demo

Invoking Lambda in Python

Lambda functions can be created and managed from Python through Boto3, the AWS SDK for Python. Boto3 lets you interact with and manage AWS services, including Lambda. You do so through a client:

In [None]:
import boto3
client = boto3.client('lambda')

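# In Boto3, code is uploaded to Lambda through the create_function method.
# `code` holds the raw bytes of a zipped deployment package (the full example further down shows how to build it).

response = client.create_function(
    FunctionName='botoLambdaFunction1',
    Runtime='python3.9',
    Handler='lambda_function.lambda_handler',
    Role='arn:aws:iam::565094796913:role/lambda_full_access',
    Code={'ZipFile': code})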

Lambda Console

From the Lambda console, we'll select Create function. We'll then author a function from scratch with a Python 3 runtime.

We'll then create a test event to see if this function actually works. We'll click Test and then Configure test event, leaving the default 'hello-world' event in place. If we click Test again, the execution result shows the status code and the body that the function returns.

What if an error occurred outside a test environment and we wanted to see what went wrong? Click Monitor, then View logs in CloudWatch. The log streams are listed there, and clicking through to one may reveal the source of the error.

Lambda SDK

In the SDK, we can create a function through Boto3. We first need to write the Lambda function to a file on our local machine. Then we'll create a Boto3 client for Lambda and pass the raw bytes of our zipped code into the create_function method. A few reminders:

By default, SageMaker execution roles won't have access to Lambda; you'll need to add this permission through the IAM console.
You'll need to create an execution role for the Lambda function. Again, you'll need to do this through the IAM console.
Names for Lambda functions need to be unique.
Finally, you'll need to pass in the right Lambda handler name (lambda_function.lambda_handler).

In [None]:
import boto3

from sagemaker import get_execution_role
from zipfile import ZipFile
role = get_execution_role()
client = boto3.client('lambda')

with ZipFile('code.zip', 'w') as f:
    f.write('lambda_function.py')

# If submitting as a ZipFile, you need to insert raw data. 
with open('code.zip', 'rb') as f:
    b_code = f.read()

response = client.create_function(
    FunctionName='botoLambdaFunction1',
    Runtime='python3.9',
    Handler='lambda_function.lambda_handler',
    Code={
        'ZipFile': b_code
    },
    Description='string',
    Timeout=30,
    MemorySize=1024,
    Publish=True,
    PackageType='Zip',
    Role='arn:aws:iam::565094796913:role/lambda_full_access'
)
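
The zip step above can also be done in memory, which avoids leaving a code.zip file on disk. The following is a minimal sketch, not the method used in this demo: `HANDLER_SOURCE` and `build_zip_bytes` are hypothetical names introduced for illustration. The file mode is set explicitly because archive members written from a string default to owner-only permissions, which can stop the Lambda runtime from reading the module.

```python
import io
import zipfile

# Hypothetical handler source; any module exposing lambda_handler(event, context) works.
HANDLER_SOURCE = '''\
def lambda_handler(event, context):
    return {"statusCode": 200, "body": "Hello from Lambda!"}
'''


def build_zip_bytes(source: str, filename: str = "lambda_function.py") -> bytes:
    """Return the bytes of a zip archive containing a single source file."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        info = zipfile.ZipInfo(filename)
        info.compress_type = zipfile.ZIP_DEFLATED
        # rw-r--r-- so that the Lambda runtime user can read the file.
        info.external_attr = 0o644 << 16
        archive.writestr(info, source)
    return buffer.getvalue()


b_code = build_zip_bytes(HANDLER_SOURCE)

# Sanity check: the archive holds the file that the Handler string points at.
with zipfile.ZipFile(io.BytesIO(b_code)) as archive:
    print(archive.namelist())
```

The resulting `b_code` can be passed as `Code={'ZipFile': b_code}` exactly as in the cell above.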

## Triggering a Lambda Function Demo

In this demo, we demonstrate how to trigger a Lambda function in the following two ways:

CloudWatch Scheduled Event

SDK


### Through CloudWatch Scheduled Events

We'll start with CloudWatch Scheduled Events. When creating a rule under "Rules", we have two options.

"Event Pattern" is what we would use if we wanted to send an event in response to something happening in AWS, such as a particular API call being made.

"Schedule" is how we specify a Scheduled Event. There, we can specify an interval and a target. Once the rule is created, we need to wait for one full interval to elapse before we see any invocations.

If we click through to the Lambda function, two visual cues confirm what we've done.

We can see invocations under "monitor".

We can see in the "overview" that CloudWatch events were added as a trigger.
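
The same schedule can be set up from Boto3 rather than the console. The sketch below is an assumption-laden outline, not part of the original demo: `rate_expression` and `schedule_lambda` are hypothetical helper names, and the rule name and function ARN are supplied by the caller. It uses the `events` client's `put_rule` and `put_targets` calls and the Lambda client's `add_permission` call.

```python
def rate_expression(value: int, unit: str = "minute") -> str:
    """Build a rate() schedule expression; the unit is singular only when value is 1."""
    if value < 1:
        raise ValueError("value must be a positive integer")
    return f"rate({value} {unit if value == 1 else unit + 's'})"


def schedule_lambda(rule_name: str, function_arn: str, minutes: int) -> str:
    """Create a scheduled rule that targets a Lambda function and return the rule ARN."""
    import boto3  # imported here so rate_expression can be used without AWS access

    events = boto3.client("events")
    lambda_client = boto3.client("lambda")

    rule_arn = events.put_rule(
        Name=rule_name,
        ScheduleExpression=rate_expression(minutes),
        State="ENABLED",
    )["RuleArn"]

    # Allow the rule to invoke the function (the console adds this permission for you).
    lambda_client.add_permission(
        FunctionName=function_arn,
        StatementId=f"{rule_name}-invoke",
        Action="lambda:InvokeFunction",
        Principal="events.amazonaws.com",
        SourceArn=rule_arn,
    )

    events.put_targets(Rule=rule_name, Targets=[{"Id": "1", "Arn": function_arn}])
    return rule_arn


print(rate_expression(1))
print(rate_expression(5))
```

Calling `schedule_lambda` requires a role with permissions for both services, so only the expression helper is exercised here.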

SDK

We'll then trigger a Lambda function in Python through Boto3. Just as AWS services can send an event to Lambda, we can draft an event of our own and send it to Lambda. The steps are:

Spin up a boto3 Lambda client.

Ensure your execution role has access to Lambda.

From there, you can specify the payload as a JSON-serializable object.

Convert it into a byte array.

Call the invoke method.

You can verify that the invocation was accepted through the response object: an asynchronous ('Event') invocation returns a StatusCode of 202.

In [None]:
# SDK

import boto3
from sagemaker import get_execution_role 
## The SageMaker role executing your notebook needs to have Lambda permissions. 
import json

client = boto3.client('lambda')

payload = {'key': 'value'}

payload_bytes = json.dumps(payload).encode('utf-8')

response = client.invoke(
    FunctionName='example123',
    InvocationType='Event',
    Payload=payload_bytes
)
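
To make the verification step concrete, here is a small sketch of a response check. `check_invoke_response` is a hypothetical helper, not a Boto3 function. It relies on the status codes that Lambda returns for each invocation type and on the `FunctionError` key that appears when the function itself raised an error. The responses below are hand-built stand-ins, with `io.BytesIO` playing the role of the streaming payload.

```python
import io
import json

# Status codes that Lambda returns for each invocation type.
EXPECTED_STATUS = {"RequestResponse": 200, "Event": 202, "DryRun": 204}


def check_invoke_response(response: dict, invocation_type: str = "Event"):
    """Raise if the invocation failed; otherwise return the decoded payload (or None)."""
    expected = EXPECTED_STATUS[invocation_type]
    if response.get("StatusCode") != expected:
        raise RuntimeError(f"expected status {expected}, got {response.get('StatusCode')}")
    if "FunctionError" in response:
        raise RuntimeError(f"function error: {response['FunctionError']}")
    body = response["Payload"].read() if "Payload" in response else b""
    # Asynchronous invocations come back with an empty payload.
    return json.loads(body) if body else None


# Stand-in responses shaped like the ones client.invoke returns.
fake_async = {"StatusCode": 202, "Payload": io.BytesIO(b"")}
fake_sync = {"StatusCode": 200, "Payload": io.BytesIO(b'{"statusCode": 200, "body": "Good to go!"}')}

print(check_invoke_response(fake_async, "Event"))
print(check_invoke_response(fake_sync, "RequestResponse"))
```

With a real client, the `response` from the cell above could be passed in directly as `check_invoke_response(response, "Event")`.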

## UDACITY: Designing Your First Workflow - Invoking Lambda Functions

In the last exercise, you created your own Lambda function. Without realizing it, you've already practiced invoking as well! Launching a test event is an example of synchronous invocation. In this exercise, you will continue working on the Lambda function 'PreprocessLambda' from the previous exercise. However, you'll practice a different way to launch a synchronous invocation, and also practice the setup of an asynchronous invocation. These are only two examples. Lambda is one of the most flexible offerings in AWS and can be utilized in a variety of applications. The same Lambda function can be (and often is) invoked both synchronously and asynchronously.



## Exercise: Synchronous invocation

Synchronous invocations occur when a call is made to a Lambda function and the caller waits for the response. The example we're asking you to implement is a CLI invocation, but Lambda functions can also be placed behind Amazon API Gateway so that users can invoke them directly. This, in turn, could be the interface that you expose to users to allow them to interact with other AWS resources. These types of invocations are great for "get" and "set" methods.

Your task is to synchronously invoke the Lambda function you implemented in the last exercise using the CLI. The following documentation may be useful to you: https://docs.aws.amazon.com/lambda/latest/dg/invocation-sync.html

You will need to attach the LambdaFullAccess policy to the SageMaker execution role used for your notebook. Once done, it will take a few minutes for the policy to register.

In [None]:
%%bash 
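# Example CLI command.
# With AWS CLI v2, also pass --cli-binary-format raw-in-base64-out so the JSON payload is not read as base64.
aws lambda invoke --function-name preprocess-helloblze --payload '{"s3-dataset-uri": "s3://mldeployex/trigger-demo/reviews_Patio_Lawn_and_Garden_5.json.zip"}' response.json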

## Exercise: Asynchronous invocation

Asynchronous invocations occur when a service invokes Lambda but does not wait for a response. The two most popular services that utilize asynchronous invocations are S3 (the storage we've been using) and SNS (Simple Notification Service). We'll be setting up asynchronous invocations on an S3 bucket for our preprocessing function.

Your task is to set up a trigger that invokes the Lambda function we've been working on whenever a file is uploaded to a specific folder in S3. You will need to do the following:

Create a new S3 folder within an existing bucket.
Create a new Lambda trigger for S3 by clicking '+Add trigger'. Specify the bucket, specify the desired folder as the prefix, and specify a suffix of ".zip" to ensure that recursive calls don't occur.
Modify the Lambda handler from the previous exercise, using the starter code, so that it properly parses the event that's sent to it.
To test, upload reviews_Patio_Lawn_and_Garden_5.json.zip from this directory to your S3 bucket. To see whether the Lambda function was triggered, go to the Monitor tab.



## Lambda Handler Starter Code: Parsing S3 Upload Event.

In [None]:
# TODO: complete the lambda_handler function below.
# The code to parse the S3 event has been provided; you only need to call `preprocess` from HelloBlazePreprocessLambda.py and return the status.
import json
import urllib.parse

def lambda_handler(event, context):
    for r in event['Records']:
        bucket = r['s3']['bucket']['name']
        key = urllib.parse.unquote_plus(r['s3']['object']['key'], encoding='utf-8')
        uri = "/".join([bucket, key])
        # TODO: call preprocess(uri) here, then return a status after the loop.

## Triggering from CLI

Below is a valid CLI command that will invoke a Lambda function.

aws lambda invoke --function-name preprocess-helloblze --payload '{"s3-dataset-uri": "udacity-sagemaker-solutiondata2021/l3e1/reviews_Musical_Instruments_5.json.zip"}' response.json

Let's break this command down:

aws lambda is the service we're using, and invoke is the operation we're calling on it.

--function-name is the name of the function we're invoking.

--payload is the payload we want to send to the function.

response.json is where we want the output of this function to be written to.
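
Quoting a JSON payload by hand in a shell is easy to get wrong, so one option is to assemble the command in Python. This is a sketch under assumptions: `build_invoke_command` is a hypothetical helper, and the `--cli-binary-format raw-in-base64-out` option assumes AWS CLI v2 (drop it for v1).

```python
import json
import shlex


def build_invoke_command(function_name: str, payload: dict, outfile: str = "response.json") -> list:
    """Assemble the argument list for `aws lambda invoke`."""
    return [
        "aws", "lambda", "invoke",
        "--function-name", function_name,
        # Assumes AWS CLI v2, where raw JSON payloads need this option.
        "--cli-binary-format", "raw-in-base64-out",
        "--payload", json.dumps(payload),
        outfile,
    ]


command = build_invoke_command(
    "preprocess-helloblze",
    {"s3-dataset-uri": "udacity-sagemaker-solutiondata2021/l3e1/reviews_Musical_Instruments_5.json.zip"},
)

# shlex.join produces a copy-pasteable shell command with correct quoting.
print(shlex.join(command))
```

The list form can be handed to `subprocess.run(command, check=True)`; the function's return value then lands in response.json.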


## Triggering from an S3 Upload.

We will need to perform the following steps before modifying our lambda code:

Create a new s3 folder within an existing bucket.
Create a new Lambda trigger for S3: specify the bucket, specify the folder using the prefix, and specify a suffix of ".zip" to ensure that recursive calls don't occur.
We then need to modify the lambda handler starter code so that it properly parses the event that's sent to it.

To test, we'll upload reviews_Patio_Lawn_and_Garden_5.json.zip in this directory to your S3 bucket. To see if the lambda function is triggered, you can go to the Monitor tab.

In [None]:
# code

import json
import urllib.parse
from HelloBlazePreprocessLambda import preprocess

def lambda_handler(event, context):
    for r in event['Records']:
        bucket = r['s3']['bucket']['name']
        key = urllib.parse.unquote_plus(r['s3']['object']['key'], encoding='utf-8')
        uri = "/".join([bucket, key])
        preprocess(uri)
    return {
        'statusCode': 200,
        'body': "Good to go!"
    }
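
Before uploading a file, the parsing logic can be checked locally against a hand-built event. The sketch below is illustrative only: `extract_uris` is a hypothetical helper that mirrors the loop in the handler, and the sample event contains just the fields the handler reads (real S3 events carry many more). The bucket and key are made up. Object keys arrive URL-encoded, which is why `unquote_plus` is needed.

```python
import urllib.parse


def extract_uris(event: dict) -> list:
    """Return 'bucket/key' strings for every record in an S3 event."""
    uris = []
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        # Keys are URL-encoded in the event: '+' is a space, %28 and %29 are parentheses.
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"], encoding="utf-8")
        uris.append("/".join([bucket, key]))
    return uris


# Trimmed-down stand-in for the event S3 sends on upload.
sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "example-bucket"},
                "object": {"key": "trigger-demo/my+reviews%281%29.json.zip"},
            }
        }
    ]
}

print(extract_uris(sample_event))  # ['example-bucket/trigger-demo/my reviews(1).json.zip']
```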

## Creating Workflows with Step Functions Demo

## SDK

We then created a state machine in Boto3, using a state machine definition that we took from the UI. We made sure our SageMaker execution role had full access both to StepFunctions and Lambda.

We used an IAM role that had access to Lambda, and with that invoked the create_state_machine function. Once that's done, you can print the response to see the 200 'success' HTTP status code. Remember that both state machine names and job names need to be unique.
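
The `definition` argument is a JSON string written in the Amazon States Language. The demo copies it from the console; the sketch below shows what a minimal one might look like, with a single Task state that calls a Lambda function. The function ARN, account ID, and state name are placeholders, not values from this demo.

```python
import json

# Placeholder ARN; substitute the ARN of your own Lambda function.
FUNCTION_ARN = "arn:aws:lambda:us-west-2:123456789012:function:example123"

definition_dict = {
    "Comment": "A one-step state machine that invokes a Lambda function.",
    "StartAt": "InvokeLambda",
    "States": {
        "InvokeLambda": {
            "Type": "Task",
            "Resource": FUNCTION_ARN,
            "End": True,
        }
    },
}

# create_state_machine expects the definition as a JSON string, not a dict.
definition = json.dumps(definition_dict, indent=2)
print(definition)
```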

In [None]:
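import boto3

client = boto3.client('stepfunctions')  # A Step Functions client, not the Lambda client used earlier.

# `definition` is the state machine definition (a JSON string) taken from the UI.
response = client.create_state_machine(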
    name='boto3StateMachine3', # Names need to be unique. 
    definition=definition,
    roleArn='arn:aws:iam::565094796913:role/service-role/StepFunctions-firstStateMachine-role-0826984a',
    type='STANDARD',
    loggingConfiguration={
        'level': 'OFF'
    }
)

Once that was done, we executed this state machine by specifying an execution name, the ARN of the state machine, and an input.

In [None]:

response = client.start_execution(
    stateMachineArn='arn:aws:states:us-west-2:565094796913:stateMachine:boto3StateMachine3', # You can find this through the Console or through the 'response' object. 
    name='example1', # Execution names need to be unique within state machines. 
    input='{}' # Input must be a JSON string; use empty braces when there is nothing to pass. 
)
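
Because execution names (and, later, job names) must be unique, re-running a cell with a hard-coded name will fail. One common workaround is to append a timestamp and a short random suffix. `unique_name` below is a hypothetical helper, not part of Boto3; the 80-character default assumes the Step Functions limit on execution names, and a smaller limit such as 63 can be passed for SageMaker job names.

```python
import uuid
from datetime import datetime, timezone


def unique_name(prefix: str, max_length: int = 80) -> str:
    """Return prefix plus a UTC timestamp and random suffix, truncated to max_length."""
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    suffix = uuid.uuid4().hex[:8]
    tail = f"-{stamp}-{suffix}"
    # Trim the prefix, never the tail, so that uniqueness is preserved.
    return prefix[: max_length - len(tail)] + tail


print(unique_name("example"))
```

The result could then be passed as `name=unique_name('example')` in `start_execution`.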

## UDACITY Designing Your First Workflow - Step Functions

#### Step Functions & SageMaker

In the prior exercises, we've been working with many small services. This can be overwhelming for a data scientist who wants to establish a consistent methodology for handling data. Step Functions is an orchestration service that allows us to use SageMaker in a methodical and consistent way. Step Functions also integrates with Lambda, which can allow us to automate our entire machine learning pipeline end-to-end. Let's get a handle on what a 'step' in a step function looks like.

In this exercise, you will create a preprocessing step and a training step. Then you will create a step function to chain the two steps.

#### Exercise: Grant Permissions and install packages.


Attach the IAMFullAccess and the StepFunctionsFullAccess policies to your SageMaker execution role.

In [None]:
%%bash
pip install stepfunctions

### Exercise: Fill out preprocessing step.

The 'step' interface is designed to be quite similar to the Preprocessing Job in lesson 2. The main difference between these is the ability of a 'step' to interface with other steps. When a step succeeds, the next step specified in the workflow starts automatically. In our case, a training step will launch once the preprocessing step succeeds. The preprocessing step has been encoded for you. Upload the preprocessing code 'HelloBlazePreprocess.py' and the zipped dataset 'reviews_Musical_Instruments_5.json.zip' to S3.

The code below defines the preprocessing step. Fill in the constants in the code.

In [None]:
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from stepfunctions.steps.sagemaker import ProcessingStep
import sagemaker

role = get_execution_role()

PREPROCESSING_JOB_NAME = "preprocess-job"
input_data = 's3://mldeployex/reviews_Musical_Instruments_5.json.zip'
input_preprocessing_code = 's3://mldeployex/HelloBlazePreprocess.py'
sess = sagemaker.Session()

sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
                                     role=role,
                                     instance_type='ml.m5.large',
                                     instance_count=1)


processed_data_train = "{}{}/{}".format("s3://", sess.default_bucket(), 'hello_blaze_train_scikit')
processed_data_test = "{}{}/{}".format("s3://", sess.default_bucket(), 'hello_blaze_test_scikit')

inputs = [
    ProcessingInput(source=input_data, destination='/opt/ml/processing/input', input_name='input-1'),
    ProcessingInput(source=input_preprocessing_code, destination='/opt/ml/processing/input/code', input_name='code'),
]

outputs = [
    ProcessingOutput(source='/opt/ml/processing/output/train', destination=processed_data_train, output_name='train_data'),
    ProcessingOutput(source='/opt/ml/processing/output/test', destination=processed_data_test, output_name='test_data'),
]


processing_step = ProcessingStep(
    "SageMaker pre-processing step 4",
    processor=sklearn_processor,
    job_name=PREPROCESSING_JOB_NAME,
    inputs=inputs,
    outputs=outputs,
    container_entrypoint=["python3", "/opt/ml/processing/input/code/HelloBlazePreprocess.py"],
)


### Exercise: Fill out Training Step

Upon the success of the preprocessing step, we wish to execute a training step. A training step is defined below. Fill the constants in the code.

In [None]:
from stepfunctions.steps.sagemaker import TrainingStep
import boto3

WORKFLOW_OUTPUT = "s3://udacity-sagemaker-solutiondata2021/l3e3/workflow_output"
TRAINING_JOB_NAME = "test-job2-train"

region_name = boto3.Session().region_name
container = sagemaker.image_uris.retrieve(
    region=region_name, framework="blazingtext", version="latest"
)

helloBlazeEstimator = sagemaker.estimator.Estimator(
    container,
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
    volume_size=30,
    max_run=360000,
    input_mode="File",
    output_path=WORKFLOW_OUTPUT,
    sagemaker_session=sess,
)

helloBlazeEstimator.set_hyperparameters(mode='supervised')

training_step = TrainingStep(
    "SageMaker Training Step",
    estimator=helloBlazeEstimator,
    data={
        "train": sagemaker.TrainingInput(processed_data_train, content_type="text/plain"),
        "validation": sagemaker.TrainingInput(processed_data_test, content_type="text/plain"),
    },
    job_name=TRAINING_JOB_NAME,
    wait_for_completion=True,
)

### Exercise: Create & Execute Workflow

In [None]:
from stepfunctions.steps import Chain
from stepfunctions.workflow import Workflow

workflow_role = 'arn:aws:iam::354406515641:role/StepFunctionsLambdaRole'

workflow_graph = Chain([processing_step, training_step])
workflow = Workflow(
    name="SageMakerProcessingWorkflow7",
    definition=workflow_graph,
    role=workflow_role,
)

workflow.create()

# Execute workflow
execution = workflow.execute(
    inputs={
        "PreprocessingJobName": PREPROCESSING_JOB_NAME,  # Each SageMaker processing job requires a unique name.
        "TrainingJobName": TRAINING_JOB_NAME  # Each SageMaker training job requires a unique name.
    }
)

execution_output = execution.get_output(wait=True)


In [None]:
execution.render_progress()