In [None]:
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Custom training and batch prediction
## Run on a notebook with multiple GPUs

<table align="left">
  <td>
    <a href="https://github.com/GoogleCloudPlatform/ai-platform-samples/blob/master/ai-platform-unified/notebooks/official/custom/sdk-custom-image-classification-batch.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      Based on this GitHub Notebook
    </a>
  </td>
</table>
<br/><br/><br/>

## Overview


This tutorial demonstrates how to use the Vertex SDK for Python to train and deploy a custom image classification model for batch prediction.

### Dataset

The dataset used for this tutorial is the [cifar10 dataset](https://pytorch.org/vision/stable/datasets.html#cifar) from [PyTorch Datasets](https://pytorch.org/vision/stable/datasets.html). The version of the dataset you will use is built into Torchvision (`torchvision.datasets.CIFAR10`). The trained model predicts which type of class an image is from ten classes: airplane, automobile, bird, cat, deer, dog, frog, horse, ship, truck.

### Objective

In this notebook, you create a custom-trained model from a Python script in a Docker container using the Vertex SDK for Python, and then do a prediction on the deployed model by sending data. Alternatively, you can create custom-trained models using `gcloud` command-line tool, or online using the Cloud Console.

The steps performed include:

- Create a Vertex AI custom job for training a model.
- Train a PyTorch model.
- Make a batch prediction.
- Cleanup resources.

### Costs

This tutorial uses billable components of Google Cloud (GCP):

* Vertex AI
* Cloud Storage
* GPUs

Learn about [Vertex AI
pricing](https://cloud.google.com/vertex-ai/pricing), [GPU pricing](https://cloud.google.com/compute/gpus-pricing), and [Cloud Storage
pricing](https://cloud.google.com/storage/pricing), and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

## Installation

Install the latest (preview) version of Vertex SDK for Python.

In [None]:
import os

# Google Cloud Notebooks ship this metadata file; its presence identifies that environment.
IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists("/opt/deeplearning/metadata/env_version")

# On Google Cloud Notebooks, pip packages must be installed with '--user'.
USER_FLAG = "--user" if IS_GOOGLE_CLOUD_NOTEBOOK else ""

In [None]:
# Vertex AI SDK for Python; USER_FLAG adds '--user' on Google Cloud Notebooks.
! pip3 install {USER_FLAG} --upgrade google-cloud-aiplatform

Install the latest GA version of *google-cloud-storage* library as well.

In [None]:
# Cloud Storage client library (also used by task.py to upload model artifacts).
! pip3 install {USER_FLAG} --upgrade google-cloud-storage

Install the *pillow* library for loading images.

In [None]:
# Pillow, for loading images.
! pip3 install {USER_FLAG} --upgrade pillow

Install the *numpy* library for manipulation of image data.

In [None]:
# NumPy, for manipulating image data.
! pip3 install {USER_FLAG} --upgrade numpy

### Restart the kernel

Once you've installed everything, you need to restart the notebook kernel so it can find the packages.

In [None]:
import os

# Restart the kernel so the freshly installed packages can be imported.
# Skipped when IS_TESTING is set (automated runs).
if not os.getenv("IS_TESTING"):
    import IPython

    IPython.Application.instance().kernel.do_shutdown(True)

## Before you begin

### Select a GPU runtime

**Make sure you're running this notebook in a GPU runtime if you have that option. In Colab, select "Runtime --> Change runtime type > GPU"**

### Set up your Google Cloud project

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.

2. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project).

3. [Enable the Vertex AI API and Compute Engine API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com,compute_component).

4. If you are running this notebook locally, you will need to install the [Cloud SDK](https://cloud.google.com/sdk).

5. Enter your project ID in the cell below. Then run the cell to make sure the
Cloud SDK uses the right project for all the commands in this notebook.

**Note**: Jupyter runs lines prefixed with `!` as shell commands, and it interpolates Python variables prefixed with `$` into these commands.

#### Set your project ID

**If you don't know your project ID**, you may be able to get your project ID using `gcloud`.

In [None]:
import os

PROJECT_ID = ""

if not os.getenv("IS_TESTING"):
    # Get your Google Cloud project ID from gcloud
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Otherwise, set your project ID here.

In [None]:
# Fall back to a manually entered project ID (Colab form field) when gcloud gave none.
if not PROJECT_ID:
    PROJECT_ID = "[your-project-id]"  # @param {type:"string"}

### Authenticate your Google Cloud account

**If you are using Google Cloud Notebooks**, your environment is already
authenticated. Skip this step.

**If you are using Colab**, authenticate your account via OAuth by running `from google.colab import auth; auth.authenticate_user()` in a code cell, and follow the instructions when prompted.

**Otherwise**, follow these steps:

1. In the Cloud Console, go to the [**Create service account key**
   page](https://console.cloud.google.com/apis/credentials/serviceaccountkey).

2. Click **Create service account**.

3. In the **Service account name** field, enter a name, and
   click **Create**.

4. In the **Grant this service account access to project** section, click the **Role** drop-down list. Type "Vertex AI"
into the filter box, and select
   **Vertex AI Administrator**. Type "Storage Object Admin" into the filter box, and select **Storage Object Admin**.

5. Click *Create*. A JSON file that contains your key downloads to your
local environment.

6. Set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to the path of your service account key. For example, run `%env GOOGLE_APPLICATION_CREDENTIALS=/path/to/key.json` in a code cell.

### Create a Cloud Storage bucket

**The following steps are required, regardless of your notebook environment.**

When you submit a training job using the Cloud SDK, you upload a Python package
containing your training code to a Cloud Storage bucket. Vertex AI runs
the code from this package. In this tutorial, Vertex AI also saves the
trained model that results from your job in the same bucket. Using this model artifact, you can then create Vertex AI model resources.

Set the name of your Cloud Storage bucket below. It must be unique across all
Cloud Storage buckets.

You may also change the `REGION` variable, which is used for operations
throughout the rest of this notebook. Make sure to [choose a region where Vertex AI services are
available](https://cloud.google.com/vertex-ai/docs/general/locations#available_regions). You may
not use a Multi-Regional Storage bucket for training with Vertex AI.

In [None]:
# Placeholder values: the next cell derives a bucket name if this one is left unchanged.
BUCKET_NAME = "gs://[your-bucket-name]"  # Change to your bucket
REGION = "us-central1"  # Change to your region

In [None]:
# Derive a default bucket name when the placeholder was left unchanged.
# TIMESTAMP is only defined further down in the notebook, so compute the suffix
# here instead of referencing it (doing so raised NameError on Restart & Run All).
from datetime import datetime

if BUCKET_NAME in ("", None, "gs://[your-bucket-name]"):
    bucket_suffix = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
    # "-" separates the project ID from the fixed "aip-" infix.
    BUCKET_NAME = f"gs://{PROJECT_ID}-aip-{bucket_suffix}"

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [None]:
# Create the bucket in REGION; gsutil reports an error if the bucket already exists.
! gsutil mb -l $REGION $BUCKET_NAME

Finally, validate access to your Cloud Storage bucket by examining its contents:

In [None]:
# Long listing (-l) of all object versions (-a) in the bucket, to confirm access.
! gsutil ls -al $BUCKET_NAME

### Service Account

You use a service account to create the Vertex AI Training job. If you do not want to use your project's Compute Engine service account, set SERVICE_ACCOUNT to another service account ID.

In [None]:
# Leave the placeholder to have the next cell look up a service account via gcloud.
SERVICE_ACCOUNT = "[your-service-account]"

In [None]:
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Get your service account from gcloud
    if IS_GOOGLE_CLOUD_NOTEBOOK:
        shell_output = !gcloud auth list 2>/dev/null
        SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

    if not IS_GOOGLE_CLOUD_NOTEBOOK:
        shell_output = ! gcloud projects describe  $PROJECT_ID
        project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
        SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"
    
    print("Service Account:", SERVICE_ACCOUNT)

In [None]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

### Set up variables

Next, set up some variables used throughout the tutorial.

#### Import Vertex SDK for Python

Import the Vertex SDK for Python into your Python environment and initialize it.

In [None]:
import os
import sys

from google.cloud import aiplatform
from google.cloud.aiplatform import gapic as aip

# Default project, region and staging bucket for all subsequent Vertex AI SDK calls.
aiplatform.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=BUCKET_NAME,
)

# Tutorial

Now you are ready to start creating your own custom-trained model with CIFAR10.

## Train a model

There are two ways you can train a custom model using a container image:

- **Use a Google Cloud prebuilt container**. If you use a prebuilt container, you will additionally specify a Python package to install into the container image. This Python package contains your code for training a custom model.

- **Use your own custom container image**. If you use your own container, the container needs to contain your code for training a custom model.

### Define the command args for the training script

Prepare the command-line arguments to pass to your training script.
- `args`: The command-line arguments to pass to the corresponding Python module. In this example, they are:

  - `"--dist-url=" + "env://"`: Initialize the distributed process group from environment variables (`MASTER_ADDR`, `MASTER_PORT`, `RANK`, `WORLD_SIZE`).
  - `"--num_epochs=" + EPOCHS`: The number of epochs for training.
  - `"--multiprocessing-distributed"`: Pass this flag to use distributed processing, launching one training process per GPU on each worker. Without it, training defaults to a single worker with a single GPU.

  For local training in the notebook, also include the following:
  - `"--rank=" + "0"`: The node rank. Setting this to 0 tells the program that this is the first (and only) node.
  - `"--model_dir=" + SAVED_MODEL_DIR`: The directory to save models to.
  - `"--local_training"`: Save the model to `--model_dir` on the local file system instead of uploading it to Cloud Storage.

#### Training script

In the next cell, you will write the contents of the training script, `task.py`. In summary, the script:

- Gets the directory where the model artifacts are saved from the environment variable `AIP_MODEL_DIR`. This variable is set by the training service.
- Loads the CIFAR10 dataset from Torchvision datasets.
- Builds a Torchvision model (ResNet-18 by default, selectable with `--arch`).
- Sets a training distribution strategy.
- Trains the model for the number of epochs given by the `--num_epochs` argument (`args.num_epochs`).
- Saves the trained model's weights with `torch.save` to the specified model directory, uploading them to Cloud Storage when not training locally.

In [None]:
%%writefile task.py
import torch
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.backends.cudnn as cudnn
import torch.optim as optim
import torch.multiprocessing as mp
import torch.distributed as dist

import torchvision
import torchvision.models as models
import torchvision.transforms as transforms

import argparse
import os
import random
import numpy as np
from datetime import datetime
from google.cloud import storage

# Limit the log level for TF, which by default logs many unimportant messages,
# e.g. tensorflow/stream_executor/platform/default/dso_loader.cc
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

# TensorBoard writer shared by the whole script.
from torch.utils.tensorboard import SummaryWriter
LOCAL_TB_DIR = '/tmp/logs'
# When set by Vertex AI, AIP_TENSORBOARD_LOG_DIR is expected to be a gs:// URI.
# Rewrite it to the /gcs/ Cloud Storage FUSE path so SummaryWriter can write via the
# local file system. str.replace returns a new string, so the result must be assigned
# (previously it was discarded and the rewrite never took effect).
tb_dir = os.getenv('AIP_TENSORBOARD_LOG_DIR', LOCAL_TB_DIR).replace("gs://", "/gcs/")
writer = SummaryWriter(log_dir=tb_dir)

# Seed every RNG so that models initialized in different processes are identical.
def set_random_seeds(random_seed=0):
    """Seed Python, NumPy and PyTorch RNGs and make cuDNN deterministic.

    Args:
        random_seed: Seed applied to ``random``, ``numpy.random`` and ``torch``.
    """
    random.seed(random_seed)
    np.random.seed(random_seed)
    torch.manual_seed(random_seed)
    # Deterministic cuDNN kernels, with autotuning disabled.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

# Top-1 accuracy of a model over a data loader.
def evaluate(model, device, test_loader):
    """Return the fraction of samples in ``test_loader`` that ``model`` classifies correctly.

    Puts ``model`` in eval mode and runs without gradient tracking.

    Args:
        model: Module mapping an image batch to per-class scores (class axis = dim 1).
        device: Device the images and labels are moved to before the forward pass.
        test_loader: Iterable of ``(images, labels)`` batches.

    Returns:
        Number of correct predictions divided by number of samples. Raises
        ZeroDivisionError if the loader yields no samples.
    """
    model.eval()

    n_correct, n_seen = 0, 0
    with torch.no_grad():
        for batch in test_loader:
            images = batch[0].to(device)
            labels = batch[1].to(device)
            predicted = torch.max(model(images).data, 1)[1]
            n_seen += labels.size(0)
            n_correct += (predicted == labels).sum().item()

    return n_correct / n_seen

# main function to parse args and kickoff training
def main():
    """Parse command-line arguments and launch training.

    With ``--multiprocessing-distributed``, one ``main_worker`` process is spawned per
    local GPU via ``torch.multiprocessing.spawn``, and ``world_size`` is scaled by
    the GPU count. Otherwise ``main_worker`` runs in this process. Start and end
    timestamps are printed around training.
    """
    # Every lowercase, non-dunder callable in torchvision.models is a valid --arch.
    model_names = sorted(
        name for name in models.__dict__
        if name.islower() and not name.startswith("__")
        and callable(models.__dict__[name])
    )

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--num_epochs", type=int, help="Number of training epochs.", default=100)
    parser.add_argument("--batch_size", type=int, help="Training batch size for one process.", default=256)
    parser.add_argument("--learning_rate", dest='learning_rate', type=float, help="Learning rate.", default=0.1)
    parser.add_argument("--random_seed", type=int, help="Random seed.", default=0)
    parser.add_argument("--model_dir", type=str, help="Directory for saving models.", default=os.environ['AIP_MODEL_DIR'] if 'AIP_MODEL_DIR' in os.environ else "")
    parser.add_argument("--model_filename", type=str, help="Model filename.", default="resnet_distributed.pth")
    parser.add_argument('-a', '--arch', metavar='ARCH', default='resnet18', choices=model_names,
                        help='model architecture: ' + ' | '.join(model_names) + ' (default: resnet18)')
    parser.add_argument('--momentum', default=0.9, type=float, metavar='M', help='momentum')
    parser.add_argument('--wd', '--weight-decay', default=1e-4, type=float,
                        metavar='W', help='weight decay (default: 1e-4)',
                        dest='weight_decay')
    parser.add_argument('--pretrained', dest='pretrained', action='store_true',
                        help='use pre-trained model')
    parser.add_argument('--local_training', dest='local_training', action='store_true',
                        help='use local machine for training')
    parser.add_argument('--rank', default=-1, type=int,
                        help='node rank for distributed training')
    parser.add_argument('--world-size', default=int(os.getenv('WORLD_SIZE', -1)), type=int,
                        help='number of nodes for distributed training')
    parser.add_argument('--dist-url', default='http://localhost:8082', type=str,
                        help='url used to set up distributed training') # From https://cloud.google.com/ai-platform/training/docs/distributed-pytorch#updating_your_training_code
    parser.add_argument('--dist-backend', default='nccl', type=str,
                        help='distributed backend') # default to GPU. Can change to CPU using 'gloo'
    parser.add_argument('--multiprocessing-distributed', action='store_true',
                        help='Use multi-processing distributed training to launch '
                             'N processes per node, which has N GPUs. This is the '
                             'fastest way to use PyTorch for either single node or '
                             'multi node data parallel training')
    parser.add_argument('--gpu', default=None, type=int, help='GPU id to use.')
    parser.add_argument('--workers', default=4, type=int, metavar='N',
                        help='number of data loading workers (default: 4)')
    argv = parser.parse_args()

    if argv.dist_url == "env://" and argv.world_size == -1:
        argv.world_size = int(os.environ["WORLD_SIZE"])

    argv.distributed = argv.world_size > 1 or argv.multiprocessing_distributed

    ngpus_per_node = torch.cuda.device_count()

    # debugging
    print (f"os WORLD_SIZE={os.getenv('WORLD_SIZE', -1)}")
    print (f"os RANK={os.getenv('RANK', 0)}")
    print (f"os MASTER_ADDR={os.getenv('MASTER_ADDR', 'localhost')}")
    print (f"os MASTER_PORT={os.getenv('MASTER_PORT', '8082')}")
    print (f'Arg - distributed={argv.distributed}')
    print (f'Arg - multiprocessing_distributed={argv.multiprocessing_distributed}')
    print (f'Arg - dist_backend={argv.dist_backend}')
    print (f'Arg - dist_url={argv.dist_url}')
    print (f'ngpus_per_node={ngpus_per_node}')

    start = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
    print (f'Starting training: {start}')
    if argv.multiprocessing_distributed:
        # Since we have ngpus_per_node processes per node, the total world_size
        # needs to be adjusted accordingly
        argv.world_size = ngpus_per_node * argv.world_size
        print ('GPU x WORLD SIZE = {}'.format(argv.world_size))
        # Use torch.multiprocessing.spawn to launch distributed processes: the
        # main_worker process function
        mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, argv))
    else:
        # Pass the real GPU count. main_worker divides the batch size by it in the
        # distributed single-GPU-per-process case; passing 0 raised ZeroDivisionError there.
        main_worker(argv.gpu, ngpus_per_node, argv)

    end = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
    print (f'Training complete: {end}')

def _unwrap_model(model):
    """Return the module wrapped by (Distributed)DataParallel, or ``model`` itself if not wrapped.

    The non-distributed GPU branch of ``main_worker`` leaves the model unwrapped, and
    the alexnet/vgg CPU branch wraps only ``model.features``. In both cases there is
    no ``.module`` attribute.
    """
    return getattr(model, "module", model)


def _upload_to_gcs(local_path, model_directory):
    """Upload the local file ``local_path`` to ``<model_directory>/<local_path>`` in Cloud Storage.

    Args:
        local_path: File to upload; also used as the object name under ``model_directory``.
        model_directory: Destination directory URI (the value of ``AIP_MODEL_DIR``).

    Returns:
        The destination URI.
    """
    storage_path = os.path.join(model_directory, local_path)
    print (f"storage_path={storage_path}")
    blob = storage.blob.Blob.from_string(storage_path, client=storage.Client())
    blob.upload_from_filename(local_path)
    return storage_path


# function for each worker process used for distributed training
def main_worker(gpu, ngpus_per_node, args):
    """Train, periodically evaluate and checkpoint the model in one worker process.

    Args:
        gpu: Local GPU index for this process (the process index when launched by
            ``mp.spawn``), or None to use the CPU / all visible GPUs.
        ngpus_per_node: Number of GPUs on this node. Used to derive the global rank
            and the per-process batch size.
        args: Namespace parsed by ``main``. Mutated in place (``gpu``, ``rank``,
            ``batch_size``, ``workers``).
    """
    args.gpu = gpu

    if args.gpu is not None:
        print(f"Use GPU: {args.gpu} for training")

    # Set parameters distributed and multiprocess distributed training
    if args.distributed:
        if args.dist_url == "env://" and args.rank == -1:
            args.rank = int(os.environ["RANK"])
            print (f"Distributed and getting rank from os.environ: rank={args.rank}")
        if args.multiprocessing_distributed:
            # For multiprocessing distributed training, rank needs to be the
            # global rank among all the processes
            args.rank = args.rank * ngpus_per_node + gpu
            print (f"Distributed and Multiprocesing. Setting rank for each worker. rank={args.rank}")
        dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                                world_size=args.world_size, rank=args.rank)
        print ("Process group initialized")

    # Exactly one process evaluates and writes checkpoints: global rank 0 when
    # distributed, otherwise the only process, whatever its --rank value (default -1).
    is_primary = (not args.distributed) or args.rank == 0

    # Set variables for training
    num_epochs = args.num_epochs
    random_seed = args.random_seed
    model_dir = args.model_dir
    model_filename = args.model_filename
    model_filepath = os.path.join(model_dir, model_filename)
    fullmodel_name = f'fullmodel_{model_filename}'

    # We need to use seeds to make sure that the models initialized in different processes are the same
    set_random_seeds(random_seed=random_seed)
    print ("random seeds")

    # create model
    if args.pretrained:
        print(f"=> using pre-trained model '{args.arch}'")
        model = models.__dict__[args.arch](pretrained=True)
    else:
        print(f"=> creating model '{args.arch}'")
        model = models.__dict__[args.arch]()

    # Place the model and pick the device that evaluation data is moved to. Device
    # indices are *local* GPU ids (args.gpu), never the global rank.
    if args.distributed:
        # For multiprocessing distributed, DistributedDataParallel constructor
        # should always set the single device scope, otherwise,
        # DistributedDataParallel will use all available devices.
        if args.gpu is not None:
            torch.cuda.set_device(args.gpu)
            model.cuda(args.gpu)
            # When using a single GPU per process and per
            # DistributedDataParallel, we need to divide the batch size
            # ourselves based on the total number of GPUs we have
            args.batch_size = int(args.batch_size / ngpus_per_node)
            args.workers = int((args.workers + ngpus_per_node - 1) / ngpus_per_node)

            # Encapsulate the model on the GPU assigned to the current process
            device = torch.device(f"cuda:{args.gpu}")
            print (f"Distributed GPU device={device}")
            model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
        else:
            model.cuda()
            device = torch.device("cuda")
            # DistributedDataParallel will divide and allocate batch_size to all
            # available GPUs if device_ids are not set
            model = torch.nn.parallel.DistributedDataParallel(model)
            print ("Distributed device: all visible GPUs")
    elif args.gpu is not None:
        torch.cuda.set_device(args.gpu)
        model = model.cuda(args.gpu)
        device = torch.device(f"cuda:{args.gpu}")
        print (f"Non-distributed GPU device id ={args.gpu}")
    else:
        # DataParallel will divide and allocate batch_size to all available GPUs
        if args.arch.startswith('alexnet') or args.arch.startswith('vgg'):
            model.features = torch.nn.DataParallel(model.features)
        else:
            model = torch.nn.DataParallel(model)
        device = torch.device("cpu")
        print (f"Non distributed CPU device used")

    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().cuda(args.gpu)
    optimizer = torch.optim.SGD(model.parameters(), args.learning_rate,
                                momentum=args.momentum,
                                weight_decay=args.weight_decay)

    # NOTE(review): this re-enables cuDNN autotuning that set_random_seeds disabled,
    # so runs are not bit-for-bit reproducible.
    cudnn.benchmark = True

    # Prepare dataset and dataloader
    transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])

    # Data should be prefetched
    train_set = torchvision.datasets.CIFAR10(root="data", train=True, download=True, transform=transform)
    test_set = torchvision.datasets.CIFAR10(root="data", train=False, download=True, transform=transform)

    # Read the batch size only now: the distributed-GPU branch above may have
    # divided args.batch_size by ngpus_per_node.
    batch_size = args.batch_size
    train_sampler = None
    if args.distributed:
        # Restricts data loading to a subset of the dataset exclusive to the current process
        train_sampler = DistributedSampler(dataset=train_set)
        train_loader = DataLoader(dataset=train_set, batch_size=batch_size, sampler=train_sampler, pin_memory=True)
    else:
        train_loader = DataLoader(dataset=train_set, batch_size=batch_size)

    # Test loader does not have to follow distributed sampling strategy
    test_loader = DataLoader(dataset=test_set, batch_size=128, shuffle=False, num_workers=4)

    global_step = 0  # TensorBoard x-axis: one point per optimizer step
    for epoch in range(num_epochs):

        epoch_start = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
        print("Rank: {}, Epoch: {}, Training start: {}".format(args.rank, epoch, epoch_start))

        if train_sampler is not None:
            # Without this, DistributedSampler yields the same shuffle every epoch.
            train_sampler.set_epoch(epoch)

        # Evaluate and checkpoint every 10 epochs on the primary process only
        if epoch % 10 == 0 and is_primary:
            # Evaluate the unwrapped module so rank 0 does not run a DDP forward on
            # its own (DDP's forward may issue collectives).
            accuracy = evaluate(model=_unwrap_model(model), device=device, test_loader=test_loader)
            state_dict = _unwrap_model(model).state_dict()
            if args.local_training:
                torch.save(state_dict, model_filepath)
                print ('saving model to local folders')
            else:
                # Save locally, then copy to GCS - https://cloud.google.com/vertex-ai/docs/training/exporting-model-artifacts
                torch.save(state_dict, model_filename)
                model_directory = os.environ['AIP_MODEL_DIR']
                print (f"AIP_MODEL_DIR={model_directory}")
                _upload_to_gcs(model_filename, model_directory)

                # NOTE(review): despite its name, this file also holds only the state_dict.
                torch.save(state_dict, fullmodel_name)
                _upload_to_gcs(fullmodel_name, model_directory)
            print("-" * 75)
            epoch_middle = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
            print(f"Epoch: {epoch}, Accuracy: {accuracy}, Time: {epoch_middle}")
            print("-" * 75)

        # Switch to training mode
        model.train()

        for data in train_loader:
            if args.gpu is not None:
                inputs = data[0].cuda(args.gpu, non_blocking=True)
                labels = data[1].cuda(args.gpu, non_blocking=True)
            else:
                inputs = data[0]
                labels = data[1]
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            # Write loss to TensorBoard against a monotonically increasing step
            writer.add_scalar("Loss/train", loss.item(), global_step)
            global_step += 1
            loss.backward()
            optimizer.step()

    # Final checkpoint from the primary process
    if is_primary:
        state_dict = _unwrap_model(model).state_dict()
        if args.local_training:
            torch.save(state_dict, model_filepath) #For pretrained models, need to replaced module. keys - https://discuss.pytorch.org/t/missing-keys-unexpected-keys-in-state-dict-when-loading-self-trained-model/22379/14
        else:
            torch.save(state_dict, fullmodel_name)
            _upload_to_gcs(fullmodel_name, os.environ['AIP_MODEL_DIR'])

    epoch_end = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
    writer.close()
    print (f'Epoch complete: {epoch_end}')
            
# Entry point when task.py is run as a script (e.g. the local `!python3 task.py` test).
if __name__ == "__main__":
    main()

#### Set hardware accelerators

You can set hardware accelerators for both training and prediction.

Set the variables `TRAIN_GPU/TRAIN_NGPU` to use a container image that supports GPUs, and to set the number of GPUs allocated to each virtual machine (VM) instance used for training. For example, to use a GPU container image with 4 Nvidia Tesla K80 GPUs allocated to each VM, you would specify:

    (aip.AcceleratorType.NVIDIA_TESLA_K80, 4)

See the [locations where accelerators are available](https://cloud.google.com/vertex-ai/docs/general/locations#accelerators).

Otherwise specify `(None, None)` to use a container image to run on a CPU.

*Note*: TensorFlow releases earlier than 2.3 for GPU support fail to load the custom model in this tutorial. This issue is caused by static graph operations that are generated in the serving function. This is a known issue, which is fixed in TensorFlow 2.3. If you encounter this issue with your own custom models, use a container image for TensorFlow 2.3 or later with GPU support.

In [None]:
# Accelerator type and number of accelerators per training replica; use (None, None) for CPU-only training.
TRAIN_GPU, TRAIN_NGPU = (aip.AcceleratorType.NVIDIA_TESLA_T4, 1)

#### Set pre-built containers

Vertex AI provides pre-built containers to run training and prediction.

For the latest list, see [Pre-built containers for training](https://cloud.google.com/vertex-ai/docs/training/pre-built-containers) and [Pre-built containers for prediction](https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers)

In [None]:
# Uses GPU image for training, no pytorch prebuilt deployment images
TRAIN_VERSION = "pytorch-gpu.1-11"

TRAIN_IMAGE = "us-docker.pkg.dev/vertex-ai/training/{}:latest".format(TRAIN_VERSION)

print("Training:", TRAIN_IMAGE)
print(TRAIN_GPU, TRAIN_NGPU)

#### Set machine types

Next, set the machine types to use for training and prediction.

- Set the variable `TRAIN_COMPUTE` to configure your compute resources for training.
 - `machine type`
     - `n1-standard`: 3.75GB of memory per vCPU
     - `n1-highmem`: 6.5GB of memory per vCPU
     - `n1-highcpu`: 0.9 GB of memory per vCPU
 - `vCPUs`: number of \[2, 4, 8, 16, 32, 64, 96 \]

*Note: The following is not supported for training:*

 - `standard`: 2 vCPUs
 - `highcpu`: 2, 4 and 8 vCPUs

*Note: You may also use n2 and e2 machine types for training and deployment, but they do not support GPUs*.

In [None]:
MACHINE_TYPE = "n1-highmem"
VCPU = "16"
TRAIN_COMPUTE = f"{MACHINE_TYPE}-{VCPU}"
print("Train machine type", TRAIN_COMPUTE)

# Replica counts; the GPU training run uses their sum as replica_count.
TRAIN_NCOMPUTE_MASTER = 1
TRAIN_NCOMPUTE_WORKER = 2

### Train the model

Define your custom training job on Vertex AI.

Use the `CustomTrainingJob` class to define the job, which takes the following parameters:

- `display_name`: The user-defined name of this training pipeline.
- `script_path`: The local path to the training script.
- `container_uri`: The URI of the training container image.
- `requirements`: The list of Python package dependencies of the script.
- `model_serving_container_image_uri`: The URI of a container that can serve predictions for your model — either a prebuilt container or a custom container.

Use the `run` function to start training, which takes the following parameters:

- `args`: The command line arguments to be passed to the Python script.
- `replica_count`: The number of worker replicas.
- `model_display_name`: The display name of the `Model` if the script produces a managed `Model`.
- `machine_type`: The type of machine to use for training.
- `accelerator_type`: The hardware accelerator type.
- `accelerator_count`: The number of accelerators to attach to a worker replica.

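The `run` function creates a training pipeline that trains the model. When a serving container is configured (`model_serving_container_image_uri`), the pipeline also creates a `Model` object, and `run` returns it. In this notebook that argument is commented out, so no `Model` is expected. Because `sync=False` is passed, `run` returns without waiting for the job to finish.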

In [None]:
from datetime import datetime

# Suffix that makes resource names (e.g. JOB_NAME, MODEL_DISPLAY_NAME) unique per run.
TIMESTAMP = f"{datetime.now():%Y_%m_%d_%H_%M_%S}"
print(TIMESTAMP)

In [None]:
JOB_NAME = f"cifar10_resnet_custom_job_spawn_T4_gpu_{TIMESTAMP}"

# Arguments passed to task.py on every replica.
ARGS = [
    "--dist-url=env://",  # init torch.distributed from MASTER_ADDR/MASTER_PORT/RANK/WORLD_SIZE
    "--multiprocessing-distributed",  # spawn one training process per GPU on each replica
    "--num_epochs=101",
]

#### Run a local test

In [None]:
# Output directory for the local test run below (passed as --model_dir).
%mkdir -p saved_models

### Create a Vertex AI TensorBoard instance

In [None]:
# Create a Vertex AI TensorBoard instance to receive the training jobs' logs.
# `content_name` was never defined; use a timestamped display name instead.
TENSORBOARD_DISPLAY_NAME = "cifar10-pytorch-tensorboard-" + TIMESTAMP
tensorboard = aiplatform.Tensorboard.create(
    display_name=TENSORBOARD_DISPLAY_NAME,
)

#### Option: Use a Previously Created Vertex Tensorboard Instance

```
tensorboard_name = "Your Tensorboard Resource Name or Tensorboard ID"
tensorboard = aiplatform.Tensorboard(tensorboard_name=tensorboard_name)
```

In [None]:
# Full resource name of the instance; the training jobs reference it.
TENSORBOARD_NAME = tensorboard.resource_name
print(TENSORBOARD_NAME)

In [None]:
# Set env for local test: a single-node "cluster" that task.py joins through
# --dist-url "env://" (MASTER_ADDR/MASTER_PORT/RANK/WORLD_SIZE).
os.environ["MASTER_ADDR"]="localhost"
os.environ["MASTER_PORT"]="8082"
os.environ["RANK"]="0"
os.environ['WORLD_SIZE']="1"
# Short 2-epoch run, one process per local GPU, saving weights under saved_models/.
!python3 task.py --rank 0 \
--dist-url "env://" \
--dist-backend "nccl" \
--num_epochs 2 \
--model_dir "saved_models" \
--local_training \
--multiprocessing-distributed

In [None]:
base_output_dir = f"{BUCKET_NAME}/jobs/{JOB_NAME}"
# Change to your tensorboard and service account
TENSORBOARD = TENSORBOARD_NAME
VERTEX_SA = SERVICE_ACCOUNT

# NOTE(review): task.py does not import tensorflow directly; presumably it is
# installed to provide TensorBoard support for torch.utils.tensorboard.
# No serving container is set, so no Model is expected from this job.
job = aiplatform.CustomTrainingJob(
    display_name=JOB_NAME,
    script_path="task.py",
    container_uri=TRAIN_IMAGE,
    staging_bucket=base_output_dir,
    requirements=["tensorflow"],
    # model_serving_container_image_uri=DEPLOY_IMAGE,
)

MODEL_DISPLAY_NAME = "cifar10-pytorch-" + TIMESTAMP

# Options shared by the GPU and CPU variants of the run.
run_kwargs = dict(
    args=ARGS,
    machine_type=TRAIN_COMPUTE,
    tensorboard=TENSORBOARD,
    service_account=VERTEX_SA,
    sync=False,  # don't block the notebook while the job runs
)
if TRAIN_GPU:
    # Master + workers, each with TRAIN_NGPU accelerators.
    run_kwargs.update(
        replica_count=TRAIN_NCOMPUTE_WORKER + TRAIN_NCOMPUTE_MASTER,
        accelerator_type=TRAIN_GPU.name,
        accelerator_count=TRAIN_NGPU,
    )
else:
    run_kwargs.update(replica_count=1, accelerator_count=0)

# Start the training
model = job.run(**run_kwargs)

In [None]:
NETWORK=f"projects/780788467724/global/networks/default"
JOB_NAME = "cifar10_resnet_custom_job_spawn_T4_gpu_VPC_" + TIMESTAMP

base_output_dir = '{}/jobs/{}'.format(BUCKET_NAME, JOB_NAME)
# Change to your tensorboard and service account
TENSORBOARD = TENSORBOARD_NAME
VERTEX_SA = SERVICE_ACCOUNT

job = aiplatform.CustomTrainingJob(
    display_name=JOB_NAME,
    script_path="task.py",
    container_uri=TRAIN_IMAGE,
    staging_bucket=base_output_dir,
    requirements=["tensorflow"],
#    model_serving_container_image_uri=DEPLOY_IMAGE,
)

MODEL_DISPLAY_NAME = "cifar10-pytorch-" + TIMESTAMP

# Start the training
if TRAIN_GPU:
    model = job.run(
        args=ARGS,
        replica_count=TRAIN_NCOMPUTE_WORKER + TRAIN_NCOMPUTE_MASTER,
        machine_type=TRAIN_COMPUTE,
        tensorboard=TENSORBOARD,
        service_account=VERTEX_SA,
        sync=False,
        accelerator_type=TRAIN_GPU.name,
        accelerator_count=TRAIN_NGPU,
        #        model_display_name=MODEL_DISPLAY_NAME,
    )
else:
    model = job.run(
        args=ARGS,
        replica_count=1,
        machine_type=TRAIN_COMPUTE,
        accelerator_count=0,
        tensorboard=TENSORBOARD,
        service_account=VERTEX_SA,
        sync=False,
         network=NETWORK,
    )

# Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial:

- Training Job
- Model
- Cloud Storage Bucket