In [12]:
import sagemaker
import pandas as pd
import io
from time import gmtime, strftime, sleep
import datetime
import random
import time
import boto3
from sagemaker.feature_store.feature_group import FeatureGroup

role = sagemaker.get_execution_role()
sess = sagemaker.Session()
region = sess.boto_region_name
bucket = sess.default_bucket()

account_id = role.split(':')[4]

def mask_account(text):
    return str(text).replace(account_id, 'XXXXXXXXXXXX')

print(f"Region: {region}")
print(f"Bucket: {mask_account(bucket)}")
print(f"Role: {mask_account(role)}")

Region: us-east-1
Bucket: sagemaker-us-east-1-XXXXXXXXXXXX
Role: arn:aws:iam::XXXXXXXXXXXX:role/service-role/AmazonSageMaker-ExecutionRole-20250923T104318


In [13]:
data_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/auto-mpg/auto-mpg.data'
col_names = ['mpg','cylinders','displacement','horsepower','weight','acceleration','model_year','origin','car_name']

df = pd.read_csv(data_url, sep=r'\s+', header=None, names=col_names, na_values='?')

df["car_name"] = df["car_name"].astype('string')

current_time = time.time()
df["event_time"] = current_time + df.index

df["metadata_0"] = pd.Series(["empty"] * len(df), dtype="string")
df["metadata_1"] = pd.Series(["empty"] * len(df), dtype="string")
df["metadata_2"] = pd.Series(["empty"] * len(df), dtype="string")
df["metadata_3"] = pd.Series(["empty"] * len(df), dtype="string")

print(f"Total records: {len(df)}")
print(f"Unique car_name: {df['car_name'].nunique()}")

df.head()

Total records: 398
Unique car_name: 305


Unnamed: 0,mpg,cylinders,displacement,horsepower,weight,acceleration,model_year,origin,car_name,event_time,metadata_0,metadata_1,metadata_2,metadata_3
0,18.0,8,307.0,130.0,3504.0,12.0,70,1,chevrolet chevelle malibu,1761717000.0,empty,empty,empty,empty
1,15.0,8,350.0,165.0,3693.0,11.5,70,1,buick skylark 320,1761717000.0,empty,empty,empty,empty
2,18.0,8,318.0,150.0,3436.0,11.0,70,1,plymouth satellite,1761717000.0,empty,empty,empty,empty
3,16.0,8,304.0,150.0,3433.0,12.0,70,1,amc rebel sst,1761717000.0,empty,empty,empty,empty
4,17.0,8,302.0,140.0,3449.0,10.5,70,1,ford torino,1761717000.0,empty,empty,empty,empty


In [14]:
def check_feature_group_status(feature_group):
    status = feature_group.describe().get("FeatureGroupStatus")
    print(f'Current status: {status}')
    while status == "Creating":
        print("Waiting for Feature Group to be Created")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    print(f"FeatureGroup {feature_group.name} successfully created.")

feature_group_name = f'poc-{int(time.time())}'
print(f'Feature group name: {feature_group_name}')

feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=sess)
feature_group.load_feature_definitions(data_frame=df)
feature_group.create(
    s3_uri=f's3://{bucket}/{feature_group_name}',
    enable_online_store=True,
    record_identifier_name='car_name',
    event_time_feature_name='event_time',
    description='This feature group tracks the vehicle information such as mpg, and horsepower between 1970 and 1982.',
    role_arn=role
)

check_feature_group_status(feature_group)

fg_details = feature_group.describe()
s3_location = fg_details['OfflineStoreConfig']['S3StorageConfig']['ResolvedOutputS3Uri']
print(f"S3 Location: {mask_account(s3_location)}")

Feature group name: poc-1761716939
Current status: Creating
Waiting for Feature Group to be Created
Waiting for Feature Group to be Created
Waiting for Feature Group to be Created
Waiting for Feature Group to be Created
FeatureGroup poc-1761716939 successfully created.
S3 Location: s3://sagemaker-us-east-1-XXXXXXXXXXXX/poc-1761716939/XXXXXXXXXXXX/sagemaker/us-east-1/offline-store/poc-1761716939-1761716939/data


In [15]:
print("Ingesting data...")
ingestion_start = time.time()

feature_group.ingest(
    data_frame=df,
    max_workers=3,
    wait=True
)

ingestion_duration = time.time() - ingestion_start
print(f"Ingestion completed in {ingestion_duration:.2f} seconds")

Ingesting data...
Ingestion completed in 2.47 seconds


In [16]:
s3_client = boto3.client('s3', region_name=region)
bucket_name = s3_location.split('/')[2]
prefix = '/'.join(s3_location.split('/')[3:])

print("Waiting for S3 data...")
print(f"Bucket: {mask_account(bucket_name)}")
print(f"Prefix: {mask_account(prefix)}")

max_wait = 900
check_interval = 30
elapsed = 0
parquet_files = []

while elapsed < max_wait:
    paginator = s3_client.get_paginator('list_objects_v2')
    all_objects = []
    
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        if 'Contents' in page:
            all_objects.extend(page['Contents'])
    
    parquet_files = [obj for obj in all_objects if obj['Key'].endswith('.parquet')]
    
    print(f"[{elapsed}s] Parquet files: {len(parquet_files)}")
    
    if len(parquet_files) > 0:
        total_size = sum(obj['Size'] for obj in parquet_files)
        print(f"Data written to S3: {len(parquet_files)} files, {total_size / (1024*1024):.2f} MB")
        break
    
    time.sleep(check_interval)
    elapsed += check_interval

if len(parquet_files) == 0:
    print(f"No parquet files after {max_wait}s")

Waiting for S3 data...
Bucket: sagemaker-us-east-1-XXXXXXXXXXXX
Prefix: poc-1761716939/XXXXXXXXXXXX/sagemaker/us-east-1/offline-store/poc-1761716939-1761716939/data
[0s] Parquet files: 0
[30s] Parquet files: 0
[60s] Parquet files: 0
[90s] Parquet files: 0
[120s] Parquet files: 0
[150s] Parquet files: 0
[180s] Parquet files: 0
[210s] Parquet files: 0
[240s] Parquet files: 0
[270s] Parquet files: 0
[300s] Parquet files: 0
[330s] Parquet files: 0
[360s] Parquet files: 53
Data written to S3: 53 files, 0.22 MB


In [17]:
featurestore_runtime = sess.boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

print("Checking online store for all unique cars...")

unique_cars = df['car_name'].unique()
online_found = 0
online_not_found = 0
online_not_found_list = []

for record in unique_cars:
    try:
        response = featurestore_runtime.get_record(
            FeatureGroupName=feature_group_name,
            RecordIdentifierValueAsString=record
        )
        online_found += 1
    except Exception as e:
        online_not_found += 1
        online_not_found_list.append(record)

print(f"Online store: {online_found}/{len(unique_cars)} found")

if online_not_found > 0:
    print(f"\nNot found in online store:")
    for car in online_not_found_list:
        print(f"  - {car}")

Checking online store for all unique cars...
Online store: 305/305 found


In [18]:
print("Querying all records from offline store...")

query = feature_group.athena_query()
full_query = f'SELECT * FROM "{query.table_name}"'

try:
    query.run(
        query_string=full_query,
        output_location=f's3://{bucket}/queries/{feature_group_name}/full/'
    )
    query.wait()
    athena_df = query.as_dataframe()
    
    print(f"Total records in offline store: {len(athena_df)}")
    print(f"Unique cars in offline store: {athena_df['car_name'].nunique()}")
    print(f"Expected: {len(df)} records, {df['car_name'].nunique()} unique cars")
    
    original_cars = set(df['car_name'].unique())
    athena_cars = set(athena_df['car_name'].unique())
    
    missing = original_cars - athena_cars
    
    if len(missing) == 0:
        print("\nAll cars present in offline store")
    else:
        print(f"\nMissing {len(missing)} cars in offline store:")
        for car in missing:
            print(f"  - {car}")
            original_record = df[df['car_name'] == car][['mpg', 'cylinders', 'model_year']].iloc[0]
            print(f"    Original: mpg={original_record['mpg']}, cyl={original_record['cylinders']}, year={original_record['model_year']}")
    
    print("\nOriginal dataframe:")
    display(df)
    
    print("\nOffline store dataframe:")
    display(athena_df)
    
except Exception as e:
    print(f"Query failed: {e}")

Querying all records from offline store...
Total records in offline store: 398
Unique cars in offline store: 305
Expected: 398 records, 305 unique cars

All cars present in offline store

Original dataframe:


Unnamed: 0,mpg,cylinders,displacement,horsepower,weight,acceleration,model_year,origin,car_name,event_time,metadata_0,metadata_1,metadata_2,metadata_3
0,18.0,8,307.0,130.0,3504.0,12.0,70,1,chevrolet chevelle malibu,1.761717e+09,empty,empty,empty,empty
1,15.0,8,350.0,165.0,3693.0,11.5,70,1,buick skylark 320,1.761717e+09,empty,empty,empty,empty
2,18.0,8,318.0,150.0,3436.0,11.0,70,1,plymouth satellite,1.761717e+09,empty,empty,empty,empty
3,16.0,8,304.0,150.0,3433.0,12.0,70,1,amc rebel sst,1.761717e+09,empty,empty,empty,empty
4,17.0,8,302.0,140.0,3449.0,10.5,70,1,ford torino,1.761717e+09,empty,empty,empty,empty
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
393,27.0,4,140.0,86.0,2790.0,15.6,82,1,ford mustang gl,1.761717e+09,empty,empty,empty,empty
394,44.0,4,97.0,52.0,2130.0,24.6,82,2,vw pickup,1.761717e+09,empty,empty,empty,empty
395,32.0,4,135.0,84.0,2295.0,11.6,82,1,dodge rampage,1.761717e+09,empty,empty,empty,empty
396,28.0,4,120.0,79.0,2625.0,18.6,82,1,ford ranger,1.761717e+09,empty,empty,empty,empty



Offline store dataframe:


Unnamed: 0,mpg,cylinders,displacement,horsepower,weight,acceleration,model_year,origin,car_name,event_time,metadata_0,metadata_1,metadata_2,metadata_3,write_time,api_invocation_time,is_deleted
0,15.0,8,350.0,145.0,4440.0,14.0,75,1,chevrolet bel air,1.761717e+09,empty,empty,empty,empty,2025-10-29 05:55:23.992,2025-10-29 05:49:25.000,False
1,12.0,8,429.0,198.0,4952.0,11.5,73,1,mercury marquis brougham,1.761717e+09,empty,empty,empty,empty,2025-10-29 05:55:24.003,2025-10-29 05:49:26.000,False
2,22.0,6,225.0,100.0,3233.0,15.4,76,1,plymouth valiant,1.761717e+09,empty,empty,empty,empty,2025-10-29 05:55:24.044,2025-10-29 05:49:26.000,False
3,15.0,8,390.0,190.0,3850.0,8.5,70,1,amc ambassador dpl,1.761717e+09,empty,empty,empty,empty,2025-10-29 05:55:24.089,2025-10-29 05:49:25.000,False
4,13.0,8,302.0,129.0,3169.0,12.0,75,1,ford mustang ii,1.761717e+09,empty,empty,empty,empty,2025-10-29 05:55:28.349,2025-10-29 05:49:25.000,False
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
393,26.4,4,140.0,88.0,2870.0,18.1,80,1,ford fairmont,1.761717e+09,empty,empty,empty,empty,2025-10-29 05:55:23.992,2025-10-29 05:49:26.000,False
394,11.0,8,429.0,208.0,4633.0,11.0,72,1,mercury marquis,1.761717e+09,empty,empty,empty,empty,2025-10-29 05:55:23.992,2025-10-29 05:49:26.000,False
395,25.8,4,156.0,92.0,2620.0,14.4,81,1,dodge aries wagon (sw),1.761717e+09,empty,empty,empty,empty,2025-10-29 05:55:23.992,2025-10-29 05:49:26.000,False
396,15.0,8,302.0,130.0,4295.0,14.9,77,1,mercury cougar brougham,1.761717e+09,empty,empty,empty,empty,2025-10-29 05:55:23.992,2025-10-29 05:49:26.000,False


In [19]:
def find_record(record_id):
    print(f"\nQuerying: {record_id}")
    
    online_record = featurestore_runtime.get_record(FeatureGroupName=feature_group_name, RecordIdentifierValueAsString=record_id)
    print(f"Online record: {online_record}")

    query = feature_group.athena_query()
    table_name = query.table_name
    print(f"Table: {table_name}")

    sql_query = f"""
    SELECT *
    FROM "{table_name}"
    where car_name = '{record_id}'
    """
    query.run(query_string=sql_query, output_location=f's3://{bucket}/queries/{feature_group_name}/query_results/')
    query.wait()
    offline_record = query.as_dataframe()
    print(f'Offline record: {offline_record}')

find_record('amc ambassador dpl')
find_record('amc concord')


Querying: amc ambassador dpl
Online record: {'ResponseMetadata': {'RequestId': '764d0654-0ed7-4097-83e0-8adb08624c98', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '764d0654-0ed7-4097-83e0-8adb08624c98', 'content-type': 'application/json', 'content-length': '1120', 'date': 'Wed, 29 Oct 2025 05:55:56 GMT'}, 'RetryAttempts': 0}, 'Record': [{'FeatureName': 'mpg', 'ValueAsString': '15.0'}, {'FeatureName': 'cylinders', 'ValueAsString': '8'}, {'FeatureName': 'displacement', 'ValueAsString': '390.0'}, {'FeatureName': 'horsepower', 'ValueAsString': '190.0'}, {'FeatureName': 'weight', 'ValueAsString': '3850.0'}, {'FeatureName': 'acceleration', 'ValueAsString': '8.5'}, {'FeatureName': 'model_year', 'ValueAsString': '70'}, {'FeatureName': 'origin', 'ValueAsString': '1'}, {'FeatureName': 'car_name', 'ValueAsString': 'amc ambassador dpl'}, {'FeatureName': 'event_time', 'ValueAsString': '1761716946.4774606'}, {'FeatureName': 'metadata_0', 'ValueAsString': 'empty'}, {'FeatureName': 'me

In [20]:
print("\nSummary")
print("="*80)
print(f"Feature Group: {feature_group_name}")
print(f"Ingestion: {len(df)} records in {ingestion_duration:.2f}s")
print(f"S3 files: {len(parquet_files)}")
print(f"Online store: {online_found}/{len(unique_cars)} found")
print(f"Offline store: {len(athena_df)} records, {athena_df['car_name'].nunique()} unique cars")


Summary
Feature Group: poc-1761716939
Ingestion: 398 records in 2.47s
S3 files: 53
Online store: 305/305 found
Offline store: 398 records, 305 unique cars
