# Amazon SageMaker Processing と AWS Step Functions Data Science SDK で機械学習ワークフローを構築する

Amazon SageMaker Processing を使うと、データの前/後処理やモデル評価のワークロードを Amazon SageMaker platform 上で簡単に実行することができます。Processingジョブは Amazon Simple Storage Service (Amazon S3) から入力データをダウンロードし、処理結果を Amazon S3 にアップロードします。

Step Functions SDK は AWS Step Function と Amazon SageMaker を使って、データサイエンティストが機械学習ワークフローを簡単に作成して実行するためのものです。詳しい情報は以下のドキュメントをご参照ください。

* [AWS Step Functions](https://aws.amazon.com/step-functions/)
* [AWS Step Functions Developer Guide](https://docs.aws.amazon.com/step-functions/latest/dg/welcome.html)
* [AWS Step Functions Data Science SDK](https://aws-step-functions-data-science-sdk.readthedocs.io)

AWS Step Functions Data Science SDK の SageMaker Processing Step [ProcessingStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/sagemaker.html#stepfunctions.steps.sagemaker.ProcessingStep) によって、AWS Step Functions ワークフローで実装された Sageaker Processing を機械学習エンジニアが直接システムに統合することができます。

このノートブックは、SageMaker Processing Job を使ってデータの前処理、モデルの学習、モデルの精度評価の機械学習ワークフローを AWS Step Functions Data Science SDK を使って作成する方法をご紹介します。大まかな流れは以下の通りです。

1. AWS Step Functions Data Science SDK の `ProcessingStep` を使ってデータの前処理、特徴量エンジニアリング、学習用とテスト用への分割を行う scikit-learn スクリプトを実行する SageMaker Processing Job を実行
1. AWS Step Functions Data Science SDK の `TrainingStep` を使って前処理された学習データを使ったモデルの学習を実行
1. AWS Step Functions Data Science SDK の `ProcessingStep` を使って前処理したテスト用データを使った学習済モデルの評価を実行



このノートブックで使用するデータは [Census-Income KDD Dataset](https://archive.ics.uci.edu/ml/datasets/Census-Income+%28KDD%29) です。このデータセットから特徴量を選択し、データクレンジングを実施し、二値分類モデルの利用できる形にデータを変換し、最後にデータを学習用とテスト用に分割します。このノートブックではロジスティック回帰モデルを使って、国勢調査の回答者の収入が 5万ドル以上か 5万ドル未満かを予測します。このデータセットはクラスごとの不均衡が大きく、ほとんどのデータに 5万ドル以下というラベルが付加されています。

## Setup

このノートブックを実行するのに必要なライブラリをインストールします。

In [None]:
# Import the latest sagemaker, stepfunctions and boto3 SDKs
import sys

!{sys.executable} -m pip install --upgrade pip
!{sys.executable} -m pip install -qU awscli boto3 "sagemaker>=2.0.0"
!{sys.executable} -m pip install -qU "stepfunctions>=2.0.0"
!{sys.executable} -m pip show sagemaker stepfunctions

### 必要なモジュールのインポート

In [None]:
import io
import logging
import os
import random
import time
import uuid

import boto3
import stepfunctions
from stepfunctions import steps
from stepfunctions.inputs import ExecutionInput
from stepfunctions.steps import (
    Chain,
    ChoiceRule,
    ModelStep,
    ProcessingStep,
    TrainingStep,
    TransformStep,
)
from stepfunctions.template import TrainingPipeline
from stepfunctions.template.utils import replace_parameters_with_jsonpath
from stepfunctions.workflow import Workflow

import sagemaker
from sagemaker import get_execution_role
from sagemaker.amazon.amazon_estimator import image_uris
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.s3 import S3Uploader
from sagemaker.sklearn.processing import SKLearnProcessor

# SageMaker Session
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name

# SageMaker Execution Role
# You can use sagemaker.get_execution_role() if running inside sagemaker's notebook instance
role = get_execution_role()

次に、ノートブックから Step Functions を実行するための IAM ロール設定を行います。

## ノートブックインスタンスの IAM ロールに権限を追加

以下の手順を実行して、ノートブックインスタンスに紐づけられた IAM ロールに、AWS Step Functions のワークフローを作成して実行するための権限を追加してください。


1. [Amazon SageMaker console](https://console.aws.amazon.com/sagemaker/) を開く
2. **ノートブックインスタンス** を開いて現在使用しているノートブックインスタンスを選択する
3. **アクセス許可と暗号化** の部分に表示されている IAM ロールへのリンクをクリックする
4. IAM ロールの ARN は後で使用するのでメモ帳などにコピーしておく
5. **ポリシーをアタッチします** をクリックして `AWSStepFunctionsFullAccess` を検索する
6. `AWSStepFunctionsFullAccess` の横のチェックボックスをオンにして **ポリシーのアタッチ** をクリックする

もしこのノートブックを SageMaker のノートブックインスタンス以外で実行している場合、その環境で AWS CLI 設定を行ってください。詳細は [Configuring the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html) をご参照ください。

次に Step Functions で使用する実行ロールを作成します。

## Step Functions の実行ロールの作成

 作成した Step Functions ワークフローは、AWS の他のサービスと連携するための IAM ロールを必要とします。

1. [IAM console](https://console.aws.amazon.com/iam/) にアクセス
2. 左側のメニューの **ロール** を選択し **ロールの作成** をクリック
3. **ユースケースの選択** で **Step Functions** をクリック
4. **次のステップ：アクセス権限** **次のステップ：タグ** **次のステップ：確認**をクリック
5. **ロール名** に `AmazonSageMaker-StepFunctionsWorkflowExecutionRole` と入力して **ロールの作成** をクリック

Next, attach a AWS Managed IAM policy to the role you created as per below steps.

次に、作成したロールに AWS マネージド IAM ポリシーをアタッチします。

1. [IAM console](https://console.aws.amazon.com/iam/) にアクセス
2. 左側のメニューの **ロール** を選択
3. 先ほど作成した `AmazonSageMaker-StepFunctionsWorkflowExecutionRole`を検索
4. **ポリシーをアタッチします** をクリックして `CloudWatchEventsFullAccess` を検索
5. `CloudWatchEventsFullAccess` の横のチェックボックスをオンにして **ポリシーのアタッチ** をクリック

次に、別の新しいポリシーをロールにアタッチします。ベストプラクティスとして、以下のステップで特定のリソースのみのアクセス権限とこのサンプルを実行するのに必要なアクションのみを有効にします。

1. 左側のメニューの **ロール** を選択
1. 先ほど作成した `AmazonSageMaker-StepFunctionsWorkflowExecutionRole`を検索
1. **ポリシーをアタッチします** をクリックして **ポリシーの作成** をクリック
1. **JSON** タブをクリックして以下の内容をペースト<br>
NOTEBOOK_ROLE_ARN の部分をノートブックインスタンスで使用している IAM ロールの ARN に置き換えてください。

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "events:PutTargets",
                "events:DescribeRule",
                "events:PutRule"
            ],
            "Resource": [
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTrainingJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTransformJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTuningJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForECSTaskRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForBatchJobsRule"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "NOTEBOOK_ROLE_ARN",
            "Condition": {
                "StringEquals": {
                    "iam:PassedToService": "sagemaker.amazonaws.com"
                }
            }
        },
        {
            "Sid": "VisualEditor2",
            "Effect": "Allow",
            "Action": [
                "batch:DescribeJobs",
                "batch:SubmitJob",
                "batch:TerminateJob",
                "dynamodb:DeleteItem",
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:UpdateItem",
                "ecs:DescribeTasks",
                "ecs:RunTask",
                "ecs:StopTask",
                "glue:BatchStopJobRun",
                "glue:GetJobRun",
                "glue:GetJobRuns",
                "glue:StartJobRun",
                "lambda:InvokeFunction",
                "sagemaker:CreateEndpoint",
                "sagemaker:CreateEndpointConfig",
                "sagemaker:CreateHyperParameterTuningJob",
                "sagemaker:CreateModel",
                "sagemaker:CreateProcessingJob",
                "sagemaker:CreateTrainingJob",
                "sagemaker:CreateTransformJob",
                "sagemaker:DeleteEndpoint",
                "sagemaker:DeleteEndpointConfig",
                "sagemaker:DescribeHyperParameterTuningJob",
                "sagemaker:DescribeProcessingJob",
                "sagemaker:DescribeTrainingJob",
                "sagemaker:DescribeTransformJob",
                "sagemaker:ListProcessingJobs",
                "sagemaker:ListTags",
                "sagemaker:StopHyperParameterTuningJob",
                "sagemaker:StopProcessingJob",
                "sagemaker:StopTrainingJob",
                "sagemaker:StopTransformJob",
                "sagemaker:UpdateEndpoint",
                "sns:Publish",
                "sqs:SendMessage"
            ],
            "Resource": "*"
        }
    ]
}
```

5. **次のステップ：タグ** **次のステップ：確認**をクリック
6. **名前** に `AmazonSageMaker-StepFunctionsWorkflowExecutionPolicy` と入力して **ポリシーの作成** をクリック
7. 左側のメニューで **ロール** を選択して `AmazonSageMaker-StepFunctionsWorkflowExecutionRole` を検索
8. **ポリシーをアタッチします** をクリック
9. 前の手順で作成した `AmazonSageMaker-StepFunctionsWorkflowExecutionPolicy` ポリシーを検索してチェックボックスをオンにして **ポリシーのアタッチ** をクリック
11. AmazonSageMaker-StepFunctionsWorkflowExecutionRole の *Role ARN** をコピーして以下のセルにペースト

In [None]:
# paste the AmazonSageMaker-StepFunctionsWorkflowExecutionRole ARN from above
workflow_execution_role = "arn:aws:iam::420964472730:role/StepFunctionsWorkflowExecutionRole"

### Step Functions ワークフロー実行時の入力スキーマ作成

Step Functions ワークフローを実行する際に、パラメタなどを引数として渡すことができます。ここではそれらの引数のスキーマを作成します。

In [None]:
# Generate unique names for Pre-Processing Job, Training Job, and Model Evaluation Job for the Step Functions Workflow
training_job_name = "scikit-learn-training-{}".format(
    uuid.uuid1().hex
)  # Each Training Job requires a unique name
preprocessing_job_name = "scikit-learn-sm-preprocessing-{}".format(
    uuid.uuid1().hex
)  # Each Preprocessing job requires a unique name,
evaluation_job_name = "scikit-learn-sm-evaluation-{}".format(
    uuid.uuid1().hex
)  # Each Evaluation Job requires a unique name

In [None]:
# SageMaker expects unique names for each job, model and endpoint.
# If these names are not unique the execution will fail. Pass these
# dynamically for each execution using placeholders.
execution_input = ExecutionInput(
    schema={
        "PreprocessingJobName": str,
        "TrainingJobName": str,
        "EvaluationProcessingJobName": str,
    }
)

## データの前処理と特徴量エンジニアリング

データクレンジング 、前処理、特徴量エンジニアリングのスクリプトの前に、データセットの初めの 20行をのぞいてみましょう。ターゲット変数は `income` 列です。選択する特徴量は `age`, `education`, `major industry code`, `class of worker`, `num persons worked for employer`, `capital gains`, `capital losses`,  `dividends from stocks` です。

In [None]:
import pandas as pd

input_data = "s3://sagemaker-sample-data-{}/processing/census/census-income.csv".format(region)
df = pd.read_csv(input_data, nrows=10)
df.head(n=10)

scikit-learn の前処理スクリプトを実行するために `SKLearnProcessor`を作成します。これは、SageMaker が用意している scikit-learn のコンテナイメージを使って Processing ジョブを実行するためのものです。

In [None]:
sklearn_processor = SKLearnProcessor(
    framework_version="0.20.0",
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    max_runtime_in_seconds=1200,
)

以下のセルを実行すると `preprocessing.py` が作成されます。これは前処理のためのスクリプトです。以下のセルを書き換えて実行すれば、`preprocessing.py` が上書き保存されます。このスクリプトでは、以下の処理が実行されます。
n the next cell. In this script, you

* 重複データやコンフリクトしているデータの削除
* ターゲット変数 `income` 列をカテゴリ変数から 2つのラベルを持つ列に変換
* `age` と `num persons worked for employer` をビニングして数値からカテゴリ変数に変換
* 連続値である`capital gains`, `capital losses`, `dividends from stocks` を学習しやすいようスケーリング
* `education`, `major industry code`, `class of worker`を学習しやすいようエンコード
* データを学習用とテスト用に分割し特徴量とラベルの値をそれぞれ保存

学習スクリプトでは、前処理済みの学習用データとラベル情報を使用してモデルを学習します。また、モデル評価スクリプトでは学習済みモデルと前処理済みのテスト用データトラベル情報を使用してモデルを評価します。

In [None]:
%%writefile preprocessing.py

import argparse
import os
import warnings

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelBinarizer, KBinsDiscretizer
from sklearn.preprocessing import PolynomialFeatures
from sklearn.compose import make_column_transformer

from sklearn.exceptions import DataConversionWarning

warnings.filterwarnings(action="ignore", category=DataConversionWarning)


columns = [
    "age",
    "education",
    "major industry code",
    "class of worker",
    "num persons worked for employer",
    "capital gains",
    "capital losses",
    "dividends from stocks",
    "income",
]
class_labels = [" - 50000.", " 50000+."]


def print_shape(df):
    negative_examples, positive_examples = np.bincount(df["income"])
    print(
        "Data shape: {}, {} positive examples, {} negative examples".format(
            df.shape, positive_examples, negative_examples
        )
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-test-split-ratio", type=float, default=0.3)
    args, _ = parser.parse_known_args()

    print("Received arguments {}".format(args))

    input_data_path = os.path.join("/opt/ml/processing/input", "census-income.csv")

    print("Reading input data from {}".format(input_data_path))
    df = pd.read_csv(input_data_path)
    df = pd.DataFrame(data=df, columns=columns)
    df.dropna(inplace=True)
    df.drop_duplicates(inplace=True)
    df.replace(class_labels, [0, 1], inplace=True)

    negative_examples, positive_examples = np.bincount(df["income"])
    print(
        "Data after cleaning: {}, {} positive examples, {} negative examples".format(
            df.shape, positive_examples, negative_examples
        )
    )

    split_ratio = args.train_test_split_ratio
    print("Splitting data into train and test sets with ratio {}".format(split_ratio))
    X_train, X_test, y_train, y_test = train_test_split(
        df.drop("income", axis=1), df["income"], test_size=split_ratio, random_state=0
    )

    preprocess = make_column_transformer(
        (
            ["age", "num persons worked for employer"],
            KBinsDiscretizer(encode="onehot-dense", n_bins=10),
        ),
        (
            ["capital gains", "capital losses", "dividends from stocks"],
            StandardScaler(),
        ),
        (
            ["education", "major industry code", "class of worker"],
            OneHotEncoder(sparse=False),
        ),
    )
    print("Running preprocessing and feature engineering transformations")
    train_features = preprocess.fit_transform(X_train)
    test_features = preprocess.transform(X_test)

    print("Train data shape after preprocessing: {}".format(train_features.shape))
    print("Test data shape after preprocessing: {}".format(test_features.shape))

    train_features_output_path = os.path.join("/opt/ml/processing/train", "train_features.csv")
    train_labels_output_path = os.path.join("/opt/ml/processing/train", "train_labels.csv")

    test_features_output_path = os.path.join("/opt/ml/processing/test", "test_features.csv")
    test_labels_output_path = os.path.join("/opt/ml/processing/test", "test_labels.csv")

    print("Saving training features to {}".format(train_features_output_path))
    pd.DataFrame(train_features).to_csv(train_features_output_path, header=False, index=False)

    print("Saving test features to {}".format(test_features_output_path))
    pd.DataFrame(test_features).to_csv(test_features_output_path, header=False, index=False)

    print("Saving training labels to {}".format(train_labels_output_path))
    y_train.to_csv(train_labels_output_path, header=False, index=False)

    print("Saving test labels to {}".format(test_labels_output_path))
    y_test.to_csv(test_labels_output_path, header=False, index=False)

前処理用スクリプトを S3 にアップロードします。

In [None]:
PREPROCESSING_SCRIPT_LOCATION = "preprocessing.py"

input_code = sagemaker_session.upload_data(
    PREPROCESSING_SCRIPT_LOCATION,
    bucket=sagemaker_session.default_bucket(),
    key_prefix="data/sklearn_processing/code",
)

Processing ジョブの出力を保存する S3 パスを作成します。

In [None]:
s3_bucket_base_uri = "{}{}".format("s3://", sagemaker_session.default_bucket())
output_data = "{}/{}".format(s3_bucket_base_uri, "data/sklearn_processing/output")
preprocessed_training_data = "{}/{}".format(output_data, "train_data")

### `ProcessingStep` の作成

それでは、SageMaker Processing ジョブを起動するための [ProcessingStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/sagemaker.html#stepfunctions.steps.sagemaker.ProcessingStep) を作成しましょう。

このステップは、前の手順で定義した SKLearnProcessor に入力と出力の情報を追加して使用します。

#### [ProcessingInputs](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.processing.ProcessingInput) と [ProcessingOutputs](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.processing.ProcessingOutput)  オブジェクトを作成して SageMaker Processing ジョブに入力と出力の情報を追加

In [None]:
inputs = [
    ProcessingInput(
        source=input_data, destination="/opt/ml/processing/input", input_name="input-1"
    ),
    ProcessingInput(
        source=input_code,
        destination="/opt/ml/processing/input/code",
        input_name="code",
    ),
]

outputs = [
    ProcessingOutput(
        source="/opt/ml/processing/train",
        destination="{}/{}".format(output_data, "train_data"),
        output_name="train_data",
    ),
    ProcessingOutput(
        source="/opt/ml/processing/test",
        destination="{}/{}".format(output_data, "test_data"),
        output_name="test_data",
    ),
]

####  `ProcessingStep` の作成

In [None]:
# preprocessing_job_name = generate_job_name()
processing_step = ProcessingStep(
    "SageMaker pre-processing step",
    processor=sklearn_processor,
    job_name=execution_input["PreprocessingJobName"],
    inputs=inputs,
    outputs=outputs,
    container_arguments=["--train-test-split-ratio", "0.2"],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/preprocessing.py"],
)

## 前処理済みデータを使ったモデルの学習

学習スクリプト `train.py` を使って学習ジョブを実行するための `SKLearn` インスタンスを作成します。これはあとで `TrainingStep` を作成する際に使用します。

In [None]:
from sagemaker.sklearn.estimator import SKLearn

sklearn = SKLearn(
    entry_point="train.py",
    train_instance_type="ml.m5.xlarge",
    role=role,
    framework_version="0.20.0",
    py_version="py3",
)

学習スクリプト `train.py` は、ロジスティック回帰モデルを学習し、学習済みモデルを `/opt/ml/model` に保存します。Amazon SageMaker は、学習ジョブの最後にそこに保存されているモデルを `model.tar.gz` に圧縮して S3 にアップロードします。

In [None]:
%%writefile train.py

import os

import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.externals import joblib

if __name__ == "__main__":
    training_data_directory = "/opt/ml/input/data/train"
    train_features_data = os.path.join(training_data_directory, "train_features.csv")
    train_labels_data = os.path.join(training_data_directory, "train_labels.csv")
    print("Reading input data")
    X_train = pd.read_csv(train_features_data, header=None)
    y_train = pd.read_csv(train_labels_data, header=None)

    model = LogisticRegression(class_weight="balanced", solver="lbfgs")
    print("Training LR model")
    model.fit(X_train, y_train)
    model_output_directory = os.path.join("/opt/ml/model", "model.joblib")
    print("Saving model to {}".format(model_output_directory))
    joblib.dump(model, model_output_directory)

### `TrainingStep` の作成

In [None]:
training_step = steps.TrainingStep(
    "SageMaker Training Step",
    estimator=sklearn,
    data={"train": sagemaker.TrainingInput(preprocessed_training_data, content_type="text/csv")},
    job_name=execution_input["TrainingJobName"],
    wait_for_completion=True,
)

## モデルの評価

`evaluation.py` はモデル評価用のスクリプトです。このスクリプトは scikit-learn を用いるため、以前の手順で使用した`SKLearnProcessor` を使用します。このスクリプトは学習済みモデルとテスト用データセットを入力として受け取り、各分類クラスの分類評価メトリクス、precision、リコール、F1スコア、accuracy と ROC AUC が記載された JSON ファイルを出力します。

In [None]:
%%writefile evaluation.py

import json
import os
import tarfile

import pandas as pd

from sklearn.externals import joblib
from sklearn.metrics import classification_report, roc_auc_score, accuracy_score

if __name__ == "__main__":
    model_path = os.path.join("/opt/ml/processing/model", "model.tar.gz")
    print("Extracting model from path: {}".format(model_path))
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    print("Loading model")
    model = joblib.load("model.joblib")

    print("Loading test input data")
    test_features_data = os.path.join("/opt/ml/processing/test", "test_features.csv")
    test_labels_data = os.path.join("/opt/ml/processing/test", "test_labels.csv")

    X_test = pd.read_csv(test_features_data, header=None)
    y_test = pd.read_csv(test_labels_data, header=None)
    predictions = model.predict(X_test)

    print("Creating classification evaluation report")
    report_dict = classification_report(y_test, predictions, output_dict=True)
    report_dict["accuracy"] = accuracy_score(y_test, predictions)
    report_dict["roc_auc"] = roc_auc_score(y_test, predictions)

    print("Classification report:\n{}".format(report_dict))

    evaluation_output_path = os.path.join("/opt/ml/processing/evaluation", "evaluation.json")
    print("Saving classification report to {}".format(evaluation_output_path))

    with open(evaluation_output_path, "w") as f:
        f.write(json.dumps(report_dict))

In [None]:
MODELEVALUATION_SCRIPT_LOCATION = "evaluation.py"

input_evaluation_code = sagemaker_session.upload_data(
    MODELEVALUATION_SCRIPT_LOCATION,
    bucket=sagemaker_session.default_bucket(),
    key_prefix="data/sklearn_processing/code",
)

モデル評価用の ProcessingStep の入力と出力オブジェクトを作成します。

In [None]:
preprocessed_testing_data = "{}/{}".format(output_data, "test_data")
model_data_s3_uri = "{}/{}/{}".format(s3_bucket_base_uri, training_job_name, "output/model.tar.gz")
output_model_evaluation_s3_uri = "{}/{}/{}".format(
    s3_bucket_base_uri, training_job_name, "evaluation"
)
inputs_evaluation = [
    ProcessingInput(
        source=preprocessed_testing_data,
        destination="/opt/ml/processing/test",
        input_name="input-1",
    ),
    ProcessingInput(
        source=model_data_s3_uri,
        destination="/opt/ml/processing/model",
        input_name="input-2",
    ),
    ProcessingInput(
        source=input_evaluation_code,
        destination="/opt/ml/processing/input/code",
        input_name="code",
    ),
]

outputs_evaluation = [
    ProcessingOutput(
        source="/opt/ml/processing/evaluation",
        destination=output_model_evaluation_s3_uri,
        output_name="evaluation",
    ),
]

In [None]:
model_evaluation_processor = SKLearnProcessor(
    framework_version="0.20.0",
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    max_runtime_in_seconds=1200,
)

In [None]:
processing_evaluation_step = ProcessingStep(
    "SageMaker Processing Model Evaluation step",
    processor=model_evaluation_processor,
    job_name=execution_input["EvaluationProcessingJobName"],
    inputs=inputs_evaluation,
    outputs=outputs_evaluation,
    container_entrypoint=["python3", "/opt/ml/processing/input/code/evaluation.py"],
)

いずれかのステップが失敗したときにワークフローが失敗だとわかるように `Fail` 状態を作成します。

In [None]:
failed_state_sagemaker_processing_failure = stepfunctions.steps.states.Fail(
    "ML Workflow failed", cause="SageMakerProcessingJobFailed"
)

#### ワークフローの中のエラーハンドリングを追加

エラーハンドリングのために [Catch Block](https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/states.html#stepfunctions.steps.states.Catch) を使用します。もし Processing ジョブステップか学習ステップが失敗したら、`Fail` 状態に遷移します。

In [None]:
catch_state_processing = stepfunctions.steps.states.Catch(
    error_equals=["States.TaskFailed"],
    next_step=failed_state_sagemaker_processing_failure,
)

processing_step.add_catch(catch_state_processing)
processing_evaluation_step.add_catch(catch_state_processing)
training_step.add_catch(catch_state_processing)

## `Workflow` の作成と実行

In [None]:
workflow_graph = Chain([processing_step, training_step, processing_evaluation_step])
branching_workflow = Workflow(
    name="SageMakerProcessingWorkflow",
    definition=workflow_graph,
    role=workflow_execution_role,
)

branching_workflow.create()
# branching_workflow.update(workflow_graph)


# Execute workflow
execution = branching_workflow.execute(
    inputs={
        "PreprocessingJobName": preprocessing_job_name,  # Each pre processing job (SageMaker processing job) requires a unique name,
        "TrainingJobName": training_job_name,  # Each Sagemaker Training job requires a unique name,
        "EvaluationProcessingJobName": evaluation_job_name,  # Each SageMaker processing job requires a unique name,
    }
)
execution_output = execution.get_output(wait=True)

In [None]:
execution.render_progress()

### ワークフローの出力を確認

Amazon S3 から `evaluation.json` を取得して確認します。ここにはモデルの評価レポートが書かれています。なお、以下のセルは Step Functions でワークフローの実行が完了してから（`evaluation.json` が出力されてから）実行してください。

In [None]:
workflow_execution_output_json = execution.get_output(wait=True)

In [None]:
from sagemaker.s3 import S3Downloader
import json

evaluation_output_config = workflow_execution_output_json["ProcessingOutputConfig"]
for output in evaluation_output_config["Outputs"]:
    if output["OutputName"] == "evaluation":
        evaluation_s3_uri = "{}/{}".format(output["S3Output"]["S3Uri"], "evaluation.json")
        break

evaluation_output = S3Downloader.read_file(evaluation_s3_uri)
evaluation_output_dict = json.loads(evaluation_output)
print(json.dumps(evaluation_output_dict, sort_keys=True, indent=4))

## リソースの削除

このノートブックの実行が終わったら、不要なリソースを削除することを忘れないでください。以下のコードのコメントアウトを外してから実行すると、このノートブックで作成した Step Functions のワークフローを削除することができます。ノートブックインスタンス、各種データを保存した S3 バケットも不要であれば削除してください。

In [None]:
# branching_workflow.delete()