SPARK-2372 [MLLIB] Grouped Optimization/Learning #1292

Closed. Wants to merge 22 commits.

Commits (22)
664196a
Adding files to do grouped optimization (Gradient Descent right now)
kellrott Jul 1, 2014
f99c8ab
Adding GroupedGeneralizedLinearAlgorithm class
kellrott Jul 1, 2014
02a192a
Working GroupedSVM and unit tests
kellrott Jul 2, 2014
1d52163
Adding apache license to newly created files.
kellrott Jul 3, 2014
a9d3ea6
Fixing formatting issues.
kellrott Jul 3, 2014
45485e5
Changing group optimization/learning to use generics for key value.
kellrott Jul 7, 2014
0b6fa80
Fixing formatting issues
kellrott Jul 7, 2014
4712bc9
Broadcasting weightset to decrease network lag
kellrott Jul 12, 2014
36b9257
Merge branch 'master' into mllib-grouped
kellrott Jul 12, 2014
b6481d9
Broadcasting gradient function
kellrott Jul 12, 2014
22ae0b2
Fixing some formatting
kellrott Jul 12, 2014
ac423f8
Broadcasting the updater.
kellrott Jul 19, 2014
f22c40c
Cleaning closures to improve performance
kellrott Jul 21, 2014
c0683f2
Merge branch 'master' into mllib-grouped
kellrott Jul 22, 2014
4f87127
Adding grouped logistic regression and fixing how the unit tests gene…
kellrott Jul 24, 2014
8d560c7
Merge branch 'master' into mllib-grouped
kellrott Aug 4, 2014
dcabb2f
Adding in GroupedBinaryClassificationMetrics which runs BinaryClassif…
kellrott Aug 12, 2014
873b4c4
Merge branch 'master' into mllib-grouped
kellrott Aug 12, 2014
fa9c540
Adding matthewsByThreshold method to BinaryClassificationMetrics classes
kellrott Aug 13, 2014
d53eba2
Making the confusions RDD public
kellrott Aug 13, 2014
30bb5cf
Making BinaryConfusionMatrix public and adding toString method
kellrott Aug 13, 2014
5340f82
Merge branch 'master' into mllib-grouped
kellrott Sep 4, 2014
mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala

@@ -23,7 +23,7 @@ import org.apache.spark.mllib.rdd.RDDFunctions._
/**
* Computes the area under the curve (AUC) using the trapezoidal rule.
*/
private[evaluation] object AreaUnderCurve {
private[mllib] object AreaUnderCurve {

/**
* Uses the trapezoidal rule to compute the area under the line connecting the two input points.
mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala

@@ -93,7 +93,9 @@ class BinaryClassificationMetrics(scoreAndLabels: RDD[(Double, Double)]) extends
/** Returns the (threshold, recall) curve. */
def recallByThreshold(): RDD[(Double, Double)] = createCurve(Recall)

private lazy val (
/** Returns the (threshold, Matthews correlation coefficient) curve. */
def matthewsByThreshold(): RDD[(Double, Double)] = createCurve(MatthewsCorrelationCoefficient)

lazy val (
cumulativeCounts: RDD[(Double, BinaryLabelCounter)],
confusions: RDD[(Double, BinaryConfusionMatrix)]) = {
// Create a bin for each distinct score value, count positives and negatives within each bin,
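A minimal sketch of how the new matthewsByThreshold method could be called; the SparkContext sc and the score/label values are illustrative, not part of the PR:

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

// Illustrative (score, label) pairs; labels are 0.0 or 1.0.
val scoreAndLabels = sc.parallelize(Seq(
  (0.9, 1.0), (0.8, 1.0), (0.6, 0.0), (0.4, 1.0), (0.2, 0.0)))
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
// New in this PR: Matthews correlation coefficient at each score threshold.
metrics.matthewsByThreshold().collect().foreach { case (t, mcc) =>
  println(s"threshold=$t mcc=$mcc")
}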
mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricComputers.scala

@@ -20,34 +20,49 @@ package org.apache.spark.mllib.evaluation.binary
/**
* Trait for a binary classification evaluation metric computer.
*/
private[evaluation] trait BinaryClassificationMetricComputer extends Serializable {
private[mllib] trait BinaryClassificationMetricComputer extends Serializable {
def apply(c: BinaryConfusionMatrix): Double
}

/** Precision. */
private[evaluation] object Precision extends BinaryClassificationMetricComputer {
private[mllib] object Precision extends BinaryClassificationMetricComputer {
override def apply(c: BinaryConfusionMatrix): Double =
c.numTruePositives.toDouble / (c.numTruePositives + c.numFalsePositives)
}

/** False positive rate. */
private[evaluation] object FalsePositiveRate extends BinaryClassificationMetricComputer {
private[mllib] object FalsePositiveRate extends BinaryClassificationMetricComputer {
override def apply(c: BinaryConfusionMatrix): Double =
c.numFalsePositives.toDouble / c.numNegatives
}

/** Recall. */
private[evaluation] object Recall extends BinaryClassificationMetricComputer {
private[mllib] object Recall extends BinaryClassificationMetricComputer {
override def apply(c: BinaryConfusionMatrix): Double =
c.numTruePositives.toDouble / c.numPositives
}

/**
* Matthews correlation coefficient (MCC).
* @see http://en.wikipedia.org/wiki/Matthews_correlation_coefficient
*/
private[mllib] object MatthewsCorrelationCoefficient extends BinaryClassificationMetricComputer {
override def apply(c: BinaryConfusionMatrix): Double = {
// Compute in Double to avoid Long overflow on large counts.
val a = c.numTruePositives.toDouble * c.numTrueNegatives -
c.numFalsePositives.toDouble * c.numFalseNegatives
val b = (c.numTruePositives + c.numFalsePositives).toDouble *
(c.numTruePositives + c.numFalseNegatives) *
(c.numTrueNegatives + c.numFalsePositives) *
(c.numTrueNegatives + c.numFalseNegatives)
// MCC is conventionally 0 when any denominator factor is 0.
if (b == 0.0) 0.0 else a / math.sqrt(b)
}
}

/**
* F-Measure.
* @param beta the beta constant in F-Measure
* @see http://en.wikipedia.org/wiki/F1_score
*/
private[evaluation] case class FMeasure(beta: Double) extends BinaryClassificationMetricComputer {
private[mllib] case class FMeasure(beta: Double) extends BinaryClassificationMetricComputer {
private val beta2 = beta * beta
override def apply(c: BinaryConfusionMatrix): Double = {
val precision = Precision(c)
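For reference, the object above computes MCC = (TP*TN - FP*FN) / sqrt((TP+FP)(TP+FN)(TN+FP)(TN+FN)). A self-contained hand check, with counts invented for illustration:

// Hand check of the MCC formula; the counts are made up.
val (tp, fp, tn, fn) = (90.0, 10.0, 80.0, 20.0)
val mcc = (tp * tn - fp * fn) /
  math.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
println(mcc)  // (7200 - 200) / sqrt(99000000), roughly 0.7035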
mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala

@@ -20,7 +20,7 @@ package org.apache.spark.mllib.evaluation.binary
/**
* Trait for a binary confusion matrix.
*/
private[evaluation] trait BinaryConfusionMatrix {
trait BinaryConfusionMatrix {
/** number of true positives */
def numTruePositives: Long

@@ -46,7 +46,7 @@ private[evaluation] trait BinaryConfusionMatrix {
* @param count label counter for labels with scores greater than or equal to the current score
* @param totalCount label counter for all labels
*/
private[evaluation] case class BinaryConfusionMatrixImpl(
private[mllib] case class BinaryConfusionMatrixImpl(
count: BinaryLabelCounter,
totalCount: BinaryLabelCounter) extends BinaryConfusionMatrix {

@@ -67,4 +67,8 @@ private[evaluation] case class BinaryConfusionMatrixImpl(

/** number of negatives */
override def numNegatives: Long = totalCount.numNegatives

override def toString: String = "BinaryConfusionMatrix(tp=%d, fp=%d, tn=%d, fn=%d)"
.format(numTruePositives, numFalsePositives, numTrueNegatives, numFalseNegatives)

}
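With confusions now public and BinaryConfusionMatrix given a toString, per-threshold matrices can be inspected directly. A hedged sketch, reusing the metrics value from the earlier example:

// Print a few per-threshold confusion matrices via the new toString.
metrics.confusions.take(3).foreach { case (threshold, cm) =>
  println(s"$threshold -> $cm")
}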
mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryLabelCounter.scala

@@ -23,7 +23,7 @@ package org.apache.spark.mllib.evaluation.binary
* @param numPositives number of positive labels
* @param numNegatives number of negative labels
*/
private[evaluation] class BinaryLabelCounter(
private[mllib] class BinaryLabelCounter(
var numPositives: Long = 0L,
var numNegatives: Long = 0L) extends Serializable {

mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedAreaUnderCurve.scala (new file)

@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.grouped

import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.rdd.RDD
import scala.collection.mutable.HashMap

/**
* Computes the area under the curve (AUC) for each group key using the trapezoidal rule.
*/
private[mllib] object GroupedAreaUnderCurve {

/**
* Uses the trapezoidal rule to compute the area under the line connecting the two input points.
* @param points exactly two 2D points stored in a Seq
*/
private def trapezoid(points: Seq[(Double, Double)]): Double = {
require(points.length == 2)
val x = points.head
val y = points.last
(y._1 - x._1) * (y._2 + x._2) / 2.0
}

/**
* Returns the area under the curve for each group key.
*
* @param curve an RDD of (key, point) pairs whose ordered 2D points represent each key's curve
*/
def of[K](curve: RDD[(K,(Double, Double))]): Map[K,Double] = {
val agg = curve.mapPartitions( iter => {
val first = new HashMap[K,(Double,Double)]()
val last = new HashMap[K,(Double,Double)]()
val aucs = new HashMap[K,Double]()
iter.foreach( x => {
if (last.contains(x._1)) {
aucs(x._1) = aucs.getOrElse(x._1, 0.0) + trapezoid(Seq(last(x._1), x._2))
}
if (!first.contains(x._1)) {
first(x._1) = x._2
}
last(x._1) = x._2
})
Iterator((first.toMap,last.toMap,aucs.toMap))
}).collect()

val s = agg.foldLeft((Map[K, (Double, Double)](), Map[K, Double]()))(
(agg:(Map[K,(Double,Double)], Map[K,Double]),
n:(Map[K,(Double,Double)],Map[K,(Double,Double)],Map[K,Double]) ) => {
val prev = new HashMap[K,(Double,Double)]()
val aucs = new HashMap[K,Double]()
(agg._1.keySet ++ n._1.keySet).foreach( k => {
// sum the aucs from the two partitions
aucs(k) = agg._2.getOrElse(k, 0.0)
if (n._3.contains(k)) {
aucs(k) += n._3.getOrElse(k,0.0)
}
// sum areas between the partitions
if (agg._1.contains(k) && n._1.contains(k)) {
aucs(k) += trapezoid(Seq(agg._1(k),n._1(k)))
}
// get the last occurrence for each key
if (n._2.contains(k)) {
prev(k) = n._2(k)
} else if (agg._1.contains(k)) {
prev(k) = agg._1(k)
}
})
(prev.toMap, aucs.toMap)
})
s._2
}

/**
* Returns the area under the curve for each group key.
*
* @param curve an iterable of (key, point) pairs whose ordered 2D points represent each key's curve
*/
def of[K](curve: Iterable[(K,(Double, Double))]): Map[K,Double] = {
val prev = new HashMap[K,(Double,Double)]()
val aucs = new HashMap[K,Double]()
curve.foreach( x => {
if (prev.contains(x._1)) {
aucs(x._1) = aucs.getOrElse(x._1, 0.0) + trapezoid(Seq(prev(x._1), x._2))
}
prev(x._1) = x._2
})
aucs.toMap
}
}
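A small sketch of the Iterable overload above. GroupedAreaUnderCurve is private[mllib], so this would have to live inside that package; the keys and points are illustrative:

// Two groups, each an ordered curve; the trapezoidal AUC of each is 0.5.
val curve = Seq(
  ("a", (0.0, 0.0)), ("a", (1.0, 1.0)),
  ("b", (0.0, 0.0)), ("b", (0.5, 0.5)), ("b", (1.0, 1.0)))
println(GroupedAreaUnderCurve.of(curve))  // Map(a -> 0.5, b -> 0.5)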
mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedBinaryClassificationMetrics.scala (new file)

@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package org.apache.spark.mllib.grouped

import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.{UnionRDD, RDD}
import org.apache.spark.SparkContext._
import org.apache.spark.Logging
import org.apache.spark.mllib.evaluation.binary._
import org.apache.spark.mllib.evaluation.binary.BinaryConfusionMatrixImpl
import scala.reflect.ClassTag
import scala.collection.mutable.HashMap

/**
* :: Experimental ::
* Grouped Evaluator for binary classification.
*
* @param scoreAndLabels an RDD of (groupKey, (score, label)) pairs.
*/
@Experimental
class GroupedBinaryClassificationMetrics[K](scoreAndLabels: RDD[(K,(Double, Double))])
(implicit tag: ClassTag[K])
extends Logging {

/** Unpersist intermediate RDDs used in the computation. */
def unpersist() {
cumulativeCounts.unpersist()
}

/** Returns thresholds in descending order. */
def thresholds(): RDD[(K,Double)] = cumulativeCounts.map(x => (x._1, x._2._1))

/**
* Returns the receiver operating characteristic (ROC) curve,
* which is an RDD of (false positive rate, true positive rate)
* with (0.0, 0.0) prepended and (1.0, 1.0) appended to it.
* @see http://en.wikipedia.org/wiki/Receiver_operating_characteristic
*/
def roc(): RDD[(K,(Double, Double))] = {
val rocCurve = createCurve(FalsePositiveRate, Recall)
val sc = confusions.context
val keys = cumulativeCounts.keys
val firsts = keys.distinct().map( x => (x, (0.0,0.0) ) )
val lasts = keys.distinct().map( x => (x, (1.0,1.0) ) )
new UnionRDD[(K, (Double, Double))](sc, Seq(firsts, rocCurve, lasts))
}

/**
* Computes the area under the receiver operating characteristic (ROC) curve.
*/
def areaUnderROC(): Map[K,Double] = GroupedAreaUnderCurve.of(roc())

/**
* Returns the precision-recall curve, which is an RDD of (recall, precision),
* NOT (precision, recall), with (0.0, 1.0) prepended to it.
* @see http://en.wikipedia.org/wiki/Precision_and_recall
*/
def pr(): RDD[(K,(Double, Double))] = {
val prCurve = createCurve(Recall, Precision)
val sc = confusions.context
val firsts = cumulativeCounts.keys.distinct().map( x => (x, (0.0,1.0) ) )
firsts.union(prCurve)
}

/**
* Computes the area under the precision-recall curve.
*/
def areaUnderPR(): Map[K,Double] = GroupedAreaUnderCurve.of(pr())

/**
* Returns the (threshold, F-Measure) curve.
* @param beta the beta factor in F-Measure computation.
* @return an RDD of (threshold, F-Measure) pairs.
* @see http://en.wikipedia.org/wiki/F1_score
*/
def fMeasureByThreshold(beta: Double): RDD[(K,(Double, Double))] = createCurve(FMeasure(beta))

/** Returns the (threshold, F-Measure) curve with beta = 1.0. */
def fMeasureByThreshold(): RDD[(K,(Double, Double))] = fMeasureByThreshold(1.0)

/** Returns the (threshold, precision) curve. */
def precisionByThreshold(): RDD[(K,(Double, Double))] = createCurve(Precision)

/** Returns the (threshold, recall) curve. */
def recallByThreshold(): RDD[(K,(Double, Double))] = createCurve(Recall)

/** Returns the (threshold, Matthews correlation coefficient) curve. */
def matthewsByThreshold(): RDD[(K,(Double, Double))] = createCurve(MatthewsCorrelationCoefficient)

lazy val (
cumulativeCounts: RDD[(K,(Double, BinaryLabelCounter))],
confusions: RDD[(K,(Double, BinaryConfusionMatrix))]) = {
// Create a bin for each distinct score value, count positives and negatives within each bin,
// and then sort by score values in descending order.
val counts = scoreAndLabels.map( x => ((x._1,x._2._1), x._2._2)).combineByKey(
createCombiner = (label: Double) => new BinaryLabelCounter(0L, 0L) += label,
mergeValue = (c: BinaryLabelCounter, label: Double) => c += label,
mergeCombiners = (c1: BinaryLabelCounter, c2: BinaryLabelCounter) => c1 += c2
).sortBy(x => x._1._2, ascending = false)
val aggs = counts.mapPartitions( x => {
val c = new HashMap[K,BinaryLabelCounter]()
x.foreach( y => {
val o = c.getOrElse(y._1._1, new BinaryLabelCounter())
o += y._2
c.put(y._1._1, o)
})
Iterator(c.toMap)
}).collect()

val keys = scoreAndLabels.keys.distinct().collect()
val partitionwiseCumulativeCounts = aggs.scanLeft(keys.map(
x => (x, new BinaryLabelCounter()) ).toMap) (
(agg: Map[K,BinaryLabelCounter], c:Map[K, BinaryLabelCounter]) => {
keys.map( x => {
val b = agg(x).clone()
b += c.getOrElse(x, new BinaryLabelCounter())
(x, b)
} ).toMap
}
)
val totalCounts = partitionwiseCumulativeCounts.last

val cumulativeCounts = counts.mapPartitionsWithIndex(
(index: Int, iter: Iterator[ ((K,Double), BinaryLabelCounter)]) => {
val cumCount = partitionwiseCumulativeCounts(index)
val out = iter.map { case ((key,score), c:BinaryLabelCounter) =>
val a = cumCount(key)
a += c
(key, (score, a.clone()))
}
out
}, preservesPartitioning = true)
cumulativeCounts.persist()

val confusions = cumulativeCounts.map { case (key, (score, cumCount)) =>
(key, (
score,
BinaryConfusionMatrixImpl(cumCount, totalCounts(key)).asInstanceOf[BinaryConfusionMatrix]
)
)
}
(cumulativeCounts, confusions)
}

/** Creates a curve of (threshold, metric). */
private def createCurve(y: BinaryClassificationMetricComputer): RDD[(K, (Double, Double))] = {
confusions.map { case (k, (s, c)) =>
(k, (s, y(c)))
}
}

/** Creates a curve of (metricX, metricY). */
private def createCurve(
x: BinaryClassificationMetricComputer,
y: BinaryClassificationMetricComputer): RDD[(K, (Double, Double))] = {
confusions.map { case (k, (s, c)) =>
(k, (x(c), y(c)))
}
}


}
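Finally, a minimal end-to-end sketch of the grouped metrics class; sc, the group keys, and the data are illustrative:

import org.apache.spark.mllib.grouped.GroupedBinaryClassificationMetrics

// One (score, label) stream per model, keyed by model name.
val scoreAndLabels = sc.parallelize(Seq(
  ("modelA", (0.9, 1.0)), ("modelA", (0.6, 0.0)), ("modelA", (0.4, 1.0)),
  ("modelB", (0.8, 1.0)), ("modelB", (0.7, 0.0))))
val metrics = new GroupedBinaryClassificationMetrics(scoreAndLabels)
println(metrics.areaUnderROC())  // one AUC per group key
metrics.unpersist()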