[SPARK-5016] [MLLIB] Distribute GMM mixture components to executors
Distribute expensive portions of computation for Gaussian mixture components (in particular, pre-computation of `MultivariateGaussian.rootSigmaInv`, the inverse covariance matrix and covariance determinant) across executors. Repost of PR#4654.

Notes for reviewers:
 * What should the policy be for when to distribute the computation? Always? When numClusters > threshold? Via a user-specified param?

TODO:
 * Performance testing and comparison for a large number of clusters

Author: Feynman Liang <fliang@databricks.com>

Closes #7166 from feynmanliang/GMM_parallel_mixtures and squashes the following commits:

4f351fa [Feynman Liang] Update heuristic and scaladoc
5ea947e [Feynman Liang] Fix parallelization logic
00eb7db [Feynman Liang] Add helper method for GMM's M step, remove distributeGaussians flag
e7c8127 [Feynman Liang] Add distributeGaussians flag and tests
1da3c7f [Feynman Liang] Distribute mixtures
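The heuristic this patch settles on, `((k - 1.0) / k) * d > 25`, can be illustrated with a small numeric sketch (Python rather than the patch's Scala, purely for illustration; the function name is ours, not Spark's):

```python
# Sketch of the commit's heuristic for deciding when to ship the M-step
# to executors: distribute when ((k - 1) / k) * d > 25. The (k - 1) / k
# factor suppresses distribution only when k (number of clusters) is very
# small; d is the dimensionality of the input vectors.
def should_distribute(k: int, d: int) -> bool:
    return ((k - 1.0) / k) * d > 25

# For k = 2 the effective threshold on d doubles to 50; for moderate k
# the condition is approximately d > 25.
print(should_distribute(2, 40))   # (1/2) * 40 = 20, stays local
print(should_distribute(10, 30))  # (9/10) * 30 = 27, distributes
```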
Feynman Liang authored and mengxr committed Jul 8, 2015
1 parent 8c32b2e commit f472b8c
Showing 1 changed file with 36 additions and 8 deletions.
@@ -140,6 +140,10 @@ class GaussianMixture private (
     // Get length of the input vectors
     val d = breezeData.first().length
 
+    // Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when
+    // d > 25 except for when k is very small
+    val distributeGaussians = ((k - 1.0) / k) * d > 25
+
     // Determine initial weights and corresponding Gaussians.
     // If the user supplied an initial GMM, we use those values, otherwise
     // we start with uniform weights, a random mean from the data, and
@@ -171,14 +175,25 @@
       // Create new distributions based on the partial assignments
       // (often referred to as the "M" step in literature)
       val sumWeights = sums.weights.sum
-      var i = 0
-      while (i < k) {
-        val mu = sums.means(i) / sums.weights(i)
-        BLAS.syr(-sums.weights(i), Vectors.fromBreeze(mu),
-          Matrices.fromBreeze(sums.sigmas(i)).asInstanceOf[DenseMatrix])
-        weights(i) = sums.weights(i) / sumWeights
-        gaussians(i) = new MultivariateGaussian(mu, sums.sigmas(i) / sums.weights(i))
-        i = i + 1
+
+      if (distributeGaussians) {
+        val numPartitions = math.min(k, 1024)
+        val tuples =
+          Seq.tabulate(k)(i => (sums.means(i), sums.sigmas(i), sums.weights(i)))
+        val (ws, gs) = sc.parallelize(tuples, numPartitions).map { case (mean, sigma, weight) =>
+          updateWeightsAndGaussians(mean, sigma, weight, sumWeights)
+        }.collect.unzip
+        Array.copy(ws, 0, weights, 0, ws.length)
+        Array.copy(gs, 0, gaussians, 0, gs.length)
+      } else {
+        var i = 0
+        while (i < k) {
+          val (weight, gaussian) =
+            updateWeightsAndGaussians(sums.means(i), sums.sigmas(i), sums.weights(i), sumWeights)
+          weights(i) = weight
+          gaussians(i) = gaussian
+          i = i + 1
+        }
       }
 
       llhp = llh // current becomes previous
@@ -192,6 +207,19 @@
   /** Java-friendly version of [[run()]] */
   def run(data: JavaRDD[Vector]): GaussianMixtureModel = run(data.rdd)
 
+  private def updateWeightsAndGaussians(
+      mean: BDV[Double],
+      sigma: BreezeMatrix[Double],
+      weight: Double,
+      sumWeights: Double): (Double, MultivariateGaussian) = {
+    val mu = (mean /= weight)
+    BLAS.syr(-weight, Vectors.fromBreeze(mu),
+      Matrices.fromBreeze(sigma).asInstanceOf[DenseMatrix])
+    val newWeight = weight / sumWeights
+    val newGaussian = new MultivariateGaussian(mu, sigma / weight)
+    (newWeight, newGaussian)
+  }
+
   /** Average of dense breeze vectors */
   private def vectorMean(x: IndexedSeq[BV[Double]]): BDV[Double] = {
     val v = BDV.zeros[Double](x(0).length)
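The algebra inside the new `updateWeightsAndGaussians` helper can be sketched outside Spark. The following is a NumPy stand-in (names and shapes are illustrative, not Spark's API): the accumulated per-component statistics are a weighted sum of points, a weighted sum of outer products, and a soft-count weight; the helper normalizes the mean, applies the rank-1 correction that the `BLAS.syr(-weight, mu, sigma)` call performs in Scala, and normalizes the mixing weight and covariance.

```python
import numpy as np

def update_weights_and_gaussian(mean_sum, sigma_sum, weight, sum_weights):
    # mean_sum:   sum over points of (responsibility * x)
    # sigma_sum:  sum over points of (responsibility * x x^T)
    # weight:     sum of responsibilities (soft count) for this component
    mu = mean_sum / weight                         # component mean
    # Rank-1 update mirroring BLAS.syr(-weight, mu, sigma) in the patch:
    sigma = sigma_sum - weight * np.outer(mu, mu)
    new_weight = weight / sum_weights              # normalized mixing weight
    new_sigma = sigma / weight                     # ML covariance estimate
    return new_weight, mu, new_sigma
```

For example, two fully assigned points [0, 0] and [2, 2] give weight = 2, mean_sum = [2, 2], sigma_sum = [[4, 4], [4, 4]]; the helper recovers mu = [1, 1] and covariance [[1, 1], [1, 1]], the maximum-likelihood estimate for that pair.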
