# SageMaker Pipelines의 Unit Testing 하기

In [10]:
# (Re)load IPython's autoreload extension and select mode 2, which re-imports
# modules before each cell executes so edits to local .py files are picked up.
%reload_ext autoreload
%autoreload 2

In [125]:
import os

import boto3
import sagemaker
import pandas as pd


from sagemaker.workflow.pipeline_context import LocalPipelineSession, PipelineSession
from sagemaker.workflow.steps import CacheConfig

# Region of the default boto3 session, a default SageMaker session, and the
# execution role that every job defined below runs under.
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()

# Region-pinned boto3 session plus the client and bucket derived from it.
boto_session = boto3.Session(region_name=region)
sagemaker_client = boto_session.client("sagemaker")
default_bucket = sagemaker_session.default_bucket()

# Every processor / estimator / model below is bound to this session.
# To bind them to a PipelineSession instead, construct
#   PipelineSession(boto_session=boto_session,
#                   sagemaker_client=sagemaker_client,
#                   default_bucket=default_bucket)
# here in place of the LocalPipelineSession.
pipeline_session = LocalPipelineSession(
    boto_session=boto_session,
    default_bucket=default_bucket,
)

# Step caching shared by the steps below; "PT12H" is the expiry passed to CacheConfig.
cache_config = CacheConfig(
    enable_caching=True,
    expire_after="PT12H",
)

# Directory that holds the pipeline scripts (preprocess.py, evaluate.py),
# resolved against the current working directory.
BASE_DIR = os.path.join(os.getcwd(), "pipelines/abalone/")
BASE_DIR

'/root/model-building-test-p-yggcwe5dwpjm/sagemaker-model-building-test-p-yggcwe5dwpjm-modelbuild/pipelines/abalone/'

## 1. 필요 Packages import

### 1-1. Parameters 정의에 필요한 Packages

In [126]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)


### 1-2. Processing에 필요한 Packages

In [127]:
from sagemaker.sklearn.processing import SKLearnProcessor

from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)

from sagemaker.workflow.steps import ProcessingStep

### 1-3. Training에 필요한 Packages

In [128]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

from sagemaker.workflow.steps import TrainingStep

### 1-4. Evaluation에 필요한 Packages

In [129]:
from sagemaker.workflow.properties import PropertyFile

### 1-5. Model Metrics, 조건(Condition) 및 모델 등록에 필요한 Packages

In [147]:
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
)

from sagemaker.workflow.functions import (
    JsonGet,
    Join,
)

from sagemaker.workflow.model_step import ModelStep
from sagemaker.model import Model

## 2. get_pipeline의 입력 변수

In [131]:
# Naming inputs that get_pipeline() takes: model package group, pipeline name,
# and the prefix used for job names.
model_package_group_name = "AbalonePackageGroup"
pipeline_name = "AbalonePipeline"
base_job_prefix = "Abalone"

# Instance types for the processing jobs and the training job.
processing_instance_type = "ml.m5.xlarge"
training_instance_type = "ml.m5.xlarge"

## 3. 모델 빌딩 파이프라인 스텝(Step) 정의


### 3-1. 모델 빌딩 파이프라인 변수 생성

In [132]:
# Pipeline execution parameters, each declared with a default value.

# S3 location of the raw abalone CSV (bucket name depends on the session region).
input_data = ParameterString(
    name="InputDataUrl",
    default_value=f"s3://sagemaker-servicecatalog-seedcode-{region}/dataset/abalone-dataset.csv",
)

# Number of instances for the preprocessing job.
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1,
)

# Approval status given to the model package when it is registered.
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",
)

### 3-2. 전처리 스텝 단계 정의

크게 아래와 같은 순서로 정의합니다.
- 프로세싱 오브젝트 정의 (SKLearnProcessor)
- 프로세싱 스텝 정의
    - 입력 데이터 세트
        - source: S3 경로 (`input_data` 파라미터)
        - destination: 도커 컨테이너의 내부 폴더 위치
    - 출력 위치
        - 훈련 전처리 데이터 결과 위치
        - 검증 전처리 데이터 결과 위치
        - 테스트 전처리 데이터 결과 위치
    - 프로세싱 코드
    - 프로세싱 코드에 넘길 인자 


In [133]:
# Feature-engineering step: an SKLearnProcessor runs preprocess.py.
sklearn_processor = SKLearnProcessor(
    role=role,
    sagemaker_session=pipeline_session,
    framework_version="0.23-1",
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess",
)

# One named output per data split, each collected from its own container directory.
preprocess_outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
]
preprocess_script = os.path.join(BASE_DIR, "preprocess.py")

# The input location is handed to the script as a command-line argument.
step_args = sklearn_processor.run(
    code=preprocess_script,
    arguments=["--input-data", input_data],
    outputs=preprocess_outputs,
)

step_process = ProcessingStep(
    name="PreprocessAbaloneData",
    step_args=step_args,
    cache_config=cache_config,
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


훈련의 입력으로 사용할 이전 단계의 Processing 결과는 아래 형태로 제공됩니다.
- `step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri`

In [134]:
import sm_pipelines_exec as sm_exec

# Parameters and steps for a run that contains only the preprocessing step.
test_parameters_list = [
    processing_instance_count,
    model_approval_status,
    input_data,
]
test_steps_list = [step_process]

# Hand everything to the sm_pipelines_exec helper (defined outside this notebook;
# presumably it creates and starts the pipeline run) and keep the returned handle.
execution = sm_exec.exec_pipelines(
    pipeline_name,
    role,
    test_parameters_list,
    test_steps_list,
)


Job Name:  Abalone/sklearn-abalone-preprocess-2023-01-09-08-42-19-134
Inputs:  [{'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-223194520082/AbalonePipeline/code/2eda08d19d8e93cbfb937696aa03d533/preprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train', 'AppManaged': False, 'S3Output': {'S3Uri': Join(on='/', values=['s3:/', 'sagemaker-us-east-1-223194520082', 'AbalonePipeline', <sagemaker.workflow.execution_variables.ExecutionVariable object at 0x7fad03ed1610>, 'PreprocessAbaloneData', 'output', 'train']), 'LocalPath': '/opt/ml/processing/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'validation', 'AppManaged': False, 'S3Output': {'S3Uri': Join(on='/', values=['s3:/', 'sagemaker-us-east-1-223194520082', 'AbalonePipeline', <sagemaker.workflow.execution_variables.ExecutionVariab

In [135]:
# Report on the preprocessing-only run: overall status first, then per-step results.
# Both helpers come from sm_pipelines_exec, which is defined outside this notebook.
sm_exec.describe_pipelines(execution)
sm_exec.get_step_results(execution, test_steps_list)

Pipelines Status : Succeeded 

- StepName : PreprocessAbaloneData, StepStatus : Succeeded
- CacheHitResult : {'SourcePipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:223194520082:pipeline/abalonepipeline/execution/b67huv47ewxp'} 

{'Processing': 'pipelines-b67huv47ewxp-preprocessabalonedat-nd2bfouvdc'}
Processing : pipelines-b67huv47ewxp-preprocessabalonedat-nd2bfouvdc
[{'AppManaged': False,
  'OutputName': 'train',
  'S3Output': {'LocalPath': '/opt/ml/processing/train',
               'S3UploadMode': 'EndOfJob',
               'S3Uri': 's3://sagemaker-us-east-1-223194520082/AbalonePipeline/b67huv47ewxp/PreprocessAbaloneData/output/train'}},
 {'AppManaged': False,
  'OutputName': 'validation',
  'S3Output': {'LocalPath': '/opt/ml/processing/validation',
               'S3UploadMode': 'EndOfJob',
               'S3Uri': 's3://sagemaker-us-east-1-223194520082/AbalonePipeline/b67huv47ewxp/PreprocessAbaloneData/output/validation'}},
 {'AppManaged': False,
  'OutputName': 'test',
  'S3Ou

## 4. 모델 학습을 위한 학습단계 정의 

In [136]:
# Training step: fit the SageMaker XGBoost image on the preprocessed train and
# validation splits and write the model artifacts under model_path.

# Reuse the bucket name resolved in the setup cell instead of asking the
# session for it a second time.
model_path = f"s3://{default_bucket}/{base_job_prefix}/AbaloneTrain"

# Container image for XGBoost 1.0-1, matched to the training instance type.
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type,
)

xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    base_job_name=f"{base_job_prefix}/abalone-train",
    sagemaker_session=pipeline_session,
    role=role,
)
xgb_train.set_hyperparameters(
    # "reg:squarederror" is the current name of the squared-error regression
    # objective; "reg:linear" is its deprecated alias.
    objective="reg:squarederror",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    # NOTE(review): XGBoost documents `silent` as deprecated in favour of
    # `verbosity`; left unchanged here - confirm before replacing.
    silent=0,
)

# Both channels read the CSV outputs of the preprocessing step.
step_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)
step_train = TrainingStep(
    name="TrainAbaloneModel",
    step_args=step_args,
    cache_config=cache_config,
)

In [137]:
# Run again, this time with preprocessing followed by training.
test_steps_list = [step_process, step_train]
execution = sm_exec.exec_pipelines(
    pipeline_name,
    role,
    test_parameters_list,
    test_steps_list,
)


Job Name:  Abalone/sklearn-abalone-preprocess-2023-01-09-08-42-29-816
Inputs:  [{'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-223194520082/AbalonePipeline/code/2eda08d19d8e93cbfb937696aa03d533/preprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train', 'AppManaged': False, 'S3Output': {'S3Uri': Join(on='/', values=['s3:/', 'sagemaker-us-east-1-223194520082', 'AbalonePipeline', <sagemaker.workflow.execution_variables.ExecutionVariable object at 0x7fad03ed1610>, 'PreprocessAbaloneData', 'output', 'train']), 'LocalPath': '/opt/ml/processing/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'validation', 'AppManaged': False, 'S3Output': {'S3Uri': Join(on='/', values=['s3:/', 'sagemaker-us-east-1-223194520082', 'AbalonePipeline', <sagemaker.workflow.execution_variables.ExecutionVariab

In [139]:
# Report on the preprocess + train run: overall status first, then per-step results.
# Both helpers come from sm_pipelines_exec, which is defined outside this notebook.
sm_exec.describe_pipelines(execution)
sm_exec.get_step_results(execution, test_steps_list)

Pipelines Status : Succeeded 

- StepName : TrainAbaloneModel, StepStatus : Succeeded
- CacheHitResult : {'SourcePipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:223194520082:pipeline/abalonepipeline/execution/mu6jv29rzzfc'} 

- StepName : PreprocessAbaloneData, StepStatus : Succeeded
- CacheHitResult : {'SourcePipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:223194520082:pipeline/abalonepipeline/execution/b67huv47ewxp'} 

{'Processing': 'pipelines-b67huv47ewxp-preprocessabalonedat-nd2bfouvdc',
 'Training': 'pipelines-mu6jv29rzzfc-TrainAbaloneModel-rYZGCJvKgK'}
Processing : pipelines-b67huv47ewxp-preprocessabalonedat-nd2bfouvdc
[{'AppManaged': False,
  'OutputName': 'train',
  'S3Output': {'LocalPath': '/opt/ml/processing/train',
               'S3UploadMode': 'EndOfJob',
               'S3Uri': 's3://sagemaker-us-east-1-223194520082/AbalonePipeline/b67huv47ewxp/PreprocessAbaloneData/output/train'}},
 {'AppManaged': False,
  'OutputName': 'validation',
  'S3Output': {'LocalPath':

## 5. 모델 검증을 위한 Evaluation 단계 정의 

In [140]:
# Evaluation step: a ScriptProcessor runs evaluate.py on the training image,
# with the trained model and the test split mounted as inputs.
script_eval = ScriptProcessor(
    role=role,
    sagemaker_session=pipeline_session,
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name=f"{base_job_prefix}/script-abalone-eval",
)

# Model artifacts produced by the training step.
model_artifact_input = ProcessingInput(
    source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    destination="/opt/ml/processing/model",
)
# "test" output of the preprocessing step.
test_split_input = ProcessingInput(
    source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
    destination="/opt/ml/processing/test",
)
evaluation_output = ProcessingOutput(
    output_name="evaluation",
    source="/opt/ml/processing/evaluation",
)

step_args = script_eval.run(
    inputs=[model_artifact_input, test_split_input],
    outputs=[evaluation_output],
    code=os.path.join(BASE_DIR, "evaluate.py"),
)

# Declares evaluation.json in the "evaluation" output as a property file so
# later steps can look values up in it.
evaluation_report = PropertyFile(
    name="AbaloneEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

step_eval = ProcessingStep(
    name="EvaluateAbaloneModel",
    step_args=step_args,
    property_files=[evaluation_report],
    cache_config=cache_config,
)

In [141]:
# Run again with preprocessing, training and evaluation.
test_steps_list = [step_process, step_train, step_eval]
execution = sm_exec.exec_pipelines(
    pipeline_name,
    role,
    test_parameters_list,
    test_steps_list,
)


Job Name:  Abalone/sklearn-abalone-preprocess-2023-01-09-08-42-42-128
Inputs:  [{'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-223194520082/AbalonePipeline/code/2eda08d19d8e93cbfb937696aa03d533/preprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train', 'AppManaged': False, 'S3Output': {'S3Uri': Join(on='/', values=['s3:/', 'sagemaker-us-east-1-223194520082', 'AbalonePipeline', <sagemaker.workflow.execution_variables.ExecutionVariable object at 0x7fad03ed1610>, 'PreprocessAbaloneData', 'output', 'train']), 'LocalPath': '/opt/ml/processing/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'validation', 'AppManaged': False, 'S3Output': {'S3Uri': Join(on='/', values=['s3:/', 'sagemaker-us-east-1-223194520082', 'AbalonePipeline', <sagemaker.workflow.execution_variables.ExecutionVariab

In [143]:
# Report on the preprocess + train + evaluate run: overall status, then per-step results.
# Both helpers come from sm_pipelines_exec, which is defined outside this notebook.
sm_exec.describe_pipelines(execution)
sm_exec.get_step_results(execution, test_steps_list)

Pipelines Status : Succeeded 

- StepName : EvaluateAbaloneModel, StepStatus : Succeeded
- CacheHitResult : {'SourcePipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:223194520082:pipeline/abalonepipeline/execution/otnwv17cs193'} 

- StepName : TrainAbaloneModel, StepStatus : Succeeded
- CacheHitResult : {'SourcePipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:223194520082:pipeline/abalonepipeline/execution/mu6jv29rzzfc'} 

- StepName : PreprocessAbaloneData, StepStatus : Succeeded
- CacheHitResult : {'SourcePipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:223194520082:pipeline/abalonepipeline/execution/b67huv47ewxp'} 

{'Processing': 'pipelines-b67huv47ewxp-preprocessabalonedat-nd2bfouvdc',
 'Training': 'pipelines-mu6jv29rzzfc-TrainAbaloneModel-rYZGCJvKgK'}
Processing : pipelines-b67huv47ewxp-preprocessabalonedat-nd2bfouvdc
[{'AppManaged': False,
  'OutputName': 'train',
  'S3Output': {'LocalPath': '/opt/ml/processing/train',
               'S3UploadMode': 'EndOfJob',
         

## 6. Model Metrics, 모델 등록 및 조건(Condition) 단계 정의

In [155]:
# Model metrics: evaluation.json located under the first output of the evaluation step.
evaluation_output_uri = step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=Join(on="/", values=[evaluation_output_uri, "evaluation.json"]),
        content_type="application/json",
    )
)

# Registration step (executed only through the condition step below): wraps the
# trained artifacts in a Model and registers it in the model package group.
model = Model(
    role=role,
    sagemaker_session=pipeline_session,
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
)
step_args = model.register(
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],
    transform_instances=["ml.m5.large"],
)
step_register = ModelStep(
    name="RegisterAbaloneModel",
    step_args=step_args,
)

# Condition step: look up the MSE in the evaluation report and register the
# model only when it is <= 6.0; the else branch is empty.
mse_value = JsonGet(
    step_name=step_eval.name,
    property_file=evaluation_report,
    json_path="regression_metrics.mse.value",
)
cond_lte = ConditionLessThanOrEqualTo(left=mse_value, right=6.0)

step_cond = ConditionStep(
    name="CheckMSEAbaloneEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register],
    else_steps=[],
)


Job Name:  Abalone/script-abalone-eval-2023-01-09-09-11-20-402
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7facfcda0a10>, 'LocalPath': '/opt/ml/processing/model', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'input-2', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7fad02b07dd0>, 'LocalPath': '/opt/ml/processing/test', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-223194520082/Abalone/script-abalone-eval-2023-01-09-09-11-20-402/input/code/evaluate.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3

In [169]:
# Run the full chain: preprocessing, training, evaluation and the condition step
# (which carries the registration step in its if-branch).
test_steps_list = [step_process, step_train, step_eval, step_cond]
execution = sm_exec.exec_pipelines(
    pipeline_name,
    role,
    test_parameters_list,
    test_steps_list,
)


Job Name:  Abalone/sklearn-abalone-preprocess-2023-01-09-09-29-58-847
Inputs:  [{'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-223194520082/AbalonePipeline/code/2eda08d19d8e93cbfb937696aa03d533/preprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train', 'AppManaged': False, 'S3Output': {'S3Uri': Join(on='/', values=['s3:/', 'sagemaker-us-east-1-223194520082', 'AbalonePipeline', <sagemaker.workflow.execution_variables.ExecutionVariable object at 0x7fad03ed1610>, 'PreprocessAbaloneData', 'output', 'train']), 'LocalPath': '/opt/ml/processing/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'validation', 'AppManaged': False, 'S3Output': {'S3Uri': Join(on='/', values=['s3:/', 'sagemaker-us-east-1-223194520082', 'AbalonePipeline', <sagemaker.workflow.execution_variables.ExecutionVariab

In [189]:
# Report on the full run (including the condition step): overall status, then per-step results.
# Both helpers come from sm_pipelines_exec, which is defined outside this notebook.
sm_exec.describe_pipelines(execution)
sm_exec.get_step_results(execution, test_steps_list)

Pipelines Status : Succeeded 

- StepName : RegisterAbaloneModel-RegisterModel, StepStatus : Succeeded
- StepName : CheckMSEAbaloneEvaluation, StepStatus : Succeeded
- StepName : EvaluateAbaloneModel, StepStatus : Succeeded
- CacheHitResult : {'SourcePipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:223194520082:pipeline/abalonepipeline/execution/otnwv17cs193'} 

- StepName : TrainAbaloneModel, StepStatus : Succeeded
- CacheHitResult : {'SourcePipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:223194520082:pipeline/abalonepipeline/execution/mu6jv29rzzfc'} 

- StepName : PreprocessAbaloneData, StepStatus : Succeeded
- CacheHitResult : {'SourcePipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:223194520082:pipeline/abalonepipeline/execution/b67huv47ewxp'} 

{'Condition': 'True',
 'Processing': 'pipelines-b67huv47ewxp-preprocessabalonedat-nd2bfouvdc',
 'RegisterModel': 'abalonepackagegroup/4',
 'Training': 'pipelines-mu6jv29rzzfc-TrainAbaloneModel-rYZGCJvKgK'}
step_name : Processing
P