In [None]:
!pip show sagemaker

In [None]:
import logging
import sagemaker
from time import gmtime, strftime

sagemaker_logger = logging.getLogger("sagemaker")
sagemaker_logger.setLevel(logging.INFO)
sagemaker_logger.addHandler(logging.StreamHandler())

sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()
role = sagemaker.get_execution_role()
role

In [None]:
# Fetch the dataset from the SageMaker bucket
import boto3

s3 = boto3.client("s3")
s3.download_file(
    f"sagemaker-sample-files", "datasets/tabular/uci_abalone/abalone.csv", "./data/abalone.csv"
)

In [None]:
%%writefile ./code/preprocess.py

from __future__ import print_function
from __future__ import unicode_literals

import argparse
import csv
import os
import shutil
import sys
import time

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
    VectorAssembler,
    VectorIndexer,
)

from pyspark.sql.functions import *
from pyspark.sql.types import (
    DoubleType,
    StringType,
    StructField,
    StructType
)

def csv_line(data):
    r = ",".join(str(d) for d in data[1])
    return str(data[0]) + "," + r

def main():
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_input_bucket", type=str, help = "s3 inupt bucket")
    parser.add_argument("--s3_input_key_prefix", type=str, help="s3 input key prefix")
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    parser.add_argument("--s3_output_key_prefix", type=str, help="s3 output key prefix")
    
    args = parser.parse_args()
    
    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()
    
    # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
    spark.sparkContext._jsc.hadoopConfiguration().set(
        "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
    )
    
    # Defining the schema corresponding to the input data. The input data does not contain the headers
    schema = StructType(
        [
            StructField("sex", StringType(), True),
            StructField("length", DoubleType(), True),
            StructField("diameter", DoubleType(), True),
            StructField("height", DoubleType(), True),
            StructField("whole_weight", DoubleType(), True),
            StructField("shucked_weight", DoubleType(), True),
            StructField("viscera_weight", DoubleType(), True),
            StructField("shell_weight", DoubleType(), True),
            StructField("rings", DoubleType(), True),
        ]
    )
    
    # Downloading the data from S3 into a Dataframe
    total_df = spark.read.csv(
        ("s3://" + os.path.join(args.s3_input_bucket, args.s3_input_key_prefix, "abalone.csv")),
        header=False,
        schema=schema,
    )
    
    # StringIndexer on the sex column which has categorical value
    sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex")
    
    # one-hot-encoding is being performed on the string-indexed sex column (indexed_sex)
    sex_encoder = OneHotEncoder(inputCol="indexed_sex", outputCol="sex_vec")
    
    # vector-assembler will bring all the features to a 1D vector for us to save easily into CSV format
    assembler = VectorAssembler(
        inputCols=[
            "sex_vec",
            "length",
            "diameter",
            "height",
            "whole_weight",
            "shucked_weight",
            "viscera_weight",
            "shell_weight",
        ],
        outputCol="features",
    )
    
    # The pipeline is comprised of the steps added above
    pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])

    # This step trains the feature transformers
    model = pipeline.fit(total_df)

    # This step transforms the dataset with information obtained from the previous fit
    transformed_total_df = model.transform(total_df)

    # Split the overall dataset into 80-20 training and validation
    (train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2])

    # Convert the train dataframe to RDD to save in CSV format and upload to S3
    train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))
    train_lines = train_rdd.map(csv_line)
    train_lines.saveAsTextFile(
        "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "train")
    )

    # Convert the validation dataframe to RDD to save in CSV format and upload to S3
    validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))
    validation_lines = validation_rdd.map(csv_line)
    validation_lines.saveAsTextFile(
        "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "validation")
    )
    
if __name__ == "__main__":
    main()
    

In [None]:
from sagemaker.spark.processing import PySparkProcessor

# Upload the raw input dataset to a unique S3 location
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
prefix = "sagemaker/spark-preprocess-demo/{}".format(timestamp_prefix)
input_prefix_abalone = "{}/input/raw/abalone".format(prefix)
input_preprocessed_prefix_abalone = "{}/input/preprocessed/abalone".format(prefix)

sagemaker_session.upload_data(
    path="./data/abalone.csv", bucket=bucket, key_prefix=input_prefix_abalone
)

# Run the processing job
spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",
    role=role,
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
)

spark_processor.run(
    submit_app="./code/preprocess.py",
    arguments=[
        "--s3_input_bucket",
        bucket,
        "--s3_input_key_prefix",
        input_prefix_abalone,
        "--s3_output_bucket",
        bucket,
        "--s3_output_key_prefix",
        input_preprocessed_prefix_abalone,
    ],
    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, prefix),
    logs=False,
)

In [None]:
print("Top 5 rows from s3://{}/{}/train/".format(bucket, input_preprocessed_prefix_abalone))
!aws s3 cp --quiet s3://$bucket/$input_preprocessed_prefix_abalone/train/part-00000 - | head -n5

In [None]:
spark_processor.start_history_server()

In [None]:
spark_processor.terminate_history_server()

In [None]:
%%writefile ./code/hello_py_spark_app.py
import argparse
import time

# Import local module to test spark-submit--py-files dependencies
import hello_py_spark_udfs as udfs
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import time

if __name__ == "__main__":
    print("Hello World, this is PySpark!")

    parser = argparse.ArgumentParser(description="inputs and outputs")
    parser.add_argument("--input", type=str, help="path to input data")
    parser.add_argument("--output", required=False, type=str, help="path to output data")
    args = parser.parse_args()
    spark = SparkSession.builder.appName("SparkTestApp").getOrCreate()
    sqlContext = SQLContext(spark.sparkContext)

    # Load test data set
    inputPath = args.input
    outputPath = args.output
    salesDF = spark.read.json(inputPath)
    salesDF.printSchema()

    salesDF.createOrReplaceTempView("sales")

    # Define a UDF that doubles an integer column
    # The UDF function is imported from local module to test spark-submit--py-files dependencies
    double_udf_int = udf(udfs.double_x, IntegerType())

    # Save transformed data set to disk
    salesDF.select("date", "sale", double_udf_int("sale").alias("sale_double")).write.json(
        outputPath
    )

In [None]:
%%writefile ./code/hello_py_spark_udfs.py
def double_x(x):
    return x + x

In [None]:
# Define job input/output URIs
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
prefix = "sagemaker/spark-preprocess-demo/{}".format(timestamp_prefix)
input_prefix_sales = "{}/input/sales".format(prefix)
output_prefix_sales = "{}/output/sales".format(prefix)
input_s3_uri = "s3://{}/{}".format(bucket, input_prefix_sales)
output_s3_uri = "s3://{}/{}".format(bucket, output_prefix_sales)

sagemaker_session.upload_data(
    path="./data/data.jsonl", bucket=bucket, key_prefix=input_prefix_sales
)

spark_processor = PySparkProcessor(
    base_job_name="sm-spark-udfs",
    framework_version="3.1",
    role=role,
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
)

spark_processor.run(
    submit_app="./code/hello_py_spark_app.py",
    submit_py_files=["./code/hello_py_spark_udfs.py"],
    arguments=["--input", input_s3_uri, "--output", output_s3_uri],
    logs=False,
)