In [None]:
!pip install --upgrade pip
!pip install --upgrade "tfx[kfp]<2"


In [2]:
# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

In [None]:
import tensorflow as tf
import os
import numpy as np
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))

# You may see a WARNING issued. You can safely ignore this until the TFX and KFP versions are displayed in the cell output.

In [2]:
GOOGLE_CLOUD_PROJECT = 'som-k8s'    
GOOGLE_CLOUD_REGION = 'australia-southeast1'     
GCS_BUCKET_NAME = GOOGLE_CLOUD_PROJECT + '-gcs'
PIPELINE_NAME = 'penguin-vertex-pipelines'
_trainer_module_file = 'penguin_trainer.py'
# Path to various pipeline artifact.
PIPELINE_ROOT = f'gs://{GCS_BUCKET_NAME}/pipeline_root/{PIPELINE_NAME}'
# Paths for users' Python module.
MODULE_ROOT = f'gs://{GCS_BUCKET_NAME}/pipeline_module/{PIPELINE_NAME}'
# Paths for input data.
DATA_ROOT = f'gs://{GCS_BUCKET_NAME}/data/{PIPELINE_NAME}'
# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = f'gs://{GCS_BUCKET_NAME}/serving_model/{PIPELINE_NAME}'
print(f'PIPELINE_ROOT: {PIPELINE_ROOT}')
print(f'MODULE_ROOT: {MODULE_ROOT}')
print(f'DATA_ROOT: {DATA_ROOT}')
print(f'SERVING_MODEL_DIR: {SERVING_MODEL_DIR}')

PIPELINE_ROOT: gs://som-k8s-gcs/pipeline_root/penguin-vertex-pipelines
MODULE_ROOT: gs://som-k8s-gcs/pipeline_module/penguin-vertex-pipelines
DATA_ROOT: gs://som-k8s-gcs/data/penguin-vertex-pipelines
SERVING_MODEL_DIR: gs://som-k8s-gcs/serving_model/penguin-vertex-pipelines


In [3]:
!gcloud config set project {GOOGLE_CLOUD_PROJECT}
!gsutil mb 'gs://'{GCS_BUCKET_NAME}
!gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/

Updated property [core/project].
Creating gs://som-k8s-gcs/...
Copying gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv [Content-Type=application/octet-stream]...
/ [1 files][ 25.0 KiB/ 25.0 KiB]                                                
Operation completed over 1 objects/25.0 KiB.                                     


In [4]:
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils


from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')

Writing penguin_trainer.py


In [5]:
!gsutil cp {_trainer_module_file} {MODULE_ROOT}/

Copying file://penguin_trainer.py [Content-Type=text/x-python]...
/ [1 files][  3.8 KiB/  3.8 KiB]                                                
Operation completed over 1 objects/3.8 KiB.                                      


In [6]:
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, endpoint_name: str, project_id: str,
                     region: str, use_gpu: bool
                     ) -> tfx.dsl.Pipeline:
    """Creates a three component penguin pipeline with TFX."""
    # Brings data into the pipeline.
    example_gen = tfx.components.CsvExampleGen(input_base=data_root)

In [7]:
# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified because we don't need `metadata_path` argument.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, endpoint_name: str, project_id: str,
                     region: str, use_gpu: bool
                     ) -> tfx.dsl.Pipeline:
    """Creates a three component penguin pipeline with TFX."""
    # Brings data into the pipeline.
    example_gen = tfx.components.CsvExampleGen(input_base=data_root)
    # This dictionary will be passed as `CustomJobSpec`.
    vertex_job_spec = {
        'project': project_id,
        'worker_pool_specs': [{
            'machine_spec': {
                'machine_type': 'n1-standard-4',
            },
            'replica_count': 1,
            'container_spec': {
                'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
            },
        }],
    }
    if use_gpu:
        # See https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype
        # for available machine types.
        vertex_job_spec['worker_pool_specs'][0]['machine_spec'].update({
            'accelerator_type': 'NVIDIA_TESLA_K80',
            'accelerator_count': 1
        })

    # Uses user-provided Python function that trains a model. We need to specify a Trainer for GCP with related configs.
    trainer =  tfx.extensions.google_cloud_ai_platform.Trainer(
        module_file=module_file,
        examples=example_gen.outputs['examples'],
        train_args=tfx.proto.TrainArgs(num_steps=100),
        eval_args=tfx.proto.EvalArgs(num_steps=5),
        custom_config={
            tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
                True,
            tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
                region,
            tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
                vertex_job_spec,
            'use_gpu':
                use_gpu,
        })

        # Configuration for pusher.
    vertex_serving_spec = {
            'project_id': project_id,
            'endpoint_name': endpoint_name,
            'machine_type': 'n1-standard-4',
        }
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
    if use_gpu:
        vertex_serving_spec.update({
            'accelerator_type': 'NVIDIA_TESLA_K80',
            'accelerator_count': 1
        })
        serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest'

    # Pushes the model to a filesystem destination.
    pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
        model=trainer.outputs['model'],
        custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
              serving_image,
          tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
            vertex_serving_spec,
      })

    # Following three components will be included in the pipeline.
    components = [
        example_gen,
        trainer,
        pusher,
    ]

    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=components)

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'
ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
# Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.

_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # We will use CPUs only for now.
        use_gpu=False))

# submit The generated definition file using kfp client.
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying penguin_trainer.py -> build/lib


!!

        ********************************************************************************
        Please avoid running ``setup.py`` directly.
        Instead, use pypa/build, pypa/installer or other
        standards-based tools.

        See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
        ********************************************************************************

!!
  self.initialize_options()


installing to /var/tmp/tmpo4tyf49y
running install
running install_lib
copying build/lib/penguin_trainer.py -> /var/tmp/tmpo4tyf49y
running install_egg_info
running egg_info
creating tfx_user_code_Trainer.egg-info
writing tfx_user_code_Trainer.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
Copying tfx_user_code_Trainer.egg-info to /var/tmp/tmpo4tyf49y/tfx_user_code_Trainer-0.0+afa8d5dfaf2942b08385545b5e095850331200a5d7a669b1791b3e7efcc5fdd9-py3.10.egg-info
running install_scripts
creating /var/tmp/tmpo4tyf49y/tfx_user_code_Trainer-0.0+afa8d5dfaf2942b08385545b5e095850331200a5d7a669b1791b3e7efcc5fdd9.dist-info/WHEEL
creating '/var/tmp/tmpy0nb1owq/tfx_user_code_Trainer-0

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/41931926481/locations/australia-southeast1/pipelineJobs/penguin-vertex-pipelines-20240620001654
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/41931926481/locations/australia-southeast1/pipelineJobs/penguin-vertex-pipelines-20240620001654')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/australia-southeast1/pipelines/runs/penguin-vertex-pipelines-20240620001654?project=41931926481
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/41931926481/locations/australia-southeast1/pipelineJobs/penguin-vertex-pipelines-20240620001654 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:Pipeli