# Inside Account A

### Imports 

In [None]:
import pandas as pd
import logging
import boto3
import json
import time

### Assume cross account role from account B using STS
Use the AWS Security Token Service (STS) `AssumeRole` API call to assume the cross-account role created in account B. This call returns a set of temporary credentials that you can use to create any service client. When you use these clients, this notebook has the permissions conferred by the assumed role and acts as if it belongs to account B. For more information, see `assume_role` in the AWS SDK for Python (Boto 3) documentation.

In [None]:
# STS client created without explicit credentials, so it uses this notebook's
# default credentials (account A, per the notebook heading).
sts = boto3.client('sts')

In [None]:
# ARN of the role created in account B that this notebook assumes.
# Replace the <ACCOUNT B ID> placeholder with the real account ID before running.
CROSS_ACCOUNT_ASSUME_ROLE = 'arn:aws:iam::<ACCOUNT B ID>:role/cross-account-assume-role'

In [None]:
# Assume the account B role. The response's 'Credentials' entry is read in the
# next cell to obtain the temporary access key, secret key and session token.
account_b = sts.assume_role(RoleArn=CROSS_ACCOUNT_ASSUME_ROLE, 
                            RoleSessionName='FeatureStoreCrossAccountAccessDemo'
                           )

In [None]:
# Pull the three temporary credential parts out of the AssumeRole response in one go.
# They are passed to every boto3 client that must act as account B.
access_key_id, secret_access_key, session_token = (
    account_b['Credentials'][part]
    for part in ('AccessKeyId', 'SecretAccessKey', 'SessionToken')
)

#### Setup sessions and clients
Create SageMaker client using the assumed role temporary credentials.

In [None]:
# Region name; it is also embedded in the offline-store S3 prefix built later in this notebook.
REGION = 'us-east-1'
# Session bound to REGION using the notebook's default credentials.
# NOTE(review): boto_session is not referenced by any other visible cell.
boto_session = boto3.Session(region_name=REGION)

In [None]:
# SageMaker client that acts as account B: it signs requests with the temporary
# credentials returned by AssumeRole. The region is pinned to REGION so that the
# feature group is created in the same region that is later used to build the
# offline-store S3 prefix, instead of whatever default region is configured.
sagemaker_client = boto3.client('sagemaker',
                                region_name=REGION,
                                aws_access_key_id=access_key_id,
                                aws_secret_access_key=secret_access_key,
                                aws_session_token=session_token
                               )

In [None]:
# Feature Store runtime client (used for put_record / get_record) that acts as
# account B via the assumed-role temporary credentials. The region is pinned to
# REGION so records are written to and read from the same region that REGION
# names elsewhere in this notebook, rather than the configured default region.
sagemaker_featurestore_runtime_client = boto3.client(service_name='sagemaker-featurestore-runtime',
                                                     region_name=REGION,
                                                     aws_access_key_id=access_key_id,
                                                     aws_secret_access_key=secret_access_key,
                                                     aws_session_token=session_token
                                                    )

In [None]:
# S3 client that signs its requests with the assumed-role temporary credentials,
# so S3 calls made through it run with account B's permissions.
s3_client = boto3.client(
    's3',
    aws_session_token=session_token,
    aws_secret_access_key=secret_access_key,
    aws_access_key_id=access_key_id,
)

In [None]:
# Athena client that acts as account B via the assumed-role temporary credentials.
# The region is pinned to REGION so queries run in the same region that REGION
# names elsewhere in this notebook, rather than the configured default region.
athena_client = boto3.client(service_name='athena',
                             region_name=REGION,
                             aws_access_key_id=access_key_id,
                             aws_secret_access_key=secret_access_key,
                             aws_session_token=session_token
                            )

#### Setup Logger

In [None]:
# Notebook-wide logger. logging.getLogger returns the same object every time, so
# re-running this cell would otherwise stack one more StreamHandler per run and
# print every message multiple times. Only attach a handler if a plain
# StreamHandler is not already present (exact type check: subclasses such as
# FileHandler do not write to the notebook output and must not suppress it).
logger = logging.getLogger('sagemaker')
logger.setLevel(logging.INFO)
if not any(type(handler) is logging.StreamHandler for handler in logger.handlers):
    logger.addHandler(logging.StreamHandler())

In [None]:
# Record the boto3 version in use through the notebook logger.
logger.info(f'[Using Boto3 version: {boto3.__version__}]')

### Create Feature Group in Account B
Let us create a feature group in account B to store a few aggregated features for a credit card. 

In [None]:
# Load the feature group schema. The `with` block closes the file handle as soon
# as parsing is done (the bare open(...).read() form left it to the garbage
# collector), and the encoding is explicit because JSON text is UTF-8.
with open('./schema.json', encoding='utf-8') as schema_file:
    schema = json.load(schema_file)
# Bare expression so the notebook displays the parsed schema.
schema

In [None]:
def schema_to_defs(filename):
    """Convert a JSON schema file into feature definitions.

    The file must contain a 'features' list whose items each have a 'name' and
    a 'type', plus the top-level keys 'record_identifier_feature_name' and
    'event_time_feature_name'.

    Args:
        filename: Path of the JSON schema file to read.

    Returns:
        A 3-tuple of (feature_definitions, record_identifier_feature_name,
        event_time_feature_name). feature_definitions is a list of
        {'FeatureName': ..., 'FeatureType': ...} dicts in schema order, where
        the type is 'Fractional' for 'double', 'Integral' for 'bigint' and
        'String' for anything else.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If a required key is missing from the schema.
    """
    # Context manager guarantees the handle is closed even if parsing fails.
    with open(filename, encoding='utf-8') as schema_file:
        schema = json.load(schema_file)

    # Schema type -> feature type; unlisted types fall back to 'String'.
    feature_types = {'double': 'Fractional', 'bigint': 'Integral'}

    feature_definitions = [
        {
            'FeatureName': col['name'],
            'FeatureType': feature_types.get(col['type'], 'String'),
        }
        for col in schema['features']
    ]

    return feature_definitions, schema['record_identifier_feature_name'], schema['event_time_feature_name']

In [None]:
# Derive the feature definitions and the record-identifier / event-time feature
# names from the schema file; all three are passed to create_feature_group.
feature_definitions, record_identifier_feature_name, event_time_feature_name = schema_to_defs('./schema.json')

In [None]:
# Bare expression so the notebook displays the derived feature definitions.
feature_definitions

In [None]:
# Placeholder: replace with the name of the offline store S3 bucket in account B.
OFFLINE_STORE_BUCKET = '<NAME OF OFFLINE STORE BUCKET IN ACCOUNT B>'
# Name of the feature group that is created, written to and queried in this notebook.
FEATURE_GROUP_NAME = 'credit-card-aggregated-features'

In [None]:
# Extra keyword arguments for create_feature_group that enable the offline store
# and point it at the root of OFFLINE_STORE_BUCKET.
offline_config = {'OfflineStoreConfig': {'S3StorageConfig': {'S3Uri': f's3://{OFFLINE_STORE_BUCKET}'}}}
# To write ONLY to the Online store, use `offline_config = {}` instead.

In [None]:
# Remove a feature group left over from a previous run so that the create call
# can reuse the name. On a first run the group does not exist yet; that is the
# desired end state, so ResourceNotFound is logged instead of aborting the notebook.
# NOTE(review): deletion appears to be asynchronous on the service side — confirm
# the group is gone (describe_feature_group fails) before re-creating it.
try:
    sagemaker_client.delete_feature_group(FeatureGroupName=FEATURE_GROUP_NAME)
except sagemaker_client.exceptions.ResourceNotFound:
    logger.info(f'[Feature group {FEATURE_GROUP_NAME} does not exist; nothing to delete]')

In [None]:
# Create the feature group through the account B client. Description and tags
# come from the loaded schema; the online store is always enabled, and the
# offline store settings (if any) are supplied via **offline_config.
# The same role ARN that was assumed above is passed as the feature group's RoleArn.
sagemaker_client.create_feature_group(FeatureGroupName=FEATURE_GROUP_NAME,
                                    RecordIdentifierFeatureName=record_identifier_feature_name,
                                    EventTimeFeatureName=event_time_feature_name,
                                    FeatureDefinitions=feature_definitions,
                                    Description=schema['description'],
                                    Tags=schema['tags'],
                                    OnlineStoreConfig={'EnableOnlineStore': True},
                                    RoleArn=CROSS_ACCOUNT_ASSUME_ROLE,
                                    **offline_config)

In [None]:
# Display the feature group's current description as returned by the service.
# NOTE(review): presumably the group must finish creating before records can be
# written — check the status shown here before running the following cells.
sagemaker_client.describe_feature_group(FeatureGroupName=FEATURE_GROUP_NAME)

#### Write sample features (records) to the created feature group in account B

In [None]:
# Sample data. Each row is [cc_num, num_trans_last_1w, avg_amt_last_1w], all as
# strings, matching the order in which the next cell unpacks them.
rows = [
    ['7822756498736827', '24', '34.32'],
    ['3782749218642874', '36', '65.77'],
    ['7713974627482452', '4', '342.14'],
    ['1030637826379113', '23', '63.19'],
    ['5632839778232447', '73', '5.32']
]

In [None]:
# Turn every sample row into a put_record payload: one {'ValueAsString', 'FeatureName'}
# entry per feature, followed by a 'trans_time' entry stamped with the current
# time in whole seconds since the epoch.
records = []
for cc_num, num_trans_last_1w, avg_amt_last_1w in rows:
    event_time_feature = {
        'ValueAsString': str(int(round(time.time()))),
        'FeatureName': 'trans_time',
    }
    record = [
        {'ValueAsString': cc_num, 'FeatureName': 'cc_num'},
        {'ValueAsString': num_trans_last_1w, 'FeatureName': 'num_trans_last_1w'},
        {'ValueAsString': avg_amt_last_1w, 'FeatureName': 'avg_amt_last_1w'},
        event_time_feature,
    ]
    records.append(record)

In [None]:
# Write each prepared record to the feature group and print the HTTP status
# code of every put_record response.
for record in records:
    response = sagemaker_featurestore_runtime_client.put_record(
        Record=record,
        FeatureGroupName=FEATURE_GROUP_NAME,
    )
    http_status = response['ResponseMetadata']['HTTPStatusCode']
    print(http_status)

#### Verify if we can retrieve features from the feature group in account B

In [None]:
# Read back one of the sample records by its record identifier (a cc_num value
# from the sample rows) to confirm that the write succeeded.
response = sagemaker_featurestore_runtime_client.get_record(FeatureGroupName=FEATURE_GROUP_NAME, 
                                                            RecordIdentifierValueAsString='1030637826379113'
                                                           )
# Bare expression so the notebook displays the response.
response

### Get records from Account B's Offline store (S3 bucket)
Now let's wait for the data to appear in our offline store before moving forward to creating a dataset. This will take approximately 5 minutes.

In [None]:
# Placeholder: replace with account B's AWS account ID. It forms the first
# component of the offline-store S3 prefix built in the next cell.
ACCOUNT_ID = '<ACCOUNT B ID>'

In [None]:
# S3 key prefix under which this notebook looks for the feature group's offline data.
# NOTE(review): presumably this mirrors the layout Feature Store uses inside the
# offline store bucket — confirm against the objects actually written.
feature_group_s3_prefix = f'{ACCOUNT_ID}/sagemaker/{REGION}/offline-store/{FEATURE_GROUP_NAME}/data'
# Bare expression so the notebook displays the prefix.
feature_group_s3_prefix

In [None]:
# Poll the offline store once a minute until more than one object exists under
# the feature group's prefix, then keep the object listing for later use.
offline_store_contents = None
while offline_store_contents is None:
    objects = s3_client.list_objects(
        Prefix=feature_group_s3_prefix,
        Bucket=OFFLINE_STORE_BUCKET,
    )
    # A missing 'Contents' key means nothing has been written yet.
    if len(objects.get('Contents', [])) <= 1:
        logger.info('[Waiting for data in Offline Store...]')
        time.sleep(60)
        continue
    logger.info('[Features are available in Offline Store!]')
    offline_store_contents = objects['Contents']

### Use Athena to query features from the feature group in account B from account A

In [None]:
# Fetch the feature group description; its offline store section is read in the
# next cell to find the table to query.
feature_group = sagemaker_client.describe_feature_group(FeatureGroupName=FEATURE_GROUP_NAME)

In [None]:
# Table name reported in the feature group's offline store data catalog config;
# it is the table the Athena query selects from.
glue_table_name = feature_group['OfflineStoreConfig']['DataCatalogConfig']['TableName']

In [None]:
# Query that selects every column and row of the offline store table.
query_string = f'SELECT * FROM "{glue_table_name}"'
# Bare expression so the notebook displays the query text.
query_string

#### Run Athena query in account B and save results back to a bucket in account A

In [None]:
# Placeholder: replace with the name of the bucket in account A that receives
# the Athena query results.
ATHENA_RESULTS_BUCKET = '<NAME OF ATHENA QUERY RESULTS BUCKET IN ACCOUNT A>'

In [None]:
# Start the query with the account B Athena client against the
# 'sagemaker_featurestore' database in 'AwsDataCatalog'. Results are written
# under query_results/model-1 in ATHENA_RESULTS_BUCKET. The response's
# 'QueryExecutionId' is what the next cell uses to fetch the results.
response = athena_client.start_query_execution(
                QueryString=query_string,
                QueryExecutionContext={
                    'Database': 'sagemaker_featurestore',
                    'Catalog': 'AwsDataCatalog'
                },
                ResultConfiguration={
                    'OutputLocation': f's3://{ATHENA_RESULTS_BUCKET}/query_results/model-1',
                }
            )
# Bare expression so the notebook displays the response.
response

In [None]:
# Athena executes queries asynchronously: start_query_execution returns right
# away, and asking for results while the query is still QUEUED or RUNNING fails.
# Poll the execution status until it reaches a terminal state before fetching rows.
query_execution_id = response['QueryExecutionId']
while True:
    query_status = athena_client.get_query_execution(
        QueryExecutionId=query_execution_id
    )['QueryExecution']['Status']
    if query_status['State'] in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    logger.info('[Waiting for Athena query to finish...]')
    time.sleep(2)

# Surface a failed or cancelled query with Athena's own explanation instead of
# letting get_query_results raise a less specific error.
if query_status['State'] != 'SUCCEEDED':
    raise RuntimeError(
        f"Athena query {query_execution_id} ended in state {query_status['State']}: "
        f"{query_status.get('StateChangeReason', 'no reason reported')}"
    )

# Fetch at most the first 100 result rows of the finished query.
query_results = athena_client.get_query_results(QueryExecutionId=query_execution_id,
                                                MaxResults=100
                                               )

In [None]:
# Bare expression so the notebook displays the rows returned by the query.
query_results['ResultSet']['Rows']

### Grant account A access to Athena results (Important)
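The objects in the Athena query results bucket (which lives in account A) are owned by account B, because the query was run with account B's assumed-role credentials. To allow this notebook, or account A in general, to access these objects, we have to grant account A permissions on each object via an ACL.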

Get canonical ID of account A

In [None]:
# Note: this client is created without the assumed-role temporary credentials,
# so it uses the notebook's own credentials and points to this account (account A).
s3 = boto3.client('s3')
# Canonical user ID of the account behind those credentials, taken from the
# 'Owner' entry of the list_buckets response.
can_a = s3.list_buckets()['Owner']['ID']
# Bare expression so the notebook displays the ID.
can_a

Get canonical ID of account B

In [None]:
# Note: s3_client was created with the temporary credentials from the assumed role,
# so the 'Owner' entry of its list_buckets response identifies account B.
can_b = s3_client.list_buckets()['Owner']['ID'] 
# Bare expression so the notebook displays the ID.
can_b

In [None]:
# Grant account A full control over every object in the Athena results bucket.
# The ACL is written with the account B client and names account B as the owner.
# The policy is identical for every object, so it is built once up front.
acl_policy = {
    'Grants': [
        {
            'Grantee': {
                'ID': can_a,
                'Type': 'CanonicalUser'
            },
            'Permission': 'FULL_CONTROL'
        }
    ],
    'Owner': {
        'ID': can_b
    }
}

# A single list call returns only one page of keys (up to 1000), so use a
# paginator to reach every object. An empty bucket yields pages without a
# 'Contents' key; treat that as "nothing to do" instead of raising KeyError.
acl_paginator = s3_client.get_paginator('list_objects_v2')
for page in acl_paginator.paginate(Bucket=ATHENA_RESULTS_BUCKET):
    for s3_object in page.get('Contents', []):
        key = s3_object['Key']
        print(f'Bucket = {ATHENA_RESULTS_BUCKET} | Key = {key}')
        response = s3_client.put_object_acl(
            AccessControlPolicy=acl_policy,
            Bucket=ATHENA_RESULTS_BUCKET,
            Key=key,
        )

#### Load Athena query result csv into a Pandas dataframe for model training

In [None]:
# Placeholder: replace the path with the location of the Athena result CSV
# (one of the keys printed by the ACL cell above).
df = pd.read_csv('<PATH TO ATHENA QUERY RESULT CSV (SEE PREVIOUS CELLS)>')

In [None]:
# Bare expression so the notebook displays the first rows of the loaded dataframe.
df.head()