Rename this file with your name.

For the following questions, use the two files from Lecture 8 (\lecture8\Data\). If you ran the Lecture 8 code successfully, the required files should already be uploaded to your Databricks workspace. Otherwise, upload the two CSV files to DBFS. Note that I had the two CSV files inside the "/FileStore/tables/" folder: "airport_codes-e62ed.csv" and "departuredelays.csv".

Once you have the files, execute the following code block (once) to create a graph.

In [2]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from graphframes import *

# Set File Paths
airportsnaFilePath = "/FileStore/tables/airport_codes-e62ed.csv"
tripdelaysFilePath = "/FileStore/tables/departuredelays.csv"


airportsSchema = StructType([StructField("city",StringType(), True), \
                             StructField("state", StringType(), True),\
                             StructField("country", StringType(), True), \
                             StructField("iata", StringType(), True)])


# Obtain airports dataset
airportna = spark.read.format("csv"). \
              option("header", "true"). \
              schema(airportsSchema). \
              load(airportsnaFilePath)


# Obtain departure Delays data

departureDelaysSchema = StructType([StructField("id", IntegerType(), True), \
                             StructField("delay", IntegerType(), True),\
                             StructField("distance", IntegerType(), True), \
                             StructField("src", StringType(), True), \
                             StructField("dst", StringType(), True)])

departureDelays = spark.read.format("csv"). \
              option("header", "true"). \
              schema(departureDelaysSchema). \
              load(tripdelaysFilePath)

#joined dataframe

joinedDf = departureDelays.join(airportna, departureDelays.dst == airportna.iata, how='left')

tripVertices = airportna.withColumnRenamed("iata", "id")

tripEdges = joinedDf.select(joinedDf['id'], joinedDf['delay'], joinedDf['distance'],joinedDf['src'],joinedDf['dst'],joinedDf['city'], joinedDf['state'])

tripGraph = GraphFrame(tripVertices, tripEdges)

tripGraph.vertices.show(2)
tripGraph.edges.show(2)

Question 1: Find the number of flights that depart from SFO and arrive in ATL with a delay of more than 300 minutes.

Possible output: 2

In [4]:
display(tripGraph.edges.filter("src = 'SFO' and dst = 'ATL' and delay > 300"))


#What destinations tend to have significant delays departing from SFO

#display(tripGraph.edges.filter("src = 'SFO' and delay > 300"))

id,delay,distance,src,dst,city,state
1301204,329,1859,SFO,ATL,Atlanta,GA
2122356,376,1859,SFO,ATL,Atlanta,GA


Question 2: Show the top 4 routes that have the longest distance between two airports.

Possible answer: 


|src|dst|max(distance)|
|---|---|-------------|
|HNL|JFK|         4330|
|JFK|HNL|         4330|
|HNL|EWR|         4312|
|EWR|HNL|         4312|

In [6]:
# Longest distance per route (src, dst), largest first
longestRoutes = tripGraph.edges.groupBy("src", "dst").max("distance")
display(longestRoutes.orderBy(desc("max(distance)")).limit(4))


src,dst,max(distance)
HNL,JFK,4330
JFK,HNL,4330
HNL,EWR,4312
EWR,HNL,4312


Question 3: Find the average delay for flights departing from Atlanta (ATL) to each other airport. Show only the top 4 routes, sorted by average delay in descending order.

Possible output:



|src|dst|        avg(delay)|
|---|---|------------------|
|ATL|ONT|              59.4|
|ATL|TTN| 38.98275862068966|
|ATL|GRB|30.288888888888888|
|ATL|PWM|30.023529411764706|

In [8]:
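from pyspark.sql.functions import avg, desc
# Average delay per route leaving ATL, largest average first
atlDelays = tripGraph.edges.filter("src = 'ATL'").groupBy("src", "dst").agg(avg("delay"))
display(atlDelays.orderBy(desc("avg(delay)")).limit(4))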
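The grouping logic behind this question can be sketched in plain Python. This is only an illustration on made-up toy flights (the `toy_flights` list and its delays are hypothetical, not taken from departuredelays.csv): keep the flights leaving ATL, average the delay per (src, dst) route, then sort from the largest average down, as in the possible output above.

```python
from collections import defaultdict

# Hypothetical toy flights as (src, dst, delay); not the lecture dataset.
toy_flights = [
    ("ATL", "ONT", 60),
    ("ATL", "ONT", 40),
    ("ATL", "PWM", 30),
    ("SFO", "ATL", 500),
    ("ATL", "GRB", 10),
]

# Collect delays per route, keeping only flights that depart from ATL.
delays_by_route = defaultdict(list)
for src, dst, delay in toy_flights:
    if src == "ATL":
        delays_by_route[(src, dst)].append(delay)

# Average delay per route, then the two routes with the largest average.
avg_delay = {route: sum(d) / len(d) for route, d in delays_by_route.items()}
top_routes = sorted(avg_delay.items(), key=lambda kv: kv[1], reverse=True)[:2]

print(top_routes)  # [(('ATL', 'ONT'), 50.0), (('ATL', 'PWM'), 30.0)]
```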


Question 4: Find the number of incoming flights to SFO airport. 

Possible answer:


| id|inDegree|
|---|--------|
|SFO|   38988|

In [10]:
# inDegrees is a DataFrame property (columns: id, inDegree), so filter it by vertex id
display(tripGraph.inDegrees.filter("id = 'SFO'"))
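As a plain-Python illustration of what an in-degree is (a sketch on made-up toy edges, not the lecture dataset and not the GraphFrames API): the in-degree of an airport is the number of edges whose destination is that airport.

```python
from collections import Counter

# Hypothetical toy edges as (src, dst) pairs; not taken from departuredelays.csv.
toy_edges = [
    ("SEA", "SFO"),
    ("ATL", "SFO"),
    ("SFO", "ATL"),
    ("SEA", "SFO"),
]

# In-degree = number of edges arriving at each airport (count by dst).
in_degree = Counter(dst for _, dst in toy_edges)

print(in_degree["SFO"])  # 3
print(in_degree["ATL"])  # 1
```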

Question 5: First, calculate the shortest paths from the different airports to SFO. Then filter the results so that they contain only the airports (i.e., include id, city, state and distances) that do not have any connection to SFO (try size(distances) = 0). Then sort the airports by ID and show the first 5 airports (you are allowed to use multiple DataFrames/GraphFrames/lines).


| id|         city|state|distances|
|---|-------------|-----|---------|
|ABR|     Aberdeen|   SD|       []|
|ACK|    Nantucket|   MA|       []|
|ACY|Atlantic City|   NJ|       []|
|AHN|       Athens|   GA|       []|
|AIA|     Alliance|   NE|       []|
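To see why an airport with no connection to SFO ends up with an empty `distances` value, as in the table above, here is a minimal plain-Python sketch. It assumes a made-up toy graph (`toy_vertices`, `toy_edges`) and a hypothetical helper `distances_to`; it is not the GraphFrames implementation, only a breadth-first search that records hop counts to one landmark and leaves the map empty when the landmark cannot be reached.

```python
from collections import deque

# Hypothetical toy graph; directed edges as (src, dst). Not the lecture dataset.
toy_vertices = ["SFO", "SEA", "ATL", "ABR"]
toy_edges = [("SEA", "SFO"), ("ATL", "SEA"), ("SFO", "ATL")]


def distances_to(landmark, vertices, edges):
    """Return {vertex: {landmark: hops}}; the inner dict is empty if unreachable."""
    # Walk edges backwards from the landmark so a single BFS covers every source.
    incoming = {v: [] for v in vertices}
    for src, dst in edges:
        incoming[dst].append(src)

    hops = {landmark: 0}
    queue = deque([landmark])
    while queue:
        current = queue.popleft()
        for prev in incoming[current]:
            if prev not in hops:
                hops[prev] = hops[current] + 1
                queue.append(prev)

    return {v: ({landmark: hops[v]} if v in hops else {}) for v in vertices}


result = distances_to("SFO", toy_vertices, toy_edges)

# Airports with no path to SFO have an empty map, i.e. size 0.
unreachable = sorted(v for v, d in result.items() if len(d) == 0)
print(unreachable)  # ['ABR']
```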

Question 6: Using motifs, find the number of flights that depart from SEA and have a delay of more than 500 minutes.

Possible answer: 6

In [14]:
# Motif: an edge e1 from vertex a to vertex b; keep flights leaving SEA with delay > 500
tripGraph.find("(a)-[e1]->(b)").filter("a.id = 'SEA' and e1.delay > 500").count()
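Conceptually, a single-edge motif yields one match per edge together with its source vertex and destination vertex, and the filter then runs over those matches. A rough plain-Python sketch of that idea, using made-up toy data (`toy_vertices`, `toy_edges` and the delays are hypothetical) rather than the GraphFrames API:

```python
# Hypothetical toy data; not taken from the lecture files.
toy_vertices = {"SEA": "Seattle", "SFO": "San Francisco", "ATL": "Atlanta"}
toy_edges = [
    {"src": "SEA", "dst": "SFO", "delay": 620},
    {"src": "SEA", "dst": "ATL", "delay": 45},
    {"src": "SFO", "dst": "SEA", "delay": 710},
    {"src": "SEA", "dst": "ATL", "delay": 501},
]

# One match per edge whose endpoints are both known vertices: (a) -[e]-> (b).
matches = [
    {"a": e["src"], "e": e, "b": e["dst"]}
    for e in toy_edges
    if e["src"] in toy_vertices and e["dst"] in toy_vertices
]

# Keep matches that start at SEA and are delayed by more than 500 minutes.
late_from_sea = [m for m in matches if m["a"] == "SEA" and m["e"]["delay"] > 500]

print(len(late_from_sea))  # 2
```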
