In [1]:
# Copyright 2021 Google LLC
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This software is provided as-is, without warranty or representation for any use or purpose. 
# Your use of it is subject to your agreements with Google.


# <font color='gold'>Vertex end-to-end Demo</font>
---
## Agenda
  1. Environment setup
  1. Feature store
  1. Explainable AI Model
  1. Vertex Pipeline: custom/AutoML models training, evaluation, and deployment
  


---
---
## 1. Environment Setup
---
### 1.1 Set up your local development environment

**If you are using Colab or Google Cloud Notebooks**, your environment already meets
all the requirements to run this notebook. You can skip this step.

**Otherwise**, make sure your environment meets this notebook's requirements.
You need the following:

* The Google Cloud SDK
* Git
* Python 3
* virtualenv
* Jupyter notebook running in a virtual environment with Python 3

The Google Cloud guide to [Setting up a Python development
environment](https://cloud.google.com/python/setup) and the [Jupyter
installation guide](https://jupyter.org/install) provide detailed instructions
for meeting these requirements. The following steps provide a condensed set of
instructions:

1. [Install and initialize the Cloud SDK.](https://cloud.google.com/sdk/docs/)

1. [Install Python 3.](https://cloud.google.com/python/setup#installing_python)

1. [Install
   virtualenv](https://cloud.google.com/python/setup#installing_and_using_virtualenv)
   and create a virtual environment that uses Python 3. Activate the virtual environment.

1. To install Jupyter, run `pip install jupyter` on the
command-line in a terminal shell.

1. To launch Jupyter, run `jupyter notebook` on the command-line in a terminal shell.

1. Open this notebook in the Jupyter Notebook Dashboard.

---
### 1.2 Install additional packages


In [2]:
import sys

if "google.colab" in sys.modules:
    USER_FLAG = ""
else:
    USER_FLAG = "--user"

In [None]:
!pip3 install {USER_FLAG} google-cloud-aiplatform==1.0.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.6.6 google-cloud-pipeline-components==0.1.1 --upgrade
!pip3 install {USER_FLAG} tensorflow
!pip3 install {USER_FLAG} explainable-ai-sdk

---
### 1.3 Restart the kernel

After you install the additional packages, you need to restart the notebook kernel so it can find the packages.

In [14]:
# Automatically restart kernel after installs
import os

if not os.getenv("IS_TESTING"):
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Check the KFP SDK version. It should be >=1.6.

In [2]:
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

KFP SDK version: 1.6.6
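
The version check above can also be done programmatically. The following is a minimal sketch, assuming a plain `major.minor` comparison is enough; `meets_minimum` is a hypothetical helper, not part of the KFP SDK.

```python
import re


def meets_minimum(version, minimum=(1, 6)):
    """Return True if the leading components of `version` are >= `minimum`."""
    parts = []
    for piece in version.split(".")[: len(minimum)]:
        # Keep only the leading digits so a piece such as "6rc1" is read as 6.
        match = re.match(r"\d+", piece)
        parts.append(int(match.group()) if match else 0)
    return tuple(parts) >= tuple(minimum)


print(meets_minimum("1.6.6"))  # the version printed by the cell above
print(meets_minimum("1.4.0"))  # an older version, for comparison
```

In the notebook you could pass `kfp.__version__` to the helper and stop early if it returns `False`.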


---
### 1.4 Set up your Google Cloud project

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.

1. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project).

1. [Enable the Vertex AI, Cloud Storage, and Compute Engine APIs](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com,compute_component,storage-component.googleapis.com). 

1. Follow the "**Configuring your project**" instructions from the Vertex Pipelines documentation.

1. If you are running this notebook locally, you will need to install the [Cloud SDK](https://cloud.google.com/sdk).

1. Enter your project ID in the cell below. Then run the cell to make sure the
Cloud SDK uses the right project for all the commands in this notebook.

**Note**: Jupyter runs lines prefixed with `!` as shell commands, and it interpolates Python variables prefixed with `$` into these commands.

#### Set your project ID

**If you don't know your project ID**, you may be able to get your project ID using `gcloud`.

In [8]:
!gcloud config set project erwinh-ml-demos

Updated property [core/project].


In [9]:
import os

PROJECT_ID = "erwinh-ml-demos"

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Project ID:  erwinh-ml-demos


Otherwise, set your project ID here.

In [10]:
if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "erwinh-ml-demos"  # @param {type:"string"}

#### Timestamp

If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users, create a timestamp for each session and append it to the names of the resources you create in this tutorial.

In [19]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
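
As an illustration of how the timestamp keeps resource names distinct, here is a small sketch. `timestamped_name` is a hypothetical helper introduced only for this example; the notebook itself concatenates `TIMESTAMP` inline.

```python
from datetime import datetime


def timestamped_name(prefix, when=None):
    """Append a YYYYmmddHHMMSS suffix to `prefix`."""
    when = when or datetime.now()
    return f"{prefix}_{when.strftime('%Y%m%d%H%M%S')}"


# A fixed datetime is used here so the result is reproducible.
print(timestamped_name("telecom_churn_prediction", datetime(2021, 9, 27, 6, 55, 29)))
# telecom_churn_prediction_20210927065529
```

Two users who start their sessions at least one second apart get different names from the same prefix.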

---
### 1.5 Authenticate your Google Cloud account

**If you are using Google Cloud Notebooks**, your environment is already
authenticated. Skip this step.

**If you are using Colab**, run the cell below and follow the instructions
when prompted to authenticate your account via OAuth.

**Otherwise**, follow these steps:

1. In the Cloud Console, go to the [**Create service account key**
   page](https://console.cloud.google.com/apis/credentials/serviceaccountkey).

2. Click **Create service account**.

3. In the **Service account name** field, enter a name, and
   click **Create**.

4. In the **Grant this service account access to project** section, click the **Role** drop-down list. Type "Vertex AI"
into the filter box, and select
   **Vertex AI Administrator**. Type "Storage Object Admin" into the filter box, and select **Storage Object Admin**.

5. Click **Create**. A JSON file that contains your key downloads to your
local environment.

6. Enter the path to your service account key as the
`GOOGLE_APPLICATION_CREDENTIALS` variable in the cell below and run the cell.

In [5]:
import os
import sys

# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your GCP account. This provides access to your
# Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

# If on Google Cloud Notebooks, then don't execute this code
if not os.path.exists("/opt/deeplearning/metadata/env_version"):
    if "google.colab" in sys.modules:
        from google.colab import auth as google_auth

        google_auth.authenticate_user()

    # If you are running this notebook locally, replace the string below with the
    # path to your service account key and run this cell to authenticate your GCP
    # account.
    elif not os.getenv("IS_TESTING"):
        %env GOOGLE_APPLICATION_CREDENTIALS ''

env: GOOGLE_APPLICATION_CREDENTIALS=''


---
### 1.6 Create a Cloud Storage bucket as necessary

You will need a Cloud Storage bucket for this example.  If you don't have one that you want to use, you can make one now.


Set the name of your Cloud Storage bucket below. It must be unique across all
Cloud Storage buckets.

You may also change the `REGION` variable, which is used for operations
throughout the rest of this notebook. Make sure to [choose a region where Vertex AI services are
available](https://cloud.google.com/vertex-ai/docs/general/locations#available_regions). You may
not use a Multi-Regional Storage bucket for training with Vertex AI.

In [6]:
BUCKET_NAME = "gs://erwin-demo-vertex"  # @param {type:"string"}
REGION = "us-central1"  # @param {type:"string"}
if BUCKET_NAME == "" or BUCKET_NAME is None or BUCKET_NAME == "gs://[your-bucket-name]":
    BUCKET_NAME = "gs://" + PROJECT_ID + "-aip-" + TIMESTAMP
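
Before creating the bucket, it can help to sanity-check the name. The sketch below covers only a subset of the Cloud Storage naming rules (3 to 63 characters; lowercase letters, digits, dashes, underscores, and dots; starting and ending with a letter or digit). `looks_like_bucket_uri` is a hypothetical helper, and passing this check does not guarantee that the name is available or accepted.

```python
import re

# Partial check only: length and character set of the bucket name.
_BUCKET_NAME = re.compile(r"[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]")


def looks_like_bucket_uri(uri):
    """Return True if `uri` is gs:// followed by a plausibly valid bucket name."""
    if not uri.startswith("gs://"):
        return False
    return _BUCKET_NAME.fullmatch(uri[len("gs://"):]) is not None


print(looks_like_bucket_uri("gs://erwin-demo-vertex"))
print(looks_like_bucket_uri("gs://[your-bucket-name]"))
```

The placeholder value `gs://[your-bucket-name]` fails the check, which is one way to catch a cell that was run without editing.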

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [None]:
! gsutil mb -l $REGION $BUCKET_NAME

Finally, validate access to your Cloud Storage bucket by examining its contents:

In [12]:
! gsutil ls -al $BUCKET_NAME

                                 gs://erwin-demo-vertex/data/
                                 gs://erwin-demo-vertex/output-marketing-model/
                                 gs://erwin-demo-vertex/staging/


---
### 1.7 Import libraries and define constants

Define some constants. 

In [13]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

USER = "erwinh"  # <---CHANGE THIS
PIPELINE_ROOT = "{}/pipeline_root/{}".format(BUCKET_NAME, USER)

WORKING_DIR = f"{PIPELINE_ROOT}/{TIMESTAMP}"

MODEL_DISPLAY_NAME = f"train_deploy{TIMESTAMP}"
print(WORKING_DIR, MODEL_DISPLAY_NAME)

env: PATH=/opt/conda/bin:/opt/conda/condabin:/opt/conda/bin:/usr/local/nvidia/bin:/usr/local/cuda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/home/jupyter/.local/bin:/home/jupyter/.local/bin:/home/jupyter/.local/bin
gs://erwin-demo-vertex/pipeline_root/erwinh/20220318103505 train_deploy20220318103505


Do some imports:

In [1]:
import uuid
import numpy as np
import logging
import os
import tensorflow as tf
import pandas as pd
import kfp
import google.auth
import explainable_ai_sdk

from google.cloud import aiplatform
from google.cloud import bigquery
from google.cloud import bigquery_storage
from google.cloud.aiplatform import datasets

from google_cloud_pipeline_components import aiplatform as gcc_aip

from google.cloud.aiplatform_v1beta1 import (
    FeaturestoreOnlineServingServiceClient, FeaturestoreServiceClient)
from google.cloud.aiplatform_v1beta1.types import FeatureSelector, IdMatcher
from google.cloud.aiplatform_v1beta1.types import \
    entity_type as entity_type_pb2
from google.cloud.aiplatform_v1beta1.types import feature as feature_pb2
from google.cloud.aiplatform_v1beta1.types import \
    featurestore as featurestore_pb2
from google.cloud.aiplatform_v1beta1.types import \
    featurestore_monitoring as featurestore_monitoring_pb2
from google.cloud.aiplatform_v1beta1.types import \
    featurestore_online_service as featurestore_online_service_pb2
from google.cloud.aiplatform_v1beta1.types import \
    featurestore_service as featurestore_service_pb2
from google.cloud.aiplatform_v1beta1.types import io as io_pb2
from google.protobuf.duration_pb2 import Duration

from kfp.v2 import compiler
from kfp.v2.google import experimental
from kfp.v2.google.client import AIPlatformClient
from kfp.v2.dsl import (
    Artifact, ClassificationMetrics, Condition, Dataset, Input, Metrics, Model, Output, component)

from explainable_ai_sdk.metadata.tf.v2 import SavedModelMetadataBuilder

2022-03-24 05:53:33.408887: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0


Set the Vertex AI API endpoint:

In [21]:
API_ENDPOINT = "us-central1-aiplatform.googleapis.com"

---
---
## 2. Feature Store Creation
---
### 2.1 Prepare training dataset
1. Download the sample dataset from https://www.kaggle.com/ashishkumarsingh123/telecom-churn-dataset
2. Create a new BigQuery dataset in your Google Cloud project, and upload the downloaded data to it as a new table, for example `bq://sample-project.ml_sample.telecom_churn`.

---
### 2.2 Create Feature store

In [None]:
# Create admin_client for CRUD and data_client for reading feature values.
admin_client = FeaturestoreServiceClient(client_options={"api_endpoint": API_ENDPOINT})
data_client = FeaturestoreOnlineServingServiceClient(
    client_options={"api_endpoint": API_ENDPOINT}
)

# Represents featurestore resource path.
BASE_RESOURCE_PATH = admin_client.common_location_path(PROJECT_ID, REGION)
FEATURESTORE_ID_TO_CREATE = "telecom_churn_prediction_{timestamp}".format(timestamp=TIMESTAMP)
create_lro = admin_client.create_featurestore(
    featurestore_service_pb2.CreateFeaturestoreRequest(
        parent=BASE_RESOURCE_PATH,
        featurestore_id=FEATURESTORE_ID_TO_CREATE,
        featurestore=featurestore_pb2.Featurestore(
            #display_name="Featurestore for telco churn prediction",
            online_serving_config=featurestore_pb2.Featurestore.OnlineServingConfig(
                fixed_node_count=3
            ),
        ),
    )
)
# Wait for LRO to finish and get the LRO result.
print(create_lro.result())

name: "projects/33856278209/locations/us-central1/featurestores/telecom_churn_prediction_20210927065529"
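
The resource name printed above follows a fixed `projects/.../locations/.../featurestores/...` layout. The sketch below builds and parses such names with plain string handling; both functions are hypothetical stand-ins written for illustration, not the client library's own path helpers.

```python
def featurestore_path(project, location, featurestore_id):
    """Build a featurestore resource name from its three components."""
    return f"projects/{project}/locations/{location}/featurestores/{featurestore_id}"


def parse_featurestore_path(path):
    """Split a featurestore resource name back into its components."""
    parts = path.split("/")
    keys = parts[0::2]
    if len(parts) != 6 or keys != ["projects", "locations", "featurestores"]:
        raise ValueError(f"not a featurestore resource name: {path!r}")
    return {"project": parts[1], "location": parts[3], "featurestore_id": parts[5]}


name = featurestore_path("33856278209", "us-central1", "telecom_churn_prediction_20210927065529")
print(name)
print(parse_featurestore_path(name))
```

Note that the name in the output above contains the numeric project number (`33856278209`), while the request was built from the project ID.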



---
### 2.2 Create Entity Type
You can specify a monitoring config which will by default be inherited by all Features under this EntityType.

In [None]:
# Create users entity type with monitoring enabled.
# All Features belonging to this EntityType will by default inherit the monitoring config.
users_entity_type_lro = admin_client.create_entity_type(
    featurestore_service_pb2.CreateEntityTypeRequest(
        parent=admin_client.featurestore_path(PROJECT_ID, REGION, FEATURESTORE_ID_TO_CREATE),
        entity_type_id="users",
        entity_type=entity_type_pb2.EntityType(
            description="Users entity",
            monitoring_config=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig(
                snapshot_analysis=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig.SnapshotAnalysis(
                    monitoring_interval=Duration(seconds=86400),  # 1 day
                ),
            ),
        ),
    )
)

# Similarly, wait for EntityType creation operation.
print(users_entity_type_lro.result())

name: "projects/33856278209/locations/us-central1/featurestores/telecom_churn_prediction_20210927065529/entityTypes/users"
etag: "AMEw9yMZsrwIzYcA-OcFn5qWCoPAaAPbeE7BgcaSBzfmCXhgePUU"




---
### 2.3 Create Feature Type

You can also set a custom monitoring configuration at the Feature level, and view the properties and metrics in the console: sample [properties](https://storage.googleapis.com/cloud-samples-data/ai-platform-unified/datasets/featurestore/Feature%20Properties.png), sample [metrics](https://storage.googleapis.com/cloud-samples-data/ai-platform-unified/datasets/featurestore/Feature%20Snapshot%20Distribution.png).

In [None]:
# Create features for the 'users' entity.
admin_client.batch_create_features(
    parent=admin_client.entity_type_path(PROJECT_ID, REGION, FEATURESTORE_ID_TO_CREATE, "users"),
    requests=[
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.INT64,
                description="mobile_number",
                monitoring_config=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig(
                    snapshot_analysis=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig.SnapshotAnalysis(
                        disabled=False,
                    ),
                ),
            ),
            feature_id="mobile_number",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.DOUBLE,
                description="average revenue per user on first month",
                monitoring_config=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig(
                    snapshot_analysis=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig.SnapshotAnalysis(
                        disabled=False,
                    ),
                ),
            ),
            feature_id="arpu_m1",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.DOUBLE,
                description="average revenue per user on second month",
                monitoring_config=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig(
                    snapshot_analysis=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig.SnapshotAnalysis(
                        disabled=False,
                    ),
                ),
            ),
            feature_id="arpu_m2",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.DOUBLE,
                description="average revenue per user on third month",
                monitoring_config=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig(
                    snapshot_analysis=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig.SnapshotAnalysis(
                        disabled=False,
                    ),
                ),
            ),
            feature_id="arpu_m3",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.DOUBLE,
                description="average revenue per user on fourth month",
                monitoring_config=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig(
                    snapshot_analysis=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig.SnapshotAnalysis(
                        disabled=False,
                    ),
                ),
            ),
            feature_id="arpu_m4",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.DOUBLE,
                description="Minutes of usage - voice calls on first month",
                monitoring_config=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig(
                    snapshot_analysis=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig.SnapshotAnalysis(
                        disabled=False,
                    ),
                ),
            ),
            feature_id="mou_m1",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.DOUBLE,
                description="Minutes of usage - voice calls on second month",
                monitoring_config=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig(
                    snapshot_analysis=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig.SnapshotAnalysis(
                        disabled=False,
                    ),
                ),
            ),
            feature_id="mou_m2",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.DOUBLE,
                description="Minutes of usage - voice calls on third month",
                monitoring_config=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig(
                    snapshot_analysis=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig.SnapshotAnalysis(
                        disabled=False,
                    ),
                ),
            ),
            feature_id="mou_m3",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.BOOL,
                description="whether the user churned in the fourth month, judged by fourth-month spend <= 0",
                monitoring_config=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig(
                    snapshot_analysis=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig.SnapshotAnalysis(
                        disabled=False,
                    ),
                ),
            ),
            feature_id="is_churn",
        ),
    ],
).result()

features {
  name: "projects/33856278209/locations/us-central1/featurestores/telecom_churn_prediction_20210927065529/entityTypes/users/features/mobile_number"
  etag: "AMEw9yNZPqqfXGtXjxqYK7e05eKVOtW8Vafb9oH_9gPzgtgWn5Rq"
}
features {
  name: "projects/33856278209/locations/us-central1/featurestores/telecom_churn_prediction_20210927065529/entityTypes/users/features/arpu_m1"
  etag: "AMEw9yOSWdk52FkHNIfTFTSzpCSEIgGsrpkk51iVA9pmw6BcRGMW"
}
features {
  name: "projects/33856278209/locations/us-central1/featurestores/telecom_churn_prediction_20210927065529/entityTypes/users/features/arpu_m2"
  etag: "AMEw9yNyLmu8NYgbzlw39UnjnuEkYcBkF_eqnScS4xF5XCKpHdR4"
}
features {
  name: "projects/33856278209/locations/us-central1/featurestores/telecom_churn_prediction_20210927065529/entityTypes/users/features/arpu_m3"
  etag: "AMEw9yMAX78gaTSZMc5qgYbM-hi1z5dz3ge-KYWGSJHuMV8BhR4c"
}
features {
  name: "projects/33856278209/locations/us-central1/featurestores/telecom_churn_prediction_20210927065529/entit
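
The request list in the cell above repeats the same structure for every feature. One way to cut the repetition is to generate the requests from a small table, sketched below with plain dictionaries that mirror the field names used above. `FEATURE_TABLE` and `build_feature_requests` are hypothetical names, only three features are listed for brevity, and whether your client library version accepts dictionaries in place of the `CreateFeatureRequest` messages is an assumption to verify.

```python
# (feature_id, value_type, description); a shortened, illustrative table.
FEATURE_TABLE = [
    ("arpu_m1", "DOUBLE", "average revenue per user on first month"),
    ("arpu_m2", "DOUBLE", "average revenue per user on second month"),
    ("mou_m1", "DOUBLE", "Minutes of usage - voice calls on first month"),
]


def build_feature_requests(table):
    """Return one request-shaped dict per row, with snapshot analysis enabled."""
    requests = []
    for feature_id, value_type, description in table:
        requests.append(
            {
                "feature_id": feature_id,
                "feature": {
                    "value_type": value_type,
                    "description": description,
                    "monitoring_config": {"snapshot_analysis": {"disabled": False}},
                },
            }
        )
    return requests


for request in build_feature_requests(FEATURE_TABLE):
    print(request["feature_id"], request["feature"]["value_type"])
```

Keeping the feature definitions in one table also makes it easier to spot a missing or mislabelled feature.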

---
### 2.4 Feature Engineering
Select a subset of features from the raw dataset.

In [None]:
BQ_RAW_DATA = "bq://erwinh-ml-demos.telecom_churn.churn_v1"  
FEATURE_DESTINATION = "bq://erwinh-ml-demos.ml_sample.feature_to_import_"+TIMESTAMP

In [None]:
client = bigquery.Client('erwinh-ml-demos')

job_config = bigquery.QueryJobConfig(destination=FEATURE_DESTINATION.split('/')[-1])

sql = """
    SELECT cast(mobile_number as string) mobile_number,arpu_6,arpu_7,arpu_8,arpu_9<=0 as is_churn,onnet_mou_6,onnet_mou_7,onnet_mou_8,CURRENT_TIMESTAMP() as update_time
    FROM `{}`;
""".format(BQ_RAW_DATA.split('/')[-1])

# Start the query, passing in the extra configuration.
query_job = client.query(sql, job_config=job_config)  # Make an API request.
query_job.result()  # Wait for the job to complete.

<google.cloud.bigquery.table.RowIterator at 0x7f9f50098510>
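
The query derives the label as `arpu_9 <= 0`, meaning a user counts as churned when the fourth month's revenue is zero or negative. The following sketch reproduces that rule in plain Python on a few made-up rows; `label_churn` and the sample values are hypothetical and only illustrate the logic, including how a missing `arpu_9` yields a missing label, as a SQL `NULL` comparison would.

```python
def label_churn(rows):
    """Mirror the SQL: cast the ID to a string and label churn as arpu_9 <= 0."""
    labelled = []
    for row in rows:
        arpu_9 = row["arpu_9"]
        labelled.append(
            {
                "mobile_number": str(row["mobile_number"]),
                # A missing value stays missing instead of becoming False.
                "is_churn": None if arpu_9 is None else arpu_9 <= 0,
            }
        )
    return labelled


# Made-up rows for illustration only.
sample_rows = [
    {"mobile_number": 7000000001, "arpu_9": 0.0},
    {"mobile_number": 7000000002, "arpu_9": 125.5},
    {"mobile_number": 7000000003, "arpu_9": None},
]
for row in label_churn(sample_rows):
    print(row)
```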

---
### 2.5 Feature Importing
Import the processed features into the feature store.


In [None]:
# Create admin_client for CRUD and data_client for reading feature values.
admin_client = FeaturestoreServiceClient(client_options={"api_endpoint": API_ENDPOINT})
data_client = FeaturestoreOnlineServingServiceClient(
    client_options={"api_endpoint": API_ENDPOINT}
)

# Represents featurestore resource path.
BASE_RESOURCE_PATH = admin_client.common_location_path(PROJECT_ID, REGION)
import_users_request = featurestore_service_pb2.ImportFeatureValuesRequest(
    entity_type=admin_client.entity_type_path(
        PROJECT_ID, REGION, FEATURESTORE_ID_TO_CREATE, "users"
    ),
    bigquery_source=io_pb2.BigQuerySource(
        # Source
        input_uri=FEATURE_DESTINATION
    ),
    entity_id_field="mobile_number",
    feature_specs=[
        # Features
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="arpu_m1", source_field="arpu_6"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="arpu_m2", source_field="arpu_7"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="arpu_m3", source_field="arpu_8"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="is_churn", source_field="is_churn"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="mou_m1", source_field="onnet_mou_6"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="mou_m2", source_field="onnet_mou_7"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="mou_m3", source_field="onnet_mou_8"),
    ],
    feature_time_field="update_time",
    worker_count=10,
)
ingestion_lro = admin_client.import_feature_values(import_users_request)
ingestion_lro.result()

imported_entity_count: 99999
imported_feature_value_count: 686819

---
---
## 3. Explainable AI Model
Train a custom model with TensorFlow, and explain it with the Explainable AI SDK.

---
### 3.0 Serve training data from feature store

In [None]:
TRAINING_DATA_TABLE = 'erwinh-ml-demos.ml_sample.training_data_'+TIMESTAMP
FEATURESTORE_ID = 'telecom_churn_prediction_20210708091932'
TRAINING_DATA_SELECTOR_LOC = BUCKET_NAME + '/dataset/query_instance_2.csv'

bqclient = bigquery.Client()
bqstorageclient = bigquery_storage.BigQueryReadClient()
query_string = """
SELECT
    mobile_number
FROM `{}`
""".format(BQ_RAW_DATA.split('/')[-1])

user_df = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(bqstorage_client=bqstorageclient)
)

X_train = user_df['mobile_number']

# The "Z" suffix marks the timestamp as UTC, so take the current time in UTC.
now = datetime.utcnow()
current_time = now.strftime("%Y-%m-%dT%H:%M:%SZ")
res = pd.DataFrame()
res['users']  = X_train
res['timestamp'] = current_time
res.to_csv(TRAINING_DATA_SELECTOR_LOC, index=False)
admin_client = FeaturestoreServiceClient(client_options={"api_endpoint": API_ENDPOINT})
batch_serving_request = featurestore_service_pb2.BatchReadFeatureValuesRequest(
    # featurestore info
    featurestore=admin_client.featurestore_path(PROJECT_ID, REGION, FEATURESTORE_ID),
    # Cloud Storage URI of the read-instances CSV (entity IDs and timestamps) written above.
    csv_read_instances=io_pb2.CsvSource(
        gcs_source=io_pb2.GcsSource(uris=[TRAINING_DATA_SELECTOR_LOC])
    ),
    destination=featurestore_service_pb2.FeatureValueDestination(
        bigquery_destination=io_pb2.BigQueryDestination(
            # Output to BigQuery table created earlier
            output_uri='bq://'+TRAINING_DATA_TABLE
        )
    ),
    entity_type_specs=[
        featurestore_service_pb2.BatchReadFeatureValuesRequest.EntityTypeSpec(
            entity_type_id="users",
            feature_selector=FeatureSelector(
                id_matcher=IdMatcher(
                    ids=[
                        # features, use "*" if you want to select all features within this entity type
                        "mou_m1",
                        "mou_m2",
                        "mou_m3",
                        "arpu_m1",
                        "arpu_m2",
                        "arpu_m3",
                        "is_churn"
                    ]
                )
            ),
        ),
    ],
)
batch_serving_lro = admin_client.batch_read_feature_values(batch_serving_request)
batch_serving_lro.result()
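
The batch read is driven by the CSV written in the cell above: one column named after the entity type (`users`) and one `timestamp` column. The sketch below builds the same layout with the standard library so the format is easy to inspect; `build_read_instances_csv` is a hypothetical helper, and it formats the timestamp in UTC to match the `Z` suffix.

```python
import csv
import io
from datetime import datetime, timezone


def build_read_instances_csv(entity_ids, entity_type="users", when=None):
    """Return CSV text with an entity-ID column and a UTC timestamp column."""
    when = when or datetime.now(timezone.utc)
    stamp = when.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow([entity_type, "timestamp"])
    for entity_id in entity_ids:
        writer.writerow([entity_id, stamp])
    return buffer.getvalue()


fixed = datetime(2021, 7, 21, 14, 31, 11, tzinfo=timezone.utc)
print(build_read_instances_csv(["7000002729", "7000008874"], when=fixed))
```

Every row uses the same timestamp here, as in the notebook, so each entity is read as of a single point in time.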

---
### 3.1 Train a customized model

Prepare training data

In [None]:
TRAINING_DATA_TABLE = 'erwinh-ml-demos.ml_sample.training_data_20210726163730'
CUSTOM_MODEL_DIR = 'gs://erwin-demo-vertex/model/sample_model3'
TENSORBOARD_LOG_DIR = 'gs://erwin-demo-vertex/tensorboard/'
model_dir = CUSTOM_MODEL_DIR
table_id = TRAINING_DATA_TABLE

bqclient = bigquery.Client('erwinh-ml-demos')
bqstorageclient = bigquery_storage.BigQueryReadClient()

query_string = """
SELECT
  *
FROM `{}`
""".format(table_id)
logging.info('loading training data')
df = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(bqstorage_client=bqstorageclient)
)
logging.info('training data loaded');

INFO:root:loading training data
INFO:root:training data loaded


Train a simple DNN model

In [None]:
def getModel(hiddenSize, lr, df, tensorboard_log_dir = None):
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(hiddenSize, activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])

    df = df.fillna(0)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=lr, clipnorm=1.0),
        loss=tf.keras.losses.BinaryCrossentropy(),
        metrics=[tf.keras.metrics.AUC(curve='ROC')])
    # df['is_churn'] = df['arpu_m4'] < 1
    X_train, y_train = df[['mou_m1','mou_m2', 'mou_m3', 'arpu_m1', 'arpu_m2', 'arpu_m3']][1000:], df['is_churn'][1000:]
    callbacks = []
    if tensorboard_log_dir is not None:
        callbacks.append(tf.keras.callbacks.TensorBoard(
            log_dir=tensorboard_log_dir,
            histogram_freq=1))
    
    model.fit(X_train, y_train, epochs=2, verbose=1, batch_size=10, callbacks=callbacks)
    
    return model

output_directory = model_dir
if os.environ.get('AIP_MODEL_DIR') is not None:
    output_directory = os.environ["AIP_MODEL_DIR"]

logging.info('Creating and training model ...')
model = getModel(hiddenSize=5, lr=0.01, df=df, tensorboard_log_dir=TENSORBOARD_LOG_DIR)
tf.saved_model.save(model, 'churn_prediction_model1')

model_builder = SavedModelMetadataBuilder(
    'churn_prediction_model1')

metadata = model_builder.get_metadata()

model_builder.save_model_with_metadata(model_dir)

result = model.evaluate(df[['mou_m1','mou_m2', 'mou_m3', 'arpu_m1', 'arpu_m2', 'arpu_m3']][:1000], df['is_churn'][:1000])
logging.info(f"result: {result}")
logging.info(model.metrics_names)

model_builder.save_model_with_metadata('churn_prediction_model')
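
The cell above holds out the first 1,000 rows for evaluation and trains on the rest. The sketch below shows the same split as a reusable function, with an optional seeded shuffle in case the table is ordered in a way that would bias the holdout set. `holdout_split` is a hypothetical helper and the shuffle is an addition for illustration; the notebook itself does not shuffle.

```python
import random


def holdout_split(rows, holdout=1000, seed=None):
    """Return (train, test), where test is the first `holdout` rows."""
    rows = list(rows)
    if seed is not None:
        # Shuffle a copy reproducibly before splitting.
        random.Random(seed).shuffle(rows)
    return rows[holdout:], rows[:holdout]


train, test = holdout_split(range(5000), holdout=1000)
print(len(train), len(test))
```

With a pandas DataFrame, the unshuffled case corresponds to the `[1000:]` and `[:1000]` slices used in the cell above.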

### 3.1.1 Using managed tensorboard to track experiments

In [None]:
! pip install -U pip
! pip install google-cloud-aiplatform[tensorboard]

In [None]:
! gcloud beta ai tensorboards create --display-name test123 \
  --project erwinh-ml-demos \
  --region us-central1

Using endpoint [https://us-central1-aiplatform.googleapis.com/]
Waiting for operation [9006611840653852672]...done.                            
Created Vertex AI Tensorboard: projects/33856278209/locations/us-central1/tensorboards/3198787188456161280.


In [None]:
! tb-gcp-uploader --tensorboard_resource_name projects/33856278209/locations/us-central1/tensorboards/3198787188456161280 \
  --logdir=gs://erwin-demo-vertex/tensorboard/train \
  --experiment_name=test-experiment --one_shot=True

2021-09-27 08:04:45.988633: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.11.0
View your Tensorboard at https://us-central1.tensorboard.googleusercontent.com/experiment/projects+33856278209+locations+us-central1+tensorboards+3198787188456161280+experiments+test-experiment
[2021-09-27T08:04:47] Started scanning logdir.
[2021-09-27T08:04:48] Total uploaded: 4 scalars, 16 tensors (1.6 kB), 1 binary objects (13.8 kB)
Listening for new data in logdir...

### 3.1.2 Vizier (automated hyperparameter tuning)

#### Import packages

In [None]:
from google.cloud import aiplatform_v1beta1
import json

#### Create study configuration

In [None]:
# Parameter Configuration
# `datetime` is the class imported earlier with `from datetime import datetime`;
# importing the module under the same name here would shadow it for earlier cells.
STUDY_DISPLAY_NAME = "{}_study_{}".format(
    PROJECT_ID.replace("-", ""), datetime.now().strftime("%Y%m%d_%H%M%S")
)  # @param {type: 'string'}

param_lr = {"parameter_id": "lr", "double_value_spec": {"min_value": 0.01, "max_value": 0.1}}

param_hs = {
    "parameter_id": "hiddenSize",
    "integer_value_spec": {"min_value": 3, "max_value": 10},
}

# Objective Metrics
metric_auc = {"metric_id": "auc", "goal": "MAXIMIZE"}

# Put it all together in a study configuration
study = {
    "display_name": STUDY_DISPLAY_NAME,
    "study_spec": {
        "algorithm": "RANDOM_SEARCH",
        "parameters": [
            param_lr,
            param_hs,
        ],
        "metrics": [metric_auc],
    },
}

print(json.dumps(study, indent=2, sort_keys=True))
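
To make the `RANDOM_SEARCH` setting concrete, here is a local sketch of random search over the same two parameter ranges. It is an illustration of the idea only, not how the Vizier service is implemented; `SEARCH_SPACE`, `sample_trial`, `random_search`, and the toy objective are all hypothetical.

```python
import random

# Same ranges as the study configuration above.
SEARCH_SPACE = {
    "lr": ("double", 0.01, 0.1),
    "hiddenSize": ("integer", 3, 10),
}


def sample_trial(space, rng):
    """Draw one parameter set uniformly at random from `space`."""
    params = {}
    for name, (kind, low, high) in space.items():
        if kind == "double":
            params[name] = rng.uniform(low, high)
        else:
            params[name] = rng.randint(low, high)  # both ends inclusive
    return params


def random_search(objective, space, n_trials, seed=0):
    """Return (best_score, best_params), maximizing `objective`."""
    rng = random.Random(seed)
    best = None
    for _ in range(n_trials):
        params = sample_trial(space, rng)
        score = objective(params)
        if best is None or score > best[0]:
            best = (score, params)
    return best


# Toy objective standing in for the model's AUC.
def toy_objective(params):
    return -abs(params["lr"] - 0.05) - 0.01 * abs(params["hiddenSize"] - 6)


print(random_search(toy_objective, SEARCH_SPACE, n_trials=20))
```

In the notebook, the objective is the AUC returned by training and evaluating the model, and the service proposes the parameter sets instead of the local sampler.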

#### Create study

In [None]:
ENDPOINT = REGION + "-aiplatform.googleapis.com"
PARENT = "projects/{}/locations/{}".format(PROJECT_ID, REGION)
vizier_client = aiplatform_v1beta1.VizierServiceClient(
    client_options=dict(api_endpoint=ENDPOINT)
)
study = vizier_client.create_study(parent=PARENT, study=study)
STUDY_ID = study.name
print("STUDY_ID: {}".format(STUDY_ID))

STUDY_ID: projects/33856278209/locations/us-central1/studies/3996296626522


#### Set evaluation metrics

In [None]:
def MetricAUCEvaluation(hs, lr, df):
    model = getModel(hs, lr, df)
    result = model.evaluate(df[['mou_m1','mou_m2', 'mou_m3', 'arpu_m1', 'arpu_m2', 'arpu_m3']][:1000], df['is_churn'][:1000])
    return result[1]


def CreateMetrics(trial_id, hs, lr, df):
    print(("=========== Start Trial: [{}] =============").format(trial_id))

    # Evaluate the objective metric (AUC) for this trial
    auc = MetricAUCEvaluation(hs, lr, df)
    print(
        "[hidden size = {}, lr = {}] => auc = {}".format(
            hs, lr, auc
        )
    )
    metric1 = {"metric_id": "auc", "value": auc}

    # Return the results for this trial
    return [metric1]
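
`MetricAUCEvaluation` returns `result[1]`, the AUC value that follows the loss in the list returned by `model.evaluate`. As a reminder of what that number measures, the sketch below computes ROC AUC directly as the fraction of (positive, negative) pairs that the scores rank correctly, counting ties as half. `roc_auc` is a hypothetical helper; Keras approximates the same quantity from a fixed set of thresholds, so its value can differ slightly.

```python
def roc_auc(labels, scores):
    """Exact ROC AUC via pairwise comparison of positive and negative scores."""
    positives = [s for label, s in zip(labels, scores) if label]
    negatives = [s for label, s in zip(labels, scores) if not label]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Three of the four positive/negative pairs are ranked correctly.
print(roc_auc([True, False, True, False], [0.9, 0.1, 0.4, 0.6]))  # 0.75
```

The pairwise loop is quadratic in the number of examples, so it is suited to small checks rather than the full dataset.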

#### Set configuration parameters for running trials


In [None]:
client_id = "client1"  # @param {type: 'string'}
suggestion_count_per_request = 5  # @param {type: 'integer'}
max_trial_id_to_stop = 4  # @param {type: 'integer'}

print("client_id: {}".format(client_id))
print("suggestion_count_per_request: {}".format(suggestion_count_per_request))
print("max_trial_id_to_stop: {}".format(max_trial_id_to_stop))

client_id: client1
suggestion_count_per_request: 5
max_trial_id_to_stop: 4


In [None]:
df

Unnamed: 0,timestamp,entity_type_users,mou_m1,mou_m2,mou_m3,arpu_m1,arpu_m2,arpu_m3,is_churn
0,2021-07-21 14:31:11+00:00,7000002729,8.86,0.06,17.96,172.801,298.935,278.804,True
1,2021-07-21 14:31:11+00:00,7000008874,0.00,108.59,145.56,0.000,163.702,144.836,True
2,2021-07-21 14:31:11+00:00,7000013153,0.00,84.58,20.98,42.955,303.673,400.007,True
3,2021-07-21 14:31:11+00:00,7000013547,1666.98,1597.48,2522.23,1308.004,1150.284,1601.142,True
4,2021-07-21 14:31:11+00:00,7000016253,6.24,0.00,23.68,26.150,0.000,103.692,True
...,...,...,...,...,...,...,...,...,...
99994,2021-07-21 14:31:11+00:00,7002155209,0.00,0.00,0.00,247.650,187.647,183.422,True
99995,2021-07-21 14:31:11+00:00,7002246324,9.79,8.01,0.00,280.572,100.973,33.990,False
99996,2021-07-21 14:31:11+00:00,7002280506,276.21,56.71,0.00,831.464,371.873,0.050,True
99997,2021-07-21 14:31:11+00:00,7002401627,2.39,0.25,0.00,540.820,273.201,440.417,True


### Run Vertex Vizier trials

In [None]:
trial_id = 0
while int(trial_id) < max_trial_id_to_stop:
    suggest_response = vizier_client.suggest_trials(
        {
            "parent": STUDY_ID,
            "suggestion_count": suggestion_count_per_request,
            "client_id": client_id,
        }
    )

    for suggested_trial in suggest_response.result().trials:
        trial_id = suggested_trial.name.split("/")[-1]
        trial = vizier_client.get_trial({"name": suggested_trial.name})

        # trial.state is an enum, so compare by name; finished trials are SUCCEEDED or INFEASIBLE
        if trial.state.name in ["SUCCEEDED", "INFEASIBLE"]:
            continue
        for param in trial.parameters:
            if param.parameter_id == "lr":
                lr = param.value
            elif param.parameter_id == "hiddenSize":
                hs = param.value
        print("Trial : hidden size is {}, lr is {}.".format(hs, lr))

        vizier_client.add_trial_measurement(
            {
                "trial_name": suggested_trial.name,
                "measurement": {
                    "metrics": CreateMetrics(suggested_trial.name, hs, lr, df)
                },
            }
        )

        response = vizier_client.complete_trial(
            {"name": suggested_trial.name, "trial_infeasible": False}
        )

Trial : hidden size is 10.0, lr is 0.014751820652343813.
Epoch 1/10


To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.




Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
[hidden size = 10.0, lr = 0.014751820652343813] => auc = 0.6107170581817627
Trial : hidden size is 7.0, lr is 0.09845553757419358.
Epoch 1/10


To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floa
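
The stopping condition of the trial loop depends on the numeric suffix of each trial's resource name. The sketch below replays that logic offline with made-up trial names; no Vizier call is made, and `fake_suggest`, `run_trials`, and the resource names are hypothetical. It assumes trial IDs are assigned sequentially starting at 1, in which case `suggestion_count_per_request = 5` and `max_trial_id_to_stop = 4` lead to a single batch of five trials before the loop exits.

```python
def trial_id_from_name(trial_name):
    """Extract the numeric trial ID from a '.../trials/<id>' resource name."""
    return int(trial_name.split("/")[-1])


def fake_suggest(study_name, next_id, count):
    """Stand-in for suggest_trials: returns `count` sequential trial names."""
    return ["{}/trials/{}".format(study_name, next_id + i) for i in range(count)]


def run_trials(study_name, suggestion_count, max_trial_id_to_stop):
    """Mirror the while/for structure of the notebook loop and record evaluated IDs."""
    evaluated = []
    trial_id = 0
    next_id = 1
    while trial_id < max_trial_id_to_stop:
        for name in fake_suggest(study_name, next_id, suggestion_count):
            trial_id = trial_id_from_name(name)
            evaluated.append(trial_id)  # the real loop trains and reports metrics here
        next_id += suggestion_count
    return evaluated


evaluated = run_trials("projects/p/locations/us-central1/studies/s", 5, 4)
print(evaluated)
```

Because the check happens only between batches, the whole batch is evaluated even when some of its trial IDs already exceed the limit.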

---
### 3.2 Deploy trained model to Vertex AI endpoint 
Upload model

In [None]:
! gcloud beta ai models upload \
  --region=us-central1  \
  --display-name=churn-model-explain{TIMESTAMP} \
  --container-image-uri=us-docker.pkg.dev/cloud-aiplatform/prediction/tf2-cpu.2-3:latest  \
  --artifact-uri={CUSTOM_MODEL_DIR} \
  --explanation-method=sampled-shapley \
  --explanation-path-count=10 \
  --explanation-metadata-file=./churn_prediction_model/explanation_metadata.json
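
The flags `--explanation-method=sampled-shapley` and `--explanation-path-count=10` request feature attributions approximated by averaging each feature's marginal contribution over sampled feature orderings. The toy sketch below shows the general idea on a hand-written linear function. It is not the service's implementation; `sampled_shapley`, `toy_predict`, the weights, and the all-zero baseline are assumptions made for illustration.

```python
import random


def sampled_shapley(predict, instance, baseline, path_count, seed=0):
    """Approximate Shapley attributions by averaging over random feature orderings."""
    rng = random.Random(seed)
    n = len(instance)
    attributions = [0.0] * n
    for _ in range(path_count):
        order = list(range(n))
        rng.shuffle(order)
        current = list(baseline)
        previous = predict(current)
        for i in order:
            # Switch feature i from its baseline value to its actual value
            current[i] = instance[i]
            updated = predict(current)
            attributions[i] += updated - previous
            previous = updated
    return [a / path_count for a in attributions]


# Hypothetical linear model: for such a model the attribution of feature i
# is weight_i * (x_i - baseline_i), regardless of the ordering.
WEIGHTS = [0.5, -1.0, 2.0]
BIAS = 0.1


def toy_predict(x):
    return BIAS + sum(w * v for w, v in zip(WEIGHTS, x))


instance = [2.0, 3.0, 1.0]
baseline = [0.0, 0.0, 0.0]
attributions = sampled_shapley(toy_predict, instance, baseline, path_count=10)
print(attributions)
```

For models with feature interactions the result depends on the sampled orderings, which is why a larger path count generally trades extra prediction calls for a more stable estimate.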

Deploy to endpoint

In [None]:
! gcloud beta ai models list \
  --region={REGION} \
  --filter=display_name=churn-model-explain{TIMESTAMP}

Using endpoint [https://{REGION}-aiplatform.googleapis.com/]
ERROR: (gcloud.beta.ai.models.list) The endpoint_overrides property must be an absolute URI beginning with http:// or https:// and ending with a trailing '/'. [https://{REGION}-aiplatform.googleapis.com/] is not a valid endpoint override.


In [None]:
! gcloud  ai models  update 5699789113553584128 \
--region=us-central1 \
--update-labels backend=webserver,media=images

ERROR: (gcloud.ai.models) Invalid choice: 'update'.
Maybe you meant:
  gcloud ai-platform models update
  gcloud ai-platform models add-iam-policy-binding
  gcloud ai-platform models create
  gcloud ai-platform models delete
  gcloud ai-platform models describe
  gcloud ai-platform models get-iam-policy
  gcloud ai-platform models list
  gcloud ai-platform models remove-iam-policy-binding
  gcloud ai-platform models set-iam-policy

To search the help text of gcloud commands, run:
  gcloud help -- SEARCH_TERMS


In [None]:
MODEL_ID = '7595188830165008384'
model = aiplatform.Model(MODEL_ID)
model.labels = {'aba':1}

In [None]:
# Paste the ID of the uploaded model (from the `gcloud beta ai models list` output above) into MODEL_ID
MODEL_ID = '7595188830165008384'
model = aiplatform.Model(MODEL_ID)

endpoint = model.deploy(
    traffic_percentage=100,
    deployed_model_display_name='churn-model-explain'+TIMESTAMP,
    machine_type="n1-standard-4",
)

---
### 3.3 Make explainable prediction

In [None]:
! gcloud beta ai endpoints list \
  --region={REGION} \
  --filter=display_name=churn-model-explain{TIMESTAMP}_endpoint

In [None]:
# Paste the endpoint ID from the output of the previous cell into ENDPOINT_ID
ENDPOINT_ID = '5680921494020947968'
vertex_model = explainable_ai_sdk.load_model_from_vertex(
    project=PROJECT_ID,
    region=REGION,
    endpoint_id=ENDPOINT_ID
)


response = vertex_model.explain(df[['mou_m1','mou_m2', 'mou_m3', 'arpu_m1', 'arpu_m2', 'arpu_m3']].values.tolist()[:10])
response[0].visualize_attributions()
response[0].feature_importance()
print(response[0].as_tensors())
print(df[['mou_m1','mou_m2', 'mou_m3', 'arpu_m1', 'arpu_m2', 'arpu_m3']].values.tolist()[0])


---
### 3.4 Explain the model: What-If Tool
#### 3.4.1 Install the What-If Tool and Jupyter widget
<font color='red'>Note: the first time you use the What-If Tool in a notebook, you may need to reload the browser tab.</font>

In [None]:
!pip install witwidget witwidget-gpu
!sudo /opt/conda/bin/jupyter labextension install wit-widget
!sudo /opt/conda/bin/jupyter labextension install @jupyter-widgets/jupyterlab-manager

In [None]:
df.head()
endpoint.predict(df[['mou_m1','mou_m2', 'mou_m3', 'arpu_m1', 'arpu_m2', 'arpu_m3']].values.tolist()[:100])

#### 3.4.2 Run the What-If Tool

In [None]:
from witwidget.notebook.visualization import WitWidget, WitConfigBuilder
def predict_func(feature):
    result = endpoint.predict(feature)
    return result[0]
    
wit_columns = ['mou_m1', 'mou_m2', 'mou_m3', 'arpu_m1', 'arpu_m2', 'arpu_m3', 'is_churn']
config_builder = (WitConfigBuilder(df[wit_columns].values.tolist()[-100:], wit_columns)
  .set_custom_predict_fn(predict_func)
  .set_target_feature('is_churn')
  .set_label_vocab(['churn', 'not_churn']))
WitWidget(config_builder, height=800)
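
Two details are worth checking when wiring a custom predict function into the What-If Tool. The examples handed to the function are expected to carry the same columns given to `WitConfigBuilder` (here including the `is_churn` label), and for classification the tool expects a list of per-class scores for each example. The adapter below is a hedged sketch of how both could be handled. `make_wit_predict_fn` and `stub_predict_rows` are hypothetical; the stub stands in for the deployed endpoint so the snippet runs offline, and it assumes the model returns a single sigmoid probability of churn per row.

```python
def make_wit_predict_fn(predict_rows, label_index=-1):
    """Wrap a row-level predictor so it drops the label and returns two class scores."""
    def wit_predict(examples):
        features = []
        for example in examples:
            row = list(example)
            del row[label_index]          # remove the is_churn label column
            features.append(row)
        scores = predict_rows(features)   # expected shape: [[p_churn], ...]
        # Expand each sigmoid output into [P(not churn), P(churn)]
        return [[1.0 - s[0], s[0]] for s in scores]
    return wit_predict


def stub_predict_rows(rows):
    """Offline stand-in for the endpoint: checks the feature count, returns 0.25."""
    for row in rows:
        assert len(row) == 6, "the model was trained on six feature columns"
    return [[0.25] for _ in rows]


wit_predict = make_wit_predict_fn(stub_predict_rows)
toy_examples = [
    [8.86, 0.06, 17.96, 172.801, 298.935, 278.804, True],
    [9.79, 8.01, 0.00, 280.572, 100.973, 33.990, False],
]
print(wit_predict(toy_examples))
```

With the real endpoint, `predict_rows` could be something like `lambda rows: endpoint.predict(rows).predictions`, and the label vocabulary order would then need to match the `[not churn, churn]` order of the returned scores.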

---
---
## 4. Vertex Pipeline
Create a Vertex pipeline that orchestrates feature serving, model training, evaluation, and deployment.

---

### 4.1 Create components with Kubeflow
#### 4.1.1 Feature serving component

In [None]:
TRAINING_DATA_SELECTOR_LOC

In [None]:
TRAINING_DATA_SELECTOR_LOC = BUCKET_NAME + '/dataset/query_instance_2.csv'

@component(base_image='gcr.io/deeplearning-platform-release/tf2-gpu.2-5',
           packages_to_install=[
               'pandas', 
               "google-cloud-bigquery-storage==2.4.0",
               'google-cloud-bigquery', 
               'pyarrow',
               'fsspec', 
               'gcsfs', 
               'google-cloud-aiplatform==1.1.0'])
def feature_store_serve_op(api_endpoint: str, 
    project_id: str, 
    region: str, 
    featurestore_id: str, 
    bq_feature_source: str,
    bq_training_output: str,
    training_data_selector_loc: str = 'gs://erwin-demo-vertex/data/query_instance_2.csv'):
    import google.auth
    from google.cloud import bigquery
    from google.cloud import bigquery_storage
    from google.cloud.aiplatform_v1beta1 import (
        FeaturestoreOnlineServingServiceClient, FeaturestoreServiceClient)
    from google.cloud.aiplatform_v1beta1.types import FeatureSelector, IdMatcher
    from google.cloud.aiplatform_v1beta1.types import \
        entity_type as entity_type_pb2
    from google.cloud.aiplatform_v1beta1.types import feature as feature_pb2
    from google.cloud.aiplatform_v1beta1.types import \
        featurestore as featurestore_pb2
    from google.cloud.aiplatform_v1beta1.types import \
        featurestore_monitoring as featurestore_monitoring_pb2
    from google.cloud.aiplatform_v1beta1.types import \
        featurestore_online_service as featurestore_online_service_pb2
    from google.cloud.aiplatform_v1beta1.types import \
        featurestore_service as featurestore_service_pb2
    from google.cloud.aiplatform_v1beta1.types import io as io_pb2
    from datetime import datetime
    import pandas as pd
    
    print("feature store serve task")

    bqclient = bigquery.Client(project_id)
    bqstorageclient = bigquery_storage.BigQueryReadClient()
    query_string = """
    SELECT
        mobile_number
    FROM `{}`
    """.format(bq_feature_source.split('/')[-1])

    user_df = (
        bqclient.query(query_string)
        .result()
        .to_dataframe(bqstorage_client=bqstorageclient)
    )

    X_train = user_df['mobile_number']
    
    # Use UTC: the timestamp format below ends in "Z", which denotes UTC
    now = datetime.utcnow()

    current_time = now.strftime("%Y-%m-%dT%H:%M:%SZ")
    print("Current Time =", current_time)
    res = pd.DataFrame()
    res['users']  = X_train
    res['timestamp'] = current_time
    res.to_csv(training_data_selector_loc, index=False)
    admin_client = FeaturestoreServiceClient(client_options={"api_endpoint": api_endpoint})
    batch_serving_request = featurestore_service_pb2.BatchReadFeatureValuesRequest(
        # featurestore info
        featurestore=admin_client.featurestore_path(project_id, region, featurestore_id),
        # CSV of entity IDs and timestamps to read feature values for (written above)
        csv_read_instances=io_pb2.CsvSource(
            gcs_source=io_pb2.GcsSource(uris=[training_data_selector_loc])
        ), 
        destination=featurestore_service_pb2.FeatureValueDestination(
            bigquery_destination=io_pb2.BigQueryDestination(
                # Output to BigQuery table created earlier
                output_uri=bq_training_output
            )
        ),
        entity_type_specs=[
            featurestore_service_pb2.BatchReadFeatureValuesRequest.EntityTypeSpec(
                entity_type_id="users",
                feature_selector=FeatureSelector(
                    id_matcher=IdMatcher(
                        ids=[
                            # features, use "*" if you want to select all features within this entity type
                            "mou_m1",
                            "mou_m2",
                            "mou_m3",
                            "arpu_m1",
                            "arpu_m2",
                            "arpu_m3",
                            "is_churn"
                        ]
                    )
                ),
            ),
        ],
    )
    batch_serving_lro = admin_client.batch_read_feature_values(batch_serving_request)
    batch_serving_lro.result()

@component(base_image='gcr.io/deeplearning-platform-release/tf2-gpu.2-5',
           packages_to_install=['google-cloud-aiplatform==1.1.0'])
def evaluate_model(
    api_endpoint: str, 
    project_id: str, 
    region: str, 
    model: Input[Model],
    metrics: Output[Metrics]):
    model_id = model.uri.split('/')[-1]
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    from google.cloud import aiplatform
    client = aiplatform.gapic.ModelServiceClient(client_options=client_options)
    name = client.model_path(
        project=project_id, location=region, model=model_id
    )
    print(name)
    response = client.list_model_evaluations(parent=name)
    print("response:", response)
    print(model)
    print(model.metadata)
    print(model.uri)
    print(model.name)
    auRoc = response.model_evaluations[0].metrics['auRoc']
    metrics.log_metric("auROC", float(auRoc))
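
The feature-serving component above writes a small "read instances" CSV, one row per entity ID plus a timestamp, which the batch read request then consumes. The sketch below builds the same two-column layout in memory using only the standard library. `build_read_instances_csv` and the sample IDs are illustrative assumptions, and the sketch uses an explicitly UTC clock so that the trailing `Z` in the timestamp is accurate.

```python
import csv
import io
from datetime import datetime, timezone


def build_read_instances_csv(user_ids, now=None):
    """Return CSV text with a `users` column and a shared UTC `timestamp` column."""
    now = now or datetime.now(timezone.utc)
    stamp = now.strftime("%Y-%m-%dT%H:%M:%SZ")
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["users", "timestamp"])  # header: entity type ID, then timestamp
    for user_id in user_ids:
        writer.writerow([user_id, stamp])
    return buffer.getvalue()


# Fixed clock and made-up entity IDs so the output is repeatable.
fixed_now = datetime(2021, 7, 21, 14, 31, 11, tzinfo=timezone.utc)
csv_text = build_read_instances_csv([7000002729, 7000008874], now=fixed_now)
print(csv_text)
```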

In [None]:
PROJECT_ID

In [None]:
import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage
from google.cloud.aiplatform_v1beta1 import (
    FeaturestoreOnlineServingServiceClient, FeaturestoreServiceClient)
from google.cloud.aiplatform_v1beta1.types import FeatureSelector, IdMatcher
from google.cloud.aiplatform_v1beta1.types import \
    entity_type as entity_type_pb2
from google.cloud.aiplatform_v1beta1.types import feature as feature_pb2
from google.cloud.aiplatform_v1beta1.types import \
    featurestore as featurestore_pb2
from google.cloud.aiplatform_v1beta1.types import \
    featurestore_monitoring as featurestore_monitoring_pb2
from google.cloud.aiplatform_v1beta1.types import \
    featurestore_online_service as featurestore_online_service_pb2
from google.cloud.aiplatform_v1beta1.types import \
    featurestore_service as featurestore_service_pb2
from google.cloud.aiplatform_v1beta1.types import io as io_pb2
from datetime import datetime
import pandas as pd

bq_feature_source = BQ_FEATURE_SOURCE
bq_training_output = BQ_TRAINING_OUTPUT
bq_feature_source_table = BQ_FEATURE_TABLE
project = PROJECT_ID
model_display_name = MODEL_DISPLAY_NAME
api_endpoint = API_ENDPOINT
region = REGION
serving_container_image_uri = "us-docker.pkg.dev/cloud-aiplatform/prediction/tf2-cpu.2-3:latest"
featurestore_id = FEATURE_STORE
training_data_selector_loc = 'gs://erwin-demo-vertex/data/query_instance_2.csv'
project_id = project
        
print("feature store serve task")

bqclient = bigquery.Client(project_id)
bqstorageclient = bigquery_storage.BigQueryReadClient()
query_string = """
SELECT
    mobile_number
FROM `{}`
""".format(bq_feature_source.split('/')[-1])

user_df = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(bqstorage_client=bqstorageclient)
)

X_train = user_df['mobile_number']

# Use UTC: the timestamp format below ends in "Z", which denotes UTC
now = datetime.utcnow()

current_time = now.strftime("%Y-%m-%dT%H:%M:%SZ")
print("Current Time =", current_time)
res = pd.DataFrame()
res['users']  = X_train
res['timestamp'] = current_time
res.to_csv(training_data_selector_loc, index=False)
admin_client = FeaturestoreServiceClient(client_options={"api_endpoint": api_endpoint})
batch_serving_request = featurestore_service_pb2.BatchReadFeatureValuesRequest(
    # featurestore info
    featurestore=admin_client.featurestore_path(project_id, region, featurestore_id),
    # CSV of entity IDs and timestamps to read feature values for (written above)
    csv_read_instances=io_pb2.CsvSource(
        gcs_source=io_pb2.GcsSource(uris=[training_data_selector_loc])
    ),
    destination=featurestore_service_pb2.FeatureValueDestination(
        bigquery_destination=io_pb2.BigQueryDestination(
            # Output to BigQuery table created earlier
            output_uri=bq_training_output
        )
    ),
    entity_type_specs=[
        featurestore_service_pb2.BatchReadFeatureValuesRequest.EntityTypeSpec(
            entity_type_id="users",
            feature_selector=FeatureSelector(
                id_matcher=IdMatcher(
                    ids=[
                        # features, use "*" if you want to select all features within this entity type
                        "mou_m1",
                        "mou_m2",
                        "mou_m3",
                        "arpu_m1",
                        "arpu_m2",
                        "arpu_m3",
                        "is_churn"
                    ]
                )
            ),
        ),
    ],
)
batch_serving_lro = admin_client.batch_read_feature_values(batch_serving_request)
batch_serving_lro.result()

#### 4.1.2 Customized Training Component

In [None]:
from google.cloud.aiplatform import datasets
from google.cloud import aiplatform
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import ClassificationMetrics, Metrics, Output, Input, component, Model, Dataset, Artifact, Condition
from kfp.v2.google.client import AIPlatformClient

CUSTOM_MODEL_DIR = 'gs://erwin-demo-vertex/model'+TIMESTAMP
TENSORBOARD_LOG_DIR = 'gs://erwin-demo-vertex/tensorboard/'

@component(base_image='gcr.io/deeplearning-platform-release/tf2-gpu.2-5',
           packages_to_install=[
                                'pandas', 
                                "google-cloud-bigquery-storage==2.4.0",
                                'google-cloud-bigquery', 
                                'pyarrow',
                                'fsspec', 
                                'gcsfs', 
                                'google-cloud-aiplatform==1.1.0',
                                'kfp==1.6.3',
                                'kfp-pipeline-spec',
                                'kfp-server-api',
                                'pandas==1.2.4',
                                'pandas-profiling==3.0.0',
                                'tensorboard==2.5.0',
                                'tensorboard-data-server==0.6.1',
                                'tensorboard-plugin-wit==1.8.0',
                                'tensorflow==2.5.0',
                                'tensorflow-datasets==4.3.0',
                                'tensorflow-estimator==2.5.0',
                                'tensorflow-metadata==1.0.0',
                                'google-cloud-logging==2.4.0',
                                'explainable-ai-sdk==1.3.0',
                                'numpy==1.19.5',
                                'scikit-learn'])
def custom_training_op(
    project_id: str,
    dataset: Input[Dataset],
    metrics: Output[Metrics],
    model_dir: str,
    tensorboard_log_dir: str,
):
    from google.cloud.aiplatform import datasets
    from google.cloud import aiplatform
    import logging
    import os
    import tensorflow as tf
    import pandas as pd
    from google.cloud import bigquery
    from google.cloud import bigquery_storage
    from kfp.v2.dsl import ClassificationMetrics, Metrics, Output, component
    import numpy as np
    
    dataset = aiplatform.TabularDataset('projects/' + dataset.uri.split('projects/')[-1])
    table_id = dataset._gca_resource.metadata.get("inputConfig").get("bigquerySource").get("uri").split('//')[-1]
    
    bqclient = bigquery.Client(project_id)
    bqstorageclient = bigquery_storage.BigQueryReadClient()

    query_string = """
    SELECT
      *
    FROM `{}`
    """.format(table_id)

    df = (
        bqclient.query(query_string)
        .result()
        .to_dataframe(bqstorage_client=bqstorageclient)
    )

    output_directory = model_dir
    if os.environ.get('AIP_MODEL_DIR') is not None:
        output_directory = os.environ["AIP_MODEL_DIR"]

    logging.info('Creating and training model ...')
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(5, activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])

    log_dir = tensorboard_log_dir
    if os.environ.get('AIP_TENSORBOARD_LOG_DIR') is not None:
        log_dir = os.environ["AIP_TENSORBOARD_LOG_DIR"]

    tensorboard_callback = tf.keras.callbacks.TensorBoard(
      log_dir=log_dir,
      histogram_freq=1)

    df = df.fillna(0)

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.01, clipnorm=1.0),
        loss=tf.keras.losses.BinaryCrossentropy(),
        metrics=[tf.keras.metrics.AUC()])
    X_train, y_train = df[['mou_m1','mou_m2', 'mou_m3', 'arpu_m1', 'arpu_m2', 'arpu_m3']][1000:], df['is_churn'][1000:]
    model.fit(X_train, y_train, epochs=10, verbose=1, batch_size=10, callbacks=[tensorboard_callback])

    logging.info(f'Exporting SavedModel to: {output_directory}')
    model.save(output_directory)

    result = model.evaluate(df[['mou_m1','mou_m2', 'mou_m3', 'arpu_m1', 'arpu_m2', 'arpu_m3']][:1000], df['is_churn'][:1000])
    
    metrics.log_metric("auROC", result[1])
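
The training component prefers the directories that Vertex AI injects through `AIP_MODEL_DIR` and `AIP_TENSORBOARD_LOG_DIR`, and falls back to its own arguments otherwise. That precedence can be isolated in a small helper; the name `resolve_dir` and the example paths are hypothetical.

```python
import os


def resolve_dir(default_dir, env_var, environ=None):
    """Return the value of `env_var` if it is set, otherwise `default_dir`."""
    environ = os.environ if environ is None else environ
    value = environ.get(env_var)
    return default_dir if value is None else value


# Simulated environments so the sketch does not depend on the real process env.
inside_vertex = {"AIP_MODEL_DIR": "gs://example-bucket/aip-model"}
outside_vertex = {}

print(resolve_dir("gs://example-bucket/model", "AIP_MODEL_DIR", inside_vertex))
print(resolve_dir("gs://example-bucket/model", "AIP_MODEL_DIR", outside_vertex))
```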


---
### 4.2 Build Pipeline With KFP

In [None]:
FEATURE_STORE = "telecom_churn_prediction_20210708091932"
BQ_FEATURE_SOURCE = "bq://erwinh-ml-demos.telecom_churn.processed_features_9"
BQ_TRAINING_OUTPUT = "bq://erwinh-ml-demos.telecom_churn.training_data_"+TIMESTAMP
BQ_FEATURE_TABLE = BQ_FEATURE_SOURCE.split('/')[-1]
MODEL_DISPLAY_NAME = 'sample-churn-prediction-model'

@kfp.dsl.pipeline(name="train" + str(uuid.uuid4()))
def pipeline(featurestore_id: str = FEATURE_STORE, 
    bq_feature_source: str = BQ_FEATURE_SOURCE,
    bq_training_output: str = BQ_TRAINING_OUTPUT,
    bq_feature_source_table: str = BQ_FEATURE_TABLE,
    project: str = PROJECT_ID,
    model_display_name: str = MODEL_DISPLAY_NAME,
    api_endpoint: str = API_ENDPOINT, 
    region: str = REGION,
    serving_container_image_uri: str = "us-docker.pkg.dev/cloud-aiplatform/prediction/tf2-cpu.2-3:latest",
):
    feature_store_serve_job = feature_store_serve_op(
        api_endpoint=api_endpoint, 
        project_id=project, 
        region=region, 
        featurestore_id=featurestore_id, 
        bq_feature_source=bq_feature_source,
        bq_training_output=bq_training_output)
    
    dataset_create_job = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name='dataset_creation_op',
        bq_source=bq_training_output
    )
    
    # The dataset is built from the BigQuery table written by the feature store step
    dataset_create_job.after(feature_store_serve_job)

    custom_training_job = custom_training_op(
        project_id=project,
        dataset=dataset_create_job.outputs["dataset"],
        model_dir=WORKING_DIR,
        tensorboard_log_dir=TENSORBOARD_LOG_DIR,
    )
    
    custom_training_job.after(dataset_create_job)
    
    endpoint_create_job = gcc_aip.EndpointCreateOp(
        project=project,
        display_name="pipelines-created-endpoint",
    )
    
    automl_training_job = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name='automl_training_op',
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,  # 1,000 milli node hours = 1 node hour, the minimum budget
        column_transformations=[
            {"numeric": {"column_name": "mou_m1"}},
            {"numeric": {"column_name": "mou_m2"}},
            {"numeric": {"column_name": "mou_m3"}},
            {"numeric": {"column_name": "arpu_m1"}},
            {"numeric": {"column_name": "arpu_m2"}},
            {"numeric": {"column_name": "arpu_m3"}},
            {"categorical": {"column_name": "is_churn"}},
        ],
        dataset=dataset_create_job.outputs["dataset"],
        target_column="is_churn",
    )
    automl_training_job.after(dataset_create_job)

    evaluate_model(
        api_endpoint=api_endpoint, 
        project_id=project, 
        region=region, 
        model=automl_training_job.outputs["model"])
    
    automl_model_deploy_job = gcc_aip.ModelDeployOp(
        traffic_percentage=100,
        endpoint=endpoint_create_job.outputs["endpoint"],
        model=automl_training_job.outputs["model"],
        project=project,
        machine_type="n1-standard-4",
    )
    custom_model_upload_job = gcc_aip.ModelUploadOp(
        project=project,
        display_name=model_display_name,
        artifact_uri=WORKING_DIR,
        serving_container_image_uri=serving_container_image_uri,
        serving_container_environment_variables={"NOT_USED": "NO_VALUE"},
    )
    custom_model_upload_job.after(custom_training_job)
    custom_model_deploy_job = gcc_aip.ModelDeployOp(
        project=project,
        traffic_percentage=50,
        endpoint=automl_model_deploy_job.outputs["endpoint"],
        model=custom_model_upload_job.outputs["model"],
        deployed_model_display_name=model_display_name,
        machine_type="n1-standard-4",
    )
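
The execution order of the pipeline follows from two kinds of edges: explicit `.after(...)` calls and implicit dependencies created when one step consumes another step's output. The sketch below writes down, by hand, the edges for the steps from dataset creation onward as read from the definition above (it does not inspect the KFP objects) and derives one valid ordering with the standard library's `graphlib` (Python 3.9+). The step names are labels chosen for this illustration.

```python
from graphlib import TopologicalSorter

# Each key maps to the set of steps that must finish before it can start.
dependencies = {
    "dataset_create": set(),
    "custom_training": {"dataset_create"},
    "automl_training": {"dataset_create"},
    "endpoint_create": set(),
    "evaluate_model": {"automl_training"},
    "automl_model_deploy": {"endpoint_create", "automl_training"},
    "custom_model_upload": {"custom_training"},
    "custom_model_deploy": {"automl_model_deploy", "custom_model_upload"},
}

order = list(TopologicalSorter(dependencies).static_order())
position = {step: index for index, step in enumerate(order)}

for step in order:
    print(step)
```

Steps with no path between them, such as `custom_training` and `automl_training`, are free to run in parallel; the ordering printed here is just one of several valid ones.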

---
### 4.3 Compile and upload pipeline to Vertex AI

Now, you're ready to compile the pipeline:

In [None]:
from kfp.v2 import compiler

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="telecom-sample-pipeline.json"
)

The pipeline compilation generates the `telecom-sample-pipeline.json` job spec file.

Next, instantiate an API client object:

In [None]:
from kfp.v2.google.client import AIPlatformClient
api_client = AIPlatformClient(
    project_id=PROJECT_ID,
    region=REGION,
)

In [None]:
PIPELINE_ROOT

Then, you run the defined pipeline like this: 

In [None]:

response = api_client.create_run_from_job_spec(
    "telecom-sample-pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={
        "project": PROJECT_ID,
        "bq_feature_source": BQ_FEATURE_SOURCE,
        "bq_training_output": BQ_TRAINING_OUTPUT,
        "featurestore_id": FEATURE_STORE,
    },
    enable_caching=False,
)
# To run the pipeline on a cron schedule instead:
# response = api_client.create_schedule_from_job_spec(job_spec_path="telecom-sample-pipeline.json",
#                                   schedule="2 * * * *")

## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial:
- Delete Cloud Storage objects that were created.  Uncomment and run the command in the cell below **only if you are not using the `PIPELINE_ROOT` path for any other purpose**.
- Delete your deployed model: first, undeploy it from its *endpoint*, then delete the model and endpoint.


In [None]:
# Warning: this command will delete ALL Cloud Storage objects under the PIPELINE_ROOT path.
# ! gsutil -m rm -r $PIPELINE_ROOT

In [None]:
model = aiplatform.Model(model_name='9070328814361378816')


In [None]:
endpoints = aiplatform.Endpoint.list()

In [None]:
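for endpoint in endpoints:
    try:
        # force=True undeploys any deployed models before deleting the endpoint
        endpoint.delete(force=True)
    except Exception as e:
        print("Failed to delete endpoint {}: {}".format(endpoint.resource_name, e))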