In [1]:
! pip3 install --upgrade --user google-cloud-aiplatform \
                                google-cloud-pipeline-components \
                                kfp

Collecting google-cloud-pipeline-components
  Downloading google_cloud_pipeline_components-2.14.1-py3-none-any.whl.metadata (5.9 kB)
Collecting kfp
  Downloading kfp-2.7.0.tar.gz (441 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m441.8/441.8 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Collecting kfp-pipeline-spec==0.3.0 (from kfp)
  Downloading kfp_pipeline_spec-0.3.0-py3-none-any.whl.metadata (329 bytes)
Collecting protobuf!=3.20.0,!=3.20.1,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<5.0.0dev,>=3.19.5 (from google-cloud-aiplatform)
  Downloading protobuf-4.25.3-cp37-abi3-manylinux2014_x86_64.whl.metadata (541 bytes)
INFO: pip is looking at multiple versions of google-api-core to determine which version is compatible with other requirements. This could take a while.
Collecting google-api-core!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,<3.0.0dev,>=1.34.1 (from goo

In [1]:
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)

{'status': 'ok', 'restart': True}

In [1]:
from dotenv import load_dotenv, dotenv_values

load_dotenv(dotenv_path="variables.env")
# env_vars = dotenv_values("variables.env")

True

In [2]:
import os

_project = !gcloud config list project --format "value(core.project)"
PROJECT = _project[0]

LOCATION = os.environ["LOCATION"]

# bucket names
DATA_BUCKET_NAME = os.environ["DATA_BUCKET_NAME"]
PROCESSED_DATA_SAVE_BUCKET_NAME = os.environ["PROCESSED_DATA_SAVE_BUCKET_NAME"]
PROCESSED_DATA_BUCKET_NAME = os.environ["PROCESSED_DATA_BUCKET_NAME"]
NEW_TRAIN_DATA_BUCKET_NAME = os.environ["NEW_TRAIN_DATA_BUCKET_NAME"]
VALID_DATA_BUCKET_NAME = os.environ["VALID_DATA_BUCKET_NAME"]

MODEL_BUCKET_NAME = os.environ["MODEL_BUCKET_NAME"]

# bucket uris
DATA_BUCKET_URI = f"gs://{PROJECT}-{DATA_BUCKET_NAME}"
PROCESSED_DATA_SAVE_BUCKET_URI = f"gs://{PROJECT}-{PROCESSED_DATA_SAVE_BUCKET_NAME}"
PROCESSED_DATA_BUCKET_URI = f"gs://{PROJECT}-{PROCESSED_DATA_BUCKET_NAME}"
NEW_TRAIN_DATA_BUCKET_URI = f"gs://{PROJECT}-{NEW_TRAIN_DATA_BUCKET_NAME}"
VALID_DATA_BUCKET_URI = f"gs://{PROJECT}-{VALID_DATA_BUCKET_NAME}"

MODEL_BUCKET_URI = f"gs://{PROJECT}-{MODEL_BUCKET_NAME}"

# gar repo name
REPO_NAME = os.environ["REPO_NAME"]

# docker image names
DATA_VALIDATION_IMAGE_NAME = os.environ["DATA_VALIDATION_IMAGE_NAME"]
DATA_PROCESSING_IMAGE_NAME = os.environ["DATA_PROCESSING_IMAGE_NAME"]
HP_TUNING_IMAGE_NAME = os.environ["HP_TUNING_IMAGE_NAME"]
TRAINING_IMAGE_NAME = os.environ["TRAINING_IMAGE_NAME"]
FINE_TUNING_IMAGE_NAME = os.environ["FINE_TUNING_IMAGE_NAME"]

# docker images
DATA_VALIDATION_IMAGE = f"{LOCATION}-docker.pkg.dev/{PROJECT}/{REPO_NAME}/{DATA_VALIDATION_IMAGE_NAME}:latest"
DATA_PROCESSING_IMAGE = f"{LOCATION}-docker.pkg.dev/{PROJECT}/{REPO_NAME}/{DATA_PROCESSING_IMAGE_NAME}:latest"
HP_TUNING_IMAGE = f"{LOCATION}-docker.pkg.dev/{PROJECT}/{REPO_NAME}/{HP_TUNING_IMAGE_NAME}:latest"
TRAINING_IMAGE = f"{LOCATION}-docker.pkg.dev/{PROJECT}/{REPO_NAME}/{TRAINING_IMAGE_NAME}:latest"
FINE_TUNING_IMAGE = f"{LOCATION}-docker.pkg.dev/{PROJECT}/{REPO_NAME}/{FINE_TUNING_IMAGE_NAME}:latest"

In [3]:
from typing import NamedTuple

import google.cloud.aiplatform as aip
from kfp import compiler, dsl
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1 import custom_job, endpoint, model

API_ENDPOINT = "{}-aiplatform.googleapis.com".format(LOCATION)

In [4]:
PIPELINE_BUCKET_NAME = os.environ["PIPELINE_BUCKET_NAME"]
PIPELINE_BUCKET_URI = f"gs://{PROJECT}-{PIPELINE_BUCKET_NAME}"

aip.init(project=PROJECT, staging_bucket=PIPELINE_BUCKET_URI)

In [5]:
@dsl.component(base_image="python:3.11.8-slim-bookworm",
               packages_to_install=["google-cloud-aiplatform"])
def data_validation(
    data_bucket_uri: str,
    data_validation_image: str,
    staging_bucket_uri: str,
    location: str,
    project: str
):

    from google.cloud import aiplatform
    
    data_validation_container_env_variables = [{ "name": "data_bucket_ur", "value": data_bucket_uri }]
    
    data_validation_worker_pool_specs = [
        { "container_spec": { "image_uri": data_validation_image, 
                              "env": data_validation_container_env_variables },         
          "replica_count": 1,                            
          "machine_spec": { "machine_type": "e2-highmem-2" } }
    ]
    
    data_validation_job = aiplatform.CustomJob(
        display_name="data-validation-component",                     
        location=location,                       
        project=project,
        worker_pool_specs=data_validation_worker_pool_specs,     
        staging_bucket=staging_bucket_uri,
    )
    
    data_validation_job.run(timeout=300)

In [6]:
@dsl.component(base_image="python:3.11.8-slim-bookworm",
               packages_to_install=["google-cloud-aiplatform"])
def data_processing(
    data_bucket_uri: str,
    processed_data_save_bucket_uri: str,
    fraction_for_valid_and_test_data: str,
    data_processing_image: str,
    staging_bucket_uri: str,
    location: str,
    project: str
):

    from google.cloud import aiplatform
    
    data_processing_container_env_variables = [
        { "name": "data_bucket_uri", "value": data_bucket_uri },
        { "name": "processed_data_save_bucket", "value": processed_data_save_bucket_uri },
        { "name": "fraction_for_valid_and_test_data", "value": fraction_for_valid_and_test_data }
    ]
    
    data_processing_worker_pool_specs = [
        { "container_spec": { "image_uri": data_processing_image,
                              "env": data_processing_container_env_variables },
          "replica_count": 1,
          "machine_spec": { "machine_type": "e2-highmem-2" } }
    ]
    
    data_processing_job = aiplatform.CustomJob(
        display_name="data-processing-component",                                
        location=location,
        project=project,                        
        worker_pool_specs=data_processing_worker_pool_specs,
        staging_bucket=staging_bucket_uri
    )
    
    data_processing_job.run(timeout=300)

In [7]:
@dsl.component(base_image="python:3.11.8-slim-bookworm",
               packages_to_install=["google-cloud-aiplatform"])
def hyperparameter_tuning(
    max_trials: str, 
    hp_epochs: str,
    hp_tuning_image: str,
    staging_bucket_uri: str,
    location: str,
    project: str
):

    from google.cloud import aiplatform

    hp_tuning_container_env_variables = [
        { "name": "learning_rating", "value": "0.0001" },
        { "name": "number_of_layers", "value": "4" },
        { "name": "max_trials", "value": max_trials },
        { "name": "epochs", "value": hp_epochs }
    ]

    hpt_worker_pool_specs = [
        { "container_spec": { "image_uri": hp_tuning_image, "env": hp_tuning_container_env_variables },
          "replica_count": 1,
          "machine_spec": { "machine_type": "e2-highmem-2" } }
    ]

    hp_tuning_job = aiplatform.CustomJob(
        display_name="hyperparameter-tuning-component",                          
        location=location,
        project=project,
        worker_pool_specs=hpt_worker_pool_specs,
        staging_bucket=staging_bucket_uri
    )

    hp_tuning_job.run(timeout=1000)

In [8]:
@dsl.component(base_image="python:3.11.8-slim-bookworm",
               packages_to_install=["google-cloud-aiplatform"])
def model_training(
    train_epochs: str,
    training_image: str,
    staging_bucket_uri: str,
    location: str,
    project: str
):
    
    from google.cloud import aiplatform

    training_container_env_variables = [{ "name": "epochs", "value": train_epochs }]
    
    training_worker_pool_specs = [
        { "container_spec": { "image_uri": training_image, "env": training_container_env_variables },
          "replica_count": 1,
          "machine_spec": { "machine_type": "e2-highmem-2" } }
    ]
    
    training_job = aiplatform.CustomJob(
        display_name="training-component",
        location=location,
        project=project,
        worker_pool_specs=training_worker_pool_specs,
        staging_bucket=staging_bucket_uri
    )
    
    training_job.run(timeout=600)

In [9]:
from typing import NamedTuple

@dsl.component(base_image="us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-13.py310:latest",
               packages_to_install=["pandas", "numpy", "fsspec", "gcsfs"])
def eval_and_deploy_decision(
    model_bucket_uri: str,
    valid_data_bucket_uri: str,
    mae_threshold: float
) -> NamedTuple("Outputs", [("deploy_decision", bool)]):

    import tensorflow as tf
    import pandas as pd
    import numpy as np
    
    print("Attempting to load trained and saved model ...")
    saved_model_path = model_bucket_uri + "/model_artifacts"
    
    print("saved_model_path:", saved_model_path)
    loaded_model_artifacts = tf.saved_model.load(export_dir=saved_model_path)
    print("Model successfully loaded")
    
    model = loaded_model_artifacts.signatures["serving_default"]
    print("serving function successfully loaded")
    
    valid_data_path = valid_data_bucket_uri + "/valid.csv"
    print("Attempting to read the data at:", valid_data_path)
    
    x_valid = pd.read_csv(valid_data_path)
    y_valid = x_valid.pop("log_price")
    
    print("Read successful")

    valid_float_inputs = [list(row) for row in x_valid.values[:, :12]]
    valid_string_inputs = [list(row) for row in x_valid.values[:, 12:]]

    predictions_dict = model(float_inputs=valid_float_inputs, string_inputs=valid_string_inputs)

    key = list(predictions_dict.keys())[0]
    
    predictions_valid = tf.squeeze(predictions_dict[key], axis=-1)
    log_predictions_valid = tf.math.log(predictions_valid)
    
    log_y_true_valid = tf.constant(y_valid, dtype=tf.float32)
    
    mean_absolute_error = tf.math.reduce_mean(tf.abs(log_predictions_valid - log_y_true_valid))

    print("mean_absolute_error:", mean_absolute_error)
    
    if mean_absolute_error.numpy() < mae_threshold:
        deploy_decision = True
    else:
        deploy_decision = False
        
    return (deploy_decision,)

In [10]:
def model_deployment_components(deploy_decision_task):
    
    # generic component for putting saved model artifacts into a kubeflow pipeline component
    model_save_uri = MODEL_BUCKET_URI + "/model_artifacts"
    serving_image = "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-13:latest"
    
    import_unmanaged_model_task = dsl.importer(artifact_uri=model_save_uri,
                                               artifact_class=artifact_types.UnmanagedContainerModel,
                                               metadata={ "containerSpec": { "imageUri": serving_image } })
 
    import_unmanaged_model_task.after(deploy_decision_task)
    
    # component for uploading a model to Vertex AI model registry
    model_upload_task = model.ModelUploadOp(project=PROJECT,
                                            display_name="used-cars-model",
                                            location=LOCATION,
                                            unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"])
    
    model_upload_task.after(import_unmanaged_model_task)
    
    # component for creating the model serving endpoint on Vertex AI
    create_endpoint_task = endpoint.EndpointCreateOp(project=PROJECT,
                                                     location=LOCATION,
                                                     display_name="used-cars-model-inference-endpoint")
    
    # component for deploying the model on Vertex AI
    endpoint.ModelDeployOp(model=model_upload_task.outputs["model"],
                           endpoint=create_endpoint_task.outputs["endpoint"],
                           dedicated_resources_min_replica_count=1,
                           dedicated_resources_max_replica_count=1,
                           dedicated_resources_machine_type="e2-standard-2")

In [11]:
@dsl.component(base_image="python:3.11.8-slim-bookworm",
               packages_to_install=["google-cloud-aiplatform"])
def model_fine_tuning(
    fine_tuning_epochs: str,
    fine_tuning_learning_rate: str, 
    new_train_data_bucket_uri: str,
    valid_data_bucket_uri: str,
    fine_tuning_image: str,
    staging_bucket_uri: str,
    location: str,
    project: str
):
    
    from google.cloud import aiplatform
    
    fine_tuning_container_env_variables = [
        { "name": "epochs", "value": fine_tuning_epochs },
        { "name": "learning_rate", "value": fine_tuning_learning_rate },
        { "name": "new_train_data_bucket", "value": new_train_data_bucket_uri },
        { "name": "valid_data_bucket", "value": valid_data_bucket_uri }
    ]
    
    fine_tuning_worker_pool_specs = [
        { "container_spec": { "image_uri": fine_tuning_image,
                             "env": fine_tuning_container_env_variables },
          "replica_count": 1,
          "machine_spec": { "machine_type": "e2-highmem-2" } }
    ]
    
    fine_tuning_job = aiplatform.CustomJob(
        display_name="fine-tuning-component",
        location=location,
        project=project,
        worker_pool_specs=fine_tuning_worker_pool_specs,
        staging_bucket=staging_bucket_uri
    )
    
    fine_tuning_job.run(timeout=600)

In [12]:
@dsl.component(base_image="python:3.11")
def model_evaluation_failed(failure_message: str):
    print(failure_message)

In [13]:
from typing import Dict

@dsl.pipeline(name="kubeflow-pipeline-used-cars",
              description="A kubeflow pipeline for used cars project",
              pipeline_root=PIPELINE_BUCKET_URI)
def pipeline_function(
    first_time_training: bool = False,
    location: str = LOCATION,
    project: str = PROJECT,
    staging_bucket_uri: str = PIPELINE_BUCKET_URI,
    data_bucket_uri: str = DATA_BUCKET_URI,
    processed_data_save_bucket_uri: str = PROCESSED_DATA_SAVE_BUCKET_URI,
    processed_data_bucket_uri: str = PROCESSED_DATA_BUCKET_URI,
    new_train_data_bucket_uri: str = NEW_TRAIN_DATA_BUCKET_URI,
    valid_data_bucket_uri: str = VALID_DATA_BUCKET_URI,
    model_bucket_uri: str = MODEL_BUCKET_URI,
    data_validation_image: str = DATA_VALIDATION_IMAGE,
    data_processing_image: str = DATA_PROCESSING_IMAGE,
    hp_tuning_image: str = HP_TUNING_IMAGE,
    training_image: str = TRAINING_IMAGE,
    fine_tuning_image: str = FINE_TUNING_IMAGE,
    fraction_for_valid_and_test_data: str = "0.2",
    max_trials: str = "3",
    hp_epochs: str = "1",
    train_epochs: str = "5",
    mae_threshold: float = 5.0,
    fine_tuning_epochs: str = "3",
    fine_tuning_learning_rate: str = "0.0005",
):

    data_validation_task = data_validation(
        data_bucket_uri=data_bucket_uri,
        data_validation_image=data_validation_image,
        staging_bucket_uri=staging_bucket_uri,
        location=location,
        project=project
    )
    
    data_processing_task = data_processing(
        data_bucket_uri=data_bucket_uri,
        processed_data_save_bucket_uri=processed_data_save_bucket_uri,
        fraction_for_valid_and_test_data=fraction_for_valid_and_test_data,
        data_processing_image=data_processing_image,
        staging_bucket_uri=staging_bucket_uri,
        location=location,
        project=project
    )

    data_processing_task.after(data_validation_task)
    
    with dsl.If(first_time_training == True, name="training-from-scratch"):
        
        hp_tuning_task = hyperparameter_tuning(
            max_trials=max_trials, 
            hp_epochs=hp_epochs,
            hp_tuning_image=hp_tuning_image,
            staging_bucket_uri=staging_bucket_uri,
            location=location,
            project=project
        )
        
        hp_tuning_task.after(data_processing_task)
        
        training_task = model_training(
            train_epochs=train_epochs,
            training_image=training_image,
            staging_bucket_uri=staging_bucket_uri,
            location=location,
            project=project
        )
        
        training_task.after(hp_tuning_task)

        deploy_decision_task = eval_and_deploy_decision(
            model_bucket_uri=model_bucket_uri,
            valid_data_bucket_uri=valid_data_bucket_uri,
            mae_threshold=mae_threshold
        )
        
        deploy_decision_task.after(training_task)
        
        with dsl.If(deploy_decision_task.outputs["deploy_decision"] == True, name="deploy decision: YES"):
            
            model_deployment_components(deploy_decision_task)
            
        with dsl.Else(name="deploy decision: NO"):
            
            failure_message = "Model's hyperparameters were tuned and trained, but still was not performant enough and failed its evaluation"
            model_eval_failed_task = model_evaluation_failed(failure_message=failure_message)
            
            model_eval_failed_task.after(deploy_decision_task)
   
    with dsl.Else(name="model-fine-tuning"):
        
        fine_tuning_task = model_fine_tuning(
            fine_tuning_epochs=fine_tuning_epochs,
            fine_tuning_learning_rate=fine_tuning_learning_rate,
            new_train_data_bucket_uri=new_train_data_bucket_uri,
            valid_data_bucket_uri=valid_data_bucket_uri,
            fine_tuning_image=fine_tuning_image,
            staging_bucket_uri=staging_bucket_uri,
            location=location,
            project=project
        )
        
        fine_tuning_task.after(data_processing_task)

        deploy_decision_task = eval_and_deploy_decision(
            model_bucket_uri=model_bucket_uri,
            valid_data_bucket_uri=valid_data_bucket_uri,
            mae_threshold=mae_threshold
        )
        
        deploy_decision_task.after(fine_tuning_task)
        
        with dsl.If(deploy_decision_task.outputs["deploy_decision"] == True, name="deploy decision: YES"):
            
            model_deployment_components(deploy_decision_task)
            
        with dsl.Else(name="deploy decision: NO"):
            
            failure_message = "Previously trained model was fine tuned but the fine tuned model was not as performant as the previous model"
            model_eval_failed_task = model_evaluation_failed(failure_message=failure_message)
            
            model_eval_failed_task.after(deploy_decision_task)

In [14]:
compiler.Compiler().compile(pipeline_func=pipeline_function, package_path="used_cars_kubeflow_pipeline.yaml")

In [15]:
parameter_values = { 
    "first_time_training": True,
    "location": LOCATION,
    "project": PROJECT,
    "staging_bucket_uri": PIPELINE_BUCKET_URI,
    "data_bucket_uri": DATA_BUCKET_URI,
    "processed_data_save_bucket_uri": PROCESSED_DATA_SAVE_BUCKET_URI,
    "processed_data_bucket_uri": PROCESSED_DATA_BUCKET_URI,
    "new_train_data_bucket_uri": NEW_TRAIN_DATA_BUCKET_URI,
    "valid_data_bucket_uri": VALID_DATA_BUCKET_URI,
    "model_bucket_uri": MODEL_BUCKET_URI,
    "data_validation_image": DATA_VALIDATION_IMAGE,
    "data_processing_image": DATA_PROCESSING_IMAGE,
    "hp_tuning_image": HP_TUNING_IMAGE,
    "training_image": TRAINING_IMAGE,
    "fine_tuning_image": FINE_TUNING_IMAGE,
    "fraction_for_valid_and_test_data": "0.2",
    "max_trials": "3",
    "hp_epochs": "5",
    "train_epochs": "25",
    "mae_threshold": 5.0,
    "fine_tuning_epochs": "3",
    "fine_tuning_learning_rate": "0.0005"
}

In [16]:
job = aip.PipelineJob(
    display_name="used_cars_kubeflow_pipeline_job",
    template_path="used_cars_kubeflow_pipeline.yaml",
    pipeline_root=PIPELINE_BUCKET_URI,
    enable_caching=False,
    parameter_values=parameter_values
)
                                    
job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/825348564081/locations/us-central1/pipelineJobs/kubeflow-pipeline-used-cars-20240519035654
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/825348564081/locations/us-central1/pipelineJobs/kubeflow-pipeline-used-cars-20240519035654')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/kubeflow-pipeline-used-cars-20240519035654?project=825348564081
PipelineJob projects/825348564081/locations/us-central1/pipelineJobs/kubeflow-pipeline-used-cars-20240519035654 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/825348564081/locations/us-central1/pipelineJobs/kubeflow-pipeline-used-cars-20240519035654 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/825348564081/locations/us-central1/pipelineJobs/kubeflow-pipeline-used-cars-20240519035654 current state:
PipelineState.PIPELINE_STATE_RUNNING

In [17]:
from google.cloud import aiplatform

PROJECT_ID = 825348564081
ENDPOINT_ID = 7919654711521705984

endpoint = aiplatform.Endpoint(
    endpoint_name=f"projects/{PROJECT_ID}/locations/{LOCATION}/endpoints/{ENDPOINT_ID}"
)

instances = [
      { 
        "float_inputs": [5.0, 20.0, 95.0, 2021.0, 5.0, 5.0, 4.9, 4.8, 4.8, 5.0, 30.28433366877628, 23.5],
        "string_inputs": ['Honda', 'Front-wheel Drive', 'None reported', ' Yes', ' Yes', ' No', 'At least 1 recall'] 
      },
      {
        "float_inputs": [3.5, 6.0, 50.0, 2017.0, 3.7, 4.2, 3.3, 3.3, 4.0, 3.2, 46.48263119767776, 24.0],
        "string_inputs": ['RAM', 'Front-wheel Drive', 'None reported', ' No', ' No', ' Unknown', 'Unknown']
      },
      {
        "float_inputs": [4.8, 277.0, 94.0, 2020.0, 4.9, 4.8, 4.8, 4.8, 4.9, 4.9, 29.127461156199114, 29.5],
        "string_inputs": ['Honda', 'All-wheel Drive', 'None reported', ' Yes', ' Yes', ' Unknown', 'Unknown']
      }
]

endpoint.predict(instances)

Prediction(predictions=[[34733.9844], [16445.4746], [23626.3965]], deployed_model_id='4540917052017213440', metadata=None, model_version_id='1', model_resource_name='projects/825348564081/locations/us-east1/models/5816917888237305856', explanations=None)