# Lab 7 - Graphs 

In this lab, we'll be using GraphFrames, a Spark package for graph representation and manipulation. GraphFrames provides largely the same functionality as the GraphX library but is built on DataFrames. This also lets us combine graph queries (such as motif finding) with ordinary DataFrame and Spark SQL queries. The documentation can be found [here](https://graphframes.github.io/graphframes/docs/_site/user-guide.html). 

After setup, we will first build a small example graph manually to understand its components and the attributes a graph gives us access to. We will then build a bigger but similar graph from a large data file.  

This lab assumes a certain level of familiarity with lab 5, so please make sure you've gone over that first! 

## Section 1 - Set up

In [1]:
# Installing Java and Spark:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://dlcdn.apache.org/spark/spark-3.2.2/spark-3.2.2-bin-hadoop2.7.tgz
!tar xf spark-3.2.2-bin-hadoop2.7.tgz
!rm spark-3.2.2-bin-hadoop2.7.tgz   # Tidying up

--2022-11-08 09:34:24--  https://dlcdn.apache.org/spark/spark-3.2.2/spark-3.2.2-bin-hadoop2.7.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 272846416 (260M) [application/x-gzip]
Saving to: ‘spark-3.2.2-bin-hadoop2.7.tgz’


2022-11-08 09:34:25 (268 MB/s) - ‘spark-3.2.2-bin-hadoop2.7.tgz’ saved [272846416/272846416]



In [2]:
# Setting up our environment variables: 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.2-bin-hadoop2.7"

In [3]:
!pip install -q findspark
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc
from pyspark import SparkConf

In [5]:
# Configuring GraphFrames:
conf = SparkConf()
conf.set("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.2-s_2.12")
conf.set("spark.sql.repl.eagerEval.enabled", True) #  This will format our output tables a bit nicer when not using the show() method

# Starting our Spark Session:
spark = SparkSession.builder.master("local[*]").config(conf=conf).getOrCreate()
spark

In [6]:
from graphframes import GraphFrame

## Section 2 - Small Graph

In this section we will create and manipulate a small graph representing airports. We can create our GraphFrame from **vertex** and **edge** DataFrames.

We define our airports as **vertices**. Each vertex DataFrame should have a "special column" named `id` which specifies the unique id of that vertex (in this case, the airport). Vertices can also have properties or attributes associated with them. In our example, this will be the three-character airport code, which we will refer to as `name`. 

In [None]:
# Defining our vertex DataFrame:
v = spark.createDataFrame([(1, "SFO"), 
                           (2, "ORD"), 
                           (3, "DFW"), 
                           (4, "CLT"), 
                           (5, "PHL"),
                           (6, "CLE"),
                           (7, "BOS")], 
                          ["id", "name"])

Our **edges** are the routes between these airports. Each edge DataFrame should contain two special columns called `src` (our source vertex id) and `dst` (our destination vertex id). 

Our edges can also have properties - in this case it will be the `distance` between the source and destination airport vertices. 

In [None]:
# Defining our edge DataFrame:
e = spark.createDataFrame([(1, 2, 1800),
                           (2, 3, 1400), 
                           (3, 2, 900), 
                           (6, 3, 1200),
                           (5, 6, 500),
                           (5, 4, 600), 
                           (4, 1, 1500),
                           (1, 5, 2300)], 
                          ["src", "dst", "distance"])

In [None]:
# Creating our GraphFrame:
g = GraphFrame(v, e)
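
Before relying on a graph built this way, it can help to check that the two tables agree with each other, i.e. that every `src` and `dst` refers to an existing vertex `id`. The following is a minimal plain-Python sketch of that check on the same toy data (no Spark involved). The names `dangling` and `isolated` are our own and are not part of GraphFrames.

```python
# Plain-Python copy of the toy graph defined in the cells above.
vertices = [(1, "SFO"), (2, "ORD"), (3, "DFW"), (4, "CLT"),
            (5, "PHL"), (6, "CLE"), (7, "BOS")]
edges = [(1, 2, 1800), (2, 3, 1400), (3, 2, 900), (6, 3, 1200),
         (5, 6, 500), (5, 4, 600), (4, 1, 1500), (1, 5, 2300)]

vertex_ids = {vid for vid, _ in vertices}

# Edges whose src or dst does not match any vertex id.
dangling = [(s, d) for s, d, _ in edges
            if s not in vertex_ids or d not in vertex_ids]

# Vertices that are not touched by any edge.
touched = {s for s, _, _ in edges} | {d for _, d, _ in edges}
isolated = sorted(vertex_ids - touched)

print("dangling edges:", dangling)
print("isolated vertices:", isolated)
```

In this toy graph no edge is dangling, but vertex 7 (BOS) has no routes at all, which is worth keeping in mind when reading the degree tables later in this section.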

Now we can look at some basic graph queries:

In [None]:
# We can show what vertices and edges we have: 
g.vertices.show()
g.edges.show()

+---+----+
| id|name|
+---+----+
|  1| SFO|
|  2| ORD|
|  3| DFW|
|  4| CLT|
|  5| PHL|
|  6| CLE|
|  7| BOS|
+---+----+

+---+---+--------+
|src|dst|distance|
+---+---+--------+
|  1|  2|    1800|
|  2|  3|    1400|
|  3|  2|     900|
|  6|  3|    1200|
|  5|  6|     500|
|  5|  4|     600|
|  4|  1|    1500|
|  1|  5|    2300|
+---+---+--------+



In [None]:
# We can see how many airports and routes there are: 
print("Number of airports: {}".format(g.vertices.count()))
print("Number of routes: {}".format(g.edges.count()))

Number of airports: 7
Number of routes: 8


In [None]:
# We can apply filters e.g. to see how many routes are over 1000 miles long:
g.edges.filter("distance > 1000").count()

5

In [None]:
# We can also see the in-degree and out-degree of each vertex:
g.inDegrees.show()
g.outDegrees.show()

+---+--------+
| id|inDegree|
+---+--------+
|  3|       2|
|  2|       2|
|  6|       1|
|  5|       1|
|  1|       1|
|  4|       1|
+---+--------+

+---+---------+
| id|outDegree|
+---+---------+
|  6|        1|
|  1|        2|
|  3|        1|
|  2|        1|
|  5|        2|
|  4|        1|
+---+---------+
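
As a cross-check of the two tables above, the degrees can be recomputed by hand: the in-degree of a vertex is the number of edges that name it as `dst`, and the out-degree is the number of edges that name it as `src`. This is a small plain-Python sketch on the toy edge list, not the way GraphFrames computes them internally.

```python
from collections import Counter

# Toy edge list from the cells above: (src, dst, distance).
edges = [(1, 2, 1800), (2, 3, 1400), (3, 2, 900), (6, 3, 1200),
         (5, 6, 500), (5, 4, 600), (4, 1, 1500), (1, 5, 2300)]

in_degree = Counter(dst for _, dst, _ in edges)
out_degree = Counter(src for src, _, _ in edges)

print("in-degrees: ", sorted(in_degree.items()))
print("out-degrees:", sorted(out_degree.items()))
```

Vertex 7 (BOS) appears in neither result because it has no edges, which matches its absence from both tables above.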



In [None]:
# And we can apply sorting to the results of these queries: 
g.outDegrees.sort(['outDegree'], ascending=True).show()

+---+---------+
| id|outDegree|
+---+---------+
|  6|        1|
|  4|        1|
|  3|        1|
|  2|        1|
|  1|        2|
|  5|        2|
+---+---------+



We can also search for structural patterns within a graph. This is known as "motif finding", and it uses a simple Domain-Specific Language (DSL) for such queries. 

The basic unit of a pattern is an edge. So for example, `"(a)-[e]->(b)"` expresses an edge `e` from vertex `a` to vertex `b`. We can build on this and join patterns using a semicolon (`;`) as shown in the following code cells: 

In [None]:
# Here we'll search for two edges that link three airports 
# i.e. we want to go from a to b to c
motifs = g.find("(a)-[e]->(b); (b)-[e2]->(c)")
motifs.show()

+--------+------------+--------+------------+--------+
|       a|           e|       b|          e2|       c|
+--------+------------+--------+------------+--------+
|{5, PHL}| {5, 4, 600}|{4, CLT}|{4, 1, 1500}|{1, SFO}|
|{4, CLT}|{4, 1, 1500}|{1, SFO}|{1, 2, 1800}|{2, ORD}|
|{6, CLE}|{6, 3, 1200}|{3, DFW}| {3, 2, 900}|{2, ORD}|
|{2, ORD}|{2, 3, 1400}|{3, DFW}| {3, 2, 900}|{2, ORD}|
|{3, DFW}| {3, 2, 900}|{2, ORD}|{2, 3, 1400}|{3, DFW}|
|{1, SFO}|{1, 2, 1800}|{2, ORD}|{2, 3, 1400}|{3, DFW}|
|{5, PHL}| {5, 6, 500}|{6, CLE}|{6, 3, 1200}|{3, DFW}|
|{1, SFO}|{1, 5, 2300}|{5, PHL}| {5, 4, 600}|{4, CLT}|
|{4, CLT}|{4, 1, 1500}|{1, SFO}|{1, 5, 2300}|{5, PHL}|
|{1, SFO}|{1, 5, 2300}|{5, PHL}| {5, 6, 500}|{6, CLE}|
+--------+------------+--------+------------+--------+
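
One way to read the two-edge motif above is as a join of the edge list with itself: keep every pair of edges where the destination of the first equals the source of the second. The sketch below does this in plain Python on the toy edges. It is meant as a mental model only and says nothing about how GraphFrames executes the query.

```python
# Toy edge list from the cells above: (src, dst, distance).
edges = [(1, 2, 1800), (2, 3, 1400), (3, 2, 900), (6, 3, 1200),
         (5, 6, 500), (5, 4, 600), (4, 1, 1500), (1, 5, 2300)]

# "(a)-[e]->(b); (b)-[e2]->(c)": e's destination must be e2's source.
two_hops = [(e, e2) for e in edges for e2 in edges if e[1] == e2[0]]

for e, e2 in two_hops:
    print(e[0], "->", e[1], "->", e2[1])
print("matches:", len(two_hops))
```

Nothing in the pattern forces `a` and `c` to be different vertices, so round trips such as 2 -> 3 -> 2 are counted as matches too, as in the table above.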



In [None]:
# Here we'll search for vertices with edges in both directions between them:
motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
motifs.show()

+--------+------------+--------+------------+
|       a|           e|       b|          e2|
+--------+------------+--------+------------+
|{3, DFW}| {3, 2, 900}|{2, ORD}|{2, 3, 1400}|
|{2, ORD}|{2, 3, 1400}|{3, DFW}| {3, 2, 900}|
+--------+------------+--------+------------+



In [None]:
# We can also omit vertex/edge names when they're not necessary:
motifs = g.find("(a)-[]->(b); (b)-[]->(a)")
motifs.show()

+--------+--------+
|       a|       b|
+--------+--------+
|{3, DFW}|{2, ORD}|
|{2, ORD}|{3, DFW}|
+--------+--------+



In [None]:
# We can also negate edges
# For example, to find uni-directional paths:
motifs = g.find("(a)-[]->(b); !(b)-[]->(a)")
motifs.show()

+--------+--------+
|       a|       b|
+--------+--------+
|{5, PHL}|{4, CLT}|
|{5, PHL}|{6, CLE}|
|{1, SFO}|{2, ORD}|
|{6, CLE}|{3, DFW}|
|{4, CLT}|{1, SFO}|
|{1, SFO}|{5, PHL}|
+--------+--------+
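
The negated motif above keeps an edge only if the reverse edge does not exist. A rough plain-Python equivalent on the toy edges is a set-membership test, sketched here purely as a cross-check of the table:

```python
# Toy edge list from the cells above: (src, dst, distance).
edges = [(1, 2, 1800), (2, 3, 1400), (3, 2, 900), (6, 3, 1200),
         (5, 6, 500), (5, 4, 600), (4, 1, 1500), (1, 5, 2300)]

# Ignore distances and keep only the (src, dst) pairs.
edge_pairs = {(s, d) for s, d, _ in edges}

# "(a)-[]->(b); !(b)-[]->(a)": a->b exists but b->a does not.
one_way = sorted(p for p in edge_pairs if (p[1], p[0]) not in edge_pairs)

print(one_way)
```

The only pairs dropped are 2 -> 3 and 3 -> 2, which are exactly the two rows returned by the bidirectional query earlier.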



We can also apply filters to develop more complex queries. For example, say we want to limit our journey to routes of less than 1000 miles:

In [None]:
motifs = g.find("(a)-[e]->(b)")
motifs.show()

motifs.filter("e.distance < 1000").show()

+--------+------------+--------+
|       a|           e|       b|
+--------+------------+--------+
|{4, CLT}|{4, 1, 1500}|{1, SFO}|
|{6, CLE}|{6, 3, 1200}|{3, DFW}|
|{2, ORD}|{2, 3, 1400}|{3, DFW}|
|{3, DFW}| {3, 2, 900}|{2, ORD}|
|{1, SFO}|{1, 2, 1800}|{2, ORD}|
|{5, PHL}| {5, 6, 500}|{6, CLE}|
|{1, SFO}|{1, 5, 2300}|{5, PHL}|
|{5, PHL}| {5, 4, 600}|{4, CLT}|
+--------+------------+--------+

+--------+-----------+--------+
|       a|          e|       b|
+--------+-----------+--------+
|{3, DFW}|{3, 2, 900}|{2, ORD}|
|{5, PHL}|{5, 6, 500}|{6, CLE}|
|{5, PHL}|{5, 4, 600}|{4, CLT}|
+--------+-----------+--------+



In [None]:
# Sort to find the longest routes: 
g.find("(a)-[e]->(b)").sort(['e.distance'], ascending=False).show()

+--------+------------+--------+
|       a|           e|       b|
+--------+------------+--------+
|{1, SFO}|{1, 5, 2300}|{5, PHL}|
|{1, SFO}|{1, 2, 1800}|{2, ORD}|
|{4, CLT}|{4, 1, 1500}|{1, SFO}|
|{2, ORD}|{2, 3, 1400}|{3, DFW}|
|{6, CLE}|{6, 3, 1200}|{3, DFW}|
|{3, DFW}| {3, 2, 900}|{2, ORD}|
|{5, PHL}| {5, 4, 600}|{4, CLT}|
|{5, PHL}| {5, 6, 500}|{6, CLE}|
+--------+------------+--------+



In [None]:
# The triplets attribute puts the src vertex, the edge and the dst vertex of every edge together in one DataFrame:
g.triplets.show()

+--------+------------+--------+
|     src|        edge|     dst|
+--------+------------+--------+
|{4, CLT}|{4, 1, 1500}|{1, SFO}|
|{6, CLE}|{6, 3, 1200}|{3, DFW}|
|{2, ORD}|{2, 3, 1400}|{3, DFW}|
|{3, DFW}| {3, 2, 900}|{2, ORD}|
|{1, SFO}|{1, 2, 1800}|{2, ORD}|
|{5, PHL}| {5, 6, 500}|{6, CLE}|
|{1, SFO}|{1, 5, 2300}|{5, PHL}|
|{5, PHL}| {5, 4, 600}|{4, CLT}|
+--------+------------+--------+



## Section 3 - Real Flight Data

You can download the full dataset [here](https://csserver.ucd.ie/~thomas/data_lab_6.csv). A smaller version of the dataset is also available [here](https://csserver.ucd.ie/~thomas/data_lab_6_small.csv) if you're having trouble with the larger version. 

We'll use the flight information from November 2019 from this [website](http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time). 

In [None]:
!wget https://csserver.ucd.ie/~thomas/data_lab_6.csv

--2022-10-28 12:16:15--  https://csserver.ucd.ie/~thomas/data_lab_6.csv
Resolving csserver.ucd.ie (csserver.ucd.ie)... 193.1.133.60
Connecting to csserver.ucd.ie (csserver.ucd.ie)|193.1.133.60|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 43181923 (41M) [text/csv]
Saving to: ‘data_lab_6.csv’


2022-10-28 12:16:18 (20.2 MB/s) - ‘data_lab_6.csv’ saved [43181923/43181923]



In [None]:
# First let's take a look at our data: 
df = spark.read.options(delimiter=",", header=True, inferSchema=True).csv("/content/data_lab_6.csv")
df.select('*').show(10)

+----------+---------+-----------------+-----------+-------------------------------+---------------+------+-------------+----+----------+-------+---------------+----------+-------+---------------+--------------+--------+
|DayofMonth|DayOfWeek|Reporting_Airline|Tail_Number|Flight_Number_Reporting_Airline|OriginAirportID|Origin|DestAirportID|Dest|CRSDepTime|DepTime|DepDelayMinutes|CRSArrTime|ArrTime|ArrDelayMinutes|CRSElapsedTime|Distance|
+----------+---------+-----------------+-----------+-------------------------------+---------------+------+-------------+----+----------+-------+---------------+----------+-------+---------------+--------------+--------+
|        16|        6|               OH|     N517AE|                           5213|          13795|   OAJ|        11057| CLT|      1036|   1032|              0|      1201|   1127|              0|            85|     191|
|        17|        7|               OH|     N526EA|                           5213|          13795|   OAJ|        1

As in the previous example, we'll represent airports as vertices and routes as edges. We're interested in visualising airports and routes and would like to see the number of airports that have departures or arrivals. 

In [None]:
# Getting our vertex DataFrame:
airports = df.select("OriginAirportID", "Origin").distinct()
airports.show(5)

+---------------+------+
|OriginAirportID|Origin|
+---------------+------+
|          11109|   COS|
|          12191|   HOU|
|          10721|   BOS|
|          11697|   FLL|
|          13296|   MHT|
+---------------+------+
only showing top 5 rows



In [None]:
# Renaming the airport id column to reflect the special column named "id"
# We'll also rename our vertex attribute for simplicity
airports = airports.withColumnRenamed("OriginAirportID", "id").withColumnRenamed("Origin", "name")
airports.show(5)

+-----+----+
|   id|name|
+-----+----+
|11109| COS|
|12191| HOU|
|10721| BOS|
|11697| FLL|
|13296| MHT|
+-----+----+
only showing top 5 rows



Now we'll get our edge DataFrame with special src and dst columns as well as our distance attribute. 

In [None]:
# Getting our edge DataFrame:
routes = df.select("OriginAirportID", "DestAirportID", "Distance").distinct()
routes.show(5)

+---------------+-------------+--------+
|OriginAirportID|DestAirportID|Distance|
+---------------+-------------+--------+
|          12892|        14869|     590|
|          15582|        11292|     259|
|          12264|        13121|     176|
|          11292|        11638|     844|
|          11292|        14730|    1024|
+---------------+-------------+--------+
only showing top 5 rows



In [None]:
# Renaming our columns: 
routes = routes.withColumnRenamed("OriginAirportID", "src").withColumnRenamed("DestAirportID", "dst").withColumnRenamed("Distance", "distance")
routes.show(5)

+-----+-----+--------+
|  src|  dst|distance|
+-----+-----+--------+
|12892|14869|     590|
|15582|11292|     259|
|12264|13121|     176|
|11292|11638|     844|
|11292|14730|    1024|
+-----+-----+--------+
only showing top 5 rows



In [None]:
# Creating our GraphFrame: 
graph = GraphFrame(airports, routes)
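
The steps above boil down to two de-duplications: distinct (id, code) pairs become vertices and distinct (origin, destination, distance) triples become edges. The plain-Python sketch below walks through this on a handful of hypothetical flight records. The first three rows reuse ids and distances from the Section 2 toy graph, while the BOS row and its distance are made up for illustration. It also shows one thing to watch for: an airport that only ever appears as a destination would be missing from a vertex list built from the origin columns alone. Whether that happens in the real dataset is not something this lab checks.

```python
# Hypothetical flight records in the shape
# (OriginAirportID, Origin, DestAirportID, Dest, Distance).
flights = [
    (1, "SFO", 2, "ORD", 1800),
    (1, "SFO", 2, "ORD", 1800),  # the same route flown a second time
    (2, "ORD", 3, "DFW", 1400),
    (5, "PHL", 7, "BOS", 300),   # made-up row: BOS is never an origin here
]

# Vertices taken from the origin columns only (as in the cells above).
origin_airports = {(oid, o) for oid, o, _, _, _ in flights}

# Vertices taken from both origin and destination columns.
dest_airports = {(did, d) for _, _, did, d, _ in flights}
all_airports = origin_airports | dest_airports

# Edges: one entry per distinct route, however often it was flown.
routes = {(oid, did, dist) for oid, _, did, _, dist in flights}

missing = sorted(all_airports - origin_airports)
print("routes:", sorted(routes))
print("airports missing from the origin-only list:", missing)
```

If this matters for an analysis, taking the union of the origin and destination columns before building the vertex DataFrame is one possible remedy.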

## Section 4 - Exercises

Now that we have our graph set up, let's move onto answering some questions about our data.

1. How many airports are there?

In [None]:
# TODO: 

# Solution: 
graph.vertices.count()

348

2. How many routes are there?

In [None]:
# TODO: 

# Solution: 
graph.edges.count()

5631

3. Which routes are > 1000 miles in distance? (Use the motif DSL).

In [None]:
# TODO: 

# Solution: 
graph.find("(a)-[e]->(b)").filter("e.distance > 1000").show(10)

+------------+--------------------+------------+
|           a|                   e|           b|
+------------+--------------------+------------+
|{11109, COS}|{11109, 12264, 1463}|{12264, IAD}|
|{11109, COS}|{11109, 10397, 1184}|{10397, ATL}|
|{11109, COS}|{11109, 13204, 1520}|{13204, MCO}|
|{12191, HOU}|{12191, 14893, 1624}|{14893, SMF}|
|{12191, HOU}|{12191, 14869, 1214}|{14869, SLC}|
|{12191, HOU}|{12191, 14100, 1336}|{14100, PHL}|
|{12191, HOU}|{12191, 11066, 1001}|{11066, CMH}|
|{12191, HOU}|{12191, 14107, 1020}|{14107, PHX}|
|{12191, HOU}|{12191, 12892, 1390}|{12892, LAX}|
|{12191, HOU}|{12191, 13796, 1642}|{13796, OAK}|
+------------+--------------------+------------+
only showing top 10 rows



4. Which are the ten shortest routes? (Use the triplets method). 

In [None]:
# TODO: 

# Solution: 
graph.triplets.sort(['edge.distance'], ascending=True).show(10)

+------------+------------------+------------+
|         src|              edge|         dst|
+------------+------------------+------------+
|{15841, WRG}|{15841, 14256, 31}|{14256, PSG}|
|{14256, PSG}|{14256, 15841, 31}|{15841, WRG}|
|{14006, PAH}|{14006, 10967, 45}|{10967, CGI}|
|{10967, CGI}|{10967, 14006, 45}|{14006, PAH}|
|{15023, STS}|{15023, 14771, 66}|{14771, SFO}|
|{14771, SFO}|{14771, 15023, 66}|{15023, STS}|
|{13930, ORD}|{13930, 13342, 67}|{13342, MKE}|
|{13342, MKE}|{13342, 13930, 67}|{13930, ORD}|
|{15024, STT}|{15024, 14843, 68}|{14843, SJU}|
|{14843, SJU}|{14843, 15024, 68}|{15024, STT}|
+------------+------------------+------------+
only showing top 10 rows



5. Find the ten longest routes (using either method). 

In [None]:
# TODO: 

# Solution: 
graph.find("(a)-[e]->(b)").sort(['e.distance'], ascending=False).show(10)

# OR:
graph.triplets.sort(desc("edge.distance")).show(10)

+------------+--------------------+------------+
|           a|                   e|           b|
+------------+--------------------+------------+
|{10721, BOS}|{10721, 12173, 5095}|{12173, HNL}|
|{12173, HNL}|{12173, 10721, 5095}|{10721, BOS}|
|{12478, JFK}|{12478, 12173, 4983}|{12173, HNL}|
|{12173, HNL}|{12173, 12478, 4983}|{12478, JFK}|
|{11618, EWR}|{11618, 12173, 4962}|{12173, HNL}|
|{12173, HNL}|{12173, 11618, 4962}|{11618, EWR}|
|{12173, HNL}|{12173, 12264, 4817}|{12264, IAD}|
|{12264, IAD}|{12264, 12173, 4817}|{12173, HNL}|
|{12173, HNL}|{12173, 10397, 4502}|{10397, ATL}|
|{10397, ATL}|{10397, 12173, 4502}|{12173, HNL}|
+------------+--------------------+------------+
only showing top 10 rows

+------------+--------------------+------------+
|         src|                edge|         dst|
+------------+--------------------+------------+
|{10721, BOS}|{10721, 12173, 5095}|{12173, HNL}|
|{12173, HNL}|{12173, 10721, 5095}|{10721, BOS}|
|{12478, JFK}|{12478, 12173, 4983}|{12173, 

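6. Which airport has the most incoming flights? Hint: Think degrees! Note that because our edge DataFrame keeps only distinct routes, an airport's in-degree counts its distinct incoming routes rather than individual flights. 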

In [None]:
def airportMap(id):
  # Look up the airport code (the "name" column) for a given vertex id
  name = airports.filter(airports.id == id).collect()[0][1]
  return name

In [None]:
# TODO: 

# Solution: 
most = graph.inDegrees.sort(['inDegree'], ascending=False).collect()[0][0]
print("The airport with the most incoming flights: {}".format(airportMap(most)))

The airport with the most incoming flights: DFW
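
The same "sort by in-degree, then translate the id into a name" idea can be tried on the Section 2 toy graph in plain Python. This is only a sketch on the toy data, not on the flight dataset, and the tie-breaking rule (smaller id wins) is our own choice. It uses a dictionary for the id-to-name lookup, which may be a cheaper option than filtering a DataFrame once per id when many ids need translating.

```python
from collections import Counter

# Toy graph from Section 2.
vertices = [(1, "SFO"), (2, "ORD"), (3, "DFW"), (4, "CLT"),
            (5, "PHL"), (6, "CLE"), (7, "BOS")]
edges = [(1, 2, 1800), (2, 3, 1400), (3, 2, 900), (6, 3, 1200),
         (5, 6, 500), (5, 4, 600), (4, 1, 1500), (1, 5, 2300)]

names = dict(vertices)                       # id -> airport code
in_degree = Counter(dst for _, dst, _ in edges)

# Highest in-degree first; ties are broken by the smaller id.
ranked = sorted(in_degree.items(), key=lambda kv: (-kv[1], kv[0]))
top_id, top_degree = ranked[0]

print("Most incoming routes: {} ({})".format(names[top_id], top_degree))
```

In the toy graph ORD and DFW both have an in-degree of 2, so the winner depends entirely on the tie-breaking rule. A plain sort on the degree column gives no guarantee about which of several tied airports comes first.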


7. How many airports have just one outgoing flight?

In [None]:
# TODO: 

# Solution: 
graph.outDegrees.filter("outDegree == 1").count()

63