[SPARK-12363][MLLIB] Remove setRun and fix PowerIterationClustering failed test

JIRA: https://issues.apache.org/jira/browse/SPARK-12363

This issue was pointed out by yanboliang. When `setRuns` is removed from PowerIterationClustering, one of the tests fails. I found that some `dstAttr`s of the normalized graph are 0.0 instead of the correct values. Setting `TripletFields.All` in `mapTriplets` makes it work.

Author: Liang-Chi Hsieh <viirya@gmail.com>
Author: Xiangrui Meng <meng@databricks.com>

Closes #10539 from viirya/fix-poweriter.

(cherry picked from commit e3441e3)
Signed-off-by: Xiangrui Meng <meng@databricks.com>
viirya authored and mengxr committed Feb 13, 2016
1 parent 74bda98 commit 399e13d
Showing 4 changed files with 96 additions and 85 deletions.
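Background on the fix (an editorial aside, not part of the commit): the `dstAttr` symptom described in the message involves GraphX's `TripletFields` hint, which tells `mapTriplets` and `aggregateMessages` which triplet fields the user function reads, so unneeded vertex attributes are not shipped to edge partitions. If the declared fields omit an attribute the function actually touches, that attribute can surface as a default such as 0.0. A minimal sketch of the API, assuming a running `SparkContext` named `sc`:

```scala
import org.apache.spark.graphx.{Edge, Graph, TripletFields}

// Tiny graph; every vertex receives the default attribute 2.0.
val edges = sc.parallelize(Seq(Edge(0L, 1L, 1.0), Edge(1L, 2L, 0.5)))
val g = Graph.fromEdges(edges, 2.0)

// Normalize each edge weight by its source vertex attribute. The explicit
// TripletFields declares that the map function reads srcAttr and the edge
// attribute but never dstAttr.
val normalized = g.mapTriplets(
  t => t.attr / t.srcAttr,
  new TripletFields(/* useSrc */ true, /* useDst */ false, /* useEdge */ true))

normalized.edges.collect().foreach(println) // Edge(0,1,0.5), Edge(1,2,0.25)
```

The patch below uses the same explicit `new TripletFields(useSrc, useDst, useEdge)` constructor, so the fields each call site depends on are visible at a glance.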
PowerIterationClusteringExample.scala
@@ -40,27 +40,23 @@ import org.apache.spark.{SparkConf, SparkContext}
* n: Number of sampled points on innermost circle. There are proportionally more points
* within the outer/larger circles
* maxIterations: Number of Power Iterations
* outerRadius: radius of the outermost of the concentric circles
* }}}
*
* Here is a sample run and output:
*
* ./bin/run-example mllib.PowerIterationClusteringExample -k 3 --n 30 --maxIterations 15
*
* Cluster assignments: 1 -> [0,1,2,3,4],2 -> [5,6,7,8,9,10,11,12,13,14],
* 0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
* ./bin/run-example mllib.PowerIterationClusteringExample -k 2 --n 10 --maxIterations 15
*
* Cluster assignments: 1 -> [0,1,2,3,4,5,6,7,8,9],
* 0 -> [10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
*
* If you use it as a template to create your own app, please use `spark-submit` to submit your app.
*/
object PowerIterationClusteringExample {

case class Params(
input: String = null,
k: Int = 3,
numPoints: Int = 5,
maxIterations: Int = 10,
outerRadius: Double = 3.0
k: Int = 2,
numPoints: Int = 10,
maxIterations: Int = 15
) extends AbstractParams[Params]

def main(args: Array[String]) {
@@ -69,17 +65,14 @@ object PowerIterationClusteringExample {
val parser = new OptionParser[Params]("PowerIterationClusteringExample") {
head("PowerIterationClusteringExample: an example PIC app using concentric circles.")
opt[Int]('k', "k")
.text(s"number of circles (/clusters), default: ${defaultParams.k}")
.text(s"number of circles (clusters), default: ${defaultParams.k}")
.action((x, c) => c.copy(k = x))
opt[Int]('n', "n")
.text(s"number of points in smallest circle, default: ${defaultParams.numPoints}")
.action((x, c) => c.copy(numPoints = x))
opt[Int]("maxIterations")
.text(s"number of iterations, default: ${defaultParams.maxIterations}")
.action((x, c) => c.copy(maxIterations = x))
opt[Double]('r', "r")
.text(s"radius of outermost circle, default: ${defaultParams.outerRadius}")
.action((x, c) => c.copy(outerRadius = x))
}

parser.parse(args, defaultParams).map { params =>
@@ -97,20 +90,21 @@

Logger.getRootLogger.setLevel(Level.WARN)

val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius)
val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
val model = new PowerIterationClustering()
.setK(params.k)
.setMaxIterations(params.maxIterations)
.setInitializationMode("degree")
.run(circlesRdd)

val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id))
val assignments = clusters.toList.sortBy { case (k, v) => v.length}
val assignments = clusters.toList.sortBy { case (k, v) => v.length }
val assignmentsStr = assignments
.map { case (k, v) =>
s"$k -> ${v.sorted.mkString("[", ",", "]")}"
}.mkString(",")
}.mkString(", ")
val sizesStr = assignments.map {
_._2.size
_._2.length
}.sorted.mkString("(", ",", ")")
println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")

@@ -124,20 +118,17 @@
}
}

def generateCirclesRdd(sc: SparkContext,
nCircles: Int = 3,
nPoints: Int = 30,
outerRadius: Double): RDD[(Long, Long, Double)] = {

val radii = Array.tabulate(nCircles) { cx => outerRadius / (nCircles - cx)}
val groupSizes = Array.tabulate(nCircles) { cx => (cx + 1) * nPoints}
val points = (0 until nCircles).flatMap { cx =>
generateCircle(radii(cx), groupSizes(cx))
def generateCirclesRdd(
sc: SparkContext,
nCircles: Int,
nPoints: Int): RDD[(Long, Long, Double)] = {
val points = (1 to nCircles).flatMap { i =>
generateCircle(i, i * nPoints)
}.zipWithIndex
val rdd = sc.parallelize(points)
val distancesRdd = rdd.cartesian(rdd).flatMap { case (((x0, y0), i0), ((x1, y1), i1)) =>
if (i0 < i1) {
Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1), 1.0)))
Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1))))
} else {
None
}
@@ -148,11 +139,9 @@ object PowerIterationClusteringExample {
/**
* Gaussian Similarity: http://en.wikipedia.org/wiki/Radial_basis_function_kernel
*/
def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double): Double = {
val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma)
val expCoeff = -1.0 / 2.0 * math.pow(sigma, 2.0)
def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double)): Double = {
val ssquares = (p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2)
coeff * math.exp(expCoeff * ssquares)
math.exp(-ssquares / 2.0)
}
}
// scalastyle:on println
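An editorial note on the simplified kernel above (not part of the diff): the new `gaussianSimilarity` fixes the RBF bandwidth at σ = 1 and drops the old multiplicative coefficient, which scaled every similarity by the same constant and therefore could not affect the clustering. The kernel now computed is

```latex
s(p_1, p_2) = \exp\!\left(-\frac{\lVert p_1 - p_2 \rVert^2}{2\sigma^2}\right), \qquad \sigma = 1
```

Note also that the removed `expCoeff = -1.0 / 2.0 * math.pow(sigma, 2.0)` multiplied by σ² where the kernel divides by it, so the old code agreed with the RBF kernel only at σ = 1; hard-coding that case removes the discrepancy.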
PowerIterationClustering.scala
@@ -24,7 +24,6 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.GraphImpl
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.{Loader, MLUtils, Saveable}
import org.apache.spark.rdd.RDD
@@ -271,10 +270,12 @@ object PowerIterationClustering extends Logging {
},
mergeMsg = _ + _,
TripletFields.EdgeOnly)
GraphImpl.fromExistingRDDs(vD, graph.edges)
Graph(vD, graph.edges)
.mapTriplets(
e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
TripletFields.Src)
new TripletFields(/* useSrc */ true,
/* useDst */ false,
/* useEdge */ true))
}

/**
@@ -300,10 +301,12 @@
},
mergeMsg = _ + _,
TripletFields.EdgeOnly)
GraphImpl.fromExistingRDDs(vD, gA.edges)
Graph(vD, gA.edges)
.mapTriplets(
e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
TripletFields.Src)
new TripletFields(/* useSrc */ true,
/* useDst */ false,
/* useEdge */ true))
}

/**
@@ -324,7 +327,7 @@
}, preservesPartitioning = true).cache()
val sum = r.values.map(math.abs).sum()
val v0 = r.mapValues(x => x / sum)
GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
Graph(VertexRDD(v0), g.edges)
}

/**
@@ -339,7 +342,7 @@
def initDegreeVector(g: Graph[Double, Double]): Graph[Double, Double] = {
val sum = g.vertices.values.sum()
val v0 = g.vertices.mapValues(_ / sum)
GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
Graph(VertexRDD(v0), g.edges)
}

/**
@@ -364,7 +367,9 @@
val v = curG.aggregateMessages[Double](
sendMsg = ctx => ctx.sendToSrc(ctx.attr * ctx.dstAttr),
mergeMsg = _ + _,
TripletFields.Dst).cache()
new TripletFields(/* useSrc */ false,
/* useDst */ true,
/* useEdge */ true)).cache()
// normalize v
val norm = v.values.map(math.abs).sum()
logInfo(s"$msgPrefix: norm(v) = $norm.")
@@ -377,7 +382,7 @@
diffDelta = math.abs(delta - prevDelta)
logInfo(s"$msgPrefix: diff(delta) = $diffDelta.")
// update v
curG = GraphImpl.fromExistingRDDs(VertexRDD(v1), g.edges)
curG = Graph(VertexRDD(v1), g.edges)
prevDelta = delta
}
curG.vertices
@@ -394,7 +399,6 @@
val points = v.mapValues(x => Vectors.dense(x)).cache()
val model = new KMeans()
.setK(k)
.setRuns(5)
.setSeed(0L)
.run(points.values)
points.mapValues(p => model.predict(p)).cache()
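In matrix terms (an editorial summary, with W the row-normalized affinity matrix): in the `powerIter` hunk above, the `aggregateMessages` call with `ctx.sendToSrc(ctx.attr * ctx.dstAttr)` computes the sparse product W·v, which is exactly why it must read `dstAttr` and the edge attribute. The subsequent normalization yields the power-iteration update

```latex
v_{t+1} = \frac{W v_t}{\lVert W v_t \rVert_1}
```

with diff(delta) = |δ − δ_prev| logged as the convergence signal. The final hunk drops `.setRuns(5)` from the inner k-means call, matching the commit title; `setRuns` was deprecated in Spark 1.6.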
PowerIterationClusteringSuite.scala
@@ -30,62 +30,65 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkCon

import org.apache.spark.mllib.clustering.PowerIterationClustering._

/** Generates a circle of points. */
private def genCircle(r: Double, n: Int): Array[(Double, Double)] = {
Array.tabulate(n) { i =>
val theta = 2.0 * math.Pi * i / n
(r * math.cos(theta), r * math.sin(theta))
}
}

/** Computes Gaussian similarity. */
private def sim(x: (Double, Double), y: (Double, Double)): Double = {
val dist2 = (x._1 - y._1) * (x._1 - y._1) + (x._2 - y._2) * (x._2 - y._2)
math.exp(-dist2 / 2.0)
}

test("power iteration clustering") {
/*
We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for
edge (3, 4).
15-14 -13 -12
| |
4 . 3 - 2 11
| | x | |
5 0 - 1 10
| |
6 - 7 - 8 - 9
*/
// Generate two circles following the example in the PIC paper.
val r1 = 1.0
val n1 = 10
val r2 = 4.0
val n2 = 40
val n = n1 + n2
val points = genCircle(r1, n1) ++ genCircle(r2, n2)
val similarities = for (i <- 1 until n; j <- 0 until i) yield {
(i.toLong, j.toLong, sim(points(i), points(j)))
}

val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0),
(1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge
(4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0),
(10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0))
val model = new PowerIterationClustering()
.setK(2)
.setMaxIterations(40)
.run(sc.parallelize(similarities, 2))
val predictions = Array.fill(2)(mutable.Set.empty[Long])
model.assignments.collect().foreach { a =>
predictions(a.cluster) += a.id
}
assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet))

val model2 = new PowerIterationClustering()
.setK(2)
.setMaxIterations(10)
.setInitializationMode("degree")
.run(sc.parallelize(similarities, 2))
val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
model2.assignments.collect().foreach { a =>
predictions2(a.cluster) += a.id
}
assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
}

test("power iteration clustering on graph") {
/*
We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for
edge (3, 4).
15-14 -13 -12
| |
4 . 3 - 2 11
| | x | |
5 0 - 1 10
| |
6 - 7 - 8 - 9
*/

val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0),
(1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge
(4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0),
(10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0))
// Generate two circles following the example in the PIC paper.
val r1 = 1.0
val n1 = 10
val r2 = 4.0
val n2 = 40
val n = n1 + n2
val points = genCircle(r1, n1) ++ genCircle(r2, n2)
val similarities = for (i <- 1 until n; j <- 0 until i) yield {
(i.toLong, j.toLong, sim(points(i), points(j)))
}

val edges = similarities.flatMap { case (i, j, s) =>
if (i != j) {
@@ -98,22 +101,24 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkCon

val model = new PowerIterationClustering()
.setK(2)
.setMaxIterations(40)
.run(graph)
val predictions = Array.fill(2)(mutable.Set.empty[Long])
model.assignments.collect().foreach { a =>
predictions(a.cluster) += a.id
}
assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet))

val model2 = new PowerIterationClustering()
.setK(2)
.setMaxIterations(10)
.setInitializationMode("degree")
.run(sc.parallelize(similarities, 2))
val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
model2.assignments.collect().foreach { a =>
predictions2(a.cluster) += a.id
}
assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
}

test("normalize and powerIter") {
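To see the API these suites exercise end to end, a hedged sketch (assuming a running `SparkContext` named `sc`). Note the suites list each unordered pair once (the `i > j` generation), relying on PIC treating the affinity matrix as symmetric:

```scala
import org.apache.spark.mllib.clustering.PowerIterationClustering

// Two tight triangles joined by one weak edge; each unordered pair appears once.
val affinities = sc.parallelize(Seq(
  (0L, 1L, 1.0), (0L, 2L, 1.0), (1L, 2L, 1.0), // triangle A
  (3L, 4L, 1.0), (3L, 5L, 1.0), (4L, 5L, 1.0), // triangle B
  (2L, 3L, 0.1)))                              // weak bridge

val model = new PowerIterationClustering()
  .setK(2)
  .setMaxIterations(20)
  .run(affinities)

model.assignments.collect().sortBy(_.id)
  .foreach(a => println(s"${a.id} -> ${a.cluster}"))
```

With only a weak bridge between the triangles, PIC should split them into the two requested clusters, mirroring the weak-edge construction of the original hand-built test.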
python/pyspark/mllib/clustering.py (25 changes: 19 additions & 6 deletions)
@@ -316,12 +316,25 @@ class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader):
Model produced by [[PowerIterationClustering]].
>>> data = [(0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0), (1, 3, 1.0),
... (2, 3, 1.0), (3, 4, 0.1), (4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0),
... (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0), (10, 11, 1.0),
... (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0)]
>>> rdd = sc.parallelize(data, 2)
>>> model = PowerIterationClustering.train(rdd, 2, 100)
>>> import math
>>> def genCircle(r, n):
... points = []
... for i in range(0, n):
... theta = 2.0 * math.pi * i / n
... points.append((r * math.cos(theta), r * math.sin(theta)))
... return points
>>> def sim(x, y):
... dist2 = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1])
... return math.exp(-dist2 / 2.0)
>>> r1 = 1.0
>>> n1 = 10
>>> r2 = 4.0
>>> n2 = 40
>>> n = n1 + n2
>>> points = genCircle(r1, n1) + genCircle(r2, n2)
>>> similarities = [(i, j, sim(points[i], points[j])) for i in range(1, n) for j in range(0, i)]
>>> rdd = sc.parallelize(similarities, 2)
>>> model = PowerIterationClustering.train(rdd, 2, 40)
>>> model.k
2
>>> result = sorted(model.assignments().collect(), key=lambda x: x.id)
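For scale, a quick editorial check on the new doctest's input: with n1 = 10 and n2 = 40 points on the two circles, the generated pairwise list has

```latex
\binom{50}{2} = \frac{50 \cdot 49}{2} = 1225
```

similarity triples, versus the 19 hand-written edges it replaces.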
