In [1]:
sc.addPyFile("/opt/spark/jars/graphframes-0.8.2-spark3.2-s_2.12.jar")

In [2]:
import numpy as np
from graphframes import GraphFrame
from pyspark.sql.functions import *

In [3]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

### Read departuredelays.csv in Edge DataFrame
### Read airport-codes-na.txt in Vertix DataFrame (the separator is Tab i.e sep = '\t' )

#### The US flight delays data set has five columns:
- The <b>date</b> column contains an integer like 02190925 . When converted, this maps to 02-19 09:25 am.
- The <b>delay</b> column gives the delay in minutes between the scheduled and actual departure times. Early departures show negative numbers.
- The <b>distance</b> column gives the distance in miles from the origin airport to the destination airport.
- The <b>origin</b> column contains the origin IATA airport code.
- The <b>destination</b> column contains the destination IATA airport code.

#### The airport-codes data set has four columns:
- The <b>IATA</b> column contains IATA airport code.
- The <b>City, State, and Country</b> columns contains information about the airport location.

In [4]:
ed = spark.read.csv("/home/el-neshwy/Documents/DataFrameworks/Day7/Lab7/departuredelays.csv", inferSchema=False, header=True)
ver = spark.read.option("delimiter", "\t").csv("/home/el-neshwy/Documents/DataFrameworks/Day7/Lab7/airport-codes-na.txt",header=True)

### In the vertix DataFrame, drop any duplicated rows with the same  IATA code.

In [5]:
ver2 = ver.dropDuplicates(["IATA"])

### In the edges DataFrame:
- Rename the <b>date</b> columns to become <b>tripid</b>.
- Rename the <b>origin</b> columns to become <b>src</b>.
- Rename the <b>destination</b> columns to become <b>dst</b>.

In [6]:
ed2 = ed.withColumnRenamed('date','tripid')
ed3 = ed2.withColumnRenamed('origin', 'src')
ed4 = ed3.withColumnRenamed('destination', 'dst')

### In the Vertix DataFrame:
- Rename the <b>IATA</b> columns to become <b>id</b>.

In [7]:
ver3 = ver2.withColumnRenamed('IATA', 'id')

### Create GraphFrame from Vertix and Edges DataFrames

In [8]:
ver3.show()
ed4.show()
graph = GraphFrame(ver3, ed4)

+-------------+-----+-------+---+
|         City|State|Country| id|
+-------------+-----+-------+---+
|    Allentown|   PA|    USA|ABE|
|      Abilene|   TX|    USA|ABI|
|  Albuquerque|   NM|    USA|ABQ|
|     Aberdeen|   SD|    USA|ABR|
|       Albany|   GA|    USA|ABY|
|    Nantucket|   MA|    USA|ACK|
|         Waco|   TX|    USA|ACT|
|       Eureka|   CA|    USA|ACV|
|Atlantic City|   NJ|    USA|ACY|
|       Kodiak|   AK|    USA|ADQ|
|   Alexandria|   LA|    USA|AEX|
|      Augusta|   GA|    USA|AGS|
|       Athens|   GA|    USA|AHN|
|     Alliance|   NE|    USA|AIA|
|  King Salmon|   AK|    USA|AKN|
|       Albany|   NY|    USA|ALB|
|     Waterloo|   IA|    USA|ALO|
|      Alamosa|   CO|    USA|ALS|
|  Walla Walla|   WA|    USA|ALW|
|     Amarillo|   TX|    USA|AMA|
+-------------+-----+-------+---+
only showing top 20 rows

+--------+-----+--------+---+---+
|  tripid|delay|distance|src|dst|
+--------+-----+--------+---+---+
|01011245|    6|     602|ABE|ATL|
|01020600|   -8|     3



### Determine the number of airports

In [9]:
number_of_airports = graph.vertices.count()
number_of_airports

524

### Determine the number of trips

In [10]:
number_of_trips = graph.edges.count()
number_of_trips

1391578

### What is the longest delay?

In [11]:
longest_delay = graph.edges.sort('delay',ascending=False).first().delay
longest_delay

'995'

In [12]:
graph.edges.sort("delay",ascending = False).show()

+--------+-----+--------+---+---+
|  tripid|delay|distance|src|dst|
+--------+-----+--------+---+---+
|01090600|  995|     462|SMF|SLC|
|03191420|  994|    1590|SJC|ORD|
|01200645|  993|     525|MOT|DEN|
|03230811|   99|     132|ATL|TYS|
|03301756|   99|     925|FLL|EWR|
|03051305|   99|     799|CLT|HOU|
|01011530|   99|    1138|HRL|MSP|
|03301439|   99|     678|ATL|HPN|
|01061605|   99|     599|ATL|IAH|
|02211459|   99|     910|SRQ|LGA|
|01102005|   99|     703|IAH|COS|
|03171605|   99|     966|BDL|TPA|
|03101059|   99|     188|IAH|DAL|
|02071445|   99|     206|TUL|DFW|
|01040807|   99|    1034|IAD|IAH|
|03091755|   99|     519|BHM|DFW|
|01201144|   99|     674|PSP|DEN|
|02010840|   99|     454|XNA|ORD|
|01011800|   99|     122|GGG|DFW|
|03301009|   99|     543|BHM|DTW|
+--------+-----+--------+---+---+
only showing top 20 rows



### Find out the number of delayed flights vs. early flights (flights that departed before actual time)

In [13]:
### delayed flights
delayed_flights = graph.edges.filter(graph.edges.delay < 0).count()
delayed_flights

668729

In [14]:
### early flights
early_flights = graph.edges.filter(graph.edges.delay > 0).count()
early_flights

591727

### What flight destinations departing SFO are most likely to have significant delays? Select the top 10
#### Hint: you should get the average delay for each destination for trips that depart from SFO only

In [15]:
SFO_delays = graph.edges.filter('src = "SFO"').groupBy('dst').agg(avg('delay').alias('avg')).sort('avg',ascending=False)
SFO_delays.show(10)

+---+------------------+
|dst|               avg|
+---+------------------+
|JAC| 30.78846153846154|
|OKC|24.822222222222223|
|SUN|22.696629213483146|
|COS| 22.58888888888889|
|SAT|             22.16|
|STL|         20.203125|
|HNL|19.982608695652175|
|ASE|19.846153846153847|
|CEC|19.089820359281436|
|MDW|18.771929824561404|
+---+------------------+
only showing top 10 rows



### Find the Incoming connections to the airport sorted in Desc. order.

In [16]:
Incoming_Desc = graph.edges.groupBy('dst').agg(count('dst').alias('count')).sort('count',ascending=False)
Incoming_Desc.show()

+---+-----+
|dst|count|
+---+-----+
|ATL|90434|
|DFW|66050|
|ORD|61967|
|LAX|53601|
|DEN|50921|
|IAH|42700|
|PHX|39721|
|SFO|38988|
|LAS|32994|
|CLT|28388|
|MCO|27959|
|EWR|27652|
|LGA|25469|
|BOS|25360|
|SLC|25323|
|JFK|23484|
|DTW|23310|
|SEA|23074|
|MSP|22385|
|MIA|21805|
+---+-----+
only showing top 20 rows



### Find the Outgoing connections from the airport sorted in Desc. order.

In [17]:
Outgoing_Desc = graph.edges.groupBy('src').agg(count('src').alias('count')).sort('count',ascending=False)
Outgoing_Desc.show()

+---+-----+
|src|count|
+---+-----+
|ATL|91484|
|DFW|68482|
|ORD|64228|
|LAX|54086|
|DEN|53148|
|IAH|43361|
|PHX|40155|
|SFO|39483|
|LAS|33107|
|CLT|28402|
|MCO|28313|
|EWR|27656|
|SLC|25868|
|LGA|25458|
|BOS|25348|
|MSP|24031|
|JFK|23572|
|DTW|23421|
|SEA|23078|
|MIA|21817|
+---+-----+
only showing top 20 rows



23/10/08 15:10:01 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


### Use motif finding to answer this question: which delays could we blame on SFO?
#### Hint: this practically means that SFO is a transit station

In [18]:
blame_on_SFO = graph.find('(v1)-[e1]->(SFO);(SFO)-[e2]->(v2)')\
.select('v1.id','e1.delay','SFO.id','e2.delay','v2.id')\
.filter('SFO.id = "SFO" and e2.delay > 0')
blame_on_SFO.show()



+---+-----+---+-----+---+
| id|delay| id|delay| id|
+---+-----+---+-----+---+
|ABQ|   -7|SFO|   55|JFK|
|ABQ|   -7|SFO|  134|DFW|
|ABQ|   -7|SFO|   32|ORD|
|ABQ|   -7|SFO|    3|DFW|
|ABQ|   -7|SFO|  124|ORD|
|ABQ|   -7|SFO|  139|LAX|
|ABQ|   -7|SFO|  133|JFK|
|ABQ|   -7|SFO|  113|ORD|
|ABQ|   -7|SFO|    8|LAX|
|ABQ|   -7|SFO|   18|MIA|
|ABQ|   -7|SFO|    2|DFW|
|ABQ|   -7|SFO|    9|ORD|
|ABQ|   -7|SFO|  326|ORD|
|ABQ|   -7|SFO|    1|DFW|
|ABQ|   -7|SFO|   34|ORD|
|ABQ|   -7|SFO|    1|DFW|
|ABQ|   -7|SFO|  190|ORD|
|ABQ|   -7|SFO|    9|LAX|
|ABQ|   -7|SFO|  111|JFK|
|ABQ|   -7|SFO|  103|DFW|
+---+-----+---+-----+---+
only showing top 20 rows



### Determine Airport Ranking in Desc. order using PageRank algorithm

In [19]:
sampled_vertices = graph.filterVertices(f"rand() < {0.2}").vertices
sampled_edges = graph.filterEdges(f"rand() < {0.2}").edges

new_graph = GraphFrame(sampled_vertices,sampled_edges)

In [20]:
Airport_Ranking = new_graph.pageRank(resetProbability=0.15, maxIter=10).vertices.orderBy('pagerank', ascending=False)
Airport_Ranking.show()

+------------+-----+-------+---+-------------------+
|        City|State|Country| id|           pagerank|
+------------+-----+-------+---+-------------------+
|  Fort Myers|   FL|    USA|RSW|  2.455802452294041|
| Kansas City|   MO|    USA|MCI| 1.1249698059223985|
| Bakersfield|   CA|    USA|BFL| 0.3586020942149372|
|    Montreal|   PQ| Canada|YUL|0.23164307824414976|
|  Mason City|   IA|    USA|MCW|0.23164307824414976|
|        Elko|   NV|    USA|EKO|0.23164307824414976|
|     Laramie|   WY|    USA|LAR|0.23164307824414976|
|  Farmington|   NM|    USA|FMN|0.23164307824414976|
|         Eek|   AK|    USA|EEK|0.23164307824414976|
| Grand Forks|   ND|    USA|GFK|0.23164307824414976|
|      Casper|   WY|    USA|CPR|0.23164307824414976|
|     Hyannis|   MA|    USA|HYA|0.23164307824414976|
|Medicine Hat|   AB| Canada|YXH|0.23164307824414976|
+------------+-----+-------+---+-------------------+



## Determine the most popular flights (single city hops)

In [21]:
popular_flights = graph.edges.groupBy('src','dst').agg(count('*').alias('count')).sort('count',ascending=False)
popular_flights.show()

+---+---+-----+
|src|dst|count|
+---+---+-----+
|SFO|LAX| 3232|
|LAX|SFO| 3198|
|LAS|LAX| 3016|
|LAX|LAS| 2964|
|JFK|LAX| 2720|
|LAX|JFK| 2719|
|ATL|LGA| 2501|
|LGA|ATL| 2500|
|LAX|PHX| 2394|
|PHX|LAX| 2387|
|HNL|OGG| 2380|
|OGG|HNL| 2379|
|LAX|SAN| 2215|
|SAN|LAX| 2214|
|SJC|LAX| 2208|
|LAX|SJC| 2201|
|ATL|MCO| 2136|
|MCO|ATL| 2090|
|JFK|SFO| 2084|
|SFO|JFK| 2084|
+---+---+-----+
only showing top 20 rows



### Find and Save a Subragph that obtained from the following pattern:
#### The flight starts from an airport and return back to the same airport through 2 other airports.

In [28]:
sampled_vertices = graph.filterVertices(f"rand() < {0.1}").vertices
sampled_edges = graph.filterEdges(f"rand() < {0.1}").edges

new_graph = GraphFrame(sampled_vertices,sampled_edges)

In [29]:
flight_from_airport_return_2_airports = new_graph.find('(v1)-[e1]->(v2);(v2)-[e2]->(v3);(v3)-[e3]->(v1)')

In [30]:
e1 = flight_from_airport_return_2_airports.select("e1.src", "e1.dst", "e1.tripid", "e1.delay", "e1.distance")
e2 = flight_from_airport_return_2_airports.select("e2.src", "e2.dst", "e2.tripid", "e2.delay", "e2.distance")
e3 = flight_from_airport_return_2_airports.select("e3.src", "e3.dst", "e3.tripid", "e3.delay", "e3.distance")
eset1 = e1.union(e2)
eset2= eset1.union(e3)
e = eset2.dropDuplicates()

In [31]:
v1 = flight_from_airport_return_2_airports.select("v1.id", "v1.Country", "v1.State", "v1.City")
v2 = flight_from_airport_return_2_airports.select("v2.id", "v2.Country", "v2.State", "v2.City")
v3 = flight_from_airport_return_2_airports.select("v3.id", "v3.Country", "v3.State", "v3.City")
vset1 = v1.union(v2)
vset2 = vset1.union(v3)
v = vset2.dropDuplicates()

In [32]:
sub_graph = GraphFrame(v,e)

In [33]:
sub_graph.edges.show()

23/10/08 15:10:27 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

+---+---+--------+-----+--------+
|src|dst|  tripid|delay|distance|
+---+---+--------+-----+--------+
|PHX|DCA|01050945|    3|    1719|
|ATL|DCA|02081320|   -3|     475|
|PHX|PDX|01221935|    3|     877|
|TUS|ATL|01131352|  111|    1339|
|DCA|ATL|02261400|   -6|     475|
|PVD|ATL|02191807|    0|     785|
|DCA|ATL|01291900|  175|     475|
|ATL|TUL|02202008|    0|     586|
|CMH|ATL|02030705|   -1|     388|
|DCA|ATL|02252000|   -9|     475|
|PHX|ABQ|02161335|   -1|     285|
|PDX|ATL|02110915|   -5|    1888|
|ATL|CMH|03041729|   14|     388|
|TUL|ATL|02131029|    0|     586|
|PHX|ABQ|03301435|   25|     285|
|TUL|PHX|03070630|   -3|     813|
|ATL|CMH|02101510|   -1|     388|
|PHX|PDX|01270850|   25|     877|
|ATL|PHX|02081425|   -3|    1379|
|PHX|ATL|01130700|   -4|    1379|
+---+---+--------+-----+--------+
only showing top 20 rows




                                                                                