From ec88567259f68ced0e6fb7392b300848074a4489 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 1 Feb 2015 20:20:45 +0800 Subject: [PATCH 1/3] Run the PIC algorithm with degree vector d as suggected by the PIC paper. --- .../clustering/PowerIterationClustering.scala | 22 +++++++++++++++++++ .../PowerIterationClusteringSuite.scala | 9 ++++++++ 2 files changed, 31 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala index fcb9a3643cc48..58af8d00b98fd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala @@ -86,6 +86,16 @@ class PowerIterationClustering private[clustering] ( pic(w0) } + /** + * Run the PIC algorithm with degree vector d as suggected by the PIC paper. + */ + def runWithDegreeVector( + similarities: RDD[(Long, Long, Double)]): PowerIterationClusteringModel = { + val w = normalize(similarities) + val w0 = initDegreeVector(w) + pic(w0) + } + /** * Runs the PIC algorithm. * @@ -148,6 +158,18 @@ private[clustering] object PowerIterationClustering extends Logging { GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges) } + /** + * Generates the degree vector as the vertex properties (v0) to start power iteration. + * + * @param g a graph representing the normalized affinity matrix (W) + * @return a graph with edges representing W and vertices representing the degree vector + */ + def initDegreeVector(g: Graph[Double, Double]): Graph[Double, Double] = { + val sum = g.vertices.values.sum() + val v0 = g.vertices.mapValues(_ / sum) + GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges) + } + /** * Runs power iteration. * @param g input graph with edges representing the normalized affinity matrix (W) and vertices diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala index 2bae465d392aa..fb79666abbe84 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala @@ -55,6 +55,15 @@ class PowerIterationClusteringSuite extends FunSuite with MLlibTestSparkContext predictions(c) += i } assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet)) + + val model2 = new PowerIterationClustering() + .setK(2) + .runWithDegreeVector(sc.parallelize(similarities, 2)) + val predictions2 = Array.fill(2)(mutable.Set.empty[Long]) + model2.assignments.collect().foreach { case (i, c) => + predictions2(c) += i + } + assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet)) } test("normalize and powerIter") { From 19cf94ecfd6d879cbceb52f0abc0a32461e7d871 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 2 Feb 2015 15:12:23 +0800 Subject: [PATCH 2/3] Add an option to select initialization method. --- .../clustering/PowerIterationClustering.scala | 30 +++++++++++-------- .../PowerIterationClusteringSuite.scala | 3 +- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala index 58af8d00b98fd..b07248d71d9ee 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala @@ -43,10 +43,12 @@ class PowerIterationClusteringModel( * * @param k Number of clusters. * @param maxIterations Maximum number of iterations of the PIC algorithm. + * @param initMethod Initialization method. */ class PowerIterationClustering private[clustering] ( private var k: Int, - private var maxIterations: Int) extends Serializable { + private var maxIterations: Int, + private var initMethod: String = "random") extends Serializable { import org.apache.spark.mllib.clustering.PowerIterationClustering._ @@ -69,6 +71,17 @@ class PowerIterationClustering private[clustering] ( this } + /** + * Set the initialization method + */ + def setInitialization(method: String): this.type = { + this.initMethod = method match { + case "random" | "degree" => method + case _ => throw new IllegalArgumentException("Incorrect initialization method") + } + this + } + /** * Run the PIC algorithm. * @@ -82,20 +95,13 @@ class PowerIterationClustering private[clustering] ( */ def run(similarities: RDD[(Long, Long, Double)]): PowerIterationClusteringModel = { val w = normalize(similarities) - val w0 = randomInit(w) + val w0 = initMethod match { + case "random" => randomInit(w) + case "degree" => initDegreeVector(w) + } pic(w0) } - /** - * Run the PIC algorithm with degree vector d as suggected by the PIC paper. - */ - def runWithDegreeVector( - similarities: RDD[(Long, Long, Double)]): PowerIterationClusteringModel = { - val w = normalize(similarities) - val w0 = initDegreeVector(w) - pic(w0) - } - /** * Runs the PIC algorithm. * diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala index fb79666abbe84..f0ca5af394f8c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala @@ -58,7 +58,8 @@ class PowerIterationClusteringSuite extends FunSuite with MLlibTestSparkContext val model2 = new PowerIterationClustering() .setK(2) - .runWithDegreeVector(sc.parallelize(similarities, 2)) + .setInitialization("degree") + .run(sc.parallelize(similarities, 2)) val predictions2 = Array.fill(2)(mutable.Set.empty[Long]) model2.assignments.collect().foreach { case (i, c) => predictions2(c) += i From 7db28fbb6551d804436c52bcbff4067f7d8fac95 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 3 Feb 2015 09:49:25 +0800 Subject: [PATCH 3/3] Refactor it to address comments. --- .../clustering/PowerIterationClustering.scala | 25 +++++++++++-------- .../PowerIterationClusteringSuite.scala | 2 +- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala index b07248d71d9ee..9b5c155b0a805 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala @@ -43,17 +43,19 @@ class PowerIterationClusteringModel( * * @param k Number of clusters. * @param maxIterations Maximum number of iterations of the PIC algorithm. - * @param initMethod Initialization method. + * @param initMode Initialization mode. */ class PowerIterationClustering private[clustering] ( private var k: Int, private var maxIterations: Int, - private var initMethod: String = "random") extends Serializable { + private var initMode: String) extends Serializable { import org.apache.spark.mllib.clustering.PowerIterationClustering._ - /** Constructs a PIC instance with default parameters: {k: 2, maxIterations: 100}. */ - def this() = this(k = 2, maxIterations = 100) + /** Constructs a PIC instance with default parameters: {k: 2, maxIterations: 100, + * initMode: "random"}. + */ + def this() = this(k = 2, maxIterations = 100, initMode = "random") /** * Set the number of clusters. @@ -72,12 +74,13 @@ class PowerIterationClustering private[clustering] ( } /** - * Set the initialization method + * Set the initialization mode. This can be either "random" to use a random vector + * as vertex properties, or "degree" to use normalized sum similarities. Default: random. */ - def setInitialization(method: String): this.type = { - this.initMethod = method match { - case "random" | "degree" => method - case _ => throw new IllegalArgumentException("Incorrect initialization method") + def setInitializationMode(mode: String): this.type = { + this.initMode = mode match { + case "random" | "degree" => mode + case _ => throw new IllegalArgumentException("Invalid initialization mode: " + mode) } this } @@ -95,7 +98,7 @@ class PowerIterationClustering private[clustering] ( */ def run(similarities: RDD[(Long, Long, Double)]): PowerIterationClusteringModel = { val w = normalize(similarities) - val w0 = initMethod match { + val w0 = initMode match { case "random" => randomInit(w) case "degree" => initDegreeVector(w) } @@ -166,6 +169,8 @@ private[clustering] object PowerIterationClustering extends Logging { /** * Generates the degree vector as the vertex properties (v0) to start power iteration. + * It is not exactly the node degrees but just the normalized sum similarities. Call it + * as degree vector because it is used in the PIC paper. * * @param g a graph representing the normalized affinity matrix (W) * @return a graph with edges representing W and vertices representing the degree vector diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala index f0ca5af394f8c..03ecd9ca730be 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala @@ -58,7 +58,7 @@ class PowerIterationClusteringSuite extends FunSuite with MLlibTestSparkContext val model2 = new PowerIterationClustering() .setK(2) - .setInitialization("degree") + .setInitializationMode("degree") .run(sc.parallelize(similarities, 2)) val predictions2 = Array.fill(2)(mutable.Set.empty[Long]) model2.assignments.collect().foreach { case (i, c) =>