## The following section is for Colab Users.
### Just run the following code cells

In [13]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [14]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://bitbucket.org/habedi/datasets/raw/b6769c4664e7ff68b001e2f43bc517888cbe3642/spark/spark-3.0.2-bin-hadoop2.7.tgz
!tar xf spark-3.0.2-bin-hadoop2.7.tgz
!rm -rf spark-3.0.2-bin-hadoop2.7.tgz*
!pip -q install findspark pyspark graphframes

In [15]:
!wget https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.0-s_2.12/graphframes-0.8.2-spark3.0-s_2.12.jar -P /content/spark-3.0.2-bin-hadoop2.7/jars/
!cp /content/spark-3.0.2-bin-hadoop2.7/jars/graphframes-0.8.2-spark3.0-s_2.12.jar /content/spark-3.0.2-bin-hadoop2.7/graphframes-0.8.2-spark3.0-s_2.12.zip

--2022-06-29 12:52:59--  https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.0-s_2.12/graphframes-0.8.2-spark3.0-s_2.12.jar
Resolving repos.spark-packages.org (repos.spark-packages.org)... 13.227.254.32, 13.227.254.25, 13.227.254.119, ...
Connecting to repos.spark-packages.org (repos.spark-packages.org)|13.227.254.32|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 247882 (242K) [binary/octet-stream]
Saving to: ‘/content/spark-3.0.2-bin-hadoop2.7/jars/graphframes-0.8.2-spark3.0-s_2.12.jar.1’


2022-06-29 12:52:59 (84.8 MB/s) - ‘/content/spark-3.0.2-bin-hadoop2.7/jars/graphframes-0.8.2-spark3.0-s_2.12.jar.1’ saved [247882/247882]



In [16]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = os.environ["SPARK_HOME"]

os.environ["PYSPARK_DRIVER_PYTHON"] = "jupyter"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local[*] pyspark-shell"

In [17]:
import findspark
findspark.init()

In [18]:
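# Note: each `!` command runs in its own subshell, so these exports do not persist
# in the notebook process; the os.environ assignments above are what take effect.
!export PYSPARK_SUBMIT_ARGS="--master local[*] pyspark-shell"
!export PYSPARK_DRIVER_PYTHON=jupyter
!export PYSPARK_DRIVER_PYTHON_OPTS=notebook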

In [19]:
from pyspark.sql import SparkSession
from graphframes import *

spark = SparkSession.builder.master("local[*]").appName("GraphFrames").getOrCreate()

In [20]:
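# Note: PYSPARK_SUBMIT_ARGS is read when the Spark JVM is launched, so it has to be
# set before the first SparkSession is created to have any effect.
# The package version is aligned with the 0.8.2 jar downloaded above.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages graphframes:graphframes:0.8.2-spark3.0-s_2.12 pyspark-shell"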

**************************************************************************
**************************************************************************
**************************************************************************

In [21]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [22]:
import pyspark
from pyspark.sql import SparkSession


In [23]:
spark=SparkSession.builder.appName('depa').getOrCreate()

### Read departuredelays.csv into the Edge DataFrame
### Read airport-codes-na.txt into the Vertex DataFrame (the separator is a tab, i.e., sep = '\t')

#### The US flight delays data set has five columns:
- The <b>date</b> column contains an integer like 02190925. When converted, this maps to 02-19 09:25 am.
- The <b>delay</b> column gives the delay in minutes between the scheduled and actual departure times. Early departures show negative numbers.
- The <b>distance</b> column gives the distance in miles from the origin airport to the destination airport.
- The <b>origin</b> column contains the origin IATA airport code.
- The <b>destination</b> column contains the destination IATA airport code.
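
Because the date is read as an integer, a value such as 02190925 is stored without its leading zero (2190925). A minimal sketch of the conversion, assuming the MMDDHHMM layout described above; `decode_trip_date` is a hypothetical helper and not part of the notebook:

```python
def decode_trip_date(value: int) -> str:
    """Format an MMDDHHMM integer (leading zero possibly dropped) as 'MM-DD HH:MM'."""
    digits = f"{value:08d}"  # restore the leading zero lost by integer parsing
    month, day = digits[0:2], digits[2:4]
    hour, minute = digits[4:6], digits[6:8]
    return f"{month}-{day} {hour}:{minute}"


print(decode_trip_date(2190925))  # 02-19 09:25
print(decode_trip_date(1011245))  # 01-01 12:45
```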

#### The airport-codes data set has four columns:
- The <b>IATA</b> column contains the IATA airport code.
- The <b>City, State, and Country</b> columns contain information about the airport location.

In [24]:
e=spark.read.csv('/content/departuredelays.csv', header='true', inferSchema='true', sep=',')
v=spark.read.csv('/content/airport-codes-na.txt', header='true', inferSchema='true', sep='\t')

In [25]:
e.show(10)

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
|1030605|    0|     602|   ABE|        ATL|
|1041243|   10|     602|   ABE|        ATL|
|1040605|   28|     602|   ABE|        ATL|
|1051245|   88|     602|   ABE|        ATL|
|1050605|    9|     602|   ABE|        ATL|
+-------+-----+--------+------+-----------+
only showing top 10 rows



In [26]:
v.show(10)

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
+-----------+-----+-------+----+
only showing top 10 rows



### In the vertex DataFrame, drop any duplicated rows with the same IATA code.

In [27]:
v1=v.dropDuplicates(["IATA"])

In [28]:
v1.show()

+-------------------+-----+-------+----+
|               City|State|Country|IATA|
+-------------------+-----+-------+----+
|         Binghamton|   NY|    USA| BGM|
|            Lebanon|   NH|    USA| LEB|
|           Montreal|   PQ| Canada| YUL|
|         Dillingham|   AK|    USA| DLG|
|International Falls|   MN|    USA| INL|
|         Wolf Point|   MT|    USA| OLF|
|        New Orleans|   LA|    USA| MSY|
|            Toronto|   ON| Canada| YTO|
|            Spokane|   WA|    USA| GEG|
|              Havre|   MT|    USA| HVR|
|            Burbank|   CA|    USA| BUR|
|      Orange County|   CA|    USA| SNA|
|             Dryden|   ON| Canada| YHD|
|         Fort Dodge|   IA|    USA| FOD|
|          Green Bay|   WI|    USA| GRB|
|        Great Falls|   MT|    USA| GTF|
|              Homer|   AK|    USA| HOM|
|        Idaho Falls|   ID|    USA| IDA|
|      Sioux Lookout|   ON| Canada| YXL|
|       Grand Rapids|   MI|    USA| GRR|
+-------------------+-----+-------+----+
only showing top 20 rows

### In the edges DataFrame:
- Rename the <b>date</b> column to <b>tripid</b>.
- Rename the <b>origin</b> column to <b>src</b>.
- Rename the <b>destination</b> column to <b>dst</b>.

In [29]:
from pyspark.sql.types import IntegerType

In [30]:
e1 = e.withColumnRenamed("date","tripid") \
    .withColumnRenamed("origin","src")\
    .withColumnRenamed("destination","dst")

In [31]:
e2=e1.withColumn("delay",e1.delay.cast(IntegerType()))

In [32]:
e2.printSchema()

root
 |-- tripid: integer (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)



### In the Vertex DataFrame:
- Rename the <b>IATA</b> column to <b>id</b>.

In [33]:
v2=v1.withColumnRenamed('IATA','id')

In [34]:
v2.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- id: string (nullable = true)



### Create a GraphFrame from the Vertex and Edges DataFrames

In [35]:
from graphframes import *

In [36]:
gf = GraphFrame(v2,e2)

### Determine the number of airports

In [37]:
gf.edges.show()

+-------+-----+--------+---+---+
| tripid|delay|distance|src|dst|
+-------+-----+--------+---+---+
|1011245|    6|     602|ABE|ATL|
|1020600|   -8|     369|ABE|DTW|
|1021245|   -2|     602|ABE|ATL|
|1020605|   -4|     602|ABE|ATL|
|1031245|   -4|     602|ABE|ATL|
|1030605|    0|     602|ABE|ATL|
|1041243|   10|     602|ABE|ATL|
|1040605|   28|     602|ABE|ATL|
|1051245|   88|     602|ABE|ATL|
|1050605|    9|     602|ABE|ATL|
|1061215|   -6|     602|ABE|ATL|
|1061725|   69|     602|ABE|ATL|
|1061230|    0|     369|ABE|DTW|
|1060625|   -3|     602|ABE|ATL|
|1070600|    0|     369|ABE|DTW|
|1071725|    0|     602|ABE|ATL|
|1071230|    0|     369|ABE|DTW|
|1070625|    0|     602|ABE|ATL|
|1071219|    0|     569|ABE|ORD|
|1080600|    0|     369|ABE|DTW|
+-------+-----+--------+---+---+
only showing top 20 rows
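
The cell above lists edges. The number of airports corresponds to the number of vertices, which in GraphFrames would be `gf.vertices.count()`. The same idea in plain Python, as a sketch on a toy vertex list (three rows borrowed from the airport table above plus one deliberate duplicate):

```python
# (city, IATA code) pairs; the last row repeats ABE on purpose
vertices = [
    ("Allentown", "ABE"),
    ("Albany", "ABY"),
    ("Albany", "ALB"),
    ("Allentown", "ABE"),
]

# Deduplicate on the IATA code, then count what is left
unique_ids = {iata for _, iata in vertices}
num_airports = len(unique_ids)
print(num_airports)  # 3
```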



### Determine the number of trips 

In [38]:
gf.edges.select('tripid').count()

524644

### What is the longest delay?

In [39]:
gf.edges.groupBy().max("delay").show()

+----------+
|max(delay)|
+----------+
|      1560|
+----------+



### Find out the number of delayed flights vs. early flights (flights that departed before the scheduled time)

Early flights

In [40]:
gf.edges.filter('delay<0').count()

233465

Delayed flights

In [41]:
gf.edges.filter('delay>0').count()

231718
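
The two filters leave out flights with a delay of exactly 0. A quick arithmetic check using the counts printed above, assuming no trip has a missing delay value:

```python
total_trips = 524644  # gf.edges count reported above
early = 233465        # delay < 0
delayed = 231718      # delay > 0

# Whatever remains departed exactly on schedule (delay == 0)
on_time = total_trips - early - delayed
delayed_share = delayed / total_trips

print(on_time)  # 59461
print(f"{delayed_share:.1%} of trips departed late")
```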

### What flight destinations departing SFO are most likely to have significant delays? Select the top 10
#### Hint: you should get the average delay for each destination for trips that depart from SFO only

In [42]:
from pyspark.sql.functions import desc

In [43]:
gf.edges.filter("src = 'SFO' and delay > 0").groupBy("src", "dst").avg("delay").sort(desc("avg(delay)")).show(10)

+---+---+------------------+
|src|dst|        avg(delay)|
+---+---+------------------+
|SFO|COS|             68.25|
|SFO|SUN|60.714285714285715|
|SFO|MSY|              54.0|
|SFO|OTH|              52.0|
|SFO|LGB|          50.53125|
|SFO|PIT|              49.6|
|SFO|MSP| 48.44736842105263|
|SFO|JAC|              47.9|
|SFO|ASE| 45.44444444444444|
|SFO|JFK| 45.13304721030043|
+---+---+------------------+
only showing top 10 rows
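
Note that the `delay > 0` filter means the average is taken over delayed flights only; early and on-time departures do not pull it down. A plain-Python sketch of the same filter, group, average, and sort steps on made-up rows:

```python
from collections import defaultdict

# (src, dst, delay in minutes); values are illustrative only
flights = [
    ("SFO", "JFK", 30),
    ("SFO", "JFK", -5),   # early: excluded by the delay > 0 filter
    ("SFO", "JFK", 60),
    ("SFO", "LAX", 10),
    ("SFO", "LAX", 0),    # on time: excluded as well
    ("ATL", "JFK", 200),  # not departing from SFO
]

totals = defaultdict(lambda: [0, 0])  # dst -> [sum of delays, number of flights]
for src, dst, delay in flights:
    if src == "SFO" and delay > 0:
        totals[dst][0] += delay
        totals[dst][1] += 1

avg_delay = {dst: total / count for dst, (total, count) in totals.items()}
ranking = sorted(avg_delay.items(), key=lambda item: item[1], reverse=True)
print(ranking)  # [('JFK', 45.0), ('LAX', 10.0)]
```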



### Find the incoming connections to each airport, sorted in descending order.

In [44]:
gf.degrees.sort(desc("degree")).show()

+---+------+
| id|degree|
+---+------+
|ATL| 91975|
|DFW| 48986|
|ORD| 44354|
|LAX| 37940|
|DEN| 36883|
|IAH| 30625|
|SFO| 28120|
|PHX| 27793|
|BOS| 24817|
|LAS| 23102|
|CLT| 20724|
|MCO| 19790|
|EWR| 19712|
|LGA| 18467|
|SLC| 17951|
|JFK| 16712|
|MSP| 16381|
|SEA| 16297|
|BWI| 15890|
|DTW| 15872|
+---+------+
only showing top 20 rows



In [45]:
gf.inDegrees.sort(desc("inDegree")).show()

+---+--------+
| id|inDegree|
+---+--------+
|ATL|   33185|
|DFW|   25498|
|ORD|   22857|
|LAX|   19459|
|DEN|   18906|
|IAH|   16044|
|SFO|   14570|
|PHX|   14450|
|LAS|   12052|
|CLT|   10956|
|MCO|   10584|
|EWR|   10445|
|LGA|    9937|
|SLC|    9266|
|JFK|    8834|
|BOS|    8792|
|BWI|    8713|
|SEA|    8517|
|MIA|    8335|
|DTW|    8330|
+---+--------+
only showing top 20 rows



### Find the outgoing connections from each airport, sorted in descending order.

In [46]:
gf.outDegrees.sort(desc("outDegree")).show()

+---+---------+
| id|outDegree|
+---+---------+
|ATL|    58790|
|DFW|    23488|
|ORD|    21497|
|LAX|    18481|
|DEN|    17977|
|BOS|    16025|
|IAH|    14581|
|SFO|    13550|
|PHX|    13343|
|LAS|    11050|
|CLT|     9768|
|EWR|     9267|
|MCO|     9206|
|BNA|     8870|
|SLC|     8685|
|LGA|     8530|
|MSP|     8085|
|JFK|     7878|
|SEA|     7780|
|DTW|     7542|
+---+---------+
only showing top 20 rows
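
The three degree tables are related: `degree` is the sum of `inDegree` and `outDegree`. For ATL in the outputs above, 33185 + 58790 = 91975. A small sketch of the same counts on a made-up edge list:

```python
from collections import Counter

# (src, dst) pairs; illustrative only
edges = [("ABE", "ATL"), ("ABE", "DTW"), ("ATL", "ABE"), ("DTW", "ATL")]

out_degree = Counter(src for src, _ in edges)  # flights leaving each airport
in_degree = Counter(dst for _, dst in edges)   # flights arriving at each airport
degree = in_degree + out_degree                # total connections per airport

print(degree.most_common())
```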



### Use motif finding to answer this question: which delays could we blame on SFO?
#### Hint: this practically means that SFO is a transit station

In [47]:
gf.find("(a)-[ab]->(b); (b)-[bc]->(c)").filter("(b.id = 'SFO') and (ab.delay > 500 or bc.delay > 500) and bc.tripid > ab.tripid and bc.tripid < ab.tripid + 10000").show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                   a|                  ab|                   b|                  bc|                   c|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|[Albuquerque, NM,...|[1020600, 0, 779,...|[San Francisco, C...|[1021507, 536, 22...|[New York, NY, US...|
|[Albuquerque, NM,...|[1210815, -12, 77...|[San Francisco, C...|[1211508, 593, 22...|[New York, NY, US...|
|[Eureka, CA, USA,...|[1011635, -15, 21...|[San Francisco, C...|[1021507, 536, 22...|[New York, NY, US...|
|[Eureka, CA, USA,...|[1012016, -4, 217...|[San Francisco, C...|[1021507, 536, 22...|[New York, NY, US...|
|[Eureka, CA, USA,...|[1020531, -2, 217...|[San Francisco, C...|[1021507, 536, 22...|[New York, NY, US...|
|[Eureka, CA, USA,...|[1020948, -11, 21...|[San Francisco, C...|[1021507, 536, 22...|[New York, NY, US...|
|[Eureka, CA, USA,...|[1021506, -3, 2
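
The filter relies on `tripid` having the MMDDHHMM layout: adding 10000 moves the day forward by one, so the condition keeps only onward flights that depart after the inbound flight and within roughly a day of it. A plain-Python sketch of the same two-hop pattern and filter on made-up edges (the `Edge` tuple is a hypothetical stand-in for a row of the edges DataFrame):

```python
from typing import NamedTuple


class Edge(NamedTuple):
    tripid: int
    delay: int
    src: str
    dst: str


edges = [
    Edge(1020600, 0, "ABQ", "SFO"),    # inbound leg arriving at SFO
    Edge(1021507, 536, "SFO", "JFK"),  # long delay, same day: should match
    Edge(1050700, 600, "SFO", "LAX"),  # long delay, but three days later
    Edge(1021000, 5, "SFO", "SEA"),    # same day, but no long delay
]

hub = "SFO"
matches = [
    (ab, bc)
    for ab in edges if ab.dst == hub
    for bc in edges if bc.src == hub
    and (ab.delay > 500 or bc.delay > 500)
    and ab.tripid < bc.tripid < ab.tripid + 10000
]

for ab, bc in matches:
    print(f"{ab.src} -> {hub} -> {bc.dst}, onward delay {bc.delay} min")
```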

### Determine the airport ranking in descending order using the PageRank algorithm

In [48]:
gf.pageRank(resetProbability=0.15, tol=0.01).vertices.orderBy(desc("pagerank")).show()


+--------------+-----+-------+---+------------------+
|          City|State|Country| id|          pagerank|
+--------------+-----+-------+---+------------------+
|       Atlanta|   GA|    USA|ATL|27.727899400710534|
|        Dallas|   TX|    USA|DFW| 20.26679805190053|
|       Chicago|   IL|    USA|ORD|19.412400365738495|
|        Denver|   CO|    USA|DEN|14.542386215539732|
|   Los Angeles|   CA|    USA|LAX|13.534796821839427|
|       Houston|   TX|    USA|IAH|11.708716655367548|
| San Francisco|   CA|    USA|SFO|10.693019052556826|
|       Phoenix|   AZ|    USA|PHX| 9.849017406034136|
|Salt Lake City|   UT|    USA|SLC|  8.78494058676923|
|     Las Vegas|   NV|    USA|LAS|  8.03687122322553|
|       Seattle|   WA|    USA|SEA| 7.086312995684949|
|     Charlotte|   NC|    USA|CLT| 6.967069267775819|
|        Newark|   NJ|    USA|EWR| 6.748345144738438|
|       Orlando|   FL|    USA|MCO| 6.604209836084269|
|   Minneapolis|   MN|    USA|MSP| 6.496808728249724|
|      New York|   NY|    US
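
To make the `resetProbability` and `tol` parameters more concrete, here is a simplified power-iteration sketch of PageRank in plain Python. It is not the GraphFrames implementation: this version normalizes ranks so they sum to 1, whereas the values printed above are on a different scale, and the edge list is made up.

```python
def pagerank(edges, reset_probability=0.15, tol=1e-6, max_iter=100):
    """Simplified PageRank by power iteration; ranks sum to 1."""
    nodes = sorted({node for edge in edges for node in edge})
    out_links = {node: [] for node in nodes}
    for src, dst in edges:
        out_links[src].append(dst)

    rank = {node: 1.0 / len(nodes) for node in nodes}
    for _ in range(max_iter):
        # Every node receives the "reset" share, then shares from its in-links
        new_rank = {node: reset_probability / len(nodes) for node in nodes}
        for src in nodes:
            targets = out_links[src] or nodes  # dangling nodes spread rank evenly
            share = (1 - reset_probability) * rank[src] / len(targets)
            for dst in targets:
                new_rank[dst] += share
        change = sum(abs(new_rank[node] - rank[node]) for node in nodes)
        rank = new_rank
        if change < tol:  # stop once the ranks have settled
            break
    return rank


toy_edges = [("ABE", "ATL"), ("DTW", "ATL"), ("ORD", "ATL"), ("ATL", "ORD")]
ranks = pagerank(toy_edges)
for airport, score in sorted(ranks.items(), key=lambda item: item[1], reverse=True):
    print(airport, round(score, 3))
```

An airport with many incoming flights from well-connected airports ends up with a higher score, which is why the large hubs lead the table above.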

### Determine the most popular flights (single city hops)

In [49]:
gf.edges.groupBy('src','dst').agg({'delay': 'count'}).sort(desc("count(delay)")).show(10)



+---+---+------------+
|src|dst|count(delay)|
+---+---+------------+
|ATL|LGA|        1620|
|ATL|MCO|        1373|
|BOS|DCA|        1290|
|ATL|FLL|        1176|
|ATL|DFW|        1155|
|ATL|TPA|        1117|
|SFO|LAX|        1109|
|ATL|DCA|        1108|
|LAX|SFO|        1092|
|ATL|MIA|        1092|
+---+---+------------+
only showing top 10 rows
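
Counting rows per (src, dst) pair is all this query does. The equivalent on a made-up list of trips, as a sketch:

```python
from collections import Counter

# One (src, dst) tuple per trip; illustrative only
trips = [
    ("ATL", "LGA"), ("ATL", "LGA"), ("SFO", "LAX"),
    ("ATL", "LGA"), ("SFO", "LAX"), ("LAX", "SFO"),
]

route_counts = Counter(trips)
top_routes = route_counts.most_common(2)
print(top_routes)  # [(('ATL', 'LGA'), 3), (('SFO', 'LAX'), 2)]
```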



### Find and save a subgraph obtained from the following pattern:
#### The flight starts from an airport and returns to the same airport through 2 other airports.

In [50]:
gf.find("(v1)-[e1]->(v2); (v2)-[e2]->(v3); (v3)-[e3]->(v1)").show()

Py4JJavaError: ignored
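
The error message above does not say what went wrong, so the cause is left open here. Independently of that, the pattern itself can be illustrated on a small scale. The sketch below finds round trips through two other airports on a made-up set of distinct routes and collects the edges that form the subgraph. Treating routes as a deduplicated set, and requiring the three airports to be distinct, are assumptions based on the heading; repeated trips on the same route would otherwise multiply the matches.

```python
# Distinct (src, dst) routes; illustrative only
routes = {
    ("SFO", "LAX"), ("LAX", "SEA"), ("SEA", "SFO"),  # a three-airport round trip
    ("SFO", "JFK"), ("JFK", "SFO"),                  # a two-airport round trip
}

successors = {}
for src, dst in routes:
    successors.setdefault(src, set()).add(dst)

# (a, b, c) such that a -> b, b -> c and c -> a, with three different airports
triangles = sorted(
    (a, b, c)
    for a in successors
    for b in successors[a]
    for c in successors.get(b, ())
    if a in successors.get(c, ()) and len({a, b, c}) == 3
)

# The subgraph to save consists of every edge that takes part in a match
subgraph_edges = sorted({
    edge
    for a, b, c in triangles
    for edge in ((a, b), (b, c), (c, a))
})

print(triangles)
print(subgraph_edges)
```

Each round trip appears once per starting airport, so the single cycle above yields three matches.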