### Building the ML Workflow
This notebook builds a machine learning workflow with AWS Step Functions.

In [None]:
import sys
!{sys.executable} -m pip install --upgrade pip
!{sys.executable} -m pip install -qU awscli boto3 "sagemaker==2.16.1"
!git clone https://github.com/aws/aws-step-functions-data-science-sdk-python.git
%cd aws-step-functions-data-science-sdk-python
!{sys.executable} -m pip install .
%cd ~/SageMaker
!{sys.executable} -m pip show sagemaker stepfunctions

In [None]:
import io
import logging
import os
import random
import time
import uuid

import boto3
import stepfunctions
from stepfunctions import steps
from stepfunctions.inputs import ExecutionInput
from stepfunctions.steps import (
    Chain,
    ChoiceRule,
    ModelStep,
    ProcessingStep,
    TrainingStep,
    TransformStep,
)
from stepfunctions.template import TrainingPipeline
from stepfunctions.template.utils import replace_parameters_with_jsonpath
from stepfunctions.workflow import Workflow

import sagemaker
from sagemaker import get_execution_role
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.s3 import S3Uploader

# SageMaker Session
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name

# SageMaker Execution Role
# You can use sagemaker.get_execution_role() if running inside sagemaker's notebook instance
role = get_execution_role()
sess = sagemaker_session  # alias for the session created above
sagemaker.__version__

Next, create the role that allows Step Functions to be used from SageMaker.

### Creating the role used inside Step Functions
Step Functions requires an IAM role in order to run other AWS services.

1. Go to the IAM console
1. Select Roles, then click Create role
1. From the list of services, select Step Functions
1. Click Next until you reach the role name field
1. Enter StepFunctionsWorkflowExecutionRole as the role name and create the role

Next, attach an AWS managed IAM policy to the role created above.

1. Go to the IAM console
1. Select Roles
1. Search for StepFunctionsWorkflowExecutionRole
1. On the Permissions tab, click Attach policies and search for CloudWatchEventsFullAccess
1. Attach the policy
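
The console steps above can also be scripted. The following is a minimal sketch, assuming an IAM client created with `boto3.client("iam")` and credentials that are allowed to create roles. The helper names `build_trust_policy` and `create_workflow_role` are hypothetical and are not part of the original notebook.

```python
import json

# Managed policy named in the console steps above.
CLOUDWATCH_EVENTS_POLICY_ARN = "arn:aws:iam::aws:policy/CloudWatchEventsFullAccess"


def build_trust_policy():
    """Trust policy that lets the Step Functions service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "states.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def create_workflow_role(iam_client, role_name="StepFunctionsWorkflowExecutionRole"):
    """Create the role, attach the managed policy, and return the role ARN.

    `iam_client` is assumed to be boto3.client("iam").
    """
    response = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(build_trust_policy()),
    )
    iam_client.attach_role_policy(
        RoleName=role_name, PolicyArn=CLOUDWATCH_EVENTS_POLICY_ARN
    )
    return response["Role"]["Arn"]
```

Passing the client in as an argument keeps the helper easy to exercise with a stub before running it against a real account.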

Next, create a new policy and attach it to the role.

1. On the Permissions tab, click Attach policies, then choose Create policy
1. Enter the following in the JSON tab

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "events:PutTargets",
                "events:DescribeRule",
                "events:PutRule"
            ],
            "Resource": [
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTrainingJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTransformJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTuningJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForECSTaskRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForBatchJobsRule"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "NOTEBOOK_ROLE_ARN",
            "Condition": {
                "StringEquals": {
                    "iam:PassedToService": "sagemaker.amazonaws.com"
                }
            }
        },
        {
            "Sid": "VisualEditor2",
            "Effect": "Allow",
            "Action": [
                "batch:DescribeJobs",
                "batch:SubmitJob",
                "batch:TerminateJob",
                "dynamodb:DeleteItem",
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:UpdateItem",
                "ecs:DescribeTasks",
                "ecs:RunTask",
                "ecs:StopTask",
                "glue:BatchStopJobRun",
                "glue:GetJobRun",
                "glue:GetJobRuns",
                "glue:StartJobRun",
                "lambda:InvokeFunction",
                "sagemaker:CreateEndpoint",
                "sagemaker:CreateEndpointConfig",
                "sagemaker:CreateHyperParameterTuningJob",
                "sagemaker:CreateModel",
                "sagemaker:CreateProcessingJob",
                "sagemaker:CreateTrainingJob",
                "sagemaker:CreateTransformJob",
                "sagemaker:DeleteEndpoint",
                "sagemaker:DeleteEndpointConfig",
                "sagemaker:DescribeHyperParameterTuningJob",
                "sagemaker:DescribeProcessingJob",
                "sagemaker:DescribeTrainingJob",
                "sagemaker:DescribeTransformJob",
                "sagemaker:ListProcessingJobs",
                "sagemaker:ListTags",
                "sagemaker:StopHyperParameterTuningJob",
                "sagemaker:StopProcessingJob",
                "sagemaker:StopTrainingJob",
                "sagemaker:StopTransformJob",
                "sagemaker:UpdateEndpoint",
                "sns:Publish",
                "sqs:SendMessage"
            ],
            "Resource": "*"
        }
    ]
}
```

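1. Replace NOTEBOOK_ROLE_ARN in the policy above with the ARN of the execution role of the running notebook (the value returned by `get_execution_role()`)
1. Review the policy and name it StepFunctionsWorkflowExecutionPolicy
1. Create the policy
1. Under Roles, search for StepFunctionsWorkflowExecutionRole
1. On the Permissions tab, click Attach policies
1. Select the newly created StepFunctionsWorkflowExecutionPolicy and click Next
1. Attach the policy
1. Copy the ARN of StepFunctionsWorkflowExecutionRole and use it to replace the string below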
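
The placeholder substitution in the first step of this list can be done programmatically instead of by hand. The following is a minimal sketch, assuming the policy JSON is held in a string. `fill_notebook_role_arn` is a hypothetical helper, the template is abbreviated to the `iam:PassRole` statement, and the ARN in the usage line is a dummy value.

```python
import json


def fill_notebook_role_arn(policy_template, notebook_role_arn):
    """Replace the NOTEBOOK_ROLE_ARN placeholder and verify the result parses."""
    if not (notebook_role_arn.startswith("arn:") and ":role/" in notebook_role_arn):
        raise ValueError("expected an IAM role ARN, got: {!r}".format(notebook_role_arn))
    policy_text = policy_template.replace("NOTEBOOK_ROLE_ARN", notebook_role_arn)
    json.loads(policy_text)  # raises ValueError if the document is not valid JSON
    return policy_text


# Abbreviated template containing only the iam:PassRole statement.
template = """
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "NOTEBOOK_ROLE_ARN",
            "Condition": {
                "StringEquals": {"iam:PassedToService": "sagemaker.amazonaws.com"}
            }
        }
    ]
}
"""

# Dummy ARN for illustration; in the notebook, `role` holds the real value.
policy = fill_notebook_role_arn(
    template, "arn:aws:iam::123456789012:role/ExampleNotebookRole"
)
```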

In [None]:
# Replace this value with the ARN of your own StepFunctionsWorkflowExecutionRole
workflow_execution_role = 'arn:aws:iam::752131146440:role/StepFunctionsWorkflowExecutionRole3'

Add permission to run Step Functions to the notebook's execution role.

1. Go to the IAM console
1. Under Roles, search for the notebook's execution role
1. On the Permissions tab, click Attach policies
1. Search the policies for AWSStepFunctionsFullAccess and attach it

## Define the Step Functions jobs and input schema

In [None]:
# Generate unique names for Pre-Processing Job, Training Job, and Model Evaluation Job for the Step Functions Workflow
training_job_name = "xgboost-churn-training-{}".format(
    uuid.uuid1().hex
)  # Each Training Job requires a unique name
preprocessing_job_name = "xgboost-training-processing-{}".format(
    uuid.uuid1().hex
)
evaluation_job_name = "xgboost-training-evaluation-{}".format(
    uuid.uuid1().hex
)
print(training_job_name)
print(preprocessing_job_name)
print(evaluation_job_name)
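
SageMaker job names are limited to 63 characters made up of alphanumerics and hyphens, so it can help to validate generated names before starting a workflow. The following is a minimal sketch under that assumption. `make_job_name` is a hypothetical helper, not part of the notebook or the SDK.

```python
import re
import uuid

# Alphanumerics separated by hyphens; the length limit is checked separately.
JOB_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9])*$")
MAX_JOB_NAME_LENGTH = 63


def make_job_name(prefix):
    """Return '<prefix>-<32 hex characters>' after validating it."""
    name = "{}-{}".format(prefix, uuid.uuid1().hex)
    if len(name) > MAX_JOB_NAME_LENGTH or not JOB_NAME_PATTERN.match(name):
        raise ValueError("invalid SageMaker job name: {}".format(name))
    return name


example_name = make_job_name("xgboost-churn-training")
print(example_name, len(example_name))
```

With a 32-character UUID suffix and a hyphen, the prefix can be at most 30 characters long.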

In [None]:
# SageMaker expects unique names for each job, model and endpoint.
# If these names are not unique the execution will fail. Pass these
# dynamically for each execution using placeholders.
execution_input = ExecutionInput(
    schema={
        "PreprocessingJobName": str,
        "TrainingJobName": str,
        "EvaluationProcessingJobName": str,
    }
)
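
The schema above declares which keys every execution must supply. The following library-independent sketch shows one way to check an input dictionary against such a schema before calling `execute`. `check_inputs` is a hypothetical helper, and the job names are dummy values.

```python
# Same keys and types as the ExecutionInput schema above.
SCHEMA = {
    "PreprocessingJobName": str,
    "TrainingJobName": str,
    "EvaluationProcessingJobName": str,
}


def check_inputs(schema, inputs):
    """Raise if `inputs` has missing or unexpected keys, or wrongly typed values."""
    missing = sorted(set(schema) - set(inputs))
    unexpected = sorted(set(inputs) - set(schema))
    if missing or unexpected:
        raise KeyError("missing={}, unexpected={}".format(missing, unexpected))
    for key, expected_type in schema.items():
        if not isinstance(inputs[key], expected_type):
            raise TypeError(
                "{} must be {}, got {}".format(
                    key, expected_type.__name__, type(inputs[key]).__name__
                )
            )


# Dummy job names for illustration only.
example_inputs = {
    "PreprocessingJobName": "example-preprocessing-job",
    "TrainingJobName": "example-training-job",
    "EvaluationProcessingJobName": "example-evaluation-job",
}
check_inputs(SCHEMA, example_inputs)  # passes silently
```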

In [None]:
import boto3
# Use boto3 to look up the account ID and region needed for the repository URI
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.session.Session().region_name
print(region)
print(account_id)
ecr_repository = 'xgboost-churn-processing'
tag = ':latest'
processing_repository_uri = '{}.dkr.ecr.{}.amazonaws.com/{}'.format(account_id, region, ecr_repository + tag)

# Use the URI built from the current region instead of a hard-coded region
script_processor = ScriptProcessor(
    image_uri=processing_repository_uri,
    role=role,
    command=['python3'],
    instance_count=1,
    instance_type='ml.m5.xlarge')
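
ECR image URIs follow a fixed pattern of account ID, region, repository, and tag. The following is a small sketch of a helper that assembles one, assuming the standard `amazonaws.com` domain (regions in other partitions use a different domain suffix). `ecr_image_uri` is a hypothetical name and the account ID is a dummy value.

```python
def ecr_image_uri(account_id, region, repository, tag="latest"):
    """Build '<account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>'."""
    return "{}.dkr.ecr.{}.amazonaws.com/{}:{}".format(
        account_id, region, repository, tag
    )


# Dummy account ID for illustration only.
example_uri = ecr_image_uri("123456789012", "ap-northeast-1", "xgboost-churn-processing")
print(example_uri)
```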

### Create the preprocessing job

In [None]:
PREPROCESSING_SCRIPT_LOCATION = "./SageMaker-MLWorkflow-XGBoost-sdkv2/processing.py"

input_code = sagemaker_session.upload_data(
    PREPROCESSING_SCRIPT_LOCATION,
    bucket=sagemaker_session.default_bucket(),
    key_prefix="xgboost-churn/processsing/code",
)
print(input_code)

#### Path to the data to preprocess
This is the value to change when the workflow is triggered by, for example, an S3 event.

In [None]:
bucket = sagemaker_session.default_bucket()
input_data = 's3://{}/xgboost-churn-stepfunctions/xgboost-churn/churn.txt'.format(bucket)
print(input_data)

#### Path to the preprocessed data

In [None]:
output_data = "s3://{}/{}".format(bucket, "xgboost-churn/processsing")
preprocessed_training_data = "{}/{}".format(output_data, "data")
print(output_data)
print(preprocessed_training_data)
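
The notebook derives several S3 locations from this output prefix: a `data` folder, `train` and `validation` subfolders, and a `test.csv` file. The following sketch collects them in one place. `processing_layout` is a hypothetical helper, and the bucket and prefix in the usage line are dummy values.

```python
def processing_layout(bucket, prefix):
    """Return the S3 URIs derived from a preprocessing output prefix."""
    output = "s3://{}/{}".format(bucket, prefix.strip("/"))
    data = "{}/data".format(output)
    return {
        "output": output,
        "data": data,
        "train": "{}/train".format(data),
        "validation": "{}/validation".format(data),
        "test": "{}/test.csv".format(data),
    }


# Dummy bucket and prefix for illustration only.
layout = processing_layout("example-bucket", "example-prefix/")
for name, uri in layout.items():
    print(name, uri)
```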

#### Preprocessing job

In [None]:
inputs = [
    ProcessingInput(
        source=input_data, destination="/opt/ml/processing/input", input_name="input-1"
    ),
    ProcessingInput(
        source=input_code,
        destination="/opt/ml/processing/input/code",
        input_name="code",
    ),
]

outputs = [
    ProcessingOutput(
        source="/opt/ml/processing/output/data",
        destination="{}/{}".format(output_data,"data"),
        output_name="train_data",
    )
]

In [None]:
processing_step = ProcessingStep(
    "SageMaker processing step",
    processor=script_processor,
    job_name=execution_input["PreprocessingJobName"],
    inputs=inputs,
    outputs=outputs,
    container_entrypoint=["python3", "/opt/ml/processing/input/code/processing.py"]
)

### Create the training job

In [None]:
region_name = boto3.Session().region_name
container = sagemaker.image_uris.retrieve("xgboost", region_name, "1.2-1")

In [None]:
hyperparameters = {
    "max_depth": "5",
    "eta": "0.2",
    "gamma": "4",
    "min_child_weight": "6",
    "subsample": "0.8",
    "objective": "binary:logistic",
    "num_round": "100",
}

xgb = sagemaker.estimator.Estimator(
    container,
    role,
    hyperparameters=hyperparameters,
    instance_count=1,
    instance_type='ml.m4.xlarge',
    sagemaker_session=sess,
)

In [None]:
# from sagemaker.session import s3_input
from sagemaker.inputs import TrainingInput

input_train_prefix="{}/train".format(preprocessed_training_data)
input_validation_prefix="{}/validation".format(preprocessed_training_data)

content_type='text/csv'
s3_input_train = TrainingInput(input_train_prefix, content_type=content_type)
s3_input_validation = TrainingInput(input_validation_prefix, content_type=content_type)

In [None]:
data_channels = {'train': s3_input_train, 'validation': s3_input_validation}

In [None]:
training_step = steps.TrainingStep(
    "SageMaker Training Step",
    estimator=xgb,
    data=data_channels,
    job_name=execution_input["TrainingJobName"],
    wait_for_completion=True,
)

### Create the evaluation job

In [None]:
EVALUATION_SCRIPT_LOCATION = "./SageMaker-MLWorkflow-XGBoost-sdkv2/evaluation.py"

evaluation_code = sagemaker_session.upload_data(
    EVALUATION_SCRIPT_LOCATION,
    bucket=sagemaker_session.default_bucket(),
    key_prefix="xgboost-churn/evaluation/code",
)
print(evaluation_code)

In [None]:
input_test = "{}/test.csv".format(preprocessed_training_data)
print(input_test)

In [None]:
model_data_s3_uri = "s3://{}/{}/{}".format(
    bucket, training_job_name, "output/model.tar.gz"
)
print(model_data_s3_uri)

In [None]:
output_model_evaluation_s3_uri = "s3://{}/{}/{}".format(
    bucket, training_job_name, "evaluation"
)
print(output_model_evaluation_s3_uri)
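
Both URIs above are derived from `training_job_name`, so they only point at the right model if the workflow is executed with that same name. The following sketch makes the dependency explicit. `training_artifact_uris` is a hypothetical helper, and it assumes, as the cells above do, that the estimator writes its output under the bucket root in a folder named after the training job.

```python
def training_artifact_uris(bucket, training_job_name):
    """Return (model artifact URI, evaluation output URI) for a training job."""
    base = "s3://{}/{}".format(bucket, training_job_name)
    return "{}/output/model.tar.gz".format(base), "{}/evaluation".format(base)


# Dummy bucket and job name for illustration only.
model_uri, evaluation_uri = training_artifact_uris("example-bucket", "example-training-job")
print(model_uri)
print(evaluation_uri)
```

If a new execution uses a different training job name, these URIs would need to be regenerated so that the evaluation step reads the matching model.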

In [None]:
inputs_test = [
    ProcessingInput(
        source=input_test, destination="/opt/ml/processing/input", input_name="input-1"
    ),
    ProcessingInput(
        source=model_data_s3_uri,
        destination="/opt/ml/processing/model",
        input_name="input-2",
    ),
    ProcessingInput(
        source=evaluation_code,
        destination="/opt/ml/processing/input/code",
        input_name="code",
    ),
]

outputs_test = [
    ProcessingOutput(
        source="/opt/ml/processing/evaluation",
        destination=output_model_evaluation_s3_uri,
        output_name="evaluation",
    ),
]

In [None]:
evaluation_ecr_repository = 'xgboost-churn-evaluation'
tag = ':latest'
# Build the image URI from the current region instead of a hard-coded region
evaluation_repository_uri = '{}.dkr.ecr.{}.amazonaws.com/{}'.format(account_id, region, evaluation_ecr_repository + tag)

model_evaluation_processor = ScriptProcessor(
    image_uri=evaluation_repository_uri,
    role=role,
    command=['python3'],
    instance_count=1,
    instance_type='ml.m5.xlarge')

In [None]:
processing_evaluation_step = ProcessingStep(
    "SageMaker evaluation step",
    processor=model_evaluation_processor,
    job_name=execution_input["EvaluationProcessingJobName"],
    inputs=inputs_test,
    outputs=outputs_test,
    container_entrypoint=["python3", "/opt/ml/processing/input/code/evaluation.py"]
)

### Add error handling

In [None]:
failed_state_sagemaker_processing_failure = stepfunctions.steps.states.Fail(
    "ML Workflow failed", cause="SageMakerProcessingJobFailed"
)

In [None]:
catch_state_processing = stepfunctions.steps.states.Catch(
    error_equals=["States.TaskFailed"],
    next_step=failed_state_sagemaker_processing_failure,
)

processing_step.add_catch(catch_state_processing)
processing_evaluation_step.add_catch(catch_state_processing)
training_step.add_catch(catch_state_processing)
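
In Amazon States Language terms, each `add_catch` call adds a `Catch` entry to a Task state that routes `States.TaskFailed` errors to the shared Fail state. The following is a hand-written sketch of that shape, assuming the three steps run in the order processing, training, evaluation. Task parameters are omitted, and the definition the SDK actually generates may differ in detail.

```python
import json

FAIL_STATE = "ML Workflow failed"


def task_state(resource, next_state=None):
    """Build a Task state that sends States.TaskFailed to the Fail state."""
    state = {
        "Type": "Task",
        "Resource": resource,
        "Catch": [{"ErrorEquals": ["States.TaskFailed"], "Next": FAIL_STATE}],
    }
    if next_state is None:
        state["End"] = True  # last state in the chain
    else:
        state["Next"] = next_state
    return state


definition = {
    "StartAt": "SageMaker processing step",
    "States": {
        "SageMaker processing step": task_state(
            "arn:aws:states:::sagemaker:createProcessingJob.sync",
            "SageMaker Training Step",
        ),
        "SageMaker Training Step": task_state(
            "arn:aws:states:::sagemaker:createTrainingJob.sync",
            "SageMaker evaluation step",
        ),
        "SageMaker evaluation step": task_state(
            "arn:aws:states:::sagemaker:createProcessingJob.sync"
        ),
        FAIL_STATE: {"Type": "Fail", "Cause": "SageMakerProcessingJobFailed"},
    },
}
print(json.dumps(definition, indent=2))
```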

### Create the workflow

In [None]:
workflow_graph = Chain([processing_step, training_step, processing_evaluation_step])
branching_workflow = Workflow(
    name="SageMakerProcessingXGboostWorkflow15",
    definition=workflow_graph,
    role=workflow_execution_role,
)

branching_workflow.create()

# Execute workflow
execution = branching_workflow.execute(
    inputs={
        "PreprocessingJobName": preprocessing_job_name,  # Each SageMaker Processing job requires a unique name
        "TrainingJobName": training_job_name,
        "EvaluationProcessingJobName": evaluation_job_name,
    }
)
execution_output = execution.get_output(wait=True)

In [None]:
execution.render_progress()
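
Outside a notebook, where `render_progress` is not available, the execution status can be polled through the Step Functions API. The following is a minimal sketch, assuming a client created with `boto3.client("stepfunctions")` and the ARN of the execution (assumed here to be available as `execution.execution_arn`). `wait_for_execution` is a hypothetical helper.

```python
import time

# Statuses after which an execution no longer changes.
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}


def wait_for_execution(sfn_client, execution_arn, poll_seconds=30, max_polls=120):
    """Poll describe_execution until a terminal status is reached and return it."""
    for _ in range(max_polls):
        response = sfn_client.describe_execution(executionArn=execution_arn)
        status = response["status"]
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(poll_seconds)
    raise TimeoutError(
        "execution {} still running after {} polls".format(execution_arn, max_polls)
    )
```

A call such as `wait_for_execution(boto3.client("stepfunctions"), execution.execution_arn)` would then block until the workflow finishes or the poll limit is reached.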