In [1]:
import os

from graphframes import *
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import concat_ws

In [2]:
# Resource settings must be handed to the builder *before* the session/JVM starts.
# Do not mutate `spark.sparkContext._conf` after getOrCreate(): that only edits a
# private SparkConf object and does not resize an already-running driver/executors.
# The application name is the one given to appName() (no separate spark.app.name).
# Note: if a session already exists in this kernel, getOrCreate() returns it and
# these resource settings only take effect after a kernel restart.
# NOTE(review): the graphframes package targets Spark 2.4 / Scala 2.11, while the
# driver's --add-opens JVM options suggest a Spark 3.x runtime — confirm the package
# coordinates match the installed Spark/Scala versions.
spark = (
    SparkSession.builder
    .appName("GraphFlightsAnalysis")
    .config("spark.jars.packages", "graphframes:graphframes:0.8.1-spark2.4-s_2.11")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "4")
    .config("spark.cores.max", "4")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Snapshot of the effective Spark configuration, displayed as (key, value) pairs.
conf = spark.sparkContext.getConf()
conf.getAll()

[('spark.driver.extraJavaOptions',
  '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'),
 ('spark.driver.port', '39013'),
 ('spark.submit.pyFiles',
  '/home/jovyan/.ivy2/jars/graphframes_graphframes-0.8.1-spark2.4-s_2.11.jar,/home/jovyan/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar'),
 ('spark.driver.host', '972a856b75c2'),

In [3]:
# Input directory: set the DATA_DIR environment variable to point elsewhere instead of
# editing the notebook; the default is the previously hard-coded location.
dataDir = os.environ.get("DATA_DIR", "/home/jovyan/work")
df_encoded = spark.read.csv(
    os.path.join(dataDir, "encoded_sampled_data_10_percent.csv"),
    header=True,       # first row holds the column names
    inferSchema=True,  # costs an extra pass over the CSV to infer column types
)
df_encoded.show(1)

+-----------+--------------+---------+---------+--------------+-------------------+-------------------+--------------------+-------------------+-----------------+-------------------+--------+-------------------+------------------+---------------+-----+---+-----------+--------------------+---------------+-------------------------+
|elapsedDays|isBasicEconomy|isNonStop|totalFare|seatsRemaining|totalTravelDistance|      DepartureTime|DepartureAirportCode|segmentsAirlineName|date_diff_in_days|duration_in_minutes|Segments|        ArrivalTime|ArrivalAirportCode|Sentiment_score|month|day|day_of_week|isBasicEconomy_index|isNonStop_index|segmentsAirlineName_index|
+-----------+--------------+---------+---------+--------------+-------------------+-------------------+--------------------+-------------------+-----------------+-------------------+--------+-------------------+------------------+---------------+-----+---+-----------+--------------------+---------------+-------------------------+
|   

In [4]:
# Every airport appearing on *either* end of a flight must be a vertex: GraphFrames
# expects each edge's src/dst to exist in `vertices`, and building the set from
# departures alone would omit airports that only ever appear as arrivals.
departure_ids = df_encoded.select(col("DepartureAirportCode").alias("id"))
arrival_ids = df_encoded.select(col("ArrivalAirportCode").alias("id"))
vertices = departure_ids.union(arrival_ids).distinct()
vertices.show(30)  # departures alone yielded 26 distinct codes; show up to 30

+---+
| id|
+---+
|OAK|
|LGA|
|TTN|
|BOS|
|EWR|
|DEN|
|IAD|
|CLT|
|DAL|
|MIA|
|DFW|
|SFO|
|ATL|
|ORD|
|DTW|
|ONT|
|LAX|
|JFK|
|PHL|
|SJU|
|SAN|
|BJC|
|PIT|
|BOG|
|SLC|
|PBI|
+---+



In [5]:
# One row per flight record: departure airport -> arrival airport
# (no de-duplication, so repeated routes stay as repeated rows).
route_columns = ["DepartureAirportCode", "ArrivalAirportCode"]
edges = df_encoded.select(*route_columns)
edges.show(n=2)

+--------------------+------------------+
|DepartureAirportCode|ArrivalAirportCode|
+--------------------+------------------+
|                 ATL|               BOS|
|                 ATL|               DEN|
+--------------------+------------------+
only showing top 2 rows



In [6]:
# GraphFrames identifies edge endpoints by the column names `src` and `dst`.
for old_name, new_name in {"DepartureAirportCode": "src", "ArrivalAirportCode": "dst"}.items():
    edges = edges.withColumnRenamed(old_name, new_name)

In [7]:
# Peek at one vertex row to confirm the `id` column.
vertices.show(n=1)

+---+
| id|
+---+
|OAK|
+---+
only showing top 1 row



In [8]:
# Peek at one edge row to confirm the renamed `src`/`dst` columns.
edges.show(n=1)

+---+---+
|src|dst|
+---+---+
|ATL|BOS|
+---+---+
only showing top 1 row



In [9]:
# Remove edges missing an endpoint and vertices missing an id.
# `edges` holds only src/dst and `vertices` only id, so these subsets span every
# column — the same rows an unrestricted na.drop() would remove.
edges = edges.dropna(subset=["src", "dst"])
vertices = vertices.dropna(subset=["id"])

In [10]:
# Sanity check after the null drop: remaining nulls in each identifier column.
# Both edge endpoint columns are counted in a single aggregation (one scan of
# `edges`) rather than one filter+count job per column.
# F.count ignores nulls, so counting `when(cond, 1)` counts rows where cond holds;
# an ungrouped aggregation always yields one row, even for an empty DataFrame.
null_count2 = vertices.filter(col("id").isNull()).count()
edge_null_counts = edges.select(
    F.count(F.when(col("src").isNull(), 1)).alias("src_nulls"),
    F.count(F.when(col("dst").isNull(), 1)).alias("dst_nulls"),
).first()
null_count4 = edge_null_counts["src_nulls"]
null_count5 = edge_null_counts["dst_nulls"]

In [11]:
# After the na-drop cell these must all be zero; fail loudly rather than build
# the graph from vertices/edges with missing identifiers.
null_counts = (null_count2, null_count4, null_count5)
assert null_counts == (0, 0, 0), f"unexpected nulls (vertices.id, edges.src, edges.dst): {null_counts}"
null_counts

(0, 0, 0)

In [12]:
# Airport route graph: vertices keyed by `id`, edges by `src`/`dst`.
graph = GraphFrame(v=vertices, e=edges)



In [13]:
# First few vertices as stored in the GraphFrame.
graph.vertices.show(n=5)

+---+
| id|
+---+
|OAK|
|LGA|
|TTN|
|BOS|
|EWR|
+---+
only showing top 5 rows



In [14]:
# First few edges; identical src/dst pairs can repeat (one row per flight record).
graph.edges.show(n=5)

+---+---+
|src|dst|
+---+---+
|ATL|BOS|
|ATL|DEN|
|ATL|DEN|
|ATL|DTW|
|ATL|DTW|
+---+---+
only showing top 5 rows



In [15]:
# PageRank over the airport route graph.
# Duplicate src->dst rows are not collapsed before this call.
# NOTE(review): confirm whether frequency-weighted ranking via those repeated edges
# is intended.
# maxIter is a fixed iteration budget, not a convergence test; GraphFrames also
# accepts `tol=` (instead of maxIter) if scores must converge.
RESET_PROBABILITY = 0.15
PAGERANK_ITERATIONS = 5
results = graph.pageRank(
    resetProbability=RESET_PROBABILITY,
    maxIter=PAGERANK_ITERATIONS,
)



In [16]:
# Highest-scoring airports first, keeping only the top few.
TOP_N_AIRPORTS = 5
most_important_airports = (
    results.vertices
    .orderBy("pagerank", ascending=False)
    .limit(TOP_N_AIRPORTS)
)

In [17]:
# Render the ranked airports with their PageRank scores.
most_important_airports.show(truncate=True)

+---+------------------+
| id|          pagerank|
+---+------------------+
|LAX| 1.958246645695028|
|LGA|1.7189795688526857|
|DFW| 1.687464879565661|
|BOS|1.6450819764667992|
|ORD|1.6256205503528556|
+---+------------------+

