# Predict airtravel delay

## Load data


In [2]:
df = spark.sql("SELECT DayOfWeek, DepTime, AirTime, CASE WHEN (ArrDelay < 60) THEN 0 ELSE 1 END AS DelayType FROM demodata.flightdelay")
df.count()

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 4, Finished, Available)

484551

In [3]:
display(df.head(15))

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 5, Finished, Available)

SynapseWidget(Synapse.DataFrame, 25672267-38c7-483d-8535-71cb6de4d82d)

In [4]:
from pyspark import ml as sparkml
print(dir(sparkml))

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 6, Finished, Available)

['Estimator', 'Model', 'Pipeline', 'PipelineModel', 'PredictionModel', 'Predictor', 'TorchDistributor', 'Transformer', 'UnaryTransformer', '__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__', 'base', 'classification', 'clustering', 'common', 'evaluation', 'feature', 'fpm', 'image', 'linalg', 'param', 'pipeline', 'recommendation', 'regression', 'stat', 'torch', 'tree', 'tuning', 'util', 'wrapper']


## Determine features and label

In [5]:
print(df.columns)
featureslist = df.columns[:-1]
print(featureslist)
label = df.columns[-1]
print(label)

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 7, Finished, Available)

['DayOfWeek', 'DepTime', 'AirTime', 'DelayType']
['DayOfWeek', 'DepTime', 'AirTime']
DelayType


In [6]:
# VectorAssembler builds a 2 column dataset: features and labels
from pyspark.ml.feature import VectorAssembler
myVA = VectorAssembler(inputCols=featureslist, outputCol="features")
display(myVA.transform(df).head(15))

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 8, Finished, Available)

SynapseWidget(Synapse.DataFrame, 194a3eb8-1b40-4eec-b1e2-bd70c9801355)

## Pick a machine learning algorithm

In [7]:
print(dir(sparkml.classification))

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 9, Finished, Available)



In [8]:
from pyspark.ml.classification import DecisionTreeClassifier
dtc = DecisionTreeClassifier(featuresCol="features", labelCol=label)
print(dtc.explainParams())

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 10, Finished, Available)

cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featuresCol: features column name. (default: features, current: features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini)
labelCol: label column name. (default: label, current: DelayType)
leafCol: Leaf indices column name. Predicted leaf index of each instance in each tree by preorder. (default: )
maxBins: Max number of bins for disc

## Let's build a pipeline

In [9]:
from pyspark.ml import Pipeline, Estimator
sqlbitspipeline = Pipeline(stages=[myVA,dtc])
print(isinstance(sqlbitspipeline, Estimator))
print(dir(sqlbitspipeline))

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 11, Finished, Available)

True
['__abstractmethods__', '__annotations__', '__class__', '__class_getitem__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__orig_bases__', '__parameters__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__slots__', '__str__', '__subclasshook__', '__weakref__', '_abc_impl', '_copyValues', '_copy_params', '_defaultParamMap', '_dummy', '_fit', '_from_java', '_gorilla_active_patch_fit', '_gorilla_original_fit', '_input_kwargs', '_is_protocol', '_paramMap', '_params', '_randomUID', '_resetUid', '_resolveParam', '_set', '_setDefault', '_shouldOwn', '_testOwnParam', '_to_java', 'clear', 'copy', 'explainParam', 'explainParams', 'extractParamMap', 'fit', 'fitMultiple', 'getOrDefault', 'getParam', 'getStages', 'hasDefault', 'hasParam', 'isDefined', 'isSet', 'load', 'params', 'read', 'save', 'set', 

## Split training and test data set

In [10]:
train, test = df.randomSplit([0.7,0.3])
print('training set size',train.count())
print('test set size',test.count())

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 12, Finished, Available)

training set size 339635
test set size 144916


## Run, pipeline, run!

In [11]:
predictor = sqlbitspipeline.fit(train)
predictions = predictor.transform(test)

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 13, Finished, Available)



In [12]:
print(predictor.stages[1].toDebugString)

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 14, Finished, Available)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_c2be9eaf191b, depth=5, numNodes=21, numClasses=2, numFeatures=3
  If (feature 1 <= 2220.5)
   Predict: 0.0
  Else (feature 1 > 2220.5)
   If (feature 2 <= 206.5)
    If (feature 1 <= 2314.5)
     Predict: 1.0
    Else (feature 1 > 2314.5)
     If (feature 2 <= 47.5)
      Predict: 1.0
     Else (feature 2 > 47.5)
      If (feature 2 <= 184.5)
       Predict: 1.0
      Else (feature 2 > 184.5)
       Predict: 0.0
   Else (feature 2 > 206.5)
    If (feature 2 <= 324.5)
     If (feature 2 <= 232.5)
      If (feature 1 <= 2314.5)
       Predict: 1.0
      Else (feature 1 > 2314.5)
       Predict: 0.0
     Else (feature 2 > 232.5)
      Predict: 0.0
    Else (feature 2 > 324.5)
     If (feature 0 <= 5.5)
      Predict: 1.0
     Else (feature 0 > 5.5)
      If (feature 0 <= 6.5)
       Predict: 0.0
      Else (feature 0 > 6.5)
       Predict: 1.0



In [13]:
display(predictions.head(15))

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 15, Finished, Available)

SynapseWidget(Synapse.DataFrame, 85b48245-3eed-4112-94f5-53bb67067dd5)

## Let's evaluate the quality

In [14]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
eval = BinaryClassificationEvaluator(labelCol='DelayType', rawPredictionCol='prediction')
eval.evaluate(predictions)

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 16, Finished, Available)



0.5284200050049226

In [15]:
from sklearn.metrics import classification_report, confusion_matrix

y_true = predictions.select(['DelayType']).collect()
y_pred = predictions.select(['prediction']).collect()
print(classification_report(y_true, y_pred))
print(confusion_matrix(y_true,y_pred))

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 17, Finished, Available)

              precision    recall  f1-score   support

           0       0.67      0.98      0.79     94702
           1       0.69      0.07      0.13     50214

    accuracy                           0.67    144916
   macro avg       0.68      0.53      0.46    144916
weighted avg       0.68      0.67      0.57    144916

[[93037  1665]
 [46477  3737]]


## Back to the drawing board

1. More features
1. Hyperparameterization
1. More ML algorithms

## FLAML to the recue!

In [16]:
from flaml import AutoML
automl = AutoML()

automl_settings = {
    "time_budget": 100,  # in seconds
    "metric": 'accuracy',
    "task": 'classification',
    "log_file_name": "flightdelay.log",
}

pandastrain = train.toPandas()
automl.fit(X_train=pandastrain[featureslist], y_train=pandastrain[label],
           **automl_settings)

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 18, Finished, Available)

[flaml.automl.logger: 03-21 07:51:05] {1750} INFO - task = classification
[flaml.automl.logger: 03-21 07:51:05] {1761} INFO - Evaluation method: holdout
[flaml.automl.logger: 03-21 07:51:05] {1859} INFO - Minimizing error metric: 1-accuracy
[flaml.automl.logger: 03-21 07:51:07] {1977} INFO - List of ML learners in AutoML Run: ['lgbm', 'rf', 'xgboost', 'extra_tree', 'xgb_limitdepth', 'sgd', 'catboost', 'lrl1']
[flaml.automl.logger: 03-21 07:51:07] {2272} INFO - iteration 0, current learner lgbm
[flaml.automl.logger: 03-21 07:51:07] {2401} INFO - Estimated sufficient time budget=134545s. Estimated necessary time budget=3317s.




[flaml.automl.logger: 03-21 07:51:30] {2450} INFO -  at 2.9s,	estimator lgbm's best error=0.3429,	best estimator lgbm's best error=0.3429
[flaml.automl.logger: 03-21 07:51:30] {2272} INFO - iteration 1, current learner lgbm




[flaml.automl.logger: 03-21 07:51:49] {2450} INFO -  at 25.7s,	estimator lgbm's best error=0.3429,	best estimator lgbm's best error=0.3429
[flaml.automl.logger: 03-21 07:51:49] {2272} INFO - iteration 2, current learner lgbm




[flaml.automl.logger: 03-21 07:52:07] {2450} INFO -  at 44.2s,	estimator lgbm's best error=0.3255,	best estimator lgbm's best error=0.3255
[flaml.automl.logger: 03-21 07:52:07] {2272} INFO - iteration 3, current learner lgbm




[flaml.automl.logger: 03-21 07:52:25] {2450} INFO -  at 62.9s,	estimator lgbm's best error=0.3234,	best estimator lgbm's best error=0.3234
[flaml.automl.logger: 03-21 07:52:25] {2272} INFO - iteration 4, current learner lgbm




[flaml.automl.logger: 03-21 07:52:44] {2450} INFO -  at 80.9s,	estimator lgbm's best error=0.3234,	best estimator lgbm's best error=0.3234
[flaml.automl.logger: 03-21 07:52:44] {2272} INFO - iteration 5, current learner lgbm




[flaml.automl.logger: 03-21 07:53:02] {2450} INFO -  at 99.2s,	estimator lgbm's best error=0.3230,	best estimator lgbm's best error=0.3230
[flaml.automl.logger: 03-21 07:53:02] {2693} INFO - retrain lgbm for 0.2s
[flaml.automl.logger: 03-21 07:53:02] {2696} INFO - retrained model: LGBMClassifier(colsample_bytree=0.8304072431299575,
               learning_rate=0.7590459488450938, max_bin=255,
               min_child_samples=5, n_estimators=1, n_jobs=-1, num_leaves=5,
               reg_alpha=0.0019513780315197576, reg_lambda=0.04792552866398477,
               verbose=-1)
[flaml.automl.logger: 03-21 07:53:02] {2697} INFO - Auto Feature Engineering pipeline: None
[flaml.automl.logger: 03-21 07:53:02] {2699} INFO - Best MLflow run name: 
[flaml.automl.logger: 03-21 07:53:02] {2700} INFO - Best MLflow run id: 8fd4103e-db3c-43ee-a60f-72cbf1c4dd06
[flaml.automl.logger: 03-21 07:53:02] {2007} INFO - fit succeeded
[flaml.automl.logger: 03-21 07:53:02] {2008} INFO - Time taken to find the bes

In [17]:
print(automl.model.estimator)

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 19, Finished, Available)

LGBMClassifier(colsample_bytree=0.8304072431299575,
               learning_rate=0.7590459488450938, max_bin=255,
               min_child_samples=5, n_estimators=1, n_jobs=-1, num_leaves=5,
               reg_alpha=0.0019513780315197576, reg_lambda=0.04792552866398477,
               verbose=-1)


In [18]:
print(dir(automl))

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 20, Finished, Available)

['__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setstate__', '__sizeof__', '__sklearn_clone__', '__str__', '__subclasshook__', '__version__', '__weakref__', '_active_estimators', '_auto_augment', '_best_estimator', '_best_iteration', '_build_request_for_signature', '_check_feature_names', '_check_n_features', '_config_history', '_decide_eval_method', '_df', '_early_stop', '_eci', '_ensemble', '_estimator_index', '_estimator_type', '_feature_names_in_', '_featurization', '_force_cancel', '_fullsize_reached', '_get_default_requests', '_get_metadata_request', '_get_param_names', '_get_tags', '_hpo_method', '_iter_per_learner', '_iter_per_learner_fullsize', '_label_transformer', '_learner_selector', '_log_trial', '_log_type', '

In [25]:
testPandas = test.toPandas()
prediction_arr = automl.predict(testPandas[featureslist])
predictions = testPandas
predictions['prediction'] = prediction_arr
display(predictions)

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 27, Finished, Available)

SynapseWidget(Synapse.DataFrame, e950395e-3d5e-4f6c-a2ee-ead2ce5d210a)

In [26]:
y_true = predictions['DelayType']
y_pred = predictions['prediction']
print(classification_report(y_true, y_pred))
print(confusion_matrix(y_true,y_pred))

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 28, Finished, Available)

              precision    recall  f1-score   support

           0       0.67      0.97      0.80     94702
           1       0.69      0.11      0.19     50214

    accuracy                           0.67    144916
   macro avg       0.68      0.54      0.50    144916
weighted avg       0.68      0.67      0.59    144916

[[92083  2619]
 [44507  5707]]


## Add more features

In [28]:
df = spark.sql("SELECT DayOfWeek, DepTime, AirTime, UniqueCarrier, Origin, Dest, CASE WHEN (ArrDelay < 60) THEN 0 ELSE 1 END AS DelayType FROM demodata.flightdelay")
automl_settings = {
    "time_budget": 500,  # in seconds
    "metric": 'accuracy',
    "task": 'classification',
    "log_file_name": "flightdelay.log",
}
train, test = df.randomSplit([0.7,0.3])
pandastrain = train.toPandas()
featureslist = df.columns[:-1]
automl.fit(X_train=pandastrain[featureslist], y_train=pandastrain[label],
           **automl_settings)

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 30, Finished, Available)

[flaml.automl.logger: 03-21 08:12:12] {1750} INFO - task = classification
[flaml.automl.logger: 03-21 08:12:12] {1761} INFO - Evaluation method: holdout
[flaml.automl.logger: 03-21 08:12:12] {1859} INFO - Minimizing error metric: 1-accuracy
[flaml.automl.logger: 03-21 08:12:12] {1977} INFO - List of ML learners in AutoML Run: ['lgbm', 'rf', 'xgboost', 'extra_tree', 'xgb_limitdepth', 'sgd', 'catboost', 'lrl1']
[flaml.automl.logger: 03-21 08:12:12] {2272} INFO - iteration 0, current learner lgbm
[flaml.automl.logger: 03-21 08:12:12] {2401} INFO - Estimated sufficient time budget=12199s. Estimated necessary time budget=301s.
[flaml.automl.logger: 03-21 08:12:12] {2450} INFO -  at 0.8s,	estimator lgbm's best error=0.3451,	best estimator lgbm's best error=0.3451
[flaml.automl.logger: 03-21 08:12:12] {2272} INFO - iteration 1, current learner lgbm
[flaml.automl.logger: 03-21 08:12:12] {2450} INFO -  at 0.8s,	estimator lgbm's best error=0.3451,	best estimator lgbm's best error=0.3451
[flaml.a

In [29]:
print(automl.model.estimator)

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 31, Finished, Available)

LGBMClassifier(colsample_bytree=0.9429529415401117,
               learning_rate=0.17149110323428812, max_bin=255,
               min_child_samples=3, n_estimators=1, n_jobs=-1, num_leaves=4032,
               reg_alpha=0.06148057774142418, reg_lambda=0.10402728169360116,
               verbose=-1)


In [30]:
testPandas = test.toPandas()
prediction_arr = automl.predict(testPandas[featureslist])
predictions = testPandas
predictions['prediction'] = prediction_arr
display(predictions)

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 32, Finished, Available)

SynapseWidget(Synapse.DataFrame, 06c0db12-da57-4fcc-9bea-abecdf62a1a0)

In [31]:
y_true = predictions['DelayType']
y_pred = predictions['prediction']
print(classification_report(y_true, y_pred))
print(confusion_matrix(y_true,y_pred))

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 33, Finished, Available)

              precision    recall  f1-score   support

           0       0.79      0.89      0.84     95811
           1       0.72      0.53      0.61     49672

    accuracy                           0.77    145483
   macro avg       0.75      0.71      0.73    145483
weighted avg       0.77      0.77      0.76    145483

[[85608 10203]
 [23131 26541]]


In [33]:
(85608 + 26541) / ( 26541 + 23131 + 10203 + 85608 )

StatementMeta(, 4797ad00-b78c-4ff3-9bf9-e026ac033a53, 35, Finished, Available)

0.7708735728573098