本章将介绍如何使用 Spark 的图处理库 GraphX 进行图处理。
本章分为以下食谱:
- 图的基本运算
- 使用 PageRank
- 查找连接的组件
- 执行邻域聚合
图表分析在我们的生活中比我们想象的要常见得多。 举一个最常见的例子,当我们要求 GPS 找到到达目的地的最短路径时,它会使用图处理算法。
让我们从理解图表开始。 图是一组顶点的表示,其中一些顶点对由边连接。 当这些边从一个方向移动到另一个方向时,称为有向图或有向图。
GraphX 是用于图处理的 Spark API。 它提供了一个称为弹性分布属性图的 RDD 包装器。 属性图是具有附加到每个顶点和边的属性的有向多重图。
有两种类型的图-有向图(有向图)和正则图。 有向图具有一个方向的边,例如,从顶点 A 到顶点 B。Twitter Follow 就是有向图的一个很好的例子。 如果约翰是大卫在推特上的追随者,并不意味着大卫就是约翰的追随者。 另一方面,Facebook 是规则图的一个很好的例子。 如果说 John 是 David 的 Facebook 好友,那么 David 也是 John 的 Facebook 好友。
多重图是允许有多条边(也称为条平行边)的图。 由于 GraphX 中的每条边都有属性,因此每条边都有自己的标识。
传统上,对于分布式图处理,存在两种类型的系统:
- 数据并行
- 图并行
GraphX 的目标是将两者结合在一个系统中。 GraphX API 使用户能够以图表和集合(RDDS)的形式查看数据,而无需移动数据。
在本食谱中,我们将学习如何创建图并对其进行基本操作。
作为开始示例,我们将有三个顶点,每个顶点代表加利福尼亚州三个城市的市中心-圣克拉拉、弗里蒙特和旧金山。 以下是这些城市之间的距离:
|来源 / 提供信息的人 / 原始资料 / 水源
|
目的地 / 送达地点 / 专访目的地
|
距离(英里)
| | --- | --- | --- | | 加利福尼亚州圣克拉拉 | -加利福尼亚州弗里蒙特 | 20 个 | | -加利福尼亚州弗里蒙特 | 加州旧金山 | 44 | | 加州旧金山 | 加利福尼亚州圣克拉拉 | 53 |
-
导入与 GraphX 相关的类:
scala> import org.apache.spark.graphx._ scala> import org.apache.spark.rdd.RDD
-
将顶点数据加载到数组中:
scala> val vertices = Array((1L, ("Santa Clara","CA")),(2L, ("Fremont","CA")),(3L, ("San Francisco","CA")))
-
将顶点数组加载到顶点的 RDD 中:
scala> val vrdd = sc.parallelize(vertices)
-
将边数据加载到数组中:
scala> val edges = Array(Edge(1L,2L,20),Edge(2L,3L,44),Edge(3L,1L,53))
-
将数据加载到边的 RDD 中:
scala> val erdd = sc.parallelize(edges)
-
创建图表:
scala> val graph = Graph(vrdd,erdd)
-
打印图表的所有顶点:
scala> graph.vertices.collect.foreach(println)
-
打印图的所有边:
scala> graph.edges.collect.foreach(println)
-
打印边三元组;通过将源和目标属性添加到边来创建三元组:
scala> graph.triplets.collect.foreach(println)
-
图的度数是它所具有的向内有向边的数目。 打印每个顶点的入度(如
VertexRDD[Int]
):
```scala
scala> graph.inDegrees
```
PageRank 度量图中每个顶点的重要性。 PageRank 是由谷歌的创始人创立的,他们利用这样的理论:互联网上最重要的页面是有最多链接指向这些页面的页面。 PageRank 还关注指向目标页面的页面的重要性。 因此,如果一个给定的网页有来自排名较高的页面的传入链接,那么它的排名就会更高。
我们将使用维基百科的页面链接数据来计算页面排名。 维基百科以数据库转储的形式发布其数据。 我们将使用来自http://haselgrove.id.au/wikipedia.htm的链接数据,该数据位于两个文件中:
links-simple-sorted.txt
titles-sorted.txt
我已经把它们都放到了s3n://com.infoobjects.wiki/links
和s3n://com.infoobjects.wiki/nodes
的 Amazon S3 上。 由于数据较大,因此建议您在 Amazon EC2 或本地群集上运行它。 沙盒可能非常慢。
您可以使用以下命令将文件加载到hdfs
:
$ hdfs dfs -mkdir wiki
$ hdfs dfs -put links-simple-sorted.txt wiki/links.txt
$ hdfs dfs -put titles-sorted.txt wiki/nodes.txt
-
导入 GraphX 相关类:
scala> import org.apache.spark.graphx._
-
Load the edges from
hdfs
with 20 partitions:scala> val edgesFile = sc.textFile("wiki/links.txt",20)
或者,从 Amazon S3 加载边缘:
scala> val edgesFile = sc.textFile("s3n:// com.infoobjects.wiki/links",20)
links
文件在“SourceLink:Link1 Link2…”中有链接。 格式化。 -
展平并将其转换为“link1,link2”格式的 RDD,然后将其转换为
Edge
个对象的 RDD:scala> val edges = edgesFile.flatMap { line => val links = line.split("\\W+") val from = links(0) val to = links.tail for ( link <- to) yield (from,link) }.map( e => Edge(e._1.toLong,e._2.toLong,1))
-
用 20 个分区加载
hdfs
中的顶点:scala> val verticesFile = sc.textFile("wiki/nodes.txt",20)
-
或者,从 Amazon S3 加载边缘:
scala> val verticesFile = sc.textFile("s3n:// com.infoobjects.wiki/nodes",20)
-
为顶点提供索引,然后将其交换为(index,title)格式:
scala> val vertices = verticesFile.zipWithIndex.map(_.swap)
-
创建
graph
对象:scala> val graph = Graph(vertices,edges)
-
运行 PageRank 并获取顶点:
scala> val ranks = graph.pageRank(0.001).vertices
-
由于排名采用(顶点 ID,PageRank)格式,因此将其交换为(PageRank,顶点 ID)格式:
scala> val swappedRanks = ranks.map(_.swap)
-
排序以首先获得排名最高的页面:
```scala
scala> val sortedRanks = swappedRanks.sortByKey(false)
```
- 获取排名最高的个页面:
```scala
scala> val highest = sortedRanks.first
```
- 前面的命令给出了顶点 id,您仍然需要查找它才能看到具有排名的实际标题。 让我们做一个 JOIN:
```scala
scala> val join = sortedRanks.join(vertices)
```
- 从(顶点 ID,(页面排名,标题))格式转换为(页面排名,(顶点 ID,标题))格式后,连接的 RDD 再次排序:
```scala
scala> val final = join.map ( v => (v._2._1, (v._1,v._2._2))).sortByKey(false)
```
- 打印排名前五的页面
```scala
scala> final.take(5).collect.foreach(println)
```
下面是应该输出的内容:
(12406.054646736622,(5302153,United_States'_Country_Reports_on_Human_Rights_Practices))
(7925.094429748747,(84707,2007,_Canada_budget)) (7635.6564216408515,(88822,2008,_Madrid_plane_crash)) (7041.479913258444,(1921890,Geographic_coordinates)) (5675.169862343964,(5300058,United_Kingdom's))
连通分支是一个子图(其顶点是原始图的顶点集的子集,其边是原始图的边集的子集),其中任意两个顶点通过条边或一系列边相互连接。
要理解它,一个简单的方法是看一看夏威夷的公路网图。 这个州有许多岛屿,它们之间没有公路相连。 在每个岛屿内,大多数道路将相互连接。 查找连接组件的目标是找到这些群集。
连通分量算法用其编号最低的顶点的 ID 来标记图的每个连通分量。
我们将在这里为我们已知的集群构建一个小图,并使用连接组件将它们分开。 我们来看一下以下数据:
|追随者 / 爱好者 / 信徒 / 属下
|
跟随者
| | --- | --- | | 约翰 (人名) | 爱尔兰人的绰号 | | 爱尔兰人的绰号 | 戴夫 | | 加里 (印第安纳州西北部一工业城市) | 克里斯 | | 克里斯 | 警察 |
前面的数据是一个简单的数据,有六个顶点和两个簇。 让我们将此数据放在两个文件中:nodes.csv
和edges.csv
。
以下是nodes.csv
的内容:
1,John
2,Pat
3,Dave
4,Gary
5,Chris
6,Bill
以下是edges.csv
的内容:
1,2,follows
2,3,follows
4,5,follows
5,6,follows
我们应该期待连通分量算法识别两个集群,第一个由(1,John)识别,第二个由(4,Gary)识别。
您可以使用以下命令将文件加载到hdfs
:
$ hdfs dfs -mkdir data/cc
$ hdfs dfs -put nodes.csv data/cc/nodes.csv
$ hdfs dfs -put edges.csv data/cc/edges.csv
-
加载 Spark 外壳:
$ spark-shell
-
导入与 GraphX 相关的类:
scala> import org.apache.spark.graphx._
-
从
hdfs
加载边:scala> val edgesFile = sc.textFile("hdfs://localhost:9000/user/hduser/data/cc/edges.csv")
-
将
edgesFile
RDD 转换为边的 RDD:scala> val edges = edgesFile.map(_.split(",")).map(e => Edge(e(0).toLong,e(1).toLong,e(2)))
-
从
hdfs
加载顶点:scala> val verticesFile = sc.textFile("hdfs://localhost:9000/user/hduser/data/cc/nodes.csv")
-
映射顶点:
scala> val vertices = verticesFile.map(_.split(",")).map( e => (e(0).toLong,e(1)))
-
创建
graph
对象:scala> val graph = Graph(vertices,edges)
-
计算连接的组件:
scala> val cc = graph.connectedComponents
-
查找连通组件的顶点(这是一个子图):
scala> val ccVertices = cc.vertices
-
打印
ccVertices
:
```scala
scala> ccVertices.collect.foreach(println)
```
正如您在输出中看到的,顶点 1、2、3 指向 1,而 4、5、6 指向 4。这两个顶点都是各自集群中索引最低的顶点。
GraphX 通过隔离每个顶点及其邻居来执行大部分计算。 这使得在分布式系统上处理海量图数据变得更加容易。 这使得邻里行动变得非常重要。 GraphX 有一种机制,可以在每个邻域级别以aggregateMessages
方法的形式执行此操作。 它分两步完成:
- 在第一步(该方法的第一个函数)中,消息被发送到目的顶点或源顶点(类似于 MapReduce 中的 Map 函数)。
- 在第二步(该方法的第二个函数)中,对这些消息进行聚合(类似于 MapReduce 中的 Reduce 函数)。
让我们构建一个关注者的小数据集:
|追随者 / 爱好者 / 信徒 / 属下
|
跟随者
| | --- | --- | | 约翰 (人名) | 巴拉克 | | 爱尔兰人的绰号 | 巴拉克 | | 加里 (印第安纳州西北部一工业城市) | 巴拉克 | | 克里斯 | 独指手套 / 手 / 棒球手套 / 女用露指长手套 | | 抢劫 / 使丧失 / 使…丧失 / 非法剥夺 | 独指手套 / 手 / 棒球手套 / 女用露指长手套 |
我们的目标是找出每个节点有多少追随者。 让我们以两个文件的形式加载该数据:nodes.csv
和edges.csv
。
以下是nodes.csv
的内容:
1,Barack
2,John
3,Pat
4,Gary
5,Mitt
6,Chris
7,Rob
以下是edges.csv
的内容:
2,1,follows
3,1,follows
4,1,follows
6,5,follows
7,5,follows
您可以使用以下命令将文件加载到hdfs
:
$ hdfs dfs -mkdir data/na
$ hdfs dfs -put nodes.csv data/na/nodes.csv
$ hdfs dfs -put edges.csv data/na/edges.csv
-
加载 Spark 外壳:
$ spark-shell
-
导入 GraphX 相关类:
scala> import org.apache.spark.graphx._
-
从
hdfs
加载边:scala> val edgesFile = sc.textFile("hdfs://localhost:9000/user/hduser/data/na/edges.csv")
-
将边转换为边的 RDD:
scala> val edges = edgesFile.map(_.split(",")).map(e => Edge(e(0).toLong,e(1).toLong,e(2)))
-
从
hdfs
加载顶点:scala> val verticesFile = sc.textFile("hdfs://localhost:9000/user/hduser/data/cc/nodes.csv")
-
映射顶点:
scala> val vertices = verticesFile.map(_.split(",")).map( e => (e(0).toLong,e(1)))
-
创建
graph
对象:scala> val graph = Graph(vertices,edges)
-
通过向关注者发送消息,即每个关注者的关注者数量为 1,然后添加关注者数量:
scala> val followerCount = graph.aggregateMessages[(Int)]( t => t.sendToDst(1), (a, b) => (a+b))
,进行邻域聚合
-
按(关注者,关注人数)的形式打印
followerCount
:scala> followerCount.collect.foreach(println)
您应该会得到类似于以下内容的输出:
(1,3)
(5,2)