# Mnistテーマにパイプラインを作成

In [1]:
# Run timestamp (YYYYmmddHHMMSS, local time) used to label this pipeline run.
from datetime import datetime
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"

#GCP
from google.cloud import storage as gcs
import google.cloud.aiplatform as aip
from typing import NamedTuple


# kubeflow SDK
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component)

In [2]:
# Single source of configuration for the notebook. The whole mapping is later
# passed to the pipeline job as its parameter values, so every key here must
# match a parameter name declared by the pipeline function.
params = dict(
    # GCP project / bucket and the root folder for pipeline run artifacts.
    project_id="test-hyron",
    bucket_name="aruha-mnist",
    pipeline_root="gs://aruha-mnist/pipeline_log",

    # Locations for the trained model and the code.
    model_url="gs://aruha-mnist/mlpos/mlpos/model",
    code_url="gs://aruha-mnist/mlpos/mlpos/code",

    # Input CSV files and the share of training rows held out for validation.
    train_url="gs://aruha-mnist/mlpos/mlpos/data/mnist_train.csv",
    test_url="gs://aruha-mnist/mlpos/mlpos/data/mnist_test.csv",
    valid_proportion=0.2,
    TIMESTAMP=TIMESTAMP,

    # Training hyper-parameters.
    epochs=5,
    batch_size=32,
)

In [3]:
# Open a Cloud Storage client for the configured project and look up the bucket.
project_id, bucket_name = params["project_id"], params["bucket_name"]

client = gcs.Client(project=project_id)
bucket = client.get_bucket(bucket_name)


# コンポーネントの定義  


In [14]:


@component(base_image='tensorflow/tensorflow:latest')
def preprocess_conduct(
    project_id: str,
    bucket_name: str,
    train_url: str,
    test_url: str,
    valid_proportion: float,

    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    valid_dataset: Output[Dataset]
):
    """Download the train/test CSV files from GCS and save three tf.data datasets.

    The training rows are shuffled, truncated, and split into a validation
    part (the first ``valid_proportion`` of the rows) and a training part
    (all remaining rows). Each dataset yields ``(features, label)`` pairs
    where the label is column 0 and the features are the remaining columns
    divided by 255.

    Args:
        project_id: GCP project used to create the storage client.
        bucket_name: Bucket that holds both CSV files.
        train_url: ``gs://<bucket_name>/...`` URL of the training CSV.
        test_url: ``gs://<bucket_name>/...`` URL of the test CSV.
        valid_proportion: Fraction of the (truncated) training rows that is
            held out for validation.
        train_dataset: Output artifact receiving the training dataset.
        test_dataset: Output artifact receiving the test dataset.
        valid_dataset: Output artifact receiving the validation dataset.

    Raises:
        ValueError: If a URL does not start with ``gs://<bucket_name>/``.
    """
    import numpy as np
    import tensorflow as tf
    from google.cloud import storage as gcs
    from io import BytesIO

    client = gcs.Client(project_id)
    bucket = client.get_bucket(bucket_name)

    def data_load_from_url(bucket, bucket_name, url):
        # Read one comma-separated file addressed by a gs:// URL into an array.
        prefix = "gs://" + bucket_name + "/"
        if not url.startswith(prefix):
            raise ValueError(
                "URL %r is not located in bucket %r" % (url, bucket_name)
            )
        blob = bucket.blob(url[len(prefix):])
        content = blob.download_as_bytes()
        return np.loadtxt(BytesIO(content), delimiter=',')

    def to_dataset(rows):
        # Column 0 becomes the label (kept 2-D); the other columns are scaled by 1/255.
        return tf.data.Dataset.from_tensor_slices(
            (rows[:, 1:] / 255.0, rows[:, 0].reshape(-1, 1))
        )

    train = data_load_from_url(bucket, bucket_name, train_url)
    test = data_load_from_url(bucket, bucket_name, test_url)

    # NOTE(review): the shuffle is unseeded, so the split differs between runs.
    np.random.shuffle(train)
    np.random.shuffle(test)

    # Subsample to keep the run short.
    train = train[:10000]
    test = test[:1000]

    # Split at a single index so that no row is lost between the two parts
    # (the previous "+ 1" offset dropped the row at the boundary).
    n_valid = int(len(train) * valid_proportion)
    valid_data = train[:n_valid]
    train_data = train[n_valid:]
    test_data = test

    # Data is handed to later components by saving to / loading from the
    # artifact path; Kubeflow only provides the storage location.
    to_dataset(train_data).save(train_dataset.path)
    to_dataset(test_data).save(test_dataset.path)
    to_dataset(valid_data).save(valid_dataset.path)
    
    

In [15]:
@component(base_image='tensorflow/tensorflow:latest')
def train_model_conduct(
    train_dataset :Input[Dataset],
    valid_dataset :Input[Dataset],
    
    batch_size    :int,
    epochs        :int,
    
    model         :Output[Artifact]

):
    """Train a small CNN on the saved datasets and save it to the model artifact.

    Args:
        train_dataset: Artifact holding the training tf.data dataset.
        valid_dataset: Artifact holding the validation tf.data dataset.
        batch_size: Currently not applied (see the review note in the body).
        epochs: Number of epochs passed to ``fit``.
        model: Output artifact; the trained Keras model is saved at its path.
    """
    import tensorflow as tf

    class Simple_CNN(tf.keras.Model):
        # Three 4x4 convolutions (the first two followed by 2x2 max pooling),
        # then a 64-unit dense layer and a 10-way softmax output.
        def __init__(self):
            super().__init__()
            self.conv_2d_0 = tf.keras.layers.Conv2D(32, (4,4), activation = "relu", input_shape = (28,28,1))
            self.pooling_0 = tf.keras.layers.MaxPooling2D((2, 2))

            self.conv_2d_1 = tf.keras.layers.Conv2D(32, (4,4), activation = "relu")
            self.pooling_1 = tf.keras.layers.MaxPooling2D((2, 2))

            self.conv_2d_2 = tf.keras.layers.Conv2D(32, (4,4), activation = "relu")

            self.flatten = tf.keras.layers.Flatten()

            self.dense_0 = tf.keras.layers.Dense(64, activation = "relu")
            self.dense_1 = tf.keras.layers.Dense(10, activation = "softmax")


        def call(self, inputs):
            # Inputs arrive flattened; restore the 28x28 single-channel image layout.
            x = tf.reshape(inputs, [-1,28,28,1])
            x = self.pooling_0(self.conv_2d_0(x))
            x = self.pooling_1(self.conv_2d_1(x))
            x = self.conv_2d_2(x)

            x = self.flatten(x)
            x = self.dense_0(x)
            x = self.dense_1(x)

            return x

    _train_dataset = tf.data.Dataset.load(train_dataset.path)
    _valid_dataset = tf.data.Dataset.load(valid_dataset.path)

    # NOTE(review): batch_size is accepted but not applied anywhere; the
    # datasets are passed to fit() exactly as they were loaded. Applying
    # .batch(batch_size) here would change the input shape the saved model is
    # built with, so it presumably has to be introduced together with the
    # evaluation component, which feeds data in the same unbatched form.
    # Confirm before changing.
    _model = Simple_CNN()
    _model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    _model.fit(
        _train_dataset,
        epochs = epochs,
        validation_data = _valid_dataset
    )

    _model.save(model.path)
        

In [32]:
@component(base_image='tensorflow/tensorflow:latest')
def test_model_conduct(
    test_dataset: Input[Dataset],
    model       : Input[Artifact],

    loss        : Output[Artifact],
    accuracy    : Output[Artifact]
):
    """Evaluate the saved model on the test dataset and write loss and accuracy.

    Args:
        test_dataset: Artifact holding the test tf.data dataset.
        model: Artifact holding the saved Keras model.
        loss: Output artifact; the loss value is written to its path as text.
        accuracy: Output artifact; the accuracy value is written to its path
            as text.
    """
    import tensorflow as tf

    trained_model = tf.keras.models.load_model(model.path)
    eval_data = tf.data.Dataset.load(test_dataset.path)

    # evaluate() yields the loss followed by the compiled metric.
    loss_value, accuracy_value = trained_model.evaluate(eval_data)

    # Each result goes into its own artifact file as a plain string.
    for artifact, value in ((loss, loss_value), (accuracy, accuracy_value)):
        with open(artifact.path, "w") as out_file:
            out_file.write(str(value))
    

# パイプラインの定義

In [34]:
@dsl.pipeline(
    pipeline_root=params["pipeline_root"],
    name="example-pipeline",
) 
def pipeline(
    project_id: str,
    bucket_name: str,
    pipeline_root: str, 
    
    model_url: str,
    code_url: str,
    
    train_url:str,
    test_url :str,
    
    valid_proportion: float,
    TIMESTAMP: str,
    
    batch_size: int,
    epochs : int
):
    """Preprocess the data, train the model, then evaluate it.

    pipeline_root, model_url, code_url and TIMESTAMP are not used by any step
    here; they are declared because the job is submitted with the complete
    params mapping as its parameter values.
    """
    # Data preprocessing. Arguments are passed by keyword, like the other
    # steps, so the call does not depend on the component's parameter order.
    preprocess_task = preprocess_conduct(
        project_id       = project_id,
        bucket_name      = bucket_name,
        train_url        = train_url,
        test_url         = test_url,
        valid_proportion = valid_proportion
    )
    
    # Training on the train / validation datasets produced above.
    train_model_task = train_model_conduct(
        train_dataset = preprocess_task.outputs["train_dataset"],
        valid_dataset = preprocess_task.outputs["valid_dataset"],

        batch_size    = batch_size,
        epochs        = epochs
    )

    # Evaluation of the trained model on the test dataset.
    test_model_task = test_model_conduct(
        test_dataset = preprocess_task.outputs["test_dataset"], 
        model        = train_model_task.outputs["model"]
    )
    

#     save_model_task = save_model_conduct(
#         model_dir  = model_dir,
#         model_name = model_name,
#         base_dir   = base_dir,
#         code_dir   = code_dir,

#         model      = train_model_task.outputs["model"],
#         TIMESTAMP  = TIMESTAMP,
#         loss       = test_model_task.outputs["loss"],
#         accuracy   = test_model_task.outputs["Accuracy"]
#     )

    
    
    

# コンパイル/実行

In [35]:
from kfp.v2 import compiler  # noqa: F811

# Compile the pipeline function into a job spec file for submission.
compiler.Compiler().compile(
    package_path="test_pipeline.json",
    pipeline_func=pipeline,
)

In [36]:
# Submit the compiled spec as a pipeline job and run it; the display name
# carries the notebook timestamp.
DISPLAY_NAME = f"test{TIMESTAMP}"

job = aip.PipelineJob(
    template_path="test_pipeline.json",
    display_name=DISPLAY_NAME,
    parameter_values=params,
    pipeline_root=params["pipeline_root"],
)
job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/784694072347/locations/us-central1/pipelineJobs/example-pipeline-20221120130857
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/784694072347/locations/us-central1/pipelineJobs/example-pipeline-20221120130857')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/example-pipeline-20221120130857?project=784694072347
PipelineJob projects/784694072347/locations/us-central1/pipelineJobs/example-pipeline-20221120130857 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/784694072347/locations/us-central1/pipelineJobs/example-pipeline-20221120130857 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/784694072347/locations/us-central1/pipelineJobs/example-pipeline-20221120130857 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob run completed. Resource name: projects/784694072347/l

# 今後対応したいこと

- ディレクトリの構成などをもうすこしちゃんと定義
- 結果をcsv出力してIBツールで確認できるように
- 学習/検証など主要部分だけ