# AWS Step Functions を利用した学習・推論の自動化

AWS Step Functions は AWS の様々なサービスを利用してワークフローを実装するためのサービスです。Amazon SageMaker を AWS Step Functions から呼び出すために、以前は、Amazon SageMaker の API を操作する AWS Lambda を用意して、AWS Step Functins から実行する必要がありました。代表的な例は、AWS blog [AWS Step Functions を使用した Amazon SageMaker モデルの自動的で連続的なデプロイ](https://aws.amazon.com/jp/blogs/news/automated-and-continuous-deployment-of-amazon-sagemaker-models-with-aws-step-functions/) にも掲載しています。

AWS Lambda から Amazon SageMaker を実行する際、AWS Lambda の実行時間制限が課題になる場合があります。例えば、長時間必要な機械学習を AWS Lambda から実行して、その終了を待って次の処理を行う場合です。AWS Lambda には時間制限があるため、制限を超えた実行はエラーになってしまいます。これを回避するために、機械学習を実行しつつも、機械学習の終了を監視するような AWS Lambda を実行する必要がありました。

2018年の re:Invent において、Amazon SageMaker の以下の機能が、AWS Step Functions によってサポートされました [URL](https://aws.amazon.com/jp/about-aws/whats-new/2018/11/aws-step-functions-adds-eight-more-service-integrations/)。
- 学習の実行 (create-training-job)
- バッチ変換ジョブの実行 (create-tranform-job)

特に時間のかかる上記のジョブについて、AWS Step Functins で直接APIを操作し、その処理を待つことができるようになりました。このノートブックでは上記の機能を利用して、AWS Step Functions、AWS Lambda、 Amazon SageMaker を用いた簡単な ML のワークフローを作成してます。


## 目次
1. [作成するワークフロー](#作成するワークフロー)
2. [実行に必要なロール](#実行に必要なロール)
3. [ワークフロー作成に必要なファイル](#ワークフロー作成に必要なファイル)
4. [AWS Lambdaの準備](#lambda)
5. [AWS Step Functions の準備](#Step-Functions)
6. [Amazon SageMaker の準備](#sagemaker)
    1. [学習データ](#学習データ)
    2. [学習・推論スクリプト](#学習・推論スクリプト)
7. [AWS StepFunctions の実行](#exec-steps)

## 作成するワークフロー

今回は、[Chainer によって MNIST の分類を行うサンプル](https://github.com/aws-samples/amazon-sagemaker-examples-jp/tree/master/chainer_mnist)をベースにして、以下のようなワークフローを作成します。

<img src="./image/workflow.png" width= 600px>


1. StepFunctionsからSageMaker の学習ジョブを実行
2. 学習ジョブ完了後、S3にアップロードされるモデルファイルから、SageMakerのモデルを作成
3. SageMaker のモデルを利用して、S3にあるMNISTのデータに対してバッチ変換ジョブを実行

このうち1と3は直接AWS Step Functionsから実行することができますが、2は実行できないためAWS Lambda経由で実行します。

## 実行に必要なロール


### ポリシーの設定
このノートブックでは、Amazon SageMaker 以外にも、AWS StepFunctions、AWS Lambda を実行する権限が必要になります。それぞれのサービスを実行する際に、細かくロールを指定することもできますが、ここでは簡単のため**このノートブックに付与されたロール**のみを利用します。このノートブックには以下の管理ポリシーが付与されている必要があります。

- AWSLambdaFullAccess 
- AWSStepFunctionsFullAccess 
- AmazonSageMakerFullAccess 

また、実行にあたっては[こちらのインラインポリシー](https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/sagemaker-iam.html)を必要とします。管理ポリシーとインラインポリシーについては、[こちら](https://docs.aws.amazon.com/ja_jp/IAM/latest/UserGuide/access_policies_managed-vs-inline.html)を御覧ください。これは、AWS SteoFunctionsとAmazon SageMakerの同期、つまり学習ジョブを待つなどのために必要です。以下は、Resouce などをワイルドカードにして、変更なく簡単に動かせるようにしたものです。

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:CreateTrainingJob",
                "sagemaker:DescribeTrainingJob",
                "sagemaker:StopTrainingJob",
                "sagemaker:CreateTransformJob",
                "sagemaker:DescribeTransformJob",
                "sagemaker:StopTransformJob"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:ListTags"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "iam:PassRole"
            ],
            "Resource": [
                "*"
            ],
            "Condition": {
                "StringEquals": {
                    "iam:PassedToService": [
                        "sagemaker.amazonaws.com",
                        "states.amazonaws.com"
                    ]
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "events:PutTargets",
                "events:PutRule",
                "events:DescribeRule"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

### 信頼関係の設定

IAMロールの画面からSageMaker、Step Functions、Lambda に関する信頼関係を設定しましょう。編集画面で以下を入力します。

```
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "sagemaker.amazonaws.com",
          "lambda.amazonaws.com",
          "states.amazonaws.com"
        ]
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```

## ワークフロー作成に必要なファイル

このノートブックでは以下のファイルを使用します。

### AWS Step Functions
- `./json/stepfunc_start.json`: AWS Step Functions の開始
- `./json/create_training.json`: AWS Step Functions から create-training-job を実行
- `./json/lambda_create_model.json`: AWS Step Functions から create-model を呼び出す AWS Lambda 関数を実行
- `./json/create_transform_job.json`: AWS Step Dunctions から create-transform-job を実行

### AWS Lambda
- `./lambda/lambda_function.py`: create-model を呼び出す AWS Lambda 関数

### Amazon SageMaker
- `./chainer_mnist.py`: Amazon SageMaker で学習を行ったり、バッチ変換ジョブを行うためのスクリプト

## <a name="lambda"></a>AWS Lambda の準備

Amazon SageMaker の create-model を呼び出す [`lambda_function.py`](./lambda/lambda_function.py) を AWS Lambda にデプロイします。`lambda_function.py`にあるように、create-modelに必要な情報は、eventという変数でStep functions から受け取ります。AWS Lambda への関数のデプロイは、AWS Lambda のコンソールからも実行可能ですが、ここでは boto3 を利用して、このノートブックからデプロイします。

まず、AWS Lambda に `lambda_function.py` をアップロードするために、この python ファイルを zip 形式に圧縮します。この zip ファイルには、ライブラリを含むことも可能で、デプロイパッケージと呼ばれます。zip 形式に圧縮したら、これをバイナリ形式で読み込んで、create_function 関数の code のところで指定しましょう。zipファイルへのパスを指定するわけではないことに注意してください。

以下ではlist_functions()を利用して同名の AWS Lambda 関数が無いかをチェックして、もし同名のものがなければ新しく作成するようにしています。

In [None]:
!zip lambda/lambda_function.zip lambda/lambda_function.py
with open('./lambda/lambda_function.zip', 'rb') as f:
    zip_file = f.read()

In [None]:
import boto3
from sagemaker import get_execution_role

role = get_execution_role()
lambda_create_model = boto3.client('lambda')
func_name = "create_model"

conflict = False
lambda_arn = None
existing_func = lambda_create_model.list_functions()
func_list = existing_func['Functions']
for func in func_list:
    if func_name == func['FunctionName']:
        print("This function '{}' already exists.".format(func_name))
        lambda_arn = func['FunctionArn']
        break

if lambda_arn is None:
    print("Create a new function.")
    response = lambda_create_model.create_function(
        FunctionName=func_name, 
        Runtime="python3.7", 
        Role=role,
        Handler="lambda_function.lambda_handler", 
        Code={"ZipFile": dt}
        )
    lambda_arn = response['FunctionArn']
else:
    print("Use the existing function.")
    
print("The ARN that you will use: {}".format(lambda_arn))

ここで、create_function の返り値を見ると、AWS Lambda 関数の ARN を確認することができます。この ARN は、AWS STep Functions から AWS Lambda 関数を呼び出すために必要になります。呼び出しは、`./json/lambda_create_model.json` に定義されています。そのファイルの内容は以下の通りです。

```json
{
    "CreateModel":{
            "Comment": "CreateModel",
            "Type": "Task",
            "Resource": "arn:aws:lambda:(region_name):(account_name):function:create_model",
            "InputPath": "$",
            "ResultPath": "$.CreateModelResponse",
            "OutputPath": "$",
            "Next": "CreateTransformJob"
    }
}
```

ここで `"Resource"`の部分を先程作成した AWS Lambda 関数の ARN で置き換えます。エディタで編集しても良いですが、以下では `"Resource"` を含む行に、ARN を挿入する簡単なスクリプトを用意しています。

In [None]:
lines = []
with open("./json/lambda_create_model.json") as f:
    for line in f:
        if "Resource" in line:
            line = "\"Resource\" : \"{}\",".format(lambda_arn) +'\n'
        lines.append(line)

with open("./json/lambda_create_model.json", 'w') as f:
    f.writelines(lines)
    
!cat ./json/lambda_create_model.json

## <a name="Step-Functions"></a>AWS Step Functions の準備

AWS Lambda の関数の ARN を json ファイル内に指定すると、AWS Step Functions で利用する json ファイルの準備は完了です (`./json/lambda_create_model.json` 以外の json ファイルの修正は不要です)。AWS Step Functions で利用するために、これらの json ファイルをひとまとめにして、state machine を作りましょう。その前に、AWS Step Functions が利用する state machine について簡単に説明します。

- 1つの state machine には、学習、モデル作成、バッチ変換ジョブの 3 つの処理が States として含まれます。
- 各 state には、その状態が終了した後に遷移する state を "Next" として与えます。
- "Start_at" で最初の状態を指定し、"End": true を含む状態が最後の状態です。

最初の状態を指定する `./json/stepfunc_start.json` を読み込んだら、学習、モデル作成、バッチ変換ジョブに対応する json ファイルを読み込み、`update`関数によって States に追加していきます。最後に "CreateTransformJob" の state に `"End": true` を追加して終了させるようにします。

以下のように `json.dumps()` を利用すると、AWS Step Functions で利用する State machine を確認することができます。

In [None]:
import json

with open("./json/stepfunc_start.json") as f:
    start = json.load(f)

with open("./json/create_training.json") as f:
    training = json.load(f)

with open("./json/lambda_create_model.json") as f:
    model = json.load(f)

with open("./json/create_transform_job.json") as f:
    batch = json.load(f)

start['States'].update(training)
start['States'].update(model)
start['States'].update(batch)
start['States']['CreateTransformJob']['End'] = True

In [None]:
json.dumps(start['States'])

## <a name="sagemaker"></a> Amazon SageMaker の準備

MNIST の手書きデータセットの画像を利用して学習し、その後、バッチ変換ジョブを利用してS3のデータに対する推論を行えるようにします。まず学習データの準備を行い、次に学習スクリプトの準備を行います。

### 学習データ

Chainer の datasets の機能を利用して、MNIST のデータをダウンロードします。ここでは、データを Training data, Validation data, Test data に分けます。Training data, Validation data は学習に用いるため、ラベルを含めて npz の形式で S3 に保存します。Test data は学習後に、バッチ変換ジョブで推論される対象ですので、画像データのみを npy 形式でS3に保存します (npy は単一の array, npz は複数の array の形式です)。後述する学習スクリプトは npy 形式に対応しているので、Test data を npz にしてしまうとエラーが出るので注意してください。

In [None]:
import chainer
import os
import shutil
import numpy as np

import sagemaker


sagemaker_session = sagemaker.Session()

# Download MNIST dataset
train, test = chainer.datasets.get_mnist()

# Extract data and labels from dataset 
train_images = np.array([data[0] for data in train])
train_labels = np.array([data[1] for data in train])
test_images = np.array([data[0] for data in test])
test_labels = np.array([data[1] for data in test])

# test data is splitted into test and valid (9:1)
valid_images = test_images[:9000, :]
valid_labels = test_labels[:9000]
test_images = test_images[9000:, :]
test_labels = test_labels[9000:]

# Save the data and labels as .npz into local directories and upload them to S3
try:
    os.makedirs('/tmp/data/train')
    os.makedirs('/tmp/data/test')
    os.makedirs('/tmp/data/valid')

    np.savez('/tmp/data/train/train.npz', images=train_images, labels=train_labels)
    np.savez('/tmp/data/valid/valid.npz', images=valid_images, labels=valid_labels)
    np.save('/tmp/data/test/test.npy', test_images)
    
    train_input = sagemaker_session.upload_data(
        path=os.path.join('/tmp/data', 'train'),
        key_prefix='notebook/chainer/mnist/train')
    valid_input = sagemaker_session.upload_data(
        path=os.path.join('/tmp/data', 'valid'),
        key_prefix='notebook/chainer/mnist/valid')
    test_input = sagemaker_session.upload_data(
        path=os.path.join('/tmp/data', 'test'),
        key_prefix='notebook/chainer/mnist/test')        
finally:
    shutil.rmtree('/tmp/data')

### 学習・推論スクリプト

ここで利用する学習・推論スクリプト `chainer_mnist.py` は、以下の example を参考に作成しています。

- 学習
    - CNN を利用した2値画像分類スクリプト
    - https://github.com/aws-samples/amazon-sagemaker-examples-jp/blob/master/chainer_mnist/chainer_mnist.py
- 推論 (バッチ変換ジョブ)
    - S3 に保存された npy に対応するために input_fn を実装した推論スクリプト (model_fn は上記の学習スクリプトと同じものを利用)
    - https://sagemakersfn.notebook.us-west-2.sagemaker.aws/edit/amazon-sagemaker-examples-jp/chainer_pretrained_ssd/chainercv_ssd.py
    
ここで作成した学習・推論スクリプトは、SageMaker Python SDK を利用する場合は、実行時に自動的にアップロードされますが、今回は AWS Step Functions からアップロードされるので、自分でアップロードする必要があります。重複が生じないように、timestampをつけてアップロードしましょう。以下を実行すると、アップロード先のS3のパスが表示されます。

In [None]:
import datetime
entry_point = "chainer_mnist.py"
!tar cvzf sourcedir.tar.gz $entry_point
job_name = 'sagemaker-chainer-' + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
sagemaker_session.upload_data(path='sourcedir.tar.gz', key_prefix=job_name +'/source')

## <a name="exec-steps"></a>AWS Step Functions の実行

さて、AWS Lambda, AWS Step Functions, Amazon SageMaker の準備が整ったので、AWS Step Functions でこれらを実行しましょう。AWS Step Functions を実行するために、まず入力データの準備を行います。

### 入力データの準備

AWS Step Functions では、各処理の実行に必要なデータを入力することができます。例えば、学習データの場所や学習・推論スクリプトの場所を与えないと、Amazon SageMaker の API を呼び出すことができません。以下ではそれらのデータの準備を行います。

最終的には dict 形式の event という変数に全てを格納します。event の形式については以下で紹介します。
    
|変数名|変数の説明|
|:---|:---|
|role | ノートブックインスタンスの Role で create-model 実行時に必要です|
|etc | etc |


In [None]:

model_name = job_name + "-model"

instance_type = "ml.m4.xlarge"
image_uri = sagemaker.fw_utils.create_image_uri(region = sagemaker_session.boto_session.region_name,
                                    framework = "chainer",
                                    instance_type =  instance_type,
                                    framework_version = '5.0.0',
                                    py_version='py3'
                                   )
S3_bucket = "s3://" + sagemaker_session.default_bucket()
S3_src_path = S3_bucket +  '/'+job_name +'/source/sourcedir.tar.gz'
model_uri = S3_bucket + '/'+job_name +'/output/model.tar.gz'
S3_batch_result_path = S3_bucket +'/'+job_name +'/output/result'

In [None]:
event = {
    "role": role,
    "lambda_arn": lambda_arn,
    "train_instance_count": 1,
    "train_instance_type": instance_type,
    "job_name": job_name,
    "S3_bucket": S3_bucket,
    "model_uri": model_uri,
    "model_name": model_name,
    "container_host": "container-1",
    "container_image": image_uri,
    "batch_result_path": S3_batch_result_path,
    "data":{
        "train": train_input,
        "test": test_input,
        "valid": valid_input
    },
    "env": {
        "epochs": '3',
        "batch_size": '128',
        "sagemaker_container_log_level": "20",
        "sagemaker_enable_cloudwatch_metrics": "False",
        "sagemaker_job_name": job_name,
        "sagemaker_program": entry_point,
        "sagemaker_submit_directory": S3_src_path
    },
    "env_model": {
        "SAGEMAKER_CONTAINER_LOG_LEVEL": "20",
        "SAGEMAKER_REGION": sagemaker_session.boto_session.region_name,
        "SAGEMAKER_ENABLE_CLOUDWATCH_METRICS": "False",
        "SAGEMAKER_PROGRAM": entry_point,
        "SAGEMAKER_SUBMIT_DIRECTORY": S3_src_path
    },
    "role": role
}
json.dumps(event)

### State machine の構築

入力データの準備ができたら、`create_state_machine` から state machine を作成します。次のセルで、入力データを利用して実行するために、作成したときの response を受け取っておき、state machine の ARN を取れるようにしましょう。 

In [None]:
import boto3
 
client = boto3.client('stepfunctions')
response = client.create_state_machine(
        name=job_name,
        definition=json.dumps(start),
        roleArn= role
)
print(response)

### State machine の実行

入力データとして作成した event を input に設定し、state machine 作成時に取得した ARN を利用して、`start_execution`から state machine を実行します。

In [None]:
response = client.start_execution(
    stateMachineArn=response['stateMachineArn'],
    name=job_name,
    input=json.dumps(event)
)

AWS Step Functions のコンソール画面に移動すると、state machine が作成されていることを確認できます。以下のような実行結果を確認できると成功です。

![Workflow](./image/stepfunctions_result.png)
