diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 50c6461373dee..0e613ce958f4d 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -326,12 +326,22 @@ private[spark] object JsonProtocol { case v: Long => JInt(v) // We only have 3 kind of internal accumulator types, so if it's not int or long, it must be // the blocks accumulator, whose type is `java.util.List[(BlockId, BlockStatus)]` - case v => - JArray(v.asInstanceOf[java.util.List[(BlockId, BlockStatus)]].asScala.toList.map { - case (id, status) => - ("Block ID" -> id.toString) ~ - ("Status" -> blockStatusToJson(status)) + case v: java.util.List[_] => + JArray(v.asScala.toList.flatMap { + case (id: BlockId, status: BlockStatus) => + Some( + ("Block ID" -> id.toString) ~ + ("Status" -> blockStatusToJson(status)) + ) + case _ => + // Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should + // not crash. + None }) + case _ => + // Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should not + // crash. + JNothing } } else { // For all external accumulators, just use strings diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 74b72d940eeef..40fb2e3497033 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -436,6 +436,53 @@ class JsonProtocolSuite extends SparkFunSuite { testAccumValue(Some("anything"), 123, JString("123")) } + /** Create an AccumulableInfo and verify we can serialize and deserialize it. */ + private def testAccumulableInfo( + name: String, + value: Option[Any], + expectedValue: Option[Any]): Unit = { + val isInternal = name.startsWith(InternalAccumulator.METRICS_PREFIX) + val accum = AccumulableInfo( + 123L, + Some(name), + update = value, + value = value, + internal = isInternal, + countFailedValues = false) + val json = JsonProtocol.accumulableInfoToJson(accum) + val newAccum = JsonProtocol.accumulableInfoFromJson(json) + assert(newAccum == accum.copy(update = expectedValue, value = expectedValue)) + } + + test("SPARK-31923: unexpected value type of internal accumulator") { + // Because a user may use `METRICS_PREFIX` in an accumulator name, we should test unexpected + // types to make sure we don't crash. + import InternalAccumulator.METRICS_PREFIX + testAccumulableInfo( + METRICS_PREFIX + "fooString", + value = Some("foo"), + expectedValue = None) + testAccumulableInfo( + METRICS_PREFIX + "fooList", + value = Some(java.util.Arrays.asList("string")), + expectedValue = Some(java.util.Collections.emptyList()) + ) + val blocks = Seq( + (TestBlockId("block1"), BlockStatus(StorageLevel.MEMORY_ONLY, 1L, 2L)), + (TestBlockId("block2"), BlockStatus(StorageLevel.DISK_ONLY, 3L, 4L))) + testAccumulableInfo( + METRICS_PREFIX + "fooList", + value = Some(java.util.Arrays.asList( + "string", + blocks(0), + blocks(1))), + expectedValue = Some(blocks.asJava) + ) + testAccumulableInfo( + METRICS_PREFIX + "fooSet", + value = Some(Set("foo")), + expectedValue = None) + } }