
Fixed results of cherry-pick
dcrankshaw committed Apr 29, 2014
1 parent 4ddc552 commit dbe5180
Showing 4 changed files with 43 additions and 169 deletions.
157 changes: 0 additions & 157 deletions graph/src/main/scala/org/apache/spark/graph/Pregel.scala

This file was deleted.

30 changes: 30 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx.lib
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.graphx.PartitionStrategy._
+import org.apache.spark.SparkContext._

/**
* Driver program for running graph algorithms.
@@ -130,6 +131,35 @@ object Analytics extends Logging {
logWarning("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct().count())
sc.stop()

case "kcore" =>
var numEPart = 4
var kmax = 3
var partitionStrategy: Option[PartitionStrategy] = None

options.foreach{
case ("numEPart", v) => numEPart = v.toInt
case ("kmax", v) => kmax = v.toInt
case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}

if (kmax < 1) {
logWarning("kmax must be positive")
sys.exit(1)
}
println("======================================")
println("| KCORE |")
println("======================================")

val sc = new SparkContext(host, "KCore(" + fname + ")", conf)
val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
minEdgePartitions = numEPart).cache()
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))

val result = KCore.run(graph, kmax)
println("Size of cores: " + result.vertices.map{ case (vid,data) => (data, 1)}.reduceByKey((_+_)).collect().mkString(", "))
sc.stop()

case "triangles" =>
var numEPart = 4
// TriangleCount requires the graph to be partitioned
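For context, the new kcore case wires KCore into the Analytics driver: it parses the numEPart, kmax, and partStrategy options, loads and optionally repartitions the edge list, runs KCore.run(graph, kmax), and reports the size of each core. A minimal standalone sketch of the same flow (the app name and input path are hypothetical placeholders; this is not part of the commit):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits for reduceByKey, the same import the patch adds above
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
import org.apache.spark.graphx.lib.KCore

object KCoreExample {
  def main(args: Array[String]): Unit = {
    // Local four-thread context; "followers.txt" is a placeholder edge-list
    // file with one "srcId dstId" pair per line.
    val sc = new SparkContext(
      new SparkConf().setAppName("KCoreExample").setMaster("local[4]"))

    val graph = GraphLoader.edgeListFile(sc, "followers.txt", minEdgePartitions = 4)
      .partitionBy(PartitionStrategy.RandomVertexCut)
      .cache()

    // Label each vertex with the largest k in [1, kmax] whose k-core contains it.
    val result = KCore.run(graph, 3)

    // Count vertices per core number, mirroring the driver's summary line.
    val sizes = result.vertices.map { case (_, k) => (k, 1) }.reduceByKey(_ + _)
    println("Size of cores: " + sizes.collect().mkString(", "))
    sc.stop()
  }
}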
15 changes: 8 additions & 7 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/KCore.scala
@@ -1,8 +1,9 @@
-package org.apache.spark.graph.algorithms
+package org.apache.spark.graphx.lib

-import org.apache.spark.graph._
+import org.apache.spark.graphx._
import org.apache.spark._
import scala.math._
+import scala.reflect.ClassTag

object KCore extends Logging {
/**
@@ -25,7 +26,7 @@ object KCore extends Logging {
* graph but will yield all the cores for all k in [1, kmax].
*/

-def run[VD: Manifest, ED: Manifest](
+def run[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED],
kmax: Int)
: Graph[Int, ED] = {
@@ -40,8 +41,8 @@ object KCore extends Logging {
g.mapVertices({ case (_, (k, _)) => k})
}

-def computeCurrentKCore[ED: Manifest](graph: Graph[(Int, Boolean), ED], k: Int) = {
-def sendMsg(et: EdgeTriplet[(Int, Boolean), ED]): Iterator[(Vid, (Int, Boolean))] = {
+def computeCurrentKCore[ED: ClassTag](graph: Graph[(Int, Boolean), ED], k: Int) = {
+def sendMsg(et: EdgeTriplet[(Int, Boolean), ED]): Iterator[(VertexId, (Int, Boolean))] = {
if (!et.srcAttr._2 || !et.dstAttr._2) {
// if either vertex has already been turned off, do nothing
Iterator.empty
@@ -64,7 +65,7 @@ object KCore extends Logging {
(m1._1 + m2._1, m1._2 && m2._2)
}

-def vProg(vid: Vid, data: (Int, Boolean), update: (Int, Boolean)): (Int, Boolean) = {
+def vProg(vid: VertexId, data: (Int, Boolean), update: (Int, Boolean)): (Int, Boolean) = {
var newCount = data._1
var on = data._2
if (on) {
@@ -75,6 +76,6 @@ object KCore extends Logging {
}

// Note that initial message should have no effect
-Pregel.undirectedRun(graph, (0, true))(vProg, sendMsg, mergeMsg)
+Pregel(graph, (0, true))(vProg, sendMsg, mergeMsg)
}
}
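The peeling logic above runs one Pregel pass per level, roughly: each live vertex counts its live neighbors, mergeMsg sums those counts and ANDs the liveness flags, and vProg switches a vertex off once its live degree drops below k, after which sendMsg notifies its neighbors. A single-machine sketch of the same fixed-point idea, in plain Scala for intuition only (not part of the commit):

// Sequential reference: repeatedly drop vertices with fewer than k live
// neighbors; whatever survives at the fixed point is the k-core.
def kCore(adj: Map[Long, Set[Long]], k: Int): Set[Long] = {
  var alive = adj.keySet
  var changed = true
  while (changed) {
    val next = alive.filter(v => (adj(v) intersect alive).size >= k)
    changed = next != alive
    alive = next
  }
  alive
}

// Triangle {1, 2, 3} with a pendant vertex 4: the 2-core is exactly the triangle.
val adj = Map(
  1L -> Set(2L, 3L),
  2L -> Set(1L, 3L),
  3L -> Set(1L, 2L, 4L),
  4L -> Set(3L))
assert(kCore(adj, 2) == Set(1L, 2L, 3L))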
10 changes: 5 additions & 5 deletions graphx/src/test/scala/org/apache/spark/graphx/lib/KCoreSuite.scala
@@ -1,17 +1,17 @@
-package org.apache.spark.graph.algorithms
+package org.apache.spark.graphx.lib

import org.scalatest.FunSuite

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
-import org.apache.spark.graph._
-import org.apache.spark.graph.util.GraphGenerators
+import org.apache.spark.graphx._
+// import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.rdd._


class KCoreSuite extends FunSuite with LocalSparkContext {

-def createTriple(sid: Vid, did: Vid, sattr: Int, dattr: Int, eattr: Int): EdgeTriplet[Int,Int] = {
+def createTriple(sid: VertexId, did: VertexId, sattr: Int, dattr: Int, eattr: Int): EdgeTriplet[Int,Int] = {
val et = new EdgeTriplet[Int,Int]
et.srcId = sid
et.dstId = did
@@ -32,7 +32,7 @@ class KCoreSuite extends FunSuite with LocalSparkContext {
val rawEdges = createKCoreEdges()
val vertices = Set((11, 1), (12,1), (13,1), (14,1), (21,2), (22,2), (23,2), (31, 3), (32,3), (33,3), (34,3))
val graph = Graph.fromEdges(sc.parallelize(rawEdges), "a")
-val resultGraph = KCore.run(graph, 1, 5)
+val resultGraph = KCore.run(graph, 5)
val resultVerts = resultGraph.vertices.collect.toSet
assert(resultVerts === vertices)

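The suite's createKCoreEdges() helper is elided from this diff, but the pattern it feeds is the standard GraphX one: parallelize a Seq of Edge values, build a graph with Graph.fromEdges and a default vertex attribute, and compare the labels from KCore.run against expected core numbers. A hypothetical miniature in the same style (a 4-clique plus one pendant vertex, not the suite's data; sc is the test's SparkContext):

import org.apache.spark.graphx.{Edge, Graph}

// 4-clique {1, 2, 3, 4} plus vertex 5 hanging off vertex 1.
val rawEdges = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(1L, 3L, 1), Edge(1L, 4L, 1),
  Edge(2L, 3L, 1), Edge(2L, 4L, 1), Edge(3L, 4L, 1),
  Edge(1L, 5L, 1)))
val graph = Graph.fromEdges(rawEdges, "a")
val resultVerts = KCore.run(graph, 3).vertices.collect().toSet
// Clique members still have degree 3 after the pendant is peeled, so they sit
// in the 3-core; the pendant vertex survives only the k = 1 pass.
assert(resultVerts == Set((1L, 3), (2L, 3), (3L, 3), (4L, 3), (5L, 1)))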
