In [1]:


# Import the PySpark module
from pyspark.sql import SparkSession

import os
from pathlib import Path

# Create (or reuse, via getOrCreate) a SparkSession that runs locally on all available cores.
spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName('test') \
                    .getOrCreate()

# Input location. The default is the original author's local directory; set the
# FLIGHTS_DATA_DIR environment variable to point elsewhere instead of editing the path.
DATA_DIR = Path(os.environ.get('FLIGHTS_DATA_DIR', '/Users/Rafael/Desktop/data'))

# spark.read.csv loads the file and returns a DataFrame:
#   header=True      -> the first line supplies the column names
#   inferSchema=True -> Spark infers column types instead of reading everything as strings
#   nullValue='NA'   -> the literal string 'NA' is loaded as null
flights = spark.read.csv(str(DATA_DIR / 'flights.csv'), sep=',', header=True,
                         inferSchema=True, nullValue='NA')

In [2]:
# Preview the first five rows of the freshly loaded data
flights.show(n=5)

+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 11| 20|  6|     US|    19|JFK|2153|  9.48|     351| null|
|  0| 22|  2|     UA|  1107|ORD| 316| 16.33|      82|   30|
|  2| 20|  4|     UA|   226|SFO| 337|  6.17|      82|   -8|
|  9| 13|  1|     AA|   419|ORD|1236| 10.33|     195|   -5|
|  4|  2|  5|     AA|   325|ORD| 258|  8.92|      65| null|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows



In [3]:
# Show each column's name and the type Spark inferred for it
flights.printSchema()

root
 |-- mon: integer (nullable = true)
 |-- dom: integer (nullable = true)
 |-- dow: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- org: string (nullable = true)
 |-- mile: integer (nullable = true)
 |-- depart: double (nullable = true)
 |-- duration: integer (nullable = true)
 |-- delay: integer (nullable = true)



In [4]:
# Total number of rows read from the CSV file
print(f"The data contain {flights.count()} records.")

The data contain 50000 records.


In [5]:
# Goal: build a model that predicts whether or not a given flight will be delayed.
# Trim the data first: drop the uninformative 'flight' column here, then (in the next
# cells) remove rows that have no information about whether the flight was delayed.
flights_drop_column = flights.drop(flights['flight'])

In [6]:
# How many rows have no 'delay' value?
flights_drop_column.where(flights_drop_column['delay'].isNull()).count()

2978

In [7]:
# Keep only the rows whose 'delay' value is present
flights_valid_delay = flights_drop_column.where(flights_drop_column['delay'].isNotNull())

In [8]:
# Drop every row that still has a null in at least one column
flights_none_missing = flights_valid_delay.na.drop()

In [9]:
# Number of records left after removing every row with a missing value
print(flights_none_missing.count())

47022


In [10]:
# Import the required function
from pyspark.sql.functions import round

In [11]:
# Convert 'mile' to 'km' (rounded to 0 decimal places) and drop the 'mile' column.
# The column is referenced by name (F.col) rather than via `flights.mile`: `flights`
# is re-bound to later pipeline stages further down (e.g. `flights = flights_km`,
# which has no 'mile' column), so re-running this cell after them would fail.
# Using the module alias `F.round` also avoids depending on a bare `round` import,
# which shadows Python's built-in round().
from pyspark.sql import functions as F

KM_PER_MILE = 1.60934

flights_km = flights_none_missing.withColumn('km', F.round(F.col('mile') * KM_PER_MILE, 0)) \
                    .drop('mile')


In [12]:
# Preview: 'km' has replaced 'mile'
flights_km.show(n=5)

+---+---+---+-------+---+------+--------+-----+------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|
+---+---+---+-------+---+------+--------+-----+------+
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 509.0|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.0|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.0|
|  5|  2|  1|     UA|SFO|  7.98|     102|    2| 885.0|
|  7|  2|  6|     AA|ORD| 10.83|     135|   54|1180.0|
+---+---+---+-------+---+------+--------+-----+------+
only showing top 5 rows



In [13]:
# Binary target 'label': 1 when the delay was 15 minutes or more, 0 otherwise.
flights_km = flights_km.withColumn(
    'label',
    (flights_km['delay'] >= 15).cast('integer'),
)

In [14]:
# Preview with the new 'label' column
flights_km.show(n=5)

+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 509.0|    1|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.0|    0|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.0|    0|
|  5|  2|  1|     UA|SFO|  7.98|     102|    2| 885.0|    0|
|  7|  2|  6|     AA|ORD| 10.83|     135|   54|1180.0|    1|
+---+---+---+-------+---+------+--------+-----+------+-----+
only showing top 5 rows



In [15]:
# Transform the columns into indexed numerical values.
from pyspark.ml.feature import StringIndexer

In [16]:
# StringIndexer that maps each distinct 'carrier' string to a numeric index in 'carrier_idx'
indexer = StringIndexer(outputCol='carrier_idx', inputCol='carrier')

# The following cells work on the generic name `flights`; point it at the km/label stage
flights = flights_km

In [17]:
# Fitting learns the carrier categories present in the data
indexer_model = indexer.fit(dataset=flights)

In [18]:
# Applying the fitted model appends the numeric 'carrier_idx' column
flights_indexed = indexer_model.transform(dataset=flights)


In [19]:
# Index the origin airport ('org') into 'org_idx', the same way 'carrier' was indexed above.
# StringIndexer refuses to write an output column that already exists, so a plain re-run of
# this cell would fail; dropping 'org_idx' first (DataFrame.drop is a no-op when the column
# is absent) keeps the cell idempotent without changing the first-run result.
flights_indexed = flights_indexed.drop('org_idx')
flights_indexed = StringIndexer(inputCol='org', outputCol='org_idx') \
    .fit(flights_indexed) \
    .transform(flights_indexed)

In [20]:
# Preview both indexed columns
flights_indexed.show(n=10)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 509.0|    1|        0.0|    0.0|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.0|    0|        0.0|    1.0|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.0|    0|        1.0|    0.0|
|  5|  2|  1|     UA|SFO|  7.98|     102|    2| 885.0|    0|        0.0|    1.0|
|  7|  2|  6|     AA|ORD| 10.83|     135|   54|1180.0|    1|        1.0|    0.0|
|  1| 16|  6|     UA|ORD|   8.0|     232|   -7|2317.0|    0|        0.0|    0.0|
|  1| 22|  5|     UA|SJC|  7.98|     250|  -13|2943.0|    0|        0.0|    5.0|
| 11|  8|  1|     OO|SFO|  7.77|      60|   88| 254.0|    1|        2.0|    1.0|
|  4| 26|  1|     AA|SFO| 13.25|     210|  -10|2356.0|    0|        1.0|    1.0|
|  4| 25|  0|     AA|ORD| 13

In [21]:
## The final stage of data preparation is to consolidate all of the predictor columns into a single column.
## Import the class which will assemble the predictors.
from pyspark.ml.feature import VectorAssembler

In [22]:

# Re-bind the working name `flights` to the indexed DataFrame (with 'carrier_idx' and 'org_idx');
# the assembler cells below read from `flights`.
flights = flights_indexed

In [23]:
## Assembler that merges all predictor columns into one vector column named 'features'.
assembler = VectorAssembler(
    outputCol='features',
    inputCols=[
        'mon', 'dom', 'dow',
        'carrier_idx', 'org_idx',
        'km', 'depart', 'duration',
    ],
)

In [24]:
# Build the 'features' column from the predictor columns
flights_assembled = assembler.transform(dataset=flights)


In [25]:

flights_assembled.select(['features', 'delay']).show(n=5, truncate=False)

+-----------------------------------------+-----+
|features                                 |delay|
+-----------------------------------------+-----+
|[0.0,22.0,2.0,0.0,0.0,509.0,16.33,82.0]  |30   |
|[2.0,20.0,4.0,0.0,1.0,542.0,6.17,82.0]   |-8   |
|[9.0,13.0,1.0,1.0,0.0,1989.0,10.33,195.0]|-5   |
|[5.0,2.0,1.0,0.0,1.0,885.0,7.98,102.0]   |2    |
|[7.0,2.0,6.0,1.0,0.0,1180.0,10.83,135.0] |54   |
+-----------------------------------------+-----+
only showing top 5 rows



In [26]:
# Re-bind `flights` to the assembled DataFrame (with the 'features' vector column);
# the train/test split below is taken from it.
flights = flights_assembled

## Build the Decision Tree model


In [27]:
## A model must be assessed on data it was not trained on; scoring it on its own training
## rows would make it look (relatively) better than it really is. Hold out ~20% of the rows
## for testing, with a fixed seed so the split is repeatable.
flights_train, flights_test = flights.randomSplit(weights=[0.8, 0.2], seed=17)

In [28]:
# Fraction of all records that landed in the training set (randomSplit is approximate,
# so this is close to, not exactly, the requested 0.8)
training_ratio = flights_train.count() / flights.count()

In [29]:
# Display the achieved training fraction
print(training_ratio)

0.7980732423121092


In [30]:
# Sizes of the two sets, shown as [training rows, testing rows]
[flights_train.count(), flights_test.count()]

[37527, 9495]

In [31]:
## Import the class for creating a Decision Tree classifier.
from pyspark.ml.classification import DecisionTreeClassifier

In [32]:
## Decision Tree classifier trained on the training set. The column names are spelled out
## for readability; 'features' and 'label' are the estimator's defaults.
tree = DecisionTreeClassifier(featuresCol='features', labelCol='label')
tree_model = tree.fit(dataset=flights_train)

In [33]:
## Score the held-out test rows (adds 'prediction' and 'probability' columns)
prediction = tree_model.transform(dataset=flights_test)


In [39]:
prediction.select(['label', 'prediction', 'probability']).show(n=5, truncate=False)

+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|1    |0.0       |[0.509000900090009,0.490999099909991]   |
|1    |1.0       |[0.4044414535666218,0.5955585464333782] |
|1    |1.0       |[0.32537139528109527,0.6746286047189047]|
|1    |1.0       |[0.32537139528109527,0.6746286047189047]|
|1    |1.0       |[0.32537139528109527,0.6746286047189047]|
+-----+----------+----------------------------------------+
only showing top 5 rows



# Evaluate the Decision Tree

In [40]:
### Confusion matrix: number of test rows for every (label, prediction) combination.
### The model never saw these rows during training, so this is an objective assessment.
prediction.groupBy(['label', 'prediction']).count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 1245|
|    0|       0.0| 2491|
|    1|       1.0| 3579|
|    0|       1.0| 2180|
+-----+----------+-----+



In [41]:
### Count the True Negatives, True Positives, False Negatives and False Positives.
### `prediction` is not cached, so every separate count() would re-run the whole pipeline
### (CSV read, cleaning, indexing, assembling, scoring). One groupBy pass over
### (label, prediction) yields all four cells of the confusion matrix at once.
confusion_counts = {
    (int(row['label']), int(row['prediction'])): row['count']
    for row in prediction.groupBy('label', 'prediction').count().collect()
}
TN = confusion_counts.get((0, 0), 0)  # actual 0, predicted 0
TP = confusion_counts.get((1, 1), 0)  # actual 1, predicted 1
FN = confusion_counts.get((1, 0), 0)  # actual 1, predicted 0
FP = confusion_counts.get((0, 1), 0)  # actual 0, predicted 1

In [42]:
### Accuracy: the share of all test predictions that were correct (TP + TN over every cell)
accuracy = (TP + TN) / (TP + TN + FP + FN)
print(accuracy)

0.6392838335966298


In [None]:
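### An accuracy of about 0.64 is a reasonable start, but there are many false predictions (1245 false negatives and 2180 false positives). For comparison, always predicting "delayed" would already be right for 4824 of the 9495 test flights (about 0.51), so there is clear room to improve this model.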