# Multiple Preprocessing through Two Instances on SageMaker
* __Techniques included:__
    * Test a preprocessing script on notebook
    * Prepreocess the preprocessing script on two instances


* Reference:
    - Code Reference: https://github.com/aws-samples/amazon-sagemaker-script-mode/tree/master/Mtf-2-workflow
    - Related Blog: https://aws.amazon.com/ko/blogs/machine-learning/creating-a-complete-tensorflow-2-workflow-in-amazon-sagemaker/

### (1) Processing numerical data on Boston Pricing Data

In [1]:
# Create folders 

import os
import sagemaker
# import tensorflow as tf

sess = sagemaker.Session()
bucket = sess.default_bucket() 

data_dir = os.path.join(os.getcwd(), 'data')
os.makedirs(data_dir, exist_ok=True)

train_dir = os.path.join(os.getcwd(), 'data/train')
os.makedirs(train_dir, exist_ok=True)

test_dir = os.path.join(os.getcwd(), 'data/test')
os.makedirs(test_dir, exist_ok=True)

raw_dir = os.path.join(os.getcwd(), 'data/raw')
os.makedirs(raw_dir, exist_ok=True)

In [2]:
import numpy as np
from tensorflow.python.keras.datasets import boston_housing
from sklearn.preprocessing import StandardScaler

(x_train, y_train), (x_test, y_test) = boston_housing.load_data()






In [3]:
x_train

array([[1.23247e+00, 0.00000e+00, 8.14000e+00, ..., 2.10000e+01,
        3.96900e+02, 1.87200e+01],
       [2.17700e-02, 8.25000e+01, 2.03000e+00, ..., 1.47000e+01,
        3.95380e+02, 3.11000e+00],
       [4.89822e+00, 0.00000e+00, 1.81000e+01, ..., 2.02000e+01,
        3.75520e+02, 3.26000e+00],
       ...,
       [3.46600e-02, 3.50000e+01, 6.06000e+00, ..., 1.69000e+01,
        3.62250e+02, 7.83000e+00],
       [2.14918e+00, 0.00000e+00, 1.95800e+01, ..., 1.47000e+01,
        2.61950e+02, 1.57900e+01],
       [1.43900e-02, 6.00000e+01, 2.93000e+00, ..., 1.56000e+01,
        3.76700e+02, 4.38000e+00]])

In [4]:
np.save(os.path.join(raw_dir, 'x_train.npy'), x_train)
np.save(os.path.join(raw_dir, 'x_test.npy'), x_test)
np.save(os.path.join(train_dir, 'y_train.npy'), y_train)
np.save(os.path.join(test_dir, 'y_test.npy'), y_test)
s3_prefix = 'preprocess-two-instances'
rawdata_s3_prefix = '{}/data/raw'.format(s3_prefix)
raw_s3 = sess.upload_data(path='./data/raw/', key_prefix=rawdata_s3_prefix)
print(raw_s3)

s3://sagemaker-us-east-1-057716757052/preprocess-two-instances/data/raw


In [5]:
np.load("data/raw/x_train.npy")

array([[1.23247e+00, 0.00000e+00, 8.14000e+00, ..., 2.10000e+01,
        3.96900e+02, 1.87200e+01],
       [2.17700e-02, 8.25000e+01, 2.03000e+00, ..., 1.47000e+01,
        3.95380e+02, 3.11000e+00],
       [4.89822e+00, 0.00000e+00, 1.81000e+01, ..., 2.02000e+01,
        3.75520e+02, 3.26000e+00],
       ...,
       [3.46600e-02, 3.50000e+01, 6.06000e+00, ..., 1.69000e+01,
        3.62250e+02, 7.83000e+00],
       [2.14918e+00, 0.00000e+00, 1.95800e+01, ..., 1.47000e+01,
        2.61950e+02, 1.57900e+01],
       [1.43900e-02, 6.00000e+01, 2.93000e+00, ..., 1.56000e+01,
        3.76700e+02, 4.38000e+00]])

In [6]:
output_train_data_path = "data/processing/train"
output_test_data_path = "data/processing/test"
! mkdir -p {output_train_data_path}
! mkdir -p {output_test_data_path}

### Create preprocessing script

In [7]:
%%writefile gs-preprocessing.py

import argparse
import glob
import numpy as np
import os
from sklearn.preprocessing import StandardScaler

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    
    # Set input_data_path to internal folder on docker
    input_data_path = '/opt/ml/processing/input'
    parser.add_argument('--input_data_path', type=str, default=input_data_path)
    
    # output_train_data_path
    output_train_data_path = '/opt/ml/processing/train'
    parser.add_argument('--output_train_data_path', type=str, default=output_train_data_path)    
    # output_test_data_path
    output_test_data_path = '/opt/ml/processing/test'
    parser.add_argument('--output_test_data_path', type=str, default=output_test_data_path)    

    
    args, _ = parser.parse_known_args()
    
    print("Received arguments {}".format(args))
    
    
    input_path = '{}/*.npy'.format(args.input_data_path)
    print("input_path:\n ", input_path)
    input_files = glob.glob(input_path)    
    print('\nINPUT FILE LIST: \n{}\n'.format(input_files))
    
    scaler = StandardScaler()
    for file in input_files:
        raw = np.load(file)
        transformed = scaler.fit_transform(raw)
        print("transformed: \n", transformed[0])
        if 'train' in file:
#             output_path = os.path.join('/opt/ml/processing/train', 'x_train.npy')
            output_path = os.path.join(args.output_train_data_path, 'x_train.npy')
            np.save(output_path, transformed)
            print('SAVED TRANSFORMED TRAINING DATA FILE\n')
        else:
#             output_path = os.path.join('/opt/ml/processing/test', 'x_test.npy')
            output_path = os.path.join(args.output_test_data_path, 'x_test.npy')
            np.save(output_path, transformed)
            print('SAVED TRANSFORMED TEST DATA FILE\n')            
            

Overwriting gs-preprocessing.py


In [8]:
raw_dir

'/home/ec2-user/SageMaker/sagemaker_processing/scikit_learn_data_processing_and_model_evaluation/data/raw'

### Local Script Test

In [9]:
! python gs-preprocessing.py --input_data_path {raw_dir} --output_train_data_path {output_train_data_path} --output_test_data_path {output_test_data_path}

Received arguments Namespace(input_data_path='/home/ec2-user/SageMaker/sagemaker_processing/scikit_learn_data_processing_and_model_evaluation/data/raw', output_test_data_path='data/processing/test', output_train_data_path='data/processing/train')
input_path:
  /home/ec2-user/SageMaker/sagemaker_processing/scikit_learn_data_processing_and_model_evaluation/data/raw/*.npy

INPUT FILE LIST: 
['/home/ec2-user/SageMaker/sagemaker_processing/scikit_learn_data_processing_and_model_evaluation/data/raw/x_train.npy', '/home/ec2-user/SageMaker/sagemaker_processing/scikit_learn_data_processing_and_model_evaluation/data/raw/x_test.npy']

transformed: 
 [-0.27224633 -0.48361547 -0.43576161 -0.25683275 -0.1652266  -0.1764426
  0.81306188  0.1166983  -0.62624905 -0.59517003  1.14850044  0.44807713
  0.8252202 ]
SAVED TRANSFORMED TRAINING DATA FILE

transformed: 
 [ 2.8040301  -0.50784934  0.96960877 -0.32969024  1.23174581  0.11934137
  1.14739788 -0.91935276  1.60609286  1.40778227  0.90513041 -4.2782

### Launch Two Instances to transform data

In [10]:
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor

sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
                                     role=get_execution_role(),
                                     instance_type='ml.m5.xlarge',
                                     instance_count=2)

In [11]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from time import gmtime, strftime

processing_job_name = "Preprocess-two-instances-{}".format(strftime("%d-%H-%M-%S", gmtime()))
output_destination = 's3://{}/{}/data'.format(bucket, s3_prefix)

sklearn_processor.run(code = 'gs-preprocessing.py',
                      job_name = processing_job_name,
                      inputs = [ProcessingInput(
                          source = raw_s3,
                          destination = '/opt/ml/processing/input',
                          s3_data_distribution_type = 'ShardedByS3Key')],
                      outputs = [ProcessingOutput(output_name = 'train',
                                                 destination = '{}/train'.format(output_destination),
                                                 source = '/opt/ml/processing/train'),
                               ProcessingOutput(output_name='test',
                                                destination='{}/test'.format(output_destination),
                                                source='/opt/ml/processing/test')])


preprocessing_job_description = sklearn_processor.jobs[-1].describe()


Job Name:  Preprocess-two-instances-02-14-24-26
Inputs:  [{'InputName': 'input-1', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-057716757052/preprocess-two-instances/data/raw', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'ShardedByS3Key', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-057716757052/Preprocess-two-instances-02-14-24-26/input/code/gs-preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train', 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-057716757052/preprocess-two-instances/data/train', 'LocalPath': '/opt/ml/processing/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'test', 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-057716757052/preprocess-two-instances/data/test', 'Local

### (2) Example to Read two csv files on each instance

In [12]:
process_train_folder = "processing/train"
process_test_folder = "processing/test"
! mkdir -p {process_train_folder}
! mkdir -p {process_test_folder}

In [13]:
import pandas as pd

region = 'us-east-1'
input_data = 's3://sagemaker-sample-data-{}/processing/census/census-income.csv'.format(region)

s3_input_data = 's3://sagemaker-sample-data-{}/processing/census'.format(region)
# df = pd.read_csv(input_data, nrows=10)
df = pd.read_csv(input_data)
print(df.shape)
df.head(n=3)

(199523, 42)


Unnamed: 0,age,class of worker,detailed industry recode,detailed occupation recode,education,wage per hour,enroll in edu inst last wk,marital stat,major industry code,major occupation code,...,country of birth father,country of birth mother,country of birth self,citizenship,own business or self employed,fill inc questionnaire for veteran's admin,veterans benefits,weeks worked in year,year,income
0,73,Not in universe,0,0,High school graduate,0,Not in universe,Widowed,Not in universe or children,Not in universe,...,United-States,United-States,United-States,Native- Born in the United States,0,Not in universe,2,0,95,- 50000.
1,58,Self-employed-not incorporated,4,34,Some college but no degree,0,Not in universe,Divorced,Construction,Precision production craft & repair,...,United-States,United-States,United-States,Native- Born in the United States,0,Not in universe,2,52,94,- 50000.
2,18,Not in universe,0,0,10th grade,0,High school,Never married,Not in universe or children,Not in universe,...,Vietnam,Vietnam,Vietnam,Foreign born- Not a citizen of U S,0,Not in universe,2,0,95,- 50000.


### Write script to Read each csv file on each instance

In [14]:
%%writefile gs-csv-preprocessing.py

import argparse
import glob
import numpy as np
import os
from sklearn.preprocessing import StandardScaler
import pandas as pd

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    
    # Set input_data_path to internal folder on docker
    input_data_path = '/opt/ml/processing/input'
    parser.add_argument('--input_data_path', type=str, default=input_data_path)
    
    # output_train_data_path
    output_train_data_path = '/opt/ml/processing/train'
    parser.add_argument('--output_train_data_path', type=str, default=output_train_data_path)    
    # output_test_data_path
    output_test_data_path = '/opt/ml/processing/test'
    parser.add_argument('--output_test_data_path', type=str, default=output_test_data_path)    

    
    args, _ = parser.parse_known_args()
    
    print("Received arguments {}".format(args))
    
    input_path = '{}/*.csv'.format(args.input_data_path)
    print("input_path:\n ", input_path)
    input_files = glob.glob(input_path)    
    print('\nINPUT FILE LIST: \n{}\n'.format(input_files))
    


Overwriting gs-csv-preprocessing.py


### Download two csv files from S3

In [15]:
csv_folder = 'data/csv'
! mkdir {csv_folder}

mkdir: cannot create directory ‘data/csv’: File exists


In [16]:
census_file_path = 's3://sagemaker-sample-data-us-east-1/processing/census/census-income.csv'
!aws s3 cp {census_file_path} data/csv/census-income-01.csv
! cp census-income-01.csv data/csv/census-income-02.csv

download: s3://sagemaker-sample-data-us-east-1/processing/census/census-income.csv to data/csv/census-income-01.csv


### Upload two csv files to S3

In [17]:
s3_prefix = 'processing/csv-example'
s3_input_path = 's3://{}/{}/data'.format(bucket, s3_prefix)
s3_output_path = 's3://{}/{}/data'.format(bucket, 'output')

In [18]:
! aws s3 cp data/csv/census-income-01.csv {s3_input_path}/
! aws s3 cp data/csv/census-income-02.csv {s3_input_path}/
! aws s3 ls {s3_input_path}/

upload: data/csv/census-income-01.csv to s3://sagemaker-us-east-1-057716757052/processing/csv-example/data/census-income-01.csv
upload: data/csv/census-income-02.csv to s3://sagemaker-us-east-1-057716757052/processing/csv-example/data/census-income-02.csv
2020-06-02 14:29:17  103875379 census-income-01.csv
2020-06-02 14:29:18  103875379 census-income-02.csv


In [19]:
! python gs-csv-preprocessing.py  --input_data_path {csv_folder} --process_train_folder {process_train_folder} --process_test_folder {process_train_folder}

Received arguments Namespace(input_data_path='data/csv', output_test_data_path='/opt/ml/processing/test', output_train_data_path='/opt/ml/processing/train')
input_path:
  data/csv/*.csv

INPUT FILE LIST: 
['data/csv/census-income-02.csv', 'data/csv/census-income-01.csv']



In [20]:
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor

sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
                                     role=get_execution_role(),
                                     instance_type='ml.m5.xlarge',
                                     instance_count=2)

In [21]:
%%time 

from sagemaker.processing import ProcessingInput, ProcessingOutput
from time import gmtime, strftime

processing_job_name = "processing-workflow-{}".format(strftime("%d-%H-%M-%S", gmtime()))
output_destination = 's3://{}/{}/data'.format(bucket, s3_prefix)

sklearn_processor.run(code = 'gs-csv-preprocessing.py',
                      job_name = processing_job_name,
                      inputs = [ProcessingInput(
                          source = s3_input_path,                          
                          destination = '/opt/ml/processing/input',
                          s3_data_distribution_type = 'ShardedByS3Key')],
                      outputs = [ProcessingOutput(output_name = 'train',
                                                 destination = '{}/train'.format(s3_output_path),
                                                 source = '/opt/ml/processing/train'),
                               ProcessingOutput(output_name='test',
                                                destination='{}/test'.format(s3_output_path),
                                                source='/opt/ml/processing/test')])


preprocessing_job_description = sklearn_processor.jobs[-1].describe()


Job Name:  processing-workflow-02-14-29-20
Inputs:  [{'InputName': 'input-1', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-057716757052/processing/csv-example/data', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'ShardedByS3Key', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-057716757052/processing-workflow-02-14-29-20/input/code/gs-csv-preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train', 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-057716757052/output/data/train', 'LocalPath': '/opt/ml/processing/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'test', 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-057716757052/output/data/test', 'LocalPath': '/opt/ml/processing/test', 'S3UploadMode'