# Convert Black-n-White image to Color Images - Vertex AI Training Job

### Dependencies

#### Before running this notebook, please make sure you have already installed the following libraries with correct versions.

- numpy==1.21.6
- google-cloud-aiplatform==1.24.1
- google-cloud-storage==2.9.0
- pillow==9.5.0

## Imports

In [1]:
import numpy as np
import glob
import matplotlib.pyplot as plt
import os
from google.cloud import aiplatform
%matplotlib inline

## Setup project configurations

In [2]:
PROJECT_ID='417812395597'
REGION='us-west2'
BUCKET_URI='gs://my-training-artifacts'

## Initialize Vertex AI (SDK)

In [3]:
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

## Setup pre-built contianer for training

In [4]:
TRAIN_VERSION = "tf-cpu.2-9"
DEPLOY_VERSION = "tf2-cpu.2-9"
TRAIN_IMAGE = "us-docker.pkg.dev/vertex-ai/training/{}:latest".format(TRAIN_VERSION)
DEPLOY_IMAGE = "us-docker.pkg.dev/vertex-ai/prediction/{}:latest".format(DEPLOY_VERSION)

## Define command-line arguments for training

In [5]:
JOB_NAME = "vertex_custom_training"
MODEL_DIR = "{}/{}".format(BUCKET_URI, JOB_NAME)

TRAIN_STRATEGY = "single"
EPOCHS = 20
STEPS = 100

CMDARGS = [
    "--epochs=" + str(EPOCHS),
    "--steps=" + str(STEPS),
    "--distribute=" + TRAIN_STRATEGY,
]

## Writing down entire training script into a "task.py" file

### This task.py will run inside the contianer that we have defined above (could be a custom contianer with all dependencies)
#### task.py should have the entire training flow including - 
- Load and prepared the training data
- define model architecure
- train model
- save model

In [1]:
%%writefile task.py
# Single, Mirror and Multi-Machine Distributed Training

import tensorflow as tf
import tensorflow
from tensorflow.python.client import device_lib
import argparse
import os
import sys
from io import BytesIO
import numpy as np
from tensorflow.python.lib.io import file_io

# parse required arguments
parser = argparse.ArgumentParser()
parser.add_argument('--lr', dest='lr',
                    default=0.001, type=float,
                    help='Learning rate.')
parser.add_argument('--epochs', dest='epochs',
                    default=10, type=int,
                    help='Number of epochs.')
parser.add_argument('--steps', dest='steps',
                    default=35, type=int,
                    help='Number of steps per epoch.')
parser.add_argument('--distribute', dest='distribute', type=str, default='single',
                    help='distributed training strategy')
args = parser.parse_args()

print('Python Version = {}'.format(sys.version))
print('TensorFlow Version = {}'.format(tf.__version__))
print('TF_CONFIG = {}'.format(os.environ.get('TF_CONFIG', 'Not found')))
print('DEVICES', device_lib.list_local_devices())

# Single Machine, single compute device
if args.distribute == 'single':
    if tf.test.is_gpu_available():
        strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
    else:
        strategy = tf.distribute.OneDeviceStrategy(device="/cpu:0")
# Single Machine, multiple compute device
elif args.distribute == 'mirror':
    strategy = tf.distribute.MirroredStrategy()
# Multiple Machine, multiple compute device
elif args.distribute == 'multi':
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

# Multi-worker configuration
print('num_replicas_in_sync = {}'.format(strategy.num_replicas_in_sync))

# Preparing dataset
BUFFER_SIZE = 10000
BATCH_SIZE = 128

def make_datasets_unbatched():
    # Load train, validation and test sets
    dest = 'gs://data-bucket-417812395597/'
    train_x = np.load(BytesIO(
        file_io.read_file_to_string(dest+'train_x', binary_mode=True)
    ))
    train_y = np.load(BytesIO(
        file_io.read_file_to_string(dest+'train_y', binary_mode=True)
    ))
    val_x = np.load(BytesIO(
        file_io.read_file_to_string(dest+'val_x', binary_mode=True)
    ))
    val_y = np.load(BytesIO(
        file_io.read_file_to_string(dest+'val_y', binary_mode=True)
    ))
    test_x = np.load(BytesIO(
        file_io.read_file_to_string(dest+'test_x', binary_mode=True)
    ))
    test_y = np.load(BytesIO(
        file_io.read_file_to_string(dest+'test_y', binary_mode=True)
    ))
    return train_x, train_y, val_x, val_y, test_x, test_y

def tf_model():
    black_n_white_input = tensorflow.keras.layers.Input(shape=(80, 80, 1))
    
    enc = black_n_white_input
    
    #Encoder part
    enc = tensorflow.keras.layers.Conv2D(
        32, kernel_size=3, strides=2, padding='same'
    )(enc)
    enc = tensorflow.keras.layers.LeakyReLU(alpha=0.2)(enc)
    enc = tensorflow.keras.layers.BatchNormalization(momentum=0.8)(enc)
    
    enc = tensorflow.keras.layers.Conv2D(
        64, kernel_size=3, strides=2, padding='same'
    )(enc)
    enc = tensorflow.keras.layers.LeakyReLU(alpha=0.2)(enc)
    enc = tensorflow.keras.layers.BatchNormalization(momentum=0.8)(enc)
    
    enc = tensorflow.keras.layers.Conv2D(
        128, kernel_size=3, strides=2, padding='same'
    )(enc)
    enc = tensorflow.keras.layers.LeakyReLU(alpha=0.2)(enc)
    enc = tensorflow.keras.layers.BatchNormalization(momentum=0.8)(enc)
    
    enc = tensorflow.keras.layers.Conv2D(
        256, kernel_size=1, strides=2, padding='same'
    )(enc)
    enc = tensorflow.keras.layers.LeakyReLU(alpha=0.2)(enc)
    enc = tensorflow.keras.layers.Dropout(0.5)(enc)
    
    #Decoder part
    dec = enc
    
    dec = tensorflow.keras.layers.Conv2DTranspose(
        256, kernel_size=3, strides=2, padding='same'
    )(dec)
    dec = tensorflow.keras.layers.Activation('relu')(dec)
    dec = tensorflow.keras.layers.BatchNormalization(momentum=0.8)(dec)
    
    dec = tensorflow.keras.layers.Conv2DTranspose(
        128, kernel_size=3, strides=2, padding='same'
    )(dec)
    dec = tensorflow.keras.layers.Activation('relu')(dec)
    dec = tensorflow.keras.layers.BatchNormalization(momentum=0.8)(dec)
    
    dec = tensorflow.keras.layers.Conv2DTranspose(
        64, kernel_size=3, strides=2, padding='same'
    )(dec)
    dec = tensorflow.keras.layers.Activation('relu')(dec)
    dec = tensorflow.keras.layers.BatchNormalization(momentum=0.8)(dec)
    
    dec = tensorflow.keras.layers.Conv2DTranspose(
        32, kernel_size=3, strides=2, padding='same'
    )(dec)
    dec = tensorflow.keras.layers.Activation('relu')(dec)
    dec = tensorflow.keras.layers.BatchNormalization(momentum=0.8)(dec)
    
    dec = tensorflow.keras.layers.Conv2D(
        3, kernel_size=3, padding='same'
    )(dec)
    
    color_image = tensorflow.keras.layers.Activation('tanh')(dec)
    
    return black_n_white_input, color_image

# Build the and compile TF model
def build_and_compile_tf_model():
    black_n_white_input, color_image = tf_model()
    model = tensorflow.keras.models.Model(
        inputs=black_n_white_input,
        outputs=color_image
    )
    _optimizer = tensorflow.keras.optimizers.Adam(
        learning_rate=0.0002,
        beta_1=0.5
    )
    model.compile(
        loss='mse',
        optimizer=_optimizer
    )
    return model

# Train the model
NUM_WORKERS = strategy.num_replicas_in_sync
# Here the batch size scales up by number of workers since
# `tf.data.Dataset.batch` expects the global batch size.
GLOBAL_BATCH_SIZE = BATCH_SIZE * NUM_WORKERS
MODEL_DIR = os.getenv("AIP_MODEL_DIR")

train_x, train_y, _, _, _, _ = make_datasets_unbatched()

with strategy.scope():
    # Creation of dataset, and model building/compiling need to be within
    # `strategy.scope()`.
    model = build_and_compile_tf_model()

model.fit(
    train_x,
    train_y,
    epochs=args.epochs,
    steps_per_epoch=args.steps
)
model.save(MODEL_DIR)

Overwriting task.py


## Define and submit vertex AI training job

In [17]:
job = aiplatform.CustomTrainingJob(
    display_name=JOB_NAME,
    script_path="task.py",
    container_uri=TRAIN_IMAGE,
    requirements=[],
    model_serving_container_image_uri=DEPLOY_IMAGE,
)

MODEL_DISPLAY_NAME = "tf_bnw_to_color"

# Start the training job
model = job.run(
    model_display_name=MODEL_DISPLAY_NAME,
    args=CMDARGS,
    machine_type = "n1-standard-16",
    replica_count=1,
)

Training script copied to:
gs://my-training-artifacts/aiplatform-2023-04-18-09:25:16.215-aiplatform_custom_trainer_script-0.1.tar.gz.
Training Output directory:
gs://my-training-artifacts/aiplatform-custom-training-2023-04-18-09:25:16.320 
View Training:
https://console.cloud.google.com/ai/platform/locations/us-west2/training/3454177351309459456?project=417812395597
CustomTrainingJob projects/417812395597/locations/us-west2/trainingPipelines/3454177351309459456 current state:
PipelineState.PIPELINE_STATE_RUNNING
View backing custom job:
https://console.cloud.google.com/ai/platform/locations/us-west2/training/6460048627602554880?project=417812395597
CustomTrainingJob projects/417812395597/locations/us-west2/trainingPipelines/3454177351309459456 current state:
PipelineState.PIPELINE_STATE_RUNNING
CustomTrainingJob projects/417812395597/locations/us-west2/trainingPipelines/3454177351309459456 current state:
PipelineState.PIPELINE_STATE_RUNNING
CustomTrainingJob projects/417812395597/locat