In [37]:
###IMPORTING THE LIBRARIES
#Session
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

##Functions for creating the structure
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

##Functions for Data engineering
# NOTE: `sum` and `round` imported here shadow the Python builtins of the same name.
from pyspark.sql.functions import col,sum,avg,round,monotonically_increasing_id,lit

##Functions for the model
from pyspark.ml.feature import StringIndexer,OneHotEncoder,VectorAssembler,Bucketizer
from pyspark.ml import Pipeline

###Functions for NLP
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF

##Importing the Machine learning models
from pyspark.ml.regression import GBTRegressor

##Importing the evaluators
from pyspark.ml.evaluation import RegressionEvaluator

##Grid and cross validator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder,CrossValidator

# CREATING THE SESSION
# getOrCreate() returns the already-active SparkSession if there is one,
# otherwise it starts a new one with the application name "sesion".
sesion = (
    SparkSession.builder
    .appName("sesion")
    .getOrCreate()
)

In [15]:
# Load the flights dataset: comma-separated with a header row, column types
# inferred from the data, and the literal string 'NA' read as a null.
flights = sesion.read.csv(
    'flights.csv',
    header=True,
    sep=',',
    nullValue='NA',
    inferSchema=True,
)

# Number of records (count() triggers a full pass over the data)
n_flights = flights.count()
print("The data contain %d records." % n_flights)

# Peek at the first five rows
flights.show(5)

# Inferred (column, type) pairs
print(flights.dtypes)

The data contain 50000 records.
+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 11| 20|  6|     US|    19|JFK|2153|  9.48|     351| NULL|
|  0| 22|  2|     UA|  1107|ORD| 316| 16.33|      82|   30|
|  2| 20|  4|     UA|   226|SFO| 337|  6.17|      82|   -8|
|  9| 13|  1|     AA|   419|ORD|1236| 10.33|     195|   -5|
|  4|  2|  5|     AA|   325|ORD| 258|  8.92|      65| NULL|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows

[('mon', 'int'), ('dom', 'int'), ('dow', 'int'), ('carrier', 'string'), ('flight', 'int'), ('org', 'string'), ('mile', 'int'), ('depart', 'double'), ('duration', 'int'), ('delay', 'int')]


In [16]:
##FINDING THE % OF NULL VALUES PER COLUMN
df = flights

# Per-column null percentage = mean of the 0/1 "is null" indicator * 100, rounded
# to one decimal. Averaging avoids a separate count() pass over the data and
# never divides by a zero row count.
null_percentage_expr = [
    F.round(F.avg(F.col(c).isNull().cast("int")) * 100, 1).alias(c)
    for c in df.columns
]

# A single aggregation pass yields one wide row: one percentage per column
null_percentages_df = df.agg(*null_percentage_expr)

# Reshape that wide row into a long two-column table (Column, Null_Percentage)
null_percentages = null_percentages_df.select(
    F.explode(
        F.array([
            F.struct(F.lit(c).alias("Column"), F.col(c).alias("Null_Percentage"))
            for c in df.columns
        ])
    )
).select("col.*")

# Show the columns ordered by null percentage, highest first
null_percentages.orderBy(F.col("Null_Percentage").desc()).show()

+--------+---------------+
|  Column|Null_Percentage|
+--------+---------------+
|   delay|            6.0|
|     mon|            0.0|
|     dom|            0.0|
|     dow|            0.0|
| carrier|            0.0|
|  flight|            0.0|
|     org|            0.0|
|    mile|            0.0|
|  depart|            0.0|
|duration|            0.0|
+--------+---------------+



In [17]:
# Remove the 'flight' column (presumably a flight-number identifier; not used as a feature below)
flights_drop_column = flights.drop('flight')

# Number of records with missing 'delay' values. Printed explicitly: only the
# last expression of a notebook cell is displayed automatically.
n_missing_delay = flights_drop_column.filter('delay IS NULL').count()
print("Records with missing 'delay': %d" % n_missing_delay)

# Remove records with missing 'delay' values
flights_valid_delay = flights_drop_column.filter('delay IS NOT NULL')

# Remove records with missing values in any remaining column and report how many are left
flights_none_missing = flights_valid_delay.dropna()
print(flights_none_missing.count())

47022


In [18]:
# Threshold for the binary 'label' column: label = 1 when delay >= this value.
# NOTE(review): units of 'delay' are assumed to be minutes — confirm against the data source.
DELAY_THRESHOLD = 15

# Start from the fully cleaned frame produced by the previous cell, then:
#  - convert 'mile' to 'km' (1 mile = 1.60934 km), rounded to whole km, and drop 'mile'
#  - add 'label' indicating whether the flight was delayed (1) or not (0)
# Columns are referenced with col(...) on the current frame, not through the
# ancestor `flights` DataFrame.
flights_km = (
    flights_none_missing
    .withColumn('km', round(col('mile') * 1.60934, 0))
    .drop('mile')
    .withColumn('label', (col('delay') >= DELAY_THRESHOLD).cast('integer'))
)

# Check first five records
flights_km.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 509.0|    1|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.0|    0|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.0|    0|
|  5|  2|  1|     UA|SFO|  7.98|     102|    2| 885.0|    0|
|  7|  2|  6|     AA|ORD| 10.83|     135|   54|1180.0|    1|
+---+---+---+-------+---+------+--------+-----+------+-----+
only showing top 5 rows



In [19]:
# Split the day into 3-hour windows. The 9 split points 0, 3, ..., 24 define
# 8 buckets numbered 0.0-7.0 (e.g. depart 16.33 falls in bucket 5.0).
HOUR_SPLITS = list(range(0, 25, 3))
buckets = Bucketizer(
    splits=HOUR_SPLITS,
    inputCol='depart',
    outputCol='depart_bucket',
)

# Attach the bucket index to every flight and compare it with the raw time
bucketed = buckets.transform(flights_km)
bucketed.select('depart', 'depart_bucket').show(5)

+------+-------------+
|depart|depart_bucket|
+------+-------------+
| 16.33|          5.0|
|  6.17|          2.0|
| 10.33|          3.0|
|  7.98|          2.0|
| 10.83|          3.0|
+------+-------------+
only showing top 5 rows



In [24]:
## Dropping the depart column: its information is now carried by 'depart_bucket'
columns_to_keep = [name for name in bucketed.columns if name != "depart"]
bucketed = bucketed.select(*columns_to_keep)
bucketed.show(3)

+---+---+---+-------+---+--------+-----+------+-----+-------------+
|mon|dom|dow|carrier|org|duration|delay|    km|label|depart_bucket|
+---+---+---+-------+---+--------+-----+------+-----+-------------+
|  0| 22|  2|     UA|ORD|      82|   30| 509.0|    1|          5.0|
|  2| 20|  4|     UA|SFO|      82|   -8| 542.0|    0|          2.0|
|  9| 13|  1|     AA|ORD|     195|   -5|1989.0|    0|          3.0|
+---+---+---+-------+---+--------+-----+------+-----+-------------+
only showing top 3 rows



In [33]:
# Turn the numeric bucket index into a string so that the dtype-based scan below
# treats it as categorical. Also drop 'label' (the binary delay flag), which the
# 'duration' model further down does not use.
bucketed = (
    bucketed
    .withColumn("depart_bucket", bucketed["depart_bucket"].cast(StringType()))
    .drop("label")
)
bucketed.show(2)

+---+---+---+-------+---+--------+-----+-----+-------------+
|mon|dom|dow|carrier|org|duration|delay|   km|depart_bucket|
+---+---+---+-------+---+--------+-----+-----+-------------+
|  0| 22|  2|     UA|ORD|      82|   30|509.0|          5.0|
|  2| 20|  4|     UA|SFO|      82|   -8|542.0|          2.0|
+---+---+---+-------+---+--------+-----+-----+-------------+
only showing top 2 rows



In [31]:
# Every string-typed column is treated as a categorical feature.
# (Loop variable named `name` to avoid visually shadowing pyspark's `col`.)
categorical_columns = [name for name, dtype in bucketed.dtypes if dtype == 'string']
categorical_columns

['carrier', 'org', 'depart_bucket']

In [43]:
## Creating a training and testing dataset: 80/20 random split with a fixed seed
SPLIT_WEIGHTS = [0.8, 0.2]
SPLIT_SEED = 420
df_train, df_test = bucketed.randomSplit(SPLIT_WEIGHTS, seed=SPLIT_SEED)

In [48]:
## Convert categorical columns to numerical: index each string column, then one-hot encode the index.
# handleInvalid="keep" maps a category not seen while fitting (e.g. one present only
# in df_test) to an extra index, instead of raising an error at transform time.
indexer = [
    StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid="keep")
    for column in categorical_columns
]
onehot = [
    OneHotEncoder(inputCol=column + "_index", outputCol=column + "_encoded")
    for column in categorical_columns
]

# Assemble the numeric columns and the one-hot vectors into one feature vector.
# NOTE(review): 'delay' is used as a predictor of 'duration' — confirm it is known at prediction time.
feature_columns = ['mon', 'dom', 'dow', 'delay', 'km'] + [column + "_encoded" for column in categorical_columns]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Gradient-boosted trees regressor predicting 'duration'
regression = GBTRegressor(labelCol='duration')

# Full pipeline: indexers -> encoders -> assembler -> regressor
pipeline = Pipeline(stages=indexer + onehot + [assembler, regression])

# Hyper-parameter grid: 4 maxIter x 3 maxDepth = 12 combinations, each fitted on every CV fold
params = (
    ParamGridBuilder()
    .addGrid(regression.maxIter, [1, 2, 5, 10])
    .addGrid(regression.maxDepth, [5, 10, 15])
    .build()
)

# R2 is the metric the cross-validator uses to compare grid points
evaluator_r2 = RegressionEvaluator(labelCol="duration", predictionCol="prediction", metricName="r2")

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=params,
                    evaluator=evaluator_r2)


In [49]:
## Training the model: grid search with cross-validation on the training split only.
# NOTE: this rebinds `cv` from the CrossValidator estimator to the fitted model,
# so the previous cell must be re-run before this one can be re-run.
cv = cv.fit(dataset=df_train)
cv

CrossValidatorModel_0b20bec58947

In [53]:
# Obtaining the best pipeline selected by cross-validation
best_model = cv.bestModel

# The last pipeline stage is the fitted GBT regressor (not a random forest / classifier)
gbt_model = best_model.stages[-1]

# Predict on the held-out test split; cached because several actions below reuse it
predictions = best_model.transform(df_test).cache()

# Score on the raw (unrounded) predictions; rounding is applied only for display.
# Passing a param map to evaluate() overrides metricName for that call only.
r2_score = evaluator_r2.evaluate(predictions)
rmse_score = evaluator_r2.evaluate(predictions, {evaluator_r2.metricName: "rmse"})
mae_score = evaluator_r2.evaluate(predictions, {evaluator_r2.metricName: "mae"})

# Show actual vs. predicted duration, with predictions rounded to 2 decimals
predictions.select("duration", round(col("prediction"), 2).alias("prediction")).show(7)

print(f"R2 score on the test data: {r2_score*100:.2f} %")
print(f"RMSE on the test data: {rmse_score:.2f}")
print(f"MAE on the test data: {mae_score:.2f}")

+--------+----------+
|duration|prediction|
+--------+----------+
|     379|    384.27|
|     185|    186.87|
|      80|     74.96|
|     120|    125.94|
|     180|    181.92|
|     251|    257.36|
|     260|     264.1|
+--------+----------+
only showing top 7 rows

R2 score on the test data: 99.44 %
