Merge pull request apache#3 from dcrankshaw/osdi_with_kcore_for_merge
Osdi with kcore for merge
ankurdave committed Apr 29, 2014
2 parents 8d22500 + 9e59642 commit f483ca4
Showing 5 changed files with 203 additions and 9 deletions.
5 changes: 4 additions & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -127,6 +127,7 @@ object Pregel extends Logging {
// Loop
var prevG: Graph[VD, ED] = null.asInstanceOf[Graph[VD, ED]]
var i = 0
logWarning("Starting pregel.")
while (activeMessages > 0 && i < maxIterations) {
// Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
@@ -150,13 +151,15 @@
SparkEnv.get.blockManager.shuffleBlockManager.removeAllShuffleStuff()
}

- logInfo("Pregel finished iteration " + i)
+ logWarning("Pregel finished iteration " + i)

// Unpersist the RDDs hidden by newly-materialized RDDs
oldMessages.unpersist(blocking=false)
newVerts.unpersist(blocking=false)
prevG.unpersistVertices(blocking=false)
// count the iteration
logWarning(s"Pregel iteration $i")
// println(s"Pregel iteration $i")
i += 1
}

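For context, the loop above repeatedly applies a caller-supplied vertex program, send-message function, and merge function until no messages remain or maxIterations is reached. A minimal sketch of a typical caller, assuming the standard GraphX Pregel signature (single-source shortest paths; `shortestPaths` and `sourceId` are illustrative names, not part of this diff):

import org.apache.spark.graphx._

// Sketch: single-source shortest paths, a typical client of the loop above.
def shortestPaths(graph: Graph[Long, Double], sourceId: VertexId): Graph[Double, Double] = {
  // Distance 0 at the source, infinity everywhere else.
  val init = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
  Pregel(init, Double.PositiveInfinity)(
    (id, dist, msg) => math.min(dist, msg),  // vprog: keep the shorter distance
    et =>                                    // sendMsg: relax each edge
      if (et.srcAttr + et.attr < et.dstAttr) Iterator((et.dstId, et.srcAttr + et.attr))
      else Iterator.empty,
    (a, b) => math.min(a, b))                // mergeMsg: shortest proposal wins
}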
56 changes: 50 additions & 6 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -18,8 +18,10 @@
package org.apache.spark.graphx.lib

import org.apache.spark._
import scala.math._
import org.apache.spark.graphx._
import org.apache.spark.graphx.PartitionStrategy._
import org.apache.spark.SparkContext._

/**
* Driver program for running graph algorithms.
@@ -127,27 +129,69 @@ object Analytics extends Logging {
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))

val cc = ConnectedComponents.run(graph)
println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
logWarning("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct().count())
sc.stop()

case "kcore" =>
var numEPart = 4
var kmax = 4
var kmin = 1
var partitionStrategy: Option[PartitionStrategy] = None

options.foreach{
case ("numEPart", v) => numEPart = v.toInt
case ("kmax", v) => kmax = v.toInt
case ("kmin", v) => kmin = v.toInt
case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}

if (kmax < 1) {
logWarning("kmax must be positive")
sys.exit(1)
}
if (kmax < kmin) {
logWarning("kmax must be greater than or equal to kmin")
sys.exit(1)
}

println("======================================")
println("| KCORE |")
println("======================================")

val sc = new SparkContext(host, "KCore(" + fname + ")", conf)
val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
minEdgePartitions = numEPart).cache()
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))

logWarning("Starting kcore")
val result = KCore.run(graph, kmax, kmin)

logWarning("Size of cores: " + result.vertices.map { case (vid,data) => (min(data, kmax), 1) }.reduceByKey((_+_)).collect().mkString(", "))
sc.stop()
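Two pieces of plumbing in this case are worth noting. The expression `partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))` applies `partitionBy` only when a strategy was supplied: `None` leaves the graph as loaded, while `Some(s)` yields `graph.partitionBy(s)`. The helper `pickPartitioner` is defined elsewhere in Analytics.scala and is not part of this diff; a sketch of what it presumably does, mapping the option string to the built-in GraphX strategies:

// Assumption: a sketch of the pickPartitioner helper referenced above.
def pickPartitioner(v: String): PartitionStrategy = v match {
  case "RandomVertexCut"          => RandomVertexCut
  case "EdgePartition1D"          => EdgePartition1D
  case "EdgePartition2D"          => EdgePartition2D
  case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
  case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + v)
}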

case "triangles" =>
var numEPart = 4
// TriangleCount requires the graph to be partitioned
- var partitionStrategy: PartitionStrategy = RandomVertexCut
+ var partitionStrategy: Option[PartitionStrategy] = None

options.foreach{
case ("numEPart", v) => numEPart = v.toInt
case ("partStrategy", v) => partitionStrategy = pickPartitioner(v)
case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
println("======================================")
println("| Triangle Count |")
println("======================================")
val sc = new SparkContext(host, "TriangleCount(" + fname + ")", conf)
- val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
-   minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()
+ val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
+   minEdgePartitions = numEPart).cache()
+ val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
// val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
// minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()
logWarning(s"Graph has ${graph.vertices.count} vertices")
val triangles = TriangleCount.run(graph)
println("Triangles: " + triangles.vertices.map {
logWarning("Triangles: " + triangles.vertices.map {
case (vid,data) => data.toLong
}.reduce(_ + _) / 3)
sc.stop()
96 changes: 96 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/KCore.scala
@@ -0,0 +1,96 @@
package org.apache.spark.graphx.lib

import org.apache.spark.graphx._
import org.apache.spark._
import scala.math._
import org.apache.spark.SparkContext._
import scala.reflect.ClassTag

object KCore extends Logging {
/**
* Compute the k-core decomposition of the graph for all k <= kmax. A k-core
* is the maximal subgraph in which every vertex has degree at least k. This
* uses the iterative pruning algorithm discussed by Alvarez-Hamelin et al. in
* "K-Core Decomposition: a Tool for the Visualization of Large Scale Networks"
* (see <a href="http://arxiv.org/abs/cs/0504107">http://arxiv.org/abs/cs/0504107</a>).
*
* @tparam VD the vertex attribute type (discarded in the computation)
* @tparam ED the edge attribute type (preserved in the computation)
*
* @param graph the graph for which to compute the k-core decomposition
* @param kmax the maximum value of k for which to compute cores
* @param kmin the minimum value of k for which to compute cores (defaults to 1)
*
* @return a graph where each vertex attribute is the minimum of kmax and the
* highest k for which that vertex was a member of the k-core
*
* @note Rather than a single k-core, this method yields the core membership
* for every k in [kmin, kmax].
*/

def run[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED],
kmax: Int,
kmin: Int = 1)
: Graph[Int, ED] = {

// Work with Graph[(Int, Boolean), ED]: the Int is the vertex's live-neighbor count
// and the Boolean indicates whether the vertex is still active
var g = graph.outerJoinVertices(graph.degrees)((vid, oldData, newData) => (newData.getOrElse(0), true)).cache
val degrees = graph.degrees
val numVertices = degrees.count
// logWarning(s"Numvertices: $numVertices")
// logWarning(s"degree sample: ${degrees.take(10).mkString(", ")}")
// logWarning("degree distribution: " + degrees.map{ case (vid,data) => (data, 1)}.reduceByKey((_+_)).collect().mkString(", "))
// logWarning("degree distribution: " + degrees.map{ case (vid,data) => (data, 1)}.reduceByKey((_+_)).take(10).mkString(", "))
var curK = kmin
while (curK <= kmax) {
g = computeCurrentKCore(g, curK).cache
val testK = curK
val vCount = g.vertices.filter{ case (vid, (vd, _)) => vd >= testK}.count()
// filter (not map) so we count only edges whose endpoints both remain in the core
val eCount = g.triplets.filter{ t => t.srcAttr._1 >= testK && t.dstAttr._1 >= testK }.count()
logWarning(s"K=$curK, V=$vCount, E=$eCount")
curK += 1
}
g.mapVertices({ case (_, (k, _)) => k})
}

def computeCurrentKCore[ED: ClassTag](graph: Graph[(Int, Boolean), ED], k: Int) = {
logWarning(s"Computing kcore for k=$k")
def sendMsg(et: EdgeTriplet[(Int, Boolean), ED]): Iterator[(VertexId, (Int, Boolean))] = {
if (!et.srcAttr._2 || !et.dstAttr._2) {
// if either vertex has already been turned off, do nothing
Iterator.empty
} else if (et.srcAttr._1 < k && et.dstAttr._1 < k) {
// both vertices fall below the threshold: tell both to turn off
// (the count in the message is ignored once a vertex is off)
Iterator((et.srcId, (0, false)), (et.dstId, (0, false)))
} else if (et.srcAttr._1 < k) {
// src is being pruned: turn it off and tell dst to decrement its live-neighbor count
Iterator((et.srcId, (0, false)), (et.dstId, (1, true)))
} else if (et.dstAttr._1 < k) {
// dst is being pruned: turn it off and tell src to decrement its live-neighbor count
Iterator((et.dstId, (0, false)), (et.srcId, (1, true)))
} else {
// both endpoints remain in the core: send nothing, so Pregel leaves
// them inactive until a neighbor is pruned
Iterator.empty
}
}

// merge messages: sum the decrements to the live-neighbor count and AND the
// flags (false if any message told this vertex to turn off)
def mergeMsg(m1: (Int, Boolean), m2: (Int, Boolean)): (Int, Boolean) = {
(m1._1 + m2._1, m1._2 && m2._2)
}

def vProg(vid: VertexId, data: (Int, Boolean), update: (Int, Boolean)): (Int, Boolean) = {
var newCount = data._1
var on = data._2
if (on) {
// clamp at k - 1 so the count cannot keep falling once the vertex is below the threshold
newCount = max(k - 1, data._1 - update._1)
on = update._2
}
(newCount, on)
}

// Note that initial message should have no effect
Pregel(graph, (0, true))(vProg, sendMsg, mergeMsg)
}
}
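A quick usage sketch for the API above (the graph is illustrative and `sc` is an existing SparkContext; neither comes from this commit):

import org.apache.spark.graphx._

// A triangle 1-2-3 plus a pendant vertex 4 hanging off vertex 3.
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(1L, 3L, 1), Edge(3L, 4L, 1)))
val graph = Graph.fromEdges(edges, defaultValue = 0)
val cores = KCore.run(graph, kmax = 3)
// Each vertex attribute is min(kmax, core number): vertices 1, 2, and 3 form
// a 2-core; vertex 4 belongs only to the 1-core.
cores.vertices.collect.foreach { case (id, k) => println(s"$id -> $k") }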
11 changes: 9 additions & 2 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
@@ -18,6 +18,7 @@
package org.apache.spark.graphx.lib

import scala.reflect.ClassTag
import org.apache.spark.Logging

import org.apache.spark.graphx._

@@ -36,10 +37,12 @@ import org.apache.spark.graphx._
* (i.e. the `sourceId` less than `destId`). Also the graph must have been partitioned
* using [[org.apache.spark.graphx.Graph#partitionBy]].
*/
- object TriangleCount {
+ object TriangleCount extends Logging {

def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED]): Graph[Int, ED] = {
// Remove redundant edges

logWarning("Entering Triangle Count.")
val g = graph.groupEdges((a, b) => a).cache()

// Construct set representations of the neighborhoods
@@ -56,6 +59,8 @@ object TriangleCount {
}
set
}

logWarning("Neighbor sets collected.")
// join the sets with the graph
val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) {
(vid, _, optSet) => optSet.getOrElse(null)
@@ -82,12 +87,14 @@ object TriangleCount {
// compute the intersection along edges
val counters: VertexRDD[Int] = setGraph.mapReduceTriplets(edgeFunc, _ + _)
// Merge counters with the graph and divide by two since each triangle is counted twice
- g.outerJoinVertices(counters) {
+ val result = g.outerJoinVertices(counters) {
(vid, _, optCounter: Option[Int]) =>
val dblCount = optCounter.getOrElse(0)
// double count should be even (divisible by two)
assert((dblCount & 1) == 0)
dblCount / 2
}
logWarning("Triangle count finished.")
result
} // end of TriangleCount
}
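As the scaladoc requires, the input graph must use canonical edge orientation and be explicitly partitioned. A minimal usage sketch (`sc` and `path` are illustrative):

import org.apache.spark.graphx._

val graph = GraphLoader.edgeListFile(sc, path, canonicalOrientation = true)
  .partitionBy(PartitionStrategy.RandomVertexCut)
  .cache()
val triangles = TriangleCount.run(graph)
// Sum the per-vertex counts and divide by 3: each triangle is counted once
// at each of its three vertices.
val total = triangles.vertices.map(_._2.toLong).reduce(_ + _) / 3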
44 changes: 44 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/lib/KCoreSuite.scala
@@ -0,0 +1,44 @@
package org.apache.spark.graphx.lib

import org.scalatest.FunSuite

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
// import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.rdd._


class KCoreSuite extends FunSuite with LocalSparkContext {

def createTriple(sid: VertexId, did: VertexId, sattr: Int, dattr: Int, eattr: Int): EdgeTriplet[Int,Int] = {
val et = new EdgeTriplet[Int,Int]
et.srcId = sid
et.dstId = did
et.srcAttr = sattr
et.dstAttr = dattr
et.attr = eattr
et
}

def createKCoreEdges(): Seq[Edge[Int]] = {
Seq(Edge(11,31), Edge(12,31), Edge(31,33), Edge(31,32), Edge(31,34), Edge(33,34),
Edge(33,32), Edge(34,32), Edge(32,13), Edge(32,23), Edge(34,23), Edge(23,14),
Edge(34,21), Edge(34,22), Edge(21,22))
}

test("KCore") {
withSpark { sc =>
val rawEdges = createKCoreEdges()
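// The expected core number is encoded in each vertex ID's tens digit:
// 1x -> 1-core, 2x -> 2-core, 3x -> 3-core (the 3x vertices form a K4).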
val vertices = Set((11, 1), (12,1), (13,1), (14,1), (21,2), (22,2), (23,2), (31, 3), (32,3), (33,3), (34,3))
val graph = Graph.fromEdges(sc.parallelize(rawEdges), "a")
val resultGraph = KCore.run(graph, 5)
val resultVerts = resultGraph.vertices.collect.toSet
assert(resultVerts === vertices)
}
}
}
