In [11]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("AirlineDataAnalysis") \
    .getOrCreate()

print("Spark initialized successfully!")


Spark initialized successfully!


In [13]:
# Paths to the datasets
file_paths = ["/Users/apple/Documents/CloudTech/Cloud Assignment/datasets/2015.csv", 
              "/Users/apple/Documents/CloudTech/Cloud Assignment/datasets/2016.csv", 
              "/Users/apple/Documents/CloudTech/Cloud Assignment/datasets/2015.csv"]

def load_data(file_path):
    return spark.read.csv(file_path, header=True, inferSchema=True)

#Combine all datasets into one DataFrame
dfs = [load_data(file) for file in file_paths if load_data(file) is not None]
data = dfs[0].union(dfs[1]).union(dfs[2]) if len(dfs) > 1 else dfs[0]

if data is None:
    raise Exception("No valid data loaded!")

print("Data loaded successfully!")

# Standardize column names (lowercase, trimmed)
data = data.toDF(*[col_name.strip().lower() for col_name in data.columns])

# Show schema and initial rows to verify
data.printSchema()
data.show(5)

                                                                                

Data loaded successfully!
root
 |-- fl_date: date (nullable = true)
 |-- op_carrier: string (nullable = true)
 |-- op_carrier_fl_num: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- crs_dep_time: integer (nullable = true)
 |-- dep_time: double (nullable = true)
 |-- dep_delay: double (nullable = true)
 |-- taxi_out: double (nullable = true)
 |-- wheels_off: double (nullable = true)
 |-- wheels_on: double (nullable = true)
 |-- taxi_in: double (nullable = true)
 |-- crs_arr_time: integer (nullable = true)
 |-- arr_time: double (nullable = true)
 |-- arr_delay: double (nullable = true)
 |-- cancelled: double (nullable = true)
 |-- cancellation_code: string (nullable = true)
 |-- diverted: double (nullable = true)
 |-- crs_elapsed_time: double (nullable = true)
 |-- actual_elapsed_time: double (nullable = true)
 |-- air_time: double (nullable = true)
 |-- distance: double (nullable = true)
 |-- carrier_delay: double (nullable = true

24/12/20 11:15:24 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+-----------+
|   fl_date|op_carrier|op_carrier_fl_num|origin|dest|crs_dep_time|dep_time|dep_delay|taxi_out|wheels_off|wheels_on|taxi_in|crs_arr_time|arr_time|arr_delay|cancelled|cancellation_code|diverted|crs_elapsed_time|actual_elapsed_time|air_time|distance|carrier_delay|weather_delay|nas_delay|security_delay|late_aircraft_delay|unnamed: 27|
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+-----------+
|

In [15]:
from pyspark.sql.functions import *

In [17]:
# Step 1: Data Cleaning
# Select relevant columns for the analysis

columns_needed = ["origin", "dest", "fl_date"]
missing_columns = [col for col in columns_needed if col not in data.columns]
if missing_columns:
    raise Exception(f"Missing required columns: {missing_columns}")

# Select and clean the relevant columns
data_cleaned = data.select(*columns_needed).dropna()

# Trim whitespace and standardize case for origin and destination columns
data_cleaned = data_cleaned.withColumn("origin", trim(lower(col("origin")))) \
                           .withColumn("dest", trim(lower(col("dest"))))

print("Data cleaned successfully!")
data_cleaned.show(5)

Data cleaned successfully!
+------+----+----------+
|origin|dest|   fl_date|
+------+----+----------+
|   mco| fll|2015-01-01|
|   lga| fll|2015-01-01|
|   fll| mco|2015-01-01|
|   iah| las|2015-01-01|
|   iah| ord|2015-01-01|
+------+----+----------+
only showing top 5 rows



In [19]:
# Step 2: Create Route Column
# Combine origin and destination into a single Route column
data_routes = data_cleaned.withColumn("route", concat_ws("-", col("origin"), col("dest")))

In [21]:
# Step 3: Analyze Top 5 Routes
# Count flights per route
route_counts = data_routes.groupBy("route").agg(count("*").alias("flight_count")).orderBy(desc("flight_count"))

In [23]:
# Show top 5 routes
top_routes = route_counts.limit(5)
print("Top 5 Routes by Flight Count:")
top_routes.show()

Top 5 Routes by Flight Count:




+-------+------------+
|  route|flight_count|
+-------+------------+
|sfo-lax|       47915|
|lax-sfo|       46915|
|jfk-lax|       38789|
|lax-jfk|       38778|
|las-lax|       32795|
+-------+------------+



                                                                                

In [73]:
# Step 4: Additional Analysis - Monthly Flight Trends
# Extract year and month from fl_date
data_routes = data_routes.withColumn("year", year(col("fl_date"))) \
                         .withColumn("month", month(col("fl_date")))

In [75]:
# Group by year, month, and route
monthly_trends = data_routes.groupBy("year", "month", "route").agg(count("*").alias("monthly_flight_count"))
print("Monthly flight trends:")
monthly_trends.show(10)

Monthly flight trends:




+----+-----+-------+--------------------+
|year|month|  route|monthly_flight_count|
+----+-----+-------+--------------------+
|2015|    1|cvg-ord|                 328|
|2015|    1|dsm-ord|                 303|
|2015|    1|sea-fat|                  64|
|2015|    1|bzn-sfo|                   5|
|2015|    1|hnl-ewr|                  31|
|2015|    1|phl-sfo|                 178|
|2015|    1|sfo-fll|                 136|
|2015|    1|fll-stl|                  67|
|2015|    1|hou-clt|                  31|
|2015|    1|mci-lax|                  63|
+----+-----+-------+--------------------+
only showing top 10 rows



                                                                                

In [79]:
import shutil
import os

# Local paths for saving results
local_top_routes_path = "/Users/apple/Documents/CloudTech/Cloud Assignment/top_routes"
local_monthly_trends_path = "/Users/apple/Documents/CloudTech/Cloud Assignment/monthly_trends"

# Delete existing directories if they exist
if os.path.exists(local_top_routes_path):
    shutil.rmtree(local_top_routes_path)

if os.path.exists(local_monthly_trends_path):
    shutil.rmtree(local_monthly_trends_path)

# Save the top 5 routes and monthly trends to local paths
try:
    top_routes.write.csv(local_top_routes_path, header=True, mode="overwrite")
    monthly_trends.write.csv(local_monthly_trends_path, header=True, mode="overwrite")
    print(f"Analysis results saved successfully at:\n{local_top_routes_path}\n{local_monthly_trends_path}")
except Exception as e:
    print(f"Error saving results: {e}")




Analysis results saved successfully at:
/Users/apple/Documents/CloudTech/Cloud Assignment/top_routes
/Users/apple/Documents/CloudTech/Cloud Assignment/monthly_trends


                                                                                