# Analyzing Interconnected Flight Data


## Table of Contents
1. [Project Overview](#project-overview)
2. [Data Source Description](#data-source-description)
3. [Project Steps](#project-steps)
4. [Conclusion](#conclusion)


## 1. Project Overview




## 2. Data Source Description

## 3. Project Steps

### 3.1 Initialize PySpark Session


```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Page Rank Flights") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.shuffle.service.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.shuffle.partitions", "100") \
    .config("spark.ui.retainedStages", 100) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")
```




```python
from pyspark.sql.functions import col, count
```

### 3.2 Data Loading


```python
# Load the dataset
flights_df = spark.read.csv("../data/2018.csv", header=True, inferSchema=True)
flights_df.show(5)
```

```
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+-----------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|Unnamed: 27|
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+-----------+
```

### 3.3 Exploring Data

```python
flights_df.printSchema()
```

```
root
 |-- FL_DATE: string (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- TAXI_OUT: double (nullable = true)
 |-- WHEELS_OFF: double (nullable = true)
 |-- WHEELS_ON: double (nullable = true)
 |-- TAXI_IN: double (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- ACTUAL_ELAPSED_TIME: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- CARRIER_DELAY: double (nullable = true)
 |-- WEATHER_DELAY: do
```

> Given the schema, we can define the vertices and edges as follows:

* Vertices: The unique airports appearing in the ORIGIN and DEST columns.
* Edges: Directed routes from an origin airport to a destination airport, one per (ORIGIN, DEST) pair. Each edge's weight is the number of flights on that route.
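As a toy illustration of this model, the plain-Python sketch below builds the vertex set and the weighted edge list from a few (origin, destination) pairs. The `flights` list is hypothetical sample data, not rows from `2018.csv`; the Spark cells that follow do the same thing at scale with `distinct()` and `groupBy().agg(count(...))`.

```python
from collections import Counter

# Hypothetical (ORIGIN, DEST) pairs for illustration only; not real data from 2018.csv
flights = [
    ("ATL", "ORD"),
    ("ATL", "ORD"),
    ("ORD", "DEN"),
    ("DEN", "ATL"),
    ("ATL", "DEN"),
]

# Vertices: every airport that appears as an origin or a destination
vertices = sorted({airport for pair in flights for airport in pair})

# Edges: one per (origin, destination) pair, weighted by how many flights use it
edge_weights = Counter(flights)

print(vertices)  # ['ATL', 'DEN', 'ORD']
for (src, dst), weight in sorted(edge_weights.items()):
    print(src, dst, weight)
```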

### 3.4 Data Cleaning and Preparation

```python
# Drop flights with a missing origin or destination airport
flights_df = flights_df.na.drop(subset=["ORIGIN", "DEST"])
```


#### 3.4.1 Defining vertices

```python
# Creating vertices DataFrame
airports_df = flights_df.select("ORIGIN").distinct().union(
    flights_df.select("DEST").distinct()
).distinct().withColumnRenamed("ORIGIN", "id")

# Displaying the vertices DataFrame
airports_df.show(5)
```

```
+---+
| id|
+---+
|BNA|
|CLT|
|TVC|
|CLL|
|CGI|
+---+
only showing top 5 rows
```

#### 3.4.2 Defining edges

```python
edges_df = flights_df.groupBy("ORIGIN", "DEST").agg(
    count("*").alias("flight_count")
).withColumnRenamed("ORIGIN", "src").withColumnRenamed("DEST", "dst")
```

```python
edges_df.show(5)
```

```
+---+---+------------+
|src|dst|flight_count|
+---+---+------------+
|ANC|DEN|         459|
|BNA|IAH|        1864|
|TPA|SFO|         381|
|ORD|PDX|        2762|
|ANC|OTZ|         671|
+---+---+------------+
only showing top 5 rows
```

### 3.5 Implementing PageRank

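```python
# Alias Spark's sum so it does not shadow Python's built-in sum()
from pyspark.sql.functions import col, lit, sum as spark_sum
```

```python
# Start every airport with a rank of 1.0
airports_df = airports_df.withColumn("rank", lit(1.0))
```

```python
damping_factor = 0.85
max_iter = 10
```

```python
for iteration in range(max_iter):
    # Join each edge with the current rank of its source airport
    ranks_with_edges = edges_df.join(airports_df, edges_df.src == airports_df.id) \
                               .select("dst", "rank")

    # Sum the incoming rank for each destination airport.
    # Note: every source passes its full rank along each outgoing route
    # (no division by out-degree, and flight_count weights are not used).
    contributions = ranks_with_edges.groupBy("dst") \
                                    .agg(spark_sum("rank").alias("contributions"))

    # Apply the damping factor to obtain the new ranks
    airports_df = airports_df.join(contributions, airports_df.id == contributions.dst, "left_outer") \
                             .select(airports_df.id,
                                     (col("contributions") * damping_factor + (1 - damping_factor)).alias("rank"))

    # Airports without incoming edges get a null rank from the left join; assign the base value
    airports_df = airports_df.na.fill(1.0 - damping_factor, ["rank"])
```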

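The loop above passes each airport's full rank to every destination it serves without dividing by the number of outgoing routes, so its scores are not normalized and can keep growing across iterations instead of converging. In the usual formulation (shown here in the variant with a constant $1 - d$ term, matching the loop's damping step), each airport $u$ splits its rank across its outgoing routes, optionally in proportion to an edge weight $w_{uv}$ such as `flight_count`:

$$PR(v) = (1 - d) + d \sum_{u \to v} PR(u) \, \frac{w_{uv}}{\sum_{x} w_{ux}}$$

With all weights equal to 1, the fraction reduces to $1/\text{outdeg}(u)$. The pure-Python sketch below implements this weighted update on a small hypothetical route graph; the function name `weighted_pagerank`, the `routes` data, and the early-stopping tolerance `tol` are assumptions for illustration, not part of the original notebook.

```python
def weighted_pagerank(edges, damping=0.85, max_iter=100, tol=1e-9):
    """Weighted PageRank sketch (variant where every node gets a (1 - d) base).

    edges: dict mapping (src, dst) -> positive weight, e.g. flight counts.
    Each source splits its rank across its out-edges in proportion to weight.
    Dangling nodes (no outgoing edges) receive rank but pass none on.
    """
    nodes = {n for edge in edges for n in edge}

    # Total outgoing weight per node
    out_weight = {n: 0.0 for n in nodes}
    for (src, _), w in edges.items():
        out_weight[src] += w

    ranks = {n: 1.0 for n in nodes}  # same initial rank as the Spark loop
    for _ in range(max_iter):
        contrib = {n: 0.0 for n in nodes}
        for (src, dst), w in edges.items():
            contrib[dst] += ranks[src] * w / out_weight[src]
        new_ranks = {n: (1 - damping) + damping * contrib[n] for n in nodes}
        # Stop early once the total absolute change falls below the tolerance
        delta = sum(abs(new_ranks[n] - ranks[n]) for n in nodes)
        ranks = new_ranks
        if delta < tol:
            break
    return ranks


# Hypothetical route weights (flight counts); not real data
routes = {
    ("ATL", "ORD"): 2,
    ("ORD", "ATL"): 1,
    ("ORD", "DEN"): 1,
    ("DEN", "ATL"): 3,
    ("ATL", "DEN"): 2,
}
ranks = weighted_pagerank(routes)
for airport, score in sorted(ranks.items(), key=lambda kv: -kv[1]):
    print(f"{airport}: {score:.4f}")
```

Because every airport in this toy graph has at least one outgoing route, the scores keep a constant total (3.0 for three airports) at every iteration; in the full network, airports with no outgoing routes would leak rank unless handled separately.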

```python
airports_df.orderBy("rank", ascending=False).show(5)
```

```
+---+--------------------+
| id|                rank|
+---+--------------------+
|ATL|8.291052955909923...|
|ORD|8.126526736172300...|
|DEN|8.029572613266470...|
|DFW|7.897698069076595...|
|CLT|7.442846123136857...|
+---+--------------------+
only showing top 5 rows
```

Airport names for the IATA codes:

- ATL: Hartsfield-Jackson Atlanta International Airport
- ORD: Chicago O'Hare International Airport
- DEN: Denver International Airport
- DFW: Dallas/Fort Worth International Airport
- CLT: Charlotte Douglas International Airport
- MSP: Minneapolis-Saint Paul International Airport
- LAS: McCarran International Airport (Las Vegas)
- DTW: Detroit Metropolitan Wayne County Airport
- IAH: George Bush Intercontinental Airport (Houston)
- MCO: Orlando International Airport
- PHX: Phoenix Sky Harbor International Airport
- EWR: Newark Liberty International Airport
- LAX: Los Angeles International Airport
- PHL: Philadelphia International Airport
- DCA: Ronald Reagan Washington National Airport
- SLC: Salt Lake City International Airport
- FLL: Fort Lauderdale-Hollywood International Airport
- BWI: Baltimore/Washington International Thurgood Marshall Airport
- SEA: Seattle-Tacoma International Airport
- SFO: San Francisco International Airport


### 3.6 Using Gephi

```python
# Export the final PageRank values as the Gephi node table.
# coalesce(1) writes a single part file, so the merged CSV keeps one header row.
airports_df.select("id", "rank").coalesce(1).write.csv("../airports_temp.csv", header=True)
```

```python
# Gephi's edge table expects "Source" and "Target" columns.
# Rename into a new DataFrame so edges_df keeps its src/dst columns for later cells.
gephi_edges_df = edges_df.withColumnRenamed("src", "Source").withColumnRenamed("dst", "Target")
gephi_edges_df.coalesce(1).write.csv("../edges_temp.csv", header=True)
```

#### Data Preparation for Gephi Visualization

Run these commands in the directory that contains the exported folders (the notebook's parent directory, given the `../` paths above).

1. Set permissions for `edges_temp.csv` and `airports_temp.csv`:

   ```bash
   sudo chmod -R 777 edges_temp.csv
   sudo chmod -R 777 airports_temp.csv
   ```

2. Concatenate the part files inside each output directory into a single CSV (`edges.csv` and `airports.csv`). Spark saves a DataFrame as a directory of part files, one per partition, so they must be merged before Gephi can import them:

   ```bash
   cat edges_temp.csv/*.csv > edges.csv
   cat airports_temp.csv/*.csv > airports.csv
   ```

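`cat` copies the header row of every part file it concatenates, so the merged file has a single header only when each output directory holds exactly one part file (which `coalesce(1)` ensures). As an alternative, here is a minimal Python sketch that merges the part files while keeping just one header. The helper name `merge_spark_csv_parts` is hypothetical, and it assumes each part file was written with `header=True` and matches Spark's usual `part-*.csv` naming.

```python
import csv
import glob
import os


def merge_spark_csv_parts(part_dir, out_path):
    """Merge Spark's part-*.csv files into one CSV with a single header row.

    Assumes every part file starts with the same header (header=True).
    Empty part files, if any, are skipped. Returns the number of part files found.
    """
    part_files = sorted(glob.glob(os.path.join(part_dir, "part-*.csv")))
    header_written = False
    with open(out_path, "w", newline="") as out_file:
        writer = csv.writer(out_file)
        for part in part_files:
            with open(part, newline="") as in_file:
                reader = csv.reader(in_file)
                header = next(reader, None)
                if header is None:
                    continue  # empty part file
                if not header_written:
                    writer.writerow(header)
                    header_written = True
                writer.writerows(reader)  # data rows only; this file's header was consumed
    return len(part_files)


# Usage, with the directory written by the edges export cell:
# merge_spark_csv_parts("edges_temp.csv", "edges.csv")
```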
### 3.7 Using Plotly to Show the Top 50 Airports

```python
import networkx as nx  # used to build the graph in the next cells

# Keep the 50 airports with the highest PageRank
top_airports_df = airports_df.orderBy("rank", ascending=False).limit(50)

# Collect the top 50 airport IDs to the driver as a Python list
top_airport_ids = [row["id"] for row in top_airports_df.select("id").collect()]

# Keep only edges whose source and destination are both among the top 50 airports
top_edges_df = edges_df.filter(edges_df.src.isin(top_airport_ids) & edges_df.dst.isin(top_airport_ids))
```


```python
# Convert the (small) Spark DataFrames to pandas DataFrames
top_airports_pd = top_airports_df.toPandas()
top_edges_pd = top_edges_df.toPandas()

# Build a directed graph from top_airports_pd and top_edges_pd
G = nx.DiGraph()

# Add nodes with their PageRank score as an attribute
for _, row in top_airports_pd.iterrows():
    G.add_node(row['id'], rank=float(row['rank']))

# Add directed edges, weighted by the number of flights on each route
for _, row in top_edges_pd.iterrows():
    G.add_edge(row['src'], row['dst'], weight=int(row['flight_count']))

# Save the subgraph in GEXF format, which Gephi can open directly
nx.write_gexf(G, "../graph.gexf")
```

```python
import plotly.graph_objs as go
import networkx as nx

# Compute 2-D node positions with a force-directed layout (seeded for reproducibility)
pos = nx.spring_layout(G, seed=42)

# Edges: a single line trace; None separates consecutive edges
edge_x = []
edge_y = []
for edge in G.edges():
    x0, y0 = pos[edge[0]]
    x1, y1 = pos[edge[1]]
    edge_x.extend([x0, x1, None])
    edge_y.extend([y0, y1, None])

edge_trace = go.Scatter(
    x=edge_x, y=edge_y,
    line=dict(width=0.5, color='#888'),
    hoverinfo='none',
    mode='lines')

# Nodes: position, hover label, size (PageRank) and color (number of connections).
# Sizes are read per node from G so they always line up with the node order.
node_x = []
node_y = []
node_text = []
node_sizes = []
node_degrees = []
max_rank = max(rank for _, rank in G.nodes(data='rank'))
for node in G.nodes():
    x, y = pos[node]
    node_x.append(x)
    node_y.append(y)
    rank = G.nodes[node]['rank']
    node_text.append(f"{node}: rank {rank:.3g}, {G.degree(node)} connections")
    # Scale marker size between 10 and 30 relative to the highest rank
    node_sizes.append(10 + 20 * (rank / max_rank))
    node_degrees.append(G.degree(node))

node_trace = go.Scatter(
    x=node_x, y=node_y,
    mode='markers',
    hoverinfo='text',
    text=node_text,
    marker=dict(
        showscale=True,
        colorscale='YlGnBu',
        reversescale=True,
        color=node_degrees,  # color encodes the number of connections
        size=node_sizes,     # size encodes PageRank
        colorbar=dict(
            thickness=15,
            title=dict(text='Node Connections', side='right'),
            xanchor='left'
        ),
        line_width=2))

# Draw the graph
fig = go.Figure(data=[edge_trace, node_trace],
                layout=go.Layout(
                    title=dict(text='Top 50 Airports by PageRank', font=dict(size=16)),
                    showlegend=False,
                    hovermode='closest',
                    margin=dict(b=20, l=5, r=5, t=40),
                    xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
                    yaxis=dict(showgrid=False, zeroline=False, showticklabels=False))
                )
```

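The edge trace above relies on a Plotly convention: all edges are drawn as one line trace, and the `None` appended after each edge's two endpoints breaks the line so consecutive edges are not joined. The standalone sketch below isolates that step; the helper name `edge_segments` and the node positions are hypothetical stand-ins for the output of `nx.spring_layout`.

```python
def edge_segments(pos, edges):
    """Flatten edges into x/y coordinate lists for a single Plotly line trace.

    pos: dict mapping node -> (x, y); edges: iterable of (u, v) pairs.
    A None after each edge creates a gap, so each edge is drawn as its own segment.
    """
    xs, ys = [], []
    for u, v in edges:
        (x0, y0), (x1, y1) = pos[u], pos[v]
        xs.extend([x0, x1, None])
        ys.extend([y0, y1, None])
    return xs, ys


# Hypothetical layout positions, standing in for nx.spring_layout(G)
pos = {"ATL": (0.0, 0.0), "ORD": (1.0, 0.5), "DEN": (-0.5, 1.0)}
xs, ys = edge_segments(pos, [("ATL", "ORD"), ("ORD", "DEN")])
print(xs)  # [0.0, 1.0, None, 1.0, -0.5, None]
```

Building one trace with gaps keeps the figure light compared with creating a separate trace per edge, which matters once the subgraph has hundreds of routes.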


```python
fig.write_html("../test1.html")
```