Skip to content

Commit

Permalink
[SPARK-3650][GRAPHX] Triangle Count handles reverse edges incorrectly
Browse files Browse the repository at this point in the history
jegonzal ankurdave please could you review

## What changes were proposed in this pull request?

Reworking of jegonzal PR #2495 to address the issue identified in SPARK-3650. Code amended to use the convertToCanonicalEdges method.

## How was the this patch tested?

Patch was tested using the unit tests created in PR #2495

Author: Robin East <robin.east@xense.co.uk>
Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>

Closes #11290 from insidedctm/spark-3650.
  • Loading branch information
insidedctm authored and rxin committed Feb 22, 2016
1 parent 0f90f4e commit 3d79f10
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 23 deletions.
14 changes: 14 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@

package org.apache.spark.graphx

import scala.reflect.{classTag, ClassTag}
import scala.reflect.ClassTag
import scala.util.Random

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.SparkException
import org.apache.spark.graphx.impl.EdgePartitionBuilder
import org.apache.spark.graphx.impl.GraphImpl
import org.apache.spark.graphx.lib._
import org.apache.spark.rdd.RDD

Expand Down Expand Up @@ -183,6 +188,15 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
}
}

/**
* Remove self edges.
*
* @return a graph with all self edges removed
*/
def removeSelfEdges(): Graph[VD, ED] = {
graph.subgraph(epred = e => e.srcId != e.dstId)
}

/**
* Join the vertices with an RDD and then apply a function from the
* vertex and RDD entry to a new vertex value. The input table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,55 @@ package org.apache.spark.graphx.lib
import scala.reflect.ClassTag

import org.apache.spark.graphx._
import org.apache.spark.graphx.PartitionStrategy.EdgePartition2D

/**
* Compute the number of triangles passing through each vertex.
*
* The algorithm is relatively straightforward and can be computed in three steps:
*
* <ul>
* <li>Compute the set of neighbors for each vertex
* <li>For each edge compute the intersection of the sets and send the count to both vertices.
* <li> Compute the sum at each vertex and divide by two since each triangle is counted twice.
* <li> Compute the set of neighbors for each vertex</li>
* <li> For each edge compute the intersection of the sets and send the count to both vertices.</li>
* <li> Compute the sum at each vertex and divide by two since each triangle is counted twice.</li>
* </ul>
*
* Note that the input graph should have its edges in canonical direction
* (i.e. the `sourceId` less than `destId`). Also the graph must have been partitioned
* using [[org.apache.spark.graphx.Graph#partitionBy]].
* There are two implementations. The default `TriangleCount.run` implementation first removes
* self cycles and canonicalizes the graph to ensure that the following conditions hold:
* <ul>
* <li> There are no self edges</li>
* <li> All edges are oriented src > dst</li>
* <li> There are no duplicate edges</li>
* </ul>
* However, the canonicalization procedure is costly as it requires repartitioning the graph.
* If the input data is already in "canonical form" with self cycles removed then the
* `TriangleCount.runPreCanonicalized` should be used instead.
*
* {{{
* val canonicalGraph = graph.mapEdges(e => 1).removeSelfEdges().canonicalizeEdges()
* val counts = TriangleCount.runPreCanonicalized(canonicalGraph).vertices
* }}}
*
*/
object TriangleCount {

def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
// Remove redundant edges
val g = graph.groupEdges((a, b) => a).cache()
// Transform the edge data something cheap to shuffle and then canonicalize
val canonicalGraph = graph.mapEdges(e => true).removeSelfEdges().convertToCanonicalEdges()
// Get the triangle counts
val counters = runPreCanonicalized(canonicalGraph).vertices
// Join them bath with the original graph
graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
optCounter.getOrElse(0)
}
}


def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
// Construct set representations of the neighborhoods
val nbrSets: VertexRDD[VertexSet] =
g.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
val set = new VertexSet(4)
graph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
val set = new VertexSet(nbrs.length)
var i = 0
while (i < nbrs.size) {
// prevent self cycle
Expand All @@ -56,14 +79,14 @@ object TriangleCount {
}
set
}

// join the sets with the graph
val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) {
val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) {
(vid, _, optSet) => optSet.getOrElse(null)
}

// Edge function computes intersection of smaller vertex with larger vertex
def edgeFunc(ctx: EdgeContext[VertexSet, ED, Int]) {
assert(ctx.srcAttr != null)
assert(ctx.dstAttr != null)
val (smallSet, largeSet) = if (ctx.srcAttr.size < ctx.dstAttr.size) {
(ctx.srcAttr, ctx.dstAttr)
} else {
Expand All @@ -80,15 +103,15 @@ object TriangleCount {
ctx.sendToSrc(counter)
ctx.sendToDst(counter)
}

// compute the intersection along edges
val counters: VertexRDD[Int] = setGraph.aggregateMessages(edgeFunc, _ + _)
// Merge counters with the graph and divide by two since each triangle is counted twice
g.outerJoinVertices(counters) {
(vid, _, optCounter: Option[Int]) =>
val dblCount = optCounter.getOrElse(0)
// double count should be even (divisible by two)
assert((dblCount & 1) == 0)
dblCount / 2
graph.outerJoinVertices(counters) { (_, _, optCounter: Option[Int]) =>
val dblCount = optCounter.getOrElse(0)
// This algorithm double counts each triangle so the final count should be even
require(dblCount % 2 == 0, "Triangle count resulted in an invalid number of triangles.")
dblCount / 2
}
} // end of TriangleCount
}
}
15 changes: 15 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,21 @@ class GraphOpsSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("removeSelfEdges") {
withSpark { sc =>
val edgeArray = Array((1 -> 2), (2 -> 3), (3 -> 3), (4 -> 3), (1 -> 1))
.map {
case (a, b) => (a.toLong, b.toLong)
}
val correctEdges = edgeArray.filter { case (a, b) => a != b }.toSet
val graph = Graph.fromEdgeTuples(sc.parallelize(edgeArray), 1)
val canonicalizedEdges = graph.removeSelfEdges().edges.map(e => (e.srcId, e.dstId))
.collect
assert(canonicalizedEdges.toSet.size === canonicalizedEdges.size)
assert(canonicalizedEdges.toSet === correctEdges)
}
}

test ("filter") {
withSpark { sc =>
val n = 5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {
val verts = triangleCount.vertices
verts.collect().foreach { case (vid, count) =>
if (vid == 0) {
assert(count === 4)
} else {
assert(count === 2)
} else {
assert(count === 1)
}
}
}
Expand All @@ -75,7 +75,8 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {
test("Count a single triangle with duplicate edges") {
withSpark { sc =>
val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(1L -> 0L, 1L -> 1L), 2)
val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
val triangleCount = graph.triangleCount()
val verts = triangleCount.vertices
Expand Down

0 comments on commit 3d79f10

Please sign in to comment.