# <B> SageMaker pipeline </B>
* Container: conda_python3

## AutoReload

In [11]:
# Enable IPython's autoreload extension in mode 2 so that edits to imported
# local modules (e.g. utils.ssm) are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## 0. Install packages

In [2]:
# One-time setup switch: set to True only for the very first run (it gates the
# pip installs and the kernel restart in the install cell), then back to False.
install_needed = False

In [3]:
%%bash
#!/bin/bash
# Point Docker's data-root at the SageMaker volume and raise the default
# shared-memory size, unless daemon.json already defines "data-root".

DAEMON_PATH="/etc/docker"
DAEMON_JSON="$DAEMON_PATH/daemon.json"
MEMORY_SIZE=10G

# Without this guard a missing daemon.json would fall through to the rewrite
# branch, leave an empty daemon.json behind and then restart Docker with it.
if [ ! -f "$DAEMON_JSON" ]; then
    echo "$DAEMON_JSON not found; nothing changed" >&2
    exit 1
fi

FLAG=$(jq 'has("data-root")' "$DAEMON_JSON")

if [ "$FLAG" == true ]; then
    echo "Already revised"
else
    echo "Add data-root and default-shm-size=$MEMORY_SIZE"
    sudo cp "$DAEMON_JSON" "$DAEMON_JSON.bak"
    # Pass the size through --arg instead of splicing it into the jq program text.
    sudo cat "$DAEMON_JSON.bak" | jq --arg shm "$MEMORY_SIZE" '. += {"data-root":"/home/ec2-user/SageMaker/.container/docker","default-shm-size":$shm}' | sudo tee "$DAEMON_JSON" > /dev/null
    sudo service docker restart
    echo "Docker Restart"
fi

Already revised


In [4]:
import sys
import IPython

# One-time environment setup, gated by the `install_needed` flag defined above.
if install_needed:
    print("installing deps and restarting kernel")
    # `{sys.executable} -m pip` targets the interpreter that backs this kernel.
    !{sys.executable} -m pip install -U pip
    !{sys.executable} -m pip install -U smdebug sagemaker-experiments
    !{sys.executable} -m pip install -U sagemaker

    # Shut the kernel down with restart=True so the upgraded packages get imported.
    IPython.Application.instance().kernel.do_shutdown(True)

installing deps and restarting kernel
Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com
Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com
Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com
Collecting sagemaker
  Downloading sagemaker-2.146.0.tar.gz (718 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m718.5/718.5 kB[0m [31m27.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: sagemaker
  Building wheel for sagemaker (setup.py) ... [?25ldone
[?25h  Created wheel for sagemaker: filename=sagemaker-2.146.0-py2.py3-none-any.whl size=964936 sha256=d3a9669420018e3c5e20828dddd14289133eb66b2cdc3c6309016c0923d44bc4
  Stored in directory: /home/ec2-user/.cache/pip/wheels/3a/04/13/2066fc4ef9ed243c9e8710b9c269f29e7711bca655da2eb416
Successfully built sagemaker
Installing collected packages: sagemaker
  Att

## 1. parameter store 설정

In [12]:
import boto3
from utils.ssm import parameter_store

In [13]:
# Resolve the session's AWS region and open the project's parameter-store helper.
# "PREFIX" is the project prefix used to build the other parameter keys below.
strRegionName=boto3.Session().region_name
pm = parameter_store(strRegionName)
strPrefix = pm.get_params(key="PREFIX")

In [14]:
# Look up the S3 bucket name and the SageMaker execution role ARN; both keys
# are the project prefix followed by a fixed suffix.
strBucketName = pm.get_params(key=f"{strPrefix}-BUCKET")
strExecutionRole = pm.get_params(key=f"{strPrefix}-SAGEMAKER-ROLE-ARN")

## 2. Dataset

In [15]:
import os

In [16]:
# S3 prefix of the dataset, and the local "data" directory under the current
# working directory.
strS3DataPath = "s3://{}/dataset".format(strBucketName)
strLocalDataPath = os.path.join(os.getcwd(), "data")

## 3. MLOps pipeline
* pipeline:
    * https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#id2 
    * [Amazon SageMaker 모델 구축 파이프라인](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/pipelines.html)
    

In [17]:
import os
import time
import sagemaker
from sagemaker.inputs import TrainingInput
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.processing import ProcessingInput, ProcessingOutput, FrameworkProcessor
from sagemaker.workflow.steps import CacheConfig, ProcessingStep, TrainingStep
from sagemaker.workflow.pipeline_context import PipelineSession, LocalPipelineSession
from sagemaker.workflow.retry import StepRetryPolicy, StepExceptionTypeEnum, SageMakerJobExceptionTypeEnum, SageMakerJobStepRetryPolicy

In [32]:
class pipeline():
    """Define and launch a three-step SageMaker pipeline: train -> evaluate -> deploy.

    Settings (project prefix, bucket name, execution role ARN) are read from the
    project's ``parameter_store`` helper. The pipeline name, the endpoint name and
    the pipeline ARN are written back to it under ``<PREFIX>-PIPELINE-NAME``,
    ``<PREFIX>-ENDPOINT-NAME`` and ``<PREFIX>-PIPELINE-ARN``.

    Typical use: ``pipeline(strPipelineName=...).execution()``.
    """

    def __init__(self, strPipelineName):
        """Load configuration and prepare the settings shared by all steps.

        Args:
            strPipelineName: Name of the SageMaker pipeline. It is also used as
                the S3 key prefix for step outputs and is stored in the
                parameter store.
        """
        self.strRegionName = boto3.Session().region_name
        self.pm = parameter_store(self.strRegionName)
        self.strPrefix = self.pm.get_params(key="PREFIX")

        self.strExecutionRole = self.pm.get_params(key="-".join([self.strPrefix, "SAGEMAKER-ROLE-ARN"]))
        self.strBucketName = self.pm.get_params(key="-".join([self.strPrefix, "BUCKET"]))
        self.strPipelineName = strPipelineName

        # With a PipelineSession, estimator.fit() / processor.run() return step
        # arguments instead of launching a job immediately.
        self.pipeline_session = PipelineSession()
        self.strDataPath = f"s3://{self.strBucketName}/dataset"

        # Step caching with a 48-hour expiry (ISO 8601 duration).
        self.cache_config = CacheConfig(
            enable_caching=True,
            expire_after="T48H"
        )

        self.retry_policies = [
            # Retry when the resource limit quota gets exceeded:
            # every 60 s, constant interval, for up to 180 minutes.
            SageMakerJobStepRetryPolicy(
                exception_types=[SageMakerJobExceptionTypeEnum.RESOURCE_LIMIT],
                expire_after_mins=180,
                interval_seconds=60,
                backoff_rate=1.0
            ),
        ]

        self.pm.put_params(
            key="-".join([self.strPrefix, "PIPELINE-NAME"]),
            value=self.strPipelineName,
            overwrite=True
        )

    @staticmethod
    def _join_uri(*parts):
        """Join S3-URI / container-path segments with "/" independent of the host OS."""
        return "/".join(parts)

    def _step_training(self):
        """Create the XGBoost training step and store it in ``self.training_process``.

        Also stores the estimator in ``self.estimator``.
        """
        strInstanceType = "ml.m5.2xlarge"
        nInstanceCount = 1
        bSpotTraining = False

        dicDataChannels = {
            "training": self._join_uri(self.strDataPath, "train_.csv"),
            "testing": self._join_uri(self.strDataPath, "test.csv"),
        }

        # max_wait is only set for spot training; the run limit is one hour either way.
        nMaxRun = 1*60*60
        nMaxWait = 1*60*60 if bSpotTraining else None

        strOutputPath = self._join_uri(
            "s3://{}".format(self.strBucketName),
            self.strPipelineName,
            "training",
            "model-output"
        )

        strCodeLocation = self._join_uri(
            "s3://{}".format(self.strBucketName),
            self.strPipelineName,
            "training",
            "backup_codes"
        )

        dicHyperparameters = {
            "scale_pos_weight": "19",
            "max_depth": "2",
            "eta": "0.3",
            "objective": "binary:logistic",
            "num_round": "100",
        }

        self.estimator = XGBoost(
            entry_point="xgboost_starter_script.py",
            source_dir="source/train/",
            output_path=strOutputPath,
            code_location=strCodeLocation,
            hyperparameters=dicHyperparameters,  # passed into the training container
            role=self.strExecutionRole,
            instance_count=nInstanceCount,
            instance_type=strInstanceType,
            framework_version="1.3-1",
            max_run=nMaxRun,
            use_spot_instances=bSpotTraining,
            max_wait=nMaxWait,
            enable_sagemaker_metrics=True,
            volume_size=64,  # GB
            sagemaker_session=self.pipeline_session
        )

        job_name = "-".join([self.strPipelineName, "training-job"])
        step_training_args = self.estimator.fit(
            inputs=dicDataChannels,
            job_name=job_name,
            experiment_config={
                'TrialName': job_name,
                'TrialComponentDisplayName': job_name,
            },
            logs="All",
        )

        self.training_process = TrainingStep(
            name="TrainingProcess",
            step_args=step_training_args,
            cache_config=self.cache_config,
            retry_policies=self.retry_policies
        )

        print ("  \n== Training Step ==")
        print ("   \nArgs: ", self.training_process.arguments.items())

    def _step_evaluation(self):
        """Create the evaluation processing step and store it in ``self.evaluation_process``.

        Must be called after ``_step_training()``, because it reads the model
        artifact location from ``self.training_process``.
        """
        strInstanceType = "ml.m5.2xlarge"
        nInstanceCount = 1

        strProcPrefixPath = "/opt/ml/processing"
        strTestDataPath = self._join_uri(self.strDataPath, "test.csv")

        strOutputPath = self._join_uri(
            "s3://{}".format(self.strBucketName),
            self.strPipelineName,
            "evaluation",
            "output"
        )

        # NOTE(review): computed but not passed to the processor below — confirm
        # whether a code_location was intended for the evaluation step.
        strCodeLocation = self._join_uri(
            "s3://{}".format(self.strBucketName),
            self.strPipelineName,
            "evaluation",
            "backup_codes"
        )

        evaluation_processor = FrameworkProcessor(
            estimator_cls=XGBoost,
            framework_version="1.3-1",
            image_uri=None,
            role=self.strExecutionRole,
            instance_type=strInstanceType,
            instance_count=nInstanceCount,
            # Name shown in the bucket (when run inside a pipeline, the name
            # defined by the pipeline is shown in the bucket instead).
            base_job_name="evaluation",
            sagemaker_session=self.pipeline_session
        )

        step_evaluation_args = evaluation_processor.run(
            code="evaluation.py",
            source_dir="source/evaluation/",
            inputs=[
                ProcessingInput(
                    source=strTestDataPath,
                    input_name="test_data",
                    destination=self._join_uri(strProcPrefixPath, "test")
                ),
                ProcessingInput(
                    # Model artifact produced by the training step.
                    source=self.training_process.properties.ModelArtifacts.S3ModelArtifacts,
                    input_name="model_weight",
                    destination=self._join_uri(strProcPrefixPath, "model")
                )
            ],
            outputs=[
                ProcessingOutput(
                    source=self._join_uri(strProcPrefixPath, "output"),
                    output_name='evaluation',
                    destination=strOutputPath,
                )
            ],
        )

        self.evaluation_report = PropertyFile(
            name="EvaluationReport",
            output_name="evaluation",  # output_name of the ProcessingOutput above
            path="evaluation.json",  # presumably written by the evaluation script (evaluation.py) — confirm
        )

        self.evaluation_process = ProcessingStep(
            name="EvaluationProcess",  # step name
            step_args=step_evaluation_args,
            depends_on=[self.training_process],
            property_files=[self.evaluation_report],
            cache_config=self.cache_config,
            retry_policies=self.retry_policies
        )

        print ("  \n== Evaluation Step ==")
        print ("   \nArgs: ", self.evaluation_process.arguments.items())

    def _step_deploy(self):
        """Create the deploy processing step and store it in ``self.deploy_process``.

        Must be called after ``_step_training()`` and ``_step_evaluation()``.
        The endpoint name is generated here (pipeline name plus the current
        Unix time) and written to the parameter store.
        """
        strInstanceType = "ml.m5.2xlarge"
        nInstanceCount = 1
        strEndpointName = f'endpoint--{self.strPipelineName}-{int(time.time())}'
        strProcPrefixPath = "/opt/ml/processing"

        deploy_processor = FrameworkProcessor(
            estimator_cls=XGBoost,
            framework_version="1.3-1",
            image_uri=None,
            role=self.strExecutionRole,
            instance_type=strInstanceType,
            instance_count=nInstanceCount,
            # Name shown in the bucket (when run inside a pipeline, the name
            # defined by the pipeline is shown in the bucket instead).
            base_job_name="deploy",
            sagemaker_session=self.pipeline_session
        )

        step_deploy_args = deploy_processor.run(
            code="deploy.py",
            source_dir="source/deploy/",
            inputs=[
                ProcessingInput(
                    source="source/deploy/inference.py",
                    input_name="inference-py",
                    destination=self._join_uri(strProcPrefixPath, "inference")
                ),
                ProcessingInput(
                    source="source/deploy/requirements.txt",
                    input_name="requirements-txt",
                    destination=self._join_uri(strProcPrefixPath, "requirements")
                ),
            ],
            arguments=[
                "--prefix_deploy", strProcPrefixPath,
                "--region", self.strRegionName,
                "--instance_type", strInstanceType,
                "--model_data", self.training_process.properties.ModelArtifacts.S3ModelArtifacts,
                "--endpoint_name", strEndpointName,
                "--execution_role", self.strExecutionRole,
            ],
            job_name="deploy",
        )

        # Recorded while the step is being defined, i.e. before the pipeline has
        # been started and before the deploy job has run.
        self.pm.put_params(key=self.strPrefix + "-ENDPOINT-NAME", value=strEndpointName, overwrite=True)

        self.deploy_process = ProcessingStep(
            name="DeployProcess",  # step name
            step_args=step_deploy_args,
            depends_on=[self.evaluation_process],
            cache_config=self.cache_config,
            retry_policies=self.retry_policies
        )

        print ("  \n== Deploy Step ==")
        print ("   \nArgs: ", self.deploy_process.arguments.items())

    def _get_pipeline(self):
        """Assemble the three prepared steps into a ``Pipeline`` object and return it."""
        sm_pipeline = Pipeline(
            name=self.strPipelineName,
            steps=[self.training_process, self.evaluation_process, self.deploy_process],
            sagemaker_session=self.pipeline_session
        )

        return sm_pipeline

    def execution(self):
        """Build all steps, upsert the pipeline definition and start one execution.

        Stores the pipeline ARN in the parameter store under
        ``<PREFIX>-PIPELINE-ARN`` and prints the execution description.
        """
        self._step_training()
        self._step_evaluation()
        self._step_deploy()

        sm_pipeline = self._get_pipeline()
        # Submit the pipeline definition to the SageMaker Pipelines service.
        sm_pipeline.upsert(role_arn=self.strExecutionRole)
        execution = sm_pipeline.start()

        desc = execution.describe()
        # Use the instance's own parameter store and prefix; the original relied on
        # notebook-level globals `pm` / `strPrefix` being defined.
        self.pm.put_params(
            key="-".join([self.strPrefix, "PIPELINE-ARN"]),
            value=desc["PipelineArn"],
            overwrite=True
        )
        print (desc)
     

In [33]:
# Define the pipeline named "<PREFIX>-PIPELINE" and start an execution.
pipe = pipeline(strPipelineName="-".join([strPrefix, "PIPELINE"]))
pipe.execution()

INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.2xlarge.
INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.2xlarge.


  
== Training Step ==
   
Args:  dict_items([('AlgorithmSpecification', {'TrainingInputMode': 'File', 'TrainingImage': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.3-1', 'EnableSageMakerMetricsTimeSeries': True}), ('OutputDataConfig', {'S3OutputPath': 's3://sagemaker-us-east-1-419974056037/DJ-SM-IMD-PIPELINE/training/model-output'}), ('StoppingCondition', {'MaxRuntimeInSeconds': 3600}), ('ResourceConfig', {'VolumeSizeInGB': 64, 'InstanceCount': 1, 'InstanceType': 'ml.m5.2xlarge'}), ('RoleArn', 'arn:aws:iam::419974056037:role/service-role/AmazonSageMaker-ExecutionRole-20221206T163436'), ('InputDataConfig', [{'DataSource': {'S3DataSource': {'S3DataType': 'S3Prefix', 'S3Uri': 's3://sagemaker-us-east-1-419974056037/dataset/train_.csv', 'S3DataDistributionType': 'FullyReplicated'}}, 'ChannelName': 'training'}, {'DataSource': {'S3DataSource': {'S3DataType': 'S3Prefix', 'S3Uri': 's3://sagemaker-us-east-1-419974056037/dataset/test.csv', 'S3DataDistributionType': 'FullyRep

INFO:sagemaker.processing:Uploaded source/evaluation/ to s3://sagemaker-us-east-1-419974056037/evaluation-2023-05-04-15-46-30-298/source/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://sagemaker-us-east-1-419974056037/evaluation-2023-05-04-15-46-30-298/source/runproc.sh
INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.2xlarge.


   
Args:  dict_items([('ProcessingResources', {'ClusterConfig': {'InstanceType': 'ml.m5.2xlarge', 'InstanceCount': 1, 'VolumeSizeInGB': 30}}), ('AppSpecification', {'ImageUri': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.3-1', 'ContainerEntrypoint': ['/bin/bash', '/opt/ml/processing/input/entrypoint/runproc.sh']}), ('RoleArn', 'arn:aws:iam::419974056037:role/service-role/AmazonSageMaker-ExecutionRole-20221206T163436'), ('ProcessingInputs', [{'InputName': 'test_data', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-419974056037/dataset/test.csv', 'LocalPath': '/opt/ml/processing/test', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'model_weight', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7f4bb3fdcb50>, 'LocalPath': '/opt/ml/processing/model', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDi

INFO:sagemaker.processing:Uploaded source/deploy/ to s3://sagemaker-us-east-1-419974056037/deploy/source/sourcedir.tar.gz


  
== Deploy Step ==


INFO:sagemaker.processing:runproc.sh uploaded to s3://sagemaker-us-east-1-419974056037/deploy/source/runproc.sh


   
Args:  dict_items([('ProcessingResources', {'ClusterConfig': {'InstanceType': 'ml.m5.2xlarge', 'InstanceCount': 1, 'VolumeSizeInGB': 30}}), ('AppSpecification', {'ImageUri': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.3-1', 'ContainerArguments': ['--prefix_deploy', '/opt/ml/processing', '--region', 'us-east-1', '--instance_type', 'ml.m5.2xlarge', '--model_data', <sagemaker.workflow.properties.Properties object at 0x7f4bb3fdcb50>, '--endpoint_name', 'endpoint--DJ-SM-IMD-PIPELINE-1683215190', '--execution_role', 'arn:aws:iam::419974056037:role/service-role/AmazonSageMaker-ExecutionRole-20221206T163436'], 'ContainerEntrypoint': ['/bin/bash', '/opt/ml/processing/input/entrypoint/runproc.sh']}), ('RoleArn', 'arn:aws:iam::419974056037:role/service-role/AmazonSageMaker-ExecutionRole-20221206T163436'), ('ProcessingInputs', [{'InputName': 'inference-py', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-419974056037/deploy/input/inference-py/inference

INFO:sagemaker.processing:Uploaded source/evaluation/ to s3://sagemaker-us-east-1-419974056037/DJ-SM-IMD-PIPELINE/code/0e7bf581858560b392b8bc18209c4aaf/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://sagemaker-us-east-1-419974056037/DJ-SM-IMD-PIPELINE/code/2c207c809cb0e0e9a1d77e5247f961f9/runproc.sh
INFO:sagemaker.processing:Uploaded source/deploy/ to s3://sagemaker-us-east-1-419974056037/DJ-SM-IMD-PIPELINE/code/aeadf6a9077978739a1abd63ee2f9113/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://sagemaker-us-east-1-419974056037/DJ-SM-IMD-PIPELINE/code/be2cbd35f419f747dca87a487b64f344/runproc.sh
INFO:sagemaker.processing:Uploaded source/evaluation/ to s3://sagemaker-us-east-1-419974056037/DJ-SM-IMD-PIPELINE/code/0e7bf581858560b392b8bc18209c4aaf/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://sagemaker-us-east-1-419974056037/DJ-SM-IMD-PIPELINE/code/2c207c809cb0e0e9a1d77e5247f961f9/runproc.sh
INFO:sagemaker.processing:Upload

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:419974056037:pipeline/dj-sm-imd-pipeline', 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:419974056037:pipeline/dj-sm-imd-pipeline/execution/5llmkchihygb', 'PipelineExecutionDisplayName': 'execution-1683215194867', 'PipelineExecutionStatus': 'Executing', 'PipelineExperimentConfig': {'ExperimentName': 'dj-sm-imd-pipeline', 'TrialName': '5llmkchihygb'}, 'CreationTime': datetime.datetime(2023, 5, 4, 15, 46, 34, 735000, tzinfo=tzlocal()), 'LastModifiedTime': datetime.datetime(2023, 5, 4, 15, 46, 34, 735000, tzinfo=tzlocal()), 'CreatedBy': {}, 'LastModifiedBy': {}, 'ResponseMetadata': {'RequestId': '13be44d2-2a88-47f5-ae82-345e1815b793', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '13be44d2-2a88-47f5-ae82-345e1815b793', 'content-type': 'application/x-amz-json-1.1', 'content-length': '495', 'date': 'Thu, 04 May 2023 15:46:35 GMT'}, 'RetryAttempts': 0}}


In [34]:
# Read back the pipeline ARN stored in the parameter store.
print ('pipeline-arn: {}'.format(pm.get_params(key=f"{strPrefix}-PIPELINE-ARN")))

pipeline-arn: arn:aws:sagemaker:us-east-1:419974056037:pipeline/dj-sm-imd-pipeline


## 4. Inference by boto3

In [37]:
import os
import json
import boto3
import pandas as pd

In [44]:
# Load the local test split and turn its first row into a request payload.
pdTest = pd.read_csv("{}/test.csv".format(strLocalDataPath))
# NOTE(review): the label is taken positionally (column 0) while the drop below
# is by name ('fraud') — confirm 'fraud' is the first column.
pdLabel = pdTest.iloc[:, 0].astype(int)
pdTest = pdTest.drop(columns='fraud')
# NOTE(review): this is the raw buffer of row 0 (no .npy header) — confirm the
# endpoint's inference handler expects that for the content type it is sent with.
payload = pdTest.to_numpy()[0].tobytes()

In [45]:
# SageMaker runtime client plus the endpoint name stored in the parameter store.
runtime_client = boto3.Session().client('sagemaker-runtime')
strEndpointName = pm.get_params(key=f"{strPrefix}-ENDPOINT-NAME")
strEndpointName

INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


'endpoint--DJ-SM-IMD-PIPELINE-1682324962'

In [49]:
# Accept MIME type requested from the endpoint: "text/csv" or "application/json".
strDeserializer = "text/csv"

# Fail fast on an unsupported type; otherwise `out` would never be assigned and
# the final print would raise NameError after the endpoint had been called.
if strDeserializer not in ("application/json", "text/csv"):
    raise ValueError(f"Unsupported Accept type: {strDeserializer}")

response = runtime_client.invoke_endpoint(
    EndpointName=strEndpointName,
    ContentType='application/x-npy',
    Accept=strDeserializer,
    Body=payload
)

# Decode the response body according to the requested Accept type.
strBody = response['Body'].read().decode()
if strDeserializer == "application/json":
    out = json.loads(strBody)
else:
    out = strBody.split(",")

print (f'Response: {out}')

Response: ['0.6454359889030457', '1.0']
