In [None]:
! pip3 install --upgrade --quiet --user google-cloud-aiplatform  \
                                 google-cloud-storage \
                                 kfp \
                                 google-cloud-pipeline-components

In [1]:
# Define project
PROJECT_ID = "learning-project-38730"  # @param {type:"string"}

# Set the project id
! gcloud config set project {PROJECT_ID}

Updated property [core/project].


In [2]:
# Define region
REGION = "us-central1"  # @param {type: "string"}

In [3]:
# Define bucket
BUCKET_NAME = "bucket-datapath-project"  # @param {type:"string"}
BUCKET_URI = f"gs://{BUCKET_NAME}"

! echo $BUCKET_URI

gs://bucket-datapath-project


In [4]:
# Create bucket
# ! gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI

In [5]:
SERVICE_ACCOUNT = "datapath-bootcamp@learning-project-38730.iam.gserviceaccount.com"  # @param {type:"string"}

In [6]:
# ! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

# ! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

In [7]:
# Import libraries
import json
import google.cloud.aiplatform as aiplatform
import google.cloud.aiplatform as aip
from kfp import compiler, dsl
from kfp.dsl import component
from typing import List
from kfp import client
from kfp import dsl
from kfp.dsl import Dataset
from kfp.dsl import Input
from kfp.dsl import Model
from kfp.dsl import Output

In [8]:
# Vertex AI Pipelines constants
PIPELINE_ROOT = "{}/pipeline_root/control".format(BUCKET_URI)

In [9]:
# Initialize Vertex AI SDK for Python
aiplatform.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

In [11]:
# Create components
@dsl.component(packages_to_install=['pandas==1.3.5'])
def create_dataset(iris_dataset: Output[Dataset]):
    import pandas as pd

    csv_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data'
    col_names = [
        'Sepal_Length', 'Sepal_Width', 'Petal_Length', 'Petal_Width', 'Labels'
    ]
    df = pd.read_csv(csv_url, names=col_names)

    with open(iris_dataset.path, 'w') as f:
        df.to_csv(f)


@dsl.component(packages_to_install=['pandas==1.3.5', 'scikit-learn==1.0.2'])
def normalize_dataset(
    input_iris_dataset: Input[Dataset],
    normalized_iris_dataset: Output[Dataset],
    standard_scaler: bool,
    min_max_scaler: bool,
):
    if standard_scaler is min_max_scaler:
        raise ValueError(
            'Exactly one of standard_scaler or min_max_scaler must be True.')

    import pandas as pd
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.preprocessing import StandardScaler

    with open(input_iris_dataset.path) as f:
        df = pd.read_csv(f)
    labels = df.pop('Labels')

    if standard_scaler:
        scaler = StandardScaler()
    if min_max_scaler:
        scaler = MinMaxScaler()

    df = pd.DataFrame(scaler.fit_transform(df))
    df['Labels'] = labels
    with open(normalized_iris_dataset.path, 'w') as f:
        df.to_csv(f)


@dsl.component(packages_to_install=['pandas==1.3.5', 'scikit-learn==1.0.2'])
def train_model(
    normalized_iris_dataset: Input[Dataset],
    model: Output[Model],
    n_neighbors: int,
):
    import pickle

    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.neighbors import KNeighborsClassifier

    with open(normalized_iris_dataset.path) as f:
        df = pd.read_csv(f)

    y = df.pop('Labels')
    X = df

    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

    clf = KNeighborsClassifier(n_neighbors=n_neighbors)
    clf.fit(X_train, y_train)
    with open(model.path, 'wb') as f:
        pickle.dump(clf, f)

In [12]:
# Generate pipeline
@dsl.pipeline(name='iris-training-pipeline')
def my_pipeline(
    standard_scaler: bool,
    min_max_scaler: bool,
    neighbors: List[int],
):
    create_dataset_task = create_dataset()

    normalize_dataset_task = normalize_dataset(
        input_iris_dataset=create_dataset_task.outputs['iris_dataset'],
        standard_scaler=True,
        min_max_scaler=False)

    with dsl.ParallelFor(neighbors) as n_neighbors:
        train_model(
            normalized_iris_dataset=normalize_dataset_task
            .outputs['normalized_iris_dataset'],
            n_neighbors=n_neighbors)

In [19]:
# Compile
pipeline_filename = 'iris_training_pipeline.yaml'
compiler.Compiler().compile(my_pipeline, pipeline_filename)
print(f'Pipeline compiled successfully! Check {pipeline_filename} for the YAML definition.')

Pipeline compiled successfully! Check iris_training_pipeline.yaml for the YAML definition.


In [None]:
DISPLAY_NAME = "control"

job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="iris_training_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={
        'min_max_scaler': True,
        'standard_scaler': False,
        'neighbors': [3, 6, 9]
    }
)

job.run()

Creating PipelineJob


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob


PipelineJob created. Resource name: projects/394727607809/locations/us-central1/pipelineJobs/iris-training-pipeline-20231123150340


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/394727607809/locations/us-central1/pipelineJobs/iris-training-pipeline-20231123150340


To use this PipelineJob in another session:


INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:


pipeline_job = aiplatform.PipelineJob.get('projects/394727607809/locations/us-central1/pipelineJobs/iris-training-pipeline-20231123150340')


INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/394727607809/locations/us-central1/pipelineJobs/iris-training-pipeline-20231123150340')


View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/iris-training-pipeline-20231123150340?project=394727607809


INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/iris-training-pipeline-20231123150340?project=394727607809


PipelineJob projects/394727607809/locations/us-central1/pipelineJobs/iris-training-pipeline-20231123150340 current state:
PipelineState.PIPELINE_STATE_PENDING


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/394727607809/locations/us-central1/pipelineJobs/iris-training-pipeline-20231123150340 current state:
PipelineState.PIPELINE_STATE_PENDING


PipelineJob projects/394727607809/locations/us-central1/pipelineJobs/iris-training-pipeline-20231123150340 current state:
PipelineState.PIPELINE_STATE_PENDING


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/394727607809/locations/us-central1/pipelineJobs/iris-training-pipeline-20231123150340 current state:
PipelineState.PIPELINE_STATE_PENDING


PipelineJob projects/394727607809/locations/us-central1/pipelineJobs/iris-training-pipeline-20231123150340 current state:
PipelineState.PIPELINE_STATE_PENDING


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/394727607809/locations/us-central1/pipelineJobs/iris-training-pipeline-20231123150340 current state:
PipelineState.PIPELINE_STATE_PENDING


In [14]:
def create_endpoint_sample(
    project: str,
    display_name: str,
    location: str,
):
    aiplatform.init(project=project, location=location)

    endpoint = aiplatform.Endpoint.create(
        display_name=display_name,
        project=project,
        location=location,
    )

    print(endpoint.display_name)
    print(endpoint.resource_name)
    return endpoint

In [15]:
endpoint = create_endpoint_sample(project= PROJECT_ID, \
                                    display_name= 'iris_endpoint', \
                                    location= REGION)

Creating Endpoint
Create Endpoint backing LRO: projects/394727607809/locations/us-central1/endpoints/4254619617307131904/operations/3669423726157365248
Endpoint created. Resource name: projects/394727607809/locations/us-central1/endpoints/4254619617307131904
To use this Endpoint in another session:
endpoint = aiplatform.Endpoint('projects/394727607809/locations/us-central1/endpoints/4254619617307131904')
iris_endpoint
projects/394727607809/locations/us-central1/endpoints/4254619617307131904


In [16]:
ENDPOINT_NAME = 'iris_endpoint'
!gcloud ai endpoints list \
  --region=$REGION\
  --filter=display_name=$ENDPOINT_NAME

Using endpoint [https://us-central1-aiplatform.googleapis.com/]
ENDPOINT_ID          DISPLAY_NAME
4254619617307131904  iris_endpoint
7045725486369996800  iris_endpoint
