In [13]:
! pip3 install --upgrade --quiet google-cloud-aiplatform \
                                 google-cloud-storage \
                                 # kfp \
                                 "numpy<2" \
                                 google-cloud-pipeline-components

In [14]:
PROJECT_ID = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_ID[0]

LOCATION = "us-central1"
LOCATION = "us-central1"  # @param {type:"string"}

In [15]:
BUCKET_URI = f"gs://{PROJECT_ID}-unique"  # @param {type:"string"}

In [16]:
! gsutil mb -l {LOCATION} -p {PROJECT_ID} {BUCKET_URI}

Creating gs://my-project-0004-346516-unique/...
ServiceException: 409 A Cloud Storage bucket named 'my-project-0004-346516-unique' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.


In [17]:
SERVICE_ACCOUNT = "[your-service-account]"  # @param {type:"string"}

In [18]:
import sys

IS_COLAB = "google.colab" in sys.modules
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Get your service account from gcloud
    if not IS_COLAB:
        shell_output = !gcloud auth list 2>/dev/null
        SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

    if IS_COLAB:
        shell_output = ! gcloud projects describe  $PROJECT_ID
        project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
        SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

    print("Service Account:", SERVICE_ACCOUNT)

Service Account: 255766800726-compute@developer.gserviceaccount.com


In [19]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

No changes made to gs://my-project-0004-346516-unique/
No changes made to gs://my-project-0004-346516-unique/


In [20]:
from typing import NamedTuple

import kfp
from google.cloud import aiplatform
from kfp import compiler, dsl
from kfp.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                     OutputPath, component)

In [21]:
aiplatform.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

In [22]:
Dataset = kfp.dsl.Dataset

In [23]:
import kfp
from kfp.dsl import component
from kfp.dsl import Output, Metrics
from typing import NamedTuple

@component(
    packages_to_install=[
        "scikit-learn==1.0.2",
        "pandas==1.3.5",
        "matplotlib==3.5.1",
        "numpy==1.23.0"
    ],
    base_image="python:3.9"
)
def perform_pca(data: Output[Dataset],metrics: Output[Metrics], n_components: int = 2) -> NamedTuple("Outputs", [("explained_variance_ratio", str)]):
    """
    Performs PCA on the Iris dataset, saves the transformed data,
    and generates a scree plot.

    Args:
        n_components (int): The number of principal components to retain.

    Returns:
        explained_variance_ratio (str): The explained variance ratio.
    """
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    from sklearn.decomposition import PCA
    from sklearn.datasets import load_iris
    from sklearn.preprocessing import StandardScaler
    import os
    from collections import namedtuple

    # Load the Iris dataset
    iris = load_iris()
    X = iris.data
    y = iris.target

    # Standardize the data
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Perform PCA
    pca = PCA(n_components=n_components)
    X_reduced = pca.fit_transform(X_scaled)

    # Create a Pandas DataFrame for the transformed data
    df = pd.DataFrame(X_reduced, columns=[f'PC{i+1}' for i in range(n_components)])
    df['target'] = y  # Add the target variable for potential analysis

    # Save the transformed data to a CSV file
    output_path = os.path.join(data.path, "pca_transformed_data.csv")
    df.to_csv(output_path, index=False)

    # Explained variance ratio
    explained_variance = pca.explained_variance_ratio_
    print("Explained Variance Ratio:", explained_variance)

    # Log metrics
    metrics.log_metric("explained_variance_ratio_PC1", explained_variance[0])
    if n_components > 1:
        metrics.log_metric("explained_variance_ratio_PC2", explained_variance[1])

    # Cumulative explained variance
    cum_var_exp = np.cumsum(explained_variance)

    # Scree plot
    plt.figure(figsize=(8, 6))
    plt.plot(range(1, len(explained_variance) + 1), explained_variance, 'ro-', linewidth=2)
    plt.title('Scree Plot')
    plt.xlabel('Principal Component')
    plt.ylabel('Variance Explained')
    plt.savefig(os.path.join(data.path, "scree_plot.png"))  # Save the plot to a file
    plt.close()

    metrics.log_artifact("scree_plot", data.path + "/scree_plot.png")

    # Return the explained variance ratio as a string
    Outputs = namedtuple('Outputs', ['explained_variance_ratio'])
    return Outputs(explained_variance_ratio=str(explained_variance.tolist()))


In [24]:
@dsl.pipeline(
    name="pca-pipeline",
    description="A pipeline that performs PCA on the Iris dataset"
)
def pca_pipeline() -> NamedTuple("Outputs", [("explained_variance_ratio", str)]):
    pca_task = perform_pca(
        n_components=2
    )
    return pca_task.output

AttributeError: The task has multiple outputs. Please reference the output by its name.

In [None]:
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="pca_pipeline.yaml"
)