
# Flight Delay prediction in Pyspark

In [1]:
#Import modules
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pandas as pd
import numpy as np

import warnings
warnings.filterwarnings('ignore')

In [2]:
# Spark context
sc

In [3]:
#sc master-running locally
sc.master

'local[*]'

#Read the input file from Hadoop Distributed File System

In [4]:
df = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('hdfs://localhost:9000/user1/weather_flightdelay.csv')

                                                                                

In [5]:
#Display the first three records of our dataframe
df.show(n=5,truncate=False, vertical=True)

2023-09-25 10:14:13,584 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


-RECORD 0-----------------------------------------------
 MONTH                         | 1                      
 DAY_OF_WEEK                   | 7                      
 DEP_DEL15                     | 0                      
 DEP_TIME_BLK                  | 0800-0859              
 DISTANCE_GROUP                | 2                      
 SEGMENT_NUMBER                | 1                      
 CONCURRENT_FLIGHTS            | 25                     
 NUMBER_OF_SEATS               | 143                    
 CARRIER_NAME                  | Southwest Airlines Co. 
 AIRPORT_FLIGHTS_MONTH         | 13056                  
 AIRLINE_FLIGHTS_MONTH         | 107363                 
 AIRLINE_AIRPORT_FLIGHTS_MONTH | 5873                   
 AVG_MONTHLY_PASS_AIRPORT      | 1903352                
 AVG_MONTHLY_PASS_AIRLINE      | 13382999               
 FLT_ATTENDANTS_PER_PASS       | 6.178236301460919E-5   
 GROUND_SERV_PER_PASS          | 9.889412309998219E-5   
 PLANE_AGE                     

In [6]:
#Display the number of rows in the dataframe
df.count()

                                                                                

6489062

In [7]:
df.describe().show(truncate=False, vertical=True)



-RECORD 0-----------------------------------------------------
 summary                       | count                        
 MONTH                         | 6489062                      
 DAY_OF_WEEK                   | 6489062                      
 DEP_DEL15                     | 6489062                      
 DEP_TIME_BLK                  | 6489062                      
 DISTANCE_GROUP                | 6489062                      
 SEGMENT_NUMBER                | 6489062                      
 CONCURRENT_FLIGHTS            | 6489062                      
 NUMBER_OF_SEATS               | 6489062                      
 CARRIER_NAME                  | 6489062                      
 AIRPORT_FLIGHTS_MONTH         | 6489062                      
 AIRLINE_FLIGHTS_MONTH         | 6489062                      
 AIRLINE_AIRPORT_FLIGHTS_MONTH | 6489062                      
 AVG_MONTHLY_PASS_AIRPORT      | 6489062                      
 AVG_MONTHLY_PASS_AIRLINE      | 6489062               

                                                                                

### Exploratory data analysis (EDA)

In [8]:
#Display the dataframe schema(in tree format)
df.printSchema()

root
 |-- MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- DEP_DEL15: integer (nullable = true)
 |-- DEP_TIME_BLK: string (nullable = true)
 |-- DISTANCE_GROUP: integer (nullable = true)
 |-- SEGMENT_NUMBER: integer (nullable = true)
 |-- CONCURRENT_FLIGHTS: integer (nullable = true)
 |-- NUMBER_OF_SEATS: integer (nullable = true)
 |-- CARRIER_NAME: string (nullable = true)
 |-- AIRPORT_FLIGHTS_MONTH: integer (nullable = true)
 |-- AIRLINE_FLIGHTS_MONTH: integer (nullable = true)
 |-- AIRLINE_AIRPORT_FLIGHTS_MONTH: integer (nullable = true)
 |-- AVG_MONTHLY_PASS_AIRPORT: integer (nullable = true)
 |-- AVG_MONTHLY_PASS_AIRLINE: integer (nullable = true)
 |-- FLT_ATTENDANTS_PER_PASS: double (nullable = true)
 |-- GROUND_SERV_PER_PASS: double (nullable = true)
 |-- PLANE_AGE: integer (nullable = true)
 |-- DEPARTING_AIRPORT: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- PREVIOUS_AIRPORT: stri

### Have a peek of the first five observations.

In [9]:
import pandas as pd
pd.DataFrame(df.take(5), columns=df.columns).transpose()

Unnamed: 0,0,1,2,3,4
MONTH,1,1,1,1,1
DAY_OF_WEEK,7,7,7,7,7
DEP_DEL15,0,0,0,0,0
DEP_TIME_BLK,0800-0859,0700-0759,0600-0659,0600-0659,0001-0559
DISTANCE_GROUP,2,7,7,9,7
SEGMENT_NUMBER,1,1,1,1,1
CONCURRENT_FLIGHTS,25,29,27,27,10
NUMBER_OF_SEATS,143,191,199,180,182
CARRIER_NAME,Southwest Airlines Co.,Delta Air Lines Inc.,Delta Air Lines Inc.,Delta Air Lines Inc.,Spirit Air Lines
AIRPORT_FLIGHTS_MONTH,13056,13056,13056,13056,13056


Drop 'LATITUDE' and 'LONGITUDE' columns

In [10]:
drop_columns = ['LATITUDE', 'LONGITUDE']
df = df.drop(*drop_columns)

### Checking if the dataset balanced

In [11]:
df.groupby('DEP_DEL15').count().toPandas()

                                                                                

Unnamed: 0,DEP_DEL15,count
0,1,1227368
1,0,5261694


#### Null Values

In [12]:
col_null_cnt_df =  df.select([
    count(when(col(c).isNull(),c)).alias(c) for c in df.columns]) 

In [13]:
col_null_cnt_df.show(truncate=False, vertical=True)



-RECORD 0----------------------------
 MONTH                         | 0   
 DAY_OF_WEEK                   | 0   
 DEP_DEL15                     | 0   
 DEP_TIME_BLK                  | 0   
 DISTANCE_GROUP                | 0   
 SEGMENT_NUMBER                | 0   
 CONCURRENT_FLIGHTS            | 0   
 NUMBER_OF_SEATS               | 0   
 CARRIER_NAME                  | 0   
 AIRPORT_FLIGHTS_MONTH         | 0   
 AIRLINE_FLIGHTS_MONTH         | 0   
 AIRLINE_AIRPORT_FLIGHTS_MONTH | 0   
 AVG_MONTHLY_PASS_AIRPORT      | 0   
 AVG_MONTHLY_PASS_AIRLINE      | 0   
 FLT_ATTENDANTS_PER_PASS       | 0   
 GROUND_SERV_PER_PASS          | 0   
 PLANE_AGE                     | 0   
 DEPARTING_AIRPORT             | 0   
 PREVIOUS_AIRPORT              | 0   
 PRCP                          | 0   
 SNOW                          | 0   
 SNWD                          | 0   
 TMAX                          | 0   
 AWND                          | 0   



                                                                                

#### Indexing Dataframe 

### Preprocess the data:

In [14]:
# Convert string features to numeric using StringIndexer
from pyspark.ml.feature import StringIndexer

# Example: Assuming 'feature1' is a string column
indexer = StringIndexer(inputCol="DEP_TIME_BLK", outputCol="DEP_TIME_BLK_numeric")
df = indexer.fit(df).transform(df)

                                                                                

In [15]:
indexer = StringIndexer(inputCol="PREVIOUS_AIRPORT",
                       outputCol="PREVIOUS_AIRPORT_numeric")
df = indexer.fit(df).transform(df)

                                                                                

In [16]:
indexer = StringIndexer(inputCol="CARRIER_NAME", outputCol="CARRIER_NAME_numeric")
df = indexer.fit(df).transform(df)

                                                                                

In [17]:
indexer = StringIndexer(inputCol="DEPARTING_AIRPORT",
                       outputCol="DEPARTING_AIRPORT_numeric")
df = indexer.fit(df).transform(df)

                                                                                

In [18]:
df.show(n=5,truncate=False, vertical=True)

-RECORD 0-----------------------------------------------
 MONTH                         | 1                      
 DAY_OF_WEEK                   | 7                      
 DEP_DEL15                     | 0                      
 DEP_TIME_BLK                  | 0800-0859              
 DISTANCE_GROUP                | 2                      
 SEGMENT_NUMBER                | 1                      
 CONCURRENT_FLIGHTS            | 25                     
 NUMBER_OF_SEATS               | 143                    
 CARRIER_NAME                  | Southwest Airlines Co. 
 AIRPORT_FLIGHTS_MONTH         | 13056                  
 AIRLINE_FLIGHTS_MONTH         | 107363                 
 AIRLINE_AIRPORT_FLIGHTS_MONTH | 5873                   
 AVG_MONTHLY_PASS_AIRPORT      | 1903352                
 AVG_MONTHLY_PASS_AIRLINE      | 13382999               
 FLT_ATTENDANTS_PER_PASS       | 6.178236301460919E-5   
 GROUND_SERV_PER_PASS          | 9.889412309998219E-5   
 PLANE_AGE                     

### Create feature vector

To enhance the speed and effectiveness of inference, we aim to transform our standard dataframe into a feature vector. Spark provides the Vector Assembler class for this purpose, allowing us to consolidate selected columns as input features into the feature vector series.

For optimal model training within Spark, we intend to include these chosen columns as components of our feature vector. The Vector Assembler will compile the data from these columns into a unified series of Vectors. This consolidated feature vector will be subsequently utilized for training our model effectively.

In [19]:
# Assemble all the features with VectorAssembler
required_features = ['MONTH', 'DAY_OF_WEEK', 
                     'DISTANCE_GROUP','SEGMENT_NUMBER','CONCURRENT_FLIGHTS', 
                     'NUMBER_OF_SEATS','CARRIER_NAME_numeric','AIRPORT_FLIGHTS_MONTH',
                     'AIRLINE_FLIGHTS_MONTH','AIRLINE_AIRPORT_FLIGHTS_MONTH',
                     'AVG_MONTHLY_PASS_AIRPORT', 'AVG_MONTHLY_PASS_AIRLINE', 
                     'FLT_ATTENDANTS_PER_PASS', 'GROUND_SERV_PER_PASS', 
                     'PLANE_AGE', 'DEP_TIME_BLK_numeric','DEPARTING_AIRPORT_numeric','PREVIOUS_AIRPORT_numeric', 'PRCP', 
                     'SNOW', 'SNWD', 'TMAX','AWND']

#from pyspark.ml.feature import VectorAssembler
#assembler = VectorAssembler(inputCols=required_features, outputCol='features')
#transformed_data = assembler.transform(df) 

### Scale the dataset

In [22]:
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
#df = VectorAssembler(inputCols= required_features, outputCol='feature', handleInvalid="keep").transform(df)
df = MinMaxScaler(min=0.0, max=1.0,inputCol='feature', outputCol='feature_scaled').fit(df).transform(df)

                                                                                

### Split the data into train & test

In [26]:
# Split the data
training_data, test_data = df.select('DEP_DEL15','feature_scaled').randomSplit([0.8,0.2], seed =2020)
print("Training Dataset Count: " + str(training_data.count()))
print("Test Dataset Count: " + str(test_data.count()))

                                                                                

Training Dataset Count: 5191634




Test Dataset Count: 1297428


                                                                                

In [27]:
training_data.show(1,truncate=False, vertical=True)

[Stage 41:>                                                         (0 + 1) / 1]

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 DEP_DEL15      | 0                                                                                                                                                                                                                                                      
 feature_scaled | (23,[0,1,4,5,6,7,8,9,10,11,13,14,21,22],[0.09090909090909091,0.5,0.7129629629629629,0.020477815699658702,0.625,0.7878849982433541,0.10781481283327092,0.12204616230078769,1.0,0.05725000106513143,0.3893521665283559,0.46875,0.72,0.3179396092362345]) 
only showing top 1 row



                                                                                

## Machine learning Model Constructing

In [29]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes 

### Decision Tree

Decision trees are a popular choice due to their interpretability, ability to handle categorical data, versatility in multiclass classification, independence from feature scaling, and effectiveness in capturing non-linear relationships and feature interdependencies.

In [32]:
# Create a DecisionTreeClassifier instance
from pyspark.ml.classification import RandomForestClassifier

dt = DecisionTreeClassifier(labelCol='DEP_DEL15', 
                            featuresCol='feature_scaled', 
                             maxBins=400)

# Fit the model to the training data
model = dt.fit(training_data);
# Make predictions
dt_predictions = model.transform(test_data) 

2023-09-25 10:46:19,158 WARN memory.MemoryStore: Not enough space to cache rdd_143_1 in memory! (computed 28.3 MiB so far)
2023-09-25 10:46:19,177 WARN storage.BlockManager: Persisting block rdd_143_1 to disk instead.
2023-09-25 10:46:19,497 WARN memory.MemoryStore: Not enough space to cache rdd_143_0 in memory! (computed 28.3 MiB so far)
2023-09-25 10:46:19,512 WARN storage.BlockManager: Persisting block rdd_143_0 to disk instead.
2023-09-25 10:46:47,130 WARN memory.MemoryStore: Not enough space to cache rdd_143_3 in memory! (computed 28.3 MiB so far)
2023-09-25 10:46:47,132 WARN storage.BlockManager: Persisting block rdd_143_3 to disk instead.
2023-09-25 10:46:47,297 WARN memory.MemoryStore: Not enough space to cache rdd_143_2 in memory! (computed 28.3 MiB so far)
2023-09-25 10:46:47,298 WARN storage.BlockManager: Persisting block rdd_143_2 to disk instead.
2023-09-25 10:47:08,797 WARN memory.MemoryStore: Not enough space to cache rdd_143_4 in memory! (computed 28.3 MiB so far)
2023-

##### Decision Tree Evaluation

In [33]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

multi_evaluator = MulticlassClassificationEvaluator(labelCol = 'DEP_DEL15', metricName = 'accuracy')
print('Decision Tree Accuracy:', multi_evaluator.evaluate(dt_predictions)) 



Decision Tree Accuracy: 0.8106669503047568


                                                                                

<br>

### LogisticRegression

Logistic regression is the ideal regression analysis when the outcome or dependent variable is binary, meaning it has two possible categories. Similar to other regression techniques, logistic regression is employed for prediction. It allows us to model and elucidate the connection between a single binary dependent variable and one or more independent variables, which can be of nominal, ordinal, interval, or ratio levels. Logistic regression is the go-to method when dealing with a categorical target variable.

In [34]:
lr = LogisticRegression(labelCol="DEP_DEL15", featuresCol="feature_scaled")

# Fit the model to the training data
model = lr.fit(training_data)
# Make predictions
lr_predictions = model.transform(test_data) 
print ("Training is done!")

2023-09-25 10:53:20,324 WARN memory.MemoryStore: Not enough space to cache rdd_186_1 in memory! (computed 33.0 MiB so far)
2023-09-25 10:53:20,326 WARN storage.BlockManager: Persisting block rdd_186_1 to disk instead.
2023-09-25 10:53:20,412 WARN memory.MemoryStore: Not enough space to cache rdd_186_0 in memory! (computed 33.0 MiB so far)
2023-09-25 10:53:20,416 WARN storage.BlockManager: Persisting block rdd_186_0 to disk instead.
2023-09-25 10:53:50,160 WARN memory.MemoryStore: Not enough space to cache rdd_186_2 in memory! (computed 65.0 MiB so far)
2023-09-25 10:53:50,184 WARN storage.BlockManager: Persisting block rdd_186_2 to disk instead.
2023-09-25 10:53:53,146 WARN memory.MemoryStore: Not enough space to cache rdd_186_2 in memory! (computed 65.0 MiB so far)
2023-09-25 10:54:23,040 WARN memory.MemoryStore: Not enough space to cache rdd_186_5 in memory! (computed 65.0 MiB so far)
2023-09-25 10:54:23,048 WARN storage.BlockManager: Persisting block rdd_186_5 to disk instead.
2023-

2023-09-25 10:56:38,635 WARN memory.MemoryStore: Not enough space to cache rdd_186_5 in memory! (computed 33.0 MiB so far)
2023-09-25 10:56:39,645 WARN memory.MemoryStore: Not enough space to cache rdd_186_8 in memory! (computed 33.0 MiB so far)
2023-09-25 10:56:40,432 WARN memory.MemoryStore: Not enough space to cache rdd_186_0 in memory! (computed 33.0 MiB so far)
2023-09-25 10:56:40,443 WARN memory.MemoryStore: Not enough space to cache rdd_186_1 in memory! (computed 17.0 MiB so far)
2023-09-25 10:56:41,318 WARN memory.MemoryStore: Not enough space to cache rdd_186_2 in memory! (computed 33.0 MiB so far)
2023-09-25 10:56:41,363 WARN memory.MemoryStore: Not enough space to cache rdd_186_3 in memory! (computed 17.0 MiB so far)
2023-09-25 10:56:42,351 WARN memory.MemoryStore: Not enough space to cache rdd_186_5 in memory! (computed 33.0 MiB so far)
2023-09-25 10:56:43,402 WARN memory.MemoryStore: Not enough space to cache rdd_186_8 in memory! (computed 33.0 MiB so far)
2023-09-25 10:56

2023-09-25 10:58:00,782 WARN memory.MemoryStore: Not enough space to cache rdd_186_5 in memory! (computed 33.0 MiB so far)
2023-09-25 10:58:01,399 WARN memory.MemoryStore: Not enough space to cache rdd_186_8 in memory! (computed 33.0 MiB so far)
                                                                                

Training is done!


##### Logistic Regression Evaluation

In [36]:
multi_evaluator = MulticlassClassificationEvaluator(labelCol = 'DEP_DEL15', metricName = 'accuracy')
print('Logistic Regression Accuracy:', multi_evaluator.evaluate(lr_predictions))



Logistic Regression Accuracy: 0.810461929293957


                                                                                

### Naive Bayes classifier

The Naive Bayes classifier is a probabilistic machine learning algorithm based on Bayes' theorem, which calculates the probability of a specific event occurring given prior knowledge of related events. It assumes that the features used to describe an instance are conditionally independent, simplifying the computation of probabilities. Despite this simplification (the "naive" assumption), the Naive Bayes classifier often performs remarkably well in various classification tasks. It's particularly effective for text categorization, spam filtering, and other applications where the features represent word occurrences or frequencies. The algorithm calculates the probability of each class for a given instance based on the observed features, and assigns the instance to the class with the highest probability.

In [37]:
nb = NaiveBayes(labelCol="DEP_DEL15", featuresCol="feature_scaled")

# Fit the model to the training data
model = nb.fit(training_data)
# Make predictions
nb_predictions = model.transform(test_data) 

                                                                                

##### Naive Bayes Evaluation

In [38]:
multi_evaluator = MulticlassClassificationEvaluator(labelCol = 'DEP_DEL15', metricName = 'accuracy')
print('Naive Bayes Accuracy:', multi_evaluator.evaluate(nb_predictions))



Naive Bayes Accuracy: 0.8106045190947012


                                                                                

<br>

### Random Forest Classifier

The Random Forest Classifier is an ensemble machine learning algorithm that constructs multiple decision trees by using bootstrapped samples and random feature subsets from the training data. Each tree independently predicts the class for a given instance, and the final classification is determined by a majority vote from all the trees. This approach handles high-dimensional data efficiently.

In [42]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol='DEP_DEL15', 
                            featuresCol='feature_scaled',
                            )
model = rf.fit(training_data)
rf_predictions = model.transform(test_data)

2023-09-25 11:38:20,900 WARN memory.MemoryStore: Not enough space to cache rdd_513_1 in memory! (computed 19.2 MiB so far)
2023-09-25 11:38:20,902 WARN storage.BlockManager: Persisting block rdd_513_1 to disk instead.
2023-09-25 11:38:21,695 WARN memory.MemoryStore: Not enough space to cache rdd_513_0 in memory! (computed 64.9 MiB so far)
2023-09-25 11:38:21,696 WARN storage.BlockManager: Persisting block rdd_513_0 to disk instead.
2023-09-25 11:38:53,175 WARN memory.MemoryStore: Not enough space to cache rdd_513_2 in memory! (computed 98.5 MiB so far)
2023-09-25 11:38:53,176 WARN storage.BlockManager: Persisting block rdd_513_2 to disk instead.
2023-09-25 11:38:53,624 WARN memory.MemoryStore: Not enough space to cache rdd_513_3 in memory! (computed 98.5 MiB so far)
2023-09-25 11:38:53,625 WARN storage.BlockManager: Persisting block rdd_513_3 to disk instead.
2023-09-25 11:39:29,389 WARN memory.MemoryStore: Not enough space to cache rdd_513_5 in memory! (computed 98.5 MiB so far)
2023-

In [43]:
multi_evaluator = MulticlassClassificationEvaluator(labelCol = 'DEP_DEL15', metricName = 'accuracy')
print('Random Forest classifier Accuracy:', multi_evaluator.evaluate(rf_predictions))



Random Forest classifier Accuracy: 0.8106045190947012


                                                                                

<br>

### Gradient-boosted Tree classifier Model

he Gradient-boosted Tree classifier is an ensemble learning method that combines the strength of multiple decision trees to create a powerful predictive model. It builds trees sequentially, with each subsequent tree focusing on correcting the errors made by the previous ones. During training, the algorithm assigns higher weight to misclassified instances, emphasizing areas where the model needs improvement. By optimizing a loss function through gradient descent, the algorithm continually refines its predictions, ultimately producing a robust classifier.

In [44]:
from pyspark.ml.classification import GBTClassifier
gb = GBTClassifier(labelCol = 'DEP_DEL15', featuresCol = 'feature_scaled')
gbModel = gb.fit(training_data)
gb_predictions = gbModel.transform(test_data)

2023-09-25 12:23:26,647 WARN memory.MemoryStore: Not enough space to cache rdd_574_1 in memory! (computed 26.6 MiB so far)
2023-09-25 12:23:26,649 WARN storage.BlockManager: Persisting block rdd_574_1 to disk instead.
2023-09-25 12:23:28,035 WARN memory.MemoryStore: Not enough space to cache rdd_574_0 in memory! (computed 63.2 MiB so far)
2023-09-25 12:23:28,035 WARN storage.BlockManager: Persisting block rdd_574_0 to disk instead.
2023-09-25 12:23:56,060 WARN memory.MemoryStore: Not enough space to cache rdd_574_2 in memory! (computed 63.2 MiB so far)
2023-09-25 12:23:56,061 WARN storage.BlockManager: Persisting block rdd_574_2 to disk instead.
2023-09-25 12:24:19,447 WARN memory.MemoryStore: Not enough space to cache rdd_574_4 in memory! (computed 41.6 MiB so far)
2023-09-25 12:24:19,452 WARN storage.BlockManager: Persisting block rdd_574_4 to disk instead.
2023-09-25 12:24:23,936 WARN memory.MemoryStore: Not enough space to cache rdd_574_5 in memory! (computed 26.6 MiB so far)
2023-

2023-09-25 12:31:56,226 WARN memory.MemoryStore: Not enough space to cache rdd_574_8 in memory! (computed 41.6 MiB so far)
2023-09-25 12:31:59,922 WARN memory.MemoryStore: Not enough space to cache rdd_574_0 in memory! (computed 26.6 MiB so far)
2023-09-25 12:31:59,931 WARN memory.MemoryStore: Not enough space to cache rdd_574_1 in memory! (computed 26.6 MiB so far)
2023-09-25 12:32:03,284 WARN memory.MemoryStore: Not enough space to cache rdd_574_2 in memory! (computed 63.2 MiB so far)
2023-09-25 12:32:03,299 WARN memory.MemoryStore: Not enough space to cache rdd_574_3 in memory! (computed 63.2 MiB so far)
2023-09-25 12:32:10,004 WARN memory.MemoryStore: Not enough space to cache rdd_574_8 in memory! (computed 63.2 MiB so far)
2023-09-25 12:32:10,012 WARN memory.MemoryStore: Not enough space to cache rdd_574_9 in memory! (computed 63.2 MiB so far)
2023-09-25 12:32:16,394 WARN memory.MemoryStore: Not enough space to cache rdd_574_2 in memory! (computed 26.6 MiB so far)
2023-09-25 12:32

2023-09-25 12:38:48,237 WARN memory.MemoryStore: Not enough space to cache rdd_574_7 in memory! (computed 41.6 MiB so far)
2023-09-25 12:38:48,653 WARN memory.MemoryStore: Not enough space to cache rdd_574_7 in memory! (computed 11.7 MiB so far)
2023-09-25 12:38:48,686 WARN memory.MemoryStore: Not enough space to cache rdd_574_7 in memory! (computed 1027.9 KiB so far)
2023-09-25 12:39:03,786 WARN memory.MemoryStore: Not enough space to cache rdd_574_3 in memory! (computed 63.2 MiB so far)
2023-09-25 12:39:04,050 WARN memory.MemoryStore: Not enough space to cache rdd_574_2 in memory! (computed 63.2 MiB so far)
2023-09-25 12:39:09,941 WARN memory.MemoryStore: Not enough space to cache rdd_574_7 in memory! (computed 26.6 MiB so far)
2023-09-25 12:39:12,094 WARN memory.MemoryStore: Not enough space to cache rdd_574_8 in memory! (computed 41.6 MiB so far)
2023-09-25 12:39:12,441 WARN memory.MemoryStore: Not enough space to cache rdd_574_9 in memory! (computed 17.6 MiB so far)
2023-09-25 12:

2023-09-25 12:46:27,163 WARN memory.MemoryStore: Not enough space to cache rdd_574_0 in memory! (computed 41.6 MiB so far)
2023-09-25 12:46:43,552 WARN memory.MemoryStore: Not enough space to cache rdd_574_6 in memory! (computed 63.2 MiB so far)
2023-09-25 12:46:44,065 WARN memory.MemoryStore: Not enough space to cache rdd_574_6 in memory! (computed 7.8 MiB so far)
2023-09-25 12:46:44,066 WARN memory.MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_574_6 in memory.
2023-09-25 12:46:44,066 WARN memory.MemoryStore: Not enough space to cache rdd_574_6 in memory! (computed 384.0 B so far)
2023-09-25 12:46:44,498 WARN memory.MemoryStore: Not enough space to cache rdd_574_7 in memory! (computed 63.2 MiB so far)
2023-09-25 12:46:45,093 WARN memory.MemoryStore: Not enough space to cache rdd_574_7 in memory! (computed 41.6 MiB so far)
2023-09-25 12:46:46,134 WARN memory.MemoryStore: Not enough space to cache rdd_574_7 in memory! (computed 63.2 MiB so

2023-09-25 12:50:07,909 WARN memory.MemoryStore: Not enough space to cache rdd_574_9 in memory! (computed 41.6 MiB so far)
2023-09-25 12:50:14,749 WARN memory.MemoryStore: Not enough space to cache rdd_574_4 in memory! (computed 41.6 MiB so far)
2023-09-25 12:50:17,228 WARN memory.MemoryStore: Not enough space to cache rdd_574_6 in memory! (computed 41.6 MiB so far)
2023-09-25 12:50:19,294 WARN memory.MemoryStore: Not enough space to cache rdd_574_8 in memory! (computed 41.6 MiB so far)
2023-09-25 12:50:23,621 WARN memory.MemoryStore: Not enough space to cache rdd_574_0 in memory! (computed 63.2 MiB so far)
2023-09-25 12:50:23,751 WARN memory.MemoryStore: Not enough space to cache rdd_574_1 in memory! (computed 63.2 MiB so far)
2023-09-25 12:50:29,950 WARN memory.MemoryStore: Not enough space to cache rdd_574_6 in memory! (computed 41.6 MiB so far)
2023-09-25 12:50:32,536 WARN memory.MemoryStore: Not enough space to cache rdd_574_8 in memory! (computed 41.6 MiB so far)
2023-09-25 12:50

2023-09-25 12:53:31,665 WARN memory.MemoryStore: Not enough space to cache rdd_574_3 in memory! (computed 63.2 MiB so far)
2023-09-25 12:53:31,766 WARN memory.MemoryStore: Not enough space to cache rdd_574_3 in memory! (computed 7.8 MiB so far)
2023-09-25 12:53:31,766 WARN memory.MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_574_3 in memory.
2023-09-25 12:53:31,766 WARN memory.MemoryStore: Not enough space to cache rdd_574_3 in memory! (computed 384.0 B so far)
2023-09-25 12:53:40,881 WARN memory.MemoryStore: Not enough space to cache rdd_574_6 in memory! (computed 41.6 MiB so far)
2023-09-25 12:53:55,335 WARN memory.MemoryStore: Not enough space to cache rdd_574_0 in memory! (computed 63.2 MiB so far)
2023-09-25 12:53:59,094 WARN memory.MemoryStore: Not enough space to cache rdd_574_2 in memory! (computed 63.2 MiB so far)
2023-09-25 12:54:01,514 WARN memory.MemoryStore: Not enough space to cache rdd_574_4 in memory! (computed 63.2 MiB so

In [45]:
multi_evaluator = MulticlassClassificationEvaluator(labelCol = 'DEP_DEL15', metricName = 'accuracy')
print('Gradient-boosted Trees Accuracy:', multi_evaluator.evaluate(gb_predictions))



Gradient-boosted Trees Accuracy: 0.8133322234451545


                                                                                

### Artificial Nueral Network 

ANN model created with four layers. Input layer with 23 input features, followed by two hidden layers with 64 and  32. Output layer with 2 neurons.

In [47]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
mlp = MultilayerPerceptronClassifier(featuresCol='feature_scaled', labelCol='DEP_DEL15',
                                    maxIter=100, seed=42, layers=(23, 64, 32, 2), blockSize=32, stepSize=0.03, solver='l-bfgs')

#blockSize=32: The size of the blocks.
#stepSize=0.03: The learning rate for the optimization algorithm.
#solver='l-bfgs': The optimization algorithm to use. In this case, it's set to 'l-bfgs',
#which stands for Limited-memory Broyden-Fletcher-Goldfarb-Shanno.

In [48]:
# Train the model
model = mlp.fit(training_data) 

2023-09-25 13:25:52,693 WARN memory.MemoryStore: Not enough space to cache rdd_1036_0 in memory! (computed 47.2 MiB so far)
2023-09-25 13:25:52,710 WARN storage.BlockManager: Persisting block rdd_1036_0 to disk instead.
2023-09-25 13:25:56,440 WARN memory.MemoryStore: Not enough space to cache rdd_1036_1 in memory! (computed 47.2 MiB so far)
2023-09-25 13:25:56,440 WARN storage.BlockManager: Persisting block rdd_1036_1 to disk instead.
2023-09-25 13:26:32,877 WARN memory.MemoryStore: Not enough space to cache rdd_1036_2 in memory! (computed 20.9 MiB so far)
2023-09-25 13:26:32,878 WARN storage.BlockManager: Persisting block rdd_1036_2 to disk instead.
2023-09-25 13:26:33,340 WARN memory.MemoryStore: Not enough space to cache rdd_1036_3 in memory! (computed 47.2 MiB so far)
2023-09-25 13:26:33,340 WARN storage.BlockManager: Persisting block rdd_1036_3 to disk instead.
2023-09-25 13:27:04,700 WARN memory.MemoryStore: Not enough space to cache rdd_1036_5 in memory! (computed 13.6 MiB so f

2023-09-25 13:30:45,134 WARN memory.MemoryStore: Not enough space to cache rdd_1036_4 in memory! (computed 20.9 MiB so far)
2023-09-25 13:30:47,982 WARN memory.MemoryStore: Not enough space to cache rdd_1036_6 in memory! (computed 20.9 MiB so far)
2023-09-25 13:30:55,280 WARN memory.MemoryStore: Not enough space to cache rdd_1036_1 in memory! (computed 13.6 MiB so far)
2023-09-25 13:30:55,313 WARN memory.MemoryStore: Not enough space to cache rdd_1036_0 in memory! (computed 8.9 MiB so far)
2023-09-25 13:30:58,848 WARN memory.MemoryStore: Not enough space to cache rdd_1036_2 in memory! (computed 20.9 MiB so far)
2023-09-25 13:30:58,905 WARN memory.MemoryStore: Not enough space to cache rdd_1036_3 in memory! (computed 3.9 MiB so far)
2023-09-25 13:31:02,207 WARN memory.MemoryStore: Not enough space to cache rdd_1036_4 in memory! (computed 20.9 MiB so far)
2023-09-25 13:31:05,352 WARN memory.MemoryStore: Not enough space to cache rdd_1036_6 in memory! (computed 20.9 MiB so far)
2023-09-25

2023-09-25 13:34:02,059 WARN memory.MemoryStore: Not enough space to cache rdd_1036_6 in memory! (computed 20.9 MiB so far)
2023-09-25 13:34:08,961 WARN memory.MemoryStore: Not enough space to cache rdd_1036_1 in memory! (computed 13.6 MiB so far)
2023-09-25 13:34:08,993 WARN memory.MemoryStore: Not enough space to cache rdd_1036_0 in memory! (computed 8.9 MiB so far)
2023-09-25 13:34:12,516 WARN memory.MemoryStore: Not enough space to cache rdd_1036_2 in memory! (computed 13.6 MiB so far)
2023-09-25 13:34:12,518 WARN memory.MemoryStore: Not enough space to cache rdd_1036_3 in memory! (computed 8.9 MiB so far)
2023-09-25 13:34:15,977 WARN memory.MemoryStore: Not enough space to cache rdd_1036_4 in memory! (computed 20.9 MiB so far)
2023-09-25 13:34:19,230 WARN memory.MemoryStore: Not enough space to cache rdd_1036_6 in memory! (computed 20.9 MiB so far)
2023-09-25 13:34:26,582 WARN memory.MemoryStore: Not enough space to cache rdd_1036_1 in memory! (computed 13.6 MiB so far)
2023-09-25

2023-09-25 13:41:01,060 WARN memory.MemoryStore: Not enough space to cache rdd_1036_6 in memory! (computed 20.9 MiB so far)
2023-09-25 13:41:12,894 WARN memory.MemoryStore: Not enough space to cache rdd_1036_0 in memory! (computed 13.6 MiB so far)
2023-09-25 13:41:12,917 WARN memory.MemoryStore: Not enough space to cache rdd_1036_1 in memory! (computed 8.9 MiB so far)
2023-09-25 13:41:16,687 WARN memory.MemoryStore: Not enough space to cache rdd_1036_2 in memory! (computed 20.9 MiB so far)
2023-09-25 13:41:16,828 WARN memory.MemoryStore: Not enough space to cache rdd_1036_3 in memory! (computed 3.9 MiB so far)
2023-09-25 13:41:19,960 WARN memory.MemoryStore: Not enough space to cache rdd_1036_4 in memory! (computed 20.9 MiB so far)
2023-09-25 13:41:24,470 WARN memory.MemoryStore: Not enough space to cache rdd_1036_6 in memory! (computed 20.9 MiB so far)
2023-09-25 13:41:37,342 WARN memory.MemoryStore: Not enough space to cache rdd_1036_0 in memory! (computed 13.6 MiB so far)
2023-09-25

2023-09-25 13:49:21,361 WARN memory.MemoryStore: Not enough space to cache rdd_1036_6 in memory! (computed 20.9 MiB so far)
2023-09-25 13:49:28,630 WARN memory.MemoryStore: Not enough space to cache rdd_1036_1 in memory! (computed 3.9 MiB so far)
2023-09-25 13:49:28,660 WARN memory.MemoryStore: Not enough space to cache rdd_1036_0 in memory! (computed 20.9 MiB so far)
2023-09-25 13:49:32,611 WARN memory.MemoryStore: Not enough space to cache rdd_1036_2 in memory! (computed 20.9 MiB so far)
2023-09-25 13:49:33,113 WARN memory.MemoryStore: Not enough space to cache rdd_1036_3 in memory! (computed 3.9 MiB so far)
2023-09-25 13:49:36,396 WARN memory.MemoryStore: Not enough space to cache rdd_1036_4 in memory! (computed 20.9 MiB so far)
2023-09-25 13:49:39,888 WARN memory.MemoryStore: Not enough space to cache rdd_1036_6 in memory! (computed 20.9 MiB so far)
2023-09-25 13:49:46,867 WARN memory.MemoryStore: Not enough space to cache rdd_1036_0 in memory! (computed 13.6 MiB so far)
2023-09-25

2023-09-25 13:56:42,671 WARN memory.MemoryStore: Not enough space to cache rdd_1036_6 in memory! (computed 20.9 MiB so far)
2023-09-25 13:56:53,308 WARN memory.MemoryStore: Not enough space to cache rdd_1036_1 in memory! (computed 13.6 MiB so far)
2023-09-25 13:56:53,363 WARN memory.MemoryStore: Not enough space to cache rdd_1036_0 in memory! (computed 8.9 MiB so far)
2023-09-25 13:57:00,135 WARN memory.MemoryStore: Not enough space to cache rdd_1036_3 in memory! (computed 13.6 MiB so far)
2023-09-25 13:57:00,160 WARN memory.MemoryStore: Not enough space to cache rdd_1036_2 in memory! (computed 8.9 MiB so far)
2023-09-25 13:57:04,865 WARN memory.MemoryStore: Not enough space to cache rdd_1036_4 in memory! (computed 20.9 MiB so far)
2023-09-25 13:57:08,128 WARN memory.MemoryStore: Not enough space to cache rdd_1036_6 in memory! (computed 20.9 MiB so far)
2023-09-25 13:57:15,787 WARN memory.MemoryStore: Not enough space to cache rdd_1036_0 in memory! (computed 8.9 MiB so far)
2023-09-25 

2023-09-25 14:03:35,371 WARN memory.MemoryStore: Not enough space to cache rdd_1036_6 in memory! (computed 20.9 MiB so far)
2023-09-25 14:03:42,259 WARN memory.MemoryStore: Not enough space to cache rdd_1036_1 in memory! (computed 13.6 MiB so far)
2023-09-25 14:03:42,290 WARN memory.MemoryStore: Not enough space to cache rdd_1036_0 in memory! (computed 8.9 MiB so far)
2023-09-25 14:03:45,821 WARN memory.MemoryStore: Not enough space to cache rdd_1036_3 in memory! (computed 3.9 MiB so far)
2023-09-25 14:03:45,828 WARN memory.MemoryStore: Not enough space to cache rdd_1036_2 in memory! (computed 20.9 MiB so far)
2023-09-25 14:03:48,813 WARN memory.MemoryStore: Not enough space to cache rdd_1036_4 in memory! (computed 20.9 MiB so far)
2023-09-25 14:03:51,828 WARN memory.MemoryStore: Not enough space to cache rdd_1036_6 in memory! (computed 20.9 MiB so far)
2023-09-25 14:03:58,730 WARN memory.MemoryStore: Not enough space to cache rdd_1036_1 in memory! (computed 3.9 MiB so far)
2023-09-25 

In [49]:
# Evaluate the model
pred = model.transform(test_data)

### Evaluation

In [50]:
multi_evaluator = MulticlassClassificationEvaluator(labelCol = 'DEP_DEL15', metricName = 'accuracy')
print('ANN without dropout layers:', multi_evaluator.evaluate(pred))



ANN without dropout layers: 0.8103879367487059


                                                                                

In [51]:
pred.show(10)

[Stage 634:>                                                        (0 + 1) / 1]

+---------+--------------------+--------------------+--------------------+----------+
|DEP_DEL15|      feature_scaled|       rawPrediction|         probability|prediction|
+---------+--------------------+--------------------+--------------------+----------+
|        0|(23,[0,4,5,6,7,8,...|[1.07008309571700...|[0.90762414421428...|       0.0|
|        0|(23,[0,4,5,6,7,8,...|[0.90881288811543...|[0.87882912772997...|       0.0|
|        0|(23,[0,5,6,7,8,9,...|[1.32631239707130...|[0.96166660361926...|       0.0|
|        0|(23,[1,2,4,5,6,7,...|[1.12214744793363...|[0.92603082480535...|       0.0|
|        0|(23,[1,2,4,5,6,7,...|[1.07465465643952...|[0.91772455021429...|       0.0|
|        0|(23,[1,2,4,5,6,7,...|[1.02423400045176...|[0.91077845147398...|       0.0|
|        0|(23,[1,2,4,5,6,7,...|[1.13446109668602...|[0.92691711528350...|       0.0|
|        0|(23,[1,2,4,5,6,7,...|[1.04889518164688...|[0.91354564126449...|       0.0|
|        0|(23,[1,2,4,5,6,7,...|[0.88698499985177...|[

                                                                                

In [2]:
# Stop the SparkSession
spark.stop()