# 파이프라인 (데이터 전달)

파이프라인 구성에 있어서 데이터 전달은 필수적인 부분입니다.

kubeflow pipeline에서 파이프라인 작성자는 직접 생성한 인스턴스(task)를 연결하여 파이프라인을 구성합니다. 
이때 인스턴스의 출력을 다른 인스턴스의 입력에 인수로 전달하여 인스턴스 간의 연결을 구성합니다.
데이터의 크기 및 종류에 따라 시스템은 데이터를 저장하고 파이프라인 흐름에 따라 인스턴스에 전달합니다.

기본적으로 작은 크기의 데이터 전달에 대한 구성을 살펴본 후 학습 데이터 같은 큰 데이터 전달에 대한 예제를 확인합니다.

In [None]:
from typing import NamedTuple

import kfp
from kfp import components, dsl
from kfp.components import InputPath, OutputPath
from kfp.components import func_to_container_op

## 간단한 데이터 전달

파이프라인 구성 시 간단한 인수를 전달하여 구성할 수 있습니다.
이때 간단한 인수는 몇 킬로바이트를 초과하지 않은 작은 데이터입니다.list, dict, JSON 형태도 적은 양의 데이터라면 간단한 인수 전달로 사용할 수 있습니다. 데이터 인수는 전달할 때 string 형태로 변환합니다.

내장 변환기가 있어 일반적으로 str, int, float, bool, list, dict 형태를 지원하며 다른 형태의 데이터는 직접 변환해야 합니다.



### 출력 예제

In [3]:
@func_to_container_op
def print_small_text(text: str):
    '''Print small text'''
    print(text)
    
@dsl.pipeline(name='hello world example')
def test_example_pipeline():
    '''Pipeline that passes small constant string to to consumer'''
    print_task = print_small_text('Hello world') # Passing constant as argument to consumer

간단히 인수를 받으면 출력하는 컴포넌트와 파이프라인을 구성합니다.

In [5]:
kfp.compiler.Compiler().compile(test_example_pipeline, 'hello_world.yaml')

구성 후 yaml 및 tar.gz 형태로 파이프라인 구성 파일을 생성합니다.
생성된 파일은 kubeflow UI에서 파이프라인 생성을 통해 업로드 합니다.

### 인수 전달

인수 전달 시 데이터 형태가 정의되어야 하며 여러 인자를 입력 및 출력할 수 있습니다.

In [None]:
@func_to_container_op
def one_small_output_str() -> str:
    return 'one_output'

@func_to_container_op
def one_small_output_int() -> int:
    return 111

@func_to_container_op
def two_small_outputs() -> NamedTuple('Outputs', [('text', str), ('number', int)]):
    return ("two_output", 222)

출력 데이터 형태를 정의하여 컴포넌트 반환값을 입력합니다. 기존 출력 컴포넌트는 str 형태만을 출력하기 때문에 int 형태의 반환 값을 확인하기 어렵습니다. 아래와 같이 int 형태 출력 컴포넌트도 추가해줍니다.

In [None]:
@func_to_container_op
def print_small_int(text: int):
    '''Print small text'''
    print(text)

In [None]:
@dsl.pipeline(name='data passing')
def data_passing_pipeline():
    '''Pipeline that passes small constant string to to consumer'''
    single_small_output1_task = one_small_output_str()
    single_small_output2_task = one_small_output_int()

    tuple_small_output_task = two_small_outputs()

    print_one1_task = print_small_text(single_small_output1_task.output)
    print_one2_task = print_small_int(single_small_output2_task.output)

    print_two1_task = print_two_arguments(tuple_small_output_task.outputs['text'], tuple_small_output_task.outputs['number'])
    print_two1_task = print_two_arguments(single_small_output1_task.output, single_small_output2_task.output)

컴포넌트를 모두 정의하였다면 파이프라인을 구성합니다. 컴포넌트의 반환값은 직접 작성해야 합니다.

In [None]:
kfp.compiler.Compiler().compile(data_passing_pipeline, 'data_passing.yaml')

## 파일 전달

kubeflow를 통해 머신 러닝을 사용하기 위해서는 큰 용량의 데이터 전달은 필수적입니다.
파이프라인을 구성할 때 큰 데이터는 파일로 저장하고 불러옵니다. 이때 InputPath와 OutputPath를 활용합니다.
파일 경로를 전달하여 대용량 데이터를 저장하고 읽어옵니다.

InputPath 및 OutputPath는 type 인수를 지정하여 데이터의 유형을 지정할 수 있습니다. 본 예제에서는 간단한 Fashion-MNIST 데이터셋을 활용하기 때문에 Dataset을 활용합니다.

### Fashion-MNIST 데이터 불러오기

In [None]:
def train_data_load(
        output_dataset_train_data: OutputPath('Dataset')
):
    import tensorflow as tf
    import pandas as pd
    import pickle

    fashion_mnist = tf.keras.datasets.fashion_mnist
    (train_images, train_labels), (_, _) = fashion_mnist.load_data()

    df = pd.DataFrame(columns=['image', 'label'])
    for i, image in enumerate(train_images):
        df.loc[i] = ({'image': image, 'label': train_labels[i]})

    with open(output_dataset_train_data, 'wb') as f:
        pickle.dump(df, f, pickle.HIGHEST_PROTOCOL)

학습 데이터를 불러오는 컴포넌트를 생성하기 위한 함수를 작성합니다. 컴포넌트는 컨테이너로 존재하기 때문에 필요한 패키지는 함수 안에 작성합니다.
pandas를 활용하여 dataframe 형태로 데이터를 저장합니다.

In [None]:
train_data_load_op = components.create_component_from_func(
    train_data_load, base_image='tensorflow/tensorflow',
    packages_to_install=['pandas==1.4.2']
)

컴포넌트를 생성할 때 직접 작성했던 패키지 목록과 기본이 되는 이미지를 작성합니다. 본 예제에서는 tensorflow를 활용하기 때문에 tensorflow 이미지를 불러옵니다.

### 데이터 전처리

In [None]:
def preprocess(
        pre_data:InputPath('Dataset'),
        data: OutputPath('Dataset')
):
    import numpy as np
    import pickle
    import pandas as pd

    images = []
    labels = []
    with open(pre_data, 'rb') as file:
        tr_data = pickle.load(file)

    for i, item in enumerate(tr_data['image']):
        images.append(item)
        labels.append(tr_data['label'][i])
    images = np.array(images)
    labels = np.array(labels)

    images = images/255.0

    df = pd.DataFrame(columns=['image', 'label'])
    for i, image in enumerate(images):
        df.loc[i] = ({'image': image, 'label': labels[i]})

    with open(data, 'wb') as f:
        pickle.dump(df, f, pickle.HIGHEST_PROTOCOL)
        
preprocess_op = components.create_component_from_func(
    preprocess, base_image='python:3.9',
    packages_to_install=['numpy==1.23.2', 'pandas==1.4.2']
)

이미지 데이터를 normalization 하는 컴포넌트를 생성합니다. 실제 파일 입출력처럼 경로에 있는 파일을 읽어와 데이터를 처리하고 파일로 다시 저장하는 코드를 직접 작성합니다.

In [None]:
@dsl.pipeline(name='data load example')
def fashion_mnist_pipeline():
    train_data_load_task = train_data_load_op()
    preprocess_task = preprocess_op(
        train_data_load_task.outputs['output_dataset_train_data']
    )
    
kfp.compiler.Compiler().compile(fashion_mnist_pipeline, 'fashion_mnist_data_load.yaml')

인스턴스를 생성하여 인수 전달과 같이 파이프라인을 구성한 후 실행합니다.