In [2]:
!pip install kfp --upgrade
!which dsl-compile

Collecting kfp
  Using cached kfp-1.8.13.tar.gz (300 kB)
  Preparing metadata (setup.py) ... [?25ldone
Collecting google-api-core!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5
  Using cached google_api_core-2.8.2-py3-none-any.whl (114 kB)
Collecting google-cloud-storage<2,>=1.20.0
  Using cached google_cloud_storage-1.44.0-py2.py3-none-any.whl (106 kB)
Collecting kubernetes<19,>=8.0.0
  Using cached kubernetes-18.20.0-py2.py3-none-any.whl (1.6 MB)
Collecting google-api-python-client<2,>=1.7.8
  Using cached google_api_python_client-1.12.11-py2.py3-none-any.whl (62 kB)
Collecting requests-toolbelt<1,>=0.8.0
  Using cached requests_toolbelt-0.9.1-py2.py3-none-any.whl (54 kB)
Collecting kfp-server-api<2.0.0,>=1.1.2
  Using cached kfp-server-api-1.8.4.tar.gz (58 kB)
  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting jsonschema<4,>=3.0.1
  Downloading jsonschema-3.2.0-py2.py3-none-any.whl (56 kB)
     |████████████████████████████████| 56 kB 633 kB/s             
Collectin

## Amazon SageMaker Components for Kubeflow Pipelines - script mode
In this example we'll build a Kubeflow pipeline in which every component calls a different Amazon SageMaker feature.
Our simple pipeline will perform:

1. Hyperparameter optimization
1. Select the best hyperparameters and increase the number of epochs
1. Train the model with the best hyperparameters
1. Create an Amazon SageMaker model
1. Deploy the model

In [3]:
import kfp
from kfp import components
from kfp.components import func_to_container_op
from kfp import dsl
import time, os, json

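Source and documentation for the Amazon SageMaker components for Kubeflow Pipelines (the next cell loads them pinned to a specific commit): https://github.com/kubeflow/pipelines/tree/master/components/aws/sagemaker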

In [4]:
# All SageMaker components come from one pinned Kubeflow Pipelines commit, so the
# component interfaces stay stable between runs; only the component folder varies.
_SM_COMPONENT_URL_TEMPLATE = (
    'https://raw.githubusercontent.com/kubeflow/pipelines/'
    'cb36f87b727df0578f4c1e3fe9c24a30bb59e5a2/components/aws/sagemaker/{}/component.yaml'
)

sagemaker_hpo_op = components.load_component_from_url(_SM_COMPONENT_URL_TEMPLATE.format('hyperparameter_tuning'))
sagemaker_train_op = components.load_component_from_url(_SM_COMPONENT_URL_TEMPLATE.format('train'))
sagemaker_model_op = components.load_component_from_url(_SM_COMPONENT_URL_TEMPLATE.format('model'))
sagemaker_deploy_op = components.load_component_from_url(_SM_COMPONENT_URL_TEMPLATE.format('deploy'))

In [5]:
import sagemaker
import boto3

# One boto3 session backs both the low-level SageMaker client and the SDK session.
sess = boto3.Session()
sm   = sess.client('sagemaker')

# IAM role of this notebook; passed as `role` to the SageMaker components below.
role = sagemaker.get_execution_role()

sagemaker_session = sagemaker.Session(
    boto_session=sess,
)

#### Prepare training datasets and upload to Amazon S3

In [6]:
bucket_name = sagemaker_session.default_bucket()
job_folder      = 'jobs'
dataset_folder  = 'datasets'
local_dataset = 'cifar10'

!python generate_cifar10_tfrecords.py --data-dir {local_dataset}
datasets = sagemaker_session.upload_data(path='cifar10', key_prefix='datasets/cifar10-dataset')

# If dataset is already in S3 use the dataset's path:
# datasets = 's3://{bucket_name}/{dataset_folder}/cifar10-dataset'

2022-08-22 18:54:31.735387: W tensorflow/core/profiler/internal/smprofiler_timeline.cc:460] Initializing the SageMaker Profiler.
2022-08-22 18:54:31.741844: W tensorflow/core/profiler/internal/smprofiler_timeline.cc:105] SageMaker Profiler is not enabled. The timeline writer thread will not be started, future recorded events will be dropped.
2022-08-22 18:54:31.879505: W tensorflow/core/profiler/internal/smprofiler_timeline.cc:460] Initializing the SageMaker Profiler.
Traceback (most recent call last):
  File "generate_cifar10_tfrecords.py", line 35, in <module>
    tf.logging.set_verbosity(tf.logging.ERROR)
AttributeError: module 'tensorflow' has no attribute 'logging'


FileNotFoundError: [Errno 2] No such file or directory: 'cifar10'

#### Upload training scripts to Amazon S3

In [None]:
!tar cvfz sourcedir.tar.gz --exclude=".ipynb*" -C code .
source_s3 = sagemaker_session.upload_data(path='sourcedir.tar.gz', key_prefix='training-scripts')
print('\nUploaded to S3 location:')
print(source_s3)

#### Create a custom pipeline op
Takes the best hyperparameters found by the hyperparameter tuning job and raises the number of epochs (from 10 during tuning to 80 by default) for the final training job.

In [None]:
def update_best_model_hyperparams(hpo_results, best_model_epoch = "80") -> str:
    """Return the tuned hyperparameters with ``epochs`` overridden for final training.

    Args:
        hpo_results: JSON object (or its string form) mapping hyperparameter names to
            values, e.g. the ``best_hyperparameters`` output of the tuning component.
        best_model_epoch: Number of epochs for the final training job.

    Returns:
        The hyperparameters serialized as JSON, with ``"epochs"`` set to
        ``str(best_model_epoch)`` (added if absent, replaced if present).
    """
    # Keep the import inside the function: func_to_container_op builds the component
    # from this function's source alone, so module-level imports are not available.
    import json
    best_params = json.loads(str(hpo_results))
    # SageMaker hyperparameter values must be strings; str() keeps an int argument
    # from being serialized as a JSON number.
    return json.dumps(dict(best_params, epochs=str(best_model_epoch)))

# Wrap the function as a KFP component factory; the pipeline feeds its `.output`
# (the updated hyperparameter JSON) into the final training step.
get_best_hyp_op = func_to_container_op(update_best_model_hyperparams)

#### Create a pipeline

In [None]:
def _sagemaker_s3_channels(s3_prefix, channel_names=('train', 'validation', 'eval')):
    """Build the SageMaker training channel list as a JSON string.

    Each channel reads the S3 prefix ``{s3_prefix}/{channel_name}`` (S3Prefix,
    FullyReplicated, no compression, no record wrapper). ``json.dumps`` guarantees
    well-formed JSON, unlike a hand-concatenated string literal.
    """
    return json.dumps([
        {
            "ChannelName": name,
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": f"{s3_prefix}/{name}",
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
            "CompressionType": "None",
            "RecordWrapperType": "None",
        }
        for name in channel_names
    ])


@dsl.pipeline(
    name='cifar10 hpo train deploy pipeline',
    description='cifar10 hpo train deploy pipeline using sagemaker'
)
def cifar10_hpo_train_deploy(region='us-west-2',
                             training_input_mode='File',
                             # NOTE(review): these ECR image URIs are for us-west-2; update
                             # them together with `region`.
                             train_image='763104351884.dkr.ecr.us-west-2.amazonaws.com/tensorflow-training:1.15.2-gpu-py36-cu100-ubuntu18.04',
                             serving_image='763104351884.dkr.ecr.us-west-2.amazonaws.com/tensorflow-inference:1.15.2-cpu',
                             volume_size='50',
                             max_run_time='86400',
                             instance_type='ml.p3.2xlarge',
                             network_isolation='False',
                             traffic_encryption='False',
                             spot_instance='False',
                             # Evaluated once when this cell runs; needs `datasets` from the upload cell.
                             channels=_sagemaker_s3_channels(datasets)):
    """Tune, retrain, register and deploy a CIFAR-10 model with SageMaker components.

    Steps: hyperparameter tuning -> override epochs on the best hyperparameters ->
    training job -> SageMaker model -> endpoint. Arguments are pipeline parameters
    forwarded to the SageMaker components; the JSON-valued arguments are strings.
    """
    jobs_s3_uri = f's3://{bucket_name}/{job_folder}'

    # Component 1: hyperparameter tuning job
    hpo = sagemaker_hpo_op(
        region=region,
        image=train_image,
        training_input_mode=training_input_mode,
        strategy='Bayesian',
        metric_name='val_acc',
        # Regex used to extract `val_acc` from the training job output.
        metric_definitions=json.dumps({"val_acc": r"val_acc: ([0-9\.]+)"}),
        metric_type='Maximize',
        static_parameters=json.dumps({
            "epochs": "10",
            "momentum": "0.9",
            "weight-decay": "0.0002",
            "model_dir": jobs_s3_uri,
            "sagemaker_program": "cifar10-training-sagemaker.py",
            # Follow the `region` pipeline parameter rather than a fixed region; at
            # compile time str() yields KFP's placeholder for the parameter.
            "sagemaker_region": str(region),
            "sagemaker_submit_directory": source_s3,
        }),
        continuous_parameters=json.dumps([
            {"Name": "learning-rate", "MinValue": "0.0001", "MaxValue": "0.1", "ScalingType": "Logarithmic"},
        ]),
        categorical_parameters=json.dumps([
            {"Name": "optimizer", "Values": ["sgd", "adam"]},
            {"Name": "batch-size", "Values": ["32", "128", "256"]},
            {"Name": "model-type", "Values": ["resnet", "custom"]},
        ]),
        channels=channels,
        output_location=jobs_s3_uri,
        instance_type=instance_type,
        instance_count='1',
        volume_size=volume_size,
        max_num_jobs='16',
        max_parallel_jobs='4',
        max_run_time=max_run_time,
        network_isolation=network_isolation,
        traffic_encryption=traffic_encryption,
        spot_instance=spot_instance,
        role=role
    )

    # Component 2: best hyperparameters with the epoch count raised
    training_hyp = get_best_hyp_op(hpo.outputs['best_hyperparameters'])

    # Component 3: final training job
    training = sagemaker_train_op(
        region=region,
        image=train_image,
        training_input_mode=training_input_mode,
        hyperparameters=training_hyp.output,
        channels=channels,
        instance_type=instance_type,
        instance_count='1',
        volume_size=volume_size,
        max_run_time=max_run_time,
        model_artifact_path=jobs_s3_uri,
        network_isolation=network_isolation,
        traffic_encryption=traffic_encryption,
        spot_instance=spot_instance,
        role=role,
    )

    # Component 4: SageMaker model named after the training job
    create_model = sagemaker_model_op(
        region=region,
        model_name=training.outputs['job_name'],
        image=serving_image,
        model_artifact_url=training.outputs['model_artifact_url'],
        network_isolation=network_isolation,
        role=role
    )

    # Component 5: endpoint deployment
    prediction = sagemaker_deploy_op(
        region=region,
        model_name_1=create_model.output,
        instance_type_1='ml.m5.large'
    )

In [None]:
# Serialize the pipeline definition into a package that the KFP client can upload.
kfp.compiler.Compiler().compile(
    pipeline_func=cifar10_hpo_train_deploy,
    package_path='sm-hpo-train-deploy-pipeline.zip',
)

In [None]:
client = kfp.Client()
aws_experiment = client.create_experiment(name='sm-kfp-experiment')

# Run name is suffixed with the current UTC time so each submission is distinct.
run_timestamp = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
exp_name = 'cifar10-hpo-train-deploy-kfp-' + run_timestamp
my_run = client.run_pipeline(
    experiment_id=aws_experiment.id,
    job_name=exp_name,
    pipeline_package_path='sm-hpo-train-deploy-pipeline.zip',
)

In [None]:
import json, boto3, numpy as np

# Use a distinct name so this cell does not overwrite the KFP `client` created
# earlier (needed to inspect the pipeline run).
sm_runtime = boto3.client('runtime.sagemaker')

# TODO: set this to the endpoint created by your pipeline run's deploy step; the
# value below appears to come from an earlier run.
endpoint_name = 'Endpoint-20200522021801-DR5P'

file_name = '1000_dog.png'
with open(file_name, 'rb') as f:
    payload = f.read()

response = sm_runtime.invoke_endpoint(EndpointName=endpoint_name,
                                      ContentType='application/x-image',
                                      Body=payload)
pred = json.loads(response['Body'].read())['predictions']
labels = ['airplane','automobile','bird','cat','deer','dog','frog','horse','ship','truck']
# Print each class's score for the first prediction, scaled by 100.
for label, score in zip(labels, pred[0]):
    print(label, "{:.4f}".format(score * 100))