In [None]:
%pip install --upgrade pip
%pip install tensorflow pandas scikit-learn mlflow
%pip install boto3

Collecting boto3
  Downloading boto3-1.38.0-py3-none-any.whl.metadata (6.6 kB)
Collecting botocore<1.39.0,>=1.38.0 (from boto3)
  Downloading botocore-1.38.0-py3-none-any.whl.metadata (5.7 kB)
Collecting jmespath<2.0.0,>=0.7.1 (from boto3)
  Downloading jmespath-1.0.1-py3-none-any.whl.metadata (7.6 kB)
Collecting s3transfer<0.13.0,>=0.12.0 (from boto3)
  Downloading s3transfer-0.12.0-py3-none-any.whl.metadata (1.7 kB)
Downloading boto3-1.38.0-py3-none-any.whl (139 kB)
Downloading botocore-1.38.0-py3-none-any.whl (13.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.5/13.5 MB[0m [31m18.2 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Downloading jmespath-1.0.1-py3-none-any.whl (20 kB)
Downloading s3transfer-0.12.0-py3-none-any.whl (84 kB)
Installing collected packages: jmespath, botocore, s3transfer, boto3
Successfully installed boto3-1.38.0 botocore-1.38.0 jmespath-1.0.1 s3transfer-0.12.0


In [None]:
from typing import Annotated
from kfp import dsl, compiler

@dsl.component(
    base_image='python:3.9',
    packages_to_install=['tensorflow', 'pandas', 'scikit-learn', 'mlflow', 'boto3'],
)
def load_mnist_data() -> str:
    """Load the MNIST training split, drop incomplete rows, and upload it to MinIO.

    Each 28x28 image is flattened into 784 pixel columns plus a ``label``
    column. The result is written as CSV to ``cleaned_mnist_data.csv`` in the
    ``cleaned-data`` bucket of the in-cluster MinIO service. The bucket is
    created first if it does not exist.

    MinIO credentials are read from ``AWS_ACCESS_KEY_ID`` /
    ``AWS_SECRET_ACCESS_KEY`` when the pod provides them. Otherwise the demo
    credentials that ``train_model`` also uses are applied, so the step does
    not die with a ``KeyError`` when nothing injects them.

    Returns:
        The bucket name, consumed downstream by ``train_model``.
    """
    # Lightweight KFP components only ship this function's source, so every
    # import has to live inside the body.
    import os
    from io import StringIO

    import boto3
    import pandas as pd
    import tensorflow as tf

    S3_ENDPOINT = "http://minio.kubeflow-user-example-com.svc:9000"
    BUCKET_NAME = "cleaned-data"
    OBJECT_KEY = "cleaned_mnist_data.csv"

    # ──────────────── 1. Load MNIST Dataset ──────────────── #

    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    x_train = x_train.reshape((x_train.shape[0], -1))  # flatten 28x28 to 784
    df = pd.DataFrame(x_train)
    df["label"] = y_train
    print("MNIST data sample:\n", df.head())

    # ──────────────── 2. EDA + Cleaning ──────────────── #

    # Round-trip through CSV so cleaning operates on exactly what gets stored
    # (e.g. column headers come back as strings, as in the training step).
    df = pd.read_csv(StringIO(df.to_csv(index=False)))
    print("EDA loaded shape:", df.shape)

    df = df.dropna()
    csv_data = df.to_csv(index=False)

    # ──────────────── 3. Upload to MinIO ──────────────── #

    # DO NOT DO THIS IN PRODUCTION: the fallbacks are the demo credentials
    # also hard-coded in train_model. Inject real ones (e.g. from a
    # Kubernetes secret) so the environment variables take precedence.
    access_key = os.environ.get("AWS_ACCESS_KEY_ID", "minioDev")
    secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "minioDevPass123")

    s3 = boto3.resource(
        's3',
        endpoint_url=S3_ENDPOINT,
        region_name='us-east-1',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        config=boto3.session.Config(signature_version='s3v4'),
        verify=False,
    )

    bucket = s3.Bucket(BUCKET_NAME)
    if bucket in s3.buckets.all():
        print(BUCKET_NAME + " bucket already exists!")
    else:
        s3.create_bucket(Bucket=BUCKET_NAME)
        print(BUCKET_NAME + " bucket created!")

    bucket.put_object(
        Key=OBJECT_KEY,
        Body=csv_data.encode("utf-8"),
        ContentType="text/csv",
    )
    print("Data uploaded to MinIO bucket:", BUCKET_NAME)

    return BUCKET_NAME
    

@dsl.component(
    base_image='python:3.9',
    packages_to_install=['tensorflow', 'pandas', 'scikit-learn', 'mlflow', 'boto3'],
)
def train_model(bucket_name:str) -> str:
    """Train a small dense MNIST classifier and log it to MLflow.

    Reads ``cleaned_mnist_data.csv`` from ``bucket_name`` on MinIO and
    trains/evaluates on a fixed 80/20 split. The test accuracy, the training
    parameters and the Keras model are all logged to a single run of the
    ``mnist-tf-pipeline`` MLflow experiment.

    Args:
        bucket_name: MinIO bucket holding ``cleaned_mnist_data.csv``, i.e. the
            output of ``load_mnist_data``.

    Returns:
        The container-local filename the trained Keras model was saved to.
        NOTE(review): this is a plain string, not a KFP artifact, so later
        steps cannot read the file through it. The durable copy is the
        model logged to MLflow.
    """
    # Lightweight KFP components only ship this function's source, so every
    # import has to live inside the body.
    import os
    from io import StringIO

    import boto3
    import mlflow
    import mlflow.tensorflow
    import pandas as pd
    import tensorflow as tf
    from sklearn.model_selection import train_test_split

    S3_ENDPOINT = "http://minio.kubeflow-user-example-com.svc:9000"
    MLFLOW_TRACKING_URI = "http://mlflow.kubeflow-user-example-com.svc:5000"
    MLFLOW_EXPERIMENT_NAME = "mnist-tf-pipeline"
    MLFLOW_BUCKET_NAME = "mlflow"
    DATA_KEY = "cleaned_mnist_data.csv"
    MODEL_SAVE_PATH = "mnist_model.keras"
    SEED = 42
    EPOCHS = 2
    HIDDEN_UNITS = 128

    # ──────────────── 0. Credentials & endpoints ──────────────── #

    # DO NOT DO THIS IN PRODUCTION, just for demo purposes.
    # setdefault keeps credentials already injected into the pod and only
    # falls back to the demo values when none are present.
    os.environ.setdefault("AWS_ACCESS_KEY_ID", "minioDev")
    os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "minioDevPass123")
    # Tell MLflow/boto3 to use MinIO, not AWS
    os.environ["MLFLOW_S3_ENDPOINT_URL"] = S3_ENDPOINT
    os.environ["AWS_S3_VERIFY"] = "false"

    s3 = boto3.resource(
        's3',
        endpoint_url=S3_ENDPOINT,
        region_name='us-east-1',
        aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
        aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
        config=boto3.session.Config(signature_version='s3v4'),
        verify=False,
    )

    # ──────────────── 1. Load Data from MinIO ──────────────── #

    obj = s3.Bucket(bucket_name).Object(DATA_KEY).get()
    csv_data = obj['Body'].read().decode('utf-8')
    print("Data loaded from MinIO bucket:", bucket_name)

    df = pd.read_csv(StringIO(csv_data))
    print("Training data shape:", df.shape)

    X = df.drop(columns=["label"]).values
    y = df["label"].values
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=SEED
    )

    # ──────────────── 2. Train TF Model ──────────────── #

    # Seeds Python, NumPy and TF so weight init and shuffling are repeatable.
    tf.keras.utils.set_random_seed(SEED)

    model = tf.keras.Sequential([
        tf.keras.Input(shape=(X.shape[1],)),
        # Scale raw 0-255 pixels to [0, 1] inside the model, so the logged
        # model still accepts the same raw-pixel input as before.
        tf.keras.layers.Rescaling(1.0 / 255),
        tf.keras.layers.Dense(HIDDEN_UNITS, activation="relu"),
        tf.keras.layers.Dense(10, activation="softmax"),
    ])
    model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
    model.fit(X_train, y_train, epochs=EPOCHS, validation_split=0.1)
    _, accuracy = model.evaluate(X_test, y_test)

    model.save(MODEL_SAVE_PATH)

    # ──────────────── 3. Ensure MLflow artifact bucket ──────────────── #

    mlflow_bucket = s3.Bucket(MLFLOW_BUCKET_NAME)
    if mlflow_bucket in s3.buckets.all():
        print(MLFLOW_BUCKET_NAME + " bucket already exists!")
    else:
        s3.create_bucket(Bucket=MLFLOW_BUCKET_NAME)
        print(MLFLOW_BUCKET_NAME + " bucket created!")

    # ──────────────── 4. Log metrics & model to MLflow ──────────────── #

    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
    mlflow.set_experiment(MLFLOW_EXPERIMENT_NAME)

    # Everything goes into ONE run. Calling log_model after the `with` block
    # exits makes MLflow start a separate run, splitting model from metrics.
    with mlflow.start_run():
        mlflow.log_params({"epochs": EPOCHS, "hidden_units": HIDDEN_UNITS, "seed": SEED})
        mlflow.log_metric("accuracy", float(accuracy))
        mlflow.tensorflow.log_model(model=model, artifact_path=MODEL_SAVE_PATH)

    return MODEL_SAVE_PATH

@dsl.pipeline(
    name='mnist-tf-pipeline',
    description='A pipeline to train a TensorFlow model on MNIST data and log it to MLflow.',
)
def mnist_pipeline():
    """Two-step pipeline: stage cleaned MNIST in MinIO, then train and log to MLflow."""
    # Load MNIST data and upload the cleaned CSV to MinIO
    load_data_task = load_mnist_data()
    load_data_task.set_display_name("Load MNIST Data")

    # Train model and log to MLflow. The upstream output is the bucket name,
    # which must be bound to train_model's `bucket_name` parameter.
    train_model_task = train_model(bucket_name=load_data_task.output)
    train_model_task.set_display_name("Train Model")


# Compile at top level. Inside the pipeline function this call never ran, so
# no YAML was produced for the submit cell below.
compiler.Compiler().compile(
    pipeline_func=mnist_pipeline,
    package_path='mnist_tf_pipeline.yaml'
)

print("Pipeline compiled successfully!")

In [None]:
import kfp

# Submit the package compiled by the previous cell as a run in a named
# experiment. NOTE(review): the KFP SDK's create_experiment is documented to
# return the existing experiment when the name is already taken; confirm
# this for the installed SDK version before relying on re-runs.
client = kfp.Client()

run_settings = dict(
    job_name="mnist-tf-pipeline-run",
    pipeline_package_path="mnist_tf_pipeline.yaml",
)

experiment = client.create_experiment("mnist-tf-experiment-002")
run = client.run_pipeline(experiment_id=experiment.experiment_id, **run_settings)