# Create a Kubeflow Pipeline with BERT and Amazon SageMaker

In this notebook, we create a machine learning pipeline with Kubeflow Pipelines and Amazon SageMaker. The pipeline fine-tunes a BERT model for sentiment analysis of product reviews, predicting each review's star rating.

## Install Dependencies

First, we need to install some dependencies. These include:

- **SageMaker Python SDK**: This SDK makes it easy to train and deploy models using Amazon SageMaker.
- **Kubeflow Pipelines SDK**: This SDK allows us to work with Kubeflow Pipelines.
- **AWS CLI**: This gives us a way to interact with AWS services from the command line.

We can install these dependencies using pip:

In [None]:
!pip install sagemaker==1.72.0
!pip install https://storage.googleapis.com/ml-pipeline/release/0.1.29/kfp.tar.gz --upgrade
!pip install awscli==1.18.140

In [None]:
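# Restart the kernel so that the newly installed package versions are loaded.
# This works in the classic Jupyter Notebook UI; in JupyterLab, restart the kernel from the menu instead.
from IPython.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")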

## Set Up Configuration Variables

In this section, we define the variables that we'll use throughout the notebook. These include:

- **Region**: The AWS region that we'll be using for SageMaker and other AWS services.
- **Account ID**: Our AWS account ID.
- **Bucket**: The name of the S3 bucket that we'll be using for storing data.
- **Role**: The IAM role that we'll be using for SageMaker.

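The region is read from the instance metadata endpoint (`169.254.169.254`), which is only reachable when the notebook runs on AWS compute such as an EC2 or SageMaker notebook instance. The account ID and the IAM role are looked up with boto3, the AWS SDK for Python, and the bucket name is derived from the region and account ID.
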
In [None]:
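import boto3

# Get the AWS region from the instance metadata (strip the trailing availability-zone letter)
aws_region_as_slist=!curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone | sed 's/\(.*\)[a-z]/\1/'
region = aws_region_as_slist.s
print('Region: {}'.format(region))

# Get the AWS account ID
account_id = boto3.client('sts').get_caller_identity().get('Account')
print('Account ID: {}'.format(account_id))

# Define the S3 bucket name
bucket = 'sagemaker-{}-{}'.format(region, account_id)
print('S3 Bucket: {}'.format(bucket))

# Find the IAM role for SageMaker. list_roles() is paginated, so walk every page,
# and fail loudly if no matching role exists instead of leaving `role` undefined.
role = None
paginator = boto3.client("iam").get_paginator("list_roles")
for page in paginator.paginate():
    for iam_role in page["Roles"]:
        if "SageMakerExecutionRole" in iam_role["RoleName"]:
            role = iam_role["Arn"]
            break
    if role is not None:
        break
if role is None:
    raise ValueError("No IAM role with 'SageMakerExecutionRole' in its name was found")
print("Role: {}".format(role))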
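The two lookups above can be expressed as small pure functions, which makes them easy to check without AWS access. This is a sketch, not part of the original notebook: `region_from_availability_zone` mirrors the `sed` step for standard AZ names such as `us-east-1a`, and `find_role_arn` is a hypothetical helper that scans role dictionaries shaped like the entries IAM `ListRoles` returns. The example roles use AWS's documentation account ID and are made up.

```python
import re
from typing import Iterable, Mapping


def region_from_availability_zone(availability_zone: str) -> str:
    """Drop the trailing zone letter, e.g. 'us-east-1a' -> 'us-east-1'."""
    return re.sub(r"[a-z]$", "", availability_zone.strip())


def find_role_arn(roles: Iterable[Mapping[str, str]], name_fragment: str) -> str:
    """Return the ARN of the first role whose name contains name_fragment."""
    for role in roles:
        if name_fragment in role["RoleName"]:
            return role["Arn"]
    raise LookupError("No IAM role name contains {!r}".format(name_fragment))


# Hypothetical ListRoles-style entries, for illustration only
example_roles = [
    {"RoleName": "AdminRole", "Arn": "arn:aws:iam::111122223333:role/AdminRole"},
    {
        "RoleName": "AmazonSageMakerExecutionRole-demo",
        "Arn": "arn:aws:iam::111122223333:role/AmazonSageMakerExecutionRole-demo",
    },
]

print(region_from_availability_zone("us-east-1a"))
print(find_role_arn(example_roles, "SageMakerExecutionRole"))
```

In the notebook, the roles could come from every page of `boto3.client("iam").get_paginator("list_roles").paginate()`, flattened into one iterable.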

## Copy Data from Public S3 to Private S3

We're going to copy some data from a public S3 bucket to a private S3 bucket. This data will be used later in our pipeline. The data consists of Amazon reviews for digital software, digital video games, and gift cards.


In [None]:
s3_public_path_tsv = "s3://amazon-reviews-pds/tsv"
s3_private_path_tsv = "s3://{}/amazon-reviews-pds/tsv".format(bucket)
print(s3_private_path_tsv)

# Copy the data from the public to the private S3 bucket
!aws s3 cp --recursive $s3_public_path_tsv/ $s3_private_path_tsv/ --exclude "*" --include "amazon_reviews_us_Digital_Software_v1_00.tsv.gz"
!aws s3 cp --recursive $s3_public_path_tsv/ $s3_private_path_tsv/ --exclude "*" --include "amazon_reviews_us_Digital_Video_Games_v1_00.tsv.gz"
!aws s3 cp --recursive $s3_public_path_tsv/ $s3_private_path_tsv/ --exclude "*" --include "amazon_reviews_us_Gift_Card_v1_00.tsv.gz"

# Define the S3 URI for the raw input data
raw_input_data_s3_uri = "s3://{}/amazon-reviews-pds/tsv/".format(bucket)
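The three `aws s3 cp` calls differ only in their `--include` filter. The AWS CLI accepts several `--include` filters after `--exclude "*"`, so the same copy can be expressed as one command. The sketch below only builds the argument list; `review_file_name` and `s3_copy_command` are hypothetical helpers, and `my-bucket` is a placeholder bucket name.

```python
from typing import List, Sequence


def review_file_name(category: str) -> str:
    """File name of one category in the public Amazon reviews TSV prefix."""
    return "amazon_reviews_us_{}_v1_00.tsv.gz".format(category)


def s3_copy_command(src: str, dst: str, categories: Sequence[str]) -> List[str]:
    """Build one `aws s3 cp` invocation that copies only the selected categories."""
    cmd = [
        "aws", "s3", "cp", "--recursive",
        src.rstrip("/") + "/", dst.rstrip("/") + "/",
        "--exclude", "*",  # start by excluding everything...
    ]
    for category in categories:
        cmd += ["--include", review_file_name(category)]  # ...then re-include each file
    return cmd


categories = ["Digital_Software", "Digital_Video_Games", "Gift_Card"]
cmd = s3_copy_command(
    "s3://amazon-reviews-pds/tsv",
    "s3://my-bucket/amazon-reviews-pds/tsv",
    categories,
)
print(" ".join(cmd))
```

To actually run it, pass the list to `subprocess.run(cmd, check=True)`; keeping the arguments as a list avoids shell quoting problems with the `*` filter.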

## Build Pipeline

In this section, we build the pipeline with the Amazon SageMaker Components for Kubeflow Pipelines: prebuilt Kubeflow Pipelines (KFP) components, each of which runs a SageMaker operation as a step of a pipeline.

The Amazon SageMaker Components for Kubeflow Pipelines include components for:

- Processing: Preprocessing data and evaluating models.
- Training: Running a SageMaker training job with a given container image, training script, hyperparameters, and input data.
- Model: Creating a model in Amazon SageMaker.
- Deploy: Deploying a model to a real-time endpoint.

For more information, you can check out the following resources:

- [Amazon SageMaker Components for Kubeflow Pipelines on GitHub](https://github.com/kubeflow/pipelines/tree/master/components/aws/sagemaker)
- [Using Amazon SageMaker with Kubeflow Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/usingamazon-sagemaker-components.html)



The cell below uses `kfp.components.load_component_from_url` to load each Amazon SageMaker component from its YAML definition on GitHub. Each component corresponds to one SageMaker operation: processing, training, creating a model, or deploying a model. The URLs are pinned to a specific commit of the `kubeflow/pipelines` repository, so the component definitions stay fixed even if the components change upstream.


In [None]:
import kfp
from kfp import components
from kfp import dsl
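# Not used below; use_aws_secret is only needed when pipeline steps read AWS credentials from a Kubernetes secret
from kfp.aws import use_aws_secret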

# Load the components
sagemaker_process_op = components.load_component_from_url(
    "https://raw.githubusercontent.com/kubeflow/pipelines/3ebd075212e0a761b982880707ec497c36a99d80/components/aws/sagemaker/process/component.yaml"
)
sagemaker_train_op = components.load_component_from_url(
    "https://raw.githubusercontent.com/kubeflow/pipelines/3ebd075212e0a761b982880707ec497c36a99d80/components/aws/sagemaker/train/component.yaml"
)
sagemaker_model_op = components.load_component_from_url(
    "https://raw.githubusercontent.com/kubeflow/pipelines/3ebd075212e0a761b982880707ec497c36a99d80/components/aws/sagemaker/model/component.yaml"
)
sagemaker_deploy_op = components.load_component_from_url(
    "https://raw.githubusercontent.com/kubeflow/pipelines/3ebd075212e0a761b982880707ec497c36a99d80/components/aws/sagemaker/deploy/component.yaml"
)
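The four URLs differ only in the component name. A small sketch that derives them from the pinned commit makes the pinning explicit and keeps the commit in one place. `sagemaker_component_url` and the two constants are hypothetical names introduced here; the URL pattern and commit come from the cell above.

```python
# Commit pinned in the cell above; change it in one place to move all components together
COMPONENTS_COMMIT = "3ebd075212e0a761b982880707ec497c36a99d80"
COMPONENT_URL_TEMPLATE = (
    "https://raw.githubusercontent.com/kubeflow/pipelines/{commit}"
    "/components/aws/sagemaker/{name}/component.yaml"
)


def sagemaker_component_url(name: str, commit: str = COMPONENTS_COMMIT) -> str:
    """URL of one SageMaker component definition at a pinned commit."""
    return COMPONENT_URL_TEMPLATE.format(commit=commit, name=name)


component_urls = {
    name: sagemaker_component_url(name)
    for name in ("process", "train", "model", "deploy")
}
for name, url in component_urls.items():
    print(name, url)
```

The operations could then be loaded in one loop, for example `ops = {name: components.load_component_from_url(url) for name, url in component_urls.items()}`.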

## Upload the Pre-Processing Code

The pre-processing code is responsible for transforming raw data into a format that the model can be trained on. In this case, we're using a script named `preprocess-scikit-text-to-bert-feature-store.py`. We'll upload this script to an S3 bucket so that it can be accessed during the processing stage of the pipeline.


In [None]:
processing_code_s3_uri = "s3://{}/processing_code/preprocess-scikit-text-to-bert-feature-store.py".format(bucket)
print(processing_code_s3_uri)

!aws s3 cp ./preprocess-scikit-text-to-bert-feature-store.py $processing_code_s3_uri
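The processing job downloads the script from `processing_code_s3_uri`, so a typo in that URI only surfaces when the job starts. A minimal sketch for splitting an S3 URI into bucket and key, using only the standard library, is shown below; `split_s3_uri` is a hypothetical helper and `my-bucket` a placeholder.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError("Not an S3 URI: {!r}".format(uri))
    return parsed.netloc, parsed.path.lstrip("/")


example_bucket, example_key = split_s3_uri(
    "s3://my-bucket/processing_code/preprocess-scikit-text-to-bert-feature-store.py"
)
print(example_bucket, example_key)
```

With boto3, `boto3.client("s3").head_object(Bucket=example_bucket, Key=example_key)` raises an error if the uploaded object is missing, which is a cheap check before launching the pipeline.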

## Package and Upload Training Code to S3
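Next, we package the training code, which defines and trains the model, into a tarball and upload it to S3. In the pipeline, the `sagemaker_submit_directory` hyperparameter points SageMaker at this archive, and the `sagemaker_program` hyperparameter names the entry point (`tf_bert_reviews.py`) to run from it.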

In [None]:
!tar -cvzf sourcedir.tar.gz -C ./code .
training_code_s3_uri = "s3://{}/training_code/sourcedir.tar.gz".format(bucket)
print(training_code_s3_uri)

!aws s3 cp sourcedir.tar.gz $training_code_s3_uri
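The `tar -cvzf sourcedir.tar.gz -C ./code .` step can also be done from Python with the standard `tarfile` module, which is handy when `tar` is unavailable. This sketch places the contents of the code directory at the root of the archive, as `-C ./code .` does (without the leading `./` in member names). The demo uses a throwaway directory; `requirements.txt` there is a hypothetical file name.

```python
import os
import tarfile
import tempfile
from typing import List


def make_sourcedir_tarball(code_dir: str, output_path: str) -> List[str]:
    """Pack the contents of code_dir (not the directory itself) into a .tar.gz.

    Returns the sorted member names so the layout can be inspected.
    """
    with tarfile.open(output_path, "w:gz") as tar:
        for entry in sorted(os.listdir(code_dir)):
            # arcname keeps each entry at the archive root; directories are added recursively
            tar.add(os.path.join(code_dir, entry), arcname=entry)
    with tarfile.open(output_path, "r:gz") as tar:
        return sorted(tar.getnames())


# Demo on a temporary directory with placeholder files
with tempfile.TemporaryDirectory() as tmp:
    code_dir = os.path.join(tmp, "code")
    os.makedirs(code_dir)
    for name in ("tf_bert_reviews.py", "requirements.txt"):
        with open(os.path.join(code_dir, name), "w") as f:
            f.write("# placeholder\n")
    demo_members = make_sourcedir_tarball(code_dir, os.path.join(tmp, "sourcedir.tar.gz"))

print(demo_members)
```

Writing the archive outside `code_dir` matters: an output path inside the directory being packed would end up in its own archive.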

## Define Helper Functions
We'll define a few helper functions that will be used to configure the inputs and outputs for the processing and training stages of the pipeline.


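In the `processing_input` function, we define the configuration for one input of the processing stage: the name of the input, the S3 URI where the data is stored, the local path inside the processing container where the data is downloaded, and the data distribution type. `ShardedByS3Key` splits the S3 objects across the processing instances, while `FullyReplicated` copies every object to every instance.

The `processing_output` function is similar, but it defines the local path where the processing stage writes its output, the S3 URI that output is uploaded to, and when the upload happens.

The `training_input` function defines one input channel for the training stage. It has no local path setting, because SageMaker makes each channel available inside the training container under `/opt/ml/input/data/<channel_name>`.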


In [None]:
def processing_input(input_name, s3_uri, local_path, s3_data_distribution_type):
    return {
        "InputName": input_name,
        "S3Input": {
            "LocalPath": local_path,
            "S3Uri": s3_uri,
            "S3DataType": "S3Prefix",
            "S3DataDistributionType": s3_data_distribution_type,
            "S3InputMode": "File",
        },
    }

def processing_output(output_name, s3_uri, local_path, s3_upload_mode):
    return {
        "OutputName": output_name,
        "S3Output": {"LocalPath": local_path, "S3Uri": s3_uri, "S3UploadMode": s3_upload_mode},
    }

def training_input(input_name, s3_uri, s3_data_distribution_type):
    return {
        "ChannelName": input_name,
        "DataSource": {
            "S3DataSource": {
                "S3Uri": s3_uri,
                "S3DataType": "S3Prefix",
                "S3DataDistributionType": s3_data_distribution_type,
            }
        },
    }
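On the training side, the channels defined with `training_input` appear as directories named after each channel. The sketch below shows how a training script might locate them: SageMaker framework containers also set `SM_CHANNEL_<NAME>` environment variables, and the fixed `/opt/ml/input/data/<channel_name>` path serves as a fallback. `channel_dir` is a hypothetical helper, not code from `tf_bert_reviews.py`.

```python
import os
from typing import Mapping, Optional


def channel_dir(channel_name: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Directory holding one training channel inside a SageMaker training container."""
    env = os.environ if env is None else env
    default = "/opt/ml/input/data/{}".format(channel_name)
    return env.get("SM_CHANNEL_{}".format(channel_name.upper()), default)


# With an empty environment, the fixed SageMaker paths are used
for name in ("train", "validation", "test"):
    print(name, "->", channel_dir(name, env={}))
```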


## Define the Pipeline

The pipeline is defined with the `@dsl.pipeline` decorator, which takes the name and description of the pipeline. The pipeline function `bert_pipeline` takes `role`, `bucket`, `region`, and `raw_input_data_s3_uri` as arguments, which default to the values set earlier in the notebook.

The pipeline consists of three main stages: feature engineering, training, and deployment.

In the feature engineering stage, we use `sagemaker_process_op` to run the pre-processing script that we uploaded to S3 earlier. It writes the processed train, validation, and test splits to S3.

In the training stage, we use `sagemaker_train_op` to train the model. The training code and the processed data from the previous stage are used as inputs, and the trained model artifact is stored in S3.

In the deployment stage, we use `sagemaker_model_op` to create a SageMaker model from the trained model artifact, and then `sagemaker_deploy_op` to deploy that model to a SageMaker endpoint.

The stages run in order for two different reasons. The training step reads the processed data through S3 URIs computed in Python, not through outputs of the processing step, so Kubeflow cannot infer that dependency; `.after(process)` declares it explicitly. The model and deployment steps consume outputs of the step before them (`training.outputs[...]` and `create_model.output`), so Kubeflow orders them automatically.
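The resulting ordering can be pictured as a small dependency graph. The sketch below is not how Kubeflow schedules steps internally; it only mirrors the dependencies described above using Python's standard `graphlib` module (Python 3.9+), with stage names chosen for illustration.

```python
from graphlib import TopologicalSorter

# Each stage maps to the set of stages it must wait for
dependencies = {
    "process": set(),
    "train": {"process"},         # explicit: .after(process)
    "create_model": {"train"},    # implicit: consumes training.outputs[...]
    "deploy": {"create_model"},   # implicit: consumes create_model.output
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['process', 'train', 'create_model', 'deploy']
```

Removing the `"process"` entry from `train`'s set leaves `process` with no successors, so a scheduler would be free to start it in parallel with training, which is exactly what `.after(process)` prevents.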



In [None]:
# Define the pipeline
@dsl.pipeline(
    name="BERT Pipeline",  # Name of the pipeline
    description="BERT Pipeline",  # Description of the pipeline
)
def bert_pipeline(role=role, bucket=bucket, region=region, raw_input_data_s3_uri=raw_input_data_s3_uri):
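    # Import the libraries used while building the pipeline graph.
    # Note: this function body runs once, when the pipeline is compiled, so the
    # timestamps below are fixed in the compiled package and reused by every run of it.
    import time
    import json

    # Define a pipeline name (used as an S3 prefix) from a compile-time timestamp
    pipeline_name = "kubeflow-pipeline-sagemaker-{}".format(int(time.time()))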

    # Set network isolation to False
    network_isolation = False

    ########################
    # FEATURE ENGINEERING
    ########################

    # Define parameters for feature engineering
    max_seq_length = 64
    train_split_percentage = 0.90
    validation_split_percentage = 0.05
    test_split_percentage = 0.05
    balance_dataset = True

    # Define S3 URIs for the processed data
    processed_train_data_s3_uri = "s3://{}/{}/processing/output/bert-train".format(bucket, pipeline_name)
    processed_validation_data_s3_uri = "s3://{}/{}/processing/output/bert-validation".format(bucket, pipeline_name)
    processed_test_data_s3_uri = "s3://{}/{}/processing/output/bert-test".format(bucket, pipeline_name)

    # Define instance type and count for processing
    processing_instance_type = "ml.c5.2xlarge"
    processing_instance_count = 2

    # Define feature store prefix and group name
    timestamp = int(time.time())
    feature_store_offline_prefix = "reviews-feature-store-" + str(timestamp)
    feature_group_name = "reviews-feature-group-" + str(timestamp)

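    # Define the processing image. NOTE: this URI points at the us-east-1 registry;
    # if you run in another region, use the scikit-learn image path for that region.
    processing_image = "683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3"

    # Define the processing operation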
    process = sagemaker_process_op(
        role=role,
        region=region,
        image=processing_image,
        network_isolation=network_isolation,
        instance_type=processing_instance_type,
        instance_count=processing_instance_count,
        container_arguments=[
            "--train-split-percentage",
            str(train_split_percentage),
            "--validation-split-percentage",
            str(validation_split_percentage),
            "--test-split-percentage",
            str(test_split_percentage),
            "--max-seq-length",
            str(max_seq_length),
            "--balance-dataset",
            str(balance_dataset),
            "--feature-store-offline-prefix",
            str(feature_store_offline_prefix),
            "--feature-group-name",
            str(feature_group_name),
        ],
        # Hard-coded instead of using `region` to avoid a serialization issue;
        # change this to the region you run the pipeline in
        environment={"AWS_DEFAULT_REGION": "eu-central-1"},
        container_entrypoint=[
            "python3",
            "/opt/ml/processing/input/code/preprocess-scikit-text-to-bert-feature-store.py",
        ],
        input_config=[
            processing_input(
                input_name="raw-input-data",
                s3_uri="{}".format(raw_input_data_s3_uri),
                local_path="/opt/ml/processing/input/data/",
                s3_data_distribution_type="ShardedByS3Key",
            ),
            processing_input(
                input_name="code",
                s3_uri="{}".format(processing_code_s3_uri),
                local_path="/opt/ml/processing/input/code",
                s3_data_distribution_type="FullyReplicated",
            ),
        ],
        output_config=[
            processing_output(
                output_name="bert-train",
                s3_uri="{}".format(processed_train_data_s3_uri),
                local_path="/opt/ml/processing/output/bert/train",
                s3_upload_mode="EndOfJob",
            ),
            processing_output(
                output_name="bert-validation",
                s3_uri="{}".format(processed_validation_data_s3_uri),
                local_path="/opt/ml/processing/output/bert/validation",
                s3_upload_mode="EndOfJob",
            ),
            processing_output(
                output_name="bert-test",
                s3_uri="{}".format(processed_test_data_s3_uri),
                local_path="/opt/ml/processing/output/bert/test",
                s3_upload_mode="EndOfJob",
            ),
        ],
    )

    ########################
    # TRAIN
    ########################

    # Define the training channels
    train_channels = [
        training_input(
            input_name="train", s3_uri=processed_train_data_s3_uri, s3_data_distribution_type="ShardedByS3Key"
        ),
        training_input(
            input_name="validation",
            s3_uri=processed_validation_data_s3_uri,
            s3_data_distribution_type="ShardedByS3Key",
        ),
        training_input(
            input_name="test", s3_uri=processed_test_data_s3_uri, s3_data_distribution_type="ShardedByS3Key"
        ),
    ]

    # Define hyperparameters for training
    epochs = 1
    learning_rate = 0.00001
    epsilon = 0.00000001
    train_batch_size = 128
    validation_batch_size = 128
    test_batch_size = 128
    train_steps_per_epoch = 100
    validation_steps = 100
    test_steps = 100
    train_volume_size = 1024
    use_xla = True
    use_amp = True
    freeze_bert_layer = False
    enable_sagemaker_debugger = False
    enable_checkpointing = False
    enable_tensorboard = False
    input_mode = "File"
    run_validation = True
    run_test = True
    run_sample_predictions = True

    train_instance_type = "ml.c5.9xlarge"
    train_instance_count = 1

    train_output_location = "s3://{}/{}/output".format(bucket, pipeline_name)

    hyperparameters = {
        "epochs": "{}".format(epochs),
        "learning_rate": "{}".format(learning_rate),
        "epsilon": "{}".format(epsilon),
        "train_batch_size": "{}".format(train_batch_size),
        "validation_batch_size": "{}".format(validation_batch_size),
        "test_batch_size": "{}".format(test_batch_size),
        "train_steps_per_epoch": "{}".format(train_steps_per_epoch),
        "validation_steps": "{}".format(validation_steps),
        "test_steps": "{}".format(test_steps),
        "use_xla": "{}".format(use_xla),
        "use_amp": "{}".format(use_amp),
        "max_seq_length": "{}".format(max_seq_length),
        "freeze_bert_layer": "{}".format(freeze_bert_layer),
        "enable_sagemaker_debugger": "{}".format(enable_sagemaker_debugger),
        "enable_checkpointing": "{}".format(enable_checkpointing),
        "enable_tensorboard": "{}".format(enable_tensorboard),
        "run_validation": "{}".format(run_validation),
        "run_test": "{}".format(run_test),
        "run_sample_predictions": "{}".format(run_sample_predictions),
        "model_dir": "{}".format(train_output_location),
        "sagemaker_program": "tf_bert_reviews.py",
        "sagemaker_region": "{}".format(region),
        "sagemaker_submit_directory": training_code_s3_uri,
    }
    hyperparameters_json = json.dumps(hyperparameters)

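    # Regexes that would extract training metrics from the logs. They are not passed to the
    # training step yet (see the commented-out metric_definitions argument below).
    # metric_definitions='{"val_acc": "val_accuracy: ([0-9\\\\.]+)"}',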
    metrics_definitions = [
        {"Name": "train:loss", "Regex": "loss: ([0-9\\.]+)"},
        {"Name": "train:accuracy", "Regex": "accuracy: ([0-9\\.]+)"},
        {"Name": "validation:loss", "Regex": "val_loss: ([0-9\\.]+)"},
        {"Name": "validation:accuracy", "Regex": "val_accuracy: ([0-9\\.]+)"},
    ]
    metrics_definitions_json = json.dumps(metrics_definitions)
    print(metrics_definitions_json)

    # .after(process) is explicitly appended below
    train_image = "763104351884.dkr.ecr.{}.amazonaws.com/tensorflow-training:2.3.1-cpu-py37-ubuntu18.04".format(region)
    training = sagemaker_train_op(
        region=region,
        image=train_image,
        network_isolation=network_isolation,
        instance_type=train_instance_type,
        instance_count=train_instance_count,
        hyperparameters=hyperparameters_json,
        training_input_mode=input_mode,
        channels=train_channels,
        model_artifact_path=train_output_location,
        # metric_definitions=metrics_definitions_json,
        # TODO:  Add rules
        role=role,
    ).after(process)

    ########################
    # DEPLOY
    ########################

    # Define the serving image
    serve_image = "763104351884.dkr.ecr.{}.amazonaws.com/tensorflow-inference:2.3.1-cpu".format(region)

    # Define the model creation operation
    create_model = sagemaker_model_op(
        region=region,
        model_name=training.outputs["job_name"],
        image=serve_image,
        network_isolation=network_isolation,
        model_artifact_url=training.outputs["model_artifact_url"],
        role=role,
    )

    # Define the deployment operation
    deploy_instance_type = "ml.c5.9xlarge"
    deploy_instance_count = 1
    # .after(create_model) is implied because we depend on create_model.output
    deploy_model = sagemaker_deploy_op(
        region=region,
        variant_name_1="AllTraffic",
        model_name_1=create_model.output,
        instance_type_1=deploy_instance_type,
        initial_instance_count_1=deploy_instance_count,
    )
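Every value in the `hyperparameters` dictionary above is converted with `"{}".format(...)`, because SageMaker passes hyperparameters to the training container as strings. The sketch below shows what the training script receives on the other side: numbers may arrive in scientific notation (`0.00001` becomes `"1e-05"`), and booleans arrive as `"True"`/`"False"`, where a plain `bool("False")` would wrongly return `True`. `to_hyperparameters` and `str2bool` are hypothetical helpers; how `tf_bert_reviews.py` actually parses its arguments is not shown in this notebook.

```python
import json
from typing import Any, Dict


def to_hyperparameters(params: Dict[str, Any]) -> Dict[str, str]:
    """Stringify every value, as the pipeline does with "{}".format(...)."""
    return {key: "{}".format(value) for key, value in params.items()}


def str2bool(value: str) -> bool:
    """Parse 'True'/'False'-style strings; bool('False') would return True."""
    lowered = value.strip().lower()
    if lowered in ("true", "1", "yes"):
        return True
    if lowered in ("false", "0", "no"):
        return False
    raise ValueError("Not a boolean string: {!r}".format(value))


# A few of the pipeline's hyperparameters, serialized the way the pipeline does it
payload = json.dumps(to_hyperparameters({
    "epochs": 1,
    "learning_rate": 0.00001,
    "use_xla": True,
    "freeze_bert_layer": False,
}))

# What a training script would see after decoding the strings
received = json.loads(payload)
epochs = int(received["epochs"])
learning_rate = float(received["learning_rate"])  # the string is "1e-05"
freeze_bert_layer = str2bool(received["freeze_bert_layer"])
print(epochs, learning_rate, freeze_bert_layer)
```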

## Compile Kubeflow Pipeline

In [None]:
# Compile the Kubeflow pipeline
kfp.compiler.Compiler().compile(bert_pipeline, "bert-pipeline.zip")

# List the details of the compiled pipeline file
!ls -al ./bert-pipeline.zip

# Unzip the compiled pipeline file
!unzip -o ./bert-pipeline.zip

# Display the contents of the pipeline.yaml file using pygmentize for syntax highlighting
!pygmentize pipeline.yaml

## Launch Pipeline on Kubernetes Cluster

In [None]:
# Launch the pipeline on the Kubernetes cluster
client = kfp.Client()

# Create an experiment in Kubeflow
experiment = client.create_experiment(name="kubeflow")

# Run the pipeline within the context of the experiment
my_run = client.run_pipeline(experiment.id, "bert-pipeline", "bert-pipeline.zip")

The pipeline runs the processing job, the training job, and the endpoint deployment one after another; the training job alone may take 5-10 minutes. In the meantime, open the SageMaker Console to monitor the progress of the processing and training jobs.

## Make a Prediction

Once the pipeline has finished, we can send sample reviews to the newly deployed SageMaker endpoint. First, copy the endpoint name from the Kubeflow pipeline logs and paste it into `endpoint_name` in the cell below.

The cell then invokes the endpoint with two sample reviews, reads the predictions from the response body (one JSON object per line), and prints the predicted star rating for each review.

In [None]:
# Import boto3 for AWS calls and json for encoding the request and decoding the response
import json
import boto3

# Create a SageMaker runtime client
sm_runtime = boto3.Session(region_name=region).client("sagemaker-runtime")

# Replace this with the actual endpoint name from the Kubeflow pipeline logs
endpoint_name = "<COPY-AND-PASTE-FROM-KUBEFLOW-PIPELINE-LOGS>"

# Define the inputs for prediction
inputs = [{"features": ["This is great!"]}, {"features": ["This is bad."]}]

# Invoke the SageMaker endpoint for prediction
response = sm_runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType="application/jsonlines",
    Accept="application/jsonlines",
    Body=json.dumps(inputs).encode("utf-8"),
)

# Print the raw response (metadata plus the unread body stream)
print("response: {}".format(response))

# The body is a JSON-encoded string that holds one JSON object per line
predicted_classes_str = response["Body"].read().decode()
predicted_classes_json = json.loads(predicted_classes_str)

predicted_classes = predicted_classes_json.splitlines()
print("predicted_classes: {}".format(predicted_classes))

# Print the predicted class for each input
for predicted_class_json, input_data in zip(predicted_classes, inputs):
    predicted_class = json.loads(predicted_class_json)["predicted_label"]
    print('Predicted star_rating: {} for review_body "{}"'.format(predicted_class, input_data["features"][0]))
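The request and response handling can be separated from the endpoint call, which makes the parsing testable without a deployed endpoint. This sketch assumes the response format that the cell above implies (a JSON-encoded string with one `{"predicted_label": ...}` object per line); the inference script that produces it is not shown here, and `build_request_body`, `parse_predicted_labels`, and the sample body are hypothetical.

```python
import json
from typing import Any, List


def build_request_body(reviews: List[str]) -> bytes:
    """Encode reviews in the same shape as `inputs` in the cell above."""
    return json.dumps([{"features": [review]} for review in reviews]).encode("utf-8")


def parse_predicted_labels(body: str) -> List[Any]:
    """Decode the JSON string, split it into lines, and read each predicted_label."""
    lines = json.loads(body).splitlines()
    return [json.loads(line)["predicted_label"] for line in lines]


# Hypothetical response body in the shape the cell above expects
fake_body = json.dumps('{"predicted_label": 5}\n{"predicted_label": 1}')
print(parse_predicted_labels(fake_body))  # [5, 1]
```

With a live endpoint, `parse_predicted_labels(response["Body"].read().decode())` would replace the manual parsing steps.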