In [5]:
import random, string
import datetime
import pickle, gzip, numpy, urllib.request, json
from urllib.parse import urlparse
from sagemaker import get_execution_role
from sagemaker.amazon.common import write_numpy_to_dense_tensor
import io
import boto3

In [None]:
!pip install https://storage.googleapis.com/ml-pipeline/release/0.1.29/kfp.tar.gz --upgrade

In [None]:
# Restart the kernel so the freshly pip-installed libraries are picked up on the next import.
# NOTE: the `Jupyter.notebook` JavaScript object exists only in the classic Notebook UI;
# in JupyterLab restart the kernel manually (Kernel -> Restart Kernel).
from IPython.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

# 1. Enter your S3 bucket below. You can leave it empty and a new bucket will be created.

**Note**: Make sure the role assumed by SageMaker either has access to the bucket, or has permissions to create a new one.

In [7]:
# Leave this empty to have a randomly named bucket generated by the following cell.
S3_BUCKET = ""

### !!! Please do NOT edit anything in the following cell !!!

In [10]:
# Resolve the AWS account, region and SageMaker role used by every later cell.
AWS_ACCOUNT_ID = boto3.client('sts').get_caller_identity().get('Account')
AWS_REGION = boto3.session.Session().region_name
# SageMaker jobs run under the account's `TeamRole`. This role is used instead of
# get_execution_role(); assigning both only discarded the first value.
SAGEMAKER_ROLE_ARN = 'arn:aws:iam::{}:role/TeamRole'.format(AWS_ACCOUNT_ID)

if not S3_BUCKET:
    # S3 bucket names are globally unique, so generate a random suffix:
    # 16 lowercase letters followed by 16 digits (final name is 55 chars, under S3's 63-char limit).
    HASH = ''.join(random.choices(string.ascii_lowercase, k=16) + random.choices(string.digits, k=16))
    S3_BUCKET = '{}-kubeflow-pipeline-data'.format(HASH)

print(f'Your S3 bucket is: {S3_BUCKET}')
print(f'Your region is: {AWS_REGION}')
print(f'Your account ID is: {AWS_ACCOUNT_ID}')
print(f'Your SageMaker role ARN is: {SAGEMAKER_ROLE_ARN}')

Your S3 bucket is: okjwlfrlpdsixvqq5128298611399215-kubeflow-pipeline-data
Your region is: us-east-1
Your account ID is: 231470146047
Your SageMaker role ARN is: arn:aws:iam::231470146047:role/TeamRole


## 1.1. Check whether the bucket exists; create it if not

In [83]:
!aws s3 ls s3://$S3_BUCKET || aws s3 mb s3://$S3_BUCKET


An error occurred (NoSuchBucket) when calling the ListObjectsV2 operation: The specified bucket does not exist
make_bucket: elvupwxbctfruhrb3403662892282909-kubeflow-pipeline-data


## 1.2. Load the MNIST dataset and upload it to S3

**Note**: You can use your own dataset instead of downloading MNIST. If you do, **make sure you follow the S3 structure and the data format requirements** of the SageMaker built-in k-means algorithm: https://docs.aws.amazon.com/sagemaker/latest/dg/k-means.html.

In [84]:
import os

# Download the MNIST archive once; re-running the cell reuses the local copy.
# NOTE(review): deeplearning.net may no longer host this file — confirm the URL still resolves.
MNIST_ARCHIVE = 'mnist.pkl.gz'
if not os.path.exists(MNIST_ARCHIVE):
    urllib.request.urlretrieve("http://deeplearning.net/data/mnist/mnist.pkl.gz", MNIST_ARCHIVE)

# SECURITY: pickle.load can execute arbitrary code — only unpickle this archive from a trusted source.
with gzip.open(MNIST_ARCHIVE, 'rb') as f:
    # encoding='latin1' presumably because the archive is a Python 2 pickle — confirm if the source changes.
    train_set, valid_set, test_set = pickle.load(f, encoding='latin1')


bucket = S3_BUCKET
prefix = 'mnist_kmeans_example'

train_data_key = f'{prefix}/train_data'
test_data_key = f'{prefix}/test_data'
train_data_location = 's3://{}/{}'.format(S3_BUCKET, train_data_key)
test_data_location = 's3://{}/{}'.format(S3_BUCKET, test_data_key)
print('Training data will be uploaded to: {}'.format(train_data_location))
print('Test data will be uploaded to: {}'.format(test_data_location))


def _upload_dense_tensor(features, labels, key):
    """Serialize `features`/`labels` with write_numpy_to_dense_tensor and upload to s3://S3_BUCKET/<key>.

    A fresh in-memory buffer is used per call. Reusing one buffer across uploads would leave the
    previous dataset's bytes in it and corrupt the next object.
    """
    buf = io.BytesIO()
    write_numpy_to_dense_tensor(buf, features, labels)
    buf.seek(0)  # rewind so the upload reads from the start of the serialized data
    boto3.resource('s3').Bucket(S3_BUCKET).Object(key).upload_fileobj(buf)


# Convert the training and test data into the format required by the SageMaker KMeans algorithm
_upload_dense_tensor(train_set[0], train_set[1], train_data_key)
_upload_dense_tensor(test_set[0], test_set[1], test_data_key)

# Write the validation features (no labels) as CSV under <prefix>/input; the pipeline's
# batch-transform step is configured to read text/csv from that prefix.
numpy.savetxt('valid-data.csv', valid_set[0], delimiter=',', fmt='%g')
s3_client = boto3.client('s3')
input_key = "{}/input/valid_data.csv".format(prefix)
s3_client.upload_file('valid-data.csv', S3_BUCKET, input_key)

Training data will be uploaded to: s3://elvupwxbctfruhrb3403662892282909-kubeflow-pipeline-data/mnist_kmeans_example/train_data
Test data will be uploaded to: s3://elvupwxbctfruhrb3403662892282909-kubeflow-pipeline-data/mnist_kmeans_example/test_data


# 2. Create the Pipeline

## 2.1. Full version

This version gives you full control over every part of the pipeline, including:
- what components to add,
- detailed hyperparameter setting,
- details on instance type,
- etc.

*If you want to use the abstracted version, proceed to Section 2.2 below.*

### Download the SageMaker components for Kubeflow

In [85]:
import kfp
from kfp import components
from kfp import dsl
from kfp.aws import use_aws_secret

In [86]:
# Every SageMaker component is loaded from one pinned kubeflow/pipelines commit.
_SAGEMAKER_COMPONENTS_ROOT = (
    'https://raw.githubusercontent.com/kubeflow/pipelines/'
    '942be78bfe0f063084a5a006b3310b811a39f1ec/components/aws/sagemaker'
)


def _load_sagemaker_component(component_dir):
    """Load `<component_dir>/component.yaml` from the pinned SageMaker components tree."""
    return components.load_component_from_url(f'{_SAGEMAKER_COMPONENTS_ROOT}/{component_dir}/component.yaml')


sagemaker_train_op = _load_sagemaker_component('train')
sagemaker_model_op = _load_sagemaker_component('model')
sagemaker_deploy_op = _load_sagemaker_component('deploy')
sagemaker_batch_transform_op = _load_sagemaker_component('batch_transform')
sagemaker_hpo_op = _load_sagemaker_component('hyperparameter_tuning')

In [104]:
# Configure S3 data path
# Root S3 URI that the pipeline defaults below read data from and write outputs under.
S3_PIPELINE_PATH = f's3://{S3_BUCKET}/{prefix}'
print(S3_PIPELINE_PATH)

s3://elvupwxbctfruhrb3403662892282909-kubeflow-pipeline-data/mnist_kmeans_example


In [105]:
# Compile-time switches for the optional stages. These are plain Python booleans evaluated when the
# pipeline function is compiled; set one to True to add that stage to the pipeline graph.
INCLUDE_DEPLOY_STEP = False
INCLUDE_BATCH_TRANSFORM_STEP = False


def _s3_file_channel(channel_name, s3_uri):
    """Build one SageMaker input-channel definition for S3-prefix data read in File mode.

    Returns a plain dict; a list of these is serialized with ``json.dumps`` for the
    `channels` argument of the SageMaker components.
    """
    return {
        "ChannelName": channel_name,
        "DataSource": {
            "S3DataSource": {
                "S3Uri": s3_uri,
                "S3DataType": "S3Prefix",
                "S3DataDistributionType": "FullyReplicated",
            }
        },
        "ContentType": "",
        "CompressionType": "None",
        "RecordWrapperType": "None",
        "InputMode": "File",
    }


@dsl.pipeline(
    name='MNIST Classification pipeline',
    description='MNIST Classification using KMEANS in SageMaker'
)
def mnist_classification(region='us-east-1',
    image='382416733822.dkr.ecr.us-east-1.amazonaws.com/kmeans:1',
    training_input_mode='File',
    hpo_strategy='Bayesian',
    hpo_metric_name='test:msd',
    hpo_metric_type='Minimize',
    hpo_early_stopping_type='Off',
    hpo_static_parameters='{"k": "10", "feature_dim": "784"}',
    hpo_integer_parameters='[{"Name": "mini_batch_size", "MinValue": "500", "MaxValue": "600"}, {"Name": "extra_center_factor", "MinValue": "10", "MaxValue": "20"}]',
    hpo_continuous_parameters='[]',
    hpo_categorical_parameters='[{"Name": "init_method", "Values": ["random", "kmeans++"]}]',
    # Channel JSON is generated with json.dumps instead of being hand-written inside a string
    # literal, so the S3 URIs are always correctly quoted and the two channel lists cannot drift.
    hpo_channels=json.dumps([
        _s3_file_channel('train', S3_PIPELINE_PATH + '/train_data'),
        _s3_file_channel('test', S3_PIPELINE_PATH + '/test_data'),
    ]),
    hpo_spot_instance='False',
    hpo_max_wait_time='3600',
    hpo_checkpoint_config='{}',
    output_location=S3_PIPELINE_PATH + '/output',
    output_encryption_key='',
    instance_type='ml.p3.2xlarge',
    instance_count='1',
    volume_size='50',
    hpo_max_num_jobs='9',
    hpo_max_parallel_jobs='2',
    max_run_time='3600',
    endpoint_url='',
    network_isolation='True',
    traffic_encryption='False',
    train_channels=json.dumps([
        _s3_file_channel('train', S3_PIPELINE_PATH + '/train_data'),
    ]),
    train_spot_instance='False',
    train_max_wait_time='3600',
    train_checkpoint_config='{}',
    batch_transform_instance_type='ml.m4.xlarge',
    batch_transform_input=S3_PIPELINE_PATH + '/input',
    batch_transform_data_type='S3Prefix',
    batch_transform_content_type='text/csv',
    batch_transform_compression_type='None',
    batch_transform_ouput=S3_PIPELINE_PATH + '/output',
    batch_transform_max_concurrent='4',
    batch_transform_max_payload='6',
    batch_strategy='MultiRecord',
    batch_transform_split_type='Line',
    role_arn=SAGEMAKER_ROLE_ARN
    ):
    """Tune k-means hyperparameters, retrain with the best set found, and create a SageMaker model.

    Every argument is a pipeline parameter whose default is a string; the S3 defaults point at the
    data uploaded earlier under S3_PIPELINE_PATH. Deployment and batch transform are added only when
    INCLUDE_DEPLOY_STEP / INCLUDE_BATCH_TRANSFORM_STEP are True at compile time.
    """
    # Each step reads its AWS credentials from the `aws-secret` secret via use_aws_secret.
    hpo = sagemaker_hpo_op(
        region=region,
        endpoint_url=endpoint_url,
        image=image,
        training_input_mode=training_input_mode,
        strategy=hpo_strategy,
        metric_name=hpo_metric_name,
        metric_type=hpo_metric_type,
        early_stopping_type=hpo_early_stopping_type,
        static_parameters=hpo_static_parameters,
        integer_parameters=hpo_integer_parameters,
        continuous_parameters=hpo_continuous_parameters,
        categorical_parameters=hpo_categorical_parameters,
        channels=hpo_channels,
        output_location=output_location,
        output_encryption_key=output_encryption_key,
        instance_type=instance_type,
        instance_count=instance_count,
        volume_size=volume_size,
        max_num_jobs=hpo_max_num_jobs,
        max_parallel_jobs=hpo_max_parallel_jobs,
        max_run_time=max_run_time,
        network_isolation=network_isolation,
        traffic_encryption=traffic_encryption,
        spot_instance=hpo_spot_instance,
        max_wait_time=hpo_max_wait_time,
        checkpoint_config=hpo_checkpoint_config,
        role=role_arn,
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

    # Retrain once on the train channel using the best hyperparameters reported by the tuning step.
    training = sagemaker_train_op(
        region=region,
        endpoint_url=endpoint_url,
        image=image,
        training_input_mode=training_input_mode,
        hyperparameters=hpo.outputs['best_hyperparameters'],
        channels=train_channels,
        instance_type=instance_type,
        instance_count=instance_count,
        volume_size=volume_size,
        max_run_time=max_run_time,
        model_artifact_path=output_location,
        output_encryption_key=output_encryption_key,
        network_isolation=network_isolation,
        traffic_encryption=traffic_encryption,
        spot_instance=train_spot_instance,
        max_wait_time=train_max_wait_time,
        checkpoint_config=train_checkpoint_config,
        role=role_arn,
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

    # Register the trained artifact as a SageMaker model named after the training job.
    create_model = sagemaker_model_op(
        region=region,
        endpoint_url=endpoint_url,
        model_name=training.outputs['job_name'],
        image=training.outputs['training_image'],
        model_artifact_url=training.outputs['model_artifact_url'],
        network_isolation=network_isolation,
        role=role_arn
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

    if INCLUDE_DEPLOY_STEP:
        prediction = sagemaker_deploy_op(
            region=region,
            endpoint_url=endpoint_url,
            model_name_1=create_model.output,
        ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

    if INCLUDE_BATCH_TRANSFORM_STEP:
        batch_transform = sagemaker_batch_transform_op(
            region=region,
            endpoint_url=endpoint_url,
            model_name=create_model.output,
            instance_type=batch_transform_instance_type,
            instance_count=instance_count,
            max_concurrent=batch_transform_max_concurrent,
            max_payload=batch_transform_max_payload,
            batch_strategy=batch_strategy,
            input_location=batch_transform_input,
            data_type=batch_transform_data_type,
            content_type=batch_transform_content_type,
            split_type=batch_transform_split_type,
            compression_type=batch_transform_compression_type,
            output_location=batch_transform_ouput
        ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

In [106]:
# Compile the pipeline function into the package file submitted to Kubeflow Pipelines below.
pipeline_compiler = kfp.compiler.Compiler()
pipeline_compiler.compile(mnist_classification, 'mnist-classification-pipeline-v2.zip')

In [107]:
# Suffix the run name with a local timestamp (YYYYmmddHHMMSS) so each submission is distinct and sortable.
_run_timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
kf_run_name = 'mnist-classification-pipeline-' + _run_timestamp
print(kf_run_name)

mnist-classification-pipeline-20200816140706


In [108]:
# Submit the compiled package as a new run inside the 'aws' experiment.
client = kfp.Client()
aws_experiment = client.create_experiment(name='aws')
my_run = client.run_pipeline(
    aws_experiment.id,
    kf_run_name,
    'mnist-classification-pipeline-v2.zip',
)

## 2.2. Abstracted version

In this version, data scientists only need to set a few parameters specific to what they want to do — such as whether or not to add a hyperparameter tuning job, the path to the S3 bucket, etc. — and nothing else.

In [1]:
from kfabstraction import create_abstracted_kf_pipeline

In [2]:
# Options for the abstracted pipeline.
number_of_HPO_runs = 10
deploy_model = False
add_batch_transform = False
add_HPO = False
# Prefer the bucket created and populated in section 1 when it exists in this kernel session.
# Otherwise fall back to the bucket from the recorded run.
# NOTE(review): that literal is a randomly generated name tied to the original author's account.
S3_BUCKET = globals().get('S3_BUCKET') or 'elvupwxbctfruhrb3403662892282909-kubeflow-pipeline-data'
prefix = 'mnist_kmeans_example'

In [3]:
# Build the abstracted pipeline rooted at s3://<bucket>/<prefix>; per the recorded output,
# the helper prints the name of the run it starts.
pipeline_s3_root = f's3://{S3_BUCKET}/{prefix}'
create_abstracted_kf_pipeline(add_HPO, add_batch_transform,
                              number_of_HPO_runs, deploy_model, pipeline_s3_root)



The name of the run is: mnist-classification-pipeline-20200816162007
