In [None]:
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Vertex AI Pipelines: pipeline control structures using the KFP SDK

<table align="left">
  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/control_flow_kfp.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Run in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/control_flow_kfp.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
  <td>
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/control_flow_kfp.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo">
      Open in Vertex AI Workbench
     </a>
  </td>
</table>
<br/><br/><br/>

## Overview

This notebooks shows how to use [the Kubeflow Pipelines (KFP) SDK](https://www.kubeflow.org/docs/components/pipelines/) to build [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines) that use control structures.

### Objective

In this tutorial, you use the KFP SDK to build pipelines that use loops and conditionals, including nested examples.

This tutorial uses the following Google Cloud ML services:

- `Vertex AI Pipelines`

The steps performed include:

- Create a KFP pipeline:
    - Use control flow components
- Compile the KFP pipeline.
- Execute the KFP pipeline using `Vertex AI Pipelines`

### Costs

This tutorial uses billable components of Google Cloud:

* Vertex AI
* Cloud Storage

Learn about [Vertex AI
pricing](https://cloud.google.com/vertex-ai/pricing) and [Cloud Storage
pricing](https://cloud.google.com/storage/pricing), and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

### Set up your local development environment

If you are using Colab or Vertex AI Workbench Notebook, your environment already meets all the requirements to run this notebook. You can skip this step.

Otherwise, make sure your environment meets this notebook's requirements. You need the following:

- The Cloud Storage SDK
- Git
- Python 3
- virtualenv
- Jupyter notebook running in a virtual environment with Python 3

The Cloud Storage guide to [Setting up a Python development environment](https://cloud.google.com/python/setup) and the [Jupyter installation guide](https://jupyter.org/install) provide detailed instructions for meeting these requirements. The following steps provide a condensed set of instructions:

1. [Install and initialize the SDK](https://cloud.google.com/sdk/docs/).

2. [Install Python 3](https://cloud.google.com/python/setup#installing_python).

3. [Install virtualenv](Ihttps://cloud.google.com/python/setup#installing_and_using_virtualenv) and create a virtual environment that uses Python 3.

4. Activate that environment and run `pip3 install Jupyter` in a terminal shell to install Jupyter.

5. Run `jupyter notebook` on the command line in a terminal shell to launch Jupyter.

6. Open this notebook in the Jupyter Notebook Dashboard.


## Installation

Install the packages required for executing this notebook.

In [1]:
import os

# The Vertex AI Workbench Notebook product has specific requirements
IS_WORKBENCH_NOTEBOOK = os.getenv("DL_ANACONDA_HOME") and not os.getenv("VIRTUAL_ENV")
IS_USER_MANAGED_WORKBENCH_NOTEBOOK = os.path.exists(
    "/opt/deeplearning/metadata/env_version"
)

# Vertex AI Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_WORKBENCH_NOTEBOOK:
    USER_FLAG = "--user"

! pip3 install --upgrade google-cloud-aiplatform {USER_FLAG} -q
! pip3 install -U google-cloud-storage {USER_FLAG} -q
! pip3 install {USER_FLAG} kfp google-cloud-pipeline-components --upgrade -q

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
kfp 1.8.13 requires google-cloud-storage<2,>=1.20.0, but you have google-cloud-storage 2.5.0 which is incompatible.
google-cloud-pipeline-components 1.0.17 requires google-cloud-storage<2,>=1.20.0, but you have google-cloud-storage 2.5.0 which is incompatible.[0m[31m
[0m

In [3]:
! pip3 install {USER_FLAG} kfp google-cloud-pipeline-components --upgrade -q

In [5]:
! pip3 install -U google-cloud-storage {USER_FLAG} -q

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
kfp 1.8.13 requires google-cloud-storage<2,>=1.20.0, but you have google-cloud-storage 2.5.0 which is incompatible.
google-cloud-pipeline-components 1.0.17 requires google-cloud-storage<2,>=1.20.0, but you have google-cloud-storage 2.5.0 which is incompatible.[0m[31m
[0m

### Restart the kernel

Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.

In [1]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Check the versions of the packages you installed.  The KFP SDK version should be >=1.6.

In [2]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

KFP SDK version: 1.8.13


## Before you begin

### GPU runtime

This tutorial does not require a GPU runtime.

### Set up your Google Cloud project

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.

2. [Make sure that billing is enabled for your project.](https://cloud.google.com/billing/docs/how-to/modify-project)

3. [Enable the Vertex AI APIs, Compute Engine APIs, and Cloud Storage.](https://console.cloud.google.com/flows/enableapi?apiid=ml.googleapis.com,compute_component,storage-component.googleapis.com)

4. [The Google Cloud SDK](https://cloud.google.com/sdk) is already installed in Google Cloud Notebook.

5. Enter your project ID in the cell below. Then run the  cell to make sure the
Cloud SDK uses the right project for all the commands in this notebook.

**Note**: Jupyter runs lines prefixed with `!` as shell commands, and it interpolates Python variables prefixed with `$`.

In [3]:
PROJECT_ID = "vertex-and-spark-demo"  # @param {type:"string"}

In [4]:
if PROJECT_ID == "" or PROJECT_ID is None or PROJECT_ID == "[your-project-id]":
    # Get your GCP project id from gcloud
    shell_output = ! gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID:", PROJECT_ID)

In [None]:
#! gcloud config set project $PROJECT_ID

#### Region

You can also change the `REGION` variable, which is used for operations
throughout the rest of this notebook.  Below are regions supported for Vertex AI. We recommend that you choose the region closest to you.

- Americas: `us-central1`
- Europe: `europe-west4`
- Asia Pacific: `asia-east1`

You may not use a multi-regional bucket for training with Vertex AI. Not all regions provide support for all Vertex AI services.

Learn more about [Vertex AI regions](https://cloud.google.com/vertex-ai/docs/general/locations)

In [5]:
REGION = "us-central1"  # @param {type: "string"}

#### Timestamp

If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users on resources created, you create a timestamp for each instance session, and append the timestamp onto the name of resources you create in this tutorial.

In [6]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

### Authenticate your Google Cloud account

**If you are using Vertex AI Workbench Notebook**, your environment is already authenticated. Skip this step.

**If you are using Colab**, run the cell below and follow the instructions
when prompted to authenticate your account via oAuth.

**Otherwise**, follow these steps:

1. In the Cloud Console, go to the [**Create service account key**
   page](https://console.cloud.google.com/apis/credentials/serviceaccountkey).

2. Click **Create service account**.

3. In the **Service account name** field, enter a name, and
   click **Create**.

4. In the **Grant this service account access to project** section, click the **Role** drop-down list. Type "Vertex AI"
into the filter box, and select
   **Vertex AI Administrator**. Type "Storage Object Admin" into the filter box, and select **Storage Object Admin**.

5. Click *Create*. A JSON file that contains your key downloads to your
local environment.

6. Enter the path to your service account key as the
`GOOGLE_APPLICATION_CREDENTIALS` variable in the cell below and run the cell.

In [None]:
# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your GCP account. This provides access to your
# Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

import os
import sys

# If on Vertex AI Workbench, then don't execute this code
IS_COLAB = "google.colab" in sys.modules
if not os.path.exists("/opt/deeplearning/metadata/env_version") and not os.getenv(
    "DL_ANACONDA_HOME"
):
    if "google.colab" in sys.modules:
        from google.colab import auth as google_auth

        google_auth.authenticate_user()

    # If you are running this notebook locally, replace the string below with the
    # path to your service account key and run this cell to authenticate your GCP
    # account.
    elif not os.getenv("IS_TESTING"):
        %env GOOGLE_APPLICATION_CREDENTIALS ''

### Create a Cloud Storage bucket

**The following steps are required, regardless of your notebook environment.**

When you initialize the Vertex AI SDK for Python, you specify a Cloud Storage staging bucket. The staging bucket is where all the data associated with your dataset and model resources are retained across sessions.

Set the name of your Cloud Storage bucket below. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.

In [7]:
BUCKET_NAME = "vertex-and-spark-demo-bucket"  # @param {type:"string"}
BUCKET_URI = f"gs://{BUCKET_NAME}"

In [8]:
if BUCKET_NAME == "" or BUCKET_NAME is None or BUCKET_NAME == "[your-bucket-name]":
    BUCKET_NAME = PROJECT_ID + "aip-" + TIMESTAMP
    BUCKET_URI = "gs://" + BUCKET_NAME

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [None]:
! gsutil mb -l $REGION $BUCKET_URI

Finally, validate access to your Cloud Storage bucket by examining its contents:

In [9]:
! gsutil ls -al $BUCKET_URI

      4136  2022-08-22T00:53:26Z  gs://vertex-and-spark-demo-bucket/batch_examples.csv#1661129606249814  metageneration=1
                                 gs://vertex-and-spark-demo-bucket/pipeline_root/
TOTAL: 1 objects, 4136 bytes (4.04 KiB)


#### Service Account

**If you don't know your service account**, try to get your service account using `gcloud` command by executing the second cell below.

In [10]:
SERVICE_ACCOUNT = "s8s-lab-sa@vertex-and-spark-demo.iam.gserviceaccount.com"  # @param {type:"string"}

In [9]:
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Get your service account from gcloud
    if not IS_COLAB:
        shell_output = !gcloud auth list 2>/dev/null
        SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

    if IS_COLAB:
        shell_output = ! gcloud projects describe  $PROJECT_ID
        project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
        SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

    print("Service Account:", SERVICE_ACCOUNT)

#### Set service account access for Vertex AI Pipelines

Run the following commands to grant your service account access to read and write pipeline artifacts in the bucket that you created in the previous step -- you only need to run these once per service account.

In [10]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

### Set up variables

Next, set up some variables used throughout the tutorial.
### Import libraries and define constants

In [11]:
import json

import google.cloud.aiplatform as aip
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component

#### Vertex AI Pipelines constants

Setup up the following constants for Vertex AI Pipelines:

In [12]:
PIPELINE_ROOT = "{}/pipeline_root/control".format(BUCKET_URI)

## Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [13]:
aip.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

## Define pipeline components

The following example defines three simple pipeline components:

- A component that generates a list of dicts and outputs the stringified json.
(Note: This component requires an `import json` in the component function definition)
- A component that just prints its input string
- A component that does a 'coin flip' and outputs `heads` or `tails`.

In [14]:
@component
def args_generator_op() -> str:
    import json

    return json.dumps(
        [{"cats": "1", "dogs": "2"}, {"cats": "10", "dogs": "20"}],
        sort_keys=True,
    )


@component
def print_op(msg: str):
    print(msg)


@component
def flip_coin_op() -> str:
    """Flip a coin and output heads or tails randomly."""
    import random

    result = "heads" if random.randint(0, 1) == 0 else "tails"
    return result

## Define a pipeline that uses control structures

The following example defines a pipeline that uses these components and demonstrates the use of  `dsl.Condition` and `dsl.ParallelFor`.

The `json_string` input's default value is a nested JSON list converted to a string. As the pipeline definition shows, the loop and conditional expressions are able to process this string as a list, and access list items and sub-items.
The same holds for the list output by the `args_generator_op`.

In [15]:
@dsl.pipeline(
    name="control-{}".format(TIMESTAMP),
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
    json_string: str = json.dumps(
        [
            {
                "snakes": "anaconda",
                "lizards": "anole",
                "bunnies": [{"cottontail": "bugs"}, {"cottontail": "thumper"}],
            },
            {
                "snakes": "cobra",
                "lizards": "gecko",
                "bunnies": [{"cottontail": "roger"}],
            },
            {
                "snakes": "boa",
                "lizards": "iguana",
                "bunnies": [
                    {"cottontail": "fluffy"},
                    {"fuzzy_lop": "petunia", "cottontail": "peter"},
                ],
            },
        ],
        sort_keys=True,
    )
):

    flip1 = flip_coin_op()

    with dsl.Condition(
        flip1.output != "no-such-result", name="alwaystrue"
    ):  # always true

        args_generator = args_generator_op()
        with dsl.ParallelFor(args_generator.output) as item:
            print_op(json_string)

            with dsl.Condition(flip1.output == "heads", name="heads"):
                print_op(item.cats)

            with dsl.Condition(flip1.output == "tails", name="tails"):
                print_op(item.dogs)

    with dsl.ParallelFor(json_string) as item:
        with dsl.Condition(item.snakes == "boa", name="snakes"):
            print_op(item.snakes)
            print_op(item.lizards)
            print_op(item.bunnies)

    # it is possible to access sub-items
    with dsl.ParallelFor(json_string) as item:
        with dsl.ParallelFor(item.bunnies) as item_bunnies:
            print_op(item_bunnies.cottontail)

## Compile the pipeline

Next, compile the pipeline.

In [16]:
from kfp.v2 import compiler  # noqa: F811

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="control_pipeline.json".replace(" ", "_")
)



## Run the pipeline

Next, run the pipeline.

In [17]:
DISPLAY_NAME = "control_" + TIMESTAMP

job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="control_pipeline.json".replace(" ", "_"),
    pipeline_root=PIPELINE_ROOT,
)

job.run()

! rm control_pipeline.json

Creating PipelineJob
PipelineJob created. Resource name: projects/110702671568/locations/us-central1/pipelineJobs/control-20220822002156-20220822002428
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/110702671568/locations/us-central1/pipelineJobs/control-20220822002156-20220822002428')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/control-20220822002156-20220822002428?project=110702671568
PipelineJob projects/110702671568/locations/us-central1/pipelineJobs/control-20220822002156-20220822002428 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/110702671568/locations/us-central1/pipelineJobs/control-20220822002156-20220822002428 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/110702671568/locations/us-central1/pipelineJobs/control-20220822002156-20220822002428 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/11070267

Click on the generated link to see your run in the Cloud Console.

<!-- It should look something like this as it is running:

<a href="https://storage.googleapis.com/amy-jo/images/mp/automl_tabular_classif.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/mp/automl_tabular_classif.png" width="40%"/></a> -->

In the UI, many of the pipeline DAG nodes will expand or collapse when you click on them. Here is a partially-expanded view of the DAG (click image to see larger version).

<a href="https://storage.googleapis.com/amy-jo/images/mp/control_flow_dag.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/mp/control_flow_dag.png" width="95%"/></a>

You can see, for example, that the 'heads' condition passed, and thus the 'tails' condition— as we would expect— did not.

# SPARK Components

In [13]:
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"


KFP SDK version: 1.8.13
google_cloud_pipeline_components version: 1.0.17


In [14]:
import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

In [15]:
BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

In [16]:
BUCKET_NAME

'gs://vertex-and-spark-demo-bucket'

In [17]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

env: PATH=/opt/conda/bin:/opt/conda/condabin:/opt/conda/bin:/usr/local/nvidia/bin:/usr/local/cuda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/home/jupyter/.local/bin


'gs://vertex-and-spark-demo-bucket/pipeline_root/'

In [34]:
@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text


In [35]:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')


In [36]:
@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)


In [37]:
@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

In [38]:
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

In [39]:
compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)



In [40]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

In [41]:
job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

In [42]:
job.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/110702671568/locations/us-central1/pipelineJobs/hello-world-pipeline-20220822003755
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/110702671568/locations/us-central1/pipelineJobs/hello-world-pipeline-20220822003755')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/hello-world-pipeline-20220822003755?project=110702671568


# Spark jobs

In [45]:
%%writefile batch_examples.csv
Area,Perimeter,MajorAxisLength,MinorAxisLength,AspectRation,Eccentricity,ConvexArea,EquivDiameter,Extent,Solidity,roundness,Compactness,ShapeFactor1,ShapeFactor2,ShapeFactor3,ShapeFactor4
23288,558.113,207.567738,143.085693,1.450653336,0.7244336162,23545,172.1952453,0.8045881703,0.9890847314,0.9395021523,0.8295857874,0.008913077034,0.002604069884,0.6882125787,0.9983578734
23689,575.638,205.9678003,146.7475015,1.403552348,0.7016945718,24018,173.6714472,0.7652721693,0.9863019402,0.8983750474,0.8431970773,0.00869465998,0.002711119968,0.7109813112,0.9978994889
23727,559.503,189.7993849,159.3717704,1.190922235,0.5430731512,24021,173.8106863,0.8037601626,0.9877607094,0.952462433,0.9157600082,0.007999299741,0.003470231343,0.8386163926,0.9987269085
31158,641.105,212.0669751,187.1929601,1.132879009,0.4699241567,31474,199.1773023,0.7813134733,0.989959967,0.9526231013,0.9392188582,0.0068061806,0.003267009878,0.8821320637,0.9993488983
32514,649.012,221.4454899,187.1344232,1.183349841,0.5346736437,32843,203.4652564,0.7849831,0.9899826447,0.9700068737,0.9188051492,0.00681077351,0.002994124691,0.8442029022,0.9989873701
33078,659.456,235.5600775,178.9312328,1.316483846,0.6503915309,33333,205.2223615,0.7877214708,0.9923499235,0.9558229607,0.8712102818,0.007121351881,0.002530662194,0.7590073551,0.9992209221
33680,683.09,256.203255,167.9334938,1.525623324,0.7552213942,34019,207.081404,0.80680321,0.9900349805,0.9070392732,0.8082699962,0.007606985006,0.002002710402,0.6533003868,0.9966903078
33954,716.75,277.3684803,156.3563259,1.773951126,0.825970469,34420,207.9220419,0.7994819873,0.9864613597,0.8305492781,0.7496238998,0.008168948587,0.001591181142,0.5619359911,0.996846984
36322,719.437,272.0582306,170.8914975,1.591993952,0.7780978465,36717,215.0502424,0.7718560075,0.9892420405,0.8818487005,0.7904566678,0.007490177594,0.001803782407,0.6248217437,0.9947124371
36675,742.917,285.8908964,166.8819538,1.713132487,0.8119506999,37613,216.0927123,0.7788277766,0.9750618137,0.8350248381,0.7558572692,0.0077952528,0.001569528272,0.5713202115,0.9787472145
37454,772.679,297.6274753,162.1493177,1.835514817,0.8385619338,38113,218.3756257,0.8016695205,0.9827093118,0.7883332637,0.7337213257,0.007946480356,0.001420623993,0.5383469838,0.9881438654
37789,766.378,313.5680678,154.3409867,2.031657789,0.8704771226,38251,219.3500608,0.7805870567,0.9879218844,0.8085170916,0.6995293312,0.008297866252,0.001225659709,0.4893412853,0.9941740339
47883,873.536,327.9986493,186.5201272,1.758516115,0.822571799,48753,246.9140116,0.7584464543,0.9821549443,0.7885506623,0.7527897207,0.006850002074,0.00135695419,0.5666923636,0.9965376533
49777,861.277,300.7570338,211.6168613,1.42123379,0.7105823885,50590,251.7499649,0.8019106536,0.9839296304,0.843243269,0.8370542883,0.00604208839,0.001829706116,0.7006598815,0.9958014989
49882,891.505,357.1890036,179.8346914,1.986207449,0.8640114945,51042,252.0153467,0.7260210171,0.9772736178,0.7886896753,0.7055518063,0.007160679276,0.001094585314,0.4978033513,0.9887407248
53249,919.923,325.3866286,208.9174205,1.557489212,0.7666552108,54195,260.3818974,0.6966846347,0.9825445152,0.7907120655,0.8002231025,0.00611066177,0.001545654241,0.6403570138,0.9973491406
61129,964.969,369.3481688,210.9473449,1.750902193,0.8208567513,61796,278.9836198,0.7501135067,0.9892064211,0.8249553283,0.7553404711,0.006042110436,0.001213219664,0.5705392272,0.9989583843
61918,960.372,353.1381442,224.0962377,1.575832543,0.7728529173,62627,280.7782864,0.7539207091,0.9886790043,0.8436218213,0.7950947556,0.005703319619,0.00140599258,0.6321756704,0.9962029945
141953,1402.05,524.2311633,346.3974998,1.513380332,0.7505863011,143704,425.1354762,0.7147107987,0.9878152313,0.9074598849,0.8109694843,0.003692991084,0.0009853172185,0.6576715044,0.9953071199
145285,1440.991,524.9567463,353.0769977,1.486805285,0.7400216694,146709,430.0960442,0.7860466375,0.9902937107,0.8792413513,0.8192980608,0.003613289371,0.001004269363,0.6712493125,0.9980170255
146153,1476.383,526.1933264,356.528288,1.475881001,0.7354662103,149267,431.3789276,0.7319360978,0.9791380546,0.8425962592,0.8198107159,0.003600290972,0.001003163512,0.6720896099,0.991924286

Writing batch_examples.csv


In [46]:
!gsutil cp batch_examples.csv $BUCKET_NAME

Copying file://batch_examples.csv [Content-Type=text/csv]...
/ [1 files][  4.0 KiB/  4.0 KiB]                                                
Operation completed over 1 objects/4.0 KiB.                                      


In [47]:
BUCKET_NAME

'gs://vertex-and-spark-demo-bucket'

In [21]:
@component(base_image="gcr.io/google.com/cloudsdktool/cloud-sdk:latest", 
           packages_to_install=["google-cloud-dataproc==2.5.0", "google-cloud-storage==1.41.1", "scikit-learn==0.24.2"], 
           output_component_file="2_sparkling_vertex_train_gb.yaml")

def train_gb(project_id:str, 
             region:str, 
             bucket_name:str,
             cluster_spec:str,
            #train_dataset:Input[Dataset],
            #test_dataset:Input[Dataset],
            #val_dataset:Input[Dataset],
            #metrics: Output[Metrics],
            graph_metrics: Output[ClassificationMetrics],
            model: Output[Model])-> NamedTuple("Outputs",
                                    [("metrics_dict", str),
                                     ("thold_metric", float),
                                     ("model_uri", str),
                                     ("train_dataset_uri", str),
                                     ("val_dataset_uri", str),],):
    
    train_gb_pyspark_job_spec = {
     'reference': {
         'project_id': project_id,
         'job_id': train_gb_job_id
     },
     'placement': {
         'cluster_name': cluster_name
     },
     'pyspark_job': {
         'main_python_file_uri': f'gs://{bucket_name}/train_gb.py',
         'args': ['--project-id', project_id,
                  '--bucket', f'gs://{bucket_name}',
                  '--train-uri', train_dataset.uri,
                  '--test-uri', test_dataset.uri,
                  '--metrics-file', metrics_file_name,
                  '--model', model_name]
                  }
    }


In [23]:
@pipeline(name="custom-spark-beans-custom",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://vertex-and-spark-demo.beans_demo.large_dataset",
    bucket: str = BUCKET_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = REGION,
    bq_dest: str = "",
    container_uri: str = "",
    batch_destination: str = ""
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        display_name="tabular-beans-dataset",
        bq_source=bq_source,
        project=project,
        location=gcp_region
    )
    # Create the train_gb operation to train and evaluate model
    train_gb_op = (train_gb(
      project_id=project_id,
      region=region,
      bucket_name=bucket_name,
      cluster_spec=cluster_spec,
      dataset=dataset_create_op.outputs["dataset"],
      train_dataset=dataset_create_op.outputs['train_dataset'],
      test_dataset=dataset_create_op.outputs['test_dataset'],
      val_dataset=dataset_create_op.outputs['val_dataset'])
      .after(dataset_create_op))
    # Set Condition to validate the model compared au_prc threshold
    with Condition(train_gb_op.outputs['thold_metric'] > thold,
                 name=AUPR_HYPERTUNE_CONDITION):
        # Create the hypertune_gb operation to hypertune and evaluate model
        hypertune_gb_op = (hypertune_gb(
           project_id=project_id,
           region=region,
           bucket_name=bucket_name,
           cluster_spec=cluster_spec,
           train_dataset_uri=train_gb_op.outputs['train_dataset_uri'],
           val_dataset_uri=train_gb_op.outputs['val_dataset_uri'])
           .after(train_gb_op))

        # Create the deploy_op to deploy the model on Dataproc cluster
        deploy_gb_op = deploy_gb(
            project_id=project_id,
            region=region,
            bucket_name=bucket_name,
            cluster_spec=cluster_spec,
            stream_folder=stream_folder,
            model=hypertune_gb_op.outputs['tune_model'])
    

In [1]:
@component(base_image="gcr.io/google.com/cloudsdktool/cloud-sdk:latest", 
           packages_to_install=["google-cloud-dataproc==2.5.0", "google-cloud-storage==1.41.1", "scikit-learn==0.24.2"], 
           output_component_file="3_sparkling_vertex_train_gb.yaml")

def prepare_data(
    bq_table: str,
    output_data_path: OutputPath("Dataset")
):
    
    # As a precaution, run a query in Spark SQL to ensure no NULL values exist.
    sql_query = """    
        WITH
          base_table AS (
          SELECT
            *
          FROM
            `vertex-and-spark-demo.beans_demo.large_dataset`)
          -- Main Query
        SELECT
          *,
          RAND() AS pseudo_random,
        IF
          (RAND() < 0.8,
            'train',
            'test') AS split_set,
        FROM
          large_dataset
    """
    clean_data = spark.sql(sql_query)

    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()



    # Create an input DataFrame for Spark ML using the above function.
    training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                                 "features"])
    training_data.cache()



NameError: name 'component' is not defined

In [19]:
def run_get_prediction_transform(args):
    bucket = args.bucket
    stream_uri = f'{args.bucket}/{args.stream_folder}'
    tune_model_uri = args.tune_model_uri

    # create a spark session
    logging.info(f"Instantiating the {APP_NAME} Spark session.")
    spark = (SparkSession.builder \
             .master("local") \
             .appName(APP_NAME) \
             .getOrCreate())

    try:

        # start prediction process
        logging.info(f"Ingest streaming data under {stream_uri}.")
        stream_raw_df = (spark.readStream \
                         .option("header", True) \
                         .option("delimiter", ',') \
                         .option("maxFilesPerTrigger", 1) \
                         .option("rowsPerSecond", 10) \
                         .schema(DATA_SCHEMA) \
                         .csv(stream_uri))
        logging.info(f"Load model into memory.")
        tune_model = CrossValidatorModel.load(tune_model_uri)
        logging.info(f" Start streaming prediction process.")
        gb_prediction_transform = get_prediction_transform(
            stream_raw_df,
            tune_model)
        logging.info(f" Run prediction query.")
        predictions_query = (gb_prediction_transform \
                             .writeStream \
                             .format("console") \
                             .outputMode("append") \
                             .queryName("predictions") \
                             .start())
        predictions_query.awaitTermination()
    except RuntimeError as error:
        logging.info(error)

In [20]:
def _submit_pyspark_job(project_id, region, job_spec):
    # create the job client.
    job_client = dataproc.JobControllerClient(
        client_options={
            'api_endpoint':
            f'{region}-dataproc.googleapis.com:443'
        })

    # create the job operation.
    job_op = job_client.submit_job_as_operation(
        request={"project_id": project_id,
                 "region": region,
                 "job": job_spec}
    )
    result = job_op.result()
    return result

In [18]:
@kfp.dsl.pipeline(name=f"pyspark-anomaly-detection-pipeline-{ID}", pipeline_root=PIPELINE_ROOT)

def pipeline(project_id:str = PROJECT_ID,
            region:str = REGION,
            bucket_name:str = BUCKET,
            cluster_spec: BASE_CLUSTER_SPEC,
            raw_data: str = RAW_DATA_PATH,
            thold: float = AUPR_THRESHOLD,
            stream_folder: str = STREAM_FOLDER
            ):
    """
    Combine prepare_data, train_model, hypertune_gb and deploy_gb components in order to train and serve the predictive maintenance model.

    Args:
        project_id: The name of pipeline GPC project
        region: The region where the pipeline will run
        bucket_name: The bucket name where training data are stored
        cluster_spec: The basic cluster spec to run pyspark jobs
        raw_data: The name of training data source
        thold: The minimum performance threshold to deploy the model
        stream_folder: The name of folder to store the predictions

    """

    # Create the prepare_data operation to get training data
    prepare_data_op = dataset_create_op(
      project_id=project_id,
      region=region,
      bucket_name=bucket_name,
      cluster_spec=cluster_spec,
      raw_file=raw_data)

    # Create the train_gb operation to train and evaluate model
    train_gb_op = (train_gb(
      project_id=project_id,
      region=region,
      bucket_name=bucket_name,
      cluster_spec=cluster_spec,
      train_dataset=prepare_data_op.outputs['train_dataset'],
      test_dataset=prepare_data_op.outputs['test_dataset'],
      val_dataset=prepare_data_op.outputs['val_dataset'])
      .after(prepare_data_op))


    # Set Condition to validate the model compared au_prc threshold
    with Condition(train_gb_op.outputs['thold_metric'] > thold,
                 name=AUPR_HYPERTUNE_CONDITION):
        # Create the hypertune_gb operation to hypertune and evaluate model
        hypertune_gb_op = (hypertune_gb(
           project_id=project_id,
           region=region,
           bucket_name=bucket_name,
           cluster_spec=cluster_spec,
           train_dataset_uri=train_gb_op.outputs['train_dataset_uri'],
           val_dataset_uri=train_gb_op.outputs['val_dataset_uri'])
           .after(train_gb_op))

        # Create the deploy_op to deploy the model on Dataproc cluster
        deploy_gb_op = deploy_gb(
            project_id=project_id,
            region=region,
            bucket_name=bucket_name,
            cluster_spec=cluster_spec,
            stream_folder=stream_folder,
            model=hypertune_gb_op.outputs['tune_model'])

SyntaxError: non-default argument follows default argument (2828720695.py, line 3)

In [49]:
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="custom_train_pipeline.json"
)



In [50]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

In [52]:
pipeline_job = aiplatform.PipelineJob(
    display_name="custom-train-pipeline",
    template_path="custom_train_pipeline.json",
    job_id="custom-train-pipeline-{0}".format(TIMESTAMP),
    parameter_values={
        "project": PROJECT_ID,
        "bucket": BUCKET_NAME,
        "bq_dest": "bq://{0}".format(PROJECT_ID),
        "container_uri": "gcr.io/{0}/sparkcustom:v1".format(PROJECT_ID),
        "batch_destination": "{0}/batchpredresults".format(BUCKET_NAME)
    },
    enable_caching=True,
)

In [53]:
pipeline_job.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/110702671568/locations/us-central1/pipelineJobs/custom-train-pipeline-20220822010011
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/110702671568/locations/us-central1/pipelineJobs/custom-train-pipeline-20220822010011')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/custom-train-pipeline-20220822010011?project=110702671568


# Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial -- *Note:* this is auto-generated and not all resources may be applicable for this tutorial:

- Dataset
- Pipeline
- Model
- Endpoint
- Batch Job
- Custom Job
- Hyperparameter Tuning Job
- Cloud Storage Bucket

In [None]:
delete_dataset = True
delete_pipeline = True
delete_model = True
delete_endpoint = True
delete_batchjob = True
delete_customjob = True
delete_hptjob = True
delete_bucket = True

try:
    if delete_model and "DISPLAY_NAME" in globals():
        models = aip.Model.list(
            filter=f"display_name={DISPLAY_NAME}", order_by="create_time"
        )
        model = models[0]
        aip.Model.delete(model)
        print("Deleted model:", model)
except Exception as e:
    print(e)

try:
    if delete_endpoint and "DISPLAY_NAME" in globals():
        endpoints = aip.Endpoint.list(
            filter=f"display_name={DISPLAY_NAME}_endpoint", order_by="create_time"
        )
        endpoint = endpoints[0]
        endpoint.undeploy_all()
        aip.Endpoint.delete(endpoint.resource_name)
        print("Deleted endpoint:", endpoint)
except Exception as e:
    print(e)

if delete_dataset and "DISPLAY_NAME" in globals():
    if "none" == "tabular":
        try:
            datasets = aip.TabularDataset.list(
                filter=f"display_name={DISPLAY_NAME}", order_by="create_time"
            )
            dataset = datasets[0]
            aip.TabularDataset.delete(dataset.resource_name)
            print("Deleted dataset:", dataset)
        except Exception as e:
            print(e)

    if "none" == "image":
        try:
            datasets = aip.ImageDataset.list(
                filter=f"display_name={DISPLAY_NAME}", order_by="create_time"
            )
            dataset = datasets[0]
            aip.ImageDataset.delete(dataset.resource_name)
            print("Deleted dataset:", dataset)
        except Exception as e:
            print(e)

    if "none" == "text":
        try:
            datasets = aip.TextDataset.list(
                filter=f"display_name={DISPLAY_NAME}", order_by="create_time"
            )
            dataset = datasets[0]
            aip.TextDataset.delete(dataset.resource_name)
            print("Deleted dataset:", dataset)
        except Exception as e:
            print(e)

    if "none" == "video":
        try:
            datasets = aip.VideoDataset.list(
                filter=f"display_name={DISPLAY_NAME}", order_by="create_time"
            )
            dataset = datasets[0]
            aip.VideoDataset.delete(dataset.resource_name)
            print("Deleted dataset:", dataset)
        except Exception as e:
            print(e)

try:
    if delete_pipeline and "DISPLAY_NAME" in globals():
        pipelines = aip.PipelineJob.list(
            filter=f"display_name={DISPLAY_NAME}", order_by="create_time"
        )
        pipeline = pipelines[0]
        aip.PipelineJob.delete(pipeline.resource_name)
        print("Deleted pipeline:", pipeline)
except Exception as e:
    print(e)

if delete_bucket or os.getenv("IS_TESTING"):
    ! gsutil rm -r $BUCKET_URI