Commit

Merged with master branch; update test suite with latest context changes.

Improved cluster initialization strategy.
tgaloppo committed Nov 18, 2014
1 parent 86fb382 commit e6ea805
Showing 2 changed files with 19 additions and 8 deletions.
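
The improved initialization works as follows: rather than seeding each Gaussian's mean with a single random data point, the code draws k * nSamples points with replacement and seeds each mean with the average of its own nSamples-point slice, making the starting means less sensitive to any single outlying point. A minimal standalone sketch of the idea using Breeze (the initialMeans helper and its signature are illustrative, not the committed API):

    import breeze.linalg.{DenseVector => BreezeVector}

    // Illustrative sketch: seed each of k component means with the average
    // of nSamples points drawn (with replacement) from the data.
    def initialMeans(points: IndexedSeq[BreezeVector[Double]],
                     k: Int, nSamples: Int): Array[BreezeVector[Double]] = {
      val rng = new scala.util.Random
      val samples = Array.fill(k * nSamples)(points(rng.nextInt(points.length)))
      (0 until k).map { i =>
        val slice = samples.slice(i * nSamples, (i + 1) * nSamples)
        slice.reduce(_ + _) / nSamples.toDouble  // component-wise average
      }.toArray
    }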
GMMExpectationMaximization.scala
@@ -81,6 +81,9 @@ class GMMExpectationMaximization private (
   private type DenseDoubleVector = BreezeVector[Double]
   private type DenseDoubleMatrix = BreezeMatrix[Double]
 
+  // number of samples per cluster to use when initializing Gaussians
+  private val nSamples = 5;
+
   // A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold
   def this() = this(2, 0.01, 100)

@@ -118,15 +121,15 @@ class GMMExpectationMaximization private (
     // Get length of the input vectors
     val d = breezeData.first.length
 
-    // For each Gaussian, we will initialize the mean as some random
-    // point from the data. (This could be improved)
-    val samples = breezeData.takeSample(true, k, scala.util.Random.nextInt)
+    // For each Gaussian, we will initialize the mean as the average
+    // of some random samples from the data
+    val samples = breezeData.takeSample(true, k * nSamples, scala.util.Random.nextInt)
 
     // C will be array of (weight, mean, covariance) tuples
     // we start with uniform weights, a random mean from the data, and
     // identity matrices for covariance
     var C = (0 until k).map(i => (1.0/k,
-      samples(i),
+      vec_mean(samples.slice(i * nSamples, (i + 1) * nSamples)),
       BreezeMatrix.eye[Double](d))).toArray
 
     val acc_w = new Array[Accumulator[Double]](k)
@@ -148,7 +151,7 @@ class GMMExpectationMaximization private (
     }
 
     val log_likelihood = ctx.accumulator(0.0)
-
+    // broadcast the current weights and distributions to all nodes
     val dists = ctx.broadcast((0 until k).map(i =>
       new MultivariateGaussian(C(i)._2, C(i)._3)).toArray)
@@ -164,11 +167,12 @@ class GMMExpectationMaximization private (
         log_likelihood += math.log(norm)
 
         // accumulate weighted sums
+        val xxt = x * new Transpose(x)
         for(i <- 0 until k){
           p(i) /= norm
           acc_w(i) += p(i)
           acc_mu(i) += x * p(i)
-          acc_sigma(i) += x * new Transpose(x) * p(i)
+          acc_sigma(i) += xxt * p(i)
         }
       })

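Note on the hunk above: the outer product x * xᵀ is a d × d matrix that does not depend on the component index i, so hoisting it out of the loop builds it once per data point instead of k times. A quick Breeze sketch of the equivalence (values hypothetical; x.t is the idiomatic form of new Transpose(x)):

    import breeze.linalg.DenseVector

    val x = DenseVector(1.0, 2.0)  // one data point, d = 2
    val xxt = x * x.t              // d x d outer product, built once
    // For each component i, xxt * p(i) equals (x * new Transpose(x)) * p(i),
    // so the accumulated sums are unchanged; only redundant work is removed.
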
@@ -205,6 +209,13 @@ class GMMExpectationMaximization private (
     s
   }
 
+  /** Average of dense breeze vectors */
+  private def vec_mean(x : Array[DenseDoubleVector]) : DenseDoubleVector = {
+    val v = BreezeVector.zeros[Double](x(0).length)
+    (0 until x.length).foreach(j => v += x(j))
+    v / x.length.asInstanceOf[Double]
+  }
+
   /** AccumulatorParam for Dense Breeze Vectors */
   private object DenseDoubleVectorAccumulatorParam extends AccumulatorParam[DenseDoubleVector] {
     def zero(initialVector : DenseDoubleVector) : DenseDoubleVector = {
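For reference, the new vec_mean helper computes the component-wise average used by the initialization above; with hypothetical inputs:

    // vec_mean sums the vectors into a zero vector, then divides by the count:
    vec_mean(Array(BreezeVector(1.0, 2.0), BreezeVector(3.0, 4.0)))
    // == BreezeVector(2.0, 3.0)
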
GMMExpectationMaximizationSuite.scala
@@ -20,10 +20,10 @@ package org.apache.spark.mllib.clustering
 import org.scalatest.FunSuite
 
 import org.apache.spark.mllib.linalg.{Vectors, Matrices}
-import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
 import org.apache.spark.mllib.util.TestingUtils._
 
-class GMMExpectationMaximizationSuite extends FunSuite with LocalSparkContext {
+class GMMExpectationMaximizationSuite extends FunSuite with MLlibTestSparkContext {
   test("single cluster") {
     val data = sc.parallelize(Array(
       Vectors.dense(6.0, 9.0),
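Note: MLlibTestSparkContext is MLlib's shared ScalaTest fixture; at this point in the codebase it created a local SparkContext (the sc used above) in beforeAll and stopped it in afterAll, replacing the older LocalSparkContext mixin across MLlib's suites.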
