In [None]:
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# **Lab 4:** Vertex AI Pipeline Development
This lab orchestrates the content of Labs 2 & 3 into a Vertex AI pipeline. In our pipeline we will use supported operators to:
* **BigqueryCreateModelJobOp**: To train a logistic regression model
* **BigqueryPredictModelJobOp**: To run our predictions and save the results to a BQ table
* **BigqueryExportModelJobOp**, **importer_node**, **ModelUploadOp**: To export our BQML model in TensorFlow format to GCS and upload it to the Vertex AI Model Registry
* **EndpointCreateOp**: To create our endpoint
* **ModelDeployOp**: To deploy our registered model to our endpoint

![overview](assets/pipeline.png)

In [None]:
! pip3 install --upgrade "kfp" \
                         "google-cloud-aiplatform" \
                         "google-cloud-storage" \
                         "google_cloud_pipeline_components" \
                         "google-cloud-bigquery" --user -q

In [None]:
# Lab configuration -- replace the <...> placeholders before running.
project_id = "<project-id>"
team_name = "<team name>"
location = "us"          # passed as `location` to the BigQuery pipeline steps
region = "us-central1"   # passed as `region` to the pipeline (endpoint creation)

# Names derived from the settings above.
dataset_name = f"datathon_ds_{team_name}"
bucket_name = f"gs://{project_id}_{dataset_name}"
pipeline_root_path = f"{bucket_name}/pipelines/"
model_artificat_path = f"{bucket_name}/vertex_pipelines_models"

# Echo the model export path as the cell output.
model_artificat_path

In [None]:
! gcloud config set project $project_id

We need to restart the kernel to make sure we reference the newly installed libraries
* **Kernel** -> **Restart Kernel and clear outputs**

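After the restart, re-run the configuration cell above (the one that defines `project_id`, `dataset_name`, `bucket_name`, etc.) — restarting the kernel clears all variables.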

In [None]:
from kfp import compiler, dsl
from kfp.dsl import importer_node
from kfp.dsl import HTML, Artifact, Condition, Input, Output, component
from google_cloud_pipeline_components.v1.bigquery import (
    BigqueryCreateModelJobOp, BigqueryEvaluateModelJobOp, BigqueryExportModelJobOp,
    BigqueryExplainForecastModelJobOp, BigqueryForecastModelJobOp,BigqueryMLConfusionMatrixJobOp, BigqueryPredictModelJobOp,
    BigqueryMLArimaEvaluateJobOp, BigqueryQueryJobOp)
from google.cloud import aiplatform
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp, ModelDeployOp)

In [None]:
# Define the workflow of the pipeline.
@dsl.pipeline(
    name="bqml-pipeline",
    pipeline_root=pipeline_root_path)
def pipeline(
    project: str,
    dataset: str,
    location: str,
    region: str
):
    """Train a BQML churn model, score the evaluation set, and deploy the model.

    Args:
        project: Google Cloud project ID passed to the BigQuery and Vertex AI steps.
        dataset: BigQuery dataset that holds cc_train_dataset and cc_eval_dataset
            and receives the trained model.
        location: Location passed to the BigQuery steps.
        region: Location passed to the Vertex AI model upload and endpoint steps.
    """
    # Learn more about operators used here -> https://cloud.google.com/vertex-ai/docs/pipelines/bigqueryml-component

    # Step 1: train the logistic regression model with BigQuery ML.
    model_create = BigqueryCreateModelJobOp(
        project=project,
        location=location,
        query=f"""
        CREATE OR REPLACE MODEL `{project}.{dataset}.vertex_pipeline_logistic_regression_baseline`
        OPTIONS(MODEL_TYPE='LOGISTIC_REG',
                INPUT_LABEL_COLS = ['churned'])
        AS
        SELECT * EXCEPT (user_pseudo_id)
        FROM `{project}.{dataset}.cc_train_dataset`
        """
        ).set_display_name("Train logistic regression baseline")

    # Step 2: run predictions on the evaluation set and store them in a BQ table.
    # NOTE: the destination table is built from the notebook-level `project_id` /
    # `dataset_name` values captured when the pipeline is compiled, not from the
    # `project` / `dataset` pipeline parameters.
    _ = BigqueryPredictModelJobOp(
        project=project,
        location=location,
        model=model_create.outputs["model"],
        query_statement=f'''SELECT * EXCEPT(user_pseudo_id, churned)
                            FROM
                            `{project}.{dataset}.cc_eval_dataset`
                            ''',
        job_configuration_query={
            "destinationTable": {
                "projectId": project_id,
                "datasetId": dataset_name,
                "tableId": "results_1", # change table for every new run
            }
        },
        ).set_display_name("Prediction on evaluation set").after(model_create)

    # Step 3: export the trained model to GCS (path comes from the notebook-level
    # `model_artificat_path` variable).
    bq_export = BigqueryExportModelJobOp(
        project=project,
        location=location,
        model=model_create.outputs["model"],
        model_destination_path=model_artificat_path,
    ).set_display_name("Export BQ model to GCS").after(model_create)

    # Step 4: wrap the exported files as an UnmanagedContainerModel artifact,
    # attaching the serving container image to use.
    import_unmanaged_model_task = importer_node.importer(
        artifact_uri=model_artificat_path,
        artifact_class=artifact_types.UnmanagedContainerModel,
        metadata={
            "containerSpec": {
                "imageUri": "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-12:latest",
            },
        },
    ).after(bq_export)

    # Step 5: register the model. `location=region` keeps the uploaded model in
    # the same region as the endpoint created below; without it the component
    # falls back to its own default location regardless of `region`.
    model_upload = ModelUploadOp(
        project=project,
        location=region,
        display_name="vertex_pipeline_model_logistic_regression",
        unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
    ).after(import_unmanaged_model_task)

    # Step 6: create the serving endpoint.
    endpoint = EndpointCreateOp(
        project=project,
        location=region,
        display_name="vertex_pipeline_deployment",
    ).after(model_upload)

    # Step 7: deploy the registered model to the endpoint on a single replica,
    # routing 100% of the traffic to it.
    _ = ModelDeployOp(
        model=model_upload.outputs["model"],
        endpoint=endpoint.outputs["endpoint"],
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
        dedicated_resources_machine_type='n1-standard-2',
        traffic_split={"0": 100},
    ).set_display_name("Deploy to endpoint").after(endpoint)

In [None]:
# Compile the pipeline function into a job spec file in the working directory.
compiler.Compiler().compile(
    package_path='churn_prediction_pipeline.json',
    pipeline_func=pipeline,
)

In [None]:
# Prepare the pipeline job.
# `project` and `location` are passed explicitly so the run is created in the
# project/region configured in this notebook rather than in whatever defaults
# the SDK resolves from the environment.
job = aiplatform.PipelineJob(
    display_name="bqml-vertex-pipeline1",
    template_path="churn_prediction_pipeline.json",
    pipeline_root=pipeline_root_path,
    enable_caching=True,
    project=project_id,
    location=region,
    parameter_values={
        'project': project_id,
        'dataset': dataset_name,
        'location': location,
        'region': region
    }
)

# Start the run on Vertex AI Pipelines.
job.submit()