# SageMaker Processing (PySpark) Example - Script Mode

### 1. Create SparkML Pre-process Script

### Bring Your Own Script 

In [1]:
%%writefile preprocess.py
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, IntegerType
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

import argparse
import time
import sys
import os

# https://spark.apache.org/docs/latest/ml-features.html


def main():
    # --------------------------------- CONSTRUCT ---------------------------------
    spark = SparkSession.builder.appName("PySparkJob").getOrCreate()
    
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_input_bucket", type=str, help="s3 input bucket")
    parser.add_argument("--s3_input_key_prefix", type=str, help="s3 input key prefix")
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    parser.add_argument("--s3_output_key_prefix", type=str, help="s3 output key prefix")
    args = parser.parse_args()
    
    # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
    spark.sparkContext._jsc.hadoopConfiguration().set("mapred.output.committer.class",
                                                      "org.apache.hadoop.mapred.FileOutputCommitter")
    # --------------------------------- CONSTRUCT ---------------------------------
    
    schema = StructType([StructField('id', IntegerType(), True), 
                     StructField('name', StringType(), True),
                     StructField('age', IntegerType(), True),
                     StructField('sex', StringType(), True),
                     StructField('weight', DoubleType(), True),
                     StructField('eye_color', StringType(), True)
                    ])
    
    # Downloading the data from S3 into a Dataframe
    # without header (IMPORTANT)
    df = spark.read.csv(('s3a://' + os.path.join(args.s3_input_bucket, 
                                                 args.s3_input_key_prefix,
                                                 'raw.csv')), header=False, schema=schema)
    sex_indexer = StringIndexer(inputCol='sex', outputCol='indexed_sex')
    sex_encoder = OneHotEncoder(inputCol='indexed_sex', outputCol='sex_vector')
    eye_color_indexer = StringIndexer(inputCol='eye_color', outputCol='indexed_eye_color')
    eye_color_encoder = OneHotEncoder(inputCol='indexed_eye_color', outputCol='eye_color_vector')
    assembler = VectorAssembler(inputCols=['age',
                                           'weight',
                                           'sex_vector',
                                           'eye_color_vector'], 
                                outputCol='features')
    scaler = MinMaxScaler(inputCol='features', outputCol='scaledFeatures')
    
    # The pipeline comprises of the steps added above
    pipeline = Pipeline(stages=[sex_indexer, sex_encoder, eye_color_indexer, eye_color_encoder, assembler, scaler])

    # This step trains the feature transformers
    model = pipeline.fit(df)

    # This step transforms the dataset with information obtained from the previous fit
    df = model.transform(df)
    
    age_udf = udf(lambda x: x[0].item(), DoubleType())
    weight_udf = udf(lambda x: x[1].item(), DoubleType())
    sex_udf = udf(lambda x: x[2].item(), DoubleType())
    blue_eye_udf = udf(lambda x: x[3].item(), DoubleType())
    black_eye_udf = udf(lambda x: x[3].item(), DoubleType())
    
    df = df.select(age_udf('scaledFeatures').alias('age'), 
               weight_udf('scaledFeatures').alias('weight'),
               sex_udf('scaledFeatures').alias('sex'),
               blue_eye_udf('scaledFeatures').alias('is_blue_eye'),
               black_eye_udf('scaledFeatures').alias('is_black_eye'),
              )
    df.show()
    
    df.write.format('csv') \
        .option('header', True) \
        .mode('overwrite') \
        .option('sep', ',') \
        .save('s3a://' + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix))
    
    
if __name__ == '__main__':
    main()

Overwriting preprocess.py


### Create a SageMaker Processing Job (PySpark)

In [2]:
from sagemaker.spark.processing import PySparkProcessor
from time import gmtime, strftime
import pandas as pd
import sagemaker
import logging

In [3]:
sagemaker_logger = logging.getLogger("sagemaker")
sagemaker_logger.setLevel(logging.INFO)
sagemaker_logger.addHandler(logging.StreamHandler())

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

prefix = 'spark-preprocess-jobs/' + timestamp_prefix
input_prefix = prefix + '/raw'
input_preprocessed_prefix = prefix + '/transformed'

In [4]:
# Uploading the training data to S3
sagemaker_session.upload_data(path='./DATA/raw.csv', bucket=bucket, key_prefix=input_prefix)

's3://sagemaker-us-east-1-892313895307/spark-preprocess-jobs/2020-12-10-00-22-37/raw/raw.csv'

In [5]:
spark_processor = PySparkProcessor(base_job_name='spark-preprocessor',
                                    framework_version='2.4',
                                    role=role,
                                    instance_count=2,
                                    instance_type='ml.r5.xlarge',
                                    max_runtime_in_seconds=1200)

spark_processor.run(submit_app='preprocess.py',
                    arguments=["--s3_input_bucket", bucket,
                                "--s3_input_key_prefix", input_prefix,
                                "--s3_output_bucket", bucket,
                                "--s3_output_key_prefix", input_preprocessed_prefix],
                    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, prefix),
                    logs=False)

Creating processing-job with name spark-preprocessor-2020-12-10-00-22-39-869



Job Name:  spark-preprocessor-2020-12-10-00-22-39-869
Inputs:  [{'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-892313895307/spark-preprocessor-2020-12-10-00-22-39-869/input/code/preprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'output-1', 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-892313895307/spark-preprocess-jobs/2020-12-10-00-22-37/spark_event_logs', 'LocalPath': '/opt/ml/processing/spark-events/', 'S3UploadMode': 'Continuous'}}]
.......................................................................!

### Evaluate the Output

In [6]:
output_s3_path = f's3://{bucket}/{input_preprocessed_prefix}/'

In [7]:
output_s3_path

's3://sagemaker-us-east-1-892313895307/spark-preprocess-jobs/2020-12-10-00-22-37/transformed/'

In [8]:
!aws s3 cp {output_s3_path} ./DATA/ --recursive 

Completed 187 Bytes/187 Bytes (3.6 KiB/s) with 2 file(s) remainingdownload: s3://sagemaker-us-east-1-892313895307/spark-preprocess-jobs/2020-12-10-00-22-37/transformed/_SUCCESS to DATA/_SUCCESS
Completed 187 Bytes/187 Bytes (3.6 KiB/s) with 1 file(s) remainingdownload: s3://sagemaker-us-east-1-892313895307/spark-preprocess-jobs/2020-12-10-00-22-37/transformed/part-00000-6ee6f5b3-0fae-4510-ab77-b674c70a98bc-c000.csv to DATA/part-00000-6ee6f5b3-0fae-4510-ab77-b674c70a98bc-c000.csv


<b>Note:</b> Use the downloaded part file above to create the dataframe below

In [9]:
transformed_df = pd.read_csv('./DATA/part-00000-6ee6f5b3-0fae-4510-ab77-b674c70a98bc-c000.csv')
transformed_df

Unnamed: 0,age,weight,sex,is_blue_eye,is_black_eye
0,0.56,0.773279,0.0,0.0,0.0
1,0.6,0.567684,1.0,1.0,1.0
2,0.0,0.0,1.0,0.0,0.0
3,1.0,1.0,0.0,1.0,1.0
4,0.36,0.765063,1.0,0.0,0.0


### Start the History Server to access Spark UI

In [None]:
spark_processor.start_history_server()

<b>Note:</b> Use the URL above to access the Spark UI