Skip to content

Commit

Permalink
[SPARK-46581][CORE] Update comment on isZero in AccumulatorV2
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Two changes:
- Update comment on `AccumulatorV2`'s `isZero` to reflect what it actually does.
- Update variable name in `SQLMetrics` to `defaultValidValue` to reflect this

### Why are the changes needed?

`AccumulatorV2`'s `isZero` doesn't do what the comment implies - it actually checks if the accumulator hasn't been updated.

The comment implies that for a `LongAccumulator`, for example, a value of `0` would cause `isZero` to be `true`. But if we were to `add(0)`, then the value would still be `0` but `isZero` would return `false`.

Changing the name of `zeroValue` to `defaultValidValue` to avoid confusion since `isZero` doesn't use `zeroValue` in `SQLMetric`.

Thanks arvindsaik for pointing this out.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44583 from davintjong-db/sqlmetric-zerovalue-refactor.

Authored-by: Davin Tjong <davin.tjong@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
davintjong-db authored and cloud-fan committed Jan 8, 2024
1 parent 8facc99 commit 40078a4
Showing 1 changed file with 16 additions and 16 deletions.
Expand Up @@ -40,20 +40,20 @@ import org.apache.spark.util.AccumulatorContext.internOption
class SQLMetric(
val metricType: String,
initValue: Long = 0L,
zeroValue: Long = 0L) extends AccumulatorV2[Long, Long] {
// initValue defines the initial value of the metric. zeroValue defines the lowest value
// considered valid. If a SQLMetric is invalid, it is set to zeroValue upon receiving any
// updates, and it also reports zeroValue as its value to avoid exposing it to the user
defaultValidValue: Long = 0L) extends AccumulatorV2[Long, Long] {
// initValue defines the initial value of the metric. defaultValidValue defines the lowest value
// considered valid. If a SQLMetric is invalid, it is set to defaultValidValue upon receiving any
// updates, and it also reports defaultValidValue as its value to avoid exposing it to the user
// programatically.
//
// For many SQLMetrics, we use initValue = -1 and zeroValue = 0 to indicate that the metric is
// by default invalid. At the end of a task, we will update the metric making it valid, and the
// invalid metrics will be filtered out when calculating min, max, etc. as a workaround for
// SPARK-11013.
// For many SQLMetrics, we use initValue = -1 and defaultValidValue = 0 to indicate that the
// metric is by default invalid. At the end of a task, we will update the metric making it valid,
// and the invalid metrics will be filtered out when calculating min, max, etc. as a workaround
// for SPARK-11013.
private var _value = initValue

override def copy(): SQLMetric = {
val newAcc = new SQLMetric(metricType, initValue, zeroValue)
val newAcc = new SQLMetric(metricType, initValue, defaultValidValue)
newAcc._value = _value
newAcc
}
Expand All @@ -63,7 +63,7 @@ class SQLMetric(
override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
case o: SQLMetric =>
if (o.isValid) {
if (!isValid) _value = zeroValue
if (!isValid) _value = defaultValidValue
_value += o.value
}
case _ => throw QueryExecutionErrors.cannotMergeClassWithOtherClassError(
Expand All @@ -73,14 +73,14 @@ class SQLMetric(
// This is used to filter out metrics. Metrics with value equal to initValue should
// be filtered out, since they are either invalid or safe to filter without changing
// the aggregation defined in [[SQLMetrics.stringValue]].
// Note that we don't use zeroValue here since we may want to collect zeroValue metrics
// for calculating min, max, etc. See SPARK-11013.
// Note that we don't use defaultValidValue here since we may want to collect
// defaultValidValue metrics for calculating min, max, etc. See SPARK-11013.
override def isZero: Boolean = _value == initValue

def isValid: Boolean = _value >= zeroValue
def isValid: Boolean = _value >= defaultValidValue

override def add(v: Long): Unit = {
if (!isValid) _value = zeroValue
if (!isValid) _value = defaultValidValue
_value += v
}

Expand All @@ -93,8 +93,8 @@ class SQLMetric(
def +=(v: Long): Unit = add(v)

// _value may be invalid, in many cases being -1. We should not expose it to the user
// and instead return zeroValue.
override def value: Long = if (!isValid) zeroValue else _value
// and instead return defaultValidValue.
override def value: Long = if (!isValid) defaultValidValue else _value

// Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
Expand Down

0 comments on commit 40078a4

Please sign in to comment.