# [모듈 3.1] 전처리 스텝 개발 (SageMaker Preprocessing)
---

이 노트북은 세이지 메이커의 Processing Job을 통해서 데이터 전처리를 합니다. 
- 일반적으로 크게 아래 4가지의 스텝으로 진행이 됩니다.
    - S3에 입력 파일 준비
    - 전처리를 수행하는 코드 준비
    - Projcessing Job을 생성시에 아래와 같은 항목을 제공합니다.
        - Projcessing Job을 실행할 EC2(예: ml.m4.2xlarge) 기술
        - Ec2에서 로딩할 다커 이미지의 이름 기술
        - S3 입력 파일 경로
        - 전처리 수행 코드 경로
        - S3 출력 파일 경로
    - EC2에서 전치리 실행 하여 S3 출력 위치에 저장
- 상세한 사항은 개발자 가이드를 참조 하세요. -->  [SageMaker Processing](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/processing-job.html)

![Processing-1.png](img/Processing-1.png)

---
### 위와 같은 전처리를 개발하기 위해서는 아래 3개의 단계를 수행합니다.
- (1) 로컬 노트북 인스턴스에서 **다커 컨테이너 없이** 전처리 코드 실행
    - SageMaker Processing은 다커 컨테이너에서 실행이 되기에, 로컬에서 다커 환경과 비슷한 환경을 구성(예: 입력, 출력 위치)하여 실행합니다.
    - 실제적으로 현업 프로젝트의 개발시에 로직 확인, 디버깅이 수월하기 위해서 이 단계를 진행하는 것을 권장 합니다.
- (2) 로컬 노트북 인스턴스에서 **다커 컨테이너를 통해서** 전처리 코드 실행      
    - 위의 (1) 단계에서 전처리 코드의 로직 확인이 되었기에, 실제 로컬 노트북 인스턴스에서 다커 컨테이너를 통해 전치리 코드를 수행 합니다.
    - [알림] 로컬 모드 참고 자료
        - 로컬모드 설명하는 블로그 자료 --> [Use the Amazon SageMaker local mode to train on your notebook instance](https://aws.amazon.com/blogs/machine-learning/use-the-amazon-sagemaker-local-mode-to-train-on-your-notebook-instance/)
        - TF, Pytorch, SKLean, SKLearn Processing JOb에 대한 로컬 모드 샘플 --> [Amazon SageMaker Local Mode Examples](https://github.com/aws-samples/amazon-sagemaker-local-mode)
        - Python SDK -->  [로컬모드 Python SDK](https://sagemaker.readthedocs.io/en/stable/overview.html#local-mode)
- (3) SageMaker Pipeline 에서 전치러를 수행합니다.
    - 상세 사항은 여기에서 확인 하세요. --> [Amazon SageMaker 모델 구축 파이프라인](img/https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/pipelines.html)
    

## 0. 기본 세이지 메이커 정보 및 기본 변수 로딩

In [1]:
import boto3
import sagemaker
import pandas as pd
from IPython.display import display as dp

region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()

%store -r 

In [3]:
# 현재 로컬 디스크에 저장되어 있는 변수를 확인
%store 

Stored variables and their in-db values:
claims_data_uri                -> 's3://sagemaker-ap-northeast-2-057716757052/sagema
customers_data_uri             -> 's3://sagemaker-ap-northeast-2-057716757052/sagema
default_bucket                 -> 'sagemaker-ap-northeast-2-057716757052'
input_data_uri                 -> 's3://sagemaker-ap-northeast-2-057716757052/sagema
project_prefix                 -> 'sagemaker-pipeline-step-by-step'


## 1. 전처리 스크립트 확인

전처리 코드는 크게 아래와 같이 구성 되어 있습니다.
- 커맨드 인자로 전달된 변수 내용 확인
- 두개의 입력 파일(claim, customer)을 로딩하고, policy_id 로 조인하여 하나의 데이터 세트로 만듦
- 카테고리 피쳐를 원핫인코딩을 함.
- 숫자형 변수의 결측값에 대해서 채우는 전처리 수행
    - [알림] 이 워크샵은 XGBoost를 통해서 훈련을 합니다. XGBoost 같은 Tree 계열의 알고리즘은 피쳐의 스케일링(Scaling)이 필요하지 않습니다.
- 전치리 결과를 모두 결합 합니다.
- 훈련, 테스트 데이터 세트로 9:1 로 분리 합니다.
    - [알림] 일반적으로 Fruad Detection의 훈련, 테스트 데이터의 분리는 시간의 순서에 따라 분리 합니다. 예를 들어서 2021.01.01 ~ 2021.12.31 의 데이터가 존재한다고 하면, 2021.01.01 ~ 2021.11.30 을 훈련 데이터, 2021.12.01 ~ 2021.12.31을 테스트 테이터로 분리 합니다. 이 워크샵에서 사용하고 있는 데이터 세트는 명시적인 날짜 정보가 없어서, 5000개의 의 policy_id에서 1~4,500 은 훈련 데이터, 4501 ~ 5,000은 테스트 데이터로 분리 하였습니다.

---

In [21]:
preprocessing_code = 'src/preprocessing.py'
%store preprocessing_code

Stored 'preprocessing_code' (str)


In [11]:
# !pygmentize {preprocessing_code}

## 2. 전처리 코드 실행
---

### (1) 로컬 노트북 인스턴스에서 다커 컨테이너 없이 전처리 코드 실행




#### 로컬 환경 셋업 

로컬에서 테스트 하기 위해 다커 컨테이너와 같은 환경을 생성합니다.

In [15]:
import os
base_output_dir = 'opt/ml/processing/output'
# base_preproc_dir = 'opt/ml/processing'

base_preproc_input_dir = 'opt/ml/processing/input'
os.makedirs(base_preproc_input_dir, exist_ok=True)

# base_train_dir = 'opt/ml/processing/output/train'
# os.makedirs(base_train_dir, exist_ok=True)

# base_validation_dir = 'opt/ml/processing/output/validation'
# os.makedirs(base_validation_dir, exist_ok=True)

# base_test_dir = 'opt/ml/processing/output/test'
# os.makedirs(base_test_dir, exist_ok=True)


#### claims.csv, customers.csv 를 다커환경에서 사용하는 입력 경로로 복사

In [16]:
! cp ../data/claims.csv {base_preproc_input_dir}
! cp ../data/customers.csv {base_preproc_input_dir}

#### 로컬에서 스크립트 실행
- 전처리 코드에서 제공하는 로그를 통해서, 전처리 수행 내역을 확인 합니다.

In [17]:
! python {preprocessing_code} --base_preproc_input_dir {base_preproc_input_dir} \
                              --base_output_dir {base_output_dir} 


######### Argument Info ####################################
args.base_output_dir: opt/ml/processing/output
args.base_preproc_input_dir: opt/ml/processing/input
args.label_column: fraud

### Loading Claim Dataset
input_files: 
 ['opt/ml/processing/input/claims.csv']
dataframe shape 
 (5000, 17)
dataset sample 
           driver_relationship incident_type  ... incident_hour fraud
policy_id                                    ...                    
1                      Spouse     Collision  ...             8     0
2                        Self     Collision  ...            11     0

[2 rows x 17 columns]

### Loading Customer Dataset
input_files: 
 ['opt/ml/processing/input/customers.csv']
dataframe shape 
 (5000, 12)
dataset sample 
            customer_age  months_as_customer  ...  customer_education  auto_year
policy_id                                    ...                               
1                    54                  94  ...           Associate       2006
2              

#### 전처리된 데이터 확인
- 실제로 전처리 된 파일의 내역을 확인 합니다.
- 훈련 및 테스트 세트의 fraud 의 분포를 확인 합니다. (0: non-fruad, 1: fraud)

In [18]:
preprocessed_train_path = os.path.join(base_output_dir + '/train/train.csv')
preprocessed_test_path = os.path.join(base_output_dir + '/test/test.csv')
#print("preprocessed_train_path: ", preprocessed_train_path)

preprocessed_train_df = pd.read_csv(preprocessed_train_path)
preprocessed_test_df = pd.read_csv(preprocessed_test_path)
#print(preprocessed_train_df[[0]].value_counts())
preprocessed_train_df
                                       

Unnamed: 0,fraud,vehicle_claim,total_claim_amount,customer_age,months_as_customer,num_claims_past_year,num_insurers_past_5_years,policy_deductable,policy_annual_premium,customer_zip,...,collision_type_missing,incident_severity_Major,incident_severity_Minor,incident_severity_Totaled,authorities_contacted_Ambulance,authorities_contacted_Fire,authorities_contacted_None,authorities_contacted_Police,police_report_available_No,police_report_available_Yes
0,0,8913.668763,80513.668763,54,94,0,1,750,3000,99207,...,0,0,1,0,0,0,1,0,1,0
1,0,19746.724395,26146.724395,41,165,0,1,750,2950,95632,...,0,0,0,1,0,0,0,1,0,1
2,0,11652.969918,22052.969918,57,155,0,1,750,3000,93203,...,0,0,1,0,0,0,0,1,0,1
3,0,11260.930936,115960.930936,39,80,0,1,750,3000,85208,...,0,0,1,0,0,0,1,0,1,0
4,0,27987.704652,31387.704652,39,60,0,1,750,3000,91792,...,0,1,0,0,0,0,0,1,1,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4495,0,19657.157428,40757.157428,55,151,0,1,750,3000,92105,...,0,1,0,0,0,0,0,1,1,0
4496,0,17431.505171,28331.505171,47,155,0,1,750,3000,93704,...,0,1,0,0,0,0,0,1,0,1
4497,0,13691.463019,22791.463019,34,177,0,1,750,2800,89131,...,1,1,0,0,0,0,0,1,1,0
4498,0,11806.709642,15506.709642,61,268,0,1,750,2550,83686,...,0,0,1,0,0,0,1,0,1,0


In [19]:
dp(preprocessed_train_df[['fraud']].value_counts())
dp(preprocessed_test_df[['fraud']].value_counts())

fraud
0        4351
1         149
dtype: int64

fraud
0        485
1         15
dtype: int64

### (2) 로컬 노트북 인스턴스에서 **다커 컨테이너를 통해서** 전처리 코드 실행      
- 아래 셀은 최초 실행시에 약 1분 정도 소요 됩니다. 이후에는 약 5초 정도 걸립니다.
    - 최초 실행에는 SKLearnProcessor 의 다커 이미지를 다운로드 받기에 시간이 걸립니다.
- instance_type 을 local 로 해야만, 로컬 모드로 수행 됩니다.
- 아래 코드의 상세 설명은 여기를 참조 하세요. --> [Scikit-learn을 통한 데이터 처리](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/use-scikit-learn-processing-container.html)
---


In [23]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

instance_type = 'local'
sklearn_processor = SKLearnProcessor(framework_version= "0.23-1",
                                     role=role,
                                     instance_type= instance_type,
                                     instance_count=1)

sklearn_processor.run(code= preprocessing_code,
                      inputs=[
                        ProcessingInput(source=input_data_uri,
                                        destination='/opt/ml/processing/input'),
                             ],
                      outputs=[ProcessingOutput(source='/opt/ml/processing/output/train'),
                               ProcessingOutput(source='/opt/ml/processing/output/test')]
                     )


Job Name:  sagemaker-scikit-learn-2021-06-26-08-21-39-619
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-ap-northeast-2-057716757052/sagemaker-pipeline-step-by-step/input', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-ap-northeast-2-057716757052/sagemaker-scikit-learn-2021-06-26-08-21-39-619/input/code/preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'output-1', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-ap-northeast-2-057716757052/sagemaker-scikit-learn-2021-06-26-08-21-39-619/output/output-1', 'LocalPath': '/opt/ml/processing/output/train', 'S3UploadMode': 'End

### (3) SageMaker Pipeline 에서 전치러를 수행합니다.
---



### 모델 빌딩 파이프라인 변수 생성



In [35]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)

input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)


### 전처리 스텝 단계 정의

In [36]:
from sagemaker.sklearn.processing import SKLearnProcessor

framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-fraud-process",
    role=role,
)
print("input_data: \n", input_data)

input_data: 
 s3://sagemaker-ap-northeast-2-057716757052/sagemaker-pipeline-step-by-step/input


In [37]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
    

step_process = ProcessingStep(
    name="FraudScratchProcess",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data_uri,destination='/opt/ml/processing/input'),
         ],
    outputs=[ProcessingOutput(output_name="train",
                              source='/opt/ml/processing/output/train'),
             ProcessingOutput(output_name="test",
                              source='/opt/ml/processing/output/test')],
    code= preprocessing_code
)


### 파리마터, 단계, 조건을 조합하여 최종 파이프라인 정의



In [38]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = project_prefix
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type, 
        processing_instance_count,
        input_data,
    ],
    steps=[step_process],
)

#### (선택) 파이프라인 정의 확인 

파이프라인을 정의하는 JSON을 생성하고 파이프라인 내에서 사용하는 파라미터와 단계별 속성들이 잘 정의되었는지 확인할 수 있습니다.

In [39]:
import json


definition = json.loads(pipeline.definition())
definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-ap-northeast-2-057716757052/sagemaker-pipeline-step-by-step/input'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'FraudScratchProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '366743142698.dkr.ecr.ap-northeast-2.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3',
     'ContainerEntrypoint': ['python3',
      '/opt/ml/processin

#### 파이프라인을 SageMaker에 제출하고 실행하기 

파이프라인 정의를 파이프라인 서비스에 제출합니다. 함께 전달되는 역할(role)을 이용하여 AWS에서 파이프라인을 생성하고 작업의 각 단계를 실행할 것입니다.   

In [40]:
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:ap-northeast-2:057716757052:pipeline/sagemaker-pipeline-step-by-step',
 'ResponseMetadata': {'RequestId': '33acf102-0053-49fb-893b-f05176f8eca5',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '33acf102-0053-49fb-893b-f05176f8eca5',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '104',
   'date': 'Sat, 26 Jun 2021 08:52:29 GMT'},
  'RetryAttempts': 0}}

디폴트값을 이용하여 파이프라인을 샐행합니다. 

In [41]:
execution = pipeline.start()

#### 파이프라인 운영: 파이프라인 대기 및 실행상태 확인

워크플로우의 실행상황을 살펴봅니다. 

In [42]:
execution.describe()
execution.wait()

실행이 완료될 때까지 기다립니다.

실행된 단계들을 리스트업합니다. 파이프라인의 단계실행 서비스에 의해 시작되거나 완료된 단계를 보여줍니다.

In [45]:
execution.list_steps()

[{'StepName': 'FraudScratchProcess',
  'StartTime': datetime.datetime(2021, 6, 26, 8, 52, 31, 214000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2021, 6, 26, 8, 56, 29, 185000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:ap-northeast-2:057716757052:processing-job/pipelines-yalv7nzpksob-fraudscratchprocess-dzotrrjmq7'}}}]

#### 아티펙트 경로 추출

In [46]:
def get_proc_artifact(execution, client, kind=0):
    '''
    kind: 0 --> train
    kind: 2 --> test
    '''
    response = execution.list_steps()
    proc_arn = response[0]['Metadata']['ProcessingJob']['Arn']
    proc_job_name = proc_arn.split('/')[-1]
    # print("proc_job_name: ", proc_job_name)
    
    response = client.describe_processing_job(ProcessingJobName = proc_job_name)
    train_preproc_artifact = response['ProcessingOutputConfig']['Outputs'][kind]['S3Output']['S3Uri']    
    
    return train_preproc_artifact

import boto3
client = boto3.client("sagemaker")

train_preproc_dir_artifact = get_proc_artifact(execution, client, kind=0 )
#val_preproc_dir_artifact = get_proc_artifact(execution, client, kind=1 )
test_preproc_dir_artifact = get_proc_artifact(execution, client, kind=1 )

print("train_preproc_dir_artifact: ", train_preproc_dir_artifact)
#print("val_preproc_dir_artifact: ", val_preproc_dir_artifact)
print("test_preproc__dir_artifact: ", test_preproc_dir_artifact)

%store train_preproc_dir_artifact
#%store val_preproc_dir_artifact
%store test_preproc_dir_artifact

train_preproc_dir_artifact:  s3://sagemaker-ap-northeast-2-057716757052/sklearn-fraud-process-2021-06-26-08-52-28-611/output/train
test_preproc__dir_artifact:  s3://sagemaker-ap-northeast-2-057716757052/sklearn-fraud-process-2021-06-26-08-52-28-611/output/test
Stored 'train_preproc_dir_artifact' (str)
Stored 'test_preproc_dir_artifact' (str)


In [47]:
! aws s3 ls {train_preproc_dir_artifact} --recursive

2021-06-26 08:56:22     767618 sklearn-fraud-process-2021-06-26-08-52-28-611/output/train/train.csv


In [48]:
! aws s3 ls {test_preproc_dir_artifact} --recursive

2021-06-26 08:56:23      86423 sklearn-fraud-process-2021-06-26-08-52-28-611/output/test/test.csv
