In [None]:
USER_FLAG = '--user'
!pip install -U pip
!pip3 install {USER_FLAG} google-cloud-aiplatform==1.3.0 --upgrade
!pip3 install {USER_FLAG} kfp --upgrade
!pip install google_cloud_pipeline_components
! python -m pip install --user virtualenv
! echo "create env"
! python -m venv vertex_venv
! echo "Add kernel to jupyter"
! ipython kernel install --name "vertex_env" --user 

In [1]:
# Authenticate the user only when the notebook is running inside Colab.
# Outside Colab the membership check is False, so nothing is imported or called.
import sys

if "google.colab" in sys.modules:
    from google.colab import auth

    auth.authenticate_user()

In [2]:
PROJECT_ID = 'churn-smu'
REGION = 'asia-southeast1'
BUCKET_NAME = 'practice-smu-123'

! gcloud config set project {PROJECT_ID}
! gcloud services enable  compute.googleapis.com \
                          containerregistry.googleapis.com \
                          aiplatform.googleapis.com \
                          cloudbuild.googleapis.com \
                          cloudfunctions.googleapis.com \
                          dataflow.googleapis.com

PATH=%env PATH
%env PATH = {PATH}:/home/jupyter/.local/bin

PIPELINE_ROOT = "gs://" + f'{BUCKET_NAME}/churn/pipeline_root_churn'
DATA_ROOT_TRAIN = "gs://" + f"{BUCKET_NAME}/churn/data/dev"
DATA_ROOT_EVAL = "gs://" + f"{BUCKET_NAME}/churn/data/val"
DATA_ROOT_SERVE = "gs://" + f"{BUCKET_NAME}/churn/data/serve"

print(f"Project ID: {PROJECT_ID}")
print(f"Pipeline Root: {PIPELINE_ROOT}")
print(f"GCS Bucket Name: {BUCKET_NAME}")
print(f"DataRoot Train Directory: {DATA_ROOT_TRAIN}")

from typing import NamedTuple
from kfp.v2 import dsl
from kfp.v2.dsl import Artifact, Dataset, Input, Model, Output, Metrics, ClassificationMetrics, component, OutputPath, InputPath
import kfp
from kfp.v2 import compiler
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from google_cloud_pipeline_components import aiplatform as gcc_aip

Updated property [core/project].
Operation "operations/acat.p2-352395168404-16137358-8a85-48e3-b54b-418a511d6acf" finished successfully.
env: PATH=/opt/bin:/usr/local/nvidia/bin:/usr/local/cuda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/tools/node/bin:/tools/google-cloud-sdk/bin:/home/jupyter/.local/bin
Project ID: churn-smu
Pipeline Root: gs://practice-smu-123/churn/pipeline_root_churn
GCS Bucket Name: practice-smu-123
DataRoot Train Directory: gs://practice-smu-123/churn/data/dev


### YAML for data ingestion

In [3]:
%%bash
# Write the KFP component spec for the data-ingestion step.
# The heredoc delimiter is left unquoted on purpose: the shell must expand the
# image variable before the YAML is written to disk.

INGEST_IMAGE="gcr.io/churn-smu/churn-data-digestion:latest"

cat <<EOF > data_ingest_component.yaml

name: data_ingest
description: Download the dataset from GCS to pass to next component
inputs:
- {name: datapath, type: String}
outputs:
- {name: dataset, type: Dataset}
implementation:
  container:
    image: $INGEST_IMAGE
    command:
    - python
    - data_ingest.py
    args:
    - --datapath
    - {inputValue: datapath}
    - --dataset
    - {outputUri: dataset}
EOF

In [4]:
# Build the callable component factory for the ingestion step from its YAML spec.
ingest = kfp.components.load_component_from_file(
    "data_ingest_component.yaml"
)

In [5]:
%%bash
# YAML for data imputation
#
# `%%bash` must be the very first line of the cell; otherwise IPython does not
# treat the cell as a bash script. The heredoc delimiter is unquoted so that
# $GCR_IMAGE is expanded by the shell when the file is written.

GCR_IMAGE="gcr.io/churn-smu/churn-data-impute-store:latest"

cat > data_impute_component.yaml <<HERE

name: data_impute
description: Impute missing values in the dataset and pass the result to the next component
inputs:
- {name: pre_impute_dataset, type: Dataset}
- {name: bucket, type: String}
outputs:
- {name: post_impute_dataset, type: Dataset}
implementation:
  container:
    image: $GCR_IMAGE
    command:
    - python
    - impute_and_store.py
    args:
    - --pre_impute_dataset
    - {inputUri: pre_impute_dataset}
    - --bucket
    - {inputValue: bucket}
    - --post_impute_dataset
    - {outputUri: post_impute_dataset}
HERE

In [6]:
# Build the callable component factory for the imputation step from its YAML spec.
impute = kfp.components.load_component_from_file(
    "data_impute_component.yaml"
)

In [7]:
%%bash
# YAML for data scaling and encoding
#
# `%%bash` must be the very first line of the cell; otherwise IPython does not
# treat the cell as a bash script. The heredoc delimiter is unquoted so that
# $GCR_IMAGE is expanded by the shell when the file is written.

GCR_IMAGE="gcr.io/churn-smu/churn-data-enc-scl-store:latest"

cat > data_enc_scl_store_component.yaml <<HERE

name: data_encoding_scaling_store
description: Fit transform OneHotEncoder and StandardScaler and upload model artifacts to GCS
inputs:
- {name: pre_enc_dataset, type: Dataset}
- {name: bucket_name, type: String}
outputs:
- {name: post_enc_dataset, type: Dataset}
implementation:
  container:
    image: $GCR_IMAGE
    command:
    - python
    - enc_scl_store.py
    args:
    - --pre_enc_dataset
    - {inputUri: pre_enc_dataset}
    - --bucket_name
    - {inputValue: bucket_name}
    - --post_enc_dataset
    - {outputUri: post_enc_dataset}
HERE

In [8]:
# Build the callable component factory for the encode/scale step from its YAML spec.
enc_and_scl_store = kfp.components.load_component_from_file(
    "data_enc_scl_store_component.yaml"
)

In [9]:
%%bash
# YAML for hyperparameter tuning
#
# `%%bash` must be the very first line of the cell; otherwise IPython does not
# treat the cell as a bash script. The heredoc delimiter is unquoted so that
# $GCR_IMAGE is expanded by the shell when the file is written.
# This component declares inputs only; its spec has no `outputs` section.

GCR_IMAGE="gcr.io/churn-smu/churn-data-hyperparameter_tuning:latest"

cat > hyperparameter_tuning_component.yaml <<HERE

name: hyperparameter_tuning
description: Perform Hyperparameter Tuning and Store Data inside GCS as json
inputs:
- {name: dataset, type: Dataset}
- {name: bucket_name, type: String}
implementation:
  container:
    image: $GCR_IMAGE
    command:
    - python
    - hyperparameter_tuning.py
    args:
    - --dataset
    - {inputUri: dataset}
    - --bucket_name
    - {inputValue: bucket_name}
HERE

In [10]:
# Build the callable component factory for the tuning step from its YAML spec.
hyperparameter_tuning = kfp.components.load_component_from_file(
    "hyperparameter_tuning_component.yaml"
)

In [11]:
%%bash
# YAML for model training
#
# `%%bash` must be the very first line of the cell; otherwise IPython does not
# treat the cell as a bash script. The heredoc delimiter is unquoted so that
# $GCR_IMAGE is expanded by the shell when the file is written.

GCR_IMAGE="gcr.io/churn-smu/churn-data-training:latest"

cat > training_component.yaml <<HERE

name: training
description: Perform training and store artifacts
inputs:
- {name: dataset, type: Dataset}
- {name: bucket_name, type: String}
outputs:
- {name: model, type: Model}
- {name: classification_metrics, type: ClassificationMetrics}
- {name: base_metrics, type: Metrics}
- {name: feature_importance, type: Dataset}
implementation:
  container:
    image: $GCR_IMAGE
    command:
    - python
    - training.py
    args:
    - --dataset
    - {inputUri: dataset}
    - --bucket_name
    - {inputValue: bucket_name}
    - --model
    - {outputUri: model}
    - --classification_metrics
    - {outputUri: classification_metrics}
    - --base_metrics
    - {outputUri: base_metrics}
    - --feature_importance
    - {outputUri: feature_importance}
HERE

In [12]:
# Build the callable component factory for the training step from its YAML spec.
train = kfp.components.load_component_from_file(
    "training_component.yaml"
)

In [14]:
# Churn pipeline definition: ingest -> impute -> encode/scale -> hyperparameter
# tuning. Each step consumes the dataset output of the previous one.
# `project` and `region` are declared as pipeline parameters but are not read
# by any step in the body.
@dsl.pipeline(name="churn-test", pipeline_root=PIPELINE_ROOT)
def pipeline(
    data_root_train: str = f"{DATA_ROOT_TRAIN}/dev.csv",
    bucket_name: str = BUCKET_NAME,
    project: str = PROJECT_ID,
    region: str = REGION,
):
    ingest_task = ingest(datapath=data_root_train)

    impute_task = impute(
        pre_impute_dataset=ingest_task.outputs["dataset"],
        bucket=bucket_name,
    )

    encode_scale_task = enc_and_scl_store(
        pre_enc_dataset=impute_task.outputs["post_impute_dataset"],
        bucket_name=bucket_name,
    )

    tuning_task = hyperparameter_tuning(
        dataset=encode_scale_task.outputs["post_enc_dataset"],
        bucket_name=bucket_name,
    )

    # The training step is currently disabled:
    # training_task = train(
    #     dataset=encode_scale_task.outputs["post_enc_dataset"],
    #     bucket_name=bucket_name,
    # )

In [15]:
# Compile the pipeline function into a JSON job spec on local disk.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="churn_test.json",
)

# Create a pipeline job from the compiled spec (caching disabled) and run it.
# The recorded cell output shows the job state being reported while it runs.
start_pipeline = pipeline_jobs.PipelineJob(
    display_name="churn-test",
    template_path="churn_test.json",
    enable_caching=False,
    location=REGION,
    project=PROJECT_ID,
)
start_pipeline.run()

Creating PipelineJob




PipelineJob created. Resource name: projects/352395168404/locations/asia-southeast1/pipelineJobs/churn-test-20220702082659
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/352395168404/locations/asia-southeast1/pipelineJobs/churn-test-20220702082659')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/asia-southeast1/pipelines/runs/churn-test-20220702082659?project=352395168404
PipelineJob projects/352395168404/locations/asia-southeast1/pipelineJobs/churn-test-20220702082659 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/352395168404/locations/asia-southeast1/pipelineJobs/churn-test-20220702082659 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/352395168404/locations/asia-southeast1/pipelineJobs/churn-test-20220702082659 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/352395168404/locations/asia-southeast1/pipelineJobs/churn-test-20220702082659

In [22]:
from kfp.v2.google.client import AIPlatformClient

# Client bound to the notebook's project and region.
api_client = AIPlatformClient(project_id=PROJECT_ID, region=REGION)

# Service account passed to the schedule request below; replace it with the
# service account of your own project.
SERVICE_ACCOUNT = "352395168404-compute@developer.gserviceaccount.com"

# Create a recurring schedule from the compiled job spec.
# Cron fields are minute, hour, day-of-month, month, day-of-week, so
# "0 0 4 * *" means 00:00 on the 4th day of every month in the given time zone.
# NOTE(review): if a daily 04:00 run was intended the expression would be
# "0 4 * * *" -- confirm.
response = api_client.create_schedule_from_job_spec(
    enable_caching=True,
    job_spec_path="churn_test.json",
    schedule="0 0 4 * *",
    time_zone="Asia/Singapore",  # adjust to your own time zone if needed
    # parameter_values={"display_name": 'test1'},
    pipeline_root=PIPELINE_ROOT,
    service_account=SERVICE_ACCOUNT,
)

