# Pipeline

## reference
- https://docs.aws.amazon.com/sagemaker/latest/dg/define-pipeline.html
- https://sagemaker-examples.readthedocs.io/en/latest/sagemaker-pipelines/tabular/abalone_build_train_deploy/sagemaker-pipelines-preprocess-train-evaluate-batch-transform.html
- [batch transform](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html)

## Consideration
- https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-access.html
    - check IAM role for running pipeline

In [None]:
import boto3
import sagemaker
import sagemaker.session


region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"AbaloneModelPackageGroupName"
region, role, default_bucket

In [None]:
!mkdir -p data
local_path = "data/abalone-dataset.csv"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset.csv",
    local_path
)

base_uri = f"s3://hyun/abalone"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path, 
    desired_s3_uri=base_uri,
)
print(input_data_uri)

In [None]:
local_path = "data/abalone-dataset-batch"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset-batch",
    local_path
)

batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path, 
    desired_s3_uri=base_uri,
)
print(batch_data_uri)

In [4]:
# parameters. 파이프라인을 다시 실행할 때, 파이프라인 정의를 바꾸지 않고, 해당 파라미터들의 값만 바꿔서 다시 실행할 수 있게 함
# 각 파라미터들은 쓰고 싶은 스텝에 넣어주면, 파이프라인 정의에 설정됨.
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat
)
from sagemaker.workflow.functions import Join

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)
bucket = ParameterString(
    name="bucket",
    default_value="my-bucket",
)

s3_url = Join("/", ["s3:/", bucket, "suffix"])


In [83]:
!mkdir -p abalone

In [84]:
%%writefile abalone/preprocessing.py
import argparse
import os
import requests
import tempfile
import numpy as np
import pandas as pd


from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder


# Because this is a headerless CSV file, specify the column names here.
feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]
label_column = "rings"

feature_columns_dtype = {
    "sex": str,
    "length": np.float64,
    "diameter": np.float64,
    "height": np.float64,
    "whole_weight": np.float64,
    "shucked_weight": np.float64,
    "viscera_weight": np.float64,
    "shell_weight": np.float64
}
label_column_dtype = {"rings": np.float64}


def merge_two_dicts(x, y):
    z = x.copy()
    z.update(y)
    return z


if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    df = pd.read_csv(
        f"{base_dir}/input/abalone-dataset.csv",
        header=None, 
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype)
    )
    numeric_features = list(feature_columns_names)
    numeric_features.remove("sex")
    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler())
        ]
    )

    categorical_features = ["sex"]
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore"))
        ]
    )

    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features)
        ]
    )
    
    y = df.pop("rings")
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)
    
    # label, numeric featuers, categorical feature 순. categorical feature는 oneHotEncoder에 의해, 카테고리 수만큼 컬럼이 생김
    # Sagemaker에서 피쳐들을 특별히 관리하지는 않음. oneHotEncoding으로 변환 하면, 각 카테고리값이 어떤 피쳐에 속하는지 구분이 사라지게 되는데, 학습할 때는 상관 없어서 그런듯.
    X = np.concatenate((y_pre, X_pre), axis=1)
    
    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])

    # train, validation, test로 나누는 이유. https://modern-manual.tistory.com/19
    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Overwriting abalone/preprocessing.py


In [5]:
from sagemaker.sklearn.processing import SKLearnProcessor


framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-abalone-process",
    role=role,
)

In [6]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
    

step_process = ProcessingStep(
    name="AbaloneProcess",
    processor=sklearn_processor,
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        # s3 path가 없지만, default s3경로에 저장되는듯
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
    ],
    code="abalone/preprocessing.py",
)

In [7]:
model_path = f"{base_uri}/AbaloneTrain"

In [8]:
from sagemaker.estimator import Estimator


image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type,
)

# instance 갯수를 설정하여, 병렬 학습이 가능한 듯(여러 하이퍼 파라미터 테스트시 등은 별도로 학습 가능하므로)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path, # model.tar.gz 라는 파일명으로 저장됨
    role=role,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0
)

In [9]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


# 학습시 첫번째 컬럼을 label이라고 자동 인식 하는 듯
# TODO sagemaker execution페이지에서 학습의 output에 매트릭과, confusion metrics. precision-recall curve, ROC Curve등을 보여주는데 어떻게 하는 거지?
step_train = TrainingStep(
    name="AbaloneTrain",
    estimator=xgb_train,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
)

In [90]:
%%writefile abalone/evaluation.py
import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost


from sklearn.metrics import mean_squared_error


if __name__ == "__main__":
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    
    model = pickle.load(open("xgboost-model", "rb"))

    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    
    X_test = xgboost.DMatrix(df.values)
    
    predictions = model.predict(X_test)

    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {
                "value": mse,
                "standard_deviation": std
            },
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting abalone/evaluation.py


In [10]:
from sagemaker.processing import ScriptProcessor


script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-abalone-eval",
    role=role,
)

In [11]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json"
)
step_eval = ProcessingStep(
    name="AbaloneEval",
    processor=script_eval, # scikit learn을 안 써서, ScriptProcessor를 쓴 듯?
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model"
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test"
        )
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="abalone/evaluation.py",
    property_files=[evaluation_report],
)
# evaluation output을 evaluation_report라는 property에 설정하여, 이후 스텝에서 사용

In [12]:
from sagemaker.model import Model


model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=role,
)

In [13]:
from sagemaker.inputs import CreateModelInput


inputs = CreateModelInput(
    instance_type="ml.m5.large",
    accelerator_type="ml.eia1.medium",
)

In [14]:
from sagemaker.workflow.steps import CreateModelStep

# A step that creates a model for use in transform steps or later publication as an endpoint.
# 모델 등록과 달리, 모델 생성은, registry안에 여러 버전으로 존재 하게 되어, 1:N 관계인가?
step_create_model = CreateModelStep(
    name="AbaloneCreateModel",
    model=model,
    inputs=inputs,
)

In [15]:
from sagemaker.transformer import Transformer


# 대량의 데이터 처리시 사용. 여러 인스턴스에서 병렬 처리
# 예) 오프라인 추론시(실시간이 아닌 배치로 사용이 가능한 경우 등)
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"{base_uri}/AbaloneTransform"
)

In [16]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

# batch data 또는 endpoint의 input은 기본적으로 libsvm을 사용하는 듯.
step_transform = TransformStep(
    name="AbaloneTransform",
    transformer=transformer,
    inputs=TransformInput(data=batch_data)
)

In [17]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.workflow.step_collections import RegisterModel


model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)
# todo 이 metrics는 어디서에서 사용되는 건지? sagemaker execution에서는 mse만 보이던데, 이 파일에는 mse와 표준편차 둘다 있던데..

step_register = RegisterModel(
    name="AbaloneRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics
)

In [18]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value"
    ),
    right=6.0
)

In [19]:


step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[], 
)

In [20]:
from sagemaker.workflow.pipeline import Pipeline

# preprocess -> 학습 -> evaluation -> 성능 체크 -> model을 등록, 생성, batch data로 추론

pipeline_name = f"AbalonePipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type, 
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
    ],
    steps=[step_process, step_train, step_eval, step_cond], #input에 다른 step의 output이 있냐에 따라서, 알아서 DAG가 생성 되는 듯. 만약 output이 sagemaker의 output을 통하지 않고 저장된다면, 수동으로 설정할 수 도 있음 (step.add_depends_on)
)

In [None]:
import json

json.loads(pipeline.definition())


In [103]:
pipeline.upsert(role_arn=role)
execution = pipeline.start()


No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


In [None]:

#parameters default value can be changed on starting time
# execution = pipeline.start(
#     parameters=dict(
#         ProcessingInstanceType="ml.c5.xlarge",
#         ModelApprovalStatus="Approved"
#     )
# )

In [None]:
execution.describe()


In [105]:
execution.wait()


In [None]:
execution.list_steps()


In [107]:
evaluation_json = sagemaker.s3.S3Downloader.read_file("{}/evaluation.json".format(
    step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
))
json.loads(evaluation_json)

{'regression_metrics': {'mse': {'value': 4.338938721072455,
   'standard_deviation': 2.079469217220956}}}

In [None]:
execution.stop()


In [22]:
pipeline.delete()



{'PipelineArn': 'arn:aws:sagemaker:ap-south-1:492339234003:pipeline/abalonepipeline',
 'ResponseMetadata': {'RequestId': '425d9ac0-d73d-47a6-b3ce-9a559cee9ed7',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '425d9ac0-d73d-47a6-b3ce-9a559cee9ed7',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '84',
   'date': 'Sat, 15 Jan 2022 13:23:44 GMT'},
  'RetryAttempts': 0}}

In [None]:
# 이미 등록된 pipeline실행하기
# upsert는 role을 하면 pipeline이 생성되어서, 하면 안됨.
pipeline = Pipeline(
    name=pipeline_name)

pipeline.start()