[SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream #38428
Conversation
Can one of the admins verify this patch?
The change itself looks promising, thanks for working on it @eejbyfeldt! Given DeserializationStream is a public API, I would want to be more conservative in changes to it. Let us see how the proposal develops.
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
final override def asKeyValueIterator: Iterator[(Any, Any)] = new NextIterator[(Any, Any)] {
  override protected def getNext() = {
    if (KryoDeserializationStream.this.hasNext) {
      (readKey[Any](), readValue[Any]())
Given we are fixing this, should we not avoid assuming that if the key is present, the value will be as well?
You mean that if only a key exists we just ignore it, like the current implementation would?
Or potentially do something better.
if (hasNext) {
  val key = readKey()
  if (hasNext) {
    return (key, readValue())
  }
}
But given this is corner case enough, I would consider this change mostly a nit.
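The guarded pattern above can be sketched as a self-contained function. Note that `ToyStream`, `nextPair`, and the member names below are hypothetical stand-ins for the real `KryoDeserializationStream` members, used only to illustrate the corner case of a dangling key:

```scala
import scala.collection.mutable

// A toy "stream" of alternating keys and values, standing in for the real
// Kryo input; hasNext/readKey/readValue mirror the real stream's methods.
class ToyStream(data: Any*) {
  private val queue = mutable.Queue[Any](data: _*)
  def hasNext: Boolean = queue.nonEmpty
  def readKey(): Any = queue.dequeue()
  def readValue(): Any = queue.dequeue()
}

// Only emit a pair when both a key and a value are present, instead of
// assuming a value always follows a key.
def nextPair(s: ToyStream): Option[(Any, Any)] = {
  if (s.hasNext) {
    val key = s.readKey()
    if (s.hasNext) return Some((key, s.readValue()))
  }
  None
}
```

With this sketch, a stream that ends after a key simply yields no pair for it rather than failing mid-read.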
case e: KryoException
    if e.getMessage.toLowerCase(Locale.ROOT).contains("buffer underflow") =>
  throw new EOFException
}
Preserve this even with the proposed change of checking EOF, to continue catching cases where EOF is encountered prematurely?
This will be mainly to handle abnormal cases, instead of the common case.
Sure, I will add it back. I think that catching and ignoring the exceptions here should be revisited in some other change, as it seems to me like it could cause data loss, since we just assume the exception here means EOF.
Agree. We should investigate that - but let us do it separately from this PR, since this change will be beneficial even without that.
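The exception-translation clause being kept can be sketched in isolation. `FakeKryoException` and `readOrEof` below are hypothetical stand-ins (the real code catches `com.esotericsoftware.kryo.KryoException`); the sketch only shows the translation pattern, not Spark's actual implementation:

```scala
import java.io.EOFException
import java.util.Locale

// Hypothetical stand-in for com.esotericsoftware.kryo.KryoException.
class FakeKryoException(msg: String) extends RuntimeException(msg)

// Translate Kryo's "buffer underflow" into EOFException, mirroring the
// catch clause preserved for abnormal, premature end-of-stream cases.
def readOrEof[T](read: => T): T =
  try read catch {
    case e: FakeKryoException
        if e.getMessage.toLowerCase(Locale.ROOT).contains("buffer underflow") =>
      throw new EOFException
  }
```

Callers that already handle `EOFException` then keep working even when Kryo signals truncation via its own exception type.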
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
Force-pushed cff6396 to 017cab8
Force-pushed 93486cd to 208a430
The PR as such looks reasonable to me. Can we add a test to explicitly test for EOF behavior? +CC @JoshRosen who had worked on this in the distant past :-) I want to make sure there are more eyes on this change.
Want to see if we can make this for 3.4; more eyes on it would be good. +CC @Ngone51, @srowen, @dongjoon-hyun
Could you rebase to the master branch and run the microbenchmarks?
- https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
- https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala
We run the benchmarks via GitHub Actions. Please see the Running benchmarks in your forked repository section of our developer guide, @eejbyfeldt.
Force-pushed 208a430 to 86eda97
So I ran the benchmark:
This seems to be within the stddev of what we have on the master branch, which is expected since this code does not use the interface that uses the deserialization stream iterators. (And it used the same CPU.) For KryoSerializerBenchmark the branch had:
This is slower than what we have on master:
But it ran with a different (slower) CPU than master. I also ran the benchmark on current master:
There it was even slower, since it used an even slower CPU. Is there some way to better control the CPU used? Or should I just run the benchmark a couple of times?
I ran the master branch again and used an executor with the same CPU.
Based on this it looks like the branch might be a bit faster. But I think it might also be in noise territory, and one would need a more specific benchmark that creates a lot of small streams for the difference to show up. I think it is only expected to be on the order of a percent better in the "worst case" where we are creating lots of small streams.
Thank you for sharing the result. Without a clear win, it's hard for us to accept this proposal because this is one of the crucial parts.
Force-pushed b1253ae to aad5357
@mridulm I added a spec for this in: 77e616a
Added a benchmark that shows that there is overhead in using
Looks OK to me; @mridulm?
Could you run the benchmark once more? The generated files look wrong to me because the Java version is downgraded.
- OpenJDK 64-Bit Server VM 11.0.18+10 on Linux 5.15.0-1031-azure
- Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+ OpenJDK 64-Bit Server VM 11.0.17+8 on Linux 5.15.0-1031-azure
+ Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
According to the document, 11.0.18+10 (default) is supposed to be there.
Looks fine to me.
My plan was to update the benchmarks, but I did not get around to uploading the results until today. The branch should now be updated with an up-to-date run.
Merged to master |
val name = "Benchmark of kryo asIterator on deserialization stream"
runBenchmark(name) {
  val benchmark = new Benchmark(name, N, 10, output = output)
  Seq(true, false).map(useIterator => run(useIterator, benchmark))
nit: should use .foreach instead of .map
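The nit can be illustrated with a minimal sketch: `.map` over a side-effecting function builds a `Seq[Unit]` that is immediately discarded, while `.foreach` runs purely for the side effect. The `run` stub below is hypothetical, standing in for the benchmark's `run(useIterator, benchmark)`:

```scala
// Hypothetical stand-in for the benchmark's run(useIterator, benchmark).
var runs = 0
def run(useIterator: Boolean): Unit = { runs += 1 }

// `.map` would allocate a result collection that is thrown away;
// `.foreach` expresses the side-effecting intent and avoids it.
Seq(true, false).foreach(useIterator => run(useIterator))
```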
val elements = Array.fill[T](elementCount)(createElement)

benchmark.addCase(
  s"Colletion of $name with $elementCount elements, useIterator: $useIterator") { _ =>
Typo: Colletion -> Collection
    useIterator: Boolean,
    ser: SerializerInstance): Int = {
  val serialized: Array[Byte] = {
    val baos = new ByteArrayOutputStream()
The initial size of ByteArrayOutputStream is 32. Will the growth of the underlying byte[] and GC affect the test results? If so, is it possible to estimate a reasonable initial size?
The GC will/might make the benchmark more noisy, but it should not introduce any bias?
I guess choosing a bigger initial size would reduce the issue for some of the cases, as it would not need to resize, but I cannot see any simple way to estimate the total size in general. But maybe using a bigger initial size is better/good enough?
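Presizing the buffer is a one-line change; a minimal sketch, assuming `estimatedSize` is a hypothetical generous upper bound for the benchmark payload (the real code would need its own estimate):

```scala
import java.io.ByteArrayOutputStream

// Default initial capacity is 32 bytes; each overflow allocates a larger
// array and copies the old contents. Presizing avoids the repeated copies.
val estimatedSize = 1 << 20 // 1 MiB, an assumed upper bound for the payload
val baos = new ByteArrayOutputStream(estimatedSize)
baos.write(Array.fill[Byte](1000)(1), 0, 1000) // no resize needed
```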
What changes were proposed in this pull request?
This PR avoids exceptions in the implementation of KryoDeserializationStream.
Why are the changes needed?
Using exceptions for end of stream is slow, especially for small streams. It is also problematic because the exception caught in KryoDeserializationStream could also be caused by corrupt data, which the current implementation would just ignore.
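The two styles of detecting end of stream can be sketched side by side. This is an illustrative toy over an `Iterator[Int]`, not Spark's actual code; the expensive part of the first style is constructing and filling in a `Throwable` at the end of every stream, which dominates when many small streams are read:

```scala
import java.io.EOFException
import scala.collection.mutable.ArrayBuffer

// Exception-driven EOF: every stream ends by throwing, the pattern the
// PR removes from the common case.
def readAllWithException(data: Iterator[Int]): Seq[Int] = {
  def readNext(): Int =
    if (data.hasNext) data.next() else throw new EOFException
  val buf = ArrayBuffer[Int]()
  try { while (true) buf += readNext() } catch { case _: EOFException => }
  buf.toSeq
}

// Explicit check: ask hasNext before each read, no exception on the
// happy path, and a genuine exception can then signal real corruption.
def readAllWithCheck(data: Iterator[Int]): Seq[Int] = {
  val buf = ArrayBuffer[Int]()
  while (data.hasNext) buf += data.next()
  buf.toSeq
}
```

Both return the same elements; only the control flow at end of stream differs.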
Does this PR introduce any user-facing change?
Yes, some methods on KryoDeserializationStream no longer raise EOFException.
How was this patch tested?
Existing tests.
This PR only changes KryoDeserializationStream as a proof of concept. If this is the direction we want to go, we should probably change DeserializationStream instead so that the interface is consistent.