In [2]:
import boto3
import pandas as pd
from io import StringIO
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.session import Session
import time

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


In [3]:
# --- SageMaker context -------------------------------------------------
# The session supplies the AWS region; the execution role is passed to
# feature-group creation further down.
session = sagemaker.Session()
region = session.boto_region_name
role = sagemaker.get_execution_role()

# --- Feature Store -----------------------------------------------------
feature_group_name = "flight_data_feature_store"

# Runtime client used for record-level writes (put_record).
featurestore_runtime = boto3.client(
    service_name="sagemaker-featurestore-runtime",
    region_name=region,
)

# --- S3 inputs ---------------------------------------------------------
# Bucket holding the source CSVs, and the object keys to ingest.
bucket_name = "melissafinalbucket"
file_keys = [
    "prod_csv/prod_flight_data.csv",
    "test_csv/test_flight_data.csv",
    "training_csv/train_flight_data.csv",
    "valid_csv/valid_flight_sales.csv",
]

In [4]:
from sagemaker.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum

# Feature-group schema, in column order:
#   SampleID (string), TimeStep (fractional),
#   Feature_0 .. Feature_19 (fractional), Label (integral).
feature_definitions = [
    FeatureDefinition(feature_name="SampleID", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="TimeStep", feature_type=FeatureTypeEnum.FRACTIONAL),
    *(
        FeatureDefinition(feature_name=f"Feature_{idx}", feature_type=FeatureTypeEnum.FRACTIONAL)
        for idx in range(20)
    ),
    FeatureDefinition(feature_name="Label", feature_type=FeatureTypeEnum.INTEGRAL),
]

In [5]:
# S3 prefix that backs the feature group's offline store.
offline_store_s3_uri = f"s3://{bucket_name}/feature-store/"

# Local handle for the feature group, carrying the schema defined above.
feature_group = FeatureGroup(
    name=feature_group_name,
    feature_definitions=feature_definitions,
    sagemaker_session=session,
)

# Create the feature group only when it is genuinely missing. Any other
# describe() failure (credentials, throttling, network, ...) propagates
# instead of being mistaken for "does not exist".
try:
    feature_group.describe()
    print("Feature Group already exists.")
except session.sagemaker_client.exceptions.ResourceNotFound:
    feature_group.create(
        record_identifier_name="SampleID",
        event_time_feature_name="TimeStep",
        role_arn=role,
        s3_uri=offline_store_s3_uri,  # offline store location
        enable_online_store=True,  # enables real-time reads/writes
    )

    # create() returns before the group is usable; poll until it leaves
    # the "Creating" state so the ingestion cell does not hit a group
    # that is not ready yet.
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    if status != "Created":
        raise RuntimeError(
            f"Feature group {feature_group_name} ended in status {status!r}"
        )
    print("Feature Group Created!")

Feature Group already exists.


In [None]:
# Read and ingest each file into the Feature Store
s3 = boto3.client('s3')

# Rows per DataFrame chunk. Note the whole S3 object is still read into
# memory first; chunking only bounds the size of each parsed DataFrame.
chunk_size = 1000  # Adjust based on your system

for file_key in file_keys:
    response = s3.get_object(Bucket=bucket_name, Key=file_key)
    csv_content = response['Body'].read().decode('utf-8')

    for chunk in pd.read_csv(StringIO(csv_content), chunksize=chunk_size):
        chunk["TimeStep"] = chunk["TimeStep"].astype(str)

        # Coerce 'Label' to integer; unparseable or missing labels become 0.
        chunk["Label"] = pd.to_numeric(chunk["Label"], errors='coerce').fillna(0).astype(int)

        for record in chunk.to_dict(orient="records"):
            featurestore_runtime.put_record(
                FeatureGroupName=feature_group_name,
                Record=[
                    {"FeatureName": key, "ValueAsString": str(value)}
                    for key, value in record.items()
                    # Leave missing values out of the record instead of
                    # sending the literal string "nan" for them.
                    if pd.notna(value)
                ],
            )

    print(f"Ingested data from {file_key}")

In [None]:
# Fixed five-minute pause before inspecting the feature group.
# NOTE(review): presumably this gives the ingested records time to be
# processed before the query cell runs; nothing is polled here, so
# confirm that the delay is sufficient.
print("Waiting for feature group to be ready...")
time.sleep(5 * 60)

# Show the feature group's current description as the cell output.
feature_group.describe()

In [None]:
# Athena query handle for the feature group's offline store.
query = feature_group.athena_query()
query_output_location = f"s3://{bucket_name}/query_results/"

# Use the table name exposed by the query handle: the offline-store table
# is registered under a generated name, not the bare feature group name.
query.run(
    query_string=f'SELECT * FROM "{query.table_name}" LIMIT 10;',
    output_location=query_output_location,
)
query.wait()

# Load the query results; the bare last expression gives rich notebook
# display of the first rows.
df_results = query.as_dataframe()
df_results.head()