In [1]:
# Step 1 — Verify environment (Workbench)

import sys, subprocess, os

def pip_install(pkgs):
    """Install/upgrade ``pkgs`` into the interpreter that runs this kernel.

    Invokes ``sys.executable -m pip`` rather than a bare ``pip`` so packages land in
    the kernel's own environment. ``-U`` is always passed, so already-installed
    packages are upgraded to the newest version satisfying each specifier.

    Args:
        pkgs: Iterable of pip requirement specifiers, e.g. ``"numpy<2"``.

    Raises:
        subprocess.CalledProcessError: if pip exits with a non-zero status.
    """
    requirements = list(pkgs)
    if not requirements:
        # pip rejects an install request with no requirements; nothing to do.
        return
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-U", *requirements])

# Install/upgrade the SDKs this notebook needs.
# NOTE: "-U" re-resolves on every run and may upgrade packages, so this is not a no-op.
# If a package had already been imported in this kernel before an upgrade (e.g. on a
# re-run), restart the kernel so the new version is actually loaded.
# TODO(review): pin exact versions for reproducible runs.
pip_install([
    "google-cloud-aiplatform",
    "kfp",
    "google-cloud-pipeline-components",
    "numpy<2",
])

# Basic imports check: fail fast here if any SDK is missing or broken.
import google.cloud.aiplatform as aiplatform
import google_cloud_pipeline_components
import kfp
from kfp import dsl
from kfp.dsl import component

print("Python:", sys.version.split()[0])
print("aiplatform:", aiplatform.__version__)
print("kfp:", kfp.__version__)
print(
    "google-cloud-pipeline-components:",
    getattr(google_cloud_pipeline_components, "__version__", "unknown"),
)





Python: 3.10.19
aiplatform: 1.138.0
kfp: 2.15.2


In [2]:
from kfp import dsl, compiler
from kfp.dsl import component, Input, Output, Model, Metrics
from typing import NamedTuple
import time

from google.cloud import aiplatform
from google_cloud_pipeline_components.v1.custom_job import create_custom_training_job_from_component
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.types import artifact_types

# --- Config ---
# Everything project/region-specific is derived from PROJECT_ID and LOCATION below,
# so switching project or region cannot leave a stale service account or image URI.
PROJECT_ID = "vertex-ai-487907"
LOCATION = "us-central1"
STAGING_BUCKET = "gs://vertex-mlops-vinzur"
PIPELINE_ROOT = f"{STAGING_BUCKET}/phase4-ab"
SERVICE_ACCOUNT = f"vertex-pipeline-sa@{PROJECT_ID}.iam.gserviceaccount.com"

# Artifact Registry Docker repo: <region>-docker.pkg.dev/<project>/<repo>.
# Kept as its own knob because the registry region need not match the Vertex region.
ARTIFACT_REGISTRY_LOCATION = LOCATION
ARTIFACT_REPO = f"{ARTIFACT_REGISTRY_LOCATION}-docker.pkg.dev/{PROJECT_ID}/vertex-mlops"
TRAIN_IMAGE_URI = f"{ARTIFACT_REPO}/train-sklearn:1"
SERVE_IMAGE_URI = f"{ARTIFACT_REPO}/serve-sklearn:1"

aiplatform.init(project=PROJECT_ID, location=LOCATION, staging_bucket=STAGING_BUCKET)

In [3]:
# --- 1) Container-based training component ---
@dsl.container_component
def train_container(n_rows: int, model: Output[Model]):
    """Run the training image, which is expected to write its model into ``model``.

    The container is invoked as ``<image> --n_rows <n_rows> --model_dir <model.path>``.
    """
    cli_args = ["--n_rows", n_rows]
    cli_args.extend(["--model_dir", model.path])
    return dsl.ContainerSpec(image=TRAIN_IMAGE_URI, args=cli_args)


# Wrap the container component so it runs as a Vertex AI custom training job
# with its own machine type and service account.
CustomTrainOp = create_custom_training_job_from_component(
    component_spec=train_container,
    service_account=SERVICE_ACCOUNT,
    base_output_directory=PIPELINE_ROOT,
    replica_count=1,
    machine_type="n1-standard-4",
    display_name="phase4-train-custom-container",
)

In [4]:
# --- 2) compute hash ---
@component(
    base_image="python:3.10-slim",
    packages_to_install=["google-cloud-storage"],
)
def compute_gcs_prefix_hash(gcs_prefix: str) -> NamedTuple("Outputs", [("data_hash", str)]):
    """Fingerprint the objects stored under ``gs://bucket/prefix``.

    Use the result as a "dataset version". It changes when an object under the prefix
    is added, removed, renamed, resized, or rewritten with different content.

    The prefix is treated as a directory or a single object. ``gs://b/datasets/demo``
    covers the object ``datasets/demo`` and everything under ``datasets/demo/``, but NOT
    siblings such as ``datasets/demo2/...``. ``gs://b`` covers the whole bucket.

    Each object contributes its name, size and checksum (CRC32C, else MD5) as
    newline-separated fields, in name order.

    Args:
        gcs_prefix: ``gs://bucket[/prefix]`` location to fingerprint.

    Returns:
        data_hash: hex SHA-256 digest over the in-scope objects.

    Raises:
        ValueError: if ``gcs_prefix`` is not a ``gs://bucket[/prefix]`` URI.
    """
    import hashlib
    from google.cloud import storage

    if not gcs_prefix.startswith("gs://"):
        raise ValueError("gcs_prefix must start with gs://")

    bucket_name, _, prefix = gcs_prefix[len("gs://"):].partition("/")
    if not bucket_name:
        raise ValueError(f"gcs_prefix has no bucket name: {gcs_prefix}")
    prefix = prefix.rstrip("/")
    dir_prefix = f"{prefix}/"

    def _in_scope(name: str) -> bool:
        # GCS listing is a plain string-prefix match, so "demo" would also return
        # "demo2/...". Keep only the exact object and what lives "inside" it.
        return not prefix or name == prefix or name.startswith(dir_prefix)

    client = storage.Client()
    blobs = sorted(
        (b for b in client.list_blobs(bucket_name, prefix=prefix) if _in_scope(b.name)),
        key=lambda b: b.name,
    )
    if not blobs:
        # Still a well-defined fingerprint, but almost always a wrong or unpopulated
        # prefix. Surface it in the task logs instead of failing the pipeline.
        print(f"WARNING: no objects found under {gcs_prefix}; data_hash is the empty digest.")

    h = hashlib.sha256()
    for b in blobs:
        # GCS object names cannot contain newlines, and size/checksum are plain
        # tokens, so newline-delimited fields are unambiguous. Raw concatenation is
        # not: name "a1" + size 23 would collide with name "a12" + size 3.
        size = b.size if b.size is not None else 0
        checksum = b.crc32c or b.md5_hash or ""
        h.update(f"{b.name}\n{size}\n{checksum}\n".encode("utf-8"))

    return (h.hexdigest(),)

In [5]:
# --- 3) Evaluate into Metrics artifact ---
@component(
    base_image="python:3.10-slim",
    packages_to_install=["numpy<2", "pandas", "scikit-learn", "joblib"],
)
def evaluate(
    model: Input[Model],
    metrics: Output[Metrics],
) -> NamedTuple("Outputs", [("accuracy", float)]):
    """Score the trained classifier on a fixed, seeded synthetic hold-out set.

    Loads ``<model.path>/model.joblib``. It then generates 300 samples with features
    ``x1``/``x2`` and a noisy binary label ``x1 + 0.5*x2 + noise > 0``, and logs
    accuracy plus a 2x2 confusion matrix to ``metrics``.

    NOTE(review): the eval data is generated here rather than read from the training
    data. The score is only meaningful if the training image fits the same
    generative process and feature names. Confirm against the train-sklearn image.

    Returns:
        accuracy: fraction of correct predictions (the pipeline's deploy gate).
    """
    import joblib
    import numpy as np
    import pandas as pd
    from sklearn.metrics import accuracy_score, confusion_matrix

    # NOTE(review): joblib.load unpickles (arbitrary code execution on untrusted
    # input). Only point it at artifacts produced by this pipeline's training step.
    clf = joblib.load(f"{model.path}/model.joblib")

    seed = 123  # fixed so every run is scored on the identical eval set
    n = 300
    class_labels = [0, 1]

    rng = np.random.default_rng(seed)
    x1 = rng.normal(size=n)
    x2 = rng.normal(size=n)
    y_true = (x1 + 0.5 * x2 + rng.normal(scale=0.3, size=n) > 0).astype(int)

    X = pd.DataFrame({"x1": x1, "x2": x2})[["x1", "x2"]]

    preds = clf.predict(X)
    acc = float(accuracy_score(y_true, preds))

    # Pin the label order so the matrix rows/columns are guaranteed to line up with
    # the "labels" metadata below, regardless of which classes appear in the data.
    cm = confusion_matrix(y_true, preds, labels=class_labels).tolist()

    metrics.log_metric("accuracy", acc)
    # Stored as plain artifact metadata. NOTE(review): Vertex's built-in
    # confusion-matrix rendering is driven by a ClassificationMetrics artifact
    # (log_confusion_matrix), not by Metrics metadata. Switch if UI display is needed.
    metrics.metadata["confusion_matrix"] = {
        "rows": cm,
        "labels": [str(label) for label in class_labels],
    }

    return (acc,)

In [6]:
# --- 4) Export model artifact dir to a deterministic GCS location ---
@component(
    base_image="python:3.10-slim",
    packages_to_install=["google-cloud-storage"],
)
def export_model_to_gcs(
    model: Input[Model],
    export_gcs_dir: str,
) -> NamedTuple("Outputs", [("exported_uri", str)]):
    """Copy every file under the model artifact's local dir to ``export_gcs_dir``.

    The relative layout under ``model.path`` is preserved. After uploading, the
    function verifies that ``model.joblib`` landed at the top level of the export
    dir, since downstream model upload points at this directory.

    Args:
        model: Trained model artifact whose local ``path`` is a directory.
        export_gcs_dir: Destination ``gs://bucket[/prefix]``.

    Returns:
        exported_uri: ``export_gcs_dir`` unchanged, for use as an artifact URI.

    Raises:
        ValueError: if ``export_gcs_dir`` is not a ``gs://bucket[/prefix]`` URI.
        RuntimeError: if the model dir has no files, or ``model.joblib`` is
            missing after upload.
    """
    import os
    from google.cloud import storage

    if not export_gcs_dir.startswith("gs://"):
        raise ValueError(f"export_gcs_dir must be gs://..., got: {export_gcs_dir}")

    bucket_name, _, prefix = export_gcs_dir[len("gs://"):].partition("/")
    if not bucket_name:
        raise ValueError(f"export_gcs_dir must be gs://..., got: {export_gcs_dir}")
    prefix = prefix.rstrip("/")

    def _blob_name(rel: str) -> str:
        # Single mapping from relative path to object name, so the upload and the
        # sanity check can never disagree (previously an empty prefix made the
        # check look for "/model.joblib").
        return f"{prefix}/{rel}" if prefix else rel

    local_dir = model.path
    # Sorted for a deterministic upload order (os.walk order is filesystem-defined).
    files = sorted(
        os.path.join(root, name) for root, _, names in os.walk(local_dir) for name in names
    )
    if not files:
        raise RuntimeError(f"No files found in model.path: {local_dir}")

    client = storage.Client()
    bucket = client.bucket(bucket_name)

    for local_path in files:
        rel = os.path.relpath(local_path, local_dir).replace("\\", "/")
        bucket.blob(_blob_name(rel)).upload_from_filename(local_path)

    # Sanity check: the serving side expects model.joblib at the top of the export dir.
    if not bucket.blob(_blob_name("model.joblib")).exists(client):
        raise RuntimeError(
            f"Upload done but model.joblib not found at {export_gcs_dir.rstrip('/')}/model.joblib"
        )

    return (export_gcs_dir,)

In [7]:
# --- 5) Deploy + A/B traffic split ---
from kfp.dsl import Input
from google_cloud_pipeline_components.types import artifact_types

@component(
    base_image="python:3.10-slim",
    packages_to_install=["google-cloud-aiplatform"],
)
def deploy_ab_and_set_traffic(
    model: Input[artifact_types.VertexModel],
    endpoint_resource_name: str,
    endpoint_display_name: str,
    candidate_traffic_percent: int,
    machine_type: str,
    min_replica_count: int,
    max_replica_count: int,
    project: str,
    location: str,
) -> NamedTuple("Outputs", [("final_endpoint_resource_name", str)]):
    """Deploy ``model`` to an endpoint and split traffic between baseline and candidate.

    The new model is deployed with ``traffic_percentage=0``. Then:
      * if the endpoint had no deployed models, the new model gets 100%;
      * otherwise the baseline (the previously deployed model with the largest
        traffic share) gets ``100 - candidate_traffic_percent`` and the new model
        gets ``candidate_traffic_percent``.

    Models already on the endpoint other than the baseline are left out of the new
    traffic split but are NOT undeployed. NOTE(review): they keep their replicas, so
    undeploy them separately if that is unwanted.

    A blank ``endpoint_resource_name`` creates a NEW endpoint on every call, so the
    A/B branch only runs when an existing endpoint's resource name is passed in.

    Args:
        model: Uploaded Vertex model; its ``resourceName`` metadata is deployed.
        endpoint_resource_name: Existing endpoint to use, or blank to create one.
        endpoint_display_name: Display name used only when creating an endpoint.
        candidate_traffic_percent: 0..100 share for the new model in the A/B split.
        machine_type, min_replica_count, max_replica_count: Serving resources.
        project, location: Passed to ``aiplatform.init``.

    Returns:
        final_endpoint_resource_name: resource name of the endpoint used.

    Raises:
        ValueError: ``candidate_traffic_percent`` outside 0..100. This is checked
            before deploying anything.
        RuntimeError: the newly deployed model could not be identified.
    """
    import time
    from google.cloud import aiplatform

    # Validate up front so a bad value cannot leave a freshly deployed (billable)
    # replica behind with the split never applied.
    cand = int(candidate_traffic_percent)
    if cand < 0 or cand > 100:
        raise ValueError("candidate_traffic_percent must be 0..100")

    aiplatform.init(project=project, location=location)

    # Resource name is only known at runtime (set by the upstream upload step).
    model_resource_name = model.metadata["resourceName"]

    if endpoint_resource_name.strip():
        endpoint = aiplatform.Endpoint(endpoint_resource_name)
    else:
        endpoint = aiplatform.Endpoint.create(display_name=endpoint_display_name)

    before_ids = [m.id for m in endpoint.list_models()]
    # Snapshot current traffic so the baseline is chosen by traffic share, not by
    # list order (list order says nothing about which model is serving).
    before_traffic = dict(endpoint.traffic_split or {}) if before_ids else {}

    aiplatform.Model(model_resource_name).deploy(
        endpoint=endpoint,
        deployed_model_display_name=f"candidate-{int(time.time())}",
        machine_type=machine_type,
        min_replica_count=min_replica_count,
        max_replica_count=max_replica_count,
        traffic_percentage=0,
        sync=True,
    )

    known_ids = set(before_ids)
    new_ids = [mid for mid in (m.id for m in endpoint.list_models()) if mid not in known_ids]
    if not new_ids:
        raise RuntimeError("Could not detect newly deployed model id.")
    candidate_id = new_ids[0]

    if not before_ids:
        endpoint.update(traffic_split={candidate_id: 100})
        return (endpoint.resource_name,)

    # max() keeps the first item on ties, so with no traffic info this falls back
    # to the first listed model.
    baseline_id = max(before_ids, key=lambda mid: before_traffic.get(mid, 0))

    endpoint.update(traffic_split={baseline_id: 100 - cand, candidate_id: cand})
    return (endpoint.resource_name,)

In [8]:
# --- Pipeline ---
@dsl.pipeline(name="phase4-custom-container-ab-experiments")
def phase4_pipeline(
    n_rows: int = 500,
    min_accuracy: float = 0.80,
    # Hygiene params:
    git_sha: str = "unknown",
    data_source_gcs_prefix: str = "gs://vertex-mlops-vinzur/datasets/demo",  # replace with the real dataset prefix
    # Endpoint control:
    endpoint_resource_name: str = "",  # leave blank on first run to auto-create
    endpoint_display_name: str = "phase4-fixed-endpoint",
    candidate_traffic_percent: int = 10,
):
    """Train, evaluate and, if accurate enough, register and A/B-deploy a model.

    Flow: fingerprint the data prefix; run the custom-container training job;
    evaluate. Only when ``accuracy >= min_accuracy``: export the model files to GCS,
    import them as an UnmanagedContainerModel, upload to Vertex AI, then deploy to an
    endpoint and split traffic between the existing baseline and the new candidate.

    Args:
        n_rows: Forwarded to the training container as ``--n_rows``.
        min_accuracy: Deploy gate threshold on the evaluation accuracy.
        git_sha: Code version for bookkeeping. NOTE(review): no task in this pipeline
            consumes it; presumably it is only visible as a run parameter.
        data_source_gcs_prefix: ``gs://`` prefix fingerprinted by
            ``compute_gcs_prefix_hash``. The training task is not passed this prefix.
        endpoint_resource_name: Existing endpoint to deploy into. Blank creates a new
            endpoint (on every run) named ``endpoint_display_name``.
        endpoint_display_name: Display name used only when a new endpoint is created.
        candidate_traffic_percent: 0..100 share for the new model when the endpoint
            already has a deployed model.
    """
    # dataset fingerprint
    # NOTE(review): the "data_hash" output is not consumed by any downstream task.
    # Wire it into model/run metadata if it is meant to provide lineage.
    data_hash_task = compute_gcs_prefix_hash(gcs_prefix=data_source_gcs_prefix)

    # Train + eval: training runs via CustomTrainOp; evaluation loads its model artifact.
    train_task = CustomTrainOp(n_rows=n_rows)
    eval_task = evaluate(model=train_task.outputs["model"])

    # Gate deploy: everything inside this block runs only if accuracy >= min_accuracy.
    with dsl.If(eval_task.outputs["accuracy"] >= min_accuracy, name="deploy_if_good"):
        # Per-run export path: one directory per pipeline job id, so runs do not
        # overwrite each other (stable within a run, not across runs).
        # NOTE(review): relies on the job-id placeholder being substituted inside this
        # concatenated string at runtime. Confirm on a real run.
        export_gcs_dir = f"{PIPELINE_ROOT}/exported-models/{dsl.PIPELINE_JOB_ID_PLACEHOLDER}"
        export_task = export_model_to_gcs(
            model=train_task.outputs["model"],
            export_gcs_dir=export_gcs_dir,
        )

        # Create UnmanagedContainerModel (artifacts + serving container).
        # Only imageUri is set. NOTE(review): if the serving image does not use
        # Vertex's default predict/health routes and port, add them to containerSpec.
        unmanaged = dsl.importer(
            artifact_uri=export_task.outputs["exported_uri"],
            artifact_class=artifact_types.UnmanagedContainerModel,
            metadata={
                "containerSpec": {"imageUri": SERVE_IMAGE_URI},
            },
        )

        # Register the model in Vertex AI; its "model" output feeds the deploy step.
        uploaded = ModelUploadOp(
            project=PROJECT_ID,
            location=LOCATION,
            display_name="phase4-sklearn-model",
            unmanaged_container_model=unmanaged.outputs["artifact"],
        )

        # Deploy + traffic split. Serving machine type and replica counts are fixed
        # here rather than exposed as pipeline parameters.
        deploy_ab_and_set_traffic(
            model=uploaded.outputs["model"],
            endpoint_resource_name=endpoint_resource_name,
            endpoint_display_name=endpoint_display_name,
            candidate_traffic_percent=candidate_traffic_percent,
            machine_type="n1-standard-2",
            min_replica_count=1,
            max_replica_count=1,
            project=PROJECT_ID,
            location=LOCATION,
        )

In [9]:
# --- Compile + Submit (with Experiment association) ---
PIPELINE_YAML = "phase4_ab_experiments.yaml"
compiler.Compiler().compile(phase4_pipeline, PIPELINE_YAML)

EXPERIMENT_NAME = "phase4-ab-tests"

# Endpoint to A/B test against. While this is blank, the deploy step creates a
# brand-new endpoint on EVERY run, so the baseline/candidate split never happens.
# After the first successful run, paste that endpoint's resource name here.
ENDPOINT_RESOURCE_NAME = ""
# NOTE(review): hard-coded commit. Keep it in sync with the code actually being run.
GIT_SHA = "9ae8962f349a2be6e87cd796b770ef819b338212"

pipeline_job = aiplatform.PipelineJob(
    display_name=f"phase4-ab-{int(time.time())}",
    template_path=PIPELINE_YAML,
    pipeline_root=PIPELINE_ROOT,
    parameter_values={
        "n_rows": 500,
        "min_accuracy": 0.80,
        "git_sha": GIT_SHA,
        "data_source_gcs_prefix": "gs://vertex-mlops-vinzur/datasets/demo",
        "endpoint_resource_name": ENDPOINT_RESOURCE_NAME,
        "endpoint_display_name": "phase4-fixed-endpoint",
        "candidate_traffic_percent": 10,
    },
    enable_caching=False,
)

# Key Phase 4 difference: associate the PipelineJob with a Vertex AI Experiment so
# runs can be compared side by side in the UI.
pipeline_job.submit(
    service_account=SERVICE_ACCOUNT,
    experiment=EXPERIMENT_NAME,
)

print("Submitted:", pipeline_job.resource_name)

Creating PipelineJob
PipelineJob created. Resource name: projects/208722280565/locations/us-central1/pipelineJobs/phase4-custom-container-ab-experiments-20260223172649
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/208722280565/locations/us-central1/pipelineJobs/phase4-custom-container-ab-experiments-20260223172649')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/phase4-custom-container-ab-experiments-20260223172649?project=208722280565
Associating projects/208722280565/locations/us-central1/pipelineJobs/phase4-custom-container-ab-experiments-20260223172649 to Experiment: phase4-ab-tests
Submitted: projects/208722280565/locations/us-central1/pipelineJobs/phase4-custom-container-ab-experiments-20260223172649
