# Heart Failure Simple DNN Pipeline

## Setup

In [26]:
from google.cloud import aiplatform
from datetime import datetime

In [2]:
# Region passed to the training job, to `gsutil mb`, and to `aiplatform.init`.
REGION = "us-central1"
# `!(...)` captures the command's stdout as a list of lines; the first entry is
# the project ID reported by gcloud.
PROJECT = !(gcloud config get-value project)
PROJECT = PROJECT[0]
# This notebook uses a bucket that has the same name as the project.
BUCKET = PROJECT
# Export the values as environment variables for shell commands and for the
# pipeline module, which reads PROJECT and REGION with `os.getenv`.
%env PROJECT = {PROJECT}
%env BUCKET = {BUCKET}
%env REGION = {REGION}

env: PROJECT=qwiklabs-asl-02-99f66d8df225
env: BUCKET=qwiklabs-asl-02-99f66d8df225
env: REGION=us-central1


In [3]:
# Set `PATH` to include the directory containing KFP CLI
# (presumably where `dsl-compile-v2`, used further down, is installed).
# NOTE(review): each re-run of this cell prepends the directory again, so
# `PATH` keeps growing; harmless, but the cell is not idempotent.
PATH = %env PATH
%env PATH=/home/jupyter/.local/bin:{PATH}

env: PATH=/home/jupyter/.local/bin:/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games


## Build the pipeline

Write the pipeline to disk:

In [72]:
%%writefile ./pipelines/kfp_heart_failure_simple_dnn_pipeline.py
# Copyright 2021 Google LLC

# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at

# https://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS"
# BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

"""Kubeflow Simple DNN Heart Failure Pipeline."""

import os

from google_cloud_pipeline_components.experimental.custom_job import (
    CustomTrainingJobOp,
)

from google_cloud_pipeline_components.aiplatform import (
    AutoMLTabularTrainingJobRunOp,
    EndpointCreateOp,
    ModelDeployOp,
    TabularDatasetCreateOp,
    ModelUploadOp,
)
from kfp.v2 import dsl

# Configuration is read from environment variables once, when this module is
# imported by the pipeline compiler. Variables without a default are None when
# unset.

# Project, region, and artifact root used by the pipeline.
PROJECT = os.environ.get("PROJECT")
REGION = os.environ.get("REGION")
PIPELINE_ROOT = os.environ.get("PIPELINE_ROOT")

# Names: the model display name falls back to the pipeline name.
PIPELINE_NAME = os.environ.get("PIPELINE_NAME", "kfp-heartfailure-simple-dnn")
DISPLAY_NAME = os.environ.get("MODEL_DISPLAY_NAME", PIPELINE_NAME)

# Training inputs and outputs.
DATASET_URI = os.environ.get("DATASET_URI")
OUTPUT_URI = os.environ.get("OUTPUT_URI")
BASE_OUTPUT_DIR = os.environ.get("BASE_OUTPUT_DIR")
TRAINING_CONTAINER_IMAGE_URI = os.environ.get("TRAINING_CONTAINER_IMAGE_URI")

# Serving configuration.
SERVING_CONTAINER_IMAGE_URI = os.environ.get("SERVING_CONTAINER_IMAGE_URI")
SERVING_MACHINE_TYPE = os.environ.get("SERVING_MACHINE_TYPE", "n1-standard-4")


@dsl.pipeline(
    name=f"{PIPELINE_NAME}-vertex-pipeline",
    description=f"Vertex Pipeline for {PIPELINE_NAME}",
    pipeline_root=PIPELINE_ROOT,
)
def create_pipeline():
    """Train, upload, and deploy the heart-failure simple DNN on Vertex AI.

    Steps, in execution order:
      1. Custom training job running TRAINING_CONTAINER_IMAGE_URI.
      2. Model upload from "<BASE_OUTPUT_DIR>/model".
      3. Endpoint creation.
      4. Deployment of the uploaded model to the new endpoint.

    The pipeline takes no runtime parameters: every setting comes from the
    module-level constants, which are read from environment variables when
    this module is imported for compilation.
    """

    # Single-replica worker pool; the hyperparameters are passed to the
    # trainer container as command-line flags.
    worker_pool_specs = [
        {
            "machine_spec": {
                "machine_type": "n1-standard-4",
            },
            "replica_count": 1,
            "container_spec": {
                "image_uri": TRAINING_CONTAINER_IMAGE_URI,
                "args": [
                    f"--dataset_uri={DATASET_URI}",
                    f"--output_uri={OUTPUT_URI}",
                    "--epochs=100",
                    "--batch_size=100",
                    "--lr=.001",
                ],
            },
        }
    ]

    training_task = CustomTrainingJobOp(
        project=PROJECT,
        location=REGION,
        display_name=f"{PIPELINE_NAME}-training-job",
        worker_pool_specs=worker_pool_specs,
        base_output_directory=BASE_OUTPUT_DIR,
    )

    # NOTE(review): the trainer is told to write to OUTPUT_URI (--output_uri),
    # while the upload below reads from BASE_OUTPUT_DIR/model. This only works
    # if train.py also saves the model under the job's base output directory;
    # confirm against train.py.
    model_upload_task = ModelUploadOp(
        project=PROJECT,
        # Keep the model in the same region as the training job instead of
        # relying on the component's default location.
        location=REGION,
        display_name=f"{PIPELINE_NAME}-upload-job",
        artifact_uri=f"{BASE_OUTPUT_DIR}/model",
        serving_container_image_uri=SERVING_CONTAINER_IMAGE_URI,
    )
    # No artifact links the two steps, so the ordering is declared explicitly.
    model_upload_task.after(training_task)

    endpoint_create_task = EndpointCreateOp(
        project=PROJECT,
        # The endpoint must live in the same region as the model it serves.
        location=REGION,
        display_name=f"{PIPELINE_NAME}-endpoint-job",
        description="Heart Failure Simple DNN model",
    )
    # Only create the endpoint once the model upload has succeeded.
    endpoint_create_task.after(model_upload_task)

    model_deploy_task = ModelDeployOp(  # pylint: disable=unused-variable
        model=model_upload_task.outputs["model"],
        endpoint=endpoint_create_task.outputs["endpoint"],
        deployed_model_display_name=DISPLAY_NAME,
        dedicated_resources_machine_type=SERVING_MACHINE_TYPE,
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
    )


Overwriting ./pipelines/kfp_heart_failure_simple_dnn_pipeline.py


### Compile the pipeline

Define the environment variables that will be passed to the pipeline compiler:

In [73]:
# Bucket for pipeline artifacts; the next cell creates it if it is missing.
ARTIFACT_STORE = f"gs://{PROJECT}-kfp-simple-dnn-artifact-store"
PIPELINE_ROOT = f"{ARTIFACT_STORE}/simple-dnn-pipeline"
# Training data, handed to the trainer as `--dataset_uri`.
DATASET_URI = f"gs://{BUCKET}/heart_failure/scaled-engineered-heart.csv"
# The timestamp is fixed when this cell runs and ends up in the compiled
# pipeline; re-run this cell and recompile to get fresh output locations.
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
# Base output directory of the training job; the model upload step reads
# `{BASE_OUTPUT_DIR}/model`.
BASE_OUTPUT_DIR = f"{ARTIFACT_STORE}/models/{TIMESTAMP}"
# Handed to the trainer as `--output_uri`.
OUTPUT_URI = f"gs://{BUCKET}/heart_failure/{TIMESTAMP}"

# Export the settings that the pipeline module reads with `os.getenv` when it
# is compiled.
%env PIPELINE_ROOT={PIPELINE_ROOT}
%env PROJECT={PROJECT}
%env REGION={REGION}
%env DATASET_URI={DATASET_URI}
%env OUTPUT_URI={OUTPUT_URI}
%env BASE_OUTPUT_DIR={BASE_OUTPUT_DIR}

!echo {BASE_OUTPUT_DIR}

env: PIPELINE_ROOT=gs://qwiklabs-asl-02-99f66d8df225-kfp-simple-dnn-artifact-store/simple-dnn-pipeline
env: PROJECT=qwiklabs-asl-02-99f66d8df225
env: REGION=us-central1
env: DATASET_URI=gs://qwiklabs-asl-02-99f66d8df225/heart_failure/scaled-engineered-heart.csv
env: OUTPUT_URI=gs://qwiklabs-asl-02-99f66d8df225/heart_failure/20230208225130
env: BASE_OUTPUT_DIR=gs://qwiklabs-asl-02-99f66d8df225-kfp-simple-dnn-artifact-store/models/20230208225130
gs://qwiklabs-asl-02-99f66d8df225-kfp-simple-dnn-artifact-store/models/20230208225130


Verify that the `ARTIFACT_STORE` bucket exists, and create it if it does not:

In [74]:
# `gsutil mb` runs only when no line of the `gsutil ls` output matches the
# artifact-store URI (followed by a trailing slash) exactly.
!gsutil ls | grep ^{ARTIFACT_STORE}/$ || gsutil mb -l {REGION} {ARTIFACT_STORE}

gs://qwiklabs-asl-02-99f66d8df225-kfp-simple-dnn-artifact-store/


Create the training image: copy the training script next to the Dockerfile and inspect the build context.

In [75]:
# Show the working directory, since the relative paths below depend on it.
!pwd
# Copy the training script into the Docker build context; the Dockerfile
# copies `train.py` from there into the image.
!cp ./pipelines/train.py ./trainer_image_vertex/
# Inspect the build context and the Dockerfile before building.
!ls -altrh ./trainer_image_vertex
!cat ./trainer_image_vertex/Dockerfile

/home/jupyter/heart-failure
total 24K
drwxr-xr-x 2 jupyter jupyter 4.0K Feb  8 17:40 .ipynb_checkpoints
-rw-r--r-- 1 jupyter jupyter  180 Feb  8 17:46 Dockerfile
drwxr-xr-x 3 jupyter jupyter 4.0K Feb  8 17:46 .
drwxr-xr-x 6 jupyter jupyter 4.0K Feb  8 22:48 ..
-rw-r--r-- 1 jupyter jupyter 6.8K Feb  8 22:51 train.py
FROM gcr.io/deeplearning-platform-release/tf-cpu.2-8
RUN pip install -U fire cloudml-hypertune scikit-learn==0.20.4
WORKDIR /app
COPY train.py .

ENTRYPOINT ["python", "train.py"]


In [76]:
# Name and tag of the trainer image, pushed under this project's gcr.io path.
IMAGE_NAME = "trainer_image_heart_failure_simple_dnn_vertex"
TAG = "latest"
TRAINING_CONTAINER_IMAGE_URI = f"gcr.io/{PROJECT}/{IMAGE_NAME}:{TAG}"
# Bare expression so the notebook displays the resulting URI.
TRAINING_CONTAINER_IMAGE_URI

'gcr.io/qwiklabs-asl-02-99f66d8df225/trainer_image_heart_failure_simple_dnn_vertex:latest'

Build the container image with Cloud Build:

In [9]:
# Build the image from the `trainer_image_vertex` directory and tag it with
# TRAINING_CONTAINER_IMAGE_URI (IPython expands `$NAME` from the notebook's
# Python variables). The build timeout is set to 15 minutes.
!gcloud builds submit --timeout 15m --tag $TRAINING_CONTAINER_IMAGE_URI trainer_image_vertex

Creating temporary tarball archive of 3 file(s) totalling 7.1 KiB before compression.
Uploading tarball of [trainer_image_vertex] to [gs://qwiklabs-asl-02-99f66d8df225_cloudbuild/source/1675878461.278595-6334ce48cbe7453e990fc0eff6de4308.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/qwiklabs-asl-02-99f66d8df225/locations/global/builds/2a964443-6e75-4feb-9fc8-2a94615655e8].
Logs are available at [ https://console.cloud.google.com/cloud-build/builds/2a964443-6e75-4feb-9fc8-2a94615655e8?project=9475810701 ].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "2a964443-6e75-4feb-9fc8-2a94615655e8"

FETCHSOURCE
Fetching storage object: gs://qwiklabs-asl-02-99f66d8df225_cloudbuild/source/1675878461.278595-6334ce48cbe7453e990fc0eff6de4308.tgz#1675878461576078
Copying gs://qwiklabs-asl-02-99f66d8df225_cloudbuild/source/1675878461.278595-6334ce48cbe7453e990fc0eff6de4308.tgz#1675878461576078...
/ [1 files][  3.0 KiB/  3.0 KiB]            

Set the training and serving container image URIs:

In [77]:
#SERVING_CONTAINER_IMAGE_URI = "gcr.io/deeplearning-platform-release/tf-cpu.2-8"
SERVING_CONTAINER_IMAGE_URI = (
    "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-3:latest"
)
%env TRAINING_CONTAINER_IMAGE_URI={TRAINING_CONTAINER_IMAGE_URI}
%env SERVING_CONTAINER_IMAGE_URI={SERVING_CONTAINER_IMAGE_URI}

env: TRAINING_CONTAINER_IMAGE_URI=gcr.io/qwiklabs-asl-02-99f66d8df225/trainer_image_heart_failure_simple_dnn_vertex:latest
env: SERVING_CONTAINER_IMAGE_URI=us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-3:latest


#### Use the CLI compiler to compile the pipeline

Compile the pipeline from the Python file we generated into a JSON description using the following command:

In [78]:
# Path (relative to the notebook's working directory) of the compiled pipeline
# spec; written by the compiler and later passed to `aiplatform.PipelineJob`.
PIPELINE_JSON = "pipelines/kfp_heart_failure_simple_dnn_pipeline.json"

In [79]:
# The pipeline module reads its settings from environment variables when it is
# imported, so the `%env` cells above must have run before compiling.
!dsl-compile-v2 --py pipelines/kfp_heart_failure_simple_dnn_pipeline.py --output $PIPELINE_JSON



**Note:** You can also use the Python SDK to compile the pipeline:

```python
from kfp.v2 import compiler

compiler.Compiler().compile(
    pipeline_func=create_pipeline, 
    package_path=PIPELINE_JSON,
)

```

The result is the pipeline file. 

In [83]:
# Show the first lines of the compiled spec to confirm the file was written.
!head {PIPELINE_JSON}

{
  "pipelineSpec": {
    "components": {
      "comp-custom-training-job": {
        "executorLabel": "exec-custom-training-job",
        "inputDefinitions": {
          "parameters": {
            "base_output_directory": {
              "type": "STRING"
            },


### Deploy the pipeline package

Questions for class:

I hit this error - The replica workerpool0-0 exited with a non-zero status of 13. To find out more about why your job exited please check the logs: https://console.cloud.google.com/logs/viewer?project=9475810701&resource=ml_job%2Fjob_id%2F3079020273060544512&advancedFilter=resource.type%3D%22ml_job%22%0Aresource.labels.job_id%3D%223079020273060544512%22

- How do I restart a pipeline at a failed step?
- How do I get permissions to view this log - https://console.cloud.google.com/logs/viewer?project=9475810701&resource=ml_job%2Fjob_id%2F3079020273060544512&advancedFilter=resource.type%3D%22ml_job%22%0Aresource.labels.job_id%3D%223079020273060544512%22
- How do I debug this error: The replica workerpool0-0 exited with a non-zero status of 13.

In [None]:
# Point the Vertex AI SDK at the project and region configured at the top of
# the notebook.
aiplatform.init(project=PROJECT, location=REGION)

# Create a pipeline job from the compiled spec. Caching is disabled so that
# step results are not reused from earlier runs.
pipeline = aiplatform.PipelineJob(
    display_name="kfp_heart_failure_simple_dnn_pipeline",
    template_path=PIPELINE_JSON,
    enable_caching=False,
)

# Submit the job; the cell keeps printing the job state while it runs.
pipeline.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/9475810701/locations/us-central1/pipelineJobs/kfp-heartfailure-simple-dnn-vertex-pipeline-20230209030201
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/9475810701/locations/us-central1/pipelineJobs/kfp-heartfailure-simple-dnn-vertex-pipeline-20230209030201')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/kfp-heartfailure-simple-dnn-vertex-pipeline-20230209030201?project=9475810701
PipelineJob projects/9475810701/locations/us-central1/pipelineJobs/kfp-heartfailure-simple-dnn-vertex-pipeline-20230209030201 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/9475810701/locations/us-central1/pipelineJobs/kfp-heartfailure-simple-dnn-vertex-pipeline-20230209030201 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/9475810701/locations/us-central1/pipelineJobs/kfp-heartfailure-simple-dn

Copyright 2021 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.