![tracker](https://us-central1-vertex-ai-mlops-369716.cloudfunctions.net/pixel-tracking?path=statmike%2Fvertex-ai-mlops%2FMLOps%2FFeature+Store&file=Feature+Store.ipynb)
<!--- header table --->
<table align="left">
  <td style="text-align: center">
    <a href="https://colab.research.google.com/github/statmike/vertex-ai-mlops/blob/main/MLOps/Feature%20Store/Feature%20Store.ipynb">
      <img width="32px" src="https://www.gstatic.com/pantheon/images/bigquery/welcome_page/colab-logo.svg" alt="Google Colaboratory logo">
      <br>Run in<br>Colab
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https%3A%2F%2Fraw.githubusercontent.com%2Fstatmike%2Fvertex-ai-mlops%2Fmain%2FMLOps%2FFeature%2520Store%2FFeature%2520Store.ipynb">
      <img width="32px" src="https://lh3.googleusercontent.com/JmcxdQi-qOpctIvWKgPtrzZdJJK-J3sWE1RsfjZNwshCFgE_9fULcNpuXYTilIR2hjwN" alt="Google Cloud Colab Enterprise logo">
      <br>Run in<br>Colab Enterprise
    </a>
  </td>      
  <td style="text-align: center">
    <a href="https://github.com/statmike/vertex-ai-mlops/blob/main/MLOps/Feature%20Store/Feature%20Store.ipynb">
      <img width="32px" src="https://www.svgrepo.com/download/217753/github.svg" alt="GitHub logo">
      <br>View on<br>GitHub
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/statmike/vertex-ai-mlops/main/MLOps/Feature%20Store/Feature%20Store.ipynb">
      <img width="32px" src="https://www.gstatic.com/images/branding/gcpiconscolors/vertexai/v1/32px.svg" alt="Vertex AI logo">
      <br>Open in<br>Vertex AI Workbench
    </a>
  </td>
</table>

---

**File Move Notices**

This file moved locations:
- On 09/08/2024 (mm/dd/yyyy)
	- From: `Feature Store/Feature Store.ipynb`
	- To: `MLOps/Feature Store/Feature Store.ipynb`
---
<!---end of move notices--->

# Feature Store

In ML, instances (think rows) of data are used both to train models and to request predictions.  Feature engineering transforms the columns of these instances into features, which are the inputs at both training and serving time.  This makes feature management important.

A feature store centralizes the tasks of creating, maintaining, sharing, and serving ML features.  This allows for easier reuse, faster ML development, less training/serving skew, and discoverability of features and feature sets.

In Vertex AI, this is covered by [Vertex AI Feature Store](https://cloud.google.com/vertex-ai/docs/featurestore) of which there are two types:
- [Vertex AI Feature Store](https://cloud.google.com/vertex-ai/docs/featurestore#vaifs)
    - Covered in this workflow
- [Vertex AI Feature Store (Legacy)](https://cloud.google.com/vertex-ai/docs/featurestore#vaifs_legacy)
    - Covered in the prior workflow at [Feature Store (Legacy)](./Feature%20Store%20%28Legacy%29.ipynb)
- [Comparison](https://cloud.google.com/vertex-ai/docs/featurestore#comparison_between_and)

**Prerequisites:**
-  [01 - BigQuery - Table Data Source](../../01%20-%20Data%20Sources/01%20-%20BigQuery%20-%20Table%20Data%20Source.ipynb)

**Resources:**
- [Documentation](https://cloud.google.com/vertex-ai/docs/featurestore/latest/overview)
- [Tutorial from Documentation](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/feature_store/online_feature_serving_and_fetching_bigquery_data_with_feature_store_bigtable.ipynb)

**Todo List:**
- show multiple feature groups for one feature view
- add embedding to this datasource and include embedding search
    - use as opportunity to show multiple tables > feature groups > feature View
- add dataplex data catalog narrative
- add FS management section: add/delete/list/update
- add example of multiple groups feeding features to the same online view
- show prediction with feature store integration

---
**tl;dr**

<p align="center" ><center>
    <img src="../resources/images/created/featurestore/overview.png" width="50%">
</center></p>

Feature Store is a serving environment for **features** observed on **entities**:
- **entity** = a unique record, think row
- **feature** = observations, input for ML, think column

The **offline store** is made up of any BigQuery Table(s)/View(s), the **data source**, that you manage:
- (1) If a table/view has a single row per unique **entity** with columns that are non-changing values for **features** then the table can be directly used in an **online store's** **feature view** (see below).
- (2) For time bound **features** the table/view needs to have two additional columns: entity_id, feature_timestamp. Think of this as a history table.

The **feature registry**:
- Tables/Views of type (2) above are registered as **Feature Groups** - a feature group is sourced by a single table/view
- Columns from the **feature group** are then registered as **features**

The **online store** has two types to choose from:
- Cloud Bigtable online serving - highly scalable
- Optimized online serving - ultra-low latencies and responsive to bursts of requests

**Feature Views** are created in the **online store** from either:
- Features from one or more **feature groups**
- a table/view of type (1) above
 
BigQuery as a **data source**:
- This means that time-bound **features** are managed in BigQuery, upstream of the **feature store**.  You can store multiple rows per **entity** and use the entity_id and feature_timestamp columns to indicate when each value applies. To make source data of this shape useful for training batches (or evaluation, test, and batch prediction), BigQuery provides two functions that extract point-in-time values for **entity/feature** data:
    - [ML.FEATURES_AT_TIME](https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-feature-time) - takes a table and a timestamp as input and returns the value of each feature for each entity as of that timestamp.  Additional optional arguments are available.
    - [ML.ENTITY_FEATURES_AT_TIME](https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-entity-feature-time) - takes an input table plus a second table of entity+timestamp pairs and returns the feature values for each entity+timestamp pair.  This allows multiple points in time for a single entity as well as different times for different entities.
- Time-bound data, where the column values for a row/entity change over time, might not be the format data scientists are used to working with.  BigQuery has several features that help with data that changes over time:
    - Building these tables/views of timestamped entity records can benefit from [time-travel](https://cloud.google.com/bigquery/docs/time-travel#time_travel) (up to 7 days, configurable) and [snapshots](https://cloud.google.com/bigquery/docs/table-snapshots-intro) (user-controlled points in time). You can also [query time-travel](https://cloud.google.com/bigquery/docs/access-historical-data) and [create snapshots from time-travel](https://cloud.google.com/bigquery/docs/table-snapshots-create#create_a_table_snapshot_using_time_travel).



---
## Colab Setup

When running this notebook in [Colab](https://colab.google/) or [Colab Enterprise](https://cloud.google.com/colab/docs/introduction), this section will authenticate to GCP (follow prompts in the popup) and set the current project for the session.

In [1]:
PROJECT_ID = 'statmike-mlops-349915' # replace with project ID

In [2]:
try:
    from google.colab import auth
    auth.authenticate_user(project_id = PROJECT_ID)
    print('Colab authorized to GCP')
except Exception:
    print('Not a Colab Environment')
    pass

Not a Colab Environment


---
## Installs

The list `packages` contains tuples of package import names and install names.  If an import name is not found, the package is installed quietly for the current user using its install name.

In [3]:
# tuples of (import name, install name)
packages = [
    ('google.cloud.aiplatform', 'google-cloud-aiplatform'),
    ('google.cloud.bigquery', 'google-cloud-bigquery'),
    ('bigframes', 'bigframes'),
]

import importlib.util  # importlib.util is a submodule and should be imported explicitly
install = False
for package in packages:
    if not importlib.util.find_spec(package[0]):
        print(f'installing package {package[1]}')
        install = True
        !pip install {package[1]} -U -q --user

### Restart Kernel (If Installs Occurred)

After a kernel restart the code submission can start with the next cell after this one.

In [4]:
if install:
    import IPython
    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

---
## Setup

inputs:

In [5]:
project = !gcloud config get-value project
PROJECT_ID = project[0]
PROJECT_ID

'statmike-mlops-349915'

In [161]:
REGION = 'us-central1'
EXPERIMENT = 'workflow'
SERIES = 'feature-store'

# source data
BQ_PROJECT = PROJECT_ID
BQ_DATASET = 'fraud'
BQ_TABLE = 'fraud_prepped'

packages:

In [7]:
from google.cloud import aiplatform
from datetime import datetime, timedelta
import time
import numpy as np
import asyncio
from google.cloud import bigquery
import bigframes.pandas as bpd

clients:

In [8]:
aiplatform.init(project = PROJECT_ID, location = REGION)
bq = bigquery.Client(project = PROJECT_ID)
#bpd.options.bigquery.project = PROJECT_ID

---
## Review Source Data

The data source here was prepared in [01 - BigQuery - Table Data Source](../../01%20-%20Data%20Sources/01%20-%20BigQuery%20-%20Table%20Data%20Source.ipynb).

This is a table of 284,807 credit card transactions classified as fraudulent or normal in the column `Class`.  To protect confidentiality, the original features have been transformed using [principal component analysis (PCA)](https://en.wikipedia.org/wiki/Principal_component_analysis) into 28 features named `V1, V2, ... V28` (float).  Two descriptive features are provided without PCA transformation:
- `Time` (integer) is the seconds elapsed between the transaction and the earliest transaction in the table
- `Amount` (float) is the value of the transaction

The data preparation added a column named `splits` for machine learning, assigning 80% of rows to training (`TRAIN`), 10% to validation (`VALIDATE`), and 10% to testing (`TEST`).  It also added a unique identifier to each transaction, `transaction_id`.

### Connect to the BigQuery table:

In [9]:
source_data = bpd.read_gbq(f'{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}')

### Review a sample of the data:

In [10]:
source_data.head()

| | Time | V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | ... | V23 | V24 | V25 | V26 | V27 | V28 | Amount | Class | transaction_id | splits |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 146737 | -8.313253 | 7.353339 | -6.784848 | -1.647257 | -3.151681 | -1.036468 | -3.176614 | 4.783091 | 2.525343 | ... | 0.553924 | -1.551447 | 0.668017 | -0.047452 | 1.246589 | 0.71985 | 0.89 | 0 | 9c23ed6c-805e-4c54-af95-dfb8cb34446a | TRAIN |
| 1 | 41342 | -0.950509 | 1.005787 | 2.004522 | 3.180712 | -0.721331 | 0.622745 | -0.367445 | 0.690638 | -0.831808 | ... | -0.019173 | 0.438937 | -0.462187 | 0.328724 | -0.041549 | 0.082586 | 39.52 | 0 | f23f1435-eaf1-455c-9877-9e3b1db6473d | TRAIN |
| 2 | 154083 | -2.41823 | -1.114104 | 0.0731 | -1.827987 | 0.743047 | 5.017568 | -2.471885 | 1.141392 | 0.812167 | ... | 0.712716 | 0.746484 | -0.70748 | 0.632214 | -0.536598 | 0.014419 | 25.0 | 0 | a147e336-81d1-47f2-87ef-206843430e00 | TRAIN |
| 3 | 137227 | -1.125604 | 0.873506 | 2.20555 | 0.725739 | -0.107164 | 1.59102 | -1.093884 | -2.480163 | 0.097936 | ... | -0.377725 | -1.07084 | 0.37751 | -0.257676 | 0.009876 | -0.124529 | 54.0 | 0 | 2a7353de-8003-48a5-8ddf-327b42a5731d | TRAIN |
| 4 | 52804 | -0.336088 | 1.064353 | 2.150498 | 4.428434 | 0.463562 | 1.12541 | 0.579549 | -0.118975 | -1.284248 | ... | -0.095214 | 0.014986 | -0.458112 | 0.291475 | -0.00217 | -0.228903 | 27.81 | 0 | fd69d5aa-4c53-43a7-8ba9-32bbda68eab0 | TRAIN |


In [11]:
source_data.dtypes

Time                Int64
V1                Float64
V2                Float64
V3                Float64
V4                Float64
V5                Float64
V6                Float64
V7                Float64
V8                Float64
V9                Float64
V10               Float64
V11               Float64
V12               Float64
V13               Float64
V14               Float64
V15               Float64
V16               Float64
V17               Float64
V18               Float64
V19               Float64
V20               Float64
V21               Float64
V22               Float64
V23               Float64
V24               Float64
V25               Float64
V26               Float64
V27               Float64
V28               Float64
Amount            Float64
Class               Int64
transaction_id     string
splits             string
dtype: object

### Review the number of records for each level of `Class` (the target/label) for each of the data splits:

In [12]:
source_data.groupby(['splits', 'Class'])['Class'].count()

splits    Class
TEST      0         28455
          1            47
TRAIN     0        227664
          1           397
VALIDATE  0         28196
          1            48
Name: Class, dtype: Int64
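As a quick check of the 80/10/10 split described above, the counts in this output can be tallied in plain Python. The numbers are copied by hand from the `groupby` output; nothing here queries BigQuery, and the variable names are illustrative.

```python
# Counts copied from the groupby output above: (splits, Class) -> number of rows.
counts = {
    ("TEST", 0): 28455, ("TEST", 1): 47,
    ("TRAIN", 0): 227664, ("TRAIN", 1): 397,
    ("VALIDATE", 0): 28196, ("VALIDATE", 1): 48,
}
total = sum(counts.values())

summary = {}
for split in ("TRAIN", "VALIDATE", "TEST"):
    rows = counts[(split, 0)] + counts[(split, 1)]
    share = rows / total                    # fraction of all transactions in this split
    fraud_rate = counts[(split, 1)] / rows  # fraction of the split labeled fraud (Class = 1)
    summary[split] = (rows, share, fraud_rate)
    print(f"{split:<9} rows={rows:>7,} share={share:.1%} fraud_rate={fraud_rate:.3%}")

print(f"total rows = {total:,}")  # 284,807, matching the table description
```

The tally also makes the class imbalance visible: in every split, fewer than 0.2% of rows have `Class` = 1.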

### Get Row Level IDs

Grab a short list of `transaction_id` values from the source data to use in examples and testing throughout this workflow.

In [17]:
transaction_ids = list(source_data['transaction_id'].head(10))

In [18]:
transaction_ids

['9c23ed6c-805e-4c54-af95-dfb8cb34446a',
 'f23f1435-eaf1-455c-9877-9e3b1db6473d',
 'a147e336-81d1-47f2-87ef-206843430e00',
 '2a7353de-8003-48a5-8ddf-327b42a5731d',
 'fd69d5aa-4c53-43a7-8ba9-32bbda68eab0',
 'cec92e71-e96a-4b27-b88e-7a0f18c1eb18',
 '4a40f0d6-bad3-4097-a649-0c4ff0dec9af',
 '0d4fa1d8-7ca3-46b5-8761-3c1cca038c53',
 '59a9a10a-6129-4e5b-9b42-3525c9b034c7',
 'ddfa46db-ead1-484f-975c-f7f2d065ee04']

---
## Prepare Data For Feature Store

There are two formats that can be used for feature store:
- BigQuery Source
    - A table or view of the **latest feature values** for each entity
    - This can be used to directly serve features
- BigQuery Source With Timestamped History
    - A table or view of feature values along with two columns that help derive the latest values of features:
        - **entity_id** - a string value that identifies the entity the features describe
        - **feature_timestamp** - a timestamp value used by feature store to serve the latest non-null values of a feature
        
This section creates views of the source table in each of these formats.
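To make "latest non-null values" concrete, here is a minimal plain-Python sketch of how rows in the timestamped-history format collapse into the latest-values format. It assumes each history row is a dict keyed by column name, with `None` standing in for a BigQuery `NULL`; the function `latest_non_null` and the toy rows are illustrative only, not part of any Vertex AI API, and features that never have a non-null value are simply omitted.

```python
from datetime import datetime

def latest_non_null(history, entity_col="entity_id", ts_col="feature_timestamp"):
    """Collapse history rows to one dict per entity with each feature's latest non-null value."""
    latest = {}
    # Walk rows oldest-first so a newer non-null value overwrites an older one.
    for row in sorted(history, key=lambda r: r[ts_col]):
        current = latest.setdefault(row[entity_col], {})
        for column, value in row.items():
            if column in (entity_col, ts_col) or value is None:
                continue  # skip key columns and NULLs, so older values survive
            current[column] = value
    return latest

history = [  # toy rows in the history format for one entity
    {"entity_id": "a", "feature_timestamp": datetime(2024, 5, 1, 18, 25), "v1": None, "amount": 0.89},
    {"entity_id": "a", "feature_timestamp": datetime(2024, 5, 1, 22, 26), "v1": -8.313253, "amount": None},
    {"entity_id": "a", "feature_timestamp": datetime(2024, 5, 2, 4, 29), "v1": None, "amount": None},
]
print(latest_non_null(history))  # {'a': {'amount': 0.89, 'v1': -8.313253}}
```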

### BigQuery Source View: Raw

In [13]:
columns = ','.join([column.lower() for column in source_data.columns.tolist()])

In [14]:
query = f'''
CREATE OR REPLACE VIEW `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_raw` AS
    SELECT {columns}
    FROM `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}`
'''
print(query)


CREATE OR REPLACE VIEW `statmike-mlops-349915.fraud.fraud_prepped_fs_raw` AS
    SELECT time,v1,v2,v3,v4,v5,v6,v7,v8,v9,v10,v11,v12,v13,v14,v15,v16,v17,v18,v19,v20,v21,v22,v23,v24,v25,v26,v27,v28,amount,class,transaction_id,splits
    FROM `statmike-mlops-349915.fraud.fraud_prepped`



In [15]:
bq_job = bq.query(query)
bq_job.result()
(bq_job.ended - bq_job.started).total_seconds()

0.137

#### Review Data For a Single Entity

In this form there is a single row and each feature/column has the latest value of the feature.

In [19]:
source_raw = bpd.read_gbq(f'{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_raw')

In [20]:
source_raw[source_raw['transaction_id'] == transaction_ids[0]]

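| | time | v1 | v2 | v3 | v4 | v5 | v6 | v7 | v8 | v9 | ... | v23 | v24 | v25 | v26 | v27 | v28 | amount | class | transaction_id | splits |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 146737 | -8.313253 | 7.353339 | -6.784848 | -1.647257 | -3.151681 | -1.036468 | -3.176614 | 4.783091 | 2.525343 | ... | 0.553924 | -1.551447 | 0.668017 | -0.047452 | 1.246589 | 0.71985 | 0.89 | 0 | 9c23ed6c-805e-4c54-af95-dfb8cb34446a | TRAIN |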


### BigQuery Source View: Latest

A view with the latest values of all features and a column for the entity ID, in this case `transaction_id`.  The label (`class`) and the `splits` column are excluded.

In [21]:
query = f'''
CREATE OR REPLACE VIEW `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_latest` AS
    SELECT * EXCEPT(class, splits)
    FROM `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_raw`
'''
print(query)


CREATE OR REPLACE VIEW `statmike-mlops-349915.fraud.fraud_prepped_fs_latest` AS
    SELECT * EXCEPT(class, splits)
    FROM `statmike-mlops-349915.fraud.fraud_prepped_fs_raw`



In [22]:
bq_job = bq.query(query)
bq_job.result()
(bq_job.ended - bq_job.started).total_seconds()

0.155

#### Review Data For a Single Entity

In this form, just like the raw data above, there is a single row and each feature/column has the latest value of the feature.

In [23]:
source_latest = bpd.read_gbq(f'{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_latest')

In [24]:
source_latest[source_latest['transaction_id'] == transaction_ids[0]]

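| | time | v1 | v2 | v3 | v4 | v5 | v6 | v7 | v8 | v9 | ... | v21 | v22 | v23 | v24 | v25 | v26 | v27 | v28 | amount | transaction_id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 204223 | 146737 | -8.313253 | 7.353339 | -6.784848 | -1.647257 | -3.151681 | -1.036468 | -3.176614 | 4.783091 | 2.525343 | ... | 0.100413 | 0.520158 | 0.553924 | -1.551447 | 0.668017 | -0.047452 | 1.246589 | 0.71985 | 0.89 | 9c23ed6c-805e-4c54-af95-dfb8cb34446a |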


### BigQuery Source View: History

A deconstruction of the source data into an example history table - **AS AN EXAMPLE**.  In this version, the feature values for each entity arrive at different times, which mirrors many real-world scenarios.

To illustrate the shape of a history table, this query:
- assigns each `transaction_id` a base `feature_timestamp` at a random minute between 1 and 15 days before the current time
- adds random delays of up to 24 hours and sorts them: the earliest row has none of V1, V2, V3, V4, V5, and each of those features then arrives in its own later row, in that order (the remaining features repeat on every row)
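The timestamp arithmetic in the SQL can be hard to follow, so this plain-Python sketch mimics it for a single entity. It assumes the feature values arrive as a dict; `deconstruct`, `LATE`, and the fixed `now` are hypothetical names chosen for illustration, and Python's `random` stands in for BigQuery's `RAND()`.

```python
import math
import random
from datetime import datetime, timedelta, timezone

LATE = ("v1", "v2", "v3", "v4", "v5")  # features that arrive after the others

def deconstruct(entity_id, features, now, rng):
    """Split one entity's features into history rows, mimicking the SQL below."""
    # Base time 1 to 15 days before `now`: CEIL(RAND()*60*24*14) + 1440 minutes.
    base = now - timedelta(minutes=math.ceil(rng.random() * 60 * 24 * 14) + 1440)
    # Six delays of up to 1440 minutes, sorted like ROW_NUMBER() OVER(ORDER BY delay).
    delays = sorted(math.ceil(rng.random() * 1440) for _ in range(len(LATE) + 1))
    others = {k: v for k, v in features.items() if k not in LATE}
    rows = []
    for pos, delay in enumerate(delays, start=1):
        row = {"entity_id": entity_id,
               "feature_timestamp": base + timedelta(minutes=delay)}
        # Position 1 holds none of v1..v5; position 2 holds v1, position 3 holds v2, and so on.
        for slot, name in enumerate(LATE, start=2):
            row[name] = features[name] if pos == slot else None
        row.update(others)  # every row repeats the remaining features
        rows.append(row)
    return rows

features = {"v1": -8.313253, "v2": 7.353339, "v3": -6.784848, "v4": -1.647257,
            "v5": -3.151681, "time": 146737, "amount": 0.89}
now = datetime(2024, 5, 16, tzinfo=timezone.utc)  # hypothetical "current" time
rows = deconstruct("9c23ed6c-805e-4c54-af95-dfb8cb34446a", features, now, random.Random(42))
for row in rows:
    print(row["feature_timestamp"], [row[name] for name in LATE])
```

Because the delays are capped at 1440 minutes and the base time is at least 1440 minutes in the past, no generated timestamp falls after `now`.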

#### Example: Deconstruct the table into an example history table

In [25]:
query = f'''
CREATE OR REPLACE TABLE `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_history` AS 
WITH
    TRANSACTION_TIME AS (
        SELECT transaction_id,
            TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL CAST(CEIL(RAND()*60*24*14) + 1440 AS INT64) MINUTE) as feature_timestamp
        FROM `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_raw`
    ),
    FEATURE_DELAYS AS (
        SELECT *
        FROM (SELECT * FROM TRANSACTION_TIME)
        CROSS JOIN (
            SELECT *, ROW_NUMBER() OVER(ORDER BY delay) AS pos FROM UNNEST(ARRAY(
                (SELECT CAST(CEIL(RAND()*1440) AS INT64) FROM UNNEST(GENERATE_ARRAY(1, 6)))
            )) AS delay
        )   
    )
SELECT
    transaction_id as entity_id,
    TIMESTAMP_ADD(feature_timestamp, INTERVAL delay MINUTE) AS feature_timestamp,
    CASE
        WHEN pos = 2 THEN v1
        ELSE NULL
    END AS v1,
    CASE
        WHEN pos = 3 THEN v2
        ELSE NULL
    END AS v2,
    CASE
        WHEN pos = 4 THEN v3
        ELSE NULL
    END AS v3,
    CASE
        WHEN pos = 5 THEN v4
        ELSE NULL
    END AS v4,
    CASE
        WHEN pos = 6 THEN v5
        ELSE NULL
    END AS v5,
    * EXCEPT(pos, delay, feature_timestamp, v1, v2, v3, v4, v5, transaction_id, splits),
FROM FEATURE_DELAYS
LEFT JOIN `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_raw` USING(transaction_id)
ORDER BY transaction_id, feature_timestamp    
'''
print(query)


CREATE OR REPLACE TABLE `statmike-mlops-349915.fraud.fraud_prepped_fs_history` AS 
WITH
    TRANSACTION_TIME AS (
        SELECT transaction_id,
            TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL CAST(CEIL(RAND()*60*24*14) + 1440 AS INT64) MINUTE) as feature_timestamp
        FROM `statmike-mlops-349915.fraud.fraud_prepped_fs_raw`
    ),
    FEATURE_DELAYS AS (
        SELECT *
        FROM (SELECT * FROM TRANSACTION_TIME)
        CROSS JOIN (
            SELECT *, ROW_NUMBER() OVER(ORDER BY delay) AS pos FROM UNNEST(ARRAY(
                (SELECT CAST(CEIL(RAND()*1440) AS INT64) FROM UNNEST(GENERATE_ARRAY(1, 6)))
            )) AS delay
        )   
    )
SELECT
    transaction_id as entity_id,
    TIMESTAMP_ADD(feature_timestamp, INTERVAL delay MINUTE) AS feature_timestamp,
    CASE
        WHEN pos = 2 THEN v1
        ELSE NULL
    END AS v1,
    CASE
        WHEN pos = 3 THEN v2
        ELSE NULL
    END AS v2,
    CASE
        WHEN pos = 4 THEN v3
        ELSE NULL
    END A

In [30]:
bq_job = bq.query(query)
bq_job.result()
(bq_job.ended - bq_job.started).total_seconds()

10.824

#### Review Data For a Single Entity

In this form there are multiple rows for an entity.  Notice that features V1-V5 arrived over time.  At the earliest time none have values and then each arrives over the course of about 24 hours in this example.  There could be multiple values for a single feature over time: think account balance, inventory available, time on website, number of clicks, etc.

In [33]:
source_history = bpd.read_gbq(f'{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_history')

In [34]:
example = source_history[source_history['entity_id'] == transaction_ids[0]].sort_values('feature_timestamp')
example

| | entity_id | feature_timestamp | v1 | v2 | v3 | v4 | v5 | time | v6 | v7 | ... | v21 | v22 | v23 | v24 | v25 | v26 | v27 | v28 | amount | class |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1598988 | 9c23ed6c-805e-4c54-af95-dfb8cb34446a | 2024-05-01 18:25:01.603430+00:00 |  |  |  |  |  | 146737 | -1.036468 | -3.176614 | ... | 0.100413 | 0.520158 | 0.553924 | -1.551447 | 0.668017 | -0.047452 | 1.246589 | 0.71985 | 0.89 | 0 |
| 246774 | 9c23ed6c-805e-4c54-af95-dfb8cb34446a | 2024-05-01 22:26:01.603430+00:00 | -8.313253 |  |  |  |  | 146737 | -1.036468 | -3.176614 | ... | 0.100413 | 0.520158 | 0.553924 | -1.551447 | 0.668017 | -0.047452 | 1.246589 | 0.71985 | 0.89 | 0 |
| 381823 | 9c23ed6c-805e-4c54-af95-dfb8cb34446a | 2024-05-01 22:40:01.603430+00:00 |  | 7.353339 |  |  |  | 146737 | -1.036468 | -3.176614 | ... | 0.100413 | 0.520158 | 0.553924 | -1.551447 | 0.668017 | -0.047452 | 1.246589 | 0.71985 | 0.89 | 0 |
| 1382179 | 9c23ed6c-805e-4c54-af95-dfb8cb34446a | 2024-05-02 02:44:01.603430+00:00 |  |  | -6.784848 |  |  | 146737 | -1.036468 | -3.176614 | ... | 0.100413 | 0.520158 | 0.553924 | -1.551447 | 0.668017 | -0.047452 | 1.246589 | 0.71985 | 0.89 | 0 |
| 1156615 | 9c23ed6c-805e-4c54-af95-dfb8cb34446a | 2024-05-02 03:29:01.603430+00:00 |  |  |  | -1.647257 |  | 146737 | -1.036468 | -3.176614 | ... | 0.100413 | 0.520158 | 0.553924 | -1.551447 | 0.668017 | -0.047452 | 1.246589 | 0.71985 | 0.89 | 0 |
| 1643880 | 9c23ed6c-805e-4c54-af95-dfb8cb34446a | 2024-05-02 04:29:01.603430+00:00 |  |  |  |  | -3.151681 | 146737 | -1.036468 | -3.176614 | ... | 0.100413 | 0.520158 | 0.553924 | -1.551447 | 0.668017 | -0.047452 | 1.246589 | 0.71985 | 0.89 | 0 |


In [35]:
example_entity = example['entity_id'].iloc[0]
# earliest timestamp with full microsecond precision
example_mintime = example['feature_timestamp'].min().strftime('%Y-%m-%d %H:%M:%S.%f%z')
# latest timestamp truncated to whole seconds (the microseconds are dropped on purpose)
example_maxtime = example['feature_timestamp'].max().strftime('%Y-%m-%d %H:%M:%S%z')
# latest timestamp with full microsecond precision
example_maxtimeFull = example['feature_timestamp'].max().strftime('%Y-%m-%d %H:%M:%S.%f%z')

example_entity, example_mintime, example_maxtime, example_maxtimeFull

('9c23ed6c-805e-4c54-af95-dfb8cb34446a',
 '2024-05-01 18:25:01.603430+0000',
 '2024-05-02 04:29:01+0000',
 '2024-05-02 04:29:01.603430+0000')

### Using A Feature History Table In BigQuery (Batch Mode)

A history table version of features is not directly ready for use in machine learning. There are BigQuery functions available to make it easy to create training, evaluation, and other sets of instances from a history table.

- [ML.FEATURES_AT_TIME](https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-feature-time)
    - specify a point-in-time cutoff for all entities when retrieving features.  Prevents [data leakage](https://en.wikipedia.org/wiki/Leakage_(machine_learning)).
- [ML.ENTITY_FEATURES_AT_TIME](https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-entity-feature-time)
    - specify a table of entity values and timestamps to retrieve point-in-time features for all combinations

#### Features At Point-In-Time

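Retrieve feature values at a point in time for all entities.  **Notice** the time precision required to capture the latest-arriving values in the `v1, v2, v3, v4, v5` columns: the first query below uses `example_maxtime`, which is truncated to whole seconds, and returns a null `v5`; the second uses `example_maxtimeFull` and captures it.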
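The point-in-time behavior, and why precision matters, can be sketched in plain Python. This is an approximation of `ML.FEATURES_AT_TIME` with `num_rows => 1` and `ignore_feature_nulls => TRUE` based on the outputs in this notebook (for example, the returned `feature_timestamp` echoes the requested time); it is not BigQuery's implementation, and the helper name `features_at_time` and the trimmed rows are illustrative.

```python
from datetime import datetime, timezone

def features_at_time(history, time):
    """Approximate ML.FEATURES_AT_TIME(..., num_rows => 1, ignore_feature_nulls => TRUE)."""
    result = {}
    for row in sorted(history, key=lambda r: r["feature_timestamp"]):
        if row["feature_timestamp"] > time:
            continue  # rows after the cutoff are never used, which prevents leakage
        # The returned feature_timestamp echoes the requested time, as in the outputs below.
        current = result.setdefault(
            row["entity_id"], {"entity_id": row["entity_id"], "feature_timestamp": time})
        for column, value in row.items():
            if column in ("entity_id", "feature_timestamp"):
                continue
            # Keep the latest non-null value; store None only if nothing has arrived yet.
            if value is not None or column not in current:
                current[column] = value
    return list(result.values())

UTC = timezone.utc
ENTITY = "9c23ed6c-805e-4c54-af95-dfb8cb34446a"
history = [  # trimmed to three rows and three features of the example entity above
    {"entity_id": ENTITY, "feature_timestamp": datetime(2024, 5, 1, 18, 25, 1, 603430, tzinfo=UTC),
     "v1": None, "v5": None, "amount": 0.89},
    {"entity_id": ENTITY, "feature_timestamp": datetime(2024, 5, 1, 22, 26, 1, 603430, tzinfo=UTC),
     "v1": -8.313253, "v5": None, "amount": 0.89},
    {"entity_id": ENTITY, "feature_timestamp": datetime(2024, 5, 2, 4, 29, 1, 603430, tzinfo=UTC),
     "v1": None, "v5": -3.151681, "amount": 0.89},
]
truncated = datetime(2024, 5, 2, 4, 29, 1, tzinfo=UTC)     # like example_maxtime
full = datetime(2024, 5, 2, 4, 29, 1, 603430, tzinfo=UTC)  # like example_maxtimeFull
print(features_at_time(history, truncated)[0]["v5"])  # None: v5 arrived after the cutoff
print(features_at_time(history, full)[0]["v5"])       # -3.151681
```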

In [36]:
bpd.read_gbq(f'''
SELECT *
FROM
    ML.FEATURES_AT_TIME(
        TABLE `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_history`,
        time => '{example_maxtime}',
        num_rows => 1,
        ignore_feature_nulls => TRUE
    )
WHERE entity_id = '{example_entity}'
''')

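| | entity_id | feature_timestamp | v1 | v2 | v3 | v4 | v5 | time | v6 | v7 | ... | v21 | v22 | v23 | v24 | v25 | v26 | v27 | v28 | amount | class |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 9c23ed6c-805e-4c54-af95-dfb8cb34446a | 2024-05-02 04:29:01+00:00 | -8.313253 | 7.353339 | -6.784848 | -1.647257 |  | 146737 | -1.036468 | -3.176614 | ... | 0.100413 | 0.520158 | 0.553924 | -1.551447 | 0.668017 | -0.047452 | 1.246589 | 0.71985 | 0.89 | 0 |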


In [37]:
bpd.read_gbq(f'''
SELECT *
FROM
    ML.FEATURES_AT_TIME(
        TABLE `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_history`,
        time => '{example_maxtimeFull}',
        num_rows => 1,
        ignore_feature_nulls => TRUE
    )
WHERE entity_id = '{example_entity}'
''')

| | entity_id | feature_timestamp | v1 | v2 | v3 | v4 | v5 | time | v6 | v7 | ... | v21 | v22 | v23 | v24 | v25 | v26 | v27 | v28 | amount | class |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 9c23ed6c-805e-4c54-af95-dfb8cb34446a | 2024-05-02 04:29:01.603430+00:00 | -8.313253 | 7.353339 | -6.784848 | -1.647257 | -3.151681 | 146737 | -1.036468 | -3.176614 | ... | 0.100413 | 0.520158 | 0.553924 | -1.551447 | 0.668017 | -0.047452 | 1.246589 | 0.71985 | 0.89 | 0 |


#### Entity Features At Point-In-Time

Retrieve feature values for selected entity and point-in-time pairs.  With an input table of `entity_id` and `time` combinations, you can gather whatever view over time, or at a point in time, a data science project needs.
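Rather than hand-writing the `UNION ALL` rows, a CTE shaped like `ENTITY_TIME_TABLE` in the query below can be generated from Python `(entity_id, datetime)` pairs. This is a sketch under a few assumptions: timestamps are timezone-aware `datetime` objects, entity IDs contain no quotes or backslashes (such IDs are rejected rather than escaped), and `entity_time_cte` is a hypothetical helper, not a library function. Formatting with `timespec='microseconds'` keeps full precision, which avoids the truncation issue seen with `example_maxtime`.

```python
from datetime import datetime, timezone

def entity_time_cte(pairs, name="ENTITY_TIME_TABLE"):
    """Build a CTE of (entity_id, time) rows from Python (str, datetime) pairs."""
    selects = []
    for entity_id, ts in pairs:
        # Reject characters that would need escaping inside a SQL string literal.
        if "'" in entity_id or "\\" in entity_id:
            raise ValueError(f"unsupported character in entity_id: {entity_id!r}")
        # Keep microseconds so late-arriving values within the same second are not cut off.
        literal = ts.isoformat(sep=" ", timespec="microseconds")
        selects.append(f"SELECT '{entity_id}' AS entity_id, TIMESTAMP('{literal}') AS time")
    rows = " UNION ALL\n        ".join(selects)
    return f"{name} AS (\n        {rows}\n    )"

pairs = [
    ("9c23ed6c-805e-4c54-af95-dfb8cb34446a", datetime(2024, 5, 1, 18, 25, 1, 603430, tzinfo=timezone.utc)),
    ("9c23ed6c-805e-4c54-af95-dfb8cb34446a", datetime(2024, 5, 2, 4, 29, 1, 603430, tzinfo=timezone.utc)),
]
print(f"WITH\n    {entity_time_cte(pairs)}\nSELECT * FROM ENTITY_TIME_TABLE")
```

For a large number of pairs, keeping them in a BigQuery table is likely a better fit than a long literal query.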

In [41]:
bpd.read_gbq(f'''
WITH
    ENTITY_TIME_TABLE AS (
        SELECT '{example_entity}' as entity_id, TIMESTAMP('{example_mintime}') as time UNION ALL
        SELECT '{example_entity}' as entity_id, TIMESTAMP('{example_maxtime}') as time UNION ALL
        SELECT '{example_entity}' as entity_id, TIMESTAMP('{example_maxtimeFull}') as time
    )
SELECT *
FROM
    ML.ENTITY_FEATURES_AT_TIME(
        TABLE `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_history`,
        TABLE ENTITY_TIME_TABLE,
        num_rows => 1,
        ignore_feature_nulls => TRUE
    )
WHERE entity_id = '{example_entity}'
''').sort_values('feature_timestamp')

| | entity_id | feature_timestamp | v1 | v2 | v3 | v4 | v5 | time | v6 | v7 | ... | v21 | v22 | v23 | v24 | v25 | v26 | v27 | v28 | amount | class |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2 | 9c23ed6c-805e-4c54-af95-dfb8cb34446a | 2024-05-01 18:25:01.603430+00:00 |  |  |  |  |  | 146737 | -1.036468 | -3.176614 | ... | 0.100413 | 0.520158 | 0.553924 | -1.551447 | 0.668017 | -0.047452 | 1.246589 | 0.71985 | 0.89 | 0 |
| 0 | 9c23ed6c-805e-4c54-af95-dfb8cb34446a | 2024-05-02 04:29:01+00:00 | -8.313253 | 7.353339 | -6.784848 | -1.647257 |  | 146737 | -1.036468 | -3.176614 | ... | 0.100413 | 0.520158 | 0.553924 | -1.551447 | 0.668017 | -0.047452 | 1.246589 | 0.71985 | 0.89 | 0 |
| 1 | 9c23ed6c-805e-4c54-af95-dfb8cb34446a | 2024-05-02 04:29:01.603430+00:00 | -8.313253 | 7.353339 | -6.784848 | -1.647257 | -3.151681 | 146737 | -1.036468 | -3.176614 | ... | 0.100413 | 0.520158 | 0.553924 | -1.551447 | 0.668017 | -0.047452 | 1.246589 | 0.71985 | 0.89 | 0 |


---
## Feature Store

from google.cloud.aiplatform_v1 import (FeatureOnlineStoreAdminServiceClient,
                                        FeatureOnlineStoreServiceClient,
                                        FeatureRegistryServiceClient)

**API Links (Python)**
- [Feature Registry Service](https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform_v1.services.feature_registry_service)
- [Feature Online Store Admin Service](https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform_v1.services.feature_online_store_admin_service)
- [Feature Online Store Service](https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform_v1.services.feature_online_store_service)

### Feature Registry Client

Used to create feature groups and the features within them.

In [42]:
registry_client = aiplatform.gapic.FeatureRegistryServiceClient(client_options = dict(api_endpoint = f'{REGION}-aiplatform.googleapis.com'))

### Create A Feature Group

The history version of the feature table(s) can be registered to the Feature Registry.

**References:**
- [Create a feature group](https://cloud.google.com/vertex-ai/docs/featurestore/latest/create-featuregroup)

In [43]:
FEATURE_GROUP_NAME = 'transactions'

In [44]:
try:
    feature_group = registry_client.get_feature_group(name = f'projects/{PROJECT_ID}/locations/{REGION}/featureGroups/{FEATURE_GROUP_NAME}')
except Exception:
    create_group = registry_client.create_feature_group(
        request = aiplatform.gapic.CreateFeatureGroupRequest(
            parent=f"projects/{PROJECT_ID}/locations/{REGION}",
            feature_group_id = FEATURE_GROUP_NAME,
            feature_group = aiplatform.gapic.FeatureGroup(
                big_query = aiplatform.gapic.FeatureGroup.BigQuery(
                    big_query_source = aiplatform.gapic.BigQuerySource(input_uri = f'bq://{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_history'),
                    entity_id_columns = ['entity_id']
                )
            )
        )
    )
    feature_group = create_group.result()
    
feature_group.name

'projects/1026793852137/locations/us-central1/featureGroups/transactions'

In [45]:
print(f'Review in the console:\n\nhttps://console.cloud.google.com/vertex-ai/feature-store/feature-groups?project={PROJECT_ID}')

Review in the console:

https://console.cloud.google.com/vertex-ai/feature-store/feature-groups?project=statmike-mlops-349915


### Create Features

**References:**
- [Create a feature](https://cloud.google.com/vertex-ai/docs/featurestore/latest/create-feature)

In [46]:
source_history.columns.tolist()

['entity_id',
 'feature_timestamp',
 'v1',
 'v2',
 'v3',
 'v4',
 'v5',
 'time',
 'v6',
 'v7',
 'v8',
 'v9',
 'v10',
 'v11',
 'v12',
 'v13',
 'v14',
 'v15',
 'v16',
 'v17',
 'v18',
 'v19',
 'v20',
 'v21',
 'v22',
 'v23',
 'v24',
 'v25',
 'v26',
 'v27',
 'v28',
 'amount',
 'class']

In [47]:
features = []
for column in source_history.columns.tolist():
    if column not in ['entity_id', 'feature_timestamp']:
        try:
            feature = registry_client.get_feature(name = f'projects/{PROJECT_ID}/locations/{REGION}/featureGroups/{FEATURE_GROUP_NAME}/features/{column}')
            features.append(feature)
        except Exception:
            create_feature = registry_client.create_feature(
                request = aiplatform.gapic.CreateFeatureRequest(
                    parent = feature_group.name,  # the feature group created or retrieved above
                    feature_id = column,
                    feature = aiplatform.gapic.Feature(name = column)
                )
            )
            features.append(create_feature.result())

In [48]:
for feature in features:
    print(feature.name)

projects/1026793852137/locations/us-central1/featureGroups/transactions/features/v1
projects/1026793852137/locations/us-central1/featureGroups/transactions/features/v2
projects/1026793852137/locations/us-central1/featureGroups/transactions/features/v3
projects/1026793852137/locations/us-central1/featureGroups/transactions/features/v4
projects/1026793852137/locations/us-central1/featureGroups/transactions/features/v5
projects/1026793852137/locations/us-central1/featureGroups/transactions/features/time
projects/1026793852137/locations/us-central1/featureGroups/transactions/features/v6
projects/1026793852137/locations/us-central1/featureGroups/transactions/features/v7
projects/1026793852137/locations/us-central1/featureGroups/transactions/features/v8
projects/1026793852137/locations/us-central1/featureGroups/transactions/features/v9
projects/1026793852137/locations/us-central1/featureGroups/transactions/features/v10
projects/1026793852137/locations/us-central1/featureGroups/transactions/f

In [49]:
print(f'Review in the console (expand feature group):\n\nhttps://console.cloud.google.com/vertex-ai/feature-store/feature-groups?project={PROJECT_ID}')

Review in the console (expand feature group):

https://console.cloud.google.com/vertex-ai/feature-store/feature-groups?project=statmike-mlops-349915


### Feature Online Store Admin Client

Used to create online stores and feature views

In [50]:
online_admin_client = aiplatform.gapic.FeatureOnlineStoreAdminServiceClient(client_options = dict(api_endpoint = f'{REGION}-aiplatform.googleapis.com'))

### Create Online Store

**NOTE:** This can take around 10 minutes

**Reference:**
- [Online Serving Types](https://cloud.google.com/vertex-ai/docs/featurestore/latest/online-serving-types)
- [Create an Online Store Instance](https://cloud.google.com/vertex-ai/docs/featurestore/latest/create-onlinestore)

In [51]:
FEATURE_ONLINE_STORE_NAME = 'featurestore'

In [52]:
try:
    online_store = online_admin_client.get_feature_online_store(name = f'projects/{PROJECT_ID}/locations/{REGION}/featureOnlineStores/{FEATURE_ONLINE_STORE_NAME}')
except Exception:
    create_online_store = online_admin_client.create_feature_online_store(
        request = aiplatform.gapic.CreateFeatureOnlineStoreRequest(
            parent = f'projects/{PROJECT_ID}/locations/{REGION}',
            feature_online_store_id = FEATURE_ONLINE_STORE_NAME,
            feature_online_store = aiplatform.gapic.FeatureOnlineStore(
                bigtable = aiplatform.gapic.FeatureOnlineStore.Bigtable(
                    auto_scaling = aiplatform.gapic.FeatureOnlineStore.Bigtable.AutoScaling(
                        min_node_count = 1,
                        max_node_count = 2,
                        cpu_utilization_target = 50
                    )
                )
            )
        )
    )
    online_store = create_online_store.result()
    
online_store.name

'projects/1026793852137/locations/us-central1/featureOnlineStores/featurestore'

In [53]:
print(f'Review in the console:\n\nhttps://console.cloud.google.com/vertex-ai/locations/{REGION}/online-stores/featurestore?project={PROJECT_ID}')

Review in the console:

https://console.cloud.google.com/vertex-ai/locations/us-central1/online-stores/featurestore?project=statmike-mlops-349915


### Create Feature View: From Feature Registry

Create a feature view that syncs the latest value of each feature registered in the feature registry.

**Reference:**
- [Create a feature view from feature groups](https://cloud.google.com/vertex-ai/docs/featurestore/latest/create-featureview#create_a_feature_view_from_feature_groups)

In [54]:
REGISTRY_FEATURE_VIEW_NAME = 'registry_feature_view'

In [55]:
try:
    registry_view = online_admin_client.get_feature_view(name = f'{online_store.name}/featureViews/{REGISTRY_FEATURE_VIEW_NAME}')
except Exception:
    create_registry_view = online_admin_client.create_feature_view(
        request = aiplatform.gapic.CreateFeatureViewRequest(
            parent = online_store.name,
            feature_view_id = REGISTRY_FEATURE_VIEW_NAME,
            feature_view = aiplatform.gapic.FeatureView(
                feature_registry_source = aiplatform.gapic.FeatureView.FeatureRegistrySource(
                    feature_groups = [
                        aiplatform.gapic.FeatureView.FeatureRegistrySource.FeatureGroup(
                            feature_group_id = FEATURE_GROUP_NAME,
                            feature_ids = [feature.name.split('/')[-1] for feature in features if not feature.name.endswith('class')]
                        )
                    ]
                ),
                sync_config = aiplatform.gapic.FeatureView.SyncConfig(cron = 'TZ=America/New_York 10 * * * *')
            ),
            run_sync_immediately = True
        )
    )
    registry_view = create_registry_view.result()
    
registry_view.name

'projects/1026793852137/locations/us-central1/featureOnlineStores/featurestore/featureViews/registry_feature_view'
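The `sync_config` cron string `'TZ=America/New_York 10 * * * *'` means: in the `America/New_York` time zone, run at minute 10 of every hour. Below is a minimal stdlib sketch of when the next scheduled sync would fire after a given time. The helper name `next_hourly_sync` is hypothetical and is not part of the Vertex AI SDK. Pass a timezone-aware datetime to reason in a specific zone.

```python
from datetime import datetime, timedelta


def next_hourly_sync(now: datetime, minute: int = 10) -> datetime:
    """Next time strictly after `now` that matches the cron `<minute> * * * *`.

    Hypothetical helper for illustration only; it does not call Vertex AI.
    Uses wall-clock arithmetic on naive or aware datetimes (DST edge cases ignored).
    """
    candidate = now.replace(minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # this hour's slot has already passed (or is exactly now): use next hour's slot
        candidate += timedelta(hours=1)
    return candidate


print(next_hourly_sync(datetime(2024, 5, 3, 9, 5)))   # 2024-05-03 09:10:00
print(next_hourly_sync(datetime(2024, 5, 3, 23, 30)))  # 2024-05-04 00:10:00
```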

In [56]:
print(f'Review in the console:\n\nhttps://console.cloud.google.com/vertex-ai/locations/{REGION}/online-stores/featurestore/feature-views/{REGISTRY_FEATURE_VIEW_NAME}?project={PROJECT_ID}')

Review in the console:

https://console.cloud.google.com/vertex-ai/locations/us-central1/online-stores/featurestore/feature-views/registry_feature_view?project=statmike-mlops-349915


### Create Feature View: From BigQuery Source

Create a feature view directly from a BigQuery table or view: the 'latest' version created above.

**Reference:**
- [Create a feature view from a BigQuery source](https://cloud.google.com/vertex-ai/docs/featurestore/latest/create-featureview#create_from_bq)

In [57]:
BQ_FEATURE_VIEW_NAME = 'bq_feature_view'

In [58]:
try:
    bq_view = online_admin_client.get_feature_view(name = f'{online_store.name}/featureViews/{BQ_FEATURE_VIEW_NAME}')
except Exception:
    create_bq_view = online_admin_client.create_feature_view(
        request = aiplatform.gapic.CreateFeatureViewRequest(
            parent = online_store.name,
            feature_view_id = BQ_FEATURE_VIEW_NAME,
            feature_view = aiplatform.gapic.FeatureView(
                big_query_source = aiplatform.gapic.FeatureView.BigQuerySource(
                    uri = f'bq://{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_fs_latest',
                    entity_id_columns = ['transaction_id']
                ),
                sync_config = aiplatform.gapic.FeatureView.SyncConfig(cron = 'TZ=America/New_York 10 * * * *')
            ),
            run_sync_immediately = True
        )
    )
    bq_view = create_bq_view.result()
    
bq_view.name

'projects/1026793852137/locations/us-central1/featureOnlineStores/featurestore/featureViews/bq_feature_view'

In [59]:
print(f'Review in the console:\n\nhttps://console.cloud.google.com/vertex-ai/locations/{REGION}/online-stores/featurestore/feature-views/{BQ_FEATURE_VIEW_NAME}?project={PROJECT_ID}')

Review in the console:

https://console.cloud.google.com/vertex-ai/locations/us-central1/online-stores/featurestore/feature-views/bq_feature_view?project=statmike-mlops-349915


### Start Sync Manually: BQ View

Manually start a sync for the feature view created from the BigQuery source.

**References:**
- [Sync feature data to online store](https://cloud.google.com/vertex-ai/docs/featurestore/latest/sync-data)
- [List sync operations](https://cloud.google.com/vertex-ai/docs/featurestore/latest/list-data-syncs)

In [60]:
bq_sync = online_admin_client.sync_feature_view(feature_view = bq_view.name)

In [61]:
bq_sync.feature_view_sync

'projects/1026793852137/locations/us-central1/featureOnlineStores/featurestore/featureViews/bq_feature_view/featureViewSyncs/6391616346739703808'

In [62]:
while True:
    feature_view_sync = online_admin_client.get_feature_view_sync(name = bq_sync.feature_view_sync)
    if feature_view_sync.run_time.end_time.seconds > 0:
        status = feature_view_sync.final_status.code
        break
    else:
        print('waiting for 20 seconds...')
    time.sleep(20)
    
if status == 0: print('Succeeded!')
else: print('Failed!')

waiting for 20 seconds...
Succeeded!
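The polling loop above has no upper bound, so it would wait forever if a sync never finished. Below is a minimal sketch of the same pattern with a timeout. `wait_until` is a hypothetical helper, and `check` returns `None` while the work is still running. Using `None` as the "not done" signal matters here because a successful status code is `0`, which is falsy.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def wait_until(
    check: Callable[[], Optional[T]],
    interval_s: float = 20.0,
    timeout_s: float = 1800.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> T:
    """Call `check` until it returns something other than None, or raise TimeoutError."""
    deadline = clock() + timeout_s
    while True:
        result = check()
        if result is not None:
            return result
        if clock() >= deadline:
            raise TimeoutError(f"not finished after {timeout_s} s")
        print(f"waiting for {interval_s:g} seconds...")
        sleep(interval_s)


# Usage with the notebook's objects (assumes online_admin_client and bq_sync exist):
# def sync_status():
#     sync = online_admin_client.get_feature_view_sync(name=bq_sync.feature_view_sync)
#     return sync.final_status.code if sync.run_time.end_time.seconds > 0 else None
#
# status = wait_until(sync_status)
# print('Succeeded!' if status == 0 else 'Failed!')
```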


In [63]:
online_admin_client.list_feature_view_syncs(
    request = dict(
        parent = bq_view.name,
        page_size = 1,
        #filter = f'create_time > "{(datetime.now() - timedelta(hours = 9)).strftime("%Y-%m-%dT%X")}"'
    )
)

ListFeatureViewSyncsPager<feature_view_syncs {
  name: "projects/1026793852137/locations/us-central1/featureOnlineStores/featurestore/featureViews/bq_feature_view/featureViewSyncs/6391616346739703808"
  create_time {
    seconds: 1714747496
    nanos: 35863000
  }
  final_status {
  }
  run_time {
    start_time {
      seconds: 1714747496
      nanos: 35863000
    }
    end_time {
      seconds: 1714747513
      nanos: 107672000
    }
  }
}
next_page_token: "AMEw9yO283Mp2SOtVDL4xYuOxSVnHLrV72WKrr7UOSeI9AeuO9A7j5sOD3PJkxsW8vKzf_TD"
>

In [64]:
print(f'Review in the console:\n\nhttps://console.cloud.google.com/vertex-ai/locations/{REGION}/online-stores/featurestore/feature-views/{BQ_FEATURE_VIEW_NAME}?project={PROJECT_ID}')

Review in the console:

https://console.cloud.google.com/vertex-ai/locations/us-central1/online-stores/featurestore/feature-views/bq_feature_view?project=statmike-mlops-349915


### Online Serving From Feature Views

Request feature values for an entity from a feature view:
    
**References:**
- [Serve feature values](https://cloud.google.com/vertex-ai/docs/featurestore/latest/serve-feature-values)

In [65]:
online_serve_client = aiplatform.gapic.FeatureOnlineStoreServiceClient(client_options = dict(api_endpoint = f'{REGION}-aiplatform.googleapis.com'))

In [66]:
transaction_ids

['9c23ed6c-805e-4c54-af95-dfb8cb34446a',
 'f23f1435-eaf1-455c-9877-9e3b1db6473d',
 'a147e336-81d1-47f2-87ef-206843430e00',
 '2a7353de-8003-48a5-8ddf-327b42a5731d',
 'fd69d5aa-4c53-43a7-8ba9-32bbda68eab0',
 'cec92e71-e96a-4b27-b88e-7a0f18c1eb18',
 '4a40f0d6-bad3-4097-a649-0c4ff0dec9af',
 '0d4fa1d8-7ca3-46b5-8761-3c1cca038c53',
 '59a9a10a-6129-4e5b-9b42-3525c9b034c7',
 'ddfa46db-ead1-484f-975c-f7f2d065ee04']

Retrieve from the feature view with the BigQuery source:

In [67]:
dict(
    online_serve_client.fetch_feature_values(
        request = aiplatform.gapic.FetchFeatureValuesRequest(
            feature_view = bq_view.name,
            data_key = aiplatform.gapic.FeatureViewDataKey(key = transaction_ids[0]),
            data_format = aiplatform.gapic.FeatureViewDataFormat.PROTO_STRUCT # KEY_VALUE, PROTO_STRUCT
        )
    ).proto_struct
)

{'v10': 4.45301937539806,
 'v8': 4.7830911937597005,
 'v20': 1.68358290642766,
 'v19': 0.0630377604111553,
 'v21': 0.10041258604290401,
 'v2': 7.353339232204321,
 'v27': 1.24658886192516,
 'v22': 0.520157914427551,
 'v5': -3.1516806404773394,
 'v28': 0.7198498053288,
 'v14': 2.5793564460153,
 'time': 146737.0,
 'v25': 0.6680174270664859,
 'v9': 2.52534292971131,
 'v23': 0.5539240437368951,
 'v16': 0.9461931007135649,
 'v12': 1.25903164514966,
 'v18': 1.24873292729528,
 'v24': -1.5514466625571,
 'v7': -3.17661397093912,
 'v13': -0.762206789397063,
 'amount': 0.89,
 'v11': -1.7256458326016102,
 'v1': -8.313253421276519,
 'v6': -1.03646837237838,
 'v26': -0.0474515210303145,
 'v3': -6.78484804124268,
 'v17': 0.48182101171999103,
 'v4': -1.64725680373494,
 'v15': -0.0140659364965795}

Retrieve from the feature view with the feature registry source:

In [71]:
dict(
    online_serve_client.fetch_feature_values(
        request = aiplatform.gapic.FetchFeatureValuesRequest(
            feature_view = registry_view.name,
            data_key = aiplatform.gapic.FeatureViewDataKey(key = transaction_ids[0]),
            data_format = aiplatform.gapic.FeatureViewDataFormat.PROTO_STRUCT # KEY_VALUE, PROTO_STRUCT
        )
    ).proto_struct
)

{'v14': 2.5793564460153,
 'v17': 0.48182101171999103,
 'feature_timestamp': 1714745414581292.0,
 'v23': 0.5539240437368951,
 'v15': -0.0140659364965795,
 'v21': 0.10041258604290401,
 'v18': 1.24873292729528,
 'v27': 1.24658886192516,
 'v8': 4.7830911937597005,
 'v13': -0.762206789397063,
 'v19': 0.0630377604111553,
 'v4': -1.64725680373494,
 'v16': 0.9461931007135649,
 'amount': 0.89,
 'v6': -1.03646837237838,
 'v28': 0.7198498053288,
 'v9': 2.52534292971131,
 'v2': 7.353339232204321,
 'v3': -6.78484804124268,
 'v1': -8.313253421276519,
 'v24': -1.5514466625571,
 'v12': 1.25903164514966,
 'v25': 0.6680174270664859,
 'v20': 1.68358290642766,
 'v5': -3.1516806404773394,
 'v7': -3.17661397093912,
 'v11': -1.7256458326016102,
 'v10': 4.45301937539806,
 'time': 146737.0,
 'v22': 0.520157914427551,
 'v26': -0.0474515210303145}
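For this entity, both feature views return the same feature values. The registry-backed view also returns `feature_timestamp`. Divided by 1,000,000, its value falls about 35 minutes before the sync `create_time` shown earlier (1714747496 seconds), so it appears to be microseconds since the Unix epoch. That unit is an assumption, not something stated by the source.

Below is a minimal sketch for checking that two responses agree and for reading that timestamp. The helper names are hypothetical, and they operate on the plain dicts built above.

```python
from datetime import datetime, timezone
from typing import Dict, Iterable, List


def differing_features(a: Dict[str, float], b: Dict[str, float],
                       ignore: Iterable[str] = ("feature_timestamp",)) -> List[str]:
    """Names of features missing from one dict or holding different values."""
    skip = set(ignore)
    keys = (set(a) | set(b)) - skip
    return sorted(k for k in keys if a.get(k) != b.get(k))


def feature_timestamp_to_datetime(value: float) -> datetime:
    """Assumes `value` is microseconds since the Unix epoch."""
    return datetime.fromtimestamp(value / 1_000_000, tz=timezone.utc)


# Usage (bq_features and registry_features are hypothetical names for the two dicts above):
# differing_features(bq_features, registry_features)  # [] when the views agree
# feature_timestamp_to_datetime(registry_features['feature_timestamp'])
```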

---
## Model Inference: Serving Using Feature Store

In [this repository](https://github.com/statmike/vertex-ai-mlops), many notebook workflows build models from the same data source used for the feature store here.  The notebook workflows in the [05 - TensorFlow](../../05%20-%20TensorFlow/readme.md) folder all update the same endpoint.  If any of those notebooks have been run, there will be an endpoint with a model deployed to it.  That endpoint is used below to serve predictions, with the feature store as the source for instances.

There are many ways to request online predictions from an endpoint; they are covered in [05Tools - Prediction - Online](../../05%20-%20TensorFlow/05Tools%20-%20Prediction%20-%20Online.ipynb).


### Get Endpoint

[Endpoint Properties and Methods](https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.Endpoint):

```python
endpoint
endpoint.display_name
endpoint.resource_name
endpoint.traffic_split
endpoint.list_models()
```

In [72]:
endpoints = aiplatform.Endpoint.list(filter = f"labels.series=05 AND display_name=05")
if endpoints:
    endpoint = endpoints[0]
    print(f"Endpoint Exists: {endpoints[0].resource_name}")
else:
    print(f"There does not appear to be an endpoint for SERIES = 05")

Endpoint Exists: projects/1026793852137/locations/us-central1/endpoints/725723853820526592


In [73]:
endpoint.display_name

'05'

In [74]:
print(f'Review the Endpoint in the Console:\n\nhttps://console.cloud.google.com/vertex-ai/locations/{REGION}/endpoints/{endpoint.name}?project={PROJECT_ID}')

Review the Endpoint in the Console:

https://console.cloud.google.com/vertex-ai/locations/us-central1/endpoints/725723853820526592?project=statmike-mlops-349915


In [75]:
endpoint.traffic_split

{'2423682068408958976': 100}

In [76]:
endpoint.list_models()[0]

id: "2423682068408958976"
model: "projects/1026793852137/locations/us-central1/models/model_05_05"
display_name: "05_05"
create_time {
  seconds: 1703181253
  nanos: 58849000
}
dedicated_resources {
  machine_spec {
    machine_type: "n1-standard-4"
  }
  min_replica_count: 1
  max_replica_count: 1
}
model_version_id: "18"

---
### Get Predictions: Python Client

[aiplatform.Endpoint.predict()](https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.Endpoint#google_cloud_aiplatform_Endpoint_predict)

In [77]:
instance = dict(
    online_serve_client.fetch_feature_values(
        request = aiplatform.gapic.FetchFeatureValuesRequest(
            feature_view = registry_view.name,
            data_key = aiplatform.gapic.FeatureViewDataKey(key = transaction_ids[0]),
            data_format = aiplatform.gapic.FeatureViewDataFormat.PROTO_STRUCT # KEY_VALUE, PROTO_STRUCT
        )
    ).proto_struct
)

In [78]:
instance = {key.capitalize():[val] for key, val in instance.items() if key not in ['feature_timestamp']}
instance

{'V24': [-1.5514466625571],
 'V9': [2.52534292971131],
 'V16': [0.9461931007135649],
 'V6': [-1.03646837237838],
 'V22': [0.520157914427551],
 'V19': [0.0630377604111553],
 'V8': [4.7830911937597005],
 'V20': [1.68358290642766],
 'V7': [-3.17661397093912],
 'V23': [0.5539240437368951],
 'V2': [7.353339232204321],
 'V14': [2.5793564460153],
 'V5': [-3.1516806404773394],
 'V18': [1.24873292729528],
 'V25': [0.6680174270664859],
 'V10': [4.45301937539806],
 'V1': [-8.313253421276519],
 'V13': [-0.762206789397063],
 'V21': [0.10041258604290401],
 'V3': [-6.78484804124268],
 'V12': [1.25903164514966],
 'V17': [0.48182101171999103],
 'V28': [0.7198498053288],
 'V4': [-1.64725680373494],
 'Amount': [0.89],
 'V26': [-0.0474515210303145],
 'V11': [-1.7256458326016102],
 'V15': [-0.0140659364965795],
 'Time': [146737.0],
 'V27': [1.24658886192516]}
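The reshaping in In [78] does three things:

- It renames the feature store's lowercase names to the capitalized names the deployed model appears to expect (`v1` to `V1`, `amount` to `Amount`).
- It drops the `feature_timestamp` metadata.
- It wraps each value in a one-element list.

Here is the same step as a small reusable function, so it can be applied to every fetched entity. The name `to_instance` is hypothetical, and the model input format is inferred from the cell above rather than confirmed.

```python
from typing import Dict, Iterable, List


def to_instance(features: Dict[str, float],
                drop: Iterable[str] = ("feature_timestamp",)) -> Dict[str, List[float]]:
    """Convert a fetched feature dict into one prediction instance.

    Assumes the model inputs are the capitalized feature names, each a
    one-element list, as built in the cell above.
    """
    skip = set(drop)
    return {key.capitalize(): [value] for key, value in features.items() if key not in skip}


print(to_instance({"v1": -8.31, "amount": 0.89, "feature_timestamp": 1.7e15}))
# {'V1': [-8.31], 'Amount': [0.89]}
```

A list of such dicts can then be passed as `instances` to `endpoint.predict()`.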

In [79]:
prediction = endpoint.predict(instances = [instance])
prediction

Prediction(predictions=[[0.999925256, 7.47486483e-05]], deployed_model_id='2423682068408958976', metadata=None, model_version_id='18', model_resource_name='projects/1026793852137/locations/us-central1/models/model_05_05', explanations=None)

In [80]:
prediction.predictions[0]

[0.999925256, 7.47486483e-05]

In [81]:
np.argmax(prediction.predictions[0])

0

---
## Feature Store: Serving Features at Scale

What happens when there are many requests for features, all at the same time?  This section shows ways of interacting with the feature store concurrently and asynchronously to retrieve features for many entities.

### Get a list of entities

In [82]:
entities = source_latest['transaction_id'].head(1000).tolist()

In [83]:
len(entities), entities[0:5]

(1000,
 ['2d028def-4e4a-4505-b2a1-b41716b1e5fc',
  '29fb1a48-3b77-44f5-87e5-eb7508cf9432',
  'a6058d9b-afbf-4bb1-8d58-15a7acc45c1b',
  '5665d803-249c-4fbd-b9e7-31e345dc6c55',
  '63d89996-f0d1-4c81-adf3-9ab056177213'])

### Make Single Request: Synchronous

In [84]:
online_serve_client = aiplatform.gapic.FeatureOnlineStoreServiceClient(client_options = dict(api_endpoint = f'{REGION}-aiplatform.googleapis.com'))

In [86]:
start_time = time.perf_counter()
features = dict(
    online_serve_client.fetch_feature_values(
        request = aiplatform.gapic.FetchFeatureValuesRequest(
            feature_view = bq_view.name,
            data_key = aiplatform.gapic.FeatureViewDataKey(key = entities[0]),
            data_format = aiplatform.gapic.FeatureViewDataFormat.PROTO_STRUCT # KEY_VALUE, PROTO_STRUCT
        )
    ).proto_struct
)
elapsed_time = 1000 * (time.perf_counter() - start_time)
print(f'Latency (Client > FS > Client > Deserialize):  {elapsed_time:.2f} ms')

Latency (Client > FS > Client > Deserialize):  25.02 ms


### Make Single Request: Asynchronous

With the async client, calling a method such as `fetch_feature_values` returns a [coroutine](https://docs.python.org/3/glossary.html#term-coroutine) object rather than the response itself.  The method is defined with an `async def` statement, so its result is [awaitable](https://docs.python.org/3/library/asyncio-task.html#awaitables): `await` it to actually run the request and get the response.

In [87]:
online_serve_async_client = aiplatform.gapic.FeatureOnlineStoreServiceAsyncClient(client_options = dict(api_endpoint = f'{REGION}-aiplatform.googleapis.com'))

In [88]:
start_time = time.perf_counter()
features = dict(
    (await online_serve_async_client.fetch_feature_values(
        request = aiplatform.gapic.FetchFeatureValuesRequest(
            feature_view = bq_view.name,
            data_key = aiplatform.gapic.FeatureViewDataKey(key = entities[0]),
            data_format = aiplatform.gapic.FeatureViewDataFormat.PROTO_STRUCT # KEY_VALUE, PROTO_STRUCT
        )
    )).proto_struct
)
elapsed_time = 1000 * (time.perf_counter() - start_time)
print(f'Latency (Client > FS > Client > Deserialize):  {elapsed_time:.2f} ms')

Latency (Client > FS > Client > Deserialize):  27.51 ms
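Here is a tiny stdlib illustration of the coroutine behavior described above. A stand-in coroutine replaces `fetch_feature_values`; `fake_fetch` is hypothetical and only sleeps. Calling it creates a coroutine object but runs nothing, and `await` executes it.

```python
import asyncio


async def fake_fetch(key: str) -> dict:
    """Stand-in for an async client call: simulate a short network wait."""
    await asyncio.sleep(0.01)
    return {"key": key}


async def main() -> dict:
    pending = fake_fetch("abc")           # creates a coroutine; nothing has run yet
    print(asyncio.iscoroutine(pending))   # True
    return await pending                  # now the "request" actually runs


# In a script use asyncio.run(main()); in Jupyter, which already runs an
# event loop, use `await main()` instead, as the notebook cells do.
result = asyncio.run(main())
print(result)  # {'key': 'abc'}
```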


In [89]:
features

{'v20': -0.865733249267083,
 'v8': -0.8599618474835101,
 'v28': -0.00176685300144425,
 'v26': 0.915580653354396,
 'v7': 1.975873869957,
 'v17': -0.905905597649433,
 'v16': -1.39289805112596,
 'v27': -0.611555376987556,
 'v5': 0.630997374296391,
 'amount': 211.97,
 'v13': -0.256824962379698,
 'v1': -2.09777300837294,
 'v2': -0.990693629523365,
 'v21': -0.6536185484661489,
 'v11': 0.37705912648027895,
 'time': 146976.0,
 'v3': 0.0107141546786369,
 'v12': -0.405290048206578,
 'v22': -0.41126451019945004,
 'v10': 0.6934577415814771,
 'v24': 0.243862980702728,
 'v23': -0.0101119225335565,
 'v4': -2.86603740105142,
 'v18': 1.4337315279358998,
 'v6': -0.153175276183871,
 'v14': -0.266279146513299,
 'v25': 1.03064583285185,
 'v15': -0.654517779536198,
 'v9': -0.850826520329798,
 'v19': -0.350815933927949}

### Make 1000 Requests: Sequentially

This iterates through the requests one at a time, waiting for each to complete before starting the next.

In [90]:
LOOP_TIMES = []

for entity in entities:
    start_time = time.perf_counter()
    features = dict(
        online_serve_client.fetch_feature_values(
            request = aiplatform.gapic.FetchFeatureValuesRequest(
                feature_view = bq_view.name,
                data_key = aiplatform.gapic.FeatureViewDataKey(key = entity),
                data_format = aiplatform.gapic.FeatureViewDataFormat.PROTO_STRUCT # KEY_VALUE, PROTO_STRUCT
            )
        ).proto_struct
    )
    LOOP_TIMES.append(1000 * (time.perf_counter() - start_time))

In [91]:
print(f'Total time for all requests: {np.sum(LOOP_TIMES):.2f} ms')

Total time for all requests: 12512.93 ms


In [92]:
print(f'Average time per request: {np.mean(LOOP_TIMES):.2f} ms')

Average time per request: 12.51 ms


In [93]:
print(f'Range of times across all requests: {np.min(LOOP_TIMES):.2f} ms to {np.max(LOOP_TIMES):.2f} ms')

Range of times across all requests: 8.15 ms to 79.59 ms


In [94]:
print(f'The 99th percentile of the requests: {np.percentile(LOOP_TIMES, 99):.2f} ms')

The 99th percentile of the requests: 21.86 ms
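The four statistics above (total, mean, range, 99th percentile) are recomputed for every load test below. Here is a small helper that computes them from a list of per-request times in milliseconds; the name `summarize_latencies` is hypothetical. The percentile uses linear interpolation between closest ranks, which is the default method of `np.percentile`, so the results should match the cells above.

```python
import math
from typing import Dict, Sequence


def percentile(values: Sequence[float], q: float) -> float:
    """q-th percentile (0-100) with linear interpolation between closest ranks."""
    ordered = sorted(values)
    pos = (len(ordered) - 1) * q / 100
    lo, hi = math.floor(pos), math.ceil(pos)
    return ordered[lo] + (ordered[hi] - ordered[lo]) * (pos - lo)


def summarize_latencies(times_ms: Sequence[float]) -> Dict[str, float]:
    """Total, mean, min, max and p99 for per-request latencies in ms."""
    return {
        "total": sum(times_ms),
        "mean": sum(times_ms) / len(times_ms),
        "min": min(times_ms),
        "max": max(times_ms),
        "p99": percentile(times_ms, 99),
    }


# Usage in the notebook: summarize_latencies(LOOP_TIMES) or summarize_latencies(timings)
print(summarize_latencies([10.0, 20.0, 30.0, 40.0]))
```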


### Make 1000 Requests: Asynchronously

This is an all-at-once approach.  It might return service unavailable ("503") errors; these can be retried and usually succeed.  See below for methods with error handling and retries.

Accomplish all-at-once by grouping the coroutines together using [asyncio.gather()](https://docs.python.org/3/library/asyncio-task.html#running-tasks-concurrently):

In [95]:
try:
    start_time = time.perf_counter()
    features = await asyncio.gather(*[
        online_serve_async_client.fetch_feature_values(
            request = aiplatform.gapic.FetchFeatureValuesRequest(
                feature_view = bq_view.name,
                data_key = aiplatform.gapic.FeatureViewDataKey(key = entity),
                data_format = aiplatform.gapic.FeatureViewDataFormat.PROTO_STRUCT # KEY_VALUE, PROTO_STRUCT
            )
        ) for entity in entities
    ])
    elapsed_time = 1000 * (time.perf_counter() - start_time)
    print('Successful!')
except Exception as err:
    print(f"{type(err).__name__} was raised: {err}", "\n...Try again")

Successful!
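When some of the all-at-once requests fail (for example with a 503), `asyncio.gather(..., return_exceptions=True)` puts the exceptions in the result list instead of raising on the first one. That way only the failed keys need to be resent. Below is a minimal sketch of that retry loop. `gather_with_retries` is a hypothetical helper, and `fetch` stands for any coroutine function, such as a wrapper around `online_serve_async_client.fetch_feature_values`.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


async def gather_with_retries(keys: List[str],
                              fetch: Callable[[str], Awaitable[dict]],
                              max_rounds: int = 5) -> Dict[str, dict]:
    """Fetch all keys concurrently, re-sending only the ones that failed."""
    results: Dict[str, dict] = {}
    pending = list(keys)
    for _ in range(max_rounds):
        if not pending:
            break
        outcomes = await asyncio.gather(*(fetch(k) for k in pending),
                                        return_exceptions=True)
        failed = []
        for key, outcome in zip(pending, outcomes):
            if isinstance(outcome, BaseException):
                failed.append(key)      # e.g. a 503 ServiceUnavailable
            else:
                results[key] = outcome
        pending = failed
    if pending:
        raise RuntimeError(f"{len(pending)} keys still failing after {max_rounds} rounds")
    return results


# Hypothetical wrapper for the notebook's client (assumes bq_view and
# online_serve_async_client from the cells above):
# async def fetch(key):
#     response = await online_serve_async_client.fetch_feature_values(
#         request=aiplatform.gapic.FetchFeatureValuesRequest(
#             feature_view=bq_view.name,
#             data_key=aiplatform.gapic.FeatureViewDataKey(key=key),
#             data_format=aiplatform.gapic.FeatureViewDataFormat.PROTO_STRUCT))
#     return dict(response.proto_struct)
# features_by_id = await gather_with_retries(entities, fetch)
```

No delay is added between rounds here. Adding a short pause before resending is a reasonable refinement when the service is signalling overload.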


In [96]:
print(f'Total time for all requests: {elapsed_time:.2f} ms')

Total time for all requests: 750.27 ms


In [97]:
print(f'Average time per request: {(elapsed_time/len(features)):.2f} ms')

Average time per request: 0.75 ms


### Managed Concurrency: constant load of set size

In some cases, running all the tasks concurrently works.  Usually, though, there are limits.  Waiting on an API to respond does not burden local compute, so managing many in-flight requests may not be an issue on the client side.  It can still be helpful to limit concurrency to keep the load on the service manageable.  A first step is a tool like [asyncio.Semaphore](https://docs.python.org/3/library/asyncio-sync.html#semaphore) to manage a counter of the current concurrent requests.

The following function manages the full list of requests and uses a semaphore to control concurrency.  Think of the semaphore's limit as the concurrency buffer limit.

In [98]:
async def fs_load(instances, limit_concur_requests = 10):
    limit = asyncio.Semaphore(limit_concur_requests)
    results = [None] * len(instances)
    
    # make requests
    async def make_request(p):
        async with limit:
            if limit.locked():
                await asyncio.sleep(.01)
            ############## ERROR HANDLING ##############
            fail_count = 0
            features = None  # stays None if every retry fails (avoids UnboundLocalError below)
            start_time = time.perf_counter()
            while fail_count <= 20:
                try:
                    features = dict(
                        (await online_serve_async_client.fetch_feature_values(
                            request = aiplatform.gapic.FetchFeatureValuesRequest(
                                feature_view = bq_view.name,
                                data_key = aiplatform.gapic.FeatureViewDataKey(key = instances[p]),
                                data_format = aiplatform.gapic.FeatureViewDataFormat.PROTO_STRUCT # KEY_VALUE, PROTO_STRUCT
                            )
                        )).proto_struct
                    )
                    if fail_count > 0:
                        print(f'Item {p} succeeded after fail count = {fail_count}')
                    break
                except Exception:
                    fail_count += 1
                    print(f'Item {p} failed: current fail count = {fail_count}')
                    # optional exponential backoff (use ** for powers; ^ is bitwise XOR in Python):
                    #await asyncio.sleep(2 ** (min(fail_count, 6) - 1))
            elapsed_time = 1000 * (time.perf_counter() - start_time)
            ############################################
        results[p] = (instances[p], features, elapsed_time, fail_count)
        
    # manage tasks
    tasks = [asyncio.create_task(make_request(p)) for p in range(len(instances))]
    responses = await asyncio.gather(*tasks)
    
    return results
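To see what the semaphore in `fs_load` does in isolation, here is a self-contained sketch with simulated requests. `run_with_limit` and `simulated_request` are hypothetical, and `asyncio.sleep` stands in for network latency. The sketch records how many requests are in flight at once, which never exceeds `limit`. With a fixed simulated latency, the total wall-clock time is roughly `n_requests / limit * latency_s`.

```python
import asyncio
import time


async def run_with_limit(n_requests: int, limit: int, latency_s: float = 0.01):
    """Run n simulated requests with at most `limit` in flight; return stats."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def simulated_request(i: int) -> int:
        nonlocal in_flight, peak
        async with semaphore:               # waits while `limit` requests are running
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(latency_s)  # stand-in for the network call
            in_flight -= 1
        return i

    start = time.perf_counter()
    results = await asyncio.gather(*(simulated_request(i) for i in range(n_requests)))
    elapsed_s = time.perf_counter() - start
    return results, peak, elapsed_s


# In Jupyter use: results, peak, elapsed_s = await run_with_limit(100, limit=10)
results, peak, elapsed_s = asyncio.run(run_with_limit(100, limit=10))
print(peak, f"{elapsed_s:.2f} s")  # peak is 10; elapsed is roughly 100 / 10 * 0.01 s
```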

#### Constant Load: 10 concurrent requests, for 1000 requests

In [99]:
constant_load = 10

start_time = time.perf_counter()
features = await fs_load(entities, limit_concur_requests = constant_load)
elapsed_time = 1000 * (time.perf_counter() - start_time)

In [100]:
print(f'Total time for all requests: {elapsed_time:.2f} ms')

Total time for all requests: 2124.09 ms


In [101]:
print(f'Average time per request: {(elapsed_time/len(entities)):.2f} ms')

Average time per request: 2.12 ms


In [102]:
timings = [feature[2] for feature in features]

In [103]:
print(f'Average time per request: {np.mean(timings):.2f} ms')

Average time per request: 10.01 ms


In [104]:
print(f'Range of times across all requests: {np.min(timings):.2f} ms to {np.max(timings):.2f} ms')

Range of times across all requests: 4.69 ms to 46.40 ms


In [105]:
print(f'The 99th percentile of the requests: {np.percentile(timings, 99):.2f} ms')

The 99th percentile of the requests: 17.99 ms


#### Constant Load: 20 concurrent requests, for 1000 requests

In [106]:
constant_load = 20

start_time = time.perf_counter()
features = await fs_load(entities, limit_concur_requests = constant_load)
elapsed_time = 1000 * (time.perf_counter() - start_time)

In [107]:
print(f'Total time for all requests: {elapsed_time:.2f} ms')

Total time for all requests: 1508.66 ms


In [108]:
print(f'Average time per request: {(elapsed_time/len(entities)):.2f} ms')

Average time per request: 1.51 ms


In [109]:
timings = [feature[2] for feature in features]

In [110]:
print(f'Average time per request: {np.mean(timings):.2f} ms')

Average time per request: 13.14 ms


In [111]:
print(f'Range of times across all requests: {np.min(timings):.2f} ms to {np.max(timings):.2f} ms')

Range of times across all requests: 7.70 ms to 121.98 ms


In [112]:
print(f'The 99th percentile of the requests: {np.percentile(timings, 99):.2f} ms')

The 99th percentile of the requests: 57.68 ms


#### Constant Load: 50 concurrent requests, for 1000 requests

In [113]:
constant_load = 50

start_time = time.perf_counter()
features = await fs_load(entities, limit_concur_requests = constant_load)
elapsed_time = 1000 * (time.perf_counter() - start_time)

In [114]:
print(f'Total time for all requests: {elapsed_time:.2f} ms')

Total time for all requests: 1021.49 ms


In [115]:
print(f'Average time per request: {(elapsed_time/len(entities)):.2f} ms')

Average time per request: 1.02 ms


In [116]:
timings = [feature[2] for feature in features]

In [117]:
print(f'Average time per request: {np.mean(timings):.2f} ms')

Average time per request: 25.37 ms


In [118]:
print(f'Range of times across all requests: {np.min(timings):.2f} ms to {np.max(timings):.2f} ms')

Range of times across all requests: 11.59 ms to 59.60 ms


In [119]:
print(f'The 99th percentile of the requests: {np.percentile(timings, 99):.2f} ms')

The 99th percentile of the requests: 50.63 ms


#### Constant Load: 100 concurrent requests, for 1000 requests

In [120]:
constant_load = 100

start_time = time.perf_counter()
features = await fs_load(entities, limit_concur_requests = constant_load)
elapsed_time = 1000 * (time.perf_counter() - start_time)

In [121]:
print(f'Total time for all requests: {elapsed_time:.2f} ms')

Total time for all requests: 1046.88 ms


In [122]:
print(f'Average time per request: {(elapsed_time/len(entities)):.2f} ms')

Average time per request: 1.05 ms


In [123]:
timings = [feature[2] for feature in features]

In [124]:
print(f'Average time per request: {np.mean(timings):.2f} ms')

Average time per request: 57.21 ms


In [125]:
print(f'Range of times across all requests: {np.min(timings):.2f} ms to {np.max(timings):.2f} ms')

Range of times across all requests: 12.00 ms to 107.33 ms


In [126]:
print(f'The 99th percentile of the requests: {np.percentile(timings, 99):.2f} ms')

The 99th percentile of the requests: 91.30 ms


### Managed Concurrency: rate per second evenly spaced

The previous section manages a constant load of a set size with `limit_concur_requests`.  This section manages a load at a set rate with `limit_rps` (requests per second).  It still handles errors with retries and limits the total number of concurrent requests.

In [127]:
async def fs_load2(instances, limit_concur_requests = 1000, limit_rps = 100):
    limit = asyncio.Semaphore(limit_concur_requests)
    results = [None] * len(instances)
    
    # make requests
    async def make_request(p):
        # wait for this request's scheduled start: p / limit_rps seconds after launch
        await asyncio.sleep((p // limit_rps) + (p % limit_rps)/limit_rps)
        async with limit:
            if limit.locked():
                await asyncio.sleep(.01)
            ############## ERROR HANDLING ##############
            fail_count = 0
            features = None  # stays None if every retry fails (avoids UnboundLocalError below)
            start_time = time.perf_counter()
            while fail_count <= 20:
                try:
                    features = dict(
                        (await online_serve_async_client.fetch_feature_values(
                            request = aiplatform.gapic.FetchFeatureValuesRequest(
                                feature_view = bq_view.name,
                                data_key = aiplatform.gapic.FeatureViewDataKey(key = instances[p]),
                                data_format = aiplatform.gapic.FeatureViewDataFormat.PROTO_STRUCT # KEY_VALUE, PROTO_STRUCT
                            )
                        )).proto_struct
                    )
                    if fail_count > 0:
                        print(f'Item {p} succeeded after fail count = {fail_count}')
                    break
                except Exception:
                    fail_count += 1
                    print(f'Item {p} failed: current fail count = {fail_count}')
                    # optional exponential backoff (use ** for powers; ^ is bitwise XOR in Python):
                    #await asyncio.sleep(2 ** (min(fail_count, 6) - 1))
            elapsed_time = 1000 * (time.perf_counter() - start_time)
            ############################################
        results[p] = (instances[p], features, elapsed_time, fail_count)
        
    # manage tasks
    tasks = [asyncio.create_task(make_request(p)) for p in range(len(instances))]
    responses = await asyncio.gather(*tasks)
    
    return results
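Both loaders retry immediately after a failure; their backoff line is left commented out. Below is a minimal sketch of retrying with capped exponential backoff plus random "full jitter". It mirrors the commented intent of doubling the delay up to a cap of 2**5 = 32 seconds. `backoff_delay` and `fetch_with_retry` are hypothetical helpers, and `fetch` is any coroutine function taking a key.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, TypeVar

T = TypeVar("T")


def backoff_delay(fail_count: int, base_s: float = 1.0, cap_s: float = 32.0,
                  jitter: bool = True) -> float:
    """Delay before the next attempt: base * 2**(fail_count - 1), capped at cap_s."""
    delay = min(cap_s, base_s * 2 ** min(fail_count - 1, 30))
    # full jitter spreads retries out so many clients do not retry in lockstep
    return random.uniform(0, delay) if jitter else delay


async def fetch_with_retry(fetch: Callable[[str], Awaitable[T]], key: str,
                           max_retries: int = 20, base_s: float = 1.0,
                           cap_s: float = 32.0) -> Tuple[T, int]:
    """Return (result, fail_count); re-raise the last error after max_retries failures."""
    fail_count = 0
    while True:
        try:
            return await fetch(key), fail_count
        except Exception:
            fail_count += 1
            if fail_count > max_retries:
                raise
            await asyncio.sleep(backoff_delay(fail_count, base_s, cap_s))


# e.g. inside make_request: features, fail_count = await fetch_with_retry(fetch, instances[p])
```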

#### Rate Load: 100 requests per second, for 1000 requests

In [128]:
n_per_second = 100

start_time = time.perf_counter()
features = await fs_load2(entities, limit_rps = n_per_second)
elapsed_time = 1000 * (time.perf_counter() - start_time)

In [129]:
print(f'Expected time for all requests: {len(entities)/n_per_second * 1000} ms')

Expected time for all requests: 10000.0 ms
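In `fs_load2`, request `p` sleeps `(p // limit_rps) + (p % limit_rps)/limit_rps` seconds before starting. That simplifies to `p / limit_rps`, so requests are evenly spaced `1 / limit_rps` seconds apart. The last of N requests therefore starts at `(N - 1) / limit_rps`. The total time is roughly `N / limit_rps` plus one request's latency, which matches the expected and measured totals. Here is a quick stdlib check of that arithmetic; the function name is hypothetical.

```python
import math


def start_offset_s(p: int, limit_rps: int) -> float:
    """Scheduled start (seconds after launch) of request p, as computed in fs_load2."""
    return (p // limit_rps) + (p % limit_rps) / limit_rps


n_requests, rps = 1000, 100
offsets = [start_offset_s(p, rps) for p in range(n_requests)]
# identical to p / rps, up to floating-point rounding
assert all(math.isclose(o, p / rps, abs_tol=1e-12) for p, o in enumerate(offsets))
print(f"last request starts at {offsets[-1]:.2f} s; expected total ~ {n_requests / rps:.2f} s")
# last request starts at 9.99 s; expected total ~ 10.00 s
```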


In [130]:
print(f'Total time for all requests: {elapsed_time:.2f} ms')

Total time for all requests: 10021.70 ms


In [131]:
print(f'Average time per request: {(elapsed_time/len(entities)):.2f} ms')

Average time per request: 10.02 ms


In [132]:
timings = [feature[2] for feature in features]

In [133]:
print(f'Average time per request: {np.mean(timings):.2f} ms')

Average time per request: 8.06 ms


In [134]:
print(f'Range of times across all requests: {np.min(timings):.2f} ms to {np.max(timings):.2f} ms')

Range of times across all requests: 4.82 ms to 16.30 ms


In [135]:
print(f'The 99th percentile of the requests: {np.percentile(timings, 99):.2f} ms')

The 99th percentile of the requests: 12.71 ms


#### Rate Load: 150 requests per second, for 1000 requests

In [136]:
n_per_second = 150

start_time = time.perf_counter()
features = await fs_load2(entities, limit_rps = n_per_second)
elapsed_time = 1000 * (time.perf_counter() - start_time)

In [137]:
print(f'Expected time for all requests: {(len(entities)/n_per_second * 1000):.2f} ms')

Expected time for all requests: 6666.67 ms


In [138]:
print(f'Total time for all requests: {elapsed_time:.2f} ms')

Total time for all requests: 6690.75 ms


In [139]:
print(f'Average time per request: {(elapsed_time/len(entities)):.2f} ms')

Average time per request: 6.69 ms


In [140]:
timings = [feature[2] for feature in features]

In [141]:
print(f'Average time per request: {np.mean(timings):.2f} ms')

Average time per request: 13.07 ms


In [142]:
print(f'Range of times across all requests: {np.min(timings):.2f} ms to {np.max(timings):.2f} ms')

Range of times across all requests: 8.63 ms to 86.28 ms


In [143]:
print(f'The 99th percentile of the requests: {np.percentile(timings, 99):.2f} ms')

The 99th percentile of the requests: 28.08 ms


#### Rate Load: 200 requests per second, for 1000 requests

In [144]:
n_per_second = 200

start_time = time.perf_counter()
features = await fs_load2(entities, limit_rps = n_per_second)
elapsed_time = 1000 * (time.perf_counter() - start_time)

In [145]:
print(f'Expected time for all requests: {(len(entities)/n_per_second * 1000):.2f} ms')

Expected time for all requests: 5000.00 ms


In [146]:
print(f'Total time for all requests: {elapsed_time:.2f} ms')

Total time for all requests: 5022.63 ms


In [147]:
print(f'Average time per request: {(elapsed_time/len(entities)):.2f} ms')

Average time per request: 5.02 ms


In [148]:
timings = [feature[2] for feature in features]

In [149]:
print(f'Average latency per request: {np.mean(timings):.2f} ms')

Average latency per request: 8.13 ms


In [150]:
print(f'Range of times across all requests: {np.min(timings):.2f} ms to {np.max(timings):.2f} ms')

Range of times across all requests: 4.34 ms to 19.45 ms


In [151]:
print(f'The 99th percentile of the requests: {np.percentile(timings, 99):.2f} ms')

The 99th percentile of the requests: 12.61 ms


#### Rate Load: 250 requests per second, for 1000 requests

In [152]:
n_per_second = 250

start_time = time.perf_counter()
features = await fs_load2(entities, limit_rps = n_per_second)
elapsed_time = 1000 * (time.perf_counter() - start_time)

In [153]:
print(f'Expected time for all requests: {(len(entities)/n_per_second * 1000):.2f} ms')

Expected time for all requests: 4000.00 ms


In [154]:
print(f'Total time for all requests: {elapsed_time:.2f} ms')

Total time for all requests: 4019.68 ms


In [155]:
print(f'Average time per request: {(elapsed_time/len(entities)):.2f} ms')

Average time per request: 4.02 ms


In [156]:
timings = [feature[2] for feature in features]

In [157]:
print(f'Average latency per request: {np.mean(timings):.2f} ms')

Average latency per request: 7.97 ms


In [158]:
print(f'Range of times across all requests: {np.min(timings):.2f} ms to {np.max(timings):.2f} ms')

Range of times across all requests: 4.46 ms to 27.09 ms


In [159]:
print(f'The 99th percentile of the requests: {np.percentile(timings, 99):.2f} ms')

The 99th percentile of the requests: 12.29 ms
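The four runs are easier to compare side by side. The sketch below tabulates the figures printed by the cells above. The first run issued 1,000 requests with an expected time of 10,000 ms, i.e. 100 requests per second. From these figures it derives the achieved request rate and the time spent beyond the ideal schedule:

```python
# Figures printed by the notebook cells above (ms); each run issued 1000 requests.
runs = [
    # rps, total_ms, mean_ms, p99_ms, max_ms
    (100, 10021.70, 8.06, 12.71, 16.30),
    (150, 6690.75, 13.07, 28.08, 86.28),
    (200, 5022.63, 8.13, 12.61, 19.45),
    (250, 4019.68, 7.97, 12.29, 27.09),
]
N_REQUESTS = 1000

print(f"{'rps':>4} {'achieved':>9} {'overhead':>9} {'mean':>6} {'p99':>6} {'max':>6}")
rows = []
for rps, total_ms, mean_ms, p99_ms, max_ms in runs:
    achieved = N_REQUESTS / (total_ms / 1000)       # requests actually issued per second
    overhead = total_ms - N_REQUESTS / rps * 1000    # ms beyond the ideal schedule
    rows.append((rps, achieved, overhead))
    print(f"{rps:>4} {achieved:>9.1f} {overhead:>9.2f} {mean_ms:>6.2f} {p99_ms:>6.2f} {max_ms:>6.2f}")
```

In these runs every rate finished within about 25 ms of its ideal schedule. The p99 latency stayed between roughly 12 and 13 ms except at 150 requests per second, where the minimum, mean, and p99 were all higher. A single run per rate cannot show whether that shift was transient.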


## Cleanup

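This deletes the Feature Store objects in dependency order: Registry Views, Online Store, Feature, then Feature Group. Each deletion is controlled by a flag that defaults to `False`; set a flag to `True` to delete that object.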

In [160]:
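# delete feature store objects (set a flag to True to delete that object)
# children (feature views, feature) are deleted before their parents

del_registry_views = False
del_online_store = False
del_features = False
del_feature_group = False

# each delete call starts a long-running operation; .result() waits for it to finish
if del_registry_views:
    online_admin_client.delete_feature_view(name = registry_view.name).result()
    online_admin_client.delete_feature_view(name = bq_view.name).result()
if del_online_store:
    # force = True also deletes any feature views still in the online store
    online_admin_client.delete_feature_online_store(name = online_store.name, force = True).result()
if del_features:
    registry_client.delete_feature(name = feature.name).result()
if del_feature_group:
    registry_client.delete_feature_group(name = feature_group.name).result()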
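The cleanup deletes children before parents. Feature views live inside the online store, and a registry-based feature view reads features from the feature group. Below is a sketch that derives a valid deletion order from an assumed dependency graph using Python's `graphlib` (3.9+). The node names are labels for the notebook variables, not API calls, and the edges are assumptions:

```python
from graphlib import TopologicalSorter

# Assumed dependencies: each key can only be deleted after every object in its set.
must_delete_first = {
    "online_store": {"registry_view", "bq_view"},  # views live in the online store
    "feature": {"registry_view"},                  # the registry view reads this feature
    "feature_group": {"feature"},                  # features live in the feature group
}

# static_order() yields each node only after all of its predecessors.
order = list(TopologicalSorter(must_delete_first).static_order())
print(order)
```

Any order this produces is valid under the assumed graph. The notebook's fixed order (views, online store, feature, feature group) is one of them.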