# In [None]:
# Import necessary libraries
import kfp
from kfp import dsl
from kfp.v2.compiler import Compiler
from kfp.client import Client
import os
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.datasets import mnist
import pickle

# Define the data preparation component using @dsl.component
# Lightweight components run in their own container, so TensorFlow has to be
# installed there and every import must live inside the function body.
@dsl.component(packages_to_install=["tensorflow"])
def data_prep_op() -> str:
    """Download MNIST, normalize it, and pickle the splits to disk.

    Returns:
        Path of the pickle file holding the tuple
        ``(x_train, y_train, x_test, y_test)``.
    """
    # Imports are function-scoped because only this function's source is
    # shipped to the component container; module-level names do not exist there.
    import os
    import pickle

    import tensorflow as tf
    from tensorflow.keras.datasets import mnist

    # Load MNIST dataset
    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    # Scale pixel values to [0, 1] and append a trailing channel axis
    x_train = x_train.astype("float32") / 255.0
    x_test = x_test.astype("float32") / 255.0
    x_train = x_train[..., tf.newaxis]
    x_test = x_test[..., tf.newaxis]

    # Save preprocessed data to a file
    # NOTE(review): this is a path on the step's own filesystem. Later steps
    # can only read it if /mnt/data is backed by storage shared between the
    # step containers -- confirm such a volume is mounted.
    data_path = "/mnt/data/mnist_data.pkl"
    os.makedirs(os.path.dirname(data_path), exist_ok=True)
    with open(data_path, "wb") as f:
        pickle.dump((x_train, y_train, x_test, y_test), f)

    print(f"Data saved to {data_path}")
    return data_path


# Define the model training component using @dsl.component
# Lightweight components run in their own container, so TensorFlow has to be
# installed there and every import must live inside the function body.
@dsl.component(packages_to_install=["tensorflow"])
def train_model_op(data_path: str) -> str:
    """Train a small convolutional network on the pickled training split.

    Args:
        data_path: Path to a pickle file containing the tuple
            ``(x_train, y_train, x_test, y_test)``; only the training
            pair is used here.

    Returns:
        Path of the saved HDF5 model file.
    """
    # Imports are function-scoped because only this function's source is
    # shipped to the component container; module-level names do not exist there.
    import os
    import pickle

    import tensorflow as tf
    from tensorflow.keras import layers, models

    # Load preprocessed data
    with open(data_path, "rb") as f:
        x_train, y_train, _, _ = pickle.load(f)

    # Convert labels to one-hot encoding (10 classes) to match the
    # categorical cross-entropy loss used below
    y_train = tf.keras.utils.to_categorical(y_train, 10)

    # Build and train the model
    model = models.Sequential([
        layers.Conv2D(32, (3, 3), activation="relu", input_shape=(28, 28, 1)),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation="relu"),
        layers.MaxPooling2D((2, 2)),
        layers.Flatten(),
        layers.Dense(128, activation="relu"),
        layers.Dense(10, activation="softmax"),
    ])

    model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])
    # 20% of the training data is held out for per-epoch validation
    model.fit(x_train, y_train, epochs=5, batch_size=64, validation_split=0.2)

    # Save the model
    # NOTE(review): this is a path on the step's own filesystem. Later steps
    # can only read it if /mnt/data is backed by storage shared between the
    # step containers -- confirm such a volume is mounted.
    model_path = "/mnt/data/mnist_model.h5"
    os.makedirs(os.path.dirname(model_path), exist_ok=True)
    model.save(model_path)
    print(f"Model saved to {model_path}")
    return model_path


# Define the model evaluation component using @dsl.component
# Lightweight components run in their own container, so TensorFlow has to be
# installed there and every import must live inside the function body.
@dsl.component(packages_to_install=["tensorflow"])
def evaluate_model_op(model_path: str, data_path: str) -> float:
    """Evaluate a saved model on the pickled test split.

    Args:
        model_path: Path to a model file loadable by
            ``tf.keras.models.load_model``.
        data_path: Path to a pickle file containing the tuple
            ``(x_train, y_train, x_test, y_test)``; only the test pair
            is used here.

    Returns:
        Accuracy of the model on the test split.
    """
    # Imports are function-scoped because only this function's source is
    # shipped to the component container; module-level names do not exist there.
    import pickle

    import tensorflow as tf

    # Load the model and test data
    model = tf.keras.models.load_model(model_path)
    with open(data_path, "rb") as f:
        _, _, x_test, y_test = pickle.load(f)

    # Convert labels to one-hot encoding (10 classes)
    y_test = tf.keras.utils.to_categorical(y_test, 10)

    # Evaluate the model; the loss value is not needed by callers
    _, test_acc = model.evaluate(x_test, y_test)
    print(f"Test Accuracy: {test_acc}")
    # Coerce to a builtin float so the value matches the declared return type
    return float(test_acc)


# Define the pipeline using the @dsl.pipeline decorator
@dsl.pipeline(
    name="demo2",
    description="A pipeline to train and evaluate an Demo 2 MNIST model."
)
def mnist_pipeline():
    """Chain data preparation, training, and evaluation into one pipeline."""
    # Step 1: prepare the dataset and capture the path it reports
    prep_step = data_prep_op()
    dataset_path = prep_step.output

    # Step 2: train on the prepared dataset and capture the saved-model path
    training_step = train_model_op(data_path=dataset_path)
    trained_model_path = training_step.output

    # Step 3: score the trained model against the same prepared dataset
    evaluate_model_op(model_path=trained_model_path, data_path=dataset_path)


# Compile the pipeline definition and register it with Kubeflow
PIPELINE_NAME = "demo2"
PIPELINE_FILE = PIPELINE_NAME + ".yaml"

# Serialize the pipeline function into a YAML package on disk
Compiler().compile(
    pipeline_func=mnist_pipeline,
    package_path=PIPELINE_FILE,
)
print("Pipeline compiled to " + PIPELINE_FILE)

# Create a Kubeflow Pipelines client for the "kubeflow" namespace
client = Client(namespace="kubeflow")

# Upload the compiled package under PIPELINE_NAME
pipeline = client.upload_pipeline(
    pipeline_package_path=PIPELINE_FILE,
    pipeline_name=PIPELINE_NAME,
)