## User-Defined Constants

Set these before each run.  

In [1]:
REGION="us-central1"
PROJECT_ID = "rax-datascience-dev"
DATASET_NAME = "inventory_forecasting"
RUN_DATE = "2022-07-01"
# Note: Use underscores instead of dashes for BQ compatibility
DISPLAY_NAME_PREFIX = "automl_inventory"
MAPE_THRESHOLD = 35

## Import Libraries

These are the libraries for pipeline creation.  *NOT* libraries for individual components.

In [2]:
from typing import NamedTuple
import os
import json
import time
from dateutil import parser

import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, Metrics, component)
from kfp.v2.google.client import AIPlatformClient

from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from google_cloud_pipeline_components import aiplatform as gcc_aip

## Calculated Constants

In [3]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

BUCKET_NAME="gs://" + PROJECT_ID + "-inventory-forecasting-bucket"
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
API_ENDPOINT = REGION + "-aiplatform.googleapis.com"

RUN_DATE_TIMESTAMP = parser.parse(RUN_DATE)
RUN_DATE_STRIPPED = RUN_DATE_TIMESTAMP.strftime("%Y%m%d")
RUN_DATE_DASHES = RUN_DATE_TIMESTAMP.strftime("%Y-%m-%d")

# Note: Use underscores instead of dashes for BQ compatibility
DISPLAY_NAME = DISPLAY_NAME_PREFIX + "_{}".format(RUN_DATE_STRIPPED)
DISPLAY_NAME_ONLINE = DISPLAY_NAME_PREFIX + "_online_{}".format(RUN_DATE_STRIPPED)
DISPLAY_NAME_OFFLINE = DISPLAY_NAME_PREFIX + "_offline_{}".format(RUN_DATE_STRIPPED)

THRESHOLDS_DICT = "{\"meanAbsolutePercentageError\": " + str(MAPE_THRESHOLD) + "}"

BQ_PROJ_DATASET = PROJECT_ID + "." + DATASET_NAME
BQ_PROJ_DATASET_URI = "bq://" + BQ_PROJ_DATASET 
BQ_SOURCE_FEATURES = BQ_PROJ_DATASET_URI + ".forecasting_model_features_vertex"
BQ_SOURCE_BATCH = BQ_PROJ_DATASET_URI + ".forecasting_model_batch_vertex"
BQ_SOURCE_EVALUATED_ONLINE = BQ_PROJ_DATASET_URI + ".evaluate_data_items_" + DISPLAY_NAME_ONLINE
BQ_SOURCE_EVALUATED_OFFLINE = BQ_PROJ_DATASET_URI + ".evaluate_data_items_" + DISPLAY_NAME_OFFLINE

QUERY_CREATE_FEATURES = "CALL `" + BQ_PROJ_DATASET + ".udsp_forecasting_master`(DATE('" + RUN_DATE_DASHES + "'))"
QUERY_COMBINE_RESULTS = "CALL `" + BQ_PROJ_DATASET + ".udsp_forecasting_combine_results`(DATE('" + RUN_DATE_DASHES + "'))"

print("\n\n")
print("USER-DEFINED CONSTANTS")
print("***************************")
print("REGION:", REGION)
print("PROJECT_ID:", PROJECT_ID)
print("DATASET_NAME:", DATASET_NAME)
print("RUN_DATE:", RUN_DATE)
print("DISPLAY_NAME_PREFIX:", DISPLAY_NAME_PREFIX)
print("MAPE_THRESHOLD:", MAPE_THRESHOLD)

print("\n\n")
print("CALCULATED CONSTANTS")
print("***************************")
print("PATH:", PATH)
print("BUCKET_NAME:", BUCKET_NAME)
print("PIPELINE_ROOT:", PIPELINE_ROOT)
print("API_ENDPOINT:", API_ENDPOINT)
print("RUN_DATE_TIMESTAMP:", RUN_DATE_TIMESTAMP)
print("RUN_DATE_STRIPPED:", RUN_DATE_STRIPPED)
print("RUN_DATE_DASHES:", RUN_DATE_DASHES)
print("DISPLAY_NAME:", DISPLAY_NAME)
print("DISPLAY_NAME_ONLINE:", DISPLAY_NAME_ONLINE)
print("DISPLAY_NAME_OFFLINE:", DISPLAY_NAME_OFFLINE)
print("THRESHOLDS_DICT:", THRESHOLDS_DICT)
print("BQ_PROJ_DATASET:", BQ_PROJ_DATASET)
print("BQ_PROJ_DATASET_URI:", BQ_PROJ_DATASET_URI)
print("BQ_SOURCE_FEATURES:", BQ_SOURCE_FEATURES)
print("BQ_SOURCE_BATCH:", BQ_SOURCE_BATCH)
print("BQ_SOURCE_EVALUATED_ONLINE:", BQ_SOURCE_EVALUATED_ONLINE)
print("BQ_SOURCE_EVALUATED_OFFLINE:", BQ_SOURCE_EVALUATED_OFFLINE)
print("QUERY_CREATE_FEATURES:", QUERY_CREATE_FEATURES)
print("QUERY_COMBINE_RESULTS:", QUERY_COMBINE_RESULTS)


env: PATH=/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin



USER-DEFINED CONSTANTS
***************************
REGION: us-central1
PROJECT_ID: rax-datascience-dev
DATASET_NAME: inventory_forecasting
RUN_DATE: 2022-07-01
DISPLAY_NAME_PREFIX: automl_inventory
MAPE_THRESHOLD: 35



CALCULATED CONSTANTS
***************************
PATH: /usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games
BUCKET_NAME: gs://rax-datascience-dev-inventory-forecasting-bucket
PIPELINE_ROOT: gs://rax-datascience-dev-inventory-forecasting-bucket/pipeline_root/
API_ENDPOINT: us-central1-aiplatform.googleapis.com
RUN_DATE_TIMESTAMP: 2022-07-01 00:00:00
RUN_DATE_STRIPPED: 20220701
RUN_DATE_DASHES: 2022-07-01
DISPLAY_NAME: automl_inventory_20220701
DISPLAY_NAME_ONLINE: automl_inventory_online_20220701
DISPLAY_NAME_OFFLINE: automl_inventory_offline_20220701
THRESHOLDS_DIC

## Custom Module to Execute SQL

In [4]:
@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="sql_query_component.yaml", # Optional: you can use this to load the component later
    packages_to_install=["google-cloud-bigquery"],
)
def run_bigquery_ddl(
    project: str, 
    location: str,
    query_string: str, 
) -> NamedTuple('Outputs', [('created_table', str), ('query', str)]):
    """
    Runs BigQuery query and returns a table/model name
    """
    import logging
    
    from google.cloud import bigquery
    from google.api_core.future import polling
    from google.cloud import bigquery
    from google.cloud.bigquery import retry as bq_retry
    
    print(query_string)
        
    bqclient = bigquery.Client(project=project, location=location)
    job = bqclient.query(query_string, retry=bq_retry.DEFAULT_RETRY)
    job._retry = polling.DEFAULT_RETRY
    
    while job.running():
        from time import sleep
        sleep(0.1)
        print('Running ...')
    
    log_msg = 'JOB completed in {}'.format(job.ended - job.started)
    print(log_msg)
    
    from collections import namedtuple
    result_tuple = namedtuple('DDLOutput', ['created_table', 'query'])
    return result_tuple("", query_string)

## Custom Module for Sending Email Notifications

In [5]:
@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="send_gmail_component.yaml", # Optional: you can use this to load the component later
    packages_to_install=["google-api-python-client"],
)
def send_email_operator(
    to_string: str,
    from_string: str,
    subject: str,
    message_text: str,
) -> NamedTuple('Outputs', [('message_id', str), ('raw', str)]):
    """
    Sends an email via the service accounts credentials using gmail api.
    """
    import logging
    import base64
    from urllib.error import HTTPError
    from email.mime.text import MIMEText
    
    import google.auth.transport.requests
    from googleapiclient.discovery import build
    import google.auth

    # Note - we must set the appropriate scope for the default service account
    SCOPES = ["https://www.googleapis.com/auth/gmail.send"]
    credentials, project = google.auth.default(
        scopes=SCOPES
    )
    
    auth_req = google.auth.transport.requests.Request()
    credentials.refresh(auth_req)

    
    def create_message(sender, to, subject, message_text):
        """Create a message for an email.

        Args:
            sender: Email address of the sender.
            to: Email address of the receiver.
            subject: The subject of the email message.
            message_text: The text of the email message.

        Returns:
            An object containing a base64url encoded email object.
        """
        message = MIMEText(message_text)
        message['to'] = to
        message['from'] = sender
        message['subject'] = subject
        message['raw'] = base64.urlsafe_b64encode(message.as_string().encode('utf-8'))
        return {'raw': message['raw'].decode('utf-8')}
    
    def send_message(service, user_id, message):
        """Send an email message.

        Args:
            service: Authorized Gmail API service instance.
            user_id: User's email address. The special value "me"
            can be used to indicate the authenticated user.
            message: Message to be sent.

        Returns:
            Sent Message.
        """
        try:
            message = (service.users().messages().send(userId=user_id, body=message)
                   .execute())
            print('Message Id: %s' % message['id'])
            return message
        except HTTPError as error:
            print('An error occurred: %s' % error)
    
    # Instantiate the service
    service = build('gmail', 'v1', credentials=credentials)
    message = create_message(from_string, to_string, subject, message_text)
    sent = send_message(service, from_string, message)
    
    from collections import namedtuple
    result_tuple = namedtuple('Output', ['message_id', 'raw'])
    return result_tuple(sent['id'], sent['raw'])
    

## Custom Module for Model Evaluation

In [6]:
@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="forecast_eval_component.yaml", # Optional: you can use this to load the component later
    packages_to_install=["google-cloud-aiplatform"],
)
def forecast_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Model],
    metrics: Output[Metrics],    
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    """This function renders evaluation metrics for an AutoML Tabular forecasting model.
    It retrieves the classification model evaluation generated by the AutoML Tabular forecasting
    training process, does some parsing, and uses that info to render the MAPE calculation
    for the model. It also uses given metrics threshold information and compares that to the
    evaluation results to determine whether the model is sufficiently accurate to deploy.
    """
    import json
    import logging

    from google.cloud import aiplatform
    from google.protobuf.json_format import MessageToDict
    
    # Fetch model eval info
    def get_eval_info(client, model_name):
        

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is 
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["meanAbsolutePercentageError"]:  # lower is better
                if metrics_dict[k] > v:  # if over threshold, don't deploy
                    logging.info(
                        "{} > {}; returning False".format(metrics_dict[k], v)
                    )
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list):
        mape = metrics_list[0]["meanAbsolutePercentageError"]
        logging.info("mape: %s", mape)

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            val_string = json.dumps(metrics_list[0][metric])
            metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aiplatform.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.uri.replace("aiplatform://v1/", "")
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aiplatform.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

## Pre-built component configuration

In [7]:
@kfp.dsl.pipeline(name="automl-inventory-forecasting",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source_features: str = BQ_SOURCE_FEATURES,
    bq_source_batch: str = BQ_SOURCE_BATCH,
    display_name: str = DISPLAY_NAME,
    display_name_online: str = DISPLAY_NAME_ONLINE,
    display_name_offline: str = DISPLAY_NAME_OFFLINE,
    project: str = PROJECT_ID,
    gcp_region: str = REGION,
    api_endpoint: str = API_ENDPOINT,
    thresholds_dict_str: str = THRESHOLDS_DICT,
    query_create_features: str = QUERY_CREATE_FEATURES,
    query_combine_results: str = QUERY_COMBINE_RESULTS,
    bq_source_eval_online: str = BQ_SOURCE_EVALUATED_ONLINE,
    bq_source_eval_offline: str = BQ_SOURCE_EVALUATED_OFFLINE,
    bq_source_batch_destination: str = BQ_PROJ_DATASET_URI,
):
    # Step 1 - Create the tables needed for the dataset from the raw data.  
    create_tables_op = run_bigquery_ddl(
        project, 
        gcp_region,
        query_create_features, 
    )
    
    # Step 2 - Create a Vertex AI Dataset
    dataset_create_op = gcc_aip.TimeSeriesDatasetCreateOp(
        project=project, 
        display_name=display_name, 
        bq_source=bq_source_features,
        labels={'foo':'{}'.format(create_tables_op.outputs["created_table"])},
    )

    # Step 3a - Train the Online Model
    online_training_op = gcc_aip.AutoMLForecastingTrainingJobRunOp(
        project=project,
        display_name=display_name_online,
        budget_milli_node_hours=8000,
        column_transformations=[
            {"timestamp": {"column_name": "TMK_TIMESTAMP"}},
            {"categorical": {"column_name": "item_status"}},
            {"numeric": {"column_name": "full_lead_time"}},
            {"numeric": {"column_name": "min"}},
            {"numeric": {"column_name": "max"}},
            {"numeric": {"column_name": "excess_threshold"}},
            {"categorical": {"column_name": "lmc"}},
            {"numeric": {"column_name": "dr"}},
            {"numeric": {"column_name": "device_count"}},
            {"numeric": {"column_name": "opp_count"}},
            {"numeric": {"column_name": "online"}},
            {"numeric": {"column_name": "offline"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="online",
        time_column="TMK_TIMESTAMP",
        time_series_identifier_column="SERIES_IDENTIFIER",
        unavailable_at_forecast_columns = [
            "item_status",
            "full_lead_time",
            "min",
            "max",
            "excess_threshold",
            "lmc",
            "dr",
            "device_count",
            "opp_count",
            "online",
            "offline",
        ],
        available_at_forecast_columns = [
            "TMK_TIMESTAMP"
        ],
        forecast_horizon = 6,
        data_granularity_unit = "month",
        data_granularity_count = 1,
        context_window = 12,
        export_evaluated_data_items = True,
        export_evaluated_data_items_bigquery_destination_uri = bq_source_eval_online,
        optimization_objective = "minimize-wape-mae",
    )
    
    # Step 4a - Evaluate the model metrics
    online_model_eval_op = forecast_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        online_training_op.outputs["model"],
    )

    # Step 5a - IF the model metrics pass
    with dsl.Condition(
        online_model_eval_op.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):
        # Step 6a - Batch Prediction for Online
        batch_op_online = gcc_aip.ModelBatchPredictOp(
            model=online_training_op.outputs["model"],
            project=project,
            location=gcp_region,
            job_display_name=display_name_online,
            bigquery_source=bq_source_batch,
            bigquery_destination_prefix=bq_source_batch_destination,
            predictions_format="bigquery",
            #gcs_destination_prefix=BUCKET_NAME,
            #predictions_format="csv",
        )
    
    # Step 3b - Train the Offline model
    offline_training_op = gcc_aip.AutoMLForecastingTrainingJobRunOp(
        project=project,
        display_name=display_name_offline,
        budget_milli_node_hours=8000,
        column_transformations=[
            {"timestamp": {"column_name": "TMK_TIMESTAMP"}},
            {"categorical": {"column_name": "item_status"}},
            {"numeric": {"column_name": "full_lead_time"}},
            {"numeric": {"column_name": "min"}},
            {"numeric": {"column_name": "max"}},
            {"numeric": {"column_name": "excess_threshold"}},
            {"categorical": {"column_name": "lmc"}},
            {"numeric": {"column_name": "dr"}},
            {"numeric": {"column_name": "device_count"}},
            {"numeric": {"column_name": "opp_count"}},
            {"numeric": {"column_name": "online"}},
            {"numeric": {"column_name": "offline"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="offline",
        time_column="TMK_TIMESTAMP",
        time_series_identifier_column="SERIES_IDENTIFIER",
        unavailable_at_forecast_columns = [
            "item_status",
            "full_lead_time",
            "min",
            "max",
            "excess_threshold",
            "lmc",
            "dr",
            "device_count",
            "opp_count",
            "online",
            "offline",
        ],
        available_at_forecast_columns = [
            "TMK_TIMESTAMP"
        ],
        forecast_horizon = 6,
        data_granularity_unit = "month",
        data_granularity_count = 1,
        context_window = 12,
        export_evaluated_data_items = True,
        export_evaluated_data_items_bigquery_destination_uri = bq_source_eval_offline,
        optimization_objective = "minimize-wape-mae",
    )
    
    # Step 4b - Evaluate the model
    offline_model_eval_op = forecast_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        offline_training_op.outputs["model"],
    )

    # Step 5b - IF the model eval passes
    with dsl.Condition(
        offline_model_eval_op.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):
        # Step 6b - Batch Prediction for Offline
        batch_op_offline = gcc_aip.ModelBatchPredictOp(
            model=offline_training_op.outputs["model"],
            project=project,
            location=gcp_region,
            job_display_name=display_name_offline,
            bigquery_source=bq_source_batch,
            bigquery_destination_prefix=bq_source_batch_destination,
            predictions_format="bigquery",
            #gcs_destination_prefix=BUCKET_NAME,
            #predictions_format="csv",
        )
        
    # Step 6 - Combine the results
    combine_results_op = run_bigquery_ddl(
        project, 
        gcp_region,
        query_combine_results, 
        
    ).after(batch_op_online).after(batch_op_offline)

## Compile and Run Pipeline

In [None]:
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_forecast_pipeline.json"
)

ml_pipeline_job = pipeline_jobs.PipelineJob(
    display_name="automl-inventory-forecasting",
    template_path="tab_forecast_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={
        "bq_source_features": BQ_SOURCE_FEATURES,
        "bq_source_batch": BQ_SOURCE_BATCH,
        "display_name": DISPLAY_NAME,
        "display_name_online": DISPLAY_NAME_ONLINE,
        "display_name_offline": DISPLAY_NAME_OFFLINE,
        "project": PROJECT_ID,
        "gcp_region": REGION,
        "api_endpoint": API_ENDPOINT,
        "thresholds_dict_str": THRESHOLDS_DICT,
        "query_create_features": QUERY_CREATE_FEATURES,
        "query_combine_results": QUERY_COMBINE_RESULTS,
        "bq_source_eval_online": BQ_SOURCE_EVALUATED_ONLINE,
        "bq_source_eval_offline": BQ_SOURCE_EVALUATED_OFFLINE,
        "bq_source_batch_destination": BQ_PROJ_DATASET_URI,
    },
    #enable_caching=True,
    enable_caching=False,
)

ml_pipeline_job.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/40046533665/locations/us-central1/pipelineJobs/automl-inventory-forecasting-20220809153909
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/40046533665/locations/us-central1/pipelineJobs/automl-inventory-forecasting-20220809153909')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/automl-inventory-forecasting-20220809153909?project=40046533665
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/40046533665/locations/us-central1/pipelineJobs/automl-inventory-forecasting-20220809153909 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/40046