In [8]:
import os
import sys

import kfp
import tensorflow as tf
import tensorflow_model_analysis as tfma
from tfx import v1 as tfx

# Record the library versions in the cell output so the environment that
# compiled the pipeline is documented alongside the run.
print('TensorFlow version: {}'.format(tf.__version__))
print('TFX version: {}'.format(tfx.__version__))
print('KFP version: {}'.format(kfp.__version__))
print('TFMA version: {}'.format(tfma.__version__))

TensorFlow version: 2.10.1
TFX version: 1.11.0
KFP version: 1.8.22
TFMA version: 0.42.0


## TFX Pipeline - Loading Preprocessed BigQuery CSV Data from Cloud Storage

In [2]:
# Google Cloud settings shared by every pipeline in this notebook.
# Project identity:
GOOGLE_CLOUD_PROJECT = 'aa-ai-specialisation'
GOOGLE_CLOUD_PROJECT_NUMBER = '653183562498'
# Where jobs run and where artifacts are stored:
GOOGLE_CLOUD_REGION = 'us-central1'
GCS_BUCKET_NAME = 'aa_chicago_taxi_trips'

In [81]:
# Locations for the preprocessed-CSV pipeline. Every GCS path is derived from
# GCS_BUCKET_NAME so the bucket is configured in exactly one place.
PIPELINE_NAME = 'chicago-vertex-training'
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)
DATA_ROOT = 'gs://{}/data/chicago_vertex_training'.format(GCS_BUCKET_NAME)
ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME

# Produces the recorded "PIPELINE_ROOT: ..." output shown below this cell.
print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))

PIPELINE_ROOT: gs://aa_chicago_taxi_trips/pipeline_root/chicago-vertex-training


In [98]:
import tensorflow_model_analysis as tfma


def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, endpoint_name: str, project_id: str,
                     region: str, use_gpu: bool) -> tfx.dsl.Pipeline:
    """Build a CSV -> Vertex AI Trainer -> Evaluator -> Vertex AI Pusher pipeline.

    Args:
      pipeline_name: Name of the TFX pipeline.
      pipeline_root: GCS path under which pipeline artifacts are written.
      data_root: Directory of CSV files read by CsvExampleGen.
      module_file: Path of the user module handed to the Vertex AI Trainer.
      endpoint_name: Endpoint name passed to the Pusher's serving args.
      project_id: Google Cloud project used for training and serving.
      region: Vertex AI region used for training and serving.
      use_gpu: When True, attach accelerators to the training and serving
        machines and use the GPU prediction image.

    Returns:
      The assembled `tfx.dsl.Pipeline`.
    """
    # Split the input 4:1 into train/eval by hash bucket. The public
    # `tfx.proto` namespace is used because `proto` / `example_gen_pb2` are not
    # imported anywhere in this notebook (they only resolved via stale kernel state).
    output_config = tfx.proto.Output(
        split_config=tfx.proto.SplitConfig(splits=[
            tfx.proto.SplitConfig.Split(name='train', hash_buckets=4),
            tfx.proto.SplitConfig.Split(name='eval', hash_buckets=1),
        ]))
    example_gen = tfx.components.CsvExampleGen(
        input_base=data_root, output_config=output_config)

    # Vertex AI custom-job spec: a single worker running the public TFX image
    # whose tag matches the locally installed TFX version.
    vertex_job_spec = {
        'project': project_id,
        'worker_pool_specs': [{
            'machine_spec': {
                'machine_type': 'n1-standard-16',
            },
            'replica_count': 1,
            'container_spec': {
                'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
            },
        }],
    }
    if use_gpu:
        # NOTE(review): K80 GPUs are being retired on Google Cloud; confirm
        # availability in `region` or switch to a newer accelerator type.
        vertex_job_spec['worker_pool_specs'][0]['machine_spec'].update({
            'accelerator_type': 'NVIDIA_TESLA_K80',
            'accelerator_count': 2
        })

    trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
        module_file=module_file,
        examples=example_gen.outputs['examples'],
        train_args=tfx.proto.TrainArgs(num_steps=100),
        eval_args=tfx.proto.EvalArgs(num_steps=5),
        custom_config={
            tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
                True,
            tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
                region,
            tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
                vertex_job_spec,
            'use_gpu':
                use_gpu,
        })

    # Regression evaluation on the 'fare' label, over the whole dataset only.
    eval_config = tfma.EvalConfig(
        model_specs=[
            tfma.ModelSpec(label_key='fare')
        ],
        metrics_specs=[
            tfma.MetricsSpec(
                metrics=[
                    tfma.MetricConfig(class_name='MeanSquaredError')
                ]
            )
        ],
        slicing_specs=[
            tfma.SlicingSpec(),
        ])
    evaluator = tfx.components.Evaluator(
        examples=example_gen.outputs['examples'],
        model=trainer.outputs['model'],
        eval_config=eval_config,
    )

    vertex_serving_spec = {
        'project_id': project_id,
        'endpoint_name': endpoint_name,
        'machine_type': 'n1-standard-16',
    }

    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
    if use_gpu:
        vertex_serving_spec.update({
            'accelerator_type': 'NVIDIA_TESLA_K80',
            'accelerator_count': 2
        })
        serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest'

    pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
        model=trainer.outputs['model'],
        custom_config={
            tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
                True,
            tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
                region,
            tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
                serving_image,
            tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
                vertex_serving_spec,
        })

    components = [
        example_gen,
        trainer,
        evaluator,
        pusher,
    ]

    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=components)

In [99]:
# Compile the preprocessed-CSV pipeline into a Vertex AI (KFP v2) JSON definition.
PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'
ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME
# Trainer user module stored under MODULE_ROOT. Defined here explicitly: this
# cell previously relied on a value left in the kernel by another cell.
_trainer_module_file = 'Chicago_trainer_pipeline_vertex.py'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # GPUs enabled: training and serving machines get accelerators and
        # the Pusher uses the GPU prediction image.
        use_gpu=True))

INFO:absl:Generating ephemeral wheel package for '/var/tmp/tmp3bjkq38m/Chicago_trainer_pipeline_vertex.py' (including modules: ['Chicago_trainer_pipeline_vertex']).
INFO:absl:User module package has hash fingerprint version 86262b9e77e085256147a3092d50c6626335b07de8bcb1c4a9d15960aae65dc0.
INFO:absl:Executing: ['/opt/conda/bin/python', '/var/tmp/tmp8jdvw3c_/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/var/tmp/tmp6mnllzl7', '--dist-dir', '/var/tmp/tmpw33ak0i6']


running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying Chicago_trainer_pipeline_vertex.py -> build/lib
installing to /var/tmp/tmp6mnllzl7
running install
running install_lib
copying build/lib/Chicago_trainer_pipeline_vertex.py -> /var/tmp/tmp6mnllzl7
running install_egg_info
running egg_info
creating tfx_user_code_Trainer.egg-info
writing tfx_user_code_Trainer.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
Copying tfx_user_code_Trainer.egg-info to /var/tmp/tmp6mnllzl7/tfx_user_code_Trainer-0.0+86262b9e77e085256147a3092d50c6626335b07de8bcb1c4a9d15960aae65dc0-py3.7.egg-info
running install_scripts
creating /var/tmp/tmp6mnllzl7/tfx_u

!!

        ********************************************************************************
        Please avoid running ``setup.py`` directly.
        Instead, use pypa/build, pypa/installer or other
        standards-based tools.

        See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
        ********************************************************************************

!!
  self.initialize_options()
INFO:absl:Successfully built user code wheel distribution at 'gs://aa_chicago_taxi_trips/pipeline_root/chicago-vertex-training/_wheels/tfx_user_code_Trainer-0.0+86262b9e77e085256147a3092d50c6626335b07de8bcb1c4a9d15960aae65dc0-py3-none-any.whl'; target user module is 'Chicago_trainer_pipeline_vertex'.
INFO:absl:Full user module path is 'Chicago_trainer_pipeline_vertex@gs://aa_chicago_taxi_trips/pipeline_root/chicago-vertex-training/_wheels/tfx_user_code_Trainer-0.0+86262b9e77e085256147a3092d50c6626335b07de8bcb1c4a9d15960aae65dc0-py3-none-any.whl'


In [100]:
# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
import logging
# Surface INFO-level SDK messages (job name, console link) in the cell output.
logging.root.setLevel(logging.INFO)

# Point the Vertex AI SDK at this project/region, then submit the compiled
# pipeline definition as a new PipelineJob.
aiplatform.init(
    project=GOOGLE_CLOUD_PROJECT,
    location=GOOGLE_CLOUD_REGION,
)
job = pipeline_jobs.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path=PIPELINE_DEFINITION_FILE,
)
job.submit()

Creating PipelineJob


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob


PipelineJob created. Resource name: projects/653183562498/locations/us-central1/pipelineJobs/chicago-vertex-training-20240110101055


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/653183562498/locations/us-central1/pipelineJobs/chicago-vertex-training-20240110101055


To use this PipelineJob in another session:


INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:


pipeline_job = aiplatform.PipelineJob.get('projects/653183562498/locations/us-central1/pipelineJobs/chicago-vertex-training-20240110101055')


INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/653183562498/locations/us-central1/pipelineJobs/chicago-vertex-training-20240110101055')


View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/chicago-vertex-training-20240110101055?project=653183562498


INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/chicago-vertex-training-20240110101055?project=653183562498


## TFX E2E Pipeline - Loading Raw CSV Data from Cloud Storage

In [3]:
# Locations for the end-to-end (raw CSV + Transform) pipeline. GCS paths are
# derived from GCS_BUCKET_NAME so the bucket is configured in one place.
# NOTE(review): PIPELINE_NAME is the same as in the preprocessed-CSV section,
# so both pipelines share PIPELINE_ROOT, the compiled JSON file name and the
# serving endpoint. Confirm this is intended.
PIPELINE_NAME = 'chicago-vertex-training'
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)
DATA_ROOT = 'gs://{}/data/chicago_vertex_training/raw_data'.format(GCS_BUCKET_NAME)
ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME

In [6]:
import tensorflow_model_analysis as tfma
from tfx.components import Transform
from tfx.components import StatisticsGen, SchemaGen
from tfx.extensions.google_cloud_big_query.example_gen.component import BigQueryExampleGen

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_root: str, endpoint_name: str, project_id: str,
                     region: str, use_gpu: bool) -> tfx.dsl.Pipeline:
    """Build the end-to-end raw-CSV pipeline with schema inference and Transform.

    Components: CsvExampleGen -> StatisticsGen -> SchemaGen -> Transform ->
    Vertex AI Trainer -> Evaluator -> Vertex AI Pusher.

    Args:
      pipeline_name: Name of the TFX pipeline.
      pipeline_root: GCS path under which pipeline artifacts are written.
      data_root: Directory of raw CSV files read by CsvExampleGen.
      module_root: Directory holding the trainer and transform user modules
        ("Chicago_trainer_pipeline_vertex.py" and "preprocessing.py").
      endpoint_name: Endpoint name passed to the Pusher's serving args.
      project_id: Google Cloud project used for training and serving.
      region: Vertex AI region used for training and serving.
      use_gpu: When True, attach accelerators to the training and serving
        machines and use the GPU prediction image.

    Returns:
      The assembled `tfx.dsl.Pipeline`.
    """
    _trainer_module_file = "Chicago_trainer_pipeline_vertex.py"
    _transform_module_file = "preprocessing.py"

    # Split the input 4:1 into train/eval by hash bucket. The public
    # `tfx.proto` namespace is used because `proto` / `example_gen_pb2` are not
    # imported anywhere in this notebook (they only resolved via stale kernel state).
    output_config = tfx.proto.Output(
        split_config=tfx.proto.SplitConfig(splits=[
            tfx.proto.SplitConfig.Split(name='train', hash_buckets=4),
            tfx.proto.SplitConfig.Split(name='eval', hash_buckets=1),
        ]))
    example_gen = tfx.components.CsvExampleGen(
        input_base=data_root, output_config=output_config)

    # Vertex AI custom-job spec: a single worker running the public TFX image
    # whose tag matches the locally installed TFX version.
    vertex_job_spec = {
        'project': project_id,
        'worker_pool_specs': [{
            'machine_spec': {
                'machine_type': 'n1-standard-16',
            },
            'replica_count': 1,
            'container_spec': {
                'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
            },
        }],
    }
    if use_gpu:
        # NOTE(review): K80 GPUs are being retired on Google Cloud; confirm
        # availability in `region` or switch to a newer accelerator type.
        vertex_job_spec['worker_pool_specs'][0]['machine_spec'].update({
            'accelerator_type': 'NVIDIA_TESLA_K80',
            'accelerator_count': 2
        })

    # Infer a schema from dataset statistics; Transform and Trainer consume it.
    statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

    schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'])

    transform = Transform(
        examples=example_gen.outputs['examples'],
        schema=schema_gen.outputs['schema'],
        module_file=os.path.join(module_root, _transform_module_file)
    )
    trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
        module_file=os.path.join(module_root, _trainer_module_file),
        examples=transform.outputs['transformed_examples'],
        transform_graph=transform.outputs['transform_graph'],
        schema=schema_gen.outputs['schema'],
        train_args=tfx.proto.TrainArgs(num_steps=100),
        eval_args=tfx.proto.EvalArgs(num_steps=5),
        custom_config={
            tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
                True,
            tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
                region,
            tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
                vertex_job_spec,
            'use_gpu':
                use_gpu,
        })

    # Regression evaluation on the 'fare' label: overall and per trip_start_hour.
    eval_config = tfma.EvalConfig(
        model_specs=[
            tfma.ModelSpec(label_key='fare')
        ],
        metrics_specs=[
            tfma.MetricsSpec(
                metrics=[
                    tfma.MetricConfig(class_name='MeanSquaredError')
                ]
            )
        ],
        slicing_specs=[
            tfma.SlicingSpec(),
            tfma.SlicingSpec(feature_keys=['trip_start_hour'])
        ])
    # NOTE(review): the Evaluator reads the *transformed* examples but uses
    # label_key='fare' and slices on 'trip_start_hour'. Confirm that
    # preprocessing.py keeps those names and that the model's serving signature
    # accepts transformed (not raw) tf.Examples. TFX examples typically
    # evaluate on example_gen.outputs['examples'] with a signature that applies
    # the transform graph.
    evaluator = tfx.components.Evaluator(
        examples=transform.outputs['transformed_examples'],
        model=trainer.outputs['model'],
        eval_config=eval_config,
    )

    vertex_serving_spec = {
        'project_id': project_id,
        'endpoint_name': endpoint_name,
        'machine_type': 'n1-standard-16',
    }

    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
    if use_gpu:
        vertex_serving_spec.update({
            'accelerator_type': 'NVIDIA_TESLA_K80',
            'accelerator_count': 2
        })
        serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest'

    pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
        model=trainer.outputs['model'],
        custom_config={
            tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
                True,
            tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
                region,
            tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
                serving_image,
            tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
                vertex_serving_spec,
        })

    components = [
        example_gen,
        statistics_gen,
        schema_gen,
        transform,
        trainer,
        evaluator,
        pusher,
    ]

    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=components)

In [9]:
# Compile the end-to-end pipeline into a Vertex AI (KFP v2) JSON definition.
PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'
ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME
runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_root=MODULE_ROOT,
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # GPUs enabled: training and serving machines get accelerators and
        # the Pusher uses the GPU prediction image.
        use_gpu=True))

running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying preprocessing.py -> build/lib
installing to /var/tmp/tmpd_mdvl31
running install
running install_lib
copying build/lib/preprocessing.py -> /var/tmp/tmpd_mdvl31
running install_egg_info
running egg_info
creating tfx_user_code_Transform.egg-info
writing tfx_user_code_Transform.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Transform.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Transform.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Transform.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Transform.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Transform.egg-info/SOURCES.txt'
Copying tfx_user_code_Transform.egg-info to /var/tmp/tmpd_mdvl31/tfx_user_code_Transform-0.0+aecb5162972a13bf42e8d7a1f96001c6ca3d5f286f7913fba122584b5660f8a9-py3.7.egg-info
running install_scripts
creating /var/tmp/tmpd_mdvl31/tfx_user_code_Transform

!!

        ********************************************************************************
        Please avoid running ``setup.py`` directly.
        Instead, use pypa/build, pypa/installer or other
        standards-based tools.

        See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
        ********************************************************************************

!!
  self.initialize_options()


running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying Chicago_trainer_pipeline_vertex.py -> build/lib
installing to /var/tmp/tmpb4dz5bng
running install
running install_lib
copying build/lib/Chicago_trainer_pipeline_vertex.py -> /var/tmp/tmpb4dz5bng
running install_egg_info
running egg_info
creating tfx_user_code_Trainer.egg-info
writing tfx_user_code_Trainer.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
Copying tfx_user_code_Trainer.egg-info to /var/tmp/tmpb4dz5bng/tfx_user_code_Trainer-0.0+86262b9e77e085256147a3092d50c6626335b07de8bcb1c4a9d15960aae65dc0-py3.7.egg-info
running install_scripts
creating /var/tmp/tmpb4dz5bng/tfx_u

!!

        ********************************************************************************
        Please avoid running ``setup.py`` directly.
        Instead, use pypa/build, pypa/installer or other
        standards-based tools.

        See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
        ********************************************************************************

!!
  self.initialize_options()


In [None]:
# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
import logging
# Surface INFO-level SDK messages (job name, console link) in the cell output.
logging.root.setLevel(logging.INFO)

# Point the Vertex AI SDK at this project/region, then submit the compiled
# end-to-end pipeline definition as a new PipelineJob.
aiplatform.init(
    location=GOOGLE_CLOUD_REGION,
    project=GOOGLE_CLOUD_PROJECT,
)
job = pipeline_jobs.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path=PIPELINE_DEFINITION_FILE,
)
job.submit()