<div class="alert alert-block alert-info">

<h1 style="font-family:verdana;"> Description:</h1>

<ul>
<li><p style="font-family:verdana;">
In this notebook, we build a complete MLOps pipeline for the Chicago Taxi trip fare prediction use case using TensorFlow Extended (TFX) and Vertex AI.
</p></li>

<li><p style="font-family:verdana;">
Data extraction and feature engineering are performed in BigQuery; feature scaling is done with TensorFlow Transform.
</p></li>
</ul>

</div>

## Index
1. [Import needed packages](#step-0-import-needed-packages)
2. [Set up GCP environment parameters](#step-1-set-up-gcp-environment-parameters)
3. [Data extraction and pre-processing with BigQuery](#step-2-data-extraction-from-bigquery-with-pre-processing)
4. [TensorFlow model trainer code](#step-3-model-trainer)
5. [TensorFlow eXtended pipeline](#step-4-create-pipeline)
6. [Execute pipeline on Vertex AI Pipelines](#step-5-execute-pipeline-on-vertex-ai)

## Step 0: Import needed packages.

In [1]:
import warnings

warnings.filterwarnings('ignore')

## Step 1: Set up GCP environment parameters.

GCP Project params.

In [2]:
GOOGLE_CLOUD_PROJECT = 'demoespecialidadgcp'
GOOGLE_CLOUD_PROJECT_NUMBER = '502688298240'
GOOGLE_CLOUD_REGION = 'us-east1'
GCS_BUCKET_NAME = 'demo-1-chicago-taxi-fare'

Vertex AI Pipeline params.

In [3]:
PIPELINE_NAME = 'demo-1-pipeline-v3'
ENDPOINT_NAME = 'demo-1-model-v3'

# Path to the pipeline artifacts.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))

PIPELINE_ROOT: gs://demo-1-chicago-taxi-fare/pipeline_root/demo-1-pipeline-v3
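The four roots above share the same `gs://{bucket}/{purpose}/{pipeline}` shape. As a small sketch, a hypothetical helper (`pipeline_paths` is not part of TFX or the Vertex AI SDK) can build them from one place, so that renaming the bucket or the pipeline keeps every path in sync:

```python
def pipeline_paths(bucket: str, pipeline_name: str) -> dict:
    """Hypothetical helper: return the GCS roots used in Step 1, keyed by purpose."""
    base = f"gs://{bucket}"
    return {
        "pipeline_root": f"{base}/pipeline_root/{pipeline_name}",
        "module_root": f"{base}/pipeline_module/{pipeline_name}",
        "data_root": f"{base}/data/{pipeline_name}",
        "serving_model_dir": f"{base}/serving_model/{pipeline_name}",
    }


paths = pipeline_paths("demo-1-chicago-taxi-fare", "demo-1-pipeline-v3")
for name, path in paths.items():
    print(f"{name}: {path}")
```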


In [4]:
MODULE_ROOT

'gs://demo-1-chicago-taxi-fare/pipeline_module/demo-1-pipeline-v3'

## Step 2: Data extraction from BigQuery with pre-processing.

In [5]:
QUERY = """
        SELECT 
            trip_seconds,
            trip_miles,
            trip_total,
            -- DAYOFWEEK runs from 1 (Sunday) to 7 (Saturday), so 2-6 is Monday to Friday.
            CASE WHEN EXTRACT(DAYOFWEEK FROM trip_start_timestamp) BETWEEN 2 AND 6 THEN 1 ELSE 0 END AS work_day,
            CASE WHEN EXTRACT(HOUR FROM trip_start_timestamp) BETWEEN 8 AND 18 THEN 1 ELSE 0 END AS work_hour,
            trip_miles/(trip_seconds/3600) AS trip_speed,
            pickup_community_area,
            dropoff_community_area
        FROM 
            `demoespecialidadgcp.demo_1.training_dataset`
        WHERE
            trip_seconds > 0 AND trip_seconds < 7200 AND
            trip_miles > 0 AND
            trip_total > 0 AND trip_total < 250 AND
            payment_type = "Cash" AND
            trip_start_timestamp IS NOT NULL AND
            pickup_community_area IS NOT NULL AND
            dropoff_community_area IS NOT NULL AND
            RAND() < 2000000/(SELECT COUNT(*) FROM `demoespecialidadgcp.demo_1.training_dataset`)
        LIMIT 200000
"""

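The derived columns in `QUERY` can be sanity-checked outside BigQuery. The sketch below mirrors them in plain Python; `bq_dayofweek` and `derive_features` are hypothetical helpers, not part of any library. BigQuery's `EXTRACT(DAYOFWEEK ...)` returns 1 for Sunday through 7 for Saturday, so a Monday-to-Friday flag corresponds to values 2 to 6 (the sketch assumes that is the intended meaning of `work_day`), and `BETWEEN 8 AND 18` on the hour is inclusive, so a trip starting at 18:59 still counts as a `work_hour`.

```python
from datetime import datetime


def bq_dayofweek(ts: datetime) -> int:
    """Mimic BigQuery EXTRACT(DAYOFWEEK ...): 1 = Sunday, ..., 7 = Saturday."""
    # Python's isoweekday() is 1 = Monday, ..., 7 = Sunday.
    return ts.isoweekday() % 7 + 1


def derive_features(ts: datetime, trip_seconds: int, trip_miles: float) -> dict:
    """Hypothetical Python mirror of the derived columns in QUERY."""
    dow = bq_dayofweek(ts)
    return {
        # Assumes "work day" means Monday (2) to Friday (6).
        "work_day": 1 if 2 <= dow <= 6 else 0,
        # BETWEEN is inclusive, so any minute of hour 18 still counts.
        "work_hour": 1 if 8 <= ts.hour <= 18 else 0,
        # Average speed in miles per hour.
        "trip_speed": trip_miles / (trip_seconds / 3600),
    }


# 2024-08-18 was a Sunday and 2024-08-19 a Monday.
print(derive_features(datetime(2024, 8, 18, 9, 30), 1200, 4.0))
print(derive_features(datetime(2024, 8, 19, 18, 45), 1800, 6.0))
```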
## Step 3: Model trainer.

In [6]:
%%writefile chicago_taxi_trainer.py

from typing import List, Text

from absl import logging
import keras_tuner
import tensorflow as tf
import tensorflow_transform as tft
from tfx import v1 as tfx
from tfx_bsl.public import tfxio

# Specify features that we will use.
_FEATURE_KEYS = [
    "trip_seconds", "trip_miles", "work_day", "work_hour",
    "trip_speed", "pickup_community_area", "dropoff_community_area"
]

_LABEL_KEY = "trip_total"

_TRAIN_BATCH_SIZE = 64
_EVAL_BATCH_SIZE = 32

# TFX Transform will call this function.
def preprocessing_fn(inputs):
    """tf.transform's callback function for preprocessing inputs.

    Args:
    inputs: map from feature keys to raw not-yet-transformed features.

    Returns:
    Map from string feature key to transformed feature.
    """
    outputs = {}

    # Uses features defined in _FEATURE_KEYS only.
    for key in _FEATURE_KEYS:
        if key in ["work_day", "work_hour"]:
            # Convert work_day and work_hour to float32
            outputs[key] = tf.cast(inputs[key], tf.float32)
        else:
            # For other features, apply z-score normalization
            outputs[key] = tft.scale_to_z_score(inputs[key])
    
    outputs[_LABEL_KEY] = inputs[_LABEL_KEY]

    return outputs



# Returns a serving function that parses serialized tf.Example protos,
# applies the Transform preprocessing and runs inference on the result.
def _get_serve_tf_examples_fn(model, tf_transform_output):
    # We must save the tft_layer to the model to ensure its assets are kept and
    # tracked.
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function(input_signature=[
        tf.TensorSpec(shape=[None], dtype=tf.string, name="examples")
    ])
    def serve_tf_examples_fn(serialized_tf_examples):
        # The input is a batch of serialized tf.Example protos.
        feature_spec = tf_transform_output.raw_feature_spec()
        # Remove label feature since these will not be present at serving time.
        feature_spec.pop(_LABEL_KEY)

        parsed_features = tf.io.parse_example(serialized_tf_examples,
                                              feature_spec)

        # Preprocess parsed input with transform operation defined in
        # preprocessing_fn().
        transformed_features = model.tft_layer(parsed_features)
        # Run inference with ML model.
        outputs = model(transformed_features)
        
        return {'predictions': outputs}        

    return serve_tf_examples_fn

def _get_serve_raw_examples_fn(model, tf_transform_output):
    # Returns a custom serving function that takes raw feature tensors directly
    # (no tf.Example parsing) and applies the same Transform preprocessing.
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_raw_examples_fn(trip_seconds, 
                              trip_miles, 
                              work_day, 
                              work_hour,
                              trip_speed, 
                              pickup_community_area, 
                              dropoff_community_area):
        features = {
            "trip_seconds": trip_seconds, 
            "trip_miles": trip_miles, 
            "work_day": tf.cast(work_day, tf.float32), 
            "work_hour": tf.cast(work_hour, tf.float32),
            "trip_speed": trip_speed, 
            "pickup_community_area": tf.cast(pickup_community_area, tf.float32), 
            "dropoff_community_area": tf.cast(dropoff_community_area, tf.float32)
        }
              
        transformed_features = model.tft_layer(features)
        outputs = model(transformed_features)
        
        return {'predictions': outputs}

    return serve_raw_examples_fn



def _input_fn(file_pattern: List[Text],
              data_accessor: tfx.components.DataAccessor,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 64) -> tf.data.Dataset:
    """Generates features and label for tuning/training.

    Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    tf_transform_output: A TFTransformOutput.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

    Returns:
    A dataset that contains (features, labels) tuples, where features is a
      dictionary of Tensors and labels is a Tensor of trip_total values.
    """
   
    dataset = data_accessor.tf_dataset_factory(
        file_pattern,
        tfxio.TensorFlowDatasetOptions(
            batch_size=batch_size, label_key=_LABEL_KEY),
        tf_transform_output.transformed_metadata.schema)
    # Repeat indefinitely; epoch length is controlled by steps_per_epoch.
    dataset = dataset.repeat()
    return dataset.prefetch(tf.data.AUTOTUNE)


def _build_keras_model(hparams: keras_tuner.HyperParameters) -> tf.keras.Model:
    """Builds a Keras model for taxi trip fare prediction with hyperparameter tuning.

    Args:
        hparams: (Hyperparameter) Hyperparameter object for tuning.

    Returns:
        tf.keras.Model: The compiled Keras model.
    """

    units_1 = hparams.get('units_1')
    units_2 = hparams.get('units_2')
    learning_rate = hparams.get('learning_rate')
    activation = "relu"
    dropout_rate = hparams.get('dropout_rate')
    l2_regularization = hparams.get('l2_regularization')

    # Create inputs for each feature
    inputs = {
        feature: tf.keras.Input(shape=(1,), name=feature, dtype=tf.float32)
        for feature in _FEATURE_KEYS
    }

    # Concatenate all inputs
    concat = tf.keras.layers.Concatenate()(list(inputs.values()))

    # Add first hidden layer with dropout and L2 regularization
    hidden_layer_1 = tf.keras.layers.Dense(
        units_1, 
        activation=activation,
        kernel_regularizer=tf.keras.regularizers.l2(l2_regularization)
    )(concat)
    hidden_layer_1 = tf.keras.layers.Dropout(dropout_rate)(hidden_layer_1)

    # Add second hidden layer with dropout and L2 regularization
    hidden_layer_2 = tf.keras.layers.Dense(
        units_2, 
        activation=activation,
        kernel_regularizer=tf.keras.regularizers.l2(l2_regularization)
    )(hidden_layer_1)
    hidden_layer_2 = tf.keras.layers.Dropout(dropout_rate)(hidden_layer_2)

    # Add output layer
    output = tf.keras.layers.Dense(1)(hidden_layer_2)

    # Create the functional model
    model = tf.keras.Model(inputs=inputs, outputs=output)

    # Compile the model using Adam optimizer with tuned learning rate
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
                  loss='huber_loss', metrics=['mean_absolute_error'])
    
    model.summary(print_fn=logging.info)

    return model

def _get_hyperparameters() -> keras_tuner.HyperParameters:
    """Defines the search space for hyperparameters."""
    hp = keras_tuner.HyperParameters()
    hp.Int('units_1', min_value=32, max_value=512, step=32)
    hp.Int('units_2', min_value=16, max_value=256, step=16)
    hp.Float('learning_rate', min_value=1e-5, max_value=1e-1, sampling='log')
    hp.Float('dropout_rate', min_value=0.0, max_value=0.5, step=0.1)
    hp.Float('l2_regularization', min_value=1e-6, max_value=1e-1, sampling='log')
    return hp

# TFX Tuner will call this function.
def tuner_fn(fn_args: tfx.components.FnArgs) -> tfx.components.TunerFnResult:
    """Build the tuner using the KerasTuner API.

    Args:
    fn_args: Holds args as name/value pairs.
      - working_dir: working dir for tuning.
      - train_files: List of file paths containing training tf.Example data.
      - eval_files: List of file paths containing eval tf.Example data.
      - train_steps: number of train steps.
      - eval_steps: number of eval steps.
      - schema_path: optional schema of the input data.
      - transform_graph_path: optional transform graph produced by TFT.

    Returns:
    A namedtuple containing the following:
      - tuner: A BaseTuner that will be used for tuning.
      - fit_kwargs: Args to pass to the tuner's run_trial function for fitting
                    the model, e.g., the training and validation datasets.
                    Required args depend on the tuner's implementation.
    """
    # RandomSearch is a subclass of keras_tuner.Tuner which inherits from
    # BaseTuner.
    tuner = keras_tuner.RandomSearch(
      _build_keras_model,
      max_trials=20, 
      hyperparameters=_get_hyperparameters(),
      allow_new_entries=False,
      objective=keras_tuner.Objective('val_mean_absolute_error', 'min'),
      directory=fn_args.working_dir,
      project_name='demo_1_tuning')

    transform_graph = tft.TFTransformOutput(fn_args.transform_graph_path)

    train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      transform_graph,
      _TRAIN_BATCH_SIZE)

    eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      transform_graph,
      _EVAL_BATCH_SIZE)
    
    early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=4, restore_best_weights=True, mode='auto')

    return tfx.components.TunerFnResult(
      tuner=tuner,
      fit_kwargs={
          'x': train_dataset,
          'validation_data': eval_dataset,
          'steps_per_epoch': fn_args.train_steps,
          'validation_steps': fn_args.eval_steps,
          'epochs': 16, 
          'callbacks':[early_stopping]
      })


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
    """Train the model based on given args.

    Args:
    fn_args: Holds args used to train the model as name/value pairs.
    """
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

    train_dataset = _input_fn(
        fn_args.train_files,
        fn_args.data_accessor,
        tf_transform_output,
        batch_size=_TRAIN_BATCH_SIZE)
    eval_dataset = _input_fn(
        fn_args.eval_files,
        fn_args.data_accessor,
        tf_transform_output,
        batch_size=_EVAL_BATCH_SIZE)
    
    if fn_args.hyperparameters:
        hparams = keras_tuner.HyperParameters.from_config(fn_args.hyperparameters)
    else:
        # Fallback for when the hyperparameters are already fixed and the Tuner
        # has been removed from the pipeline: use the defaults of the search
        # space. Users can also inline the hyperparameters in _build_keras_model.
        hparams = _get_hyperparameters()

    model = _build_keras_model(hparams)

    # Optional: write TensorBoard logs to fn_args.model_run_dir.
    # tensorboard_callback = tf.keras.callbacks.TensorBoard(
    #     log_dir=fn_args.model_run_dir, update_freq='epoch')
    early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=8, restore_best_weights=True, mode='auto')
    model.fit(
        train_dataset,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps,
        epochs=100,
        callbacks=[early_stopping])
    
    # Save the model with serving signatures that include the Transform layer.
    
    signatures = {
        'serving_default': _get_serve_tf_examples_fn(model, tf_transform_output),
        'serving_raw_examples': _get_serve_raw_examples_fn(model, tf_transform_output).get_concrete_function(
            tf.TensorSpec(shape=[None,1], dtype=tf.float32, name="trip_seconds"),
            tf.TensorSpec(shape=[None,1], dtype=tf.float32, name="trip_miles"),
            tf.TensorSpec(shape=[None,1], dtype=tf.float32, name="work_day"),
            tf.TensorSpec(shape=[None,1], dtype=tf.float32, name="work_hour"),
            tf.TensorSpec(shape=[None,1], dtype=tf.float32, name="trip_speed"),
            tf.TensorSpec(shape=[None,1], dtype=tf.float32, name="pickup_community_area"),
            tf.TensorSpec(shape=[None,1], dtype=tf.float32, name="dropoff_community_area"),
        )}

    # The result of the training should be saved in `fn_args.serving_model_dir`
    # directory.
    model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

Overwriting chicago_taxi_trainer.py
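Two numeric pieces of the trainer module can be checked by hand: the z-score scaling that `preprocessing_fn` applies to `trip_seconds`, `trip_miles`, `trip_speed` and, as written, the two community-area IDs, and the `'huber_loss'` the model is compiled with. The plain-Python sketch below shows the arithmetic behind both without calling TensorFlow Transform or Keras. It assumes the population standard deviation for the z-score and a Huber `delta` of 1.0 (the Keras default); the helper names are hypothetical.

```python
import math


def z_score(values):
    """Scale values to zero mean and unit (population) standard deviation."""
    n = len(values)
    mean = sum(values) / n
    std = math.sqrt(sum((v - mean) ** 2 for v in values) / n)
    return [(v - mean) / std for v in values]


def huber(error: float, delta: float = 1.0) -> float:
    """Huber loss for one residual: quadratic near zero, linear in the tails."""
    abs_err = abs(error)
    if abs_err <= delta:
        return 0.5 * error ** 2
    return delta * (abs_err - 0.5 * delta)


miles = [1.0, 2.0, 3.0, 4.0, 5.0]
print([round(z, 3) for z in z_score(miles)])
print(huber(0.5), huber(3.0))
```

Because the label `trip_total` is passed through unscaled, `delta=1.0` means residuals up to one dollar are penalized quadratically and larger ones only linearly, which limits the pull of unusually expensive trips.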


Copy the trainer module to the `MODULE_ROOT` directory, where the pipeline components load it from.

In [7]:
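# Copy chicago_taxi_trainer.py into MODULE_ROOT. The trailing slash makes gsutil
# treat MODULE_ROOT as a folder, so the object keeps its file name.
!gsutil cp chicago_taxi_trainer.py $MODULE_ROOT/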


Copying file://chicago_taxi_trainer.py [Content-Type=text/x-python]...
/ [1 files][ 11.8 KiB/ 11.8 KiB]                                                
Operation completed over 1 objects/11.8 KiB.                                     
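`tuner_fn` runs `RandomSearch` for `max_trials=20` over the space defined in `_get_hyperparameters()`. The sketch below re-creates that sampling in plain Python to show how large the space is and how the `sampling='log'` ranges behave; it is an illustration under those assumptions, not KerasTuner's implementation, and every name in it is hypothetical.

```python
import math
import random

# Plain-Python mirror of _get_hyperparameters().
UNITS_1 = list(range(32, 512 + 1, 32))            # 16 choices
UNITS_2 = list(range(16, 256 + 1, 16))            # 16 choices
DROPOUT = [round(0.1 * i, 1) for i in range(6)]   # 0.0 .. 0.5


def log_uniform(rng: random.Random, low: float, high: float) -> float:
    """Sample so that every decade between low and high is equally likely."""
    return math.exp(rng.uniform(math.log(low), math.log(high)))


def sample_trial(rng: random.Random) -> dict:
    """Draw one random-search trial from the same space."""
    return {
        "units_1": rng.choice(UNITS_1),
        "units_2": rng.choice(UNITS_2),
        "learning_rate": log_uniform(rng, 1e-5, 1e-1),
        "dropout_rate": rng.choice(DROPOUT),
        "l2_regularization": log_uniform(rng, 1e-6, 1e-1),
    }


rng = random.Random(0)
trials = [sample_trial(rng) for _ in range(20)]  # max_trials=20 in tuner_fn
print(len(UNITS_1) * len(UNITS_2) * len(DROPOUT), "discrete combinations")
print(trials[0])
```

Twenty trials cover a small fraction of the 1,536 discrete combinations, before even counting the two continuous ranges. With `epochs=16` and `steps_per_epoch=3125` (the Tuner's `num_steps`), the search trains for at most 20 × 16 × 3,125 = 1,000,000 steps if early stopping never triggers.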


## Step 4: Create pipeline.

In [8]:
from typing import List, Optional
import tensorflow_model_analysis as tfma
from tfx import v1 as tfx


def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
                     module_file: str, endpoint_name: str, project_id: str,
                     region: str, beam_pipeline_args: Optional[List[str]]) -> tfx.dsl.Pipeline:
    """Creates a TFX pipeline using BigQuery."""

    # Query data in BigQuery as a data source.
    example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
          query=query).with_id('example_gen_demo_1')
    
    stats_gen = tfx.components.StatisticsGen(
        examples=example_gen.outputs['examples']).with_id('statistics_gen_demo_1')
    
    schema_gen = tfx.components.SchemaGen(
        statistics=stats_gen.outputs['statistics']).with_id('schema_gen_demo_1')
    
    transform = tfx.components.Transform(
        examples=example_gen.outputs['examples'],
        schema=schema_gen.outputs['schema'],
        materialize=True,
        module_file=module_file).with_id('transform_demo_1')
    
    # Configuration for Vertex AI Training.
    # This dictionary will be passed as `CustomJobSpec`.
    vertex_job_spec = {
        'project': project_id,
        'worker_pool_specs': [{
            'machine_spec': {
                'machine_type': 'n1-standard-8',
            },
            'replica_count': 1,
            'container_spec': {
                'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
            },
        }],
    }
    
    tuner = tfx.components.Tuner(
        module_file=module_file,
        examples=transform.outputs['transformed_examples'],
        transform_graph=transform.outputs['transform_graph'],
        train_args=tfx.proto.TrainArgs(num_steps=3125),
        eval_args=tfx.proto.EvalArgs(num_steps=3125)).with_id('tuner_demo_1')

    # Trains a model using Vertex AI Training.
    trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
        module_file=module_file,
        examples=transform.outputs['transformed_examples'],
        transform_graph=transform.outputs['transform_graph'],
        hyperparameters=tuner.outputs['best_hyperparameters'],
        train_args=tfx.proto.TrainArgs(num_steps=3125),
        eval_args=tfx.proto.EvalArgs(num_steps=3125),
        custom_config={
            tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
                True,
            tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
                region,
            tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
                vertex_job_spec,
            'use_gpu':
                False,
      }).with_id('trainer_demo_1')
    
    _LABEL_KEY = "trip_total"
    eval_config = tfma.EvalConfig(
        model_specs=[
            # This assumes a serving model with signature 'serving_default'. If
            # using estimator based EvalSavedModel, add signature_name='eval' and
            # remove the label_key. Note, if using a TFLite model, then you must set
            # model_type='tf_lite'.
            tfma.ModelSpec(label_key=_LABEL_KEY)
        ],
        metrics_specs=[
            tfma.MetricsSpec(
                # The metrics added here are in addition to those saved with the
                # model (assuming either a keras model or EvalSavedModel is used).
                # Any metrics added into the saved model (for example using
                # model.compile(..., metrics=[...]), etc) will be computed
                # automatically.
                metrics=[
                    tfma.MetricConfig(class_name='ExampleCount'),
                    tfma.MetricConfig(
                        class_name='MeanAbsoluteError',
                        threshold=tfma.MetricThreshold(
                            value_threshold=tfma.GenericValueThreshold(
                                lower_bound={'value': 0.0},
                                upper_bound={'value': 2.0}),
                            change_threshold=tfma.GenericChangeThreshold(
                                direction=tfma.MetricDirection.LOWER_IS_BETTER,
                                relative={'value': 1.2})))
                ]
            )
        ],
        slicing_specs=[
            # An empty slice spec means the overall slice, i.e. the whole dataset.
            tfma.SlicingSpec(),
            # Data can also be sliced along a feature column, e.g.
            # tfma.SlicingSpec(feature_keys=['work_day']).
        ])

    # The Resolver below is experimental and may change in future releases. It
    # selects the latest blessed model so the Evaluator can use it as the baseline.
    model_resolver = tfx.dsl.Resolver(
          strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
          model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
          model_blessing=tfx.dsl.Channel(type=tfx.types.standard_artifacts.ModelBlessing)
    ).with_id('latest_blessing_demo_1')
    
    model_analyzer = tfx.components.Evaluator(
          examples=example_gen.outputs['examples'],
          model=trainer.outputs['model'],
          baseline_model=model_resolver.outputs['model'],
          # Change threshold will be ignored if there is no baseline (first run).
          eval_config=eval_config).with_id('evaluator_demo_1')
    
    # Serving configuration for the Pusher (target endpoint and machine type).
    vertex_serving_spec = {
        'project_id': project_id,
        'endpoint_name': endpoint_name,
        'machine_type': 'n1-standard-2',
    }

    # Vertex AI provides pre-built containers with various configurations for
    # serving.
    # See https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers
    # for available container images.
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-11:latest'
    
    # Pushes the model to a Vertex AI endpoint only if the Evaluator blessed it.
    pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
        model=trainer.outputs['model'],
        model_blessing=model_analyzer.outputs['blessing'],
        custom_config={
            tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
                True,
            tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
                region,
            tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
                serving_image,
            tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
                vertex_serving_spec,
        }).with_id('pusher_demo_1')
    
    components = [
        example_gen,
        stats_gen,
        schema_gen,
        transform,
        tuner,
        trainer,
        model_resolver,
        model_analyzer,
        pusher
    ]

    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=components,
        enable_cache=True,
        beam_pipeline_args=beam_pipeline_args)
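The `MeanAbsoluteError` thresholds in `eval_config` decide whether a candidate model is blessed. The function below is a hypothetical plain-Python restatement, assuming TFMA checks the value threshold as `lower_bound <= MAE <= upper_bound` and, for `LOWER_IS_BETTER`, compares the relative change `(candidate - baseline) / baseline` against the `relative` bound; check the TFMA documentation before relying on these exact semantics.

```python
from typing import Optional


def is_blessed(candidate_mae: float, baseline_mae: Optional[float] = None,
               lower: float = 0.0, upper: float = 2.0,
               max_relative_increase: float = 1.2) -> bool:
    """Hypothetical restatement of the MAE thresholds in eval_config."""
    # Value threshold: the candidate's MAE must lie in [lower, upper].
    if not lower <= candidate_mae <= upper:
        return False
    if baseline_mae is None:
        # First run: no blessed baseline, so the change threshold is skipped.
        return True
    # Change threshold (LOWER_IS_BETTER): limit the relative increase in MAE.
    relative_change = (candidate_mae - baseline_mae) / baseline_mae
    return relative_change <= max_relative_increase


print(is_blessed(1.8))        # within [0, 2], no baseline yet
print(is_blessed(2.3))        # above the absolute upper bound
print(is_blessed(1.7, 0.8))   # +112.5% vs baseline
print(is_blessed(1.9, 0.8))   # +137.5% vs baseline
```

Under this reading, `relative={'value': 1.2}` still accepts a candidate whose MAE is up to 2.2 times the baseline's (1.7 against 0.8 passes), so the absolute `upper_bound` of 2.0 does most of the gating.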


Compile the pipeline definition into a JSON file.

In [9]:
import os
from datetime import datetime

now = datetime.now()
MODEL_VERSION_NAME = 'chicago-taxi-fare-model-v' + now.strftime("%d%m%Y-%H%M%S")

# BigQueryExampleGen needs GCP settings (project and a GCS temp location) for
# its Beam job. These are currently passed through `beam_pipeline_args`.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
    '--project=' + GOOGLE_CLOUD_PROJECT,
    '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
]

PIPELINE_DEFINITION_FILE = f"{PIPELINE_NAME}.json"

_trainer_module_file = 'chicago_taxi_trainer.py'

# Compile the pipeline into a JSON definition that Vertex AI Pipelines can run.
runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        query=QUERY,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS,
    ))
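`KubeflowV2DagRunner` does not run the components locally; it compiles the pipeline into `PIPELINE_DEFINITION_FILE`, and Vertex AI Pipelines later starts each component once its inputs exist. The dependency graph behind that ordering can be sketched with Python's standard `graphlib` module (Python 3.9+). The dictionary is a hand-written reading of `_create_pipeline()`, not something extracted from the JSON.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Upstream inputs of each component, as read off _create_pipeline().
DEPENDENCIES = {
    "example_gen": set(),
    "stats_gen": {"example_gen"},
    "schema_gen": {"stats_gen"},
    "transform": {"example_gen", "schema_gen"},
    "tuner": {"transform"},
    "trainer": {"transform", "tuner"},
    "model_resolver": set(),  # reads previously blessed models from metadata
    "evaluator": {"example_gen", "trainer", "model_resolver"},
    # Add "evaluator" here if the Pusher also consumes the model blessing.
    "pusher": {"trainer"},
}

# static_order() yields every component after all of its upstream inputs.
order = list(TopologicalSorter(DEPENDENCIES).static_order())
print(order)
```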


## Step 5: Execute pipeline on Vertex AI.

In [10]:
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
import logging

logging.getLogger().setLevel(logging.INFO)

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.submit()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/502688298240/locations/us-east1/pipelineJobs/demo-1-pipeline-v3-20240818155804
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/502688298240/locations/us-east1/pipelineJobs/demo-1-pipeline-v3-20240818155804')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-east1/pipelines/runs/demo-1-pipeline-v3-20240818155804?project=502688298240
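The log shows that the resource name uses the project number rather than the project ID, and that the job ID is the pipeline name followed by a submission timestamp. The hypothetical helper below splits such a name back into its parts, which can be handy when recording run references; to reattach to the run from another session, the log itself suggests `aiplatform.PipelineJob.get(...)`. Treating the job ID suffix as a `YYYYmmddHHMMSS` timestamp is an assumption based on the single example above.

```python
import re
from datetime import datetime

# Shape of a PipelineJob resource name, as printed in the log above.
_RESOURCE_RE = re.compile(
    r"^projects/(?P<project>[^/]+)/locations/(?P<location>[^/]+)"
    r"/pipelineJobs/(?P<job_id>[^/]+)$"
)


def parse_pipeline_job_name(resource_name: str) -> dict:
    """Split a PipelineJob resource name into project, location and job ID."""
    match = _RESOURCE_RE.match(resource_name)
    if match is None:
        raise ValueError(f"Not a PipelineJob resource name: {resource_name}")
    return match.groupdict()


parts = parse_pipeline_job_name(
    "projects/502688298240/locations/us-east1/pipelineJobs/"
    "demo-1-pipeline-v3-20240818155804"
)
# Assumes the job ID ends in a YYYYmmddHHMMSS timestamp, as observed here.
submitted = datetime.strptime(parts["job_id"].rsplit("-", 1)[1], "%Y%m%d%H%M%S")
print(parts)
print(submitted)
```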
