# Feature transformation with Amazon SageMaker Processing and SparkML

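A machine learning (ML) workflow typically consists of several steps. First, data is gathered through various ETL jobs. It is then preprocessed and the dataset is featurized by incorporating standard techniques or prior knowledge. Finally, an ML model is trained with an algorithm.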

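Distributed data processing frameworks such as Spark are often used to preprocess datasets for training. In this notebook, we use Amazon SageMaker Processing to run a preprocessing workload that leverages Spark in a managed SageMaker environment. We then take the preprocessed dataset and train a regression model with XGBoost.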

## Contents

1. [Objective](#Objective:-predict-the-age-of-an-Abalone-from-its-physical-measurement)
1. [Setup](#Setup)
1. [Using Amazon SageMaker Processing to execute a SparkML job](#Using-Amazon-SageMaker-Processing-to-execute-a-SparkML-job)
  1. [Downloading dataset and uploading to Amazon S3](#Downloading-dataset-and-uploading-to-Amazon-Simple-Storage-Service-(Amazon-S3))
  1. [Build a Spark container for running the preprocessing job](#Build-a-Spark-container-for-running-the-preprocessing-job)
  1. [Run the preprocessing job using Amazon SageMaker Processing](#Run-the-preprocessing-job-using-Amazon-SageMaker-Processing)
    1. [Inspect the preprocessed dataset](#Inspect-the-preprocessed-dataset)
1. [Train a regression model using the Amazon SageMaker XGBoost algorithm](#Train-a-regression-model-using-the-SageMaker-XGBoost-algorithm)
  1. [Retrieve the XGBoost algorithm image](#Retrieve-the-XGBoost-algorithm-image)
  1. [Set XGBoost model parameters and dataset details](#Set-XGBoost-model-parameters-and-dataset-details)
  1. [Train the XGBoost model](#Train-the-XGBoost-model)

## Objective: predict the age of an Abalone from its physical measurement

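The dataset for this example is available from [UCI Machine Learning](https://archive.ics.uci.edu/ml/datasets/abalone). The aim of this task is to determine the age of an abalone (a type of shellfish) from its physical measurements. At its core, this is a regression problem. The dataset has several features: sex (`sex`, categorical), length (`length`, continuous), diameter (`diameter`, continuous), height (`height`, continuous), whole weight (`whole_weight`, continuous), shucked weight (`shucked_weight`, continuous), viscera weight (`viscera_weight`, continuous), shell weight (`shell_weight`, continuous), and `rings` (integer). Our goal is to predict the variable `rings`, which approximates the age (age is `rings` + 1.5).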
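To make the column order and the label concrete, here is a minimal plain-Python sketch. It assumes the raw CSV has no header row and that its columns appear in the order listed above (the same order as the schema in `preprocess.py` later in this notebook). The helper names `parse_abalone_row` and `age_from_rings` and the sample row are illustrative only.

```python
# Assumed column order: matches the schema used in preprocess.py (the CSV has no header row).
COLUMNS = ["sex", "length", "diameter", "height", "whole_weight",
           "shucked_weight", "viscera_weight", "shell_weight", "rings"]


def parse_abalone_row(line):
    """Hypothetical helper: turn one raw CSV line into a dict of typed values."""
    values = line.strip().split(",")
    row = {"sex": values[0]}  # categorical value such as M, F or I
    for name, value in zip(COLUMNS[1:], values[1:]):
        row[name] = float(value)  # continuous measurements and the rings count
    return row


def age_from_rings(rings):
    """Age in years is approximated as rings + 1.5."""
    return rings + 1.5


# Illustrative row in the raw format
row = parse_abalone_row("M,0.455,0.365,0.095,0.514,0.2245,0.101,0.15,15")
print(row["sex"], row["rings"], age_from_rings(row["rings"]))  # M 15.0 16.5
```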

We use SparkML to process the dataset (applying one or more feature transformers) and upload the transformed dataset to Amazon S3 so that it can be used for XGBoost training.

## Setup

Let's start by specifying:

* The S3 bucket and prefixes used for training and model data. This example uses the default bucket of the Amazon SageMaker session.
* The IAM role ARN used to grant the processing and training jobs access to the dataset.

In [1]:
import sagemaker
from time import gmtime, strftime

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

prefix = 'sagemaker/spark-preprocess-demo/' + timestamp_prefix
input_prefix = prefix + '/input/raw/abalone'
input_preprocessed_prefix = prefix + '/input/preprocessed/abalone'
model_prefix = prefix + '/model'
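
Because the prefix includes a timestamp, each run writes its artifacts under a fresh folder instead of overwriting earlier runs. The sketch below reproduces that layout with a hypothetical `build_prefixes` helper; with the timestamp of the recorded run it yields the same S3 key that appears in the upload output of the next section.

```python
from time import gmtime, strftime


def build_prefixes(timestamp):
    """Hypothetical helper mirroring the Setup cell: all keys of one run share a timestamped prefix."""
    prefix = "sagemaker/spark-preprocess-demo/" + timestamp
    return {
        "prefix": prefix,
        "input_prefix": prefix + "/input/raw/abalone",
        "input_preprocessed_prefix": prefix + "/input/preprocessed/abalone",
        "model_prefix": prefix + "/model",
    }


# Timestamp taken from the recorded run of this notebook
recorded = build_prefixes("2019-12-13-08-13-41")
print(recorded["input_prefix"] + "/abalone.csv")

# Same format string as the Setup cell, applied to a fixed time (the Unix epoch)
epoch = build_prefixes(strftime("%Y-%m-%d-%H-%M-%S", gmtime(0)))
print(epoch["model_prefix"])
```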

## Using Amazon SageMaker Processing to execute a SparkML job

### Downloading dataset and uploading to Amazon Simple Storage Service (Amazon S3)

The Amazon SageMaker team has downloaded the abalone dataset from the University of California, Irvine repository and uploaded it to an S3 bucket. In this notebook, we download the dataset from that bucket and upload it to your own bucket so that Amazon SageMaker can access it.

In [2]:
# Fetch the dataset from the SageMaker bucket
!wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/abalone/abalone.csv

# Uploading the training data to S3
sagemaker_session.upload_data(path='abalone.csv', bucket=bucket, key_prefix=input_prefix)

--2019-12-13 08:13:42--  https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/abalone/abalone.csv
Resolving s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)... 52.218.235.32
Connecting to s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)|52.218.235.32|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 191873 (187K) [binary/octet-stream]
Saving to: ‘abalone.csv’


2019-12-13 08:13:42 (782 KB/s) - ‘abalone.csv’ saved [191873/191873]



's3://sagemaker-us-east-1-143656149352/sagemaker/spark-preprocess-demo/2019-12-13-08-13-41/input/raw/abalone/abalone.csv'
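
`upload_data` returns the full S3 URI of the uploaded object, shown above. The preprocessing script later receives the bucket and key prefix as separate arguments and rebuilds the path with the `s3a://` scheme used by Hadoop's S3A connector. The sketch below shows both directions; `split_s3_uri` and `to_s3a` are hypothetical helpers, and `to_s3a` simply mirrors the `'s3a://' + os.path.join(...)` expression in `preprocess.py`.

```python
import posixpath
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/key' into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3":
        raise ValueError("not an S3 URI: " + uri)
    return parsed.netloc, parsed.path.lstrip("/")


def to_s3a(bucket, key_prefix, filename):
    """Rebuild the object path the way preprocess.py does, using the s3a:// scheme.
    posixpath.join is used so the separator is '/' on every platform."""
    return "s3a://" + posixpath.join(bucket, key_prefix, filename)


uploaded = ("s3://sagemaker-us-east-1-143656149352/sagemaker/spark-preprocess-demo/"
            "2019-12-13-08-13-41/input/raw/abalone/abalone.csv")
bucket_name, key = split_s3_uri(uploaded)
key_prefix = posixpath.dirname(key)  # the input_prefix passed to the processing job
print(to_s3a(bucket_name, key_prefix, "abalone.csv"))
```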

### Build a Spark container for running the preprocessing job

The Spark container for this example is included in the `./container` directory. The container handles bootstrapping of all Spark configuration and acts as a wrapper around the `spark-submit` CLI. At a high level, the container provides:

* A set of default Spark/YARN/Hadoop configurations
* A bootstrapping script for configuring and starting the Spark master/worker nodes
* A wrapper around the `spark-submit` CLI for submitting Spark applications

Once the container build and push process is complete, we use the Amazon SageMaker Python SDK to submit a managed, distributed Spark application that preprocesses the dataset.

Now, let's build the example Spark container.
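
The `./container` sources are not reproduced in this notebook, so the following is only a hypothetical sketch of what a `spark-submit` wrapper might do. It assumes that SageMaker Processing starts the container with the `command` given to `ScriptProcessor` (`/opt/program/submit` later in this notebook), followed by the path of the uploaded script (under `/opt/ml/processing/input/code`) and then the job `arguments`, and that the wrapper forwards all of these to `spark-submit`. The function name and the `--master yarn --deploy-mode client` choice are assumptions, not taken from the container.

```python
import shlex


def build_spark_submit_command(script_path, script_args, master="yarn", deploy_mode="client"):
    """Hypothetical sketch: assemble a spark-submit invocation for a PySpark script.
    The real wrapper in ./container may use different flags or extra configuration."""
    return (["spark-submit", "--master", master, "--deploy-mode", deploy_mode, script_path]
            + list(script_args))


cmd = build_spark_submit_command(
    "/opt/ml/processing/input/code/preprocess.py",
    ["s3_input_bucket", "my-bucket", "s3_input_key_prefix", "raw/abalone"])  # placeholder values
print(" ".join(shlex.quote(part) for part in cmd))
```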

In [3]:
%cd container
!docker build -t sagemaker-spark-example .
%cd ../

/home/ec2-user/SageMaker/feature_transformation_with_sagemaker_processing_2019-12-13/container
Sending build context to Docker daemon  24.06kB
Step 1/32 : FROM openjdk:8-jre-slim
8-jre-slim: Pulling from library/openjdk

ee12ec04: Already exists
c2bdcfe1: Pulling fs layer
06caa98c: Pulling fs layer
Digest: sha256:171bf189bf031f412aabbbd6b531d47c0971822e31fe0faee3b38a45bead8b53
Status: Downloaded newer image for openjdk:8-jre-slim
 ---> bf4f62306d0f

Create an Amazon Elastic Container Registry (Amazon ECR) repository for the Spark container and push the image to it.

In [4]:
import boto3
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.session.Session().region_name

ecr_repository = 'sagemaker-spark-example'
tag = ':latest'
spark_repository_uri = '{}.dkr.ecr.{}.amazonaws.com/{}'.format(account_id, region, ecr_repository + tag)

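# Create ECR repository and push docker image
# `aws ecr get-login` exists only in AWS CLI v1. With AWS CLI v2, log in with:
# aws ecr get-login-password --region <region> | docker login --username AWS --password-stdin <account_id>.dkr.ecr.<region>.amazonaws.com
!$(aws ecr get-login --region $region --registry-ids $account_id --no-include-email)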
!aws ecr create-repository --repository-name $ecr_repository
!docker tag {ecr_repository + tag} $spark_repository_uri
!docker push $spark_repository_uri

https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Login Succeeded
{
    "repository": {
        "repositoryArn": "arn:aws:ecr:us-east-1:143656149352:repository/sagemaker-spark-example",
        "registryId": "143656149352",
        "repositoryName": "sagemaker-spark-example",
        "repositoryUri": "143656149352.dkr.ecr.us-east-1.amazonaws.com/sagemaker-spark-example",
        "createdAt": 1576224959.0,
        "imageTagMutability": "MUTABLE",
        "imageScanningConfiguration": {
            "scanOnPush": false
        }
    }
}
The push refers to repository [143656149352.dkr.ecr.us-east-1.amazonaws.com/sagemaker-spark-example]

58b9ddd1: Preparing
fc41467c: Preparing
4d63ac4a: Preparing
68bcdd7f: Preparing
c80ef96c: Preparing
47d3ef6d: Preparing
49045054: Preparing
13c580d4: Preparing
8743132f: Preparing
a2e331ca: Preparing
9afd07bf: Preparing
b2dba006: Preparing
c372b2bb: Preparing

latest: digest: sha256:864adfb2cbe673b94296c746cb86eea8b454eecbffddfb07cf139b329dca4c40 size: 3261


### Run the preprocessing job using Amazon SageMaker Processing

Next, we use the Amazon SageMaker Python SDK to submit a processing job. The job runs the Spark container we just built, with a SparkML script that performs the preprocessing.

The cell below writes the SparkML preprocessing script.

In [5]:
%%writefile preprocess.py
from __future__ import print_function
from __future__ import unicode_literals

import os
import sys

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.types import StructField, StructType, StringType, DoubleType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler


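def csv_line(data):
    # data is a (label, features) tuple. Write the label first, followed by the
    # feature values: the headerless CSV layout that SageMaker XGBoost expects
    r = ','.join(str(d) for d in data[1])
    return str(data[0]) + "," + r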


def main():
    spark = SparkSession.builder.appName("PySparkAbalone").getOrCreate()
    
    # Convert command line args into a map of args
    args_iter = iter(sys.argv[1:])
    args = dict(zip(args_iter, args_iter))
    
    # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
    spark.sparkContext._jsc.hadoopConfiguration().set("mapred.output.committer.class",
                                                      "org.apache.hadoop.mapred.FileOutputCommitter")
    
    # Defining the schema corresponding to the input data. The input data does not contain the headers
    schema = StructType([StructField("sex", StringType(), True), 
                         StructField("length", DoubleType(), True),
                         StructField("diameter", DoubleType(), True),
                         StructField("height", DoubleType(), True),
                         StructField("whole_weight", DoubleType(), True),
                         StructField("shucked_weight", DoubleType(), True),
                         StructField("viscera_weight", DoubleType(), True), 
                         StructField("shell_weight", DoubleType(), True), 
                         StructField("rings", DoubleType(), True)])

    # Downloading the data from S3 into a Dataframe
    total_df = spark.read.csv(('s3a://' + os.path.join(args['s3_input_bucket'], args['s3_input_key_prefix'],
                                                   'abalone.csv')), header=False, schema=schema)

    # StringIndexer on the sex column, which has categorical values
    sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex")
    
    # One-hot encoding is performed on the string-indexed sex column (indexed_sex)
    sex_encoder = OneHotEncoder(inputCol="indexed_sex", outputCol="sex_vec")

    # VectorAssembler combines all the features into a single 1D vector so they can be saved easily in CSV format
    assembler = VectorAssembler(inputCols=["sex_vec", 
                                           "length", 
                                           "diameter", 
                                           "height", 
                                           "whole_weight", 
                                           "shucked_weight", 
                                           "viscera_weight", 
                                           "shell_weight"], 
                                outputCol="features")
    
    # The pipeline consists of the steps defined above
    pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])
    
    # This step trains the feature transformers
    model = pipeline.fit(total_df)
    
    # This step transforms the dataset with information obtained from the previous fit
    transformed_total_df = model.transform(total_df)
    
    # Split the overall dataset into 80-20 training and validation
    (train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2])
    
    # Convert the train dataframe to RDD to save in CSV format and upload to S3
    train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))
    train_lines = train_rdd.map(csv_line)
    train_lines.saveAsTextFile('s3a://' + os.path.join(args['s3_output_bucket'], args['s3_output_key_prefix'], 'train'))
    
    # Convert the validation dataframe to RDD to save in CSV format and upload to S3
    validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))
    validation_lines = validation_rdd.map(csv_line)
    validation_lines.saveAsTextFile('s3a://' + os.path.join(args['s3_output_bucket'], args['s3_output_key_prefix'], 'validation'))


if __name__ == "__main__":
    main()

Writing preprocess.py
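
Note that `randomSplit([0.8, 0.2])` in `preprocess.py` does not produce an exact 80/20 split, and without a seed it produces a different split on every run. PySpark's `DataFrame.randomSplit` accepts an optional `seed` argument (for example `randomSplit([0.8, 0.2], seed=42)`) if reproducibility matters. The plain-Python sketch below only illustrates the idea of per-row random assignment by normalized weight; `random_split` is a hypothetical helper, not Spark's actual sampling implementation.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to a split with probability proportional to its weight,
    so split sizes are only approximately proportional to the weights."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    bounds[-1] = 1.0  # guard against floating-point round-off in the last boundary
    rng = random.Random(seed)  # fixed seed -> same split every run
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()  # uniform in [0, 1)
        for i, bound in enumerate(bounds):
            if x < bound:
                splits[i].append(row)
                break
    return splits


train, validation = random_split(range(1000), [0.8, 0.2], seed=42)
print(len(train), len(validation))
```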


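Run the processing job using the Docker image and preprocessing script you just created. When calling `spark_processor.run()`, pass the Amazon S3 input and output locations as arguments; the preprocessing script needs them to know where in Amazon S3 to read its input and write its output. The `ScriptProcessor` constructor also specifies the number of instances and the instance type used for the distributed Spark job.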
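
The `arguments` list is a flat sequence of alternating keys and values. `preprocess.py` turns it into a dictionary with `dict(zip(args_iter, args_iter))`: because both arguments to `zip` are the same iterator, each pair consumes two consecutive items. The sketch below isolates that idiom; `parse_pair_args` and the bucket and prefix values are hypothetical.

```python
def parse_pair_args(argv):
    """Same idiom as preprocess.py: pair up consecutive items as key/value."""
    args_iter = iter(argv)
    return dict(zip(args_iter, args_iter))


# Placeholder values standing in for the real bucket and prefixes
arguments = ["s3_input_bucket", "my-bucket",
             "s3_input_key_prefix", "sagemaker/demo/input/raw/abalone",
             "s3_output_bucket", "my-bucket",
             "s3_output_key_prefix", "sagemaker/demo/input/preprocessed/abalone"]
args = parse_pair_args(arguments)
print(args["s3_output_key_prefix"])  # sagemaker/demo/input/preprocessed/abalone
```

With an odd number of items, the final key is silently dropped, so a missing value surfaces later as a `KeyError` rather than as an error at parse time.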

In [6]:
from sagemaker.processing import ScriptProcessor, ProcessingInput
spark_processor = ScriptProcessor(base_job_name='spark-preprocessor',
                                  image_uri=spark_repository_uri,
                                  command=['/opt/program/submit'],
                                  role=role,
                                  instance_count=2,
                                  instance_type='ml.r5.xlarge',
                                  max_runtime_in_seconds=1200,
                                  env={'mode': 'python'})

spark_processor.run(code='preprocess.py',
                    arguments=['s3_input_bucket', bucket,
                               's3_input_key_prefix', input_prefix,
                               's3_output_bucket', bucket,
                               's3_output_key_prefix', input_preprocessed_prefix],
                    logs=False)


Job Name:  spark-preprocessor-2019-12-13-08-17-02-174
Inputs:  [{'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-143656149352/spark-preprocessor-2019-12-13-08-17-02-174/input/code/preprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  []
.....................................................................!

#### Inspect the preprocessed dataset
To verify that preprocessing succeeded, inspect the first few rows of the transformed dataset.

In [7]:
print('Top 5 rows from s3://{}/{}/train/'.format(bucket, input_preprocessed_prefix))
!aws s3 cp --quiet s3://$bucket/$input_preprocessed_prefix/train/part-00000 - | head -n5

Top 5 rows from s3://sagemaker-us-east-1-143656149352/sagemaker/spark-preprocess-demo/2019-12-13-08-13-41/input/preprocessed/abalone/train/
5.0,0.0,0.0,0.275,0.195,0.07,0.08,0.031,0.0215,0.025
7.0,0.0,0.0,0.305,0.225,0.07,0.1485,0.0585,0.0335,0.045
7.0,0.0,0.0,0.305,0.23,0.08,0.156,0.0675,0.0345,0.048
7.0,0.0,0.0,0.325,0.26,0.09,0.1915,0.085,0.036,0.062
9.0,0.0,0.0,0.33,0.26,0.08,0.2,0.0625,0.05,0.07
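
Each row holds 10 values: the `rings` label first (written by `csv_line`), then two columns for `sex`, then the seven numeric measurements. There are only two `sex` columns for three categories because PySpark's `OneHotEncoder` drops the last category by default (`dropLast=True`), which is then encoded as all zeros; `StringIndexer` by default assigns index 0 to the most frequent value. The plain-Python sketch below imitates those steps on a few hypothetical rows, shortened to two measurements. It is an illustration rather than Spark's implementation, and the alphabetical tie-breaking is a choice made here.

```python
from collections import Counter


def fit_string_indexer(labels):
    """Index 0 goes to the most frequent label (like StringIndexer's default 'frequencyDesc'
    order); ties are broken alphabetically in this sketch."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def one_hot_drop_last(index, num_categories):
    """One-hot encode with the last category dropped, so that category becomes all zeros."""
    vec = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vec[index] = 1.0
    return vec


def csv_line(data):
    """Same formatting as preprocess.py: label first, then the feature values."""
    return str(data[0]) + "," + ",".join(str(d) for d in data[1])


# Hypothetical rows: (sex, [length, diameter], rings)
rows = [("M", [0.455, 0.365], 15.0),
        ("M", [0.35, 0.265], 7.0),
        ("I", [0.33, 0.255], 7.0),
        ("F", [0.53, 0.42], 9.0)]

index = fit_string_indexer([sex for sex, _, _ in rows])
lines = [csv_line((rings, one_hot_drop_last(index[sex], len(index)) + feats))
         for sex, feats, rings in rows]
for line in lines:
    print(line)
```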


## Train a regression model using the SageMaker XGBoost algorithm

We train on this dataset with the Amazon SageMaker XGBoost algorithm. We already know the Amazon S3 path to which the preprocessed training data was uploaded as part of the processing job output.

### Retrieve the XGBoost algorithm image

Retrieve the XGBoost built-in algorithm image so it can be used by the training job.

In [8]:
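# get_image_uri is a SageMaker Python SDK v1 API; SDK v2 replaces it with
# sagemaker.image_uris.retrieve("xgboost", region, "0.90-1")
from sagemaker.amazon.amazon_estimator import get_image_uri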

training_image = get_image_uri(sagemaker_session.boto_region_name, 'xgboost', repo_version="0.90-1")
print(training_image)

683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:0.90-1-cpu-py3


### Set XGBoost model parameters and dataset details

Next, set up an Estimator for the XGBoost algorithm and the input datasets. The notebook is parameterized so that the same data locations used by the SparkML script can be passed to the XGBoost Estimator.
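
The training and validation channels use `s3_data_type='S3Prefix'` with keys ending in `train/part` and `validation/part`, so every object whose key starts with that string becomes input. Spark's `saveAsTextFile` writes one `part-NNNNN` file per partition and, with Hadoop's `FileOutputCommitter`, typically an empty `_SUCCESS` marker too; the `part` suffix presumably keeps that marker out of the training data. The sketch below mimics prefix selection on a hypothetical list of keys; `select_s3_prefix` is not a SageMaker API.

```python
def select_s3_prefix(keys, prefix):
    """Mimic S3Prefix selection: keep every object key that starts with the prefix."""
    return [k for k in keys if k.startswith(prefix)]


# Hypothetical object keys as Spark might leave them
keys = ["preprocessed/abalone/train/_SUCCESS",
        "preprocessed/abalone/train/part-00000",
        "preprocessed/abalone/train/part-00001",
        "preprocessed/abalone/validation/part-00000"]
selected = select_s3_prefix(keys, "preprocessed/abalone/train/part")
print(selected)  # ['preprocessed/abalone/train/part-00000', 'preprocessed/abalone/train/part-00001']
```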

In [9]:
s3_train_data = 's3://{}/{}/{}'.format(bucket, input_preprocessed_prefix, 'train/part')
s3_validation_data = 's3://{}/{}/{}'.format(bucket, input_preprocessed_prefix, 'validation/part')
s3_output_location = 's3://{}/{}/{}'.format(bucket, prefix, 'xgboost_model')

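# The Estimator arguments below use SageMaker Python SDK v1 names; SDK v2 renames them
# (instance_count, instance_type, volume_size, max_run) and replaces s3_input with sagemaker.inputs.TrainingInput
xgb_model = sagemaker.estimator.Estimator(training_image,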
                                          role, 
                                          train_instance_count=1, 
                                          train_instance_type='ml.m4.xlarge',
                                          train_volume_size = 20,
                                          train_max_run = 3600,
                                          input_mode= 'File',
                                          output_path=s3_output_location,
                                          sagemaker_session=sagemaker_session)

xgb_model.set_hyperparameters(objective = "reg:linear",
                              eta = .2,
                              gamma = 4,
                              max_depth = 5,
                              num_round = 10,
                              subsample = 0.7,
                              silent = 0,
                              min_child_weight = 6)

train_data = sagemaker.session.s3_input(s3_train_data, distribution='FullyReplicated', 
                        content_type='text/csv', s3_data_type='S3Prefix')
validation_data = sagemaker.session.s3_input(s3_validation_data, distribution='FullyReplicated', 
                             content_type='text/csv', s3_data_type='S3Prefix')

data_channels = {'train': train_data, 'validation': validation_data}
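
The `reg:linear` objective fits a squared-error regression (XGBoost 0.90 and later name it `reg:squarederror` and treat `reg:linear` as a deprecated alias), and root-mean-squared error (RMSE) is XGBoost's default evaluation metric for it. Below is a minimal sketch of RMSE on hypothetical `rings` values. Because age is `rings` + 1.5, shifting targets and predictions by the same constant leaves RMSE unchanged, so an error measured in rings is also an error in years.

```python
import math


def rmse(y_true, y_pred):
    """Root-mean-squared error between targets and predictions."""
    if len(y_true) != len(y_pred) or not y_true:
        raise ValueError("inputs must be non-empty and have the same length")
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))


# Hypothetical targets and predictions, in rings
rings_true = [5.0, 7.0, 9.0]
rings_pred = [6.0, 7.0, 8.0]
print(rmse(rings_true, rings_pred))

# The same values converted to age in years (rings + 1.5) give the same RMSE
age_true = [r + 1.5 for r in rings_true]
age_pred = [r + 1.5 for r in rings_pred]
print(rmse(age_true, age_pred))
```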

### Train the XGBoost model

In [10]:
xgb_model.fit(inputs=data_channels, logs=True)

2019-12-13 08:23:05 Starting - Starting the training job...
2019-12-13 08:23:06 Starting - Launching requested ML instances......
2019-12-13 08:24:11 Starting - Preparing the instances for training......
2019-12-13 08:25:35 Downloading - Downloading input data
2019-12-13 08:25:35 Training - Downloading the training image...
2019-12-13 08:26:04 Uploading - Uploading generated training model.
INFO:sagemaker-containers:Imported framework sagemaker_xgboost_container.training
INFO:sagemaker-containers:Failed to parse hyperparameter objective value reg:linear to Json.
Returning the value itself
INFO:sagemaker-containers:No GPUs detected (normal if no gpus installed)
INFO:sagemaker_xgboost_container.training:Running XGBoost Sagemaker in algorithm mode
INFO:root:Determined delimiter of CSV input is ','
INFO:root:Determined delimiter of CSV input is ','
INFO:root:Determined delimiter of CSV input is ','
[08:26:01] 3298x

### Summary

Voilà! You have completed the first part of a machine learning pipeline, using Amazon SageMaker Processing for feature transformation and Amazon SageMaker XGBoost for training a regression model.