From 5ae4f6e147a97c4c4af5d82810597ad92b10ad1f Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 10 May 2017 13:26:32 +0800 Subject: [PATCH 1/2] getting name should not fail if accumulator is garbage collected --- .../main/scala/org/apache/spark/util/AccumulatorV2.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index a65ec75cc5db6..32058efaee54b 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -84,10 +84,11 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { * Returns the name of this accumulator, can only be called after registration. */ final def name: Option[String] = { + assertMetadataNotNull() + if (atDriverSide) { - AccumulatorContext.get(id).flatMap(_.metadata.name) + metadata.name.orElse(AccumulatorContext.get(id).flatMap(_.metadata.name)) } else { - assertMetadataNotNull() metadata.name } } @@ -165,9 +166,7 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { } val copyAcc = copyAndReset() assert(copyAcc.isZero, "copyAndReset must return a zero value copy") - val isInternalAcc = - (name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX)) || - getClass.getSimpleName == "SQLMetric" + val isInternalAcc = name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX) if (isInternalAcc) { // Do not serialize the name of internal accumulator and send it to executor. copyAcc.metadata = metadata.copy(name = None) From 2a3f7730c87d71982e13c85e8ec3d10669e3b756 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 15 May 2017 16:01:21 +0800 Subject: [PATCH 2/2] add comment --- core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index 32058efaee54b..1a9a6929541aa 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -171,6 +171,10 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { // Do not serialize the name of internal accumulator and send it to executor. copyAcc.metadata = metadata.copy(name = None) } else { + // For non-internal accumulators, we still need to send the name because users may need to + // access the accumulator name at executor side, or they may keep the accumulators sent from + // executors and access the name when the registered accumulator is already garbage + // collected(e.g. SQLMetrics). copyAcc.metadata = metadata } copyAcc