# Vertex AI Auto ML
Tabular data training and batch prediction using Vertex SDK.

#### Model overview
Estimate the unit sales of Walmart retail goods

### Setting up the environment variables

#### Project ID

In [1]:
shell_output = ! gcloud config list --format 'value(core.project)' 2>/dev/null
PROJECT_ID = shell_output[0]
! gcloud config set project $PROJECT_ID
print(f'Project ID is {PROJECT_ID}')

Updated property [core/project].
Project ID is coe-ie-dsg-training-lab


#### Region 
Region must be the same for the data source and model

In [2]:
REGION = "us-central1"

#### Timestamp

To avoid name collisions between projects on resources created, we create a timestamp and append the timestamp onto the name of resources.

In [3]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
TIMESTAMP = str(20210929111546)

#### Cloud Storage bucket

In [4]:
BUCKET_NAME = "gs://vertex-walmart"

Investigate the bucket

In [5]:
! gsutil ls -al $BUCKET_NAME

                                 gs://vertex-walmart/data/
                                 gs://vertex-walmart/predictions/


### Modeling with Vertex SDK

#### Initialize the Vertex SDK for Python

In [6]:
from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, staging_bucket=BUCKET_NAME)

#### Create dataset
Create Vertex AI dataset using BQ source table

In [7]:
BQ_TABLE = 'coe-ie-dsg-training-lab.walmart_raw.sales_train_data'

Investigate the datasource using BQ API

In [8]:
from google.cloud import bigquery

client = bigquery.Client()

query = (f"""
    SELECT * FROM `{BQ_TABLE}` LIMIT 5;
""")

query_job = client.query(query)

query_job.result().to_dataframe()

Unnamed: 0,date,store_id,item_id,wm_yr_wk,dept_id,cat_id,state_id,demand,wday,month,year,event_name_1,event_type_1,event_name_2,event_type_2,snap_CA,snap_TX,snap_WI,sell_price
0,2015-01-02,TX_3,FOODS_1_170,11448,FOODS_1,FOODS,TX,19,7,1,2015,no_event,no_event,no_event,no_event,1,0,1,6.1
1,2015-01-02,TX_3,FOODS_1_129,11448,FOODS_1,FOODS,TX,16,7,1,2015,no_event,no_event,no_event,no_event,1,0,1,6.02
2,2015-01-02,TX_3,FOODS_1_113,11448,FOODS_1,FOODS,TX,14,7,1,2015,no_event,no_event,no_event,no_event,1,0,1,0.98
3,2015-01-02,TX_3,FOODS_1_162,11448,FOODS_1,FOODS,TX,13,7,1,2015,no_event,no_event,no_event,no_event,1,0,1,1.96
4,2015-01-02,TX_3,FOODS_1_159,11448,FOODS_1,FOODS,TX,13,7,1,2015,no_event,no_event,no_event,no_event,1,0,1,5.3


Create the `Dataset` resource using the `create` method for the `TabularDataset` class, which takes the following parameters:

- `display_name`: The human readable name for the `Dataset` resource.
- `bq_source`: Import data items from a BigQuery table into the `Dataset` resource.
- `gcs_source`: Alternatively, a list of one or more dataset index files to import the data items into the `Dataset` resource.

In [9]:
dataset = aiplatform.TabularDataset.create(
    display_name="sales_cropped" + "_" + TIMESTAMP, 
    bq_source=[f'bq://{BQ_TABLE}']
)

print(f'Created dataset is {dataset.resource_name}')

INFO:google.cloud.aiplatform.datasets.dataset:Creating TabularDataset
INFO:google.cloud.aiplatform.datasets.dataset:Create TabularDataset backing LRO: projects/994212091105/locations/us-central1/datasets/255464929643986944/operations/6773675248454729728
INFO:google.cloud.aiplatform.datasets.dataset:TabularDataset created. Resource name: projects/994212091105/locations/us-central1/datasets/255464929643986944
INFO:google.cloud.aiplatform.datasets.dataset:To use this TabularDataset in another session:
INFO:google.cloud.aiplatform.datasets.dataset:ds = aiplatform.TabularDataset('projects/994212091105/locations/us-central1/datasets/255464929643986944')
Created dataset is projects/994212091105/locations/us-central1/datasets/255464929643986944


#### Create and run training pipeline

##### Create training pipeline

An AutoML training pipeline is created with the `AutoMLTabularTrainingJob` class, with the following parameters:

- `display_name`: The human readable name for the `TrainingJob` resource.
- `optimization_prediction_type`: The type task to train the model for.
  - `classification`: A tabuar classification model.
  - `regression`: A tabular regression model.
- `column_transformations`: (Optional): Transformations to apply to the input columns
- `optimization_objective`: The optimization objective to minimize or maximize.
  - binary classification:
    - `minimize-log-loss`
    - `maximize-au-roc`
    - `maximize-au-prc`
    - `maximize-precision-at-recall`
    - `maximize-recall-at-precision`
  - multi-class classification:
    - `minimize-log-loss`
  - regression:
    - `minimize-rmse`
    - `minimize-mae`
    - `minimize-rmsle`

The instantiated object is the DAG (directed acyclic graph) for the training pipeline.

In [10]:
dag = aiplatform.AutoMLTabularTrainingJob(
    display_name="sales_cropped" + TIMESTAMP,
    optimization_prediction_type="regression",
    optimization_objective="minimize-mae",
)

print(dag)

<google.cloud.aiplatform.training_jobs.AutoMLTabularTrainingJob object at 0x7fd23afd5a10>


#### Run the training pipeline

Next, you run the DAG to start the training job by invoking the method `run`, with the following parameters:

- `dataset`: The `Dataset` resource to train the model.
- `model_display_name`: The human readable name for the trained model.
- `training_fraction_split`: The percentage of the dataset to use for training.
- `test_fraction_split`: The percentage of the dataset to use for test (holdout data).
- `validation_fraction_split`: The percentage of the dataset to use for validation.
- `target_column`: The name of the column to train as the label.
- `budget_milli_node_hours`: (optional) Maximum training time specified in unit of millihours (minimum 1000 = hour). When forming costs, it is necessary to take into account that this is only training time and the following steps will be added to it: data preprocessing, evaluation, post model processing
- `disable_early_stopping`: If `True`, training maybe completed before using the entire budget if the service believes it cannot further improve on the model objective measurements.

The `run` method when completed returns the `Model` resource.
Running this code in our case takes about two hours, so you will see the finished result.

In [None]:
model = dag.run(
    dataset=dataset,
    model_display_name="sales_cropped_reg_" + TIMESTAMP,
    budget_milli_node_hours=1000,
    disable_early_stopping=False,
    target_column='demand',
    timestamp_split_column_name='date'
)

INFO:google.cloud.aiplatform.training_jobs:No column transformations provided, so now retrieving columns from dataset in order to set default column transformations.
INFO:google.cloud.aiplatform.training_jobs:The column transformation of type 'auto' was set for the following columns: ['snap_TX', 'date', 'year', 'state_id', 'month', 'event_type_1', 'sell_price', 'event_name_1', 'item_id', 'cat_id', 'snap_WI', 'event_name_2', 'wday', 'snap_CA', 'wm_yr_wk', 'dept_id', 'store_id', 'event_type_2'].
INFO:google.cloud.aiplatform.training_jobs:View Training:
https://console.cloud.google.com/ai/platform/locations/us-central1/training/2114697310771347456?project=994212091105
INFO:google.cloud.aiplatform.training_jobs:AutoMLTabularTrainingJob projects/994212091105/locations/us-central1/trainingPipelines/2114697310771347456 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.training_jobs:AutoMLTabularTrainingJob projects/994212091105/locations/us-central1/trainingPipe

#### Review model evaluation scores
First, we need to get a reference to the new model and then get all available metrics

In [8]:
# Get model resource ID
models = aiplatform.Model.list(filter="display_name=sales_cropped_reg_" + TIMESTAMP)

# Get a reference to the Model Service client
client_options = {"api_endpoint": f"{REGION}-aiplatform.googleapis.com"}
model_service_client = aiplatform.gapic.ModelServiceClient(client_options=client_options)

model_evaluations = model_service_client.list_model_evaluations(
    parent=models[0].resource_name
)

model_evaluation = list(model_evaluations)[0]
print(model_evaluation)

name: "projects/994212091105/locations/us-central1/models/3553472047390654464/evaluations/3883904147705491740"
metrics_schema_uri: "gs://google-cloud-aiplatform/schema/modelevaluation/regression_metrics_1.0.0.yaml"
metrics {
  struct_value {
    fields {
      key: "meanAbsoluteError"
      value {
        number_value: 1.0616668
      }
    }
    fields {
      key: "meanAbsolutePercentageError"
      value {
        number_value: 80915496.0
      }
    }
    fields {
      key: "rSquared"
      value {
        number_value: 0.6423195
      }
    }
    fields {
      key: "rootMeanSquaredError"
      value {
        number_value: 2.8614118
      }
    }
    fields {
      key: "rootMeanSquaredLogError"
      value {
        number_value: 0.5733611
      }
    }
  }
}
create_time {
  seconds: 1632922681
  nanos: 663497000
}
slice_dimensions: "annotationSpec"
model_explanation {
  mean_attributions {
    feature_attributions {
      struct_value {
        fields {
          key: "cat_id

### Batch predictions

Send a batch prediction to the deployed model

#### Create input file from BQ table

In [10]:
from google.cloud import bigquery

client = bigquery.Client()

project = "coe-ie-dsg-training-lab"
dataset_id = "walmart_raw"
table_id = "sales_predict"

destination_uri = ("gs://vertex-walmart/data/sales_predict.csv")
dataset_ref = bigquery.DatasetReference(project, dataset_id)
table_ref = dataset_ref.table(table_id)

extract_job = client.extract_table(
    table_ref,
    destination_uri,
    # Location must match that of the source table.
    location="us",
)  # API request
extract_job.result()  # Waits for job to complete.

print(
    "Exported {}:{}.{} to {}".format(project, dataset_id, table_id, destination_uri)
)

Exported coe-ie-dsg-training-lab:walmart_raw.sales_predict to gs://vertex-walmart/data/sales_predict.csv


In [11]:
from google.cloud import aiplatform_v1beta1
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value

def create_batch_prediction_job_bigquery_sample(
    project: str,
    display_name: str,
    model_name: str,
    instances_format: str,
    bigquery_source_input_uri: str,
    predictions_format: str,
    gcs_destination: str,
    location: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
):
    # The AI Platform services require regional API endpoints.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform_v1beta1.JobServiceClient(client_options=client_options)
    model_parameters_dict = {}
    model_parameters = json_format.ParseDict(model_parameters_dict, Value())

    batch_prediction_job = {
        "display_name": display_name,
        # Format: 'projects/{project}/locations/{location}/models/{model_id}'
        "model": model_name,
        "model_parameters": model_parameters,
        "input_config": {
            "instances_format": instances_format,
            "bigquery_source": {"input_uri": bigquery_source_input_uri},
        },
        "output_config": {
            "predictions_format": predictions_format,
            "gcs_destination": {"output_uri_prefix": gcs_destination},
        },
        # optional
        "generate_explanation": True,
    }
    parent = f"projects/{project}/locations/{location}"
    response = client.create_batch_prediction_job(
        parent=parent, batch_prediction_job=batch_prediction_job
    )
    print("response:", response)

In [12]:
batch_predict_job = create_batch_prediction_job_bigquery_sample(
    project = PROJECT_ID,
    display_name = "sales_cropped_reg_predict" + TIMESTAMP,
    model_name = models[0].resource_name,
    instances_format = 'bigquery',
    bigquery_source_input_uri = 'bq://coe-ie-dsg-training-lab.walmart_raw.sales_forecast',
    predictions_format = 'csv',
    gcs_destination = 'gs://vertex-walmart/predictions',
    location = REGION,
    api_endpoint = f"{REGION}-aiplatform.googleapis.com",
)

response: name: "projects/994212091105/locations/us-central1/batchPredictionJobs/7411493422512472064"
display_name: "sales_cropped_reg_predict20210929111546"
model: "projects/994212091105/locations/us-central1/models/3553472047390654464"
input_config {
  instances_format: "bigquery"
  bigquery_source {
    input_uri: "bq://coe-ie-dsg-training-lab.walmart_raw.sales_forecast"
  }
}
model_parameters {
  struct_value {
  }
}
output_config {
  predictions_format: "csv"
  gcs_destination {
    output_uri_prefix: "gs://vertex-walmart/predictions"
  }
}
dedicated_resources {
  machine_spec {
    machine_type: "n1-highmem-8"
  }
  starting_replica_count: 20
  max_replica_count: 20
}
manual_batch_tuning_parameters {
  batch_size: 1
}
state: JOB_STATE_PENDING
create_time {
  seconds: 1633105083
  nanos: 172964000
}
update_time {
  seconds: 1633105083
  nanos: 172964000
}
generate_explanation: true
explanation_spec {
  parameters {
    sampled_shapley_attribution {
      path_count: 11
    }
  }
 

#### Read predictions from GS

In [14]:
import dask.dataframe as dd

df_forecast = dd \
    .read_csv('gs://vertex-walmart/predictions/prediction-sales_cropped_reg_20210929111546-2021_10_01T09_18_02_978Z/*results*.csv') \
    .compute()

# test sample
df_forecast = df_forecast \
    .loc[(df_forecast['store_id'] == 'CA_3') 
         & (df_forecast['item_id'].isin(['HOUSEHOLD_1_383', 'HOUSEHOLD_1_366', 'HOUSEHOLD_1_349'])), 
         ['date', 'item_id', 'predicted_demand']]

df_forecast.rename(columns={'predicted_demand': 'demand'}, inplace=True)

df_forecast['Type'] = 'prediction'

#### Read train data from BQ

In [15]:
from google.cloud import bigquery

client = bigquery.Client()

# Perform a query

query = (f"""
    SELECT * FROM `{BQ_TABLE}`;
""")

query_job = client.query(query)

df_train = query_job.result().to_dataframe()

df_train = df_train \
    .loc[(df_train['store_id'] == 'CA_3') 
         & (df_train['item_id'].isin(['HOUSEHOLD_1_383', 'HOUSEHOLD_1_366', 'HOUSEHOLD_1_349'])), 
         ['date', 'item_id', 'demand']]

df_train['Type'] = 'historical'

#### Visualize results

In [16]:
import pandas as pd
import altair as alt

df_plot = pd.concat([df_train, df_forecast])
del df_train, df_forecast

df_plot['date'] = pd.to_datetime(df_plot['date'])

In [17]:
alt.Chart(df_plot).mark_line().encode(
    x = alt.X('date:T', title = 'Date'),
    y = alt.Y('demand:Q', title = 'Demand'),
    color = alt.Color('item_id', title = 'Item ID'),
    strokeDash='Type:N'
).properties(
    width=700
)