In [0]:
from pprint import pprint as pp

# Chapter 3: Machine Learning Pipelines

### specify database and create dataframes

In [0]:
%sql
-- Switch the session to the course schema, then list the tables it contains.
USE u23_2_3_schema;
SHOW TABLES;

database,tableName,isTemporary
u23_2_3_schema,airports,False
u23_2_3_schema,flights,False
u23_2_3_schema,planes,False


In [0]:
# Same listing as the %sql cell, but through the Python catalog API.
catalog_tables = spark.catalog.listTables()
print(catalog_tables)

In [0]:
flights = spark.table('flights')
print('length = ', flights.count())
flights.show(5)

In [0]:
airports = spark.table('airports')
print('length = ', airports.count())
airports.show(5)

In [0]:
planes = spark.table('planes')
print('length = ', planes.count())
planes.show(5)

### join `flights` and `planes`
##### both tables have `year` and `tailnum` columns; `tailnum` is the join key
##### rename `year` in `planes` to `plane_year` so the joined table does not contain two `year` columns

In [0]:
# `flights` has its own `year` column; give the planes' `year` (presumably the
# build year -- confirm against the data source) a distinct name before the join.
planes = planes.withColumnRenamed(existing='year', new='plane_year')
planes.show(5)

In [0]:
# Left outer join keeps every flight. A flight whose `tailnum` has no match in
# `planes` gets nulls in the plane columns, including `plane_year`.
model_data = flights.join(planes, on='tailnum', how="leftouter")
# A bare `model_data.schema` in the middle of a cell is evaluated but never
# displayed (only a cell's last expression is rendered), so print it explicitly.
model_data.printSchema()
model_data.show(5)

### cast `arr_delay` to integer

In [0]:
# Replace `arr_delay` with an integer-typed version of itself.
arr_delay_int = model_data['arr_delay'].cast('integer')
model_data = model_data.withColumn('arr_delay', arr_delay_int)
model_data.printSchema()

### add column for plane age

In [0]:
# plane_age = flight year - plane_year; null whenever plane_year is null
# (e.g. flights with no matching plane from the left join).
age_expr = model_data['year'] - model_data['plane_year']
model_data = model_data.withColumn('plane_age', age_expr)
model_data.show(5)

### add `label` column indicating whether the flight arrived late (1 if `arr_delay > 0`, otherwise 0)

In [0]:
# Boolean late flag first, then its 0/1 integer form as the model's `label`.
late_flag = model_data['arr_delay'] > 0
model_data = model_data.withColumn('is_late', late_flag)
model_data = model_data.withColumn('label', model_data['is_late'].cast('integer'))
model_data.printSchema()
model_data.show(5)

### remove rows with missing values

In [0]:
# Row count before dropping incomplete rows, for comparison with the filtered count.
row_count_before = model_data.count()
print('num rows original = ', row_count_before)

In [0]:
# Drop rows missing any of these values. arr_delay drives the label,
# air_time is a feature, and plane_year feeds plane_age. dep_delay is also
# required here, although it is not used as a feature below.
# The predicate is built once and applied once. The previous version built an
# identical throwaway `temp_data` first, which cost an extra full scan to count.
has_required_values = (model_data.arr_delay.isNotNull() &
                       model_data.dep_delay.isNotNull() &
                       model_data.air_time.isNotNull() &
                       model_data.plane_year.isNotNull())
model_data = model_data.where(has_required_values)
print('num rows filtered = ', model_data.count())

### `import` for pre-processing

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

### one-hot encoding of `carrier`

In [0]:
# carrier string -> numeric index -> one-hot vector `carrier_fact`.
carr_indexer = StringIndexer(outputCol='carrier_index', inputCol='carrier')
carr_encoder = OneHotEncoder(outputCol='carrier_fact', inputCol=carr_indexer.getOutputCol())

### one-hot encoding of `dest`

In [0]:
# dest string -> numeric index -> one-hot vector `dest_fact`.
dest_indexer = StringIndexer(outputCol='dest_index', inputCol='dest')
dest_encoder = OneHotEncoder(outputCol='dest_fact', inputCol=dest_indexer.getOutputCol())

### assemble data

In [0]:
# Columns concatenated, in this order, into the single `features` vector.
feature_cols = ['month', 'air_time', 'carrier_fact', 'dest_fact', 'plane_age']
vec_assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

### create pipeline

In [0]:
# Each encoder must run after its indexer; the assembler runs last because it
# consumes both *_fact columns.
pipeline_stages = [dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler]
flights_pipe = Pipeline(stages=pipeline_stages)

### execute pipeline

In [0]:
# Fit the stages (e.g. learn the string indexes) on model_data, then apply them to it.
pipe_model = flights_pipe.fit(model_data)
piped_data = pipe_model.transform(model_data)

In [0]:
# Quick sanity check of the transformed DataFrame: type, size, schema, sample rows.
piped_count = piped_data.count()
print(type(piped_data))
print('length piped_data = ', piped_count)
piped_data.printSchema()
piped_data.show(n=5)

### train-test split (60% train / 40% test)

In [0]:
# Fixed seed so the 60/40 split -- and every model and metric computed from it --
# is reproducible between runs (given the same input data and partitioning).
training, test = piped_data.randomSplit([0.6, 0.4], seed=42)

In [0]:
# Same report for each split: header, type, row count, schema.
for split_name, split_df in (('training', training), ('test', test)):
    print(f'{split_name} set info:')
    print(type(split_df))
    print('length = ', split_df.count())
    split_df.printSchema()

# Chapter 4: Model Tuning and Selection

### `import` for modeling

In [0]:
import numpy as np

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

### instantiate objects

##### model and evaluator

In [0]:
# Both rely on Spark's default column names (`features`, `label`, `rawPrediction`),
# which match the columns produced above.
evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')
lr = LogisticRegression()

##### hyper-parameter grid

In [0]:
# 10 regParam values x 2 elasticNetParam values = 20 candidate settings.
grid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, np.arange(0, 0.1, 0.01))
    .addGrid(lr.elasticNetParam, [0, 1])
    .build()
)

##### cross validator

In [0]:
# Evaluate every grid point with k-fold CV. The seed fixes the fold assignment so
# avgMetrics and bestModel are reproducible. numFolds=3 is Spark's default,
# stated explicitly: 20 param maps x 3 folds = 60 fits, plus a final refit.
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=grid,
                    evaluator=evaluator,
                    numFolds=3,
                    seed=42)

### perform modeling

##### fit

In [0]:
# Run cross-validation on the training split only; the test split stays held out.
models = cv.fit(dataset=training)

In [0]:
# Type first, then the CrossValidatorModel itself.
for shown in (type(models), models):
    print(shown)

In [0]:
# The model refit with the best-scoring param map.
best_lr = models.bestModel
for shown in (type(best_lr), best_lr):
    print(shown)

##### investigate `models`

In [0]:
# One mean CV metric per param map in the grid.
avg_metrics = models.avgMetrics
pp(avg_metrics)
print('length = ', len(avg_metrics))

In [0]:
# `models.estimatorParamMaps` is the Param descriptor itself (printing it shows only
# its name), not the grid. Read the grid through the getter and pair each candidate
# with its mean CV metric; avgMetrics is in the same order as the param maps.
for param_map, mean_metric in zip(models.getEstimatorParamMaps(), models.avgMetrics):
    settings = {param.name: value for param, value in param_map.items()}
    print(f'{mean_metric:.4f}', settings)

In [0]:
# explainParams() returns a single multi-line string. Print it so the newlines
# render; pprint would show its repr, split into quoted fragments with escaped "\n".
print(models.explainParams())

In [0]:
# The full grid of candidate settings that cross-validation evaluated.
cv_param_maps = models.getEstimatorParamMaps()
pp(cv_param_maps)

##### investigate best model

In [0]:
# Hyper-parameter values chosen by cross-validation.
for caption, value in [('elasticNetParam = ', best_lr.getElasticNetParam()),
                       ('regParam        = ', best_lr.getRegParam())]:
    print(caption, value)

In [0]:
# pprint of `summary` only shows an object repr with no metrics. Report its type
# and a headline training-set metric instead. The label is 0/1, so this should be
# a binary summary that exposes areaUnderROC.
training_summary = best_lr.summary
print(type(training_summary))
print('training areaUnderROC = ', training_summary.areaUnderROC)

In [0]:
# All Param descriptors (name + doc) defined on the fitted model.
best_params = best_lr.params
pp(best_params)

In [0]:
# getParam returns the Param descriptor (metadata), not its value;
# the value comes from getElasticNetParam().
enet_param = best_lr.getParam('elasticNetParam')
pp(enet_param)

##### use best model with test data
In the run recorded here, cross-validation selected `elasticNetParam = 0` and `regParam = 0` (i.e. no regularization); these happen to be the default values.

In [0]:
# Score the held-out split with the best model, using the same evaluator
# (areaUnderROC) that drove model selection.
test_results = best_lr.transform(test)
test_auc = evaluator.evaluate(test_results)
print(test_auc)