From d469d12257e68e437328a90d647a67bd002622f4 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 16 Feb 2015 16:19:06 +0800 Subject: [PATCH 01/16] Add Affinity Propagation clustering algorithm. --- .../clustering/AffinityPropagation.scala | 342 ++++++++++++++++++ .../clustering/AffinityPropagationSuite.scala | 93 +++++ 2 files changed, 435 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala new file mode 100644 index 0000000000000..a33006bb241af --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import scala.collection.mutable.Map +import scala.collection.mutable.Set + +import org.apache.spark.{Logging, SparkException} +import org.apache.spark.annotation.Experimental +import org.apache.spark.graphx._ +import org.apache.spark.graphx.impl.GraphImpl +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.rdd.RDD + +/** + * :: Experimental :: + * + * Model produced by [[AffinityPropagation]]. + * + * @param k number of clusters + * @param assignments an RDD of (vertexID, clusterID) pairs + */ +@Experimental +class AffinityPropagationModel( + val clusters: Seq[Set[Long]]) extends Serializable { + + /** + * Set the number of clusters + */ + def getK(): Int = clusters.size + + /** + * Find the cluster the given vertex belongs + */ + def findCluster(vertexID: Long): Set[Long] = { + clusters.filter(_.contains(vertexID))(0) + } + + /** + * Find the cluster id the given vertex belongs + */ + def findClusterID(vertexID: Long): Option[Int] = { + var i = 0 + clusters.foreach(cluster => { + if (cluster.contains(vertexID)) { + return Some(i) + } + i += i + }) + None + } +} + +/** + * :: Experimental :: + * + * AffinityPropagation (AP), a graph clustering algorithm based on the concept of "message passing" + * between data points. Unlike clustering algorithms such as k-means or k-medoids, AP does not + * require the number of clusters to be determined or estimated before running it. AP is developed + * by [[http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf Frey and Dueck]]. + * + * @param maxIterations Maximum number of iterations of the AP algorithm. 
+ * + * @see [[http://en.wikipedia.org/wiki/Affinity_propagation (Wikipedia)]] + */ +@Experimental +class AffinityPropagation private[clustering] ( + private var maxIterations: Int, + private var lambda: Double, + private var normalization: Boolean) extends Serializable { + + import org.apache.spark.mllib.clustering.AffinityPropagation._ + + /** Constructs a AP instance with default parameters: {maxIterations: 100, lambda: 0.5, + * normalization: false}. + */ + def this() = this(maxIterations = 100, lambda = 0.5, normalization = false) + + /** + * Set maximum number of iterations of the messaging iteration loop + */ + def setMaxIterations(maxIterations: Int): this.type = { + this.maxIterations = maxIterations + this + } + + /** + * Get maximum number of iterations of the messaging iteration loop + */ + def getMaxIterations(): Int = { + this.maxIterations + } + + /** + * Set lambda of the messaging iteration loop + */ + def setLambda(lambda: Double): this.type = { + this.lambda = lambda + this + } + + /** + * Get lambda of the messaging iteration loop + */ + def getLambda(): Double = { + this.lambda + } + + /** + * Set whether to do normalization or not + */ + def setNormalization(normalization: Boolean): this.type = { + this.normalization = normalization + this + } + + /** + * Get whether to do normalization or not + */ + def getNormalization(): Boolean = { + this.normalization + } + + /** + * Run the AP algorithm. + * + * @param similarities an RDD of (i, j, s,,ij,,) tuples representing the similarity matrix, which + * is the matrix S in the AP paper. The similarity s,,ij,, is set to negative + * real-valued similarities. This is not required to be a symmetric matrix + * and hence s,,ij,, can be not equal to s,,ji,,. Tuples with i = j are + * referred to as "preferences" in the AP paper. The data points with larger + * values of s,,ii,, are more likely to be chosen as exemplars. + * + * @param symmetric the given similarity matrix is symmetric or not. Default value: true + * @return a [[AffinityPropagationModel]] that contains the clustering result + */ + def run(similarities: RDD[(Long, Long, Double)], symmetric: Boolean = true) + : AffinityPropagationModel = { + val s = constructGraph(similarities, normalization, symmetric) + ap(s) + } + + /** + * Runs the AP algorithm. + * + * @param s The (normalized) similarity matrix, which is the matrix S in the AP paper with vertex + * similarities and the initial availabilities and responsibilities as its edge + * properties. + */ + private def ap(s: Graph[Seq[Double], Seq[Double]]): AffinityPropagationModel = { + val g = apIter(s, maxIterations, lambda) + chooseExemplars(g) + } +} + +private[clustering] object AffinityPropagation extends Logging { + /** + * Construct the similarity matrix (S) and do normalization if needed. + * Returns the (normalized) similarity matrix (S). 
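+   * Each edge of the returned graph carries a Seq(s, a, r) attribute: the similarity together
+   * with the availability and responsibility messages, both initialized to 0.0 below.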
+ */ + def constructGraph(similarities: RDD[(Long, Long, Double)], normalize: Boolean, + symmetric: Boolean): + Graph[Seq[Double], Seq[Double]] = { + val edges = similarities.flatMap { case (i, j, s) => + if (s > 0.0) { + throw new SparkException("Similarity must be negative but found s($i, $j) = $s.") + } + if (symmetric) { + Seq(Edge(i, j, Seq(s, 0.0, 0.0)), Edge(j, i, Seq(s, 0.0, 0.0))) + } else { + Seq(Edge(i, j, Seq(s, 0.0, 0.0))) + } + } + + if (normalize) { + val gA = Graph.fromEdges(edges, Seq(0.0)) + val vD = gA.aggregateMessages[Seq[Double]]( + sendMsg = ctx => { + ctx.sendToSrc(Seq(ctx.attr(0))) + }, + mergeMsg = (s1, s2) => Seq(s1(0) + s2(0)), + TripletFields.EdgeOnly) + val normalized = GraphImpl.fromExistingRDDs(vD, gA.edges) + .mapTriplets( + e => Seq(e.attr(0) / math.max(math.abs(e.srcAttr(0)), MLUtils.EPSILON), 0.0, 0.0), + TripletFields.Src) + Graph.fromEdges(normalized.edges, Seq(0.0, 0.0)) + } else { + Graph.fromEdges(edges, Seq(0.0, 0.0)) + } + } + + /** + * Runs AP's iteration. + * @param g input graph with edges representing the (normalized) similarity matrix (S) and + * the initial availabilities and responsibilities. + * @param maxIterations maximum number of iterations. + * @return a [[Graph]] representing the final graph. + */ + def apIter( + g: Graph[Seq[Double], Seq[Double]], + maxIterations: Int, lambda: Double): Graph[Seq[Double], Seq[Double]] = { + val tol = math.max(1e-5 / g.vertices.count(), 1e-8) + var prevDelta = (Double.MaxValue, Double.MaxValue) + var diffDelta = (Double.MaxValue, Double.MaxValue) + var curG = g + for (iter <- 0 until maxIterations + if math.abs(diffDelta._1) > tol || math.abs(diffDelta._2) > tol) { + val msgPrefix = s"Iteration $iter" + + // update responsibilities + val vD_r = curG.aggregateMessages[Seq[Double]]( + sendMsg = ctx => ctx.sendToSrc(Seq(ctx.attr(0) + ctx.attr(1))), + mergeMsg = _ ++ _, + TripletFields.EdgeOnly) + + val updated_r = GraphImpl.fromExistingRDDs(vD_r, curG.edges) + .mapTriplets( + (e) => { + val filtered = e.srcAttr.filter(_ != (e.attr(0) + e.attr(1))) + val pool = if (filtered.size < e.srcAttr.size - 1) { + filtered.:+(e.attr(0) + e.attr(1)) + } else { + filtered + } + val maxValue = if (pool.isEmpty) { 0.0 } else { pool.max } + Seq(e.attr(0), e.attr(1), lambda * (e.attr(0) - maxValue) + (1.0 - lambda) * e.attr(2)) + }, TripletFields.Src) + + var iterG = Graph.fromEdges(updated_r.edges, Seq(0.0)) + + // update availabilities + val vD_a = iterG.aggregateMessages[Seq[Double]]( + sendMsg = ctx => { + if (ctx.srcId != ctx.dstId) { + ctx.sendToDst(Seq(math.max(ctx.attr(2), 0.0))) + } else { + ctx.sendToDst(Seq(ctx.attr(2))) + } + }, mergeMsg = (s1, s2) => Seq(s1(0) + s2(0)), + TripletFields.EdgeOnly) + + val updated_a = GraphImpl.fromExistingRDDs(vD_a, iterG.edges) + .mapTriplets( + (e) => { + if (e.srcId != e.dstId) { + val newA = lambda * math.min(0.0, e.dstAttr(0) - math.max(e.attr(2), 0.0)) + + (1.0 - lambda) * e.attr(1) + Seq(e.attr(0), newA, e.attr(2)) + } else { + val newA = lambda * (e.dstAttr(0) - e.attr(2)) + (1.0 - lambda) * e.attr(1) + Seq(e.attr(0), newA, e.attr(2)) + } + }, TripletFields.Dst) + + iterG = Graph.fromEdges(updated_a.edges, Seq(0.0)) + + // compare difference + if (iter % 10 == 0) { + val vaD = iterG.aggregateMessages[Seq[Double]]( + sendMsg = ctx => ctx.sendToSrc(Seq(ctx.attr(1), ctx.attr(2))), + mergeMsg = (s1, s2) => Seq(s1(0) + s2(0), s1(1) + s2(1)), + TripletFields.EdgeOnly) + + val prev_vaD = curG.aggregateMessages[Seq[Double]]( + sendMsg = ctx => ctx.sendToSrc(Seq(ctx.attr(1), 
ctx.attr(2))), + mergeMsg = (s1, s2) => Seq(s1(0) + s2(0), s1(1) + s2(1)), + TripletFields.EdgeOnly) + + val delta = vaD.join(prev_vaD).values.collect().map { x => + (x._1(0) - x._2(0), x._1(1) - x._2(1)) + }.foldLeft((0.0, 0.0)) {(s, t) => (s._1 + t._1, s._2 + t._2)} + + logInfo(s"$msgPrefix: availability delta = ${delta._1}.") + logInfo(s"$msgPrefix: responsibility delta = ${delta._2}.") + + diffDelta = (math.abs(delta._1 - prevDelta._1), math.abs(delta._2 - prevDelta._2)) + + logInfo(s"$msgPrefix: diff(delta) = $diffDelta.") + + prevDelta = delta + } + curG = iterG + } + curG + } + + /** + * Choose exemplars for nodes in graph. + * @param g input graph with edges representing the (normalized) similarity matrix (S) and + * the final availabilities and responsibilities. + * @return a [[AffinityPropagationModel]] representing the clustering results. + */ + def chooseExemplars( + g: Graph[Seq[Double], Seq[Double]]): AffinityPropagationModel = { + val accum = g.edges.map(a => (a.srcId, (a.dstId, a.attr(1) + a.attr(2)))) + val exemplars = accum.reduceByKey((ar1, ar2) => { + if (ar1._2 > ar2._2) { + (ar1._1, ar1._2) + } else { + (ar2._1, ar2._2) + } + }).map(kv => (kv._1, kv._2._1)).collect().toMap + + var clusters = Seq[Set[Long]]() + + def findPath(node: Long, path: Set[Long]): Set[Long] = { + path += node + if (exemplars.contains(node) && !path.contains(exemplars(node))) { + path ++ findPath(exemplars(node), path) + } else { + path + } + } + + exemplars.keys.foreach(k => { + val neighbors = findPath(k, Set[Long]()) + val dup = clusters.filter(!_.intersect(neighbors).isEmpty) + if (dup.isEmpty) { + clusters = clusters.:+(neighbors) + } else { + dup(0) ++= neighbors + } + }) + new AffinityPropagationModel(clusters) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala new file mode 100644 index 0000000000000..32d9ca0fcc6dc --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import scala.collection.mutable + +import org.scalatest.FunSuite + +import org.apache.spark.graphx.{Edge, Graph} +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.mllib.util.TestingUtils._ + +class AffinityPropagationSuite extends FunSuite with MLlibTestSparkContext { + + import org.apache.spark.mllib.clustering.AffinityPropagation._ + + test("affinity propagation") { + /* + We use the following graph to test AP. + + 15-14 -13 12 + . \ / + 4 . 3 . 2 + | | | + 5 0 . 1 10 + | \ . | + 6 7 . 
8 - 9 - 11 + */ + + val similarities = Seq[(Long, Long, Double)]((0, 1, -8.2), (0, 3, -1.8), (1, 2, -0.4), + (1, 8, -8.1), (2, 3, -9.2), (2, 12, -1.1), (3, 15, -0.8), (3, 4, -10.1), (4, 5, -0.7), + (4, 15, -11.8), (5, 6, -0.7), (5, 7, -0.41), (7, 8, -8.1), (8, 9, -0.55), (9, 10, -1.8), + (9, 11, -0.76), (13, 14, -0.15), (14, 15, -0.67)) + val model = new AffinityPropagation() + .setMaxIterations(100) + .run(sc.parallelize(similarities, 2)) + + assert(model.getK() == 4) + assert(model.findCluster(5).toSeq.sorted == Seq(4, 5, 6, 7)) + assert(model.findClusterID(14) == model.findClusterID(15)) + } + + test("normalize") { + /* + Test normalize() with the following graph: + + 0 - 3 + | \ | + 1 - 2 + + The similarity matrix (A) is + + 0 -1 -1 -1 + -1 0 1 0 + -1 -1 0 -1 + -1 0 -1 0 + + D is diag(3, 2, 3, 2) and hence S is + + 0 -1/3 -1/3 -1/3 + -1/2 0 -1/2 0 + -1/3 -1/3 0 -1/3 + -1/2 0 -1/2 0 + */ + val similarities = Seq[(Long, Long, Double)]( + (0, 1, -1.0), (1, 0, -1.0), (0, 2, -1.0), (2, 0, -1.0), (0, 3, -1.0), (3, 0, -1.0), + (1, 2, -1.0), (2, 1, -1.0), (2, 3, -1.0), (3, 2, -1.0)) + val expected = Array( + Array(0.0, -1.0/3.0, -1.0/3.0, -1.0/3.0), + Array(-1.0/2.0, 0.0, -1.0/2.0, 0.0), + Array(-1.0/3.0, -1.0/3.0, 0.0, -1.0/3.0), + Array(-1.0/2.0, 0.0, -1.0/2.0, 0.0)) + val s = constructGraph(sc.parallelize(similarities, 2), true, false) + s.edges.collect().foreach { case Edge(i, j, x) => + assert(x(0) ~== expected(i.toInt)(j.toInt) absTol 1e-14) + } + } +} From 99d812a77d858a1f8079a981d53f43c22109bba0 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 17 Feb 2015 00:10:11 +0800 Subject: [PATCH 02/16] Choose exemplars in distributed way. Add exemplars to model. --- .../clustering/AffinityPropagation.scala | 55 +++++++++++-------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala index a33006bb241af..935336358fbfb 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala @@ -32,12 +32,13 @@ import org.apache.spark.rdd.RDD * * Model produced by [[AffinityPropagation]]. * - * @param k number of clusters - * @param assignments an RDD of (vertexID, clusterID) pairs + * @param clusters the vertexIDs of each cluster. + * @param exemplars the vertexIDs of all exemplars. */ @Experimental class AffinityPropagationModel( - val clusters: Seq[Set[Long]]) extends Serializable { + val clusters: Seq[Set[Long]], + val exemplars: Seq[Long]) extends Serializable { /** * Set the number of clusters @@ -302,41 +303,47 @@ private[clustering] object AffinityPropagation extends Logging { /** * Choose exemplars for nodes in graph. - * @param g input graph with edges representing the (normalized) similarity matrix (S) and - * the final availabilities and responsibilities. + * @param g input graph with edges representing the final availabilities and responsibilities. * @return a [[AffinityPropagationModel]] representing the clustering results. 
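+   *
+   * As a sketch of the criterion the code below implements: every vertex i picks as its
+   * exemplar the vertex k maximizing a(i,k) + r(i,k), the sum of availability and
+   * responsibility on edge (i, k); vertices that reach the same exemplar, directly or through
+   * overlapping member sets, end up in one cluster.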
*/ def chooseExemplars( g: Graph[Seq[Double], Seq[Double]]): AffinityPropagationModel = { val accum = g.edges.map(a => (a.srcId, (a.dstId, a.attr(1) + a.attr(2)))) - val exemplars = accum.reduceByKey((ar1, ar2) => { + val clusterMembers = accum.reduceByKey((ar1, ar2) => { if (ar1._2 > ar2._2) { (ar1._1, ar1._2) } else { (ar2._1, ar2._2) } - }).map(kv => (kv._1, kv._2._1)).collect().toMap + }).map(kv => (kv._2._1, kv._1)).aggregateByKey(Set[Long]())( + seqOp = (s, d) => s ++ Set(d), + combOp = (s1, s2) => s1 ++ s2 + ).cache() + + val neighbors = clusterMembers.map(kv => kv._2 ++ Set(kv._1)).collect() + val exemplars = clusterMembers.map(kv => kv._1).collect() var clusters = Seq[Set[Long]]() - - def findPath(node: Long, path: Set[Long]): Set[Long] = { - path += node - if (exemplars.contains(node) && !path.contains(exemplars(node))) { - path ++ findPath(exemplars(node), path) - } else { - path - } - } - exemplars.keys.foreach(k => { - val neighbors = findPath(k, Set[Long]()) - val dup = clusters.filter(!_.intersect(neighbors).isEmpty) - if (dup.isEmpty) { - clusters = clusters.:+(neighbors) + var i = 0 + var nz = neighbors.size + while (i < nz) { + var curCluster = neighbors(i) + var j = i + 1 + while (j < nz) { + if (!curCluster.intersect(neighbors(j)).isEmpty) { + curCluster ++= neighbors(j) + } + j += 1 + } + val overlap = clusters.filter(!_.intersect(curCluster).isEmpty) + if (overlap.isEmpty) { + clusters = clusters.:+(curCluster) } else { - dup(0) ++= neighbors + overlap(0) ++= curCluster } - }) - new AffinityPropagationModel(clusters) + i += 1 + } + new AffinityPropagationModel(clusters, exemplars) } } From 6dbec7d451511d17f39750815d4a7d11da03561b Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 17 Feb 2015 16:03:32 +0800 Subject: [PATCH 03/16] Ap doesn't require similarity to be negative. Fix normalization bug. --- .../clustering/AffinityPropagation.scala | 12 ++++---- .../clustering/AffinityPropagationSuite.scala | 28 +++++++++---------- 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala index 935336358fbfb..62a2fab29a9c6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala @@ -24,7 +24,6 @@ import org.apache.spark.{Logging, SparkException} import org.apache.spark.annotation.Experimental import org.apache.spark.graphx._ import org.apache.spark.graphx.impl.GraphImpl -import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD /** @@ -141,7 +140,7 @@ class AffinityPropagation private[clustering] ( * Run the AP algorithm. * * @param similarities an RDD of (i, j, s,,ij,,) tuples representing the similarity matrix, which - * is the matrix S in the AP paper. The similarity s,,ij,, is set to negative + * is the matrix S in the AP paper. The similarity s,,ij,, is set to * real-valued similarities. This is not required to be a symmetric matrix * and hence s,,ij,, can be not equal to s,,ji,,. Tuples with i = j are * referred to as "preferences" in the AP paper. 
The data points with larger @@ -178,9 +177,6 @@ private[clustering] object AffinityPropagation extends Logging { symmetric: Boolean): Graph[Seq[Double], Seq[Double]] = { val edges = similarities.flatMap { case (i, j, s) => - if (s > 0.0) { - throw new SparkException("Similarity must be negative but found s($i, $j) = $s.") - } if (symmetric) { Seq(Edge(i, j, Seq(s, 0.0, 0.0)), Edge(j, i, Seq(s, 0.0, 0.0))) } else { @@ -198,8 +194,10 @@ private[clustering] object AffinityPropagation extends Logging { TripletFields.EdgeOnly) val normalized = GraphImpl.fromExistingRDDs(vD, gA.edges) .mapTriplets( - e => Seq(e.attr(0) / math.max(math.abs(e.srcAttr(0)), MLUtils.EPSILON), 0.0, 0.0), - TripletFields.Src) + e => { + val s = if (e.srcAttr(0) == 0.0) { e.attr(0) } else { e.attr(0) / e.srcAttr(0) } + Seq(s, 0.0, 0.0) + }, TripletFields.Src) Graph.fromEdges(normalized.edges, Seq(0.0, 0.0)) } else { Graph.fromEdges(edges, Seq(0.0, 0.0)) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala index 32d9ca0fcc6dc..398766c0968e3 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala @@ -65,26 +65,26 @@ class AffinityPropagationSuite extends FunSuite with MLlibTestSparkContext { The similarity matrix (A) is - 0 -1 -1 -1 - -1 0 1 0 - -1 -1 0 -1 - -1 0 -1 0 + 0 1 1 1 + 1 0 1 0 + 1 1 0 1 + 1 0 1 0 D is diag(3, 2, 3, 2) and hence S is - 0 -1/3 -1/3 -1/3 - -1/2 0 -1/2 0 - -1/3 -1/3 0 -1/3 - -1/2 0 -1/2 0 + 0 1/3 1/3 1/3 + 1/2 0 1/2 0 + 1/3 1/3 0 1/3 + 1/2 0 1/2 0 */ val similarities = Seq[(Long, Long, Double)]( - (0, 1, -1.0), (1, 0, -1.0), (0, 2, -1.0), (2, 0, -1.0), (0, 3, -1.0), (3, 0, -1.0), - (1, 2, -1.0), (2, 1, -1.0), (2, 3, -1.0), (3, 2, -1.0)) + (0, 1, 1.0), (1, 0, 1.0), (0, 2, 1.0), (2, 0, 1.0), (0, 3, 1.0), (3, 0, 1.0), + (1, 2, 1.0), (2, 1, 1.0), (2, 3, 1.0), (3, 2, 1.0)) val expected = Array( - Array(0.0, -1.0/3.0, -1.0/3.0, -1.0/3.0), - Array(-1.0/2.0, 0.0, -1.0/2.0, 0.0), - Array(-1.0/3.0, -1.0/3.0, 0.0, -1.0/3.0), - Array(-1.0/2.0, 0.0, -1.0/2.0, 0.0)) + Array(0.0, 1.0/3.0, 1.0/3.0, 1.0/3.0), + Array(1.0/2.0, 0.0, 1.0/2.0, 0.0), + Array(1.0/3.0, 1.0/3.0, 0.0, 1.0/3.0), + Array(1.0/2.0, 0.0, 1.0/2.0, 0.0)) val s = constructGraph(sc.parallelize(similarities, 2), true, false) s.edges.collect().foreach { case Edge(i, j, x) => assert(x(0) ~== expected(i.toInt)(j.toInt) absTol 1e-14) From 6cddeb2f655fb477d23c1fbe5bf0230e2b97bdce Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 17 Feb 2015 17:04:57 +0800 Subject: [PATCH 04/16] Add preferences to unit test data. Don't duplicate preferences in symmetric mode. 
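
Preferences are the diagonal entries s(i, i): the larger s(i, i) is, the more likely vertex i
is to be chosen as an exemplar. A minimal sketch of appending a common preference value to an
existing RDD[(Long, Long, Double)] of similarities (illustrative only; the value -3.0 and the
name `similarities` are assumptions mirroring the updated test data):

    val preference = -3.0
    val vertices = similarities.flatMap(t => Seq(t._1, t._2)).distinct()
    val withPreferences = similarities.union(vertices.map(i => (i, i, preference)))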
--- .../spark/mllib/clustering/AffinityPropagation.scala | 2 +- .../mllib/clustering/AffinityPropagationSuite.scala | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala index 62a2fab29a9c6..6a673f0960870 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala @@ -177,7 +177,7 @@ private[clustering] object AffinityPropagation extends Logging { symmetric: Boolean): Graph[Seq[Double], Seq[Double]] = { val edges = similarities.flatMap { case (i, j, s) => - if (symmetric) { + if (symmetric && i != j) { Seq(Edge(i, j, Seq(s, 0.0, 0.0)), Edge(j, i, Seq(s, 0.0, 0.0))) } else { Seq(Edge(i, j, Seq(s, 0.0, 0.0))) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala index 398766c0968e3..8844cc9b2cb43 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala @@ -42,15 +42,17 @@ class AffinityPropagationSuite extends FunSuite with MLlibTestSparkContext { 6 7 . 8 - 9 - 11 */ - val similarities = Seq[(Long, Long, Double)]((0, 1, -8.2), (0, 3, -1.8), (1, 2, -0.4), - (1, 8, -8.1), (2, 3, -9.2), (2, 12, -1.1), (3, 15, -0.8), (3, 4, -10.1), (4, 5, -0.7), + val similarities = Seq[(Long, Long, Double)]((0, 1, -8.2), (0, 3, -2.8), (1, 2, -0.4), + (1, 8, -8.1), (2, 3, -9.2), (2, 12, -1.1), (3, 15, -1.5), (3, 4, -10.1), (4, 5, -0.7), (4, 15, -11.8), (5, 6, -0.7), (5, 7, -0.41), (7, 8, -8.1), (8, 9, -0.55), (9, 10, -1.8), - (9, 11, -0.76), (13, 14, -0.15), (14, 15, -0.67)) + (9, 11, -0.76), (13, 14, -0.15), (14, 15, -0.67), (0, 0, -3), (1, 1, -3), (3, 3, -3), + (4, 4, -3), (5, 5, -3), (6, 6, -3), (7, 7, -3), (8, 8, -3), (9, 9, -3), (10, 10, -3), + (11, 11, -3), (12, 12, -3), (13, 13, -3), (14, 14, -3), (15, 15, -3)) val model = new AffinityPropagation() .setMaxIterations(100) .run(sc.parallelize(similarities, 2)) - assert(model.getK() == 4) + assert(model.getK() == 5) assert(model.findCluster(5).toSeq.sorted == Seq(4, 5, 6, 7)) assert(model.findClusterID(14) == model.findClusterID(15)) } From d762697e4140781b2dc8882c4d654e19ce5bb80b Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 8 Apr 2015 00:48:31 +0800 Subject: [PATCH 05/16] Address comments. --- .../clustering/AffinityPropagation.scala | 89 +++++++++++-------- .../clustering/AffinityPropagationSuite.scala | 5 +- 2 files changed, 53 insertions(+), 41 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala index 6a673f0960870..a12eb54a83072 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala @@ -46,37 +46,48 @@ class AffinityPropagationModel( /** * Find the cluster the given vertex belongs + * @param vertexID vertex id. + * @return a [[Array]] that contains vertex ids in the same cluster of given vertexID. If + * the given vertex doesn't belong to any cluster, return null. 
*/ - def findCluster(vertexID: Long): Set[Long] = { - clusters.filter(_.contains(vertexID))(0) + def findCluster(vertexID: Long): Array[Long] = { + val cluster = clusters.filter(_.contains(vertexID)) + if (cluster.nonEmpty) { + cluster(0).toArray + } else { + null + } } /** - * Find the cluster id the given vertex belongs + * Find the cluster id the given vertex belongs to + * @param vertexID vertex id. + * @return the cluster id that the given vertex belongs to. If the given vertex doesn't belong to + * any cluster, return -1. */ - def findClusterID(vertexID: Long): Option[Int] = { + def findClusterID(vertexID: Long): Int = { var i = 0 - clusters.foreach(cluster => { + clusters.foreach { cluster => if (cluster.contains(vertexID)) { - return Some(i) + return i } i += i - }) - None + } + -1 } } /** * :: Experimental :: * - * AffinityPropagation (AP), a graph clustering algorithm based on the concept of "message passing" + * Affinity propagation (AP), a graph clustering algorithm based on the concept of "message passing" * between data points. Unlike clustering algorithms such as k-means or k-medoids, AP does not * require the number of clusters to be determined or estimated before running it. AP is developed - * by [[http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf Frey and Dueck]]. + * by [[http://doi.org/10.1126/science.1136800 Frey and Dueck]]. * * @param maxIterations Maximum number of iterations of the AP algorithm. * - * @see [[http://en.wikipedia.org/wiki/Affinity_propagation (Wikipedia)]] + * @see [[http://en.wikipedia.org/wiki/Affinity_propagation Affinity propagation (Wikipedia)]] */ @Experimental class AffinityPropagation private[clustering] ( @@ -86,7 +97,7 @@ class AffinityPropagation private[clustering] ( import org.apache.spark.mllib.clustering.AffinityPropagation._ - /** Constructs a AP instance with default parameters: {maxIterations: 100, lambda: 0.5, + /** Constructs a AP instance with default parameters: {maxIterations: 100, lambda: `0.5`, * normalization: false}. */ def this() = this(maxIterations = 100, lambda = 0.5, normalization = false) @@ -141,10 +152,11 @@ class AffinityPropagation private[clustering] ( * * @param similarities an RDD of (i, j, s,,ij,,) tuples representing the similarity matrix, which * is the matrix S in the AP paper. The similarity s,,ij,, is set to - * real-valued similarities. This is not required to be a symmetric matrix - * and hence s,,ij,, can be not equal to s,,ji,,. Tuples with i = j are - * referred to as "preferences" in the AP paper. The data points with larger - * values of s,,ii,, are more likely to be chosen as exemplars. + * real-valued (could be positive or negative) similarities. This is not + * required to be a symmetric matrix and hence s,,ij,, can be different from + * s,,ji,,. Tuples with i = j are referred to as "preferences" in the AP paper. + * The data points with larger values of s,,ii,, are more likely to be chosen + * as exemplars. * * @param symmetric the given similarity matrix is symmetric or not. Default value: true * @return a [[AffinityPropagationModel]] that contains the clustering result @@ -173,9 +185,9 @@ private[clustering] object AffinityPropagation extends Logging { * Construct the similarity matrix (S) and do normalization if needed. * Returns the (normalized) similarity matrix (S). 
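+   *
+   * When `normalize` is set, each edge (i, j) is rescaled by the total outgoing similarity of
+   * its source vertex, i.e. s,,ij,, / (sum of s,,ij,, over j). For example (from the unit
+   * tests), vertex 0 with three edges of weight 1.0 has row sum 3.0, so each edge becomes 1/3.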
*/ - def constructGraph(similarities: RDD[(Long, Long, Double)], normalize: Boolean, - symmetric: Boolean): - Graph[Seq[Double], Seq[Double]] = { + def constructGraph(similarities: RDD[(Long, Long, Double)], + normalize: Boolean, + symmetric: Boolean): Graph[Seq[Double], Seq[Double]] = { val edges = similarities.flatMap { case (i, j, s) => if (symmetric && i != j) { Seq(Edge(i, j, Seq(s, 0.0, 0.0)), Edge(j, i, Seq(s, 0.0, 0.0))) @@ -185,19 +197,18 @@ private[clustering] object AffinityPropagation extends Logging { } if (normalize) { - val gA = Graph.fromEdges(edges, Seq(0.0)) - val vD = gA.aggregateMessages[Seq[Double]]( + val gA = Graph.fromEdges(edges, 0.0) + val vD = gA.aggregateMessages[Double]( sendMsg = ctx => { - ctx.sendToSrc(Seq(ctx.attr(0))) + ctx.sendToSrc(ctx.attr(0)) }, - mergeMsg = (s1, s2) => Seq(s1(0) + s2(0)), + mergeMsg = (s1, s2) => s1 + s2, TripletFields.EdgeOnly) val normalized = GraphImpl.fromExistingRDDs(vD, gA.edges) - .mapTriplets( - e => { - val s = if (e.srcAttr(0) == 0.0) { e.attr(0) } else { e.attr(0) / e.srcAttr(0) } + .mapTriplets({ e => + val s = if (e.srcAttr == 0.0) e.attr(0) else e.attr(0) / e.srcAttr Seq(s, 0.0, 0.0) - }, TripletFields.Src) + }, TripletFields.Src) Graph.fromEdges(normalized.edges, Seq(0.0, 0.0)) } else { Graph.fromEdges(edges, Seq(0.0, 0.0)) @@ -213,7 +224,8 @@ private[clustering] object AffinityPropagation extends Logging { */ def apIter( g: Graph[Seq[Double], Seq[Double]], - maxIterations: Int, lambda: Double): Graph[Seq[Double], Seq[Double]] = { + maxIterations: Int, + lambda: Double): Graph[Seq[Double], Seq[Double]] = { val tol = math.max(1e-5 / g.vertices.count(), 1e-8) var prevDelta = (Double.MaxValue, Double.MaxValue) var diffDelta = (Double.MaxValue, Double.MaxValue) @@ -229,17 +241,16 @@ private[clustering] object AffinityPropagation extends Logging { TripletFields.EdgeOnly) val updated_r = GraphImpl.fromExistingRDDs(vD_r, curG.edges) - .mapTriplets( - (e) => { - val filtered = e.srcAttr.filter(_ != (e.attr(0) + e.attr(1))) - val pool = if (filtered.size < e.srcAttr.size - 1) { - filtered.:+(e.attr(0) + e.attr(1)) - } else { - filtered - } - val maxValue = if (pool.isEmpty) { 0.0 } else { pool.max } - Seq(e.attr(0), e.attr(1), lambda * (e.attr(0) - maxValue) + (1.0 - lambda) * e.attr(2)) - }, TripletFields.Src) + .mapTriplets({ e => + val filtered = e.srcAttr.filter(_ != (e.attr(0) + e.attr(1))) + val pool = if (filtered.size < e.srcAttr.size - 1) { + filtered :+ (e.attr(0) + e.attr(1)) + } else { + filtered + } + val maxValue = if (pool.isEmpty) 0.0 else pool.max + Seq(e.attr(0), e.attr(1), lambda * (e.attr(0) - maxValue) + (1.0 - lambda) * e.attr(2)) + }, TripletFields.Src) var iterG = Graph.fromEdges(updated_r.edges, Seq(0.0)) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala index 8844cc9b2cb43..c4222dc5c7bfa 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala @@ -53,8 +53,9 @@ class AffinityPropagationSuite extends FunSuite with MLlibTestSparkContext { .run(sc.parallelize(similarities, 2)) assert(model.getK() == 5) - assert(model.findCluster(5).toSeq.sorted == Seq(4, 5, 6, 7)) - assert(model.findClusterID(14) == model.findClusterID(15)) + assert(model.findCluster(5).sorted === Array[Long](4, 5, 6, 7)) + assert(model.findClusterID(14) != -1) + 
assert(model.findClusterID(14) === model.findClusterID(15)) } test("normalize") { From 68947bd619650c5b25ef574d155a24bf97f0356a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 8 Apr 2015 01:15:11 +0800 Subject: [PATCH 06/16] Fix style. --- .../org/apache/spark/mllib/clustering/AffinityPropagation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala index a12eb54a83072..4fa9e84e277ed 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala @@ -154,7 +154,7 @@ class AffinityPropagation private[clustering] ( * is the matrix S in the AP paper. The similarity s,,ij,, is set to * real-valued (could be positive or negative) similarities. This is not * required to be a symmetric matrix and hence s,,ij,, can be different from - * s,,ji,,. Tuples with i = j are referred to as "preferences" in the AP paper. + * s,,ji,,. Tuples with i = j are referred to as "preferences" in the paper. * The data points with larger values of s,,ii,, are more likely to be chosen * as exemplars. * From f031efdb45dd7bd879297d4a9d7b31e9d9a8804f Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 9 Apr 2015 01:50:18 +0800 Subject: [PATCH 07/16] Define private case classes for vertex and edge data. --- .../clustering/AffinityPropagation.scala | 139 ++++++++++++------ .../clustering/AffinityPropagationSuite.scala | 2 +- 2 files changed, 97 insertions(+), 44 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala index 4fa9e84e277ed..26776cc0471e7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala @@ -77,6 +77,29 @@ class AffinityPropagationModel( } } +/** + * The message exchanged on the node graph + */ +private case class EdgeMessage( + similarity: Double, + availability: Double, + responsibility: Double) extends Equals { + override def canEqual(that: Any): Boolean = { + that match { + case e: EdgeMessage => + similarity == e.similarity && availability == e.availability && + responsibility == e.responsibility + case _ => + false + } + } +} + +/** + * The data stored in each vertex on the graph + */ +private case class VertexData(availability: Double, responsibility: Double) + /** * :: Experimental :: * @@ -86,6 +109,9 @@ class AffinityPropagationModel( * by [[http://doi.org/10.1126/science.1136800 Frey and Dueck]]. * * @param maxIterations Maximum number of iterations of the AP algorithm. 
+ * @param lambda damping factor (lambda) used in the messaging iteration loop
+ * @param normalization whether to normalize the input similarities
+ * @param symmetric whether the input similarities are symmetric
 *
 * @see [[http://en.wikipedia.org/wiki/Affinity_propagation Affinity propagation (Wikipedia)]]
 */
@Experimental
class AffinityPropagation private[clustering] (
    private var maxIterations: Int,
    private var lambda: Double,
-    private var normalization: Boolean) extends Serializable {
+    private var normalization: Boolean,
+    private var symmetric: Boolean) extends Serializable {
 
   import org.apache.spark.mllib.clustering.AffinityPropagation._
 
   /** Constructs a AP instance with default parameters: {maxIterations: 100, lambda: `0.5`,
-   * normalization: false}.
+   * normalization: false, symmetric: true}.
    */
-  def this() = this(maxIterations = 100, lambda = 0.5, normalization = false)
+  def this() = this(maxIterations = 100, lambda = 0.5, normalization = false, symmetric = true)
 
   /**
    * Set maximum number of iterations of the messaging iteration loop
@@ -146,6 +173,24 @@ class AffinityPropagation private[clustering] (
   def getNormalization(): Boolean = {
     this.normalization
   }
+
+  /**
+   * Set whether the input similarities are symmetric or not.
+   * When symmetric is set to true, we assume the input contains only one triangle of the
+   * similarity matrix, i.e. only s,,ij,, is given for each pair. If both s,,ij,, and s,,ji,,
+   * are provided, the edges are duplicated and the clustering results may be wrong.
+   */
+  def setSymmetric(symmetric: Boolean): this.type = {
+    this.symmetric = symmetric
+    this
+  }
+
+  /**
+   * Get whether the input similarities are symmetric or not
+   */
+  def getSymmetric(): Boolean = {
+    this.symmetric
+  }
 
   /**
    * Run the AP algorithm.
    *
@@ -174,7 +219,7 @@ class AffinityPropagation private[clustering] (
    *          similarities and the initial availabilities and responsibilities as its edge
    *          properties.
*/ - private def ap(s: Graph[Seq[Double], Seq[Double]]): AffinityPropagationModel = { + private def ap(s: Graph[VertexData, EdgeMessage]): AffinityPropagationModel = { val g = apIter(s, maxIterations, lambda) chooseExemplars(g) } @@ -187,12 +232,12 @@ private[clustering] object AffinityPropagation extends Logging { */ def constructGraph(similarities: RDD[(Long, Long, Double)], normalize: Boolean, - symmetric: Boolean): Graph[Seq[Double], Seq[Double]] = { + symmetric: Boolean): Graph[VertexData, EdgeMessage] = { val edges = similarities.flatMap { case (i, j, s) => if (symmetric && i != j) { - Seq(Edge(i, j, Seq(s, 0.0, 0.0)), Edge(j, i, Seq(s, 0.0, 0.0))) + Seq(Edge(i, j, new EdgeMessage(s, 0.0, 0.0)), Edge(j, i, new EdgeMessage(s, 0.0, 0.0))) } else { - Seq(Edge(i, j, Seq(s, 0.0, 0.0))) + Seq(Edge(i, j, new EdgeMessage(s, 0.0, 0.0))) } } @@ -200,18 +245,18 @@ private[clustering] object AffinityPropagation extends Logging { val gA = Graph.fromEdges(edges, 0.0) val vD = gA.aggregateMessages[Double]( sendMsg = ctx => { - ctx.sendToSrc(ctx.attr(0)) + ctx.sendToSrc(ctx.attr.similarity) }, mergeMsg = (s1, s2) => s1 + s2, TripletFields.EdgeOnly) val normalized = GraphImpl.fromExistingRDDs(vD, gA.edges) .mapTriplets({ e => - val s = if (e.srcAttr == 0.0) e.attr(0) else e.attr(0) / e.srcAttr - Seq(s, 0.0, 0.0) + val s = if (e.srcAttr == 0.0) e.attr.similarity else e.attr.similarity / e.srcAttr + new EdgeMessage(s, 0.0, 0.0) }, TripletFields.Src) - Graph.fromEdges(normalized.edges, Seq(0.0, 0.0)) + Graph.fromEdges(normalized.edges, new VertexData(0.0, 0.0)) } else { - Graph.fromEdges(edges, Seq(0.0, 0.0)) + Graph.fromEdges(edges, new VertexData(0.0, 0.0)) } } @@ -223,9 +268,9 @@ private[clustering] object AffinityPropagation extends Logging { * @return a [[Graph]] representing the final graph. 
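+   *
+   * A sketch of the damped updates performed on every sweep (as implemented below; note that
+   * `lambda` weights the freshly computed message):
+   * {{{
+   *   r(i,k) = lambda * (s(i,k) - max over k' != k of (a(i,k') + s(i,k'))) + (1 - lambda) * r(i,k)
+   *   a(i,k) = lambda * min(0, r(k,k) + sum over i' not in {i,k} of max(0, r(i',k)))
+   *            + (1 - lambda) * a(i,k),   for i != k
+   *   a(k,k) = lambda * (sum over i' != k of max(0, r(i',k))) + (1 - lambda) * a(k,k)
+   * }}}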
*/ def apIter( - g: Graph[Seq[Double], Seq[Double]], + g: Graph[VertexData, EdgeMessage], maxIterations: Int, - lambda: Double): Graph[Seq[Double], Seq[Double]] = { + lambda: Double): Graph[VertexData, EdgeMessage] = { val tol = math.max(1e-5 / g.vertices.count(), 1e-8) var prevDelta = (Double.MaxValue, Double.MaxValue) var diffDelta = (Double.MaxValue, Double.MaxValue) @@ -236,65 +281,73 @@ private[clustering] object AffinityPropagation extends Logging { // update responsibilities val vD_r = curG.aggregateMessages[Seq[Double]]( - sendMsg = ctx => ctx.sendToSrc(Seq(ctx.attr(0) + ctx.attr(1))), + sendMsg = ctx => ctx.sendToSrc(Seq(ctx.attr.similarity + ctx.attr.availability)), mergeMsg = _ ++ _, TripletFields.EdgeOnly) - - val updated_r = GraphImpl.fromExistingRDDs(vD_r, curG.edges) + + val updated_r = GraphImpl(vD_r, curG.edges) .mapTriplets({ e => - val filtered = e.srcAttr.filter(_ != (e.attr(0) + e.attr(1))) + val filtered = e.srcAttr.filter(_ != (e.attr.similarity + e.attr.availability)) val pool = if (filtered.size < e.srcAttr.size - 1) { - filtered :+ (e.attr(0) + e.attr(1)) + filtered :+ (e.attr.similarity + e.attr.availability) } else { filtered } val maxValue = if (pool.isEmpty) 0.0 else pool.max - Seq(e.attr(0), e.attr(1), lambda * (e.attr(0) - maxValue) + (1.0 - lambda) * e.attr(2)) + new EdgeMessage(e.attr.similarity, + e.attr.availability, + lambda * (e.attr.similarity - maxValue) + (1.0 - lambda) * e.attr.responsibility) }, TripletFields.Src) - var iterG = Graph.fromEdges(updated_r.edges, Seq(0.0)) + var iterG = Graph.fromEdges(updated_r.edges, new VertexData(0.0, 0.0)) // update availabilities - val vD_a = iterG.aggregateMessages[Seq[Double]]( + val vD_a = iterG.aggregateMessages[Double]( sendMsg = ctx => { if (ctx.srcId != ctx.dstId) { - ctx.sendToDst(Seq(math.max(ctx.attr(2), 0.0))) + ctx.sendToDst(math.max(ctx.attr.responsibility, 0.0)) } else { - ctx.sendToDst(Seq(ctx.attr(2))) + ctx.sendToDst(ctx.attr.responsibility) } - }, mergeMsg = (s1, s2) => Seq(s1(0) + s2(0)), + }, mergeMsg = (s1, s2) => s1 + s2, TripletFields.EdgeOnly) - val updated_a = GraphImpl.fromExistingRDDs(vD_a, iterG.edges) + val updated_a = GraphImpl(vD_a, iterG.edges) .mapTriplets( (e) => { if (e.srcId != e.dstId) { - val newA = lambda * math.min(0.0, e.dstAttr(0) - math.max(e.attr(2), 0.0)) + - (1.0 - lambda) * e.attr(1) - Seq(e.attr(0), newA, e.attr(2)) + val newA = lambda * math.min(0.0, e.dstAttr - math.max(e.attr.responsibility, 0.0)) + + (1.0 - lambda) * e.attr.availability + new EdgeMessage(e.attr.similarity, newA, e.attr.responsibility) } else { - val newA = lambda * (e.dstAttr(0) - e.attr(2)) + (1.0 - lambda) * e.attr(1) - Seq(e.attr(0), newA, e.attr(2)) + val newA = lambda * (e.dstAttr - e.attr.responsibility) + (1.0 - lambda) * e.attr.availability + new EdgeMessage(e.attr.similarity, newA, e.attr.responsibility) } }, TripletFields.Dst) - iterG = Graph.fromEdges(updated_a.edges, Seq(0.0)) + iterG = Graph.fromEdges(updated_a.edges, new VertexData(0.0, 0.0)) // compare difference if (iter % 10 == 0) { - val vaD = iterG.aggregateMessages[Seq[Double]]( - sendMsg = ctx => ctx.sendToSrc(Seq(ctx.attr(1), ctx.attr(2))), - mergeMsg = (s1, s2) => Seq(s1(0) + s2(0), s1(1) + s2(1)), + val vaD = iterG.aggregateMessages[VertexData]( + sendMsg = ctx => + ctx.sendToSrc(new VertexData(ctx.attr.availability, ctx.attr.responsibility)), + mergeMsg = (s1, s2) => + new VertexData(s1.availability + s2.availability, + s1.responsibility + s2.responsibility), TripletFields.EdgeOnly) - val prev_vaD = 
curG.aggregateMessages[Seq[Double]]( - sendMsg = ctx => ctx.sendToSrc(Seq(ctx.attr(1), ctx.attr(2))), - mergeMsg = (s1, s2) => Seq(s1(0) + s2(0), s1(1) + s2(1)), + val prev_vaD = curG.aggregateMessages[VertexData]( + sendMsg = ctx => + ctx.sendToSrc(new VertexData(ctx.attr.availability, ctx.attr.responsibility)), + mergeMsg = (s1, s2) => + new VertexData(s1.availability + s2.availability, + s1.responsibility + s2.responsibility), TripletFields.EdgeOnly) - val delta = vaD.join(prev_vaD).values.collect().map { x => - (x._1(0) - x._2(0), x._1(1) - x._2(1)) - }.foldLeft((0.0, 0.0)) {(s, t) => (s._1 + t._1, s._2 + t._2)} + val delta = vaD.join(prev_vaD).values.map { x => + (x._1.availability - x._2.availability, x._1.responsibility - x._2.responsibility) + }.collect().foldLeft((0.0, 0.0)) {(s, t) => (s._1 + t._1, s._2 + t._2)} logInfo(s"$msgPrefix: availability delta = ${delta._1}.") logInfo(s"$msgPrefix: responsibility delta = ${delta._2}.") @@ -316,8 +369,8 @@ private[clustering] object AffinityPropagation extends Logging { * @return a [[AffinityPropagationModel]] representing the clustering results. */ def chooseExemplars( - g: Graph[Seq[Double], Seq[Double]]): AffinityPropagationModel = { - val accum = g.edges.map(a => (a.srcId, (a.dstId, a.attr(1) + a.attr(2)))) + g: Graph[VertexData, EdgeMessage]): AffinityPropagationModel = { + val accum = g.edges.map(a => (a.srcId, (a.dstId, a.attr.availability + a.attr.responsibility))) val clusterMembers = accum.reduceByKey((ar1, ar2) => { if (ar1._2 > ar2._2) { (ar1._1, ar1._2) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala index c4222dc5c7bfa..264d5c606eaa6 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala @@ -90,7 +90,7 @@ class AffinityPropagationSuite extends FunSuite with MLlibTestSparkContext { Array(1.0/2.0, 0.0, 1.0/2.0, 0.0)) val s = constructGraph(sc.parallelize(similarities, 2), true, false) s.edges.collect().foreach { case Edge(i, j, x) => - assert(x(0) ~== expected(i.toInt)(j.toInt) absTol 1e-14) + assert(x.similarity ~== expected(i.toInt)(j.toInt) absTol 1e-14) } } } From 250b6a40206b4a540432bb340ef598bef0dd457d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 9 Apr 2015 02:17:04 +0800 Subject: [PATCH 08/16] Fix style. 
--- .../apache/spark/mllib/clustering/AffinityPropagation.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala index 26776cc0471e7..063657908f369 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala @@ -320,7 +320,8 @@ private[clustering] object AffinityPropagation extends Logging { (1.0 - lambda) * e.attr.availability new EdgeMessage(e.attr.similarity, newA, e.attr.responsibility) } else { - val newA = lambda * (e.dstAttr - e.attr.responsibility) + (1.0 - lambda) * e.attr.availability + val newA = lambda * (e.dstAttr - e.attr.responsibility) + + (1.0 - lambda) * e.attr.availability new EdgeMessage(e.attr.similarity, newA, e.attr.responsibility) } }, TripletFields.Dst) From 6ad4905b0de5ac6ca9602ea30ce515bd75151221 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 10 Apr 2015 02:29:17 +0800 Subject: [PATCH 09/16] Make model distributed. Define private case classes for edge message and vertex data. --- .../clustering/AffinityPropagation.scala | 98 +++++++++++-------- .../clustering/AffinityPropagationSuite.scala | 52 ++++++++-- 2 files changed, 99 insertions(+), 51 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala index 063657908f369..de4e3c8ce5480 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala @@ -17,8 +17,7 @@ package org.apache.spark.mllib.clustering -import scala.collection.mutable.Map -import scala.collection.mutable.Set +import scala.collection.mutable import org.apache.spark.{Logging, SparkException} import org.apache.spark.annotation.Experimental @@ -36,13 +35,13 @@ import org.apache.spark.rdd.RDD */ @Experimental class AffinityPropagationModel( - val clusters: Seq[Set[Long]], - val exemplars: Seq[Long]) extends Serializable { + val clusters: RDD[(Array[Long], Long)], + val exemplars: RDD[(Long, Long)]) extends Serializable { /** * Set the number of clusters */ - def getK(): Int = clusters.size + lazy val getK: Long = clusters.count() /** * Find the cluster the given vertex belongs @@ -51,9 +50,9 @@ class AffinityPropagationModel( * the given vertex doesn't belong to any cluster, return null. */ def findCluster(vertexID: Long): Array[Long] = { - val cluster = clusters.filter(_.contains(vertexID)) + val cluster = clusters.filter(_._1.contains(vertexID)).collect() if (cluster.nonEmpty) { - cluster(0).toArray + cluster(0)._1 } else { null } @@ -65,15 +64,20 @@ class AffinityPropagationModel( * @return the cluster id that the given vertex belongs to. If the given vertex doesn't belong to * any cluster, return -1. 
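+   *
+   * A usage sketch (vertex ids taken from the test suite; illustrative only):
+   * {{{
+   *   val cid = model.findClusterID(14)
+   *   if (cid != -1) println(s"vertex 14 belongs to cluster $cid")
+   * }}}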
*/ - def findClusterID(vertexID: Long): Int = { - var i = 0 - clusters.foreach { cluster => - if (cluster.contains(vertexID)) { - return i + def findClusterID(vertexID: Long): Long = { + val clusterIds = clusters.flatMap { clusterAndId => + val clusterId = clusterAndId._2 + if (clusterAndId._1.contains(vertexID)) { + Seq(clusterId) + } else { + Seq() } - i += i + }.collect() + if (clusterIds.nonEmpty) { + clusterIds(0) + } else { + -1 } - -1 } } @@ -191,7 +195,40 @@ class AffinityPropagation private[clustering] ( def getSymmetric(): Boolean = { this.symmetric } - + + /** + * Calculate the median value of similarities + */ + private def getMedian(similarities: RDD[(Long, Long, Double)]): Double = { + import org.apache.spark.SparkContext._ + + val sorted = similarities.sortBy(_._3).zipWithIndex().map { + case (v, idx) => (idx, v) + } + + val count = sorted.count() + + if (count % 2 == 0) { + val l = count / 2 - 1 + val r = l + 1 + (sorted.lookup(l).head._3 + sorted.lookup(r).head._3).toDouble / 2 + } else { + sorted.lookup(count / 2).head._3.toDouble + } + } + + /** + * Determine preferences by calculating median of similarities. + * This might cost considering computation time for large similarities data. + */ + def determinePreferences( + similarities: RDD[(Long, Long, Double)]): RDD[(Long, Long, Double)] = { + // the recommended preferences is the median of similarities + val median = getMedian(similarities) + val preferences = similarities.flatMap(t => Seq(t._1, t._2)).distinct().map(i => (i, i, median)) + similarities.union(preferences) + } + /** * Run the AP algorithm. * @@ -378,35 +415,14 @@ private[clustering] object AffinityPropagation extends Logging { } else { (ar2._1, ar2._2) } - }).map(kv => (kv._2._1, kv._1)).aggregateByKey(Set[Long]())( - seqOp = (s, d) => s ++ Set(d), + }).map(kv => (kv._2._1, kv._1)).aggregateByKey(mutable.Set[Long]())( + seqOp = (s, d) => s ++ mutable.Set(d), combOp = (s1, s2) => s1 ++ s2 ).cache() - val neighbors = clusterMembers.map(kv => kv._2 ++ Set(kv._1)).collect() - val exemplars = clusterMembers.map(kv => kv._1).collect() - - var clusters = Seq[Set[Long]]() - - var i = 0 - var nz = neighbors.size - while (i < nz) { - var curCluster = neighbors(i) - var j = i + 1 - while (j < nz) { - if (!curCluster.intersect(neighbors(j)).isEmpty) { - curCluster ++= neighbors(j) - } - j += 1 - } - val overlap = clusters.filter(!_.intersect(curCluster).isEmpty) - if (overlap.isEmpty) { - clusters = clusters.:+(curCluster) - } else { - overlap(0) ++= curCluster - } - i += 1 - } + val clusters = clusterMembers.map(kv => (kv._2 ++ mutable.Set(kv._1)).toArray).zipWithIndex() + val exemplars = clusterMembers.map(kv => kv._1).zipWithIndex() + new AffinityPropagationModel(clusters, exemplars) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala index 264d5c606eaa6..a7c55b12e0bd2 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala @@ -36,26 +36,58 @@ class AffinityPropagationSuite extends FunSuite with MLlibTestSparkContext { 15-14 -13 12 . \ / 4 . 3 . 2 - | | | + | . | 5 0 . 1 10 - | \ . | + | \ . . 6 7 . 
8 - 9 - 11 */ - val similarities = Seq[(Long, Long, Double)]((0, 1, -8.2), (0, 3, -2.8), (1, 2, -0.4), - (1, 8, -8.1), (2, 3, -9.2), (2, 12, -1.1), (3, 15, -1.5), (3, 4, -10.1), (4, 5, -0.7), - (4, 15, -11.8), (5, 6, -0.7), (5, 7, -0.41), (7, 8, -8.1), (8, 9, -0.55), (9, 10, -1.8), - (9, 11, -0.76), (13, 14, -0.15), (14, 15, -0.67), (0, 0, -3), (1, 1, -3), (3, 3, -3), - (4, 4, -3), (5, 5, -3), (6, 6, -3), (7, 7, -3), (8, 8, -3), (9, 9, -3), (10, 10, -3), - (11, 11, -3), (12, 12, -3), (13, 13, -3), (14, 14, -3), (15, 15, -3)) + val similarities = Seq[(Long, Long, Double)]((0, 1, -8.2), (0, 3, -5.8), (1, 2, -0.4), + (1, 8, -8.1), (2, 3, -9.2), (2, 12, -0.8), (3, 15, -1.5), (3, 4, -10.1), (4, 5, -0.7), + (4, 15, -11.8), (5, 6, -0.7), (5, 7, -0.41), (7, 8, -8.1), (8, 9, -0.55), (9, 10, -5.8), + (9, 11, -0.76), (13, 14, -0.15), (14, 15, -0.67), (0, 0, -1.3), (1, 1, -1.3), (2, 2, -1.3), + (3, 3, -1.3), (4, 4, -1.3), (5, 5, -1.3), (6, 6, -1.3), (7, 7, -1.3), (8, 8, -1.3), + (9, 9, -1.3),(10, 10, -1.3), (11, 11, -1.3), (12, 12, -1.3), (13, 13, -1.3), (14, 14, -1.3), + (15, 15, -1.3)) + val model = new AffinityPropagation() - .setMaxIterations(100) + .setMaxIterations(30) .run(sc.parallelize(similarities, 2)) - assert(model.getK() == 5) + assert(model.getK == 7) + assert(model.findCluster(5).sorted === Array[Long](4, 5, 6, 7)) + assert(model.findClusterID(14) != -1) + assert(model.findClusterID(14) === model.findClusterID(15)) + } + + test("calculate preferences") { + val similarities = Seq[(Long, Long, Double)]((0, 1, -8.2), (0, 3, -5.8), (1, 2, -0.4), + (1, 8, -8.1), (2, 3, -9.2), (2, 12, -1.1), (3, 15, -1.5), (3, 4, -10.1), (4, 5, -0.7), + (4, 15, -11.8), (5, 6, -0.7), (5, 7, -0.41), (7, 8, -8.1), (8, 9, -0.55), (9, 10, -5.8), + (9, 11, -0.76), (13, 14, -0.15), (14, 15, -0.67)) + + val ap = new AffinityPropagation() + val similaritiesWithPreferneces = + ap.determinePreferences(sc.parallelize(similarities, 2)) + + def median(s: Seq[Double]) = { + val (lower, upper) = s.sortWith(_<_).splitAt(s.size / 2) + if (s.size % 2 == 0) (lower.last + upper.head) / 2.0 else upper.head + } + + val medianValue = median(similarities.map(_._3)) + + val preferences = similaritiesWithPreferneces.collect().filter(x => x._1 == x._2).map(_._3) + preferences.foreach(p => assert(p == medianValue)) + + val model = ap.setMaxIterations(30) + .run(similaritiesWithPreferneces) + + assert(model.getK == 7) assert(model.findCluster(5).sorted === Array[Long](4, 5, 6, 7)) assert(model.findClusterID(14) != -1) assert(model.findClusterID(14) === model.findClusterID(15)) + assert(model.findClusterID(100) == -1) } test("normalize") { From d70b70a4b2813ac8219cfe5f5e8c4c3e41f48c4b Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 10 Apr 2015 02:35:10 +0800 Subject: [PATCH 10/16] Remove useless parameter. --- .../apache/spark/mllib/clustering/AffinityPropagation.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala index de4e3c8ce5480..e02412860bd5e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala @@ -240,12 +240,11 @@ class AffinityPropagation private[clustering] ( * The data points with larger values of s,,ii,, are more likely to be chosen * as exemplars. 
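+   *
+   * A usage sketch (similarity values borrowed from the unit tests; `sc` being an available
+   * SparkContext is an assumption of this illustration):
+   * {{{
+   *   val similarities = sc.parallelize(Seq(
+   *     (0L, 1L, -8.2), (1L, 2L, -0.4), (0L, 0L, -1.3), (1L, 1L, -1.3), (2L, 2L, -1.3)))
+   *   val model = new AffinityPropagation().setMaxIterations(30).run(similarities)
+   * }}}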
* - * @param symmetric the given similarity matrix is symmetric or not. Default value: true * @return a [[AffinityPropagationModel]] that contains the clustering result */ - def run(similarities: RDD[(Long, Long, Double)], symmetric: Boolean = true) + def run(similarities: RDD[(Long, Long, Double)]) : AffinityPropagationModel = { - val s = constructGraph(similarities, normalization, symmetric) + val s = constructGraph(similarities, normalization, this.symmetric) ap(s) } From 22ef08e43fff3ad2bc1742620893b27441d8dd87 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 10 Apr 2015 08:26:22 +0800 Subject: [PATCH 11/16] remove redundant spaces. --- .../spark/mllib/clustering/AffinityPropagationSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala index a7c55b12e0bd2..46ac520c73c41 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala @@ -36,7 +36,7 @@ class AffinityPropagationSuite extends FunSuite with MLlibTestSparkContext { 15-14 -13 12 . \ / 4 . 3 . 2 - | . | + | . | 5 0 . 1 10 | \ . . 6 7 . 8 - 9 - 11 @@ -79,7 +79,7 @@ class AffinityPropagationSuite extends FunSuite with MLlibTestSparkContext { val preferences = similaritiesWithPreferneces.collect().filter(x => x._1 == x._2).map(_._3) preferences.foreach(p => assert(p == medianValue)) - + val model = ap.setMaxIterations(30) .run(similaritiesWithPreferneces) From a485422d9793c6523be3ea92410c42280d75791f Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 10 Apr 2015 22:21:09 +0800 Subject: [PATCH 12/16] Add java example. --- .../mllib/JavaAffinityPropagation.java | 71 +++++++++++++++++++ .../clustering/AffinityPropagation.scala | 47 ++++++++---- 2 files changed, 104 insertions(+), 14 deletions(-) create mode 100644 examples/src/main/java/org/apache/spark/examples/mllib/JavaAffinityPropagation.java diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaAffinityPropagation.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaAffinityPropagation.java new file mode 100644 index 0000000000000..e949d05c82386 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaAffinityPropagation.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.mllib; + +import scala.Tuple3; + +import com.google.common.collect.Lists; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.mllib.clustering.AffinityPropagation; +import org.apache.spark.mllib.clustering.AffinityPropagationCluster; +import org.apache.spark.mllib.clustering.AffinityPropagationModel; + +/** + * Java example for graph clustering using affinity propagation (AP). + */ +public class JavaAffinityPropagation { + public static void main(String[] args) { + SparkConf sparkConf = new SparkConf().setAppName("JavaAffinityPropagationExample"); + JavaSparkContext sc = new JavaSparkContext(sparkConf); + + @SuppressWarnings("unchecked") + JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Lists.newArrayList( + new Tuple3(0L, 1L, 0.9), // similarities + new Tuple3(1L, 2L, 0.9), + new Tuple3(1L, 3L, 0.9), + new Tuple3(3L, 4L, 0.1), + new Tuple3(4L, 5L, 0.9), + new Tuple3(4L, 6L, 0.9), + new Tuple3(0L, 0L, 0.1), // preferences + new Tuple3(1L, 1L, 0.2), + new Tuple3(2L, 2L, 0.2), + new Tuple3(3L, 3L, 0.2), + new Tuple3(4L, 4L, 0.2), + new Tuple3(5L, 5L, 0.2), + new Tuple3(6L, 6L, 0.2))); + + AffinityPropagation ap = new AffinityPropagation() + .setMaxIterations(20); + AffinityPropagationModel model = ap.run(similarities); + + for (AffinityPropagationCluster c: model.clusters().toJavaRDD().collect()) { + StringBuilder builder = new StringBuilder(); + builder.append("cluster id: " + c.id() + " -> "); + builder.append(" exemplar: " + c.exemplar() + " members:"); + for (Long node: c.members()) { + builder.append(" " + node); + } + System.out.println(builder.toString()); + } + + sc.stop(); + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala index e02412860bd5e..7e9fbe136bb72 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala @@ -21,6 +21,7 @@ import scala.collection.mutable import org.apache.spark.{Logging, SparkException} import org.apache.spark.annotation.Experimental +import org.apache.spark.api.java.JavaRDD import org.apache.spark.graphx._ import org.apache.spark.graphx.impl.GraphImpl import org.apache.spark.rdd.RDD @@ -30,19 +31,29 @@ import org.apache.spark.rdd.RDD * * Model produced by [[AffinityPropagation]]. * - * @param clusters the vertexIDs of each cluster. - * @param exemplars the vertexIDs of all exemplars. + * @param id cluster id. + * @param exemplar cluster exemplar. + * @param members cluster members. + */ +@Experimental +case class AffinityPropagationCluster(val id: Long, val exemplar: Long, val members: Array[Long]) + +/** + * :: Experimental :: + * + * Model produced by [[AffinityPropagation]]. + * + * @param clusters the clusters of AffinityPropagation clustering results. */ @Experimental class AffinityPropagationModel( - val clusters: RDD[(Array[Long], Long)], - val exemplars: RDD[(Long, Long)]) extends Serializable { + val clusters: RDD[AffinityPropagationCluster]) extends Serializable { /** * Set the number of clusters */ lazy val getK: Long = clusters.count() - + /** * Find the cluster the given vertex belongs * @param vertexID vertex id. * @return a [[Array]] that contains vertex ids in the same cluster of given vertexID. If * the given vertex doesn't belong to any cluster, return null.
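For readers following the example in Scala rather than Java, here is a rough equivalent of the flow above, against the RDD-based model as it stands after this patch. It assumes an already-constructed SparkContext named sc, so it is a sketch rather than a complete program:

import org.apache.spark.mllib.clustering.AffinityPropagation

val similarities = sc.parallelize(Seq(
  (0L, 1L, 0.9), (1L, 2L, 0.9), (1L, 3L, 0.9),       // similarities
  (3L, 4L, 0.1), (4L, 5L, 0.9), (4L, 6L, 0.9),
  (0L, 0L, 0.1), (1L, 1L, 0.2), (2L, 2L, 0.2),       // preferences
  (3L, 3L, 0.2), (4L, 4L, 0.2), (5L, 5L, 0.2), (6L, 6L, 0.2)))

val model = new AffinityPropagation().setMaxIterations(20).run(similarities)

// After this patch, clusters is an RDD[AffinityPropagationCluster].
model.clusters.collect().foreach { c =>
  println(s"cluster ${c.id}: exemplar ${c.exemplar}, members ${c.members.mkString(" ")}")
}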
*/ def findCluster(vertexID: Long): Array[Long] = { - val cluster = clusters.filter(_._1.contains(vertexID)).collect() + val cluster = clusters.filter(_.members.contains(vertexID)).collect() if (cluster.nonEmpty) { - cluster(0)._1 + cluster(0).members } else { null } @@ -65,10 +76,9 @@ class AffinityPropagationModel( * any cluster, return -1. */ def findClusterID(vertexID: Long): Long = { - val clusterIds = clusters.flatMap { clusterAndId => - val clusterId = clusterAndId._2 - if (clusterAndId._1.contains(vertexID)) { - Seq(clusterId) + val clusterIds = clusters.flatMap { cluster => + if (cluster.members.contains(vertexID)) { + Seq(cluster.id) } else { Seq() } @@ -248,6 +258,14 @@ class AffinityPropagation private[clustering] ( ap(s) } + /** + * A Java-friendly version of [[AffinityPropagation.run]]. + */ + def run(similarities: JavaRDD[(java.lang.Long, java.lang.Long, java.lang.Double)]) + : AffinityPropagationModel = { + run(similarities.rdd.asInstanceOf[RDD[(Long, Long, Double)]]) + } + /** * Runs the AP algorithm. * @@ -419,9 +437,10 @@ private[clustering] object AffinityPropagation extends Logging { combOp = (s1, s2) => s1 ++ s2 ).cache() - val clusters = clusterMembers.map(kv => (kv._2 ++ mutable.Set(kv._1)).toArray).zipWithIndex() - val exemplars = clusterMembers.map(kv => kv._1).zipWithIndex() + val clusters = clusterMembers.zipWithIndex().map {kv => + new AffinityPropagationCluster(kv._2, kv._1._1, kv._1._2.toArray) + } - new AffinityPropagationModel(clusters, exemplars) + new AffinityPropagationModel(clusters) } } From ffe06c32fa892fc36a3a7f225ba8df256813e653 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 11 Apr 2015 00:36:44 +0800 Subject: [PATCH 13/16] Add Java-friendly determinePreferences and example. --- .../mllib/JavaAffinityPropagation.java | 25 ++++++++++++++++++- .../clustering/AffinityPropagation.scala | 11 +++++++- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaAffinityPropagation.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaAffinityPropagation.java index e949d05c82386..657177315be96 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaAffinityPropagation.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaAffinityPropagation.java @@ -56,6 +56,19 @@ public static void main(String[] args) { .setMaxIterations(20); AffinityPropagationModel model = ap.run(similarities); + JavaRDD<Tuple3<Long, Long, Double>> similarities2 = sc.parallelize(Lists.newArrayList( + new Tuple3(0L, 1L, -0.12), + new Tuple3(1L, 2L, -0.08), + new Tuple3(1L, 3L, -0.22), + new Tuple3(3L, 4L, -0.93), + new Tuple3(3L, 5L, -0.82), + new Tuple3(4L, 1L, -0.85), + new Tuple3(4L, 2L, -0.73), + new Tuple3(4L, 5L, -0.19), + new Tuple3(4L, 6L, -0.12))); + + AffinityPropagationModel model2 = ap.run(ap.determinePreferences(similarities2)); + for (AffinityPropagationCluster c: model.clusters().toJavaRDD().collect()) { StringBuilder builder = new StringBuilder(); builder.append("cluster id: " + c.id() + " -> "); @@ -65,7 +78,17 @@ public static void main(String[] args) { } System.out.println(builder.toString()); } - + for (AffinityPropagationCluster c: model2.clusters().toJavaRDD().collect()) { + StringBuilder builder = new StringBuilder(); + builder.append("cluster id: " + c.id() + " -> "); + builder.append(" exemplar: " + c.exemplar() + " members:"); + for (Long node: c.members()) { + builder.append(" " + node); + } + System.out.println(builder.toString()); + } + sc.stop(); } } diff --git
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala index 7e9fbe136bb72..7fbd169af1357 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala @@ -238,7 +238,16 @@ class AffinityPropagation private[clustering] ( val preferences = similarities.flatMap(t => Seq(t._1, t._2)).distinct().map(i => (i, i, median)) similarities.union(preferences) } - + + /** + * A Java-friendly version of [[AffinityPropagation.determinePreferences]]. + */ + def determinePreferences( + similarities: JavaRDD[(java.lang.Long, java.lang.Long, java.lang.Double)]): + RDD[(Long, Long, Double)] = { + determinePreferences(similarities.rdd.asInstanceOf[RDD[(Long, Long, Double)]]) + } + /** * Run the AP algorithm. * From 97cef01f1e9f1bd2fcaa2d7718302c8ae79cf25a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 11 Apr 2015 15:25:33 +0800 Subject: [PATCH 14/16] Add function to manually set up preferences. --- .../mllib/JavaAffinityPropagation.java | 22 ++++++++++++---- .../clustering/AffinityPropagation.scala | 20 +++++++++++++++ .../clustering/AffinityPropagationSuite.scala | 25 +++++++++++++++++++ 3 files changed, 62 insertions(+), 5 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaAffinityPropagation.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaAffinityPropagation.java index 657177315be96..298c36fa80085 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaAffinityPropagation.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaAffinityPropagation.java @@ -44,7 +44,7 @@ public static void main(String[] args) { new Tuple3(3L, 4L, 0.1), new Tuple3(4L, 5L, 0.9), new Tuple3(4L, 6L, 0.9), - new Tuple3(0L, 0L, 0.1), // preferences + new Tuple3(0L, 0L, 0.2), // preferences new Tuple3(1L, 1L, 0.2), new Tuple3(2L, 2L, 0.2), new Tuple3(3L, 3L, 0.2), new Tuple3(4L, 4L, 0.2), new Tuple3(5L, 5L, 0.2), new Tuple3(6L, 6L, 0.2))); AffinityPropagation ap = new AffinityPropagation() .setMaxIterations(20); AffinityPropagationModel model = ap.run(similarities); + for (AffinityPropagationCluster c: model.clusters().toJavaRDD().collect()) { + StringBuilder builder = new StringBuilder(); + builder.append("cluster id: " + c.id() + " -> "); + builder.append(" exemplar: " + c.exemplar() + " members:"); + for (Long node: c.members()) { + builder.append(" " + node); + } + System.out.println(builder.toString()); + } + JavaRDD<Tuple3<Long, Long, Double>> similarities2 = sc.parallelize(Lists.newArrayList( new Tuple3(0L, 1L, -0.12), new Tuple3(1L, 2L, -0.08), new Tuple3(1L, 3L, -0.22), new Tuple3(3L, 4L, -0.93), new Tuple3(3L, 5L, -0.82), new Tuple3(4L, 1L, -0.85), new Tuple3(4L, 2L, -0.73), new Tuple3(4L, 5L, -0.19), new Tuple3(4L, 6L, -0.12))); AffinityPropagationModel model2 = ap.run(ap.determinePreferences(similarities2)); - for (AffinityPropagationCluster c: model.clusters().toJavaRDD().collect()) { + for (AffinityPropagationCluster c: model2.clusters().toJavaRDD().collect()) { StringBuilder builder = new StringBuilder();
builder.append("cluster id: " + c.id() + " -> "); builder.append(" exemplar: " + c.exemplar() + " members:"); @@ -88,7 +100,7 @@ public static void main(String[] args) { } System.out.println(builder.toString()); } - + sc.stop(); } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala index 7fbd169af1357..3aa3c6336f1b0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala @@ -247,6 +247,26 @@ class AffinityPropagation private[clustering] ( RDD[(Long, Long, Double)] = { determinePreferences(similarities.rdd.asInstanceOf[RDD[(Long, Long, Double)]]) } + + /** + * Manually set up preferences for tuning cluster size. + */ + def embedPreferences( + similarities: RDD[(Long, Long, Double)], + preference: Double): RDD[(Long, Long, Double)] = { + val preferences = similarities.flatMap(t => Seq(t._1, t._2)).distinct() + .map(i => (i, i, preference)) + similarities.union(preferences) + } + + /** + * A Java-friendly version of [[AffinityPropagation.embedPreferences]]. + */ + def embedPreferences( + similarities: JavaRDD[(java.lang.Long, java.lang.Long, java.lang.Double)], + preference: Double): RDD[(Long, Long, Double)] = { + embedPreferences(similarities.rdd.asInstanceOf[RDD[(Long, Long, Double)]], preference) + } /** * Run the AP algorithm. diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala index 46ac520c73c41..3789e2d535447 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala @@ -89,6 +89,31 @@ class AffinityPropagationSuite extends FunSuite with MLlibTestSparkContext { assert(model.findClusterID(14) === model.findClusterID(15)) assert(model.findClusterID(100) == -1) } + + test("manually set up preferences") { + val similarities = Seq[(Long, Long, Double)]((0, 1, -8.2), (0, 3, -5.8), (1, 2, -0.4), + (1, 8, -8.1), (2, 3, -9.2), (2, 12, -1.1), (3, 15, -1.5), (3, 4, -10.1), (4, 5, -0.7), + (4, 15, -11.8), (5, 6, -0.7), (5, 7, -0.41), (7, 8, -8.1), (8, 9, -0.55), (9, 10, -5.8), + (9, 11, -0.76), (13, 14, -0.15), (14, 15, -0.67)) + + val preference = -1.3 + + val ap = new AffinityPropagation() + val similaritiesWithPreferneces = + ap.embedPreferences(sc.parallelize(similarities, 2), preference) + + val preferences = similaritiesWithPreferneces.collect().filter(x => x._1 == x._2).map(_._3) + preferences.foreach(p => assert(p == preference)) + + val model = ap.setMaxIterations(30) + .run(similaritiesWithPreferneces) + + assert(model.getK == 7) + assert(model.findCluster(5).sorted === Array[Long](4, 5, 6, 7)) + assert(model.findClusterID(14) != -1) + assert(model.findClusterID(14) === model.findClusterID(15)) + assert(model.findClusterID(100) == -1) + } test("normalize") { /* From e062a94e580e51d7cb5ff7b3d0e93be9c3edc704 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 7 May 2015 02:57:34 +0800 Subject: [PATCH 15/16] Address comments. 
--- .../mllib/JavaAffinityPropagation.java | 6 +- .../clustering/AffinityPropagation.scala | 93 ++++++++++++------- .../clustering/AffinityPropagationSuite.scala | 72 +++++++------- 3 files changed, 96 insertions(+), 75 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaAffinityPropagation.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaAffinityPropagation.java index 298c36fa80085..bff316ef125fa 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaAffinityPropagation.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaAffinityPropagation.java @@ -56,7 +56,7 @@ public static void main(String[] args) { .setMaxIterations(20); AffinityPropagationModel model = ap.run(similarities); - for (AffinityPropagationCluster c: model.clusters().toJavaRDD().collect()) { + for (AffinityPropagationCluster c: model.fromAssignToClusters().toJavaRDD().collect()) { StringBuilder builder = new StringBuilder(); builder.append("cluster id: " + c.id() + " -> "); builder.append(" exemplar: " + c.exemplar() + " members:"); @@ -79,7 +79,7 @@ public static void main(String[] args) { AffinityPropagationModel model2 = ap.run(ap.determinePreferences(similarities2)); - for (AffinityPropagationCluster c: model2.clusters().toJavaRDD().collect()) { + for (AffinityPropagationCluster c: model2.fromAssignToClusters().toJavaRDD().collect()) { StringBuilder builder = new StringBuilder(); builder.append("cluster id: " + c.id() + " -> "); builder.append(" exemplar: " + c.exemplar() + " members:"); @@ -91,7 +91,7 @@ public static void main(String[] args) { AffinityPropagationModel model3 = ap.run(ap.embedPreferences(similarities2, -0.5)); - for (AffinityPropagationCluster c: model3.clusters().toJavaRDD().collect()) { + for (AffinityPropagationCluster c: model3.fromAssignToClusters().toJavaRDD().collect()) { StringBuilder builder = new StringBuilder(); builder.append("cluster id: " + c.id() + " -> "); builder.append(" exemplar: " + c.exemplar() + " members:"); diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala index 3aa3c6336f1b0..83cb3821b12c4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala @@ -33,7 +33,19 @@ import org.apache.spark.rdd.RDD * * @param id cluster id. * @param exemplar cluster exemplar. - * @param members cluster members. + * @param member cluster member. + */ +@Experimental +case class AffinityPropagationAssignment(val id: Long, val exemplar: Long, val member: Long) + +/** + * :: Experimental :: + * + * Model produced by [[AffinityPropagation]]. + * + * @param id cluster id. + * @param exemplar cluster exemplar. + * @param members cluster members. */ @Experimental case class AffinityPropagationCluster(val id: Long, val exemplar: Long, val members: Array[Long]) @@ -43,29 +55,29 @@ case class AffinityPropagationCluster(val id: Long, val exemplar: Long, val memb * * Model produced by [[AffinityPropagation]]. * - * @param clusters the clusters of AffinityPropagation clustering results. + * @param assignments the cluster assignments of AffinityPropagation clustering results.
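To see what the move from whole clusters to flat assignments buys, here is a collections-only sketch (hypothetical data, no Spark) of how the per-member assignment rows relate to the cluster view; the field names follow the two case classes above:

case class Assignment(id: Long, exemplar: Long, member: Long)

val assignments = Seq(
  Assignment(0L, 1L, 0L), Assignment(0L, 1L, 1L), Assignment(0L, 1L, 2L),
  Assignment(1L, 4L, 4L), Assignment(1L, 4L, 5L))

// k = number of distinct cluster ids, as the model's lazy val computes.
val k = assignments.map(_.id).distinct.size                       // 2

// Grouping rebuilds the (id, exemplar) -> members view; on an RDD the model
// does the same with aggregateByKey in fromAssignToClusters().
val clusters = assignments.groupBy(a => (a.id, a.exemplar))
                          .mapValues(_.map(_.member))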
*/ @Experimental class AffinityPropagationModel( - val clusters: RDD[AffinityPropagationCluster]) extends Serializable { + val assignments: RDD[AffinityPropagationAssignment]) extends Serializable { /** - * Set the number of clusters + * Get the number of clusters */ - lazy val getK: Long = clusters.count() + lazy val k: Long = assignments.map(_.id).distinct.count() /** * Find the cluster the given vertex belongs * @param vertexID vertex id. - * @return a [[Array]] that contains vertex ids in the same cluster of given vertexID. If + * @return a [[RDD]] that contains vertex ids in the same cluster as the given vertexID. If - * the given vertex doesn't belong to any cluster, return null. + * the given vertex doesn't belong to any cluster, an empty [[RDD]] is returned. */ - def findCluster(vertexID: Long): Array[Long] = { - val cluster = clusters.filter(_.members.contains(vertexID)).collect() - if (cluster.nonEmpty) { - cluster(0).members + def findCluster(vertexID: Long): RDD[Long] = { + val assign = assignments.filter(_.member == vertexID).collect() + if (assign.nonEmpty) { + assignments.filter(_.id == assign(0).id).map(_.member) } else { - null + assignments.sparkContext.emptyRDD[Long] } } @@ -76,19 +88,27 @@ class AffinityPropagationModel( * any cluster, return -1. */ def findClusterID(vertexID: Long): Long = { - val clusterIds = clusters.flatMap { cluster => - if (cluster.members.contains(vertexID)) { - Seq(cluster.id) - } else { - Seq() - } - }.collect() - if (clusterIds.nonEmpty) { - clusterIds(0) + val assign = assignments.filter(_.member == vertexID).collect() + if (assign.nonEmpty) { + assign(0).id } else { -1 } } + + /** + * Turn cluster assignments into cluster representations [[AffinityPropagationCluster]]. + * @return an [[RDD]] that contains all clusters generated by Affinity Propagation. Because the + * cluster members in each [[AffinityPropagationCluster]] are held in an [[Array]], calling + * collect() on the returned [[RDD]] can consume a large amount of memory, or even run out of it.
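One practical consequence of the memory warning above: statistics that only need per-cluster counts can be computed from the flat assignments directly, without materializing the member arrays at all. A sketch, assuming model is an AffinityPropagationModel as defined in this patch:

// Cluster sizes via a streaming-friendly aggregation over assignments.
val sizes = model.assignments.map(a => (a.id, 1L)).reduceByKey(_ + _)
sizes.collect().foreach { case (id, n) => println(s"cluster $id: $n members") }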
+ */ + def fromAssignToClusters(): RDD[AffinityPropagationCluster] = { + assignments.map { assign => ((assign.id, assign.exemplar), assign.member) } + .aggregateByKey(mutable.Set[Long]())( + seqOp = (s, d) => s ++ mutable.Set(d), + combOp = (s1, s2) => s1 ++ s2 + ).map(kv => new AffinityPropagationCluster(kv._1._1, kv._1._2, kv._2.toArray)) + } } /** @@ -154,7 +174,7 @@ class AffinityPropagation private[clustering] ( /** * Get maximum number of iterations of the messaging iteration loop */ - def getMaxIterations(): Int = { + def getMaxIterations: Int = { this.maxIterations } @@ -210,21 +230,22 @@ class AffinityPropagation private[clustering] ( * Calculate the median value of similarities */ private def getMedian(similarities: RDD[(Long, Long, Double)]): Double = { - import org.apache.spark.SparkContext._ - - val sorted = similarities.sortBy(_._3).zipWithIndex().map { - case (v, idx) => (idx, v) - } + val sorted: RDD[(Long, Double)] = similarities.sortBy(_._3).zipWithIndex().map { + case (v, idx) => (idx, v._3) + }.persist() val count = sorted.count() - if (count % 2 == 0) { - val l = count / 2 - 1 - val r = l + 1 - (sorted.lookup(l).head._3 + sorted.lookup(r).head._3).toDouble / 2 - } else { - sorted.lookup(count / 2).head._3.toDouble - } + val median: Double = + if (count % 2 == 0) { + val l = count / 2 - 1 + val r = l + 1 + (sorted.lookup(l).head + sorted.lookup(r).head).toDouble / 2 + } else { + sorted.lookup(count / 2).head + } + sorted.unpersist() + median } /** @@ -466,10 +487,10 @@ private[clustering] object AffinityPropagation extends Logging { combOp = (s1, s2) => s1 ++ s2 ).cache() - val clusters = clusterMembers.zipWithIndex().map {kv => - new AffinityPropagationCluster(kv._2, kv._1._1, kv._1._2.toArray) + val assignments = clusterMembers.zipWithIndex().flatMap { kv => + kv._1._2.map(new AffinityPropagationAssignment(kv._2, kv._1._1, _)) } - new AffinityPropagationModel(clusters) + new AffinityPropagationModel(assignments) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala index 3789e2d535447..0fc61f47f2fc0 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala @@ -30,16 +30,16 @@ class AffinityPropagationSuite extends FunSuite with MLlibTestSparkContext { import org.apache.spark.mllib.clustering.AffinityPropagation._ test("affinity propagation") { - /* - We use the following graph to test AP. - - 15-14 -13 12 - . \ / - 4 . 3 . 2 - | . | - 5 0 . 1 10 - | \ . . - 6 7 . 8 - 9 - 11 + /** + * We use the following graph to test AP. + * + * 15-14 -13 12 + * . \ / + * 4 . 3 . 2 + * | . | + * 5 0 . 1 10 + * | \ . . + * 6 7 . 
8 - 9 - 11 */ val similarities = Seq[(Long, Long, Double)]((0, 1, -8.2), (0, 3, -5.8), (1, 2, -0.4), @@ -54,8 +54,8 @@ class AffinityPropagationSuite extends FunSuite with MLlibTestSparkContext { .setMaxIterations(30) .run(sc.parallelize(similarities, 2)) - assert(model.getK == 7) - assert(model.findCluster(5).sorted === Array[Long](4, 5, 6, 7)) + assert(model.k == 7) + assert(model.findCluster(5).collect().sorted === Array[Long](4, 5, 6, 7)) assert(model.findClusterID(14) != -1) assert(model.findClusterID(14) === model.findClusterID(15)) } @@ -83,8 +83,8 @@ class AffinityPropagationSuite extends FunSuite with MLlibTestSparkContext { val model = ap.setMaxIterations(30) .run(similaritiesWithPreferences) - assert(model.getK == 7) - assert(model.findCluster(5).sorted === Array[Long](4, 5, 6, 7)) + assert(model.k == 7) + assert(model.findCluster(5).collect().sorted === Array[Long](4, 5, 6, 7)) assert(model.findClusterID(14) != -1) assert(model.findClusterID(14) === model.findClusterID(15)) assert(model.findClusterID(100) == -1) @@ -108,34 +108,34 @@ class AffinityPropagationSuite extends FunSuite with MLlibTestSparkContext { val model = ap.setMaxIterations(30) .run(similaritiesWithPreferences) - assert(model.getK == 7) - assert(model.findCluster(5).sorted === Array[Long](4, 5, 6, 7)) + assert(model.k == 7) + assert(model.findCluster(5).collect().sorted === Array[Long](4, 5, 6, 7)) assert(model.findClusterID(14) != -1) assert(model.findClusterID(14) === model.findClusterID(15)) assert(model.findClusterID(100) == -1) } test("normalize") { - /* - Test normalize() with the following graph: - - 0 - 3 - | \ | - 1 - 2 - - The similarity matrix (A) is - - 0 1 1 1 - 1 0 1 0 - 1 1 0 1 - 1 0 1 0 - - D is diag(3, 2, 3, 2) and hence S is - - 0 1/3 1/3 1/3 - 1/2 0 1/2 0 - 1/3 1/3 0 1/3 - 1/2 0 1/2 0 + /** + * Test normalize() with the following graph: + * + * 0 - 3 + * | \ | + * 1 - 2 + * + * The similarity matrix (A) is + * + * 0 1 1 1 + * 1 0 1 0 + * 1 1 0 1 + * 1 0 1 0 + * + * D is diag(3, 2, 3, 2) and hence S is + * + * 0 1/3 1/3 1/3 + * 1/2 0 1/2 0 + * 1/3 1/3 0 1/3 + * 1/2 0 1/2 0 */ val similarities = Seq[(Long, Long, Double)]( (0, 1, 1.0), (1, 0, 1.0), (0, 2, 1.0), (2, 0, 1.0), (0, 3, 1.0), (3, 0, 1.0), From 0c7a26f56e85febfe1edac0d85251de9b0bf91e0 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 7 May 2015 18:12:56 +0800 Subject: [PATCH 16/16] Fix style and use StorageLevel.MEMORY_AND_DISK for persisting.
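The persist change in this final patch matters because getMedian reuses the sorted RDD for a count and up to two lookup calls. A standalone sketch of the same exact-median pattern with the storage level the patch settles on, assuming a SparkContext sc:

import org.apache.spark.storage.StorageLevel

val values = sc.parallelize(Seq(-8.2, -5.8, -1.5, -0.7, -0.4, -0.15))

val indexed = values.sortBy(identity).zipWithIndex().map(_.swap)
  .persist(StorageLevel.MEMORY_AND_DISK)  // spill instead of recomputing the sort

val n = indexed.count()
val median =
  if (n % 2 == 0) (indexed.lookup(n / 2 - 1).head + indexed.lookup(n / 2).head) / 2
  else indexed.lookup(n / 2).head
indexed.unpersist()
// Here: (-1.5 + -0.7) / 2 = -1.1. Unlike cache() (MEMORY_ONLY), partitions
// that don't fit in memory go to local disk rather than being recomputed.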
--- .../apache/spark/mllib/clustering/AffinityPropagation.scala | 3 ++- .../spark/mllib/clustering/AffinityPropagationSuite.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala index 83cb3821b12c4..7bbdd2d858eb9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/AffinityPropagation.scala @@ -25,6 +25,7 @@ import org.apache.spark.api.java.JavaRDD import org.apache.spark.graphx._ import org.apache.spark.graphx.impl.GraphImpl import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel /** * :: Experimental :: @@ -232,7 +233,7 @@ class AffinityPropagation private[clustering] ( private def getMedian(similarities: RDD[(Long, Long, Double)]): Double = { val sorted: RDD[(Long, Double)] = similarities.sortBy(_._3).zipWithIndex().map { case (v, idx) => (idx, v._3) - }.persist() + }.persist(StorageLevel.MEMORY_AND_DISK) val count = sorted.count() diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala index 0fc61f47f2fc0..7048e2a91cdea 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/AffinityPropagationSuite.scala @@ -70,7 +70,7 @@ class AffinityPropagationSuite extends FunSuite with MLlibTestSparkContext { val similaritiesWithPreferences = ap.determinePreferences(sc.parallelize(similarities, 2)) - def median(s: Seq[Double]) = { + def median(s: Seq[Double]): Double = { val (lower, upper) = s.sortWith(_<_).splitAt(s.size / 2) if (s.size % 2 == 0) (lower.last + upper.head) / 2.0 else upper.head }
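With the series complete, end-to-end usage looks roughly like the following sketch; it assumes a live SparkContext sc and uses only the final, assignment-based API:

import org.apache.spark.mllib.clustering.AffinityPropagation

val similarities = sc.parallelize(Seq(
  (0L, 1L, -8.2), (0L, 3L, -5.8), (1L, 2L, -0.4),
  (13L, 14L, -0.15), (14L, 15L, -0.67)), 2)

val ap = new AffinityPropagation().setMaxIterations(30)
val model = ap.run(ap.determinePreferences(similarities))

println(s"k = ${model.k}")
println(s"cluster of 14: ${model.findCluster(14L).collect().mkString(", ")}")
println(s"cluster id of 15: ${model.findClusterID(15L)}")

// Materialize whole clusters only when the member arrays are known to fit.
model.fromAssignToClusters().collect().foreach { c =>
  println(s"cluster ${c.id}: exemplar ${c.exemplar}, members ${c.members.mkString(" ")}")
}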