-
Notifications
You must be signed in to change notification settings - Fork 28k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-12007] [network] Avoid copies in the network lib's RPC layer. #9987
Conversation
This change seems large, but most of it is just replacing `byte[]` with `ByteBuffer` and `new byte[]` with `ByteBuffer.allocate()`, since it changes the network library's API. The following are parts of the code that actually have meaningful changes: - The Message implementations were changed to inherit from a new AbstractMessage that can optionally hold a reference to a body (in the form of a ManagedBuffer); this is similar to how ResponseWithBody worked before, except now it's not restricted to just responses. - The TransportFrameDecoder was pretty much rewritten to avoid copies as much as possible; it doesn't rely on CompositeByteBuf to accumulate incoming data anymore, since CompositeByteBuf has issues when slices are retained. The code now is able to create frames without having to resort to copying bytes except for a few bytes (containing the frame length) in very rare cases. - Some minor changes in the SASL layer to convert things back to `byte[]` since the JDK SASL API operates on those.
Test build #46729 has finished for PR 9987 at commit
|
retest this please |
Test build #46844 has finished for PR 9987 at commit
|
Test build #46846 has finished for PR 9987 at commit
|
This is really cool. I was thinking to do this this week :) |
public abstract class ResponseWithBody implements ResponseMessage { | ||
public final ManagedBuffer body; | ||
public final boolean isBodyInFrame; | ||
public abstract class AbstractResponseMessage extends AbstractMessage implements ResponseMessage { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks AbstractResponseMessage
is not necessary if we move createFailureResponse
to ResponseMessage
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would require dummy implementations in ChunkFetchFailure
, RpcFailure
and StreamFailure
...
I actually don't see a lot of value in having separate interfaces for ResponseMessage
and RequestMessage
; this is not Scala, so you can't have a sealed trait
and have the compiler help you when you miss something in a match. So you could just have Message
with no need for any of the other interfaces / abstract classes. But I didn't want to do that cleanup in this change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okey, let's keep it.
LGTM |
OK, I'd like to merge this into 1.6 as well to further mitigate the scheduler throughput regression. |
Did you measure the effect this change has on the test? |
I think @zsxwing did some local benchmarks and saw like a 10% improvement |
Merging into master 1.6, thanks for fixing this @vanzin. |
This change seems large, but most of it is just replacing `byte[]` with `ByteBuffer` and `new byte[]` with `ByteBuffer.allocate()`, since it changes the network library's API. The following are parts of the code that actually have meaningful changes: - The Message implementations were changed to inherit from a new AbstractMessage that can optionally hold a reference to a body (in the form of a ManagedBuffer); this is similar to how ResponseWithBody worked before, except now it's not restricted to just responses. - The TransportFrameDecoder was pretty much rewritten to avoid copies as much as possible; it doesn't rely on CompositeByteBuf to accumulate incoming data anymore, since CompositeByteBuf has issues when slices are retained. The code now is able to create frames without having to resort to copying bytes except for a few bytes (containing the frame length) in very rare cases. - Some minor changes in the SASL layer to convert things back to `byte[]` since the JDK SASL API operates on those. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #9987 from vanzin/SPARK-12007. (cherry picked from commit 9bf2120) Signed-off-by: Andrew Or <andrew@databricks.com>
16/02/26 10:35:04 WARN TaskSetManager: Lost task 213.0 in stage 1.0 (TID 16971, SZV1000041645): FetchFailed(BlockManagerId(118, 192.168.75.193, 23325), shuffleId=0, mapId=51, reduceId=213, message=
org.apache.spark.shuffle.FetchFailedException: java.lang.RuntimeException: javax.security.sasl.SaslException: DIGEST-MD5: digest response format violation. Mismatched nonce.
at org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
at org.apache.spark.network.sasl.SparkSaslServer.response(SparkSaslServer.java:121)
at org.apache.spark.network.sasl.SaslRpcHandler.receive(SaslRpcHandler.java:100)
at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:128)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:99)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745) When I test word count on a larger cluster(100 workers), these exception occurs every time, but on a small cluster never. Do you have any idea about this? Thanks! |
@yaooqinn please don't ask questions on github, especially on old, closed PRs. It's confusing. |
This change seems large, but most of it is just replacing
byte[]
with
ByteBuffer
andnew byte[]
withByteBuffer.allocate()
,since it changes the network library's API.
The following are parts of the code that actually have meaningful
changes:
AbstractMessage that can optionally hold a reference to a body
(in the form of a ManagedBuffer); this is similar to how
ResponseWithBody worked before, except now it's not restricted
to just responses.
copies as much as possible; it doesn't rely on CompositeByteBuf
to accumulate incoming data anymore, since CompositeByteBuf
has issues when slices are retained. The code now is able to
create frames without having to resort to copying bytes except
for a few bytes (containing the frame length) in very rare cases.
byte[]
since the JDK SASL API operates on those.