Skip to content
Permalink
Browse files

[SPARK-14322][MLLIB] Use treeAggregate instead of reduce in OnlineLDA…

…Optimizer

## What changes were proposed in this pull request?
jira: https://issues.apache.org/jira/browse/SPARK-14322

OnlineLDAOptimizer uses RDD.reduce in two places where it could use treeAggregate. This can cause scalability issues. This should be an easy fix.
This is also a bug since it modifies the first argument to reduce, so we should use aggregate or treeAggregate.
See this line: https://github.com/apache/spark/blob/f12f11e578169b47e3f8b18b299948c0670ba585/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala#L452
and a few lines below it.

## How was this patch tested?
unit tests

Author: Yuhao Yang <hhbyyh@gmail.com>

Closes #12106 from hhbyyh/ldaTreeReduce.
  • Loading branch information
hhbyyh authored and jkbradley committed Apr 6, 2016
1 parent db0b06c commit 8cffcb60deb82d04a5c6e144ec9927f6f7addc8b
Showing with 3 additions and 2 deletions.
  1. +3 −2 mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -451,10 +451,11 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
}
Iterator((stat, gammaPart))
}
val statsSum: BDM[Double] = stats.map(_._1).reduce(_ += _)
val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
_ += _, _ += _)
expElogbetaBc.unpersist()
val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
stats.map(_._2).reduce(_ ++ _).map(_.toDenseMatrix): _*)
stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
val batchResult = statsSum :* expElogbeta.t

// Note that this is an optimization to avoid batch.count

0 comments on commit 8cffcb6

Please sign in to comment.
You can’t perform that action at this time.