From 0dcad4b1502ec8c7c45e17e4a8029545d7337e2a Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 15 Nov 2016 17:17:58 -0800
Subject: [PATCH 1/2] Add failing regression test.

---
 .../sql/execution/ui/SQLListenerSuite.scala | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 948a155457b65..857ca9134d3f4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.ui
 
 import java.util.Properties
 
+import org.json4s.jackson.JsonMethods._
 import org.mockito.Mockito.mock
 
 import org.apache.spark._
@@ -35,7 +36,7 @@ import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanIn
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{AccumulatorMetadata, LongAccumulator}
+import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}
 
 
 class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
@@ -416,6 +417,20 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     assert(driverUpdates(physicalPlan.longMetric("dummy").id) == expectedAccumValue)
   }
 
+  test("roundtripping SparkListenerDriverAccumUpdates through JsonProtocol (SPARK-18462)") {
+    val event = SparkListenerDriverAccumUpdates(1L, Seq((2L, 3L)))
+    val actualJsonString = compact(render(JsonProtocol.sparkEventToJson(event)))
+    val newEvent = JsonProtocol.sparkEventFromJson(parse(actualJsonString))
+    newEvent match {
+      case SparkListenerDriverAccumUpdates(executionId, accums) =>
+        assert(executionId == 1L)
+        accums.foreach { case (a, b) =>
+          assert(a == 2L)
+          assert(b == 3L)
+        }
+    }
+  }
+
 }
 
 

From 9f674be5efd72cf58e2139343202904f26e14966 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 17 Nov 2016 16:06:12 -0800
Subject: [PATCH 2/2] Add fix using custom Jackson converter.
---
 .../spark/sql/execution/ui/SQLListener.scala  | 39 ++++++++++++++++++-
 .../sql/execution/ui/SQLListenerSuite.scala   | 33 ++++++++++++++--
 2 files changed, 67 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 60f13432d78d2..5daf21595d8a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -19,6 +19,11 @@ package org.apache.spark.sql.execution.ui
 
 import scala.collection.mutable
 
+import com.fasterxml.jackson.databind.JavaType
+import com.fasterxml.jackson.databind.`type`.TypeFactory
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import com.fasterxml.jackson.databind.util.Converter
+
 import org.apache.spark.{JobExecutionStatus, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
@@ -43,9 +48,41 @@ case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
   extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerDriverAccumUpdates(executionId: Long, accumUpdates: Seq[(Long, Long)])
+case class SparkListenerDriverAccumUpdates(
+    executionId: Long,
+    @JsonDeserialize(contentConverter = classOf[LongLongTupleConverter])
+    accumUpdates: Seq[(Long, Long)])
   extends SparkListenerEvent
 
+/**
+ * Jackson [[Converter]] for converting an (Int, Int) tuple into a (Long, Long) tuple.
+ *
+ * This is necessary due to limitations in how Jackson's scala module deserializes primitives;
+ * see the "Deserializing Option[Int] and other primitive challenges" section in
+ * https://github.com/FasterXML/jackson-module-scala/wiki/FAQ for a discussion of this issue and
+ * SPARK-18462 for the specific problem that motivated this conversion.
+ */
+private class LongLongTupleConverter extends Converter[(Object, Object), (Long, Long)] {
+
+  override def convert(in: (Object, Object)): (Long, Long) = {
+    def toLong(a: Object): Long = a match {
+      case i: java.lang.Integer => i.intValue()
+      case l: java.lang.Long => l.longValue()
+    }
+    (toLong(in._1), toLong(in._2))
+  }
+
+  override def getInputType(typeFactory: TypeFactory): JavaType = {
+    val objectType = typeFactory.uncheckedSimpleType(classOf[Object])
+    typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(objectType, objectType))
+  }
+
+  override def getOutputType(typeFactory: TypeFactory): JavaType = {
+    val longType = typeFactory.uncheckedSimpleType(classOf[Long])
+    typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(longType, longType))
+  }
+}
+
 class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
 
   override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 857ca9134d3f4..8aea112897fb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}
 
 
-class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
+class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
   import testImplicits._
   import org.apache.spark.AccumulatorSuite.makeInfo
 
@@ -419,9 +419,16 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
 
   test("roundtripping SparkListenerDriverAccumUpdates through JsonProtocol (SPARK-18462)") {
     val event = SparkListenerDriverAccumUpdates(1L, Seq((2L, 3L)))
-    val actualJsonString = compact(render(JsonProtocol.sparkEventToJson(event)))
-    val newEvent = JsonProtocol.sparkEventFromJson(parse(actualJsonString))
-    newEvent match {
+    val json = JsonProtocol.sparkEventToJson(event)
+    assertValidDataInJson(json,
+      parse("""
+        |{
+        |  "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
+        |  "executionId": 1,
+        |  "accumUpdates": [[2,3]]
+        |}
+      """.stripMargin))
+    JsonProtocol.sparkEventFromJson(json) match {
       case SparkListenerDriverAccumUpdates(executionId, accums) =>
         assert(executionId == 1L)
         accums.foreach { case (a, b) =>
@@ -429,6 +436,24 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
           assert(b == 3L)
         }
     }
+
+    // Test a case where the numbers in the JSON can only fit in longs:
+    val longJson = parse(
+      """
+        |{
+        |  "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
+        |  "executionId": 4294967294,
+        |  "accumUpdates": [[4294967294,3]]
+        |}
+      """.stripMargin)
+    JsonProtocol.sparkEventFromJson(longJson) match {
+      case SparkListenerDriverAccumUpdates(executionId, accums) =>
+        assert(executionId == 4294967294L)
+        accums.foreach { case (a, b) =>
+          assert(a == 4294967294L)
+          assert(b == 3L)
+        }
+    }
   }
 }
 
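
Note (not part of the patch): the deserialization quirk that motivates
LongLongTupleConverter is easy to reproduce outside Spark. The sketch below is
a minimal illustration under stated assumptions: jackson-module-scala is on
the classpath, and `Updates` is a hypothetical stand-in for
SparkListenerDriverAccumUpdates without the @JsonDeserialize annotation the
patch adds. Because the tuple's type parameters are erased at runtime, Jackson
materializes small numeric values as java.lang.Integer even though the field
is declared Seq[(Long, Long)], and the failure only surfaces later, when the
elements are unboxed.

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    // Hypothetical stand-in for SparkListenerDriverAccumUpdates, *without*
    // the @JsonDeserialize(contentConverter = ...) annotation from the patch.
    case class Updates(accumUpdates: Seq[(Long, Long)])

    object JacksonErasureDemo extends App {
      val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
      val parsed = mapper.readValue("""{"accumUpdates": [[2, 3]]}""", classOf[Updates])
      // The tuple elements were materialized as java.lang.Integer, so unboxing
      // them to Long below throws:
      //   java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
      parsed.accumUpdates.foreach { case (a, b) => println(a + b) }
    }

This is the failure mode SPARK-18462 reports for the history server replaying
event logs, and it is why the converter widens Integer or Long inputs to Long
during deserialization rather than relying on the declared field type.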