# 03Tools - Pipelines

---
## Налаштування

Вхідні дані:

In [None]:
# Read the active gcloud project with the IPython `!` shell magic; the captured
# output is indexed and its first line is used as the project id.
project = !gcloud config get-value project
PROJECT_ID = project[0]
# echo the value as the cell output
PROJECT_ID

In [None]:
# Location and experiment naming
REGION = "us-central1"
EXPERIMENT = "pipeline_ex2"
SERIES = "03"

# Data: BigQuery source table, kept in the notebook's own project
BQ_PROJECT, BQ_DATASET, BQ_TABLE = PROJECT_ID, "fraud", "fraud_prepped"

# Training: the label column and the column(s) excluded from the features
# (whitespace-separated when there is more than one)
VAR_TARGET, VAR_OMIT = "Class", "transaction_id"

Пакети:

# Install/upgrade Google Cloud Pipeline Components (IPython shell magic, not plain Python).
!pip install -U google-cloud-pipeline-components

In [None]:
from google.cloud import aiplatform
from google.cloud import bigquery

from datetime import datetime
from typing import NamedTuple

import kfp # used for dsl.pipeline
import kfp.v2.dsl as dsl # used for dsl.component, dsl.Output, dsl.Input, dsl.Artifact, dsl.Model, ...
# from google_cloud_pipeline_components import aiplatform as gcc_aip

from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value
import json
import numpy as np

Клієнти:

In [None]:
# Initialise the Vertex AI SDK and a BigQuery client, both bound to PROJECT_ID.
aiplatform.init(project=PROJECT_ID, location=REGION)
# Pass the project explicitly (as every pipeline component does) rather than
# relying on whatever default project the environment resolves.
bq = bigquery.Client(project=PROJECT_ID)

Параметри:

In [None]:
# Run-specific identifiers derived from the current local time
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
RUN_NAME = "run-" + TIMESTAMP

# GCS bucket named after the project; pipeline artifacts live under series/experiment
BUCKET = PROJECT_ID
URI = "gs://{}/{}/{}/pipelines".format(BUCKET, SERIES, EXPERIMENT)

# Local scratch directory (the compiled pipeline spec is written here)
DIR = "temp/" + EXPERIMENT

In [None]:
# Capture the account gcloud is currently authenticated as (IPython `!` magic);
# it is later passed to PipelineJob.submit(service_account=...).
# NOTE(review): this is simply `core.account` - confirm it is a service account
# and not a user account before relying on it for pipeline runs.
SERVICE_ACCOUNT = !gcloud config list --format='value(core.account)' 
SERVICE_ACCOUNT = SERVICE_ACCOUNT[0]
SERVICE_ACCOUNT

Ролі сервісного акаунту

In [None]:
# List the IAM roles bound to SERVICE_ACCOUNT in this project, one role per row.
!gcloud projects get-iam-policy $PROJECT_ID --filter="bindings.members:$SERVICE_ACCOUNT" --format='table(bindings.role)' --flatten="bindings[].members"

>Примітка: У списку має бути [roles/storage.objectAdmin](https://cloud.google.com/storage/docs/access-control/iam-roles)

Оточення:

In [None]:
# Start from an empty local working directory for this experiment.
!rm -rf {DIR}
!mkdir -p {DIR}

---
## Користувацькі компоненти (KFP)

Vertex AI Pipelines складаються з компонентів, які виконуються незалежно й мають входи та виходи, що з'єднуються у граф — конвеєр. У цьому блокноті робочий процес організовують такі користувацькі компоненти: навчання моделі-кандидата, оцінювання моделі-кандидата та поточної моделі, їх порівняння за метриками і, якщо модель-кандидат краща, заміна моделі, вже розгорнутої на наявній кінцевій точці. Ці користувацькі компоненти реалізовано як функції Python!

### Отримання розгорнутої моделі

In [None]:
@dsl.component(
    base_image = "python:3.9",
    packages_to_install = ['google-cloud-aiplatform']
)
def get_deployed_model(
    project: str,
    region: str,
    series: str,
    bqml_model: dsl.Output[dsl.Artifact],
    vertex_endpoint: dsl.Output[dsl.Artifact]
):
    """Find (or create) the series endpoint and identify the BQML model it serves.

    Looks up a Vertex AI endpoint whose display name and ``series`` label both
    equal ``series`` and creates one when none exists. If the endpoint has a
    traffic split, the deployed model with the largest share is resolved to
    its Vertex AI Model version. The BQML model name is then rebuilt as
    ``<model display_name>_<model 'timestamp' label>``.

    Outputs:
        bqml_model.uri: the BQML model name, or ``'none'`` when the endpoint
            has no deployed model that can be resolved.
        vertex_endpoint.uri: the endpoint resource name.
    """
    # setup (imports stay inside the component function body)
    from google.cloud import aiplatform
    aiplatform.init(project = project, location = region)

    # get (or create) the endpoint
    endpoints = aiplatform.Endpoint.list(filter = f"display_name={series} AND labels.series={series}")
    if endpoints:
        endpoint = endpoints[0]
        print(f"Endpoint Exists: {endpoints[0].resource_name}")
    else:
        endpoint = aiplatform.Endpoint.create(
            display_name = f"{series}",
            labels = {'series' : f"{series}"}
        )
        print(f"Endpoint Created: {endpoint.resource_name}")

    # pick the deployed model with the most traffic and derive its BQML model name
    bq_model = 'none'
    traffic_split = endpoint.traffic_split
    if traffic_split:
        deployed_model_id = max(traffic_split, key = traffic_split.get)
        # Look up the matching deployed model; if none matches (inconsistent
        # endpoint state), report 'none' instead of failing on an unbound name.
        match = next(
            (m for m in endpoint.list_models() if m.id == deployed_model_id),
            None
        )
        if match is not None:
            deployed_model = aiplatform.Model(model_name = match.model + f'@{match.model_version_id}')
            # NOTE(review): raises KeyError if the model has no 'timestamp' label
            bq_model = deployed_model.display_name + f"_{deployed_model.labels['timestamp']}"

    bqml_model.uri = bq_model
    vertex_endpoint.uri = endpoint.resource_name

### Метрики моделі
- Отримання прогнозів для тестових даних з моделі BigQuery
- Обчислення [середньої точності (average precision) для кривої precision-recall](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.average_precision_score.html#sklearn.metrics.average_precision_score)

In [None]:
@dsl.component(
    base_image = 'python:3.9',
    packages_to_install = ['pandas','db-dtypes','pyarrow','scikit-learn','google-cloud-bigquery']
)
def bqml_eval(
    project: str,
    region: str,
    var_target: str,
    bq_project: str,
    bq_dataset: str,
    bq_table: str,
    bqml_model: dsl.Input[dsl.Model],
    metrics: dsl.Output[dsl.Metrics],
    metricsc: dsl.Output[dsl.ClassificationMetrics]
) -> NamedTuple("model_eval", [("metric", float)]):
    """Score a BQML model on the TEST split and log its metrics.

    Runs ML.PREDICT over the rows of ``bq_table`` where ``splits = 'TEST'``.
    It then logs the average precision (auPRC) of the label-1 probability and
    a 2x2 confusion matrix ('Not Fraud' = 0, 'Fraud' = 1). When
    ``bqml_model.uri`` is ``'none'`` (no current model), only auPRC = 0.0 is
    logged.

    ``region`` is not used by this component.

    Returns:
        metric: the auPRC as a float.
    """
    # setup
    from collections import namedtuple
    from sklearn.metrics import average_precision_score, confusion_matrix
    from google.cloud import bigquery
    bq = bigquery.Client(project = project)

    if bqml_model.uri != 'none':
        query = f"""
        SELECT {var_target}, predicted_{var_target}, prob, splits 
        FROM ML.PREDICT (MODEL `{bq_project}.{bq_dataset}.{bqml_model.uri}`,(
            SELECT *
            FROM `{bq_project}.{bq_dataset}.{bq_table}`
            WHERE splits = 'TEST')
          ), UNNEST(predicted_{var_target}_probs)
        WHERE label=1
        """
        pred = bq.query(query = query).to_dataframe()

        y_true = pred[var_target].astype(int)
        y_pred = pred[f'predicted_{var_target}'].astype(int)

        auPRC = average_precision_score(y_true, pred['prob'], average='micro')
        metrics.log_metric('auPRC', auPRC)
        # Pin labels so the matrix is always 2x2 and lines up with the two
        # category names, even if only one class occurs in truth or predictions.
        cm = confusion_matrix(y_true, y_pred, labels = [0, 1])
        metricsc.log_confusion_matrix(['Not Fraud', 'Fraud'], cm.tolist())
    else:
        auPRC = 0.0
        metrics.log_metric('auPRC', auPRC)

    model_eval = namedtuple("model_eval", ["metric"])
    return model_eval(metric = float(auPRC))

### BigQuery - Навчання DNN

In [None]:
@dsl.component(
    base_image = 'python:3.9',
    packages_to_install = ['google-cloud-bigquery']
)
def bqml_dnn(
    project: str,
    region: str,
    series: str,
    experiment: str,
    timestamp: str,
    var_target: str,
    var_omit: str,
    bq_project: str,
    bq_dataset: str,
    bq_table: str,
    bqml_model: dsl.Output[dsl.Artifact]
) -> NamedTuple("bqml_training", [("query", str)]):
    """Train a BQML DNN classifier (with hyperparameter tuning) as the challenger.

    Creates ``<bq_project>.<bq_dataset>.<series>_<experiment>_<timestamp>``
    from every row of ``bq_table`` whose ``splits`` value is not ``'TEST'``.
    ``'VALIDATE'`` rows are relabelled ``'EVAL'`` in the ``custom_splits``
    column used for BQML's custom data split.

    Args:
        var_target: label column.
        var_omit: column(s) to exclude from the features, separated by
            whitespace and/or commas; may be empty.

    Outputs:
        bqml_model.uri: the BQML model name (without project/dataset).

    Returns:
        query: the CREATE MODEL statement that was executed.
    """
    from collections import namedtuple
    from google.cloud import bigquery
    bq = bigquery.Client(project = project)

    # EXCEPT list = omitted feature columns + the source split column. Accepting
    # commas and tolerating an empty var_omit avoids generating invalid SQL
    # such as `EXCEPT(, splits)` or `EXCEPT(a,,b, splits)`.
    omit_columns = var_omit.replace(',', ' ').split()
    except_columns = ', '.join(omit_columns + ['splits'])

    bq_model = f'{series}_{experiment}_{timestamp}'
    query = f"""
    CREATE OR REPLACE MODEL `{bq_project}.{bq_dataset}.{bq_model}`
    OPTIONS
        (model_type = 'DNN_CLASSIFIER',
            auto_class_weights = FALSE,
            input_label_cols = ['{var_target}'],
            data_split_col = 'custom_splits',
            data_split_method = 'CUSTOM',
            EARLY_STOP = FALSE,
            OPTIMIZER = 'SGD',
            HIDDEN_UNITS = [64, 32],
            LEARN_RATE = 0.001,
            ACTIVATION_FN = 'SIGMOID',
            MAX_ITERATIONS = 15,
            HPARAM_TUNING_ALGORITHM = 'VIZIER_DEFAULT',
            HPARAM_TUNING_OBJECTIVES = ['ROC_AUC'],
            DROPOUT = HPARAM_RANGE(0, 0.8),
            BATCH_SIZE = HPARAM_RANGE(8, 500),
            MAX_PARALLEL_TRIALS = 5,
            NUM_TRIALS = 10
        ) AS
    SELECT * EXCEPT({except_columns}),
        CASE
            WHEN splits = 'VALIDATE' THEN 'EVAL'
            ELSE splits
        END AS custom_splits
    FROM `{bq_project}.{bq_dataset}.{bq_table}`
    WHERE splits != 'TEST'
    """
    job = bq.query(query = query)
    job.result()  # block until training has finished
    bqml_model.uri = bq_model

    result = namedtuple("bqml_training", ["query"])
    return result(query = str(query))

### Порівняння моделей

In [None]:
@dsl.component
def model_compare(
    base_metric: float,
    challenger_metric: float,
) -> bool:
    """Decide whether the challenger should replace the current model.

    Returns True only when the challenger's metric is strictly greater than
    the base metric; a tie keeps the current model.
    """
    return challenger_metric > base_metric

### Експорт моделі

In [None]:
@dsl.component(
    base_image = 'python:3.9',
    packages_to_install = ['google-cloud-bigquery', 'google-cloud-aiplatform']
)
def bqml_export(
    project: str,
    region: str,
    series: str,
    experiment: str,
    timestamp: str,
    uri: str,
    run_name: str,
    bq_project: str,
    bq_dataset: str,
    bqml_model: dsl.Input[dsl.Model],
    tf_model: dsl.Output[dsl.Artifact],
    vertex_model: dsl.Output[dsl.Artifact]
):
    """Export a BQML model to GCS and register it in the Vertex AI Model Registry.

    The model is exported to ``<uri>/models/<timestamp>/model`` and registered
    under the display name ``<series>_<experiment>``:

    * no registry entry matches -> uploaded as a new model;
    * the first match already lists ``run_name`` among its version aliases
      -> nothing is uploaded and that model is reused;
    * otherwise -> uploaded as a new default version of the first match.

    Outputs:
        tf_model.uri: GCS directory of the exported model.
        vertex_model.uri: resource name of the registered Vertex AI model.
    """
    # hardcoded serving container for the exported model
    deploy_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-3:latest'
    export_dir = f'{uri}/models/{timestamp}/model'
    display_name = f'{series}_{experiment}'

    # setup
    from google.cloud import bigquery
    client = bigquery.Client(project = project)

    from google.cloud import aiplatform
    aiplatform.init(project = project, location = region)

    # export the BQML challenger model to GCS
    export_sql = f"""
    EXPORT MODEL `{bq_project}.{bq_dataset}.{bqml_model.uri}`
        OPTIONS (URI = '{export_dir}')
    """
    client.query(query = export_sql).result()

    # register the exported model in the Vertex AI Model Registry
    registry_filter = (
        f'display_name={display_name} AND labels.series={series} '
        f'AND labels.experiment={experiment}'
    )
    matches = aiplatform.Model.list(filter = registry_filter)

    model = None
    parent_model = ''
    if not matches:
        print('This is a new model, adding to model registry as version 1')
    else:
        print("Model Already in Registry:")
        existing = matches[0]
        if run_name in existing.version_aliases:
            print("This version already loaded, no action taken.")
            model = aiplatform.Model(model_name = existing.resource_name)
        else:
            print('Loading model as new default version.')
            parent_model = existing.resource_name

    if model is None:
        model = aiplatform.Model.upload(
            display_name = display_name,
            model_id = f'model_{series}_{experiment}',
            parent_model = parent_model,
            serving_container_image_uri = deploy_image,
            artifact_uri = export_dir,
            is_default_version = True,
            version_aliases = [run_name],
            version_description = run_name,
            labels = {'series' : f'{series}', 'experiment' : f'{experiment}', 'timestamp': f'{timestamp}', 'run_name' : f'{run_name}'}
        )

    tf_model.uri = export_dir
    vertex_model.uri = model.resource_name

### Заміна моделі в кінцевій точці

In [None]:
@dsl.component(
    base_image = 'python:3.9',
    packages_to_install = ['google-cloud-aiplatform']
)
def endpoint_update(
    project: str,
    region: str,
    series: str,
    experiment: str,
    vertex_endpoint: dsl.Input[dsl.Artifact],
    vertex_model: dsl.Input[dsl.Model]
):
    """Deploy a registered model to an endpoint and retire models left without traffic.

    ``vertex_model`` is deployed to ``vertex_endpoint`` on one ``n1-standard-4``
    replica with 100% of the traffic. Afterwards, every deployed model on the
    endpoint that receives no traffic is undeployed. A model has no traffic
    when it is absent from the traffic split or listed with 0%.

    ``series`` and ``experiment`` are not used by this component.
    """
    # hardcoded parameter
    deploy_compute = 'n1-standard-4'

    # setup
    from google.cloud import aiplatform
    aiplatform.init(project = project, location = region)

    # resolve the endpoint and model from their resource names
    endpoint = aiplatform.Endpoint(vertex_endpoint.uri)
    model = aiplatform.Model(vertex_model.uri)

    # deploy the model to the endpoint with 100% of the traffic
    endpoint.deploy(
        model = model,
        deployed_model_display_name = model.display_name,
        traffic_percentage = 100,
        machine_type = deploy_compute,
        min_replica_count = 1,
        max_replica_count = 1
    )

    # Undeploy models without traffic. Read the split once so that every model
    # is judged against the same post-deployment state, and treat an explicit
    # 0% entry the same as a missing one.
    traffic_split = endpoint.traffic_split
    for deployed_model in endpoint.list_models():
        traffic = traffic_split.get(deployed_model.id, 0)
        if traffic > 0:
            print(f"Model {deployed_model.display_name} with version {deployed_model.model_version_id} has traffic = {traffic}")
        else:
            endpoint.undeploy(deployed_model_id = deployed_model.id)
            print(f"Undeploying {deployed_model.display_name} with version {deployed_model.model_version_id} because it has no traffic.")

---
## Ініціалізація Pipeline (KFP)

In [None]:
@dsl.pipeline(
    name = f'series-{SERIES}-endpoint-challenger',
    description = 'Update endpoint with challenger model (conditionally).'
)
def pipeline(
    project: str,
    region: str,
    series: str,
    experiment: str,
    timestamp: str,
    bq_project: str,
    bq_dataset: str,
    bq_table: str,
    var_target: str,
    var_omit: str,
    uri: str,
    run_name: str
):
    """Champion/challenger pipeline for the series endpoint.

    Steps:
      1. Look up the endpoint and the BQML model currently deployed on it.
      2. Compute auPRC on the TEST split for that current model.
      3. Train a BQML DNN challenger.
      4. Compute auPRC on the TEST split for the challenger.
      5. Compare the two metrics.
      6. Only when the challenger scores higher: export it to the Vertex AI
         Model Registry and deploy it to the endpoint.
    """
   
    # get the current model (caching disabled for this task)
    current_model = get_deployed_model(
        project = project,
        region = region,
        series = series
    ).set_display_name('Get Current Model').set_caching_options(False)

    # get auPRC for the current model ('none' model -> metric 0.0)
    base_model_eval = bqml_eval(
        project = project,
        region = region,
        var_target = var_target,
        bq_project = bq_project,
        bq_dataset = bq_dataset,
        bq_table = bq_table,
        bqml_model = current_model.outputs['bqml_model']
    ).set_display_name('Metric for Current Model').set_caching_options(False)
    
    # train challenger model with BQML (caching allowed for this task)
    challenger_model = bqml_dnn(
        project = project,
        region = region,
        series = series,
        experiment = experiment,
        timestamp = timestamp,
        var_target = var_target,
        var_omit = var_omit,
        bq_project = bq_project,
        bq_dataset = bq_dataset,
        bq_table = bq_table
    ).set_display_name('Train Challenger Model').set_caching_options(True)
    
    # get auPRC for the challenger model
    challenger_model_eval = bqml_eval(
        project = project,
        region = region,
        var_target = var_target,
        bq_project = bq_project,
        bq_dataset = bq_dataset,
        bq_table = bq_table,
        bqml_model = challenger_model.outputs['bqml_model']
    ).set_display_name('Metric for Challenger Model').set_caching_options(False)
    # explicit ordering; the bqml_model input above already implies this dependency
    challenger_model_eval.after(challenger_model)
    
    # compare models: True when the challenger's metric is higher
    compare = model_compare(
        base_metric = base_model_eval.outputs["metric"],
        challenger_metric = challenger_model_eval.outputs["metric"]
    ).set_display_name('Compare Models')
    
    # conditional deployment
    # NOTE(review): model_compare returns a bool, but the condition compares its
    # output with the string 'true' - confirm the installed KFP version
    # evaluates this comparison as intended.
    with dsl.Condition(
        compare.output == 'true',
        name = "replace_model"
    ):
        # export BQML model to Vertex AI Model Registry
        export = bqml_export(
            project = project,
            region = region,
            series = series,
            experiment = experiment,
            timestamp = timestamp,
            uri = uri,
            run_name = run_name,
            bq_project = bq_project,
            bq_dataset = bq_dataset,
            bqml_model = challenger_model.outputs["bqml_model"]
        ).set_display_name('Export BQML Model')
        
        # replace model on endpoint (03b)
        replace = endpoint_update(
            project = project,
            region = region,
            series = series,
            experiment = experiment,
            vertex_endpoint = current_model.outputs['vertex_endpoint'],
            vertex_model = export.outputs['vertex_model']
        ).set_display_name('Deploy The Challenger Model')

---
## Компіляція та запуск конвеєра

### Компіляція вхідних даних

In [None]:
# Runtime arguments for the pipeline; each key matches a parameter of `pipeline`.
parameter_values = dict(
    project=PROJECT_ID,
    region=REGION,
    series=SERIES,
    experiment=EXPERIMENT,
    timestamp=TIMESTAMP,
    bq_project=BQ_PROJECT,
    bq_dataset=BQ_DATASET,
    bq_table=BQ_TABLE,
    var_target=VAR_TARGET,
    var_omit=VAR_OMIT,
    uri=URI,
    run_name=RUN_NAME,
)

### Компіляція конвеєра

In [None]:
# Compile the pipeline function to a pipeline-spec JSON file in DIR.
# Import the compiler explicitly instead of relying on `import kfp` having
# already loaded the kfp.v2.compiler submodule.
from kfp.v2 import compiler

compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=f"{DIR}/{EXPERIMENT}.json",
)

### Визначення завдання конвеєра

Використання скомпільованого конвеєра:

In [None]:
# Define the pipeline run from the compiled spec written to DIR.
# enable_caching=False overrides the per-task caching options set inside `pipeline`.
pipeline_job = aiplatform.PipelineJob(
    display_name=EXPERIMENT,
    template_path=f"{DIR}/{EXPERIMENT}.json",
    pipeline_root=f"{URI}/pipeline_root",
    parameter_values=parameter_values,
    labels={"series": SERIES, "experiment": EXPERIMENT},
    enable_caching=False,
)

### Подання Pipeline Job

In [None]:
# Submit the run under SERVICE_ACCOUNT; completion is awaited in a later cell
# via pipeline_job.wait().
# NOTE(review): `response` is not used anywhere else in this notebook.
response = pipeline_job.submit(
    service_account = SERVICE_ACCOUNT
)

Перейдіть за наступним посиланням, щоб переглянути завдання в консолі GCP:

In [None]:
# `_dashboard_uri()` is a private SDK helper (leading underscore) returning the run's console link.
print('Інформаційну панель можна переглянути тут:', pipeline_job._dashboard_uri(), sep='\n')

#### Очікування завершення конвеєра

In [None]:
# Block the notebook until the submitted pipeline run finishes.
pipeline_job.wait()

### Отримання інформації про конвеєр

In [None]:
# Tabulate runs of this pipeline; the name must match the `name` given to @dsl.pipeline.
aiplatform.get_pipeline_df(pipeline = f'series-{SERIES}-endpoint-challenger')

## Огляд конвеєра

<p align="center"><center><img src="../architectures/notebooks/03/pipeline_ex2.png" width="75%"></center></p>