1. Create a pipeline folder and write the pipeline definition into it

In [3]:
%%writefile ./pipeline2/winequality_training_pipeline.py
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""KFP pipeline orchestrating BigQuery and Cloud AI Platform services."""

import os

from helper_components import evaluate_model
from helper_components import retrieve_best_run
from jinja2 import Template
import kfp
from kfp.components import func_to_container_op
from kfp.dsl.types import Dict
from kfp.dsl.types import GCPProjectID
from kfp.dsl.types import GCPRegion
from kfp.dsl.types import GCSPath
from kfp.dsl.types import String
from kfp.gcp import use_gcp_secret

# Runtime configuration, read from the environment once, when this module is
# imported. A variable that is not set yields None.
BASE_IMAGE = os.environ.get('BASE_IMAGE')
TRAINER_IMAGE = os.environ.get('TRAINER_IMAGE')
RUNTIME_VERSION = os.environ.get('RUNTIME_VERSION')
PYTHON_VERSION = os.environ.get('PYTHON_VERSION')
COMPONENT_URL_SEARCH_PREFIX = os.environ.get('COMPONENT_URL_SEARCH_PREFIX')
USE_KFP_SA = os.environ.get('USE_KFP_SA')

# Locations of the three dataset splits, relative to the pipeline's `gcs_root`.
TRAINING_FILE_PATH = 'data/training/dataset.csv'
VALIDATION_FILE_PATH = 'data/validation/dataset.csv'
TESTING_FILE_PATH = 'data/testing/dataset.csv'

# Parameter defaults
# SPLITS_DATASET_ID = 'splits'

# Default value of the pipeline's `hypertune_settings` parameter; the pipeline
# forwards it as `training_input` to the hyperparameter tuning job.
# It asks for the "accuracy" metric to be maximized over at most 6 trials
# (3 in parallel), searching `max_iter` in {500, 1000} and `alpha` in
# [0.0001, 0.001].
# NOTE(review): `True` below is a Python literal, not strict JSON (`true`).
# Confirm how the training component parses this argument before changing it.
HYPERTUNE_SETTINGS = """
{
    "hyperparameters":  {
        "goal": "MAXIMIZE",
        "maxTrials": 6,
        "maxParallelTrials": 3,
        "hyperparameterMetricTag": "accuracy",
        "enableTrialEarlyStopping": True,
        "params": [
            {
                "parameterName": "max_iter",
                "type": "DISCRETE",
                "discreteValues": [500, 1000]
            },
            {
                "parameterName": "alpha",
                "type": "DOUBLE",
                "minValue": 0.0001,
                "maxValue": 0.001,
                "scaleType": "UNIT_LINEAR_SCALE"
            }
        ]
    }
}
"""


# Helper functions
def generate_sampling_query(source_table_name, num_lots, lots):
    """Builds the SQL that samples one data split from the source table.

    Each row is hashed (FARM_FINGERPRINT of its JSON form) and reduced modulo
    `num_lots`; only rows whose remainder appears in `lots` are selected.

    Args:
        source_table_name: Table name substituted into the FROM clause.
        num_lots: Modulus applied to the row hash.
        lots: Remainders to keep. Rendered through `str(lots)[1:-1]`, so a
            list such as [1, 2] becomes "1, 2".

    Returns:
        The rendered query string.
    """
    template = Template("""
         SELECT *
         FROM 
             `{{ source_table }}` AS cover
         WHERE 
         MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(cover))), {{ num_lots }}) IN ({{ lots }})
         """)

    # Drop the first and last character (the list brackets) of the text form.
    lots_csv = str(lots)[1:-1]

    return template.render(
        source_table=source_table_name,
        num_lots=num_lots,
        lots=lots_csv)


# Create component factories
# Pre-built components are looked up by name under the URL prefix taken from
# the COMPONENT_URL_SEARCH_PREFIX environment variable.
component_store = kfp.components.ComponentStore(
    local_search_paths=None,
    url_search_prefixes=[COMPONENT_URL_SEARCH_PREFIX])

bigquery_query_op, mlengine_train_op, mlengine_deploy_op = (
    component_store.load_component(component_name)
    for component_name in ('bigquery/query',
                           'ml_engine/train',
                           'ml_engine/deploy'))

# The helper functions are wrapped as container ops that run on BASE_IMAGE.
retrieve_best_run_op = func_to_container_op(
    retrieve_best_run,
    base_image=BASE_IMAGE)
evaluate_model_op = func_to_container_op(
    evaluate_model,
    base_image=BASE_IMAGE)


@kfp.dsl.pipeline(
    name='WineQuality Training',
    description='The pipeline training and deploying the WineQuality classifierpipeline_yaml'
)
def winequality_train(project_id,
                    region,
                    source_table_name,
                    gcs_root,
                    dataset_id,
                    evaluation_metric_name,
                    evaluation_metric_threshold,
                    model_id,
                    version_id,
                    replace_existing_version,
                    hypertune_settings=HYPERTUNE_SETTINGS,
                    dataset_location='US'):
    """Orchestrates training and deployment of an sklearn model.

    Steps, in order: sample training/validation/testing splits from the source
    table, run a hyperparameter tuning job, retrieve the best trial, train a
    model with the best `alpha` and `max_iter`, evaluate it on the testing
    split, and deploy it only when the metric exceeds the threshold.

    Args:
        project_id: Project passed to the query, training, best-run and
            deployment steps.
        region: Region passed to both training jobs.
        source_table_name: Table the three splits are sampled from.
        gcs_root: Root path under which the split files and the job
            directories are placed.
        dataset_id: Dataset ID passed to the query component.
        evaluation_metric_name: Metric name passed to the evaluation step.
        evaluation_metric_threshold: The model is deployed only when the
            evaluated metric value is greater than this threshold.
        model_id: Model ID passed to the deployment component.
        version_id: Version ID passed to the deployment component.
        replace_existing_version: Passed through to the deployment component.
        hypertune_settings: `training_input` for the tuning job.
        dataset_location: Dataset location passed to the query component.
    """

    # Create the training split (lots 1-4 of 10).
    query = generate_sampling_query(
        source_table_name=source_table_name, num_lots=10, lots=[1, 2, 3, 4])

    training_file_path = '{}/{}'.format(gcs_root, TRAINING_FILE_PATH)

    # This op was missing although later steps consume its output, so the
    # module raised NameError as soon as the pipeline function was evaluated.
    create_training_split = bigquery_query_op(
        query=query,
        project_id=project_id,
        dataset_id=dataset_id,
        table_id='',
        output_gcs_path=training_file_path,
        dataset_location=dataset_location)

    # Create the validation split (lot 8 of 10).
    query = generate_sampling_query(
        source_table_name=source_table_name, num_lots=10, lots=[8])

    validation_file_path = '{}/{}'.format(gcs_root, VALIDATION_FILE_PATH)

    create_validation_split = bigquery_query_op(
        query=query,
        project_id=project_id,
        dataset_id=dataset_id,
        table_id='',
        output_gcs_path=validation_file_path,
        dataset_location=dataset_location)

    # Create the testing split (lot 9 of 10).
    query = generate_sampling_query(
        source_table_name=source_table_name, num_lots=10, lots=[9])

    testing_file_path = '{}/{}'.format(gcs_root, TESTING_FILE_PATH)

    create_testing_split = bigquery_query_op(
        query=query,
        project_id=project_id,
        dataset_id=dataset_id,
        table_id='',
        output_gcs_path=testing_file_path,
        dataset_location=dataset_location)

    # Tune hyperparameters
    tune_args = [
        '--training_dataset_path',
        create_training_split.outputs['output_gcs_path'],
        '--validation_dataset_path',
        create_validation_split.outputs['output_gcs_path'], '--hptune', 'True'
    ]

    # The run ID placeholder keeps job directories of different runs apart.
    job_dir = '{}/{}/{}'.format(gcs_root, 'jobdir/hypertune',
                                kfp.dsl.RUN_ID_PLACEHOLDER)

    hypertune = mlengine_train_op(
        project_id=project_id,
        region=region,
        master_image_uri=TRAINER_IMAGE,
        job_dir=job_dir,
        args=tune_args,
        training_input=hypertune_settings)

    # Retrieve the best trial
    get_best_trial = retrieve_best_run_op(
        project_id, hypertune.outputs['job_id'])

    # Train the model on a combined training and validation datasets
    job_dir = '{}/{}/{}'.format(gcs_root, 'jobdir', kfp.dsl.RUN_ID_PLACEHOLDER)

    train_args = [
        '--training_dataset_path',
        create_training_split.outputs['output_gcs_path'],
        '--validation_dataset_path',
        create_validation_split.outputs['output_gcs_path'], '--alpha',
        get_best_trial.outputs['alpha'], '--max_iter',
        get_best_trial.outputs['max_iter'], '--hptune', 'False'
    ]

    train_model = mlengine_train_op(
        project_id=project_id,
        region=region,
        master_image_uri=TRAINER_IMAGE,
        job_dir=job_dir,
        args=train_args)

    # Evaluate the model on the testing split
    eval_model = evaluate_model_op(
        dataset_path=str(create_testing_split.outputs['output_gcs_path']),
        model_path=str(train_model.outputs['job_dir']),
        metric_name=evaluation_metric_name)

    # Deploy the model if the primary metric is better than threshold
    with kfp.dsl.Condition(eval_model.outputs['metric_value'] > evaluation_metric_threshold):
        deploy_model = mlengine_deploy_op(
            model_uri=train_model.outputs['job_dir'],
            project_id=project_id,
            model_id=model_id,
            version_id=version_id,
            runtime_version=RUNTIME_VERSION,
            python_version=PYTHON_VERSION,
            replace_existing_version=replace_existing_version)

    # Configure the pipeline to run using the service account defined
    # in the user-gcp-sa k8s secret. USE_KFP_SA comes from the environment,
    # so it is compared against the string 'True'.
    if USE_KFP_SA == 'True':
        kfp.dsl.get_pipeline_conf().add_op_transformer(
            use_gcp_secret('user-gcp-sa'))

Writing ./pipeline/covertype_training_pipeline.py


In [25]:
# Settings shared by the cells below: the region passed to the pipeline, the
# endpoint passed to the `kfp` CLI, and the bucket used for staging.
REGION = 'us-central1'
# NOTE(review): the host name says us-central2 while REGION is us-central1 —
# confirm the endpoint is correct.
ENDPOINT = '4350ba2c9d142ec-dot-us-central2.pipelines.googleusercontent.com'
ARTIFACT_STORE_URI = 'gs://benazirsproject-demo'
# `!(...)` captures the command's output lines; the first line is kept.
PROJECT_ID = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_ID[0]

2. Copy the `trainer_image` folder and build the trainer image

In [5]:
# Image URI of the form gcr.io/<project>/<image name>:<tag>.
IMAGE_NAME = 'trainer_image'
TAG = 'latest'
TRAINER_IMAGE = 'gcr.io/%s/%s:%s' % (PROJECT_ID, IMAGE_NAME, TAG)

In [7]:
# Submit the local `trainer_image` directory as a build (15 minute timeout)
# and tag the resulting image as $TRAINER_IMAGE.
!gcloud builds submit --timeout 15m --tag $TRAINER_IMAGE trainer_image

Creating temporary tarball archive of 4 file(s) totalling 6.7 KiB before compression.
Uploading tarball of [trainer_image] to [gs://benazirsproject_cloudbuild/source/1598397678.54-4e5d5c68b7f24fd2a2c879f091589af8.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/benazirsproject/builds/0e2cba71-c124-414d-bcb7-511af3da2295].
Logs are available at [https://console.cloud.google.com/cloud-build/builds/0e2cba71-c124-414d-bcb7-511af3da2295?project=981930454113].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "0e2cba71-c124-414d-bcb7-511af3da2295"

FETCHSOURCE
Fetching storage object: gs://benazirsproject_cloudbuild/source/1598397678.54-4e5d5c68b7f24fd2a2c879f091589af8.tgz#1598397678990038
Copying gs://benazirsproject_cloudbuild/source/1598397678.54-4e5d5c68b7f24fd2a2c879f091589af8.tgz#1598397678990038...
/ [1 files][  1.8 KiB/  1.8 KiB]                                                
Operation completed over 1 objects/1.8 KiB.        

# Copy the `base_image` folder and build the base image


In [8]:
# Image URI of the form gcr.io/<project>/<image name>:<tag>.
# IMAGE_NAME and TAG are reassigned here; TRAINER_IMAGE already holds its value.
IMAGE_NAME = 'base_image'
TAG = 'latest'
BASE_IMAGE = 'gcr.io/%s/%s:%s' % (PROJECT_ID, IMAGE_NAME, TAG)

In [9]:
# Submit the local `base_image` directory as a build (15 minute timeout)
# and tag the resulting image as $BASE_IMAGE.
!gcloud builds submit --timeout 15m --tag $BASE_IMAGE base_image

Creating temporary tarball archive of 2 file(s) totalling 244 bytes before compression.
Uploading tarball of [base_image] to [gs://benazirsproject_cloudbuild/source/1598397901.16-c86cfede5bc44723bbc2d2b5660fb853.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/benazirsproject/builds/13711ba2-ae2a-406e-b331-64629ef76cb7].
Logs are available at [https://console.cloud.google.com/cloud-build/builds/13711ba2-ae2a-406e-b331-64629ef76cb7?project=981930454113].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "13711ba2-ae2a-406e-b331-64629ef76cb7"

FETCHSOURCE
Fetching storage object: gs://benazirsproject_cloudbuild/source/1598397901.16-c86cfede5bc44723bbc2d2b5660fb853.tgz#1598397901563822
Copying gs://benazirsproject_cloudbuild/source/1598397901.16-c86cfede5bc44723bbc2d2b5660fb853.tgz#1598397901563822...
/ [1 files][  284.0 B/  284.0 B]                                                
Operation completed over 1 objects/284.0 B.         

## Compile the pipeline


In [11]:
# The pipeline module reads these settings with os.getenv() when it is
# imported, so they must be exported before it is compiled.
# The pipeline applies the `user-gcp-sa` secret only when USE_KFP_SA is the
# string 'True'; the value exported here is 'False'.
USE_KFP_SA = False

COMPONENT_URL_SEARCH_PREFIX = 'https://raw.githubusercontent.com/kubeflow/pipelines/0.2.5/components/gcp/'
RUNTIME_VERSION = '1.15'
PYTHON_VERSION = '3.7'

# `{NAME}` is replaced with the value of the Python variable of that name.
%env USE_KFP_SA={USE_KFP_SA}
%env BASE_IMAGE={BASE_IMAGE}
%env TRAINER_IMAGE={TRAINER_IMAGE}
%env COMPONENT_URL_SEARCH_PREFIX={COMPONENT_URL_SEARCH_PREFIX}
%env RUNTIME_VERSION={RUNTIME_VERSION}
%env PYTHON_VERSION={PYTHON_VERSION}

env: USE_KFP_SA=False
env: BASE_IMAGE=gcr.io/benazirsproject/base_image:latest
env: TRAINER_IMAGE=gcr.io/benazirsproject/trainer_image:latest
env: COMPONENT_URL_SEARCH_PREFIX=https://raw.githubusercontent.com/kubeflow/pipelines/0.2.5/components/gcp/
env: RUNTIME_VERSION=1.15
env: PYTHON_VERSION=3.7


## Copy `helper_components` next to the pipeline file (the pipeline imports it)

In [14]:
# Compile the pipeline module into a workflow YAML file. The module reads its
# settings from environment variables at import time, so the %env cell must
# have been run first.
!dsl-compile --py pipeline/winequality_training_pipeline.py --output winequality_training_pipeline.py.yaml 

In [16]:
# Preview the compiled package. The `--output` path given to dsl-compile is
# relative, so the file is in the directory the notebook runs in.
!head winequality_training_pipeline.py.yaml ## where is this file . ?


apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: covertype-classifier-training-
  annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.0.0, pipelines.kubeflow.org/pipeline_compilation_time: '2020-08-25T23:31:13.021281',
    pipelines.kubeflow.org/pipeline_spec: '{"description": "The pipeline training
      and deploying the Covertype classifierpipeline_yaml", "inputs": [{"name": "project_id"},
      {"name": "region"}, {"name": "source_table_name"}, {"name": "gcs_root"}, {"name":
      "dataset_id"}, {"name": "evaluation_metric_name"}, {"name": "evaluation_metric_threshold"},
      {"name": "model_id"}, {"name": "version_id"}, {"name": "replace_existing_version"},


## Deploy the pipeline package

In [18]:
# Upload the compiled YAML to the KFP endpoint; `-p` sets the pipeline name.
PIPELINE_NAME='wine_quality'

!kfp --endpoint $ENDPOINT pipeline upload \
-p $PIPELINE_NAME \
winequality_training_pipeline.py.yaml

Pipeline 78c0cb55-a88b-4c99-a37f-208c4b324d24 has been submitted

Pipeline Details
------------------
ID           78c0cb55-a88b-4c99-a37f-208c4b324d24
Name         covertype_continuous_training_self
Description
Uploaded at  2020-08-25T23:33:34+00:00
+-----------------------------+--------------------------------------------------+
| Parameter Name              | Default Value                                    |
| project_id                  |                                                  |
+-----------------------------+--------------------------------------------------+
| region                      |                                                  |
+-----------------------------+--------------------------------------------------+
| source_table_name           |                                                  |
+-----------------------------+--------------------------------------------------+
| gcs_root                    |                                                  |
+-

In [20]:
# List the pipelines on the endpoint; the ID column is needed to submit a run.
!kfp --endpoint $ENDPOINT pipeline list

+--------------------------------------+-------------------------------------------------+---------------------------+
| Pipeline ID                          | Name                                            | Uploaded at               |
| 78c0cb55-a88b-4c99-a37f-208c4b324d24 | covertype_continuous_training_self              | 2020-08-25T23:33:34+00:00 |
+--------------------------------------+-------------------------------------------------+---------------------------+
| e0edb3e8-4b9c-4354-80f7-51fa3fb5a87b | covertype_continuous_training                   | 2020-08-21T03:16:41+00:00 |
+--------------------------------------+-------------------------------------------------+---------------------------+
| aa5e2c2a-3e9b-44e5-973d-615d7b249766 | predict_pipeline                                | 2020-08-19T23:46:08+00:00 |
+--------------------------------------+-------------------------------------------------+---------------------------+
| ea88b6bd-9cea-4099-9008-2f81c0a39920 | train_p

In [21]:
# Pipeline ID copied by hand from the upload / list output; it must be
# updated whenever the pipeline is uploaded again.
PIPELINE_ID='78c0cb55-a88b-4c99-a37f-208c4b324d24'

In [27]:
# Experiment and run labels for the submission.
EXPERIMENT_NAME = 'Wine_quality'
RUN_ID = 'Run_001'

# Pipeline parameter values.
# NOTE(review): this is a gs:// CSV URI, but the pipeline substitutes it into
# the FROM clause of its sampling query — confirm a table name is not needed.
SOURCE_TABLE = 'gs://benazirsproject-demo/data/training/winequality-red.csv'
EVALUATION_METRIC = 'accuracy'
EVALUATION_METRIC_THRESHOLD = '0.69'
MODEL_ID = 'wine_regressor'
VERSION_ID = 'v01'
REPLACE_EXISTING_VERSION = 'True'

# Passed to the pipeline as `gcs_root`.
GCS_STAGING_PATH = ARTIFACT_STORE_URI + '/staging'

In [28]:
# Submit a run of the uploaded pipeline: -e is the experiment, -r the run
# name, -p the pipeline ID; the remaining name=value pairs are pipeline
# parameters.
# NOTE(review): the pipeline also declares `dataset_id` without a default and
# it is not passed here — confirm that an empty value is intended.
!kfp --endpoint $ENDPOINT run submit \
-e $EXPERIMENT_NAME \
-r $RUN_ID \
-p $PIPELINE_ID \
project_id=$PROJECT_ID \
gcs_root=$GCS_STAGING_PATH \
region=$REGION \
source_table_name=$SOURCE_TABLE \
evaluation_metric_name=$EVALUATION_METRIC \
evaluation_metric_threshold=$EVALUATION_METRIC_THRESHOLD \
model_id=$MODEL_ID \
version_id=$VERSION_ID \
replace_existing_version=$REPLACE_EXISTING_VERSION

Run 1dc97dc5-3d48-46be-a807-0cfa0b134eee is submitted
+--------------------------------------+---------+----------+---------------------------+
| run id                               | name    | status   | created at                |
| 1dc97dc5-3d48-46be-a807-0cfa0b134eee | Run_002 |          | 2020-08-25T23:53:34+00:00 |
+--------------------------------------+---------+----------+---------------------------+
