
# Feature Store

In ML, instances (think rows) of data are used both to train models and to request predictions. Feature engineering transforms the columns of these instances into features, which are the inputs at both training and serving time. Thus, feature management is important.

A feature store centralizes the tasks of creating, maintaining, sharing, and serving ML features.  This allows for easier reuse, faster ML development, less training/serving skew, and discoverability of features and feature sets.

In Vertex AI, this is covered by [Vertex AI Feature Store](https://cloud.google.com/vertex-ai/docs/featurestore) of which there are two types:
- [Vertex AI Feature Store](https://cloud.google.com/vertex-ai/docs/featurestore#vaifs)
    - Covered in this workflow
- [Vertex AI Feature Store (Legacy)](https://cloud.google.com/vertex-ai/docs/featurestore#vaifs_legacy)
    - Covered in the prior workflow at [Feature Store (Legacy)](./Feature%20Store%20(Legacy).ipynb)
- [Comparison](https://cloud.google.com/vertex-ai/docs/featurestore#comparison_between_and)

**Prerequisites:**
-  [01 - BigQuery - Table Data Source](../01%20-%20Data%20Sources/01%20-%20BigQuery%20-%20Table%20Data%20Source.ipynb)

**Resources:**
- [Documentation](https://cloud.google.com/vertex-ai/docs/featurestore/latest/overview)
- [Tutorial from Documentation](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/feature_store/online_feature_serving_and_fetching_bigquery_data_with_feature_store_bigtable.ipynb)

**Todo List:**
- add embedding to this datasource and include embedding search
- add data catalog narrative
- add FS management section: add/delete/list/update
- add example of multiple groups feeding features to the same online view
- add latency example with sync and async client

---
**tl;dr**

<p align="center" width="100%"><center>
    <img src="../architectures/architectures/images/feature store/readme/overview.png">
</center></p>

At its core, Feature Store is a serving environment for **features** observed on **entities**:
- **entity** = a unique record, think row
- **feature** = observations, input for ML, think column

The **offline store** is made up of any BigQuery Table(s)/View(s), the **data source**, that you manage:
- (1) If a table/view has a single row per unique **entity** with columns that are non-changing values for **features** then the table can be directly used in an **online store's** **feature view** (see below).
- (2) For time bound **features** the table/view needs to have two additional columns: entity_id, feature_timestamp. Think of this as a history table.

The **feature registry**:
- Tables/Views of type (2) above are registered as **Feature Groups** - a feature group is sourced by a single table/view
- Columns from the **feature group** are then registered as **features**

The **online store** has two types to choose from:
- Cloud Bigtable online serving - highly scalable
- Optimized online serving - ultra-low latencies and responsive to bursts of requests

**Feature Views** are created in the **online store** from either:
- Features from one or more **feature groups**
- a table/view of type (1) above
 
BigQuery as a **data source**:
- Managing time-bound **features** therefore happens in BigQuery, upstream of the **feature store**. You can keep multiple rows per **entity** and use the entity_id and feature_timestamp columns to record when each value applies. To make this shape of source data useful for training batches (or evaluation, test, and batch prediction), BigQuery provides two functions that extract point-in-time values for **entity/feature** data:
    - [ML.FEATURES_AT_TIME](https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-feature-time) - takes a table and a timestamp as input and returns the value of each feature for each entity as of that timestamp.  Additional optional configurations are also available.
    - [ML.ENTITY_FEATURES_AT_TIME](https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-entity-feature-time) - takes an input table and an additional table of entity+timestamp pairs and returns the feature values for each entity+timestamp pair.  This allows multiple points in time for a single entity as well as different times for different entities.
- Time-bound data, or column values that change for a row/entity, might not be the way data scientists are used to working with data.  BigQuery has several features that help with data that changes over time.
    - Building these tables/views of timestamped entity records can benefit from [time-travel](https://cloud.google.com/bigquery/docs/time-travel#time_travel) (a configurable window of up to 7 days) and [snapshots](https://cloud.google.com/bigquery/docs/table-snapshots-intro) (user-controlled points in time). You can also [query time-travel](https://cloud.google.com/bigquery/docs/access-historical-data) as well as [create snapshots from time-travel](https://cloud.google.com/bigquery/docs/table-snapshots-create#create_a_table_snapshot_using_time_travel).
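As a rough illustration of the time-travel and snapshot syntax linked above, the helpers below only build SQL strings. The table and snapshot names are placeholders, the helper names are hypothetical, and the 168-hour check assumes the default 7-day window (a dataset configured with a shorter window would reject older timestamps). The resulting strings could be run with `bq.query(...)` as in the rest of this notebook.

```python
def _as_of(hours_ago: int) -> str:
    # assumption: the dataset keeps the default (maximum) 7-day time-travel window
    if not 0 < hours_ago <= 7 * 24:
        raise ValueError('hours_ago must fall within the 7-day time-travel window')
    return f'FOR SYSTEM_TIME AS OF TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours_ago} HOUR)'


def time_travel_query(table: str, hours_ago: int) -> str:
    """SELECT the table as it existed `hours_ago` hours ago."""
    return f'SELECT *\nFROM `{table}`\n  {_as_of(hours_ago)}'


def snapshot_from_time_travel(snapshot: str, table: str, hours_ago: int) -> str:
    """CREATE a snapshot table that freezes `table` as of `hours_ago` hours ago."""
    return f'CREATE SNAPSHOT TABLE `{snapshot}`\nCLONE `{table}`\n  {_as_of(hours_ago)}'


# Placeholder names: substitute your own project, dataset, and table
print(time_travel_query('my-project.fraud.fraud_prepped', 24))
print(snapshot_from_time_travel('my-project.fraud.fraud_prepped_snap', 'my-project.fraud.fraud_prepped', 24))
```

Querying time-travel is handy for ad hoc history, while a snapshot pins a point in time beyond the time-travel window.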



---
## Colab Setup

To run this notebook in Colab click [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/statmike/vertex-ai-mlops/blob/main/Feature%20Store/Feature%20Store.ipynb) and run the cells in this section.  Otherwise, skip this section.

This cell will authenticate to GCP (follow prompts in the popup).

In [1]:
PROJECT_ID = 'statmike-mlops-349915' # replace with project ID

In [2]:
try:
    import google.colab
    from google.colab import auth
    auth.authenticate_user()
    !gcloud config set project {PROJECT_ID}
except Exception:
    pass

---
## Installs

The list `packages` contains tuples of package import names and install names.  If an import name is not found, the corresponding install name is used to install the package quietly for the current user.

In [2]:
# tuples of (import name, install name)
packages = [
    ('google.cloud.aiplatform', 'google-cloud-aiplatform'),
    ('google.cloud.bigquery', 'google-cloud-bigquery'),
    ('bigframes', 'bigframes'),
]

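import importlib.util  # find_spec lives in the importlib.util submodule, which `import importlib` alone may not load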
install = False
for package in packages:
    if not importlib.util.find_spec(package[0]):
        print(f'installing package {package[1]}')
        install = True
        !pip install {package[1]} -U -q --user

### Restart Kernel (If Installs Occurred)

After a kernel restart the code submission can start with the next cell after this one.

In [3]:
if install:
    import IPython
    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

---
## Setup

inputs:

In [4]:
project = !gcloud config get-value project
PROJECT_ID = project[0]
PROJECT_ID

'statmike-mlops-349915'

In [5]:
REGION = 'us-central1'
EXPERIMENT = 'workflow'
SERIES = 'feature-store'

# source data
BQ_PROJECT = PROJECT_ID
BQ_DATASET = 'fraud'
BQ_TABLE = 'fraud_prepped'

# Model Training
VAR_TARGET = 'Class'
VAR_OMIT = 'transaction_id,splits' # add more variables to the string with comma delimiters

packages:

In [307]:
from google.cloud import aiplatform
from datetime import datetime
import time
import numpy as np
from google.cloud import bigquery
import bigframes.pandas as bpd

clients:

In [7]:
aiplatform.init(project = PROJECT_ID, location = REGION)
bq = bigquery.Client(project = PROJECT_ID)
bpd.options.bigquery.project = PROJECT_ID

---
## Review Source Data

The data source here was prepared in [01 - BigQuery - Table Data Source](../01%20-%20Data%20Sources/01%20-%20BigQuery%20-%20Table%20Data%20Source.ipynb).

This is a table of 284,807 credit card transactions classified as fraudulent or normal in the column `Class`.  In order to protect confidentiality, the original features have been transformed using [principal component analysis (PCA)](https://en.wikipedia.org/wiki/Principal_component_analysis) into 28 features named `V1, V2, ... V28` (float).  Two descriptive features are provided without transformation by PCA:
- `Time` (integer) is the seconds elapsed between the transaction and the earliest transaction in the table
- `Amount` (float) is the value of the transaction

The data preparation added a column named `splits` for machine learning, with 80% for training (`TRAIN`), 10% for validation (`VALIDATE`), and 10% for testing (`TEST`).  Additionally, a unique identifier, `transaction_id`, was added to each transaction.

### Connect to the BigQuery table:

In [8]:
source_data = bpd.read_gbq(f'{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}')

### Review a sample of the data:

In [20]:
source_data.head()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V23,V24,V25,V26,V27,V28,Amount,Class,transaction_id,splits
0,146737,-8.313253,7.353339,-6.784848,-1.647257,-3.151681,-1.036468,-3.176614,4.783091,2.525343,...,0.553924,-1.551447,0.668017,-0.047452,1.246589,0.71985,0.89,0,9c23ed6c-805e-4c54-af95-dfb8cb34446a,TRAIN
1,41342,-0.950509,1.005787,2.004522,3.180712,-0.721331,0.622745,-0.367445,0.690638,-0.831808,...,-0.019173,0.438937,-0.462187,0.328724,-0.041549,0.082586,39.52,0,f23f1435-eaf1-455c-9877-9e3b1db6473d,TRAIN
2,154083,-2.41823,-1.114104,0.0731,-1.827987,0.743047,5.017568,-2.471885,1.141392,0.812167,...,0.712716,0.746484,-0.70748,0.632214,-0.536598,0.014419,25.0,0,a147e336-81d1-47f2-87ef-206843430e00,TRAIN
3,137227,-1.125604,0.873506,2.20555,0.725739,-0.107164,1.59102,-1.093884,-2.480163,0.097936,...,-0.377725,-1.07084,0.37751,-0.257676,0.009876,-0.124529,54.0,0,2a7353de-8003-48a5-8ddf-327b42a5731d,TRAIN
4,52804,-0.336088,1.064353,2.150498,4.428434,0.463562,1.12541,0.579549,-0.118975,-1.284248,...,-0.095214,0.014986,-0.458112,0.291475,-0.00217,-0.228903,27.81,0,fd69d5aa-4c53-43a7-8ba9-32bbda68eab0,TRAIN


In [22]:
source_data.dtypes

Time                        Int64
V1                        Float64
V2                        Float64
V3                        Float64
V4                        Float64
V5                        Float64
V6                        Float64
V7                        Float64
V8                        Float64
V9                        Float64
V10                       Float64
V11                       Float64
V12                       Float64
V13                       Float64
V14                       Float64
V15                       Float64
V16                       Float64
V17                       Float64
V18                       Float64
V19                       Float64
V20                       Float64
V21                       Float64
V22                       Float64
V23                       Float64
V24                       Float64
V25                       Float64
V26                       Float64
V27                       Float64
V28                       Float64
Amount        

### Review the number of records for each level of Class (VAR_TARGET) for each of the data splits:

In [19]:
source_data.groupby(['splits', 'Class'])['Class'].count()

splits    Class
TEST      0         28455
          1            47
TRAIN     0        227664
          1           397
VALIDATE  0         28196
          1            48
Name: Class, dtype: Int64
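The counts above show a heavily imbalanced target. A quick check of the arithmetic, with the numbers copied from the output above: the splits add back up to the 284,807 rows, the split shares come out close to 80/10/10, and the fraud rate stays under 0.2% in every split.

```python
# Row counts copied from the groupby output above: (split, class) -> rows
counts = {
    ('TRAIN', 0): 227664, ('TRAIN', 1): 397,
    ('VALIDATE', 0): 28196, ('VALIDATE', 1): 48,
    ('TEST', 0): 28455, ('TEST', 1): 47,
}

total = sum(counts.values())
print(f'total rows: {total}')  # 284807, matching the row count stated above
rates = {}
for split in ('TRAIN', 'VALIDATE', 'TEST'):
    n = counts[(split, 0)] + counts[(split, 1)]
    rates[split] = counts[(split, 1)] / n
    print(f'{split:<9} rows={n:>7}  share={n / total:.1%}  fraud rate={rates[split]:.3%}')
```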

---
## Prepare Data For Feature Store

There are two formats that can be used for feature store:
- BigQuery Source
    - A table or view of the **latest feature values** for each entity
    - This can be used to directly serve features
- BigQuery Source With Timestamped History
    - A table or view of feature values along with two columns to help derive latest values for features:
        - **entity_id** - a string value that identifies the entity the features describe
        - **feature_timestamp** - a timestamp value used by feature store to serve the latest non-null value of each feature
        
This section builds each of these formats from the source table: as views, except for the history example, which is materialized as a table.

### BigQuery Source View: Raw

In [114]:
columns = ','.join([column.lower() for column in source_data.columns.tolist()])

In [115]:
query = f'''
CREATE OR REPLACE VIEW `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_raw` AS
    SELECT {columns}
    FROM `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}`
'''
print(query)


CREATE OR REPLACE VIEW `statmike-mlops-349915.fraud.fraud_prepped_fs_raw` AS
    SELECT time,v1,v2,v3,v4,v5,v6,v7,v8,v9,v10,v11,v12,v13,v14,v15,v16,v17,v18,v19,v20,v21,v22,v23,v24,v25,v26,v27,v28,amount,class,transaction_id,splits
    FROM `statmike-mlops-349915.fraud.fraud_prepped`



In [116]:
bq_job = bq.query(query)
bq_job.result()
(bq_job.ended - bq_job.started).total_seconds()

0.17

#### Review Data For a Single Entity

In this form there is a single row and each feature/column has the latest value of the feature.

In [117]:
source_raw = bpd.read_gbq(f'{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_raw')
source_raw[source_raw['transaction_id'] == 'a50e10db-3573-4749-9ecb-309f5a4dc987']

Unnamed: 0,time,v1,v2,v3,v4,v5,v6,v7,v8,v9,...,v23,v24,v25,v26,v27,v28,amount,class,transaction_id,splits
38500,121839,2.10734,-1.950655,0.721045,-0.924701,-2.90679,-0.649764,-2.100326,0.147773,0.170338,...,0.432282,0.875979,-0.639515,-0.241636,0.073953,-0.023142,27.0,0,a50e10db-3573-4749-9ecb-309f5a4dc987,TRAIN


### BigQuery Source View: Latest

A view with the latest values of all features and the entity ID column, in this case `transaction_id`.  The label (`class`) and the `splits` column are left out because they are not features to serve.

In [118]:
query = f'''
CREATE OR REPLACE VIEW `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_latest` AS
    SELECT * EXCEPT(class, splits)
    FROM `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_raw`
'''
print(query)


CREATE OR REPLACE VIEW `statmike-mlops-349915.fraud.fraud_prepped_fs_latest` AS
    SELECT * EXCEPT(class, splits)
    FROM `statmike-mlops-349915.fraud.fraud_prepped_fs_raw`



In [119]:
bq_job = bq.query(query)
bq_job.result()
(bq_job.ended - bq_job.started).total_seconds()

0.145

#### Review Data For a Single Entity

In this form, just like the raw data above, there is a single row and each feature/column has the latest value of the feature.

In [120]:
source_latest = bpd.read_gbq(f'{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_latest')
source_latest[source_latest['transaction_id'] == 'a50e10db-3573-4749-9ecb-309f5a4dc987']

Unnamed: 0,time,v1,v2,v3,v4,v5,v6,v7,v8,v9,...,v21,v22,v23,v24,v25,v26,v27,v28,amount,transaction_id
40139,121839,2.10734,-1.950655,0.721045,-0.924701,-2.90679,-0.649764,-2.100326,0.147773,0.170338,...,-0.173015,0.152649,0.432282,0.875979,-0.639515,-0.241636,0.073953,-0.023142,27.0,a50e10db-3573-4749-9ecb-309f5a4dc987


### BigQuery Source Table: History

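An example history-table version of the source data, deconstructed **FOR ILLUSTRATION ONLY**.  In this version, the feature values for each entity arrive at different times.  It is materialized as a table rather than a view because the query uses `RAND()`, and a view would generate new random timestamps every time it is queried.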

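To illustrate the shape of a history table, this query:
- assigns each `transaction_id` a base `feature_timestamp` at a random minute between 1 and 15 days ago (the extra 1440 minutes keeps every delayed timestamp in the past)
- creates six rows per transaction, each offset from the base by a sorted random delay of up to 1440 minutes: every row repeats the remaining features, V1-V5 are null on the first row, and each of the five later rows carries exactly one of V1, V2, V3, V4, V5, so those features arrive after the others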
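The same deconstruction can be sketched in pure Python for a single record, which may make the row layout easier to see. This mirrors the SQL under stated assumptions: `to_history` is a hypothetical helper, the base time is passed in instead of drawn at random, and Python's `random` stands in for `RAND()`.

```python
import random
from datetime import datetime, timedelta, timezone

# Features that "arrive late", one per later row (as in the SQL CASE statements)
DELAYED = ['v1', 'v2', 'v3', 'v4', 'v5']


def to_history(entity_id, features, base_time, rng):
    """Split one wide record into len(DELAYED) + 1 timestamped history rows.

    Every row repeats the non-delayed features. V1-V5 are null on the first
    row, and each later row carries exactly one of them. Row offsets are
    sorted random minutes in [1, 1440], like CEIL(RAND()*1440) ordered by
    ROW_NUMBER() OVER(ORDER BY delay) in the query.
    """
    delays = sorted(rng.randint(1, 1440) for _ in range(len(DELAYED) + 1))
    rows = []
    for pos, delay in enumerate(delays, start=1):
        row = {'entity_id': entity_id,
               'feature_timestamp': base_time + timedelta(minutes=delay)}
        for name, value in features.items():
            if name in DELAYED:
                # delayed feature i (0-based) is only populated on row i + 2
                row[name] = value if pos == DELAYED.index(name) + 2 else None
            else:
                row[name] = value
        rows.append(row)
    return rows


# Usage with a few values of the example entity reviewed later in this notebook
features = {'v1': 2.10734, 'v2': -1.950655, 'v3': 0.721045,
            'v4': -0.924701, 'v5': -2.90679, 'amount': 27.0}
base = datetime(2024, 2, 6, 12, 0, tzinfo=timezone.utc)
history = to_history('a50e10db-3573-4749-9ecb-309f5a4dc987', features, base, random.Random(0))
for row in history:
    print(row['feature_timestamp'].isoformat(), [row[name] for name in DELAYED], row['amount'])
```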

#### Example: Deconstruct the table into an example history table

In [178]:
query = f'''
CREATE OR REPLACE TABLE `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_history` AS 
WITH
    TRANSACTION_TIME AS (
        SELECT transaction_id,
            TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL CAST(CEIL(RAND()*60*24*14) + 1440 AS INT64) MINUTE) as feature_timestamp
        FROM `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_raw`
    ),
    FEATURE_DELAYS AS (
        SELECT *
        FROM (SELECT * FROM TRANSACTION_TIME)
        CROSS JOIN (
            SELECT *, ROW_NUMBER() OVER(ORDER BY delay) AS pos FROM UNNEST(ARRAY(
                (SELECT CAST(CEIL(RAND()*1440) AS INT64) FROM UNNEST(GENERATE_ARRAY(1, 6)))
            )) AS delay
        )   
    )
SELECT
    transaction_id as entity_id,
    TIMESTAMP_ADD(feature_timestamp, INTERVAL delay MINUTE) AS feature_timestamp,
    CASE
        WHEN pos = 2 THEN v1
        ELSE NULL
    END AS v1,
    CASE
        WHEN pos = 3 THEN v2
        ELSE NULL
    END AS v2,
    CASE
        WHEN pos = 4 THEN v3
        ELSE NULL
    END AS v3,
    CASE
        WHEN pos = 5 THEN v4
        ELSE NULL
    END AS v4,
    CASE
        WHEN pos = 6 THEN v5
        ELSE NULL
    END AS v5,
    * EXCEPT(pos, delay, feature_timestamp, v1, v2, v3, v4, v5, transaction_id, splits),
FROM FEATURE_DELAYS
LEFT JOIN `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_raw` USING(transaction_id)
ORDER BY transaction_id, feature_timestamp    
'''
print(query)


CREATE OR REPLACE TABLE `statmike-mlops-349915.fraud.fraud_prepped_fs_history` AS 
WITH
    TRANSACTION_TIME AS (
        SELECT transaction_id,
            TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL CAST(CEIL(RAND()*60*24*14) + 1440 AS INT64) MINUTE) as feature_timestamp
        FROM `statmike-mlops-349915.fraud.fraud_prepped_fs_raw`
    ),
    FEATURE_DELAYS AS (
        SELECT *
        FROM (SELECT * FROM TRANSACTION_TIME)
        CROSS JOIN (
            SELECT *, ROW_NUMBER() OVER(ORDER BY delay) AS pos FROM UNNEST(ARRAY(
                (SELECT CAST(CEIL(RAND()*1440) AS INT64) FROM UNNEST(GENERATE_ARRAY(1, 6)))
            )) AS delay
        )   
    )
SELECT
    transaction_id as entity_id,
    TIMESTAMP_ADD(feature_timestamp, INTERVAL delay MINUTE) AS feature_timestamp,
    CASE
        WHEN pos = 2 THEN v1
        ELSE NULL
    END AS v1,
    CASE
        WHEN pos = 3 THEN v2
        ELSE NULL
    END AS v2,
    CASE
        WHEN pos = 4 THEN v3
        ELSE NULL
    END A

In [179]:
bq_job = bq.query(query)
bq_job.result()
(bq_job.ended - bq_job.started).total_seconds()

13.286

#### Review Data For a Single Entity

In this form there are multiple rows for an entity.  Notice that features V1-V5 arrived over time.  At the earliest time none have values and then each arrives over the course of about 24 hours in this example.  There could be multiple values for a single feature over time: think account balance, inventory available, time on website, number of clicks, etc.

In [180]:
source_history = bpd.read_gbq(f'{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_history')

In [181]:
example = source_history[source_history['entity_id'] == 'a50e10db-3573-4749-9ecb-309f5a4dc987'].sort_values('feature_timestamp')
example

Unnamed: 0,entity_id,feature_timestamp,v1,v2,v3,v4,v5,time,v6,v7,...,v21,v22,v23,v24,v25,v26,v27,v28,amount,class
818599,a50e10db-3573-4749-9ecb-309f5a4dc987,2024-02-06 13:12:01.873668+00:00,,,,,,121839,-0.649764,-2.100326,...,-0.173015,0.152649,0.432282,0.875979,-0.639515,-0.241636,0.073953,-0.023142,27.0,0
348079,a50e10db-3573-4749-9ecb-309f5a4dc987,2024-02-06 15:10:01.873668+00:00,2.10734,,,,,121839,-0.649764,-2.100326,...,-0.173015,0.152649,0.432282,0.875979,-0.639515,-0.241636,0.073953,-0.023142,27.0,0
1619978,a50e10db-3573-4749-9ecb-309f5a4dc987,2024-02-07 01:18:01.873668+00:00,,-1.950655,,,,121839,-0.649764,-2.100326,...,-0.173015,0.152649,0.432282,0.875979,-0.639515,-0.241636,0.073953,-0.023142,27.0,0
1688813,a50e10db-3573-4749-9ecb-309f5a4dc987,2024-02-07 04:37:01.873668+00:00,,,0.721045,,,121839,-0.649764,-2.100326,...,-0.173015,0.152649,0.432282,0.875979,-0.639515,-0.241636,0.073953,-0.023142,27.0,0
25772,a50e10db-3573-4749-9ecb-309f5a4dc987,2024-02-07 08:40:01.873668+00:00,,,,-0.924701,,121839,-0.649764,-2.100326,...,-0.173015,0.152649,0.432282,0.875979,-0.639515,-0.241636,0.073953,-0.023142,27.0,0
1282819,a50e10db-3573-4749-9ecb-309f5a4dc987,2024-02-07 11:01:01.873668+00:00,,,,,-2.90679,121839,-0.649764,-2.100326,...,-0.173015,0.152649,0.432282,0.875979,-0.639515,-0.241636,0.073953,-0.023142,27.0,0


In [182]:
example_entity = example['entity_id'].iloc[0]
example_mintime = example['feature_timestamp'].min().strftime('%Y-%m-%d %H:%M:%S.%f%z')      # earliest row, full microsecond precision
example_maxtime = example['feature_timestamp'].max().strftime('%Y-%m-%d %H:%M:%S%z')          # latest row, truncated to whole seconds
example_maxtimeFull = example['feature_timestamp'].max().strftime('%Y-%m-%d %H:%M:%S.%f%z')  # latest row, full microsecond precision

example_entity, example_mintime, example_maxtime, example_maxtimeFull

('a50e10db-3573-4749-9ecb-309f5a4dc987',
 '2024-02-06 13:12:01.873668+0000',
 '2024-02-07 11:01:01+0000',
 '2024-02-07 11:01:01.873668+0000')

### Using A Feature History Table In BigQuery (Batch Mode)

A history table version of features is not directly ready for use in machine learning. There are BigQuery functions available to make it easy to create training, evaluation, and other sets of instances from a history table.

- [ML.FEATURES_AT_TIME](https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-feature-time)
    - specify a point-in-time cutoff for all entities when retrieving features.  Prevents [data leakage](https://en.wikipedia.org/wiki/Leakage_(machine_learning)).
- [ML.ENTITY_FEATURES_AT_TIME](https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-entity-feature-time)
    - specify a table of entity values and timestamps to retrieve point-in-time features for each entity+timestamp pair
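The settings both functions are called with below (`num_rows => 1`, `ignore_feature_nulls => TRUE`) can be approximated in plain Python: keep only rows at or before the cutoff, then take the most recent non-null value of each feature. This is a sketch for intuition, not BigQuery's implementation; `features_at_time` and `entity_features_at_time` are hypothetical names, and the rows are a simplified version of the example entity (only v1, v5, and amount kept, most microseconds dropped).

```python
from datetime import datetime


def features_at_time(rows, cutoff, ignore_feature_nulls=True):
    """Per entity, the latest feature values using only rows with feature_timestamp <= cutoff."""
    result = {}
    for row in sorted(rows, key=lambda r: r['feature_timestamp']):
        if row['feature_timestamp'] > cutoff:
            continue  # not yet observed at the cutoff: excluding it prevents leakage
        current = result.setdefault(row['entity_id'], {})
        for name, value in row.items():
            if name in ('entity_id', 'feature_timestamp'):
                continue
            if value is not None or not ignore_feature_nulls:
                current[name] = value           # newer row wins
            else:
                current.setdefault(name, None)  # keep the last non-null value, if any
    return result


def entity_features_at_time(rows, pairs, ignore_feature_nulls=True):
    """One point-in-time lookup per (entity_id, cutoff) pair; None if nothing is visible yet."""
    out = []
    for entity_id, cutoff in pairs:
        entity_rows = [r for r in rows if r['entity_id'] == entity_id]
        values = features_at_time(entity_rows, cutoff, ignore_feature_nulls).get(entity_id)
        out.append((entity_id, cutoff, values))
    return out


# Simplified rows for the example entity
eid = 'a50e10db-3573-4749-9ecb-309f5a4dc987'
t = datetime.fromisoformat
rows = [
    {'entity_id': eid, 'feature_timestamp': t('2024-02-06T13:12:01'), 'v1': None, 'v5': None, 'amount': 27.0},
    {'entity_id': eid, 'feature_timestamp': t('2024-02-06T15:10:01'), 'v1': 2.10734, 'v5': None, 'amount': 27.0},
    {'entity_id': eid, 'feature_timestamp': t('2024-02-07T11:01:01.873668'), 'v1': None, 'v5': -2.90679, 'amount': 27.0},
]
print(features_at_time(rows, t('2024-02-07T11:01:01')))         # v5 is None: its row arrives after the cutoff
print(features_at_time(rows, t('2024-02-07T11:01:01.873668')))  # v5 included: the cutoff is inclusive
print(entity_features_at_time(rows, [(eid, t('2024-02-06T13:12:01')), (eid, t('2024-02-07T11:01:01'))]))
```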

#### Features At Point-In-Time

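Retrieve feature values at a point in time for all entities (filtered here to the example entity).  The two cells below differ only in the cutoff: `example_maxtime` is truncated to whole seconds, so it falls just before the row that delivers v5 and v5 comes back null, while `example_maxtimeFull` keeps the microseconds and includes v5.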

In [183]:
bpd.read_gbq(f'''
SELECT *
FROM
    ML.FEATURES_AT_TIME(
        TABLE `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_history`,
        time => '{example_maxtime}',
        num_rows => 1,
        ignore_feature_nulls => TRUE
    )
WHERE entity_id = '{example_entity}'
''')

Unnamed: 0,entity_id,feature_timestamp,v1,v2,v3,v4,v5,time,v6,v7,...,v21,v22,v23,v24,v25,v26,v27,v28,amount,class
0,a50e10db-3573-4749-9ecb-309f5a4dc987,2024-02-07 11:01:01+00:00,2.10734,-1.950655,0.721045,-0.924701,,121839,-0.649764,-2.100326,...,-0.173015,0.152649,0.432282,0.875979,-0.639515,-0.241636,0.073953,-0.023142,27.0,0


In [184]:
bpd.read_gbq(f'''
SELECT *
FROM
    ML.FEATURES_AT_TIME(
        TABLE `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_history`,
        time => '{example_maxtimeFull}',
        num_rows => 1,
        ignore_feature_nulls => TRUE
    )
WHERE entity_id = '{example_entity}'
''')

Unnamed: 0,entity_id,feature_timestamp,v1,v2,v3,v4,v5,time,v6,v7,...,v21,v22,v23,v24,v25,v26,v27,v28,amount,class
0,a50e10db-3573-4749-9ecb-309f5a4dc987,2024-02-07 11:01:01.873668+00:00,2.10734,-1.950655,0.721045,-0.924701,-2.90679,121839,-0.649764,-2.100326,...,-0.173015,0.152649,0.432282,0.875979,-0.639515,-0.241636,0.073953,-0.023142,27.0,0


#### Entity Features At Point-In-Time

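Retrieve feature values for selected entity and point-in-time pairs.  Here the same entity is looked up at three times: the earliest timestamp (v1-v5 not yet available), the truncated latest timestamp (v5 still missing), and the full-precision latest timestamp (all features present).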

In [185]:
bpd.read_gbq(f'''
WITH
    ENTITY_TIME_TABLE AS (
        SELECT '{example_entity}' as entity_id, TIMESTAMP('{example_mintime}') as time UNION ALL
        SELECT '{example_entity}' as entity_id, TIMESTAMP('{example_maxtime}') as time UNION ALL
        SELECT '{example_entity}' as entity_id, TIMESTAMP('{example_maxtimeFull}') as time
    )
SELECT *
FROM
    ML.ENTITY_FEATURES_AT_TIME(
        TABLE `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_history`,
        TABLE ENTITY_TIME_TABLE,
        num_rows => 1,
        ignore_feature_nulls => TRUE
    )
WHERE entity_id = '{example_entity}'
''')

Unnamed: 0,entity_id,feature_timestamp,v1,v2,v3,v4,v5,time,v6,v7,...,v21,v22,v23,v24,v25,v26,v27,v28,amount,class
0,a50e10db-3573-4749-9ecb-309f5a4dc987,2024-02-06 13:12:01.873668+00:00,,,,,,121839,-0.649764,-2.100326,...,-0.173015,0.152649,0.432282,0.875979,-0.639515,-0.241636,0.073953,-0.023142,27.0,0
1,a50e10db-3573-4749-9ecb-309f5a4dc987,2024-02-07 11:01:01+00:00,2.10734,-1.950655,0.721045,-0.924701,,121839,-0.649764,-2.100326,...,-0.173015,0.152649,0.432282,0.875979,-0.639515,-0.241636,0.073953,-0.023142,27.0,0
2,a50e10db-3573-4749-9ecb-309f5a4dc987,2024-02-07 11:01:01.873668+00:00,2.10734,-1.950655,0.721045,-0.924701,-2.90679,121839,-0.649764,-2.100326,...,-0.173015,0.152649,0.432282,0.875979,-0.639515,-0.241636,0.073953,-0.023142,27.0,0
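Writing `ENTITY_TIME_TABLE` by hand works for three lookups, but a training set would typically need many (entity, time) pairs. A minimal sketch, assuming the pairs fit comfortably in the query text, generates the same `UNION ALL` CTE from a Python list; `entity_time_cte` is a hypothetical helper. For large sets, loading the pairs into a BigQuery table and passing that table to `ML.ENTITY_FEATURES_AT_TIME` avoids very long queries.

```python
def entity_time_cte(pairs):
    """Build the ENTITY_TIME_TABLE CTE from (entity_id, timestamp_string) pairs."""
    if not pairs:
        raise ValueError('need at least one (entity_id, time) pair')

    def quote(value):
        # escape backslashes and single quotes for a BigQuery string literal
        return "'" + str(value).replace('\\', '\\\\').replace("'", "\\'") + "'"

    selects = [f'SELECT {quote(entity_id)} AS entity_id, TIMESTAMP({quote(ts)}) AS time'
               for entity_id, ts in pairs]
    return 'WITH\n    ENTITY_TIME_TABLE AS (\n        ' + ' UNION ALL\n        '.join(selects) + '\n    )'


history_table = 'statmike-mlops-349915.fraud.fraud_prepped_fs_history'
pairs = [
    ('a50e10db-3573-4749-9ecb-309f5a4dc987', '2024-02-06 13:12:01.873668+0000'),
    ('a50e10db-3573-4749-9ecb-309f5a4dc987', '2024-02-07 11:01:01+0000'),
]
query = entity_time_cte(pairs) + f'''
SELECT *
FROM
    ML.ENTITY_FEATURES_AT_TIME(
        TABLE `{history_table}`,
        TABLE ENTITY_TIME_TABLE,
        num_rows => 1,
        ignore_feature_nulls => TRUE
    )
'''
print(query)  # in the notebook this string could be passed to bpd.read_gbq(query)
```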


---
## Feature Store

from google.cloud.aiplatform_v1 import (FeatureOnlineStoreAdminServiceClient,
                                        FeatureOnlineStoreServiceClient,
                                        FeatureRegistryServiceClient)

**API Links (Python)**
- [Feature Registry Service](https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform_v1.services.feature_registry_service)
- [Feature Online Store Admin Service](https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform_v1.services.feature_online_store_admin_service)
- [Feature Online Store Service](https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform_v1.services.feature_online_store_service)

### Feature Registry Client

Used to create feature groups and the features within them.

In [196]:
registry_client = aiplatform.gapic.FeatureRegistryServiceClient(client_options = dict(api_endpoint = f'{REGION}-aiplatform.googleapis.com'))

### Create A Feature Group

The history version of the feature table(s) can be registered in the Feature Registry as a feature group.

**References:**
- [Create a feature group](https://cloud.google.com/vertex-ai/docs/featurestore/latest/create-featuregroup)

In [197]:
FEATURE_GROUP_NAME = 'transactions'

In [198]:
try:
    feature_group = registry_client.get_feature_group(name = f'projects/{PROJECT_ID}/locations/{REGION}/featureGroups/{FEATURE_GROUP_NAME}')
except Exception:
    create_group = registry_client.create_feature_group(
        request = aiplatform.gapic.CreateFeatureGroupRequest(
            parent=f"projects/{PROJECT_ID}/locations/{REGION}",
            feature_group_id = FEATURE_GROUP_NAME,
            feature_group = aiplatform.gapic.FeatureGroup(
                big_query = aiplatform.gapic.FeatureGroup.BigQuery(
                    big_query_source = aiplatform.gapic.BigQuerySource(input_uri = f'bq://{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_history'),
                    entity_id_columns = ['entity_id']
                )
            )
        )
    )
    feature_group = create_group.result()
    
feature_group.name

'projects/1026793852137/locations/us-central1/featureGroups/transactions'
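The get-then-create pattern in this cell repeats for the features, the online store, and the feature views. One way to factor it out is a small helper, sketched here as an assumption rather than part of the Vertex AI SDK (`get_or_create` is a hypothetical name). It catches only the exception types you pass in, so with `not_found=(google.api_core.exceptions.NotFound,)` a permission or quota error would surface instead of being treated as "missing" and triggering a create, which is what catching every `Exception` does.

```python
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar('T')


def get_or_create(get: Callable[[], T], create: Callable[[], T],
                  not_found: Tuple[Type[BaseException], ...] = (LookupError,)) -> T:
    """Return the existing resource, creating it only when `get` reports it missing."""
    try:
        return get()
    except not_found:
        return create()


# In the notebook this might wrap the existing calls (not executed here):
#   from google.api_core.exceptions import NotFound
#   feature_group = get_or_create(
#       lambda: registry_client.get_feature_group(name = f'projects/{PROJECT_ID}/locations/{REGION}/featureGroups/{FEATURE_GROUP_NAME}'),
#       lambda: registry_client.create_feature_group(request = ...).result(),  # same request as above
#       not_found = (NotFound,),
#   )

# Self-contained demo with an in-memory stand-in for the registry
registry = {}
created = []


def fake_get():
    return registry['transactions']  # raises KeyError (a LookupError) when missing


def fake_create():
    created.append('transactions')
    registry['transactions'] = 'projects/demo/locations/us-central1/featureGroups/transactions'
    return registry['transactions']


first = get_or_create(fake_get, fake_create)
second = get_or_create(fake_get, fake_create)
print(first == second, len(created))  # True 1
```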

In [324]:
print(f'Review in the console:\n\nhttps://console.cloud.google.com/vertex-ai/feature-store/feature-groups?project={PROJECT_ID}')

Review in the console:

https://console.cloud.google.com/vertex-ai/feature-store/feature-groups?project=statmike-mlops-349915


### Create Features

**References:**
- [Create a feature](https://cloud.google.com/vertex-ai/docs/featurestore/latest/create-feature)

In [199]:
source_history.columns.tolist()

['entity_id',
 'feature_timestamp',
 'v1',
 'v2',
 'v3',
 'v4',
 'v5',
 'time',
 'v6',
 'v7',
 'v8',
 'v9',
 'v10',
 'v11',
 'v12',
 'v13',
 'v14',
 'v15',
 'v16',
 'v17',
 'v18',
 'v19',
 'v20',
 'v21',
 'v22',
 'v23',
 'v24',
 'v25',
 'v26',
 'v27',
 'v28',
 'amount',
 'class']

In [200]:
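features = []
for column in source_history.columns.tolist():
    # entity_id and feature_timestamp identify the row; every other column becomes a feature
    if column not in ['entity_id', 'feature_timestamp']:
        try:
            feature = registry_client.get_feature(name = f'projects/{PROJECT_ID}/locations/{REGION}/featureGroups/{FEATURE_GROUP_NAME}/features/{column}')
            features.append(feature)
        except Exception:
            # feature not registered yet: create it under the feature group created above
            create_feature = registry_client.create_feature(
                request = aiplatform.gapic.CreateFeatureRequest(
                    parent = feature_group.name,
                    feature_id = column,
                    # BigQuery column holding this feature's values (defaults to feature_id if omitted)
                    feature = aiplatform.gapic.Feature(version_column_name = column)
                )
            )
            features.append(create_feature.result())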

In [201]:
for feature in features:
    print(feature.name)

projects/1026793852137/locations/us-central1/featureGroups/transactions/features/v1
projects/1026793852137/locations/us-central1/featureGroups/transactions/features/v2
projects/1026793852137/locations/us-central1/featureGroups/transactions/features/v3
projects/1026793852137/locations/us-central1/featureGroups/transactions/features/v4
projects/1026793852137/locations/us-central1/featureGroups/transactions/features/v5
projects/1026793852137/locations/us-central1/featureGroups/transactions/features/time
projects/1026793852137/locations/us-central1/featureGroups/transactions/features/v6
projects/1026793852137/locations/us-central1/featureGroups/transactions/features/v7
projects/1026793852137/locations/us-central1/featureGroups/transactions/features/v8
projects/1026793852137/locations/us-central1/featureGroups/transactions/features/v9
projects/1026793852137/locations/us-central1/featureGroups/transactions/features/v10
projects/1026793852137/locations/us-central1/featureGroups/transactions/f

In [325]:
print(f'Review in the console (expand feature group):\n\nhttps://console.cloud.google.com/vertex-ai/feature-store/feature-groups?project={PROJECT_ID}')

Review in the console (expand feature group):

https://console.cloud.google.com/vertex-ai/feature-store/feature-groups?project=statmike-mlops-349915


### Feature Online Store Admin Client

Used to create online stores and feature views

In [203]:
online_admin_client = aiplatform.gapic.FeatureOnlineStoreAdminServiceClient(client_options = dict(api_endpoint = f'{REGION}-aiplatform.googleapis.com'))

### Create Online Store

**NOTE:** This can take around 10 minutes

**Reference:**
- [Online Serving Types](https://cloud.google.com/vertex-ai/docs/featurestore/latest/online-serving-types)
- [Create an Online Store Instance](https://cloud.google.com/vertex-ai/docs/featurestore/latest/create-onlinestore)

In [207]:
FEATURE_ONLINE_STORE_NAME = 'featurestore'

In [212]:
try:
    online_store = online_admin_client.get_feature_online_store(name = f'projects/{PROJECT_ID}/locations/{REGION}/featureOnlineStores/{FEATURE_ONLINE_STORE_NAME}')
except Exception:
    create_online_store = online_admin_client.create_feature_online_store(
        request = aiplatform.gapic.CreateFeatureOnlineStoreRequest(
            parent = f'projects/{PROJECT_ID}/locations/{REGION}',
            feature_online_store_id = FEATURE_ONLINE_STORE_NAME,
            feature_online_store = aiplatform.gapic.FeatureOnlineStore(
                bigtable = aiplatform.gapic.FeatureOnlineStore.Bigtable(
                    auto_scaling = aiplatform.gapic.FeatureOnlineStore.Bigtable.AutoScaling(
                        min_node_count = 1,
                        max_node_count = 2,
                        cpu_utilization_target = 50
                    )
                )
            )
        )
    )
    online_store = create_online_store.result()
    
online_store.name

'projects/1026793852137/locations/us-central1/featureOnlineStores/featurestore'

In [330]:
print(f'Review in the console:\n\nhttps://console.cloud.google.com/vertex-ai/locations/{REGION}/online-stores/featurestore?project={PROJECT_ID}')

Review in the console:

https://console.cloud.google.com/vertex-ai/locations/us-central1/online-stores/featurestore?project=statmike-mlops-349915


### Create Feature View: From Feature Registry

Create a feature view that syncs the latest values for each feature using the feature registry features.

**Reference:**
- [Create a feature view from feature groups](https://cloud.google.com/vertex-ai/docs/featurestore/latest/create-featureview#create_a_feature_view_from_feature_groups)

In [224]:
REGISTRY_FEATURE_VIEW_NAME = 'registry_feature_view'

In [225]:
try:
    registry_view = online_admin_client.get_feature_view(name = f'{online_store.name}/featureViews/{REGISTRY_FEATURE_VIEW_NAME}')
except Exception:
    create_registry_view = online_admin_client.create_feature_view(
        request = aiplatform.gapic.CreateFeatureViewRequest(
            parent = online_store.name,
            feature_view_id = REGISTRY_FEATURE_VIEW_NAME,
            feature_view = aiplatform.gapic.FeatureView(
                feature_registry_source = aiplatform.gapic.FeatureView.FeatureRegistrySource(
                    feature_groups = [
                        aiplatform.gapic.FeatureView.FeatureRegistrySource.FeatureGroup(
                            feature_group_id = FEATURE_GROUP_NAME,
                            feature_ids = [feature.name.split('/')[-1] for feature in features if not feature.name.endswith('class')]
                        )
                    ]
                ),
                sync_config = aiplatform.gapic.FeatureView.SyncConfig(cron = 'TZ=America/New_York 10 * * * *')
            ),
            run_sync_immediately = True
        )
    )
    registry_view = create_registry_view.result()
    
registry_view.name

'projects/1026793852137/locations/us-central1/featureOnlineStores/featurestore/featureViews/registry_feature_view'
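The `feature_ids` list above takes the last path segment of each feature's resource name and skips any name ending in `class`. Because `endswith('class')` would also drop a feature whose ID merely ends in those letters, here is a minimal sketch of a hypothetical helper that excludes IDs by exact match instead. The feature group name `my_group` and the `subclass` feature are illustrative placeholders, not objects from this notebook.

```python
def feature_ids(resource_names, exclude=('class',)):
    """Return the feature IDs (last path segment) not listed in `exclude`."""
    excluded = set(exclude)
    ids = [name.rsplit('/', 1)[-1] for name in resource_names]
    # exact match on the ID, so 'subclass' is kept while 'class' is dropped
    return [feature_id for feature_id in ids if feature_id not in excluded]


# hypothetical resource names; 'my_group' stands in for FEATURE_GROUP_NAME
base = 'projects/my-project/locations/us-central1/featureGroups/my_group/features'
names = [f'{base}/v1', f'{base}/amount', f'{base}/class', f'{base}/subclass']
print(feature_ids(names))
```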

In [331]:
print(f'Review in the console:\n\nhttps://console.cloud.google.com/vertex-ai/locations/{REGION}/online-stores/featurestore/feature-views/{REGISTRY_FEATURE_VIEW_NAME}?project={PROJECT_ID}')

Review in the console:

https://console.cloud.google.com/vertex-ai/locations/us-central1/online-stores/featurestore/feature-views/registry_feature_view?project=statmike-mlops-349915


### Create Feature View: From BigQuery Source

Create a feature view directly from a BigQuery table or view, here the `_fs_latest` version created above.

**Reference:**
- [Create a feature view from a BigQuery source](https://cloud.google.com/vertex-ai/docs/featurestore/latest/create-featureview#create_from_bq)
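This view points at the `_fs_latest` table/view built earlier in the notebook. Assuming it holds one row per `transaction_id` (the most recent one), the idea can be sketched in plain Python as "keep the newest row per entity". In BigQuery SQL the same pattern is typically written with `ROW_NUMBER() OVER (PARTITION BY transaction_id ORDER BY feature_timestamp DESC) = 1`. The helper name and the sample rows below are hypothetical.

```python
def latest_per_entity(rows, entity_key='transaction_id', ts_key='feature_timestamp'):
    """Keep the most recent row (by ts_key) for each entity ID."""
    latest = {}
    for row in rows:
        entity = row[entity_key]
        # replace the stored row only when this one is strictly newer
        if entity not in latest or row[ts_key] > latest[entity][ts_key]:
            latest[entity] = row
    return list(latest.values())


rows = [  # hypothetical rows: entity 'a' appears twice
    {'transaction_id': 'a', 'feature_timestamp': 1, 'amount': 10.0},
    {'transaction_id': 'a', 'feature_timestamp': 2, 'amount': 12.5},
    {'transaction_id': 'b', 'feature_timestamp': 1, 'amount': 27.0},
]
print(latest_per_entity(rows))
```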

In [226]:
BQ_FEATURE_VIEW_NAME = 'bq_feature_view'

In [228]:
try:
    bq_view = online_admin_client.get_feature_view(name = f'{online_store.name}/featureViews/{BQ_FEATURE_VIEW_NAME}')
except Exception:
    create_bq_view = online_admin_client.create_feature_view(
        request = aiplatform.gapic.CreateFeatureViewRequest(
            parent = online_store.name,
            feature_view_id = BQ_FEATURE_VIEW_NAME,
            feature_view = aiplatform.gapic.FeatureView(
                big_query_source = aiplatform.gapic.FeatureView.BigQuerySource(
                    uri = f'bq://{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_latest',
                    entity_id_columns = ['transaction_id']
                ),
                sync_config = aiplatform.gapic.FeatureView.SyncConfig(cron = 'TZ=America/New_York 10 * * * *')
            ),
            run_sync_immediately = True
        )
    )
    bq_view = create_bq_view.result()
    
bq_view.name

'projects/1026793852137/locations/us-central1/featureOnlineStores/featurestore/featureViews/bq_feature_view'
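Both feature views are created with `SyncConfig(cron = 'TZ=America/New_York 10 * * * *')`. Read as a standard five-field cron expression (minute, hour, day of month, month, day of week), `10 * * * *` fires at minute 10 of every hour, and the `TZ=` prefix names the time zone the schedule is evaluated in. Here is a minimal sketch of when the next scheduled sync would start. It uses a hypothetical helper that ignores time zones (it works on whatever clock the input datetime uses) and only handles this "every hour at minute N" pattern.

```python
from datetime import datetime, timedelta


def next_hourly_run(now: datetime, minute: int = 10) -> datetime:
    """Next time strictly after `now` that falls at `minute` past the hour."""
    candidate = now.replace(minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # this hour's slot already passed, so move to the next hour
        candidate += timedelta(hours=1)
    return candidate


print(next_hourly_run(datetime(2024, 2, 15, 9, 5)))
print(next_hourly_run(datetime(2024, 2, 15, 23, 30)))
```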

In [332]:
print(f'Review in the console:\n\nhttps://console.cloud.google.com/vertex-ai/locations/{REGION}/online-stores/featurestore/feature-views/{BQ_FEATURE_VIEW_NAME}?project={PROJECT_ID}')

Review in the console:

https://console.cloud.google.com/vertex-ai/locations/us-central1/online-stores/featurestore/feature-views/bq_feature_view?project=statmike-mlops-349915


### Start Sync Manually: BQ View

Manually start a sync for the feature view created from the BigQuery source.

**References:**
- [Sync feature data to online store](https://cloud.google.com/vertex-ai/docs/featurestore/latest/sync-data)
- [List sync operations](https://cloud.google.com/vertex-ai/docs/featurestore/latest/list-data-syncs)

In [239]:
bq_sync = online_admin_client.sync_feature_view(feature_view = bq_view.name)

In [241]:
bq_sync.feature_view_sync

'projects/1026793852137/locations/us-central1/featureOnlineStores/featurestore/featureViews/bq_feature_view/featureViewSyncs/4208798999227924480'

In [243]:
while True:
    feature_view_sync = online_admin_client.get_feature_view_sync(name = bq_sync.feature_view_sync)
    if feature_view_sync.run_time.end_time.seconds > 0:
        status = feature_view_sync.final_status.code
        break
    else:
        print('waiting for 20 seconds...')
    time.sleep(20)
    
if status == 0: print('Succeeded!')
else: print('Failed!')

Succeeded!
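The polling loop above waits indefinitely if a sync never reports an end time. Below is a minimal sketch of a hypothetical `wait_until_done` helper that adds a timeout. The `sleep` and `clock` parameters are injectable so the logic can be checked without waiting. The commented lines show how it might wrap the same `get_feature_view_sync` call used above. They are not run here.

```python
import time


def wait_until_done(get_state, is_done, interval=20, timeout=1800,
                    sleep=time.sleep, clock=time.monotonic):
    """Poll get_state() until is_done(state) is true or `timeout` seconds pass."""
    deadline = clock() + timeout
    while True:
        state = get_state()
        if is_done(state):
            return state
        if clock() >= deadline:
            raise TimeoutError(f'not done after {timeout} seconds')
        sleep(interval)


# Possible use with the notebook's objects (not run here):
# sync = wait_until_done(
#     lambda: online_admin_client.get_feature_view_sync(name = bq_sync.feature_view_sync),
#     lambda s: s.run_time.end_time.seconds > 0,
# )
# print('Succeeded!' if sync.final_status.code == 0 else 'Failed!')

states = iter([{'done': False}, {'done': False}, {'done': True}])
result = wait_until_done(lambda: next(states), lambda s: s['done'], sleep=lambda s: None)
print(result)
```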


In [245]:
online_admin_client.list_feature_view_syncs(parent = bq_view.name)

ListFeatureViewSyncsPager<feature_view_syncs {
  name: "projects/1026793852137/locations/us-central1/featureOnlineStores/featurestore/featureViews/bq_feature_view/featureViewSyncs/4208798999227924480"
  create_time {
    seconds: 1708041467
    nanos: 578009000
  }
  run_time {
    start_time {
      seconds: 1708041467
      nanos: 578009000
    }
    end_time {
      seconds: 1708041504
      nanos: 472163000
    }
  }
  final_status {
  }
}
>
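Each sync in the listing records `start_time` and `end_time` as `seconds`/`nanos` pairs. The empty `final_status { }` is consistent with the `Succeeded!` result: protobuf text output omits fields left at their default, and a status code of 0 means OK. Below is a small sketch of a hypothetical helper that computes the run time from those pairs. It uses integer nanoseconds so that subtracting large epoch values does not lose precision.

```python
def sync_duration_seconds(start_seconds, start_nanos, end_seconds, end_nanos):
    """Elapsed seconds between two (seconds, nanos) timestamps."""
    start_ns = start_seconds * 1_000_000_000 + start_nanos
    end_ns = end_seconds * 1_000_000_000 + end_nanos
    return (end_ns - start_ns) / 1e9


# values copied from the list_feature_view_syncs output above
duration = sync_duration_seconds(1708041467, 578009000, 1708041504, 472163000)
print(f'{duration:.3f} seconds')
```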

In [334]:
print(f'Review in the console:\n\nhttps://console.cloud.google.com/vertex-ai/locations/{REGION}/online-stores/featurestore/feature-views/{BQ_FEATURE_VIEW_NAME}?project={PROJECT_ID}')

Review in the console:

https://console.cloud.google.com/vertex-ai/locations/us-central1/online-stores/featurestore/feature-views/bq_feature_view?project=statmike-mlops-349915


### Online Serving From Feature Views

Request feature values for an entity from a feature view:
    
**References:**
- [Serve feature values](https://cloud.google.com/vertex-ai/docs/featurestore/latest/serve-feature-values)

In [233]:
online_serve_client = aiplatform.gapic.FeatureOnlineStoreServiceClient(client_options = dict(api_endpoint = f'{REGION}-aiplatform.googleapis.com'))

In [234]:
example_entity

'a50e10db-3573-4749-9ecb-309f5a4dc987'

In [294]:
dict(
    online_serve_client.fetch_feature_values(
        request = aiplatform.gapic.FetchFeatureValuesRequest(
            feature_view = bq_view.name,
            data_key = aiplatform.gapic.FeatureViewDataKey(key = example_entity),
            data_format = aiplatform.gapic.FeatureViewDataFormat.PROTO_STRUCT # KEY_VALUE, PROTO_STRUCT
        )
    ).proto_struct
)

{'v14': -0.964829191473946,
 'v12': -1.0665184783586,
 'v9': 0.17033769786237102,
 'v8': 0.147772742443229,
 'v25': -0.6395147739189879,
 'v28': -0.0231421834269002,
 'v5': -2.9067897799963904,
 'v27': 0.0739527453274154,
 'v22': 0.15264858896539701,
 'v20': -0.566624448543521,
 'v23': 0.4322817611050171,
 'v17': 1.1853342859171698,
 'v7': -2.100326174014,
 'v6': -0.649764256663163,
 'v3': 0.7210449871770799,
 'v4': -0.924701055979975,
 'v18': -0.0384228099450662,
 'v21': -0.17301465753094503,
 'v15': -1.0637317456920101,
 'v11': -1.04281199064144,
 'v10': 1.35375755940974,
 'v16': -0.787845884246518,
 'v2': -1.9506548012839198,
 'v19': -0.287847177705979,
 'amount': 27.0,
 'v13': -1.49321985962412,
 'time': 121839.0,
 'v24': 0.87597920343245,
 'v26': -0.24163617508098004,
 'v1': 2.10733996277675}

In [295]:
dict(
    online_serve_client.fetch_feature_values(
        request = aiplatform.gapic.FetchFeatureValuesRequest(
            feature_view = registry_view.name,
            data_key = aiplatform.gapic.FeatureViewDataKey(key = example_entity),
            data_format = aiplatform.gapic.FeatureViewDataFormat.PROTO_STRUCT # KEY_VALUE, PROTO_STRUCT
        )
    ).proto_struct
)

{'v14': -0.964829191473946,
 'v12': -1.0665184783586,
 'v9': 0.17033769786237102,
 'v8': 0.147772742443229,
 'v5': -2.9067897799963904,
 'v28': -0.0231421834269002,
 'v25': -0.6395147739189879,
 'v27': 0.0739527453274154,
 'v22': 0.15264858896539701,
 'v20': -0.566624448543521,
 'v23': 0.4322817611050171,
 'v17': 1.1853342859171698,
 'v7': -2.100326174014,
 'v6': -0.649764256663163,
 'feature_timestamp': 1708042205262472.0,
 'v3': 0.7210449871770799,
 'v4': -0.924701055979975,
 'v18': -0.0384228099450662,
 'v21': -0.17301465753094503,
 'v11': -1.04281199064144,
 'v15': -1.0637317456920101,
 'v10': 1.35375755940974,
 'v16': -0.787845884246518,
 'v2': -1.9506548012839198,
 'v19': -0.287847177705979,
 'v13': -1.49321985962412,
 'amount': 27.0,
 'time': 121839.0,
 'v24': 0.87597920343245,
 'v26': -0.24163617508098004,
 'v1': 2.10733996277675}
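The two views return the same feature values for this entity. The only difference is the extra `feature_timestamp` key from the registry-based view. Below is a minimal sketch that checks this programmatically with a hypothetical `compare_views` helper. It uses a few values copied from the outputs above. It also decodes `feature_timestamp` under the assumption (not confirmed by the source) that the value is microseconds since the Unix epoch.

```python
from datetime import datetime, timedelta, timezone


def compare_views(a: dict, b: dict, tol: float = 0.0):
    """Return (keys only in a, keys only in b, shared keys whose values differ)."""
    only_a = sorted(set(a) - set(b))
    only_b = sorted(set(b) - set(a))
    mismatched = sorted(k for k in set(a) & set(b) if abs(a[k] - b[k]) > tol)
    return only_a, only_b, mismatched


def micros_to_datetime(value: float) -> datetime:
    """Interpret `value` as microseconds since the Unix epoch (an assumption)."""
    return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(microseconds=int(value))


bq_values = {'amount': 27.0, 'time': 121839.0, 'v1': 2.10733996277675}
registry_values = {'amount': 27.0, 'time': 121839.0, 'v1': 2.10733996277675,
                   'feature_timestamp': 1708042205262472.0}
print(compare_views(bq_values, registry_values))
print(micros_to_datetime(registry_values['feature_timestamp']))
```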

---
## Model Inference: Serving Using Feature Store

In [this repository](https://github.com/statmike/vertex-ai-mlops), many notebook workflows build models from the same data source used for the feature store here.  The notebook workflows in the [05 - TensorFlow](../05%20-%20TensorFlow/readme.md) folder all update the same endpoint.  If any of those notebooks have been run, there will be an endpoint with a model deployed to it.  That endpoint is used below to serve predictions, with the feature store as the source of instances.

There are many ways to request online predictions from an endpoint, and they are covered in [05Tools - Prediction - Online](../05%20-%20TensorFlow/05Tools%20-%20Prediction%20-%20Online.ipynb).


### Get Endpoint

[Endpoint Properties and Methods](https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.Endpoint):

```python
endpoint
endpoint.display_name
endpoint.resource_name
endpoint.traffic_split
endpoint.list_models()
```

In [250]:
endpoints = aiplatform.Endpoint.list(filter = f"labels.series=05 AND display_name=05")
if endpoints:
    endpoint = endpoints[0]
    print(f"Endpoint Exists: {endpoints[0].resource_name}")
else:
    print(f"There does not appear to be an endpoint for SERIES = 05")

Endpoint Exists: projects/1026793852137/locations/us-central1/endpoints/725723853820526592


In [251]:
endpoint.display_name

'05'

In [333]:
print(f'Review the Endpoint in the Console:\n\nhttps://console.cloud.google.com/vertex-ai/locations/{REGION}/endpoints/{endpoint.name}?project={PROJECT_ID}')

Review the Endpoint in the Console:

https://console.cloud.google.com/vertex-ai/locations/us-central1/endpoints/725723853820526592?project=statmike-mlops-349915


In [253]:
endpoint.traffic_split

{'2423682068408958976': 100}

In [254]:
endpoint.list_models()[0]

dedicated_resources {
  machine_spec {
    machine_type: "n1-standard-4"
  }
  min_replica_count: 1
  max_replica_count: 1
}
id: "2423682068408958976"
model: "projects/1026793852137/locations/us-central1/models/model_05_05"
model_version_id: "18"
display_name: "05_05"
create_time {
  seconds: 1703181253
  nanos: 58849000
}

---
### Get Predictions: Python Client

[aiplatform.Endpoint.predict()](https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.Endpoint#google_cloud_aiplatform_Endpoint_predict)

In [345]:
instance = dict(
    online_serve_client.fetch_feature_values(
        request = aiplatform.gapic.FetchFeatureValuesRequest(
            feature_view = registry_view.name,
            data_key = aiplatform.gapic.FeatureViewDataKey(key = example_entity),
            data_format = aiplatform.gapic.FeatureViewDataFormat.PROTO_STRUCT # KEY_VALUE, PROTO_STRUCT
        )
    ).proto_struct
)

In [346]:
instance = {key.capitalize():[val] for key, val in instance.items() if key not in ['feature_timestamp']}
instance

{'V14': [-0.964829191473946],
 'V12': [-1.0665184783586],
 'V9': [0.17033769786237102],
 'V8': [0.147772742443229],
 'V25': [-0.6395147739189879],
 'V28': [-0.0231421834269002],
 'V5': [-2.9067897799963904],
 'V27': [0.0739527453274154],
 'V22': [0.15264858896539701],
 'V20': [-0.566624448543521],
 'V23': [0.4322817611050171],
 'V17': [1.1853342859171698],
 'V7': [-2.100326174014],
 'V6': [-0.649764256663163],
 'V3': [0.7210449871770799],
 'V4': [-0.924701055979975],
 'V18': [-0.0384228099450662],
 'V21': [-0.17301465753094503],
 'V11': [-1.04281199064144],
 'V15': [-1.0637317456920101],
 'V10': [1.35375755940974],
 'V16': [-0.787845884246518],
 'V2': [-1.9506548012839198],
 'V19': [-0.287847177705979],
 'Amount': [27.0],
 'V13': [-1.49321985962412],
 'Time': [121839.0],
 'V26': [-0.24163617508098004],
 'V24': [0.87597920343245],
 'V1': [2.10733996277675]}
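The cell above reshapes the fetched values into the model's inputs. It drops `feature_timestamp`, capitalizes each name (`v14` → `V14`, `amount` → `Amount`), and wraps each value in a one-element list. Note that `str.capitalize()` also lowercases everything after the first character, so a hypothetical feature named `amount_USD` would become `Amount_usd`. Below is a minimal sketch of a hypothetical helper that keeps the same default behavior and adds an explicit `rename` mapping for names where that rule does not match the model's expected inputs.

```python
def to_model_instance(features: dict, drop=('feature_timestamp',), rename=None) -> dict:
    """Map feature-store keys to model input names and wrap each value in a list."""
    rename = rename or {}
    instance = {}
    for key, value in features.items():
        if key in drop:
            continue
        # explicit rename wins; otherwise fall back to the notebook's capitalize() rule
        name = rename.get(key, key.capitalize())
        instance[name] = [value]
    return instance


fetched = {'v14': -0.964829191473946, 'amount': 27.0, 'feature_timestamp': 1708042205262472.0}
print(to_model_instance(fetched))
```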

In [347]:
prediction = endpoint.predict(instances = [instance])
prediction

Prediction(predictions=[[0.996969879, 0.00303014624]], deployed_model_id='2423682068408958976', model_version_id='18', model_resource_name='projects/1026793852137/locations/us-central1/models/model_05_05', explanations=None)

In [348]:
prediction.predictions[0]

[0.996969879, 0.00303014624]

In [349]:
np.argmax(prediction.predictions[0])

0
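`np.argmax` returns the position of the largest probability, here 0. Below is a small sketch of a hypothetical `top_class` helper that returns both the predicted position and its probability, with optional labels. It assumes position `i` in each prediction corresponds to the model's class value `i`. Like `np.argmax`, `max` returns the first index when values tie.

```python
def top_class(probabilities, labels=None):
    """Return (label or index, probability) for the highest-scoring class."""
    index = max(range(len(probabilities)), key=probabilities.__getitem__)
    label = labels[index] if labels else index
    return label, probabilities[index]


print(top_class([0.996969879, 0.00303014624]))
```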

## Cleanup

This deletes the Feature Store objects in dependency order: the feature views, the online store, the features, and then the feature group.

In [219]:
# delete registry view
online_admin_client.delete_feature_view(name = registry_view.name)
# delete bq view
online_admin_client.delete_feature_view(name = bq_view.name)



In [None]:
# delete online store
online_admin_client.delete_feature_online_store(name = online_store.name, force = True)

In [194]:
# delete features
for feature in features:
    registry_client.delete_feature(name = feature.name)

In [195]:
# delete feature group
registry_client.delete_feature_group(name = feature_group.name)
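The cleanup cells above start each delete without waiting for it to finish. The gapic delete methods return long-running operation objects that expose `.result()`. If a later step depends on an earlier one (for example, deleting the feature group after its features), not waiting could cause the steps to race. Below is a minimal sketch of a hypothetical `delete_in_order` helper that runs the steps in order and blocks on each operation. The commented lines show one way to map it onto the notebook's objects (note the `f=f` default argument, which binds each feature inside the lambda). `FakeOperation` is a stand-in used only to exercise the logic.

```python
def delete_in_order(steps):
    """Run (description, delete_fn) pairs in order, waiting on each returned operation."""
    deleted = []
    for description, delete_fn in steps:
        operation = delete_fn()
        # long-running operations expose .result(); block until the delete completes
        if hasattr(operation, 'result'):
            operation.result()
        deleted.append(description)
    return deleted


# Possible mapping onto the notebook's objects (not run here):
# steps = [
#     ('registry view', lambda: online_admin_client.delete_feature_view(name = registry_view.name)),
#     ('bq view', lambda: online_admin_client.delete_feature_view(name = bq_view.name)),
#     ('online store', lambda: online_admin_client.delete_feature_online_store(name = online_store.name, force = True)),
# ]
# steps += [(f.name, lambda f=f: registry_client.delete_feature(name = f.name)) for f in features]
# steps.append(('feature group', lambda: registry_client.delete_feature_group(name = feature_group.name)))
# delete_in_order(steps)


class FakeOperation:
    """Stand-in for a long-running operation that records when it is awaited."""
    def __init__(self, log, name):
        self.log, self.name = log, name

    def result(self):
        self.log.append(f'done {self.name}')


log = []
steps = [(n, lambda n=n: FakeOperation(log, n)) for n in ['view', 'store', 'feature', 'group']]
print(delete_in_order(steps))
print(log)
```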

