[SPARK-39146][CORE][SQL] Introduce local singleton for ObjectMapper that may be reused #37999
Write a micro-benchmark to test Jackson ObjectMapper read and write:
def testReadJsonToMap(valuesPerIteration: Int): Unit = {
  val input =
    """
      |{"mergeDir":"/a/b/c/mergeDirName","attemptId":"appattempt_1648454518011_994053_000001"}
    """.stripMargin
  val benchmark = new Benchmark("Test read json to map",
    valuesPerIteration, output = output)
  // Case 1: create and configure a new mapper on every call.
  benchmark.addCase("Test Multiple") { _: Int =>
    for (_ <- 0L until valuesPerIteration) {
      val mapper = new ObjectMapper()
      mapper.registerModule(DefaultScalaModule)
      mapper.readValue(input, classOf[mutable.HashMap[String, String]])
    }
  }
  // Case 2: reuse a single pre-configured mapper.
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)
  benchmark.addCase("Test Single") { _: Int =>
    for (_ <- 0L until valuesPerIteration) {
      mapper.readValue(input, classOf[mutable.HashMap[String, String]])
    }
  }
  benchmark.run()
}
def testWriteMapToJson(valuesPerIteration: Int): Unit = {
  val map: mutable.HashMap[String, String] = new mutable.HashMap[String, String]()
  map.put("mergeDir", "/a/b/c/mergeDirName")
  map.put("attemptId", "yarn_appattempt_1648454518011_994053_000001")
  val benchmark = new Benchmark("Test write map to json",
    valuesPerIteration, output = output)
  // Case 1: create and configure a new mapper on every call.
  benchmark.addCase("Test Multiple") { _: Int =>
    for (_ <- 0L until valuesPerIteration) {
      val mapper = new ObjectMapper()
      mapper.registerModule(DefaultScalaModule)
      mapper.writeValueAsString(map)
    }
  }
  // Case 2: reuse a single pre-configured mapper.
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)
  benchmark.addCase("Test Single") { _: Int =>
    for (_ <- 0L until valuesPerIteration) {
      mapper.writeValueAsString(map)
    }
  }
  benchmark.run()
}
def testCreateObjectMapper(valuesPerIteration: Int): Unit = {
  val benchmark = new Benchmark("Test create ObjectMapper",
    valuesPerIteration, output = output)
  // Measures only the cost of constructing and configuring a mapper.
  benchmark.addCase("Test create ObjectMapper") { _: Int =>
    for (_ <- 0L until valuesPerIteration) {
      val mapper = new ObjectMapper()
      mapper.registerModule(DefaultScalaModule)
    }
  }
  benchmark.run()
}
override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
  val valuesPerIteration = 10000
  testCreateObjectMapper(valuesPerIteration = valuesPerIteration)
  testWriteMapToJson(valuesPerIteration = valuesPerIteration)
  testReadJsonToMap(valuesPerIteration = valuesPerIteration)
}
and ran it on GitHub Actions (GA):
From the test results, we should use a singleton Jackson ObjectMapper, because creating a new ObjectMapper instance appears to be expensive.
My concern is that ObjectMapper, while thread-safe, is synchronized in some methods, IIRC. This could introduce contention for locks. Is the perf win really compelling? I wonder if we can reuse ObjectMapper inside classes where it matters for perf and not try to share one instance so widely.
OK, let me add a multi-threaded comparison to check this.
In the serial read/write scenario, the benefits are obvious.
I wrote a multi-threaded test as follows:
def testWriteMapToJson(valuesPerIteration: Int, threads: Int): Unit = {
  val map = Map("intValue" -> 1,
    "longValue" -> 2L,
    "doubleValue" -> 3.0D,
    "stringValue" -> "4",
    "floatValue" -> 5.0F,
    "booleanValue" -> true)
  val benchmark = new Benchmark(s"Test $threads threads write map to json",
    valuesPerIteration, output = output)
  // One independent mapper per thread slot.
  val multi = Array.fill(threads)({
    val ret = new ObjectMapper()
    ret.registerModule(DefaultScalaModule)
    ret
  })
  benchmark.addCase("Test use multi mapper") { _: Int =>
    val latch = new CountDownLatch(valuesPerIteration)
    val executor = ThreadUtils.newDaemonFixedThreadPool(threads, "multi")
    for (i <- 0 until valuesPerIteration) {
      executor.submit(new Runnable {
        override def run(): Unit = {
          val idx = i % threads
          multi(idx).writeValueAsString(map)
          latch.countDown()
        }
      })
    }
    latch.await()
    executor.shutdown()
  }
  // The same mapper instance in every slot, shared by all threads.
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)
  val singleton = Array.fill(threads)(mapper)
  benchmark.addCase("Test use singleton mapper") { _: Int =>
    val latch = new CountDownLatch(valuesPerIteration)
    val executor = ThreadUtils.newDaemonFixedThreadPool(threads, "singleton")
    for (i <- 0 until valuesPerIteration) {
      executor.submit(new Runnable {
        override def run(): Unit = {
          val idx = i % threads
          singleton(idx).writeValueAsString(map)
          latch.countDown()
        }
      })
    }
    latch.await()
    executor.shutdown()
  }
  benchmark.run()
}
def testReadJsonToMap(valuesPerIteration: Int, threads: Int): Unit = {
  // Serialize a sample map once to produce the JSON input.
  val input = {
    val map = Map("intValue" -> 1,
      "longValue" -> 2L,
      "doubleValue" -> 3.0D,
      "stringValue" -> "4",
      "floatValue" -> 5.0F,
      "booleanValue" -> true)
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    mapper.writeValueAsString(map)
  }
  val benchmark = new Benchmark(s"Test $threads threads read json to map",
    valuesPerIteration, output = output)
  // One independent mapper per thread slot.
  val multi = Array.fill(threads)({
    val ret = new ObjectMapper()
    ret.registerModule(DefaultScalaModule)
    ret
  })
  benchmark.addCase("Test use multi mapper") { _: Int =>
    val latch = new CountDownLatch(valuesPerIteration)
    val executor = ThreadUtils.newDaemonFixedThreadPool(threads, "multi")
    for (i <- 0 until valuesPerIteration) {
      executor.submit(new Runnable {
        override def run(): Unit = {
          val idx = i % threads
          multi(idx).readValue(input, classOf[mutable.HashMap[String, String]])
          latch.countDown()
        }
      })
    }
    latch.await()
    executor.shutdown()
  }
  // The same mapper instance in every slot, shared by all threads.
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)
  val singleton = Array.fill(threads)(mapper)
  benchmark.addCase("Test use singleton mapper") { _: Int =>
    val latch = new CountDownLatch(valuesPerIteration)
    val executor = ThreadUtils.newDaemonFixedThreadPool(threads, "singleton")
    for (i <- 0 until valuesPerIteration) {
      executor.submit(new Runnable {
        override def run(): Unit = {
          val idx = i % threads
          singleton(idx).readValue(input, classOf[mutable.HashMap[String, String]])
          latch.countDown()
        }
      })
    }
    latch.await()
    executor.shutdown()
  }
  benchmark.run()
}
The results from GA are as follows:
I also used a bare-metal server to test with more threads; the results are as follows:
@srowen From the above test results, there is no significant performance difference between using global and local singletons. From a code perspective, thread safety should not be guaranteed by locks, it seems guaranteed by using a new So there seems to be no problem with the global instance.
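For readers who want to try the shared-instance case outside the Spark benchmark harness, here is a minimal sketch. It assumes only Jackson and jackson-module-scala on the classpath and uses plain `java.util.concurrent` executors instead of Spark's `ThreadUtils`. The object name `SharedMapperCheck` and the method `writeConcurrently` are hypothetical and are not part of this PR. It checks that concurrent writes through one mapper produce correct output; it does not measure performance.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical helper for illustration only; not from the Spark code base.
object SharedMapperCheck {
  // One mapper, fully configured up front and never reconfigured afterwards.
  private val mapper: ObjectMapper = {
    val m = new ObjectMapper()
    m.registerModule(DefaultScalaModule)
    m
  }

  // Serializes `tasks` small maps on a pool of `threads` threads,
  // all going through the single shared mapper.
  def writeConcurrently(threads: Int, tasks: Int): Seq[String] = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val futures = (0 until tasks).map { i =>
        pool.submit(new Callable[String] {
          override def call(): String = mapper.writeValueAsString(Map("id" -> i))
        })
      }
      // Results come back in submission order.
      futures.map(_.get(30, TimeUnit.SECONDS))
    } finally {
      pool.shutdown()
    }
  }
}
```

Calling `SharedMapperCheck.writeConcurrently(4, 100)` should return 100 JSON strings, one per task, in submission order.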
To be clear, I was saying it is already thread-safe but the issue could be lock contention. It may not be an issue in Spark, and/or fixed in Jackson, but I'm looking at posts like https://medium.com/feedzaitech/when-jackson-becomes-a-parallelism-bottleneck-f1440a50b429. I agree it doesn't look like there is evidence of contention here, so probably worth the perf improvement.
FasterXML/jackson-core#349 (comment) FasterXML/jackson-core@67add8c#diff-190cbec71e87394830d19fa2fea51b3bc324aa5fe694fc036ef85d1ad39d528f It seems that Jackson 2.8.7 fixed the problems mentioned in the post. If we still have doubts about similar issues, I can limit the changes as you suggested.
According to this principle, |
I'm OK with the change. I guess I'd slightly prefer keeping the change more 'local' by having a singleton in each of the classes that need this. That minimizes the scope. I don't feel strongly about it.
@@ -39,11 +38,10 @@ import org.apache.spark.annotation.DeveloperApi
class ErrorClassesJsonReader(jsonFileURLs: Seq[URL]) {
  assert(jsonFileURLs.nonEmpty)

private lazy val mapper = Utils.withScalaModuleMapper |
Wouldn't we want these in a companion object?
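One way to read this suggestion is sketched below, assuming only Jackson and jackson-module-scala on the classpath. The class `MergeInfoCodec` and its methods are hypothetical and do not appear in the PR. The mapper lives in the companion object, so it is built once per class loader rather than once per instance.

```scala
import scala.collection.mutable

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical class for illustration only; not from the Spark code base.
class MergeInfoCodec {
  // Every instance reuses the companion's mapper instead of creating its own.
  def toJson(info: Map[String, String]): String =
    MergeInfoCodec.mapper.writeValueAsString(info)

  def fromJson(json: String): mutable.HashMap[String, String] =
    MergeInfoCodec.mapper.readValue(json, classOf[mutable.HashMap[String, String]])
}

object MergeInfoCodec {
  // Local singleton: visible only to this class and its companion,
  // configured once and not modified afterwards.
  private val mapper: ObjectMapper = {
    val m = new ObjectMapper()
    m.registerModule(DefaultScalaModule)
    m
  }
}
```

Keeping the field `private` in the companion limits sharing to one class, which matches the "local" scope preferred earlier in this conversation.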
 * Return a new `ObjectMapper` with `ClassTagExtensions`.
 * The mapper registers `DefaultScalaModule` by default.
 */
def withScalaModuleMapper: ObjectMapper with ClassTagExtensions = { |
OK, I don't know if we need to factor out a utility method just for this; I'd be OK inlining this in the ~3 places it is used.
Yes, it seems a little redundant and has been removed.
GA passed
Merged to master
thanks @srowen
What changes were proposed in this pull request?
This PR introduces local singletons for Jackson ObjectMapper that may be reused in Spark code to reduce the cost of repeatedly creating ObjectMapper.
Why are the changes needed?
Minor performance improvement.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Passes GitHub Actions.