In [None]:
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Vertex AI Pipelines: using Kuberflow pipelines (KFP)

## 설치

필요한 library 설치

In [None]:
! pip3 install --upgrade google-cloud-aiplatform \
                         google-cloud-storage \
                         kfp \
                         google-cloud-pipeline-components

## 환경 설정

Project ID와 Region 설정 

In [None]:
!gcloud config list

In [None]:
PROJECT_ID = "{your project id}" 
REGION = "us-central1"

# Project ID 세팅
! gcloud config set project {PROJECT_ID}


### - Cloud Storage bucket 생성

dataset이나 artifact를 저장하기 위한 bucket을 생성

In [None]:
BUCKET_URI = f"gs://fs-practice-{PROJECT_ID}"  # @param {type:"string"}

In [None]:
! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}

### - Service Account 설정
Service Account가 Bucket에 접근할 수 있도록 설정

In [None]:
shell_output = !gcloud auth list 2>/dev/null
SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

In [None]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

## 주요 Library Import (kuberflow pipeline) 


In [None]:
from typing import NamedTuple

import google.cloud.aiplatform as aip
from kfp import compiler, dsl
from kfp.dsl import component

Vertex AI API Endpoint 설정

In [None]:
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(REGION)

Vertext AI Pipeline root 설정

In [None]:
PIPELINE_ROOT = "{}/pipeline_root/fc_first_pipeline".format(BUCKET_URI)

Initialize AI platform object

In [None]:
aip.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

## Python function-based Pipeline Component 설정 
### Python 함수 기반 파이프라인 컴포넌트 정의
이 튜토리얼에서는 세 단계로 구성된 간단한 파이프라인을 정의하며 각 단계는 컴포넌트로 정의.

#### 1. hello_world 컴포넌트 정의

먼저 매우 간단한 Python 함수를 기반으로 하는 컴포넌트를 정의. 이 함수는 문자열 입력 매개변수를 사용하고 해당 값을 출력으로 반환.

@component 데코레이터의 사용에 주목. 이 데코레이터는 정의한 함수를 KFP(Kuberflow Pipeline) 컴포넌트로 컴파일. 예를 들어, 이 예제에서는 컴포넌트에 사용할 기본 이미지(python:3.9) 및 컴포넌트 YAML 파일인 hw.yaml을 지정. 컴파일된 컴포넌트 사양이 이 파일에 작성.



In [None]:
@component(base_image="python:3.9")
def hello_world(text: str) -> str:
    print(text)
    return text


compiler.Compiler().compile(hello_world, "hw.yaml")

#### 2. two_outputs 컴포넌트 정의

아래의 첫 번째 컴포넌트, two_outputs,은 패키지 설치를 보여줌. 이 경우에는 google-cloud-storage 패키지를 설치. 

*참고*: 실제로 컴포넌트 함수는 이 패키지를 사용하지 않음.

또는 필요한 설치 사항이 포함된 기본 이미지를 지정할 수도 있음. two_outputs 컴포넌트는 두 개의 명명된 출력을 반환합니다.

In [None]:
@component(packages_to_install=["google-cloud-storage"])
def two_outputs(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("output_one", str),  # Return parameters
        ("output_two", str),
    ],
):
    from google.cloud import storage  # noqa: F401

    o1 = f"output one from text: {text}"
    o2 = f"output two from text: {text}"
    print("output one: {}; output_two: {}".format(o1, o2))
    return (o1, o2)

#### 3. three_nicemeet_outputs 컴포넌트 정의

In [None]:
@component(base_image="python:3.9")
def three_nicemeet_outputs(name: str) -> str:
    result_string = "Nice to meet you! "+ name
    print(result_string)
    return result_string


compiler.Compiler().compile(three_nicemeet_outputs, "hw.yaml")

#### 4. Consumer 컴포넌트 정의

네 번째 컴포넌트인 consumer는 세 개의 문자열 입력을 받아서 출력으로 출력.

In [None]:
@component
def consumer(text1: str, text2: str, text3: str, text4: str) -> str:
    result = f"text1-> {text1}, text2-> {text2}, text3-> {text3}, text4-> {text4}"
    print(result)
    return result

#### 5. 컴포넌트를 사용하는 파이프라인 정의
이제 이 네 개의 컴포넌트를 사용하는 파이프라인을 정의.

위의 컴포넌트 정의를 평가함으로써 파이프라인 정의에서 사용되는 작업 팩토리 함수를 생성. 이러한 함수들은 파이프라인 단계를 생성하는 데 사용.

파이프라인은 입력 매개변수를 가져와서 이 매개변수를 첫 번째 세 파이프라인 단계 (hw_task 및 two_outputs_task)에 인수로 전달.
그런 다음 네 번째 파이프라인 단계 (consumer_task)에서는 첫 번째와 두 번째 단계의 출력을 사용. hello_world 컴포넌트 정의는 하나의 무명 출력만 반환하므로 hw_task.output로 참조. two_outputs 작업은 두 개의 명명된 출력을 반환하며, 이를 two_outputs_task.outputs["<output_name>"]로 액세스.

*참고:* @dsl.pipeline 데코레이터에서는 PIPELINE_ROOT 클라우드 스토리지 경로를 정의하고 있음. 여기에 이 정보를 포함하지 않았다면, 파이프라인 실행을 생성할 때 지정해야 했을 것임. 아래에서 볼 수 있듯이 이 정보가 필요.

In [None]:
@dsl.pipeline(
    name="intro-pipeline-unique",
    description="A simple intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(text: str = "hi there", name: str = "fc ryan"):
    hw_task = hello_world(text=text)
    two_outputs_task = two_outputs(text=text)
    three_outputs_task = three_nicemeet_outputs(name=name)
    consumer_task = consumer(
        text1=hw_task.output,
        text2=two_outputs_task.outputs["output_one"],
        text3=two_outputs_task.outputs["output_two"],
        text4=three_outputs_task.output
    )

## Pipeline 실행

In [None]:
compiler.Compiler().compile(pipeline_func=pipeline, package_path="intro_pipeline.yaml")

In [None]:
DISPLAY_NAME = "intro_pipeline_job_unique"

job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="intro_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
)

job.run()

## Pipeline 제거

In [None]:
job.delete()