In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.driver.memory", "10g").appName("Parquet Reader").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/21 16:48:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/21 16:48:27 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# Initialize the airline variable globally
airline = None

def read_parquet_and_print_top(parquet_path):
    global airline
    # Read Parquet files into a DataFrame
    airline = spark.read.parquet(parquet_path)

# Initialize Spark session and read Parquet file
parquet_path = "./airline.parquet"
read_parquet_and_print_top(parquet_path)

In [None]:
# get cols 
airline.printSchema()

In [None]:
flights_month = {}

months = airline.select('Month').distinct().collect()
months = [row.Month for row in months]
# assign total and delayed for each month
for month in months:
    total = airline.filter(airline.Month == month).count()
    delayed = airline.filter(airline.Month == month).filter(airline.ArrDelay > 0).count()
    flights_month[month] = {'total': total, 'delayed': delayed}
print(flights_month)

In [None]:
airline.columns

In [None]:
# records where delay is more than 15
delayed_flights = airline.filter((airline.ArrDelay > 15) & (airline.Distance > 2000)).count()
delayed_flights

In [5]:
airline.head(5)

24/03/21 16:50:16 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[Row(Year=2019, Quarter=2, Month=4, DayofMonth=30, DayOfWeek=2, FlightDate=datetime.date(2019, 4, 30), Reporting_Airline='AA', DOT_ID_Reporting_Airline=19805, IATA_CODE_Reporting_Airline='AA', Tail_Number='N733UW', Flight_Number_Reporting_Airline=2119, OriginAirportID=10721, OriginAirportSeqID=1072102, OriginCityMarketID=30721, Origin='BOS', OriginCityName='Boston, MA', OriginState='MA', OriginStateFips=25, OriginStateName='Massachusetts', OriginWac=13, DestAirportID=11278, DestAirportSeqID=1127805, DestCityMarketID=30852, Dest='DCA', DestCityName='Washington, DC', DestState='VA', DestStateFips=51, DestStateName='Virginia', DestWac=38, CRSDepTime=1600, DepTime=1556, DepDelay=-4.0, DepDelayMinutes=0.0, DepDel15=0.0, DepartureDelayGroups=-1, DepTimeBlk='1600-1659', TaxiOut=15.0, WheelsOff='1611', WheelsOn='1723', TaxiIn=3.0, CRSArrTime=1745, ArrTime=1726, ArrDelay=-19.0, ArrDelayMinutes=0.0, ArrDel15=0.0, ArrivalDelayGroups=-2, ArrTimeBlk='1700-1759', Cancelled=0.0, CancellationCode=None

In [None]:
def get_top_airports(airline, years):
    top_airports = []
    result = []
    for year in years:
        # Filter flights by the specified year and on-time departures
        on_time_flights = airline.filter(
            (airline['Year'] == year) & (airline['DepDelay'] <= 0))

        # Group flights by originating airport and count occurrences also get the OriginStateName and OriginCityName
        airport_counts = on_time_flights.groupBy( 'Origin', 'OriginStateName', 'OriginCityName').count() 
        top_airports_year = airport_counts.orderBy(
            airport_counts['count'].desc()).limit(3)

        # get the OriginStateName and OriginCityName
        top_airports = []
        for i, airport in enumerate(top_airports_year.collect(), start=1):
            location = airport['OriginCityName'] + ', ' + airport['OriginStateName']    
            top_airports.append(
                {"airport": airport['Origin'], "count": airport['count'] ,"location":location, "year": year})
        result += top_airports
    return result

years = [1987, 1997, 2007, 2017]
result = get_top_airports(airline,years)
result

In [4]:
def get_worst_performing_airlines(airline):
    # Filter flights in the 20th century (years 1900-1999)
    century_flights = airline.filter(
        (airline['Year'] >= 1900) & (airline['Year'] <= 1999))


    # calculate average delay
    average_delay = century_flights.groupBy('Reporting_Airline').avg('ArrDelay')
    average_delay = average_delay.withColumnRenamed('avg(ArrDelay)', 'avg_delay')

    # Filter flights with an arrival delay greater than average delay
    worst_performing_airlines = century_flights.join(average_delay, on='Reporting_Airline')
    worst_performing_airlines = worst_performing_airlines.filter(worst_performing_airlines['ArrDelay'] > worst_performing_airlines['avg_delay'])
    worst_performing_airlines = worst_performing_airlines.groupBy('Reporting_Airline').count().orderBy('count', ascending=False).limit(3)

    worst_performing_airlines_list = worst_performing_airlines.collect()

    # print("The top three worst performing airlines in the 20th century are:")
    data = []
    for i, airline in enumerate(worst_performing_airlines_list, start=1):
        # print(
        #     f"{i}. {airline['Reporting_Airline']}, Total Delays: {airline['count']}")
        data.append(
            {"airline": airline['Reporting_Airline'], "total_delays": airline['count']})
    return data

get_worst_performing_airlines(airline)


                                                                                

[{'airline': 'DL', 'total_delays': 3442010},
 {'airline': 'US', 'total_delays': 3143834},
 {'airline': 'AA', 'total_delays': 2717380}]

In [6]:
# print unique values of Reporting_Airline
airline.select('Reporting_Airline').distinct().show()

                                                                                

+-----------------+
|Reporting_Airline|
+-----------------+
|               UA|
|               NK|
|               AA|
|               EV|
|               B6|
|               DL|
|               OO|
|               F9|
|               YV|
|               MQ|
|               OH|
|               HA|
|               G4|
|               YX|
|               AS|
|               VX|
|               WN|
|               9E|
|               US|
|               FL|
+-----------------+
only showing top 20 rows

