In [None]:
!python -m pip install --upgrade pip
!pip install tensorflow numpy kfp boto3 botocore

In [None]:
from IPython.core.display import HTML

# Renders a <script> tag that calls Jupyter.notebook.kernel.restart(),
# presumably so the packages installed by the previous cell are picked up.
# NOTE(review): this depends on the `Jupyter.notebook` JavaScript object being
# present in the front end -- confirm it works where this notebook is run.
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

## Download Datasets

In [None]:
from os import environ, path
from urllib.parse import urlparse
from pathlib import Path
from typing import List
import os

import boto3


def get_s3_client():
    """Return a boto3 S3 client, unsigned when no AWS credentials are configured.

    If ``AWS_ACCESS_KEY_ID`` is missing from the environment the client is
    created with ``signature_version=UNSIGNED``; otherwise a default client is
    returned.
    """
    has_credentials = "AWS_ACCESS_KEY_ID" in environ
    if has_credentials:
        return boto3.client("s3")

    # botocore is only needed on the anonymous path, so it is imported lazily
    from botocore import UNSIGNED
    from botocore.client import Config

    unsigned_config = Config(signature_version=UNSIGNED)
    return boto3.client("s3", config=unsigned_config)


def download_s3(bucket, key, outdir):
    """Download a single S3 object to ``<outdir>/<key>``.

    :param bucket: Name of the bucket that holds the object
    :param key: Object key; it is also used as the file path below ``outdir``
    :param outdir: Local directory the object is written under
    """
    s3 = get_s3_client()
    stream = s3.get_object(Bucket=bucket, Key=key)["Body"]

    outfile = path.join(outdir, key)
    # the key may contain "/" so parent directories are created on demand
    Path(path.dirname(outfile)).mkdir(parents=True, exist_ok=True)

    try:
        with open(outfile, "wb") as f:
            # copy in bounded chunks rather than holding the whole object in memory
            for chunk in iter(lambda: stream.read(1024 * 1024), b""):
                f.write(chunk)
    finally:
        # release the underlying connection even if writing fails
        stream.close()
    print(f"file saved to: {outfile}")


def parse_s3_url(url):
    """Split an S3 URL into a ``(bucket, key)`` pair.

    The bucket is the first dot-separated label of the URL host; the key is the
    URL path with leading and trailing slashes removed. The URL is also logged
    to stdout.
    """
    print(f"downloading: {url}")
    parts = urlparse(url)
    bucket_name, _, _ = parts.netloc.partition(".")
    object_key = parts.path.strip("/")
    return bucket_name, object_key


def download_s3_dir(data_urls: List[str], data_dir: str):
    """Download every S3 URL in ``data_urls`` into ``data_dir``.

    ``data_dir`` is created first when it does not exist yet.
    """
    target_missing = not os.path.exists(data_dir)
    if target_missing:
        os.makedirs(data_dir)

    # map() is lazy, so each URL is parsed right before it is downloaded
    for bucket, key in map(parse_s3_url, data_urls):
        download_s3(bucket, key, data_dir)


def download_data(data_dir: str, data_urls: str):
    """Download a comma-separated list of S3 URLs into ``data_dir``.

    :param data_dir: Local directory the objects are written under
    :param data_urls: URLs joined with ","; surrounding whitespace is ignored and
        empty entries (e.g. from a trailing comma or an empty string) are skipped
    """
    # data_urls must be type string because kubeflow has no registered serializers for type "typing.List[str]"
    urls = [url.strip() for url in data_urls.split(",") if url.strip()]
    download_s3_dir(urls, data_dir)
    print("downloads complete")

## Train Model

In [None]:
import json


def gen_metadata(tensorboard_log_dir):
    """Build the metadata dict declaring a single tensorboard output.

    :param tensorboard_log_dir: Location stored as the output's ``source``
    :return: ``{"outputs": [{"type": "tensorboard", "source": ...}]}``
    """
    tensorboard_output = {"type": "tensorboard", "source": tensorboard_log_dir}
    return {"outputs": [tensorboard_output]}


def write_metadata(
    metadata_file: str,
    bucket: str,
    bucket_dir: str,
):
    """Write the tensorboard metadata JSON for ``s3://<bucket>/<bucket_dir>``.

    :param metadata_file: Path of the JSON file to (over)write
    :param bucket: Bucket that holds the tensorboard logs
    :param bucket_dir: Key prefix of the logs inside the bucket
    """
    log_location = f"s3://{bucket}/{bucket_dir}"
    with open(metadata_file, "w") as out:
        out.write(json.dumps(gen_metadata(log_location)))

In [None]:
import boto3
from botocore.exceptions import ClientError
import os


def get_s3_client():
    """Create a boto3 S3 client; requests are unsigned when no credentials are set.

    The presence of ``AWS_ACCESS_KEY_ID`` in the environment decides whether
    the default (signed) client or an ``UNSIGNED`` one is built.
    """
    client_kwargs = {}
    if "AWS_ACCESS_KEY_ID" not in os.environ:
        # allow s3 connection without creds
        from botocore import UNSIGNED
        from botocore.client import Config

        client_kwargs["config"] = Config(signature_version=UNSIGNED)
    return boto3.client("s3", **client_kwargs)


def upload_file(s3_client, file_name, bucket, object_name=None):
    """Upload a file to an S3 bucket

    :param s3_client: Client whose ``upload_file`` method performs the transfer
    :param file_name: File to upload
    :param bucket: Bucket to upload to
    :param object_name: S3 object name. If not specified then file_name is used
    :return: Whatever ``s3_client.upload_file`` returns; its errors are not caught here
    """
    # fall back to the local file name when no explicit object name is given
    key = file_name if object_name is None else object_name
    return s3_client.upload_file(file_name, bucket, key)


def bucket_exists(s3_client, bucket_name):
    """Report whether ``bucket_name`` exists, based on a ``head_bucket`` call.

    Only a ``ClientError`` with error code "404" is treated as "does not
    exist"; any other client error still yields ``True``.
    """
    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except ClientError as err:
        # a 404 error code means the bucket is missing
        return err.response["Error"]["Code"] != "404"
    return True


def upload_dir(
    src_dir: str,
    bucket_name: str,
    bucket_dir: str,
):
    """Recursively upload ``src_dir`` to ``bucket_name`` under ``bucket_dir``.

    Every file keeps its path relative to ``src_dir`` as the tail of its key.
    After uploading, the keys returned by listing the bucket are printed.

    :param src_dir: Local directory to walk
    :param bucket_name: Destination bucket; it must already exist
    :param bucket_dir: Key prefix the files are stored under
    :raises Exception: If the bucket does not exist
    """
    s3_client = get_s3_client()

    if not bucket_exists(s3_client, bucket_name):
        raise Exception(f"Bucket: {bucket_name} does not exist")

    for root, _dirs, files in os.walk(src_dir):
        for name in files:
            local_path = os.path.join(root, name)
            # S3 keys use "/" whatever the local path separator is
            relative_key = os.path.relpath(local_path, src_dir).replace(os.sep, "/")
            upload_file(
                s3_client,
                local_path,
                bucket_name,
                f"{bucket_dir}/{relative_key}",
            )

    response = s3_client.list_objects(Bucket=bucket_name)
    print(f"All objects in {bucket_name}:")

    # the listing carries no "Contents" entry when it has no objects to report
    for file in response.get("Contents", []):
        print(f"{bucket_name}/{file['Key']}")


# upload_dir(src_dir, bucket_name, bucket_dir)

In [None]:
from os import path

import tensorflow as tf
from tensorflow import keras


def gen_log_dirname(log_dir) -> str:
    """Return the TensorBoard log directory, ``<log_dir>/tensorboard/fit``."""
    subdirs = ("tensorboard", "fit")
    return path.join(log_dir, *subdirs)


def load_mnist(filepath, kind, normalize=True):
    """Load MNIST data from `filepath`

    Reads ``<kind>-labels-idx1-ubyte.gz`` and ``<kind>-images-idx3-ubyte.gz``
    from the directory ``filepath``.

    :param filepath: Directory containing the gzipped files
    :param kind: File name prefix, e.g. "train" or "t10k"
    :param normalize: When true, pixel values are divided by 255.0
    :return: ``(images, labels)``; images has shape ``(len(labels), 28, 28)``
    """
    # imported inside the function, as in the original implementation
    import gzip
    import numpy as np

    labels_path = path.join(filepath, f"{kind}-labels-idx1-ubyte.gz")
    images_path = path.join(filepath, f"{kind}-images-idx3-ubyte.gz")

    with gzip.open(labels_path, "rb") as lbpath:
        # the first 8 bytes of the label file are skipped
        labels = np.frombuffer(lbpath.read(), dtype=np.uint8, offset=8)

    with gzip.open(images_path, "rb") as imgpath:
        # the first 16 bytes of the image file are skipped
        images = np.frombuffer(imgpath.read(), dtype=np.uint8, offset=16).reshape(
            len(labels), 28, 28
        )

    # normalize by dividing each pixel value by 255.0. This places the pixel value within the range 0 and 1.
    if normalize:
        images = images / 255.0

    return images, labels


def learning_rate(batch_size):
    """Return an ``InverseTimeDecay`` schedule that starts at 0.001.

    The schedule gradually reduces the learning rate during training;
    ``decay_steps`` is ``batch_size * 1000``.
    """
    decay_steps = batch_size * 1000
    schedule = keras.optimizers.schedules.InverseTimeDecay(
        0.001, decay_steps=decay_steps, decay_rate=1, staircase=False
    )
    return schedule


def create_model(batch_size):
    """Build and compile the classifier for 28x28 inputs with 10 output logits.

    :param batch_size: Forwarded to ``learning_rate`` to size the decay schedule
    :return: The compiled ``keras.Sequential`` model (its summary is printed)
    """
    layers = [
        keras.layers.Flatten(input_shape=(28, 28)),
        keras.layers.Dense(128, activation="relu"),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(128, activation="relu"),
        # no activation: the loss below is configured with from_logits=True
        keras.layers.Dense(10),
    ]
    model = keras.Sequential(layers)

    # Adam driven by the decaying schedule rather than a constant rate
    optimizer = keras.optimizers.Adam(learning_rate=learning_rate(batch_size))
    loss = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    model.compile(optimizer=optimizer, loss=loss, metrics=["accuracy"])

    model.summary()
    return model


def train_model(
    model_dir: str,
    data_dir: str,
    log_dir: str,
    epochs: int = 5,
) -> str:  # noqa: F821
    """Train the model, save it to ``model_dir`` and return the TensorBoard log path.

    :param model_dir: Destination passed to ``model.save``
    :param data_dir: Directory holding the "train" and "t10k" gzip files
    :param log_dir: Base directory for the TensorBoard logs
    :param epochs: Number of training epochs
    :return: Directory the TensorBoard callback writes to
    """
    train_images, train_labels = load_mnist(data_dir, kind="train", normalize=True)
    test_images, test_labels = load_mnist(data_dir, kind="t10k", normalize=True)

    tensorboard_log_dir = gen_log_dirname(log_dir)

    # the number of training samples is what create_model receives as "batch_size";
    # model.fit itself is called without an explicit batch size
    num_train_samples = len(train_images)
    model = create_model(num_train_samples)

    tensorboard_callback = tf.keras.callbacks.TensorBoard(
        log_dir=tensorboard_log_dir, histogram_freq=1
    )
    model.fit(
        x=train_images,
        y=train_labels,
        epochs=epochs,
        shuffle=True,
        # the t10k split doubles as validation data for the TensorBoard curves
        validation_data=(test_images, test_labels),
        callbacks=[tensorboard_callback],
    )

    model.save(model_dir, include_optimizer=True)
    return tensorboard_log_dir

In [None]:
def train_upload_tensorboard_s3(
    outdir,
    datadir,
    logdir,
    epochs,
    bucket,
    bucketdir,
    metadatafile,
):
    """Train the model, push its TensorBoard logs to S3 and write the metadata file.

    :param outdir: Directory the trained model is saved to
    :param datadir: Directory holding the training data
    :param logdir: Base directory for TensorBoard logs
    :param epochs: Number of training epochs
    :param bucket: Bucket receiving the TensorBoard logs
    :param bucketdir: Key prefix for the logs inside the bucket
    :param metadatafile: Path of the metadata JSON file to write
    :return: Local TensorBoard log directory returned by ``train_model``
    """
    log_path = train_model(outdir, datadir, logdir, epochs)

    # upload tensorboard logs to s3
    upload_dir(log_path, bucket, bucketdir)

    # write metadata file pointing to tensorboard logs hosted on S3
    write_metadata(metadatafile, bucket, bucketdir)

    return log_path

## Evaluate Model

In [None]:
import json

import tensorflow as tf
from tensorflow import keras
from tensorflow.python.lib.io import file_io
import numpy as np

from typing import NamedTuple
from collections import namedtuple


def write_cm_to_csv(cm, class_labels, cm_path):
    """Write a confusion matrix to ``cm_path`` as header-less CSV rows.

    One ``target,predicted,count`` row is written per matrix cell, in row-major
    order.

    :param cm: Square iterable of rows; ``cm[i][j]`` is the count for target
        class ``i`` predicted as class ``j``
    :param class_labels: Label for each class index
    :param cm_path: Destination opened through ``file_io.FileIO``
    """
    # stdlib csv keeps this function free of a pandas dependency, which the
    # notebook never imports
    import csv

    rows = [
        (class_labels[target_index], class_labels[predicted_index], count)
        for target_index, target_row in enumerate(cm)
        for predicted_index, count in enumerate(target_row)
    ]

    with file_io.FileIO(cm_path, "w") as f:
        csv.writer(f, lineterminator="\n").writerows(rows)


def predict(model, test_images):
    """Return the predicted class index for every image in ``test_images``.

    :param model: Model producing one row of logits per image
    :param test_images: Batch of inputs passed to ``predict``
    :return: 1-D array holding the index of the largest probability per row
    """
    # Define a Softmax layer to define outputs as probabilities
    probability_model = tf.keras.Sequential([model, tf.keras.layers.Softmax()])
    predictions = probability_model.predict(test_images)
    # plain argmax over the class axis; avoids the discouraged np.matrix type
    return np.argmax(predictions, axis=1)


def evaluate_model(
    metrics_path: str,
    data_dir: str,
    model_dir: str,
) -> NamedTuple(
    "output",
    # https://www.kubeflow.org/docs/pipelines/sdk/pipelines-metrics/
    # The output name must be MLPipeline Metrics or MLPipeline_Metrics (case does not matter).
    [("mlpipeline_ui_metadata", "UI_metadata"), ("mlpipeline_metrics", "Metrics")],
):
    """Evaluate the saved model on the "t10k" split and report loss and accuracy.

    The metrics JSON is written to ``metrics_path`` and also returned.

    :param metrics_path: File the metrics JSON is written to
    :param data_dir: Directory holding the "t10k" gzip files
    :param model_dir: Location the trained model is loaded from
    :return: Named tuple with the two fields declared in the return annotation:
        ``mlpipeline_ui_metadata`` (JSON with an empty ``outputs`` list) and
        ``mlpipeline_metrics`` (the metrics JSON)
    """
    import os

    # train_model fits on normalized images, so evaluation must scale the
    # pixels the same way for the metrics to be meaningful
    test_images, test_labels = load_mnist(data_dir, kind="t10k", normalize=True)
    model = keras.models.load_model(model_dir)

    loss, accuracy = model.evaluate(test_images, test_labels)
    metrics = {
        "metrics": [
            # the loss is not a ratio, so it is reported as a raw number
            {"name": "loss", "numberValue": str(loss), "format": "RAW"},
            {"name": "accuracy", "numberValue": str(accuracy), "format": "PERCENTAGE"},
        ]
    }
    metrics_json = json.dumps(metrics)

    # the parent directory may not exist yet in a fresh working directory
    parent_dir = os.path.dirname(metrics_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(metrics_path, "w+") as f:
        f.write(metrics_json)

    # field names and order mirror the declared return annotation
    output = namedtuple("output", ["mlpipeline_ui_metadata", "mlpipeline_metrics"])
    return output(json.dumps({"outputs": []}), metrics_json)

## Export Model to S3

In [None]:
from os import environ, path
import boto3
from botocore.exceptions import ClientError

from kfp.components import InputPath, OutputPath


def get_s3_client():
    """Return an S3 client, signed only when AWS credentials are available.

    ``AWS_ACCESS_KEY_ID`` in the environment selects the default client;
    without it the client is configured with ``signature_version=UNSIGNED``.
    """
    if "AWS_ACCESS_KEY_ID" in environ:
        client = boto3.client("s3")
    else:
        # allow s3 connection without creds
        from botocore import UNSIGNED
        from botocore.client import Config

        client = boto3.client("s3", config=Config(signature_version=UNSIGNED))
    return client


def upload_file(s3_client, file_name, bucket, object_name=None):
    """Upload a file to an S3 bucket

    :param s3_client: Client whose ``upload_file`` method performs the transfer
    :param file_name: File to upload
    :param bucket: Bucket to upload to
    :param object_name: S3 object name. If not specified then file_name is used
    :return: Whatever ``s3_client.upload_file`` returns; its errors are not caught here
    """
    if object_name is not None:
        return s3_client.upload_file(file_name, bucket, object_name)

    # no explicit object name: store the file under its local name
    return s3_client.upload_file(file_name, bucket, file_name)


def bucket_exists(s3_client, bucket_name):
    """Report whether ``bucket_name`` exists, based on a ``head_bucket`` call.

    :param s3_client: Client used to issue ``head_bucket``
    :param bucket_name: Bucket to look up
    :return: ``False`` only when the call fails with error code "404";
        ``True`` otherwise, including for other client errors
    """
    exists = True
    try:
        s3_client.head_bucket(Bucket=bucket_name)
    # ClientError is the name this cell imports; the module name `botocore`
    # itself is never bound, so it cannot be referenced here
    except ClientError as e:
        # If a client error is thrown, then check that it was a 404 error.
        # If it was a 404 error, then the bucket does not exist.
        error_code = e.response["Error"]["Code"]
        if error_code == "404":
            exists = False
    return exists


def export_model(
    src_dir: str,
    bucket_name: str,
    bucket_dir: str,
):
    """Recursively upload the model in ``src_dir`` to ``bucket_name`` under ``bucket_dir``.

    Every file keeps its path relative to ``src_dir`` as the tail of its key.
    After uploading, the keys returned by listing the bucket are printed.

    :param src_dir: Local directory containing the saved model
    :param bucket_name: Destination bucket; it must already exist
    :param bucket_dir: Key prefix the files are stored under
    :raises Exception: If the bucket does not exist
    """
    # this cell only imports names *from* os, so the module is imported here
    import os

    s3_client = get_s3_client()

    if not bucket_exists(s3_client, bucket_name):
        raise Exception(f"Bucket: {bucket_name} does not exist")

    # walk the directory given by the caller (the parameter is src_dir)
    for root, _dirs, files in os.walk(src_dir):
        for name in files:
            local_path = os.path.join(root, name)
            # S3 keys use "/" whatever the local path separator is
            relative_key = os.path.relpath(local_path, src_dir).replace(os.sep, "/")
            upload_file(
                s3_client,
                local_path,
                bucket_name,
                f"{bucket_dir}/{relative_key}",
            )

    response = s3_client.list_objects(Bucket=bucket_name)
    print(f"All objects in {bucket_name}:")

    # the listing carries no "Contents" entry when it has no objects to report
    for file in response.get("Contents", []):
        print(f"{bucket_name}/{file['Key']}")

## Run entire pipeline

In [None]:
import kfp.components as components
from kfp import dsl, Client
from kfp.aws import use_aws_secret
import os

repo = "985486441319.dkr.ecr.eu-west-2.amazonaws.com/example-kubeflow-app/components"
BOTO_IMAGE = f"{repo}/boto:9c8763365c27682acac4514eafac52ebe829e72c"
TF_IMAGE = f"{repo}/tensorflow:62d0ce59ebc14a6f42cf23fd7180dcad129da1de"


def mnist_pipeline(  # nosec
    aws_secret_name: str = "aws-s3-data-secret-kfaas-demo",
    model_name: str = "mnist-fashion",
    model_version: str = "1",
    epochs: int = 10,
    bucket: str = "kfaas-demo-data-sandbox",
    bucket_dir_model: str = "demo/models",
    bucket_dir_tensorboard: str = "demo/tensorboard",
):
    """Define the pipeline: create volume, download, train, evaluate, export, delete volume.

    All steps share one volume mounted at ``/mnt`` that carries the datasets and
    the saved model from step to step.

    :param aws_secret_name: Secret applied to the train and export steps
    :param model_name: Model name used in the S3 key prefixes
    :param model_version: Model version used in the S3 key prefixes
    :param epochs: Number of training epochs
    :param bucket: Bucket receiving the TensorBoard logs and the exported model
    :param bucket_dir_model: Key prefix for exported models
    :param bucket_dir_tensorboard: Key prefix for TensorBoard logs
    """

    metadata_file = os.path.join("tmp", "metadata.json")
    metrics_path = os.path.join("tmp", "metrics.json")
    log_dir = os.path.join("tmp", "logs")

    mnt_path = "/mnt"
    datasets_dir = "/mnt/datasets"
    model_dir = "/mnt/model"
    mnist_data_s3_urls = [
        "https://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz",
        "https://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz",
        "https://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz",
        "https://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz",
    ]

    aws_secret = use_aws_secret(
        aws_secret_name, "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
    )

    # volume shared by every step below
    vop = dsl.VolumeOp(
        name="create_volume",
        resource_name="data-volume",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWO,
    )

    # step 1: download the datasets onto the volume
    download_op = components.func_to_container_op(
        download_data,
        base_image=BOTO_IMAGE,
        output_component_file="download_component.yaml",
    )
    download_task = download_op(
        data_dir=datasets_dir,
        data_urls=(",").join(mnist_data_s3_urls),
    ).add_pvolumes({mnt_path: vop.volume})

    # step 2: train, upload TensorBoard logs and write the metadata file
    train_op = components.func_to_container_op(
        train_upload_tensorboard_s3,
        base_image=TF_IMAGE,
        output_component_file="train_component.yaml",
    )

    train_task = (
        train_op(
            outdir=model_dir,
            datadir=datasets_dir,
            logdir=log_dir,
            epochs=epochs,
            bucket=bucket,
            bucketdir=f"{bucket_dir_tensorboard}/{model_name}/{model_version}/{dsl.RUN_ID_PLACEHOLDER}",
            metadatafile=metadata_file,
        )
        .add_pvolumes({mnt_path: download_task.pvolume})
        .apply(aws_secret)
    )
    train_task.after(download_task)

    # step 3: evaluate the saved model
    evaluate_op = components.func_to_container_op(
        evaluate_model,
        base_image=TF_IMAGE,
        output_component_file="evaluate_component.yaml",
    )
    evaluate_task = evaluate_op(
        metrics_path=metrics_path,
        data_dir=datasets_dir,
        model_dir=model_dir,
    ).add_pvolumes({mnt_path: train_task.pvolume})
    evaluate_task.after(train_task)

    # step 4: export the saved model to S3
    export_op = components.func_to_container_op(export_model, base_image=TF_IMAGE)
    export_task = (
        export_op(
            src_dir=model_dir,
            bucket_name=bucket,
            bucket_dir=f"{bucket_dir_model}/{model_name}/{model_version}",
        )
        .add_pvolumes({mnt_path: evaluate_task.pvolume})
        .apply(aws_secret)
    )
    export_task.after(evaluate_task)

    # the export step still reads the model from the volume, so the volume is
    # only deleted once that step has finished
    vop.delete().after(export_task)


# Address passed to the Kubeflow Pipelines client below.
host = "ml-pipeline.andrew.svc.cluster.local:8888"
# Experiment and run names under which the pipeline run is created.
experiment_name = "mnist_notebook_server"
run_name = "run01"
# Alternative ways of constructing the client, kept for reference:
# namespace = "kubeflow"
# namespace = "andrew"
# client = Client(namespace=namespace)
client = Client(host=host)
# client = Client(namespace=namespace, host=host)
# No overrides are passed, so mnist_pipeline's declared parameter defaults apply.
arguments = {}
# Submit mnist_pipeline as a run; the returned object is kept in run_result.
run_result = client.create_run_from_pipeline_func(
    mnist_pipeline,
    experiment_name=experiment_name,
    run_name=run_name,
    arguments=arguments,
)