In [None]:
# preprocess_data_pipeline.ipynb
def count_raw_images(raw_path: str) -> int:
    """Return how many entries directly inside ``raw_path`` match ``*.png``.

    Only the directory's immediate entries are inspected (no recursion), and
    any entry whose name matches the pattern is counted.

    The imports live inside the function body; presumably this keeps the
    function self-contained when it is wrapped as a pipeline component.

    Args:
        raw_path: Directory to scan.

    Returns:
        Number of matching entries.
    """
    import fnmatch
    import os

    png_total = 0
    for entry in os.listdir(raw_path):
        if fnmatch.fnmatch(entry, '*.png'):
            png_total += 1
    return png_total

In [None]:
# preprocess_data_pipeline.ipynb
def preprocess_raw_images(raw_path: str,
                          save_path: str,
                          size: int = 1000) -> str:
    """Pack the oldest raw PNG images into one ``.npz`` archive, then delete them.

    Up to ``size`` ``*.png`` files from ``raw_path`` are taken, oldest
    modification time first. Each image is loaded and reshaped to 28x28 and
    stored under the key ``x_train``; its label, stored under ``y_train``, is
    the first character of the file name parsed as an integer. After the
    archive has been written, the consumed image files are removed from
    ``raw_path``.

    The imports live inside the function body; presumably this keeps the
    function self-contained when it is wrapped as a pipeline component.

    Args:
        raw_path: Directory containing the raw ``*.png`` images.
        save_path: Directory to write the archive into (created if missing).
        size: Maximum number of images to pack. This requested value, not the
            number of images actually found, is what appears in the archive
            file name.

    Returns:
        Path of the written archive,
        ``<save_path>/<YYYYmmddHHMMSS>-<size>.npz``.

    Raises:
        ValueError: If a file name does not start with a digit, or an image
            does not contain exactly 28 * 28 values.
    """
    import datetime
    import fnmatch
    import numpy as np
    import os
    from PIL import Image

    img_rows, img_cols = 28, 28

    files = fnmatch.filter(os.listdir(raw_path), '*.png')
    files.sort(key=lambda fn: os.path.getmtime(os.path.join(raw_path, fn)))
    target_files = files[:size]  # slicing already yields a new list

    x_train = []
    y_train = []
    for name in target_files:
        # Close each image as soon as its pixels are copied out, so a large
        # batch does not keep one open file handle per image.
        with Image.open(os.path.join(raw_path, name)) as img:
            img_array = np.array(img)
        x_train.append(img_array.reshape(img_rows, img_cols))
        y_train.append(int(name[0]))

    x_train = np.array(x_train)

    # Save the arrays to a timestamped npz archive.
    date_postfix = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    os.makedirs(save_path, exist_ok=True)
    npz_name = f"{date_postfix}-{size}.npz"
    npz_save_path = os.path.join(save_path, npz_name)
    np.savez(npz_save_path, x_train=x_train, y_train=y_train)

    # Delete the source images only after the archive was written, so a
    # failure above never loses raw data.
    for name in target_files:
        file_path = os.path.join(raw_path, name)
        if os.path.isfile(file_path):
            os.remove(file_path)

    return npz_save_path

In [None]:
# preprocess_data_pipeline.ipynb
# Local run with the default batch size. Note that this call is destructive:
# the PNG files it packs are removed from new_dataset/raw afterwards.
npz_path = preprocess_raw_images("new_dataset/raw", "new_dataset/train")
print(npz_path)

In [None]:
# preprocess_data_pipeline.ipynb
import kfp
from kfp import dsl
from kfp.components import func_to_container_op

def disable_cache(op):
    """Turn off result caching for a single pipeline task.

    Sets the task's maximum cache staleness to ``"P0D"`` (a zero-day
    duration), so a previously cached result is never considered fresh
    enough to reuse. The task is modified in place; nothing is returned.

    Args:
        op: Task object exposing ``execution_options.caching_strategy``.
    """
    caching_strategy = op.execution_options.caching_strategy
    caching_strategy.max_cache_staleness = "P0D"

def preprocess_data_pipeline(raw_path: str = "/notebook/new_dataset/raw", 
                             save_path: str = "/notebook/new_dataset/train", 
                             size: int = 1000):
    """Pipeline that packs raw images into an npz archive once enough exist.

    Two tasks are defined, both with the ``workspace-handson`` PVC mounted at
    ``/notebook`` and with caching disabled:

    1. a counting task built from ``count_raw_images``;
    2. a preprocessing task built from ``preprocess_raw_images``, which only
       runs when the counted number of images is strictly greater than
       ``size``.

    Args:
        raw_path: Directory (inside the mounted volume) holding raw PNGs.
        save_path: Directory (inside the mounted volume) for the archive.
        size: Number of images to pack; also the threshold the count must
            exceed before preprocessing runs.
    """
    mount_point = "/notebook"
    workspace_volume = dsl.PipelineVolume(pvc="workspace-handson")

    # Wrap the plain Python functions as container components.
    make_count_task = func_to_container_op(count_raw_images)
    make_preprocess_task = func_to_container_op(
        preprocess_raw_images,
        packages_to_install=["numpy", "pillow"],
    )

    count_task = make_count_task(raw_path)
    count_task = count_task.add_pvolumes(pvolumes={mount_point: workspace_volume})
    disable_cache(count_task)

    # Preprocess only when more than `size` raw images are available.
    with dsl.Condition(count_task.output > size):
        preprocess_task = make_preprocess_task(raw_path, save_path, size)
        preprocess_task = preprocess_task.add_pvolumes(
            pvolumes={mount_point: workspace_volume})
        disable_cache(preprocess_task)

In [None]:
# preprocess_data_pipeline.ipynb
# Submit a one-off run of the pipeline under the "preprocessed_data"
# experiment. `size` is not passed, so the pipeline's default applies.
client = kfp.Client()

arguments = {
    "raw_path": "/notebook/new_dataset/raw",
    "save_path": "/notebook/new_dataset/train",
}

client.create_run_from_pipeline_func(
    preprocess_data_pipeline,
    arguments=arguments,
    experiment_name="preprocessed_data",
)

In [None]:
# preprocess_data_pipeline.ipynb
# Compile the pipeline into a YAML package and upload that package so the
# pipeline can be reused. Relies on `client` created in an earlier cell.
pipeline_package = "preprocess_data_pipeline.yaml"

kfp.compiler.Compiler().compile(
    pipeline_func=preprocess_data_pipeline,
    package_path=pipeline_package,
)

client.upload_pipeline(
    pipeline_package_path=pipeline_package,
    pipeline_name="preprocess_data_pipeline",
    description="Convert raw images to npz",
)