Skip to content

Commit

Permalink
[SPARK-31923][CORE] Ignore internal accumulators that use unrecognize…
Browse files Browse the repository at this point in the history
…d types rather than crashing (branch-2.4)

### What changes were proposed in this pull request?

Backport #28744 to branch-2.4.

### Why are the changes needed?

Low risky fix for branch-2.4.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit tests.

Closes #28758 from zsxwing/SPARK-31923-2.4.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
  • Loading branch information
zsxwing committed Jun 8, 2020
1 parent 476010a commit 48017cc
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 5 deletions.
20 changes: 15 additions & 5 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Expand Up @@ -326,12 +326,22 @@ private[spark] object JsonProtocol {
case v: Long => JInt(v)
// We only have 3 kind of internal accumulator types, so if it's not int or long, it must be
// the blocks accumulator, whose type is `java.util.List[(BlockId, BlockStatus)]`
case v =>
JArray(v.asInstanceOf[java.util.List[(BlockId, BlockStatus)]].asScala.toList.map {
case (id, status) =>
("Block ID" -> id.toString) ~
("Status" -> blockStatusToJson(status))
case v: java.util.List[_] =>
JArray(v.asScala.toList.flatMap {
case (id: BlockId, status: BlockStatus) =>
Some(
("Block ID" -> id.toString) ~
("Status" -> blockStatusToJson(status))
)
case _ =>
// Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should
// not crash.
None
})
case _ =>
// Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should not
// crash.
JNothing
}
} else {
// For all external accumulators, just use strings
Expand Down
47 changes: 47 additions & 0 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
Expand Up @@ -436,6 +436,53 @@ class JsonProtocolSuite extends SparkFunSuite {
testAccumValue(Some("anything"), 123, JString("123"))
}

/** Create an AccumulableInfo and verify we can serialize and deserialize it. */
private def testAccumulableInfo(
name: String,
value: Option[Any],
expectedValue: Option[Any]): Unit = {
val isInternal = name.startsWith(InternalAccumulator.METRICS_PREFIX)
val accum = AccumulableInfo(
123L,
Some(name),
update = value,
value = value,
internal = isInternal,
countFailedValues = false)
val json = JsonProtocol.accumulableInfoToJson(accum)
val newAccum = JsonProtocol.accumulableInfoFromJson(json)
assert(newAccum == accum.copy(update = expectedValue, value = expectedValue))
}

test("SPARK-31923: unexpected value type of internal accumulator") {
// Because a user may use `METRICS_PREFIX` in an accumulator name, we should test unexpected
// types to make sure we don't crash.
import InternalAccumulator.METRICS_PREFIX
testAccumulableInfo(
METRICS_PREFIX + "fooString",
value = Some("foo"),
expectedValue = None)
testAccumulableInfo(
METRICS_PREFIX + "fooList",
value = Some(java.util.Arrays.asList("string")),
expectedValue = Some(java.util.Collections.emptyList())
)
val blocks = Seq(
(TestBlockId("block1"), BlockStatus(StorageLevel.MEMORY_ONLY, 1L, 2L)),
(TestBlockId("block2"), BlockStatus(StorageLevel.DISK_ONLY, 3L, 4L)))
testAccumulableInfo(
METRICS_PREFIX + "fooList",
value = Some(java.util.Arrays.asList(
"string",
blocks(0),
blocks(1))),
expectedValue = Some(blocks.asJava)
)
testAccumulableInfo(
METRICS_PREFIX + "fooSet",
value = Some(Set("foo")),
expectedValue = None)
}
}


Expand Down

0 comments on commit 48017cc

Please sign in to comment.