# 03c - BQML + Vertex AI > Pipelines - automated pipelines for updating models

As time goes on, change occurs:
- inputs to our models may shift in distribution compared to the data the model was trained on - called training-serving skew
- inputs to our models may shift over time - called prediction drift
- new inputs/features may become available
- a better model may be created
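The first two kinds of change can be put into numbers by comparing a feature's distribution in a reference sample (for example, training data) against a newer sample (for example, serving requests). Below is a minimal stdlib-only sketch using the Population Stability Index (PSI), which is one common choice of shift measure and not something this notebook's pipeline computes. The function name `psi`, the equal-width binning, and the `eps` smoothing are assumptions made for illustration.

```python
import math
from typing import List, Sequence


def psi(reference: Sequence[float], current: Sequence[float],
        bins: int = 10, eps: float = 1e-6) -> float:
    """Population Stability Index of one numeric feature between two samples.

    Bins are equal-width over the reference sample's range; values outside
    that range are clipped into the first or last bin.
    """
    lo, hi = min(reference), max(reference)
    width = (hi - lo) / bins or 1.0  # fall back to width 1 if the reference is constant

    def proportions(values: Sequence[float]) -> List[float]:
        counts = [0] * bins
        for v in values:
            idx = int((v - lo) / width)
            counts[min(max(idx, 0), bins - 1)] += 1  # clip out-of-range values
        return [c / len(values) for c in counts]

    ref_p, cur_p = proportions(reference), proportions(current)
    # PSI = sum over bins of (cur - ref) * ln(cur / ref); eps avoids log(0) for empty bins
    return sum((c - r) * math.log((c + eps) / (r + eps)) for r, c in zip(ref_p, cur_p))


train_sample = [float(x) for x in range(100)]         # stand-in for a training-time feature
serving_sample = [float(x) + 50 for x in range(100)]  # same feature, shifted at serving time
print(psi(train_sample, train_sample))    # 0.0: identical distributions
print(psi(train_sample, serving_sample))  # large positive value: the distribution moved
```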

In the `03b` notebook we deployed the model built with BQML in the `03a` notebook to a Vertex AI Endpoint for online prediction. In this notebook we build a challenger model from the same training data, again with BQML but with a different model type: a deep neural network similar to the one built in the `05` series of notebooks. We then construct a Vertex AI Pipeline that orchestrates training the new model, comparing it to the deployed model, and conditionally replacing the deployed model with the new one.

This process could be triggered by elapsed time, by the arrival of a given amount of new data, or by detected training-serving skew or prediction drift.
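A trigger like this usually boils down to a small policy check that runs on a schedule and, when it fires, submits the compiled pipeline (for example by creating an `aiplatform.PipelineJob`, as done later in this notebook). The sketch below only covers the decision; the `RetrainPolicy` class, `retrain_reasons` function, and every threshold value are hypothetical and would need tuning for real data.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List


@dataclass(frozen=True)
class RetrainPolicy:
    # Hypothetical thresholds: tune these for your own data volume and costs.
    max_age: timedelta = timedelta(days=30)
    min_new_rows: int = 10_000
    max_drift: float = 0.25


def retrain_reasons(last_run: datetime, now: datetime, new_rows: int,
                    drift_score: float, policy: RetrainPolicy = RetrainPolicy()) -> List[str]:
    """Return why the pipeline should run; an empty list means it should not."""
    reasons = []
    if now - last_run >= policy.max_age:
        reasons.append("elapsed time")
    if new_rows >= policy.min_new_rows:
        reasons.append("new data")
    if drift_score > policy.max_drift:
        reasons.append("skew/drift")
    return reasons


now = datetime(2022, 2, 17)
print(retrain_reasons(now - timedelta(days=3), now, new_rows=500, drift_score=0.05))   # []
print(retrain_reasons(now - timedelta(days=45), now, new_rows=500, drift_score=0.40))  # ['elapsed time', 'skew/drift']
```

Returning the list of reasons rather than a bare boolean makes it easy to log why a given pipeline run was launched.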

### Prerequisites:
-  03a - BigQuery Machine Learning (BQML) - Machine Learning with SQL
-  03b - Vertex AI + BQML - Online Predictions with BQML Models

### Overview:
- Build Custom Pipeline Components
    - Use BigQuery ML to Get Predictions and Scikit-Learn to calculate model metrics
    - Use BigQuery ML to train a new model - A Deep Neural Network
    - Compare model metrics for baseline and challenger model
    - Export BigQuery ML model to Google Cloud Storage
    - Replace the model deployed to the `03b` endpoint with the challenger model and undeploy the previous model
- Define the Pipeline Flow
- Compile the Pipeline
- Run the Pipeline in Vertex AI
- Get Predictions from the updated Endpoint

### Resources:
-  [Export formats for BigQuery ML models](https://cloud.google.com/bigquery-ml/docs/exporting-models)
-  [Python Client for Vertex AI](https://googleapis.dev/python/aiplatform/latest/aiplatform.html)
-  Codelab: [Vertex AI Pipelines Introduction](https://codelabs.developers.google.com/vertex-mlmd-pipelines#0)
-  [Vertex AI Prebuilt KFP Components](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-0.2.0/)


---
## Vertex AI - Conceptual Flow

<img src="architectures/slides/03c_arch.png">

---
## Vertex AI - Workflow

<img src="architectures/slides/03c_console.png">

---
## Setup

inputs:

In [43]:
REGION = 'us-central1'
PROJECT_ID='statmike-demo3'
DATANAME = 'fraud'
NOTEBOOK = '03c'

# Resources
DEPLOY_IMAGE='us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-3:latest'
DEPLOY_COMPUTE = 'n1-standard-4'

# Model Training
VAR_TARGET = 'Class'
VAR_OMIT = 'transaction_id' # add more variables to the string with space delimiters

packages:

In [44]:
from google.cloud import aiplatform
from datetime import datetime
from typing import NamedTuple
import kfp # used for dsl.pipeline
import kfp.v2.dsl as dsl # used for dsl.component, dsl.Output, dsl.Input, dsl.Artifact, dsl.Model, ...
from google_cloud_pipeline_components import aiplatform as gcc_aip

from google.cloud import bigquery
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value
import json
import numpy as np

clients:

In [45]:
aiplatform.init(project=PROJECT_ID, location=REGION)
bigquery = bigquery.Client()

parameters:

In [46]:
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
BUCKET = PROJECT_ID
URI = f"gs://{BUCKET}/{DATANAME}/models/{NOTEBOOK}"
DIR = f"temp/{NOTEBOOK}"

In [47]:
# Give the service account roles/storage.objectAdmin permissions
# Console > IAM > Select Account <projectnumber>-compute@developer.gserviceaccount.com > edit - give role
SERVICE_ACCOUNT = !gcloud config list --format='value(core.account)' 
SERVICE_ACCOUNT = SERVICE_ACCOUNT[0]
SERVICE_ACCOUNT

'715288179162-compute@developer.gserviceaccount.com'

environment:

In [48]:
!rm -rf {DIR}
!mkdir -p {DIR}

---
## Custom Components (KFP)

### Model Metrics
- Get Predictions for Test data from BigQuery Model
- Calculate [average precision for the precision-recall curve](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.average_precision_score.html#sklearn.metrics.average_precision_score)
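For reference, scikit-learn defines average precision as AP = Σₙ (Rₙ − Rₙ₋₁) Pₙ, where Pₙ and Rₙ are the precision and recall at the n-th score threshold. The stdlib sketch below reimplements that sum so the metric is easy to reason about; it treats tied scores as one threshold and assumes at least one positive label. The name `average_precision` is illustrative; the component itself calls `average_precision_score`.

```python
from typing import Sequence


def average_precision(y_true: Sequence[int], y_score: Sequence[float]) -> float:
    """AP = sum_n (R_n - R_{n-1}) * P_n over descending score thresholds."""
    total_pos = sum(y_true)
    if total_pos == 0:
        raise ValueError("average precision needs at least one positive label")
    pairs = sorted(zip(y_score, y_true), key=lambda p: p[0], reverse=True)
    tp = fp = 0
    prev_recall = ap = 0.0
    for i, (score, label) in enumerate(pairs):
        if label == 1:
            tp += 1
        else:
            fp += 1
        # tied scores form one threshold: only evaluate at the last item of the tie
        if i + 1 < len(pairs) and pairs[i + 1][0] == score:
            continue
        recall, precision = tp / total_pos, tp / (tp + fp)
        ap += (recall - prev_recall) * precision
        prev_recall = recall
    return ap


print(round(average_precision([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]), 4))  # 0.8333
```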

In [119]:
@dsl.component(
    base_image = 'python:3.9',
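    # 'sklearn' is a deprecated PyPI name for scikit-learn; db-dtypes is required by
    # to_dataframe() in google-cloud-bigquery 3.x
    packages_to_install = ['pandas','pyarrow','scikit-learn','db-dtypes','google-cloud-bigquery']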
)
def bqml_eval(
    project: str,
    var_target: str,
    model: str,
    dataname: str,
    metrics: dsl.Output[dsl.Metrics],
    metricsc: dsl.Output[dsl.ClassificationMetrics]
) -> NamedTuple("model_eval", [("metric", float)]):

    from collections import namedtuple
    from sklearn.metrics import average_precision_score, confusion_matrix
    from google.cloud import bigquery
    bigquery = bigquery.Client(project = project)

    query = f"""
    SELECT {var_target}, predicted_{var_target}, prob, splits 
    FROM ML.PREDICT (MODEL `{project}.{dataname}.{model}`,(
        SELECT *
        FROM `{project}.{dataname}.{dataname}_prepped`
        WHERE splits = 'TEST')
      ), UNNEST(predicted_{var_target}_probs)
    WHERE label=1
    """
    pred = bigquery.query(query = query).to_dataframe()

    auPRC = average_precision_score(pred[var_target], pred['prob'], average='micro')    
    metrics.log_metric('auPRC', auPRC)
    metricsc.log_confusion_matrix(['Not Fraud', 'Fraud'], confusion_matrix(pred[var_target], pred[f'predicted_{var_target}']).tolist())
    
    model_eval = namedtuple("model_eval", ["metric"])
    return model_eval(metric = float(auPRC))
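For a classifier, BigQuery ML's `ML.PREDICT` returns `predicted_<label>` plus an array column `predicted_<label>_probs` of `(label, prob)` structs. The `UNNEST(...)` with `WHERE label=1` in the query keeps one row per record carrying the probability of the positive (fraud) class, which is the score `average_precision_score` needs. The sketch below mimics that reshaping on plain dictionaries; the rows are made up, and `positive_class_rows` is a hypothetical helper.

```python
from typing import Dict, List


def positive_class_rows(rows: List[Dict], var_target: str = "Class",
                        positive_label: int = 1) -> List[Dict]:
    """Mimic `FROM ML.PREDICT(...), UNNEST(predicted_<target>_probs) WHERE label = 1`."""
    probs_col = f"predicted_{var_target}_probs"
    out = []
    for row in rows:
        for item in row[probs_col]:              # UNNEST: one row per (label, prob) element
            if item["label"] == positive_label:  # WHERE label = 1
                out.append({
                    var_target: row[var_target],
                    f"predicted_{var_target}": row[f"predicted_{var_target}"],
                    "prob": item["prob"],
                    "splits": row["splits"],
                })
    return out


# Hypothetical ML.PREDICT output for two test rows
rows = [
    {"Class": 0, "predicted_Class": 0, "splits": "TEST",
     "predicted_Class_probs": [{"label": 1, "prob": 0.02}, {"label": 0, "prob": 0.98}]},
    {"Class": 1, "predicted_Class": 1, "splits": "TEST",
     "predicted_Class_probs": [{"label": 1, "prob": 0.91}, {"label": 0, "prob": 0.09}]},
]
print([r["prob"] for r in positive_class_rows(rows)])  # [0.02, 0.91]
```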

### BigQuery - Train DNN

In [120]:
@dsl.component(
    base_image = 'python:3.9',
    packages_to_install = ['google-cloud-bigquery']
)
def bqml_dnn(
    project: str,
    var_target: str,
    var_omit: str,
    model: str,
    dataname: str,
    bqml_model: dsl.Output[dsl.Artifact]
) -> NamedTuple("bqml_training", [("query", str)]):
    
    from collections import namedtuple
    from google.cloud import bigquery
    bigquery = bigquery.Client(project = project)
    
    query = f"""
    CREATE OR REPLACE MODEL `{project}.{dataname}.{model}`
    OPTIONS
        (model_type = 'DNN_CLASSIFIER',
            auto_class_weights = FALSE,
            input_label_cols = ['{var_target}'],
            data_split_col = 'custom_splits',
            data_split_method = 'CUSTOM',
            EARLY_STOP = FALSE,
            OPTIMIZER = 'SGD',
            HIDDEN_UNITS = [2],
            LEARN_RATE = 0.001,
            BATCH_SIZE = 10,
            DROPOUT = 0.25,
            ACTIVATION_FN = 'SIGMOID',
            MAX_ITERATIONS = 10
        ) AS
    SELECT * EXCEPT({','.join(var_omit.split())}, splits),
        CASE
            WHEN splits = 'TRAIN' THEN FALSE
            ELSE TRUE
        END AS custom_splits
    FROM `{project}.{dataname}.{dataname}_prepped`
    WHERE splits != 'TEST'
    """
    job = bigquery.query(query = query)
    job.result()
    bqml_model.uri = f"bq://{project}.{dataname}.{model}"
    
    result = namedtuple("bqml_training", ["query"])
                
    return result(query = str(query))
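The training query relies on BigQuery ML's `CUSTOM` split: rows where `data_split_col` is FALSE are used for training and rows where it is TRUE for evaluation, while TEST rows are removed by the `WHERE` clause so they stay unseen for `bqml_eval`. The sketch below mirrors the `CASE` expression and the `EXCEPT(...)` column list in plain Python. The split name `VALIDATE` is an assumption here; any value other than `TRAIN` and `TEST` is handled the same way.

```python
from typing import Optional


def custom_split_flag(split: str) -> Optional[bool]:
    """Python mirror of the CASE expression in bqml_dnn.

    False -> training row, True -> evaluation row, None -> excluded by the WHERE clause.
    """
    if split == "TEST":
        return None  # filtered out by WHERE splits != 'TEST'
    return split != "TRAIN"


def except_columns(var_omit: str) -> str:
    """Column list for SELECT * EXCEPT(...): omitted variables plus the splits column."""
    return ", ".join(var_omit.split() + ["splits"])


print(except_columns("transaction_id"))                                  # transaction_id, splits
print([custom_split_flag(s) for s in ["TRAIN", "VALIDATE", "TEST"]])    # [False, True, None]
```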

### Compare Models

In [135]:
@dsl.component
def model_compare(
    base_metric: float,
    challenger_metric: float,
) -> NamedTuple("Outputs", [("replace", str)]):
    
    from collections import namedtuple 
    
    if base_metric < challenger_metric:
        replace = 'true'
    else:
        replace = 'false'
    
    output = namedtuple("Outputs", "replace")
    
    return output(replace=str(replace))
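`model_compare` swaps models on any improvement, however small. A possible refinement, sketched below, is to require the challenger to win by a minimum margin so that noise-level differences in auPRC do not trigger a redeployment. The `min_delta` parameter and the function name are hypothetical; with `min_delta=0.0` the logic matches `model_compare`, including keeping the deployed model on a tie. The result stays a string because the pipeline tests `compare.outputs["replace"] == "true"`.

```python
def model_compare_with_margin(base_metric: float, challenger_metric: float,
                              min_delta: float = 0.0) -> str:
    """Return "true" only if the challenger beats the baseline by more than min_delta."""
    return "true" if challenger_metric - base_metric > min_delta else "false"


print(model_compare_with_margin(0.68, 0.70))                  # true
print(model_compare_with_margin(0.68, 0.70, min_delta=0.05))  # false
```

To use it in the pipeline, the same function body would go inside a `@dsl.component` with `min_delta` exposed as a pipeline parameter.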

### Export BQML Model

In [153]:
@dsl.component(
    base_image = 'python:3.9',
    packages_to_install = ['google-cloud-bigquery']
)
def bqml_export(
    project: str,
    export_location: str,
    bqml_model: dsl.Input[dsl.Model],
    tf_model: dsl.Output[dsl.Artifact],  
):
    
    from google.cloud import bigquery
    bigquery = bigquery.Client(project = project)
    
    bqml_model_name = bqml_model.uri.split("/")[-1]
    export = bigquery.query(query = f"EXPORT MODEL `{bqml_model_name}` OPTIONS(URI = '{export_location}')")
    export.result()
    
    tf_model.uri = export_location
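`bqml_export` recovers the BigQuery model ID from the artifact URI written by `bqml_dnn` (`bq://project.dataset.model`) with `split("/")[-1]`, which silently returns whatever follows the last slash. The sketch below shows a stricter variant that fails loudly on an unexpected URI and builds the same `EXPORT MODEL` statement; both helper names are hypothetical.

```python
def bq_model_id(uri: str) -> str:
    """Turn 'bq://project.dataset.model' into 'project.dataset.model'."""
    prefix = "bq://"
    if not uri.startswith(prefix):
        raise ValueError(f"expected a {prefix} URI, got {uri!r}")
    model_id = uri[len(prefix):]
    if len(model_id.rsplit(".", 2)) != 3:
        raise ValueError(f"expected project.dataset.model, got {model_id!r}")
    return model_id


def export_model_sql(model_id: str, gcs_uri: str) -> str:
    """Same EXPORT MODEL statement that bqml_export sends to BigQuery."""
    return f"EXPORT MODEL `{model_id}` OPTIONS(URI = '{gcs_uri}')"


model_id = bq_model_id("bq://statmike-demo3.fraud.fraud_dnn")
print(export_model_sql(model_id, "gs://statmike-demo3/fraud/models/03c"))
```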

### Replace Model On Endpoint

In [123]:
@dsl.component(
    packages_to_install = ['google-cloud-aiplatform'],
    base_image = 'python:3.9'
)
def endpoint_update(
    project: str,
    region: str,
    newmodel: dsl.Input[dsl.Model],
    display_name: str,
    deploy_machine: str,
    deploy_container: str,
    label: str
):
    
    from google.cloud import aiplatform
    aiplatform.init(project = project, location = region)

    # upload new model (03c)
    model = aiplatform.Model.upload(
        display_name = display_name,
        serving_container_image_uri = deploy_container,
        artifact_uri = newmodel.uri,
        labels = {'notebook':f'{label}'}
    )
    
    # find endpoint from notebook 03b (the last matching endpoint wins)
    endpoint = None
    for e in aiplatform.Endpoint.list():
        if e.display_name.startswith('03b'): endpoint = e
    if endpoint is None:
        raise RuntimeError("no endpoint with a display name starting with '03b' was found")
    print(endpoint.display_name)

    # record the model(s) currently deployed on the 03b endpoint
    oldmodels = endpoint.list_models()
    print(oldmodels)
    
    # deploy 03c model to endpoint with traffic_percentage = 100
    endpoint.deploy(
        model = model,
        deployed_model_display_name = display_name,
        traffic_percentage = 100,
        machine_type = deploy_machine,
        min_replica_count = 1,
        max_replica_count = 1
    )
    
    # undeploy the previously deployed model(s); they now receive 0% of traffic
    for oldmodel in oldmodels:
        endpoint.undeploy(
            deployed_model_id = oldmodel.id
        )
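`endpoint_update` moves 100% of traffic to the new model in one step. A more cautious rollout is a canary: `Endpoint.deploy` also accepts a `traffic_split` dictionary in which the key `"0"` stands for the model being deployed and the other keys are IDs of models already on the endpoint (for example, the `.id` values from `endpoint.list_models()`). The sketch below only builds such a dictionary; the helper name and the ID `"1111"` are hypothetical, and with a canary the undeploy step would be postponed until the new model has proven itself.

```python
from typing import Dict, List


def canary_traffic_split(old_deployed_ids: List[str], new_percent: int) -> Dict[str, int]:
    """Build a traffic_split map: key "0" is the model being deployed.

    The remaining traffic is spread as evenly as possible over the models
    already on the endpoint, so the values always sum to 100.
    """
    if not 0 <= new_percent <= 100:
        raise ValueError("new_percent must be between 0 and 100")
    split = {"0": new_percent}
    remaining = 100 - new_percent
    if not old_deployed_ids:
        if remaining:
            raise ValueError("no existing deployed models to receive the remaining traffic")
        return split
    share, extra = divmod(remaining, len(old_deployed_ids))
    for i, deployed_id in enumerate(old_deployed_ids):
        split[deployed_id] = share + (1 if i < extra else 0)
    return split


print(canary_traffic_split(["1111"], 10))  # {'0': 10, '1111': 90}
```

Inside the component, the result could be passed as `endpoint.deploy(model = model, traffic_split = split, ...)` in place of `traffic_percentage = 100`.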

---
## Pipeline (KFP) Definition

In [154]:
@kfp.dsl.pipeline(
    name = f'kfp-{NOTEBOOK}-{DATANAME}-{TIMESTAMP}', 
    pipeline_root = URI+'/'+str(TIMESTAMP)+'/kfp/'
)
def pipeline(
    project: str = PROJECT_ID,
    region: str = REGION,
    dataname: str = DATANAME,
    display_name: str = f'{NOTEBOOK}_{DATANAME}_{TIMESTAMP}',
    deploy_machine: str = DEPLOY_COMPUTE,
    deploy_container: str = DEPLOY_IMAGE,
    bq_source: str = f'bq://{PROJECT_ID}.{DATANAME}.{DATANAME}_prepped',
    var_target: str = VAR_TARGET,
    var_omit: str = VAR_OMIT,
    uri: str = URI,
    label: str = NOTEBOOK 
):
        
    # get average precision (auPRC) for current model
    base_model_eval = bqml_eval(
        project = project,
        var_target = var_target,
        model = f'{dataname}_lr',
        dataname = dataname
    )
    
    # train challenger model with BQML
    challenger_model = bqml_dnn(
        project = project,
        var_target = var_target,
        var_omit = var_omit,
        model = f'{dataname}_dnn',
        dataname = dataname,
    )
    
    # get average precision (auPRC) for challenger model
    challenger_model_eval = bqml_eval(
        project = project,
        var_target = var_target,
        model = f'{dataname}_dnn',
        dataname = dataname
    )
    challenger_model_eval.after(challenger_model)
    
    # compare models
    compare = model_compare(
        base_metric = base_model_eval.outputs["metric"],
        challenger_metric = challenger_model_eval.outputs["metric"]
    )
    
    # conditional deployment
    with dsl.Condition(
        compare.outputs["replace"] == "true",
        name = "replace_model"
    ):
        # export BQML model
        export = bqml_export(
            project = project,
            export_location = uri,
            bqml_model = challenger_model.outputs["bqml_model"]
        )
        
        # replace model on endpoint (03b)
        replace = endpoint_update(
            project = project,
            region = region,
            newmodel = export.outputs["tf_model"],
            display_name = display_name,
            deploy_machine = deploy_machine,
            deploy_container = deploy_container,
            label = label
        )
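The pipeline function wires tasks together through their inputs and outputs, plus one explicit `.after()`: `challenger_model_eval` receives only string parameters, so without `.after(challenger_model)` nothing would force it to wait for training. The rough local model below uses Python's `graphlib` to group the tasks into waves that could run in parallel; the dictionary simply restates the dependencies above using the task variable names, and it is an illustration rather than how Vertex AI Pipelines schedules work internally.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+
from typing import Dict, List, Set

# Task variable in pipeline() -> tasks it waits on.
PIPELINE_DEPS: Dict[str, Set[str]] = {
    "base_model_eval": set(),
    "challenger_model": set(),
    "challenger_model_eval": {"challenger_model"},  # only via the explicit .after()
    "compare": {"base_model_eval", "challenger_model_eval"},
    "export": {"compare", "challenger_model"},      # inside dsl.Condition(replace == "true")
    "replace": {"export"},
}


def stages(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group tasks into waves whose dependencies have all completed."""
    ts = TopologicalSorter(deps)
    ts.prepare()
    waves = []
    while ts.is_active():
        ready = sorted(ts.get_ready())
        waves.append(ready)
        ts.done(*ready)
    return waves


for i, wave in enumerate(stages(PIPELINE_DEPS)):
    print(i, wave)
```

The first wave shows that evaluating the deployed model and training the challenger are independent and can proceed side by side.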

---
## Compile Pipeline

In [155]:
kfp.v2.compiler.Compiler().compile(
    pipeline_func = pipeline,
    package_path = f"{DIR}/{NOTEBOOK}.json"
)

Copy the compiled pipeline file to the GCS bucket:

In [156]:
!gsutil cp {DIR}/{NOTEBOOK}.json {URI}/{TIMESTAMP}/kfp/

Copying file://temp/03c/03c.json [Content-Type=application/json]...
/ [1 files][ 30.6 KiB/ 30.6 KiB]                                                
Operation completed over 1 objects/30.6 KiB.                                     


---
## Create Vertex AI Pipeline Job

In [157]:
pipeline = aiplatform.PipelineJob(
    display_name = f'{NOTEBOOK}_{DATANAME}_{TIMESTAMP}',
    template_path = f"{URI}/{TIMESTAMP}/kfp/{NOTEBOOK}.json",
    pipeline_root = f"{URI}/{TIMESTAMP}/kfp/",
    enable_caching = True,
    labels = {'notebook':f'{NOTEBOOK}'}
)

In [158]:
response = pipeline.run(service_account = SERVICE_ACCOUNT)

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/715288179162/locations/us-central1/pipelineJobs/kfp-03c-fraud-20220217183723-20220217222516
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/715288179162/locations/us-central1/pipelineJobs/kfp-03c-fraud-20220217183723-20220217222516')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/kfp-03c-fraud-20220217183723-20220217222516?project=715288179162
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/715288179162/locations/us-central1/pipelineJobs/kfp-03c-fraud-20220217183723-20220217222516 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/7

In [174]:
aiplatform.get_pipeline_df(pipeline = f'kfp-{NOTEBOOK}-{DATANAME}-{TIMESTAMP}')

| | pipeline_name | run_name | param.input:var_omit | param.input:deploy_machine | param.input:dataname | param.input:region | param.input:display_name | param.input:project | param.input:deploy_container | param.input:label | param.input:uri | param.input:var_target | param.input:bq_source | metric.auPRC | metric.confusionMatrix |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | kfp-03c-fraud-20220217183723 | kfp-03c-fraud-20220217183723-20220217222516 | transaction_id | n1-standard-4 | fraud | us-central1 | 03c_fraud_20220217183723 | statmike-demo3 | us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu... | 03c | gs://statmike-demo3/fraud/models/03c | Class | bq://statmike-demo3.fraud.fraud_prepped | 0.681381 | {'rows': [{'row': [28370.0, 0.0]}, {'row': [53... |
| 1 | kfp-03c-fraud-20220217183723 | kfp-03c-fraud-20220217183723-20220217205016 | transaction_id | n1-standard-4 | fraud | us-central1 | 03c_fraud_20220217183723 | statmike-demo3 | us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu... | 03c | gs://statmike-demo3/fraud/models/03c | Class | bq://statmike-demo3.fraud.fraud_prepped | 0.681381 | {'rows': [{'row': [28370.0, 0.0]}, {'row': [53... |
| 2 | kfp-03c-fraud-20220217183723 | kfp-03c-fraud-20220217183723-20220217204556 | transaction_id | n1-standard-4 | fraud | us-central1 | 03c_fraud_20220217183723 | statmike-demo3 | us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu... | 03c | gs://statmike-demo3/fraud/models/03c | Class | bq://statmike-demo3.fraud.fraud_prepped | 0.681381 | {'annotationSpecs': [{'displayName': 'Not Frau... |
| 3 | kfp-03c-fraud-20220217183723 | kfp-03c-fraud-20220217183723-20220217204121 | transaction_id | n1-standard-4 | fraud | us-central1 | 03c_fraud_20220217183723 | statmike-demo3 | us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu... | 03c | gs://statmike-demo3/fraud/models/03c | Class | bq://statmike-demo3.fraud.fraud_prepped | 0.696497 | {'annotationSpecs': [{'displayName': 'Not Frau... |
| 4 | kfp-03c-fraud-20220217183723 | kfp-03c-fraud-20220217183723-20220217203739 | transaction_id | n1-standard-4 | fraud | us-central1 | 03c_fraud_20220217183723 | statmike-demo3 | us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu... | 03c | gs://statmike-demo3/fraud/models/03c | Class | bq://statmike-demo3.fraud.fraud_prepped | 0.681381 | {'annotationSpecs': [{'displayName': 'Not Frau... |
| 5 | kfp-03c-fraud-20220217183723 | kfp-03c-fraud-20220217183723-20220217195716 | transaction_id | n1-standard-4 | fraud | us-central1 | 03c_fraud_20220217183723 | statmike-demo3 | us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu... | 03c | gs://statmike-demo3/fraud/models/03c | Class | bq://statmike-demo3.fraud.fraud_prepped | 0.681381 | {'annotationSpecs': [{'displayName': 'Not Frau... |
| 6 | kfp-03c-fraud-20220217183723 | kfp-03c-fraud-20220217183723-20220217193225 | transaction_id | n1-standard-4 | fraud | us-central1 | 03c_fraud_20220217183723 | statmike-demo3 | us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu... | 03c | gs://statmike-demo3/fraud/models/03c | Class | bq://statmike-demo3.fraud.fraud_prepped | | |
| 7 | kfp-03c-fraud-20220217183723 | kfp-03c-fraud-20220217183723-20220217192700 | transaction_id | n1-standard-4 | fraud | us-central1 | 03c_fraud_20220217183723 | statmike-demo3 | us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu... | 03c | gs://statmike-demo3/fraud/models/03c | Class | bq://statmike-demo3.fraud.fraud_prepped | | |
| 8 | kfp-03c-fraud-20220217183723 | kfp-03c-fraud-20220217183723-20220217185420 | transaction_id | n1-standard-4 | fraud | us-central1 | 03c_fraud_20220217183723 | statmike-demo3 | us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu... | 03c | gs://statmike-demo3/fraud/models/03c | Class | bq://statmike-demo3.fraud.fraud_prepped | | |
| 9 | kfp-03c-fraud-20220217183723 | kfp-03c-fraud-20220217183723-20220217183944 | transaction_id | n1-standard-4 | fraud | us-central1 | 03c_fraud_20220217183723 | statmike-demo3 | us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu... | 03c | gs://statmike-demo3/fraud/models/03c | Class | bq://statmike-demo3.fraud.fraud_prepped | | |


## Review Pipeline Run

<img src="architectures/notebooks/03c_screenshots/kfp_pipeline.png">

---
## Prediction

### Prepare a record for prediction: instance and parameters lists

In [159]:
pred = bigquery.query(query = f"SELECT * FROM {DATANAME}.{DATANAME}_prepped WHERE splits='TEST' LIMIT 10").to_dataframe()

In [160]:
pred.head(4)

| | Time | V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | ... | V23 | V24 | V25 | V26 | V27 | V28 | Amount | Class | transaction_id | splits |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 7148 | 1.156386 | 0.193513 | 0.24222 | 0.660729 | 0.236144 | 0.311471 | -0.08842 | 0.057844 | 1.123405 | ... | -0.051662 | -0.262183 | 0.47787 | 0.556403 | -0.046953 | -0.021878 | 0.0 | 0 | 0eddc3ef-a61b-4fba-a3ab-0ed9a726dcf0 | TEST |
| 1 | 76311 | -0.186529 | 0.545755 | 2.432618 | 3.266129 | -0.784549 | 3.167033 | -2.460489 | -1.830983 | 0.389492 | ... | -0.40038 | -1.26528 | 1.231 | 0.749402 | 0.147862 | 0.187856 | 0.0 | 0 | b1111e03-a559-4eb4-ab32-e3aea0072ef7 | TEST |
| 2 | 125139 | 1.879049 | 0.212473 | -0.085529 | 3.554091 | 0.205505 | 1.188395 | -0.672662 | 0.375249 | -0.494351 | ... | 0.131433 | 0.256023 | -0.13545 | 0.048878 | 0.003082 | -0.042219 | 0.0 | 0 | 0a0f4b69-01ee-436e-ae52-02237cd6433e | TEST |
| 3 | 51632 | 1.26405 | 0.182193 | 0.02091 | 0.47806 | -0.037823 | -0.490973 | 0.16669 | -0.130607 | -0.1572 | ... | -0.167644 | 0.075563 | 0.698539 | 0.556361 | -0.052595 | -0.011799 | 0.0 | 0 | ed678d6e-8dea-4d45-92b7-74e7eba22402 | TEST |


In [161]:
newob = pred[pred.columns[~pred.columns.isin(VAR_OMIT.split()+[VAR_TARGET, 'splits'])]].to_dict(orient='records')[0]
#newob
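The pandas expression above keeps only the model's input columns from the first test row. The same filtering in plain Python looks like this; `to_instance` is a hypothetical helper, and the sample row is an abbreviated copy of the first test record shown above with most `V` columns removed.

```python
from typing import Any, Dict


def to_instance(record: Dict[str, Any], var_target: str, var_omit: str) -> Dict[str, Any]:
    """Keep only model inputs: drop omitted ID columns, the target, and 'splits'."""
    drop = set(var_omit.split()) | {var_target, "splits"}
    return {k: v for k, v in record.items() if k not in drop}


row = {"Time": 7148, "V1": 1.156386, "Amount": 0.0, "Class": 0,
       "transaction_id": "0eddc3ef-a61b-4fba-a3ab-0ed9a726dcf0", "splits": "TEST"}
print(to_instance(row, var_target="Class", var_omit="transaction_id"))
# {'Time': 7148, 'V1': 1.156386, 'Amount': 0.0}
```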

Each instance must use the input types that the deployed model expects, and a value sent with the wrong type can be rejected. Here `Time` is converted to a string before the record is formatted for prediction:

In [162]:
newob['Time'] = str(newob['Time'])

In [163]:
instances = [json_format.ParseDict(newob, Value())]
parameters = json_format.ParseDict({}, Value())

### Get Predictions: Python Client

In [169]:
for e in aiplatform.Endpoint.list():
    if e.display_name.startswith('03b'): endpoint = e
print(endpoint.display_name)

03b_fraud_20220107180801


In [None]:
prediction = endpoint.predict(instances=instances, parameters=parameters)
prediction

In [None]:
prediction.predictions[0]['classes'][np.argmax(prediction.predictions[0]['scores'])]
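The cell above picks the label whose score is highest. The same selection without NumPy is sketched below; `top_class` is a hypothetical helper, and the example response is copied from the gcloud output at the end of this section.

```python
from typing import Any, Dict


def top_class(prediction: Dict[str, Any]) -> str:
    """Label with the highest score; ties go to the first label, as with np.argmax."""
    scores = prediction["scores"]
    best = max(range(len(scores)), key=scores.__getitem__)
    return prediction["classes"][best]


example = {"classes": ["0", "1"], "scores": [0.9791908264160156, 0.02080914564430714]}
print(top_class(example))  # 0
```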

### Get Predictions: REST

In [172]:
with open(f'{DIR}/request.json','w') as file:
    file.write(json.dumps({"instances": [newob]}))

In [173]:
!curl -X POST \
-H "Authorization: Bearer "$(gcloud auth application-default print-access-token) \
-H "Content-Type: application/json; charset=utf-8" \
-d @{DIR}/request.json \
https://{REGION}-aiplatform.googleapis.com/v1/{endpoint.resource_name}:predict

{
  "error": {
    "code": 400,
    "message": "{\n    \"error\": \"Serving signature name: \\\"serving_default\\\" not found in signature def\"\n}",
    "status": "INVALID_ARGUMENT"
  }
}


### Get Predictions: gcloud (CLI)

In [69]:
!gcloud beta ai endpoints predict {endpoint.name.rsplit('/',1)[-1]} --region={REGION} --json-request={DIR}/request.json

Using endpoint [https://us-central1-prediction-aiplatform.googleapis.com/]
[{'classes': ['0', '1'], 'scores': [0.9791908264160156, 0.02080914564430714]}]
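The REST and gcloud calls both start from the endpoint's full resource name: curl posts to `https://<region>-aiplatform.googleapis.com/v1/<resource_name>:predict`, and gcloud takes just the trailing endpoint ID. The two string manipulations are sketched below; the helper names and the resource name are hypothetical.

```python
def endpoint_id(resource_name: str) -> str:
    """Last segment of projects/<project>/locations/<region>/endpoints/<id>."""
    return resource_name.rsplit("/", 1)[-1]


def predict_url(region: str, resource_name: str) -> str:
    """REST URL for online prediction against a Vertex AI endpoint."""
    return f"https://{region}-aiplatform.googleapis.com/v1/{resource_name}:predict"


name = "projects/123/locations/us-central1/endpoints/456"  # hypothetical resource name
print(endpoint_id(name))  # 456
print(predict_url("us-central1", name))
```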


---
## Remove Resources
See notebook "99 - Cleanup".