In [1]:
# imports for this notebook to run
import sys
from datetime import datetime
from typing import NamedTuple

from google.cloud import aiplatform as vertex

import kfp
from kfp.v2 import dsl, compiler

In [2]:
# specify parameters
P = ! gcloud config list --format 'value(core.project)'
PROJECT_ID = P[0]
P = ! gcloud projects list --filter="$(gcloud config get-value project)" --format="value(PROJECT_NUMBER)"
PROJECT_NUMBER = P[0]
REGION = "us-central1"
SERVICE_ACCOUNT = f"sa-vertex-pipelines@{PROJECT_ID}.iam.gserviceaccount.com"

# exercise details
USE_CASE = "nestedpipelines"
ML_FRAMEWORK = "noml"
MODEL_TYPE = "nomodel"
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

#-------------------
# vertex pipeline
#-------------------

GCS_BUCKET_NAME = f"bkt-{REGION}-{USE_CASE.lower()}"
GCS_BUCKET_PATH = f"gs://{GCS_BUCKET_NAME}"
PIPELINE_ROOT_NAME = f"{GCS_BUCKET_NAME}/pipeline_root"
PIPELINE_ROOT_PATH = f"{GCS_BUCKET_PATH}/pipeline_root"

#-------------------
# GCS source data
#-------------------

# master source data bucket
GCS_DATA_SOURCE_BUCKET_NAME = f"bkt-{REGION}-data"
GCS_DATA_SOURCE_BUCKET_PATH = f"gs://{GCS_DATA_SOURCE_BUCKET_NAME}"

# training data table
TABLE_TRAIN = "tab_class_10inps_1krows_tes_3498.csv"
GCS_TRAIN_URI = f"{GCS_DATA_SOURCE_BUCKET_PATH}/{TABLE_TRAIN}"

In [None]:
# create use case gcs bucket if needed
! gsutil mb -p {PROJECT_ID} -c standard -l {REGION} {GCS_BUCKET_PATH}
! gsutil ls -L -b {GCS_BUCKET_PATH}

# Inner Pipelines

In [None]:
#-------------------
# import data
#-------------------

In [9]:
@dsl.pipeline(name='inner-pipeline-1')
def import_data_pipeline(gcs_train_uri: str) -> dsl.Dataset:
    """Inner pipeline that imports an existing file as a Dataset artifact.

    Args:
        gcs_train_uri: URI of the training data to import.

    Returns:
        The Dataset artifact produced by the importer, exposed as this
        pipeline's single output so a caller can read it via `.output`.
    """
    dataset_task = dsl.importer(
        artifact_uri=gcs_train_uri,
        artifact_class=dsl.Dataset,
        reimport=True,
    )
    # return the artifact itself (not the task object) so it becomes the pipeline output
    return dataset_task.output

In [10]:
#-------------------
# handle data
#-------------------

In [11]:
# component
@dsl.component
def handle_data_function(
    dataset: dsl.Input[dsl.Dataset]
) -> str:
    """Placeholder processing step that reports the input dataset's path.

    Args:
        dataset: Dataset artifact passed into the component.

    Returns:
        The value of the artifact's `path` attribute.
    """
    # any real data handling would go here
    dataset_path = dataset.path
    return dataset_path

In [12]:
# pipeline
@dsl.pipeline(name='inner-pipeline-2')
def handle_data_pipeline(
    dataset: dsl.Input[dsl.Dataset]
) -> str:
    """Inner pipeline that runs `handle_data_function` on a Dataset artifact.

    Args:
        dataset: Dataset artifact supplied by the caller.

    Returns:
        The string output of `handle_data_function`, exposed as this
        pipeline's single output.
    """
    # `dataset` is already the artifact input (it has no `.output`), and
    # components must be called with keyword arguments
    task = handle_data_function(dataset=dataset)
    return task.output

# Outer pipeline

In [18]:
# master pipeline
@dsl.pipeline(name='outer-pipeline')
def master_pipeline(gcs_train_uri: str) -> str:
    """Outer pipeline that chains the two inner pipelines.

    Args:
        gcs_train_uri: URI of the training data, forwarded to the import step.

    Returns:
        The string output of `handle_data_pipeline`.
    """
    # inner pipelines are used like components, so they take keyword arguments
    task_1 = import_data_pipeline(gcs_train_uri=gcs_train_uri)
    task_2 = handle_data_pipeline(dataset=task_1.output)
    # return the output value rather than the task object
    return task_2.output

In [19]:
# compile the outer pipeline into a specification file
my_package_path = 'my_vertex_pipeline_specification_file.json'
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=master_pipeline,
    package_path=my_package_path,
)

In [20]:
#####################################################################
#
# run the compiled pipeline on Vertex AI
#
#####################################################################

In [None]:
# runtime parameters to pass to pipeline
pipeline_params = {
    "gcs_train_uri": GCS_TRAIN_URI,
}

# run the pipeline
# pass the region explicitly so the job location follows REGION, the same
# setting used to create the use-case bucket that holds the pipeline root
vertex.init(project=PROJECT_ID, location=REGION)

job = vertex.PipelineJob(
    display_name="my-pipeline-job-name",
    template_path=my_package_path,
    pipeline_root=PIPELINE_ROOT_PATH,
    parameter_values=pipeline_params,
    enable_caching=False,
)

# reuse the service account defined in the parameters cell instead of rebuilding the string
job.submit(service_account=SERVICE_ACCOUNT)