# Feature Store: Streaming ingestion SDK

**NOTE**: This notebook has been tested in the following environment:

* Python version = 3.9

## Overview

This notebook demonstrates how to use Vertex AI Feature Store's streaming ingestion at the SDK layer.

Learn more about [Vertex AI Feature Store](https://cloud.google.com/vertex-ai/docs/featurestore).

### Objective

In this tutorial, you learn how to ingest features from a pandas `DataFrame` into your Vertex AI Feature Store using the `write_feature_values` method from the Vertex AI SDK.

This tutorial uses the following Google Cloud ML services and resources:

- Vertex AI Feature Store


The steps performed include:

- Create a `Feature Store`
- Create a new `Entity Type` in your `Feature Store`
- Create `Features` for the `Entity Type`
- Ingest feature values from a pandas `DataFrame` into the `Entity Type`

### Dataset

The dataset used for this notebook is the penguins dataset from [BigQuery public datasets](https://cloud.google.com/bigquery/public-data). This dataset has the following features: `species`, `island`, `culmen_length_mm`, `culmen_depth_mm`, `flipper_length_mm`, `body_mass_g`, and `sex`.

### Costs

This tutorial uses billable components of Google Cloud:

* Vertex AI

Learn about [Vertex AI
pricing](https://cloud.google.com/vertex-ai/pricing) and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.


## Installation

Install the following packages required to execute this notebook.

In [17]:
# Install the packages
! pip3 install --upgrade google-cloud-aiplatform\
                         google-cloud-bigquery\
                         numpy\
                         pandas\
                         db-dtypes\
                         pyarrow -q\
                         --user


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.2[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


**Note:** You can ignore the dependency and incompatibility errors.

### Restart the kernel

In [18]:
# Automatically restart kernel after installs so that your environment can access the new packages
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)

{'status': 'ok', 'restart': True}

### Set up Google Cloud project

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.
2. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project).
3. [Enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com).
4. If you are running this notebook locally, you need to install the [Cloud SDK](https://cloud.google.com/sdk).

#### Set your project ID

**If you don't know your project ID**, try the following:
* Run `gcloud config list`.
* Run `gcloud projects list`.
* See the support page: [Locate the project ID](https://support.google.com/googleapi/answer/7014113)

In [19]:
PROJECT_ID = "qwiklabs-gcp-00-2b7660c04d87"  # Replace with your project-id

# Set the project id
! gcloud config set project {PROJECT_ID}

Updated property [core/project].


#### Region

You can also change the `REGION` variable used by Vertex AI. Learn more about [Vertex AI regions](https://cloud.google.com/vertex-ai/docs/general/locations).

In [1]:
REGION = "us-east4"  # Replace with your region

### UUID

If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users' resources, you create a UUID for each session and append it to the names of the resources you create in this tutorial.

In [2]:
import random
import string

# Generate a random lowercase alphanumeric ID of the given length (default: 8)
def generate_uuid(length: int = 8) -> str:
    return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))

UUID = generate_uuid()

### Import libraries

In [3]:
import numpy as np
import pandas as pd
from google.cloud import aiplatform, bigquery



### Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project.

In [4]:
aiplatform.init(project=PROJECT_ID, location=REGION)

## Download and prepare the data

In [5]:
def download_bq_table(bq_table_uri: str) -> pd.DataFrame:
    # Remove bq:// prefix if present
    prefix = "bq://"
    if bq_table_uri.startswith(prefix):
        bq_table_uri = bq_table_uri[len(prefix) :]

    table = bigquery.TableReference.from_string(bq_table_uri)

    # Create a BigQuery client
    bqclient = bigquery.Client(project=PROJECT_ID)

    # Download the table rows
    rows = bqclient.list_rows(table)
    return rows.to_dataframe()
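`download_bq_table` strips the optional `bq://` prefix before passing the string to `bigquery.TableReference.from_string`, which expects `project.dataset.table`. The stdlib-only sketch below uses a hypothetical helper, `split_bq_uri`, to show which three parts the penguins source resolves to; it does not call BigQuery.

```python
from typing import Tuple


def split_bq_uri(bq_table_uri: str) -> Tuple[str, str, str]:
    """Hypothetical helper: split '[bq://]project.dataset.table' into its parts."""
    prefix = "bq://"
    if bq_table_uri.startswith(prefix):
        bq_table_uri = bq_table_uri[len(prefix):]
    # Split from the right so only the last two dots separate dataset and table.
    project, dataset, table = bq_table_uri.rsplit(".", 2)
    return project, dataset, table


print(split_bq_uri("bq://bigquery-public-data.ml_datasets.penguins"))
# → ('bigquery-public-data', 'ml_datasets', 'penguins')
```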

In [6]:
BQ_SOURCE = "bq://bigquery-public-data.ml_datasets.penguins"

# Download penguins BigQuery table
penguins_df = download_bq_table(BQ_SOURCE)

### Prepare the data

Feature values to be written to the Feature Store can take the form of a list of `WriteFeatureValuesPayload` objects, a Python `dict` of the form

`{entity_id : {feature_id : feature_value}, ...},`

or a pandas `DataFrame`, where the index holds the unique entity ID strings and each column represents a feature. Because this notebook ingests features from a pandas `DataFrame`, convert the index to strings so that it can be used as the entity ID.
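The `dict` form can be built from plain row records. In this minimal sketch, `records_to_instances` is a hypothetical helper and `entity_key` is an assumed convention naming the field that holds the entity ID; every other field is treated as a feature. The ID is converted to a string, mirroring what the notebook does to the `DataFrame` index, and the feature values are copied from the read-back output at the end of this notebook.

```python
from typing import Any, Dict, Iterable, Mapping


def records_to_instances(
    records: Iterable[Mapping[str, Any]], entity_key: str
) -> Dict[str, Dict[str, Any]]:
    """Build {entity_id: {feature_id: feature_value}} from row records.

    `entity_key` (a hypothetical convention for this sketch) names the field
    holding the entity ID; all other fields become features.
    """
    instances: Dict[str, Dict[str, Any]] = {}
    for record in records:
        # Entity IDs are strings, so convert integer row numbers.
        entity_id = str(record[entity_key])
        instances[entity_id] = {k: v for k, v in record.items() if k != entity_key}
    return instances


rows = [
    {"row": 0, "species": "Adelie Penguin (Pygoscelis adeliae)", "body_mass_g": 3475.0},
    {"row": 1, "species": "Adelie Penguin (Pygoscelis adeliae)", "body_mass_g": 4650.0},
]
print(records_to_instances(rows, entity_key="row"))
```

Under these assumptions, the resulting dict has the shape described above and could be passed as `instances` instead of a `DataFrame`.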

In [7]:
# Prepare the data
penguins_df.index = penguins_df.index.map(str)

In [8]:
# Treat the placeholder strings 'NA' and '.' as missing, then drop incomplete rows
NA_VALUES = ["NA", "."]
penguins_df = penguins_df.replace(to_replace=NA_VALUES, value=np.nan).dropna()
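The cell above treats the literal strings `"NA"` and `"."` as missing and, because `dropna()` drops a row when any column is missing, keeps only complete rows. The stdlib-only sketch below applies the same rule to a few made-up records so you can see which rows survive. `is_missing` and `drop_incomplete` are hypothetical helpers, not part of pandas.

```python
import math
from typing import Any, Dict, List

NA_VALUES = {"NA", "."}


def is_missing(value: Any) -> bool:
    """True for None, NaN, or one of the placeholder strings in NA_VALUES."""
    if value is None:
        return True
    if isinstance(value, float) and math.isnan(value):
        return True
    return isinstance(value, str) and value in NA_VALUES


def drop_incomplete(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep only rows with no missing value in any column, like dropna()."""
    return [row for row in rows if not any(is_missing(v) for v in row.values())]


# Made-up rows: one complete, one with a "." placeholder, one with NaN.
rows = [
    {"island": "Dream", "sex": "FEMALE", "body_mass_g": 3475.0},
    {"island": "Dream", "sex": ".", "body_mass_g": 3400.0},
    {"island": "Dream", "sex": "MALE", "body_mass_g": float("nan")},
]
print(len(drop_incomplete(rows)))  # → 1
```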

## Create Feature Store and define schemas

Vertex AI Feature Store organizes resources hierarchically in the following order:

`Featurestore -> EntityType -> Feature`

You must create these resources before you can ingest data into Vertex AI Feature Store.
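Each level of the hierarchy appears as a segment in the resource name. The entity type printed later in this notebook has a name of the form `projects/<project>/locations/<location>/featurestores/<featurestore>/entityTypes/<entity_type>`, where the service reports the numeric project number in the project position. Features add a `features/<feature>` segment, following the REST path `projects.locations.featurestores.entityTypes.features`. The hypothetical helper below only formats strings, and its inputs are placeholders.

```python
from typing import Optional


def featurestore_resource_name(
    project: str,
    location: str,
    featurestore_id: str,
    entity_type_id: Optional[str] = None,
    feature_id: Optional[str] = None,
) -> str:
    """Format a Featurestore, EntityType, or Feature resource name (hypothetical helper)."""
    if feature_id is not None and entity_type_id is None:
        raise ValueError("A feature always lives inside an entity type.")
    name = f"projects/{project}/locations/{location}/featurestores/{featurestore_id}"
    if entity_type_id is not None:
        name += f"/entityTypes/{entity_type_id}"
    if feature_id is not None:
        name += f"/features/{feature_id}"
    return name


# Placeholder IDs, not real resources.
print(featurestore_resource_name(
    "my-project", "us-east4", "penguins_abc12345", "penguin_entity_type_abc12345", "body_mass_g"
))
```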

Learn more about [Vertex AI Feature Store](https://cloud.google.com/vertex-ai/docs/featurestore).

### Create a Feature Store

You create a Feature Store using `aiplatform.Featurestore.create` with the following parameters:

* `featurestore_id (str)`: The ID to use for this Featurestore, which becomes the final component of the Featurestore's resource name. The value must be unique within the project and location.
* `online_store_fixed_node_count`: The fixed number of nodes to provision for online serving.
* `project`: Project to create the Featurestore in. If not set, the project set in `aiplatform.init` is used.
* `location`: Location to create the Featurestore in. If not set, the location set in `aiplatform.init` is used.
* `sync`: Whether to execute this creation synchronously.

In [9]:
FEATURESTORE_ID = f"penguins_{UUID}"

penguins_feature_store = aiplatform.Featurestore.create(
    featurestore_id=FEATURESTORE_ID,
    online_store_fixed_node_count=1,
    project=PROJECT_ID,
    location=REGION,
    sync=True,
)

##### Verify that the Feature Store is created
Check if the Feature Store was successfully created by running the following code block.

In [10]:
fs = aiplatform.Featurestore(
    featurestore_name=FEATURESTORE_ID,
    project=PROJECT_ID,
    location=REGION,
)
print(fs.gca_resource)

name: "projects/675174787853/locations/us-east4/featurestores/penguins_op43jkoy"
create_time {
  seconds: 1762416478
  nanos: 794048000
}
update_time {
  seconds: 1762416478
  nanos: 978435000
}
etag: "AMEw9yNpITNaolgGuFPJyVxsp0BJnwi4PcJuVAR6gsrsjoY00vu8m0zPqP0hnrzeVpCR"
online_serving_config {
  fixed_node_count: 1
}
state: STABLE
online_storage_ttl_days: 4000



### Check the available attributes

In [11]:
print(dir(penguins_feature_store))

['_FutureManager__latest_future', '_FutureManager__latest_future_lock', '__abstractmethods__', '__annotations__', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_abc_impl', '_are_futures_done', '_assert_gca_resource_is_available', '_batch_read_feature_values', '_complete_future', '_construct_sdk_resource_from_gapic', '_create_ephemeral_bq_dataset', '_delete', '_delete_method', '_empty_constructor', '_exception', '_format_resource_name', '_format_resource_name_method', '_gca_resource', '_generate_display_name', '_get_and_validate_project_location', '_get_entity_type', '_get_ephemeral_bq_full_dataset_id', '_get_gca_resource', '_getter_method', '_instantiate_client', '_latest_future', '_list', '_

### Inspect the Feature Store name and creation time

In [12]:
# Print the feature store's name and creation time (if available)
print(f"Featurestore name: {penguins_feature_store.name}")
print(f"Featurestore create time: {penguins_feature_store.create_time}")

Featurestore name: penguins_op43jkoy
Featurestore create time: 2025-11-06 08:07:58.794048+00:00
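In the raw `gca_resource` output, `create_time` is a protobuf `Timestamp` made of `seconds` and `nanos` since the Unix epoch (UTC), while the property printed here is a timezone-aware datetime. As a cross-check, converting the `seconds`/`nanos` pair from the output above with the standard library gives the same instant. `timestamp_to_datetime` is a hypothetical helper. Python's `datetime` has only microsecond resolution, so the last three digits of `nanos` are dropped.

```python
from datetime import datetime, timedelta, timezone


def timestamp_to_datetime(seconds: int, nanos: int) -> datetime:
    """Convert a protobuf-style (seconds, nanos) pair to an aware UTC datetime."""
    # datetime keeps microseconds only, so integer-divide nanos by 1000.
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(microseconds=nanos // 1000)


created = timestamp_to_datetime(1762416478, 794048000)
print(created)  # → 2025-11-06 08:07:58.794048+00:00
```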


### Create an EntityType

An entity type is a collection of semantically related features. You define your own entity types, based on the concepts that are relevant to your use case. For example, a movie service might have the entity types `movie` and `user`, which group related features that correspond to movies or users.

Here, you create an entity type with the ID `penguin_entity_type_{UUID}` (the base name `penguin_entity_type` plus the session UUID) using `create_entity_type` with the following parameters:
* `entity_type_id (str)`: The ID to use for the EntityType, which will become the final component of the EntityType's resource name. The value must be unique within a Feature Store.
* `description`: Description of the EntityType.

In [13]:
ENTITY_TYPE_ID = f"penguin_entity_type_{UUID}"

# Create penguin entity type
penguins_entity_type = penguins_feature_store.create_entity_type(
    entity_type_id=ENTITY_TYPE_ID,
    description="Penguins entity type",)

##### Verify that the EntityType is created
Check if the Entity Type was successfully created by running the following code block.

In [14]:
entity_type = penguins_feature_store.get_entity_type(entity_type_id=ENTITY_TYPE_ID)
print(entity_type.gca_resource)

name: "projects/675174787853/locations/us-east4/featurestores/penguins_op43jkoy/entityTypes/penguin_entity_type_op43jkoy"
description: "Penguins entity type"
create_time {
  seconds: 1762416481
  nanos: 643810000
}
update_time {
  seconds: 1762416481
  nanos: 643811000
}
etag: "AMEw9yPnOJIkdpvqkqi4GpMOm5ES2o1n9nDnRePqIzCiqXnGYcJ6HyuSlOiJI7oGxdCj"
monitoring_config {
}



### Create Features
A feature is a measurable property or attribute of an entity type. For example, the `penguin` entity type has features such as `flipper_length_mm` and `body_mass_g`. Features are created within an entity type.

When you create a feature, you specify its value type, such as `DOUBLE` or `STRING`. The value type determines which values you can ingest for that feature.

Learn more about [Feature Value Types](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/projects.locations.featurestores.entityTypes.features).

In [15]:
penguins_feature_configs = {
    "species": {"value_type": "STRING",},
    "island": {"value_type": "STRING",},
    "culmen_length_mm": {"value_type": "DOUBLE",},
    "culmen_depth_mm": {"value_type": "DOUBLE",},
    "flipper_length_mm": {"value_type": "DOUBLE",},
    "body_mass_g": {"value_type": "DOUBLE"},
    "sex": {"value_type": "STRING"},
}
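Because the value type constrains what you can ingest, you might check a few rows against the configs before writing. The sketch below is a hypothetical client-side check, not part of the Vertex AI SDK. It assumes that `STRING` features should hold Python `str` values and `DOUBLE` features should hold `float` (or `int`) values, and it covers only the two types used in this notebook. NumPy's `float64` subclasses `float`, so values taken from the `DataFrame` would pass the `DOUBLE` check.

```python
from typing import Any, Dict, List

# Assumed mapping from Feature Store value types to acceptable Python types.
PYTHON_TYPES = {"STRING": (str,), "DOUBLE": (float, int)}


def type_errors(row: Dict[str, Any], feature_configs: Dict[str, Dict[str, str]]) -> List[str]:
    """Return one message per feature whose value is missing or has the wrong type."""
    errors = []
    for feature_id, config in feature_configs.items():
        value_type = config["value_type"]
        if feature_id not in row:
            errors.append(f"{feature_id}: missing")
        elif not isinstance(row[feature_id], PYTHON_TYPES[value_type]):
            errors.append(f"{feature_id}: expected {value_type}, got {type(row[feature_id]).__name__}")
    return errors


configs = {"species": {"value_type": "STRING"}, "body_mass_g": {"value_type": "DOUBLE"}}
good = {"species": "Adelie Penguin (Pygoscelis adeliae)", "body_mass_g": 3475.0}
bad = {"species": "Adelie Penguin (Pygoscelis adeliae)", "body_mass_g": "3475"}
print(type_errors(good, configs))  # → []
print(type_errors(bad, configs))   # → ['body_mass_g: expected DOUBLE, got str']
```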

You can create features using either `create_feature` or `batch_create_features`. Because all the feature configs are already collected in one variable, this notebook uses `batch_create_features`.

In [16]:
penguin_features = penguins_entity_type.batch_create_features(feature_configs=penguins_feature_configs,)
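For comparison, you can create the same features one at a time with `create_feature`, which sends one request per feature. This sketch assumes `EntityType.create_feature` accepts `feature_id` and `value_type` keyword arguments. `create_features_individually` is a hypothetical wrapper, and `FakeEntityType` is a test double that lets the loop run without a Google Cloud project.

```python
from typing import Any, Dict, List


def create_features_individually(entity_type: Any, feature_configs: Dict[str, Dict[str, str]]) -> List[Any]:
    """Create each feature with its own create_feature call (hypothetical wrapper)."""
    features = []
    for feature_id, config in feature_configs.items():
        # One request per feature, unlike a single batch_create_features call.
        features.append(
            entity_type.create_feature(feature_id=feature_id, value_type=config["value_type"])
        )
    return features


class FakeEntityType:
    """Test double that records calls instead of contacting Vertex AI."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, str]] = []

    def create_feature(self, feature_id: str, value_type: str) -> str:
        self.calls.append({"feature_id": feature_id, "value_type": value_type})
        return feature_id


fake = FakeEntityType()
created = create_features_individually(
    fake, {"species": {"value_type": "STRING"}, "body_mass_g": {"value_type": "DOUBLE"}}
)
print(created)  # → ['species', 'body_mass_g']
```

With the real SDK, you would pass `penguins_entity_type` and `penguins_feature_configs` and use this approach instead of `batch_create_features`, not in addition to it, because the features already exist after the cell above.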

### Write features to the Feature Store
Use the `write_feature_values` API to write feature values to the Feature Store with the following parameter:

* `instances`: Feature values to be written to the Feature Store. These can take the form of a list of `WriteFeatureValuesPayload` objects, a Python `dict`, or a pandas `DataFrame`.

Streaming ingestion is available in the Vertex AI SDK under the **preview** namespace, so you call it as `penguins_entity_type.preview.write_feature_values`. Here, you pass the pandas `DataFrame` you created from the penguins dataset as the `instances` parameter.

Learn more about the [Streaming ingestion API](https://github.com/googleapis/python-aiplatform/blob/e6933503d2d3a0f8a8f7ef8c178ed50a69ac2268/google/cloud/aiplatform/preview/featurestore/entity_type.py#L36).

In [17]:
penguins_entity_type.preview.write_feature_values(instances=penguins_df)

<google.cloud.aiplatform.preview.featurestore.entity_type.EntityType object at 0x7f0d6de3f160> 
resource name: projects/675174787853/locations/us-east4/featurestores/penguins_op43jkoy/entityTypes/penguin_entity_type_op43jkoy

## Read back written features

Wait a few seconds for the write to propagate, then do an online read to confirm the write was successful.
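Instead of a fixed pause, you could poll until every expected entity is returned. The helper below is a hypothetical sketch. `read_fn` stands for any callable that returns the entity IDs it found; for example, `lambda: penguins_entity_type.read(entity_ids=ENTITY_IDS)["entity_id"]` uses the `entity_id` column shown in the output below. The attempt count and delays are arbitrary assumptions. Rows dropped during cleaning were never written, so only expect IDs that appear in `penguins_df.index`.

```python
import time
from typing import Callable, Iterable, Set


def wait_for_entities(
    read_fn: Callable[[], Iterable[str]],
    expected_ids: Set[str],
    attempts: int = 5,
    initial_delay_s: float = 1.0,
) -> bool:
    """Poll read_fn until it returns all expected_ids; return whether that happened."""
    delay = initial_delay_s
    for attempt in range(attempts):
        found = set(read_fn())
        if expected_ids <= found:
            return True
        if attempt < attempts - 1:
            time.sleep(delay)
            delay *= 2  # exponential backoff between reads
    return False


# Simulated reads: the second call sees both entities.
responses = iter([["0"], ["0", "1"]])
print(wait_for_entities(lambda: next(responses), {"0", "1"}, initial_delay_s=0.01))  # → True
```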

In [18]:
ENTITY_IDS = [str(x) for x in range(100)]
penguins_entity_type.read(entity_ids=ENTITY_IDS)

| | entity_id | island | species | flipper_length_mm | sex | body_mass_g | culmen_depth_mm | culmen_length_mm |
|---|---|---|---|---|---|---|---|---|
| 0 | 0 | Dream | Adelie Penguin (Pygoscelis adeliae) | 184.0 | FEMALE | 3475.0 | 18.4 | 36.6 |
| 1 | 1 | Dream | Adelie Penguin (Pygoscelis adeliae) | 184.0 | MALE | 4650.0 | 19.1 | 39.8 |
| 2 | 10 | Dream | Adelie Penguin (Pygoscelis adeliae) | 208.0 | MALE | 4300.0 | 18.9 | 40.8 |
| 3 | 11 | Dream | Adelie Penguin (Pygoscelis adeliae) | 185.0 | MALE | 3650.0 | 18.7 | 39.0 |
| 4 | 12 | Dream | Adelie Penguin (Pygoscelis adeliae) | 185.0 | FEMALE | 3000.0 | 16.9 | 37.0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 95 | 95 | Dream | Adelie Penguin (Pygoscelis adeliae) | 182.0 | FEMALE | 3150.0 | 18.0 | 36.5 |
| 96 | 96 | Dream | Adelie Penguin (Pygoscelis adeliae) | 182.0 | MALE | 3425.0 | 19.0 | 41.1 |
| 97 | 97 | Dream | Adelie Penguin (Pygoscelis adeliae) | 190.0 | FEMALE | 3450.0 | 17.9 | 36.0 |
| 98 | 98 | Dream | Adelie Penguin (Pygoscelis adeliae) | 190.0 | MALE | 3900.0 | 17.5 | 41.1 |
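The rows come back ordered 0, 1, 10, 11, 12 because the entity IDs are strings, and strings sort character by character. Sorting the same IDs in Python reproduces that order. If you want numeric order, you can sort by `int`, which works here only because these IDs were created from integer row positions.

```python
ENTITY_IDS = [str(x) for x in range(100)]

# Lexicographic order, matching the first rows of the read-back output.
print(sorted(ENTITY_IDS)[:5])  # → ['0', '1', '10', '11', '12']

# Numeric order, valid here because every ID is an integer rendered as a string.
print(sorted(ENTITY_IDS, key=int)[:5])  # → ['0', '1', '2', '3', '4']
```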


Note: The newly created feature store is only visible in the Console when "Vertex AI Feature Store (Legacy)" is selected from the menu at the top right.

## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial:

In [None]:
penguins_feature_store.delete(force=True)