Skip to content

Commit

Permalink
Ignore internal accumulators that use unrecognized types rather than …
Browse files Browse the repository at this point in the history
…crashing
  • Loading branch information
zsxwing committed Jun 6, 2020
1 parent 04f66bf commit 7716ab6
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 5 deletions.
20 changes: 15 additions & 5 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Expand Up @@ -363,12 +363,22 @@ private[spark] object JsonProtocol {
case v: Long => JInt(v)
// We only have 3 kind of internal accumulator types, so if it's not int or long, it must be
// the blocks accumulator, whose type is `java.util.List[(BlockId, BlockStatus)]`
case v =>
JArray(v.asInstanceOf[java.util.List[(BlockId, BlockStatus)]].asScala.toList.map {
case (id, status) =>
("Block ID" -> id.toString) ~
("Status" -> blockStatusToJson(status))
case v: java.util.List[_] =>
JArray(v.asScala.toList.flatMap {
case (id: BlockId, status: BlockStatus) =>
Some(
("Block ID" -> id.toString) ~
("Status" -> blockStatusToJson(status))
)
case _ =>
// Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should
// not crash.
None
})
case _ =>
// Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should not
// crash.
JNothing
}
} else {
// For all external accumulators, just use strings
Expand Down
48 changes: 48 additions & 0 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
Expand Up @@ -507,6 +507,54 @@ class JsonProtocolSuite extends SparkFunSuite {
testAccumValue(Some("anything"), 123, JString("123"))
}

/** Create an AccumulableInfo and verify we can serialize and deserialize it. */
private def testAccumulableInfo(
name: String,
value: Option[Any],
expectedValue: Option[Any]): Unit = {
val isInternal = name.startsWith(InternalAccumulator.METRICS_PREFIX)
val accum = AccumulableInfo(
123L,
Some(name),
update = value,
value = value,
internal = isInternal,
countFailedValues = false)
val json = JsonProtocol.accumulableInfoToJson(accum)
val newAccum = JsonProtocol.accumulableInfoFromJson(json)
assert(newAccum == accum.copy(update = expectedValue, value = expectedValue))
}

test("SPARK-31923: unexpected value type of internal accumulator") {
// Because a user may use `METRICS_PREFIX` in an accumulator name, we should test unexpected
// types to make sure we don't crash.
import InternalAccumulator.METRICS_PREFIX
testAccumulableInfo(
METRICS_PREFIX + "fooString",
value = Some("foo"),
expectedValue = None)
testAccumulableInfo(
METRICS_PREFIX + "fooList",
value = Some(java.util.Arrays.asList("string")),
expectedValue = Some(java.util.Collections.emptyList())
)
val blocks = Seq(
(TestBlockId("block1"), BlockStatus(StorageLevel.MEMORY_ONLY, 1L, 2L)),
(TestBlockId("block2"), BlockStatus(StorageLevel.DISK_ONLY, 3L, 4L)))
testAccumulableInfo(
METRICS_PREFIX + "fooList",
value = Some(java.util.Arrays.asList(
"string",
blocks(0),
blocks(1))),
expectedValue = Some(blocks.asJava)
)
testAccumulableInfo(
METRICS_PREFIX + "fooSet",
value = Some(Set("foo")),
expectedValue = None)
}

test("SPARK-30936: forwards compatibility - ignore unknown fields") {
val expected = TestListenerEvent("foo", 123)
val unknownFieldsJson =
Expand Down

0 comments on commit 7716ab6

Please sign in to comment.