# Vertex AI Pipelines 데모 1
Vertex AI Pipelines 동작에 익숙해지기 위해 Kubeflow Pipelines(KFP) SDK를 사용하여 간단한 파이프라인을 만듭니다. 이 파이프라인 ML과 관련된 작업을 수행하지 않습니다.
- KFP SDK에서 사용자 지정 구성 요소를 만드는 방법
- Vertex AI Pipelines에서 파이프라인을 실행하고 모니터링 하는 방법

`product-name`과 `emoji` 설명이라는 두 개의 컴포넌트의 설명을 출력하는 파이프라인을 생성합니다. 이 파이프라인 세 가지 컴포넌트로 구성됩니다.
- `product_nanme` : 이 컴포넌트는 제품 이름을 입력으로 사용하고 해당 문자열을 출력으로 반환합니다.
- `emoji`: 이 컴포넌트는 이모티콘의 텍스트 설명을 가져와 이모티콘으로 변환합니다. 예를 들어 ✨의 텍스트 코드는 `sparkles`입니다. 이 구성 요소는 이모티콘 라이브러리를 사용하여 파이프라인에서 외부 종속성을 관리하는 방법을 보여줍니다.
- `build_sentence`: 이 마지막 구성 요소는 이전 두 개의 출력을 사용하여 이모티콘을 사용하는 문장을 만듭니다. 예를 들어 결과 출력은 `Vertex Pipelines is ✨`일 수 있습니다.

## 1. 라이브러리 준비

In [1]:
USER_FLAG = "--user"

In [2]:
!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0



### 커널 재시작

In [None]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

### Kubeflow Pipeline 버전 확인
KFP SDK 버전은 >= 1.8이어야 함  
확인을 해보자

In [2]:
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 1.8.9
google_cloud_pipeline_components version: 0.2.0


## 2. 프로젝트ID 및 GCS 버킷 설정

In [1]:
# 프로젝트ID 입력
PROJECT_ID = "seventh-jet-360700"

In [2]:
# GCS 버킷 입력
BUCKET_NAME="gs://cokebox-ml"

## 3. 라이브러리 가져오기

In [3]:
import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

## 4. KFP 변수 선언하기 

파이프라인을 구축하기 전에 마지막으로 해야 할 일은 몇 가지 상수 변수를 정의하는 것입니다. `PIPELINE_ROOT`파이프라인에서 생성된 아티팩트가 작성될 Cloud Storage 경로입니다. 여기서는 리전으로 `us-central1`를 사용하고 있지만 버킷을 생성할 때 다른 리전을 사용한 경우 아래 코드에서 `REGION` 변수를 업데이트 하십시오.

In [5]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

env: PATH=/opt/conda/bin:/opt/conda/condabin:/opt/conda/bin:/usr/local/nvidia/bin:/usr/local/cuda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/home/jupyter/.local/bin:/home/jupyter/.local/bin


'gs://cokebox-ml/pipeline_root/'

## 5. 첫 파이프라인 만들기 

### 5.1 Python 베이스 컴포넌트 **`product_name`** 만들기

KFP SDK를 사용하여 Python 기능을 기반으로 구성 요소를 만들 수 있습니다. 첫 번째 파이프라인의 3가지 구성 요소에 이를 사용합니다. `product_name` 먼저 단순히 문자열을 입력으로 받아 해당 문자열을 반환하는 컴포넌트를 빌드 합니다. 노트북에 다음을 추가합니다.

In [6]:
@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

- 데코레이터는 `@component`파이프라인이 실행될 때 이 함수를 컴포넌트로 컴파일합니다. 사용자 정의 컴포넌트를 작성할 때마다 이것을 사용합니다.
- 매개변수는 `base_image`이 구성요소가 사용할 컨테이너 이미지를 지정합니다.
- 매개변수는 선택 사항 `output_component_file`이며 컴파일된 구성 요소를 쓸 yaml 파일을 지정합니다. 셀을 실행한 후 노트북 인스턴스에 작성된 해당 파일을 볼 수 있습니다. 이 구성 요소를 다른 사람과 공유하려면 생성된 yaml 파일을 보내고 다음과 함께 로드하도록 할 수 있습니다.

In [8]:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')

`-> str` 함수 정의 이후는 이 컴포넌트의 출력 유형을 지정

### 5.2 두 개의 추가 컴포넌트 

파이프라인을 완료하기 위해 두 개의 구성 요소를 더 만듭니다. 우리가 정의할 첫 번째 것은 문자열을 입력으로 사용하고 이 문자열이 있는 경우 해당 이모티콘으로 변환합니다. 전달된 입력 텍스트와 결과 이모티콘이 있는 튜플을 반환합니다.

In [9]:
@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

이 구성 요소는 이전 구성 요소보다 약간 더 복잡합니다. 새로운 기능을 분석해 보겠습니다.

- 매개변수는 이 `packages_to_install` 컨테이너에 대한 외부 라이브러리 종속성을 구성 요소에 알려줍니다. 이 경우 [emoji](https://pypi.org/project/emoji/) 라는 라이브러리를 사용하고 있습니다 .
- 이 컴포넌트는 `NamedTuple` 호출 된 `Outputs`를 반환합니다. 이 튜플의 각 문자열에는 키 `emoji_text` 및 `emoji`가 있습니다. 다음 구성 요소에서 이를 사용하여 출력에 액세스합니다.

이 파이프라인의 마지막 구성 요소는 처음 두 개의 출력을 사용하고 이를 결합하여 문자열을 반환합니다.

In [10]:
@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

이 구성 요소는 이전에 정의한 단계의 출력을 사용하는 방법을 알고 있습니까? 좋은 질문! 다음 단계에서 모두 함께 묶을 것입니다.

### 5.3 파이프라인에 컴포넌트 담기 

위에서 정의한 구성 요소 정의는 파이프라인 정의에서 단계를 생성하는 데 사용할 수 있는 팩토리 함수를 생성했습니다. 파이프라인을 설정하려면 `@pipeline` 데코레이터를 사용하고 파이프라인에 이름과 설명을 지정하고 파이프라인의 아티팩트가 작성되어야 하는 루트 경로를 제공합니다. 아티팩트란 파이프라인에서 생성된 모든 출력 파일을 의미합니다. 이 인트로 파이프라인은 생성하지 않지만 다음 파이프라인은 생성합니다.

다음 코드 블록에서는 `intro_pipeline` 함수를 정의합니다. 여기에서 초기 파이프라인 단계에 대한 입력과 단계가 서로 연결되는 방식을 지정합니다.

- `product_task` 제품 이름을 입력으로 사용합니다. 여기에서 `Vertex AI Pipelines`를 전달하지만 원하는 대로 변경할 수 있습니다.
- `emoji_task` 이모티콘의 텍스트 코드를 입력으로 사용합니다. 원하는 대로 변경할 수도 있습니다. 예를 들어 `party_face`는 🥳 이모티콘을 나타냅니다. 이 구성 요소와 `product_task` 구성 요소에는 입력을 제공하는 단계가 없으므로 파이프라인을 정의할 때 수동으로 입력을 지정합니다.
- 파이프라인의 마지막 단계 `consumer_task`에는 세 가지 입력 매개변수가 있습니다.
  - `product_task`의 출력. 이 단계는 하나의 출력만 생성하므로 `product_task.output`를 통해 참조할 수 있습니다.
  - 우리 `emoji_task`단계 `emoji`의 출력. 출력 매개변수의 이름을 지정한 위에서 정의된 `emoji` 컴포넌트를 참조하십시오 .
  - 마찬가지로 `emoji` 컴포넌트의 `emoji_text` 정의된 출력입니다. 파이프라인이 이모티콘과 일치하지 않는 텍스트를 전달하는 경우 이 텍스트를 사용하여 문장을 구성합니다.

In [11]:
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

### 5.4 파이프라인 컴파일 및 실행 

파이프라인이 정의되면 컴파일할 준비가 된 것입니다. 다음은 파이프라인을 실행하는 데 사용할 JSON 파일을 생성합니다.

In [12]:
compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)



다음으로 `TIMESTAMP` 변수를 생성합니다. 우리는 이것을 작업 ID에서 사용할 것입니다

In [13]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

그 다음 파이프라인 `job`을 정의합니다.

In [14]:
job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

마지막으로 `submit`을 실행하여 새 파이프라인 실행을 생성합니다.

In [15]:
job.submit()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/16539566961/locations/us-central1/pipelineJobs/hello-world-pipeline-20220922082602
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/16539566961/locations/us-central1/pipelineJobs/hello-world-pipeline-20220922082602')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/hello-world-pipeline-20220922082602?project=16539566961


## 6. End-to-End ML 파이프라인 생성 

이제 첫 번째 ML 파이프라인을 구축할 때입니다. 이 파이프라인에서 우리는 UCI 머신러닝 [드라이 빈 데이터세트](https://archive.ics.uci.edu/ml/datasets/Dry+Bean+Dataset)를 사용할 것입니다.
In Computers and Electronics in Agriculture, 174, 105507. DOI.

이것은 테이블 형식의 데이터 세트이며 파이프라인에서 이 데이터 세트를 사용하여 특성에 따라 빈을 7가지 유형 중 하나로 분류하는 AutoML 모델을 훈련, 평가 및 배포합니다.

이 파이프라인은 다음을 수행합니다.

- Vertex AI에서 데이터 세트 생성
- AutoML 을 사용하여 표 형식 분류 모델 학습
- 이 모델에 대한 평가 측정항목 가져오기
- 평가 메트릭을 기반으로 Vertex Pipelines에서 조건부 논리를 사용하여 모델을 배포할지 여부를 결정합니다.
- 정점 예측 을 사용하여 엔드포인트에 모델 배포

설명된 각 단계는 구성 요소가 됩니다. [`google_cloud_pipeline_components`](https://github.com/kubeflow/pipelines/tree/master/components/google-cloud) 대부분의 파이프라인 단계는 이 코드랩의 앞부분에서 가져온 라이브러리 를 통해 Vertex AI 서비스에 대해 사전 빌드된 구성 요소를 사용합니다 . 이 섹션에서는 먼저 하나의 사용자 지정 구성 요소를 정의합니다. 그런 다음 미리 빌드된 구성 요소를 사용하여 나머지 파이프라인 단계를 정의합니다. 사전 구축된 구성 요소를 사용하면 모델 교육 및 배포와 같은 Vertex AI 서비스에 더 쉽게 액세스할 수 있습니다.


> 이 단계의 대부분은 이 파이프라인의 AutoML 학습 부분에 사용되며 약 1시간이 소요됩니다.

### 6.1 모델 평가를 위한 사용자 지정 컴포넌트

우리가 정의할 사용자 지정 구성 요소는 모델 교육이 완료되면 파이프라인이 끝날 때 사용됩니다. 이 구성 요소는 몇 가지 작업을 수행합니다.

- 학습된 AutoML 분류 모델에서 평가 측정항목 가져오기
- 메트릭을 구문 분석하고 Vertex Pipelines UI에서 렌더링
- 메트릭을 임계값과 비교하여 모델을 배포해야 하는지 여부를 결정합니다.

구성요소를 정의하기 전에 입력 및 출력 매개변수를 이해합시다. 입력으로 이 파이프라인은 Cloud 프로젝트에 대한 일부 메타데이터, 학습된 결과 모델(이 구성요소는 나중에 정의함), 모델의 평가 측정항목 및 `thresholds_dict_str`. `thresholds_dict_str`파이프라인을 실행할 때 정의할 항목입니다 . 이 분류 모델의 경우 이것은 모델을 배포해야 하는 ROC 곡선 값 아래의 영역이 됩니다. 예를 들어 0.95를 전달하면 이 지표가 95%를 초과하는 경우에만 파이프라인이 모델을 배포하기를 원한다는 의미입니다.

평가 구성 요소는 모델 배포 여부를 나타내는 문자열을 반환합니다. 노트북 셀에 다음을 추가하여 이 사용자 지정 구성 요소를 만듭니다.



In [16]:
@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

### 6.2 Google Cloud Pre-built 컴포넌트 추가 
이 단계에서는 나머지 파이프라인 구성 요소를 정의하고 모든 구성 요소가 서로 어떻게 맞는지 확인합니다. 먼저 타임스탬프를 사용하여 파이프라인 실행의 표시 이름을 정의합니다.

In [18]:
import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

automl-beans1663836412


In [19]:
@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

이 코드에서 무슨 일이 일어나는지 봅시다:

- 먼저 이전 파이프라인과 마찬가지로 이 파이프라인이 사용하는 입력 매개변수를 정의합니다. 파이프라인의 다른 단계의 출력에 의존하지 않으므로 수동으로 설정해야 합니다.
- 파이프라인의 나머지 부분은 Vertex AI 서비스와 상호 작용하기 위해 [미리 빌드된 몇 가지 컴포넌트](https://cloud.google.com/vertex-ai/docs/gcpc-list)를 사용합니다.
  - `TabularDatasetCreateOpCloud`: Storage 또는 BigQuery의 데이터 세트 소스가 제공된 Vertex AI에 테이블 형식 데이터 세트를 생성합니다. 이 파이프라인에서는 BigQuery 테이블 URL을 통해 데이터를 전달합니다.
  - `AutoMLTabularTrainingJobRunOp`: 테이블 형식 데이터 세트에 대한 AutoML 학습 작업을 시작합니다. 모델 유형(이 경우 분류), 열의 일부 데이터, 훈련을 실행할 기간 및 데이터 세트에 대한 포인터를 포함하여 이 구성 요소에 몇 가지 구성 매개변수를 전달합니다. 이 구성 요소에 데이터 세트를 전달하기 위해 다음을 통해 이전 구성 요소 의 출력을 제공합니다. `dataset_create_op.outputs["dataset"]`
  - `EndpointCreateOpVertex`: AI Endpoint을 만듭니다. 이 단계에서 생성된 끝점은 다음 구성 요소에 대한 입력으로 전달됩니다.
  - `ModelDeployOpVertex`: AI Endpoint에 주어진 모델을 배포합니다. 이 경우 이전 단계에서 생성된 엔드포인트를 사용합니다. 사용 가능한 추가 구성 옵션이 있지만 여기에서는 배포하려는 엔드포인트 머신 유형 및 모델을 제공합니다. 파이프라인에서 학습 단계의 출력에 액세스하여 모델을 전달합니다.
  
- 이 파이프라인은 또한 조건을 정의할 수 있는 Vertex Pipelines의 기능인 `조건 논리`와 해당 조건의 결과를 기반으로 하는 다양한 분기를 사용합니다. 파이프라인을 정의할 때 `thresholds_dict_str` 매개변수를 전달했음을 기억하십시오. 이것은 모델을 엔드포인트에 배포할지 여부를 결정하는 데 사용하는 정확도 임계값입니다. 이를 구현하기 위해 KFP SDK의 `Condition` 클래스를 사용합니다. 우리가 전달하는 조건은 이 코드랩의 앞부분에서 정의한 사용자 정의 평가 구성 요소의 출력입니다. 이 조건이 true이면 파이프라인은 계속해서 KFP SDK의 `deploy_op` 컴포넌트를 실행합니다. 정확도가 사전 정의된 임계값을 충족하지 않으면 파이프라인이 여기서 중지되고 모델을 배포하지 않습니다.

### 6.3 End-to-End ML 파이프라인 컴파일 및 실행
전체 파이프라인이 정의되면 컴파일할 시간입니다.

In [20]:
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)



다음으로 `job`을 정의합니다.

In [21]:
ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

마지막으로 job을 실행합니다.

In [22]:
ml_pipeline_job.submit()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/16539566961/locations/us-central1/pipelineJobs/automl-tab-beans-training-v2-20220922085105
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/16539566961/locations/us-central1/pipelineJobs/automl-tab-beans-training-v2-20220922085105')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/automl-tab-beans-training-v2-20220922085105?project=16539566961


콘솔에서 파이프라인을 보려면 위의 셀을 실행한 후 로그에 표시된 링크로 이동하십시오. 이 파이프라인을 실행하는 데 1시간이 조금 넘게 걸립니다. 대부분의 시간은 AutoML 학습 단계에서 소비됩니다. 완성된 파이프라인은 다음과 같습니다.

![](https://codelabs.developers.google.com/static/vertex-pipelines-intro/img/e2epipeline.png)

상단의 "아티팩트 확장" 버튼을 토글하면 파이프라인에서 생성된 다양한 아티팩트에 대한 세부 정보를 볼 수 있습니다. 예를 들어 dataset아티팩트를 클릭하면 생성된 Vertex AI 데이터 세트에 대한 세부 정보가 표시됩니다. 여기에서 링크를 클릭하면 해당 데이터세트에 대한 페이지로 이동할 수 있습니다.

### 6.4 파이프라인 실행 간 메트릭 비교
이 파이프라인을 여러 번 실행하는 경우 실행 간에 메트릭을 비교할 수 있습니다. 메소드를 사용하여 `aiplatform.get_pipeline_df()` 실행 메타데이터에 액세스할 수 있습니다. 여기에서 이 파이프라인의 모든 실행에 대한 메타데이터를 가져와 Pandas DataFrame에 로드합니다.

In [23]:
pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

Unnamed: 0,pipeline_name,run_name,param.input:project,param.input:api_endpoint,param.input:display_name,param.input:bq_source,param.input:gcp_region,param.input:thresholds_dict_str
0,automl-tab-beans-training-v2,automl-tab-beans-training-v2-20220922085105,seventh-jet-360700,us-central1-aiplatform.googleapis.com,automl-beans1663836412,bq://aju-dev-demos.beans.beans1,us-central1,"{""auRoc"": 0.95}"


Vertex AI를 사용하여 다음을 수행하는 방법을 배웠습니다.

- Kubeflow Pipelines SDK를 사용하여 사용자 지정 구성 요소로 종단 간 파이프라인 구축
- Vertex Pipelines에서 파이프라인을 실행하고 SDK로 파이프라인 실행 시작
- 콘솔에서 정점 파이프라인 그래프 보기 및 분석
- 미리 빌드된 파이프라인 구성 요소를 사용하여 파이프라인에 Vertex AI 서비스 추가
- 반복되는 파이프라인 작업 예약