# Part 1 : Data Preparation, Process, and Store Features

<a id='all-up-overview'></a>

## [Overview](./0-AutoClaimFraudDetection.ipynb)
* [Notebook 0: Overview, Architecture and Data Exploration](./0-AutoClaimFraudDetection.ipynb)
* **[Notebook 1: Data Preparation, Process, and Store Features](./1-data-prep-e2e.ipynb)**
  * **[Architecture](#arch)**
  * **[Getting started](#aud-getting-started)**
  * **[DataSets](#aud-datasets)**
  * **[SageMaker Feature Store](#aud-feature-store)**
  * **[Create train and test datasets](#aud-dataset)**
* [Notebook 2: Train, Check Bias, Tune, Record Lineage, and Register a Model](./2-lineage-train-assess-bias-tune-registry-e2e.ipynb)
* [Notebook 3: Mitigate Bias, Train New Model, Store in Registry](./3-mitigate-bias-train-model2-registry-e2e.ipynb)
* [Notebook 4: Deploy Model, Run Predictions](./4-deploy-run-inference-e2e.ipynb)
* [Notebook 5: Create and Run an End-to-End Pipeline to Deploy the Model](./5-pipeline-e2e.ipynb)

이 노트북의 목적은 ML 수명주기(lifecycle)의 데이터 준비 단계를 수행하는 것입니다. 주요 데이터 랭글링(data wrangling), 데이터 수집 및 다중 변환(multiple transformatio)이 SageMaker Studio Data Wrangler GUI를 통해 수행됩니다.

이 노트북에서는 raw 데이터에 대한 변환을 정의하는 `.flow` 파일을 가져옵니다. `.csv` 파일로 S3 버킷에 저장된 raw 데이터에 이러한 변환을 적용하는 SageMaker Processing job을 사용하여 적용합니다.

<a id='arch'> </a>
## Architecture for Data Prep, Process and Store Features
[overview](#all-up-overview)
___
![Data Prep and Store](./images/e2e-1-pipeline-v3b.png)

### Install required and/or update third-party libraries

In [None]:
import sys
import IPython
install_needed = False

if install_needed:
    print("installing deps and restarting kernel")
    !python -m pip install -Uq pip
    !python -m pip install -q awscli==1.20.25 awswrangler==2.10.0 imbalanced-learn==0.8.0 sagemaker==2.54.0 boto3==1.18.25
    IPython.Application.instance().kernel.do_shutdown(True)

### Loading stored variables

이전에 이 노트북을 실행한 경우, AWS에서 생성한 리소스를 재사용할 수 있습니다. 아래 셀을 실행하여 이전에 생성된 변수를 로드합니다. 기존 변수의 출력물이 표시되어야 합니다. 인쇄된 내용이 보이지 않으면 노트북을 처음 실행한 것일 수 있습니다.

In [None]:
#%store -z

In [None]:
%store -r
%store

**<font color='red'>Important</font>: StoreMagic 명령을 사용하여 변수를 검색하려면 이전 노트북을 실행해야 합니다.**

### Import libraries

In [None]:
import json
import time
import boto3
import string
import sagemaker
import pandas as pd
import awswrangler as wr

from sagemaker.feature_store.feature_group import FeatureGroup

<a id='aud-getting-started'></a>
## Getting started: Creating Resources

[overview](#all-up-overview)
___

이 노트북을 성공적으로 실행하려면, 몇 가지 AWS 리소스를 생성해야 합니다. 먼저 이 자습서의 모든 데이터를 저장하기 위해 S3 버킷이 생성됩니다. 생성된 후에는 IAM 콘솔을 사용하여 AWS Glue role을 생성한 다음, 이 노트북에 대한 FeatureStore 액세스를 허용하는 policy를 S3 버킷에 연결해야 합니다. 이미 이 노트북을 실행하고 중단한 부분을 선택하는 경우 아래 셀을 실행하면 추가 리소스를 생성하는 대신, 이미 생성한 리소스를 선택해야 합니다.

#### Add FeatureStore policy to Studio's execution role

![title](images/iam-policies.png)

1. 별도의 브라우저 탭에서 AWS 콘솔의 IAM 섹션으로 이동합니다.
2. Roles 섹션으로 이동하여 SageMaker Studio user에서 사용 중인 실행 role을 선택합니다.
    * 어떤 role을 사용하고 있는지 확실하지 않은 경우, 아래 셀을 실행하여 출력하세요.
3. 이 role에 <font color='green'> AmazonSageMakerFeatureStoreAccess </font> policy를 연결합니다. 연결되면 변경 사항이 즉시 적용됩니다.

In [None]:
print('SageMaker Role:', sagemaker.get_execution_role().split('/')[-1])

### Set region, boto3 and SageMaker SDK variables

In [None]:
#You can change this to a region of your choice
import sagemaker
region = sagemaker.Session().boto_region_name
print("Using AWS Region: {}".format(region))

In [None]:
boto3.setup_default_session(region_name=region)

boto_session = boto3.Session(region_name=region)

s3_client = boto3.client('s3', region_name=region)

sagemaker_boto_client = boto_session.client('sagemaker')

sagemaker_session = sagemaker.session.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_boto_client)

Note: SageMaker Studio 또는 SageMaker Classic 노트북에서 이 노트북을 실행하지 않는 경우, SageMakerFullAccess 및 SageMakerFeatureStoreFullAccess가 있는 AWS role로 sagemaker_execution_role_name을 인스턴스화해야 합니다.

In [None]:
sagemaker_execution_role_name = 'AmazonSageMaker-ExecutionRole-20210107T234882'
try:
    sagemaker_role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client('iam')
    sagemaker_role = iam.get_role(RoleName=sagemaker_execution_role_name)['Role']['Arn']
    print(f"\n instantiating sagemaker_role with supplied role name : {sagemaker_role}")

account_id = boto3.client('sts').get_caller_identity()["Account"]

### Create a directory in the SageMaker default bucket for this tutorial

In [None]:
if 'bucket' not in locals():
    bucket = sagemaker_session.default_bucket()
    prefix = 'fraud-detect-demo'
    %store bucket
    %store prefix
    print(f'Creating bucket: {bucket}...')

이미 존재하는 자체 S3 버킷을 사용하려면 아래 코드 셀의 주석 처리를 제거하고 다음 예제 코드를 활용하세요.

In [None]:
'''
try:
    s3_client.create_bucket(Bucket=bucket, ACL='private', CreateBucketConfiguration={'LocationConstraint': region})
    print('Create S3 bucket: SUCCESS')
    
except Exception as e:
    if e.response['Error']['Code'] == 'BucketAlreadyOwnedByYou':
        print(f'Using existing bucket: {bucket}/{prefix}')
    else:
        raise(e)
'''

In [None]:
#======> Tons of output_paths
traing_job_output_path = f's3://{bucket}/{prefix}/training_jobs'
bias_report_1_output_path = f's3://{bucket}/{prefix}/clarify-bias-1'
bias_report_2_output_path = f's3://{bucket}/{prefix}/clarify-bias-2'
explainability_output_path = f's3://{bucket}/{prefix}/clarify-explainability'

train_data_uri = f's3://{bucket}/{prefix}/data/train/train.csv'
test_data_uri = f's3://{bucket}/{prefix}/data/test/test.csv'

#=======> variables used for parameterizing the notebook run
train_instance_count = 1
train_instance_type = "ml.m4.xlarge"

claify_instance_count = 1
clairfy_instance_type = 'ml.c5.xlarge'

predictor_instance_count = 1
predictor_instance_type = "ml.c5.xlarge"

### Upload raw data to S3

SageMaker Data Wrangler를 사용하여 raw 데이터를 전처리하려면, 데이터가 S3에 있어야 합니다.

In [None]:
s3_client.upload_file(Filename='data/claims.csv', Bucket=bucket, Key=f'{prefix}/data/raw/claims.csv')
s3_client.upload_file(Filename='data/customers.csv', Bucket=bucket, Key=f'{prefix}/data/raw/customers.csv')

### Update attributes within the  `.flow` file 

DataWrangler는 `.flow` 파일을 생성합니다. 여기에는 랭글링 중에 사용된 S3 버킷에 대한 참조가 포함됩니다. 이것은 이 노트북에 기본으로 설정되어 있는 것과 다를 수 있습니다. 예를 들어 다른 사람이 랭글링을 수행한 경우, 해당 버킷에 액세스할 수 없으므로 실제로 로드할 수 있도록 자신의 S3 버킷을 가리킴으로써 `.flow` 파일을 Wrangler에 저장하거나 데이터에 액세스할 수 있습니다.

아래 셀을 실행한 후 `claim.flow` 및 `customers.flow` 파일을 열고 데이터를 S3로 내보내거나 제공된 `data/claims_preprocessed.csv` 및 `data/customers_preprocessed.csv` 파일을 사용하여 가이드를 계속할 수 있습니다. 

In [None]:
claims_flow_template_file = "claims_flow_template"

with open(claims_flow_template_file, 'r') as f:
    variables   = {'bucket': bucket, 'prefix': prefix}
    template    = string.Template(f.read())
    claims_flow = template.substitute(variables)
    claims_flow = json.loads(claims_flow)

with open('claims.flow', 'w') as f:
    json.dump(claims_flow, f)
    
customers_flow_template_file = "customers_flow_template"

with open(customers_flow_template_file, 'r') as f:
    variables      = {'bucket': bucket, 'prefix': prefix}
    template       = string.Template(f.read())
    customers_flow = template.substitute(variables)
    customers_flow = json.loads(customers_flow)
    
with open('customers.flow', 'w') as f:
    json.dump(customers_flow, f)

### Load preprocessed data from Data Wrangler job

`claim.flow` 및 `customers.flow` 에서 Data Wrangler job을 실행한 경우 여기에서 전처리된 데이터를 로드할 수 있습니다. Data Wrangler job을 실행하지 않은 경우에도 이 예제의 `/data` 디렉토리에서 미리 만들어진 데이터셋을 로드하여 시작할 수 있습니다.

<a id='aud-datasets'></a>
## DataSets and Feature Types
[overview](#all-up-overview)
___

In [None]:
claims_dtypes = {
    "policy_id": int,
    "incident_severity": int,
    "num_vehicles_involved": int,
    "num_injuries": int,
    "num_witnesses": int,
    "police_report_available": int,
    "injury_claim": float,
    "vehicle_claim": float,
    "total_claim_amount": float,
    "incident_month": int,
    "incident_day": int,
    "incident_dow": int,
    "incident_hour": int,
    "fraud": int,
    "driver_relationship_self": int,
    "driver_relationship_na": int,
    "driver_relationship_spouse": int,
    "driver_relationship_child": int,
    "driver_relationship_other": int,
    "incident_type_collision": int,
    "incident_type_breakin": int,
    "incident_type_theft": int,
    "collision_type_front": int,
    "collision_type_rear": int,
    "collision_type_side": int,
    "collision_type_na": int,
    "authorities_contacted_police": int,
    "authorities_contacted_none": int,
    "authorities_contacted_fire": int,
    "authorities_contacted_ambulance": int,
    "event_time": float,
}

customers_dtypes = {
    "policy_id": int,
    "customer_age": int,
    "customer_education": int,
    "months_as_customer": int,
    "policy_deductable": int,
    "policy_annual_premium": int,
    "policy_liability": int,
    "auto_year": int,
    "num_claims_past_year": int,
    "num_insurers_past_5_years": int,
    "customer_gender_male": int,
    "customer_gender_female": int,
    "policy_state_ca": int,
    "policy_state_wa": int,
    "policy_state_az": int,
    "policy_state_or": int,
    "policy_state_nv": int,
    "policy_state_id": int,
    "event_time": float,
}

In [None]:
#======> This is your DataFlow output path if you decide to redo the work in DataFlow on your own
flow_output_path = 'YOUR_PATH_HERE'

try:
    # this will try to load the exported dataframes from the claims and customers .flow files
    claims_s3_path = f'{flow_output_path}/claims_output'
    customers_s3_path = f'{flow_output_path}/customers_output'
    
    claims_preprocessed = wr.s3.read_csv(
        path=claims_s3_path, 
        dataset=True, 
        index_col=0, 
        dtype=claims_dtypes)
    
    customers_preprocessed = wr.s3.read_csv(
        path=customers_s3_path, 
        dataset=True, 
        index_col=0, 
        dtype=customers_dtypes)

except:
    # if the Data Wrangler job was not run, the claims and customers dataframes will be loaded from local copies
    timestamp = pd.to_datetime('now').timestamp()
    print('Unable to load Data Wrangler output. Loading pre-made dataframes...')
    
    claims_preprocessed = pd.read_csv(
        filepath_or_buffer='data/claims_preprocessed.csv', 
        dtype=claims_dtypes)
    
    # a timestamp column is required by the feature store, so one is added with a current timestamp
    claims_preprocessed['event_time'] = timestamp
    
    customers_preprocessed = pd.read_csv(
        filepath_or_buffer='data/customers_preprocessed.csv', 
        dtype=customers_dtypes)
    
    customers_preprocessed['event_time'] = timestamp
    
    print('Complete')

이제 올바른 데이터 유형과 함께 고객 및 청구 데이터를 포함하는 Pandas 데이터프레임이 있습니다. Dat Wrangler가 피쳐를 원-핫 인코딩 피쳐로 인코딩할 때, 결과 피쳐에 대한 데이터 유형을 float로 기본 설정합니다.

<font color ='red'> Note: </font> : Data Wrangler에서 생성 된 범주형 피쳐에 대한 데이터 유형을 명시적으로 변환하는 이유는, Clarify에서 범주형 변수로 처리할 수 있도록 정수 유형인지 확인하기 위한 것입니다.

<a id='aud-feature-store'></a>
## SageMaker Feature Store

[overview](#all-up-overview)
___

Amazon SageMaker Feature Store는 피쳐를 저장하고 액세스할 수 있는 전용 리포지토리이므로, 팀 간에 이름을 지정하고 구성하고 재사용하기가 훨씬 쉽습니다. SageMaker Feature Store는 추가 코드를 작성하거나 피쳐를 일관되게 유지하기 위해 수동 프로세스를 생성할 필요 없이 훈련 및 실시간 추론 중 피쳐에 대한 통합 저장소를 제공합니다. SageMaker Feature Store는 저장된 피쳐의 메타데이터 (예: 피쳐 이름 또는 버전 번호)를 추적하므로 대화형 쿼리 서비스인 Amazon Athena를 사용하여 배치 또는 실시간으로 올바른 속성에 대한 피쳐를 쿼리할 수 있습니다. SageMaker Feature Store는 추론 중에 새 데이터가 생성될 때 단일 리포지토리가 업데이트되어 모델이 훈련 및 추론 중에 사용할 수 있도록 항상 새로운 피쳐를 사용할 수 있기 때문에 피쳐를 업데이트된 상태로 유지합니다.

피쳐 저장소는 S3에 저장된 오프라인 컴포넌트와 지연 시간이 짧은 데이터베이스에 저장된 온라인 컴포넌트로 구성됩니다. 온라인 데이터베이스는 선택 사항이지만 추론에서 사용할 수 있는 추가 피쳐가 필요한 경우 매우 유용합니다. 이 섹션에서는 보험 청구 및 고객 데이터셋에 대한 피쳐 그룹을 생성합니다. 보험 청구 및 고객 데이터를 각 피쳐 그룹에 삽입한 후, Athena를 사용하여 오프라인 스토어를 쿼리하여 훈련 데이터 세트를 구축해야 합니다.

SageMaker 피쳐 저장소에 대한 자세한 내용은 [SageMaker Developer Guide](https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store.html)를 참조해 주십시오.

In [None]:
featurestore_runtime = boto_session.client(
    service_name='sagemaker-featurestore-runtime', 
    region_name=region
)

feature_store_session = sagemaker.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_boto_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

### Configure the feature groups

각 피쳐의 데이터 유형은 데이터프레임을 전달하고, 적절한 데이터 유형을 유추하여 설정됩니다. 피쳐 데이터 유형은 설정 변수(config variable)를 통해 설정할 수도 있지만, 피쳐 그룹에 수집될 때 Pandas 데이터프레임의 해당 Python 데이터 유형과 일치해야 합니다.

In [None]:
claims_fg_name = f'{prefix}-claims'
customers_fg_name = f'{prefix}-customers'
%store claims_fg_name 
%store customers_fg_name

claims_feature_group = FeatureGroup(
    name=claims_fg_name, 
    sagemaker_session=feature_store_session)

customers_feature_group = FeatureGroup(
    name=customers_fg_name, 
    sagemaker_session=feature_store_session)

claims_feature_group.load_feature_definitions(data_frame=claims_preprocessed);
customers_feature_group.load_feature_definitions(data_frame=customers_preprocessed);

### Create the feature groups
데이터프레임에서 필요한 레코드 식별자 및 이벤트 타임 피쳐에 해당하는 컬럼을 피쳐 그룹에 알려야 합니다.

In [None]:
print(f"{customers_fg_name} -- {claims_fg_name} are the feature group names in use")

In [None]:
record_identifier_feature_name = 'policy_id'
event_time_feature_name = 'event_time'

try:
    print(f"\n Using s3://{bucket}/{prefix}")
    claims_feature_group.create(
        s3_uri=f"s3://{bucket}/{prefix}",
        record_identifier_name=record_identifier_feature_name,
        event_time_feature_name=event_time_feature_name,
        role_arn=sagemaker_role,
        enable_online_store=True
    )
    print(f'Create "claims" feature group: SUCCESS')
except Exception as e:
    code = e.response.get('Error').get('Code')
    if code == 'ResourceInUse':
        print(f'Using existing feature group: {claims_fg_name}')
    else:
        raise(e)

try:
    customers_feature_group.create(
        s3_uri=f"s3://{bucket}/{prefix}",
        record_identifier_name=record_identifier_feature_name,
        event_time_feature_name=event_time_feature_name,
        role_arn=sagemaker_role,
        enable_online_store=True
    )
    print(f'Create "customers" feature group: SUCCESS')
except Exception as e:
    code = e.response.get('Error').get('Code')
    if code == 'ResourceInUse':
        print(f'Using existing feature group: {customers_fg_name}')
    else:
        raise(e)

### Wait until feature group creation has fully completed

In [None]:
def wait_for_feature_group_creation_complete(feature_group):
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group Creation")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    if status != "Created":
        raise RuntimeError(f"Failed to create feature group {feature_group.name}")
    print(f"FeatureGroup {feature_group.name} successfully created.")
    
wait_for_feature_group_creation_complete(feature_group=claims_feature_group)
wait_for_feature_group_creation_complete(feature_group=customers_feature_group)

### Ingest records into the Feature Groups

피쳐 그룹이 생성된 후, PutRecord API를 사용하여 각 store에 데이터를 넣을 수 있습니다. 이 API는 높은 TPS를 처리할 수 있으며 다른 스트림에서 호출되도록 설계되었습니다. 이러한 모든 Put 요청의 데이터는 버퍼링되어 s3에 chunk로 기록됩니다. 수집 후 몇 분 이내에 파일이 오프라인 저장소에 기록됩니다.

In [None]:
if 'claims_table' in locals():
    print("You may have already ingested the data into your Feature Groups. If you'd like to do this again, you can run the ingest methods outside of the 'if/else' statement.")

else:
    claims_feature_group.ingest(data_frame=claims_preprocessed, max_workers=3, wait=True);
    customers_feature_group.ingest(data_frame=customers_preprocessed, max_workers=3, wait=True);

### Wait for offline store data to become available
아래 코드 셀은 약 5-8분이 소요됩니다.

In [None]:
if 'claims_table' not in locals():
    claims_table = (
        claims_feature_group.describe()["OfflineStoreConfig"]["DataCatalogConfig"]["TableName"]
    )
if 'customers_table' not in locals():
    customers_table = (
        customers_feature_group.describe()["OfflineStoreConfig"]["DataCatalogConfig"]["TableName"]
    )

claims_feature_group_s3_prefix = (
    f"{prefix}/{account_id}/sagemaker/{region}/offline-store/{claims_table}/data"
)
customers_feature_group_s3_prefix = (
    f"{prefix}/{account_id}/sagemaker/{region}/offline-store/{customers_table}/data"
)

offline_store_contents = None
while offline_store_contents is None:
    objects_in_bucket = s3_client.list_objects(
        Bucket=bucket, Prefix=customers_feature_group_s3_prefix
    )
    if "Contents" in objects_in_bucket and len(objects_in_bucket["Contents"]) > 1:
        offline_store_contents = objects_in_bucket["Contents"]
    else:
        print("Waiting for data in offline store...")
        time.sleep(60)

print("\nData available.")

<a id='aud-dataset'></a>
## Create train and test datasets

[overview](#all-up-overview)
___

오프라인 스토어에서 데이터를 사용할 수있게 되면 자동으로 카탈로그화되고 Athena 테이블에 로드됩니다. (기본적으로 수행되지만 끌 수 있습니다.) 훈련 및 테스트 데이터셋을 구축하기 위해 Athena에서 생성된 Claims 및 Customers 테이블을 조인하는 SQL 쿼리를 실행합니다.

In [None]:
claims_query = claims_feature_group.athena_query()
customers_query = customers_feature_group.athena_query()

claims_table = claims_query.table_name
customers_table = customers_query.table_name
database_name = customers_query.database
%store claims_table
%store customers_table
%store database_name

feature_columns = list(set(claims_preprocessed.columns) ^ set(customers_preprocessed.columns))
feature_columns_string = ", ".join(f'"{c}"' for c in feature_columns)
feature_columns_string = f'"{claims_table}".policy_id as policy_id, ' + feature_columns_string

query_string = f"""
SELECT DISTINCT {feature_columns_string}
FROM "{claims_table}" LEFT JOIN "{customers_table}" 
ON "{claims_table}".policy_id = "{customers_table}".policy_id
"""

In [None]:
claims_query.run(query_string=query_string, output_location=f"s3://{bucket}/{prefix}/query_results")
claims_query.wait()
dataset = claims_query.as_dataframe()

In [None]:
dataset.head()

In [None]:
dataset.to_csv("./data/claims_customer.csv")

In [None]:
col_order = ["fraud"] + list(dataset.drop(["fraud", "policy_id"], axis=1).columns)
%store col_order

train = dataset.sample(frac=0.80, random_state=0)[col_order]
test = dataset.drop(train.index)[col_order]

### Write train, test data to S3

훈련 데이터셋 및 테스트 데이터셋을 S3로 업로드합니다.

In [None]:
train.to_csv("data/train.csv", index=False)
test.to_csv("data/test.csv", index=False)
dataset.to_csv("data/dataset.csv", index=True)

In [None]:
s3_client.upload_file(Filename='data/train.csv', Bucket=bucket, Key=f'{prefix}/data/train/train.csv')
s3_client.upload_file(Filename='data/test.csv', Bucket=bucket, Key=f'{prefix}/data/test/test.csv')
%store train_data_uri
%store test_data_uri

In [None]:
train.head(5)

In [None]:
test.head(5)

___

### Next Notebook: [Train, Check Bias, Tune, Record Lineage, Register Model](./2-lineage-train-assess-bias-tune-registry-e2e.ipynb)