Merge conflict
Ilya Ganelin committed Jul 9, 2015
2 parents 5e5d5e7 + 74d8d3d commit 8f6e327
Showing 46 changed files with 6,579 additions and 1,229 deletions.
2 changes: 2 additions & 0 deletions .rat-excludes
@@ -91,3 +91,5 @@ help/*
html/*
INDEX
.lintr
gen-java.*
.*avpr
4 changes: 2 additions & 2 deletions ec2/spark_ec2.py
@@ -1153,8 +1153,8 @@ def ssh(host, opts, command):
# If this was an ssh failure, provide the user with hints.
if e.returncode == 255:
raise UsageError(
"Failed to SSH to remote host {0}.\n" +
"Please check that you have provided the correct --identity-file and " +
"Failed to SSH to remote host {0}.\n"
"Please check that you have provided the correct --identity-file and "
"--key-pair parameters and try again.".format(host))
else:
raise e
mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ml.classification

import scala.collection.mutable

import breeze.linalg.{DenseVector => BDV, norm => brzNorm}
import breeze.linalg.{DenseVector => BDV}
import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS, OWLQN => BreezeOWLQN}

import org.apache.spark.{Logging, SparkException}
@@ -41,7 +41,7 @@ import org.apache.spark.storage.StorageLevel
*/
private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams
with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol
with HasThreshold
with HasThreshold with HasStandardization

/**
* :: Experimental ::
@@ -98,6 +98,18 @@ class LogisticRegression(override val uid: String)
def setFitIntercept(value: Boolean): this.type = set(fitIntercept, value)
setDefault(fitIntercept -> true)

/**
* Whether to standardize the training features before fitting the model.
* The coefficients of models are always returned on the original scale,
* so standardization is transparent to users. Note that with no regularization,
* the model converges to the same solution with or without standardization.
* Default is true.
* @group setParam
*/
def setStandardization(value: Boolean): this.type = set(standardization, value)
setDefault(standardization -> true)

/** @group setParam */
def setThreshold(value: Double): this.type = set(threshold, value)
setDefault(threshold -> 0.5)
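
As context for the new parameter, a minimal usage sketch; it assumes the spark.ml DataFrame API on this branch, and the `training` DataFrame with "label" and "features" columns is hypothetical:

import org.apache.spark.ml.classification.LogisticRegression

// `training` is a hypothetical DataFrame with "label" and "features" columns.
val lr = new LogisticRegression()
  .setMaxIter(100)
  .setRegParam(0.1)
  .setElasticNetParam(1.0)    // pure L1 penalty
  .setStandardization(false)  // added in this change; defaults to true
  .setFitIntercept(true)
// Coefficients are reported on the original feature scale either way.
val model = lr.fit(training)
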
@@ -149,15 +161,28 @@ class LogisticRegression(override val uid: String)
val regParamL1 = $(elasticNetParam) * $(regParam)
val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)

val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept),
val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept), $(standardization),
featuresStd, featuresMean, regParamL2)

val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) {
new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
} else {
// Remove the L1 penalization on the intercept
def regParamL1Fun = (index: Int) => {
if (index == numFeatures) 0.0 else regParamL1
// Remove the L1 penalization on the intercept
if (index == numFeatures) {
0.0
} else {
if ($(standardization)) {
regParamL1
} else {
// If `standardization` is false, we still standardize the data
// to improve the rate of convergence; as a result, we have to
// perform this reverse standardization by penalizing each component
// differently to get effectively the same objective function when
// the training dataset is not standardized.
if (featuresStd(index) != 0.0) regParamL1 / featuresStd(index) else 0.0
}
}
}
new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
}
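
The per-component L1 scaling above comes from a change of variables. As a sketch, with sigma_j = featuresStd(j), the coefficients w'_j fit on standardized features relate to the original-scale coefficients w_j by w'_j = sigma_j * w_j, so the intended original-scale penalty becomes

    \lambda_1 \sum_j |w_j| = \lambda_1 \sum_j \frac{|w'_j|}{\sigma_j}

which is why, when standardization is off, component j is penalized by regParamL1 / featuresStd(j) (and left unpenalized when the feature is constant).
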
@@ -523,11 +548,13 @@ private class LogisticCostFun(
data: RDD[(Double, Vector)],
numClasses: Int,
fitIntercept: Boolean,
standardization: Boolean,
featuresStd: Array[Double],
featuresMean: Array[Double],
regParamL2: Double) extends DiffFunction[BDV[Double]] {

override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = {
val numFeatures = featuresStd.length
val w = Vectors.fromBreeze(weights)

val logisticAggregator = data.treeAggregate(new LogisticAggregator(w, numClasses, fitIntercept,
@@ -539,27 +566,43 @@ private class LogisticCostFun(
case (aggregator1, aggregator2) => aggregator1.merge(aggregator2)
})

// regVal is the sum of weight squares for L2 regularization
val norm = if (regParamL2 == 0.0) {
0.0
} else if (fitIntercept) {
brzNorm(Vectors.dense(weights.toArray.slice(0, weights.size -1)).toBreeze, 2.0)
} else {
brzNorm(weights, 2.0)
}
val regVal = 0.5 * regParamL2 * norm * norm
val totalGradientArray = logisticAggregator.gradient.toArray

val loss = logisticAggregator.loss + regVal
val gradient = logisticAggregator.gradient

if (fitIntercept) {
val wArray = w.toArray.clone()
wArray(wArray.length - 1) = 0.0
axpy(regParamL2, Vectors.dense(wArray), gradient)
// regVal is the sum of weight squares excluding intercept for L2 regularization.
val regVal = if (regParamL2 == 0.0) {
0.0
} else {
axpy(regParamL2, w, gradient)
var sum = 0.0
w.foreachActive { (index, value) =>
// If `fitIntercept` is true, the last term which is intercept doesn't
// contribute to the regularization.
if (index != numFeatures) {
// The following code will compute the loss of the regularization; also
// the gradient of the regularization, and add back to totalGradientArray.
sum += {
if (standardization) {
totalGradientArray(index) += regParamL2 * value
value * value
} else {
if (featuresStd(index) != 0.0) {
// If `standardization` is false, we still standardize the data
// to improve the rate of convergence; as a result, we have to
// perform this reverse standardization by penalizing each component
// differently to get effectively the same objective function when
// the training dataset is not standardized.
val temp = value / (featuresStd(index) * featuresStd(index))
totalGradientArray(index) += regParamL2 * temp
value * temp
} else {
0.0
}
}
}
}
}
0.5 * regParamL2 * sum
}

(loss, gradient.toBreeze.asInstanceOf[BDV[Double]])
(logisticAggregator.loss + regVal, new BDV(totalGradientArray))
}
}
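
The L2 branch of LogisticCostFun follows the same reasoning as the L1 case. As a sketch, with w'_j = sigma_j * w_j, the original-scale ridge term and its gradient in the standardized space are

    \frac{\lambda_2}{2} \sum_j w_j^2 = \frac{\lambda_2}{2} \sum_j \frac{(w'_j)^2}{\sigma_j^2},
    \qquad \frac{\partial}{\partial w'_j} = \lambda_2 \frac{w'_j}{\sigma_j^2}

which matches the code above: temp = value / featuresStd(index)^2, the gradient picks up regParamL2 * temp, and regVal accumulates 0.5 * regParamL2 * value * temp.
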
mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
@@ -54,8 +54,7 @@ private[shared] object SharedParamsCodeGen {
isValid = "ParamValidators.gtEq(1)"),
ParamDesc[Boolean]("fitIntercept", "whether to fit an intercept term", Some("true")),
ParamDesc[Boolean]("standardization", "whether to standardize the training features" +
" prior to fitting the model sequence. Note that the coefficients of models are" +
" always returned on the original scale.", Some("true")),
" before fitting the model.", Some("true")),
ParamDesc[Long]("seed", "random seed", Some("this.getClass.getName.hashCode.toLong")),
ParamDesc[Double]("elasticNetParam", "the ElasticNet mixing parameter, in range [0, 1]." +
" For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.",
mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
@@ -239,10 +239,10 @@ private[ml] trait HasFitIntercept extends Params {
private[ml] trait HasStandardization extends Params {

/**
* Param for whether to standardize the training features prior to fitting the model sequence. Note that the coefficients of models are always returned on the original scale..
* Param for whether to standardize the training features before fitting the model..
* @group param
*/
final val standardization: BooleanParam = new BooleanParam(this, "standardization", "whether to standardize the training features prior to fitting the model sequence. Note that the coefficients of models are always returned on the original scale.")
final val standardization: BooleanParam = new BooleanParam(this, "standardization", "whether to standardize the training features before fitting the model.")

setDefault(standardization, true)

mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
@@ -140,6 +140,10 @@ class GaussianMixture private (
// Get length of the input vectors
val d = breezeData.first().length

// Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when
// d > 25 except for when k is very small
val distributeGaussians = ((k - 1.0) / k) * d > 25

// Determine initial weights and corresponding Gaussians.
// If the user supplied an initial GMM, we use those values, otherwise
// we start with uniform weights, a random mean from the data, and
@@ -171,14 +175,25 @@ class GaussianMixture private (
// Create new distributions based on the partial assignments
// (often referred to as the "M" step in literature)
val sumWeights = sums.weights.sum
var i = 0
while (i < k) {
val mu = sums.means(i) / sums.weights(i)
BLAS.syr(-sums.weights(i), Vectors.fromBreeze(mu),
Matrices.fromBreeze(sums.sigmas(i)).asInstanceOf[DenseMatrix])
weights(i) = sums.weights(i) / sumWeights
gaussians(i) = new MultivariateGaussian(mu, sums.sigmas(i) / sums.weights(i))
i = i + 1

if (distributeGaussians) {
val numPartitions = math.min(k, 1024)
val tuples =
Seq.tabulate(k)(i => (sums.means(i), sums.sigmas(i), sums.weights(i)))
val (ws, gs) = sc.parallelize(tuples, numPartitions).map { case (mean, sigma, weight) =>
updateWeightsAndGaussians(mean, sigma, weight, sumWeights)
}.collect.unzip
Array.copy(ws, 0, weights, 0, ws.length)
Array.copy(gs, 0, gaussians, 0, gs.length)
} else {
var i = 0
while (i < k) {
val (weight, gaussian) =
updateWeightsAndGaussians(sums.means(i), sums.sigmas(i), sums.weights(i), sumWeights)
weights(i) = weight
gaussians(i) = gaussian
i = i + 1
}
}

llhp = llh // current becomes previous
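
To make the d > 25 heuristic concrete, a small check with hypothetical (k, d) values, not taken from the codebase:

// distributeGaussians = ((k - 1.0) / k) * d > 25
Seq((2, 40), (2, 60), (10, 30), (100, 26)).foreach { case (k, d) =>
  val distribute = ((k - 1.0) / k) * d > 25
  println(s"k=$k, d=$d -> distributeGaussians=$distribute")
}
// k = 2 only distributes when d > 50; as k grows the cutoff approaches d > 25.
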
@@ -192,6 +207,19 @@ class GaussianMixture private (
/** Java-friendly version of [[run()]] */
def run(data: JavaRDD[Vector]): GaussianMixtureModel = run(data.rdd)

private def updateWeightsAndGaussians(
mean: BDV[Double],
sigma: BreezeMatrix[Double],
weight: Double,
sumWeights: Double): (Double, MultivariateGaussian) = {
val mu = (mean /= weight)
BLAS.syr(-weight, Vectors.fromBreeze(mu),
Matrices.fromBreeze(sigma).asInstanceOf[DenseMatrix])
val newWeight = weight / sumWeights
val newGaussian = new MultivariateGaussian(mu, sigma / weight)
(newWeight, newGaussian)
}
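
For reference, updateWeightsAndGaussians appears to implement the standard EM M-step. Writing gamma_{ij} for the responsibilities, sums.weights(i) = sum_j gamma_{ij}, sums.means(i) = sum_j gamma_{ij} x_j, and sums.sigmas(i) = sum_j gamma_{ij} x_j x_j^T, the helper computes (usual notation, a sketch):

    \pi_i = \frac{\sum_j \gamma_{ij}}{\sum_{i'} \sum_j \gamma_{i'j}}, \qquad
    \mu_i = \frac{\sum_j \gamma_{ij} x_j}{\sum_j \gamma_{ij}}, \qquad
    \Sigma_i = \frac{\sum_j \gamma_{ij} x_j x_j^\top}{\sum_j \gamma_{ij}} - \mu_i \mu_i^\top

The BLAS.syr call subtracts weight * mu * mu^T from the accumulated outer products before the division by weight, which yields exactly the covariance above.
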

/** Average of dense breeze vectors */
private def vectorMean(x: IndexedSeq[BV[Double]]): BDV[Double] = {
val v = BDV.zeros[Double](x(0).length)
mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
@@ -33,7 +33,7 @@ import org.apache.spark.rdd.RDD
* association rules which have a single item as the consequent.
*/
@Experimental
class AssociationRules private (
class AssociationRules private[fpm] (
private var minConfidence: Double) extends Logging with Serializable {

/**
@@ -45,6 +45,7 @@ class AssociationRules private (
* Sets the minimal confidence (default: `0.8`).
*/
def setMinConfidence(minConfidence: Double): this.type = {
require(minConfidence >= 0.0 && minConfidence <= 1.0)
this.minConfidence = minConfidence
this
}
@@ -91,7 +92,7 @@ object AssociationRules {
* @tparam Item item type
*/
@Experimental
class Rule[Item] private[mllib] (
class Rule[Item] private[fpm] (
val antecedent: Array[Item],
val consequent: Array[Item],
freqUnion: Double,
11 changes: 10 additions & 1 deletion mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
@@ -40,7 +40,16 @@ import org.apache.spark.storage.StorageLevel
* @tparam Item item type
*/
@Experimental
class FPGrowthModel[Item: ClassTag](val freqItemsets: RDD[FreqItemset[Item]]) extends Serializable
class FPGrowthModel[Item: ClassTag](val freqItemsets: RDD[FreqItemset[Item]]) extends Serializable {
/**
* Generates association rules for the [[Item]]s in [[freqItemsets]].
* @param confidence minimal confidence of the rules produced
*/
def generateAssociationRules(confidence: Double): RDD[AssociationRules.Rule[Item]] = {
val associationRules = new AssociationRules(confidence)
associationRules.run(freqItemsets)
}
}
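
A short end-to-end sketch of the new method; the transaction data and the SparkContext sc are hypothetical, and the minSupport/numPartitions/confidence values are illustrative only:

import org.apache.spark.mllib.fpm.FPGrowth

// Hypothetical transactions; sc is an existing SparkContext.
val transactions = sc.parallelize(Seq(
  Array("a", "b", "c"),
  Array("a", "b"),
  Array("a", "c"),
  Array("b", "c")))

val model = new FPGrowth()
  .setMinSupport(0.4)
  .setNumPartitions(2)
  .run(transactions)

// New in this change: derive rules with a single-item consequent.
model.generateAssociationRules(0.8).collect().foreach { rule =>
  println(rule.antecedent.mkString("[", ",", "]") + " => " +
    rule.consequent.mkString("[", ",", "]") + ", " + rule.confidence)
}
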

/**
* :: Experimental ::
mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -24,9 +24,9 @@ import scala.collection.mutable.{ArrayBuilder => MArrayBuilder, HashSet => MHash
import breeze.linalg.{CSCMatrix => BSM, DenseMatrix => BDM, Matrix => BM}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._

/**
* Trait for a local matrix.
@@ -147,7 +147,7 @@ private[spark] class MatrixUDT extends UserDefinedType[Matrix] {
))
}

override def serialize(obj: Any): Row = {
override def serialize(obj: Any): InternalRow = {
val row = new GenericMutableRow(7)
obj match {
case sm: SparseMatrix =>
@@ -173,9 +173,7 @@ private[spark] class MatrixUDT extends UserDefinedType[Matrix] {

override def deserialize(datum: Any): Matrix = {
datum match {
// TODO: something wrong with UDT serialization, should never happen.
case m: Matrix => m
case row: Row =>
case row: InternalRow =>
require(row.length == 7,
s"MatrixUDT.deserialize given row with length ${row.length} but requires length == 7")
val tpe = row.getByte(0)
16 changes: 3 additions & 13 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -28,7 +28,7 @@ import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
import org.apache.spark.SparkException
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mllib.util.NumericParser
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.types._

@@ -175,7 +175,7 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
StructField("values", ArrayType(DoubleType, containsNull = false), nullable = true)))
}

override def serialize(obj: Any): Row = {
override def serialize(obj: Any): InternalRow = {
obj match {
case SparseVector(size, indices, values) =>
val row = new GenericMutableRow(4)
Expand All @@ -191,17 +191,12 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
row.setNullAt(2)
row.update(3, values.toSeq)
row
// TODO: There are bugs in UDT serialization because we don't have a clear separation between
// TODO: internal SQL types and language specific types (including UDT). UDT serialize and
// TODO: deserialize may get called twice. See SPARK-7186.
case row: Row =>
row
}
}

override def deserialize(datum: Any): Vector = {
datum match {
case row: Row =>
case row: InternalRow =>
require(row.length == 4,
s"VectorUDT.deserialize given row with length ${row.length} but requires length == 4")
val tpe = row.getByte(0)
@@ -215,11 +210,6 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
val values = row.getAs[Iterable[Double]](3).toArray
new DenseVector(values)
}
// TODO: There are bugs in UDT serialization because we don't have a clear separation between
// TODO: internal SQL types and language specific types (including UDT). UDT serialize and
// TODO: deserialize may get called twice. See SPARK-7186.
case v: Vector =>
v
}
}
