In [1]:
from gcp_modules import GCPTrainingConfig, package_and_upload_training_code, \
    GCPTrainingJob, GCPMetricsLogger

In [2]:
import mlflow_config



Enter your project ID: test-mlflow-1click
Enter the name of your MLFlow experiment: test
Tracking uri set to https://mlflow-dot-test-mlflow-1click.ew.r.appspot.com


# GCP Training Modules

This notebook demonstrates how to use the GCP training modules to train an sklearn clustering model and log training metrics to MLFlow. The modeling done here is for the [Instacart Segmentation](https://docs.google.com/presentation/d/1tYHSvDUMDh06qA_Qxk2SWlpUG-976pAfN8cJyEHDHrs/edit?usp=sharing) case study. All modules in this notebook are from `gcp_modules.py`.

## Define Variables

First, we define job-level variables: the GCS path to the training data, the output GCS bucket, whether hyperparameter tuning should be performed, and the GCP project that will be used to run the training.

In [3]:
project_name = %env PROJECT_ID
project_id = 'projects/{}'.format(project_name)
experiment_name = %env EXPERIMENT_NAME
run_name = None

In [4]:
run_tuning = True
training_dir = './training' # local path to the training module to be packaged using setuptools
gcs_output_bucket = '{}-training-data'.format(project_name)
gcs_package_prefix = 'training/sklearn_clustering_test/{}.tar.gz'.format(experiment_name)
gcs_training_data_dir = 'gs://{}-training-data/data/instacart'.format(project_name)

## Training

GCP AI Platform supports training via Python training scripts or custom Docker containers. The current modules support only training by script. GCP supports most major frameworks for script training (scikit-learn, TensorFlow, PyTorch, Keras, XGBoost).

In script training, users must package their script/code as a Python package. The package must then be uploaded to GCS, and its GCS path specified when submitting the training job. The `./training` directory is provided to show what the package directory structure should look like. The training script alone is also available at the bottom of this notebook.
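
As a rough illustration of that structure, the sketch below writes a minimal package skeleton to a temporary directory. The `trainer/task.py` layout follows the `trainer.task` module name that appears in the job configuration in this notebook; the contents of `setup.py` (package name, version, and the `cloudml-hypertune` dependency) are assumptions for illustration, not a copy of the provided `./training` directory.

```python
import tempfile
from pathlib import Path

# Assumed setup.py contents; adjust the name, version and dependencies to your project.
SETUP_PY = '''from setuptools import find_packages, setup

setup(
    name="trainer",
    version="0.1",
    packages=find_packages(),
    install_requires=["cloudml-hypertune"],
)
'''


def create_training_skeleton(root):
    """Create setup.py plus a trainer/ package under root and return the file layout."""
    root = Path(root)
    package_dir = root / "trainer"
    package_dir.mkdir(parents=True, exist_ok=True)
    (root / "setup.py").write_text(SETUP_PY)
    (package_dir / "__init__.py").write_text("")
    (package_dir / "task.py").write_text("# training script goes here\n")
    # Return relative POSIX-style paths so the layout is easy to inspect.
    return sorted(p.relative_to(root).as_posix() for p in root.rglob("*") if p.is_file())


with tempfile.TemporaryDirectory() as tmp:
    layout = create_training_skeleton(tmp)

print(layout)
```

With this layout, the entry point is addressed as the module `trainer.task`, which is the value the job configuration uses for `pythonModule`.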

We will use a function (`package_and_upload_training_code`) to facilitate building the package and uploading it to GCS.

First, put the path to the ml-test service key JSON file you generated during setup in the cell below. The cell sets an environment variable that allows you to authenticate to GCP.
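
A minimal sketch of setting that variable from Python is shown here. `GOOGLE_APPLICATION_CREDENTIALS` is the variable that Google Cloud client libraries read to locate a service account key; the file path is a placeholder, and it is an assumption that the modules in `gcp_modules.py` rely on this standard mechanism.

```python
import os

# Placeholder path: replace with the location of your own service key JSON file.
service_key_path = "/path/to/ml-test-service-key.json"

# Google Cloud client libraries look up this variable to find the key file.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_key_path

print(os.environ["GOOGLE_APPLICATION_CREDENTIALS"])
```

Setting the variable inside the notebook only affects the current kernel, so it needs to be run again after a kernel restart.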

In [5]:

package_and_upload_training_code(training_dir, gcs_output_bucket)

test-mlflow-1click-training-data
training_packages/training_package.tar.gz
File test-mlflow-1click-training-data uploaded to ./training_package.tar.gz/training_packages/training_package.tar.gz.


You can examine your file in the [GCS browser](https://console.cloud.google.com/storage/browser/raybeam-training-ml-setup-test;tab=objects?forceOnBucketsSortingFiltering=false&project=raybeam-training&prefix=&forceOnObjectsSortingFiltering=false).

### Configure and Submit Training Job
Next, we will define a training job configuration and submit the training job to GCP. First, we have to define a dict of hyperparameters to use for training. Each hyperparameter must correspond to a command line argument that the training script accepts.

In [6]:
hyperparameters = {'num_clusters': 4, 'reduce': True, 'num_dimensions': 2}
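
The configuration output in this notebook shows each fixed hyperparameter represented as a `CATEGORICAL` parameter with a single string value. The sketch below reproduces that mapping for illustration only. The function name `to_categorical_params` is hypothetical, and this is not necessarily how `GCPTrainingConfig.set_hyperparameters` is implemented.

```python
def to_categorical_params(hyperparameters):
    """Turn a dict of fixed values into single-value CATEGORICAL parameter specs."""
    return [
        {
            'parameterName': name,
            'type': 'CATEGORICAL',
            # Values are passed to the training script as strings on the command line.
            'categoricalValues': [str(value)],
        }
        for name, value in hyperparameters.items()
    ]


params = to_categorical_params({'num_clusters': 4, 'reduce': True, 'num_dimensions': 2})
for spec in params:
    print(spec)
```

Because every value reaches the script as a string (for example `'True'`), the script's argument parser is responsible for converting it back to the intended type.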

`GCPTrainingConfig` contains a set of default configurations for the job (e.g. instance type, number of instances, number of workers, region). These can all be overridden by passing another value when initializing the module (e.g. `GCPTrainingConfig(..., masterType='complex_model_m')`).

For more information on training job configurations: https://cloud.google.com/ai-platform/training/docs/reference/rest/v1/projects.jobs#TrainingInput
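
The override behaviour described above can be pictured as a dictionary of defaults updated with keyword arguments. The sketch below is an assumption about the general pattern, not the actual `GCPTrainingConfig` code. The default values are copied from the configuration output printed in this notebook, and `build_training_input` is a hypothetical name.

```python
# Defaults copied from the configuration output printed in this notebook.
DEFAULT_TRAINING_INPUT = {
    'scaleTier': 'CUSTOM',
    'masterType': 'complex_model_m',
    'workerType': 'complex_model_m',
    'parameterServerType': 'large_model',
    'workerCount': 1,
    'parameterServerCount': 1,
    'region': 'us-central1',
}


def build_training_input(**overrides):
    """Return a copy of the defaults with any keyword overrides applied."""
    training_input = dict(DEFAULT_TRAINING_INPUT)
    training_input.update(overrides)
    return training_input


custom = build_training_input(masterType='large_model', workerCount=2)
print(custom['masterType'], custom['workerCount'], custom['region'])
```

Copying the defaults before updating keeps one job's overrides from leaking into the next configuration.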

In [7]:
config = GCPTrainingConfig(gcs_output_bucket, gcs_training_data_dir)
config.set_hyperparameters(metric_name='ssd', hyperparams=hyperparameters)

We can examine the resulting JSON configuration:

In [8]:
config.config

{'scaleTier': 'CUSTOM',
 'masterType': 'complex_model_m',
 'workerType': 'complex_model_m',
 'parameterServerType': 'large_model',
 'workerCount': 1,
 'parameterServerCount': 1,
 'packageUris': ['gs://test-mlflow-1click-training-data/training_packages/training_package.tar.gz'],
 'pythonModule': 'trainer.task',
 'args': ['--train', 'gs://test-mlflow-1click-training-data/data/instacart'],
 'region': 'us-central1',
 'jobDir': 'gs://test-mlflow-1click-training-data/artifacts',
 'runtimeVersion': '2.2',
 'pythonVersion': '3.7',
 'scheduling': {'maxWaitTime': '3600s', 'maxRunningTime': '14400s'},
 'hyperparameters': {'goal': 'MINIMIZE',
  'hyperparameterMetricTag': 'ssd',
  'maxTrials': 1,
  'maxParallelTrials': 1,
  'params': [{'parameterName': 'num_clusters',
    'type': 'CATEGORICAL',
    'categoricalValues': ['4']},
   {'parameterName': 'reduce',
    'type': 'CATEGORICAL',
    'categoricalValues': ['True']},
   {'parameterName': 'num_dimensions',
    'type': 'CATEGORICAL',
    'categoric

`GCPTrainingJob` takes the configuration created above and uses it to submit and track a training job. `submit_job()` submits the job and returns the response JSON. `track_job()` monitors the job until a success or failure response is given, at which point it returns the response.
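
The tracking behaviour can be sketched as a polling loop. This is an illustration under assumptions, not the `track_job()` implementation: `poll_until_done` and its `get_job` argument are hypothetical, the 30-second interval is inferred from the timestamps in the output in this notebook, and the terminal states are the AI Platform job states `SUCCEEDED`, `FAILED` and `CANCELLED`.

```python
import time
from datetime import datetime

TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def poll_until_done(get_job, interval_seconds=30, sleep=time.sleep):
    """Call get_job() repeatedly until the job reaches a terminal state."""
    while True:
        job = get_job()
        state = job.get("state")
        print("Time: {}, State: {}".format(datetime.now(), state))
        if state in TERMINAL_STATES:
            return job
        sleep(interval_seconds)


# Simulated responses so the sketch runs without contacting GCP.
responses = iter([{"state": "RUNNING"}, {"state": "RUNNING"}, {"state": "SUCCEEDED"}])
final_job = poll_until_done(lambda: next(responses), sleep=lambda seconds: None)
```

In a real setting, `get_job` would wrap a call that fetches the job resource from the AI Platform API; injecting it as a callable keeps the loop easy to test.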

In [9]:
job = GCPTrainingJob(gcp_project_name=project_name, job_name='training_test', training_config=config)
job.submit_job()

{'jobId': 'training_test_2021_09_13_14_22_46',
 'trainingInput': {'scaleTier': 'CUSTOM',
  'masterType': 'complex_model_m',
  'workerType': 'complex_model_m',
  'parameterServerType': 'large_model',
  'workerCount': '1',
  'parameterServerCount': '1',
  'packageUris': ['gs://test-mlflow-1click-training-data/training_packages/training_package.tar.gz'],
  'pythonModule': 'trainer.task',
  'args': ['--train', 'gs://test-mlflow-1click-training-data/data/instacart'],
  'hyperparameters': {'goal': 'MINIMIZE',
   'params': [{'parameterName': 'num_clusters',
     'type': 'CATEGORICAL',
     'categoricalValues': ['4']},
    {'parameterName': 'reduce',
     'type': 'CATEGORICAL',
     'categoricalValues': ['True']},
    {'parameterName': 'num_dimensions',
     'type': 'CATEGORICAL',
     'categoricalValues': ['2']}],
   'maxTrials': 1,
   'maxParallelTrials': 1,
   'hyperparameterMetricTag': 'ssd'},
  'region': 'us-central1',
  'runtimeVersion': '2.2',
  'jobDir': 'gs://test-mlflow-1click-traini

In [10]:
job.track_job()

Time: 2021-09-13 14:34:47.315461, State: RUNNING
Time: 2021-09-13 14:35:17.453620, State: RUNNING
Time: 2021-09-13 14:35:47.628527, State: RUNNING
Time: 2021-09-13 14:36:17.763402, State: RUNNING
Time: 2021-09-13 14:36:47.946263, State: RUNNING
Time: 2021-09-13 14:37:18.079878, State: RUNNING
Time: 2021-09-13 14:37:48.203279, State: RUNNING
Time: 2021-09-13 14:38:18.339221, State: RUNNING
Time: 2021-09-13 14:38:48.478325, State: RUNNING
Time: 2021-09-13 14:39:18.623938, State: RUNNING
Time: 2021-09-13 14:39:48.759153, State: RUNNING
Time: 2021-09-13 14:40:18.866107, State: RUNNING
Time: 2021-09-13 14:40:48.985790, State: RUNNING
Time: 2021-09-13 14:41:19.109592, State: RUNNING
Time: 2021-09-13 14:41:49.236487, State: RUNNING
Time: 2021-09-13 14:42:19.363802, State: RUNNING
Time: 2021-09-13 14:42:49.487727, State: RUNNING
Time: 2021-09-13 14:43:19.622152, State: RUNNING
Time: 2021-09-13 14:43:49.741313, State: RUNNING
Time: 2021-09-13 14:44:19.862709, State: RUNNING
Time: 2021-09-13 14:

{'jobId': 'training_test_2021_09_13_14_22_46',
 'trainingInput': {'scaleTier': 'CUSTOM',
  'masterType': 'complex_model_m',
  'workerType': 'complex_model_m',
  'parameterServerType': 'large_model',
  'workerCount': '1',
  'parameterServerCount': '1',
  'packageUris': ['gs://test-mlflow-1click-training-data/training_packages/training_package.tar.gz'],
  'pythonModule': 'trainer.task',
  'args': ['--train', 'gs://test-mlflow-1click-training-data/data/instacart'],
  'hyperparameters': {'goal': 'MINIMIZE',
   'params': [{'parameterName': 'num_clusters',
     'type': 'CATEGORICAL',
     'categoricalValues': ['4']},
    {'parameterName': 'reduce',
     'type': 'CATEGORICAL',
     'categoricalValues': ['True']},
    {'parameterName': 'num_dimensions',
     'type': 'CATEGORICAL',
     'categoricalValues': ['2']}],
   'maxTrials': 1,
   'maxParallelTrials': 1,
   'hyperparameterMetricTag': 'ssd'},
  'region': 'us-central1',
  'runtimeVersion': '2.2',
  'jobDir': 'gs://test-mlflow-1click-traini

You can also view your job on the AI Platform [console](https://console.cloud.google.com/ai-platform/jobs?project=raybeam-training).

An important part of any ML development is tracking different experiments, trials, parameters, and metrics. There are many tools available to do this, but we will be using [MLFlow Tracking](https://www.mlflow.org/docs/latest/tracking.html).

`GCPMetricsLogger` logs the training metrics reported by the training script. Each script must report a metric using the `hypertune` package; in the training script below, we use the `report_hyperparameter_tuning_metric` function to do so. These metrics are then logged to MLFlow: `GCPMetricsLogger` initializes an MLFlow client that logs parameters and metrics to a database (a Postgres database in this case).

To log metrics to MLFlow, we first create an instance of `GCPMetricsLogger` and initialize an MLFlow experiment. The `experiment_name` does not have to be unique: there can be many runs under the same experiment.

In [12]:
import os
metrics_logger = GCPMetricsLogger(os.environ["TRACKING_URI"])

In [16]:
!echo $TRACKING_URI

https://mlflow-dot-test-mlflow-1click.ew.r.appspot.com


We then pass the instance of `GCPTrainingJob` we created earlier. The training job must have completed successfully for the metrics to be logged.

In [13]:
metrics_logger.log_training_metrics(job)
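
To give a sense of what gets logged, the sketch below pulls per-trial parameters and the final objective value out of a job response. It assumes the response follows the AI Platform `TrainingOutput` shape (`trials`, each with `trialId`, `hyperparameters` and `finalMetric.objectiveValue`). The function name `extract_trials` is hypothetical, and the example response contains made-up values purely for illustration; it is not output from the job above.

```python
def extract_trials(job_response):
    """Collect trial id, hyperparameters and final metric from a job response."""
    trials = job_response.get('trainingOutput', {}).get('trials', [])
    results = []
    for trial in trials:
        final_metric = trial.get('finalMetric', {})
        results.append({
            'trial_id': trial.get('trialId'),
            'params': dict(trial.get('hyperparameters', {})),
            'metric': final_metric.get('objectiveValue'),
        })
    return results


# Made-up response for illustration only.
example_response = {
    'jobId': 'example_job',
    'state': 'SUCCEEDED',
    'trainingOutput': {
        'trials': [
            {
                'trialId': '1',
                'hyperparameters': {'num_clusters': '4', 'reduce': 'True', 'num_dimensions': '2'},
                'finalMetric': {'trainingStep': '1', 'objectiveValue': 123.4},
            },
        ],
    },
}

trial_results = extract_trials(example_response)
print(trial_results)
```

Each entry could then be recorded as one MLFlow run, for example with `mlflow.log_params` and `mlflow.log_metric` inside `mlflow.start_run()`; whether `GCPMetricsLogger` does exactly this is an assumption.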

MLFlow has a UI that displays details about experiments and the various runs within an experiment. Because GitHub is unable to display images in notebooks, screenshots of the UI have been documented in this [markdown file](mlflow_ui.md).

## Hyperparameter Tuning

The process for hyperparameter tuning is very similar to training. The only difference is that users are expected to provide the GCP hyperparameter tuning configuration explicitly. This configuration must be passed to the `tuning_hyperparams` option of the `GCPTrainingConfig.set_hyperparameters` function, and `run_tuning` must be set to `True`.

For more information on this tuning configuration: https://cloud.google.com/ai-platform/training/docs/reference/rest/v1/projects.jobs#HyperparameterSpec

In [None]:
tuning_hyperparameters = [
    {
        'parameterName':'num_clusters',
        'type':'INTEGER',
        'minValue': 1,
        'maxValue': 20,
        'scaleType': 'UNIT_LINEAR_SCALE'
    },
    {
        'parameterName':'num_dimensions',
        'type':'INTEGER',
        'minValue': 1,
        'maxValue': 3,
        'scaleType': 'UNIT_REVERSE_LOG_SCALE'
    },
    {
        'parameterName':'reduce',
        'type':'CATEGORICAL',
        'categoricalValues': ['True', 'False']
    },
]

hyperparams = {
    'goal': 'MINIMIZE',
    'hyperparameterMetricTag': 'ssd',
    'maxTrials': 5,
    'maxParallelTrials': 5,
    'enableTrialEarlyStopping': True,
    'params': tuning_hyperparameters}
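
Before submitting, it can help to sanity-check a tuning configuration locally. The sketch below is a hypothetical helper, not part of `gcp_modules.py`. It checks only a few rules from the `ParameterSpec` reference: `INTEGER` and `DOUBLE` parameters need `minValue` and `maxValue`, `CATEGORICAL` parameters need `categoricalValues`, and `DISCRETE` parameters need `discreteValues`. The comparison of `maxParallelTrials` with `maxTrials` is an extra sanity check added here.

```python
def validate_tuning_config(config):
    """Return a list of human-readable problems found in a tuning configuration."""
    problems = []
    for spec in config.get('params', []):
        name = spec.get('parameterName', '<unnamed>')
        kind = spec.get('type')
        if kind in ('INTEGER', 'DOUBLE'):
            if 'minValue' not in spec or 'maxValue' not in spec:
                problems.append('{}: minValue and maxValue are required'.format(name))
            elif spec['minValue'] > spec['maxValue']:
                problems.append('{}: minValue exceeds maxValue'.format(name))
        elif kind == 'CATEGORICAL':
            if not spec.get('categoricalValues'):
                problems.append('{}: categoricalValues is required'.format(name))
        elif kind == 'DISCRETE':
            if not spec.get('discreteValues'):
                problems.append('{}: discreteValues is required'.format(name))
        else:
            problems.append('{}: unknown type {!r}'.format(name, kind))
    if config.get('maxParallelTrials', 1) > config.get('maxTrials', 1):
        problems.append('maxParallelTrials exceeds maxTrials')
    return problems


good_config = {
    'goal': 'MINIMIZE',
    'hyperparameterMetricTag': 'ssd',
    'maxTrials': 5,
    'maxParallelTrials': 5,
    'params': [
        {'parameterName': 'num_clusters', 'type': 'INTEGER', 'minValue': 1, 'maxValue': 20},
        {'parameterName': 'reduce', 'type': 'CATEGORICAL', 'categoricalValues': ['True', 'False']},
    ],
}
bad_config = {
    'maxTrials': 2,
    'maxParallelTrials': 5,
    'params': [{'parameterName': 'num_clusters', 'type': 'INTEGER', 'minValue': 1}],
}

good_problems = validate_tuning_config(good_config)
bad_problems = validate_tuning_config(bad_config)
print(good_problems)
print(bad_problems)
```

Note that running all trials in parallel shortens the job but gives the tuning service no earlier results to learn from, which can reduce the effectiveness of the search.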

In [None]:
config = GCPTrainingConfig(gcs_output_bucket, gcs_training_data_dir)
config.set_hyperparameters(metric_name='ssd', run_tuning=True, tuning_hyperparams=hyperparams)

In [None]:
job = GCPTrainingJob(gcp_project_name=project_name, job_name='training_test_hpt_v2', training_config=config)
job.submit_job()

In [None]:
job.track_job()

Finally, metrics are logged using the same process as before.

In [None]:
metrics_logger = GCPMetricsLogger()
metrics_logger.initialize_mlflow("hypertune")
metrics_logger.log_training_metrics(job)

## Training Script
The full training script used to run this model has been provided below. The training script must fulfill certain requirements:

1. The script must accept hyperparameters as command line arguments through `argparse`
1. The script must report training metrics using the `hypertune` library
1. The script must save the trained model in pkl, joblib, or bst format (more info here: https://cloud.google.com/ai-platform/training/docs/getting-started-scikit-xgboost#model_file_naming_requirements)
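
One pitfall with the first requirement is boolean arguments. Hyperparameter values arrive as strings such as `'True'` or `'False'`, and `argparse` with `type=bool` treats any non-empty string as `True`. The sketch below demonstrates the difference using a small helper (called `str_to_bool` here, a name chosen for this sketch).

```python
import argparse


def str_to_bool(value):
    """Convert common textual booleans to bool, rejecting anything else."""
    normalized = value.strip().lower()
    if normalized in ('true', '1', 'yes'):
        return True
    if normalized in ('false', '0', 'no'):
        return False
    raise argparse.ArgumentTypeError('Expected a boolean, got {!r}'.format(value))


# type=bool calls bool('False'), and any non-empty string is truthy.
naive_parser = argparse.ArgumentParser()
naive_parser.add_argument('--reduce', type=bool, default=False)
naive_result = naive_parser.parse_args(['--reduce', 'False']).reduce

# The helper inspects the text itself, so 'False' becomes False.
strict_parser = argparse.ArgumentParser()
strict_parser.add_argument('--reduce', type=str_to_bool, default=False)
strict_result = strict_parser.parse_args(['--reduce', 'False']).reduce

print(naive_result, strict_result)  # True False
```

This matters most during tuning, where a categorical parameter with the values `'True'` and `'False'` would otherwise behave identically in every trial.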

In [None]:
from __future__ import print_function


import os
import sys
import subprocess

import argparse
import joblib
import numpy as np
import pandas as pd
import scipy.stats as stats

from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import FunctionTransformer

import hypertune


def log_normalize_features(data_df, method='skewed_only', skew_threshold=1):
    if not isinstance(data_df, pd.DataFrame):
        data_df = pd.DataFrame(data_df)
    if method == 'none':
        return data_df
    elif method == 'skewed_only':
        skewness = stats.skew(data_df)
        log_data_df = data_df.copy()
        for col, skew in zip(data_df.columns, skewness):
            if abs(skew) >= skew_threshold:
                log_data_df[col] = np.log(data_df[col])
        return log_data_df
    elif method == 'all':
        return pd.DataFrame(np.log(data_df), columns=data_df.columns, index=data_df.index)
    else:
        raise ValueError('Unknown normalize method {}'.format(method))


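def str_to_bool(value):
    # argparse's type=bool treats any non-empty string (including 'False') as True,
    # so boolean flags are parsed from their text instead.
    if isinstance(value, bool):
        return value
    if value.lower() in ('true', 't', 'yes', '1'):
        return True
    if value.lower() in ('false', 'f', 'no', '0'):
        return False
    raise argparse.ArgumentTypeError('Expected a boolean value, got {!r}'.format(value))


if __name__ == '__main__':

    parser = argparse.ArgumentParser()

    # Job parameters
    parser.add_argument('--standardize', type=str_to_bool, default=True)
    parser.add_argument('--normalize', type=str, default='skewed_only')
    parser.add_argument('--skew_threshold', type=float, default=1.0)
    parser.add_argument('--reduce', type=str_to_bool, default=False)
    parser.add_argument('--num_dimensions', type=int, default=50)
    parser.add_argument('--num_clusters', type=int, required=True)


    # GCP AI Platform specific arguments.
    parser.add_argument('--job-dir', type=str)
    parser.add_argument('--model-dir', type=str, default="./")
    parser.add_argument('--train', type=str, help='GCS storage directory path', required=True)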

    args = parser.parse_args()
    local_dir = os.path.basename(args.train)
    if args.train[-1] == '/':
        local_dir = os.path.basename(args.train[:-1])
    subprocess.check_call(['gsutil', 'cp', '-r', args.train, './'], stderr=sys.stdout)
    # Read every file from the local copy of the training data into a single pandas dataframe
    input_files = [os.path.join(local_dir, file) for file in os.listdir(local_dir)]
    if len(input_files) == 0:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the training data path was incorrectly specified\n' +
                          'or the service account does not have permission to access the data.').format(args.train))

    raw_data = [pd.read_csv(file, header=None) for file in input_files]
    concat_data = pd.concat(raw_data)

    scaler = StandardScaler()
    normalizer = FunctionTransformer(log_normalize_features,
                                     kw_args={'method': args.normalize,
                                              'skew_threshold': args.skew_threshold})

    steps = [('normalize', normalizer), ('scale', scaler)]
    if args.reduce:
        num_dimensions = min(args.num_dimensions, concat_data.shape[1])
        reducer = PCA(n_components=num_dimensions)
        steps.append(('reduce', reducer))

    kmeans = KMeans(n_clusters=args.num_clusters)
    steps.append(('cluster', kmeans))

    pipeline = Pipeline(steps)
    pipeline.fit(concat_data)
    ssd = pipeline['cluster'].inertia_
    print('Training:SSD = {};'.format(ssd))
    hpt = hypertune.HyperTune()
    hpt.report_hyperparameter_tuning_metric(
                                            hyperparameter_metric_tag='ssd',
                                            metric_value=ssd)
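    # Save the fitted pipeline locally, then copy that same file to the job directory in GCS
    model_path = os.path.join(args.model_dir, "model.joblib")
    joblib.dump(pipeline, model_path)
    subprocess.check_call(['gsutil', 'cp', model_path, args.job_dir], stderr=sys.stdout)
    print("saved model!")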
