In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoderEstimator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from collections import defaultdict
from pyspark.ml.feature import Imputer
from pyspark.sql.functions import col,sum
import datetime
from pyspark.ml import PipelineModel

In [2]:
def get_types(df):
    """Group the columns of ``df`` by their Spark data-type name.

    Prints the schema, its fields and the resulting mapping (kept for the
    notebook's diagnostic output), then returns the mapping.

    Args:
        df: a Spark DataFrame (anything exposing ``schema.fields`` whose
            entries have ``.name`` and ``.dataType``).

    Returns:
        ``defaultdict(list)`` mapping a type class name such as
        ``"IntegerType"``, ``"DoubleType"`` or ``"StringType"`` to the names of
        the columns of that type, in schema order. Missing keys yield ``[]``.
        Parameterized types are keyed by class only (e.g. "ArrayType").
    """
    print(df.schema)
    print(df.schema.fields)
    data_types_map = defaultdict(list)
    for entry in df.schema.fields:
        # Key on the class name, not str(dataType): the repr differs between
        # PySpark versions ("IntegerType" vs "IntegerType()"), which would
        # silently empty every lookup by "IntegerType"/"DoubleType"/"StringType".
        data_types_map[type(entry.dataType).__name__].append(entry.name)
    print(data_types_map)
    return data_types_map

def filter_rows_with_missing_labels(df, target):
    """Keep only the rows whose label column ``target`` is non-null.

    Prints the number of surviving rows (an action, so it runs a Spark job)
    and returns the filtered DataFrame.
    """
    labelled_rows = df.filter(df[target].isNotNull())
    print(labelled_rows.count())
    return labelled_rows
  
def drop_features(df, features):
    """Drop each column named in ``features`` from ``df``, one at a time.

    Prints the column count before and after dropping and returns the
    reduced DataFrame.
    """
    print(len(df.columns))
    remaining = df
    for column_name in features:
        remaining = remaining.drop(column_name)
    print(len(remaining.columns))
    return remaining

def filter_features_with_missing_data(df, target='DEP_DEL15'):
    """Return ``df`` restricted to rows whose ``target`` column is non-null.

    Despite its name this filters *rows*, not feature columns. It behaves like
    ``filter_rows_with_missing_labels`` without printing the row count.

    Args:
        df: a Spark DataFrame.
        target: label column to check. The default, 'DEP_DEL15', is the label
            this notebook uses. The function previously read a notebook-level
            global ``target`` that holds the same value, and raised NameError
            whenever that global had not been defined yet.

    Returns:
        The filtered DataFrame.
    """
    return df.filter(df[target].isNotNull())
  
def get_missing_info(df):
    """Display how many null values each column of ``df`` contains.

    Builds a one-row DataFrame with one column per input column holding that
    column's null count, and renders it with the notebook's ``display``.
    ``sum`` here is ``pyspark.sql.functions.sum`` (imported at the top of the
    notebook, shadowing the builtin), not Python's ``sum``.
    """
    null_counts = [sum(col(name).isNull().cast("int")).alias(name) for name in df.columns]
    display(df.select(*null_counts))

def eda_categorical_feature(df, feature):
    """Print the number of rows for each distinct value of ``feature``.

    The grouped counts are converted with ``toPandas()``, i.e. collected to
    the driver, so this is meant for low-cardinality columns.
    """
    value_counts = df.groupby(feature).count()
    print(value_counts.toPandas())
        
def eda_continuous_features(df, feature):
    """Show Spark's ``describe`` summary statistics for column ``feature``."""
    summary = df.describe(feature)
    summary.show()
  
def cast_integer_features_as_double(df, data_types_map):
    """Cast every column listed under ``"IntegerType"`` to double.

    Prints the resulting schema and returns the new DataFrame.

    NOTE(review): ``data_types_map`` is not updated here, so the cast columns
    stay listed under "IntegerType". Helpers that select
    ``data_types_map['DoubleType']`` therefore never see them. Confirm that
    this is intended.
    """
    result = df
    for column_name in data_types_map["IntegerType"]:
        result = result.withColumn(column_name, result[column_name].cast('double'))
    result.printSchema()
    return result
        
def imputer_continuous_features(df, data_types_map, label='DEP_DEL15'):
    """Build an ``Imputer`` stage for the continuous (DoubleType) columns.

    Every column listed under ``data_types_map['DoubleType']`` except
    ``label`` gets an output column ``<name>_imputed``. As a diagnostic, the
    imputer is fitted on ``df`` and the null counts of the transformed frame
    are displayed. That fitted model is then discarded.

    Args:
        df: training DataFrame, used only for the diagnostic fit.
        data_types_map: mapping of type name -> column names (see get_types).
        label: target column to leave out (default 'DEP_DEL15', the value
            previously hard-coded).

    Returns:
        A one-element list ``[imputer]`` holding the *unfitted* Imputer, so it
        can be concatenated with other stage lists into a Pipeline.

    NOTE(review): the DoubleType columns include DEP_DELAY / DEP_DELAY_NEW.
    In the sample rows, DEP_DEL15 looks like ``DEP_DELAY_NEW >= 15``. They also
    include post-departure columns such as ARR_DELAY. Imputing and modeling on
    them is likely target leakage; confirm before trusting the model metrics.
    """
    # Keep schema order rather than set order: string-set iteration order
    # changes between interpreter runs (hash randomization), which made the
    # imputed column layout non-reproducible.
    continuous_features = [c for c in data_types_map['DoubleType'] if c != label]
    continuous_features_imputed = [c + "_imputed" for c in continuous_features]
    imputer = Imputer(inputCols=continuous_features, outputCols=continuous_features_imputed)
    get_missing_info(imputer.fit(df).transform(df))
    return [imputer]

def impute_categorical_features(df, data_types_map):
    """Replace nulls in every StringType column with the literal ``"missing"``.

    Displays the per-column null counts of the filled frame and returns it.
    This is applied outside any Pipeline, so every split that is later
    scored must go through it too.
    """
    fill_values = {name: "missing" for name in data_types_map['StringType']}
    filled = df.fillna(fill_values)
    get_missing_info(filled)
    return filled
    
def encoder_categorical_features(df, data_types_map, exclude=('_c46',), handle_invalid='keep'):
    """Build StringIndexer + one-hot-encoder stages for the StringType columns.

    For each column ``c`` listed under ``data_types_map['StringType']`` (minus
    ``exclude``), a StringIndexer writes ``c_index``. A single
    OneHotEncoderEstimator then turns all ``*_index`` columns into
    ``*_encoded`` vectors. As a diagnostic, the stages are fitted on ``df``
    and the resulting schema is printed; those fitted models are discarded.

    Args:
        df: training DataFrame, used only for the diagnostic fit.
        data_types_map: mapping of type name -> column names (see get_types).
        exclude: StringType columns to skip. The default is '_c46', the
            unnamed trailing CSV column, which appears to be entirely null.
        handle_invalid: ``handleInvalid`` for every indexer and the encoder.
            The default 'keep' maps labels not seen during fitting (and nulls)
            to an extra index instead of raising when the fitted pipeline
            scores new data. Pass 'error' for Spark's strict default.

    Returns:
        The unfitted stages: the StringIndexers in schema order, followed by
        the encoder.
    """
    skip = set(exclude)
    # Schema order, not set order, so the stage/column layout is reproducible.
    categorical_features = [c for c in data_types_map['StringType'] if c not in skip]
    index_cols = [c + "_index" for c in categorical_features]
    encoded_cols = [c + "_encoded" for c in categorical_features]

    string_indexers = [
        StringIndexer(inputCol=c, outputCol=idx, handleInvalid=handle_invalid)
        for c, idx in zip(categorical_features, index_cols)
    ]
    ohe_encoder = OneHotEncoderEstimator(inputCols=index_cols, outputCols=encoded_cols,
                                         handleInvalid=handle_invalid)

    # Diagnostic only: show the schema the encoding stages would produce.
    Pipeline(stages=string_indexers + [ohe_encoder]).fit(df).transform(df).printSchema()
    return string_indexers + [ohe_encoder]

def assembler_for_feature_vector(df, data_types_map, label='DEP_DEL15',
                                 include_categorical=False, categorical_exclude=('_c46',)):
    """Build the VectorAssembler that packs the model inputs into ``features``.

    By default the inputs are the ``<name>_imputed`` columns for every
    DoubleType column except ``label``. These names match the imputer
    helper's outputs. With ``include_categorical=True``, the
    ``<name>_encoded`` one-hot columns for the StringType columns (minus
    ``categorical_exclude``) are placed in front of them.

    Args:
        df: unused; kept so existing call sites keep working.
        data_types_map: mapping of type name -> column names (see get_types).
        label: target column to leave out (default 'DEP_DEL15').
        include_categorical: also assemble the one-hot encoded string columns.
        categorical_exclude: StringType columns that were not encoded.

    Returns:
        A one-element list ``[assembler]`` for use in Pipeline stages.
    """
    features = []
    if include_categorical:
        skip = set(categorical_exclude)
        features.extend(c + "_encoded" for c in data_types_map['StringType'] if c not in skip)
    # Schema order (not set order) keeps the feature-vector layout, and thus
    # the order of the printed coefficients, stable across interpreter runs.
    features.extend(c + "_imputed" for c in data_types_map['DoubleType'] if c != label)
    print(features)
    return [VectorAssembler(inputCols=features, outputCol="features")]

def split_data(df, train_percent, seed=123):
    """Randomly split ``df`` into training and test DataFrames.

    Args:
        df: DataFrame to split.
        train_percent: fraction of rows (not a percentage), e.g. 0.75, for the
            first split; the rest goes to the second.
        seed: random seed for a reproducible split (default 123, the value
            previously hard-coded).

    Returns:
        ``df.randomSplit(...)``'s list ``[train, test]``.
    """
    return df.randomSplit([train_percent, 1 - train_percent], seed=seed)

def build_and_tune_model_with_cv(pipeline, param_grid, evaluatior, df, num_folds=2):
    """Grid-search ``pipeline`` over ``param_grid`` with k-fold cross-validation.

    Args:
        pipeline: estimator (here a Pipeline) to tune.
        param_grid: list of param maps, e.g. from ``ParamGridBuilder().build()``.
        evaluatior: evaluator that scores each fold. The misspelled name is
            kept so existing keyword callers keep working.
        df: training DataFrame.
        num_folds: number of CV folds (default 2, the value previously hard-coded).

    Returns:
        The fitted CrossValidatorModel.
    """
    # Use the argument. A bare ``evaluator`` here would silently resolve to a
    # notebook-level global and ignore whatever the caller passed in.
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=param_grid,
                              evaluator=evaluatior,
                              numFolds=num_folds)
    return crossval.fit(df)
  
def model_summary_lr(cv_model):
    """Print a summary of a cross-validated logistic-regression pipeline.

    Prints, in order: the CV model, its average metric per param map, the
    best pipeline model, that model's stages, the last stage (expected to be
    the fitted logistic-regression model), and that stage's coefficients and
    intercept.
    """
    print(cv_model)
    print(cv_model.avgMetrics)
    best = cv_model.bestModel
    print(best)
    stages = best.stages
    print(stages)
    final_stage = stages[-1]
    print(final_stage)
    print(final_stage.coefficients)
    print(final_stage.intercept)
  
def predict(cv_model, evaluator, df):
    """Score ``df`` with a fitted model and print the evaluation metric.

    Works with any fitted model exposing ``transform``; this notebook passes
    both a CrossValidatorModel and a reloaded PipelineModel. Prints the
    predictions DataFrame, its schema, and ``evaluator.evaluate(...)``.
    Returns None.
    """
    scored = cv_model.transform(df)
    print(scored)
    scored.printSchema()
    metric = evaluator.evaluate(scored)
    print(metric)

def persist_model(cv_model, type, model_dir):
    """Save the best pipeline of a cross-validated model under ``model_dir``.

    The model is written to
    ``<model_dir>/cv_model_<type>_<MM-DD-YYYY>-<epoch seconds>``, and that
    path is printed.

    Args:
        cv_model: fitted model exposing ``bestModel`` with a ``save(path)`` method.
        type: short model tag for the file name, e.g. "lr". The name shadows
            the builtin but is kept so keyword callers keep working.
        model_dir: directory or URI prefix, with or without a trailing "/".

    Returns:
        The path the model was saved to, so callers can reload it without
        hard-coding the timestamped name. The function used to return None.
    """
    import posixpath  # Spark paths/URIs use "/" regardless of the driver OS

    now = datetime.datetime.now()
    # strftime's "%s" (epoch seconds) is a glibc-only extension and fails
    # elsewhere (e.g. Windows), so compute the epoch seconds portably instead.
    datestamp = "%s-%d" % (now.strftime('%m-%d-%Y'), int(now.timestamp()))
    file_name = "cv_model_" + type + "_" + datestamp
    # join() inserts the "/" separator only when model_dir lacks one.
    path = posixpath.join(model_dir, file_name)
    print(path)
    cv_model.bestModel.save(path)
    return path
    
def load_model(path):
    """Load and return a saved ``PipelineModel`` from ``path``.

    The path is one written by persist_model.
    """
    model = PipelineModel.load(path)
    return model



In [3]:
# Load the raw flight records; inferSchema lets Spark type the numeric columns.
AIRLINE_DELAYS_CSV = "/FileStore/tables/airline_delays.csv"
airline_delays = spark.read.load(
    AIRLINE_DELAYS_CSV,
    format="csv",
    header="true",
    inferSchema="true",
    sep=",",
)
airline_delays.printSchema()


In [4]:
# Render the raw DataFrame with the notebook-provided `display` for a visual check of the columns.
display(airline_delays)

YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,UNIQUE_CARRIER,TAIL_NUM,FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN,ORIGIN_CITY_NAME,ORIGIN_STATE_ABR,DEST_AIRPORT_ID,DEST,DEST_CITY_NAME,DEST_STATE_ABR,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,DEP_DELAY_NEW,DEP_DEL15,DEP_DELAY_GROUP,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,ARR_DELAY_NEW,ARR_DEL15,ARR_DELAY_GROUP,CANCELLED,CANCELLATION_CODE,DIVERTED,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,FLIGHTS,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,_c46
2018,1,2,2,2018-01-02T00:00:00.000+0000,WN,N282WN,1325,14843,SJU,"San Juan, PR",PR,13204,MCO,"Orlando, FL",FL,1105,1100.0,-5.0,0.0,0.0,-1.0,15.0,1115.0,1303.0,6.0,1315,1309.0,-6.0,0.0,0.0,-1.0,0.0,,0.0,190.0,189.0,168.0,1.0,1189.0,5,,,,,,
2018,1,2,2,2018-01-02T00:00:00.000+0000,WN,N8686A,5159,14843,SJU,"San Juan, PR",PR,13204,MCO,"Orlando, FL",FL,1800,1752.0,-8.0,0.0,0.0,-1.0,12.0,1804.0,1949.0,6.0,2010,1955.0,-15.0,0.0,0.0,-1.0,0.0,,0.0,190.0,183.0,165.0,1.0,1189.0,5,,,,,,
2018,1,2,2,2018-01-02T00:00:00.000+0000,WN,N430WN,5890,14843,SJU,"San Juan, PR",PR,13204,MCO,"Orlando, FL",FL,1545,1601.0,16.0,16.0,1.0,1.0,14.0,1615.0,1759.0,5.0,1755,1804.0,9.0,9.0,0.0,0.0,0.0,,0.0,190.0,183.0,164.0,1.0,1189.0,5,,,,,,
2018,1,2,2,2018-01-02T00:00:00.000+0000,WN,N499WN,6618,14843,SJU,"San Juan, PR",PR,13204,MCO,"Orlando, FL",FL,600,553.0,-7.0,0.0,0.0,-1.0,14.0,607.0,751.0,6.0,810,757.0,-13.0,0.0,0.0,-1.0,0.0,,0.0,190.0,184.0,164.0,1.0,1189.0,5,,,,,,
2018,1,2,2,2018-01-02T00:00:00.000+0000,WN,N8522P,1701,14843,SJU,"San Juan, PR",PR,13232,MDW,"Chicago, IL",IL,1620,1614.0,-6.0,0.0,0.0,-1.0,11.0,1625.0,1908.0,25.0,1925,1933.0,8.0,8.0,0.0,0.0,0.0,,0.0,305.0,319.0,283.0,1.0,2057.0,9,,,,,,
2018,1,2,2,2018-01-02T00:00:00.000+0000,WN,N8639B,844,14843,SJU,"San Juan, PR",PR,15304,TPA,"Tampa, FL",FL,1715,1737.0,22.0,22.0,1.0,1.0,14.0,1751.0,1943.0,5.0,1925,1948.0,23.0,23.0,1.0,1.0,0.0,,0.0,190.0,191.0,172.0,1.0,1237.0,5,0.0,0.0,1.0,0.0,22.0,
2018,1,2,2,2018-01-02T00:00:00.000+0000,WN,N8530W,4679,14843,SJU,"San Juan, PR",PR,15304,TPA,"Tampa, FL",FL,1445,1441.0,-4.0,0.0,0.0,-1.0,18.0,1459.0,1648.0,3.0,1655,1651.0,-4.0,0.0,0.0,-1.0,0.0,,0.0,190.0,190.0,169.0,1.0,1237.0,5,,,,,,
2018,1,2,2,2018-01-02T00:00:00.000+0000,WN,N205WN,6294,14869,SLC,"Salt Lake City, UT",UT,10800,BUR,"Burbank, CA",CA,1950,2019.0,29.0,29.0,1.0,1.0,15.0,2034.0,2103.0,2.0,2045,2105.0,20.0,20.0,1.0,1.0,0.0,,0.0,115.0,106.0,89.0,1.0,574.0,3,1.0,0.0,0.0,0.0,19.0,
2018,1,2,2,2018-01-02T00:00:00.000+0000,WN,N720WN,5245,14869,SLC,"Salt Lake City, UT",UT,11259,DAL,"Dallas, TX",TX,535,533.0,-2.0,0.0,0.0,-1.0,25.0,558.0,900.0,8.0,925,908.0,-17.0,0.0,0.0,-2.0,0.0,,0.0,170.0,155.0,122.0,1.0,999.0,4,,,,,,
2018,1,2,2,2018-01-02T00:00:00.000+0000,WN,N222WN,2278,14869,SLC,"Salt Lake City, UT",UT,11292,DEN,"Denver, CO",CO,1245,1242.0,-3.0,0.0,0.0,-1.0,21.0,1303.0,1357.0,5.0,1405,1402.0,-3.0,0.0,0.0,-1.0,0.0,,0.0,80.0,80.0,54.0,1.0,391.0,2,,,,,,


In [5]:
# Total row count, then group column names by Spark type. data_types_map decides
# which columns the preprocessing helpers treat as continuous or categorical.
print(airline_delays.count())
data_types_map = get_types(airline_delays)

In [6]:
# Cast the integer columns to double.
# NOTE(review): data_types_map is not refreshed after this cast, so these columns
# stay under "IntegerType" and are skipped by the DoubleType-based imputer and
# assembler helpers. Confirm that leaving them out of the features is intended.
airline_delays = cast_integer_features_as_double(airline_delays, data_types_map)

In [7]:
# Re-print the schema to verify the column types after the cast.
airline_delays.printSchema()

In [8]:
# Label column for the classifier. Show the row count per label value
# (including the null group) to check class balance.
target = 'DEP_DEL15'
eda_categorical_feature(airline_delays, target)

In [9]:
# Drop rows with no label; they can be used neither for training nor for evaluation.
airline_delays_not_null = filter_rows_with_missing_labels(airline_delays, target)


In [10]:
# Re-check the label distribution after dropping the unlabelled rows.
eda_categorical_feature(airline_delays_not_null, target)

In [11]:
# Split the labelled rows 75/25 into train and test sets.
airline_delays_train, airline_delays_test = split_data(airline_delays_not_null, 0.75)

# Cache both splits (they are re-read by imputation, cross-validation and
# evaluation) and print each split's row count; the count() action also
# populates the cache.
for split_frame in (airline_delays_train, airline_delays_test):
    split_frame.persist()
    print(split_frame.count())


In [12]:
# Null counts per column in the training split; these guide the imputation steps that follow.
get_missing_info(airline_delays_train)

YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,UNIQUE_CARRIER,TAIL_NUM,FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN,ORIGIN_CITY_NAME,ORIGIN_STATE_ABR,DEST_AIRPORT_ID,DEST,DEST_CITY_NAME,DEST_STATE_ABR,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,DEP_DELAY_NEW,DEP_DEL15,DEP_DELAY_GROUP,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,ARR_DELAY_NEW,ARR_DEL15,ARR_DELAY_GROUP,CANCELLED,CANCELLATION_CODE,DIVERTED,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,FLIGHTS,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,_c46
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,172,172,543,543,0,543,1373,1373,1373,1373,0,414382,0,0,1233,1233,0,0,0,341223,341223,341223,341223,341223,414637


In [13]:
# Fill nulls in the string columns with "missing". This runs outside the Pipeline,
# so it must be applied to both splits. Otherwise the test split reaches the
# fitted StringIndexers carrying nulls the training split never had.
airline_delays_train = impute_categorical_features(airline_delays_train, data_types_map)
airline_delays_test = impute_categorical_features(airline_delays_test, data_types_map)

YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,UNIQUE_CARRIER,TAIL_NUM,FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN,ORIGIN_CITY_NAME,ORIGIN_STATE_ABR,DEST_AIRPORT_ID,DEST,DEST_CITY_NAME,DEST_STATE_ABR,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,DEP_DELAY_NEW,DEP_DEL15,DEP_DELAY_GROUP,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,ARR_DELAY_NEW,ARR_DEL15,ARR_DELAY_GROUP,CANCELLED,CANCELLATION_CODE,DIVERTED,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,FLIGHTS,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,_c46
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,172,172,543,543,0,543,1373,1373,1373,1373,0,0,0,0,1233,1233,0,0,0,341223,341223,341223,341223,341223,0


In [14]:
# Unfitted Imputer stage for the DoubleType columns, returned as a one-element
# list so it can be concatenated with the other stage lists. The helper also
# displays null counts after a trial fit on the training split.
imputer = imputer_continuous_features(airline_delays_train, data_types_map)

YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,UNIQUE_CARRIER,TAIL_NUM,FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN,ORIGIN_CITY_NAME,ORIGIN_STATE_ABR,DEST_AIRPORT_ID,DEST,DEST_CITY_NAME,DEST_STATE_ABR,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,DEP_DELAY_NEW,DEP_DEL15,DEP_DELAY_GROUP,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,ARR_DELAY_NEW,ARR_DEL15,ARR_DELAY_GROUP,CANCELLED,CANCELLATION_CODE,DIVERTED,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,FLIGHTS,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,_c46,ACTUAL_ELAPSED_TIME_imputed,AIR_TIME_imputed,CANCELLED_imputed,ARR_DELAY_NEW_imputed,FLIGHTS_imputed,SECURITY_DELAY_imputed,DEP_DELAY_imputed,ARR_DEL15_imputed,NAS_DELAY_imputed,DIVERTED_imputed,TAXI_IN_imputed,CRS_ELAPSED_TIME_imputed,CARRIER_DELAY_imputed,LATE_AIRCRAFT_DELAY_imputed,ARR_DELAY_imputed,DEP_DELAY_NEW_imputed,DISTANCE_imputed,TAXI_OUT_imputed,WEATHER_DELAY_imputed
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,172,172,543,543,0,543,1373,1373,1373,1373,0,0,0,0,1233,1233,0,0,0,341223,341223,341223,341223,341223,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [15]:
# List of StringIndexer stages plus one one-hot encoder stage for the StringType columns.
encoder = encoder_categorical_features(airline_delays_train, data_types_map)

In [16]:
# One-element list holding the VectorAssembler that packs model inputs into "features".
# NOTE(review): the helper assembles only the *_imputed continuous columns, so the
# encoded categorical columns built above are computed but not fed to the model.
assembler = assembler_for_feature_vector(airline_delays_train, data_types_map)

In [17]:
# Sanity check: fit only the preprocessing stages on the training split and
# inspect the resulting schema before the estimator is added.
pipeline = Pipeline(stages=imputer + encoder + assembler)
preprocessed_train = pipeline.fit(airline_delays_train).transform(airline_delays_train)
preprocessed_train.printSchema()

In [18]:
# Estimator: logistic regression on the assembled "features" vector.
lr = LogisticRegression(featuresCol='features', labelCol=target)

# Full modeling pipeline: imputation -> categorical encoding -> feature assembly -> estimator.
pipeline = Pipeline(stages=imputer + encoder + assembler + [lr])

# Score candidate models by area under the ROC curve.
evaluator = BinaryClassificationEvaluator(labelCol=target, metricName="areaUnderROC")

# Hyper-parameter grid for the logistic regression: 2 x 2 x 2 = 8 candidates.
# NOTE(review): maxIter values of 1 and 5 are very low for convergence; confirm intent.
param_grid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.5])
    .addGrid(lr.elasticNetParam, [0.5, 1.0])
    .addGrid(lr.maxIter, [1, 5])
    .build()
)

cv_model = build_and_tune_model_with_cv(pipeline, param_grid, evaluator, airline_delays_train)


In [19]:
# Print the CV metrics and the best logistic-regression coefficients and intercept.
model_summary_lr(cv_model)

In [20]:
# Evaluate the tuned model on the held-out test split (area under ROC).
predict(cv_model, evaluator, airline_delays_test)

In [21]:
# Save the best pipeline; the printed path is what is needed to reload it later.
persist_model(cv_model, "lr", "/FileStore/tables/")

In [22]:
# Reload a previously saved pipeline and re-evaluate it on the test split.
# NOTE(review): this path is hard-coded to one earlier run's timestamped output,
# so it breaks on a fresh Restart & Run All. Use the path printed by the save cell.
loaded_model = load_model("/FileStore/tables/cv_model_lr_10-06-2018-1538862687")
print(loaded_model)
predict(loaded_model, evaluator, airline_delays_test)