In [1]:
import pyspark # Call this only after findspark.init()
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

df = spark.read.csv("/home/sysadm/Downloads/bda_etbdc_end_sem_exam/Jan_2019_ontime.csv", header = True, inferSchema = True)
df.printSchema()

root
 |-- DAY_OF_MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- OP_UNIQUE_CARRIER: string (nullable = true)
 |-- OP_CARRIER_AIRLINE_ID: integer (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- TAIL_NUM: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN_AIRPORT_ID: integer (nullable = true)
 |-- ORIGIN_AIRPORT_SEQ_ID: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST_AIRPORT_ID: integer (nullable = true)
 |-- DEST_AIRPORT_SEQ_ID: integer (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEP_TIME: integer (nullable = true)
 |-- DEP_DEL15: double (nullable = true)
 |-- DEP_TIME_BLK: string (nullable = true)
 |-- ARR_TIME: integer (nullable = true)
 |-- ARR_DEL15: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- _c21: string (nullable = true)



[Stage 1:===>                                                     (1 + 15) / 16]                                                                                

In [2]:
import pandas as pd

In [3]:
df_edges = df.withColumnRenamed('ORIGIN', 'src').withColumnRenamed('DEST', 'dst')

In [4]:
df_vertices = df.select('ORIGIN', 'ORIGIN_AIRPORT_ID').dropDuplicates(['ORIGIN']).withColumnRenamed('ORIGIN', 'id')

In [5]:
from graphframes import *

g = GraphFrame(df_vertices, df_edges)

In [6]:
from graphframes import GraphFrame

# Assuming you have created the GraphFrame object 'g' using df_vertices and df_edges
num_edges = g.edges.count()
print("Number of edges:", num_edges)


Number of edges: 583985


In [7]:
from graphframes import GraphFrame

# Assuming you have created the GraphFrame object 'g' using df_vertices and df_edges
vertices_df = g.vertices

# Count the number of vertices
num_vertices = vertices_df.count()
print("Number of vertices:", num_vertices)

# Show the vertices DataFrame
vertices_df.show()


                                                                                

Number of vertices: 346
+---+-----------------+
| id|ORIGIN_AIRPORT_ID|
+---+-----------------+
|BGM|            10577|
|PSE|            14254|
|INL|            12343|
|MSY|            13495|
|PPG|            14222|
|DRT|            11415|
|GEG|            11884|
|SNA|            14908|
|BUR|            10800|
|GRB|            11977|
|GTF|            12003|
|IDA|            12280|
|GRR|            11986|
|LWB|            13121|
|JLN|            12511|
|PVU|            14314|
|EUG|            11603|
|PSG|            14256|
|GSO|            11995|
|PVD|            14307|
+---+-----------------+
only showing top 20 rows



In [8]:
factor_more_than_1 = g.edges.filter('DEP_DEL15 == 1')
df3 = factor_more_than_1.select('src', 'DEP_DEL15').distinct().orderBy('src', ascending=False)
factor_more_than_12 = g.edges.filter('ARR_DEL15 == 1')
df4 = factor_more_than_12.select('dst', 'ARR_DEL15').distinct().orderBy('dst', ascending=False)

In [9]:
inner = df4.join(df3, df3.src == df4.dst)
inner.show()



+---+---------+---+---------+
|dst|ARR_DEL15|src|DEP_DEL15|
+---+---------+---+---------+
|BGM|      1.0|BGM|      1.0|
|INL|      1.0|INL|      1.0|
|PSE|      1.0|PSE|      1.0|
|MSY|      1.0|MSY|      1.0|
|DRT|      1.0|DRT|      1.0|
|GEG|      1.0|GEG|      1.0|
|BUR|      1.0|BUR|      1.0|
|SNA|      1.0|SNA|      1.0|
|GRB|      1.0|GRB|      1.0|
|GTF|      1.0|GTF|      1.0|
|IDA|      1.0|IDA|      1.0|
|GRR|      1.0|GRR|      1.0|
|LWB|      1.0|LWB|      1.0|
|JLN|      1.0|JLN|      1.0|
|PVU|      1.0|PVU|      1.0|
|EUG|      1.0|EUG|      1.0|
|PSG|      1.0|PSG|      1.0|
|GSO|      1.0|GSO|      1.0|
|MYR|      1.0|MYR|      1.0|
|PVD|      1.0|PVD|      1.0|
+---+---------+---+---------+
only showing top 20 rows



                                                                                

In [10]:
g.edges.groupBy('dst').count().orderBy('count', ascending=False).show()

+---+-----+
|dst|count|
+---+-----+
|ATL|31151|
|ORD|26212|
|DFW|23078|
|CLT|19105|
|DEN|18498|
|LAX|17977|
|PHX|14764|
|IAH|14586|
|LGA|13882|
|SFO|13702|
|LAS|13219|
|MSP|12184|
|DTW|12160|
|MCO|12042|
|DCA|11851|
|BOS|11429|
|EWR|10536|
|JFK|10483|
|SEA|10227|
|SLC| 9360|
+---+-----+
only showing top 20 rows



In [11]:
g.edges.groupBy('src').count().orderBy('count', ascending=False).show()

+---+-----+
|src|count|
+---+-----+
|ATL|31155|
|ORD|26216|
|DFW|23063|
|CLT|19100|
|DEN|18507|
|LAX|17988|
|PHX|14761|
|IAH|14598|
|LGA|13872|
|SFO|13689|
|LAS|13209|
|MSP|12180|
|DTW|12172|
|MCO|12045|
|DCA|11839|
|BOS|11430|
|EWR|10522|
|JFK|10485|
|SEA|10230|
|SLC| 9339|
+---+-----+
only showing top 20 rows



In [12]:
g.degrees.orderBy('degree', ascending=False).show(3)

+---+------+
| id|degree|
+---+------+
|ATL| 62306|
|ORD| 52428|
|DFW| 46141|
+---+------+
only showing top 3 rows



In [13]:
distance_greater = g.edges.select('src', 'dst', 'DISTANCE').filter('DISTANCE > 500').distinct().orderBy('DISTANCE', ascending=False)
distance_greater.show()

+---+---+--------+
|src|dst|DISTANCE|
+---+---+--------+
|HNL|JFK|  4983.0|
|JFK|HNL|  4983.0|
|HNL|EWR|  4962.0|
|EWR|HNL|  4962.0|
|HNL|IAD|  4817.0|
|IAD|HNL|  4817.0|
|HNL|ATL|  4502.0|
|ATL|HNL|  4502.0|
|HNL|ORD|  4243.0|
|ORD|HNL|  4243.0|
|OGG|ORD|  4184.0|
|ORD|OGG|  4184.0|
|MSP|HNL|  3972.0|
|HNL|MSP|  3972.0|
|HNL|IAH|  3904.0|
|IAH|HNL|  3904.0|
|HNL|GUM|  3801.0|
|GUM|HNL|  3801.0|
|HNL|DFW|  3784.0|
|DFW|HNL|  3784.0|
+---+---+--------+
only showing top 20 rows



In [14]:
from pyspark.sql.functions import desc
distance = g.edges.select('src', 'dst', 'DISTANCE').distinct().sort(desc('DISTANCE'))
distance.show()

+---+---+--------+
|src|dst|DISTANCE|
+---+---+--------+
|JFK|HNL|  4983.0|
|HNL|JFK|  4983.0|
|EWR|HNL|  4962.0|
|HNL|EWR|  4962.0|
|IAD|HNL|  4817.0|
|HNL|IAD|  4817.0|
|ATL|HNL|  4502.0|
|HNL|ATL|  4502.0|
|ORD|HNL|  4243.0|
|HNL|ORD|  4243.0|
|ORD|OGG|  4184.0|
|OGG|ORD|  4184.0|
|HNL|MSP|  3972.0|
|MSP|HNL|  3972.0|
|IAH|HNL|  3904.0|
|HNL|IAH|  3904.0|
|HNL|GUM|  3801.0|
|GUM|HNL|  3801.0|
|HNL|DFW|  3784.0|
|DFW|HNL|  3784.0|
+---+---+--------+
only showing top 20 rows



In [15]:
from pyspark.sql.functions import col

In [16]:
import pyspark.sql.functions as F
sub = GraphFrame(g.vertices, distance)
r = sub.find("(a)-[ab]->(b); (b)-[bc]->(c); !(a)-[]->(c)").filter('c.id != a.id')
r2 = r.withColumn("sum_distance", r.ab.DISTANCE + r.bc.DISTANCE).groupby('a.id','c.id').max('sum_distance').sort(desc('max(sum_distance)')).show()
#r.count()



+---+---+-----------------+
| id| id|max(sum_distance)|
+---+---+-----------------+
|EWR|JFK|           9945.0|
|JFK|EWR|           9945.0|
|JFK|IAH|           8887.0|
|IAH|JFK|           8887.0|
|GUM|JFK|           8784.0|
|JFK|GUM|           8784.0|
|GUM|EWR|           8763.0|
|EWR|GUM|           8763.0|
|GUM|IAD|           8618.0|
|IAD|GUM|           8618.0|
|GUM|ATL|           8303.0|
|ATL|GUM|           8303.0|
|GUM|ORD|           8044.0|
|ORD|GUM|           8044.0|
|MSP|GUM|           7773.0|
|GUM|MSP|           7773.0|
|ANC|JFK|           7760.0|
|JFK|ANC|           7760.0|
|ANC|EWR|           7739.0|
|EWR|ANC|           7739.0|
+---+---+-----------------+
only showing top 20 rows



                                                                                

# find the airport "a" is connect to "b" and "b" is connect to "c" but "a" is not connect to "c"

In [17]:
# Find chains of two flights
result = sub.edges.alias("ab") \
    .join(sub.edges.alias("bc"), col("ab.dst") == col("bc.src")) \
    .select(col("ab.src").alias("a"), col("ab.dst").alias("b"), col("bc.dst").alias("c"))

# Display the result
result.count()

342760

In [18]:
import pyspark.sql.functions as F
sub = GraphFrame(g.vertices, distance)
r = sub.find("(a)-[ab]->(b); (b)-[bc]->(c); (a)-[]->(c)").filter('c.id != a.id')
r2 = r.withColumn("sum_distance", r.ab.DISTANCE + r.bc.DISTANCE).groupby('a.id','c.id').max('sum_distance').sort(desc('max(sum_distance)')).show()
#r.count()



+---+---+-----------------+
| id| id|max(sum_distance)|
+---+---+-----------------+
|IAD|JFK|           9800.0|
|JFK|IAD|           9800.0|
|IAD|EWR|           9779.0|
|EWR|IAD|           9779.0|
|ATL|JFK|           9485.0|
|JFK|ATL|           9485.0|
|EWR|ATL|           9464.0|
|ATL|EWR|           9464.0|
|IAD|ATL|           9319.0|
|ATL|IAD|           9319.0|
|JFK|ORD|           9226.0|
|ORD|JFK|           9226.0|
|EWR|ORD|           9205.0|
|ORD|EWR|           9205.0|
|IAD|ORD|           9060.0|
|ORD|IAD|           9060.0|
|JFK|MSP|           8955.0|
|MSP|JFK|           8955.0|
|EWR|MSP|           8934.0|
|MSP|EWR|           8934.0|
+---+---+-----------------+
only showing top 20 rows



                                                                                

In [19]:
# Find chains of two flights
result = sub.edges.alias("ab") \
    .join(sub.edges.alias("bc"), col("ab.dst") == col("bc.src")) & () \
    .select(col("ab.src").alias("a"), col("ab.dst").alias("b"), col("bc.dst").alias("c"))

# Display the result
result.count()

AttributeError: 'tuple' object has no attribute 'select'

In [None]:
sub2 = GraphFrame(g.vertices, distance)
result = sub2.find("(a)-[ab]->(b); (b)-[bc]->(c); !(a)-[]->(c)").filter("a.id = 'JFK'").filter("c.id = 'EWR'")
result.show(100)