In [None]:
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Texas Power Demand - AutoML Tabular Forecasting Model for Batch Prediction

## Overview


This sample demonstrates how to use the Vertex SDK to create a tabular forecasting model using a BigQuery Omni dataset, and then performa batch predictions using a Google Cloud [AutoML](https://cloud.google.com/vertex-ai/docs/start/automl-users) forecasting model.

**Note**: Jupyter runs lines prefixed with `!` as shell commands, and it interpolates Python variables prefixed with `$`.

In [1]:
PROJECT_ID = "fsi-select-demo"  # @param {type:"string"}

In [2]:
if PROJECT_ID == "" or PROJECT_ID is None or PROJECT_ID == "[your-project-id]":
    # Get your GCP project id from gcloud
    shell_output = ! gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID:", PROJECT_ID)

In [3]:
! gcloud config set project $PROJECT_ID

Updated property [core/project].


#### Region

You may also change the `REGION` variable, which is used for operations
throughout the rest of this notebook.  Below are regions supported for Vertex AI. We recommend that you choose the region closest to you.

- Americas: `us-central1`
- Europe: `europe-west4`
- Asia Pacific: `asia-east1`

You may not use a multi-regional bucket for training with Vertex AI. Not all regions provide support for all Vertex AI services.

Learn more about [Vertex AI regions](https://cloud.google.com/vertex-ai/docs/general/locations)

In [4]:
REGION = "us-central1"  # @param {type: "string"}

#### Timestamp

To avoid name collisions between users on resources created, you create a timestamp for each instance session, and append the timestamp onto the name of resources you create in this tutorial.

In [5]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

### Create a Cloud Storage bucket

**The following steps are required, regardless of your notebook environment.**

When you initialize the Vertex SDK for Python, you specify a Cloud Storage staging bucket. The staging bucket is where all the data associated with your dataset and model resources are retained across sessions.

Set the name of your Cloud Storage bucket below. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.

In [6]:
BUCKET_NAME = "gs://fsi-select-demo-ml-artifacts/forecasting_v1"  # @param {type:"string"}

In [7]:
if BUCKET_NAME == "" or BUCKET_NAME is None or BUCKET_NAME == "gs://[your-bucket-name]":
    BUCKET_NAME = "gs://" + PROJECT_ID + "aip-" + TIMESTAMP

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [8]:
# ! gsutil mb -l $REGION $BUCKET_NAME

Finally, validate access to your Cloud Storage bucket by examining its contents:

In [9]:
! gsutil ls -al $BUCKET_NAME

         0  2022-02-23T17:04:45Z  gs://fsi-select-demo-ml-artifacts/forecasting_v1/#1645635885626488  metageneration=1
TOTAL: 1 objects, 0 bytes (0 B)


### Set up variables

Next, set up some variables used throughout the tutorial.
### Import libraries and define constants

In [10]:
!pip list | grep aiplatform

google-cloud-aiplatform               1.8.1


In [11]:
from google.cloud import aiplatform as aiplatform

## Initialize Vertex SDK for Python

Initialize the Vertex SDK for Python for your project and corresponding bucket.

In [12]:
aiplatform.init(project=PROJECT_ID, staging_bucket=BUCKET_NAME)

# Tutorial

Now you are ready to start creating your own AutoML tabular forecasting model.

#### Location of BigQuery training data.

Now set the variable `TRAINING_DATASET_BQ_PATH` to the location of the BigQuery table. 

In [13]:
pwr_demand_TRAINING_DATASET_BQ_PATH = (
    "bq://fsi-select-demo:mysql_dataset_log1.ds-train-test_texas_power_demand"
)

### Create the Dataset

Next, create the `Dataset` resource using the `create` method for the `TimeSeriesDataset` class, which takes the following parameters:

- `display_name`: The human readable name for the `Dataset` resource.
- `gcs_source`: A list of one or more dataset index files to import the data items into the `Dataset` resource.
- `bq_source`: Alternatively, import data items from a BigQuery table into the `Dataset` resource.

This operation may take several minutes.

In [14]:
dataset = aiplatform.TimeSeriesDataset.create(
    display_name="train_power_demand_texas" + "_" + TIMESTAMP,
    bq_source=[pwr_demand_TRAINING_DATASET_BQ_PATH],
)

time_column = "date"
time_series_identifier_column = "state"
target_column = "demand_mgwhr"

print(dataset.resource_name)

INFO:google.cloud.aiplatform.datasets.dataset:Creating TimeSeriesDataset
INFO:google.cloud.aiplatform.datasets.dataset:Create TimeSeriesDataset backing LRO: projects/355426521391/locations/us-central1/datasets/4357289814085599232/operations/4792626668416008192
INFO:google.cloud.aiplatform.datasets.dataset:TimeSeriesDataset created. Resource name: projects/355426521391/locations/us-central1/datasets/4357289814085599232
INFO:google.cloud.aiplatform.datasets.dataset:To use this TimeSeriesDataset in another session:
INFO:google.cloud.aiplatform.datasets.dataset:ds = aiplatform.TimeSeriesDataset('projects/355426521391/locations/us-central1/datasets/4357289814085599232')
projects/355426521391/locations/us-central1/datasets/4357289814085599232


In [18]:
COLUMN_SPECS = {
    time_column: "timestamp",
    target_column: "numeric",
    "ds": "timestamp",
    "hr": "numeric",
}

### Create and run training job

To train an AutoML model, you perform two steps: 1) create a training job, and 2) run the job.

#### Create training job

An AutoML training job is created with the `AutoMLForecastingTrainingJob` class, with the following parameters:

- `display_name`: The human readable name for the `TrainingJob` resource.
- `column_transformations`: (Optional): Transformations to apply to the input columns
- `optimization_objective`: The optimization objective to minimize or maximize.
    - `minimize-rmse`
    - `minimize-mae`
    - `minimize-rmsle`

The instantiated object is the job for the training pipeline.

In [19]:
MODEL_DISPLAY_NAME = f"texas-power-demand-forecast-model_{TIMESTAMP}"

training_job = aiplatform.AutoMLForecastingTrainingJob(
    display_name=MODEL_DISPLAY_NAME,
    optimization_objective="minimize-rmse",
    column_specs=COLUMN_SPECS,
)

#### Run the training pipeline

Next, you start the training job by invoking the method `run`, with the following parameters:

- `dataset`: The `Dataset` resource to train the model.
- `model_display_name`: The human readable name for the trained model.
- `training_fraction_split`: The percentage of the dataset to use for training.
- `test_fraction_split`: The percentage of the dataset to use for test (holdout data).
- `target_column`: The name of the column to train as the label.
- `budget_milli_node_hours`: (optional) Maximum training time specified in unit of millihours (1000 = hour).
- `time_column`:
- `time_series_identifier_column`:

The `run` method when completed returns the `Model` resource.

The execution of the training pipeline will take up to one hour.

In [20]:
model = training_job.run(
    dataset=dataset,
    target_column=target_column,
    time_column=time_column,
    time_series_identifier_column=time_series_identifier_column,
    available_at_forecast_columns=[time_column],
    unavailable_at_forecast_columns=[target_column],
    time_series_attribute_columns=["ds", "hr"],
    forecast_horizon=24,
    context_window=24,
    data_granularity_unit="hour",
    data_granularity_count=1,
    weight_column=None,
    budget_milli_node_hours=1000,
    model_display_name=MODEL_DISPLAY_NAME,
    predefined_split_column_name=None,
)

INFO:google.cloud.aiplatform.training_jobs:No dataset split provided. The service will use a default split.
INFO:google.cloud.aiplatform.training_jobs:View Training:
https://console.cloud.google.com/ai/platform/locations/us-central1/training/6916877017245810688?project=355426521391
INFO:google.cloud.aiplatform.training_jobs:AutoMLForecastingTrainingJob projects/355426521391/locations/us-central1/trainingPipelines/6916877017245810688 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.training_jobs:AutoMLForecastingTrainingJob projects/355426521391/locations/us-central1/trainingPipelines/6916877017245810688 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.training_jobs:AutoMLForecastingTrainingJob projects/355426521391/locations/us-central1/trainingPipelines/6916877017245810688 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.training_jobs:AutoMLForecastingTrainingJob projects/355426521391/locat

## Review model evaluation scores
After your model has finished training, you can review the evaluation scores for it.

First, you need to get a reference to the new model. As with datasets, you can either use the reference to the model variable you created when you deployed the model or you can list all of the models in your project.

In [21]:
# Get model resource ID
models = aiplatform.Model.list(filter=f"display_name={MODEL_DISPLAY_NAME}")
model = models[0]

# Get a reference to the Model Service client
client_options = aiplatform.initializer.global_config.get_client_options()
model_service_client = aiplatform.gapic.ModelServiceClient(
    client_options=client_options
)

model_evaluations = model_service_client.list_model_evaluations(
    parent=model.resource_name
)
model_evaluation = list(model_evaluations)[0]
print(model_evaluation)

name: "projects/355426521391/locations/us-central1/models/7573334937051332608/evaluations/4729850347024198656"
metrics_schema_uri: "gs://google-cloud-aiplatform/schema/modelevaluation/forecasting_metrics_1.0.0.yaml"
metrics {
  struct_value {
    fields {
      key: "meanAbsoluteError"
      value {
        number_value: 4283.304
      }
    }
    fields {
      key: "meanAbsolutePercentageError"
      value {
        number_value: 9.718367
      }
    }
    fields {
      key: "rSquared"
      value {
        number_value: 0.64353895
      }
    }
    fields {
      key: "rootMeanSquaredError"
      value {
        number_value: 5707.3203
      }
    }
    fields {
      key: "rootMeanSquaredLogError"
      value {
        number_value: 0.13297832
      }
    }
  }
}
create_time {
  seconds: 1646624219
  nanos: 285172000
}



## Send a batch prediction request

Send a batch prediction to your deployed model.

### Make the batch prediction request

Now that your Model resource is trained, you can make a batch prediction by invoking the batch_predict() method, with the following parameters:

- `job_display_name`: The human readable name for the batch prediction job.
- `gcs_source`: A list of one or more batch request input files.
- `gcs_destination_prefix`: The Cloud Storage location for storing the batch prediction resuls.
- `instances_format`: The format for the input instances, either 'csv' or 'jsonl'. Defaults to 'jsonl'.
- `predictions_format`: The format for the output predictions, either 'csv' or 'jsonl'. Defaults to 'jsonl'.
- `sync`: If set to True, the call will block while waiting for the asynchronous batch job to complete.

In [23]:
print(PROJECT_ID)

fsi-select-demo


In [26]:
import os

from google.cloud import bigquery

# bq_dataset_name_prefix = "vertex_forecasting_predictions"

batch_predict_bq_output_dataset_name = f"texas_power_demand_predictions_{TIMESTAMP}"
batch_predict_bq_output_dataset_path = "{}.{}".format(
    PROJECT_ID, batch_predict_bq_output_dataset_name
)

batch_predict_bq_output_uri_prefix = "bq://{}.{}".format(
    PROJECT_ID, batch_predict_bq_output_dataset_name
)

# Must be the same region as batch_predict_bq_input_uri
client = bigquery.Client(project=PROJECT_ID)
bq_dataset = bigquery.Dataset(batch_predict_bq_output_dataset_path)
dataset_region = "us-central1"  # @param {type : "string"}
bq_dataset.location = dataset_region
bq_dataset = client.create_dataset(bq_dataset)
print(
    "Created bigquery dataset {} in {}".format(
        batch_predict_bq_output_dataset_path, dataset_region
    )
)

Created bigquery dataset fsi-select-demo.texas_power_demand_predictions_20220307020547 in us-central1


In [33]:
batch_predict_bq_output_uri_prefix = "bq://{}.{}".format(
    PROJECT_ID, batch_predict_bq_output_dataset_name
)

In [34]:
PREDICTION_DATASET_BQ_PATH = (
    f"bq://fsi-select-demo:mysql_dataset_log1.ds-validation_texas_power_demand"
)

batch_prediction_job = model.batch_predict(
    job_display_name=f"texas_power_demand_predictions_{TIMESTAMP}",
    bigquery_source=PREDICTION_DATASET_BQ_PATH,
    instances_format="bigquery",
    bigquery_destination_prefix=batch_predict_bq_output_uri_prefix,
    predictions_format="bigquery",
    sync=False,
)

print(batch_prediction_job)

INFO:google.cloud.aiplatform.jobs:Creating BatchPredictionJob
<google.cloud.aiplatform.jobs.BatchPredictionJob object at 0x7fc6a1df8c50> is waiting for upstream dependencies to complete.
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob created. Resource name: projects/355426521391/locations/us-central1/batchPredictionJobs/3550999245739786240
INFO:google.cloud.aiplatform.jobs:To use this BatchPredictionJob in another session:
INFO:google.cloud.aiplatform.jobs:bpj = aiplatform.BatchPredictionJob('projects/355426521391/locations/us-central1/batchPredictionJobs/3550999245739786240')
INFO:google.cloud.aiplatform.jobs:View Batch Prediction Job:
https://console.cloud.google.com/ai/platform/locations/us-central1/batch-predictions/3550999245739786240?project=355426521391
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob projects/355426521391/locations/us-central1/batchPredictionJobs/3550999245739786240 current state:
JobState.JOB_STATE_PENDING


### Wait for completion of batch prediction job

Next, wait for the batch job to complete. Alternatively, you can set the parameter `sync` to `True` in the `batch_predict()` method to block until the batch prediction job is completed.

In [35]:
batch_prediction_job.wait()

INFO:google.cloud.aiplatform.jobs:BatchPredictionJob projects/355426521391/locations/us-central1/batchPredictionJobs/3550999245739786240 current state:
JobState.JOB_STATE_RUNNING
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob projects/355426521391/locations/us-central1/batchPredictionJobs/3550999245739786240 current state:
JobState.JOB_STATE_RUNNING
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob projects/355426521391/locations/us-central1/batchPredictionJobs/3550999245739786240 current state:
JobState.JOB_STATE_RUNNING
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob projects/355426521391/locations/us-central1/batchPredictionJobs/3550999245739786240 current state:
JobState.JOB_STATE_RUNNING
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob projects/355426521391/locations/us-central1/batchPredictionJobs/3550999245739786240 current state:
JobState.JOB_STATE_SUCCEEDED
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob run completed. Resource name: projects/35542652139

### Get the predictions

Next, get the results from the completed batch prediction job.

The results are written to the Cloud Storage output bucket you specified in the batch prediction request. You call the method iter_outputs() to get a list of each Cloud Storage file generated with the results. Each file contains one or more prediction requests in a CSV format:

- CSV header + predicted_label
- CSV row + prediction, per prediction request

In [36]:
import tensorflow as tf

bp_iter_outputs = batch_prediction_job.iter_outputs()

prediction_results = list()
for blob in bp_iter_outputs:
    if blob.name.split("/")[-1].startswith("prediction"):
        prediction_results.append(blob.name)

tags = list()
for prediction_result in prediction_results:
    gfile_name = f"gs://{bp_iter_outputs.bucket.name}/{prediction_result}"
    with tf.io.gfile.GFile(name=gfile_name, mode="r") as gfile:
        for line in gfile.readlines():
            print(line)

### Visualize the forecasts

Lastly, follow the given link to visualize the generated forecasts in [Data Studio](https://support.google.com/datastudio/answer/6283323?hl=en).
The code block included in this section dynamically generates a Data Studio link that specifies the template, the location of the forecasts, and the query to generate the chart. The data is populated from the forecasts generated earlier.

You can inspect the used template at https://datastudio.google.com/c/u/0/reporting/067f70d2-8cd6-4a4c-a099-292acd1053e8. This was created by Google specifically to view forecasting predictions.

In [37]:
import urllib

tables = client.list_tables(batch_predict_bq_output_dataset_path)

prediction_table_id = ""
for table in tables:
    if (
        table.table_id.startswith("predictions_")
        and table.table_id > prediction_table_id
    ):
        prediction_table_id = table.table_id
batch_predict_bq_output_uri = "{}.{}".format(
    batch_predict_bq_output_dataset_path, prediction_table_id
)


def _sanitize_bq_uri(bq_uri):
    if bq_uri.startswith("bq://"):
        bq_uri = bq_uri[5:]
    return bq_uri.replace(":", ".")


def get_data_studio_link(
    batch_prediction_bq_input_uri,
    batch_prediction_bq_output_uri,
    time_column,
    time_series_identifier_column,
    target_column,
):
    batch_prediction_bq_input_uri = _sanitize_bq_uri(batch_prediction_bq_input_uri)
    batch_prediction_bq_output_uri = _sanitize_bq_uri(batch_prediction_bq_output_uri)
    base_url = "https://datastudio.google.com/c/u/0/reporting"
    query = (
        "SELECT \\n"
        " CAST(input.{} as DATETIME) timestamp_col,\\n"
        " CAST(input.{} as STRING) time_series_identifier_col,\\n"
        " CAST(input.{} as NUMERIC) historical_values,\\n"
        " CAST(predicted_{}.value as NUMERIC) predicted_values,\\n"
        " * \\n"
        "FROM `{}` input\\n"
        "LEFT JOIN `{}` output\\n"
        "ON\\n"
        "CAST(input.{} as DATETIME) = CAST(output.{} as DATETIME)\\n"
        "AND CAST(input.{} as STRING) = CAST(output.{} as STRING)"
    )
    query = query.format(
        time_column,
        time_series_identifier_column,
        target_column,
        target_column,
        batch_prediction_bq_input_uri,
        batch_prediction_bq_output_uri,
        time_column,
        time_column,
        time_series_identifier_column,
        time_series_identifier_column,
    )
    params = {
        "templateId": "067f70d2-8cd6-4a4c-a099-292acd1053e8",
        "ds0.connector": "BIG_QUERY",
        "ds0.projectId": PROJECT_ID,
        "ds0.billingProjectId": PROJECT_ID,
        "ds0.type": "CUSTOM_QUERY",
        "ds0.sql": query,
    }
    params_str_parts = []
    for k, v in params.items():
        params_str_parts.append('"{}":"{}"'.format(k, v))
    params_str = "".join(["{", ",".join(params_str_parts), "}"])
    return "{}?{}".format(base_url, urllib.parse.urlencode({"params": params_str}))


print(
    get_data_studio_link(
        PREDICTION_DATASET_BQ_PATH,
        batch_predict_bq_output_uri,
        time_column,
        time_series_identifier_column,
        target_column,
    )
)

https://datastudio.google.com/c/u/0/reporting?params=%7B%22templateId%22%3A%22067f70d2-8cd6-4a4c-a099-292acd1053e8%22%2C%22ds0.connector%22%3A%22BIG_QUERY%22%2C%22ds0.projectId%22%3A%22fsi-select-demo%22%2C%22ds0.billingProjectId%22%3A%22fsi-select-demo%22%2C%22ds0.type%22%3A%22CUSTOM_QUERY%22%2C%22ds0.sql%22%3A%22SELECT+%5Cn+CAST%28input.date+as+DATETIME%29+timestamp_col%2C%5Cn+CAST%28input.state+as+STRING%29+time_series_identifier_col%2C%5Cn+CAST%28input.demand_mgwhr+as+NUMERIC%29+historical_values%2C%5Cn+CAST%28predicted_demand_mgwhr.value+as+NUMERIC%29+predicted_values%2C%5Cn+%2A+%5CnFROM+%60fsi-select-demo.mysql_dataset_log1.ds-validation_texas_power_demand%60+input%5CnLEFT+JOIN+%60fsi-select-demo.texas_power_demand_predictions_20220307020547.predictions_2022_03_06T20_54_29_921Z%60+output%5CnON%5CnCAST%28input.date+as+DATETIME%29+%3D+CAST%28output.date+as+DATETIME%29%5CnAND+CAST%28input.state+as+STRING%29+%3D+CAST%28output.state+as+STRING%29%22%7D


# Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources:

- Dataset
- AutoML Training Job
- Model
- Batch Prediction Job
- Cloud Storage Bucket

In [None]:
'''
# Delete dataset
dataset.delete()

# Training job
training_job.delete()

# Delete model
model.delete()

# Delete batch prediction job
batch_prediction_job.delete()

if os.getenv("IS_TESTING"):
    ! gsutil rm -r $BUCKET_NAME
'''