From d56aa5b22829c47d7be5c6f9c3483209502c84cc Mon Sep 17 00:00:00 2001
From: RJ Nowling
Date: Fri, 27 Jun 2014 14:31:22 -0400
Subject: [PATCH 01/14] Added KMeansMiniBatch implementation

---
 .../mllib/clustering/KMeansMiniBatch.scala    | 383 ++++++++++++++++++
 1 file changed, 383 insertions(+)
 create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala
new file mode 100644
index 0000000000000..cc0061455e8c9
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.clustering
+
+import scala.collection.mutable.ArrayBuffer
+
+import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm}
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.Logging
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.random.XORShiftRandom
+
+/**
+ * K-means clustering with support for multiple parallel runs and a k-means++ like initialization
+ * mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested,
+ * they are executed together with joint passes over the data for efficiency.
+ *
+ * This is an iterative algorithm that will make multiple passes over the data, so any RDDs given
+ * to it should be cached by the user.
+ */
+class KMeansMiniBatch private (
+    private var k: Int,
+    private var maxIterations: Int,
+    private var batchSize: Int,
+    private var runs: Int,
+    private var initializationMode: String,
+    private var initializationSteps: Int,
+    private var epsilon: Double) extends Serializable with Logging {
+
+  /**
+   * Constructs a KMeansMiniBatch instance with default parameters: {k: 2, maxIterations: 20, runs: 1,
+   * batchSize: 1000, initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4}.
+   */
+  def this() = this(2, 20, 1000, 1, KMeansMiniBatch.K_MEANS_PARALLEL, 5, 1e-4)
+
+  /** Set the mini-batch size: the number of points sampled in each iteration. Default: 1000. */
+  def setBatchSize(batchSize: Int): KMeansMiniBatch = {
+    this.batchSize = batchSize
+    this
+  }
+
+  /** Set the number of clusters to create (k). Default: 2. */
+  def setK(k: Int): KMeansMiniBatch = {
+    this.k = k
+    this
+  }
+
+  /** Set maximum number of iterations to run. Default: 20. */
+  def setMaxIterations(maxIterations: Int): KMeansMiniBatch = {
+    this.maxIterations = maxIterations
+    this
+  }
+
+  /**
+   * Set the initialization algorithm. 
This can be either "random" to choose random points as
+   * initial cluster centers, or "k-means||" to use a parallel variant of k-means++
+   * (Bahmani et al., Scalable K-Means++, VLDB 2012). Default: k-means||.
+   */
+  def setInitializationMode(initializationMode: String): KMeansMiniBatch = {
+    if (initializationMode != KMeansMiniBatch.RANDOM && initializationMode != KMeansMiniBatch.K_MEANS_PARALLEL) {
+      throw new IllegalArgumentException("Invalid initialization mode: " + initializationMode)
+    }
+    this.initializationMode = initializationMode
+    this
+  }
+
+  /**
+   * :: Experimental ::
+   * Set the number of runs of the algorithm to execute. We initialize the algorithm
+   * this many times with random starting conditions (configured by the initialization mode), then
+   * return the best clustering found over any run. Default: 1.
+   */
+  @Experimental
+  def setRuns(runs: Int): KMeansMiniBatch = {
+    if (runs <= 0) {
+      throw new IllegalArgumentException("Number of runs must be positive")
+    }
+    this.runs = runs
+    this
+  }
+
+  /**
+   * Set the number of steps for the k-means|| initialization mode. This is an advanced
+   * setting -- the default of 5 is almost always enough. Default: 5.
+   */
+  def setInitializationSteps(initializationSteps: Int): KMeansMiniBatch = {
+    if (initializationSteps <= 0) {
+      throw new IllegalArgumentException("Number of initialization steps must be positive")
+    }
+    this.initializationSteps = initializationSteps
+    this
+  }
+
+  /**
+   * Set the distance threshold within which we consider centers to have converged.
+   * If all centers move less than this Euclidean distance, we stop iterating one run.
+   */
+  def setEpsilon(epsilon: Double): KMeansMiniBatch = {
+    this.epsilon = epsilon
+    this
+  }
+
+  /**
+   * Train a K-means model on the given set of points; `data` should be cached for high
+   * performance, because this is an iterative algorithm.
+   */
+  def run(data: RDD[Vector]): KMeansModel = {
+    // Compute squared norms and cache them.
+    val norms = data.map(v => breezeNorm(v.toBreeze, 2.0))
+    norms.persist()
+    val breezeData = data.map(_.toBreeze).zip(norms).map { case (v, norm) =>
+      new BreezeVectorWithNorm(v, norm)
+    }
+
+    val runModels = (0 until runs).map { _ =>
+      runBreeze(breezeData)
+    }
+
+    val bestModel = runModels.minBy(t => t._2)._1
+
+    norms.unpersist()
+    bestModel
+  }
+
+  /**
+   * Implementation of K-Means using breeze.
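+   * Returns the trained model paired with its total cost, so that `run` can keep the
+   * lowest-cost model across its `runs` invocations.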
+ */ + private def runBreeze(data: RDD[BreezeVectorWithNorm]): (KMeansModel, Double) = { + + val sc = data.sparkContext + + val initStartTime = System.nanoTime() + + val centers = if (initializationMode == KMeansMiniBatch.RANDOM) { + initRandom(data) + } else { + initKMeansMiniBatchParallel(data) + } + + val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 + logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + + " seconds.") + + var costs = 0.0 + var iteration = 0 + val iterationStartTime = System.nanoTime() + + // Execute iterations of Lloyd's algorithm until all runs have converged + while (iteration < maxIterations) { + type WeightedPoint = (BV[Double], Long) + def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = { + (p1._1 += p2._1, p1._2 + p2._2) + } + + val costAccums = sc.accumulator(0.0) + + // Find the sum and count of points mapping to each center + val totalContribs = data.mapPartitions { points => + val k = centers.length + val dims = centers(0).vector.length + + val sums = Array.fill(k)(BDV.zeros[Double](dims).asInstanceOf[BV[Double]]) + val counts = Array.fill(k)(0L) + + points.foreach { point => + val (bestCenter, cost) = KMeansMiniBatch.findClosest(centers, point) + costAccums += cost + sums(bestCenter) += point.vector + counts(bestCenter) += 1 + } + + val contribs = for (j <- 0 until k) yield { + (j, (sums(j), counts(j))) + } + contribs.iterator + }.reduceByKey(mergeContribs).collectAsMap() + + // Update the cluster centers and costs + var j = 0 + while (j < k) { + val (sum, count) = totalContribs(j) + if (count != 0) { + sum /= count.toDouble + val newCenter = new BreezeVectorWithNorm(sum) + centers(j) = newCenter + } + j += 1 + } + + costs = costAccums.value + iteration += 1 + } + + val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9 + logInfo(s"Iterations took " + "%.3f".format(iterationTimeInSeconds) + " seconds.") + + logInfo(s"The cost for the run is $costs.") + + new Tuple2(new KMeansModel(centers.map(c => Vectors.fromBreeze(c.vector))), costs) + } + + /** + * Initialize `runs` sets of cluster centers at random. + */ + private def initRandom(data: RDD[BreezeVectorWithNorm]) + : Array[BreezeVectorWithNorm] = { + // Sample all the cluster centers in one pass to avoid repeated scans + val sample = data.takeSample(true, k, new XORShiftRandom().nextInt()).toSeq + sample.map { v => + new BreezeVectorWithNorm(v.vector.toDenseVector, v.norm) + }.toArray + } + + /** + * Initialize `runs` sets of cluster centers using the k-means|| algorithm by Bahmani et al. + * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries + * to find with dissimilar cluster centers by starting with a random center and then doing + * passes where more centers are chosen with probability proportional to their squared distance + * to the current cluster set. It results in a provable approximation to an optimal clustering. + * + * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf. 
+   */
+  private def initKMeansMiniBatchParallel(data: RDD[BreezeVectorWithNorm])
+    : Array[BreezeVectorWithNorm] = {
+    // Initialize each run's center to a random point
+    val seed = new XORShiftRandom().nextInt()
+    val sample = data.takeSample(true, 1, seed).toSeq
+    val centers = ArrayBuffer() ++ sample
+
+    // On each step, sample 2 * k points on average for each run with probability proportional
+    // to their squared distance from that run's current centers
+    var step = 0
+    while (step < initializationSteps) {
+      val sumCosts = data.map { point =>
+        KMeansMiniBatch.pointCost(centers, point)
+      }.reduce(_ + _)
+      val chosen = data.mapPartitionsWithIndex { (index, points) =>
+        val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
+
+        // accept / reject each point
+        val sampledCenters = points.filter { p =>
+          rand.nextDouble() < 2.0 * KMeansMiniBatch.pointCost(centers, p) * k / sumCosts
+        }
+
+        sampledCenters
+      }.collect()
+
+
+      centers ++= chosen
+      step += 1
+    }
+
+    // Finally, we might have a set of more than k candidate centers for each run; weigh each
+    // candidate by the number of points in the dataset mapping to it and run a local k-means++
+    // on the weighted centers to pick just k of them
+    val weightMap = data.map { p =>
+      (KMeansMiniBatch.findClosest(centers, p)._1, 1.0)
+    }.reduceByKey(_ + _).collectAsMap()
+    val weights = (0 until centers.length).map(i => weightMap.getOrElse(i, 0.0)).toArray
+    val finalCenters = LocalKMeans.kMeansPlusPlus(seed, centers.toArray, weights, k, 30)
+
+    finalCenters.toArray
+  }
+}
+
+
+/**
+ * Top-level methods for calling K-means clustering.
+ */
+object KMeansMiniBatch {
+
+  // Initialization mode names
+  val RANDOM = "random"
+  val K_MEANS_PARALLEL = "k-means||"
+
+  /**
+   * Trains a k-means model using the given set of parameters.
+   *
+   * @param data training points stored as `RDD[Vector]`
+   * @param k number of clusters
+   * @param batchSize number of points in each batch
+   * @param maxIterations max number of iterations
+   * @param runs number of runs; defaults to 1. The best model is returned.
+   * @param initializationMode initialization mode, either "random" or "k-means||" (default).
+   */
+  def train(
+      data: RDD[Vector],
+      k: Int,
+      batchSize: Int,
+      maxIterations: Int,
+      runs: Int,
+      initializationMode: String): KMeansModel = {
+    new KMeansMiniBatch().setK(k)
+      .setBatchSize(batchSize)
+      .setMaxIterations(maxIterations)
+      .setRuns(runs)
+      .setInitializationMode(initializationMode)
+      .run(data)
+  }
+
+  /**
+   * Trains a k-means model using specified parameters and the default values for unspecified.
+   */
+  def train(
+      data: RDD[Vector],
+      k: Int,
+      maxIterations: Int): KMeansModel = {
+    train(data, k, 1000, maxIterations, 1, K_MEANS_PARALLEL)
+  }
+
+  /**
+   * Trains a k-means model using specified parameters and the default values for unspecified.
+   */
+  def train(
+      data: RDD[Vector],
+      k: Int,
+      maxIterations: Int,
+      runs: Int): KMeansModel = {
+    train(data, k, 1000, maxIterations, runs, K_MEANS_PARALLEL)
+  }
+
+  /**
+   * Returns the index of the closest center to the given point, as well as the squared distance.
+   */
+  private[mllib] def findClosest(
+      centers: TraversableOnce[BreezeVectorWithNorm],
+      point: BreezeVectorWithNorm): (Int, Double) = {
+    var bestDistance = Double.PositiveInfinity
+    var bestIndex = 0
+    var i = 0
+    centers.foreach { center =>
+      // Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary
+      // distance computation.
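+      // For example, with ||center|| = 5 and ||point|| = 3 the bound below is (5 - 3)^2 = 4,
+      // so the exact squared distance only needs to be computed when 4 < bestDistance.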
+ var lowerBoundOfSqDist = center.norm - point.norm + lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist + if (lowerBoundOfSqDist < bestDistance) { + val distance: Double = fastSquaredDistance(center, point) + if (distance < bestDistance) { + bestDistance = distance + bestIndex = i + } + } + i += 1 + } + (bestIndex, bestDistance) + } + + /** + * Returns the K-means cost of a given point against the given cluster centers. + */ + private[mllib] def pointCost( + centers: TraversableOnce[BreezeVectorWithNorm], + point: BreezeVectorWithNorm): Double = + findClosest(centers, point)._2 + + /** + * Returns the squared Euclidean distance between two vectors computed by + * [[org.apache.spark.mllib.util.MLUtils#fastSquaredDistance]]. + */ + private[clustering] def fastSquaredDistance( + v1: BreezeVectorWithNorm, + v2: BreezeVectorWithNorm): Double = { + MLUtils.fastSquaredDistance(v1.vector, v1.norm, v2.vector, v2.norm) + } +} From 54fabe1c7b158c64d860151ca77a410df66a6ac7 Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Fri, 27 Jun 2014 14:36:47 -0400 Subject: [PATCH 02/14] Updated KMeansMiniBatch docs --- .../apache/spark/mllib/clustering/KMeansMiniBatch.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala index cc0061455e8c9..1657a17bef871 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala @@ -30,10 +30,10 @@ import org.apache.spark.rdd.RDD import org.apache.spark.util.random.XORShiftRandom /** - * K-means clustering with support for multiple parallel runs and a k-means++ like initialization - * mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested, - * they are executed together with joint passes over the data for efficiency. - * + * K-means clustering with support for multiple parallel runs, a k-means++ like initialization + * mode (the k-means|| algorithm by Bahmani et al), and randomly-sampled mini-batches of points + * in each iteration instead of all points for speed (Web-Scale K-Means Clustering by Sculley). + * * This is an iterative algorithm that will make multiple passes over the data, so any RDDs given * to it should be cached by the user. */ From 2afee1af31aeb4a542ff24628f4ed89d46e3a06f Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Fri, 27 Jun 2014 14:49:05 -0400 Subject: [PATCH 03/14] Added KMeansMiniBatch to docs --- docs/mllib-clustering.md | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index 429cdf8d40cec..b049b3bc3b526 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -21,8 +21,11 @@ MLlib supports the most commonly used clustering algorithms that clusters the data points into predefined number of clusters. The MLlib implementation includes a parallelized variant of the [k-means++](http://en.wikipedia.org/wiki/K-means%2B%2B) method -called [kmeans||](http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf). 
-The implementation in MLlib has the following parameters:
+called [kmeans||](http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf)
+as well as a higher-performance [mini-batch](http://dl.acm.org/citation.cfm?id=1772862)
+version that uses a randomly-sampled subset of the data points in each iteration
+instead of the full set of data points. The implementation in MLlib has the
+following parameters:
 
 * *k* is the number of desired clusters.
 * *maxIterations* is the maximum number of iterations to run.
@@ -34,6 +37,10 @@ a given dataset, the algorithm returns the best clustering result).
 * *initializationSteps* determines the number of steps in the k-means\|\| algorithm.
 * *epsilon* determines the distance threshold within which we consider k-means to have converged.
 
+The mini-batch version takes an additional parameter:
+
+* *batchSize* is the number of points to randomly sample in each iteration.
+
 ## Examples
 
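The parameters above map onto the `KMeansMiniBatch.train()` helper added in patch 01. A minimal usage sketch, assuming an existing SparkContext `sc`; the input file, its format, and the parameter values are illustrative assumptions, not part of the patch:

```scala
import org.apache.spark.mllib.clustering.KMeansMiniBatch
import org.apache.spark.mllib.linalg.Vectors

// Parse a whitespace-delimited text file of points (hypothetical path) and cache it,
// since the algorithm makes multiple passes over the data.
val data = sc.textFile("data/kmeans_data.txt")
  .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
  .cache()

// k = 4 clusters, 100-point mini-batches, at most 50 iterations, 1 run, k-means|| seeding.
val model = KMeansMiniBatch.train(data, 4, 100, 50, 1, KMeansMiniBatch.K_MEANS_PARALLEL)
println(model.computeCost(data))  // sum of squared distances to the closest centers
```
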
From 0853adbf55a7452e1804d722b133b002d5c0ff19 Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Fri, 27 Jun 2014 15:49:11 -0400 Subject: [PATCH 04/14] Added overloaded alternative for train() --- .../spark/mllib/clustering/KMeansMiniBatch.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala index 1657a17bef871..7f5a38a295e8f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala @@ -336,6 +336,18 @@ object KMeansMiniBatch { runs: Int): KMeansModel = { train(data, k, 1000, maxIterations, runs, K_MEANS_PARALLEL) } + + /** + * Trains a k-means model using specified parameters and the default values for unspecified. + */ + def train( + data: RDD[Vector], + k: Int, + maxIterations: Int, + runs: Int, + initializationMode: String): KMeansModel = { + train(data, k, 1000, maxIterations, runs, initializationMode) + } /** * Returns the index of the closest center to the given point, as well as the squared distance. From fc472ca867fbe2475cbd402f32a78c1e5cb3f060 Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Fri, 27 Jun 2014 15:49:43 -0400 Subject: [PATCH 05/14] Added KMeansMiniBatchSuite test --- .../clustering/KMeansMiniBatchSuite.scala | 196 ++++++++++++++++++ 1 file changed, 196 insertions(+) create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansMiniBatchSuite.scala diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansMiniBatchSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansMiniBatchSuite.scala new file mode 100644 index 0000000000000..42a4722b4dc52 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansMiniBatchSuite.scala @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.clustering + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.mllib.linalg.Vectors + +class KMeansMiniBatchSuite extends FunSuite with LocalSparkContext { + + import KMeans.{RANDOM, K_MEANS_PARALLEL} + + test("single cluster") { + val data = sc.parallelize(Array( + Vectors.dense(1.0, 2.0, 6.0), + Vectors.dense(1.0, 3.0, 0.0), + Vectors.dense(1.0, 4.0, 6.0) + )) + + val center = Vectors.dense(1.0, 3.0, 4.0) + + // No matter how many runs or iterations we use, we should get one cluster, + // centered at the mean of the points + + var model = KMeansMiniBatch.train(data, k=1, maxIterations=1) + assert(model.clusterCenters.head === center) + + model = KMeansMiniBatch.train(data, k=1, maxIterations=2) + assert(model.clusterCenters.head === center) + + model = KMeansMiniBatch.train(data, k=1, maxIterations=5) + assert(model.clusterCenters.head === center) + + model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=5) + assert(model.clusterCenters.head === center) + + model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=5) + assert(model.clusterCenters.head === center) + + model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM) + assert(model.clusterCenters.head === center) + + model = KMeansMiniBatch.train( + data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL) + assert(model.clusterCenters.head === center) + } + + test("single cluster with big dataset") { + val smallData = Array( + Vectors.dense(1.0, 2.0, 6.0), + Vectors.dense(1.0, 3.0, 0.0), + Vectors.dense(1.0, 4.0, 6.0) + ) + val data = sc.parallelize((1 to 100).flatMap(_ => smallData), 4) + + // No matter how many runs or iterations we use, we should get one cluster, + // centered at the mean of the points + + val center = Vectors.dense(1.0, 3.0, 4.0) + + var model = KMeansMiniBatch.train(data, k=1, maxIterations=1) + assert(model.clusterCenters.size === 1) + assert(model.clusterCenters.head === center) + + model = KMeansMiniBatch.train(data, k=1, maxIterations=2) + assert(model.clusterCenters.head === center) + + model = KMeansMiniBatch.train(data, k=1, maxIterations=5) + assert(model.clusterCenters.head === center) + + model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=5) + assert(model.clusterCenters.head === center) + + model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=5) + assert(model.clusterCenters.head === center) + + model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM) + assert(model.clusterCenters.head === center) + + model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL) + assert(model.clusterCenters.head === center) + } + + test("single cluster with sparse data") { + + val n = 10000 + val data = sc.parallelize((1 to 100).flatMap { i => + val x = i / 1000.0 + Array( + Vectors.sparse(n, Seq((0, 1.0 + x), (1, 2.0), (2, 6.0))), + Vectors.sparse(n, Seq((0, 1.0 - x), (1, 2.0), (2, 6.0))), + Vectors.sparse(n, Seq((0, 1.0), (1, 3.0 + x))), + Vectors.sparse(n, Seq((0, 1.0), (1, 3.0 - x))), + Vectors.sparse(n, Seq((0, 1.0), (1, 4.0), (2, 6.0 + x))), + Vectors.sparse(n, Seq((0, 1.0), (1, 4.0), (2, 6.0 - x))) + ) + }, 4) + + data.persist() + + // No matter how many runs or iterations we use, we should get one cluster, + // centered at the mean of the points + + val center = Vectors.sparse(n, Seq((0, 1.0), (1, 3.0), (2, 4.0))) + + var model = 
KMeansMiniBatch.train(data, k=1, maxIterations=1) + assert(model.clusterCenters.head === center) + + model = KMeansMiniBatch.train(data, k=1, maxIterations=2) + assert(model.clusterCenters.head === center) + + model = KMeansMiniBatch.train(data, k=1, maxIterations=5) + assert(model.clusterCenters.head === center) + + model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=5) + assert(model.clusterCenters.head === center) + + model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=5) + assert(model.clusterCenters.head === center) + + model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM) + assert(model.clusterCenters.head === center) + + model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL) + assert(model.clusterCenters.head === center) + + data.unpersist() + } + + test("k-means|| initialization") { + val points = Seq( + Vectors.dense(1.0, 2.0, 6.0), + Vectors.dense(1.0, 3.0, 0.0), + Vectors.dense(1.0, 4.0, 6.0), + Vectors.dense(1.0, 0.0, 1.0), + Vectors.dense(1.0, 1.0, 1.0) + ) + val rdd = sc.parallelize(points) + + // K-means|| initialization should place all clusters into distinct centers because + // it will make at least five passes, and it will give non-zero probability to each + // unselected point as long as it hasn't yet selected all of them + + var model = KMeansMiniBatch.train(rdd, k=5, maxIterations=1) + assert(Set(model.clusterCenters: _*) === Set(points: _*)) + + // Iterations of Lloyd's should not change the answer either + model = KMeansMiniBatch.train(rdd, k=5, maxIterations=10) + assert(Set(model.clusterCenters: _*) === Set(points: _*)) + + // Neither should more runs + model = KMeansMiniBatch.train(rdd, k=5, maxIterations=10, runs=5) + assert(Set(model.clusterCenters: _*) === Set(points: _*)) + } + + test("two clusters") { + val points = Seq( + Vectors.dense(0.0, 0.0), + Vectors.dense(0.0, 0.1), + Vectors.dense(0.1, 0.0), + Vectors.dense(9.0, 0.0), + Vectors.dense(9.0, 0.2), + Vectors.dense(9.2, 0.0) + ) + val rdd = sc.parallelize(points, 3) + + for (initMode <- Seq(RANDOM, K_MEANS_PARALLEL)) { + // Two iterations are sufficient no matter where the initial centers are. + val model = KMeansMiniBatch.train(rdd, k = 2, maxIterations = 2, runs = 1, initMode) + + val predicts = model.predict(rdd).collect() + + assert(predicts(0) === predicts(1)) + assert(predicts(0) === predicts(2)) + assert(predicts(3) === predicts(4)) + assert(predicts(3) === predicts(5)) + assert(predicts(0) != predicts(3)) + } + } +} From a6626fb8f7efcc3922e5c905219c39d9957670f9 Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Tue, 1 Jul 2014 13:58:57 -0400 Subject: [PATCH 06/14] Created KMeansCommons file for common code. Created trait holding functions common to KMeans and KMeansMiniBatch objects. 
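
The sharing pattern is a plain trait mix-in; schematically (a simplified,
self-contained sketch, not the real MLlib signatures):

```scala
// Both companion objects mix in one trait so helpers such as pointCost
// are defined in a single place.
trait ObjectCommons {
  // squared distance from a 1-D "point" to its closest "center"
  def pointCost(centers: Seq[Double], point: Double): Double =
    centers.map(c => (c - point) * (c - point)).min
}

object KMeansLike extends ObjectCommons
object KMeansMiniBatchLike extends ObjectCommons

// Both objects now expose the identical helper:
// KMeansLike.pointCost(Seq(0.0, 2.0), 1.5) == 0.25
```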
--- .../spark/mllib/clustering/KMeans.scala | 64 +---------------- .../mllib/clustering/KMeansCommons.scala | 72 +++++++++++++++++++ .../mllib/clustering/KMeansMiniBatch.scala | 46 +----------- 3 files changed, 74 insertions(+), 108 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansCommons.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index de22fbb6ffc10..60317b3bcdaa2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -305,7 +305,7 @@ class KMeans private ( /** * Top-level methods for calling K-means clustering. */ -object KMeans { +object KMeans extends KMeansObjectCommons { // Initialization mode names val RANDOM = "random" @@ -353,66 +353,4 @@ object KMeans { runs: Int): KMeansModel = { train(data, k, maxIterations, runs, K_MEANS_PARALLEL) } - - /** - * Returns the index of the closest center to the given point, as well as the squared distance. - */ - private[mllib] def findClosest( - centers: TraversableOnce[BreezeVectorWithNorm], - point: BreezeVectorWithNorm): (Int, Double) = { - var bestDistance = Double.PositiveInfinity - var bestIndex = 0 - var i = 0 - centers.foreach { center => - // Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary - // distance computation. - var lowerBoundOfSqDist = center.norm - point.norm - lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist - if (lowerBoundOfSqDist < bestDistance) { - val distance: Double = fastSquaredDistance(center, point) - if (distance < bestDistance) { - bestDistance = distance - bestIndex = i - } - } - i += 1 - } - (bestIndex, bestDistance) - } - - /** - * Returns the K-means cost of a given point against the given cluster centers. - */ - private[mllib] def pointCost( - centers: TraversableOnce[BreezeVectorWithNorm], - point: BreezeVectorWithNorm): Double = - findClosest(centers, point)._2 - - /** - * Returns the squared Euclidean distance between two vectors computed by - * [[org.apache.spark.mllib.util.MLUtils#fastSquaredDistance]]. - */ - private[clustering] def fastSquaredDistance( - v1: BreezeVectorWithNorm, - v2: BreezeVectorWithNorm): Double = { - MLUtils.fastSquaredDistance(v1.vector, v1.norm, v2.vector, v2.norm) - } -} - -/** - * A breeze vector with its norm for fast distance computation. - * - * @see [[org.apache.spark.mllib.clustering.KMeans#fastSquaredDistance]] - */ -private[clustering] -class BreezeVectorWithNorm(val vector: BV[Double], val norm: Double) extends Serializable { - - def this(vector: BV[Double]) = this(vector, breezeNorm(vector, 2.0)) - - def this(array: Array[Double]) = this(new BDV[Double](array)) - - def this(v: Vector) = this(v.toBreeze) - - /** Converts the vector to a dense vector. 
*/ - def toDense = new BreezeVectorWithNorm(vector.toDenseVector, norm) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansCommons.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansCommons.scala new file mode 100644 index 0000000000000..966f42315216c --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansCommons.scala @@ -0,0 +1,72 @@ +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm} + +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.util.MLUtils + +trait KMeansObjectCommons { + + /** + * Returns the index of the closest center to the given point, as well as the squared distance. + */ + private[mllib] def findClosest( + centers: TraversableOnce[BreezeVectorWithNorm], + point: BreezeVectorWithNorm): (Int, Double) = { + var bestDistance = Double.PositiveInfinity + var bestIndex = 0 + var i = 0 + centers.foreach { center => + // Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary + // distance computation. + var lowerBoundOfSqDist = center.norm - point.norm + lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist + if (lowerBoundOfSqDist < bestDistance) { + val distance: Double = fastSquaredDistance(center, point) + if (distance < bestDistance) { + bestDistance = distance + bestIndex = i + } + } + i += 1 + } + (bestIndex, bestDistance) + } + + /** + * Returns the K-means cost of a given point against the given cluster centers. + */ + private[mllib] def pointCost( + centers: TraversableOnce[BreezeVectorWithNorm], + point: BreezeVectorWithNorm): Double = + findClosest(centers, point)._2 + + /** + * Returns the squared Euclidean distance between two vectors computed by + * [[org.apache.spark.mllib.util.MLUtils#fastSquaredDistance]]. + */ + private[clustering] def fastSquaredDistance( + v1: BreezeVectorWithNorm, + v2: BreezeVectorWithNorm): Double = { + MLUtils.fastSquaredDistance(v1.vector, v1.norm, v2.vector, v2.norm) + } + +} + +/** + * A breeze vector with its norm for fast distance computation. + * + * @see [[org.apache.spark.mllib.clustering.KMeans#fastSquaredDistance]] + */ +private[clustering] +class BreezeVectorWithNorm(val vector: BV[Double], val norm: Double) extends Serializable { + + def this(vector: BV[Double]) = this(vector, breezeNorm(vector, 2.0)) + + def this(array: Array[Double]) = this(new BDV[Double](array)) + + def this(v: Vector) = this(v.toBreeze) + + /** Converts the vector to a dense vector. */ + def toDense = new BreezeVectorWithNorm(vector.toDenseVector, norm) +} \ No newline at end of file diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala index 7f5a38a295e8f..1f466dcf33a8d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala @@ -285,7 +285,7 @@ class KMeansMiniBatch private ( /** * Top-level methods for calling K-means clustering. */ -object KMeansMiniBatch { +object KMeansMiniBatch extends KMeansObjectCommons { // Initialization mode names val RANDOM = "random" @@ -348,48 +348,4 @@ object KMeansMiniBatch { initializationMode: String): KMeansModel = { train(data, k, 1000, maxIterations, runs, initializationMode) } - - /** - * Returns the index of the closest center to the given point, as well as the squared distance. 
- */ - private[mllib] def findClosest( - centers: TraversableOnce[BreezeVectorWithNorm], - point: BreezeVectorWithNorm): (Int, Double) = { - var bestDistance = Double.PositiveInfinity - var bestIndex = 0 - var i = 0 - centers.foreach { center => - // Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary - // distance computation. - var lowerBoundOfSqDist = center.norm - point.norm - lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist - if (lowerBoundOfSqDist < bestDistance) { - val distance: Double = fastSquaredDistance(center, point) - if (distance < bestDistance) { - bestDistance = distance - bestIndex = i - } - } - i += 1 - } - (bestIndex, bestDistance) - } - - /** - * Returns the K-means cost of a given point against the given cluster centers. - */ - private[mllib] def pointCost( - centers: TraversableOnce[BreezeVectorWithNorm], - point: BreezeVectorWithNorm): Double = - findClosest(centers, point)._2 - - /** - * Returns the squared Euclidean distance between two vectors computed by - * [[org.apache.spark.mllib.util.MLUtils#fastSquaredDistance]]. - */ - private[clustering] def fastSquaredDistance( - v1: BreezeVectorWithNorm, - v2: BreezeVectorWithNorm): Double = { - MLUtils.fastSquaredDistance(v1.vector, v1.norm, v2.vector, v2.norm) - } } From 316e0053a2b70bd8e847632c2edd0eea1a825a29 Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Tue, 1 Jul 2014 17:19:35 -0400 Subject: [PATCH 07/14] Created KMeansCommons.KMeansCommons trait for common functions in private KMeans classes. Moved KMeansMiniBatch.{initRandom, initKMeansMiniBatchParallel} there --- .../mllib/clustering/KMeansCommons.scala | 72 +++++++++++++++++++ .../mllib/clustering/KMeansMiniBatch.scala | 69 +----------------- 2 files changed, 75 insertions(+), 66 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansCommons.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansCommons.scala index 966f42315216c..5d57b69c50d60 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansCommons.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansCommons.scala @@ -1,9 +1,81 @@ package org.apache.spark.mllib.clustering +import scala.collection.mutable.ArrayBuffer + import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm} +import org.apache.spark.SparkContext._ import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.rdd.RDD +import org.apache.spark.util.random.XORShiftRandom + +trait KMeansCommons { + /** + * Initialize `runs` sets of cluster centers at random. + */ + protected def initRandom(data: RDD[BreezeVectorWithNorm], k: Int) + : Array[BreezeVectorWithNorm] = { + // Sample all the cluster centers in one pass to avoid repeated scans + val sample = data.takeSample(true, k, new XORShiftRandom().nextInt()).toSeq + sample.map { v => + new BreezeVectorWithNorm(v.vector.toDenseVector, v.norm) + }.toArray + } + + /** + * Initialize `runs` sets of cluster centers using the k-means|| algorithm by Bahmani et al. + * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries + * to find with dissimilar cluster centers by starting with a random center and then doing + * passes where more centers are chosen with probability proportional to their squared distance + * to the current cluster set. It results in a provable approximation to an optimal clustering. 
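+   * As an illustration of one sampling pass: with k = 3 and a summed cost of 90.0 over all
+   * points, a point at squared distance 9.0 from the current centers is kept with
+   * probability 2.0 * 9.0 * 3 / 90.0 = 0.6.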
+ * + * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf. + */ + protected def initKMeansMiniBatchParallel(data: RDD[BreezeVectorWithNorm], + k: Int, + initializationSteps: Int) + : Array[BreezeVectorWithNorm] = { + // Initialize each run's center to a random point + val seed = new XORShiftRandom().nextInt() + val sample = data.takeSample(true, 1, seed).toSeq + val centers = ArrayBuffer() ++ sample + + // On each step, sample 2 * k points on average for each run with probability proportional + // to their squared distance from that run's current centers + var step = 0 + while (step < initializationSteps) { + val sumCosts = data.map { point => + KMeansMiniBatch.pointCost(centers, point) + }.reduce(_ + _) + val chosen = data.mapPartitionsWithIndex { (index, points) => + val rand = new XORShiftRandom(seed ^ (step << 16) ^ index) + + // accept / reject each point + val sampledCenters = points.filter { p => + rand.nextDouble() < 2.0 * KMeansMiniBatch.pointCost(centers, p) * k / sumCosts + } + + sampledCenters + }.collect() + + + centers ++= chosen + step += 1 + } + + // Finally, we might have a set of more than k candidate centers for each run; weigh each + // candidate by the number of points in the dataset mapping to it and run a local k-means++ + // on the weighted centers to pick just k of them + val weightMap = data.map { p => + (KMeansMiniBatch.findClosest(centers, p)._1, 1.0) + }.reduceByKey(_ + _).collectAsMap() + val weights = (0 until centers.length).map(i => weightMap.getOrElse(i, 0.0)).toArray + val finalCenters = LocalKMeans.kMeansPlusPlus(seed, centers.toArray, weights, k, 30) + + finalCenters.toArray + } +} trait KMeansObjectCommons { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala index 1f466dcf33a8d..c1b40d6fe8da9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala @@ -44,7 +44,7 @@ class KMeansMiniBatch private ( private var runs: Int, private var initializationMode: String, private var initializationSteps: Int, - private var epsilon: Double) extends Serializable with Logging { + private var epsilon: Double) extends Serializable with KMeansCommons with Logging { /** * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1, @@ -150,9 +150,9 @@ class KMeansMiniBatch private ( val initStartTime = System.nanoTime() val centers = if (initializationMode == KMeansMiniBatch.RANDOM) { - initRandom(data) + initRandom(data, k) } else { - initKMeansMiniBatchParallel(data) + initKMeansMiniBatchParallel(data, k, initializationSteps) } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 @@ -216,69 +216,6 @@ class KMeansMiniBatch private ( new Tuple2(new KMeansModel(centers.map(c => Vectors.fromBreeze(c.vector))), costs) } - - /** - * Initialize `runs` sets of cluster centers at random. - */ - private def initRandom(data: RDD[BreezeVectorWithNorm]) - : Array[BreezeVectorWithNorm] = { - // Sample all the cluster centers in one pass to avoid repeated scans - val sample = data.takeSample(true, k, new XORShiftRandom().nextInt()).toSeq - sample.map { v => - new BreezeVectorWithNorm(v.vector.toDenseVector, v.norm) - }.toArray - } - - /** - * Initialize `runs` sets of cluster centers using the k-means|| algorithm by Bahmani et al. 
- * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries - * to find with dissimilar cluster centers by starting with a random center and then doing - * passes where more centers are chosen with probability proportional to their squared distance - * to the current cluster set. It results in a provable approximation to an optimal clustering. - * - * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf. - */ - private def initKMeansMiniBatchParallel(data: RDD[BreezeVectorWithNorm]) - : Array[BreezeVectorWithNorm] = { - // Initialize each run's center to a random point - val seed = new XORShiftRandom().nextInt() - val sample = data.takeSample(true, 1, seed).toSeq - val centers = ArrayBuffer() ++ sample - - // On each step, sample 2 * k points on average for each run with probability proportional - // to their squared distance from that run's current centers - var step = 0 - while (step < initializationSteps) { - val sumCosts = data.map { point => - KMeansMiniBatch.pointCost(centers, point) - }.reduce(_ + _) - val chosen = data.mapPartitionsWithIndex { (index, points) => - val rand = new XORShiftRandom(seed ^ (step << 16) ^ index) - - // accept / reject each point - val sampledCenters = points.filter { p => - rand.nextDouble() < 2.0 * KMeansMiniBatch.pointCost(centers, p) * k / sumCosts - } - - sampledCenters - }.collect() - - - centers ++= chosen - step += 1 - } - - // Finally, we might have a set of more than k candidate centers for each run; weigh each - // candidate by the number of points in the dataset mapping to it and run a local k-means++ - // on the weighted centers to pick just k of them - val weightMap = data.map { p => - (KMeansMiniBatch.findClosest(centers, p)._1, 1.0) - }.reduceByKey(_ + _).collectAsMap() - val weights = (0 until centers.length).map(i => weightMap.getOrElse(i, 0.0)).toArray - val finalCenters = LocalKMeans.kMeansPlusPlus(seed, centers.toArray, weights, k, 30) - - finalCenters.toArray - } } From b37c733b3ee382194fff10492970b54eeb55e786 Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Tue, 1 Jul 2014 17:36:55 -0400 Subject: [PATCH 08/14] Change KMeansCommons.initKMeansMiniBatchParallel to KMeansCommons.initParallel --- .../scala/org/apache/spark/mllib/clustering/KMeansCommons.scala | 2 +- .../org/apache/spark/mllib/clustering/KMeansMiniBatch.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansCommons.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansCommons.scala index 5d57b69c50d60..16a0cc92f0a39 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansCommons.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansCommons.scala @@ -32,7 +32,7 @@ trait KMeansCommons { * * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf. 
*/ - protected def initKMeansMiniBatchParallel(data: RDD[BreezeVectorWithNorm], + protected def initParallel(data: RDD[BreezeVectorWithNorm], k: Int, initializationSteps: Int) : Array[BreezeVectorWithNorm] = { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala index c1b40d6fe8da9..3c85a080ac9d1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala @@ -152,7 +152,7 @@ class KMeansMiniBatch private ( val centers = if (initializationMode == KMeansMiniBatch.RANDOM) { initRandom(data, k) } else { - initKMeansMiniBatchParallel(data, k, initializationSteps) + initParallel(data, k, initializationSteps) } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 From 71e56852e7f687408eb24eee4c20b5aeaa67763c Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Tue, 1 Jul 2014 18:06:52 -0400 Subject: [PATCH 09/14] Modify KMeans to use initRandom, initParallel in KMeansCommons --- .../spark/mllib/clustering/KMeans.scala | 76 ++----------------- 1 file changed, 5 insertions(+), 71 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 60317b3bcdaa2..8e884908106db 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -43,7 +43,7 @@ class KMeans private ( private var runs: Int, private var initializationMode: String, private var initializationSteps: Int, - private var epsilon: Double) extends Serializable with Logging { + private var epsilon: Double) extends Serializable with KMeansCommons with Logging { /** * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1, @@ -138,9 +138,11 @@ class KMeans private ( val initStartTime = System.nanoTime() val centers = if (initializationMode == KMeans.RANDOM) { - initRandom(data) + val flatCenters = initRandom(data, k * runs) + + Array.tabulate(runs)(r => flatCenters.slice(r * k, (r + 1) * k).toArray) } else { - initKMeansParallel(data) + Array.tabulate(runs)(r => initParallel(data, k, initializationSteps)) } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 @@ -231,74 +233,6 @@ class KMeans private ( new KMeansModel(centers(bestRun).map(c => Vectors.fromBreeze(c.vector))) } - - /** - * Initialize `runs` sets of cluster centers at random. - */ - private def initRandom(data: RDD[BreezeVectorWithNorm]) - : Array[Array[BreezeVectorWithNorm]] = { - // Sample all the cluster centers in one pass to avoid repeated scans - val sample = data.takeSample(true, runs * k, new XORShiftRandom().nextInt()).toSeq - Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).map { v => - new BreezeVectorWithNorm(v.vector.toDenseVector, v.norm) - }.toArray) - } - - /** - * Initialize `runs` sets of cluster centers using the k-means|| algorithm by Bahmani et al. - * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries - * to find with dissimilar cluster centers by starting with a random center and then doing - * passes where more centers are chosen with probability proportional to their squared distance - * to the current cluster set. It results in a provable approximation to an optimal clustering. 
- * - * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf. - */ - private def initKMeansParallel(data: RDD[BreezeVectorWithNorm]) - : Array[Array[BreezeVectorWithNorm]] = { - // Initialize each run's center to a random point - val seed = new XORShiftRandom().nextInt() - val sample = data.takeSample(true, runs, seed).toSeq - val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r).toDense)) - - // On each step, sample 2 * k points on average for each run with probability proportional - // to their squared distance from that run's current centers - var step = 0 - while (step < initializationSteps) { - val sumCosts = data.flatMap { point => - (0 until runs).map { r => - (r, KMeans.pointCost(centers(r), point)) - } - }.reduceByKey(_ + _).collectAsMap() - val chosen = data.mapPartitionsWithIndex { (index, points) => - val rand = new XORShiftRandom(seed ^ (step << 16) ^ index) - points.flatMap { p => - (0 until runs).filter { r => - rand.nextDouble() < 2.0 * KMeans.pointCost(centers(r), p) * k / sumCosts(r) - }.map((_, p)) - } - }.collect() - chosen.foreach { case (r, p) => - centers(r) += p.toDense - } - step += 1 - } - - // Finally, we might have a set of more than k candidate centers for each run; weigh each - // candidate by the number of points in the dataset mapping to it and run a local k-means++ - // on the weighted centers to pick just k of them - val weightMap = data.flatMap { p => - (0 until runs).map { r => - ((r, KMeans.findClosest(centers(r), p)._1), 1.0) - } - }.reduceByKey(_ + _).collectAsMap() - val finalCenters = (0 until runs).map { r => - val myCenters = centers(r).toArray - val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray - LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30) - } - - finalCenters.toArray - } } From 5eb9b2919aa1f7c12ef4c60e0ae2028a09dc51ec Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Tue, 1 Jul 2014 18:09:34 -0400 Subject: [PATCH 10/14] Updated documentation for initRun/initParallel to specify that centers are only for one run. --- .../org/apache/spark/mllib/clustering/KMeansCommons.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansCommons.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansCommons.scala index 16a0cc92f0a39..aedabddbdb76b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansCommons.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansCommons.scala @@ -12,7 +12,7 @@ import org.apache.spark.util.random.XORShiftRandom trait KMeansCommons { /** - * Initialize `runs` sets of cluster centers at random. + * Initialize cluster centers for one run at random. */ protected def initRandom(data: RDD[BreezeVectorWithNorm], k: Int) : Array[BreezeVectorWithNorm] = { @@ -24,7 +24,7 @@ trait KMeansCommons { } /** - * Initialize `runs` sets of cluster centers using the k-means|| algorithm by Bahmani et al. + * Initialize cluster centers for one run using the k-means|| algorithm by Bahmani et al. * (Bahmani et al., Scalable K-Means++, VLDB 2012). 
This is a variant of k-means++ that tries
   * to find with dissimilar cluster centers by starting with a random center and then doing
   * passes where more centers are chosen with probability proportional to their squared distance

From b121178fc7ea7b3e829dc3a77b7005dbc6f2dd80 Mon Sep 17 00:00:00 2001
From: RJ Nowling
Date: Tue, 1 Jul 2014 18:11:50 -0400
Subject: [PATCH 11/14] Clean up calls to initRandom in KMeans

---
 .../main/scala/org/apache/spark/mllib/clustering/KMeans.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 8e884908106db..22b249766e1ca 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -138,9 +138,7 @@ class KMeans private (
     val initStartTime = System.nanoTime()
 
     val centers = if (initializationMode == KMeans.RANDOM) {
-      val flatCenters = initRandom(data, k * runs)
-
-      Array.tabulate(runs)(r => flatCenters.slice(r * k, (r + 1) * k).toArray)
+      Array.tabulate(runs)(r => initRandom(data, k))
     } else {
       Array.tabulate(runs)(r => initParallel(data, k, initializationSteps))
     }

From 294640b082aa1159ad5bdda808c315c06cf1c152 Mon Sep 17 00:00:00 2001
From: RJ Nowling
Date: Wed, 2 Jul 2014 14:39:18 -0400
Subject: [PATCH 12/14] Updated KMeansMiniBatch to properly implement sampling
 and gradient step update

---
 .../mllib/clustering/KMeansMiniBatch.scala    | 75 +++++++------------
 1 file changed, 25 insertions(+), 50 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala
index 3c85a080ac9d1..2f1a2e73527e4 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala
@@ -43,14 +43,13 @@ class KMeansMiniBatch private (
     private var batchSize: Int,
     private var runs: Int,
     private var initializationMode: String,
-    private var initializationSteps: Int,
-    private var epsilon: Double) extends Serializable with KMeansCommons with Logging {
-
+    private var initializationSteps: Int) extends Serializable with KMeansCommons with Logging {
+
   /**
    * Constructs a KMeansMiniBatch instance with default parameters: {k: 2, maxIterations: 20, runs: 1,
-   * batchSize: 1000, initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4}.
+   * batchSize: 1000, initializationMode: "k-means||", initializationSteps: 5}.
    */
-  def this() = this(2, 20, 1000, 1, KMeansMiniBatch.K_MEANS_PARALLEL, 5, 1e-4)
+  def this() = this(2, 20, 1000, 1, KMeansMiniBatch.K_MEANS_PARALLEL, 5)
 
   /** Set the mini-batch size: the number of points sampled in each iteration. Default: 1000. */
   def setBatchSize(batchSize: Int): KMeansMiniBatch = {
     this.batchSize = batchSize
@@ -109,15 +108,6 @@ class KMeansMiniBatch private (
     this
   }
 
-  /**
-   * Set the distance threshold within which we consider centers to have converged.
-   * If all centers move less than this Euclidean distance, we stop iterating one run.
-   */
-  def setEpsilon(epsilon: Double): KMeansMiniBatch = {
-    this.epsilon = epsilon
-    this
-  }
-
   /**
    * Train a K-means model on the given set of points; `data` should be cached for high
    * performance, because this is an iterative algorithm.
@@ -154,7 +144,9 @@ class KMeansMiniBatch private ( } else { initParallel(data, k, initializationSteps) } - + + val centerCounts = Array.fill(centers.length){0} + val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + " seconds.") @@ -165,47 +157,30 @@ class KMeansMiniBatch private ( // Execute iterations of Lloyd's algorithm until all runs have converged while (iteration < maxIterations) { - type WeightedPoint = (BV[Double], Long) - def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = { - (p1._1 += p2._1, p1._2 + p2._2) - } - - val costAccums = sc.accumulator(0.0) - - // Find the sum and count of points mapping to each center - val totalContribs = data.mapPartitions { points => - val k = centers.length - val dims = centers(0).vector.length - - val sums = Array.fill(k)(BDV.zeros[Double](dims).asInstanceOf[BV[Double]]) - val counts = Array.fill(k)(0L) - points.foreach { point => - val (bestCenter, cost) = KMeansMiniBatch.findClosest(centers, point) - costAccums += cost - sums(bestCenter) += point.vector - counts(bestCenter) += 1 - } + val sampledPoints = data.sample(false, batchSize) + + val groupedPoints = sampledPoints.map { p => + val (center, cost) = KMeansMiniBatch.findClosest(centers, p) - val contribs = for (j <- 0 until k) yield { - (j, (sums(j), counts(j))) - } - contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() + (center, p.vector, cost) + }.collect() + // Update the cluster centers and costs - var j = 0 - while (j < k) { - val (sum, count) = totalContribs(j) - if (count != 0) { - sum /= count.toDouble - val newCenter = new BreezeVectorWithNorm(sum) - centers(j) = newCenter - } - j += 1 + costs = 0.0 + for ((centerIdx, vec, dist) <- groupedPoints) { + costs += dist + centerCounts(centerIdx) += 1 + + // take gradient step + val learningRate = 1.0 / centerCounts(centerIdx).toDouble + val center = centers(centerIdx).vector + val updatedCenter = center * (1.0 - learningRate) + vec * learningRate + + centers(centerIdx) = new BreezeVectorWithNorm(updatedCenter) } - costs = costAccums.value iteration += 1 } From da1c2cf3db223fcde752ba9cf2e818be20617558 Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Wed, 2 Jul 2014 14:40:28 -0400 Subject: [PATCH 13/14] Modify KMeansMiniBatchSuite to test KMeansMiniBatch differences. Given stochastic nature, use epsilons instead of direct comparison of floats. 
--- .../clustering/KMeansMiniBatchSuite.scala | 139 ++++-------------- 1 file changed, 28 insertions(+), 111 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansMiniBatchSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansMiniBatchSuite.scala index 42a4722b4dc52..de927b577ad5e 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansMiniBatchSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansMiniBatchSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.mllib.clustering +import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm} + import org.scalatest.FunSuite import org.apache.spark.mllib.util.LocalSparkContext @@ -25,41 +27,8 @@ import org.apache.spark.mllib.linalg.Vectors class KMeansMiniBatchSuite extends FunSuite with LocalSparkContext { import KMeans.{RANDOM, K_MEANS_PARALLEL} - - test("single cluster") { - val data = sc.parallelize(Array( - Vectors.dense(1.0, 2.0, 6.0), - Vectors.dense(1.0, 3.0, 0.0), - Vectors.dense(1.0, 4.0, 6.0) - )) - - val center = Vectors.dense(1.0, 3.0, 4.0) - - // No matter how many runs or iterations we use, we should get one cluster, - // centered at the mean of the points - - var model = KMeansMiniBatch.train(data, k=1, maxIterations=1) - assert(model.clusterCenters.head === center) - - model = KMeansMiniBatch.train(data, k=1, maxIterations=2) - assert(model.clusterCenters.head === center) - - model = KMeansMiniBatch.train(data, k=1, maxIterations=5) - assert(model.clusterCenters.head === center) - - model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=5) - assert(model.clusterCenters.head === center) - - model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=5) - assert(model.clusterCenters.head === center) - - model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM) - assert(model.clusterCenters.head === center) - - model = KMeansMiniBatch.train( - data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL) - assert(model.clusterCenters.head === center) - } + + val epsilon = 1e-04 test("single cluster with big dataset") { val smallData = Array( @@ -67,34 +36,20 @@ class KMeansMiniBatchSuite extends FunSuite with LocalSparkContext { Vectors.dense(1.0, 3.0, 0.0), Vectors.dense(1.0, 4.0, 6.0) ) - val data = sc.parallelize((1 to 100).flatMap(_ => smallData), 4) - - // No matter how many runs or iterations we use, we should get one cluster, - // centered at the mean of the points - - val center = Vectors.dense(1.0, 3.0, 4.0) - - var model = KMeansMiniBatch.train(data, k=1, maxIterations=1) - assert(model.clusterCenters.size === 1) - assert(model.clusterCenters.head === center) - - model = KMeansMiniBatch.train(data, k=1, maxIterations=2) - assert(model.clusterCenters.head === center) - - model = KMeansMiniBatch.train(data, k=1, maxIterations=5) - assert(model.clusterCenters.head === center) - - model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=5) - assert(model.clusterCenters.head === center) + val data = sc.parallelize((1 to 10).flatMap(_ => smallData), 4) + + data.persist() - model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=5) - assert(model.clusterCenters.head === center) + // result should converge to given center after a few iterations + val center = Vectors.dense(1.0, 3.0, 4.0).toBreeze - model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM) - assert(model.clusterCenters.head === center) + var model = 
+    var model = KMeansMiniBatch.train(data, k=1, batchSize=10, maxIterations=10, runs=1, initializationMode=RANDOM)
+    var error = breezeNorm(model.clusterCenters.head.toBreeze - center, 2.0)
+    assert(error < epsilon)
 
-    model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL)
-    assert(model.clusterCenters.head === center)
+    model = KMeansMiniBatch.train(data, k=1, batchSize=10, maxIterations=10, runs=1, initializationMode=K_MEANS_PARALLEL)
+    error = breezeNorm(model.clusterCenters.head.toBreeze - center, 2.0)
+    assert(error < epsilon)
   }
 
   test("single cluster with sparse data") {
@@ -113,62 +68,22 @@
     }, 4)
 
     data.persist()
+
+    val center = Vectors.sparse(n, Seq((0, 1.0), (1, 3.0), (2, 4.0))).toBreeze
 
-    // No matter how many runs or iterations we use, we should get one cluster,
-    // centered at the mean of the points
-
-    val center = Vectors.sparse(n, Seq((0, 1.0), (1, 3.0), (2, 4.0)))
-
-    var model = KMeansMiniBatch.train(data, k=1, maxIterations=1)
-    assert(model.clusterCenters.head === center)
+    var model = KMeansMiniBatch.train(data, k=1, batchSize=10, maxIterations=10, runs=1, initializationMode=RANDOM)
+    var error = breezeNorm(model.clusterCenters.head.toBreeze - center, 2.0)
+    assert(error < epsilon)
 
-    model = KMeansMiniBatch.train(data, k=1, maxIterations=2)
-    assert(model.clusterCenters.head === center)
-
-    model = KMeansMiniBatch.train(data, k=1, maxIterations=5)
-    assert(model.clusterCenters.head === center)
+    model = KMeansMiniBatch.train(data, k=1, batchSize=10, maxIterations=10, runs=1, initializationMode=K_MEANS_PARALLEL)
+    error = breezeNorm(model.clusterCenters.head.toBreeze - center, 2.0)
+    assert(error < epsilon)
 
-    model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=5)
-    assert(model.clusterCenters.head === center)
-
-    model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=5)
-    assert(model.clusterCenters.head === center)
-
-    model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM)
-    assert(model.clusterCenters.head === center)
-
-    model = KMeansMiniBatch.train(data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL)
-    assert(model.clusterCenters.head === center)
 
     data.unpersist()
   }
 
-  test("k-means|| initialization") {
-    val points = Seq(
-      Vectors.dense(1.0, 2.0, 6.0),
-      Vectors.dense(1.0, 3.0, 0.0),
-      Vectors.dense(1.0, 4.0, 6.0),
-      Vectors.dense(1.0, 0.0, 1.0),
-      Vectors.dense(1.0, 1.0, 1.0)
-    )
-    val rdd = sc.parallelize(points)
-
-    // K-means|| initialization should place all clusters into distinct centers because
-    // it will make at least five passes, and it will give non-zero probability to each
-    // unselected point as long as it hasn't yet selected all of them
-
-    var model = KMeansMiniBatch.train(rdd, k=5, maxIterations=1)
-    assert(Set(model.clusterCenters: _*) === Set(points: _*))
-
-    // Iterations of Lloyd's should not change the answer either
-    model = KMeansMiniBatch.train(rdd, k=5, maxIterations=10)
-    assert(Set(model.clusterCenters: _*) === Set(points: _*))
-
-    // Neither should more runs
-    model = KMeansMiniBatch.train(rdd, k=5, maxIterations=10, runs=5)
-    assert(Set(model.clusterCenters: _*) === Set(points: _*))
-  }
-
   test("two clusters") {
     val points = Seq(
       Vectors.dense(0.0, 0.0),
@@ -178,11 +93,13 @@
       Vectors.dense(9.0, 0.2),
       Vectors.dense(9.2, 0.0)
     )
-    val rdd = sc.parallelize(points, 3)
+
+    val rdd = sc.parallelize((1 to 10000).flatMap(_ => points), 3)
+
+    rdd.persist()
 
     for (initMode <- Seq(RANDOM, K_MEANS_PARALLEL)) {
-      // Two iterations are sufficient no matter where the initial centers are.
-      val model = KMeansMiniBatch.train(rdd, k = 2, maxIterations = 2, runs = 1, initMode)
+      val model = KMeansMiniBatch.train(rdd, k = 2, batchSize=10, maxIterations = 10, runs = 1, initMode)
 
       val predicts = model.predict(rdd).collect()

From cfff6ff1bda4715694c34534e93c24c3f542ea84 Mon Sep 17 00:00:00 2001
From: RJ Nowling
Date: Wed, 9 Jul 2014 09:28:23 -0400
Subject: [PATCH 14/14] Change KMeansMiniBatch to use takeSample() instead of
 sample(): sample() takes a fraction of the RDD rather than a count, so it
 cannot draw a fixed-size mini-batch, while takeSample() returns exactly
 batchSize points. Update tests accordingly.

---
 .../mllib/clustering/KMeansMiniBatch.scala    | 10 ++++----
 .../clustering/KMeansMiniBatchSuite.scala     | 24 +++++++++++--------
 2 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala
index 2f1a2e73527e4..dab509300f526 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansMiniBatch.scala
@@ -43,13 +43,14 @@ class KMeansMiniBatch private (
     private var batchSize: Int,
     private var runs: Int,
     private var initializationMode: String,
-    private var initializationSteps: Int) extends Serializable with KMeansCommons with Logging {
+    private var initializationSteps: Int,
+    private var rng: XORShiftRandom) extends Serializable with KMeansCommons with Logging {
 
   /**
    * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1,
    * batchSize: 1000, initializationMode: "k-means||", initializationSteps: 5}.
    */
-  def this() = this(2, 20, 1, 1000, KMeansMiniBatch.K_MEANS_PARALLEL, 5)
+  def this() = this(2, 20, 1, 1000, KMeansMiniBatch.K_MEANS_PARALLEL, 5, new XORShiftRandom())
 
   def setBatchSize(batchSize: Int): KMeansMiniBatch = {
     this.batchSize = batchSize
@@ -158,14 +159,13 @@ class KMeansMiniBatch private (
 
     // Execute iterations of Lloyd's algorithm until all runs have converged
     while (iteration < maxIterations) {
-      val sampledPoints = data.sample(false, batchSize)
+      val sampledPoints = data.takeSample(false, batchSize, rng.nextInt())
 
       val groupedPoints = sampledPoints.map { p =>
         val (center, cost) = KMeansMiniBatch.findClosest(centers, p)
 
         (center, p.vector, cost)
-      }.collect()
-
+      }
       // Update the cluster centers and costs
       costs = 0.0
 
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansMiniBatchSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansMiniBatchSuite.scala
index de927b577ad5e..040adb0172bd9 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansMiniBatchSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansMiniBatchSuite.scala
@@ -28,7 +28,7 @@ class KMeansMiniBatchSuite extends FunSuite with LocalSparkContext {
 
   import KMeans.{RANDOM, K_MEANS_PARALLEL}
 
-  val epsilon = 1e-04
+  val epsilon = 1e-01
 
   test("single cluster with big dataset") {
     val smallData = Array(
@@ -36,25 +36,29 @@ class KMeansMiniBatchSuite extends FunSuite with LocalSparkContext {
       Vectors.dense(1.0, 3.0, 0.0),
       Vectors.dense(1.0, 4.0, 6.0)
     )
-    val data = sc.parallelize((1 to 10).flatMap(_ => smallData), 4)
+
+    // produce 300 data points
+    val data = sc.parallelize((1 to 100).flatMap(_ => smallData), 4)
 
     data.persist()
 
     // result should converge to given center after a few iterations
     val center = Vectors.dense(1.0, 3.0, 4.0).toBreeze
 
-    var model = KMeansMiniBatch.train(data, k=1, batchSize=10, maxIterations=10, runs=1, initializationMode=RANDOM)
+    var model = KMeansMiniBatch.train(data, k=1, batchSize=50, maxIterations=50, runs=1, initializationMode=RANDOM)
     var error = breezeNorm(model.clusterCenters.head.toBreeze - center, 2.0)
-    assert(error < epsilon)
+    assert(error < epsilon, "Error (" + error + ") was larger than expected value of " + epsilon)
 
-    model = KMeansMiniBatch.train(data, k=1, batchSize=10, maxIterations=10, runs=1, initializationMode=K_MEANS_PARALLEL)
+    model = KMeansMiniBatch.train(data, k=1, batchSize=50, maxIterations=50, runs=1, initializationMode=K_MEANS_PARALLEL)
     error = breezeNorm(model.clusterCenters.head.toBreeze - center, 2.0)
-    assert(error < epsilon)
+    assert(error < epsilon, "Error (" + error + ") was larger than expected value of " + epsilon)
+
   }
 
   test("single cluster with sparse data") {
     val n = 10000
+    // 600 data points
     val data = sc.parallelize((1 to 100).flatMap { i =>
       val x = i / 1000.0
       Array(
@@ -71,14 +75,14 @@ class KMeansMiniBatchSuite extends FunSuite with LocalSparkContext {
 
     val center = Vectors.sparse(n, Seq((0, 1.0), (1, 3.0), (2, 4.0))).toBreeze
 
-    var model = KMeansMiniBatch.train(data, k=1, batchSize=10, maxIterations=10, runs=1, initializationMode=RANDOM)
+    var model = KMeansMiniBatch.train(data, k=1, batchSize=100, maxIterations=50, runs=1, initializationMode=RANDOM)
     var error = breezeNorm(model.clusterCenters.head.toBreeze - center, 2.0)
-    assert(error < epsilon)
+    assert(error < epsilon, "Error (" + error + ") was larger than expected value of " + epsilon)
 
-    model = KMeansMiniBatch.train(data, k=1, batchSize=10, maxIterations=10, runs=1, initializationMode=K_MEANS_PARALLEL)
+    model = KMeansMiniBatch.train(data, k=1, batchSize=100, maxIterations=50, runs=1, initializationMode=K_MEANS_PARALLEL)
     error = breezeNorm(model.clusterCenters.head.toBreeze - center, 2.0)
-    assert(error < epsilon)
+    assert(error < epsilon, "Error (" + error + ") was larger than expected value of " + epsilon)
 
     data.unpersist()
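
[Editor's note] On the sample()/takeSample() change in patch 14 above:
RDD.sample(withReplacement, fraction) expects a fraction in [0, 1] and returns
an RDD of roughly fraction * N elements, so passing batchSize (a count) to it
was incorrect; RDD.takeSample(withReplacement, num, seed) draws exactly num
elements and returns them as a local array. A small illustrative sketch,
assuming a live SparkContext named sc (the variable names are hypothetical):

// approximate: an RDD holding about 1% of the elements; its size varies per run
val approx = sc.parallelize(1 to 1000).sample(withReplacement = false, fraction = 0.01)

// exact: a local Array[Int] of exactly 10 elements, i.e. a fixed-size mini-batch
val batch = sc.parallelize(1 to 1000).takeSample(withReplacement = false, num = 10, seed = 42L)

Note the trade-off the patch accepts: takeSample() materializes the batch on
the driver, which is fine for mini-batch sizes but would not scale to large
samples the way the RDD-valued sample() does.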