## End to End SparkML on Sagemaker

In [1]:
import boto3
import sagemaker
import sagemaker_pyspark

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import (VectorAssembler, StringIndexer,
                               OneHotEncoder)
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from sagemaker_pyspark import IAMRole
from sagemaker_pyspark.algorithms import XGBoostSageMakerEstimator
from sagemaker_pyspark import SageMakerResourceCleanup
from sagemaker import get_execution_role



In [2]:
conn = boto3.client("s3")
bucket = "sparkml-s3"

### S3 connection

In [3]:


role = get_execution_role()

# Configure Spark to use the SageMaker Spark dependency jars
jars = sagemaker_pyspark.classpath_jars()

classpath = ":".join(sagemaker_pyspark.classpath_jars())

## create sparksession
spark = (
    SparkSession.builder.config("spark.driver.extraClassPath", classpath)
    .master("local[*]")
    .getOrCreate()
)


region = boto3.Session().region_name

## sc._jsc is the Java Spark Context which is a proxy into the SparkContext in that JVM
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", 
                                     "s3.{}.amazonaws.com".format(region))


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/07/12 07:05:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Read data

In [4]:
# reading a data from S3 using spark
data_sdf = (spark
            .read
            .option("header", True)
            .csv(f"s3a://{bucket}/data/train.csv")
)

23/07/12 07:05:54 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


                                                                                

### Data Display

In [5]:
data_sdf.limit(5).show()

+-----------+----------+---------+-----+-----------+----+-----+-----------+---------+------------+------+------+-----------------+-----------+
|PassengerId|HomePlanet|CryoSleep|Cabin|Destination| Age|  VIP|RoomService|FoodCourt|ShoppingMall|   Spa|VRDeck|             Name|Transported|
+-----------+----------+---------+-----+-----------+----+-----+-----------+---------+------------+------+------+-----------------+-----------+
|    0001_01|    Europa|    False|B/0/P|TRAPPIST-1e|39.0|False|        0.0|      0.0|         0.0|   0.0|   0.0|  Maham Ofracculy|      False|
|    0002_01|     Earth|    False|F/0/S|TRAPPIST-1e|24.0|False|      109.0|      9.0|        25.0| 549.0|  44.0|     Juanna Vines|       True|
|    0003_01|    Europa|    False|A/0/S|TRAPPIST-1e|58.0| True|       43.0|   3576.0|         0.0|6715.0|  49.0|    Altark Susent|      False|
|    0003_02|    Europa|    False|A/0/S|TRAPPIST-1e|33.0|False|        0.0|   1283.0|       371.0|3329.0| 193.0|     Solam Susent|      False|

### Check Min/Max of Numeric values

In [6]:
data_sdf.select(F.min("Age").alias("min_age"),
             F.max("Age").alias("max_age"),
             F.min("RoomService").alias("min_room_service"),
             F.max("RoomService").alias("max_room_service"),
             F.min("FoodCourt").alias("min_food_court"),
             F.max("FoodCourt").alias("max_food_court"),
             F.min("ShoppingMall").alias("min_shopping_mall"),
             F.max("ShoppingMall").alias("max_shopping_mall"),
             F.min("Spa").alias("min_spa"),
             F.max("Spa").alias("max_spa"),
             F.min("VRDeck").alias("min_vr_deck"),
             F.max("VRDeck").alias("max_vr_deck"),
            )\
        .show()

[Stage 2:>                                                          (0 + 1) / 1]

+-------+-------+----------------+----------------+--------------+--------------+-----------------+-----------------+-------+-------+-----------+-----------+
|min_age|max_age|min_room_service|max_room_service|min_food_court|max_food_court|min_shopping_mall|max_shopping_mall|min_spa|max_spa|min_vr_deck|max_vr_deck|
+-------+-------+----------------+----------------+--------------+--------------+-----------------+-----------------+-------+-------+-----------+-----------+
|    0.0|    9.0|             0.0|           999.0|           0.0|         999.0|              0.0|            994.0|    0.0|  998.0|        0.0|      998.0|
+-------+-------+----------------+----------------+--------------+--------------+-----------------+-----------------+-------+-------+-----------+-----------+



                                                                                

### Identify missing value

In [7]:
data_sdf.select([F.count(F.when(F.col(c).contains('None') | \
                            F.col(c).contains('NULL') | \
                            (F.col(c) == '' ) | \
                            F.col(c).isNull() | \
                            F.isnan(c), c 
                           )).alias(c)
                    for c in data_sdf.columns]).show()

[Stage 5:>                                                          (0 + 1) / 1]

+-----------+----------+---------+-----+-----------+---+---+-----------+---------+------------+---+------+----+-----------+
|PassengerId|HomePlanet|CryoSleep|Cabin|Destination|Age|VIP|RoomService|FoodCourt|ShoppingMall|Spa|VRDeck|Name|Transported|
+-----------+----------+---------+-----+-----------+---+---+-----------+---------+------------+---+------+----+-----------+
|          0|       201|      217|  199|        182|179|203|        181|      183|         208|183|   188| 200|          0|
+-----------+----------+---------+-----+-----------+---+---+-----------+---------+------------+---+------+----+-----------+



                                                                                

### Remove Missing values

In [33]:
clean_sdf = data_sdf.dropna(how="any", subset=["CryoSleep", "HomePlanet", "Cabin",
                                              "Destination", "Age", "VIP", "RoomService",
                                              "FoodCourt", "ShoppingMall", "Spa", "VRDeck",
                                              "Name", "Transported"])\
                    .withColumn("Age", F.col("Age").cast("int"))\
                    .withColumn("RoomService", F.col("RoomService").cast("int"))

In [34]:
clean_sdf.select([F.count(F.when(F.col(c).contains('None') | \
                            F.col(c).contains('NULL') | \
                            (F.col(c) == '' ) | \
                            F.col(c).isNull() | \
                            F.isnan(c), c 
                           )).alias(c)
                    for c in data_sdf.columns]).show()

+-----------+----------+---------+-----+-----------+---+---+-----------+---------+------------+---+------+----+-----------+
|PassengerId|HomePlanet|CryoSleep|Cabin|Destination|Age|VIP|RoomService|FoodCourt|ShoppingMall|Spa|VRDeck|Name|Transported|
+-----------+----------+---------+-----+-----------+---+---+-----------+---------+------------+---+------+----+-----------+
|          0|         0|        0|    0|          0|  0|  0|          0|        0|           0|  0|     0|   0|          0|
+-----------+----------+---------+-----+-----------+---+---+-----------+---------+------------+---+------+----+-----------+



No Missing values reported in clean dataframe

In [35]:
clean_sdf.limit(2).show()

+-----------+----------+---------+-----+-----------+---+-----+-----------+---------+------------+-----+------+---------------+-----------+
|PassengerId|HomePlanet|CryoSleep|Cabin|Destination|Age|  VIP|RoomService|FoodCourt|ShoppingMall|  Spa|VRDeck|           Name|Transported|
+-----------+----------+---------+-----+-----------+---+-----+-----------+---------+------------+-----+------+---------------+-----------+
|    0001_01|    Europa|    False|B/0/P|TRAPPIST-1e| 39|False|          0|      0.0|         0.0|  0.0|   0.0|Maham Ofracculy|      False|
|    0002_01|     Earth|    False|F/0/S|TRAPPIST-1e| 24|False|        109|      9.0|        25.0|549.0|  44.0|   Juanna Vines|       True|
+-----------+----------+---------+-----+-----------+---+-----+-----------+---------+------------+-----+------+---------------+-----------+



### Structurize Data

While reading csv we didn't specify schema. So, now we have to create a proper data type for columns based on their data

In [37]:
clean_sdf.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- HomePlanet: string (nullable = true)
 |-- CryoSleep: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- VIP: string (nullable = true)
 |-- RoomService: integer (nullable = true)
 |-- FoodCourt: string (nullable = true)
 |-- ShoppingMall: string (nullable = true)
 |-- Spa: string (nullable = true)
 |-- VRDeck: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Transported: string (nullable = true)



In [38]:
clean_sdf = (clean_sdf
                 .withColumn("CryoSleep", 
                         F.when(F.col("CryoSleep") == "True", 1.0)
                                .otherwise(0.0))
                 .withColumn("Age", F.col("Age").cast("double"))
                 .withColumn("VIP", 
                             F.when(F.col("VIP") == "True", 1.0)
                                    .otherwise(0.0))
                 .withColumn("RoomService", F.col("RoomService").cast("double"))
                 .withColumn("FoodCourt", F.col("FoodCourt").cast("double"))
                 .withColumn("ShoppingMall", F.col("ShoppingMall").cast("double"))
                 .withColumn("Spa", F.col("Spa").cast("double"))
                 .withColumn("VRDeck", F.col("VRDeck").cast("double"))
                 .withColumn("Transported", 
                                 F.when(F.col("Transported") == "True", 1.0)
                                        .otherwise(0.0))
)

In [39]:
clean_sdf.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- HomePlanet: string (nullable = true)
 |-- CryoSleep: double (nullable = false)
 |-- Cabin: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- VIP: double (nullable = false)
 |-- RoomService: double (nullable = true)
 |-- FoodCourt: double (nullable = true)
 |-- ShoppingMall: double (nullable = true)
 |-- Spa: double (nullable = true)
 |-- VRDeck: double (nullable = true)
 |-- Name: string (nullable = true)
 |-- Transported: double (nullable = false)



### Dop extra columns
In this experiment, we are going to use one hot encoder for categorical columns. We are interested in columns with low cardinality. `Cabin` has high cardinality this is why we are dropping here for this experiment

In [41]:
clean_sdf = clean_sdf.drop("Cabin", "Name")

### Train/Test Split

In [42]:
train_sdf, test_sdf = clean_sdf.randomSplit([0.8, 0.2],
                                            seed=4)

### Convert data to spark required format

In [43]:
clean_sdf.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- HomePlanet: string (nullable = true)
 |-- CryoSleep: double (nullable = false)
 |-- Destination: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- VIP: double (nullable = false)
 |-- RoomService: double (nullable = true)
 |-- FoodCourt: double (nullable = true)
 |-- ShoppingMall: double (nullable = true)
 |-- Spa: double (nullable = true)
 |-- VRDeck: double (nullable = true)
 |-- Transported: double (nullable = false)



SparkML expects two columns 1. features (in vector form), 2. label

#### Handling Categorical Columns

In [44]:
# categorical_cols = [field for (field, dataType) in clean_sdf.dtypes if dataType == "string"]
# categorical_cols.remove("PassengerId")
# index_output_cols = [x + "_Index" for x in categorical_cols]
# ohe_output_cols = [x + "_OHE" for x in categorical_cols]

# string_indexer = StringIndexer(inputCols=categorical_cols,  outputCols=index_output_cols, handleInvalid="skip")

# s_i_model = string_indexer.fit(clean_sdf)


# transform_sdf = s_i_model.transform(clean_sdf)

# ohe_encoder = OneHotEncoder(inputCols=index_output_cols, outputCols=ohe_output_cols)
# ohe_model = ohe_encoder.fit(transform_sdf)
# transform_sdf = ohe_model.transform(transform_sdf)


In [45]:
categorical_cols = [field for (field, dataType) in clean_sdf.dtypes if dataType == "string"]
categorical_cols.remove("PassengerId")
index_output_cols = [x + "_Index" for x in categorical_cols]
ohe_output_cols = [x + "_OHE" for x in categorical_cols]

string_indexer = StringIndexer(inputCols=categorical_cols,  outputCols=index_output_cols, handleInvalid="skip")

s_i_model = string_indexer.fit(clean_sdf)


train_sdf = s_i_model.transform(train_sdf)

ohe_encoder = OneHotEncoder(inputCols=index_output_cols, outputCols=ohe_output_cols)
ohe_model = ohe_encoder.fit(train_sdf)

train_sdf = ohe_model.transform(train_sdf)


#### Handling Numerical Columns

In [48]:
numeric_cols = [field for (field, dataType) in clean_sdf.dtypes if dataType == "double"]
numeric_cols.remove("Transported") ## remove label column

In [49]:
numeric_cols

['CryoSleep',
 'Age',
 'VIP',
 'RoomService',
 'FoodCourt',
 'ShoppingMall',
 'Spa',
 'VRDeck']

In [50]:
assembler_inputs = ohe_output_cols + numeric_cols
vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")


In [51]:
train_sdf = vec_assembler.transform(train_sdf)

In [52]:
train_sdf = train_sdf.select(F.col("Transported").alias("label"), 
                             "features")
train_sdf

DataFrame[label: double, features: vector]

### Model Development & Deployment

In [58]:
xgboost_estimator = XGBoostSageMakerEstimator(
    sagemakerRole=IAMRole(role),
    trainingInstanceType="ml.m4.xlarge",
    trainingInstanceCount=1,
    endpointInstanceType="ml.m4.xlarge",
    endpointInitialInstanceCount=1
)

In [59]:
xgboost_estimator.setNumRound(10)

In [60]:
model = xgboost_estimator.fit(train_sdf)

23/07/12 07:16:03 WARN CredentialsLegacyConfigLocationProvider: Found the legacy config profiles file at [/home/ec2-user/.aws/config]. Please move it to the latest default location [~/.aws/credentials].
23/07/12 07:16:04 WARN CredentialsLegacyConfigLocationProvider: Found the legacy config profiles file at [/home/ec2-user/.aws/config]. Please move it to the latest default location [~/.aws/credentials].
23/07/12 07:16:04 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
23/07/12 07:16:05 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.


                                                                                

In [54]:
test_sdf = s_i_model.transform(test_sdf)
test_sdf = ohe_model.transform(test_sdf)

In [55]:
test_sdf = vec_assembler.transform(test_sdf)

test_sdf = test_sdf.select(F.col("Transported").alias("label"), 
                             "features")
test_sdf

DataFrame[label: double, features: vector]

#### Inferencing

In [62]:
pred_sdf = model.transform(test_sdf)

#### Evauation

In [69]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction",
                                          labelCol="label")



In [85]:
evaluator.evaluate(pred_sdf, {evaluator.metricName: "areaUnderPR"})

                                                                                

0.893227109013756

In [86]:
evaluator.evaluate(pred_sdf, {evaluator.metricName: "areaUnderROC"})

                                                                                

0.8804688975022495

### Delete endpoint

In [None]:
resource_cleanup = SageMakerResourceCleanup(model.sagemakerClient)
resource_cleanup.deleteResources(model.getCreatedResources())