From 7dcffd5536e729f7db903e1efb0c417a960f73e3 Mon Sep 17 00:00:00 2001
From: Adrian Florea
Date: Thu, 5 Nov 2015 22:49:13 +0200
Subject: [PATCH 1/8] Add a bunch of comments to KMeans code.

---
 .../spark/mllib/clustering/KMeans.scala | 54 +++++++++++++++++++
 1 file changed, 54 insertions(+)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 7168aac32c997..2d006cf99a666 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -238,6 +238,9 @@ class KMeans private (
       runs
     }

+    // Centers initialization (random, ||)
+    // Array of Arrays of VectorWithNorms
+    // there is one array of centers per run (if more than one model is trained)
     val centers = initialModel match {
       case Some(kMeansCenters) => {
         Array(kMeansCenters.clusterCenters.map(s => new VectorWithNorm(s)))
@@ -254,53 +257,97 @@ class KMeans private (
     logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) +
       " seconds.")

+    // Initially all runs are active (active == true means the corresponding run
+    // has not yet converged); one boolean flag per run
     val active = Array.fill(numRuns)(true)
+    // Initially the costs are 0.0 for all runs
+    // (a flat array - one cost per run)
    val costs = Array.fill(numRuns)(0.0)

+    // 0, 1, 2, ..., numRuns-1 - ArrayBuffer containing the remaining active runs
+    // Initially it contains all the runs
    var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
    var iteration = 0

    val iterationStartTime = System.nanoTime()

+    // Stop condition is given by
+    // - no more active runs (all runs converged)
+    // - maximum number of iterations reached
    // Execute iterations of Lloyd's algorithm until all runs have converged
    while (iteration < maxIterations && !activeRuns.isEmpty) {

      type WeightedPoint = (Vector, Long)
+      // this is the function that will be used in the reduce phase
      def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
+        // y += a * x
+        // - in this case y += x
        axpy(1.0, x._1, y._1)
        (y._1, x._2 + y._2)
      }

+      // the centers for each run still active
      val activeCenters = activeRuns.map(r => centers(r)).toArray
+      // the cost for each run - one accumulator per run
      val costAccums = activeRuns.map(_ => sc.accumulator(0.0))

+      // broadcast the centers
      val bcActiveCenters = sc.broadcast(activeCenters)

+      // mapPartitions - Return a new RDD by applying a function to each partition of this RDD
+      // reduceByKey - Merge the values for each key using an associative reduce function
      // Find the sum and count of points mapping to each center
      val totalContribs = data.mapPartitions { points =>

+        // we're inside the Spark magic now
+        // one Array of centers per each active run
        val thisActiveCenters = bcActiveCenters.value
+        // how many runs are still active
        val runs = thisActiveCenters.length
+        // how many clusters are needed (k)
        val k = thisActiveCenters(0).length
+        // the space dimension (munber of coordinates)
        val dims = thisActiveCenters(0)(0).vector.size

+        // sums are zero (one running sum vector per run and per cluster)
        val sums = Array.fill(runs, k)(Vectors.zeros(dims))
+        // counts are zero (one point count per run and per cluster)
        val counts = Array.fill(runs, k)(0L)

+        // ++++++++++++++++++++++++++++++++++++++++++++++++++++++
+        // Here we assign points to clusters
+        // Compute the total cost (sum of distances), sum and count
+
        points.foreach { point =>
          (0 until runs).foreach { i =>
+            // WE ARE IN THE CONTEXT OF A SPECIFIC RUN HERE
+            // Returns the index of the closest center to the given point, as well as the squared distance.
+            // TODO - here we need something different for CMeans
            val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
+            // the total cost increases
            costAccums(i) += cost
+            // add the current point to the cluster sum
            val sum = sums(i)(bestCenter)
            axpy(1.0, point.vector, sum)
+            // increase point count for current cluster
            counts(i)(bestCenter) += 1
          }
        }
+        // ++++++++++++++++++++++++++++++++++++++++++++++++++++++
+
+        // For every run and every cluster the sum and count are emitted
        val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
          ((i, j), (sums(i)(j), counts(i)(j)))
        }
        contribs.iterator
+        // The key is a combination of run and cluster
+        // reduceByKey merges the (sum, count) values per (run, cluster) key across partitions
      }.reduceByKey(mergeContribs).collectAsMap()

+      // At this point, for each run, each cluster,
+      // we know the sum of vectors and the number of points
      // Update the cluster centers and costs for each active run
      for ((run, i) <- activeRuns.zipWithIndex) {
        var changed = false
@@ -308,8 +355,11 @@ class KMeans private (
        while (j < k) {
          val (sum, count) = totalContribs((i, j))
          if (count != 0) {
+            // x = a * x - multiplies a vector by a scalar
+            // Compute new center
            scal(1.0 / count, sum)
            val newCenter = new VectorWithNorm(sum)
+            // Changed - (distance greater than epsilon squared)
            if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
              changed = true
            }
@@ -318,13 +368,17 @@ class KMeans private (
          j += 1
        }
        if (!changed) {
+          // Kill the run that converged already
          active(run) = false
          logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")
        }
        costs(run) = costAccums(i).value
      }

+      // remove runs no longer active (active switches to false if there are no changes
+      // between 2 successive iterations)
      activeRuns = activeRuns.filter(active(_))
+      // increase number of iterations
      iteration += 1
    }
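The merge step that patch 1 annotates combines per-cluster contributions computed on different partitions: vector sums are added with axpy and the point counts are added. A minimal standalone sketch of the same merge, with plain arrays standing in for MLlib vectors (mergePair and the sample values are made-up names for illustration, not part of the patch):

    // Merge two (vector sum, point count) contributions for one cluster,
    // mirroring mergeContribs: element-wise sum of vectors, sum of counts.
    def mergePair(x: (Array[Double], Long), y: (Array[Double], Long)): (Array[Double], Long) = {
      val summed = x._1.zip(y._1).map { case (a, b) => a + b }
      (summed, x._2 + y._2)
    }

    val fromPartition1 = (Array(1.0, 2.0), 3L)  // 3 points summing to (1, 2)
    val fromPartition2 = (Array(4.0, 6.0), 2L)  // 2 points summing to (4, 6)
    val merged = mergePair(fromPartition1, fromPartition2)
    // merged is (Array(5.0, 8.0), 5L); dividing the sum by the count yields the new center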
From 9d6f23894f05d0a3d2e131bab5eb3236331fee6a Mon Sep 17 00:00:00 2001
From: Adrian Florea
Date: Fri, 6 Nov 2015 23:03:28 +0200
Subject: [PATCH 2/8] Create degreesOfMembership method - computes the weight
 for each point with respect to each cluster

---
 .../spark/mllib/clustering/KMeans.scala | 101 +++++++++++-------
 1 file changed, 64 insertions(+), 37 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 2d006cf99a666..46bdb7b81be89 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -38,14 +38,14 @@ import org.apache.spark.util.random.XORShiftRandom
 * to it should be cached by the user.
 */
 @Since("0.8.0")
-class KMeans private (
-    private var k: Int,
-    private var maxIterations: Int,
-    private var runs: Int,
-    private var initializationMode: String,
-    private var initializationSteps: Int,
-    private var epsilon: Double,
-    private var seed: Long) extends Serializable with Logging {
+class KMeans private(
+  private var k: Int,
+  private var maxIterations: Int,
+  private var runs: Int,
+  private var initializationMode: String,
+  private var initializationSteps: Int,
+  private var epsilon: Double,
+  private var seed: Long) extends Serializable with Logging {

 /**
  * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1,
@@ -306,7 +306,7 @@ class KMeans private(
        val runs = thisActiveCenters.length
        // how many clusters are needed (k)
        val k = thisActiveCenters(0).length
-        // the space dimension (munber of coordinates)
+        // the space dimension (number of coordinates)
        val dims = thisActiveCenters(0)(0).vector.size

@@ -324,6 +324,7 @@ class KMeans private(
            // Returns the index of the closest center to the given point, as well as the squared distance.
            // TODO - here we need something different for CMeans
            val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
+            val membershipDegrees = KMeans.degreesOfMembership(thisActiveCenters(i), point)
            // the total cost increases
            costAccums(i) += cost
            // add the current point to the cluster sum
@@ -448,10 +449,10 @@ class KMeans private(
    val bcNewCenters = data.context.broadcast(newCenters)
    val preCosts = costs
    costs = data.zip(preCosts).map { case (point, cost) =>
-      Array.tabulate(runs) { r =>
-        math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r))
-      }
-    }.persist(StorageLevel.MEMORY_AND_DISK)
+        Array.tabulate(runs) { r =>
+          math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r))
+        }
+      }.persist(StorageLevel.MEMORY_AND_DISK)
    val sumCosts = costs
      .aggregate(new Array[Double](runs))(
        seqOp = (s, v) => {
@@ -537,12 +538,12 @@ object KMeans {
   */
  @Since("1.3.0")
  def train(
-      data: RDD[Vector],
-      k: Int,
-      maxIterations: Int,
-      runs: Int,
-      initializationMode: String,
-      seed: Long): KMeansModel = {
+    data: RDD[Vector],
+    k: Int,
+    maxIterations: Int,
+    runs: Int,
+    initializationMode: String,
+    seed: Long): KMeansModel = {
    new KMeans().setK(k)
      .setMaxIterations(maxIterations)
      .setRuns(runs)
      .setInitializationMode(initializationMode)
      .setSeed(seed)
      .run(data)
  }

@@ -562,11 +563,11 @@ object KMeans {
   */
  @Since("0.8.0")
  def train(
-      data: RDD[Vector],
-      k: Int,
-      maxIterations: Int,
-      runs: Int,
-      initializationMode: String): KMeansModel = {
+    data: RDD[Vector],
+    k: Int,
+    maxIterations: Int,
+    runs: Int,
+    initializationMode: String): KMeansModel = {
    new KMeans().setK(k)
      .setMaxIterations(maxIterations)
      .setRuns(runs)
      .setInitializationMode(initializationMode)
      .run(data)
  }

@@ -579,9 +580,9 @@ object KMeans {
   */
  @Since("0.8.0")
  def train(
-      data: RDD[Vector],
-      k: Int,
-      maxIterations: Int): KMeansModel = {
+    data: RDD[Vector],
+    k: Int,
+    maxIterations: Int): KMeansModel = {
    train(data, k, maxIterations, 1, K_MEANS_PARALLEL)
  }

@@ -590,10 +591,10 @@ object KMeans {
   */
  @Since("0.8.0")
  def train(
-      data: RDD[Vector],
-      k: Int,
-      maxIterations: Int,
-      runs: Int): KMeansModel = {
+    data: RDD[Vector],
+    k: Int,
+    maxIterations: Int,
+    runs: Int): KMeansModel = {
    train(data, k, maxIterations, runs, K_MEANS_PARALLEL)
  }

@@ -601,8 +602,8 @@ object KMeans {
  /**
   * Returns the index of the closest center to the given point, as well as the squared distance.
   */
  private[mllib] def findClosest(
-      centers: TraversableOnce[VectorWithNorm],
-      point: VectorWithNorm): (Int, Double) = {
+    centers: TraversableOnce[VectorWithNorm],
+    point: VectorWithNorm): (Int, Double) = {
    var bestDistance = Double.PositiveInfinity
    var bestIndex = 0
    var i = 0
@@ -623,12 +624,38 @@ object KMeans {
    (bestIndex, bestDistance)
  }

+  /**
+   * Returns the degree of membership of the point to each of the clusters
+   */
+  private[mllib] def degreesOfMembership(
+    centers: Array[VectorWithNorm],
+    point: VectorWithNorm): Array[Double] = {
+    // TODO - make fuzzifier a parameter of the algorithm
+    val fuzzifier = 2
+
+    // Distances from the point to each centroid
+    val distances = centers map (fastSquaredDistance(_, point))
+
+    // If at least one of the distances is 0
+    val perfectMatches = distances.count(d => d == 0.0)
+    if (perfectMatches > 0) {
+      distances map (d => if (d == 0.0) 1.0 / perfectMatches else 0.0)
+    } else {
+      // Initialize membershipDegrees
+      val membershipDegrees = distances
+      membershipDegrees map (m =>
+        1.0 / distances.foldLeft(0.0)((s, d) =>
+          s + Math.pow(m / d, 2.0 / (fuzzifier - 1.0))))
+    }
+  }
+
+
  /**
   * Returns the K-means cost of a given point against the given cluster centers.
   */
  private[mllib] def pointCost(
-      centers: TraversableOnce[VectorWithNorm],
-      point: VectorWithNorm): Double =
+    centers: TraversableOnce[VectorWithNorm],
+    point: VectorWithNorm): Double =
    findClosest(centers, point)._2

  /**
   * Returns the squared Euclidean distance between two vectors computed by
   * [[org.apache.spark.mllib.util.MLUtils#fastSquaredDistance]].
   */
  private[clustering] def fastSquaredDistance(
-      v1: VectorWithNorm,
-      v2: VectorWithNorm): Double = {
+    v1: VectorWithNorm,
+    v2: VectorWithNorm): Double = {
    MLUtils.fastSquaredDistance(v1.vector, v1.norm, v2.vector, v2.norm)
  }

From daac6545f157484eca4e49c9d37e83f7acaa1c40 Mon Sep 17 00:00:00 2001
From: Adrian Florea
Date: Sun, 8 Nov 2015 14:58:32 +0200
Subject: [PATCH 3/8] Add distances to degreesOfMembership method, switch to
 it instead of findClosest, convert counts to Double, rename it accordingly

---
 .../spark/mllib/clustering/KMeans.scala | 247 +++++++++---------
 1 file changed, 127 insertions(+), 120 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 46bdb7b81be89..d4bdc01ac3315 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -30,13 +30,13 @@ import org.apache.spark.util.Utils
 import org.apache.spark.util.random.XORShiftRandom

 /**
- * K-means clustering with support for multiple parallel runs and a k-means++ like initialization
- * mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested,
- * they are executed together with joint passes over the data for efficiency.
- *
- * This is an iterative algorithm that will make multiple passes over the data, so any RDDs given
- * to it should be cached by the user.
- */
+  * K-means clustering with support for multiple parallel runs and a k-means++ like initialization
+  * mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested,
+  * they are executed together with joint passes over the data for efficiency.
+  *
+  * This is an iterative algorithm that will make multiple passes over the data, so any RDDs given
+  * to it should be cached by the user.
+  */
+ */ @Since("0.8.0") class KMeans private( private var k: Int, @@ -48,21 +48,21 @@ class KMeans private( private var seed: Long) extends Serializable with Logging { /** - * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1, - * initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4, seed: random}. - */ + * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1, + * initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4, seed: random}. + */ @Since("0.8.0") def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4, Utils.random.nextLong()) /** - * Number of clusters to create (k). - */ + * Number of clusters to create (k). + */ @Since("1.4.0") def getK: Int = k /** - * Set the number of clusters to create (k). Default: 2. - */ + * Set the number of clusters to create (k). Default: 2. + */ @Since("0.8.0") def setK(k: Int): this.type = { this.k = k @@ -70,14 +70,14 @@ class KMeans private( } /** - * Maximum number of iterations to run. - */ + * Maximum number of iterations to run. + */ @Since("1.4.0") def getMaxIterations: Int = maxIterations /** - * Set maximum number of iterations to run. Default: 20. - */ + * Set maximum number of iterations to run. Default: 20. + */ @Since("0.8.0") def setMaxIterations(maxIterations: Int): this.type = { this.maxIterations = maxIterations @@ -85,16 +85,16 @@ class KMeans private( } /** - * The initialization algorithm. This can be either "random" or "k-means||". - */ + * The initialization algorithm. This can be either "random" or "k-means||". + */ @Since("1.4.0") def getInitializationMode: String = initializationMode /** - * Set the initialization algorithm. This can be either "random" to choose random points as - * initial cluster centers, or "k-means||" to use a parallel variant of k-means++ - * (Bahmani et al., Scalable K-Means++, VLDB 2012). Default: k-means||. - */ + * Set the initialization algorithm. This can be either "random" to choose random points as + * initial cluster centers, or "k-means||" to use a parallel variant of k-means++ + * (Bahmani et al., Scalable K-Means++, VLDB 2012). Default: k-means||. + */ @Since("0.8.0") def setInitializationMode(initializationMode: String): this.type = { KMeans.validateInitMode(initializationMode) @@ -103,19 +103,19 @@ class KMeans private( } /** - * :: Experimental :: - * Number of runs of the algorithm to execute in parallel. - */ + * :: Experimental :: + * Number of runs of the algorithm to execute in parallel. + */ @Since("1.4.0") @Experimental def getRuns: Int = runs /** - * :: Experimental :: - * Set the number of runs of the algorithm to execute in parallel. We initialize the algorithm - * this many times with random starting conditions (configured by the initialization mode), then - * return the best clustering found over any run. Default: 1. - */ + * :: Experimental :: + * Set the number of runs of the algorithm to execute in parallel. We initialize the algorithm + * this many times with random starting conditions (configured by the initialization mode), then + * return the best clustering found over any run. Default: 1. 
+ */ @Since("0.8.0") @Experimental def setRuns(runs: Int): this.type = { @@ -127,15 +127,15 @@ class KMeans private( } /** - * Number of steps for the k-means|| initialization mode - */ + * Number of steps for the k-means|| initialization mode + */ @Since("1.4.0") def getInitializationSteps: Int = initializationSteps /** - * Set the number of steps for the k-means|| initialization mode. This is an advanced - * setting -- the default of 5 is almost always enough. Default: 5. - */ + * Set the number of steps for the k-means|| initialization mode. This is an advanced + * setting -- the default of 5 is almost always enough. Default: 5. + */ @Since("0.8.0") def setInitializationSteps(initializationSteps: Int): this.type = { if (initializationSteps <= 0) { @@ -146,15 +146,15 @@ class KMeans private( } /** - * The distance threshold within which we've consider centers to have converged. - */ + * The distance threshold within which we've consider centers to have converged. + */ @Since("1.4.0") def getEpsilon: Double = epsilon /** - * Set the distance threshold within which we've consider centers to have converged. - * If all centers move less than this Euclidean distance, we stop iterating one run. - */ + * Set the distance threshold within which we've consider centers to have converged. + * If all centers move less than this Euclidean distance, we stop iterating one run. + */ @Since("0.8.0") def setEpsilon(epsilon: Double): this.type = { this.epsilon = epsilon @@ -162,14 +162,14 @@ class KMeans private( } /** - * The random seed for cluster initialization. - */ + * The random seed for cluster initialization. + */ @Since("1.4.0") def getSeed: Long = seed /** - * Set the random seed for cluster initialization. - */ + * Set the random seed for cluster initialization. + */ @Since("1.4.0") def setSeed(seed: Long): this.type = { this.seed = seed @@ -181,10 +181,10 @@ class KMeans private( private var initialModel: Option[KMeansModel] = None /** - * Set the initial starting point, bypassing the random initialization or k-means|| - * The condition model.k == this.k must be met, failure results - * in an IllegalArgumentException. - */ + * Set the initial starting point, bypassing the random initialization or k-means|| + * The condition model.k == this.k must be met, failure results + * in an IllegalArgumentException. + */ @Since("1.4.0") def setInitialModel(model: KMeansModel): this.type = { require(model.k == k, "mismatched cluster count") @@ -193,9 +193,9 @@ class KMeans private( } /** - * Train a K-means model on the given set of points; `data` should be cached for high - * performance, because this is an iterative algorithm. - */ + * Train a K-means model on the given set of points; `data` should be cached for high + * performance, because this is an iterative algorithm. + */ @Since("0.8.0") def run(data: RDD[Vector]): KMeansModel = { @@ -222,8 +222,8 @@ class KMeans private( } /** - * Implementation of K-Means algorithm. - */ + * Implementation of K-Means algorithm. 
+ */ private def runAlgorithm(data: RDD[VectorWithNorm]): KMeansModel = { val sc = data.sparkContext @@ -277,7 +277,7 @@ class KMeans private( // Execute iterations of Lloyd's algorithm until all runs have converged while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (Vector, Long) + type WeightedPoint = (Vector, Double) // this is the function that will be used in the reduce phase def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = { // y += a * x @@ -311,8 +311,8 @@ class KMeans private( // sums are zero (per runs per each dimension) val sums = Array.fill(runs, k)(Vectors.zeros(dims)) - // counts are zero (per run per each dimension) - val counts = Array.fill(runs, k)(0L) + // fuzzyCounts are zero (per run per each dimension) + val fuzzyCounts = Array.fill(runs, k)(0.0) // ++++++++++++++++++++++++++++++++++++++++++++++++++++++ // Here we assign points to clusters @@ -321,17 +321,23 @@ class KMeans private( points.foreach { point => (0 until runs).foreach { i => // WE ARE IN THE CONTEXT OF A SPECIFIC RUN HERE - // Returns the index of the closest center to the given point, as well as the squared distance. - // TODO - here we need something different for CMeans - val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) - val membershipDegrees = KMeans.degreesOfMembership(thisActiveCenters(i), point) - // the total cost increases - costAccums(i) += cost - // add the current point to the cluster sum - val sum = sums(i)(bestCenter) - axpy(1.0, point.vector, sum) - // increase point count for current cluster - counts(i)(bestCenter) += 1 + // Returns the index of the closest center to the given point, + // as well as the squared distance. + // val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) + val (mbrpDegree, distances) = KMeans.degreesOfMembership(thisActiveCenters(i), point) + // compute membership based cost - ignore "almost zeros" + mbrpDegree.zipWithIndex. + filter(_._1 > epsilon * epsilon). + foreach { degreeWithIndex => + val (deg, ind) = degreeWithIndex + // the total cost increases + costAccums(i) += deg * distances(ind) + // add the current point to the cluster sum + val sum = sums(i)(ind) + axpy(deg, point.vector, sum) + // increase point count for current cluster + fuzzyCounts(i)(ind) += deg + } } } @@ -339,7 +345,7 @@ class KMeans private( // For every run and every cluster the sum and count are emitted val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) + ((i, j), (sums(i)(j), fuzzyCounts(i)(j))) } contribs.iterator // The key is a combination of run and cluster @@ -400,8 +406,8 @@ class KMeans private( } /** - * Initialize `runs` sets of cluster centers at random. - */ + * Initialize `runs` sets of cluster centers at random. + */ private def initRandom(data: RDD[VectorWithNorm]) : Array[Array[VectorWithNorm]] = { // Sample all the cluster centers in one pass to avoid repeated scans @@ -412,14 +418,14 @@ class KMeans private( } /** - * Initialize `runs` sets of cluster centers using the k-means|| algorithm by Bahmani et al. - * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries - * to find with dissimilar cluster centers by starting with a random center and then doing - * passes where more centers are chosen with probability proportional to their squared distance - * to the current cluster set. It results in a provable approximation to an optimal clustering. 
- * - * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf. - */ + * Initialize `runs` sets of cluster centers using the k-means|| algorithm by Bahmani et al. + * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries + * to find with dissimilar cluster centers by starting with a random center and then doing + * passes where more centers are chosen with probability proportional to their squared distance + * to the current cluster set. It results in a provable approximation to an optimal clustering. + * + * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf. + */ private def initKMeansParallel(data: RDD[VectorWithNorm]) : Array[Array[VectorWithNorm]] = { // Initialize empty centers and point costs. @@ -515,8 +521,8 @@ class KMeans private( /** - * Top-level methods for calling K-means clustering. - */ + * Top-level methods for calling K-means clustering. + */ @Since("0.8.0") object KMeans { @@ -527,15 +533,15 @@ object KMeans { val K_MEANS_PARALLEL = "k-means||" /** - * Trains a k-means model using the given set of parameters. - * - * @param data training points stored as `RDD[Vector]` - * @param k number of clusters - * @param maxIterations max number of iterations - * @param runs number of parallel runs, defaults to 1. The best model is returned. - * @param initializationMode initialization model, either "random" or "k-means||" (default). - * @param seed random seed value for cluster initialization - */ + * Trains a k-means model using the given set of parameters. + * + * @param data training points stored as `RDD[Vector]` + * @param k number of clusters + * @param maxIterations max number of iterations + * @param runs number of parallel runs, defaults to 1. The best model is returned. + * @param initializationMode initialization model, either "random" or "k-means||" (default). + * @param seed random seed value for cluster initialization + */ @Since("1.3.0") def train( data: RDD[Vector], @@ -553,14 +559,14 @@ object KMeans { } /** - * Trains a k-means model using the given set of parameters. - * - * @param data training points stored as `RDD[Vector]` - * @param k number of clusters - * @param maxIterations max number of iterations - * @param runs number of parallel runs, defaults to 1. The best model is returned. - * @param initializationMode initialization model, either "random" or "k-means||" (default). - */ + * Trains a k-means model using the given set of parameters. + * + * @param data training points stored as `RDD[Vector]` + * @param k number of clusters + * @param maxIterations max number of iterations + * @param runs number of parallel runs, defaults to 1. The best model is returned. + * @param initializationMode initialization model, either "random" or "k-means||" (default). + */ @Since("0.8.0") def train( data: RDD[Vector], @@ -576,8 +582,8 @@ object KMeans { } /** - * Trains a k-means model using specified parameters and the default values for unspecified. - */ + * Trains a k-means model using specified parameters and the default values for unspecified. + */ @Since("0.8.0") def train( data: RDD[Vector], @@ -587,8 +593,8 @@ object KMeans { } /** - * Trains a k-means model using specified parameters and the default values for unspecified. - */ + * Trains a k-means model using specified parameters and the default values for unspecified. 
+ */ @Since("0.8.0") def train( data: RDD[Vector], @@ -599,8 +605,8 @@ object KMeans { } /** - * Returns the index of the closest center to the given point, as well as the squared distance. - */ + * Returns the index of the closest center to the given point, as well as the squared distance. + */ private[mllib] def findClosest( centers: TraversableOnce[VectorWithNorm], point: VectorWithNorm): (Int, Double) = { @@ -625,11 +631,12 @@ object KMeans { } /** - * Returns the degree of membership of the point to each of the clusters - */ + * Returns the degree of membership of the point to each of the clusters + * Along with the array of distances from the point to each centroid + */ private[mllib] def degreesOfMembership( centers: Array[VectorWithNorm], - point: VectorWithNorm): Array[Double] = { + point: VectorWithNorm): (Array[Double], Array[Double]) = { // TODO - make fuzzifier a parameter of the algorithm val fuzzifier = 2 @@ -639,29 +646,29 @@ object KMeans { // If at least one of the distances is 0 val perfectMatches = distances.count(d => d == 0.0) if (perfectMatches > 0) { - distances map (d => if (d == 0.0) 1.0 / perfectMatches else 0.0) + (distances map (d => if (d == 0.0) 1.0 / perfectMatches else 0.0), distances) } else { // Initialize membershipDegrees val membershipDegrees = distances - membershipDegrees map (m => + (membershipDegrees map (m => 1.0 / distances.foldLeft(0.0)((s, d) => - s + Math.pow(m / d, 2.0 / (fuzzifier - 1.0)))) + s + Math.pow(m / d, 2.0 / (fuzzifier - 1.0)))), distances) } } /** - * Returns the K-means cost of a given point against the given cluster centers. - */ + * Returns the K-means cost of a given point against the given cluster centers. + */ private[mllib] def pointCost( centers: TraversableOnce[VectorWithNorm], point: VectorWithNorm): Double = findClosest(centers, point)._2 /** - * Returns the squared Euclidean distance between two vectors computed by - * [[org.apache.spark.mllib.util.MLUtils#fastSquaredDistance]]. - */ + * Returns the squared Euclidean distance between two vectors computed by + * [[org.apache.spark.mllib.util.MLUtils#fastSquaredDistance]]. + */ private[clustering] def fastSquaredDistance( v1: VectorWithNorm, v2: VectorWithNorm): Double = { @@ -678,10 +685,10 @@ object KMeans { } /** - * A vector with its norm for fast distance computation. - * - * @see [[org.apache.spark.mllib.clustering.KMeans#fastSquaredDistance]] - */ + * A vector with its norm for fast distance computation. + * + * @see [[org.apache.spark.mllib.clustering.KMeans#fastSquaredDistance]] + */ private[clustering] class VectorWithNorm(val vector: Vector, val norm: Double) extends Serializable { From d4dc4beb7369b29fc6baae3b8ae92df6b2e92c96 Mon Sep 17 00:00:00 2001 From: Adrian Florea Date: Sun, 8 Nov 2015 15:57:22 +0200 Subject: [PATCH 4/8] Adjustments for centroid computation. 
From d4dc4beb7369b29fc6baae3b8ae92df6b2e92c96 Mon Sep 17 00:00:00 2001
From: Adrian Florea
Date: Sun, 8 Nov 2015 15:57:22 +0200
Subject: [PATCH 4/8] Adjustments for centroid computation.

Multiple runs for "two clusters" test (with a high value for the fuzzifier
and a bad choice of initial centroids the algorithm fails sometimes)
---
 .../apache/spark/mllib/clustering/KMeans.scala | 17 +++++++----------
 .../spark/mllib/clustering/KMeansSuite.scala | 2 +-
 2 files changed, 8 insertions(+), 11 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index d4bdc01ac3315..a8f6d60e52076 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -321,9 +321,6 @@ class KMeans private(
        points.foreach { point =>
          (0 until runs).foreach { i =>
            // WE ARE IN THE CONTEXT OF A SPECIFIC RUN HERE
-            // Returns the index of the closest center to the given point,
-            // as well as the squared distance.
-            // val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
            val (mbrpDegree, distances) = KMeans.degreesOfMembership(thisActiveCenters(i), point)
            // compute membership based cost - ignore "almost zeros"
            mbrpDegree.zipWithIndex.
@@ -360,11 +357,11 @@ class KMeans private(
        var changed = false
        var j = 0
        while (j < k) {
-          val (sum, count) = totalContribs((i, j))
-          if (count != 0) {
+          val (sum, fuzzyCount) = totalContribs((i, j))
+          if (fuzzyCount != 0) {
            // x = a * x - multiplies a vector by a scalar
            // Compute new center
-            scal(1.0 / count, sum)
+            scal(1.0 / fuzzyCount, sum)
            val newCenter = new VectorWithNorm(sum)
            // Changed - (distance greater than epsilon squared)
            if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
@@ -638,7 +635,7 @@ object KMeans {
      centers: Array[VectorWithNorm],
      point: VectorWithNorm): (Array[Double], Array[Double]) = {
    // TODO - make fuzzifier a parameter of the algorithm
-    val fuzzifier = 2
+    val fuzzifier = 25

    // Distances from the point to each centroid
    val distances = centers map (fastSquaredDistance(_, point))
@@ -649,10 +646,10 @@ object KMeans {
      (distances map (d => if (d == 0.0) 1.0 / perfectMatches else 0.0), distances)
    } else {
      // Initialize membershipDegrees
+      def fuzzyMembership: (Double) => Double = x =>
+        1.0 / distances.foldLeft(0.0)((s, d) => s + Math.pow(x / d, 2.0 / (fuzzifier - 1.0)))
      val membershipDegrees = distances
-      (membershipDegrees map (m =>
-        1.0 / distances.foldLeft(0.0)((s, d) =>
-          s + Math.pow(m / d, 2.0 / (fuzzifier - 1.0)))), distances)
+      (membershipDegrees map (m => Math.pow(fuzzyMembership(m), fuzzifier)), distances)
    }
  }

diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
index 3003c62d9876c..6e65cb668d37d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
@@ -250,7 +250,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
    for (initMode <- Seq(RANDOM, K_MEANS_PARALLEL)) {
      // Two iterations are sufficient no matter where the initial centers are.
-      val model = KMeans.train(rdd, k = 2, maxIterations = 2, runs = 1, initMode)
+      val model = KMeans.train(rdd, k = 2, maxIterations = 2, runs = 10, initMode)

      val predicts = model.predict(rdd).collect()
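Raising each membership to the power of the fuzzifier, as the degreesOfMembership change above does, matches the weighting of the fuzzy c-means centroid update, c_j = sum_i u_ij^m x_i / sum_i u_ij^m; the patch folds the exponent into degreesOfMembership so the main loop can use the returned degree directly with axpy. A hedged sketch of that update for a single cluster, with plain arrays instead of MLlib vectors (updateCentroid and its inputs are illustrative names only):

    // New centroid for one cluster: the u^m-weighted mean of the points.
    def updateCentroid(points: Array[Array[Double]], memberships: Array[Double], m: Double): Array[Double] = {
      val dims = points.head.length
      val sum = new Array[Double](dims)  // running weighted sum, like sums(i)(j)
      var totalWeight = 0.0              // like fuzzyCounts(i)(j)
      points.zip(memberships).foreach { case (p, u) =>
        val w = math.pow(u, m)           // the patches precompute this inside degreesOfMembership
        var d = 0
        while (d < dims) { sum(d) += w * p(d); d += 1 }
        totalWeight += w
      }
      sum.map(_ / totalWeight)           // mirrors scal(1.0 / fuzzyCount, sum)
    }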
From b28fcf4285d489fa051657ca80e21f51070d359f Mon Sep 17 00:00:00 2001
From: Adrian Florea
Date: Sun, 8 Nov 2015 16:31:25 +0200
Subject: [PATCH 5/8] Add fuzzifier (m) as parameter, default to hard
 clustering for m == 1

---
 .../spark/mllib/clustering/KMeans.scala | 68 ++++++++++++++-----
 1 file changed, 51 insertions(+), 17 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index a8f6d60e52076..e1c6db6e295a2 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -40,6 +40,7 @@ import org.apache.spark.util.random.XORShiftRandom
 @Since("0.8.0")
 class KMeans private(
   private var k: Int,
+  private var m: Double,
   private var maxIterations: Int,
   private var runs: Int,
   private var initializationMode: String,
@@ -48,11 +49,11 @@ class KMeans private(
   private var seed: Long) extends Serializable with Logging {

  /**
-    * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1,
+    * Constructs a KMeans instance with default parameters: {k: 2, m: 1, maxIterations: 20, runs: 1,
    * initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4, seed: random}.
    */
  @Since("0.8.0")
-  def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4, Utils.random.nextLong())
+  def this() = this(2, 1, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4, Utils.random.nextLong())

  /**
    * Number of clusters to create (k).
@@ -70,6 +71,22 @@ class KMeans private(
    this
  }

+  /**
+    * The level of cluster fuzziness (m)
+    * See https://en.wikipedia.org/wiki/Fuzzy_clustering
+    */
+  @Since("1.6.0")
+  def getM: Double = m
+
+  /**
+    * Set the level of cluster fuzziness (m). Default: 1 (hard clustering)
+    */
+  @Since("1.6.0")
+  def setM(m: Double): this.type = {
+    this.m = m
+    this
+  }
+
  /**
    * Maximum number of iterations to run.
    */
@@ -294,6 +311,9 @@ class KMeans private(
      // broadcast the centers
      val bcActiveCenters = sc.broadcast(activeCenters)

+      // broadcast the fuzzifier
+      val bcM = sc.broadcast(m)
+
      // mapPartitions - Return a new RDD by applying a function to each partition of this RDD
      // reduceByKey - Merge the values for each key using an associative reduce function
      // Find the sum and count of points mapping to each center
@@ -308,6 +328,8 @@ class KMeans private(
        val k = thisActiveCenters(0).length
        // the space dimension (number of coordinates)
        val dims = thisActiveCenters(0)(0).vector.size
+        // the level of cluster fuzziness
+        val m = bcM.value

@@ -321,7 +343,7 @@ class KMeans private(
        points.foreach { point =>
          (0 until runs).foreach { i =>
            // WE ARE IN THE CONTEXT OF A SPECIFIC RUN HERE
-            val (mbrpDegree, distances) = KMeans.degreesOfMembership(thisActiveCenters(i), point)
+            val (mbrpDegree, distances) = KMeans.degreesOfMembership(thisActiveCenters(i), point, m)
            // compute membership based cost - ignore "almost zeros"
            mbrpDegree.zipWithIndex.
              filter(_._1 > epsilon * epsilon).
@@ -633,23 +655,35 @@ object KMeans { */ private[mllib] def degreesOfMembership( centers: Array[VectorWithNorm], - point: VectorWithNorm): (Array[Double], Array[Double]) = { - // TODO - make fuzzifier a parameter of the algorithm - val fuzzifier = 25 + point: VectorWithNorm, + fuzzifier: Double): (Array[Double], Array[Double]) = { - // Distances from the point to each centroid - val distances = centers map (fastSquaredDistance(_, point)) + if (fuzzifier == 1) { - // If at least one of the distances is 0 - val perfectMatches = distances.count(d => d == 0.0) - if (perfectMatches > 0) { - (distances map (d => if (d == 0.0) 1.0 / perfectMatches else 0.0), distances) - } else { - // Initialize membershipDegrees - def fuzzyMembership: (Double) => Double = x => - 1.0 / distances.foldLeft(0.0)((s, d) => s + Math.pow(x / d, 2.0 / (fuzzifier - 1.0))) + // This is classical hard clustering + val (bestIndex, bestDistance) = findClosest(centers, point) + val distances = Array.fill(centers.length)(0.0) val membershipDegrees = distances - (membershipDegrees map (m => Math.pow(fuzzyMembership(m), fuzzifier)), distances) + distances(bestIndex) = bestDistance + membershipDegrees(bestIndex) = 1 + (membershipDegrees, distances) + + } else { + + // Distances from the point to each centroid + val distances = centers map (fastSquaredDistance(_, point)) + + // If at least one of the distances is 0 + val perfectMatches = distances.count(d => d == 0.0) + if (perfectMatches > 0) { + (distances map (d => if (d == 0.0) 1.0 / perfectMatches else 0.0), distances) + } else { + // Initialize membershipDegrees + def fuzzyMembership: (Double) => Double = x => + 1.0 / distances.foldLeft(0.0)((s, d) => s + Math.pow(x / d, 2.0 / (fuzzifier - 1.0))) + val membershipDegrees = distances + (membershipDegrees map (m => Math.pow(fuzzyMembership(m), fuzzifier)), distances) + } } } From eed6195d106e2c9448e1c0083abf1ab9034bc618 Mon Sep 17 00:00:00 2001 From: Adrian Florea Date: Sun, 8 Nov 2015 21:59:32 +0200 Subject: [PATCH 6/8] Optimizations for membership computation --- .../org/apache/spark/mllib/clustering/KMeans.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index e1c6db6e295a2..f93ab6ad71726 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -673,16 +673,16 @@ object KMeans { // Distances from the point to each centroid val distances = centers map (fastSquaredDistance(_, point)) - // If at least one of the distances is 0 val perfectMatches = distances.count(d => d == 0.0) if (perfectMatches > 0) { + // If at least one of the distances is 0 the membershib divides between + // the perfect mathces (distances map (d => if (d == 0.0) 1.0 / perfectMatches else 0.0), distances) } else { - // Initialize membershipDegrees - def fuzzyMembership: (Double) => Double = x => - 1.0 / distances.foldLeft(0.0)((s, d) => s + Math.pow(x / d, 2.0 / (fuzzifier - 1.0))) - val membershipDegrees = distances - (membershipDegrees map (m => Math.pow(fuzzyMembership(m), fuzzifier)), distances) + // Standard formula + val pow = 2.0 / (fuzzifier - 1.0) + val denom = distances.foldLeft(0.0)((sum, dik) => sum + Math.pow(1 / dik, pow)) + (distances map (dij => 1 / (Math.pow(dij, pow) * denom)), distances) } } } From 2c0f7a66a55bf187d1a7d93cb4037849403ee414 Mon Sep 17 00:00:00 2001 
From 2c0f7a66a55bf187d1a7d93cb4037849403ee414 Mon Sep 17 00:00:00 2001
From: Adrian Florea
Date: Thu, 12 Nov 2015 22:34:15 +0200
Subject: [PATCH 7/8] Publish m as train parameter, add some fuzzier tests.

---
 .../spark/mllib/clustering/KMeans.scala | 32 ++++++++++
 .../spark/mllib/clustering/KMeansSuite.scala | 64 ++++++++++++++++++-
 2 files changed, 95 insertions(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 72e67a0f313c3..921e694ba3ce1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -82,6 +82,9 @@ class KMeans private(
    */
  @Since("1.6.0")
  def setM(m: Double): this.type = {
+    if (m <= 1) {
+      throw new IllegalArgumentException("Fuzzifier must be greater than 1!")
+    }
    this.m = m
    this
  }
@@ -551,6 +554,35 @@ object KMeans {
  @Since("0.8.0")
  val K_MEANS_PARALLEL = "k-means||"

+  /**
+    * Trains a k-means model using the given set of parameters.
+    *
+    * @param data training points stored as `RDD[Vector]`
+    * @param k number of clusters
+    * @param maxIterations max number of iterations
+    * @param runs number of parallel runs, defaults to 1. The best model is returned.
+    * @param initializationMode initialization model, either "random" or "k-means||" (default).
+    * @param seed random seed value for cluster initialization
+    * @param m fuzzifier, strictly greater than 1; values close to 1 approach hard clustering
+    */
+  @Since("1.6.0")
+  def train(
+      data: RDD[Vector],
+      k: Int,
+      maxIterations: Int,
+      runs: Int,
+      initializationMode: String,
+      seed: Long,
+      m: Double): KMeansModel = {
+    new KMeans().setK(k)
+      .setMaxIterations(maxIterations)
+      .setRuns(runs)
+      .setInitializationMode(initializationMode)
+      .setSeed(seed)
+      .setM(m)
+      .run(data)
+  }
+
  /**
    * Trains a k-means model using the given set of parameters.
    *
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
index 6e65cb668d37d..7e8def2fbc2de 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
@@ -62,6 +62,27 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
    model = KMeans.train(
      data, k = 1, maxIterations = 1, runs = 1, initializationMode = K_MEANS_PARALLEL)
    assert(model.clusterCenters.head ~== center absTol 1E-5)
+
+    // Fuzzier models
+    model = KMeans.train(
+      data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM,
+      seed = Utils.random.nextLong(), m = 2.0)
+    assert(model.clusterCenters.head ~== center absTol 1E-5)
+
+    model = KMeans.train(
+      data, k = 1, maxIterations = 1, runs = 1, initializationMode = K_MEANS_PARALLEL,
+      seed = Utils.random.nextLong(), m = 2.0)
+    assert(model.clusterCenters.head ~== center absTol 1E-5)
+
+    model = KMeans.train(
+      data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM,
+      seed = Utils.random.nextLong(), m = 3.0)
+    assert(model.clusterCenters.head ~== center absTol 1E-5)
+
+    model = KMeans.train(
+      data, k = 1, maxIterations = 1, runs = 1, initializationMode = K_MEANS_PARALLEL,
+      seed = Utils.random.nextLong(), m = 3.0)
+    assert(model.clusterCenters.head ~== center absTol 1E-5)
  }

  test("no distinct points") {
@@ -76,6 +97,12 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
    // Make sure code runs.
    var model = KMeans.train(data, k = 2, maxIterations = 1)
    assert(model.clusterCenters.size === 2)
+
+    // Fuzzier models
+    model = KMeans.train(
+      data, k = 2, maxIterations = 1, runs = 1, initializationMode = RANDOM,
+      seed = Utils.random.nextLong(), m = 2.0)
+    assert(model.clusterCenters.size === 2)
  }

  test("more clusters than points") {
@@ -88,6 +115,12 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
    // Make sure code runs.
    var model = KMeans.train(data, k = 3, maxIterations = 1)
    assert(model.clusterCenters.size === 3)
+
+    // Fuzzier models
+    model = KMeans.train(
+      data, k = 3, maxIterations = 1, runs = 1, initializationMode = RANDOM,
+      seed = Utils.random.nextLong(), m = 2.0)
+    assert(model.clusterCenters.size === 3)
  }

  test("deterministic initialization") {
@@ -146,6 +179,23 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1,
      initializationMode = K_MEANS_PARALLEL)
    assert(model.clusterCenters.head ~== center absTol 1E-5)
+
+    // Fuzzier models
+    model = KMeans.train(
+      data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM,
+      seed = Utils.random.nextLong(), m = 2.0)
+    assert(model.clusterCenters.head ~== center absTol 1E-5)
+
+    model = KMeans.train(
+      data, k = 1, maxIterations = 10, runs = 1, initializationMode = RANDOM,
+      seed = Utils.random.nextLong(), m = 2.0)
+    assert(model.clusterCenters.head ~== center absTol 1E-5)
+
+    model = KMeans.train(
+      data, k = 1, maxIterations = 15, runs = 1, initializationMode = RANDOM,
+      seed = Utils.random.nextLong(), m = 2.0)
+    assert(model.clusterCenters.head ~== center absTol 1E-5)
+
  }

  test("single cluster with sparse data") {
@@ -192,6 +242,18 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
      initializationMode = K_MEANS_PARALLEL)
    assert(model.clusterCenters.head ~== center absTol 1E-5)

+    // Fuzzier models
+    model = KMeans.train(
+      data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM,
+      seed = Utils.random.nextLong(), m = 2.0)
+    assert(model.clusterCenters.head ~== center absTol 1E-5)
+
+    model = KMeans.train(
+      data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM,
+      seed = Utils.random.nextLong(), m = 3.0)
+    assert(model.clusterCenters.head ~== center absTol 1E-5)
+
    data.unpersist()
  }

@@ -295,7 +357,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
      .setMaxIterations(0)
      .setInitialModel(initialModel)
      .run(rdd)
-   // comparing the returned model and the initial model
+    // comparing the returned model and the initial model
    assert(returnModel.clusterCenters(0) === initialModel.clusterCenters(0))
    assert(returnModel.clusterCenters(1) === initialModel.clusterCenters(1))
  }
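With patch 7 applied, the fuzzifier is reachable from the public API. A hedged usage sketch of the new train overload (assuming an active SparkContext sc; the point values are made up for illustration):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.clustering.KMeans

    val points = sc.parallelize(Seq(
      Vectors.dense(0.0, 0.1), Vectors.dense(0.1, 0.0),   // near (0, 0)
      Vectors.dense(9.9, 10.0), Vectors.dense(10.0, 9.9)  // near (10, 10)
    )).cache()  // cache the RDD: training makes several passes over it

    val model = KMeans.train(points, k = 2, maxIterations = 10, runs = 1,
      initializationMode = KMeans.K_MEANS_PARALLEL, seed = 42L, m = 2.0)
    model.clusterCenters.foreach(println)  // one center per blob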
From 30d6f2f1b59eb52705f946d6691c205a31afffbb Mon Sep 17 00:00:00 2001
From: Adrian Florea
Date: Fri, 13 Nov 2015 21:29:58 +0200
Subject: [PATCH 8/8] Add more fuzzy clustering tests.

---
 .../spark/mllib/clustering/KMeansSuite.scala | 30 +++++++++++++++++++
 1 file changed, 30 insertions(+)

diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
index 7e8def2fbc2de..00690b8ed91ed 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
@@ -297,6 +297,27 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
    model = KMeans.train(rdd, k = 5, maxIterations = 10, runs = 5)
    assert(model.clusterCenters.sortBy(VectorWithCompare(_))
      .zip(points.sortBy(VectorWithCompare(_))).forall(x => x._1 ~== (x._2) absTol 1E-5))
+
+    // Fuzzier models
+    model = KMeans.train(
+      rdd, k = 5, maxIterations = 1, runs = 1, initializationMode = K_MEANS_PARALLEL,
+      seed = Utils.random.nextLong(), m = 2.0)
+    assert(model.clusterCenters.sortBy(VectorWithCompare(_))
+      .zip(points.sortBy(VectorWithCompare(_))).forall(x => x._1 ~== (x._2) absTol 1E-5))
+
+    // Iterations of Lloyd's should not change the answer either
+    model = KMeans.train(
+      rdd, k = 5, maxIterations = 10, runs = 1, initializationMode = K_MEANS_PARALLEL,
+      seed = Utils.random.nextLong(), m = 2.0)
+    assert(model.clusterCenters.sortBy(VectorWithCompare(_))
+      .zip(points.sortBy(VectorWithCompare(_))).forall(x => x._1 ~== (x._2) absTol 1E-5))
+
+    // Neither should more runs
+    model = KMeans.train(
+      rdd, k = 5, maxIterations = 10, runs = 5, initializationMode = K_MEANS_PARALLEL,
+      seed = Utils.random.nextLong(), m = 2.0)
+    assert(model.clusterCenters.sortBy(VectorWithCompare(_))
+      .zip(points.sortBy(VectorWithCompare(_))).forall(x => x._1 ~== (x._2) absTol 1E-5))
  }

  test("two clusters") {
@@ -313,14 +334,23 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
    for (initMode <- Seq(RANDOM, K_MEANS_PARALLEL)) {
      // Two iterations are sufficient no matter where the initial centers are.
      val model = KMeans.train(rdd, k = 2, maxIterations = 2, runs = 10, initMode)
+      val fuzzyModel = KMeans.train(rdd, k = 2, maxIterations = 2, runs = 10, initMode,
+        seed = Utils.random.nextLong(), m = 2.0)

      val predicts = model.predict(rdd).collect()
+      val fuzzyPredicts = fuzzyModel.predict(rdd).collect()

      assert(predicts(0) === predicts(1))
      assert(predicts(0) === predicts(2))
      assert(predicts(3) === predicts(4))
      assert(predicts(3) === predicts(5))
      assert(predicts(0) != predicts(3))
+
+      assert(fuzzyPredicts(0) === fuzzyPredicts(1))
+      assert(fuzzyPredicts(0) === fuzzyPredicts(2))
+      assert(fuzzyPredicts(3) === fuzzyPredicts(4))
+      assert(fuzzyPredicts(3) === fuzzyPredicts(5))
+      assert(fuzzyPredicts(0) != fuzzyPredicts(3))
    }
  }
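Note that the suite exercises fuzzy training only through KMeansModel.predict, which still returns hard assignments. The membership degrees themselves are exposed by KMeans.degreesOfMembership, but it and VectorWithNorm are package-private, so a sketch like the following would have to live inside org.apache.spark.mllib.clustering (model is assumed to be a trained KMeansModel, for instance from the sketch after patch 7):

    import org.apache.spark.mllib.linalg.Vectors

    val p = Vectors.dense(0.5, 0.5)
    val centersWithNorm = model.clusterCenters.map(c => new VectorWithNorm(c, Vectors.norm(c, 2.0)))
    val pointWithNorm = new VectorWithNorm(p, Vectors.norm(p, 2.0))
    val (memberships, squaredDists) = KMeans.degreesOfMembership(centersWithNorm, pointWithNorm, 2.0)
    // memberships sums to 1.0, and its largest entry is at the index that
    // model.predict(p) would return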