## Amazon SageMaker Processing jobs

Amazon SageMaker Processing jobs give you a managed environment for running data pre-processing, post-processing, and model evaluation workloads on the Amazon SageMaker platform.

A processing job downloads input from Amazon Simple Storage Service (Amazon S3), then uploads outputs to Amazon S3 during or after the processing job.

<img src="Processing-1.jpg">

This notebook shows how you can use your own custom container to run processing jobs with your own Python libraries and dependencies.


## Initial Setup

To run the image preprocessing script as a processing job, create a `ScriptProcessor`, which lets you run scripts inside processing jobs using your custom container image.<br>
Let's start by importing the libraries we need and setting up the input and output S3 URIs.<br>
**Replace \<Input S3 URI\> with the S3 URI of the location of the source images to be processed. The source images are expected to be in a subfolder named "abc" under this URI.<br>
Also replace \<Output S3 URI\> with the output location for the processed images. The script places the output images in a subfolder named "abc". Both subfolders are specified in the arguments when running the processing job.**

In [None]:
import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.processing import ProcessingInput, ProcessingOutput

region = boto3.session.Session().region_name

role = get_execution_role()

#input_data = "s3://tmp-demo-area/torchai/input/"
#output_data = "s3://tmp-demo-area/torchai/output/"

input_data = "<Input S3 URI>"
output_data = "<Output S3 URI>"

In [None]:
# Make sure the input and output S3 URIs end with a slash

if not input_data.endswith("/"):
    input_data += "/"
    
if not output_data.endswith("/"):
    output_data += "/"
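
The trailing-slash check above can also be wrapped in small helpers, which makes it easy to see the full URI of the "abc" subfolder that the job reads from. This is a minimal sketch; the function names and the bucket name are hypothetical and are not part of the SageMaker SDK.

```python
def with_trailing_slash(uri: str) -> str:
    """Return the URI unchanged if it already ends with '/', else append one."""
    return uri if uri.endswith("/") else uri + "/"


def subfolder_uri(base_uri: str, subfolder: str) -> str:
    """Join a base S3 URI and a subfolder name into a folder URI."""
    return with_trailing_slash(base_uri) + subfolder.strip("/") + "/"


# Hypothetical bucket name, used only for illustration.
example_input = "s3://example-bucket/images"
print(subfolder_uri(example_input, "abc"))
```

With the example value, the helper prints the folder where the source images are expected to be.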

The next notebook cell writes a file `preprocessing.py`, which contains the pre-processing script. You can update the script and rerun the cell to overwrite `preprocessing.py`. You run it as a processing job later in this notebook. This script has been modified to incorporate your preprocessing code. Parameters have been mapped to arguments that can be provided at run time.

In [None]:
%%writefile preprocessing.py

import os
from PIL import Image
import numpy as np
import cv2
import argparse
from io import BytesIO
from tqdm import tqdm

parser = argparse.ArgumentParser(description=
                                 'Reads source images from an S3 bucket, '
                                 'applies preprocessing transformations to them, '
                                 'and saves them back to S3.')

#parser.add_argument('bucket', type=str)
# Both prefixes are joined onto the container paths in main(), so they must be supplied
parser.add_argument('--src_prefix', type=str, required=True, help='Prefix/folder for source files.')
parser.add_argument('--dest_prefix', type=str, required=True, help='Prefix/folder for preprocessed files.')
parser.add_argument('-is', '--img_size', type=int, help='Size to set images to. Defaults to 1800')
parser.add_argument('-bt', '--bin_thresh', type=int, help='Binary threshold for image smoothening. Defaults to 180')

args = parser.parse_args()


def process_img(pil_img, img_size, bin_thresh):
    # Fall back to the documented defaults when the arguments are omitted
    if img_size is None:
        img_size = 1800
    if bin_thresh is None:
        bin_thresh = 180
    # Force three channels so grayscale, palette, and RGBA inputs can be indexed below
    open_cv_image = np.array(pil_img.convert("RGB"))
    # Convert RGB to BGR
    open_cv_image = open_cv_image[:, :, ::-1].copy()
    img_resized = set_image_size(open_cv_image, img_size)
    im_new = remove_noise_and_smooth(img_resized, bin_thresh)
    return im_new


def set_image_size(img, img_size):
    img = Image.fromarray(img)
    width, height = img.size
    # Upscale by a whole-number factor only; images wider than img_size keep their size
    factor = max(1, int(img_size / width))
    size = factor * width, factor * height
    # LANCZOS is the same filter that older Pillow releases also exposed as ANTIALIAS
    img_resized = img.resize(size, Image.LANCZOS)
    return img_resized


def image_smoothening(img, bin_thresh):
    ret1, th1 = cv2.threshold(img, bin_thresh, 255, cv2.THRESH_BINARY)
    ret2, th2 = cv2.threshold(th1, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    blur = cv2.GaussianBlur(th2, (1, 1), 0)
    ret3, th3 = cv2.threshold(blur, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    return th3


def remove_noise_and_smooth(img, bin_thresh):
    img = np.array(img)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    filtered = cv2.adaptiveThreshold(img.astype(np.uint8), 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 41, 3)
    kernel = np.ones((1, 1), np.uint8)
    opening = cv2.morphologyEx(filtered, cv2.MORPH_OPEN, kernel)
    closing = cv2.morphologyEx(opening, cv2.MORPH_CLOSE, kernel)
    img = image_smoothening(img, bin_thresh)
    or_image = cv2.bitwise_or(img, closing)
    return or_image


def main(args):
    input_data_path = os.path.join("/opt/ml/processing/input", args.src_prefix)
    output_data_path = os.path.join("/opt/ml/processing/output", args.dest_prefix)
    
    print(f"Input Data Path {input_data_path}")
    print(f"Output Data Path {output_data_path}")
    
    # Create the output folder, including nested prefixes; no error if it already exists
    os.makedirs(output_data_path, exist_ok=True)
    
    for img_obj in os.listdir(input_data_path):
        if img_obj.lower().endswith('.jpg'):
            # process image
            img_fname = f"{input_data_path}/{img_obj}"
            img = Image.open(img_fname)
            processed_img = process_img(img, args.img_size, args.bin_thresh)

            # save the preprocessed image to the local output folder;
            # SageMaker uploads the contents of this folder to S3
            page_img_obj = os.path.join(output_data_path, img_obj)
            Image.fromarray(processed_img).save(page_img_obj, format='JPEG')


if __name__ == "__main__":
    main(args)
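
Note that `--img_size` acts as a target rather than an exact output width: `set_image_size` multiplies both dimensions by a whole-number factor and never shrinks an image. The sketch below reproduces only that arithmetic, without Pillow or OpenCV, so the effect can be checked by hand. The helper names `scale_factor` and `scaled_size` are hypothetical and do not appear in `preprocessing.py`.

```python
def scale_factor(img_size: int, width: int) -> int:
    """Same formula as set_image_size: integer upscaling factor, at least 1."""
    return max(1, int(img_size / width))


def scaled_size(img_size: int, width: int, height: int) -> tuple:
    """Return the (width, height) that set_image_size would resize to."""
    factor = scale_factor(img_size, width)
    return factor * width, factor * height


# Three illustrative source sizes against the default target of 1800.
for width, height in [(600, 400), (1000, 800), (2400, 1600)]:
    print((width, height), "->", scaled_size(1800, width, height))
```

A 600-pixel-wide image is tripled to 1800 pixels, while the 1000- and 2400-pixel-wide images keep their original size because the factor rounds down to 1.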



## Copy our sample image to your S3 Bucket

In [None]:
! aws s3 cp sample.JPG {input_data}abc/sample.JPG

## Running processing jobs with your own dependencies

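First, let's copy the previously created `preprocessing.py` script to the `code` folder. Note that the `run` call later in this notebook passes `code="preprocessing.py"`, so it reads the script from the current directory.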

In [None]:
!mkdir -p code
!cp preprocessing.py ./code

Below, you walk through how to create a processing container and how to use a `ScriptProcessor` to run your own code within it. You can install your own dependencies in this container for your processing script to use.

In [None]:
!mkdir -p docker

This is the Dockerfile for the processing container. It installs `opencv`, `tqdm`, and `Pillow`; you can add your own dependencies.

In [None]:
%%writefile docker/Dockerfile

FROM python:3.7-slim-buster

RUN apt-get update && apt-get install -y python3-opencv
RUN pip3 install Pillow opencv-python tqdm
ENV PYTHONUNBUFFERED=TRUE

ENTRYPOINT ["python3"]

This block of code builds the container using the `docker` command, creates an Amazon Elastic Container Registry (Amazon ECR) repository, and pushes the image to Amazon ECR.

In [None]:
import boto3

account_id = boto3.client("sts").get_caller_identity().get("Account")
ecr_repository = "sagemaker-processing-container"
tag = ":latest"

uri_suffix = "amazonaws.com"
if region in ["cn-north-1", "cn-northwest-1"]:
    uri_suffix = "amazonaws.com.cn"
processing_repository_uri = "{}.dkr.ecr.{}.{}/{}".format(
    account_id, region, uri_suffix, ecr_repository + tag
)

# Create ECR repository and push docker image
!docker build -t $ecr_repository docker
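# `aws ecr get-login` was removed in AWS CLI v2; `get-login-password` works in v2 and recent v1 releases
!aws ecr get-login-password --region $region | docker login --username AWS --password-stdin {account_id}.dkr.ecr.{region}.{uri_suffix}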
!aws ecr create-repository --repository-name $ecr_repository
!docker tag {ecr_repository + tag} $processing_repository_uri
!docker push $processing_repository_uri
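
The image URI assembled in the cell above follows a fixed pattern, which can be checked without calling AWS. The following is a minimal sketch of that pattern as a function; `ecr_image_uri` is a hypothetical helper name and the account ID is a placeholder, while the region check is copied from the cell above.

```python
def ecr_image_uri(account_id: str, region: str, repository: str, tag: str = "latest") -> str:
    """Build an ECR image URI using the same pattern as the notebook cell."""
    # China regions use a different domain suffix, as in the cell above.
    suffix = "amazonaws.com"
    if region in ["cn-north-1", "cn-northwest-1"]:
        suffix = "amazonaws.com.cn"
    return f"{account_id}.dkr.ecr.{region}.{suffix}/{repository}:{tag}"


# Placeholder account ID, used only for illustration.
print(ecr_image_uri("123456789012", "us-east-1", "sagemaker-processing-container"))
```

Printing the URI before running `docker push` is a quick way to confirm that the account, region, and repository name are the ones you expect.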

The `ScriptProcessor` class lets you run a command inside this container, which you can use to run your own script.

In [None]:
from sagemaker.processing import ScriptProcessor

script_processor = ScriptProcessor(
    command=["python3"],
    image_uri=processing_repository_uri,
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
)

Run the same `preprocessing.py` script you created above inside the Docker container you built in this notebook. You can add dependencies to the Docker image and run your own pre-processing, feature-engineering, and model evaluation scripts inside this container. We pass `ScriptProcessor.run()` the S3 input location through `ProcessingInput`, the S3 output location through `ProcessingOutput`, and the arguments for the script.

In [None]:
script_processor.run(
    code="preprocessing.py",
    inputs=[ProcessingInput(source=input_data, destination="/opt/ml/processing/input")],
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/output", destination=output_data)
    ],
    arguments=["--img_size", "1700", "--src_prefix", "abc", "--dest_prefix", "abc"],
)
script_processor_job_description = script_processor.jobs[-1].describe()
print(script_processor_job_description)
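
The dictionary returned by `describe()` is long, so it can help to pull out only the fields you need. The sketch below assumes the description follows the shape of the SageMaker `DescribeProcessingJob` response (`ProcessingJobName`, `ProcessingJobStatus`, and `ProcessingOutputConfig` with a list of `Outputs`). The `summarize_job` helper is hypothetical, and the sample dictionary is hand-written for illustration rather than real job output.

```python
def summarize_job(description: dict) -> dict:
    """Extract the job name, status, and output S3 URIs from a job description."""
    outputs = description.get("ProcessingOutputConfig", {}).get("Outputs", [])
    return {
        "name": description.get("ProcessingJobName"),
        "status": description.get("ProcessingJobStatus"),
        "output_uris": [o["S3Output"]["S3Uri"] for o in outputs if "S3Output" in o],
    }


# Hand-written sample with placeholder values, not output from a real job.
sample_description = {
    "ProcessingJobName": "example-job",
    "ProcessingJobStatus": "Completed",
    "ProcessingOutputConfig": {
        "Outputs": [
            {
                "OutputName": "output-1",
                "S3Output": {
                    "S3Uri": "s3://example-bucket/output/",
                    "LocalPath": "/opt/ml/processing/output",
                    "S3UploadMode": "EndOfJob",
                },
            }
        ]
    },
}
print(summarize_job(sample_description))
```

In the notebook, you would call `summarize_job(script_processor_job_description)` instead of using the sample.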

The script processes images located in a subfolder named "abc" (`src_prefix`) of your S3 input location and writes the processed images to a subfolder named "abc" (`dest_prefix`) of the output location.
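
To make the mapping between the `arguments` list, the container paths, and the S3 locations concrete, here is a minimal sketch that parses the same long options as `preprocessing.py` and resolves the four folders involved. The `resolve_paths` helper and the bucket names are hypothetical; the container paths are the ones used in the `run` call above.

```python
import argparse
import posixpath


def build_parser() -> argparse.ArgumentParser:
    """Parser with the same long options as preprocessing.py."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--src_prefix", type=str)
    parser.add_argument("--dest_prefix", type=str)
    parser.add_argument("--img_size", type=int)
    parser.add_argument("--bin_thresh", type=int)
    return parser


def resolve_paths(arguments, input_uri, output_uri):
    """Show where the job reads and writes, inside the container and in S3."""
    args = build_parser().parse_args(arguments)
    return {
        "container_input": posixpath.join("/opt/ml/processing/input", args.src_prefix),
        "container_output": posixpath.join("/opt/ml/processing/output", args.dest_prefix),
        "s3_input": input_uri + args.src_prefix + "/",
        "s3_output": output_uri + args.dest_prefix + "/",
    }


# Hypothetical bucket names; both URIs already end with a slash.
paths = resolve_paths(
    ["--img_size", "1700", "--src_prefix", "abc", "--dest_prefix", "abc"],
    "s3://example-bucket/input/",
    "s3://example-bucket/output/",
)
for name, value in paths.items():
    print(f"{name}: {value}")
```

SageMaker copies the whole input URI into `/opt/ml/processing/input`, so the "abc" subfolder appears under that path inside the container, and everything written under `/opt/ml/processing/output` is uploaded to the output URI.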

## Summary

You saw how to create a processing container, and how to use a `ScriptProcessor` to run your own code within a container.
