In [6]:
# set variables
# PROJECT_ID = "qwiklabs-gcp-00-35d32fccdc52"

REGION = 'us-central1'
PROJECT_ID = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_ID[0]
BUCKET = 'gs://' + PROJECT_ID
print(BUCKET)

DATA_ROOT='{}/data'.format(BUCKET)
JOB_DIR_ROOT='{}/jobs'.format(BUCKET)
TRAINING_FILE_PATH='{}/{}'.format(DATA_ROOT, 'dataset.csv')
# VALIDATION_FILE_PATH='{}/{}/{}'.format(DATA_ROOT, 'validation', 'dataset.csv')
OUTPUT_DIR = '{}/models'.format(BUCKET)
print(TRAINING_FILE_PATH)
print(OUTPUT_DIR)

gs://qwiklabs-gcp-00-35d32fccdc52
gs://qwiklabs-gcp-00-35d32fccdc52/data/dataset.csv
gs://qwiklabs-gcp-00-35d32fccdc52/models


# TF in a KFP running in AI Platform

In [2]:
%%writefile ./tensorflow_trainer_image/train.py

"""Tensorflow predictor script."""

import pickle
import subprocess
import sys
import fire
import tensorflow as tf
import datetime
import os

import pandas as pd
import numpy as np

def load_dataset(pattern, window_size=30, batch_size=16, shuffle_buffer=100):
    """
    Description:  
    Input: 
      - series:
      - window_size:
      - batch_size: the batches to use when training
      -shuffle_buffer: size buffer, how data will be shuffled

    Output:
    """
    
    # read data
    data = pd.read_csv(pattern)
    time = np.array(data.times)
    series = np.array(data.values)[:,1].astype('float32')
    
    dataset = tf.data.Dataset.from_tensor_slices(series)
    dataset = dataset.window(window_size + 1, shift=1, drop_remainder=True)
    dataset = dataset.flat_map(lambda window: window.batch(window_size + 1))
    dataset = dataset.shuffle(shuffle_buffer).map(lambda window: (window[:-1], window[-1])) # x and y (last one)
    dataset = dataset.batch(batch_size).prefetch(1)
    return dataset, series

def train_evaluate(training_dataset_path, 
                   # validation_dataset_path,
                   window_size,
                   batch_size,
                   epochs,
                   lr,
                   # num_train_examples, num_evals, 
                   output_dir):
    """
    Description: train script
    """
    
    EPOCHS = epochs
    LR = lr
    
    l0 = tf.keras.layers.Dense(2*window_size+1, input_shape=[window_size], activation='relu')
    l2 = tf.keras.layers.Dense(1)
    model = tf.keras.models.Sequential([l0, l2])
    
    lr_schedule = tf.keras.callbacks.LearningRateScheduler(lambda epoch: 1e-3)
    optimizer = tf.keras.optimizers.SGD(lr=LR, momentum=0.9)
    model.compile(loss="mse", optimizer=optimizer, metrics=['mae'])
    
    # load data
    (trainds,series) = load_dataset(pattern=training_dataset_path, window_size=window_size, batch_size=batch_size)
    # evalds = load_dataset(pattern=validation_dataset_path, mode='eval')
    
    history = model.fit(trainds, epochs=EPOCHS, verbose=0)
       
    if hptune:
        # trainds = load_dataset(pattern=training_dataset_path, window_size=window_size, batch_size=batch_size)
        results = np.array(model.predict(trainds))[:, 0]
        mae = tf.keras.metrics.mean_absolute_error(series[window_size:], results).numpy()  
        print('Model accuracy: {}'.format(mae))# Log it with hypertune
        hpt = hypertune.HyperTune()
        hpt.report_hyperparameter_tuning_metric(
            hyperparameter_metric_tag='mean_absolute_error', metric_value=mae)
               
    # Save the model
    if not hptune:
        EXPORT_PATH = os.path.join(output_dir, datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
        tf.saved_model.save(obj=model, export_dir=EXPORT_PATH)  # with default serving function
        
        # model_filename = 'model.pkl'
        # with open(model_filename, 'wb') as model_file:
            # pickle.dump(pipeline, model_file)
            # gcs_model_path = '{}/{}'.format(job_dir, model_filename)
            # subprocess.check_call(['gsutil', 'cp', model_filename, gcs_model_path], stderr=sys.stdout)
        #print('Saved model in: {}'.format(gcs_model_path))
        
        print("Exported trained model to {}".format(EXPORT_PATH))
    
if __name__ == '__main__':
    fire.Fire(train_evaluate)

Overwriting ./tensorflow_trainer_image/train.py


In [17]:
%%writefile ./tensorflow_trainer_image/Dockerfile

FROM gcr.io/deeplearning-platform-release/base-cpu
RUN pip install -U fire tensorflow==2.1.1
WORKDIR /app
COPY train.py .

ENTRYPOINT ["python", "train.py"]

Overwriting ./tensorflow_trainer_image/Dockerfile


In [2]:
TF_IMAGE_NAME='tensorflow_trainer_image'
TF_IMAGE_TAG='latest'
TF_IMAGE_URI='gcr.io/{}/{}:{}'.format(PROJECT_ID, TF_IMAGE_NAME, TF_IMAGE_TAG)

In [3]:
!gcloud builds submit --tag $TF_IMAGE_URI $TF_IMAGE_NAME

Creating temporary tarball archive of 2 file(s) totalling 3.4 KiB before compression.
Uploading tarball of [tensorflow_trainer_image] to [gs://qwiklabs-gcp-00-35d32fccdc52_cloudbuild/source/1631516268.184746-2e1134b6934044be8f8d83576b3bc6e3.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/qwiklabs-gcp-00-35d32fccdc52/locations/global/builds/c1a72656-1516-4c67-bd40-eff179fb02db].
Logs are available at [https://console.cloud.google.com/cloud-build/builds/c1a72656-1516-4c67-bd40-eff179fb02db?project=1050598426634].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "c1a72656-1516-4c67-bd40-eff179fb02db"

FETCHSOURCE
Fetching storage object: gs://qwiklabs-gcp-00-35d32fccdc52_cloudbuild/source/1631516268.184746-2e1134b6934044be8f8d83576b3bc6e3.tgz#1631516269232037
Copying gs://qwiklabs-gcp-00-35d32fccdc52_cloudbuild/source/1631516268.184746-2e1134b6934044be8f8d83576b3bc6e3.tgz#1631516269232037...
/ [1 files][  1.6 KiB/  1.6 KiB]       

In [None]:
# build base image

In [4]:
%%writefile ./base_image/Dockerfile

FROM gcr.io/deeplearning-platform-release/base-cpu
RUN pip install -U fire tensorflow==2.1.1 pandas==0.24.2 kfp==0.2.5

Overwriting ./base_image/Dockerfile


In [5]:
IMAGE_NAME='base_image'
TAG='latest'
BASE_IMAGE='gcr.io/{}/{}:{}'.format(PROJECT_ID, IMAGE_NAME, TAG)

In [6]:
!gcloud builds submit --timeout 15m --tag $BASE_IMAGE base_image

Creating temporary tarball archive of 1 file(s) totalling 120 bytes before compression.
Uploading tarball of [base_image] to [gs://qwiklabs-gcp-00-35d32fccdc52_cloudbuild/source/1631516658.012807-2892ce773dfb4738a23bfda1f37c4703.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/qwiklabs-gcp-00-35d32fccdc52/locations/global/builds/823c074a-c462-4331-8c1d-9a9debbc6097].
Logs are available at [https://console.cloud.google.com/cloud-build/builds/823c074a-c462-4331-8c1d-9a9debbc6097?project=1050598426634].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "823c074a-c462-4331-8c1d-9a9debbc6097"

FETCHSOURCE
Fetching storage object: gs://qwiklabs-gcp-00-35d32fccdc52_cloudbuild/source/1631516658.012807-2892ce773dfb4738a23bfda1f37c4703.tgz#1631516658341445
Copying gs://qwiklabs-gcp-00-35d32fccdc52_cloudbuild/source/1631516658.012807-2892ce773dfb4738a23bfda1f37c4703.tgz#1631516658341445...
/ [1 files][  230.0 B/  230.0 B]                   

In [11]:
# pipeline

In [14]:
import numpy as np
import pandas as pd
import pickle
import uuid
import time
import tempfile

from googleapiclient import discovery
from googleapiclient import errors

from jinja2 import Template
from typing import NamedTuple

In [3]:
!mkdir pipeline

mkdir: cannot create directory ‘pipeline’: File exists


In [3]:
%%writefile ./pipeline/training_pipeline.py

import os
import kfp
from kfp.dsl.types import GCPProjectID
from kfp.dsl.types import GCPRegion
from kfp.dsl.types import GCSPath
from kfp.dsl.types import String
from kfp.gcp import use_gcp_secret
from kfp.components import func_to_container_op
from helper_components import retrieve_best_run
# import kfp.dsl as dsl
import kfp.gcp as gcp
import json


# Defaults and environment settings
BASE_IMAGE = os.getenv('BASE_IMAGE')
TRAINER_IMAGE = os.getenv('TRAINER_IMAGE')
RUNTIME_VERSION = os.getenv('RUNTIME_VERSION')
PYTHON_VERSION = os.getenv('PYTHON_VERSION')
BUCKET = os.getenv('BUCKET')
COMPONENT_URL_SEARCH_PREFIX = os.getenv('COMPONENT_URL_SEARCH_PREFIX')
USE_KFP_SA = os.getenv('USE_KFP_SA')

TRAINING_DATA_PATH = BUCKET + '/data/dataset.csv'

HYPERTUNE_SETTINGS = """
{
    "hyperparameters":  {
        "goal": "MAXIMIZE",
        "maxTrials": 4,
        "maxParallelTrials": 2,
        "hyperparameterMetricTag": "mean_absolute_error)",
        "enableTrialEarlyStopping": True,
        "params": [
            {
                "parameterName": "epochs",
                "type": "DISCRETE",
                "discreteValues": [500, 1000]
            },
            {
                "parameterName": "lr",
                "type": "DOUBLE",
                "minValue": 0.0001,
                "maxValue": 0.001,
                "scaleType": "UNIT_LINEAR_SCALE"
            }
        ]
    }
"""



# Create component factories
component_store = kfp.components.ComponentStore(
    local_search_paths=None, url_search_prefixes=[COMPONENT_URL_SEARCH_PREFIX])

# Load BigQuery and AI Platform Training op
# bigquery_query_op = component_store.load_component('bigquery/query')
mlengine_train_op = component_store.load_component('ml_engine/train')
mlengine_deploy_op = component_store.load_component('ml_engine/deploy')
retrieve_best_run_op = func_to_container_op(retrieve_best_run, base_image=BASE_IMAGE)

# dsl pipeline definition
@kfp.dsl.pipeline(
    name='Spanish Demand forecast Continuous Training',
    description='Pipeline to create training/validation on AI Platform Training Job'
)
def pipeline(project_id,
             gcs_root,
             model_id,
             version_id,
             replace_existing_version,
             region,
             hypertune_settings=HYPERTUNE_SETTINGS):

    # These are the output directories where our models will be saved
    # output_dir = gcs_root + '/models/pipeline'
    output_dir = '{}/{}/{}/{}'.format(gcs_root, 
                                      'models', 'pipeline', 
                                      kfp.dsl.RUN_ID_PLACEHOLDER)
    
    # Tune hyperparameters
    tune_args = [
        '--training_dataset_path', TRAINING_DATA_PATH,
        '--hptune', 'True']

    job_dir = '{}/{}/{}'.format(gcs_root, 'jobdir/hypertune',
                                kfp.dsl.RUN_ID_PLACEHOLDER)

    hypertune = mlengine_train_op(
        project_id=project_id,
        region=region,
        master_image_uri=TRAINER_IMAGE,
        job_dir=job_dir,
        args=tune_args,
        training_input=hypertune_settings)
    
    # Retrieve the best trial
    get_best_trial = retrieve_best_run_op(project_id, 
                                          hypertune.outputs['job_id']).set_display_name('Get best trial')

    # Train the model on a combined training and validation datasets
    job_dir = '{}/{}/{}'.format(gcs_root, 'jobdir',
                                kfp.dsl.RUN_ID_PLACEHOLDER)
    
    train_args = [
        '--training_dataset_path', TRAINING_DATA_PATH,
        '--output_dir', output_dir,
        '--window_size', '30',
        '--batch_size', '16', 
        get_best_trial.outputs['lr'], '--lr',
        get_best_trial.outputs['epochs'], '--epochs',
        '--hptune', 'False'
    ]

    train_model = mlengine_train_op(
        project_id=project_id,
        region=region,
        master_image_uri=TRAINER_IMAGE,
        job_dir=job_dir,
        args=train_args).set_display_name('Tensorflow Model Training')
    
    deploy_model = mlengine_deploy_op(
        model_uri=train_model.outputs['job_dir'],
        project_id=project_id,
        model_id=model_id,
        version_id=version_id,
        runtime_version=RUNTIME_VERSION,
        python_version=PYTHON_VERSION,
        replace_existing_version=replace_existing_version).set_display_name('Deploy Model')

    # Configure the pipeline to run using the service account defined
    # in the user-gcp-sa k8s secret
    if USE_KFP_SA == 'True':
        kfp.dsl.get_pipeline_conf().add_op_transformer(
              use_gcp_secret('user-gcp-sa'))

Overwriting ./pipeline/training_pipeline.py


In [7]:
TAG = 'latest'
TRAINER_IMAGE = 'gcr.io/{}/tensorflow_trainer_image:{}'.format(PROJECT_ID, TAG)
BASE_IMAGE='gcr.io/{}/base_image:{}'.format(PROJECT_ID, TAG)

In [8]:
USE_KFP_SA = False

COMPONENT_URL_SEARCH_PREFIX = 'https://raw.githubusercontent.com/kubeflow/pipelines/0.2.5/components/gcp/'
RUNTIME_VERSION = '1.15'
PYTHON_VERSION = '3.7'

%env TRAINER_IMAGE={TRAINER_IMAGE}
%env BASE_IMAGE={BASE_IMAGE}
%env BUCKET={BUCKET}
%env USE_KFP_SA={USE_KFP_SA}
%env COMPONENT_URL_SEARCH_PREFIX={COMPONENT_URL_SEARCH_PREFIX}
%env RUNTIME_VERSION={RUNTIME_VERSION}
%env PYTHON_VERSION={PYTHON_VERSION}

env: TRAINER_IMAGE=gcr.io/qwiklabs-gcp-00-35d32fccdc52/tensorflow_trainer_image:latest
env: BASE_IMAGE=gcr.io/qwiklabs-gcp-00-35d32fccdc52/base_image:latest
env: BUCKET=gs://qwiklabs-gcp-00-35d32fccdc52
env: USE_KFP_SA=False
env: COMPONENT_URL_SEARCH_PREFIX=https://raw.githubusercontent.com/kubeflow/pipelines/0.2.5/components/gcp/
env: RUNTIME_VERSION=1.15
env: PYTHON_VERSION=3.7


## compile pipeline in a yaml file

In [9]:
!dsl-compile --py pipeline/training_pipeline.py --output pipeline/training_pipeline.yaml

In [10]:
!head pipeline/training_pipeline.yaml

"apiVersion": |-
  argoproj.io/v1alpha1
"kind": |-
  Workflow
"metadata":
  "annotations":
    "pipelines.kubeflow.org/pipeline_spec": |-
      {"description": "Pipeline to create training/validation on AI Platform Training Job", "inputs": [{"name": "project_id"}, {"name": "gcs_root"}, {"name": "model_id"}, {"name": "version_id"}, {"name": "replace_existing_version"}, {"name": "region"}, {"default": "\n{\n    \"hyperparameters\":  {\n        \"goal\": \"MAXIMIZE\",\n        \"maxTrials\": 4,\n        \"maxParallelTrials\": 2,\n        \"hyperparameterMetricTag\": \"mean_absolute_error)\",\n        \"enableTrialEarlyStopping\": True,\n        \"params\": [\n            {\n                \"parameterName\": \"epochs\",\n                \"type\": \"DISCRETE\",\n                \"discreteValues\": [500, 1000]\n            },\n            {\n                \"parameterName\": \"lr\",\n                \"type\": \"DOUBLE\",\n                \"minValue\": 0.0001,\n                \"maxValue\":

In [13]:
ENDPOINT = '4a00ac13a87686cc-dot-us-central1.pipelines.googleusercontent.com'
PIPELINE_NAME = 'demand_predictor_pipeline_model_v2'

In [11]:
import kfp

In [14]:
!kfp --endpoint $ENDPOINT pipeline upload \
-p $PIPELINE_NAME \
./pipeline/training_pipeline.yaml

Pipeline 28034510-a334-4247-a9c3-e622811e9687 has been submitted

Pipeline Details
------------------
ID           28034510-a334-4247-a9c3-e622811e9687
Name         demand_predictor_pipeline_model_v2
Description
Uploaded at  2021-09-13T08:03:12+00:00
+--------------------------+------------------------------------------------------------+
| Parameter Name           | Default Value                                              |
| project_id               |                                                            |
+--------------------------+------------------------------------------------------------+
| gcs_root                 |                                                            |
+--------------------------+------------------------------------------------------------+
| model_id                 |                                                            |
+--------------------------+------------------------------------------------------------+
| version_id               | 

In [15]:

!kfp --endpoint $ENDPOINT pipeline list

+--------------------------------------+------------------------------------------------+---------------------------+
| Pipeline ID                          | Name                                           | Uploaded at               |
| 28034510-a334-4247-a9c3-e622811e9687 | demand_predictor_pipeline_model_v2             | 2021-09-13T08:03:12+00:00 |
+--------------------------------------+------------------------------------------------+---------------------------+
| cf36bc23-dfb6-4995-8558-3eab9e51398e | demand_predictor_pipeline_model                | 2021-09-13T07:50:01+00:00 |
+--------------------------------------+------------------------------------------------+---------------------------+
| 8fb0afdf-ae56-402f-94b0-9ae2b2ad616a | [Tutorial] V2 lightweight Python components    | 2021-09-13T07:06:07+00:00 |
+--------------------------------------+------------------------------------------------+---------------------------+
| 97a3acce-1e34-4438-96db-f98f06c3b700 | [Tutorial] DSL 

In [None]:
PIPELINE_ID='0918568d-758c-46cf-9752-e04a4403cd84' # TO DO: REPLACE WITH YOUR PIPELINE ID
EXPERIMENT_NAME = 'Covertype_Classifier_Training'
RUN_ID = 'Run_001'
SOURCE_TABLE = 'covertype_dataset.covertype'
DATASET_ID = 'splits'
MODEL_ID = 'covertype_classifier'
VERSION_ID = 'v01'
REPLACE_EXISTING_VERSION = 'True'

GCS_STAGING_PATH = '{}/staging'.format(ARTIFACT_STORE_URI)

In [None]:
!kfp --endpoint $ENDPOINT run submit \
-e $EXPERIMENT_NAME \
-r $RUN_ID \
-p $PIPELINE_ID \
project_id=$PROJECT_ID \
gcs_root=$GCS_STAGING_PATH \
region=$REGION \
source_table_name=$SOURCE_TABLE \
dataset_id=$DATASET_ID \
evaluation_metric_name=$EVALUATION_METRIC \
evaluation_metric_threshold=$EVALUATION_METRIC_THRESHOLD \
model_id=$MODEL_ID \
version_id=$VERSION_ID \
replace_existing_version=$REPLACE_EXISTING_VERSION