# Kubeflow Pipeline Example
The following notebook demonstrates how to create a Kubeflow Pipeline that trains and deploys a K-Means clustering model on Amazon SageMaker infrastructure. The K-Means algorithm is trained on the MNIST dataset using a SageMaker hyper-parameter tuning job followed by a training job, and the resulting model is then used by both a batch transform job and a real-time model endpoint.

Once the model has been trained, it is deployed as a SageMaker model endpoint, which can then be used to perform inference.

The training, deployment and inference process described in this notebook has 4 steps:
1. *S3 Bucket Creation & Data Upload*
2. *Kubeflow Pipeline Creation*
3. *Kubeflow Pipeline Deployment*
4. *K-Means Model Endpoint Inference*

Before proceeding, we will install the Kubeflow Pipelines SDK, `kfp`.

In [2]:
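# Installs the Kubeflow Pipelines (kfp) SDK, pinned below 2.0 because this notebook uses the v1 pipeline and component APIs, which changed substantially in kfp 2.x
!python3 -m pip install --upgrade "kfp<2" kfp-server-api
!python3 -m pip show kfp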

### 1. S3 Bucket Creation & Data Upload
In this section, we first create an S3 bucket to store the MNIST data used to train the K-Means algorithm. The MNIST dataset is downloaded from Amazon's SageMaker sample-data S3 bucket as a gzipped pickle file, which is decompressed and loaded into separate training, validation and test sets.

Before the data is uploaded to the S3 bucket, some pre-processing is required. More specifically, the NumPy arrays that hold the MNIST training and test features are converted to the RecordIO-protobuf dense tensor format expected by SageMaker's K-Means implementation, and the validation features are saved as a CSV file for the batch transform job.

In [3]:
import random, string
import boto3, gzip, pickle
import numpy, urllib.request, json, io
from urllib.parse import urlparse
from sagemaker.amazon.common import write_numpy_to_dense_tensor

In [4]:
from botocore.config import Config

# Sets the AWS region
AWS_REGION = 'eu-west-1'

boto3_config = Config(region_name=AWS_REGION)

# Initializes the S3 client
s3_client = boto3.client('s3', config=boto3_config)

In [5]:
# Creates a random 6-character prefix (3 letters + 3 digits) for the S3 bucket name
HASH = ''.join([random.choice(string.ascii_lowercase) for n in range(3)] + [random.choice(string.digits) for n in range(3)])

# Creates the S3 bucket name
S3_BUCKET = '{}-kubeflow-pipeline-data-bucket'.format(HASH)

# Creates the S3 bucket
s3_client.create_bucket(Bucket=S3_BUCKET, 
                        CreateBucketConfiguration={
                            'LocationConstraint': AWS_REGION
                        })

print('S3 Bucket:', S3_BUCKET)

S3 Bucket: ofw751-kubeflow-pipeline-data-bucket
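
The bucket-creation cell works in `eu-west-1`, but S3 rejects a `LocationConstraint` of `us-east-1`; in that region the `CreateBucketConfiguration` argument has to be left out. The sketch below uses a hypothetical helper, `bucket_request`, to build the `create_bucket` arguments for any region, together with a hypothetical `random_prefix` that follows the same naming scheme as the cell above. It only builds dictionaries, so it runs without AWS credentials.

```python
import random
import string


def random_prefix(letters=3, digits=3, rng=random):
    """Returns a lowercase prefix such as 'abc123' (same scheme as the cell above)."""
    return ''.join([rng.choice(string.ascii_lowercase) for _ in range(letters)]
                   + [rng.choice(string.digits) for _ in range(digits)])


def bucket_request(bucket, region):
    """Builds keyword arguments for boto3's S3 create_bucket call.

    Hypothetical helper: us-east-1 is the default location and must not be
    passed as a LocationConstraint, so the configuration is omitted there.
    """
    kwargs = {'Bucket': bucket}
    if region != 'us-east-1':
        kwargs['CreateBucketConfiguration'] = {'LocationConstraint': region}
    return kwargs


name = '{}-kubeflow-pipeline-data-bucket'.format(random_prefix())
print(bucket_request(name, 'eu-west-1'))
# With a real client: s3_client.create_bucket(**bucket_request(name, AWS_REGION))
```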


In [6]:
# Downloads the MNIST data from S3
s3_client.download_file(f"sagemaker-sample-data-{AWS_REGION}", 
                        "algorithms/kmeans/mnist/mnist.pkl.gz", 
                        "mnist.pkl.gz")

# Loads the gzipped pickle containing the train, validation and test sets
with gzip.open('mnist.pkl.gz', 'rb') as mnist_data:
    train_set, valid_set, test_set = pickle.load(mnist_data, encoding='latin1')
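
The pickle loaded above holds three `(features, labels)` pairs, one each for the training, validation and test sets. To make that layout concrete without downloading anything, the sketch below builds a small synthetic file with the same structure (assumed shapes: 784 features per image, matching the `feature_dim` used later in the pipeline) and loads it the same way. `make_fake_mnist_pickle` and `load_mnist_pickle` are hypothetical helpers, and the data is random.

```python
import gzip
import io
import pickle

import numpy


def make_fake_mnist_pickle(n=(5, 2, 2), dim=784, seed=0):
    """Builds gzip-compressed pickle bytes shaped like mnist.pkl.gz (synthetic data)."""
    rng = numpy.random.default_rng(seed)
    splits = tuple(
        (rng.random((count, dim), dtype=numpy.float32),  # flattened 28x28 images
         rng.integers(0, 10, size=count))                # digit labels 0-9
        for count in n
    )
    raw = io.BytesIO()
    with gzip.GzipFile(fileobj=raw, mode='wb') as gz:
        pickle.dump(splits, gz)
    return raw.getvalue()


def load_mnist_pickle(data):
    """Loads the (train, valid, test) pairs from gzip-compressed pickle bytes."""
    with gzip.open(io.BytesIO(data), 'rb') as f:
        return pickle.load(f, encoding='latin1')


train, valid, test = load_mnist_pickle(make_fake_mnist_pickle())
print(train[0].shape, train[1].shape)  # (5, 784) (5,)
```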

In [7]:
# Defines the train data key & the S3 path
train_data_key = 'mnist_kmeans_example/train_data'
train_data_location = 's3://{}/{}'.format(S3_BUCKET, train_data_key)
print('Training data will be uploaded to: {}'.format(train_data_location))

# Defines the test data key & the S3 path
test_data_key = 'mnist_kmeans_example/test_data'
test_data_location = 's3://{}/{}'.format(S3_BUCKET, test_data_key)
print('\nTest data will be uploaded to: {}'.format(test_data_location))

# Converts the train data from a NumPy array to a RecordIO-protobuf dense tensor (the format used by the K-Means channels) and uploads it
buf = io.BytesIO()
write_numpy_to_dense_tensor(buf, train_set[0], train_set[1])
buf.seek(0)
boto3.resource('s3').Bucket(S3_BUCKET).Object(train_data_key).upload_fileobj(buf)

# Converts the test data the same way, using a fresh buffer so the train data is not uploaded again with it
buf = io.BytesIO()
write_numpy_to_dense_tensor(buf, test_set[0], test_set[1])
buf.seek(0)
boto3.resource('s3').Bucket(S3_BUCKET).Object(test_data_key).upload_fileobj(buf)

# Saves the validation features as CSV for the batch transform job
numpy.savetxt('valid-data.csv', 
              valid_set[0], 
              delimiter=',', 
              fmt='%g')

input_key = "{}/valid_data.csv".format("mnist_kmeans_example/input")

# Uploads the validation data file to the S3 bucket
s3_client.upload_file('valid-data.csv', S3_BUCKET, input_key)

Training data will be uploaded to: s3://ofw751-kubeflow-pipeline-data-bucket/mnist_kmeans_example/train_data

Test data will be uploaded to: s3://ofw751-kubeflow-pipeline-data-bucket/mnist_kmeans_example/test_data
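
Reusing a single `io.BytesIO` for several uploads is easy to get wrong: `seek(0)` only moves the read position and does not clear the buffer, so data written for a second upload lands after the first payload. The self-contained sketch below shows the effect with short placeholder byte strings standing in for the serialized tensors, assuming the upload reads the buffer to its end and leaves it open (as the original cell implies). `reused_buffer_payloads` and `fresh_buffer_payload` are hypothetical helpers.

```python
import io


def reused_buffer_payloads(first, second):
    """Mimics one buffer being written, read, written again and read again."""
    buf = io.BytesIO()
    buf.write(first)
    buf.seek(0)
    uploaded_first = buf.read()   # an upload consumes the buffer up to its end
    buf.write(second)             # appended after the first payload, not replacing it
    buf.seek(0)
    uploaded_second = buf.read()  # contains both payloads
    return uploaded_first, uploaded_second


def fresh_buffer_payload(data):
    """Uses a new buffer per upload, so each object holds only its own data."""
    buf = io.BytesIO()
    buf.write(data)
    buf.seek(0)
    return buf.read()


print(reused_buffer_payloads(b'TRAIN', b'TEST'))  # (b'TRAIN', b'TRAINTEST')
print(fresh_buffer_payload(b'TEST'))              # b'TEST'
```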


### 2. Kubeflow Pipeline Creation
Now that the MNIST data has been prepared and uploaded to the S3 bucket, the Kubeflow Pipeline can be created. The pipeline uses the following Amazon SageMaker components for Kubeflow Pipelines:
- SageMaker Training Job component
- SageMaker Hyper-parameter Tuning component
- SageMaker Batch Transform Job component
- SageMaker Model component
- SageMaker Model Endpoint component

The SageMaker K-Means algorithm is a built-in algorithm that AWS provides as a Docker image, which SageMaker runs as a container.

In [8]:
import kfp
from kfp import components, dsl

In [9]:
# SageMaker Training Job component
sagemaker_train_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/1.1.1-beta.1/components/aws/sagemaker/train/component.yaml')

# SageMaker Batch Transform Job component
sagemaker_batch_transform_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/1.1.1-beta.1/components/aws/sagemaker/batch_transform/component.yaml')

# SageMaker Hyper-parameter Tuning component
sagemaker_hpo_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/1.1.1-beta.1/components/aws/sagemaker/hyperparameter_tuning/component.yaml')

# SageMaker Model component
sagemaker_model_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/1.1.1-beta.1/components/aws/sagemaker/model/component.yaml')

# SageMaker Model Endpoint component
sagemaker_deploy_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/1.1.1-beta.1/components/aws/sagemaker/deploy/component.yaml')

The following table lists the regional Amazon Elastic Container Registry (ECR) registries that host the SageMaker K-Means algorithm image. In this example, we use the registry in the `eu-west-1` (Ireland) region.

|Region| ECR Registry|
|------|----------|
|us-west-1|632365934929.dkr.ecr.us-west-1.amazonaws.com|
|us-west-2|174872318107.dkr.ecr.us-west-2.amazonaws.com|
|us-east-1|382416733822.dkr.ecr.us-east-1.amazonaws.com|
|us-east-2|404615174143.dkr.ecr.us-east-2.amazonaws.com|
|us-gov-west-1|226302683700.dkr.ecr.us-gov-west-1.amazonaws.com|
|ap-east-1|286214385809.dkr.ecr.ap-east-1.amazonaws.com|
|ap-northeast-1|351501993468.dkr.ecr.ap-northeast-1.amazonaws.com|
|ap-northeast-2|835164637446.dkr.ecr.ap-northeast-2.amazonaws.com|
|ap-south-1|991648021394.dkr.ecr.ap-south-1.amazonaws.com|
|ap-southeast-1|475088953585.dkr.ecr.ap-southeast-1.amazonaws.com|
|ap-southeast-2|712309505854.dkr.ecr.ap-southeast-2.amazonaws.com|
|ca-central-1|469771592824.dkr.ecr.ca-central-1.amazonaws.com|
|eu-central-1|664544806723.dkr.ecr.eu-central-1.amazonaws.com|
|eu-north-1|669576153137.dkr.ecr.eu-north-1.amazonaws.com|
|eu-west-1|438346466558.dkr.ecr.eu-west-1.amazonaws.com|
|eu-west-2|644912444149.dkr.ecr.eu-west-2.amazonaws.com|
|eu-west-3|749696950732.dkr.ecr.eu-west-3.amazonaws.com|
|me-south-1|249704162688.dkr.ecr.me-south-1.amazonaws.com|
|sa-east-1|855470959533.dkr.ecr.sa-east-1.amazonaws.com|
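
Rather than copying a registry host by hand, the table can be turned into a lookup keyed on the region. The sketch below copies only a few rows of the table (extend it as needed) and appends the `kmeans:1` repository and tag that the pipeline uses. `KMEANS_REGISTRIES` and `kmeans_image_uri` are hypothetical names, not part of any SDK.

```python
# A subset of the region -> registry table above; extend as needed.
KMEANS_REGISTRIES = {
    'us-east-1': '382416733822.dkr.ecr.us-east-1.amazonaws.com',
    'us-west-2': '174872318107.dkr.ecr.us-west-2.amazonaws.com',
    'eu-central-1': '664544806723.dkr.ecr.eu-central-1.amazonaws.com',
    'eu-west-1': '438346466558.dkr.ecr.eu-west-1.amazonaws.com',
}


def kmeans_image_uri(region, tag='1'):
    """Returns the full K-Means image URI for a region recorded in KMEANS_REGISTRIES."""
    try:
        registry = KMEANS_REGISTRIES[region]
    except KeyError:
        raise ValueError('No K-Means registry recorded for region {!r}'.format(region)) from None
    return '{}/kmeans:{}'.format(registry, tag)


print(kmeans_image_uri('eu-west-1'))
# 438346466558.dkr.ecr.eu-west-1.amazonaws.com/kmeans:1
```

If the SageMaker Python SDK v2 is installed, `sagemaker.image_uris.retrieve('kmeans', AWS_REGION)` should return an equivalent URI without maintaining the table by hand.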

In [10]:
# Specifies the ECR registry that hosts the K-Means image (the image name and tag are appended in the pipeline definition)
KMEANS_ECR_IMAGE = '438346466558.dkr.ecr.eu-west-1.amazonaws.com'

# Creates the S3 pipeline path
S3_PIPELINE_PATH = 's3://{}/mnist_kmeans_example'.format(S3_BUCKET)

# Specifies the SageMaker Execution Role ARN (replace with a role from your own AWS account)
SAGEMAKER_ROLE_ARN = 'arn:aws:iam::920198949818:role/kubeflow-pipeline-sagemaker-role'
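
The `hpo_channels` and `train_channels` defaults in the pipeline definition are hand-written JSON strings with backslash line continuations, which are easy to break. A less error-prone sketch builds the same channel structure as Python dictionaries and serializes it with `json.dumps`. `make_channel` is a hypothetical helper, the field values are copied from the pipeline defaults, and `pipeline_path` is a placeholder for `S3_PIPELINE_PATH`.

```python
import json


def make_channel(name, s3_uri, input_mode='File'):
    """Returns one SageMaker input channel definition using the pipeline's defaults."""
    return {
        'ChannelName': name,
        'DataSource': {
            'S3DataSource': {
                'S3Uri': s3_uri,
                'S3DataType': 'S3Prefix',
                'S3DataDistributionType': 'FullyReplicated',
            }
        },
        'ContentType': '',
        'CompressionType': 'None',
        'RecordWrapperType': 'None',
        'InputMode': input_mode,
    }


pipeline_path = 's3://example-bucket/mnist_kmeans_example'  # placeholder for S3_PIPELINE_PATH

# JSON strings that could be passed as the hpo_channels / train_channels defaults
hpo_channels_json = json.dumps([
    make_channel('train', pipeline_path + '/train_data'),
    make_channel('test', pipeline_path + '/test_data'),
])
train_channels_json = json.dumps([make_channel('train', pipeline_path + '/train_data')])
print(hpo_channels_json)
```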

In [11]:
@dsl.pipeline(
    name='kmeans-mnist-classification-pipeline',
    description='K-Means MNIST Classification Pipeline'
)

def kmeans_mnist_classification_pipeline(region=AWS_REGION,
    image=KMEANS_ECR_IMAGE+'/kmeans:1',
    training_input_mode='File',
    hpo_strategy='Bayesian',
    hpo_metric_name='test:msd',
    hpo_metric_type='Minimize',
    hpo_early_stopping_type='Off',
    hpo_static_parameters='{"k": "10", "feature_dim": "784"}',
    hpo_integer_parameters='[{"Name": "mini_batch_size", "MinValue": "500", "MaxValue": "600"}, {"Name": "extra_center_factor", "MinValue": "10", "MaxValue": "20"}]',
    hpo_continuous_parameters='[]',
    hpo_categorical_parameters='[{"Name": "init_method", "Values": ["random", "kmeans++"]}]',
    hpo_channels='[{"ChannelName": "train", \
                "DataSource": { \
                    "S3DataSource": { \
                        "S3Uri": "' + S3_PIPELINE_PATH + '/train_data",  \
                        "S3DataType": "S3Prefix", \
                        "S3DataDistributionType": "FullyReplicated" \
                        } \
                    }, \
                "ContentType": "", \
                "CompressionType": "None", \
                "RecordWrapperType": "None", \
                "InputMode": "File"}, \
               {"ChannelName": "test", \
                "DataSource": { \
                    "S3DataSource": { \
                        "S3Uri": "' + S3_PIPELINE_PATH + '/test_data", \
                        "S3DataType": "S3Prefix", \
                        "S3DataDistributionType": "FullyReplicated" \
                        } \
                    }, \
                "ContentType": "", \
                "CompressionType": "None", \
                "RecordWrapperType": "None", \
                "InputMode": "File"}]',
    hpo_spot_instance='False',
    hpo_max_wait_time='3600',
    hpo_checkpoint_config='{}',
    output_location=S3_PIPELINE_PATH + '/output',
    output_encryption_key='',
    instance_type='ml.p3.2xlarge',
    instance_count='1',
    volume_size='50',
    hpo_max_num_jobs='9',
    hpo_max_parallel_jobs='2',
    max_run_time='3600',
    endpoint_url='',
    network_isolation='True',
    traffic_encryption='False',
    train_channels='[{"ChannelName": "train", \
                "DataSource": { \
                    "S3DataSource": { \
                        "S3Uri": "' + S3_PIPELINE_PATH + '/train_data",  \
                        "S3DataType": "S3Prefix", \
                        "S3DataDistributionType": "FullyReplicated" \
                        } \
                    }, \
                "ContentType": "", \
                "CompressionType": "None", \
                "RecordWrapperType": "None", \
                "InputMode": "File"}]',
    train_spot_instance='False',
    train_max_wait_time='3600',
    train_checkpoint_config='{}',
    batch_transform_instance_type='ml.m4.xlarge',
    batch_transform_input=S3_PIPELINE_PATH + '/input',
    batch_transform_data_type='S3Prefix',
    batch_transform_content_type='text/csv',
    batch_transform_compression_type='None',
    batch_transform_ouput=S3_PIPELINE_PATH + '/output',
    batch_transform_max_concurrent='4',
    batch_transform_max_payload='6',
    batch_strategy='MultiRecord',
    batch_transform_split_type='Line',
    role_arn=SAGEMAKER_ROLE_ARN
    ):

    hpo = sagemaker_hpo_op(
        region=region,
        endpoint_url=endpoint_url,
        image=image,
        training_input_mode=training_input_mode,
        strategy=hpo_strategy,
        metric_name=hpo_metric_name,
        metric_type=hpo_metric_type,
        early_stopping_type=hpo_early_stopping_type,
        static_parameters=hpo_static_parameters,
        integer_parameters=hpo_integer_parameters,
        continuous_parameters=hpo_continuous_parameters,
        categorical_parameters=hpo_categorical_parameters,
        channels=hpo_channels,
        output_location=output_location,
        output_encryption_key=output_encryption_key,
        instance_type=instance_type,
        instance_count=instance_count,
        volume_size=volume_size,
        max_num_jobs=hpo_max_num_jobs,
        max_parallel_jobs=hpo_max_parallel_jobs,
        max_run_time=max_run_time,
        network_isolation=network_isolation,
        traffic_encryption=traffic_encryption,
        spot_instance=hpo_spot_instance,
        max_wait_time=hpo_max_wait_time,
        checkpoint_config=hpo_checkpoint_config,
        role=role_arn,
    )

    training = sagemaker_train_op(
        region=region,
        endpoint_url=endpoint_url,
        image=image,
        training_input_mode=training_input_mode,
        hyperparameters=hpo.outputs['best_hyperparameters'],
        channels=train_channels,
        instance_type=instance_type,
        instance_count=instance_count,
        volume_size=volume_size,
        max_run_time=max_run_time,
        model_artifact_path=output_location,
        output_encryption_key=output_encryption_key,
        network_isolation=network_isolation,
        traffic_encryption=traffic_encryption,
        spot_instance=train_spot_instance,
        max_wait_time=train_max_wait_time,
        checkpoint_config=train_checkpoint_config,
        role=role_arn,
    )

    create_model = sagemaker_model_op(
        region=region,
        endpoint_url=endpoint_url,
        model_name=training.outputs['job_name'],
        image=training.outputs['training_image'],
        model_artifact_url=training.outputs['model_artifact_url'],
        network_isolation=network_isolation,
        role=role_arn
    )

    prediction = sagemaker_deploy_op(
        region=region,
        endpoint_url=endpoint_url,
        model_name_1=create_model.output,
    )

    batch_transform = sagemaker_batch_transform_op(
        region=region,
        endpoint_url=endpoint_url,
        model_name=create_model.output,
        instance_type=batch_transform_instance_type,
        instance_count=instance_count,
        max_concurrent=batch_transform_max_concurrent,
        max_payload=batch_transform_max_payload,
        batch_strategy=batch_strategy,
        input_location=batch_transform_input,
        data_type=batch_transform_data_type,
        content_type=batch_transform_content_type,
        split_type=batch_transform_split_type,
        compression_type=batch_transform_compression_type,
        output_location=batch_transform_ouput
    )
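
Kubeflow derives the run order from the outputs each step consumes: training uses the tuning job's best hyper-parameters, the model uses the training job's outputs, and both the endpoint and the batch transform use the model name. The sketch below reproduces that dependency graph with Python's standard `graphlib` module (Python 3.9+) to show one valid execution order. It is an illustration of the DAG only, not how Kubeflow schedules steps internally; the step names mirror the variables in the pipeline function.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes.
PIPELINE_DEPENDENCIES = {
    'hpo': set(),
    'training': {'hpo'},             # hpo.outputs['best_hyperparameters']
    'create_model': {'training'},    # job_name, training_image, model_artifact_url
    'prediction': {'create_model'},  # create_model.output
    'batch_transform': {'create_model'},
}

order = list(TopologicalSorter(PIPELINE_DEPENDENCIES).static_order())
print(order)
```

Because `prediction` and `batch_transform` do not depend on each other, they can run in parallel once the model has been created.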

### 3. Kubeflow Pipeline Deployment
In Kubeflow v1.1.0, in-cluster communication from the notebook server to Kubeflow Pipelines is not supported. A workaround is to pass a session cookie to the `kfp` SDK so that it can authenticate with the cluster. See the [Kubeflow on AWS pipelines documentation](https://www.kubeflow.org/docs/aws/pipeline/) for details.

In the meantime, the simplest approach is to compile the Kubeflow Pipeline in the notebook, which generates a zip file called `kmeans-mnist-classification-pipeline.zip`. This zip file can then be deployed manually through the Kubeflow Console by following these steps:
1. Run the following cell to compile the Kubeflow Pipeline; a `kmeans-mnist-classification-pipeline.zip` file should appear in Jupyter.
2. Download the `kmeans-mnist-classification-pipeline.zip` file to your local machine.
3. In the Kubeflow Console, click the *Pipelines* button.
4. In the Pipelines UI, click the *Upload Pipeline* button at the top right.
5. For *Pipeline Name*, type `kmeans-mnist-classification-pipeline`.
6. Select the *Upload Pipeline* option, upload the `kmeans-mnist-classification-pipeline.zip` file from your Downloads folder, and click the *Create* button.
7. Once the Pipeline has been created, create an *Experiment* by clicking the *Create experiment* button.
8. For the *Experiment name*, type `kmeans_mnist_classification_experiment` (experiment names must be unique, so modify the name if it already exists) and click the *Next* button. You will be redirected to the *Start a run* view, where you can initialize an Experiment Run.
9. In the *Start a run* view, choose the `kmeans-mnist-classification-pipeline` Pipeline that you created previously.
10. For the *Run name*, type `kmeans-mnist-classification-pipeline-run`.
11. Leave the remaining defaults unchanged and click the *Start* button.
12. In the *Runs* view, click the Run that you have just created to see the directed acyclic graph (DAG) that visually represents the Kubeflow Pipeline.

The run takes around 15-20 minutes to complete.

In [12]:
# Compiles the Kubeflow Pipeline
kfp.compiler.Compiler().compile(kmeans_mnist_classification_pipeline, 
                                'kmeans-mnist-classification-pipeline.zip')
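
To check what was produced before uploading it, the package can be opened with the standard `zipfile` module. This sketch assumes that the kfp v1 compiler stores the Argo workflow as a single YAML member inside the archive (typically named `pipeline.yaml`). `list_package` is a hypothetical helper, and the demo writes a small stand-in archive so that it runs anywhere.

```python
import os
import tempfile
import zipfile


def list_package(path):
    """Returns the member names and the text of the first YAML member of a pipeline zip."""
    with zipfile.ZipFile(path) as archive:
        names = archive.namelist()
        yaml_names = [n for n in names if n.endswith(('.yaml', '.yml'))]
        text = archive.read(yaml_names[0]).decode('utf-8') if yaml_names else ''
    return names, text


# Stand-in archive; in the notebook, call list_package('kmeans-mnist-classification-pipeline.zip')
demo_path = os.path.join(tempfile.mkdtemp(), 'demo-pipeline.zip')
with zipfile.ZipFile(demo_path, 'w') as archive:
    archive.writestr('pipeline.yaml', 'apiVersion: argoproj.io/v1alpha1\nkind: Workflow\n')

names, text = list_package(demo_path)
print(names)                  # ['pipeline.yaml']
print(text.splitlines()[1])   # kind: Workflow
```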

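**You can skip the following cell.** It is commented out and shows how the pipeline could instead be submitted from the notebook with the `kfp` SDK, using the session cookie workaround described above.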

In [None]:
# alb_cookie_content='<cookie-content-here>'

# authservice_session='authservice_session='+alb_cookie_content

# alb_dns = 'http://8cd1d44c-istiosystem-istio-2af2-1274113137.eu-west-1.elb.amazonaws.com'

# client = kfp.Client(host=alb_dns+'/pipeline', 
#                     cookies=authservice_session)

# # Creates a Kubeflow Experiment
# kmeans_mnist_classification_experiment = client.create_experiment(name='kmeans_mnist_classification_experiment', 
#                                                                   namespace='kflow-user')

# # Runs the Kubeflow Pipeline
# kmeans_mnist_classification_pipeline_run = client.run_pipeline(kmeans_mnist_classification_experiment.id,
#                                                                'kmeans-mnist-classification-pipeline-run', 
#                                                                'kmeans-mnist-classification-pipeline.zip')

### 4. K-Means Model Endpoint Inference
Once the Experiment Run has completed successfully, an Amazon SageMaker Model Endpoint should have been deployed.

In [14]:
# Initializes the SageMaker client
sagemaker_client = boto3.client('sagemaker', 
                                config=boto3_config)

In [15]:
# Parses the SageMaker Model Endpoint name (assumes the pipeline's endpoint is the first one listed)
kmeans_model_endpoint = sagemaker_client.list_endpoints()['Endpoints'][0]['EndpointName']
print('K-Means Model Endpoint:', kmeans_model_endpoint)

K-Means Model Endpoint: Endpoint20201217142527-L123
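
Taking `list_endpoints()['Endpoints'][0]` only finds the pipeline's endpoint if no other endpoint is listed ahead of it. A more defensive sketch filters a `ListEndpoints`-style response by status and name prefix and picks the newest by `CreationTime`. `newest_endpoint` is a hypothetical helper, the `Endpoint` prefix is an assumption based on the name printed above, and the sample response is synthetic.

```python
from datetime import datetime, timezone


def newest_endpoint(response, prefix='Endpoint', status='InService'):
    """Returns the newest matching endpoint name from a ListEndpoints-style response, or None."""
    candidates = [
        e for e in response.get('Endpoints', [])
        if e['EndpointName'].startswith(prefix) and e['EndpointStatus'] == status
    ]
    if not candidates:
        return None
    return max(candidates, key=lambda e: e['CreationTime'])['EndpointName']


# Synthetic response; in the notebook, pass sagemaker_client.list_endpoints()
sample = {'Endpoints': [
    {'EndpointName': 'Endpoint20201217142527-L123', 'EndpointStatus': 'InService',
     'CreationTime': datetime(2020, 12, 17, 14, 25, tzinfo=timezone.utc)},
    {'EndpointName': 'Endpoint20201201090000-OLD0', 'EndpointStatus': 'InService',
     'CreationTime': datetime(2020, 12, 1, 9, 0, tzinfo=timezone.utc)},
    {'EndpointName': 'other-endpoint', 'EndpointStatus': 'InService',
     'CreationTime': datetime(2020, 12, 18, 9, 0, tzinfo=timezone.utc)},
]}
print(newest_endpoint(sample))  # Endpoint20201217142527-L123
```

If the endpoint may still be starting up, boto3's `endpoint_in_service` waiter (`sagemaker_client.get_waiter('endpoint_in_service').wait(EndpointName=...)`) can block until it is ready.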


In [16]:
def np2csv(arr):
    '''Converts a NumPy array to a CSV string, one row per line'''
    csv = io.BytesIO()
    numpy.savetxt(csv, arr, delimiter=',', fmt='%g')
    return csv.getvalue().decode().rstrip()
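
Because `np2csv` writes one CSV line per array row, several images can be scored in a single request. The sketch below shows the resulting payload for a tiny made-up array (two rows and three features instead of 784). `to_csv_payload` is a hypothetical standalone variant of `np2csv` that writes to a text buffer and also accepts a single 1-D row.

```python
import io

import numpy


def to_csv_payload(arr):
    """Formats a 1-D or 2-D array as CSV text, one row per line, without a trailing newline."""
    text = io.StringIO()
    numpy.savetxt(text, numpy.atleast_2d(arr), delimiter=',', fmt='%g')
    return text.getvalue().rstrip()


payload = to_csv_payload(numpy.array([[0.0, 0.5, 1.0], [0.25, 0.0, 0.75]]))
print(payload)
# 0,0.5,1
# 0.25,0,0.75
```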

In [17]:
# Initializes the SageMaker runtime client
sagemaker_runtime = boto3.client('sagemaker-runtime',
                                 config=boto3_config)

# Creates a CSV payload from a single training image (row 30) to perform inference on
payload = np2csv(train_set[0][30:31])

# Invokes the SageMaker Model Endpoint and captures the response
kmeans_model_endpoint_response = sagemaker_runtime.invoke_endpoint(EndpointName=kmeans_model_endpoint,
                                                      ContentType='text/csv',
                                                      Body=payload)

prediction = json.loads(kmeans_model_endpoint_response['Body'].read().decode())
print(prediction)

{'predictions': [{'distance_to_cluster': 7.2547078132629395, 'closest_cluster': 5.0}]}
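
The response body is JSON with one entry per input row under `predictions`, and `closest_cluster` is returned as a float. The sketch below uses a hypothetical helper, `cluster_assignments`, to turn the body into integer cluster IDs paired with distances, using the output shown above as sample input. Since K-Means is unsupervised, cluster IDs are arbitrary: cluster 5 does not necessarily correspond to the digit 5.

```python
import json


def cluster_assignments(body):
    """Returns (cluster_id, distance) pairs from a K-Means endpoint JSON response body."""
    predictions = json.loads(body)['predictions']
    return [(int(p['closest_cluster']), p['distance_to_cluster']) for p in predictions]


sample_body = '{"predictions": [{"distance_to_cluster": 7.2547078132629395, "closest_cluster": 5.0}]}'
print(cluster_assignments(sample_body))  # [(5, 7.2547078132629395)]
```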


### Resource Cleanup
The following resources should be deleted once you have finished with the notebook:
- SageMaker Model Endpoint
- SageMaker Model Endpoint Configuration
- SageMaker Model
- S3 Bucket
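
The cells below look up the model with `list_models()['Models'][0]`, which assumes the pipeline's model is the first one listed. An alternative sketch derives every name from the endpoint itself: `describe_endpoint` returns the endpoint config name, and `describe_endpoint_config` lists the model behind each production variant. `delete_endpoint_stack` is a hypothetical helper that expects a boto3 SageMaker client; the demo uses a mock so that it runs without AWS.

```python
from unittest import mock


def delete_endpoint_stack(sm_client, endpoint_name):
    """Deletes an endpoint, its endpoint config and the models it serves.

    Returns the deleted resource names in deletion order.
    """
    config_name = sm_client.describe_endpoint(EndpointName=endpoint_name)['EndpointConfigName']
    variants = sm_client.describe_endpoint_config(EndpointConfigName=config_name)['ProductionVariants']
    model_names = [v['ModelName'] for v in variants]

    sm_client.delete_endpoint(EndpointName=endpoint_name)
    sm_client.delete_endpoint_config(EndpointConfigName=config_name)
    for model_name in model_names:
        sm_client.delete_model(ModelName=model_name)
    return [endpoint_name, config_name] + model_names


# Demo with a mock client; in the notebook, pass sagemaker_client and kmeans_model_endpoint
fake = mock.MagicMock()
fake.describe_endpoint.return_value = {'EndpointConfigName': 'EndpointConfig-demo'}
fake.describe_endpoint_config.return_value = {'ProductionVariants': [{'ModelName': 'Model-demo'}]}
deleted = delete_endpoint_stack(fake, 'Endpoint-demo')
print(deleted)  # ['Endpoint-demo', 'EndpointConfig-demo', 'Model-demo']
```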

In [18]:
# Parses the SageMaker Model name (assumes the pipeline's model is the first one listed)
kmeans_model = sagemaker_client.list_models()['Models'][0]['ModelName']
print('K-Means Model:', kmeans_model)

K-Means Model: TrainingJob-20201217142147-IDXG


In [19]:
# Parses the SageMaker Model Endpoint Config name
kmeans_model_endpoint_config = sagemaker_client.describe_endpoint(EndpointName=kmeans_model_endpoint)['EndpointConfigName']
print('K-Means Model Endpoint Config:', kmeans_model_endpoint_config)

K-Means Model Endpoint Config: EndpointConfig20201217142527-L123


In [20]:
# Deletes the SageMaker Model Endpoint
sagemaker_client.delete_endpoint(EndpointName=kmeans_model_endpoint)

{'ResponseMetadata': {'RequestId': '233e427a-4293-4c32-aa3a-56d10842090a',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '233e427a-4293-4c32-aa3a-56d10842090a',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'date': 'Thu, 17 Dec 2020 14:56:41 GMT'},
  'RetryAttempts': 0}}

In [21]:
# Deletes the SageMaker Model Endpoint Config
sagemaker_client.delete_endpoint_config(EndpointConfigName=kmeans_model_endpoint_config)

{'ResponseMetadata': {'RequestId': '11ab6a20-4fd7-4a3b-bd07-83105bdaaac3',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '11ab6a20-4fd7-4a3b-bd07-83105bdaaac3',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'date': 'Thu, 17 Dec 2020 14:56:44 GMT'},
  'RetryAttempts': 0}}

In [22]:
# Deletes the SageMaker Model
sagemaker_client.delete_model(ModelName=kmeans_model)

{'ResponseMetadata': {'RequestId': 'e1e8a66b-2605-4776-8838-e10c4799dd57',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'e1e8a66b-2605-4776-8838-e10c4799dd57',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'date': 'Thu, 17 Dec 2020 14:56:45 GMT'},
  'RetryAttempts': 0}}

In [23]:
mnist_data_bucket = boto3.resource('s3').Bucket(S3_BUCKET)

# Deletes all objects in the S3 bucket, then deletes the bucket itself
mnist_data_bucket.objects.all().delete()
mnist_data_bucket.delete()

{'ResponseMetadata': {'RequestId': '46412DA4034226AB',
  'HostId': '3ywwaHCDSSQ/UWr6AZI1/iTb5DasrbDUmSf6u92UiOeRb5woIJzv+F1+tlRx+kucAKwLKR5A6JM=',
  'HTTPStatusCode': 204,
  'HTTPHeaders': {'x-amz-id-2': '3ywwaHCDSSQ/UWr6AZI1/iTb5DasrbDUmSf6u92UiOeRb5woIJzv+F1+tlRx+kucAKwLKR5A6JM=',
   'x-amz-request-id': '46412DA4034226AB',
   'date': 'Thu, 17 Dec 2020 14:56:50 GMT',
   'server': 'AmazonS3'},
  'RetryAttempts': 0}}