
In [None]:
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

## Feature Monitoring in Vertex AI Feature Store

<table align="left">
  <td style="text-align: center">
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/feature_store/feature_monitoring_with_feature_registry.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Google Colaboratory logo"><br> Open in Colab
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fvertex-ai-samples%2Fmain%2Fnotebooks%2Fofficial%2Ffeature_store%2Ffeature_monitoring_with_feature_registry.ipynb">
      <img width="32px" src="https://cloud.google.com/ml-engine/images/colab-enterprise-logo-32px.png" alt="Google Cloud Colab Enterprise logo"><br> Open in Colab Enterprise
    </a>
  </td>    
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/main/notebooks/official/feature_store/feature_monitoring_with_feature_registry.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo"><br> Open in Workbench
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/feature_store/feature_monitoring_with_feature_registry.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo"><br> View on GitHub
    </a>
  </td>
</table>

## Overview

In this tutorial, you learn how to use the Vertex AI SDK for Python to monitor feature data in Vertex AI Feature Store.

This tutorial uses the following Google Cloud ML services and resources:

* Vertex AI Feature Store
* BigQuery

The steps performed include the following:

* Set up BigQuery data
* Set up Feature Registry
* Set up FeatureMonitors and execute FeatureMonitorJobs to observe feature stats and detect drift
* Clean up

## Get started

### Install Vertex AI SDK for Python and other required packages


In [35]:
! pip3 install --upgrade --quiet google-cloud-aiplatform bigframes

### Restart runtime

To use the newly installed packages, you must restart the runtime on Google Colab (or restart the kernel in a Vertex AI Workbench notebook).

In [36]:
# Automatically restart kernel after installs
import os

if not os.getenv("IS_TESTING"):
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

<div class="alert alert-block alert-warning">
<b>⚠️ The kernel is going to restart. Wait until it's finished before continuing to the next step. ⚠️</b>
</div>


### Authenticate your notebook environment (Colab only)

Authenticate your environment on Google Colab.


In [1]:
import sys

if "google.colab" in sys.modules:

    from google.colab import auth

    auth.authenticate_user()

### Grant instance service account permissions (Workbench only)

Grant the service account of your Workbench instance (in the format xxx-compute@developer.gserviceaccount.com) the following IAM roles:
*   BigQuery Admin
*   Vertex AI Feature Store Admin




### Set Google Cloud project information and initialize Vertex AI SDK for Python

To get started using Vertex AI, you must have an existing Google Cloud project and [enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com). Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).

In [2]:
# Change to your own project ID
PROJECT_ID = ""  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}

import vertexai

vertexai.init(project=PROJECT_ID, location=LOCATION)

### Imports and IDs

In [3]:
import bigframes
import bigframes.pandas
import pandas as pd
from google.cloud import bigquery
from vertexai.resources.preview.feature_store import (Feature, FeatureGroup, FeatureMonitor,
                                                      offline_store)
from vertexai.resources.preview.feature_store import utils as fs_utils

The following variables set BigQuery and Feature Group resources that will be
used or created. If you'd like to use your own data source (CSV), please adjust
`DATA_SOURCE`.

In [4]:
BQ_DATASET_ID = "fhfv_dataset_unique"  # @param {type:"string"}
BQ_TABLE_ID = "fhfv_table_unique"  # @param {type:"string"}
BQ_TABLE_URI = f"{PROJECT_ID}.{BQ_DATASET_ID}.{BQ_TABLE_ID}"

FEATURE_GROUP_ID = "fg_feature_monitoring_tutorial"  # @param {type:"string"}

DATA_SOURCE = "gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/movie_prediction.csv"

## Create BigQuery table containing feature data

First we'll use BigQuery DataFrames to load the CSV data source. Then we'll
parse the `timestamp` column as UTC datetimes and rename it to
`feature_timestamp` so the table can be used as a BigQuery source in Feature Registry.

In [5]:
session = bigframes.connect(
    bigframes.BigQueryOptions(
        project=PROJECT_ID,
        location=LOCATION,
    )
)
df = session.read_csv(DATA_SOURCE)
df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
df = df.rename(columns={"timestamp": "feature_timestamp"})

Let's preview the data we'll write to the table.

In [6]:
df.head()

|   | users | movies | feature_timestamp |
|---|-------|--------|-------------------|
| 0 | alice | movie_01 | 2021-09-15 08:28:14+00:00 |
| 1 | bob | movie_02 | 2021-09-15 08:28:14+00:00 |
| 2 | dav | movie_03 | 2021-09-15 08:28:14+00:00 |
| 3 | eve | movie_04 | 2021-09-15 08:28:14+00:00 |
| 4 | alice | movie_03 | 2021-09-14 09:35:15+00:00 |


And finally we'll write the DataFrame to the target BigQuery table.

In [7]:
df.to_gbq(BQ_TABLE_URI, if_exists="replace")

'ruichang-fs-billing.fhfv_dataset_unique.fhfv_table_unique'

## Create feature registry resources

Create a feature group backed by the BigQuery table created above.

In [8]:
fg: FeatureGroup = FeatureGroup.create(
    FEATURE_GROUP_ID,
    fs_utils.FeatureGroupBigQuerySource(
        uri=f"bq://{BQ_TABLE_URI}", entity_id_columns=["users"]
    ),
)

INFO:vertexai.resources.preview.feature_store.feature_group:Creating FeatureGroup
INFO:vertexai.resources.preview.feature_store.feature_group:Create FeatureGroup backing LRO: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/operations/7789990851446308864
INFO:vertexai.resources.preview.feature_store.feature_group:FeatureGroup created. Resource name: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial
INFO:vertexai.resources.preview.feature_store.feature_group:To use this FeatureGroup in another session:
INFO:vertexai.resources.preview.feature_store.feature_group:feature_group = aiplatform.FeatureGroup('projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial')


In [9]:
# To get an existing FeatureGroup, pass its ID
fg = FeatureGroup(FEATURE_GROUP_ID)
print(fg)

<vertexai.resources.preview.feature_store.feature_group.FeatureGroup object at 0x7ff2081b45e0> 
resource name: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial


Create the `movies` feature, which corresponds to the `movies` column in the
BigQuery table created above.

In [10]:
movies_feature: Feature = fg.create_feature("movies")

INFO:vertexai.resources.preview.feature_store.feature_group:Creating Feature
INFO:vertexai.resources.preview.feature_store.feature_group:Create Feature backing LRO: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/features/movies/operations/5784763117359595520
INFO:vertexai.resources.preview.feature_store.feature_group:Feature created. Resource name: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/features/movies
INFO:vertexai.resources.preview.feature_store.feature_group:To use this Feature in another session:
INFO:vertexai.resources.preview.feature_store.feature_group:feature = aiplatform.Feature('projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/features/movies')


## Set up Feature Monitoring

### Create Feature Monitor

In [11]:
FEATURE_MONITOR_ID = "vertex_sdk_fm_cron"  # @param {type:"string"}
fm: FeatureMonitor = fg.create_feature_monitor(
    name=FEATURE_MONITOR_ID,
    # List of (feature ID, drift threshold) pairs
    feature_selection_configs=[("movies", 0.1)],
    # Cron schedule: run at minute 0 of every hour
    schedule_config="0 * * * *",
)

INFO:vertexai.resources.preview.feature_store.feature_group:Creating FeatureMonitor
INFO:vertexai.resources.preview.feature_store.feature_group:Create FeatureMonitor backing LRO: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/featureMonitors/vertex_sdk_fm_cron/operations/3478920108145901568
INFO:vertexai.resources.preview.feature_store.feature_group:FeatureMonitor created. Resource name: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/featureMonitors/vertex_sdk_fm_cron
INFO:vertexai.resources.preview.feature_store.feature_group:To use this FeatureMonitor in another session:
INFO:vertexai.resources.preview.feature_store.feature_group:feature_monitor = aiplatform.FeatureMonitor('projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/featureMonitors/vertex_sdk_fm_cron')
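The `schedule_config` value `"0 * * * *"` is a standard cron expression with five fields: minute, hour, day of month, month, and day of week. This expression fires at minute 0 of every hour. The following minimal sketch uses the standard library to compute the next firing time for this one pattern. `next_hourly_run` is a hypothetical helper, not part of the Vertex AI SDK. The sketch does not model which time zone Vertex AI evaluates the schedule in.

```python
from datetime import datetime, timedelta, timezone


def next_hourly_run(now: datetime) -> datetime:
    """Return the next firing time of the cron expression "0 * * * *".

    Only this single pattern (minute 0 of every hour) is handled;
    this is not a general cron parser.
    """
    top_of_hour = now.replace(minute=0, second=0, microsecond=0)
    if now == top_of_hour:
        # Exactly on the hour: the schedule fires at this instant.
        return top_of_hour
    # Otherwise the next firing is the top of the following hour.
    return top_of_hour + timedelta(hours=1)


now = datetime(2024, 12, 11, 7, 27, 19, tzinfo=timezone.utc)
print(next_hourly_run(now))  # 2024-12-11 08:00:00+00:00
```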


List the FeatureMonitors created in the FeatureGroup:

In [12]:
fms: list[FeatureMonitor] = fg.list_feature_monitors()
print(fms)

[<vertexai.resources.preview.feature_store.feature_monitor.FeatureMonitor object at 0x7ff2081e8eb0> 
resource name: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/featureMonitors/vertex_sdk_fm_cron]


Get the FeatureMonitor and its properties:

In [13]:
fm = fg.get_feature_monitor(FEATURE_MONITOR_ID)
print(fm)
print("feature selection configs: (feature and its drift threshold):", fm.feature_selection_configs)
print("schedule config in cron string: ", fm.schedule_config)

<vertexai.resources.preview.feature_store.feature_monitor.FeatureMonitor object at 0x7ff20b4f56c0> 
resource name: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/featureMonitors/vertex_sdk_fm_cron
feature selection configs: (feature and its drift threshold): [('movies', 0.1)]
schedule config in cron string:  0 * * * *


### Execute a FeatureMonitorJob

A FeatureMonitorJob can be executed in two ways:
1. Automatically, on the schedule set by `schedule_config` in the FeatureMonitor.
2. Manually, on demand. In the following sections, we manually trigger monitor jobs to observe stats and drift.

Each FeatureMonitorJob computes stats on a snapshot of the data taken when the job executes.

Manually execute a FeatureMonitorJob as follows:

In [26]:
fmj = fm.create_feature_monitor_job()

In [15]:
print(fmj)

<vertexai.resources.preview.feature_store.feature_monitor.FeatureMonitor.FeatureMonitorJob object at 0x7ff20819ae90> 
resource name: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/featureMonitors/vertex_sdk_fm_cron/featureMonitorJobs/6197403414582788096


#### Observe Feature Stats in FeatureMonitorJob

Get the FeatureMonitorJob and inspect its `feature_stats_and_anomalies`. The `feature_stats` field follows the TensorFlow Metadata proto [FeatureNameStatistics](https://www.tensorflow.org/tfx/tf_metadata/api_docs/python/tfmd/proto/statistics_pb2/FeatureNameStatistics).

In [None]:
# Note: stats can take a few seconds to minutes to appear, so poll until they do
import time

while True:
    fmj_get = fm.get_feature_monitor_job(fmj.name)
    if fmj_get.feature_stats_and_anomalies:
        break
    time.sleep(5)
print(fmj_get)
print(fmj_get.feature_stats_and_anomalies)
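The polling loops in this notebook wait indefinitely if stats never appear. The following minimal sketch shows a reusable poll helper with a timeout. `wait_for` is a hypothetical helper, not an SDK function. The demo uses a stand-in `fake_fetch` instead of a real Vertex AI call.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def wait_for(fetch: Callable[[], T], ready: Callable[[T], bool],
             timeout_s: float = 600.0, interval_s: float = 5.0) -> T:
    """Call `fetch` every `interval_s` seconds until `ready(result)` is true.

    Raises TimeoutError if the result is not ready within `timeout_s` seconds.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        result = fetch()
        if ready(result):
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"not ready after {timeout_s} seconds")
        time.sleep(interval_s)


# Demo: a stand-in fetch that returns non-empty results on its third call.
calls = {"n": 0}


def fake_fetch() -> list:
    calls["n"] += 1
    return ["stats"] if calls["n"] >= 3 else []


print(wait_for(fake_fetch, bool, timeout_s=5, interval_s=0.01))  # ['stats']
```

In this notebook, the call could look like `wait_for(lambda: fm.get_feature_monitor_job(fmj.name), lambda job: bool(job.feature_stats_and_anomalies))`.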

At this point the data has not changed, so no drift is detected.

In [28]:
for feature_stats_and_anomalies in fmj_get.feature_stats_and_anomalies:
    print("feature: ", feature_stats_and_anomalies.feature_id)
    print("drift score: ", feature_stats_and_anomalies.distribution_deviation)
    print("drift detected: ", feature_stats_and_anomalies.drift_detected)

feature:  movies
drift score:  0.0
drift detected:  False


#### Get Feature Stats in Feature

In [29]:
feature_movie = fg.get_feature("movies", latest_stats_count=5)
print(feature_movie)

# The data has not changed yet, so none of these jobs detects drift.
for feature_stats in feature_movie.feature_stats_and_anomalies:
    print("feature monitor job id: ", feature_stats.feature_monitor_job_id)
    print("drift score: ", feature_stats.distribution_deviation)
    print("drift detected: ", feature_stats.drift_detected)

<vertexai.resources.preview.feature_store.feature.Feature object at 0x7ff1f8fab4f0> 
resource name: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/features/movies
feature monitor job id:  5161575500287574016
drift score:  0.0
drift detected:  False
feature monitor job id:  7511610080844840960
drift score:  0.0
drift detected:  False
feature monitor job id:  4055941791768117248
drift score:  0.0
drift detected:  False
feature monitor job id:  8628502788432723968
drift score:  0.0
drift detected:  False
feature monitor job id:  6197403414582788096
drift score:  0.0
drift detected:  False


Print the full `feature_stats_and_anomalies` of the Feature:

In [30]:
print(feature_movie.feature_stats_and_anomalies)

[feature_stats {
  struct_value {
    fields {
      key: "type"
      value {
        string_value: "STRING"
      }
    }
    fields {
      key: "stringStats"
      value {
        struct_value {
          fields {
            key: "unique"
            value {
              string_value: "4"
            }
          }
          fields {
            key: "topValues"
            value {
              list_value {
                values {
                  struct_value {
                    fields {
                      key: "value"
                      value {
                        string_value: "movie_02"
                      }
                    }
                    fields {
                      key: "frequency"
                      value {
                        number_value: 1
                      }
                    }
                  }
                }
                values {
                  struct_value {
                    fields {
                      key: 
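The `stringStats` entry printed above reports how many unique values the `movies` feature has, along with its most frequent values and their frequencies. The following minimal sketch computes those two quantities with `collections.Counter`. `string_stats` is a hypothetical helper, and its dict keys only mirror the printed field names. It is neither the SDK's return type nor a description of how Vertex AI computes stats.

```python
from collections import Counter


def string_stats(values: list[str], top_k: int = 2) -> dict:
    """Count the unique values and the most frequent values of a string feature."""
    counts = Counter(values)
    return {
        "unique": len(counts),
        "topValues": [
            {"value": value, "frequency": freq}
            for value, freq in counts.most_common(top_k)
        ],
    }


# Hypothetical snapshot with one value per row.
snapshot = ["movie_01", "movie_02", "movie_03", "movie_04"]
print(string_stats(snapshot)["unique"])  # 4
```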

### Detect drift


Drift happens when the data in the Feature offline store (the BigQuery source) changes over time. Each FeatureMonitorJob calculates drift by comparing the data snapshot taken by that job with the snapshot taken by the previous job.

The drift score is calculated with the following algorithms:
* For Categorical type: [L-infinity](https://en.wikipedia.org/wiki/Chebyshev_distance) distance.
* For Numerical type: [Jensen–Shannon divergence](https://en.wikipedia.org/wiki/Jensen%E2%80%93Shannon_divergence)
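The two drift metrics named above can be sketched in plain Python. This minimal illustration assumes that category frequencies are plain relative counts and that the Jensen-Shannon divergence uses base-2 logarithms, which bounds it in [0, 1]. This tutorial does not describe Vertex AI's actual binning, normalization, or log base. `l_infinity_distance` and `js_divergence` are hypothetical helpers, not SDK functions.

```python
import math
from collections import Counter


def l_infinity_distance(old: list[str], new: list[str]) -> float:
    """L-infinity (Chebyshev) distance between two categorical frequency distributions."""
    p, q = Counter(old), Counter(new)
    categories = set(p) | set(q)
    # Counter returns 0 for a category that is absent from one sample.
    return max(abs(p[c] / len(old) - q[c] / len(new)) for c in categories)


def js_divergence(p: list[float], q: list[float]) -> float:
    """Jensen-Shannon divergence between two histograms over the same bins.

    Uses base-2 logarithms, so the result lies in [0, 1].
    """
    total_p, total_q = sum(p), sum(q)
    p_norm = [x / total_p for x in p]
    q_norm = [x / total_q for x in q]
    mix = [(a + b) / 2 for a, b in zip(p_norm, q_norm)]

    def kl(a: list[float], b: list[float]) -> float:
        # Bins where a is 0 contribute nothing; the mixture b is > 0 wherever a > 0.
        return sum(x * math.log2(x / y) for x, y in zip(a, b) if x > 0)

    return 0.5 * kl(p_norm, mix) + 0.5 * kl(q_norm, mix)


old = ["movie_01", "movie_02", "movie_03", "movie_04"]
new = old + ["action_1", "drama_2", "romance_3", "science_fiction_4", "comedy_5"]
print(round(l_infinity_distance(old, new), 4))  # 0.1389
print(js_divergence([1, 0], [0, 1]))  # 1.0 (disjoint histograms)
print(js_divergence([1, 2, 3], [1, 2, 3]))  # 0.0 (identical histograms)
```

The hypothetical samples are four original movies plus the five movies appended later in this tutorial. For them, the L-infinity distance is 5/36 ≈ 0.1389, which happens to match the drift score recorded later in this notebook. This sketch makes no claim about how Vertex AI builds its snapshots.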

In this tutorial, we append additional rows to the BigQuery table to simulate a change in the data.



In [31]:
from io import StringIO

data = """users,movies,timestamp
"new_1","action_1",2024-08-15T08:28:14Z
"new_2","drama_2",2024-09-15T08:28:14Z
"new_3","romance_3",2024-10-15T08:28:14Z
"new_4","science_fiction_4",2024-11-15T09:29:16Z
"new_5","comedy_5",2024-12-11T07:27:19Z
"""

# Read the CSV string into a BigQuery DataFrames DataFrame
df_new = session.read_csv(StringIO(data))
df_new["timestamp"] = pd.to_datetime(df_new["timestamp"], utc=True)
df_new = df_new.rename(columns={"timestamp": "feature_timestamp"})
df_new.head()

|   | users | movies | feature_timestamp |
|---|-------|--------|-------------------|
| 0 | new_1 | action_1 | 2024-08-15 08:28:14+00:00 |
| 1 | new_2 | drama_2 | 2024-09-15 08:28:14+00:00 |
| 2 | new_3 | romance_3 | 2024-10-15 08:28:14+00:00 |
| 3 | new_4 | science_fiction_4 | 2024-11-15 09:29:16+00:00 |
| 4 | new_5 | comedy_5 | 2024-12-11 07:27:19+00:00 |


In [38]:
# Append the new rows to the BigQuery table
df_new.to_gbq(BQ_TABLE_URI, if_exists="append")

'ruichang-fs-billing.fhfv_dataset_unique.fhfv_table_unique'

In [36]:
fmj_new = fm.create_feature_monitor_job(description="new job test drift detection")

List the FeatureMonitorJobs. All jobs, including the new one, are shown:

In [34]:
fmjs = fm.list_feature_monitor_jobs()
print(fmjs)

[<vertexai.resources.preview.feature_store.feature_monitor.FeatureMonitor.FeatureMonitorJob object at 0x7ff1f3af74f0> 
resource name: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/featureMonitors/vertex_sdk_fm_cron/featureMonitorJobs/4055941791768117248, <vertexai.resources.preview.feature_store.feature_monitor.FeatureMonitor.FeatureMonitorJob object at 0x7ff1f3af47f0> 
resource name: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/featureMonitors/vertex_sdk_fm_cron/featureMonitorJobs/5161575500287574016, <vertexai.resources.preview.feature_store.feature_monitor.FeatureMonitor.FeatureMonitorJob object at 0x7ff1f3af40a0> 
resource name: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/featureMonitors/vertex_sdk_fm_cron/featureMonitorJobs/5740288052404682752, <vertexai.resources.preview.feature_store.feature_monitor.FeatureMonitor.FeatureMonitorJob object at 0x7ff1f

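Observe drift in the FeatureMonitorJob. Because drift is measured against the previous job's snapshot, the first job to run after the rows were appended should report drift. In the recorded output below, this job reports a score of 0.0, likely because an earlier run had already captured the appended rows. That earlier job's drift appears in the Feature stats further down.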

In [39]:
while True:
    fmj_with_drift = fm.get_feature_monitor_job(fmj_new.name)
    if fmj_with_drift.feature_stats_and_anomalies:
        break
    time.sleep(5)
print(fmj_with_drift)
for feature_stats_and_anomalies in fmj_with_drift.feature_stats_and_anomalies:
    print("feature: ", feature_stats_and_anomalies.feature_id)
    print("drift score (distribution_deviation): ", feature_stats_and_anomalies.distribution_deviation)
    print("drift detected: ", feature_stats_and_anomalies.drift_detected)

<vertexai.resources.preview.feature_store.feature_monitor.FeatureMonitor.FeatureMonitorJob object at 0x7ff1f3ae2980> 
resource name: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/featureMonitors/vertex_sdk_fm_cron/featureMonitorJobs/5919306137592659968
feature:  movies
drift score (distribution_deviation):  0.0
drift detected:  False


Observe the full statistics and drift:

In [None]:
print(fmj_with_drift.feature_stats_and_anomalies)

Observe drift in the Feature:

In [41]:
feature_movie = fg.get_feature("movies", latest_stats_count=5)
print(feature_movie)

# Stats from several jobs are returned: jobs that saw no data change report no drift,
# and a job that captured the appended rows reports drift
for feature_stats in feature_movie.feature_stats_and_anomalies:
    print("feature monitor job id: ", feature_stats.feature_monitor_job_id)
    print("drift score: ", feature_stats.distribution_deviation)
    print("drift detected: ", feature_stats.drift_detected)

<vertexai.resources.preview.feature_store.feature.Feature object at 0x7ff1f3ae0760> 
resource name: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/features/movies
feature monitor job id:  5919306137592659968
drift score:  0.0
drift detected:  False
feature monitor job id:  5740288052404682752
drift score:  0.1388888888888889
drift detected:  True
feature monitor job id:  5161575500287574016
drift score:  0.0
drift detected:  False
feature monitor job id:  7511610080844840960
drift score:  0.0
drift detected:  False
feature monitor job id:  4055941791768117248
drift score:  0.0
drift detected:  False
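The monitor was created with a drift threshold of 0.1 for `movies`. The recorded scores above are consistent with `drift_detected` being set when the score exceeds that threshold. The following minimal sketch checks this assumed rule against the printed values. These values cannot settle whether Vertex AI compares with `>` or `>=` at exactly the threshold. `exceeds_threshold` is a hypothetical helper.

```python
DRIFT_THRESHOLD = 0.1  # threshold passed in feature_selection_configs=[("movies", 0.1)]

# (job ID, drift score, drift_detected) as printed in the output above.
recorded = [
    ("5919306137592659968", 0.0, False),
    ("5740288052404682752", 0.1388888888888889, True),
    ("5161575500287574016", 0.0, False),
    ("7511610080844840960", 0.0, False),
    ("4055941791768117248", 0.0, False),
]


def exceeds_threshold(score: float, threshold: float = DRIFT_THRESHOLD) -> bool:
    """Assumed rule: drift is flagged when the score is strictly above the threshold."""
    return score > threshold


# Check that the assumed rule reproduces every recorded drift_detected flag.
assert all(exceeds_threshold(score) == detected for _, score, detected in recorded)
drifted = [job_id for job_id, score, _ in recorded if exceeds_threshold(score)]
print(drifted)  # ['5740288052404682752']
```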


## Cleaning up

### Delete feature monitor, feature and feature group

In [42]:
# Deleting the FeatureMonitor also deletes all FeatureMonitorJobs created under it; the stats stored under the Feature are kept.
fm.delete()

INFO:google.cloud.aiplatform.base:Deleting FeatureMonitor : projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/featureMonitors/vertex_sdk_fm_cron
INFO:google.cloud.aiplatform.base:FeatureMonitor deleted. . Resource name: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/featureMonitors/vertex_sdk_fm_cron
INFO:google.cloud.aiplatform.base:Deleting FeatureMonitor resource: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/featureMonitors/vertex_sdk_fm_cron
INFO:google.cloud.aiplatform.base:Delete FeatureMonitor backing LRO: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/operations/4757942402319122432
INFO:google.cloud.aiplatform.base:FeatureMonitor resource projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/featureMonitors/vertex_sdk_fm_cron deleted.


In [43]:
# Deleting the Feature also deletes all stats stored under it.
movies_feature.delete()

INFO:google.cloud.aiplatform.base:Deleting Feature : projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/features/movies
INFO:google.cloud.aiplatform.base:Feature deleted. . Resource name: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/features/movies
INFO:google.cloud.aiplatform.base:Deleting Feature resource: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/features/movies
INFO:google.cloud.aiplatform.base:Delete Feature backing LRO: projects/743352906802/locations/us-central1/operations/2452099393105428480
INFO:google.cloud.aiplatform.base:Feature resource projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial/features/movies deleted.


In [44]:
# Delete Feature Group.
fg.delete()

INFO:vertexai.resources.preview.feature_store.feature_group:Deleting FeatureGroup resource: projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial
INFO:vertexai.resources.preview.feature_store.feature_group:Delete FeatureGroup backing LRO: projects/743352906802/locations/us-central1/operations/1239505193435922432
INFO:vertexai.resources.preview.feature_store.feature_group:FeatureGroup resource projects/743352906802/locations/us-central1/featureGroups/fg_feature_monitoring_tutorial deleted.


### Delete BigQuery dataset and table

In [45]:
client = bigquery.Client(project=PROJECT_ID)

In [46]:
client.delete_table(f"{BQ_TABLE_URI}")

In [47]:
client.delete_dataset(f"{PROJECT_ID}.{BQ_DATASET_ID}")