[SPARK-16697][ML][MLLIB] improve LDA submitMiniBatch method to avoid redundant RDD computation

## What changes were proposed in this pull request?

In `LDAOptimizer.submitMiniBatch`, persist `stats: RDD[(BDM[Double], List[BDV[Double]])]` so the two actions that consume it do not recompute the mini-batch, and move the release of the `expElogbetaBc` broadcast variable to after the last job that reads it, so the broadcast is no longer unpersisted too early. Also replace the previous `expElogbetaBc.unpersist()` call with `expElogbetaBc.destroy(false)`.
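
For illustration only, the reuse pattern looks roughly like the minimal sketch below (hypothetical names such as `weightsBc` and a toy RDD, not the MLlib code itself). Without the `persist`, each of the two actions would re-run the expensive `mapPartitions` stage, and a broadcast released before the second action could still be needed during that recomputation:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("persist-sketch"))

    // Hypothetical broadcast playing the role of expElogbetaBc.
    val weightsBc = sc.broadcast(Array.fill(10)(1.0))

    // Expensive per-partition work that reads the broadcast,
    // analogous to the mapPartitions stage in submitMiniBatch.
    val stats = sc.parallelize(1 to 1000, 4)
      .mapPartitions(it => it.map(x => x * weightsBc.value.sum))
      .persist(StorageLevel.MEMORY_AND_DISK) // computed once, reused by both actions below

    val sum = stats.reduce(_ + _) // first action (analogous to treeAggregate)
    val rows = stats.collect()    // second action (analogous to collecting gammat); served from the cache

    stats.unpersist()   // drop the cached partitions once both actions are done
    weightsBc.destroy() // safe only after the last job that reads the broadcast;
                        // the patch itself calls the internal destroy(false)

    println(s"sum = $sum, rows = ${rows.length}")
    sc.stop()
  }
}
```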

## How was this patch tested?

Existing test.

Author: WeichenXu <WeichenXu123@outlook.com>

Closes #14335 from WeichenXu123/improve_LDA.
WeichenXu123 authored and srowen committed Jul 26, 2016
1 parent 3b2b785 commit 4c96955
Showing 1 changed file with 4 additions and 2 deletions.
@@ -28,6 +28,7 @@ import org.apache.spark.graphx._
 import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer
 import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector, Vectors}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
 
 /**
  * :: DeveloperApi ::
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         gammaPart = gammad :: gammaPart
       }
       Iterator((stat, gammaPart))
-    }
+    }.persist(StorageLevel.MEMORY_AND_DISK)
     val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
       _ += _, _ += _)
-    expElogbetaBc.unpersist()
     val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
       stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
+    stats.unpersist()
+    expElogbetaBc.destroy(false)
     val batchResult = statsSum :* expElogbeta.t
 
     // Note that this is an optimization to avoid batch.count
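
As a separate hypothetical illustration of the `unpersist()` → `destroy(false)` change (standard `Broadcast` semantics, not code from this patch): `unpersist()` only drops the executor-side copies and leaves the variable usable, while `destroy()` also releases the driver-side copy, so it may only run once no further job reads the broadcast.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastLifecycleSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("broadcast-lifecycle"))

    val bc = sc.broadcast(Seq(1, 2, 3))
    sc.parallelize(1 to 10).map(_ + bc.value.sum).count()

    // unpersist() removes executor-side copies only; the driver can lazily
    // re-broadcast the value, so bc is still usable in later jobs.
    bc.unpersist()
    sc.parallelize(1 to 10).map(_ + bc.value.sum).count() // still works

    // destroy() also removes the driver-side copy; bc must not be used after
    // this, which is why it belongs after the last job that reads it.
    bc.destroy()

    sc.stop()
  }
}
```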
