# Setup
## Packages & Restart Kernel

In [15]:
import os

# Google Cloud Notebook images are recognised by this deeplearning metadata file.
IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists("/opt/deeplearning/metadata/env_version")

# On a Google Cloud Notebook, pip must install into the user site ('--user');
# everywhere else no extra flag is passed.
USER_FLAG = "--user" if IS_GOOGLE_CLOUD_NOTEBOOK else ""

In [2]:
# Install Python package dependencies.
print("Installing libraries")
! pip3 install {USER_FLAG} --quiet google-cloud-pipeline-components kfp
! pip3 install {USER_FLAG} --quiet --upgrade google-cloud-aiplatform google-cloud-bigquery
! pip3 install {USER_FLAG} --quiet db-dtypes

Installing libraries
[0m[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
cloud-tpu-client 0.10 requires google-api-python-client==1.8.0, but you have google-api-python-client 1.12.11 which is incompatible.[0m[31m
[0m

In [3]:
# Restart the kernel so the packages installed above are importable.
# Skipped when the IS_TESTING environment variable is set.
import os

_skip_restart = bool(os.getenv("IS_TESTING"))
if not _skip_restart:
    import IPython

    # do_shutdown(True) asks for a restart rather than a plain shutdown.
    IPython.Application.instance().kernel.do_shutdown(True)

## Set Project ID and Env Variables

In [16]:
import os

PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Project ID:  uki-mlops-dev-demo


In [17]:
# Fall back to the form value when gcloud did not provide a project ID.
if PROJECT_ID in ("", None):
    PROJECT_ID = "[your-project-id]"  # @param {type:"string"}

In [18]:
! gcloud config set project $PROJECT_ID

Updated property [core/project].


In [19]:
# Echo the project ID that will be used from here on.
print(f"{PROJECT_ID}")

uki-mlops-dev-demo


In [20]:
# Run timestamp (YYYYMMDDHHMMSS) - re-run this cell before resubmitting a pipeline.
from datetime import datetime

TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"
print(TIMESTAMP)

20221010162928


## Create Cloud Storage Bucket

In [21]:
BUCKET_NAME = "football-match-events"  # @param {type:"string"}
REGION = "us-central1"  # @param {type:"string"}
# gs:// URI of the bucket, used by the gsutil commands below.
BUCKET_URI = "gs://" + BUCKET_NAME

In [6]:
# Generate a bucket name when none (or the template placeholder) was given.
if BUCKET_NAME in ("", None, "[your-bucket-name]"):
    BUCKET_NAME = PROJECT_ID + "aip-" + TIMESTAMP
BUCKET_URI = "gs://{}".format(BUCKET_NAME)

# Replace the template placeholder region with the default region.
REGION = "us-central1" if REGION == "[your-region]" else REGION

In [22]:
# Show which bucket and region the rest of the notebook uses.
print(f"{BUCKET_NAME} in {REGION}")

football-match-events in us-central1


In [None]:
# run this if bucket doesn't already exist
! gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI

In [23]:
# validate access to the bucket by checking contents
! gsutil ls -al $BUCKET_URI

     15290  2022-10-07T12:08:40Z  gs://football-match-events/all_tables.csv#1665144520869928  metageneration=1
       632  2022-10-07T12:08:42Z  gs://football-match-events/bet-testdata.csv#1665144522728766  metageneration=1
      7502  2022-10-07T12:08:42Z  gs://football-match-events/betting-activity.csv#1665144522186580  metageneration=1
   4906287  2022-10-07T12:08:42Z  gs://football-match-events/bettingCustomers.csv#1665144522017710  metageneration=1
  56003476  2022-10-07T12:08:46Z  gs://football-match-events/events-raw.csv#1665144526990023  metageneration=1
   8186151  2022-10-07T12:08:41Z  gs://football-match-events/matches.csv#1665144521488578  metageneration=1
       586  2022-10-07T12:21:11Z  gs://football-match-events/teams.csv#1665145271184362  metageneration=1
TOTAL: 7 objects, 69119924 bytes (65.92 MiB)


## Service Account

In [24]:
# Service account that is granted object read/write on the bucket in the next cell.
# NOTE(review): the project ID is hard-coded inside this address - keep it in sync with PROJECT_ID.
SERVICE_ACCOUNT = "vertex-pipelines-sa@uki-mlops-dev-demo.iam.gserviceaccount.com"  # @param {type:"string"}

In [25]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

No changes made to gs://football-match-events/
No changes made to gs://football-match-events/


## Imports

In [26]:
import sys
from typing import NamedTuple
import os

from google.cloud import aiplatform as vertex
from google.cloud import bigquery
from google_cloud_pipeline_components import \
    aiplatform as vertex_pipeline_components
from google_cloud_pipeline_components.experimental import \
    bigquery as bq_components

from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import Artifact, Input, Metrics, Output, component

# Build Pipeline

## Pipeline Variables
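Make sure the GCS bucket and the BigQuery dataset do not contain anything you need to keep: this notebook may overwrite existing content (for example, the pipeline uses `CREATE OR REPLACE` for its tables and models).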

Your bucket must be in the same region as your Vertex AI resources.

The BigQuery dataset is in the `us-central1` region.

Make sure your preferred Vertex AI region is supported (see the Vertex AI locations page: https://cloud.google.com/vertex-ai/docs/general/locations).

In [173]:
# Local file the compiled pipeline spec is written to.
PIPELINE_JSON_PKG_PATH = "football-match.json"
# GCS prefix under which Vertex AI Pipelines stores run artifacts.
PIPELINE_ROOT = "gs://" + BUCKET_NAME + "/pipeline_root"
# NOTE(review): unlike the other GCS paths here this has no "gs://" prefix, and it is
# not used elsewhere in the visible notebook - confirm the intended form.
DATA_FOLDER = BUCKET_NAME + "/data"

BQ_DATASET = "football_match"  # @param {type:"string"}
BQ_LOCATION = "us-central1"  # @param {type:"string"}
# NOTE(review): upper-casing yields "US-CENTRAL1"; confirm BigQuery accepts that
# spelling for a regional location.
BQ_LOCATION = BQ_LOCATION.upper()
# GCS prefix the trained BQML models are exported to.
BQML_EXPORT_LOCATION = "gs://" + BUCKET_NAME + "/artifacts/bqml"

DISPLAY_NAME = "football-match"
ENDPOINT_DISPLAY_NAME = DISPLAY_NAME + "_endpoint"

# Artifact Registry host prefix = first component of the region ("us-central1" -> "us").
image_prefix = REGION.partition("-")[0]
BQML_SERVING_CONTAINER_IMAGE_URI = (
    image_prefix + "-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest"
)

In [174]:
if os.getenv("IS_TESTING"):
    !gcloud --quiet components install beta
    !gcloud --quiet components update
!gcloud config set project $PROJECT_ID
!gcloud config set ai/region $REGION

Updated property [core/project].
Updated property [ai/region].


## Pipeline Components
These components assume the data has already been loaded from GCS into BigQuery.

### Split Datasets

Splits the dataset into 3 slices:

* TRAIN
* VALIDATE
* TEST

AutoML and BigQuery ML use different nomenclatures for data splits.

For how BigQuery ML splits the data, see: https://cloud.google.com/bigquery-ml/docs/reference/standard-sql/bigqueryml-hyperparameter-tuning#data_split

In [175]:
@component(
    base_image="python:3.9",
    packages_to_install=["google-cloud-bigquery","pandas","pyarrow","fsspec"],
)  # pandas, pyarrow and fsspec required to export bq data to csv
# NOTE(review): this component does not currently export anything to CSV; the
# extra packages may no longer be needed.

def split_datasets(
    raw_dataset: str,
    bq_location: str,
    project: str = "uki-mlops-dev-demo",
) -> NamedTuple(
    "bqml_split",
    [
        ("dataset_uri", str),
        ("dataset_bq_uri", str),
    ],
):
    """Build the training table in BigQuery and tag every row with a data split.

    Joins the `features-events`, `customers-activity` and `customers` tables of
    `raw_dataset` into `football_data_train_test`, using CREATE OR REPLACE, so
    any existing table with that name is overwritten. A `split_col` column is
    added with 'TEST' (hash bucket 9), 'VALIDATE' (bucket 8) or 'TRAIN' (buckets
    0-7), derived deterministically from FARM_FINGERPRINT of the event `id`. The
    `id` column itself is dropped.

    Args:
        raw_dataset: BigQuery dataset that holds the source tables and receives
            the output table.
        bq_location: BigQuery location the query job runs in.
        project: GCP project that owns the dataset. Defaults to the project
            this notebook was written for.

    Returns:
        dataset_uri: `project.dataset.table` ID of the output table.
        dataset_bq_uri: the same ID prefixed with `bq://`.
    """
    # Imports are inside the function because a KFP lightweight component runs
    # this function's source on its own, without the notebook's imports.
    from collections import namedtuple

    from google.cloud import bigquery

    # Use the dataset passed in by the pipeline instead of a hard-coded name.
    bq_dataset = raw_dataset

    client = bigquery.Client(project=project, location=bq_location)

    def split_dataset(training_dataset_table):
        """Run the CREATE OR REPLACE TABLE query and return the table's full ID."""
        training_dataset_table_name = f"{project}.{bq_dataset}.{training_dataset_table}"
        split_query = f"""
        CREATE OR REPLACE TABLE
            `{training_dataset_table_name}`
           AS
           SELECT * EXCEPT (id)
            FROM (
             SELECT
               a.*,
               b.product_type,
               CASE
                 WHEN a.home = d.FavTeam THEN 1
               ELSE
               0
             END
               AS favTeamHome,
               CASE
                 WHEN a.away = d.FavTeam THEN 1
               ELSE
               0
             END
               AS favTeamAway,
             d.CustomerGroup,
             d.CustomerYears,
                CASE(ABS(MOD(FARM_FINGERPRINT(CAST(a.id AS string)), 10)))
                          WHEN 9 THEN 'TEST'
                          WHEN 8 THEN 'VALIDATE'
                          ELSE 'TRAIN' END AS split_col
             FROM
               `{project}.{bq_dataset}.features-events` a
             JOIN
               `{project}.{bq_dataset}.customers-activity` b
             ON
               a.id=b.id and a.time_bucket=b.time_bucket
             JOIN
               `{project}.{bq_dataset}.customers` d
             ON
               b.PlayerID=d.PlayerID) 
        """
        print("Splitting the dataset")
        query_job = client.query(split_query)  # Make an API request.
        query_job.result()  # Block until the table has been written.
        return training_dataset_table_name

    training_dataset_table = "football_data_train_test"

    dataset_uri = split_dataset(training_dataset_table)
    print("ran split_dataset fxn")
    dataset_bq_uri = "bq://" + dataset_uri

    print(f"dataset: {dataset_uri}")

    result_tuple = namedtuple(
        "bqml_split",
        ["dataset_uri", "dataset_bq_uri"],
    )
    return result_tuple(
        dataset_uri=str(dataset_uri),
        dataset_bq_uri=str(dataset_bq_uri),
    )


### Train BQML Models

In [176]:
def _query_create_customergroup_model(
    project_id: str,
    bq_dataset: str,
    training_data_uri: str,
    model_name: str = "CustomerGroupClassifier",
):
    model_uri = f"{project_id}.{bq_dataset}.{model_name}"

    model_options = """OPTIONS
      ( MODEL_TYPE='KMEANS', num_clusters = 6
        )
        """
    query = f"""
    CREATE OR REPLACE MODEL
      `{model_uri}`
      {model_options}
     AS
    SELECT
      * EXCEPT(CustomerGroup),
      CASE(split_col)
        WHEN 'TRAIN' THEN TRUE
      ELSE
      FALSE
    END
      AS data_split
    FROM
      `{training_data_uri}`;
    """

    print(query.replace("\n", " "))

    return query

In [177]:
def _query_create_productpredictor_model(
    project_id: str,
    bq_dataset: str,
    training_data_uri: str,
    model_name: str = "product-predictor",
):
    model_uri = f"{project_id}.{bq_dataset}.{model_name}"

    model_options = """OPTIONS
      ( MODEL_TYPE='BOOSTED_TREE_CLASSIFIER',
       AUTO_CLASS_WEIGHTS = TRUE,
       BOOSTER_TYPE = 'GBTREE',
       DATA_SPLIT_METHOD = 'AUTO_SPLIT',
       NUM_PARALLEL_TREE = 1,
       MAX_ITERATIONS = 50,
       TREE_METHOD = 'AUTO',
       EARLY_STOP = FALSE,
       SUBSAMPLE = 0.85,
       INPUT_LABEL_COLS = ['product_type']
        )
        """
    query = f"""
    CREATE OR REPLACE MODEL
      `{model_uri}`
      {model_options}
     AS
    SELECT
      *,
      CASE(split_col)
        WHEN 'TRAIN' THEN TRUE
      ELSE
      FALSE
    END
      AS data_split
    FROM
      `{training_data_uri}`;
    """

    print(query.replace("\n", " "))

    return query

# Pipeline

In [199]:
# Runtime values for the parameters of train_pipeline.
pipeline_params = dict(
    project=PROJECT_ID,
    region=REGION,
    bq_dataset=BQ_DATASET,
    bq_location=BQ_LOCATION,
    bqml_model_export_location=BQML_EXPORT_LOCATION,
    bqml_serving_container_image_uri=BQML_SERVING_CONTAINER_IMAGE_URI,
    endpoint_display_name=ENDPOINT_DISPLAY_NAME,
)

In [200]:
def _create_export_upload_bqml_model(
    project,
    region,
    bq_location,
    query,
    export_path,
    model_display_name,
    serving_container_image_uri,
):
    """Add the create -> export -> upload tasks for one BQML model to the pipeline.

    Must be called inside a `@dsl.pipeline` function. It only instantiates
    pipeline tasks and does not run anything itself.

    Args:
        project: GCP project (pipeline parameter).
        region: Vertex AI region the model is uploaded to.
        bq_location: BigQuery location of the CREATE MODEL and EXPORT jobs.
        query: `CREATE OR REPLACE MODEL` statement for the model.
        export_path: GCS directory the trained model is exported to. It must be
            unique per model; otherwise the exports overwrite each other.
        model_display_name: Display name of the uploaded model in Vertex AI.
        serving_container_image_uri: Prediction container for the uploaded model.

    Returns:
        The ModelUploadOp task. Its "model" output is the Vertex AI model.
    """
    # Build the model in BigQuery ML with the pre-built component.
    create_op = bq_components.BigqueryCreateModelJobOp(
        project=project, location=bq_location, query=query
    )
    # Export the trained model to GCS.
    export_op = bq_components.BigqueryExportModelJobOp(
        project=project,
        location=bq_location,
        model=create_op.outputs["model"],
        model_destination_path=export_path,
    ).after(create_op)
    # Upload the exported artifacts to Vertex AI.
    return vertex_pipeline_components.ModelUploadOp(
        project=project,
        location=region,
        display_name=model_display_name,
        artifact_uri=export_op.outputs["exported_model_path"],
        serving_container_image_uri=serving_container_image_uri,
    )


@dsl.pipeline(name=DISPLAY_NAME, description="Rapid Prototyping")
def train_pipeline(
    project: str,
    region: str,
    bq_dataset: str,
    bq_location: str,
    bqml_model_export_location: str,
    bqml_serving_container_image_uri: str,
    endpoint_display_name: str,
):
    """Split the data, train two BQML models, and deploy the product predictor.

    Steps:
      1. `split_datasets` builds the split-tagged training table.
      2. A customer-group K-means model is created, exported and uploaded to
         Vertex AI. It is not deployed.
      3. A product-type classifier is created, exported and uploaded, then
         deployed to a new endpoint that receives all of its traffic.

    Each model is exported to its own subfolder of `bqml_model_export_location`,
    so the two exports cannot overwrite each other.
    """
    # Splits the BQ dataset using a custom component.
    split_datasets_op = split_datasets(raw_dataset=bq_dataset, bq_location=bq_location)
    training_data_uri = split_datasets_op.outputs["dataset_uri"]

    # Customer-group clustering model: created, exported and uploaded only.
    _create_export_upload_bqml_model(
        project=project,
        region=region,
        bq_location=bq_location,
        query=_query_create_customergroup_model(project, bq_dataset, training_data_uri),
        export_path=f"{bqml_model_export_location}/customergroup",
        model_display_name=DISPLAY_NAME + "_bqml_customergroup",
        serving_container_image_uri=bqml_serving_container_image_uri,
    )

    # Product predictor: created, exported and uploaded, then deployed below.
    # NOTE(review): BigQuery ML may export boosted-tree models in XGBoost format,
    # which a TensorFlow serving image might not load - confirm the container.
    productpredictor_upload_op = _create_export_upload_bqml_model(
        project=project,
        region=region,
        bq_location=bq_location,
        query=_query_create_productpredictor_model(project, bq_dataset, training_data_uri),
        export_path=f"{bqml_model_export_location}/productpredictor",
        model_display_name=DISPLAY_NAME + "_bqml_productpredictor",
        serving_container_image_uri=bqml_serving_container_image_uri,
    )

    # Creates a Vertex AI endpoint for the product predictor once the model is uploaded.
    endpoint_create_op = vertex_pipeline_components.EndpointCreateOp(
        project=project,
        location=region,
        display_name=endpoint_display_name,
    )
    endpoint_create_op.after(productpredictor_upload_op)

    # Deploys the uploaded product predictor to the new endpoint (caching off so it always redeploys).
    vertex_pipeline_components.ModelDeployOp(
        endpoint=endpoint_create_op.outputs["endpoint"],
        model=productpredictor_upload_op.outputs["model"],
        deployed_model_display_name=DISPLAY_NAME + "_productpredictor",
        dedicated_resources_machine_type="n1-standard-2",
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=2,
        traffic_split={
            "0": 100
        },  # newly deployed model gets 100% of the traffic
    ).set_caching_options(False)
    

In [201]:
# Compile train_pipeline into the job spec file submitted below.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=train_pipeline,
    package_path=PIPELINE_JSON_PKG_PATH,
)

# Default project and region for the Vertex AI SDK calls that follow.
vertex.init(project=PROJECT_ID, location=REGION)

     CREATE OR REPLACE MODEL       `{{pipelineparam:op=;name=project}}.{{pipelineparam:op=;name=bq_dataset}}.CustomerGroupClassifier`       OPTIONS       ( MODEL_TYPE='KMEANS', num_clusters = 6         )               AS     SELECT       * EXCEPT(CustomerGroup),       CASE(split_col)         WHEN 'TRAIN' THEN TRUE       ELSE       FALSE     END       AS data_split     FROM       `{{pipelineparam:op=split-datasets;name=dataset_uri}}`;     
     CREATE OR REPLACE MODEL       `{{pipelineparam:op=;name=project}}.{{pipelineparam:op=;name=bq_dataset}}.product-predictor`       OPTIONS       ( MODEL_TYPE='BOOSTED_TREE_CLASSIFIER',        AUTO_CLASS_WEIGHTS = TRUE,        BOOSTER_TYPE = 'GBTREE',        DATA_SPLIT_METHOD = 'AUTO_SPLIT',        NUM_PARALLEL_TREE = 1,        MAX_ITERATIONS = 50,        TREE_METHOD = 'AUTO',        EARLY_STOP = FALSE,        SUBSAMPLE = 0.85,        INPUT_LABEL_COLS = ['product_type']         )               AS     SELECT       *,       CASE(split_col)         WHE

In [202]:
# Fresh run timestamp (YYYYMMDDHHMMSS) so the next submission gets a unique job ID.
from datetime import datetime

TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"
print(TIMESTAMP)

20221010220652


In [203]:
# Submit the compiled pipeline to Vertex AI Pipelines.
pipeline_job = vertex.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path=PIPELINE_JSON_PKG_PATH,
    pipeline_root=PIPELINE_ROOT,
    # Job IDs must be unique, so refresh TIMESTAMP before resubmitting.
    job_id="football-pipeline-{0}".format(TIMESTAMP),
    parameter_values=pipeline_params,
    enable_caching=True,
)

# Run the job as SERVICE_ACCOUNT, the account that was granted access to the
# bucket earlier in this notebook. Without this argument that setup is unused.
# submit() does not wait for the run to finish and returns None.
response = pipeline_job.submit(service_account=SERVICE_ACCOUNT)

Creating PipelineJob
PipelineJob created. Resource name: projects/106131389347/locations/us-central1/pipelineJobs/football-pipeline-20221010220652
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/106131389347/locations/us-central1/pipelineJobs/football-pipeline-20221010220652')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/football-pipeline-20221010220652?project=106131389347


In [35]:
# NOTE(review): this duplicates the earlier compile/init cell; it recompiles
# train_pipeline into the same spec file and re-initialises the SDK.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=train_pipeline,
    package_path=PIPELINE_JSON_PKG_PATH,
)

vertex.init(project=PROJECT_ID, location=REGION)