# **Using Health Record Data to Detect Heart Failure with SageMaker Feature Store**

This notebook runs on the Python 3 (Data Science) kernel.

Note: The following policies must be attached to the execution role of Amazon SageMaker Studio:
- AmazonSageMakerFullAccess
- AmazonSageMakerFeatureStoreAccess
- AmazonS3FullAccess

## **Check policy:**
1. Copy your `execution role name`.
2. Go to the `IAM` service.
3. Select `Roles` under `Access management`.
4. Search for the copied role name and check the list under `Policy name`.
5. Use `Attach policies` if any are missing.
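To script this check instead of clicking through the console, here is a minimal sketch. It assumes an IAM client that offers boto3's `list_attached_role_policies` call (paginated with `IsTruncated`/`Marker`); the helper names `role_name_from_arn` and `missing_policies` are hypothetical. It only sees managed policies attached to the role, not inline policies.

```python
# Sketch: report which of the required managed policies are not attached to a role.
# Assumption: `iam_client` behaves like boto3.client('iam'), i.e. it exposes
# list_attached_role_policies(RoleName=..., Marker=...) returning a dict with
# 'AttachedPolicies', 'IsTruncated' and, when truncated, 'Marker'.

REQUIRED_POLICIES = {
    "AmazonSageMakerFullAccess",
    "AmazonSageMakerFeatureStoreAccess",
    "AmazonS3FullAccess",
}


def role_name_from_arn(role_arn):
    """Return the role name: the last path segment of an IAM role ARN."""
    return role_arn.split("/")[-1]


def missing_policies(iam_client, role_name, required=REQUIRED_POLICIES):
    """Return the sorted names of required policies that are not attached to the role."""
    attached = set()
    kwargs = {"RoleName": role_name}
    while True:
        response = iam_client.list_attached_role_policies(**kwargs)
        attached.update(p["PolicyName"] for p in response["AttachedPolicies"])
        if not response.get("IsTruncated"):
            break
        # follow the pagination marker to fetch the next page
        kwargs["Marker"] = response["Marker"]
    return sorted(set(required) - attached)
```

Usage, assuming `role` holds the execution role ARN from the *Set up IAM Role* cell: `missing_policies(boto3.client('iam'), role_name_from_arn(role))`. An empty list means all three policies are attached.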

## **Terminology:**
- `Feature Group` - A FeatureGroup is the main Feature Store resource that contains the metadata for all the data stored in Amazon SageMaker Feature Store. A FeatureGroup is a logical grouping of features, defined in the feature store, to describe records. A FeatureGroup's definition is composed of a list of feature definitions, a record identifier name, and configurations for its online and offline store.
- `Feature definition` - A FeatureDefinition consists of a name and one of the following data types: Integral, String, or Fractional. A FeatureGroup contains a list of feature definitions.
- `Record identifier name` - Each feature group is defined with a record identifier name. The record identifier name must refer to one of the names of a feature defined in the feature group's feature definitions.
- `Event time` - A point in time when a new event occurs that corresponds to the creation or update of a record in a feature group. All records in the feature group must have a corresponding EventTime. It can be used to track changes to a record over time. The online store contains the record corresponding to the latest EventTime for a record identifier name, whereas the offline store contains all historic records.
- `Online store` - The low-latency, high-availability cache for a feature group that enables real-time lookup of records. The online store allows quick access to the latest value for a record via the GetRecord API. A feature group contains an OnlineStoreConfig controlling where the data is stored.
- `Offline store` - The OfflineStore stores historical data in your S3 bucket. It is used when low (sub-second) latency reads are not needed, for example, when you want to store and serve features for exploration, model training, and batch inference. A feature group contains an OfflineStoreConfig controlling where the data is stored.
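The online/offline distinction can be illustrated with a toy, in-memory model. This is not how SageMaker implements either store; `ToyFeatureStore` is hypothetical and only mirrors the semantics described above: the offline store keeps every version of a record, while the online store keeps, per record identifier, the version with the latest event time (in this toy, a tie goes to the most recent write).

```python
class ToyFeatureStore:
    """Toy model of online vs. offline store semantics (not the SageMaker implementation)."""

    def __init__(self, record_identifier_name, event_time_feature_name):
        self.id_name = record_identifier_name
        self.time_name = event_time_feature_name
        self.offline = []  # append-only history: every record ever put
        self.online = {}   # record identifier value -> record with the latest event time

    def put_record(self, record):
        # the offline store keeps all versions
        self.offline.append(dict(record))
        key = record[self.id_name]
        current = self.online.get(key)
        # the online store only moves forward in event time; older events do not overwrite it
        if current is None or record[self.time_name] >= current[self.time_name]:
            self.online[key] = dict(record)

    def get_record(self, record_identifier_value):
        # like GetRecord: latest version for this identifier, or None if unknown
        return self.online.get(record_identifier_value)
```

For example, putting versions of patient `'1'` with event times 100, 200 and then 50 leaves three rows in `offline`, while `get_record('1')` returns the version stamped 200.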

## Setup SageMaker FeatureStore
Start by setting up the SageMaker Python SDK and the boto3 clients.

`boto3` - The AWS SDK for Python, used to create, configure, and manage AWS services.

`sagemaker.session.Session()` - https://sagemaker.readthedocs.io/en/stable/api/utility/session.html; for the underlying boto3 session, see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#module-boto3.session

In [2]:
import boto3
import sagemaker
from sagemaker.session import Session

# the default region for new connections (us-east-2 in this run)
region = boto3.Session().region_name

# create a boto3 session for that region
boto_session = boto3.Session(region_name=region)

# create service clients
# param: service_name - the name of the service
# param: region_name - the name of the region associated with the client
# boto_session.get_available_services() lists the valid service names
sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

# initialize a SageMaker Session
feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

### Set up IAM Role
`sagemaker.get_execution_role()` - Return the role ARN whose credentials are used to call the API.

In [3]:
from sagemaker import get_execution_role

role = get_execution_role()
print(role)

arn:aws:iam::773441300604:role/service-role/AmazonSageMaker-ExecutionRole-20211102T144581


### Set up S3 Bucket For The Offline Store
SageMaker FeatureStore writes the data in the `OfflineStore` of a `FeatureGroup` to an S3 bucket that you own. To write to your bucket, SageMaker FeatureStore assumes an IAM role, also owned by you, that has access to the bucket. The same bucket can be reused across `FeatureGroups`; data in the bucket is partitioned by feature group.
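The per-feature-group layout inside the bucket can be expressed as a small helper. The layout is inferred from the `LOCATION` that `as_hive_ddl()` prints later in this notebook, not from a documented contract, and `offline_store_data_prefix` is a hypothetical name.

```python
def offline_store_data_prefix(base_prefix, account_id, region, table_name):
    """Build the S3 key prefix where the offline store writes one feature group's data.

    Layout (inferred from this notebook's output):
    <base_prefix>/<account_id>/sagemaker/<region>/offline-store/<table_name>/data
    where <table_name> is the feature group name plus a creation timestamp.
    """
    return "/".join([base_prefix.strip("/"), account_id, "sagemaker", region,
                     "offline-store", table_name, "data"])
```

With the values from this run, `offline_store_data_prefix('feature_store_demo', '773441300604', 'us-east-2', 'clinical-feature-group-02-18-50-36-1635879082')` reproduces the key part of the DDL's `LOCATION`.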

In [8]:
# the default Amazon S3 bucket to be used by this session
default_s3_bucket_name = feature_store_session.default_bucket()
prefix = 'feature_store_demo'
print(default_s3_bucket_name)

sagemaker-us-east-2-773441300604


## Inspect Dataset
This demo uses the [Heart Failure Medical Record Dataset](https://archive.ics.uci.edu/ml/datasets/Heart+failure+clinical+records).

`IPython.display.display()` - https://ipython.readthedocs.io/en/stable/api/generated/IPython.display.html#IPython.display.display

In [9]:
import pandas as pd
from IPython.display import display

In [10]:
!mkdir -p data
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/00519/heart_failure_clinical_records_dataset.csv -O data/clinical_records_dataset.csv

--2021-11-02 18:48:35--  https://archive.ics.uci.edu/ml/machine-learning-databases/00519/heart_failure_clinical_records_dataset.csv
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12239 (12K) [application/x-httpd-php]
Saving to: ‘data/clinical_records_dataset.csv’


2021-11-02 18:48:35 (217 MB/s) - ‘data/clinical_records_dataset.csv’ saved [12239/12239]



In [11]:
# upload data to s3 bucket
!aws s3 cp ./data/clinical_records_dataset.csv s3://$default_s3_bucket_name/$prefix/data/

upload: data/clinical_records_dataset.csv to s3://sagemaker-us-east-2-773441300604/feature_store_demo/data/clinical_records_dataset.csv


In [13]:
clinical_data_file_name = 'clinical_records_dataset.csv'
# this is the path to get the data in the s3 bucket
clinical_data_path = "s3://{}/{}/data/{}".format(default_s3_bucket_name, prefix, clinical_data_file_name)
# use the data path to retrieve the data
clinical_df = pd.read_csv(clinical_data_path)
pd.set_option('display.max_columns', 500)
clinical_df.head()

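| | age | anaemia | creatinine_phosphokinase | diabetes | ejection_fraction | high_blood_pressure | platelets | serum_creatinine | serum_sodium | sex | smoking | time | DEATH_EVENT |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 75.0 | 0 | 582 | 0 | 20 | 1 | 265000.0 | 1.9 | 130 | 1 | 0 | 4 | 1 |
| 1 | 55.0 | 0 | 7861 | 0 | 38 | 0 | 263358.03 | 1.1 | 136 | 1 | 0 | 6 | 1 |
| 2 | 65.0 | 0 | 146 | 0 | 20 | 0 | 162000.0 | 1.3 | 129 | 1 | 1 | 7 | 1 |
| 3 | 50.0 | 1 | 111 | 0 | 20 | 0 | 210000.0 | 1.9 | 137 | 1 | 0 | 7 | 1 |
| 4 | 65.0 | 1 | 160 | 1 | 20 | 0 | 327000.0 | 2.7 | 116 | 0 | 0 | 8 | 1 |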


In [14]:
# check whether any values are missing in the dataframe
print('percentage of the value missing in each column is: ')

clinical_df.isnull().sum() / len(clinical_df) * 100

percentage of the value missing in each column is: 


age                         0.0
anaemia                     0.0
creatinine_phosphokinase    0.0
diabetes                    0.0
ejection_fraction           0.0
high_blood_pressure         0.0
platelets                   0.0
serum_creatinine            0.0
serum_sodium                0.0
sex                         0.0
smoking                     0.0
time                        0.0
DEATH_EVENT                 0.0
dtype: float64

No pre-processing is needed in this demo because no values are missing.

## Prepare data for FeatureStore
In the Amazon SageMaker Feature Store API, a feature is an attribute of a record. You can define a name and a type for every feature stored in Feature Store. The name uniquely identifies a feature within a feature group, and the type identifies the data type of the feature's values. Supported data types are String, Integral, and Fractional.

Take a look at the data types and make sure they are all correct and readable by Feature Store. The SageMaker Feature Store Python SDK maps the pandas `string` dtype to the String feature type.
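A simplified sketch of the kind of dtype-to-feature-type mapping that `load_feature_definitions()` applies. `DTYPE_TO_FEATURE_TYPE` covers only the dtypes in this notebook and is an assumption, not the SDK's actual table; `infer_feature_definitions` is hypothetical.

```python
# Hypothetical, simplified mapping from pandas dtype names to Feature Store feature types.
DTYPE_TO_FEATURE_TYPE = {
    "int64": "Integral",
    "float64": "Fractional",
    "string": "String",
}


def infer_feature_definitions(dtypes):
    """Map (column name, dtype) pairs to FeatureDefinition-style dicts.

    `dtypes` is any iterable of (name, dtype) pairs, e.g. clinical_df.dtypes.items().
    In this sketch an unmapped dtype such as 'object' raises ValueError, which is why
    casting object columns to 'string' first is the safe choice.
    """
    definitions = []
    for name, dtype in dtypes:
        feature_type = DTYPE_TO_FEATURE_TYPE.get(str(dtype))
        if feature_type is None:
            raise ValueError(f"cannot infer a feature type for column {name!r} with dtype {dtype}")
        definitions.append({"FeatureName": name, "FeatureType": feature_type})
    return definitions
```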

In SageMaker Feature Store, a `record` is a collection of values for features for a single record identifier value. Two features are designated as the record identifier and the event time, and the combination of a record identifier and an event time uniquely identifies a record within a feature group. We need to specify a record identifier and an event time here, and since the raw data contains neither column, we create them.

- For the record identifier name: a record is a collection of values for features for a single record identifier value. In this case, we create a unique ID for each patient and use it as the record identifier.
- For the event time feature name: the event time is the point in time when a new event occurs and corresponds to the creation or update of a record in a feature group. The event time can be used to track changes to a record over time. When the data has no timestamp, as in this use case, you can append an EventTime column yourself; the following code shows how EventTime is appended to the clinical data.
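As we understand the Feature Store documentation, an event time feature can be either Fractional (Unix epoch seconds, which this notebook uses) or String (an ISO-8601 timestamp such as `yyyy-MM-dd'T'HH:mm:ssZ`). A minimal sketch producing both forms from a `datetime`; the helper names are hypothetical.

```python
from datetime import datetime, timezone


def event_time_fractional(dt):
    """Event time as Unix epoch seconds, suitable for a Fractional event time feature."""
    return float(dt.timestamp())


def event_time_iso8601(dt):
    """Event time as an ISO-8601 UTC string, suitable for a String event time feature.

    Naive datetimes are interpreted as local time by astimezone().
    """
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


# the event time stored for the records in this run
dt = datetime.fromtimestamp(1635879006, tz=timezone.utc)
print(event_time_fractional(dt))  # 1635879006.0
print(event_time_iso8601(dt))     # 2021-11-02T18:50:06Z
```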

### Create a unique ID for each patient

In [15]:
# add unique patient id for each patient
clinical_df.reset_index(inplace=True)
clinical_df.rename(columns={'index': 'patient_id'}, inplace=True)

In [16]:
# check the types
clinical_df.dtypes

patient_id                    int64
age                         float64
anaemia                       int64
creatinine_phosphokinase      int64
diabetes                      int64
ejection_fraction             int64
high_blood_pressure           int64
platelets                   float64
serum_creatinine            float64
serum_sodium                  int64
sex                           int64
smoking                       int64
time                          int64
DEATH_EVENT                   int64
dtype: object

In [19]:
# we want the unique ID to be treated as a string, not an integer
clinical_df['patient_id'] = clinical_df['patient_id'].astype(object)

### Create a TimeStamp for each Record

In [20]:
# Append EventTime
import time

current_time_sec = int(round(time.time()))
clinical_df['event_time'] = pd.Series([current_time_sec]*len(clinical_df), dtype='float64')

clinical_df.head()

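| | patient_id | age | anaemia | creatinine_phosphokinase | diabetes | ejection_fraction | high_blood_pressure | platelets | serum_creatinine | serum_sodium | sex | smoking | time | DEATH_EVENT | event_time |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 75.0 | 0 | 582 | 0 | 20 | 1 | 265000.0 | 1.9 | 130 | 1 | 0 | 4 | 1 | 1635879000.0 |
| 1 | 1 | 55.0 | 0 | 7861 | 0 | 38 | 0 | 263358.03 | 1.1 | 136 | 1 | 0 | 6 | 1 | 1635879000.0 |
| 2 | 2 | 65.0 | 0 | 146 | 0 | 20 | 0 | 162000.0 | 1.3 | 129 | 1 | 1 | 7 | 1 | 1635879000.0 |
| 3 | 3 | 50.0 | 1 | 111 | 0 | 20 | 0 | 210000.0 | 1.9 | 137 | 1 | 0 | 7 | 1 | 1635879000.0 |
| 4 | 4 | 65.0 | 1 | 160 | 1 | 20 | 0 | 327000.0 | 2.7 | 116 | 0 | 0 | 8 | 1 | 1635879000.0 |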


### Check data types for each column

In [21]:
def cast_object_to_string(df):
    # convert every object column to the pandas 'string' dtype, in place
    for label in df.columns:
        if df.dtypes[label] == 'object':
            df[label] = df[label].astype('str').astype('string')

# cast object columns to string:
# the SageMaker FeatureStore Python SDK maps the string dtype to the String feature type,
# and maps float and int dtypes to Fractional and Integral
cast_object_to_string(clinical_df)

In [22]:
clinical_df.dtypes

patient_id                   string
age                         float64
anaemia                       int64
creatinine_phosphokinase      int64
diabetes                      int64
ejection_fraction             int64
high_blood_pressure           int64
platelets                   float64
serum_creatinine            float64
serum_sodium                  int64
sex                           int64
smoking                       int64
time                          int64
DEATH_EVENT                   int64
event_time                  float64
dtype: object

## Create Features
In this step we will create the FeatureGroup representing the patients' medical record data, then ingest the data into the created FeatureGroup.

### Assign a feature group name

In [23]:
from time import gmtime, strftime, sleep

clinical_feature_group_name = 'clinical-feature-group-' + strftime('%d-%H-%M-%S', gmtime())
clinical_feature_group_name

'clinical-feature-group-02-18-50-36'

### Configure a FeatureGroup

`sagemaker.feature_store.feature_group.FeatureGroup` - https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_group.FeatureGroup

In [24]:
from sagemaker.feature_store.feature_group import FeatureGroup

# A FeatureGroup object comprises the feature group name, a session instance,
# and a list of feature definitions (FeatureDefinition objects).
clinical_feature_group = FeatureGroup(
    name=clinical_feature_group_name, 
    sagemaker_session=feature_store_session
)

### Define Identifier
In this step, we specify the record identifier name and the event time feature name.

In [25]:
# record identifier and event time feature names
record_identifier_feature_name = 'patient_id'
event_time_feature_name = 'event_time'
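Before creating the feature group, it can help to check that both identifier columns exist and that every patient ID appears only once. A minimal sketch with a hypothetical helper `validate_identifiers`. Feature Store itself does not reject repeated identifiers (a later record simply updates the earlier one), so the uniqueness check reflects this dataset's expectation rather than a service rule.

```python
def validate_identifiers(columns, record_ids, record_identifier_name, event_time_feature_name):
    """Check the identifier setup; return the sorted list of repeated record identifiers.

    `columns` is the list of column names (e.g. list(clinical_df.columns)) and
    `record_ids` the values of the record identifier column (e.g. clinical_df['patient_id']).
    Raises KeyError if either identifier column is missing.
    """
    for name in (record_identifier_name, event_time_feature_name):
        if name not in columns:
            raise KeyError(f"column {name!r} not found in the data frame")
    seen, duplicates = set(), set()
    for value in record_ids:
        if value in seen:
            duplicates.add(value)
        seen.add(value)
    return sorted(duplicates, key=str)
```

Usage: `validate_identifiers(list(clinical_df.columns), clinical_df['patient_id'], record_identifier_feature_name, event_time_feature_name)` should return an empty list for this dataset.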

### Load feature definitions to the feature group

In [26]:
clinical_feature_group.load_feature_definitions(data_frame=clinical_df)

[FeatureDefinition(feature_name='patient_id', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='age', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='anaemia', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='creatinine_phosphokinase', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='diabetes', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='ejection_fraction', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='high_blood_pressure', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='platelets', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='serum_creatinine', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='serum_sodium', feature_type=<Feature

### Create FeatureGroup
In this step, we use the `create()` method to create the feature group. The online store is not enabled by default, so you must set `enable_online_store` to `True` if you want it. `s3_uri` is the S3 location of your offline store.
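For reference, here is a sketch of the keyword arguments that a `create()` call like the one below corresponds to when made directly with boto3's `sagemaker_client.create_feature_group`. The parameter names come from the SageMaker `CreateFeatureGroup` API; the helper `build_create_feature_group_request` is hypothetical, and the SDK may send additional fields.

```python
def build_create_feature_group_request(name, record_identifier_name, event_time_feature_name,
                                       feature_definitions, s3_uri, role_arn,
                                       enable_online_store=True):
    """Build kwargs for sagemaker_client.create_feature_group(**request).

    `feature_definitions` is a list of {'FeatureName': ..., 'FeatureType': ...} dicts,
    for example describe()['FeatureDefinitions'] from an existing group.
    """
    return {
        "FeatureGroupName": name,
        "RecordIdentifierFeatureName": record_identifier_name,
        "EventTimeFeatureName": event_time_feature_name,
        "FeatureDefinitions": feature_definitions,
        # online store: low-latency GetRecord lookups of the latest record
        "OnlineStoreConfig": {"EnableOnlineStore": enable_online_store},
        # offline store: historical records written to S3 under this URI
        "OfflineStoreConfig": {"S3StorageConfig": {"S3Uri": s3_uri}},
        "RoleArn": role_arn,
    }
```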

In [28]:
def wait_for_feature_group_creation_complete(feature_group):
    status = feature_group.describe().get('FeatureGroupStatus')
    while status == 'Creating':
        print('Waiting for Feature Group Creation')
        time.sleep(5)
        status = feature_group.describe().get('FeatureGroupStatus')
    if status != 'Created':
        raise RuntimeError(f'Failed to create feature group {feature_group.name}')
    print(f'FeatureGroup {feature_group.name} successfully created.')

clinical_feature_group.create(
    s3_uri = f's3://{default_s3_bucket_name}/{prefix}',
    record_identifier_name = record_identifier_feature_name,
    event_time_feature_name = event_time_feature_name,
    role_arn = role,
    enable_online_store = True
)

wait_for_feature_group_creation_complete(clinical_feature_group)

Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup clinical-feature-group-02-18-50-36 successfully created.


## Work with your FeatureGroup
### Check FeatureGroup Info
Creating a feature group takes some time, and you need to wait until its status is `Created` before you can use it. You can check the status using the DescribeFeatureGroup and ListFeatureGroups APIs.
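The wait loop in the create cell polls without a time limit. A sketch of a variant that gives up after a timeout; `wait_for_status` is a hypothetical helper that takes any zero-argument `describe` callable, such as `clinical_feature_group.describe`, and an injectable `sleep` function so it can be exercised without AWS.

```python
import time


def wait_for_status(describe, status_key="FeatureGroupStatus", pending=("Creating",),
                    target="Created", timeout_s=600, interval_s=5, sleep=time.sleep):
    """Poll describe() until the status leaves `pending`.

    Returns the final status if it equals `target`; raises RuntimeError for any other
    final status and TimeoutError if the status is still pending after timeout_s seconds.
    """
    deadline = time.monotonic() + timeout_s
    status = describe().get(status_key)
    while status in pending:
        if time.monotonic() > deadline:
            raise TimeoutError(f"status still {status!r} after {timeout_s} s")
        sleep(interval_s)
        status = describe().get(status_key)
    if status != target:
        raise RuntimeError(f"unexpected status {status!r}")
    return status
```

Usage: `wait_for_status(clinical_feature_group.describe)`.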

In [29]:
clinical_feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-2:773441300604:feature-group/clinical-feature-group-02-18-50-36',
 'FeatureGroupName': 'clinical-feature-group-02-18-50-36',
 'RecordIdentifierFeatureName': 'patient_id',
 'EventTimeFeatureName': 'event_time',
 'FeatureDefinitions': [{'FeatureName': 'patient_id', 'FeatureType': 'String'},
  {'FeatureName': 'age', 'FeatureType': 'Fractional'},
  {'FeatureName': 'anaemia', 'FeatureType': 'Integral'},
  {'FeatureName': 'creatinine_phosphokinase', 'FeatureType': 'Integral'},
  {'FeatureName': 'diabetes', 'FeatureType': 'Integral'},
  {'FeatureName': 'ejection_fraction', 'FeatureType': 'Integral'},
  {'FeatureName': 'high_blood_pressure', 'FeatureType': 'Integral'},
  {'FeatureName': 'platelets', 'FeatureType': 'Fractional'},
  {'FeatureName': 'serum_creatinine', 'FeatureType': 'Fractional'},
  {'FeatureName': 'serum_sodium', 'FeatureType': 'Integral'},
  {'FeatureName': 'sex', 'FeatureType': 'Integral'},
  {'FeatureName': 'smoking', 'FeatureTy

In [30]:
# use boto client to list FeatureGroups
sagemaker_client.list_feature_groups()

{'FeatureGroupSummaries': [{'FeatureGroupName': 'clinical-feature-group-02-18-50-36',
   'FeatureGroupArn': 'arn:aws:sagemaker:us-east-2:773441300604:feature-group/clinical-feature-group-02-18-50-36',
   'CreationTime': datetime.datetime(2021, 11, 2, 18, 51, 22, 827000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created'},
  {'FeatureGroupName': 'clinical-feature-group-02-18-25-30',
   'FeatureGroupArn': 'arn:aws:sagemaker:us-east-2:773441300604:feature-group/clinical-feature-group-02-18-25-30',
   'CreationTime': datetime.datetime(2021, 11, 2, 18, 25, 31, 328000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created',
   'OfflineStoreStatus': {'Status': 'Active'}}],
 'ResponseMetadata': {'RequestId': 'be30cd79-c652-4c04-bb7d-a3e431c25ca6',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'be30cd79-c652-4c04-bb7d-a3e431c25ca6',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '530',
   'date': 'Tue, 02 Nov 2021 18:51:55 GMT'},
  'RetryAttempts': 0}}

### Put Records into the Feature Store
After the feature groups have been created, we can put data into them by using the **PutRecord API**. This API can handle high TPS (transactions per second) and is designed to be called by different streams. The data from all of these Put requests is buffered and written to S3 in chunks; the files are written to the offline store within a few minutes of ingestion. You can use the `ingest()` function to load your feature data: pass in a data frame of feature data, set the number of workers, and choose whether to wait for it to return. To speed up ingestion in this example, we specify multiple workers to do the job simultaneously. Ingesting the data into the clinical FeatureGroup takes less than a minute.
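Each record sent with PutRecord is a list of `{'FeatureName': ..., 'ValueAsString': ...}` entries. A minimal sketch of that conversion for a single row; `to_put_record_payload` is a hypothetical helper, and it assumes there are no missing values (true for this dataset).

```python
def to_put_record_payload(row):
    """Convert one row (a dict of feature name -> value) into a PutRecord 'Record' list.

    Every value is sent as a string under 'ValueAsString'.
    """
    return [{"FeatureName": name, "ValueAsString": str(value)} for name, value in row.items()]
```

Usage, writing a single row through the runtime client created at the top of the notebook: `featurestore_runtime.put_record(FeatureGroupName=clinical_feature_group_name, Record=to_put_record_payload(clinical_df.iloc[0].to_dict()))`.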

In [31]:
clinical_feature_group.ingest(
    data_frame = clinical_df, 
    max_workers = 3, 
    wait = True
)

IngestionManagerPandas(feature_group_name='clinical-feature-group-02-18-50-36', sagemaker_fs_runtime_client_config=<botocore.config.Config object at 0x7f2508da83d0>, max_workers=3, max_processes=1, _async_result=<multiprocess.pool.MapResult object at 0x7f2504399990>, _processing_pool=<pool ProcessPool(ncpus=1)>, _failed_indices=[])

### Get Records from a FeatureGroup
We can use the `get_record` function to retrieve the data for a specific record by its record identifier from the online store.
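`get_record` returns every value as a string under `ValueAsString`, as the response in the next cell shows. A sketch that casts the values back to Python types using the feature definitions; `record_to_typed_dict` is hypothetical and assumes every returned feature appears in the definitions.

```python
# cast function per Feature Store feature type
CASTS = {"Integral": int, "Fractional": float, "String": str}


def record_to_typed_dict(record, feature_definitions):
    """Turn a GetRecord 'Record' list into {feature name: typed value}.

    `feature_definitions` is a list of {'FeatureName': ..., 'FeatureType': ...} dicts,
    e.g. clinical_feature_group.describe()['FeatureDefinitions'].
    """
    types = {d["FeatureName"]: d["FeatureType"] for d in feature_definitions}
    return {item["FeatureName"]: CASTS[types[item["FeatureName"]]](item["ValueAsString"])
            for item in record}
```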

In [33]:
record_identifier_value = str(200)
featurestore_runtime.get_record(
    FeatureGroupName=clinical_feature_group_name,
    RecordIdentifierValueAsString=record_identifier_value
)

{'ResponseMetadata': {'RequestId': '801a49fb-46d5-4cc8-96f9-f38bf5bbfcee',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '801a49fb-46d5-4cc8-96f9-f38bf5bbfcee',
   'content-type': 'application/json',
   'content-length': '789',
   'date': 'Tue, 02 Nov 2021 18:52:55 GMT'},
  'RetryAttempts': 0},
 'Record': [{'FeatureName': 'patient_id', 'ValueAsString': '200'},
  {'FeatureName': 'age', 'ValueAsString': '63.0'},
  {'FeatureName': 'anaemia', 'ValueAsString': '1'},
  {'FeatureName': 'creatinine_phosphokinase', 'ValueAsString': '1767'},
  {'FeatureName': 'diabetes', 'ValueAsString': '0'},
  {'FeatureName': 'ejection_fraction', 'ValueAsString': '45'},
  {'FeatureName': 'high_blood_pressure', 'ValueAsString': '0'},
  {'FeatureName': 'platelets', 'ValueAsString': '73000.0'},
  {'FeatureName': 'serum_creatinine', 'ValueAsString': '0.7'},
  {'FeatureName': 'serum_sodium', 'ValueAsString': '137'},
  {'FeatureName': 'sex', 'ValueAsString': '1'},
  {'FeatureName': 'smoking', 'Value

### Generate Hive DDL Commands
The SageMaker Python SDK's FeatureGroup class also provides functionality to generate Hive DDL commands. The table schema is generated from the feature definitions: columns are named after the features, and data types are inferred from the feature types.

In [34]:
print(clinical_feature_group.as_hive_ddl())

CREATE EXTERNAL TABLE IF NOT EXISTS sagemaker_featurestore.clinical-feature-group-02-18-50-36 (
  patient_id STRING
  age FLOAT
  anaemia INT
  creatinine_phosphokinase INT
  diabetes INT
  ejection_fraction INT
  high_blood_pressure INT
  platelets FLOAT
  serum_creatinine FLOAT
  serum_sodium INT
  sex INT
  smoking INT
  time INT
  DEATH_EVENT INT
  event_time FLOAT
  write_time TIMESTAMP
  event_time TIMESTAMP
  is_deleted BOOLEAN
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  STORED AS
  INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
  OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
LOCATION 's3://sagemaker-us-east-2-773441300604/feature_store_demo/773441300604/sagemaker/us-east-2/offline-store/clinical-feature-group-02-18-50-36-1635879082/data'
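The DDL printed above has no commas between column definitions and lists `event_time` twice, so it likely needs editing before Hive or Athena will accept it. A sketch of a hand-built alternative: `hive_ddl` is hypothetical, it uses `STORED AS PARQUET` instead of the explicit SerDe lines, and its metadata columns (`write_time`, `api_invocation_time`, `is_deleted`) and their types are assumptions based on the columns the Athena query in this notebook returns.

```python
HIVE_TYPES = {"String": "STRING", "Integral": "INT", "Fractional": "FLOAT"}
# assumed offline store metadata columns and Hive types
METADATA_COLUMNS = [("write_time", "TIMESTAMP"),
                    ("api_invocation_time", "TIMESTAMP"),
                    ("is_deleted", "BOOLEAN")]


def hive_ddl(table_name, feature_definitions, location, database="sagemaker_featurestore"):
    """Build a comma-separated CREATE EXTERNAL TABLE statement for an offline store table."""
    columns = [(d["FeatureName"], HIVE_TYPES[d["FeatureType"]]) for d in feature_definitions]
    names = {name for name, _ in columns}
    # append metadata columns, skipping any name already used by a feature
    columns += [(name, hive_type) for name, hive_type in METADATA_COLUMNS if name not in names]
    column_sql = ",\n".join(f"  {name} {hive_type}" for name, hive_type in columns)
    return (
        # backticks quote the table name, which contains hyphens
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {database}.`{table_name}` (\n"
        f"{column_sql}\n"
        ")\n"
        "STORED AS PARQUET\n"
        f"LOCATION '{location}'"
    )
```

Usage: `print(hive_ddl(clinical_table_name, clinical_feature_group.describe()['FeatureDefinitions'], location))`, where `clinical_table_name` and `location` are placeholders for the table name and the `LOCATION` shown above.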


Wait for the data to appear in the offline store before moving on to create a dataset. This takes approximately 5 minutes. SageMaker FeatureStore adds metadata to each record that is ingested into the offline store (the `write_time`, `api_invocation_time`, and `is_deleted` columns in the query results below).

In [35]:
s3_client = boto3.client('s3', region_name=region)
account_id = boto3.client('sts').get_caller_identity()['Account']
# the Glue table name (feature group name + creation timestamp) is also the offline store folder name
clinical_feature_group_table_name = clinical_feature_group.describe().get('OfflineStoreConfig').get('DataCatalogConfig').get('TableName')

clinical_feature_group_s3_prefix = prefix + '/' + account_id + '/sagemaker/' + region + '/offline-store/' + clinical_feature_group_table_name + '/data'

# poll S3 once a minute until at least one object appears under the prefix
offline_store_contents = None
while offline_store_contents is None:
    objects_in_bucket = s3_client.list_objects(Bucket=default_s3_bucket_name, Prefix=clinical_feature_group_s3_prefix)
    if 'Contents' in objects_in_bucket and len(objects_in_bucket['Contents']) >= 1:
        offline_store_contents = objects_in_bucket['Contents']
    else:
        print('Waiting for data in offline store...\n')
        sleep(60)

print('Data available')

Waiting for data in offline store...

Waiting for data in offline store...

Waiting for data in offline store...

Waiting for data in offline store...

Data available


## Build a Training Dataset
SageMaker FeatureStore automatically builds an AWS Glue Data Catalog table when you create a feature group (you can turn this off if you want). We create a training dataset with feature values from the clinical FeatureGroup by using this auto-built catalog: we run an Athena query that does a simple select-all on the feature group's offline store in S3.

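For model testing purposes, we leave out 9 records when creating the training dataset so that we can use them as test data after model training. Because the query uses `LIMIT` without `ORDER BY`, which 9 records are left out is not guaranteed to be the same from run to run. You can also do a regular train/test split.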
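To make the held-out set reproducible, the IDs can be chosen explicitly and excluded in the query. The sketch below also keeps only the latest, non-deleted version of each record, since the offline store holds all historic versions. Both helpers are hypothetical; the query assumes Athena's Presto SQL dialect and the `api_invocation_time`, `write_time`, and `is_deleted` metadata columns seen in the query results.

```python
import random


def pick_holdout_ids(n_records, n_holdout, seed=0):
    """Pick n_holdout record ids from range(n_records) reproducibly (seeded sample)."""
    return sorted(random.Random(seed).sample(range(n_records), n_holdout))


def training_query(table_name, record_identifier_name, event_time_feature_name, holdout_ids=()):
    """Build SQL that keeps the latest non-deleted version of each record,
    excluding the held-out record identifiers."""
    where = ""
    if holdout_ids:
        # the record identifier is a String feature, so compare against quoted values
        excluded = ", ".join(f"'{i}'" for i in holdout_ids)
        where = f"  WHERE {record_identifier_name} NOT IN ({excluded})\n"
    return (
        "SELECT * FROM (\n"
        "  SELECT *, row_number() OVER (\n"
        f"    PARTITION BY {record_identifier_name}\n"
        f"    ORDER BY {event_time_feature_name} DESC, api_invocation_time DESC, write_time DESC\n"
        "  ) AS row_num\n"
        f'  FROM "{table_name}"\n'
        f"{where}"
        ")\n"
        "WHERE row_num = 1 AND NOT is_deleted"
    )
```

Usage: `clinical_query.run(query_string=training_query(clinical_table, 'patient_id', 'event_time', pick_holdout_ids(299, 9)), output_location=...)`. The result also contains a `row_num` column, which the column selection in *Prepare dataset for training* drops.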

`athena_query()` - Create an AthenaQuery instance: https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_group.AthenaQuery

In [39]:
clinical_query = clinical_feature_group.athena_query()
clinical_table = clinical_query.table_name
clinical_table

'clinical-feature-group-02-18-50-36-1635879082'

In [40]:
# Athena query: return 290 of the 299 records and leave the other 9 out for testing
query_string = 'SELECT * FROM "' + clinical_table + '" LIMIT 290'

# run the Athena query and wait for it to finish; results are written to output_location
clinical_query.run(query_string=query_string, output_location='s3://'+default_s3_bucket_name+'/'+prefix+'/query_results/')
clinical_query.wait()
# download the result of the query and load it into a DataFrame
dataset = clinical_query.as_dataframe()
dataset.head()

| | patient_id | age | anaemia | creatinine_phosphokinase | diabetes | ejection_fraction | high_blood_pressure | platelets | serum_creatinine | serum_sodium | sex | smoking | time | death_event | event_time | write_time | api_invocation_time | is_deleted |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 231 | 70.0 | 0 | 93 | 0 | 35 | 0 | 185000.0 | 1.1 | 134 | 1 | 1 | 208 | 0 | 1635879000.0 | 2021-11-02 18:57:30.618 | 2021-11-02 18:52:09.000 | False |
| 1 | 92 | 42.0 | 0 | 582 | 0 | 60 | 0 | 263358.03 | 1.18 | 137 | 0 | 0 | 82 | 0 | 1635879000.0 | 2021-11-02 18:57:30.618 | 2021-11-02 18:52:10.000 | False |
| 2 | 120 | 60.0 | 1 | 737 | 0 | 60 | 1 | 210000.0 | 1.5 | 135 | 1 | 1 | 95 | 0 | 1635879000.0 | 2021-11-02 18:57:30.618 | 2021-11-02 18:52:10.000 | False |
| 3 | 168 | 65.0 | 0 | 582 | 1 | 40 | 0 | 270000.0 | 1.0 | 138 | 0 | 0 | 140 | 0 | 1635879000.0 | 2021-11-02 18:57:30.618 | 2021-11-02 18:52:10.000 | False |
| 4 | 212 | 78.0 | 0 | 224 | 0 | 50 | 0 | 481000.0 | 1.4 | 138 | 1 | 1 | 192 | 0 | 1635879000.0 | 2021-11-02 18:57:30.547 | 2021-11-02 18:52:09.000 | False |


In [42]:
# collect the patient IDs (0-298) missing from the query result: the held-out test set
returned_ids = set(dataset['patient_id'].unique())
id_for_test = [i for i in range(len(clinical_df)) if i not in returned_ids]
id_for_test

[77, 81, 91, 123, 124, 201, 250, 252, 276]

### Prepare dataset for training

In [43]:
# get the execution details of the query to build the S3 path of its CSV result
query_execution = clinical_query.get_query_execution()
query_result = 's3://'+default_s3_bucket_name+'/'+prefix+'/query_results/'+query_execution['QueryExecution']['QueryExecutionId']+'.csv'
print(query_result)

s3://sagemaker-us-east-2-773441300604/feature_store_demo/query_results/23851f8c-0dd8-4233-8d9b-1a4cf14754e4.csv


In [44]:
# select the columns useful for training, with the target column first (the label goes in the first CSV column)
dataset = dataset[['death_event', 'age', 'anaemia', 'creatinine_phosphokinase', 'diabetes', 
                   'ejection_fraction', 'high_blood_pressure', 'platelets', 'serum_creatinine',
                   'serum_sodium', 'sex', 'smoking', 'time']]

# write to CSV without header and index column, then upload to S3
dataset.to_csv('data/dataset.csv', header=False, index=False)
s3_client.upload_file('data/dataset.csv', default_s3_bucket_name, prefix+'/training_input/dataset.csv')
dataset_uri_prefix = 's3://'+default_s3_bucket_name+'/'+prefix+'/training_input/'

In [45]:
dataset.head(5)

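| | death_event | age | anaemia | creatinine_phosphokinase | diabetes | ejection_fraction | high_blood_pressure | platelets | serum_creatinine | serum_sodium | sex | smoking | time |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 70.0 | 0 | 93 | 0 | 35 | 0 | 185000.0 | 1.1 | 134 | 1 | 1 | 208 |
| 1 | 0 | 42.0 | 0 | 582 | 0 | 60 | 0 | 263358.03 | 1.18 | 137 | 0 | 0 | 82 |
| 2 | 0 | 60.0 | 1 | 737 | 0 | 60 | 1 | 210000.0 | 1.5 | 135 | 1 | 1 | 95 |
| 3 | 0 | 65.0 | 0 | 582 | 1 | 40 | 0 | 270000.0 | 1.0 | 138 | 0 | 0 | 140 |
| 4 | 0 | 78.0 | 0 | 224 | 0 | 50 | 0 | 481000.0 | 1.4 | 138 | 1 | 1 | 192 |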


## Train and Deploy the Model
We use the SageMaker built-in XGBoost algorithm to predict whether a patient is likely to experience a death event (the `DEATH_EVENT` target).

`sagemaker.image_uris.retrieve()` - Retrieves the ECR URI for the Docker image matching the given arguments: https://sagemaker.readthedocs.io/en/stable/api/utility/image_uris.html?highlight=Sagemaker.image_uris

In [46]:
training_image = sagemaker.image_uris.retrieve('xgboost', region, '1.0-1')

`sagemaker.estimator.Estimator()` - A generic Estimator to train using any supplied algorithm: https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html?highlight=Sagemaker.estimator#sagemaker.estimator.Estimator

In [47]:
training_output_path = 's3://' + default_s3_bucket_name + '/' + prefix + '/training-output'

from sagemaker.estimator import Estimator
training_model = Estimator(
    training_image,     # Container image to use for training
    role,               # An AWS IAM role
    instance_count=1,   # Number of Amazon EC2 instances to use for training
    instance_type='ml.m5.2xlarge',  # Type of EC2 instance to use for training
    volume_size=5,      # Size in GB of the EBS volume to use for storing input data during training
    max_run=3600,  # Timeout in seconds for training. SageMaker terminates the job after this amount of time
    input_mode='File',  # The input mode that the algorithm supports
    output_path=training_output_path,        # S3 location for saving the training result
    sagemaker_session=feature_store_session  # Specify session object
)

In [48]:
# set only the objective and the number of boosting rounds; other hyperparameters keep their defaults because we are not tuning for the best result
training_model.set_hyperparameters(objective='binary:logistic',
                                   num_round=50)

### Specify training dataset to the dataset we just created

`sagemaker.inputs.TrainingInput()` - Create a definition for input data used by a SageMaker training job: https://sagemaker.readthedocs.io/en/stable/api/utility/inputs.html?highlight=Sagemaker.inputs#sagemaker.inputs.TrainingInput

In [49]:
train_data = sagemaker.inputs.TrainingInput(
    dataset_uri_prefix,       # Location of S3 data
    distribution='FullyReplicated',  # Valid values: 'FullyReplicated', 'ShardedByS3Key'
    content_type='text/csv',  # MIME type of the input data
    s3_data_type='S3Prefix'   # If 'S3Prefix', s3_data defines a prefix of S3 objects to train on
)

data_channels = {'train': train_data}

In [50]:
training_model.fit(inputs=data_channels, logs=True)

2021-11-02 19:00:53 Starting - Starting the training job...
2021-11-02 19:00:55 Starting - Launching requested ML instancesProfilerReport-1635879653: InProgress
...
2021-11-02 19:01:50 Starting - Preparing the instances for training............
2021-11-02 19:03:50 Downloading - Downloading input data
2021-11-02 19:03:50 Training - Downloading the training image...
2021-11-02 19:04:25 Uploading - Uploading generated training model
2021-11-02 19:04:25 Completed - Training job completed
INFO:sagemaker-containers:Imported framework sagemaker_xgboost_container.training
INFO:sagemaker-containers:Failed to parse hyperparameter objective value binary:logistic to Json.
Returning the value itself
INFO:sagemaker-containers:No GPUs detected (normal if no gpus installed)
INFO:sagemaker_xgboost_container.training:Running XGBoost Sagemaker in algorithm mode
INFO:root:Determined delimiter of CSV input is ','
INFO:root:Determined delimiter of C

### Set up Hosting for the Model
Once training is done, we can deploy the trained model as an Amazon SageMaker real-time hosted endpoint. This allows us to make predictions (inference) from the model. The endpoint can be deployed as follows.

In [58]:
predictor = training_model.deploy(initial_instance_count=1, instance_type='ml.m5.large')

------!

## SageMaker FeatureStore During Inference
SageMaker FeatureStore can be useful for supplementing data in inference requests because of its low-latency GetRecord functionality. For this demo, we are given a patient ID and query our online FeatureGroup to build the inference request.
From the patient IDs left out of the training set, we choose one to test real-time inference; in this example, patient 276.

In [59]:
id_for_test

[77, 81, 91, 123, 124, 201, 250, 252, 276]

To retrieve the data for a specific record by its record identifier (the patient ID chosen from the test set) from the online store, we can use the `get_record` function.

In [64]:
patient_id = str(276)

# helper to parse a feature value from the record
def get_feature_value(record, feature_name):
    return str(list(filter(lambda r: r['FeatureName'] == feature_name, record))[0]['ValueAsString'])

clinical_response = featurestore_runtime.get_record(FeatureGroupName=clinical_feature_group_name,
                                                     RecordIdentifierValueAsString=patient_id)
clinical_record = clinical_response['Record']
clinical_record

[{'FeatureName': 'patient_id', 'ValueAsString': '276'},
 {'FeatureName': 'age', 'ValueAsString': '70.0'},
 {'FeatureName': 'anaemia', 'ValueAsString': '0'},
 {'FeatureName': 'creatinine_phosphokinase', 'ValueAsString': '618'},
 {'FeatureName': 'diabetes', 'ValueAsString': '0'},
 {'FeatureName': 'ejection_fraction', 'ValueAsString': '35'},
 {'FeatureName': 'high_blood_pressure', 'ValueAsString': '0'},
 {'FeatureName': 'platelets', 'ValueAsString': '327000.0'},
 {'FeatureName': 'serum_creatinine', 'ValueAsString': '1.1'},
 {'FeatureName': 'serum_sodium', 'ValueAsString': '142'},
 {'FeatureName': 'sex', 'ValueAsString': '0'},
 {'FeatureName': 'smoking', 'ValueAsString': '0'},
 {'FeatureName': 'time', 'ValueAsString': '245'},
 {'FeatureName': 'DEATH_EVENT', 'ValueAsString': '0'},
 {'FeatureName': 'event_time', 'ValueAsString': '1635879006.0'}]

Then we pick the feature values from the retrieved record, excluding the record identifier, the event time, and the target variable, and build a list of values as the input to the predictor. The values must be in the same order as the feature columns used for training.
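Building the list by hand works, but the order must match the training columns exactly. A sketch that derives the CSV payload from a fixed feature order; `TRAINING_FEATURE_ORDER` is copied from the training cell, and `record_to_csv_payload` is hypothetical.

```python
# feature columns used for training, in order (target column excluded)
TRAINING_FEATURE_ORDER = ['age', 'anaemia', 'creatinine_phosphokinase', 'diabetes',
                          'ejection_fraction', 'high_blood_pressure', 'platelets',
                          'serum_creatinine', 'serum_sodium', 'sex', 'smoking', 'time']


def record_to_csv_payload(record, feature_order=TRAINING_FEATURE_ORDER):
    """Turn a GetRecord 'Record' list into one CSV line in training column order.

    Features not in feature_order (record identifier, event time, target) are ignored;
    a missing feature raises KeyError instead of silently shifting columns.
    """
    values = {item['FeatureName']: item['ValueAsString'] for item in record}
    missing = [name for name in feature_order if name not in values]
    if missing:
        raise KeyError(f"record is missing features: {missing}")
    return ','.join(values[name] for name in feature_order)
```

Usage: `predictor.predict(record_to_csv_payload(clinical_record), initial_args={'ContentType': 'text/csv'})`.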

In [65]:
inference_request = [
    get_feature_value(clinical_record, 'age'),
    get_feature_value(clinical_record, 'anaemia'),
    get_feature_value(clinical_record, 'creatinine_phosphokinase'),
    get_feature_value(clinical_record, 'diabetes'),
    get_feature_value(clinical_record, 'ejection_fraction'),
    get_feature_value(clinical_record, 'high_blood_pressure'),
    get_feature_value(clinical_record, 'platelets'),
    get_feature_value(clinical_record, 'serum_creatinine'),
    get_feature_value(clinical_record, 'serum_sodium'),
    get_feature_value(clinical_record, 'sex'),
    get_feature_value(clinical_record, 'smoking'),
    get_feature_value(clinical_record, 'time'),
]

In [67]:
import json

results = predictor.predict(','.join(inference_request), initial_args = {'ContentType': 'text/csv'})
prediction = json.loads(results)
print(prediction)

0.01355772279202938


The predictor calls the hosted model and returns a prediction. The model predicts that patient 276 is unlikely (about 1.36% probability) to experience a death event.
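With the `binary:logistic` objective, the endpoint returns a probability rather than a class. A sketch of turning it into a 0/1 label; the 0.5 threshold is an assumption, and a different cut-off may suit your use case.

```python
def to_label(probability, threshold=0.5):
    """Map a binary:logistic output (probability of DEATH_EVENT = 1) to a 0/1 label."""
    return int(probability >= threshold)


prediction = 0.01355772279202938  # value returned for patient 276 above
print(to_label(prediction), f"{prediction:.2%}")  # 0 1.36%
```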

## Clean up
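Delete the feature group and the endpoint when you are done. Otherwise they remain in your account, keep incurring costs, and count against your account's resource quotas. Deleting a feature group does not delete the data already written to the offline store in S3 or the AWS Glue table created for it.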

In [77]:
# delete the feature group
clinical_feature_group.delete()

# use aws command line to check if the feature group is deleted
!aws sagemaker list-feature-groups

{
    "FeatureGroupSummaries": []
}


In [76]:
predictor.delete_endpoint()

!aws sagemaker list-endpoints

{
    "Endpoints": []
}


In [75]:
# use these AWS CLI commands to delete unused feature groups and endpoints by name
# !aws sagemaker delete-feature-group --feature-group-name clinical-feature-group-02-18-25-30
# !aws sagemaker delete-endpoint --endpoint-name sagemaker-xgboost-2021-11-02-19-08-25-428
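If several runs have left feature groups behind (the listing earlier shows `clinical-feature-group-02-18-25-30` from a previous run), here is a sketch that finds and optionally deletes every feature group whose name starts with a prefix. It assumes boto3's SageMaker client calls `list_feature_groups(NameContains=..., NextToken=...)` and `delete_feature_group(FeatureGroupName=...)`; the helper is hypothetical and defaults to a dry run.

```python
def delete_feature_groups_with_prefix(sagemaker_client, name_prefix, dry_run=True):
    """List (and, unless dry_run, delete) feature groups whose names start with name_prefix.

    Returns the matching feature group names.
    """
    names = []
    kwargs = {"NameContains": name_prefix}
    while True:
        response = sagemaker_client.list_feature_groups(**kwargs)
        # NameContains matches anywhere in the name, so filter to true prefixes
        names += [s["FeatureGroupName"] for s in response["FeatureGroupSummaries"]
                  if s["FeatureGroupName"].startswith(name_prefix)]
        token = response.get("NextToken")
        if not token:
            break
        kwargs["NextToken"] = token
    # delete only after listing, so pagination is not affected by the deletions
    if not dry_run:
        for name in names:
            sagemaker_client.delete_feature_group(FeatureGroupName=name)
    return names
```

Usage: `delete_feature_groups_with_prefix(sagemaker_client, 'clinical-feature-group-')` returns the matching names; pass `dry_run=False` to delete them.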