In [1]:
import configparser
import sys
from pyspark.rdd import RDD
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkContext, SparkConf
import faulthandler
from pyspark.sql.functions import col, count
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import Row
from pyspark.sql import Column
from pyspark.sql.functions import sum
from pyspark.storagelevel import StorageLevel

In [2]:
def data_writer_parquet(df: DataFrame, mode: str, path: str) -> None:
    (df
     .write
     .mode(mode)
     .parquet(path))

In [3]:
def loading_data_set_to_df(path: str, spark: SparkSession) -> DataFrame:
    df = (spark
          .read
          .format("csv")
          .option("header", "true")
          .option("inferSchema", "true")
          .load(path))

    return df


In [4]:
def find_all_the_flight_that_canceled(flightDF: DataFrame) -> DataFrame:
    canceled_flight = (flightDF
                       .select('*')
                       .where('CANCELLED = 1'))

    return canceled_flight

In [5]:
def find_airlines_total_number_of_flights_cancelled(flightDF: DataFrame, airlineDF: DataFrame) -> DataFrame:
    join_airline_flights_df = (
        flightDF.join(airlineDF.withColumnRenamed('AIRLINE', 'AIRLINE_NAME'),
                      flightDF['AIRLINE'] == airlineDF['IATA_CODE'],
                      'inner')
    )

    all_cancelled_flights = (
        join_airline_flights_df
            .select('*')
            .where('CANCELLED = 1')
    )

    airline_and_number_flights_cancelled = (
        all_cancelled_flights
            .groupby('AIRLINE_NAME')
            .count()
            .withColumnRenamed('count', 'TOTAL_NUMBER_FLIGHTS_CANCELLED')
            .orderBy('TOTAL_NUMBER_FLIGHTS_CANCELLED')
    ).collect()

    schema = StructType(
        [
            StructField('AIRLINE_NAME', StringType(), True),
            StructField('TOTAL_NUMBER_OF_FLIGHTS_CANCELLED', StringType(), True)
        ]
    )

    df = spark.createDataFrame(data=airline_and_number_flights_cancelled, schema=schema)

    return df


In [6]:
def find_total_number_of_departure_flight_from_airport_to(flightDF: DataFrame, airportDF: DataFrame) -> DataFrame:
    all_departure_flights = (
        flightDF
            .select('*')
            .where('CANCELLED != 1')
    )

    join_flights_and_airports = (
        all_departure_flights.join(airportDF,
                                   all_departure_flights['ORIGIN_AIRPORT'] == airportDF['IATA_CODE'],
                                   'inner')
    )

    total_departure_flights_from_each_airport = (
        join_flights_and_airports
            .groupby('AIRPORT')
            .count()
            .withColumnRenamed('count', 'TOTAL_NUMBER_DEPARTURE_FLIGHTS')
            .orderBy('TOTAL_NUMBER_DEPARTURE_FLIGHTS')
    ).persist(storageLevel=StorageLevel.MEMORY_AND_DISK).collect()

    schema = StructType(
        [
            StructField('AIRPORT_NAME', StringType(), True),
            StructField('TOTAL_NUMBER_DEPARTURE_FLIGHTS', StringType(), True)
        ]
    )

    df = spark.createDataFrame(data=total_departure_flights_from_each_airport, schema=schema)

    return df


In [7]:
def find_max_flight_cancelled_airline(flightDF: DataFrame, airlineDF: DataFrame) -> DataFrame:
    cancelled_flights = (
        flightDF
            .select('*')
            .where('CANCELLED = 1')
    )

    join_flights_and_airline = (
        cancelled_flights.join(airlineDF.withColumnRenamed('AIRLINE', 'AIRLINE_NAME')
                               , flightDF['AIRLINE'] == airlineDF['IATA_CODE'], 'inner')
    )

    find_max_cancelled_airline = (
        join_flights_and_airline.groupby('AIRLINE_NAME')
            .count()
            .withColumnRenamed('count', 'TOTAL_NUMBER_CANCELLED_FLIGHTS')
            .orderBy(col('TOTAL_NUMBER_CANCELLED_FLIGHTS').desc())
            .limit(1)
    ).persist(storageLevel=StorageLevel.MEMORY_AND_DISK).collect()

    schema = StructType(
        [
            StructField('AIRLINE_NAME', StringType(), True),
            StructField('TOTAL_NUMBER_CANCELLED_FLIGHTS', StringType(), True)
        ]
    )

    df = spark.createDataFrame(data=find_max_cancelled_airline, schema=schema)

    return df

In [8]:
def find_total_distance_flown_each_airline(flightDF: DataFrame, airlineDF: DataFrame) -> DataFrame:
    cancelled_flights = flightDF.select('*').where('CANCELLED == 1')

    join_flights_and_airline = (
        cancelled_flights.join(airlineDF.withColumnRenamed('AIRLINE', 'AIRLINE_NAME')
                               , flightDF['AIRLINE'] == airlineDF['IATA_CODE'], 'inner')
    )

    columns_name = join_flights_and_airline.columns

    filter_col = list(filter(lambda x: x != 'AIRLINE_NAME' and x != 'DISTANCE', columns_name))

    new_df = join_flights_and_airline.drop(*filter_col)

    total_distance_flown = (
        new_df
            .groupby(col('AIRLINE_NAME'))
            .agg(sum('DISTANCE').alias('TOTAL_DISTANCE'))
            .orderBy('TOTAL_DISTANCE', ascending=False)
    ).persist(storageLevel=StorageLevel.MEMORY_AND_DISK).collect()

    schema = StructType(
        [
            StructField('AIRLINE_NAME', StringType(), True),
            StructField('TOTAL_DISTANCE', StringType(), True)
        ]
    )

    df = spark.createDataFrame(data=total_distance_flown, schema=schema)

    return df


In [9]:
def find_average_departure_delay_of_airliner(flightDF: DataFrame, airlineDF: DataFrame) -> DataFrame:
    filter_flight_data = (
        flightDF.filter(flightDF.DEPARTURE_DELAY is not None and flightDF.DEPARTURE_DELAY > 0)
    )

    join_flights_and_airline = (
        filter_flight_data.join(airlineDF.withColumnRenamed('AIRLINE', 'AIRLINE_NAME')
                                , flightDF['AIRLINE'] == airlineDF['IATA_CODE'], 'inner')
    )

    columns_name = join_flights_and_airline.columns

    filter_col = list(filter(lambda x: x != 'AIRLINE_NAME' and x != 'DEPARTURE_DELAY', columns_name))

    delayed_flights_df = join_flights_and_airline.drop(*filter_col)

    airline_flight_count = (
        delayed_flights_df
            .groupby('AIRLINE_NAME')
            .agg(sum('DEPARTURE_DELAY').alias('TOTAL_DELAY'),
                 count('AIRLINE_NAME').alias('TOTAL_DELAYED_FLIGHTS'))
    ).persist(storageLevel=StorageLevel.MEMORY_AND_DISK)

    result_df = (
        airline_flight_count
            .withColumn('AVERAGE', airline_flight_count.TOTAL_DELAY / airline_flight_count.TOTAL_DELAYED_FLIGHTS)
            .orderBy('AVERAGE', ascending=False)
    ).persist(storageLevel=StorageLevel.MEMORY_AND_DISK).collect()

    schema = StructType(
        [
            StructField('AIRLINE_NAME', StringType(), True),
            StructField('TOTAL_DELAY', StringType(), True),
            StructField('TOTAL_DELAYED_FLIGHTS', StringType(), True),
            StructField('AVERAGE', StringType(), True)
        ]
    )

    df = spark.createDataFrame(data=result_df, schema=schema)

    return df

In [10]:
config = configparser.ConfigParser()

conf = SparkConf().setAppName('FlightDataAnalysis').setMaster('spark://172.26.176.1:7077')

conf.set('spark.executor.memory', '32G')
conf.set('spark.driver.memory', '32G')
conf.set("spark.sql.shuffle.partitions", '200')
#conf.set('spark.dynamicAllocation.enabled', 'true')
#conf.set('spark.dynamicAllocation.minExecutors', '2')
#conf.set('spark.dynamicAllocation.maxExecutors', '20')

sc = SparkContext(conf=conf)

spark = (SparkSession
        .builder.config(conf=conf)
        # .master("local[*]")
        # .config("spark.executor.memory", "16G")
        # .config("spark.driver.memory", "16G")
        # .appName("FlightDataAnalysis")
         .getOrCreate())

data_source_path = 'C:\\Users\\rahin\\source-code\\PycharmProjects\\PySpark-Filght-Data-Analysis\\2015_flights_data'

flightDF = loading_data_set_to_df(path=data_source_path + '/flights.csv', spark=spark)
airlineDF = loading_data_set_to_df(path=data_source_path + '/airlines.csv', spark=spark)
airportDF = loading_data_set_to_df(path=data_source_path + '/airports.csv', spark=spark)

In [11]:
total_flight_cancelled_by_airline_name = find_airlines_total_number_of_flights_cancelled(flightDF=flightDF,
                                                                                                 airlineDF=airlineDF)
total_flight_cancelled_by_airline_name.show(20, truncate=False)

+----------------------------+---------------------------------+
|AIRLINE_NAME                |TOTAL_NUMBER_OF_FLIGHTS_CANCELLED|
+----------------------------+---------------------------------+
|Hawaiian Airlines Inc.      |171                              |
|Virgin America              |534                              |
|Frontier Airlines Inc.      |588                              |
|Alaska Airlines Inc.        |669                              |
|Spirit Air Lines            |2004                             |
|Delta Air Lines Inc.        |3824                             |
|US Airways Inc.             |4067                             |
|JetBlue Airways             |4276                             |
|United Air Lines Inc.       |6573                             |
|Skywest Airlines Inc.       |9960                             |
|American Airlines Inc.      |10919                            |
|American Eagle Airlines Inc.|15025                            |
|Atlantic Southeast Airli

In [12]:
total_departure_flights_from_each_airport = find_total_number_of_departure_flight_from_airport_to(
            flightDF=flightDF, airportDF=airportDF)
total_departure_flights_from_each_airport.show(20, truncate=False)

+----------------------------------------------------------+------------------------------+
|AIRPORT_NAME                                              |TOTAL_NUMBER_DEPARTURE_FLIGHTS|
+----------------------------------------------------------+------------------------------+
|Ithaca Tompkins Regional Airport                          |30                            |
|King Salmon Airport                                       |63                            |
|Gustavus Airport                                          |76                            |
|Dillingham Airport                                        |77                            |
|St. Cloud Regional Airport                                |78                            |
|Barnstable Municipal Airport                              |82                            |
|Adak Airport                                              |89                            |
|Wilmington Airport                                        |95                  

In [13]:
total_departure_flights_from_each_airport = find_max_flight_cancelled_airline(
            flightDF=flightDF, airlineDF=airlineDF)
total_departure_flights_from_each_airport.show(20, truncate=False)

+----------------------+------------------------------+
|AIRLINE_NAME          |TOTAL_NUMBER_CANCELLED_FLIGHTS|
+----------------------+------------------------------+
|Southwest Airlines Co.|16043                         |
+----------------------+------------------------------+



In [14]:
total_distance_flown = find_total_distance_flown_each_airline(
            flightDF=flightDF, airlineDF=airlineDF)
total_distance_flown.show(20, truncate=False)

+----------------------------+--------------+
|AIRLINE_NAME                |TOTAL_DISTANCE|
+----------------------------+--------------+
|American Airlines Inc.      |10296318      |
|Southwest Airlines Co.      |10146965      |
|United Air Lines Inc.       |8062931       |
|Atlantic Southeast Airlines |7113045       |
|American Eagle Airlines Inc.|6130194       |
|Skywest Airlines Inc.       |4232860       |
|JetBlue Airways             |3943059       |
|Delta Air Lines Inc.        |3045010       |
|US Airways Inc.             |2735834       |
|Spirit Air Lines            |1900041       |
|Virgin America              |759501        |
|Alaska Airlines Inc.        |578235        |
|Frontier Airlines Inc.      |555826        |
|Hawaiian Airlines Inc.      |88107         |
+----------------------------+--------------+



In [15]:
total_delayed_average = find_average_departure_delay_of_airliner(
            flightDF=flightDF, airlineDF=airlineDF)
total_delayed_average.show(20, truncate=False)

+----------------------------+-----------+---------------------+------------------+
|AIRLINE_NAME                |TOTAL_DELAY|TOTAL_DELAYED_FLIGHTS|AVERAGE           |
+----------------------------+-----------+---------------------+------------------+
|Frontier Airlines Inc.      |1554161    |34893                |44.54076748918121 |
|Spirit Air Lines            |2183787    |52089                |41.924149052583076|
|Atlantic Southeast Airlines |6938681    |169897               |40.840515135641006|
|American Eagle Airlines Inc.|3764411    |93726                |40.16399931715853 |
|Skywest Airlines Inc.       |6726185    |171572               |39.20327908982818 |
|JetBlue Airways             |3839241    |102061               |37.6171211334398  |
|American Airlines Inc.      |8451818    |245904               |34.370396577526186|
|United Air Lines Inc.       |8364090    |256550               |32.60218281036835 |
|Virgin America              |708056     |23379                |30.285983147

In [27]:
iata_code = 'ANC'

cancelled_flights = flightDF.select('*').where('CANCELLED = 1').where(flightDF.ORIGIN_AIRPORT == iata_code)

join_airline_flight = join_flights_and_airline = (
        cancelled_flights.join(airportDF.withColumnRenamed('AIRPORT', 'AIRPORT_NAME')
                               , flightDF['ORIGIN_AIRPORT'] == airportDF['IATA_CODE'], 'inner')
    )

filter_col = list(filter(lambda x: x != 'AIRPORT_NAME' and x != 'DEPARTURE_DELAY', columns_name))


+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+---------+--------------------+---------+-----+-------+--------+----------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|IATA_CODE|        AIRPORT_NAME|     CITY|STATE|COUNTRY|LATITUDE| LONGITUDE|
+----+-----+---+-----------+-------+-------------+------