# Inside Account A
### Read/Write to its own Online and Offline Store

#### Prerequisites

In [1]:
#!pip install awswrangler

#### Imports 

In [2]:
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker import get_execution_role
from sagemaker.session import Session
import awswrangler as wr
import pandas as pd
import sagemaker
import logging
import boto3
import time
import s3fs

#### Setup Logger

In [3]:
logger = logging.getLogger('sagemaker')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

In [4]:
logger.info(f'[Using SageMaker version: {sagemaker.__version__}]')

[Using SageMaker version: 2.16.4.dev0]


#### Essentials 
* Create SageMaker & Feature Store Runtime Clients
* Create a Feature Store Session encapsulating the above clients
* Confirm that the execution role used by this notebook has all the required policies attached, as described in the setup instructions. Attach any missing policies before proceeding.

In [5]:
region = boto3.Session().region_name
boto_session = boto3.Session(region_name=region)
s3 = boto_session.resource('s3', region_name=region)
role = get_execution_role()

s3_client = boto3.client('s3', region_name=region)
sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

SDK guide: https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_featurestore.html <br>
API documentation: https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html

In [6]:
feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

#### Pick a Bucket to be used as Offline Feature Store
<b>IMPORTANT: </b>If the bucket name does not contain the word `sagemaker`, change the bucket name inside the `AmazonSageMakerFeatureStoreAccess` policy so that it covers your bucket.

In [7]:
bucket = 'sagemaker-feature-store-account-a'
if s3.Bucket(bucket).creation_date is None:
    s3.create_bucket(Bucket=bucket)
else:
    logger.warning(f'Bucket: {bucket} already exists!')

Bucket: sagemaker-feature-store-account-a already exists!
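
The cell above calls `create_bucket` with only a bucket name, which S3 accepts in `us-east-1` (the region used in this notebook). In other regions the request needs a location constraint. A minimal sketch of building the arguments, assuming a hypothetical helper name `create_bucket_kwargs`:

```python
def create_bucket_kwargs(bucket_name, region_name):
    """Build keyword arguments for S3 create_bucket for the given region.

    Hypothetical helper, not part of boto3.
    """
    kwargs = {'Bucket': bucket_name}
    # us-east-1 is the default location and must not be passed as a constraint.
    if region_name != 'us-east-1':
        kwargs['CreateBucketConfiguration'] = {'LocationConstraint': region_name}
    return kwargs


# Usage inside the notebook (s3, bucket and region come from the cells above):
# s3.create_bucket(**create_bucket_kwargs(bucket, region))
```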


The `offline_feature_store_s3_uri` defined below is the S3 location of your offline store.

In [8]:
offline_feature_store_s3_uri = f's3://{bucket}/'
offline_feature_store_s3_uri

's3://sagemaker-feature-store-account-a/'

#### Load Features 

In [9]:
features = pd.read_csv('features.csv', names=['employee_id', 'name', 'age', 'sex', 'happiness_score'])
features['created_by'] = 'account-a'

In [10]:
features.dtypes

employee_id          int64
name                object
age                  int64
sex                 object
happiness_score    float64
created_by          object
dtype: object

### Ingest Features into SageMaker Feature Store

In [11]:
record_identifier_feature_name = 'employee_id'
event_time_feature_name = 'event_time'

#### Instantiate the Feature Group

In [12]:
feature_group_name = 'employees'
feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)

Feature Store supports three feature types: `String`, `Fractional`, and `Integral`. `String` is the default, so any column in your dataset that is not a `float` or `long` type is stored as `String`. The helper below casts pandas `object` columns to the `string` dtype so that the SDK can map them to the `String` feature type.

In [13]:
def cast_object_to_string(df):
    """
    Cast object dtype to string. The SageMaker FeatureStore Python SDK will then 
    map the string dtype to String feature type.
    """
    for label in df.columns:
        if df.dtypes[label] == 'object':
            df[label] = df[label].astype('string')

In [14]:
cast_object_to_string(features)
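
The sketch below approximates how pandas dtype names line up with feature types, based on the `features.dtypes` and `load_feature_definitions` outputs shown in this notebook. It is not the SDK's own code: `expected_feature_type` and the two sets are hypothetical names, and the lists of integer and float dtypes are assumptions.

```python
# Assumed dtype names; the SDK's actual lists may differ.
INTEGRAL_DTYPES = {'int8', 'int16', 'int32', 'int64',
                   'uint8', 'uint16', 'uint32', 'uint64'}
FRACTIONAL_DTYPES = {'float16', 'float32', 'float64'}


def expected_feature_type(dtype_name):
    """Return the feature type a dtype name is expected to map to, or None."""
    name = str(dtype_name)
    if name in INTEGRAL_DTYPES:
        return 'Integral'
    if name in FRACTIONAL_DTYPES:
        return 'Fractional'
    if name == 'string':
        return 'String'
    # Anything else (for example 'object') should be cast before
    # feature definitions are loaded.
    return None


# Usage inside the notebook:
# {col: expected_feature_type(dtype) for col, dtype in features.dtypes.items()}
```

A `None` result flags a column that probably needs a cast such as the one performed by `cast_object_to_string`.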

#### Append event_time to the `features` dataframe 

In [15]:
current_time_sec = int(round(time.time()))
features[event_time_feature_name] = pd.Series([current_time_sec]*len(features), dtype='float64')

In [16]:
features.dtypes

employee_id          int64
name                string
age                  int64
sex                 string
happiness_score    float64
created_by          string
event_time         float64
dtype: object
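
This notebook stores `event_time` as a `Fractional` feature holding Unix seconds. Feature Store also accepts event times as ISO-8601 strings in UTC. A minimal sketch of that alternative, assuming a hypothetical helper name `to_iso8601_utc`:

```python
from datetime import datetime, timezone


def to_iso8601_utc(epoch_seconds):
    """Format Unix seconds as yyyy-MM-dd'T'HH:mm:ssZ in UTC."""
    moment = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return moment.strftime('%Y-%m-%dT%H:%M:%SZ')


print(to_iso8601_utc(1609037857))  # prints 2020-12-27T02:57:37Z
```

If this form were used, the `event_time` column would need the `string` dtype so that it is registered as a `String` feature rather than `Fractional`.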

In [17]:
features

| | employee_id | name | age | sex | happiness_score | created_by | event_time |
|---|---|---|---|---|---|---|---|
| 0 | 100 | aaron | 33 | M | 4.2 | account-a | 1609038000.0 |
| 1 | 101 | brett | 34 | F | 3.7 | account-a | 1609038000.0 |
| 2 | 102 | cathy | 44 | F | 4.8 | account-a | 1609038000.0 |
| 3 | 103 | danny | 44 | M | 2.1 | account-a | 1609038000.0 |
| 4 | 104 | emily | 44 | F | 1.5 | account-a | 1609038000.0 |


#### Load Feature Definitions
The SageMaker Feature Store Python SDK auto-detects the data schema from the input dataframe.

In [18]:
feature_group.load_feature_definitions(data_frame=features)

[FeatureDefinition(feature_name='employee_id', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='name', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='age', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='sex', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='happiness_score', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='created_by', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='event_time', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>)]

#### KMS Custom Key ID

Use a KMS customer managed key (CMK) to encrypt the data in the offline store. The key ARN below is passed to `create` as `offline_store_kms_key_id`.

In [19]:
kms_key = 'arn:aws:kms:us-east-1:892313895307:key/d3763b61-8d94-43bd-a3d6-4b4516ad28e7'

#### Create Feature Group

If a feature group named `employees` already exists, uncomment the line below to delete it first.

In [21]:
#sagemaker_client.delete_feature_group(FeatureGroupName='employees')

Create the feature group named `employees`

In [22]:
feature_group.create(
    s3_uri=offline_feature_store_s3_uri,
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True,
    offline_store_kms_key_id=kms_key
)

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:892313895307:feature-group/employees',
 'ResponseMetadata': {'RequestId': 'eccce06c-04c2-4b9e-861d-e789a415ada8',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'eccce06c-04c2-4b9e-861d-e789a415ada8',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '86',
   'date': 'Sun, 27 Dec 2020 02:57:57 GMT'},
  'RetryAttempts': 0}}

<b>Note:</b> Ensure `FeatureGroupStatus` of the created feature group shows as `Created` before proceeding to the next steps.
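
Rather than checking the status by hand, the wait can be scripted. The following is a sketch only: `wait_for_feature_group` is a hypothetical helper, and the poll interval and timeout are arbitrary assumptions. It takes any callable that returns a `describe`-style dictionary, so it can be tried without AWS access.

```python
import time


def wait_for_feature_group(describe, poll_seconds=5, timeout_seconds=600,
                           sleep=time.sleep):
    """Poll describe() until the group leaves 'Creating' or the timeout expires."""
    waited = 0
    while True:
        status = describe().get('FeatureGroupStatus')
        if status != 'Creating':
            if status != 'Created':
                # For example 'CreateFailed'.
                raise RuntimeError(f'Feature group ended in status {status}')
            return status
        if waited >= timeout_seconds:
            raise TimeoutError('Feature group is still being created')
        sleep(poll_seconds)
        waited += poll_seconds


# Usage inside the notebook:
# wait_for_feature_group(feature_group.describe)
```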

#### Validate that the feature group is created

In [23]:
feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:892313895307:feature-group/employees',
 'FeatureGroupName': 'employees',
 'RecordIdentifierFeatureName': 'employee_id',
 'EventTimeFeatureName': 'event_time',
 'FeatureDefinitions': [{'FeatureName': 'employee_id',
   'FeatureType': 'Integral'},
  {'FeatureName': 'name', 'FeatureType': 'String'},
  {'FeatureName': 'age', 'FeatureType': 'Integral'},
  {'FeatureName': 'sex', 'FeatureType': 'String'},
  {'FeatureName': 'happiness_score', 'FeatureType': 'Fractional'},
  {'FeatureName': 'created_by', 'FeatureType': 'String'},
  {'FeatureName': 'event_time', 'FeatureType': 'Fractional'}],
 'CreationTime': datetime.datetime(2020, 12, 27, 2, 57, 57, 616000, tzinfo=tzlocal()),
 'OnlineStoreConfig': {'EnableOnlineStore': True},
 'OfflineStoreConfig': {'S3StorageConfig': {'S3Uri': 's3://sagemaker-feature-store-account-a/',
   'KmsKeyId': 'arn:aws:kms:us-east-1:892313895307:key/d3763b61-8d94-43bd-a3d6-4b4516ad28e7'},
  'DisableGlueTableCreation': Fal

In [25]:
sagemaker_client.list_feature_groups()

{'FeatureGroupSummaries': [{'FeatureGroupName': 'employees',
   'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:892313895307:feature-group/employees',
   'CreationTime': datetime.datetime(2020, 12, 27, 2, 57, 57, 616000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created'}],
 'ResponseMetadata': {'RequestId': 'b7eb9d8d-76cc-4738-85e4-ae6329ff46c1',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'b7eb9d8d-76cc-4738-85e4-ae6329ff46c1',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '208',
   'date': 'Sun, 27 Dec 2020 02:58:13 GMT'},
  'RetryAttempts': 0}}

#### Put Records into Feature Group (Both Online & Offline)

Once the feature group has been created, we can write data to it with the `PutRecord` API. This API can handle high TPS (transactions per second) and is designed to be called from multiple streams. Records are available in the online store immediately. For the offline store, the data from these requests is buffered and written to S3 in chunks, so the files appear within a few minutes of ingestion. To speed up ingestion in this example, `ingest` is called with `max_workers=5` so that several workers write simultaneously.

In [26]:
%%time

feature_group.ingest(data_frame=features, max_workers=5, wait=True)

Started ingesting index 0 to 1
Started ingesting index 1 to 2
Started ingesting index 2 to 3
Started ingesting index 3 to 4
Started ingesting index 4 to 5
Successfully ingested row 0 to 1
Successfully ingested row 4 to 5
Successfully ingested row 2 to 3
Successfully ingested row 1 to 2
Successfully ingested row 3 to 4


CPU times: user 103 ms, sys: 11.4 ms, total: 114 ms
Wall time: 1.08 s


IngestionManagerPandas(feature_group_name='employees', sagemaker_session=<sagemaker.session.Session object at 0x7fe9beea80f0>, data_frame=   employee_id   name  age sex  happiness_score created_by    event_time
0          100  aaron   33   M              4.2  account-a  1.609038e+09
1          101  brett   34   F              3.7  account-a  1.609038e+09
2          102  cathy   44   F              4.8  account-a  1.609038e+09
3          103  danny   44   M              2.1  account-a  1.609038e+09
4          104  emily   44   F              1.5  account-a  1.609038e+09, max_workers=5, _futures={<Future at 0x7fe9bd3be080 state=finished returned NoneType>: (0, 1), <Future at 0x7fe9bd3be2e8 state=finished returned NoneType>: (1, 2), <Future at 0x7fe9bd3be550 state=finished returned NoneType>: (2, 3), <Future at 0x7fe9bd3bff28 state=finished returned NoneType>: (3, 4), <Future at 0x7fe9bd375710 state=finished returned NoneType>: (4, 5)})
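
To make the shape of a single `PutRecord` request concrete, the sketch below converts one row into the list of `FeatureName`/`ValueAsString` pairs that the runtime client expects. `row_to_record` is a hypothetical helper and the sample row is copied from the dataframe above; this is not how `ingest` is implemented internally.

```python
def row_to_record(row):
    """Convert a mapping of feature name -> value into a PutRecord 'Record' list."""
    return [
        {'FeatureName': name, 'ValueAsString': str(value)}
        for name, value in row.items()
        if value is not None  # omit missing features instead of sending 'None'
    ]


sample_row = {
    'employee_id': 101, 'name': 'brett', 'age': 34, 'sex': 'F',
    'happiness_score': 3.7, 'created_by': 'account-a',
    'event_time': 1609037857.0,
}
record = row_to_record(sample_row)

# Inside the notebook, the record could then be written with:
# featurestore_runtime.put_record(FeatureGroupName=feature_group_name, Record=record)
```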

### Get Record from Online Store (Available Immediately)

To confirm that data has been ingested, we can quickly retrieve a record from the online store:

In [27]:
record_identifier = str(101)

featurestore_runtime.get_record(FeatureGroupName=feature_group_name, 
                                RecordIdentifierValueAsString=record_identifier)

{'ResponseMetadata': {'RequestId': '73ff6a5d-7305-4c7f-a3a1-2e82cab98b5b',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '73ff6a5d-7305-4c7f-a3a1-2e82cab98b5b',
   'content-type': 'application/json',
   'content-length': '369',
   'date': 'Sun, 27 Dec 2020 02:58:23 GMT',
   'connection': 'close'},
  'RetryAttempts': 0},
 'Record': [{'FeatureName': 'employee_id', 'ValueAsString': '101'},
  {'FeatureName': 'name', 'ValueAsString': 'brett'},
  {'FeatureName': 'age', 'ValueAsString': '34'},
  {'FeatureName': 'sex', 'ValueAsString': 'F'},
  {'FeatureName': 'happiness_score', 'ValueAsString': '3.7'},
  {'FeatureName': 'created_by', 'ValueAsString': 'account-a'},
  {'FeatureName': 'event_time', 'ValueAsString': '1609037857.0'}]}
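
The `Record` field is a list of name/value pairs with every value returned as a string. A small sketch for flattening it into a dictionary follows; `record_to_dict` is a hypothetical helper, and the sample response is trimmed from the output above.

```python
def record_to_dict(response):
    """Flatten a get_record response into {feature_name: value_as_string}."""
    # .get guards against a response that carries no 'Record' entry.
    return {
        feature['FeatureName']: feature['ValueAsString']
        for feature in response.get('Record', [])
    }


sample_response = {
    'Record': [
        {'FeatureName': 'employee_id', 'ValueAsString': '101'},
        {'FeatureName': 'name', 'ValueAsString': 'brett'},
        {'FeatureName': 'happiness_score', 'ValueAsString': '3.7'},
    ]
}
flat = record_to_dict(sample_response)
print(flat['name'])  # prints brett
```

Numeric features still need an explicit conversion, for example `float(flat['happiness_score'])`.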

### Get Records from Offline Store
Wait for the data to appear in the offline store before creating a dataset. This takes approximately 5 minutes; the loop below checks the bucket every 60 seconds.

In [28]:
account_id = boto3.client('sts').get_caller_identity()['Account']

In [29]:
feature_group_s3_prefix = f'{account_id}/sagemaker/{region}/offline-store/{feature_group_name}/data'
feature_group_s3_prefix

'892313895307/sagemaker/us-east-1/offline-store/employees/data'

In [30]:
offline_store_contents = None
while offline_store_contents is None:
    objects = s3_client.list_objects(Bucket=bucket, Prefix=feature_group_s3_prefix)
    if 'Contents' in objects and len(objects['Contents']) > 1:
        logger.info('[Features are available in Offline Store!]')
        offline_store_contents = objects['Contents']
    else:
        logger.info('[Waiting for data in Offline Store...]')
        time.sleep(60)

[Waiting for data in Offline Store...]
[Waiting for data in Offline Store...]
[Waiting for data in Offline Store...]
[Waiting for data in Offline Store...]
[Waiting for data in Offline Store...]
[Waiting for data in Offline Store...]
[Features are available in Offline Store!]


In [31]:
offline_store_contents

[{'Key': '892313895307/sagemaker/us-east-1/offline-store/employees/data/year=2020/month=12/day=27/hour=02/20201227T025737Z_WM6b4B9KNQzxrjGo.parquet',
  'LastModified': datetime.datetime(2020, 12, 27, 3, 3, 33, tzinfo=tzlocal()),
  'ETag': '"a7898514cb35692e783ac0b0a9537585"',
  'Size': 2081,
  'StorageClass': 'STANDARD',
  'Owner': {'DisplayName': 'arunprsh_test1',
   'ID': 'a52ce3999cdab5111cb19ca94abf5de5a69d62f34baa7d4422c630549fad3bd0'}},
 {'Key': '892313895307/sagemaker/us-east-1/offline-store/employees/data/year=2020/month=12/day=27/hour=02/20201227T025737Z_lV6rFzUwp7njtUyA.parquet',
  'LastModified': datetime.datetime(2020, 12, 27, 3, 3, 33, tzinfo=tzlocal()),
  'ETag': '"dd039e940d3a95ef9dfdc83538ebae73"',
  'Size': 2143,
  'StorageClass': 'STANDARD',
  'Owner': {'DisplayName': 'arunprsh_test1',
   'ID': 'a52ce3999cdab5111cb19ca94abf5de5a69d62f34baa7d4422c630549fad3bd0'}}]

#### Inspect the Parquet Files (Offline Store) using AWS Wrangler

In [32]:
s3_prefix = '/'.join(offline_store_contents[0]['Key'].split('/')[:-5])
s3_uri = f's3://{bucket}/{s3_prefix}'
s3_uri

's3://sagemaker-feature-store-account-a/892313895307/sagemaker/us-east-1/offline-store/employees/data'
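
The `[:-5]` slice above relies on every key ending in exactly four partition folders plus a file name. A sketch that locates the partition folders by their `name=value` form instead is shown below; `split_offline_store_key` is a hypothetical helper, and the sample key is taken from the listing above.

```python
def split_offline_store_key(key):
    """Split an offline store object key into (data_prefix, partitions, file_name)."""
    parts = key.split('/')
    directories, file_name = parts[:-1], parts[-1]
    # Index of the first 'name=value' folder, or the end if there is none.
    first_partition = next(
        (i for i, part in enumerate(directories) if '=' in part),
        len(directories),
    )
    data_prefix = '/'.join(directories[:first_partition])
    partitions = dict(
        part.split('=', 1) for part in directories[first_partition:] if '=' in part
    )
    return data_prefix, partitions, file_name


sample_key = ('892313895307/sagemaker/us-east-1/offline-store/employees/data/'
              'year=2020/month=12/day=27/hour=02/'
              '20201227T025737Z_WM6b4B9KNQzxrjGo.parquet')
prefix, partitions, file_name = split_offline_store_key(sample_key)
```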

In [33]:
df = wr.s3.read_parquet(path=s3_uri)

In [34]:
df

| | employee_id | name | age | sex | happiness_score | created_by | event_time | write_time | api_invocation_time | is_deleted |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 100 | aaron | 33 | M | 4.2 | account-a | 1609038000.0 | 2020-12-27 03:03:32.808000+00:00 | 2020-12-27 02:58:19+00:00 | False |
| 1 | 101 | brett | 34 | F | 3.7 | account-a | 1609038000.0 | 2020-12-27 03:03:32.808000+00:00 | 2020-12-27 02:58:19+00:00 | False |
| 0 | 104 | emily | 44 | F | 1.5 | account-a | 1609038000.0 | 2020-12-27 03:03:32.863000+00:00 | 2020-12-27 02:58:19+00:00 | False |
| 1 | 102 | cathy | 44 | F | 4.8 | account-a | 1609038000.0 | 2020-12-27 03:03:32.863000+00:00 | 2020-12-27 02:58:19+00:00 | False |
| 2 | 103 | danny | 44 | M | 2.1 | account-a | 1609038000.0 | 2020-12-27 03:03:32.863000+00:00 | 2020-12-27 02:58:19+00:00 | False |


### Build Train Set from Offline Store Features using Glue and Athena
SageMaker Feature Store <b>automatically</b> builds a `Glue Data Catalog` table for each feature group (you can turn this on or off when creating the feature group). In this example, we build one training dataset from the feature values in the feature group created above. Using the auto-built catalog, we run an `Athena` query that pulls the feature group's data from the offline store in S3.

In [35]:
query = feature_group.athena_query()
query.__dict__

{'catalog': 'AwsDataCatalog',
 'database': 'sagemaker_featurestore',
 'table_name': 'employees-1609037877',
 'sagemaker_session': <sagemaker.session.Session at 0x7fe9beea80f0>,
 '_current_query_execution_id': None,
 '_result_bucket': None,
 '_result_file_prefix': None}

In [36]:
table = query.table_name
table

'employees-1609037877'

In [37]:
query_string = f'SELECT * FROM "{table}"'
print('Running ' + query_string)

Running SELECT * FROM "employees-1609037877"


In [38]:
query.run(query_string=query_string, output_location=f's3://{bucket}/{account_id}/query_results/')
query.wait()

Query 46a5a17e-fd67-47b9-8fd7-179f51f5e671 is being executed.
Query 46a5a17e-fd67-47b9-8fd7-179f51f5e671 successfully executed.


In [39]:
df = query.as_dataframe()[['employee_id', 'name', 'age', 'sex', 'happiness_score', 'created_by']]
df

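| | employee_id | name | age | sex | happiness_score | created_by |
|---|---|---|---|---|---|---|
| 0 | 100 | aaron | 33 | M | 4.2 | account-a |
| 1 | 101 | brett | 34 | F | 3.7 | account-a |
| 2 | 104 | emily | 44 | F | 1.5 | account-a |
| 3 | 102 | cathy | 44 | F | 4.8 | account-a |
| 4 | 103 | danny | 44 | M | 2.1 | account-a |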
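
`SELECT *` returns every row in the offline store, which keeps appending rows as records are re-ingested. If the same `employee_id` were written more than once, a training set would usually want only the latest version of each record. The sketch below builds such a query from the `write_time`, `api_invocation_time`, and `is_deleted` columns seen in the Parquet files above. `latest_records_query` is a hypothetical helper and the SQL is an assumption about what suits this table, not a query taken from this notebook.

```python
def latest_records_query(table, record_id='employee_id', event_time='event_time'):
    """Build an Athena query that keeps the newest non-deleted row per record."""
    return f'''
SELECT *
FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY "{record_id}"
               ORDER BY "{event_time}" DESC, api_invocation_time DESC, write_time DESC
           ) AS row_num
    FROM "{table}"
) AS ranked
WHERE row_num = 1 AND NOT is_deleted
'''.strip()


latest_query = latest_records_query('employees-1609037877')

# Inside the notebook:
# query.run(query_string=latest_query,
#           output_location=f's3://{bucket}/{account_id}/query_results/')
# query.wait()
```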
