# [Module 2] Using a Custom Container (Bring Your Own Container)



Running this notebook produces the pipeline shown below.
- ![kf-pipeline.png](img/kf-pipeline.png)

## Amazon SageMaker Components for Kubeflow Pipelines Example - custom container
In this example we'll build a Kubeflow pipeline in which each component calls a different Amazon SageMaker feature.
Our simple pipeline will perform:

1. Hyperparameter optimization
1. Select the best hyperparameters and increase the number of epochs
1. Train the model with the best hyperparameters
1. Create an Amazon SageMaker model
1. Deploy the model

In [2]:
import kfp
from kfp import components
from kfp.components import func_to_container_op
from kfp import dsl
import time, os, json

Amazon SageMaker Component URLs are available here: <br>
https://github.com/kubeflow/pipelines/tree/master/components/aws/sagemaker

In [3]:
sagemaker_hpo_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/cb36f87b727df0578f4c1e3fe9c24a30bb59e5a2/components/aws/sagemaker/hyperparameter_tuning/component.yaml')
sagemaker_train_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/cb36f87b727df0578f4c1e3fe9c24a30bb59e5a2/components/aws/sagemaker/train/component.yaml')
sagemaker_model_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/cb36f87b727df0578f4c1e3fe9c24a30bb59e5a2/components/aws/sagemaker/model/component.yaml')
sagemaker_deploy_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/cb36f87b727df0578f4c1e3fe9c24a30bb59e5a2/components/aws/sagemaker/deploy/component.yaml')
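The four component URLs above differ only in the component directory name. A small helper like the one below (a hypothetical convenience, not part of the kfp API) keeps the pinned commit in one place; it only builds the URL strings, and loading still goes through `components.load_component_from_url` as above.

```python
# Pinned commit of the kubeflow/pipelines repository used in the cell above
KFP_COMMIT = 'cb36f87b727df0578f4c1e3fe9c24a30bb59e5a2'
RAW_BASE = 'https://raw.githubusercontent.com/kubeflow/pipelines'


def sagemaker_component_url(component, commit=KFP_COMMIT):
    """Return the raw component.yaml URL for one SageMaker component
    directory (e.g. 'train', 'deploy') at the given commit."""
    return f'{RAW_BASE}/{commit}/components/aws/sagemaker/{component}/component.yaml'


# Usage (hypothetical helper, same components as above):
# sagemaker_hpo_op   = components.load_component_from_url(sagemaker_component_url('hyperparameter_tuning'))
# sagemaker_train_op = components.load_component_from_url(sagemaker_component_url('train'))
for name in ('hyperparameter_tuning', 'train', 'model', 'deploy'):
    print(sagemaker_component_url(name))
```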

In [4]:
import sagemaker
import boto3

sess = boto3.Session()
account = boto3.client('sts').get_caller_identity().get('Account')
sm   = sess.client('sagemaker')
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session(boto_session=sess)

#### Prepare training datasets and upload to Amazon S3

In [5]:
bucket_name = sagemaker_session.default_bucket()
job_folder      = 'jobs'
dataset_folder  = 'datasets'
local_dataset = 'cifar10'

os.makedirs(local_dataset, exist_ok=True)
# TensorFlow is required to download the dataset and convert it to TFRecord format
!python generate_cifar10_tfrecords.py --data-dir {local_dataset}
# Upload the train/validation/eval TFRecords to s3://<default bucket>/datasets/cifar10-dataset
datasets = sagemaker_session.upload_data(path=local_dataset, key_prefix=f'{dataset_folder}/cifar10-dataset')

# If the dataset is already in S3, use its path instead:
# datasets = f's3://{bucket_name}/{dataset_folder}/cifar10-dataset'




Download from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz and extract.
Successfully downloaded cifar-10-python.tar.gz 170498071 bytes.
Generating cifar10/train/train.tfrecords
Generating cifar10/validation/validation.tfrecords
Generating cifar10/eval/eval.tfrecords
Done!


#### Build your Docker container and push it to Amazon ECR

------------------------------------------------------------
**STOP:** Open **`/docker/build_docker_push_to_ecr.ipynb`** and follow the steps there to build the container and push it to Amazon ECR before proceeding.

------------------------------------------------------------

#### Create a custom pipeline op
This op takes the results of a hyperparameter tuning job and increases the number of epochs for the next training job.

In [8]:
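def update_best_model_hyperparams(hpo_results, best_model_epoch="80") -> str:
    import json  # imported inside the function so it is available in the op's container
    # Parse the best hyperparameters returned by the tuning job
    r = json.loads(str(hpo_results))
    # Keep every tuned value but override the number of epochs
    return json.dumps(dict(r, epochs=best_model_epoch))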

get_best_hyp_op = func_to_container_op(update_best_model_hyperparams)
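Because the op is plain Python, its logic can be checked locally before it runs in the pipeline. The cell below repeats the function so it runs on its own; the sample tuning output is made up for illustration, while the real input is the JSON string in `hpo.outputs['best_hyperparameters']`.

```python
import json


def update_best_model_hyperparams(hpo_results, best_model_epoch="80") -> str:
    # Same logic as the op above: parse the tuning results and override "epochs"
    r = json.loads(str(hpo_results))
    return json.dumps(dict(r, epochs=best_model_epoch))


# Hypothetical tuning output (illustration only)
sample = json.dumps({"learning-rate": "0.01", "optimizer": "sgd", "epochs": "1"})
updated = json.loads(update_best_model_hyperparams(sample))
print(updated["epochs"])  # prints 80
```

All other tuned values pass through unchanged; only `epochs` is replaced.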

In [9]:
import boto3
region = boto3.Session().region_name
print("region: ", region)

region:  ap-northeast-2


## Finding an inference image

Available Deep Learning Containers Images
- https://github.com/aws/deep-learning-containers/blob/master/available_images.md

Command to look up a repository in a specific registry account:
- `aws ecr describe-repositories --repository-names tensorflow-inference --registry-id 763104351884`

Command to search the tags of a specific repository:
- `aws ecr list-images --repository-name tensorflow-inference --registry-id 763104351884 --max-items 100 | grep 1.15.2-cpu`
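The same tag search can also be done from Python with boto3's `list_images` paginator instead of piping the CLI output through `grep`. This is a sketch: `matching_tags` and `list_matching_tags` are illustrative names, and the boto3 call needs AWS credentials with read access to the registry.

```python
def matching_tags(image_ids, substring):
    """Return the sorted, de-duplicated tags in an ECR `imageIds` list that
    contain `substring`. Untagged images have no 'imageTag' key and are skipped."""
    return sorted({i['imageTag'] for i in image_ids
                   if 'imageTag' in i and substring in i['imageTag']})


def list_matching_tags(repository, registry_id, substring, region=None):
    """Page through ecr.list_images and filter the tags locally."""
    import boto3  # imported here so matching_tags can be used without boto3
    ecr = boto3.client('ecr', region_name=region)
    tags = set()
    paginator = ecr.get_paginator('list_images')
    for page in paginator.paginate(registryId=registry_id, repositoryName=repository):
        tags.update(matching_tags(page['imageIds'], substring))
    return sorted(tags)


# Usage (requires AWS credentials):
# list_matching_tags('tensorflow-inference', '763104351884', '1.15.2-cpu')
```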


In [10]:
! aws ecr describe-repositories --repository-names tensorflow-inference --registry-id 763104351884

{
    "repositories": [
        {
            "repositoryArn": "arn:aws:ecr:ap-northeast-2:763104351884:repository/tensorflow-inference",
            "registryId": "763104351884",
            "repositoryName": "tensorflow-inference",
            "repositoryUri": "763104351884.dkr.ecr.ap-northeast-2.amazonaws.com/tensorflow-inference",
            "createdAt": 1553139224.0,
            "imageTagMutability": "MUTABLE",
            "imageScanningConfiguration": {
                "scanOnPush": false
            },
            "encryptionConfiguration": {
                "encryptionType": "AES256"
            }
        }
    ]
}


In [11]:
! aws ecr list-images --repository-name tensorflow-inference --registry-id 763104351884 --max-items 100 | grep 1.15.2-cpu

            "imageTag": "1.15.2-cpu-py36-ubuntu18.04-v4.4"
            "imageTag": "1.15.2-cpu-py36-ubuntu18.04-v4.4-2020-03-19-21-33-20"


In [12]:
inference_image = '763104351884.dkr.ecr.ap-northeast-2.amazonaws.com/tensorflow-inference:1.15.2-cpu-py36-ubuntu18.04-v4.4'
# inference_image = '763104351884.dkr.ecr.us-west-2.amazonaws.com/tensorflow-inference:1.15.2-cpu'
# inference_image = '763104351884.dkr.ecr.ap-northeast-2.amazonaws.com/tensorflow-inference:1.15.2-cpu'
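Rather than editing the region inside a hard-coded URI, the URI can be assembled from its parts. This is a sketch assuming that the registry account `763104351884` and the chosen tag are available in your region; some regions use a different Deep Learning Containers account, so confirm against the available images list linked above. `dlc_image_uri` is a hypothetical helper.

```python
# Registry account used in this notebook (assumption: valid for your region)
DLC_ACCOUNT = '763104351884'


def dlc_image_uri(region, repository='tensorflow-inference',
                  tag='1.15.2-cpu-py36-ubuntu18.04-v4.4', account=DLC_ACCOUNT):
    """Build an ECR image URI of the form
    <account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>."""
    return f'{account}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}'


# Usage: inference_image = dlc_image_uri(region)
print(dlc_image_uri('ap-northeast-2'))
```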

#### Create a pipeline

In [13]:
@dsl.pipeline(
    name='cifar10 hpo train deploy pipeline',
    description='cifar10 hpo train deploy pipeline using sagemaker'
)
def cifar10_hpo_train_deploy(region=region,
                           training_input_mode='File',
                           train_image=f'{account}.dkr.ecr.{region}.amazonaws.com/sagemaker-kubernetes:latest',
                           serving_image=inference_image,
                           volume_size='50',
                           max_run_time='86400',
                           instance_type='ml.p3.8xlarge',
                           network_isolation='False',
                           traffic_encryption='False',
                           spot_instance='False',
                           channels='[ \
                    { \
                        "ChannelName": "train", \
                        "DataSource": { \
                            "S3DataSource": { \
                                "S3DataType": "S3Prefix", \
                                "S3Uri": "s3://'+bucket_name+'/datasets/cifar10-dataset/train", \
                                "S3DataDistributionType": "FullyReplicated" \
                            } \
                        }, \
                        "CompressionType": "None", \
                        "RecordWrapperType": "None" \
                    }, \
                    { \
                        "ChannelName": "validation", \
                        "DataSource": { \
                            "S3DataSource": { \
                                "S3DataType": "S3Prefix", \
                                "S3Uri": "s3://'+bucket_name+'/datasets/cifar10-dataset/validation", \
                                "S3DataDistributionType": "FullyReplicated" \
                            } \
                        }, \
                        "CompressionType": "None", \
                        "RecordWrapperType": "None" \
                    }, \
                    { \
                        "ChannelName": "eval", \
                        "DataSource": { \
                            "S3DataSource": { \
                                "S3DataType": "S3Prefix", \
                                "S3Uri": "s3://'+bucket_name+'/datasets/cifar10-dataset/eval", \
                                "S3DataDistributionType": "FullyReplicated" \
                            } \
                        }, \
                        "CompressionType": "None", \
                        "RecordWrapperType": "None" \
                    } \
                ]'
                          ):
    # Component 1: SageMaker hyperparameter tuning job
    hpo = sagemaker_hpo_op(
        region=region,
        image=train_image,
        training_input_mode=training_input_mode,
        strategy='Bayesian',
        metric_name='val_acc',
        metric_definitions='{"val_acc": "val_acc: ([0-9\\\\.]+)"}',
        metric_type='Maximize',
        static_parameters='{ \
            "epochs": "1", \
            "momentum": "0.9", \
            "weight-decay": "0.0002", \
            "model_dir":"s3://'+bucket_name+'/jobs", \
            "sagemaker_region": "ap-northeast-2" \
        }',
        continuous_parameters='[ \
            {"Name": "learning-rate", "MinValue": "0.0001", "MaxValue": "0.1", "ScalingType": "Logarithmic"} \
        ]',
        categorical_parameters='[ \
            {"Name": "optimizer", "Values": ["sgd", "adam"]}, \
            {"Name": "batch-size", "Values": ["32", "128", "256"]}, \
            {"Name": "model-type", "Values": ["resnet", "custom"]} \
        ]',
        channels=channels,
        output_location=f's3://{bucket_name}/jobs',
        instance_type=instance_type,
        instance_count='1',
        volume_size=volume_size,
        max_num_jobs='1',
        max_parallel_jobs='1',
        max_run_time=max_run_time,
        network_isolation=network_isolation,
        traffic_encryption=traffic_encryption,
        spot_instance=spot_instance,
        role=role
    )
    
    # Component 2: take the best hyperparameters and raise the epoch count
    training_hyp = get_best_hyp_op(hpo.outputs['best_hyperparameters'])
    
    # Component 3: SageMaker training job with the updated hyperparameters
    training = sagemaker_train_op(
        region=region,
        image=train_image,
        training_input_mode=training_input_mode,
        hyperparameters=training_hyp.output,        
        channels=channels,
        instance_type=instance_type,
        instance_count='1',
        volume_size=volume_size,
        max_run_time=max_run_time,
        model_artifact_path=f's3://{bucket_name}/jobs',
        network_isolation=network_isolation,
        traffic_encryption=traffic_encryption,
        spot_instance=spot_instance,
        role=role,
    )

    # Component 4: SageMaker model from the trained artifacts and the serving image
    create_model = sagemaker_model_op(
        region=region,
        model_name=training.outputs['job_name'],
        image=serving_image,
        model_artifact_url=training.outputs['model_artifact_url'],
        network_isolation=network_isolation,
        role=role
    )

    # Component 5: deploy the model to a SageMaker endpoint
    prediction = sagemaker_deploy_op(
        region=region,
        model_name_1=create_model.output,
        instance_type_1='ml.m5.large'
    )
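The `channels` default above is a hand-written JSON string with line continuations, which is easy to break when editing. One alternative, sketched below, generates the same three channels with `json.dumps`. `make_channels` is a hypothetical helper, and it assumes the `train`/`validation`/`eval` folders under `datasets/cifar10-dataset` created by the upload step.

```python
import json


def make_channels(bucket, prefix='datasets/cifar10-dataset',
                  names=('train', 'validation', 'eval')):
    """Build the SageMaker `channels` JSON string: one FullyReplicated
    S3Prefix channel per dataset sub-folder."""
    channels = [
        {
            "ChannelName": name,
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": f"s3://{bucket}/{prefix}/{name}",
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
            "CompressionType": "None",
            "RecordWrapperType": "None",
        }
        for name in names
    ]
    return json.dumps(channels)


print(make_channels('my-bucket'))  # 'my-bucket' is a placeholder
```

It could then serve as the pipeline default, e.g. `channels=make_channels(bucket_name)`, producing the same structure as the literal string.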

In [14]:
kfp.compiler.Compiler().compile(cifar10_hpo_train_deploy,'sm-hpo-train-deploy-pipeline.zip')

## Check the job
- Note: the link below currently does not work.

In [15]:
client = kfp.Client()
aws_experiment = client.create_experiment(name='aws')

exp_name    = f'cifar10-hpo-train-deploy-kfp-{time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())}'
my_run = client.run_pipeline(aws_experiment.id, exp_name, 'sm-hpo-train-deploy-pipeline.zip')

## Inference currently returns an error

In [23]:
# endpoint_name = 'Endpoint-20210408030418-BHHB'
endpoint_name = 'kf-console2'

In [29]:
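import json, boto3
# Separate name so the kfp `client` from the previous cell is not overwritten
runtime_client = boto3.client('sagemaker-runtime')

file_name = '1000_dog.png'
with open(file_name, 'rb') as f:
    payload = f.read()

# print("payload: \n", payload)

# Sends raw PNG bytes; the TensorFlow Serving container rejects this content type with a 415 error (output below)
response = runtime_client.invoke_endpoint(EndpointName=endpoint_name,
                                          ContentType='application/x-image',
                                          Body=payload)
print(response['Body'].read())
labels = ['airplane','automobile','bird','cat','deer','dog','frog','horse','ship','truck']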

ModelError: An error occurred (ModelError) when calling the InvokeEndpoint operation: Received client error (415) from model with message "{"error": "Unsupported Media Type: application/x-image"}". See https://ap-northeast-2.console.aws.amazon.com/cloudwatch/home?region=ap-northeast-2#logEventViewer:group=/aws/sagemaker/Endpoints/kf-console2 in account 189546603447 for more information.
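The 415 response says the `tensorflow-inference` container rejects `application/x-image`. One format the TensorFlow Serving REST API does accept is JSON of the form `{"instances": [...]}`, answered with `{"predictions": [...]}`. The sketch below builds such a payload and maps the scores back to the CIFAR-10 labels. It assumes the model takes a single 32x32x3 image and returns 10 class scores; the preprocessing (resizing, scaling) must match the training script, which is not shown here. The helper names are hypothetical.

```python
import json

# CIFAR-10 class names in the same order as the `labels` list above
LABELS = ['airplane', 'automobile', 'bird', 'cat', 'deer',
          'dog', 'frog', 'horse', 'ship', 'truck']


def build_tfserving_payload(pixels):
    """Wrap one image (nested H x W x C lists of numbers) in the
    TF Serving REST "instances" format and return a JSON string."""
    return json.dumps({"instances": [pixels]})


def decode_prediction(body, labels=LABELS):
    """Return (label, score) for the highest score in a TF Serving
    response body of the form {"predictions": [[score, ...]]}."""
    scores = json.loads(body)["predictions"][0]
    best = max(range(len(scores)), key=scores.__getitem__)
    return labels[best], scores[best]


# Example call (assumes Pillow and NumPy; scaling to [0, 1] is an assumption):
# from PIL import Image
# import numpy as np, boto3
# img = Image.open('1000_dog.png').convert('RGB').resize((32, 32))
# pixels = (np.asarray(img, dtype=np.float32) / 255.0).tolist()
# runtime = boto3.client('sagemaker-runtime')
# response = runtime.invoke_endpoint(EndpointName=endpoint_name,
#                                    ContentType='application/json',
#                                    Body=build_tfserving_payload(pixels))
# print(decode_prediction(response['Body'].read()))
```

Sending raw image bytes would instead require a custom pre-processing handler in the serving container.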

## Error

`INFO:root:Submitting HyperParameter Tuning Job request to SageMaker...
Traceback (most recent call last):
  File "hyperparameter_tuning.py", line 95, in <module>
    main()
  File "hyperparameter_tuning.py", line 73, in main
    hpo_job_name = _utils.create_hyperparameter_tuning_job(client, vars(args))
  File "/app/common/_utils.py", line 574, in create_hyperparameter_tuning_job
    raise Exception(e.response['Error']['Message'])
Exception: User: arn:aws:sts::189546603447:assumed-role/eksctl-cluster-nodegroup-cpu-node-NodeInstanceRole-1TH85QZIZHJEA/i-09a03423d5bc7f8f4 is not authorized to perform: sagemaker:CreateHyperParameterTuningJob on resource: arn:aws:sagemaker:ap-northeast-2:189546603447:hyper-parameter-tuning-job/hpojob-20210408021532-53nh
11
`

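- Attach the `AmazonSageMakerFullAccess` managed policy to the IAM role `eksctl-cluster-nodegroup-cpu-node-NodeInstanceRole-1TH85QZIZHJEA`. In the error message, `i-09a03423d5bc7f8f4` after the role name is the session name (the EC2 node's instance ID), not part of the role name.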
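The same fix can be scripted. The sketch below (hypothetical helper names) extracts the role name from the assumed-role ARN in the error message and attaches the AWS managed policy `arn:aws:iam::aws:policy/AmazonSageMakerFullAccess` with boto3's `attach_role_policy`; running the attach step requires IAM permissions to modify the role.

```python
MANAGED_POLICY_ARN = 'arn:aws:iam::aws:policy/AmazonSageMakerFullAccess'


def role_name_from_assumed_role_arn(arn):
    """'arn:aws:sts::ACCOUNT:assumed-role/ROLE/SESSION' -> 'ROLE'."""
    resource = arn.split(':', 5)[5]  # 'assumed-role/ROLE/SESSION'
    parts = resource.split('/')
    if len(parts) != 3 or parts[0] != 'assumed-role':
        raise ValueError(f'not an assumed-role ARN: {arn}')
    return parts[1]


def attach_sagemaker_full_access(role_name):
    """Attach the AWS managed SageMaker policy to an IAM role."""
    import boto3  # imported here so the parser above works without boto3
    iam = boto3.client('iam')
    iam.attach_role_policy(RoleName=role_name, PolicyArn=MANAGED_POLICY_ARN)


# Usage (requires IAM permissions):
# arn = 'arn:aws:sts::189546603447:assumed-role/eksctl-cluster-nodegroup-cpu-node-NodeInstanceRole-1TH85QZIZHJEA/i-09a03423d5bc7f8f4'
# attach_sagemaker_full_access(role_name_from_assumed_role_arn(arn))
```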
