# 02 - Vertex AI > Pipelines - AutoML with clients (code) in an automated pipeline

Taken and adapted from [Vertex AI MLOps](https://github.com/statmike/vertex-ai-mlops) and [Vertex AI Pipelines](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/model_evaluation/automl_tabular_classification_model_evaluation.ipynb).


## Installation

Install the latest version of Vertex AI SDK for Python.

In [1]:
import os

# The Vertex AI Workbench Notebook product has specific requirements
IS_WORKBENCH_NOTEBOOK = os.getenv("DL_ANACONDA_HOME") and not os.getenv("VIRTUAL_ENV")
IS_USER_MANAGED_WORKBENCH_NOTEBOOK = os.path.exists(
    "/opt/deeplearning/metadata/env_version"
)

# Vertex AI Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_WORKBENCH_NOTEBOOK:
    USER_FLAG = "--user"

! pip3 install --upgrade {USER_FLAG} google-cloud-aiplatform \
                                    google-cloud-storage \
                                    kfp \
                                    google-cloud-pipeline-components -q

### Restart the kernel

Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.

In [2]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

### Check the package versions

Check the versions of the packages you installed.  The KFP SDK version should be >=1.8.

In [1]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 1.8.18
google_cloud_pipeline_components version: 1.0.32
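If you prefer the ">=1.8" requirement to fail loudly instead of relying on reading the printed versions, a small stdlib-only comparison is enough. This is a sketch: `parse_version` and `meets_minimum` are hypothetical helpers, and they ignore pre-release suffixes such as `rc1` (the `packaging` library's `Version` class handles those properly if it is installed).

```python
def parse_version(version: str) -> tuple:
    """Convert '1.8.18' into (1, 8, 18); stops at the first non-numeric piece."""
    parts = []
    for piece in version.split("."):
        digits = ""
        for ch in piece:
            if ch.isdigit():
                digits += ch
            else:
                break  # drop suffixes such as 'rc1' in '0rc1'
        if not digits:
            break
        parts.append(int(digits))
    return tuple(parts)


def meets_minimum(version: str, minimum: str) -> bool:
    """Return True if `version` is at least `minimum` (e.g. KFP >= 1.8)."""
    v, m = parse_version(version), parse_version(minimum)
    # Pad the shorter tuple with zeros so (1, 8) compares like (1, 8, 0).
    width = max(len(v), len(m))
    v += (0,) * (width - len(v))
    m += (0,) * (width - len(m))
    return v >= m


# Versions taken from the output above.
print(meets_minimum("1.8.18", "1.8"))  # True
print(meets_minimum("1.7.2", "1.8"))   # False
```

In the notebook, `meets_minimum(kfp.__version__, "1.8")` would apply the same check to the installed package.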


---
## Setup

inputs:

In [2]:
project = !gcloud config get-value project
PROJECT_ID = project[0]
PROJECT_ID

'heart-disease-classification-1'

In [3]:
REGION = 'europe-west1'
DATANAME = 'heart'
NOTEBOOK = 'pipeline'

# Resources
DEPLOY_COMPUTE = 'n1-standard-2'

# Model Training
VAR_TARGET = 'HeartDiseaseorAttack'
VAR_OMIT = 'transaction_id' # add more variables to the string with space delimiters

packages:

In [7]:
from typing import NamedTuple

import kfp
from google.cloud import aiplatform
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact, ClassificationMetrics, Input, Metrics,
                        Output, component)
from datetime import datetime
from google.cloud import bigquery
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value
import json
import numpy as np

clients:

In [10]:
bq = bigquery.Client()

parameters:

In [11]:
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
BUCKET = PROJECT_ID
BUCKET_URI = f"gs://{BUCKET}/{DATANAME}/models/{NOTEBOOK}"
DIR = f"temp/{NOTEBOOK}"

In [12]:
SERVICE_ACCOUNT = !gcloud config list --format='value(core.account)' 
SERVICE_ACCOUNT = SERVICE_ACCOUNT[0]
SERVICE_ACCOUNT

'653408334039-compute@developer.gserviceaccount.com'

### Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [14]:
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

List the service account's current roles:

In [15]:
!gcloud projects get-iam-policy $PROJECT_ID --filter="bindings.members:$SERVICE_ACCOUNT" --format='table(bindings.role)' --flatten="bindings[].members"

ROLE
roles/editor
roles/storage.objectAdmin


>Note: If the resulting list is missing [roles/storage.objectAdmin](https://cloud.google.com/storage/docs/access-control/iam-roles) then [revisit the setup notebook](../00%20-%20Setup/00%20-%20Environment%20Setup.ipynb#permissions) and add this permission to the service account with the provided instructions.
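The check in the note above can also be done in code. A minimal sketch, assuming the `gcloud` output is captured as a list of lines (for example `roles = !gcloud projects get-iam-policy ...` in IPython, which returns a list-like of strings); `missing_roles` and `REQUIRED_ROLES` are hypothetical names. Only predefined roles (prefixed `roles/`) are considered.

```python
from typing import Iterable, Set

# Roles this notebook expects the service account to hold.
REQUIRED_ROLES = {"roles/storage.objectAdmin"}


def missing_roles(gcloud_lines: Iterable[str], required: Set[str] = REQUIRED_ROLES) -> Set[str]:
    """Return the required roles absent from the table(bindings.role) output."""
    # Skip the "ROLE" header and blank lines; keep predefined roles only.
    granted = {line.strip() for line in gcloud_lines if line.strip().startswith("roles/")}
    return set(required) - granted


# Lines as printed by the cell above (the first line is the table header).
output = ["ROLE", "roles/editor", "roles/storage.objectAdmin"]
print(missing_roles(output))  # set()
```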

environment:

In [16]:
!rm -rf {DIR}
!mkdir -p {DIR}

---
## Pipeline (KFP) Definition
- Flow
    - Create Vertex AI Dataset from link to BigQuery table
    - Create Vertex AI AutoML Tabular Training Job
    - Create an Endpoint and deploy the trained model
    
Use [Google Cloud Pipeline Components](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-0.2.0/)
- Specifically, [AutoMLTabularTrainingJobRunOp](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-0.2.0/google_cloud_pipeline_components.aiplatform.html#google_cloud_pipeline_components.aiplatform.AutoMLTabularTrainingJobRunOp)

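When defining the training job, consider:
- Weighting (for example, a weight column when classes are imbalanced)
- Model type (`optimization_prediction_type`, here `"classification"`)
- Optimization objective (`optimization_objective`, here `"maximize-au-prc"`)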
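The model type constrains which optimization objectives are allowed. The sketch below lists the objectives documented for the Vertex AI SDK's `AutoMLTabularTrainingJob` and rejects mismatches before a pipeline is compiled; `VALID_OBJECTIVES` and `check_objective` are hypothetical names, and weighting is not covered here (the SDK exposes it separately through a `weight_column` argument). AU-PRC is a common choice when the positive class is rare.

```python
# Objectives documented for AutoML tabular training, keyed by prediction type.
VALID_OBJECTIVES = {
    "classification": {
        "maximize-au-roc",               # default for binary classification
        "minimize-log-loss",             # default for multi-class classification
        "maximize-au-prc",
        "maximize-precision-at-recall",  # also needs a recall value
        "maximize-recall-at-precision",  # also needs a precision value
    },
    "regression": {
        "minimize-rmse",                 # default for regression
        "minimize-mae",
        "minimize-rmsle",
    },
}


def check_objective(prediction_type: str, objective: str) -> None:
    """Raise ValueError if `objective` does not fit `prediction_type`."""
    allowed = VALID_OBJECTIVES.get(prediction_type)
    if allowed is None:
        raise ValueError(f"unknown prediction type: {prediction_type!r}")
    if objective not in allowed:
        raise ValueError(
            f"{objective!r} is not valid for {prediction_type!r}; "
            f"choose from {sorted(allowed)}"
        )


# The combination used by the pipeline below passes silently.
check_objective("classification", "maximize-au-prc")
```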

In [18]:
# pipeline name and GCS path for storing the pipeline artifacts
PIPELINE_NAME = f'kfp-{NOTEBOOK}-{DATANAME}-{TIMESTAMP}'

PIPELINE_ROOT = f'{BUCKET_URI}/{TIMESTAMP}/kfp/'
PIPELINE_NAME

'kfp-pipeline-heart-20230108134728'
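Several cells build GCS paths from the same pieces, and the compiled template copied later (`{BUCKET_URI}/{TIMESTAMP}/kfp/{NOTEBOOK}.json`) lands under the same per-run prefix as `PIPELINE_ROOT`. A sketch that derives all of them in one place, assuming (as the notebook does) that the bucket is named after the project; `gcs_layout` is a hypothetical helper.

```python
def gcs_layout(project_id: str, dataname: str, notebook: str, timestamp: str) -> dict:
    """Derive the notebook's GCS locations from a single set of inputs."""
    bucket_uri = f"gs://{project_id}/{dataname}/models/{notebook}"  # BUCKET = PROJECT_ID
    run_prefix = f"{bucket_uri}/{timestamp}/kfp/"                   # PIPELINE_ROOT
    return {
        "bucket_uri": bucket_uri,
        "pipeline_root": run_prefix,
        "template_path": f"{run_prefix}{notebook}.json",  # compiled pipeline copied here
        "pipeline_name": f"kfp-{notebook}-{dataname}-{timestamp}",
    }


layout = gcs_layout("heart-disease-classification-1", "heart", "pipeline", "20230108134728")
print(layout["pipeline_name"])  # kfp-pipeline-heart-20230108134728
print(layout["template_path"])  # gs://heart-disease-classification-1/heart/models/pipeline/20230108134728/kfp/pipeline.json
```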

In [19]:
@kfp.dsl.pipeline(
    name = PIPELINE_NAME,
    pipeline_root = PIPELINE_ROOT
)
def pipeline(
    project: str,
    dataname: str,
    display_name: str,
    deploy_machine: str,
    bq_source: str,
    var_target: str,
    var_omit: str,
    features: dict,
    labels: dict 
):
    
    from google_cloud_pipeline_components.aiplatform import (
        AutoMLTabularTrainingJobRunOp, EndpointCreateOp, ModelDeployOp,
        TabularDatasetCreateOp)
    
    # dataset
    dataset = TabularDatasetCreateOp(
        project = project,
        display_name = display_name,
        bq_source = bq_source,
        labels = labels
    )
    
    # training
    model = AutoMLTabularTrainingJobRunOp(
        project = project,
        display_name = display_name,
        optimization_prediction_type = "classification",
        optimization_objective = "maximize-au-prc",
        budget_milli_node_hours = 1000,
        disable_early_stopping=False,
        column_specs = features,
        dataset = dataset.outputs['dataset'],
        target_column = var_target,
        predefined_split_column_name = 'splits',
        labels = labels
    )
    
    # Endpoint: Creation
    endpoint = EndpointCreateOp(
        project = project,
        display_name = display_name,
        labels = labels
    )
    
    # Endpoint: Deployment of Model
    deployment = ModelDeployOp(
        model = model.outputs["model"],
        endpoint = endpoint.outputs["endpoint"],
        dedicated_resources_min_replica_count = 1,
        dedicated_resources_max_replica_count = 1,
        traffic_split = {"0": 100},
        dedicated_resources_machine_type= deploy_machine
    )
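In `ModelDeployOp`, `traffic_split` maps deployed-model IDs to integer percentages that must add up to 100, and the key `"0"` refers to the model being deployed by that call, so `{"0": 100}` sends all traffic to the new model. A sketch of that rule as a pre-flight check; `check_traffic_split` is a hypothetical helper and the ID `"1234567890"` below is a made-up placeholder for a model already on the endpoint.

```python
def check_traffic_split(split: dict) -> None:
    """Validate a Vertex AI endpoint traffic split (percentages summing to 100)."""
    if not split:
        raise ValueError("traffic split must not be empty")
    for key, pct in split.items():
        if not isinstance(pct, int) or not 0 <= pct <= 100:
            raise ValueError(f"percentage for {key!r} must be an int in [0, 100], got {pct!r}")
    total = sum(split.values())
    if total != 100:
        raise ValueError(f"traffic split must sum to 100, got {total}")


check_traffic_split({"0": 100})                     # the split used in the pipeline above
check_traffic_split({"0": 10, "1234567890": 90})    # hypothetical canary rollout
```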

---
## Compile Pipeline

In [20]:
kfp.v2.compiler.Compiler().compile(
    pipeline_func = pipeline,
    package_path = f"{DIR}/{NOTEBOOK}.json"
)
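The compiled JSON declares the pipeline's input parameters, so it can be used to confirm that the `parameter_values` passed to `aiplatform.PipelineJob` later match the pipeline signature. This is a sketch under an assumption about the file layout: KFP 1.8's `kfp.v2` compiler is assumed to nest the spec under `"pipelineSpec"` with parameters at `root.inputDefinitions.parameters`, and the code falls back to the top level otherwise. Every parameter of this pipeline is required, so any difference is treated as an error. The helper names are hypothetical.

```python
from typing import Dict, List


def pipeline_parameter_names(compiled: Dict) -> List[str]:
    """List root input parameter names from a compiled pipeline JSON dict."""
    spec = compiled.get("pipelineSpec", compiled)  # assumed KFP 1.8 wrapper key
    params = spec.get("root", {}).get("inputDefinitions", {}).get("parameters", {})
    return sorted(params)


def check_parameter_values(compiled: Dict, parameter_values: Dict) -> None:
    """Raise ValueError if parameter_values do not match the declared inputs."""
    expected = set(pipeline_parameter_names(compiled))
    given = set(parameter_values)
    missing, unknown = expected - given, given - expected
    if missing or unknown:
        raise ValueError(f"missing={sorted(missing)} unknown={sorted(unknown)}")


# In the notebook (not executed here):
#   import json
#   with open(f"{DIR}/{NOTEBOOK}.json") as f:
#       compiled = json.load(f)
#   print(pipeline_parameter_names(compiled))
```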



Copy the compiled pipeline file to the GCS bucket:

In [22]:
!gsutil cp {DIR}/{NOTEBOOK}.json {BUCKET_URI}/{TIMESTAMP}/kfp/

Copying file://temp/pipeline/pipeline.json [Content-Type=application/json]...
/ [1 files][ 22.0 KiB/ 22.0 KiB]                                                
Operation completed over 1 objects/22.0 KiB.                                     


---
## Create Vertex AI Pipeline Job

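Build the features dictionary (column name to transformation type) for the pipeline input, excluding the target, the omitted columns, and the `splits` column: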

In [23]:
# get feature names
query = f"SELECT * FROM {DATANAME}.INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{DATANAME}_prepped'"
schema = bq.query(query).to_dataframe()
OMIT = VAR_OMIT.split() + [VAR_TARGET, 'splits']
features = schema[~schema.column_name.isin(OMIT)].column_name.tolist()
features = dict.fromkeys(features, 'auto')
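The filtering in the cell above does not depend on BigQuery, so it can be checked offline against a plain list of column names. A sketch with a hypothetical `build_column_specs` helper; the sample column names come from this notebook's target, omit variable, and the features visible in the output further below. Replacing `"auto"` with an explicit transformation such as `"numeric"` or `"categorical"` for selected columns is one way to override AutoML's type inference.

```python
from typing import Dict, Iterable


def build_column_specs(columns: Iterable[str], var_target: str, var_omit: str,
                       split_column: str = "splits") -> Dict[str, str]:
    """Map every feature column to 'auto', skipping target, omitted and split columns."""
    # var_omit is space-delimited, matching the VAR_OMIT convention above.
    omit = set(var_omit.split()) | {var_target, split_column}
    # A dict comprehension keeps the original column order.
    return {name: "auto" for name in columns if name not in omit}


columns = ["HeartDiseaseorAttack", "HighBP", "HighChol", "transaction_id", "splits"]
print(build_column_specs(columns, "HeartDiseaseorAttack", "transaction_id"))
# {'HighBP': 'auto', 'HighChol': 'auto'}
```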

Create the pipeline job (with caching disabled):

In [25]:
pipeline = aiplatform.PipelineJob(
    display_name = f'{NOTEBOOK}_{DATANAME}_{TIMESTAMP}',
    template_path = f"{BUCKET_URI}/{TIMESTAMP}/kfp/{NOTEBOOK}.json",
    parameter_values = {
        "project" : PROJECT_ID,
        "dataname" : DATANAME,
        "display_name" : f'{NOTEBOOK}_{DATANAME}_{TIMESTAMP}',
        "deploy_machine" : DEPLOY_COMPUTE,
        "bq_source" : f'bq://{PROJECT_ID}.{DATANAME}.{DATANAME}_prepped',
        "var_target" : VAR_TARGET,
        "var_omit" : VAR_OMIT,
        "features" : features,
        "labels" : {'notebook': NOTEBOOK}       
    },
    labels = {'notebook': NOTEBOOK},
    enable_caching=False
)
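The same `labels` dictionary is attached to the dataset, model, endpoint, and pipeline job, so an invalid label would fail several steps. Google Cloud labels allow lowercase letters, digits, underscores, and dashes, up to 63 characters, keys must start with a letter, and a resource can carry at most 64 labels. The sketch below is an ASCII-only approximation of those rules (the real rules also accept international characters); `label_problems` is a hypothetical helper.

```python
import re
from typing import Dict, List

# ASCII approximation of Google Cloud label rules.
_KEY = re.compile(r"[a-z][a-z0-9_-]{0,62}")
_VALUE = re.compile(r"[a-z0-9_-]{0,63}")  # values may be empty


def label_problems(labels: Dict[str, str]) -> List[str]:
    """Return a list of human-readable problems; empty if the labels look valid."""
    problems = []
    if len(labels) > 64:
        problems.append(f"too many labels: {len(labels)} > 64")
    for key, value in labels.items():
        if not _KEY.fullmatch(key):
            problems.append(f"invalid key {key!r}")
        if not _VALUE.fullmatch(value):
            problems.append(f"invalid value {value!r} for key {key!r}")
    return problems


print(label_problems({"notebook": "pipeline"}))     # []
print(label_problems({"Notebook": "My Pipeline"}))  # two problems: key and value
```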

In [None]:
response = pipeline.run(
    service_account = SERVICE_ACCOUNT
)

Creating PipelineJob
PipelineJob created. Resource name: projects/653408334039/locations/europe-west1/pipelineJobs/kfp-pipeline-heart-20230108134728-20230108134830
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/653408334039/locations/europe-west1/pipelineJobs/kfp-pipeline-heart-20230108134728-20230108134830')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west1/pipelines/runs/kfp-pipeline-heart-20230108134728-20230108134830?project=653408334039
PipelineJob projects/653408334039/locations/europe-west1/pipelineJobs/kfp-pipeline-heart-20230108134728-20230108134830 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/653408334039/locations/europe-west1/pipelineJobs/kfp-pipeline-heart-20230108134728-20230108134830 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/653408334039/locations/europe-west1/pipelineJobs/kfp-pipeline-heart-20230108134728-20230108134830 current s
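`pipeline.run()` blocks the notebook until the job ends, and the captured log above stops while the job is still in `PIPELINE_STATE_RUNNING`. One alternative is `pipeline.submit(...)`, which returns once the job is created, followed by your own polling. A sketch of such a loop, assuming that `pipeline.state` re-reads the job on each access and that its `.name` is the enum name seen in the log; `wait_until_done` is a hypothetical helper, and the state getter and sleep function are injected so the loop can be exercised without Google Cloud.

```python
import time
from typing import Callable

# PipelineState values after which the job will not change any more.
TERMINAL_STATES = {
    "PIPELINE_STATE_SUCCEEDED",
    "PIPELINE_STATE_FAILED",
    "PIPELINE_STATE_CANCELLED",
}


def wait_until_done(get_state: Callable[[], str], poll_seconds: float = 60.0,
                    timeout_seconds: float = 6 * 3600,
                    sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_state() until a terminal state is reached or the timeout expires."""
    waited = 0.0
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout_seconds:
            raise TimeoutError(f"still {state} after {waited:.0f}s")
        sleep(poll_seconds)
        waited += poll_seconds


# Hypothetical usage in the notebook (not executed here):
#   pipeline.submit(service_account=SERVICE_ACCOUNT)
#   final_state = wait_until_done(lambda: pipeline.state.name)
```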

A visual representation of the pipeline can be viewed in the console:

In [None]:
print(f"Review the Pipeline as it runs here:\nhttps://console.cloud.google.com/vertex-ai/locations/{REGION}/pipelines/runs/{pipeline.resource_name.split('/')[-1]}?project={PROJECT_ID}")
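The cell above mixes `REGION` and `PROJECT_ID` with the job ID taken from `pipeline.resource_name`. Because the resource name already contains the project, region, and job ID, the console link can be derived from it alone, which reproduces the link printed by the SDK in the log above (with the project number rather than the project ID). `console_url` is a hypothetical helper.

```python
def console_url(resource_name: str) -> str:
    """Build the Vertex AI console link for a pipeline job resource name."""
    # Expected form: projects/<project>/locations/<region>/pipelineJobs/<job-id>
    parts = resource_name.split("/")
    if (len(parts) != 6 or parts[0] != "projects" or parts[2] != "locations"
            or parts[4] != "pipelineJobs"):
        raise ValueError(f"unexpected resource name: {resource_name!r}")
    project, location, job_id = parts[1], parts[3], parts[5]
    return (f"https://console.cloud.google.com/vertex-ai/locations/{location}"
            f"/pipelines/runs/{job_id}?project={project}")


name = ("projects/653408334039/locations/europe-west1/pipelineJobs/"
        "kfp-pipeline-heart-20230108134728-20230108134830")
print(console_url(name))
```

In the notebook, `console_url(pipeline.resource_name)` would give the same link for the current run.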

Retrieve the pipeline information:

In [94]:
aiplatform.get_pipeline_df(pipeline = f'kfp-{NOTEBOOK}-{DATANAME}-{TIMESTAMP}')

| Column | Value |
|---|---|
| pipeline_name | kfp-pipeline-heart-20230108134728 |
| run_name | kfp-pipeline-heart-20230108134728-20230108134830 |
| param.input:deploy_machine | n1-standard-2 |
| param.input:display_name | pipeline_heart_20230108134728 |
| param.input:labels | `{"notebook": "pipeline"}` |
| param.input:bq_source | bq://heart-disease-classification-1.heart.hear... |
| param.input:dataname | heart |
| param.input:var_omit | transaction_id |
| param.input:var_target | HeartDiseaseorAttack |
| param.input:project | heart-disease-classification-1 |
| param.input:features | `{"HighBP": "auto", "HighChol": "auto", "CholCh...` |
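The `param.input:` columns returned by `aiplatform.get_pipeline_df` hold the run's input parameters, and the dictionary-typed inputs (`labels`, `features`) appear to be stored as JSON strings. A sketch that turns one row into a plain parameter dictionary, assuming the row is converted first (for example with `df.iloc[0].to_dict()` in pandas); `run_parameters` is a hypothetical helper and the sample row is abbreviated from the output above.

```python
import json


def run_parameters(row: dict, prefix: str = "param.input:") -> dict:
    """Extract input parameters from a get_pipeline_df row, decoding JSON objects."""
    params = {}
    for column, value in row.items():
        if not column.startswith(prefix):
            continue  # skip pipeline_name, run_name, metrics, ...
        name = column[len(prefix):]
        # Dict-typed inputs look like JSON strings; decode them when possible.
        if isinstance(value, str) and value.startswith("{"):
            try:
                value = json.loads(value)
            except json.JSONDecodeError:
                pass  # leave truncated or non-JSON text unchanged
        params[name] = value
    return params


row = {
    "pipeline_name": "kfp-pipeline-heart-20230108134728",
    "param.input:dataname": "heart",
    "param.input:labels": '{"notebook": "pipeline"}',
}
print(run_parameters(row))  # {'dataname': 'heart', 'labels': {'notebook': 'pipeline'}}
```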


Once the pipeline finishes, evaluate, predict, and explain in the console under Vertex AI > Endpoints.
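Predictions can also be requested from the SDK. A request instance should contain only the training features, so the target, omitted, and `splits` columns are dropped first. A sketch, with two assumptions to verify against your model: that values are sent as strings (a convention seen in many AutoML tabular examples), and that the endpoint can be found by the `notebook` label with the filter shown. `to_instance` is a hypothetical helper, and the commented SDK calls are not executed here.

```python
from typing import Dict, Iterable


def to_instance(record: Dict[str, object], feature_names: Iterable[str]) -> Dict[str, str]:
    """Keep only the model's features and send every value as a string (assumption)."""
    # Raises KeyError if the record lacks a feature the model was trained on.
    return {name: str(record[name]) for name in feature_names}


record = {"HighBP": 1.0, "HighChol": 0.0, "HeartDiseaseorAttack": 0.0}
print(to_instance(record, ["HighBP", "HighChol"]))  # {'HighBP': '1.0', 'HighChol': '0.0'}

# Hypothetical usage in the notebook (not executed here):
#   endpoint = aiplatform.Endpoint.list(filter="labels.notebook=pipeline")[0]
#   instance = to_instance(record, features)  # `features` dict built earlier
#   print(endpoint.predict(instances=[instance]).predictions)
#   print(endpoint.explain(instances=[instance]).explanations)
```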