# Amazon SageMaker Processing Job 


기계 학습 (ML) 프로세스는 몇 단계로 구성됩니다. 먼저, 다양한 ETL 작업으로 데이터를 수집 한 다음 data의 pre-processing, 전통적인 기법 또는 사전 knowledge를 이용하여 데이터의 feature화, 마지막으로 알고리즘을 이용한 ML 모델을 학습합니다.

Apache Spark와 같은 분산 데이터 처리 프레임 워크는 학습을 위해 dataset의 pre-processing하는데 사용합니다. 이 노트북에서는 Amazon SageMaker Processing에서 기본 설치된 Apache Spark의 기능을 활용하여 처리 워크로드를 실행합니다.

![](img/prepare_dataset_bert.png)

![](img/processing.jpg)


# Setup Environment


* 모델 학습에 사용되는 S3 bucket과 prefix 가 필요합니다.
* 학습과 processing을 위해 IAM role은 dataset에 액세스가 가능해야 합니다.

In [None]:
import sagemaker
from time import gmtime, strftime
import boto3

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()
region = boto3.Session().region_name

# Setup Input Data

In [None]:
# Inputs
s3_input_data = 's3://{}/amazon-reviews-pds/tsv/'.format(bucket)
print(s3_input_data)

In [None]:
!aws s3 ls $s3_input_data

# Processing Job을 수행할 Spark Docker Image

이 HOL에서는 `./container` 폴더 내에 Spark container 이미지를 포함합니다. container는 모든 Spark 구성의 부트스트랩을 처리하고 `spark-submit` CLI를 wrapper해서 제공합니다. 상위 레벨에서는,

* A set of default Spark/YARN/Hadoop configurations
* A bootstrapping script for configuring and starting up Spark master/worker nodes
* A wrapper around the `spark-submit` CLI to submit a Spark application

container 빌드와 push 절차가 완료된 후 dataset의 처리를 수행하는 관리형 분산 Spark 어플리케이션을 수행사는 것은 Amazon SageMaker Python SDK 사용합니다.

In [None]:
docker_repo = 'amazon-reviews-spark-processor'
docker_tag = 'latest'

In [None]:
!docker build -t $docker_repo:$docker_tag -f container/Dockerfile ./container

Spark container의 Amazon Elastic Container Registry(Amazon ECR) 리포지토리를 생성하고 image를 push합니다.

In [None]:
import boto3
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.session.Session().region_name

image_uri = '{}.dkr.ecr.{}.amazonaws.com/{}:{}'.format(account_id, region, docker_repo, docker_tag)
print(image_uri)

### ECR repository 생성과 docker image를 push하기

In [None]:
!$(aws ecr get-login --region $region --registry-ids $account_id --no-include-email)

### `RepositoryNotFoundException` 오류는 무시하셔도 됩니다. 즉시 repository를 생성합니다.

In [None]:
!aws ecr describe-repositories --repository-names $docker_repo || aws ecr create-repository --repository-name $docker_repo

In [None]:
!docker tag $docker_repo:$docker_tag $image_uri

In [None]:
!docker push $image_uri

# Amazon SageMaker Processing Jobs 으로 Job 수행

Amazon SageMaker Python SDK를 사용하여 Processing job을 실행합니다. Spark container와 job configuration에서 processing에 대한 Spark ML script를 사용합니다.

In [None]:
!pygmentize src_dir/preprocess-spark-text-to-bert.py

In [None]:
from sagemaker.processing import ScriptProcessor

processor = ScriptProcessor(base_job_name='spark-amazon-reviews-processor',
                            image_uri=image_uri,
                            command=['/opt/program/submit'],
                            role=role,
                            instance_count=2, # instance_count needs to be > 1 or you will see the following error:  "INFO yarn.Client: Application report for application_ (state: ACCEPTED)"
                            instance_type='ml.r5.xlarge',
                            env={'mode': 'python'})

# Setup Output Data

In [None]:
from time import gmtime, strftime
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

output_prefix = 'amazon-reviews-spark-processor-{}'.format(timestamp_prefix)

In [None]:
train_data_bert_output = 's3://{}/{}/output/bert-train'.format(bucket, output_prefix)
validation_data_bert_output = 's3://{}/{}/output/bert-validation'.format(bucket, output_prefix)
test_data_bert_output = 's3://{}/{}/output/bert-test'.format(bucket, output_prefix)

print(train_data_bert_output)
print(validation_data_bert_output)
print(test_data_bert_output)

In [None]:
from sagemaker.processing import ProcessingOutput

processor.run(code='./src_dir/preprocess-spark-text-to-bert.py',
              arguments=['s3_input_data', s3_input_data,
                         's3_output_train_data', train_data_bert_output,
                         's3_output_validation_data', validation_data_bert_output,
                         's3_output_test_data', test_data_bert_output,                         
              ],
              # We need this dummy output to allow us to call 
              #    ProcessingJob.from_processing_name() later 
              #    to describe the job and poll for Completed status
              outputs=[
                       ProcessingOutput(s3_upload_mode='EndOfJob',
                                        output_name='dummy-output',
                                        source='/opt/ml/processing/output')
              ],          
              logs=True,
              wait=False
)

In [None]:
from IPython.core.display import display, HTML

spark_processing_job_name = processor.jobs[-1].describe()['ProcessingJobName']

display(HTML('<b>Review <a href="https://console.aws.amazon.com/cloudwatch/home?region={}#logStream:group=/aws/sagemaker/ProcessingJobs;prefix={};streamFilter=typeLogStreamPrefix">CloudWatch Logs</a> After About 5 Minutes</b>'.format(region, spark_processing_job_name)))


In [None]:
from IPython.core.display import display, HTML

# This is different than the job name because we are not using ProcessingOutput's in this Spark ML case.
spark_processing_job_s3_output_prefix = output_prefix

display(HTML('<b>Review <a href="https://s3.console.aws.amazon.com/s3/buckets/{}/{}/?region={}&tab=overview">S3 Output Data</a> After The Spark Job Has Completed</b>'.format(bucket, spark_processing_job_s3_output_prefix, region)))


# List Processing Jobs through boto3 Python SDK

In [None]:
import boto3

client = boto3.client('sagemaker')
client.list_processing_jobs()

# Please Wait Until the Processing Job Completes
Re-run this next cell until the job status shows `Completed`.

In [None]:
running_processor = sagemaker.processing.ProcessingJob.from_processing_name(processing_job_name=spark_processing_job_name,
                                                                            sagemaker_session=sagemaker_session)

processing_job_description = running_processor.describe()

processing_job_status = processing_job_description['ProcessingJobStatus']
print('\n')
print(processing_job_status)
print('\n')

print(processing_job_description)

In [None]:
running_processor.wait()

<h2><span style="color:red">위 Processing Job이 완료되기 전까지 기다려 주시기 바랍니다.</span></h2>


# the Processed Output Dataset 확인

In [None]:
!aws s3 ls --recursive $train_data_bert_output/

In [None]:
!aws s3 ls --recursive $validation_data_bert_output/

In [None]:
!aws s3 ls --recursive $test_data_bert_output/

In [None]:
train_data = './data-tfrecord/bert-train'
validation_data = './data-tfrecord/bert-validation'
test_data = './data-tfrecord/bert-test'

!aws s3 cp $train_data_bert_output $train_data --recursive
!aws s3 cp $validation_data_bert_output $validation_data --recursive
!aws s3 cp $test_data_bert_output $test_data --recursive

In [None]:
%store train_data_bert_output train_data

In [None]:
%store validation_data_bert_output validation_data

In [None]:
%store test_data_bert_output test_data