Merge pull request #2 from mengxr/SPARK-12363
use Graph instead of GraphImpl and update tests/example based on PIC paper
viirya committed Feb 12, 2016
2 parents 4c7623f + c7ff1e6 commit d749f6d
Showing 3 changed files with 71 additions and 78 deletions.
@@ -40,27 +40,23 @@ import org.apache.spark.{SparkConf, SparkContext}
* n: Number of sampled points on innermost circle. There are proportionally more points
* within the outer/larger circles.
* maxIterations: Number of Power Iterations
* outerRadius: radius of the outermost of the concentric circles
* }}}
*
* Here is a sample run and output:
*
* ./bin/run-example mllib.PowerIterationClusteringExample -k 3 --n 30 --maxIterations 15
*
* Cluster assignments: 1 -> [0,1,2,3,4],2 -> [5,6,7,8,9,10,11,12,13,14],
* 0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
* ./bin/run-example mllib.PowerIterationClusteringExample -k 2 --n 10 --maxIterations 15
*
* Cluster assignments: 1 -> [0,1,2,3,4,5,6,7,8,9],
* 0 -> [10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
*
* If you use it as a template to create your own app, please use `spark-submit` to submit your app.
*/
object PowerIterationClusteringExample {

case class Params(
input: String = null,
k: Int = 3,
numPoints: Int = 5,
maxIterations: Int = 10,
outerRadius: Double = 3.0
k: Int = 2,
numPoints: Int = 10,
maxIterations: Int = 15
) extends AbstractParams[Params]

def main(args: Array[String]) {
@@ -69,17 +65,14 @@ object PowerIterationClusteringExample {
val parser = new OptionParser[Params]("PowerIterationClusteringExample") {
head("PowerIterationClusteringExample: an example PIC app using concentric circles.")
opt[Int]('k', "k")
.text(s"number of circles (/clusters), default: ${defaultParams.k}")
.text(s"number of circles (clusters), default: ${defaultParams.k}")
.action((x, c) => c.copy(k = x))
opt[Int]('n', "n")
.text(s"number of points in smallest circle, default: ${defaultParams.numPoints}")
.action((x, c) => c.copy(numPoints = x))
opt[Int]("maxIterations")
.text(s"number of iterations, default: ${defaultParams.maxIterations}")
.action((x, c) => c.copy(maxIterations = x))
opt[Double]('r', "r")
.text(s"radius of outermost circle, default: ${defaultParams.outerRadius}")
.action((x, c) => c.copy(outerRadius = x))
}

parser.parse(args, defaultParams).map { params =>
@@ -97,20 +90,21 @@

Logger.getRootLogger.setLevel(Level.WARN)

val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius)
val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
val model = new PowerIterationClustering()
.setK(params.k)
.setMaxIterations(params.maxIterations)
.setInitializationMode("degree")
.run(circlesRdd)

val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id))
val assignments = clusters.toList.sortBy { case (k, v) => v.length}
val assignments = clusters.toList.sortBy { case (k, v) => v.length }
val assignmentsStr = assignments
.map { case (k, v) =>
s"$k -> ${v.sorted.mkString("[", ",", "]")}"
}.mkString(",")
}.mkString(", ")
val sizesStr = assignments.map {
_._2.size
_._2.length
}.sorted.mkString("(", ",", ")")
println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")

@@ -124,20 +118,17 @@
}
}

def generateCirclesRdd(sc: SparkContext,
nCircles: Int = 3,
nPoints: Int = 30,
outerRadius: Double): RDD[(Long, Long, Double)] = {

val radii = Array.tabulate(nCircles) { cx => outerRadius / (nCircles - cx)}
val groupSizes = Array.tabulate(nCircles) { cx => (cx + 1) * nPoints}
val points = (0 until nCircles).flatMap { cx =>
generateCircle(radii(cx), groupSizes(cx))
def generateCirclesRdd(
sc: SparkContext,
nCircles: Int,
nPoints: Int): RDD[(Long, Long, Double)] = {
val points = (1 to nCircles).flatMap { i =>
generateCircle(i, i * nPoints)
}.zipWithIndex
val rdd = sc.parallelize(points)
val distancesRdd = rdd.cartesian(rdd).flatMap { case (((x0, y0), i0), ((x1, y1), i1)) =>
if (i0 < i1) {
Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1), 1.0)))
Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1))))
} else {
None
}
@@ -148,11 +139,9 @@
/**
* Gaussian Similarity: http://en.wikipedia.org/wiki/Radial_basis_function_kernel
*/
def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double): Double = {
val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma)
val expCoeff = -1.0 / 2.0 * math.pow(sigma, 2.0)
def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double)): Double = {
val ssquares = (p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2)
coeff * math.exp(expCoeff * ssquares)
math.exp(-ssquares / 2.0)
}
}
// scalastyle:on println
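
For reference, the simplified kernel above is an RBF kernel with sigma fixed at 1 and the 1/sqrt(2*pi) normalization factor dropped. Dropping the factor is safe here because PIC row-normalizes the affinity matrix, so any constant scale on the similarities cancels out. A minimal standalone check of the new kernel's behavior (a hypothetical snippet, not part of this commit):

def rbf(p1: (Double, Double), p2: (Double, Double)): Double = {
  // sigma = 1 RBF kernel: exp(-||p1 - p2||^2 / 2)
  val d2 = (p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2)
  math.exp(-d2 / 2.0)
}
rbf((0.0, 0.0), (0.0, 0.0)) // 1.0: identical points
rbf((0.0, 0.0), (1.0, 0.0)) // exp(-0.5) ~= 0.607
rbf((0.0, 0.0), (3.0, 0.0)) // exp(-4.5) ~= 0.011: distant points are barely connected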
@@ -24,7 +24,6 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.annotation.Since
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.GraphImpl
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.{Loader, MLUtils, Saveable}
import org.apache.spark.rdd.RDD
@@ -262,10 +261,10 @@ object PowerIterationClustering extends Logging {
},
mergeMsg = _ + _,
TripletFields.EdgeOnly)
GraphImpl.fromExistingRDDs(vD, graph.edges)
Graph(vD, graph.edges)
.mapTriplets(
e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
TripletFields.All)
new TripletFields(/* useSrc */ true, /* useDst */ false, /* useEdge */ true))
}
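
Two things change here: the public Graph constructor replaces GraphImpl.fromExistingRDDs, which lives in the internal org.apache.spark.graphx.impl package, and the explicit TripletFields constructor requests only the fields the triplet map actually reads, so the destination vertex attribute is never shipped. Assuming the standard GraphX constants, the flags match the predefined TripletFields.Src; a small sketch of what the constructor arguments mean (for illustration only):

import org.apache.spark.graphx.TripletFields

// TripletFields(useSrc, useDst, useEdge): ship only the source vertex
// attribute and the edge attribute to mapTriplets.
val srcAndEdge = new TripletFields(true, false, true)
assert(srcAndEdge.useSrc && !srcAndEdge.useDst && srcAndEdge.useEdge)
// TripletFields.Src carries the same three flags, so the two are interchangeable here.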

/**
@@ -291,10 +290,10 @@
},
mergeMsg = _ + _,
TripletFields.EdgeOnly)
GraphImpl.fromExistingRDDs(vD, gA.edges)
Graph(vD, gA.edges)
.mapTriplets(
e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
TripletFields.Src)
new TripletFields(/* useSrc */ true, /* useDst */ false, /* useEdge */ true))
}

/**
@@ -315,7 +314,7 @@
}, preservesPartitioning = true).cache()
val sum = r.values.map(math.abs).sum()
val v0 = r.mapValues(x => x / sum)
GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
Graph(VertexRDD(v0), g.edges)
}

/**
@@ -330,7 +329,7 @@
def initDegreeVector(g: Graph[Double, Double]): Graph[Double, Double] = {
val sum = g.vertices.values.sum()
val v0 = g.vertices.mapValues(_ / sum)
GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
Graph(VertexRDD(v0), g.edges)
}
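
For context, initDegreeVector builds the degree-based starting vector from the PIC paper: each vertex starts at its weighted degree divided by the total degree, so the vector sums to 1. In symbols (my transcription, not text from this commit): v0_i = d_i / sum_j d_j, where d_i = sum_j w_ij is the sum of edge weights incident to vertex i.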

/**
@@ -355,7 +354,7 @@
val v = curG.aggregateMessages[Double](
sendMsg = ctx => ctx.sendToSrc(ctx.attr * ctx.dstAttr),
mergeMsg = _ + _,
TripletFields.Dst).cache()
new TripletFields(/* useSrc */ false, /* useDst */ true, /* useEdge */ true)).cache()
// normalize v
val norm = v.values.map(math.abs).sum()
logInfo(s"$msgPrefix: norm(v) = $norm.")
@@ -368,7 +367,7 @@
diffDelta = math.abs(delta - prevDelta)
logInfo(s"$msgPrefix: diff(delta) = $diffDelta.")
// update v
curG = GraphImpl.fromExistingRDDs(VertexRDD(v1), g.edges)
curG = Graph(VertexRDD(v1), g.edges)
prevDelta = delta
}
curG.vertices
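
Stepping back, the loop in powerIter above is the core PIC recurrence: aggregateMessages computes the matrix-vector product W * v against the row-normalized affinity matrix, the result is re-normalized to unit L1 norm, and the iteration stops once the change in delta (logged as diffDelta) falls below a tolerance. A sketch of the update in the PIC paper's notation (my transcription, not text from this commit):

v^(t+1) = W v^(t) / ||W v^(t)||_1, stopping when |delta^(t+1) - delta^(t)| <= epsilon, where delta tracks the per-iteration change ||v^(t+1) - v^(t)||_1.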
@@ -30,62 +30,65 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkContext {

import org.apache.spark.mllib.clustering.PowerIterationClustering._

/** Generates a circle of points. */
private def genCircle(r: Double, n: Int): Array[(Double, Double)] = {
Array.tabulate(n) { i =>
val theta = 2.0 * math.Pi * i / n
(r * math.cos(theta), r * math.sin(theta))
}
}

/** Computes Gaussian similarity. */
private def sim(x: (Double, Double), y: (Double, Double)): Double = {
val dist2 = (x._1 - y._1) * (x._1 - y._1) + (x._2 - y._2) * (x._2 - y._2)
math.exp(-dist2 / 2.0)
}
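
These two helpers make the suite's expected clusters easy to sanity-check by hand. With the paper's two-circle setup used below (r1 = 1, n1 = 10, r2 = 4, n2 = 40), within-circle neighbors are far more similar than any cross-circle pair (a hypothetical REPL check, not part of the suite):

val inner = genCircle(1.0, 10)
val outer = genCircle(4.0, 40)
// Adjacent inner points are 2 * sin(pi / 10) ~= 0.618 apart:
sim(inner(0), inner(1)) // exp(-0.618^2 / 2) ~= 0.826
// The closest inner-outer pair is 3.0 apart:
sim(inner(0), outer(0)) // exp(-9.0 / 2) ~= 0.011
// This similarity gap is what lets PIC split the points into the two circles.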

test("power iteration clustering") {
/*
We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for
edge (3, 4).
15-14 -13 -12
| |
4 . 3 - 2 11
| | x | |
5 0 - 1 10
| |
6 - 7 - 8 - 9
*/
// Generate two circles following the example in the PIC paper.
val r1 = 1.0
val n1 = 10
val r2 = 4.0
val n2 = 40
val n = n1 + n2
val points = genCircle(r1, n1) ++ genCircle(r2, n2)
val similarities = for (i <- 1 until n; j <- 0 until i) yield {
(i.toLong, j.toLong, sim(points(i), points(j)))
}

val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0),
(1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge
(4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0),
(10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0))
val model = new PowerIterationClustering()
.setK(2)
.setMaxIterations(40)
.run(sc.parallelize(similarities, 2))
val predictions = Array.fill(2)(mutable.Set.empty[Long])
model.assignments.collect().foreach { a =>
predictions(a.cluster) += a.id
}
assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet))

val model2 = new PowerIterationClustering()
.setK(2)
.setMaxIterations(10)
.setInitializationMode("degree")
.run(sc.parallelize(similarities, 2))
val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
model2.assignments.collect().foreach { a =>
predictions2(a.cluster) += a.id
}
assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
}

test("power iteration clustering on graph") {
/*
We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for
edge (3, 4).
15-14 -13 -12
| |
4 . 3 - 2 11
| | x | |
5 0 - 1 10
| |
6 - 7 - 8 - 9
*/

val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0),
(1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge
(4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0),
(10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0))
// Generate two circles following the example in the PIC paper.
val r1 = 1.0
val n1 = 10
val r2 = 4.0
val n2 = 40
val n = n1 + n2
val points = genCircle(r1, n1) ++ genCircle(r2, n2)
val similarities = for (i <- 1 until n; j <- 0 until i) yield {
(i.toLong, j.toLong, sim(points(i), points(j)))
}

val edges = similarities.flatMap { case (i, j, s) =>
if (i != j) {
@@ -98,22 +101,24 @@

val model = new PowerIterationClustering()
.setK(2)
.setMaxIterations(40)
.run(graph)
val predictions = Array.fill(2)(mutable.Set.empty[Long])
model.assignments.collect().foreach { a =>
predictions(a.cluster) += a.id
}
assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet))

val model2 = new PowerIterationClustering()
.setK(2)
.setMaxIterations(10)
.setInitializationMode("degree")
.run(sc.parallelize(similarities, 2))
val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
model2.assignments.collect().foreach { a =>
predictions2(a.cluster) += a.id
}
assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
}

test("normalize and powerIter") {
