In [None]:
! pip install --upgrade google-cloud-aiplatform

In [None]:
! pip install google-cloud-pipeline-components==2.14.0

In [None]:
# Environment Variables - Modify accordingly

# Project ID
PROJECT_ID = "PROJECT-ID"
! gcloud config set project {PROJECT_ID}

# Region/Location e.g. "us-central1"
REGION = "us-central1"

# Service Account
SERVICE_ACCOUNT = "YOUR-SA"

# === GCS DATA ===

# Bucket URI for the evaluation tasks e.g. "gs://eval-gcs-test3"
BUCKET_URI = "gs://eval-gcs-test3"

# Dir in bucket for evaluation
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root/evaluation_task"

# Bucket URI with the test data
GCS_SOURCE_URI = f"{PIPELINE_ROOT}/test_data_correct_string_format.csv"

# Bucket URI for the result data e.g. "gs://eval-gcs-test3/result/"
GCS_DESTINATION_OUTPUT_URI = PIPELINE_ROOT

# === MODEL DATA ===

# Display name for the Vertex AI Model "my_model"
MODEL_ID_VERSION = "1712582818749480960@1"

# Model Resource Name in Vertex AI. e.g. projects/PROJECT_ID/locations/REGION/models/MODEL_DISPLAY_NAME
# MODEL_RESOURCE_NAME = f"projects/{PROJECT_ID}/locations/{REGION}/models/{MODEL_ID_VERSION}"

# Target column in the TEST dataset. e.g. "target_column"
TARGET = "income_bracket"

# Class labels for classification batch prediction. e.g. ["class1", "class2"]
#CLASS_LABELS = ["\"\"\"<=50K\"\"\"", "\"\"\">50K\"\"\""] -> ["\"\"\"<=50K\"\"\"","\"\"\">50K\"\"\""]
#CLASS_LABELS = ["""<=50K""", """>50K"""] -> ["<=50K", ">50K"]
#CLASS_LABELS = ['"""<=50K"""','""">50K"""'] -> ["\"\"\"<=50K\"\"\"","\"\"\">50K\"\"\""]

# === PIPELINE DATA ===

# Display name for your Vertex AI Pipeline. e.g. ("classification_model_evaluation_pipeline")
PIPELINE_DISPLAY_NAME = ("classification_model_evaluation_pipeline")

# Path where the compiled pipeline needs to be written
PIPELINE_PACKAGE_PATH = "compiled_pipeline.json"

# === DATAFLOW DATA ===

DATAFLOW_SERVICE_ACCOUNT = ""

DATAFLOW_SUBNETWORK = ""

DATAFLOW_USE_PUBLIC_IP = True

In [None]:
# Configure service account
import sys
IS_COLAB = "google.colab" in sys.modules
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "YOUR-SA"
):
    # Get your service account from gcloud
    if not IS_COLAB:
        shell_output = !gcloud auth list 2>/dev/null
        SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()
    else:  # IS_COLAB:
        shell_output = ! gcloud projects describe  $PROJECT_ID
        project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
        SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"
    print("Service Account:", SERVICE_ACCOUNT)

In [None]:
# Add permissions to Service Account to Bucket
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

In [None]:
# Define the Evaluation Pipeline

import kfp
import json

from kfp import dsl

from google_cloud_pipeline_components.v1.model import ModelGetOp
from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp
from google_cloud_pipeline_components.v1.model_evaluation import ModelEvaluationClassificationOp
from google_cloud_pipeline_components._implementation.model_evaluation import EvaluationDataSamplerOp
from google_cloud_pipeline_components._implementation.model_evaluation import ModelImportEvaluationOp
from google_cloud_pipeline_components._implementation.model_evaluation import TargetFieldDataRemoverOp

@dsl.pipeline(name="custom-tabular-classification-evaluation-pipeline-new-version")
def evaluation_custom_tabular_feature_attribution_pipeline(
    project: str,
    location: str,
    model_name: str,
    target_field_name: str,
    gcs_source_input_uris: list,
    gcs_destination_output_uri_prefix: str,
    batch_predict_instances_format: str,
    evaluation_class_names: list,
    batch_predict_predictions_format: str = "jsonl",
    evaluation_prediction_label_column: str = "",
    evaluation_prediction_score_column: str = "prediction",
    enable_caching: bool = False,
    dataflow_service_account: str = "",
    dataflow_subnetwork: str = "",
    dataflow_use_public_ips: bool = True,
    batch_predict_machine_type: str = "n1-standard-4",
    batch_predict_starting_replica_count: int = 5,
    batch_predict_max_replica_count: int = 10,
):
    """Evaluate a Vertex AI classification model on tabular test data in GCS.

    Steps:
    1. Fetch the model (ModelGetOp).
    2. Strip the target column from the test data (TargetFieldDataRemoverOp).
    3. Run batch prediction on the stripped data (ModelBatchPredictOp).
    4. Compute classification metrics against the original, unstripped test
       data (ModelEvaluationClassificationOp, forced to run on Dataflow).
    5. Import those metrics into the model (ModelImportEvaluationOp).

    Despite the function name, no feature-attribution step is run. The full
    test set is used: no EvaluationDataSamplerOp down-sampling is applied.

    Args:
        project: Google Cloud project ID used by every task.
        location: Region used by every task, e.g. "us-central1".
        model_name: Model identifier passed to ModelGetOp, e.g. "<model_id>@<version>".
        target_field_name: Ground-truth label column in the test data.
        gcs_source_input_uris: GCS URIs of the test data files.
        gcs_destination_output_uri_prefix: GCS prefix for batch-prediction output.
        batch_predict_instances_format: Format of the test data (e.g. "csv").
            Also used as the ground-truth format and the imported dataset type.
        evaluation_class_names: Class labels; must match the target column values.
        batch_predict_predictions_format: Output format of batch prediction.
        evaluation_prediction_label_column: Prediction column holding labels ("" if none).
        evaluation_prediction_score_column: Prediction column holding scores.
        enable_caching: Not used by this pipeline body; kept for interface
            compatibility. Caching is configured on the PipelineJob instead.
        dataflow_service_account: Service account for the evaluation Dataflow
            job; "" leaves the component default.
        dataflow_subnetwork: Subnetwork for the Dataflow job; "" leaves the default.
        dataflow_use_public_ips: Whether Dataflow workers use public IPs.
        batch_predict_machine_type: Machine type for batch-prediction workers.
        batch_predict_starting_replica_count: Initial batch-prediction replicas.
        batch_predict_max_replica_count: Maximum batch-prediction replicas.
    """
    # Get the Vertex AI model resource
    # https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-2.14.0/api/v1/model.html#v1.model.ModelGetOp
    get_model_task = ModelGetOp(
        project=project,
        location=location,
        model_name=model_name,
    )

    # Remove the target column so batch prediction only sees model features.
    data_splitter_task = TargetFieldDataRemoverOp(
        project=project,
        location=location,
        gcs_source_uris=gcs_source_input_uris,
        instances_format=batch_predict_instances_format,
        target_field_name=target_field_name,
    )

    # Run the batch prediction task on the target-free data
    # https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-2.14.0/api/v1/batch_predict_job.html#v1.batch_predict_job.ModelBatchPredictOp
    batch_predict_task = ModelBatchPredictOp(
        project=project,
        location=location,
        model=get_model_task.outputs["model"],
        job_display_name="model-registry-batch-prediction",
        gcs_source_uris=data_splitter_task.outputs["gcs_output_directory"],
        instances_format=batch_predict_instances_format,
        predictions_format=batch_predict_predictions_format,
        gcs_destination_output_uri_prefix=gcs_destination_output_uri_prefix,
        machine_type=batch_predict_machine_type,
        starting_replica_count=batch_predict_starting_replica_count,
        max_replica_count=batch_predict_max_replica_count,
    )

    # Compare predictions with the original (label-bearing) test data.
    # https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-2.14.0/api/v1/model_evaluation.html#v1.model_evaluation.ModelEvaluationClassificationOp
    eval_task = ModelEvaluationClassificationOp(
        project=project,
        location=location,
        target_field_name=target_field_name,
        model=get_model_task.outputs["model"],
        predictions_format=batch_predict_predictions_format,
        predictions_gcs_source=batch_predict_task.outputs["gcs_output_directory"],
        ground_truth_format=batch_predict_instances_format,
        ground_truth_gcs_source=gcs_source_input_uris,
        class_labels=evaluation_class_names,
        prediction_score_column=evaluation_prediction_score_column,
        prediction_label_column=evaluation_prediction_label_column,
        dataflow_service_account=dataflow_service_account,
        dataflow_subnetwork=dataflow_subnetwork,
        dataflow_use_public_ips=dataflow_use_public_ips,
        force_runner_mode="Dataflow",  # do not change
    )

    # Import the model evaluations to the Vertex AI model
    ModelImportEvaluationOp(
        classification_metrics=eval_task.outputs["evaluation_metrics"],
        model=get_model_task.outputs["model"],
        dataset_type=batch_predict_instances_format,
    )

In [None]:
from kfp import compiler

# Serialize the pipeline function into a JSON pipeline spec.
# aiplatform.PipelineJob later loads it via `template_path`.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=evaluation_custom_tabular_feature_attribution_pipeline,
    package_path=PIPELINE_PACKAGE_PATH,
)

In [None]:
# Runtime arguments for the compiled pipeline. Each keyword matches a parameter
# of evaluation_custom_tabular_feature_attribution_pipeline.
parameters = dict(
    project=PROJECT_ID,
    location=REGION,
    model_name=MODEL_ID_VERSION,
    target_field_name=TARGET,
    evaluation_class_names=CLASS_LABELS,
    gcs_source_input_uris=[GCS_SOURCE_URI],
    gcs_destination_output_uri_prefix=GCS_DESTINATION_OUTPUT_URI,
    dataflow_service_account=DATAFLOW_SERVICE_ACCOUNT,
    dataflow_subnetwork=DATAFLOW_SUBNETWORK,
    dataflow_use_public_ips=DATAFLOW_USE_PUBLIC_IP,
    # Test data is read as CSV; batch prediction writes JSON Lines.
    batch_predict_instances_format="csv",
    batch_predict_predictions_format="jsonl",
    # Passed through as a pipeline parameter; the pipeline body never reads it.
    enable_caching=False,
)

In [None]:
# Run the pipeline

from google.cloud import aiplatform

# Pin the SDK to the configured project and region. Otherwise the PipelineJob
# is created in the SDK's default location, which may differ from the REGION
# the pipeline components (and the model lookup) use.
aiplatform.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=PIPELINE_ROOT,
)

job = aiplatform.PipelineJob(
    display_name=PIPELINE_DISPLAY_NAME,
    template_path=PIPELINE_PACKAGE_PATH,
    parameter_values=parameters,
    # Job-level caching switch. The pipeline's own `enable_caching` parameter
    # is not read by any task.
    enable_caching=False,
    pipeline_root=PIPELINE_ROOT,
)

# Run the pipeline job as SERVICE_ACCOUNT
job.run(service_account=SERVICE_ACCOUNT)