<a href="https://colab.research.google.com/github/imdhanush13/Flight-prediction/blob/main/Flight_Delay_Prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=9c0ec412bd75bf66f38f8c08d8b40951e76090e3d61b20b675efb52716c4a836
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
# Start (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("bda_mini")
    .getOrCreate()
)

In [None]:
# Load the flights CSV. header=True takes the column names from the first row;
# inferSchema=True lets Spark assign numeric types to numeric columns instead of
# reading every column as a string (at the cost of an extra pass over the file).
df = spark.read.csv('/content/flights.csv', header=True, inferSchema=True)

In [None]:
# Preview the first 5 rows of the raw flights data.
df.show(5)

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

In [None]:
# Columns kept for modelling. ARRIVAL_DELAY is only used to derive the label.
model_columns = [
    'MONTH',
    'DAY_OF_WEEK',
    'AIRLINE',
    'TAIL_NUMBER',
    'DESTINATION_AIRPORT',
    'AIR_TIME',
    'DISTANCE',
    'ARRIVAL_DELAY',
]
model_data = df.select(*model_columns)

# Drop rows with a missing value in ANY selected column. The previous filter
# only checked four of them, leaving DESTINATION_AIRPORT, DISTANCE and
# DAY_OF_WEEK unchecked even though they are fed to StringIndexers later,
# which raise on NULL input by default.
model_data = model_data.dropna(how="any", subset=model_columns)

# rows left
model_data.count()

40506

In [None]:
# Build the binary label in a single step: 1 when the flight arrived late
# (ARRIVAL_DELAY > 0), otherwise 0.
# Writing directly to "label" keeps the cell idempotent: re-running it
# overwrites the column, whereas the earlier withColumn + rename sequence would
# append a second column named "label" on a re-run.
# The explicit cast keeps the comparison numeric even when ARRIVAL_DELAY was
# loaded as a string column.
arrival_delay = model_data["ARRIVAL_DELAY"].cast("double")
model_data = model_data.withColumn("label", (arrival_delay > 0).cast("integer"))

In [None]:
# Preview 15 rows of the modelling frame, including the label column.
model_data.show(15)

+-----+-----------+-------+-----------+-------------------+--------+--------+-------------+-----+
|MONTH|DAY_OF_WEEK|AIRLINE|TAIL_NUMBER|DESTINATION_AIRPORT|AIR_TIME|DISTANCE|ARRIVAL_DELAY|label|
+-----+-----------+-------+-----------+-------------------+--------+--------+-------------+-----+
|    1|          4|     AS|     N407AS|                SEA|     169|    1448|          -22|    0|
|    1|          4|     AA|     N3KUAA|                PBI|     263|    2330|           -9|    0|
|    1|          4|     US|     N171US|                CLT|     266|    2296|            5|    1|
|    1|          4|     AA|     N3HYAA|                MIA|     258|    2342|           -9|    0|
|    1|          4|     AS|     N527AS|                ANC|     199|    1448|          -21|    0|
|    1|          4|     DL|     N3730B|                MSP|     206|    1589|            8|    1|
|    1|          4|     NK|     N635NK|                MSP|     154|    1299|          -17|    0|
|    1|          4| 

In [None]:
# Fraction of rows to keep (0.1 keeps roughly 10% of the data).
sample_fraction = 0.1

# Down-sample with a fixed seed for repeatable sampling.
# NOTE(review): this rebinds model_data, so re-running the cell samples the
# already-sampled frame again.
model_data = model_data.sample(seed=42, fraction=sample_fraction)

# Preview the sampled rows.
model_data.show()

+-----+-----------+-------+-----------+-------------------+--------+--------+-------------+-----+
|MONTH|DAY_OF_WEEK|AIRLINE|TAIL_NUMBER|DESTINATION_AIRPORT|AIR_TIME|DISTANCE|ARRIVAL_DELAY|label|
+-----+-----------+-------+-----------+-------------------+--------+--------+-------------+-----+
|    1|          4|     US|     N584UW|                CLT|     228|    2125|          -10|    0|
|    1|          4|     DL|     N3743H|                SEA|     171|    1448|          -24|    0|
|    1|          4|     AS|     N413AS|                PDX|     187|    1542|          -18|    0|
|    1|          4|     B6|     N307JB|                MCO|     163|    1189|           85|    1|
|    1|          4|     HA|     N476HA|                KOA|      30|     163|            5|    1|
|    1|          4|     NK|     N616NK|                MYR|     112|     738|          -10|    0|
|    1|          4|     US|     N676AW|                PHX|     146|    1107|           -2|    0|
|    1|          4| 

In [None]:
# Number of rows left after sampling.
model_data.count()

4002

In [None]:
# Class-balance check: number of rows per label value.
print('Labels distrubution:')
label_counts = model_data.groupBy('label').count()
label_counts.show()

Labels distrubution:
+-----+-----+
|label|count|
+-----+-----+
|    1| 1992|
|    0| 2010|
+-----+-----+



In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

# Index the AIRLINE values into the numeric column "airline_index".
airline_indexer = StringIndexer(
    inputCol="AIRLINE",
    outputCol="airline_index",
)

# One-hot encode "airline_index" into "airline_fact".
airline_encoder = OneHotEncoder(
    inputCol="airline_index",
    outputCol="airline_fact",
)

In [None]:
# Index the DESTINATION_AIRPORT values into the numeric column "dest_index".
dest_indexer = StringIndexer(
    inputCol="DESTINATION_AIRPORT",
    outputCol="dest_index",
)

# One-hot encode "dest_index" into "dest_fact".
dest_encoder = OneHotEncoder(
    inputCol="dest_index",
    outputCol="dest_fact",
)

In [None]:
# Index the DAY_OF_WEEK values into the numeric column "day_of_week_index".
day_indexer = StringIndexer(
    inputCol="DAY_OF_WEEK",
    outputCol="day_of_week_index",
)

# One-hot encode "day_of_week_index" into "day_of_week_fact".
day_encoder = OneHotEncoder(
    inputCol="day_of_week_index",
    outputCol="day_of_week_fact",
)

In [None]:
# Index the AIR_TIME values into the numeric column "air_time_index".
# NOTE(review): AIR_TIME holds numeric values, yet each distinct value is
# treated as its own category here — confirm this is intended.
air_indexer = StringIndexer(
    inputCol="AIR_TIME",
    outputCol="air_time_index",
)

# One-hot encode "air_time_index" into "air_time_fact".
air_encoder = OneHotEncoder(
    inputCol="air_time_index",
    outputCol="air_time_fact",
)

In [None]:
# Index the DISTANCE values into the numeric column "distance_index".
# NOTE(review): DISTANCE holds numeric values, yet each distinct value is
# treated as its own category here — confirm this is intended.
dis_indexer = StringIndexer(
    inputCol="DISTANCE",
    outputCol="distance_index",
)

# One-hot encode "distance_index" into "distance_fact".
dis_encoder = OneHotEncoder(
    inputCol="distance_index",
    outputCol="distance_fact",
)

In [None]:
# Index the TAIL_NUMBER values into the numeric column "tail_index".
tail_indexer = StringIndexer(
    inputCol="TAIL_NUMBER",
    outputCol="tail_index",
)

# One-hot encode "tail_index" into "tail_fact".
tail_encoder = OneHotEncoder(
    inputCol="tail_index",
    outputCol="tail_fact",
)

In [None]:
from pyspark.ml.feature import VectorAssembler

# Combine the six one-hot encoded columns into a single "features" vector.
# MONTH and ARRIVAL_DELAY are not part of the feature vector.
vec_assembler = VectorAssembler(
    inputCols=[
        "day_of_week_fact",
        "air_time_fact",
        "distance_fact",
        "airline_fact",
        "dest_fact",
        "tail_fact",
    ],
    outputCol="features",
)

In [None]:
# Import Pipeline
from pyspark.ml import Pipeline

# Chain every indexer/encoder pair, then the assembler that consumes their outputs.
flights_pipe = Pipeline(
    stages=[
        dest_indexer, dest_encoder,
        airline_indexer, airline_encoder,
        day_indexer, day_encoder,
        air_indexer, air_encoder,
        dis_indexer, dis_encoder,
        tail_indexer, tail_encoder,
        vec_assembler,
    ]
)

In [None]:
# Fit all pipeline stages on model_data, then apply them to the same frame.
# NOTE(review): the fit happens before the train/test split, so the indexers
# see every row, including those that later end up in the test set.
flights_model = flights_pipe.fit(model_data)
piped_data = flights_model.transform(model_data)

In [None]:
# 70/30 train/test split. The fixed seed (same value as the sampling step)
# keeps the split from changing on every run, so reported scores are comparable.
train_data, test_data = piped_data.randomSplit([.7, .3], seed=42)

## Logistic Regression

In [None]:
# Import LogisticRegression
from pyspark.ml.classification import LogisticRegression

# Logistic regression estimator reading the "features" vector and the "label"
# column (these are also the estimator's default column names).
lr = LogisticRegression(featuresCol="features", labelCol="label")

In [None]:
# Import the evaluation submodule
import pyspark.ml.evaluation as evals

# Accuracy evaluator: a MulticlassClassificationEvaluator comparing the
# "prediction" column against "label" (the evaluator's default column names).
evaluator = evals.MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="label",
    metricName="accuracy",
)

In [None]:
# Import the tuning submodule
import pyspark.ml.tuning as tune
import numpy as np
# Logistic-regression search space: regParam from np.arange(0, .1, .01)
# crossed with elasticNetParam in {0, 1}.
grid = (
    tune.ParamGridBuilder()
    .addGrid(lr.regParam, np.arange(0, .1, .01))
    .addGrid(lr.elasticNetParam, [0, 1])
    .build()
)

In [None]:
# Cross-validated grid search over `grid` for the logistic regression, scored
# with `evaluator`. numFolds is left at its default; the fixed seed makes the
# fold assignment repeatable between runs.
cv = tune.CrossValidator(
    estimator=lr,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    seed=42,
)

In [None]:
# Fit the logistic regression on the training split using the estimator as
# configured above. Despite the name, best_lr is NOT the cross-validated model:
# `cv` is not used in this cell, so this serves as the untuned baseline.
best_lr = lr.fit(train_data)

# Print the fitted model's summary line.
print(best_lr)

LogisticRegressionModel: uid=LogisticRegression_ccee20d5f289, numClasses=2, numFeatures=4043


In [None]:
# Score the test split with the fitted logistic regression.
test_results = best_lr.transform(test_data)

# Evaluate with the current `evaluator` and print the resulting score.
lr_test_score = evaluator.evaluate(test_results)
print(lr_test_score)

0.5252692626346314


## After Hyper-parameter Tuning

In [None]:
# Run the cross-validated grid search on the training split, apply the
# resulting model to the test split, and score it with the current `evaluator`.
cvModel = cv.fit(train_data)
cvPreds = cvModel.transform(test_data)
evaluator.evaluate(cvPreds)

0.6014913007456504

## Decision Tree Classifier

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
# Decision tree with maxDepth=3, trained on the training split and applied
# to the test split.
dt = DecisionTreeClassifier(
    labelCol='label',
    featuresCol='features',
    maxDepth=3,
)
dtModel = dt.fit(train_data)
predictions = dtModel.transform(test_data)


In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# BinaryClassificationEvaluator reports area under ROC by default, so calling
# evaluate() on it does not yield accuracy. Accuracy is computed with its own
# evaluator; `evaluator` stays a BinaryClassificationEvaluator because the AUC
# line below and the cross-validation cell that follows use it.
evaluator = BinaryClassificationEvaluator()
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = accuracy_evaluator.evaluate(predictions)

print(f"Test Accuracy: {accuracy:.2f}")
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Accuracy: 0.58
Test Area Under ROC: 0.5803638662159563


## After Hyper Parameter Tuning

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Decision-tree search space: 3 maxDepth values x 3 minInstancesPerNode values.
paramGrid = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [4, 6, 7])
    .addGrid(dt.minInstancesPerNode, [1, 3, 5])
    .build()
)

In [None]:
# 5-fold cross-validation over paramGrid for the decision tree. The fixed seed
# makes the fold assignment repeatable between runs.
cv = CrossValidator(
    estimator=dt,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)
cvModel = cv.fit(train_data)
cvPreds = cvModel.transform(test_data)

# Score the cross-validated model on the test split with the current `evaluator`.
evaluator.evaluate(cvPreds)

0.595311512353313

## Random Forest Classifier

In [None]:
from pyspark.ml.classification import RandomForestClassifier
# Random forest with default hyperparameters. The fixed seed makes the
# randomised training repeatable between runs.
rf = RandomForestClassifier(featuresCol='features', labelCol='label', seed=42)
rfModel = rf.fit(train_data)
predictions = rfModel.transform(test_data)

In [None]:
# BinaryClassificationEvaluator reports area under ROC by default, so calling
# evaluate() on it does not yield accuracy. Accuracy is computed with its own
# evaluator; `evaluator` stays a BinaryClassificationEvaluator because the AUC
# line below and the cross-validation cell that follows use it.
evaluator = BinaryClassificationEvaluator()
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = accuracy_evaluator.evaluate(predictions)

print(f"Test Accuracy: {accuracy:.2f}")
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Accuracy: 0.61
Test Area Under ROC: 0.6114134168796551


## After Hyper Parameter Tuning

In [None]:
# Random-forest search space: 3 numTrees values x 3 maxDepth values.
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [10, 50, 100])
    .addGrid(rf.maxDepth, [5, 10, 20])
    .build()
)

# 5-fold cross-validation over paramGrid, fitting up to 2 models in parallel.
# The fixed seed makes the fold assignment repeatable between runs.
crossval = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=2,
    seed=42,
)

In [None]:
# Run the random-forest cross-validation on the training split, apply the
# resulting model to the test split, and score it with the current `evaluator`.
cvModel = crossval.fit(train_data)
cvPreds = cvModel.transform(test_data)
evaluator.evaluate(cvPreds)

0.6491535438480779

## Gradient-Boosted Tree Classifier

In [None]:
from pyspark.ml.classification import GBTClassifier
# Gradient-boosted trees with 10 boosting iterations. Feature/label columns are
# spelled out to match the decision-tree and random-forest cells, and the seed
# is fixed for repeatable runs.
gbt = GBTClassifier(featuresCol='features', labelCol='label', maxIter=10, seed=42)
gbtModel = gbt.fit(train_data)
predictions = gbtModel.transform(test_data)

In [None]:
# BinaryClassificationEvaluator reports area under ROC by default, so calling
# evaluate() on it does not yield accuracy. Accuracy is computed with its own
# evaluator; `evaluator` stays a BinaryClassificationEvaluator because the AUC
# line below and the cross-validation cell that follows use it.
evaluator = BinaryClassificationEvaluator()
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = accuracy_evaluator.evaluate(predictions)

print(f"Test Accuracy: {accuracy:.2f}")
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Accuracy: 0.63
Test Area Under ROC: 0.6349108197982799


## After Hyper Parameter Tuning

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# GBT search space: 3 maxDepth x 2 maxBins x 2 maxIter = 12 combinations.
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [2, 4, 6])
    .addGrid(gbt.maxBins, [20, 60])
    .addGrid(gbt.maxIter, [10, 20])
    .build()
)

# 5-fold cross-validation over paramGrid. The fixed seed makes the fold
# assignment repeatable between runs.
cv = CrossValidator(
    estimator=gbt,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)

In [None]:
# Run cross-validation for the gradient-boosted trees.
# This can take several minutes: the grid holds 3 x 2 x 2 = 12 parameter
# combinations, and each one is trained once per fold (5 folds).
cvModel = cv.fit(train_data)
cvPreds = cvModel.transform(test_data)
evaluator.evaluate(cvPreds)

0.647356198642373