Numerous changes to improve code
tgaloppo committed Dec 18, 2014
1 parent cff73e0 commit 308c8ad
Showing 3 changed files with 64 additions and 56 deletions.
DenseGmmEM.scala
@@ -21,6 +21,13 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.GaussianMixtureModelEM
import org.apache.spark.mllib.linalg.Vectors

/**
* An example Gaussian Mixture Model EM app. Run with
* {{{
* ./bin/run-example org.apache.spark.examples.mllib.DenseGmmEM <input> <k> <convergenceTol>
* }}}
* If you use it as a template to create your own app, please use `spark-submit` to submit your app.
*/
object DenseGmmEM {
def main(args: Array[String]): Unit = {
if (args.length != 3) {
@@ -44,13 +51,15 @@ object DenseGmmEM {
.run(data)

for (i <- 0 until clusters.k) {
println("weight=%f mu=%s sigma=\n%s\n" format
println("weight=%f\nmu=%s\nsigma=\n%s\n" format
(clusters.weight(i), clusters.mu(i), clusters.sigma(i)))
}

println("Cluster labels:")
val (responsibilityMatrix, clusterLabels) = clusters.predict(data)
for (x <- clusterLabels.collect) {
print(" " + x)
}
println
}
}
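The predict() call above returns a pair: a responsibility matrix (soft assignments) and one hard cluster label per point. As a rough illustration of how the two relate (this snippet is editorial, not part of the commit, and the numbers are made up), a hard label is just the index of the largest responsibility in a row:

// One row of soft responsibilities for a single point (illustrative values).
val responsibilities = Array(0.05, 0.80, 0.15)
// The hard label is the component with the largest responsibility.
val label = responsibilities.zipWithIndex.maxBy(_._1)._2
println(label)  // 1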
GaussianMixtureModelEM.scala
@@ -26,6 +26,8 @@ import org.apache.spark.mllib.stat.impl.MultivariateGaussian
import org.apache.spark.{Accumulator, AccumulatorParam, SparkContext}
import org.apache.spark.SparkContext.DoubleAccumulatorParam

import scala.collection.mutable.IndexedSeqView

/**
* This class performs expectation maximization for multivariate Gaussian
* Mixture Models (GMMs). A GMM represents a composite distribution of
@@ -51,6 +53,7 @@ class GaussianMixtureModelEM private (
// Type aliases for convenience
private type DenseDoubleVector = BreezeVector[Double]
private type DenseDoubleMatrix = BreezeMatrix[Double]
private type VectorArrayView = IndexedSeqView[DenseDoubleVector, Array[DenseDoubleVector]]

private type ExpectationSum = (
Array[Double], // log-likelihood in index 0
@@ -80,10 +83,12 @@

// compute cluster contributions for each input point
// (U, T) => U for aggregation
private def computeExpectation(weights: Array[Double], dists: Array[MultivariateGaussian])
private def computeExpectation(
weights: Array[Double],
dists: Array[MultivariateGaussian])
(model: ExpectationSum, x: DenseDoubleVector): ExpectationSum = {
val k = model._2.length
val p = (0 until k).map(i => eps + weights(i) * dists(i).pdf(x)).toArray
val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(x) }
val pSum = p.sum
model._1(0) += math.log(pSum)
val xxt = x * new Transpose(x)
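The eps added to each weighted density above keeps the log-likelihood update finite: a point with negligible density under every component would otherwise make pSum zero and log(pSum) negative infinity. A minimal sketch (the actual eps constant is defined outside this hunk, so its value here is an assumption):

// Assumed eps value; the real constant lives elsewhere in the class.
val eps = math.ulp(1.0)
// A point whose density under both components underflows to zero.
val rawDensities = Array(0.0, 0.0)
val pSum = rawDensities.map(_ + eps).sum
println(math.log(pSum))  // finite, instead of -Infinity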
@@ -106,7 +111,10 @@
/** A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold */
def this() = this(2, 0.01, 100)

/** Set the initial GMM starting point, bypassing the random initialization */
/** Set the initial GMM starting point, bypassing the random initialization.
* You must call setK() prior to calling this method, and the condition
* (gmm.k == this.k) must be met; failure will result in an IllegalArgumentException
*/
def setInitialGmm(gmm: GaussianMixtureModel): this.type = {
if (gmm.k == k) {
initialGmm = Some(gmm)
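As the expanded doc comment spells out, the supplied model's k must match the estimator's k. A hedged usage sketch (previousModel and data are placeholders assumed to exist; setK() is the setter the doc comment refers to):

// Hypothetical warm start; previousModel and data are assumed to exist.
val em = new GaussianMixtureModelEM()
em.setK(previousModel.k)          // k must agree with the supplied model
em.setInitialGmm(previousModel)   // a mismatched k throws IllegalArgumentException
val refined = em.run(data)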
@@ -156,34 +164,30 @@

/** Perform expectation maximization */
def run(data: RDD[Vector]): GaussianMixtureModel = {
val ctx = data.sparkContext
val sc = data.sparkContext

// we will operate on the data as breeze data
val breezeData = data.map(u => u.toBreeze.toDenseVector).cache()

// Get length of the input vectors
val d = breezeData.first.length

// gaussians will be array of (weight, mean, covariance) tuples.
// Determine initial weights and corresponding Gaussians.
// If the user supplied an initial GMM, we use those values, otherwise
// we start with uniform weights, a random mean from the data, and
// diagonal covariance matrices using component variances
// derived from the samples
var gaussians = initialGmm match {
case Some(gmm) => (0 until k).map{ i =>
(gmm.weight(i), gmm.mu(i).toBreeze.toDenseVector, gmm.sigma(i).toBreeze.toDenseMatrix)
}.toArray
// derived from the samples
val (weights, gaussians) = initialGmm match {
case Some(gmm) => (gmm.weight, gmm.mu.zip(gmm.sigma).map{ case(mu, sigma) =>
new MultivariateGaussian(mu.toBreeze.toDenseVector, sigma.toBreeze.toDenseMatrix)
}.toArray)

case None => {
// For each Gaussian, we will initialize the mean as the average
// of some random samples from the data
val samples = breezeData.takeSample(true, k * nSamples, scala.util.Random.nextInt)

(0 until k).map{ i =>
(1.0 / k,
vectorMean(samples.slice(i * nSamples, (i + 1) * nSamples)),
initCovariance(samples.slice(i * nSamples, (i + 1) * nSamples)))
}.toArray
((0 until k).map(_ => 1.0 / k).toArray, (0 until k).map{ i =>
val slice = samples.view(i * nSamples, (i + 1) * nSamples)
new MultivariateGaussian(vectorMean(slice), initCovariance(slice))
}.toArray)
}
}
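In the None branch, each component is seeded from its own contiguous block of the random sample. A small self-contained sketch of that slicing, with plain doubles standing in for the Breeze vectors (the code above uses a view to avoid copying):

// k components, nSamples draws per component: component i is seeded from
// indices [i * nSamples, (i + 1) * nSamples) of the sampled array.
val k = 3
val nSamples = 5
val samples = Array.tabulate(k * nSamples)(j => j.toDouble)
for (i <- 0 until k) {
  val block = samples.slice(i * nSamples, (i + 1) * nSamples)
  println(s"component $i seeded from: ${block.mkString(", ")}")
}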

@@ -192,47 +196,36 @@

var iter = 0
do {
// pivot gaussians into weight and distribution arrays
val weights = (0 until k).map(i => gaussians(i)._1).toArray
val dists = (0 until k).map{ i =>
new MultivariateGaussian(gaussians(i)._2, gaussians(i)._3)
}.toArray

// create and broadcast curried cluster contribution function
val compute = ctx.broadcast(computeExpectation(weights, dists)_)
val compute = sc.broadcast(computeExpectation(weights, gaussians)_)

// aggregate the cluster contribution for all sample points
val sums = breezeData.aggregate(zeroExpectationSum(k, d))(compute.value, addExpectationSums)

// Assignments to make the code more readable
val logLikelihood = sums._1(0)
val W = sums._2
val MU = sums._3
val SIGMA = sums._4
val (logLikelihood, wSums, muSums, sigmaSums) =
breezeData.aggregate(zeroExpectationSum(k, d))(compute.value, addExpectationSums)

// Create new distributions based on the partial assignments
// (often referred to as the "M" step in literature)
gaussians = (0 until k).map{ i =>
val weight = W(i) / W.sum
val mu = MU(i) / W(i)
val sigma = SIGMA(i) / W(i) - mu * new Transpose(mu)
(weight, mu, sigma)
}.toArray

val sumWeights = wSums.sum
for (i <- 0 until k) {
val mu = muSums(i) / wSums(i)
val sigma = sigmaSums(i) / wSums(i) - mu * new Transpose(mu)
weights(i) = wSums(i) / sumWeights
gaussians(i) = new MultivariateGaussian(mu, sigma)
}

llhp = llh // current becomes previous
llh = logLikelihood // this is the freshly computed log-likelihood
llh = logLikelihood(0) // this is the freshly computed log-likelihood
iter += 1
} while(iter < maxIterations && Math.abs(llh-llhp) > convergenceTol)

// Need to convert the breeze matrices to MLlib matrices
val weights = (0 until k).map(i => gaussians(i)._1).toArray
val means = (0 until k).map(i => Vectors.fromBreeze(gaussians(i)._2)).toArray
val sigmas = (0 until k).map(i => Matrices.fromBreeze(gaussians(i)._3)).toArray
val means = (0 until k).map(i => Vectors.fromBreeze(gaussians(i).mu)).toArray
val sigmas = (0 until k).map(i => Matrices.fromBreeze(gaussians(i).sigma)).toArray
new GaussianMixtureModel(weights, means, sigmas)
}
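For reference, the aggregated quantities map onto the standard EM updates for a Gaussian mixture (an editorial summary of the code above, not text from the commit): wSums holds the W_i below, muSums the responsibility-weighted sums of x_n, sigmaSums the weighted sums of x_n x_n^T, and logLikelihood(0) accumulates log L.

\[
\gamma_{ni} = \frac{w_i\,\mathcal{N}(x_n \mid \mu_i, \Sigma_i)}{\sum_{j=1}^{k} w_j\,\mathcal{N}(x_n \mid \mu_j, \Sigma_j)},
\qquad W_i = \sum_n \gamma_{ni}
\]
\[
w_i \leftarrow \frac{W_i}{\sum_j W_j}, \qquad
\mu_i \leftarrow \frac{1}{W_i}\sum_n \gamma_{ni}\,x_n, \qquad
\Sigma_i \leftarrow \frac{1}{W_i}\sum_n \gamma_{ni}\,x_n x_n^{\top} - \mu_i\mu_i^{\top}, \qquad
\log L = \sum_n \log \sum_{i=1}^{k} w_i\,\mathcal{N}(x_n \mid \mu_i, \Sigma_i)
\]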

/** Average of dense breeze vectors */
private def vectorMean(x: Array[DenseDoubleVector]): DenseDoubleVector = {
private def vectorMean(x: VectorArrayView): DenseDoubleVector = {
val v = BreezeVector.zeros[Double](x(0).length)
x.foreach(xi => v += xi)
v / x.length.asInstanceOf[Double]
@@ -242,7 +235,7 @@
* Construct matrix where diagonal entries are element-wise
* variance of input vectors (computes biased variance)
*/
private def initCovariance(x: Array[DenseDoubleVector]): DenseDoubleMatrix = {
private def initCovariance(x: VectorArrayView): DenseDoubleMatrix = {
val mu = vectorMean(x)
val ss = BreezeVector.zeros[Double](x(0).length)
val cov = BreezeMatrix.eye[Double](ss.length)
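As its doc comment notes, initCovariance fills the diagonal with the biased (divide-by-n) variance of each coordinate. A one-coordinate sketch of that computation (illustrative, not from the commit):

// Biased variance of a single coordinate: divide by n, not n - 1.
val column = Array(1.0, 2.0, 4.0)
val mean = column.sum / column.length
val biasedVariance = column.map(v => (v - mean) * (v - mean)).sum / column.length
println(biasedVariance)  // ~1.556; the unbiased (n - 1) version would give ~2.333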
@@ -255,15 +248,18 @@
* Given the input vectors, return the membership value of each vector
* to all mixture components.
*/
def predictClusters(points: RDD[Vector], mu: Array[Vector], sigma: Array[Matrix],
def predictClusters(
points: RDD[Vector],
mu: Array[Vector],
sigma: Array[Matrix],
weight: Array[Double], k: Int): RDD[Array[Double]] = {
val ctx = points.sparkContext
val dists = ctx.broadcast{
val sc = points.sparkContext
val dists = sc.broadcast{
(0 until k).map{ i =>
new MultivariateGaussian(mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix)
}.toArray
}
val weights = ctx.broadcast((0 until k).map(i => weight(i)).toArray)
val weights = sc.broadcast((0 until k).map(i => weight(i)).toArray)
points.map{ x =>
computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k)
}
Expand All @@ -272,11 +268,14 @@ class GaussianMixtureModelEM private (
/**
* Compute the partial assignments for each vector
*/
def computeSoftAssignments(pt: DenseDoubleVector, dists: Array[MultivariateGaussian],
weights: Array[Double], k: Int): Array[Double] = {
val p = (0 until k).map(i => eps + weights(i) * dists(i).pdf(pt)).toArray
private def computeSoftAssignments(
pt: DenseDoubleVector,
dists: Array[MultivariateGaussian],
weights: Array[Double],
k: Int): Array[Double] = {
val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(pt) }
val pSum = p.sum
for(i<- 0 until k){
for (i <- 0 until k){
p(i) /= pSum
}
p
MultivariateGaussian.scala
@@ -18,7 +18,7 @@
package org.apache.spark.mllib.stat.impl

import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix}
import breeze.linalg.{Transpose, det, inv}
import breeze.linalg.{Transpose, det, pinv}

/**
* Utility class to implement the density function for multivariate Gaussian distribution.
@@ -28,7 +28,7 @@ import breeze.linalg.{Transpose, det, inv}
private[mllib] class MultivariateGaussian(
val mu: BreezeVector[Double],
val sigma: BreezeMatrix[Double]) extends Serializable {
private val sigmaInv2 = inv(sigma) * -0.5
private val sigmaInv2 = pinv(sigma) * -0.5
private val U = math.pow(2.0 * math.Pi, -mu.length / 2.0) * math.pow(det(sigma), -0.5)

def pdf(x: BreezeVector[Double]): Double = {
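The switch from inv to pinv keeps the inversion step well-defined when a component's covariance is singular or nearly so, for example when a cluster's points lie in a lower-dimensional subspace. The two constants above together encode the usual multivariate normal density, U * exp((x - mu)^T * sigmaInv2 * (x - mu)), with sigmaInv2 now built from the Moore-Penrose pseudo-inverse. A minimal Breeze sketch of why pinv is the safer choice (editorial, not part of the commit):

import breeze.linalg.{DenseMatrix, pinv}

// A rank-deficient covariance: an ordinary inverse does not exist here,
// so inv is not usable, while pinv returns the Moore-Penrose pseudo-inverse.
val singular = DenseMatrix((1.0, 1.0), (1.0, 1.0))
val pseudoInverse = pinv(singular)
println(pseudoInverse)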
