# Continuous Training with Kubeflow Pipelines and Vertex AI

**Learning Objectives:**
1. Learn how to use KF pre-built components
1. Learn how to build a KF pipeline with these components
1. Learn how to compile, upload, and run a KF pipeline


In this lab, you will build, deploy, and run a KFP pipeline that orchestrates **Vertex AI** services to train, tune, and deploy a beer recommendation model built with the **implicit** library, using Google pre-built components.

## Setup

In [1]:
import os
from datetime import datetime

from google.cloud import aiplatform
aiplatform.init(location="us-east1")

In [2]:
REGION = "us-east1"
PROJECT_ID = !(gcloud config get-value project)
PROJECT_ID = PROJECT_ID[0]

In [3]:
# Add the directory containing the KFP CLI to `PATH`
PATH = %env PATH
%env PATH=/home/jupyter/.local/bin:{PATH}

env: PATH=/home/jupyter/.local/bin:/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/snap/bin


## Understanding the pipeline design

The workflow implemented by the pipeline is defined using a Python-based domain-specific language (DSL). The pipeline's DSL is in the `pipeline_vertex/pipeline_prebuilt.py` file that we generate below.

The pipeline's DSL is designed to avoid hardcoding any environment-specific settings, such as file paths or connection strings. These settings are passed to the pipeline code through a set of environment variables.
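As a sketch of that convention, the hypothetical helper below gathers the same variables the DSL reads and fails fast when a required one is unset, instead of letting `os.getenv` pass `None` into a pipeline default. The variable names and the defaults for `MAX_TRIAL_COUNT`, `PARALLEL_TRIAL_COUNT` and `THRESHOLD` come from the pipeline file; the helper itself and its error behavior are assumptions, not part of the lab.

```python
import os
from typing import Mapping, Optional

# Variables the pipeline DSL reads without a default value.
REQUIRED_VARS = (
    "PIPELINE_ROOT",
    "PROJECT_ID",
    "REGION",
    "TRAINING_CONTAINER_IMAGE_URI",
    "TRAINING_FILE_PATH",
    "VALIDATION_FILE_PATH",
)


def load_pipeline_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Hypothetical helper: read pipeline settings, failing fast on missing ones."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise ValueError(f"Missing environment variables: {', '.join(missing)}")
    settings = {name: env[name] for name in REQUIRED_VARS}
    # Optional tuning settings, parsed with the same defaults as the DSL.
    settings["MAX_TRIAL_COUNT"] = int(env.get("MAX_TRIAL_COUNT", "5"))
    settings["PARALLEL_TRIAL_COUNT"] = int(env.get("PARALLEL_TRIAL_COUNT", "5"))
    settings["THRESHOLD"] = float(env.get("THRESHOLD", "0.6"))
    return settings
```

Calling it with no argument reads `os.environ`, so it can run in the same notebook right after the `%env` cells below.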

### Build the trainer image

In [4]:
TRAINING_APP_FOLDER = "training_app"
os.makedirs(TRAINING_APP_FOLDER, exist_ok=True)

In [5]:
%%writefile {TRAINING_APP_FOLDER}/Dockerfile

FROM gcr.io/deeplearning-platform-release/base-cpu
RUN pip install -U fire cloudml-hypertune implicit
WORKDIR /app
COPY train.py .

ENTRYPOINT ["python", "train.py"]

Overwriting training_app/Dockerfile


Let's now build and push this trainer container to the container registry:

In [6]:
IMAGE_NAME = "trainer_image_beer_vertex"
TAG = "latest"
TRAINING_CONTAINER_IMAGE_URI = f"gcr.io/{PROJECT_ID}/{IMAGE_NAME}:{TAG}"
TRAINING_CONTAINER_IMAGE_URI

'gcr.io/qwiklabs-asl-04-5e165f533cac/trainer_image_beer_vertex:latest'

In [7]:
!gcloud builds submit --timeout 15m --tag $TRAINING_CONTAINER_IMAGE_URI $TRAINING_APP_FOLDER

Creating temporary tarball archive of 4 file(s) totalling 16.3 KiB before compression.
Uploading tarball of [training_app] to [gs://qwiklabs-asl-04-5e165f533cac_cloudbuild/source/1654731775.693158-5acbccaf0adf4994b66397225c6b0367.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/qwiklabs-asl-04-5e165f533cac/locations/global/builds/6955d136-5436-429e-8408-97b86b05f64c].
Logs are available at [https://console.cloud.google.com/cloud-build/builds/6955d136-5436-429e-8408-97b86b05f64c?project=547029906128].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "6955d136-5436-429e-8408-97b86b05f64c"

FETCHSOURCE
Fetching storage object: gs://qwiklabs-asl-04-5e165f533cac_cloudbuild/source/1654731775.693158-5acbccaf0adf4994b66397225c6b0367.tgz#1654731775998777
Copying gs://qwiklabs-asl-04-5e165f533cac_cloudbuild/source/1654731775.693158-5acbccaf0adf4994b66397225c6b0367.tgz#1654731775998777...
/ [1 files][  3.2 KiB/  3.2 KiB]                   

## Building and deploying the pipeline

Let us write the pipeline to disk:

In [9]:
%%writefile ./jay-pipeline/pipeline.py
# Copyright 2021 Google LLC

# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at

# https://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS"
# BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
import os

from kfp import dsl
from training_lightweight_component import train_and_deploy
from tuning_lightweight_component import tune_hyperparameters

PIPELINE_ROOT = os.getenv("PIPELINE_ROOT")
PROJECT_ID = os.getenv("PROJECT_ID")
REGION = os.getenv("REGION")

TRAINING_CONTAINER_IMAGE_URI = os.getenv("TRAINING_CONTAINER_IMAGE_URI")

TRAINING_FILE_PATH = os.getenv("TRAINING_FILE_PATH")
VALIDATION_FILE_PATH = os.getenv("VALIDATION_FILE_PATH")

MAX_TRIAL_COUNT = int(os.getenv("MAX_TRIAL_COUNT", "5"))
PARALLEL_TRIAL_COUNT = int(os.getenv("PARALLEL_TRIAL_COUNT", "5"))
THRESHOLD = float(os.getenv("THRESHOLD", "0.6"))

@dsl.pipeline(
    name="beer-kfp-pipeline",
    description="Pipeline that trains and deploys the beer recommendation model",
    pipeline_root=PIPELINE_ROOT,
)
def beer_recom_train(
    training_container_uri: str = TRAINING_CONTAINER_IMAGE_URI,
    training_file_path: str = TRAINING_FILE_PATH,
    validation_file_path: str = VALIDATION_FILE_PATH,
    map_at_10_deployment_threshold: float = THRESHOLD,
    max_trial_count: int = MAX_TRIAL_COUNT,
    parallel_trial_count: int = PARALLEL_TRIAL_COUNT,
    pipeline_root: str = PIPELINE_ROOT,
):
    staging_bucket = f"{pipeline_root}/staging"

    tuning_op = tune_hyperparameters(
        project=PROJECT_ID,
        location=REGION,
        container_uri=training_container_uri,
        training_file_path=training_file_path,
        validation_file_path=validation_file_path,
        staging_bucket=staging_bucket,
        max_trial_count=max_trial_count,
        parallel_trial_count=parallel_trial_count,
    )

    map_at_10 = tuning_op.outputs["best_map_at_10"]

    with dsl.Condition(
        map_at_10 >= map_at_10_deployment_threshold, name="deploy_decision"
    ):
        train_and_deploy_op = (  # pylint: disable=unused-variable
            train_and_deploy(
                project=PROJECT_ID,
                location=REGION,
                container_uri=training_container_uri,
                training_file_path=training_file_path,
                validation_file_path=validation_file_path,
                staging_bucket=staging_bucket,
                factors=tuning_op.outputs["best_factors"],
                regularization=tuning_op.outputs["best_regularization"],
                iterations=tuning_op.outputs["best_iterations"],
            )
        )

Writing ./jay-pipeline/pipeline.py
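The `dsl.Condition` block gates deployment on the tuned model's `best_map_at_10`. The trainer's `train.py` is not shown here, so the exact metric it reports is unknown. The plain-Python sketch below assumes one common definition of MAP@10 (average precision over the top 10 recommendations, normalized by `min(10, number of relevant items)`, then averaged over users) and mirrors the gate with a hypothetical `should_deploy` function that uses the pipeline's default threshold of 0.6.

```python
from typing import Dict, Sequence, Set


def average_precision_at_k(recommended: Sequence[int], relevant: Set[int], k: int = 10) -> float:
    """AP@k: sum of precision@i at each hit rank i <= k, normalized by min(k, |relevant|)."""
    if not relevant:
        return 0.0
    hits = 0
    score = 0.0
    for i, item in enumerate(recommended[:k], start=1):
        if item in relevant:
            hits += 1
            score += hits / i  # precision at rank i
    # Normalize by the best achievable number of hits in the top k.
    return score / min(k, len(relevant))


def map_at_k(recs: Dict[int, Sequence[int]], truth: Dict[int, Set[int]], k: int = 10) -> float:
    """Mean AP@k over users that have at least one held-out item."""
    users = [u for u in truth if truth[u]]
    if not users:
        return 0.0
    return sum(average_precision_at_k(recs.get(u, []), truth[u], k) for u in users) / len(users)


def should_deploy(best_map_at_10: float, threshold: float = 0.6) -> bool:
    """Plain-Python mirror of the dsl.Condition gate in the pipeline."""
    return best_map_at_10 >= threshold
```

For example, a user whose held-out items `{10, 30}` appear at ranks 1 and 3 gets AP@10 = (1/1 + 2/3) / 2 = 5/6.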


### Compile the pipeline

Let's start by defining the environment variables that will be passed to the pipeline compiler:

In [10]:
ARTIFACT_STORE = f"gs://{PROJECT_ID}-beer-kfp-artifact-store"
PIPELINE_ROOT = f"{ARTIFACT_STORE}/pipeline"
DATA_ROOT = f"{ARTIFACT_STORE}/data"

TRAINING_FILE_PATH = f"{DATA_ROOT}/train.parquet"
VALIDATION_FILE_PATH = f"{DATA_ROOT}/valid.parquet"

%env PIPELINE_ROOT={PIPELINE_ROOT}
%env PROJECT_ID={PROJECT_ID}
%env REGION={REGION}
%env TRAINING_CONTAINER_IMAGE_URI={TRAINING_CONTAINER_IMAGE_URI}
%env TRAINING_FILE_PATH={TRAINING_FILE_PATH}
%env VALIDATION_FILE_PATH={VALIDATION_FILE_PATH}

env: PIPELINE_ROOT=gs://qwiklabs-asl-04-5e165f533cac-beer-kfp-artifact-store/pipeline
env: PROJECT_ID=qwiklabs-asl-04-5e165f533cac
env: REGION=us-east1
env: TRAINING_CONTAINER_IMAGE_URI=gcr.io/qwiklabs-asl-04-5e165f533cac/trainer_image_beer_vertex:latest
env: TRAINING_FILE_PATH=gs://qwiklabs-asl-04-5e165f533cac-beer-kfp-artifact-store/data/train.parquet
env: VALIDATION_FILE_PATH=gs://qwiklabs-asl-04-5e165f533cac-beer-kfp-artifact-store/data/valid.parquet


Make sure that the `ARTIFACT_STORE` bucket exists, and create it if it does not:

In [11]:
!gsutil ls | grep ^{ARTIFACT_STORE}/$ || gsutil mb -l {REGION} {ARTIFACT_STORE}

gs://qwiklabs-asl-04-5e165f533cac-beer-kfp-artifact-store/
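The `grep ^{ARTIFACT_STORE}/$` check only counts an exact line match, so a bucket whose name merely starts with the same prefix does not suppress `gsutil mb`. A minimal Python sketch of the same test on captured `gsutil ls` output (the function name is hypothetical; unlike the shell pattern, it escapes regex metacharacters in the URI):

```python
import re


def bucket_is_listed(gsutil_ls_output: str, bucket_uri: str) -> bool:
    """Return True if `bucket_uri/` appears as a whole line in `gsutil ls` output."""
    # ^ and $ with MULTILINE anchor to each line, like grep's line-based matching.
    pattern = re.compile(rf"^{re.escape(bucket_uri)}/$", re.MULTILINE)
    return pattern.search(gsutil_ls_output) is not None
```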


#### Use the CLI compiler to compile the pipeline

We compile the pipeline from the Python file we generated into a JSON description using the following command:

In [12]:
PIPELINE_JSON = "beer_kfp_pipeline.json"

In [25]:
ARTIFACT_STORE = f"gs://{PROJECT_ID}-beer-kfp-artifact-store"
PIPELINE_ROOT = f"{ARTIFACT_STORE}/pipeline"
DATA_ROOT = f"{ARTIFACT_STORE}/data"
REGION = "us-east1"
TRAINING_FILE_PATH = f"{DATA_ROOT}/train.parquet"
VALIDATION_FILE_PATH = f"{DATA_ROOT}/valid.parquet"

%env PIPELINE_ROOT={PIPELINE_ROOT}
%env PROJECT_ID={PROJECT_ID}
%env REGION={REGION}
%env TRAINING_CONTAINER_IMAGE_URI={TRAINING_CONTAINER_IMAGE_URI}
%env TRAINING_FILE_PATH={TRAINING_FILE_PATH}
%env VALIDATION_FILE_PATH={VALIDATION_FILE_PATH}

env: PIPELINE_ROOT=gs://qwiklabs-asl-04-5e165f533cac-beer-kfp-artifact-store/pipeline
env: PROJECT_ID=qwiklabs-asl-04-5e165f533cac
env: REGION=us-east1
env: TRAINING_CONTAINER_IMAGE_URI=gcr.io/qwiklabs-asl-04-5e165f533cac/trainer_image_beer_vertex:latest
env: TRAINING_FILE_PATH=gs://qwiklabs-asl-04-5e165f533cac-beer-kfp-artifact-store/data/train.parquet
env: VALIDATION_FILE_PATH=gs://qwiklabs-asl-04-5e165f533cac-beer-kfp-artifact-store/data/valid.parquet


In [27]:
!dsl-compile-v2 --py jay-pipeline/pipeline_prebuilt.py --output $PIPELINE_JSON

In [None]:
!cat {PIPELINE_JSON}
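Before submitting, it can help to confirm which steps the compiled file actually contains, for instance that it was produced from the pipeline module you intended. The sketch below lists the root-level task names. It assumes the layout written by the KFP v2 compiler, where the spec sits under `pipelineSpec` and the tasks under `root.dag.tasks`; other compiler versions may nest things differently.

```python
import json
from typing import List


def list_root_tasks(pipeline_json: str) -> List[str]:
    """Return the sorted task names in the root DAG of a compiled pipeline JSON string."""
    spec = json.loads(pipeline_json)
    # PipelineJob-style files wrap the spec under "pipelineSpec"; a bare spec does not.
    spec = spec.get("pipelineSpec", spec)
    tasks = spec.get("root", {}).get("dag", {}).get("tasks", {})
    return sorted(tasks)
```

In the notebook, `with open(PIPELINE_JSON) as f: print(list_root_tasks(f.read()))` prints the task names.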

In [29]:
from google.cloud import aiplatform
aiplatform.init(project=PROJECT_ID, location=REGION)
pipeline = aiplatform.PipelineJob(
    display_name='beer_kfp_pipeline',
    template_path=PIPELINE_JSON, 
    enable_caching=False,
)

In [30]:
pipeline.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/547029906128/locations/us-east1/pipelineJobs/beer-kfp-pipeline-20220609003235
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/547029906128/locations/us-east1/pipelineJobs/beer-kfp-pipeline-20220609003235')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-east1/pipelines/runs/beer-kfp-pipeline-20220609003235?project=547029906128
PipelineJob projects/547029906128/locations/us-east1/pipelineJobs/beer-kfp-pipeline-20220609003235 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/547029906128/locations/us-east1/pipelineJobs/beer-kfp-pipeline-20220609003235 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/547029906128/locations/us-east1/pipelineJobs/beer-kfp-pipeline-20220609003235 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/547029906128/locations/us-east1/pipelineJobs/beer-kfp-pi

RuntimeError: Job failed with:
code: 9
message: "The DAG failed because some tasks failed. The failed tasks are: [hyperparameter-tuning-job].; Job (project_id = qwiklabs-asl-04-5e165f533cac, job_id = 6791173151377063936) is failed due to the above error.; Failed to handle the job: {project_number = 547029906128, job_id = 6791173151377063936}"


In [94]:
MOVIE_COUNTS = 2

movies = [[1,10],[2,150]]

tickets = [[11,1,7],[12,2,100],[13,2,20],[14,1,5]]

In [134]:
counters = [0 for _ in range(MOVIE_COUNTS + 1)]
for movie_id, capacity in movies:
    counters[movie_id] = capacity

In [135]:
counters

[0, 10, 150]

In [136]:
answer = 0
for _ticket_id, movie_id, requested in tickets:
    if counters[movie_id] == 0:
        answer += min(counters[movie_id], requested)
        counters[movie_id] -= requested
    elif counters[movie_id] > 0:
        answer += counters[movie_id]
        counters[movie_id] = 0

    print(answer, counters)

10 [0, 0, 150]
160 [0, 0, 0]
160 [0, 0, -20]
160 [0, -5, -20]


In [69]:
answer

132
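The loop above adds nothing once a counter reaches 0 but still subtracts the request, which is why the printed counters go negative. While seats remain, it also adds the entire remaining capacity rather than the requested amount. Assuming the intent is to grant each request up to the seats still available, a minimal sketch (the function name is hypothetical):

```python
from typing import List


def sell_tickets(movies: List[List[int]], tickets: List[List[int]]) -> int:
    """Total seats sold when each request gets min(remaining, requested) seats.

    movies:  [movie_id, capacity] pairs
    tickets: [ticket_id, movie_id, requested] triples, processed in order
    """
    remaining = {movie_id: capacity for movie_id, capacity in movies}
    sold = 0
    for _ticket_id, movie_id, requested in tickets:
        # Never grant more than is left, so remaining seats cannot go negative.
        granted = min(remaining.get(movie_id, 0), requested)
        remaining[movie_id] = remaining.get(movie_id, 0) - granted
        sold += granted
    return sold
```

With the `movies` and `tickets` lists above, the requests are granted 7, 100, 20 and 3 seats, so the function returns 130.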