In [116]:
#Install the library
! pip3 install --upgrade --quiet google-cloud-aiplatform

In [117]:
!pip show google-cloud-aiplatform

Name: google-cloud-aiplatform
Version: 1.46.0
Summary: Vertex AI API client library
Home-page: https://github.com/googleapis/python-aiplatform
Author: Google LLC
Author-email: googleapis-packages@google.com
License: Apache 2.0
Location: /opt/conda/lib/python3.10/site-packages
Requires: docstring-parser, google-api-core, google-auth, google-cloud-bigquery, google-cloud-resource-manager, google-cloud-storage, packaging, proto-plus, protobuf, pydantic, shapely
Required-by: 


In [118]:
PROJECT_ID = "vddl-419615"  # @param {type:"string"}

In [119]:
# Set the project id (--quiet - resolves the prompt automatically)
! gcloud config set project {PROJECT_ID} --quiet

Updated property [core/project].


In [138]:
#Check the vallue of the project
! gcloud config get-value project

vddl-419615


In [121]:
REGION = "us-central1"  # @param {type: "string"}

In [122]:
import random
import string


# Generate a uuid of a specifed length(default=8)
def generate_uuid(length: int = 8) -> str:
    return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))


UUID = generate_uuid()
print(UUID)

an0zf8ux


In [123]:
BUCKET_URI = f"gs://vddl-test-{PROJECT_ID}-unique"  # @param {type:"string"}

In [139]:
#Create a bucket in the specified location and project with the given name using Google Cloud Storage
! gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI

Creating gs://vddl-test-vddl-419615-unique/...


In [125]:
import os

import google.cloud.aiplatform as aip

In [126]:
aip.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

In [127]:
print(os.getenv("IS_TESTING_TRAIN_GPU"))
print(os.getenv("IS_TESTING_DEPLOY_GPU"))

None
None


In [128]:
if os.getenv("IS_TESTING_TRAIN_GPU"):
    TRAIN_GPU, TRAIN_NGPU = (
        aip.gapic.AcceleratorType.ACCELERATOR_TYPE_UNSPECIFIED,
        int(os.getenv("IS_TESTING_TRAIN_GPU")),
    )
else:
    TRAIN_GPU, TRAIN_NGPU = (aip.gapic.AcceleratorType.ACCELERATOR_TYPE_UNSPECIFIED, 0)

if os.getenv("IS_TESTING_DEPLOY_GPU"):
    DEPLOY_GPU, DEPLOY_NGPU = (
        aip.gapic.AcceleratorType.ACCELERATOR_TYPE_UNSPECIFIED,
        int(os.getenv("IS_TESTING_DEPLOY_GPU")),
    )
else:
    DEPLOY_GPU, DEPLOY_NGPU = (None, None)

In [129]:
print(os.getenv("IS_TESTING_TF"))

None


In [130]:
if os.getenv("IS_TESTING_TF"):
    TF = os.getenv("IS_TESTING_TF")
else:
    TF = "2.5".replace(".", "-")

if TF[0] == "2":
    if TRAIN_GPU:
        TRAIN_VERSION = "tf-gpu.{}".format(TF)
    else:
        TRAIN_VERSION = "tf-cpu.{}".format(TF)
    if DEPLOY_GPU:
        DEPLOY_VERSION = "tf2-gpu.{}".format(TF)
    else:
        DEPLOY_VERSION = "tf2-cpu.{}".format(TF)
else:
    if TRAIN_GPU:
        TRAIN_VERSION = "tf-gpu.{}".format(TF)
    else:
        TRAIN_VERSION = "tf-cpu.{}".format(TF)
    if DEPLOY_GPU:
        DEPLOY_VERSION = "tf-gpu.{}".format(TF)
    else:
        DEPLOY_VERSION = "tf-cpu.{}".format(TF)

TRAIN_IMAGE = "{}-docker.pkg.dev/vertex-ai/training/{}:latest".format(
    REGION.split("-")[0], TRAIN_VERSION
)
DEPLOY_IMAGE = "{}-docker.pkg.dev/vertex-ai/prediction/{}:latest".format(
    REGION.split("-")[0], DEPLOY_VERSION
)

print("Training:", TRAIN_IMAGE, TRAIN_GPU, TRAIN_NGPU)
print("Deployment:", DEPLOY_IMAGE, DEPLOY_GPU, DEPLOY_NGPU)

Training: us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-5:latest AcceleratorType.ACCELERATOR_TYPE_UNSPECIFIED 0
Deployment: us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-5:latest None None


In [131]:
if os.getenv("IS_TESTING_TRAIN_MACHINE"):
    MACHINE_TYPE = os.getenv("IS_TESTING_TRAIN_MACHINE")
else:
    MACHINE_TYPE = "n1-standard"

VCPU = "4"
TRAIN_COMPUTE = MACHINE_TYPE + "-" + VCPU
print("Train machine type", TRAIN_COMPUTE)

Train machine type n1-standard-4


In [132]:
# Make folder for Python training script
! rm -rf custom
! mkdir custom

# Add package information
! touch custom/README.md

setup_cfg = "[egg_info]\n\ntag_build =\n\ntag_date = 0"
! echo "$setup_cfg" > custom/setup.cfg

setup_py = "import setuptools\n\nsetuptools.setup(\n\n    install_requires=[\n\n        'tensorflow==2.5.0',\n\n        'tensorflow_datasets==1.3.0',\n\n    ],\n\n    packages=setuptools.find_packages())"
! echo "$setup_py" > custom/setup.py

pkg_info = "Metadata-Version: 1.0\n\nName: Classification cloud\n\nVersion: 0.0.0\n\nSummary: Demonstration training script\n\nHome-page: www.google.com\n\nAuthor: Google\n\nAuthor-email: aferlitsch@google.com\n\nLicense: Public\n\nDescription: Demo\n\nPlatform: Vertex"
! echo "$pkg_info" > custom/PKG-INFO

# Make the training subfolder
! mkdir custom/trainer
! touch custom/trainer/__init__.py

In [133]:
%%writefile custom/trainer/task.py
# Single, Mirrored and MultiWorker Distributed Training

import os
# os.environ["KERAS_BACKEND"] = "tensorflow"

import tensorflow as tf
import tensorflow.keras as keras
import tensorflow_datasets as tfds
from tensorflow.python.client import device_lib
import numpy as np
import argparse
import sys
import logging

parser = argparse.ArgumentParser()
parser.add_argument('--model-dir', dest='model_dir',
                    default=os.getenv('AIP_MODEL_DIR'), type=str, help='Model dir.')
parser.add_argument('--lr', dest='lr',
                    default=0.001, type=float,
                    help='Learning rate.')
parser.add_argument('--epochs', dest='epochs',
                    default=10, type=int,
                    help='Number of epochs.')
parser.add_argument('--steps', dest='steps',
                    default=100, type=int,
                    help='Number of steps per epoch.')
parser.add_argument('--batch_size', dest='batch_size',
                    default=16, type=int,
                    help='Size of a batch.')
parser.add_argument('--distribute', dest='distribute', type=str, default='single',
                    help='distributed training strategy')
parser.add_argument('--param-file', dest='param_file',
                    default='/tmp/param.txt', type=str,
                    help='Output file for parameters')
args = parser.parse_args()

logging.info('DEVICES'  + str(device_lib.list_local_devices()))

# Single Machine, single compute device
if args.distribute == 'single':
    if tf.test.is_gpu_available():
        strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
    else:
        strategy = tf.distribute.OneDeviceStrategy(device="/cpu:0")
    logging.info("Single device training")
# Single Machine, multiple compute device
elif args.distribute == 'mirrored':
    strategy = tf.distribute.MirroredStrategy()
    logging.info("Mirrored Strategy distributed training")
# Multi Machine, multiple compute device
elif args.distribute == 'multiworker':
    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    logging.info("Multi-worker Strategy distributed training")
    logging.info('TF_CONFIG = {}'.format(os.environ.get('TF_CONFIG', 'Not found')))
    # Single Machine, multiple TPU devices
elif args.distribute == 'tpu':
    cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu="local")
    tf.config.experimental_connect_to_cluster(cluster_resolver)
    tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
    strategy = tf.distribute.TPUStrategy(cluster_resolver)
    print("All devices: ", tf.config.list_logical_devices('TPU'))

logging.info('num_replicas_in_sync = {}'.format(strategy.num_replicas_in_sync))


def get_compiled_model():
    # Make a simple 2-layer densely-connected neural network.
    inputs = keras.Input(shape=(784,))
    x = keras.layers.Dense(256, activation="relu")(inputs)
    x = keras.layers.Dense(256, activation="relu")(x)
    outputs = keras.layers.Dense(10)(x)
    model = keras.Model(inputs, outputs)
    model.compile(
        optimizer=keras.optimizers.Adam(),
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )
    return model


def get_dataset():
    batch_size = 32
    num_val_samples = 10000

    # Return the MNIST dataset in the form of a `tf.data.Dataset`.
    (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

    # Preprocess the data (these are Numpy arrays)
    x_train = x_train.reshape(-1, 784).astype("float32") / 255
    x_test = x_test.reshape(-1, 784).astype("float32") / 255
    y_train = y_train.astype("float32")
    y_test = y_test.astype("float32")

    # Reserve num_val_samples samples for validation
    x_val = x_train[-num_val_samples:]
    y_val = y_train[-num_val_samples:]
    x_train = x_train[:-num_val_samples]
    y_train = y_train[:-num_val_samples]
    return (
        tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(batch_size),
        tf.data.Dataset.from_tensor_slices((x_val, y_val)).batch(batch_size),
        tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(batch_size),
    )

# Create a MirroredStrategy.
# strategy = tf.distribute.MirroredStrategy()
print("Number of devices: {}".format(strategy.num_replicas_in_sync))

# Open a strategy scope.
with strategy.scope():
    # Everything that creates variables should be under the strategy scope.
    # In general this is only model construction & `compile()`.
    model = get_compiled_model()

    # Train the model on all available devices.
    train_dataset, val_dataset, test_dataset = get_dataset()
    model.fit(train_dataset, epochs=args.epochs, validation_data=val_dataset)

    # Test the model on all available devices.
    model.evaluate(test_dataset)

Writing custom/trainer/task.py


In [140]:
! rm -f custom.tar custom.tar.gz
! tar cvf custom.tar custom
! gzip custom.tar
! gsutil cp custom.tar.gz $BUCKET_URI/model/classification.tar.gz

custom/
custom/README.md
custom/setup.cfg
custom/PKG-INFO
custom/setup.py
custom/trainer/
custom/trainer/__init__.py
custom/trainer/.ipynb_checkpoints/
custom/trainer/.ipynb_checkpoints/task-checkpoint.py
custom/trainer/task.py
Copying file://custom.tar.gz [Content-Type=application/x-tar]...
/ [1 files][  2.3 KiB/  2.3 KiB]                                                
Operation completed over 1 objects/2.3 KiB.                                      


In [144]:
DISPLAY_NAME = "classification_" + UUID

job = aip.CustomPythonPackageTrainingJob(
    display_name=DISPLAY_NAME,
    python_package_gcs_uri=f"{BUCKET_URI}/model/classification.tar.gz",
    python_module_name="trainer.task",
    container_uri=TRAIN_IMAGE,
    model_serving_container_image_uri=DEPLOY_IMAGE,
    project=PROJECT_ID,
)

In [145]:
MODEL_DIR = BUCKET_URI

print(MODEL_DIR)


gs://vddl-test-vddl-419615-unique


In [None]:
CMDARGS = ["--epochs=5", "--batch_size=16", "--distribute=multiworker"]

try:
    model = job.run(
        model_display_name="classification_" + UUID,
        args=CMDARGS,
        replica_count=4,
        machine_type=TRAIN_COMPUTE,
        accelerator_type=TRAIN_GPU.name,
        accelerator_count=TRAIN_NGPU,
        base_output_dir=MODEL_DIR,
        sync=True,
    )
except Exception as e:
    # may fail duing model.save() -- seems to be some issue when merging checkpoints from the workers
    print(e)

Training Output directory:
gs://vddl-test-vddl-419615-unique 
View Training:
https://console.cloud.google.com/ai/platform/locations/us-central1/training/7635285854566481920?project=170952101248
CustomPythonPackageTrainingJob projects/170952101248/locations/us-central1/trainingPipelines/7635285854566481920 current state:
PipelineState.PIPELINE_STATE_RUNNING
View backing custom job:
https://console.cloud.google.com/ai/platform/locations/us-central1/training/5069210433290764288?project=170952101248
CustomPythonPackageTrainingJob projects/170952101248/locations/us-central1/trainingPipelines/7635285854566481920 current state:
PipelineState.PIPELINE_STATE_RUNNING
CustomPythonPackageTrainingJob projects/170952101248/locations/us-central1/trainingPipelines/7635285854566481920 current state:
PipelineState.PIPELINE_STATE_RUNNING
CustomPythonPackageTrainingJob projects/170952101248/locations/us-central1/trainingPipelines/7635285854566481920 current state:
PipelineState.PIPELINE_STATE_RUNNING
Cust

In [147]:
print(model)

<google.cloud.aiplatform.models.Model object at 0x7f164354a080> 
resource name: projects/170952101248/locations/us-central1/models/1829131051293736960


In [None]:
job.delete()