# SageMaker Processing Job hands-on 🍛

Scope of this hands-on:

* How to prepare data for a SageMaker Processing Job
* How to run a SageMaker Processing Job
* How to create a Docker image for preprocessing

After studying SageMaker Processing Jobs, you will probably crave some extra spicy curry rice!!! 🌶️

# Overview of a SageMaker Processing Job
※ The number of 🌶️ indicates the difficulty level of each step!!

## Step 1: Prepare data in S3 🌶️
![overview-preparing](./images/overview-preparing.png)

***By user***
1. Download the data.
2. Compress the data into a Zip file.
3. Upload the Zip file to S3.

## Step 2: Run a SageMaker Processing Job 🌶️🌶️🌶️
![overview-processing](./images/overview-processing.png)

***By user***
1. Build an image for preprocessing.
2. Push the image to Amazon ECR.
3. Call the SageMaker API to start the processing job.

***By SageMaker***

4. Pull the image from ECR.
5. Download the Zip file from S3.
6. Launch a processing instance and run the preprocessing script.
7. Upload the preprocessed data to S3.
8. Terminate the processing instance.

# SageMaker Processing Job hands-on

### Objective
Run a SageMaker Processing job using the MNIST dataset.

## Step 1: Prepare data in S3 🌶️

In [None]:
import sagemaker
print(f'SageMaker Python SDK version: {sagemaker.__version__}')

### Configuration

In [None]:
name = 'haedu'

In [None]:
import urllib.request, gzip, numpy as np, sagemaker, datetime, yaml, os, shutil
from matplotlib import pyplot as plt
from PIL import Image
from tqdm import tqdm

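# The original site may refuse scripted downloads; if so, the mirror used by torchvision,
# 'https://ossci-datasets.s3.amazonaws.com/mnist/', hosts the same four files.
url_base = 'http://yann.lecun.com/exdb/mnist/'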
key_file = {
    'train_img':'train-images-idx3-ubyte.gz',
    'train_label':'train-labels-idx1-ubyte.gz',
    'test_img':'t10k-images-idx3-ubyte.gz',
    'test_label':'t10k-labels-idx1-ubyte.gz'
}
# A timestamp (JST, UTC+9) keeps resource names such as the S3 prefix and ECR repository unique.
timestamp = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9))).strftime('%Y%m%d%H%M%S')

print(f'timestamp: {timestamp}')

### Download the MNIST data from the internet to this notebook instance

In [None]:
%%time

# Directory for the downloaded MNIST files (images and labels)
dataset_dir = './mnist/'

os.makedirs(dataset_dir, exist_ok=True)

for v in key_file.values():
    # dataset_dir already ends with '/', so join the paths instead of adding another separator
    file_path = os.path.join(dataset_dir, v)
    # download data
    urllib.request.urlretrieve(url_base + v, file_path)

In [None]:
!ls  {dataset_dir}

### Convert MNIST binary data into NumPy arrays.
Encode the label data using one-hot encoding.

In [None]:
%%time

# Image files start with a 16-byte header (magic number, count, rows, cols);
# label files start with an 8-byte header (magic number, count).
file_path = dataset_dir + key_file['train_img']
with gzip.open(file_path, 'rb') as f:
    train_x = np.frombuffer(f.read(), np.uint8, offset=16).reshape(-1,28,28)
file_path = dataset_dir + key_file['train_label']
with gzip.open(file_path, 'rb') as f:
    train_y = np.frombuffer(f.read(), np.uint8, offset=8)
# One-hot encode: row k of the 10x10 identity matrix represents digit k
train_y = np.identity(10)[train_y]

file_path = dataset_dir + key_file['test_img']
with gzip.open(file_path, 'rb') as f:
    test_x = np.frombuffer(f.read(), np.uint8, offset=16).reshape(-1,28,28)
file_path = dataset_dir + key_file['test_label']
with gzip.open(file_path, 'rb') as f:
    test_y = np.frombuffer(f.read(), np.uint8, offset=8)
test_y = np.identity(10)[test_y]
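The offsets `16` and `8` in the cell above skip the IDX file headers. As a hedged, standard-library-only sketch (the helpers `parse_idx_images`, `parse_idx_labels`, and `one_hot` are hypothetical, not part of this notebook), the headers can be decoded with `struct` to check the magic numbers and counts before slicing. The magic numbers 2051 (images) and 2049 (labels) come from the IDX format description on the MNIST page.

```python
import struct

# Magic numbers from the IDX format: 0x00000803 (uint8, 3 dims) and 0x00000801 (uint8, 1 dim)
IMAGE_MAGIC = 2051
LABEL_MAGIC = 2049


def parse_idx_images(data: bytes):
    """Decode an uncompressed IDX3 image file into (count, rows, cols, pixel_bytes)."""
    # 16-byte header: four big-endian unsigned 32-bit integers
    magic, count, rows, cols = struct.unpack(">IIII", data[:16])
    if magic != IMAGE_MAGIC:
        raise ValueError(f"unexpected magic number {magic} for an image file")
    pixels = data[16:]
    if len(pixels) != count * rows * cols:
        raise ValueError("pixel payload size does not match the header")
    return count, rows, cols, pixels


def parse_idx_labels(data: bytes):
    """Decode an uncompressed IDX1 label file into a list of integer labels."""
    # 8-byte header: magic number and item count
    magic, count = struct.unpack(">II", data[:8])
    if magic != LABEL_MAGIC:
        raise ValueError(f"unexpected magic number {magic} for a label file")
    labels = list(data[8:])
    if len(labels) != count:
        raise ValueError("label payload size does not match the header")
    return labels


def one_hot(labels, num_classes=10):
    """Pure-Python equivalent of np.identity(num_classes)[labels]."""
    return [[1.0 if c == label else 0.0 for c in range(num_classes)] for label in labels]


# Tiny synthetic files: two 2x2 images and two labels
fake_images = struct.pack(">IIII", IMAGE_MAGIC, 2, 2, 2) + bytes(range(8))
fake_labels = struct.pack(">II", LABEL_MAGIC, 2) + bytes([3, 7])
print(parse_idx_images(fake_images)[:3])  # (2, 2, 2)
print(one_hot(parse_idx_labels(fake_labels))[0])
```

With the real files, the same helpers would take `gzip.open(file_path, 'rb').read()` as input; for the training images the header should report 60000 images of 28x28 pixels.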

### Save the NumPy arrays as PNG images

In [None]:
%%time

# Save images locally
base_dir = './dataset/'
train_x_dir = base_dir + 'train_x/'
test_x_dir = base_dir + 'test_x/'

os.makedirs(train_x_dir, exist_ok=True)
os.makedirs(test_x_dir, exist_ok=True)

for i in tqdm(range(train_x.shape[0])):
    Image.fromarray(train_x[i,:,:]).save(train_x_dir + str(i).zfill(5) + ".png")

for i in tqdm(range(test_x.shape[0])):
    Image.fromarray(test_x[i,:,:]).save(test_x_dir + str(i).zfill(5) + ".png")

np.save(base_dir + 'train_y.npy',train_y)
np.save(base_dir + 'test_y.npy',test_y)

In [None]:
!ls {base_dir}

In [None]:
!ls {base_dir}train_x | head -n5

In [None]:
!ls {base_dir}test_x | head -n5

### Compress the MNIST data into a Zip file

* Transferring a single Zip file from S3 to the container is more efficient than transferring the 70,000 individual PNG files, because many small transfers take longer than one large transfer.
* As a consequence, the preprocessing step must also decompress the Zip file.

In [None]:
zip_file = shutil.make_archive('./dataset', 'zip', root_dir='./dataset/')
print(zip_file)
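Because the job receives only the Zip file, the preprocessing script has to unzip it before doing anything else. The actual `preprocess.py` is not shown here, so the following is a hedged sketch with a hypothetical helper `extract_dataset`. It round-trips a tiny fake dataset (placeholder bytes, not real images) through `shutil.make_archive`, as used in the cell above, and `zipfile.ZipFile.extractall`.

```python
import os
import shutil
import tempfile
import zipfile


def extract_dataset(zip_path: str, extract_dir: str) -> list:
    """Unzip the dataset archive into extract_dir and return the archive's member names."""
    os.makedirs(extract_dir, exist_ok=True)
    with zipfile.ZipFile(zip_path) as zf:
        zf.extractall(extract_dir)
        return zf.namelist()


# Round trip with a throwaway directory that mimics ./dataset/
work = tempfile.mkdtemp()
src = os.path.join(work, "dataset")
os.makedirs(os.path.join(src, "train_x"))
with open(os.path.join(src, "train_x", "00000.png"), "wb") as f:
    f.write(b"not a real png")  # placeholder bytes; only the file layout matters here
with open(os.path.join(src, "train_y.npy"), "wb") as f:
    f.write(b"placeholder")

# Same call shape as the notebook: archive the contents of root_dir
archive = shutil.make_archive(os.path.join(work, "dataset"), "zip", root_dir=src)
names = extract_dataset(archive, os.path.join(work, "extracted"))
print(sorted(names))
```

Inside the job, a script could call something like `extract_dataset(os.path.join(input_dir, 'dataset.zip'), extract_dir)`, where `input_dir` is the value of `--input-dir` and `extract_dir` is any scratch directory (both assumptions about how the script is organized).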

### Upload the Zip file to S3 using the SageMaker SDK

The `upload_data` method uploads data to the SageMaker default bucket (`sagemaker-{region}-{account_id}`) with a single line of code.



In [None]:
prefix = f'sagemaker-handson-{name}/dataset-{timestamp}'
zip_dataset_s3_uri = sagemaker.session.Session().upload_data(path=zip_file, key_prefix=prefix)

print(zip_dataset_s3_uri)
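For a single file, `upload_data` returns a URI of the form `s3://{bucket}/{key_prefix}/{file name}`. The hedged sketch below rebuilds that URI and splits it back into bucket and key, similar in spirit to the `parse_s3_url` helper used later. The helper names, account ID, region, and timestamp are hypothetical values for illustration only.

```python
import os
from urllib.parse import urlparse


def expected_upload_uri(bucket: str, key_prefix: str, local_path: str) -> str:
    """Build the S3 URI that upload_data is expected to return for a single file."""
    return f"s3://{bucket}/{key_prefix}/{os.path.basename(local_path)}"


def split_s3_uri(uri: str):
    """Split 's3://bucket/key' into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3":
        raise ValueError(f"not an S3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")


# Hypothetical default bucket (account 123456789012 in ap-northeast-1) and prefix
example_bucket = "sagemaker-ap-northeast-1-123456789012"
example_uri = expected_upload_uri(
    example_bucket,
    "sagemaker-handson-haedu/dataset-20240101120000",
    "/home/ec2-user/SageMaker/dataset.zip",
)
print(example_uri)
print(split_s3_uri(example_uri))
```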

## Step 2: Run a SageMaker Processing Job 🌶️🌶️🌶️

In [None]:
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role
from sagemaker.s3 import parse_s3_url
import yaml,boto3, io

role = get_execution_role()

print(f'role: {role}')
print(f'name: {name}')
print(f'zip_dataset_s3_uri: {zip_dataset_s3_uri}')
print(f'timestamp: {timestamp}')

### Create a container image for preprocessing
SageMaker provides built-in containers for Apache Spark and scikit-learn, but these default containers do not include image-processing libraries such as scikit-image, OpenCV, or Pillow.
To use these libraries, bring your own container (BYOC) with them installed.

In [None]:
!cat ./container/Dockerfile

In [None]:
# Build the image locally; its name is reused as the ECR repository name below
%cd container
!docker build -t sagemaker-tf-handson-{name}-{timestamp} .
%cd ../

Docker is pre-installed on SageMaker notebook instances. (At the time of writing, SageMaker Studio did not include Docker, so in Studio you use the `sm-docker` command instead of `docker`.)
1. Build the image locally.
2. Push the image to an Amazon Elastic Container Registry (ECR) repository.
3. Use the pushed image to run preprocessing.

In [None]:
import boto3

# Retrieve the AWS account ID and region to build the ECR image URI
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.session.Session().region_name
tag = ':latest'

ecr_repository = f'sagemaker-tf-handson-{name}-{timestamp}'
image_uri = f'{account_id}.dkr.ecr.{region}.amazonaws.com/{ecr_repository+tag}'

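# Log Docker in to ECR (`aws ecr get-login` is not available in AWS CLI v2; `get-login-password` is)
!aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {account_id}.dkr.ecr.{region}.amazonaws.com

# Create a repository
!aws ecr create-repository --repository-name $ecr_repository

# The working directory is the notebook root again, so point the build context at ./container
!docker build -t {ecr_repository} ./container
!docker tag {ecr_repository + tag} $image_uri
!docker push $image_uri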

print(f'This container image was registered in {image_uri}.')
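The repository name doubles as the Docker image name, and both Docker and ECR reject uppercase letters, so a `name` such as `'Haedu'` in the Configuration cell would make the build or push fail. The hedged sketch below checks the name before building the image URI. The regular expression and the 2 to 256 character limit are reproduced from memory of the ECR `CreateRepository` API reference and should be treated as assumptions; `ecr_image_uri`, the account ID, and the region are hypothetical.

```python
import re

# Assumed repository-name pattern from the Amazon ECR CreateRepository API reference
ECR_REPO_PATTERN = re.compile(r"(?:[a-z0-9]+(?:[._-][a-z0-9]+)*/)*[a-z0-9]+(?:[._-][a-z0-9]+)*")


def ecr_image_uri(account_id: str, region: str, repository: str, tag: str = "latest") -> str:
    """Validate an ECR repository name and build the registry URI used by docker tag/push."""
    if not 2 <= len(repository) <= 256 or not ECR_REPO_PATTERN.fullmatch(repository):
        raise ValueError(f"invalid ECR repository name: {repository!r}")
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"


# Hypothetical account ID and region
print(ecr_image_uri("123456789012", "ap-northeast-1", "sagemaker-tf-handson-haedu-20240101120000"))
```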

### Set the directories and job name for the processing job
Before creating the processor and starting the job, define the container paths where the job reads its input and writes its output, as well as the base job name.

In [None]:
processing_input_dir = '/opt/ml/processing/input'
processing_output_train_dir = '/opt/ml/processing/train'
processing_output_test_dir = '/opt/ml/processing/test'
job_name = f'sagemaker-preprocess-handson-{name}'
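SageMaker Processing expects the container-side input and output paths to sit under `/opt/ml/processing/`, which is why all three directories above share that root. A hedged sketch of a check (the helper `check_processing_paths` is hypothetical, and the rule is paraphrased from the Processing API reference) could look like this:

```python
from pathlib import PurePosixPath

# Assumed root required for container-side input/output paths in SageMaker Processing
PROCESSING_ROOT = PurePosixPath("/opt/ml/processing")


def check_processing_paths(*paths: str) -> list:
    """Return the normalized paths, raising ValueError if any is not below PROCESSING_ROOT."""
    normalized = [PurePosixPath(p) for p in paths]
    for p in normalized:
        # The root itself is rejected too: each input/output needs its own subdirectory
        if PROCESSING_ROOT not in p.parents:
            raise ValueError(f"{p} must be a subdirectory of {PROCESSING_ROOT}")
    return [str(p) for p in normalized]


print(check_processing_paths("/opt/ml/processing/input", "/opt/ml/processing/train", "/opt/ml/processing/test"))
```

In the notebook it would be called as `check_processing_paths(processing_input_dir, processing_output_train_dir, processing_output_test_dir)`.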

### Run the processing job

In [None]:
!pygmentize ./preprocessing_script/preprocess.py

* To run processing with your own Docker image, create a processor from the `ScriptProcessor` class.
* Set `image_uri` to the URI of the image you pushed earlier.
* Because the image you created maps `python3` to Python 3.7, specify `python3` as the `command`.

In [None]:
processor = ScriptProcessor(
    base_job_name=job_name,
    image_uri=image_uri,
    command=['python3'],
    role=role,
    instance_count=1,
    instance_type='ml.c5.xlarge'
)
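As we understand the SageMaker Python SDK, `ScriptProcessor` uploads the `code` file to S3, makes it available in the container under `/opt/ml/processing/input/code/`, and runs `command` followed by that script path and then the `arguments` list. The sketch below (hypothetical helper `container_command`) reproduces that composition so you can see the command line `preprocess.py` would receive; the code directory is an assumption about SDK internals, not a documented contract.

```python
import shlex
from pathlib import PurePosixPath

# Assumed location where ScriptProcessor places the user script inside the container
CODE_DIR = PurePosixPath("/opt/ml/processing/input/code")


def container_command(command, code_path, arguments):
    """Compose the command line the container is expected to run for a ScriptProcessor job."""
    script_in_container = CODE_DIR / PurePosixPath(code_path).name
    return list(command) + [str(script_in_container)] + list(arguments)


cmd = container_command(["python3"], "./preprocessing_script/preprocess.py", ["--hist-flatten", "True"])
print(shlex.join(cmd))
```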

* After creating the processor, start the SageMaker Processing Job with its `run` method.
* Pass the path of the `.py` script in the `code` argument.
* Pass values to the processing script as command-line flags through the `arguments` argument.
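Every value in `arguments` reaches the script as a string, so `'True'` arrives as the text `True`, and `bool('False')` would still be `True` in Python. The real `preprocess.py` may parse its flags differently; this is a hedged sketch of how a script could read the four flags passed in the next cell with `argparse`, using a hypothetical `str2bool` converter and defaults that mirror the notebook's directories.

```python
import argparse


def str2bool(value: str) -> bool:
    """Convert 'True'/'False'-style strings to bool (type=bool would treat any non-empty string as True)."""
    lowered = value.strip().lower()
    if lowered in ("true", "1", "yes"):
        return True
    if lowered in ("false", "0", "no"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean, got {value!r}")


def parse_args(argv=None):
    parser = argparse.ArgumentParser(description="Hypothetical MNIST preprocessing entry point")
    parser.add_argument("--hist-flatten", type=str2bool, default=False)
    parser.add_argument("--input-dir", default="/opt/ml/processing/input")
    parser.add_argument("--output-train-dir", default="/opt/ml/processing/train")
    parser.add_argument("--output-test-dir", default="/opt/ml/processing/test")
    return parser.parse_args(argv)


# Passing an explicit list avoids reading sys.argv (which, in a notebook, belongs to the kernel)
args = parse_args(["--hist-flatten", "True", "--input-dir", "/opt/ml/processing/input"])
print(args.hist_flatten, args.input_dir)
```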

In [None]:
# Run a Processing Job that decompresses the Zip file and applies histogram equalization
processor.run(
    code='./preprocessing_script/preprocess.py',
    inputs=[
        ProcessingInput(source=zip_dataset_s3_uri, destination=processing_input_dir)
    ],
    outputs=[
        ProcessingOutput(output_name='train', source=processing_output_train_dir),
        ProcessingOutput(output_name='test', source=processing_output_test_dir)
    ],
    arguments=[
        '--hist-flatten', 'True',
        '--input-dir', processing_input_dir,
        '--output-train-dir', processing_output_train_dir,
        '--output-test-dir', processing_output_test_dir
    ]
)
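The job runs with `--hist-flatten True`, which applies histogram equalization to each image. How `preprocess.py` implements it is not shown (it may call a library such as scikit-image or OpenCV), so this is a hedged, dependency-free sketch of the classic CDF-based formula on a flat list of 8-bit pixels: each gray level v maps to round((cdf(v) - cdf_min) / (N - cdf_min) * 255), where N is the pixel count and cdf_min is the smallest non-zero cumulative count. The function name `equalize_histogram` is hypothetical.

```python
from collections import Counter


def equalize_histogram(pixels, levels=256):
    """Histogram-equalize a flat list of 8-bit pixel values using the CDF remapping formula."""
    if not pixels:
        return []
    n = len(pixels)
    counts = Counter(pixels)
    # Cumulative count of pixels at or below each gray level
    cdf = []
    running = 0
    for level in range(levels):
        running += counts.get(level, 0)
        cdf.append(running)
    cdf_min = min(c for c in cdf if c > 0)
    if n == cdf_min:  # constant image: nothing to stretch
        return list(pixels)
    scale = (levels - 1) / (n - cdf_min)
    # Lookup table; round half up so 127.5 maps to 128 (Python's round() uses banker's rounding)
    lut = [int((c - cdf_min) * scale + 0.5) if c > 0 else 0 for c in cdf]
    return [lut[p] for p in pixels]


print(equalize_histogram([10, 10, 20, 30]))  # [0, 0, 128, 255]
```

For a 28x28 MNIST image, `pixels` would be the 784 flattened values; the output stretches the used gray levels across the full 0 to 255 range.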

The `describe` method returns the details of a job run, such as the S3 destinations of the output data.

In [None]:
# Describe the most recent job run by this processor
processor_description = processor.jobs[-1].describe()
print(processor_description)

### Check the execution results
* The execution results are stored in S3.
* Here, load the output `.npy` files with NumPy and display an example of a preprocessed image.

In [None]:
train_data_uri = processor_description['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
test_data_uri = processor_description['ProcessingOutputConfig']['Outputs'][1]['S3Output']['S3Uri']
print(f'train_data_uri: {train_data_uri}')
print(f'test_data_uri: {test_data_uri}')
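The cell above picks the outputs by position (`[0]` and `[1]`), which silently swaps train and test if the order ever changes. A hedged alternative is to look each output up by the `output_name` given to `ProcessingOutput`. The helper `output_s3_uri` is hypothetical, and the sample response below is made up but uses the same nesting as the `describe()` result.

```python
def output_s3_uri(description: dict, output_name: str) -> str:
    """Return the S3 URI of a named output from a DescribeProcessingJob response."""
    for output in description["ProcessingOutputConfig"]["Outputs"]:
        if output["OutputName"] == output_name:
            return output["S3Output"]["S3Uri"]
    raise KeyError(f"no processing output named {output_name!r}")


# Trimmed, made-up response with the same nesting as processor.jobs[-1].describe()
fake_description = {
    "ProcessingOutputConfig": {
        "Outputs": [
            {"OutputName": "test", "S3Output": {"S3Uri": "s3://example-bucket/job/output/test"}},
            {"OutputName": "train", "S3Output": {"S3Uri": "s3://example-bucket/job/output/train"}},
        ]
    }
}
print(output_s3_uri(fake_description, "train"))
```

In the notebook this would read `train_data_uri = output_s3_uri(processor_description, 'train')`.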

In [None]:
bucket,train_key = parse_s3_url(train_data_uri)
bucket,test_key = parse_s3_url(test_data_uri)
s3 = boto3.client('s3')
obj_list=s3.list_objects_v2(Bucket=bucket, Prefix=train_key)
file=[]
for contents in obj_list['Contents']:
    file.append(contents['Key'])
obj_list=s3.list_objects_v2(Bucket=bucket, Prefix=test_key)
for contents in obj_list['Contents']:
    file.append(contents['Key'])

print(file)
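`list_objects_v2` returns at most 1,000 keys per call and omits `Contents` when nothing matches the prefix, and the loop above handles neither case. That is fine for this job's few output files, but a hedged sketch that follows continuation tokens looks like this. `list_all_keys` is hypothetical; it takes the listing function as a parameter so it can be exercised with a stand-in, and the object keys below are made up. boto3 also offers `s3.get_paginator('list_objects_v2')` for the same purpose.

```python
def list_all_keys(list_objects_v2, bucket: str, prefix: str) -> list:
    """Collect every key under prefix, following ContinuationToken across pages."""
    keys = []
    request = {"Bucket": bucket, "Prefix": prefix}
    while True:
        response = list_objects_v2(**request)
        # 'Contents' is absent when the prefix matches nothing
        keys.extend(obj["Key"] for obj in response.get("Contents", []))
        if not response.get("IsTruncated"):
            return keys
        request["ContinuationToken"] = response["NextContinuationToken"]


# Stand-in for boto3's s3.list_objects_v2 that serves two pages
fake_pages = {
    None: {"Contents": [{"Key": "train/train_x.npy"}], "IsTruncated": True, "NextContinuationToken": "page-2"},
    "page-2": {"Contents": [{"Key": "train/train_y.npy"}], "IsTruncated": False},
}
print(list_all_keys(lambda **kw: fake_pages[kw.get("ContinuationToken")], "example-bucket", "train/"))
```

With a real client, the call would be `list_all_keys(s3.list_objects_v2, bucket, train_key)`.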

In [None]:
res = boto3.client('s3').get_object(Bucket = bucket, Key = file[0])["Body"].read()
train_x = np.load(io.BytesIO(res))
plt.imshow(train_x[100,:,:,0],'gray')

## Let's go eat some extra spicy curry rice!!! 🍛🌶️
![extra-spicy-curry](./images/extra-spicy-curry.png)