# Amazon SageMaker と AWS Step Functions を使った機械学習ワークフローの構築

本ハンズオンでは [AWS Step Functions Data Science SDK](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/) を使ったワークフローの作成と管理を行います。Step Functions SDK は Amazon SageMaker と AWS Step Functions を用いて機械学習ワークフローの作成と実行を簡単に行うためのオープンソースのライブラリーです。

- [AWS Step Functions](https://aws.amazon.com/jp/step-functions/?nc1=h_ls)
- [AWS Step Functions Developer Guide](https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/welcome.html)
- [AWS Step Functions Data Science SDK](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/)

---
## コンテンツ

1. [環境のセットアップ](#1.環境のセットアップ)
1. [機械学習ワークフローの構築と実行](#2.機械学習ワークフローの構築と実行)
---

# 1.環境のセットアップ
ワークフローの構築を行う準備として、下記を行います。
1. SageMaker のロールへポリシーの追加
2. Step Functions の実行ロールの作成
3. ジュールの準備
4. データセットの準備
5. ワークフローの中で行うモデルの学習についての設定

## 1.1 SageMaker の実行ロールへポリシーの追加
Amazon SageMaker のノートブックインスタンスをお使いの場合には、そのノートブックでお使いになられる IAM ロールは Step Functions のワークフローを作成して実行するための権限が必要になります。この権限をロールに不要するため、下記を実施します。
1. [SageMaker のコンソール](https://console.aws.amazon.com/sagemaker/)を開きます。
2. 左側のペインで「ノートブックインスタンス」を選択し、使っているノートブックインスタンスを選びます。
3. 「Permissions and encryption」で「IAM ロール ARN」をクリックし、 IAM コンソールへ移動します。
4. 「ポリシーをアタッチします」を選択し、「AWSStepFunctionsFullAccess」を検索、アタッチします。

もしノートブックをローカル環境でお使いの場合には、SDK は AWS CLI の設定を使います。詳細は、[AWS CLI の設定](https://docs.aws.amazon.com/ja_jp/cli/latest/userguide/cli-chap-configure.html)をご確認下さい。

## 1.2 Step Functions の実行ロールの作成
Step Functions がワークフローを作成して、実行するための実行ロールを作成します。
1. [IAM のコンソール](https://console.aws.amazon.com/iam/)を開きます。
2. 左側のペインで「ロール」を選択し、「ロールを作成」をクリックします。
3. このロールを使用するサービスを選択にて、Step Functions を選びます。
4. 「次のステップ」で「確認」まで移動し。「StepFunctionsWorkflowExecutionRole」のような名前でロールを作成します。

Step Functions に対して、必要なポリシーをアタッチします。

1. ロールの中で「StepFunctionsWorkflowExecutionRole」を検索します。
2. 「アクセス権限」タブの「インラインポリシーの追加」の中の「JSON」へ下記の内容を記載します。
3. ポリシーへ名前をつけて作成し、概要画面でロール ARN をコピーします。

```JSON
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:CreateTransformJob",
                "sagemaker:DescribeTransformJob",
                "sagemaker:StopTransformJob",
                "sagemaker:CreateTrainingJob",
                "sagemaker:DescribeTrainingJob",
                "sagemaker:StopTrainingJob",
                "sagemaker:CreateHyperParameterTuningJob",
                "sagemaker:DescribeHyperParameterTuningJob",
                "sagemaker:StopHyperParameterTuningJob",
                "sagemaker:CreateModel",
                "sagemaker:CreateEndpointConfig",
                "sagemaker:CreateEndpoint",
                "sagemaker:DeleteEndpointConfig",
                "sagemaker:DeleteEndpoint",
                "sagemaker:UpdateEndpoint",
                "sagemaker:ListTags",
                "lambda:InvokeFunction",
                "sqs:SendMessage",
                "sns:Publish",
                "ecs:RunTask",
                "ecs:StopTask",
                "ecs:DescribeTasks",
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:UpdateItem",
                "dynamodb:DeleteItem",
                "batch:SubmitJob",
                "batch:DescribeJobs",
                "batch:TerminateJob",
                "glue:StartJobRun",
                "glue:GetJobRun",
                "glue:GetJobRuns",
                "glue:BatchStopJobRun"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "iam:PassRole"
            ],
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "iam:PassedToService": "sagemaker.amazonaws.com"
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "events:PutTargets",
                "events:PutRule",
                "events:DescribeRule"
            ],
            "Resource": [
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTrainingJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTransformJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTuningJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForECSTaskRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForBatchJobsRule"
            ]
        }
    ]
}
```

## 1.3 実行ロールの設定
SageMaker と Step Functions の実行ロールを設定します。 Step Functions へはコピーした StepFunctionsWorkflowExecutionRole のロール ARN をペーストします。

In [1]:
import sagemaker

# SageMaker の実行ロール
sagemaker_execution_role = sagemaker.get_execution_role()

# Step　Functions の実行ロール。StepFunctionsWorkflowExecutionRole ARN をペーストします。
workflow_execution_role = "<execution-role-arn>"

## 1.4 モジュールの準備

In [None]:
import boto3
import sagemaker
import time
import random
import uuid
import logging
import stepfunctions
import io
import random

from sagemaker.amazon.amazon_estimator import get_image_uri
from stepfunctions import steps
from stepfunctions.steps import TrainingStep, ModelStep, TransformStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow
from stepfunctions.template import TrainingPipeline
from stepfunctions.template.utils import replace_parameters_with_jsonpath

session = sagemaker.Session()
stepfunctions.set_stream_logger(level=logging.INFO)

region = boto3.Session().region_name
bucket = session.default_bucket()
prefix = 'sagemaker/DEMO-xgboost-regression'
bucket_path = 'https://s3-{}.amazonaws.com/{}'.format(region, bucket)

## 1.5 データセットの準備
データセットを学習、評価、テストへ分割し、Amazon S3 へアップロードします。

In [None]:
def data_split(FILE_DATA, FILE_TRAIN, FILE_VALIDATION, FILE_TEST, PERCENT_TRAIN, PERCENT_VALIDATION, PERCENT_TEST):
    data = [l for l in open(FILE_DATA, 'r')]
    train_file = open(FILE_TRAIN, 'w')
    valid_file = open(FILE_VALIDATION, 'w')
    tests_file = open(FILE_TEST, 'w')

    num_of_data = len(data)
    num_train = int((PERCENT_TRAIN/100.0)*num_of_data)
    num_valid = int((PERCENT_VALIDATION/100.0)*num_of_data)
    num_tests = int((PERCENT_TEST/100.0)*num_of_data)

    data_fractions = [num_train, num_valid, num_tests]
    split_data = [[],[],[]]

    rand_data_ind = 0

    for split_ind, fraction in enumerate(data_fractions):
        for i in range(fraction):
            rand_data_ind = random.randint(0, len(data)-1)
            split_data[split_ind].append(data[rand_data_ind])
            data.pop(rand_data_ind)

    for l in split_data[0]:
        train_file.write(l)

    for l in split_data[1]:
        valid_file.write(l)

    for l in split_data[2]:
        tests_file.write(l)

    train_file.close()
    valid_file.close()
    tests_file.close()

def write_to_s3(fobj, bucket, key):
    return boto3.Session(region_name=region).resource('s3').Bucket(bucket).Object(key).upload_fileobj(fobj)

def upload_to_s3(bucket, channel, filename):
    fobj=open(filename, 'rb')
    key = prefix+'/'+channel
    url = 's3://{}/{}/{}'.format(bucket, key, filename)
    print('Writing to {}'.format(url))
    write_to_s3(fobj, bucket, key)

本ハンズオンでは アワビの年齢を XGBoost を用いて学習、推論します。データは [UCI data repository](https://archive.ics.uci.edu/ml/datasets/abalone) にある [Abalone data](https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/regression.html) を使います。

In [None]:
try: #python3
    from urllib.request import urlretrieve
except: #python2
    from urllib import urlretrieve
    
# データセットのロード
FILE_DATA = 'abalone'
urlretrieve("https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/regression/abalone", FILE_DATA)

# データセットを train/test/validation へ分割
FILE_TRAIN = 'abalone.train'
FILE_VALIDATION = 'abalone.validation'
FILE_TEST = 'abalone.test'
PERCENT_TRAIN = 70
PERCENT_VALIDATION = 15
PERCENT_TEST = 15
data_split(FILE_DATA, FILE_TRAIN, FILE_VALIDATION, FILE_TEST, PERCENT_TRAIN, PERCENT_VALIDATION, PERCENT_TEST)

# S3 へアップロード
upload_to_s3(bucket, 'train', FILE_TRAIN)
upload_to_s3(bucket, 'validation', FILE_VALIDATION)
upload_to_s3(bucket, 'test', FILE_TEST)

train_s3_file = bucket_path + "/" + prefix + '/train'
validation_s3_file = bucket_path + "/" + prefix + '/validation'
test_s3_file = bucket_path + "/" + prefix + '/test'

## 1.6 ワークフローの中で行うモデルの学習についての設定

SageMaker の学習ジョブについての設定を行い estimator の作成を行います。

In [None]:
xgb = sagemaker.estimator.Estimator(
    get_image_uri(region, 'xgboost'),
    sagemaker_execution_role, 
    train_instance_count = 1, 
    train_instance_type = 'ml.m4.4xlarge',
    train_volume_size = 5,
    output_path = bucket_path + "/" + prefix + "/single-xgboost",
    sagemaker_session = session
)

xgb.set_hyperparameters(
    objective = 'reg:linear',
    num_round = 50,
    max_depth = 5,
    eta = 0.2,
    gamme = 4,
    min_child_weight = 6,
    subsample = 0.7,
    silent = 0
)

# 2.機械学習ワークフローの構築と実行
下記の手順で、下図のような機械学習ワークフローを構築します。

1. ワークフロー実行のための入力スキーマの定義
1. 学習ステップの作成
1. モデルステップの作成
1. バッチ変換ステップの作成
1. 推論エンドポイントの設定ステップの作成
1. エンドポイントへデプロイするステップの作成
1. 作成したステップをワークフローへ統合
1. ワークフローの実行


![e2e_pipeline.png](images/e2e_pipeline.png)

機械学習ワークフローを構築するために、AWS Data Science Workflows SDK　はいくつかの SageMaker の機能を Step として提供します。本ハンズオンでは Train と Transform の Step を使います。

- [**TrainingStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep) - SageMaker の学習ジョブを開始し、モデルアーティファクトを　S3 へ保存します。
- [**ModelStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.ModelStep) - S3 に学習済モデルをワークフローで活用するためにロードして作成します。
- [**TransformStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TransformStep) - SageMaker のバッチ変換ジョブを開始します。
- [**EndpointConfigStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.EndpointConfigStep) - SageMaker の推論エンドポイントの設定を行います。
- [**EndpointStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.EndpointStep) - 学習済のモデルを設定されたエンドポイントへデプロイします。

## 2.1 ワークフロー実行のための入力スキーマの定義

[**ExecutionInput**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/placeholders.html#stepfunctions.inputs.ExecutionInput) API はワークフローへ実行のための入力を定義します。ExecutionInput で定義された値はワークフローのそれぞれのステップで参照可能です。また、SDK の活用によって、ワークフローで受け渡される値へ動的な代入を行うことも可能です。

In [None]:
# SageMaker へは学習ジョブ、モデル、エンドポイントへそれぞれユニークな名前を用います。 
execution_input = ExecutionInput(schema={
    'JobName': str, 
    'ModelName': str,
    'EndpointName': str
})

## 2.2 学習ステップの作成
学習ステップを作成し、以前のセルで作成した estimator を渡します。詳細は [TrainingStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep) をご確認下さい。

In [None]:
training_step = steps.TrainingStep(
    'Train Step', 
    estimator=xgb,
    data={
        'train': sagemaker.s3_input(train_s3_file, content_type='libsvm'),
        'validation': sagemaker.s3_input(validation_s3_file, content_type='libsvm')
    },
    job_name=execution_input['JobName']  
)

## 2.3 モデルステップの作成 

次のセルで S3 へ保存された学習済モデルのアーティファクトを使ってモデルを準備する、モデルステップを作成します。 詳細は [ModelStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.ModelStep) をご確認下さい。

通常、モデルの作成は学習ステップの直後に行われます。 Step Functions SDK は学習済モデルを参照するために、TrainingStep クラスに対して [get_expected_model](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep.get_expected_model) メソッドを提供しています。 このメソッドは ModelStep が TrainingStep の直後にある場合にのみ使えることにご注意下さい。

In [None]:
model_step = steps.ModelStep(
    'Save model',
    model=training_step.get_expected_model(),
    model_name=execution_input['ModelName']  
)

## 2.4 バッチ変換ステップの作成

バッチ変換ステップの作成を行います。 詳細は [TransformStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TransformStep) をご確認下さい。

In [None]:
transform_step = steps.TransformStep(
    'Transform Input Dataset',
    transformer=xgb.transformer(
        instance_count=1,
        instance_type='ml.m5.large'
    ),
    job_name=execution_input['JobName'],     
    model_name=execution_input['ModelName'], 
    data=test_s3_file,
    content_type='text/libsvm'
)

## 2.5 推論エンドポイントの設定ステップの作成

次のセルでエンドポイントの設定を行うステップを作成します。詳細は [EndpointConfigStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.EndpointConfigStep) をご確認下さい。

In [None]:
endpoint_config_step = steps.EndpointConfigStep(
    "Create Endpoint Config",
    endpoint_config_name=execution_input['ModelName'],
    model_name=execution_input['ModelName'],
    initial_instance_count=1,
    instance_type='ml.m5.large'
)

## 2.6 エンドポイントへデプロイするステップの作成

学習済モデルを SageMaker のエンドポイントへデプロイします。詳細は [EndpointStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.EndpointStep) をご確認下さい。

In [None]:
endpoint_step = steps.EndpointStep(
    "Create Endpoint",
    endpoint_name=execution_input['EndpointName'],
    endpoint_config_name=execution_input['ModelName']
)

## 2.7 作成したステップをワークフローへ統合

これまで作成したステップをもとに、ワークフローを定義します。詳細は [Chain](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.states.Chain) をご確認下さい。

In [None]:
workflow_definition = steps.Chain([
    training_step,
    model_step,
    transform_step,
    endpoint_config_step,
    endpoint_step
])

## 2.8 ワークフローの実行

上記で定義したワークフローを実際に作成します。[render_graph](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.render_graph) メソッドで実際にワークフローをレンダリングすることができます。[create](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.create) メソッドで Step Functions へワークフローを構築します。

In [None]:
workflow = Workflow(
    name='MyTrainTransformDeploy_v1',
    definition=workflow_definition,
    role=workflow_execution_role,
    execution_input=execution_input
)

In [None]:
workflow.render_graph()

In [None]:
workflow.create()

[execute](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.execute) でワークフローを実行します。

In [None]:
execution = workflow.execute(
    inputs={
        'JobName': 'regression-{}'.format(uuid.uuid1().hex), # Each Sagemaker Job requires a unique name
        'ModelName': 'regression-{}'.format(uuid.uuid1().hex), # Each Model requires a unique name,
        'EndpointName': 'regression-{}'.format(uuid.uuid1().hex) # Each Endpoint requires a unique name,
    }
)

ワークフローの進捗を [render_progress](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.render_progress) で現在のスナップショットとして確認することができます。

In [None]:
execution.render_progress()

[list_events](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.list_events) メソッドを使うことでワークフローの実行中に起こったイベントをリストで表示することができます。

In [None]:
execution.list_events(html=True)

また、[list_executions](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.list_executions) メソッドを使うと、任意のワークフローのこれまでの実行をリストで表示することができます。

In [None]:
workflow.list_executions(html=True)

[list_workflows](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.list_workflows) メソッドを使うと、この AWS account で作成したワークフローを一覧で確認できます。

In [None]:
Workflow.list_workflows(html=True)