In [None]:
!pip install kfp

In [10]:
%%writefile ./pipeline/covertype_training_pipeline.py

"""KFP orchestrating BigQuery and Cloud AI Platform services."""

import os

from helper_components import evaluate_model
from helper_components import retrieve_best_run
from helper_components import prepoc_split_dataset
from jinja2 import Template
import kfp
from kfp.components import func_to_container_op
from kfp.dsl.types import Dict
from kfp.dsl.types import GCPProjectID
from kfp.dsl.types import GCPRegion
from kfp.dsl.types import GCSPath
from kfp.dsl.types import String
from kfp.gcp import use_gcp_secret

# Settings taken from the process environment at import time; a variable
# that is not set yields None.
BASE_IMAGE = os.environ.get('BASE_IMAGE')
TRAINER_IMAGE = os.environ.get('TRAINER_IMAGE')
RUNTIME_VERSION = os.environ.get('RUNTIME_VERSION')
PYTHON_VERSION = os.environ.get('PYTHON_VERSION')
COMPONENT_URL_SEARCH_PREFIX = os.environ.get('COMPONENT_URL_SEARCH_PREFIX')
USE_KFP_SA = os.environ.get('USE_KFP_SA')

# Relative paths of the training, validation and testing splits.
TRAINING_FILE_PATH = 'datasets/training/data.csv'
VALIDATION_FILE_PATH = 'datasets/validation/data.csv'
TESTING_FILE_PATH = 'datasets/testing/data.csv'

# Parameter defaults
SPLITS_DATASET_ID = 'splits'

# Default value of the pipeline's `hypertune_settings` parameter, which is
# passed as `training_input` to the hyperparameter tuning job. The tuned
# parameters (max_iter, alpha) are the ones the final training step receives
# via --max_iter / --alpha.
# NOTE(review): "True" below is a Python literal, not JSON "true"; confirm how
# the consuming component parses this string before "fixing" it.
HYPERTUNE_SETTINGS = """
{
    "hyperparameters":  {
        "goal": "MAXIMIZE",
        "maxTrials": 6,
        "maxParallelTrials": 3,
        "hyperparameterMetricTag": "accuracy",
        "enableTrialEarlyStopping": True,
        "params": [
            {
                "parameterName": "max_iter",
                "type": "DISCRETE",
                "discreteValues": [500, 1000]
            },
            {
                "parameterName": "alpha",
                "type": "DOUBLE",
                "minValue": 0.0001,
                "maxValue": 0.001,
                "scaleType": "UNIT_LINEAR_SCALE"
            }
        ]
    }
}
"""

# Component factories.
# The train/deploy components are looked up by name under
# COMPONENT_URL_SEARCH_PREFIX; the helper functions are wrapped with
# func_to_container_op and run on BASE_IMAGE.
component_store = kfp.components.ComponentStore(
    local_search_paths=None,
    url_search_prefixes=[COMPONENT_URL_SEARCH_PREFIX],
)

prepoc_split_op = func_to_container_op(
    prepoc_split_dataset,
    base_image=BASE_IMAGE,
)
mlengine_train_op = component_store.load_component('ml_engine/train')
mlengine_deploy_op = component_store.load_component('ml_engine/deploy')
retrieve_best_run_op = func_to_container_op(
    retrieve_best_run,
    base_image=BASE_IMAGE,
)
evaluate_model_op = func_to_container_op(
    evaluate_model,
    base_image=BASE_IMAGE,
)


@kfp.dsl.pipeline(
    name='Covertype Classifier Training',
    description='The pipeline training and deploying the Covertype classifier'
)
def covertype_train(project_id,
                    region,
                    source_table_name,
                    gcs_root,
                    dataset_id,
                    evaluation_metric_name,
                    evaluation_metric_threshold,
                    model_id,
                    version_id,
                    replace_existing_version,
                    hypertune_settings=HYPERTUNE_SETTINGS,
                    dataset_location='US'):
    """Orchestrates training and deployment of an sklearn model.

    Steps, in order: preprocess/split the dataset, run a hyperparameter
    tuning job, retrieve the best trial, train again with that trial's
    `alpha` and `max_iter`, evaluate on the testing split, and deploy the
    model only if the evaluation metric exceeds the threshold.

    Args:
      project_id: Project passed to the training jobs, the best-run lookup
        and the deploy step.
      region: Region passed to both training jobs.
      source_table_name: Accepted for interface compatibility; not
        referenced by the pipeline body.
      gcs_root: Passed to the preprocessing step and used as the prefix of
        both job directories.
      dataset_id: Accepted for interface compatibility; not referenced by
        the pipeline body.
      evaluation_metric_name: Metric name passed to the evaluation step.
      evaluation_metric_threshold: Deployment happens only when the
        evaluated metric value is greater than this.
      model_id: Forwarded to the deploy step.
      version_id: Forwarded to the deploy step.
      replace_existing_version: Forwarded to the deploy step.
      hypertune_settings: `training_input` of the tuning job; defaults to
        HYPERTUNE_SETTINGS.
      dataset_location: Accepted for interface compatibility; not
        referenced by the pipeline body.
    """

    splited_prepoc_dataset = prepoc_split_op(gcs_root)

    # Tune hyperparameters
    tune_args = [
        '--training_dataset_path',
        splited_prepoc_dataset.outputs['training_file_path'],
        '--validation_dataset_path',
        splited_prepoc_dataset.outputs['validation_file_path'],
        '--hptune', 'True'
    ]

    # Each run gets its own job directory through the run-id placeholder.
    job_dir = '{}/{}/{}'.format(gcs_root, 'jobdir/hypertune',
                                kfp.dsl.RUN_ID_PLACEHOLDER)

    hypertune = mlengine_train_op(
        project_id=project_id,
        region=region,
        master_image_uri=TRAINER_IMAGE,
        job_dir=job_dir,
        args=tune_args,
        training_input=hypertune_settings)

    # Retrieve the best trial
    get_best_trial = retrieve_best_run_op(
        project_id, hypertune.outputs['job_id'])

    # Train the model with the best trial's hyperparameters
    job_dir = '{}/{}/{}'.format(gcs_root, 'jobdir', kfp.dsl.RUN_ID_PLACEHOLDER)

    train_args = [
        '--training_dataset_path',
        splited_prepoc_dataset.outputs['training_file_path'],
        '--validation_dataset_path',
        splited_prepoc_dataset.outputs['validation_file_path'],
        '--alpha', get_best_trial.outputs['alpha'],
        '--max_iter', get_best_trial.outputs['max_iter'],
        '--hptune', 'False'
    ]

    train_model = mlengine_train_op(
        project_id=project_id,
        region=region,
        master_image_uri=TRAINER_IMAGE,
        job_dir=job_dir,
        args=train_args)

    # Evaluate the model on the testing split
    # NOTE(review): str() hands the upstream outputs over as plain strings;
    # presumably intentional -- confirm before removing the wrappers.
    eval_model = evaluate_model_op(
        dataset_path=str(splited_prepoc_dataset.outputs['testing_file_path']),
        model_path=str(train_model.outputs['job_dir']),
        metric_name=evaluation_metric_name)

    # Deploy the model if the primary metric is better than threshold
    with kfp.dsl.Condition(
            eval_model.outputs['metric_value'] > evaluation_metric_threshold):
        deploy_model = mlengine_deploy_op(
            model_uri=train_model.outputs['job_dir'],
            project_id=project_id,
            model_id=model_id,
            version_id=version_id,
            runtime_version=RUNTIME_VERSION,
            python_version=PYTHON_VERSION,
            replace_existing_version=replace_existing_version)

    # Configure the pipeline to run using the service account defined
    # in the user-gcp-sa k8s secret. USE_KFP_SA comes from the environment,
    # so it is compared against the string 'True'.
    if USE_KFP_SA == 'True':
        kfp.dsl.get_pipeline_conf().add_op_transformer(
            use_gcp_secret('user-gcp-sa'))

Overwriting ./pipeline/covertype_training_pipeline.py


In [12]:
# List the available Cloud Storage buckets; one of them is used as
# ARTIFACT_STORE_URI in the configuration cell below.
!gsutil ls

gs://artifacts.qwiklabs-gcp-01-9fd50757f427.appspot.com/
gs://qwiklabs-gcp-01-9fd50757f427-kubeflowpipelines-default/
gs://qwiklabs-gcp-01-9fd50757f427_cloudbuild/


In [13]:
REGION = 'us-central1'
ENDPOINT = '62df1bdac38e27a1-dot-us-central1.pipelines.googleusercontent.com' # TO DO: REPLACE WITH YOUR ENDPOINT
ARTIFACT_STORE_URI = 'gs://qwiklabs-gcp-01-9fd50757f427-kubeflowpipelines-default'  # TO DO: REPLACE WITH YOUR ARTIFACT_STORE NAME 
PROJECT_ID = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_ID[0]

In [14]:
# gcr.io URI under which the trainer image is built and referenced.
IMAGE_NAME = 'trainer_image'
TAG = 'latest'
TRAINER_IMAGE = f'gcr.io/{PROJECT_ID}/{IMAGE_NAME}:{TAG}'

In [15]:
# Submit the local trainer_image directory as a build and tag the result
# as TRAINER_IMAGE (15 minute build timeout).
!gcloud builds submit --timeout 15m --tag $TRAINER_IMAGE trainer_image

Creating temporary tarball archive of 2 file(s) totalling 3.4 KiB before compression.
Uploading tarball of [trainer_image] to [gs://qwiklabs-gcp-01-9fd50757f427_cloudbuild/source/1625237002.935745-fbe5e4d20389469b9e7708bbc02c11d7.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/qwiklabs-gcp-01-9fd50757f427/locations/global/builds/6d034289-fdbb-4490-be33-9ff8a37c636b].
Logs are available at [https://console.cloud.google.com/cloud-build/builds/6d034289-fdbb-4490-be33-9ff8a37c636b?project=661185394357].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "6d034289-fdbb-4490-be33-9ff8a37c636b"

FETCHSOURCE
Fetching storage object: gs://qwiklabs-gcp-01-9fd50757f427_cloudbuild/source/1625237002.935745-fbe5e4d20389469b9e7708bbc02c11d7.tgz#1625237003315290
Copying gs://qwiklabs-gcp-01-9fd50757f427_cloudbuild/source/1625237002.935745-fbe5e4d20389469b9e7708bbc02c11d7.tgz#1625237003315290...
/ [1 files][  1.6 KiB/  1.6 KiB]                   

In [17]:
# gcr.io URI of the base image used by the function-based pipeline components.
# Note: IMAGE_NAME and TAG are reassigned here from the trainer image cell.
IMAGE_NAME = 'base_image'
TAG = 'latest'
BASE_IMAGE = f'gcr.io/{PROJECT_ID}/{IMAGE_NAME}:{TAG}'

In [18]:
# Submit the local base_image directory as a build and tag the result
# as BASE_IMAGE (15 minute build timeout).
!gcloud builds submit --timeout 15m --tag $BASE_IMAGE base_image

Creating temporary tarball archive of 2 file(s) totalling 244 bytes before compression.
Uploading tarball of [base_image] to [gs://qwiklabs-gcp-01-9fd50757f427_cloudbuild/source/1625237258.285619-423f83140f2247bb82044644c3ba63fc.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/qwiklabs-gcp-01-9fd50757f427/locations/global/builds/44e1a912-62a3-4b51-90bb-e2da38e18acc].
Logs are available at [https://console.cloud.google.com/cloud-build/builds/44e1a912-62a3-4b51-90bb-e2da38e18acc?project=661185394357].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "44e1a912-62a3-4b51-90bb-e2da38e18acc"

FETCHSOURCE
Fetching storage object: gs://qwiklabs-gcp-01-9fd50757f427_cloudbuild/source/1625237258.285619-423f83140f2247bb82044644c3ba63fc.tgz#1625237258639472
Copying gs://qwiklabs-gcp-01-9fd50757f427_cloudbuild/source/1625237258.285619-423f83140f2247bb82044644c3ba63fc.tgz#1625237258639472...
/ [1 files][  284.0 B/  284.0 B]                    

In [None]:
# Compile-time settings for pipeline/covertype_training_pipeline.py, which
# reads each of them with os.getenv at import time.
USE_KFP_SA = False

COMPONENT_URL_SEARCH_PREFIX = 'https://raw.githubusercontent.com/kubeflow/pipelines/0.2.5/components/gcp/'
RUNTIME_VERSION = '1.15'
PYTHON_VERSION = '3.7'

# Export the settings as environment variables so the compile step can see
# them. Values arrive as text: the pipeline module compares USE_KFP_SA
# against the string 'True'.
# BASE_IMAGE and TRAINER_IMAGE must already be defined by the earlier cells.
%env USE_KFP_SA={USE_KFP_SA}
%env BASE_IMAGE={BASE_IMAGE}
%env TRAINER_IMAGE={TRAINER_IMAGE}
%env COMPONENT_URL_SEARCH_PREFIX={COMPONENT_URL_SEARCH_PREFIX}
%env RUNTIME_VERSION={RUNTIME_VERSION}
%env PYTHON_VERSION={PYTHON_VERSION}

In [None]:
# Compile the pipeline module into covertype_training_pipeline.yaml.
# Run the environment-variable cell first: the module reads its settings
# from the environment when it is imported.
!dsl-compile --py pipeline/covertype_training_pipeline.py --output covertype_training_pipeline.yaml

### Deploy the pipeline package

In [None]:
# Upload the compiled package to the KFP endpoint under PIPELINE_NAME.
PIPELINE_NAME='covertype_continuous_training'

!kfp --endpoint $ENDPOINT pipeline upload \
-p $PIPELINE_NAME \
covertype_training_pipeline.yaml

In [None]:
# List the pipelines on the endpoint to look up the ID of the uploaded one.
!kfp --endpoint $ENDPOINT pipeline list

In [None]:
PIPELINE_ID='0918568d-758c-46cf-9752-e04a4403cd84' # TODO: replace with the ID of your uploaded pipeline

In [None]:
# Experiment and run identification.
EXPERIMENT_NAME = 'Covertype_Classifier_Training'
RUN_ID = 'Run_001'

# Values for the pipeline parameters (all submitted as text).
SOURCE_TABLE = 'covertype_dataset.covertype'
DATASET_ID = 'splits'
EVALUATION_METRIC = 'accuracy'
EVALUATION_METRIC_THRESHOLD = '0.69'
MODEL_ID = 'covertype_classifier'
VERSION_ID = 'v01'
REPLACE_EXISTING_VERSION = 'True'

# Staging folder inside the artifact store, submitted as `gcs_root`.
GCS_STAGING_PATH = f'{ARTIFACT_STORE_URI}/staging'

In [None]:
# Submit a run of the uploaded pipeline. Each name=value pair after the
# options sets one pipeline parameter; hypertune_settings and
# dataset_location are not given here, so the pipeline's defaults apply.
!kfp --endpoint $ENDPOINT run submit \
-e $EXPERIMENT_NAME \
-r $RUN_ID \
-p $PIPELINE_ID \
project_id=$PROJECT_ID \
gcs_root=$GCS_STAGING_PATH \
region=$REGION \
source_table_name=$SOURCE_TABLE \
dataset_id=$DATASET_ID \
evaluation_metric_name=$EVALUATION_METRIC \
evaluation_metric_threshold=$EVALUATION_METRIC_THRESHOLD \
model_id=$MODEL_ID \
version_id=$VERSION_ID \
replace_existing_version=$REPLACE_EXISTING_VERSION

In [None]:
# Print the endpoint host the run was submitted to.
!echo $ENDPOINT