# SageMaker Processing Job using PySpark

This notebook shows how to use PySpark to process data with Amazon SageMaker Processing, a fully managed service.

## Visualize the processing script

In [None]:
## This script was created to express what we saw in the previous exercise.
## It will get the raw data from the turbine sensors, select some features, 
## denoise, normalize, encode and reshape it as a 6x10x10 tensor
## This script is the entrypoint of the first step of the ML Pipeline: Data preparation
!pygmentize ./processing.py

### Define imports and global variables

In [None]:
import boto3
import logging
import sagemaker
from sagemaker.spark.processing import PySparkProcessor
from time import gmtime, strftime

In [None]:
sagemaker_logger = logging.getLogger("sagemaker")
sagemaker_logger.setLevel(logging.INFO)
sagemaker_logger.addHandler(logging.StreamHandler())

In [None]:
s3_bucket_name = "sm-emr-sc-blog-06be16c8160f"
file_name = "LD2011_2014.csv"

s3_input_file = "s3://{}/data/input/{}".format(s3_bucket_name, file_name)
s3_output_path = "s3://{}/data/output/".format(s3_bucket_name)

In [None]:
boto_session = boto3.Session()

sagemaker_client = boto_session.client("sagemaker")
runtime_client = boto_session.client("sagemaker-runtime")

sagemaker_session = sagemaker.session.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_runtime_client=runtime_client,
    default_bucket=s3_bucket_name
)

In [None]:
role = sagemaker.get_execution_role()

### Create PySpark Processor

With the [SageMaker SDK](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html) we can run SageMaker jobs on the official SageMaker containers.

In the following example, we create a `PySparkProcessor`, which lets us run an Amazon SageMaker Processing Job on the SageMaker container that has the PySpark modules already configured.

In [None]:
spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",
    role=role,
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
    sagemaker_session=sagemaker_session
)

### Run SageMaker PySpark Job

In [None]:
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

spark_processor.run(
    submit_app="./processing.py",
    arguments=[
        "--s3_input_file",
        s3_input_file,
        "--s3_output_path",
        s3_output_path
    ],
    spark_event_logs_s3_uri="s3://{}/spark_execution/{}/spark_event_logs".format(s3_bucket_name, timestamp_prefix),
    logs=False,
)
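
The `arguments` list above reaches `processing.py` as command-line arguments. The script body is not reproduced in this notebook, so the following is only a minimal sketch of how such a script might read them, assuming it relies on `argparse`. The function name `parse_args` and the example bucket name are hypothetical.

```python
import argparse


def parse_args(argv=None):
    """Parse the two S3 locations passed through `arguments` in `run()`."""
    parser = argparse.ArgumentParser(description="PySpark processing entrypoint (sketch)")
    parser.add_argument("--s3_input_file", type=str, required=True,
                        help="S3 URI of the raw CSV file to read")
    parser.add_argument("--s3_output_path", type=str, required=True,
                        help="S3 URI prefix where the processed data is written")
    # parse_known_args ignores any extra arguments instead of failing on them
    args, _ = parser.parse_known_args(argv)
    return args


# Example with placeholder URIs (hypothetical bucket name)
args = parse_args([
    "--s3_input_file", "s3://example-bucket/data/input/LD2011_2014.csv",
    "--s3_output_path", "s3://example-bucket/data/output/",
])
print(args.s3_input_file, args.s3_output_path)
```

Inside the real script, calling `parse_args()` without an explicit list would read the arguments that the Processing Job passes on the command line.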

## Useful Tips

### Override PySpark configuration in SageMaker Processing Jobs

SageMaker Processing Jobs set up the PySpark environment when the container is created.
To override Spark configurations, such as the driver and executor memory (`spark.driver.memory` and `spark.executor.memory`), we can provide a configuration file as input to the Processing Job.

In [None]:
import boto3
import json

In [None]:
s3_client = boto3.client('s3')

#### Define spark-defaults configurations

In [None]:
SPARK_DRIVER_MEMORY = "2g"
SPARK_EXECUTOR_MEMORY = "2g"

configurations = [
    {
        "Classification": "spark-defaults",
        "Properties": {"spark.driver.memory": SPARK_DRIVER_MEMORY, "spark.executor.memory": SPARK_EXECUTOR_MEMORY}
    }
]
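
Before uploading the file, it can help to confirm that the list has the shape shown above: a list of dictionaries, each with a `Classification` string and a `Properties` dictionary. The helper below is a minimal sketch of such a check and is not part of the SageMaker SDK. The name `check_configurations` is hypothetical, and the rule that property values must be strings is an assumption made for this example.

```python
import json


def check_configurations(configurations):
    """Raise ValueError if the list does not have the expected basic shape."""
    if not isinstance(configurations, list):
        raise ValueError("configurations must be a list of dictionaries")
    for entry in configurations:
        if not isinstance(entry, dict):
            raise ValueError("each configuration entry must be a dictionary")
        if not isinstance(entry.get("Classification"), str):
            raise ValueError("each entry needs a string 'Classification'")
        properties = entry.get("Properties")
        if not isinstance(properties, dict):
            raise ValueError("each entry needs a 'Properties' dictionary")
        for key, value in properties.items():
            # Assumption for this sketch: property names and values are strings
            if not isinstance(key, str) or not isinstance(value, str):
                raise ValueError(f"property {key!r} must map a string to a string")
    # Serializing confirms the structure can be written out as JSON
    return json.dumps(configurations)


example = [
    {
        "Classification": "spark-defaults",
        "Properties": {"spark.driver.memory": "2g", "spark.executor.memory": "2g"},
    }
]
serialized = check_configurations(example)
```

The returned string is the same JSON text that would be encoded and uploaded to S3 in the next step.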

#### Store configurations as file on S3

In [None]:
s3_bucket_name = "sm-emr-sc-blog-06be16c8160f"
config_path = "spark/configurations"

In [None]:
s3_client.put_object(
    Body=json.dumps(configurations).encode("utf-8"),
    Bucket=s3_bucket_name,
    Key=config_path + "/configuration.json"
)

#### Run SageMaker Processing Job by providing configuration file as input

In [None]:
from sagemaker.processing import ProcessingInput

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

spark_processor.run(
    submit_app="./processing.py",
    inputs=[
        ProcessingInput(
            source="s3://{}/{}/configuration.json".format(s3_bucket_name, config_path), 
            destination="/opt/ml/processing/input/conf"),
    ],
    arguments=[
        "--s3_input_file",
        s3_input_file,
        "--s3_output_path",
        s3_output_path
    ],
    spark_event_logs_s3_uri="s3://{}/spark_execution/{}/spark_event_logs".format(s3_bucket_name, timestamp_prefix),
    logs=False,
)
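
As an alternative to staging the file manually, `PySparkProcessor.run` also accepts a `configuration` argument that takes the same list of dictionaries. The wrapper below is a sketch of that variant. The function name `run_with_inline_configuration` is hypothetical, and the behavior should be checked against the SDK version installed in your environment.

```python
def run_with_inline_configuration(processor, configurations, s3_input_file, s3_output_path):
    """Call run() with the Spark configuration passed directly instead of via S3."""
    processor.run(
        submit_app="./processing.py",
        arguments=[
            "--s3_input_file",
            s3_input_file,
            "--s3_output_path",
            s3_output_path,
        ],
        # Same list of {"Classification": ..., "Properties": ...} dictionaries
        configuration=configurations,
        logs=False,
    )
```

With the objects defined earlier in this notebook, the call would be `run_with_inline_configuration(spark_processor, configurations, s3_input_file, s3_output_path)`.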

## Build Your Own Container

It's possible to replace the SageMaker PySpark container with your own, built on top of one of the public container images provided by SageMaker for Spark.

You can find the full list of images here: https://github.com/aws/sagemaker-spark-container/blob/master/available_images.md

In [None]:
from sagemaker.processing import Processor

In [None]:
ecr_image_uri = "<ECR_IMAGE_URI>"

In [None]:
processor = Processor(
    image_uri=ecr_image_uri,
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
    sagemaker_session=sagemaker_session
)

In [None]:
processor.run(
    arguments=[
        "--s3_input_file",
        s3_input_file,
        "--s3_output_path",
        s3_output_path
    ],
    logs=False
)