# 2 - Preprocessing <a class="anchor" id="top"></a>
* [Introduction](#intro)
* [Setup](#setup)
* [Loading the data](#load-data)
* [Feature engineering](#ft-eng)
    * [Target variable](#target)
    * [Categorical variables](#cat)
    * [Continuous variables](#continuous)
* [Train-test split](#train-test)
* [Define and fit pipeline](#pipeline)
* [Transform and write data](#write-s3)
* [Serialize and store pipelines](#store-pipeline)

## Introduction <a class="anchor" id="intro"></a>
In this section, we preform all neccesary data operations to prepare our data for use in models.
This includes filtering, feature subsetting, feature engineering, cleaning, and finally exporting to S3.

## Setup <a class="anchor" id="setup"></a>
First, we must import relevant Spark modules as well as libraries for statistical analysis and visualizations.
Note that will also start the Spark application that creates the `SparkSession` and sets it to the `spark` variable.

In [None]:
%%cleanup -f

In [51]:
import pyspark
import pyspark.ml as ml
import pyspark.sql as sql
import pyspark.sql.types as types
import pyspark.sql.functions as F

import boto3
import mleap
import zipfile
import tarfile
import matplotlib.pyplot as plt

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Loading the data <a class="anchor" id="load-data"></a>

Now, we load the airline data from S3.
To do this, we must first get the bucket name where the data is stored.
This variable is stored on the local Sagemaker notebook instance during the `DevEnvironment` stack creation 
and must be explicitly passed to the Spark cluster.

Additionally, the data is also subsetted and filter before preforming any feature engineering.
This is to reduce the number of features we are working with 
and ensure that the features we have are meaningful.

In [2]:
%%local
import json
with open("/home/ec2-user/.aiml-bb/stack-data.json", "r") as f:
    data = json.load(f)
    data_bucket = data["data_bucket"]
    model_bucket = data["model_bucket"]

In [3]:
%%send_to_spark -i data_bucket -t str

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Successfully passed 'data_bucket' as 'data_bucket' to Spark kernel

In [4]:
%%send_to_spark -i model_bucket -t str

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Successfully passed 'model_bucket' as 'model_bucket' to Spark kernel

In [32]:
# Subset ETL output to only relevant columns for model input.
join_df = spark.read.parquet(f"s3://{data_bucket}/etl_output/joined_airline_weather_data/")

# Work on a subset of the data that reflects the input schema 
# we'd like our endpoint to recieve.
input_df = (
    join_df
    # Drop all other NaN values.
    .dropna(
        subset=["origin", "dest", "origin_station_id", "dest_station_id"]
    )
    # Fill NaN values where neccesary.
    .na.fill(
        0.0,
        subset=[
            "arr_delay",
            "origin_prcp", "origin_snow", "origin_snwd",
            "dest_prcp", "dest_snow", "dest_snwd"
        ]
    )
    # Make feature selection.
    .select(
        "arr_delay", "cancelled", "diverted",
        "op_carrier",
        "origin", "origin_latitude", "origin_longitude",
        "dest", "dest_latitude", "dest_longitude",
        "origin_tmax", "origin_tmin", 
        "origin_prcp", "origin_snow", "origin_snwd",
        "dest_tmax", "dest_tmin", 
        "dest_prcp", "dest_snow", "dest_snwd"
    )
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
%%pretty
input_df.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

arr_delay,cancelled,diverted,op_carrier,origin,origin_latitude,origin_longitude,dest,dest_latitude,dest_longitude,origin_tmax,origin_tmin,origin_prcp,origin_snow,origin_snwd,dest_tmax,dest_tmin,dest_prcp,dest_snow,dest_snwd
-9.0,0.0,0.0,EV,ATL,33.6367,-84.428101,ABE,40.652099609375,-75.44080352783203,178.0,94.0,0.0,0.0,0.0,172.0,133.0,81.0,0.0,0.0
213.0,0.0,0.0,EV,ATL,33.6367,-84.428101,ABE,40.652099609375,-75.44080352783203,244.0,194.0,462.0,0.0,0.0,289.0,228.0,13.0,0.0,0.0
73.0,0.0,0.0,EV,ATL,33.6367,-84.428101,ABE,40.652099609375,-75.44080352783203,244.0,194.0,462.0,0.0,0.0,289.0,228.0,13.0,0.0,0.0
48.0,0.0,0.0,EV,ATL,33.6367,-84.428101,ABE,40.652099609375,-75.44080352783203,244.0,194.0,462.0,0.0,0.0,289.0,228.0,13.0,0.0,0.0
106.0,0.0,0.0,9E,ATL,33.6367,-84.428101,ABE,40.652099609375,-75.44080352783203,106.0,50.0,254.0,0.0,0.0,11.0,-21.0,58.0,0.0,30.0


## Feature engineering <a class="anchor" id="ft-eng"></a>
We now define the feature engineering steps that will make up our preprocessing `Pipeline`.
Note that all steps must be either a Spark `Transformer` or `Estimator` to be able to be serialized by MLeap.

### Target variable <a class="anchor" id="target"></a>
We will reduce the regression problem of determining arrival delay into the binary classfication problem or detmering if a delay (or cancellation) will occur.
Because this step will not be a part of our inference pipeline, we do not need to wrap it in a Spark `Transformer`.

In [34]:
# Define the new categorical column we want to predict.
# Strings are used here so we can postprocess the binary 
# classification to readable results. 
input_df = (
    input_df
    .withColumn(
        "delay_status", 
        F.when(
            (F.col("arr_delay") > 15) 
                | (F.col("cancelled") == 1) 
                | (F.col("diverted") == 1), 
            "Delayed"
        )
        .otherwise("OnTime")
    )
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
# Now encode target and apply to input data.
target_indexer = ml.feature.StringIndexer(inputCol="delay_status", outputCol="target")
target_indexer_model = target_indexer.fit(input_df)
input_df = target_indexer_model.transform(input_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
# Define a decoder to be used in postprocessing.
target_idx_decoder = ml.feature.IndexToString(inputCol="target", outputCol="delay_status")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
%%pretty
input_df.select("arr_delay", "cancelled", "diverted", "delay_status", "target").show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

arr_delay,cancelled,diverted,delay_status,target
39.0,0.0,0.0,Delayed,1.0
-8.0,0.0,0.0,OnTime,0.0
-8.0,0.0,0.0,OnTime,0.0
-4.0,0.0,0.0,OnTime,0.0
-21.0,0.0,0.0,OnTime,0.0


### Categorical variables <a class="anchor" id="cat"></a>
Here, we will encode the relevant categorical variables to numeric types. 
For this data set, this means we must encode the carrier as well as the origin and destination airport information.

In [11]:
carrier_indexer = ml.feature.StringIndexer(inputCol="op_carrier", outputCol="carrier_idx")
carrier_encoder = ml.feature.OneHotEncoderEstimator(inputCols=["carrier_idx"], outputCols=["carrier_ohe"])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Because there are ~300 airports, and we need to account for both origin and destination,
one hot encoding would create too many features.
Airports do appear in geographic clusters though, for example around urban locations.
So, to encode airports, we'll cluster the latitudes and longitudes using K-Means.
Note that we will only preform this search on the origin airports because of the similarities in the distribution of origin and destination airports.

In [45]:
# Vectorize geolocation for use in clustering.
origin_geo_assembler = ml.feature.VectorAssembler(
    inputCols=["origin_latitude", "origin_longitude"], 
    outputCol="origin_geo_features",
    handleInvalid="keep"
)
kmeans_eval_df = origin_geo_assembler.transform(input_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Search for an optimal K.
# silhouette_scores = []
evaluator = ml.evaluation.ClusteringEvaluator(
    metricName="silhouette", 
    distanceMeasure="squaredEuclidean",
    featuresCol="origin_geo_features",
    predictionCol="origin_geo_cluster"
)

for k in range(20, 30):
    candidate_kmeans = ml.clustering.KMeans(
        k=k,
        featuresCol="origin_geo_features",
        predictionCol="origin_geo_cluster"
    )
    candidate_kmeans_model = candidate_kmeans.fit(kmeans_eval_df)
    transformed_kmeans_eval_df = candidate_kmeans_model.transform(kmeans_eval_df)

    evaluation_score = evaluator.evaluate(transformed_kmeans_eval_df)
    silhouette_scores.append(evaluation_score)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [60]:
silhouette_scores

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[0.7935991349235418, 0.7951541999640632, 0.7962580427641034, 0.6208395707571753, 0.5181488912024634, 0.581405617511063, 0.6719085293109444, 0.6588834486946222, 0.7117989190279977, 0.7185847094251225, 0.5679515291075474, 0.602948717944024, 0.6165610818226945, 0.6303616774089789, 0.6488521893517488, 0.685676735367946, 0.7330370289036479, 0.753958241456658]

In [None]:
# Plot model results.
fig, ax = plt.subplots()
ax.plot(range(2, 20), silhouette_scores)
ax.set_xlabel("Number of clusters")
ax.set_ylabel("Silhouette score")
ax.set_title("optimal clustering parameters for airport geolocations")
%matplot plt

While it isn't e can see that 6 is near the elbow of the plot and will be our choice for K.

In [None]:
# Create finalized assemblers and clusterers for the 
# preprocessing pipeline.
origin_geo_assembler = ml.feature.VectorAssembler(
    inputCols=["origin_latitude", "origin_longitude"], 
    outputCol="origin_geo_features",
    handleInvalid="keep"
)
dest_geo_assembler = ml.feature.VectorAssembler(
    inputCols=["dest_latitude", "dest_longitude"], 
    outputCol="dest_geo_features",
    handleInvalid="keep"
)
origin_geo_kmeans = ml.clustering.KMeans(
        k=6,
        featuresCol="origin_geo_features",
        predictionCol="origin_geo_cluster"
)
dest_geo_kmeans = ml.clustering.KMeans(
        k=6,
        featuresCol="dest_geo_features",
        predictionCol="dest_geo_cluster"
)

### Continuous variables <a class="anchor" id="continuous"></a>
The continuous variables in our dataset include only the weather data. 
We will be using standardization for all metrics, because weather data is subject to meaningful outliers. 

In [14]:
# Collect all weather columns so standardization can 
# be preformed on all weather features at once.
weather_cols = [
    "origin_tmax", "origin_tmin", "origin_prcp", "origin_snow", "origin_snwd",
    "dest_tmax", "dest_tmin", "dest_prcp", "dest_snow", "dest_snwd"
]
weather_col_assembler = ml.feature.VectorAssembler(
    inputCols=weather_cols, 
    outputCol="weather_features",
    handleInvalid="keep"
)
weather_std_scalar = ml.feature.StandardScaler(
    inputCol="weather_features", 
    outputCol="weather_features_scaled"
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Define and fit pipeline <a class="anchor" id="pipeline"></a>

In [15]:
# Collect all the generated features into a single vector.
# THis will be the last stage in the pipeline.
feature_assembler = ml.feature.VectorAssembler(
    inputCols=[
        "carrier_ohe",
        "origin_freq",
        "dest_freq",
        "origin_state_ohe",
        "dest_state_ohe",
        "weather_features_scaled"
    ],
    outputCol="features"
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
# Define Pipeline object.
preprocess_pipeline = ml.Pipeline(
    stages=[
        carrier_indexer,
        carrier_encoder,
        origin_geo_assembler,
        dest_geo_assembler,
        origin_geo_kmeans,
        dest_geo_kmeans,
        weather_col_assembler,
        weather_std_scalar,
        feature_assembler,
    ]
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# Train the preprocess model and transform the input data.
# Note that the transformed data is cached to avoid recomputation 
# when writing partitions (see below).
preprocess_model = preprocess_pipeline.fit(input_df)
transformed_input_df = preprocess_model.transform(input_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Partition and write data <a class="anchor" id="write-s3"></a>
Partitition input data into train, validation, and test sets and write out files to CSVs in S3 for training models.

In [22]:
# Spliting in train and test set. Note that this operation 
# sorts the data set.
(train_df, validation_df, test_df) = transformed_input_df.randomSplit([0.75, 0.15, 0.10])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
%%pretty
train_df.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

arr_delay,cancelled,diverted,op_carrier,origin,origin_state_abr,dest,dest_state_abr,origin_tmax,origin_tmin,origin_prcp,origin_snow,origin_snwd,dest_tmax,dest_tmin,dest_prcp,dest_snow,dest_snwd,delay_status,target,carrier_idx,carrier_ohe,origin_freq,dest_freq,origin_state_idx,dest_state_idx,origin_state_ohe,dest_state_ohe,weather_features,weather_features_scaled,features
-63.0,0.0,0.0,OO,SFO,CA,GEG,WA,183.0,106.0,0.0,0.0,0.0,72.0,-5.0,28.0,8.0,0.0,OnTime,0.0,3.0,"(22,[3],[1.0])",0.0258478457994818,0.001903570441812...,0.0,12.0,"(52,[0],[1.0])","(52,[12],[1.0])","[183.0,106.0,0.0,...",[1.76520170281059...,"(138,[3,22,23,24,..."
-58.0,0.0,0.0,DL,MSP,MN,GEG,WA,-10.0,-49.0,0.0,0.0,50.0,-5.0,-43.0,0.0,0.0,150.0,OnTime,0.0,1.0,"(22,[1],[1.0])",0.0211605881421423,0.001903570441812...,13.0,12.0,"(52,[13],[1.0])","(52,[12],[1.0])","[-10.0,-49.0,0.0,...",[-0.0964591094432...,"(138,[1,22,23,37,..."
-56.0,0.0,0.0,DL,MSP,MN,GEG,WA,28.0,11.0,23.0,0.0,30.0,61.0,-32.0,0.0,0.0,150.0,OnTime,0.0,1.0,"(22,[1],[1.0])",0.0211605881421423,0.001903570441812...,13.0,12.0,"(52,[13],[1.0])","(52,[12],[1.0])","[28.0,11.0,23.0,0...",[0.27008550644096...,"(138,[1,22,23,37,..."
-55.0,0.0,0.0,UA,ORD,IL,GEG,WA,44.0,-5.0,0.0,0.0,0.0,83.0,6.0,91.0,0.0,0.0,OnTime,0.0,4.0,"(22,[4],[1.0])",0.0475189969489895,0.001903570441812...,3.0,12.0,"(52,[3],[1.0])","(52,[12],[1.0])","(10,[0,1,5,6,7],[...","(10,[0,1,5,6,7],[...","(138,[4,22,23,27,..."
-54.0,0.0,0.0,DL,MSP,MN,GEG,WA,-10.0,-49.0,0.0,0.0,50.0,-5.0,-43.0,0.0,0.0,150.0,OnTime,0.0,1.0,"(22,[1],[1.0])",0.0211605881421423,0.001903570441812...,13.0,12.0,"(52,[13],[1.0])","(52,[12],[1.0])","[-10.0,-49.0,0.0,...",[-0.0964591094432...,"(138,[1,22,23,37,..."


The writing operation takes a while. 
It's recommended to increase the number of core nodes in the cluster at this point, if possible.
Alternatively, this code can be run in a Glue Job that transforms the data,
writes the partitions to S3, 
then serializes and saves the processing models.

In [27]:
# Save transformed training data to CSV in S3 by converting to RDD.
train_lines = train_df.rdd.map(
    lambda row: str(row.target) + "," + ",".join(map(str, row.features.toArray()))
)
train_lines.saveAsTextFile(f"s3a://{model_bucket}/preprocessing_output/train/")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
# Save transformed training data to CSV in S3 by converting to RDD.
validation_lines = validation_df.rdd.map(
    lambda row: str(row.target) + "," + ",".join(map(str, row.features.toArray()))
)
validation_lines.saveAsTextFile(f"s3a://{model_bucket}/preprocessing_output/validation/")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# Repeat the same saving method for the test data.
test_lines = test_df.rdd.map(
    lambda row: str(row.target) + "," + ",".join(map(str, row.features.toArray()))
)
test_lines.saveAsTextFile(f"s3a://{model_bucket}/preprocessing_output/test/")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Serialize and store pipelines <a class="anchor" id="store-pipeline"></a>

In [28]:
import boto3
from mleap.pyspark.spark_support import SimpleSparkSerializer

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
serializer = mleap.pyspark.spark_support.SimpleSparkSerializer()
serializer.serializeToBundle(
    preprocess_model, 
    "jar:file:/tmp/preprocess_model.zip", 
    transformed_input_df
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
An error occurred while calling o651.serializeToBundle.
: java.util.NoSuchElementException: key not found: org.apache.spark.ml.feature.SQLTransformer
	at scala.collection.MapLike$class.default(MapLike.scala:228)
	at scala.collection.AbstractMap.default(Map.scala:59)
	at scala.collection.MapLike$class.apply(MapLike.scala:141)
	at scala.collection.AbstractMap.apply(Map.scala:59)
	at ml.combust.bundle.BundleRegistry.opForObj(BundleRegistry.scala:102)
	at ml.combust.bundle.serializer.GraphSerializer$$anonfun$writeNode$1.apply(GraphSerializer.scala:31)
	at ml.combust.bundle.serializer.GraphSerializer$$anonfun$writeNode$1.apply(GraphSerializer.scala:30)
	at scala.util.Try$.apply(Try.scala:192)
	at ml.combust.bundle.serializer.GraphSerializer.writeNode(GraphSerializer.scala:30)
	at ml.combust.bundle.serializer.GraphSerializer$$anonfun$write$2.apply(GraphSerializer.scala:21)
	at ml.combust.bundle.serializer.GraphSerializer$$anonfun$write$2.apply(GraphSerializer.scala:

In [None]:
f"s3://{model_bucket}/spark-preprocessor/"