# Example 1
---
kfp v2 | \
https://github.com/kubeflow/pipelines/tree/dd59f48cdd0f6cd7fac40306277ef5f3dad6e263/samples/v2

In [1]:
# Python 함수를 Component 로 바꿔주는 함수
# decorator 로도 사용할 수 있으며, 여러 옵션을 argument 로 설정할 수 있음
# add_op = create_component_from_func(
#                 func=add,
#                 base_image='python:3.7', # Optional : component 는 k8s pod 로 생성되며, 해당 pod 의 image 를 설정
#                 output_component_file='add.component.yaml', # Optional : component 도 yaml 로 compile 하여 재사용하기 쉽게 관리 가능
#                 packages_to_install=['pandas==0.24'], # Optional : base image 에는 없지만, python code 의 의존성 패키지가 있으면 component 생성 시 추가 가능
#             )
import kfp
from kfp.dsl import component as create_component_from_func
# from kfp.components import create_component_from_func

"""
kfp.components.create_component_from_func :
    Python 함수를 Component 로 바꿔주는 함수
    decorator 로도 사용할 수 있으며, 여러 옵션을 argument 로 설정할 수 있음
    
    add_op = create_component_from_func(
                func=add,
                base_image='python:3.7', # Optional : component 는 k8s pod 로 생성되며, 해당 pod 의 image 를 설정
                output_component_file='add.component.yaml', # Optional : component 도 yaml 로 compile 하여 재사용하기 쉽게 관리 가능
                packages_to_install=['pandas==0.24'], # Optional : base image 에는 없지만, python code 의 의존성 패키지가 있으면 component 생성 시 추가 가능
            )
"""


def add(value_1: int, value_2: int) -> int:
    """
    더하기
    """
    ret = value_1 + value_2
    return ret


def subtract(value_1: int, value_2: int) -> int:
    """
    빼기
    """
    ret = value_1 - value_2
    return ret


def multiply(value_1: int, value_2: int) -> int:
    """
    곱하기
    """
    ret = value_1 * value_2
    return ret


# Python 함수를 선언한 후, kfp.components.create_component_from_func 를 사용하여
# ContainerOp 타입(component)으로 convert
add_op = create_component_from_func(add)
subtract_op = create_component_from_func(subtract)
multiply_op = create_component_from_func(multiply)

from kfp.dsl import pipeline


@pipeline(name="add example")
def my_pipeline(value_1: int, value_2: int):
    task_1 = add_op(value_1=value_1, value_2=value_2)
    task_2 = subtract_op(value_1=value_1, value_2=value_2)

    # component 간의 data 를 넘기고 싶다면,
    # output -> input 으로 연결하면 DAG 상에서 연결됨

    # compile 된 pipeline.yaml 의 dag 파트의 dependency 부분 확인
    # uploaded pipeline 의 그래프 확인
    task_3 = multiply_op(value_1=task_1.output, value_2=task_2.output)

if __name__ == "__main__":
    # kfp dsl compile --py test.py --output pipeline_test.yaml
    kfp.compiler.Compiler().compile(
        my_pipeline,
        "pipeline_test.yaml"
    )

  return component_factory.create_component_from_func(


In [3]:
client = kfp.Client(
    host="http://ml-pipeline.kubeflow.svc.cluster.local:8888",
    ui_host="https://kubeflow.geniouslab.io/pipeline",
    # existing_token=TokenIssuer.get_token(),
    credentials=kfp.client.set_volume_credentials.ServiceAccountTokenVolumeCredentials(path='/var/run/secrets/kubernetes.io/serviceaccount/token'),
)
print(client.list_experiments())
run = client.create_run_from_pipeline_func(my_pipeline, arguments={"value_1": 123, "value_2": 31})
run

{'experiments': [{'created_at': datetime.datetime(2024, 8, 5, 1, 41, 16, tzinfo=tzlocal()),
                  'description': None,
                  'display_name': 'Default',
                  'experiment_id': '1357c4c9-d3d3-47da-9256-dfbfb15895b2',
                  'namespace': 'kubeflow-admin-space',
                  'storage_state': 'AVAILABLE'}],
 'next_page_token': None,
 'total_size': 1}


RunPipelineResult(run_id=70d2d2d0-2ef7-46ad-826a-b2faa4348813)

In [14]:
run.wait_for_run_completion()

{'created_at': datetime.datetime(2024, 8, 2, 6, 0, 19, tzinfo=tzlocal()),
 'description': None,
 'display_name': 'add-example 2024-08-02 06-00-19',
 'error': None,
 'experiment_id': 'cd08cddd-689c-4bb8-819c-d4be013c35a2',
 'finished_at': datetime.datetime(2024, 8, 2, 6, 0, 50, tzinfo=tzlocal()),
 'pipeline_spec': {'components': {'comp-add': {'executorLabel': 'exec-add',
                                               'inputDefinitions': {'parameters': {'value_1': {'parameterType': 'NUMBER_INTEGER'},
                                                                                   'value_2': {'parameterType': 'NUMBER_INTEGER'}}},
                                               'outputDefinitions': {'parameters': {'Output': {'parameterType': 'NUMBER_INTEGER'}}}},
                                  'comp-multiply': {'executorLabel': 'exec-multiply',
                                                    'inputDefinitions': {'parameters': {'value_1': {'parameterType': 'NUMBER_INTEGER'},
       

# Example 2
---
ref. \
https://github.com/kubeflow/pipelines/blob/dd59f48cdd0f6cd7fac40306277ef5f3dad6e263/samples/tutorials/Data%20passing%20in%20python%20components/Data%20passing%20in%20python%20components%20-%20Files.py

In [4]:
import kfp
from kfp.dsl import (
    component as create_component_from_func,
    InputPath,
    OutputPath
)
# from kfp.components import InputPath, OutputPath, create_component_from_func


# decorator 사용
@create_component_from_func
def write_file_op(
    # _path 라는 suffix 를 붙이고, type annotaion 은 OutputPath 로 선언
    data_output_path: OutputPath("Dict")
):
    # package import 문은 함수 내부에 선언
    import json

    # dict data 선언
    data = {
        "a": 300,
        "b": 10,
    }

    # file write to data_output_path
    with open(data_output_path, "w") as f:
        json.dump(data, f)


@create_component_from_func
def read_file_and_multiply_op(
    # input 역시, _path 라는 suffix 를 붙이고, type annotation 은 InputPath 로 선언
    data_input_path: InputPath("Dict")
) -> float:
    # package import 문은 함수 내부에 선언
    import json

    # file read to data_output_path
    print(data_input_path)
    with open(data_input_path, "r") as f:
        data = json.load(f)

    # multiply
    result = data["a"] * data["b"]

    print(f"Result: {result}")

    return result


@kfp.dsl.pipeline(name="Data Passing by File Example")
def data_passing_file_pipeline():
    write_file_task = write_file_op()
    # _ = read_file_and_multiply_op(write_file_task.outputs["data_output"])
    _ = read_file_and_multiply_op(data_input_path=write_file_task.outputs["data_output_path"])


if __name__ == "__main__":
    kfp.compiler.Compiler().compile(
        data_passing_file_pipeline,
        "./data_passing_file_pipeline.yaml"
    )

In [5]:
client = kfp.Client(
    host="http://ml-pipeline.kubeflow.svc.cluster.local:8888",
    ui_host="http://kubeflow.geniouslab.io/pipeline",
    # existing_token=TokenIssuer.get_token(),
    credentials=kfp.client.set_volume_credentials.ServiceAccountTokenVolumeCredentials(path='/var/run/secrets/kubernetes.io/serviceaccount/token'),
)
print(client.list_experiments())
run = client.create_run_from_pipeline_func(data_passing_file_pipeline, arguments={})
run

{'experiments': [{'created_at': datetime.datetime(2024, 8, 5, 1, 41, 16, tzinfo=tzlocal()),
                  'description': None,
                  'display_name': 'Default',
                  'experiment_id': '1357c4c9-d3d3-47da-9256-dfbfb15895b2',
                  'namespace': 'kubeflow-admin-space',
                  'storage_state': 'AVAILABLE'}],
 'next_page_token': None,
 'total_size': 1}


RunPipelineResult(run_id=fa2ec802-91d7-41ff-a8dd-e8ab30b8c2b9)

# Example 3

In [15]:
import kfp
from kfp.dsl import (
    component as create_component_from_func,
    OutputPath
)
# from kfp.components import OutputPath, create_component_from_func


@create_component_from_func
def export_metric_op(
    mlpipeline_metrics_path: OutputPath("Metrics"),
):
    # package import 문은 함수 내부에 선언
    import json

    # 아래와 같이 정해진 형태로, key = "metrics", value = List of dict
    # 단, 각각의 dict 는 "name", "numberValue" 라는 key 를 가지고 있어야 함
    # "name" 의 value 로 적은 string 이 ui 에서 metric 의 name 으로 parsing 됨
    # 예시이므로, 특정 모델에 대한 값을 직접 계산하지 않고 const 로 작성
    metrics = {
        "metrics": [
            # 개수는 따로 제한이 없음. 하나의 metric 만 출력하고 싶다면, 하나의 dict 만 원소로 갖는 list 로 작성
            {
                "name": "auroc",
                "numberValue": 0.8,  # 당연하게도 scala value 를 할당받은 python 변수를 작성
            },
            {
                "name": "f1",
                "numberValue": 0.9,
                "format": "PERCENTAGE",
                # metrics 출력 시 포맷을 지정 가능하며, Default 는 "RAW" 이며 PERCENTAGE 를 사용할 수도 있음
            },
        ],
    }

    # 위의 dict 타입의 변수 metrics 를 mlpipeline_metrics_path 에 json.dump 
    with open(mlpipeline_metrics_path, "w") as f:
        json.dump(metrics, f)


@kfp.dsl.pipeline(name="Export Metrics Example")
def export_metrics_pipeline():
    write_file_task = export_metric_op()


if __name__ == "__main__":
    kfp.compiler.Compiler().compile(
        export_metrics_pipeline,
        "./export_metrics_pipeline.yaml"
    )

In [16]:
client = kfp.Client(
    host="http://ml-pipeline.kubeflow.svc.cluster.local:8888",
    ui_host="http://kubeflow.geniouslab.io/pipeline",
    # existing_token=TokenIssuer.get_token(),
    credentials=kfp.client.set_volume_credentials.ServiceAccountTokenVolumeCredentials(path='/var/run/secrets/kubernetes.io/serviceaccount/token'),
)
print(client.list_experiments())
run = client.create_run_from_pipeline_func(export_metrics_pipeline, arguments={})
run

{'experiments': [{'created_at': datetime.datetime(2024, 8, 2, 5, 48, 53, tzinfo=tzlocal()),
                  'description': None,
                  'display_name': 'Default',
                  'experiment_id': 'cd08cddd-689c-4bb8-819c-d4be013c35a2',
                  'namespace': 'kubeflow-admin-space',
                  'storage_state': 'AVAILABLE'}],
 'next_page_token': None,
 'total_size': 1}


RunPipelineResult(run_id=fb8330a9-2a13-4d63-9743-ff8a5360cd93)