## 6. Create Feature Store

Code learned and followed from AAI 540 Lab 3

In [2]:
!pip install --upgrade pip

Collecting pip
  Using cached pip-24.2-py3-none-any.whl.metadata (3.6 kB)
Using cached pip-24.2-py3-none-any.whl (1.8 MB)
Installing collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 24.0
    Uninstalling pip-24.0:
      Successfully uninstalled pip-24.0
Successfully installed pip-24.2
[0m

In [3]:
!pip install -q PyAthena

[0m

In [4]:
import boto3
import sagemaker

original_boto3_version = boto3.__version__
%pip install 'boto3>1.17.21'

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
[0mNote: you may need to restart the kernel to use updated packages.


In [38]:
from sagemaker.session import Session

region = boto3.Session().region_name

boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client(service_name="sagemaker", region_name=region)
featurestore_runtime = boto_session.client(
    service_name="sagemaker-featurestore-runtime", region_name=region
)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)

In [39]:
# Declare bucket name and project-specific prefix
default_s3_bucket_name = feature_store_session.default_bucket()
prefix = "sagemaker-featurestore-predictive-maintenance"

# Print the default bucket name and prefix
print(f"Default S3 Bucket: {default_s3_bucket_name}")
print(f"Prefix: {prefix}")

Default S3 Bucket: sagemaker-us-east-1-807494057176
Prefix: sagemaker-featurestore-predictive-maintenance


In [40]:
from sagemaker import get_execution_role

# call for the role information.
role = get_execution_role()
print(role)

arn:aws:iam::807494057176:role/LabRole


In [41]:
from pyathena import connect

In [42]:
import pandas as pd
sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

# Print some information
print(f"Connected to S3 bucket: {bucket}")
print(f"Using execution role: {role}")
print(f"Using region: {region}")

Connected to S3 bucket: sagemaker-us-east-1-807494057176
Using execution role: arn:aws:iam::807494057176:role/LabRole
Using region: us-east-1


In [52]:
# Set S3 path to Parquet data
s3_path_parquet = "s3://{}/AAI-540_Predictive-Maintenance-for-Pharmaceutical-Manufacturing-Equipment/data/parquet".format(bucket)

# Set Athena parameters
database_name = "dsoawsfp"
table_name_csv = "predictive_maintenance_csv"
table_name_parquet = "predictive_maintenance_parquet"

In [53]:
# Set S3 staging directory -- this is a temporary directory used for Athena queries
s3_staging_dir = "s3://{0}/athena/staging".format(bucket)

In [54]:
# Check the default bucket
print("Default S3 Bucket: ", bucket)

# Print the S3 staging directory path
print("S3 Staging Directory: ", s3_staging_dir)

Default S3 Bucket:  sagemaker-us-east-1-807494057176
S3 Staging Directory:  s3://sagemaker-us-east-1-807494057176/athena/staging


In [55]:
conn = connect(region_name=region, s3_staging_dir=s3_staging_dir)

In [56]:
# Define the SQL statement with casting to timestamp
statement = """SELECT * FROM {}.{}
    WHERE date >= CAST('2015-01-01' AS timestamp)
    AND date <= CAST('2015-11-02' AS timestamp)
    LIMIT 100""".format(database_name, table_name_parquet)

# Execute the query using pandas
df_result = pd.read_sql(statement, conn)

# Display the first few rows of the result
print(df_result.head())


  df_result = pd.read_sql(statement, conn)


        date    device  failure      metric1  metric2  metric3  metric4  \
0 2015-01-01  S1F01085        0  215630672.0     55.0      0.0     52.0   
1 2015-01-01  S1F0166B        0   61370680.0      0.0      3.0      0.0   
2 2015-01-01  S1F01E6Y        0  173295968.0      0.0      0.0      0.0   
3 2015-01-01  S1F01JE0        0   79694024.0      0.0      0.0      0.0   
4 2015-01-01  S1F01R2B        0  135970480.0      0.0      0.0      0.0   

   metric5   metric6  metric7  metric8  metric9  
0      6.0  407438.0      0.0      0.0      7.0  
1      6.0  403174.0      0.0      0.0      0.0  
2     12.0  237394.0      0.0      0.0      0.0  
3      6.0  410186.0      0.0      0.0      0.0  
4     15.0  313173.0      0.0      0.0      3.0  


In [57]:
# Execute the query using pandas and Athena connection
predictive_maintenance_df = pd.read_sql(statement, conn)

# Display the first few rows of the result
print(predictive_maintenance_df.head())

  predictive_maintenance_df = pd.read_sql(statement, conn)


        date    device  failure      metric1  metric2  metric3  metric4  \
0 2015-01-01  S1F01085        0  215630672.0     55.0      0.0     52.0   
1 2015-01-01  S1F0166B        0   61370680.0      0.0      3.0      0.0   
2 2015-01-01  S1F01E6Y        0  173295968.0      0.0      0.0      0.0   
3 2015-01-01  S1F01JE0        0   79694024.0      0.0      0.0      0.0   
4 2015-01-01  S1F01R2B        0  135970480.0      0.0      0.0      0.0   

   metric5   metric6  metric7  metric8  metric9  
0      6.0  407438.0      0.0      0.0      7.0  
1      6.0  403174.0      0.0      0.0      0.0  
2     12.0  237394.0      0.0      0.0      0.0  
3      6.0  410186.0      0.0      0.0      0.0  
4     15.0  313173.0      0.0      0.0      3.0  


In [58]:
# # Convert the 'date' column to string
# predictive_maintenance_df['date'] = predictive_maintenance_df['date'].astype(str)

# # Display the dataframe
# predictive_maintenance_df

### Define FeatureGroups

In [59]:
from time import gmtime, strftime, sleep

# Define the name for your predictive maintenance feature group
predictive_maintenance_feature_group_name = "predictive-maintenance-feature-group-" + strftime("%d-%H-%M-%S", gmtime())

# Print the feature group name to confirm
print(predictive_maintenance_feature_group_name)

predictive-maintenance-feature-group-16-16-39-45


In [60]:
from sagemaker.feature_store.feature_group import FeatureGroup

# Create the FeatureGroup object with specific name
predictive_maintenance_feature_group = FeatureGroup(
    name=predictive_maintenance_feature_group_name,  # feature group name
    sagemaker_session=feature_store_session  # session set up before
)

# Print to verify the setup
print("Feature Group object created:", predictive_maintenance_feature_group)


Feature Group object created: FeatureGroup(name='predictive-maintenance-feature-group-16-16-39-45', sagemaker_session=<sagemaker.session.Session object at 0x7fc8896a7d30>, feature_definitions=[])


In [61]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
import time

def clean_feature_eng(df):
    # Convert 'date' to datetime
    df['date'] = pd.to_datetime(df['date'])
    
    # Extract time-based features
    df['year'] = df['date'].dt.year
    df['month'] = df['date'].dt.month
    df['day'] = df['date'].dt.day
    df['dayofweek'] = df['date'].dt.dayofweek
    
    # Handle highly skewed metrics using log transformation
    skewed_metrics = ['metric2', 'metric3', 'metric4', 'metric7', 'metric8', 'metric9']
    for metric in skewed_metrics:
        df[f'{metric}_log'] = np.log1p(df[metric])
    
    # Create simple aggregated features per device
    device_aggs = df.groupby('device').agg({
        'metric1': ['mean'],
        'metric5': ['mean'],
        'metric6': ['mean']
    })
    device_aggs.columns = ['_'.join(col).strip() for col in device_aggs.columns.values]
    
    # Merge aggregated features back to the main dataframe
    df = df.merge(device_aggs, on='device', how='left')
    
    # Encode categorical variables (device in this case)
    df['device'] = pd.factorize(df['device'])[0]
    
    # Normalize numeric columns using StandardScaler
    scaler = StandardScaler()
    numeric_cols = df.select_dtypes(include=['float64', 'int64']).columns
    df[numeric_cols] = scaler.fit_transform(df[numeric_cols])
    
    # Drop the original 'date' column
    df = df.drop('date', axis=1)
    
    # Add EventTime and RecordId for feature store ingestion
    current_time_sec = int(round(time.time()))
    df['EventTime'] = pd.Series([current_time_sec] * len(df), dtype='float64')
    df['RecordId'] = df.index.astype(str)
    
    return df

# Example of using the function
# df_result is your loaded dataset
cleaned_df = clean_feature_eng(df_result)

# Now `cleaned_df` can be ingested into SageMaker Feature Store
print(cleaned_df.head())

     device  failure   metric1   metric2   metric3   metric4   metric5  \
0 -1.714816      0.0  1.417342 -0.246443 -0.110115  0.171636 -0.332573   
1 -1.680173      0.0 -0.763582 -0.268246 -0.108906 -0.219472 -0.332573   
2 -1.645531      0.0  0.818815 -0.268246 -0.110115 -0.219472 -0.015333   
3 -1.610888      0.0 -0.504527 -0.268246 -0.110115 -0.219472 -0.332573   
4 -1.576245      0.0  0.291108 -0.268246 -0.110115 -0.219472  0.143287   

    metric6   metric7   metric8  ...  metric3_log  metric4_log  metric7_log  \
0  1.259997 -0.132039 -0.132039  ...    -0.309358     1.990467    -0.217197   
1  1.204178 -0.132039 -0.132039  ...     0.551067    -0.552496    -0.217197   
2 -0.965980 -0.132039 -0.132039  ...    -0.309358    -0.552496    -0.217197   
3  1.295970 -0.132039 -0.132039  ...    -0.309358    -0.552496    -0.217197   
4  0.026012 -0.132039 -0.132039  ...    -0.309358    -0.552496    -0.217197   

   metric8_log  metric9_log  metric1_mean  metric5_mean  metric6_mean  \
0    -0

In [30]:
df_engineered = clean_feature_eng(df_result)  # Assuming df_result is your dataset

# Verify the results
print("Dataframe Info:")
print(df_engineered.info())  # Check the structure and datatypes

print("\nMissing values:")
print(df_engineered.isnull().sum())  # Check for missing values in the transformed dataset

print("\nFirst few rows:")
print(df_engineered.head())  # Display the first few rows of the processed dataframe

Dataframe Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 26 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   device        100 non-null    float64
 1   failure       100 non-null    float64
 2   metric1       100 non-null    float64
 3   metric2       100 non-null    float64
 4   metric3       100 non-null    float64
 5   metric4       100 non-null    float64
 6   metric5       100 non-null    float64
 7   metric6       100 non-null    float64
 8   metric7       100 non-null    float64
 9   metric8       100 non-null    float64
 10  metric9       100 non-null    float64
 11  year          100 non-null    int32  
 12  month         100 non-null    int32  
 13  day           100 non-null    int32  
 14  dayofweek     100 non-null    int32  
 15  metric2_log   100 non-null    float64
 16  metric3_log   100 non-null    float64
 17  metric4_log   100 non-null    float64
 18  metric7_log   1

In [31]:
import time

# Get the current time in seconds
current_time_sec = int(round(time.time()))

# Add EventTime as the current timestamp (same for all rows)
df_engineered['EventTime'] = pd.Series([current_time_sec] * len(df_engineered), dtype='float64')

# Add RecordId by converting the index of the dataframe to a string
df_engineered['RecordId'] = df_engineered.index.astype(str)

# Display the dataframe with EventTime and RecordId
print(df_engineered[['EventTime', 'RecordId']].head())

      EventTime RecordId
0  1.729096e+09        0
1  1.729096e+09        1
2  1.729096e+09        2
3  1.729096e+09        3
4  1.729096e+09        4


In [37]:
def wait_for_feature_group_creation_complete(feature_group):
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group Creation")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    if status != "Created":
        raise RuntimeError(f"Failed to create feature group {feature_group.name}")
    print(f"FeatureGroup {feature_group.name} successfully created.")

# Create Feature Groups 
device_feature_group.create(
    s3_uri=f"s3://{default_bucket}/sagemaker-featurestore-predictive-maintenance/device_features",
    record_identifier_name="RecordId",
    event_time_feature_name="EventTime",
    role_arn=sagemaker.get_execution_role(),
    enable_online_store=True
)

metrics_feature_group.create(
    s3_uri=f"s3://{default_bucket}/sagemaker-featurestore-predictive-maintenance/metrics_features",
    record_identifier_name="RecordId",
    event_time_feature_name="EventTime",
    role_arn=sagemaker.get_execution_role(),
    enable_online_store=True
)

# Wait for Feature Group creation to complete
wait_for_feature_group_creation_complete(device_feature_group)
wait_for_feature_group_creation_complete(metrics_feature_group)


NameError: name 'default_bucket' is not defined

In [34]:
# Describe device feature group
device_description = device_feature_group.describe()
print(device_description)

# Describe metrics feature group
metrics_description = metrics_feature_group.describe()
print(metrics_description)

ResourceNotFound: An error occurred (ResourceNotFound) when calling the DescribeFeatureGroup operation: Resource Not Found: Amazon SageMaker can't find a FeatureGroup with name device-feature-group-16-16-29-15

In [None]:
sagemaker_client.list_feature_groups() 

### PutRecords into FeatureGroup

In [None]:
smartgrid_feature_group.ingest(data_frame=smartgrid_feature_group_df, max_workers=3, wait=True)

In [None]:
record_identifier_value = "2011-11-23 09:00:00"

featurestore_runtime.get_record(
    FeatureGroupName=smartgrid_feature_group_name,
    RecordIdentifierValueAsString=record_identifier_value,
)

In [None]:
print(smartgrid_feature_group.as_hive_ddl())

In [None]:
s3_client = boto3.client("s3", region_name=region)

In [None]:
account_id = boto3.client("sts").get_caller_identity()["Account"]
print(account_id)

smartgrid_feature_group_resolved_output_s3_uri = (
    smartgrid_feature_group.describe()
    .get("OfflineStoreConfig")
    .get("S3StorageConfig")
    .get("ResolvedOutputS3Uri")
)

smartgrid_feature_group_s3_prefix = smartgrid_feature_group_resolved_output_s3_uri.replace(
    f"s3://{default_s3_bucket_name}/", ""
)

offline_store_contents = None
while offline_store_contents is None:
    objects_in_bucket = s3_client.list_objects(
        Bucket=default_s3_bucket_name, Prefix=smartgrid_feature_group_s3_prefix
    )
    if "Contents" in objects_in_bucket and len(objects_in_bucket["Contents"]) > 1:
        offline_store_contents = objects_in_bucket["Contents"]
    else:
        print("Waiting for data in offline store...\n")
        sleep(60)

print("Data available.")

In [None]:
smartgrid_query = smartgrid_feature_group.athena_query()

smartgrid_table = smartgrid_query.table_name

In [None]:
query_string = f'''
SELECT *
FROM 
    "{smartgrid_table}"
'''

print("Running " + query_string)

In [None]:
# run Athena query. The output is loaded to a Pandas dataframe.
# dataset = pd.DataFrame()
smartgrid_query.run(
    query_string=query_string,
    output_location="s3://" + default_s3_bucket_name + "/" + prefix + "/query_results/",
)
smartgrid_query.wait()
dataset = smartgrid_query.as_dataframe()

dataset

In [None]:
smartgrid_feature_group.delete()

In [None]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>