In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import mean
from pyspark.sql.types import DoubleType
import pyspark.ml.feature as feature
from pyspark.ml.feature import Normalizer
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import when, col

In [2]:
# Create a Spark session
spark = SparkSession.builder \
    .appName("FlightAnalysisApp") \
    .getOrCreate()

24/01/04 17:05:47 WARN Utils: Your hostname, alvaro-ThinkPad-X1-Carbon-Gen-10 resolves to a loopback address: 127.0.1.1; using 192.168.1.130 instead (on interface wlp0s20f3)
24/01/04 17:05:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/04 17:05:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
DATA_2008 = "../ds/bigData/flight-2008.csv.bz2"
#DATA_1990 = "../ds/flight-1990.csv.bz2"
flights_df = spark.read.csv(DATA_2008, header=True, inferSchema=True)
print(flights_df.count())

[Stage 2:=====>                                                    (1 + 9) / 10]

2389217


                                                                                

In [14]:
# Create a new DataFrame with only the first 1000 instances
flights_subset_df = flights_df.sample(withReplacement=False, fraction=10000/flights_df.count(), seed=42)

flights_subset_df.show(n=2)

                                                                                

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008|    1|         3|        4|   1703|      1645|     33|        35|           WN|      767

In [15]:
# Display the number of rows in the subset DataFrame
print("Subset DataFrame Count:", flights_subset_df.count())

output_path = "../ds/samples/sample_10000"

#flights_subset_df.write.csv(output_path,compression="gzip")


[Stage 24:=====>                                                   (1 + 9) / 10]

Subset DataFrame Count: 10043


                                                                                

In [16]:
pandas_df = flights_subset_df.toPandas()

pandas_df.head()

                                                                                

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,...,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,2008,1,3,4,1703,1645,33,35,WN,767,...,4,9,0,,0,,,,,
1,2008,1,3,4,1630,1600,23,2350,WN,416,...,4,23,0,,0,6.0,0.0,3.0,0.0,24.0
2,2008,1,3,4,737,735,855,855,WN,547,...,4,8,0,,0,,,,,
3,2008,1,3,4,1138,1110,1252,1250,WN,2463,...,2,12,0,,0,,,,,
4,2008,1,3,4,1824,1815,1938,1935,WN,2034,...,4,13,0,,0,,,,,


In [13]:
pandas_df.to_csv(output_path + '.bz2', index=False, compression='bz2')

In [46]:
# Dump the DataFrame to Parquet format
flights_subset_df.write.parquet(output_path)


# Load the DataFrame back
loaded_df = spark.read.parquet(output_path)
loaded_df.show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008|    1|         3|        4|   1343|      1325|   1451|      1435|           WN|      588

# Preprocesing

In [5]:
features_to_drop = [
    "ArrTime", "ActualElapsedTime", "AirTime", "TaxiIn",
    "Diverted", "CarrierDelay", "WeatherDelay", "NASDelay",
    "SecurityDelay", "LateAircraftDelay"
]

flights_df = flights_df.drop(*features_to_drop)

In [None]:
analysis_result = (
    flights_df
    .groupBy("Year", "Month")
    .agg({"ArrDelay": "avg", "DepDelay": "avg", "Distance": "sum"})
    .orderBy("Year", "Month")
)
analysis_result.show()


In [None]:
flights_df.printSchema()
# flights_df.describe().show()

In [6]:
 # Calculate mean value
 ###THIS CODE IS FOR REPLACING THE NULL VALUES WITH THE MEAN VALUE OF THE COLUMN
# mean_val = flights_df.select(mean(flights_df[TARGET])).collect()[0][0]
# flights_df = flights_df.na.fill(mean_val, subset=[TARGET])
flights_df = flights_df.withColumn('ArrDelay', flights_df['ArrDelay'].cast(IntegerType()))
flights_df = flights_df.withColumn('DepDelay', flights_df['DepDelay'].cast(IntegerType()))

# Calculate the mean value of the 'Distance' column
mean_distance = flights_df.select(mean(col('Distance'))).collect()[0][0]

# Replace all non-numeric values in the 'Distance' column with the mean value
flights_df = flights_df.withColumn('Distance', when(col('Distance').cast('integer').isNull(), mean_distance).otherwise(col('Distance').cast('integer')))

flights_df.printSchema()
#flights_df.describe().show()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: double (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)



**Check for Missing Values**: If a column has a high number of missing values, it might not be very useful for prediction. You can check the number of missing values in a column with `df.filter(df["column"].isNull()).count()`.


In [7]:
selected_features = []
total_rows = flights_df.count()
for column in flights_df.columns:
    missing_count = flights_df.filter(flights_df[column].isNull()).count()
    missing_percentage = (missing_count / total_rows) * 100
    print(f"Number of missing values in {column}: {missing_count} ({missing_percentage:.2f}%)")
    if missing_percentage > 50:
        print(f"Dropping {column} due to high percentage of missing values.")
        flights_df = flights_df.drop(column)
    elif missing_percentage > 30:
        print(f"Warning: {column} has more than a 30% of missing values, consider dropping it.")
        selected_features.append(column)
    elif missing_count>0:
        print(f"Warning: {column} has at least one missing value , consider dropping those rows or replacing them.")
        selected_features.append(column)

Number of missing values in Year: 0 (0.00%)
Number of missing values in Month: 0 (0.00%)
Number of missing values in DayofMonth: 0 (0.00%)
Number of missing values in DayOfWeek: 0 (0.00%)
Number of missing values in DepTime: 0 (0.00%)
Number of missing values in CRSDepTime: 0 (0.00%)
Number of missing values in CRSArrTime: 0 (0.00%)
Number of missing values in UniqueCarrier: 0 (0.00%)
Number of missing values in FlightNum: 0 (0.00%)
Number of missing values in TailNum: 0 (0.00%)
Number of missing values in CRSElapsedTime: 0 (0.00%)
Number of missing values in ArrDelay: 68412 (1.30%)
Number of missing values in DepDelay: 52458 (1.00%)
Number of missing values in Origin: 0 (0.00%)
Number of missing values in Dest: 0 (0.00%)
Number of missing values in Distance: 0 (0.00%)
Number of missing values in TaxiOut: 0 (0.00%)
Number of missing values in Cancelled: 0 (0.00%)
Number of missing values in CancellationCode: 0 (0.00%)


In [None]:
selected_features.remove('CancellationCode')
flights_df = flights_df.drop('CancellationCode')


In [8]:
# filtered_df = flights_df.limit(100000)
dtype_dict = dict(flights_df.dtypes)
for i in selected_features:
    print(f"Filtering {i}, type: {dtype_dict[i]}")
    # if dtype_dict[i] == IntegerType():
    #     print(f"Number of rows before filtering: {flights_df.count()}")
    #     flights_df = flights_df.filter(col(i).isNotNull() & col(i).cast("integer").isNotNull())
    #     print(f"Number of rows after filtering: {flights_df.count()}")
    print(f"Number of rows before filtering: {flights_df.count()}")
    flights_df = flights_df.filter(col(i).isNotNull())
    print(f"Number of rows after filtering: {flights_df.count()}")
# flights_df = flights_df.filter(col(TARGET).isNotNull() & col(TARGET).cast("integer").isNotNull())
# print(f"Number of rows after filtering: {flights_df.count()}")

Filtering ArrDelay, type: int
Number of rows before filtering: 5270893
Number of rows after filtering: 5202481
Filtering DepDelay, type: int
Number of rows before filtering: 5202481
Number of rows after filtering: 5202481


# EDA
To decide which variables to drop, you can perform exploratory data analysis (EDA). Here are some steps you can follow:

1. **Check for Unique Values**: If a column has a unique value for every row (like an ID), it won't be useful for prediction. You can check the number of distinct values in a column with `df.select("column").distinct().count()`. If the count is equal to the number of rows in your DataFrame, you can drop the column.
2. **Check for Constant Values**: If a column has the same value for every row, it also won't be useful for prediction. You can check this with `df.select("column").distinct().count()`. If the count is 1, you can drop the column.


In [None]:
num_of_rows = flights_df.count()
print(f"Number of rows in the dataframe: {num_of_rows}")
for featur in flights_df.columns:
    c1 = flights_df.select(featur).distinct().count() 
    print(f"Feature {featur} has {c1} distinct values")
    if c1 == num_of_rows or c1 == 1:
        flights_df = flights_df.drop(featur)
        print(f"Feature dropped: {featur}")

In [9]:
flights_df.printSchema()
#flights_df.describe().show()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: double (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)



In [None]:

#flights_df = flights_df.withColumn('Distance', flights_df['Distance'].cast(IntegerType()))


3. **Check for High Correlation**: If two columns are highly correlated, they carry similar information, and you can drop one of them. You can calculate the correlation between two columns with `df.stat.corr("column1", "column2")`.

In [17]:
time_vars = ['Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime', 'CRSArrTime']
#create an array of the numerical variables except for time variabes
numerical_vars= [item[0] for item in flights_df.dtypes if item[1].startswith('int') | item[1].startswith('double')][1:]
#vars = list(set(numerical_vars) - set(time_vars))
vars = numerical_vars
vars

['Month',
 'DayofMonth',
 'DayOfWeek',
 'CRSDepTime',
 'CRSArrTime',
 'FlightNum',
 'CRSElapsedTime',
 'ArrDelay',
 'DepDelay',
 'Distance',
 'Cancelled']

In [None]:
from pyspark.sql.functions import corr

#vars=numerical_vars
for i in range(len(vars)):
    for j in range(i+1, len(vars)):
        correlation = flights_df.stat.corr(vars[i], vars[j])
        if correlation > 0.75 or correlation < -0.75:
            print(f"ATTENTION: Correlation between {vars[i]} and {vars[j]} is too high: {correlation}, consider dropping one of them.")
        else:
            print(f"Correlation between {vars[i]} and {vars[j]}: {correlation}")

In [None]:
flights_df.dtypes

# MODEL BUILDING

In [18]:
assembler = feature.VectorAssembler(inputCols=vars,outputCol="features")
data = assembler.transform(flights_df)

In [None]:
data

In [19]:
# Split the data into train and test sets
(train_data, test_data) = data.randomSplit([0.7, 0.3])
print("Training Dataset Count: " + str(train_data.count()))

Training Dataset Count: 3639915


In [None]:
# Create a Linear Regression Model
# lr = LinearRegression(featuresCol="features",labelCol=TARGET)
# lr_model = lr.fit(train_data)

In [20]:
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression

# Assuming you have 'train_data' as your training dataset and 'TARGET' as the target column
# Define the Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol=TARGET)

# Create a pipeline with the Linear Regression model
pipeline = Pipeline(stages=[lr])

# Set up the parameter grid for hyperparameter tuning
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 0.5]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Define the evaluator for regression tasks
evaluator = RegressionEvaluator(labelCol=TARGET, metricName="rmse")

# Create a CrossValidator with the Linear Regression model, parameter grid, and evaluator
cross_validator = CrossValidator(estimator=pipeline,
                                 estimatorParamMaps=param_grid,
                                 evaluator=evaluator,
                                 numFolds=5)  # You can adjust the number of folds as needed

# Run cross-validation to find the best model
cv_model = cross_validator.fit(train_data)

# Get the best model from the cross-validation
best_lr_model = cv_model.bestModel.stages[0]

# Optionally, you can print the best hyperparameters
print("Best RegParam:", best_lr_model.getRegParam())
print("Best ElasticNetParam:", best_lr_model.getElasticNetParam())

Best RegParam: 0.01
Best ElasticNetParam: 1.0


In [21]:
# Print model coefficients and intercept
print(f"Coefficients: {best_lr_model.coefficients}")
print(f"Intercept: {best_lr_model.intercept}")

# Get training summary
trainingSummary = best_lr_model.summary

# Print summary statistics
print(f"numIterations: {trainingSummary.totalIterations}")
trainingSummary.residuals.show()
print(f"RMSE: {trainingSummary.rootMeanSquaredError}")
print(f"r2: {trainingSummary.r2}")

Coefficients: [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.9996001704832248,0.0,0.0,0.0]
Intercept: 0.002723639988620084
numIterations: 23
+--------------------+
|           residuals|
+--------------------+
|0.026064085219204003|
|0.010070904548186377|
|0.002074314212682893|
|0.008871415997859344|
|4.749961455816631...|
|0.006872268413985694|
|0.001274655179132722|
|-7.24492404743593E-4|
|-0.00472278757249...|
|-0.00552244660604...|
|-0.00432295805572...|
|-3.24662887968507...|
|-0.00472278757249...|
|-0.00552244660604...|
|4.749961455816631...|
|-7.24492404743593E-4|
|0.002873973246233...|
|-0.00272363998862...|
|-0.01112005984089...|
|0.003273802763008149|
+--------------------+
only showing top 20 rows

RMSE: 0.010000178809415262
r2: 0.9999998401363576


MODEL EVALUATION

In [22]:
# Model Evaluation
predictions = best_lr_model.transform(test_data)
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="ArrDelay", metricName="rmse")
rmse = evaluator.evaluate(predictions)

# Root Mean Squared Error (RMSE)
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# Mean Absolute Error (MAE)
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
print("Mean Absolute Error (MAE) on test data = %g" % mae)

# R-squared
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
print("R-squared on test data = %g" % r2)


Root Mean Squared Error (RMSE) on test data = 0.00999665
Mean Absolute Error (MAE) on test data = 0.00573577
R-squared on test data = 1


In [23]:
spark.stop()