In [None]:
pip install pyspark



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg,count

In [None]:
# Build the SparkSession for the first part of the analysis
# (getOrCreate reuses an already running session if there is one).
spark = (
    SparkSession.builder
    .appName("Flight Analysis")
    .getOrCreate()
)

In [None]:
# Read the departure-delay CSV; the first line of the file supplies the column names.
# No schema inference is requested here, so the columns are loaded as strings.
flights = spark.read.option("header", "true").csv("departuredelays.csv")

In [None]:
# Create a temporary view of the DataFrame so the SQL queries can refer to it
# by the table name "flights" (an existing view with that name is replaced).
flights.createOrReplaceTempView("flights")

In [None]:
# Analysis with Spark SQL.
# Example 1: the ten origin airports with the highest average delay.
avg_delay_by_origin_query = """SELECT origin, AVG(delay) as avg_delay
FROM flights
GROUP BY origin
ORDER BY avg_delay
DESC LIMIT 10"""
avg_delay_by_origin = spark.sql(avg_delay_by_origin_query)

In [None]:
# Display the ten origins with the highest average delay as a text table.
avg_delay_by_origin.show()

+------+------------------+
|origin|         avg_delay|
+------+------------------+
|   GUM| 33.87777777777778|
|   LSE|26.532467532467532|
|   MQT| 23.87012987012987|
|   EGE| 20.57012542759407|
|   ROA|19.885106382978723|
|   MDW|19.657658556043078|
|   BTV|  18.7246192893401|
|   ORD|18.588917606028524|
|   IAD| 18.40343803056027|
|   SCE| 17.91616766467066|
+------+------------------+



In [None]:
# Example 2: total flights and average delay per calendar day (first 7 days).
# `date` holds a full departure timestamp rather than a day: values look like
# 01010005, presumably MMddHHmm (confirm against the data set's description).
# Grouping by the raw column therefore produced one row per minute with only a
# handful of flights each. Grouping by the leading four characters (MMdd)
# aggregates whole days instead.
# NOTE: no year is visible in these values, so a true day-of-week cannot be
# derived; the result is keyed by calendar day and exposed as column `day`.
flights_by_day=spark.sql("""SELECT SUBSTRING(date, 1, 4) as day,
COUNT(*) as total_flights,
AVG(delay) as avg_delay
FROM flights
GROUP BY SUBSTRING(date, 1, 4)
ORDER BY day
LIMIT 7""")

In [None]:
# Fetch the result rows (the query limits them to 7) to the driver as a list of Row objects.
flights_by_day.collect()

[Row(date='01010005', total_flights=1, avg_delay=-8.0),
 Row(date='01010010', total_flights=1, avg_delay=-6.0),
 Row(date='01010020', total_flights=2, avg_delay=-1.0),
 Row(date='01010023', total_flights=1, avg_delay=14.0),
 Row(date='01010025', total_flights=2, avg_delay=15.0),
 Row(date='01010029', total_flights=1, avg_delay=49.0),
 Row(date='01010030', total_flights=3, avg_delay=-5.666666666666667)]

In [None]:
# Render the same result as a formatted text table.
flights_by_day.show()

+--------+-------------+------------------+
|    date|total_flights|         avg_delay|
+--------+-------------+------------------+
|01010005|            1|              -8.0|
|01010010|            1|              -6.0|
|01010020|            2|              -1.0|
|01010023|            1|              14.0|
|01010025|            2|              15.0|
|01010029|            1|              49.0|
|01010030|            3|-5.666666666666667|
+--------+-------------+------------------+



In [None]:
# Example 3: the five routes (origin/destination pairs) with the highest
# summed delay, together with the number of flights on each route.
top_delayed_routes_query = """SELECT origin,destination,SUM(delay) as total_delay,
COUNT(*) as flight_count
FROM flights
GROUP BY origin,destination
ORDER BY total_delay DESC
LIMIT 5"""
top_delayed_routes = spark.sql(top_delayed_routes_query)

In [None]:
# Display the five routes with the highest total delay as a text table.
top_delayed_routes.show()

+------+-----------+-----------+------------+
|origin|destination|total_delay|flight_count|
+------+-----------+-----------+------------+
|   LAX|        SFO|    51844.0|        3198|
|   ORD|        SFO|    41653.0|        1731|
|   SFO|        LAX|    40798.0|        3232|
|   LGA|        ATL|    35761.0|        2500|
|   JFK|        LAX|    35755.0|        2720|
+------+-----------+-----------+------------+



In [None]:
# Print every result table of the first part under its own heading.
for heading, result in (
    ("Top 10 origins by average delay:", avg_delay_by_origin),
    ("Flights and average delay by day(first week):", flights_by_day),
    ("Top 5 routes with highest total delay:", top_delayed_routes),
):
    print(heading)
    result.show()

Top 10 origins by average delay:
+------+------------------+
|origin|         avg_delay|
+------+------------------+
|   GUM| 33.87777777777778|
|   LSE|26.532467532467532|
|   MQT| 23.87012987012987|
|   EGE| 20.57012542759407|
|   ROA|19.885106382978723|
|   MDW|19.657658556043078|
|   BTV|  18.7246192893401|
|   ORD|18.588917606028524|
|   IAD| 18.40343803056027|
|   SCE| 17.91616766467066|
+------+------------------+

Flights and average delay by day(first week):
+--------+-------------+------------------+
|    date|total_flights|         avg_delay|
+--------+-------------+------------------+
|01010005|            1|              -8.0|
|01010010|            1|              -6.0|
|01010020|            2|              -1.0|
|01010023|            1|              14.0|
|01010025|            2|              15.0|
|01010029|            1|              49.0|
|01010030|            3|-5.666666666666667|
+--------+-------------+------------------+

Top 5 routes with highest total delay:
+---

In [None]:
# Stop the Spark session used by the first part of the analysis;
# a new session is created below for the extended analysis.
spark.stop()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,sum,avg,count,hour,month

In [None]:
# Build the SparkSession for the extended analysis.
spark = (
    SparkSession.builder
    .appName("Extended flights analysis")
    .getOrCreate()
)

In [None]:
# Reload the CSV, this time letting Spark infer the column types from the data.
# NOTE(review): with inferSchema the zero-padded `date` values (e.g. 01010005)
# are presumably parsed as integers, which drops the leading zero - confirm
# with flights.printSchema() before slicing `date` by character position.
flights = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("departuredelays.csv")
)

In [None]:
# Register the reloaded DataFrame as the temporary view "flights" so the
# SQL queries below can refer to it by name (an existing view is replaced).
flights.createOrReplaceTempView("flights")

In [None]:
# Example 4: the ten origin airports with the highest average delay.
avg_delay_by_origin_query = """SELECT origin,AVG(delay) as avg_delay
FROM flights
GROUP BY origin
ORDER BY avg_delay DESC
LIMIT 10"""
avg_delay_by_origin = spark.sql(avg_delay_by_origin_query)

In [None]:
# Example 5: the five routes (origin/destination pairs) with the most flights.
busiest_routes_query = """SELECT origin,destination,COUNT(*) as flight_count
FROM flights
GROUP BY origin,destination
ORDER BY flight_count DESC
LIMIT 5"""
busiest_routes = spark.sql(busiest_routes_query)

In [None]:
# Example 6: monthly flight trends (flight count and average delay per month).
# `date` encodes the departure time as MMddHHmm (e.g. 01010005 - presumably
# month, day, hour, minute; confirm against the data set's description).
# Because the file is loaded with inferSchema, the value is presumably read as
# an integer and loses its leading zero, so the text is first left-padded back
# to 8 characters. The month is then characters 1-2; the previous slice at
# position 5 picked up time-of-day digits and produced a bogus month "00".
monthly_trends=spark.sql("""SELECT SUBSTRING(LPAD(CAST(date AS STRING),8,'0'),1,2) as month,
COUNT(*) as total_flights,
AVG(delay) as avg_delay
FROM flights
GROUP BY SUBSTRING(LPAD(CAST(date AS STRING),8,'0'),1,2)
ORDER BY month""")

In [None]:
# Example 7: the ten origins with the highest share of delayed flights.
# A flight counts as delayed when its delay is greater than zero; the share is
# expressed as a percentage of all flights leaving that origin.
delayed_percentage_query = """SELECT origin,COUNT(*) as total_flights,
SUM(CASE WHEN delay > 0 THEN 1 ELSE 0 END) as delayed_flights,
(SUM(CASE WHEN delay >0 THEN 1 ELSE 0 END)*100.0/ COUNT(*)) as delayed_percentage
FROM flights
GROUP BY origin
ORDER BY delayed_percentage DESC
LIMIT 10"""
delayed_percentage = spark.sql(delayed_percentage_query)

In [None]:
# Example 8: average delay by hour of day.
# `date` encodes the departure time as MMddHHmm (e.g. 01010005 - presumably
# month, day, hour, minute; confirm against the data set's description).
# The value is left-padded to 8 characters because loading with inferSchema
# presumably turns it into an integer without the leading zero. The hour is
# then characters 5-6. The previous slice started at position 10, past the
# end of the value, so it could never yield an hour.
# Grouping uses the same INT expression that is selected, so each hour
# appears exactly once in the result.
delay_by_hour=spark.sql("""SELECT CAST(SUBSTRING(LPAD(CAST(date AS STRING),8,'0'),5,2) AS INT) as hour,
AVG(delay) as avg_delay
FROM flights
GROUP BY CAST(SUBSTRING(LPAD(CAST(date AS STRING),8,'0'),5,2) AS INT)
ORDER BY hour""")

In [None]:
# Print every result table of the extended analysis under its own heading.
for heading, result in (
    ("Top 10 origins by average delay:", avg_delay_by_origin),
    ("Top 5 busiest routes:", busiest_routes),
    ("Monthly flight trends:", monthly_trends),
    ("Top 10 origins by delayed percentage:", delayed_percentage),
    ("Average delay by hour of day:", delay_by_hour),
):
    print(heading)
    result.show()

Top 10 origins by average delay:
+------+------------------+
|origin|         avg_delay|
+------+------------------+
|   GUM| 33.87777777777778|
|   LSE|26.532467532467532|
|   MQT| 23.87012987012987|
|   EGE| 20.57012542759407|
|   ROA|19.885106382978723|
|   MDW|19.657658556043078|
|   BTV|  18.7246192893401|
|   ORD|18.588917606028524|
|   IAD| 18.40343803056027|
|   SCE| 17.91616766467066|
+------+------------------+

Top 5 busiest routes:
+------+-----------+------------+
|origin|destination|flight_count|
+------+-----------+------------+
|   SFO|        LAX|        3232|
|   LAX|        SFO|        3198|
|   LAS|        LAX|        3016|
|   LAX|        LAS|        2964|
|   JFK|        LAX|        2720|
+------+-----------+------------+

Monthly flight trends:
+-----+-------------+------------------+
|month|total_flights|         avg_delay|
+-----+-------------+------------------+
|   00|        30649|11.788965382231067|
|   01|        25780|11.812063615205586|
|   02|        22

In [None]:
# Stop the Spark session now that the extended analysis has been displayed.
spark.stop()