# BQML Model to Vertex AI Endpoint Kubeflow Pipeline using Custom Components

In [None]:
! pip install --upgrade kfp google-cloud-bigquery google-cloud-aiplatform google-cloud-storage

## Set the Prereqs

In [None]:
from kfp import dsl, compiler
from kfp.dsl import component

# Notebook shell capture: assign the output of `gcloud config get-value project`
# to PROJECT_ID, then keep only its first element (the project id line).
PROJECT_ID = ! gcloud config get-value project
PROJECT_ID = PROJECT_ID[0]

# Fall back to a manually supplied project id when gcloud reports that no
# project is configured ("(unset)").
if PROJECT_ID == "(unset)":
  PROJECT_ID = "[your-project-id]" # @param {type:"string"}

print(PROJECT_ID)

# BigQuery settings for the MPG example. Make sure the BigQuery dataset and
# table exist before running the pipeline; the model is created by the pipeline.
BQ_DATASET = "mpg_dataset" # @param {type:"string"}
BQ_TABLE = "mpg" # @param {type:"string"}
BQ_MODEL = "mpg_model" # @param {type:"string"}
BQ_LOCATION = "US"  # @param {type:"string"}

# Vertex AI constants: region plus the display names given to the uploaded
# model and to the endpoint it is deployed on.
REGION = "us-central1"  # @param {type:"string"}
MODEL_DISPLAY_NAME = "mpg_model_vertex" # @param {type:"string"}
ENDPOINT_DISPLAY_NAME = "mpg_endpoint" # @param {type:"string"}

# Ensure this bucket exists. It receives the exported model artifacts and is
# also used for the pipeline root when the job is submitted.
BUCKET_NAME = f"{PROJECT_ID}-mpg-model"


## KFP Pipeline Components

These are lightweight Python KFP components: ordinary Python functions wrapped with the `@component` decorator.

Note that the decorator specifies both the Docker image used to run the function and the Python packages it requires.

In [None]:
@component(
    base_image="python:3.9",
    packages_to_install=[
        "google-cloud-bigquery",
    ]
)
def create_bq_model(
    project_id: str,
    dataset: str,
    table: str,
    bq_model: str,
    location: str = "US"
) -> str:
    """
    Creates or replaces a BigQuery ML linear regression model that predicts MPG.

    The model is trained on the rows of `project_id.dataset.table` whose MPG
    value is not NULL.

    Args:
        project_id: Project that owns the dataset and is billed for the query.
        dataset: BigQuery dataset containing the training table; the model is
            created in the same dataset.
        table: Name of the training table.
        bq_model: Name of the model to create or replace.
        location: BigQuery location in which the training job runs.

    Returns:
        The full path of the created model: "project.dataset.model".
    """
    from google.cloud import bigquery

    # Use `location` as the client's default job location so the training
    # query runs where the dataset lives instead of in the client default.
    client = bigquery.Client(project=project_id, location=location)

    model_path = f"{project_id}.{dataset}.{bq_model}"
    query = f"""
    CREATE OR REPLACE MODEL `{model_path}`
    OPTIONS(model_type='LINEAR_REG', input_label_cols=['MPG']) AS
    SELECT
        Cylinders,
        Displacement,
        Horsepower,
        Weight,
        Acceleration,
        Model_Year,
        Origin,
        MPG
    FROM `{project_id}.{dataset}.{table}`
    WHERE MPG IS NOT NULL;
    """
    job = client.query(query)
    job.result()  # Block until training finishes; raises if the job failed

    print(f"BigQuery ML Model created successfully at: {model_path}")
    return model_path


@component(
    base_image="python:3.9",
    packages_to_install=[
        "google-cloud-bigquery",
        "google-cloud-storage"
    ]
)
def export_bq_model(
    project_id: str,
    model_path: str,
    bucket_name: str
) -> str:
    """
    Exports a BigQuery ML model to a folder in a Cloud Storage bucket.

    Args:
        project_id: Project used to run the export job.
        model_path: Full model path in the form "<project>.<dataset>.<model>".
        bucket_name: Destination bucket name, without the "gs://" prefix.

    Returns:
        The destination URI, "gs://<bucket_name>/<model>/".
    """
    from google.cloud import bigquery

    # The text after the final dot of the model path is the model name; it
    # doubles as the folder name inside the bucket.
    model_name = model_path.rsplit(".", 1)[-1]
    export_uri = f"gs://{bucket_name}/{model_name}/"

    export_query = f"""
    EXPORT MODEL `{model_path}`
    OPTIONS (URI='{export_uri}');
    """

    bq_client = bigquery.Client(project=project_id)
    bq_client.query(export_query).result()  # Block until the export finishes

    print(f"Model exported successfully to: {export_uri}")
    return export_uri


@component(
    base_image="python:3.9",
    packages_to_install=[
        "google-cloud-aiplatform"
    ]
)
def upload_model_to_vertex(
    project_id: str,
    region: str,
    model_display_name: str,
    export_uri: str
) -> str:
    """
    Registers exported model artifacts as a model in Vertex AI.

    Args:
        project_id: Project in which the model is registered.
        region: Vertex AI region in which the model is registered.
        model_display_name: Display name given to the uploaded model.
        export_uri: Cloud Storage URI of the folder holding the artifacts.

    Returns:
        The resource name of the uploaded Vertex AI model.
    """
    from google.cloud import aiplatform

    # Container image used to serve predictions for the uploaded model.
    serving_image = "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest"

    aiplatform.init(project=project_id, location=region)

    uploaded = aiplatform.Model.upload(
        display_name=model_display_name,
        artifact_uri=export_uri,
        serving_container_image_uri=serving_image,
    )

    resource_name = uploaded.resource_name
    print(f"Model registered in Vertex AI: {resource_name}")
    return resource_name


@component(
    base_image="python:3.9",
    packages_to_install=[
        "google-cloud-aiplatform"
    ]
)
def create_endpoint(
    project_id: str,
    region: str,
    endpoint_display_name: str
) -> str:
    """
    Creates a Vertex AI endpoint with the given display name.

    Args:
        project_id: Project in which the endpoint is created.
        region: Vertex AI region in which the endpoint is created.
        endpoint_display_name: Display name for the new endpoint.

    Returns:
        The resource name of the created endpoint.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project_id, location=region)

    new_endpoint = aiplatform.Endpoint.create(
        display_name=endpoint_display_name,
    )

    resource_name = new_endpoint.resource_name
    print(f"Endpoint created: {resource_name}")
    return resource_name


@component(
    base_image="python:3.9",
    packages_to_install=[
        "google-cloud-aiplatform"
    ]
)
def deploy_model_to_endpoint(
    project_id: str,
    region: str,
    model_resource_name: str,
    endpoint_resource_name: str
) -> str:
    """
    Deploys a Vertex AI model to an existing endpoint with 100% of traffic.

    Args:
        project_id: Project that owns the model and the endpoint.
        region: Vertex AI region of the model and the endpoint.
        model_resource_name: Resource name of the model to deploy.
        endpoint_resource_name: Resource name of the target endpoint.

    Returns:
        The `resource_name` of the object returned by `Model.deploy`.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project_id, location=region)

    target_endpoint = aiplatform.Endpoint(endpoint_resource_name)
    vertex_model = aiplatform.Model(model_resource_name)

    # NOTE(review): the SDK documents Model.deploy as returning the Endpoint,
    # so the value below is presumably the endpoint's resource name rather
    # than a deployed-model id - confirm before relying on it downstream.
    deploy_result = vertex_model.deploy(
        endpoint=target_endpoint,
        machine_type="n1-standard-4",
        traffic_percentage=100,
    )

    resource_name = deploy_result.resource_name
    print(f"Model deployed to endpoint: {resource_name}")
    return resource_name


## Define the Pipeline

The pipeline executes tasks, each of which runs one of the components defined above.

Note that a component's output is available to the tasks defined in later steps.

In [None]:
@dsl.pipeline(name="bqml-to-vertex-pipeline")
def bqml_to_vertex_pipeline(
    project_id: str,
    region: str,
    dataset: str,
    table: str,
    bq_model: str,
    bucket_name: str,
    model_display_name: str,
    endpoint_display_name: str,
    location: str = "US"
):
    """
    Trains a BigQuery ML model and serves it from a Vertex AI endpoint.

    Steps: create the BQML model, export it to Cloud Storage, upload the
    exported artifacts to Vertex AI, create an endpoint, and deploy the
    uploaded model to that endpoint.
    """
    # Step 1: train (create or replace) the BigQuery ML model.
    train_step = create_bq_model(
        project_id=project_id,
        dataset=dataset,
        table=table,
        bq_model=bq_model,
        location=location,
    )

    # Step 2: export the trained model to the bucket.
    export_step = export_bq_model(
        project_id=project_id,
        model_path=train_step.output,
        bucket_name=bucket_name,
    )

    # Step 3: register the exported artifacts as a Vertex AI model.
    upload_step = upload_model_to_vertex(
        project_id=project_id,
        region=region,
        model_display_name=model_display_name,
        export_uri=export_step.output,
    )

    # Step 4: create the endpoint; it does not consume any earlier output.
    endpoint_step = create_endpoint(
        project_id=project_id,
        region=region,
        endpoint_display_name=endpoint_display_name,
    )

    # Step 5: deploy the uploaded model to the new endpoint. Nothing consumes
    # this task's output, so the task object is not kept.
    deploy_model_to_endpoint(
        project_id=project_id,
        region=region,
        model_resource_name=upload_step.output,
        endpoint_resource_name=endpoint_step.output,
    )


## Compile the Pipeline as JSON

In [None]:
from kfp import compiler

# File that receives the compiled pipeline definition; the submission cell
# below passes it as the job's template path.
pipeline_filename = "bqml-vertex-custom-components.json"

# Compile the pipeline function into that file.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=bqml_to_vertex_pipeline,
    package_path=pipeline_filename,
)

print(f"Pipeline compiled to {pipeline_filename}")


## Run the Kubeflow pipeline on Vertex AI Pipelines

In [None]:
import kfp
from google.cloud.aiplatform import PipelineJob

# Create a PipelineJob from the compiled pipeline and submit it.
job = PipelineJob(
    display_name="bqml-vertex-job",
    template_path=pipeline_filename,  # The JSON artifact produced by compilation
    pipeline_root=f"gs://{BUCKET_NAME}/pipeline_root",  # Where Vertex AI will store pipeline artifacts
    parameter_values={
        'project_id': PROJECT_ID,
        'region': REGION,
        'dataset': BQ_DATASET,
        'table': BQ_TABLE,
        'bq_model': BQ_MODEL,
        'bucket_name': BUCKET_NAME,
        'model_display_name': MODEL_DISPLAY_NAME,
        'endpoint_display_name': ENDPOINT_DISPLAY_NAME,
        'location': BQ_LOCATION
    },
    # Caching is off so that every submission re-executes every step, even
    # when the parameter values are identical to a previous run.
    enable_caching=False,
    # Run the job itself in the project and region configured above rather
    # than in whatever defaults the SDK picks up from the environment.
    project=PROJECT_ID,
    location=REGION,
)

# Note: if caching is enabled and a task's parameters change, the task is re-run.
# If caching is enabled and the parameters do not change, the task is not re-run.

# Runs the job and waits for it to finish
# job.run()

# Submits the job to Vertex AI Pipelines and returns immediately
job.submit()