Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,17 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
PageRank.run(graph, numIter, resetProb)
}

/**
* Compute the KCore membership of each vertex and return a graph with the vertex
* value containing a boolean the is true if this vertex belongs to the KCore of
* the original graph.
*
* @see [[org.apache.spark.graphx.lib.KCore#run]]
*/
def kCore(k: Int, maxIterations: Int = Int.MaxValue): Graph[Boolean, ED] = {
KCore.run(graph, k, maxIterations)
}

/**
* Compute the connected component membership of each vertex and return a graph with the vertex
* value containing the lowest vertex id in the connected component containing that vertex.
Expand Down
90 changes: 90 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/KCore.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.graphx.lib

import scala.language.implicitConversions
import scala.reflect.ClassTag

import org.apache.spark.graphx._



/** KCore algorithm. */
object KCore {
/**
* Compute the KCore membership of each vertex and return a graph with the vertex
* value containing a boolean that is true if this vertex belongs to the KCore of
* the original graph.
*
* General idea:
* 1. each node starts with his initial degree
* 2. if his current degree is less than K than:
* 2a. he changes its degree to 0
* 2b. he sends a message to his neighbours to reduce 1 from their degree
* 2c. he sends a message to himself to be marked as "removed" - degree = -1
*
*
* @tparam VD the vertex attribute type (discarded in the computation)
* @tparam ED the edge attribute type (preserved in the computation)
* @param graph the graph for which to compute the KCore
* @param k the k of the KCore algorithm
* @param maxIterations the maximum number of iterations to run for
* @return a graph with vertex attributes containing a boolean that
* represents if this vertex belongs to the KCore
*/
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
k: Int,
maxIterations: Int = Int.MaxValue): Graph[Boolean, ED] = {
require(maxIterations > 0, s"Maximum of iterations must be greater than 0," +
s" but got ${maxIterations}")

val kGraph = Graph[Int, ED](graph.degrees, graph.edges)
def vprog(vid: VertexId, vd: Int, i: Int) = {
val ret = if ((vd - i) >= k) vd - i
else if (vd > 0) 0
else -1

ret
}

def sendMessage(edge: EdgeTriplet[Int, ED]): Iterator[(VertexId, Int)] = {
val lst = if (edge.srcAttr == 0) {
List((edge.dstId, (1, edge.dstAttr)), (edge.srcId, (0, edge.srcAttr)))
} else if (edge.dstAttr == 0) {
List((edge.srcId, (1, edge.srcAttr)), (edge.dstId, (0, edge.dstAttr)))
} else List.empty

// no need to send msg to nodes that have already been "removed"
val lstFiltered = lst.filter(_._2._2>=0).map(t => (t._1, t._2._1))

lstFiltered.iterator
}

val initialMessage = 0
val pregelGraph = Pregel(kGraph, initialMessage,
maxIterations, EdgeDirection.Either)(
vprog = vprog,
sendMsg = sendMessage,
mergeMsg = (a, b) => a + b)
kGraph.unpersist()

pregelGraph.mapVertices[Boolean]((id: VertexId, d: Int) => d>0)

} // end of KCore

}
78 changes: 78 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/lib/KCoreSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.graphx.lib

import org.apache.spark.SparkFunSuite
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.rdd._


class KCoreSuite extends SparkFunSuite with LocalSparkContext {

test("KCore on a Toy Connected Directional Graph") {
withSpark { sc =>
val edges =
Array(2L -> 5L, 5L -> 3L, 5L -> 0L) ++
Array(3L -> 7L, 4L -> 0L, 7L -> 0L) ++
Array(7L -> 8L, 8L -> 9L, 5L -> 7L)

val rawEdges = sc.parallelize(edges)
val graph = Graph.fromEdgeTuples(rawEdges, -1L)
// Edges are:
// 2 ---> 5 ---> 3
// | \ |
// V \ |
// 4 ---> 0 <--- 7 ---> 8 ---> 9
//
val ccGraph = graph.kCore(2)
val verticesOutput = ccGraph.vertices.collect()

val expected = Array((4,false), (0,true), (3,true), (7,true), (9,false), (8,false), (5,true), (2,false))
assert(verticesOutput.deep == expected.deep)

}
} // end of toy KCore Directional Graph

test("KCore on a Toy Connected Bi-Directional Graph") {
withSpark { sc =>
val edges1 =
Array(2L -> 5L, 5L -> 3L, 5L -> 0L) ++
Array(3L -> 7L, 4L -> 0L, 7L -> 0L) ++
Array(7L -> 8L, 8L -> 9L, 5L -> 7L)

val edges = edges1 ++ edges1.map(_.swap)

val rawEdges = sc.parallelize(edges)
val graph = Graph.fromEdgeTuples(rawEdges, -1L)
// Edges are:
// 2 ---- 5 ---- 3
// | \ |
// | \ |
// 4 ---- 0 ---- 7 ---- 8 ---- 9
//
val ccGraph = graph.kCore(3)
val verticesOutput = ccGraph.vertices.collect()

val expected = Array((4,false), (0,true), (7,true), (3,true), (9,false), (8,false), (5,true), (2,false))
assert(verticesOutput.deep == expected.deep)

}
} // end of toy KCore Bi-Directional Graph

}