In [1]:
import pandas as pd
import numpy as np

In [3]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [7]:
# german_df = pd.read_csv('./Data/GermanAirFares.csv')
# main_df = pd.read_csv('./Data/itineraries.csv')
# main_df.head()

In [9]:
pip show pyspark

Name: pyspark
Version: 3.5.5
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /opt/anaconda3/lib/python3.12/site-packages
Requires: py4j
Required-by: 
Note: you may need to restart the kernel to use updated packages.


In [11]:
spark = SparkSession.builder.appName("FlightPricePrediction").getOrCreate()

# Read large CSV (distributed processing)
df = spark.read.csv("./Data/itineraries.csv", header=True, inferSchema=True)


25/03/23 20:42:50 WARN Utils: Your hostname, Fiona-X-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.83.192 instead (on interface en0)
25/03/23 20:42:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/23 20:42:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [13]:
flight_df = df

In [15]:
flight_df.select("searchDate","flightDate", "startingAirport", "destinationAirport").show(n=10, truncate=False)

+----------+----------+---------------+------------------+
|searchDate|flightDate|startingAirport|destinationAirport|
+----------+----------+---------------+------------------+
|2022-04-16|2022-04-17|ATL            |BOS               |
|2022-04-16|2022-04-17|ATL            |BOS               |
|2022-04-16|2022-04-17|ATL            |BOS               |
|2022-04-16|2022-04-17|ATL            |BOS               |
|2022-04-16|2022-04-17|ATL            |BOS               |
|2022-04-16|2022-04-17|ATL            |BOS               |
|2022-04-16|2022-04-17|ATL            |BOS               |
|2022-04-16|2022-04-17|ATL            |BOS               |
|2022-04-16|2022-04-17|ATL            |BOS               |
|2022-04-16|2022-04-17|ATL            |BOS               |
+----------+----------+---------------+------------------+
only showing top 10 rows



Taking a small sample and save it as parquet, this will be used to explore the data in pandas.

In [19]:
flight_df.limit(500000).write.parquet("sample_flights.parquet")

                                                                                

In [23]:
flight_df_pd = pd.read_parquet("sample_flights.parquet")

In [34]:
flight_df_pd.head()

Unnamed: 0,legId,searchDate,flightDate,startingAirport,destinationAirport,fareBasisCode,travelDuration,elapsedDays,isBasicEconomy,isRefundable,...,segmentsArrivalTimeEpochSeconds,segmentsArrivalTimeRaw,segmentsArrivalAirportCode,segmentsDepartureAirportCode,segmentsAirlineName,segmentsAirlineCode,segmentsEquipmentDescription,segmentsDurationInSeconds,segmentsDistance,segmentsCabinCode
0,9ca0e81111c683bec1012473feefd28f,2022-04-16,2022-04-17,ATL,BOS,LA0NX0MC,PT2H29M,0,False,False,...,1650223560,2022-04-17T15:26:00.000-04:00,BOS,ATL,Delta,DL,Airbus A321,8940,947,coach
1,98685953630e772a098941b71906592b,2022-04-16,2022-04-17,ATL,BOS,LA0NX0MC,PT2H30M,0,False,False,...,1650200400,2022-04-17T09:00:00.000-04:00,BOS,ATL,Delta,DL,Airbus A321,9000,947,coach
2,98d90cbc32bfbb05c2fc32897c7c1087,2022-04-16,2022-04-17,ATL,BOS,LA0NX0MC,PT2H30M,0,False,False,...,1650218700,2022-04-17T14:05:00.000-04:00,BOS,ATL,Delta,DL,Boeing 757-200,9000,947,coach
3,969a269d38eae583f455486fa90877b4,2022-04-16,2022-04-17,ATL,BOS,LA0NX0MC,PT2H32M,0,False,False,...,1650227460,2022-04-17T16:31:00.000-04:00,BOS,ATL,Delta,DL,Airbus A321,9120,947,coach
4,980370cf27c89b40d2833a1d5afc9751,2022-04-16,2022-04-17,ATL,BOS,LA0NX0MC,PT2H34M,0,False,False,...,1650213180,2022-04-17T12:33:00.000-04:00,BOS,ATL,Delta,DL,Airbus A321,9240,947,coach


In [84]:
# print(flight_df_pd.head())  # View first few rows
# print(flight_df_pd.info())  # Check data types
# print(flight_df_pd.describe())

1. examine the data and take out unrelated and repeating colums: 
legId, segmentsCabinCode
2. drop repeated, unrelated, and price columns:
3. translate int valued strings into numerical


In [78]:
unique_airlines = flight_df_pd["segmentsAirlineName"].unique()
unique_total_distance = flight_df_pd["totalTravelDistance"].unique()

unique_distance = flight_df_pd["segmentsDistance"].unique()

prices = flight_df_pd["baseFare"]
print(unique_total_distance[:10])
print(unique_distance[:10])

[ 947.  956. 1462.   nan 1192.  868.  228. 1307. 1675. 1207.]
['947' '228||728' '541||406' '600||862' 'None||None' '762||185' '600||592'
 '541||327' '228' '762||545']


go back to pyspark and implement the data cleaning and preparation after examination

<h1>clean irrelevant columns and create new columns</h1>

In [18]:
from pyspark.sql import functions as F

columns_to_drop = [
    'legId',
    'totalFare',
    'elapsedDays',
    'fareBasisCode',
    'segmentsAirlineCode',
    'segmentsDepartureTimeEpochSeconds',  # Keep raw time format
    'segmentsArrivalTimeEpochSeconds', # Keep raw time format
    'segmentsArrivalAirportCode',
    'segmentsDepartureAirportCode',
    'segmentsEquipmentDescription',
    'segmentsDistance'
]

flight_clean = flight_df.drop(*columns_to_drop)

flight_clean.show(n=3, truncate=False)

+----------+----------+---------------+------------------+--------------+--------------+------------+---------+--------+--------------+-------------------+-----------------------------+-----------------------------+-------------------+-------------------------+-----------------+
|searchDate|flightDate|startingAirport|destinationAirport|travelDuration|isBasicEconomy|isRefundable|isNonStop|baseFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeRaw     |segmentsArrivalTimeRaw       |segmentsAirlineName|segmentsDurationInSeconds|segmentsCabinCode|
+----------+----------+---------------+------------------+--------------+--------------+------------+---------+--------+--------------+-------------------+-----------------------------+-----------------------------+-------------------+-------------------------+-----------------+
|2022-04-16|2022-04-17|ATL            |BOS               |PT2H29M       |false         |false       |true     |217.67  |9             |947                |2022-

In [20]:
#create columns: isCoach, daysUntilFlight

# Create isCoach column from segmentsCabinCode before dropping it
flight_clean = flight_clean.withColumn(
    'isCoach',
    F.when(F.col('segmentsCabinCode').contains('coach'), 1).otherwise(0)
)

flight_clean = flight_clean.drop('segmentsCabinCode')

# Process dates to create days_until_flight feature
flight_clean = flight_clean.withColumn(
    'days_until_flight',
    F.datediff(F.to_date('flightDate'), F.to_date('searchDate'))
)

flight_clean.show(n=1, truncate=False)

+----------+----------+---------------+------------------+--------------+--------------+------------+---------+--------+--------------+-------------------+-----------------------------+-----------------------------+-------------------+-------------------------+-------+-----------------+
|searchDate|flightDate|startingAirport|destinationAirport|travelDuration|isBasicEconomy|isRefundable|isNonStop|baseFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeRaw     |segmentsArrivalTimeRaw       |segmentsAirlineName|segmentsDurationInSeconds|isCoach|days_until_flight|
+----------+----------+---------------+------------------+--------------+--------------+------------+---------+--------+--------------+-------------------+-----------------------------+-----------------------------+-------------------+-------------------------+-------+-----------------+
|2022-04-16|2022-04-17|ATL            |BOS               |PT2H29M       |false         |false       |true     |217.67  |9             |9

In [22]:
# pip install holidays # only need to run once

In [24]:
# create column:isWeekendOrHoliday
import holidays

us_holidays = holidays.US(years=2024)  # Replace with your target year
print(us_holidays.keys()) 

holiday_dates = [d.strftime("%Y-%m-%d") for d in us_holidays]

print(holiday_dates)
    

dict_keys([datetime.date(2024, 1, 1), datetime.date(2024, 5, 27), datetime.date(2024, 6, 19), datetime.date(2024, 7, 4), datetime.date(2024, 9, 2), datetime.date(2024, 11, 11), datetime.date(2024, 11, 28), datetime.date(2024, 12, 25), datetime.date(2024, 1, 15), datetime.date(2024, 2, 19), datetime.date(2024, 10, 14)])
['2024-01-01', '2024-05-27', '2024-06-19', '2024-07-04', '2024-09-02', '2024-11-11', '2024-11-28', '2024-12-25', '2024-01-15', '2024-02-19', '2024-10-14']


In [26]:
# Create isWeekend column (1 if weekend, 0 otherwise)
flight_clean = flight_clean.withColumn(
    "isWeekend",
    F.when(F.dayofweek("flightDate").isin([1, 7]), 1).otherwise(0)
)

# Create isHoliday column (1 if holiday, 0 otherwise)
flight_clean = flight_clean.withColumn(
    "isHoliday",
    F.when(F.col("flightDate").isin(holiday_dates), 1).otherwise(0)
)


In [28]:
# Handle missing totalTravelDistance
median_distance = flight_clean.approxQuantile('totalTravelDistance', [0.5], 0.01)[0]
flight_clean = flight_clean.fillna(median_distance, subset=['totalTravelDistance'])


                                                                                

In [29]:
# extract total travel time (in minutes) from travelDuration, then drop travelDuration

flight_clean = flight_clean.withColumn(
    "hours",
    F.regexp_extract(F.col("travelDuration"), r"(\d+)H", 1).cast("int")
).withColumn(
    "minutes",
    F.regexp_extract(F.col("travelDuration"), r"(\d+)M", 1).cast("int")
)


flight_clean = flight_clean.withColumn(
    "travelMinutes",
    F.col("hours") * 60 + F.col("minutes")
).drop("hours", "minutes")



In [30]:
flight_clean = flight_clean.drop('travelDuration')


In [34]:
flight_clean.show(n=1)

+----------+----------+---------------+------------------+--------------+------------+---------+--------+--------------+-------------------+------------------------+----------------------+-------------------+-------------------------+-------+-----------------+---------+---------+-------------+
|searchDate|flightDate|startingAirport|destinationAirport|isBasicEconomy|isRefundable|isNonStop|baseFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeRaw|segmentsArrivalTimeRaw|segmentsAirlineName|segmentsDurationInSeconds|isCoach|days_until_flight|isWeekend|isHoliday|travelMinutes|
+----------+----------+---------------+------------------+--------------+------------+---------+--------+--------------+-------------------+------------------------+----------------------+-------------------+-------------------------+-------+-----------------+---------+---------+-------------+
|2022-04-16|2022-04-17|            ATL|               BOS|         false|       false|     true|  217.67|          

In [36]:
#use segmentsDurationInSeconds to identify if this is a nonstop flight create a isNonstop column
flight_clean = flight_clean.withColumn(
    "segmentsDurationArray",
    F.split(F.col("segmentsDurationInSeconds"), r"\|\|")
)
flight_clean = flight_clean.withColumn(
    "isNonstop",
    F.when(F.size("segmentsDurationArray") == 1, 1).otherwise(0)
)

flight_clean = flight_clean.drop("segmentsDurationArray","segmentsDurationInSeconds")

In [56]:
flight_clean.show(n=3)



+----------+----------+---------------+------------------+--------------+------------+---------+--------+--------------+-------------------+------------------------+----------------------+-------------------+-------+-----------------+---------+---------+-------------+
|searchDate|flightDate|startingAirport|destinationAirport|isBasicEconomy|isRefundable|isNonstop|baseFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeRaw|segmentsArrivalTimeRaw|segmentsAirlineName|isCoach|days_until_flight|isWeekend|isHoliday|travelMinutes|
+----------+----------+---------------+------------------+--------------+------------+---------+--------+--------------+-------------------+------------------------+----------------------+-------------------+-------+-----------------+---------+---------+-------------+
|2022-04-16|2022-04-17|            ATL|               BOS|         false|       false|        1|  217.67|             9|                947|    2022-04-17T12:57:...|  2022-04-17T15:26:...|     

                                                                                

<h1>separate into train:70%, validation:15%, test:15%</h1>

<h3>note: since this is time series data, split on searchDate</h3>

In [41]:
# Sort by searchDate (ascending)
flight_clean = flight_clean.orderBy("searchDate")

In [102]:
# selected_columns = [
#     "searchDate",
#     "flightDate",
#     "isBasicEconomy",
#     "isRefundable",
#     "isNonstop",
#     "isCoach",
#     "baseFare"]
    
# flight_clean.select(selected_columns).show(n=10)


In [43]:
total_rows = flight_clean.count()

                                                                                

In [132]:
train_size = int(total_rows * 0.7)
remaining = total_rows - train_size
val_size = remaining // 2
test_size = remaining - val_size 
print(train_size, val_size, test_size)
print(total_rows == train_size + val_size + test_size)

57497127 12320813 12320813
True


In [134]:
# Split the data
train_df = flight_clean.limit(train_size)
remaining_df = flight_clean.subtract(train_df)
val_df = remaining_df.limit(val_size)
test_df = remaining_df.subtract(val_df)


In [45]:
# Get sorted searchDates
dates = flight_clean.select("searchDate").distinct().orderBy("searchDate")

# Split dates into train/val/test fractions
total_dates = dates.count()
train_end_idx = int(total_dates * 0.7)
val_end_idx = train_end_idx + int(total_dates * 0.15)

train_dates = dates.limit(train_end_idx).collect()
val_dates = dates.limit(val_end_idx).tail(val_end_idx - train_end_idx)
test_dates = dates.tail(total_dates - val_end_idx)

# Extract date strings
train_cutoff = train_dates[-1]["searchDate"]
val_cutoff = val_dates[-1]["searchDate"]

                                                                                

In [52]:
print(total_dates,train_end_idx,val_end_idx )

171 119 144


In [47]:
train_df = flight_clean.filter(F.col("searchDate") <= train_cutoff)
val_df = flight_clean.filter((F.col("searchDate") > train_cutoff) & (F.col("searchDate") <= val_cutoff))
test_df = flight_clean.filter(F.col("searchDate") > val_cutoff)

In [54]:
train_count = train_df.count()
val_count = val_df.count()
test_count = test_df.count()


                                                                                

In [59]:
print(f"Train: {train_count}, Val: {val_count}, Test: {test_count}")
print(f"Total rows: {train_count + val_count + test_count}")


Train: 57499920, Val: 11660046, Test: 12978787
Total rows: 82138753


In [67]:
val_df.show(n=2)



+----------+----------+---------------+------------------+--------------+------------+---------+--------+--------------+-------------------+------------------------+----------------------+--------------------+-------+-----------------+---------+---------+-------------+
|searchDate|flightDate|startingAirport|destinationAirport|isBasicEconomy|isRefundable|isNonstop|baseFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeRaw|segmentsArrivalTimeRaw| segmentsAirlineName|isCoach|days_until_flight|isWeekend|isHoliday|travelMinutes|
+----------+----------+---------------+------------------+--------------+------------+---------+--------+--------------+-------------------+------------------------+----------------------+--------------------+-------+-----------------+---------+---------+-------------+
|2022-08-15|2022-08-16|            ATL|               BOS|         false|       false|        0|   104.0|             0|               1447|    2022-08-16T10:15:...|  2022-08-16T12:15:...|Sp

                                                                                

In [61]:
# Check temporal order
train_max = train_df.agg(F.max("searchDate")).first()[0]
val_min = val_df.agg(F.min("searchDate")).first()[0]
test_min = test_df.agg(F.min("searchDate")).first()[0]

assert val_min > train_max, "Validation data leaks into training period!"
assert test_min > val_min, "Test data leaks into validation period!"

                                                                                

<h3>save everything into parquet files before encoding</h3>

In [73]:
# train_df.write.parquet("train_preencode.parquet")
# val_df.write.parquet("val_preencode.parquet")
# test_df.write.parquet("test_preencode.parquet")

train_df.write.parquet("train_preencode.parquet", mode="overwrite", compression="snappy")
val_df.write.parquet("val_preencode.parquet", mode="overwrite", compression="snappy")
test_df.write.parquet("test_preencode.parquet", mode="overwrite", compression="snappy")

25/03/23 21:16:33 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/03/23 21:16:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/03/23 21:16:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/03/23 21:16:48 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/03/23 21:16:57 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/03/23 21:18:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/03/23 21:20:26 WARN MemoryManager: Total allocation exceeds 95.00% 

<h1>encoding for training </h1>

In [63]:
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder, MinMaxScaler, StandardScaler
from pyspark.ml import Pipeline


In [None]:

# Binary Encoding for True/False columns
binary_cols = ["isBasicEconomy", "isRefundable", "isNonstop", "isCoach", "isWeekend", "isHoliday"]
for col in binary_cols:
    train_df = train_df.withColumn(col, F.col(col).cast("int"))
    val_df = val_df.withColumn(col, F.col(col).cast("int"))
    test_df = test_df.withColumn(col, F.col(col).cast("int"))


In [None]:
# Multi-hot Encoding for segmentsAirlineName

# One-hot Encoding for Airports

# Binary Encoding for Boolean Columns

# Numerical Feature Scaling

# num_cols = ["seatsRemaining", "totalTravelDistance", "travelMinutes"]

# "baseFare" should be taken out as y

<h2>work still in progress due to a mistake in spliting by search date instead of "segmentsDepartureTimeRaw" </h2>