[SPARK-46581][CORE] Update comment on isZero in AccumulatorV2 #44583

Closed
```diff
@@ -40,20 +40,20 @@ import org.apache.spark.util.AccumulatorContext.internOption
 class SQLMetric(
     val metricType: String,
     initValue: Long = 0L,
-    zeroValue: Long = 0L) extends AccumulatorV2[Long, Long] {
-  // initValue defines the initial value of the metric. zeroValue defines the lowest value
-  // considered valid. If a SQLMetric is invalid, it is set to zeroValue upon receiving any
-  // updates, and it also reports zeroValue as its value to avoid exposing it to the user
+    defaultValidValue: Long = 0L) extends AccumulatorV2[Long, Long] {
+  // initValue defines the initial value of the metric. defaultValidValue defines the lowest value
+  // considered valid. If a SQLMetric is invalid, it is set to defaultValidValue upon receiving any
+  // updates, and it also reports defaultValidValue as its value to avoid exposing it to the user
   // programmatically.
   //
-  // For many SQLMetrics, we use initValue = -1 and zeroValue = 0 to indicate that the metric is
-  // by default invalid. At the end of a task, we will update the metric making it valid, and the
-  // invalid metrics will be filtered out when calculating min, max, etc. as a workaround for
-  // SPARK-11013.
+  // For many SQLMetrics, we use initValue = -1 and defaultValidValue = 0 to indicate that the
+  // metric is by default invalid. At the end of a task, we will update the metric making it valid,
+  // and the invalid metrics will be filtered out when calculating min, max, etc. as a workaround
+  // for SPARK-11013.
   private var _value = initValue
 
   override def copy(): SQLMetric = {
-    val newAcc = new SQLMetric(metricType, initValue, zeroValue)
+    val newAcc = new SQLMetric(metricType, initValue, defaultValidValue)
     newAcc._value = _value
     newAcc
   }
```
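To make the convention in the comment concrete, here is a minimal, self-contained sketch of these semantics in plain Scala. The class name `MetricSketch` is invented for illustration; it is not the Spark class and omits the `AccumulatorV2` machinery:

```scala
// REPL-style sketch, assuming nothing from Spark. MetricSketch is a made-up
// name; the real SQLMetric extends AccumulatorV2 and carries more machinery.
class MetricSketch(initValue: Long = 0L, defaultValidValue: Long = 0L) {
  private var _value = initValue

  // Valid once the value has reached at least defaultValidValue.
  def isValid: Boolean = _value >= defaultValidValue

  // Any update first "heals" an invalid metric up to defaultValidValue.
  def add(v: Long): Unit = {
    if (!isValid) _value = defaultValidValue
    _value += v
  }

  // Compares against initValue, not defaultValidValue: an untouched metric is
  // filterable, while a metric genuinely updated to 0 is kept (SPARK-11013).
  def isZero: Boolean = _value == initValue

  // Never expose the raw sentinel (e.g. -1) to callers.
  def value: Long = if (!isValid) defaultValidValue else _value
}
```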
```diff
@@ -63,7 +63,7 @@ class SQLMetric(
   override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
     case o: SQLMetric =>
       if (o.isValid) {
-        if (!isValid) _value = zeroValue
+        if (!isValid) _value = defaultValidValue
         _value += o.value
       }
     case _ => throw QueryExecutionErrors.cannotMergeClassWithOtherClassError(
```
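The merge rule is equivalent to adding the other metric's value only when the other metric is valid; an invalid receiver heals to `defaultValidValue` first, so the `-1` sentinel never leaks into the sum. A small demo against the hypothetical `MetricSketch` above (`mergeSketch` is a made-up helper, not Spark API):

```scala
// REPL-style demo; mergeSketch mirrors the merge rule in the diff above.
def mergeSketch(receiver: MetricSketch, other: MetricSketch): Unit =
  if (other.isValid) receiver.add(other.value)

val a = new MetricSketch(initValue = -1L)  // starts invalid
val b = new MetricSketch(initValue = -1L)
b.add(10L)              // b heals from -1 to 0, then accumulates 10
mergeSketch(a, b)
assert(a.value == 10L)  // a was reset to 0 before accumulating, not -1 + 10
```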
```diff
@@ -73,14 +73,14 @@ class SQLMetric(
   // This is used to filter out metrics. Metrics with value equal to initValue should
   // be filtered out, since they are either invalid or safe to filter without changing
   // the aggregation defined in [[SQLMetrics.stringValue]].
-  // Note that we don't use zeroValue here since we may want to collect zeroValue metrics
-  // for calculating min, max, etc. See SPARK-11013.
+  // Note that we don't use defaultValidValue here since we may want to collect
+  // defaultValidValue metrics for calculating min, max, etc. See SPARK-11013.
   override def isZero: Boolean = _value == initValue
 
-  def isValid: Boolean = _value >= zeroValue
+  def isValid: Boolean = _value >= defaultValidValue
 
   override def add(v: Long): Unit = {
-    if (!isValid) _value = zeroValue
+    if (!isValid) _value = defaultValidValue
     _value += v
   }
```
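The distinction matters for aggregation: `isZero` compares against `initValue`, so a metric that genuinely observed `0` survives filtering while an untouched one does not. A quick check with the sketch above:

```scala
// REPL-style demo of isZero vs isValid, using the hypothetical MetricSketch.
val untouched = new MetricSketch(initValue = -1L)
val touched = new MetricSketch(initValue = -1L)
touched.add(0L)            // heals -1 -> 0, then adds 0

assert(untouched.isZero)   // still at initValue: filtered out of min/max/etc.
assert(!touched.isZero)    // genuinely observed 0: kept for aggregation
assert(touched.isValid && touched.value == 0L)
```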

```diff
@@ -93,8 +93,8 @@ class SQLMetric(
   def +=(v: Long): Unit = add(v)
 
   // _value may be invalid, in many cases being -1. We should not expose it to the user
-  // and instead return zeroValue.
-  override def value: Long = if (!isValid) zeroValue else _value
+  // and instead return defaultValidValue.
+  override def value: Long = if (!isValid) defaultValidValue else _value
 
   // Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
   override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
```
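And the accessor never exposes the internal sentinel; a sketch of that behavior, again with the hypothetical `MetricSketch`:

```scala
// REPL-style demo: callers see defaultValidValue, never the -1 sentinel.
val m = new MetricSketch(initValue = -1L, defaultValidValue = 0L)
assert(!m.isValid)     // internally still -1
assert(m.value == 0L)  // reported value is clamped to defaultValidValue
```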