Commit
Renamed SolverCostModel trait to CostModel and added PCA optimizer unit tests
tomerk committed Mar 21, 2016
1 parent 91cf4ba commit b9a1013
Showing 8 changed files with 66 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/main/scala/nodes/learning/BlockLinearMapper.scala
@@ -199,7 +199,7 @@ object BlockLeastSquaresEstimator {
 class BlockLeastSquaresEstimator(blockSize: Int, numIter: Int, lambda: Double = 0.0, numFeaturesOpt: Option[Int] = None)
   extends LabelEstimator[DenseVector[Double], DenseVector[Double], DenseVector[Double]]
   with WeightedNode
-  with SolverCostModel {
+  with CostModel {
 
   override val weight = (3*numIter)+1
 
@@ -3,7 +3,7 @@ package nodes.learning
 /**
  * A trait that represents a known system performance cost model for a solver.
  */
-trait SolverCostModel {
+trait CostModel {
   def cost(
     n: Long,
     d: Int,
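The full cost() signature is cut off by the collapsed hunk above, so the parameters beyond n and d (and the return type) are not visible here. As a rough illustration of what implementing the renamed trait involves, here is a minimal sketch; the simplified two-argument signature and the flop-count formula are assumptions for this example, not the repository's actual API:

// Sketch only: the real CostModel.cost takes more parameters than shown
// in the truncated hunk above; this two-argument form is an assumption.
trait SimplifiedCostModel {
  def cost(n: Long, d: Int): Double
}

// Hypothetical cost model for a normal-equations style solver: roughly
// n * d^2 flops to form X^T X plus d^3 flops to factor it.
object NormalEquationsCost extends SimplifiedCostModel {
  def cost(n: Long, d: Int): Double = n.toDouble * d * d + math.pow(d.toDouble, 3)
}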
2 changes: 1 addition & 1 deletion src/main/scala/nodes/learning/DistributedPCA.scala
@@ -18,7 +18,7 @@ import edu.berkeley.cs.amplab.mlmatrix.{RowPartition, NormalEquations, RowPartit
  * @param dims Dimensions to reduce input dataset to.
  */
 class DistributedPCAEstimator(dims: Int) extends Estimator[DenseVector[Float], DenseVector[Float]]
-  with SolverCostModel with Logging {
+  with CostModel with Logging {
 
   /**
    * Adapted from the "PCA2" matlab code given in appendix B of this paper:
4 changes: 2 additions & 2 deletions src/main/scala/nodes/learning/LBFGS.scala
@@ -139,7 +139,7 @@ class DenseLBFGSwithL2[T <: Vector[Double]](
     val convergenceTol: Double = 1e-4,
     val numIterations: Int = 100,
     val regParam: Double = 0.0)
-  extends LabelEstimator[T, DenseVector[Double], DenseVector[Double]] with WeightedNode with SolverCostModel {
+  extends LabelEstimator[T, DenseVector[Double], DenseVector[Double]] with WeightedNode with CostModel {
 
   override val weight: Int = numIterations + 1
 
@@ -215,7 +215,7 @@ class SparseLBFGSwithL2(
     val sparseOverhead: Double = 8)
   extends LabelEstimator[SparseVector[Double], DenseVector[Double], DenseVector[Double]]
   with WeightedNode
-  with SolverCostModel {
+  with CostModel {
 
   override val weight: Int = numIterations + 1
 
2 changes: 1 addition & 1 deletion src/main/scala/nodes/learning/LeastSquaresEstimator.scala
@@ -33,7 +33,7 @@ class LeastSquaresEstimator[T <: Vector[Double]: ClassTag](
   with WeightedNode
   with Logging {
 
-  val options: Seq[(SolverCostModel, (RDD[T], RDD[DenseVector[Double]]) => Pipeline[T, DenseVector[Double]])] = Seq(
+  val options: Seq[(CostModel, (RDD[T], RDD[DenseVector[Double]]) => Pipeline[T, DenseVector[Double]])] = Seq(
     {
       val solver = new DenseLBFGSwithL2[T](new LeastSquaresDenseGradient, regParam = lambda, numIterations = 20)
       (solver, solver.withData(_, _))
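Each entry of options pairs a candidate solver (exposing its CostModel) with a function that builds the corresponding pipeline from the data. A hedged sketch of the selection pattern this enables, reusing the simplified cost signature assumed earlier; the actual LeastSquaresEstimator optimizer may weigh additional factors such as cluster size and sparsity:

// Illustrative only: pick the option whose cost model predicts the
// smallest cost for the observed data shape, then build that pipeline.
def pickCheapest[P](options: Seq[(SimplifiedCostModel, () => P)], n: Long, d: Int): P = {
  val (_, buildPipeline) = options.minBy { case (model, _) => model.cost(n, d) }
  buildPipeline()
}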
2 changes: 1 addition & 1 deletion src/main/scala/nodes/learning/LinearMapper.scala
@@ -67,7 +67,7 @@ case class LinearMapper[T <: Vector[Double]](
  * @param lambda L2 Regularization parameter
  */
 class LinearMapEstimator(lambda: Option[Double] = None)
-  extends LabelEstimator[DenseVector[Double], DenseVector[Double], DenseVector[Double]] with SolverCostModel {
+  extends LabelEstimator[DenseVector[Double], DenseVector[Double], DenseVector[Double]] with CostModel {
 
   /**
    * Learns a linear model (OLS) based on training features and training labels.
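The solution the doc comment refers to is the standard (optionally L2-regularized) normal-equations estimate w = (X^T X + lambda*I)^{-1} X^T y. A minimal local Breeze sketch for reference; the estimator itself solves this over RDDs, so this is not its implementation:

import breeze.linalg.{DenseMatrix, DenseVector, inv}

// Dense, single-machine normal-equations solve, for illustration only.
// lambda = 0.0 recovers ordinary least squares (OLS).
def ridgeSolve(x: DenseMatrix[Double], y: DenseVector[Double], lambda: Double = 0.0): DenseVector[Double] =
  inv(x.t * x + DenseMatrix.eye[Double](x.cols) * lambda) * (x.t * y)

In practice a linear solve (for example Breeze's \ operator) is preferred over forming the explicit inverse, for numerical stability.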
6 changes: 3 additions & 3 deletions src/main/scala/nodes/learning/PCA.scala
@@ -49,7 +49,7 @@ case class BatchPCATransformer(pcaMat: DenseMatrix[Float]) extends Transformer[D
  * @param dims Dimensions to reduce input dataset to.
  */
 case class LocalColumnPCAEstimator(dims: Int) extends Estimator[DenseMatrix[Float], DenseMatrix[Float]]
-  with SolverCostModel {
+  with CostModel {
 
   val pcaEstimator = new PCAEstimator(dims)
 
@@ -79,7 +79,7 @@ case class LocalColumnPCAEstimator(dims: Int) extends Estimator[DenseMatrix[Floa
  * @param dims Dimensions to reduce input dataset to.
  */
 case class DistributedColumnPCAEstimator(dims: Int) extends Estimator[DenseMatrix[Float], DenseMatrix[Float]]
-  with SolverCostModel {
+  with CostModel {
 
   val pcaEstimator = new DistributedPCAEstimator(dims)
 
@@ -161,7 +161,7 @@ class ColumnPCAEstimator(
  * @param dims Dimensions to reduce input dataset to.
  */
 class PCAEstimator(dims: Int) extends Estimator[DenseVector[Float], DenseVector[Float]]
-  with SolverCostModel with Logging {
+  with CostModel with Logging {
 
   /**
    * Adapted from the "PCA2" matlab code given in appendix B of this paper:
57 changes: 56 additions & 1 deletion src/test/scala/nodes/learning/PCASuite.scala
@@ -8,6 +8,7 @@ import org.apache.spark.SparkContext
 import org.scalatest.FunSuite
 import pipelines._
 import utils.{TestUtils, Stats, MatrixUtils}
+import workflow.WorkflowUtils
 
 class PCASuite extends FunSuite with LocalSparkContext with Logging {
 
@@ -111,7 +112,8 @@ class PCASuite extends FunSuite with LocalSparkContext with Logging {
    * B is Gaussian(0,1) \in R^{k \times d}
    * E is Gaussian(0,eps) in R^{n \times d}
    *
-   * @param n Number of rows.
+   *
+   * @param n Number of rows.
    * @param d Number of columns.
    * @param k Rank of factors.
    * @param eps Variance of the Gaussian noise.
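The generator itself is collapsed in this hunk, and the line defining the left factor is cut off above, so the C factor in the following sketch is an assumption. A local Breeze illustration of the noisy low-rank model the comment describes:

import breeze.linalg.DenseMatrix
import breeze.stats.distributions.Gaussian

// A = C * B + E per the doc comment, with B ~ Gaussian(0,1) in R^{k x d}
// and E ~ Gaussian(0,eps) in R^{n x d}. The n x k factor C is assumed,
// since its definition is collapsed above. Breeze's Gaussian takes a
// standard deviation, hence sqrt(eps) for noise variance eps.
def lowRankWithNoise(n: Int, d: Int, k: Int, eps: Double): DenseMatrix[Double] = {
  val c = DenseMatrix.rand(n, k, Gaussian(0.0, 1.0))
  val b = DenseMatrix.rand(k, d, Gaussian(0.0, 1.0))
  val e = DenseMatrix.rand(n, d, Gaussian(0.0, math.sqrt(eps)))
  c * b + e
}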
@@ -223,4 +225,57 @@

     assert(Stats.aboutEq(offDiagCadm, DenseMatrix.zeros[Double](cadm.rows, cadm.rows), 0.1))
   }
+
+
+  test("small n small d dense column pca") {
+    sc = new SparkContext("local", "test")
+
+    val n = 1000
+    val numColsPerMatrix = 10
+    val d = 1000
+    val k = 100
+    val numMachines = 16
+    val numParts = numMachines
+
+    val data = sc.parallelize(
+      Seq.fill(numParts)(convert(DenseMatrix.rand[Double](d, numColsPerMatrix), Float)), numParts
+    )
+    val numPerPartition = WorkflowUtils.numPerPartition(data).mapValues(x => n / (numColsPerMatrix * numParts))
+
+    val solver = new ColumnPCAEstimator(dims = k, numMachines = Some(numMachines))
+    val optimizedSolver = solver.optimize(data, numPerPartition).apply(data)
+
+    val instructions = WorkflowUtils.pipelineToInstructions(optimizedSolver)
+    val isLocalColumnPCAEstimator = instructions.exists {
+      case _: LocalColumnPCAEstimator => true
+      case _ => false
+    }
+    assert(isLocalColumnPCAEstimator, "Expected local pca estimator")
+  }
+
+  test("big n big d dense column pca") {
+    sc = new SparkContext("local", "test")
+
+    val n = 100000
+    val numColsPerMatrix = 10
+    val d = 10000
+    val k = 100
+    val numMachines = 16
+    val numParts = numMachines
+
+    val data = sc.parallelize(
+      Seq.fill(numParts)(convert(DenseMatrix.rand[Double](d, numColsPerMatrix), Float)), numParts
+    )
+    val numPerPartition = WorkflowUtils.numPerPartition(data).mapValues(x => n / (numColsPerMatrix * numParts))
+
+    val solver = new ColumnPCAEstimator(dims = k, numMachines = Some(numMachines))
+    val optimizedSolver = solver.optimize(data, numPerPartition).apply(data)
+
+    val instructions = WorkflowUtils.pipelineToInstructions(optimizedSolver)
+    val isDistributedColumnPCAEstimator = instructions.exists {
+      case _: DistributedColumnPCAEstimator => true
+      case _ => false
+    }
+    assert(isDistributedColumnPCAEstimator, "Expected distributed pca estimator")
+  }
 }
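Both tests pin down the same decision: optimize() is expected to consult the candidate estimators' cost models for the observed data shape, choosing the local variant for small n and d and the distributed one when both are large. A hedged sketch of that dispatch, again using the simplified cost signature assumed earlier rather than the repository's actual optimizer:

// Illustrative only: compare predicted costs for the observed shape.
def useLocalPca(n: Long, d: Int, local: SimplifiedCostModel, distributed: SimplifiedCostModel): Boolean =
  local.cost(n, d) <= distributed.cost(n, d)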
