## Creating a Classification Model

In this exercise, you will implement a classification model that uses features of a flight to predict whether or not the flight will be delayed.

### Import Spark SQL and Spark ML Libraries

First, import the libraries you will need:

In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

SparkSession available as 'spark'.


### Load Source Data
The data for this exercise is provided as a CSV file containing details of flights. The data includes specific characteristics (or *features*) for each flight, as well as a column indicating how many minutes late or early the flight arrived.

You will load this data into a DataFrame and display it.

In [5]:
csv = spark.read.csv('wasb:///data/flights.csv', inferSchema=True, header=True)
csv.show()


+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
|        19|        5|     DL|          10397|        15016|      -1|     -19|
|        19|        5|     DL|          15016|        10397|       0|      -1|
|        19|        5|     DL|          10397|        14869|      15|      24|
|        19|        5|     DL|          10397|        10423|      33|      34|
|        19|        5|     DL|          11278|      

In [6]:
csv.printSchema()

root
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Carrier: string (nullable = true)
 |-- OriginAirportID: integer (nullable = true)
 |-- DestAirportID: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)

### Prepare the Data
Most modeling begins with exhaustive exploration and preparation of the data. In this example, the data has been cleaned for you. You will simply select a subset of columns to use as *features* and create a binary *label* field named **Late** with the value **1** for flights that arrived more than 15 minutes after the scheduled arrival time, or **0** otherwise.

(Note that in a real scenario, you would perform additional tasks such as handling missing or duplicated data, scaling numeric columns, and using a process called *feature engineering* to create new features for your model.)
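
The labelling rule can be checked by hand. The following is a minimal plain-Python sketch (no Spark required) that applies the same `ArrDelay > 15` test used in the next cell to the first nine arrival delays in the flights table shown earlier. The names `late_label` and `arr_delays` are illustrative and not part of the exercise.

```python
# Plain-Python sketch of the labelling rule; names here are illustrative only.
def late_label(arr_delay, threshold=15):
    """Return 1 if the flight arrived more than `threshold` minutes late, else 0."""
    return int(arr_delay > threshold)

# ArrDelay values from the first nine rows of the flights table shown earlier
arr_delays = [1, -8, -15, 24, -11, -19, -1, 24, 34]
labels = [late_label(d) for d in arr_delays]
print(labels)
```

Under this rule, a flight that arrives exactly 15 minutes late still gets the label 0.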

In [10]:
# Label: 1 if the flight arrived more than 15 minutes late, otherwise 0
data = csv.select("DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID", "DepDelay", (col("ArrDelay") > 15).cast("Int").alias("Late"))
data.show()

+----------+---------+---------------+-------------+--------+----+
|DayofMonth|DayOfWeek|OriginAirportID|DestAirportID|DepDelay|Late|
+----------+---------+---------------+-------------+--------+----+
|        19|        5|          11433|        13303|      -3|   0|
|        19|        5|          14869|        12478|       0|   0|
|        19|        5|          14057|        14869|      -4|   0|
|        19|        5|          15016|        11433|      28|   1|
|        19|        5|          11193|        12892|      -6|   0|
|        19|        5|          10397|        15016|      -1|   0|
|        19|        5|          15016|        10397|       0|   0|
|        19|        5|          10397|        14869|      15|   1|
|        19|        5|          10397|        10423|      33|   1|
|        19|        5|          11278|        10397|     323|   1|
|        19|        5|          14107|        13487|      -7|   0|
|        19|        5|          11433|        11298|      22| 

### Split the Data
It is common practice when building supervised machine learning models to split the source data, using some of it to train the model and reserving some to test the trained model. In this exercise, you will use 70% of the data for training, and reserve 30% for testing.

In [25]:
# Split the data roughly 70/30, with a fixed seed for the random split
splits = data.randomSplit([0.7, 0.3], seed=123)
train = splits[0]
test = splits[1]
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 1891523  Testing Rows: 810695
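
`randomSplit` assigns rows to the splits at random according to the weights, so the resulting proportions are close to 70/30 but not exactly so. A quick arithmetic check on the row counts printed above (plain Python, with illustrative variable names) makes this concrete:

```python
# Row counts copied from the output above
train_rows = 1891523
test_rows = 810695

total_rows = train_rows + test_rows
# float() keeps the division correct on Python 2 as well as Python 3
train_fraction = train_rows / float(total_rows)

print("Total rows: %d" % total_rows)
print("Training fraction: %.6f" % train_fraction)
```

The training fraction comes out just under 0.7 rather than exactly 0.7.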

In [26]:
train.show()

+----------+---------+---------------+-------------+--------+----+
|DayofMonth|DayOfWeek|OriginAirportID|DestAirportID|DepDelay|Late|
+----------+---------+---------------+-------------+--------+----+
|         1|        1|          10140|        10397|      -4|   0|
|         1|        1|          10140|        10397|       0|   0|
|         1|        1|          10140|        10821|       4|   0|
|         1|        1|          10140|        10821|       8|   0|
|         1|        1|          10140|        10821|      77|   1|
|         1|        1|          10140|        11259|      -5|   0|
|         1|        1|          10140|        11259|      -2|   0|
|         1|        1|          10140|        11259|      -2|   0|
|         1|        1|          10140|        11259|      -2|   0|
|         1|        1|          10140|        11259|      -1|   0|
|         1|        1|          10140|        11259|      21|   1|
|         1|        1|          10140|        11259|      21| 

### Prepare the Training Data
To train the classification model, you need a training data set that includes a vector of numeric features, and a label column. In this exercise, you will use the **VectorAssembler** class to transform the feature columns into a vector, and then rename the **Late** column to **label**.

In [27]:
assembler = VectorAssembler(inputCols = ["DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID", "DepDelay"], outputCol="features")
training = assembler.transform(train).select(col("features"), col("Late").alias("label"))
training.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    1|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    1|
|[1.0,1.0,10140.0,...|    1|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    1|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
+--------------------+-----+
only showing top 20 rows
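
Conceptually, `VectorAssembler` takes the values of the input columns, in the order listed in `inputCols`, and packs them into a single numeric vector per row. The plain-Python sketch below mimics that for one row of the training data. It only illustrates the idea and is not how Spark implements it; `assemble` is a hypothetical helper.

```python
# Illustrative stand-in for what VectorAssembler produces for a single row.
input_cols = ["DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID", "DepDelay"]

def assemble(row, cols):
    """Pack the selected column values, in order, into a list of floats."""
    return [float(row[c]) for c in cols]

# First row of the training data shown earlier
row = {"DayofMonth": 1, "DayOfWeek": 1, "OriginAirportID": 10140,
       "DestAirportID": 10397, "DepDelay": -4, "Late": 0}

features = assemble(row, input_cols)
label = row["Late"]
print(features, label)
```

The first three values of `features` correspond to the `[1.0,1.0,10140.0,...` prefix visible in the **features** column above.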

### Train a Classification Model
Next, you need to train a classification model using the training data. To do this, create an instance of the classification algorithm you want to use and use its **fit** method to train a model based on the training DataFrame. In this exercise, you will use a *Logistic Regression* classification algorithm, though you can use the same technique for any of the classification algorithms supported in the spark.ml API.

In [28]:
lr = LogisticRegression(labelCol="label",featuresCol="features",maxIter=10,regParam=0.3)
model = lr.fit(training)
print("Model trained!")

Model trained!
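
To give a feel for what **fit** does, the following is a toy, plain-Python logistic regression trained by gradient descent on a single feature. It is a sketch under simplifying assumptions, not Spark's implementation: it uses only `DepDelay` (divided by 10) from the first nine rows of the prepared data shown earlier, applies no regularization (unlike `regParam=0.3` above), and the learning rate and iteration count are arbitrary choices.

```python
import math

def sigmoid(z):
    """Logistic function: maps a raw score to a probability between 0 and 1."""
    return 1.0 / (1.0 + math.exp(-z))

# (DepDelay, Late) pairs from the first nine rows of the prepared data
rows = [(-3, 0), (0, 0), (-4, 0), (28, 1), (-6, 0), (-1, 0), (0, 0), (15, 1), (33, 1)]
xs = [dep / 10.0 for dep, _ in rows]   # rescale so gradient descent behaves well
ys = [late for _, late in rows]

w, b = 0.0, 0.0          # weight and intercept, both start at zero
learning_rate = 0.5      # arbitrary choice for this toy example
for _ in range(2000):
    # Gradient of the mean log-loss with respect to w and b
    errors = [sigmoid(w * x + b) - y for x, y in zip(xs, ys)]
    grad_w = sum(e * x for e, x in zip(errors, xs)) / len(xs)
    grad_b = sum(errors) / len(xs)
    w -= learning_rate * grad_w
    b -= learning_rate * grad_b

def predict(dep_delay):
    """Predict 1 (late) when the modelled probability exceeds 0.5."""
    return int(sigmoid(w * dep_delay / 10.0 + b) > 0.5)

print(predict(30), predict(-5))
```

The learned weight is positive, so longer departure delays push the predicted probability of a late arrival upward.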

### Prepare the Testing Data
Now that you have a trained model, you can test it using the testing data you reserved previously. First, you need to prepare the testing data in the same way as you did the training data by transforming the feature columns into a vector. This time you'll rename the **Late** column to **trueLabel**.

In [29]:
testing = assembler.transform(test).select(col("features"), col("Late").alias("trueLabel"))
testing.show()

+--------------------+---------+
|            features|trueLabel|
+--------------------+---------+
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        1|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        1|
|[1.0,1.0,10140.0,...|        1|
|[1.0,1.0,10140.0,...|        1|
+--------------------+---------+
only showing top 20 rows

### Test the Model
Now you're ready to use the **transform** method of the model to generate some predictions. You can use this approach to predict delay status for flights where the label is unknown; but in this case you are using the test data which includes a known true label value, so you can compare the predicted status to the actual status. 

In [30]:
prediction = model.transform(testing)
predicted = prediction.select("features", "prediction", "probability", "trueLabel")
predicted.show(100)

+--------------------+----------+--------------------+---------+
|            features|prediction|         probability|trueLabel|
+--------------------+----------+--------------------+---------+
|[1.0,1.0,10140.0,...|       0.0|[0.82762772187607...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.82762772187607...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.82358425272917...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.83391149131953...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.82998514518323...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.82395776166499...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.82395776166499...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.79816883287225...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.82800848536521...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.82397204836471...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.81777829053773...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.77461652992075...|        1|
|[1.0,1.0,10140.0,...|   

+--------------------+---------+--------------------+--------------------+----------+
|            features|trueLabel|       rawPrediction|         probability|prediction|
+--------------------+---------+--------------------+--------------------+----------+
|[1.0,1.0,10140.0,...|        0|[1.56890689646430...|[0.82762772187607...|       0.0|
|[1.0,1.0,10140.0,...|        0|[1.56890689646430...|[0.82762772187607...|       0.0|
|[1.0,1.0,10140.0,...|        0|[1.54082244509260...|[0.82358425272917...|       0.0|
|[1.0,1.0,10140.0,...|        0|[1.61360644012476...|[0.83391149131953...|       0.0|
|[1.0,1.0,10140.0,...|        0|[1.58552198875306...|[0.82998514518323...|       0.0|
|[1.0,1.0,10140.0,...|        0|[1.54339531169550...|[0.82395776166499...|       0.0|
|[1.0,1.0,10140.0,...|        0|[1.54339531169550...|[0.82395776166499...|       0.0|
|[1.0,1.0,10140.0,...|        0|[1.37488860346528...|[0.79816883287225...|       0.0|
|[1.0,1.0,10140.0,...|        0|[1.57157826028054...|[

Looking at the result, the **prediction** column contains the predicted value for the label, and the **trueLabel** column contains the actual known value from the testing data. There appears to be a mix of correct and incorrect predictions; later in this course you'll learn how to measure the accuracy of a model.
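
The output columns are related to one another. For a binary logistic regression model in Spark, the first element of **probability** is the logistic function of the first element of **rawPrediction**, and the prediction is 1.0 when the probability of class 1 exceeds the decision threshold. The sketch below assumes the default threshold of 0.5, which the code above does not change. It uses the first raw score visible in the output; that value is truncated there, so the result only matches to the digits shown.

```python
import math

def sigmoid(z):
    """Logistic function: maps a raw score to a probability between 0 and 1."""
    return 1.0 / (1.0 + math.exp(-z))

raw_score_class0 = 1.56890689646430   # first rawPrediction element, as truncated in the output
p_class0 = sigmoid(raw_score_class0)  # probability that the flight is not late
p_class1 = 1.0 - p_class0             # probability that the flight is late

threshold = 0.5                       # assumed default decision threshold
prediction = 1.0 if p_class1 > threshold else 0.0

print("%.4f %.1f" % (p_class0, prediction))
```

The computed probability agrees with the `0.82762772187607...` value in the **probability** column, and the resulting prediction of 0.0 matches the **prediction** column for that row.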