## Initial setup

In [1]:
import os

# Vertex AI Workbench notebooks have environment-specific requirements; detect them here.
# Managed Workbench: DL_ANACONDA_HOME is set and we are not inside a virtualenv.
# bool(...) makes the flag a real boolean (the bare `and` expression yielded a str or None).
IS_WORKBENCH_NOTEBOOK = bool(os.getenv("DL_ANACONDA_HOME")) and not os.getenv("VIRTUAL_ENV")
# User-managed Workbench: presumably detected via this image metadata file — TODO confirm.
IS_USER_MANAGED_WORKBENCH_NOTEBOOK = os.path.exists("/opt/deeplearning/metadata/env_version")

In [2]:
# Optional: uncomment to install the required libraries. Prefer `%pip` over `!pip3`
# so the install targets this kernel's environment, and pin versions for reproducibility.
# !pip3 install --upgrade --user google-cloud-aiplatform \
#                                google-cloud-storage \
#                                kfp \
#                                google-cloud-pipeline-components -q

In [21]:
# Google Cloud project, region and Cloud Storage bucket used throughout the notebook.
PROJECT_ID = "ds-training-380514"
REGION = "us-central1"

BUCKET_NAME = "aaa-aca-ml-workshop-central"
# Staging location for the Vertex AI SDK, the training package and pipeline outputs.
BUCKET_URI = "gs://" + BUCKET_NAME

BUCKET_URI

'gs://aaa-aca-ml-workshop-central'

In [22]:
# Service account the pipeline job runs as: the project's default Compute Engine
# service account (look it up under IAM & Admin in the Google Cloud console).
_SA_PROJECT_NUMBER = "354621994428"
SERVICE_ACCOUNT = f"{_SA_PROJECT_NUMBER}-compute@developer.gserviceaccount.com"

In [23]:
from google.cloud import aiplatform, bigquery
from kfp.dsl import pipeline
from kfp.v2 import compiler

In [24]:
# Point the Vertex AI SDK at our project and region; artifacts are staged under BUCKET_URI.
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

# BigQuery client that reuses the credentials the Vertex AI SDK resolved above.
_vertex_credentials = aiplatform.initializer.global_config.credentials
bq_client = bigquery.Client(project=PROJECT_ID, credentials=_vertex_credentials)

In [25]:
import random
import string

_UUID_ALPHABET = string.ascii_lowercase + string.digits


def generate_uuid(length: int = 8) -> str:
    """Return a random identifier of ``length`` lowercase letters and digits.

    Used as a suffix that keeps the resource names below unique between runs.
    Not an RFC 4122 UUID and not cryptographically secure.
    """
    picked = random.choices(_UUID_ALPHABET, k=length)
    return "".join(picked)


UUID = generate_uuid()

In [26]:
# Source of the training data: the public Census Adult Income table in BigQuery
DATA_SOURCE = "bq://bigquery-public-data.ml_datasets.census_adult_income"

# Display name for the managed Vertex AI tabular dataset
DATASET_DISPLAY_NAME = f"adult_census_dataset_{UUID}"

# BigQuery dataset holding the batch-prediction input table and the prediction output
BQ_DATASET_ID = f"income_prediction_{UUID}"

# BigQuery table used as the batch-prediction input
BQ_INPUT_TABLE = f"income_test_data_{UUID}"

# Fraction (0-1, not a percentage) of rows used for training; the remainder is the test split
TRAIN_SPLIT = 0.9

# Pre-built container that runs the custom training package
TRAINING_CONTAINER = "us-docker.pkg.dev/vertex-ai/training/scikit-learn-cpu.0-23:latest"
# Pre-built container that serves the trained model; keep its scikit-learn version
# in step with TRAINING_CONTAINER so the saved model can be loaded
SERVING_CONTAINER = "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-23:latest"

# Display name for the custom training job
TRAINING_JOB_DISPLAY_NAME = f"income_classify_train_job_{UUID}"

# Model display name in the Vertex AI Model Registry
MODEL_DISPLAY_NAME = f"income_classify_model_{UUID}"
# Display name for the batch prediction job
BATCH_PREDICTION_JOB_NAME = f"income_classify_batch_pred_{UUID}"
# Display name for the Vertex AI pipeline run
PIPELINE_DISPLAY_NAME = f"income_classify_batch_pred_pipeline_{UUID}"

# Local file the pipeline definition is compiled to (generated by the compile step)
PIPELINE_FILE_NAME = f"{PIPELINE_DISPLAY_NAME}.json"

In [27]:
# Create the BigQuery dataset that holds the prediction input and output tables.
# exists_ok=True keeps this cell idempotent: re-running it in the same session
# (same UUID) no longer fails with a Conflict error.
bq_dataset = bq_client.create_dataset(
    bigquery.Dataset(f"{PROJECT_ID}.{BQ_DATASET_ID}"),
    exists_ok=True,
)
print(f"Created dataset {bq_client.project}.{bq_dataset.dataset_id}")

Created dataset ds-training-380514.income_prediction_qqoshgp7


In [28]:
# Build the batch-prediction input table: a random sample of roughly
# (1 - TRAIN_SPLIT) of the source rows, with the label column dropped.
# NOTE(review): RAND() is unseeded, so the sample differs on every run. The
# training job is also given the full source table (`dataset_bq_source` in the
# pipeline parameters), so these rows are not held out from training.
source_table = DATA_SOURCE[len("bq://"):] if DATA_SOURCE.startswith("bq://") else DATA_SOURCE

query = f"""
CREATE OR REPLACE TABLE
  `{PROJECT_ID}.{BQ_DATASET_ID}.{BQ_INPUT_TABLE}` AS

SELECT
  * EXCEPT (pseudo_random, income_bracket)
FROM (
  SELECT
    *,
    RAND() AS pseudo_random
  FROM
    `{source_table}` )
WHERE pseudo_random > {TRAIN_SPLIT}
"""
# client.query() only submits the job. Wait for it so the table exists before the
# pipeline reads it, and so SQL errors are raised here instead of being lost.
query_job = bq_client.query(query)
query_job.result();

## Prepare custom model code

In [29]:
!mkdir -p python_package
!mkdir -p python_package/trainer
!touch python_package/trainer/__init__.py

In [30]:
%%writefile python_package/trainer/task.py
import os
import joblib
import argparse
from google.cloud import storage
from google.cloud import bigquery
from sklearn.pipeline import Pipeline
from sklearn.pipeline import FeatureUnion
from sklearn.preprocessing import LabelBinarizer
from sklearn.feature_selection import SelectKBest
from sklearn.ensemble import RandomForestClassifier

# Environment variables set by Vertex AI custom training
PROJECT = os.getenv("CLOUD_ML_PROJECT_ID")
# Training data location; expected to be a bq:// table URI (see download_table)
TRAINING_DATA_URI = os.getenv("AIP_TRAINING_DATA_URI")

# Clients for reading training data (BigQuery) and writing the model (GCS)
bq_client = bigquery.Client(project=PROJECT)
storage_client = storage.Client(project=PROJECT)

# Define the constants
TARGET = 'income_bracket'
# gs:// directory the model artifact is uploaded to
ARTIFACTS_PATH = os.getenv("AIP_MODEL_DIR")
if not ARTIFACTS_PATH:
    # Fail fast with an actionable message instead of an AttributeError on None below.
    raise RuntimeError(
        "AIP_MODEL_DIR is not set; run this module as a Vertex AI custom training "
        "job with a base output directory configured."
    )
# Get the bucket name from the model dir
BUCKET_NAME = ARTIFACTS_PATH.replace("gs://", "").split("/")[0]

MODEL_FILENAME = 'model.joblib'
# Feature columns, in the order the model consumes them (target column excluded).
# The positional selection masks in __main__ are derived from this order.
COLUMNS = [
           'age',
           'workclass',
           'functional_weight',
           'education',
           'education_num',
           'marital_status',
           'occupation',
           'relationship',
           'race',
           'sex',
           'capital_gain',
           'capital_loss',
           'hours_per_week',
           'native_country'
           ]

# Subset of COLUMNS that is one-hot encoded; every other column is treated as numerical.
CATEGORICAL_COLUMNS = [
                       'workclass',
                       'education',
                       'marital_status',
                       'occupation',
                       'relationship',
                       'race',
                       'sex',
                       'native_country'
                      ]

def download_table(bq_table_uri: str):
    """Read every row of a BigQuery table into a DataFrame.

    Args:
        bq_table_uri: Table id as ``project.dataset.table``, optionally prefixed
            with ``bq://``; the prefix is stripped before parsing.

    Returns:
        The table contents, fetched without creating a BigQuery Storage API client.
    """
    scheme = "bq://"
    table_id = bq_table_uri[len(scheme):] if bq_table_uri.startswith(scheme) else bq_table_uri
    table_ref = bigquery.TableReference.from_string(table_id)
    row_iterator = bq_client.list_rows(table_ref)
    return row_iterator.to_dataframe(create_bqstorage_client=False)

def upload_model(bucket_name: str,
                 filename: str):
    """Upload a local file into the model directory (AIP_MODEL_DIR) on GCS.

    The object is written to ``os.path.join(ARTIFACTS_PATH, filename)``.

    Args:
        bucket_name: Bucket that ARTIFACTS_PATH points into. It is not needed to
            build the destination, because the full gs:// path comes from
            ARTIFACTS_PATH. It is kept so existing callers keep working.
        filename: Local file to upload; its name is reused as the object name.
    """
    # No storage_client.get_bucket() lookup here. Its result was never used, and it
    # costs an extra API call that needs bucket-metadata (storage.buckets.get)
    # permission on top of the object-write permission the upload itself needs.
    storage_path = os.path.join(ARTIFACTS_PATH, filename)
    blob = storage.blob.Blob.from_string(storage_path, client=storage_client)
    blob.upload_from_filename(filename)
    

if __name__ == '__main__':
    # Load the training data
    X_train = download_table(TRAINING_DATA_URI)

    # Select the features explicitly, in COLUMNS order. The positional `scores_`
    # masks below index into this order, so relying on the table's own column order
    # (or on it having no extra columns) would silently pick the wrong features.
    train_features = X_train[COLUMNS].to_numpy().tolist()
    # Training labels (the TARGET column, 'income_bracket') as a flat list
    train_labels = X_train[TARGET].to_numpy().tolist()

    # One sub-pipeline per categorical column, plus one for the numerical columns,
    # combined with FeatureUnion before the RandomForestClassifier.
    feature_pipelines = []

    # Each categorical column is extracted on its own by a SelectKBest(k=1) whose
    # scores_ mask is 1 only at that column's index. A LabelBinarizer, fitted here
    # on the extracted column, then converts the value to a numerical encoding.
    # Both steps are fitted up front, so the union is used with .transform only.
    # Example:
    #  data   = [39, 'State-gov', 77516, 'Bachelors', 13, 'Never-married', 'Adm-clerical',
    #            'Not-in-family', 'White', 'Male', 2174, 0, 40, 'United-States']
    #  scores = [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    #  -> [['State-gov']]
    for i, col in enumerate(COLUMNS):
        if col not in CATEGORICAL_COLUMNS:
            continue
        skb = SelectKBest(k=1)
        skb.scores_ = [1 if j == i else 0 for j in range(len(COLUMNS))]
        lbn = LabelBinarizer()
        lbn.fit(skb.transform(train_features))
        feature_pipelines.append(
            ('categorical-{}'.format(i), Pipeline([
                ('SKB-{}'.format(i), skb),
                ('LBN-{}'.format(i), lbn),
            ]))
        )

    # Select the numerical (non-categorical) columns. The mask and k are derived
    # from COLUMNS / CATEGORICAL_COLUMNS so they cannot drift out of sync with them.
    numerical_scores = [0 if col in CATEGORICAL_COLUMNS else 1 for col in COLUMNS]
    skb = SelectKBest(k=sum(numerical_scores))
    skb.scores_ = numerical_scores
    feature_pipelines.append(('numerical', skb))

    # Combine all the features using FeatureUnion
    preprocess = FeatureUnion(feature_pipelines)

    # Create the classifier
    classifier = RandomForestClassifier()

    # Transform the features and fit them to the classifier
    classifier.fit(preprocess.transform(train_features), train_labels)

    # Bundle preprocessing and classifier into one artifact that maps raw
    # feature rows to predictions
    pipeline = Pipeline([
                         ('union', preprocess),
                         ('classifier', classifier)
                        ])

    # Save the pipeline locally
    joblib.dump(pipeline, MODEL_FILENAME)

    # Upload the locally saved model to GCS
    upload_model(
                 bucket_name=BUCKET_NAME,
                 filename=MODEL_FILENAME
                )

Overwriting python_package/trainer/task.py


## Package the custom model code

In [31]:
%%writefile python_package/setup.py

from setuptools import find_packages
from setuptools import setup

# Runtime dependencies pip installs alongside the package. task.py calls
# RowIterator.to_dataframe(), which needs pandas (and, presumably, pyarrow
# depending on the google-cloud-bigquery version).
REQUIRED_PACKAGES = ['pandas','pyarrow']

setup(
      name='trainer',
      # The built archive is trainer-<version>.tar.gz; the pipeline's
      # python_package_gcs_uri refers to this exact file name.
      version='0.1',
      install_requires=REQUIRED_PACKAGES,
      packages=find_packages(),
      include_package_data=True,
      description='My training application.'
     )

Overwriting python_package/setup.py


In [32]:
# Build a gzipped source distribution into python_package/dist/ (trainer-0.1.tar.gz).
# NOTE(review): `setup.py sdist` is deprecated by setuptools; `python -m build --sdist` is the modern equivalent.
!cd python_package && python3 setup.py sdist --formats=gztar

running sdist
running egg_info
writing trainer.egg-info/PKG-INFO
writing dependency_links to trainer.egg-info/dependency_links.txt
writing top-level names to trainer.egg-info/top_level.txt
reading manifest file 'trainer.egg-info/SOURCES.txt'
writing manifest file 'trainer.egg-info/SOURCES.txt'

running check
creating trainer-0.1
creating trainer-0.1/trainer
creating trainer-0.1/trainer.egg-info
copying files to trainer-0.1...
copying setup.py -> trainer-0.1
copying trainer/__init__.py -> trainer-0.1/trainer
copying trainer/task.py -> trainer-0.1/trainer
copying trainer.egg-info/PKG-INFO -> trainer-0.1/trainer.egg-info
copying trainer.egg-info/SOURCES.txt -> trainer-0.1/trainer.egg-info
copying trainer.egg-info/dependency_links.txt -> trainer-0.1/trainer.egg-info
copying trainer.egg-info/top_level.txt -> trainer-0.1/trainer.egg-info
Writing trainer-0.1/setup.cfg
Creating tar archive
removing 'trainer-0.1' (and everything under it)


In [33]:
!gsutil cp -r python_package/dist/* $BUCKET_URI/training_package/

Copying file://python_package/dist/trainer-0.1.tar.gz [Content-Type=application/x-tar]...
/ [1 files][  2.7 KiB/  2.7 KiB]                                                
Operation completed over 1 objects/2.7 KiB.                                      


## Build pipeline

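The pipeline is built from pre-built Google Cloud Pipeline Components for dataset creation (`TabularDatasetCreateOp`), custom training (`CustomPythonPackageTrainingJobRunOp`) and batch prediction (`ModelBatchPredictOp`).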

Reference: https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-1.0.0/google_cloud_pipeline_components.aiplatform.html

In [34]:
@pipeline(name="custom-model-bq-batch-prediction-pipeline")
def custom_model_bq_batch_prediction_pipeline(
    project: str,
    location: str,
    dataset_display_name: str,
    dataset_bq_source: str,
    training_job_dispaly_name: str,
    gcs_staging_directory: str,
    python_package_gcs_uri: str,
    python_package_module_name: str,
    training_split: float,
    test_split: float,
    training_container_uri: str,
    serving_container_uri: str,
    training_bigquery_destination: str,
    model_display_name: str,
    batch_prediction_display_name: str,
    batch_prediction_instances_format: str,
    batch_prediction_predictions_format: str,
    batch_prediction_source_uri: str,
    batch_prediction_destination_uri: str,
    batch_prediction_machine_type: str = "n1-standard-4",
    batch_prediction_batch_size: int = 1000,
):
    """Create a tabular dataset, train a custom Python package on it, then batch-predict.

    Args:
        project: GCP project for dataset creation and batch prediction.
        location: Region for dataset creation and batch prediction.
        dataset_display_name: Display name of the managed tabular dataset.
        dataset_bq_source: bq:// URI of the table the dataset is created from.
        training_job_dispaly_name: Display name of the training job (the parameter
            name's spelling is part of the pipeline interface).
        gcs_staging_directory: GCS path used as both base output dir and staging bucket.
        python_package_gcs_uri: GCS URI of the trainer source distribution.
        python_package_module_name: Module to run inside the package.
        training_split: Fraction of rows for training.
        test_split: Fraction of rows for testing.
        training_container_uri: Pre-built training container image.
        serving_container_uri: Serving container image attached to the model.
        training_bigquery_destination: Passed to training as bigquery_destination.
        model_display_name: Display name of the resulting model.
        batch_prediction_display_name: Display name of the batch prediction job.
        batch_prediction_instances_format: Input format for batch prediction.
        batch_prediction_predictions_format: Output format for batch prediction.
        batch_prediction_source_uri: BigQuery input table for batch prediction.
        batch_prediction_destination_uri: BigQuery output location for predictions.
        batch_prediction_machine_type: Machine type for batch prediction workers.
        batch_prediction_batch_size: Manual batch tuning batch size.
    """
    from google_cloud_pipeline_components.aiplatform import (
        CustomPythonPackageTrainingJobRunOp,
        ModelBatchPredictOp,
        TabularDatasetCreateOp,
    )

    # Managed Vertex AI tabular dataset backed by the BigQuery source table
    create_dataset_op = TabularDatasetCreateOp(
        project=project,
        location=location,
        display_name=dataset_display_name,
        bq_source=dataset_bq_source,
    )

    # Run the packaged trainer on that dataset; its "model" output feeds prediction
    training_op = CustomPythonPackageTrainingJobRunOp(
        display_name=training_job_dispaly_name,
        python_package_gcs_uri=python_package_gcs_uri,
        python_module_name=python_package_module_name,
        container_uri=training_container_uri,
        model_display_name=model_display_name,
        model_serving_container_image_uri=serving_container_uri,
        dataset=create_dataset_op.outputs["dataset"],
        base_output_dir=gcs_staging_directory,
        bigquery_destination=training_bigquery_destination,
        training_fraction_split=training_split,
        test_fraction_split=test_split,
        staging_bucket=gcs_staging_directory,
    )

    # Batch prediction from the BigQuery input table into the BigQuery destination
    batch_predict_op = ModelBatchPredictOp(
        project=project,
        location=location,
        model=training_op.outputs["model"],
        instances_format=batch_prediction_instances_format,
        bigquery_source_input_uri=batch_prediction_source_uri,
        predictions_format=batch_prediction_predictions_format,
        bigquery_destination_output_uri=batch_prediction_destination_uri,
        job_display_name=batch_prediction_display_name,
        machine_type=batch_prediction_machine_type,
        manual_batch_tuning_parameters_batch_size=batch_prediction_batch_size,
    )

In [35]:
# Compile the pipeline definition into a pipeline spec file at PIPELINE_FILE_NAME
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=custom_model_bq_batch_prediction_pipeline,
    package_path=PIPELINE_FILE_NAME,
)

In [36]:
# Runtime arguments for custom_model_bq_batch_prediction_pipeline. Keys must match the
# pipeline function's parameter names exactly, including the `dispaly` spelling.
_bq_dataset_uri = f"bq://{PROJECT_ID}.{BQ_DATASET_ID}"

parameters = {
    # Where everything runs
    "project": PROJECT_ID,
    "location": REGION,
    # Managed dataset
    "dataset_display_name": DATASET_DISPLAY_NAME,
    "dataset_bq_source": DATA_SOURCE,
    # Custom training (tarball name must match the version in setup.py)
    "training_job_dispaly_name": TRAINING_JOB_DISPLAY_NAME,
    "gcs_staging_directory": BUCKET_URI,
    "python_package_gcs_uri": f"{BUCKET_URI}/training_package/trainer-0.1.tar.gz",
    "python_package_module_name": "trainer.task",
    "training_split": TRAIN_SPLIT,
    "test_split": 1 - TRAIN_SPLIT,
    "training_container_uri": TRAINING_CONTAINER,
    "serving_container_uri": SERVING_CONTAINER,
    "training_bigquery_destination": f"bq://{PROJECT_ID}",
    "model_display_name": MODEL_DISPLAY_NAME,
    # Batch prediction: BigQuery table in, BigQuery dataset out
    "batch_prediction_display_name": BATCH_PREDICTION_JOB_NAME,
    "batch_prediction_instances_format": "bigquery",
    "batch_prediction_predictions_format": "bigquery",
    "batch_prediction_source_uri": f"{_bq_dataset_uri}.{BQ_INPUT_TABLE}",
    "batch_prediction_destination_uri": _bq_dataset_uri,
}

## Run pipeline

In [37]:
# Pipeline run from the compiled spec and the parameters above; caching is enabled
# so steps whose inputs are unchanged can reuse earlier results.
job = aiplatform.PipelineJob(
    template_path=PIPELINE_FILE_NAME,
    parameter_values=parameters,
    display_name=PIPELINE_DISPLAY_NAME,
    enable_caching=True,
)

In [38]:
# Run the pipeline job as SERVICE_ACCOUNT; the call blocks and logs the job state
# until the run finishes.
job.run(service_account=SERVICE_ACCOUNT)

Creating PipelineJob
PipelineJob created. Resource name: projects/354621994428/locations/us-central1/pipelineJobs/custom-model-bq-batch-prediction-pipeline-20230406192358
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/354621994428/locations/us-central1/pipelineJobs/custom-model-bq-batch-prediction-pipeline-20230406192358')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/custom-model-bq-batch-prediction-pipeline-20230406192358?project=354621994428
PipelineJob projects/354621994428/locations/us-central1/pipelineJobs/custom-model-bq-batch-prediction-pipeline-20230406192358 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/354621994428/locations/us-central1/pipelineJobs/custom-model-bq-batch-prediction-pipeline-20230406192358 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/354621994428/locations/us-central1/pipelineJobs/custom-model-bq-batch-pr