# Configure environment

In [0]:
################# Import libraries #################

import pyspark.sql.functions as F
import pyspark.sql.types as T
import matplotlib.pyplot as plt
import pandas as pd
import graphframes as GF
import networkx as nx



# Raw data preprocessing

## Flight data schema definition

In [0]:
################# Define schema to load CSV data #################

# Schema for the raw flight CSV files, expressed as (column name, Spark type)
# pairs in file order and turned into StructFields at the end.
_INT, _LONG, _STR, _DBL = T.IntegerType, T.LongType, T.StringType, T.DoubleType

# Flight-level columns (date, carrier, origin, destination, timings, delays)
_flight_columns = [
    ("Year", _INT), ("Quarter", _INT), ("Month", _INT),
    ("DayofMonth", _INT), ("DayOfWeek", _INT), ("FlightDate", _STR),
    ("UniqueCarrier", _STR), ("AirlineID", _LONG), ("Carrier", _STR),
    ("TailNum", _STR), ("FlightNum", _INT),
    ("OriginAirportID", _INT), ("OriginAirportSeqID", _INT),
    ("OriginCityMarketID", _INT), ("Origin", _STR),
    ("OriginCityName", _STR), ("OriginState", _STR),
    ("OriginStateFips", _INT), ("OriginStateName", _STR),
    ("OriginWac", _INT),
    ("DestAirportID", _INT), ("DestAirportSeqID", _INT),
    ("DestCityMarketID", _INT), ("Dest", _STR),
    ("DestCityName", _STR), ("DestState", _STR),
    ("DestStateFips", _INT), ("DestStateName", _STR),
    ("DestWac", _INT),
    ("CRSDepTime", _STR), ("DepTime", _STR), ("DepDelay", _DBL),
    ("DepDelayMinutes", _DBL), ("DepDel15", _DBL),
    ("DepartureDelayGroups", _INT), ("DepTimeBlk", _STR),
    ("TaxiOut", _DBL), ("WheelsOff", _STR), ("WheelsOn", _STR),
    ("TaxiIn", _DBL),
    ("CRSArrTime", _STR), ("ArrTime", _STR), ("ArrDelay", _DBL),
    ("ArrDelayMinutes", _DBL), ("ArrDel15", _DBL),
    ("ArrivalDelayGroups", _INT), ("ArrTimeBlk", _STR),
    ("Cancelled", _DBL), ("CancellationCode", _STR), ("Diverted", _DBL),
    ("CRSElapsedTime", _DBL), ("ActualElapsedTime", _DBL),
    ("AirTime", _DBL), ("Flights", _DBL), ("Distance", _DBL),
    ("DistanceGroup", _INT),
    ("CarrierDelay", _DBL), ("WeatherDelay", _DBL), ("NASDelay", _DBL),
    ("SecurityDelay", _DBL), ("LateAircraftDelay", _DBL),
]

# Gate-return and diversion summary columns, all read as plain strings
_summary_columns = [
    (column_name, _STR)
    for column_name in (
        "FirstDepTime", "TotalAddGTime", "LongestAddGTime",
        "DivAirportLandings", "DivReachedDest", "DivActualElapsedTime",
        "DivArrDelay", "DivDistance",
    )
]

# The same eight string columns repeat for Div1 .. Div5
_diversion_suffixes = (
    "Airport", "AirportID", "AirportSeqID", "WheelsOn",
    "TotalGTime", "LongestGTime", "WheelsOff", "TailNum",
)
_diversion_columns = [
    (f"Div{diversion_no}{suffix}", _STR)
    for diversion_no in range(1, 6)
    for suffix in _diversion_suffixes
]

air_schema = T.StructType([
    T.StructField(column_name, column_type())
    for column_name, column_type
    in _flight_columns + _summary_columns + _diversion_columns
])

## Process flight data and write to disk

In [0]:
################## Load raw data into DataFrame #################

# Parse every raw "On_Time*" CSV export using the explicit flight schema
raw_df = spark.read.csv(
    'dbfs:///FileStore/tables/msc_project/raw/On_Time*.csv',
    schema=air_schema,
    header=True,
    escape='"',
)

# Columns carried forward into the processed data set (the remaining
# diversion-related columns of the schema are not selected)
kept_columns = [
    "Year", "Quarter", "Month", "DayofMonth", "DayOfWeek", "FlightDate",
    "UniqueCarrier", "AirlineID", "Carrier", "TailNum", "FlightNum",
    "OriginAirportID", "OriginAirportSeqID", "OriginCityMarketID",
    "Origin", "OriginCityName", "OriginState", "OriginStateFips",
    "OriginStateName", "OriginWac",
    "DestAirportID", "DestAirportSeqID", "DestCityMarketID",
    "Dest", "DestCityName", "DestState", "DestStateFips",
    "DestStateName", "DestWac",
    "CRSDepTime", "DepTime", "DepDelay", "DepDelayMinutes", "DepDel15",
    "DepartureDelayGroups", "DepTimeBlk", "TaxiOut", "WheelsOff",
    "WheelsOn", "TaxiIn",
    "CRSArrTime", "ArrTime", "ArrDelay", "ArrDelayMinutes", "ArrDel15",
    "ArrivalDelayGroups", "ArrTimeBlk",
    "Cancelled", "CancellationCode", "Diverted",
    "CRSElapsedTime", "ActualElapsedTime", "AirTime", "Flights",
    "Distance", "DistanceGroup",
    "CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay",
    "LateAircraftDelay",
]

# Keep the chosen columns and turn the FlightDate text into a real date
flight_date_as_date = F.to_date(F.col('FlightDate'), 'yyyy-MM-dd')
airline_data = raw_df.select(*kept_columns).withColumn('FlightDate', flight_date_as_date)

# Store as parquet, one partition directory per year, replacing earlier output
(
    airline_data
    .repartition('Year')
    .write
    .partitionBy("Year")
    .parquet(
        'dbfs:///FileStore/tables/msc_project/processed_data/airline_data',
        mode='overwrite',
    )
)

In [0]:
# Preview 20 processed rows: flight date plus origin/destination identifiers
(
    spark.read
    .parquet('dbfs:///FileStore/tables/msc_project/processed_data/airline_data')
    .select(
        "FlightDate",
        "Origin", "OriginAirportID", "OriginCityName",
        "Dest", "DestAirportID", "DestCityName",
    )
    .limit(20)
    .toPandas()
)

## Process lookup data

### Process airline lookup data and write to disk

In [0]:
#################### Load lookup data for airlines ####################

from pyspark.sql import Row

# Function to split airline ID and name
def mapAirlineIdRow(r):
    """Turn one airline lookup row into an (AirlineID, AirlineName, Carrier) Row.

    The ``Description`` text is split at its LAST colon: everything before it
    is the airline name, everything after it is the carrier code. Both parts
    are stripped of surrounding whitespace.

    Args:
        r: A row exposing ``Code`` (convertible with ``int``) and
            ``Description`` attributes.

    Returns:
        Row with ``AirlineID`` (int), ``AirlineName`` (str) and ``Carrier``
        (str; empty when the description contains no colon).

    Raises:
        ValueError / TypeError: if ``r.Code`` cannot be converted to ``int``.
    """
    airline_id = int(r.Code)

    # A missing description is treated as empty text instead of crashing the job
    description = r.Description or ''

    # Splitting at the last colon keeps airline names that contain ':' intact
    airline_name, separator, iata_carrier = description.rpartition(':')
    if not separator:
        # No colon at all: the whole text is the name and the carrier is unknown
        airline_name, iata_carrier = description, ''

    return Row(
        AirlineID=airline_id,
        AirlineName=airline_name.strip(),
        Carrier=iata_carrier.strip(),
    )

# Read the airline lookup CSV (first line is the header)
airline_id_csv = spark.read.csv(
    'dbfs:///FileStore/tables/msc_project/raw/airline_id.csv',
    escape='"',
    header=True,
)

# Split each 'Description' into name and carrier, collapsing to one partition
airline_rows = airline_id_csv.rdd.map(mapAirlineIdRow)
airline_id_df = airline_rows.toDF().coalesce(1)

# Persist the airline lookup as parquet, replacing any earlier output
airline_id_df.write.mode('overwrite').parquet(
    'dbfs:///FileStore/tables/msc_project/processed_data/DOT_airline_codes_table'
)

### Process airport lookup data and write to disk

#### Airport code

In [0]:
#################### Load US DOT airports code lookup data ####################

# Two-column, all-string schema shared by the airport lookup CSV files
airport_schema = T.StructType([
    T.StructField(field_name, T.StringType())
    for field_name in ("Code", "Description")
])

# Function to split Description field
def mapAirportIdRow(r):
    """Turn one airport lookup row into an (AirportID, City, Name) Row.

    The ``Description`` text is split at its FIRST colon: the part before it
    is the city, the remainder is the airport name (so a name that itself
    contains ':' is kept whole instead of being discarded). When there is no
    colon, the whole description is used as the city and ``r.Code`` stands in
    for the name.

    Args:
        r: A row exposing ``Code`` and ``Description`` attributes.

    Returns:
        Row with ``AirportID`` (``r.Code`` unchanged), ``City`` and ``Name``.
    """
    # A missing description is treated as empty text instead of raising
    description = r.Description or ''

    city_part, separator, name_part = description.partition(':')
    if separator:
        airport_name = name_part.strip()
    else:
        # No "city: name" structure available, fall back to the code
        airport_name = r.Code

    return Row(
        AirportID=r.Code,
        City=city_part.strip(),
        Name=airport_name,
    )

# Read the airport code lookup CSV using the two-column string schema
airport_codes_csv = spark.read.csv(
    'dbfs:///FileStore/tables/msc_project/airport_code.csv',
    schema=airport_schema,
    header=True,
    escape='"',
)

# Split each 'Description' into city and name, collapsing to one partition
airport_code_rows = airport_codes_csv.rdd.map(mapAirportIdRow)
airport_codes_df = airport_code_rows.toDF().coalesce(1)

# Persist the airport code lookup as parquet, replacing any earlier output
airport_codes_df.write.mode('overwrite').parquet(
    'dbfs:///FileStore/tables/msc_project/processed_data/airport_codes_table'
)

#### Airport ID

In [0]:
# Read the airport ID lookup CSV using the two-column string schema
airport_id_csv = spark.read.csv(
    'dbfs:///FileStore/tables/msc_project/airport_id.csv',
    schema=airport_schema,
    header=True,
    escape='"',
)

# Split each description into city/name, then store the ID as an integer
airport_id_rows = airport_id_csv.rdd.map(mapAirportIdRow).toDF()
integer_airport_id = F.col('AirportID').cast(T.IntegerType())
airport_id_df = airport_id_rows.withColumn('AirportID', integer_airport_id).coalesce(1)

# Persist the airport ID lookup as parquet, replacing any earlier output
airport_id_df.write.mode('overwrite').parquet(
    'dbfs:///FileStore/tables/msc_project/processed_data/airport_id_table'
)

## Airport and Flight Count in Source Data

In [0]:
########################## Establish baseline counts from source data ##########################

# 2019 slice of the processed flight data
airline_data2019 = (
    spark.read
    .parquet('dbfs:///FileStore/tables/msc_project/processed_data/airline_data')
    .filter('Year = 2019')
)

# Total number of 2019 flight records
flight_count = airline_data2019.count()

# Number of distinct airport IDs present in the lookup table
airport_lookup = spark.read.parquet(
    'dbfs:///FileStore/tables/msc_project/processed_data/airport_id_table'
)
distinct_airports_count = airport_lookup.select(F.col("AirportID")).distinct().count()

# Number of distinct origin/destination pairs flown in 2019
flights_2019_again = (
    spark.read
    .parquet('dbfs:///FileStore/tables/msc_project/processed_data/airline_data')
    .filter('Year = 2019')
)
distinct_flight_count = (
    flights_2019_again
    .select("OriginAirportID", "DestAirportID")
    .distinct()
    .count()
)

# Report the three baseline figures
for label, value in (
    ('No of flights for 2019', flight_count),
    ('No of distinct flights for 2019', distinct_flight_count),
    ('No of distinct airports', distinct_airports_count),
):
    print(f'{label}: {value}')

## Vertices and Edges Data Preparation

In [0]:
######################## Prepare data for vertices and edges #########################

# Read in 2019 flights
airline_data_2019 = spark.read.parquet('dbfs:///FileStore/tables/msc_project/processed_data/airline_data').filter('Year = 2019')

# Read in the airport lookup keyed by integer airport ID. The edges built below
# use the integer OriginAirportID / DestAirportID columns as src / dst, so the
# vertex ids are taken from the table whose AirportID was cast to an integer
# rather than from the string-keyed airport code table.
# NOTE(review): assumes airport_id_table is the intended vertex source - confirm
airport_id_df = spark.read.parquet('dbfs:///FileStore/tables/msc_project/processed_data/airport_id_table')

# Read in the airline lookup (AirlineID, AirlineName, Carrier) from disk so the
# join below does not depend on a variable left over from another cell
airline_ids_df = spark.read.parquet('dbfs:///FileStore/tables/msc_project/processed_data/DOT_airline_codes_table')


# Prepare intermediate vertices DataFrame: GraphFrames expects an 'id' column
airport_vertices = (
    airport_id_df
    .withColumnRenamed('AirportID', 'id')
    .withColumnRenamed('Name', 'name')
    .select('id', 'name', 'City')
)

# Prepare intermediate edges DataFrame: one row per flight, with src/dst
# airport ids, the year, a 'YYYY-MM' label and the airline name
airport_edges_2019 = (
    airline_data_2019
    .select(
        F.col('OriginAirportID').alias('src'),
        F.col('DestAirportID').alias('dst'),
        'AirlineID',
        'Year',
        F.format_string('%d-%02d', F.col('Year'), F.col('Month')).alias('YearMonth')
    )
    # Inner join: flights whose AirlineID is missing from the lookup are dropped
    .join(
        airline_ids_df.select('AirlineID', 'AirlineName'),
        on='AirlineID',
        how='inner'
    )
    .drop('AirlineID')
).cache()


# Write vertices DataFrame to parquet database
airport_vertices.write.parquet(
        'dbfs:///FileStore/tables/msc_project/processed_data/airport_vertices',
        mode='overwrite'
    )


# Write 2019 edges DataFrame to parquet database
airport_edges_2019.write.parquet(
        'dbfs:///FileStore/tables/msc_project/processed_data/airport_edges_2019',
        mode='overwrite'
    )

## GraphFrames Graph

### Create Source DataFrames

In [0]:
# Load the saved airport vertices (id, name, City)
airport_vertices = spark.read.parquet(
    'dbfs:///FileStore/tables/msc_project/processed_data/airport_vertices'
)

# Load the saved 2019 flight edges (one row per flight)
airport_edges = spark.read.parquet(
    'dbfs:///FileStore/tables/msc_project/processed_data/airport_edges_2019'
)

### Create GraphFrames Graph

In [0]:
# Build the GraphFrames graph from the vertex and edge DataFrames
graph = GF.GraphFrame(v=airport_vertices, e=airport_edges)

# Show the graph's textual summary
print(graph)

### Check numbers of edges and vertices

In [0]:
# Sanity-check the graph size.
# The edge DataFrame holds one row per flight, so a route flown many times
# appears as many edges; the distinct count collapses them to (src, dst) pairs.
gf_edges = graph.edges

gf_edge_count = gf_edges.count()
gf_distinct_edge_count = gf_edges.select("src", "dst").distinct().count()
gf_vertice_count = graph.vertices.count()

print('\n'.join([
    f'No of all edges (flights) in GF graph (with repetition): {gf_edge_count}',
    f'No of distinct edges in GF graph: {gf_distinct_edge_count}',
    f'No of vertices in GF graph: {gf_vertice_count}',
]))

### Generate GF Graph InDegrees and OutDegrees

In [0]:
################## Generating Degree Counts for GraphFrames Graph ##################

# Read in vertices data (id, name, City). It is kept as a Spark DataFrame so
# that `airport_vertices` stays usable for rebuilding the GraphFrame and the
# degree tables never have to be collected onto the driver.
airport_vertices = spark.read.parquet('dbfs:///FileStore/tables/msc_project/processed_data/airport_vertices')

# Get indegrees and join with vertices data to get airport names
gf_inDegrees2019 = graph.inDegrees.join(airport_vertices, on="id")

# Get outdegrees and join with vertices data to get airport names
gf_outDegrees2019 = graph.outDegrees.join(airport_vertices, on="id")

# Combine indegree and outdegree counts (inner join: only airports that have
# both incoming and outgoing flights are kept)
gf_inout_2019 = (
    gf_inDegrees2019
    .join(gf_outDegrees2019.select("id", "outDegree"), on="id")
    .select("id", "name", "City", "inDegree", "outDegree")
)


# Write inDegrees to disk
gf_inDegrees2019.write.parquet('dbfs:///FileStore/tables/msc_project/analysis/gf_inDegrees2019', mode='overwrite')

# Write outDegrees to disk
gf_outDegrees2019.write.parquet('dbfs:///FileStore/tables/msc_project/analysis/gf_outDegrees2019', mode='overwrite')

# Write combined inDegrees and outDegrees to disk
gf_inout_2019.write.parquet('dbfs:///FileStore/tables/msc_project/analysis/gf_inout_2019', mode='overwrite')

### Generate PageRank

In [0]:
# Run PageRank on the 2019 airport graph for a fixed 10 iterations
gf_pagerank_2019 = graph.pageRank(maxIter=10)

# Persist the scored vertices, replacing any earlier output
(
    gf_pagerank_2019.vertices
    .write
    .mode('overwrite')
    .parquet('dbfs:///FileStore/tables/msc_project/analysis/gf_pagerank_2019')
)

### Combine GraphFrames Degree and PageRank

In [0]:
################## Combine GF Degrees and PageRank ##################

# Saved GF in/out degree table
gf_inout_2019 = spark.read.parquet(
    'dbfs:///FileStore/tables/msc_project/analysis/gf_inout_2019'
)

# Saved GF PageRank vertices
gf_pageranks_2019 = spark.read.parquet(
    'dbfs:///FileStore/tables/msc_project/analysis/gf_pagerank_2019'
)

# Attach the pagerank score to each airport's degree row
pagerank_scores = gf_pageranks_2019.select("id", "pagerank")
gf_combined_degree_rank_2019 = gf_inout_2019.join(pagerank_scores, on="id")

# Persist the combined table, replacing any earlier output
gf_combined_degree_rank_2019.write.mode('overwrite').parquet(
    'dbfs:///FileStore/tables/msc_project/analysis/gf_combined_degree_rank_2019'
)

## NetworkX Graph

### Create Source DataFrames

In [0]:
######################### Generate counts of origin-destination pair for use as a weight attribute #########################

# Count the 2019 flights per (src, dst) pair and expose the count as 'weight'
flight_edges_2019 = spark.read.parquet(
    'dbfs:///FileStore/tables/msc_project/processed_data/airport_edges_2019'
)
airport_nxedges_2019 = (
    flight_edges_2019
    .groupBy('src', 'dst')
    .count()
    .withColumnRenamed("count", "weight")
)

# Persist the weighted edge list, replacing any earlier output
airport_nxedges_2019.write.mode('overwrite').parquet(
    'dbfs:///FileStore/tables/msc_project/processed_data/airport_nxedges_2019'
)

In [0]:
######################### Read in vertices and edges data for NetworkX graph #########################

# Airport vertices as a pandas DataFrame
vertices_on_disk = spark.read.parquet(
    'dbfs:///FileStore/tables/msc_project/processed_data/airport_vertices'
)
airport_nxvertices = vertices_on_disk.toPandas()

# Weighted (src, dst, weight) edges as a pandas DataFrame
nxedges_on_disk = spark.read.parquet(
    'dbfs:///FileStore/tables/msc_project/processed_data/airport_nxedges_2019'
)
airport_nxedges_2019 = nxedges_on_disk.toPandas()

### Create NetworkX Graph

In [0]:
# Start from an empty directed graph
nxGraph2019 = nx.DiGraph()

# Row-wise Python lists of the vertex table and the weighted edge table
node_list = airport_nxvertices.values.tolist()
edge_list = airport_nxedges_2019.values.tolist()

# Register the first column of every vertex row (the airport id) as a node
nxGraph2019.add_nodes_from(vertex_row[0] for vertex_row in node_list)

# Add every (src, dst, weight) row as a directed edge carrying a 'weight'
nxGraph2019.add_weighted_edges_from(edge_list, weight="weight")

### Check numbers of edges and vertices

In [0]:
# Compare the NetworkX graph size with the GraphFrames graph.
# Note: each airport id is a single node here, so the node count is the
# number of distinct airports.
nx_edge_count = nxGraph2019.number_of_edges()
nx_vertice_count = nxGraph2019.number_of_nodes()

print(f"No. of edges (flights) in NX graph: {nx_edge_count}")
print(f"No. of vertices/nodes (airports) in NX graph: {nx_vertice_count}")

### Generate NX HITS rankings

In [0]:
# Run HITS on the 2019 NetworkX graph with max_iter set to 10; the call
# returns the hub scores and the authority scores, in that order
hubValues2019, authorityValues2019 = nx.hits(
    nxGraph2019,
    max_iter=10,
)

In [0]:
################ Convert hub and authority dictionaries to DataFrames ################

# Convert the hub score dictionary ({node id: score}) to an (id, HubValue) table
hubValues2019 = pd.DataFrame(list(hubValues2019.items()), columns=["id", "HubValue"])

# Convert the authority score dictionary to an (id, AuthorityValue) table
authorityValues2019 = pd.DataFrame(list(authorityValues2019.items()), columns=["id", "AuthorityValue"])

# Combine hub and authority scores by matching on the node id itself, so the
# pairing does not depend on both tables happening to share the same row order
hitsranks_2019 = hubValues2019.merge(authorityValues2019, on="id")

# Select needed columns
hitsranks_2019 = hitsranks_2019[["id", "HubValue", "AuthorityValue"]]


# Join with vertices DataFrame to get name and city of airport
hitsranks_2019 = hitsranks_2019.merge(airport_nxvertices, on="id")

# Convert to Spark DataFrame
hitsranks_2019 = spark.createDataFrame(hitsranks_2019)

# Write data to parquet on disk (unpartitioned), replacing any earlier output
hitsranks_2019.write.parquet('dbfs:///FileStore/tables/msc_project/analysis/hitsranks_2019', mode='overwrite')

### Generate NX Graph InDegrees and outDegrees

In [0]:
################## Weighted inDegrees ##################

# (node, weighted in-degree) pairs, using the 'weight' edge attribute
in_degree_pairs = list(nxGraph2019.in_degree(weight="weight"))

# Tabulate, name the columns, then attach the airport name and city
in_degree_table = pd.DataFrame(in_degree_pairs).rename(columns={0: "id", 1: "inDegrees"})
in_degree_table = in_degree_table.merge(airport_nxvertices, on="id")

# Hand over to Spark
nxInDegrees2019 = spark.createDataFrame(in_degree_table)


################## Weighted outDegrees ##################

# (node, weighted out-degree) pairs, using the 'weight' edge attribute
out_degree_pairs = list(nxGraph2019.out_degree(weight="weight"))

# Tabulate, name the columns, then attach the airport name and city
out_degree_table = pd.DataFrame(out_degree_pairs).rename(columns={0: "id", 1: "outDegrees"})
out_degree_table = out_degree_table.merge(airport_nxvertices, on="id")

# Hand over to Spark
nxOutDegrees2019 = spark.createDataFrame(out_degree_table)


################## Combined degree table ##################

# One row per airport with both degree columns
out_degree_only = nxOutDegrees2019.select("id", "outDegrees")
nx_inout_2019 = (
    nxInDegrees2019
    .join(out_degree_only, on="id")
    .select("id", "name", "City", "inDegrees", "outDegrees")
)

# Persist all three tables as parquet, replacing any earlier output
nxInDegrees2019.write.mode('overwrite').parquet(
    'dbfs:///FileStore/tables/msc_project/analysis/nxInDegrees2019'
)
nxOutDegrees2019.write.mode('overwrite').parquet(
    'dbfs:///FileStore/tables/msc_project/analysis/nxOutDegrees2019'
)
nx_inout_2019.write.mode('overwrite').parquet(
    'dbfs:///FileStore/tables/msc_project/analysis/nx_inout_2019'
)

### Combine NX degree and ranking

In [0]:
################## Combine NX Degrees and HITS ranks ##################

# Read in NX indegree and outdegree data
nx_inout_2019 = spark.read.parquet('dbfs:///FileStore/tables/msc_project/analysis/nx_inout_2019')

# Read in NX HITS rank data
hitsranks_2019 = spark.read.parquet('dbfs:///FileStore/tables/msc_project/analysis/hitsranks_2019')

# Join degree and HITS rank data
nxcombined_degree_rank_2019 = nx_inout_2019.join(hitsranks_2019.select("id", "HubValue", "AuthorityValue"), on = "id")

# Write data to disk. The "Top n Rankings" cells read this parquet path back,
# so the write has to run for those cells to find their input.
nxcombined_degree_rank_2019.write.parquet('dbfs:///FileStore/tables/msc_project/analysis/nxcombined_degree_rank_2019', mode = 'overwrite')

display(nxcombined_degree_rank_2019)

# Top n Rankings

## GraphFrames

In [0]:
######################### Get Top GF Airports #########################

# Ranking column; one of "inDegree", "outDegree", "pagerank"
rank_by = "pagerank"

# How many airports to keep
top_airport_count = 10

# Load the combined GF degree/PageRank table and keep the highest-ranked rows
gf_degree_rank_table = spark.read.parquet(
    'dbfs:///FileStore/tables/msc_project/analysis/gf_combined_degree_rank_2019'
)
gf_top_airports_2019 = (
    gf_degree_rank_table
    .orderBy(F.col(rank_by).desc())
    .limit(top_airport_count)
)

display(gf_top_airports_2019)

id,name,City,inDegree,outDegree,pagerank
10397,Hartsfield-Jackson Atlanta International,"Atlanta, GA",395026,395009,85.9096135916287
13930,Chicago O'Hare International,"Chicago, IL",339569,339606,80.22292719246487
11298,Dallas/Fort Worth International,"Dallas/Fort Worth, TX",304346,304344,78.2557952036805
11292,Denver International,"Denver, CO",252064,252026,62.925602564685946
11057,Charlotte Douglas International,"Charlotte, NC",235490,235496,51.40672098063955
12892,Los Angeles International,"Los Angeles, CA",219996,219952,44.24017551002431
13487,Minneapolis-St Paul International,"Minneapolis, MN",160955,160960,41.95790398432532
12266,George Bush Intercontinental/Houston,"Houston, TX",179682,179688,38.70907317438266
11433,Detroit Metro Wayne County,"Detroit, MI",161741,161768,38.45995192427848
14107,Phoenix Sky Harbor International,"Phoenix, AZ",175343,175328,35.98524303525492


## NetworkX

In [0]:
######################### Get Top NX Airports #########################

# Ranking column; one of "inDegrees", "outDegrees", "HubValue", "AuthorityValue"
rank_by = "AuthorityValue"

# How many airports to keep
top_airport_count = 10

# Load the combined NX degree/HITS table and keep the highest-ranked rows
nx_degree_rank_table = spark.read.parquet(
    'dbfs:///FileStore/tables/msc_project/analysis/nxcombined_degree_rank_2019'
)
nx_top_airports_2019 = (
    nx_degree_rank_table
    .orderBy(F.col(rank_by).desc())
    .limit(top_airport_count)
)

display(nx_top_airports_2019)

id,name,City,inDegrees,outDegrees,HubValue,AuthorityValue
10397,Hartsfield-Jackson Atlanta International,"Atlanta, GA",395026,395009,0.0346491189939222,0.0346276649083788
13930,Chicago O'Hare International,"Chicago, IL",339569,339606,0.0333522378622844,0.0333487114579796
12892,Los Angeles International,"Los Angeles, CA",219996,219952,0.0282838071096862,0.0283046355015831
11298,Dallas/Fort Worth International,"Dallas/Fort Worth, TX",304346,304344,0.0261054282926308,0.0261023093415864
11292,Denver International,"Denver, CO",252064,252026,0.0261144230373412,0.0260979565647016
12953,LaGuardia,"New York, NY",171665,171665,0.0226118690330458,0.0226176539366563
10721,Logan International,"Boston, MA",150554,150564,0.0224242476939394,0.0224198725620355
14771,San Francisco International,"San Francisco, CA",170966,170918,0.0222422566561243,0.0222236236891365
12889,McCarran International,"Las Vegas, NV",164043,164020,0.021070812613931,0.0210697524779774
11057,Charlotte Douglas International,"Charlotte, NC",235490,235496,0.0207056836095692,0.020695018202344


# Top n Rankings (Normalised)

## GraphFrames

In [0]:
######################### Get Top GF Airports #########################

# Ranking column; one of "inDegree", "outDegree", "pagerank"
rank_by = "pagerank"

# How many airports to keep
top_airport_count = 10

# Load the combined GF degree/PageRank table and keep the highest-ranked rows
gf_degree_rank_table = spark.read.parquet(
    'dbfs:///FileStore/tables/msc_project/analysis/gf_combined_degree_rank_2019'
)
gf_top_airports_2019 = (
    gf_degree_rank_table
    .orderBy(F.col(rank_by).desc())
    .limit(top_airport_count)
)

# Define normalisation function
def normDF(df):
    """Display min-max normalised inDegree, outDegree and pagerank columns.

    Every metric is rescaled over the rows of ``df`` as
    ``(value - min) / (max - min)``, so the largest value in ``df`` maps to 1
    and the smallest to 0.

    Args:
        df: Spark DataFrame with "id", "name", "City", "inDegree",
            "outDegree" and "pagerank" columns.

    Returns:
        Whatever ``display`` returns for the normalised table.
    """
    # (source column, normalised output column)
    metrics = (
        ("inDegree", "normInDegree"),
        ("outDegree", "normOutDegree"),
        ("pagerank", "normPagerank"),
    )

    # Fetch all minima and maxima in a single aggregation job rather than
    # launching one job per bound
    bounds = df.agg(
        *[F.min(source).alias(f"min_{source}") for source, _ in metrics],
        *[F.max(source).alias(f"max_{source}") for source, _ in metrics],
    ).collect()[0]

    # Add normalised columns to DataFrame
    gf_normalized_top = df
    for source, target in metrics:
        lowest = bounds[f"min_{source}"]
        value_range = bounds[f"max_{source}"] - lowest
        gf_normalized_top = gf_normalized_top.withColumn(
            target, (F.col(source) - lowest) / value_range
        )

    return display(gf_normalized_top.select("id", "name", "City", "normInDegree", "normOutDegree", "normPagerank"))

# Execute function to output data
normDF(gf_top_airports_2019)

id,name,City,normInDegree,normOutDegree,normPagerank
10397,Hartsfield-Jackson Atlanta International,"Atlanta, GA",1.0,1.0,1.0
13930,Chicago O'Hare International,"Chicago, IL",0.7630761606521098,0.7632846113420694,0.886093979036901
11298,Dallas/Fort Worth International,"Dallas/Fort Worth, TX",0.6125961780827185,0.6126238522702511,0.8466917398727015
11292,Denver International,"Denver, CO",0.3892365991515394,0.3890894641720323,0.5396234189675043
11057,Charlotte Douglas International,"Charlotte, NC",0.3184290236723046,0.3184632277856346,0.3088967927591786
12892,Los Angeles International,"Los Angeles, CA",0.252235432838754,0.2520497844468466,0.1653487541810478
13487,Minneapolis-St Paul International,"Minneapolis, MN",0.0,0.0,0.119634176305261
12266,George Bush Intercontinental/Houston,"Houston, TX",0.0800056393145669,0.0800174322470935,0.0545591283129356
11433,Detroit Metro Wayne County,"Detroit, MI",0.003357955492137,0.0034522685420574,0.0495691555335516
14107,Phoenix Sky Harbor International,"Phoenix, AZ",0.0614685287797292,0.061388854470645,0.0


## NetworkX

### Top n Airports by HITS Rank

In [0]:
######################### Get Top NX Airports #########################

# Ranking column; one of "inDegrees", "outDegrees", "HubValue", "AuthorityValue"
rank_by = "inDegrees"

# How many airports to keep
top_airport_count = 10

# Load the combined NX degree/HITS table and keep the highest-ranked rows
nx_degree_rank_table = spark.read.parquet(
    'dbfs:///FileStore/tables/msc_project/analysis/nxcombined_degree_rank_2019'
)
nx_top_airports_2019 = (
    nx_degree_rank_table
    .orderBy(F.col(rank_by).desc())
    .limit(top_airport_count)
)

# Define normalisation function
def normDF(df):
    """Display min-max normalised inDegrees/outDegrees next to the HITS scores.

    Both degree columns are rescaled over the rows of ``df`` as
    ``(value - min) / (max - min)``; "AuthorityValue" and "HubValue" are shown
    unchanged.

    NOTE: this definition reuses the name ``normDF`` of the GraphFrames helper
    defined in an earlier cell and replaces it once this cell has run.

    Args:
        df: Spark DataFrame with "id", "name", "inDegrees", "outDegrees",
            "AuthorityValue" and "HubValue" columns.

    Returns:
        Whatever ``display`` returns for the normalised table.
    """
    # (source column, normalised output column)
    metrics = (
        ("inDegrees", "normInDegree"),
        ("outDegrees", "normOutDegree"),
    )

    # Fetch all minima and maxima in a single aggregation job; the column sums
    # that used to be computed here were never used and have been dropped
    bounds = df.agg(
        *[F.min(source).alias(f"min_{source}") for source, _ in metrics],
        *[F.max(source).alias(f"max_{source}") for source, _ in metrics],
    ).collect()[0]

    # Add normalised columns to DataFrame
    nx_normalized_top = df
    for source, target in metrics:
        lowest = bounds[f"min_{source}"]
        value_range = bounds[f"max_{source}"] - lowest
        nx_normalized_top = nx_normalized_top.withColumn(
            target, (F.col(source) - lowest) / value_range
        )

    # Output DataFrame with normalised columns
    return display(nx_normalized_top.select("id", "name", "normInDegree", "AuthorityValue", "normOutDegree", "HubValue"))

# Execute function to output data
normDF(nx_top_airports_2019)

id,name,normInDegree,AuthorityValue,normOutDegree,HubValue
10397,Hartsfield-Jackson Atlanta International,1.0,0.0346276649083788,1.0,0.0346491189939222
13930,Chicago O'Hare International,0.752490404355976,0.0333487114579796,0.7527656175393033,0.0333522378622844
11298,Dallas/Fort Worth International,0.595286976702669,0.0261023093415864,0.5954099004422311,0.0261054282926308
11292,Denver International,0.361947692582344,0.0260979565647016,0.3619422466765734,0.0261144230373412
11057,Charlotte Douglas International,0.2879764348835133,0.020695018202344,0.2881775707190382,0.0207056836095692
12892,Los Angeles International,0.2188253146478621,0.0283046355015831,0.218812892976514,0.0282838071096862
12266,George Bush Intercontinental/Houston,0.038900294563956,0.019044420412917,0.0391358867602893,0.0190395126788521
14107,Phoenix Sky Harbor International,0.019534945996608,0.0196867408678777,0.019679505201012,0.0196906421933799
12953,LaGuardia,0.0031197000803356,0.0226176539366563,0.0033334672075183,0.0226118690330458
14771,San Francisco International,0.0,0.0222236236891365,0.0,0.0222422566561243
