## Xgboost Mnist classification pipeline

### Prerequisite

* Install Sagemaker, kfp and boto3 sdk. 

> Note: Be sure to use specified KFP SDK version in this notebook.

In [None]:
# !pip install kfp==1.8.13
import kfp
kfp.__version__

'1.8.13'

In [2]:
# !pip install boto3==1.26.109
import boto3
boto3.__version__

'1.26.109'

In [3]:
# !pip install sagemaker==2.146.0
import sagemaker
sagemaker.__version__

'2.145.0'

### Sagemaker IAM Role creation

* In order to run this pipeline, we need two levels of IAM permissions. One is Sagemaker user and another is sagemaker execution role. Below are the steps to obtain all required permissions.
Go to your cloud9 console or terminal with aws cli configured.  

> a) Create a IAM user
```bash
aws iam create-user --user-name sagemaker-kfp-user
aws iam attach-user-policy --user-name sagemaker-kfp-user --policy-arn arn:aws:iam::aws:policy/AmazonSageMakerFullAccess
```
> b) Create an access key and export them as env variable  
```bash
aws iam create-access-key --user-name sagemaker-kfp-user > ~/environment/create_output.json
export AWS_ACCESS_KEY_ID_VALUE=$(jq -j .AccessKey.AccessKeyId ~/environment/create_output.json | base64)
export AWS_SECRET_ACCESS_KEY_VALUE=$(jq -j .AccessKey.SecretAccessKey ~/environment/create_output.json | base64)
```
> c) Create an IAM execution role for Sagemaker and S3 so that the job can assume this role in order to perform Sagemaker and S3 actions. Make a note of this role as you will need it during pipeline creation step
```bash
TRUST="{ \"Version\": \"2012-10-17\", \"Statement\": [ { \"Effect\": \"Allow\", \"Principal\": { \"Service\": \"sagemaker.amazonaws.com\" }, \"Action\": \"sts:AssumeRole\" } ] }"
aws iam create-role --role-name sagemaker-kfp-role --assume-role-policy-document "$TRUST"
aws iam attach-role-policy --role-name sagemaker-kfp-role --policy-arn arn:aws:iam::aws:policy/AmazonSageMakerFullAccess
aws iam attach-role-policy --role-name sagemaker-kfp-role --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
aws iam get-role --role-name sagemaker-kfp-role --output text --query 'Role.Arn'

```
> d) Create Kubernetes secrets **aws-secret** with Sagemaker and S3 policies. Please make sure to create `aws-secret` in kubeflow user namespace.

```bash
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Secret
metadata:
  name: aws-secret
  namespace: kubeflow-user-example-com
type: Opaque
data:
  AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID_VALUE
  AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY_VALUE
EOF
```
> e) Let’s assign sagemaker:InvokeEndpoint permission to Worker node IAM role
```bash
cat <<EoF > ~/environment/sagemaker-invoke.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:InvokeEndpoint"
            ],
            "Resource": "*"
        }
    ]
}
EoF
aws iam put-role-policy --role-name sagemaker-kfp-role --policy-name sagemaker-invoke-for-worker --policy-document file://~/environment/sagemaker-invoke.json
```
References: https://archive.eksworkshop.com/advanced/420_kubeflow/pipelines/

### Connect to Kubeflow pipeline endpoint

1. Importing Libraries

In [4]:
import kfp
import time
from kfp import components
from kfp import dsl
from kfp.aws import use_aws_secret
from kfp.components import load_component_from_file, create_component_from_func

2. Initialise Kubeflow pipeline client

In [5]:
# crediantials are required if kubeflow is deployed for multi-tenancy
credentials = kfp.auth.ServiceAccountTokenVolumeCredentials(path='/var/run/secrets/kubeflow/pipelines/token')
client = kfp.Client(host="http://your-ml-pipeline-endpoint:8888", credentials=credentials)

In [6]:
experiment_name = 'sagemaker-kfp'

client.create_experiment(name=experiment_name,
                        description='Kubeflow pipeline with sagemaker',
                        namespace='kubeflow-user-example-com')

{'created_at': datetime.datetime(2023, 4, 17, 9, 18, 10, tzinfo=tzlocal()),
 'description': 'Kubeflow pipeline with sagemaker',
 'id': '47062046-4693-4a98-be61-10ee9c024732',
 'name': 'sagemaker-kfp',
 'resource_references': [{'key': {'id': 'kubeflow-user-example-com',
                                  'type': 'NAMESPACE'},
                          'name': None,
                          'relationship': 'OWNER'}],
 'storage_state': 'STORAGESTATE_AVAILABLE'}

### Get the Xgboost Model URI and Define the constants

3. Get the uri of the XGBoost Model from AWS Public ECR

In [7]:
from sagemaker import image_uris
image_uri = image_uris.retrieve(framework='xgboost',region='us-east-1', version="1.5-1")
print(image_uri)

683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.5-1


4. Defining the constants

In [8]:
S3_BUCKET = "kfp-sagemaker-example" 
S3_PIPELINE_PATH=f's3://{S3_BUCKET}/xgboost/pipeline_test'
# Configure your Sagemaker execution role.
SAGEMAKER_ROLE_ARN='arn:aws:iam::XXXXXXXXXXXX:role/sagemaker-kfp-role'
PREFIX = "xgboost/pipeline_test"
MNIST_DATA = "mnist_data/mnist.pkl.gz"
XGBOOST_IMAGE = "683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.5-1"
MODEL_NAME = 'Xgboost-mnist-' + time.strftime("%Y-%b-%d-%H-%M-%S", time.gmtime())
BASE_IMAGE = 'ashwin456/python:3.10-slim-boto3' 

### Download the Sample Mnist data and Upload to s3

5. Data ingestion  

  Download the Mnist data from this [link](https://github.com/mnielsen/neural-networks-and-deep-learning/raw/master/data/mnist.pkl.gz) and upload it s3 bucket so that the kfp component can access it.

In [9]:
import pickle, gzip, numpy, urllib.request, json
from urllib.parse import urlparse

# Download the dataset
#urllib.request.urlretrieve("https://github.com/mnielsen/neural-networks-and-deep-learning/raw/master/data/mnist.pkl.gz", "mnist.pkl.gz")

In [10]:
def upload_data_s3(file_path: str, s3_bucket: str, key: str):
    import boto3
    
 # if credentials are not in the environmnet, provide them manually
    s3 = boto3.client('s3', region_name='us-east-1')
    s3.create_bucket(Bucket=s3_bucket)

    with open(file_path, 'rb') as data:
        s3.upload_fileobj(data, s3_bucket, key)

upload_data_s3("./mnist.pkl.gz", S3_BUCKET, MNIST_DATA)

### Data Set Preprocessing

6. Define the function that fetches the mnist data from s3 and processes it.

In [11]:
def prepare_data(mnist_data:str, s3_bucket:str, prefix:str):
    """
        Fetches the MNIST Data and processes it to produce train, test and validation set &
        Uploads the prepared data to s3.
        
        Args:
            mnist_data: str = Takes the file path in the s3 format "folder/filename"
            s3_bucket:str = Name of the bucket
            prefix: str = Name of the folder in which processed data should be uploaded
            
        Returns: None
            
    """
    #pending to clean up s3 download and upload code
    import pickle
    import gzip
    import io
    import boto3
    import botocore
    
    # Downloading the data
    session = boto3.Session()
    s3 = boto3.resource("s3", region_name='us-east-1')
    mnist_file = mnist_data.split(sep='/')[1]
    try:
        s3.Bucket(s3_bucket).download_file(mnist_data, mnist_file)
    except botocore.exceptions.ClientError as e:
        if e.response["Error"]["Code"] == "404":
            print(f"The object does not exist at {url}.")
        else:
            raise
    
    # loading 
    with gzip.open(mnist_file, "rb") as f:
        u = pickle._Unpickler(f)
        u.encoding = "latin1"
        train_set, valid_set, test_set = u.load() 

    # converting data to libsvm format to get training compatibilty of xgboost ecr image
    
    partitions = [("train", train_set), ("validation", valid_set), ("test", test_set)]
    for partition_name, partition in partitions:
        print(f"{partition_name}: {partition[0].shape} {partition[1].shape}")
        labels = [t.tolist() for t in partition[1]]
        vectors = [t.tolist() for t in partition[0]]
        num_partition = 5  # partition file into 5 parts
        partition_bound = int(len(labels) / num_partition)
        for i in range(num_partition):
            f = io.BytesIO()
            lab = labels[i * partition_bound : (i + 1) * partition_bound]
            val = vectors[i * partition_bound : (i + 1) * partition_bound]
            f.write(
                bytes(
                    "\n".join(
                        ["{} {}".format(label, " ".join(["{}:{}".format(i + 1, el) for i, el in enumerate(vec)]))
                            for label, vec in zip(lab, val)]
                    ),
                    "utf-8",
                )
            )
            f.seek(0)
            key = f"{prefix}/{partition_name}/examples{str(i)}"
            url = f"s3://{s3_bucket}/{key}"
            print(f"Writing to {url}")
            # uploading the converted file to s3
            session.resource("s3").Bucket(s3_bucket).Object(key).upload_fileobj(f)
            print(f"Done writing to {url}")

In [12]:
# Hyperparameters specific to XGBoost Model
hyperparameters = {
                   "eta": "0.2", 
                   "gamma": "4", 
                   "max_depth": "5", 
                   "min_child_weight": "6", 
                   "num_class": "10", 
                   "num_round": "10", 
                   "objective": "multi:softmax", 
                   "verbosity": "0"
    }

# specifing the channels(data) for training 
train_channels =[
    {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": f"s3://{S3_BUCKET}/{PREFIX}/train/",
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
            "ContentType": "libsvm",
            "CompressionType": "None",
        },
        {
            "ChannelName": "validation",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": f"s3://{S3_BUCKET}/{PREFIX}/validation/",
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
            "ContentType": "libsvm",
            "CompressionType": "None",
        },
    ]

### Download Sagemaker components and initialise

8. Creating reusable components from kubeflow pipeline sagemaker. Download from here 
[sagemaker-components](https://github.com/kubeflow/pipelines/tree/master/components/aws/sagemaker)

In [13]:
#!wget -O train.yaml https://github.com/kubeflow/pipelines/blob/master/components/aws/sagemaker/TrainingJob/component.yaml
#!wget -O create_model.yaml https://github.com/kubeflow/pipelines/blob/master/components/aws/sagemaker/model/component.yaml 
#!wget -O deploy.yaml https://github.com/kubeflow/pipelines/blob/master/components/aws/sagemaker/deploy/component.yaml 

In [14]:
sagemaker_train_op = load_component_from_file("./train.yaml")
sagemaker_model_op = load_component_from_file("./create_model.yaml")
sagemaker_deploy_op = load_component_from_file("./deploy.yaml")

9. Initialise the components of our functions

In [15]:
prepare_data_op = create_component_from_func(func=prepare_data, base_image=BASE_IMAGE)

### Creating the Pipeline

10. Create pipeline defining all the components

In [16]:
@dsl.pipeline(
    name='MNIST Classification pipeline',
    description='MNIST Classification using XGBoost in SageMaker'
)
def xgboost_mnist_classification(
    endpoint_url='',
    region='us-east-1',
    network_isolation="True"
):

    prepare_data_task = prepare_data_op(
        MNIST_DATA, 
        S3_BUCKET, 
        PREFIX
    )
    
    training_task = sagemaker_train_op(
        region=region,
        endpoint_url="",
        job_name=MODEL_NAME,
        image=XGBOOST_IMAGE,
        training_input_mode="File",
        hyperparameters=hyperparameters,
        channels=train_channels,
        instance_type="ml.m4.xlarge",
        instance_count="1",
        volume_size="6",
        max_run_time="3600",
        model_artifact_path=S3_PIPELINE_PATH + "/output",
        output_encryption_key="",
        network_isolation=network_isolation,
        traffic_encryption="False",
        spot_instance="False",
        max_wait_time="3600",
        role=SAGEMAKER_ROLE_ARN,
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))
    
    training_task.after(prepare_data_task)
    
    create_model_task = sagemaker_model_op(
        region=region,
        endpoint_url=endpoint_url,
        model_name=training_task.outputs['job_name'],
        image=training_task.outputs['training_image'],
        model_artifact_url=training_task.outputs['model_artifact_url'],
        network_isolation=network_isolation,
        vpc_subnets="",
        vpc_security_group_ids="",
        role=SAGEMAKER_ROLE_ARN
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

    sagemaker_deploy = sagemaker_deploy_op(
        region=region,
        endpoint_url=endpoint_url,
        model_name_1=create_model_task.output
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

### Compile and run the the pipeline

11. Compile your pipeline

In [17]:
kfp.compiler.Compiler().compile(xgboost_mnist_classification, 'xgboost.zip')

12. Deploy your pipeline

In [18]:
# client = kfp.Client()
aws_experiment = client.create_experiment(name=experiment_name)
my_run = client.run_pipeline(aws_experiment.id, 'Xgboost-mnist', 
  'xgboost.zip')

### Testing our endpoint with sample input

In [19]:
with gzip.open('mnist.pkl.gz', 'rb') as f:
    train_set, valid_set, test_set = pickle.load(f, encoding='latin1')

In [21]:
import pickle, gzip, numpy, urllib.request, json
from urllib.parse import urlparse
import json
import io
import boto3

# Replace the endpoint name with yours.
ENDPOINT_NAME='Endpoint20230418182829-1YRI'
ENDPOINT_CONFIG="EndpointConfig20230418182829-1YRI"
# We will use the same dataset that was downloaded at the beginning of the notebook.

# Simple function to create a csv from our numpy array
def np2csv(arr):
    csv = io.BytesIO()
    numpy.savetxt(csv, arr, delimiter=',', fmt='%g')
    return csv.getvalue().decode().rstrip()

runtime = boto3.Session(region_name='us-east-1').client('sagemaker-runtime')

payload = np2csv(train_set[0][30:31])

response = runtime.invoke_endpoint(EndpointName=ENDPOINT_NAME,
                                   ContentType='text/csv',
                                   Body=payload)
result = json.loads(response['Body'].read().decode())
print(f"Input: {train_set[1][31:32]}, Predicted: {result}")

Input: [8], Predicted: 8.0


### Clean up 

#### Delete the sagemaker endpoint, endpoint_config, and model

In [22]:
sm = boto3.Session(region_name='us-east-1').client("sagemaker")

In [23]:
sm.delete_endpoint_config(EndpointConfigName=ENDPOINT_CONFIG)
sm.delete_endpoint(EndpointName=ENDPOINT_NAME)
sm.delete_model(ModelName=MODEL_NAME)

{'ResponseMetadata': {'RequestId': '4a07b19c-d00e-4a0e-9f6e-b7c65ad6a949',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '4a07b19c-d00e-4a0e-9f6e-b7c65ad6a949',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'date': 'Tue, 18 Apr 2023 18:38:25 GMT'},
  'RetryAttempts': 0}}

In [None]:
export S3_BUCKET=S3_BUCKET
!aws s3 rb s3://$S3_BUCKET --force