# Automate Model Retraining & Deployment Using the AWS Step Functions Data Science SDK

## An experiment on medical images

1. [Introduction](#Introduction)
1. [Setup](#Setup)
1. [Create Resources](#Create-Resources)
1. [Build a Machine Learning Workflow](#Build-a-Machine-Learning-Workflow)
1. [Run the Workflow](#Run-the-Workflow)
1. [Clean Up](#Clean-Up)

## Introduction

This experiment uses the AWS Step Functions Data Science SDK to create a machine learning model retraining workflow. The Step Functions SDK is an open source library that allows data scientists to easily create and execute machine learning workflows using AWS Step Functions and Amazon SageMaker. For more information, please see the following resources:
* [AWS Step Functions](https://aws.amazon.com/step-functions/)
* [AWS Step Functions Developer Guide](https://docs.aws.amazon.com/step-functions/latest/dg/welcome.html)
* [AWS Step Functions Data Science SDK](https://aws-step-functions-data-science-sdk.readthedocs.io)

In this notebook, we will use the SDK to create steps that capture and transform data using AWS Glue, incorporate this data into the training of a machine learning model, deploy the model to a SageMaker endpoint, link these steps together to create a workflow, and then execute the workflow in AWS Step Functions.

This experiment is based on [a medical image dataset on Kaggle](https://www.kaggle.com/paultimothymooney/chest-xray-pneumonia) and aims to distinguish chest X-ray images of pneumonia patients from those of normal patients.

## Setup

First, we'll need to install and load all the required modules. Then we'll create fine-grained IAM roles for the Lambda, Glue, and Step Functions resources that we will create. The IAM roles grant the services permissions within your AWS environment.

In [2]:
import sys
!{sys.executable} -m pip install --upgrade stepfunctions



### Import the Required Modules

In [39]:
import uuid
import logging 
import stepfunctions 
import boto3
import sagemaker 
from sagemaker.tensorflow import TensorFlow
from sagemaker.amazon.amazon_estimator import image_uris
from sagemaker.inputs import TrainingInput
from sagemaker.s3 import S3Uploader
from sagemaker.s3 import S3Downloader
from stepfunctions import steps
from stepfunctions.steps import TrainingStep, ModelStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow
import random
import shutil

session = sagemaker.Session() 
stepfunctions.set_stream_logger(level=logging.INFO) 

region = boto3.Session().region_name
bucket = session.default_bucket() 
id = uuid.uuid4().hex

# Create a unique name for the AWS Glue job to be created. If you change the
# default name, you may need to change the Step Functions execution role.
job_name = "glue-customer-churn-etl-{}".format(id)

# Create a unique name for the AWS Lambda function to be created. If you change
# the default name, you may need to change the Step Functions execution role.
function_name = "query-training-status-{}".format(id)

pwd = "/home/ec2-user/SageMaker/img_cls_exp/Medical Image/"

Next, we'll create fine-grained IAM roles for the Lambda, Glue, and Step Functions resources. The IAM roles grant the services permissions within your AWS environment.

### Add permissions to your notebook role in IAM

The IAM role assumed by your notebook requires permission to create and run workflows in AWS Step Functions. If this notebook is running on a SageMaker notebook instance, do the following to provide IAM permissions to the notebook:

1. Open the Amazon [SageMaker console](https://console.aws.amazon.com/sagemaker/). 
2. Select **Notebook instances** and choose the name of your notebook instance.
3. Under **Permissions and encryption** select the role ARN to view the role on the IAM console.
4. Copy and save the IAM role ARN for later use. 
5. Choose **Attach policies** and search for `AWSStepFunctionsFullAccess`.
6. Select the check box next to `AWSStepFunctionsFullAccess` and choose **Attach policy**.

We also need to give the notebook instance permission to create an AWS Lambda function and an AWS Glue job. We will edit the managed policy attached to our role directly to incorporate these specific permissions:

1. Under **Permissions policies** expand the AmazonSageMaker-ExecutionPolicy-******** policy and choose **Edit policy**.
2. Select **Add additional permissions**. Choose **IAM**  for Service and **PassRole** for Actions.
3. Under Resources, choose **Specific**. Select **Add ARN** and enter `query_training_status-role` for **Role name with path*** and choose **Add**. You will create this role later on in this notebook.
4. Select **Add additional permissions** a second time. Choose **Lambda** for Service, **Write** for Access level, and **All resources** for Resources.
5. Select **Add additional permissions** a final time. Choose **Glue** for Service, **Write** for Access level, and **All resources** for Resources.
6. Choose **Review policy** and then **Save changes**.


Next, let's create an execution role in IAM for Step Functions. 

### Create an Execution Role for Step Functions

Your Step Functions workflow requires an IAM role to interact with other services in your AWS environment. 

1. Go to the [IAM console](https://console.aws.amazon.com/iam/).
2. Select **Roles** and then **Create role**.
3. Under **Choose the service that will use this role** select **Step Functions**.
4. Choose **Next** until you can enter a **Role name**.
5. Enter a name such as `AmazonSageMaker-StepFunctionsWorkflowExecutionRole` and then select **Create role**.

Next, create and attach a policy to the role you created. As a best practice, the following steps will attach a policy that only provides access to the specific resources and actions needed for this solution.

1. Under the **Permissions** tab, click **Attach policies** and then **Create policy**.
2. Enter the following in the **JSON** tab:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "NOTEBOOK_ROLE_ARN",
            "Condition": {
                "StringEquals": {
                    "iam:PassedToService": "sagemaker.amazonaws.com"
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:CreateModel",
                "sagemaker:DeleteEndpointConfig",
                "sagemaker:DescribeTrainingJob",
                "sagemaker:CreateEndpoint",
                "sagemaker:StopTrainingJob",
                "sagemaker:CreateTrainingJob",
                "sagemaker:UpdateEndpoint",
                "sagemaker:CreateEndpointConfig",
                "sagemaker:DeleteEndpoint"
            ],
            "Resource": [
                "arn:aws:sagemaker:*:*:*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "events:DescribeRule",
                "events:PutRule",
                "events:PutTargets"
            ],
            "Resource": [
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTrainingJobsRule"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "arn:aws:lambda:*:*:function:query-training-status*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:StartJobRun",
                "glue:GetJobRun",
                "glue:BatchStopJobRun",
                "glue:GetJobRuns"
            ],
            "Resource": "arn:aws:glue:*:*:job/glue-customer-churn-etl*"
        }
    ]
}
```

3. Replace **NOTEBOOK_ROLE_ARN** with the ARN of your notebook's IAM role that you saved in the previous section.
4. Choose **Review policy** and give the policy a name such as `AmazonSageMaker-StepFunctionsWorkflowExecutionPolicy`.
5. Choose **Create policy**.
6. Select **Roles** and search for your `AmazonSageMaker-StepFunctionsWorkflowExecutionRole` role.
7. Under the **Permissions** tab, click **Attach policies**.
8. Search for your newly created `AmazonSageMaker-StepFunctionsWorkflowExecutionPolicy` policy and select the check box next to it.
9. Choose **Attach policy**. You will then be redirected to the details page for the role.
10. Copy the AmazonSageMaker-StepFunctionsWorkflowExecutionRole **Role ARN** at the top of the Summary.
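
The placeholder substitution in step 3 can also be done in code before pasting the JSON into the console. The following is a minimal sketch, assuming the policy text is held in a Python string. The function `fill_notebook_role_arn`, the shortened `POLICY_TEMPLATE`, and the example ARN are hypothetical and introduced only for illustration.

```python
import json

# Shortened stand-in for the policy above; only the first statement is shown.
POLICY_TEMPLATE = """
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "NOTEBOOK_ROLE_ARN",
            "Condition": {
                "StringEquals": {
                    "iam:PassedToService": "sagemaker.amazonaws.com"
                }
            }
        }
    ]
}
"""


def fill_notebook_role_arn(policy_text, notebook_role_arn):
    """Return the policy JSON with the NOTEBOOK_ROLE_ARN placeholder replaced."""
    policy = json.loads(policy_text)  # also checks that the text is well-formed JSON
    for statement in policy["Statement"]:
        if statement.get("Resource") == "NOTEBOOK_ROLE_ARN":
            statement["Resource"] = notebook_role_arn
    return json.dumps(policy, indent=4)


# Hypothetical ARN, for illustration only; use the ARN you saved earlier.
example_arn = "arn:aws:iam::123456789012:role/ExampleNotebookRole"
filled_policy = fill_notebook_role_arn(POLICY_TEMPLATE, example_arn)
print(filled_policy)
```

Parsing the text with `json.loads` first means a malformed policy fails here rather than in the IAM console.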

### Configure Execution Roles

In [4]:
# paste the AmazonSageMaker-StepFunctionsWorkflowExecutionRole ARN from above
workflow_execution_role = "arn:aws:iam::179199196742:role/AmazonSageMaker-StepFunctionsWorkflowExecutionRole" 

# SageMaker Execution Role
# You can use sagemaker.get_execution_role() if running inside sagemaker's notebook instance
sagemaker_execution_role = (
    sagemaker.get_execution_role()
)  # Replace with ARN if not in an AWS SageMaker notebook

#### Create a Glue IAM Role
You need to create an IAM role so that you can create and execute an AWS Glue Job on your data in Amazon S3.

1. Go to the [IAM console](https://console.aws.amazon.com/iam/).
2. Select **Roles** and then **Create role**.
3. Under **Choose the service that will use this role** select **Glue**.
4. Choose **Next** until you can enter a **Role name**.
5. Enter a name such as `AWS-Glue-S3-Bucket-Access` and then select **Create role**.

Next, create and attach a policy to the role you created. The following steps attach a managed policy that provides Glue access to the specific S3 bucket holding your data.

1. Under the **Permissions** tab, click **Attach policies** and then **Create policy**.
2. Enter the following in the **JSON** tab:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ListObjectsInBucket",
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": ["arn:aws:s3:::BUCKET-NAME"]
        },
        {
            "Sid": "AllObjectActions",
            "Effect": "Allow",
            "Action": "s3:*Object",
            "Resource": ["arn:aws:s3:::BUCKET-NAME/*"]
        }
    ]
}
```

3. Retrieve the specific **S3 bucket name** that we will grant permissions to. It is the value of the `bucket` variable defined earlier with `session.default_bucket()`.
4. Replace the **two occurrences** of **BUCKET-NAME** in the JSON text that you entered with that bucket name.
5. Choose **Review policy** and give the policy a name such as `S3BucketAccessPolicy`.
6. Choose **Create policy**.
7. Select **Roles**, then search for and select your `AWS-Glue-S3-Bucket-Access` role.
8. Under the **Permissions** tab, click **Attach policies**.
9. Search for your newly created `S3BucketAccessPolicy` policy and select the check box next to it.
10. Choose **Attach policy**. You will then be redirected to the details page for the role.
11. Copy the **Role ARN** at the top of the Summary tab.
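
As an alternative to editing the two **BUCKET-NAME** placeholders by hand, the policy document can be generated from the bucket name. This is a sketch under stated assumptions: `build_bucket_policy` is a hypothetical helper and `"example-bucket"` is a placeholder name. In the notebook you would pass the `bucket` variable instead.

```python
import json


def build_bucket_policy(bucket_name):
    """Build the S3 access policy shown above for a given bucket name."""
    bucket_arn = "arn:aws:s3:::{}".format(bucket_name)
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ListObjectsInBucket",
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                # ListBucket applies to the bucket itself
                "Resource": [bucket_arn],
            },
            {
                "Sid": "AllObjectActions",
                "Effect": "Allow",
                "Action": "s3:*Object",
                # Object actions apply to the keys inside the bucket
                "Resource": [bucket_arn + "/*"],
            },
        ],
    }


# "example-bucket" is a placeholder, not a real bucket.
policy = build_bucket_policy("example-bucket")
print(json.dumps(policy, indent=4))
```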

In [5]:
# paste the AWS-Glue-S3-Bucket-Access role ARN from above
glue_role = "arn:aws:iam::179199196742:role/AWS-Glue-S3-Bucket-Access"

#### Create a Lambda IAM Role
You also need to create an IAM role so that you can create and execute an AWS Lambda function stored in Amazon S3.

1. Go to the [IAM console](https://console.aws.amazon.com/iam/).
2. Select **Roles** and then **Create role**.
3. Under **Choose the service that will use this role** select **Lambda**.
4. Choose **Next** until you can enter a **Role name**.
5. Enter a name such as `query_training_status-role` and then select **Create role**.

Next, attach policies to the role you created. The following steps attach policies that provide Lambda access to S3 and read-only access to SageMaker.

1. Under the **Permissions** tab, click **Attach Policies**.
2. In the search box, type **SageMaker** and select **AmazonSageMakerReadOnly** from the populated list.
3. In the search box type **AWSLambda** and select **AWSLambdaBasicExecutionRole** from the populated list.
4. Choose **Attach policy**. You will then be redirected to the details page for the role.
5. Copy the **Role ARN** at the top of the **Summary**.


In [6]:
# paste the query_training_status-role role ARN from above
lambda_role = "arn:aws:iam::179199196742:role/query_training_status-role"

## Prepare the Dataset
This notebook uses a pretrained MobileNetV2 model to save training time.

We use Amazon S3 to store and manage our image data. In this experiment, we use only 1000 normal images and 1000 pneumonia images from the Kaggle dataset.

In [7]:
project_name = "pn_deploy"

train_prefix = "train"
val_prefix = "validation"

train_data = "s3://{}/{}/{}/".format(bucket, project_name, train_prefix)
validation_data = "s3://{}/{}/{}/".format(bucket, project_name, val_prefix)

### Download or Update Data from S3

In [8]:
def get_file_list(bucket_name, prefix):
    """Return the S3 URIs of all objects under `prefix` in `bucket_name`."""
    s3 = boto3.resource('s3')
    my_bucket = s3.Bucket(bucket_name)
    location_list = []
    for obj in my_bucket.objects.filter(Prefix=prefix):
        # Skip the placeholder object that represents the root folder itself
        if obj.key == prefix + "/":
            continue
        location_list.append("s3://{}/{}".format(obj.bucket_name, obj.key))
    return location_list

In [9]:
list_normal = get_file_list(bucket,"pn_deploy/normal_1000")
list_pneumonia = get_file_list(bucket,"pn_deploy/pneumonia_1000")
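
`get_file_list` returns full S3 URIs of the form `s3://<bucket>/<key>`. Later cells need the file name at the end of such a URI, so a small parsing helper can be handy. The sketch below uses only the standard library. `split_s3_uri` and `s3_file_name` are hypothetical helpers, the URI is made up, and keys are assumed not to contain `?` or `#`, which `urlparse` would treat as query or fragment separators.

```python
import posixpath
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split 's3://bucket/key' into (bucket, key)."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError("not an S3 URI: {!r}".format(s3_uri))
    return parsed.netloc, parsed.path.lstrip("/")


def s3_file_name(s3_uri):
    """Return the last path component of the object key."""
    _, key = split_s3_uri(s3_uri)
    return posixpath.basename(key)


# Made-up URI, for illustration only.
uri = "s3://example-bucket/pn_deploy/normal_1000/IM-0001.jpeg"
bucket_name, key = split_s3_uri(uri)
file_name = s3_file_name(uri)
print(bucket_name, key, file_name)
```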

In [15]:
# download data
for l in list_normal:
    data_source1 = S3Downloader.download(
        local_path="/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/data/normal/",
        s3_uri=l,
        sagemaker_session=session,
    )

for l in list_pneumonia:
    data_source1 = S3Downloader.download(
        local_path="/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/data/pneumonia/",
        s3_uri=l,
        sagemaker_session=session,
    )


### Generate annotations

In [20]:
# generate annotations
import os

base_dir = '/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia'
# exist_ok avoids an error when the cell is run a second time
os.makedirs(os.path.join(base_dir, 'annotations'), exist_ok=True)

# Write one "<image name without extension> <class>" line per image
for class_name in ['normal', 'pneumonia']:
    file_names = os.listdir(os.path.join(base_dir, 'data', class_name))
    with open(os.path.join(base_dir, 'annotations', class_name + '.txt'), 'w') as f:
        for file_name in file_names:
            # splitext also handles file names that contain more than one dot
            name, _ = os.path.splitext(file_name)
            f.write(name + " " + class_name + "\n")

In [21]:
# read annotations
def get_annotations(file_path, annotations=None):
    # Use None instead of a mutable default so separate calls never share one dict
    if annotations is None:
        annotations = {}

    with open(file_path, 'r') as f:
        rows = f.read().splitlines()

    for row in rows:
        image_name, class_name = row.split(' ')
        image_name = image_name + '.jpeg'

        annotations[image_name] = class_name

    return annotations
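
The annotation files hold one line per image: the image name without its extension, a space, and the class name. The following sketch shows that format as a round trip through a temporary file. `write_annotations` and `read_annotations` are hypothetical helpers and the image names are made up.

```python
import os
import tempfile


def write_annotations(file_path, names, class_name):
    """Write one '<name> <class>' line per image name."""
    with open(file_path, "w") as f:
        for name in names:
            f.write("{} {}\n".format(name, class_name))


def read_annotations(file_path, extension=".jpeg"):
    """Read the file back into a {file name: class name} dict."""
    annotations = {}
    with open(file_path, "r") as f:
        for row in f.read().splitlines():
            # Split on the last space so a name containing spaces stays intact
            image_name, class_name = row.rsplit(" ", 1)
            annotations[image_name + extension] = class_name
    return annotations


with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "normal.txt")
    write_annotations(path, ["IM-0001", "IM-0002"], "normal")  # made-up names
    parsed = read_annotations(path)

print(parsed)
```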

In [23]:
# read annotations
annotations_normal={}
annotations_pneumonia={}
annotations_normal = get_annotations('/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/annotations/normal.txt',annotations_normal)
annotations_pneumonia = get_annotations('/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/annotations/pneumonia.txt',annotations_pneumonia)

total_count = len(annotations_normal.keys())
print('Total normal examples', total_count)
total_count = len(annotations_pneumonia.keys())
print('Total pneumonia examples', total_count)

Total normal examples 1000
Total pneumonia examples 1000


In [24]:
print(next(iter(annotations_normal.items())))
print(next(iter(annotations_pneumonia.items())))

('NORMAL2-IM-0774-0001.jpeg', 'normal')
('person109_virus_203.jpeg', 'pneumonia')


### Split Data and Upload

In [25]:
# split and copy file
import os
classes = ['normal', 'pneumonia']
sets = ['train', 'validation']
root_dir = '/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/custom_data'

if not os.path.isdir(root_dir):
    os.mkdir(root_dir)
    
for set_name in sets:
    if not os.path.isdir(os.path.join(root_dir, set_name)):
        os.mkdir(os.path.join(root_dir, set_name))
    for class_name in classes:
        folder = os.path.join(root_dir, set_name, class_name)
        if not os.path.isdir(folder):
            os.mkdir(folder)

In [26]:
for image, class_name in annotations_normal.items():
    target_set = 'validation' if random.randint(0, 99) < 20 else 'train'
    target_path = os.path.join(root_dir, target_set, class_name, image)
    shutil.copy(os.path.join('/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/data/normal', image), target_path)

for image, class_name in annotations_pneumonia.items():
    target_set = 'validation' if random.randint(0, 99) < 20 else 'train'
    target_path = os.path.join(root_dir, target_set, class_name, image)
    shutil.copy(os.path.join('/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/data/pneumonia', image), target_path)
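
Because each image is assigned to a set by an independent random draw, the sizes of the two sets change from run to run. If a repeatable split is wanted, one option is a seeded generator. The sketch below is a variant of the cell above, not the notebook's own code: `assign_sets`, the seed value, and the file names are assumptions, and it draws from `random.Random(seed).random()` rather than `random.randint`.

```python
import random


def assign_sets(image_names, validation_fraction=0.2, seed=0):
    """Map each image name to 'train' or 'validation' using a seeded generator."""
    rng = random.Random(seed)  # private generator; the global random state is untouched
    assignment = {}
    for name in image_names:
        assignment[name] = "validation" if rng.random() < validation_fraction else "train"
    return assignment


names = ["image_{:04d}.jpeg".format(i) for i in range(1000)]  # made-up file names
first = assign_sets(names, seed=42)
second = assign_sets(names, seed=42)
counts = {s: sum(1 for v in first.values() if v == s) for s in ("train", "validation")}
print(counts)
```

Calling the function twice with the same seed gives the same assignment, so the copied files and any counts derived from them stay stable between runs.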

In [27]:
sets_counts = {
    'train': 0,
    'validation': 0
}

for set_name in sets:
    for class_name in classes:
        path = os.path.join(root_dir, set_name, class_name)
        count = len(os.listdir(path))
        print(path, 'has', count, 'images')
        sets_counts[set_name] += count

print(sets_counts)

/home/ec2-user/SageMaker/img_cls_exp/Medical Image/Pneumonia/custom_data/train/normal has 814 images
/home/ec2-user/SageMaker/img_cls_exp/Medical Image/Pneumonia/custom_data/train/pneumonia has 790 images
/home/ec2-user/SageMaker/img_cls_exp/Medical Image/Pneumonia/custom_data/validation/normal has 186 images
/home/ec2-user/SageMaker/img_cls_exp/Medical Image/Pneumonia/custom_data/validation/pneumonia has 210 images
{'train': 1604, 'validation': 396}


In [29]:
print('Uploading to S3..')
s3_data_path = session.upload_data(path=root_dir, bucket=bucket, key_prefix='pn_deploy')

print('Uploaded to', s3_data_path)

Uploading to S3..
Uploaded to s3://sagemaker-us-east-2-179199196742/pn_deploy


## Prepare Training Script

### Create Model

In [30]:
%%writefile train.py

import tensorflow as tf
import argparse
import os
import json

def create_model():
    model = tf.keras.models.Sequential([
        tf.keras.applications.mobilenet_v2.MobileNetV2(include_top=False, weights='imagenet',
                                                       pooling='avg', input_shape=(224, 224, 3)),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    
    model.layers[0].trainable = True
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model

Writing train.py


### Data Generators

In [31]:
%%writefile -a train.py

def create_data_generators(root_dir, batch_size):
    train_data_generator = tf.keras.preprocessing.image.ImageDataGenerator(
        preprocessing_function=tf.keras.applications.mobilenet_v2.preprocess_input,
        horizontal_flip=True,
        zoom_range=[0.8, 1.2],
        rotation_range=20
    ).flow_from_directory(
        os.path.join(root_dir, 'train'),
        target_size=(224, 224),
        batch_size=batch_size,
        class_mode='binary'
    )
    
    val_data_generator = tf.keras.preprocessing.image.ImageDataGenerator(
        preprocessing_function=tf.keras.applications.mobilenet_v2.preprocess_input
    ).flow_from_directory(
        os.path.join(root_dir, 'validation'),
        target_size=(224, 224),
        batch_size=batch_size,
        class_mode='binary'
    )
    
    return train_data_generator, val_data_generator

Appending to train.py
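
The `preprocessing_function` passed to both generators rescales pixel intensities from the range [0, 255] to [-1, 1], which is the input range MobileNetV2 expects. The sketch below shows that per-pixel arithmetic in plain Python. It is not the Keras implementation, and `scale_pixel` is a hypothetical helper.

```python
def scale_pixel(value):
    """Map a pixel intensity in [0, 255] to [-1.0, 1.0]."""
    return value / 127.5 - 1.0


for v in (0, 127.5, 255):
    print(v, "->", scale_pixel(v))
```

The same preprocessing has to be applied at prediction time, which is why `get_pred` later in this notebook calls `preprocess_input` before sending an image to the endpoint.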


### Putting it Together

In [32]:
%%writefile -a train.py

if __name__ =='__main__':

    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script.
    parser.add_argument('--epochs', type=int, default=3)
    parser.add_argument('--batch_size', type=int, default=16)
    parser.add_argument('--steps', type=int, default=int(1614/16))
    parser.add_argument('--val_steps', type=int, default=int(386/16))

    # input data and model directories
    parser.add_argument('--model-dir', type=str)
    parser.add_argument('--sm-model-dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
    parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAINING'))

    args, _ = parser.parse_known_args()

    local_output_dir = args.sm_model_dir
    local_root_dir = args.train
    batch_size = args.batch_size
    
    model = create_model()
    train_gen, val_gen = create_data_generators(local_root_dir, batch_size)
    
    _ = model.fit(
        train_gen,
        epochs=args.epochs,
        steps_per_epoch=args.steps,
        validation_data=val_gen,
        validation_steps=args.val_steps
    )
    
    model.save(os.path.join(local_output_dir, 'model', '1'))
    

Appending to train.py
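
The defaults for `--steps` and `--val_steps` hard-code image counts (1614 and 386) that differ from the counts printed after the split earlier (1604 and 396), because the split is random. For these particular numbers both pairs give 100 training steps and 24 validation steps at a batch size of 16, but a different split could change that. A minimal sketch of deriving the step counts from the actual set sizes follows. `full_batches` is a hypothetical helper.

```python
def full_batches(num_images, batch_size):
    """Number of complete batches that fit in one pass over the data."""
    return num_images // batch_size


batch_size = 16
train_steps = full_batches(1604, batch_size)  # training count printed after the split
val_steps = full_batches(396, batch_size)     # validation count printed after the split
print(train_steps, val_steps)
```

Values computed this way could then be supplied to the script through its `--steps` and `--val_steps` arguments instead of relying on the defaults.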


## Train with TensorFlow Estimator
Check [SageMaker endpoints and quotas](https://docs.aws.amazon.com/general/latest/gr/sagemaker.html#limits_sagemaker) for the limits that apply to SageMaker training and deployment in your account.

In [33]:
role = sagemaker.get_execution_role()
pets_estimator = TensorFlow(
    entry_point='train.py',
    role=role,
    instance_count=2,
    instance_type='ml.m5.4xlarge', # you can use any instance_type within your quotas
    framework_version='2.1.0',
    py_version='py3',
    output_path="s3://{}/{}".format(bucket, project_name),
)

In [34]:
# s3_data_path = 's3://sagemaker-us-east-2-179199196742/pn_deploy'
pets_estimator.fit(s3_data_path)

2021-06-04 17:55:33 Starting - Starting the training job...
2021-06-04 17:55:37 Starting - Launching requested ML instances
ProfilerReport-1622829333: InProgress
......
2021-06-04 17:56:50 Starting - Preparing the instances for training......
2021-06-04 17:57:50 Downloading - Downloading input data...
2021-06-04 17:58:30 Training - Downloading the training image..
2021-06-04 17:58:46,273 sagemaker-containers INFO     Imported framework sagemaker_tensorflow_container.training
2021-06-04 17:58:46,279 sagemaker-containers INFO     No GPUs detected (normal if no gpus installed)
2021-06-04 17:58:46,676 sagemaker-containers INFO     No GPUs detected (normal if no gpus installed)
2021-06-04 17:58:46,690 sagemaker-containers INFO     No GPUs detected (normal if no gpus installed)
2021-06-04 17:58:46,704 sagemaker-containers INFO     No GPUs detected (normal if no gpus installed)
2021-06-04 17:58:46,713 sagemaker-containers INFO     Invoking user 

## Deploy TensorFlow Model

In [35]:
pets_predictor = pets_estimator.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge')
print('\nModel Deployed!')

-------------!
Model Deployed!


## Test Predictions

In [59]:
import os
list_normal_test = get_file_list(bucket, "pn_deploy/test/normal")
list_pneumonia_test = get_file_list(bucket, "pn_deploy/test/pneumonia")
test_dir = '/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/test/'
os.makedirs(test_dir, exist_ok=True)
test = list_normal_test[0]
data_source = S3Downloader.download(local_path=test_dir, s3_uri=test)
# Take the file name from the S3 URI instead of relying on a fixed character offset
image_path = os.path.join(test_dir, os.path.basename(test))
image_path

'/home/ec2-user/SageMaker/img_cls_exp/Medical_Image/Pneumonia/test/NORMAL2-IM-1436-0001.jpeg'

In [60]:
import tensorflow as tf
import numpy as np
def get_pred(image_path):
    img = tf.keras.preprocessing.image.load_img(image_path, target_size=(224, 224))
    img = tf.keras.preprocessing.image.img_to_array(img)
    img = tf.keras.applications.mobilenet_v2.preprocess_input(img)
    img = np.expand_dims(img, axis=0)

    results = pets_predictor.predict(img)
    class_id = int(np.squeeze(results['predictions']) > 0.5)
    return classes[class_id]

In [61]:
get_pred(image_path)

'normal'
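
`get_pred` turns the endpoint's sigmoid output into a label by thresholding at 0.5. With `class_mode='binary'`, `flow_from_directory` assigns class indices in alphabetical order of the folder names, so index 0 is `normal` and index 1 is `pneumonia`. The sketch below isolates that mapping. `label_from_score` is a hypothetical helper and the scores are made up.

```python
def label_from_score(score, class_names, threshold=0.5):
    """Return the class name for a sigmoid score."""
    class_id = int(score > threshold)  # False -> 0, True -> 1
    return class_names[class_id]


class_names = ["normal", "pneumonia"]  # alphabetical, matching the generator's indices
print(label_from_score(0.12, class_names))
print(label_from_score(0.87, class_names))
```

Raising or lowering `threshold` trades false positives against false negatives, which may matter more than raw accuracy for a screening task like this one.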

## Prediction Task
We create a `task` folder in the S3 bucket. Running the following cells makes predictions for the images in it and sends the output back to that folder.

In [80]:
# task link and list
list_task = get_file_list(bucket, "pn_deploy/task/data")

In [81]:
for l in list_task:
    data_source = S3Downloader.download(
        local_path='/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/task/data/',
        s3_uri=l,
    )

In [82]:
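task_dir = '/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/task/data/'
image_path = []
for l in list_task:
    # Build the local path from the object's file name instead of a fixed character offset
    image_path.append(os.path.join(task_dir, os.path.basename(l)))
image_path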

['/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/task/data/IM-0115-0001.jpeg',
 '/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/task/data/IM-0117-0001.jpeg',
 '/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/task/data/IM-0119-0001.jpeg',
 '/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/task/data/IM-0122-0001.jpeg',
 '/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/task/data/IM-0125-0001.jpeg',
 '/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/task/data/IM-0127-0001.jpeg',
 '/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/task/data/IM-0128-0001.jpeg',
 '/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/task/data/IM-0129-0001.jpeg',
 '/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/task/data/IM-0131-0001.jpeg',
 '/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/task/data/IM-0133-0001.jpeg',
 '/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/task/data/IM-0135-

In [84]:
with open('/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/task/prediction_output.txt', 'w') as f:
    for i in image_path:
        # Write "<file name> <predicted class>", one image per line
        f.write(os.path.basename(i) + " " + get_pred(i) + "\n")

In [85]:
print('Uploading to S3..')
s3_data_path = session.upload_data(path='/home/ec2-user/SageMaker/img_cls_exp/MedicalImage/Pneumonia/task/prediction_output.txt', bucket=bucket, key_prefix='pn_deploy/task/pred_output')
print('Uploaded to', s3_data_path)

Uploading to S3..
Uploaded to s3://sagemaker-us-east-2-179199196742/pn_deploy/task/pred_output/prediction_output.txt


## Create Resources

The remainder of this notebook creates an automatic retraining workflow based on the AWS Step Functions Data Science SDK. In the following steps we'll create the Glue job and Lambda function that are called from the Step Functions workflow.

### Create the AWS Glue Job

In [105]:
glue_script_location = S3Uploader.upload(
    local_path="./code/glue_etl.py",
    desired_s3_uri="s3://{}/{}".format(bucket, project_name),
    sagemaker_session=session,
)
glue_client = boto3.client("glue")

response = glue_client.create_job(
    Name=job_name,
    Description="Data processing",
    Role=glue_role,  # you can pass your existing AWS Glue role here if you have used Glue before
    ExecutionProperty={"MaxConcurrentRuns": 2},
    Command={"Name": "glueetl", "ScriptLocation": glue_script_location, "PythonVersion": "3"},
    DefaultArguments={"--job-language": "python"},
    GlueVersion="1.0",
    WorkerType="Standard",
    NumberOfWorkers=2,
    Timeout=60,
)

### Create the AWS Lambda Function

In [106]:
import zipfile

zip_name = "query_training_status.zip"
lambda_source_code = "./code/query_training_status.py"

# The context manager closes the archive even if writing fails
with zipfile.ZipFile(zip_name, mode="w") as zf:
    zf.write(lambda_source_code, arcname=lambda_source_code.split("/")[-1])


S3Uploader.upload(
    local_path=zip_name,
    desired_s3_uri="s3://{}/{}".format(bucket, project_name),
    sagemaker_session=session,
)

's3://sagemaker-us-east-2-179199196742/ml_deploy/query_training_status.zip'
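
The `arcname` argument matters here: a handler string of the form `module.function` is resolved against the root of the deployment package, so the source file has to sit at the top level of the archive rather than under `code/`. The sketch below checks this locally with a temporary stand-in file. The stub handler body is an assumption, not the notebook's actual Lambda code.

```python
import os
import tempfile
import zipfile

with tempfile.TemporaryDirectory() as tmp_dir:
    # Stand-in for ./code/query_training_status.py
    code_dir = os.path.join(tmp_dir, "code")
    os.makedirs(code_dir)
    source_path = os.path.join(code_dir, "query_training_status.py")
    with open(source_path, "w") as f:
        f.write("def lambda_handler(event, context):\n    return {}\n")

    zip_path = os.path.join(tmp_dir, "query_training_status.zip")
    with zipfile.ZipFile(zip_path, mode="w") as zf:
        # arcname drops the 'code/' directory so the module sits at the archive root
        zf.write(source_path, arcname=os.path.basename(source_path))

    with zipfile.ZipFile(zip_path) as zf:
        archive_members = zf.namelist()

print(archive_members)
```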

In [107]:
lambda_client = boto3.client("lambda")

response = lambda_client.create_function(
    FunctionName=function_name,
    Runtime="python3.7",
    Role=lambda_role,
    Handler="query_training_status.lambda_handler",
    Code={"S3Bucket": bucket, "S3Key": "{}/{}".format(project_name, zip_name)},
    Description="Queries a SageMaker training job and returns the results.",
    Timeout=15,
    MemorySize=128,
)
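
The source of `./code/query_training_status.py` is not shown in this notebook. As a rough idea of what such a function might contain, the sketch below looks up a training job and returns its status and final metrics. Everything in it is an assumption: the `TrainingJobName` event key, the shape of the return value, and the optional `sm_client` parameter, which is added only so the handler can be exercised locally with a stand-in client.

```python
def lambda_handler(event, context, sm_client=None):
    """Hypothetical handler: look up a training job and return its final metrics."""
    if sm_client is None:
        import boto3  # imported lazily so the sketch can run without AWS access

        sm_client = boto3.client("sagemaker")

    # "TrainingJobName" as the event key is an assumption of this sketch
    response = sm_client.describe_training_job(TrainingJobName=event["TrainingJobName"])
    metrics = {
        m["MetricName"]: m["Value"] for m in response.get("FinalMetricDataList", [])
    }
    return {
        "statusCode": 200,
        "trainingStatus": response["TrainingJobStatus"],
        "trainingMetrics": metrics,
    }


class FakeSageMakerClient:
    """Stand-in client returning a canned, made-up response for a local dry run."""

    def describe_training_job(self, TrainingJobName):
        return {
            "TrainingJobStatus": "Completed",
            "FinalMetricDataList": [{"MetricName": "validation:error", "Value": 0.1}],
        }


result = lambda_handler({"TrainingJobName": "example-job"}, None, sm_client=FakeSageMakerClient())
print(result)
```

A later Choice state could branch on a value such as `trainingMetrics` to decide whether the new model is deployed.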

### Configure the AWS SageMaker Estimator

In [108]:
container = sagemaker.image_uris.retrieve("xgboost", region, "1.2-1")

xgb = sagemaker.estimator.Estimator(
    container,
    sagemaker_execution_role,
    instance_count=1,
    instance_type="ml.m4.xlarge",
    output_path="s3://{}/{}/output".format(bucket, project_name),
)

xgb.set_hyperparameters(
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.8,
    objective="binary:logistic",
    eval_metric="error",
    num_round=100,
)

In [None]:
#macy
pets_estimator = TensorFlow(
    entry_point='train_std.py',
    role=role,
    instance_count=1,
    instance_type='ml.m5.4xlarge',
    framework_version='2.1.0',
    py_version='py3',
    output_path='s3://pets-std/'
)


## Build a Machine Learning Workflow

You can use a state machine workflow to create a model retraining pipeline. The AWS Data Science Workflows SDK provides several AWS SageMaker workflow steps that you can use to construct an ML pipeline. In this tutorial you will create the following steps:

* [**ETLStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.GlueStartJobRunStep) - Starts an AWS Glue job to extract the latest data from our source database and prepare our data.
* [**TrainingStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep) - Creates the training step and passes the defined estimator.
* [**ModelStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.ModelStep) - Creates a model in SageMaker using the artifacts created during the TrainingStep.
* [**LambdaStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.LambdaStep) - Creates the task state step within our workflow that calls a Lambda function.
* [**ChoiceStateStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Choice) - Creates the choice state step within our workflow.
* [**EndpointConfigStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.EndpointConfigStep) - Creates the endpoint config step to define the new configuration for our endpoint.
* [**EndpointStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.EndpointStep) - Creates the endpoint step to update our model endpoint.
* [**FailStateStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Fail) - Creates the fail state step within our workflow.
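
Step Functions state machines are described in Amazon States Language, a JSON document with a `StartAt` field and a `States` map in which every state either names its `Next` state or sets `End`. To make the ordering of the steps above concrete, the sketch below builds a linear skeleton using `Pass` states as placeholders. It leaves out the Choice and Fail branches and is not the definition the SDK generates. `chain_pass_states` is a hypothetical helper, and every state name after the first two is an assumption.

```python
import json


def chain_pass_states(state_names):
    """Build a linear Amazon States Language definition from Pass placeholders."""
    states = {}
    for index, name in enumerate(state_names):
        state = {"Type": "Pass"}
        if index + 1 < len(state_names):
            state["Next"] = state_names[index + 1]
        else:
            state["End"] = True
        states[name] = state
    return {"StartAt": state_names[0], "States": states}


# The first two names appear in this notebook; the remaining ones are placeholders.
definition = chain_pass_states([
    "Extract, Transform, Load",
    "Model Training",
    "Save Model",
    "Query Training Results",
    "Create Endpoint Config",
    "Update Endpoint",
])
print(json.dumps(definition, indent=2))
```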

In [109]:
# SageMaker expects unique names for each job, model and endpoint.
# If these names are not unique the execution will fail.
execution_input = ExecutionInput(
    schema={
        "TrainingJobName": str,
        "GlueJobName": str,
        "ModelName": str,
        "EndpointName": str,
        "LambdaFunctionName": str,
    }
)
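
The schema only declares which keys an execution must supply. The values are provided when the workflow is executed. Since SageMaker rejects duplicate job and model names, one approach is to generate them per execution. The sketch below builds such a dictionary. `new_execution_inputs`, the name prefixes, and the two placeholder names are assumptions. In the notebook, the Glue job and Lambda function names would come from `job_name` and `function_name`.

```python
import uuid

# Placeholder values; in the notebook these come from `job_name` and `function_name`.
glue_job_name = "example-glue-job"
lambda_function_name = "example-lambda-function"


def new_execution_inputs():
    """Return inputs matching the schema, with fresh names for each execution."""
    suffix = uuid.uuid4().hex
    return {
        "TrainingJobName": "training-{}".format(suffix),  # hypothetical prefix
        "GlueJobName": glue_job_name,
        "ModelName": "model-{}".format(suffix),            # hypothetical prefix
        "EndpointName": "endpoint-{}".format(suffix),      # hypothetical prefix
        "LambdaFunctionName": lambda_function_name,
    }


inputs_a = new_execution_inputs()
inputs_b = new_execution_inputs()
print(sorted(inputs_a))
```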

### Create an ETL step with AWS Glue
In the following cell, we create a Glue step that runs an AWS Glue job. The Glue job extracts the latest data from our source database, removes unnecessary columns, splits the data into training and validation sets, and saves the data to CSV format in S3. Glue performs this extraction, transformation, and load (ETL) in a serverless fashion, so there are no compute resources to configure and manage. See the [GlueStartJobRunStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.GlueStartJobRunStep) Compute step in the AWS Step Functions Data Science SDK documentation.

In [110]:
train_prefix = "train"
val_prefix = "validation"

etl_step = steps.GlueStartJobRunStep(
    "Extract, Transform, Load",
    parameters={
        "JobName": execution_input["GlueJobName"],
        "Arguments": {
            "--S3_SOURCE": data_source,  # raw data uploaded to S3
            "--S3_DEST": "s3a://{}/{}/".format(bucket, project_name),
            "--TRAIN_KEY": train_prefix + "/",
            "--VAL_KEY": val_prefix + "/",
        },
    },
)
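For orientation, the sketch below shows how these job arguments could combine into output locations inside the Glue script. This is an assumption about the script, which is not shown in this section: `output_locations` is a hypothetical helper, and the bucket and project names are placeholders.

```python
def output_locations(arguments):
    """Hypothetical helper: join the destination prefix with each split's key."""
    dest = arguments["--S3_DEST"]
    if not dest.endswith("/"):
        dest += "/"
    return {
        "train": dest + arguments["--TRAIN_KEY"],
        "validation": dest + arguments["--VAL_KEY"],
    }


# Placeholder values with the same shape as the Arguments passed to the Glue step.
example_arguments = {
    "--S3_SOURCE": "s3://example-bucket/example-project/raw/",
    "--S3_DEST": "s3a://{}/{}/".format("example-bucket", "example-project"),
    "--TRAIN_KEY": "train" + "/",
    "--VAL_KEY": "validation" + "/",
}

locations = output_locations(example_arguments)
print(locations["train"])
print(locations["validation"])
```

If the script follows this layout, the training and validation inputs of the next step would need to point at the same two prefixes.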

### Create a SageMaker Training Step 

In the following cell, we create the training step and pass the estimator we defined above. See  [TrainingStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep) in the AWS Step Functions Data Science SDK documentation to learn more.

In [111]:
training_step = steps.TrainingStep(
    "Model Training",  # name displayed for this step in the workflow graph
    estimator=xgb,
    data={
        "train": TrainingInput(train_data, content_type="text/csv"),
        "validation": TrainingInput(validation_data, content_type="text/csv"),
    },
    job_name=execution_input["TrainingJobName"],
    wait_for_completion=True,
)

### Create a Model Step 

In the following cell, we define a model step that will create a model in Amazon SageMaker using the artifacts created during the TrainingStep. See  [ModelStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.ModelStep) in the AWS Step Functions Data Science SDK documentation to learn more.

The model creation step typically follows the training step. The Step Functions SDK provides the [get_expected_model](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep.get_expected_model) method in the TrainingStep class to provide a reference for the trained model artifacts. Please note that this method is only useful when the ModelStep directly follows the TrainingStep.

In [112]:
model_step = steps.ModelStep(
    "Save Model",
    model=training_step.get_expected_model(),
    model_name=execution_input["ModelName"],
    result_path="$.ModelStepResults",
)

### Create a Lambda Step
In the following cell, we define a Lambda step that will invoke the previously created Lambda function as part of our Step Functions workflow. See [LambdaStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.LambdaStep) in the AWS Step Functions Data Science SDK documentation to learn more.

In [113]:
lambda_step = steps.compute.LambdaStep(
    "Query Training Results",
    parameters={
        "FunctionName": execution_input["LambdaFunctionName"],
        "Payload": {"TrainingJobName.$": "$.TrainingJobName"},
    },
)
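The `"TrainingJobName.$": "$.TrainingJobName"` entry relies on a Step Functions convention: a parameter key ending in `.$` is resolved as a path into the state's input, and the suffix is dropped from the resulting key. Here the path presumably picks up the job name carried forward from the training step's output, since the model step writes its own result under `$.ModelStepResults`. The toy resolver below illustrates the convention for simple dotted paths only; `resolve_parameters` is a hypothetical helper, not the SDK or service implementation, and the job name is a placeholder.

```python
def resolve_parameters(parameters, state_input):
    """Toy illustration of Step Functions parameter resolution (dotted paths only)."""
    resolved = {}
    for key, value in parameters.items():
        if isinstance(value, dict):
            # Nested blocks are resolved recursively.
            resolved[key] = resolve_parameters(value, state_input)
        elif key.endswith(".$"):
            # "$.a.b" selects state_input["a"]["b"]; "$" selects the whole input.
            node = state_input
            for part in value.lstrip("$").strip(".").split("."):
                if part:
                    node = node[part]
            resolved[key[:-2]] = node
        else:
            # Keys without the ".$" suffix are passed through as literals.
            resolved[key] = value
    return resolved


state_input = {"TrainingJobName": "example-training-job"}
payload = resolve_parameters(
    {"Payload": {"TrainingJobName.$": "$.TrainingJobName"}}, state_input
)
print(payload)
```

The Lambda function therefore receives an event with a plain `TrainingJobName` key, without the `.$` suffix.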

### Create a Choice State Step 
In the following cell, we create a choice step in order to build a dynamic workflow. This choice step branches based on the results of our SageMaker training step: is the model accurate enough for the endpoint to be updated, or should the workflow fail? We will add specific rules to this choice step later, in the Add Rules to Choice State section of this notebook.

In [114]:
check_accuracy_step = steps.states.Choice("Accuracy > 90%")

### Create an Endpoint Configuration Step
In the following cell we create an endpoint configuration step. See [EndpointConfigStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.EndpointConfigStep) in the AWS Step Functions Data Science SDK documentation to learn more.

In [115]:
endpoint_config_step = steps.EndpointConfigStep(
    "Create Model Endpoint Config",
    endpoint_config_name=execution_input["ModelName"],
    model_name=execution_input["ModelName"],
    initial_instance_count=1,
    instance_type="ml.m4.xlarge",
)

### Update the Model Endpoint Step
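In the following cell, we create the endpoint step, which deploys the new model as a managed API endpoint if the accuracy check in our choice state passes. With `update=False` the step creates a new SageMaker endpoint; set `update=True` to update an existing endpoint instead.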

In [116]:
endpoint_step = steps.EndpointStep(
    "Update Model Endpoint",
    endpoint_name=execution_input["EndpointName"],
    endpoint_config_name=execution_input["ModelName"],
    update=False,
)

### Create the Fail State Step
In addition, we create a Fail step which proceeds from our choice state if the validation accuracy of our model is lower than the threshold we define. See [FailStateStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Fail) in the AWS Step Functions Data Science SDK documentation to learn more.

In [117]:
fail_step = steps.states.Fail(
    "Model Accuracy Too Low", comment="Validation accuracy lower than threshold"
)

### Add Rules to Choice State
In the cell below, we add a threshold rule to our choice state. If the validation error of our model is below 0.10 (an accuracy above 90%), we move on to the endpoint configuration step, followed by the endpoint step. Otherwise, we move to the Fail state. See [here](https://github.com/dmlc/xgboost/blob/master/doc/parameter.rst) for more information on how XGBoost calculates classification error.

For binary classification problems the XGBoost algorithm defines the model error as: 

\begin{equation*}
\frac{incorrect\:predictions}{total\:number\:of\:predictions}
\end{equation*}

To achieve an accuracy above 90%, we need an error below 0.10.

In [118]:
threshold_rule = steps.choice_rule.ChoiceRule.NumericLessThan(
    variable=lambda_step.output()["Payload"]["trainingMetrics"][0]["Value"], value=0.1
)

check_accuracy_step.add_choice(rule=threshold_rule, next_step=endpoint_config_step)
check_accuracy_step.default_choice(next_step=fail_step)
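The branching can be mirrored in plain Python to check how a given error value would be routed. This is a sketch under stated assumptions: `next_state` and `sample_output` are hypothetical helpers, the metric name and values are made up, and the first entry of `trainingMetrics` is assumed to be the validation error, since the rule reads index 0 regardless of the metric's name.

```python
ERROR_THRESHOLD = 0.1  # same value as the NumericLessThan rule


def next_state(lambda_output, threshold=ERROR_THRESHOLD):
    """Mirror the choice rule in plain Python: strictly less than, else default."""
    error = lambda_output["Payload"]["trainingMetrics"][0]["Value"]
    if error < threshold:
        return "Create Model Endpoint Config"
    return "Model Accuracy Too Low"


def sample_output(error):
    """Build a made-up Lambda output with the shape the rule expects."""
    return {
        "Payload": {
            "trainingMetrics": [{"MetricName": "validation:error", "Value": error}]
        }
    }


for error in (0.05, 0.1, 0.25):
    state = next_state(sample_output(error))
    print("error={:.2f} accuracy={:.0%} -> {}".format(error, 1 - error, state))
```

Because the comparison is strict, an error of exactly 0.1 falls through to the default choice and the workflow fails.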

### Link all the Steps Together
Finally, create your workflow definition by chaining together all of the steps we've created. See [Chain](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.states.Chain) in the AWS Step Functions Data Science SDK documentation to learn more.

In [119]:
endpoint_config_step.next(endpoint_step)

Update Model Endpoint EndpointStep(resource='arn:aws:states:::sagemaker:createEndpoint', parameters={'EndpointConfigName': <stepfunctions.inputs.placeholders.ExecutionInput object at 0x7f32ac88d5f8>, 'EndpointName': <stepfunctions.inputs.placeholders.ExecutionInput object at 0x7f32ac88d828>}, type='Task')

In [120]:
workflow_definition = steps.Chain(
    [etl_step, training_step, model_step, lambda_step, check_accuracy_step]
)

## Run the Workflow
Create your workflow using the workflow definition above, and render the graph with [render_graph](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.render_graph):

In [121]:
workflow = Workflow(
    name="MyInferenceRoutine_{}".format(id),
    definition=workflow_definition,
    role=workflow_execution_role,
    execution_input=execution_input,
)

In [122]:
workflow.render_graph()

Create the workflow in AWS Step Functions with [create](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.create):

In [123]:
workflow.create()
# workflow.update()

[INFO] Workflow created successfully on AWS Step Functions.


'arn:aws:states:us-east-2:179199196742:stateMachine:MyInferenceRoutine_ad1448b45e4d41df91786f4acc47f8ce'

Run the workflow with [execute](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.execute):

In [124]:
execution = workflow.execute(
    inputs={
        "TrainingJobName": "regression-{}".format(id),  # Each SageMaker job requires a unique name
        "GlueJobName": job_name,
        "ModelName": "CustomerChurn-{}".format(id),  # Each model requires a unique name
        "EndpointName": "CustomerChurn",  # Each endpoint requires a unique name
        "LambdaFunctionName": function_name,
    }
)

[INFO] Workflow execution started successfully on AWS Step Functions.


Render workflow progress with [render_progress](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.render_progress). This generates a snapshot of the current state of your workflow as it executes. Because the snapshot is a static image, you must run the cell again to check progress:

In [125]:
execution.render_progress()
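Instead of re-running the cell by hand, the execution status can be polled until it reaches a terminal state. The helper below is a minimal sketch: `wait_for_execution` is a hypothetical function, the polling interval and limit are arbitrary, and the commented usage assumes that `execution.describe()` returns a dictionary with a `status` key, as the Step Functions DescribeExecution API does.

```python
import time

# Terminal statuses reported by Step Functions for an execution.
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}


def wait_for_execution(get_status, poll_seconds=30, max_polls=120, sleep=time.sleep):
    """Call get_status() until it returns a terminal status or polls run out."""
    status = get_status()
    polls = 1
    while status not in TERMINAL_STATUSES and polls < max_polls:
        sleep(poll_seconds)
        status = get_status()
        polls += 1
    return status


# Assumed usage in this notebook (requires a started execution):
# final_status = wait_for_execution(lambda: execution.describe()["status"])
# execution.render_progress()
```

Passing the status lookup as a callable keeps the helper independent of the SDK, so it can be tried out with a stub before pointing it at a real execution.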

Use [list_events](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.list_events) to list all events in the workflow execution:

In [107]:
execution.list_events(html=True)

Use [list_executions](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.list_executions) to list all executions for a specific workflow:

In [39]:
workflow.list_executions(html=True)

| Name | Status | Started | End Time |
| --- | --- | --- | --- |
| b484d36f-0278-4470-812b-514b7c07c539 | SUCCEEDED | Jun 02, 2021 05:59:00.617 AM | Jun 02, 2021 06:05:24.830 AM |


Use [list_workflows](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.list_workflows) to list all workflows in your AWS account:

In [40]:
Workflow.list_workflows(html=True)

| Name | Creation Date |
| --- | --- |
| MyInferenceRoutine_23bb4bd880454ab8a2fb576c7bf3d081 | Jun 02, 2021 05:58:54.478 AM |


## Clean Up
When you are done, clean up your AWS account by deleting the resources you won't be reusing. Uncomment the code below and run the cell to delete the Lambda function, the Glue job, and the Step Functions workflow.

In [None]:
# lambda_client.delete_function(FunctionName=function_name)
# glue_client.delete_job(JobName=job_name)
# workflow.delete()

---