#  Project 1: Airline Flight
We will develope a model which predice whether or not a given flight will be delayed.  

We're going to create the stages for the flights duration model pipeline. 

In [8]:
import findspark
findspark.init('C:/Users/Hamid/Anaconda3/lib/site-packages/pyspark')
import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import round


# Create SparkSession object
spark = SparkSession.builder.master('local[*]').appName('test').getOrCreate()

# What version of Spark?
print(spark.version)

# Read data from CSV file, inferSchema - deduce column data types from data.
flights = spark.read.csv('flights.csv',
                         sep=',',
                         header=True,
                         inferSchema=True,
                         nullValue='NA')

# Get number of records
print("The data contain %d records." % flights.count())


# Convert 'mile' to 'km' and drop 'mile' column
flights = flights.withColumn('km', round(flights.mile * 1.60934, 0)) \
                    .drop('mile')

# Remove records with missing 'delay' values
flights = flights.filter('delay IS NOT NULL')

# View the first five records
flights.show(5)

# Check column data types
flights.dtypes

2.4.4
The data contain 50000 records.
+---+---+---+-------+------+---+------+--------+-----+------+
|mon|dom|dow|carrier|flight|org|depart|duration|delay|    km|
+---+---+---+-------+------+---+------+--------+-----+------+
|  0| 22|  2|     UA|  1107|ORD| 16.33|      82|   30| 509.0|
|  2| 20|  4|     UA|   226|SFO|  6.17|      82|   -8| 542.0|
|  9| 13|  1|     AA|   419|ORD| 10.33|     195|   -5|1989.0|
|  5|  2|  1|     UA|   704|SFO|  7.98|     102|    2| 885.0|
|  7|  2|  6|     AA|   380|ORD| 10.83|     135|   54|1180.0|
+---+---+---+-------+------+---+------+--------+-----+------+
only showing top 5 rows



[('mon', 'int'),
 ('dom', 'int'),
 ('dow', 'int'),
 ('carrier', 'string'),
 ('flight', 'int'),
 ('org', 'string'),
 ('depart', 'double'),
 ('duration', 'int'),
 ('delay', 'int'),
 ('km', 'double')]

In [22]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


# Split into training and testing sets in a 80:20 ratio
flights_train, flights_test = flights.randomSplit([0.8, 0.2], seed=17)

# Convert categorical strings to index values
indexer = StringIndexer(inputCol='org', outputCol='org_idx')

# One-hot encode index values
onehot = OneHotEncoderEstimator(
    inputCols=['org_idx', 'dow'],
    outputCols=['org_dummy', 'dow_dummy']
)

# Assemble predictors into a single column
assembler = VectorAssembler(inputCols=['km', 'org_dummy', 'dow_dummy'], outputCol='features')

# A linear regression object
regression = LinearRegression(labelCol='duration')

evaluator = RegressionEvaluator(labelCol='duration')



# Construct a pipeline
pipeline = Pipeline(stages=[indexer, onehot, assembler, regression])

params = ParamGridBuilder()

# Add grids for two parameters
params = params.addGrid(regression.regParam, [0.01, 0.1, 1.0, 10.0]) \
               .addGrid(regression.elasticNetParam, [0.0, 0.5, 1.0])


# Build the parameter grid
params = params.build()
print('Number of models to be tested: ', len(params))


# Create cross-validator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=params, 
                    evaluator=evaluator, numFolds=5)

cv = cv.fit(flights_train)


# Get the best model from cross validation
best_model = cv.bestModel

# Look at the stages in the best model
print(best_model.stages)

# Get the parameters for the LinearRegression object in the best model
best_model.stages[3].extractParamMap()

# Generate predictions on testing data using the best model then calculate RMSE
predictions = best_model.transform(flights_test)
evaluator.evaluate(predictions)


Number of models to be tested:  12
[StringIndexer_f2db87a100d1, OneHotEncoderEstimator_9b144c9e5a98, VectorAssembler_1a6c1e587c1e, LinearRegression_f9b7f52f5738]


11.222357206540181