## SageMaker Feature Store 
haimtran 25/05/2023
- [boto3 sagemaker feature store](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker-featurestore-runtime.html)
- [Feature Store APIs](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-store)

In [None]:
# SageMaker Python SDK version 2.100.0 is required
# boto3 version 1.24.20 is required
import sagemaker
import boto3
import sys

!pip install 'sagemaker>=2.100.0'
!pip install 'boto3>=1.24.20'

In [None]:
import pandas as pd
import numpy as np
import io
from sagemaker.session import Session
from sagemaker import get_execution_role

# Key prefix inside the default bucket; used to build the s3_uri given to FeatureGroup.create().
prefix = "sagemaker-featurestore-introduction"
# Execution role of this notebook; passed as role_arn when the feature groups are created.
role = get_execution_role()

# One shared session supplies the region, the default S3 bucket and the boto3 clients used below.
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
s3_bucket_name = sagemaker_session.default_bucket()

## Read data from csv 

In [None]:
# Load the sample customer and order tables that are later ingested into the two feature groups.
# Paths are relative to the notebook's working directory.
customer_data = pd.read_csv("feature-store-data/feature_store_introduction_customer.csv")
orders_data = pd.read_csv("feature-store-data/feature_store_introduction_orders.csv")

In [None]:
customer_data.head(10)

In [None]:
orders_data.head(10)

## Create a feature group 

In [None]:
from time import gmtime, strftime, sleep

# Each name gets a UTC day-hour-minute-second suffix so that repeated runs of the notebook
# do not collide with feature groups created by an earlier run.
customers_feature_group_name = f"customers-feature-group-{strftime('%d-%H-%M-%S', gmtime())}"
orders_feature_group_name = f"orders-feature-group-{strftime('%d-%H-%M-%S', gmtime())}"

In [None]:
customers_feature_group_name

In [None]:
from sagemaker.feature_store.feature_group import FeatureGroup

# Handles for the two feature groups, bound to the shared session.
# The groups themselves are created further down with .create().
customers_feature_group = FeatureGroup(
    name=customers_feature_group_name, 
    sagemaker_session=sagemaker_session
)
orders_feature_group = FeatureGroup(
    name=orders_feature_group_name, 
    sagemaker_session=sagemaker_session
)

In [None]:
import time

# Wall-clock time in whole seconds since the epoch; reused as the EventTime value of every row.
# round() without a digits argument already returns an int.
current_time_sec = round(time.time())

# Column used as the record identifier when both feature groups are created.
record_identifier_feature_name = "customer_id"

In [None]:
# Stamp every row of both tables with the same timestamp, stored as float64.
# "EventTime" is the column passed as event_time_feature_name when the groups are created.
customer_data["EventTime"] = pd.Series([current_time_sec] * len(customer_data), dtype="float64")
orders_data["EventTime"] = pd.Series([current_time_sec] * len(orders_data), dtype="float64")

In [None]:
customer_data.head(10)

In [None]:
# Load each group's feature definitions from its DataFrame (which now includes EventTime).
# This must run before .create(), which is called in a later cell.
customers_feature_group.load_feature_definitions(data_frame=customer_data)
orders_feature_group.load_feature_definitions(data_frame=orders_data)

In [None]:
record_identifier_feature_name

In [None]:
# Create both feature groups with identical settings: data under the same S3 prefix,
# customer_id as the record identifier, EventTime as the event time column,
# the notebook's execution role, and the online store enabled.
customers_feature_group.create(
    s3_uri=f"s3://{s3_bucket_name}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name="EventTime",
    role_arn=role,
    enable_online_store=True,
)

# Note: the orders group is keyed by customer_id as well, not by an order id.
orders_feature_group.create(
    s3_uri=f"s3://{s3_bucket_name}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name="EventTime",
    role_arn=role,
    enable_online_store=True,
)

In [None]:
customers_feature_group.describe()

In [None]:
orders_feature_group.describe()

## Check the feature group created 

In [None]:
sagemaker_session.boto_session.client(
    "sagemaker", region_name=region
).list_feature_groups()  # We use the boto client to list FeatureGroups

In [None]:
def check_feature_group_status(feature_group, poll_interval_sec=5):
    """Wait for a feature group to leave the "Creating" state and report the outcome.

    Polls ``feature_group.describe()`` while ``FeatureGroupStatus`` is
    ``"Creating"``. Success is printed only when the final status is
    ``"Created"``; any other final status (for example ``"CreateFailed"``)
    is reported as a failure together with the ``FailureReason`` field.

    Args:
        feature_group: Object exposing ``describe()`` (returning a dict that
            contains ``FeatureGroupStatus``) and a ``name`` attribute.
        poll_interval_sec: Seconds to sleep between two polls.
    """
    description = feature_group.describe()
    while description.get("FeatureGroupStatus") == "Creating":
        print("Waiting for Feature Group to be Created")
        time.sleep(poll_interval_sec)
        description = feature_group.describe()

    status = description.get("FeatureGroupStatus")
    if status == "Created":
        print(f"FeatureGroup {feature_group.name} successfully created.")
    else:
        # Leaving "Creating" does not imply success, so never claim it blindly.
        print(
            f"FeatureGroup {feature_group.name} was not created. "
            f"FeatureGroupStatus is {status}; "
            f"FailureReason: {description.get('FailureReason')}"
        )


# Block until neither feature group is in the "Creating" state any more;
# the metadata update and ingestion cells below need the groups to exist.
check_feature_group_status(customers_feature_group)
check_feature_group_status(orders_feature_group)

## Add metadata to a feature 

In [None]:
from sagemaker.feature_store.inputs import FeatureParameter

# Attach a description and an idType=primaryKey parameter to the customer_id feature.
# The search cell further down filters on exactly this parameter.
customers_feature_group.update_feature_metadata(
    feature_name="customer_id",
    description="The ID of a customer. It is also used in orders_feature_group.",
    parameter_additions=[FeatureParameter("idType", "primaryKey")],
)

In [None]:
customers_feature_group.describe_feature_metadata(feature_name="customer_id")

In [None]:
# Search feature metadata with the low-level SageMaker client. Two filters are given:
#   1. the feature group name contains "customers-feature-group-"
#   2. the feature carries the parameter idType == "primaryKey" (added in the cell above)
sagemaker_session.boto_session.client("sagemaker", region_name=region).search(
    Resource="FeatureMetadata",
    SearchExpression={
        "Filters": [
            {
                "Name": "FeatureGroupName",
                "Operator": "Contains",
                "Value": "customers-feature-group-",
            },
            {"Name": "Parameters.idType", "Operator": "Equals", "Value": "primaryKey"},
        ]
    },
)

## Ingest data into a feature group 

In [None]:
customers_feature_group.ingest(data_frame=customer_data, max_workers=3, wait=True)

In [None]:
orders_feature_group.ingest(data_frame=orders_data, max_workers=3, wait=True)

Using an arbitrary customer record ID, 573291, we call get_record to check that the data has been ingested into the feature group.

In [None]:
# Fetch a single customer record through the Feature Store runtime client.
# The record identifier has to be passed as a string, hence str(customer_id).
customer_id = 573291
sample_record = sagemaker_session.boto_session.client(
    "sagemaker-featurestore-runtime", region_name=region
).get_record(
    FeatureGroupName=customers_feature_group_name, 
    RecordIdentifierValueAsString=str(customer_id)
)

In [None]:
print(sample_record)

In [None]:
print(sample_record['Record'])

We use batch_get_record with a list of customer IDs to check that the data has been ingested into both feature groups.

In [None]:
# One batch call that looks up the same four customer IDs in both feature groups.
# Both groups use customer_id as their record identifier, so the ID list is shared.
all_records = sagemaker_session.boto_session.client(
    "sagemaker-featurestore-runtime", region_name=region
).batch_get_record(
    Identifiers=[
        {
            "FeatureGroupName": customers_feature_group_name,
            "RecordIdentifiersValueAsString": ["573291", "109382", "828400", "124013"],
        },
        {
            "FeatureGroupName": orders_feature_group_name,
            "RecordIdentifiersValueAsString": ["573291", "109382", "828400", "124013"],
        },
    ]
)

In [None]:
all_records

## Add features to a feature group 

In [None]:
from sagemaker.feature_store.feature_definition import StringFeatureDefinition

# Add two new string-typed features, "email" and "name", to the customers feature group.
# The update's outcome is polled separately via LastUpdateStatus.
customers_feature_group.update(
    feature_additions=[StringFeatureDefinition("email"), StringFeatureDefinition("name")]
)

Verify whether the FeatureGroup has been updated successfully.

In [None]:
def check_last_update_status(feature_group, poll_interval_sec=5):
    """Wait for a feature group update to finish and report the outcome.

    Polls ``feature_group.describe()`` while ``LastUpdateStatus["Status"]`` is
    ``"InProgress"``, then prints whether the update ended as ``"Successful"``.

    Args:
        feature_group: Object exposing ``describe()`` (returning a dict whose
            ``LastUpdateStatus`` entry is a dict with a ``Status`` key) and a
            ``name`` attribute.
        poll_interval_sec: Seconds to sleep between two polls.
    """

    def _current_status():
        # Always unwrap the nested "Status" string. Comparing the whole
        # LastUpdateStatus dict against "InProgress"/"Successful" can never match.
        # A missing LastUpdateStatus entry is tolerated and yields None.
        last_update = feature_group.describe().get("LastUpdateStatus") or {}
        return last_update.get("Status")

    last_update_status = _current_status()
    while last_update_status == "InProgress":
        print("Waiting for FeatureGroup to be updated")
        time.sleep(poll_interval_sec)
        last_update_status = _current_status()

    if last_update_status == "Successful":
        print(f"FeatureGroup {feature_group.name} successfully updated.")
    else:
        print(
            f"FeatureGroup {feature_group.name} updated failed. The LastUpdateStatus is "
            + str(last_update_status)
        )


In [None]:
check_last_update_status(customers_feature_group)

Inspect the new dataset.

In [None]:
customer_data_updated = pd.read_csv("feature-store-data/feature_store_introduction_customer_updated.csv")

In [None]:
customer_data_updated.head(10)

Append the EventTime feature to the new data frame as well.

In [None]:
# Size and index the column from the frame being stamped. Using len(customer_data) here
# would leave NaN event times whenever the updated file has a different number of rows.
customer_data_updated["EventTime"] = pd.Series(
    [current_time_sec] * len(customer_data_updated),
    index=customer_data_updated.index,
    dtype="float64",
)

Ingest the new dataset.

In [None]:
customers_feature_group.ingest(data_frame=customer_data_updated, max_workers=3, wait=True)

Use batch_get_record again to check that all updated data has been ingested into customers_feature_group by providing customer IDs.

In [None]:
# Look up the same four customer IDs again, this time only in the customers feature group,
# to inspect the records after the second ingestion.
updated_customers_records = sagemaker_session.boto_session.client(
    "sagemaker-featurestore-runtime", region_name=region
).batch_get_record(
    Identifiers=[
        {
            "FeatureGroupName": customers_feature_group_name,
            "RecordIdentifiersValueAsString": ["573291", "109382", "828400", "124013"],
        }
    ]
)

## Feature Store API - List Feature Group 

In [None]:
from sagemaker.feature_store.feature_store import FeatureStore
from sagemaker.feature_store.inputs import Identifier
from sagemaker.feature_store.feature_group import AthenaQuery

In [None]:
feature_store = FeatureStore(
    sagemaker_session=sagemaker_session
)

In [None]:
feature_store.list_feature_groups()

In [None]:
# Same lookup as the boto3 batch_get_record call earlier, but through the SDK's FeatureStore API.
feature_store.batch_get_record(
    identifiers=[
        Identifier(
            # Use the name generated in this run; a literal name copied from an earlier
            # run stops matching as soon as the notebook is executed again.
            feature_group_name=customers_feature_group_name,
            record_identifiers_value_as_string=["573291", "109382", "828400", "124013"],
        )
    ]
)

In [None]:
# Resolve the Glue database/table of the customers group from the group's own description
# instead of hard-coding a table name that only existed for one particular run.
_customers_catalog_config = customers_feature_group.describe()["OfflineStoreConfig"][
    "DataCatalogConfig"
]
customer_query = AthenaQuery(
    sagemaker_session=sagemaker_session,
    catalog="AwsDataCatalog",
    database=_customers_catalog_config["Database"],
    table_name=_customers_catalog_config["TableName"],
)

In [None]:
# Query the table the AthenaQuery object was built for rather than repeating a literal
# table name; results are written under the notebook-athena-result/ prefix.
customer_query.run(
    f'select * from "{customer_query.table_name}"',
    output_location=f"s3://{s3_bucket_name}/notebook-athena-result/",
)

In [None]:
customer_df = customer_query.as_dataframe()

In [None]:
customer_df.head(10)

## Feature Store API - Dataset Builder - Read into DataFrame 

In [None]:
from sagemaker.feature_store.feature_group import FeatureGroup

In [None]:
# Re-attach to an existing feature group by name, using the name generated in this run
# instead of a literal copied from an earlier run.
# NOTE(review): the variable is called customer_feature_group but it has always pointed at
# the ORDERS feature group; that target is kept here. Confirm which group was intended.
customer_feature_group = FeatureGroup(
    sagemaker_session=sagemaker_session,
    name=orders_feature_group_name,
)

In [None]:
customer_feature_group.get_record(
    record_identifier_value_as_string="573291"
)

## Feature Store API - Athena Query and Join Table 

In [None]:
customer_query = customer_feature_group.athena_query()

In [None]:
customer_table = customer_query.table_name

In [None]:
# Double-quote the table name so Athena treats it as a single identifier.
query_string = f'SELECT * FROM "{customer_table}"'

In [None]:
query_string

In [None]:
customer_query.run(
    query_string=query_string,
    output_location=f"s3://{s3_bucket_name}/notebook-athena-result/"
)

In [None]:
dataset = customer_query.as_dataframe()

In [None]:
dataset.head(10)

## Clean Up 
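Uncomment the lines below to delete the feature groups. Afterwards, please also check the Glue Data Catalog for tables left over from this notebook.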

In [None]:
# customers_feature_group.delete()
# orders_feature_group.delete()