In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col
from pyspark import SparkContext, SparkConf
from graphframes import GraphFrame

In [2]:
# (1)统计航班飞行网图中机场的数量  
# (2)统计航班飞行网图中航线的数量  
# (3)计算最长的飞行航线（Point to Point）  
# (4)找出最繁忙的机场  
# (5)找出最重要的机场（PageRank）  

In [3]:
# Initialise Spark.
# The session is built straight from the SparkConf and obtained through
# getOrCreate(), so re-running this cell reuses the live session instead of
# failing with "Cannot run multiple SparkContexts at once", which a second
# direct SparkContext(...) construction would raise.
spark_conf = (
    SparkConf()
    .setAppName('fly')
    .setMaster('local[*]')
    # GraphFrames does not ship with PySpark; listing it here lets Spark
    # resolve the package when the session starts.
    .set("spark.jars.packages", "graphframes:graphframes:0.8.3-spark3.5-s_2.12")
)
# A single application name ('fly', from the conf) is used; the builder no
# longer sets a second, conflicting name.
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
# Kept for code that still expects the low-level context handle.
sc = spark.sparkContext

:: loading settings :: url = jar:file:/Users/ikiwi/PycharmProjects/pythonProject/venv-3.11/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/ikiwi/.ivy2/cache
The jars for the packages stored in: /Users/ikiwi/.ivy2/jars
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-503747e6-fa5a-4c79-8100-6c9f8a95a357;1.0
	confs: [default]
	found graphframes#graphframes;0.8.3-spark3.5-s_2.12 in spark-packages
	found org.slf4j#slf4j-api;1.7.16 in central
:: resolution report :: resolve 200ms :: artifacts dl 15ms
	:: modules in use:
	graphframes#graphframes;0.8.3-spark3.5-s_2.12 from spark-packages in [default]
	org.slf4j#slf4j-api;1.7.16 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	--------------------------------

In [4]:
# Location of the flight-records CSV. This is a relative path, so it is
# presumably resolved against the notebook's working directory — TODO confirm.
fly_csv_data_path: str = "fly.csv"

In [5]:
# Load the flight records; the first line of the file supplies the column names.
# No schema is given or inferred, so every column arrives as a string and is
# cast explicitly where a numeric type is needed.
flyDataDF = spark.read.csv(fly_csv_data_path, header=True)

                                                                                

In [6]:
# Peek at a single record to check the column names and raw values.
flyDataDF.head()

Row(日='1', 周='3', 航空公司='AA', 飞机注册号='N338AA', 航班号='1', 起飞机场编号='12478', 起飞机场='JFK', 到达机场编号='12892', 到达机场='LAX', 预计起飞时间（时分）='900', 起飞时间='914', 起飞延迟（分钟）='14', 到达预计时间='1225', 到达时间='1238', 到达延迟（分钟）='13', 预计飞行时间='385', 飞行距离='2475')

In [8]:
# Routes (graph edges): one row per distinct (origin, destination, distance)
# combination. The endpoints are aliased to "src" and "dst", the column names
# GraphFrame expects on its edge DataFrame.
_route_columns = [
    col("起飞机场编号").cast("int").alias("src"),
    col("到达机场编号").cast("int").alias("dst"),
    col("飞行距离").cast("long").alias("飞行距离"),
]
linesDF = flyDataDF.select(*_route_columns).distinct()

In [9]:
# Airports (graph vertices): every airport id seen at either end of a flight.
# airPortInDF holds the departure-airport ids, airPortOutDF the arrival-airport ids.
airPortInDF = flyDataDF.select(flyDataDF["起飞机场编号"].cast("int").alias("id"))
airPortOutDF = flyDataDF.select(flyDataDF["到达机场编号"].cast("int").alias("id"))
# union() keeps duplicate rows, so the ids are de-duplicated afterwards.
_all_airport_ids = airPortInDF.union(airPortOutDF)
airPortDF = _all_airport_ids.distinct()

In [10]:
# Assemble the flight network: airports are the vertices, routes are the edges.
graph = GraphFrame(v=airPortDF, e=linesDF)
graph

GraphFrame(v:[id: int], e:[src: int, dst: int ... 1 more field])

In [11]:
# 1) Number of airports: each vertex of the graph is one airport.
_airport_vertices = graph.vertices
airPortNum = _airport_vertices.count()
airPortNum

                                                                                

301

In [12]:
# 2) Number of routes: each edge of the graph is one distinct route.
_route_edges = graph.edges
linesNum = _route_edges.count()
linesNum

                                                                                

4088

In [13]:
# 3) Longest point-to-point route.
# One descending sort on distance is all that is needed. src and dst are added
# as tie-breakers so that, when several routes share the maximum distance
# (e.g. both directions of the same route), first() always returns the same row.
longestLine: DataFrame = graph.edges.orderBy(
    col("飞行距离").desc(),
    col("src").asc(),
    col("dst").asc(),
)
# first() returns None if the graph has no edges.
longestLineInform = longestLine.first()
longestLineInform

                                                                                

Row(src=12173, dst=12478, 飞行距离=4983)

In [14]:
# 4) Busiest airport: the vertex with the highest degree.
# The edges are distinct routes, so this counts routes touching the airport,
# not individual flights.
graphDegreesDF: DataFrame = graph.degrees
maxGraphDegreeInform = graphDegreesDF.orderBy(col("degree").desc()).first()
maxGraphDegreeInform

                                                                                

Row(id=10397, degree=305)

In [15]:
# 5) Most important airport: the vertex with the highest PageRank score.
# PageRank is run with a reset probability of 0.05 and a convergence tolerance of 0.01.
pageRankGraph = graph.pageRank(resetProbability=0.05, tol=0.01)
firstData = pageRankGraph.vertices.orderBy(col("pagerank").desc()).first()
firstData
firstData

24/05/25 17:53:51 WARN BlockManager: Block rdd_114_0 already exists on this machine; not re-adding it
                                                                                

Row(id=10397, pagerank=10.37214346638005)

In [16]:
# Summary of the five answers. The row fields are unpacked first so that each
# print line is just a template plus its values.
_longest_route_values = (
    longestLineInform["src"],
    longestLineInform["dst"],
    longestLineInform['飞行距离'],
)
_busiest_airport_values = (maxGraphDegreeInform["id"], maxGraphDegreeInform["degree"])
_top_ranked_values = (firstData["id"], firstData["pagerank"])

print("机场数量:", airPortNum)
print("航线数量:", linesNum)
print("最长的飞行航线:(%d -> %d: %d)" % _longest_route_values)
print("最繁忙的机场: (%d度数为%d)" % _busiest_airport_values)
print("最重要的机场: (%d, pagerank: %f)" % _top_ranked_values)

机场数量: 301
航线数量: 4088
最长的飞行航线:(12173 -> 12478: 4983)
最繁忙的机场: (10397度数为305)
最重要的机场: (10397, pagerank: 10.372143)
