# 3 - Vertex AI > Pipelines - AutoML with Clients (code) in an Automated Pipeline

Use [Kubeflow](https://www.kubeflow.org/) Pipelines running on [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction) to orchestrate training a model with AutoML Tabular and deploying it to a Vertex AI Endpoint, then use it for online and batch predictions and explanations. This demonstrates how to automate the processes of (02a) or (02b) with pipeline orchestration.



### Prerequisites:
-  01 -  BigQuery - Table Data Source

### Overview:
-  Use Kubeflow Python SDK to build a pipeline
   -  Create the pipeline using Google Cloud Pipeline Components (`from google_cloud_pipeline_components import aiplatform as gcc_aip`)
      -  Use gcc_aip.TabularDatasetCreateOp to register a dataset from a BigQuery table
      -  Train an AutoML tabular model with gcc_aip.AutoMLTabularTrainingJobRunOp
      -  Create an endpoint with gcc_aip.EndpointCreateOp and deploy the model to it with gcc_aip.ModelDeployOp
   -  Compile the pipeline
      -  kfp.v2.compiler.Compiler().compile
   -  Copy the compiled pipeline JSON to a GCS bucket
   -  Run the pipeline with google.cloud.aiplatform.PipelineJob
-  Online Predictions using Vertex AI Endpoint
-  Online Explanations using Vertex AI Endpoint
-  Batch Prediction Job for predictions and explanations, with source and destination tables in BigQuery

### Resources:
-  [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline#google-cloud-components) see aiplatform.PipelineJob
-  [Python Client for Vertex AI](https://googleapis.dev/python/aiplatform/latest/aiplatform.html)
-  [Kubeflow Pipelines Components for Google Cloud](https://github.com/kubeflow/pipelines/tree/master/components/google-cloud)



---
## Setup

inputs:

In [20]:
REGION = 'us-central1'
PROJECT_ID='statmike-mlops'
DATANAME = 'fraud'
NOTEBOOK = '02c'

# Resources
DEPLOY_COMPUTE = 'n1-standard-4'

# Model Training
VAR_TARGET = 'Class'
VAR_OMIT = 'transaction_id' # add more variables to the string with space delimiters

packages:

In [21]:
from google.cloud import aiplatform
from datetime import datetime
import kfp
#import kfp.v2.dsl as dsl
from google_cloud_pipeline_components import aiplatform as gcc_aip

from google.cloud import bigquery
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value
import json
import numpy as np

clients:

In [22]:
aiplatform.init(project=PROJECT_ID, location=REGION)
bigquery = bigquery.Client(project = PROJECT_ID) # note: rebinds the name `bigquery` from the module to the client

parameters:

In [32]:
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
BUCKET = PROJECT_ID
URI = f"gs://{BUCKET}/{DATANAME}/models/{NOTEBOOK}"
DIR = f"temp/{NOTEBOOK}"

In [33]:
# Give the service account the roles/storage.objectAdmin role:
# Console > IAM > select the account <projectnumber>-compute@developer.gserviceaccount.com > Edit > add the role
SERVICE_ACCOUNT = !gcloud config list --format='value(core.account)' 
SERVICE_ACCOUNT = SERVICE_ACCOUNT[0]
SERVICE_ACCOUNT

'764015827198-compute@developer.gserviceaccount.com'

environment:

In [34]:
!rm -rf {DIR}
!mkdir -p {DIR}

---
## Pipeline (KFP) Definition
- Flow
    - Create Vertex AI Dataset from link to BigQuery table
    - Create Vertex AI AutoML Tabular Training Job
    - Create an Endpoint and deploy the trained model
    
Use [AI Platform Pipeline Components](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-0.2.0/)
- Specifically, [AutoMLTabularTrainingJobRunOp](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-0.2.0/google_cloud_pipeline_components.aiplatform.html#google_cloud_pipeline_components.aiplatform.AutoMLTabularTrainingJobRunOp)

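When defining the training job, consider:
- Weighting: whether to weight rows, for example to counter class imbalance
- Model type (`optimization_prediction_type`)
- Optimization objective (`optimization_objective`)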
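One way to act on weighting for heavily imbalanced fraud data is to add a per-row weight column to the training table and pass its name to the training job (the AutoML tabular training job accepts a `weight_column` argument; using it is an assumption and is not part of this notebook's pipeline). The sketch below computes inverse-frequency weights in plain Python; `inverse_frequency_weights` is a hypothetical helper name.

```python
from collections import Counter

def inverse_frequency_weights(labels):
    """Return one weight per row so that each class contributes equally in total.

    weight(c) = n_rows / (n_classes * count(c))
    """
    counts = Counter(labels)
    n_rows = len(labels)
    n_classes = len(counts)
    class_weight = {c: n_rows / (n_classes * k) for c, k in counts.items()}
    return [class_weight[y] for y in labels]

# Toy example: 8 legitimate (0) and 2 fraudulent (1) transactions.
labels = [0] * 8 + [1] * 2
weights = inverse_frequency_weights(labels)
print(weights[0], weights[-1])  # 0.625 2.5
```

With these weights each class sums to the same total (5.0 here), so the rare fraud class is not drowned out during training.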

In [52]:
@kfp.dsl.pipeline(name = f'kfp-{NOTEBOOK}-{DATANAME}-{TIMESTAMP}', pipeline_root = f'{URI}/{TIMESTAMP}/kfp/')
def pipeline(
    project: str = PROJECT_ID,
    dataname: str = DATANAME,
    display_name: str = f'{NOTEBOOK}_{DATANAME}_{TIMESTAMP}',
    deploy_machine: str = DEPLOY_COMPUTE,
    bq_source: str = f'bq://{PROJECT_ID}.{DATANAME}.{DATANAME}_prepped',
    var_target: str = VAR_TARGET,
    var_omit: str = VAR_OMIT,
    label: str = NOTEBOOK 
):
    
    # dataset
    dataset = gcc_aip.TabularDatasetCreateOp(
        project = project,
        display_name = display_name,
        bq_source = bq_source,
        labels = {'notebook':f'{label}'}
    )
    
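    # get feature names
    # NOTE: this block runs in the notebook when the pipeline is compiled, not inside the pipeline,
    # so it reads the notebook globals (PROJECT_ID, DATANAME, VAR_TARGET, VAR_OMIT) and the
    # resulting feature list is fixed in the compiled pipeline
    from google.cloud import bigquery
    bq_client = bigquery.Client(project = PROJECT_ID)
    query = f"SELECT * FROM {DATANAME}.INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{DATANAME}_prepped'"
    schema = bq_client.query(query).to_dataframe()
    # exclude the omitted columns, the target, and the split column from the features
    OMIT = VAR_OMIT.split() + [VAR_TARGET, 'splits']
    features = schema[~schema.column_name.isin(OMIT)].column_name.tolist()
    # let AutoML choose the transformation for every remaining feature
    features = dict.fromkeys(features, 'auto')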
    
    # training
    model = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project = project,
        display_name = display_name,
        optimization_prediction_type = "classification",
        optimization_objective = "maximize-au-prc",
        budget_milli_node_hours = 1000,
        disable_early_stopping=False,
        column_specs = features,
        dataset = dataset.outputs['dataset'],
        target_column = var_target,
        predefined_split_column_name = 'splits',
        labels = {'notebook':f'{label}'}
    )
    
    # Endpoint: Creation
    endpoint = gcc_aip.EndpointCreateOp(
        project = project,
        display_name = display_name,
        labels = {'notebook':f'{label}'}
    )
    
    # Endpoint: Deployment of Model
    deployment = gcc_aip.ModelDeployOp(
        model = model.outputs["model"],
        endpoint = endpoint.outputs["endpoint"],
        dedicated_resources_min_replica_count = 1,
        dedicated_resources_max_replica_count = 1,
        traffic_split = {"0": 100},
        dedicated_resources_machine_type= deploy_machine
    )
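The feature-selection step inside the pipeline function can be isolated as a small pure-Python helper, which makes it easy to check which columns will be sent to AutoML as `column_specs`. This is a sketch: `build_column_specs` is a hypothetical name, and the hard-coded column list stands in for the `INFORMATION_SCHEMA.COLUMNS` query result.

```python
def build_column_specs(columns, var_target, var_omit, split_col="splits"):
    """Map every feature column to the 'auto' transformation.

    Columns named in var_omit (space-delimited), the target, and the split
    column are excluded, mirroring the logic in the pipeline cell.
    """
    omit = set(var_omit.split()) | {var_target, split_col}
    return {c: "auto" for c in columns if c not in omit}

columns = ["Time", "V1", "V2", "Amount", "Class", "transaction_id", "splits"]
specs = build_column_specs(columns, var_target="Class", var_omit="transaction_id")
print(specs)
# {'Time': 'auto', 'V1': 'auto', 'V2': 'auto', 'Amount': 'auto'}
```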

---
## Compile Pipeline

In [53]:
kfp.v2.compiler.Compiler().compile(
    pipeline_func = pipeline,
    package_path = f"{DIR}/{NOTEBOOK}.json"
)



Copy the compiled pipeline file to the GCS bucket:

In [54]:
!gsutil cp {DIR}/{NOTEBOOK}.json {URI}/{TIMESTAMP}/kfp/

Copying file://temp/02c/02c.json [Content-Type=application/json]...
/ [1 files][ 18.1 KiB/ 18.1 KiB]                                                
Operation completed over 1 objects/18.1 KiB.                                     


---
## Create Vertex AI Pipeline Job

In [55]:
pipeline = aiplatform.PipelineJob(
    display_name = f'{NOTEBOOK}_{DATANAME}_{TIMESTAMP}',
    template_path = f"{URI}/{TIMESTAMP}/kfp/{NOTEBOOK}.json",
    pipeline_root = f"{URI}/{TIMESTAMP}/kfp/",
    labels = {'notebook':f'{NOTEBOOK}'}
)

In [56]:
response = pipeline.run(service_account = SERVICE_ACCOUNT)

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/764015827198/locations/us-central1/pipelineJobs/kfp-02c-fraud-20211124170329-20211129104121
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/764015827198/locations/us-central1/pipelineJobs/kfp-02c-fraud-20211124170329-20211129104121')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/kfp-02c-fraud-20211124170329-20211129104121?project=764015827198
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/764015827198/locations/us-central1/pipelineJobs/kfp-02c-fraud-20211124170329-20211129104121 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/7

In [57]:
aiplatform.get_pipeline_df(pipeline = f'kfp-{NOTEBOOK}-{DATANAME}-{TIMESTAMP}')

| | pipeline_name | run_name | param.input:project | param.input:display_name | param.input:label | param.input:var_target | param.input:deploy_machine | param.input:var_omit | param.input:dataname | param.input:bq_source |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | kfp-02c-fraud-20211124170329 | kfp-02c-fraud-20211124170329-20211129104121 | statmike-demo2 | 02c_fraud_20211124170329 | 02c | Class | n1-standard-4 | transaction_id | fraud | bq://statmike-demo2.fraud.fraud_prepped |
| 1 | kfp-02c-fraud-20211124170329 | kfp-02c-fraud-20211124170329-20211124170348 | statmike-demo2 | 02c_fraud_20211124170329 | 02c | Class | n1-standard-4 | transaction_id | fraud | bq://statmike-demo2.fraud.fraud_prepped |


---
## Evaluation
Although the model above was trained with AutoML through a pipeline, you can still review its evaluation metrics directly in the Google Cloud Console: open the Models section of the Vertex AI service and select the model to see its evaluation metrics along with many helpful visuals.

You can also retrieve the evaluation metrics for your model with the API, as shown in this section.


Get the Model:

In [None]:
model = aiplatform.Model.list(filter=f'display_name={NOTEBOOK}_{DATANAME}_{TIMESTAMP}')

Check the resource name of the model created by this notebook, then set up a model service client:

In [None]:
model[0].resource_name

'projects/715288179162/locations/us-central1/models/3283726660725112832'

In [None]:
model_client = aiplatform.gapic.ModelServiceClient(
    client_options = {
        'api_endpoint' : f'{REGION}-aiplatform.googleapis.com'
    }
)

Retrieve the aggregate evaluation metrics for the model as a whole. First, use `.list_model_evaluations` to retrieve the evaluation id, then use `.get_model_evaluation` with that evaluation id:

In [None]:
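# `model` is the list returned by aiplatform.Model.list, so use its first element
evaluations = model_client.list_model_evaluations(parent = model[0].resource_name)
evals = iter(evaluations)
eval_id = next(evals).name # resource name of the first (overall) evaluation
geteval = model_client.get_model_evaluation(name = eval_id)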

Review several of the metrics included in the evaluation, and compare them to the results in the console view.

In [None]:
geteval.metrics['auPrc']

0.99965274

In [None]:
cm = geteval.metrics['confusionMatrix']
# each row holds the predicted-label counts for one true label, in annotationSpecs order
for spec, row in zip(cm['annotationSpecs'], cm['rows']):
    print('True Label = ', spec['displayName'], ' has Predicted labels = ', row)

True Label =  0  has Predicted labels =  [28366.0, 4.0]
True Label =  1  has Predicted labels =  [15.0, 38.0]

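The confusion matrix rows above are enough to derive precision and recall for the fraud class by hand. This sketch assumes the layout shown in the output: rows are true labels and columns are predicted labels, both in `annotationSpecs` order (`0`, then `1`).

```python
# Confusion matrix copied from the output above: rows = true label, cols = predicted label.
confusion = {
    "0": [28366.0, 4.0],
    "1": [15.0, 38.0],
}
labels = ["0", "1"]

def precision_recall(confusion, labels, positive):
    j = labels.index(positive)
    true_pos = confusion[positive][j]
    predicted_pos = sum(row[j] for row in confusion.values())  # column sum
    actual_pos = sum(confusion[positive])                      # row sum
    return true_pos / predicted_pos, true_pos / actual_pos

precision, recall = precision_recall(confusion, labels, positive="1")
print(f"precision={precision:.4f} recall={recall:.4f}")
# precision=0.9048 recall=0.7170
```

Here 38 of the 42 transactions predicted as fraud are fraud, while 15 of the 53 actual fraud cases are missed.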

For classification models, you can also retrieve the evaluation metrics for each slice, here one slice per class label:

In [None]:
slices = model_client.list_model_evaluation_slices(parent = eval_id)

In [None]:
for eval_slice in slices: # avoid shadowing the built-in `slice`
    print('Label = ', eval_slice.slice_.value, 'has auPrc = ', eval_slice.metrics['auPrc'])

Label =  1 has auPrc =  0.8105681
Label =  0 has auPrc =  0.9997379


---
## Prediction

### Prepare a record for prediction: instance and parameters lists

In [58]:
pred = bigquery.query(query = f"SELECT * FROM {DATANAME}.{DATANAME}_prepped WHERE splits='TEST' LIMIT 10").to_dataframe()

In [59]:
pred.head(4)

| | Time | V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | ... | V23 | V24 | V25 | V26 | V27 | V28 | Amount | Class | transaction_id | splits |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 79470 | 1.073063 | -0.026793 | 0.796502 | 1.754389 | -0.21991 | 0.930349 | -0.515894 | 0.461495 | -0.132966 | ... | 0.048539 | -0.258218 | 0.171861 | 1.028847 | -0.037692 | -0.013125 | 0.0 | 0 | 07fdced0-3837-47a1-9526-64d74ad9b113 | TEST |
| 1 | 122082 | 1.878563 | 0.020785 | -1.621113 | 2.908813 | 2.507847 | 4.709442 | -0.830626 | 1.136154 | -0.395755 | ... | 0.121098 | 0.707538 | 0.1401 | 0.155684 | 0.016375 | -0.053892 | 0.0 | 0 | 7c1f61ba-7586-414e-ba8a-1c4385d59933 | TEST |
| 2 | 11494 | -1.169744 | 0.462878 | 1.587579 | -1.25256 | 1.012817 | -0.918413 | 0.817632 | -0.522804 | 1.388247 | ... | -0.042144 | -0.021269 | -0.283979 | 0.556661 | -0.191722 | -0.071773 | 0.0 | 0 | a3046c87-cee5-40fd-9302-4d230b823246 | TEST |
| 3 | 58900 | -2.728403 | -2.340346 | 2.551039 | 2.623092 | 3.014728 | -0.807294 | -2.286621 | 0.757561 | -1.279297 | ... | 0.370139 | -0.212784 | -0.104956 | 1.355781 | -0.053063 | 0.125386 | 0.0 | 0 | bc3f2800-a4bb-4077-b017-f55f03c4f00c | TEST |


In [60]:
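# keep only the feature columns (drop omitted columns, the target, and the split column), then take the first row as a dict
newob = pred[pred.columns[~pred.columns.isin(VAR_OMIT.split()+[VAR_TARGET, 'splits'])]].to_dict(orient='records')[0]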

The prediction request must use the variable formats the model expects, and AutoML may convert the type of some variables. Here, `Time` is sent as a string:

In [61]:
newob['Time'] = str(newob['Time'])

In [62]:
instances = [json_format.ParseDict(newob, Value())]
parameters = json_format.ParseDict({}, Value())
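The preparation steps above (drop the omitted, target, and split columns; send `Time` as a string) can be expressed as one function over a plain dict record. This is a minimal sketch: `to_instance` is a hypothetical helper, and the record uses values from the first test row shown earlier, truncated to a few columns.

```python
def to_instance(record, var_target, var_omit, split_col="splits", as_string=("Time",)):
    """Build a prediction instance from one table row.

    Drops the omitted columns, the target, and the split column, then converts
    the listed columns to strings to match the model's expected input.
    """
    omit = set(var_omit.split()) | {var_target, split_col}
    instance = {k: v for k, v in record.items() if k not in omit}
    for col in as_string:
        if col in instance:
            instance[col] = str(instance[col])
    return instance

row = {"Time": 79470, "V1": 1.073063, "Amount": 0.0, "Class": 0,
       "transaction_id": "07fdced0-3837-47a1-9526-64d74ad9b113", "splits": "TEST"}
instance = to_instance(row, var_target="Class", var_omit="transaction_id")
print(instance)
# {'Time': '79470', 'V1': 1.073063, 'Amount': 0.0}
```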

### Get Predictions: Python Client

In [63]:
aiplatform.Endpoint.list(filter=f'display_name={NOTEBOOK}_{DATANAME}_{TIMESTAMP}')

[<google.cloud.aiplatform.models.Endpoint object at 0x7f8357e8a210> 
 resource name: projects/764015827198/locations/us-central1/endpoints/8965971966745903104]

In [64]:
endpoint = aiplatform.Endpoint.list(filter=f'display_name={NOTEBOOK}_{DATANAME}_{TIMESTAMP}')[0]
endpoint.display_name

'02c_fraud_20211124170329'

In [65]:
prediction = endpoint.predict(instances=instances, parameters=parameters)
prediction

Prediction(predictions=[{'scores': [0.9791908264160156, 0.02080914564430714], 'classes': ['0', '1']}], deployed_model_id='6067940790905077760', explanations=None)

In [66]:
prediction.predictions[0]['classes'][np.argmax(prediction.predictions[0]['scores'])]

'0'
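The predicted class is the entry of `classes` at the position of the highest score. The same selection without NumPy, applied to the response values shown above (`example_prediction` is just a local copy of that output):

```python
example_prediction = {"scores": [0.9791908264160156, 0.02080914564430714], "classes": ["0", "1"]}

# Index of the highest score, then look up the matching class label.
scores = example_prediction["scores"]
best = max(range(len(scores)), key=lambda i: scores[i])
predicted_class = example_prediction["classes"][best]
print(predicted_class, scores[best])
# 0 0.9791908264160156
```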

### Get Predictions: REST

In [67]:
with open(f'{DIR}/request.json','w') as file:
    file.write(json.dumps({"instances": [newob]}))

In [68]:
!curl -X POST \
-H "Authorization: Bearer "$(gcloud auth application-default print-access-token) \
-H "Content-Type: application/json; charset=utf-8" \
-d @{DIR}/request.json \
https://{REGION}-aiplatform.googleapis.com/v1/{endpoint.resource_name}:predict

{
  "predictions": [
    {
      "scores": [
        0.97919082641601562,
        0.02080914564430714
      ],
      "classes": [
        "0",
        "1"
      ]
    }
  ],
  "deployedModelId": "6067940790905077760",
  "model": "projects/764015827198/locations/us-central1/models/8454738642248663040",
  "modelDisplayName": "02c_fraud_20211124170329"
}
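Both the REST call and the `gcloud` command are built from the endpoint's resource name: REST uses the full name in the URL path, while `gcloud` takes only the trailing endpoint ID. A small sketch of that string handling, using the resource name printed earlier and a truncated example instance:

```python
import json

region = "us-central1"
resource_name = "projects/764015827198/locations/us-central1/endpoints/8965971966745903104"

# REST: POST https://{region}-aiplatform.googleapis.com/v1/{resource_name}:predict
url = f"https://{region}-aiplatform.googleapis.com/v1/{resource_name}:predict"

# gcloud: only the endpoint ID after the last '/'
endpoint_id = resource_name.rsplit("/", 1)[-1]

# Request body: a JSON object with an "instances" list (one dict per row)
body = json.dumps({"instances": [{"Time": "79470", "V1": 1.073063}]})

print(url)
print(endpoint_id)
print(body)
```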


### Get Predictions: gcloud (CLI)

In [69]:
!gcloud beta ai endpoints predict {endpoint.name.rsplit('/',1)[-1]} --region={REGION} --json-request={DIR}/request.json

Using endpoint [https://us-central1-prediction-aiplatform.googleapis.com/]
[{'classes': ['0', '1'], 'scores': [0.9791908264160156, 0.02080914564430714]}]


---
## Explanations
For help reading the explanation output, see the interpretation guide:
- https://cloud.google.com/vertex-ai/docs/predictions/interpreting-results-automl#tabular

In [70]:
explanation = endpoint.explain(instances=instances, parameters=parameters)

In [71]:
explanation.predictions

[{'scores': [0.9791908264160156, 0.02080914564430714], 'classes': ['0', '1']}]

In [72]:
print("attribution:")
print("baseline output",explanation.explanations[0].attributions[0].baseline_output_value)
print("instance output",explanation.explanations[0].attributions[0].instance_output_value)
print("output_index",explanation.explanations[0].attributions[0].output_index)
print("output display value",explanation.explanations[0].attributions[0].output_display_name)
print("approximation error",explanation.explanations[0].attributions[0].approximation_error)

attribution:
baseline output 0.9515751004219055
instance output 0.9791908264160156
output_index [0]
output display value 0
approximation error 0.01368357880410392
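The feature attributions for an instance should sum, approximately, to the difference between `instance_output_value` and `baseline_output_value`; `approximation_error` indicates how far the sampled estimate may be off. The check below uses the two output values printed above, but the per-feature attribution values are made up for illustration and are not from this model.

```python
baseline_output = 0.9515751004219055
instance_output = 0.9791908264160156

# Hypothetical attributions (NOT from this model), chosen to sum to roughly the output gap.
feature_attributions = {"V14": 0.015, "V4": 0.008, "Amount": 0.003, "Time": 0.0016}

gap = instance_output - baseline_output
total = sum(feature_attributions.values())
print(f"output gap = {gap:.6f}, sum of attributions = {total:.6f}")
print("relative difference:", abs(total - gap) / abs(gap))
```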


In [None]:
import matplotlib.pyplot as plt

# feature attributions for the first instance and its first output
attributions = explanation.explanations[0].attributions[0].feature_attributions
features = [k for k in attributions]
scores = [attributions[k] for k in features]
# sort by attribution value (ascending) so the largest bar is drawn at the top
features = [x for _, x in sorted(zip(scores, features))]
scores = sorted(scores)
fig, ax = plt.subplots()
fig.set_size_inches(9, 9)
ax.barh(features, scores)
plt.show()
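The bar chart orders attributions by signed value. To list only the few features with the largest influence in either direction, sort by absolute value instead. A sketch with hypothetical attribution values standing in for `explanation.explanations[0].attributions[0].feature_attributions` (assuming it behaves like a dict, as the plotting cell does); `top_attributions` is a hypothetical helper name.

```python
def top_attributions(attributions, k=3):
    """Return the k (feature, value) pairs with the largest absolute attribution."""
    return sorted(attributions.items(), key=lambda kv: abs(kv[1]), reverse=True)[:k]

# Hypothetical values for illustration only.
example = {"V14": 0.015, "V4": 0.008, "V10": -0.011, "Amount": 0.003, "Time": -0.0005}
top = top_attributions(example)
for name, value in top:
    print(f"{name:>8s} {value:+.4f}")
```

Sorting by magnitude keeps negative attributions (features pushing the score down) alongside positive ones.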

---
## Batch Predictions: BigQuery Source to BigQuery Destination, with Explanations

In [None]:
batch = aiplatform.BatchPredictionJob.create(
    job_display_name = f'{NOTEBOOK}_{DATANAME}_{TIMESTAMP}',
    model_name = endpoint.list_models()[0].model, # resource name of the model deployed to the endpoint
    instances_format = "bigquery",
    predictions_format = "bigquery",
    bigquery_source = f'bq://{PROJECT_ID}.{DATANAME}.{DATANAME}_prepped',
    bigquery_destination_prefix = f"{PROJECT_ID}", # a project: the job creates a new output dataset in it
    generate_explanation = True,
    labels = {'notebook':f'{NOTEBOOK}'}
)

INFO:google.cloud.aiplatform.jobs:Creating BatchPredictionJob
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob created. Resource name: projects/691911073727/locations/us-central1/batchPredictionJobs/5522655591295614976
INFO:google.cloud.aiplatform.jobs:To use this BatchPredictionJob in another session:
INFO:google.cloud.aiplatform.jobs:bpj = aiplatform.BatchPredictionJob('projects/691911073727/locations/us-central1/batchPredictionJobs/5522655591295614976')
INFO:google.cloud.aiplatform.jobs:View Batch Prediction Job:
https://console.cloud.google.com/ai/platform/locations/us-central1/batch-predictions/5522655591295614976?project=691911073727
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob projects/691911073727/locations/us-central1/batchPredictionJobs/5522655591295614976 current state:
JobState.JOB_STATE_RUNNING
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob projects/691911073727/locations/us-central1/batchPredictionJobs/5522655591295614976 current state:
JobState.JOB_STAT