In [None]:
################################################################################
#
# vertex pipeline: dask in vertex pipelines on custom machine type/size
#
################################################################################

In [2]:
# imports
import google.cloud.aiplatform as aiplatform
from datetime import datetime
from typing import NamedTuple
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, Metrics, Model, Output, component)

In [4]:
# project variables
# GCP project and region that every later cell targets.
PROJECT_ID = "your-project"
REGION = "us-central1"

# The bucket name is derived from the project id; the gs:// URI is derived from the bucket name.
BUCKET_NAME = "bkt-" + PROJECT_ID
BUCKET_URI = "gs://" + BUCKET_NAME

In [None]:
#####################################################################
#
# Components
#
#####################################################################

In [43]:
#----------------
# component 1 - using rapids container image
#----------------
@component(base_image="gcr.io/your-project/mycuda:latest"
           , packages_to_install=["google-cloud-bigquery", "dask[complete]"]
           , output_component_file="step_1.yaml")
def step_1() -> NamedTuple("Outputs", # output parameters and artifacts
               [("computed_sum", str), ("computed_duration", str)]
               ):
    """Sum a large random dask array and report the total and the time taken.

    Builds a lazy 1,000,000 x 1,000,000 random array split into 1,000 x 1,000
    chunks, reduces it to a single sum, and times the graph construction plus
    the computation.

    Returns:
        A ``(computed_sum, computed_duration)`` pair of strings, formatted as
        ``"Sum: <value>"`` and ``"Elapsed time: <seconds>"``.
    """
    # imports are kept inside the function body: the component code executes
    # in the step's container, not in the notebook kernel
    import time
    import dask.array as da

    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # by wall-clock adjustments the way time.time() differences can
    started = time.perf_counter()
    x = da.random.random((1000000, 1000000), chunks=(1000, 1000))
    total = x.sum().compute()
    # stop the clock right after the computation, before any string formatting
    elapsed_time = time.perf_counter() - started

    computed_sum = f"Sum: {total}"
    computed_duration = f"Elapsed time: {elapsed_time}"
    return (computed_sum, computed_duration)

In [None]:
#####################################################################
#
# Define Pipeline
#
#####################################################################

In [6]:
# Second-resolution timestamp taken once, so every identifier below shares it.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"

# The display name and the batch id are the same string for a given run.
DISPLAY_NAME = "ap-dask-custom-machine-" + TIMESTAMP
BATCH_ID = DISPLAY_NAME
DESCRIPTION = "Demo of running dask in custom machine type " + TIMESTAMP

In [7]:
# the pipeline
@dsl.pipeline(name=DISPLAY_NAME, description=DESCRIPTION)
def my_pipeline(
    batch_id: str = BATCH_ID,
    project: str = PROJECT_ID,
    location: str = REGION,
):
    # Single-step pipeline: runs step_1 with explicit CPU and memory limits.
    # The pipeline parameters are declared but not consumed by the step.
    # NOTE: `my_pipeline` is defined again in a later cell; whichever of the
    # two cells ran last is the definition that gets compiled.

    #------pick one of the two variants below
    # default machine type:
    #step_1_task = step_1()
    # custom machine type:
    step_1_task = step_1()
    step_1_task = step_1_task.set_cpu_limit('96')
    step_1_task = step_1_task.set_memory_limit('624G')

In [51]:
# the pipeline
@dsl.pipeline(name=DISPLAY_NAME, description=DESCRIPTION)
def my_pipeline(
    batch_id: str = BATCH_ID,
    project: str = PROJECT_ID,
    location: str = REGION,
):
    # Single-step pipeline: runs step_1 with CPU and memory limits plus one
    # NVIDIA_TESLA_P100 accelerator.
    # The pipeline parameters are declared but not consumed by the step.
    # NOTE: this replaces the `my_pipeline` defined in the earlier cell.

    #------pick one of the two variants below
    # default machine type:
    #step_1_task = step_1()
    # custom machine type:
    step_1_task = step_1()
    step_1_task = step_1_task.set_cpu_limit('16')
    step_1_task = step_1_task.set_memory_limit('60G')
    step_1_task = step_1_task.add_node_selector_constraint(
        'cloud.google.com/gke-accelerator', 'NVIDIA_TESLA_P100'
    )
    step_1_task = step_1_task.set_gpu_limit('1')

In [9]:
# Relative path of the compiled pipeline package, named after this run.
PIPELINE_JSON_PKG_PATH = DISPLAY_NAME + "_json_pkg.json"

# compile the pipeline function into that JSON package
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(package_path=PIPELINE_JSON_PKG_PATH, pipeline_func=my_pipeline)

In [10]:
# initialize Vertex AI SDK for Python
# point the Vertex AI SDK at the project, region and staging bucket
aiplatform.init(
    staging_bucket=BUCKET_URI,
    location=REGION,
    project=PROJECT_ID,
)

# runtime parameter overrides: only `project` is passed, the other pipeline
# parameters fall back to their declared defaults
pipeline_params = dict(project=PROJECT_ID)

# build the job from the compiled package; artifacts go to the bucket root
pipeline_job = aiplatform.PipelineJob(
    template_path=PIPELINE_JSON_PKG_PATH,
    display_name=DISPLAY_NAME,
    parameter_values=pipeline_params,
    pipeline_root=BUCKET_URI,
    enable_caching=True,
)

# submit the job
pipeline_job.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/444388004085/locations/us-central1/pipelineJobs/ap-dask-custom-machine-20220506043231-20220506043303
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/444388004085/locations/us-central1/pipelineJobs/ap-dask-custom-machine-20220506043231-20220506043303')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/ap-dask-custom-machine-20220506043231-20220506043303?project=444388004085
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/444388004085/locations/us-central1/pipelineJobs/ap-dask-custom-machine-20220506043231-20220506043303 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.