# Pipeline definition and running

This notebook is the core of the solution for the Chicago Taxi Trips assignment. It defines every pipeline step and runs the pipeline on Vertex Pipelines. The other notebooks refer back to parts of this notebook or use the results of a pipeline run created here.

In [1]:
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx_v1
import tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))

TensorFlow version: 2.8.1
TFX version: 1.8.0
KFP version: 1.8.12


In [2]:
from data.queries import TRAIN_QUERY, EVAL_QUERY, TEST_QUERY
from pipeline.pipeline_def import _create_pipeline

### Setting up variables

This section sets up the GCP variables used for the pipeline.

In [3]:
GOOGLE_CLOUD_PROJECT = 'aliz-ml-spec-2022-submission'
GOOGLE_CLOUD_REGION = 'us-central1'
GCS_BUCKET_NAME = 'aliz-ml-spec-2022'

PIPELINE_NAME = 'taxi-vertex-pipelines'

# Root path for pipeline artifacts.
PIPELINE_ROOT = 'gs://{}/demo-1/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/demo-1/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for input data.
DATA_ROOT = 'gs://{}/demo-1/data/taxi_data'.format(GCS_BUCKET_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/demo-1/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))

PIPELINE_ROOT: gs://aliz-ml-spec-2022/demo-1/pipeline_root/taxi-vertex-pipelines
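
The GCS locations above share one layout, `gs://<bucket>/demo-1/<kind>/<name>`. A minimal sketch of a helper that builds them in one place follows; `gcs_path` is a hypothetical name, not part of the notebook or of TFX:

```python
import posixpath


def gcs_path(bucket: str, *parts: str) -> str:
    """Join path parts under a bucket into a gs:// URI (hypothetical helper)."""
    # posixpath always joins with forward slashes, whatever the local OS is.
    return 'gs://' + posixpath.join(bucket, *parts)


bucket = 'aliz-ml-spec-2022'
name = 'taxi-vertex-pipelines'

pipeline_root = gcs_path(bucket, 'demo-1', 'pipeline_root', name)
module_root = gcs_path(bucket, 'demo-1', 'pipeline_module', name)
data_root = gcs_path(bucket, 'demo-1', 'data', 'taxi_data')

print(pipeline_root)
```

Keeping the layout in one function makes it harder for the individual paths to drift apart when the bucket or pipeline name changes.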


### Prepare data

To keep data ingestion simple, we use the CsvExampleGen component. This requires exporting the required portion of the dataset to a GCS bucket, which the three queries below do, one per split (train, eval and test).

In [4]:
%%bigquery

$TRAIN_QUERY

Query complete after 0.01s: 100%|██████████| 1/1 [00:00<00:00, 408.48query/s]                          


In [5]:
%%bigquery

$EVAL_QUERY

Query complete after 0.00s: 100%|██████████| 1/1 [00:00<00:00, 380.09query/s]                          


In [6]:
%%bigquery

$TEST_QUERY

Query complete after 0.00s: 100%|██████████| 1/1 [00:00<00:00, 518.14query/s]                          
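
The contents of `TRAIN_QUERY`, `EVAL_QUERY` and `TEST_QUERY` live in `data/queries.py` and are not shown in this notebook. As a rough sketch only, an export into the `train/` sub-directory read by CsvExampleGen could be written with BigQuery's `EXPORT DATA` statement. The source table, the selected columns, the year filter and the `build_export_query` helper are all assumptions for illustration, not the queries used in this assignment:

```python
def build_export_query(split: str, where: str, data_root: str) -> str:
    """Return an EXPORT DATA statement for one split (hypothetical helper)."""
    # The wildcard lets BigQuery shard a large export into several CSV files.
    uri = f'{data_root}/{split}/*.csv'
    return f"""
EXPORT DATA OPTIONS(
  uri='{uri}',
  format='CSV',
  overwrite=true,
  header=true
) AS
SELECT fare, trip_miles, trip_seconds, trip_start_timestamp
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
WHERE {where}
""".strip()


query = build_export_query(
    split='train',
    where='EXTRACT(YEAR FROM trip_start_timestamp) = 2020',  # assumed filter
    data_root='gs://aliz-ml-spec-2022/demo-1/data/taxi_data',
)
print(query)
```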


### Pipeline modules

The following cells point to the preprocessing and trainer modules and upload the files to GCS. The upload is necessary so that the TFX components running on GCP can load our custom code.

#### Preprocessing

The preprocessing step maps each categorical feature to integer indices through a vocabulary and standardises the numerical features.
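
To make the two operations concrete, the sketch below reproduces them in plain Python on a toy batch: categorical values are replaced by their index in a frequency-ordered vocabulary, and numerical values are shifted and scaled to zero mean and unit variance. In TensorFlow Transform these steps are commonly expressed with `tft.compute_and_apply_vocabulary` and `tft.scale_to_z_score`; whether `preprocess.py` uses exactly these calls is an assumption, and the feature values are made up:

```python
from collections import Counter
from statistics import fmean, pstdev


def build_vocabulary(values):
    """Map each distinct value to an index, most frequent first."""
    counts = Counter(values)
    # Ties are broken alphabetically so the mapping is deterministic.
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: index for index, value in enumerate(ordered)}


def standardise(values):
    """Scale values to zero mean and unit (population) variance."""
    # Assumes the values are not all identical, otherwise std would be 0.
    mean, std = fmean(values), pstdev(values)
    return [(v - mean) / std for v in values]


payment_type = ['Cash', 'Credit Card', 'Cash', 'Prcard']  # toy data
trip_miles = [1.0, 2.0, 3.0, 4.0]                         # toy data

vocab = build_vocabulary(payment_type)
payment_ids = [vocab[v] for v in payment_type]
scaled_miles = standardise(trip_miles)

print(vocab)
print(payment_ids)
print(scaled_miles)
```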

#### Model

The model is a simple DNN that is made up of two blocks. The first processes the inputs and embeds taxi trips in an implicit feature space. The second is a dense block that leads to the final predictions.

#### Hyperparameter tuning

The hyperparameter tuning section uses KerasTuner to perform a hyperparameter search on the previously defined model.

In [22]:
_trainer_module_file_location = '../src/pipeline/taxi_trainer.py'
preprocess_module_file_location = '../src/pipeline/preprocess.py'
_trainer_module_file = 'taxi_trainer.py'
preprocess_module_file = 'preprocess.py'

Copy the module files to GCS so that the pipeline components can access them.
Because model training happens on GCP, the model definition has to be uploaded.

Alternatively, you could build a container image that includes the module files
and use that image to run the pipeline.

In [27]:
!gsutil cp {_trainer_module_file_location} {MODULE_ROOT}/taxi_trainer.py

Copying file://../src/pipeline/taxi_trainer.py [Content-Type=text/x-python]...
- [1 files][  5.2 KiB/  5.2 KiB]                                                
Operation completed over 1 objects/5.2 KiB.                                      


In [28]:
!gsutil cp {preprocess_module_file_location} {MODULE_ROOT}/preprocess.py

Copying file://../src/pipeline/preprocess.py [Content-Type=text/x-python]...
- [1 files][  2.2 KiB/  2.2 KiB]                                                
Operation completed over 1 objects/2.2 KiB.                                      


### Pipeline definition

The following cell replicates the pipeline creation found in src/pipeline/pipeline_def to showcase how a TFX pipeline is built. It is made up of the following components:

- CsvExampleGen: Loads our dataset from Google Cloud Storage
- StatisticsGen: Computes statistics over the dataset
- SchemaGen: Creates a TF schema from those statistics to be used in later components
- ExampleValidator: Checks the statistics against the schema, which allows us to check for data skew later on
- Transform: Performs preprocessing on the data
- Tuner: Performs hyperparameter tuning
- Trainer: Trains the model on Vertex AI
- Evaluator: Computes evaluation metrics for the trained model
- Pusher: Deploys the model to the endpoint on Vertex AI
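
The order in which the components can run follows from which outputs each one consumes. As a sketch, the dependencies read off the pipeline code in this notebook can be written as a plain mapping and sorted with the standard-library `graphlib` module (Python 3.9+). The dictionary only illustrates the data flow; it is not something TFX asks you to write, since TFX derives the graph from the component inputs:

```python
from graphlib import TopologicalSorter

# Each component maps to the components whose outputs it consumes.
dependencies = {
    'CsvExampleGen': set(),
    'StatisticsGen': {'CsvExampleGen'},
    'SchemaGen': {'StatisticsGen'},
    'ExampleValidator': {'StatisticsGen', 'SchemaGen'},
    'Transform': {'CsvExampleGen', 'SchemaGen'},
    'Tuner': {'Transform', 'SchemaGen'},
    'Trainer': {'Transform', 'SchemaGen', 'Tuner'},
    'Evaluator': {'Transform', 'Trainer'},
    'Pusher': {'Trainer'},
}

# static_order() yields one valid execution order; several may exist.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```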

In [30]:
import tensorflow_model_analysis as tfma

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str, project_id: str,
                     endpoint_name: str, region: str,
                     ) -> tfx_v1.dsl.Pipeline:
  """Defines a pipeline for the Chicago Taxi Trips assignment.

  Note: `serving_model_dir` is currently unused; the Pusher deploys to the
  Vertex AI endpoint named `endpoint_name`.
  """

  # Map each split to the sub-directory of `data_root` that holds its CSV files.
  # Named `input_config` to avoid shadowing the built-in `input`.
  input_config = tfx_v1.proto.Input(splits=[
      tfx_v1.proto.Input.Split(name='train', pattern='train/*'),
      tfx_v1.proto.Input.Split(name='eval', pattern='eval/*'),
      tfx_v1.proto.Input.Split(name='test', pattern='test/*'),
  ])
  example_gen = tfx_v1.components.CsvExampleGen(
      input_base=data_root, input_config=input_config)

  compute_eval_stats = tfx_v1.components.StatisticsGen(
      examples=example_gen.outputs['examples'],
      )
  schema_gen = tfx_v1.components.SchemaGen(
    statistics=compute_eval_stats.outputs['statistics'])
    
  validate_stats = tfx_v1.components.ExampleValidator(
      statistics=compute_eval_stats.outputs['statistics'],
      schema=schema_gen.outputs['schema']
      )

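  # Note: the Transform module is read from the notebook-level local path
  # `preprocess_module_file_location`, not from the `module_file` argument.
  transform = tfx_v1.components.Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=preprocess_module_file_location)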
    
  vertex_job_spec = {
      'project': project_id,
      'worker_pool_specs': [{
          'machine_spec': {
              'machine_type': 'n1-standard-4',
          },
          'replica_count': 1,
          'container_spec': {
              'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
          },
      }],
  }

  tuner = tfx_v1.components.Tuner(
    module_file=module_file,
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    train_args=tfx_v1.proto.TrainArgs(num_steps=20),
    schema=schema_gen.outputs['schema'],
    eval_args=tfx_v1.proto.EvalArgs(num_steps=5))
    
    
  trainer = tfx_v1.extensions.google_cloud_ai_platform.Trainer(
      module_file=module_file,
      examples=transform.outputs['transformed_examples'],
      transform_graph=transform.outputs['transform_graph'],
      schema=schema_gen.outputs['schema'],
      hyperparameters=tuner.outputs['best_hyperparameters'],
      train_args=tfx_v1.proto.TrainArgs(num_steps=100),
      eval_args=tfx_v1.proto.EvalArgs(num_steps=5),
      custom_config={
          tfx_v1.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx_v1.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx_v1.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              vertex_job_spec,
          'use_gpu':
              False,
      })


  eval_config = tfma.EvalConfig(
    model_specs=[
        tfma.ModelSpec(label_key='fare')
    ],
    metrics_specs=[
        tfma.MetricsSpec(
            metrics=[
                tfma.MetricConfig(class_name='ExampleCount'),
                tfma.MetricConfig(class_name='MeanSquaredError'),
                tfma.MetricConfig(
                    class_name='MeanSquaredError',
                    threshold=tfma.MetricThreshold(
                        value_threshold=tfma.GenericValueThreshold(
                            upper_bound={'value': 50}),))
            ]
        )
    ],
    slicing_specs=[
        tfma.SlicingSpec(),
        tfma.SlicingSpec(feature_keys=['TripStartHour'])
    ])



  model_analyzer = tfx_v1.components.Evaluator(
      examples=transform.outputs['transformed_examples'],
      model=trainer.outputs['model'],
      eval_config=eval_config)

  vertex_serving_spec = {
      'project_id': project_id,
      'endpoint_name': endpoint_name,
      'machine_type': 'n1-standard-4',
  }

  serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
  pusher = tfx_v1.extensions.google_cloud_ai_platform.Pusher(
      model=trainer.outputs['model'],
      custom_config={
          tfx_v1.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx_v1.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx_v1.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
              serving_image,
          tfx_v1.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
            vertex_serving_spec,
      })

  components = [
      example_gen,
      compute_eval_stats,
      schema_gen,
      validate_stats,
      transform,
      tuner,
      trainer,
      model_analyzer,
      pusher,
  ]

  return tfx_v1.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,)
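
The `MeanSquaredError` threshold in `eval_config` above asks the Evaluator to validate that the metric stays under an upper bound of 50. A plain-Python sketch of that check on made-up fares follows; `passes_upper_bound` is a hypothetical helper, and treating a value exactly equal to the bound as passing is an assumption rather than documented TFMA behaviour:

```python
def mean_squared_error(labels, predictions):
    """Average of the squared differences between labels and predictions."""
    return sum((y - p) ** 2 for y, p in zip(labels, predictions)) / len(labels)


def passes_upper_bound(metric_value, upper_bound=50.0):
    """Return True when the metric does not exceed the bound (assumed inclusive)."""
    return metric_value <= upper_bound


fares = [10.0, 25.0, 8.0]      # made-up labels
predicted = [12.0, 20.0, 9.0]  # made-up model outputs

mse = mean_squared_error(fares, predicted)
print(mse, passes_upper_bound(mse))
```

In the pipeline as defined above, the Pusher receives only the `model` output, so the validation result appears to be recorded by the Evaluator without gating the deployment.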

### Running the pipeline on Vertex Pipelines

The following cell compiles the pipeline into a JSON pipeline definition file using the KubeflowV2DagRunner. Note that no pipeline run is created at this point; the run is submitted to Vertex Pipelines in the cell after it.

In [31]:
import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx_v1.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx_v1.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        serving_model_dir=SERVING_MODEL_DIR,))

running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying pipeline_def.py -> build/lib
copying taxi_trainer.py -> build/lib
copying preprocess.py -> build/lib
installing to /tmp/tmpflm5rlcs
running install
running install_lib
copying build/lib/taxi_trainer.py -> /tmp/tmpflm5rlcs
copying build/lib/pipeline_def.py -> /tmp/tmpflm5rlcs
copying build/lib/preprocess.py -> /tmp/tmpflm5rlcs
running install_egg_info
running egg_info
creating tfx_user_code_Transform.egg-info
writing tfx_user_code_Transform.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Transform.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Transform.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Transform.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Transform.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Transform.egg-info/SOURCES.txt'
Copying tfx_user_code_Transform.egg-info to /tmp/tmpflm5rlcs/tfx_user_code_Transfo



running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying taxi_trainer.py -> build/lib
installing to /tmp/tmpzn0jysg8
running install
running install_lib
copying build/lib/taxi_trainer.py -> /tmp/tmpzn0jysg8
running install_egg_info
running egg_info
creating tfx_user_code_Tuner.egg-info
writing tfx_user_code_Tuner.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Tuner.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Tuner.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Tuner.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Tuner.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Tuner.egg-info/SOURCES.txt'
Copying tfx_user_code_Tuner.egg-info to /tmp/tmpzn0jysg8/tfx_user_code_Tuner-0.0+1e86b02980eae8be19a8e9caef2ee6f049dfd20fb4ac9c12d379fc635601c8d0-py3.7.egg-info
running install_scripts
creating /tmp/tmpzn0jysg8/tfx_user_code_Tuner-0.0+1e86b02980eae8be19a8e9caef2ee6f049dfd20fb4ac9c12d379f



running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying taxi_trainer.py -> build/lib
installing to /tmp/tmpwvqsrehe
running install
running install_lib
copying build/lib/taxi_trainer.py -> /tmp/tmpwvqsrehe
running install_egg_info
running egg_info
creating tfx_user_code_Trainer.egg-info
writing tfx_user_code_Trainer.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
Copying tfx_user_code_Trainer.egg-info to /tmp/tmpwvqsrehe/tfx_user_code_Trainer-0.0+1e86b02980eae8be19a8e9caef2ee6f049dfd20fb4ac9c12d379fc635601c8d0-py3.7.egg-info
running install_scripts
creating /tmp/tmpwvqsrehe/tfx_user_code_Trainer-0.0+1e86b02980eae8be19a8e9caef2ee6f04



In [32]:
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
import logging
logging.getLogger().setLevel(logging.INFO)

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.submit()

Creating PipelineJob


PipelineJob created. Resource name: projects/53911330556/locations/us-central1/pipelineJobs/taxi-vertex-pipelines-20220621095033


To use this PipelineJob in another session:


pipeline_job = aiplatform.PipelineJob.get('projects/53911330556/locations/us-central1/pipelineJobs/taxi-vertex-pipelines-20220621095033')


View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/taxi-vertex-pipelines-20220621095033?project=53911330556
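
The resource name printed above encodes the project number, the region and the job ID. A small sketch of pulling those apart and rebuilding the console link shown in the output follows; `parse_pipeline_job_name` is a hypothetical helper, and reading the job ID suffix as a `YYYYMMDDHHMMSS` submission timestamp is an assumption based on this one run:

```python
import re
from datetime import datetime

RESOURCE_PATTERN = re.compile(
    r'^projects/(?P<project>[^/]+)/locations/(?P<location>[^/]+)'
    r'/pipelineJobs/(?P<job_id>[^/]+)$')


def parse_pipeline_job_name(resource_name: str) -> dict:
    """Split a PipelineJob resource name into its parts (hypothetical helper)."""
    match = RESOURCE_PATTERN.match(resource_name)
    if match is None:
        raise ValueError(f'Unexpected resource name: {resource_name}')
    return match.groupdict()


parts = parse_pipeline_job_name(
    'projects/53911330556/locations/us-central1/pipelineJobs/'
    'taxi-vertex-pipelines-20220621095033')

console_url = (
    'https://console.cloud.google.com/vertex-ai/locations/{location}'
    '/pipelines/runs/{job_id}?project={project}'.format(**parts))

# Assumption: the trailing 14 digits of the job ID are the submission time.
submitted = datetime.strptime(parts['job_id'][-14:], '%Y%m%d%H%M%S')
print(console_url)
print(submitted)
```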


