Skip to content
Permalink
Browse files

[SPARK-29959][ML][PYSPARK] Summarizer support more metrics

### What changes were proposed in this pull request?
Summarizer support more metrics: sum, std

### Why are the changes needed?
Those metrics are widely used; it will be convenient to obtain them directly rather than via a conversion.
in `NaiveBayes`: we want the sum of vectors; currently mean & weightSum need to be computed and then multiplied
in `StandardScaler`,`AFTSurvivalRegression`,`LinearRegression`,`LinearSVC`,`LogisticRegression`: we need to obtain `variance` and then sqrt it to get std

### Does this PR introduce any user-facing change?
yes, new metrics are exposed to end users

### How was this patch tested?
added testsuites

Closes #26596 from zhengruifeng/summarizer_add_metrics.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
  • Loading branch information
zhengruifeng committed Dec 2, 2019
1 parent 85cb388 commit 03ac1b799cf1e48489e8246a1b97110c80344160
@@ -109,7 +109,8 @@ Refer to the [`ChiSquareTest` Python docs](api/python/index.html#pyspark.ml.stat
## Summarizer

We provide vector column summary statistics for `DataFrame` through `Summarizer`.
Available metrics are the column-wise max, min, mean, variance, and number of nonzeros, as well as the total count.
Available metrics are the column-wise max, min, mean, sum, variance, std, and number of nonzeros,
as well as the total count.

<div class="codetabs">
<div data-lang="scala" markdown="1">
@@ -170,7 +170,7 @@ class LinearSVC @Since("2.2.0") (
regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth)

val (summarizer, labelSummarizer) = instances.treeAggregate(
(createSummarizerBuffer("mean", "variance", "count"), new MultiClassSummarizer))(
(createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
seqOp = (c: (SummarizerBuffer, MultiClassSummarizer), instance: Instance) =>
(c._1.add(instance.features, instance.weight), c._2.add(instance.label, instance.weight)),
combOp = (c1: (SummarizerBuffer, MultiClassSummarizer),
@@ -207,7 +207,7 @@ class LinearSVC @Since("2.2.0") (
throw new SparkException(msg)
}

val featuresStd = summarizer.variance.toArray.map(math.sqrt)
val featuresStd = summarizer.std.toArray
val getFeaturesStd = (j: Int) => featuresStd(j)
val regParamL2 = $(regParam)
val bcFeaturesStd = instances.context.broadcast(featuresStd)
@@ -501,7 +501,7 @@ class LogisticRegression @Since("1.2.0") (
fitIntercept)

val (summarizer, labelSummarizer) = instances.treeAggregate(
(createSummarizerBuffer("mean", "variance", "count"), new MultiClassSummarizer))(
(createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
seqOp = (c: (SummarizerBuffer, MultiClassSummarizer), instance: Instance) =>
(c._1.add(instance.features, instance.weight), c._2.add(instance.label, instance.weight)),
combOp = (c1: (SummarizerBuffer, MultiClassSummarizer),
@@ -582,7 +582,7 @@ class LogisticRegression @Since("1.2.0") (
}

val featuresMean = summarizer.mean.toArray
val featuresStd = summarizer.variance.toArray.map(math.sqrt)
val featuresStd = summarizer.std.toArray

if (!$(fitIntercept) && (0 until numFeatures).exists { i =>
featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) {
@@ -186,16 +186,12 @@ class NaiveBayes @Since("1.5.0") (
}

// Aggregates term frequencies per label.
// TODO: Summarizer directly returns sum vector.
val aggregated = dataset.groupBy(col($(labelCol)))
.agg(sum(w).as("weightSum"), Summarizer.metrics("mean", "count")
.agg(sum(w).as("weightSum"), Summarizer.metrics("sum", "count")
.summary(validateUDF(col($(featuresCol))), w).as("summary"))
.select($(labelCol), "weightSum", "summary.mean", "summary.count")
.select($(labelCol), "weightSum", "summary.sum", "summary.count")
.as[(Double, Double, Vector, Long)]
.map { case (label, weightSum, mean, count) =>
BLAS.scal(weightSum, mean)
(label, weightSum, mean, count)
}.collect().sortBy(_._1)
.collect().sortBy(_._1)

val numFeatures = aggregated.head._3.size
instr.logNumFeatures(numFeatures)
@@ -269,7 +265,6 @@ class NaiveBayes @Since("1.5.0") (
}

// Aggregates mean vector and square-sum vector per label.
// TODO: Summarizer directly returns square-sum vector.
val aggregated = dataset.groupBy(col($(labelCol)))
.agg(sum(w).as("weightSum"), Summarizer.metrics("mean", "normL2")
.summary(col($(featuresCol)), w).as("summary"))
@@ -108,13 +108,11 @@ class StandardScaler @Since("1.4.0") (
override def fit(dataset: Dataset[_]): StandardScalerModel = {
transformSchema(dataset.schema, logging = true)

val Row(mean: Vector, variance: Vector) = dataset
.select(Summarizer.metrics("mean", "variance").summary(col($(inputCol))).as("summary"))
.select("summary.mean", "summary.variance")
val Row(mean: Vector, std: Vector) = dataset
.select(Summarizer.metrics("mean", "std").summary(col($(inputCol))).as("summary"))
.select("summary.mean", "summary.std")
.first()

val std = Vectors.dense(variance.toArray.map(math.sqrt))

copyValues(new StandardScalerModel(uid, std.compressed, mean.compressed).setParent(this))
}

@@ -215,13 +215,13 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

val featuresSummarizer = instances.treeAggregate(
createSummarizerBuffer("mean", "variance", "count"))(
createSummarizerBuffer("mean", "std", "count"))(
seqOp = (c: SummarizerBuffer, v: AFTPoint) => c.add(v.features),
combOp = (c1: SummarizerBuffer, c2: SummarizerBuffer) => c1.merge(c2),
depth = $(aggregationDepth)
)

val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
val featuresStd = featuresSummarizer.std.toArray
val numFeatures = featuresStd.size

instr.logPipelineStage(this)
@@ -358,8 +358,8 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

val (featuresSummarizer, ySummarizer) = instances.treeAggregate(
(createSummarizerBuffer("mean", "variance"),
createSummarizerBuffer("mean", "variance", "count")))(
(createSummarizerBuffer("mean", "std"),
createSummarizerBuffer("mean", "std", "count")))(
seqOp = (c: (SummarizerBuffer, SummarizerBuffer), instance: Instance) =>
(c._1.add(instance.features, instance.weight),
c._2.add(Vectors.dense(instance.label), instance.weight)),
@@ -370,7 +370,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
)

val yMean = ySummarizer.mean(0)
val rawYStd = math.sqrt(ySummarizer.variance(0))
val rawYStd = ySummarizer.std(0)

instr.logNumExamples(ySummarizer.count)
instr.logNamedValue(Instrumentation.loggerTags.meanOfLabels, yMean)
@@ -421,7 +421,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
// setting yStd=abs(yMean) ensures that y is not scaled anymore in l-bfgs algorithm.
val yStd = if (rawYStd > 0) rawYStd else math.abs(yMean)
val featuresMean = featuresSummarizer.mean.toArray
val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
val featuresStd = featuresSummarizer.std.toArray
val bcFeaturesMean = instances.context.broadcast(featuresMean)
val bcFeaturesStd = instances.context.broadcast(featuresStd)

@@ -89,7 +89,9 @@ object Summarizer extends Logging {
*
* The following metrics are accepted (case sensitive):
* - mean: a vector that contains the coefficient-wise mean.
* - sum: a vector that contains the coefficient-wise sum.
* - variance: a vector that contains the coefficient-wise variance.
* - std: a vector that contains the coefficient-wise standard deviation.
* - count: the count of all vectors seen.
* - numNonzeros: a vector with the number of non-zeros for each coefficients
* - max: the maximum for each coefficient.
@@ -106,7 +108,7 @@ object Summarizer extends Logging {
@Since("2.3.0")
@scala.annotation.varargs
def metrics(metrics: String*): SummaryBuilder = {
  // At least one metric name is required to build a summary.
  require(metrics.nonEmpty, "Should include at least one metric")
  // Resolve the user-facing names into typed metrics plus the set of
  // intermediate quantities the buffer must compute.
  val (typedMetrics, computeMetrics) = getRelevantMetrics(metrics)
  new SummaryBuilderImpl(typedMetrics, computeMetrics)
}
@@ -119,6 +121,14 @@ object Summarizer extends Logging {
@Since("2.3.0")
def mean(col: Column): Column = mean(col, lit(1.0))

/**
 * Returns a column with the coefficient-wise sum of the vectors in `col`,
 * weighted by `weightCol`.
 */
@Since("3.0.0")
def sum(col: Column, weightCol: Column): Column =
  getSingleMetric(col, weightCol, "sum")

/** Returns a column with the coefficient-wise sum of the vectors in `col` (unit weights). */
@Since("3.0.0")
def sum(col: Column): Column = sum(col, lit(1.0))

@Since("2.3.0")
def variance(col: Column, weightCol: Column): Column = {
getSingleMetric(col, weightCol, "variance")
@@ -127,6 +137,14 @@ object Summarizer extends Logging {
@Since("2.3.0")
def variance(col: Column): Column = variance(col, lit(1.0))

/**
 * Returns a column with the coefficient-wise standard deviation of the vectors
 * in `col`, weighted by `weightCol`.
 */
@Since("3.0.0")
def std(col: Column, weightCol: Column): Column =
  getSingleMetric(col, weightCol, "std")

/** Returns a column with the coefficient-wise standard deviation of the vectors in `col` (unit weights). */
@Since("3.0.0")
def std(col: Column): Column = std(col, lit(1.0))

@Since("2.3.0")
def count(col: Column, weightCol: Column): Column = {
getSingleMetric(col, weightCol, "count")
@@ -245,7 +263,9 @@ private[ml] object SummaryBuilderImpl extends Logging {
*/
private val allMetrics: Seq[(String, Metric, DataType, Seq[ComputeMetric])] = Seq(
("mean", Mean, vectorUDT, Seq(ComputeMean, ComputeWeightSum)),
("sum", Sum, vectorUDT, Seq(ComputeMean, ComputeWeightSum)),
("variance", Variance, vectorUDT, Seq(ComputeWeightSum, ComputeMean, ComputeM2n)),
("std", Std, vectorUDT, Seq(ComputeWeightSum, ComputeMean, ComputeM2n)),
("count", Count, LongType, Seq()),
("numNonZeros", NumNonZeros, vectorUDT, Seq(ComputeNNZ)),
("max", Max, vectorUDT, Seq(ComputeMax, ComputeNNZ)),
@@ -259,7 +279,9 @@ private[ml] object SummaryBuilderImpl extends Logging {
*/
sealed trait Metric extends Serializable
private[stat] case object Mean extends Metric
private[stat] case object Sum extends Metric
private[stat] case object Variance extends Metric
private[stat] case object Std extends Metric
private[stat] case object Count extends Metric
private[stat] case object NumNonZeros extends Metric
private[stat] case object Max extends Metric
@@ -295,14 +317,15 @@ private[ml] object SummaryBuilderImpl extends Logging {
private var totalCnt: Long = 0
private var totalWeightSum: Double = 0.0
private var weightSquareSum: Double = 0.0
private var weightSum: Array[Double] = null
private var currWeightSum: Array[Double] = null
private var nnz: Array[Long] = null
private var currMax: Array[Double] = null
private var currMin: Array[Double] = null

def this() {
this(
Seq(Mean, Variance, Count, NumNonZeros, Max, Min, NormL2, NormL1),
Seq(Mean, Sum, Variance, Std, Count, NumNonZeros,
Max, Min, NormL2, NormL1),
Seq(ComputeMean, ComputeM2n, ComputeM2, ComputeL1,
ComputeWeightSum, ComputeNNZ, ComputeMax, ComputeMin)
)
@@ -323,7 +346,9 @@ private[ml] object SummaryBuilderImpl extends Logging {
if (requestedCompMetrics.contains(ComputeM2n)) { currM2n = Array.ofDim[Double](n) }
if (requestedCompMetrics.contains(ComputeM2)) { currM2 = Array.ofDim[Double](n) }
if (requestedCompMetrics.contains(ComputeL1)) { currL1 = Array.ofDim[Double](n) }
if (requestedCompMetrics.contains(ComputeWeightSum)) { weightSum = Array.ofDim[Double](n) }
if (requestedCompMetrics.contains(ComputeWeightSum)) {
currWeightSum = Array.ofDim[Double](n)
}
if (requestedCompMetrics.contains(ComputeNNZ)) { nnz = Array.ofDim[Long](n) }
if (requestedCompMetrics.contains(ComputeMax)) {
currMax = Array.fill[Double](n)(Double.MinValue)
@@ -340,7 +365,7 @@ private[ml] object SummaryBuilderImpl extends Logging {
val localCurrM2n = currM2n
val localCurrM2 = currM2
val localCurrL1 = currL1
val localWeightSum = weightSum
val localCurrWeightSum = currWeightSum
val localNumNonzeros = nnz
val localCurrMax = currMax
val localCurrMin = currMin
@@ -353,17 +378,18 @@ private[ml] object SummaryBuilderImpl extends Logging {
localCurrMin(index) = value
}

if (localWeightSum != null) {
if (localCurrWeightSum != null) {
if (localCurrMean != null) {
val prevMean = localCurrMean(index)
val diff = value - prevMean
localCurrMean(index) = prevMean + weight * diff / (localWeightSum(index) + weight)
localCurrMean(index) = prevMean +
weight * diff / (localCurrWeightSum(index) + weight)

if (localCurrM2n != null) {
localCurrM2n(index) += weight * (value - localCurrMean(index)) * diff
}
}
localWeightSum(index) += weight
localCurrWeightSum(index) += weight
}

if (localCurrM2 != null) {
@@ -402,9 +428,9 @@ private[ml] object SummaryBuilderImpl extends Logging {
weightSquareSum += other.weightSquareSum
var i = 0
while (i < n) {
if (weightSum != null) {
val thisWeightSum = weightSum(i)
val otherWeightSum = other.weightSum(i)
if (currWeightSum != null) {
val thisWeightSum = currWeightSum(i)
val otherWeightSum = other.currWeightSum(i)
val totalWeightSum = thisWeightSum + otherWeightSum

if (totalWeightSum != 0.0) {
@@ -420,7 +446,7 @@ private[ml] object SummaryBuilderImpl extends Logging {
}
}
}
weightSum(i) = totalWeightSum
currWeightSum(i) = totalWeightSum
}

// merge m2 together
@@ -442,7 +468,7 @@ private[ml] object SummaryBuilderImpl extends Logging {
this.totalCnt = other.totalCnt
this.totalWeightSum = other.totalWeightSum
this.weightSquareSum = other.weightSquareSum
if (other.weightSum != null) { this.weightSum = other.weightSum.clone() }
if (other.currWeightSum != null) { this.currWeightSum = other.currWeightSum.clone() }
if (other.nnz != null) { this.nnz = other.nnz.clone() }
if (other.currMax != null) { this.currMax = other.currMax.clone() }
if (other.currMin != null) { this.currMin = other.currMin.clone() }
@@ -460,21 +486,52 @@ private[ml] object SummaryBuilderImpl extends Logging {
val realMean = Array.ofDim[Double](n)
var i = 0
while (i < n) {
realMean(i) = currMean(i) * (weightSum(i) / totalWeightSum)
realMean(i) = currMean(i) * (currWeightSum(i) / totalWeightSum)
i += 1
}
Vectors.dense(realMean)
}

/**
 * Sum of each dimension.
 */
def sum: Vector = {
  require(requestedMetrics.contains(Sum))
  require(totalWeightSum > 0, s"Nothing has been added to this summarizer.")

  // The running mean scaled back by the accumulated weight of each
  // dimension recovers the per-dimension sum.
  Vectors.dense(Array.tabulate(n)(i => currMean(i) * currWeightSum(i)))
}

/**
 * Unbiased estimate of sample variance of each dimension.
 */
def variance: Vector = {
  require(requestedMetrics.contains(Variance))
  require(totalWeightSum > 0, s"Nothing has been added to this summarizer.")

  // Delegate the actual computation to the shared helper so that
  // `variance` and `std` stay consistent.
  Vectors.dense(computeVariance)
}

/**
 * Unbiased estimate of standard deviation of each dimension.
 */
def std: Vector = {
  require(requestedMetrics.contains(Std))
  require(totalWeightSum > 0, s"Nothing has been added to this summarizer.")

  // Standard deviation is the element-wise square root of the sample variance.
  Vectors.dense(computeVariance.map(math.sqrt))
}

private def computeVariance: Array[Double] = {
val realVariance = Array.ofDim[Double](n)
val denominator = totalWeightSum - (weightSquareSum / totalWeightSum)

// Sample variance is computed, if the denominator is less than 0, the variance is just 0.
@@ -484,12 +541,12 @@ private[ml] object SummaryBuilderImpl extends Logging {
val len = currM2n.length
while (i < len) {
// We prevent variance from negative value caused by numerical error.
realVariance(i) = math.max((currM2n(i) + deltaMean(i) * deltaMean(i) * weightSum(i) *
(totalWeightSum - weightSum(i)) / totalWeightSum) / denominator, 0.0)
realVariance(i) = math.max((currM2n(i) + deltaMean(i) * deltaMean(i) * currWeightSum(i) *
(totalWeightSum - currWeightSum(i)) / totalWeightSum) / denominator, 0.0)
i += 1
}
}
Vectors.dense(realVariance)
realVariance
}

/**
@@ -579,7 +636,9 @@ private[ml] object SummaryBuilderImpl extends Logging {
override def eval(state: SummarizerBuffer): Any = {
val metrics = requestedMetrics.map {
case Mean => vectorUDT.serialize(state.mean)
case Sum => vectorUDT.serialize(state.sum)
case Variance => vectorUDT.serialize(state.variance)
case Std => vectorUDT.serialize(state.std)
case Count => state.count
case NumNonZeros => vectorUDT.serialize(state.numNonzeros)
case Max => vectorUDT.serialize(state.max)

0 comments on commit 03ac1b7

Please sign in to comment.
You can’t perform that action at this time.