In [2]:
# Import libraries 
from pyspark import SparkFiles
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import explode, split, col, size, when
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.functions import log
from pyspark.ml.feature import MinMaxScaler
from pyspark.sql import functions as F
import scipy.stats as stats
import matplotlib.pyplot as plt
import seaborn as sns

In [3]:
# Create the Spark session for this notebook (or attach to one that already exists)
spark = (
    SparkSession.builder
    .appName('Spark-Group1-Linear-Regression')
    .getOrCreate()
)

### Loading train dataset (75% of the data)

In [4]:
# Load train dataset
train_gcs_path = "gs://msca-bdp-student-gcs/Group1/traindata.parquet/"

# Read the Parquet dataset from GCS. Parquet files store their own schema, so the
# CSV-only reader options (header / inferSchema) do not apply and are not passed.
df_train = spark.read.parquet(train_gcs_path)

# Show the first 10 rows of the train DataFrame
df_train.show(10)

[Stage 1:>                                                          (0 + 1) / 1]

+--------------------+----------+----------+---------------+------------------+--------------+-----------+------------+---------+--------------+----------------+--------+--------+--------------------+-----------------+------------+---------------+-------------+-----------------+-------------------+
|               legId|searchDate|flightDate|startingAirport|destinationAirport|travelDuration|elapsedDays|isRefundable|totalFare|seatsRemaining|DaysBeforeFlight|Layovers|NumStops|        AirlineNames|NumUniqueAirlines|AircraftType|NumUniqueCabins|hasFirstClass|FlightArrivalDate|totalTravelDistance|
+--------------------+----------+----------+---------------+------------------+--------------+-----------+------------+---------+--------------+----------------+--------+--------+--------------------+-----------------+------------+---------------+-------------+-----------------+-------------------+
|00000f65c6af0f881...|2022-05-25|2022-05-28|            DTW|               ATL|           185|      

                                                                                

### Loading test dataset (15% of the data)

In [5]:
# Load test dataset
test_gcs_path = "gs://msca-bdp-student-gcs/Group1/testdata.parquet/"

# Read the Parquet dataset from GCS. Parquet files store their own schema, so the
# CSV-only reader options (header / inferSchema) do not apply and are not passed.
df_test = spark.read.parquet(test_gcs_path)

# Show the first 10 rows of the test DataFrame
df_test.show(10)

+--------------------+----------+----------+---------------+------------------+--------------+-----------+------------+---------+--------------+----------------+----------+--------+--------------------+-----------------+------------+---------------+-------------+-----------------+-------------------+
|               legId|searchDate|flightDate|startingAirport|destinationAirport|travelDuration|elapsedDays|isRefundable|totalFare|seatsRemaining|DaysBeforeFlight|  Layovers|NumStops|        AirlineNames|NumUniqueAirlines|AircraftType|NumUniqueCabins|hasFirstClass|FlightArrivalDate|totalTravelDistance|
+--------------------+----------+----------+---------------+------------------+--------------+-----------+------------+---------+--------------+----------------+----------+--------+--------------------+-----------------+------------+---------------+-------------+-----------------+-------------------+
|000012c2ee248f229...|2022-05-01|2022-05-16|            CLT|               JFK|           112|

### Loading validation dataset (10% of the data)

In [6]:
# Load validation dataset
val_gcs_path = "gs://msca-bdp-student-gcs/Group1/valdata.parquet/"

# Read the Parquet dataset from GCS. Parquet files store their own schema, so the
# CSV-only reader options (header / inferSchema) do not apply and are not passed.
df_val = spark.read.parquet(val_gcs_path)

# Show the first 10 rows of the validation DataFrame
df_val.show(10)

[Stage 5:>                                                          (0 + 1) / 1]

+--------------------+----------+----------+---------------+------------------+--------------+-----------+------------+---------+--------------+----------------+----------+--------+--------------------+-----------------+------------+---------------+-------------+-----------------+-------------------+
|               legId|searchDate|flightDate|startingAirport|destinationAirport|travelDuration|elapsedDays|isRefundable|totalFare|seatsRemaining|DaysBeforeFlight|  Layovers|NumStops|        AirlineNames|NumUniqueAirlines|AircraftType|NumUniqueCabins|hasFirstClass|FlightArrivalDate|totalTravelDistance|
+--------------------+----------+----------+---------------+------------------+--------------+-----------+------------+---------+--------------+----------------+----------+--------+--------------------+-----------------+------------+---------------+-------------+-----------------+-------------------+
|00002d04072aa6a76...|2022-09-18|2022-11-06|            OAK|               MIA|           856|

                                                                                

### Repartitioning data

In [7]:
# Redistribute the training rows over the same number of partitions Spark chose at load time
num = df_train.rdd.getNumPartitions()
df_train = df_train.repartition(num)
print(num)

20


In [8]:
# Redistribute the test rows over the same number of partitions Spark chose at load time
num = df_test.rdd.getNumPartitions()
df_test = df_test.repartition(num)
print(num)

4


In [9]:
# Redistribute the validation rows over the same number of partitions Spark chose at load time
num = df_val.rdd.getNumPartitions()
df_val = df_val.repartition(num)
print(num)

6


## Feature Engineering

In [30]:
# Print the column names and types of the training data as loaded
df_train.printSchema()

root
 |-- legId: string (nullable = true)
 |-- searchDate: date (nullable = true)
 |-- flightDate: date (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- travelDuration: integer (nullable = true)
 |-- elapsedDays: integer (nullable = true)
 |-- isRefundable: integer (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: integer (nullable = true)
 |-- DaysBeforeFlight: integer (nullable = true)
 |-- Layovers: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- NumStops: integer (nullable = true)
 |-- AirlineNames: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- NumUniqueAirlines: integer (nullable = true)
 |-- AircraftType: integer (nullable = true)
 |-- NumUniqueCabins: integer (nullable = true)
 |-- hasFirstClass: integer (nullable = true)
 |-- FlightArrivalDate: date (nullable = true)
 |-- totalTravelDistance: integer (nullable = true)



### The startingAirport and destinationAirport columns are categorical variables that need to be transformed before running a regression model on them.

In [31]:
# How many different origin airports appear in the training data?
unique_starting_airports = (
    df_train
    .select("startingAirport")
    .distinct()
    .count()
)
print(f"Number of unique starting airports: {unique_starting_airports}")



Number of unique starting airports: 16


                                                                                

In [32]:
# How many different destination airports appear in the training data?
unique_destination_airports = (
    df_train
    .select("destinationAirport")
    .distinct()
    .count()
)
print(f"Number of unique destination airports: {unique_destination_airports}")



Number of unique destination airports: 16


                                                                                

#### Although one-hot encoding increases dimensionality, it avoids the ordinal problem introduced by label encoding. Since there are only 16 unique airports, the increase in dimensions (a 15-element vector per airport column, because the encoder drops the last category) is relatively manageable, especially given that it’s a small number of categories.

In [33]:
# Categorical columns that will be one-hot encoded
categorical_columns = ["startingAirport", "destinationAirport"]

# One StringIndexer (string -> numeric index) and one OneHotEncoder (index -> vector) per column.
# The loop variable is called `name` so it does not shadow the imported `col` function.
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_index")
    for name in categorical_columns
]
encoders = [
    OneHotEncoder(inputCol=f"{name}_index", outputCol=f"{name}_onehot")
    for name in categorical_columns
]

In [34]:
# Chain all indexers, followed by all encoders, into a single Pipeline
pipeline = Pipeline(stages=[*indexers, *encoders])

In [35]:
# Fit the encoding pipeline on the TRAINING data only and reuse that one fitted model for
# every split. StringIndexer assigns indices by label frequency, so re-fitting on the test
# or validation data could map the same airport to a different one-hot position in each
# split, making the encoded features inconsistent between training and evaluation.
# NOTE: the indexers use the default handleInvalid="error", so an airport that appears in
# test/val but never in train will raise when the transformed data is evaluated.
airport_encoder_model = pipeline.fit(df_train)
df_train_transformed = airport_encoder_model.transform(df_train)
df_test_transformed = airport_encoder_model.transform(df_test)
df_val_transformed = airport_encoder_model.transform(df_val)

# Show the transformed data
df_train_transformed.select("startingAirport", "destinationAirport", "startingAirport_onehot", "destinationAirport_onehot").show(5)

                                                                                

+---------------+------------------+----------------------+-------------------------+
|startingAirport|destinationAirport|startingAirport_onehot|destinationAirport_onehot|
+---------------+------------------+----------------------+-------------------------+
|            DTW|               ATL|       (15,[11],[1.0])|           (15,[7],[1.0])|
|            DEN|               MIA|       (15,[10],[1.0])|           (15,[8],[1.0])|
|            DEN|               MIA|       (15,[10],[1.0])|           (15,[8],[1.0])|
|            LAX|               PHL|        (15,[0],[1.0])|           (15,[9],[1.0])|
|            ATL|               DFW|        (15,[7],[1.0])|           (15,[2],[1.0])|
+---------------+------------------+----------------------+-------------------------+
only showing top 5 rows



In [36]:
# The raw airport strings and their intermediate indices are redundant once the
# one-hot vectors exist, so remove them from all three splits
airport_helper_columns = [
    "startingAirport", "destinationAirport",
    "startingAirport_index", "destinationAirport_index",
]
df_train_transformed = df_train_transformed.drop(*airport_helper_columns)
df_test_transformed = df_test_transformed.drop(*airport_helper_columns)
df_val_transformed = df_val_transformed.drop(*airport_helper_columns)

### AirlineNames is an array column listing the airline flown on each leg of the journey. We need to transform it before using it in our regression model.

In [37]:
# Longest AirlineNames array in the training data = number of per-leg columns to create
max_airline_count = (
    df_train
    .select(size(col("AirlineNames")).alias("n_airlines"))
    .agg({"n_airlines": "max"})
    .collect()[0][0]
)

# Names of the per-leg columns: AirlineName_0, AirlineName_1, ...
airline_columns = [f"AirlineName_{i}" for i in range(max_airline_count)]

# One column expression per array position, reused for all three splits
leg_columns = [col("AirlineNames")[i].alias(name) for i, name in enumerate(airline_columns)]

# Keep every existing column and append the per-leg airline columns
df_train_transformed = df_train_transformed.select("*", *leg_columns)
df_test_transformed = df_test_transformed.select("*", *leg_columns)
df_val_transformed = df_val_transformed.select("*", *leg_columns)

df_train_transformed.show()

[Stage 98:>                                                         (0 + 1) / 1]

+--------------------+----------+----------+--------------+-----------+------------+---------+--------------+----------------+--------+--------+--------------------+-----------------+------------+---------------+-------------+-----------------+-------------------+----------------------+-------------------------+-----------------+-----------------+-------------+-------------+-------------+
|               legId|searchDate|flightDate|travelDuration|elapsedDays|isRefundable|totalFare|seatsRemaining|DaysBeforeFlight|Layovers|NumStops|        AirlineNames|NumUniqueAirlines|AircraftType|NumUniqueCabins|hasFirstClass|FlightArrivalDate|totalTravelDistance|startingAirport_onehot|destinationAirport_onehot|    AirlineName_0|    AirlineName_1|AirlineName_2|AirlineName_3|AirlineName_4|
+--------------------+----------+----------+--------------+-----------+------------+---------+--------------+----------------+--------+--------+--------------------+-----------------+------------+---------------+----

                                                                                

In [38]:
# Put the original array next to the per-leg columns extracted from it to check the split
leg_preview_columns = [
    "AirlineNames",
    "AirlineName_0", "AirlineName_1", "AirlineName_2", "AirlineName_3", "AirlineName_4",
]
df_train_transformed.select(*leg_preview_columns).show(5)

+--------------------+-----------------+-----------------+-------------+-------------+-------------+
|        AirlineNames|    AirlineName_0|    AirlineName_1|AirlineName_2|AirlineName_3|AirlineName_4|
+--------------------+-----------------+-----------------+-------------+-------------+-------------+
|      [Delta, Delta]|            Delta|            Delta|         null|         null|         null|
|[American Airline...|American Airlines|American Airlines|         null|         null|         null|
|[American Airline...|American Airlines|American Airlines|         null|         null|         null|
|    [United, United]|           United|           United|         null|         null|         null|
| [American Airlines]|American Airlines|             null|         null|         null|         null|
+--------------------+-----------------+-----------------+-------------+-------------+-------------+
only showing top 5 rows



In [39]:
# The array column has been expanded into per-leg columns, so remove it from every split
redundant_column = "AirlineNames"
df_train_transformed = df_train_transformed.drop(redundant_column)
df_test_transformed = df_test_transformed.drop(redundant_column)
df_val_transformed = df_val_transformed.drop(redundant_column)

In [40]:
# Replace null values (journeys with fewer legs) with the string "None" so that every
# per-leg airline column has a category to index
df_train_transformed = df_train_transformed.fillna("None", subset=airline_columns)
df_test_transformed = df_test_transformed.fillna("None", subset=airline_columns)
df_val_transformed = df_val_transformed.fillna("None", subset=airline_columns)

# Initialize StringIndexer and OneHotEncoder for each per-leg airline column.
# handleInvalid="keep" routes any airline that never occurs at a given leg position in the
# training data into an extra "unknown" index instead of raising when test/val are transformed.
indexers = [
    StringIndexer(inputCol=col_name, outputCol=col_name + "_index", handleInvalid="keep")
    for col_name in airline_columns
]
encoders = [
    OneHotEncoder(inputCols=[col_name + "_index"], outputCols=[col_name + "_onehot"])
    for col_name in airline_columns
]

# Create a Pipeline to apply transformations
pipeline = Pipeline(stages=indexers + encoders)

# Fit on the TRAINING data only and apply that single fitted model to every split.
# Re-fitting per split would let StringIndexer (which orders labels by frequency) give the
# same airline a different one-hot position in train, test and validation.
airline_encoder_model = pipeline.fit(df_train_transformed)
df_train_transformed = airline_encoder_model.transform(df_train_transformed)
df_test_transformed = airline_encoder_model.transform(df_test_transformed)
df_val_transformed = airline_encoder_model.transform(df_val_transformed)

# Show the transformed DataFrame
df_train_transformed.show()

24/12/02 03:27:42 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 145:>                                                        (0 + 1) / 1]

+--------------------+----------+----------+--------------+-----------+------------+---------+--------------+----------------+--------+--------+-----------------+------------+---------------+-------------+-----------------+-------------------+----------------------+-------------------------+-----------------+-----------------+-------------+-------------+-------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|               legId|searchDate|flightDate|travelDuration|elapsedDays|isRefundable|totalFare|seatsRemaining|DaysBeforeFlight|Layovers|NumStops|NumUniqueAirlines|AircraftType|NumUniqueCabins|hasFirstClass|FlightArrivalDate|totalTravelDistance|startingAirport_onehot|destinationAirport_onehot|    AirlineName_0|    AirlineName_1|AirlineName_2|AirlineName_3|AirlineName_4|AirlineName_0_index|AirlineName_1_index|AirlineName_2_

                                                                                

In [139]:
# Drop the per-leg airline string columns and their index columns after encoding.
# The list is derived from airline_columns (built from max_airline_count) so it stays
# correct if the maximum number of legs is not exactly five.
airline_helper_columns = airline_columns + [f"{name}_index" for name in airline_columns]

df_train_transformed = df_train_transformed.drop(*airline_helper_columns)
df_test_transformed = df_test_transformed.drop(*airline_helper_columns)
df_val_transformed = df_val_transformed.drop(*airline_helper_columns)

In [140]:
# Display the first rows of the training data after the airline helper columns were dropped
df_train_transformed.show()

[Stage 510:>                                                        (0 + 1) / 1]

+--------------------+----------+----------+--------------+-----------+------------+---------+--------------+----------------+--------+--------+-----------------+------------+---------------+-------------+-----------------+-------------------+----------------------+-------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|               legId|searchDate|flightDate|travelDuration|elapsedDays|isRefundable|totalFare|seatsRemaining|DaysBeforeFlight|Layovers|NumStops|NumUniqueAirlines|AircraftType|NumUniqueCabins|hasFirstClass|FlightArrivalDate|totalTravelDistance|startingAirport_onehot|destinationAirport_onehot|AirlineName_0_onehot|AirlineName_1_onehot|AirlineName_2_onehot|AirlineName_3_onehot|AirlineName_4_onehot|
+--------------------+----------+----------+--------------+-----------+------------+---------+--------------+----------------+--------+--------+-----------------+------------+---------------+-------------

                                                                                

### totalFare is heavily right-skewed, so I am log-transforming it to improve its linear relationship with the predictors

In [141]:
# Add log_totalFare = log(totalFare) to every split, using one shared column expression
log_fare = log("totalFare")

df_train_transformed = df_train_transformed.withColumn("log_totalFare", log_fare)
df_test_transformed = df_test_transformed.withColumn("log_totalFare", log_fare)
df_val_transformed = df_val_transformed.withColumn("log_totalFare", log_fare)

df_train_transformed.show()

[Stage 511:>                                                        (0 + 1) / 1]

+--------------------+----------+----------+--------------+-----------+------------+---------+--------------+----------------+--------+--------+-----------------+------------+---------------+-------------+-----------------+-------------------+----------------------+-------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+
|               legId|searchDate|flightDate|travelDuration|elapsedDays|isRefundable|totalFare|seatsRemaining|DaysBeforeFlight|Layovers|NumStops|NumUniqueAirlines|AircraftType|NumUniqueCabins|hasFirstClass|FlightArrivalDate|totalTravelDistance|startingAirport_onehot|destinationAirport_onehot|AirlineName_0_onehot|AirlineName_1_onehot|AirlineName_2_onehot|AirlineName_3_onehot|AirlineName_4_onehot|    log_totalFare|
+--------------------+----------+----------+--------------+-----------+------------+---------+--------------+----------------+--------+--------+-----------------+------

                                                                                

In [142]:
# Print the schema of the transformed training data to confirm the engineered columns
df_train_transformed.printSchema()

root
 |-- legId: string (nullable = true)
 |-- searchDate: date (nullable = true)
 |-- flightDate: date (nullable = true)
 |-- travelDuration: integer (nullable = true)
 |-- elapsedDays: integer (nullable = true)
 |-- isRefundable: integer (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: integer (nullable = true)
 |-- DaysBeforeFlight: integer (nullable = true)
 |-- Layovers: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- NumStops: integer (nullable = true)
 |-- NumUniqueAirlines: integer (nullable = true)
 |-- AircraftType: integer (nullable = true)
 |-- NumUniqueCabins: integer (nullable = true)
 |-- hasFirstClass: integer (nullable = true)
 |-- FlightArrivalDate: date (nullable = true)
 |-- totalTravelDistance: integer (nullable = true)
 |-- startingAirport_onehot: vector (nullable = true)
 |-- destinationAirport_onehot: vector (nullable = true)
 |-- AirlineName_0_onehot: vector (nullable = true)
 |-- AirlineName_1_onehot:

## Writing the transformed dataframes to GCS to access for further model building

In [143]:
# Persist the transformed training split as Parquet.
# NOTE: no save mode is set, so Spark's default applies and the write fails if the path already exists.
df_train_transformed.write.parquet("gs://msca-bdp-student-gcs/Group1/lr_data/traindata_transformed")

                                                                                

In [144]:
# Persist the transformed test split as Parquet.
# NOTE: no save mode is set, so Spark's default applies and the write fails if the path already exists.
df_test_transformed.write.parquet("gs://msca-bdp-student-gcs/Group1/lr_data/testdata_transformed")

                                                                                

In [145]:
# Persist the transformed validation split as Parquet.
# NOTE: no save mode is set, so Spark's default applies and the write fails if the path already exists.
df_val_transformed.write.parquet("gs://msca-bdp-student-gcs/Group1/lr_data/valdata_transformed")

                                                                                