In [1]:
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

<table align="left">

  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/master/notebooks/official/pipelines/google-cloud-pipeline-components_automl_images.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Run in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/master/notebooks/official/pipelines/google-cloud-pipeline-components_automl_images.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
  <td>
    <a href="https://console.cloud.google.com/ai/platform/notebooks/deploy-notebook?download_url=https://github.com/GoogleCloudPlatform/vertex-ai-samples/raw/master/notebooks/official/pipelines/google-cloud-pipeline-components_automl_images.ipynb">
      Open in Google Cloud Notebooks
    </a>
  </td>    
</table>

# Vertex Pipelines: AutoML Images pipelines using google-cloud-pipeline-components


## Reach out to 
@Saurabhmangal if needed.

## Overview

This notebook shows how to use the components defined in [`google_cloud_pipeline_components`](https://github.com/kubeflow/pipelines/tree/master/components/google-cloud) to build an AutoML Images workflow on [Vertex Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines).

### Objective

In this example, you'll learn how to use components from `google_cloud_pipeline_components` to:
- create a _Dataset_
- train an AutoML Images model
- train a Custom ML model
- deploy the trained model to an _endpoint_ for serving
- deploy a batch prediction job for a subset of images

The components are [documented here](https://google-cloud-pipeline-components.readthedocs.io/en/latest/google_cloud_pipeline_components.aiplatform.html#module-google_cloud_pipeline_components.aiplatform).

### Costs 

This tutorial uses billable components of Google Cloud:

* Vertex AI Training and Serving
* Cloud Storage

Learn about pricing for [Vertex AI](https://cloud.google.com/vertex-ai/pricing) and [Cloud Storage](https://cloud.google.com/storage/pricing), and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

### Set up your local development environment

**If you are using Colab or Google Cloud Notebooks**, your environment already meets
all the requirements to run this notebook. You can skip this step.

**Otherwise**, make sure your environment meets this notebook's requirements.
You need the following:

* The Google Cloud SDK
* Git
* Python 3
* virtualenv
* Jupyter notebook running in a virtual environment with Python 3

The Google Cloud guide to [Setting up a Python development
environment](https://cloud.google.com/python/setup) and the [Jupyter
installation guide](https://jupyter.org/install) provide detailed instructions
for meeting these requirements. The following steps provide a condensed set of
instructions:

1. [Install and initialize the Cloud SDK.](https://cloud.google.com/sdk/docs/)

1. [Install Python 3.](https://cloud.google.com/python/setup#installing_python)

1. [Install
   virtualenv](https://cloud.google.com/python/setup#installing_and_using_virtualenv)
   and create a virtual environment that uses Python 3. Activate the virtual environment.

1. To install Jupyter, run `pip install jupyter` on the
command-line in a terminal shell.

1. To launch Jupyter, run `jupyter notebook` on the command-line in a terminal shell.

1. Open this notebook in the Jupyter Notebook Dashboard.

### Install additional packages


In [15]:
import os

# The Google Cloud Notebook product has specific requirements
IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists("/opt/deeplearning/metadata/env_version")

# Google Cloud Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_GOOGLE_CLOUD_NOTEBOOK:
    USER_FLAG = "--user"

In [16]:
! pip install --quiet {USER_FLAG} google-cloud-aiplatform --upgrade

In [17]:
! pip install --quiet {USER_FLAG} kfp google-cloud-pipeline-components --upgrade

### Restart the kernel

After you install the additional packages, you need to restart the notebook kernel so it can find the packages.

In [5]:
# Automatically restart kernel after installs
import os

if not os.getenv("IS_TESTING") or True:
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Check the versions of the packages you installed.  The KFP SDK version should be >=1.6.

In [18]:
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

KFP SDK version: 1.8.12


## Before you begin

This notebook does not require a GPU runtime.

### Set up your Google Cloud project

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.

1. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project).

1. [Enable the Vertex AI, Cloud Storage, and Compute Engine APIs](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com,compute_component,storage-component.googleapis.com). 

1. Follow the "**Configuring your project**" instructions from the Vertex Pipelines documentation.

1. If you are running this notebook locally, you will need to install the [Cloud SDK](https://cloud.google.com/sdk).

1. Enter your project ID in the cell below. Then run the cell to make sure the
Cloud SDK uses the right project for all the commands in this notebook.

**Note**: Jupyter runs lines prefixed with `!` as shell commands, and it interpolates Python variables prefixed with `$` into these commands.

#### Set your project ID

**If you don't know your project ID**, you may be able to get your project ID using `gcloud`.

In [19]:
import os

PROJECT_ID = "veretxai-demo-ce-apac" # <-- CHANGE THIS in customer setting

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)


Project ID:  veretxai-demo-ce-apac


Otherwise, set your project ID here.

In [8]:
if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "python-docs-samples-tests"  # @param {type:"string"}

#### Timestamp

If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users on resources created, you create a timestamp for each instance session, and append it onto the name of resources you create in this tutorial.

In [20]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

### Authenticate your Google Cloud account

**If you are using Google Cloud Notebooks**, your environment is already
authenticated. Skip this step.

**If you are using Colab**, run the cell below and follow the instructions
when prompted to authenticate your account via oAuth.

**Otherwise**, follow these steps:

1. In the Cloud Console, go to the [**Create service account key**
   page](https://console.cloud.google.com/apis/credentials/serviceaccountkey).

2. Click **Create service account**.

3. In the **Service account name** field, enter a name, and
   click **Create**.

4. In the **Grant this service account access to project** section, click the **Role** drop-down list. Type "Vertex AI"
into the filter box, and select
   **Vertex AI Administrator**. Type "Storage Object Admin" into the filter box, and select **Storage Object Admin**.

5. Click *Create*. A JSON file that contains your key downloads to your
local environment.

6. Enter the path to your service account key as the
`GOOGLE_APPLICATION_CREDENTIALS` variable in the cell below and run the cell.

In [21]:
import os
import sys

# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your GCP account. This provides access to your
# Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

# The Google Cloud Notebook product has specific requirements
IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists("/opt/deeplearning/metadata/env_version")

# If on Google Cloud Notebooks, then don't execute this code
if not IS_GOOGLE_CLOUD_NOTEBOOK:
    if "google.colab" in sys.modules:
        from google.colab import auth as google_auth

        google_auth.authenticate_user()

    # If you are running this notebook locally, replace the string below with the
    # path to your service account key and run this cell to authenticate your GCP
    # account.
    elif not os.getenv("IS_TESTING"):
        %env GOOGLE_APPLICATION_CREDENTIALS ''

### Create a Cloud Storage bucket as necessary

You need a Cloud Storage bucket for this example.  If you don't have one that you want to use, you can make one now.


Set the name of your Cloud Storage bucket below. It must be unique across all
Cloud Storage buckets.

You can also change the `REGION` variable, which is used for operations
throughout the rest of this notebook. Make sure to [choose a region where Vertex AI services are
available](https://cloud.google.com/vertex-ai/docs/general/locations#available_regions). You may
not use a Multi-Regional Storage bucket for training with Vertex Pipelines.

In [1]:
BUCKET_NAME = "gs://veretxai_image_recognition"  # @param {type:"string"} # <-- CHANGE THIS
REGION = "us-central1"  # @param {type:"string"}

In [2]:
if BUCKET_NAME == "" or BUCKET_NAME is None or BUCKET_NAME == "gs://[your-bucket-name]":
    BUCKET_NAME = "gs://" + PROJECT_ID + "aip-" + TIMESTAMP

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [3]:
# !gsutil mb -l $REGION $BUCKET_NAME

Finally, validate access to your Cloud Storage bucket by examining its contents:

In [4]:
!gsutil ls -al $BUCKET_NAME

      3398  2022-06-17T01:12:06Z  gs://veretxai_image_recognition/auto_ml_prediction_images.jsonl#1655428326683248  metageneration=1
      2291  2022-06-17T01:12:11Z  gs://veretxai_image_recognition/custom_ml_prediction_images.txt#1655428331978491  metageneration=1
                                 gs://veretxai_image_recognition/batch_prediction_dataset/
                                 gs://veretxai_image_recognition/pipeline_root/
TOTAL: 2 objects, 5689 bytes (5.56 KiB)


### Import libraries and define constants

Define some constants. 


In [5]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

USER = "saurabhmangal"  # <---CHANGE THIS
PIPELINE_ROOT = "{}/pipeline_root/{}".format(BUCKET_NAME, USER)

PIPELINE_ROOT

env: PATH=/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin


'gs://veretxai_image_recognition/pipeline_root/saurabhmangal'

## Training Data

In [6]:
!gsutil cp gs://cloud-samples-data/vision/automl_classification/flowers/all_data_v2.csv .

Copying gs://cloud-samples-data/vision/automl_classification/flowers/all_data_v2.csv...
/ [1 files][289.2 KiB/289.2 KiB]                                                
Operation completed over 1 objects/289.2 KiB.                                    


In [7]:
!head all_data_v2.csv

gs://cloud-ml-data/img/flower_photos/daisy/100080576_f52e8ee070_n.jpg,daisy
gs://cloud-ml-data/img/flower_photos/daisy/10140303196_b88d3d6cec.jpg,daisy
gs://cloud-ml-data/img/flower_photos/daisy/10172379554_b296050f82_n.jpg,daisy
gs://cloud-ml-data/img/flower_photos/daisy/10172567486_2748826a8b.jpg,daisy
gs://cloud-ml-data/img/flower_photos/daisy/10172636503_21bededa75_n.jpg,daisy
gs://cloud-ml-data/img/flower_photos/daisy/102841525_bd6628ae3c.jpg,daisy
gs://cloud-ml-data/img/flower_photos/daisy/1031799732_e7f4008c03.jpg,daisy
gs://cloud-ml-data/img/flower_photos/daisy/10391248763_1d16681106_n.jpg,daisy
gs://cloud-ml-data/img/flower_photos/daisy/10437754174_22ec990b77_m.jpg,daisy
gs://cloud-ml-data/img/flower_photos/daisy/10437770546_8bb6f7bdd3_m.jpg,daisy


## Define the Custom Training Job

In [8]:
import kfp
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from kfp.v2 import compiler
from kfp.v2.google.client import AIPlatformClient

from google.cloud.aiplatform.utils import source_utils
from typing import Callable, List

In [9]:
%%writefile task.py
import tensorflow_datasets as tfds
import tensorflow as tf
from tensorflow.python.client import device_lib
import argparse
import os
import sys
import json
import tqdm
from typing import List


def str2bool(v):
    if isinstance(v, bool):
        return v
    if v.lower() in ("yes", "true", "t", "y", "1"):
        return True
    elif v.lower() in ("no", "false", "f", "n", "0"):
        return False
    else:
        raise argparse.ArgumentTypeError("Boolean value expected.")


def parse_args():
    parser = argparse.ArgumentParser(description="Keras Image Classification")
    parser.add_argument(
        "--epochs", default=10, type=int, help="number of training epochs"
    )
    parser.add_argument("--image-width", default=32, type=int, help="image width")
    parser.add_argument("--image-height", default=32, type=int, help="image height")
    parser.add_argument("--batch-size", default=16, type=int, help="mini-batch size")
    parser.add_argument(
        "--model-dir",
        default=os.getenv("AIP_MODEL_DIR"),
        type=str,
        help="model directory",
    )
    parser.add_argument("--data-dir", default="./data", type=str, help="data directory")
    parser.add_argument(
        "--test-run",
        default=False,
        type=str2bool,
        help="test run the training application, i.e. 1 epoch for training using sample dataset",
    )
    parser.add_argument("--model-version", default=1, type=int, help="model version")
    parser.add_argument(
        "--lr", dest="lr", default=0.01, type=float, help="Learning rate."
    )
    parser.add_argument(
        "--steps",
        dest="steps",
        default=200,
        type=int,
        help="Number of steps per epoch.",
    )
    parser.add_argument(
        "--distribute",
        dest="distribute",
        type=str,
        default="single",
        help="distributed training strategy",
    )

    args = parser.parse_args()
    return args


args = parse_args()


def parse_image(filename):
    image = tf.io.read_file(filename)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, [args.image_width, args.image_height])
    return image


# Scaling image data from (0, 255] to (0., 1.]
def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255.0
    return image, label


def load_aip_dataset(
    aip_data_uri_pattern: str,
    batch_size: int,
    class_names: List[str],
    test_run: bool,
    shuffle=True,
    repeat=False,
    seed=42,
):

    data_file_urls = list()
    labels = list()

    class_indices = dict(zip(class_names, range(len(class_names))))
    num_classes = len(class_names)

    for aip_data_uri in tqdm.tqdm(tf.io.gfile.glob(pattern=aip_data_uri_pattern)):
        with tf.io.gfile.GFile(name=aip_data_uri, mode="r") as gfile:
            for line in gfile.readlines():
                line = json.loads(line)
                data_file_urls.append(line["imageGcsUri"])
                classification_annotation = line["classificationAnnotations"][0]
                label = classification_annotation["displayName"]
                labels.append(class_indices[label])
                if test_run:
                    break

    filenames_ds = tf.data.Dataset.from_tensor_slices(data_file_urls)
    dataset = filenames_ds.map(
        parse_image, num_parallel_calls=tf.data.experimental.AUTOTUNE
    )

    print(f" data files count: {len(data_file_urls)}")
    print(f" labels count: {len(labels)}")

    label_ds = tf.data.Dataset.from_tensor_slices(labels)
    label_ds = label_ds.map(lambda x: tf.one_hot(x, num_classes))

    dataset = tf.data.Dataset.zip((dataset, label_ds)).map(scale).cache()

    if shuffle:
        # Shuffle locally at each iteration
        dataset = dataset.shuffle(buffer_size=batch_size * 8, seed=seed)

    if repeat:
        dataset = dataset.repeat()

    dataset = dataset.batch(batch_size)
    # Users may need to reference `class_names`.
    dataset.class_names = class_names

    return dataset


class_names = ["daisy", "dandelion", "roses", "sunflowers", "tulips"]
class_indices = dict(zip(class_names, range(len(class_names))))
num_classes = len(class_names)
print(f" class names: {class_names}")
print(f" class indices: {class_indices}")
print(f" num classes: {num_classes}")


# Get strategy
# Single Machine, single compute device
if args.distribute == "single":
    if tf.test.is_gpu_available():
        strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
    else:
        strategy = tf.distribute.OneDeviceStrategy(device="/cpu:0")
# Single Machine, multiple compute device
elif args.distribute == "mirror":
    strategy = tf.distribute.MirroredStrategy()
# Multiple Machine, multiple compute device
elif args.distribute == "multi":
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

# Multi-worker configuration
print("num_replicas_in_sync = {}".format(strategy.num_replicas_in_sync))

NUM_WORKERS = strategy.num_replicas_in_sync
# Here the batch size scales up by number of workers since
# `tf.data.Dataset.batch` expects the global batch size.
GLOBAL_BATCH_SIZE = args.batch_size * NUM_WORKERS

aip_model_dir = os.environ.get("AIP_MODEL_DIR")
aip_data_format = os.environ.get("AIP_DATA_FORMAT")
aip_training_data_uri = os.environ.get("AIP_TRAINING_DATA_URI")
aip_validation_data_uri = os.environ.get("AIP_VALIDATION_DATA_URI")
# aip_test_data_uri = os.environ.get("AIP_TEST_DATA_URI")

print(f"aip_model_dir: {aip_model_dir}")
print(f"aip_data_format: {aip_data_format}")
print(f"aip_training_data_uri: {aip_training_data_uri}")
print(f"aip_validation_data_uri: {aip_validation_data_uri}")
# print(f"aip_test_data_uri: {aip_test_data_uri}")

print("Loading AIP dataset")
train_ds = load_aip_dataset(
    aip_training_data_uri,
    GLOBAL_BATCH_SIZE,
    class_names,
    args.test_run,
    shuffle=True,
    repeat=True,
)
print("AIP training dataset is loaded")
val_ds = load_aip_dataset(aip_validation_data_uri, 1, class_names, args.test_run)
print("AIP validation dataset is loaded")
# test_ds = load_aip_dataset(aip_test_data_uri, 1, class_names, args.test_run)
# print("AIP test dataset is loaded")

tfds.disable_progress_bar()

print("Python Version = {}".format(sys.version))
print("TensorFlow Version = {}".format(tf.__version__))
print("TF_CONFIG = {}".format(os.environ.get("TF_CONFIG", "Not found")))
print("DEVICES", device_lib.list_local_devices())

# Build the Keras model
def build_and_compile_cnn_model(num_classes: int, image_width: int, image_height: int):
    model = tf.keras.Sequential(
        [
            tf.keras.layers.Conv2D(
                32, 3, activation="relu", input_shape=(image_width, image_height, 3)
            ),
            tf.keras.layers.MaxPooling2D(),
            tf.keras.layers.Conv2D(32, 3, activation="relu"),
            tf.keras.layers.MaxPooling2D(),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(num_classes, activation="softmax"),
        ]
    )
    model.compile(
        loss=tf.keras.losses.categorical_crossentropy,
        optimizer=tf.keras.optimizers.SGD(learning_rate=args.lr),
        metrics=["accuracy"],
    )
    return model

# Train the model
model_dir = os.getenv("AIP_MODEL_DIR")

with strategy.scope():
    # Creation of dataset, and model building/compiling need to be within
    # `strategy.scope()`.
    model = build_and_compile_cnn_model(
        num_classes=num_classes,
        image_width=args.image_width,
        image_height=args.image_height,
    )

model.fit(
    x=train_ds, epochs=args.epochs, validation_data=val_ds, steps_per_epoch=args.steps
)

print("Saving to:" + model_dir)
if model_dir:
    model.save(model_dir)

Overwriting task.py


In [11]:
# model_dir

In [12]:
model_dir = './'

In [13]:
!ls

'Untitled Folder'		   image_recognition.ipynb
 Untitled.ipynb			   intro_pipeline_job.json
 Vertex_pipeline.ipynb		  'myfile (1).zip'
 all_data_v2.csv		   onnx-tensorflow
 auto_ml_prediction_images.jsonl   src
 custom_image			   task.py
 custom_ml_prediction_images.txt   tutorials
 first-component.yaml		   veretxai-demo-ce-apac-repo
 image_classif_pipeline.json


In [14]:
# Define dependencies, package task.py into a python package, and upload to Google Cloud 
PROJECT_ID = "veretxai-demo-ce-apac"
DEPENDENCIES = ["tqdm", "tensorflow_datasets==1.3.0"]
PYTHON_MODULE_NAME = f"{source_utils._TrainingScriptPythonPackager._ROOT_MODULE}.{source_utils._TrainingScriptPythonPackager.module_name}"

python_packager = source_utils._TrainingScriptPythonPackager(
    script_path='task.py', requirements=DEPENDENCIES
)

PACKAGE_GCS_URI = python_packager.package_and_copy_to_gcs(
    gcs_staging_dir=PIPELINE_ROOT,
    project=PROJECT_ID
)

INFO:google.cloud.aiplatform.utils.source_utils:Training script copied to:
gs://veretxai_image_recognition/pipeline_root/saurabhmangal/aiplatform-2022-08-02-02:33:58.282-aiplatform_custom_trainer_script-0.1.tar.gz.


In [16]:
# Define the container images for custom training and online and batch prediction
TRAIN_VERSION = "tf-gpu.2-1"
TRAIN_IMAGE = "gcr.io/cloud-aiplatform/training/{}:latest".format(TRAIN_VERSION)

DEPLOY_VERSION = "tf2-gpu.2-1"
DEPLOY_IMAGE = "gcr.io/cloud-aiplatform/prediction/{}:latest".format(DEPLOY_VERSION)

## Prepare data to be used in our Prediction Tasks (AutoML and CustomML)

In [17]:
# Let's select a subset of our training data
# The data is available from an open Google Cloud bucket, "cloud-ml-data"
# To simulate the Batch Prediction task, we take a few images from that dataset
batch_prediction_dataset_list = [
    "gs://cloud-ml-data/img/flower_photos/daisy/9595857626_979c45e5bf_n.jpg",
    "gs://cloud-ml-data/img/flower_photos/daisy/9611923744_013b29e4da_n.jpg",
    "gs://cloud-ml-data/img/flower_photos/daisy/9922116524_ab4a2533fe_n.jpg",
    "gs://cloud-ml-data/img/flower_photos/daisy/99306615_739eb94b9e_m.jpg",
    "gs://cloud-ml-data/img/flower_photos/dandelion/10043234166_e6dd915111_n.jpg",
    "gs://cloud-ml-data/img/flower_photos/dandelion/10200780773_c6051a7d71_n.jpg",
    "gs://cloud-ml-data/img/flower_photos/dandelion/10294487385_92a0676c7d_m.jpg",
    "gs://cloud-ml-data/img/flower_photos/roses/15190665092_5c1c37a066_m.jpg",
    "gs://cloud-ml-data/img/flower_photos/roses/15202632426_d88efb321a_n.jpg",
    "gs://cloud-ml-data/img/flower_photos/roses/15222804561_0fde5eb4ae_n.jpg",
    "gs://cloud-ml-data/img/flower_photos/roses/15236835789_6009b8f33d.jpg",
    "gs://cloud-ml-data/img/flower_photos/roses/15255964274_cf2ecdf702.jpg",
    "gs://cloud-ml-data/img/flower_photos/sunflowers/18828283553_e46504ae38.jpg",
    "gs://cloud-ml-data/img/flower_photos/sunflowers/18843967474_9cb552716b.jpg",
    "gs://cloud-ml-data/img/flower_photos/sunflowers/18972803569_1a0634f398_m.jpg",
    "gs://cloud-ml-data/img/flower_photos/sunflowers/19349582128_68a662075e_n.jpg",
    "gs://cloud-ml-data/img/flower_photos/sunflowers/19359539074_d7e32e6616_n.jpg",
    "gs://cloud-ml-data/img/flower_photos/sunflowers/18828283553_e46504ae38.jpg",
    "gs://cloud-ml-data/img/flower_photos/sunflowers/18843967474_9cb552716b.jpg",
    "gs://cloud-ml-data/img/flower_photos/sunflowers/18972803569_1a0634f398_m.jpg",
    "gs://cloud-ml-data/img/flower_photos/sunflowers/19349582128_68a662075e_n.jpg",
    "gs://cloud-ml-data/img/flower_photos/sunflowers/19359539074_d7e32e6616_n.jpg",
    "gs://cloud-ml-data/img/flower_photos/tulips/5680695867_baff72fc7c.jpg",
    "gs://cloud-ml-data/img/flower_photos/tulips/5682463466_d3e641cb8b.jpg",
    "gs://cloud-ml-data/img/flower_photos/tulips/5687705933_55a8c2dbac.jpg",
    "gs://cloud-ml-data/img/flower_photos/tulips/5691090657_2f1e9bf49e_n.jpg",
    "gs://cloud-ml-data/img/flower_photos/tulips/5691100579_4a2767360a.jpg"
]

In [19]:
import os

In [20]:
# Let's copy the dataset to a bucket we own, called BUCKET_NAME
from google.cloud import storage
from urllib.parse import urlparse, ParseResult

def move_blob(bucket_name, blob_name, destination_bucket_name, destination_blob_name):
    storage_client = storage.Client()
    source_bucket = storage_client.bucket(bucket_name)
    source_blob = source_bucket.blob(blob_name)
    destination_bucket = storage_client.bucket(destination_bucket_name)
    source_bucket.copy_blob(source_blob, destination_bucket, destination_blob_name)

for dataset in batch_prediction_dataset_list:
    url_parse = urlparse(dataset, allow_fragments=False)
    move_blob(bucket_name = os.path.basename(url_parse.netloc),
              blob_name = url_parse.path[1:], 
              destination_bucket_name = BUCKET_NAME[5:], 
              destination_blob_name = "batch_prediction_dataset/"+os.path.basename(url_parse.path))

In [21]:
# Create a list that contains our new GCS links, pointing to the images in the Bucket we own
batch_prediction_dataset_list = ["gs://"+BUCKET_NAME[5:]+"/batch_prediction_dataset/"+os.path.basename(urlparse(item, allow_fragments=False).path) for item in batch_prediction_dataset_list]
batch_prediction_dataset_list

['gs://veretxai_image_recognition/batch_prediction_dataset/9595857626_979c45e5bf_n.jpg',
 'gs://veretxai_image_recognition/batch_prediction_dataset/9611923744_013b29e4da_n.jpg',
 'gs://veretxai_image_recognition/batch_prediction_dataset/9922116524_ab4a2533fe_n.jpg',
 'gs://veretxai_image_recognition/batch_prediction_dataset/99306615_739eb94b9e_m.jpg',
 'gs://veretxai_image_recognition/batch_prediction_dataset/10043234166_e6dd915111_n.jpg',
 'gs://veretxai_image_recognition/batch_prediction_dataset/10200780773_c6051a7d71_n.jpg',
 'gs://veretxai_image_recognition/batch_prediction_dataset/10294487385_92a0676c7d_m.jpg',
 'gs://veretxai_image_recognition/batch_prediction_dataset/15190665092_5c1c37a066_m.jpg',
 'gs://veretxai_image_recognition/batch_prediction_dataset/15202632426_d88efb321a_n.jpg',
 'gs://veretxai_image_recognition/batch_prediction_dataset/15222804561_0fde5eb4ae_n.jpg',
 'gs://veretxai_image_recognition/batch_prediction_dataset/15236835789_6009b8f33d.jpg',
 'gs://veretxai_im

In [22]:
# We need to create a file that points to the image links above
# For AutoML Batch Prediction, the input request needs to be in a .JSONL in the following format:
# {"content": "gs://sourcebucket/datasets/images/source_image.jpg", "mimeType": "image/jpeg"}

# Build a JSON-like object
import json

auto_ml_prediction_images = []
for file in batch_prediction_dataset_list:
    auto_ml_prediction_image = {}
    auto_ml_prediction_image["content"] = file
    auto_ml_prediction_image["mimeType"] = "image/jpeg"
    auto_ml_prediction_images.append(auto_ml_prediction_image)

In [23]:
# Write as JSONL to disk (not, that's NOT JSON)
with open('auto_ml_prediction_images.jsonl', 'w') as outfile:
    for entry in auto_ml_prediction_images:
        json.dump(entry, outfile)
        outfile.write('\n')

In [24]:
!head 'auto_ml_prediction_images.jsonl'

{"content": "gs://veretxai_image_recognition/batch_prediction_dataset/9595857626_979c45e5bf_n.jpg", "mimeType": "image/jpeg"}
{"content": "gs://veretxai_image_recognition/batch_prediction_dataset/9611923744_013b29e4da_n.jpg", "mimeType": "image/jpeg"}
{"content": "gs://veretxai_image_recognition/batch_prediction_dataset/9922116524_ab4a2533fe_n.jpg", "mimeType": "image/jpeg"}
{"content": "gs://veretxai_image_recognition/batch_prediction_dataset/99306615_739eb94b9e_m.jpg", "mimeType": "image/jpeg"}
{"content": "gs://veretxai_image_recognition/batch_prediction_dataset/10043234166_e6dd915111_n.jpg", "mimeType": "image/jpeg"}
{"content": "gs://veretxai_image_recognition/batch_prediction_dataset/10200780773_c6051a7d71_n.jpg", "mimeType": "image/jpeg"}
{"content": "gs://veretxai_image_recognition/batch_prediction_dataset/10294487385_92a0676c7d_m.jpg", "mimeType": "image/jpeg"}
{"content": "gs://veretxai_image_recognition/batch_prediction_dataset/15190665092_5c1c37a066_m.jpg", "mimeType": "ima

In [25]:
# Upload to GCS
def upload_blob(bucket_name, source_file_name, destination_blob_name):
    storage.Client().bucket(bucket_name).blob(source_file_name).upload_from_filename(destination_blob_name)
upload_blob(BUCKET_NAME[5:], 'auto_ml_prediction_images.jsonl', 'auto_ml_prediction_images.jsonl')
auto_ml_prediction_data_object = BUCKET_NAME+"/auto_ml_prediction_images.jsonl"
auto_ml_prediction_data_object

'gs://veretxai_image_recognition/auto_ml_prediction_images.jsonl'

In [26]:
# For Custom Training Batch Prediction, the input format is a .TXT file as follows:
# gs://formoso_image_detection/batch_prediction_dataset/5682463466_d3e641cb8b.jpg
# gs://formoso_image_detection/batch_prediction_dataset/5687705933_55a8c2dbac.jpg
# gs://formoso_image_detection/batch_prediction_dataset/5691090657_2f1e9bf49e_n.jpg

# Build a TXT-like object

with open('custom_ml_prediction_images.txt', 'w') as outfile:
    for entry in batch_prediction_dataset_list:
        outfile.write(entry+'\n')

In [27]:
!head "custom_ml_prediction_images.txt"

gs://veretxai_image_recognition/batch_prediction_dataset/9595857626_979c45e5bf_n.jpg
gs://veretxai_image_recognition/batch_prediction_dataset/9611923744_013b29e4da_n.jpg
gs://veretxai_image_recognition/batch_prediction_dataset/9922116524_ab4a2533fe_n.jpg
gs://veretxai_image_recognition/batch_prediction_dataset/99306615_739eb94b9e_m.jpg
gs://veretxai_image_recognition/batch_prediction_dataset/10043234166_e6dd915111_n.jpg
gs://veretxai_image_recognition/batch_prediction_dataset/10200780773_c6051a7d71_n.jpg
gs://veretxai_image_recognition/batch_prediction_dataset/10294487385_92a0676c7d_m.jpg
gs://veretxai_image_recognition/batch_prediction_dataset/15190665092_5c1c37a066_m.jpg
gs://veretxai_image_recognition/batch_prediction_dataset/15202632426_d88efb321a_n.jpg
gs://veretxai_image_recognition/batch_prediction_dataset/15222804561_0fde5eb4ae_n.jpg


In [28]:
# Upload to GCS
def upload_blob(bucket_name, source_file_name, destination_blob_name):
    storage.Client().bucket(bucket_name).blob(source_file_name).upload_from_filename(destination_blob_name)
upload_blob(BUCKET_NAME[5:], 'custom_ml_prediction_images.txt', 'custom_ml_prediction_images.txt')
custom_ml_prediction_data_object = BUCKET_NAME+"/custom_ml_prediction_images.txt"
custom_ml_prediction_data_object

'gs://veretxai_image_recognition/custom_ml_prediction_images.txt'

## Define an Image classification pipeline that uses components from `google_cloud_pipeline_components`

Create a managed image dataset from a CSV file and train it using AutoML Image Training.


Define the pipeline:

In [29]:
@kfp.dsl.pipeline(name="image-detection-run-1")
def pipeline(project: str = PROJECT_ID):
    
    create_dataset_op = gcc_aip.ImageDatasetCreateOp(
        project=project,
        display_name="flowers",
        gcs_source="gs://cloud-samples-data/vision/automl_classification/flowers/all_data_v2.csv",
        import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification,
    )
        
    auto_ml_training_job_run_op = gcc_aip.AutoMLImageTrainingJobRunOp(
        project=project,
        display_name="train-iris-automl-mbsdk-1",
        prediction_type="classification",
        model_type="CLOUD",
        base_model=None,
        dataset=create_dataset_op.outputs["dataset"],
        model_display_name="iris-classification-model-mbsdk",
        training_fraction_split=0.6,
        validation_fraction_split=0.2,
        test_fraction_split=0.2,
        budget_milli_node_hours=8000,
    )
        
    auto_ml_endpoint_op = gcc_aip.ModelDeployOp(
        #project=project, 
        model=auto_ml_training_job_run_op.outputs["model"]
    )
    
    auto_ml_batch_predict_op = gcc_aip.ModelBatchPredictOp(
        project=project,
        model=auto_ml_training_job_run_op.outputs["model"],
        job_display_name="auto_ml_batch_predict_op"
        #,
        #gcs_source=auto_ml_prediction_data_object,
        #gcs_destination_prefix=PIPELINE_ROOT
    )

    custom_training_job_run_op = gcc_aip.CustomPythonPackageTrainingJobRunOp(
        project=project,
        display_name='custom_training_job_run_op',
        dataset=create_dataset_op.outputs["dataset"],
        python_package_gcs_uri = PACKAGE_GCS_URI,
        python_module_name = PYTHON_MODULE_NAME,
        # Training
        container_uri = TRAIN_IMAGE,
        staging_bucket=PIPELINE_ROOT,
        annotation_schema_uri=aiplatform.schema.dataset.annotation.image.classification,
        args=["--epochs", "50", "--image-width", "32", "--image-height", "32"],
        replica_count=1,
        machine_type="n1-standard-4",
        accelerator_type="NVIDIA_TESLA_K80",
        accelerator_count=1,
        # Serving - As part of this operation, the model is registered to Vertex AI
        model_serving_container_image_uri = DEPLOY_IMAGE,
        model_display_name = 'custom_model_image_detection'
    )


    custom_endpoint_op = gcc_aip.ModelDeployOp(
        #project=project,
        deployed_model_display_name='custom_deployed_model_image_detection',
        #machine_type="n1-standard-4",
        #accelerator_type="NVIDIA_TESLA_K80",
        #accelerator_count=1,
        model=custom_training_job_run_op.outputs["model"]
    )

    
    custom_ml_batch_predict_op = gcc_aip.ModelBatchPredictOp(
        project=project,
        model=custom_training_job_run_op.outputs["model"],
        job_display_name="custom_ml_batch_predict_op",
        #gcs_source=custom_ml_prediction_data_object,
        machine_type="n1-standard-4",
        accelerator_type="NVIDIA_TESLA_K80",
        accelerator_count=1
        #,
        #gcs_destination_prefix=PIPELINE_ROOT
    )


## Compile and run the pipeline

Compile the pipeline:

In [30]:
from kfp.v2 import compiler  # noqa: F811

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="image_classif_pipeline.json"
)



The pipeline compilation generates the `image_classif_pipeline.json` job spec file.

Next, instantiate an API client object:

In [31]:
from kfp.v2.google.client import AIPlatformClient  # noqa: F811

api_client = AIPlatformClient(project_id=PROJECT_ID, region=REGION)



Then, you run the defined pipeline like this: 

In [32]:
response = api_client.create_run_from_job_spec(
    "image_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID},
)

## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial:
- Delete Cloud Storage objects that were created.  Uncomment and run the command in the cell below **only if you are not using the `PIPELINE_ROOT` path for any other purpose**.
- Delete your deployed model: first, undeploy it from its *endpoint*, then delete the model and endpoint.


In [None]:
# Warning: this command will delete ALL Cloud Storage objects under the PIPELINE_ROOT path.
# ! gsutil -m rm -r $PIPELINE_ROOT