Addressing reviewers' comments.
avulanov committed Jul 30, 2015
1 parent 374bea6 commit f69bb3d
Showing 6 changed files with 106 additions and 87 deletions.
14 changes: 5 additions & 9 deletions mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala
@@ -23,8 +23,9 @@ import com.github.fommil.netlib.BLAS.{getInstance => NativeBLAS}
/**
* In-place DGEMM and DGEMV for Breeze
*/
object BreezeUtil {
private[ann] object BreezeUtil {

// TODO: switch to MLlib BLAS interface
private def transposeString(a: BDM[Double]): String = if (a.isTranspose) "T" else "N"

/**
@@ -40,12 +41,9 @@ object BreezeUtil {
require(a.cols == b.rows, "A & B Dimension mismatch!")
require(a.rows == c.rows, "A & C Dimension mismatch!")
require(b.cols == c.cols, "B & C Dimension mismatch!")
if(a.rows == 0 || b.rows == 0 || a.cols == 0 || b.cols == 0) {
} else {
NativeBLAS.dgemm(transposeString(a), transposeString(b), c.rows, c.cols, a.cols,
alpha, a.data, a.offset, a.majorStride, b.data, b.offset, b.majorStride,
beta, c.data, c.offset, c.rows)
}
NativeBLAS.dgemm(transposeString(a), transposeString(b), c.rows, c.cols, a.cols,
alpha, a.data, a.offset, a.majorStride, b.data, b.offset, b.majorStride,
beta, c.data, c.offset, c.rows)
}

/**
@@ -57,9 +55,7 @@ object BreezeUtil {
* @param y y
*/
def dgemv(alpha: Double, a: BDM[Double], x: BDV[Double], beta: Double, y: BDV[Double]): Unit = {

require(a.cols == x.length, "A & x Dimension mismatch!")

NativeBLAS.dgemv(transposeString(a), a.rows, a.cols,
alpha, a.data, a.offset, a.majorStride, x.data, x.offset, x.stride,
beta, y.data, y.offset, y.stride)
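
For orientation, a minimal sketch of how these in-place wrappers are driven (illustrative only: BreezeUtil is now private[ann], so this assumes code living inside org.apache.spark.ml.ann, and the dgemm signature dgemm(alpha, a, b, beta, c) is inferred from the requires and the BLAS call above):

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

// c := alpha * a * b + beta * c, computed in place via netlib dgemm
val a = new BDM[Double](2, 3, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
val b = new BDM[Double](3, 2, Array(1.0, 0.0, 0.0, 1.0, 1.0, 1.0))
val c = BDM.zeros[Double](2, 2)
BreezeUtil.dgemm(1.0, a, b, 0.0, c)

// y := alpha * a * x + beta * y, computed in place via netlib dgemv
val x = BDV(1.0, 1.0, 1.0)
val y = BDV.zeros[Double](2)
BreezeUtil.dgemv(1.0, a, x, 0.0, y)
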
98 changes: 61 additions & 37 deletions mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
@@ -17,9 +17,10 @@

package org.apache.spark.ml.ann

import breeze.linalg.{*, DenseMatrix => BDM, DenseVector => BDV, Vector => BV, axpy => brzAxpy,
sum => Bsum}
import breeze.linalg.{*, DenseMatrix => BDM, DenseVector => BDV, Vector => BV, axpy => Baxpy,
sum => Bsum}
import breeze.numerics.{log => Blog, sigmoid => Bsigmoid}

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.optimization._
import org.apache.spark.rdd.RDD
@@ -177,8 +178,11 @@ private[ann] object AffineLayerModel {
* @param numOut number of layer outputs
* @return matrix A and vector b
*/
def unroll(weights: Vector, position: Int,
numIn: Int, numOut: Int): (BDM[Double], BDV[Double]) = {
def unroll(
weights: Vector,
position: Int,
numIn: Int,
numOut: Int): (BDM[Double], BDV[Double]) = {
val weightsCopy = weights.toArray
// TODO: the array is not copied to BDMs, make sure this is OK!
val a = new BDM[Double](numOut, numIn, weightsCopy, position)
Expand Down Expand Up @@ -272,8 +276,11 @@ private[ann] object ActivationFunction {
}
}

def apply(x1: BDM[Double], x2: BDM[Double], y: BDM[Double],
func: (Double, Double) => Double): Unit = {
def apply(
x1: BDM[Double],
x2: BDM[Double],
y: BDM[Double],
func: (Double, Double) => Double): Unit = {
var i = 0
while (i < x1.rows) {
var j = 0
@@ -284,7 +291,6 @@ private[ann] object ActivationFunction {
i += 1
}
}

}

/**
@@ -320,8 +326,10 @@ private[ann] class SoftmaxFunction extends ActivationFunction {
}
}

override def crossEntropy(output: BDM[Double], target: BDM[Double],
result: BDM[Double]): Double = {
override def crossEntropy(
output: BDM[Double],
target: BDM[Double],
result: BDM[Double]): Double = {
def m(o: Double, t: Double): Double = o - t
ActivationFunction(output, target, result, m)
-Bsum( target :* Blog(output)) / output.cols
@@ -346,11 +354,13 @@ private[ann] class SigmoidFunction extends ActivationFunction {
ActivationFunction(x, y, s)
}

override def crossEntropy(output: BDM[Double], target: BDM[Double],
result: BDM[Double]): Double = {
override def crossEntropy(
output: BDM[Double],
target: BDM[Double],
result: BDM[Double]): Double = {
def m(o: Double, t: Double): Double = o - t
ActivationFunction(output, target, result, m)
-Bsum( target :* Blog(output)) / output.cols
-Bsum(target :* Blog(output)) / output.cols
}

override def derivative(x: BDM[Double], y: BDM[Double]): Unit = {
@@ -384,13 +394,17 @@ private[ann] class FunctionalLayer (val activationFunction: ActivationFunction)
* Functional layer model. Holds no weights.
* @param activationFunction activation function
*/
private[ann] class FunctionalLayerModel private (val activationFunction: ActivationFunction
) extends LayerModel {
private[ann] class FunctionalLayerModel private (val activationFunction: ActivationFunction)
extends LayerModel {
val size = 0

// matrices for in-place computations
// outputs
private var f: BDM[Double] = null
// delta
private var d: BDM[Double] = null
// matrix for error computation
private var e: BDM[Double] = null
// delta gradient
private lazy val dg = new Array[Double](0)

override def eval(data: BDM[Double]): BDM[Double] = {
@@ -487,7 +501,7 @@ private[ann] trait TopologyModel extends Serializable{
* Feed forward ANN
* @param layers
*/
class FeedForwardTopology private(val layers: Array[Layer]) extends Topology {
private[ann] class FeedForwardTopology private(val layers: Array[Layer]) extends Topology {
override def getInstance(weights: Vector): TopologyModel = FeedForwardModel(this, weights)

override def getInstance(seed: Long): TopologyModel = FeedForwardModel(this, seed)
@@ -496,7 +510,7 @@ class FeedForwardTopology private(val layers: Array[Layer]) extends Topology {
/**
* Factory for some of the frequently-used topologies
*/
object FeedForwardTopology {
private[ml] object FeedForwardTopology {
/**
* Creates a feed forward topology from the array of layers
* @param layers array of layers
@@ -534,19 +548,23 @@ object FeedForwardTopology {
* @param layerModels models of layers
* @param topology topology of the network
*/
private[spark] class FeedForwardModel private(val layerModels: Array[LayerModel],
val topology: FeedForwardTopology) extends TopologyModel {
private[ml] class FeedForwardModel private(
val layerModels: Array[LayerModel],
val topology: FeedForwardTopology) extends TopologyModel {
override def forward(data: BDM[Double]): Array[BDM[Double]] = {
val outputs = new Array[BDM[Double]](layerModels.length)
outputs(0) = layerModels(0).eval(data)
for(i <- 1 until layerModels.length){
for (i <- 1 until layerModels.length) {
outputs(i) = layerModels(i).eval(outputs(i-1))
}
outputs
}

override def computeGradient(data: BDM[Double], target: BDM[Double], cumGradient: Vector,
realBatchSize: Int): Double = {
override def computeGradient(
data: BDM[Double],
target: BDM[Double],
cumGradient: Vector,
realBatchSize: Int): Double = {
val outputs = forward(data)
val deltas = new Array[BDM[Double]](layerModels.length)
val L = layerModels.length - 1
@@ -585,12 +603,12 @@ private[spark] class FeedForwardModel private(val layerModels: Array[LayerModel]
override def weights(): Vector = {
// TODO: extract roll
var size = 0
for(i <- 0 until layerModels.length) {
for (i <- 0 until layerModels.length) {
size += layerModels(i).size
}
val array = new Array[Double](size)
var offset = 0
for(i <- 0 until layerModels.length) {
for (i <- 0 until layerModels.length) {
val layerWeights = layerModels(i).weights().toArray
System.arraycopy(layerWeights, 0, array, offset, layerWeights.length)
offset += layerWeights.length
@@ -620,7 +638,7 @@ private[ann] object FeedForwardModel {
val layers = topology.layers
val layerModels = new Array[LayerModel](layers.length)
var offset = 0
for(i <- 0 until layers.length){
for (i <- 0 until layers.length) {
layerModels(i) = layers(i).getInstance(weights, offset)
offset += layerModels(i).size
}
@@ -658,8 +676,11 @@ private[ann] class ANNGradient(topology: Topology, dataStacker: DataStacker) ext
(gradient, loss)
}

override def compute(data: Vector, label: Double, weights: Vector,
cumGradient: Vector): Double = {
override def compute(
data: Vector,
label: Double,
weights: Vector,
cumGradient: Vector): Double = {
val (input, target, realBatchSize) = dataStacker.unstack(data)
val model = topology.getInstance(weights)
model.computeGradient(input, target, cumGradient, realBatchSize)
@@ -684,12 +705,12 @@ private[ann] class DataStacker(stackSize: Int, inputSize: Int, outputSize: Int)
*/
def stack(data: RDD[(Vector, Vector)]): RDD[(Double, Vector)] = {
val stackedData = if (stackSize == 1) {
data.map(v =>
data.map { v =>
(0.0,
Vectors.fromBreeze(BDV.vertcat(
v._1.toBreeze.toDenseVector,
v._2.toBreeze.toDenseVector))
))
) }
} else {
data.mapPartitions { it =>
it.grouped(stackSize).map { seq =>
@@ -728,14 +749,15 @@ private[ann] class DataStacker(stackSize: Int, inputSize: Int, outputSize: Int)
*/
private[ann] class ANNUpdater extends Updater {

override def compute(weightsOld: Vector,
gradient: Vector,
stepSize: Double,
iter: Int,
regParam: Double): (Vector, Double) = {
override def compute(
weightsOld: Vector,
gradient: Vector,
stepSize: Double,
iter: Int,
regParam: Double): (Vector, Double) = {
val thisIterStepSize = stepSize
val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
Baxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
(Vectors.fromBreeze(brzWeights), 0)
}
}
@@ -746,8 +768,10 @@ private[ann] class ANNUpdater extends Updater {
* @param inputSize input size
* @param outputSize output size
*/
private[ml] class FeedForwardTrainer (topology: Topology, val inputSize: Int,
val outputSize: Int) extends Serializable {
private[ml] class FeedForwardTrainer(
topology: Topology,
val inputSize: Int,
val outputSize: Int) extends Serializable {

// TODO: what if we need to pass random seed?
private var _weights = topology.getInstance(11L).weights()
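
Taken together, the pieces in Layer.scala form a small pipeline: build a FeedForwardTopology, obtain a FeedForwardTrainer for it, and optimize the weights. A rough sketch of that flow follows (assumptions: it runs inside org.apache.spark.ml since the trainer is private[ml], and the trainer exposes a train(data) method, mirroring how the classifier in the next file uses these classes):

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// layer sizes: 4 inputs, one hidden layer of 5 neurons, 3 outputs
val layers = Array(4, 5, 3)
// the second argument mirrors the call in the classifier's train() below
val topology = FeedForwardTopology.multiLayerPerceptron(layers, true)

// data holds (features, one-hot target) pairs, e.g. produced by LabelConverter
val data: RDD[(Vector, Vector)] = ???  // placeholder: supply a real RDD here

val trainer = new FeedForwardTrainer(topology, layers(0), layers.last)
trainer.LBFGSOptimizer.setConvergenceTol(1e-4).setNumIterations(100)
val model = trainer.train(data)  // assumed API; yields a trained topology model
val output = model.predict(Vectors.dense(0.1, 0.2, 0.3, 0.4))  // raw output vector
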
@@ -29,7 +29,7 @@ import org.apache.spark.sql.DataFrame

/** Params for Multilayer Perceptron. */
private[ml] trait MultilayerPerceptronParams extends PredictorParams
with HasSeed with HasMaxIter with HasTol {
with HasSeed with HasMaxIter with HasTol {
/**
* Layer sizes including input size and output size.
* @group param
@@ -39,7 +39,7 @@ with HasSeed with HasMaxIter with HasTol {
" E.g., Array(780, 100, 10) means 780 inputs, " +
"one hidden layer with 100 neurons and output layer of 10 neurons.",
// TODO: how to check ALSO that all elements are greater than 0?
ParamValidators.lengthGt(1)
ParamValidators.arrayLengthGt(1)
)

/** @group setParam */
@@ -94,12 +94,12 @@ private object LabelConverter {
* Returns a vector of given length with zeroes at all positions
* and value 1.0 at the position that corresponds to the label.
*
* @param labeledPoint labeled point
* @param labeledPoint labeled point
* @param labelCount total number of labels
* @return vector encoding of a label
* @return pair of features and vector encoding of a label
*/
def apply(labeledPoint: LabeledPoint, labelCount: Int): (Vector, Vector) = {
val output = Array.fill(labelCount){0.0}
def encodeLabeledPoint(labeledPoint: LabeledPoint, labelCount: Int): (Vector, Vector) = {
val output = Array.fill(labelCount)(0.0)
output(labeledPoint.label.toInt) = 1.0
(labeledPoint.features, Vectors.dense(output))
}
@@ -108,10 +108,10 @@ private object LabelConverter {
* Converts a vector to a label.
* Returns the position of the maximal element of a vector.
*
* @param output label encoded with a vector
* @return label
* @param output label encoded with a vector
* @return label
*/
def apply(output: Vector): Double = {
def decodeLabel(output: Vector): Double = {
output.argmax.toDouble
}
}
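
A concrete example of the renamed converters (hypothetical values; LabelConverter is private, so the calls are assumed to live alongside it):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// label 2.0 out of 4 classes becomes the one-hot vector [0.0, 0.0, 1.0, 0.0]
val lp = LabeledPoint(2.0, Vectors.dense(0.5, -1.2))
val (features, target) = LabelConverter.encodeLabeledPoint(lp, 4)

// decoding picks the index of the largest element: here 1.0
val label = LabelConverter.decodeLabel(Vectors.dense(0.1, 0.7, 0.2))
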
@@ -138,14 +138,14 @@ class MultilayerPerceptronClassifier(override val uid: String)
* Developers can implement this instead of [[fit()]] to avoid dealing with schema validation
* and copying parameters into the model.
*
* @param dataset Training dataset
* @return Fitted model
* @param dataset Training dataset
* @return Fitted model
*/
override protected def train(dataset: DataFrame): MultilayerPerceptronClassifierModel = {
val labels = getLayers.last.toInt
val myLayers = $(layers)
val labels = myLayers.last
val lpData = extractLabeledPoints(dataset)
val data = lpData.map(lp => LabelConverter(lp, labels))
val myLayers = getLayers.map(_.toInt)
val data = lpData.map(lp => LabelConverter.encodeLabeledPoint(lp, labels))
val topology = FeedForwardTopology.multiLayerPerceptron(myLayers, true)
val FeedForwardTrainer = new FeedForwardTrainer(topology, myLayers(0), myLayers.last)
FeedForwardTrainer.LBFGSOptimizer.setConvergenceTol(getTol).setNumIterations(getMaxIter)
@@ -179,7 +179,7 @@ class MultilayerPerceptronClassifierModel private[ml](
* This internal method is used to implement [[transform()]] and output [[predictionCol]].
*/
override protected def predict(features: Vector): Double = {
LabelConverter(mlpModel.predict(features))
LabelConverter.decodeLabel(mlpModel.predict(features))
}

override def copy(extra: ParamMap): MultilayerPerceptronClassifierModel = {
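
End to end, the new estimator would be used roughly like this (a sketch, assuming the usual setLayers/setMaxIter/setTol setters generated for the params declared above and a DataFrame with label and features columns):

import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.sql.DataFrame

val training: DataFrame = ???  // placeholder: "label" (Double) and "features" (Vector) columns

// 780 inputs, one hidden layer with 100 neurons, 10 output classes (the layers param's own example)
val mlp = new MultilayerPerceptronClassifier()
  .setLayers(Array(780, 100, 10))
  .setMaxIter(100)
  .setTol(1e-4)

val model = mlp.fit(training)
val predictions = model.transform(training)  // adds the prediction column
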
20 changes: 9 additions & 11 deletions mllib/src/main/scala/org/apache/spark/ml/param/params.scala
@@ -167,18 +167,16 @@ object ParamValidators {
allowed.contains(value)
}

/** Private method for checking array types and converting to Array. */
private def getArray[T](value: T): Array[_] = value match {
case x: Array[_] => x
case _ =>
// The type should be checked before this is ever called.
throw new IllegalArgumentException("Array Param validation failed because" +
s" of unexpected input type: ${value.getClass}")
}

/** Check that the array length is greater than lowerBound. */
def lengthGt[T](lowerBound: Double): T => Boolean = { (value: T) =>
getArray(value).length > lowerBound
def arrayLengthGt[T](lowerBound: Double): T => Boolean = { (value: T) =>
val array: Array[_] = value match {
case x: Array[_] => x
case _ =>
// The type should be checked before this is ever called.
throw new IllegalArgumentException("Array Param validation failed because" +
s" of unexpected input type: ${value.getClass}")
}
array.length > lowerBound
}
}

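
For illustration, the renamed validator can be exercised directly, which is how the layers param in MultilayerPerceptronParams above uses it (values here are arbitrary):

val validate = ParamValidators.arrayLengthGt[Array[Int]](1)
val ok = validate(Array(780, 100, 10))  // true: three layer sizes
val tooShort = validate(Array(10))      // false: a single layer size is rejected
// non-array input throws IllegalArgumentException, per the match above
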
