# Fraud Detection with Amazon SageMaker FeatureStore

***본 노트북 코드는 [영문 노트북](https://github.com/aws/amazon-sagemaker-examples/blob/master/sagemaker-featurestore/sagemaker_featurestore_fraud_detection_python_sdk.ipynb)의 번역본으로 직역이 아닌 중간중간 설명을 덧붙이고 코드를 서울(ICN) 리전에서도 수행 가능하도록 일부 수정했습니다.***

이 코드는 SageMaker Studio의 `Python 3 (Data Science)` 커널에서 정상 동작하며, SageMaker에서도 동작하지만 모든 기능들이 지원되지 않으므로 SageMaker Studio에서 실행하는 것을 권장합니다.
또한, 코드를 원활하게 실행하려면 Feature Store에 대한 IAM 권한(`AmazonSageMakerFeatureStoreAccess`)을 추가로 부여해야 합니다.


## Contents
1. [Background](#Background)
1. [Setup SageMaker FeatureStore](#Setup-SageMaker-FeatureStore)
1. [Inspect Dataset](#Inspect-Dataset)
1. [Ingest Data into FeatureStore](#Ingest-Data-into-FeatureStore)
1. [Build_Training_Dataset](#Build-Training-Dataset)
1. [Train_and Deploy_the Model](#Train-and-Deploy-the-Model)
1. [SageMaker FeatureStore At Inference](#SageMaker-FeatureStore-During-Inference)
1. [Cleanup Resources](#Cleanup-Resources)

## Background

Amazon SageMaker FeatureStore는 고객이 ML (머신 러닝) 개발을 위해 선별된 데이터를 쉽게 생성하고 관리할 수 있도록 해주는 신규 SageMaker 기능입니다. SageMaker FeatureStore는 높은 TPS API를 통한 데이터 수집과 온라인 및 오프라인 스토어를 통한 데이터 소비를 지원합니다.

이 노트북은 사기 탐지(fraud detection) 모델 훈련 과정을 통해 SageMaker FeatureStore에서 제공하는 API에 대한 예제를 제공합니다. 노트북은 데이터 세트의 테이블을 FeatureStore로 수집하고, 훈련 데이터 세트를 생성하도록 쿼리하고, 추론 중에 빠르게 액세스하는 방법을 보여줍니다.

### Terminology

**FeatureGroup**은 SageMaker FeatureStore에 저장된 모든 데이터에 대한 메타 데이터를 포함하는 기본 리소스입니다. FeatureGroup에는 FeatureDefinitions 목록이 포함됩니다. **FeatureDefinition**은 이름과 정수, 문자열 또는 십진수와 같은 데이터 유형 중 하나로 구성됩니다. FeatureGroup에는 데이터가 저장되는 위치를 제어하는 **OnlineStoreConfig** 및 **OfflineStoreConfig**도 포함됩니다. 온라인 스토어를 활성화하면 GetRecord API를 통해 레코드의 최신 값에 빠르게 액세스할 수 있습니다. 필수 구성 인 오프라인 저장소를 사용하면 S3 버켓에 과거 데이터를 저장할 수 있습니다.

FeatureGroup이 생성되면 데이터를 Record로 추가할 수 있습니다. **Records**는 테이블의 행으로 생각할 수 있습니다. 각 레코드에는 FeatureGroup의 다른 모든 FeatureDefinitions에 대한 값과 함께 고유한 **RecordIdentifier**가 있습니다.

## Setup SageMaker FeatureStore

SageMaker Python SDK 및 boto 클라이언트를 설정하여 시작하겠습니다. SageMaker Python SDK는 2020년 10월부터 V2로 업데이트되었으며, V1 대비 변경점이 많기 때문에 V1 용법에 익숙하시다면 개발자 문서를 참조하시기 바랍니다.

In [1]:
# to get the latest sagemaker python sdk
!pip install -qU sagemaker

In [2]:
import boto3
import sagemaker
from sagemaker.session import Session

print(sagemaker.__version__)

region = boto3.Session().region_name

boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)

# Feature Store는 sagemaker-featurestore-runtime 서비스명으로 세션을 새로 생성합니다.
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

2.23.6


#### OfflineStore에 대한 S3 버켓 설정

SageMaker FeatureStore는 FeatureGroup의 오프라인 스토어 있는 데이터를 사용자 소유의 S3 버켓에 기록합니다. S3 버켓에 기록할 수 있도록 SageMaker FeatureStore에 대한 액세스 권한이 있는 IAM 역할(role)을 부여해야 합니다. FeatureGroup에서 동일한 버켓을 재사용할 수 있습니다. 버켓의 데이터는 FeatureGroup으로 분할됩니다.

In [3]:
# You can modify the following to use a bucket of your choosing
default_s3_bucket_name = feature_store_session.default_bucket()
prefix = 'sagemaker-featurestore-demo'

print(default_s3_bucket_name)

sagemaker-us-east-1-143656149352


IAM 역할을 설정합니다. 이 역할은 SageMaker FeatureStore에 S3 버켓에 대한 액세스 권한을 부여합니다.

<div class="alert alert-block alert-warning">
    <b>Note:</b> 이 예제에서는 <b>AmazonSageMakerFullAccess</b> 및 <b>AmazonSageMakerFeatureStoreAccess</b> 관리형 정책이 모두 있다고 가정하고 기본 SageMaker 역할을 사용합니다. 그렇지 않은 경우 계속하기 전에 역할에 첨부했는지 확인하십시오.
</div>

In [4]:
from sagemaker import get_execution_role

# You can modify the following to use a role of your choosing. See the documentation for how to create this.
role = get_execution_role()
print (role)

arn:aws:iam::143656149352:role/service-role/AmazonSageMaker-ExecutionRole-20200526T075882


## Inspect Dataset 

제공된 데이터셋은 ID 테이블과 트랜잭션 테이블로 구성된 합성 데이터셋이며, 두 테이블을 `TransactionId` 열로 조인(join)할 수 있습니다. 트랜잭션 테이블에는 금액, 신용 카드 또는 직불 카드와 같은 특정 트랜잭션에 대한 정보가 포함되고, ID 테이블에는 장치 유형 및 브라우저와 같은 사용자에 대한 정보가 포함됩니다. 트랜잭션은 트랜잭션 테이블에 있어야 하지만, ID 테이블에서 항상 사용 가능한 것은 아닙니다.

모델의 목적(objective)은 거래 기록이 주어졌을 때, 거래가 사기인지 아닌지를 예측하는 것입니다. (트랜잭션 테이블의 `isFraud` 열)

아래 코드 셀에서는 `sagemaker-sample-files` 공용 S3 버켓에 저장된 데이터셋을 Pandas의 데이터프레임으로 로드 후, 아래와 같은 전처리를 수행합니다.
- 소수점 이하 5 자리까지 반올림
- 결측값을 0으로 보완(imputation)
- card4(카드 발급 은행), card6(카드 종류) 열을 One-Hot Encoding으로 변환 후 결합
- 열 이름의 공백을 '_'로 변환

In [5]:
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt
import io

s3_client = boto3.client('s3', region_name=region)

fraud_detection_bucket_name = 'sagemaker-sample-files'
identity_file_key = 'datasets/tabular/fraud_detection/synthethic_fraud_detection_SA/sampled_identity.csv'
transaction_file_key = 'datasets/tabular/fraud_detection/synthethic_fraud_detection_SA/sampled_transactions.csv'

identity_data_object = s3_client.get_object(Bucket=fraud_detection_bucket_name, Key=identity_file_key)
transaction_data_object = s3_client.get_object(Bucket=fraud_detection_bucket_name, Key=transaction_file_key)

identity_data = pd.read_csv(io.BytesIO(identity_data_object['Body'].read()))
transaction_data = pd.read_csv(io.BytesIO(transaction_data_object['Body'].read()))

identity_data = identity_data.round(5)
transaction_data = transaction_data.round(5)

identity_data = identity_data.fillna(0)
transaction_data = transaction_data.fillna(0)

# Feature transformations for this dataset are applied before ingestion into FeatureStore.
# One hot encode card4, card6
encoded_card_bank = pd.get_dummies(transaction_data['card4'], prefix = 'card_bank')
encoded_card_type = pd.get_dummies(transaction_data['card6'], prefix = 'card_type')

transformed_transaction_data = pd.concat([transaction_data, encoded_card_type, encoded_card_bank], axis=1)
# blank space is not allowed in feature name
transformed_transaction_data = transformed_transaction_data.rename(columns={"card_bank_american express": "card_bank_american_express"})

In [6]:
identity_data.head()

Unnamed: 0,TransactionID,id_01,id_02,id_03,id_04,id_05,id_06,id_07,id_08,id_09,...,id_11,id_12,id_13,id_14,id_15,id_16,id_17,id_18,id_19,id_20
0,2990130,-5,38780.0,0.0,0.0,0.0,-70,0,1,100.0,...,32,80,253,241,260,125,T,F,F,T
1,2990266,-10,69246.0,0.0,0.0,0.0,-67,0,2,100.0,...,47,47,122,33,38,60,T,F,T,F
2,2992553,-45,348819.0,0.0,0.0,0.0,-73,0,0,100.0,...,21,143,268,111,2,135,F,F,T,F
3,2994568,-15,337170.0,0.0,0.0,0.0,-10,1,2,100.0,...,55,127,253,202,135,49,F,F,T,T
4,2994749,-5,680670.0,0.0,0.0,8.0,-1,2,2,100.0,...,52,43,257,7,19,254,F,F,T,T


In [7]:
transformed_transaction_data.head()

Unnamed: 0,TransactionID,isFraud,TransactionDT,TransactionAmt,card1,card2,card3,card4,card5,card6,...,N8,N9,card_type_0,card_type_credit,card_type_debit,card_bank_0,card_bank_american_express,card_bank_discover,card_bank_mastercard,card_bank_visa
0,3343087,0,8810855,29.0,12469,360.0,150.0,mastercard,126.0,debit,...,F,T,0,0,1,0,0,0,1,0
1,3307318,0,7955295,107.95,16188,178.0,150.0,mastercard,224.0,debit,...,F,T,0,0,1,0,0,0,1,0
2,3555327,0,15084339,159.95,1825,555.0,150.0,visa,226.0,debit,...,T,F,0,0,1,0,0,0,0,1
3,3310736,0,8017157,159.95,10057,225.0,150.0,mastercard,224.0,debit,...,F,F,0,0,1,0,0,0,1,0
4,3034711,0,1127470,117.0,11444,555.0,150.0,visa,226.0,debit,...,F,F,0,0,1,0,0,0,0,1


## Ingest Data into FeatureStore


이 단계에서는 트랜잭션 및 ID 테이블을 나타내는 FeatureGroup을 생성 후, 데이터를 삽입합니다.

#### Define FeatureGroups

In [8]:
from time import gmtime, strftime, sleep

identity_feature_group_name = 'identity-feature-group-' + strftime('%d-%H-%M-%S', gmtime())
transaction_feature_group_name = 'transaction-feature-group-' + strftime('%d-%H-%M-%S', gmtime())

FeatureGroup을 정의합니다. 정의 후, `create()` 함수를 호출해야 FeatureGroup이 실제로 생성됩니다.

In [9]:
from sagemaker.feature_store.feature_group import FeatureGroup

identity_feature_group = FeatureGroup(name=identity_feature_group_name, sagemaker_session=feature_store_session)
transaction_feature_group = FeatureGroup(name=transaction_feature_group_name, sagemaker_session=feature_store_session)

데이터를 Feature Store에서 사용하가 위해 변환 작업을 수행합니다.

`load_feature_definitions()` 함수는 피쳐 정의(feature definition)를 로드하며, 각 열의 데이터 유형을 자동으로 감지합니다. 

In [10]:
import time

current_time_sec = int(round(time.time()))

def cast_object_to_string(data_frame):
    for label in data_frame.columns:
        if data_frame.dtypes[label] == 'object':
            data_frame[label] = data_frame[label].astype("str").astype("string")

# cast object dtype to string. The SageMaker FeatureStore Python SDK will then map the string dtype to String feature type.
cast_object_to_string(identity_data)
cast_object_to_string(transformed_transaction_data)

# record identifier and event time feature names
record_identifier_feature_name = "TransactionID"
event_time_feature_name = "EventTime"

# append EventTime feature
identity_data[event_time_feature_name] = pd.Series([current_time_sec]*len(identity_data), dtype="float64")
transformed_transaction_data[event_time_feature_name] = pd.Series([current_time_sec]*len(transaction_data), dtype="float64")

# load feature definitions to the feature group. SageMaker FeatureStore Python SDK will auto-detect the data schema based on input data.
identity_feature_group.load_feature_definitions(data_frame=identity_data); # output is suppressed
transaction_feature_group.load_feature_definitions(data_frame=transformed_transaction_data); # output is suppressed

#### Create FeatureGroups in SageMaker FeatureStore

앞 코드 셀에서 FeatureGroup이 정의되었으므로 이제 `create()` 함수로 FeatureGroup을 생성할 수 있습니다. 

오프라인 스토어가 디폴트로 활성화되어 있으며, `s3_uri` 인자값에서 S3 버켓 경로를 지정하시면 됩니다.
또한, 온라인 스토어는 디폴트로 생성되지 않으므로, 만약 온라인 스토어를 활성화하려면 `enable_online_store` 인자값을 True로 설정하셔야 합니다.

In [11]:
def wait_for_feature_group_creation_complete(feature_group):
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group Creation")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    if status != "Created":
        raise RuntimeError(f"Failed to create feature group {feature_group.name}")
    print(f"FeatureGroup {feature_group.name} successfully created.")

identity_feature_group.create(
    s3_uri=f"s3://{default_s3_bucket_name}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True
)

transaction_feature_group.create(
    s3_uri=f"s3://{default_s3_bucket_name}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True
)

wait_for_feature_group_creation_complete(feature_group=identity_feature_group)
wait_for_feature_group_creation_complete(feature_group=transaction_feature_group)

Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup identity-feature-group-21-11-47-34 successfully created.
Waiting for Feature Group Creation
FeatureGroup transaction-feature-group-21-11-47-34 successfully created.


DescribeFeatureGroup 및 ListFeatureGroups API를 사용하여 FeatureGroup이 생성되었는지 확인합니다.

작성된 Feature Store는 describe에서 확인하거나 list_feature_groups에서 목록 검색이 가능합니다.

In [12]:
identity_feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:143656149352:feature-group/identity-feature-group-21-11-47-34',
 'FeatureGroupName': 'identity-feature-group-21-11-47-34',
 'RecordIdentifierFeatureName': 'TransactionID',
 'EventTimeFeatureName': 'EventTime',
 'FeatureDefinitions': [{'FeatureName': 'TransactionID',
   'FeatureType': 'Integral'},
  {'FeatureName': 'id_01', 'FeatureType': 'Integral'},
  {'FeatureName': 'id_02', 'FeatureType': 'Fractional'},
  {'FeatureName': 'id_03', 'FeatureType': 'Fractional'},
  {'FeatureName': 'id_04', 'FeatureType': 'Fractional'},
  {'FeatureName': 'id_05', 'FeatureType': 'Fractional'},
  {'FeatureName': 'id_06', 'FeatureType': 'Integral'},
  {'FeatureName': 'id_07', 'FeatureType': 'Integral'},
  {'FeatureName': 'id_08', 'FeatureType': 'Integral'},
  {'FeatureName': 'id_09', 'FeatureType': 'Fractional'},
  {'FeatureName': 'id_10', 'FeatureType': 'Integral'},
  {'FeatureName': 'id_11', 'FeatureType': 'Integral'},
  {'FeatureName': 'id_12', 'FeatureTyp

In [13]:
transaction_feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:143656149352:feature-group/transaction-feature-group-21-11-47-34',
 'FeatureGroupName': 'transaction-feature-group-21-11-47-34',
 'RecordIdentifierFeatureName': 'TransactionID',
 'EventTimeFeatureName': 'EventTime',
 'FeatureDefinitions': [{'FeatureName': 'TransactionID',
   'FeatureType': 'Integral'},
  {'FeatureName': 'isFraud', 'FeatureType': 'Integral'},
  {'FeatureName': 'TransactionDT', 'FeatureType': 'Integral'},
  {'FeatureName': 'TransactionAmt', 'FeatureType': 'Fractional'},
  {'FeatureName': 'card1', 'FeatureType': 'Integral'},
  {'FeatureName': 'card2', 'FeatureType': 'Fractional'},
  {'FeatureName': 'card3', 'FeatureType': 'Fractional'},
  {'FeatureName': 'card4', 'FeatureType': 'String'},
  {'FeatureName': 'card5', 'FeatureType': 'Fractional'},
  {'FeatureName': 'card6', 'FeatureType': 'String'},
  {'FeatureName': 'B1', 'FeatureType': 'Integral'},
  {'FeatureName': 'B2', 'FeatureType': 'Integral'},
  {'FeatureName': 'B3', '

In [14]:
sagemaker_client.list_feature_groups() # use boto client to list FeatureGroups

{'FeatureGroupSummaries': [{'FeatureGroupName': 'transaction-feature-group-21-11-47-34',
   'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:143656149352:feature-group/transaction-feature-group-21-11-47-34',
   'CreationTime': datetime.datetime(2021, 1, 21, 11, 47, 42, 505000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created'},
  {'FeatureGroupName': 'identity-feature-group-21-11-47-34',
   'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:143656149352:feature-group/identity-feature-group-21-11-47-34',
   'CreationTime': datetime.datetime(2021, 1, 21, 11, 47, 41, 58000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created'}],
 'ResponseMetadata': {'RequestId': 'ec411cb2-0511-4329-bf33-3083c17e7134',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'ec411cb2-0511-4329-bf33-3083c17e7134',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '495',
   'date': 'Thu, 21 Jan 2021 11:47:58 GMT'},
  'RetryAttempts': 0}}


**[Tip]** 여기까지 실행하셨다면, 아래 코드 셀들을 실행하기 전에 SageMaker Studio의 Feature Group을 확인해 보세요.

![feature_store](feature_store.png)


#### PutRecords into FeatureGroup

FeatureGroups가 생성된 후 PutRecord API를 사용하여 FeatureGroups에 데이터를 `ingest` 함수로 넣을 수 있습니다. 이 API는 높은 TPS(초당 트랜잭션)를 처리할 수 있으며 다른 스트림에서 호출되도록 설계되었습니다. 이러한 모든 Put 요청의 데이터는 버퍼링되어 청크 단위로 S3에 기록됩니다. 수집 후 몇 분 이내에 파일이 오프라인 저장소에 기록됩니다. 이 예에서는 수집 프로세스를 가속화하기 위해 작업을 동시에 수행할 여러 작업자(worker)들을 지정합니다. 2개의 FeatureGroup에 각각 데이터를 수집하는 데 약 1분 정도 소요됩니다.

In [15]:
identity_feature_group.ingest(
    data_frame=identity_data, max_workers=3, wait=True
)

IngestionManagerPandas(feature_group_name='identity-feature-group-21-11-47-34', sagemaker_session=<sagemaker.session.Session object at 0x7fe8c2397050>, data_frame=     TransactionID  id_01     id_02  id_03  id_04  id_05  id_06  id_07  id_08  \
0          2990130     -5   38780.0    0.0    0.0    0.0    -70      0      1   
1          2990266    -10   69246.0    0.0    0.0    0.0    -67      0      2   
2          2992553    -45  348819.0    0.0    0.0    0.0    -73      0      0   
3          2994568    -15  337170.0    0.0    0.0    0.0    -10      1      2   
4          2994749     -5  680670.0    0.0    0.0    8.0     -1      2      2   
..             ...    ...       ...    ...    ...    ...    ...    ...    ...   
471        3572028     -5   92780.0    0.0    0.0    0.0    -19      2      1   
472        3575285     -5   34477.0    0.0    0.0    0.0    -25      1      0   
473        3575848     -5   45284.0    0.0    0.0    3.0    -71      1      0   
474        3576043     -5  

In [16]:
transaction_feature_group.ingest(
    data_frame=transformed_transaction_data, max_workers=5, wait=True
)

IngestionManagerPandas(feature_group_name='transaction-feature-group-21-11-47-34', sagemaker_session=<sagemaker.session.Session object at 0x7fe8c2397050>, data_frame=      TransactionID  isFraud  TransactionDT  TransactionAmt  card1  card2  \
0           3343087        0        8810855          29.000  12469  360.0   
1           3307318        0        7955295         107.950  16188  178.0   
2           3555327        0       15084339         159.950   1825  555.0   
3           3310736        0        8017157         159.950  10057  225.0   
4           3034711        0        1127470         117.000  11444  555.0   
...             ...      ...            ...             ...    ...    ...   
1995        3252738        1        6443158         200.000   6019  583.0   
1996        3548960        1       14873644           6.517  10175  176.0   
1997        3319928        1        8196787          67.067  14276  177.0   
1998        3256349        1        6539367          59.000  146

#### Get records from a FeatureGroup

데이터가 수집되었는지 확인하기 위해 온라인 상점(online store)에서 레코드를 빠르게 검색할 수 있습니다.

In [17]:
record_identifier_value = str(2990130)

featurestore_runtime.get_record(FeatureGroupName=transaction_feature_group_name, RecordIdentifierValueAsString=record_identifier_value)

{'ResponseMetadata': {'RequestId': '6ed23375-9769-46ab-9652-a2f0baef1623',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '6ed23375-9769-46ab-9652-a2f0baef1623',
   'content-type': 'application/json',
   'content-length': '2636',
   'date': 'Thu, 21 Jan 2021 11:48:08 GMT'},
  'RetryAttempts': 0},
 'Record': [{'FeatureName': 'TransactionID', 'ValueAsString': '2990130'},
  {'FeatureName': 'isFraud', 'ValueAsString': '0'},
  {'FeatureName': 'TransactionDT', 'ValueAsString': '152647'},
  {'FeatureName': 'TransactionAmt', 'ValueAsString': '75.0'},
  {'FeatureName': 'card1', 'ValueAsString': '4577'},
  {'FeatureName': 'card2', 'ValueAsString': '583.0'},
  {'FeatureName': 'card3', 'ValueAsString': '150.0'},
  {'FeatureName': 'card4', 'ValueAsString': 'mastercard'},
  {'FeatureName': 'card5', 'ValueAsString': '219.0'},
  {'FeatureName': 'card6', 'ValueAsString': 'credit'},
  {'FeatureName': 'B1', 'ValueAsString': '69'},
  {'FeatureName': 'B2', 'ValueAsString': '80'},
  {'Featur

#### Hive DDL

SageMaker Python SDK의 FeatureStore 클래스는 Hive DDL 명령을 생성하는 기능도 제공합니다. 테이블의 스키마는 피쳐 정의를 기반으로 생성됩니다. 열은 피쳐 이름에 따라 이름이 지정되고 데이터 유형은 피쳐 유형에 따라 추론됩니다.

In [18]:
print(identity_feature_group.as_hive_ddl())

CREATE EXTERNAL TABLE IF NOT EXISTS sagemaker_featurestore.identity-feature-group-21-11-47-34 (
  TransactionID INT
  id_01 INT
  id_02 FLOAT
  id_03 FLOAT
  id_04 FLOAT
  id_05 FLOAT
  id_06 INT
  id_07 INT
  id_08 INT
  id_09 FLOAT
  id_10 INT
  id_11 INT
  id_12 INT
  id_13 INT
  id_14 INT
  id_15 INT
  id_16 INT
  id_17 STRING
  id_18 STRING
  id_19 STRING
  id_20 STRING
  EventTime FLOAT
  write_time TIMESTAMP
  event_time TIMESTAMP
  is_deleted BOOLEAN
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  STORED AS
  INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
  OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
LOCATION 's3://sagemaker-us-east-1-143656149352/sagemaker-featurestore-demo/143656149352/sagemaker/us-east-1/offline-store/identity-feature-group-21-11-47-34'


In [19]:
print(transaction_feature_group.as_hive_ddl())

CREATE EXTERNAL TABLE IF NOT EXISTS sagemaker_featurestore.transaction-feature-group-21-11-47-34 (
  TransactionID INT
  isFraud INT
  TransactionDT INT
  TransactionAmt FLOAT
  card1 INT
  card2 FLOAT
  card3 FLOAT
  card4 STRING
  card5 FLOAT
  card6 STRING
  B1 INT
  B2 INT
  B3 INT
  B4 INT
  B5 INT
  B6 INT
  B7 INT
  B8 INT
  B9 INT
  B10 INT
  B11 INT
  B12 INT
  F1 INT
  F2 INT
  F3 INT
  F4 INT
  F5 INT
  F6 INT
  F7 INT
  F8 INT
  F9 INT
  F10 INT
  F11 INT
  F12 INT
  F13 INT
  F14 INT
  F15 INT
  F16 INT
  F17 INT
  N1 STRING
  N2 STRING
  N3 STRING
  N4 STRING
  N5 STRING
  N6 STRING
  N7 STRING
  N8 STRING
  N9 STRING
  card_type_0 INT
  card_type_credit INT
  card_type_debit INT
  card_bank_0 INT
  card_bank_american_express INT
  card_bank_discover INT
  card_bank_mastercard INT
  card_bank_visa INT
  EventTime FLOAT
  write_time TIMESTAMP
  event_time TIMESTAMP
  is_deleted BOOLEAN
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  STORE

#### Glue Data Catalog

SageMaker Feature Store는 오프라인 스토어의 경우 FeatureGroup이 생성될 때, Glue 데이터 카탈로그에도 자동으로 정보가 등록됩니다. (아래 스크린샷 참조)

![glue](glue.png)

이제 데이터셋 생성으로 넘어가기 전에 오프라인 스토어에 데이터가 표시될 때까지 기다리겠습니다. 약 5분 정도 소요됩니다.

In [20]:
%%time
account_id = boto3.client('sts').get_caller_identity()["Account"]

identity_feature_group_s3_prefix = prefix + '/' + account_id + '/sagemaker/' + region + '/offline-store/' + identity_feature_group_name + '/data'
transaction_feature_group_s3_prefix = prefix + '/' + account_id + '/sagemaker/' + region + '/offline-store/' + transaction_feature_group_name + '/data'

offline_store_contents = None
while (offline_store_contents is None):
    objects_in_bucket = s3_client.list_objects(Bucket=default_s3_bucket_name,Prefix=transaction_feature_group_s3_prefix)
    if ('Contents' in objects_in_bucket and len(objects_in_bucket['Contents']) > 1):
        offline_store_contents = objects_in_bucket['Contents']
    else:
        print('Waiting for data in offline store...\n')
        sleep(60)
    
print('Data available.')

Waiting for data in offline store...

Waiting for data in offline store...

Waiting for data in offline store...

Waiting for data in offline store...

Waiting for data in offline store...

Data available.
CPU times: user 94.3 ms, sys: 8.81 ms, total: 103 ms
Wall time: 5min


SageMaker FeatureStore는 오프라인 스토어에 수집된 각 레코드에 대한 메타 데이터를 추가합니다.

## Build Training Dataset

SageMaker FeatureStore는 피쳐 그룹에 대한 Glue Data Catalog를 자동으로 빌드합니다. (피쳐 그룹을 생성하는 동안 선택적으로 켜고 끌 수 있습니다). 이 예시에서는 ID와 트랜잭션 FeatureGroups 모두의 FeatureValue를 사용하여 하나의 훈련 데이터셋을 생성합니다. 이것은 자동 구축된(auto-built) 카탈로그를 활용하여 수행됩니다. 

데이터셋 생성 후에는 Athena SQL 쿼리로 데이터 검색이 가능합니다. 아래 코드 셀에서 2개의 FeatureGroup에서 S3의 오프라인 스토어에 저장된 데이터를 조인하는 쿼리를 실행합니다.

In [21]:
identity_query = identity_feature_group.athena_query()
transaction_query = transaction_feature_group.athena_query()

identity_table = identity_query.table_name
transaction_table = transaction_query.table_name

query_string = 'SELECT * FROM "'+transaction_table+'" LEFT JOIN "'+identity_table+'" ON "'+transaction_table+'".transactionid = "'+identity_table+'".transactionid'
print('Running ' + query_string)

# run Athena query. The output is loaded to a Pandas dataframe.
dataset = pd.DataFrame()
identity_query.run(query_string=query_string, output_location='s3://'+default_s3_bucket_name+'/'+prefix+'/query_results/')
identity_query.wait()
dataset = identity_query.as_dataframe()

dataset

Running SELECT * FROM "transaction-feature-group-21-11-47-34-1611229662" LEFT JOIN "identity-feature-group-21-11-47-34-1611229661" ON "transaction-feature-group-21-11-47-34-1611229662".transactionid = "identity-feature-group-21-11-47-34-1611229661".transactionid


Unnamed: 0,transactionid,isfraud,transactiondt,transactionamt,card1,card2,card3,card4,card5,card6,...,id_15,id_16,id_17,id_18,id_19,id_20,eventtime.1,write_time.1,api_invocation_time.1,is_deleted.1
0,3555327,0,15084339,159.950,1825,555.0,150.0,visa,226.0,debit,...,,,,,,,,,,
1,3310736,0,8017157,159.950,10057,225.0,150.0,mastercard,224.0,debit,...,,,,,,,,,,
2,3006216,0,506631,55.000,14649,548.0,150.0,visa,226.0,debit,...,,,,,,,,,,
3,3247321,0,6234414,67.950,7664,490.0,150.0,visa,226.0,debit,...,,,,,,,,,,
4,3403159,0,10516835,59.000,10882,543.0,150.0,mastercard,224.0,debit,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1995,3025855,1,949222,117.000,16034,481.0,150.0,mastercard,102.0,credit,...,,,,,,,,,,
1996,3521091,1,14068731,157.389,15885,545.0,185.0,visa,138.0,debit,...,299.0,295.0,F,F,T,F,1.611230e+09,2021-01-21 11:53:14.748,2021-01-21 11:48:00.000,False
1997,3504162,0,13549362,49.000,6530,206.0,150.0,mastercard,126.0,debit,...,,,,,,,,,,
1998,3577226,0,15804474,136.000,12695,490.0,150.0,visa,226.0,debit,...,,,,,,,,,,


In [22]:
# Prepare query results for training.
query_execution = identity_query.get_query_execution()
query_result = 's3://'+default_s3_bucket_name+'/'+prefix+'/query_results/'+query_execution['QueryExecution']['QueryExecutionId']+'.csv'
print(query_result)

# Select useful columns for training with target column as the first.
dataset = dataset[["isfraud", "transactiondt", "transactionamt", "card1", "card2", "card3", "card5", "card_type_credit", "card_type_debit", "card_bank_american_express", "card_bank_discover", "card_bank_mastercard", "card_bank_visa", "id_01", "id_02", "id_03", "id_04", "id_05"]]

# Write to csv in S3 without headers and index column.
dataset.to_csv('dataset.csv', header=False, index=False)
s3_client.upload_file('dataset.csv', default_s3_bucket_name, prefix+'/training_input/dataset.csv')
dataset_uri_prefix = 's3://'+default_s3_bucket_name+'/'+prefix+'/training_input/';

dataset

s3://sagemaker-us-east-1-143656149352/sagemaker-featurestore-demo/query_results/32141089-8a0a-461b-ab69-9a7c347e5c56.csv


Unnamed: 0,isfraud,transactiondt,transactionamt,card1,card2,card3,card5,card_type_credit,card_type_debit,card_bank_american_express,card_bank_discover,card_bank_mastercard,card_bank_visa,id_01,id_02,id_03,id_04,id_05
0,0,15084339,159.950,1825,555.0,150.0,226.0,0,1,0,0,0,1,,,,,
1,0,8017157,159.950,10057,225.0,150.0,224.0,0,1,0,0,1,0,,,,,
2,0,506631,55.000,14649,548.0,150.0,226.0,0,1,0,0,0,1,,,,,
3,0,6234414,67.950,7664,490.0,150.0,226.0,0,1,0,0,0,1,,,,,
4,0,10516835,59.000,10882,543.0,150.0,224.0,0,1,0,0,1,0,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1995,1,949222,117.000,16034,481.0,150.0,102.0,1,0,0,0,1,0,,,,,
1996,1,14068731,157.389,15885,545.0,185.0,138.0,0,1,0,0,0,1,-5.0,172593.0,0.0,0.0,0.0
1997,0,13549362,49.000,6530,206.0,150.0,126.0,0,1,0,0,1,0,,,,,
1998,0,15804474,136.000,12695,490.0,150.0,226.0,0,1,0,0,0,1,,,,,


## Train and Deploy the Model

이제 모델에 맞게 훈련 작업을 시작할 때입니다. 먼저, SageMaker XGBoost 컨테이너를 호출하고 generic SageMaker Estimator를 구성합니다.

In [23]:
training_image = sagemaker.image_uris.retrieve("xgboost", region, "1.0-1")
# containers = {'us-west-1': '746614075791.dkr.ecr.us-west-1.amazonaws.com/sagemaker-xgboost:1.0-1-cpu-py3',
#               'us-west-2': '246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-xgboost:1.0-1-cpu-py3',
#               'us-east-1': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.0-1-cpu-py3',
#               'eu-east-2': '257758044811.dkr.ecr.us-east-2.amazonaws.com/sagemaker-xgboost:1.0-1-cpu-py3',
#               'ap-northeast-1': '354813040037.dkr.ecr.ap-northeast-1.amazonaws.com/sagemaker-xgboost:1.0-1-cpu-py3',
#               'ap-northeast-2': '366743142698.dkr.ecr.ap-northeast-2.amazonaws.com/sagemaker-xgboost:1.0-1-cpu-py3'            
#              } 
# training_image = containers[region]
print(training_image)

683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.0-1-cpu-py3


**Note**: 이전 코드 셀이 SageMaker XGBoost 훈련 이미지를 호출하지 못하는 경우, 리전의 제한된 지원 때문일 수 있습니다. 사용 가능한 리전을 찾으려면 Amazon SageMaker 개발자 안내서의 [Docker Registry Paths for SageMaker Built-in Algorithms](https://docs.aws.amazon.com/en_us/sagemaker/latest/dg/sagemaker-algo-docker-registry-paths.html)를 참조하십시오. 설명서에 나열되지 않은 리전에 있는 경우, 아래 코드를 실행하여 사전 빌드된 SageMaker XGBoost 기본 이미지를 수동으로 가져와 [Amazon Elastic Container Registry (ECR)](https://aws.amazon.com/ecr/)로 푸시해야 합니다. SageMaker Studio 앱은 도커 컨테이너를 기반으로 실행되고 도커를 지원하지 않기 때문에, **SageMaker 노트북 인스턴스에서 이 수동 도커 등록을 사용해야 합니다.**

**Step 1** : SageMaker XGBoost 컨테이너를 ECR 계정으로 가져오고 빌드하고 푸시합니다. 다음 bash 스크립트는 `us-east-2` 리전에서 SageMaker XGBoost Docker 이미지를 가져와 ECR로 푸시합니다.

```bash
%%bash
public_ecr=257758044811.dkr.ecr.us-east-2.amazonaws.com
image=sagemaker-xgboost
tag=1.0-1-cpu-py3

# Add the public ECR for XGBoost image to authenticated registries
aws ecr get-login-password --region us-east-2 | \
    docker login --username AWS --password-stdin $public_ecr

# Pull the XGBoost image
docker pull $public_ecr/$image:$tag

# Push the image to your ECR
my_region=$(aws configure get region)
my_account=$(aws sts get-caller-identity --query Account | tr -d '"')
my_ecr=$my_account.dkr.ecr.$my_region.amazonaws.com 

# Authenticate your ECR
aws ecr get-login-password --region $my_region | \
    docker login --username AWS --password-stdin $my_ecr

# Create a repository in your ECR to host the XGBoost image
repository_name=sagemaker-xgboost

if aws ecr create-repository --repository-name $repository_name ; then
    echo "Repository $repository_name created!"
else
    echo "Repository $repository_name already exists!"
fi

# Push the image to your ECR
docker tag $public_ecr/$image:$tag $my_ecr/$image:$tag
docker push $my_ecr/$image:$tag
```

**Step 2**: ECR 이미지 URI를 사용하려면 다음 코드를 실행하고 `training_image` string object를 설정합니다.
```python
import boto3
region = boto3.Session().region_name
account_id = boto3.client('sts').get_caller_identity()["Account"]
ecr = '{}.dkr.ecr.{}.amazonaws.com'.format(account_id, region)

training_image=ecr + '/' + 'sagemaker-xgboost:1.0-1-cpu-py3'
```

`Estimator` 개체를 생성합니다. 이 estimator는 훈련 작업을 시작합니다.

#### SageMaker XGBoost 컨테이너를 사용하여 SageMaker generic estimator 생성

In [24]:
training_output_path='s3://' + default_s3_bucket_name+'/'+prefix + '/training_output'

from sagemaker.estimator import Estimator
training_model = Estimator(training_image,
                           role, 
                           instance_count=1, 
                           instance_type='ml.m5.2xlarge',
                           volume_size = 5,
                           max_run = 3600,
                           input_mode= 'File',
                           output_path=training_output_path,
                           sagemaker_session=feature_store_session)

#### 하이퍼파라메터 설정

In [25]:
training_model.set_hyperparameters(objective = "binary:logistic",
                                   num_round = 50)

#### 훈련 데이터셋 지정

[Build Training Dataset](#Build-Training-Dataset) 섹션에서 생성한 훈련 데이터셋을 지정합니다.

In [26]:
import sagemaker.inputs

train_data = sagemaker.inputs.TrainingInput(dataset_uri_prefix, distribution='FullyReplicated', 
                                            content_type='text/csv', s3_data_type='S3Prefix')
data_channels = {'train': train_data}

#### 훈련 시작

In [27]:
%%time
training_model.fit(inputs=data_channels, logs=True)

2021-01-21 12:01:37 Starting - Starting the training job...
2021-01-21 12:02:02 Starting - Launching requested ML instancesProfilerReport-1611230497: InProgress
.........
2021-01-21 12:03:23 Starting - Preparing the instances for training...
2021-01-21 12:04:03 Downloading - Downloading input data...
2021-01-21 12:04:37 Training - Training image download completed. Training in progress..[34mINFO:sagemaker-containers:Imported framework sagemaker_xgboost_container.training[0m
[34mINFO:sagemaker-containers:Failed to parse hyperparameter objective value binary:logistic to Json.[0m
[34mReturning the value itself[0m
[34mINFO:sagemaker-containers:No GPUs detected (normal if no gpus installed)[0m
[34mINFO:sagemaker_xgboost_container.training:Running XGBoost Sagemaker in algorithm mode[0m
[34mINFO:root:Determined delimiter of CSV input is ','[0m
[34mINFO:root:Determined delimiter of CSV input is ','[0m
[34m[12:04:38] 2000x17 matrix with 32476 entries loaded from /opt/ml/input/dat

## Set up Hosting for the Model

훈련이 완료되면 훈련된 모델을 Amazon SageMaker 실시간 호스팅 엔드포인트(real-time hosted endpoint)로 배포할 수 있습니다. 이를 통해 모델에서 예측(또는 추론)을 수행할 수 있습니다. 훈련에 사용한 것과 동일한 인스턴스(또는 인스턴스 유형)에서 호스팅할 필요가 없습니다. 아래 코드를 통해 간단하게 엔드포인트를 배포 가능하며, 약 8-10분이 소요됩니다.

In [28]:
%%time
predictor = training_model.deploy(initial_instance_count = 1, instance_type = 'ml.m5.xlarge')

-------------!CPU times: user 214 ms, sys: 9.03 ms, total: 223 ms
Wall time: 6min 32s


## SageMaker FeatureStore During Inference

SageMaker FeatureStore는 low-latency GetRecord 기능을 지원하기에, 추론 요청을 위한 데이터를 보완하는 데 유용할 수 있습니다. 이 데모에서는 TransactionId가 제공되고 추론 요청을 작성하기 위해 트랜잭션 데이터에 대한 온라인 FeatureGroup을 쿼리합니다.

In [29]:
# Incoming inference request.
transaction_id = str(3450774)

# Helper to parse the feature value from the record.
def get_feature_value(record, feature_name):
    return str(list(filter(lambda r: r['FeatureName'] == feature_name, record))[0]['ValueAsString'])

transaction_response = featurestore_runtime.get_record(FeatureGroupName=transaction_feature_group_name, RecordIdentifierValueAsString=transaction_id)
transaction_record = transaction_response['Record']

transaction_test_data = [
    get_feature_value(transaction_record, 'TransactionDT'),
    get_feature_value(transaction_record, 'TransactionAmt'),
    get_feature_value(transaction_record, 'card1'),
    get_feature_value(transaction_record, 'card2'),
    get_feature_value(transaction_record, 'card3'),
    get_feature_value(transaction_record, 'card5'),
    get_feature_value(transaction_record, 'card_type_credit'),
    get_feature_value(transaction_record, 'card_type_debit'),
    get_feature_value(transaction_record, 'card_bank_american_express'),
    get_feature_value(transaction_record, 'card_bank_discover'),
    get_feature_value(transaction_record, 'card_bank_mastercard'),
    get_feature_value(transaction_record, 'card_bank_visa')
]

identity_response = featurestore_runtime.get_record(FeatureGroupName=identity_feature_group_name, RecordIdentifierValueAsString=transaction_id)
identity_record = identity_response['Record']
id_test_data = [
    get_feature_value(identity_record, 'id_01'),
    get_feature_value(identity_record, 'id_02'),
    get_feature_value(identity_record, 'id_03'),
    get_feature_value(identity_record, 'id_04'),
    get_feature_value(identity_record, 'id_05')
]

# Join all pieces for inference request.
inference_request = []
inference_request.extend(transaction_test_data[:])
inference_request.extend(id_test_data[:])

inference_request

['11923451',
 '50.0',
 '12501',
 '490.0',
 '150.0',
 '226.0',
 '0',
 '1',
 '0',
 '0',
 '0',
 '1',
 '-40',
 '20130.0',
 '0.0',
 '0.0',
 '16.0']

In [30]:
import json

results = predictor.predict(','.join(inference_request), initial_args = {"ContentType": "text/csv"})
prediction = json.loads(results)
print (prediction)

0.8026058077812195


## Cleanup Resources

In [31]:
predictor.delete_endpoint()

In [32]:
identity_feature_group.delete()
transaction_feature_group.delete()