In [1]:
# import findspark
# findspark.init()
# NOTE: the two lines above are disabled. If they are re-enabled,
# findspark.init() has to run before the pyspark imports below.
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

# Reuse a running SparkContext if there is one, otherwise start one.
sc = SparkContext.getOrCreate()
# Obtain the session through the builder (the documented entry point) rather
# than calling the SparkSession constructor directly; getOrCreate() returns
# the already-active session when one exists instead of building a second one.
spark = SparkSession.builder.getOrCreate()

# Load the flight records; the first CSV row holds the column names and the
# column types are inferred from the data.
df = spark.read.csv('Jan_2019_ontime.csv', header=True, inferSchema=True)
df.printSchema()

root
 |-- DAY_OF_MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- OP_UNIQUE_CARRIER: string (nullable = true)
 |-- OP_CARRIER_AIRLINE_ID: integer (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- TAIL_NUM: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN_AIRPORT_ID: integer (nullable = true)
 |-- ORIGIN_AIRPORT_SEQ_ID: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST_AIRPORT_ID: integer (nullable = true)
 |-- DEST_AIRPORT_SEQ_ID: integer (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEP_TIME: integer (nullable = true)
 |-- DEP_DEL15: integer (nullable = true)
 |-- DEP_TIME_BLK: string (nullable = true)
 |-- ARR_TIME: integer (nullable = true)
 |-- ARR_DEL15: integer (nullable = true)
 |-- CANCELLED: integer (nullable = true)
 |-- DIVERTED: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)



In [2]:
# Vertex table for the airport graph: one row per airport code, keyed by `id`.
# Airports are collected from BOTH ends of every flight. Taking only ORIGIN
# would leave airports that appear solely as a destination out of the vertex
# set even though edges point at them.
origin_airports = (
    df.select('ORIGIN', 'ORIGIN_AIRPORT_ID')
    .withColumnRenamed('ORIGIN', 'id')
)
dest_airports = (
    df.select('DEST', 'DEST_AIRPORT_ID')
    .withColumnRenamed('DEST', 'id')
    # Keep the existing column name so the vertex schema stays
    # (id, ORIGIN_AIRPORT_ID) for any code that already reads it.
    .withColumnRenamed('DEST_AIRPORT_ID', 'ORIGIN_AIRPORT_ID')
)
# union() matches columns by position; both sides are (code, airport id).
df_vertices = origin_airports.union(dest_airports).dropDuplicates(['id'])

In [3]:
# Edge table: every row of `df`, with ORIGIN/DEST renamed to `src`/`dst`.
# All other columns keep their names and positions.
edge_column_names = {'ORIGIN': 'src', 'DEST': 'dst'}
df_edges = df.toDF(*[edge_column_names.get(name, name) for name in df.columns])

In [4]:
# Import the one name that is used instead of a wildcard, so it is clear
# where GraphFrame comes from and nothing else leaks into the namespace.
from graphframes import GraphFrame

# Airport graph: vertices are airports (`id`), edges are the flight rows
# (`src` -> `dst`) carrying every other flight column as edge attributes.
g = GraphFrame(df_vertices, df_edges)



In [7]:
# Show the GraphFrame's repr: a summary of its vertex and edge columns.
g

GraphFrame(v:[id: string, ORIGIN_AIRPORT_ID: int], e:[src: string, dst: string ... 19 more fields])

In [20]:
# Edge rows flagged DEP_DEL15 == 1 and, separately, rows flagged ARR_DEL15 == 1.
factor_more_than_1 = g.edges.where('DEP_DEL15 == 1')
factor_more_than_12 = g.edges.where('ARR_DEL15 == 1')

# df3: distinct (src, DEP_DEL15) pairs among the DEP_DEL15 rows, src descending.
df3 = (
    factor_more_than_1
    .select('src', 'DEP_DEL15')
    .dropDuplicates()
    .sort('src', ascending=False)
)

# df4: distinct (dst, ARR_DEL15) pairs among the ARR_DEL15 rows, dst descending.
df4 = (
    factor_more_than_12
    .select('dst', 'ARR_DEL15')
    .dropDuplicates()
    .sort('dst', ascending=False)
)

In [21]:
# Inner join keeping the airports that occur both as a `dst` in df4 and as a
# `src` in df3.
same_airport = df4['dst'] == df3['src']
inner = df4.join(df3, on=same_airport, how='inner')
inner.show(n=20)

+---+---------+---+---------+
|dst|ARR_DEL15|src|DEP_DEL15|
+---+---------+---+---------+
|MSP|        1|MSP|        1|
|DTW|        1|DTW|        1|
|CVG|        1|CVG|        1|
|ATL|        1|ATL|        1|
+---+---------+---+---------+



In [22]:
# Number of edge rows per destination airport, largest count first.
(
    g.edges
    .groupBy('dst')
    .count()
    .sort('count', ascending=False)
    .show(n=20)
)

+---+-----+
|dst|count|
+---+-----+
|ATL|   38|
|MSP|   14|
|DTW|   12|
|CVG|    5|
|TYS|    4|
|JFK|    4|
|LGA|    3|
|CAE|    3|
|AGS|    3|
|SHV|    2|
|AVL|    2|
|BOS|    2|
|SDF|    2|
|RIC|    2|
|DAY|    2|
|ILM|    2|
|PIA|    2|
|MGM|    2|
|GNV|    2|
|GTF|    1|
+---+-----+
only showing top 20 rows



In [23]:
# Number of edge rows per source airport, largest count first.
(
    g.edges
    .groupBy('src')
    .count()
    .sort('count', ascending=False)
    .show(n=20)
)

+---+-----+
|src|count|
+---+-----+
|ATL|   42|
|DTW|   13|
|MSP|   10|
|DSM|    4|
|CVG|    3|
|LGA|    3|
|AGS|    3|
|MSN|    2|
|CWA|    2|
|JAN|    2|
|SDF|    2|
|ELM|    2|
|RIC|    2|
|CAE|    2|
|ILM|    2|
|PIA|    2|
|MGM|    2|
|TYS|    2|
|MDT|    2|
|JAX|    2|
+---+-----+
only showing top 20 rows



In [24]:
# The three vertices with the highest degree.
degrees_desc = g.degrees.sort('degree', ascending=False)
degrees_desc.show(n=3)

+---+------+
| id|degree|
+---+------+
|ATL|    80|
|DTW|    25|
|MSP|    24|
+---+------+
only showing top 3 rows



In [25]:
# Distinct (src, dst, DISTANCE) rows with DISTANCE above 500, longest first.
distance_greater = (
    g.edges
    .select('src', 'dst', 'DISTANCE')
    .where('DISTANCE > 500')
    .dropDuplicates()
    .sort('DISTANCE', ascending=False)
)
distance_greater.show(n=20)

+---+---+--------+
|src|dst|DISTANCE|
+---+---+--------+
|SAT|DTW|    1214|
|SAT|MSP|    1097|
|BDL|MSP|    1050|
|DFW|DTW|     986|
|MSP|RDU|     980|
|MSP|RIC|     970|
|BNA|BOS|     942|
|BOS|BNA|     942|
|MSP|GTF|     887|
|ATL|LNK|     841|
|JAX|LGA|     833|
|LGA|JAX|     833|
|ELM|ATL|     717|
|ATL|ABE|     692|
|ABE|ATL|     692|
|ATL|TUL|     674|
|TUL|ATL|     674|
|ATL|EYW|     646|
|EYW|ATL|     646|
|CLE|MSP|     622|
+---+---+--------+
only showing top 20 rows



In [26]:
from pyspark.sql.functions import desc

# Every distinct (src, dst, DISTANCE) row from the edge table, longest first.
distance = (
    g.edges
    .select('src', 'dst', 'DISTANCE')
    .dropDuplicates()
    .orderBy(desc('DISTANCE'))
)
distance.show(n=20)

+---+---+--------+
|src|dst|DISTANCE|
+---+---+--------+
|SAT|DTW|    1214|
|SAT|MSP|    1097|
|BDL|MSP|    1050|
|DFW|DTW|     986|
|MSP|RDU|     980|
|MSP|RIC|     970|
|BOS|BNA|     942|
|BNA|BOS|     942|
|MSP|GTF|     887|
|ATL|LNK|     841|
|JAX|LGA|     833|
|LGA|JAX|     833|
|ELM|ATL|     717|
|ATL|ABE|     692|
|ABE|ATL|     692|
|ATL|TUL|     674|
|TUL|ATL|     674|
|ATL|EYW|     646|
|EYW|ATL|     646|
|CLE|MSP|     622|
+---+---+--------+
only showing top 20 rows



In [27]:
import pyspark.sql.functions as F

# Graph whose edges are the distinct (src, dst, DISTANCE) rows in `distance`.
sub = GraphFrame(g.vertices, distance)

# Motif: a -> b -> c where no direct a -> c edge exists; paths that end at
# the airport they started from are dropped.
r = sub.find("(a)-[ab]->(b); (b)-[bc]->(c); !(a)-[]->(c)").filter('c.id != a.id')

# For each (a, c) pair, the largest combined distance over its two-leg paths.
# The DataFrame is bound to r2 BEFORE displaying it: DataFrame.show() returns
# None, so chaining .show() onto the assignment would leave r2 as None.
r2 = (
    r.withColumn("sum_distance", r.ab.DISTANCE + r.bc.DISTANCE)
    .groupby('a.id', 'c.id')
    .max('sum_distance')
    .sort(F.desc('max(sum_distance)'))
)
r2.show()


+---+---+-----------------+
| id| id|max(sum_distance)|
+---+---+-----------------+
|SAT|RIC|             2067|
|BDL|RIC|             2020|
|SAT|JFK|             1723|
|SAT|CVG|             1693|
|SAT|DAY|             1671|
|SAT|TYS|             1657|
|BDL|CVG|             1646|
|BDL|DAY|             1624|
|CLE|RIC|             1592|
|SAT|EVV|             1577|
|SDF|RIC|             1574|
|CVG|RIC|             1566|
|SAT|ELM|             1546|
|DAY|RIC|             1544|
|DFW|JFK|             1495|
|IND|RIC|             1473|
|RAP|RIC|             1459|
|PIT|BNA|             1438|
|DFW|TYS|             1429|
|SAT|TVC|             1421|
+---+---+-----------------+
only showing top 20 rows



In [28]:
# Number of rows in `r`, the motif matches found by `sub.find(...)`.
r.count()

837

In [15]:
# Search for two-leg paths JFK -> b -> EWR with no direct JFK -> EWR edge.
# NOTE(review): this cell's execution count is out of sequence with the cells
# around it; it relies on `g`, `distance` and `GraphFrame` already existing,
# so confirm it still works on a fresh top-to-bottom run.
sub2 = GraphFrame(g.vertices, distance)
two_leg_paths = sub2.find("(a)-[ab]->(b); (b)-[bc]->(c); !(a)-[]->(c)")
result = two_leg_paths.where("a.id = 'JFK'").where("c.id = 'EWR'")
result.show(n=100)

+---+---+---+---+---+
|  a| ab|  b| bc|  c|
+---+---+---+---+---+
+---+---+---+---+---+

