In [1]:
import pandas as pd
import boto3
from pyathena import connect
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup, FeatureDefinition, FeatureTypeEnum
from sagemaker.session import Session
import time

# S3 and Athena details
bucket_name = "group3-project-bucket"
database_name = "group_project_db"
table_name = "hospital_readmissions"
s3_output = f"s3://{bucket_name}/athena-results/"
region = "us-east-1"
s3_client = boto3.client("s3", region_name=region)

sagemaker_session = Session()
sagemaker_client = boto3.client("sagemaker")


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [2]:
# Connect to Athena
connection = connect(s3_staging_dir=s3_output, region_name=region)

# Query the data
query = f"""
SELECT * 
FROM {database_name}.{table_name}
"""
df = pd.read_sql(query, connection)

display(df.head())

  df = pd.read_sql(query, connection)


Unnamed: 0,age,time_in_hospital,n_lab_procedures,n_procedures,n_medications,n_outpatient,n_inpatient,n_emergency,medical_specialty,diag_1,diag_2,diag_3,glucose_test,a1ctest,change,diabetes_med,readmitted
0,3,8,72,1,18,2,0,0,4,0,7,6,1,1,0,1,0
1,3,3,34,2,13,0,0,0,5,6,6,6,1,1,0,1,0
2,1,5,45,0,18,0,0,0,4,0,0,0,1,1,1,1,1
3,3,2,36,0,12,1,0,0,4,0,6,1,1,1,1,1,1
4,2,1,42,0,7,0,0,0,3,6,0,7,1,1,0,1,0


In [3]:
from sagemaker import get_execution_role

role = get_execution_role()
print(role)

feature_group_name = "hospital_readmissions_features_" + str(int(time.time()))

# **Map Athena Data Types to SageMaker Feature Store Types**
pandas_to_sagemaker_types = {
    "object": FeatureTypeEnum.STRING,   # Athena STRING
    "int64": FeatureTypeEnum.INTEGRAL,  # Athena BIGINT
    "float64": FeatureTypeEnum.FRACTIONAL,  # Athena DOUBLE
    "datetime64[ns]": FeatureTypeEnum.STRING,  # Athena TIMESTAMP -> Convert to STRING
    "bool": FeatureTypeEnum.STRING,  # Athena BOOLEAN -> Convert to STRING
}

# **Ensure event_time column exists (Required for Feature Store)**
import datetime
df["event_time"] = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")

# **Define feature group schema using correct data types**
feature_definitions = [
    FeatureDefinition(
        feature_name=col,
        feature_type=pandas_to_sagemaker_types.get(str(df[col].dtype), FeatureTypeEnum.STRING),
    )
    for col in df.columns
]

# **Ensure feature definitions are not empty**
if len(feature_definitions) == 0:
    raise ValueError("Feature Definitions are empty! Check column mapping.")

# **Create Feature Group with feature definitions (CORRECTED)**
feature_group = FeatureGroup(
    name=feature_group_name,
    sagemaker_session=sagemaker_session,
    feature_definitions=feature_definitions,  # Pass the feature definitions here!
)

# **Create Feature Group in SageMaker Feature Store**
print("Feature Group does not exist. Creating...")

feature_group.create(
    record_identifier_name="age",  # Ensure this column is unique (change if necessary)
    event_time_feature_name="event_time",  # Required for time tracking in Feature Store
    s3_uri=f"s3://{bucket_name}/feature-store/",
    role_arn=role,
    enable_online_store=True,
)

print("Feature Group creation initiated. Waiting for it to become available...")

# **Wait for Feature Group to be available**
while True:
    status = sagemaker_client.describe_feature_group(FeatureGroupName=feature_group_name)[
        "FeatureGroupStatus"
    ]
    print(f"Feature Group Status: {status}")
    if status == "Created":
        break
    time.sleep(5)

# **Load Feature Definitions into Feature Store**
feature_group.load_feature_definitions(data_frame=df)

# **Ingest Data into Feature Store**
feature_group.ingest(data_frame=df, max_workers=3, wait=True)

print(f"Feature Group {feature_group_name} data ingested successfully.")

arn:aws:iam::321261761338:role/LabRole
Feature Group does not exist. Creating...
Feature Group creation initiated. Waiting for it to become available...
Feature Group Status: Creating
Feature Group Status: Creating
Feature Group Status: Creating
Feature Group Status: Creating
Feature Group Status: Created
Feature Group hospital_readmissions_features_1740867247 data ingested successfully.


In [4]:
# Test query the feature group (offline)
feature_group_name = "hospital_readmissions_features"
featurestore_runtime = boto3.client("sagemaker-featurestore-runtime")

response = featurestore_runtime.get_record(
    FeatureGroupName=feature_group_name,
    RecordIdentifierValueAsString="3"  # Change to an actual stored value
)
feature_data = {item["FeatureName"]: item["ValueAsString"] for item in response["Record"]}
df_online = pd.DataFrame([feature_data])
display(df_online)

Unnamed: 0,age,time_in_hospital,n_lab_procedures,n_procedures,n_medications,n_outpatient,n_inpatient,n_emergency,medical_specialty,diag_1,diag_2,diag_3,glucose_test,a1ctest,change,diabetes_med,readmitted,event_time
0,3,2,61,3,15,0,0,0,2,7,1,6,1,1,1,1,0,2025-02-23T23:49:39Z


In [5]:
print(feature_group_name)

hospital_readmissions_features


In [6]:
import boto3

glue_client = boto3.client("glue")

# List databases in AWS Glue
response = glue_client.get_databases()
print("\nAvailable Databases in Glue:")
for db in response["DatabaseList"]:
    print(f"- {db['Name']}")

# List tables in the `sagemaker_featurestore` database (if it exists)
database_name = "sagemaker_featurestore"

try:
    response = glue_client.get_tables(DatabaseName=database_name)
    print(f"\nTables in `{database_name}` database:")
    for table in response["TableList"]:
        print(f"- {table['Name']}")
except glue_client.exceptions.EntityNotFoundException:
    print(f"\nDatabase `{database_name}` not found in Glue.")


Available Databases in Glue:
- default
- group_project_db
- sagemaker_featurestore

Tables in `sagemaker_featurestore` database:
- hospital_readmissions_features_1740354579
- hospital_readmissions_features_1740867183_1740867183
- hospital_readmissions_features_1740867247_1740867247


In [7]:
# Test offline queries
from pyathena import connect
import pandas as pd

# Query the feature store in Athena

latest_table = response["TableList"][-1]
query = f"""
SELECT * 
FROM "sagemaker_featurestore"."{latest_table["Name"]}"
"""

# Connect to Athena
connection = connect(
    s3_staging_dir=f"s3://{bucket_name}/athena-results/",
    region_name="us-east-1"
)

# Retrieve all feature data
df_all_features = pd.read_sql(query, connection)
display(df_all_features)

  df_all_features = pd.read_sql(query, connection)


Unnamed: 0,age,time_in_hospital,n_lab_procedures,n_procedures,n_medications,n_outpatient,n_inpatient,n_emergency,medical_specialty,diag_1,...,diag_3,glucose_test,a1ctest,change,diabetes_med,readmitted,event_time,write_time,api_invocation_time,is_deleted
0,3,4,54,3,15,0,0,0,4,0,...,0,1,1,1,1,0,2025-03-01T22:14:07Z,2025-03-01 22:16:06.820,2025-03-01 22:15:18,False
1,2,1,46,0,8,0,0,0,1,7,...,6,1,1,0,1,0,2025-03-01T22:14:07Z,2025-03-01 22:15:26.549,2025-03-01 22:14:30,False
2,3,4,68,0,23,0,1,1,4,7,...,6,1,1,0,1,0,2025-03-01T22:14:07Z,2025-03-01 22:16:06.820,2025-03-01 22:15:18,False
3,3,12,51,5,22,0,1,0,4,0,...,0,1,1,0,0,1,2025-03-01T22:14:07Z,2025-03-01 22:16:06.820,2025-03-01 22:15:18,False
4,2,6,34,4,16,0,2,0,0,0,...,7,1,1,0,1,0,2025-03-01T22:14:07Z,2025-03-01 22:15:26.549,2025-03-01 22:14:30,False
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
16454,1,6,50,6,14,0,0,0,4,0,...,1,1,0,0,0,1,2025-03-01T22:14:07Z,2025-03-01 22:15:44.707,2025-03-01 22:15:43,False
16455,1,2,1,4,14,0,0,0,2,6,...,6,1,1,1,1,0,2025-03-01T22:14:07Z,2025-03-01 22:15:44.707,2025-03-01 22:15:43,False
16456,1,8,86,4,28,1,5,3,4,7,...,0,1,1,1,1,1,2025-03-01T22:14:07Z,2025-03-01 22:15:44.707,2025-03-01 22:15:43,False
16457,1,2,46,3,11,0,0,0,0,0,...,1,1,0,0,1,0,2025-03-01T22:14:07Z,2025-03-01 22:15:44.707,2025-03-01 22:15:43,False
