In [44]:
import time
from time import gmtime, strftime, sleep
from sklearn.compose import make_column_transformer
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
import pandas as pd

import sagemaker
from sagemaker.session import Session
from sagemaker import get_execution_role
from sagemaker.feature_store.feature_group import FeatureGroup

import boto3

region = boto3.Session().region_name

boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client(service_name="sagemaker", region_name=region)
featurestore_runtime = boto_session.client(
    service_name="sagemaker-featurestore-runtime", region_name=region
)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)

s3_client = boto3.client("s3", region_name=region)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


In [3]:
default_s3_bucket_name = feature_store_session.default_bucket()
prefix = "FinalProject-featurestore"

print(default_s3_bucket_name)

sagemaker-us-east-1-075039479415


In [4]:
role = get_execution_role()
print(role)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
arn:aws:iam::075039479415:role/LabRole


We will load the data and preprocess it and add feature engineering before creating our feature store.

In [9]:
df = pd.read_csv("data/Churn_Modelling.csv")
df.drop(columns=["Surname"], inplace=True)
df

Unnamed: 0,RowNumber,CustomerId,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited
0,1,15634602,619,France,Female,42,2,0.00,1,1,1,101348.88,1
1,2,15647311,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0
2,3,15619304,502,France,Female,42,8,159660.80,3,1,0,113931.57,1
3,4,15701354,699,France,Female,39,1,0.00,2,0,0,93826.63,0
4,5,15737888,850,Spain,Female,43,2,125510.82,1,1,1,79084.10,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,9996,15606229,771,France,Male,39,5,0.00,2,1,0,96270.64,0
9996,9997,15569892,516,France,Male,35,10,57369.61,1,1,1,101699.77,0
9997,9998,15584532,709,France,Female,36,7,0.00,1,0,1,42085.58,1
9998,9999,15682355,772,Germany,Male,42,3,75075.31,2,1,0,92888.52,1


In [10]:
gender_encoder = LabelEncoder()
gender_encoder.fit(["Female", "Male"])

df["Gender"] = gender_encoder.transform(df["Gender"])

geography_encoder = OneHotEncoder(categories=[['France', 'Spain', 'Germany']],
                                  sparse_output=False)

col_transformer = make_column_transformer((geography_encoder, ["Geography"]),
                                          remainder='passthrough')

df = pd.DataFrame(col_transformer.fit_transform(df),
                          columns=[col.replace("remainder__", "") for col in 
                                   col_transformer.get_feature_names_out()])

We noticed that make_column_transformer changed all the dtypes of the columns to string. Thus, I will change CustomerId to int.

In [36]:
df["CustomerId"] = df["CustomerId"].astype(int)

## Ingest Data into FeatureStore

#### Define FeatureGroups

In [14]:
feature_group_name = "final-project-feature-group-" + strftime("%d-%H-%M-%S", gmtime())

feature_group = FeatureGroup(
    name=feature_group_name, sagemaker_session=feature_store_session
)

We don't have booleans to worry about but we will still run the following code cell just to be safe. This is done to conform with the standards of the feature store.

In [16]:
def cast_bool_to_string(data_frame):
    for label in data_frame.columns:
        if data_frame.dtypes[label] == "bool":
            data_frame[label] = data_frame[label].astype("str").astype("string")
            
cast_bool_to_string(df)

In [20]:
current_time_sec = int(round(time.time()))

def cast_object_to_string(data_frame):
    for label in data_frame.columns:
        if data_frame.dtypes[label] == "object":
            data_frame[label] = data_frame[label].astype("str")


# cast object dtype to string. The SageMaker FeatureStore Python SDK will then map the string dtype to String feature type.
cast_object_to_string(df)

# record identifier and event time feature names
record_identifier_feature_name = "CustomerId"
event_time_feature_name = "EventTime"

# append EventTime feature
df[event_time_feature_name] = pd.Series(
    [current_time_sec] * len(df), dtype="float64"
)


# load feature definitions to the feature group. SageMaker FeatureStore Python SDK will auto-detect the data schema based on input data.
feature_group.load_feature_definitions(data_frame=df)

[FeatureDefinition(feature_name='onehotencoder__Geography_France', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>, collection_type=None),
 FeatureDefinition(feature_name='onehotencoder__Geography_Spain', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>, collection_type=None),
 FeatureDefinition(feature_name='onehotencoder__Geography_Germany', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>, collection_type=None),
 FeatureDefinition(feature_name='RowNumber', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>, collection_type=None),
 FeatureDefinition(feature_name='CustomerId', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>, collection_type=None),
 FeatureDefinition(feature_name='CreditScore', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>, collection_type=None),
 FeatureDefinition(feature_name='Gender', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>, collection_type=None),
 FeatureDefinition(feature_name='Age', feature_type=<Featur

#### Create FeatureGroups in SageMaker FeatureStore

In [26]:
def wait_for_feature_group_creation_complete(feature_group):
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group Creation")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    if status != "Created":
        raise RuntimeError(f"Failed to create feature group {feature_group.name}")
    print(f"FeatureGroup {feature_group.name} successfully created.")


feature_group.create(
    s3_uri=f"s3://{default_s3_bucket_name}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True,
)

wait_for_feature_group_creation_complete(feature_group=feature_group)

Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup final-project-feature-group-06-02-41-27 successfully created.


In [27]:
feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:075039479415:feature-group/final-project-feature-group-06-02-41-27',
 'FeatureGroupName': 'final-project-feature-group-06-02-41-27',
 'RecordIdentifierFeatureName': 'CustomerId',
 'EventTimeFeatureName': 'EventTime',
 'FeatureDefinitions': [{'FeatureName': 'onehotencoder__Geography_France',
   'FeatureType': 'Fractional'},
  {'FeatureName': 'onehotencoder__Geography_Spain',
   'FeatureType': 'Fractional'},
  {'FeatureName': 'onehotencoder__Geography_Germany',
   'FeatureType': 'Fractional'},
  {'FeatureName': 'RowNumber', 'FeatureType': 'Fractional'},
  {'FeatureName': 'CustomerId', 'FeatureType': 'Fractional'},
  {'FeatureName': 'CreditScore', 'FeatureType': 'Fractional'},
  {'FeatureName': 'Gender', 'FeatureType': 'Fractional'},
  {'FeatureName': 'Age', 'FeatureType': 'Fractional'},
  {'FeatureName': 'Tenure', 'FeatureType': 'Fractional'},
  {'FeatureName': 'Balance', 'FeatureType': 'Fractional'},
  {'FeatureName': 'NumOfProducts', 'Fe

In [28]:
sagemaker_client.list_feature_groups()

{'FeatureGroupSummaries': [{'FeatureGroupName': 'final-project-feature-group-06-02-41-27',
   'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:075039479415:feature-group/final-project-feature-group-06-02-41-27',
   'CreationTime': datetime.datetime(2024, 2, 6, 2, 53, 23, 162000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created'}],
 'ResponseMetadata': {'RequestId': '5584680d-9c2a-4a85-a2cc-d6ef775102c6',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '5584680d-9c2a-4a85-a2cc-d6ef775102c6',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '268',
   'date': 'Tue, 06 Feb 2024 02:58:22 GMT'},
  'RetryAttempts': 0}}

#### PutRecords into FeatureGroup

In [29]:
feature_group.ingest(data_frame=df, max_workers=3, wait=True)

IngestionManagerPandas(feature_group_name='final-project-feature-group-06-02-41-27', sagemaker_fs_runtime_client_config=<botocore.config.Config object at 0x7f7da78bb3d0>, sagemaker_session=<sagemaker.session.Session object at 0x7f7da920e620>, max_workers=3, max_processes=1, profile_name=None, _async_result=<multiprocess.pool.MapResult object at 0x7f7daa28eb60>, _processing_pool=<pool ProcessPool(ncpus=1)>, _failed_indices=[])

To confirm that data has been ingested, we can quickly retrieve a record from the online store:

In [41]:
record_identifier_value = str(float(15634602))

featurestore_runtime.get_record(
    FeatureGroupName=feature_group_name,
    RecordIdentifierValueAsString=record_identifier_value,
)

{'ResponseMetadata': {'RequestId': 'a1e17f20-1b14-478c-a9e0-404368f939ad',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'a1e17f20-1b14-478c-a9e0-404368f939ad',
   'content-type': 'application/json',
   'content-length': '1321',
   'date': 'Tue, 06 Feb 2024 03:13:27 GMT'},
  'RetryAttempts': 0},
 'Record': [{'FeatureName': 'onehotencoder__Geography_France',
   'ValueAsString': '1.0'},
  {'FeatureName': 'onehotencoder__Geography_Spain', 'ValueAsString': '0.0'},
  {'FeatureName': 'onehotencoder__Geography_Germany', 'ValueAsString': '0.0'},
  {'FeatureName': 'RowNumber', 'ValueAsString': '1.0'},
  {'FeatureName': 'CustomerId', 'ValueAsString': '15634602.0'},
  {'FeatureName': 'CreditScore', 'ValueAsString': '619.0'},
  {'FeatureName': 'Gender', 'ValueAsString': '0.0'},
  {'FeatureName': 'Age', 'ValueAsString': '42.0'},
  {'FeatureName': 'Tenure', 'ValueAsString': '2.0'},
  {'FeatureName': 'Balance', 'ValueAsString': '0.0'},
  {'FeatureName': 'NumOfProducts', 'ValueAsStrin

Now let's wait for the data to appear in our offline store.

In [45]:
account_id = boto3.client("sts").get_caller_identity()["Account"]
print(account_id)

feature_group_resolved_output_s3_uri = (
    feature_group.describe()
    .get("OfflineStoreConfig")
    .get("S3StorageConfig")
    .get("ResolvedOutputS3Uri")
)

feature_group_s3_prefix = feature_group_resolved_output_s3_uri.replace(
    f"s3://{default_s3_bucket_name}/", ""
)

offline_store_contents = None
while offline_store_contents is None:
    objects_in_bucket = s3_client.list_objects(
        Bucket=default_s3_bucket_name, Prefix=feature_group_s3_prefix
    )
    if "Contents" in objects_in_bucket and len(objects_in_bucket["Contents"]) > 1:
        offline_store_contents = objects_in_bucket["Contents"]
    else:
        print("Waiting for data in offline store...\n")
        sleep(60)

print("Data available.")

075039479415
Data available.


In [49]:
query = feature_group.athena_query()

table = query.table_name

query_string = """SELECT * from {}
LIMIT 100
""".format(table)

print("Running " + query_string)

# run Athena query. The output is loaded to a Pandas dataframe.

query.run(
    query_string=query_string,
    output_location="s3://" + default_s3_bucket_name + "/" + prefix + "/query_results/",
)
query.wait()
result = query.as_dataframe()

result

Running SELECT * from final_project_feature_group_06_02_41_27_1707188003
LIMIT 100



Unnamed: 0,onehotencoder__geography_france,onehotencoder__geography_spain,onehotencoder__geography_germany,rownumber,customerid,creditscore,gender,age,tenure,balance,numofproducts,hascrcard,isactivemember,estimatedsalary,exited,eventtime,write_time,api_invocation_time,is_deleted
0,1.0,0.0,0.0,6700.0,15809999.0,709.0,0.0,41.0,3.0,150300.65,2.0,1.0,0.0,71672.86,0.0,1.707188e+09,2024-02-06 03:04:59.699,2024-02-06 03:00:02.000,False
1,0.0,0.0,1.0,6705.0,15681878.0,436.0,1.0,45.0,3.0,104339.11,2.0,1.0,1.0,183540.22,1.0,1.707188e+09,2024-02-06 03:04:59.699,2024-02-06 03:00:02.000,False
2,1.0,0.0,0.0,3370.0,15643294.0,703.0,0.0,33.0,8.0,190566.65,1.0,1.0,1.0,79997.14,0.0,1.707188e+09,2024-02-06 03:04:59.699,2024-02-06 03:00:02.000,False
3,1.0,0.0,0.0,3406.0,15793693.0,694.0,1.0,60.0,9.0,0.00,1.0,1.0,1.0,57088.97,0.0,1.707188e+09,2024-02-06 03:04:59.699,2024-02-06 03:00:03.000,False
4,0.0,1.0,0.0,3416.0,15710689.0,578.0,1.0,40.0,6.0,63609.92,1.0,0.0,0.0,74965.61,1.0,1.707188e+09,2024-02-06 03:04:59.699,2024-02-06 03:00:04.000,False
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,0.0,0.0,1.0,4743.0,15762134.0,506.0,1.0,59.0,8.0,119152.10,2.0,1.0,1.0,170679.74,0.0,1.707188e+09,2024-02-06 03:04:59.699,2024-02-06 03:00:24.000,False
96,1.0,0.0,0.0,1461.0,15657085.0,578.0,1.0,23.0,10.0,88980.32,1.0,1.0,1.0,125222.36,0.0,1.707188e+09,2024-02-06 03:04:59.699,2024-02-06 03:00:24.000,False
97,0.0,1.0,0.0,8152.0,15750970.0,500.0,1.0,40.0,1.0,99004.24,1.0,1.0,1.0,152845.99,0.0,1.707188e+09,2024-02-06 03:04:59.699,2024-02-06 03:00:25.000,False
98,0.0,1.0,0.0,4793.0,15809991.0,756.0,1.0,19.0,4.0,130274.22,1.0,1.0,1.0,133535.29,0.0,1.707188e+09,2024-02-06 03:04:59.699,2024-02-06 03:00:25.000,False
