## Amazon SageMaker Feature Store: Introduction to Feature Store

This notebook demonstrates how to get started with Feature Store, create feature groups, and ingest data into them. These feature groups are stored in your Feature Store.

Feature groups are resources that contain metadata for all data stored in your Feature Store. A feature group is a logical grouping of features, defined in the feature store to describe records. A feature group’s definition is composed of a list of feature definitions, a record identifier name, and configurations for its online and offline store. 
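
To make the parts of a feature group definition concrete, here is a minimal sketch in plain Python. `FeatureSpec` and `FeatureGroupSpec` are hypothetical names used only for illustration; they are not part of `sagemaker_core`. The sketch also checks that the record identifier and event time names refer to declared features, a requirement we are assuming applies to every feature group.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FeatureSpec:
    """Hypothetical stand-in for one feature definition: a name plus a type."""
    name: str
    type: str  # "Integral", "Fractional", or "String"


@dataclass
class FeatureGroupSpec:
    """Hypothetical, simplified picture of what a feature group definition holds."""
    name: str
    features: List[FeatureSpec]
    record_identifier_name: str
    event_time_name: str
    online_store_enabled: bool = True
    offline_store_s3_uri: Optional[str] = None

    def validate(self) -> None:
        # Both special names must point at a feature that was actually declared.
        names = {feature.name for feature in self.features}
        for required in (self.record_identifier_name, self.event_time_name):
            if required not in names:
                raise ValueError(f"{required!r} is not among the feature definitions")


spec = FeatureGroupSpec(
    name="customers-feature-group-example",
    features=[FeatureSpec("customer_id", "Integral"), FeatureSpec("EventTime", "Fractional")],
    record_identifier_name="customer_id",
    event_time_name="EventTime",
    offline_store_s3_uri="s3://example-bucket/example-prefix",  # placeholder URI
)
spec.validate()  # raises ValueError if a special name is missing
```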

### Overview
1. Set up
2. Inspect your data
3. Create a feature group
4. Add metadata to a feature
5. Ingest data into a feature group
6. Add features to a feature group
7. Clean up

### Prerequisites
This notebook uses the `sagemaker_core` SDK and the `Python 3 (Data Science)` kernel. It works with Studio, Jupyter, and JupyterLab.

#### Library dependencies:
* `sagemaker_core`
* `sagemaker` (used here only to look up the default S3 bucket)
* `numpy`
* `pandas`

#### Role requirements:
**IMPORTANT**: You must attach the following policies to your execution role:
* `AmazonS3FullAccess`
* `AmazonSageMakerFeatureStoreAccess`

### Set up

In [71]:
!pip uninstall sagemaker-core -y
!pip install pip --upgrade --quiet
!pip install sagemaker-core --upgrade

Found existing installation: sagemaker-core 1.0.27
Uninstalling sagemaker-core-1.0.27:
  Successfully uninstalled sagemaker-core-1.0.27
Collecting sagemaker-core
  Using cached sagemaker_core-1.0.27-py3-none-any.whl.metadata (4.9 kB)
Using cached sagemaker_core-1.0.27-py3-none-any.whl (407 kB)
Installing collected packages: sagemaker-core
Successfully installed sagemaker-core-1.0.27


In [72]:
import pandas as pd
import numpy as np
import io
import sagemaker
from sagemaker_core.helper.session_helper import get_execution_role, Session

sagemaker_session = Session()
REGION_NAME = sagemaker_session._region_name
role = get_execution_role()
s3_bucket_name = sagemaker.Session().default_bucket()
prefix = "sagemaker-featurestore-introduction"
default_bucket_prefix = sagemaker.Session().default_bucket_prefix

# If a default bucket prefix is specified, append it to the s3 path
if default_bucket_prefix:
    prefix = f"{default_bucket_prefix}/{prefix}"
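
The prefix logic above can be sketched as a small pure function, which makes it easy to check the resulting offline store URI without an AWS session. `build_offline_store_uri` and the bucket and prefix values are hypothetical names chosen for this illustration.

```python
from typing import Optional


def build_offline_store_uri(
    bucket: str, prefix: str, default_bucket_prefix: Optional[str] = None
) -> str:
    """Join bucket and prefix into an S3 URI, honoring an optional default prefix."""
    if default_bucket_prefix:
        # Mirror the notebook: the default bucket prefix goes in front of ours.
        prefix = f"{default_bucket_prefix.strip('/')}/{prefix}"
    return f"s3://{bucket}/{prefix.strip('/')}"


example_uri = build_offline_store_uri(
    "example-bucket", "sagemaker-featurestore-introduction", "team-a"
)
print(example_uri)
```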

### Inspect your data
This notebook ingests synthetic data. We read it from `feature_store_introduction_customer.csv` and `feature_store_introduction_orders.csv`, which the code below expects to find in the notebook's working directory.

In [73]:
customer_data = pd.read_csv("feature_store_introduction_customer.csv")
orders_data = pd.read_csv("feature_store_introduction_orders.csv")

In [74]:
customer_data.head()

| | customer_id | city_code | state_code | country_code |
|---|---|---|---|---|
| 0 | 573291 | 1 | 49 | 2 |
| 1 | 109382 | 2 | 40 | 2 |
| 2 | 828400 | 3 | 31 | 2 |
| 3 | 124013 | 4 | 5 | 2 |


In [75]:
orders_data.head()

| | customer_id | order_id | order_status | store_id |
|---|---|---|---|---|
| 0 | 573291 | 4132 | 1 | 303 |
| 1 | 109382 | 5724 | 0 | 201 |
| 2 | 828400 | 1942 | 0 | 431 |
| 3 | 124013 | 6782 | 1 | 213 |


Below is an illustration of the steps the data goes through before it is ingested into a Feature Store. This notebook covers the use case where you have data from multiple sources and want to store each source independently in a feature store. Our example uses data from a data warehouse (customer data) and data from a real-time streaming service (order data).

![data flow](images/feature_store_data_ingest.svg)

### Create a feature group

We start by creating feature group names for `customer_data` and `orders_data`. We then create two feature groups, one for `customer_data` and another for `orders_data`.

In [76]:
from time import gmtime, strftime, sleep

customers_feature_group_name = "customers-feature-group-" + strftime("%d-%H-%M-%S", gmtime())
orders_feature_group_name = "orders-feature-group-" + strftime("%d-%H-%M-%S", gmtime())

In [77]:
customers_feature_group_name

'customers-feature-group-01-12-20-28'
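
Two details of the naming cell are worth noting: the `%d-%H-%M-%S` pattern omits the month and year, and the two `strftime` calls each read the clock separately. The sketch below is one possible variation, not the notebook's method: it takes a single shared timestamp and includes the full date. `timestamped_name` is a hypothetical helper.

```python
import time


def timestamped_name(base, when=None, fmt="%Y-%m-%d-%H-%M-%S"):
    """Append a UTC timestamp to `base`; pass `when` to reuse one timestamp."""
    when = time.gmtime() if when is None else when
    return f"{base}-{time.strftime(fmt, when)}"


# Use one timestamp for both names so the suffixes always match.
shared = time.gmtime(0)  # fixed instant for a reproducible example
customers_name = timestamped_name("customers-feature-group", shared)
orders_name = timestamped_name("orders-feature-group", shared)
print(customers_name, orders_name)
```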

Define the feature definitions for `customer_data` and `orders_data`. Each `FeatureDefinition` names a feature and gives its type.

In [78]:
from sagemaker_core.shapes import FeatureDefinition

CustomerFeatureDefinitions = [
    FeatureDefinition(feature_name="customer_id", feature_type="Integral"),
    FeatureDefinition(feature_name="city_code", feature_type="Integral"),
    FeatureDefinition(feature_name="state_code", feature_type="Integral"),
    FeatureDefinition(feature_name="country_code", feature_type="Integral"),
    FeatureDefinition(feature_name="EventTime", feature_type="Fractional"),
]

OrderFeatureDefinitions = [
    FeatureDefinition(feature_name="customer_id", feature_type="Integral"),
    FeatureDefinition(feature_name="order_id", feature_type="Integral"),
    FeatureDefinition(feature_name="order_status", feature_type="Integral"),
    FeatureDefinition(feature_name="store_id", feature_type="Integral"),
    FeatureDefinition(feature_name="EventTime", feature_type="Fractional"),
]
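
The definitions above are written by hand. As a rough sketch of how the same name and type pairs could be derived from a sample row, the hypothetical helper below maps Python values to the three feature types used in this notebook's API (`Integral`, `Fractional`, `String`). It returns plain tuples rather than `FeatureDefinition` objects so that it runs without the SDK.

```python
def infer_feature_type(value):
    """Map a Python value to a feature type name (illustrative rule set)."""
    if isinstance(value, bool):
        return "String"  # bool is a subclass of int, so handle it first
    if isinstance(value, int):
        return "Integral"
    if isinstance(value, float):
        return "Fractional"
    return "String"


def infer_feature_definitions(sample_row):
    """Return (feature_name, feature_type) pairs for every key in the row."""
    return [(name, infer_feature_type(value)) for name, value in sample_row.items()]


sample_row = {
    "customer_id": 573291,
    "city_code": 1,
    "state_code": 49,
    "country_code": 2,
    "EventTime": 1743510031.0,
}
definitions = infer_feature_definitions(sample_row)
print(definitions)
```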

In [79]:
import time

current_time_sec = int(round(time.time()))

record_identifier_feature_name = "customer_id"

Append an `EventTime` feature to each data frame. This feature is required; it timestamps each record.

In [80]:
customer_data["EventTime"] = pd.Series([current_time_sec] * len(customer_data), dtype="float64")
orders_data["EventTime"] = pd.Series([current_time_sec] * len(orders_data), dtype="float64")
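
The notebook stores `EventTime` as Unix epoch seconds in a `Fractional` feature. The sketch below shows how a timezone-aware `datetime` maps to that representation, and also to an ISO-8601 string, which we assume is the alternative when the event time feature is declared as `String`. Both helper names are hypothetical.

```python
from datetime import datetime, timezone


def event_time_as_epoch_seconds(moment):
    """Unix epoch seconds as a float, matching the Fractional EventTime used here."""
    return float(moment.timestamp())


def event_time_as_iso8601(moment):
    """UTC ISO-8601 string (assumed format for a String-typed event time)."""
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


moment = datetime(2025, 4, 1, 12, 20, 31, tzinfo=timezone.utc)
epoch_value = event_time_as_epoch_seconds(moment)
iso_value = event_time_as_iso8601(moment)
print(epoch_value, iso_value)
```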

Pass the feature definitions to your feature groups.

Below we call `FeatureGroup.create` twice to create `customers_feature_group` and `orders_feature_group`.

In [81]:
from sagemaker_core.shapes import OnlineStoreConfig, OfflineStoreConfig, S3StorageConfig
from sagemaker_core.resources import FeatureGroup

customers_feature_group = FeatureGroup.create(
    feature_group_name=customers_feature_group_name,
    record_identifier_feature_name=record_identifier_feature_name,
    event_time_feature_name="EventTime",
    role_arn=role,
    online_store_config=OnlineStoreConfig(enable_online_store=True),
    feature_definitions=CustomerFeatureDefinitions,
    offline_store_config=OfflineStoreConfig(
        s3_storage_config=S3StorageConfig(s3_uri=f"s3://{s3_bucket_name}/{prefix}")
    ),
)

In [82]:
from sagemaker_core.resources import FeatureGroup

orders_feature_group = FeatureGroup.create(
    feature_group_name=orders_feature_group_name,
    record_identifier_feature_name=record_identifier_feature_name,
    event_time_feature_name="EventTime",
    role_arn=role,
    online_store_config=OnlineStoreConfig(enable_online_store=True),
    feature_definitions=OrderFeatureDefinitions,
    offline_store_config=OfflineStoreConfig(
        s3_storage_config=S3StorageConfig(s3_uri=f"s3://{s3_bucket_name}/{prefix}")
    ),
)

To confirm that each feature group has been created, we call its `wait_for_status` method, which blocks until the feature group reaches the `Created` status.

In [83]:
def check_feature_group_status(feature_group):
    # Block until the feature group reaches the "Created" status
    feature_group.wait_for_status(target_status="Created")
    print(f"FeatureGroup {feature_group.get_name()} successfully created.")


check_feature_group_status(customers_feature_group)
check_feature_group_status(orders_feature_group)

FeatureGroup customers-feature-group-01-12-20-28 successfully created.


FeatureGroup orders-feature-group-01-12-20-28 successfully created.
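
For readers curious about what a status wait involves, here is a generic polling loop under stated assumptions. It is not the `sagemaker_core` implementation. `wait_for` and its parameters are hypothetical, and `describe_status` stands for any callable that returns the current status string.

```python
import time


def wait_for(
    describe_status,
    target="Created",
    failed=("CreateFailed",),
    poll_seconds=5.0,
    timeout_seconds=300.0,
    sleep=time.sleep,
):
    """Poll until `target` is reached; raise on a failed status or on timeout."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = describe_status()
        if status == target:
            return status
        if status in failed:
            raise RuntimeError(f"resource entered failed status {status!r}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still {status!r} after {timeout_seconds} seconds")
        sleep(poll_seconds)


# Simulated status sequence; the no-op sleep keeps the example instant.
statuses = iter(["Creating", "Creating", "Created"])
final_status = wait_for(lambda: next(statuses), sleep=lambda seconds: None)
print(final_status)
```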


### Add metadata to a feature

We can add searchable metadata fields to FeatureGroup features by using the `FeatureMetadata` class. The currently supported metadata fields are `description` and `parameters`.

In [84]:
from sagemaker_core.resources import FeatureMetadata
from sagemaker_core.shapes import FeatureParameter

customers_feature_metadata = FeatureMetadata(
    feature_group_name=customers_feature_group_name, feature_name="customer_id"
)

In [85]:
customers_feature_metadata.update(
    description="The ID of a customer. It is also used in orders_feature_group.",
    parameter_additions=[FeatureParameter(key="idType", value="primaryKey")],
)

FeatureMetadata(feature_group_name='customers-feature-group-01-12-20-28', feature_name='customer_id', feature_group_arn='arn:aws:sagemaker:us-east-1:590184044598:feature-group/customers-feature-group-01-12-20-28', feature_type='Integral', creation_time=datetime.datetime(2025, 4, 1, 12, 20, 34, 11000, tzinfo=tzlocal()), last_modified_time=datetime.datetime(2025, 4, 1, 12, 21, 18, 130000, tzinfo=tzlocal()), description='The ID of a customer. It is also used in orders_feature_group.', parameters=[FeatureParameter(key='idType', value='primaryKey')])

To view feature metadata, we can use the `get` method to display that feature.

In [86]:
customers_feature_metadata.get(
    feature_group_name=customers_feature_group_name, feature_name="customer_id"
)

FeatureMetadata(feature_group_name='customers-feature-group-01-12-20-28', feature_name='customer_id', feature_group_arn='arn:aws:sagemaker:us-east-1:590184044598:feature-group/customers-feature-group-01-12-20-28', feature_type='Integral', creation_time=datetime.datetime(2025, 4, 1, 12, 20, 34, 11000, tzinfo=tzlocal()), last_modified_time=datetime.datetime(2025, 4, 1, 12, 21, 18, 130000, tzinfo=tzlocal()), description='The ID of a customer. It is also used in orders_feature_group.', parameters=[FeatureParameter(key='idType', value='primaryKey')])

Feature metadata fields are searchable. We use the `search` API to find features whose metadata matches the search criteria. The first cell below looks up the current region, which the search client needs.

In [87]:
# Method 1: If you already have a SageMaker session
current_region = sagemaker_session.boto_session.region_name
print("Current region:", current_region)

# Method 2: Using boto3 directly
import boto3
session = boto3.session.Session()
current_region = session.region_name
print("Current region:", current_region)

# Method 3: Check the AWS_REGION environment variable
import os
env_region = os.environ.get('AWS_REGION')
print("Region from environment:", env_region)

Current region: us-east-1
Current region: us-east-1
Region from environment: us-east-1


In [88]:
sagemaker_session.boto_session.client("sagemaker", region_name=current_region).search(
    Resource="FeatureMetadata",
    SearchExpression={
        "Filters": [
            {
                "Name": "FeatureGroupName",
                "Operator": "Contains",
                "Value": "customers-feature-group-",
            },
            {"Name": "Parameters.idType", "Operator": "Equals", "Value": "primaryKey"},
        ]
    },
)  # We use the boto client to search

{'Results': [{'FeatureMetadata': {'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:590184044598:feature-group/customers-feature-group-31-09-49-14',
    'FeatureGroupName': 'customers-feature-group-31-09-49-14',
    'FeatureName': 'customer_id',
    'FeatureType': 'Integral',
    'CreationTime': datetime.datetime(2025, 3, 31, 9, 49, 17, tzinfo=tzlocal()),
    'LastModifiedTime': datetime.datetime(2025, 3, 31, 9, 49, 55, tzinfo=tzlocal()),
    'Description': 'The ID of a customer. It is also used in orders_feature_group.',
    'Parameters': [{'Key': 'idType', 'Value': 'primaryKey'}]}},
  {'FeatureMetadata': {'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:590184044598:feature-group/customers-feature-group-31-09-23-12',
    'FeatureGroupName': 'customers-feature-group-31-09-23-12',
    'FeatureName': 'customer_id',
    'FeatureType': 'Integral',
    'CreationTime': datetime.datetime(2025, 3, 31, 9, 30, tzinfo=tzlocal()),
    'LastModifiedTime': datetime.datetime(2025, 3, 31, 9, 46, 52, tzi
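
The `SearchExpression` passed to `search` above is a plain dictionary, so it can be assembled by a small helper. The sketch below rebuilds the same expression; `metadata_filter` and `build_search_expression` are hypothetical names, and only the filter names and operators already shown above are used.

```python
def metadata_filter(name, operator, value):
    """One filter entry in the shape used by the search call above."""
    return {"Name": name, "Operator": operator, "Value": value}


def build_search_expression(group_name_fragment, parameters):
    """Match feature groups by name fragment and by exact parameter values."""
    filters = [metadata_filter("FeatureGroupName", "Contains", group_name_fragment)]
    for key, value in parameters.items():
        filters.append(metadata_filter(f"Parameters.{key}", "Equals", value))
    return {"Filters": filters}


expression = build_search_expression("customers-feature-group-", {"idType": "primaryKey"})
print(expression)
```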

### Ingest data into a feature group

We can put data into the feature group by using the `PutRecord` API. Ingesting this small dataset takes less than a minute.

In [89]:
# Cast every feature column to pandas' nullable Int64 dtype before ingestion
for column in ["customer_id", "city_code", "state_code", "country_code"]:
    customer_data[column] = pd.to_numeric(customer_data[column]).astype("Int64")

for column in ["customer_id", "order_id", "order_status", "store_id"]:
    orders_data[column] = pd.to_numeric(orders_data[column]).astype("Int64")

Define an `IngestData` function that ingests every row of a data frame, making one `PutRecord` API call per row.

In [90]:
from sagemaker_core.shapes import FeatureValue


def IngestData(df, feature_group):
    """Ingest every row of `df` into `feature_group`, one PutRecord call per row."""
    try:
        for _, row in df.iterrows():
            # Build one FeatureValue per column; PutRecord takes values as strings
            record = [
                FeatureValue(feature_name=str(column), value_as_string=str(row[column]))
                for column in df.columns
            ]
            feature_group.put_record(record=record)
        return (
            f"The dataframe with {len(df)} rows has been ingested successfully "
            f"for feature group {feature_group.get_name()}"
        )
    except Exception as e:
        # Report the failure and signal it to the caller by returning None
        print(f"An unexpected error occurred: {e}")
        return None

In [91]:
IngestData(customer_data, customers_feature_group)

'The dataframe with 4 rows has been ingested successfully for feature group customers-feature-group-01-12-20-28'

In [92]:
IngestData(orders_data, orders_feature_group)

'The dataframe with 4 rows has been ingested successfully for feature group orders-feature-group-01-12-20-28'
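
`IngestData` converts every cell with `str(...)`, so a missing value would be sent as text such as `nan`. The sketch below shows one way to build a record that leaves out missing cells instead, on the assumption that a feature without a value can simply be omitted from the record. It uses plain dictionaries in the boto3-style `FeatureName` / `ValueAsString` shape so it runs without the SDK; `row_to_record` is a hypothetical helper.

```python
import math


def row_to_record(row):
    """Convert a {column: value} mapping to a record, skipping missing values."""
    record = []
    for name, value in row.items():
        is_missing = value is None or (isinstance(value, float) and math.isnan(value))
        if is_missing:
            continue  # assumption: omit the feature rather than send "nan"
        record.append({"FeatureName": str(name), "ValueAsString": str(value)})
    return record


record = row_to_record(
    {"customer_id": 573291, "city_code": 1, "state_code": float("nan"), "EventTime": 1743510031.0}
)
print(record)
```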

Using an arbitrary customer record ID, 573291, we call `get_record` to check that the data has been ingested into the feature group.

In [93]:
customer_id = 573291
sample_record = customers_feature_group.get_record(
    record_identifier_value_as_string=str(customer_id)
)

In [94]:
sample_record

GetRecordResponse(record=[FeatureValue(feature_name='customer_id', value_as_string='573291', value_as_string_list=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>), FeatureValue(feature_name='city_code', value_as_string='1', value_as_string_list=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>), FeatureValue(feature_name='state_code', value_as_string='49', value_as_string_list=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>), FeatureValue(feature_name='country_code', value_as_string='2', value_as_string_list=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>), FeatureValue(feature_name='EventTime', value_as_string='1743510031.0', value_as_string_list=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>)], expires_at=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>)
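
The response above is a list of `FeatureValue` objects, which is hard to scan. A small helper can flatten it into a dictionary. The sketch uses `SimpleNamespace` stand-ins with the same `feature_name` and `value_as_string` attributes shown in the output, so it runs without a live feature group; `record_to_dict` is a hypothetical name.

```python
from types import SimpleNamespace


def record_to_dict(record):
    """Flatten a list of feature-value objects into {feature_name: value_as_string}."""
    return {value.feature_name: value.value_as_string for value in record}


# Stand-ins for the FeatureValue objects returned by get_record
fake_record = [
    SimpleNamespace(feature_name="customer_id", value_as_string="573291"),
    SimpleNamespace(feature_name="city_code", value_as_string="1"),
    SimpleNamespace(feature_name="EventTime", value_as_string="1743510031.0"),
]
flat = record_to_dict(fake_record)
print(flat)
```

With a real response, the call would be `record_to_dict(sample_record.record)`.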

We use `batch_get_record` with the customer IDs to check that all data has been ingested into both feature groups.

In [95]:
from sagemaker_core.shapes import BatchGetRecordIdentifier

all_records_customers = customers_feature_group.batch_get_record(
    identifiers=[
        BatchGetRecordIdentifier(
            feature_group_name=customers_feature_group_name,
            record_identifiers_value_as_string=["573291", "109382", "828400", "124013"],
        )
    ]
)

In [96]:
all_records_customers

BatchGetRecordResponse(records=[BatchGetRecordResultDetail(feature_group_name='customers-feature-group-01-12-20-28', record_identifier_value_as_string='573291', record=[FeatureValue(feature_name='customer_id', value_as_string='573291', value_as_string_list=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>), FeatureValue(feature_name='city_code', value_as_string='1', value_as_string_list=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>), FeatureValue(feature_name='state_code', value_as_string='49', value_as_string_list=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>), FeatureValue(feature_name='country_code', value_as_string='2', value_as_string_list=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>), FeatureValue(feature_name='EventTime', value_as_string='1743510031.0', value_as_string_list=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>)], expires_at=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed9

In [97]:
all_records_orders = orders_feature_group.batch_get_record(
    identifiers=[
        BatchGetRecordIdentifier(
            feature_group_name=orders_feature_group_name,
            record_identifiers_value_as_string=["573291", "109382", "828400", "124013"],
        )
    ]
)

In [98]:
all_records_orders

BatchGetRecordResponse(records=[BatchGetRecordResultDetail(feature_group_name='orders-feature-group-01-12-20-28', record_identifier_value_as_string='573291', record=[FeatureValue(feature_name='customer_id', value_as_string='573291', value_as_string_list=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>), FeatureValue(feature_name='order_id', value_as_string='4132', value_as_string_list=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>), FeatureValue(feature_name='order_status', value_as_string='1', value_as_string_list=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>), FeatureValue(feature_name='store_id', value_as_string='303', value_as_string_list=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>), FeatureValue(feature_name='EventTime', value_as_string='1743510031.0', value_as_string_list=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>)], expires_at=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>
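
Reading the raw response by eye does not scale past a few records. The sketch below compares the requested IDs with the `record_identifier_value_as_string` of each returned entry and reports any that are missing. It uses stand-in objects so it runs offline; `missing_identifiers` is a hypothetical helper.

```python
from types import SimpleNamespace


def missing_identifiers(requested, response_records):
    """Return the requested IDs that have no matching entry in the response."""
    returned = {entry.record_identifier_value_as_string for entry in response_records}
    return [identifier for identifier in requested if identifier not in returned]


requested_ids = ["573291", "109382", "828400", "124013"]
# Stand-ins for BatchGetRecordResultDetail entries; one ID is deliberately absent
fake_records = [
    SimpleNamespace(record_identifier_value_as_string="573291"),
    SimpleNamespace(record_identifier_value_as_string="109382"),
    SimpleNamespace(record_identifier_value_as_string="828400"),
]
missing = missing_identifiers(requested_ids, fake_records)
print(missing)
```

With a real response, the second argument would be `all_records_customers.records`.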

### Add features to a feature group

To add features to a feature group that already contains ingested data, we can call its `update` method and then re-ingest data from the updated dataset.

In [118]:
# Print details about the feature group to understand what we're working with
print("Feature group details:")
print(f"Type: {type(customers_feature_group)}")
print(f"Available attributes: {dir(customers_feature_group)}")

# Try to identify the feature group name
# Try different possible attributes that might contain the name
possible_name_attrs = ['name', 'feature_group_name', 'feature_group_arn', '__str__', 'Name', 'FeatureGroupName']
for attr in possible_name_attrs:
    if hasattr(customers_feature_group, attr):
        print(f"Found name attribute '{attr}': {getattr(customers_feature_group, attr)}")

# Check if there are any attributes related to features
feature_related_attrs = [attr for attr in dir(customers_feature_group) if 'feature' in attr.lower()]
print(f"Feature-related attributes: {feature_related_attrs}")

# Try to list features using any relevant attribute
for attr in feature_related_attrs:
    try:
        value = getattr(customers_feature_group, attr)
        print(f"Value of {attr}: {value}")
    except Exception as e:
        print(f"Error accessing {attr}: {e}")

Feature group details:
Type: <class 'sagemaker_core.main.resources.FeatureGroup'>
Available attributes: ['__abstractmethods__', '__annotations__', '__class__', '__class_getitem__', '__class_vars__', '__copy__', '__deepcopy__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__fields__', '__fields_set__', '__format__', '__ge__', '__get_pydantic_core_schema__', '__get_pydantic_json_schema__', '__getattr__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__pretty__', '__private_attributes__', '__pydantic_complete__', '__pydantic_computed_fields__', '__pydantic_core_schema__', '__pydantic_custom_init__', '__pydantic_decorators__', '__pydantic_extra__', '__pydantic_fields__', '__pydantic_fields_set__', '__pydantic_generic_metadata__', '__pydantic_init_subclass__', '__pydantic_parent_namespace__', '__pydantic_post_init__', '__pydantic_private__', '__pydantic_root_model_

In [119]:
# 1. Wait for any in-progress updates to complete
import time
print("Waiting for in-progress updates to complete...")
time.sleep(180)  # Wait 3 minutes

# 2. Refresh the feature group to get its latest state
customers_feature_group.refresh()

# 3. Print feature definitions again to see if they've changed
print("Updated feature definitions:")
print(customers_feature_group.feature_definitions)

# 4. Try with different feature names to avoid conflicts
try:
    customers_feature_group.update(
        feature_additions=[
            {"feature_name": "customer_fullname", "feature_type": "String"},
            {"feature_name": "customer_email_address", "feature_type": "String"}
        ]
    )
    print("Added new features with different names successfully")
except Exception as e:
    print(f"Error adding features with different names: {e}")

Waiting for in-progress updates to complete...
Updated feature definitions:
[FeatureDefinition(feature_name='customer_id', feature_type='Integral', collection_type=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>, collection_config=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>), FeatureDefinition(feature_name='city_code', feature_type='Integral', collection_type=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>, collection_config=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>), FeatureDefinition(feature_name='state_code', feature_type='Integral', collection_type=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>, collection_config=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>), FeatureDefinition(feature_name='country_code', feature_type='Integral', collection_type=<sagemaker_core.main.utils.Unassigned object at 0x7fc60efaed90>, collection_config=<sagemaker_core.main.utils.Unassigned object at

Added new features with different names successfully
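
The cell above lists the new features by hand. As a sketch, the list could also be derived by comparing the columns of the new dataset with the features the group already defines. `plan_feature_additions` is a hypothetical helper; it returns dictionaries in the same shape passed to `feature_additions` above and assumes every new column is a `String` unless told otherwise.

```python
def plan_feature_additions(existing_names, new_columns, default_type="String"):
    """Return feature-addition entries for columns not yet defined in the group."""
    existing = set(existing_names)
    return [
        {"feature_name": column, "feature_type": default_type}
        for column in new_columns
        if column not in existing
    ]


existing_names = ["customer_id", "city_code", "state_code", "country_code", "EventTime"]
new_columns = existing_names + ["customer_fullname", "customer_email_address"]
additions = plan_feature_additions(existing_names, new_columns)
print(additions)
```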


Inspect the new dataset.

In [122]:
customer_data_updated = pd.read_csv("feature_store_introduction_customer_updated.csv")

In [102]:
customer_data_updated.head()

| | customer_id | city_code | state_code | country_code |
|---|---|---|---|---|
| 0 | 573291 | 1 | 49 | 2 |
| 1 | 109382 | 2 | 40 | 2 |
| 2 | 828400 | 3 | 31 | 2 |
| 3 | 124013 | 4 | 5 | 2 |


Append the `EventTime` feature to the updated data frame as well.

In [103]:
# Size the series from the updated frame itself, not the original one
customer_data_updated["EventTime"] = pd.Series(
    [current_time_sec] * len(customer_data_updated), dtype="float64"
)

Ingest the new dataset.

In [None]:
# Re-ingest with the IngestData helper defined earlier in this notebook
IngestData(customer_data_updated, customers_feature_group)

Use `batch_get_record` again to check that all updated data has been ingested into `customers_feature_group` by providing customer IDs.

In [None]:
updated_records_customers = customers_feature_group.batch_get_record(
    identifiers=[
        BatchGetRecordIdentifier(
            feature_group_name=customers_feature_group_name,
            record_identifiers_value_as_string=["573291", "109382", "828400", "124013"],
        )
    ]
)

In [None]:
updated_records_customers

### Clean up
Here we remove the Feature Groups we created. 

In [None]:
customers_feature_group.delete()
orders_feature_group.delete()
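
If the first `delete` call raises, the second feature group is never removed. The sketch below shows one defensive pattern: attempt every deletion and collect the failures. `delete_all` and `FakeGroup` are hypothetical; the fake class only mimics the `delete` and `get_name` methods used in this notebook so the example runs offline.

```python
def delete_all(resources):
    """Try to delete every resource; return {name: error} for those that failed."""
    failures = {}
    for resource in resources:
        try:
            resource.delete()
        except Exception as error:  # keep going so later resources still get deleted
            failures[resource.get_name()] = error
    return failures


class FakeGroup:
    """Stand-in exposing the two methods the helper relies on."""

    def __init__(self, name, fail=False):
        self.name = name
        self.fail = fail
        self.deleted = False

    def get_name(self):
        return self.name

    def delete(self):
        if self.fail:
            raise RuntimeError("simulated failure")
        self.deleted = True


groups = [FakeGroup("customers", fail=True), FakeGroup("orders")]
failures = delete_all(groups)
print(sorted(failures))
```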