# Using the dynamic AI Platform Vizier API

Run this inside a virtual environment to isolate the different library versions (in particular, TensorFlow).

__[This](https://janakiev.com/blog/jupyter-virtual-envs/)__ explains how.

In [99]:
import datetime
import json
import logging
import os
import random
import string
import subprocess
import time

import yaml

from google.cloud import storage

from googleapiclient import discovery
from googleapiclient import errors
# Wall-clock reference taken when this cell runs; jobs_completed() subtracts it
# from the current time to print how many seconds have elapsed.
start = time.time()

In [100]:
! pip install -U google-cloud
! pip install -U google-cloud-storage
! pip install -U requests  



This Notebook is built to run in Google Cloud Notebooks, where you are already authenticated.

In [101]:
# Capture the output of the gcloud command (IPython returns it as a list of
# lines) and take the first line as the project ID.
shell_output = !gcloud config get-value project 
PROJECT_ID = shell_output[0]

A new study ID is generated on each run to avoid collisions.

In [102]:
# Owner tag used as the study-ID prefix, and the region the study lives in.
USER = 'user1'
REGION = 'us-central1'

# Study ID = owner tag (hyphens stripped) + "_study_" + second-resolution
# timestamp, so every run of this cell yields a new ID.
STUDY_ID = '{}_study_{}'.format(
    USER.replace('-', ''),
    datetime.datetime.now().strftime('%Y%m%d_%H%M%S'))

In [103]:
def study_name(study_id):
  """Return the fully qualified resource name of a study.

  The name is built from the notebook-level PROJECT_ID and REGION plus the
  given study ID.
  """
  location = 'projects/{}/locations/{}'.format(PROJECT_ID, REGION)
  return '{}/studies/{}'.format(location, study_id)

def trial_name(study_id, trial_id):
  """Return the fully qualified resource name of a trial within a study.

  The name is built from the notebook-level PROJECT_ID and REGION plus the
  given study and trial IDs.
  """
  segments = ('projects', PROJECT_ID,
              'locations', REGION,
              'studies', study_id,
              'trials', trial_id)
  return '/'.join('{}'.format(segment) for segment in segments)

The REST API is defined in this JSON. You can see the content with 


`gsutil cp gs://caip-optimizer-public/api/ml_public_google_rest_v1.json . && less ml_public_google_rest_v1.json`

Since it is dynamically defined, there are no API-specific Python classes.

In [104]:
 
def read_api_document():
  """Download the REST API description from the public GCS bucket.

  Returns:
    The content of `api/ml_public_google_rest_v1.json` in the
    `caip-optimizer-public` bucket, as returned by `download_as_string()`.
  """
  gcs = storage.Client(PROJECT_ID)
  api_blob = gcs.get_bucket('caip-optimizer-public').get_blob(
      'api/ml_public_google_rest_v1.json')
  return api_blob.download_as_string()

# Download the API description once and keep it for building the client.
api_doc = read_api_document()

#[0]
# Build the API client object from the downloaded description; every Vizier
# call below goes through `ml`.
ml = discovery.build_from_document(service=api_doc)

In [105]:
# Training job ID template; evaluate_trials() fills it with
# (STUDY_ID, model_type, trial_id).
_TRAINING_JOB_NAME_PATTERN = '{}_condition_parameters_{}_{}'
# Container image passed as --master-image-uri for each model type.
_IMAGE_URIS = {'LINEAR' : 'gcr.io/cloud-ml-algos/linear_learner_cpu:latest', 
               'WIDE_AND_DEEP' : 'gcr.io/cloud-ml-algos/wide_deep_learner_cpu:latest'}
# Keys used in the metric dicts exchanged between retrieve_accuracy() and
# create_measurement(); _ACCURACY is also the metric name declared in the study.
_STEP_COUNT = 'step_count'
_ACCURACY = 'accuracy'


Define hyperparameters. Some are continuous (`double`), some are integers, and some are categorical (in this case, the choice of model).

Numerical hyperparams are defined as logarithmic where the values span orders of magnitude.

In [106]:
#[1]
# Child parameter of `model_type`, declared for both LINEAR and WIDE_AND_DEEP.
# Searched on a log scale over 1e-5 .. 100.
param_learning_rate = {
    'parameter': 'learning_rate',
    'type' : 'DOUBLE',
    'double_value_spec' : {
        'min_value' : 0.00001,
        'max_value' : 100.0
    },
    'scale_type' : 'UNIT_LOG_SCALE',
    'parent_categorical_values' : {
        'values': ['LINEAR', 'WIDE_AND_DEEP']
    },
}

# Child parameter of `model_type`, declared only for WIDE_AND_DEEP.
param_dnn_learning_rate = {
    'parameter': 'dnn_learning_rate',
    'type' : 'DOUBLE',
    'double_value_spec' : {
        'min_value' : 0.0005,
        'max_value' :0.05
    },
    'scale_type' : 'UNIT_LOG_SCALE',
    'parent_categorical_values' : {
        'values': ['WIDE_AND_DEEP']
    },
}

# Integer child parameter of `model_type`, declared only for WIDE_AND_DEEP.
# NOTE(review): evaluate_trials() reads only model_type, learning_rate and
# dnn_learning_rate from each suggestion, so this value does not appear to
# reach the training job -- confirm whether it should be forwarded.
param_max_steps={
    'parameter': 'max_steps',
    'type' : 'INTEGER',
    'integer_value_spec' : {
        'min_value' : 1,
        'max_value' :1000
    },
    'scale_type' : 'UNIT_LOG_SCALE',
    'parent_categorical_values' : {
        'values': ['WIDE_AND_DEEP']
    },
}


# Top-level categorical parameter; the three specs above are attached as its
# children and carry their own `parent_categorical_values`.
param_model_type = {
    'parameter': 'model_type',
    'type' : 'CATEGORICAL',
    'categorical_value_spec' : {'values': ['LINEAR', 'WIDE_AND_DEEP']},
    'child_parameter_specs' : [param_learning_rate, param_dnn_learning_rate,param_max_steps]
}

# The single objective of the study: maximize the metric named by _ACCURACY.
metric_spec = {
    'metric' : _ACCURACY,
    'goal' : 'MAXIMIZE'
}

# Only the root parameter is listed here; the others are reachable through
# its `child_parameter_specs`.
study_config = {
    'algorithm' : 'ALGORITHM_UNSPECIFIED',  
    'parameters' : [param_model_type,], 
    'metrics' : [metric_spec],
}

# Request body passed to studies().create().
study = {'study_config': study_config}

In [107]:
#[2]
# Build the create-study request. The resource chain must go through
# `studies()`; the study is created under the project/region with STUDY_ID.
req = ml.projects().locations().studies().create(
    parent=f'projects/{PROJECT_ID}/locations/{REGION}', studyId=STUDY_ID, body=study)

try:
  result = req.execute()
except errors.HttpError as e:
  # HTTP 409 means a study with this ID is already there, which is fine when
  # the cell is re-run; anything else is a real failure.
  if e.resp.status == 409:
    print('Study already exists')
  else:
    # Bare `raise` re-raises the active exception with its original traceback.
    raise

In [108]:
# Bucket name = "vizier-" plus 8 random lowercase letters/digits, so each run
# of this cell creates a differently named bucket.
WORKING_BUCKET =f"vizier-{''.join(random.choices(string.ascii_lowercase + string.digits, k=8))}"
# Folder inside the bucket under which job_dir() places each job's output.
OUTPUT_DIR = 'output' 

# Create the bucket (IPython expands $WORKING_BUCKET from the Python variable).
!gsutil mb gs://$WORKING_BUCKET

Creating gs://vizier-wgxqwgbz/...


Use the data in Joshua's bucket.

In [109]:
# Training CSV passed to the training jobs via --training_data_path.
# The two commented-out lines are alternative datasets kept for reference.
#TRAINING_DATA_PATH = f'gs://joshuafraud/fraud-noheader-withy.csv'  
#TRAINING_DATA_PATH =  'gs://caip-optimizer-public/sample-data/raw_census_train.csv'
TRAINING_DATA_PATH= 'gs://vizierjoshua/preprocessed-telcochurn.csv'

For each of a set of suggested trials (received from Vizier), this runs a training job. It runs these jobs in parallel.

In [110]:
# [4]
def evaluate_trials(trials):
  """Run one training job per suggested trial and collect the measurements.

  For every trial, the suggested `model_type`, `learning_rate` and
  `dnn_learning_rate` are read, a job ID is derived from the study ID, model
  type and trial ID, and the job is submitted. The function then polls every
  60 seconds until `jobs_completed()` reports that all jobs are done, and
  finally turns each job's metrics into a measurement.

  Args:
    trials: iterable of trial dicts; each needs a 'name' whose last path
      segment is the integer trial ID, and a 'parameters' list.

  Returns:
    Dict mapping trial ID to whatever `create_measurement()` returned for it
    (a measurement dict, or None).
  """
  submitted = {}

  for suggestion in trials:
    # The trial ID is the last path segment of the trial's resource name.
    trial_number = int(suggestion['name'].rsplit('/', 1)[-1])
    chosen_model = get_suggested_parameter_value(suggestion, 'model_type', 'stringValue')
    lr = get_suggested_parameter_value(suggestion, 'learning_rate', 'floatValue')
    dnn_lr = get_suggested_parameter_value(suggestion, 'dnn_learning_rate', 'floatValue')

    job_id = _TRAINING_JOB_NAME_PATTERN.format(STUDY_ID, chosen_model, trial_number)
    submitted[job_id] = dict(
        trial_id=trial_number,
        model_type=chosen_model,
        learning_rate=lr,
        dnn_learning_rate=dnn_lr,
    )

    print('submitting', job_id, trial_number, chosen_model, lr, dnn_lr)
    submit_training_job(job_id, trial_number, chosen_model, lr, dnn_lr)

  # Block until every submitted job has reached a terminal state.
  while True:
    if jobs_completed(submitted.keys()):
      break
    time.sleep(60)

  print('Completed')

  # Convert each job's training result into a measurement keyed by trial ID.
  results = {}
  for job_id, metric in get_job_metrics(submitted.keys()).items():
    info = submitted[job_id]
    results[info['trial_id']] = create_measurement(
        info['trial_id'],
        info['model_type'],
        info['learning_rate'],
        info['dnn_learning_rate'],
        metric)
  return results

In [111]:
def create_measurement(trial_id, model_type, learning_rate, dnn_learning_rate, metric):
  """Build the measurement dict for a finished trial, or None if it has no metrics.

  Args:
    trial_id: ID of the trial (used only for the progress message).
    model_type: model type of the trial (used only for the progress message).
    learning_rate: learning rate of the trial (progress message only).
    dnn_learning_rate: DNN learning rate, or a falsy value when not
      applicable; printed as 'N/A' in that case.
    metric: dict holding the _ACCURACY and _STEP_COUNT entries, or None when
      the job produced no readable result (retrieve_accuracy() returns None
      in that case).

  Returns:
    A dict with the step count and the accuracy metric, or None when `metric`
    is missing or carries no accuracy. The caller marks such trials as
    infeasible.
  """
  # `metric` itself may be None for failed jobs, so guard before indexing.
  # Compare against None rather than truthiness so that a legitimate accuracy
  # of 0 is still reported as a measurement.
  if not metric or metric.get(_ACCURACY) is None:
    return None
  dnn_lr = (dnn_learning_rate if dnn_learning_rate else 'N/A')
  print("Trial", trial_id,"model_type", model_type, "learning_rate", learning_rate, 
      "dnn_learning_rate", dnn_lr, _ACCURACY, metric[_ACCURACY])
  measurement = {
      _STEP_COUNT: metric[_STEP_COUNT], 
      'metrics': [{'metric': _ACCURACY, 'value': metric[_ACCURACY]},]}
  return measurement

#[D]
def submit_training_job(job_id, trial_id, model_type, learning_rate, dnn_learning_rate=None):
  """Submit the training job for one trial through the gcloud CLI.

  Args:
    job_id: ID to give the training job.
    trial_id: ID of the trial (used in messages only).
    model_type: 'LINEAR' or 'WIDE_AND_DEEP'; selects the command to run.
    learning_rate: learning rate passed to the job.
    dnn_learning_rate: DNN learning rate, used for 'WIDE_AND_DEEP' only.

  Returns:
    None. Failures are logged rather than raised, so the caller keeps going.
  """
  if model_type == 'LINEAR':
    command = linear_command(job_id, learning_rate)
  elif model_type == 'WIDE_AND_DEEP':
    command = wide_and_deep_command(job_id, learning_rate, dnn_learning_rate)
  else:
    # Nothing was run, so do not claim that a job was submitted.
    logging.error('Trial %s: unsupported model_type %r; no job submitted.',
                  trial_id, model_type)
    return

  try:
    subprocess.check_output(command, stderr=subprocess.STDOUT)
  except subprocess.CalledProcessError as e:
    # `e.output` is bytes here; decode it so the log shows readable text.
    details = e.output
    if isinstance(details, bytes):
      details = details.decode('utf-8', errors='replace')
    logging.error('Trial %s: submitting job %s failed:\n%s', trial_id, job_id, details)
    return

  print(f'Trial {trial_id}: Submitted job [https://console.cloud.google.com/ai-platform/jobs/{job_id}?project={PROJECT_ID}].')


def get_training_job_state(job_id):
  """Return the state of a training job as reported by `gcloud ... describe`.

  Args:
    job_id: ID of the training job to look up.

  Returns:
    The value of the 'state' field of the JSON description.

  Raises:
    subprocess.CalledProcessError: if gcloud exits with a non-zero status
      (the captured stderr is logged first).
    subprocess.TimeoutExpired: if gcloud does not answer within 3 seconds.
  """
  cmd = ['gcloud', 'ai-platform', 'jobs', 'describe', job_id,
         '--project', PROJECT_ID,
         '--format', 'json']
  try:
    # Capture stderr separately so that any diagnostics gcloud writes there
    # cannot end up inside the text handed to json.loads().
    output = subprocess.check_output(cmd, stderr=subprocess.PIPE, timeout=3)
  except subprocess.CalledProcessError as e:
    details = e.stderr or e.output or b''
    if isinstance(details, bytes):
      details = details.decode('utf-8', errors='replace')
    logging.error('Describing job %s failed:\n%s', job_id, details)
    # There is no output to parse; surface the real failure to the caller.
    raise
  return json.loads(output)['state']


def jobs_completed(jobs):
  """Report whether every job in `jobs` has reached a terminal state.

  Every job is checked (no early exit), and a progress line with the elapsed
  time since `start` is printed for each job that is still running.

  Args:
    jobs: iterable of training job IDs.

  Returns:
    True if all jobs are SUCCEEDED, FAILED or CANCELLED (or `jobs` is empty),
    False otherwise.
  """
  terminal_states = ('SUCCEEDED', 'FAILED', 'CANCELLED')
  still_running = 0
  for job_id in jobs:
    if get_training_job_state(job_id) in terminal_states:
      continue
    print(round(time.time()-start,1), f"sec elapsed; Waiting for job https://console.cloud.google.com/ai-platform/jobs/{job_id}?project={PROJECT_ID}")
    still_running += 1
  return still_running == 0


def retrieve_accuracy(job_id):
  """Read accuracy and step count from a finished job's output in GCS.

  The values come from the 'labels' section of
  `<OUTPUT_DIR>/<job_id>/model/deployment_config.yaml` in WORKING_BUCKET.

  Args:
    job_id: ID of the training job whose output should be read.

  Returns:
    A dict with the _STEP_COUNT and _ACCURACY entries, or None when the file
    cannot be loaded or parsed (for example because the job failed); the
    trial is then reported as infeasible.
  """
  storage_client = storage.Client(project=PROJECT_ID)
  bucket = storage_client.get_bucket(WORKING_BUCKET)
  # GCS object names always use '/', independent of the local OS separator.
  blob_name = f'{OUTPUT_DIR}/{job_id}/model/deployment_config.yaml'
  blob = storage.Blob(blob_name, bucket)
  try:
    blob.reload()
    content = blob.download_as_string()
    loaded_content = yaml.safe_load(content)
    accuracy = float(loaded_content['labels'][_ACCURACY])
    print('Accuracy is', accuracy)

    step_count = int(loaded_content['labels']['global_step'])
    return {_STEP_COUNT: step_count, _ACCURACY: accuracy}
  except Exception as e:
    # Catch Exception rather than everything, so KeyboardInterrupt and
    # SystemExit still stop the notebook; record why the result is missing.
    logging.warning('No metrics for job %s (%s: %s)', job_id, type(e).__name__, e)
    return None


def get_job_metrics(jobs):
     """Return a dict mapping each job ID to the result of retrieve_accuracy()."""
     metrics_by_job = {}
     for job_id in jobs:
          metrics_by_job[job_id] = retrieve_accuracy(job_id)
     return metrics_by_job


def get_suggested_parameter_value(trial, parameter, value_type):
  """Look up the suggested value of one parameter in a trial.

  Args:
    trial: trial dict with a 'parameters' list of parameter dicts.
    parameter: name to match against each entry's 'parameter' field.
    value_type: key under which the value is stored in the matching entry
      (e.g. 'stringValue' or 'floatValue').

  Returns:
    The value of the first matching entry, or None if no entry matches.
  """
  matches = list(filter(lambda entry: entry['parameter'] == parameter,
                        trial['parameters']))
  return matches[0][value_type] if matches else None


def job_dir(job_id):
  """Return the gs:// URI under which a training job writes its output.

  The URI is built with '/' explicitly rather than os.path.join, because a
  GCS URI must not depend on the local operating system's path separator.
  """
  return f'gs://{WORKING_BUCKET}/{OUTPUT_DIR}/{job_id}'

The ML training is done in the AI Platform service, not locally in the Notebook.

In [None]:
#[6]
def linear_command(job_id, learning_rate, max_steps=1000):
  """Build the gcloud command line that submits a LINEAR training job.

  Args:
    job_id: ID to give the training job.
    learning_rate: value passed as --learning_rate.
    max_steps: value passed as --max_steps; defaults to 1000.

  Returns:
    The command as a list of arguments suitable for subprocess. Arguments
    after the bare '--' are passed to the training container.
  """
  return ['gcloud', 'ai-platform', 'jobs', 'submit', 'training', job_id,
          '--scale-tier', 'BASIC',
          # Use the notebook-wide REGION instead of a second hard-coded copy.
          '--region', REGION,
          '--master-image-uri', _IMAGE_URIS['LINEAR'],
          '--project', PROJECT_ID,
          '--job-dir', job_dir(job_id),
          '--',
          '--preprocess',
          '--model_type=classification',
          '--batch_size=250',
          f'--max_steps={max_steps}',
          f'--learning_rate={learning_rate}',
          f'--training_data_path={TRAINING_DATA_PATH}']
 
 
def wide_and_deep_command(job_id, learning_rate, dnn_learning_rate, max_steps=1000):
  """Build the gcloud command line that submits a WIDE_AND_DEEP training job.

  Args:
    job_id: ID to give the training job.
    learning_rate: value passed as --learning_rate.
    dnn_learning_rate: value passed as --dnn_learning_rate.
    max_steps: value passed as --max_steps; defaults to 1000.

  Returns:
    The command as a list of arguments suitable for subprocess. Arguments
    after the bare '--' are passed to the training container.
  """
  return ['gcloud', 'ai-platform', 'jobs', 'submit', 'training', job_id,
          '--scale-tier', 'BASIC',
          # Use the notebook-wide REGION instead of a second hard-coded copy.
          '--region', REGION,
          '--master-image-uri', _IMAGE_URIS['WIDE_AND_DEEP'],
          '--project', PROJECT_ID,
          '--job-dir', job_dir(job_id),
          '--',
          '--preprocess',
          '--test_split=0',
          '--use_wide',
          '--model_type=classification',
          '--batch_size=250',
          f'--learning_rate={learning_rate}',
          f'--dnn_learning_rate={dnn_learning_rate}',
          f'--max_steps={max_steps}',
          f'--training_data_path={TRAINING_DATA_PATH}']

Beware! Like all ML training, this can get expensive, so limit the number of suggestions from Vizier per request, and the total number of trials, as set in the variables below.

In [113]:
# Settings for the suggestion loop: the client identifier sent to Vizier, how
# many suggestions to request per call, and the trial ID at which to stop.
client_id = 'client1'
suggestions_per_request = 1
trials_to_do = 3
print(f'client_id: {client_id} ; suggestions_per_request {suggestions_per_request} ; trials_to_do: {trials_to_do}')

client_id: client1 ; suggestions_per_request 1 ; trials_to_do: 3


The core loop: on each iteration, it asks Vizier for suggested trials, then runs them.

At the end of each iteration, we report each trial to Vizier as complete (or as infeasible if it produced no measurement).


In [114]:
#[3]
# Drive the study: request suggestions, train them, report the results.
# The loop stops once the ID of the last completed trial reaches
# `trials_to_do`.
current_trial_id = 0
# Resource handle for the trials API, reused for every trials call below.
trials_api = ml.projects().locations().studies().trials()

while current_trial_id < trials_to_do:
  # Request trials
  resp = trials_api.suggest(
    parent=study_name(STUDY_ID),
    body={'client_id': client_id, 'suggestion_count': suggestions_per_request}).execute()

  op_id = resp['name'].split('/')[-1]

  # Polls the suggestion long-running operation until it reports `done`.
  get_op = ml.projects().locations().operations().get(
    name=f'projects/{PROJECT_ID}/locations/{REGION}/operations/{op_id}')
  while True:
    operation = get_op.execute()
    if operation.get('done'):
      break
    time.sleep(1)

  # A finished operation without a response cannot be used; fail with the
  # reported error instead of a KeyError on the missing 'response'.
  if 'error' in operation:
    raise RuntimeError(f"Suggestion operation {op_id} failed: {operation['error']}")

  # Fetches the suggested trials. The finished `operation` already holds the
  # response, so it is not requested again.
  trials = []
  for suggested_trial in operation['response']['trials']:
    trial_id = int(suggested_trial['name'].split('/')[-1])
    trial = trials_api.get(name=trial_name(STUDY_ID, trial_id)).execute()
    # Skip trials that were already reported in an earlier iteration.
    if trial['state'] not in ['COMPLETED', 'INFEASIBLE']:
      print("Trial {}: {}".format(trial_id, trial))
      trials.append(trial)

  measurement_by_trial_id = evaluate_trials(trials)

  # Completes trials.
  for trial in trials:
    trial_id = int(trial['name'].split('/')[-1])
    current_trial_id = trial_id
    measurement = measurement_by_trial_id[trial_id]

    if measurement:
      completion_body = {'final_measurement': measurement}
    else:
      # No measurement was produced, so report the trial as infeasible.
      completion_body = {'trial_infeasible': True}
    trials_api.complete(
      name=trial_name(STUDY_ID, trial_id),
      body=completion_body).execute()

Trial 1: {'name': 'projects/401966870909/locations/us-central1/studies/user1_study_20211227_160302/trials/1', 'state': 'ACTIVE', 'parameters': [{'parameter': 'model_type', 'stringValue': 'LINEAR'}, {'parameter': 'learning_rate', 'floatValue': 0.0007762982111444026}], 'startTime': '2021-12-27T16:03:05Z', 'clientId': 'client1'}
submitting user1_study_20211227_160302_condition_parameters_LINEAR_1 1 LINEAR 0.0007762982111444026 None
Trial 1: Submitted job [https://console.cloud.google.com/ai-platform/jobs/user1_study_20211227_160302_condition_parameters_LINEAR_1?project=joshua-playground].
14.7 sec elapsed; Waiting for job https://console.cloud.google.com/ai-platform/jobs/user1_study_20211227_160302_condition_parameters_LINEAR_1?project=joshua-playground
76.0 sec elapsed; Waiting for job https://console.cloud.google.com/ai-platform/jobs/user1_study_20211227_160302_condition_parameters_LINEAR_1?project=joshua-playground
137.2 sec elapsed; Waiting for job https://console.cloud.google.com/ai-

You do not have to use Vizier's suggestions: You can create trials using your own parameters. 

In [115]:
#[5]
# A hand-written trial in the same shape as the trials listed by the API:
# already COMPLETED, with its own parameters and final measurement.
custom_trial = {
  "clientId": "client1",
  # The result claimed for this trial (metric value and step count).
  "finalMeasurement": {
    "metrics": [
      {
        "metric": _ACCURACY,
        "value": 86
      }
    ],
    "stepCount": "1000"
  },
  # Parameter values chosen by hand instead of being suggested by Vizier.
  "parameters": [
    {
      "parameter": "model_type",
      "stringValue": "LINEAR"
    },
    {
      "floatValue": 0.3869103706121445,
      "parameter": "learning_rate"
    }
  ],
  "state": "COMPLETED"
}

# To avoid getting a fake result, we won't actually add it here.
# trial = ml.projects().locations().studies().trials().create(  parent=study_name(STUDY_ID), body=custom_trial).execute()

In [116]:
# List the trials recorded for this study; the bare `resp` on the last line
# makes the notebook display the response.
resp = ml.projects().locations().studies().trials().list(parent=study_name(STUDY_ID)).execute()
resp

{'trials': [{'name': 'projects/401966870909/locations/us-central1/studies/user1_study_20211227_160302/trials/1',
   'state': 'COMPLETED',
   'parameters': [{'parameter': 'model_type', 'stringValue': 'LINEAR'},
    {'parameter': 'learning_rate', 'floatValue': 0.0007762982111444026}],
   'finalMeasurement': {'stepCount': '1000',
    'metrics': [{'metric': 'accuracy', 'value': 79}]},
   'startTime': '2021-12-27T16:03:05Z',
   'endTime': '2021-12-27T16:10:18Z',
   'clientId': 'client1'},
  {'name': 'projects/401966870909/locations/us-central1/studies/user1_study_20211227_160302/trials/2',
   'state': 'COMPLETED',
   'parameters': [{'parameter': 'model_type', 'stringValue': 'WIDE_AND_DEEP'},
    {'parameter': 'dnn_learning_rate', 'floatValue': 0.005000000000000001},
    {'parameter': 'learning_rate', 'floatValue': 0.031622776601683805},
    {'parameter': 'max_steps', 'intValue': '32'}],
   'finalMeasurement': {'stepCount': '1000',
    'metrics': [{'metric': 'accuracy', 'value': 77}]},
   's

Choose the best trial. This can also be retrieved from Vizier -- see the other Notebook.

In [120]:
#[7]
def accuracy_from_trial(t):
    """Return the value of the first metric in the trial's final measurement.

    Raises KeyError if the trial has no 'finalMeasurement'.
    """
    first_metric = t['finalMeasurement']['metrics'][0]
    return first_metric['value']

# Only trials that carry a final measurement can be ranked; trials completed
# as infeasible have none and would raise KeyError in accuracy_from_trial().
measured_trials = [t for t in resp.get('trials', []) if 'finalMeasurement' in t]

print("Final measurements", [accuracy_from_trial(t) for t in measured_trials])

if measured_trials:
    best_trial = max(measured_trials, key=accuracy_from_trial)
    print("Best", accuracy_from_trial(best_trial))
else:
    # max() raises ValueError on an empty sequence, so handle this explicitly.
    best_trial = None
    print("No trial has a final measurement")

Final measurements [79, 77, 75]
Best 79
