SPARK-4156 [MLLIB] EM algorithm for GMMs #3022

Status: Closed — wants to merge 31 commits.
Changes shown from 8 commits.

Commits (31):
c15405c
SPARK-4156
Oct 30, 2014
5c96c57
Merge remote-tracking branch 'upstream/master'
Nov 11, 2014
c1a8e16
Made GaussianMixtureModel class serializable
Nov 11, 2014
719d8cc
Added scala test suite with basic test
tgaloppo Nov 13, 2014
86fb382
Merge remote-tracking branch 'upstream/master'
tgaloppo Nov 17, 2014
e6ea805
Merged with master branch; update test suite with latest context chan…
tgaloppo Nov 18, 2014
676e523
Fixed to no longer ignore delta value provided on command line
tgaloppo Dec 3, 2014
8aaa17d
Added additional train() method to companion object for cluster count…
tgaloppo Dec 3, 2014
9770261
Corrected a variety of style and naming issues.
tgaloppo Dec 12, 2014
e7d413b
Moved multivariate Gaussian utility class to mllib/stat/impl
tgaloppo Dec 12, 2014
dc9c742
Moved MultivariateGaussian utility class
tgaloppo Dec 12, 2014
97044cf
Fixed style issues
tgaloppo Dec 16, 2014
f407b4c
Added predict() to return the cluster labels and membership values
FlytxtRnD Dec 16, 2014
b99ecc4
Merge pull request #1 from FlytxtRnD/predictBranch
tgaloppo Dec 16, 2014
2df336b
Fixed style issue
tgaloppo Dec 16, 2014
c3b8ce0
Merge branch 'master' of https://github.com/tgaloppo/spark
tgaloppo Dec 16, 2014
d695034
Fixed style issues
tgaloppo Dec 16, 2014
9be2534
Style issue
tgaloppo Dec 16, 2014
8b633f3
Style issue
tgaloppo Dec 16, 2014
42b2142
Added functionality to allow setting of GMM starting point.
tgaloppo Dec 17, 2014
20ebca1
Removed unused code
tgaloppo Dec 17, 2014
cff73e0
Replaced accumulators with RDD.aggregate
tgaloppo Dec 17, 2014
308c8ad
Numerous changes to improve code
tgaloppo Dec 18, 2014
227ad66
Moved prediction methods into model class.
tgaloppo Dec 18, 2014
578c2d1
Removed unused import
tgaloppo Dec 18, 2014
1de73f3
Removed redundant array from array creation
tgaloppo Dec 18, 2014
b97fe00
Minor fixes and tweaks.
tgaloppo Dec 19, 2014
9b2fc2a
Style improvements
tgaloppo Dec 20, 2014
acf1fba
Fixed parameter comment in GaussianMixtureModel
tgaloppo Dec 22, 2014
709e4bf
fixed usage line to include optional maxIterations parameter
tgaloppo Dec 22, 2014
aaa8f25
MLUtils: changed privacy of EPSILON from [util] to [mllib]
tgaloppo Dec 22, 2014
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.mllib

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.GaussianMixtureModel
Review comment (Member): no need for this import

import org.apache.spark.mllib.clustering.GMMExpectationMaximization
import org.apache.spark.mllib.linalg.Vectors

object DenseGmmEM {
Review comment (Member): Please add documentation similar to other examples (e.g., DenseKMeans.scala)

def main(args: Array[String]): Unit = {
if( args.length != 3 ) {
Review comment (Member): scala style: should use this spacing: "if (args.length != 3)"

println("usage: DenseGmmEM <input file> <k> <delta>")
} else {
run(args(0), args(1).toInt, args(2).toDouble)
}
}

def run(inputFile: String, k: Int, tol: Double) {
Review comment (Member): "tol" --> "delta" (or whatever delta is changed to; see other comment about delta)

val conf = new SparkConf().setAppName("Spark EM Sample")
Review comment (Contributor): Gaussian Mixture Model EM example.

val ctx = new SparkContext(conf)

val data = ctx.textFile(inputFile).map(line =>
Review comment (Member): scala style: For multi-line map() calls, use braces:

    val data = ctx.textFile(inputFile).map { line =>
        Vectors.dense(line.trim.split(' ').map(_.toDouble))
      }.cache()

This occurs in several other places in this PR. Could you please fix those too?

Vectors.dense(line.trim.split(' ').map(_.toDouble))).cache()

val clusters = GMMExpectationMaximization.train(data, k, tol)

for(i <- 0 until clusters.k) {
println("w=%f mu=%s sigma=\n%s\n" format (clusters.w(i), clusters.mu(i), clusters.sigma(i)))
}
}
}
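For reference, the per-component parameters the example prints (w, mu, sigma) are the weights, means, and covariances of the standard Gaussian mixture density that the EM procedure fits, where d is the data dimension and the weights sum to one:

```latex
p(x) \;=\; \sum_{i=1}^{k} w_i \, \mathcal{N}(x \mid \mu_i, \Sigma_i),
\qquad
\mathcal{N}(x \mid \mu, \Sigma) \;=\;
(2\pi)^{-d/2}\, |\Sigma|^{-1/2}
\exp\!\Big(-\tfrac{1}{2}\,(x-\mu)^{\top} \Sigma^{-1} (x-\mu)\Big)
```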
@@ -0,0 +1,283 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.clustering

import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix}
import breeze.linalg.{Transpose, det, inv}
import org.apache.spark.rdd.RDD
Review comment (Member): space between groups of imports

import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors}
import org.apache.spark.{Accumulator, AccumulatorParam, SparkContext}
import org.apache.spark.SparkContext.DoubleAccumulatorParam

/**
* Expectation-Maximization for multivariate Gaussian Mixture Models.
*
*/
object GMMExpectationMaximization {
/**
* Trains a GMM using the given parameters
*
* @param data training points stored as RDD[Vector]
* @param k the number of Gaussians in the mixture
* @param maxIterations the maximum number of iterations to perform
* @param delta change in log-likelihood at which convergence is considered achieved
*/
def train(data: RDD[Vector], k: Int, maxIterations: Int, delta: Double): GaussianMixtureModel = {
new GMMExpectationMaximization().setK(k)
.setMaxIterations(maxIterations)
.setDelta(delta)
.run(data)
}

/**
* Trains a GMM using the given parameters
*
* @param data training points stored as RDD[Vector]
* @param k the number of Gaussians in the mixture
* @param maxIterations the maximum number of iterations to perform
*/
def train(data: RDD[Vector], k: Int, maxIterations: Int): GaussianMixtureModel = {
new GMMExpectationMaximization().setK(k).setMaxIterations(maxIterations).run(data)
}

/**
* Trains a GMM using the given parameters
*
* @param data training points stored as RDD[Vector]
* @param k the number of Gaussians in the mixture
* @param delta change in log-likelihood at which convergence is considered achieved
*/
def train(data: RDD[Vector], k: Int, delta: Double): GaussianMixtureModel = {
new GMMExpectationMaximization().setK(k).setDelta(delta).run(data)
}

/**
* Trains a GMM using the given parameters
*
* @param data training points stored as RDD[Vector]
* @param k the number of Gaussians in the mixture
*/
def train(data: RDD[Vector], k: Int): GaussianMixtureModel = {
new GMMExpectationMaximization().setK(k).run(data)
}
}

/**
* This class performs multivariate Gaussian expectation maximization. It will
Review comment (Member): It would be great if you could add a sentence or two explaining what GMMs are.

* maximize the log-likelihood for a mixture of k Gaussians, iterating until
* the log-likelihood changes by less than delta, or until it has reached
* the max number of iterations.
*/
class GMMExpectationMaximization private (
Review comment (Member): It might be good to rename this class to "GaussianMixtureModelEM" so that its name is closer to the name of the model it produces.

private var k: Int,
private var delta: Double,
Review comment (Member): I would recommend using "convergenceTol" since that is already used elsewhere (e.g., in LBFGS).

private var maxIterations: Int) extends Serializable {

// Type aliases for convenience
private type DenseDoubleVector = BreezeVector[Double]
private type DenseDoubleMatrix = BreezeMatrix[Double]

// number of samples per cluster to use when initializing Gaussians
private val nSamples = 5;
Review comment (Member): no semicolon


// A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold
Review comment (Member): Use "/** ... */" for comment so it is part of the generated documentation.

def this() = this(2, 0.01, 100)

/** Set the number of Gaussians in the mixture model. Default: 2 */
def setK(k: Int): this.type = {
this.k = k
this
}

/** Set the maximum number of iterations to run. Default: 100 */
def setMaxIterations(maxIterations: Int): this.type = {
this.maxIterations = maxIterations
this
}

/**
* Set the largest change in log-likelihood at which convergence is
* considered to have occurred.
*/
def setDelta(delta: Double): this.type = {
this.delta = delta
this
}

/** Machine precision value used to ensure matrix conditioning */
private val eps = math.pow(2.0, -52)

/** Perform expectation maximization */
def run(data: RDD[Vector]): GaussianMixtureModel = {
val ctx = data.sparkContext

// we will operate on the data as breeze data
val breezeData = data.map{ u => u.toBreeze.toDenseVector }.cache()

// Get length of the input vectors
val d = breezeData.first.length

// For each Gaussian, we will initialize the mean as the average
// of some random samples from the data
val samples = breezeData.takeSample(true, k * nSamples, scala.util.Random.nextInt)

// C will be array of (weight, mean, covariance) tuples
// we start with uniform weights, a random mean from the data, and
// diagonal covariance matrices using component variances
// derived from the samples
var C = (0 until k).map(i => (1.0/k,
Review comment (Member): spaces around "/" operator

Also, it might be good to use a more descriptive variable name like "centers"

vec_mean(samples.slice(i * nSamples, (i + 1) * nSamples)),
init_cov(samples.slice(i * nSamples, (i + 1) * nSamples)))
).toArray

val acc_w = new Array[Accumulator[Double]](k)
val acc_mu = new Array[Accumulator[DenseDoubleVector]](k)
val acc_sigma = new Array[Accumulator[DenseDoubleMatrix]](k)

var llh = Double.MinValue // current log-likelihood
var llhp = 0.0 // previous log-likelihood

var i, iter = 0
Review comment (Member): i is not used (The for loops have implicit declarations of other "i" vars.)

do {
// reset accumulators
for(i <- 0 until k){
Review comment (Member): scala style:

    for (i <- 0 until k) {

acc_w(i) = ctx.accumulator(0.0)
acc_mu(i) = ctx.accumulator(
BreezeVector.zeros[Double](d))(DenseDoubleVectorAccumulatorParam)
acc_sigma(i) = ctx.accumulator(
BreezeMatrix.zeros[Double](d,d))(DenseDoubleMatrixAccumulatorParam)
}

val log_likelihood = ctx.accumulator(0.0)

// broadcast the current weights and distributions to all nodes
val dists = ctx.broadcast((0 until k).map(i =>
new MultivariateGaussian(C(i)._2, C(i)._3)).toArray)
val weights = ctx.broadcast((0 until k).map(i => C(i)._1).toArray)

// calculate partial assignments for each sample in the data
// (often referred to as the "E" step in literature)
breezeData.foreach(x => {
val p = (0 until k).map(i =>
eps + weights.value(i) * dists.value(i).pdf(x)).toArray
val norm = sum(p)

log_likelihood += math.log(norm)

// accumulate weighted sums
val xxt = x * new Transpose(x)
for(i <- 0 until k){
p(i) /= norm
acc_w(i) += p(i)
acc_mu(i) += x * p(i)
acc_sigma(i) += xxt * p(i)
}
Review comment (Member): extra whitespace after }

})

// Collect the computed sums
val W = (0 until k).map(i => acc_w(i).value).toArray
val MU = (0 until k).map(i => acc_mu(i).value).toArray
val SIGMA = (0 until k).map(i => acc_sigma(i).value).toArray

// Create new distributions based on the partial assignments
// (often referred to as the "M" step in literature)
C = (0 until k).map(i => {
val weight = W(i) / sum(W)
val mu = MU(i) / W(i)
val sigma = SIGMA(i) / W(i) - mu * new Transpose(mu)
(weight, mu, sigma)
}).toArray

llhp = llh; // current becomes previous
llh = log_likelihood.value // this is the freshly computed log-likelihood
iter += 1
} while(iter < maxIterations && Math.abs(llh-llhp) > delta)

// Need to convert the breeze matrices to MLlib matrices
val weights = (0 until k).map(i => C(i)._1).toArray
val means = (0 until k).map(i => Vectors.fromBreeze(C(i)._2)).toArray
val sigmas = (0 until k).map(i => Matrices.fromBreeze(C(i)._3)).toArray
new GaussianMixtureModel(weights, means, sigmas)
}

/** Sum the values in array of doubles */
private def sum(x : Array[Double]) : Double = {
Review comment (Member): You should be able to write x.sum

var s : Double = 0.0
(0 until x.length).foreach(j => s += x(j))
s
}

/** Average of dense breeze vectors */
private def vec_mean(x : Array[DenseDoubleVector]) : DenseDoubleVector = {
Review comment (Member): You can probably write x.sum / x.length

Reply (Contributor Author): This does not work; the compiler can not find a suitable implicit conversion for the array of vectors when attempting x.sum

Reply (Member): Oh, OK, good to know!

val v = BreezeVector.zeros[Double](x(0).length)
(0 until x.length).foreach(j => v += x(j))
v / x.length.asInstanceOf[Double]
}

/**
* Construct matrix where diagonal entries are element-wise
* variance of input vectors (computes biased variance)
*/
private def init_cov(x : Array[DenseDoubleVector]) : DenseDoubleMatrix = {
Review comment (Member): Here and elsewhere, use camelCase naming convention

val mu = vec_mean(x)
val ss = BreezeVector.zeros[Double](x(0).length)
val result = BreezeMatrix.eye[Double](ss.length)
Review comment (Member): "result" --> "cov" (or something more descriptive)

(0 until x.length).map(i => (x(i) - mu) :^ 2.0).foreach(u => ss += u)
Review comment (Member): Can you simplify this to be:

    val ss = x.map(xi => (xi - mu) :^ 2.0).sum

Reply (Contributor Author): Here again, sum method on array of vectors fails to compile.

(0 until ss.length).foreach(i => result(i,i) = ss(i) / x.length)
result
}

/** AccumulatorParam for Dense Breeze Vectors */
private object DenseDoubleVectorAccumulatorParam extends AccumulatorParam[DenseDoubleVector] {
def zero(initialVector : DenseDoubleVector) : DenseDoubleVector = {
BreezeVector.zeros[Double](initialVector.length)
}

def addInPlace(a : DenseDoubleVector, b : DenseDoubleVector) : DenseDoubleVector = {
a += b
}
}

/** AccumulatorParam for Dense Breeze Matrices */
private object DenseDoubleMatrixAccumulatorParam extends AccumulatorParam[DenseDoubleMatrix] {
def zero(initialVector : DenseDoubleMatrix) : DenseDoubleMatrix = {
Review comment (Member): no space before ":" in method declaration (here and elsewhere)

Also, "initialVector" --> "initialMatrix"

BreezeMatrix.zeros[Double](initialVector.rows, initialVector.cols)
}

def addInPlace(a : DenseDoubleMatrix, b : DenseDoubleMatrix) : DenseDoubleMatrix = {
a += b
}
}

/**
* Utility class to implement the density function for multivariate Gaussian distribution.
* Breeze provides this functionality, but it requires the Apache Commons Math library,
* so this class is here so-as to not introduce a new dependency in Spark.
*/
private class MultivariateGaussian(val mu : DenseDoubleVector, val sigma : DenseDoubleMatrix)
Review comment (Member): Could you please move this class to a new folder mllib/stat/impl/ ? There are a few PRs introducing standard distributions. I think we should keep them private for now but collect them in stat/impl/. Later on, we can standardize the APIs and make them public classes.

extends Serializable {
private val sigma_inv_2 = inv(sigma) * -0.5
private val U = math.pow(2.0*math.Pi, -mu.length/2.0) * math.pow(det(sigma), -0.5)
Review comment (Member): put spaces around operators "*" and "/"


def pdf(x : DenseDoubleVector) : Double = {
val delta = x - mu
val delta_t = new Transpose(delta)
U * math.exp(delta_t * sigma_inv_2 * delta)
}
}
}
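The E-step/M-step loop above can be condensed into a single-machine sketch. The following is an illustrative NumPy version (not the PR's Spark/Scala code; the function and variable names are hypothetical) that mirrors the same update rules: eps-guarded weighted densities, normalized responsibilities, and covariance estimated as the weighted second moment minus mu*mu-transpose.

```python
# Illustrative single-machine sketch of one EM iteration for a Gaussian
# mixture, mirroring the E/M steps of the distributed code above.
# Hypothetical helper for exposition; not the MLlib implementation.
import numpy as np

def em_step(X, weights, means, covs, eps=2.0 ** -52):
    """One EM iteration; returns updated (weights, means, covs) and the
    log-likelihood of X under the *current* parameters."""
    n, d = X.shape
    k = len(weights)

    # E-step: responsibility of each component for each point,
    # with an eps guard (as in the code above) to avoid zero densities.
    p = np.empty((n, k))
    for i in range(k):
        inv = np.linalg.inv(covs[i])
        norm = (2 * np.pi) ** (-d / 2.0) * np.linalg.det(covs[i]) ** -0.5
        delta = X - means[i]
        p[:, i] = eps + weights[i] * norm * np.exp(
            -0.5 * np.einsum("nd,de,ne->n", delta, inv, delta))
    totals = p.sum(axis=1)
    loglik = np.log(totals).sum()
    p /= totals[:, None]          # normalize responsibilities per point

    # M-step: re-estimate parameters from the weighted sums.
    w = p.sum(axis=0)
    new_weights = w / n
    new_means = [p[:, i] @ X / w[i] for i in range(k)]
    new_covs = [(X * p[:, [i]]).T @ X / w[i]
                - np.outer(new_means[i], new_means[i]) for i in range(k)]
    return new_weights, new_means, new_covs, loglik
```

In the PR itself the per-point E-step sums are combined across the cluster with accumulators (later replaced by RDD.aggregate, per commit cff73e0); the sketch above collapses that distributed reduction into in-memory array sums.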
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.clustering

import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.Vector

/**
* Multivariate Gaussian mixture model consisting of k Gaussians, where points are drawn
* from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are the respective
* mean and covariance for each Gaussian distribution i=1..k.
Review comment (Member): I would describe parameters using the param syntax. E.g.:

    @param mu  Means for each Gaussian distribution in the mixture, where mu(i) is the mean for Gaussian i.

*/
class GaussianMixtureModel(
val w: Array[Double],
Review comment (Member): It might be good to use a more explicit parameter name than "w." Maybe "weight" or "weights?"

val mu: Array[Vector],
val sigma: Array[Matrix]) extends Serializable {

/** Number of gaussians in mixture */
def k: Int = w.length;
}
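The membership values that commit f407b4c later exposes via predict() are the posterior component probabilities of a point under the model's (w, mu, sigma) parameters. A minimal illustrative sketch in NumPy (hypothetical helper; not the actual MLlib predict() API):

```python
# Illustrative soft assignment under a Gaussian mixture with parameters
# (w, mu, sigma); the predicted cluster label is the argmax entry.
# Hypothetical helper for exposition; not the MLlib API.
import numpy as np

def membership(x, w, mu, sigma):
    """Posterior probability that point x belongs to each of the k Gaussians."""
    k, d = len(w), len(x)
    dens = np.empty(k)
    for i in range(k):
        delta = x - mu[i]
        norm = (2 * np.pi) ** (-d / 2.0) * np.linalg.det(sigma[i]) ** -0.5
        dens[i] = w[i] * norm * np.exp(
            -0.5 * delta @ np.linalg.inv(sigma[i]) @ delta)
    return dens / dens.sum()   # normalize so memberships sum to one
```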