# Hello Pipeline

kubweflow pipeline의 첫번째 예제로 'Hello Pipeline'을 출력하는 pipeline을 만들어보겠습니다.  
pipeline 이라고하면 일련의 작업을 workflow로 작업할 수 있도록해야하지만,   
우선 하나의 작업만 pipeline으로 실행해보겠습니다.  

먼저 pipeline 실행에 필요한 패키지를 import합니다.

In [1]:
import kfp
from kfp import dsl, compiler, components

### component 정의

이제 pipeline으로 실행할 작업 단위로 component를 정의합니다.  
component는 kubeflow pipeline이 pipeline을 진행하는 하나의 작업 단위입니다.  
component를 정의할 때 중요한 것은 하나의 function으로 작업을 완결해야 한다는 것입니다.  
물론 function 내부에 subfunction은 있을 수 있습니다.  

@components decorator는 아래 function을 kubeflow pipeline component로 정의합니다.

In [2]:
@components.create_component_from_func
def hello(message: str) -> str:
    '''print input message'''
    print(message)
    return message

### pipeline 정의

component 정의가 끝났으면, 이제는 pipeline 즉 워크플로우를 정의합니다.  
pipeline은 앞에서 정의한 component를 이용하여   
각 작업에 필요한 component와 component 실행에 필요한 input, output data를 지정하고,  
데이터의 의존성에 따른 작업의 순서를 결정합니다.

component간에 input, output data를 주고받는 방법에 대해서는 다음에 자세히 설명하겠습니다.

@dsl.pipeline 데코레이터를 사용하여 pipeline을 정의합니다.  
pipeline 컴포넌트에 argument를 지정하여 instance를 지정하는 것으로 작업을 정의합니다.
컴포넌트의 인스턴스 하나는 k8s에서 하나의 pod을 생성하여 작업합니다.

여기서는 message를 인자로 받아서 이를 출력하고, 리턴하는 작업을 하게됩니다.

In [3]:
@dsl.pipeline(
    name='hello pipeline',
    description='print message'
)
def hello_pipeline(
    message: str = 'print message'
):
    #Passing pipeline parameter and a constant value as operation arguments
    hello_task = hello(message) #Returns a dsl.ContainerOp class instance. 

### pipeline 실행 - 1

pipeline을 실행하는 방법은 몇 가지가 있습니다.  
첫번째로 kfp sdk의 create_run_from_pipeline_func를 이용하여 바로 실행시키는 방법입니다.  
create_run_from_pipeline_func 함수에 pipeline 함수와, 함수에 전달할 argument를 지정합니다.  

아래의 details 링크는 현재 namu 시스템에서는 사용할 수 없습니다.  
namu의 pipeline 화면을 참고하십시오. 

In [4]:
# Specify pipeline argument values
arguments = {'message': 'hello kubeflow pipeline'}
# Launch a pipeline run given the pipeline function definition
kfp.Client().create_run_from_pipeline_func(hello_pipeline, arguments=arguments)
# The generated links below lead to the Experiment page and the pipeline run details page, respectively

KeyboardInterrupt: 

NAMU Pipeline 메뉴에 실행한 pipeline이 표시됩니다.  

<img src="../images/pipeline-01.png" width="800px">


보기 버튼을 누르면 pipeline의 그래프를 표시합니다.  
그래프의 각 노드를 선택하면, 노드의 Inpit, Output 데이터와 Visualization, Log 등을 확인할 수 있습니다.

<img src="../images/pipeline-02.png" width="800px">


### pipeline 실행 - 2

두번째로는 pipeline 정의를 컴파일하여 .yaml 파일을 만들고,  
.yaml 파일을 실행시키는 방법입니다.  

pipeline를 컴파일하기 위해서 kfp sdk의 compile 함수를 호출합니다.  
create_experiment 함수를 호출합니다.
NAMU에서는 개인 experiment는 "Default"를 사용하기로 약속하였습니다.  
마지막으로 .yaml 파일을 run_pipeline의 argument로 지정하면 pipeline이 실행됩니다.  
  
역시 아래의 details 링크는 현재 namu 시스템에서는 사용할 수 없습니다.
namu의 pipeline 화면을 참고하십시오. 

In [5]:
# Compile the pipeline
pipeline_func = hello_pipeline
pipeline_filename = pipeline_func.__name__ + '.yaml'
compiler.Compiler().compile(pipeline_func, pipeline_filename)

In [6]:
# Get or create an experiment
client = kfp.Client()
experiment = client.create_experiment("Default")

ApiException: (500)
Reason: Internal Server Error
HTTP response headers: HTTPHeaderDict({'content-type': 'application/json', 'date': 'Fri, 18 Nov 2022 02:30:34 GMT', 'x-envoy-upstream-service-time': '4', 'server': 'envoy', 'transfer-encoding': 'chunked'})
HTTP response body: {"error":"Internal error: Unauthenticated: Request header error: there is no user identity header.: Request header error: there is no user identity header.\nFailed to authorize with API resource references\ngithub.com/kubeflow/pipelines/backend/src/common/util.Wrap\n\t/go/src/github.com/kubeflow/pipelines/backend/src/common/util/error.go:287\ngithub.com/kubeflow/pipelines/backend/src/apiserver/server.(*ExperimentServer).canAccessExperiment\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/server/experiment_server.go:249\ngithub.com/kubeflow/pipelines/backend/src/apiserver/server.(*ExperimentServer).ListExperiment\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/server/experiment_server.go:148\ngithub.com/kubeflow/pipelines/backend/api/go_client._ExperimentService_ListExperiment_Handler.func1\n\t/go/src/github.com/kubeflow/pipelines/backend/api/go_client/experiment.pb.go:1089\nmain.apiServerInterceptor\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/interceptor.go:30\ngithub.com/kubeflow/pipelines/backend/api/go_client._ExperimentService_ListExperiment_Handler\n\t/go/src/github.com/kubeflow/pipelines/backend/api/go_client/experiment.pb.go:1091\ngoogle.golang.org/grpc.(*Server).processUnaryRPC\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1282\ngoogle.golang.org/grpc.(*Server).handleStream\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1616\ngoogle.golang.org/grpc.(*Server).serveStreams.func1.2\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:921\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1581\nFailed to authorize with API resource references\ngithub.com/kubeflow/pipelines/backend/src/common/util.Wrap\n\t/go/src/github.com/kubeflow/pipelines/backend/src/common/util/error.go:287\ngithub.com/kubeflow/pipelines/backend/src/apiserver/server.(*ExperimentServer).ListExperiment\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/server/experiment_server.go:150\ngithub.com/kubeflow/pipelines/backend/api/go_client._ExperimentService_ListExperiment_Handler.func1\n\t/go/src/github.com/kubeflow/pipelines/backend/api/go_client/experiment.pb.go:1089\nmain.apiServerInterceptor\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/interceptor.go:30\ngithub.com/kubeflow/pipelines/backend/api/go_client._ExperimentService_ListExperiment_Handler\n\t/go/src/github.com/kubeflow/pipelines/backend/api/go_client/experiment.pb.go:1091\ngoogle.golang.org/grpc.(*Server).processUnaryRPC\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1282\ngoogle.golang.org/grpc.(*Server).handleStream\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1616\ngoogle.golang.org/grpc.(*Server).serveStreams.func1.2\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:921\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1581","code":13,"message":"Internal error: Unauthenticated: Request header error: there is no user identity header.: Request header error: there is no user identity header.\nFailed to authorize with API resource references\ngithub.com/kubeflow/pipelines/backend/src/common/util.Wrap\n\t/go/src/github.com/kubeflow/pipelines/backend/src/common/util/error.go:287\ngithub.com/kubeflow/pipelines/backend/src/apiserver/server.(*ExperimentServer).canAccessExperiment\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/server/experiment_server.go:249\ngithub.com/kubeflow/pipelines/backend/src/apiserver/server.(*ExperimentServer).ListExperiment\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/server/experiment_server.go:148\ngithub.com/kubeflow/pipelines/backend/api/go_client._ExperimentService_ListExperiment_Handler.func1\n\t/go/src/github.com/kubeflow/pipelines/backend/api/go_client/experiment.pb.go:1089\nmain.apiServerInterceptor\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/interceptor.go:30\ngithub.com/kubeflow/pipelines/backend/api/go_client._ExperimentService_ListExperiment_Handler\n\t/go/src/github.com/kubeflow/pipelines/backend/api/go_client/experiment.pb.go:1091\ngoogle.golang.org/grpc.(*Server).processUnaryRPC\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1282\ngoogle.golang.org/grpc.(*Server).handleStream\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1616\ngoogle.golang.org/grpc.(*Server).serveStreams.func1.2\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:921\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1581\nFailed to authorize with API resource references\ngithub.com/kubeflow/pipelines/backend/src/common/util.Wrap\n\t/go/src/github.com/kubeflow/pipelines/backend/src/common/util/error.go:287\ngithub.com/kubeflow/pipelines/backend/src/apiserver/server.(*ExperimentServer).ListExperiment\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/server/experiment_server.go:150\ngithub.com/kubeflow/pipelines/backend/api/go_client._ExperimentService_ListExperiment_Handler.func1\n\t/go/src/github.com/kubeflow/pipelines/backend/api/go_client/experiment.pb.go:1089\nmain.apiServerInterceptor\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/interceptor.go:30\ngithub.com/kubeflow/pipelines/backend/api/go_client._ExperimentService_ListExperiment_Handler\n\t/go/src/github.com/kubeflow/pipelines/backend/api/go_client/experiment.pb.go:1091\ngoogle.golang.org/grpc.(*Server).processUnaryRPC\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1282\ngoogle.golang.org/grpc.(*Server).handleStream\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1616\ngoogle.golang.org/grpc.(*Server).serveStreams.func1.2\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:921\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1581","details":[{"@type":"type.googleapis.com/api.Error","error_message":"Internal error: Unauthenticated: Request header error: there is no user identity header.: Request header error: there is no user identity header.\nFailed to authorize with API resource references\ngithub.com/kubeflow/pipelines/backend/src/common/util.Wrap\n\t/go/src/github.com/kubeflow/pipelines/backend/src/common/util/error.go:287\ngithub.com/kubeflow/pipelines/backend/src/apiserver/server.(*ExperimentServer).canAccessExperiment\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/server/experiment_server.go:249\ngithub.com/kubeflow/pipelines/backend/src/apiserver/server.(*ExperimentServer).ListExperiment\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/server/experiment_server.go:148\ngithub.com/kubeflow/pipelines/backend/api/go_client._ExperimentService_ListExperiment_Handler.func1\n\t/go/src/github.com/kubeflow/pipelines/backend/api/go_client/experiment.pb.go:1089\nmain.apiServerInterceptor\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/interceptor.go:30\ngithub.com/kubeflow/pipelines/backend/api/go_client._ExperimentService_ListExperiment_Handler\n\t/go/src/github.com/kubeflow/pipelines/backend/api/go_client/experiment.pb.go:1091\ngoogle.golang.org/grpc.(*Server).processUnaryRPC\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1282\ngoogle.golang.org/grpc.(*Server).handleStream\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1616\ngoogle.golang.org/grpc.(*Server).serveStreams.func1.2\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:921\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1581\nFailed to authorize with API resource references\ngithub.com/kubeflow/pipelines/backend/src/common/util.Wrap\n\t/go/src/github.com/kubeflow/pipelines/backend/src/common/util/error.go:287\ngithub.com/kubeflow/pipelines/backend/src/apiserver/server.(*ExperimentServer).ListExperiment\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/server/experiment_server.go:150\ngithub.com/kubeflow/pipelines/backend/api/go_client._ExperimentService_ListExperiment_Handler.func1\n\t/go/src/github.com/kubeflow/pipelines/backend/api/go_client/experiment.pb.go:1089\nmain.apiServerInterceptor\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/interceptor.go:30\ngithub.com/kubeflow/pipelines/backend/api/go_client._ExperimentService_ListExperiment_Handler\n\t/go/src/github.com/kubeflow/pipelines/backend/api/go_client/experiment.pb.go:1091\ngoogle.golang.org/grpc.(*Server).processUnaryRPC\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1282\ngoogle.golang.org/grpc.(*Server).handleStream\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1616\ngoogle.golang.org/grpc.(*Server).serveStreams.func1.2\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:921\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1581","error_details":"Internal error: Unauthenticated: Request header error: there is no user identity header.: Request header error: there is no user identity header.\nFailed to authorize with API resource references\ngithub.com/kubeflow/pipelines/backend/src/common/util.Wrap\n\t/go/src/github.com/kubeflow/pipelines/backend/src/common/util/error.go:287\ngithub.com/kubeflow/pipelines/backend/src/apiserver/server.(*ExperimentServer).canAccessExperiment\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/server/experiment_server.go:249\ngithub.com/kubeflow/pipelines/backend/src/apiserver/server.(*ExperimentServer).ListExperiment\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/server/experiment_server.go:148\ngithub.com/kubeflow/pipelines/backend/api/go_client._ExperimentService_ListExperiment_Handler.func1\n\t/go/src/github.com/kubeflow/pipelines/backend/api/go_client/experiment.pb.go:1089\nmain.apiServerInterceptor\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/interceptor.go:30\ngithub.com/kubeflow/pipelines/backend/api/go_client._ExperimentService_ListExperiment_Handler\n\t/go/src/github.com/kubeflow/pipelines/backend/api/go_client/experiment.pb.go:1091\ngoogle.golang.org/grpc.(*Server).processUnaryRPC\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1282\ngoogle.golang.org/grpc.(*Server).handleStream\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1616\ngoogle.golang.org/grpc.(*Server).serveStreams.func1.2\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:921\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1581\nFailed to authorize with API resource references\ngithub.com/kubeflow/pipelines/backend/src/common/util.Wrap\n\t/go/src/github.com/kubeflow/pipelines/backend/src/common/util/error.go:287\ngithub.com/kubeflow/pipelines/backend/src/apiserver/server.(*ExperimentServer).ListExperiment\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/server/experiment_server.go:150\ngithub.com/kubeflow/pipelines/backend/api/go_client._ExperimentService_ListExperiment_Handler.func1\n\t/go/src/github.com/kubeflow/pipelines/backend/api/go_client/experiment.pb.go:1089\nmain.apiServerInterceptor\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/interceptor.go:30\ngithub.com/kubeflow/pipelines/backend/api/go_client._ExperimentService_ListExperiment_Handler\n\t/go/src/github.com/kubeflow/pipelines/backend/api/go_client/experiment.pb.go:1091\ngoogle.golang.org/grpc.(*Server).processUnaryRPC\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1282\ngoogle.golang.org/grpc.(*Server).handleStream\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:1616\ngoogle.golang.org/grpc.(*Server).serveStreams.func1.2\n\t/go/pkg/mod/google.golang.org/grpc@v1.44.0/server.go:921\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1581"}]}


In [7]:
# Specify pipeline argument values
arguments = {'message': 'hello kubeflow pipeline'}

# Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)

# The generated link below leads to the pipeline run information page.

NameError: name 'experiment' is not defined