# Summary

<p>
This project is doing explorative data analysis using the pyspark and sql application programming interfaces (API's) of Apache spark.
</p> 

<p>
Apache Spark is an open-source engine developed specifically for handling large-scale data processing and analytics. Spark offers the ability to access data in a variety of sources. Apache Spark is designed to accelerate analytics on Hadoop with speed and efficiency.
</p> 
<a href="https://www.webopedia.com/TERM/A/apache-spark.html" target="_blank">webopedia</a> 

<p>
The airtraffic system data records consist of the tables flights, planes,
and airports. The data sources used  are csv-files stored locally. 
The functions and methods applied here include SQL-queries and
SQL-calculations and pyspark implementations like select, filter,
collect, join, and aggregate. 
</p> 

<p>
Those can of course applied to Big Data on remote machines. This is the whole point of the Apache Spark system architecture. It allows even analysing streaming data in real time. The Spark master distributes then the data as Resilient Distributed Datasets (RDD) or immutable distributed collections of objects on the remote machines (workers) using a process of mapping, sort and shuffle, and reducing. RDD's are not generated in this project, but are in others.
</p> 



<img src="spark_architecture.jpg" alt="Smiley face" align="left"  style="margin-left: 0px; margin-right: 0px; margin-top: 20px; margin-bottom: 20px; float: left; width: 800px; height: 300px"> 

<p>
The main focus of this project is prepare the data for machine learning for example by:
</p>

<ul>
  <li>joining</li>
  <li>creating labels</li>
  <li>cleaning missing values</li>
  <li>doing a train-test-split</li>
</ul> 


<p>    
and then apply Pyspark ML library alogrithms like:
</p> 

<ul>
  <li>Random Forest</li>
  <li>Random Forest in cross-validation</li>
  <li>Logistic Regression in cross-validation</li>
  <li>Gradient-Boosted Tree Classifier</li>
</ul> 


## Import packages

In [188]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.rdd import RDD
from pyspark.sql.functions import round, max, min, mean, stddev , col, avg 
from pyspark.sql.types import IntegerType, FloatType
import pyspark.sql.functions as F
from pyspark.sql.functions import isnan
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, \
StringIndexer, VectorIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
import pyspark.ml.evaluation as evaluation
import pyspark.ml.tuning as tune
from pyspark.mllib.tree import RandomForest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import os
from time import *



### Inspect working directory

In [189]:
cwd_1=os.getcwd()
# print(cwd_1)
os.chdir(r'C:\Users\gamarandor\spark\spark-3.0.0-preview-bin-hadoop2.7\projects')
cwd_2=os.getcwd()
# print(cwd_2)

## Initializing Spark

<p>
"A SparkSession is beyond a time-bounded interaction, SparkSession provides a single point of entry to interact with underlying Spark functionality and allows programming Spark with DataFrame and Dataset APIs. Most importantly, it curbs the number of concepts and constructs a developer has to juggle while interacting with Spark."
</p> 
<a href="https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html" target="_blank">Class SparkSession</a> 

<p>
"A SparkContext represents the connection to a Spark cluster, 
and can be used to create RDDs, accumulators and broadcast variables 
on that cluster." The SparkContext is the entry point into the cluster. Without creating a SparkContext nothing in the Spark session will work.
</p> 
<a href="https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/SparkContext.html" target="_blank">Class SparkContext</a> 








In [190]:

# Create my_spark
my_spark = SparkSession.builder.getOrCreate()
print(my_spark)

<pyspark.sql.session.SparkSession object at 0x000001F09530D748>


In [191]:
## Create Spark context 
sqlContext = SQLContext (sc)
print(sqlContext)


<pyspark.sql.context.SQLContext object at 0x000001F0975354C8>


## Inspect spark context

In [192]:
# Verify SparkContext
print(sc)

# Print Spark version
print(sc.version)

# Master URL to connect to
print(sc.master)

# Path where Spark is installed on worker nodes
print(str(sc.sparkHome))

# Retrieve name of the Spark User running
print(str(sc.sparkUser()))

# Return application name
print(sc.appName)

# Retrieve application ID
print(sc.applicationId)

# Return default level of parallelism 
print(sc.defaultParallelism)

# Default minimum number of partitions for RDDs
print(sc.defaultMinPartitions)

<SparkContext master=local[*] appName=PySparkShell>
3.0.0-preview
local[*]
None
gamarandor
PySparkShell
local-1578052098871
4
2


## Reading csv-files into the Spark cluster

In [193]:
path_source = r"planes.csv"

df_planes = sqlContext.read.load (path_source,
                           format='com.databricks.spark.csv',
                           header='true',
                           inferSchema='true')
                           
print(df_planes)

DataFrame[tailnum: string, year: string, type: string, manufacturer: string, model: string, engines: int, seats: int, speed: string, engine: string]


In [194]:
temp_table_name = "planes_csv"
print(temp_table_name)

planes_csv


In [195]:
df_planes.createOrReplaceTempView(temp_table_name)

In [196]:
src_flights = r"flights_small.csv"

df_flights = sqlContext.read.load (src_flights,
                           format='com.databricks.spark.csv',
                           header='true',
                           inferSchema='true')

In [197]:
# schema delivers the data types of the variables or columns
print(df_flights.schema)

print("")
temp_table_name_2 = "df_flights"
print(temp_table_name_2)
df_flights.createOrReplaceTempView(temp_table_name_2)

StructType(List(StructField(year,IntegerType,true),StructField(month,IntegerType,true),StructField(day,IntegerType,true),StructField(dep_time,StringType,true),StructField(dep_delay,StringType,true),StructField(arr_time,StringType,true),StructField(arr_delay,StringType,true),StructField(carrier,StringType,true),StructField(tailnum,StringType,true),StructField(flight,IntegerType,true),StructField(origin,StringType,true),StructField(dest,StringType,true),StructField(air_time,StringType,true),StructField(distance,IntegerType,true),StructField(hour,StringType,true),StructField(minute,StringType,true)))

df_flights


In [198]:
src_airports =  r"airports.csv"

df_airports = sqlContext.read.load (src_airports,
                           format='com.databricks.spark.csv',
                           header='true',
                           inferSchema='true')
df_airports.schema

StructType(List(StructField(faa,StringType,true),StructField(name,StringType,true),StructField(lat,DoubleType,true),StructField(lon,DoubleType,true),StructField(alt,IntegerType,true),StructField(tz,IntegerType,true),StructField(dst,StringType,true)))

In [199]:
temp_table_name_3 = "df_airports"
print(temp_table_name_3)
df_airports.createOrReplaceTempView(temp_table_name_3)

df_airports


#### Check if object is a data frame or RDD

In [200]:
def check_df(x):
    if isinstance(x, RDD):
        return "RDD"
    if isinstance(x, DataFrame):
        return "DataFrame"
    
check_df(df_airports)

'DataFrame'

### SparkSession.catalog.listTables()
<p>
The attribute catalog of the active Spark session lists the data 
inside the cluster. The .listTables() method generates
all the tables inside the cluster as a list. 
This helps to get orientation.
</p>

In [201]:
print(my_spark.catalog.listTables())

[Table(name='df_airports', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='df_flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='planes_csv', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


## Import packages

In [202]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.rdd import RDD
from pyspark.sql.functions import mean, stddev , col, avg, round
from pyspark.sql.types import IntegerType, FloatType
import pyspark.sql.functions as F
from pyspark.sql.functions import isnan

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import os


### Inspect working directory

In [203]:
cwd_1=os.getcwd()
print(cwd_1)
os.chdir(r'C:\Users\gamarandor\spark\spark-3.0.0-preview-bin-hadoop2.7\projects')
cwd_2=os.getcwd()
print(cwd_2)

C:\Users\gamarandor\spark\spark-3.0.0-preview-bin-hadoop2.7\projects
C:\Users\gamarandor\spark\spark-3.0.0-preview-bin-hadoop2.7\projects


## Initializing Spark

<p>
"A SparkSession is beyond a time-bounded interaction, SparkSession provides a single point of entry to interact with underlying Spark functionality and allows programming Spark with DataFrame and Dataset APIs. Most importantly, it curbs the number of concepts and constructs a developer has to juggle while interacting with Spark."
</p> 
<a href="https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html" target="_blank">Class SparkSession</a> 

<p>
"A SparkContext represents the connection to a Spark cluster, 
and can be used to create RDDs, accumulators and broadcast variables 
on that cluster." The SparkContext is the entry point into the cluster. Without creating a SparkContext nothing in the Spark session will work.
</p> 
<a href="https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/SparkContext.html" target="_blank">Class SparkContext</a> 








In [204]:

# Create my_spark
my_spark = SparkSession.builder.getOrCreate()
print(my_spark)

<pyspark.sql.session.SparkSession object at 0x000001F09530D748>


In [205]:
## Create Spark context 
sqlContext = SQLContext (sc)
print(sqlContext)


<pyspark.sql.context.SQLContext object at 0x000001F09753C648>


## Inspect spark context

In [206]:
# Verify SparkContext
print(sc)

# Print Spark version
print(sc.version)

# Master URL to connect to
print(sc.master)

# Path where Spark is installed on worker nodes
print(str(sc.sparkHome))

# Retrieve name of the Spark User running
print(str(sc.sparkUser()))

# Return application name
print(sc.appName)

# Retrieve application ID
print(sc.applicationId)

# Return default level of parallelism 
print(sc.defaultParallelism)

# Default minimum number of partitions for RDDs
print(sc.defaultMinPartitions)

<SparkContext master=local[*] appName=PySparkShell>
3.0.0-preview
local[*]
None
gamarandor
PySparkShell
local-1578052098871
4
2


## Reading csv-files into the Spark cluster

In [207]:
path_source = r"planes.csv"

df_planes = sqlContext.read.load (path_source,
                           format='com.databricks.spark.csv',
                           header='true',
                           inferSchema='true')
                           
print(df_planes)

DataFrame[tailnum: string, year: string, type: string, manufacturer: string, model: string, engines: int, seats: int, speed: string, engine: string]


In [208]:
temp_table_name = "planes_csv"
print(temp_table_name)

planes_csv


In [209]:
df_planes.createOrReplaceTempView(temp_table_name)

In [210]:
src_flights = r"flights_small.csv"

df_flights = sqlContext.read.load (src_flights,
                           format='com.databricks.spark.csv',
                           header='true',
                           inferSchema='true')

In [211]:
# schema delivers the data types of the variables or columns
print(df_flights.schema)

print("")
temp_table_name_2 = "df_flights"
print(temp_table_name_2)
df_flights.createOrReplaceTempView(temp_table_name_2)

StructType(List(StructField(year,IntegerType,true),StructField(month,IntegerType,true),StructField(day,IntegerType,true),StructField(dep_time,StringType,true),StructField(dep_delay,StringType,true),StructField(arr_time,StringType,true),StructField(arr_delay,StringType,true),StructField(carrier,StringType,true),StructField(tailnum,StringType,true),StructField(flight,IntegerType,true),StructField(origin,StringType,true),StructField(dest,StringType,true),StructField(air_time,StringType,true),StructField(distance,IntegerType,true),StructField(hour,StringType,true),StructField(minute,StringType,true)))

df_flights


In [212]:
src_airports =  r"airports.csv"

df_airports = sqlContext.read.load (src_airports,
                           format='com.databricks.spark.csv',
                           header='true',
                           inferSchema='true')
df_airports.schema

StructType(List(StructField(faa,StringType,true),StructField(name,StringType,true),StructField(lat,DoubleType,true),StructField(lon,DoubleType,true),StructField(alt,IntegerType,true),StructField(tz,IntegerType,true),StructField(dst,StringType,true)))

In [213]:
temp_table_name_3 = "df_airports"
print(temp_table_name_3)
df_airports.createOrReplaceTempView(temp_table_name_3)

df_airports


#### Check if object is a data frame or RDD

In [214]:
def check_df(x):
    if isinstance(x, RDD):
        return "RDD"
    if isinstance(x, DataFrame):
        return "DataFrame"
    
check_df(df_airports)

'DataFrame'

### SparkSession.catalog.listTables()
<p>
The attribute catalog of the active Spark session lists the data 
inside the cluster. The .listTables() method generates
all the tables inside the cluster as a list. 
This helps to get orientation.
</p>

In [215]:
print(my_spark.catalog.listTables())

[Table(name='df_airports', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='df_flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='planes_csv', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


### Read in csc files with read.csv as data frames

In [216]:
planes2 = spark.read.csv("planes.csv", inferSchema=True, header=True)
print(planes2.schema)
print(isinstance(planes2, DataFrame ))

print("") 

flights2 = \
spark.read.csv("flights_small.csv", inferSchema=True, header=True)

airports2 = spark.read.csv("airports.csv", inferSchema=True, header=True)


StructType(List(StructField(tailnum,StringType,true),StructField(year,StringType,true),StructField(type,StringType,true),StructField(manufacturer,StringType,true),StructField(model,StringType,true),StructField(engines,IntegerType,true),StructField(seats,IntegerType,true),StructField(speed,StringType,true),StructField(engine,StringType,true)))
True



### Investigating the data frames

In [217]:
# schema displays the column types
# it has shown that a lot of numeric variables were stored as strings
# this needed to be corrected

print(flights2.columns)
print("")

# convert air_time from string to integer
# with column creates a new column air_time of type integer
flights2 = \
flights2.withColumn("air_time", flights2["air_time"].cast(IntegerType()))

# converting multiple columns with a for loop

flights_cols = \
['dep_time', 'dep_delay', 'arr_time', 'arr_delay',  'hour', 'minute']


for i in flights_cols:
    flights2 = flights2.withColumn(i, flights2[i].cast(IntegerType()))
    


# printSchema displays the column types neatly
print(flights2.printSchema())
print(isinstance(flights2, DataFrame ))

print("")



['year', 'month', 'day', 'dep_time', 'dep_delay', 'arr_time', 'arr_delay', 'carrier', 'tailnum', 'flight', 'origin', 'dest', 'air_time', 'distance', 'hour', 'minute']

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: integer (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: integer (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)

None
True



In [218]:
flights2.dtypes

[('year', 'int'),
 ('month', 'int'),
 ('day', 'int'),
 ('dep_time', 'int'),
 ('dep_delay', 'int'),
 ('arr_time', 'int'),
 ('arr_delay', 'int'),
 ('carrier', 'string'),
 ('tailnum', 'string'),
 ('flight', 'int'),
 ('origin', 'string'),
 ('dest', 'string'),
 ('air_time', 'int'),
 ('distance', 'int'),
 ('hour', 'int'),
 ('minute', 'int')]

In [219]:
print(planes2.schema)
print(isinstance(planes2, DataFrame ))

print("") 

print(airports2.printSchema())
print(isinstance(airports2, DataFrame ))

StructType(List(StructField(tailnum,StringType,true),StructField(year,StringType,true),StructField(type,StringType,true),StructField(manufacturer,StringType,true),StructField(model,StringType,true),StructField(engines,IntegerType,true),StructField(seats,IntegerType,true),StructField(speed,StringType,true),StructField(engine,StringType,true)))
True

root
 |-- faa: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- tz: integer (nullable = true)
 |-- dst: string (nullable = true)

None
True


In [220]:
# show displays the df

print(planes2.show(n=3))
print("")

print(flights2.show(n=3))
print("")

print(airports2.show(n=3))

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
only showing top 3 rows

None

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+-

# Preparing

the data for using it into machine learning model. This includes here:


<ul>
<li>joining the tables containing the features or variables used as predictors,</li>
<li>convert data types from type object or string to a 
unambiguous numeric,</li>
<li>generating new features from existing variables,</li>
<li>creating the target variables, which should be predicted,</li>        
<li>convert data types from type object or string to a 
unambiguous numeric,</li>
<li>encoding 
string variables with numeric as Spark can only process numerics, </li>
<li> incorporate everything into a pipeline object for smooth execution
in the model.</li>

</ul> 

  

### Joining

#### Join 1

In [221]:
# the goal is to join the flights and the planes table
print(planes2.columns)
print(len(planes2.columns))

print("")

print(flights2.columns)
print(len(flights2.columns))

print("")

print("Number of joined columns: ", 9+16-1)

# year is in both df and without renamimg duplicate column names will 
# occur after joining

['tailnum', 'year', 'type', 'manufacturer', 'model', 'engines', 'seats', 'speed', 'engine']
9

['year', 'month', 'day', 'dep_time', 'dep_delay', 'arr_time', 'arr_delay', 'carrier', 'tailnum', 'flight', 'origin', 'dest', 'air_time', 'distance', 'hour', 'minute']
16

Number of joined columns:  24


In [222]:
planes3 = planes2.withColumnRenamed('year', 'planes_year')
print(planes3.columns)

['tailnum', 'planes_year', 'type', 'manufacturer', 'model', 'engines', 'seats', 'speed', 'engine']


In [223]:
# tailnum is the key variable
joined_1 = flights2.join(planes3, on='tailnum', how='leftouter')

print("Number of joined columns: ", len(joined_1.columns))

Number of joined columns:  24


#### Join 2

In [224]:
# the goal is to join the flights and the aiports table
airports2.columns

airports3 = airports2.withColumnRenamed('faa' , 'dest')
# faa = old name, dest = new name
print(airports3.columns)

join_2 = flights2.join(airports3, on="dest", how='leftouter')
# left table = flights2, right table = aiports3

join_2.\
select('tailnum', 'name', 'lat', 'lon', 'dest', 'air_time').show(5)

print(join_2.columns)
print(len(join_2.columns))


['dest', 'name', 'lat', 'lon', 'alt', 'tz', 'dst']
+-------+--------------------+---------+-----------+----+--------+
|tailnum|                name|      lat|        lon|dest|air_time|
+-------+--------------------+---------+-----------+----+--------+
| N846VA|    Los Angeles Intl|33.942536|-118.408075| LAX|     132|
| N559AS|       Honolulu Intl|21.318681|-157.922428| HNL|     360|
| N847VA|  San Francisco Intl|37.618972|-122.374889| SFO|     111|
| N360SW|Norman Y Mineta S...|  37.3626|-121.929022| SJC|      83|
| N612AS|            Bob Hope|34.200667|-118.358667| BUR|     127|
+-------+--------------------+---------+-----------+----+--------+
only showing top 5 rows

['dest', 'year', 'month', 'day', 'dep_time', 'dep_delay', 'arr_time', 'arr_delay', 'carrier', 'tailnum', 'flight', 'origin', 'air_time', 'distance', 'hour', 'minute', 'name', 'lat', 'lon', 'alt', 'tz', 'dst']
22


#### Join 3

Before it was not possible to join aiports with planes
for lack of common key. As now tailnum is in Join 2 it can be joined 
with planes. If there is not immediate common key to join
sometimes such a detour is necessary.

In [225]:
join_3 = join_2.join(planes2, on='tailnum', how='inner')

print(join_3.columns)
print(len(join_3.columns))

['tailnum', 'dest', 'year', 'month', 'day', 'dep_time', 'dep_delay', 'arr_time', 'arr_delay', 'carrier', 'flight', 'origin', 'air_time', 'distance', 'hour', 'minute', 'name', 'lat', 'lon', 'alt', 'tz', 'dst', 'year', 'type', 'manufacturer', 'model', 'engines', 'seats', 'speed', 'engine']
30


### Convert data types 

<p>
to numeric values (integers or doubles) if it makes sense as Spark can process only numeric values. One reason this is needed is because
Spark does not grasp the data type always correctly with infer_schema,
when loading the data into the cluster.
</p> 


In [226]:
joined_1.printSchema()

root
 |-- tailnum: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: integer (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: integer (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- planes_year: string (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: integer (nullable = true)
 |-- seats: integer (nullable = true)
 |-- speed: string (nullable = true)
 |-- engine: string (nullable = true)



In [227]:
print(joined_1.columns)

['tailnum', 'year', 'month', 'day', 'dep_time', 'dep_delay', 'arr_time', 'arr_delay', 'carrier', 'flight', 'origin', 'dest', 'air_time', 'distance', 'hour', 'minute', 'planes_year', 'type', 'manufacturer', 'model', 'engines', 'seats', 'speed', 'engine']


In [228]:
cols_1 = ['dep_time', 'dep_delay', 'arr_time', 'arr_delay', 
          'air_time', 'distance', 'planes_year','speed']

for i in cols_1:
    joined_1 = joined_1.withColumn(i, joined_1[i].cast('integer'))

In [229]:
joined_1.select(cols_1).printSchema()

root
 |-- dep_time: integer (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: integer (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- planes_year: integer (nullable = true)
 |-- speed: integer (nullable = true)



### Generating a new variable from existing variables

In [230]:
# Years the plane is used in years
joined_2 = joined_1.withColumn('plane_years', 
                               joined_1.year - joined_1.planes_year)

joined_2.select('plane_years', 'year', 'planes_year').printSchema()

root
 |-- plane_years: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- planes_year: integer (nullable = true)



### Creating new binary boolean target or label variables

In [231]:
# Is late departure occuring?
joined_3_dep = \
joined_2.withColumn('late_departure', joined_2.dep_delay > 0)

joined_3_dep.select('late_departure').show(5)

+--------------+
|late_departure|
+--------------+
|         false|
|          true|
|         false|
|          true|
|         false|
+--------------+
only showing top 5 rows



In [232]:
joined_3_dep_b = \
joined_3_dep.withColumn('label', joined_3_dep['late_departure'].\
                        cast('integer'))

print(joined_3_dep_b.select('label').show(3))

+-----+
|label|
+-----+
|    0|
|    1|
|    0|
+-----+
only showing top 3 rows

None


In [233]:
# Is late arrival occurring?
joined_3_arr = \
joined_2.withColumn('late_arrival', joined_2.arr_delay > 0)

joined_3_arr.select('late_arrival').show(5)

+------------+
|late_arrival|
+------------+
|       false|
|        true|
|        true|
|        true|
|        true|
+------------+
only showing top 5 rows



In [234]:
# spark processes numeric data
# the boolean string are replaced with binary 1 and 0
joined_3_arr_b = \
joined_3_arr.withColumn('label', joined_3_arr['late_arrival'].
                        cast('integer'))

joined_3_arr_b.select('label').show(4)

+-----+
|label|
+-----+
|    0|
|    1|
|    1|
|    1|
+-----+
only showing top 4 rows



In [235]:
# Is the flight long distance or above 1000?
joined_2.select([max("distance")]).show()
joined_2.select([min("distance")]).show()
print("")

joined_4_dist = \
joined_2.withColumn('long_distance', joined_2.distance > 1000)

joined_4_dist.select('long_distance').show(5)

+-------------+
|max(distance)|
+-------------+
|         2724|
+-------------+

+-------------+
|min(distance)|
+-------------+
|           93|
+-------------+


+-------------+
|long_distance|
+-------------+
|        false|
|         true|
|        false|
|        false|
|        false|
+-------------+
only showing top 5 rows



In [236]:
joined_4_dist_b = \
joined_4_dist.withColumn('label', joined_4_dist['long_distance'].
                         cast('integer'))

joined_4_dist_b.select('label').show(4)

+-----+
|label|
+-----+
|    0|
|    1|
|    0|
|    0|
+-----+
only showing top 4 rows



### Missing values?

Checking if missing values are given in the variables
of the data set utilized for the model.

In [237]:
print("Number of observations: ", joined_3_arr_b.count())

print("")

check_null_list = ["arr_delay", "dep_delay", "plane_years"]

for i in check_null_list:
    n_notnull = \
    joined_3_arr_b.filter(joined_3_arr_b[i].isNotNull()).count()
    
    print("isNotNull of variable" , i, ":", n_notnull)
    
    n_observations = joined_3_arr_b.count()
    
    n_null = n_observations - n_notnull
    
    print("isNUll of variable", i, n_null)

Number of observations:  10000

isNotNull of variable arr_delay : 9925
isNUll of variable arr_delay 75
isNotNull of variable dep_delay : 9952
isNUll of variable dep_delay 48
isNotNull of variable plane_years : 9354
isNUll of variable plane_years 646


Turning the for loop above into a function.

In [238]:
def check_null(cols, df):
    for i in cols:
            n_notnull = \
            df.filter(df[i].isNotNull()).count()

            print("isNotNull of variable" , i, ":", n_notnull)

            n_observations = df.count()

            n_null = n_observations - n_notnull

            print("isNUll of variable", i, n_null)

In [239]:
# test if function works
# works
check_null(cols=check_null_list , df=joined_3_arr_b)

isNotNull of variable arr_delay : 9925
isNUll of variable arr_delay 75
isNotNull of variable dep_delay : 9952
isNUll of variable dep_delay 48
isNotNull of variable plane_years : 9354
isNUll of variable plane_years 646


Dropping null values.

In [240]:
joined3c = joined_3_arr_b.na.drop(subset=check_null_list)
len(joined3c.columns)
# All columns are remaining, drop did not interfer with this

27

In [241]:
check_null(cols=check_null_list, df=joined3c)

isNotNull of variable arr_delay : 9303
isNUll of variable arr_delay 0
isNotNull of variable dep_delay : 9303
isNUll of variable dep_delay 0
isNotNull of variable plane_years : 9303
isNUll of variable plane_years 0


In [242]:
joined1c = joined_3_dep_b.na.drop(subset=check_null_list) 
check_null(cols=check_null_list, df=joined1c )

isNotNull of variable arr_delay : 9303
isNUll of variable arr_delay 0
isNotNull of variable dep_delay : 9303
isNUll of variable dep_delay 0
isNotNull of variable plane_years : 9303
isNUll of variable plane_years 0


In [243]:
joined4c = joined_4_dist_b.na.drop(subset=check_null_list) 
check_null(cols=check_null_list, df=joined4c)

isNotNull of variable arr_delay : 9303
isNUll of variable arr_delay 0
isNotNull of variable dep_delay : 9303
isNUll of variable dep_delay 0
isNotNull of variable plane_years : 9303
isNUll of variable plane_years 0


### One-hot-encoding & Pipelines

<p>
transforming string variables with a limited number of categories and
make it ready for execution with one pipeline object.
In Pyspark the subsequent methods are applied sequentially.
</p> 


 <ul>
  <li><b>StringIndexer</b>: maps by estimating and then transforming 
      each unique string value of the data frame to a number. This is comparable to label encoding in scikit for example.
  </li>
  <li><b>OneHotEncoder (OHE)</b>: The output of the StringIndexer 
      is the input into the OHE. The OHE creates a vector of possible
      values over given observations. A 1 represents the actual given
      value, whereas a 0 indicates this is not the value of the variable.
  </li>
  <li><b>VectorAssembler</b>: As explained above OHE creates a vector from
      variable, where all values are 0 and but one. This increases the
      number of columns. Many OHE vectors are incoporated into the
      data frame with the VectorAssembler class as new columns.
  </li>
  <li><b>Pipeline</b>: as also used in Scikit for example binds every
         estimator and transformer object created beforehand into one 
         one object. This can then be executed in sequential steps. 
         More on pipelines in the
         <a href="https://spark.apache.org/docs/latest/ml-pipeline.html"
            target="_blank">doc</a>.
    </li>
</ul> 

<p>This process is shown below.</p> 

#### OneHotEncoder

In [244]:
# The String Indexer creates a number for every unique string value.
# This is comparable to the label encoder in scikit
# https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.LabelEncoder.htmlwqassasay

origin_indexer = \
StringIndexer(inputCol="origin", outputCol="origin_index")

# The output of the String Indexer is the input to the one hot encoder
# This is carried out by the pipeline later

origin_encoder = \
OneHotEncoder(inputCol="origin_index", outputCol="origin_fact")

In [245]:
# Create a StringIndexer
carr_indexer = \
StringIndexer(inputCol="carrier", outputCol="carrier_index")

# Create a OneHotEncoder
carr_encoder = \
OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")

In [246]:
# Create a StringIndexer
dest_indexer = StringIndexer(inputCol="dest", 
outputCol="dest_index")

# Create a OneHotEncoder
dest_encoder = OneHotEncoder(inputCol="dest_index",
outputCol="dest_fact")

#### VectorAssembler

In [247]:
# Make a VectorAssembler collecting the features
# and take in the beforehand encoded variables
# and others features
# used later in the data model

vec_assembler = \
VectorAssembler(inputCols=["month", "air_time", "origin_fact", 
                            "carrier_fact", "dest_fact", "plane_years"], 
                outputCol="features")

#### Pipeline

In [248]:
# Import Pipeline
# Make the pipeline
airtransport_pipeline = \
Pipeline(stages=[origin_indexer, origin_encoder,
                 dest_indexer, dest_encoder,
                 carr_indexer, carr_encoder, 
                 vec_assembler])

airtransport_pipeline

Pipeline_c97f2d1d88db

A estimator is an algorithm, which can be fit on a dataframe to fit a 
transformer. A transformer "transforms" a dataframe into another enhanced
data frame (from features to predictions for example.)

In [249]:
dt_into_pipe = airtransport_pipeline.fit(joined3c).transform(joined3c)

### Splitting the data into train and a test set

<p>
is a core data science technique to prevent overfitting of the model
on the (train) data and decreasing performance on new data (real world).
Iterating through the training data with cross-validation-methods
can improve the performance and validitdy of the model further.
The final check with the test set (or holdout set, when the data was 
never used before in the process give an realistic idea of the model
performance in the "real world".
</p> 

<a href="https://en.wikipedia.org/wiki/Training,_validation,_and_test_sets" target="_blank">For more info compare Wikipedia</a> 

<p>
Here only simple train-test-splits are used.
</p>

In [250]:
training_1, test_1 = dt_into_pipe.randomSplit([.7, .3])

In [251]:
print((training_1.count(), len(training_1.columns)))
print((test_1.count(), len(test_1.columns)))


(6524, 34)
(2779, 34)


In [252]:
training_1.columns

['tailnum',
 'year',
 'month',
 'day',
 'dep_time',
 'dep_delay',
 'arr_time',
 'arr_delay',
 'carrier',
 'flight',
 'origin',
 'dest',
 'air_time',
 'distance',
 'hour',
 'minute',
 'planes_year',
 'type',
 'manufacturer',
 'model',
 'engines',
 'seats',
 'speed',
 'engine',
 'plane_years',
 'late_arrival',
 'label',
 'origin_index',
 'origin_fact',
 'dest_index',
 'dest_fact',
 'carrier_index',
 'carrier_fact',
 'features']

## Is the flight late? <br> Answer with predictions generated by different models.



In [253]:
# Create a LogisticRegression Estimator to fit on the df
logreg = LogisticRegression(featuresCol="features", labelCol = 'label')
logreg

LogisticRegression_acd133027103

### Cross validation and hyper parameter tuning

<p>
is used to increase the model performance and decrease the
model error. In cross validation the data set is split into
different data partitions a number of times. 
For every subset or fold the majority of partitions is used for training and the remaining block is used as validation (testing) data. Every partition is used once as validation data. Applying an algorithm like logistic regression generates predictions evaluated with a metric
like the model error for every subset. 
</p>     
<p>
Averaging the metrics
gives a good estimation of the actual error. Using this together
with hyper parameter tuning, whereby given parameters of the algorithm 
are provided with a list of values, allows to find and select the best model with the lowest model error. The selected model is finally evaluated 
with the test data. 
</p>  
<p>
Fortunately Apache Pyspark provides a workflow
to do all of this similar to Scikit, so there is not a need to code this
from the start. It is helpful to keep in mind that cross-validation is
a highly computing intensive process. Pyspark running on remote machines
in the cloud could manage this process better than a single machine like 
here.
</p> 

<p> 
<img src="https://scikit-learn.org/stable/_images/grid_search_cross_validation.png" alt="Cross-validation: evaluating estimator performance" 
height="250" width="450"> 
</p>

<p> 
<div align="right">
<a href="https://scikit-learn.org/stable/modules/cross_validation.html" target="_blank">Scikit: Cross-validation: evaluating estimator performance</a> 
</div> 
</p>

### Simple RandomForestClassifier as a base model

In [254]:
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(training_1)
predictions = rfModel.transform(test_1)

In [255]:
printout = [ 'features', 'probability', 'label', 'prediction']

In [256]:
predictions.select(printout).show(5)

+--------------------+--------------------+-----+----------+
|            features|         probability|label|prediction|
+--------------------+--------------------+-----+----------+
|(82,[0,1,2,8,46,8...|[0.61853808920144...|    1|       0.0|
|(82,[0,1,8,46,81]...|[0.65090958909260...|    1|       0.0|
|(82,[0,1,2,8,45,8...|[0.56267528148207...|    1|       0.0|
|(82,[0,1,2,8,46,8...|[0.63558665243240...|    0|       0.0|
|(82,[0,1,2,8,45,8...|[0.61134811849834...|    0|       0.0|
+--------------------+--------------------+-----+----------+
only showing top 5 rows



In [257]:
print(rf.explainParams())

cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the features), 'sqrt' (use sqrt(number of features)), 'log2' (use log2(number of features)), 

### Logistic regression model within cross validation

<p>
A logistic regression model predicts certain outcomes 
with a probability between 0 and 1. In boolean Terms a probability
above a choosen cut-off-point is then True or Yes and below False or No.
</p> 

In [258]:
# Import the evaluation module of the pyspark machine leraning library
# It is able to evaluate a range of models created with 
# linear regression, classification, support vector machines ...

# The BinaryClassificationEvaluator uses the area under the curve metric
# based on confusion matrix of true positives, false positives ...

evaluation_instance_log = \
evaluation.BinaryClassificationEvaluator(metricName="areaUnderROC")

Which hyperparameters can be hyper tuned?
<br>An overview is given here:
 

<p>
class pyspark.ml.classification.LogisticRegression(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, threshold=0.5, thresholds=None, probabilityCol="probability", rawPredictionCol="rawPrediction", standardization=True, weightCol=None, aggregationDepth=2, family="auto")
</p> 

<a href="https://spark.apache.org/docs/2.1.0/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegression" target="_blank">pyspark.ml package</a> 


<p>
The params use here are: maxIter, elasticNetParam, regParam. 
</p> 

In [259]:
# ParamGridBuilder delivers the grid of tuned values
grid_log = tune.ParamGridBuilder()

# this fills in the values to run into grid log
grid_log = grid_log.addGrid(logreg.regParam, np.arange(0, .1, .02))
grid_log = grid_log.addGrid(logreg.elasticNetParam, [0,1])
grid_log = grid_log.addGrid(logreg.maxIter, [100, 200])

# tell pyspark to build the grid
grid_log = grid_log.build()

In [260]:
cv_log = tune.CrossValidator(estimator=logreg,
               estimatorParamMaps=grid_log,
               evaluator=evaluation_instance_log
               )

In [261]:
start_time = time()

# Fit the model logreg to the training data
best_model_logreg = cv_log.fit(training_1)

end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Time to train model: 104.029 seconds


In [262]:
print(best_model_logreg)

CrossValidatorModel_6d44d455be53


In [263]:
# Use the model to predict the test set
test_results = best_model_logreg.transform(test_1)

# look into the predictions
test_results.select(printout).show(5)


+--------------------+--------------------+-----+----------+
|            features|         probability|label|prediction|
+--------------------+--------------------+-----+----------+
|(82,[0,1,2,8,46,8...|[0.44012009952712...|    1|       1.0|
|(82,[0,1,8,46,81]...|[0.57344578108556...|    1|       0.0|
|(82,[0,1,2,8,45,8...|[0.33496975534556...|    1|       1.0|
|(82,[0,1,2,8,46,8...|[0.76282840824656...|    0|       0.0|
|(82,[0,1,2,8,45,8...|[0.83695555021854...|    0|       0.0|
+--------------------+--------------------+-----+----------+
only showing top 5 rows



In [264]:
# Evaluate the predictions
print("Area under the curve - AUC: ", 
      evaluation_instance_log.evaluate(test_results))

Area under the curve - AUC:  0.7004728340352523


In [265]:
# look into the predictions - alternatively
prediction = best_model_logreg.transform(test_1)
selected = prediction.select('label')

predicted_labels = []

for row in selected.collect():
    predicted_labels.append(row)
    
print(predicted_labels[100:115])

[Row(label=0), Row(label=0), Row(label=0), Row(label=0), Row(label=1), Row(label=0), Row(label=0), Row(label=0), Row(label=0), Row(label=1), Row(label=0), Row(label=1), Row(label=0), Row(label=0), Row(label=0)]


### Random Forest Model within cross validation

In [266]:
# create model
model_rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# create contingency table or confustion matrix
# https://en.wikipedia.org/wiki/Confusion_matrix

evaluator_rf = MulticlassClassificationEvaluator() 
evaluator_rf_2 = \
evaluation.BinaryClassificationEvaluator(metricName="areaUnderROC")

In [267]:
# create variables
RANDOM_SEED = 302
RF_NUM_TREES = [30, 60, 90]
RF_MAX_DEPTH = [4,6]
RF_NUM_BINS = [32, 64]

In [268]:
# ParamGridBuilder delivers the grid of tuned values
grid_rf = tune.ParamGridBuilder()

# this fills in the values to run into grid log
grid_rf = grid_rf.addGrid(model_rf.maxBins, RF_NUM_BINS)
grid_rf = grid_rf.addGrid(model_rf.maxDepth, RF_MAX_DEPTH)
grid_rf = grid_rf.addGrid(model_rf.numTrees, RF_NUM_BINS)

# tell pyspark to build the grid
grid_rf = grid_rf.build()

In [269]:
# cross validator takes inputs: model, grid, evaluation
cv_rf = tune.CrossValidator(estimator=model_rf,
                             estimatorParamMaps=grid_rf,
                             evaluator=evaluator_rf_2)

In [270]:
start_time = time()

# Fit the model logreg to the training data
best_model_rf = cv_rf.fit(training_1)

end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Time to train model: 55.616 seconds


In [271]:
bestModel = best_model_rf.bestModel

bestParams = bestModel.extractParamMap()

# bestParams.keys()
print(bestModel.getParam('maxBins'))
print(bestModel.getParam('maxDepth'))

RandomForestClassifier_9eea922fe729__maxBins
RandomForestClassifier_9eea922fe729__maxDepth


In [272]:
# Use the model to predict the test set
test_results_rf = best_model_rf.transform(test_1)

# look into the predictions
test_results_rf.select(printout).show(5)

+--------------------+--------------------+-----+----------+
|            features|         probability|label|prediction|
+--------------------+--------------------+-----+----------+
|(82,[0,1,2,8,46,8...|[0.63114203042744...|    1|       0.0|
|(82,[0,1,8,46,81]...|[0.65056148774525...|    1|       0.0|
|(82,[0,1,2,8,45,8...|[0.62100497594102...|    1|       0.0|
|(82,[0,1,2,8,46,8...|[0.63930185146379...|    0|       0.0|
|(82,[0,1,2,8,45,8...|[0.64834766347528...|    0|       0.0|
+--------------------+--------------------+-----+----------+
only showing top 5 rows



In [273]:

# Evaluate the predictions
print("Area under the curve - AUC: ", 
      evaluator_rf_2.evaluate(test_results_rf))

#  	metricName() param for metric name in evaluation 
# (supports "f1" (default), "weightedPrecision", "weightedRecall", 
# "accuracy")
# https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.html
print("f1: ", evaluator_rf.evaluate(test_results_rf))

Area under the curve - AUC:  0.599337482543627
f1:  0.4769752475538342


### Gradient-Boosted Tree Classifier

<p>produces normally good results and just for fun a simple 
model is tested here.</p> 

In [274]:
# create Gradient-Boosted Tree Classifier instance 
# and fit it into data
gbt = GBTClassifier(maxIter=10)

start_time = time()

# Fit the model logreg to the training data
gbtModel = gbt.fit(training_1)

end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)



Time to train model: 6.912 seconds


In [275]:
# generate data frame with predictions
predictions_gbt = gbtModel.transform(test_1)
print(predictions_gbt.select(printout).show(5))

+--------------------+--------------------+-----+----------+
|            features|         probability|label|prediction|
+--------------------+--------------------+-----+----------+
|(82,[0,1,2,8,46,8...|[0.52071097957816...|    1|       0.0|
|(82,[0,1,8,46,81]...|[0.65573352669052...|    1|       0.0|
|(82,[0,1,2,8,45,8...|[0.52006585456176...|    1|       0.0|
|(82,[0,1,2,8,46,8...|[0.66444710531219...|    0|       0.0|
|(82,[0,1,2,8,45,8...|[0.66444710531219...|    0|       0.0|
+--------------------+--------------------+-----+----------+
only showing top 5 rows

None


In [276]:
evaluator_gbt = BinaryClassificationEvaluator()
print("Area under the curve - AUC:" 
      + str(evaluator_gbt.evaluate(predictions, 
     {evaluator.metricName: "areaUnderROC"})))

Area under the curve - AUC:0.6036257023784655


Logistic regression model within cross validation produces the best result
in terms of AUC.

#### Appendix

In [277]:
# a nice feature is explainParams
print(rf.explainParams())

cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the features), 'sqrt' (use sqrt(number of features)), 'log2' (use log2(number of features)), 