# Build a Pipeline with Two Branches for GCP AutoML

- One for creating a new dataset, training a new model, and deploying the model to a new Endpoint
- One for importing additional data into the existing dataset, retraining the previous model, and deploying the model to the existing Endpoint

> **NOTICE**: Replace the GCS bucket names and the GCP project ID with your own.

<img src="https://i.ibb.co/kXcVrcm/Screen-Shot-2022-01-14-at-10-55-29-AM.png" height="1000"/>
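
The branching in the diagram can be sketched in plain Python. The sketch below follows the `kfp.dsl.Condition` blocks as they are written in the pipeline definition. The function name `pipeline_steps` and the step labels are illustrative only. On Vertex AI, the two flags come from the `get_dataset_id` and `get_model_id` components rather than from arguments.

```python
from typing import List


def pipeline_steps(dataset_exists: bool, model_exists: bool) -> List[str]:
    """Return the steps the pipeline runs for a given starting state.

    Hypothetical plain-Python mirror of the kfp.dsl.Condition blocks; on
    Vertex AI the two flags come from get_dataset_id and get_model_id.
    """
    if not dataset_exists:
        # "create dataset" branch: new dataset, new model, new endpoint.
        return ["create dataset", "train from scratch", "create endpoint", "deploy"]

    # "update dataset" branch: import the new data into the existing dataset.
    steps = ["import data"]
    if model_exists:
        # "model exist": retrain with the previous model as base_model.
        steps.append("train from base_model")
    else:
        # "model not exist": nothing to start from, so train from scratch.
        steps.append("train from scratch")
    # As written, both sub-branches create an endpoint before deploying.
    steps += ["create endpoint", "deploy"]
    return steps


for dataset_exists in (False, True):
    for model_exists in (False, True):
        print(dataset_exists, model_exists, pipeline_steps(dataset_exists, model_exists))
```

When the dataset does not exist, the model check is never reached. The first two printed rows are therefore identical.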

# General Setup

In [None]:
!pip install -Uqq kfp
!pip install -Uqq google-cloud-aiplatform
!pip install -Uqq google-cloud-pipeline-components

In [None]:
!gcloud init


In [None]:
import kfp
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from kfp.v2.dsl import component

In [None]:
#@title GCS
PROJECT_ID = 'phonic-agility-338223'  #@param {type:"string"}
PIPELINE_ROOT_PATH = 'gs://my-pipeline-1012'  #@param {type:"string"}
PIPELINE_NAME = 'cifar10-pipeline-automl' #@param {type:"string"}

# Custom Components for retraining

## get_dataset_id component

Looks up an image dataset by display name and returns its full resource name if one exists, otherwise the string `'None'`.
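
The value this component returns is a full Vertex AI resource name. It is built from the project, the region, and the numeric dataset ID, and the lookup filters `ImageDataset.list` on the display name. The sketch below shows the shape of both strings. The project name and dataset ID in the usage lines are made up. When the name is read from the SDK's `resource_name`, the project segment is typically the project number rather than the project ID.

```python
def dataset_resource_name(project: str, location: str, dataset_id: str) -> str:
    """Full Vertex AI resource name, the shape of get_dataset_id's return value."""
    return f"projects/{project}/locations/{location}/datasets/{dataset_id}"


def display_name_filter(display_name: str) -> str:
    """Filter string that matches one display name exactly (value quoted)."""
    return f'display_name="{display_name}"'


# Hypothetical project and dataset ID, for illustration only.
print(dataset_resource_name("my-project", "us-central1", "1234567890"))
# projects/my-project/locations/us-central1/datasets/1234567890
print(display_name_filter("cifar10-dataset"))
# display_name="cifar10-dataset"
```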

In [None]:
from kfp.v2.dsl import Artifact, Output

@component(
    packages_to_install=["google-cloud-aiplatform", "google-cloud-pipeline-components"]
)
def get_dataset_id(project_id: str,
                   location: str,
                   dataset_name: str,
                   dataset_path: str,
                   dataset: Output[Artifact]) -> str:
    """Return the resource name of the image dataset named `dataset_name`, or 'None'.

    `dataset_path` is not used here.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project_id, location=location)

    # Filter on the exact display name; string values are quoted in list filters.
    datasets = aiplatform.ImageDataset.list(project=project_id,
                                            location=location,
                                            filter=f'display_name="{dataset_name}"')

    if len(datasets) > 0:
        resource_name = datasets[0].resource_name  # projects/.../locations/.../datasets/ID
        # GCPC ops that consume this artifact look for the resourceName metadata key.
        dataset.metadata['resourceName'] = resource_name
        return resource_name
    # KFP conditions compare strings, so return the literal 'None'.
    return 'None'

## get_model_id component

Looks up a model by display name and returns its full resource name if one exists, otherwise the string `'None'`.

In [None]:
from kfp.v2.dsl import Artifact, Output

@component(
    packages_to_install=["google-cloud-aiplatform", "google-cloud-pipeline-components"]
)
def get_model_id(project_id: str,
                 location: str,
                 model_name: str,
                 model: Output[Artifact]) -> str:
    """Return the resource name of the newest model named `model_name`, or 'None'."""
    from google.cloud import aiplatform

    aiplatform.init(project=project_id, location=location)

    # Each run registers another "cifar10-model", so sort newest first.
    models = aiplatform.Model.list(project=project_id,
                                   location=location,
                                   filter=f'display_name="{model_name}"',
                                   order_by='create_time desc')

    if len(models) > 0:
        resource_name = models[0].resource_name  # projects/.../locations/.../models/ID
        # Consumed as base_model by AutoMLImageTrainingJobRunOp.
        model.metadata['resourceName'] = resource_name
        return resource_name
    # KFP conditions compare strings, so return the literal 'None'.
    return 'None'
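
Both lookup components follow the same pattern. Each one lists resources filtered by display name, takes the first hit, and falls back to the string `'None'`. Every run of this pipeline registers another model named `cifar10-model`, so there can be several hits.

The sketch below shows a selection step that prefers the newest hit. It works on hypothetical in-memory records (`ListedResource`) rather than on the SDK. The same pattern could back a hypothetical `get_endpoint_id` component. The introduction says the retraining branch deploys to the already existing Endpoint, but as written every branch creates a new one with `EndpointCreateOp`.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List


@dataclass
class ListedResource:
    """Hypothetical stand-in for one item returned by Model.list / Endpoint.list."""
    display_name: str
    resource_name: str
    create_time: datetime


def newest_or_none(resources: List[ListedResource], display_name: str) -> str:
    """Return the newest resource with this display name, or the string 'None'."""
    matches = [r for r in resources if r.display_name == display_name]
    if not matches:
        # Sentinel string, because the pipeline's Conditions compare strings.
        return "None"
    return max(matches, key=lambda r: r.create_time).resource_name


models = [
    ListedResource("cifar10-model", "projects/p/locations/us-central1/models/1", datetime(2022, 1, 13)),
    ListedResource("cifar10-model", "projects/p/locations/us-central1/models/2", datetime(2022, 1, 14)),
]
print(newest_or_none(models, "cifar10-model"))  # projects/p/locations/us-central1/models/2
print(newest_or_none(models, "cifar10-endpoint"))  # None
```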

# Pipeline

## Define pipeline

In [None]:

@kfp.dsl.pipeline(
    name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT_PATH)
def pipeline(project_id: str, 
             location: str,
             dataset_name: str,
             dataset_path: str,
             base_model_name: str):
    
    dataset_op = get_dataset_id(project_id=project_id,
                                location=location,
                                dataset_name=dataset_name,
                                dataset_path=dataset_path)
    
    with kfp.dsl.Condition(dataset_op.outputs['Output'] == 'None', name="create dataset"):
        ds_op = gcc_aip.ImageDatasetCreateOp(
            project=project_id,
            display_name=dataset_name,
            gcs_source=dataset_path,
            import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification,
        )
        ds_op.after(dataset_op)
        
        training_job_run_op = gcc_aip.AutoMLImageTrainingJobRunOp(
            project=project_id,
            display_name="train-cifar10-automl",
            prediction_type="classification",
            model_type="CLOUD",
            base_model=None,
            dataset=ds_op.outputs["dataset"],
            model_display_name="cifar10-model",
            training_fraction_split=0.6,
            validation_fraction_split=0.2,
            test_fraction_split=0.2,
            budget_milli_node_hours=8000,
        )
        training_job_run_op.after(ds_op)

        create_endpoint_op = gcc_aip.EndpointCreateOp(
            project=project_id,
            display_name = "cifar10-automl-endpoint",
        )
        create_endpoint_op.after(training_job_run_op)

        model_deploy_op = gcc_aip.ModelDeployOp(
            model=training_job_run_op.outputs["model"],
            endpoint=create_endpoint_op.outputs['endpoint'],
            automatic_resources_min_replica_count=1,
            automatic_resources_max_replica_count=1,
        )
        model_deploy_op.after(create_endpoint_op)

    with kfp.dsl.Condition(dataset_op.outputs['Output'] != 'None', name="update dataset"):
        ds_op = gcc_aip.ImageDatasetImportDataOp(
            dataset=dataset_op.outputs['dataset'],
            gcs_source=dataset_path,
            import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification
        )
        ds_op.after(dataset_op)

        model_op = get_model_id(
            project_id=project_id,
            location=location,
            model_name=base_model_name
        )
        model_op.after(ds_op)

        with kfp.dsl.Condition(model_op.outputs['Output'] != 'None', name='model exist'):
            training_job_run_op = gcc_aip.AutoMLImageTrainingJobRunOp(
                project=project_id,
                display_name="train-cifar10-automl",
                prediction_type="classification",
                model_type="CLOUD",
                base_model=model_op.outputs['model'],
                dataset=ds_op.outputs["dataset"],
                model_display_name="cifar10-model",
                training_fraction_split=0.6,
                validation_fraction_split=0.2,
                test_fraction_split=0.2,
                budget_milli_node_hours=8000,
            )
            training_job_run_op.after(model_op)

            create_endpoint_op = gcc_aip.EndpointCreateOp(
                project=project_id,
                display_name="cifar10-automl-endpoint",
            )
            create_endpoint_op.after(training_job_run_op)

            model_deploy_op = gcc_aip.ModelDeployOp(
                model=training_job_run_op.outputs["model"],
                endpoint=create_endpoint_op.outputs['endpoint'],
                automatic_resources_min_replica_count=1,
                automatic_resources_max_replica_count=1,
                traffic_split={"0": 100},
            )
            model_deploy_op.after(create_endpoint_op)

        with kfp.dsl.Condition(model_op.outputs['Output'] == 'None', name='model not exist'):
            training_job_run_op = gcc_aip.AutoMLImageTrainingJobRunOp(
                project=project_id,
                display_name="train-cifar10-automl",
                prediction_type="classification",
                model_type="CLOUD",
                base_model=None,
                dataset=ds_op.outputs["dataset"],
                model_display_name="cifar10-model",
                training_fraction_split=0.6,
                validation_fraction_split=0.2,
                test_fraction_split=0.2,
                budget_milli_node_hours=8000,
            )
            training_job_run_op.after(model_op)

            create_endpoint_op = gcc_aip.EndpointCreateOp(
                project=project_id,
                display_name="cifar10-automl-endpoint",
            )
            create_endpoint_op.after(training_job_run_op)

            model_deploy_op = gcc_aip.ModelDeployOp(
                model=training_job_run_op.outputs["model"],
                endpoint=create_endpoint_op.outputs['endpoint'],
                automatic_resources_min_replica_count=1,
                automatic_resources_max_replica_count=1,
            )
            model_deploy_op.after(create_endpoint_op)
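
All three training ops in the pipeline share the same split and budget settings. The plain-Python check below uses values copied from the `AutoMLImageTrainingJobRunOp` calls. The three fractions add up to 1, which is a 60/20/20 split of the images. `budget_milli_node_hours=8000` equals 8 node hours, because the budget is expressed in thousandths of a node hour.

```python
import math

# Values copied from the AutoMLImageTrainingJobRunOp calls in the pipeline.
training_fraction_split = 0.6
validation_fraction_split = 0.2
test_fraction_split = 0.2
budget_milli_node_hours = 8000

# Compare with a tolerance, since the fractions are floats.
total = training_fraction_split + validation_fraction_split + test_fraction_split
print(math.isclose(total, 1.0))  # True

# milli node hours -> node hours
budget_node_hours = budget_milli_node_hours / 1000
print(budget_node_hours)  # 8.0
```

The same numbers are repeated in all three training ops. Hoisting them into module-level constants like these is one way to keep the branches from drifting apart.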

## Compile pipeline

In [None]:
pipeline_spec_file = 'cifar10_classification_pipeline.json'

In [None]:
from kfp.v2 import compiler

compiler.Compiler().compile(
        pipeline_func=pipeline,
        package_path=pipeline_spec_file)
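
Vertex AI rejects a run when `parameter_values` leaves any of the pipeline's root inputs without a value. The error is a `400 InvalidArgument` that names the missing parameters. The sketch below checks for this locally against the compiled spec before calling `job.submit()`.

It assumes the JSON layout written by kfp 1.8's `kfp.v2` compiler: declared inputs under `pipelineSpec.root.inputDefinitions.parameters` and defaults under `runtimeConfig.parameters`. The helper `missing_parameters` is hypothetical.

```python
from typing import Dict, List


def missing_parameters(job_spec: Dict, parameter_values: Dict[str, object]) -> List[str]:
    """Return root input parameters that get neither a value nor a compiled default.

    Assumes the JSON layout of kfp 1.8's kfp.v2 compiler:
    pipelineSpec.root.inputDefinitions.parameters for the declared inputs and
    runtimeConfig.parameters for any defaults.
    """
    declared = job_spec["pipelineSpec"]["root"]["inputDefinitions"]["parameters"]
    defaults = job_spec.get("runtimeConfig", {}).get("parameters", {})
    return sorted(
        name for name in declared
        if name not in parameter_values and name not in defaults
    )


# Hand-written fragment declaring this pipeline's five inputs, none with a default.
example_spec = {
    "pipelineSpec": {"root": {"inputDefinitions": {"parameters": {
        name: {"type": "STRING"}
        for name in ["project_id", "location", "dataset_name", "dataset_path", "base_model_name"]
    }}}}
}
print(missing_parameters(example_spec, {
    "project_id": "my-project",
    "location": "us-central1",
    "dataset_name": "cifar10-dataset",
    "dataset_path": "gs://my-cifar10-dataset/span-2/annotations.csv",
}))  # ['base_model_name']
```

To check against the real file, load it with `json.load(open(pipeline_spec_file))`. Then pass the same dict you give to `aiplatform.PipelineJob` as `parameter_values`.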



## Create a GCS bucket & copy the pipeline spec

In [None]:
#@title GCS
REGION = "us-central1" #@param {type:"string"}

!gsutil mb -l {REGION} {PIPELINE_ROOT_PATH}
!gsutil cp {pipeline_spec_file} {PIPELINE_ROOT_PATH}/

Creating gs://my-pipeline-1012/...
ServiceException: 409 A Cloud Storage bucket named 'my-pipeline-1012' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.
Copying file://cifar10_classification_pipeline.json [Content-Type=application/json]...
/ [1 files][ 68.2 KiB/ 68.2 KiB]                                                
Operation completed over 1 objects/68.2 KiB.                                     


# Test Pipeline

In [None]:
!gcloud config set project {PROJECT_ID}

Updated property [core/project].


In [None]:
import os
os.environ['GOOGLE_CLOUD_PROJECT'] = PROJECT_ID

In [None]:
# First run (span-1): on a fresh project no dataset exists yet,
# so the "create dataset" branch runs.
job = aiplatform.PipelineJob(
    display_name="automl-image-training-v2",
    template_path=pipeline_spec_file,
    pipeline_root=PIPELINE_ROOT_PATH,
    project=PROJECT_ID,
    location=REGION,
    parameter_values={
        'project_id': PROJECT_ID,
        'location': REGION,
        'dataset_name': 'cifar10-dataset',
        'dataset_path': 'gs://my-cifar10-dataset/span-1/annotations.csv',
        'base_model_name': 'cifar10-model',
    }
)

job.submit()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/161071819378/locations/us-central1/pipelineJobs/cifar10-pipeline-automl-20220114011458
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/161071819378/locations/us-central1/pipelineJobs/cifar10-pipeline-automl-20220114011458')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/cifar10-pipeline-automl-20220114011458?project=161071819378


In [None]:
# Second run (span-2): reuse the dataset name from the first run so the
# "update dataset" branch imports span-2 into it and retrains.
job = aiplatform.PipelineJob(
    display_name="automl-image-training-v2",
    template_path=pipeline_spec_file,
    pipeline_root=PIPELINE_ROOT_PATH,
    project=PROJECT_ID,
    location=REGION,
    parameter_values={
        'project_id': PROJECT_ID,
        'location': REGION,
        'dataset_name': 'cifar10-dataset',
        'dataset_path': 'gs://my-cifar10-dataset/span-2/annotations.csv',
        # Every root parameter needs a value; leaving this out fails with
        # "400 Some input parameters of the PipelineSpec.root are missing
        # from PipelineJob.runtimeConfig.parameters: ([base_model_name])".
        'base_model_name': 'cifar10-model',
    }
)

job.submit()