In [1]:
import boto3
import sagemaker
import pandas as pd


region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"FraudScratchModelPackageGroupName"

## 데이터 세트 로딩 및 S3 업로드

In [2]:
data_dir = '../data'
df_train = pd.read_csv(f"{data_dir}/train.csv", index_col=0)
df_test = pd.read_csv(f"{data_dir}/test.csv", index_col=0)

이제 데이터를 디폴트 버킷으로 업로드합니다. `input_data_uri` 변수를 통해 데이터셋의 위치를 저장하였습니다.

In [5]:
local_path = "../data/train.csv"
data_prefix = 'fraud2scratch'
base_uri = f"s3://{default_bucket}/{data_prefix}"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path, 
    desired_s3_uri=base_uri,
)
print(input_data_uri)

s3://sagemaker-ap-northeast-2-057716757052/fraud2scratch/train.csv


모델 생성 후 배치변환 추론을 실행할 때 사용할 두번째 데이터셋을 다운로드 합니다. 본 파일의 경로를 `batch_data_uri`에 저장합니다. 

In [6]:
local_path = "../data/test.csv"

# base_uri = f"s3://{default_bucket}/abalone"
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path, 
    desired_s3_uri=base_uri,
)
print(batch_data_uri)

s3://sagemaker-ap-northeast-2-057716757052/fraud2scratch/test.csv


### 파이프라인 실행을 위한 파라미터 정의 



## Preprocessing File 작성

In [7]:
import os
base_preproc_dir = 'opt/ml/processing'
os.makedirs(base_preproc_dir, exist_ok=True)

base_input_dir = 'opt/ml/processing/input'
os.makedirs(base_input_dir, exist_ok=True)

base_train_dir = 'opt/ml/processing/train'
os.makedirs(base_train_dir, exist_ok=True)

base_validation_dir = 'opt/ml/processing/validation'
os.makedirs(base_validation_dir, exist_ok=True)

base_test_dir = 'opt/ml/processing/test'
os.makedirs(base_test_dir, exist_ok=True)


# !mkdir -p abalone
s3_input_data = 's3://sagemaker-ap-northeast-2-057716757052/abalone/abalone-dataset.csv'
! aws s3 cp  {s3_input_data} {base_input_dir}



download: s3://sagemaker-ap-northeast-2-057716757052/abalone/abalone-dataset.csv to opt/ml/processing/input/abalone-dataset.csv


In [8]:
# df = pd.read_csv(base_input_dir + "/abalone-dataset.csv")
# df
# df_train.info()

## 데이터 피쳐 전처리 스크래치 작업

In [10]:
# df_train.info()
import numpy as np

cols = df_train.columns
cols = ['incident_month', 'customer_age', 'incident_hour', 'months_as_customer','injury_claim'
       ]
df2 = df_train[cols].reset_index()
df2.to_csv('opt/ml/processing/input/fraud-dataset.csv', index=False)
dataset_file_path = 'input/fraud-dataset.csv'

# float_cols = df2.select_dtypes(include=['float']).columns.values
# int_cols = df2.select_dtypes(include=['int']).columns.values
# numeric_cols = np.concatenate((float_cols, int_cols), axis=0).tolist()
# numeric_cols
# obj_cols = df2.select_dtypes(include=['object']).columns.values.tolist()
# obj_cols


In [11]:
! python preprocessing.py --base_dir {base_preproc_dir} --dataset_file_path {dataset_file_path}

#############################################
args.base_dir: opt/ml/processing
args.dataset_file_path: input/fraud-dataset.csv
args.label_column: fraud
dataset sample 
    fraud  incident_month  ...  months_as_customer  injury_claim
0      0               1  ...                   2       15000.0
1      0               8  ...                 117           0.0

[2 rows x 6 columns]
df columns 
 Index(['fraud', 'incident_month', 'customer_age', 'incident_hour',
       'months_as_customer', 'injury_claim'],
      dtype='object')
preprocessed train sample 
      0         1         2         3         4         5
0  0.0 -0.287400  1.177914  0.820867  0.415755  1.185224
1  0.0 -0.586515  0.904462  0.390446  0.975051  0.965575
All files are preprocessed


### 특성공학(Feature Engineering)을 위한 프로세싱 단계 정의 



In [12]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)


processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)

다음으로 프로세싱을 위한 `SKLearnProcessor`의 인스턴스를 생성합니다. 이 인스턴스는 `ProcessingStep`에서 사용합니다.

본 노트북에서 계속 사용할 `framework_version`값을 지정합니다.

`sklearn_processor` 인스턴스의 생성시 `processing_instance_type`과 `processing_instance_count` 파라미터가 사용된 것을 확인합니다.


In [13]:
from sagemaker.sklearn.processing import SKLearnProcessor


framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-abalone-process",
    role=role,
)

In [14]:
input_data

ParameterString(name='InputData', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://sagemaker-ap-northeast-2-057716757052/fraud2scratch/train.csv')

In [18]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
    

step_process = ProcessingStep(
    name="FraudScratchProcess",
    processor=sklearn_processor,
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
    ],
    code="fraud/preprocessing.py",
)

### 파리마터, 단계, 조건을 조합하여 최종 파이프라인 정의



In [19]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = f"FraudScratchPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type, 
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
    ],
    steps=[step_process],
)

#### (선택) 파이프라인 정의 확인 

파이프라인을 정의하는 JSON을 생성하고 파이프라인 내에서 사용하는 파라미터와 단계별 속성들이 잘 정의되었는지 확인할 수 있습니다.

In [20]:
import json


definition = json.loads(pipeline.definition())
definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-ap-northeast-2-057716757052/fraud2scratch/train.csv'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-ap-northeast-2-057716757052/fraud2scratch/test.csv'}],
 'Steps': [{'Name': 'FraudScratchProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification

### 파이프라인을 SageMaker에 제출하고 실행하기 

파이프라인 정의를 파이프라인 서비스에 제출합니다. 함께 전달되는 역할(role)을 이용하여 AWS에서 파이프라인을 생성하고 작업의 각 단계를 실행할 것입니다.   

In [21]:
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:ap-northeast-2:057716757052:pipeline/fraudscratchpipeline',
 'ResponseMetadata': {'RequestId': '02b93807-e732-415b-94f4-52dfa89598a5',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '02b93807-e732-415b-94f4-52dfa89598a5',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '93',
   'date': 'Sun, 11 Apr 2021 07:15:12 GMT'},
  'RetryAttempts': 0}}

디폴트값을 이용하여 파이프라인을 샐행합니다. 

In [22]:
execution = pipeline.start()

### 파이프라인 운영: 파이프라인 대기 및 실행상태 확인

워크플로우의 실행상황을 살펴봅니다. 

In [23]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:ap-northeast-2:057716757052:pipeline/fraudscratchpipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:ap-northeast-2:057716757052:pipeline/fraudscratchpipeline/execution/zwi0hbfkvh7y',
 'PipelineExecutionDisplayName': 'execution-1618125316219',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2021, 4, 11, 7, 15, 16, 113000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2021, 4, 11, 7, 15, 16, 113000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:ap-northeast-2:057716757052:user-profile/d-6d7jmk8n7gqu/ds01gsmoon-fc4',
  'UserProfileName': 'ds01gsmoon-fc4',
  'DomainId': 'd-6d7jmk8n7gqu'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:ap-northeast-2:057716757052:user-profile/d-6d7jmk8n7gqu/ds01gsmoon-fc4',
  'UserProfileName': 'ds01gsmoon-fc4',
  'DomainId': 'd-6d7jmk8n7gqu'},
 'ResponseMetadata': {'RequestId': '5e6e1699-b0a6-4066-8e26-fbbcb4b84817',
  'HTTPStatusCode': 200,
  

실행이 완료될 때까지 기다립니다.

In [24]:
# execution.wait()

실행된 단계들을 리스트업합니다. 파이프라인의 단계실행 서비스에 의해 시작되거나 완료된 단계를 보여줍니다.

In [26]:
execution.list_steps()

[{'StepName': 'FraudScratchProcess',
  'StartTime': datetime.datetime(2021, 4, 11, 7, 15, 16, 479000, tzinfo=tzlocal()),
  'StepStatus': 'Executing',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:ap-northeast-2:057716757052:processing-job/pipelines-zwi0hbfkvh7y-fraudscratchprocess-wkkfiucdir'}}}]