Skip to content

Commit

Permalink
Replaced accumulators with RDD.aggregate
Browse files Browse the repository at this point in the history
  • Loading branch information
tgaloppo committed Dec 17, 2014
1 parent 20ebca1 commit cff73e0
Showing 1 changed file with 60 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,50 @@ class GaussianMixtureModelEM private (
private type DenseDoubleVector = BreezeVector[Double]
private type DenseDoubleMatrix = BreezeMatrix[Double]

// Partial sums collected while scanning the data, bundled as one tuple so a
// single value can be passed through aggregation. The arrays are updated in
// place by the helpers that fold in points and merge partial results.
private type ExpectationSum = (
Array[Double], // log-likelihood in index 0
Array[Double], // array of weights
Array[DenseDoubleVector], // array of means
Array[DenseDoubleMatrix]) // array of cov matrices

// Build an ExpectationSum in which every component is zero: a one-slot
// log-likelihood cell, k weights, k vectors of length d and k d-by-d matrices.
// Every vector and matrix is a distinct instance.
private def zeroExpectationSum(k: Int, d: Int): ExpectationSum = {
  val logLikelihood = Array(0.0)
  val weightSums = new Array[Double](k)
  val meanSums = Array.fill[DenseDoubleVector](k)(BreezeVector.zeros[Double](d))
  val covSums = Array.fill[DenseDoubleMatrix](k)(BreezeMatrix.zeros[Double](d, d))
  (logLikelihood, weightSums, meanSums, covSums)
}

// Merge two ExpectationSum values component-wise by adding m2 into m1.
// m1 is modified in place and returned; m2 is only read.
// Shape: (U, U) => U, for use as the combine step of an aggregation.
private def addExpectationSums(m1: ExpectationSum, m2: ExpectationSum): ExpectationSum = {
  val (llhAcc, weightAcc, meanAcc, covAcc) = m1
  val (llhInc, weightInc, meanInc, covInc) = m2
  llhAcc(0) += llhInc(0)
  weightAcc.indices.foreach { j =>
    weightAcc(j) += weightInc(j)
    meanAcc(j) += meanInc(j)
    covAcc(j) += covInc(j)
  }
  m1
}

// Fold one input point x into the running ExpectationSum.
// For each cluster the point's share is (eps + weight * pdf(x)), normalized by
// the total over all clusters; the log of that total is added to the
// log-likelihood cell. model is modified in place and returned.
// Shape after fixing (weights, dists): (U, T) => U, for use as the
// per-element step of an aggregation.
private def computeExpectation(weights: Array[Double], dists: Array[MultivariateGaussian])
    (model: ExpectationSum, x: DenseDoubleVector): ExpectationSum = {
  val (llhAcc, weightAcc, meanAcc, covAcc) = model
  val numClusters = weightAcc.length
  // unnormalized share of each cluster for this point
  val raw = Array.tabulate(numClusters)(j => eps + weights(j) * dists(j).pdf(x))
  val total = raw.sum
  llhAcc(0) += math.log(total)
  // computed once per point and reused for every cluster
  val outer = x * new Transpose(x)
  var j = 0
  while (j < numClusters) {
    val share = raw(j) / total
    weightAcc(j) += share
    meanAcc(j) += x * share
    covAcc(j) += outer * share
    j += 1
  }
  model
}

// number of samples per cluster to use when initializing Gaussians
private val nSamples = 5

Expand Down Expand Up @@ -115,7 +159,7 @@ class GaussianMixtureModelEM private (
val ctx = data.sparkContext

// we will operate on the data as breeze data
val breezeData = data.map( u => u.toBreeze.toDenseVector ).cache()
val breezeData = data.map(u => u.toBreeze.toDenseVector).cache()

// Get length of the input vectors
val d = breezeData.first.length
Expand Down Expand Up @@ -143,55 +187,28 @@ class GaussianMixtureModelEM private (
}
}

val accW = new Array[Accumulator[Double]](k)
val accMu = new Array[Accumulator[DenseDoubleVector]](k)
val accSigma = new Array[Accumulator[DenseDoubleMatrix]](k)

var llh = Double.MinValue // current log-likelihood
var llhp = 0.0 // previous log-likelihood

var iter = 0
do {
// reset accumulators
for (i <- 0 until k) {
accW(i) = ctx.accumulator(0.0)
accMu(i) = ctx.accumulator(
BreezeVector.zeros[Double](d))(DenseDoubleVectorAccumulatorParam)
accSigma(i) = ctx.accumulator(
BreezeMatrix.zeros[Double](d,d))(DenseDoubleMatrixAccumulatorParam)
}
// pivot gaussians into weight and distribution arrays
val weights = (0 until k).map(i => gaussians(i)._1).toArray
val dists = (0 until k).map{ i =>
new MultivariateGaussian(gaussians(i)._2, gaussians(i)._3)
}.toArray

val logLikelihood = ctx.accumulator(0.0)

// broadcast the current weights and distributions to all nodes
val dists = ctx.broadcast{
(0 until k).map(i => new MultivariateGaussian(gaussians(i)._2, gaussians(i)._3)).toArray
}
val weights = ctx.broadcast((0 until k).map(i => gaussians(i)._1).toArray)
// create and broadcast curried cluster contribution function
val compute = ctx.broadcast(computeExpectation(weights, dists)_)

// calculate partial assignments for each sample in the data
// (often referred to as the "E" step in literature)
breezeData.foreach{ x =>
val p = (0 until k).map(i => eps + weights.value(i) * dists.value(i).pdf(x)).toArray

val pSum = p.sum

logLikelihood += math.log(pSum)

// accumulate weighted sums
val xxt = x * new Transpose(x)
for (i <- 0 until k) {
p(i) /= pSum
accW(i) += p(i)
accMu(i) += x * p(i)
accSigma(i) += xxt * p(i)
}
}
// aggregate the cluster contribution for all sample points
val sums = breezeData.aggregate(zeroExpectationSum(k, d))(compute.value, addExpectationSums)

// Collect the computed sums
val W = (0 until k).map(i => accW(i).value).toArray
val MU = (0 until k).map(i => accMu(i).value).toArray
val SIGMA = (0 until k).map(i => accSigma(i).value).toArray
// Assignments to make the code more readable
val logLikelihood = sums._1(0)
val W = sums._2
val MU = sums._3
val SIGMA = sums._4

// Create new distributions based on the partial assignments
// (often referred to as the "M" step in literature)
Expand All @@ -203,7 +220,7 @@ class GaussianMixtureModelEM private (
}.toArray

llhp = llh // current becomes previous
llh = logLikelihood.value // this is the freshly computed log-likelihood
llh = logLikelihood // this is the freshly computed log-likelihood
iter += 1
} while(iter < maxIterations && Math.abs(llh-llhp) > convergenceTol)

Expand Down Expand Up @@ -264,26 +281,4 @@ class GaussianMixtureModelEM private (
}
p
}

/** Accumulator parameter for dense Breeze vectors of doubles. */
private object DenseDoubleVectorAccumulatorParam extends AccumulatorParam[DenseDoubleVector] {
  /** Returns an all-zero vector with the same length as `initialVector`. */
  def zero(initialVector: DenseDoubleVector): DenseDoubleVector =
    BreezeVector.zeros[Double](initialVector.length)

  /** Combines two vectors by applying `+=` with `b` to `a`. */
  def addInPlace(a: DenseDoubleVector, b: DenseDoubleVector): DenseDoubleVector =
    a += b
}

/** Accumulator parameter for dense Breeze matrices of doubles. */
private object DenseDoubleMatrixAccumulatorParam extends AccumulatorParam[DenseDoubleMatrix] {
  /** Returns an all-zero matrix with the same row and column counts as `initialMatrix`. */
  def zero(initialMatrix: DenseDoubleMatrix): DenseDoubleMatrix =
    BreezeMatrix.zeros[Double](initialMatrix.rows, initialMatrix.cols)

  /** Combines two matrices by applying `+=` with `b` to `a`. */
  def addInPlace(a: DenseDoubleMatrix, b: DenseDoubleMatrix): DenseDoubleMatrix =
    a += b
}
}

0 comments on commit cff73e0

Please sign in to comment.