TensorFlowを利用したMNISTのトレーニングを、Kubeflow Pipelinesに実装してみる。

実行環境
* Kubeflow 1.6.1
* Notebook: kubeflownotebookswg/jupyter-tensorflow-full:v1.6.0
* Image:
  * tensorflow/tensorflow:latest
  * kubeflowmnist2024001.azurecr.io/mnist-demo/mnist-inference:v1
  * ACR Secret が必要

# 事前準備

## KubeflowでNotebookを作成する
* tensorflow, kfp などは、Notebookのコンテナ イメージ（jupyter-tensorflow-full:v1.6.0）に含まれている。

## PVCの準備
PVCを作成してあることを確認する。
PVCは、Kubeflow UIのVolumesメニューから作成しておく。

In [16]:
!kubectl get pvc demo-vol-01

NAME          STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   AGE
demo-vol-01   Bound    pvc-1d664eec-7630-4164-ae76-823dff2ef304   3Gi        RWO            default        54m


コンパイルで生成されたYAMLを確認する。

In [None]:
!cat mnist_pipeline_and_predict.yaml

# パインプラインの作成（分割版）

トレーニングと推論のコードを、それぞれNGCのTensorFlowイメージでコンテナ化して利用する。

* コンテナは ACR に配置ずみ。（ServiceAccount default-editorに、ImagePullSecretが必要）
* PVC demo-vol-01 が必要

## 学習パイプライン

In [15]:
import kfp
from kfp import dsl
import kfp.components as comp
#import tensorflow as tf
#import numpy as np

def save_dataset():
    import tensorflow as tf
    import numpy as np
    import os

    save_dir = '/mnt/data/dataset'
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
        
    mnist = tf.keras.datasets.mnist
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    
    np.savez_compressed(os.path.join(save_dir, 'mnist.npz'), x_train=x_train, y_train=y_train, x_test=x_test, y_test=y_test)
    
def preproc_dataset():
    import tensorflow as tf
    import numpy as np
    import os

    load_dir = '/mnt/data/dataset'
    save_dir = '/mnt/data/preproc_dataset'
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    with np.load(os.path.join(load_dir, 'mnist.npz'), allow_pickle=True) as data:
        x_train = data['x_train']
        y_train = data['y_train']
        x_test = data['x_test']
        y_test = data['y_test']

    x_train, x_test = x_train / 255.0, x_test / 255.0
    np.savez_compressed(os.path.join(save_dir, 'mnist_normalized.npz'), x_train=x_train, y_train=y_train, x_test=x_test, y_test=y_test)

def train_and_save_model():
    import tensorflow as tf
    import numpy as np
    import os

    load_dir = '/mnt/data/preproc_dataset'
    save_dir = '/mnt/data/saved_model'
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    with np.load(os.path.join(load_dir, 'mnist_normalized.npz'), allow_pickle=True) as data:
        x_train = data['x_train']
        y_train = data['y_train']
        x_test = data['x_test']
        y_test = data['y_test']

    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10)
    ])
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    model.compile(optimizer='adam', loss=loss_fn, metrics=['accuracy'])
    model.fit(x_train, y_train, epochs=5)
    model.save(save_dir)

def test_model():
    import tensorflow as tf
    import numpy as np
    import os

    data_load_dir = '/mnt/data/preproc_dataset'
    model_load_dir = '/mnt/data/saved_model'
    with np.load(os.path.join(data_load_dir, 'mnist_normalized.npz'), allow_pickle=True) as data:
        x_test = data['x_test']
        y_test = data['y_test']

    model = tf.keras.models.load_model(model_load_dir)
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    model.compile(optimizer='adam', loss=loss_fn, metrics=['accuracy'])
    model.evaluate(x_test, y_test, verbose=2)
    
def inference(pvc_name):
    import tensorflow as tf
    import numpy as np
    model = tf.keras.models.load_model(f'/mnt/data/saved_model')
    new_data = np.random.rand(28, 28)
    new_data = new_data.reshape(1, 28, 28) / 255.0
    predictions = model.predict(new_data)
    predicted_class = np.argmax(predictions, axis=1)
    print("Predicted class:", predicted_class)
    with open(f'/mnt/data/predictions.txt', 'w') as f:
        f.write("Predicted class: " + str(predicted_class[0]) + "\n")

# コンテナオペレーションを作成
save_dataset_op = comp.func_to_container_op(save_dataset, base_image='tensorflow/tensorflow:latest')
preproc_dataset_op = comp.func_to_container_op(preproc_dataset, base_image='tensorflow/tensorflow:latest')
train_and_save_op = comp.func_to_container_op(train_and_save_model, base_image='tensorflow/tensorflow:latest')
test_model_op = comp.func_to_container_op(test_model, base_image='tensorflow/tensorflow:latest')
#inference_op = comp.func_to_container_op(inference, base_image='tensorflow/tensorflow:latest')

# パイプラインに推論タスクを追加
@dsl.pipeline(
    name='Mnist Training and Prediction Pipeline',
    description='A pipeline that trains an MNIST model, saves it to PVC and makes a prediction.'
)

def mnist_pipeline(pvc_name='demo-vol-01'):
    save_dataset_task = save_dataset_op()
    save_dataset_task.add_pvolumes({f'/mnt/data': dsl.PipelineVolume(pvc=pvc_name)})
    
    preproc_dataset_task = preproc_dataset_op()
    preproc_dataset_task.add_pvolumes({f'/mnt/data': dsl.PipelineVolume(pvc=pvc_name)})
    preproc_dataset_task.after(save_dataset_task)
    
    train_and_save_task = train_and_save_op()
    train_and_save_task.add_pvolumes({f'/mnt/data': dsl.PipelineVolume(pvc=pvc_name)})
    train_and_save_task.after(preproc_dataset_task)

    test_model_task = test_model_op()
    test_model_task.add_pvolumes({f'/mnt/data': dsl.PipelineVolume(pvc=pvc_name)})
    test_model_task.after(train_and_save_task)
    
    #inference_task = inference_op(pvc_name)
    #inference_task.add_pvolumes({f'/mnt/data': dsl.PipelineVolume(pvc=pvc_name)})
    #inference_task.after(test_model_task)
    
# コンパイル
kfp.compiler.Compiler().compile(mnist_pipeline, 'mnist-training-pipeline.yaml')

YAMLが生成されたことを確認する。

In [None]:
!ls -l mnist-training-pipeline.yaml

YAMLファイルは、Kubeflow PipelinesのUIからアップロードする。

## 推論パイプライン

In [49]:
# 推論パイプライン
import kfp
from kfp import dsl
import kfp.components as comp
#import tensorflow as tf
#import numpy as np

def save_dataset_png():
    from PIL import Image
    import tensorflow as tf
    import os

    dir_path = '/mnt/data/images'
    os.makedirs(dir_path, exist_ok=True)

    mnist = tf.keras.datasets.mnist
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    for i in range(10):
        img = Image.fromarray(x_train[i])
        img.save(os.path.join(dir_path, f'test_{i}.png'))
        
def inference(pvc_name, image_file_path):       
    from PIL import Image
    import numpy as np
    import tensorflow as tf
    
    model_dir_path = '/mnt/data/saved_model'
    
    img = Image.open(image_file_path)
    img = img.convert('L').resize((28, 28))
    img_array = np.array(img) / 255.0
    img_array = img_array.reshape(1, 28, 28)

    model = tf.keras.models.load_model(model_dir_path)
    predictions = model.predict(img_array)
    predicted_class = np.argmax(predictions, axis=1)
    with open(f'/mnt/data/inference-number.txt', 'w') as f:
        f.write("Inference Number: " + str(predicted_class[0]) + "\n")

# コンテナオペレーションを作成
save_dataset_png_op = comp.func_to_container_op(save_dataset_png, base_image='kubeflowmnist2024001.azurecr.io/mnist-demo/mnist-inference:v1')
inference_op = comp.func_to_container_op(inference, base_image='kubeflowmnist2024001.azurecr.io/mnist-demo/mnist-inference:v1')

# パイプラインに推論タスクを追加
@dsl.pipeline(
    name='Mnist Inference Pipeline',
    description='Inference pipeline with an MNIST model.'
)

def mnist_pipeline(pvc_name='demo-vol-01', image_file_path = '/mnt/data/images/test_0.png'):
    save_dataset_png_task = save_dataset_png_op()
    save_dataset_png_task.add_pvolumes({f'/mnt/data': dsl.PipelineVolume(pvc=pvc_name)})
    
    inference_task = inference_op(pvc_name, image_file_path)
    inference_task.add_pvolumes({f'/mnt/data': dsl.PipelineVolume(pvc=pvc_name)})
    inference_task.after(save_dataset_png_task)
    
# コンパイル
kfp.compiler.Compiler().compile(mnist_pipeline, 'mnist-inference-pipeline.yaml')

## 推論パイプライン（Output 修正版）

In [53]:
# Output修正
# 推論パイプライン
import kfp
from kfp import dsl
import kfp.components as comp
#import tensorflow as tf
#import numpy as np

def save_dataset_png():
    from PIL import Image
    import tensorflow as tf
    import os

    dir_path = '/mnt/data/images'
    os.makedirs(dir_path, exist_ok=True)

    mnist = tf.keras.datasets.mnist
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    for i in range(10):
        img = Image.fromarray(x_train[i])
        img.save(os.path.join(dir_path, f'test_{i}.png'))
        
def inference(pvc_name, image_file_path) -> str:
    from PIL import Image
    import numpy as np
    import tensorflow as tf
    
    model_dir_path = '/mnt/data/saved_model'
    
    img = Image.open(image_file_path)
    img = img.convert('L').resize((28, 28))
    img_array = np.array(img) / 255.0
    img_array = img_array.reshape(1, 28, 28)

    model = tf.keras.models.load_model(model_dir_path)
    predictions = model.predict(img_array)
    predicted_class = np.argmax(predictions, axis=1)
    
    output_file = '/mnt/data/inference-number.txt'
    with open(output_file, 'w') as f:
        inference_output = "Inference Number: " + str(predicted_class[0]) + "\n"
        f.write(inference_output)
    return inference_output

# コンテナオペレーションを作成
save_dataset_png_op = comp.func_to_container_op(save_dataset_png, base_image='kubeflowmnist2024001.azurecr.io/mnist-demo/mnist-inference:v1')
inference_op = comp.func_to_container_op(inference, base_image='kubeflowmnist2024001.azurecr.io/mnist-demo/mnist-inference:v1')

# パイプラインに推論タスクを追加
@dsl.pipeline(
    name='Mnist Inference Pipeline',
    description='Inference pipeline with an MNIST model.'
)

def mnist_pipeline(pvc_name='demo-vol-01', image_file_path = '/mnt/data/images/test_0.png'):
    save_dataset_png_task = save_dataset_png_op()
    save_dataset_png_task.add_pvolumes({f'/mnt/data': dsl.PipelineVolume(pvc=pvc_name)})
    
    inference_task = inference_op(pvc_name, image_file_path)
    inference_task.add_pvolumes({f'/mnt/data': dsl.PipelineVolume(pvc=pvc_name)})
    inference_task.after(save_dataset_png_task)
    
# コンパイル
kfp.compiler.Compiler().compile(mnist_pipeline, 'mnist-inference-pipeline.yaml')

# パイプラインの作成（YAML のアップロード）

生成されたYAMLはローカルにダウンロードして、Kubeflow UIの「Pipelines (KFP)」からアップロードする。
* ブラウザで、Kubeflow UI を開く
* Pipelines 画面を開く
* 「Upload Pipeline」からアップロードする

# パイプラインの実行

パイプラインを実行する。
* Experimentsの作成（例：ex-01）
* 「Create Run」で、パイプラインを実行する。

# PVに保存されたデータの確認

PVをマウントしたPodを起動して、保存されたデータを確認する。

In [None]:
!kubectl apply -f pod.yaml

In [None]:
!kubectl get pod

In [None]:
!kubectl exec demo-pod  -- df /mnt

In [None]:
!kubectl exec demo-pod -- ls /mnt

In [None]:
!kubectl exec demo-pod  -- cat /mnt/data/inference-number.txt