From d422be759b967ab5c4bef0f6a34aa53f1a3a4c77 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Mon, 9 Nov 2015 00:13:21 -0800 Subject: [PATCH] refactor --- .../mllib/clustering/BisectingKMeans.scala | 771 ++++++++---------- .../clustering/BisectingKMeansModel.scala | 73 +- .../clustering/JavaBisectingKMeansSuite.java | 83 +- .../BisectingKMeansModelSuite.scala | 129 --- .../clustering/BisectingKMeansSuite.scala | 296 ++++--- 5 files changed, 537 insertions(+), 815 deletions(-) delete mode 100644 mllib/src/test/scala/org/apache/spark/mllib/clustering/BisectingKMeansModelSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala index 1601d6c84e217..9a7916f75dcc7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala @@ -17,53 +17,45 @@ package org.apache.spark.mllib.clustering -import breeze.linalg.{Vector => BV, SparseVector => BSV, norm => breezeNorm} +import java.util.Random -import org.apache.spark.util.random.XORShiftRandom -import org.apache.spark.{Logging, SparkException} -import org.apache.spark.annotation.Since -import org.apache.spark.mllib.linalg.{Vector, Vectors} -import org.apache.spark.rdd.RDD +import scala.collection.mutable +import org.apache.spark.Logging +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors} +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel /** - * This is a divisive hierarchical clustering algorithm based on bisecting k-means algorithm. - * - * The main idea of this algorithm is based on "A comparison of document clustering techniques", - * M. Steinbach, G. Karypis and V. Kumar. Workshop on Text Mining, KDD, 2000. - * http://cs.fit.edu/~pkc/classes/ml-internet/papers/steinbach00tr.pdf - * - * However, we modified it to fit for Spark. This algorithm consists of the two main parts. + * A bisecting k-means algorithm based on the paper "A comparison of document clustering techniques" + * by Steinbach, Karypis, and Kumar, with modification to fit Spark. + * The algorithm starts from a single cluster that contains all points. + * Iteratively it finds divisible clusters on the bottom level and bisects each of them using + * k-means, until there are `k` leaf clusters in total or no leaf clusters are divisible. + * The bisecting steps of clusters on the same level are grouped together to increase parallelism. + * If bisecting all divisible clusters on the bottom level would result more than `k` leaf clusters, + * larger clusters get higher priority. * - * 1. Split clusters until the number of clusters will be enough to build a cluster tree - * 2. Build a cluster tree as a binary tree by the splitted clusters + * @param k the desired number of leaf clusters (default: 4). The actual number could be smaller if + * there are no divisible leaf clusters. 
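+ * Since every bisection turns one leaf cluster into two, reaching `k` leaf clusters
+ * takes at most `k - 1` successful splits.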
+ * @param maxIterations the max number of k-means iterations to split clusters (default: 20) + * @param minDivisibleClusterSize the minimum number of points (if >= 1.0) or the minimum proportion + * of points (if < 1.0) of a divisible cluster (default: 1) + * @param seed a random seed (default: hash value of the class name) * - * First, it splits clusters to their children clusters step by step, not considering a cluster - * will be included in the final cluster tree or not. That's because it makes the algorithm more - * efficient on Spark and splitting a cluster one by one is very slow. It will keep splitting until - * the number of clusters will be enough to build a cluster tree. Otherwise, it will stop splitting - * when there are no dividable clusters before the number of clusters will be sufficient. And - * it calculates the costs, such as average cost, entropy and so on, for building a cluster - * tree in the first part. The costs means how large the cluster is. That is, the cluster - * whose cost is maximum of all the clusters is the largest cluster. - * - * Second, it builds a cluster tree as a binary tree by the result of the first part. - * First of all, the cluster tree starts with only the root cluster which includes all points. - * So, there are two candidates which can be merged to the cluster tree. Those are the children of - * the root. Then, it picks up the larger child of the two and merge it to the cluster tree. - * After that, there are tree candidates to merge. Those are the smaller child of the root and - * the two children of the larger cluster of the root. It picks up the largest cluster of the tree - * and merge it to the * cluster tree. Like this, it continues to pick up the largest one of the - * candidates and merge it to the cluster tree until the desired number of clusters is reached. - * - * @param k tne desired number of clusters - * @param maxIterations the number of maximal iterations to split clusters - * @param seed a random seed + * @see [[http://glaros.dtc.umn.edu/gkhome/fetch/papers/docclusterKDDTMW00.pdf + * Steinbach, Karypis, and Kumar, A comparison of document clustering techniques, + * KDD Workshop on Text Mining, 2000.]] */ @Since("1.6.0") +@Experimental class BisectingKMeans private ( private var k: Int, private var maxIterations: Int, + private var minDivisibleClusterSize: Double, private var seed: Long) extends Logging { import BisectingKMeans._ @@ -72,34 +64,62 @@ class BisectingKMeans private ( * Constructs with the default configuration */ @Since("1.6.0") - def this() = this(2, 20, 1) + def this() = this(4, 20, 1.0, classOf[BisectingKMeans].getName.##) /** - * Sets the number of clusters you want + * Sets the desired number of leaf clusters (default: 4). + * The actual number could be smaller if there are no divisible leaf clusters. */ @Since("1.6.0") def setK(k: Int): this.type = { + require(k > 0, s"k must be positive but got $k.") this.k = k this } + /** + * Gets the desired number of leaf clusters. + */ @Since("1.6.0") def getK: Int = this.k /** - * Sets the number of maximal iterations in each clustering step + * Sets the max number of k-means iterations to split clusters (default: 20). */ @Since("1.6.0") def setMaxIterations(maxIterations: Int): this.type = { + require(maxIterations > 0, s"maxIterations must be positive but got $maxIterations.") this.maxIterations = maxIterations this } + /** + * Gets the max number of k-means iterations to split clusters. 
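+   * Note that this bounds the inner k-means loop run at each bisecting step, not the
+   * number of levels of the resulting tree.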
+ */ @Since("1.6.0") def getMaxIterations: Int = this.maxIterations /** - * Sets the random seed + * Sets the minimum number of points (if >= `1.0`) or the minimum proportion of points + * (if < `1.0`) of a divisible cluster (default: 1). + */ + @Since("1.6.0") + def setMinDivisibleClusterSize(minDivisibleClusterSize: Double): this.type = { + require(minDivisibleClusterSize > 0.0, + s"minDivisibleClusterSize must be positive but got $minDivisibleClusterSize.") + this.minDivisibleClusterSize = minDivisibleClusterSize + this + } + + /** + * Gets the minimum number of points (if >= `1.0`) or the minimum proportion of points + * (if < `1.0`) of a divisible cluster. + */ + @Since("1.6.0") + def getMinDivisibleClusterSize: Double = minDivisibleClusterSize + + /** + * Sets the random seed (default: hash value of the class name). */ @Since("1.6.0") def setSeed(seed: Long): this.type = { @@ -107,478 +127,363 @@ class BisectingKMeans private ( this } + /** + * Gets the random seed. + */ @Since("1.6.0") def getSeed: Long = this.seed /** - * Runs the bisecting k-means algorithm + * Runs the bisecting k-means algorithm. * @param input RDD of vectors * @return model for the bisecting kmeans */ @Since("1.6.0") def run(input: RDD[Vector]): BisectingKMeansModel = { - val sc = input.sparkContext - val startTime = System.currentTimeMillis() - var data = initData(input).cache() - var updatedDataHistory = Array.empty[RDD[(Long, BV[Double])]] - - // `clusterStats` is described as binary tree structure as Map - // `clusterStats(1)` means the root of a binary tree - // `clusterStats(2n)` and `clusterStats(2n+1)` are the children of `clusterStats(n)` - var leafClusterStats = summarizeClusters(data) - var dividableLeafClusters = leafClusterStats.filter(_._2.isDividable) - var clusterStats = leafClusterStats - - // the minimum number of nodes of a binary tree by given parameter - var step = 1 - val numNodeLimit = getMinimumNumNodesInTree(this.k) - // divide clusters until the number of clusters reachs the condition - // or there is no dividable cluster - while (clusterStats.size < numNodeLimit && dividableLeafClusters.nonEmpty) { - logInfo(s"${sc.appName} starts step ${step}") - - // can be clustered if the number of divided clusterStats is equal to 0 - // TODO Remove non-leaf cluster stats from `leafClusterStats` - val dividedData = divideClusters(data, dividableLeafClusters, maxIterations, seed).cache() - leafClusterStats = summarizeClusters(dividedData) - dividableLeafClusters = leafClusterStats.filter(_._2.isDividable) - clusterStats = clusterStats ++ leafClusterStats - - // keep recent 2 cached RDDs in order to run more quickly - updatedDataHistory = updatedDataHistory ++ Array(dividedData) - data = dividedData - step += 1 - if (updatedDataHistory.length > 1) { - val head = updatedDataHistory.head - updatedDataHistory = updatedDataHistory.tail - head.unpersist() - } + if (input.getStorageLevel == StorageLevel.NONE) { + logWarning(s"The input RDD ${input.id} is not directly cached, which may hurt performance if" + + " its parent RDDs are also not cached.") } - // create a map of cluster node with their costs - val nodes = createClusterNodes(data, clusterStats) - // unpersist RDDs - data.unpersist() - updatedDataHistory.foreach(_.unpersist()) - - // build a cluster tree by Map class which is expressed - logInfo(s"Building the cluster tree is started in ${sc.appName}") - val root = buildTree(nodes, ROOT_INDEX_KEY, this.k) - if (root.isEmpty) { - new SparkException("Failed to build a cluster tree from a Map type of 
clusterStats") + val d = input.map(_.size).first() + logInfo(s"Feature dimension: $d.") + // Compute and cache vector norms for fast distance computation. + val norms = input.map(v => Vectors.norm(v, 2.0)).persist(StorageLevel.MEMORY_AND_DISK) + val vectors = input.zip(norms).map { case (x, norm) => new VectorWithNorm(x, norm) } + var assignments = vectors.map(v => (ROOT_INDEX, v)) + var activeClusters = summarize(d, assignments) + val rootSummary = activeClusters(ROOT_INDEX) + val n = rootSummary.size + logInfo(s"Number of points: $n.") + logInfo(s"Initial cost: ${rootSummary.cost}.") + val minSize = if (minDivisibleClusterSize >= 1.0) { + math.ceil(minDivisibleClusterSize).toLong + } else { + math.ceil(minDivisibleClusterSize * n).toLong } - - // set the elapsed time for training - val finishTime = (System.currentTimeMillis() - startTime) / 1000.0 - logInfo(s"Elapsed Time for ${this.getClass.getSimpleName} Training: ${finishTime} [sec]") - - // make a bisecting kmeans model - val model = new BisectingKMeansModel(root.get) - val leavesNodes = model.getClusters - if (leavesNodes.length < this.k) { - logWarning(s"# clusters is less than you want: ${leavesNodes.length} / ${k}") + logInfo(s"The minimum number of points of a divisible cluster is $minSize.") + var inactiveClusters = mutable.Seq.empty[(Long, ClusterSummary)] + val random = new Random(seed) + var numLeafClustersNeeded = k - 1 + var level = 1 + while (activeClusters.nonEmpty && numLeafClustersNeeded > 0 && level < 63) { + // Divisible clusters are sufficiently large and have non-trivial cost. + var divisibleClusters = activeClusters.filter { case (_, summary) => + (summary.size >= minSize) && (summary.cost > MLUtils.EPSILON * summary.size) + } + // If we don't need all divisible clusters, take the larger ones. + if (divisibleClusters.size > numLeafClustersNeeded) { + divisibleClusters = divisibleClusters.toSeq.sortBy { case (_, summary) => + -summary.size + }.take(numLeafClustersNeeded) + .toMap + } + if (divisibleClusters.nonEmpty) { + val divisibleIndices = divisibleClusters.keys.toSet + logInfo(s"Dividing ${divisibleIndices.size} clusters on level $level.") + var newClusterCenters = divisibleClusters.flatMap { case (index, summary) => + val (left, right) = splitCenter(summary.center, random) + Iterator((leftChildIndex(index), left), (rightChildIndex(index), right)) + }.map(identity) // workaround for a Scala bug (SI-7005) that produces a not serializable map + var newClusters: Map[Long, ClusterSummary] = null + var newAssignments: RDD[(Long, VectorWithNorm)] = null + for (iter <- 0 until maxIterations) { + newAssignments = updateAssignments(assignments, divisibleIndices, newClusterCenters) + .filter { case (index, _) => + divisibleIndices.contains(parentIndex(index)) + } + newClusters = summarize(d, newAssignments) + newClusterCenters = newClusters.mapValues(_.center).map(identity) + } + // TODO: Unpersist old indices. + val indices = updateAssignments(assignments, divisibleIndices, newClusterCenters).keys + .persist(StorageLevel.MEMORY_AND_DISK) + assignments = indices.zip(vectors) + inactiveClusters ++= activeClusters + activeClusters = newClusters + numLeafClustersNeeded -= divisibleClusters.size + } else { + logInfo(s"None active and divisible clusters left on level $level. 
Stop iterations.") + inactiveClusters ++= activeClusters + activeClusters = Map.empty + } + level += 1 } - model + val clusters = activeClusters ++ inactiveClusters + val root = buildTree(clusters) + new BisectingKMeansModel(root) } + + /** + * Java-friendly version of [[run(RDD[Vector])*]] + */ + def run(data: JavaRDD[Vector]): BisectingKMeansModel = run(data.rdd) } +private object BisectingKMeans extends Serializable { -private[clustering] object BisectingKMeans { + /** The index of the root node of a tree. */ + private val ROOT_INDEX: Long = 1 - val ROOT_INDEX_KEY: Long = 1 + private val MAX_DIVISIBLE_CLUSTER_INDEX: Long = Long.MaxValue / 2 - /** - * Finds the closes cluster's center - * - * @param metric a distance metric - * @param centers centers of the clusters - * @param point a target point - * @return an index of the array of clusters - */ - def findClosestCenter(metric: (BV[Double], BV[Double]) => Double) - (centers: Seq[BV[Double]])(point: BV[Double]): Int = { - // get the closest index - centers.zipWithIndex.map { case (center, idx) => (metric(center, point), idx)}.minBy(_._1)._2 - } - - /** - * Gets the minimum number of nodes in a tree by the number of leaves - * - * @param k: the number of leaf nodes - */ - def getMinimumNumNodesInTree(k: Int): Int = { - // the calculation is same as `math.pow(2, multiplier)` - val multiplier = math.ceil(math.log(k) / math.log(2.0)) + 1 - 1 << multiplier.toInt + /** Returns the left child index of the given node index. */ + private def leftChildIndex(index: Long): Long = { + require(index <= MAX_DIVISIBLE_CLUSTER_INDEX, s"Child index out of bound: 2 * $index.") + 2 * index } - /** - * Summarizes data by each cluster as Map - * - * @param data pairs of point and its cluster index - */ - def summarizeClusters( - data: RDD[(Long, BV[Double])] - ): collection.Map[Long, BisectingClusterStat] = { - - // sum the number of node and points of each cluster - val stats = data.map {case (idx, p) => - (idx, (p, 1L)) - }.reduceByKey {case ((p1, n1), (p2, n2)) => (p1 + p2, n1 + n2) }.collectAsMap() - - // calculate within-cluster sum of squares of each cluster - val bcStats = data.sparkContext.broadcast(stats) - val sumOfSquaresMap = data.map { case (idx, point) => - val meanPoint = bcStats.value.apply(idx)._1 :/ bcStats.value.apply(idx)._2.toDouble - (idx, (point - meanPoint) dot (point - meanPoint)) - }.reduceByKey(_ + _).collectAsMap() - - stats.map { case (idx, (sumPoint, n)) => - val meanPoint = sumPoint :/ n.toDouble - val sumOfSquares = math.abs(sumOfSquaresMap(idx)) - (idx, new BisectingClusterStat(n, meanPoint, sumOfSquares)) - } + /** Returns the right child index of the given node index. */ + private def rightChildIndex(index: Long): Long = { + require(index <= MAX_DIVISIBLE_CLUSTER_INDEX, s"Child index out of bound: 2 * $index + 1.") + 2 * index + 1 } - /** - * Assigns the initial cluster index id to all data - */ - def initData(data: RDD[Vector]): RDD[(Long, BV[Double])] = { - data.map { v: Vector => (ROOT_INDEX_KEY, v.toBreeze)} + /** Returns the parent index of the given node index, or 0 if the input is 1 (root). */ + private def parentIndex(index: Long): Long = { + index / 2 } /** - * Gets the initial centers for bisecting k-means - * - * @param stats pairs of cluster index and cluster statistics - * @param seed random seed + * Summarizes data by each cluster as Map. 
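+   * Keys follow the implicit binary-tree numbering used in this object: the root cluster has
+   * index 1 and the children of cluster n have indices 2n and 2n + 1, e.g.,
+   * `leftChildIndex(1) == 2`, `rightChildIndex(1) == 3`, and `parentIndex(3) == 1`.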
+ * @param d feature dimension + * @param assignments pairs of point and its cluster index + * @return a map from cluster indices to corresponding cluster summaries */ - def initNextCenters( - stats: collection.Map[Long, BisectingClusterStat], - seed: Long - ): collection.Map[Long, BV[Double]] = { - - val random = new XORShiftRandom() - random.setSeed(seed) - val nextCenters = stats.flatMap { case (idx, clusterStats) => - val center = clusterStats.mean - val stdev = math.sqrt(clusterStats.sumOfSquares) / clusterStats.rows - val activeKeys = clusterStats.mean.activeKeysIterator.toArray - val activeValues = activeKeys.map(i => random.nextDouble() * stdev) - val perturbation = new BSV[Double](activeKeys, activeValues, clusterStats.mean.size) - Array((2 * idx, center - perturbation), (2 * idx + 1, center + perturbation)) - }.toMap - nextCenters + private def summarize( + d: Int, + assignments: RDD[(Long, VectorWithNorm)]): Map[Long, ClusterSummary] = { + assignments.aggregateByKey(new ClusterSummaryAggregator(d))( + seqOp = (agg, v) => agg.add(v), + combOp = (agg1, agg2) => agg1.merge(agg2) + ).mapValues(_.summary) + .collect().toMap } /** - * Divides clusters according to their statistics - * - * @param data pairs of point and its cluster index - * @param clusterStats target clusters to divide - * @param maxIterations the maximum iterations to calculate clusters statistics - * @param seed random seed + * Cluster summary aggregator. + * @param d feature dimension */ - def divideClusters( - data: RDD[(Long, BV[Double])], - clusterStats: collection.Map[Long, BisectingClusterStat], - maxIterations: Int, - seed: Long - ): RDD[(Long, BV[Double])] = { - - val sc = data.sparkContext - val appName = sc.appName - - // get keys of dividable clusters - val dividableClusterStats = clusterStats.filter { case (idx, cluster) => cluster.isDividable } - if (dividableClusterStats.isEmpty) { - return data + private class ClusterSummaryAggregator(val d: Int) extends Serializable { + private var n: Long = 0L + private val sum: Vector = Vectors.zeros(d) + private var sumSq: Double = 0.0 + + /** Adds a point. 
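+     * Maintains `(n, sum, sumSq)` so that the cluster cost can later be recovered as
+     * `sumSq - n * ||mean||^2`, which equals `sum(||x - mean||^2)` by expanding the square
+     * around the mean.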
*/ + def add(v: VectorWithNorm): this.type = { + n += 1L + // TODO: use a numerically stable approach to estimate cost + sumSq += v.norm * v.norm + BLAS.axpy(1.0, v.vector, sum) + this } - // extract dividable input data - val dividableData = data.filter { case (idx, point) => dividableClusterStats.contains(idx)} - // get next initial centers - var newCenters = initNextCenters(dividableClusterStats, seed) - var nextData = data - var subIter = 0 - var totalSumOfSquares = Double.MaxValue - var oldTotalSumOfSquares = Double.MaxValue - var relativeError = Double.MaxValue - val dimension = dividableData.first()._2.size - - // TODO Supports distance metrics other Euclidean distance metric - val metric = (bv1: BV[Double], bv2: BV[Double]) => breezeNorm(bv1 - bv2, 2.0) - val bcMetric = sc.broadcast(metric) - - while (subIter < maxIterations && relativeError > 1e-4) { - // TODO add a set method for the threshold, instead of 1e-4 - - // convert each index into the closest child index - val bcNewCenters = sc.broadcast(newCenters) - nextData = dividableData.map { case (idx, point) => - // calculate next index number - val childIndexes = Array(2 * idx, 2 * idx + 1) - val childrenCenters = childIndexes - .filter(x => bcNewCenters.value.contains(x)).map(bcNewCenters.value(_)) - if (childrenCenters.length != 2) { - new SparkException(s"A node whose index is ${idx} doesn't have two children") - } - val closestIndex = findClosestCenter(bcMetric.value)(childrenCenters)(point) - val nextIndex = 2 * idx + closestIndex - (nextIndex, point) - } + /** Merges another aggregator. */ + def merge(other: ClusterSummaryAggregator): this.type = { + n += other.n + sumSq += other.sumSq + BLAS.axpy(1.0, other.sum, sum) + this + } - // summarize each cluster - val zeroValue = (BV.zeros[Double](dimension), 0L, 0.0) - val seqOp = (acc: (BV[Double], Long, Double), point: BV[Double]) => { - val sums = acc._1 + point - val n = acc._2 + 1L - val sumOfNorm = acc._3 + (point dot point) - (sums, n, sumOfNorm) + /** Returns the summary. */ + def summary: ClusterSummary = { + val mean = sum.copy + if (n > 0L) { + BLAS.scal(1.0 / n, mean) } - val comOp = (acc1: (BV[Double], Long, Double), acc2: (BV[Double], Long, Double)) => - (acc1._1 + acc2._1, acc1._2 + acc2._2, acc1._3 + acc1._3) - val tempStats = nextData.aggregateByKey(zeroValue)(seqOp, comOp).collectAsMap() - - // calculate the center of each cluster - newCenters = tempStats.map {case (idx, (sums, n, sumOfNorm)) => (idx, sums :/ n.toDouble)} - - totalSumOfSquares = tempStats.map{case (idx, (sums, n, sumOfNorm)) => sumOfNorm}.sum - relativeError = math.abs(totalSumOfSquares - oldTotalSumOfSquares) / totalSumOfSquares - oldTotalSumOfSquares = totalSumOfSquares - subIter += 1 + val center = new VectorWithNorm(mean) + val cost = math.max(sumSq - n * center.norm * center.norm, 0.0) + new ClusterSummary(n, center, cost) } - nextData } /** - * Creates the map of cluster stats to the map of cluster nodes with their costs + * Bisects a cluster center. 
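+   * The two initial child centers are `center - eps * noise` and `center + eps * noise`,
+   * where `eps = 1e-4 * ||center||` and `noise` is drawn uniformly from `[0, 1)^d`, i.e., a
+   * small symmetric perturbation along a random direction.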
* - * @param data input data - * @param stats map of cluster stats which is described as a binary tree + * @param center current cluster center + * @param random a random number generator + * @return initial centers */ - def createClusterNodes( - data: RDD[(Long, BV[Double])], - stats: collection.Map[Long, BisectingClusterStat] - ): collection.Map[Long, BisectingClusterNode] = { - // TODO: support other cost, such as entropy - createClusterNodesWithAverageCost(data, stats) + private def splitCenter( + center: VectorWithNorm, + random: Random): (VectorWithNorm, VectorWithNorm) = { + val d = center.vector.size + val norm = center.norm + val level = 1e-4 * norm + val noise = Vectors.dense(Array.fill(d)(random.nextDouble())) + val left = center.vector.copy + BLAS.axpy(-level, noise, left) + val right = center.vector.copy + BLAS.axpy(level, noise, right) + (new VectorWithNorm(left), new VectorWithNorm(right)) } /** - * Creates the map of cluster stats to the map of cluster nodes with their average costs + * Updates assignments. + * @param assignments current assignments + * @param divisibleIndices divisible cluster indices + * @param newClusterCenters new cluster centers + * @return new assignments */ - private def createClusterNodesWithAverageCost( - data: RDD[(Long, BV[Double])], - stats: collection.Map[Long, BisectingClusterStat] - ): collection.Map[Long, BisectingClusterNode] = { - - stats.map { case (idx, clusterStats) => - val rows = clusterStats.rows - val center = clusterStats.mean - val cost = math.sqrt(clusterStats.sumOfSquares) / rows - idx -> new BisectingClusterNode(Vectors.fromBreeze(center), rows, cost) + private def updateAssignments( + assignments: RDD[(Long, VectorWithNorm)], + divisibleIndices: Set[Long], + newClusterCenters: Map[Long, VectorWithNorm]): RDD[(Long, VectorWithNorm)] = { + assignments.map { case (index, v) => + if (divisibleIndices.contains(index)) { + val children = Seq(leftChildIndex(index), rightChildIndex(index)) + val selected = children.minBy { child => + KMeans.fastSquaredDistance(newClusterCenters(child), v) + } + (selected, v) + } else { + (index, v) + } } } /** - * Builds a cluster tree from a Map of clusters - * - * @param treeMap divided clusters as a Map class - * @param rootIndex index you want to start - * @param k the number of clusters you want - * @return a built cluster tree + * Builds a clustering tree by re-indexing internal and leaf clusters. 
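+   * Leaf clusters are re-indexed 0, 1, 2, ... in the order they are visited, while internal
+   * clusters get -1, -2, -3, ..., so the sign of a node index indicates whether it is a leaf.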
+ * @param clusters a map from cluster indices to corresponding cluster summaries + * @return the root node of the clustering tree */ - private def buildTree( - treeMap: collection.Map[Long, BisectingClusterNode], - rootIndex: Long, - k: Int): Option[BisectingClusterNode] = { - - // if there is no index in the Map - if (!treeMap.contains(rootIndex)) return None - - // build a cluster tree if the queue is empty or until the number of leaf clusters is enough - var numLeavesClusters = 1 - val root = treeMap(rootIndex) - var leavesQueue = Map(rootIndex -> root) - while (leavesQueue.nonEmpty && numLeavesClusters < k) { - // pick up the largest cluster by the maximum cost of all the clusters - val mostScattered = leavesQueue.maxBy(_._2.cost) - val mostScatteredKey = mostScattered._1 - val mostScatteredCluster = mostScattered._2 - - // relate the most scattered cluster to its children clusters - val childrenIndexes = Array(2 * mostScatteredKey, 2 * mostScatteredKey + 1) - if (childrenIndexes.forall(i => treeMap.contains(i))) { - // insert children to the most scattered cluster - val children = childrenIndexes.map(i => treeMap(i)) - mostScatteredCluster.insert(children) - - // calculate the local dendrogram height - // TODO Supports distance metrics other Euclidean distance metric - val metric = (bv1: BV[Double], bv2: BV[Double]) => breezeNorm(bv1 - bv2, 2.0) - val localHeight = children - .map(child => metric(child.center.toBreeze, mostScatteredCluster.center.toBreeze)).max - mostScatteredCluster.setLocalHeight(localHeight) - - // update the queue - leavesQueue = leavesQueue ++ childrenIndexes.map(i => i -> treeMap(i)).toMap - numLeavesClusters += 1 + private def buildTree(clusters: Map[Long, ClusterSummary]): ClusteringTreeNode = { + var leafIndex = 0 + var internalIndex = -1 + + /** + * Builds a subtree from this given node index. + */ + def buildSubTree(rawIndex: Long): ClusteringTreeNode = { + val cluster = clusters(rawIndex) + val size = cluster.size + val center = cluster.center + val cost = cluster.cost + val isInternal = clusters.contains(leftChildIndex(rawIndex)) + if (isInternal) { + val index = internalIndex + internalIndex -= 1 + val leftIndex = leftChildIndex(rawIndex) + val rightIndex = rightChildIndex(rawIndex) + val height = math.sqrt(Seq(leftIndex, rightIndex).map { childIndex => + KMeans.fastSquaredDistance(center, clusters(childIndex).center) + }.max) + val left = buildSubTree(leftIndex) + val right = buildSubTree(rightIndex) + new ClusteringTreeNode(index, size, center, cost, height, Array(left, right)) + } else { + val index = leafIndex + leafIndex += 1 + val height = 0.0 + new ClusteringTreeNode(index, size, center, cost, height, Array.empty) } - - // remove the cluster which is involved to the cluster tree - leavesQueue = leavesQueue.filterNot(_ == mostScattered) } - Some(root) + + buildSubTree(ROOT_INDEX) } + + /** + * Summary of a cluster. + * + * @param size the number of points within this cluster + * @param center the center of the points within this cluster + * @param cost the sum of squared distances to the center + */ + private case class ClusterSummary(size: Long, center: VectorWithNorm, cost: Double) } /** - * A cluster as a tree node which can have its sub nodes + * Represents a node in a clustering tree. 
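+ * For example, a hypothetical snippet (`model` denotes a trained [[BisectingKMeansModel]] and
+ * `point` a [[Vector]] of matching dimension):
+ * {{{
+ *   val leafIndex = model.root.predict(point) // index of the closest leaf cluster
+ *   val path = model.root.predictPath(point)  // nodes visited during the greedy descent
+ * }}}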
* - * @param center the center of the cluster - * @param rows the number of rows in the cluster - * @param cost how large a cluster is - * @param localHeight the maximal distance between this node and its children - * @param parent the parent cluster of the cluster - * @param children the children nodes of the cluster + * @param index node index, negative for internal nodes and non-negative for leaf nodes + * @param size size of the cluster + * @param centerWithNorm cluster center with norm + * @param cost cost of the cluster, i.e., the sum of squared distances to the center + * @param height height of the node in the dendrogram. Currently this is defined as the max distance + * from the center to the centers of the children's, but subject to change. + * @param children children nodes */ @Since("1.6.0") -class BisectingClusterNode private ( - @Since("1.6.0") val center: Vector, - @Since("1.6.0") val rows: Long, - @Since("1.6.0") val cost: Double, - private var localHeight: Double, - private var parent: Option[BisectingClusterNode], - private var children: Seq[BisectingClusterNode]) extends Serializable { - - require(!cost.isNaN) - - @Since("1.6.0") - def this(center: Vector, rows: Long, cost: Double) = - this(center, rows, cost, 0.0, None, Array.empty[BisectingClusterNode]) - - /** - * Inserts a sub node as its child - * - * @param child inserted sub node - */ - @Since("1.6.0") - def insert(child: BisectingClusterNode) { - insert(Array(child)) +@Experimental +class ClusteringTreeNode private[clustering] ( + val index: Int, + val size: Long, + private val centerWithNorm: VectorWithNorm, + val cost: Double, + val height: Double, + val children: Array[ClusteringTreeNode]) extends Serializable { + + /** Whether this is a leaf node. */ + val isLeaf: Boolean = children.isEmpty + + require((isLeaf && index >= 0) || (!isLeaf && index < 0)) + + /** Cluster center. */ + def center: Vector = centerWithNorm.vector + + /** Predicts the leaf cluster node index that the input point belongs to. */ + def predict(point: Vector): Int = { + val (index, _) = predict(new VectorWithNorm(point)) + index } - /** - * Inserts sub nodes as its children - * - * @param children inserted sub nodes - */ - @Since("1.6.0") - def insert(children: Array[BisectingClusterNode]) { - this.children = this.children ++ children - children.foreach(child => child.parent = Some(this)) + /** Returns the full prediction path from root to leaf. */ + def predictPath(point: Vector): Array[ClusteringTreeNode] = { + predictPath(new VectorWithNorm(point)).toArray } - /** - * Converts the tree into Array class - * the sub nodes are recursively expanded - * - * @return an Array class which the cluster tree is expanded - */ - @Since("1.6.0") - def toArray: Array[BisectingClusterNode] = { - val array = this.children.size match { - case 0 => Array(this) - case _ => Array(this) ++ this.children.flatMap(child => child.toArray.toIterator) - } - array.sortWith { case (a, b) => - a.getDepth < b.getDepth && a.cost < b.cost && a.rows < b.rows + /** Returns the full prediction path from root to leaf. 
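+   * At each internal node, the child whose center is closest in squared distance to the point
+   * is followed.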
*/ + private def predictPath(pointWithNorm: VectorWithNorm): List[ClusteringTreeNode] = { + if (isLeaf) { + this :: Nil + } else { + val selected = children.minBy { child => + KMeans.fastSquaredDistance(child.centerWithNorm, pointWithNorm) + } + selected :: selected.predictPath(pointWithNorm) } } /** - * Gets the depth of the cluster in the tree - * - * @return the depth from the root + * Computes the cost (squared distance to the predicted leaf cluster center) of the input point. */ - @Since("1.6.0") - def getDepth: Int = { - this.parent match { - case None => 0 - case _ => 1 + this.parent.get.getDepth - } + def computeCost(point: Vector): Double = { + val (_, cost) = predict(new VectorWithNorm(point)) + cost } /** - * Finds a leaf which is the closest under the node - * - * @param point target point + * Predicts the cluster index and the cost of the input point. */ - @Since("1.6.0") - def findClosestLeaf( - point: Vector, - metric: (BV[Double], BV[Double]) => Double - ): BisectingClusterNode = { - this.children.size match { - case 0 => this - case _ => { - val bv = point.toBreeze - val centers = this.children.map(_.center).map(_.toBreeze) - val closestIndex = BisectingKMeans.findClosestCenter(metric)(centers)(bv) - this.children(closestIndex).findClosestLeaf(point, metric) - } - } + private def predict(pointWithNorm: VectorWithNorm): (Int, Double) = { + predict(pointWithNorm, KMeans.fastSquaredDistance(centerWithNorm, pointWithNorm)) } /** - * Gets the leaves nodes in the cluster tree + * Predicts the cluster index and the cost of the input point. + * @param pointWithNorm input point + * @param cost the cost to the current center + * @return (predicted leaf cluster index, cost) */ - @Since("1.6.0") - def getLeavesNodes: Array[BisectingClusterNode] = { - this.toArray.filter(_.isLeaf).sortBy(_.center.toArray.sum) + private def predict(pointWithNorm: VectorWithNorm, cost: Double): (Int, Double) = { + if (isLeaf) { + (index, cost) + } else { + val (selectedChild, minCost) = children.map { child => + (child, KMeans.fastSquaredDistance(child.centerWithNorm, pointWithNorm)) + }.minBy(_._2) + selectedChild.predict(pointWithNorm, minCost) + } } - @Since("1.6.0") - def isLeaf: Boolean = this.children.isEmpty - - @Since("1.6.0") - def getParent: Option[BisectingClusterNode] = this.parent - - @Since("1.6.0") - def getChildren: Seq[BisectingClusterNode] = this.children - /** - * Gets the dendrogram height of the cluster at the cluster tree. - * A dendrogram height is different from a local height. - * A dendrogram height means a total height of a node in a tree. - * A local height means a maximum distance between a node and its children. - * - * @return the dendrogram height + * Returns all leaf nodes from this node. 
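+   * Leaves are visited left to right, the same order in which leaf indices were assigned, so
+   * when called on the root the i-th returned leaf has index i.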
*/ - @Since("1.6.0") - def getHeight: Double = { - this.children.size match { - case 0 => 0.0 - case _ => this.localHeight + this.children.map(_.getHeight).max + def leafNodes: Array[ClusteringTreeNode] = { + if (isLeaf) { + Array(this) + } else { + children.flatMap(_.leafNodes) } } - - @Since("1.6.0") - def setLocalHeight(height: Double): Unit = this.localHeight = height -} - - -/** - * This class is used for maneging a cluster statistics - * - * @param rows the number of points - * @param mean the sum of points - * @param sumOfSquares the sum of squares of points - */ -private[clustering] case class BisectingClusterStat ( - rows: Long, - mean: BV[Double], - sumOfSquares: Double) extends Serializable { - require(sumOfSquares >= 0.0) - - def isDividable: Boolean = sumOfSquares > 0 && rows >= 2 } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala index 38f4695eb0d26..5015f1540d920 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala @@ -17,80 +17,79 @@ package org.apache.spark.mllib.clustering -import breeze.linalg.{Vector => BV, norm => breezeNorm} - import org.apache.spark.Logging -import org.apache.spark.annotation.Since +import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.api.java.JavaRDD import org.apache.spark.mllib.linalg.Vector import org.apache.spark.rdd.RDD /** - * This class is used for the model of the bisecting kmeans + * Clustering model produced by [[BisectingKMeans]]. + * The prediction is done level-by-level from the root node to a leaf node, and at each node among + * its children the closest to the input point is selected. * - * @param node a cluster as a tree node + * @param root the root node of the clustering tree */ @Since("1.6.0") +@Experimental class BisectingKMeansModel @Since("1.6.0") ( - @Since("1.6.0") val node: BisectingClusterNode + @Since("1.6.0") val root: ClusteringTreeNode ) extends Serializable with Logging { + /** + * Leaf cluster centers. + */ @Since("1.6.0") - def getClusters: Array[BisectingClusterNode] = this.node.getLeavesNodes + def clusterCenters: Array[Vector] = root.leafNodes.map(_.center) - @Since("1.6.0") - def getCenters: Array[Vector] = this.getClusters.map(_.center) + /** + * Number of leaf clusters. + */ + lazy val k: Int = clusterCenters.length /** - * Predicts the closest cluster by one point + * Predicts the index of the cluster that the input point belongs to. */ @Since("1.6.0") - def predict(vector: Vector): Int = { - // TODO Supports distance metrics other Euclidean distance metric - val metric = (bv1: BV[Double], bv2: BV[Double]) => breezeNorm(bv1 - bv2, 2.0) - val closestLeafNode = this.node.findClosestLeaf(vector, metric) - - val closestCenter = closestLeafNode.center - val centers = this.getCenters.map(_.toBreeze) - BisectingKMeans.findClosestCenter(metric)(centers)(closestCenter.toBreeze) + def predict(point: Vector): Int = { + root.predict(point) } /** - * Predicts the closest cluster by RDD of the points + * Predicts the indices of the clusters that the input points belong to. 
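+   * The returned indices are leaf cluster indices in `0 until k` and correspond to positions
+   * in [[clusterCenters]].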
*/ @Since("1.6.0") - def predict(data: RDD[Vector]): RDD[Int] = { - val sc = data.sparkContext - data.map { p => predict(p) } + def predict(points: RDD[Vector]): RDD[Int] = { + points.map { p => root.predict(p) } } /** - * Predicts the closest cluster by RDD of the points for Java + * Java-friendly version of [[predict(RDD[Vector])*]] */ @Since("1.6.0") def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] = predict(points.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Integer]] /** - * Computes Within Set Sum of Squared Error(WSSSE) + * Computes the squared distance between the input point and the cluster center it belongs to. + */ + @Since("1.6.0") + def computeCost(point: Vector): Double = { + root.computeCost(point) + } + + /** + * Computes the sum of squared distances between the input points and their corresponding cluster + * centers. */ @Since("1.6.0") def computeCost(data: RDD[Vector]): Double = { - val bvCenters = this.getCenters.map(_.toBreeze) - data.context.broadcast(bvCenters) - val distances = data.map {point => - val bvPoint = point.toBreeze - val metric = (bv1: BV[Double], bv2: BV[Double]) => breezeNorm(bv1 - bv2, 2.0) - val idx = BisectingKMeans.findClosestCenter(metric)(bvCenters)(bvPoint) - val closestCenter = bvCenters(idx) - val distance = metric(bvPoint, closestCenter) - distance - } - distances.sum() + data.map(root.computeCost).sum() } + /** + * Java-friendly version of [[computeCost(RDD[Vector])*]]. + */ @Since("1.6.0") def computeCost(data: JavaRDD[Vector]): Double = this.computeCost(data.rdd) - } - diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaBisectingKMeansSuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaBisectingKMeansSuite.java index 926bd54e54424..a714620ff7e4b 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaBisectingKMeansSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaBisectingKMeansSuite.java @@ -18,13 +18,12 @@ package org.apache.spark.mllib.clustering; import java.io.Serializable; -import java.util.List; +import com.google.common.collect.Lists; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import com.google.common.collect.Lists; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -46,63 +45,29 @@ public void tearDown() { } @Test - public void runWithSmallData() { - List points = Lists.newArrayList( - Vectors.dense(1.0, 2.0, 6.0), - Vectors.dense(1.0, 3.0, 0.0), - Vectors.dense(1.0, 4.0, 6.0) - ); - - Vector expectedCenter = Vectors.dense(1.0, 3.0, 4.0); - - JavaRDD data = sc.parallelize(points, 2); - BisectingKMeans algo = new BisectingKMeans().setK(1); - BisectingKMeansModel model = algo.run(data.rdd()); - assertEquals(1, model.getCenters().length); - assertEquals(expectedCenter, model.getCenters()[0]); - } - - @Test - public void runWithDenseVectors() { - int numClusters = 5; - List points = Lists.newArrayList(); - for (int i = 0; i < 99; i++) { - Double elm = (double)(i % numClusters); - Vector point = Vectors.dense(elm, elm); - points.add(point); - } - JavaRDD data = sc.parallelize(points, 2); - BisectingKMeans algo = new BisectingKMeans().setK(numClusters); - BisectingKMeansModel model = algo.run(data.rdd()); - Vector[] centers = model.getCenters(); - assertEquals(numClusters, centers.length); - assertEquals(Vectors.dense(0.0, 0.0), centers[0]); - assertEquals(Vectors.dense(1.0, 1.0), centers[1]); - 
assertEquals(Vectors.dense(2.0, 2.0), centers[2]); - assertEquals(Vectors.dense(3.0, 3.0), centers[3]); - assertEquals(Vectors.dense(4.0, 4.0), centers[4]); - } + public void twoDimensionalData() { + JavaRDD points = sc.parallelize(Lists.newArrayList( + Vectors.dense(4, -1), + Vectors.dense(4, 1), + Vectors.sparse(2, new int[] {0}, new double[] {1.0}) + ), 2); - @Test - public void runWithSparseVectors() { - int numClusters = 5; - List points = Lists.newArrayList(); - for (int i = 0; i < 99; i++) { - int elm = i % numClusters; - int indexes[] = {elm}; - double values[] = {elm}; - Vector point = Vectors.sparse(numClusters, indexes, values); - points.add(point); + BisectingKMeans bkm = new BisectingKMeans() + .setK(4) + .setMaxIterations(2) + .setSeed(1L); + BisectingKMeansModel model = bkm.run(points); + Assert.assertEquals(3, model.k()); + Assert.assertArrayEquals(new double[] {3.0, 0.0}, model.root().center().toArray(), 1e-12); + for (ClusteringTreeNode child: model.root().children()) { + double[] center = child.center().toArray(); + if (center[0] > 2) { + Assert.assertEquals(2, child.size()); + Assert.assertArrayEquals(new double[] {4.0, 0.0}, center, 1e-12); + } else { + Assert.assertEquals(1, child.size()); + Assert.assertArrayEquals(new double[] {1.0, 0.0}, center, 1e-12); + } } - JavaRDD data = sc.parallelize(points, 2); - BisectingKMeans algo = new BisectingKMeans().setK(numClusters); - BisectingKMeansModel model = algo.run(data.rdd()); - Vector[] centers = model.getCenters(); - assertEquals(numClusters, centers.length); - assertEquals(points.get(0), centers[0]); - assertEquals(points.get(1), centers[1]); - assertEquals(points.get(2), centers[2]); - assertEquals(points.get(3), centers[3]); - assertEquals(points.get(4), centers[4]); } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/BisectingKMeansModelSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/BisectingKMeansModelSuite.scala deleted file mode 100644 index ceac039efc8d0..0000000000000 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/BisectingKMeansModelSuite.scala +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.mllib.clustering - -import org.scalatest.BeforeAndAfterEach - -import org.apache.spark.SparkFunSuite -import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.mllib.util.MLlibTestSparkContext - -class BisectingKMeansModelSuite - extends SparkFunSuite with MLlibTestSparkContext with BeforeAndAfterEach { - - test("clustering dense vectors") { - val app = new BisectingKMeans().setK(5).setSeed(1) - - val localData = (1 to 100).toSeq.map { i => - val label = i % 5 - val vector = Vectors.dense(label, label, label) - (label, vector) - } - val data = sc.parallelize(localData.map(_._2)) - val model = app.run(data) - - val clusters = model.getClusters - assert(clusters.isInstanceOf[Array[BisectingClusterNode]]) - assert(clusters.length === 5) - - val centers = model.getCenters.sortBy(_.toArray.sum) - assert(centers.length === 5) - assert(centers(0) === Vectors.dense(0.0, 0.0, 0.0)) - assert(centers(1) === Vectors.dense(1.0, 1.0, 1.0)) - assert(centers(2) === Vectors.dense(2.0, 2.0, 2.0)) - assert(centers(3) === Vectors.dense(3.0, 3.0, 3.0)) - assert(centers(4) === Vectors.dense(4.0, 4.0, 4.0)) - - // predict with one vector - assert(model.predict(Vectors.dense(0.0, 0.0, 0.0)) === 0) - assert(model.predict(Vectors.dense(0.5, 0.5, 0.5)) === 0) - assert(model.predict(Vectors.dense(1.0, 1.0, 1.0)) === 1) - assert(model.predict(Vectors.dense(2.0, 2.0, 2.0)) === 2) - assert(model.predict(Vectors.dense(3.0, 3.0, 3.0)) === 3) - assert(model.predict(Vectors.dense(4.0, 4.0, 4.0)) === 4) - - // predict with a RDD - val predicted = model.predict(data).collect() - assert(predicted === localData.map(_._1)) - - // compute WSSSE - assert(model.computeCost(data) === 0.0) - } - - test("clustering sparse vectors") { - val app = new BisectingKMeans().setK(5).setSeed(1) - - val localData = (1 to 100).toSeq.map { i => - val label = i % 5 - val vector = Vectors.sparse(5, Seq((label, label.toDouble))) - (label, vector) - } - val data = sc.parallelize(localData.map(_._2)) - val model = app.run(data) - - val clusters = model.getClusters - assert(clusters.isInstanceOf[Array[BisectingClusterNode]]) - assert(clusters.length === 5) - - val centers = model.getCenters.sortBy(_.toArray.sum) - assert(centers.length === 5) - assert(centers(0) === Vectors.sparse(5, Array(), Array())) - assert(centers(1) === Vectors.sparse(5, Array(1), Array(1.0))) - assert(centers(2) === Vectors.sparse(5, Array(2), Array(2.0))) - assert(centers(3) === Vectors.sparse(5, Array(3), Array(3.0))) - assert(centers(4) === Vectors.sparse(5, Array(4), Array(4.0))) - - // predict with one vector - assert(model.predict(Vectors.sparse(5, Array(0), Array(0.0))) === 0) - assert(model.predict(Vectors.sparse(5, Array(1), Array(1.0))) === 1) - assert(model.predict(Vectors.sparse(5, Array(2), Array(2.0))) === 2) - assert(model.predict(Vectors.sparse(5, Array(3), Array(3.0))) === 3) - assert(model.predict(Vectors.sparse(5, Array(4), Array(4.0))) === 4) - - // predict with a RDD - val predicted = model.predict(data).collect() - assert(predicted === localData.map(_._1)) - - // compute WSSSE - assert(model.computeCost(data) === 0.0) - } - - test("clustering should be done correctly") { - for (k <- Array(9, 19)) { - val app = new BisectingKMeans().setK(k).setSeed(1) - val localData = (1 to 19).toSeq.map { i => - val label = i % k - val sparseVector = Vectors.sparse(k, Seq((label, label.toDouble))) - val denseVector = Vectors.fromBreeze(sparseVector.toBreeze.toDenseVector) - (label, denseVector, sparseVector) - } - - // dense 
version - val denseData = sc.parallelize(localData.map(_._2), 2) - val denseModel = app.run(denseData) - assert(denseModel.getCenters.length === k) - assert(denseModel.getClusters.forall(_.cost == 0.0)) - - // sparse version - val sparseData = sc.parallelize(localData.map(_._3), 2) - val sparseModel = app.run(sparseData) - assert(sparseModel.getCenters.length === k) - assert(sparseModel.getClusters.forall(_.cost == 0.0)) - } - } -} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/BisectingKMeansSuite.scala index 74e12d00c2022..41b9d5c0d93bb 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/BisectingKMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/BisectingKMeansSuite.scala @@ -17,184 +17,166 @@ package org.apache.spark.mllib.clustering -import breeze.linalg.{Vector => BV, norm => breezeNorm} - import org.apache.spark.SparkFunSuite -import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ - class BisectingKMeansSuite extends SparkFunSuite with MLlibTestSparkContext { - test("run") { - val k = 123 - val algo = new BisectingKMeans().setK(k).setSeed(1) - val localSeed: Seq[Vector] = (0 to 999).map(i => Vectors.dense(i.toDouble, i.toDouble)).toSeq - val data = sc.parallelize(localSeed, 2) - val model = algo.run(data) - assert(model.getClusters.length == 123) - assert(model.node.getHeight ~== 702.8641 absTol 10E-4) - - // check the relations between a parent cluster and its children - assert(model.node.getParent === None) - assert(model.node.getChildren.head.getParent.get === model.node) - assert(model.node.getChildren.apply(1).getParent.get === model.node) - assert(model.getClusters.forall(_.getParent.isDefined)) - - val predicted = model.predict(data) - assert(predicted.distinct.count() === k) + test("default values") { + val bkm0 = new BisectingKMeans() + assert(bkm0.getK === 4) + assert(bkm0.getMaxIterations === 20) + assert(bkm0.getMinDivisibleClusterSize === 1.0) + val bkm1 = new BisectingKMeans() + assert(bkm0.getSeed === bkm1.getSeed, "The default seed should be constant.") } - test("run with too many cluster size than the records") { - val algo = new BisectingKMeans().setK(123).setSeed(1) - val localSeed: Seq[Vector] = (0 to 99).map(i => Vectors.dense(i.toDouble, i.toDouble)).toSeq - val data = sc.parallelize(localSeed) - val model = algo.run(data) - assert(model.getClusters.length == 100) - assert(model.node.getHeight ~== 72.12489 absTol 10E-4) + test("setter/getter") { + val bkm = new BisectingKMeans() + + val k = 10 + assert(bkm.getK !== k) + assert(bkm.setK(k).getK === k) + val maxIter = 100 + assert(bkm.getMaxIterations !== maxIter) + assert(bkm.setMaxIterations(maxIter).getMaxIterations === maxIter) + val minSize = 2.0 + assert(bkm.getMinDivisibleClusterSize !== minSize) + assert(bkm.setMinDivisibleClusterSize(minSize).getMinDivisibleClusterSize === minSize) + val seed = 10L + assert(bkm.getSeed !== seed) + assert(bkm.setSeed(seed).getSeed === seed) + + intercept[IllegalArgumentException] { + bkm.setK(0) + } + intercept[IllegalArgumentException] { + bkm.setMaxIterations(0) + } + intercept[IllegalArgumentException] { + bkm.setMinDivisibleClusterSize(0.0) + } } - test("setNumClusters") { - val algo = new BisectingKMeans() - assert(algo.getK == 2) - algo.setK(1000) - assert(algo.getK == 
1000) + test("1D data") { + val points = Vectors.sparse(1, Array.empty, Array.empty) +: + (1 until 8).map(i => Vectors.dense(i)) + val data = sc.parallelize(points, 2) + val bkm = new BisectingKMeans() + .setK(4) + .setMaxIterations(1) + .setSeed(1L) + // The clusters should be + // (0, 1, 2, 3, 4, 5, 6, 7) + // - (0, 1, 2, 3) + // - (0, 1) + // - (2, 3) + // - (4, 5, 6, 7) + // - (4, 5) + // - (6, 7) + val model = bkm.run(data) + assert(model.k === 4) + // The total cost should be 8 * 0.5 * 0.5 = 2.0. + assert(model.computeCost(data) ~== 2.0 relTol 1e-12) + val predictions = data.map(v => (v(0), model.predict(v))).collectAsMap() + Range(0, 8, 2).foreach { i => + assert(predictions(i) === predictions(i + 1), + s"$i and ${i + 1} should belong to the same cluster.") + } + val root = model.root + assert(root.center(0) ~== 3.5 relTol 1e-12) + assert(root.height ~== 2.0 relTol 1e-12) + assert(root.children.length === 2) + assert(root.children(0).height ~== 1.0 relTol 1e-12) + assert(root.children(1).height ~== 1.0 relTol 1e-12) } - test("setSubIterations") { - val algo = new BisectingKMeans() - assert(algo.getMaxIterations == 20) - algo.setMaxIterations(15) - assert(algo.getMaxIterations == 15) + test("points are the same") { + val data = sc.parallelize(Seq.fill(8)(Vectors.dense(1.0, 1.0)), 2) + val bkm = new BisectingKMeans() + .setK(2) + .setMaxIterations(1) + .setSeed(1L) + val model = bkm.run(data) + assert(model.k === 1) } - test("setSeed") { - val algo = new BisectingKMeans() - assert(algo.getSeed == 1) - algo.setSeed(987) - assert(algo.getSeed == 987) + test("more desired clusters than points") { + val data = sc.parallelize(Seq.tabulate(4)(i => Vectors.dense(i)), 2) + val bkm = new BisectingKMeans() + .setK(8) + .setMaxIterations(2) + .setSeed(1L) + val model = bkm.run(data) + assert(model.k === 4) } - test("summarize center stats") { - val algo = new BisectingKMeans - val local = Seq( - (4L, Vectors.dense(1.5, 1.5).toBreeze), - (4L, Vectors.dense(2.5, 2.5).toBreeze), - (5L, Vectors.dense(11.5, 11.5).toBreeze), - (5L, Vectors.dense(12.5, 12.5).toBreeze), - (6L, Vectors.dense(21.5, 21.5).toBreeze), - (6L, Vectors.dense(22.5, 22.5).toBreeze), - (7L, Vectors.dense(31.5, 31.5).toBreeze), - (7L, Vectors.dense(32.5, 32.5).toBreeze) - ) - val data = sc.parallelize(local) - - val clusterStats = BisectingKMeans.summarizeClusters(data) - assert(clusterStats.size === 4) - assert(clusterStats(4).mean === Vectors.dense(2.0, 2.0).toBreeze) - assert(clusterStats(4).sumOfSquares ~== 1.0 absTol 10e-4) - assert(clusterStats(4).rows === 2) - assert(clusterStats(5).mean === Vectors.dense(12.0, 12.0).toBreeze) - assert(clusterStats(5).sumOfSquares ~== 1.0 absTol 10e-4) - assert(clusterStats(5).rows === 2) - assert(clusterStats(6).mean === Vectors.dense(22.0, 22.0).toBreeze) - assert(clusterStats(6).sumOfSquares ~== 1.0 absTol 10e-4) - assert(clusterStats(6).rows === 2) - assert(clusterStats(7).mean === Vectors.dense(32.0, 32.0).toBreeze) - assert(clusterStats(7).sumOfSquares ~== 1.0 absTol 10e-4) - assert(clusterStats(7).rows === 2) + test("min divisible cluster") { + val data = sc.parallelize( + Seq.tabulate(16)(i => Vectors.dense(i)) ++ Seq.tabulate(4)(i => Vectors.dense(-100.0 - i)), + 2) + val bkm = new BisectingKMeans() + .setK(4) + .setMinDivisibleClusterSize(10) + .setMaxIterations(1) + .setSeed(1L) + val model = bkm.run(data) + assert(model.k === 3) + assert(model.predict(Vectors.dense(-100)) === model.predict(Vectors.dense(-97))) + assert(model.predict(Vectors.dense(7)) !== 
model.predict(Vectors.dense(8))) + + bkm.setMinDivisibleClusterSize(0.5) + val sameModel = bkm.run(data) + assert(sameModel.k === 3) } - test("initialize centers at next step") { - val local = Seq( - (2L, BV[Double](0.9, 0.9)), (2L, BV[Double](1.1, 1.1)), - (3L, BV[Double](1.9, 1.9)), (2L, BV[Double](2.1, 2.1)) - ) - val data = sc.parallelize(local) - val stats = Map[Long, BisectingClusterStat]( - 2L -> new BisectingClusterStat(2, BV[Double](1.0, 1.0) * 2.0, 0.0), - 3L -> new BisectingClusterStat(2, BV[Double](2.0, 2.0) * 2.0, 0.0) - ) - val initNextCenters = BisectingKMeans.initNextCenters(stats, 1) - assert(initNextCenters.size === 4) - assert(initNextCenters.keySet === Set(4, 5, 6, 7)) + test("larger clusters get selected first") { + val data = sc.parallelize( + Seq.tabulate(16)(i => Vectors.dense(i)) ++ Seq.tabulate(4)(i => Vectors.dense(-100.0 - i)), + 2) + val bkm = new BisectingKMeans() + .setK(3) + .setMaxIterations(1) + .setSeed(1L) + val model = bkm.run(data) + assert(model.k === 3) + assert(model.predict(Vectors.dense(-100)) === model.predict(Vectors.dense(-97))) + assert(model.predict(Vectors.dense(7)) !== model.predict(Vectors.dense(8))) } - test("should assign each data to new clusters") { - val seed = Seq( - (2L, Vectors.dense(0.0, 0.0)), (2L, Vectors.dense(1.0, 1.0)), - (2L, Vectors.dense(2.0, 2.0)), (2L, Vectors.dense(3.0, 3.0)), - (2L, Vectors.dense(4.0, 4.0)), (2L, Vectors.dense(5.0, 5.0)), - (3L, Vectors.dense(6.0, 6.0)), (3L, Vectors.dense(7.0, 7.0)), - (3L, Vectors.dense(8.0, 8.0)), (3L, Vectors.dense(9.0, 9.0)), - (3L, Vectors.dense(10.0, 10.0)), (3L, Vectors.dense(11.0, 11.0)) - ).map { case (idx, vector) => (idx, vector.toBreeze) } - val variance = breezeNorm(Vectors.dense(1.0, 1.0).toBreeze, 2.0) - val newClusterStats = Map( - 4L -> new BisectingClusterStat(3L, BV[Double](1.0, 1.0) :* 3.0, variance), - 5L -> new BisectingClusterStat(3L, BV[Double](4.0, 4.0) :* 3.0, variance), - 6L -> new BisectingClusterStat(3L, BV[Double](7.0, 7.0) :* 3.0, variance), - 7L -> new BisectingClusterStat(3L, BV[Double](10.0, 10.0) :* 3.0, variance) - ) - val data = sc.parallelize(seed, 1) - val leafClusterStats = BisectingKMeans.summarizeClusters(data) - val dividableLeafClusters = leafClusterStats.filter(_._2.isDividable) - val result = BisectingKMeans.divideClusters(data, dividableLeafClusters, 20, 1).collect() - - val expected = Seq( - (4, Vectors.dense(0.0, 0.0)), (4, Vectors.dense(1.0, 1.0)), (4, Vectors.dense(2.0, 2.0)), - (5, Vectors.dense(3.0, 3.0)), (5, Vectors.dense(4.0, 4.0)), (5, Vectors.dense(5.0, 5.0)), - (6, Vectors.dense(6.0, 6.0)), (6, Vectors.dense(7.0, 7.0)), (6, Vectors.dense(8.0, 8.0)), - (7, Vectors.dense(9.0, 9.0)), (7, Vectors.dense(10.0, 10.0)), (7, Vectors.dense(11.0, 11.0)) - ).map { case (idx, vector) => (idx, vector.toBreeze) } - assert(result === expected) - } - - test("findClosestCenter") { - val metric = (bv1: BV[Double], bv2: BV[Double]) => breezeNorm(bv1 - bv2, 2.0) - val centers = Seq( - Vectors.sparse(5, Array(0, 1, 2), Array(0.0, 1.0, 2.0)).toBreeze, - Vectors.sparse(5, Array(1, 2, 3), Array(1.0, 2.0, 3.0)).toBreeze, - Vectors.sparse(5, Array(2, 3, 4), Array(2.0, 3.0, 4.0)).toBreeze - ) - - for (i <- 0 to (centers.size - 1)) { - val point = centers(i) - val closestIndex = BisectingKMeans.findClosestCenter(metric)(centers)(point) - assert(closestIndex === i) + test("2D data") { + val points = Seq( + (11, 10), (9, 10), (10, 9), (10, 11), + (11, -10), (9, -10), (10, -9), (10, -11), + (0, 1), (0, -1) + ).map { case (x, y) => + if (x == 0) { + 
Vectors.sparse(2, Array(1), Array(y)) + } else { + Vectors.dense(x, y) + } } - } - - test("should be equal to math.pow") { - (1 to 1000).foreach { k => - // the minimum number of nodes of a binary tree by given parameter - val multiplier = math.ceil(math.log(k) / math.log(2.0)) + 1 - val expected = math.pow(2, multiplier).toInt - val result = BisectingKMeans.getMinimumNumNodesInTree(k) - assert(result === expected) + val data = sc.parallelize(points, 2) + val bkm = new BisectingKMeans() + .setK(3) + .setMaxIterations(4) + .setSeed(1L) + val model = bkm.run(data) + assert(model.k === 3) + assert(model.root.center ~== Vectors.dense(8, 0) relTol 1e-12) + model.root.leafNodes.foreach { node => + if (node.center(0) < 5) { + assert(node.size === 2) + assert(node.center ~== Vectors.dense(0, 0) relTol 1e-12) + } else if (node.center(1) > 0) { + assert(node.size === 4) + assert(node.center ~== Vectors.dense(10, 10) relTol 1e-12) + } else { + assert(node.size === 4) + assert(node.center ~== Vectors.dense(10, -10) relTol 1e-12) + } } } - - test("should divide clusters correctly") { - val local = Seq( - (2L, BV[Double](0.9, 0.9)), (2L, BV[Double](1.1, 1.1)), - (2L, BV[Double](9.9, 9.9)), (2L, BV[Double](10.1, 10.1)), - (3L, BV[Double](99.9, 99.9)), (3L, BV[Double](100.1, 100.1)), - (3L, BV[Double](109.9, 109.9)), (3L, BV[Double](110.1, 110.1)) - ) - val data = sc.parallelize(local, 1) - val stats = BisectingKMeans.summarizeClusters(data) - val dividedData = BisectingKMeans.divideClusters(data, stats, 20, 1).collect() - - assert(dividedData(0) == (4L, BV[Double](0.9, 0.9))) - assert(dividedData(1) == (4L, BV[Double](1.1, 1.1))) - assert(dividedData(2) == (5L, BV[Double](9.9, 9.9))) - assert(dividedData(3) == (5L, BV[Double](10.1, 10.1))) - assert(dividedData(4) == (6L, BV[Double](99.9, 99.9))) - assert(dividedData(5) == (6L, BV[Double](100.1, 100.1))) - assert(dividedData(6) == (7L, BV[Double](109.9, 109.9))) - assert(dividedData(7) == (7L, BV[Double](110.1, 110.1))) - } - }
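
A minimal end-to-end sketch of the API introduced by this patch (hypothetical usage: `sc` is an
existing SparkContext, and the data mirrors the toy 2D set used in the "2D data" test above):

  import org.apache.spark.mllib.clustering.BisectingKMeans
  import org.apache.spark.mllib.linalg.Vectors

  // Two tight 4-point clusters around (10, 10) and (10, -10), plus a 2-point cluster near the
  // origin. The input is cached up front because run() warns when it is not persisted.
  val data = sc.parallelize(Seq(
    Vectors.dense(11, 10), Vectors.dense(9, 10), Vectors.dense(10, 9), Vectors.dense(10, 11),
    Vectors.dense(11, -10), Vectors.dense(9, -10), Vectors.dense(10, -9), Vectors.dense(10, -11),
    Vectors.dense(0, 1), Vectors.dense(0, -1)), 2).cache()

  val bkm = new BisectingKMeans()
    .setK(3)             // desired number of leaf clusters
    .setMaxIterations(4) // inner k-means iterations per bisecting step
    .setSeed(1L)
  val model = bkm.run(data)

  println(model.k)                              // number of leaf clusters actually produced
  model.clusterCenters.foreach(println)         // centers of the leaf clusters
  println(model.predict(Vectors.dense(10, 10))) // leaf cluster index of a single point
  println(model.computeCost(data))              // sum of squared distances to assigned centers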