## Creating a Pipeline

In this exercise, you will implement a pipeline that includes multiple stages of *transformers* and *estimators* to prepare features and train a classification model. The resulting trained *PipelineModel* can then be used as a transformer to predict whether or not a flight will be late.

### Import Spark SQL and Spark ML Libraries

First, create a Spark session, install NumPy (which Spark ML depends on), and import the libraries you will need:

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "4098m").\
        getOrCreate()

In [3]:
!pip install numpy

Collecting numpy
  Downloading numpy-1.20.2-cp37-cp37m-manylinux2010_x86_64.whl (15.3 MB)
Installing collected packages: numpy
Successfully installed numpy-1.20.2


In [4]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler

### Load Source Data
The data for this exercise is provided as a CSV file containing details of flights. The data includes specific characteristics (or *features*) for each flight, as well as a column indicating whether or not the flight was late.

You will load this data into a dataframe and display it.

In [5]:
flightSchema = StructType([
  StructField("DayofMonth", IntegerType(), False),
  StructField("DayOfWeek", IntegerType(), False),
  StructField("Carrier", StringType(), False),
  StructField("OriginAirportID", StringType(), False),
  StructField("DestAirportID", StringType(), False),
  StructField("DepDelay", IntegerType(), False),
  StructField("ArrDelay", IntegerType(), False),
  StructField("Late", IntegerType(), False),
])

data = spark.read.csv('../data/flights.csv', schema=flightSchema, header=True)
data.show()

+----------+---------+-------+---------------+-------------+--------+--------+----+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|Late|
+----------+---------+-------+---------------+-------------+--------+--------+----+
|        21|        2|     WN|          10721|        13342|      26|      57|   1|
|        13|        1|     AA|          15016|        12892|      51|      27|   1|
|         5|        5|     FL|          10397|        11433|       9|       4|   0|
|        22|        1|     US|          11278|        14100|      35|      71|   1|
|        23|        4|     WN|          12451|        10693|       9|       5|   0|
|         5|        7|     AA|          11298|        15016|      39|      42|   1|
|         4|        6|     UA|          13930|        14307|      71|      58|   1|
|        10|        3|     9E|          14307|        11433|      68|     140|   1|
|        29|        7|     UA|          14057|        14771|     130|     12

In [6]:
spark.catalog.clearCache()

### Split the Data
It is common practice when building supervised machine learning models to split the source data, using some of it to train the model and reserving some to test the trained model. In this exercise, you will use 70% of the data for training, and reserve 30% for testing.

In [8]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1]
train_rows = train.count()
test_rows = test.count()
print ("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 445982  Testing Rows: 191101
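Notice that the row counts above are close to, but not exactly, 70% and 30% of the data. The following plain-Python sketch is a simplified model of why: it assumes each row is assigned to a split by an independent random draw, so the proportions are only approximate. The function `random_split` and its variable names are hypothetical and are not part of Spark.

```python
import math
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split using an independent random draw."""
    rng = random.Random(seed)
    total = math.fsum(weights)

    # Cumulative upper bounds of each split, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)

    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        index = 0
        # Walk forward until the draw falls under a bound (the last split catches the rest)
        while index < len(bounds) - 1 and draw >= bounds[index]:
            index += 1
        splits[index].append(row)
    return splits

rows = list(range(10_000))
train_part, test_part = random_split(rows, [0.7, 0.3], seed=42)
print("Training Rows:", len(train_part), " Testing Rows:", len(test_part))
```

In PySpark itself, `randomSplit` also accepts an optional `seed` argument, for example `data.randomSplit([0.7, 0.3], seed=42)`, which is useful when you want a split you can repeat.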


### Define the Pipeline
A predictive model often requires multiple stages of feature preparation. For example, some algorithms need you to distinguish between continuous features (which have a calculable numeric value) and categorical features (which are numeric representations of discrete categories). It is also common to *normalize* continuous numeric features to a common scale, for example by scaling all numbers to a proportional decimal value between 0 and 1. Strictly speaking, normalization only really makes sense when you have multiple numeric columns, because putting them on similar scales prevents a feature with particularly large values from dominating the training of the model. In this case we only have one non-categorical numeric feature, but I've included the step so you can see how it's done!
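To make the normalization step concrete, here is a minimal plain-Python sketch of min-max scaling. It is not Spark's implementation; the function name `min_max_scale` is hypothetical, and the input values are the `DepDelay` values from the first eight rows displayed earlier.

```python
def min_max_scale(values, out_min=0.0, out_max=1.0):
    """Linearly rescale numbers into the range [out_min, out_max]."""
    # sorted() is used instead of min()/max() because a wildcard import
    # from pyspark.sql.functions would shadow those builtins
    ordered = sorted(values)
    lo, hi = ordered[0], ordered[-1]
    if hi == lo:
        # A constant column has no spread, so map every value to the midpoint
        return [0.5 * (out_min + out_max) for _ in values]
    return [(v - lo) / (hi - lo) * (out_max - out_min) + out_min for v in values]

dep_delays = [26, 51, 9, 35, 9, 39, 71, 68]  # DepDelay from the first eight rows shown
scaled = min_max_scale(dep_delays)
print(scaled)
```

The smallest delay maps to 0 and the largest to 1. The numbers will not match the notebook's output, because there the minimum and maximum are learned from the whole training set rather than from eight rows.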

A pipeline consists of a series of *transformer* and *estimator* stages that typically prepare a dataframe for modeling and then train a predictive model. In this case, you will create a pipeline with nine stages:
- Five **StringIndexer** estimators, one per categorical variable, that generate numeric indexes for the categorical features
- A **VectorAssembler** that creates a vector of continuous numeric features
- A **MinMaxScaler** that normalizes the vector of numeric features
- A **VectorAssembler** that creates a vector of categorical and continuous features
- A **LogisticRegression** algorithm that trains a classification model

In [9]:
monthdayIndexer = StringIndexer(inputCol="DayofMonth", outputCol="DayofMonthIdx")
weekdayIndexer = StringIndexer(inputCol="DayOfWeek", outputCol="DayOfWeekIdx")
carrierIndexer = StringIndexer(inputCol="Carrier", outputCol="CarrierIdx")
originIndexer = StringIndexer(inputCol="OriginAirportID", outputCol="OriginAirportIdx")
destIndexer = StringIndexer(inputCol="DestAirportID", outputCol="DestAirportIdx")
numVect = VectorAssembler(inputCols = ["DepDelay"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normNums")
featVect = VectorAssembler(inputCols=["DayofMonthIdx", "DayOfWeekIdx", "CarrierIdx", "OriginAirportIdx", "DestAirportIdx", "normNums"], outputCol="features")
lr = LogisticRegression(labelCol="Late", featuresCol="features")
pipeline = Pipeline(stages=[monthdayIndexer, weekdayIndexer, carrierIndexer, originIndexer, destIndexer, numVect, minMax, featVect, lr])
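As a rough illustration of what each **StringIndexer** stage learns, the sketch below assigns indexes by descending frequency, so the most common label gets index 0. This is plain Python rather than Spark code; `fit_string_index` is a hypothetical name, and the alphabetical tie-breaking is an assumption that may not match every Spark version. The labels are the `Carrier` values from the rows displayed earlier.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(str(v) for v in values)
    # Sort by descending frequency; ties are broken alphabetically (assumption)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(position) for position, label in enumerate(ordered)}

carriers = ["WN", "AA", "FL", "US", "WN", "AA", "UA", "9E", "UA"]  # Carrier values shown earlier
mapping = fit_string_index(carriers)
indexed = [mapping[c] for c in carriers]
print(mapping)
print(indexed)
```

The point to take away is that the index is a frequency rank, not a meaningful quantity: a carrier with index 2.0 is not "twice" a carrier with index 1.0.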

### Run the Pipeline as an Estimator
The pipeline itself is an estimator, and so it has a **fit** method that you can call to run the pipeline on a specified dataframe. In this case, you will run the pipeline on the training data to train a model.

In [10]:
piplineModel = pipeline.fit(train)
print ("Pipeline complete!")

Pipeline complete!
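If the estimator/transformer distinction feels abstract, the toy sketch below may help. It is a simplified model in plain Python, not Spark's implementation: all class names (`Shift`, `MeanCenterer`, `ToyPipeline`, `ToyPipelineModel`) are hypothetical. The idea it shows is that fitting walks through the stages in order, fits each estimator on the data produced by the stages before it, and collects the resulting transformers into a model.

```python
import math

class Shift:
    """Toy transformer: adds a fixed offset to every value."""
    def __init__(self, offset):
        self.offset = offset

    def transform(self, rows):
        return [r + self.offset for r in rows]

class MeanCenterer:
    """Toy estimator: learns the mean in fit() and returns a transformer."""
    def fit(self, rows):
        mean = math.fsum(rows) / len(rows)
        return Shift(-mean)

class ToyPipelineModel:
    """Holds only fitted transformers and applies them in order."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

class ToyPipeline:
    """Fits estimators one by one, feeding each the output of earlier stages."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        current = rows
        for stage in self.stages:
            # Estimators are fitted; plain transformers are used as they are
            model = stage.fit(current) if hasattr(stage, "fit") else stage
            fitted.append(model)
            current = model.transform(current)
        return ToyPipelineModel(fitted)

toy_pipeline = ToyPipeline([Shift(10.0), MeanCenterer()])
toy_model = toy_pipeline.fit([1.0, 2.0, 3.0])
print(toy_model.transform([4.0]))
```

The fitted model remembers what was learned during training (here, the mean of the shifted training values) and reuses it on new data, which is the same reason the trained pipeline model can later be applied to the test dataframe.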


### Test the Pipeline Model
The model produced by the pipeline is a transformer that will apply all of the stages in the pipeline to a specified dataframe and apply the trained model to generate predictions. In this case, you will transform the **test** dataframe using the pipeline to generate label predictions.

In [11]:
prediction = piplineModel.transform(test)
predicted = prediction.select("features", col("prediction").cast("Int"), col("Late").alias("trueLabel"))
predicted.show(100, truncate=False)

+---------------------------------------------+----------+---------+
|features                                     |prediction|trueLabel|
+---------------------------------------------+----------+---------+
|[25.0,2.0,10.0,57.0,33.0,0.3582089552238806] |1         |1        |
|[25.0,2.0,10.0,11.0,12.0,0.24378109452736318]|0         |1        |
|[25.0,2.0,10.0,11.0,12.0,0.29850746268656714]|0         |1        |
|[25.0,2.0,10.0,50.0,15.0,0.2288557213930348] |0         |0        |
|[25.0,2.0,10.0,18.0,12.0,0.2288557213930348] |0         |1        |
|[25.0,2.0,10.0,36.0,15.0,0.23383084577114427]|0         |0        |
|[25.0,2.0,10.0,38.0,53.0,0.4129353233830846] |1         |1        |
|[25.0,2.0,10.0,38.0,19.0,0.5522388059701493] |1         |1        |
|[25.0,2.0,10.0,38.0,22.0,0.9950248756218906] |1         |1        |
|[25.0,2.0,10.0,38.0,7.0,0.5621890547263682]  |1         |1        |
|[25.0,2.0,10.0,38.0,30.0,0.27860696517412936]|0         |0        |
|[25.0,2.0,10.0,38.0,49.0,0.253731

The resulting dataframe is produced by applying all of the transformations in the pipeline to the test data. The **prediction** column contains the predicted value for the label, and the **trueLabel** column contains the actual known value from the testing data.
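One simple way to read these two columns side by side is to count how often they agree. The sketch below does this in plain Python for the eleven complete rows visible in the output above; it is only an illustration, since eleven rows are far too few to judge the model, and the variable names are hypothetical.

```python
# (prediction, trueLabel) pairs copied from the eleven complete rows shown above
pairs = [(1, 1), (0, 1), (0, 1), (0, 0), (0, 1), (0, 0),
         (1, 1), (1, 1), (1, 1), (1, 1), (0, 0)]

# len() of a filtered list is used for counting, so the code does not rely on sum()
tp = len([1 for p, t in pairs if p == 1 and t == 1])  # late flights predicted late
fp = len([1 for p, t in pairs if p == 1 and t == 0])  # on-time flights predicted late
fn = len([1 for p, t in pairs if p == 0 and t == 1])  # late flights predicted on time
tn = len([1 for p, t in pairs if p == 0 and t == 0])  # on-time flights predicted on time

accuracy = (tp + tn) / len(pairs)
precision = tp / (tp + fp) if (tp + fp) else 0.0
recall = tp / (tp + fn) if (tp + fn) else 0.0

print("TP:", tp, "FP:", fp, "FN:", fn, "TN:", tn)
print("Accuracy:", accuracy, "Precision:", precision, "Recall:", recall)
```

For these rows there are five true positives, three false negatives, three true negatives, and no false positives. To score the full test set you would compute the same counts on the whole dataframe, or use one of the evaluators in `pyspark.ml.evaluation`.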