# [모듈 2] SageMaker Pipelines 사용하기
이 노트북에서는 아래와 같은 작업을 수행합니다.
- 데이터 준비
- Pipeline 정의
- 데이터 전처리: Processing Step 이용 
- AutoGluon을 이용한 학습: Training Step 이용
- Hyper Parameter Optimizer를 이용한 최적화: Tuning Step 이용
- 검증데이터 추론: Processing Step 이용
- 학습결과 Evaluation하기: Processing Step 이용
- Condition 확인하여 모델 등록하기: Condition Step 이용


In [2]:
import sagemaker

# Look up the execution role, then open a SageMaker session and resolve the
# default S3 bucket that the rest of the notebook uploads to.
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.session.Session()
bucket = sagemaker_session.default_bucket()

# role = sagemaker.get_execution_role()
# sess = boto3.Session()
# region = sess.region_name
# account = boto3.client("sts").get_caller_identity().get("Account")
# sagemaker_session = sagemaker.Session()
# bucket = sagemaker_session.default_bucket()

In [3]:
# Local notebook paths of the raw input CSV files.
nb_dataset_path = '../data/raw'
claims_data_path = f'{nb_dataset_path}/claims.csv'
customers_data_path = f'{nb_dataset_path}/customers.csv'

# S3 prefix (under the session's default bucket) that holds the raw data.
pipeline_prefix = 'sm-autoglueon-pipeline-base'
s3_data_path = f's3://{bucket}/{pipeline_prefix}/data'

In [4]:
# Echo the resolved locations so a wrong path is noticed before uploading.
for label, location in (
    ("claim_data_path: ", claims_data_path),
    ("customer_data_path: ", customers_data_path),
    ("s3_data_path: ", s3_data_path),
):
    print(label, location)

claim_data_path:  ../data/raw/claims.csv
customer_data_path:  ../data/raw/customers.csv
s3_data_path:  s3://sagemaker-us-east-1-238312515155/sm-autoglueon-pipeline-base/data


In [5]:
# Upload the claims CSV under the shared S3 data prefix and print the value
# returned by upload() (the object's S3 location).
s3_claims_data_path = sagemaker.s3.S3Uploader.upload(
    local_path=claims_data_path,
    desired_s3_uri=s3_data_path,
)
print("claims data path in S3: ", s3_claims_data_path)

# Same for the customers CSV; both files end up under the same prefix.
s3_customers_data_path = sagemaker.s3.S3Uploader.upload(
    local_path=customers_data_path,
    desired_s3_uri=s3_data_path,
)
print("customers data path in S3: ", s3_customers_data_path)

claims data path in S3:  s3://sagemaker-us-east-1-238312515155/sm-autoglueon-pipeline-base/data/claims.csv
customers data path in S3:  s3://sagemaker-us-east-1-238312515155/sm-autoglueon-pipeline-base/data/customers.csv


# 2.2 전처리 스텝 개발

In [6]:
import os
import boto3
import pandas as pd
from IPython.display import display as dp

In [7]:
# Load the customers CSV and preview its first five rows.
df_customers = pd.read_csv(customers_data_path)
df_customers.head(n=5)

Unnamed: 0,policy_id,customer_age,months_as_customer,num_claims_past_year,num_insurers_past_5_years,policy_state,policy_deductable,policy_annual_premium,policy_liability,customer_zip,customer_gender,customer_education,auto_year
0,1,54,94,0,1,WA,750,3000,25/50,99207,Unkown,Associate,2006
1,2,41,165,0,1,CA,750,2950,15/30,95632,Male,Bachelor,2012
2,3,57,155,0,1,CA,750,3000,15/30,93203,Female,Bachelor,2017
3,4,39,80,0,1,AZ,750,3000,30/60,85208,Female,Advanced Degree,2020
4,5,39,60,0,1,CA,750,3000,15/30,91792,Female,High School,2018


In [8]:
# Load the claims CSV and preview its first five rows.
df_claims = pd.read_csv(claims_data_path)
df_claims.head(n=5)

Unnamed: 0,policy_id,driver_relationship,incident_type,collision_type,incident_severity,authorities_contacted,num_vehicles_involved,num_injuries,num_witnesses,police_report_available,injury_claim,vehicle_claim,total_claim_amount,incident_month,incident_day,incident_dow,incident_hour,fraud
0,1,Spouse,Collision,Front,Minor,,2,0,0,No,71600,8913.668763,80513.668763,3,17,6,8,0
1,2,Self,Collision,Rear,Totaled,Police,3,4,0,Yes,6400,19746.724395,26146.724395,12,11,2,11,0
2,3,Self,Collision,Front,Minor,Police,2,0,1,Yes,10400,11652.969918,22052.969918,12,24,1,14,0
3,4,Child,Collision,Side,Minor,,2,0,0,No,104700,11260.930936,115960.930936,12,23,0,19,0
4,5,Self,Collision,Side,Major,Police,2,1,0,No,3400,27987.704652,31387.704652,5,8,2,8,0


In [9]:
%%writefile src/preprocess.py

import argparse
import os
import requests
import tempfile
import subprocess, sys

import pandas as pd
import numpy as np
from glob import glob

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder

import logging
import logging.handlers

def _get_logger():
    '''
    Return this module's logger, set to emit DEBUG and above.

    A stdout StreamHandler is attached to the root logger only when neither
    this logger nor any of its ancestors already has a handler, so calling
    this function repeatedly does not produce duplicated log lines.
    # https://stackoverflow.com/questions/17745914/python-logging-module-is-printing-lines-multiple-times

    Returns:
        The logging.Logger registered under this module's __name__.
    '''
    log = logging.getLogger(__name__)

    # Set the level outside the handler check: if a handler was already
    # configured elsewhere (e.g. on the root logger), a level that is only set
    # together with the handler would stay at NOTSET, and INFO/DEBUG records
    # would be filtered by the root logger's level.
    log.setLevel(logging.DEBUG)

    if not log.hasHandlers():
        logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
        # Marker attribute kept for backward compatibility; nothing in this
        # script reads it.
        log.handler_set = True
    return log


logger = _get_logger()


def split_train_test(df, test_ratio=0.1):
    '''
    Split a dataset into a leading train part and a trailing test part.

    Rows keep their original order (no shuffling): the first
    int(n_rows * (1 - test_ratio)) rows are returned as the train set and the
    remaining rows as the test set.
    '''
    cut = int(df.shape[0] * (1 - test_ratio))
    return df[0:cut], df[cut:]


def get_dataframe(base_preproc_input_dir, file_name_prefix):
    '''
    Load every CSV file whose name starts with file_name_prefix into one DataFrame.

    Files matching "<base_preproc_input_dir>/<file_name_prefix>*.csv" are read
    with their first column as the index and concatenated row-wise, in sorted
    path order.

    Args:
        base_preproc_input_dir: directory that is searched for input CSV files.
        file_name_prefix: leading part of the file names to load.

    Returns:
        A DataFrame holding the rows of all matching files.

    Raises:
        ValueError: if no file matches the pattern.
    '''
    pattern = '{}/{}*.csv'.format(base_preproc_input_dir, file_name_prefix)

    # glob() returns paths in arbitrary order. Sorting keeps the concatenated
    # row order reproducible, which matters because the later train/test split
    # is purely order-based.
    input_files = sorted(glob(pattern))
    logger.info(f"input_files: \n {input_files}")

    if not input_files:
        # Report the prefix that was actually searched for rather than a fixed
        # channel name.
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the file name prefix ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(base_preproc_input_dir, file_name_prefix))

    # index_col=0: the first CSV column becomes the DataFrame index
    frames = [pd.read_csv(path, index_col=0) for path in input_files]
    df = pd.concat(frames)

    logger.info(f"dataframe shape \n {df.shape}")
    logger.info(f"dataset sample \n {df.head(2)}")

    return df


def convert_type(raw, cols, type_target):
    '''
    Return a copy of raw in which every column named in cols is cast to type_target.

    The input DataFrame itself is left unmodified; columns not listed in cols
    keep their dtype.
    '''
    # One astype call with a column -> dtype mapping replaces the per-column loop
    return raw.astype(dict.fromkeys(cols, type_target))
    

if __name__ == '__main__':
    # Script entry point: load the claim and customer CSV files, join them,
    # one-hot encode the categorical columns, median-impute the numeric
    # columns, move the label to the first column and write train/test CSVs.

    ################################
    #### Parse command-line arguments
    #################################

    parser = argparse.ArgumentParser()
    parser.add_argument('--base_output_dir', type=str, default="/opt/ml/processing/output")
    parser.add_argument('--base_preproc_input_dir', type=str, default="/opt/ml/processing/input")
    parser.add_argument('--split_rate', type=float, default=0.1)
    parser.add_argument('--label_column', type=str, default="fraud")
    # parse arguments
    args = parser.parse_args()

    logger.info("######### Argument Info ####################################")
    logger.info(f"args.base_output_dir: {args.base_output_dir}")
    logger.info(f"args.base_preproc_input_dir: {args.base_preproc_input_dir}")
    logger.info(f"args.label_column: {args.label_column}")
    logger.info(f"args.split_rate: {args.split_rate}")

    base_output_dir = args.base_output_dir
    base_preproc_input_dir = args.base_preproc_input_dir
    label_column = args.label_column
    split_rate = args.split_rate

    #################################
    #### Load the two datasets (claim, customer) and join them on policy_id
    #################################

    logger.info(f"\n### Loading Claim Dataset")
    claim_df = get_dataframe(base_preproc_input_dir, file_name_prefix='claim')

    logger.info(f"\n### Loading Customer Dataset")
    customer_df = get_dataframe(base_preproc_input_dir, file_name_prefix='customer')

    # DataFrame.join matches rows on the index, i.e. on the first CSV column
    # that get_dataframe() loads as the index.
    # NOTE(review): assumes that first column is policy_id in both files - confirm.
    df = customer_df.join(claim_df, how='left')
    logger.info(f"### dataframe merged with customer and claim: {df.shape}")

    #################################
    #### One-hot encode the categorical features
    #################################

    logger.info(f"\n ### Encoding: Category Features")
    categorical_features = df.select_dtypes(include=['object']).columns.values.tolist()
    logger.info(f"categorical_features: {categorical_features}")

    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore"))
        ]
    )

    preprocess = ColumnTransformer(
        transformers=[
            ("cat", categorical_transformer, categorical_features)
        ],
        sparse_threshold=0,  # always return a dense array
    )

    X_pre_category = preprocess.fit_transform(df)

    # Retrieve the names of the one-hot encoded columns
    # Ref: Sklearn Pipeline: Get feature names after OneHotEncode In ColumnTransformer,  https://stackoverflow.com/questions/54646709/sklearn-pipeline-get-feature-names-after-onehotencode-in-columntransformer
    onehot_encoder = preprocess.transformers_[0][1].named_steps['onehot']
    # get_feature_names() was removed in scikit-learn 1.2 in favour of
    # get_feature_names_out(); use whichever the installed version provides so
    # the script runs on both old and new framework versions.
    if hasattr(onehot_encoder, 'get_feature_names_out'):
        processed_category_features = onehot_encoder.get_feature_names_out(categorical_features)
    else:
        processed_category_features = onehot_encoder.get_feature_names(categorical_features)

    ###############################
    ### Preprocess the numeric features
    ###############################

    logger.info(f"\n ### Encoding: Numeric Features")

    float_cols = df.select_dtypes(include=['float64']).columns.values
    int_cols = df.select_dtypes(include=['int64']).columns.values
    numeric_features = np.concatenate((float_cols, int_cols), axis=0).tolist()

    logger.info(f"int_cols: \n{int_cols}")
    logger.info(f"float_cols: \n{float_cols}")

    # No scaling is applied; missing values are only replaced by the median
    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
        ]
    )

    numeric_preprocessor = ColumnTransformer(
        transformers=[
            ("cat", numeric_transformer, numeric_features)
        ],
        sparse_threshold=0,
    )

    X_pre_numeric = numeric_preprocessor.fit_transform(df)

    ###############################
    ### Combine the preprocessing results
    ###############################

    logger.info(f"\n ### Handle preprocess results")

    # Build DataFrames from the preprocessed arrays
    category_df = pd.DataFrame(data=X_pre_category, columns=processed_category_features)
    numeric_df = pd.DataFrame(data=X_pre_numeric, columns=numeric_features)

    full_df = pd.concat([numeric_df, category_df], axis=1)

    # Cast the originally-integer columns and the one-hot columns from float to int
    full_df = convert_type(full_df, cols=int_cols, type_target='int')
    full_df = convert_type(full_df, cols=processed_category_features, type_target='int')

    # Move label_column to the first position
    full_df = pd.concat([full_df[label_column], full_df.drop(columns=[label_column])], axis=1)

    ###############################
    # Split into train and test sets and save them
    ###############################

    train_df, test_df = split_train_test(full_df, test_ratio=split_rate)

    # Create the output sub-directories if they are missing so the script also
    # works with a --base_output_dir that has not been prepared in advance.
    os.makedirs(os.path.join(base_output_dir, "train"), exist_ok=True)
    os.makedirs(os.path.join(base_output_dir, "test"), exist_ok=True)

    train_df.to_csv(f"{base_output_dir}/train/train.csv", index=False)
    test_df.to_csv(f"{base_output_dir}/test/test.csv", index=False)

    logger.info(f"preprocessed train shape \n {train_df.shape}")
    logger.info(f"preprocessed test shape \n {test_df.shape}")

    logger.info(f"\n ### Final result for train dataset ")
    logger.info(f"preprocessed train sample \n {train_df.head(2)}")

Overwriting src/preprocess.py


In [20]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

# Settings for the preprocessing job.
framework_version = "0.23-1"
processing_instance_type = "ml.m5.xlarge"
processing_instance_count = 1
split_rate = 0.1

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-fraud-process",
)

# The S3 data prefix is provided at the script's default input directory, and
# the train/test folders the script writes are declared as job outputs.
sklearn_processor.run(
    code="src/preprocess.py",
    inputs=[
        ProcessingInput(source=s3_data_path, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/output/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/output/test"),
    ],
    arguments=["--split_rate", str(split_rate)],
)




Job Name:  sklearn-fraud-process-2022-07-21-07-37-25-130
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-238312515155/sm-autoglueon-pipeline-base/data', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-238312515155/sklearn-fraud-process-2022-07-21-07-37-25-130/input/code/preprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-238312515155/sklearn-fraud-process-2022-07-21-07-37-25-130/output/train', 'LocalPath': '/opt/ml/processing/output/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'test',