Skip to content

Commit

Permalink
Fix all usages
Browse files Browse the repository at this point in the history
  • Loading branch information
davintjong-db committed Jan 3, 2024
1 parent ff99cb2 commit be532ac
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 37 deletions.
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
Expand Up @@ -120,8 +120,8 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
def isUpdated: Boolean

/**
* Creates a new copy of this accumulator, which is zero value. i.e. call `isZero` on the copy
* must return true.
* Creates a new copy of this accumulator that has the zero value; i.e. calling `isUpdated`
* on the copy must return false.
*/
def copyAndReset(): AccumulatorV2[IN, OUT] = {
val copyAcc = copy()
Expand All @@ -135,8 +135,8 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
def copy(): AccumulatorV2[IN, OUT]

/**
* Resets this accumulator, which is zero value. i.e. call `isZero` must
* return true.
* Resets this accumulator to its zero value; i.e. calling `isUpdated` afterwards must
* return false.
*/
def reset(): Unit

Expand Down
24 changes: 12 additions & 12 deletions core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
Expand Up @@ -23,13 +23,13 @@ class AccumulatorV2Suite extends SparkFunSuite {

test("LongAccumulator add/avg/sum/count/isUpdated") {
val acc = new LongAccumulator
assert(acc.isUpdated)
assert(!acc.isUpdated)
assert(acc.count == 0)
assert(acc.sum == 0)
assert(acc.avg.isNaN)

acc.add(0)
assert(!acc.isUpdated)
assert(acc.isUpdated)
assert(acc.count == 1)
assert(acc.sum == 0)
assert(acc.avg == 0.0)
Expand All @@ -56,13 +56,13 @@ class AccumulatorV2Suite extends SparkFunSuite {

test("DoubleAccumulator add/avg/sum/count/isUpdated") {
val acc = new DoubleAccumulator
assert(acc.isUpdated)
assert(!acc.isUpdated)
assert(acc.count == 0)
assert(acc.sum == 0.0)
assert(acc.avg.isNaN)

acc.add(0.0)
assert(!acc.isUpdated)
assert(acc.isUpdated)
assert(acc.count == 1)
assert(acc.sum == 0.0)
assert(acc.avg == 0.0)
Expand Down Expand Up @@ -90,40 +90,40 @@ class AccumulatorV2Suite extends SparkFunSuite {
test("ListAccumulator") {
val acc = new CollectionAccumulator[Double]
assert(acc.value.isEmpty)
assert(acc.isUpdated)
assert(!acc.isUpdated)

acc.add(0.0)
assert(acc.value.contains(0.0))
assert(!acc.isUpdated)
assert(acc.isUpdated)

acc.add(java.lang.Double.valueOf(1.0))

val acc2 = acc.copyAndReset()
assert(acc2.value.isEmpty)
assert(acc2.isUpdated)
assert(!acc2.isUpdated)

assert(acc.value.contains(1.0))
assert(!acc.isUpdated)
assert(acc.isUpdated)
assert(acc.value.size() === 2)

acc2.add(2.0)
assert(acc2.value.contains(2.0))
assert(!acc2.isUpdated)
assert(acc2.isUpdated)
assert(acc2.value.size() === 1)

// Test merging
acc.merge(acc2)
assert(acc.value.contains(2.0))
assert(!acc.isUpdated)
assert(acc.isUpdated)
assert(acc.value.size() === 3)

val acc3 = acc.copy()
assert(acc3.value.contains(2.0))
assert(!acc3.isUpdated)
assert(acc3.isUpdated)
assert(acc3.value.size() === 3)

acc3.reset()
assert(acc3.isUpdated)
assert(!acc3.isUpdated)
assert(acc3.value.isEmpty)
}

Expand Down
Expand Up @@ -185,7 +185,7 @@ class AggregatingAccumulator private(

override def value: InternalRow = withSQLConf(false, InternalRow.empty) {
// Either use the existing buffer or create a temporary one.
val input = if (!isUpdated) {
val input = if (isUpdated) {
buffer
} else {
// Create a temporary buffer because we want to avoid changing the state of the accumulator
Expand Down
Expand Up @@ -72,7 +72,7 @@ case class CollectMetricsExec(
// - Performance issues due to excessive serialization.
val updater = collector.copyAndReset()
TaskContext.get().addTaskCompletionListener[Unit] { _ =>
if (collector.isUpdated) {
if (!collector.isUpdated) {
collector.setState(updater)
} else {
collector.merge(updater)
Expand Down
Expand Up @@ -66,7 +66,7 @@ class MapperRowCounter extends AccumulatorV2[jl.Long, java.util.List[(jl.Integer

def setPartitionId(id: jl.Integer): Unit = {
this.synchronized {
assert(isUpdated, "agg must not have been initialized")
assert(!isUpdated, "agg must not have been initialized")
getOrCreate.add((id, 0))
}
}
Expand Down
Expand Up @@ -56,18 +56,18 @@ class AggregatingAccumulatorSuite

// Merge empty
acc1.merge(accEmpty)
assert(acc1.isUpdated)
assert(!acc1.isUpdated)

// No updates
assert(acc1.isUpdated)
assert(!acc1.isUpdated)
checkResult(acc1.value, InternalRow(null, null, 0), expectedSchema, false)
assert(acc1.isUpdated)
assert(!acc1.isUpdated)

// A few updates
acc1.add(InternalRow(4L, str("foo"), 4.9d))
acc1.add(InternalRow(98L, str("bar"), -323.9d))
acc1.add(InternalRow(-30L, str("baz"), 4129.8d))
assert(!acc1.isUpdated)
assert(acc1.isUpdated)
checkResult(acc1.value, InternalRow(73L, str("baz"), 3L), expectedSchema, false)

// Idempotency of result
Expand All @@ -77,7 +77,7 @@ class AggregatingAccumulatorSuite
val updater = acc2.copyAndReset()
updater.add(InternalRow(-2L, str("qwerty"), -6773.9d))
updater.add(InternalRow(-35L, str("zzz-top"), -323.9d))
assert(acc2.isUpdated)
assert(!acc2.isUpdated)
acc2.setState(updater)
checkResult(acc2.value, InternalRow(-36L, str("zzz-top"), 2L), expectedSchema, false)

Expand All @@ -90,7 +90,7 @@ class AggregatingAccumulatorSuite

// Reset
acc1.reset()
assert(acc1.isUpdated)
assert(!acc1.isUpdated)
}

test("non-deterministic expressions") {
Expand Down Expand Up @@ -128,7 +128,7 @@ class AggregatingAccumulatorSuite
acc_driver.merge(acc1)
acc_driver.merge(acc2)
acc_driver.merge(acc3)
assert(!acc_driver.isUpdated)
assert(acc_driver.isUpdated)
checkResult(acc_driver.value, InternalRow(3, 96, 0), acc_driver.schema, false)
}

Expand Down
Expand Up @@ -940,25 +940,25 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
assert(SQLMetrics.createSizeMetric(sparkContext, name = "m", initValue = -1).value === 0)
assert(SQLMetrics.createSizeMetric(sparkContext, name = "m", initValue = 5).value === 5)

assert(SQLMetrics.createSizeMetric(sparkContext, name = "m").isUpdated)
assert(SQLMetrics.createSizeMetric(sparkContext, name = "m", initValue = -1).isUpdated)
assert(SQLMetrics.createSizeMetric(sparkContext, name = "m", initValue = 5).isUpdated)
assert(!SQLMetrics.createSizeMetric(sparkContext, name = "m").isUpdated)
assert(!SQLMetrics.createSizeMetric(sparkContext, name = "m", initValue = -1).isUpdated)
assert(!SQLMetrics.createSizeMetric(sparkContext, name = "m", initValue = 5).isUpdated)

assert(SQLMetrics.createTimingMetric(sparkContext, name = "m").value === 0)
assert(SQLMetrics.createTimingMetric(sparkContext, name = "m", initValue = -1).value === 0)
assert(SQLMetrics.createTimingMetric(sparkContext, name = "m", initValue = 5).value === 5)

assert(SQLMetrics.createTimingMetric(sparkContext, name = "m").isUpdated)
assert(SQLMetrics.createTimingMetric(sparkContext, name = "m", initValue = -1).isUpdated)
assert(SQLMetrics.createTimingMetric(sparkContext, name = "m", initValue = 5).isUpdated)
assert(!SQLMetrics.createTimingMetric(sparkContext, name = "m").isUpdated)
assert(!SQLMetrics.createTimingMetric(sparkContext, name = "m", initValue = -1).isUpdated)
assert(!SQLMetrics.createTimingMetric(sparkContext, name = "m", initValue = 5).isUpdated)

assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m").value === 0)
assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = -1).value === 0)
assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = 5).value === 5)

assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m").isUpdated)
assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = -1).isUpdated)
assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = 5).isUpdated)
assert(!SQLMetrics.createNanoTimingMetric(sparkContext, name = "m").isUpdated)
assert(!SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = -1).isUpdated)
assert(!SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = 5).isUpdated)
}
}

Expand Down
Expand Up @@ -22,7 +22,7 @@ class MapperRowCounterSuite extends SparkFunSuite {

test("Test MapperRowCounter") {
val counter = new MapperRowCounter()
assert(counter.isUpdated)
assert(!counter.isUpdated)

counter.setPartitionId(0)
counter.add(1L)
Expand All @@ -31,7 +31,7 @@ class MapperRowCounterSuite extends SparkFunSuite {
assert(counter.value.get(0)._2 == 2L)

counter.reset()
assert(counter.isUpdated)
assert(!counter.isUpdated)
counter.setPartitionId(100)
counter.add(1L)
assert(counter.value.get(0)._1 == 100L)
Expand Down

0 comments on commit be532ac

Please sign in to comment.