In [None]:
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Vertex AI Pipelines: Lightweight Python function-based components, and component I/O

<table align="left">
  <td style="text-align: center">
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/lightweight_functions_component_io_kfp.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Google Colaboratory logo"><br> Open in Colab
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fvertex-ai-samples%2Fmain%2Fnotebooks%2Fofficial%2Fpipelines%2Flightweight_functions_component_io_kfp.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-enterprise-logo-32px.png" alt="Google Cloud Colab Enterprise logo"><br> Open in Colab Enterprise
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/lightweight_functions_component_io_kfp.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo"><br> View on GitHub
    </a>
  </td>
  <td style="text-align: center">
<a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/main/notebooks/official/pipelines/lightweight_functions_component_io_kfp.ipynb" target='_blank'>
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo"><br> Open in Vertex AI Workbench
     </a>
   </td>
</table>
<br/><br/><br/>

## Overview

This notebooks shows how to use [the Kubeflow Pipelines (KFP) SDK](https://www.kubeflow.org/docs/components/pipelines/) to build [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines) that use lightweight Python function based components, as well as supporting component I/O using the KFP SDK.

Learn more about [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction).

### Objective

In this tutorial, you learn to use the KFP SDK to build lightweight Python function-based components, and then you learn to use Vertex AI Pipelines to execute the pipeline.

This tutorial uses the following Google Cloud ML services:

- Vertex AI Pipelines

The steps performed include:

- Build Python function-based KFP components.
- Construct a KFP pipeline.
- Pass *Artifacts* and *parameters* between components, both by path reference and by value.
- Use the kfp.dsl.importer method.
- Compile the KFP pipeline.
- Execute the KFP pipeline using Vertex AI Pipelines

### KFP Python function-based components

A Kubeflow pipeline component is a self-contained set of code that performs one step in your ML workflow. A pipeline component is composed of:

* The component code, which implements the logic need to perform a step in your ML workflow.
* A component specification, which defines the following:
  * The component’s metadata, its name and description.
  * The component’s interface, the component’s inputs and outputs.
* The component’s implementation, the Docker container image to run, how to pass inputs to your component code, and how to get the component’s outputs.

Lightweight Python function-based components make it easier to iterate quickly by letting you build your component code as a Python function and generating the component specification for you. This notebook shows how to create Python function-based components for use in [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines).

Python function-based components use the Kubeflow Pipelines SDK to handle the complexity of passing inputs into your component and passing your function’s outputs back to your pipeline.

There are two categories of inputs/outputs supported in Python function-based components: *artifacts* and *parameters*.

* Parameters are passed to your component by value and typically contain `int`, `float`, `bool`, or small `string` values.
* Artifacts are passed to your component as a *reference* to a path, to which you can write a file or a subdirectory structure. In addition to the artifact’s data, you can also read and write the artifact’s metadata. This lets you record arbitrary key-value pairs for an artifact such as the accuracy of a trained model, and use metadata in downstream components – for example, you could use metadata to decide if a model is accurate enough to deploy for predictions.

### Costs

This tutorial uses billable components of Google Cloud:

* Vertex AI
* Cloud Storage

Learn about [Vertex AI pricing](https://cloud.google.com/vertex-ai/pricing) and
[Cloud Storage pricing](https://cloud.google.com/storage/pricing) and use the
[Pricing Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

## Get Started

Install Vertex AI SDK for Python and other required packages

In [None]:
! pip3 install --upgrade --quiet google-cloud-aiplatform \
                                 google-cloud-storage \
                                 kfp \
                                 "numpy<2" \
                                 google-cloud-pipeline-components

### Restart runtime (Colab only)
Authenticate your environment on Google Colab.

In [None]:
import sys

if "google.colab" in sys.modules:

    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

### Authenticate your notebook environment (Colab only)
Authenticate your environment on Google Colab.

In [None]:
# import sys

# if "google.colab" in sys.modules:

#     from google.colab import auth

#     auth.authenticate_user()

### Set Google Cloud project information
Learn more about setting up a project and a development environment.

In [2]:
PROJECT_ID = "involuted-tuner-441406-a9"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"} # @param {type:"string"}

### Create a Cloud Storage bucket

Create a storage bucket to store intermediate artifacts such as datasets.

- *{Note to notebook author: For any user-provided strings that need to be unique (like bucket names or model ID's), append "-unique" to the end so proper testing can occur}*

In [3]:
BUCKET_URI = f"gs://mlops-01-pipeline/pipeline_root_1"  # @param {type:"string"}

**If your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [3]:
! gsutil mb -l {LOCATION} -p {PROJECT_ID} {BUCKET_URI}

CommandException: "mb" command does not support "file://" URLs. Did you mean to use a gs:// URL?


#### Service Account

**If you don't know your service account**, try to get your service account using `gcloud` command by executing the second cell below.

In [None]:
# SERVICE_ACCOUNT = "[your-service-account]"  # @param {type:"string"}

In [None]:
# # import sys

# IS_COLAB = "google.colab" in sys.modules
# if (
#     SERVICE_ACCOUNT == ""
#     or SERVICE_ACCOUNT is None
#     or SERVICE_ACCOUNT == "[your-service-account]"
# ):
#     # Get your service account from gcloud
#     if not IS_COLAB:
#         shell_output = !gcloud auth list 2>/dev/null
#         SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

#     if IS_COLAB:
#         shell_output = ! gcloud projects describe  $PROJECT_ID
#         project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
#         SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

#     print("Service Account:", SERVICE_ACCOUNT)

#### Set service account access for Vertex AI Pipelines

Run the following commands to grant your service account access to read and write pipeline artifacts in the bucket that you created in the previous step -- you only need to run these once per service account.

In [1]:
# ! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

# ! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

### Set up variables

Next, set up some variables used throughout the tutorial.
### Import libraries and define constants

In [4]:
from typing import NamedTuple
import kfp
from google.cloud import aiplatform
from kfp import compiler, dsl
from kfp.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                     OutputPath, component)

#### Vertex AI Pipelines constants

Set up up the following constants for Vertex AI Pipelines:

In [5]:
PIPELINE_ROOT = "{}/pipeline_root/shakespeare".format(BUCKET_URI)

## Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [6]:
aiplatform.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

### Define Python function-based pipeline components

In this tutorial, you define function-based components that consume parameters and produce (typed) Artifacts and parameters. Functions can produce Artifacts in three ways:

* Accept an output local path using `OutputPath`
* Accept an `OutputArtifact` which gives the function a handle to the output artifact's metadata
* Return an `Artifact` (or `Dataset`, `Model`, `Metrics`, etc) in a `NamedTuple`

These options for producing Artifacts are demonstrated.

#### Define preprocess component

The first component definition, `preprocess`, shows a component that outputs two `Dataset` Artifacts, as well as an output parameter.  (For this example, the datasets don't reflect real data).

For the parameter output, you would typically use the approach shown here, using the `OutputPath` type, for "larger" data.
For "small data", like a short string, it might be more convenient to use the `NamedTuple` function output as shown in the second component instead.

In [8]:
# @component(base_image="python:3.9")
# def preprocess(
#     # An input parameter of type string.
#     message: str,
#     # Use Output to get a metadata-rich handle to the output artifact
#     # of type `Dataset`.
#     output_dataset_one: Output[Dataset],
#     # A locally accessible filepath for another output artifact of type
#     # `Dataset`.
#     output_dataset_two_path: OutputPath("Dataset"),
#     # A locally accessible filepath for an output parameter of type string.
#     output_parameter_path: OutputPath(str),
# ):
#     """'Mock' preprocessing step.
#     Writes out the passed in message to the output "Dataset"s and the output message.
#     """
#     output_dataset_one.metadata["hello"] = "there"
#     # Use OutputArtifact.path to access a local file path for writing.
#     # One can also use OutputArtifact.uri to access the actual URI file path.
#     with open(output_dataset_one.path, "w") as f:
#         f.write(message)

#     # OutputPath is used to just pass the local file path of the output artifact
#     # to the function.
#     with open(output_dataset_two_path, "w") as f:
#         f.write(message)

#     with open(output_parameter_path, "w") as f:
#         f.write(message)

#### Define train component

The second component definition, `train`, defines as input both an `InputPath` of type `Dataset`, and an `InputArtifact` of type `Dataset` (as well as other parameter inputs). It uses the `NamedTuple` format for function output.  As shown, these outputs can be Artifacts as well as parameters.

Additionally, this component writes some metrics metadata to the `model` output Artifact.  This information is displayed in the Cloud Console user interface when the pipeline runs.

In [46]:
# @component(
#     base_image="python:3.9",
#     packages_to_install=[
#         "scikit-learn",
#         "pandas",
#         "numpy"
#     ]
# )
# def train(
#     imported_dataset: Input[Dataset],
# ) -> NamedTuple(
#     "Outputs",
#     [
#         ("output_message", str),
#         ("model_accuracy", float),
#     ],
# ):
#     """Training step using scikit-learn logistic regression.
#     Loads data from the imported dataset, trains a logistic regression model,
#     and returns the accuracy metrics.
    
#     Args:
#         imported_dataset: Input dataset for training
        
#     Returns:
#         NamedTuple containing output message and model accuracy
#     """
#     # Add error handling and logging
#     import logging
#     logging.basicConfig(level=logging.INFO)
#     logger = logging.getLogger(__name__)
    
#     try:
#         import pandas as pd
#         from sklearn.linear_model import LogisticRegression
#         from sklearn.model_selection import train_test_split
#         from sklearn.metrics import accuracy_score
        
#         logger.info("Successfully imported all required packages")
        
#         # Load dataset
#         logger.info(f"Loading dataset from: {imported_dataset.path}")
#         df = pd.read_csv(imported_dataset.path)
#         logger.info(f"Dataset shape: {df.shape}")
        
#         # Split features and target
#         X = df.iloc[:, :-1]
#         y = df.iloc[:, -1]
#         logger.info(f"Features shape: {X.shape}, Target shape: {y.shape}")
        
#         # Split the data
#         X_train, X_test, y_train, y_test = train_test_split(
#             X, y, test_size=0.2, random_state=42
#         )
#         logger.info(f"Training set size: {X_train.shape[0]}, Test set size: {X_test.shape[0]}")
        
#         # Train logistic regression model
#         lr_model = LogisticRegression(max_iter=1000)
#         logger.info("Training logistic regression model...")
#         lr_model.fit(X_train, y_train)
        
#         # Make predictions and calculate accuracy
#         y_pred = lr_model.predict(X_test)
#         accuracy = accuracy_score(y_test, y_pred)
#         logger.info(f"Model training completed. Accuracy: {accuracy:.4f}")
        
#         # Create output message
#         output_message = f"Model trained successfully with accuracy: {accuracy:.4f}"
        
#         return (output_message, float(accuracy))
        
#     except Exception as e:
#         logger.error(f"An error occurred: {str(e)}")
#         raise RuntimeError(f"Training failed: {str(e)}")

In [14]:
# @component(
#     base_image="python:3.9",
#     packages_to_install=[
#         "scikit-learn",
#         "pandas",
#         "numpy",
#         "google-cloud-storage"
#     ]
# )
# def train(
#     imported_dataset: Input[Dataset],
# ) -> NamedTuple(
#     "Outputs",
#     [
#         ("output_message", str),
#         ("model_accuracy", float),
#     ],
# ):
#     """Training step using scikit-learn logistic regression.
#     Loads data from the imported dataset, trains a logistic regression model,
#     saves it to GCS, and returns the accuracy metrics.
#     """
#     import logging
#     import os
#     from google.cloud import storage
#     import tempfile
    
#     logging.basicConfig(level=logging.INFO)
#     logger = logging.getLogger(__name__)
    
#     try:
#         import pandas as pd
#         import joblib
#         from sklearn.linear_model import LogisticRegression
#         from sklearn.model_selection import train_test_split
#         from sklearn.metrics import accuracy_score
        
#         logger.info("Successfully imported all required packages")
        
#         # Load dataset
#         logger.info(f"Loading dataset from: {imported_dataset.path}")
#         df = pd.read_csv(imported_dataset.path)
#         logger.info(f"Dataset shape: {df.shape}")
        
#         # Split features and target
#         X = df.iloc[:, :-1]
#         y = df.iloc[:, -1]
#         logger.info(f"Features shape: {X.shape}, Target shape: {y.shape}")
        
#         # Split the data
#         X_train, X_test, y_train, y_test = train_test_split(
#             X, y, test_size=0.2, random_state=42
#         )
#         logger.info(f"Training set size: {X_train.shape[0]}, Test set size: {X_test.shape[0]}")
        
#         # Train logistic regression model
#         lr_model = LogisticRegression(max_iter=1000)
#         logger.info("Training logistic regression model...")
#         lr_model.fit(X_train, y_train)
        
#         # Make predictions and calculate accuracy
#         y_pred = lr_model.predict(X_test)
#         accuracy = accuracy_score(y_test, y_pred)
#         logger.info(f"Model training completed. Accuracy: {accuracy:.4f}")
        
#         # Save model to GCS
#         bucket_name = "mlops-01-pipeline"  # Replace with your bucket name
#         model_filename = "model.joblib"
        
#         # Create a temporary directory
#         with tempfile.TemporaryDirectory() as temp_dir:
#             temp_model_path = os.path.join(temp_dir, model_filename)
            
#             # Save model to temporary file
#             logger.info(f"Saving model to temporary file: {temp_model_path}")
#             joblib.dump(lr_model, temp_model_path)
            
#             # Initialize GCS client
#             storage_client = storage.Client()
#             bucket = storage_client.bucket(bucket_name)
            
#             # Upload to GCS
#             blob = bucket.blob(model_filename)
#             logger.info(f"Uploading model to GCS: gs://{bucket_name}/{model_filename}")
#             blob.upload_from_filename(temp_model_path)
#             logger.info("Model successfully uploaded to GCS")
        
#         # Create output message
#         output_message = (
#             f"Model trained successfully with accuracy: {accuracy:.4f}. "
#             f"Model saved to: gs://{bucket_name}/{model_filename}"
#         )
        
#         return (output_message, float(accuracy))
        
#     except Exception as e:
#         logger.error(f"An error occurred: {str(e)}")
#         raise RuntimeError(f"Training failed: {str(e)}")



In [49]:
# @dsl.component(
#     base_image="python:3.9",
#     packages_to_install=[
#         "scikit-learn",
#         "pandas",
#         "numpy",
#         "google-cloud-storage"
#     ]
# )
# def feature_engineering(imported_dataset: Dataset) -> Dataset:
#     """
#     Perform feature engineering on the imported dataset.
#     Args:
#         imported_dataset: The dataset to perform feature engineering on.

#     Returns:
#         A Dataset object with engineered features.
#     """
#     # Simulate feature engineering logic here (e.g., normalization, transformation, etc.)
#     # This is a placeholder for actual feature engineering code.
#     pass

In [7]:
@component(
    base_image="python:3.9",
    packages_to_install=[
        "scikit-learn",
        "pandas",
        "numpy",
        "google-cloud-storage"
    ]
)
def train(
    imported_dataset: Input[Dataset],
    # df: pandas.DataFrame
) -> NamedTuple(
    "Outputs",
    [
        ("output_message", str),
        ("model_accuracy", float),
    ],
):
    """Training step using scikit-learn logistic regression.
    Loads data from the imported dataset, trains a logistic regression model,
    saves it to GCS in the models folder, and returns the accuracy metrics.
    """
    import logging
    import os
    from google.cloud import storage
    import tempfile
    
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    try:
        import pandas as pd
        import joblib
        from sklearn.linear_model import LogisticRegression
        from sklearn.model_selection import train_test_split
        from sklearn.metrics import accuracy_score
        
        logger.info("Successfully imported all required packages")
        
        # Load dataset
        logger.info(f"Loading dataset from: {imported_dataset.path}")
        df = pd.read_csv(imported_dataset.path)
        logger.info(f"Dataset shape: {df.shape}")
        
        # Split features and target
        X = df.drop(columns=["Diabetic", "PatientID"])
        y = df["Diabetic"]
        logger.info(f"Features shape: {X.shape}, Target shape: {y.shape}")
        
        # Split the data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        logger.info(f"Training set size: {X_train.shape[0]}, Test set size: {X_test.shape[0]}")
        
        # Train logistic regression model
        lr_model = LogisticRegression(max_iter=1000)
        logger.info("Training logistic regression model...")
        lr_model.fit(X_train, y_train)
        
        # Make predictions and calculate accuracy
        y_pred = lr_model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        logger.info(f"Model training completed. Accuracy: {accuracy:.4f}")
        
        # Save model to GCS
        bucket_name = "mlops-01-pipeline"
        model_folder = "models"  # Folder in the bucket
        model_filename = "model.joblib"
        gcs_model_path = f"{model_folder}/{model_filename}"  # Path within bucket
        
        # Create a temporary directory
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_model_path = os.path.join(temp_dir, model_filename)
            
            # Save model to temporary file
            logger.info(f"Saving model to temporary file: {temp_model_path}")
            joblib.dump(lr_model, temp_model_path)
            
            # Initialize GCS client
            storage_client = storage.Client()
            bucket = storage_client.bucket(bucket_name)
            
            # Upload to GCS in models folder
            blob = bucket.blob(gcs_model_path)
            logger.info(f"Uploading model to GCS: gs://{bucket_name}/{gcs_model_path}")
            blob.upload_from_filename(temp_model_path)
            logger.info("Model successfully uploaded to GCS")
        
        # Create output message
        output_message = (
            f"Model trained successfully with accuracy: {accuracy:.4f}. "
            f"Model saved to: gs://{bucket_name}/{gcs_model_path}"
        )
        
        return (output_message, float(accuracy))
        
    except Exception as e:
        logger.error(f"An error occurred: {str(e)}")
        raise RuntimeError(f"Training failed: {str(e)}")

In [None]:
@component(
    base_image="python:3.9",
    packages_to_install=[
        "scikit-learn",
        "pandas",
        "numpy",
        "google-cloud-storage"
    ]
)
def train(
    imported_dataset: Input[Dataset],
) -> NamedTuple(
    "Outputs",
    [
        ("output_message", str),
        ("model_accuracy", float),
    ],
):
    """Training step using scikit-learn logistic regression.
    Splits data into train/test sets, trains on training data,
    saves model and test indices to GCS.
    """
    import logging
    import os
    from google.cloud import storage
    import tempfile
    import numpy as np
    import json
    
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    try:
        import pandas as pd
        import joblib
        from sklearn.linear_model import LogisticRegression
        from sklearn.model_selection import train_test_split
        from sklearn.metrics import accuracy_score
        
        logger.info("Successfully imported all required packages")
        
        # Load dataset
        logger.info(f"Loading dataset from: {imported_dataset.path}")
        df = pd.read_csv(imported_dataset.path)
        logger.info(f"Dataset shape: {df.shape}")
        
        # Split features and target
        X = df.drop(columns=["Diabetic", "PatientID"])
        y = df["Diabetic"]
        
        # Split the data and save test indices
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        # Get test indices for later use
        test_indices = X_test.index.tolist()
        
        logger.info(f"Training set size: {X_train.shape[0]}, Test set size: {X_test.shape[0]}")
        
        # Train logistic regression model
        lr_model = LogisticRegression(max_iter=1000)
        logger.info("Training logistic regression model...")
        lr_model.fit(X_train, y_train)
        
        # Make predictions and calculate accuracy on training data
        y_pred = lr_model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        logger.info(f"Model training completed. Validation accuracy: {accuracy:.4f}")
        
        # Save model and test indices to GCS
        bucket_name = "mlops-01-pipeline"
        model_folder = "models"
        model_filename = "model.joblib"
        indices_filename = "test_indices.json"
        
        gcs_model_path = f"{model_folder}/{model_filename}"
        gcs_indices_path = f"{model_folder}/{indices_filename}"
        
        # Create a temporary directory
        with tempfile.TemporaryDirectory() as temp_dir:
            # Save model
            temp_model_path = os.path.join(temp_dir, model_filename)
            logger.info(f"Saving model to temporary file: {temp_model_path}")
            joblib.dump(lr_model, temp_model_path)
            
            # Save test indices
            temp_indices_path = os.path.join(temp_dir, indices_filename)
            with open(temp_indices_path, 'w') as f:
                json.dump({'test_indices': test_indices}, f)
            
            # Initialize GCS client
            storage_client = storage.Client()
            bucket = storage_client.bucket(bucket_name)
            
            # Upload model to GCS
            model_blob = bucket.blob(gcs_model_path)
            logger.info(f"Uploading model to GCS: gs://{bucket_name}/{gcs_model_path}")
            model_blob.upload_from_filename(temp_model_path)
            
            # Upload test indices to GCS
            indices_blob = bucket.blob(gcs_indices_path)
            logger.info(f"Uploading test indices to GCS: gs://{bucket_name}/{gcs_indices_path}")
            indices_blob.upload_from_filename(temp_indices_path)
            
            logger.info("Model and test indices successfully uploaded to GCS")
        
        output_message = (
            f"Model trained successfully with validation accuracy: {accuracy:.4f}. "
            f"Model saved to: gs://{bucket_name}/{gcs_model_path}"
        )
        
        return (output_message, float(accuracy))
        
    except Exception as e:
        logger.error(f"An error occurred: {str(e)}")
        raise RuntimeError(f"Training failed: {str(e)}")

In [8]:
@component(
    base_image="python:3.9",
    packages_to_install=[
        "scikit-learn",
        "pandas", 
        "numpy",
        "google-cloud-storage"
    ]
)
def test(
    imported_dataset: Input[Dataset],
) -> NamedTuple(
    "Outputs",
    [
        ("metrics_gcs_path", str),
    ],
):
    """Test step for evaluating the trained model.
    Uses the same test split as training by loading saved test indices.
    """
    import logging
    import os
    import json
    from google.cloud import storage
    import tempfile
    from datetime import datetime
    
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    try:
        import pandas as pd
        import joblib
        from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
        
        logger.info("Successfully imported all required packages")
        
        # Constants
        bucket_name = "mlops-01-pipeline"
        model_folder = "models"
        model_filename = "model.joblib"
        metrics_folder="metrics"
        indices_filename = "test_indices.json"
        
        gcs_model_path = f"{model_folder}/{model_filename}"
        gcs_indices_path = f"{model_folder}/{indices_filename}"
        
        # Load full dataset
        logger.info(f"Loading dataset from: {imported_dataset.path}")
        df = pd.read_csv(imported_dataset.path)
        
        # Initialize GCS client
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        
        # Create a temporary directory
        with tempfile.TemporaryDirectory() as temp_dir:
            # Download model and test indices from GCS
            temp_model_path = os.path.join(temp_dir, model_filename)
            temp_indices_path = os.path.join(temp_dir, indices_filename)
            
            # Download model
            model_blob = bucket.blob(gcs_model_path)
            logger.info(f"Downloading model from GCS: gs://{bucket_name}/{gcs_model_path}")
            model_blob.download_to_filename(temp_model_path)
            
            # Download test indices
            indices_blob = bucket.blob(gcs_indices_path)
            logger.info(f"Downloading test indices from GCS: gs://{bucket_name}/{gcs_indices_path}")
            indices_blob.download_to_filename(temp_indices_path)
            
            # Load model and test indices
            model = joblib.load(temp_model_path)
            with open(temp_indices_path, 'r') as f:
                test_indices = json.load(f)['test_indices']
            
            # Prepare test features and target using saved indices
            test_df = df.iloc[test_indices]
            X_test = test_df.drop(columns=["Diabetic", "PatientID"])
            y_test = test_df["Diabetic"]
            
            logger.info(f"Test set size: {len(test_indices)}")
            
            # Make predictions
            logger.info("Making predictions on test data")
            y_pred = model.predict(X_test)
            
            # Calculate metrics
            metrics = {
                "accuracy": str(accuracy_score(y_test, y_pred)),
                "precision": str(precision_score(y_test, y_pred)),
                "recall": str(recall_score(y_test, y_pred)),
                "f1_score": str(f1_score(y_test, y_pred)),
                "timestamp": str(datetime.now().strftime("%Y-%m-%d_%H-%M-%S"))
            }
            
            logger.info(f"Evaluation metrics: {metrics}")
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            
            # Create metrics filename using same base name as model
            # metrics_filename = model_filename.replace('.joblib', f'_metrics_{timestamp}.json')

            # Define the path where you want to store the metrics (e.g., in the 'metrics' folder)
            # metrics_folder = "metrics"
            # os.makedirs(metrics_folder, exist_ok=True)  # Create the 'metrics' folder if it doesn't exist

            # GCS Path where you will store the metrics
            # metrics_gcs_path = f"{metrics_folder}/{metrics_filename}"
            
            metrics_filename = model_filename.replace('.joblib', f'{timestamp}_metrics.json')
            metrics_gcs_path = f"{metrics_folder}/{metrics_filename}"
            
            # Save metrics to temporary JSON file
            temp_metrics_path = os.path.join(temp_dir, metrics_filename)
            with open(temp_metrics_path, 'w') as f:
                json.dump(metrics, f, indent=4)
            
            # Upload metrics to GCS
            metrics_blob = bucket.blob(metrics_gcs_path)
            logger.info(f"Uploading metrics to GCS: gs://{bucket_name}/{metrics_gcs_path}")
            metrics_blob.upload_from_filename(temp_metrics_path)
            logger.info("Metrics successfully uploaded to GCS")
        
        return (f"gs://{bucket_name}/{metrics_gcs_path}",)
        
    except Exception as e:
        logger.error(f"An error occurred: {str(e)}")
        raise RuntimeError(f"Testing failed: {str(e)}")

In [9]:
@component(
    base_image="python:3.9",
    packages_to_install=[
        "google-cloud-aiplatform",
    ]
)
def deploy_model(
    project: str = "your-project-id",  # Replace with your project ID
    location: str = "us-central1",     # Replace with your desired location
    description: str= "metrics path",
    bucket_name: str = "mlops-01-pipeline",
    model_filename: str = "model.joblib",
) -> NamedTuple(
    "Outputs",
    [
        ("endpoint_name", str),
        ("model_name", str),
    ],
):
    """
    Uploads a trained model to Vertex AI and deploys it to an endpoint.
    
    Args:
        project: GCP project ID
        location: GCP region
        bucket_name: GCS bucket name where model is stored
        model_filename: Name of the model file in GCS
    
    Returns:
        NamedTuple containing endpoint and model names
    """
    import logging
    from google.cloud import aiplatform
    
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    try:
        # Initialize Vertex AI
        aiplatform.init(
            project=project,
            location=location,
        )
        
        # Construct GCS URI for the model
        gcs_model_uri = f"gs://{bucket_name}/models"
        logger.info(f"Model URI: {gcs_model_uri}")
        
        # Generate unique names for model and endpoint
        import datetime
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        model_display_name = f"diabetes_prediction_{timestamp}"
        endpoint_display_name = f"diabetes_endpoint_{timestamp}"
        
        logger.info(f"Uploading model: {model_display_name}")
        # Upload model to Vertex AI
        model = aiplatform.Model.upload(
            display_name=model_display_name,
            artifact_uri=gcs_model_uri,
            serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-5:latest",
            description=description
        )
        logger.info(f"Model uploaded successfully: {model.resource_name}")
        
        logger.info(f"Deploying model to endpoint: {endpoint_display_name}")
        # Deploy model to endpoint
        endpoint = model.deploy(
            deployed_model_display_name=endpoint_display_name,
            machine_type="n1-standard-2",
        )
        logger.info(f"Model deployed successfully to endpoint: {endpoint.resource_name}")
        
        return (endpoint.resource_name, model.resource_name)
        
    except Exception as e:
        logger.error(f"An error occurred during model deployment: {str(e)}")
        raise RuntimeError(f"Model deployment failed: {str(e)}")

### Define a pipeline that uses your components and the Importer

Next, define a pipeline that uses the components that were built in the previous sections, and also shows the use of the `kfp.dsl.importer`.

This example uses the `importer` to create, in this case, a `Dataset` artifact from an existing URI.

Note that the `train_task` step takes as inputs three of the outputs of the `preprocess_task` step, as well as the output of the `importer` step.
In the "train" inputs we refer to the `preprocess`  `output_parameter`, which gives us the output string directly.

The `read_task` step takes as input the `train_task` `generic_artifact` output.

In [11]:
@dsl.pipeline(
    pipeline_root=PIPELINE_ROOT,
    name="metadata-pipeline-v2",
)
def pipeline(message: str):
    importer = kfp.dsl.importer(
        artifact_uri="gs://mlops-01-pipeline/Dataset_diabetes-dev.csv",
        artifact_class=Dataset,
        reimport=False,
    )
    # preprocess_task = preprocess(message=message)
    train_task = train(
        imported_dataset=importer.output)
    
    test_task=test(imported_dataset=importer.output).after(train_task)
    # Deploy model
    deploy_task = deploy_model(
        project="involuted-tuner-441406-a9",  # Replace with your project ID
        location="us-central1",# Replace with your desired location
        description=test_task.outputs["metrics_gcs_path"]
    ).after(test_task)  # Ensure deployment happens after training

    

In [None]:
# @dsl.pipeline(
#     # Default pipeline root. You can override it when submitting the pipeline.
#     pipeline_root=PIPELINE_ROOT,
#     # A name for the pipeline. Use to determine the pipeline Context.
#     name="metadata-pipeline-v2",
# )
# def pipeline(message: str):
#     importer = dsl.importer(
#         artifact_uri="gs://mlops-01-pipeline/Dataset_diabetes-dev.csv",
#         artifact_class=Dataset,
#         reimport=False,
#     )
    
#     # Feature engineering task
#     feature_engineering_task = feature_engineering(imported_dataset=importer.output)
    
#     # Training task
#     train_task = train(
#         imported_dataset=feature_engineering_task.output,
#     )
    
#     # Deploy model task
#     deploy_task = deploy_model(
#         project="involuted-tuner-441406-a9",  # Replace with your project ID
#         location="us-central1",  # Replace with your desired location
#     ).after(train_task)  # Ensure deployment happens after training

## Compile the pipeline

Next, compile the pipeline.

In [12]:
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="lightweight_pipeline.yaml"
)

## Run the pipeline

Next, run the pipeline.

In [13]:
DISPLAY_NAME = "shakespeare"

job = aiplatform.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="lightweight_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"message": "Hello, World"},
    enable_caching=False,
)

job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/260483181843/locations/us-central1/pipelineJobs/metadata-pipeline-v2-20250128092447
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/260483181843/locations/us-central1/pipelineJobs/metadata-pipeline-v2-20250128092447')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/metadata-pipeline-v2-20250128092447?project=260483181843
PipelineJob projects/260483181843/locations/us-central1/pipelineJobs/metadata-pipeline-v2-20250128092447 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/260483181843/locations/us-central1/pipelineJobs/metadata-pipeline-v2-20250128092447 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/260483181843/locations/us-central1/pipelineJobs/metadata-pipeline-v2-20250128092447 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/260483181843/locatio

Click on the generated link to see your run in the Cloud Console.

<!-- It should look something like this as it's running:

<a href="https://storage.googleapis.com/amy-jo/images/mp/automl_tabular_classif.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/mp/automl_tabular_classif.png" width="40%"/></a> -->

In the Google Cloud console, many of the pipeline DAG nodes expand or collapse when you click on them. Here is a partially-expanded view of the DAG (click image to see larger version).

<a href="https://storage.googleapis.com/amy-jo/images/mp/artifact_io2.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/mp/artifact_io2.png" width="95%"/></a>

### Delete the pipeline job

You can delete the pipeline job with the method `delete()`.job.delete()

In [None]:
job.delete()

# Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial.

In [None]:
delete_bucket = False

if delete_bucket:
    ! gsutil rm -r $BUCKET_URI

! rm lightweight_pipeline.yaml