## Set up S3 bucket locations and roles


First, set up some locations in the default SageMaker bucket to store the raw input datasets and the Spark job output. Here, you’ll also define the role that will be used to run all SageMaker Processing jobs.


In [None]:
import logging
import sagemaker
from time import gmtime, strftime

sagemaker_logger = logging.getLogger("sagemaker")
sagemaker_logger.setLevel(logging.INFO)
sagemaker_logger.addHandler(logging.StreamHandler())

sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()
role = sagemaker.get_execution_role()
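
The cell above only resolves the bucket and the role; the key prefixes themselves are defined later, just before the job is launched. As a rough sketch of the layout those prefixes produce, the snippet below builds the three S3 URIs with plain string formatting. The helper name build_locations and the bucket name example-bucket are assumptions made for illustration and are not part of the notebook.

```python
from time import gmtime, strftime


def build_locations(bucket, timestamp):
    """Return the S3 URIs for raw input, preprocessed output, and Spark event logs."""
    # Hypothetical helper: mirrors the prefix layout used later in this notebook
    prefix = "sagemaker/spark-preprocess-demo/{}".format(timestamp)
    return {
        "raw": "s3://{}/{}/input/raw/abalone".format(bucket, prefix),
        "preprocessed": "s3://{}/{}/input/preprocessed/abalone".format(bucket, prefix),
        "event_logs": "s3://{}/{}/spark_event_logs".format(bucket, prefix),
    }


# "example-bucket" is a placeholder; the notebook uses sagemaker_session.default_bucket()
locations = build_locations("example-bucket", strftime("%Y-%m-%d-%H-%M-%S", gmtime()))
for name, uri in locations.items():
    print(name, uri)
```

Putting a timestamp in the prefix keeps each run's input and output separate, so rerunning the notebook does not overwrite earlier results.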



## Download the sample dataset and save it to the local file system

In [None]:
print(bucket)

# Fetch the dataset from the sagemaker-sample-files bucket
import boto3

s3 = boto3.client("s3")

FILE_NAME = "abalone.csv"
s3.download_file("sagemaker-sample-files", "datasets/tabular/uci_abalone/abalone.csv", FILE_NAME)
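
Before uploading the file, it can be worth confirming that it has the shape the preprocessing script expects. The sketch below reads the first few rows with the standard csv module and checks that each has nine columns, matching the nine fields in the script's schema (sex, seven measurements, and rings). The helper name check_column_count is hypothetical.

```python
import csv


def check_column_count(path, expected_columns=9, max_rows=5):
    """Return the first rows of a headerless CSV, raising if any row has the wrong width."""
    rows = []
    with open(path, newline="") as f:
        for i, row in enumerate(csv.reader(f)):
            if i >= max_rows:
                break
            if len(row) != expected_columns:
                raise ValueError(
                    "row {} has {} columns, expected {}".format(i, len(row), expected_columns)
                )
            rows.append(row)
    return rows


# Usage in the notebook, after the download cell has run:
# for row in check_column_count("abalone.csv"):
#     print(row)
```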

## Write the PySpark script

The source for the preprocessing script is in the cell below.
The cell uses the %%writefile directive to save the script as a local file.

This script does some basic feature engineering on a raw input dataset.
In this example, the dataset is the Abalone Data Set, and the script applies three transformations, combined into a pipeline so that they run in order:
- string indexing,
- one-hot encoding, and
- vector assembly.

The script then does an 80-20 split to produce training and validation datasets as output.
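
Before the script itself, here is a plain-Python sketch of what the three transformations do to a row. It is an approximation written for illustration, not Spark's implementation. It assumes the default behavior of the Spark stages as commonly documented: StringIndexer assigns index 0 to the most frequent label, and OneHotEncoder drops the last category, so three sex values become a two-element vector. The function names and the toy rows are hypothetical.

```python
from collections import Counter


def string_index(values):
    """Map each label to an index: most frequent label gets 0, ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 vector; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec


def assemble(*parts):
    """Flatten scalars and vectors into a single feature list."""
    features = []
    for part in parts:
        if isinstance(part, (list, tuple)):
            features.extend(part)
        else:
            features.append(part)
    return features


# Toy rows: (sex, length, diameter); the values are illustrative
rows = [("M", 0.455, 0.365), ("F", 0.53, 0.42), ("M", 0.44, 0.365), ("I", 0.33, 0.255)]
mapping = string_index([r[0] for r in rows])
for sex, length, diameter in rows:
    vec = one_hot(mapping[sex], len(mapping))
    print(sex, assemble(vec, length, diameter))
```

In the real script the assembled vector also includes the remaining numeric columns, and rings is kept apart as the label.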

In [None]:
%%writefile ./preprocess.py
import argparse
import os

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    DoubleType,
    StringType,
    StructField,
    StructType,
)


def csv_line(data):
    # data is a (label, features) pair; emit "label,f1,f2,..." as one CSV line
    r = ",".join(str(d) for d in data[1])
    return str(data[0]) + "," + r


def main():
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_input_bucket", type=str, help="s3 input bucket")
    parser.add_argument("--s3_input_key_prefix", type=str, help="s3 input key prefix")
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    parser.add_argument("--s3_output_key_prefix", type=str, help="s3 output key prefix")
    args = parser.parse_args()

    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()

    # This is needed to save RDDs, which is the only way to write nested DataFrames in CSV format
    spark.sparkContext._jsc.hadoopConfiguration().set(
        "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
    )

    # Define the schema for the input data, which has no header row
    schema = StructType(
        [
            StructField("sex", StringType(), True),
            StructField("length", DoubleType(), True),
            StructField("diameter", DoubleType(), True),
            StructField("height", DoubleType(), True),
            StructField("whole_weight", DoubleType(), True),
            StructField("shucked_weight", DoubleType(), True),
            StructField("viscera_weight", DoubleType(), True),
            StructField("shell_weight", DoubleType(), True),
            StructField("rings", DoubleType(), True),
        ]
    )

    # Read the data from S3 into a DataFrame
    total_df = spark.read.csv(
        ("s3://" + os.path.join(args.s3_input_bucket, args.s3_input_key_prefix, "abalone.csv")),
        header=False,
        schema=schema,
    )

    # StringIndexer maps the categorical sex column to numeric indices
    sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex")

    # one-hot-encoding is being performed on the string-indexed sex column (indexed_sex)
    sex_encoder = OneHotEncoder(inputCol="indexed_sex", outputCol="sex_vec")

    # vector-assembler will bring all the features to a 1D vector for us to save easily into CSV format
    assembler = VectorAssembler(
        inputCols=[
            "sex_vec",
            "length",
            "diameter",
            "height",
            "whole_weight",
            "shucked_weight",
            "viscera_weight",
            "shell_weight",
        ],
        outputCol="features",
    )

    # The pipeline consists of the stages defined above
    pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])

    # This step trains the feature transformers
    model = pipeline.fit(total_df)

    # This step transforms the dataset with information obtained from the previous fit
    transformed_total_df = model.transform(total_df)

    # Split the overall dataset into 80-20 training and validation
    (train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2])

    # Convert the train dataframe to RDD to save in CSV format and upload to S3
    train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))
    train_lines = train_rdd.map(csv_line)
    train_lines.saveAsTextFile(
        "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "train")
    )

    # Convert the validation dataframe to RDD to save in CSV format and upload to S3
    validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))
    validation_lines = validation_rdd.map(csv_line)
    validation_lines.saveAsTextFile(
        "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "validation")
    )


if __name__ == "__main__":
    main()
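
One detail of the script worth noting is that randomSplit does not cut the data at exact 80 and 20 percent marks. As far as the Spark documentation describes it, each row is assigned to a split at random according to the weights, so the sizes are only approximately 80-20, and because the script passes no seed, the split differs between runs. The plain-Python sketch below illustrates the idea under those assumptions. It is not Spark's implementation, and the function name random_split and the row count of 1000 are chosen for illustration.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to a split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any draw left over by rounding
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, validation = random_split(range(1000), [0.8, 0.2], seed=42)
print(len(train), len(validation))
```

If you need a reproducible split in the script, randomSplit accepts an optional seed argument, for example randomSplit([0.8, 0.2], seed=42).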

## Run the SageMaker Processing Job

Next, you’ll use the PySparkProcessor class to define a Spark job and run it using SageMaker Processing. A few things to note in the definition of the PySparkProcessor:

* This is a multi-node job with two m5.xlarge instances (which is specified via the instance_count and instance_type parameters)

* Spark framework version 3.1 is specified via the framework_version parameter

* The PySpark script defined above is passed via the submit_app parameter

* Command-line arguments to the PySpark script (such as the S3 input and output locations) are passed via the arguments parameter

* Spark event logs will be offloaded to the S3 location specified in spark_event_logs_s3_uri and can be used to view the Spark UI while the job is in progress or after it completes

In [None]:
from sagemaker.spark.processing import PySparkProcessor

# Upload the raw input dataset to a unique S3 location
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
prefix = "sagemaker/spark-preprocess-demo/{}".format(timestamp_prefix)
input_prefix_abalone = "{}/input/raw/abalone".format(prefix)
input_preprocessed_prefix_abalone = "{}/input/preprocessed/abalone".format(prefix)

sagemaker_session.upload_data(
    path="./abalone.csv", bucket=bucket, key_prefix=input_prefix_abalone
)

# Run the processing job
spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",
    role=role,
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
)

spark_processor.run(
    submit_app="./preprocess.py",
    arguments=[
        "--s3_input_bucket",
        bucket,
        "--s3_input_key_prefix",
        input_prefix_abalone,
        "--s3_output_bucket",
        bucket,
        "--s3_output_key_prefix",
        input_preprocessed_prefix_abalone,
    ],
    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, prefix),
    logs=False,
)
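
The arguments list in the run call is a flat sequence of alternating flags and values. The sketch below shows how the argparse parser in preprocess.py turns such a list back into named fields and how the input URI is then assembled. The bucket and prefix values are placeholders, and the function name parse_job_arguments is hypothetical; only the four flag names come from the script.

```python
import argparse


def parse_job_arguments(argv):
    """Parse the same four flags that preprocess.py declares."""
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_input_bucket", type=str)
    parser.add_argument("--s3_input_key_prefix", type=str)
    parser.add_argument("--s3_output_bucket", type=str)
    parser.add_argument("--s3_output_key_prefix", type=str)
    return parser.parse_args(argv)


# Placeholder values standing in for bucket and the two prefixes
arguments = [
    "--s3_input_bucket", "example-bucket",
    "--s3_input_key_prefix", "demo/input/raw/abalone",
    "--s3_output_bucket", "example-bucket",
    "--s3_output_key_prefix", "demo/input/preprocessed/abalone",
]
args = parse_job_arguments(arguments)
input_uri = "s3://" + "/".join([args.s3_input_bucket, args.s3_input_key_prefix, "abalone.csv"])
print(input_uri)
```

Running a check like this locally is a cheap way to catch a misspelled flag before waiting for a cluster to start.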

## Validate Data Processing Results

Next, validate the output of the data preprocessing job by looking at the first five rows of the output dataset.

In [None]:
print("Top 5 rows from s3://{}/{}/train/".format(bucket, input_preprocessed_prefix_abalone))
!aws s3 cp --quiet s3://$bucket/$input_preprocessed_prefix_abalone/train/part-00000 - | head -n5
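
Each output line should hold the label followed by the assembled features, because csv_line writes rings first. Assuming three sex categories and the encoder's default of dropping the last one, that gives two one-hot values plus seven measurements, or nine features per line. The sketch below parses one line in that shape. The sample line is made up to match the expected format and is not actual job output, and the function name parse_output_line is hypothetical.

```python
def parse_output_line(line, expected_features=9):
    """Split one CSV line into (label, features), checking the feature count."""
    values = [float(v) for v in line.strip().split(",")]
    label, features = values[0], values[1:]
    if len(features) != expected_features:
        raise ValueError(
            "expected {} features, found {}".format(expected_features, len(features))
        )
    return label, features


# Made-up line in the expected shape: rings, two one-hot values, seven measurements
sample = "15.0,1.0,0.0,0.455,0.365,0.095,0.514,0.2245,0.101,0.15"
label, features = parse_output_line(sample)
print(label, features)
```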

## View the Spark UI

Next, you can view the Spark UI by running the history server locally in this notebook.
(Note: this feature only works in a local development environment with Docker installed or on a SageMaker notebook instance. It does not currently work in SageMaker Studio.)

In [None]:
spark_processor.start_history_server()

After viewing the Spark UI, you can terminate the history server before proceeding.

In [None]:
spark_processor.terminate_history_server()