In [None]:
import kfp
from kfp import dsl
from kfp.dsl import Output, Artifact, Metrics, HTML, Markdown, Model
from typing import NamedTuple

# Base container image shared by every pipeline component.
BASE_IMAGE: str = "public.ecr.aws/j1r0q0g6/notebooks/notebook-servers/jupyter-tensorflow-full:v1.5.0"

# 1. Data acquisition component
@dsl.component(base_image=BASE_IMAGE)
def get_data_batch() -> NamedTuple('Outputs', [('datapoints_training', float), ('datapoints_test', float), ('dataset_version', str)]):
    """Download the raw MNIST archive from MinIO and re-upload its four arrays.

    Reads ``mnist.npz`` from the ``mlpipeline`` bucket and stores ``x_train``,
    ``y_train``, ``x_test`` and ``y_test`` back into the same bucket, each as an
    ``np.save`` payload under an object key WITHOUT a file extension
    (e.g. ``x_train``). Downstream components read exactly these keys.

    Returns:
        datapoints_training: Number of training samples (``x_train.shape[0]``).
        datapoints_test: Number of test samples (``x_test.shape[0]``).
        dataset_version: Hard-coded version string ``"1.0"``.
    """
    # Imports stay inside the function: a KFP lightweight component must be
    # self-contained because only the function source runs in the container.
    # (TensorFlow is deliberately not imported here - nothing below needs it.)
    from minio import Minio
    import numpy as np

    # NOTE(review): endpoint and credentials are hard-coded; consider moving
    # them into a Kubernetes secret / environment variables.
    minio_client = Minio("100.65.11.110:9000", access_key="minio", secret_key="minio123", secure=False)
    minio_bucket = "mlpipeline"

    minio_client.fget_object(minio_bucket, "mnist.npz", "/tmp/mnist.npz")

    # NOTE(review): allow_pickle=True lets np.load unpickle object arrays from a
    # file fetched over the network. The standard MNIST archive holds plain
    # numeric arrays, so allow_pickle=False should suffice - confirm before tightening.
    with np.load("/tmp/mnist.npz", allow_pickle=True) as f:
        x_train, y_train = f["x_train"], f["y_train"]
        x_test, y_test = f["x_test"], f["y_test"]

    # Store each array in MinIO under its bare name for the downstream steps.
    for name, data in (("x_train", x_train), ("y_train", y_train), ("x_test", x_test), ("y_test", y_test)):
        local_path = f"/tmp/{name}.npy"
        np.save(local_path, data)
        minio_client.fput_object(minio_bucket, name, local_path)

    return float(x_train.shape[0]), float(x_test.shape[0]), "1.0"

# 2. Fetch the latest data (placeholder step)
@dsl.component(base_image=BASE_IMAGE)
def get_latest_data():
    """Placeholder step: it only prints a message and fetches nothing yet."""
    message = "Adding latest data"
    print(message)

# 3. Data preprocessing component
@dsl.component(base_image=BASE_IMAGE)
def reshape_data():
    """Reshape and rescale the image arrays stored in MinIO, overwriting them in place.

    For ``x_train`` and ``x_test``, this function:

    - downloads the object written by ``get_data_batch``;
    - reshapes it to ``(-1, 28, 28, 1)``;
    - casts it to float32 and divides by 255.0 (assumes raw 0-255 pixel
      intensities, so values land in [0, 1]);
    - uploads the result under the same key.

    Label arrays are not touched.
    """
    from minio import Minio
    import numpy as np

    minio_client = Minio("100.65.11.110:9000", access_key="minio", secret_key="minio123", secure=False)
    minio_bucket = "mlpipeline"

    def process_and_upload(name):
        """Download object *name*, reshape/rescale it, and upload it back under the same key."""
        local_path = f"/tmp/{name}.npy"
        # The object key is the bare array name (no ".npy" suffix). It must match
        # the key get_data_batch uploads and the key model_building downloads.
        minio_client.fget_object(minio_bucket, name, local_path)
        data = np.load(local_path)
        data = data.reshape(-1, 28, 28, 1).astype('float32') / 255.0
        np.save(local_path, data)
        minio_client.fput_object(minio_bucket, name, local_path)

    process_and_upload("x_train")
    process_and_upload("x_test")

# 4. Model building component (reports results through a KFP v2 Metrics artifact)
@dsl.component(base_image=BASE_IMAGE)
def model_building(
        mlpipeline_metrics: Output[Metrics],
        no_epochs: int = 1,
        optimizer: str = "adam"
) -> NamedTuple('Outputs', [('model_summary', str)]):
    """Train a small CNN on the preprocessed arrays in MinIO and publish the saved model.

    Args:
        mlpipeline_metrics: KFP metrics artifact. Receives ``model_accuracy`` and
            ``model_loss``, both multiplied by 100.
        no_epochs: Number of training epochs.
        optimizer: Keras optimizer identifier passed to ``model.compile``.

    Returns:
        model_summary: ``"Model Accuracy: <raw test accuracy>"`` (not scaled).

    Side effects:
        Uploads every file of the saved model directory to
        ``mlpipeline/models/detect-digits/1/``, the location that
        ``model_serving`` points KServe at.
    """
    from tensorflow import keras
    from minio import Minio
    import numpy as np
    import os

    minio_client = Minio("100.65.11.110:9000", access_key="minio", secret_key="minio123", secure=False)
    minio_bucket = "mlpipeline"

    def load_minio_npy(name):
        """Download object *name* from the bucket and load it as a NumPy array."""
        local_path = f"/tmp/{name}.npy"
        minio_client.fget_object(minio_bucket, name, local_path)
        return np.load(local_path)

    x_train, y_train = load_minio_npy("x_train"), load_minio_npy("y_train")
    x_test, y_test = load_minio_npy("x_test"), load_minio_npy("y_test")

    # Conv -> max-pool -> dense stack over 28x28x1 inputs, 10-way softmax output.
    model = keras.models.Sequential([
        keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=(28,28,1)),
        keras.layers.MaxPool2D(2, 2),
        keras.layers.Flatten(),
        keras.layers.Dense(64, activation='relu'),
        keras.layers.Dense(32, activation='relu'),
        keras.layers.Dense(10, activation='softmax')
    ])

    # sparse_categorical_crossentropy expects integer class labels (not one-hot).
    model.compile(optimizer=optimizer, loss="sparse_categorical_crossentropy", metrics=['accuracy'])

    # Train, then evaluate on the held-out test split.
    model.fit(x_train, y_train, epochs=no_epochs, batch_size=20)
    model_loss, model_accuracy = model.evaluate(x_test, y_test)

    # Report KFP v2 metrics. Accuracy is shown as a percentage.
    # NOTE(review): scaling the loss by 100 has no clear meaning; the value is
    # kept unchanged so it stays comparable with earlier runs.
    mlpipeline_metrics.log_metric("model_accuracy", float(model_accuracy) * 100)
    mlpipeline_metrics.log_metric("model_loss", float(model_loss) * 100)

    # Save locally, then mirror the directory tree into MinIO under
    # models/detect-digits/1/ (presumably the numeric version directory that
    # TensorFlow Serving loads - confirm against the serving setup).
    model_path = "/tmp/detect-digits"
    model.save(model_path)

    for root, _dirs, files in os.walk(model_path):
        for file_name in files:
            local_f = os.path.join(root, file_name)
            # Object keys always use "/" regardless of the local path separator.
            remote_f = os.path.join("models/detect-digits/1/", os.path.relpath(local_f, model_path)).replace("\\", "/")
            minio_client.fput_object(minio_bucket, remote_f, local_f)

    return ("Model Accuracy: " + str(model_accuracy),)

# 5. Model deployment component
@dsl.component(
    base_image=BASE_IMAGE,
    packages_to_install=['kserve==0.8.0.1']
)
def model_serving():
    """Create a KServe InferenceService for the model stored in MinIO.

    The service name carries a timestamp suffix, so each call creates a new
    InferenceService rather than updating an existing one. The TensorFlow
    predictor loads from ``s3://mlpipeline/models/detect-digits/`` and runs
    under the ``sa-minio-kserve`` service account.
    """
    from kubernetes import client
    from kserve import (
        KServeClient,
        V1beta1InferenceService,
        V1beta1InferenceServiceSpec,
        V1beta1PredictorSpec,
        V1beta1TFServingSpec,
        constants,
        utils,
    )
    from datetime import datetime

    target_namespace = utils.get_default_target_namespace() or "default"
    service_name = f'digits-recognizer-{datetime.now().strftime("%Y%m%d-%H%M%S")}'

    metadata = client.V1ObjectMeta(
        name=service_name,
        namespace=target_namespace,
        # Sets the Istio sidecar-injection annotation to 'false'.
        annotations={'sidecar.istio.io/inject':'false'},
    )
    predictor = V1beta1PredictorSpec(
        service_account_name="sa-minio-kserve",
        tensorflow=V1beta1TFServingSpec(storage_uri="s3://mlpipeline/models/detect-digits/"),
    )
    inference_service = V1beta1InferenceService(
        api_version=f"{constants.KSERVE_GROUP}/v1beta1",
        kind=constants.KSERVE_KIND,
        metadata=metadata,
        spec=V1beta1InferenceServiceSpec(predictor=predictor),
    )

    kserve_client = KServeClient()
    kserve_client.create(inference_service)

# 6. Pipeline definition
@dsl.pipeline(name='digits-recognizer-pipeline-v2')
def digits_pipeline(no_epochs: int = 1, optimizer: str = "adam"):
    """Chain the digits-recognizer steps into a linear pipeline.

    The steps exchange data through MinIO rather than KFP artifacts, so the
    ordering is declared explicitly with ``.after()``:
    fetch data (plus the placeholder latest-data step) -> reshape -> train -> serve.

    Args:
        no_epochs: Training epochs forwarded to ``model_building``.
        optimizer: Keras optimizer name forwarded to ``model_building``.
    """
    fetch_task = get_data_batch()
    latest_task = get_latest_data()

    # No data is passed between these tasks directly, so .after() is what
    # enforces the execution order in KFP v2.
    reshape_task = reshape_data()
    reshape_task.after(fetch_task, latest_task)

    train_task = model_building(no_epochs=no_epochs, optimizer=optimizer)
    train_task.after(reshape_task)

    serve_task = model_serving()
    serve_task.after(train_task)

# 7. Compile (and optionally run) the pipeline
if __name__ == "__main__":
    # Note: KFP v2 compiled pipeline files usually use the .yaml suffix.
    output_path = 'pipeline_v2.yaml'
    kfp.compiler.Compiler().compile(pipeline_func=digits_pipeline, package_path=output_path)

    # When running inside the cluster, the lines below can be uncommented:
    # client = kfp.Client()
    # client.create_run_from_pipeline_func(digits_pipeline, arguments={"no_epochs": 1, "optimizer": "adam"})

