The purpose of this notebook is to build our pipeline components, test each component and eventually build a pipeline out of these components and test that. Once the pipeline is built, we can move the code to a compile_pipeline.py file to be compiled as a CI/CD step. The code for running the pipeline will exist in a runpipeline.py file which will be run as part of the CI/CD as well. 

Set the values below according to your project.

In [None]:
# Project passed to aiplatform.init() and used as the deployment project in the run cell.
PROJECT_ID = "prj-d-ml-machine-learning-id"
# Region passed to aiplatform.init().
REGION = "us-central1"
# Bucket used as the Vertex AI staging bucket and as the root for data, source code and pipeline outputs.
BUCKET_URI = "gs://MACHINE_LEARNING_PROJECT_BUCKET_ID"
# Full URL of the subnetwork handed to the Dataflow jobs.
DATAFLOW_SUBNET="https://www.googleapis.com/compute/v1/projects/YOUR-PROJECD-P-SHARED-ID/regions/us-central1/subnetworks/sb-d-shared-restricted-us-central1"
# Cloud KMS key resource name; presumably the encryption key for the deployed resources (verify against the run cell).
KMS_KEY = "projects/KMS_PROJECT/locations/us-central1/keyRings/sample-keyring/cryptoKeys/ML_MACHINE_LEARNING_PROJECT_ID"
# Service account assigned to `service_account` in the run cell.
COMPUTE_ENGINE_SA = "MACHINE_LEARNING_PROJECT_NUMBER-compute@developer.gserviceaccount.com"
# Default value of the pipeline's `dataflow_sa` parameter.
DATAFLOW_RUNNER_SA = "DATAFLOW-SA@MACHINE_LEARNING_PROJECT.iam.gserviceaccount.com"
# Service account assigned to `prod_service_account` in the run cell.
VERTEX_MODEL_SA = "VERTEX-SA@MACHINE_LEARNING_PROJECT.iam.gserviceaccount.com"

You can set these variables to your desired values or leave them as is

In [None]:
KFP_COMPONENTS_PATH = "components"
SRC = "src"

In [None]:
# create a src directory for dataflow source code
!mkdir -m 777 -p {SRC}

This is the image we will use to run pipeline components. Replace the name of the artifact project with that of yours, e.g.:
##### "us-central1-docker.pkg.dev/{prj-c-ml-artifacts-####}/c-publish-artifacts/vertexpipeline:v2"
As part of the project inflation pipelines, the image from the Dockerfile in this repository is built and pushed to the artifacts project.

In [None]:
Image = "us-central1-docker.pkg.dev/prj-c-ml-artifacts-id/c-publish-artifacts/vertexpipeline:v2"

while working in the dev environment's notebook, we install the required dependencies

In [None]:
!pip install tensorflow==2.8.0 tensorflow-hub==0.13.0
!pip install kfp==2.7.0

In [None]:
!pip install protobuf==3.20.0

copy data to bucket

In [None]:
!gsutil cp -r data {BUCKET_URI}

import the required libraries to run experiments and build components

In [None]:
from pathlib import Path as path
from urllib.parse import urlparse
import os
from six.moves import urllib
import tempfile
import numpy as np
import pandas as pd
import tensorflow as tf
import os
import tensorflow_hub as hub
from google.cloud import aiplatform
from google.cloud import bigquery
from google.api_core.exceptions import GoogleAPIError
from kfp import compiler, dsl
from kfp.dsl import component
from kfp.dsl import Input, Output, Model, Metrics, OutputPath
from typing import NamedTuple

aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

set up global variables for running experiments and pipeline components (you can leave them as is to avoid any errors)

In [None]:
# Locations of the raw CSV files inside the bucket.
DATA_URL = f'{BUCKET_URI}/data'
TRAINING_FILE = 'adult.data.csv'
EVAL_FILE = 'adult.test.csv'
TRAINING_URL = f'{DATA_URL}/{TRAINING_FILE}'
EVAL_URL = f'{DATA_URL}/{EVAL_FILE}'

# BigQuery dataset and tables the CSV files are loaded into.
DATASET_ID = 'census_dataset'
TRAINING_TABLE_ID = 'census_train_table'
EVAL_TABLE_ID = 'census_eval_table'

# Dataflow job settings.
RUNNER = "DataflowRunner"
# NOTE(review): this rebinds the REGION chosen in the first cell; confirm the
# override is intentional before changing either value.
REGION = "us-central1"
JOB_NAME = "census-ingest"

# One SchemaField per CSV column, in file order.
CSV_SCHEMA = [
    bigquery.SchemaField(column_name, column_type)
    for column_name, column_type in (
        ("age", "FLOAT64"),
        ("workclass", "STRING"),
        ("fnlwgt", "FLOAT64"),
        ("education", "STRING"),
        ("education_num", "FLOAT64"),
        ("marital_status", "STRING"),
        ("occupation", "STRING"),
        ("relationship", "STRING"),
        ("race", "STRING"),
        ("gender", "STRING"),
        ("capital_gain", "FLOAT64"),
        ("capital_loss", "FLOAT64"),
        ("hours_per_week", "FLOAT64"),
        ("native_country", "STRING"),
        ("income_bracket", "STRING"),
    )
]

# Columns present in the data that are not read as model features.
UNUSED_COLUMNS = ["fnlwgt", "education_num"]

# BigQuery dataset creation

In [None]:
# create a directory to save the component
!mkdir -m 777 -p {KFP_COMPONENTS_PATH}/bq_dataset_component

In [None]:
# Query that creates the BigQuery dataset only when it does not exist yet.
create_bq_dataset_query = f"""
CREATE SCHEMA IF NOT EXISTS {DATASET_ID}
"""

# Save the query next to the component. The `with` block closes the file on
# exit, so no explicit close() is needed afterwards.
with open(
    f"{KFP_COMPONENTS_PATH}/bq_dataset_component/create_bq_dataset.sql", "w"
) as q:
    q.write(create_bq_dataset_query)

# Dataflow sources
The cells below generate the Dataflow source code for ingesting data from our GCS bucket into the BigQuery dataset created above.

In [None]:
!touch {SRC}/__init__.py

In [None]:
%%writefile src/ingest_pipeline.py
from __future__ import absolute_import
import logging
import argparse
import apache_beam as beam
from apache_beam.io import ReadFromText, ReadAllFromText
from apache_beam.dataframe.io import read_csv
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.pipeline_options import SetupOptions

def get_bigquery_schema():
    """
    Build the BigQuery table schema for the census rows.
    Returns:
        A bigquery.TableSchema holding one nullable field per CSV column.
    """

    # (name, type) pairs in CSV column order; every field is nullable.
    column_specs = [
        ('age', 'FLOAT64'),
        ('workclass', 'STRING'),
        ('fnlwgt', 'FLOAT64'),
        ('education', 'STRING'),
        ('education_num', 'FLOAT64'),
        ('marital_status', 'STRING'),
        ('occupation', 'STRING'),
        ("relationship", "STRING"),
        ("race", "STRING"),
        ("gender", "STRING"),
        ("capital_gain", "FLOAT64"),
        ("capital_loss", "FLOAT64"),
        ("hours_per_week", "FLOAT64"),
        ("native_country", "STRING"),
        ("income_bracket", "STRING"),
    ]

    schema = bigquery.TableSchema()
    for field_name, field_type in column_specs:
        field = bigquery.TableFieldSchema()
        field.name = field_name
        field.type = field_type
        field.mode = 'nullable'
        schema.fields.append(field)

    return schema

def get_args():
    """Parse the ingestion options from the command line.

    Returns:
        A tuple ``(args, pipeline_args)``: the parsed namespace with ``url``,
        ``dataset_id``, ``table_id`` and ``project_id``, and the list of
        arguments this parser did not recognise.
    """
    option_specs = (
        ('--url', {
            'dest': 'url',
            'default': "BUCKET_URI/data/adult.data.csv",
            'help': 'url of the data to be downloaded',
        }),
        ('--bq-dataset', {
            'dest': 'dataset_id',
            'required': False,
            'default': 'census_dataset',
            'help': 'Dataset name used in BigQuery.',
        }),
        ('--bq-table', {
            'dest': 'table_id',
            'required': False,
            'default': 'census_train_table',
            'help': 'Table name used in BigQuery.',
        }),
        ('--bq-project', {
            'dest': 'project_id',
            'required': False,
            'default': 'majid-test-407120',
            'help': 'project id',
        }),
    )

    parser = argparse.ArgumentParser()
    for flag, settings in option_specs:
        parser.add_argument(flag, **settings)

    # parse_known_args leaves unrecognised options for the caller.
    known, remaining = parser.parse_known_args()
    return known, remaining

def transform(line):
    """Turn one comma-separated CSV line into a dict keyed by column name.

    Values are whitespace-stripped strings. Cells beyond the known columns
    are ignored; a line with fewer cells than columns raises IndexError.
    """
    column_names = (
        "age", "workclass", "fnlwgt", "education", "education_num",
        "marital_status", "occupation", "relationship", "race", "gender",
        "capital_gain", "capital_loss", "hours_per_week", "native_country",
        "income_bracket",
    )
    cells = line.split(",")
    return {name: cells[position].strip() for position, name in enumerate(column_names)}

def load_data_into_bigquery(args, pipeline_args):
    """Build and run the Beam pipeline that loads one CSV file into BigQuery.

    Args:
        args: Parsed options providing ``url``, ``table_id``, ``dataset_id``
            and ``project_id``.
        pipeline_args: Remaining command-line arguments, forwarded to
            PipelineOptions.
    """
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline = beam.Pipeline(options=pipeline_options)

    # Read the file (skipping its header line) and map each line to a row dict.
    source_urls = pipeline | 'Create PCollection' >> beam.Create([args.url])
    text_lines = source_urls | 'ReadFromText' >> ReadAllFromText(skip_header_lines=1)
    bq_rows = text_lines | 'string to bq row' >> beam.Map(lambda s: transform(s))

    # The table is created if needed and its contents are replaced.
    bq_rows | 'WriteToBigQuery' >> WriteToBigQuery(
        table=args.table_id,
        dataset=args.dataset_id,
        project=args.project_id,
        schema=get_bigquery_schema(),
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    )

    result = pipeline.run()
    # Block only when the runner option is exactly 'DirectRunner'.
    runner_name = pipeline_options.get_all_options()['runner']
    if runner_name == 'DirectRunner':
        result.wait_until_finish()

if __name__ == '__main__':
    # Script entry point: enable INFO logging, parse the options, run the load.
    logging.getLogger().setLevel(logging.INFO)
    parsed_args, beam_args = get_args()
    load_data_into_bigquery(parsed_args, beam_args)

Now upload the dataflow source code to the bucket to be accessible by Dataflow

In [None]:
!gsutil cp -R {SRC} {BUCKET_URI}

# Dataflow arguments component
This component's sole purpose is to prepare the command line arguments for Dataflow.

In [None]:
@component(base_image=Image)
def build_dataflow_args(
    bq_dataset: str,
    url: str,
    bq_table: str,
    job_name: str,
    runner: str,
    bq_project: str,
    subnet: str,
    dataflow_sa: str,
) -> list:
    """Assemble the command-line argument list for the ingestion Dataflow job.

    Returns:
        A flat list of option names and values, in a fixed order.
    """
    # Option/value pairs, flattened below in this order.
    valued_options = [
        ("--job_name", job_name),
        ("--runner", runner),
        ("--url", url),
        ("--bq-dataset", bq_dataset),
        ("--bq-table", bq_table),
        ("--bq-project", bq_project),
        ("--subnetwork", subnet),
    ]
    dataflow_args = [token for pair in valued_options for token in pair]
    # Value-less flag, followed by the fixed worker zone and the service account.
    dataflow_args.append("--no_use_public_ips")
    dataflow_args.extend(["--worker_zone", "us-central1-c"])
    dataflow_args.extend(["--service_account_email", dataflow_sa])
    return dataflow_args

# Custom training component
The model training involves 3 simple steps:
1. Reading the data from bigquery using the tensorflow-io library (make sure you set the default kms key for your project above in advance)
2. Creating a keras model
3. Training the model on the data and optionally storing logs in a tensorboard directory (/tblogs) to enable tensorboard usage in future

After we test the training function, it can be wrapped in a Vertex pipeline component using the @component decorator as shown below.

In [None]:
!mkdir -m 777 -p {KFP_COMPONENTS_PATH}/custom_training_component

In [None]:
@component(
    base_image=Image,
    output_component_file=f"{KFP_COMPONENTS_PATH}/custom_training_component/training.yaml"
)
def custom_train_model(
    project: str,
    table: str,
    dataset: str,
    tb_log_dir: str,
    model: Output[Model],
    epochs: int = 5,
    batch_size: int = 32,
    lr: float = 0.01, # not used here but can be passed to an optimizer
):
    """Train a Keras binary classifier on a BigQuery table and save the model.

    Args:
        project: Project that owns the BigQuery dataset; also used as the
            parent of the BigQuery read session.
        table: Table holding the training rows.
        dataset: Dataset that contains ``table``.
        tb_log_dir: Directory the TensorBoard callback writes its logs to.
        model: Output artifact; the trained model is saved to ``model.path``.
        epochs: Number of passes over the training data.
        batch_size: Number of rows per training batch.
        lr: Currently unused; the model is compiled without an explicit
            optimizer.
    """
    from tensorflow.python.framework import dtypes
    from tensorflow_io.bigquery import BigQueryClient
    from tensorflow import feature_column
    from google.cloud import bigquery

    import tensorflow as tf
    CSV_SCHEMA = [
      bigquery.SchemaField("age", "FLOAT64"),
      bigquery.SchemaField("workclass", "STRING"),
      bigquery.SchemaField("fnlwgt", "FLOAT64"),
      bigquery.SchemaField("education", "STRING"),
      bigquery.SchemaField("education_num", "FLOAT64"),
      bigquery.SchemaField("marital_status", "STRING"),
      bigquery.SchemaField("occupation", "STRING"),
      bigquery.SchemaField("relationship", "STRING"),
      bigquery.SchemaField("race", "STRING"),
      bigquery.SchemaField("gender", "STRING"),
      bigquery.SchemaField("capital_gain", "FLOAT64"),
      bigquery.SchemaField("capital_loss", "FLOAT64"),
      bigquery.SchemaField("hours_per_week", "FLOAT64"),
      bigquery.SchemaField("native_country", "STRING"),
      bigquery.SchemaField("income_bracket", "STRING"),
  ]

    UNUSED_COLUMNS = ["fnlwgt", "education_num"]

    def transform_row(row_dict):
        """Split one row into (features, label) with a 0.0/1.0 label."""
        # Trim all string tensors
        trimmed_dict = { column:
                      (tf.strings.strip(tensor) if tensor.dtype == 'string' else tensor)
                      for (column,tensor) in row_dict.items()
                      }
        # Extract the label column
        income_bracket = trimmed_dict.pop('income_bracket')
        # Convert the label to 0.0/1.0
        income_bracket_float = tf.cond(tf.equal(tf.strings.strip(income_bracket), '>50K'),
                     lambda: tf.constant(1.0),
                     lambda: tf.constant(0.0))
        return (trimmed_dict, income_bracket_float)

    def read_bigquery(table_name, dataset=dataset):
        """Return a dataset of (features, label) pairs read from ``table_name``."""
        selected_fields = [field for field in CSV_SCHEMA
                           if field.name not in UNUSED_COLUMNS]
        tensorflow_io_bigquery_client = BigQueryClient()
        read_session = tensorflow_io_bigquery_client.read_session(
          "projects/" + project,
          # Read the table that was asked for rather than the closed-over one.
          project, table_name, dataset,
          [field.name for field in selected_fields],
          [dtypes.double if field.field_type == 'FLOAT64' else dtypes.string
           for field in selected_fields],
          requested_streams=2)

        rows = read_session.parallel_read_rows()
        transformed_ds = rows.map(transform_row)
        return transformed_ds

    training_ds = read_bigquery(table).shuffle(10000).batch(batch_size)

    feature_columns = []

    def get_categorical_feature_values(column):
        """Return the distinct trimmed values of ``column`` in the training table."""
        query = 'SELECT DISTINCT TRIM({}) FROM `{}`.{}.{}'.format(column, project, dataset, table)
        client = bigquery.Client(project=project)
        query_job = client.query(query)
        result = query_job.to_dataframe()
        return result.values[:,0]

    # numeric cols
    for header in ['capital_gain', 'capital_loss', 'hours_per_week']:
        feature_columns.append(feature_column.numeric_column(header))

    # categorical cols, one-hot encoded over the values found in the table
    for header in ['workclass', 'marital_status', 'occupation', 'relationship',
                   'race', 'native_country', 'education']:
        categorical_feature = feature_column.categorical_column_with_vocabulary_list(
            header, get_categorical_feature_values(header))
        categorical_feature_one_hot = feature_column.indicator_column(categorical_feature)
        feature_columns.append(categorical_feature_one_hot)

    # bucketized cols
    age = feature_column.numeric_column('age')
    age_buckets = feature_column.bucketized_column(age, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65])
    feature_columns.append(age_buckets)

    feature_layer = tf.keras.layers.DenseFeatures(feature_columns)

    Dense = tf.keras.layers.Dense
    keras_model = tf.keras.Sequential(
      [
        feature_layer,
          Dense(100, activation=tf.nn.relu, kernel_initializer='uniform'),
          Dense(75, activation=tf.nn.relu),
          Dense(50, activation=tf.nn.relu),
          Dense(25, activation=tf.nn.relu),
          Dense(1, activation=tf.nn.sigmoid)
      ])

    tensorboard = tf.keras.callbacks.TensorBoard(log_dir=tb_log_dir)
    # Compile Keras model
    keras_model.compile(loss='binary_crossentropy', metrics=['accuracy'])
    keras_model.fit(training_ds, epochs=epochs, callbacks=[tensorboard])
    keras_model.save(model.path)

In [None]:
!pip install --upgrade google-cloud-pipeline-components

In [None]:
from google_cloud_pipeline_components.v1.custom_job import utils
# Wrap the training component as a custom training job op with a single replica.
# The pipeline calls the resulting op with `location` and
# `base_output_directory` in addition to the component's own parameters.
custom_job_distributed_training_op = utils.create_custom_training_job_op_from_component(
    custom_train_model, replica_count=1
)

# Custom Evaluation component
This step is very similar to the training component. At the end we flag the model for deployment only if its accuracy is over 80 percent.

In [None]:
!mkdir -m 777 -p {KFP_COMPONENTS_PATH}/custom_eval_component

In [None]:
# evaluation component
@component(
    base_image=Image,
    output_component_file=f"{KFP_COMPONENTS_PATH}/custom_eval_component/eval.yaml"
)
def custom_eval_model(
    model_dir: str,
    project: str,
    table: str,
    dataset: str,
    tb_log_dir: str,
    model: Input[Model],
    metrics: Output[Metrics],
    batch_size: int = 32,
)-> NamedTuple("Outputs", [("dep_decision", str)]):
    """Evaluate the trained model on a BigQuery table and decide on deployment.

    Args:
        model_dir: Location the model is saved to when it passes the
            accuracy threshold.
        project: Project that owns the BigQuery dataset; also used as the
            parent of the BigQuery read session.
        table: Table holding the evaluation rows.
        dataset: Dataset that contains ``table``.
        tb_log_dir: Directory the TensorBoard callback writes its logs to.
        model: Input artifact; the model is loaded from ``model.path``.
        metrics: Output artifact that receives the ``accuracy`` metric.
        batch_size: Number of rows per evaluation batch.

    Returns:
        A one-element tuple holding ``"true"`` when accuracy is above 0.8
        and ``"false"`` otherwise.
    """
    from tensorflow.python.framework import dtypes
    from tensorflow_io.bigquery import BigQueryClient
    from google.cloud import bigquery

    import tensorflow as tf
    CSV_SCHEMA = [
      bigquery.SchemaField("age", "FLOAT64"),
      bigquery.SchemaField("workclass", "STRING"),
      bigquery.SchemaField("fnlwgt", "FLOAT64"),
      bigquery.SchemaField("education", "STRING"),
      bigquery.SchemaField("education_num", "FLOAT64"),
      bigquery.SchemaField("marital_status", "STRING"),
      bigquery.SchemaField("occupation", "STRING"),
      bigquery.SchemaField("relationship", "STRING"),
      bigquery.SchemaField("race", "STRING"),
      bigquery.SchemaField("gender", "STRING"),
      bigquery.SchemaField("capital_gain", "FLOAT64"),
      bigquery.SchemaField("capital_loss", "FLOAT64"),
      bigquery.SchemaField("hours_per_week", "FLOAT64"),
      bigquery.SchemaField("native_country", "STRING"),
      bigquery.SchemaField("income_bracket", "STRING"),
  ]

    UNUSED_COLUMNS = ["fnlwgt", "education_num"]

    def transform_row(row_dict):
        """Split one row into (features, label) with a 0.0/1.0 label."""
        # Trim all string tensors
        trimmed_dict = { column:
                      (tf.strings.strip(tensor) if tensor.dtype == 'string' else tensor)
                      for (column,tensor) in row_dict.items()
                      }
        # Extract the label column
        income_bracket = trimmed_dict.pop('income_bracket')
        # Convert the label to 0.0/1.0
        income_bracket_float = tf.cond(tf.equal(tf.strings.strip(income_bracket), '>50K'),
                     lambda: tf.constant(1.0),
                     lambda: tf.constant(0.0))
        return (trimmed_dict, income_bracket_float)

    def read_bigquery(table_name, dataset=dataset):
        """Return a dataset of (features, label) pairs read from ``table_name``."""
        selected_fields = [field for field in CSV_SCHEMA
                           if field.name not in UNUSED_COLUMNS]
        tensorflow_io_bigquery_client = BigQueryClient()
        read_session = tensorflow_io_bigquery_client.read_session(
          "projects/" + project,
          # Read the table that was asked for rather than the closed-over one.
          project, table_name, dataset,
          [field.name for field in selected_fields],
          [dtypes.double if field.field_type == 'FLOAT64' else dtypes.string
           for field in selected_fields],
          requested_streams=2)

        rows = read_session.parallel_read_rows()
        transformed_ds = rows.map(transform_row)
        return transformed_ds

    eval_ds = read_bigquery(table).batch(batch_size)
    keras_model = tf.keras.models.load_model(model.path)
    tensorboard = tf.keras.callbacks.TensorBoard(log_dir=tb_log_dir)
    loss, accuracy = keras_model.evaluate(eval_ds, callbacks=[tensorboard])
    metrics.log_metric("accuracy", accuracy)
    # Deploy the model only if its accuracy is higher than 80 percent
    if accuracy > 0.8:
        dep_decision = "true"
        keras_model.save(model_dir)
    else:
        dep_decision = "false"
    return (dep_decision,)

# Deployment component

In [None]:
!mkdir -m 777 -p {KFP_COMPONENTS_PATH}/deployment_component

In [None]:
@component(
    base_image=Image,
    output_component_file=f"{KFP_COMPONENTS_PATH}/deployment_component/deploy.yaml"
)
def deploy_model(
        serving_container_image_uri: str,
        model_name: str,
        model_dir: str,
        endpoint_name: str,
        project_id: str,
        region: str,
        split: int,
        min_nodes: int,
        max_nodes: int,
        encryption: str,
        service_account: str,
        model: Input[Model],
        vertex_model: Output[Model],
        vertex_endpoint: Output[Model]
):
    """Upload the model to Vertex AI and deploy it to an endpoint.

    Args:
        serving_container_image_uri: Serving image used for the uploaded model.
        model_name: Display name of the model and of the deployed model.
        model_dir: Artifact location the model is uploaded from.
        endpoint_name: Display name of the endpoint to reuse or create.
        project_id: Project that hosts the endpoint and the model.
        region: Location of the endpoint and the model.
        split: Traffic percentage given to the newly deployed model when the
            endpoint already serves a model; the remainder stays with the
            model listed last on the endpoint.
        min_nodes: Minimum replica count of the deployment.
        max_nodes: Maximum replica count of the deployment.
        encryption: KMS key name applied to the endpoint, model and deployment.
        service_account: Service account used for aiplatform.init and the
            deployment.
        model: Input model artifact (not read here; the upload uses
            ``model_dir``).
        vertex_model: Output artifact; its URI is set to the uploaded model's
            resource name.
        vertex_endpoint: Output artifact; its URI and ``resourceName``
            metadata are set from the endpoint.

    Raises:
        ValueError: If a traffic split is needed and ``split`` is outside
            0..100.
    """
    from google.cloud import aiplatform
    aiplatform.init(service_account=service_account)

    def create_endpoint():
        """Return the newest endpoint with the given name, creating one if absent."""
        endpoints = aiplatform.Endpoint.list(
            filter=f'display_name="{endpoint_name}"',
            order_by='create_time desc',
            project=project_id,
            location=region,
        )
        if endpoints:
            return endpoints[0]  # most recently created
        return aiplatform.Endpoint.create(
            display_name=endpoint_name,
            project=project_id,
            location=region,
            encryption_spec_key_name=encryption,
        )

    endpoint = create_endpoint()

    def upload_model():
        """Upload the model, as a new version when the name already exists."""
        listed_model = aiplatform.Model.list(
            filter=f'display_name="{model_name}"',
            project=project_id,
            location=region,
        )
        upload_kwargs = dict(
            display_name=model_name,
            artifact_uri=model_dir,
            serving_container_image_uri=serving_container_image_uri,
            location=region,
            project=project_id,
            encryption_spec_key_name=encryption,
        )
        if listed_model:
            # Register the upload as a new version of the existing model.
            upload_kwargs["parent_model"] = listed_model[0].resource_name
        return aiplatform.Model.upload(**upload_kwargs)

    uploaded_model = upload_model()

    # Save data to the output params
    vertex_model.uri = uploaded_model.resource_name

    def deploy_to_endpoint(model, endpoint):
        """Deploy ``model`` to ``endpoint`` and return the resulting resource name."""
        deployed_models = endpoint.list_models()
        if deployed_models:
            if not 0 <= split <= 100:
                raise ValueError(f"split must be between 0 and 100, got {split}")
            # NOTE(review): assumes the last listed model is the latest one —
            # confirm the ordering returned by list_models().
            latest_model_id = deployed_models[-1].id
            # Key "0" stands for the model being deployed by this call.
            traffic = {"0": split, latest_model_id: 100 - split}
        else:
            traffic = {"0": 100}
        model_deploy = model.deploy(
            endpoint=endpoint,
            traffic_split=traffic,
            deployed_model_display_name=model_name,
            min_replica_count=min_nodes,
            max_replica_count=max_nodes,
            encryption_spec_key_name=encryption,
            service_account=service_account,
        )
        return model_deploy.resource_name

    vertex_endpoint.uri = deploy_to_endpoint(uploaded_model, endpoint)
    vertex_endpoint.metadata['resourceName']=endpoint.resource_name

# Model monitoring component

In [None]:
!mkdir -m 777 -p {KFP_COMPONENTS_PATH}/monitoring_component

In [None]:
@component(
    base_image=Image,
    output_component_file=f"{KFP_COMPONENTS_PATH}/monitoring_component/monitoring.yaml"
)
def create_monitoring(
    monitoring_name: str,
    project_id: str,
    region: str,
    endpoint: Input[Model],
    bq_data_uri: str,
    bucket_name: str,
    email: str,
    encryption: str,
    service_account: str,
):
    """Create a model monitoring job for the endpoint unless one already exists.

    Args:
        monitoring_name: Display name of the monitoring job.
        project_id: Project in which the job is looked up and created.
        region: Location in which the job is looked up and created.
        endpoint: Artifact whose ``resourceName`` metadata identifies the
            endpoint to monitor.
        bq_data_uri: ``bq://`` URI of the table used as the skew data source
            and to derive the instance schema.
        bucket_name: Bucket the generated ``monitoring_schema.yaml`` is
            uploaded to.
        email: Address that receives monitoring alerts.
        encryption: KMS key name applied to the monitoring job.
        service_account: Service account passed to aiplatform.init.
    """
    from google.cloud.aiplatform import model_monitoring
    from google.cloud import aiplatform
    from google.cloud import bigquery
    from google.cloud import storage
    from collections import OrderedDict
    import yaml

    def ordered_dict_representer(self, value):  # can be a lambda if that's what you prefer
        return self.represent_mapping('tag:yaml.org,2002:map', value.items())
    # Dump OrderedDict as a plain YAML mapping.
    yaml.add_representer(OrderedDict, ordered_dict_representer)

    aiplatform.init(service_account=service_account)
    # Look in the same project and location the job would be created in.
    list_monitors = aiplatform.ModelDeploymentMonitoringJob.list(
        filter=f'(state="JOB_STATE_SUCCEEDED" OR state="JOB_STATE_RUNNING") AND display_name="{monitoring_name}"',
        project=project_id,
        location=region,
    )
    if list_monitors:
        # A matching job already exists; nothing to create.
        return

    alerting_config = model_monitoring.EmailAlertConfig(
        user_emails=[email], enable_logging=True
    )
    # schedule config
    MONITOR_INTERVAL = 1
    schedule_config = model_monitoring.ScheduleConfig(monitor_interval=MONITOR_INTERVAL)
    # sampling strategy
    SAMPLE_RATE = 0.5
    logging_sampling_strategy = model_monitoring.RandomSampleConfig(sample_rate=SAMPLE_RATE)
    # drift config
    DRIFT_THRESHOLD_VALUE = 0.05
    DRIFT_THRESHOLDS = {
        "capital_gain": DRIFT_THRESHOLD_VALUE,
        "capital_loss": DRIFT_THRESHOLD_VALUE,
    }
    drift_config = model_monitoring.DriftDetectionConfig(drift_thresholds=DRIFT_THRESHOLDS)
    # Skew config
    DATASET_BQ_URI = bq_data_uri
    TARGET = "income_bracket"
    SKEW_THRESHOLD_VALUE = 0.5
    SKEW_THRESHOLDS = {
        "capital_gain": SKEW_THRESHOLD_VALUE,
        "capital_loss": SKEW_THRESHOLD_VALUE,
    }
    skew_config = model_monitoring.SkewDetectionConfig(
        data_source=DATASET_BQ_URI, skew_thresholds=SKEW_THRESHOLDS, target_field=TARGET
    )
    # objective config out of skew and drift configs
    objective_config = model_monitoring.ObjectiveConfig(
        skew_detection_config=skew_config,
        drift_detection_config=drift_config,
        explanation_config=None,
    )

    # Derive the instance schema from the BigQuery table's columns.
    bqclient = bigquery.Client()
    # Drop the leading "bq://" to get project.dataset.table.
    table = bigquery.TableReference.from_string(DATASET_BQ_URI[len("bq://"):])
    bq_table = bqclient.get_table(table)
    schemayaml = OrderedDict({
        "type": "object",
        "properties": {},
        "required": []
    })
    for feature in bq_table.schema:
        # The target column is not part of the prediction instance.
        if feature.name in ["income_bracket"]:
            continue
        f_type = "string" if feature.field_type == "STRING" else "number"
        schemayaml['properties'][feature.name] = {"type": f_type}
        if feature.name not in ["fnlwgt", "education_num"]:
            schemayaml['required'].append(feature.name)

    with open("monitoring_schema.yaml", "w") as yaml_file:
        yaml.dump(schemayaml, yaml_file, default_flow_style=False)
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob("monitoring_schema.yaml")
    blob.upload_from_filename("monitoring_schema.yaml")

    aiplatform.ModelDeploymentMonitoringJob.create(
        display_name=monitoring_name,
        project=project_id,
        location=region,
        endpoint=endpoint.metadata['resourceName'],
        logging_sampling_strategy=logging_sampling_strategy,
        schedule_config=schedule_config,
        alert_config=alerting_config,
        objective_configs=objective_config,
        analysis_instance_schema_uri=f"gs://{bucket_name}/monitoring_schema.yaml",
        encryption_spec_key_name=encryption,
    )

# Pipeline build and compile

In [None]:
@dsl.pipeline(name="census-income-pipeline")
def pipeline(
    create_bq_dataset_query: str,
    project: str,
    deployment_project: str,
    region: str,
    model_dir: str,
    bucket_name: str,
    monitoring_name: str,
    monitoring_email: str,
    encryption: str,
    service_account: str,
    prod_service_account: str,
    dataflow_subnet: str,
    train_data_url: str=TRAINING_URL,
    eval_data_url: str=EVAL_URL,
    bq_dataset: str=DATASET_ID,
    bq_train_table: str=TRAINING_TABLE_ID,
    bq_eval_table: str=EVAL_TABLE_ID,
    job_name: str=JOB_NAME,
    python_file_path: str=f'{BUCKET_URI}/src/ingest_pipeline.py',
    dataflow_temp_location: str=f'{BUCKET_URI}/temp_dataflow',
    runner: str=RUNNER,
    lr: float=0.01, 
    epochs: int=5,
    batch_size: int=32,
    base_train_dir: str=f'{BUCKET_URI}/training', 
    tb_log_dir: str=f'{BUCKET_URI}/tblogs',
    deployment_image: str="us-docker.pkg.dev/cloud-aiplatform/prediction/tf2-cpu.2-8:latest",
    deployed_model_name: str='income_bracket_predictor',
    endpoint_name: str='census_endpoint',
    min_nodes: int=2,
    max_nodes: int=4,
    traffic_split: int=25,
    dataflow_sa: str=DATAFLOW_RUNNER_SA,
):
    """Ingest the census data, train and evaluate a model, then deploy and monitor it.

    Steps: create the BigQuery dataset, load the training and evaluation CSV
    files with two Dataflow jobs, train the model as a custom job, evaluate
    it, and when the evaluation returns "true" deploy the model and create a
    monitoring job.
    """
    from google_cloud_pipeline_components.v1.bigquery import (
        BigqueryQueryJobOp)
    from google_cloud_pipeline_components.v1.dataflow import \
        DataflowPythonJobOp
    from google_cloud_pipeline_components.v1.wait_gcp_resources import \
        WaitGcpResourcesOp

    # create the dataset
    bq_dataset_op = BigqueryQueryJobOp(
        query=create_bq_dataset_query,
        project=project,
        location=region,
    )

    # instantiate dataflow args
    dataflow_args_train = build_dataflow_args(
        job_name=f"{job_name}train",
        url=train_data_url,
        bq_dataset=bq_dataset,
        bq_table=bq_train_table,
        runner=runner,
        bq_project=project,
        subnet=dataflow_subnet,
        dataflow_sa=dataflow_sa,
    ).after(bq_dataset_op)
    dataflow_args_eval = build_dataflow_args(
        job_name=f"{job_name}eval",
        url=eval_data_url,
        bq_dataset=bq_dataset,
        bq_table=bq_eval_table,
        runner=runner,
        bq_project=project,
        subnet=dataflow_subnet,
        dataflow_sa=dataflow_sa,
    ).after(bq_dataset_op)

    # run dataflow job
    dataflow_python_train_op = DataflowPythonJobOp(
        python_module_path=python_file_path,
        args=dataflow_args_train.output,
        project=project,
        location=region,
        temp_location=f"{dataflow_temp_location}/train",
    ).after(dataflow_args_train)
    dataflow_python_eval_op = DataflowPythonJobOp(
        python_module_path=python_file_path,
        args=dataflow_args_eval.output,
        project=project,
        location=region,
        temp_location=f"{dataflow_temp_location}/eval",
    ).after(dataflow_args_eval)

    # wait on the resources reported by each Dataflow step before continuing
    dataflow_wait_train_op = WaitGcpResourcesOp(
        gcp_resources=dataflow_python_train_op.outputs["gcp_resources"]
    ).after(dataflow_python_train_op)
    dataflow_wait_eval_op = WaitGcpResourcesOp(
        gcp_resources=dataflow_python_eval_op.outputs["gcp_resources"]
    ).after(dataflow_python_eval_op)

    # create and train model
    custom_training_task = custom_job_distributed_training_op(
        lr=lr,
        epochs=epochs,
        project=project,
        table=bq_train_table,
        dataset=bq_dataset,
        location=region,
        base_output_directory=base_train_dir,
        tb_log_dir=tb_log_dir,
        batch_size=batch_size
    ).after(dataflow_wait_train_op)

    # evaluation needs both the trained model and the loaded evaluation table
    custom_eval_task = custom_eval_model(
        model_dir=model_dir,
        project=project,
        table=bq_eval_table,
        dataset=bq_dataset,
        tb_log_dir=tb_log_dir,
        model=custom_training_task.outputs["model"],
        batch_size=batch_size,
    )
    custom_eval_task.after(custom_training_task)
    custom_eval_task.after(dataflow_wait_eval_op)
    # deploy and monitor only when the evaluation step returned "true"
    with dsl.If(
        custom_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):
        model_deploy_op = deploy_model(
            serving_container_image_uri=deployment_image,
            model_name=deployed_model_name,
            endpoint_name=endpoint_name,
            project_id=deployment_project,
            region=region,
            split=traffic_split,
            model=custom_training_task.outputs['model'],
            model_dir=model_dir,
            min_nodes=min_nodes,
            max_nodes=max_nodes,
            encryption=encryption,
            service_account=prod_service_account
        ).after(custom_eval_task)

        monitoring_job = create_monitoring(
            monitoring_name=monitoring_name,
            project_id=deployment_project,
            region=region,
            endpoint=model_deploy_op.outputs['vertex_endpoint'],
            bq_data_uri=f"bq://{project}.{bq_dataset}.{bq_train_table}",
            bucket_name=bucket_name,
            email=monitoring_email,
            encryption=encryption,
            service_account=service_account
        ).after(model_deploy_op)
        

In [None]:
# Compile the pipeline function into a YAML package; the run cell loads this same path as its template.
compiler.Compiler().compile(pipeline_func=pipeline, package_path="./common/vertex-ai-pipeline/pipeline_package.yaml")

# Pipeline run

Finally to test the pipeline end to end, set the input arguments for each component accordingly.
Note that there are two service accounts supplied: one for the current project and one for the prod environment. The reason behind this is the CI/CD design, which runs the pipeline in one environment (non-prod) and deploys the model to prod.

Remember to update the monitoring_config parameter to the email that will be used.

In [None]:
from datetime import datetime

# Timestamp (day_hour_minute_second) appended to the job's display name so
# that successive runs can be told apart.
timestamp = datetime.now().strftime("%d_%H_%M_%S")
pipelineroot = f'{BUCKET_URI}/pipelineroot'

# `service_account` is used to run the pipeline; `prod_service_account` is
# handed to the model deployment step. In dev, the deployment environment is
# the same as where the pipeline runs (deployment_project is PROJECT_ID below).
service_account = COMPUTE_ENGINE_SA
prod_service_account = VERTEX_MODEL_SA

# Source data locations and the BigQuery dataset/tables they are loaded into.
data_config = {
    "train_data_url": TRAINING_URL,
    "eval_data_url": EVAL_URL,
    "bq_dataset": DATASET_ID,
    "bq_train_table": TRAINING_TABLE_ID,
    "bq_eval_table": EVAL_TABLE_ID,
}

# Settings for the Dataflow ingestion job.
dataflow_config = {
    "job_name": JOB_NAME,
    "python_file_path": f'{BUCKET_URI}/src/ingest_pipeline.py',
    "temp_location": f'{BUCKET_URI}/temp_dataflow',
    "runner": RUNNER,
    "subnet": DATAFLOW_SUBNET,
}

# Training hyperparameters and output locations.
train_config = {
    'lr': 0.01,
    'epochs': 5,
    'base_train_dir': f'{BUCKET_URI}/training',
    'tb_log_dir': f'{BUCKET_URI}/tblogs',
}

# Settings for deploying the trained model to an endpoint.
deployment_config = {
    'image': "us-docker.pkg.dev/cloud-aiplatform/prediction/tf2-cpu.2-8:latest",
    'model_name': "income_bracket_predictor",
    'endpoint_name': "census_income_endpoint",
    'min_nodes': 2,
    'max_nodes': 4,
    'deployment_project': PROJECT_ID,
    # Uses the KMS_KEY variable set in the configuration cell at the top of the
    # notebook; make sure it points to the key in your own dev environment.
    # format would be: projects/prj-d-kms-####/locations/us-central1/keyRings/sample-keyring/cryptoKeys/prj-d-ml-machine-learning
    "encryption": KMS_KEY,
    "service_account": service_account,
    "prod_service_account": prod_service_account,
}

# Monitoring job name and the email address passed to it.
monitoring_config = {
    'email': 'YOUR-EMAIL@YOUR-COMPANY.COM',
    'name': 'census_monitoring',
}

# NOTE: this assignment rebinds `pipeline`, the name the compile cell passes as
# `pipeline_func`. Re-run the cell that defines the pipeline function before
# compiling again in the same session.
pipeline = aiplatform.PipelineJob(
    display_name=f"census_income_{timestamp}",
    template_path='./common/vertex-ai-pipeline/pipeline_package.yaml',
    pipeline_root=pipelineroot,
    # Uses the KMS_KEY variable set in the configuration cell at the top of the
    # notebook; make sure it points to the key in your own dev environment.
    # format would be: projects/prj-d-kms-####/locations/us-central1/keyRings/sample-keyring/cryptoKeys/prj-d-ml-machine-learning
    encryption_spec_key_name=KMS_KEY,
    parameter_values={
        "create_bq_dataset_query": create_bq_dataset_query,
        "bq_dataset": data_config['bq_dataset'],
        "bq_train_table": data_config['bq_train_table'],
        "bq_eval_table": data_config['bq_eval_table'],
        "job_name": dataflow_config['job_name'],
        "train_data_url": data_config['train_data_url'],
        "eval_data_url": data_config['eval_data_url'],
        "python_file_path": dataflow_config['python_file_path'],
        "dataflow_temp_location": dataflow_config['temp_location'],
        "runner": dataflow_config['runner'],
        "dataflow_subnet": dataflow_config['subnet'],
        "project": PROJECT_ID,
        "region": REGION,
        "model_dir": f"{BUCKET_URI}",
        # Bucket name is BUCKET_URI with its first five characters (the
        # "gs://" scheme) stripped.
        "bucket_name": BUCKET_URI[5:],
        "epochs": train_config['epochs'],
        "lr": train_config['lr'],
        "base_train_dir": train_config['base_train_dir'],
        "tb_log_dir": train_config['tb_log_dir'],
        "deployment_image": deployment_config['image'],
        "deployed_model_name": deployment_config["model_name"],
        "endpoint_name": deployment_config["endpoint_name"],
        "min_nodes": deployment_config["min_nodes"],
        "max_nodes": deployment_config["max_nodes"],
        "deployment_project": deployment_config["deployment_project"],
        "encryption": deployment_config.get("encryption"),
        "service_account": deployment_config["service_account"],
        "prod_service_account": deployment_config["prod_service_account"],
        "monitoring_name": monitoring_config['name'],
        "monitoring_email": monitoring_config['email'],
    },
    enable_caching=False,
)

# Submit the job and run it under the pipeline-runner service account.
pipeline.run(service_account=service_account)