In [None]:
!pip install pyspark

import pyspark 
from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql.types import *

import pandas as pd

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=03fbc593ce5ed28e185bbf23fdf40a52196ba210f23fd45a3ce37f85c7290bb6
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
from google.colab import drive
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session backing this notebook.
session_builder = SparkSession.builder.appName("FlightPricesEDA")
spark = session_builder.getOrCreate()

# Expose Google Drive to the Colab runtime so the CSV below is reachable.
drive.mount('/content/drive/')

# Read the itineraries CSV: the first row supplies the column names and
# inferSchema=True asks Spark to derive the column types from the data.
itineraries_csv = "/content/drive/My Drive/Colab Notebooks/itineraries.csv"
df = spark.read.csv(itineraries_csv, header=True, inferSchema=True)

Mounted at /content/drive/


In [None]:
# Preview the first 20 rows; cell values longer than 20 characters are truncated.
df.show(n=20, truncate=True)

+--------------------+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+--------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+
|               legId|searchDate|flightDate|startingAirport|destinationAirport|fareBasisCode|travelDuration|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeEpochSeconds|segmentsDepartureTimeRaw|segmentsArrivalTimeEpochSeconds|segmentsArrivalTimeRaw|segmentsArrivalAirportCode|segmentsDepartureAirportCode| segmentsAirlineName|segmentsAirlineCode|segmentsEquipmentDescription|segmentsDurationInSeconds|segments

In [None]:
# Dataset shape: the row count is a Spark action (a full pass over the data),
# the column count is read from the schema.
row_total = df.count()
column_total = len(df.columns)

print("Number of Rows:", row_total)
print("Number of Columns:", column_total)

Number of Rows: 82138753
Number of Columns: 27


In [None]:
# Count the null entries in every column.

from pyspark.sql.functions import col, count, avg

import pyspark.sql.functions as F

print("Missing values:")

# F.when(...) without an otherwise() yields null for rows where the column is
# present, and count() skips nulls, so each aggregate is the number of null rows.
null_counts = []
for column_name in df.columns:
    null_marker = F.when(col(column_name).isNull(), column_name)
    null_counts.append(count(null_marker).alias(column_name))

df.select(null_counts).show()

Missing values:
+-----+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+-------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+
|legId|searchDate|flightDate|startingAirport|destinationAirport|fareBasisCode|travelDuration|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeEpochSeconds|segmentsDepartureTimeRaw|segmentsArrivalTimeEpochSeconds|segmentsArrivalTimeRaw|segmentsArrivalAirportCode|segmentsDepartureAirportCode|segmentsAirlineName|segmentsAirlineCode|segmentsEquipmentDescription|segmentsDurationInSeconds|segmentsDistance|segment

In [None]:
# Build the de-duplicated frame once, report how many rows it removes, then
# continue the analysis on it.
deduplicated = df.dropDuplicates()
duplicate_total = df.count() - deduplicated.count()
print("Number of Duplicates:", duplicate_total)

# From here on `df` refers to the de-duplicated data.
df = deduplicated

Number of Duplicates: 0


In [None]:
# Preview the first 20 rows of the de-duplicated frame (long values truncated).
df.show(n=20, truncate=True)

In [None]:
# Summary statistics of the numerical variables.
#
# travelDuration is stored as an ISO-8601 duration string (the recorded min/max
# were "P1D" and "PT9H9M"), so describe() on the raw column could only report
# null for its mean and stddev. It is converted to minutes here so that it is
# summarised numerically like the other columns.
# regexp_extract returns "" when a unit is absent; casting "" to int gives null,
# which coalesce turns into 0 for that unit.
# NOTE(review): only day/hour/minute parts are parsed - confirm no value carries seconds.
duration_days = F.coalesce(F.regexp_extract("travelDuration", r"(\d+)D", 1).cast("int"), F.lit(0))
duration_hours = F.coalesce(F.regexp_extract("travelDuration", r"(\d+)H", 1).cast("int"), F.lit(0))
duration_mins = F.coalesce(F.regexp_extract("travelDuration", r"(\d+)M", 1).cast("int"), F.lit(0))

# Keep nulls as nulls instead of reporting them as a 0-minute trip.
travel_duration_minutes = F.when(
    F.col("travelDuration").isNotNull(),
    duration_days * 1440 + duration_hours * 60 + duration_mins,
)

(
    df.withColumn("travelDurationMinutes", travel_duration_minutes)
    .describe(["baseFare", "totalFare", "travelDurationMinutes", "totalTravelDistance"])
    .show()
)

+-------+-----------------+------------------+--------------+-------------------+
|summary|         baseFare|         totalFare|travelDuration|totalTravelDistance|
+-------+-----------------+------------------+--------------+-------------------+
|  count|         82138753|          82138753|      82138753|           76044221|
|   mean|292.6628527106287|  340.386849139608|          null| 1609.9033457519408|
| stddev|183.1887833739836|196.02950991170584|          null|   857.325734547569|
|    min|             0.01|             19.59|           P1D|                 89|
|    max|          7662.33|           8260.61|        PT9H9M|               7252|
+-------+-----------------+------------------+--------------+-------------------+



In [None]:
# Frequency table for each categorical variable, most common value first.
#
# The column names are spelled exactly as in the schema: the previous
# "StartingAirport" only resolved because Spark's analyzer is case-insensitive
# by default, and would fail with spark.sql.caseSensitive=true.
categorical_columns = [
    "startingAirport",
    "destinationAirport",
    "segmentsAirlineName",
    "segmentsAirlineCode",
    "segmentsCabinCode",
]

for column_name in categorical_columns:
    (
        df.groupBy(column_name)
        .agg(count("*").alias("count"))
        .orderBy(col("count").desc())
        .show()
    )

+---------------+-------+
|StartingAirport|  count|
+---------------+-------+
|            LAX|8073281|
|            LGA|5919323|
|            BOS|5883876|
|            SFO|5706482|
|            DFW|5674959|
|            ORD|5503476|
|            CLT|5494510|
|            ATL|5312028|
|            MIA|4930213|
|            PHL|4726187|
|            DEN|4697143|
|            DTW|4547052|
|            JFK|4425164|
|            EWR|3970797|
|            OAK|3809884|
|            IAD|3464378|
+---------------+-------+

+------------------+-------+
|destinationAirport|  count|
+------------------+-------+
|               LAX|8006721|
|               LGA|6093450|
|               DFW|5957280|
|               BOS|5801538|
|               ORD|5717699|
|               SFO|5586204|
|               CLT|5411448|
|               ATL|5211169|
|               MIA|5103390|
|               PHL|4703822|
|               DEN|4630696|
|               DTW|4456269|
|               JFK|4413765|
|              

In [None]:
# Mean base fare and mean total fare per segmentsAirlineName value, listed from
# the highest average base fare downwards.
fare_averages = [
    avg("baseFare").alias("avgBaseFare"),
    avg("totalFare").alias("avgTotalFare"),
]
fares_by_airline = df.groupBy("segmentsAirlineName").agg(*fare_averages)

fares_by_airline.orderBy(col("avgBaseFare").desc()).show()


+--------------------+------------------+------------------+
| segmentsAirlineName|       avgBaseFare|      avgTotalFare|
+--------------------+------------------+------------------+
|JetBlue Airways||...| 3174.415384615385| 3431.598461538462|
|American Airlines...|3172.4033505154616| 3429.437654639175|
|United||Cape Air|...|1880.2847222222215|2047.5519444444444|
|Cape Air||Delta||...|1781.3014285714285|            1934.0|
|Cape Air||Cape Ai...|1652.6216666666667|           1795.67|
|Cape Air||Cape Ai...| 833.7541747572814| 915.3861165048544|
|Hawaiian Airlines...|  829.431111111111|  881.312222222222|
|Cape Air||Cape Ai...|            825.86|             906.9|
|Delta||United||Ca...|            825.03|            906.01|
|Delta||United||Un...|   801.92698630137| 902.3435616438354|
|Delta||Cape Air||...| 786.3267800000001| 870.0805000000008|
|JetBlue Airways||...| 785.3941732283463| 863.3644094488186|
|Delta||United||Un...|           779.065|           875.695|
|Alaska Airlines||...| 7

In [None]:
# Number of rows for every (startingAirport, destinationAirport) pair, busiest
# pair first.
route_key = ["startingAirport", "destinationAirport"]
route_counts = df.groupBy(*route_key).agg(count("*").alias("count"))

route_counts.orderBy(col("count").desc()).show()

+---------------+------------------+------+
|startingAirport|destinationAirport| count|
+---------------+------------------+------+
|            ATL|               LAX|709809|
|            LAX|               BOS|679169|
|            LGA|               LAX|677713|
|            LAX|               ATL|669609|
|            LAX|               LGA|663659|
|            BOS|               LAX|644390|
|            LAX|               JFK|625496|
|            LAX|               ORD|620576|
|            DFW|               LAX|612390|
|            LAX|               DFW|610669|
|            JFK|               LAX|605017|
|            LAX|               DTW|601537|
|            ORD|               LAX|597847|
|            LAX|               EWR|587270|
|            DTW|               LAX|582022|
|            CLT|               LAX|572097|
|            JFK|               ORD|557152|
|            LAX|               CLT|554474|
|            LGA|               ORD|550319|
|            LAX|               

In [None]:
# Share of rows (in percent) for each value of the basic-economy, refundable
# and non-stop flags.
#
# df.count() is an eager Spark action (a full pass over the data), so it is
# evaluated once here instead of once per flag.
# NOTE(review): the recorded run of this cell ended in a NameError; it relies on
# `df`, `col` and `count` from earlier cells - confirm those were executed first.
total_rows = df.count()

for flag_column in ["isBasicEconomy", "isRefundable", "isNonStop"]:
    (
        df.groupBy(flag_column)
        .agg(count("*").alias("count"))
        .withColumn("percentage", col("count") / total_rows * 100)
        .show()
    )

NameError: ignored

In [None]:
import pyspark.pandas as ps
import seaborn as sns
import matplotlib.pyplot as plt

# Collect a random sample of the fare columns into a plain pandas DataFrame.
# Calling toPandas() on the full frame pulls all ~82 million rows onto the
# driver (the recorded run of this cell failed with a Py4JJavaError); a 0.1%
# sample (~82k rows) is ample for a 30-bin histogram. The fixed seed keeps the
# sample reproducible. seaborn consumes pandas objects directly, so no
# pandas-on-Spark wrapper is needed.
pdf = (
    df.select("baseFare", "totalFare")
    .sample(fraction=0.001, seed=1234)
    .toPandas()
)

# Plotting histograms of baseFare and totalFare
fig, axs = plt.subplots(ncols=2, figsize=(10,5))
sns.histplot(pdf["baseFare"], bins=30, ax=axs[0])
sns.histplot(pdf["totalFare"], bins=30, ax=axs[1])
axs[0].set_title("Base fare (0.1% sample)")
axs[1].set_title("Total fare (0.1% sample)")

plt.show()



Py4JJavaError: ignored

In [None]:
# Plotting histograms of travelDuration and totalTravelDistance
#
# The `pdf` built for the fare histograms only holds baseFare/totalFare, so
# this cell collects the columns it needs itself (a reproducible 0.1% sample,
# to keep the driver-side frame small).
# travelDuration is an ISO-8601 duration string (e.g. "PT9H9M"), so it is
# converted to minutes before plotting. regexp_extract returns "" for an absent
# unit; casting "" to int gives null, which coalesce turns into 0.
duration_days = F.coalesce(F.regexp_extract("travelDuration", r"(\d+)D", 1).cast("int"), F.lit(0))
duration_hours = F.coalesce(F.regexp_extract("travelDuration", r"(\d+)H", 1).cast("int"), F.lit(0))
duration_mins = F.coalesce(F.regexp_extract("travelDuration", r"(\d+)M", 1).cast("int"), F.lit(0))
travel_duration_minutes = F.when(
    F.col("travelDuration").isNotNull(),
    duration_days * 1440 + duration_hours * 60 + duration_mins,
)

duration_distance_pdf = (
    df.select(
        travel_duration_minutes.alias("travelDurationMinutes"),
        "totalTravelDistance",
    )
    .sample(fraction=0.001, seed=1234)
    .toPandas()
)

fig, axs = plt.subplots(ncols=2, figsize=(10,5))
sns.histplot(duration_distance_pdf["travelDurationMinutes"], bins=30, ax=axs[0])
sns.histplot(duration_distance_pdf["totalTravelDistance"], bins=30, ax=axs[1])
axs[0].set_title("Travel duration in minutes (0.1% sample)")
axs[1].set_title("Total travel distance (0.1% sample)")

plt.show()

In [None]:
# Plotting a boxplot of seatsRemaining
#
# seatsRemaining is not part of the `pdf` built for the fare histograms, so the
# column is collected here as a reproducible 0.1% sample. A single axis is
# created because only one plot is drawn (the second panel used to stay empty).
seats_pdf = (
    df.select("seatsRemaining")
    .sample(fraction=0.001, seed=1234)
    .toPandas()
)

fig, ax = plt.subplots(figsize=(10,5))
sns.boxplot(x=seats_pdf["seatsRemaining"], ax=ax)
ax.set_title("Seats remaining (0.1% sample)")
plt.show()

In [None]:
# Box plot of baseFare for the most frequent segmentsAirlineName values.
#
# The previous version ran one Spark filter + collect per airline value and
# pulled every fare onto the driver. Here a reproducible 0.1% sample is
# collected once and grouped in pandas.
pdf = (
    df.select("segmentsAirlineName", "baseFare")
    .sample(fraction=0.001, seed=1234)
    .toPandas()
    .dropna()
)

# segmentsAirlineName holds "||"-joined multi-segment combinations (see the
# average-fare table), so the number of distinct values is large; only the
# most frequent ones are drawn to keep the figure readable.
MAX_AIRLINES = 15
airlines = pdf["segmentsAirlineName"].value_counts().head(MAX_AIRLINES).index.to_list()

fares_by_airline = pdf.groupby("segmentsAirlineName")["baseFare"]
airline_data = [fares_by_airline.get_group(a).to_list() for a in airlines]

fig, ax = plt.subplots(figsize=(12, 6))
ax.boxplot(airline_data)
# Tick labels are set separately so the long airline names can be rotated.
ax.set_xticklabels(airlines, rotation=90)
ax.set_xlabel("Airline")
ax.set_ylabel("Base Fare")
plt.show()

In [None]:
# Histogram of travelDuration.
#
# travelDuration is an ISO-8601 duration string (e.g. "P1D", "PT9H9M"), which
# cannot be binned as-is, so it is converted to minutes in Spark first.
# regexp_extract returns "" for an absent unit; casting "" to int gives null,
# which coalesce turns into 0.
duration_days = F.coalesce(F.regexp_extract("travelDuration", r"(\d+)D", 1).cast("int"), F.lit(0))
duration_hours = F.coalesce(F.regexp_extract("travelDuration", r"(\d+)H", 1).cast("int"), F.lit(0))
duration_mins = F.coalesce(F.regexp_extract("travelDuration", r"(\d+)M", 1).cast("int"), F.lit(0))
travel_duration_minutes = F.when(
    F.col("travelDuration").isNotNull(),
    duration_days * 1440 + duration_hours * 60 + duration_mins,
)

# Collect a reproducible 0.1% sample as a pandas DataFrame; collecting the full
# column would move ~82 million values onto the driver.
pdf = (
    df.select(travel_duration_minutes.alias("travelDurationMinutes"))
    .sample(fraction=0.001, seed=1234)
    .toPandas()
)

plt.hist(pdf["travelDurationMinutes"].dropna(), bins=20)
plt.xlabel("Travel Duration (minutes)")
plt.ylabel("Count")
plt.show()

In [None]:
fig, ax = plt.subplots(figsize=(8,6))

# Collect a reproducible 0.1% sample as a pandas DataFrame. A scatter plot of
# all ~82 million rows would require collecting the whole dataset onto the
# driver and would not be readable anyway.
pdf = (
    df.select("baseFare", "totalFare")
    .sample(fraction=0.001, seed=1234)
    .toPandas()
)

# Plotting a scatter plot between baseFare and totalFare
ax.scatter(pdf["baseFare"], pdf["totalFare"])
ax.set_xlabel("Base Fare")
ax.set_ylabel("Total Fare")
ax.set_title("Base fare vs. total fare (0.1% sample)")

plt.show()

In [None]:
# Distinct (searchDate, flightDate) combinations present in the data.
#
# Only the distinct pairs are collected: repeated pairs draw the same point, and
# collecting every row would move the whole dataset onto the driver. Both
# columns are kept as real dates (to_date) rather than formatted strings so
# matplotlib uses a chronological date axis instead of a categorical one.
date_df = (
    df.select(
        F.to_date("searchDate").alias("searchDate"),
        F.to_date("flightDate").alias("flightDate"),
    )
    .dropna()
    .distinct()
    .orderBy("searchDate", "flightDate")
)

# converting dataframe to pandas dataframe for plotting
date_pandas = date_df.toPandas()

# Markers only: the rows have no inherent order, so a connecting line between
# consecutive rows would not carry any meaning.
plt.plot(date_pandas['searchDate'], date_pandas['flightDate'], linestyle='none', marker='.')
plt.xticks(rotation=45)
plt.xlabel('Search Date')
plt.ylabel('Flight Date')
plt.title('Search Date vs. Flight Date')
plt.show()

In [None]:
!pip install pyspark_dist_explore

import pyspark_dist_explore as pde

# Selecting the relevant columns for correlation analysis
cor_df = df.select("travelDuration", "elapsedDays", "isBasicEconomy", "isRefundable", "isNonStop", "baseFare", "totalFare", "seatsRemaining", "totalTravelDistance", "segmentsDurationInSeconds", "segmentsDistance")

# Calculating the correlation matrix
corr_matrix = cor_df.toPandas().corr()

# Plotting the heatmap
fig, ax = pde.plot_corr(corr_matrix, figsize=(10,10))

In [None]:
# Creating a feature vector from the numeric columns
from pyspark.ml.feature import VectorAssembler

# The label (totalFare) must not be one of the features: with it in the vector
# the models could simply copy the answer, and the reported RMSE would say
# nothing about predictive quality. The features are limited to columns the
# summary statistics show to be numeric.
feature_cols = ["baseFare", "totalTravelDistance"]

# totalTravelDistance has missing values (its describe() count is lower than
# the row count) and VectorAssembler raises on nulls by default, so rows with a
# null feature are skipped.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="skip")
data = assembler.transform(df).select("features", "totalFare").cache()

# Splitting the data into training and testing sets (70/30, fixed seed)
train_data, test_data = data.randomSplit([0.7, 0.3], seed=1234)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StandardScaler
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.regression import (
    DecisionTreeRegressor,
    GBTRegressor,
    GeneralizedLinearRegression,
    IsotonicRegression,
    LinearRegression,
    RandomForestRegressor,
)
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Standardizing the features.
# The scaler is fitted on the training data only and the same fitted model is
# applied to both splits. Fitting a second scaler on the test set would put the
# test features on a different scale from the one the models were trained on
# and would leak test-set statistics into the evaluation.
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=True)
scaler_model = scaler.fit(train_data)
scaled_train_data = scaler_model.transform(train_data)
scaled_test_data = scaler_model.transform(test_data)

# Linear Regression (elastic-net regularised)
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="totalFare", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Decision Tree Regression
dt = DecisionTreeRegressor(featuresCol="scaledFeatures", labelCol="totalFare", maxDepth=5)

# Fitting the models on the scaled training split
lr_model = lr.fit(scaled_train_data)
dt_model = dt.fit(scaled_train_data)

# Evaluate the models on the test set using RMSE against totalFare
lr_predictions = lr_model.transform(scaled_test_data)
lr_evaluator = RegressionEvaluator(labelCol="totalFare", predictionCol="prediction", metricName="rmse")
lr_rmse = lr_evaluator.evaluate(lr_predictions)

dt_predictions = dt_model.transform(scaled_test_data)
dt_evaluator = RegressionEvaluator(labelCol="totalFare", predictionCol="prediction", metricName="rmse")
dt_rmse = dt_evaluator.evaluate(dt_predictions)

# New Section

In [None]:
# Report the RMSE obtained by the linear regression model.
lr_rmse_label = "Linear Regression RMSE:"
print(lr_rmse_label, lr_rmse)

In [None]:
# Report the RMSE obtained by the decision tree regression model.
dt_rmse_label = "Decision Tree Regression RMSE:"
print(dt_rmse_label, dt_rmse)