From 308c8ad9d88d275f11be0d3733f016d3119f98ee Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Thu, 18 Dec 2014 01:11:47 -0500 Subject: [PATCH] Numerous changes to improve code --- .../spark/examples/mllib/DenseGmmEM.scala | 11 +- .../clustering/GaussianMixtureModelEM.scala | 105 +++++++++--------- .../stat/impl/MultivariateGaussian.scala | 4 +- 3 files changed, 64 insertions(+), 56 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala index 12be814a150b5..d59ba49ed1ba3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala @@ -21,6 +21,13 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.mllib.clustering.GaussianMixtureModelEM import org.apache.spark.mllib.linalg.Vectors +/** + * An example Gaussian Mixture Model EM app. Run with + * {{{ + * ./bin/run-example org.apache.spark.examples.mllib.DenseGmmEM + * }}} + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ object DenseGmmEM { def main(args: Array[String]): Unit = { if (args.length != 3) { @@ -44,13 +51,15 @@ object DenseGmmEM { .run(data) for (i <- 0 until clusters.k) { - println("weight=%f mu=%s sigma=\n%s\n" format + println("weight=%f\nmu=%s\nsigma=\n%s\n" format (clusters.weight(i), clusters.mu(i), clusters.sigma(i))) } + println("Cluster labels:") val (responsibilityMatrix, clusterLabels) = clusters.predict(data) for (x <- clusterLabels.collect) { print(" " + x) } + println } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala index 5ab654f609bd2..5c190120904b9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala @@ -26,6 +26,8 @@ import org.apache.spark.mllib.stat.impl.MultivariateGaussian import org.apache.spark.{Accumulator, AccumulatorParam, SparkContext} import org.apache.spark.SparkContext.DoubleAccumulatorParam +import scala.collection.mutable.IndexedSeqView + /** * This class performs expectation maximization for multivariate Gaussian * Mixture Models (GMMs). A GMM represents a composite distribution of @@ -51,6 +53,7 @@ class GaussianMixtureModelEM private ( // Type aliases for convenience private type DenseDoubleVector = BreezeVector[Double] private type DenseDoubleMatrix = BreezeMatrix[Double] + private type VectorArrayView = IndexedSeqView[DenseDoubleVector, Array[DenseDoubleVector]] private type ExpectationSum = ( Array[Double], // log-likelihood in index 0 @@ -80,10 +83,12 @@ class GaussianMixtureModelEM private ( // compute cluster contributions for each input point // (U, T) => U for aggregation - private def computeExpectation(weights: Array[Double], dists: Array[MultivariateGaussian]) + private def computeExpectation( + weights: Array[Double], + dists: Array[MultivariateGaussian]) (model: ExpectationSum, x: DenseDoubleVector): ExpectationSum = { val k = model._2.length - val p = (0 until k).map(i => eps + weights(i) * dists(i).pdf(x)).toArray + val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(x) } val pSum = p.sum model._1(0) += math.log(pSum) val xxt = x * new Transpose(x) @@ -106,7 +111,10 @@ class GaussianMixtureModelEM private ( /** A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold */ def this() = this(2, 0.01, 100) - /** Set the initial GMM starting point, bypassing the random initialization */ + /** Set the initial GMM starting point, bypassing the random initialization. + * You must call setK() prior to calling this method, and the condition + * (gmm.k == this.k) must be met; failure will result in an IllegalArgumentException + */ def setInitialGmm(gmm: GaussianMixtureModel): this.type = { if (gmm.k == k) { initialGmm = Some(gmm) @@ -156,7 +164,7 @@ class GaussianMixtureModelEM private ( /** Perform expectation maximization */ def run(data: RDD[Vector]): GaussianMixtureModel = { - val ctx = data.sparkContext + val sc = data.sparkContext // we will operate on the data as breeze data val breezeData = data.map(u => u.toBreeze.toDenseVector).cache() @@ -164,26 +172,22 @@ class GaussianMixtureModelEM private ( // Get length of the input vectors val d = breezeData.first.length - // gaussians will be array of (weight, mean, covariance) tuples. + // Determine initial weights and corresponding Gaussians. // If the user supplied an initial GMM, we use those values, otherwise // we start with uniform weights, a random mean from the data, and // diagonal covariance matrices using component variances - // derived from the samples - var gaussians = initialGmm match { - case Some(gmm) => (0 until k).map{ i => - (gmm.weight(i), gmm.mu(i).toBreeze.toDenseVector, gmm.sigma(i).toBreeze.toDenseMatrix) - }.toArray + // derived from the samples + val (weights, gaussians) = initialGmm match { + case Some(gmm) => (gmm.weight, gmm.mu.zip(gmm.sigma).map{ case(mu, sigma) => + new MultivariateGaussian(mu.toBreeze.toDenseVector, sigma.toBreeze.toDenseMatrix) + }.toArray) case None => { - // For each Gaussian, we will initialize the mean as the average - // of some random samples from the data val samples = breezeData.takeSample(true, k * nSamples, scala.util.Random.nextInt) - - (0 until k).map{ i => - (1.0 / k, - vectorMean(samples.slice(i * nSamples, (i + 1) * nSamples)), - initCovariance(samples.slice(i * nSamples, (i + 1) * nSamples))) - }.toArray + ((0 until k).map(_ => 1.0 / k).toArray, (0 until k).map{ i => + val slice = samples.view(i * nSamples, (i + 1) * nSamples) + new MultivariateGaussian(vectorMean(slice), initCovariance(slice)) + }.toArray) } } @@ -192,47 +196,36 @@ class GaussianMixtureModelEM private ( var iter = 0 do { - // pivot gaussians into weight and distribution arrays - val weights = (0 until k).map(i => gaussians(i)._1).toArray - val dists = (0 until k).map{ i => - new MultivariateGaussian(gaussians(i)._2, gaussians(i)._3) - }.toArray - // create and broadcast curried cluster contribution function - val compute = ctx.broadcast(computeExpectation(weights, dists)_) + val compute = sc.broadcast(computeExpectation(weights, gaussians)_) // aggregate the cluster contribution for all sample points - val sums = breezeData.aggregate(zeroExpectationSum(k, d))(compute.value, addExpectationSums) - - // Assignments to make the code more readable - val logLikelihood = sums._1(0) - val W = sums._2 - val MU = sums._3 - val SIGMA = sums._4 + val (logLikelihood, wSums, muSums, sigmaSums) = + breezeData.aggregate(zeroExpectationSum(k, d))(compute.value, addExpectationSums) // Create new distributions based on the partial assignments // (often referred to as the "M" step in literature) - gaussians = (0 until k).map{ i => - val weight = W(i) / W.sum - val mu = MU(i) / W(i) - val sigma = SIGMA(i) / W(i) - mu * new Transpose(mu) - (weight, mu, sigma) - }.toArray - + val sumWeights = wSums.sum + for (i <- 0 until k) { + val mu = muSums(i) / wSums(i) + val sigma = sigmaSums(i) / wSums(i) - mu * new Transpose(mu) + weights(i) = wSums(i) / sumWeights + gaussians(i) = new MultivariateGaussian(mu, sigma) + } + llhp = llh // current becomes previous - llh = logLikelihood // this is the freshly computed log-likelihood + llh = logLikelihood(0) // this is the freshly computed log-likelihood iter += 1 } while(iter < maxIterations && Math.abs(llh-llhp) > convergenceTol) // Need to convert the breeze matrices to MLlib matrices - val weights = (0 until k).map(i => gaussians(i)._1).toArray - val means = (0 until k).map(i => Vectors.fromBreeze(gaussians(i)._2)).toArray - val sigmas = (0 until k).map(i => Matrices.fromBreeze(gaussians(i)._3)).toArray + val means = (0 until k).map(i => Vectors.fromBreeze(gaussians(i).mu)).toArray + val sigmas = (0 until k).map(i => Matrices.fromBreeze(gaussians(i).sigma)).toArray new GaussianMixtureModel(weights, means, sigmas) } /** Average of dense breeze vectors */ - private def vectorMean(x: Array[DenseDoubleVector]): DenseDoubleVector = { + private def vectorMean(x: VectorArrayView): DenseDoubleVector = { val v = BreezeVector.zeros[Double](x(0).length) x.foreach(xi => v += xi) v / x.length.asInstanceOf[Double] @@ -242,7 +235,7 @@ class GaussianMixtureModelEM private ( * Construct matrix where diagonal entries are element-wise * variance of input vectors (computes biased variance) */ - private def initCovariance(x: Array[DenseDoubleVector]): DenseDoubleMatrix = { + private def initCovariance(x: VectorArrayView): DenseDoubleMatrix = { val mu = vectorMean(x) val ss = BreezeVector.zeros[Double](x(0).length) val cov = BreezeMatrix.eye[Double](ss.length) @@ -255,15 +248,18 @@ class GaussianMixtureModelEM private ( * Given the input vectors, return the membership value of each vector * to all mixture components. */ - def predictClusters(points: RDD[Vector], mu: Array[Vector], sigma: Array[Matrix], + def predictClusters( + points: RDD[Vector], + mu: Array[Vector], + sigma: Array[Matrix], weight: Array[Double], k: Int): RDD[Array[Double]] = { - val ctx = points.sparkContext - val dists = ctx.broadcast{ + val sc = points.sparkContext + val dists = sc.broadcast{ (0 until k).map{ i => new MultivariateGaussian(mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix) }.toArray } - val weights = ctx.broadcast((0 until k).map(i => weight(i)).toArray) + val weights = sc.broadcast((0 until k).map(i => weight(i)).toArray) points.map{ x => computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k) } @@ -272,11 +268,14 @@ class GaussianMixtureModelEM private ( /** * Compute the partial assignments for each vector */ - def computeSoftAssignments(pt: DenseDoubleVector, dists: Array[MultivariateGaussian], - weights: Array[Double], k: Int): Array[Double] = { - val p = (0 until k).map(i => eps + weights(i) * dists(i).pdf(pt)).toArray + private def computeSoftAssignments( + pt: DenseDoubleVector, + dists: Array[MultivariateGaussian], + weights: Array[Double], + k: Int): Array[Double] = { + val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(pt) } val pSum = p.sum - for(i<- 0 until k){ + for (i <- 0 until k){ p(i) /= pSum } p diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala index ab046a3acea05..d14adab09177e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala @@ -18,7 +18,7 @@ package org.apache.spark.mllib.stat.impl import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix} -import breeze.linalg.{Transpose, det, inv} +import breeze.linalg.{Transpose, det, pinv} /** * Utility class to implement the density function for multivariate Gaussian distribution. @@ -28,7 +28,7 @@ import breeze.linalg.{Transpose, det, inv} private[mllib] class MultivariateGaussian( val mu: BreezeVector[Double], val sigma: BreezeMatrix[Double]) extends Serializable { - private val sigmaInv2 = inv(sigma) * -0.5 + private val sigmaInv2 = pinv(sigma) * -0.5 private val U = math.pow(2.0 * math.Pi, -mu.length / 2.0) * math.pow(det(sigma), -0.5) def pdf(x: BreezeVector[Double]): Double = {