In [8]:
# Environment check: report which SDK versions are installed.
# The KFP version check below is currently disabled (commented out).
# ! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
# List installed packages whose name contains "aiplatform".
! pip3 freeze | grep aiplatform


KFP SDK version: 2.14.3
google-cloud-aiplatform==1.113.0


In [3]:
# NOTE(review): importlib is imported here but not used in this cell.
import sys, importlib

# Replace any installed KFP packages with a pinned kfp release, invoking pip through
# sys.executable so the change lands in the interpreter that backs this kernel.
# uninstall + install with the SAME python the notebook uses
!{sys.executable} -m pip uninstall -y kfp kfp-pipeline-spec kfp-server-api
!{sys.executable} -m pip install -q "kfp==2.7.0"

# verify the version and where it loads from
# NOTE(review): if kfp was already imported in this kernel before the reinstall, this
# import returns the cached module; restart the kernel to be sure the printed version
# is the freshly installed one.
import kfp
print("KFP:", kfp.__version__, "Path:", kfp.__file__)


Found existing installation: kfp 2.7.0
Uninstalling kfp-2.7.0:
  Successfully uninstalled kfp-2.7.0
Found existing installation: kfp-pipeline-spec 0.3.0
Uninstalling kfp-pipeline-spec-0.3.0:
  Successfully uninstalled kfp-pipeline-spec-0.3.0
Found existing installation: kfp-server-api 2.0.5
Uninstalling kfp-server-api-2.0.5:
  Successfully uninstalled kfp-server-api-2.0.5
KFP: 2.7.0 Path: /opt/conda/lib/python3.10/site-packages/kfp/__init__.py


In [4]:
import google.cloud.aiplatform as aiplatform
import kfp
from kfp import compiler, dsl
from kfp.dsl import Artifact, Dataset, Input, Metrics, Model, Output, component, ClassificationMetrics

from collections import namedtuple
from typing import NamedTuple


In [5]:
PROJECT_ID = 'magnetic-guild-473821-t7'   # replace with your own GCP project ID
REGION = 'us-central1'                 # region where Vertex AI runs
EXPERIMENT = 'vertex-pipelines'        # experiment name (for ML Metadata tracking)
SERIES = 'dev'                         # series label, can be used for grouping runs

# GCS bucket: the name and the URI are derived from a single definition so they
# cannot drift apart (the pipeline also uses "<project>-bucket" as the bucket name).
GCS_BUCKET = f"{PROJECT_ID}-bucket"    # bucket name, without the gs:// prefix
BUCKET_URI = f"gs://{GCS_BUCKET}"      # GCS bucket path to store pipeline artifacts


In [6]:
# Initialise the Vertex AI SDK once for the notebook session. REGION is passed
# explicitly so that editing the config cell actually changes where jobs run.
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)


In [7]:
from google.cloud import bigquery


In [8]:
from kfp import dsl
from kfp.dsl import Output, Dataset


@dsl.component(
    base_image="python:3.8",
    # pandas / scikit-learn pins match the train_model and eval_model components
    # so every step of the pipeline handles the CSVs with the same library versions.
    packages_to_install=[
        "pandas==1.3.5",
        "scikit-learn==1.0.2",
        "google-cloud-bigquery==3.13.0",
        "db-dtypes==1.1.1",
    ],
)
def get_data(
    project_id: str,
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset],
    test_size: float = 0.3,
    random_state: int = 42,
) -> None:
    """
    Loads data from BigQuery, splits it into training and test sets,
    and saves them as CSV files.

    Args:
        project_id: str, Google Cloud project ID used for the BigQuery client
        dataset_train: Output[Dataset] for the training set (written as CSV)
        dataset_test: Output[Dataset] for the test set (written as CSV)
        test_size: float, fraction of rows assigned to the test set
        random_state: int, seed passed to train_test_split
    """

    # Imports inside component so they're installed in the container
    from sklearn.model_selection import train_test_split
    import pandas as pd
    from google.cloud import bigquery

    # Construct a BigQuery client object
    client = bigquery.Client(project=project_id)

    # BigQuery SQL query: per-visitor features from first visits, joined with a
    # label telling whether the visitor bought on a later (non-first) visit.
    # NOTE(review): LIMIT without ORDER BY does not guarantee which rows are
    # returned, so the seed alone may not make the split reproducible - confirm.
    query = """
    SELECT
      * EXCEPT(fullVisitorId)
    FROM
    (
      SELECT
        fullVisitorId,
        IFNULL(totals.bounces, 0) AS bounces,
        IFNULL(totals.timeOnSite, 0) AS time_on_site
      FROM `data-to-insights.ecommerce.web_analytics`
      WHERE totals.newVisits = 1
        AND date BETWEEN '20160801' AND '20170430'  # train on first 9 months
    )
    JOIN
    (
      SELECT
        fullVisitorId,
        IF(COUNTIF(totals.transactions > 0 AND totals.newVisits IS NULL) > 0, 1, 0) 
          AS will_buy_on_return_visit
      FROM `data-to-insights.ecommerce.web_analytics`
      GROUP BY fullVisitorId
    )
    USING (fullVisitorId)
    LIMIT 10000
    """

    # Run query (default job configuration) and load results into a DataFrame
    df = client.query(query).to_dataframe()

    # Split dataset into train/test sets
    train, test = train_test_split(
        df, test_size=test_size, random_state=random_state
    )

    # Save to pipeline outputs
    train.to_csv(dataset_train.path, index=False)
    test.to_csv(dataset_test.path, index=False)

    
    


In [9]:
!pip install xgboost==1.6.2




In [10]:
import os
import joblib
import pandas as pd
from xgboost import XGBClassifier
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model

@dsl.component(
    base_image="python:3.8",
    packages_to_install=[
        "xgboost==1.6.2",
        "pandas==1.3.5",
        "joblib==1.1.0",
        "scikit-learn==1.0.2",
    ],
)
def train_model(
    dataset: Input[Dataset],
    model_artifact: Output[Model],
) -> None:
    """
    Fits an XGBoost binary classifier on the training CSV and exports it with joblib.

    Args:
        dataset: Input[Dataset]
            Training data in CSV form; must contain the label column
            'will_buy_on_return_visit'.
        model_artifact: Output[Model]
            Output artifact directory; the fitted model is written to it as
            'model.joblib', and 'train_score' / 'framework' are recorded in
            its metadata.

    Returns:
        None
        The component's result is the model artifact, not a return value.
    """
    # Imports live inside the component so they resolve in the container image
    import os
    import joblib
    import pandas as pd
    from xgboost import XGBClassifier

    target_column = "will_buy_on_return_visit"

    # Read the training set and separate features from the label
    train_df = pd.read_csv(dataset.path)
    features = train_df.drop(columns=[target_column])
    labels = train_df[target_column]

    # Fit the classifier
    classifier = XGBClassifier(objective="binary:logistic")
    classifier.fit(features, labels)

    # Score on the same rows used for fitting (this is a training score)
    train_accuracy = classifier.score(features, labels)

    # Persist the model inside the artifact directory
    os.makedirs(model_artifact.path, exist_ok=True)
    model_file = os.path.join(model_artifact.path, "model.joblib")
    joblib.dump(classifier, model_file)

    # Attach descriptive metadata to the artifact
    model_artifact.metadata.update(
        {
            "train_score": float(train_accuracy),
            "framework": "XGBoost",
        }
    )


  import pkg_resources


In [17]:
from typing import NamedTuple

from kfp import dsl
from kfp.dsl import (
    Input,
    Output,
    Dataset,
    Model,
    Metrics,
    ClassificationMetrics,
)

@dsl.component(
    base_image="python:3.8",
    packages_to_install=[
        "xgboost==1.6.2",
        "pandas==1.3.5",
        "joblib==1.1.0",
        "scikit-learn==1.0.2",
        "google-cloud-storage==2.13.0",
    ],
)
def eval_model(
    test_set: Input[Dataset],
    xgb_model: Input[Model],
    metrics: Output[ClassificationMetrics],
    smetrics: Output[Metrics],
    bucket_name: str,
    score_threshold: float = 0.8,
) -> NamedTuple("Outputs", [("deploy", str)]):
    """
    Evaluates an XGBoost model on a test dataset, logs metrics, and decides whether to deploy.

    Args:
        test_set: Input[Dataset] - CSV with a target column 'will_buy_on_return_visit'.
        xgb_model: Input[Model] - Trained model artifact; 'model.joblib' is read from
            the directory its URI points at.
        metrics: Output[ClassificationMetrics] - For ROC curve and confusion matrix.
        smetrics: Output[Metrics] - For scalar metrics like accuracy.
        bucket_name: str - GCS bucket name (no gs:// prefix). Used only when the model
            URI is not a gs:// URI; otherwise the bucket is taken from the URI itself.
        score_threshold: float - minimum accuracy required to deploy.

    Returns:
        NamedTuple("Outputs", [("deploy", str)]) - "true" or "false".
    """
    # --- Imports inside component image ---
    from google.cloud import storage
    import joblib
    import pandas as pd
    from sklearn.metrics import roc_curve, confusion_matrix

    target_column = "will_buy_on_return_visit"

    # ---------- 1) Load test data ----------
    data = pd.read_csv(test_set.path)
    X = data.drop(columns=[target_column])
    y = data[target_column]

    # ---------- 2) Load trained model from GCS ----------
    # xgb_model.uri is like: gs://<bucket>/<path-to-artifact-dir>
    # Take the bucket from the URI instead of assuming it equals bucket_name, so
    # a pipeline root in another bucket does not yield an invalid object path.
    gs_uri = xgb_model.uri
    scheme = "gs://"
    if gs_uri.startswith(scheme):
        model_bucket, _, blob_dir = gs_uri[len(scheme):].partition("/")
    else:
        # Not a gs:// URI: treat it as an object path inside bucket_name
        model_bucket, blob_dir = bucket_name, gs_uri
    # Drop trailing slashes so the join below never produces "//"
    blob_dir = blob_dir.rstrip("/")
    # Recorded for debugging; note this stores a string in the scalar-metrics artifact
    smetrics.log_metric("blob_path", str(blob_dir))

    client = storage.Client()
    bucket = client.bucket(model_bucket)

    # Model file name created by training step
    model_object = f"{blob_dir}/model.joblib" if blob_dir else "model.joblib"
    model_blob = bucket.blob(model_object)
    # NOTE: joblib.load unpickles the file; only load artifacts produced by this pipeline
    with model_blob.open("rb") as f:
        model = joblib.load(f)

    # ---------- 3) Evaluate ----------
    # Probabilities for positive class
    y_scores = model.predict_proba(X)[:, 1]
    # Class predictions
    y_pred = model.predict(X)
    # Simple accuracy (demo)
    score = model.score(X, y)

    # ---------- 4) Log metrics to KFP UI ----------
    fpr, tpr, thresholds = roc_curve(y.to_numpy(), y_scores, pos_label=1)
    metrics.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())

    # Confusion matrix: ensure label order matches categories
    labels = [0, 1]
    cm = confusion_matrix(y, y_pred, labels=labels)
    metrics.log_confusion_matrix(["False", "True"], cm.tolist())

    smetrics.log_metric("accuracy", float(score))

    # ---------- 5) Deployment decision ----------
    deploy = "true" if score >= score_threshold else "false"

    # ---------- 6) Write back artifact metadata ----------
    # NOTE(review): xgb_model is an Input artifact; metadata set here may not be
    # persisted by the pipeline backend - confirm before relying on it downstream.
    xgb_model.metadata["test_score"] = float(score)

    # ---------- 7) Return flag ----------
    Outputs = NamedTuple("Outputs", [("deploy", str)])
    return Outputs(deploy)


In [18]:
from kfp import dsl
from kfp.dsl import Input, Output, Model, Artifact

@dsl.component(
    base_image="python:3.8",
    packages_to_install=["google-cloud-aiplatform==1.25.0"],
)
def deploy_xgboost_model(
    model: Input[Model],
    project_id: str,
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model],
    region: str = "us-central1",
    machine_type: str = "n1-standard-4",
) -> None:
    """
    Deploys an XGBoost model to Vertex AI Endpoint.

    Args:
        model: Input[Model]
            The trained model to deploy; its URI is used as the artifact directory.
        project_id: str
            The Google Cloud project ID.
        vertex_endpoint: Output[Artifact]
            Represents the deployed Vertex AI Endpoint (URI set to its resource name).
        vertex_model: Output[Model]
            Represents the deployed Vertex AI Model (URI set to its resource name).
        region: str
            Vertex AI location for the model and endpoint.
        machine_type: str
            Machine type used for the serving nodes.

    Returns:
        None
    """
    from google.cloud import aiplatform

    # --- Initialize Vertex AI with the project and an explicit region ---
    aiplatform.init(project=project_id, location=region)

    # --- Upload the trained model to Vertex AI ---
    # The serving image tag (xgboost-cpu.1-6) lines up with the xgboost==1.6.2
    # pin used by the training component.
    deployed_model = aiplatform.Model.upload(
        display_name="xgb-classification",
        artifact_uri=model.uri,
        serving_container_image_uri=(
            "us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-6:latest"
        ),
    )

    # --- Deploy model to an endpoint (with machine type) ---
    # NOTE(review): no existing endpoint is passed, so each run presumably creates
    # a new one - confirm and clean up unused endpoints to avoid ongoing cost.
    endpoint = deployed_model.deploy(machine_type=machine_type)

    # --- Save outputs so other pipeline steps can use them ---
    vertex_endpoint.uri = endpoint.resource_name
    vertex_model.uri = deployed_model.resource_name


In [19]:
from kfp import dsl
# Despite its name, BUCKET_NAME holds the full gs:// URI of the bucket.
BUCKET_NAME = f"gs://{PROJECT_ID}-bucket"
# Root folder under which pipeline runs store their artifacts.
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"

In [20]:
@dsl.pipeline(
    name="xgboost-pipeline-with-deployment-v2",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline():
    """
    End-to-end XGBoost pipeline:
    1. Fetch the dataset and split it into train/test
    2. Train the model on the training split
    3. Evaluate the model on the test split
    4. Deploy the model only when evaluation returns deploy == "true"
    """

    # Step 1: data extraction and split
    data_task = get_data(project_id=PROJECT_ID)

    # Step 2: training; keep a handle on the model artifact for later steps
    train_task = train_model(dataset=data_task.outputs["dataset_train"])
    trained_model = train_task.outputs["model_artifact"]

    # Step 3: evaluation against the held-out split
    eval_task = eval_model(
        test_set=data_task.outputs["dataset_test"],
        xgb_model=trained_model,
        bucket_name=f"{PROJECT_ID}-bucket",  # bucket name without the gs:// prefix
    )

    # Step 4: deployment is gated on the evaluation flag
    should_deploy = eval_task.outputs["deploy"] == "true"
    with dsl.If(should_deploy, name="deploy"):
        deploy_xgboost_model(
            model=trained_model,
            project_id=PROJECT_ID,
        )


In [21]:
from kfp import compiler

# Serialise the pipeline function to a local YAML spec
# (the same file name is used as template_path when the job is created).
compiler.Compiler().compile(
    package_path="pipeline.yaml",
    pipeline_func=pipeline,
)


In [22]:
from google.cloud import aiplatform

# Build the Vertex AI pipeline job from the compiled spec
job = aiplatform.PipelineJob(
    pipeline_root=PIPELINE_ROOT,       # GCS location for the run's artifacts
    template_path="pipeline.yaml",     # YAML spec written by the compile step
    display_name="demo-pipeline",      # label shown in the Vertex AI UI
)

# Start the run
job.run()
