In [1]:
PROJECT_ID = "Your-Project-ID"

# Set the project id of your GCP Project
! gcloud config set project {PROJECT_ID}

Updated property [core/project].


In [2]:
# Region to run Vertex AI resources in, and the URI of your Cloud Storage bucket
REGION = "Your-Region"

# Plain literal: there is nothing to interpolate in the bucket URI
BUCKET_URI = "gs://Your-Bucket"

In [3]:
# Enter your GCP service account (the e-mail style identifier of the account)

SERVICE_ACCOUNT = "Your-Service-Account"

In [None]:
#Allow your service account to access your cloud bucket

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

In [5]:
#Import required libraries

import google.cloud.aiplatform as aiplatform
import kfp
from kfp import compiler, dsl
from kfp.dsl import Artifact, Dataset, Input, Metrics, Model, Output, component, InputPath,OutputPath
from typing import NamedTuple

In [6]:
# Initialise the Vertex AI SDK for this session.
# REGION is passed explicitly so that jobs run in the region entered above
# instead of silently falling back to the SDK's default location.

aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

In [7]:
#Define environment for vertex ai and set your pipeline root

PATH = %env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

PIPELINE_ROOT = f"{BUCKET_URI}/tf_pipeline"  # This is where all pipeline artifacts are sent. You'll need to ensure the bucket is created ahead of time
PIPELINE_ROOT
print(f"PIPELINE_ROOT: {PIPELINE_ROOT}")

env: PATH=/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin
PIPELINE_ROOT: gs://imperial-data-306906-test-bucket-unique/tf_pipeline


In [9]:
@component(base_image='python:3.7', packages_to_install=['numpy==1.21.0', 'google-cloud-storage==2.11.0', 'pandas==1.2.4', 'tensorflow==2.9.3'])
def get_data(message: str,
             x_train_path: OutputPath(Artifact),
             y_train_path: OutputPath(Artifact),
             x_test_path: OutputPath(Artifact),
             y_test_path: OutputPath(Artifact)) -> NamedTuple(
    "Outputs",
    [
        ("output_message", str),
        ("dataset_details", str)
    ]
):
    """
    Loads the MNIST dataset through TensorFlow and saves each split to its output artifact path.

    Args:
        message (str): A message for logging; printed to the component log.
        x_train_path (OutputPath): Path to save the training images.
        y_train_path (OutputPath): Path to save the training labels.
        x_test_path (OutputPath): Path to save the testing images.
        y_test_path (OutputPath): Path to save the testing labels.

    Returns:
        NamedTuple: ``output_message`` (a short status string) and
        ``dataset_details`` (the train/test image counts rendered as text).
    """
    # Imports live inside the function because the component body is what
    # gets executed inside the component's container.
    from collections import namedtuple

    import numpy as np
    import tensorflow as tf

    print(message)

    # Load MNIST dataset (already split into train and test by Keras)
    (x_train_split, y_train_split), (x_test_split, y_test_split) = tf.keras.datasets.mnist.load_data()

    # Save the split datasets to the specified paths.
    # NOTE: np.save appends ".npy" when the target name lacks that extension;
    # the downstream components in this notebook load f"{path}.npy" for that reason.
    np.save(x_train_path, x_train_split)
    np.save(x_test_path, x_test_split)
    np.save(y_train_path, y_train_split)
    np.save(y_test_path, y_test_split)

    # Calculate and store dataset details
    dataset_details_dict = {
        "Num Train Images": str(x_train_split.shape[0]),
        "Num Test Images": str(x_test_split.shape[0])
    }

    # Create output message and details
    output_message = "Split Data into Test and Train"
    output_message = f"Message: {output_message}"
    dataset_details = f"Split Details:\n{dataset_details_dict}"

    # The signature declares a NamedTuple result, so the values must actually
    # be returned; field names match the declared outputs.
    outputs = namedtuple("Outputs", ["output_message", "dataset_details"])
    return outputs(output_message, dataset_details)

In [10]:
# Component 2: Normalize Data
@component(base_image='python:3.7', packages_to_install=['numpy==1.21.0', 'google-cloud-storage==2.11.0', 'pandas==1.2.4'])
def normalize_data(message: str,
                 x_dataset_path: InputPath(Artifact),
                 x_dataset_reshaped_path: OutputPath(Artifact)
):
    """
    Normalizes and reshapes the input data.

    Args:
        message (str): A message for logging; printed to the component log.
        x_dataset_path (InputPath): Path to the original dataset.
        x_dataset_reshaped_path (OutputPath): Path to save the normalized and reshaped dataset.

    Returns:
        None
    """
    import numpy as np

    print(message)

    # The upstream component wrote the array with np.save, which appends
    # ".npy" to the artifact path, so the same suffix is added before loading.
    x = np.load(f"{x_dataset_path}.npy")

    # Reshape to (N, 28, 28, 1) and scale the values by 1/255
    x_reshaped = x.reshape(-1, 28, 28, 1) / 255

    # Save the reshaped data at the x_dataset_reshaped path
    # (np.save appends ".npy" again; consumers load f"{path}.npy")
    np.save(x_dataset_reshaped_path, x_reshaped)

In [11]:
# Component 3: Train Model
@component(base_image='python:3.7', packages_to_install=['numpy==1.21.0','google-cloud-storage==2.11.0','pandas==1.2.4','tensorflow==2.9.3','joblib==1.1.0'])
def train_model(x_train_path: InputPath(Artifact),
                y_train_path: InputPath(Artifact),
                x_test_path: InputPath(Artifact),
                y_test_path: InputPath(Artifact),
                message: str,
                trained_model: Output[Model],
                metrics: Output[Metrics],
                no_epochs:int = 5,
                optimizer: str = "adam"
):
    """
    Builds, trains, and evaluates a TensorFlow model, then exports it as a SavedModel.

    Args:
        x_train_path (InputPath): Path to the normalized and reshaped training images.
        y_train_path (InputPath): Path to the training labels.
        x_test_path (InputPath): Path to the normalized and reshaped testing images.
        y_test_path (InputPath): Path to the testing labels.
        message (str): A message for logging; printed to the component log.
        trained_model (Output[Model]): Output for the trained model artifact.
        metrics (Output[Metrics]): Output for tracking metrics ("Accuracy" in percent, "Loss").
        no_epochs (int): Number of training epochs.
        optimizer (str): Optimizer for model training.

    Returns:
        None
    """
    import os

    import numpy as np
    import tensorflow as tf

    print(message)

    # Define and configure the neural network model
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=(28,28,1)))
    model.add(tf.keras.layers.MaxPool2D(2, 2))

    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(64, activation='relu'))
    model.add(tf.keras.layers.Dense(32, activation='relu'))
    model.add(tf.keras.layers.Dense(10, activation='softmax'))  # Output has 10 classes, numbers from 0-9

    # Collect the model summary and write it to the component log
    summary_lines = []
    model.summary(print_fn=summary_lines.append)
    print("\n".join(summary_lines))

    # Compile the model with specified optimizer and loss function
    model.compile(optimizer=optimizer,
                  loss="sparse_categorical_crossentropy",
                  metrics=['accuracy'])

    # Load Training Data.
    # The upstream components saved with np.save, which appended ".npy" to
    # each artifact path, so the suffix is added before loading.
    x_train = np.load(f"{x_train_path}.npy")
    y_train = np.load(f"{y_train_path}.npy")

    # Train the model
    model.fit(
        x=x_train,
        y=y_train,
        epochs=no_epochs,
        batch_size=20
    )

    # Load Test Data
    x_test = np.load(f"{x_test_path}.npy")
    y_test = np.load(f"{y_test_path}.npy")

    # Test the model against the test dataset
    # Returns the loss and accuracy value for the model in test mode.
    model_loss, model_accuracy = model.evaluate(x=x_test, y=y_test)

    # Log accuracy metric (as a percentage)
    metrics.log_metric("Accuracy", (float(model_accuracy) * 100.0))

    # Log loss metric
    metrics.log_metric("Loss", float(model_loss))

    # Export the trained model as a SavedModel into the artifact directory
    model_dir = trained_model.path
    os.makedirs(model_dir, exist_ok=True)
    tf.saved_model.save(model, model_dir)


In [12]:
# Component 4: Deploy TensorFlow Model
@component(packages_to_install=["google-cloud-aiplatform==1.25.0"],)
def deploy_tensorflow_model(
    model: Input[Model],
    project_id: str,
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model]
):
    """
    Uploads a trained TensorFlow model to Vertex AI and deploys it to an endpoint.

    Args:
        model (Input[Model]): Input for the trained model; its ``uri`` is used as the artifact location.
        project_id (str): Google Cloud Project ID.
        vertex_endpoint (Output[Artifact]): Output whose ``uri`` is set to the endpoint resource name.
        vertex_model (Output[Model]): Output whose ``uri`` is set to the uploaded model resource name.

    Returns:
        None
    """

    from google.cloud import aiplatform

    aiplatform.init(project=project_id)

    # Upload the TensorFlow model to Vertex AI
    model_display_name = "tf-mnist-classifier-model"  # Provide a unique name for your model
    deployed_model = aiplatform.Model.upload(
        display_name=model_display_name,
        artifact_uri=model.uri,
        # The model in this notebook is trained with tensorflow==2.9.3, so the
        # TF 2.9 prebuilt serving image is used rather than an older runtime.
        # Switch to a GPU image here if GPU serving is required.
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-9:latest",
    )

    # Deploy the model to an endpoint
    endpoint = deployed_model.deploy(machine_type="n1-standard-4")

    # Update the Vertex AI endpoint and model outputs
    vertex_endpoint.uri = endpoint.resource_name
    vertex_model.uri = deployed_model.resource_name

In [13]:
# Kubeflow Pipeline "tf-pipeline": fetch data -> normalize both image sets -> train -> deploy
@dsl.pipeline(name="tf-pipeline")
def pipeline():
    """Wires the four components into the MNIST training and deployment pipeline."""
    # Fetch the raw MNIST splits
    raw_data = get_data(message='Getting MNIST Dataset From TensorFlow')

    # Normalize the training images
    train_images = normalize_data(
        message='Reshaping Images in Train Set',
        x_dataset_path=raw_data.outputs['x_train_path'],
    )

    # Normalize the test images
    test_images = normalize_data(
        message='Reshaping Images in Test Set',
        x_dataset_path=raw_data.outputs['x_test_path'],
    )

    # Train on the normalized images; labels come straight from the fetch step
    training = train_model(
        message='Training TensorFlow Model',
        x_train_path=train_images.outputs['x_dataset_reshaped_path'],
        y_train_path=raw_data.outputs['y_train_path'],
        x_test_path=test_images.outputs['x_dataset_reshaped_path'],
        y_test_path=raw_data.outputs['y_test_path'],
    )

    # Deploy the trained model to Vertex AI
    deploy_tensorflow_model(
        project_id=PROJECT_ID,
        model=training.outputs['trained_model'],
    )

In [14]:
# Compile the pipeline function into a YAML template on local disk
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path="tf-pipeline.yaml",
)

In [15]:
# Settings for the Vertex AI PipelineJob built from the compiled template
pipeline_job_settings = {
    "display_name": "tf-pipeline",        # Name shown for the pipeline job
    "template_path": "tf-pipeline.yaml",  # Compiled pipeline YAML file
    "pipeline_root": PIPELINE_ROOT,       # Root directory where pipeline artifacts will be stored
}
job = aiplatform.PipelineJob(**pipeline_job_settings)

# Submit the job and run the compiled pipeline
job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/138999697430/locations/us-central1/pipelineJobs/tf-pipeline-20231110212520
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/138999697430/locations/us-central1/pipelineJobs/tf-pipeline-20231110212520')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/tf-pipeline-20231110212520?project=138999697430
PipelineJob projects/138999697430/locations/us-central1/pipelineJobs/tf-pipeline-20231110212520 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/138999697430/locations/us-central1/pipelineJobs/tf-pipeline-20231110212520 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/138999697430/locations/us-central1/pipelineJobs/tf-pipeline-20231110212520 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/138999697430/locations/us-central1/pipelineJobs/tf-pipeline-20231110212520