Skip to content

Commit

Permalink
Revert "Instead what about if we didn't use the Impl pattern and inst…
Browse files Browse the repository at this point in the history
…ead wraped the current merge. Only downside then merge and add are wrapped differently and that is maybe not good."

This reverts commit ddb3a57.
  • Loading branch information
holdenk committed Sep 16, 2016
1 parent ddb3a57 commit 6acbd98
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ private[spark] object TaskMetrics extends Logging {
internalAccums.foreach { acc =>
val tmAcc = tm.nameToAccums(acc.name.get).asInstanceOf[AccumulatorV2[Any, Any]]
tmAcc.metadata = acc.metadata
tmAcc.internalMerge(acc.asInstanceOf[AccumulatorV2[Any, Any]])
tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]])
}

tm.externalAccums ++= externalAccums
Expand All @@ -345,7 +345,7 @@ private[spark] class BlockStatusesAccumulator

override def addImpl(v: (BlockId, BlockStatus)): Unit = _seq.add(v)

override def merge(
override def mergeImpl(
other: AccumulatorV2[(BlockId, BlockStatus), java.util.List[(BlockId, BlockStatus)]]): Unit = {
other match {
case o: BlockStatusesAccumulator => _seq.addAll(o.value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ class DAGScheduler(
case None =>
throw new SparkException(s"attempted to access non-existent accumulator $id")
}
acc.internalMerge(updates.asInstanceOf[AccumulatorV2[Any, Any]])
acc.merge(updates.asInstanceOf[AccumulatorV2[Any, Any]])
// To avoid UI cruft, ignore cases where value wasn't updated
if (acc.name.isDefined && !updates.isZero) {
stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))
Expand Down
20 changes: 10 additions & 10 deletions core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,14 @@ abstract class AccumulatorV2[@specialized(Int, Long, Double) IN, OUT] extends Se

/**
* Merges another same-type accumulator into this one and update its state, i.e. this should be
* merge-in-place. Developers should extend merge to customize the merge functionality.
* merge-in-place. Developers should extend mergeImpl to customize the merge functionality.
*/
final private[spark] lazy val internalMerge: (AccumulatorV2[IN, OUT] => Unit) = {
final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = {
// Handle data property accumulators
if (metadata != null && metadata.dataProperty) {
dataPropertyMerge _
} else {
merge _
mergeImpl _
}
}

Expand All @@ -238,17 +238,17 @@ abstract class AccumulatorV2[@specialized(Int, Long, Double) IN, OUT] extends Se
val splits = processed.getOrElseUpdate((rddId, shuffleWriteId), new mutable.BitSet())
if (!splits.contains(splitId)) {
splits += splitId
merge(v)
mergeImpl(v)
}
}
}


/**
* Merges another same-type accumulator into this one and update its state, i.e. this should be
* merge-in-place. Developers should extend this merge to customize the merge functionality.
* merge-in-place. Developers should extend mergeImpl to customize the merge functionality.
*/
protected def merge(other: AccumulatorV2[IN, OUT]): Unit
protected def mergeImpl(other: AccumulatorV2[IN, OUT]): Unit

/**
* Defines the current value of this accumulator
Expand Down Expand Up @@ -444,7 +444,7 @@ class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {
*/
def avg: Double = _sum.toDouble / _count

override def merge(other: AccumulatorV2[jl.Long, jl.Long]): Unit = other match {
override def mergeImpl(other: AccumulatorV2[jl.Long, jl.Long]): Unit = other match {
case o: LongAccumulator =>
_sum += o.sum
_count += o.count
Expand Down Expand Up @@ -510,7 +510,7 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
*/
def avg: Double = _sum / _count

override def merge(other: AccumulatorV2[jl.Double, jl.Double]): Unit = other match {
override def mergeImpl(other: AccumulatorV2[jl.Double, jl.Double]): Unit = other match {
case o: DoubleAccumulator =>
_sum += o.sum
_count += o.count
Expand Down Expand Up @@ -547,7 +547,7 @@ class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {

override def addImpl(v: T): Unit = _list.add(v)

override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other match {
override def mergeImpl(other: AccumulatorV2[T, java.util.List[T]]): Unit = other match {
case o: CollectionAccumulator[T] => _list.addAll(o.value)
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
Expand Down Expand Up @@ -583,7 +583,7 @@ class LegacyAccumulatorWrapper[R, T](

override def addImpl(v: T): Unit = _value = param.addAccumulator(_value, v)

override def merge(other: AccumulatorV2[T, R]): Unit = other match {
override def mergeImpl(other: AccumulatorV2[T, R]): Unit = other match {
case o: LegacyAccumulatorWrapper[R, T] => _value = param.addInPlace(_value, o.value)
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
Expand Down
2 changes: 1 addition & 1 deletion project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object MimaExcludes {
) ++ Seq(
// SPARK-12469 Add data property accumulators to Spark
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.TaskContext.setRDDPartitionInfo"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.TaskContext.getRDDPartitionInfo"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.TaskContext.getRDDPartitionInfo")
) ++ Seq(
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.Filter.references"),
// [SPARK-16853][SQL] Fixes encoder error in DataSet typed select
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ package object debug {
}
override def reset(): Unit = _set.clear()
override def addImpl(v: T): Unit = _set.add(v)
override def merge(other: AccumulatorV2[T, java.util.Set[T]]): Unit = {
override def mergeImpl(other: AccumulatorV2[T, java.util.Set[T]]): Unit = {
_set.addAll(other.value)
}
override def value: java.util.Set[T] = _set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato

override def reset(): Unit = _value = _zeroValue

override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
override def mergeImpl(other: AccumulatorV2[Long, Long]): Unit = other match {
case o: SQLMetric => _value += o.value
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
Expand Down

0 comments on commit 6acbd98

Please sign in to comment.