# An end-to-end Vertex Training Pipeline Demonstration

Check that the required packages are installed correctly. The KFP SDK version should be 1.6 or later:

In [1]:
import kfp

# Query the version inside the kernel itself: a `!python3` subprocess may resolve
# to a different interpreter (e.g. outside the active virtualenv/conda env).
print('KFP SDK version: {}'.format(kfp.__version__))
# Enforce the ">= 1.6" requirement stated above instead of only printing it.
assert tuple(int(part) for part in kfp.__version__.split('.')[:2]) >= (1, 6), (
    'KFP SDK >= 1.6 is required, found {}'.format(kfp.__version__))

KFP SDK version: 1.8.2


In [2]:
import os
import json
from functools import partial

import kfp
import pprint
import yaml
from jinja2 import Template
from kfp.v2 import dsl
from kfp.v2.compiler import compiler
from kfp.v2.dsl import Dataset
from kfp.v2.google.client import AIPlatformClient

In [3]:
# Google Cloud project that owns the pipeline resources.
project_id = 'woven-rush-197905'
# Numeric project number (used to build the default compute service account).
project_number = '297370817971'

In [4]:
# Artifact Registry repository referenced by the component templates and the
# training/serving image URIs.
af_registry_location = 'asia-southeast1'
af_registry_name = 'mlops-vertex-kit'

In [5]:
# Root folder with one sub-directory per component, each holding a component.yaml.jinja.
components_dir = '../components/'

In [6]:
def _load_custom_component(project_id: str,
                           af_registry_location: str,
                           af_registry_name: str,
                           components_dir: str,
                           component_name: str):
  """Render a component's Jinja2 spec template and load it as a KFP component.

  Args:
    project_id: GCP project ID substituted into the template.
    af_registry_location: Artifact Registry region substituted into the template.
    af_registry_name: Artifact Registry repository name substituted into the template.
    components_dir: Directory containing one sub-directory per component.
    component_name: Name of the component's sub-directory under `components_dir`.

  Returns:
    The component factory returned by `kfp.components.load_component_from_text`.

  Raises:
    FileNotFoundError: if `<components_dir>/<component_name>/component.yaml.jinja`
      does not exist.
    jinja2.exceptions.UndefinedError: if the template references a variable that
      is not passed to `render` (previously this silently rendered as '').
  """
  # Local import keeps this cell self-contained; the notebook only imports `Template`.
  from jinja2 import StrictUndefined

  component_path = os.path.join(components_dir,
                                component_name,
                                'component.yaml.jinja')
  # Explicit encoding: the platform default encoding is not guaranteed to be UTF-8.
  with open(component_path, 'r', encoding='utf-8') as f:
    template_text = f.read()

  # StrictUndefined makes a missing or misspelled placeholder fail loudly instead
  # of silently producing e.g. an empty segment in an image reference.
  component_text = Template(template_text, undefined=StrictUndefined).render(
      project_id=project_id,
      af_registry_location=af_registry_location,
      af_registry_name=af_registry_name)

  return kfp.components.load_component_from_text(component_text)

# Bind the project-wide settings once, so each component is loaded by name only.
load_custom_component = partial(
    _load_custom_component,
    **{
        'project_id': project_id,
        'af_registry_location': af_registry_location,
        'af_registry_name': af_registry_name,
        'components_dir': components_dir,
    })

In [7]:
# Data preparation and training components.
preprocess_op = load_custom_component(
    component_name='data_preprocess')
train_op = load_custom_component(
    component_name='train_model')
check_metrics_op = load_custom_component(
    component_name='check_model_metrics')

# Serving components: endpoint creation and smoke test.
create_endpoint_op = load_custom_component(
    component_name='create_endpoint')
test_endpoint_op = load_custom_component(
    component_name='test_endpoint')

# Deployment and post-deployment monitoring components.
deploy_model_op = load_custom_component(
    component_name='deploy_model')
monitor_model_op = load_custom_component(
    component_name='monitor_model')

Next, set the pipeline configuration values, then define the pipeline function:

In [8]:
# Region used by the pipeline API client, and the GCS root for pipeline artifacts.
pipeline_region = 'asia-southeast1'
pipeline_root = 'gs://vertex_pipeline_demo_root/pipeline_root'

In [9]:
# Input table and the GCS folder the preprocess step writes its CSV output to.
data_region = 'asia-southeast1'
input_dataset_uri = 'bq://woven-rush-197905.vertex_pipeline_demo.banknote_authentication'
gcs_data_output_folder = 'gs://vertex_pipeline_demo_root/datasets/training'
# Column schema, passed to the training step as `input_data_schema`.
training_data_schema = 'VWT:float;SWT:float;KWT:float;Entropy:float;Class:int'

# GCS location passed as `data_pipeline_root` to the train/endpoint/deploy components.
data_pipeline_root = 'gs://vertex_pipeline_demo_root/compute_root'

In [10]:
# Training and serving images live in the Artifact Registry repository configured above.
training_container_image_uri = f'{af_registry_location}-docker.pkg.dev/{project_id}/{af_registry_name}/training:latest'
serving_container_image_uri = f'{af_registry_location}-docker.pkg.dev/{project_id}/{af_registry_name}/serving:latest'

# The project's default Compute Engine service account runs the custom training job.
custom_job_service_account = f'{project_number}-compute@developer.gserviceaccount.com'

In [11]:
(training_container_image_uri,
 serving_container_image_uri,
 custom_job_service_account)

('asia-southeast1-docker.pkg.dev/woven-rush-197905/mlops-vertex-kit/training:latest',
 'asia-southeast1-docker.pkg.dev/woven-rush-197905/mlops-vertex-kit/serving:latest',
 '297370817971-compute@developer.gserviceaccount.com')

In [12]:
# Extra training settings, forwarded as a JSON string to the training component.
train_additional_args = json.dumps(dict(
    num_leaves_hp_param_min=6,
    num_leaves_hp_param_max=11,
    max_depth_hp_param_min=-1,
    max_depth_hp_param_max=4,
    num_boost_round=300,
    min_data_in_leaf=5,
))
train_additional_args

'{"num_leaves_hp_param_min": 6, "num_leaves_hp_param_max": 11, "max_depth_hp_param_min": -1, "max_depth_hp_param_max": 4, "num_boost_round": 300, "min_data_in_leaf": 5}'

In [13]:
@dsl.pipeline(name='training-pipeline-template')
def pipeline(project_id: str,
             data_region: str,
             gcs_data_output_folder: str,
             input_dataset_uri: str,
             training_data_schema: str,
             data_pipeline_root: str,
             
             training_container_image_uri: str,
             train_additional_args: str,
             serving_container_image_uri: str,
             custom_job_service_account: str,
             hptune_region: str,
             hp_config_suggestions_per_request: int,
             hp_config_max_trials: int,
             
             metrics_name: str,
             metrics_threshold: float,
             
             endpoint_machine_type: str,
             endpoint_min_replica_count: int,
             endpoint_max_replica_count: int,
             endpoint_test_instances: str,
             
             monitoring_user_emails: str,
             monitoring_log_sample_rate: float,
             monitor_interval: int,
             monitoring_default_threshold: float,
             monitoring_custom_skew_thresholds: str,
             monitoring_custom_drift_thresholds: str,
             
             machine_type: str = "n1-standard-8",
             accelerator_count: int = 0,
             accelerator_type: str = 'ACCELERATOR_TYPE_UNSPECIFIED',
             vpc_network: str = "",
             enable_model_monitoring: str = 'False'):
    """Vertex AI training pipeline: preprocess, train, check metrics, deploy, test.

    Steps:
      1. Import the dataset at `input_dataset_uri` and preprocess it into CSV
         files under `gcs_data_output_folder`.
      2. Train a model with the training/serving container images, passing the
         hyper-parameter tuning settings (`hptune_region`, `hp_config_*`).
      3. Check `metrics_name` in the training metrics against `metrics_threshold`.
      4. Create the endpoint 'endpoint-classification-template' if it does not
         exist, deploy the model to it after the metrics check, then send
         `endpoint_test_instances` to it.
      5. When `enable_model_monitoring` equals the string 'True', configure
         model monitoring on the endpoint after deployment.
    """

    # Register the input URI as a Dataset artifact so it can feed the preprocess step.
    dataset_importer = dsl.importer(
      artifact_uri=input_dataset_uri,
      artifact_class=Dataset,
      reimport=False)

    preprocess_task = preprocess_op(
      project_id=project_id,
      data_region=data_region,
      gcs_output_folder=gcs_data_output_folder,
      gcs_output_format="CSV",
      input_dataset=dataset_importer.output)
    
    train_task = train_op(
      project_id=project_id,
      data_region=data_region,
      data_pipeline_root=data_pipeline_root,
      input_data_schema=training_data_schema,
      training_container_image_uri=training_container_image_uri,
      train_additional_args=train_additional_args,
      serving_container_image_uri=serving_container_image_uri,
      custom_job_service_account=custom_job_service_account,
      input_dataset=preprocess_task.outputs['output_dataset'],
      machine_type=machine_type,
      accelerator_count=accelerator_count,
      accelerator_type=accelerator_type,
      hptune_region=hptune_region,
      hp_config_max_trials=hp_config_max_trials,
      hp_config_suggestions_per_request=hp_config_suggestions_per_request,
      vpc_network=vpc_network)
    
    check_metrics_task = check_metrics_op(
      metrics_name=metrics_name,
      metrics_threshold=metrics_threshold,
      basic_metrics=train_task.outputs['basic_metrics'])
    
    create_endpoint_task = create_endpoint_op(
      project_id=project_id,
      data_region=data_region,
      data_pipeline_root=data_pipeline_root,
      display_name='endpoint-classification-template',
      create_if_not_exists=True)

    # Deployment must wait for the metrics check. Without this edge the check ran
    # in parallel with deployment, so a model that misses `metrics_threshold`
    # could still be deployed. Assumes check_model_metrics fails its task when the
    # threshold is not met (blocking downstream steps) — TODO confirm.
    deploy_model_task = deploy_model_op(
      project_id=project_id,
      data_region=data_region,
      data_pipeline_root=data_pipeline_root,
      machine_type=endpoint_machine_type,
      min_replica_count=endpoint_min_replica_count,
      max_replica_count=endpoint_max_replica_count,
      model=train_task.outputs['output_model'],
      endpoint=create_endpoint_task.outputs['endpoint'],
    ).after(check_metrics_task)
    
    # Smoke-test the endpoint only once the model has been deployed to it.
    test_endpoint_task = test_endpoint_op(
      project_id=project_id,
      data_region=data_region,
      data_pipeline_root=data_pipeline_root,
      endpoint=create_endpoint_task.outputs['endpoint'],
      test_instances=endpoint_test_instances,
    ).after(deploy_model_task)
    
    # The flag is a pipeline string parameter, so it is compared against 'True'.
    with dsl.Condition(enable_model_monitoring == 'True', name='Monitoring'):
        monitor_model_task = monitor_model_op(
          project_id=project_id,
          data_region=data_region,
          user_emails=monitoring_user_emails,
          log_sample_rate=monitoring_log_sample_rate,
          monitor_interval=monitor_interval,
          default_threshold=monitoring_default_threshold,
          custom_skew_thresholds=monitoring_custom_skew_thresholds,
          custom_drift_thresholds=monitoring_custom_drift_thresholds,
          endpoint=create_endpoint_task.outputs['endpoint'],
          instance_schema=train_task.outputs['instance_schema'],
          dataset=preprocess_task.outputs['output_dataset'])
        monitor_model_task.after(deploy_model_task)

### Compile and run the end-to-end ML pipeline
With our full pipeline defined, it's time to compile it:

In [14]:
# Compile the pipeline function into a Vertex AI job spec written to JSON.
compiler.Compiler().compile(pipeline_func=pipeline,
                            package_path="training_pipeline_job.json")

Next, instantiate an API client:

In [15]:
# Client used to submit pipeline runs to Vertex AI in `pipeline_region`.
# NOTE(review): later KFP releases deprecate AIPlatformClient in favour of
# google.cloud.aiplatform.PipelineJob — consider migrating.
api_client = AIPlatformClient(project_id=project_id, region=pipeline_region)



Next, prepare sample instances for testing the deployed endpoint and kick off a pipeline run:

In [16]:
# Sample rows (features plus label) sent to the deployed endpoint by test_endpoint.
test_instances = json.dumps([
    dict(zip(('VWT', 'SWT', 'KWT', 'Entropy', 'Class'), row))
    for row in (
        (3.6216, 8.6661, -2.8073, -0.44699, '0'),
        (4.5459, 8.1674, -2.4586, -1.4621, '0'),
        (3.866, -2.6383, 1.9242, 0.10645, '0'),
        (-3.7503, -13.4586, 17.5932, -2.7771, '1'),
        (-3.5637, -8.3827, 12.393, -1.2823, '1'),
        (-2.5419, -0.65804, 2.6842, 1.1952, '1'),
    )
])
test_instances

'[{"VWT": 3.6216, "SWT": 8.6661, "KWT": -2.8073, "Entropy": -0.44699, "Class": "0"}, {"VWT": 4.5459, "SWT": 8.1674, "KWT": -2.4586, "Entropy": -1.4621, "Class": "0"}, {"VWT": 3.866, "SWT": -2.6383, "KWT": 1.9242, "Entropy": 0.10645, "Class": "0"}, {"VWT": -3.7503, "SWT": -13.4586, "KWT": 17.5932, "Entropy": -2.7771, "Class": "1"}, {"VWT": -3.5637, "SWT": -8.3827, "KWT": 12.393, "Entropy": -1.2823, "Class": "1"}, {"VWT": -2.5419, "SWT": -0.65804, "KWT": 2.6842, "Entropy": 1.1952, "Class": "1"}]'

In [17]:
pipeline_params = dict(
    # Data ingestion and preprocessing.
    project_id=project_id,
    data_region=data_region,
    gcs_data_output_folder=gcs_data_output_folder,
    input_dataset_uri=input_dataset_uri,
    training_data_schema=training_data_schema,
    data_pipeline_root=data_pipeline_root,

    # Training and hyper-parameter tuning.
    training_container_image_uri=training_container_image_uri,
    train_additional_args=train_additional_args,
    serving_container_image_uri=serving_container_image_uri,
    custom_job_service_account=custom_job_service_account,
    # NOTE(review): differs from every other region here (asia-southeast1);
    # presumably deliberate for the tuning service — confirm.
    hptune_region="asia-east1",
    hp_config_suggestions_per_request=5,
    hp_config_max_trials=30,

    # Metric compared against the threshold by the check_model_metrics step.
    metrics_name='au_prc',
    metrics_threshold=0.4,

    # Serving endpoint sizing and smoke-test payload.
    endpoint_machine_type='n1-standard-4',
    endpoint_min_replica_count=1,
    endpoint_max_replica_count=1,
    endpoint_test_instances=test_instances,

    # Model monitoring (only runs when enable_model_monitoring == 'True').
    # NOTE(review): a personal address is hard-coded; replace with your own recipients.
    monitoring_user_emails='luoshixin@google.com',
    monitoring_log_sample_rate=0.8,
    monitor_interval=3600,
    monitoring_default_threshold=0.3,
    monitoring_custom_skew_thresholds='VWT:.5,SWT:.2,KWT:.7,Entropy:.4',
    monitoring_custom_drift_thresholds='VWT:.5,SWT:.2,KWT:.7,Entropy:.4',
    enable_model_monitoring='True',
)

# Submit the compiled job spec with the parameters above; caching is turned off.
response = api_client.create_run_from_job_spec(
    job_spec_path="training_pipeline_job.json",
    pipeline_root=pipeline_root,
    parameter_values=pipeline_params,
    enable_caching=False,
)