In [None]:
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Build & Experiment using Vertex AI Pipeline

This notebook showcases an end-to-end Machine Learning Operations (MLOps) pipeline built with Vertex AI, leveraging the Ames Iowa Housing data. It serves as a pattern for rapid experimentation using notebooks and demonstrates how to develop models within a structured, automated workflow.

## Flow
![housing-pipeline.png](./diagrams/housing-pipeline.png)


Documentation: https://cloud.google.com/vertex-ai/docs/pipelines/introduction

### Install Vertex AI SDK for Python and other required packages

In [None]:
! pip3 install --upgrade --user --quiet google-cloud-aiplatform

### Restart runtime (Colab only)

To use the newly installed packages, you must restart the runtime on Google Colab.

In [None]:
# import sys

# if "google.colab" in sys.modules:

#     import IPython

#     app = IPython.Application.instance()
#     app.kernel.do_shutdown(True)

<div class="alert alert-block alert-warning">
<b>⚠️ If you enabled and ran the restart cell above (Colab only), the kernel is going to restart. Wait until it's finished before continuing to the next step. ⚠️</b>
</div>


### Authenticate your notebook environment (Colab only)

Authenticate your environment on Google Colab.


In [None]:
# import sys

# if "google.colab" in sys.modules:

#     from google.colab import auth

#     auth.authenticate_user()

In [None]:
import pandas as pd
import os
import sys
from datetime import datetime
from typing import Tuple
import time
import random
import string
import numpy as np

from typing import NamedTuple

from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (
    component,
    pipeline,
    Input,
    Output,
    Dataset,
    Model
)

### Set Google Cloud project information and initialize Vertex AI SDK for Python

To get started using Vertex AI, you must have an existing Google Cloud project and [enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com). Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).

- `PROJECT_ID`: Google Cloud project ID where Vertex AI resources are deployed
- `LOCATION`: Google Cloud region where the Vertex AI resources (pipeline runs and the registered model) are created
- `VERSION`: Version tag for Docker serving container image
- `REPO_NAME`: Artifact Registry repository name.
- `JOB_IMAGE_ID`: Docker image name (inside `REPO_NAME`) of the serving container used when registering the model.
- `BUCKET_URI`: Google Cloud Storage bucket URI to store model artifacts and other data


In [None]:
PROJECT_ID = "sandbox-401718"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}
VERSION="latest" 
REPO_NAME="housing-poc" # @param {type:"string"}
JOB_IMAGE_ID="housing-poc-image" # @param {type:"string"}

# Create GCS Bucket
BUCKET_URI = f"gs://{PROJECT_ID}-pred-benchmark"  # @param {type:"string"}
! gsutil mb -l {LOCATION} -p {PROJECT_ID} {BUCKET_URI}

In [None]:
from google.cloud import aiplatform

# Bind the SDK session to the project, region and staging bucket set above.
aiplatform.init(
    project=PROJECT_ID,
    location=LOCATION,
    staging_bucket=BUCKET_URI,
)

### Define Components

Components are self-contained, containerized tasks that operate as isolated jobs, each with its own environment and explicit interfaces.

- `Import Data Source:`
This initial component represents the pipeline's entry point where raw data, like the Ames Iowa Housing dataset, is ingested. It acts as a placeholder for various data sources, ensuring the pipeline can be fed with the necessary information to begin processing.

- `Feature Transformation:`
Following data import, this crucial step processes and prepares the raw data for model training. It performs tasks such as data cleaning, feature engineering, and splitting the dataset into training and testing subsets (X_train, y_train, X_test, y_test).

- `X_test_dataset / X_train_dataset / y_test_dataset / y_train_dataset:`
These represent the intermediate outputs of the Feature Transformation step. They are structured datasets (system.Dataset) containing the partitioned feature (X) and target (y) sets for both training and testing, ensuring data readiness for the subsequent model training phase.

- `Model Training:`
This core component takes the prepared training and testing datasets to build and validate a machine learning model. It typically involves fitting an algorithm to the training data and evaluating its performance before producing a trained model.

- `pickled_model:`
This output artifact from the Model Training component represents the fully trained model in a serialized (pickled) format. It's the tangible result of the training process, ready to be registered or deployed for inference.

- `Register Model to Vertex:`
As the final step in this pipeline, this component takes the trained model and integrates it with the Vertex AI Model Registry. It formalizes the model for MLOps, allowing for versioning, centralized management, and subsequent deployment to prediction endpoints.

In [None]:
# Data Import

@component(
    base_image="us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-6:latest",
    packages_to_install=["google-cloud-aiplatform"],
)
def import_data_op():
    """Placeholder entry step of the pipeline: logs a message and imports nothing."""
    placeholder_message = "Dummy operator to import data from BQ / Cloud Storage"
    print(placeholder_message)


In [None]:
# Data Transformation

from kfp.dsl import Output, Dataset, component

@component(
    base_image="us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-6:latest",
    packages_to_install=[
        "fsspec",
        "gcsfs",
        "pandas",
    ],
)
def transformation_op(
    project: str,
    location: str,
    X_train_dataset: Output[Dataset],
    X_test_dataset: Output[Dataset],
    y_train_dataset: Output[Dataset],
    y_test_dataset: Output[Dataset],
    current_year: int = 2024,
    test_split_ratio: float = 0.1,
    random_state_seed: int = 42,
):
    """Load the Ames housing CSV, engineer features and write train/test splits.

    The CSV is read from a fixed GitHub URL, reduced to four features plus the
    ``SalePrice`` target, preprocessed, split with ``train_test_split`` and
    saved with ``np.save`` to the four output artifacts.

    Args:
        project: Accepted for interface parity; not used by this component.
        location: Accepted for interface parity; not used by this component.
        X_train_dataset: Output artifact receiving the training features.
        X_test_dataset: Output artifact receiving the test features.
        y_train_dataset: Output artifact receiving the training target.
        y_test_dataset: Output artifact receiving the test target.
        current_year: Reference year for ``HouseAge = current_year - YearBuilt``.
        test_split_ratio: Passed to ``train_test_split`` as ``test_size``.
        random_state_seed: Passed to ``train_test_split`` as ``random_state``.

    Raises:
        ValueError: If an expected feature column is missing after preprocessing.
        Exception: Whatever ``pd.read_csv`` raises is logged and re-raised.
    """
    # Imports live inside the function because only this function body is
    # executed in the component's container.
    import numpy as np
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import LabelEncoder

    # === Data Loading ===
    data_url = "https://raw.githubusercontent.com/melindaleung/Ames-Iowa-Housing-Dataset/master/data/ames%20iowa%20housing.csv"
    print(f"Loading data from: {data_url}")
    try:
        df = pd.read_csv(data_url)
        print(f"Data loaded successfully. Shape: {df.shape}")
    except Exception as e:
        print(f"Failed to load data: {e}")
        raise  # Re-raise the exception to fail the component

    # === Feature Selection ===
    features_to_keep = ["LotFrontage", "LotArea", "YearBuilt", "SaleType", "SalePrice"]
    print(f"Selecting features: {features_to_keep}")
    df = df[features_to_keep].copy()

    # === Preprocessing ===
    print("Starting preprocessing...")

    # Impute missing LotFrontage with the column mean. Assign the result back
    # instead of calling fillna(inplace=True) on the column selection: that
    # chained in-place form is deprecated in pandas and, with Copy-on-Write,
    # modifies a temporary and leaves `df` untouched.
    lot_frontage_mean = df["LotFrontage"].mean()
    df["LotFrontage"] = df["LotFrontage"].fillna(lot_frontage_mean)
    print(f"Imputed 'LotFrontage' NaN with mean: {lot_frontage_mean:.2f}")

    # Log transform LotArea
    df["LotArea"] = np.log1p(df["LotArea"])
    print("Applied log1p transformation to 'LotArea'")

    # Create HouseAge feature and drop the column it was derived from
    df["HouseAge"] = current_year - df["YearBuilt"]
    df = df.drop(columns="YearBuilt")
    print(
        f"Created 'HouseAge' using current_year={current_year} and dropped 'YearBuilt'"
    )

    # Label encode SaleType
    if "SaleType" in df.columns:
        # Cast to str first so NaN / mixed types are encoded as ordinary labels.
        df["SaleType"] = df["SaleType"].astype(str)
        label_encoder = LabelEncoder()
        df["SaleType"] = label_encoder.fit_transform(df["SaleType"])
        print("Label encoded 'SaleType'")
    else:
        print("Warning: 'SaleType' column not found. Skipping encoding.")

    # === Define Features X and Target y ===
    target_column = "SalePrice"
    feature_columns = ["LotFrontage", "LotArea", "HouseAge", "SaleType"]

    # Ensure all expected feature columns exist after preprocessing
    missing_features = [col for col in feature_columns if col not in df.columns]
    if missing_features:
        raise ValueError(
            f"Missing expected feature columns after preprocessing: {missing_features}"
        )
    print(f"Defining features (X) from columns: {feature_columns}")
    print(f"Defining target (y) from column: {target_column}")
    X = df[feature_columns].values  # features as a numpy array
    y = df[target_column].values  # target as a numpy array

    # === Split into Training and Testing Sets ===
    print(
        f"Splitting data with test_size={test_split_ratio} and random_state={random_state_seed}"
    )
    X_train_np, X_test_np, y_train_np, y_test_np = train_test_split(
        X, y, test_size=test_split_ratio, random_state=random_state_seed
    )

    # === Save Outputs to KFP Provided Paths ===
    # np.save appends ".npy" when the target name lacks that suffix, so each
    # array ends up at "<artifact.path>.npy"; the training component loads
    # from that same suffixed path.
    splits = (
        ("X_train", X_train_dataset, X_train_np, "Training features"),
        ("X_test", X_test_dataset, X_test_np, "Test features"),
        ("y_train", y_train_dataset, y_train_np, "Training target"),
        ("y_test", y_test_dataset, y_test_np, "Test target"),
    )
    for label, artifact, array, description in splits:
        print(f"Saving {label} to: {artifact.path}")
        np.save(artifact.path, array)
        # Record shape and a short description on the artifact.
        artifact.metadata["npy_shape"] = array.shape
        artifact.metadata["description"] = description

    print("Transformation component finished successfully.")


In [None]:
# Model Training

# Typed pair describing a trained model: its GCS location and its artifact.
# NOTE(review): appears unused -- training_op declares its own inline return
# tuple; confirm no other cell relies on this name before removing it.
class TrainingOutputs(NamedTuple):
    model_gcs_path: str  # GCS path
    pickled_model_output: Output[Model]

@component(
    base_image="us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-6:latest",
    packages_to_install=["google-cloud-storage", "scikit-learn", "numpy"],
)
def training_op(
    project: str,
    location: str,
    BUCKET_URI: str,
    X_train_dataset: Input[Dataset],
    X_test_dataset: Input[Dataset],
    y_train_dataset: Input[Dataset],
    y_test_dataset: Input[Dataset],
    pickled_model: Output[Model],
    filename: str = 'house_price_model.pkl',
) -> NamedTuple("Outputs", [("model_gcs_path", str),]):
    """Fit a LinearRegression on the training split and publish the pickled model.

    The model is serialized once; the same bytes are written to the
    ``pickled_model`` artifact path and uploaded to
    ``gs://<bucket>/housing_models/<filename>``.

    Args:
        project: Project passed to the Cloud Storage client (client default if empty).
        location: Accepted for interface parity; not used by this component.
        BUCKET_URI: ``gs://`` URI whose bucket receives the uploaded model.
        X_train_dataset: Artifact whose ``<path>.npy`` holds the training features.
        X_test_dataset: Artifact whose ``<path>.npy`` holds the test features.
        y_train_dataset: Artifact whose ``<path>.npy`` holds the training target.
        y_test_dataset: Artifact whose ``<path>.npy`` holds the test target.
        pickled_model: Output artifact receiving the pickled model.
        filename: Object name of the model under ``housing_models/`` in the bucket.

    Returns:
        A one-field named tuple, ``model_gcs_path``, with the URI of the uploaded pickle.
    """
    import pickle
    from collections import namedtuple

    import numpy as np
    from google.cloud import storage
    from sklearn.linear_model import LinearRegression

    def _load_split(label, artifact):
        """Load one split; the arrays were stored with a '.npy' suffix on the artifact path."""
        print(f"Loading {label} from base path: {artifact.path}")
        return np.load(artifact.path + ".npy")

    X_train = _load_split("X_train", X_train_dataset)
    X_test = _load_split("X_test", X_test_dataset)
    y_train = _load_split("y_train", y_train_dataset)
    y_test = _load_split("y_test", y_test_dataset)

    print(f"X_train shape: {X_train.shape}, y_train shape: {y_train.shape}")
    print(f"X_test shape: {X_test.shape}, y_test shape: {y_test.shape}")

    model = LinearRegression()
    model.fit(X_train, y_train)

    # Smoke-test the fitted model on a single test row when one is available.
    if X_test.shape[0] > 0:
        predictions = model.predict(X_test[:1])
        print(f"Sample prediction on first test instance: {predictions}")
    else:
        print("Test set is empty, skipping sample prediction.")

    # Serialize once and reuse the bytes for both destinations, instead of
    # pickling twice and leaving a stray copy in the working directory.
    model_bytes = pickle.dumps(model)

    with open(pickled_model.path, 'wb') as f:
        f.write(model_bytes)
    print(f"Model pickled and saved to KFP managed path: {pickled_model.path}")
    pickled_model.metadata["framework"] = "scikit-learn"
    pickled_model.metadata["model_type"] = "LinearRegression"
    pickled_model.metadata["description"] = "House price prediction model."

    gcs_model_path_in_bucket = f"housing_models/{filename}"
    # Keep only the bucket name: strip the scheme and any object prefix.
    bucket_name = BUCKET_URI.replace("gs://", "").split('/')[0]

    print(f"Uploading {filename} to gs://{bucket_name}/{gcs_model_path_in_bucket}")
    storage_client = storage.Client(project=project if project else None)
    blob = storage_client.bucket(bucket_name).blob(gcs_model_path_in_bucket)
    blob.upload_from_string(model_bytes, content_type="application/octet-stream")

    uploaded_model_gcs_uri = f"gs://{bucket_name}/{gcs_model_path_in_bucket}"
    print(f"Model additionally uploaded to: {uploaded_model_gcs_uri}")

    Outputs = namedtuple("Outputs", ["model_gcs_path"])
    return Outputs(model_gcs_path=uploaded_model_gcs_uri)

In [None]:
# Register Model

@component(
    base_image="us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-6:latest",
    # fsspec / gcsfs can help the aiplatform SDK with GCS path handling
    packages_to_install=["google-cloud-aiplatform", "fsspec", "gcsfs"],
)
def model_registry_op(
    project: str,
    location: str,
    model_gcs_uri: str,
    serving_container_image: str,  # full URI of the serving container
    model_display_name: str = "housing-model-registered",  # overridable default
) -> NamedTuple("RegistryOutputs", [("model_resource_name", str), ("model_version_id", str)]):
    """Register a trained model file in the Vertex AI Model Registry.

    Args:
        project: Google Cloud project used for the upload.
        location: Region used for the upload.
        model_gcs_uri: ``gs://`` URI of the model *file*; its parent directory
            is what gets passed to ``Model.upload`` as ``artifact_uri``.
        serving_container_image: URI of the serving container image.
        model_display_name: Display name given to the uploaded model.

    Returns:
        Named tuple ``(model_resource_name, model_version_id)`` taken from the
        object returned by ``aiplatform.Model.upload``.
    """
    import os
    from collections import namedtuple

    from google.cloud import aiplatform

    aiplatform.init(project=project, location=location)

    # Model.upload is given the directory that contains the artifact, so strip
    # the file name off the incoming URI.
    artifact_dir = os.path.dirname(model_gcs_uri)

    upload_kwargs = dict(
        display_name=model_display_name,
        artifact_uri=artifact_dir,
        serving_container_image_uri=serving_container_image,
        serving_container_predict_route="/predict",
        serving_container_health_route="/health",
        serving_container_ports=[8080],
        project=project,
        location=location,
    )
    registered = aiplatform.Model.upload(**upload_kwargs)

    RegistryOutputs = namedtuple("RegistryOutputs", ["model_resource_name", "model_version_id"])
    return RegistryOutputs(registered.resource_name, registered.version_id)


### Define pipeline

In [None]:
import random
import string

# Generate a random identifier of a specified length (default=8).
def generate_uuid(length: int = 8) -> str:
    """Return ``length`` random characters drawn from lowercase letters and digits.

    Despite the name this is a short random token, not an RFC 4122 UUID.
    """
    alphabet = string.ascii_lowercase + string.digits
    chosen = random.choices(alphabet, k=length)
    return "".join(chosen)


# Random suffix used in the pipeline job id.
UUID = generate_uuid()

In [None]:
@dsl.pipeline(pipeline_root=BUCKET_URI, name="housing-pipeline")
def pipeline():
    """Housing pipeline: import data -> transform features -> train -> register model.

    Training consumes the four dataset artifacts produced by the transformation
    step, and registration consumes the ``model_gcs_path`` output of training,
    so those orderings follow from the data flow. Only the import step, which
    produces no outputs, needs an explicit ``.after()``.
    """
    import_data = import_data_op().set_display_name("Import Data Source")

    transformation = (
        transformation_op(
            project=PROJECT_ID,
            location=LOCATION,
        )
        .after(import_data)
        .set_display_name("Feature Transformation")
    )

    training = training_op(
        project=PROJECT_ID,
        location=LOCATION,
        BUCKET_URI=BUCKET_URI,
        X_train_dataset=transformation.outputs["X_train_dataset"],
        X_test_dataset=transformation.outputs["X_test_dataset"],
        y_train_dataset=transformation.outputs["y_train_dataset"],
        y_test_dataset=transformation.outputs["y_test_dataset"],
    ).set_display_name("Model Training")

    # Serving image lives in Artifact Registry: <region>-docker.pkg.dev/<project>/<repo>/<image>:<tag>
    serving_image_uri = f"{LOCATION}-docker.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{JOB_IMAGE_ID}:{VERSION}"

    model_registry_op(
        project=PROJECT_ID,
        location=LOCATION,
        model_gcs_uri=training.outputs["model_gcs_path"],
        serving_container_image=serving_image_uri,
        model_display_name="housing_model",
    ).set_display_name("Register Model to Vertex AI")

### Run Pipeline

Vertex AI Pipelines lets you automate, monitor, and govern your ML systems in a serverless manner by using ML pipelines to orchestrate your ML workflows.

In [None]:
# Compile the pipeline function into a local YAML template.
template_uri = 'pipeline.yaml'
compiler.Compiler().compile(pipeline_func=pipeline, package_path=template_uri)

# Describe the run; the random UUID suffix gives each submission its own job id,
# and caching is switched off for this job.
job = aiplatform.PipelineJob(
    display_name="housing-model-poc",
    job_id=f"housing-pipeline-{UUID}",
    template_path=template_uri,
    pipeline_root=BUCKET_URI,
    enable_caching=False,
)

# Submit with sync=False as in the original cell (non-blocking submission).
job.run(sync=False)