Skip to content

Commit

Permalink
Kcore debugging.
Browse files Browse the repository at this point in the history
  • Loading branch information
dcrankshaw committed Apr 29, 2014
1 parent dbe5180 commit a641dc1
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
3 changes: 3 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ object Pregel extends Logging {
// Loop
var prevG: Graph[VD, ED] = null.asInstanceOf[Graph[VD, ED]]
var i = 0
logWarning("Starting pregel.")
while (activeMessages > 0 && i < maxIterations) {
// Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
Expand Down Expand Up @@ -157,6 +158,8 @@ object Pregel extends Logging {
newVerts.unpersist(blocking=false)
prevG.unpersistVertices(blocking=false)
// count the iteration
logWarning(s"Pregel iteration $i")
// println(s"Pregel iteration $i")
i += 1
}

Expand Down
13 changes: 10 additions & 3 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/Kcore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@ object KCore extends Logging {
: Graph[Int, ED] = {

// Graph[(Int, Boolean), ED] - boolean indicates whether it is active or not
var g = graph.outerJoinVertices(graph.degrees)((vid, oldData, newData) => (newData.getOrElse(0), true))
var g = graph.outerJoinVertices(graph.degrees)((vid, oldData, newData) => (newData.getOrElse(0), true)).cache
var curK = 1
while (curK <= kmax) {
g = computeCurrentKCore(g, curK)
g = computeCurrentKCore(g, curK).cache
val testK = curK
val vCount = g.vertices.filter{ case (vid, (vd, _)) => vd >= testK}.count()
val eCount = g.triplets.map{t => t.srcAttr._1 >= testK && t.dstAttr._1 >= testK }.count()
logWarning(s"K=$curK, V=$vCount, E=$eCount")
curK += 1
}
g.mapVertices({ case (_, (k, _)) => k})
Expand All @@ -44,7 +48,7 @@ object KCore extends Logging {
def computeCurrentKCore[ED: ClassTag](graph: Graph[(Int, Boolean), ED], k: Int) = {
def sendMsg(et: EdgeTriplet[(Int, Boolean), ED]): Iterator[(VertexId, (Int, Boolean))] = {
if (!et.srcAttr._2 || !et.dstAttr._2) {
// if either vertex has already been turned off, in which case we do nothing
// if either vertex has already been turned off we do nothing
Iterator.empty
} else if (et.srcAttr._1 < k && et.dstAttr._1 < k) {
// tell both vertices to turn off; the count value does not need to change
Expand All @@ -56,6 +60,8 @@ object KCore extends Logging {
// if dst is being pruned, tell src to subtract from its vertex count but don't turn src off
Iterator((et.dstId, (0, false)), (et.srcId, (1, true)))
} else {
// no-op but keep these vertices active?
// Iterator((et.srcId, (0, true)), (et.dstId, (0, true)))
Iterator.empty
}
}
Expand All @@ -76,6 +82,7 @@ object KCore extends Logging {
}

// Note that the initial message should have no effect
logWarning("kcore starting pregel")
Pregel(graph, (0, true))(vProg, sendMsg, mergeMsg)
}
}

0 comments on commit a641dc1

Please sign in to comment.