In [None]:
# Your Google Cloud project id -- replace the placeholder with your own.
project_id = "YOUR-PROJECT-ID"

# GCS bucket used by this sample.  It has to exist before you continue; create it with
#   gsutil mb gs://YOUR-BUCKET-NAME
# or from the console at https://console.cloud.google.com/storage
bucket_name = "YOUR-BUCKET-NAME"

# Name of the cluster.  Prefer a new name over an existing cluster so that the
# existing cluster's workload is not affected.
cluster_id = "YOUR-CLUSTER-ID"

# Name for the image that will be running on the container.
image_name = "YOUR-IMAGE-NAME"

# Zone to use; pick a different one if you prefer.
zone = "us-central1-b"

# Folder with the image source.  Leave as-is unless you have customized the source.
source_dir = "source"

In [None]:
# This step builds a Docker image from the content of the source/ folder.
# The image is tagged with the provided image_name so that the workers can pull it.
# The main script, source/worker.py, retrieves a pickled RandomizedSearchCV object
# from GCS and fits it to data that is also stored on GCS.

# Note: This step only needs to be run the first time you follow these steps, and
# again each time you modify the code in source/.  If source/ is unchanged you can
# simply re-use the existing image.

# Note: This could take a couple of minutes.
# To monitor the build process: https://console.cloud.google.com/gcr/builds

from cloudbuild_helper import build

build(project_id, source_dir, bucket_name, image_name)

In [None]:
# This step creates a cluster on GKE with two nodes of machine type n1-standard-2.

# You can alternatively create the cluster with the gcloud command line tool or through
# the console, but you must then add the additional scope that grants write access to GCS:
# 'https://www.googleapis.com/auth/devstorage.read_write'

# Note: This could take several minutes.
# To monitor the cluster creation process: https://console.cloud.google.com/kubernetes/list

from gke_helper import create_cluster

create_cluster(project_id, zone, cluster_id, n_nodes=2, machine_type='n1-standard-2')

In [None]:
# For illustration purposes we will use the MNIST dataset.
#
# fetch_mldata() was removed in scikit-learn 0.22 (and mldata.org is offline), so the
# data is downloaded from OpenML instead.  This requires scikit-learn >= 0.22.
# Notes:
# - as_frame=False keeps mnist.data / mnist.target as NumPy arrays, which the positional
#   slicing used throughout this notebook relies on.
# - OpenML serves the labels as strings ('0'..'9') rather than floats.  The classifier
#   accepts string labels and predicts labels of the same kind.

from sklearn.datasets import fetch_openml
from sklearn.utils import shuffle

mnist = fetch_openml('mnist_784', version=1, data_home='./mnist_data', as_frame=False)

# Only the first 60000 rows are used for training; the remaining rows are kept for testing.
# A fixed random_state makes the subsets below identical on every run.
X, y = shuffle(mnist.data[:60000], mnist.target[:60000], random_state=0)

# Small subset for the local search.
X_small = X[:100]
y_small = y[:100]

# Larger subset for the search that is distributed to the cluster.
X_large = X[:600]
y_large = y[:600]

In [None]:
# For illustration purposes we will use the GradientBoostingClassifier with RandomizedSearchCV.
# http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.GradientBoostingClassifier.html
# http://scikit-learn.org/stable/modules/generated/sklearn.model_selection.RandomizedSearchCV.html

from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import RandomizedSearchCV

from scipy.stats import randint, uniform

In [None]:
# Estimator whose hyperparameters are searched.
gbc = GradientBoostingClassifier()

# Search space.  Lists are sampled uniformly; scipy.stats distributions are sampled
# from directly.
param_distributions = {
    'learning_rate': [0.1, 0.01, 0.5],
    'n_estimators': randint(50, 401),  # integers in [50, 400] (upper bound is exclusive)
    'max_depth': randint(2, 6),  # integers in [2, 5]
    'subsample': uniform(0.7, 0.3)  # uniform(loc, scale): floats in [0.7, 1.0]
}

# Number of hyperparameter candidates to sample.
n_iter = 100

# cv is set explicitly: the scikit-learn default changed from 3-fold to 5-fold in 0.22,
# and the fit counts quoted in this notebook (3 fits per candidate) assume 3-fold.
# NOTE(review): random_state is left unset on purpose; a fixed seed might make every
# worker sample the same candidates -- confirm against gke_parallel before adding one.
search = RandomizedSearchCV(
    estimator=gbc,
    param_distributions=param_distributions,
    n_iter=n_iter,
    cv=3,
    n_jobs=-1,
    verbose=3
)

In [None]:
# Each sampled set of hyperparameters is fitted once per cross-validation fold.
# With 3-fold cross-validation that is 3 fits per candidate,
# so in total the data will be fitted 3*100 = 300 times.
# NOTE: 3-fold is only the default in scikit-learn < 0.22; newer versions default to
# 5 folds unless cv is set explicitly on the search object.

%time search.fit(X_small, y_small)

In [None]:
# Show the best score found by the local search and the hyperparameters that produced it.
print(search.best_score_, search.best_params_)

In [None]:
# Everything up to this point is what you would do when training locally.
# With a larger amount of data it would take much longer.
# For example, with 6000 images each fit could take 5~10 minutes, making the same
# randomized search a 15+ hours task.
# With 16 vCPUs the same task would take less than 4 hours.

# For GCE instance pricing: https://cloud.google.com/compute/pricing


In [None]:
# The GKEParallel class is a wrapper around a RandomizedSearchCV object that manages
# deploying fitting jobs to the Kubernetes cluster created above.

from gke_parallel import GKEParallel

In [None]:
# We pass in the RandomizedSearchCV object, which will be pickled and stored on GCS with
# a URI of the form gs://YOUR-BUCKET-NAME/YOUR-CLUSTER-ID.YOUR-IMAGE-NAME.UNIX-TIME/search.pkl

gke_search = GKEParallel(search, project_id, zone, cluster_id, bucket_name, image_name)

In [None]:
# Included in this sample is a script that retrieves the credentials for the cluster with
# gcloud and refreshes the access token with kubectl.
# This allows us to deploy jobs to the Kubernetes cluster.
# IPython substitutes $cluster_id with the value of the Python variable before running
# the shell command.

! bash get_cluster_credentials.sh $cluster_id

In [None]:
# GKEParallel instances implement an interface similar to (but different from!) that of
# RandomizedSearchCV.

# Calling fit(X, y) first uploads the training data to GCS as:
# gs://YOUR-BUCKET-NAME/YOUR-CLUSTER-ID.YOUR-IMAGE-NAME.UNIX-TIME/X.pkl
# gs://YOUR-BUCKET-NAME/YOUR-CLUSTER-ID.YOUR-IMAGE-NAME.UNIX-TIME/y.pkl

# This allows reusing the same uploaded datasets for other training tasks.
# For instance, if you have a data set stored on GCS as
# gs://DATA-BUCKET/X.pkl and gs://DATA-BUCKET/y.pkl
# then you can initiate fitting with:
# gke_search.fit(X='gs://DATA-BUCKET/X.pkl', y='gs://DATA-BUCKET/y.pkl')

# In the background, the GKEParallel instance splits the n_iter into smaller n_iter values
# (one per worker), and uses the same param_distributions for each of them.

# Calling fit() also pickles gke_search and stores it on GCS:
# gs://YOUR-BUCKET-NAME/YOUR-CLUSTER-ID.YOUR-IMAGE-NAME.UNIX-TIME/gke_search.pkl

gke_search.fit(X_large, y_large)

In [None]:
# Each worker's n_iter and param_distributions are pickled and stored on GCS within that
# worker's workspace:
# gs://YOUR-BUCKET-NAME/YOUR-CLUSTER-ID.YOUR-IMAGE-NAME.UNIX-TIME/WORKER-ID/n_iter.pkl
# gs://YOUR-BUCKET-NAME/YOUR-CLUSTER-ID.YOUR-IMAGE-NAME.UNIX-TIME/WORKER-ID/param_distributions.pkl

# Display the n_iter values held by the GKEParallel instance.
gke_search.n_iter

In [None]:
# You can optionally specify a task_name when creating a GKEParallel instance.
# If you do not specify a task_name, it will be set to
# YOUR-CLUSTER-ID.YOUR-IMAGE-NAME.UNIX-TIME when you call fit().

gke_search.task_name

In [None]:
# The dictionary of Kubernetes job names.  Each worker pod handles one job, and each job
# processes one of the smaller n_iter values.

# To monitor the jobs: https://console.cloud.google.com/kubernetes/workload

gke_search.job_names

In [None]:
# To cancel the task.  This will delete all the deployed Kubernetes worker pods and jobs,
# but will NOT delete the cluster, nor delete any data persisted to GCS.

#gke_search.cancel()

In [None]:
# GKEParallel instances implement an interface similar to (but different from!) that of
# Future instances.
# Calling done() checks whether each worker has completed its job and persisted its outcome
# on GCS with the URI:
# gs://YOUR-BUCKET-NAME/YOUR-CLUSTER-ID.YOUR-IMAGE-NAME.UNIX-TIME/WORKER-ID/fitted_search.pkl

# To monitor the jobs: https://console.cloud.google.com/kubernetes/workload
# To access the persisted data directly:
# https://console.cloud.google.com/storage/browser/YOUR-BUCKET-NAME/

# Display the overall completion flag together with the per-worker statuses.
gke_search.done(), gke_search.dones

In [None]:
# When all the jobs are finished, the pods will stop running (but the cluster will remain),
# and we can retrieve the fitted model.

# Calling result() will populate the gke_search.results attribute which is returned.
# This attribute records the fitted RandomizedSearchCV from each worker job.

# Calling result() also updates the pickled gke_search on GCS as:
# gs://YOUR-BUCKET-NAME/YOUR-CLUSTER-ID.YOUR-IMAGE-NAME.UNIX-TIME/gke_search.pkl

# To recover the fitted gke_search object, you can use the helper function included
# in this sample:

# from gcs_helper import download_uri_and_unpickle
# gcs_uri = 'gs://YOUR-BUCKET-NAME/YOUR-CLUSTER-ID.YOUR-IMAGE-NAME.UNIX-TIME/gke_search.pkl'
# gke_search_recovered = download_uri_and_unpickle(gcs_uri)

import time

# Poll the cluster until every worker reports completion, printing a progress line and
# pausing 30 seconds between checks.
while True:
    if gke_search.done():
        break
    finished = sum(1 for is_done in gke_search.dones.values() if is_done)
    total = len(gke_search.job_names)
    print('{}/{} finished'.format(finished, total))
    time.sleep(30)

# All jobs are finished: collect the fitted searches from the workers.
result = gke_search.result()

In [None]:
# GKEParallel also implements these convenient attributes to access the best score,
# the best hyperparameters, and the best estimator.

gke_search.best_score_, gke_search.best_params_, gke_search.best_estimator_

In [None]:
# You can also call predict(), which delegates the call to the best_estimator_.

# Rows from index 60000 onwards were not used for training and serve as the test set.
X_test = mnist.data[60000:]
y_test = mnist.target[60000:]

predicted = gke_search.predict(X_test)

# The number of correct predictions out of 10000 test cases.
# The test labels are sliced once up front rather than once per prediction.
n_correct = sum(1 for p, t in zip(predicted, y_test) if p == t)
print(n_correct)


In [None]:
# To clean up, delete the cluster.  This will not delete any data persisted on GCS.
# The simplest way to delete the cluster is through the console:
# https://console.cloud.google.com/kubernetes/list