# Creating a Vertex AI Feature Store Using Pandas

### Learning objectives

1. Create Feature Store Resources.
2. Ingest Feature Values into Entity Type from a Pandas DataFrame.
3. Read/Online Serve Entity's Feature Values from Vertex AI Online Feature Store.
4. Batch Serve Feature Values from Vertex AI Feature Store.
5. Read the Updated Feature Values.


## Introduction

This notebook introduces Pandas support for Feature Store in the Vertex AI SDK. For prerequisites and an introduction to the Vertex AI SDK and native Feature Store support, see this [Colab notebook](https://colab.sandbox.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/feature_store/sdk-feature-store.ipynb).

### Dataset

This notebook uses a movie recommendation dataset, the same example used throughout the other Feature Store notebooks. The original task is to train a model that predicts whether a user will watch a movie, and to serve that model online.


## Before you begin

In [1]:
import os

#### Set your project ID

**If you don't know your project ID**, you may be able to get your project ID using `gcloud`.
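
One way to look up the project ID from Python is to shell out to `gcloud config get-value project`. The following is a minimal sketch, assuming the `gcloud` CLI is installed and already configured; `detect_project_id` is a hypothetical helper, not part of any Google library.

```python
import shutil
import subprocess


def detect_project_id():
    """Return the active gcloud project ID, or None if it cannot be found."""
    # Hypothetical helper: give up quietly when the gcloud CLI is missing.
    if shutil.which("gcloud") is None:
        return None
    try:
        result = subprocess.run(
            ["gcloud", "config", "get-value", "project"],
            capture_output=True,
            text=True,
            check=False,
            timeout=30,
        )
    except (OSError, subprocess.TimeoutExpired):
        return None
    project = result.stdout.strip()
    # Treat an empty answer or the literal "(unset)" as "no project configured".
    if result.returncode != 0 or not project or project == "(unset)":
        return None
    return project


detected = detect_project_id()
print("Detected project ID:", detected)
```

If the helper returns `None`, set `PROJECT_ID` by hand as shown in the next cell.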

In [2]:
PROJECT_ID = "fifth-sprite-402605"  # Replace with your project ID

In [3]:
! gcloud config set project $PROJECT_ID

Updated property [core/project].


In [4]:
REGION = "us-central1"

### Import libraries and define constants

In [5]:
import datetime

import pandas as pd
from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION)

## Create Feature Store Resources

### Create Feature Store

The method to create a Feature Store returns a
[long-running operation](https://google.aip.dev/151) (LRO). An LRO starts an asynchronous job. LROs are returned for other API
methods too, such as updating or deleting a featurestore. Running the code cell creates a featurestore and prints the process logs.
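
The shape of an LRO can be pictured with a plain Python future: the call that starts the job returns a handle right away, and the caller blocks only when it asks for the result. The sketch below is an analogy built on `concurrent.futures`, not the Vertex AI implementation; `fake_create_featurestore` is a hypothetical stand-in for the server-side work.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_create_featurestore(featurestore_id):
    """Hypothetical stand-in for the server-side job behind an LRO."""
    time.sleep(0.1)  # pretend the backend needs a moment to provision
    return f"featurestores/{featurestore_id}"


with ThreadPoolExecutor(max_workers=1) as executor:
    # Submitting returns a handle immediately, much like an LRO.
    operation = executor.submit(fake_create_featurestore, "movie_predictions")
    print("Handle returned; job still running?", not operation.done())
    # Asking for the result blocks until the job has finished.
    resource_name = operation.result(timeout=5)

print("Created:", resource_name)
```

In this notebook the SDK call waits for the operation to finish, which is why the cell output below shows the complete creation log.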

In [6]:
movie_predictions_feature_store = aiplatform.Featurestore.create(
    featurestore_id="movie_predictions",
    online_store_fixed_node_count=1,
)

Creating Featurestore
Create Featurestore backing LRO: projects/117917517031/locations/us-central1/featurestores/movie_predictions/operations/4227811805835034624
Featurestore created. Resource name: projects/117917517031/locations/us-central1/featurestores/movie_predictions
To use this Featurestore in another session:
featurestore = aiplatform.Featurestore('projects/117917517031/locations/us-central1/featurestores/movie_predictions')


### Create Entity Types

Entity types can be created within the Featurestore class. Below, you create the `Users` entity type and `Movies` entity type. Process logs are printed in the output for each cell.

In [7]:
#1
# Create the Users entity type
users_entity_type = movie_predictions_feature_store.create_entity_type(
    entity_type_id="users",
    description="Users entity",
)

Creating EntityType
Create EntityType backing LRO: projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/users/operations/3315832881292509184
EntityType created. Resource name: projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/users
To use this EntityType in another session:
entity_type = aiplatform.EntityType('projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/users')


In [8]:
#2
# Create the Movies entity type
movies_entity_type = movie_predictions_feature_store.create_entity_type(
    entity_type_id="movies",
    description="Movies entity",
)

Creating EntityType
Create EntityType backing LRO: projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/movies/operations/7321784749838565376
EntityType created. Resource name: projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/movies
To use this EntityType in another session:
entity_type = aiplatform.EntityType('projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/movies')


### Create Features
Features can be created within each entity type. Add defining features to the `Users` entity type and `Movies` entity type by using the following methods.

In [9]:
users_feature_age = users_entity_type.create_feature(
    feature_id="age",
    value_type="INT64",
    description="User age",
)

users_feature_gender = users_entity_type.create_feature(
    feature_id="gender",
    value_type="STRING",
    description="User gender",
)

users_feature_liked_genres = users_entity_type.create_feature(
    feature_id="liked_genres",
    value_type="STRING_ARRAY",
    description="An array of genres this user liked",
)

Creating Feature
Create Feature backing LRO: projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/users/features/age/operations/4684364218059718656
Feature created. Resource name: projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/users/features/age
To use this Feature in another session:
feature = aiplatform.Feature('projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/users/features/age')
Creating Feature
Create Feature backing LRO: projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/users/features/gender/operations/6452590021756059648
Feature created. Resource name: projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/users/features/gender
To use this Feature in another session:
feature = aiplatform.Feature('projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/users/feature

In [10]:
movies_feature_configs = {
    "title": {
        "value_type": "STRING",
        "description": "The title of the movie",
    },
    "genres": {
        "value_type": "STRING",
        "description": "The genre of the movie",
    },
    "average_rating": {
        "value_type": "DOUBLE",
        "description": "The average rating for the movie, range is [1.0-5.0]",
    },
}

In [11]:
movie_features = movies_entity_type.batch_create_features(
    feature_configs=movies_feature_configs,
)

Batch creating features EntityType entityType: projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/movies
Batch create Features EntityType entityType backing LRO: projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/movies/operations/8171839179504746496
EntityType entityType Batch created features. Resource name: projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/movies
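
A config dictionary like `movies_feature_configs` is easy to mistype, so it can be worth checking before it is sent to the service. The following is a minimal sketch; `find_config_problems` is a hypothetical helper, and the set of value types is listed as an assumption that should be checked against the Vertex AI documentation.

```python
# Value types assumed to be accepted for Feature Store features; verify this
# list against the Vertex AI documentation before relying on it.
ALLOWED_VALUE_TYPES = {
    "BOOL", "BOOL_ARRAY", "DOUBLE", "DOUBLE_ARRAY",
    "INT64", "INT64_ARRAY", "STRING", "STRING_ARRAY", "BYTES",
}


def find_config_problems(feature_configs):
    """Return a list of human-readable problems found in a feature config dict."""
    problems = []
    for feature_id, config in feature_configs.items():
        value_type = config.get("value_type")
        if value_type is None:
            problems.append(f"{feature_id}: missing 'value_type'")
        elif value_type not in ALLOWED_VALUE_TYPES:
            problems.append(f"{feature_id}: unknown value_type {value_type!r}")
    return problems


# Made-up configs for illustration; the second entry is deliberately wrong.
example_configs = {
    "title": {"value_type": "STRING", "description": "The title of the movie"},
    "average_rating": {"value_type": "FLOAT"},
}
problems = find_config_problems(example_configs)
print(problems)
```

An empty list means every feature declares a recognized `value_type`.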


## Ingest Feature Values into Entity Type from a Pandas DataFrame

You need to ingest feature values into the entity type that contains the features so that you can later `read` (online) or `batch serve` (offline) them. In this step, you learn how to ingest feature values from a Pandas DataFrame into an entity type. You can also import feature values from BigQuery or Google Cloud Storage.
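
Before calling `ingest_from_df`, it can help to confirm that the DataFrame has the entity ID column, the feature time column, and one column per feature ID. The following is a minimal sketch with a hypothetical helper, `missing_ingest_columns`, and a made-up two-row DataFrame.

```python
import pandas as pd


def missing_ingest_columns(df, feature_ids, entity_id_field, feature_time_field):
    """Return the required column names that are absent from df, in order."""
    required = [entity_id_field, feature_time_field, *feature_ids]
    return [name for name in required if name not in df.columns]


# Made-up sample rows, only for illustration.
sample_df = pd.DataFrame(
    {
        "user_id": ["alice", "bob"],
        "age": [55, 35],
        "update_time": pd.to_datetime(["2021-08-20", "2021-08-20"]),
    }
)

missing = missing_ingest_columns(
    sample_df,
    feature_ids=["age", "gender", "liked_genres"],
    entity_id_field="user_id",
    feature_time_field="update_time",
)
print("Missing columns:", missing)
```

Here the check reports the `gender` and `liked_genres` columns, which the sample DataFrame leaves out on purpose.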


#### Get data from source files

In [12]:
GCS_USERS_AVRO_URI = (
    "gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/users.avro"
)
GCS_MOVIES_AVRO_URI = (
    "gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/movies.avro"
)

USERS_AVRO_FN = "users.avro"
MOVIES_AVRO_FN = "movies.avro"

In [13]:
! gsutil cp $GCS_USERS_AVRO_URI $USERS_AVRO_FN
! gsutil cp $GCS_MOVIES_AVRO_URI $MOVIES_AVRO_FN

Copying gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/users.avro...
/ [1 files][  637.0 B/  637.0 B]                                                
Operation completed over 1 objects/637.0 B.                                      
Copying gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/movies.avro...
/ [1 files][  599.0 B/  599.0 B]                                                
Operation completed over 1 objects/599.0 B.                                      


#### Load Avro Files into Pandas DataFrames

In [15]:
!pip install avro

Collecting avro
  Downloading avro-1.11.3.tar.gz (90 kB)
  Installing build dependencies ... done
  Getting requirements to build wheel ... done
  Preparing metadata (pyproject.toml) ... done
Building wheels for collected packages: avro
  Building wheel for avro (pyproject.toml) ... done
  Created wheel for avro: filename=avro-1.11.3-py2.py3-none-any.whl size=123912 sha256=447ed13ad2b15d4402eeea633104f0f55ae46c797f5c83e244f288e49c90b1b9
  Stored in directory: /home/gcpuser/.cache/pip/wheels/1d/f6/41/0e0399396af07060e64d4e32c8bd259b48b98a4a114df31294
Successfully built avro
Installing collected packages: avro
Successfully installed avro-1.11.3


In [16]:
from avro.datafile import DataFileReader
from avro.io import DatumReader


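class AvroReader:
    """Loads every record of a local Avro file into a Pandas DataFrame."""

    def __init__(self, data_file):
        self.data_file = data_file

    def to_dataframe(self):
        # Open the file only while reading so the handle is always closed.
        with open(self.data_file, "rb") as avro_file:
            records = list(DataFileReader(avro_file, DatumReader()))
        return pd.DataFrame.from_records(data=records)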

In [17]:
users_avro_reader = AvroReader(data_file=USERS_AVRO_FN)
users_source_df = users_avro_reader.to_dataframe()
print(users_source_df)

   user_id   age  gender               liked_genres  \
0     dave   NaN    None  [Children's, Documentary]   
1    alice   NaN    None            [Drama, Comedy]   
2  charlie   NaN    None        [Sci-Fi, Animation]   
3      bob   NaN    None            [Action, Crime]   
4      eve  26.0    None                   [Horror]   
5      bob  35.0    Male        [Action, Adventure]   
6    alice  55.0  Female                    [Drama]   

                       update_time  
0 2021-08-20 20:58:22.261581+00:00  
1 2021-08-20 20:58:22.261581+00:00  
2 2021-08-20 20:58:22.261581+00:00  
3 2021-08-20 20:58:22.261581+00:00  
4 2021-08-20 20:58:22.261581+00:00  
5 2021-08-20 20:58:22.261581+00:00  
6 2021-08-20 20:58:22.261581+00:00  


In [18]:
movies_avro_reader = AvroReader(data_file=MOVIES_AVRO_FN)
movies_source_df = movies_avro_reader.to_dataframe()
print(movies_source_df)

   movie_id  average_rating                     title   genres  \
0  movie_02             4.2               The Shining   Horror   
1  movie_04             4.6           The Dark Knight   Action   
2  movie_03             4.5           Cinema Paradiso  Romance   
3  movie_01             4.9  The Shawshank Redemption    Drama   

                       update_time  
0 2021-08-20 20:44:11.094375+00:00  
1 2021-08-20 20:44:11.094375+00:00  
2 2021-08-20 20:44:11.094375+00:00  
3 2021-08-20 20:44:11.094375+00:00  


#### Ingest Feature Values into _Users_ Entity Type

In [19]:
#3
# Ingest Feature Values into Users Entity Type
users_entity_type.ingest_from_df(
    feature_ids=["age", "gender", "liked_genres"],
    feature_time="update_time",
    df_source=users_source_df,
    entity_id_field="user_id",
)

Received datetime-like column in the dataframe. Please note that the column could be interpreted differently in BigQuery depending on which major version you are using. For more information, please reference the BigQuery v3 release notes here: https://github.com/googleapis/python-bigquery/releases/tag/v3.0.0
Importing EntityType feature values: projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/users
Import EntityType feature values backing LRO: projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/users/operations/5997726459391639552
EntityType feature values imported. Resource name: projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/users


<google.cloud.aiplatform.featurestore.entity_type.EntityType object at 0x7fd6415a1ed0> 
resource name: projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/users

#### Ingest Feature Values into _Movies_ Entity Type

In [None]:
#4
# Ingest Feature Values into Movies Entity Type
movies_entity_type.ingest_from_df(
    feature_ids=["average_rating", "title", "genres"],
    feature_time="update_time",
    df_source=movies_source_df,
    entity_id_field="movie_id",
)

Received datetime-like column in the dataframe. Please note that the column could be interpreted differently in BigQuery depending on which major version you are using. For more information, please reference the BigQuery v3 release notes here: https://github.com/googleapis/python-bigquery/releases/tag/v3.0.0
Importing EntityType feature values: projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/movies
Import EntityType feature values backing LRO: projects/117917517031/locations/us-central1/featurestores/movie_predictions/entityTypes/movies/operations/2880109617344413696


## Read/Online Serve Entity's Feature Values from Vertex AI Online Feature Store

Feature Store allows [online serving](https://cloud.google.com/vertex-ai/docs/featurestore/serving-online)
which lets you read feature values for small batches of entities. It works well when you want to read values of selected features from an entity or multiple entities in an entity type.

In [None]:
users_read_df = users_entity_type.read(
    entity_ids=["dave", "alice", "charlie", "bob", "eve"],
)
print(users_read_df)

In [None]:
movies_read_df = movies_entity_type.read(
    entity_ids=["movie_01", "movie_02", "movie_03", "movie_04"],
    feature_ids=["title", "genres", "average_rating"],
)
print(movies_read_df)

## Batch Serve Feature Values from Vertex AI Feature Store

Batch serving fetches a large batch of feature values at high throughput and is typically used to train a model or run batch prediction. In this section, you learn how to prepare training examples by using the Feature Store's batch serve function.
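
Batch serving is driven by a DataFrame of read instances, one row per training example. The sketch below builds such a DataFrame by hand. It assumes the layout expected by `batch_serve_to_df`: one column per entity type ID (here `users` and `movies`) plus a `timestamp` column. The rows themselves are made up.

```python
import pandas as pd

# Hypothetical read instances: the column names follow the entity type IDs
# used in this notebook; the entity IDs and timestamps are invented.
example_read_instances_df = pd.DataFrame(
    {
        "users": ["alice", "bob"],
        "movies": ["movie_01", "movie_04"],
        "timestamp": ["2021-08-20 20:44:11", "2021-08-21 09:00:00"],
    }
)

# Parse the timestamp strings into a datetime64 column.
example_read_instances_df["timestamp"] = pd.to_datetime(
    example_read_instances_df["timestamp"]
)
print(example_read_instances_df.dtypes)
```

Each row asks for the feature values of one user and one movie as they stood at that row's timestamp.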

#### Read instances from source file

In [None]:
GCS_READ_INSTANCES_CSV_URI = "gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/movie_prediction.csv"
READ_INSTANCES_CSV_FN = "data.csv"

In [None]:
! gsutil cp $GCS_READ_INSTANCES_CSV_URI $READ_INSTANCES_CSV_FN

#### Load CSV file into a Pandas DataFrame

In [None]:
#5
read_instances_df = pd.read_csv(READ_INSTANCES_CSV_FN)
print(read_instances_df)

#### Change the Dtype of `Timestamp` to `Datetime64`

In [None]:
print("before: ", read_instances_df["timestamp"].dtype)
read_instances_df = read_instances_df.astype({"timestamp": "datetime64[ns]"})
print("after:  ", read_instances_df["timestamp"].dtype)

#### Batch Serve Feature Values from Movie Predictions Feature Store

In [None]:
movie_predictions_df = movie_predictions_feature_store.batch_serve_to_df(
    serving_feature_ids={
        "users": ["age", "gender", "liked_genres"],
        "movies": ["title", "average_rating", "genres"],
    },
    read_instances_df=read_instances_df,
)
movie_predictions_df

## Read the Updated Feature Values

#### Feature Values from last ingestion
Recall that reading from the entity type returns the feature values from the last ingestion.

In [None]:
print(movies_read_df)

#### Ingest updated Feature Values

In [None]:
update_movies_df = pd.DataFrame(
    data=[["movie_03", 4.3], ["movie_04", 4.8]],
    columns=["movie_id", "average_rating"],
)
print(update_movies_df)

In [None]:
movies_entity_type.ingest_from_df(
    feature_ids=["average_rating"],
    feature_time=datetime.datetime.now(),
    df_source=update_movies_df,
    entity_id_field="movie_id",
)

#### Latest Feature Values
Reading from the entity type now returns the updated feature values from the latest ingestion.

In [None]:
update_movies_read_df = movies_entity_type.read(
    entity_ids=["movie_01", "movie_02", "movie_03", "movie_04"],
    feature_ids=["title", "genres", "average_rating"],
)
print(update_movies_read_df)

## Point-in-Time Correctness

#### Missing data
Recall that the batch serve result from the last ingestion has some missing data.

In [None]:
print(movie_predictions_df)
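
Batch serving looks up feature values as of each read instance's timestamp, so a row whose timestamp predates every ingested value comes back empty. The following is a minimal sketch of that lookup rule in plain Python; `value_as_of` and the sample history are hypothetical and only illustrate the idea, not how the service is implemented.

```python
from datetime import datetime

# Hypothetical feature history for one entity: (feature_time, average_rating).
history = [
    (datetime(2021, 8, 20, 20, 44), 4.6),
]


def value_as_of(history, read_time):
    """Return the latest value whose feature_time is <= read_time, else None."""
    eligible = [(ts, value) for ts, value in history if ts <= read_time]
    if not eligible:
        return None
    return max(eligible, key=lambda item: item[0])[1]


# A read instance dated before the only ingested value finds nothing.
read_time = datetime(2021, 1, 1)
before_backfill = value_as_of(history, read_time)

# Backfilling an older value makes the same lookup succeed.
history.append((datetime(2020, 2, 13, 9, 35, 15), 4.2))
after_backfill = value_as_of(history, read_time)

print(before_backfill, after_backfill)
```

This is why ingesting older, backfilled rows can fill gaps in a batch serve result without changing the latest values.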

#### Backfill/Correct point-in-time data

In [None]:
backfill_users_df = pd.DataFrame(
    data=[["bob", 34, "Male", ["Drama"], "2020-02-13 09:35:15"]],
    columns=["user_id", "age", "gender", "liked_genres", "update_time"],
)
backfill_users_df = backfill_users_df.astype({"update_time": "datetime64[ns]"})
print(backfill_users_df)

In [None]:
backfill_movies_df = pd.DataFrame(
    data=[["movie_04", 4.2, "The Dark Knight", "Action", "2020-02-13 09:35:15"]],
    columns=["movie_id", "average_rating", "title", "genres", "update_time"],
)
backfill_movies_df = backfill_movies_df.astype({"update_time": "datetime64[ns]"})
print(backfill_movies_df)

#### Ingest backfilled/corrected point-in-time data from dataframe

In [None]:
users_entity_type.ingest_from_df(
    feature_ids=["age", "gender", "liked_genres"],
    feature_time="update_time",
    df_source=backfill_users_df,
    entity_id_field="user_id",
)

In [None]:
movies_entity_type.ingest_from_df(
    feature_ids=["average_rating", "title", "genres"],
    feature_time="update_time",
    df_source=backfill_movies_df,
    entity_id_field="movie_id",
)

#### Latest ingestion with imputed missing data
After the backfilled/corrected data is ingested, the batch serve result has less missing data.

In [None]:
backfill_movie_predictions_df = movie_predictions_feature_store.batch_serve_to_df(
    serving_feature_ids={
        "users": ["age", "gender", "liked_genres"],
        "movies": ["title", "average_rating", "genres"],
    },
    read_instances_df=read_instances_df,
)
print(backfill_movie_predictions_df)

## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

You can also keep the project but delete the featurestore by running the code below:

In [35]:
movie_predictions_feature_store.delete(force=True)

Deleting Featurestore : projects/816732343711/locations/us-central1/featurestores/movie_predictions
Delete Featurestore  backing LRO: projects/816732343711/locations/us-central1/operations/1667699998189420544
Featurestore deleted. . Resource name: projects/816732343711/locations/us-central1/featurestores/movie_predictions
