In [1]:
# Imports and Base Image

import kfp
import kfp.dsl as dsl
from kfp.dsl import Input, Output
from kfp.dsl import Dataset, Artifact
from kfp.dsl import Model, Metrics, ClassificationMetrics

from typing import NamedTuple

# Container image passed as `base_image` to the @dsl.component definitions in
# this notebook, so every pipeline step runs in the same TensorFlow environment.
BASE_IMAGE = 'nvcr.io/nvidia/tensorflow:25.01-tf2-py3'


In [2]:
# load data

@dsl.component(base_image=BASE_IMAGE)
def load_data(
    x_train_pickle: Output[Dataset],
    y_train_pickle: Output[Dataset],
    x_test_pickle: Output[Dataset],
    y_test_pickle: Output[Dataset],
):
    """Fetch MNIST through Keras and pickle each of its four arrays.

    Args:
        x_train_pickle: Output artifact that receives the pickled training images.
        y_train_pickle: Output artifact that receives the pickled training labels.
        x_test_pickle: Output artifact that receives the pickled test images.
        y_test_pickle: Output artifact that receives the pickled test labels.
    """
    import pickle

    import numpy as np
    from keras.datasets import mnist

    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    # Print the per-label sample counts of both splits to the step log.
    train_values, train_counts = np.unique(y_train, return_counts=True)
    print("Train labels: ", dict(zip(train_values, train_counts)))

    test_values, test_counts = np.unique(y_test, return_counts=True)
    print("\nTest labels: ", dict(zip(test_values, test_counts)))

    # Write every array to the path KFP assigned to its output artifact.
    artifact_payloads = (
        (x_train_pickle, x_train),
        (y_train_pickle, y_train),
        (x_test_pickle, x_test),
        (y_test_pickle, y_test),
    )
    for artifact, array in artifact_payloads:
        with open(artifact.path, "wb") as sink:
            pickle.dump(array, sink)

In [3]:
# Preprocess data 

@dsl.component(
    base_image=BASE_IMAGE
)
def preprocess_data(
    x_train_pickle: Input[Dataset],
    y_train_pickle: Input[Dataset],
    x_test_pickle: Input[Dataset],
    y_test_pickle: Input[Dataset],
    x_train_prep: Output[Dataset], # Output preprocessed training data (needed for quantization)
    y_train_prep: Output[Dataset],
    x_test_prep: Output[Dataset],
    y_test_prep: Output[Dataset],
) -> NamedTuple("outputs", input_size=int, num_labels=int):
    """Flatten and scale the images and one-hot encode the labels.

    Images are reshaped to ``(n_samples, input_size)``, cast to float32 and
    divided by 255. Labels are one-hot encoded with a fixed width of
    ``num_labels`` so that the train and test encodings always agree with each
    other and with the value returned to downstream steps.

    Args:
        x_train_pickle: Pickled training images.
        y_train_pickle: Pickled integer training labels.
        x_test_pickle: Pickled test images.
        y_test_pickle: Pickled integer test labels.
        x_train_prep: Output artifact for the processed training images.
        y_train_prep: Output artifact for the one-hot training labels.
        x_test_prep: Output artifact for the processed test images.
        y_test_prep: Output artifact for the one-hot test labels.

    Returns:
        A named tuple ``(input_size, num_labels)``: the number of features per
        flattened sample and the number of distinct training labels.
    """
    from keras.utils import to_categorical
    import numpy as np
    import pickle
    from typing import NamedTuple

    # NOTE: pickle.load executes arbitrary code from the file; these artifacts
    # must only ever come from the trusted upstream step of this pipeline.
    with open(x_train_pickle.path, "rb") as file:
        x_train = pickle.load(file)

    with open(y_train_pickle.path, "rb") as file:
        y_train = pickle.load(file)

    with open(x_test_pickle.path, "rb") as file:
        x_test = pickle.load(file)

    with open(y_test_pickle.path, "rb") as file:
        y_test = pickle.load(file)

    num_labels = len(np.unique(y_train))

    # Features per sample = product of every non-batch dimension. This equals
    # height * width for MNIST but also stays correct for non-square images.
    input_size = int(np.prod(x_train.shape[1:]))

    # Reshape and normalize training data (needed for quantization calibration)
    x_train_processed = np.reshape(x_train, [-1, input_size])
    x_train_processed = x_train_processed.astype("float32") / 255

    # Reshape and normalize test data
    x_test_processed = np.reshape(x_test, [-1, input_size])
    x_test_processed = x_test_processed.astype("float32") / 255

    # One-hot encode labels with an explicit width. Without num_classes the
    # width is inferred per array (max label + 1), so a test split missing the
    # highest class would be encoded narrower than the training labels.
    # Assumes labels are the integers 0..num_labels-1 (true for MNIST).
    y_train_processed = to_categorical(y_train, num_classes=num_labels)
    y_test_processed = to_categorical(y_test, num_classes=num_labels)

    # Save processed data
    with open(x_train_prep.path, "wb") as file:
        pickle.dump(x_train_processed, file)

    with open(y_train_prep.path, "wb") as file:
        pickle.dump(y_train_processed, file)

    with open(x_test_prep.path, "wb") as file:
        pickle.dump(x_test_processed, file)

    with open(y_test_prep.path, "wb") as file:
        pickle.dump(y_test_processed, file)

    outputs = NamedTuple("outputs", input_size=int, num_labels=int)
    return outputs(input_size, num_labels)


In [4]:
# Train the model

@dsl.component(
    base_image=BASE_IMAGE
)
def train(
    input_size: int,
    num_labels: int,
    epochs: int,
    x_train_pickle: Input[Dataset],
    y_train_pickle: Input[Dataset],
    model_artifact: Output[Model],
    log: Output[Artifact],
):
    """Train a two-hidden-layer dense classifier and save it as ``model.keras``.

    Args:
        input_size: Number of features in each flattened input sample.
        num_labels: Number of output classes (width of the softmax layer).
        epochs: Number of training epochs.
        x_train_pickle: Pickled, preprocessed training features.
        y_train_pickle: Pickled, one-hot encoded training labels.
        model_artifact: Output artifact; used as a directory into which
            ``model.keras`` is written.
        log: Output artifact under which TensorBoard logs are written
            (``<log.path>/logs/fit/<timestamp>``).
    """
    from keras.callbacks import TensorBoard
    from keras.models import Sequential
    from keras.layers import Dense, Activation, Dropout, Input
    import pickle
    import os
    from datetime import datetime

    import tensorflow as tf

    # Report GPU visibility up front so a CPU-only run is obvious in the logs.
    gpus = tf.config.list_physical_devices('GPU')
    if not gpus:
        print("TensorFlow CANNOT see any GPUs. GPU acceleration is NOT possible.")
    else:
        print(f"TensorFlow found {len(gpus)} GPU(s): {gpus}")

    batch_size = 128
    hidden_units = 256
    dropout = 0.45

    # NOTE: pickle.load executes arbitrary code from the file; these artifacts
    # must only ever come from the trusted upstream step of this pipeline.
    with open(x_train_pickle.path, "rb") as file:
        x_train = pickle.load(file)

    with open(y_train_pickle.path, "rb") as file:
        y_train = pickle.load(file)

    log_dir = f"{log.path}/logs/fit/{datetime.now().strftime('%Y%m%d-%H%M%S')}"
    tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)

    # Declare the input shape with an explicit Input layer. Passing `input_dim`
    # to the first Dense layer is the legacy way of doing this and emits a
    # deprecation warning on current Keras releases.
    model = Sequential(
        [
            Input(shape=(input_size,)),
            Dense(hidden_units),
            Activation("relu"),
            Dropout(dropout),
            Dense(hidden_units),
            Activation("relu"),
            Dropout(dropout),
            Dense(num_labels),
            Activation("softmax"),
        ]
    )

    model.summary()

    model.compile(
        loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"]
    )

    model.fit(
        x_train,
        y_train,
        epochs=epochs,
        batch_size=batch_size,
        callbacks=[tensorboard_callback],
    )

    # The artifact path is treated as a directory so the file can carry the
    # ".keras" extension.
    os.makedirs(model_artifact.path, exist_ok=True)
    model_path = os.path.join(model_artifact.path, "model.keras")
    model.save(model_path)
    print(f"[train] model saved to {model_path}")


In [5]:
# Evaluate the model

@dsl.component(
    base_image=BASE_IMAGE,
    packages_to_install=["scikit-learn"],
)
def evaluate(
    model_artifact: Input[Model],
    metrics: Output[ClassificationMetrics],
    scalar_metrics: Output[Metrics],
    x_test_pickle: Input[Dataset],
    y_test_pickle: Input[Dataset],
):
    """Evaluate the trained Keras model and log a confusion matrix and scalars.

    Args:
        model_artifact: Directory artifact containing ``model.keras``.
        metrics: Output artifact that receives the confusion matrix.
        scalar_metrics: Output artifact that receives ``accuracy``, ``loss``
            and ``precision``.
        x_test_pickle: Pickled, preprocessed test features.
        y_test_pickle: Pickled, one-hot encoded test labels.
    """
    from keras.models import load_model
    from keras.metrics import Precision
    from sklearn.metrics import confusion_matrix
    import os
    import pickle

    model_path = os.path.join(model_artifact.path, "model.keras")
    print(f"[evaluate] loading model from {model_path}")
    model = load_model(model_path)

    batch_size = 128

    # NOTE: pickle.load executes arbitrary code from the file; these artifacts
    # must only ever come from the trusted upstream step of this pipeline.
    with open(x_test_pickle.path, "rb") as file:
        x_test = pickle.load(file)

    with open(y_test_pickle.path, "rb") as file:
        y_test = pickle.load(file)

    probabilities = model.predict(x_test, batch_size=batch_size)

    # Take the predicted class from the raw probabilities. Thresholding at 0.5
    # first turns every low-confidence sample (no class >= 0.5) into an
    # all-zero row, whose argmax is 0, so it would be miscounted as digit "0".
    predicted_labels = probabilities.argmax(axis=1)
    true_labels = y_test.argmax(axis=1)

    # Derive the class list from the one-hot width and pin it via `labels=` so
    # the matrix is always num_classes x num_classes, even if a class never
    # occurs in the test data or the predictions.
    class_ids = list(range(y_test.shape[1]))
    metrics.log_confusion_matrix(
        [str(class_id) for class_id in class_ids],
        confusion_matrix(
            true_labels, predicted_labels, labels=class_ids
        ).tolist(),  # .tolist() to convert np array to list.
    )

    # Precision is still computed on the 0.5-thresholded one-hot predictions.
    binarized_predictions = (probabilities >= 0.5).astype(int)
    m = Precision()
    m.update_state(y_test, binarized_predictions)
    loss, acc = model.evaluate(x_test, y_test, batch_size=batch_size)
    scalar_metrics.log_metric("accuracy", acc)
    scalar_metrics.log_metric("loss", loss)
    scalar_metrics.log_metric("precision", m.result().numpy().tolist())

In [6]:
# Evaluate fp32 model

@dsl.component(
    base_image=BASE_IMAGE,
    packages_to_install=["scikit-learn"],
)
def evaluate_fp32_model(
    model_artifact: Input[Model], # Input is the directory containing the Keras model
    metrics: Output[ClassificationMetrics],
    scalar_metrics: Output[Metrics],
    x_test_prep: Input[Dataset], # Changed input name for clarity
    y_test_prep: Input[Dataset], # Changed input name for clarity
):
    """Evaluate the float32 Keras model and log metrics with an ``fp32_`` prefix.

    Args:
        model_artifact: Directory artifact; the model is loaded from
            ``<path>/fp32_model/model.keras``.
        metrics: Output artifact that receives the confusion matrix.
        scalar_metrics: Output artifact that receives ``fp32_accuracy``,
            ``fp32_loss`` and ``fp32_precision_macro``.
        x_test_prep: Pickled, preprocessed test features.
        y_test_prep: Pickled, one-hot encoded test labels.
    """
    from keras.models import load_model
    from sklearn.metrics import confusion_matrix, accuracy_score, precision_score
    import numpy as np
    import os
    import pickle

    # Load the Keras model.
    # NOTE(review): the `train` component in this notebook writes model.keras
    # directly under its artifact directory, not under "fp32_model/"; this
    # component presumably consumes the artifact of a different step - confirm.
    model_dir_path = os.path.join(model_artifact.path, "fp32_model")
    model_path = os.path.join(model_dir_path, "model.keras")
    print(f"[evaluate_fp32] loading model from {model_path}")
    model = load_model(model_path)

    batch_size = 128

    # NOTE: pickle.load executes arbitrary code from the file; these artifacts
    # must only ever come from the trusted upstream step of this pipeline.
    with open(x_test_prep.path, "rb") as file:
        x_test = pickle.load(file)

    with open(y_test_prep.path, "rb") as file:
        y_test = pickle.load(file) # y_test is one-hot encoded

    # Evaluate using Keras model.evaluate
    loss, acc = model.evaluate(x_test, y_test, batch_size=batch_size, verbose=0)
    print(f"[evaluate_fp32] Keras Evaluation - Loss: {loss:.4f}, Accuracy: {acc:.4f}")

    # Get predictions for confusion matrix and other metrics
    y_pred_proba = model.predict(x_test, batch_size=batch_size)
    y_pred_labels = np.argmax(y_pred_proba, axis=1)
    y_true_labels = np.argmax(y_test, axis=1)

    # Calculate metrics using sklearn. The accuracy printed on the "Sklearn"
    # line is now really computed by sklearn instead of repeating the Keras one.
    sklearn_accuracy = accuracy_score(y_true_labels, y_pred_labels)
    # Use macro average for precision as it's multi-class
    precision = precision_score(y_true_labels, y_pred_labels, average='macro', zero_division=0)
    print(f"[evaluate_fp32] Sklearn Metrics - Accuracy: {sklearn_accuracy:.4f}, Precision (macro): {precision:.4f}")

    # Log metrics (the logged accuracy remains the Keras value, as before)
    scalar_metrics.log_metric("fp32_accuracy", float(acc))
    scalar_metrics.log_metric("fp32_loss", float(loss))
    scalar_metrics.log_metric("fp32_precision_macro", float(precision))

    # Derive the class list from the one-hot width and pin it via `labels=` so
    # the matrix always has one row/column per class name.
    class_ids = list(range(y_test.shape[1]))
    metrics.log_confusion_matrix(
        [str(class_id) for class_id in class_ids],
        confusion_matrix(y_true_labels, y_pred_labels, labels=class_ids).tolist(), # Convert np array to list
    )
    print("[evaluate_fp32] Metrics logged.")

In [7]:
# Evaluate quantized model

@dsl.component(
    base_image=BASE_IMAGE,
    packages_to_install=["scikit-learn"],
)
def evaluate_quantized_model(
    quantized_model_artifact: Input[Model], # Input is the directory containing .tflite
    metrics: Output[ClassificationMetrics],
    scalar_metrics: Output[Metrics],
    x_test_prep: Input[Dataset],
    y_test_prep: Input[Dataset],
):
    """Evaluate a TFLite model sample by sample and log ``quantized_int8_*`` metrics.

    Args:
        quantized_model_artifact: Directory artifact containing
            ``quantized_model.tflite``.
        metrics: Output artifact that receives the confusion matrix.
        scalar_metrics: Output artifact that receives
            ``quantized_int8_accuracy``, ``quantized_int8_loss`` and
            ``quantized_int8_precision_macro``.
        x_test_prep: Pickled float test features.
        y_test_prep: Pickled, one-hot encoded test labels.
    """
    import tensorflow as tf
    import numpy as np
    import pickle
    import os
    from sklearn.metrics import confusion_matrix, accuracy_score, precision_score

    # Load the TFLite model and allocate tensors
    tflite_model_path = os.path.join(quantized_model_artifact.path, 'quantized_model.tflite')
    print(f"[evaluate_quantized] Loading TFLite model from: {tflite_model_path}")
    interpreter = tf.lite.Interpreter(model_path=tflite_model_path)
    interpreter.allocate_tensors()

    # Get input and output tensor details
    input_details = interpreter.get_input_details()[0]
    output_details = interpreter.get_output_details()[0]
    input_dtype = input_details['dtype']
    input_scale, input_zero_point = input_details["quantization"]
    output_scale, output_zero_point = output_details["quantization"]
    print(f"[evaluate_quantized] Input Details: {input_details}")
    print(f"[evaluate_quantized] Output Details: {output_details}")

    # Load test data
    # NOTE: pickle.load executes arbitrary code from the file; these artifacts
    # must only ever come from the trusted upstream step of this pipeline.
    print("[evaluate_quantized] Loading test data...")
    with open(x_test_prep.path, "rb") as file:
        x_test_float = pickle.load(file) # Load original float32 data

    with open(y_test_prep.path, "rb") as file:
        y_test_one_hot = pickle.load(file)
    y_true_labels = np.argmax(y_test_one_hot, axis=1)

    # A scale of 0 means the tensor carries no quantization parameters (float
    # input/output). Dividing by it would produce inf/nan, so only quantize
    # when a real scale is present.
    if input_scale:
        print(f"[evaluate_quantized] Quantizing input data to {input_dtype}...")
        # Formula: int_value = round(float_value / scale) + zero_point
        x_scaled = np.round(x_test_float / input_scale) + input_zero_point
        if np.issubdtype(input_dtype, np.integer):
            # Clip to the representable range so the cast cannot wrap around.
            dtype_limits = np.iinfo(input_dtype)
            x_scaled = np.clip(x_scaled, dtype_limits.min, dtype_limits.max)
        x_test_quantized = x_scaled.astype(input_dtype)
    else:
        print(f"[evaluate_quantized] Input tensor has no quantization parameters; feeding data as {input_dtype}.")
        x_test_quantized = x_test_float.astype(input_dtype)
    print(f"[evaluate_quantized] Input data shape: {x_test_quantized.shape}, dtype: {x_test_quantized.dtype}")

    # Run inference one sample at a time (each call feeds a batch of size 1)
    print(f"[evaluate_quantized] Running inference on {len(x_test_quantized)} samples...")
    y_pred_quantized_list = []
    for i in range(len(x_test_quantized)):
        interpreter.set_tensor(input_details['index'], np.expand_dims(x_test_quantized[i], axis=0))
        interpreter.invoke()
        output_data = interpreter.get_tensor(output_details['index'])
        y_pred_quantized_list.append(output_data[0]) # Remove batch dim

    y_pred_quantized = np.array(y_pred_quantized_list)
    print(f"[evaluate_quantized] Inference complete. Output shape: {y_pred_quantized.shape}, dtype: {y_pred_quantized.dtype}")

    # Dequantize output predictions to calculate loss/metrics easily
    # Formula: float_value = (int_value - zero_point) * scale
    # With a zero scale (float output) the formula would zero every
    # prediction, so such outputs are used as they are.
    if output_scale:
        y_pred_float = (y_pred_quantized.astype(np.float32) - output_zero_point) * output_scale
    else:
        y_pred_float = y_pred_quantized.astype(np.float32)
    print(f"[evaluate_quantized] Dequantized output shape: {y_pred_float.shape}, dtype: {y_pred_float.dtype}")

    # Get predicted labels
    y_pred_labels = np.argmax(y_pred_float, axis=1)

    # Calculate metrics
    accuracy = accuracy_score(y_true_labels, y_pred_labels)
    # Use macro average for precision as it's multi-class
    precision = precision_score(y_true_labels, y_pred_labels, average='macro', zero_division=0)

    def softmax(x):
        e_x = np.exp(x - np.max(x, axis=1, keepdims=True))
        return e_x / e_x.sum(axis=1, keepdims=True)

    # Categorical cross-entropy needs probabilities. The model built by the
    # `train` component already ends in a softmax layer, and applying softmax a
    # second time flattens the distribution and distorts the loss. Outputs that
    # already look like probabilities (non-negative rows summing to ~1, with
    # slack for quantization error) are therefore only renormalized; anything
    # else is treated as logits and passed through softmax.
    row_sums = y_pred_float.sum(axis=1, keepdims=True)
    outputs_are_probabilities = bool(
        np.all(y_pred_float >= 0.0) and np.allclose(row_sums, 1.0, atol=0.05)
    )
    if outputs_are_probabilities:
        y_pred_proba = y_pred_float / row_sums
    else:
        y_pred_proba = softmax(y_pred_float)

    # Compute loss - use stable version
    N = y_pred_proba.shape[0]
    log_likelihood = -np.log(y_pred_proba[range(N), y_true_labels] + 1e-9) # Add epsilon for stability
    loss = np.sum(log_likelihood) / N

    print(f"[evaluate_quantized] Metrics - Accuracy: {accuracy:.4f}, Precision (macro): {precision:.4f}, Loss: {loss:.4f}")

    # Log metrics
    scalar_metrics.log_metric("quantized_int8_accuracy", float(accuracy))
    scalar_metrics.log_metric("quantized_int8_loss", float(loss))
    scalar_metrics.log_metric("quantized_int8_precision_macro", float(precision))

    # Derive the class list from the one-hot width and pin it via `labels=` so
    # the matrix always has one row/column per class name.
    class_ids = list(range(y_test_one_hot.shape[1]))
    metrics.log_confusion_matrix(
        [str(class_id) for class_id in class_ids],
        confusion_matrix(y_true_labels, y_pred_labels, labels=class_ids).tolist(), # Convert np array to list
    )
    print("[evaluate_quantized] Metrics logged.")

In [8]:
@dsl.pipeline(
    name="mnist-pipeline-tf-gpu",
)
def mnist_pipeline(epochs: int):
    """Load MNIST, preprocess it, train on one GPU and evaluate the model.

    Args:
        epochs: Number of training epochs forwarded to the ``train`` step.
    """
    data = (
        load_data()
        .set_memory_limit("4G")
        .set_memory_request("4G")
        .set_cpu_limit("2")
        .set_cpu_request("2")
    )
    preprocess = (
        preprocess_data(
            x_train_pickle=data.outputs["x_train_pickle"],
            y_train_pickle=data.outputs["y_train_pickle"],
            x_test_pickle=data.outputs["x_test_pickle"],
            y_test_pickle=data.outputs["y_test_pickle"],
        )
        .set_memory_limit("4G")
        .set_memory_request("4G")
        .set_cpu_limit("1")
        .set_cpu_request("1")
    )
    # Each step already consumes outputs of the previous one; the explicit
    # .after() calls just spell the ordering out.
    preprocess.after(data)
    model = (
        train(
            input_size=preprocess.outputs["input_size"],
            num_labels=preprocess.outputs["num_labels"],
            epochs=epochs,
            x_train_pickle=preprocess.outputs["x_train_prep"],
            y_train_pickle=preprocess.outputs["y_train_prep"],
        )
        # set_accelerator_type / set_accelerator_limit are the current KFP v2
        # names for the deprecated add_node_selector_constraint / set_gpu_limit.
        .set_accelerator_type('nvidia.com/gpu')
        .set_accelerator_limit(1)
        .set_memory_limit("2G")
        .set_memory_request("1G")
        .set_cpu_limit("1")
        .set_cpu_request("0.5")
    )
    model.after(preprocess)
    evaluation = (
        evaluate(
            model_artifact=model.outputs["model_artifact"],
            x_test_pickle=preprocess.outputs["x_test_prep"],
            y_test_pickle=preprocess.outputs["y_test_prep"],
        )
        .set_memory_limit("4G")
        .set_memory_request("4G")
        .set_cpu_limit("1")
        .set_cpu_request("1")
    )
    evaluation.after(model)

In [9]:
# Run pipeline
# Values handed to the run; adjust them here rather than inside the call.
PIPELINE_ARGUMENTS = {"epochs": 3}
EXPERIMENT_NAME = "mnist_pipeline"

# The client is created without an explicit host. If it must point at a
# specific Kubeflow endpoint, build it like this instead:
# client = kfp.Client(host='<your-kubeflow-pipelines-url>')
client = kfp.Client()

run = client.create_run_from_pipeline_func(
    mnist_pipeline,
    arguments=PIPELINE_ARGUMENTS,
    experiment_name=EXPERIMENT_NAME,
)

print(f"Pipeline run submitted. Run details: {run.run_id}")




Pipeline run submitted. Run details: 9a5deb68-8570-4e33-b106-a15552bf7c4c
