In [1]:
import kfp
from kfp import dsl
from kfp.components import func_to_container_op, InputPath, OutputPath

# Display the version of the imported kfp package as this cell's output.
kfp.__version__

'1.6.3'

In [2]:
import os

# The token is stored in the file whose path is given by the
# KF_PIPELINES_SA_TOKEN_PATH environment variable; its raw contents are kept
# in TOKEN and later handed to the pipelines client.
token_path = os.environ['KF_PIPELINES_SA_TOKEN_PATH']
with open(token_path, "r") as token_file:
    TOKEN = token_file.read()

In [3]:
from kubernetes import client as k8s_client


def attach_output_volume(op):
    """Give a container op writable emptyDir volumes for its outputs and data.

    Intended to be registered as an op transformer. It points the
    auto-generated pipeline metadata artifacts at files under ``/tmp/outputs``
    and mounts one emptyDir volume at each of ``/output``, ``/tmp/outputs``
    and ``/data``.
    See https://github.com/kubeflow/pipelines/issues/1654

    Args:
        op: The container operation to modify in place.

    Returns:
        The same ``op`` object, after modification.
    """
    # Handle auto-generated pipeline metadata.
    metadata_artifacts = (
        ('mlpipeline-ui-metadata', '/tmp/outputs/mlpipeline-ui-metadata.json'),
        ('mlpipeline-metrics', '/tmp/outputs/mlpipeline-metrics.json'),
    )
    for artifact_name, artifact_path in metadata_artifacts:
        op.output_artifact_paths[artifact_name] = artifact_path

    # (volume name, mount path) pairs, attached in this order.
    empty_dir_mounts = (
        ('volume', '/output'),        # somewhere to store regular output
        ('outputs', '/tmp/outputs'),  # func_to_container_op wants to store outputs here
        ('data', '/data'),            # dataset store
    )
    for volume_name, mount_path in empty_dir_mounts:
        op.add_volume(
            k8s_client.V1Volume(
                name=volume_name,
                empty_dir=k8s_client.V1EmptyDirVolumeSource(),
            )
        )
        op.container.add_volume_mount(
            k8s_client.V1VolumeMount(name=volume_name, mount_path=mount_path)
        )

    return op

In [4]:
# Pipelines API endpoint; also reused later to build the run-details URL.
endpoint = 'http://ml-pipeline.kubeflow:8888'
# Authenticate against that endpoint with the token read earlier.
kfp_client = kfp.Client(
    existing_token=TOKEN,
    host=endpoint,
)

In [5]:
import torch
import torch.nn as nn
from torchvision import datasets, models, transforms, utils

# Display the version of the imported torch package as this cell's output.
torch.__version__

'1.8.1+cpu'

In [6]:
from kubernetes import client as k8s_client
from functools import partial


# Decorator for turning a Python function into a container op: it is
# func_to_container_op with the base image and extra pip packages pre-filled.
torch_func_to_container_op = partial(
    func_to_container_op,
    packages_to_install=['kubernetes', 'requests'],
    base_image='bitnami/pytorch:1.8.1',
)

In [7]:
@torch_func_to_container_op
def download_dataset(data_path: OutputPath = '/data'):
    """Download the MNIST training split into ``data_path``.

    Args:
        data_path: Directory used as the dataset root.
    """
    # Imported inside the function so the function body is self-contained
    # when it is packaged as a container op.
    from torchvision import datasets, transforms

    to_tensor = transforms.ToTensor()
    # Repeat the single channel three times (grayscale -> RGB).
    to_three_channels = transforms.Lambda(lambda x: x.repeat(3, 1, 1))

    # Constructed only for its download side effect; the dataset object
    # itself is not used afterwards.
    datasets.MNIST(
        root=data_path,
        train=True,
        download=True,
        transform=transforms.Compose([to_tensor, to_three_channels]),
    )


@dsl.pipeline(name='test-mnist-dataset-pipeline')
def test_mnist_dataset_pipeline():
    """Pipeline with a single step that runs ``download_dataset``.

    ``attach_output_volume`` is registered as an op transformer on the
    pipeline configuration.
    """
    download_dataset()
    pipeline_conf = dsl.get_pipeline_conf()
    pipeline_conf.add_op_transformer(attach_output_volume)

In [8]:
# Submit the pipeline function as a run (no pipeline arguments), then print
# the address of the run's details page.
run = kfp_client.create_run_from_pipeline_func(test_mnist_dataset_pipeline, arguments={})
url = f'{endpoint}/#/runs/details/{run.run_id}'
print(url)

http://ml-pipeline.kubeflow:8888/#/runs/details/3574b262-5b6d-43af-a357-3c1de4158023


In [21]:
# Compile the MNIST dataset pipeline function into the package file
# 'test-mnist-dataset.yaml'.
kfp.compiler.Compiler().compile(test_mnist_dataset_pipeline, 'test-mnist-dataset.yaml')

In [14]:
# Same MNIST download as in the pipeline step, run directly in the notebook
# against a local ./data directory.
transform = transforms.Compose(
    [
        # Convert to a Tensor.
        transforms.ToTensor(),
        # Convert grayscale to RGB by repeating the channel three times.
        transforms.Lambda(lambda x: x.repeat(3, 1, 1)),
    ]
)
mnist_train = datasets.MNIST(
    root='./data',        # where the data is stored
    train=True,           # training split
    download=True,        # download automatically
    transform=transform,  # transform function
)