In [17]:
# Container image shared by every pipeline component defined below.
# NOTE(review): ``latest`` is a floating tag, so the TensorFlow/Keras version
# inside the components can change from run to run; consider pinning a release tag.
BASE_IMAGE: str = "tensorflow/tensorflow:latest"

In [18]:
import kfp
import kfp.dsl as dsl
from kfp.dsl import Input, Output
from kfp.dsl import Dataset, Artifact
from kfp.dsl import Model, Metrics, ClassificationMetrics

from typing import NamedTuple

In [19]:
@dsl.component(
    base_image=BASE_IMAGE,
)
def load_data(
    x_train_pickle: Output[Dataset],
    y_train_pickle: Output[Dataset],
    x_test_pickle: Output[Dataset],
    y_test_pickle: Output[Dataset],
):
    """Download MNIST and write the raw train/test splits as pickled arrays.

    All imports and helpers live inside the function body because KFP runs
    the component function on its own inside the container.

    Args:
        x_train_pickle: Output artifact receiving the training images.
        y_train_pickle: Output artifact receiving the training labels.
        x_test_pickle: Output artifact receiving the test images.
        y_test_pickle: Output artifact receiving the test labels.
    """
    import pickle

    import numpy as np
    from keras.datasets import mnist

    def _label_histogram(labels):
        # Map each distinct label to its count; plain ints keep the log
        # readable instead of showing NumPy scalar reprs.
        values, counts = np.unique(labels, return_counts=True)
        return {int(value): int(count) for value, count in zip(values, counts)}

    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    print("Train labels: ", _label_histogram(y_train))
    print("\nTest labels: ", _label_histogram(y_test))

    # Each array is written to its own Dataset artifact for the next step.
    for artifact, array in (
        (x_train_pickle, x_train),
        (y_train_pickle, y_train),
        (x_test_pickle, x_test),
        (y_test_pickle, y_test),
    ):
        with open(artifact.path, "wb") as file:
            pickle.dump(array, file)


In [20]:
@dsl.component(base_image=BASE_IMAGE)
def preprocess_data(
    x_train_pickle: Input[Dataset],
    y_train_pickle: Input[Dataset],
    x_test_pickle: Input[Dataset],
    y_test_pickle: Input[Dataset],
    x_train_prep: Output[Dataset],
    y_train_prep: Output[Dataset],
    x_test_prep: Output[Dataset],
    y_test_prep: Output[Dataset],
) -> NamedTuple("outputs", [("input_size", int), ("num_labels", int)]):
    """Flatten and scale the images and one-hot encode the labels.

    Each image is reshaped to a 1-D vector, cast to float32 and divided by
    255. Labels are one-hot encoded with a width equal to the number of
    distinct training labels, so train and test encodings always match.

    Args:
        x_train_pickle, y_train_pickle, x_test_pickle, y_test_pickle:
            Pickled raw arrays produced by the loading step.
        x_train_prep, y_train_prep, x_test_prep, y_test_prep:
            Output artifacts receiving the preprocessed arrays (pickled).

    Returns:
        outputs(input_size, num_labels): features per flattened sample and
        number of distinct training labels.
    """
    import pickle
    from typing import NamedTuple

    import numpy as np
    from keras.utils import to_categorical

    def _load(artifact):
        # Read one pickled array from an input artifact.
        with open(artifact.path, "rb") as file:
            return pickle.load(file)

    def _dump(artifact, value):
        # Write one array to an output artifact as a pickle.
        with open(artifact.path, "wb") as file:
            pickle.dump(value, file)

    def _flatten_and_scale(images, size):
        # One row per sample, float32 pixels scaled by 1/255.
        return np.reshape(images, [-1, size]).astype("float32") / 255

    x_train = _load(x_train_pickle)
    y_train = _load(y_train_pickle)
    x_test = _load(x_test_pickle)
    y_test = _load(y_test_pickle)

    num_labels = len(np.unique(y_train))
    # Pin the one-hot width so a test split lacking the highest label cannot
    # end up with fewer columns than the training labels.
    y_train = to_categorical(y_train, num_classes=num_labels)
    y_test = to_categorical(y_test, num_classes=num_labels)

    # Product of all per-sample dimensions, so non-square images also work.
    # int() keeps the KFP output a plain Python int rather than a NumPy scalar.
    input_size = int(np.prod(x_train.shape[1:]))
    x_train = _flatten_and_scale(x_train, input_size)
    x_test = _flatten_and_scale(x_test, input_size)

    _dump(x_train_prep, x_train)
    _dump(y_train_prep, y_train)
    _dump(x_test_prep, x_test)
    _dump(y_test_prep, y_test)

    outputs = NamedTuple("outputs", [("input_size", int), ("num_labels", int)])
    return outputs(input_size, num_labels)


In [21]:
@dsl.component(base_image=BASE_IMAGE)
def train(
    input_size: int,
    num_labels: int,
    epochs: int,
    x_train_pickle: Input[Dataset],
    y_train_pickle: Input[Dataset],
    model_artifact: Output[Model],
    log: Output[Artifact],
    batch_size: int = 128,
    hidden_units: int = 256,
    dropout: float = 0.45,
):
    """Train a two-hidden-layer MLP classifier and save it as a Model artifact.

    Args:
        input_size: Number of features per flattened input sample.
        num_labels: Number of output classes (width of the softmax layer).
        epochs: Number of training epochs.
        x_train_pickle: Pickled training features.
        y_train_pickle: Pickled one-hot training labels.
        model_artifact: Output artifact the trained model is written to.
        log: Output artifact whose path hosts the TensorBoard log directory.
        batch_size: Mini-batch size passed to ``model.fit``.
        hidden_units: Width of each hidden ``Dense`` layer.
        dropout: Dropout rate applied after each hidden layer.
    """
    import os
    import pickle
    from datetime import datetime

    import keras
    from keras import Input
    from keras.callbacks import TensorBoard
    from keras.layers import Activation, Dense, Dropout
    from keras.models import Sequential

    with open(x_train_pickle.path, "rb") as file:
        x_train = pickle.load(file)

    with open(y_train_pickle.path, "rb") as file:
        y_train = pickle.load(file)

    log_dir = f"{log.path}/logs/fit/{datetime.now().strftime('%Y%m%d-%H%M%S')}"
    tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)

    model = Sequential(
        [
            # Explicit Input layer instead of the `input_dim=` layer kwarg,
            # which Keras 3 deprecates.
            Input(shape=(input_size,)),
            Dense(hidden_units),
            Activation("relu"),
            Dropout(dropout),
            Dense(hidden_units),
            Activation("relu"),
            Dropout(dropout),
            Dense(num_labels),
            Activation("softmax"),
        ]
    )

    model.summary()

    model.compile(
        loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"]
    )

    model.fit(
        x_train,
        y_train,
        epochs=epochs,
        batch_size=batch_size,
        callbacks=[tensorboard_callback],
    )

    if int(keras.__version__.split(".")[0]) >= 3:
        # Keras 3 only saves to paths ending in `.keras`/`.h5`, but KFP
        # artifact paths carry no extension: save alongside, then move in.
        staged_path = f"{model_artifact.path}.keras"
        model.save(staged_path)
        os.replace(staged_path, model_artifact.path)
    else:
        # Keras 2 accepts the extensionless path directly (original behavior).
        model.save(model_artifact.path)


In [22]:
@dsl.component(
    base_image=BASE_IMAGE,
    packages_to_install=["scikit-learn"],
)
def evaluate(
    model_artifact: Input[Model],
    metrics: Output[ClassificationMetrics],
    scalar_metrics: Output[Metrics],
    x_test_pickle: Input[Dataset],
    y_test_pickle: Input[Dataset],
):
    """Evaluate the trained model on the test split and log KFP metrics.

    Logs a confusion matrix to ``metrics`` and accuracy, loss and precision
    to ``scalar_metrics``.

    Args:
        model_artifact: Trained model produced by the training step.
        metrics: Output receiving the confusion matrix.
        scalar_metrics: Output receiving accuracy, loss and precision.
        x_test_pickle: Pickled test features.
        y_test_pickle: Pickled one-hot test labels.
    """
    import os
    import pickle
    import shutil
    import tempfile
    import zipfile

    from keras.metrics import Precision
    from keras.models import load_model
    from sklearn.metrics import confusion_matrix

    def _load_trained_model(path):
        # Keras 3 only loads native zip archives whose name ends in `.keras`,
        # so a zip stored at an extensionless artifact path is copied under
        # such a name first. Anything else is loaded directly, as before.
        if os.path.isfile(path) and zipfile.is_zipfile(path):
            with tempfile.TemporaryDirectory() as staging_dir:
                staged_path = os.path.join(staging_dir, "model.keras")
                shutil.copyfile(path, staged_path)
                return load_model(staged_path)
        return load_model(path)

    model = _load_trained_model(model_artifact.path)

    batch_size = 128

    with open(x_test_pickle.path, "rb") as file:
        x_test = pickle.load(file)

    with open(y_test_pickle.path, "rb") as file:
        y_test = pickle.load(file)

    probabilities = model.predict(x_test, batch_size=batch_size)
    class_ids = list(range(y_test.shape[1]))

    # The predicted class is the most probable one. Thresholding at 0.5 first
    # would zero out rows where no class reaches 0.5 and send them to "0".
    metrics.log_confusion_matrix(
        [str(class_id) for class_id in class_ids],
        confusion_matrix(
            y_test.argmax(axis=1),
            probabilities.argmax(axis=1),
            labels=class_ids,  # keep the matrix square even if a class is absent
        ).tolist(),  # .tolist() to convert np array to list.
    )

    # Precision is computed on the binarized one-hot predictions, as before.
    predictions = (probabilities >= 0.5).astype(int)
    precision = Precision()
    precision.update_state(y_test, predictions)
    loss, acc = model.evaluate(x_test, y_test, batch_size=batch_size)
    scalar_metrics.log_metric("accuracy", acc)
    scalar_metrics.log_metric("loss", loss)
    scalar_metrics.log_metric("precision", precision.result().numpy().tolist())


In [23]:
@dsl.pipeline(
    name="mnist_pipeline",
)
def mnist_pipeline(epochs: int):
    """Load, preprocess, train on and evaluate MNIST.

    Args:
        epochs: Number of training epochs passed to the ``train`` step.
    """

    def _with_resources(task, cpu: str, memory: str):
        # Set request == limit for both memory and CPU, then return the task.
        return (
            task.set_memory_limit(memory)
            .set_memory_request(memory)
            .set_cpu_limit(cpu)
            .set_cpu_request(cpu)
        )

    # Each step consumes its predecessor's outputs, which already orders the
    # tasks; explicit `.after()` calls would be redundant.
    data = _with_resources(load_data(), cpu="2", memory="4G")

    preprocess = _with_resources(
        preprocess_data(
            x_train_pickle=data.outputs["x_train_pickle"],
            y_train_pickle=data.outputs["y_train_pickle"],
            x_test_pickle=data.outputs["x_test_pickle"],
            y_test_pickle=data.outputs["y_test_pickle"],
        ),
        cpu="1",
        memory="4G",
    )

    model = _with_resources(
        train(
            input_size=preprocess.outputs["input_size"],
            num_labels=preprocess.outputs["num_labels"],
            epochs=epochs,
            x_train_pickle=preprocess.outputs["x_train_prep"],
            y_train_pickle=preprocess.outputs["y_train_prep"],
        ),
        cpu="1",
        memory="6G",
    )

    _with_resources(
        evaluate(
            model_artifact=model.outputs["model_artifact"],
            x_test_pickle=preprocess.outputs["x_test_prep"],
            y_test_pickle=preprocess.outputs["y_test_prep"],
        ),
        cpu="1",
        memory="4G",
    )


# Submit the pipeline; kfp.Client() without a host uses its default connection settings.
pipeline_arguments = {"epochs": 30}
client = kfp.Client()
run = client.create_run_from_pipeline_func(
    mnist_pipeline,
    arguments=pipeline_arguments,
    experiment_name="mnist_pipeline",
)
run

RunPipelineResult(run_id=9170bbd4-63c8-44bd-b4ff-1b00c4639f72)