From c15405c78345e9a46549a398c6b59bed80274f9e Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Thu, 30 Oct 2014 14:50:47 -0400 Subject: [PATCH 01/27] SPARK-4156 --- .../spark/examples/mllib/DenseGmmEM.scala | 47 ++++ .../GMMExpectationMaximization.scala | 246 ++++++++++++++++++ .../clustering/GaussianMixtureModel.scala | 32 +++ 3 files changed, 325 insertions(+) create mode 100644 examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala new file mode 100644 index 0000000000000..4272280b261ea --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.mllib.clustering.GaussianMixtureModel +import org.apache.spark.mllib.clustering.GMMExpectationMaximization +import org.apache.spark.mllib.linalg.Vectors + +object DenseGmmEM { + def main(args: Array[String]): Unit = { + if( args.length != 3 ) { + println("usage: DenseGmmEM ") + } else { + run(args(0), args(1).toInt, args(2).toDouble) + } + } + + def run(inputFile: String, k: Int, tol: Double) { + val conf = new SparkConf().setAppName("Spark EM Sample") + val ctx = new SparkContext(conf) + + val data = ctx.textFile(inputFile).map(line => + Vectors.dense(line.trim.split(' ').map(_.toDouble))).cache() + + val clusters = GMMExpectationMaximization.train(data, k) + + for(i <- 0 until clusters.k) { + println("w=%f mu=%s sigma=\n%s\n" format (clusters.w(i), clusters.mu(i), clusters.sigma(i))) + } + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala new file mode 100644 index 0000000000000..9b4d5de65f200 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix} +import breeze.linalg.{Transpose, det, inv} +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors} +import org.apache.spark.{Accumulator, AccumulatorParam, SparkContext} +import org.apache.spark.SparkContext.DoubleAccumulatorParam + +/** + * Expectation-Maximization for multivariate Gaussian Mixture Models. + * + */ +object GMMExpectationMaximization { + /** + * Trains a GMM using the given parameters + * + * @param data training points stores as RDD[Vector] + * @param k the number of Gaussians in the mixture + * @param maxIterations the maximum number of iterations to perform + * @param delta change in log-likelihood at which convergence is considered achieved + */ + def train(data: RDD[Vector], k: Int, maxIterations: Int, delta: Double): GaussianMixtureModel = { + new GMMExpectationMaximization().setK(k) + .setMaxIterations(maxIterations) + .setDelta(delta) + .run(data) + } + + /** + * Trains a GMM using the given parameters + * + * @param data training points stores as RDD[Vector] + * @param k the number of Gaussians in the mixture + * @param maxIterations the maximum number of iterations to perform + */ + def train(data: RDD[Vector], k: Int, maxIterations: Int): GaussianMixtureModel = { + new GMMExpectationMaximization().setK(k).setMaxIterations(maxIterations).run(data) + } + + /** + * Trains a GMM using the given parameters + * + * @param data training points stores as RDD[Vector] + * @param k the number of Gaussians in the mixture + */ + def train(data: RDD[Vector], k: Int): GaussianMixtureModel = { + new GMMExpectationMaximization().setK(k).run(data) + } +} + +/** + * This class performs multivariate Gaussian expectation maximization. It will + * maximize the log-likelihood for a mixture of k Gaussians, iterating until + * the log-likelihood changes by less than delta, or until it has reached + * the max number of iterations. + */ +class GMMExpectationMaximization private ( + private var k: Int, + private var delta: Double, + private var maxIterations: Int) extends Serializable { + + // Type aliases for convenience + private type DenseDoubleVector = BreezeVector[Double] + private type DenseDoubleMatrix = BreezeMatrix[Double] + + // A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold + def this() = this(2, 0.01, 100) + + /** Set the number of Gaussians in the mixture model. Default: 2 */ + def setK(k: Int): this.type = { + this.k = k + this + } + + /** Set the maximum number of iterations to run. Default: 100 */ + def setMaxIterations(maxIterations: Int): this.type = { + this.maxIterations = maxIterations + this + } + + /** + * Set the largest change in log-likelihood at which convergence is + * considered to have occurred. + */ + def setDelta(delta: Double): this.type = { + this.delta = delta + this + } + + /** Machine precision value used to ensure matrix conditioning */ + private val eps = math.pow(2.0, -52) + + /** Perform expectation maximization */ + def run(data: RDD[Vector]): GaussianMixtureModel = { + val ctx = data.sparkContext + + // we will operate on the data as breeze data + val breezeData = data.map{ u => u.toBreeze.toDenseVector }.cache() + + // Get length of the input vectors + val d = breezeData.first.length + + // For each Gaussian, we will initialize the mean as some random + // point from the data. (This could be improved) + val samples = breezeData.takeSample(true, k, scala.util.Random.nextInt) + + // C will be array of (weight, mean, covariance) tuples + // we start with uniform weights, a random mean from the data, and + // identity matrices for covariance + var C = (0 until k).map(i => (1.0/k, + samples(i), + BreezeMatrix.eye[Double](d))).toArray + + val acc_w = new Array[Accumulator[Double]](k) + val acc_mu = new Array[Accumulator[DenseDoubleVector]](k) + val acc_sigma = new Array[Accumulator[DenseDoubleMatrix]](k) + + var llh = Double.MinValue // current log-likelihood + var llhp = 0.0 // previous log-likelihood + + var i, iter = 0 + do { + // reset accumulators + for(i <- 0 until k){ + acc_w(i) = ctx.accumulator(0.0) + acc_mu(i) = ctx.accumulator( + BreezeVector.zeros[Double](d))(DenseDoubleVectorAccumulatorParam) + acc_sigma(i) = ctx.accumulator( + BreezeMatrix.zeros[Double](d,d))(DenseDoubleMatrixAccumulatorParam) + } + + val log_likelihood = ctx.accumulator(0.0) + + // broadcast the current weights and distributions to all nodes + val dists = ctx.broadcast((0 until k).map(i => + new MultivariateGaussian(C(i)._2, C(i)._3)).toArray) + val weights = ctx.broadcast((0 until k).map(i => C(i)._1).toArray) + + // calculate partial assignments for each sample in the data + // (often referred to as the "E" step in literature) + breezeData.foreach(x => { + val p = (0 until k).map(i => + eps + weights.value(i) * dists.value(i).pdf(x)).toArray + val norm = sum(p) + + log_likelihood += math.log(norm) + + // accumulate weighted sums + for(i <- 0 until k){ + p(i) /= norm + acc_w(i) += p(i) + acc_mu(i) += x * p(i) + acc_sigma(i) += x * new Transpose(x) * p(i) + } + }) + + // Collect the computed sums + val W = (0 until k).map(i => acc_w(i).value).toArray + val MU = (0 until k).map(i => acc_mu(i).value).toArray + val SIGMA = (0 until k).map(i => acc_sigma(i).value).toArray + + // Create new distributions based on the partial assignments + // (often referred to as the "M" step in literature) + C = (0 until k).map(i => { + val weight = W(i) / sum(W) + val mu = MU(i) / W(i) + val sigma = SIGMA(i) / W(i) - mu * new Transpose(mu) + (weight, mu, sigma) + }).toArray + + llhp = llh; // current becomes previous + llh = log_likelihood.value // this is the freshly computed log-likelihood + iter += 1 + } while(iter < maxIterations && Math.abs(llh-llhp) > delta) + + // Need to convert the breeze matrices to MLlib matrices + val weights = (0 until k).map(i => C(i)._1).toArray + val means = (0 until k).map(i => Vectors.fromBreeze(C(i)._2)).toArray + val sigmas = (0 until k).map(i => Matrices.fromBreeze(C(i)._3)).toArray + new GaussianMixtureModel(weights, means, sigmas) + } + + /** Sum the values in array of doubles */ + private def sum(x : Array[Double]) : Double = { + var s : Double = 0.0 + x.foreach(u => s += u) + s + } + + /** AccumulatorParam for Dense Breeze Vectors */ + private object DenseDoubleVectorAccumulatorParam extends AccumulatorParam[DenseDoubleVector] { + def zero(initialVector : DenseDoubleVector) : DenseDoubleVector = { + BreezeVector.zeros[Double](initialVector.length) + } + + def addInPlace(a : DenseDoubleVector, b : DenseDoubleVector) : DenseDoubleVector = { + a += b + } + } + + /** AccumulatorParam for Dense Breeze Matrices */ + private object DenseDoubleMatrixAccumulatorParam extends AccumulatorParam[DenseDoubleMatrix] { + def zero(initialVector : DenseDoubleMatrix) : DenseDoubleMatrix = { + BreezeMatrix.zeros[Double](initialVector.rows, initialVector.cols) + } + + def addInPlace(a : DenseDoubleMatrix, b : DenseDoubleMatrix) : DenseDoubleMatrix = { + a += b + } + } + + /** + * Utility class to implement the density function for multivariate Gaussian distribution. + * Breeze provides this functionality, but it requires the Apache Commons Math library, + * so this class is here so-as to not introduce a new dependency in Spark. + */ + private class MultivariateGaussian(val mu : DenseDoubleVector, val sigma : DenseDoubleMatrix) + extends Serializable { + private val sigma_inv_2 = inv(sigma) * -0.5 + private val U = math.pow(2.0*math.Pi, -mu.length/2.0) * math.pow(det(sigma), -0.5) + + def pdf(x : DenseDoubleVector) : Double = { + val delta = x - mu + val delta_t = new Transpose(delta) + U * math.exp(delta_t * sigma_inv_2 * delta) + } + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala new file mode 100644 index 0000000000000..b36123366c9a8 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import org.apache.spark.mllib.linalg.Matrix +import org.apache.spark.mllib.linalg.Vector + +/** + * Multivariate Gaussian mixture model consisting of k Gaussians, where points are drawn + * from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are the respective + * mean and covariance for each Gaussian distribution i=1..k. + */ +class GaussianMixtureModel(val w: Array[Double], val mu: Array[Vector], val sigma: Array[Matrix]) { + + /** Number of gaussians in mixture */ + def k: Int = w.length; +} From c1a8e16ac9dc301d9eafca44100c5a857de903d5 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Tue, 11 Nov 2014 18:31:44 -0500 Subject: [PATCH 02/27] Made GaussianMixtureModel class serializable Modified sum function for better performance --- .../spark/mllib/clustering/GMMExpectationMaximization.scala | 2 +- .../apache/spark/mllib/clustering/GaussianMixtureModel.scala | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala index 9b4d5de65f200..0ff499eec7b9d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala @@ -201,7 +201,7 @@ class GMMExpectationMaximization private ( /** Sum the values in array of doubles */ private def sum(x : Array[Double]) : Double = { var s : Double = 0.0 - x.foreach(u => s += u) + (0 until x.length).foreach(j => s += x(j)) s } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index b36123366c9a8..072ed86edebe4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -25,7 +25,10 @@ import org.apache.spark.mllib.linalg.Vector * from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are the respective * mean and covariance for each Gaussian distribution i=1..k. */ -class GaussianMixtureModel(val w: Array[Double], val mu: Array[Vector], val sigma: Array[Matrix]) { +class GaussianMixtureModel( + val w: Array[Double], + val mu: Array[Vector], + val sigma: Array[Matrix]) extends Serializable { /** Number of gaussians in mixture */ def k: Int = w.length; From 719d8cc119e4ce117d33732c72c6c42da7456ae4 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Wed, 12 Nov 2014 20:23:53 -0500 Subject: [PATCH 03/27] Added scala test suite with basic test --- .../GMMExpectationMaximizationSuite.scala | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala new file mode 100644 index 0000000000000..0aaedb1911715 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.linalg.{Vectors, Matrices} +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.mllib.util.TestingUtils._ + +class GMMExpectationMaximizationSuite extends FunSuite with LocalSparkContext { + test("single cluster") { + val data = sc.parallelize(Array( + Vectors.dense(6.0, 9.0), + Vectors.dense(5.0, 10.0), + Vectors.dense(4.0, 11.0) + )) + + // expectations + val Ew = 1.0; + val Emu = Vectors.dense(5.0, 10.0) + val Esigma = Matrices.dense(2, 2, Array(2.0 / 3.0, -2.0 / 3.0, -2.0 / 3.0, 2.0 / 3.0)) + + val gmm = GMMExpectationMaximization.train(data, 1) + assert(gmm.w(0) ~== Ew absTol 1E-5) + assert(gmm.mu(0) ~== Emu absTol 1E-5) + assert(gmm.sigma(0) ~== Esigma absTol 1E-5) + } +} From e6ea8051a5e883121d650a5a8d7e79600cc642ed Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Mon, 17 Nov 2014 19:52:12 -0500 Subject: [PATCH 04/27] Merged with master branch; update test suite with latest context changes. Improved cluster initialization strategy. --- .../GMMExpectationMaximization.scala | 23 ++++++++++++++----- .../GMMExpectationMaximizationSuite.scala | 4 ++-- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala index 0ff499eec7b9d..a8db4bd862523 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala @@ -81,6 +81,9 @@ class GMMExpectationMaximization private ( private type DenseDoubleVector = BreezeVector[Double] private type DenseDoubleMatrix = BreezeMatrix[Double] + // number of samples per cluster to use when initializing Gaussians + private val nSamples = 5; + // A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold def this() = this(2, 0.01, 100) @@ -118,15 +121,15 @@ class GMMExpectationMaximization private ( // Get length of the input vectors val d = breezeData.first.length - // For each Gaussian, we will initialize the mean as some random - // point from the data. (This could be improved) - val samples = breezeData.takeSample(true, k, scala.util.Random.nextInt) + // For each Gaussian, we will initialize the mean as the average + // of some random samples from the data + val samples = breezeData.takeSample(true, k * nSamples, scala.util.Random.nextInt) // C will be array of (weight, mean, covariance) tuples // we start with uniform weights, a random mean from the data, and // identity matrices for covariance var C = (0 until k).map(i => (1.0/k, - samples(i), + vec_mean(samples.slice(i * nSamples, (i + 1) * nSamples)), BreezeMatrix.eye[Double](d))).toArray val acc_w = new Array[Accumulator[Double]](k) @@ -148,7 +151,7 @@ class GMMExpectationMaximization private ( } val log_likelihood = ctx.accumulator(0.0) - + // broadcast the current weights and distributions to all nodes val dists = ctx.broadcast((0 until k).map(i => new MultivariateGaussian(C(i)._2, C(i)._3)).toArray) @@ -164,11 +167,12 @@ class GMMExpectationMaximization private ( log_likelihood += math.log(norm) // accumulate weighted sums + val xxt = x * new Transpose(x) for(i <- 0 until k){ p(i) /= norm acc_w(i) += p(i) acc_mu(i) += x * p(i) - acc_sigma(i) += x * new Transpose(x) * p(i) + acc_sigma(i) += xxt * p(i) } }) @@ -205,6 +209,13 @@ class GMMExpectationMaximization private ( s } + /** Average of dense breeze vectors */ + private def vec_mean(x : Array[DenseDoubleVector]) : DenseDoubleVector = { + val v = BreezeVector.zeros[Double](x(0).length) + (0 until x.length).foreach(j => v += x(j)) + v / x.length.asInstanceOf[Double] + } + /** AccumulatorParam for Dense Breeze Vectors */ private object DenseDoubleVectorAccumulatorParam extends AccumulatorParam[DenseDoubleVector] { def zero(initialVector : DenseDoubleVector) : DenseDoubleVector = { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala index 0aaedb1911715..afe5e0cf03afa 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala @@ -20,10 +20,10 @@ package org.apache.spark.mllib.clustering import org.scalatest.FunSuite import org.apache.spark.mllib.linalg.{Vectors, Matrices} -import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext} import org.apache.spark.mllib.util.TestingUtils._ -class GMMExpectationMaximizationSuite extends FunSuite with LocalSparkContext { +class GMMExpectationMaximizationSuite extends FunSuite with MLlibTestSparkContext { test("single cluster") { val data = sc.parallelize(Array( Vectors.dense(6.0, 9.0), From 676e523a830020adf317d3c5bf445f94ed5c8a35 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Wed, 3 Dec 2014 10:14:19 -0500 Subject: [PATCH 05/27] Fixed to no longer ignore delta value provided on command line --- .../main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala index 4272280b261ea..d6dd4fd381089 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala @@ -38,7 +38,7 @@ object DenseGmmEM { val data = ctx.textFile(inputFile).map(line => Vectors.dense(line.trim.split(' ').map(_.toDouble))).cache() - val clusters = GMMExpectationMaximization.train(data, k) + val clusters = GMMExpectationMaximization.train(data, k, tol) for(i <- 0 until clusters.k) { println("w=%f mu=%s sigma=\n%s\n" format (clusters.w(i), clusters.mu(i), clusters.sigma(i))) From 8aaa17d14a39d32a9164d044318d54f30df86202 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Wed, 3 Dec 2014 10:15:01 -0500 Subject: [PATCH 06/27] Added additional train() method to companion object for cluster count and tolerance parameters. Modified cluster initialization strategy to use an initial covariance matrix derived from the sample points used to initialize the mean. --- .../GMMExpectationMaximization.scala | 36 ++++++++++++++++--- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala index a8db4bd862523..a733bdb4e4369 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala @@ -32,7 +32,7 @@ object GMMExpectationMaximization { /** * Trains a GMM using the given parameters * - * @param data training points stores as RDD[Vector] + * @param data training points stored as RDD[Vector] * @param k the number of Gaussians in the mixture * @param maxIterations the maximum number of iterations to perform * @param delta change in log-likelihood at which convergence is considered achieved @@ -47,7 +47,7 @@ object GMMExpectationMaximization { /** * Trains a GMM using the given parameters * - * @param data training points stores as RDD[Vector] + * @param data training points stored as RDD[Vector] * @param k the number of Gaussians in the mixture * @param maxIterations the maximum number of iterations to perform */ @@ -58,7 +58,18 @@ object GMMExpectationMaximization { /** * Trains a GMM using the given parameters * - * @param data training points stores as RDD[Vector] + * @param data training points stored as RDD[Vector] + * @param k the number of Gaussians in the mixture + * @param delta change in log-likelihood at which convergence is considered achieved + */ + def train(data: RDD[Vector], k: Int, delta: Double): GaussianMixtureModel = { + new GMMExpectationMaximization().setK(k).setDelta(delta).run(data) + } + + /** + * Trains a GMM using the given parameters + * + * @param data training points stored as RDD[Vector] * @param k the number of Gaussians in the mixture */ def train(data: RDD[Vector], k: Int): GaussianMixtureModel = { @@ -127,10 +138,12 @@ class GMMExpectationMaximization private ( // C will be array of (weight, mean, covariance) tuples // we start with uniform weights, a random mean from the data, and - // identity matrices for covariance + // diagonal covariance matrices using component variances + // derived from the samples var C = (0 until k).map(i => (1.0/k, vec_mean(samples.slice(i * nSamples, (i + 1) * nSamples)), - BreezeMatrix.eye[Double](d))).toArray + init_cov(samples.slice(i * nSamples, (i + 1) * nSamples))) + ).toArray val acc_w = new Array[Accumulator[Double]](k) val acc_mu = new Array[Accumulator[DenseDoubleVector]](k) @@ -216,6 +229,19 @@ class GMMExpectationMaximization private ( v / x.length.asInstanceOf[Double] } + /** + * Construct matrix where diagonal entries are element-wise + * variance of input vectors (computes biased variance) + */ + private def init_cov(x : Array[DenseDoubleVector]) : DenseDoubleMatrix = { + val mu = vec_mean(x) + val ss = BreezeVector.zeros[Double](x(0).length) + val result = BreezeMatrix.eye[Double](ss.length) + (0 until x.length).map(i => (x(i) - mu) :^ 2.0).foreach(u => ss += u) + (0 until ss.length).foreach(i => result(i,i) = ss(i) / x.length) + result + } + /** AccumulatorParam for Dense Breeze Vectors */ private object DenseDoubleVectorAccumulatorParam extends AccumulatorParam[DenseDoubleVector] { def zero(initialVector : DenseDoubleVector) : DenseDoubleVector = { From 9770261fc67a56a3ef9178f77e42d4e0deecd433 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Fri, 12 Dec 2014 12:04:15 -0500 Subject: [PATCH 07/27] Corrected a variety of style and naming issues. --- .../spark/examples/mllib/DenseGmmEM.scala | 24 +- .../GMMExpectationMaximization.scala | 283 ------------------ .../clustering/GaussianMixtureModel.scala | 10 +- .../clustering/GaussianMixtureModelEM.scala | 239 +++++++++++++++ .../GMMExpectationMaximizationSuite.scala | 9 +- 5 files changed, 266 insertions(+), 299 deletions(-) delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala index d6dd4fd381089..2e774c57d7e59 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala @@ -18,30 +18,34 @@ package org.apache.spark.examples.mllib import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.mllib.clustering.GaussianMixtureModel -import org.apache.spark.mllib.clustering.GMMExpectationMaximization +import org.apache.spark.mllib.clustering.GaussianMixtureModelEM import org.apache.spark.mllib.linalg.Vectors object DenseGmmEM { def main(args: Array[String]): Unit = { - if( args.length != 3 ) { - println("usage: DenseGmmEM ") + if (args.length != 3) { + println("usage: DenseGmmEM ") } else { run(args(0), args(1).toInt, args(2).toDouble) } } - def run(inputFile: String, k: Int, tol: Double) { + def run(inputFile: String, k: Int, convergenceTol: Double) { val conf = new SparkConf().setAppName("Spark EM Sample") val ctx = new SparkContext(conf) - val data = ctx.textFile(inputFile).map(line => - Vectors.dense(line.trim.split(' ').map(_.toDouble))).cache() + val data = ctx.textFile(inputFile).map{ line => + Vectors.dense(line.trim.split(' ').map(_.toDouble)) + }.cache() - val clusters = GMMExpectationMaximization.train(data, k, tol) + val clusters = new GaussianMixtureModelEM() + .setK(k) + .setConvergenceTol(convergenceTol) + .run(data) - for(i <- 0 until clusters.k) { - println("w=%f mu=%s sigma=\n%s\n" format (clusters.w(i), clusters.mu(i), clusters.sigma(i))) + for (i <- 0 until clusters.k) { + println("weight=%f mu=%s sigma=\n%s\n" format + (clusters.weight(i), clusters.mu(i), clusters.sigma(i))) } } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala deleted file mode 100644 index a733bdb4e4369..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.clustering - -import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix} -import breeze.linalg.{Transpose, det, inv} -import org.apache.spark.rdd.RDD -import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors} -import org.apache.spark.{Accumulator, AccumulatorParam, SparkContext} -import org.apache.spark.SparkContext.DoubleAccumulatorParam - -/** - * Expectation-Maximization for multivariate Gaussian Mixture Models. - * - */ -object GMMExpectationMaximization { - /** - * Trains a GMM using the given parameters - * - * @param data training points stored as RDD[Vector] - * @param k the number of Gaussians in the mixture - * @param maxIterations the maximum number of iterations to perform - * @param delta change in log-likelihood at which convergence is considered achieved - */ - def train(data: RDD[Vector], k: Int, maxIterations: Int, delta: Double): GaussianMixtureModel = { - new GMMExpectationMaximization().setK(k) - .setMaxIterations(maxIterations) - .setDelta(delta) - .run(data) - } - - /** - * Trains a GMM using the given parameters - * - * @param data training points stored as RDD[Vector] - * @param k the number of Gaussians in the mixture - * @param maxIterations the maximum number of iterations to perform - */ - def train(data: RDD[Vector], k: Int, maxIterations: Int): GaussianMixtureModel = { - new GMMExpectationMaximization().setK(k).setMaxIterations(maxIterations).run(data) - } - - /** - * Trains a GMM using the given parameters - * - * @param data training points stored as RDD[Vector] - * @param k the number of Gaussians in the mixture - * @param delta change in log-likelihood at which convergence is considered achieved - */ - def train(data: RDD[Vector], k: Int, delta: Double): GaussianMixtureModel = { - new GMMExpectationMaximization().setK(k).setDelta(delta).run(data) - } - - /** - * Trains a GMM using the given parameters - * - * @param data training points stored as RDD[Vector] - * @param k the number of Gaussians in the mixture - */ - def train(data: RDD[Vector], k: Int): GaussianMixtureModel = { - new GMMExpectationMaximization().setK(k).run(data) - } -} - -/** - * This class performs multivariate Gaussian expectation maximization. It will - * maximize the log-likelihood for a mixture of k Gaussians, iterating until - * the log-likelihood changes by less than delta, or until it has reached - * the max number of iterations. - */ -class GMMExpectationMaximization private ( - private var k: Int, - private var delta: Double, - private var maxIterations: Int) extends Serializable { - - // Type aliases for convenience - private type DenseDoubleVector = BreezeVector[Double] - private type DenseDoubleMatrix = BreezeMatrix[Double] - - // number of samples per cluster to use when initializing Gaussians - private val nSamples = 5; - - // A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold - def this() = this(2, 0.01, 100) - - /** Set the number of Gaussians in the mixture model. Default: 2 */ - def setK(k: Int): this.type = { - this.k = k - this - } - - /** Set the maximum number of iterations to run. Default: 100 */ - def setMaxIterations(maxIterations: Int): this.type = { - this.maxIterations = maxIterations - this - } - - /** - * Set the largest change in log-likelihood at which convergence is - * considered to have occurred. - */ - def setDelta(delta: Double): this.type = { - this.delta = delta - this - } - - /** Machine precision value used to ensure matrix conditioning */ - private val eps = math.pow(2.0, -52) - - /** Perform expectation maximization */ - def run(data: RDD[Vector]): GaussianMixtureModel = { - val ctx = data.sparkContext - - // we will operate on the data as breeze data - val breezeData = data.map{ u => u.toBreeze.toDenseVector }.cache() - - // Get length of the input vectors - val d = breezeData.first.length - - // For each Gaussian, we will initialize the mean as the average - // of some random samples from the data - val samples = breezeData.takeSample(true, k * nSamples, scala.util.Random.nextInt) - - // C will be array of (weight, mean, covariance) tuples - // we start with uniform weights, a random mean from the data, and - // diagonal covariance matrices using component variances - // derived from the samples - var C = (0 until k).map(i => (1.0/k, - vec_mean(samples.slice(i * nSamples, (i + 1) * nSamples)), - init_cov(samples.slice(i * nSamples, (i + 1) * nSamples))) - ).toArray - - val acc_w = new Array[Accumulator[Double]](k) - val acc_mu = new Array[Accumulator[DenseDoubleVector]](k) - val acc_sigma = new Array[Accumulator[DenseDoubleMatrix]](k) - - var llh = Double.MinValue // current log-likelihood - var llhp = 0.0 // previous log-likelihood - - var i, iter = 0 - do { - // reset accumulators - for(i <- 0 until k){ - acc_w(i) = ctx.accumulator(0.0) - acc_mu(i) = ctx.accumulator( - BreezeVector.zeros[Double](d))(DenseDoubleVectorAccumulatorParam) - acc_sigma(i) = ctx.accumulator( - BreezeMatrix.zeros[Double](d,d))(DenseDoubleMatrixAccumulatorParam) - } - - val log_likelihood = ctx.accumulator(0.0) - - // broadcast the current weights and distributions to all nodes - val dists = ctx.broadcast((0 until k).map(i => - new MultivariateGaussian(C(i)._2, C(i)._3)).toArray) - val weights = ctx.broadcast((0 until k).map(i => C(i)._1).toArray) - - // calculate partial assignments for each sample in the data - // (often referred to as the "E" step in literature) - breezeData.foreach(x => { - val p = (0 until k).map(i => - eps + weights.value(i) * dists.value(i).pdf(x)).toArray - val norm = sum(p) - - log_likelihood += math.log(norm) - - // accumulate weighted sums - val xxt = x * new Transpose(x) - for(i <- 0 until k){ - p(i) /= norm - acc_w(i) += p(i) - acc_mu(i) += x * p(i) - acc_sigma(i) += xxt * p(i) - } - }) - - // Collect the computed sums - val W = (0 until k).map(i => acc_w(i).value).toArray - val MU = (0 until k).map(i => acc_mu(i).value).toArray - val SIGMA = (0 until k).map(i => acc_sigma(i).value).toArray - - // Create new distributions based on the partial assignments - // (often referred to as the "M" step in literature) - C = (0 until k).map(i => { - val weight = W(i) / sum(W) - val mu = MU(i) / W(i) - val sigma = SIGMA(i) / W(i) - mu * new Transpose(mu) - (weight, mu, sigma) - }).toArray - - llhp = llh; // current becomes previous - llh = log_likelihood.value // this is the freshly computed log-likelihood - iter += 1 - } while(iter < maxIterations && Math.abs(llh-llhp) > delta) - - // Need to convert the breeze matrices to MLlib matrices - val weights = (0 until k).map(i => C(i)._1).toArray - val means = (0 until k).map(i => Vectors.fromBreeze(C(i)._2)).toArray - val sigmas = (0 until k).map(i => Matrices.fromBreeze(C(i)._3)).toArray - new GaussianMixtureModel(weights, means, sigmas) - } - - /** Sum the values in array of doubles */ - private def sum(x : Array[Double]) : Double = { - var s : Double = 0.0 - (0 until x.length).foreach(j => s += x(j)) - s - } - - /** Average of dense breeze vectors */ - private def vec_mean(x : Array[DenseDoubleVector]) : DenseDoubleVector = { - val v = BreezeVector.zeros[Double](x(0).length) - (0 until x.length).foreach(j => v += x(j)) - v / x.length.asInstanceOf[Double] - } - - /** - * Construct matrix where diagonal entries are element-wise - * variance of input vectors (computes biased variance) - */ - private def init_cov(x : Array[DenseDoubleVector]) : DenseDoubleMatrix = { - val mu = vec_mean(x) - val ss = BreezeVector.zeros[Double](x(0).length) - val result = BreezeMatrix.eye[Double](ss.length) - (0 until x.length).map(i => (x(i) - mu) :^ 2.0).foreach(u => ss += u) - (0 until ss.length).foreach(i => result(i,i) = ss(i) / x.length) - result - } - - /** AccumulatorParam for Dense Breeze Vectors */ - private object DenseDoubleVectorAccumulatorParam extends AccumulatorParam[DenseDoubleVector] { - def zero(initialVector : DenseDoubleVector) : DenseDoubleVector = { - BreezeVector.zeros[Double](initialVector.length) - } - - def addInPlace(a : DenseDoubleVector, b : DenseDoubleVector) : DenseDoubleVector = { - a += b - } - } - - /** AccumulatorParam for Dense Breeze Matrices */ - private object DenseDoubleMatrixAccumulatorParam extends AccumulatorParam[DenseDoubleMatrix] { - def zero(initialVector : DenseDoubleMatrix) : DenseDoubleMatrix = { - BreezeMatrix.zeros[Double](initialVector.rows, initialVector.cols) - } - - def addInPlace(a : DenseDoubleMatrix, b : DenseDoubleMatrix) : DenseDoubleMatrix = { - a += b - } - } - - /** - * Utility class to implement the density function for multivariate Gaussian distribution. - * Breeze provides this functionality, but it requires the Apache Commons Math library, - * so this class is here so-as to not introduce a new dependency in Spark. - */ - private class MultivariateGaussian(val mu : DenseDoubleVector, val sigma : DenseDoubleMatrix) - extends Serializable { - private val sigma_inv_2 = inv(sigma) * -0.5 - private val U = math.pow(2.0*math.Pi, -mu.length/2.0) * math.pow(det(sigma), -0.5) - - def pdf(x : DenseDoubleVector) : Double = { - val delta = x - mu - val delta_t = new Transpose(delta) - U * math.exp(delta_t * sigma_inv_2 * delta) - } - } -} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index 072ed86edebe4..32469ac7e2995 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -24,12 +24,18 @@ import org.apache.spark.mllib.linalg.Vector * Multivariate Gaussian mixture model consisting of k Gaussians, where points are drawn * from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are the respective * mean and covariance for each Gaussian distribution i=1..k. + * + * @param weight Weights for each Gaussian distribution in the mixture, where mu(i) is + * the weight for Gaussian i, and weight.sum == 1 + * @param mu Means for each Gaussian in the mixture, where mu(i) is the mean for Gaussian i + * @param sigma Covariance maxtrix for each Gaussian in the mixture, where sigma(i) is the + * covariance matrix for Gaussian i */ class GaussianMixtureModel( - val w: Array[Double], + val weight: Array[Double], val mu: Array[Vector], val sigma: Array[Matrix]) extends Serializable { /** Number of gaussians in mixture */ - def k: Int = w.length; + def k: Int = weight.length; } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala new file mode 100644 index 0000000000000..e5568c252ed5c --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix} +import breeze.linalg.{Transpose, det, inv} + +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors} +import org.apache.spark.{Accumulator, AccumulatorParam, SparkContext} +import org.apache.spark.SparkContext.DoubleAccumulatorParam + +/** + * This class performs multivariate Gaussian expectation maximization. It will + * maximize the log-likelihood for a mixture of k Gaussians, iterating until + * the log-likelihood changes by less than delta, or until it has reached + * the max number of iterations. + */ +class GaussianMixtureModelEM private ( + private var k: Int, + private var convergenceTol: Double, + private var maxIterations: Int) extends Serializable { + + // Type aliases for convenience + private type DenseDoubleVector = BreezeVector[Double] + private type DenseDoubleMatrix = BreezeMatrix[Double] + + /** number of samples per cluster to use when initializing Gaussians */ + private val nSamples = 5 + + /** A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold */ + def this() = this(2, 0.01, 100) + + /** Set the number of Gaussians in the mixture model. Default: 2 */ + def setK(k: Int): this.type = { + this.k = k + this + } + + /** Return the number of Gaussians in the mixture model */ + def getK: Int = k + + /** Set the maximum number of iterations to run. Default: 100 */ + def setMaxIterations(maxIterations: Int): this.type = { + this.maxIterations = maxIterations + this + } + + /** Return the maximum number of iterations to run */ + def getMaxIterations: Int = maxIterations + + /** + * Set the largest change in log-likelihood at which convergence is + * considered to have occurred. + */ + def setConvergenceTol(convergenceTol: Double): this.type = { + this.convergenceTol = convergenceTol + this + } + + /** Return the largest change in log-likelihood at which convergence is + * considered to have occurred. + */ + def getConvergenceTol: Double = convergenceTol + + /** Machine precision value used to ensure matrix conditioning */ + private val eps = math.pow(2.0, -52) + + /** Perform expectation maximization */ + def run(data: RDD[Vector]): GaussianMixtureModel = { + val ctx = data.sparkContext + + // we will operate on the data as breeze data + val breezeData = data.map( u => u.toBreeze.toDenseVector ).cache() + + // Get length of the input vectors + val d = breezeData.first.length + + // For each Gaussian, we will initialize the mean as the average + // of some random samples from the data + val samples = breezeData.takeSample(true, k * nSamples, scala.util.Random.nextInt) + + // gaussians will be array of (weight, mean, covariance) tuples + // we start with uniform weights, a random mean from the data, and + // diagonal covariance matrices using component variances + // derived from the samples + var gaussians = (0 until k).map{ i => (1.0 / k, + vectorMean(samples.slice(i * nSamples, (i + 1) * nSamples)), + initCovariance(samples.slice(i * nSamples, (i + 1) * nSamples))) + }.toArray + + val accW = new Array[Accumulator[Double]](k) + val accMu = new Array[Accumulator[DenseDoubleVector]](k) + val accSigma = new Array[Accumulator[DenseDoubleMatrix]](k) + + var llh = Double.MinValue // current log-likelihood + var llhp = 0.0 // previous log-likelihood + + var iter = 0 + do { + // reset accumulators + for (i <- 0 until k) { + accW(i) = ctx.accumulator(0.0) + accMu(i) = ctx.accumulator( + BreezeVector.zeros[Double](d))(DenseDoubleVectorAccumulatorParam) + accSigma(i) = ctx.accumulator( + BreezeMatrix.zeros[Double](d,d))(DenseDoubleMatrixAccumulatorParam) + } + + val logLikelihood = ctx.accumulator(0.0) + + // broadcast the current weights and distributions to all nodes + val dists = ctx.broadcast((0 until k).map{ i => + new MultivariateGaussian(gaussians(i)._2, gaussians(i)._3) + }.toArray) + val weights = ctx.broadcast((0 until k).map(i => gaussians(i)._1).toArray) + + // calculate partial assignments for each sample in the data + // (often referred to as the "E" step in literature) + breezeData.foreach(x => { + val p = (0 until k).map{ i => + eps + weights.value(i) * dists.value(i).pdf(x) + }.toArray + + val pSum = p.sum + + logLikelihood += math.log(pSum) + + // accumulate weighted sums + val xxt = x * new Transpose(x) + for (i <- 0 until k) { + p(i) /= pSum + accW(i) += p(i) + accMu(i) += x * p(i) + accSigma(i) += xxt * p(i) + } + }) + + // Collect the computed sums + val W = (0 until k).map(i => accW(i).value).toArray + val MU = (0 until k).map(i => accMu(i).value).toArray + val SIGMA = (0 until k).map(i => accSigma(i).value).toArray + + // Create new distributions based on the partial assignments + // (often referred to as the "M" step in literature) + gaussians = (0 until k).map{ i => { + val weight = W(i) / W.sum + val mu = MU(i) / W(i) + val sigma = SIGMA(i) / W(i) - mu * new Transpose(mu) + (weight, mu, sigma) + } + }.toArray + + llhp = llh; // current becomes previous + llh = logLikelihood.value // this is the freshly computed log-likelihood + iter += 1 + } while(iter < maxIterations && Math.abs(llh-llhp) > convergenceTol) + + // Need to convert the breeze matrices to MLlib matrices + val weights = (0 until k).map(i => gaussians(i)._1).toArray + val means = (0 until k).map(i => Vectors.fromBreeze(gaussians(i)._2)).toArray + val sigmas = (0 until k).map(i => Matrices.fromBreeze(gaussians(i)._3)).toArray + new GaussianMixtureModel(weights, means, sigmas) + } + + /** Average of dense breeze vectors */ + private def vectorMean(x: Array[DenseDoubleVector]): DenseDoubleVector = { + val v = BreezeVector.zeros[Double](x(0).length) + x.foreach(xi => v += xi) + v / x.length.asInstanceOf[Double] + } + + /** + * Construct matrix where diagonal entries are element-wise + * variance of input vectors (computes biased variance) + */ + private def initCovariance(x: Array[DenseDoubleVector]): DenseDoubleMatrix = { + val mu = vectorMean(x) + val ss = BreezeVector.zeros[Double](x(0).length) + val cov = BreezeMatrix.eye[Double](ss.length) + x.map(xi => (xi - mu) :^ 2.0).foreach(u => ss += u) + (0 until ss.length).foreach(i => cov(i,i) = ss(i) / x.length) + cov + } + + /** AccumulatorParam for Dense Breeze Vectors */ + private object DenseDoubleVectorAccumulatorParam extends AccumulatorParam[DenseDoubleVector] { + def zero(initialVector: DenseDoubleVector): DenseDoubleVector = { + BreezeVector.zeros[Double](initialVector.length) + } + + def addInPlace(a: DenseDoubleVector, b: DenseDoubleVector): DenseDoubleVector = { + a += b + } + } + + /** AccumulatorParam for Dense Breeze Matrices */ + private object DenseDoubleMatrixAccumulatorParam extends AccumulatorParam[DenseDoubleMatrix] { + def zero(initialMatrix: DenseDoubleMatrix): DenseDoubleMatrix = { + BreezeMatrix.zeros[Double](initialMatrix.rows, initialMatrix.cols) + } + + def addInPlace(a: DenseDoubleMatrix, b: DenseDoubleMatrix): DenseDoubleMatrix = { + a += b + } + } + + /** + * Utility class to implement the density function for multivariate Gaussian distribution. + * Breeze provides this functionality, but it requires the Apache Commons Math library, + * so this class is here so-as to not introduce a new dependency in Spark. + */ + private class MultivariateGaussian(val mu: DenseDoubleVector, val sigma: DenseDoubleMatrix) + extends Serializable { + private val sigmaInv2 = inv(sigma) * -0.5 + private val U = math.pow(2.0 * math.Pi, -mu.length / 2.0) * math.pow(det(sigma), -0.5) + + def pdf(x: DenseDoubleVector): Double = { + val delta = x - mu + val deltaTranspose = new Transpose(delta) + U * math.exp(deltaTranspose * sigmaInv2 * delta) + } + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala index afe5e0cf03afa..1d48c471d999f 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.mllib.clustering import org.scalatest.FunSuite import org.apache.spark.mllib.linalg.{Vectors, Matrices} -import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext} +import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ class GMMExpectationMaximizationSuite extends FunSuite with MLlibTestSparkContext { @@ -32,12 +32,13 @@ class GMMExpectationMaximizationSuite extends FunSuite with MLlibTestSparkContex )) // expectations - val Ew = 1.0; + val Ew = 1.0 val Emu = Vectors.dense(5.0, 10.0) val Esigma = Matrices.dense(2, 2, Array(2.0 / 3.0, -2.0 / 3.0, -2.0 / 3.0, 2.0 / 3.0)) - val gmm = GMMExpectationMaximization.train(data, 1) - assert(gmm.w(0) ~== Ew absTol 1E-5) + val gmm = new GaussianMixtureModelEM().setK(1).run(data) + + assert(gmm.weight(0) ~== Ew absTol 1E-5) assert(gmm.mu(0) ~== Emu absTol 1E-5) assert(gmm.sigma(0) ~== Esigma absTol 1E-5) } From e7d413b8e7e8614097b0c3823652677f38062775 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Fri, 12 Dec 2014 14:10:47 -0500 Subject: [PATCH 08/27] Moved multivariate Gaussian utility class to mllib/stat/impl Improved comments --- .../clustering/GaussianMixtureModel.scala | 6 +-- .../clustering/GaussianMixtureModelEM.scala | 41 ++++++++----------- 2 files changed, 21 insertions(+), 26 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index 32469ac7e2995..a64b3108de0bd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -21,9 +21,9 @@ import org.apache.spark.mllib.linalg.Matrix import org.apache.spark.mllib.linalg.Vector /** - * Multivariate Gaussian mixture model consisting of k Gaussians, where points are drawn - * from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are the respective - * mean and covariance for each Gaussian distribution i=1..k. + * Multivariate Gaussian Mixture Model (GMM) consisting of k Gaussians, where points + * are drawn from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are + * the respective mean and covariance for each Gaussian distribution i=1..k. * * @param weight Weights for each Gaussian distribution in the mixture, where mu(i) is * the weight for Gaussian i, and weight.sum == 1 diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala index e5568c252ed5c..ccea01277c683 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala @@ -18,18 +18,30 @@ package org.apache.spark.mllib.clustering import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix} -import breeze.linalg.{Transpose, det, inv} +import breeze.linalg.Transpose import org.apache.spark.rdd.RDD import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors} +import org.apache.spark.mllib.stat.impl.MultivariateGaussian import org.apache.spark.{Accumulator, AccumulatorParam, SparkContext} import org.apache.spark.SparkContext.DoubleAccumulatorParam /** - * This class performs multivariate Gaussian expectation maximization. It will - * maximize the log-likelihood for a mixture of k Gaussians, iterating until - * the log-likelihood changes by less than delta, or until it has reached - * the max number of iterations. + * This class performs expectation maximization for multivariate Gaussian + * Mixture Models (GMMs). A GMM represents a composite distribution of + * independent Gaussian distributions with associated "mixing" weights + * specifying each's contribution to the composite. + * + * Given a set of sample points, this class will maximize the log-likelihood + * for a mixture of k Gaussians, iterating until the log-likelihood changes by + * less than convergenceTol, or until it has reached the max number of iterations. + * While this process is generally guaranteed to converge, it is not guaranteed + * to find a global optimum. + * + * @param k The number of independent Gaussians in the mixture model + * @param convergenceTol The maximum change in log-likelihood at which convergence + * is considered to have occurred. + * @param maxIterations The maximum number of iterations to perform */ class GaussianMixtureModelEM private ( private var k: Int, @@ -40,7 +52,7 @@ class GaussianMixtureModelEM private ( private type DenseDoubleVector = BreezeVector[Double] private type DenseDoubleMatrix = BreezeMatrix[Double] - /** number of samples per cluster to use when initializing Gaussians */ + // number of samples per cluster to use when initializing Gaussians private val nSamples = 5 /** A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold */ @@ -219,21 +231,4 @@ class GaussianMixtureModelEM private ( a += b } } - - /** - * Utility class to implement the density function for multivariate Gaussian distribution. - * Breeze provides this functionality, but it requires the Apache Commons Math library, - * so this class is here so-as to not introduce a new dependency in Spark. - */ - private class MultivariateGaussian(val mu: DenseDoubleVector, val sigma: DenseDoubleMatrix) - extends Serializable { - private val sigmaInv2 = inv(sigma) * -0.5 - private val U = math.pow(2.0 * math.Pi, -mu.length / 2.0) * math.pow(det(sigma), -0.5) - - def pdf(x: DenseDoubleVector): Double = { - val delta = x - mu - val deltaTranspose = new Transpose(delta) - U * math.exp(deltaTranspose * sigmaInv2 * delta) - } - } } From dc9c74264631a80dbb718a7be4c184ccae6277c0 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Fri, 12 Dec 2014 14:12:36 -0500 Subject: [PATCH 09/27] Moved MultivariateGaussian utility class --- .../stat/impl/MultivariateGaussian.scala | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala new file mode 100644 index 0000000000000..ab046a3acea05 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.impl + +import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix} +import breeze.linalg.{Transpose, det, inv} + +/** + * Utility class to implement the density function for multivariate Gaussian distribution. + * Breeze provides this functionality, but it requires the Apache Commons Math library, + * so this class is here so-as to not introduce a new dependency in Spark. + */ +private[mllib] class MultivariateGaussian( + val mu: BreezeVector[Double], + val sigma: BreezeMatrix[Double]) extends Serializable { + private val sigmaInv2 = inv(sigma) * -0.5 + private val U = math.pow(2.0 * math.Pi, -mu.length / 2.0) * math.pow(det(sigma), -0.5) + + def pdf(x: BreezeVector[Double]): Double = { + val delta = x - mu + val deltaTranspose = new Transpose(delta) + U * math.exp(deltaTranspose * sigmaInv2 * delta) + } +} From 97044cff25d7e7450296706669b1c1ba6cf692f0 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Mon, 15 Dec 2014 20:09:46 -0500 Subject: [PATCH 10/27] Fixed style issues --- .../spark/examples/mllib/DenseGmmEM.scala | 12 +++--- .../clustering/GaussianMixtureModelEM.scala | 42 +++++++++---------- .../GMMExpectationMaximizationSuite.scala | 8 ++-- 3 files changed, 30 insertions(+), 32 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala index 2e774c57d7e59..f7301f533b1c2 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala @@ -35,17 +35,17 @@ object DenseGmmEM { val ctx = new SparkContext(conf) val data = ctx.textFile(inputFile).map{ line => - Vectors.dense(line.trim.split(' ').map(_.toDouble)) - }.cache() + Vectors.dense(line.trim.split(' ').map(_.toDouble)) + }.cache val clusters = new GaussianMixtureModelEM() - .setK(k) - .setConvergenceTol(convergenceTol) - .run(data) + .setK(k) + .setConvergenceTol(convergenceTol) + .run(data) for (i <- 0 until clusters.k) { println("weight=%f mu=%s sigma=\n%s\n" format - (clusters.weight(i), clusters.mu(i), clusters.sigma(i))) + (clusters.weight(i), clusters.mu(i), clusters.sigma(i))) } } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala index ccea01277c683..c21631b5715e1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala @@ -111,10 +111,11 @@ class GaussianMixtureModelEM private ( // we start with uniform weights, a random mean from the data, and // diagonal covariance matrices using component variances // derived from the samples - var gaussians = (0 until k).map{ i => (1.0 / k, - vectorMean(samples.slice(i * nSamples, (i + 1) * nSamples)), - initCovariance(samples.slice(i * nSamples, (i + 1) * nSamples))) - }.toArray + var gaussians = (0 until k).map{ i => + (1.0 / k, + vectorMean(samples.slice(i * nSamples, (i + 1) * nSamples)), + initCovariance(samples.slice(i * nSamples, (i + 1) * nSamples))) + }.toArray val accW = new Array[Accumulator[Double]](k) val accMu = new Array[Accumulator[DenseDoubleVector]](k) @@ -129,25 +130,23 @@ class GaussianMixtureModelEM private ( for (i <- 0 until k) { accW(i) = ctx.accumulator(0.0) accMu(i) = ctx.accumulator( - BreezeVector.zeros[Double](d))(DenseDoubleVectorAccumulatorParam) + BreezeVector.zeros[Double](d))(DenseDoubleVectorAccumulatorParam) accSigma(i) = ctx.accumulator( - BreezeMatrix.zeros[Double](d,d))(DenseDoubleMatrixAccumulatorParam) + BreezeMatrix.zeros[Double](d,d))(DenseDoubleMatrixAccumulatorParam) } val logLikelihood = ctx.accumulator(0.0) // broadcast the current weights and distributions to all nodes - val dists = ctx.broadcast((0 until k).map{ i => - new MultivariateGaussian(gaussians(i)._2, gaussians(i)._3) - }.toArray) + val dists = ctx.broadcast{ + (0 until k).map(i => new MultivariateGaussian(gaussians(i)._2, gaussians(i)._3)).toArray + } val weights = ctx.broadcast((0 until k).map(i => gaussians(i)._1).toArray) // calculate partial assignments for each sample in the data // (often referred to as the "E" step in literature) - breezeData.foreach(x => { - val p = (0 until k).map{ i => - eps + weights.value(i) * dists.value(i).pdf(x) - }.toArray + breezeData.foreach{ x => + val p = (0 until k).map(i => eps + weights.value(i) * dists.value(i).pdf(x)).toArray val pSum = p.sum @@ -161,7 +160,7 @@ class GaussianMixtureModelEM private ( accMu(i) += x * p(i) accSigma(i) += xxt * p(i) } - }) + } // Collect the computed sums val W = (0 until k).map(i => accW(i).value).toArray @@ -170,15 +169,14 @@ class GaussianMixtureModelEM private ( // Create new distributions based on the partial assignments // (often referred to as the "M" step in literature) - gaussians = (0 until k).map{ i => { - val weight = W(i) / W.sum - val mu = MU(i) / W(i) - val sigma = SIGMA(i) / W(i) - mu * new Transpose(mu) - (weight, mu, sigma) - } - }.toArray + gaussians = (0 until k).map{ i => + val weight = W(i) / W.sum + val mu = MU(i) / W(i) + val sigma = SIGMA(i) / W(i) - mu * new Transpose(mu) + (weight, mu, sigma) + }.toArray - llhp = llh; // current becomes previous + llhp = llh // current becomes previous llh = logLikelihood.value // this is the freshly computed log-likelihood iter += 1 } while(iter < maxIterations && Math.abs(llh-llhp) > convergenceTol) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala index 1d48c471d999f..d1f3fe34bfb09 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala @@ -26,10 +26,10 @@ import org.apache.spark.mllib.util.TestingUtils._ class GMMExpectationMaximizationSuite extends FunSuite with MLlibTestSparkContext { test("single cluster") { val data = sc.parallelize(Array( - Vectors.dense(6.0, 9.0), - Vectors.dense(5.0, 10.0), - Vectors.dense(4.0, 11.0) - )) + Vectors.dense(6.0, 9.0), + Vectors.dense(5.0, 10.0), + Vectors.dense(4.0, 11.0) + )) // expectations val Ew = 1.0 From f407b4c22257a03409f4ab4dd15ed92738f58140 Mon Sep 17 00:00:00 2001 From: FlytxtRnD Date: Tue, 16 Dec 2014 17:17:40 +0530 Subject: [PATCH 11/27] Added predict() to return the cluster labels and membership values --- .../spark/examples/mllib/DenseGmmEM.scala | 4 +++ .../clustering/GaussianMixtureModel.scala | 9 ++++++ .../clustering/GaussianMixtureModelEM.scala | 30 ++++++++++++++++++- 3 files changed, 42 insertions(+), 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala index f7301f533b1c2..01b8d92aabf07 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala @@ -47,5 +47,9 @@ object DenseGmmEM { println("weight=%f mu=%s sigma=\n%s\n" format (clusters.weight(i), clusters.mu(i), clusters.sigma(i))) } + val (responsibility_matrix, cluster_labels) = clusters.predict(data) + for(x <- cluster_labels.collect()){ + print(" " + x) + } } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index a64b3108de0bd..df11bbeb89ef0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -17,6 +17,7 @@ package org.apache.spark.mllib.clustering +import org.apache.spark.rdd.RDD import org.apache.spark.mllib.linalg.Matrix import org.apache.spark.mllib.linalg.Vector @@ -38,4 +39,12 @@ class GaussianMixtureModel( /** Number of gaussians in mixture */ def k: Int = weight.length; + + /** Maps given points to their cluster indices. */ + def predict(points: RDD[Vector]): (RDD[Array[Double]],RDD[Int]) = { + val responsibility_matrix = new GaussianMixtureModelEM() + .predictClusters(points,mu,sigma,weight,k) + val cluster_labels = responsibility_matrix.map(r => r.indexOf(r.max)) + (responsibility_matrix,cluster_labels) + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala index c21631b5715e1..ef610aca8a3e6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala @@ -21,7 +21,7 @@ import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix} import breeze.linalg.Transpose import org.apache.spark.rdd.RDD -import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors} +import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors} import org.apache.spark.mllib.stat.impl.MultivariateGaussian import org.apache.spark.{Accumulator, AccumulatorParam, SparkContext} import org.apache.spark.SparkContext.DoubleAccumulatorParam @@ -208,6 +208,34 @@ class GaussianMixtureModelEM private ( cov } + /** + Given the input vectors, return the membership value of each vector + to all mixture components. + */ + def predictClusters(points:RDD[Vector],mu:Array[Vector],sigma:Array[Matrix], + weight:Array[Double],k:Int):RDD[Array[Double]]= { + val ctx = points.sparkContext + val dists = ctx.broadcast((0 until k).map(i => + new MultivariateGaussian(mu(i).toBreeze.toDenseVector,sigma(i).toBreeze.toDenseMatrix)) + .toArray) + val weights = ctx.broadcast((0 until k).map(i => weight(i)).toArray) + points.map(x=>compute_log_likelihood(x.toBreeze.toDenseVector,dists.value,weights.value,k)) + + } + /** + * Compute the log density of each vector + */ + def compute_log_likelihood(pt:DenseDoubleVector,dists:Array[MultivariateGaussian], + weights:Array[Double],k:Int):Array[Double]={ + val p = (0 until k).map(i => + eps + weights(i) * dists(i).pdf(pt)).toArray + val pSum = p.sum + for(i<- 0 until k){ + p(i) /= pSum + } + p + } + /** AccumulatorParam for Dense Breeze Vectors */ private object DenseDoubleVectorAccumulatorParam extends AccumulatorParam[DenseDoubleVector] { def zero(initialVector: DenseDoubleVector): DenseDoubleVector = { From 2df336b6609ed4edc201ea6c26aa7b3faba4f284 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Tue, 16 Dec 2014 07:37:48 -0500 Subject: [PATCH 12/27] Fixed style issue --- .../scala/org/apache/spark/examples/mllib/DenseGmmEM.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala index f7301f533b1c2..a35274b088979 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala @@ -39,9 +39,9 @@ object DenseGmmEM { }.cache val clusters = new GaussianMixtureModelEM() - .setK(k) - .setConvergenceTol(convergenceTol) - .run(data) + .setK(k) + .setConvergenceTol(convergenceTol) + .run(data) for (i <- 0 until clusters.k) { println("weight=%f mu=%s sigma=\n%s\n" format From d695034fab97c617b4c590896898235a8218dc6b Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Tue, 16 Dec 2014 08:18:58 -0500 Subject: [PATCH 13/27] Fixed style issues --- .../spark/examples/mllib/DenseGmmEM.scala | 7 ++-- .../clustering/GaussianMixtureModel.scala | 10 ++--- .../clustering/GaussianMixtureModelEM.scala | 37 ++++++++++--------- 3 files changed, 29 insertions(+), 25 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala index ddb2722835356..12be814a150b5 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala @@ -47,9 +47,10 @@ object DenseGmmEM { println("weight=%f mu=%s sigma=\n%s\n" format (clusters.weight(i), clusters.mu(i), clusters.sigma(i))) } - val (responsibility_matrix, cluster_labels) = clusters.predict(data) - for(x <- cluster_labels.collect()){ - print(" " + x) + + val (responsibilityMatrix, clusterLabels) = clusters.predict(data) + for (x <- clusterLabels.collect) { + print(" " + x) } } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index df11bbeb89ef0..b47d6820fd17f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -42,9 +42,9 @@ class GaussianMixtureModel( /** Maps given points to their cluster indices. */ def predict(points: RDD[Vector]): (RDD[Array[Double]],RDD[Int]) = { - val responsibility_matrix = new GaussianMixtureModelEM() - .predictClusters(points,mu,sigma,weight,k) - val cluster_labels = responsibility_matrix.map(r => r.indexOf(r.max)) - (responsibility_matrix,cluster_labels) - } + val responsibilityMatrix = new GaussianMixtureModelEM() + .predictClusters(points,mu,sigma,weight,k) + val clusterLabels = responsibilityMatrix.map(r => r.indexOf(r.max)) + (responsibilityMatrix, clusterLabels) + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala index ef610aca8a3e6..fdc0ac7881879 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala @@ -208,27 +208,30 @@ class GaussianMixtureModelEM private ( cov } - /** - Given the input vectors, return the membership value of each vector - to all mixture components. - */ - def predictClusters(points:RDD[Vector],mu:Array[Vector],sigma:Array[Matrix], - weight:Array[Double],k:Int):RDD[Array[Double]]= { + /** + * Given the input vectors, return the membership value of each vector + * to all mixture components. + */ + def predictClusters(points:RDD[Vector], mu:Array[Vector], sigma:Array[Matrix], + weight:Array[Double],k:Int): RDD[Array[Double]] = { val ctx = points.sparkContext - val dists = ctx.broadcast((0 until k).map(i => - new MultivariateGaussian(mu(i).toBreeze.toDenseVector,sigma(i).toBreeze.toDenseMatrix)) - .toArray) + val dists = ctx.broadcast{ + (0 until k).map{ i => + new MultivariateGaussian(mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix) + }.toArray + } val weights = ctx.broadcast((0 until k).map(i => weight(i)).toArray) - points.map(x=>compute_log_likelihood(x.toBreeze.toDenseVector,dists.value,weights.value,k)) - + points.map{ x => + computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k) + } } + /** - * Compute the log density of each vector - */ - def compute_log_likelihood(pt:DenseDoubleVector,dists:Array[MultivariateGaussian], - weights:Array[Double],k:Int):Array[Double]={ - val p = (0 until k).map(i => - eps + weights(i) * dists(i).pdf(pt)).toArray + * Compute the partial assignments for each vector + */ + def computeSoftAssignments(pt: DenseDoubleVector, dists: Array[MultivariateGaussian], + weights: Array[Double], k: Int): Array[Double] = { + val p = (0 until k).map(i => eps + weights(i) * dists(i).pdf(pt)).toArray val pSum = p.sum for(i<- 0 until k){ p(i) /= pSum From 9be2534d67b0ac0f43ca083c0940afa81474e458 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Tue, 16 Dec 2014 08:32:45 -0500 Subject: [PATCH 14/27] Style issue --- .../apache/spark/mllib/clustering/GaussianMixtureModel.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index b47d6820fd17f..35a9024165d19 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -38,7 +38,7 @@ class GaussianMixtureModel( val sigma: Array[Matrix]) extends Serializable { /** Number of gaussians in mixture */ - def k: Int = weight.length; + def k: Int = weight.length /** Maps given points to their cluster indices. */ def predict(points: RDD[Vector]): (RDD[Array[Double]],RDD[Int]) = { From 8b633f33a1a9b0ebc0f0af84bc2164cbd199bcbc Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Tue, 16 Dec 2014 08:34:30 -0500 Subject: [PATCH 15/27] Style issue --- .../spark/mllib/clustering/GaussianMixtureModelEM.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala index fdc0ac7881879..0907c647596da 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala @@ -212,8 +212,8 @@ class GaussianMixtureModelEM private ( * Given the input vectors, return the membership value of each vector * to all mixture components. */ - def predictClusters(points:RDD[Vector], mu:Array[Vector], sigma:Array[Matrix], - weight:Array[Double],k:Int): RDD[Array[Double]] = { + def predictClusters(points: RDD[Vector], mu: Array[Vector], sigma: Array[Matrix], + weight: Array[Double], k: Int): RDD[Array[Double]] = { val ctx = points.sparkContext val dists = ctx.broadcast{ (0 until k).map{ i => From 42b2142ed2a7440869d75b3fc926a74d9d19e30b Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Wed, 17 Dec 2014 11:15:17 -0500 Subject: [PATCH 16/27] Added functionality to allow setting of GMM starting point. Added two cluster test to testing suite. --- .../clustering/GaussianMixtureModelEM.scala | 46 ++++++++++++++++--- .../GMMExpectationMaximizationSuite.scala | 33 +++++++++++++ 2 files changed, 72 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala index 0907c647596da..a6e6ad9ef52b2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala @@ -55,9 +55,26 @@ class GaussianMixtureModelEM private ( // number of samples per cluster to use when initializing Gaussians private val nSamples = 5 + // an initializing GMM can be provided rather than using the + // default random starting point + private var initialGmm: Option[GaussianMixtureModel] = None + /** A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold */ def this() = this(2, 0.01, 100) + /** Set the initial GMM starting point, bypassing the random initialization */ + def setInitialGmm(gmm: GaussianMixtureModel): this.type = { + if (gmm.k == k) { + initialGmm = Some(gmm) + } else { + throw new IllegalArgumentException("initialing GMM has mismatched cluster count (gmm.k != k)") + } + this + } + + /** Return the user supplied initial GMM, if supplied */ + def getInitialiGmm: Option[GaussianMixtureModel] = initialGmm + /** Set the number of Gaussians in the mixture model. Default: 2 */ def setK(k: Int): this.type = { this.k = k @@ -103,20 +120,35 @@ class GaussianMixtureModelEM private ( // Get length of the input vectors val d = breezeData.first.length - // For each Gaussian, we will initialize the mean as the average - // of some random samples from the data - val samples = breezeData.takeSample(true, k * nSamples, scala.util.Random.nextInt) - - // gaussians will be array of (weight, mean, covariance) tuples + // gaussians will be array of (weight, mean, covariance) tuples. + // If the user supplied an initial GMM, we use those values, otherwise // we start with uniform weights, a random mean from the data, and // diagonal covariance matrices using component variances // derived from the samples - var gaussians = (0 until k).map{ i => + var gaussians = initialGmm match { + case Some(gmm) => (0 until k).map{ i => + (gmm.weight(i), gmm.mu(i).toBreeze.toDenseVector, gmm.sigma(i).toBreeze.toDenseMatrix) + }.toArray + + case None => { + // For each Gaussian, we will initialize the mean as the average + // of some random samples from the data + val samples = breezeData.takeSample(true, k * nSamples, scala.util.Random.nextInt) + + (0 until k).map{ i => + (1.0 / k, + vectorMean(samples.slice(i * nSamples, (i + 1) * nSamples)), + initCovariance(samples.slice(i * nSamples, (i + 1) * nSamples))) + }.toArray + } + } + + /*var gaussians = (0 until k).map{ i => (1.0 / k, vectorMean(samples.slice(i * nSamples, (i + 1) * nSamples)), initCovariance(samples.slice(i * nSamples, (i + 1) * nSamples))) }.toArray - + */ val accW = new Array[Accumulator[Double]](k) val accMu = new Array[Accumulator[DenseDoubleVector]](k) val accSigma = new Array[Accumulator[DenseDoubleMatrix]](k) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala index d1f3fe34bfb09..e44db28ceb614 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala @@ -42,4 +42,37 @@ class GMMExpectationMaximizationSuite extends FunSuite with MLlibTestSparkContex assert(gmm.mu(0) ~== Emu absTol 1E-5) assert(gmm.sigma(0) ~== Esigma absTol 1E-5) } + + test("two clusters") { + val data = sc.parallelize(Array( + Vectors.dense(-5.1971), Vectors.dense(-2.5359), Vectors.dense(-3.8220), + Vectors.dense(-5.2211), Vectors.dense(-5.0602), Vectors.dense( 4.7118), + Vectors.dense( 6.8989), Vectors.dense( 3.4592), Vectors.dense( 4.6322), + Vectors.dense( 5.7048), Vectors.dense( 4.6567), Vectors.dense( 5.5026), + Vectors.dense( 4.5605), Vectors.dense( 5.2043), Vectors.dense( 6.2734) + )) + + // we set an initial gaussian to induce expected results + val initialGmm = new GaussianMixtureModel( + Array(0.5, 0.5), + Array(Vectors.dense(-1.0), Vectors.dense(1.0)), + Array(Matrices.dense(1, 1, Array(1.0)), Matrices.dense(1, 1, Array(1.0))) + ) + + val Ew = Array(1.0 / 3.0, 2.0 / 3.0) + val Emu = Array(Vectors.dense(-4.3673), Vectors.dense(5.1604)) + val Esigma = Array(Matrices.dense(1, 1, Array(1.1098)), Matrices.dense(1, 1, Array(0.86644))) + + val gmm = new GaussianMixtureModelEM() + .setK(2) + .setInitialGmm(initialGmm) + .run(data) + + assert(gmm.weight(0) ~== Ew(0) absTol 1E-3) + assert(gmm.weight(1) ~== Ew(1) absTol 1E-3) + assert(gmm.mu(0) ~== Emu(0) absTol 1E-3) + assert(gmm.mu(1) ~== Emu(1) absTol 1E-3) + assert(gmm.sigma(0) ~== Esigma(0) absTol 1E-3) + assert(gmm.sigma(1) ~== Esigma(1) absTol 1E-3) + } } From 20ebca18ee09b31d201f21658c7b464fd0f5fad7 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Wed, 17 Dec 2014 11:20:10 -0500 Subject: [PATCH 17/27] Removed unusued code --- .../spark/mllib/clustering/GaussianMixtureModelEM.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala index a6e6ad9ef52b2..752615dde576b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala @@ -143,12 +143,6 @@ class GaussianMixtureModelEM private ( } } - /*var gaussians = (0 until k).map{ i => - (1.0 / k, - vectorMean(samples.slice(i * nSamples, (i + 1) * nSamples)), - initCovariance(samples.slice(i * nSamples, (i + 1) * nSamples))) - }.toArray - */ val accW = new Array[Accumulator[Double]](k) val accMu = new Array[Accumulator[DenseDoubleVector]](k) val accSigma = new Array[Accumulator[DenseDoubleMatrix]](k) From cff73e0de3c7dd68fe7a23db385459678a646099 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Wed, 17 Dec 2014 15:22:49 -0500 Subject: [PATCH 18/27] Replaced accumulators with RDD.aggregate --- .../clustering/GaussianMixtureModelEM.scala | 125 +++++++++--------- 1 file changed, 60 insertions(+), 65 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala index 752615dde576b..5ab654f609bd2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala @@ -52,6 +52,50 @@ class GaussianMixtureModelEM private ( private type DenseDoubleVector = BreezeVector[Double] private type DenseDoubleMatrix = BreezeMatrix[Double] + private type ExpectationSum = ( + Array[Double], // log-likelihood in index 0 + Array[Double], // array of weights + Array[DenseDoubleVector], // array of means + Array[DenseDoubleMatrix]) // array of cov matrices + + // create a zero'd ExpectationSum instance + private def zeroExpectationSum(k: Int, d: Int): ExpectationSum = { + (Array(0.0), + new Array[Double](k), + (0 until k).map(_ => BreezeVector.zeros[Double](d)).toArray, + (0 until k).map(_ => BreezeMatrix.zeros[Double](d,d)).toArray) + } + + // add two ExpectationSum objects (allowed to use modify m1) + // (U, U) => U for aggregation + private def addExpectationSums(m1: ExpectationSum, m2: ExpectationSum): ExpectationSum = { + m1._1(0) += m2._1(0) + for (i <- 0 until m1._2.length) { + m1._2(i) += m2._2(i) + m1._3(i) += m2._3(i) + m1._4(i) += m2._4(i) + } + m1 + } + + // compute cluster contributions for each input point + // (U, T) => U for aggregation + private def computeExpectation(weights: Array[Double], dists: Array[MultivariateGaussian]) + (model: ExpectationSum, x: DenseDoubleVector): ExpectationSum = { + val k = model._2.length + val p = (0 until k).map(i => eps + weights(i) * dists(i).pdf(x)).toArray + val pSum = p.sum + model._1(0) += math.log(pSum) + val xxt = x * new Transpose(x) + for (i <- 0 until k) { + p(i) /= pSum + model._2(i) += p(i) + model._3(i) += x * p(i) + model._4(i) += xxt * p(i) + } + model + } + // number of samples per cluster to use when initializing Gaussians private val nSamples = 5 @@ -115,7 +159,7 @@ class GaussianMixtureModelEM private ( val ctx = data.sparkContext // we will operate on the data as breeze data - val breezeData = data.map( u => u.toBreeze.toDenseVector ).cache() + val breezeData = data.map(u => u.toBreeze.toDenseVector).cache() // Get length of the input vectors val d = breezeData.first.length @@ -143,55 +187,28 @@ class GaussianMixtureModelEM private ( } } - val accW = new Array[Accumulator[Double]](k) - val accMu = new Array[Accumulator[DenseDoubleVector]](k) - val accSigma = new Array[Accumulator[DenseDoubleMatrix]](k) - var llh = Double.MinValue // current log-likelihood var llhp = 0.0 // previous log-likelihood var iter = 0 do { - // reset accumulators - for (i <- 0 until k) { - accW(i) = ctx.accumulator(0.0) - accMu(i) = ctx.accumulator( - BreezeVector.zeros[Double](d))(DenseDoubleVectorAccumulatorParam) - accSigma(i) = ctx.accumulator( - BreezeMatrix.zeros[Double](d,d))(DenseDoubleMatrixAccumulatorParam) - } + // pivot gaussians into weight and distribution arrays + val weights = (0 until k).map(i => gaussians(i)._1).toArray + val dists = (0 until k).map{ i => + new MultivariateGaussian(gaussians(i)._2, gaussians(i)._3) + }.toArray - val logLikelihood = ctx.accumulator(0.0) - - // broadcast the current weights and distributions to all nodes - val dists = ctx.broadcast{ - (0 until k).map(i => new MultivariateGaussian(gaussians(i)._2, gaussians(i)._3)).toArray - } - val weights = ctx.broadcast((0 until k).map(i => gaussians(i)._1).toArray) + // create and broadcast curried cluster contribution function + val compute = ctx.broadcast(computeExpectation(weights, dists)_) - // calculate partial assignments for each sample in the data - // (often referred to as the "E" step in literature) - breezeData.foreach{ x => - val p = (0 until k).map(i => eps + weights.value(i) * dists.value(i).pdf(x)).toArray - - val pSum = p.sum - - logLikelihood += math.log(pSum) - - // accumulate weighted sums - val xxt = x * new Transpose(x) - for (i <- 0 until k) { - p(i) /= pSum - accW(i) += p(i) - accMu(i) += x * p(i) - accSigma(i) += xxt * p(i) - } - } + // aggregate the cluster contribution for all sample points + val sums = breezeData.aggregate(zeroExpectationSum(k, d))(compute.value, addExpectationSums) - // Collect the computed sums - val W = (0 until k).map(i => accW(i).value).toArray - val MU = (0 until k).map(i => accMu(i).value).toArray - val SIGMA = (0 until k).map(i => accSigma(i).value).toArray + // Assignments to make the code more readable + val logLikelihood = sums._1(0) + val W = sums._2 + val MU = sums._3 + val SIGMA = sums._4 // Create new distributions based on the partial assignments // (often referred to as the "M" step in literature) @@ -203,7 +220,7 @@ class GaussianMixtureModelEM private ( }.toArray llhp = llh // current becomes previous - llh = logLikelihood.value // this is the freshly computed log-likelihood + llh = logLikelihood // this is the freshly computed log-likelihood iter += 1 } while(iter < maxIterations && Math.abs(llh-llhp) > convergenceTol) @@ -264,26 +281,4 @@ class GaussianMixtureModelEM private ( } p } - - /** AccumulatorParam for Dense Breeze Vectors */ - private object DenseDoubleVectorAccumulatorParam extends AccumulatorParam[DenseDoubleVector] { - def zero(initialVector: DenseDoubleVector): DenseDoubleVector = { - BreezeVector.zeros[Double](initialVector.length) - } - - def addInPlace(a: DenseDoubleVector, b: DenseDoubleVector): DenseDoubleVector = { - a += b - } - } - - /** AccumulatorParam for Dense Breeze Matrices */ - private object DenseDoubleMatrixAccumulatorParam extends AccumulatorParam[DenseDoubleMatrix] { - def zero(initialMatrix: DenseDoubleMatrix): DenseDoubleMatrix = { - BreezeMatrix.zeros[Double](initialMatrix.rows, initialMatrix.cols) - } - - def addInPlace(a: DenseDoubleMatrix, b: DenseDoubleMatrix): DenseDoubleMatrix = { - a += b - } - } } From 308c8ad9d88d275f11be0d3733f016d3119f98ee Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Thu, 18 Dec 2014 01:11:47 -0500 Subject: [PATCH 19/27] Numerous changes to improve code --- .../spark/examples/mllib/DenseGmmEM.scala | 11 +- .../clustering/GaussianMixtureModelEM.scala | 105 +++++++++--------- .../stat/impl/MultivariateGaussian.scala | 4 +- 3 files changed, 64 insertions(+), 56 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala index 12be814a150b5..d59ba49ed1ba3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala @@ -21,6 +21,13 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.mllib.clustering.GaussianMixtureModelEM import org.apache.spark.mllib.linalg.Vectors +/** + * An example Gaussian Mixture Model EM app. Run with + * {{{ + * ./bin/run-example org.apache.spark.examples.mllib.DenseGmmEM + * }}} + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ object DenseGmmEM { def main(args: Array[String]): Unit = { if (args.length != 3) { @@ -44,13 +51,15 @@ object DenseGmmEM { .run(data) for (i <- 0 until clusters.k) { - println("weight=%f mu=%s sigma=\n%s\n" format + println("weight=%f\nmu=%s\nsigma=\n%s\n" format (clusters.weight(i), clusters.mu(i), clusters.sigma(i))) } + println("Cluster labels:") val (responsibilityMatrix, clusterLabels) = clusters.predict(data) for (x <- clusterLabels.collect) { print(" " + x) } + println } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala index 5ab654f609bd2..5c190120904b9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala @@ -26,6 +26,8 @@ import org.apache.spark.mllib.stat.impl.MultivariateGaussian import org.apache.spark.{Accumulator, AccumulatorParam, SparkContext} import org.apache.spark.SparkContext.DoubleAccumulatorParam +import scala.collection.mutable.IndexedSeqView + /** * This class performs expectation maximization for multivariate Gaussian * Mixture Models (GMMs). A GMM represents a composite distribution of @@ -51,6 +53,7 @@ class GaussianMixtureModelEM private ( // Type aliases for convenience private type DenseDoubleVector = BreezeVector[Double] private type DenseDoubleMatrix = BreezeMatrix[Double] + private type VectorArrayView = IndexedSeqView[DenseDoubleVector, Array[DenseDoubleVector]] private type ExpectationSum = ( Array[Double], // log-likelihood in index 0 @@ -80,10 +83,12 @@ class GaussianMixtureModelEM private ( // compute cluster contributions for each input point // (U, T) => U for aggregation - private def computeExpectation(weights: Array[Double], dists: Array[MultivariateGaussian]) + private def computeExpectation( + weights: Array[Double], + dists: Array[MultivariateGaussian]) (model: ExpectationSum, x: DenseDoubleVector): ExpectationSum = { val k = model._2.length - val p = (0 until k).map(i => eps + weights(i) * dists(i).pdf(x)).toArray + val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(x) } val pSum = p.sum model._1(0) += math.log(pSum) val xxt = x * new Transpose(x) @@ -106,7 +111,10 @@ class GaussianMixtureModelEM private ( /** A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold */ def this() = this(2, 0.01, 100) - /** Set the initial GMM starting point, bypassing the random initialization */ + /** Set the initial GMM starting point, bypassing the random initialization. + * You must call setK() prior to calling this method, and the condition + * (gmm.k == this.k) must be met; failure will result in an IllegalArgumentException + */ def setInitialGmm(gmm: GaussianMixtureModel): this.type = { if (gmm.k == k) { initialGmm = Some(gmm) @@ -156,7 +164,7 @@ class GaussianMixtureModelEM private ( /** Perform expectation maximization */ def run(data: RDD[Vector]): GaussianMixtureModel = { - val ctx = data.sparkContext + val sc = data.sparkContext // we will operate on the data as breeze data val breezeData = data.map(u => u.toBreeze.toDenseVector).cache() @@ -164,26 +172,22 @@ class GaussianMixtureModelEM private ( // Get length of the input vectors val d = breezeData.first.length - // gaussians will be array of (weight, mean, covariance) tuples. + // Determine initial weights and corresponding Gaussians. // If the user supplied an initial GMM, we use those values, otherwise // we start with uniform weights, a random mean from the data, and // diagonal covariance matrices using component variances - // derived from the samples - var gaussians = initialGmm match { - case Some(gmm) => (0 until k).map{ i => - (gmm.weight(i), gmm.mu(i).toBreeze.toDenseVector, gmm.sigma(i).toBreeze.toDenseMatrix) - }.toArray + // derived from the samples + val (weights, gaussians) = initialGmm match { + case Some(gmm) => (gmm.weight, gmm.mu.zip(gmm.sigma).map{ case(mu, sigma) => + new MultivariateGaussian(mu.toBreeze.toDenseVector, sigma.toBreeze.toDenseMatrix) + }.toArray) case None => { - // For each Gaussian, we will initialize the mean as the average - // of some random samples from the data val samples = breezeData.takeSample(true, k * nSamples, scala.util.Random.nextInt) - - (0 until k).map{ i => - (1.0 / k, - vectorMean(samples.slice(i * nSamples, (i + 1) * nSamples)), - initCovariance(samples.slice(i * nSamples, (i + 1) * nSamples))) - }.toArray + ((0 until k).map(_ => 1.0 / k).toArray, (0 until k).map{ i => + val slice = samples.view(i * nSamples, (i + 1) * nSamples) + new MultivariateGaussian(vectorMean(slice), initCovariance(slice)) + }.toArray) } } @@ -192,47 +196,36 @@ class GaussianMixtureModelEM private ( var iter = 0 do { - // pivot gaussians into weight and distribution arrays - val weights = (0 until k).map(i => gaussians(i)._1).toArray - val dists = (0 until k).map{ i => - new MultivariateGaussian(gaussians(i)._2, gaussians(i)._3) - }.toArray - // create and broadcast curried cluster contribution function - val compute = ctx.broadcast(computeExpectation(weights, dists)_) + val compute = sc.broadcast(computeExpectation(weights, gaussians)_) // aggregate the cluster contribution for all sample points - val sums = breezeData.aggregate(zeroExpectationSum(k, d))(compute.value, addExpectationSums) - - // Assignments to make the code more readable - val logLikelihood = sums._1(0) - val W = sums._2 - val MU = sums._3 - val SIGMA = sums._4 + val (logLikelihood, wSums, muSums, sigmaSums) = + breezeData.aggregate(zeroExpectationSum(k, d))(compute.value, addExpectationSums) // Create new distributions based on the partial assignments // (often referred to as the "M" step in literature) - gaussians = (0 until k).map{ i => - val weight = W(i) / W.sum - val mu = MU(i) / W(i) - val sigma = SIGMA(i) / W(i) - mu * new Transpose(mu) - (weight, mu, sigma) - }.toArray - + val sumWeights = wSums.sum + for (i <- 0 until k) { + val mu = muSums(i) / wSums(i) + val sigma = sigmaSums(i) / wSums(i) - mu * new Transpose(mu) + weights(i) = wSums(i) / sumWeights + gaussians(i) = new MultivariateGaussian(mu, sigma) + } + llhp = llh // current becomes previous - llh = logLikelihood // this is the freshly computed log-likelihood + llh = logLikelihood(0) // this is the freshly computed log-likelihood iter += 1 } while(iter < maxIterations && Math.abs(llh-llhp) > convergenceTol) // Need to convert the breeze matrices to MLlib matrices - val weights = (0 until k).map(i => gaussians(i)._1).toArray - val means = (0 until k).map(i => Vectors.fromBreeze(gaussians(i)._2)).toArray - val sigmas = (0 until k).map(i => Matrices.fromBreeze(gaussians(i)._3)).toArray + val means = (0 until k).map(i => Vectors.fromBreeze(gaussians(i).mu)).toArray + val sigmas = (0 until k).map(i => Matrices.fromBreeze(gaussians(i).sigma)).toArray new GaussianMixtureModel(weights, means, sigmas) } /** Average of dense breeze vectors */ - private def vectorMean(x: Array[DenseDoubleVector]): DenseDoubleVector = { + private def vectorMean(x: VectorArrayView): DenseDoubleVector = { val v = BreezeVector.zeros[Double](x(0).length) x.foreach(xi => v += xi) v / x.length.asInstanceOf[Double] @@ -242,7 +235,7 @@ class GaussianMixtureModelEM private ( * Construct matrix where diagonal entries are element-wise * variance of input vectors (computes biased variance) */ - private def initCovariance(x: Array[DenseDoubleVector]): DenseDoubleMatrix = { + private def initCovariance(x: VectorArrayView): DenseDoubleMatrix = { val mu = vectorMean(x) val ss = BreezeVector.zeros[Double](x(0).length) val cov = BreezeMatrix.eye[Double](ss.length) @@ -255,15 +248,18 @@ class GaussianMixtureModelEM private ( * Given the input vectors, return the membership value of each vector * to all mixture components. */ - def predictClusters(points: RDD[Vector], mu: Array[Vector], sigma: Array[Matrix], + def predictClusters( + points: RDD[Vector], + mu: Array[Vector], + sigma: Array[Matrix], weight: Array[Double], k: Int): RDD[Array[Double]] = { - val ctx = points.sparkContext - val dists = ctx.broadcast{ + val sc = points.sparkContext + val dists = sc.broadcast{ (0 until k).map{ i => new MultivariateGaussian(mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix) }.toArray } - val weights = ctx.broadcast((0 until k).map(i => weight(i)).toArray) + val weights = sc.broadcast((0 until k).map(i => weight(i)).toArray) points.map{ x => computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k) } @@ -272,11 +268,14 @@ class GaussianMixtureModelEM private ( /** * Compute the partial assignments for each vector */ - def computeSoftAssignments(pt: DenseDoubleVector, dists: Array[MultivariateGaussian], - weights: Array[Double], k: Int): Array[Double] = { - val p = (0 until k).map(i => eps + weights(i) * dists(i).pdf(pt)).toArray + private def computeSoftAssignments( + pt: DenseDoubleVector, + dists: Array[MultivariateGaussian], + weights: Array[Double], + k: Int): Array[Double] = { + val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(pt) } val pSum = p.sum - for(i<- 0 until k){ + for (i <- 0 until k){ p(i) /= pSum } p diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala index ab046a3acea05..d14adab09177e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala @@ -18,7 +18,7 @@ package org.apache.spark.mllib.stat.impl import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix} -import breeze.linalg.{Transpose, det, inv} +import breeze.linalg.{Transpose, det, pinv} /** * Utility class to implement the density function for multivariate Gaussian distribution. @@ -28,7 +28,7 @@ import breeze.linalg.{Transpose, det, inv} private[mllib] class MultivariateGaussian( val mu: BreezeVector[Double], val sigma: BreezeMatrix[Double]) extends Serializable { - private val sigmaInv2 = inv(sigma) * -0.5 + private val sigmaInv2 = pinv(sigma) * -0.5 private val U = math.pow(2.0 * math.Pi, -mu.length / 2.0) * math.pow(det(sigma), -0.5) def pdf(x: BreezeVector[Double]): Double = { From 227ad66302f0d1166b452dc70b7cd3784a6f81e3 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Thu, 18 Dec 2014 08:18:18 -0500 Subject: [PATCH 20/27] Moved prediction methods into model class. --- .../clustering/GaussianMixtureModel.scala | 48 ++++++++++++++++- .../clustering/GaussianMixtureModelEM.scala | 53 +++---------------- 2 files changed, 53 insertions(+), 48 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index 35a9024165d19..a38381e505258 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -17,9 +17,12 @@ package org.apache.spark.mllib.clustering +import breeze.linalg.{DenseVector => BreezeVector} + import org.apache.spark.rdd.RDD import org.apache.spark.mllib.linalg.Matrix import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.stat.impl.MultivariateGaussian /** * Multivariate Gaussian Mixture Model (GMM) consisting of k Gaussians, where points @@ -42,9 +45,50 @@ class GaussianMixtureModel( /** Maps given points to their cluster indices. */ def predict(points: RDD[Vector]): (RDD[Array[Double]],RDD[Int]) = { - val responsibilityMatrix = new GaussianMixtureModelEM() - .predictClusters(points,mu,sigma,weight,k) + val responsibilityMatrix = predictMembership(points,mu,sigma,weight,k) val clusterLabels = responsibilityMatrix.map(r => r.indexOf(r.max)) (responsibilityMatrix, clusterLabels) } + + /** + * Given the input vectors, return the membership value of each vector + * to all mixture components. + */ + def predictMembership( + points: RDD[Vector], + mu: Array[Vector], + sigma: Array[Matrix], + weight: Array[Double], k: Int): RDD[Array[Double]] = { + val sc = points.sparkContext + val dists = sc.broadcast{ + (0 until k).map{ i => + new MultivariateGaussian(mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix) + }.toArray + } + val weights = sc.broadcast((0 until k).map(i => weight(i)).toArray) + points.map{ x => + computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k) + } + } + + // We use "eps" as the minimum likelihood density for any given point + // in every cluster; this prevents any divide by zero conditions for + // outlier points. + private val eps = math.pow(2.0, -52) + + /** + * Compute the partial assignments for each vector + */ + private def computeSoftAssignments( + pt: BreezeVector[Double], + dists: Array[MultivariateGaussian], + weights: Array[Double], + k: Int): Array[Double] = { + val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(pt) } + val pSum = p.sum + for (i <- 0 until k){ + p(i) /= pSum + } + p + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala index 5c190120904b9..0c317c0a618b4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala @@ -23,8 +23,6 @@ import breeze.linalg.Transpose import org.apache.spark.rdd.RDD import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors} import org.apache.spark.mllib.stat.impl.MultivariateGaussian -import org.apache.spark.{Accumulator, AccumulatorParam, SparkContext} -import org.apache.spark.SparkContext.DoubleAccumulatorParam import scala.collection.mutable.IndexedSeqView @@ -86,19 +84,19 @@ class GaussianMixtureModelEM private ( private def computeExpectation( weights: Array[Double], dists: Array[MultivariateGaussian]) - (model: ExpectationSum, x: DenseDoubleVector): ExpectationSum = { - val k = model._2.length + (sums: ExpectationSum, x: DenseDoubleVector): ExpectationSum = { + val k = sums._2.length val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(x) } val pSum = p.sum - model._1(0) += math.log(pSum) + sums._1(0) += math.log(pSum) val xxt = x * new Transpose(x) for (i <- 0 until k) { p(i) /= pSum - model._2(i) += p(i) - model._3(i) += x * p(i) - model._4(i) += xxt * p(i) + sums._2(i) += p(i) + sums._3(i) += x * p(i) + sums._4(i) += xxt * p(i) } - model + sums } // number of samples per cluster to use when initializing Gaussians @@ -243,41 +241,4 @@ class GaussianMixtureModelEM private ( (0 until ss.length).foreach(i => cov(i,i) = ss(i) / x.length) cov } - - /** - * Given the input vectors, return the membership value of each vector - * to all mixture components. - */ - def predictClusters( - points: RDD[Vector], - mu: Array[Vector], - sigma: Array[Matrix], - weight: Array[Double], k: Int): RDD[Array[Double]] = { - val sc = points.sparkContext - val dists = sc.broadcast{ - (0 until k).map{ i => - new MultivariateGaussian(mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix) - }.toArray - } - val weights = sc.broadcast((0 until k).map(i => weight(i)).toArray) - points.map{ x => - computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k) - } - } - - /** - * Compute the partial assignments for each vector - */ - private def computeSoftAssignments( - pt: DenseDoubleVector, - dists: Array[MultivariateGaussian], - weights: Array[Double], - k: Int): Array[Double] = { - val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(pt) } - val pSum = p.sum - for (i <- 0 until k){ - p(i) /= pSum - } - p - } } From 578c2d101f6e87d19850e75c066a685124a8b3ed Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Thu, 18 Dec 2014 08:32:22 -0500 Subject: [PATCH 21/27] Removed unused import --- .../apache/spark/mllib/clustering/GaussianMixtureModelEM.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala index 0c317c0a618b4..4644259d9fa28 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala @@ -21,7 +21,7 @@ import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix} import breeze.linalg.Transpose import org.apache.spark.rdd.RDD -import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors} +import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors} import org.apache.spark.mllib.stat.impl.MultivariateGaussian import scala.collection.mutable.IndexedSeqView From 1de73f399a28060890c8617f1abddad46b30c132 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Thu, 18 Dec 2014 16:45:28 -0500 Subject: [PATCH 22/27] Removed redundant array from array creation --- .../apache/spark/mllib/clustering/GaussianMixtureModel.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index a38381e505258..c02aa60dd86ca 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -65,7 +65,7 @@ class GaussianMixtureModel( new MultivariateGaussian(mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix) }.toArray } - val weights = sc.broadcast((0 until k).map(i => weight(i)).toArray) + val weights = sc.broadcast(weight) points.map{ x => computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k) } From b97fe00d17c9d6665bb63f1f0bcbd968a875dfe8 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Thu, 18 Dec 2014 20:28:02 -0500 Subject: [PATCH 23/27] Minor fixes and tweaks. --- .../org/apache/spark/examples/mllib/DenseGmmEM.scala | 10 +++++----- .../mllib/clustering/GaussianMixtureModelEM.scala | 12 ++++++++---- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala index d59ba49ed1ba3..41adcbb0a3115 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala @@ -37,13 +37,13 @@ object DenseGmmEM { } } - def run(inputFile: String, k: Int, convergenceTol: Double) { + private def run(inputFile: String, k: Int, convergenceTol: Double) { val conf = new SparkConf().setAppName("Spark EM Sample") val ctx = new SparkContext(conf) val data = ctx.textFile(inputFile).map{ line => Vectors.dense(line.trim.split(' ').map(_.toDouble)) - }.cache + }.cache() val clusters = new GaussianMixtureModelEM() .setK(k) @@ -55,11 +55,11 @@ object DenseGmmEM { (clusters.weight(i), clusters.mu(i), clusters.sigma(i))) } - println("Cluster labels:") + println("Cluster labels (first <= 100):") val (responsibilityMatrix, clusterLabels) = clusters.predict(data) - for (x <- clusterLabels.collect) { + clusterLabels.take(100).foreach{ x => print(" " + x) } - println + println() } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala index 4644259d9fa28..b0d8c7a0aafbd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala @@ -71,10 +71,12 @@ class GaussianMixtureModelEM private ( // (U, U) => U for aggregation private def addExpectationSums(m1: ExpectationSum, m2: ExpectationSum): ExpectationSum = { m1._1(0) += m2._1(0) - for (i <- 0 until m1._2.length) { + var i = 0 + while (i < m1._2.length) { m1._2(i) += m2._2(i) m1._3(i) += m2._3(i) m1._4(i) += m2._4(i) + i = i + 1 } m1 } @@ -90,11 +92,13 @@ class GaussianMixtureModelEM private ( val pSum = p.sum sums._1(0) += math.log(pSum) val xxt = x * new Transpose(x) - for (i <- 0 until k) { + var i = 0 + while (i < k) { p(i) /= pSum sums._2(i) += p(i) sums._3(i) += x * p(i) sums._4(i) += xxt * p(i) + i = i + 1 } sums } @@ -123,7 +127,7 @@ class GaussianMixtureModelEM private ( } /** Return the user supplied initial GMM, if supplied */ - def getInitialiGmm: Option[GaussianMixtureModel] = initialGmm + def getInitialGmm: Option[GaussianMixtureModel] = initialGmm /** Set the number of Gaussians in the mixture model. Default: 2 */ def setK(k: Int): this.type = { @@ -182,7 +186,7 @@ class GaussianMixtureModelEM private ( case None => { val samples = breezeData.takeSample(true, k * nSamples, scala.util.Random.nextInt) - ((0 until k).map(_ => 1.0 / k).toArray, (0 until k).map{ i => + (Array.fill[Double](k)(1.0 / k), (0 until k).map{ i => val slice = samples.view(i * nSamples, (i + 1) * nSamples) new MultivariateGaussian(vectorMean(slice), initCovariance(slice)) }.toArray) From 9b2fc2a96f6c5f44dbb6601c5c2f89b78352bed2 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Fri, 19 Dec 2014 19:34:15 -0500 Subject: [PATCH 24/27] Style improvements Changed ExpectationSum to a private class --- .../spark/examples/mllib/DenseGmmEM.scala | 8 +- .../clustering/GaussianMixtureModel.scala | 21 +- .../clustering/GaussianMixtureModelEM.scala | 182 +++++++++--------- .../stat/impl/MultivariateGaussian.scala | 10 +- .../GMMExpectationMaximizationSuite.scala | 2 +- 5 files changed, 108 insertions(+), 115 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala index 41adcbb0a3115..b56c4b3bd7789 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala @@ -38,10 +38,10 @@ object DenseGmmEM { } private def run(inputFile: String, k: Int, convergenceTol: Double) { - val conf = new SparkConf().setAppName("Spark EM Sample") + val conf = new SparkConf().setAppName("Gaussian Mixture Model EM example") val ctx = new SparkContext(conf) - val data = ctx.textFile(inputFile).map{ line => + val data = ctx.textFile(inputFile).map { line => Vectors.dense(line.trim.split(' ').map(_.toDouble)) }.cache() @@ -56,8 +56,8 @@ object DenseGmmEM { } println("Cluster labels (first <= 100):") - val (responsibilityMatrix, clusterLabels) = clusters.predict(data) - clusterLabels.take(100).foreach{ x => + val clusterLabels = clusters.predictLabels(data) + clusterLabels.take(100).foreach { x => print(" " + x) } println() diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index c02aa60dd86ca..734d67ea72a26 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -20,8 +20,7 @@ package org.apache.spark.mllib.clustering import breeze.linalg.{DenseVector => BreezeVector} import org.apache.spark.rdd.RDD -import org.apache.spark.mllib.linalg.Matrix -import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.linalg.{Matrix, Vector} import org.apache.spark.mllib.stat.impl.MultivariateGaussian /** @@ -44,10 +43,9 @@ class GaussianMixtureModel( def k: Int = weight.length /** Maps given points to their cluster indices. */ - def predict(points: RDD[Vector]): (RDD[Array[Double]],RDD[Int]) = { - val responsibilityMatrix = predictMembership(points,mu,sigma,weight,k) - val clusterLabels = responsibilityMatrix.map(r => r.indexOf(r.max)) - (responsibilityMatrix, clusterLabels) + def predictLabels(points: RDD[Vector]): RDD[Int] = { + val responsibilityMatrix = predictMembership(points, mu, sigma, weight, k) + responsibilityMatrix.map(r => r.indexOf(r.max)) } /** @@ -58,15 +56,16 @@ class GaussianMixtureModel( points: RDD[Vector], mu: Array[Vector], sigma: Array[Matrix], - weight: Array[Double], k: Int): RDD[Array[Double]] = { + weight: Array[Double], + k: Int): RDD[Array[Double]] = { val sc = points.sparkContext - val dists = sc.broadcast{ - (0 until k).map{ i => + val dists = sc.broadcast { + (0 until k).map { i => new MultivariateGaussian(mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix) }.toArray } val weights = sc.broadcast(weight) - points.map{ x => + points.map { x => computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k) } } @@ -86,7 +85,7 @@ class GaussianMixtureModel( k: Int): Array[Double] = { val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(pt) } val pSum = p.sum - for (i <- 0 until k){ + for (i <- 0 until k) { p(i) /= pSum } p diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala index b0d8c7a0aafbd..f985f3828952b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala @@ -17,15 +17,13 @@ package org.apache.spark.mllib.clustering -import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix} -import breeze.linalg.Transpose +import scala.collection.mutable.IndexedSeq +import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix, diag, Transpose} import org.apache.spark.rdd.RDD import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors} import org.apache.spark.mllib.stat.impl.MultivariateGaussian -import scala.collection.mutable.IndexedSeqView - /** * This class performs expectation maximization for multivariate Gaussian * Mixture Models (GMMs). A GMM represents a composite distribution of @@ -47,87 +45,34 @@ class GaussianMixtureModelEM private ( private var k: Int, private var convergenceTol: Double, private var maxIterations: Int) extends Serializable { - - // Type aliases for convenience - private type DenseDoubleVector = BreezeVector[Double] - private type DenseDoubleMatrix = BreezeMatrix[Double] - private type VectorArrayView = IndexedSeqView[DenseDoubleVector, Array[DenseDoubleVector]] - - private type ExpectationSum = ( - Array[Double], // log-likelihood in index 0 - Array[Double], // array of weights - Array[DenseDoubleVector], // array of means - Array[DenseDoubleMatrix]) // array of cov matrices - - // create a zero'd ExpectationSum instance - private def zeroExpectationSum(k: Int, d: Int): ExpectationSum = { - (Array(0.0), - new Array[Double](k), - (0 until k).map(_ => BreezeVector.zeros[Double](d)).toArray, - (0 until k).map(_ => BreezeMatrix.zeros[Double](d,d)).toArray) - } - // add two ExpectationSum objects (allowed to use modify m1) - // (U, U) => U for aggregation - private def addExpectationSums(m1: ExpectationSum, m2: ExpectationSum): ExpectationSum = { - m1._1(0) += m2._1(0) - var i = 0 - while (i < m1._2.length) { - m1._2(i) += m2._2(i) - m1._3(i) += m2._3(i) - m1._4(i) += m2._4(i) - i = i + 1 - } - m1 - } + /** A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold */ + def this() = this(2, 0.01, 100) + - // compute cluster contributions for each input point - // (U, T) => U for aggregation - private def computeExpectation( - weights: Array[Double], - dists: Array[MultivariateGaussian]) - (sums: ExpectationSum, x: DenseDoubleVector): ExpectationSum = { - val k = sums._2.length - val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(x) } - val pSum = p.sum - sums._1(0) += math.log(pSum) - val xxt = x * new Transpose(x) - var i = 0 - while (i < k) { - p(i) /= pSum - sums._2(i) += p(i) - sums._3(i) += x * p(i) - sums._4(i) += xxt * p(i) - i = i + 1 - } - sums - } // number of samples per cluster to use when initializing Gaussians private val nSamples = 5 // an initializing GMM can be provided rather than using the // default random starting point - private var initialGmm: Option[GaussianMixtureModel] = None - - /** A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold */ - def this() = this(2, 0.01, 100) + private var initialModel: Option[GaussianMixtureModel] = None /** Set the initial GMM starting point, bypassing the random initialization. * You must call setK() prior to calling this method, and the condition - * (gmm.k == this.k) must be met; failure will result in an IllegalArgumentException + * (model.k == this.k) must be met; failure will result in an IllegalArgumentException */ - def setInitialGmm(gmm: GaussianMixtureModel): this.type = { - if (gmm.k == k) { - initialGmm = Some(gmm) + def setInitialModel(model: GaussianMixtureModel): this.type = { + if (model.k == k) { + initialModel = Some(model) } else { - throw new IllegalArgumentException("initialing GMM has mismatched cluster count (gmm.k != k)") + throw new IllegalArgumentException("mismatched cluster count (model.k != k)") } this } /** Return the user supplied initial GMM, if supplied */ - def getInitialGmm: Option[GaussianMixtureModel] = initialGmm + def getInitialModel: Option[GaussianMixtureModel] = initialModel /** Set the number of Gaussians in the mixture model. Default: 2 */ def setK(k: Int): this.type = { @@ -161,9 +106,6 @@ class GaussianMixtureModelEM private ( */ def getConvergenceTol: Double = convergenceTol - /** Machine precision value used to ensure matrix conditioning */ - private val eps = math.pow(2.0, -52) - /** Perform expectation maximization */ def run(data: RDD[Vector]): GaussianMixtureModel = { val sc = data.sparkContext @@ -179,17 +121,17 @@ class GaussianMixtureModelEM private ( // we start with uniform weights, a random mean from the data, and // diagonal covariance matrices using component variances // derived from the samples - val (weights, gaussians) = initialGmm match { - case Some(gmm) => (gmm.weight, gmm.mu.zip(gmm.sigma).map{ case(mu, sigma) => + val (weights, gaussians) = initialModel match { + case Some(gmm) => (gmm.weight, gmm.mu.zip(gmm.sigma).map { case(mu, sigma) => new MultivariateGaussian(mu.toBreeze.toDenseVector, sigma.toBreeze.toDenseMatrix) - }.toArray) + }) case None => { val samples = breezeData.takeSample(true, k * nSamples, scala.util.Random.nextInt) - (Array.fill[Double](k)(1.0 / k), (0 until k).map{ i => + (Array.fill(k)(1.0 / k), Array.tabulate(k) { i => val slice = samples.view(i * nSamples, (i + 1) * nSamples) new MultivariateGaussian(vectorMean(slice), initCovariance(slice)) - }.toArray) + }) } } @@ -197,52 +139,104 @@ class GaussianMixtureModelEM private ( var llhp = 0.0 // previous log-likelihood var iter = 0 - do { + while(iter < maxIterations && Math.abs(llh-llhp) > convergenceTol) { // create and broadcast curried cluster contribution function - val compute = sc.broadcast(computeExpectation(weights, gaussians)_) + val compute = sc.broadcast(ExpectationSum.add(weights, gaussians)_) // aggregate the cluster contribution for all sample points - val (logLikelihood, wSums, muSums, sigmaSums) = - breezeData.aggregate(zeroExpectationSum(k, d))(compute.value, addExpectationSums) + val sums = breezeData.aggregate(ExpectationSum.zero(k, d))(compute.value, _ += _) // Create new distributions based on the partial assignments // (often referred to as the "M" step in literature) - val sumWeights = wSums.sum - for (i <- 0 until k) { - val mu = muSums(i) / wSums(i) - val sigma = sigmaSums(i) / wSums(i) - mu * new Transpose(mu) - weights(i) = wSums(i) / sumWeights + val sumWeights = sums.weights.sum + var i = 0 + while (i < k) { + val mu = sums.means(i) / sums.weights(i) + val sigma = sums.sigmas(i) / sums.weights(i) - mu * new Transpose(mu) // TODO: Use BLAS.dsyr + weights(i) = sums.weights(i) / sumWeights gaussians(i) = new MultivariateGaussian(mu, sigma) + i = i + 1 } llhp = llh // current becomes previous - llh = logLikelihood(0) // this is the freshly computed log-likelihood + llh = sums.logLikelihood // this is the freshly computed log-likelihood iter += 1 - } while(iter < maxIterations && Math.abs(llh-llhp) > convergenceTol) + } // Need to convert the breeze matrices to MLlib matrices - val means = (0 until k).map(i => Vectors.fromBreeze(gaussians(i).mu)).toArray - val sigmas = (0 until k).map(i => Matrices.fromBreeze(gaussians(i).sigma)).toArray + val means = Array.tabulate(k) { i => Vectors.fromBreeze(gaussians(i).mu) } + val sigmas = Array.tabulate(k) { i => Matrices.fromBreeze(gaussians(i).sigma) } new GaussianMixtureModel(weights, means, sigmas) } /** Average of dense breeze vectors */ - private def vectorMean(x: VectorArrayView): DenseDoubleVector = { + private def vectorMean(x: IndexedSeq[BreezeVector[Double]]): BreezeVector[Double] = { val v = BreezeVector.zeros[Double](x(0).length) x.foreach(xi => v += xi) - v / x.length.asInstanceOf[Double] + v / x.length.toDouble } /** * Construct matrix where diagonal entries are element-wise * variance of input vectors (computes biased variance) */ - private def initCovariance(x: VectorArrayView): DenseDoubleMatrix = { + private def initCovariance(x: IndexedSeq[BreezeVector[Double]]): BreezeMatrix[Double] = { val mu = vectorMean(x) val ss = BreezeVector.zeros[Double](x(0).length) - val cov = BreezeMatrix.eye[Double](ss.length) x.map(xi => (xi - mu) :^ 2.0).foreach(u => ss += u) - (0 until ss.length).foreach(i => cov(i,i) = ss(i) / x.length) - cov + diag(ss / x.length.toDouble) } } + +// companion class to provide zero constructor for ExpectationSum +private object ExpectationSum { + private val eps = math.pow(2.0, -52) + + def zero(k: Int, d: Int): ExpectationSum = { + new ExpectationSum(0.0, Array.fill(k)(0.0), + Array.fill(k)(BreezeVector.zeros(d)), Array.fill(k)(BreezeMatrix.zeros(d,d))) + } + + // compute cluster contributions for each input point + // (U, T) => U for aggregation + def add( + weights: Array[Double], + dists: Array[MultivariateGaussian]) + (sums: ExpectationSum, x: BreezeVector[Double]): ExpectationSum = { + val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(x) } + val pSum = p.sum + sums.logLikelihood += math.log(pSum) + val xxt = x * new Transpose(x) + var i = 0 + while (i < sums.k) { + p(i) /= pSum + sums.weights(i) += p(i) + sums.means(i) += x * p(i) + sums.sigmas(i) += xxt * p(i) // TODO: use BLAS.dsyr + i = i + 1 + } + sums + } +} + +// Aggregation class for partial expectation results +private class ExpectationSum( + var logLikelihood: Double, + val weights: Array[Double], + val means: Array[BreezeVector[Double]], + val sigmas: Array[BreezeMatrix[Double]]) extends Serializable { + + val k = weights.length + + def +=(x: ExpectationSum): ExpectationSum = { + var i = 0 + while (i < k) { + weights(i) += x.weights(i) + means(i) += x.means(i) + sigmas(i) += x.sigmas(i) + i = i + 1 + } + logLikelihood += x.logLikelihood + this + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala index d14adab09177e..2eab5d277827d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala @@ -17,8 +17,7 @@ package org.apache.spark.mllib.stat.impl -import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix} -import breeze.linalg.{Transpose, det, pinv} +import breeze.linalg.{DenseVector => DBV, DenseMatrix => DBM, Transpose, det, pinv} /** * Utility class to implement the density function for multivariate Gaussian distribution. @@ -26,12 +25,13 @@ import breeze.linalg.{Transpose, det, pinv} * so this class is here so-as to not introduce a new dependency in Spark. */ private[mllib] class MultivariateGaussian( - val mu: BreezeVector[Double], - val sigma: BreezeMatrix[Double]) extends Serializable { + val mu: DBV[Double], + val sigma: DBM[Double]) extends Serializable { private val sigmaInv2 = pinv(sigma) * -0.5 private val U = math.pow(2.0 * math.Pi, -mu.length / 2.0) * math.pow(det(sigma), -0.5) - def pdf(x: BreezeVector[Double]): Double = { + /** Returns density of this multivariate Gaussian at given point, x */ + def pdf(x: DBV[Double]): Double = { val delta = x - mu val deltaTranspose = new Transpose(delta) U * math.exp(deltaTranspose * sigmaInv2 * delta) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala index e44db28ceb614..d19b23c7b1600 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala @@ -65,7 +65,7 @@ class GMMExpectationMaximizationSuite extends FunSuite with MLlibTestSparkContex val gmm = new GaussianMixtureModelEM() .setK(2) - .setInitialGmm(initialGmm) + .setInitialModel(initialGmm) .run(data) assert(gmm.weight(0) ~== Ew(0) absTol 1E-3) From acf1fba6b0084511272cf4e19e0cab4587a11d16 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Mon, 22 Dec 2014 09:26:28 -0500 Subject: [PATCH 25/27] Fixed parameter comment in GaussianMixtureModel Made maximum iterations an optional parameter to DenseGmmEM --- .../org/apache/spark/examples/mllib/DenseGmmEM.scala | 8 +++++--- .../spark/mllib/clustering/GaussianMixtureModel.scala | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala index b56c4b3bd7789..e0511eaec9cb5 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala @@ -30,14 +30,15 @@ import org.apache.spark.mllib.linalg.Vectors */ object DenseGmmEM { def main(args: Array[String]): Unit = { - if (args.length != 3) { + if (args.length < 3) { println("usage: DenseGmmEM ") } else { - run(args(0), args(1).toInt, args(2).toDouble) + val maxIterations = if (args.length > 3) args(3).toInt else 100 + run(args(0), args(1).toInt, args(2).toDouble, maxIterations) } } - private def run(inputFile: String, k: Int, convergenceTol: Double) { + private def run(inputFile: String, k: Int, convergenceTol: Double, maxIterations: Int) { val conf = new SparkConf().setAppName("Gaussian Mixture Model EM example") val ctx = new SparkContext(conf) @@ -48,6 +49,7 @@ object DenseGmmEM { val clusters = new GaussianMixtureModelEM() .setK(k) .setConvergenceTol(convergenceTol) + .setMaxIterations(maxIterations) .run(data) for (i <- 0 until clusters.k) { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index 734d67ea72a26..0285a847bd1b3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -28,7 +28,7 @@ import org.apache.spark.mllib.stat.impl.MultivariateGaussian * are drawn from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are * the respective mean and covariance for each Gaussian distribution i=1..k. * - * @param weight Weights for each Gaussian distribution in the mixture, where mu(i) is + * @param weight Weights for each Gaussian distribution in the mixture, where weight(i) is * the weight for Gaussian i, and weight.sum == 1 * @param mu Means for each Gaussian in the mixture, where mu(i) is the mean for Gaussian i * @param sigma Covariance maxtrix for each Gaussian in the mixture, where sigma(i) is the From 709e4bf854984b2eb3e5ed36c8a15590edf7e891 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Mon, 22 Dec 2014 13:32:35 -0500 Subject: [PATCH 26/27] fixed usage line to include optional maxIterations parameter --- .../main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala index e0511eaec9cb5..02d73b1af59bf 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala @@ -31,7 +31,7 @@ import org.apache.spark.mllib.linalg.Vectors object DenseGmmEM { def main(args: Array[String]): Unit = { if (args.length < 3) { - println("usage: DenseGmmEM ") + println("usage: DenseGmmEM [maxIterations]") } else { val maxIterations = if (args.length > 3) args(3).toInt else 100 run(args(0), args(1).toInt, args(2).toDouble, maxIterations) From aaa8f25a579d9c9aa191734377b503fb73299b78 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Mon, 22 Dec 2014 15:20:47 -0500 Subject: [PATCH 27/27] MLUtils: changed privacy of EPSILON from [util] to [mllib] GaussianMixtureEM: Renamed from GaussianMixtureModelEM; corrected formatting issues GaussianMixtureModel: Renamed predictLabels() to predict() Others: Modifications based on rename of GaussianMixtureEM --- .../org/apache/spark/examples/mllib/DenseGmmEM.scala | 6 +++--- ...nMixtureModelEM.scala => GaussianMixtureEM.scala} | 11 +++++------ .../mllib/clustering/GaussianMixtureModel.scala | 12 +++++------- .../scala/org/apache/spark/mllib/util/MLUtils.scala | 2 +- .../clustering/GMMExpectationMaximizationSuite.scala | 4 ++-- 5 files changed, 16 insertions(+), 19 deletions(-) rename mllib/src/main/scala/org/apache/spark/mllib/clustering/{GaussianMixtureModelEM.scala => GaussianMixtureEM.scala} (97%) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala index 02d73b1af59bf..948c350953e27 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala @@ -18,7 +18,7 @@ package org.apache.spark.examples.mllib import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.mllib.clustering.GaussianMixtureModelEM +import org.apache.spark.mllib.clustering.GaussianMixtureEM import org.apache.spark.mllib.linalg.Vectors /** @@ -46,7 +46,7 @@ object DenseGmmEM { Vectors.dense(line.trim.split(' ').map(_.toDouble)) }.cache() - val clusters = new GaussianMixtureModelEM() + val clusters = new GaussianMixtureEM() .setK(k) .setConvergenceTol(convergenceTol) .setMaxIterations(maxIterations) @@ -58,7 +58,7 @@ object DenseGmmEM { } println("Cluster labels (first <= 100):") - val clusterLabels = clusters.predictLabels(data) + val clusterLabels = clusters.predict(data) clusterLabels.take(100).foreach { x => print(" " + x) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureEM.scala similarity index 97% rename from mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala rename to mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureEM.scala index f985f3828952b..bdf984aee4dae 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureEM.scala @@ -23,6 +23,7 @@ import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix, import org.apache.spark.rdd.RDD import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors} import org.apache.spark.mllib.stat.impl.MultivariateGaussian +import org.apache.spark.mllib.util.MLUtils /** * This class performs expectation maximization for multivariate Gaussian @@ -41,7 +42,7 @@ import org.apache.spark.mllib.stat.impl.MultivariateGaussian * is considered to have occurred. * @param maxIterations The maximum number of iterations to perform */ -class GaussianMixtureModelEM private ( +class GaussianMixtureEM private ( private var k: Int, private var convergenceTol: Double, private var maxIterations: Int) extends Serializable { @@ -49,8 +50,6 @@ class GaussianMixtureModelEM private ( /** A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold */ def this() = this(2, 0.01, 100) - - // number of samples per cluster to use when initializing Gaussians private val nSamples = 5 @@ -190,8 +189,6 @@ class GaussianMixtureModelEM private ( // companion class to provide zero constructor for ExpectationSum private object ExpectationSum { - private val eps = math.pow(2.0, -52) - def zero(k: Int, d: Int): ExpectationSum = { new ExpectationSum(0.0, Array.fill(k)(0.0), Array.fill(k)(BreezeVector.zeros(d)), Array.fill(k)(BreezeMatrix.zeros(d,d))) @@ -203,7 +200,9 @@ private object ExpectationSum { weights: Array[Double], dists: Array[MultivariateGaussian]) (sums: ExpectationSum, x: BreezeVector[Double]): ExpectationSum = { - val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(x) } + val p = weights.zip(dists).map { + case (weight, dist) => MLUtils.EPSILON + weight * dist.pdf(x) + } val pSum = p.sum sums.logLikelihood += math.log(pSum) val xxt = x * new Transpose(x) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index 0285a847bd1b3..11a110db1f7ca 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -22,6 +22,7 @@ import breeze.linalg.{DenseVector => BreezeVector} import org.apache.spark.rdd.RDD import org.apache.spark.mllib.linalg.{Matrix, Vector} import org.apache.spark.mllib.stat.impl.MultivariateGaussian +import org.apache.spark.mllib.util.MLUtils /** * Multivariate Gaussian Mixture Model (GMM) consisting of k Gaussians, where points @@ -43,7 +44,7 @@ class GaussianMixtureModel( def k: Int = weight.length /** Maps given points to their cluster indices. */ - def predictLabels(points: RDD[Vector]): RDD[Int] = { + def predict(points: RDD[Vector]): RDD[Int] = { val responsibilityMatrix = predictMembership(points, mu, sigma, weight, k) responsibilityMatrix.map(r => r.indexOf(r.max)) } @@ -70,11 +71,6 @@ class GaussianMixtureModel( } } - // We use "eps" as the minimum likelihood density for any given point - // in every cluster; this prevents any divide by zero conditions for - // outlier points. - private val eps = math.pow(2.0, -52) - /** * Compute the partial assignments for each vector */ @@ -83,7 +79,9 @@ class GaussianMixtureModel( dists: Array[MultivariateGaussian], weights: Array[Double], k: Int): Array[Double] = { - val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(pt) } + val p = weights.zip(dists).map { + case (weight, dist) => MLUtils.EPSILON + weight * dist.pdf(pt) + } val pSum = p.sum for (i <- 0 until k) { p(i) /= pSum diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index 9353351af72a0..06e20e6451dd9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -38,7 +38,7 @@ import org.apache.spark.streaming.dstream.DStream */ object MLUtils { - private[util] lazy val EPSILON = { + private[mllib] lazy val EPSILON = { var eps = 1.0 while ((1.0 + (eps / 2.0)) != 1.0) { eps /= 2.0 diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala index d19b23c7b1600..23feb82874b70 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximizationSuite.scala @@ -36,7 +36,7 @@ class GMMExpectationMaximizationSuite extends FunSuite with MLlibTestSparkContex val Emu = Vectors.dense(5.0, 10.0) val Esigma = Matrices.dense(2, 2, Array(2.0 / 3.0, -2.0 / 3.0, -2.0 / 3.0, 2.0 / 3.0)) - val gmm = new GaussianMixtureModelEM().setK(1).run(data) + val gmm = new GaussianMixtureEM().setK(1).run(data) assert(gmm.weight(0) ~== Ew absTol 1E-5) assert(gmm.mu(0) ~== Emu absTol 1E-5) @@ -63,7 +63,7 @@ class GMMExpectationMaximizationSuite extends FunSuite with MLlibTestSparkContex val Emu = Array(Vectors.dense(-4.3673), Vectors.dense(5.1604)) val Esigma = Array(Matrices.dense(1, 1, Array(1.1098)), Matrices.dense(1, 1, Array(0.86644))) - val gmm = new GaussianMixtureModelEM() + val gmm = new GaussianMixtureEM() .setK(2) .setInitialModel(initialGmm) .run(data)