## Initialize (Variables and Libraries)

In [1]:
PROJECT_ID = "vtxdemos"
STAGING_FOLDER_URI =  "gs://vtxdemos-staging"
TRAIN_IMAGE_URI = "gcr.io/vtxdemos/tensorflow-gpu-nlp-pipe:v1"
PRED_IMAGE_URI = "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-11:latest"
MODEL_URI = "gs://vtxdemos-models/nlp"

In [2]:
from google.cloud import aiplatform as aip

## Creating Folder Structure for Training Script

In [9]:
!rm -fr training
!mkdir training

In [10]:
%%writefile training/train.py
#%%
import os
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
from google.cloud import bigquery
import tensorflow_datasets as tfds
callback = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=3)

print("Version: ", tf.__version__)
print("Eager mode: ", tf.executing_eagerly())
print("Hub version: ", hub.__version__)
print("GPU is", "available" if tf.config.list_physical_devices('GPU') else "NOT AVAILABLE")

client = bigquery.Client(project="vtxdemos")

## Loading testing dataset from bigquery
sql = "select * from `public.train_nlp`"
train_df = client.query(sql).to_dataframe()
train_examples = np.array([i.encode('utf-8') for i in train_df['text']], dtype="object")
train_labels = train_df['labels'].to_numpy(dtype=int)

## Loading testing dataset from bigquery
sql = "select * from `vtxdemos.public.train_nlp`"
test_df = client.query(sql).to_dataframe()
test_examples = np.array([i.encode('utf-8') for i in test_df['text']], dtype="object")
test_labels = test_df['labels'].to_numpy(dtype=int)

## Load pre-trained model (BERT)
model = "https://tfhub.dev/google/nnlm-en-dim50/2"
hub_layer = hub.KerasLayer(model, input_shape=[], dtype=tf.string, trainable=True)

## Splitting datasets
x_val = train_examples[:10000]
partial_x_train = train_examples[10000:]

y_val = train_labels[:10000]
partial_y_train = train_labels[10000:]

## Create new nn layers
model = tf.keras.Sequential()
model.add(hub_layer)
model.add(tf.keras.layers.Dense(16, activation='relu'))
model.add(tf.keras.layers.Dense(1))

model.compile(optimizer='adam',
              loss=tf.losses.BinaryCrossentropy(from_logits=True),
              metrics=[tf.metrics.BinaryAccuracy(threshold=0.0, name='accuracy')])

#%%
history = model.fit(partial_x_train,
                    partial_y_train,
                    epochs=20,
                    batch_size=512,
                    validation_data=(x_val, y_val),
                    verbose=1,
                    callbacks=[callback])
model.save(os.getenv('AIP_MODEL_DIR'))

Writing training/train.py


In [11]:
%%writefile training/requirements.txt
tensorflow==2.11.0
tensorflow_hub
tensorflow-datasets
numpy
pandas
google-cloud-bigquery
db-dtypes

Writing training/requirements.txt


In [12]:
%%writefile training/Dockerfile
FROM nvidia/cuda:11.8.0-cudnn8-devel-ubuntu22.04
ARG DEBIAN_FRONTEND=noninteractive

COPY train.py train.py
COPY requirements.txt requirements.txt
RUN apt update -y
RUN apt-get install -y python3.10 && \
     apt-get install -y python3-pip
RUN pip install -r requirements.txt

CMD ["python3", "train.py"]

Writing training/Dockerfile


In [13]:
!docker build -t $TRAIN_IMAGE_URI training/.
!docker push $TRAIN_IMAGE_URI

Sending build context to Docker daemon  5.632kB
Step 1/8 : FROM nvidia/cuda:11.8.0-cudnn8-devel-ubuntu22.04
 ---> fd00873d2a37
Step 2/8 : ARG DEBIAN_FRONTEND=noninteractive
 ---> Using cache
 ---> 9e5c0c7ff153
Step 3/8 : COPY train.py train.py
 ---> Using cache
 ---> d4799fbe429c
Step 4/8 : COPY requirements.txt requirements.txt
 ---> Using cache
 ---> 6876f3ccf27d
Step 5/8 : RUN apt update -y
 ---> Using cache
 ---> baabb2dc1168
Step 6/8 : RUN apt-get install -y python3.10 &&      apt-get install -y python3-pip
 ---> Using cache
 ---> 6d948c304b6a
Step 7/8 : RUN pip install -r requirements.txt
 ---> Using cache
 ---> 6583cf5048a1
Step 8/8 : CMD ["python3", "train.py"]
 ---> Using cache
 ---> 2d186d2e6778
Successfully built 2d186d2e6778
Successfully tagged gcr.io/vtxdemos/tensorflow-gpu-nlp-pipe:v1

Use 'docker scan' to run Snyk tests against images to find vulnerabilities and learn how to fix them
The push refers to repository [gcr.io/vtxdemos/tensorflow-gpu-nlp-pipe]

[1Bd5a56689: P

## Create Vertex Pipelines

In [36]:
from kfp import compiler
from kfp.dsl import component, pipeline
from kfp.components import importer_node
from google_cloud_pipeline_components.v1 import custom_job, model, endpoint
from google_cloud_pipeline_components.types import artifact_types



worker_pool_specs = [
    {
        "machine_spec": {
            "machine_type" : "n1-standard-8",
            "accelerator_type": "NVIDIA_TESLA_T4",
            "accelerator_count": 1
        },
        "replica_count": "1",
        "container_spec": {
            "image_uri" : TRAIN_IMAGE_URI
        }
    }
]

@pipeline(name='pipe-tf-nlp')
def pipeline(
    project_id : str,
    model_uri: str,
    pred_image_uri: str,
    prefix_name: str
):
    custom_train_job = custom_job.component.custom_training_job(
        display_name=f"{prefix_name}-train",
        project=project_id,
        worker_pool_specs=worker_pool_specs,
        base_output_directory=model_uri
    )
    importer_spec = importer_node.importer(
        artifact_uri=f"{MODEL_URI}/model",
        artifact_class=artifact_types.UnmanagedContainerModel,
        metadata={
            "containerSpec": {
                "imageUri": pred_image_uri
            },
        }).after(custom_train_job)
    model_upload_job = model.ModelUploadOp(
        display_name=f"{prefix_name}-model",
        project=project_id,
        unmanaged_container_model=importer_spec.outputs["artifact"])
    endpoint_create_job = endpoint.EndpointCreateOp(
        display_name=f"{prefix_name}-endpoint",
        project=project_id,
    )
    endpoint_deploy_job = endpoint.ModelDeployOp(
        deployed_model_display_name=f"{prefix_name}-model-deployed",
        endpoint=endpoint_create_job.outputs["endpoint"],
        model=model_upload_job.outputs["model"],
        dedicated_resources_machine_type="n1-standard-4",
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
    )

    
    

## Create Compiled File from Pipelines

In [37]:
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path='pipe_tf_nlp.yaml')

In [38]:
from kfp.registry import RegistryClient

client = RegistryClient(host=f"https://us-central1-kfp.pkg.dev/vtxdemos/kfp-repo")

## Create Template for Graphical Interface (Sharing Artifact Template) [Optional]

In [39]:
templateName, versionName = client.upload_pipeline(
  file_name="pipe_tf_nlp.yaml",
  tags=["v2", "latest"],
  extra_headers={"description":"This is an example pipeline template."})

## Send Pipeline Job

In [40]:
from google.cloud import aiplatform

# Initialize the aiplatform package
aiplatform.init(
    project="vtxdemos",
    location='us-central1',
    staging_bucket=STAGING_FOLDER_URI)

# Create a job via version id.
job = aiplatform.PipelineJob(
    display_name="pipe-tf-nlp",
    template_path="pipe_tf_nlp.yaml",
    parameter_values={
        "project_id" : PROJECT_ID,
        "model_uri": MODEL_URI,
        "pred_image_uri": PRED_IMAGE_URI,
        "prefix_name": "pipe-nlp-tf2"
    },
    )

job.submit()

[](images/transformer-pipe.png)

<center><img src="images/transformer-pipe.png"/></center>
