# Feature Transformation with Amazon a SageMaker Processing Job and Scikit-Learn

Typically a machine learning (ML) process consists of few steps. First, gathering data with various ETL jobs, then pre-processing the data, featurizing the dataset by incorporating standard techniques or prior knowledge, and finally training an ML model using an algorithm.

Often, distributed data processing frameworks such as Scikit-Learn are used to pre-process data sets in order to prepare them for training. In this notebook we'll use Amazon SageMaker Processing, and leverage the power of Scikit-Learn in a managed SageMaker environment to run our processing workload.

![](img/prepare_dataset_bert.png)

![](img/processing.jpg)


## Contents

1. Setup Environment
1. Setup Input Data
1. Setup Output Data
1. Build a Spark container for running the processing job
1. Run the Processing Job using Amazon SageMaker
1. Inspect the Processed Output Data

# Setup Environment

Let's start by specifying:
* The S3 bucket and prefixes that you use for training and model data. Use the default bucket specified by the Amazon SageMaker session.
* The IAM role ARN used to give processing and training access to the dataset.

In [1]:
import sagemaker
from time import gmtime, strftime
import boto3

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()
region = boto3.Session().region_name

# Setup Input Data

In [2]:
# Inputs
s3_input_data = 's3://{}/amazon-reviews-pds/tsv/'.format(bucket)
print(s3_input_data)

s3://sagemaker-us-west-2-086401037028/amazon-reviews-pds/tsv/


In [3]:
!aws s3 ls $s3_input_data

2020-04-25 16:31:58   18997559 amazon_reviews_us_Digital_Software_v1_00.tsv.gz
2020-04-25 16:32:07   27442648 amazon_reviews_us_Digital_Video_Games_v1_00.tsv.gz


# Run the Processing Job using Amazon SageMaker

Next, use the Amazon SageMaker Python SDK to submit a processing job. Use the Spark container that was just built, and a SparkML script for processing in the job configuration.

Review the Spark processing script.

In [4]:
cat preprocess-scikit-text-to-bert.py

from sklearn.model_selection import train_test_split
from sklearn.utils import resample
import functools
import multiprocessing

import pandas as pd
from datetime import datetime
import subprocess
import sys
subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'tensorflow==2.1.0'])
import tensorflow as tf
print(tf.__version__)
subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'transformers==2.7.0'])
from transformers import DistilBertTokenizer
from tensorflow import keras
import os
import re
import collections
import argparse
import json
import os
import pandas as pd
import csv
import glob
from pathlib import Path

class InputFeatures(object):
  """A single set of features of data."""

  def __init__(self,
               input_ids,
               input_mask,
               segment_ids,
               label_id):
    self.input_ids = input_ids
    self.input_mask = input_mask
    self.segment_ids = segment_ids
    self.label_id 

Run this script as a processing job.  You also need to specify one `ProcessingInput` with the `source` argument of the Amazon S3 bucket and `destination` is where the script reads this data from `/opt/ml/processing/input` (inside the Docker container.)  All local paths inside the processing container must begin with `/opt/ml/processing/`.

Also give the `run()` method a `ProcessingOutput`, where the `source` is the path the script writes output data to.  For outputs, the `destination` defaults to an S3 bucket that the Amazon SageMaker Python SDK creates for you, following the format `s3://sagemaker-<region>-<account_id>/<processing_job_name>/output/<output_name>/`.  You also give the `ProcessingOutput` value for `output_name`, to make it easier to retrieve these output artifacts after the job is run.

The arguments parameter in the `run()` method are command-line arguments in our `preprocess-scikit-text-to-bert.py` script.

Note that we sharding the data using `ShardedS3Key` to spread the transformations across all worker nodes in the cluster.

In [5]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

processor = SKLearnProcessor(framework_version='0.20.0',
                             role=role,
                             instance_type='ml.c5.4xlarge',
                             instance_count=2)

In [6]:
processor.run(code='preprocess-scikit-text-to-bert.py',
                      inputs=[ProcessingInput(source=s3_input_data,
                                              destination='/opt/ml/processing/input/data/',
                                              s3_data_distribution_type='ShardedByS3Key')],
                      outputs=[
                               ProcessingOutput(s3_upload_mode='EndOfJob',
                                                output_name='bert-train',
                                                source='/opt/ml/processing/output/bert/train'),
                               ProcessingOutput(s3_upload_mode='EndOfJob',
                                                output_name='bert-validation',
                                                source='/opt/ml/processing/output/bert/validation'),
                               ProcessingOutput(s3_upload_mode='EndOfJob',
                                                output_name='bert-test',
                                                source='/opt/ml/processing/output/bert/test'),
                      ],
                      logs=True,
                      wait=False)



Job Name:  sagemaker-scikit-learn-2020-04-25-18-17-26-390
Inputs:  [{'InputName': 'input-1', 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-086401037028/amazon-reviews-pds/tsv/', 'LocalPath': '/opt/ml/processing/input/data/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'ShardedByS3Key', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-086401037028/sagemaker-scikit-learn-2020-04-25-18-17-26-390/input/code/preprocess-scikit-text-to-bert.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'bert-train', 'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-086401037028/sagemaker-scikit-learn-2020-04-25-18-17-26-390/output/bert-train', 'LocalPath': '/opt/ml/processing/output/bert/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'bert-validation', 'S3Output': {'S3U

In [7]:
scikit_processing_job_name = processor.jobs[-1].describe()['ProcessingJobName']
print(scikit_processing_job_name)

sagemaker-scikit-learn-2020-04-25-18-17-26-390


In [8]:
from IPython.core.display import display, HTML

display(HTML('<b>Review <a href="https://console.aws.amazon.com/cloudwatch/home?region={}#logStream:group=/aws/sagemaker/ProcessingJobs;prefix={};streamFilter=typeLogStreamPrefix">CloudWatch Logs</a> After About 5 Minutes</b>'.format(region, scikit_processing_job_name)))


In [9]:
from IPython.core.display import display, HTML

display(HTML('<b>Review <a href="https://s3.console.aws.amazon.com/s3/buckets/{}/{}/?region={}&tab=overview">S3 Output Data</a> After The Spark Job Has Completed</b>'.format(bucket, scikit_processing_job_name, region)))


# Please Wait Until the Processing Job Completes
Re-run this next cell until the job status shows `Completed`.

In [10]:
running_processor = sagemaker.processing.ProcessingJob.from_processing_name(processing_job_name=scikit_processing_job_name,
                                                                            sagemaker_session=sagemaker_session)

processing_job_description = running_processor.describe()

processing_job_status = processing_job_description['ProcessingJobStatus']
print('\n')
print(processing_job_status)
print('\n')

print(processing_job_description)



InProgress


{'ProcessingInputs': [{'InputName': 'input-1', 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-086401037028/amazon-reviews-pds/tsv/', 'LocalPath': '/opt/ml/processing/input/data/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'ShardedByS3Key', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-086401037028/sagemaker-scikit-learn-2020-04-25-18-17-26-390/input/code/preprocess-scikit-text-to-bert.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}], 'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'bert-train', 'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-086401037028/sagemaker-scikit-learn-2020-04-25-18-17-26-390/output/bert-train', 'LocalPath': '/opt/ml/processing/output/bert/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'bert-validation', 'S3Output': {'S3Uri'

# _Please Wait Until the Job Completes with Job Status ^^ `Completed` ^^._

# Inspect the Processed Output Data

Take a look at a few rows of the transformed dataset to make sure the processing was successful.

In [11]:
output_config = processing_job_description['ProcessingOutputConfig']
for output in output_config['Outputs']:
    if output['OutputName'] == 'bert-train':
        processed_train_data = output['S3Output']['S3Uri']
    if output['OutputName'] == 'bert-validation':
        processed_validation_data = output['S3Output']['S3Uri']        
    if output['OutputName'] == 'bert-test':
        processed_test_data = output['S3Output']['S3Uri']
        
print(processed_train_data)
print(processed_validation_data)
print(processed_test_data)

s3://sagemaker-us-west-2-086401037028/sagemaker-scikit-learn-2020-04-25-18-17-26-390/output/bert-train
s3://sagemaker-us-west-2-086401037028/sagemaker-scikit-learn-2020-04-25-18-17-26-390/output/bert-validation
s3://sagemaker-us-west-2-086401037028/sagemaker-scikit-learn-2020-04-25-18-17-26-390/output/bert-test


In [12]:
!aws s3 ls $processed_train_data/

In [13]:
!aws s3 ls $processed_validation_data/

In [14]:
!aws s3 ls $processed_test_data/

# Pass `scikit_processing_job_name` to the next notebook

In [15]:
print(scikit_processing_job_name)

sagemaker-scikit-learn-2020-04-25-18-17-26-390


In [16]:
%store scikit_processing_job_name

Stored 'scikit_processing_job_name' (str)
