[#133] feat(netty): Add Netty Utils #727
Conversation
@smallzhongfeng @jerqi can you help review, please?
```java
public void test() {
  MockClient client = new MockClient();
  JavaUtils.closeQuietly(client);
}
```
Weird indentation.
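For context, here is a minimal sketch of the usual closeQuietly idiom the test relies on, assuming JavaUtils follows the common pattern (the actual implementation may differ):

```java
import java.io.Closeable;
import java.io.IOException;

// Sketch only: close the resource and swallow IOExceptions so cleanup in
// tests and finally-blocks never throws (assumed to match JavaUtils' intent).
public static void closeQuietly(Closeable closeable) {
  try {
    if (closeable != null) {
      closeable.close();
    }
  } catch (IOException e) {
    // intentionally ignored; a "quiet" close is best-effort cleanup
  }
}
```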
```java
/** Creates a new ThreadFactory which prefixes each thread with the given name. */
public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
  return new DefaultThreadFactory(threadPoolPrefix, true);
```
This thread factory will produce non-daemon threads. If non-daemon threads don't stop, they will block JVM exit.
I have set the second parameter daemon=true.
So it will produce daemon threads.
OK, I got it.
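A quick illustrative check (class and pool names are ours, not from the PR) that daemon=true propagates to every thread the factory creates, so lingering Netty threads cannot block JVM exit:

```java
import java.util.concurrent.ThreadFactory;
import io.netty.util.concurrent.DefaultThreadFactory;

// Illustration: the second constructor argument of Netty's
// DefaultThreadFactory controls the daemon flag of each created thread.
public class DaemonFactoryCheck {
  public static void main(String[] args) {
    ThreadFactory factory = new DefaultThreadFactory("netty-rpc", true);
    Thread t = factory.newThread(() -> {});
    System.out.println(t.getName() + " daemon=" + t.isDaemon()); // daemon=true
  }
}
```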
Could we use ThreadUtils.getThreadFactory(String threadPoolPrefix)?
Codecov Report
```
@@             Coverage Diff              @@
##             master     #727      +/-   ##
============================================
+ Coverage     60.60%   62.81%    +2.20%
- Complexity     1849     1894       +45
============================================
  Files           229      224        -5
  Lines         12749    11035     -1714
  Branches       1064     1090       +26
============================================
- Hits           7727     6932      -795
+ Misses         4611     3727      -884
+ Partials        411      376       -35
```
... and 21 files with indirect coverage changes
cc @jerqi Why does profile=spark2.3 need to set the version of netty separately?
The constructor of netty 4.1.47.Final's PooledByteBufAllocator is inconsistent with later versions. Can we use the same netty version everywhere? Maybe there are more problems.
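For reference, one concrete symptom of the version gap discussed below is the tiny-size cache: Spark's reflection expects the PooledByteBufAllocator field DEFAULT_TINY_CACHE_SIZE, which no longer exists in netty > 4.1.47. A hedged sketch of a version-tolerant fallback (illustrative only, not code from this PR):

```java
import java.lang.reflect.Field;
import io.netty.buffer.PooledByteBufAllocator;

// Illustrative fallback: read DEFAULT_TINY_CACHE_SIZE reflectively on older
// netty, and fall back to 0 on newer versions where the field (and the tiny
// cache itself) was removed, instead of failing with NoSuchFieldException.
static int defaultTinyCacheSize() {
  try {
    Field f = PooledByteBufAllocator.class.getDeclaredField("DEFAULT_TINY_CACHE_SIZE");
    f.setAccessible(true);
    return f.getInt(null);
  } catch (NoSuchFieldException | IllegalAccessException e) {
    return 0; // tiny caches were dropped in newer netty; 0 disables them
  }
}
```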
```java
/** Creates a Netty EventLoopGroup based on the IOMode. */
public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String threadPrefix) {
  ThreadFactory threadFactory = createThreadFactory(threadPrefix);
```
Ditto.
The DefaultThreadFactory implemented by Netty seems to have more considerations; it uses FastThreadLocalRunnable.
I think we'd better use the DefaultThreadFactory provided by Netty.
OK, maybe we can move this method to ThreadUtils?
Because I think the two methods are the same.
> Because I think the two methods are the same.

That's a good idea.
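If the method does move, a minimal sketch of the consolidated helper (the name getNettyThreadFactory and its placement in ThreadUtils are assumptions, not the PR's final API):

```java
import java.util.concurrent.ThreadFactory;
import io.netty.util.concurrent.DefaultThreadFactory;

// Hypothetical consolidated helper: keep Netty's DefaultThreadFactory (whose
// threads run tasks via FastThreadLocalRunnable) behind a single ThreadUtils
// entry point so both call sites share one method.
public static ThreadFactory getNettyThreadFactory(String threadPoolPrefix) {
  return new DefaultThreadFactory(threadPoolPrefix, true); // daemon threads
}
```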
It should be used for tests. If we change the version and the tests pass, it's OK to remove the version override.
OK, I got it.
```java
@Override
public String toString() {
  return "RpcResponse{"
      + "requestId=" + requestId
```
Please fix the indentation.
Done.
```java
    throw ex;
  }
} finally {
  serializedMsgBuf.release();
```
If this code `return ctx.writeAndFlush(responseMsgBuf);` is executed, serializedMsgBuf will not be released, right?
Maybe we don't need serializedMsgBuf; I will delete it.
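A minimal sketch of the ownership rule under discussion, assuming a Message type with encodedLength() and encode(ByteBuf) (both assumptions for illustration): once a buffer is handed to writeAndFlush, Netty releases it after the write completes, so a manual release is only needed on the failure path before the handoff.

```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;

// Sketch: release the buffer ourselves only if encoding fails before the
// handoff; after writeAndFlush, Netty owns the buffer and releases it.
public static ChannelFuture writeSafely(ChannelHandlerContext ctx, Message msg) {
  ByteBuf buf = ctx.alloc().buffer(msg.encodedLength());
  try {
    msg.encode(buf);               // assumed Message#encode(ByteBuf)
    return ctx.writeAndFlush(buf); // ownership transfers to Netty here
  } catch (RuntimeException | Error e) {
    buf.release();                 // avoid a leak on the failure path
    throw e;
  }
}
```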
cc @smallzhongfeng, can you help review, please?
I think that's a flaky test.
Yes.
```java
@Override
public int encodedLength() {
  return 0;
}
```
Why does this method return 0?
I will fix it. Actually, the current Encoder and Decoder are not implemented yet and do not depend on this method.
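For illustration, a sketch of what a real encodedLength() might compute for an RpcResponse-like message (the field layout — requestId, status code, retMessage — is assumed, not taken from the PR):

```java
import java.nio.charset.StandardCharsets;

// Sketch: return the exact number of bytes encode() will write, so callers
// can size buffers precisely instead of guessing.
@Override
public int encodedLength() {
  byte[] retBytes = retMessage == null
      ? new byte[0] : retMessage.getBytes(StandardCharsets.UTF_8);
  return Long.BYTES        // requestId
      + Integer.BYTES      // status code
      + Integer.BYTES      // retMessage length prefix
      + retBytes.length;   // retMessage payload
}
```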
```java
public static ChannelFuture writeResponseMsg(ChannelHandlerContext ctx, Message msg, boolean doWriteType) {
  ByteBuf responseMsgBuf = ctx.alloc().buffer(1000);
```
Why is the value 1000?
I will set it to 64. In fact, ByteBuf automatically resizes when its capacity is insufficient; setting an initial value avoids frequent resizing.
How do we ensure a proper initial size?
> How do we ensure a proper initial size?

It's hard to say for sure. Some messages may be smaller than 64 bytes and some may be larger. Maybe we can use the encodedLength of RpcResponse as the initial value, because most responses are RpcResponse, assuming the retMessage of RpcResponse is empty.
> How do we ensure a proper initial size?
>
> It's hard to say for sure. Some messages may be smaller than 64 bytes and some may be larger. Maybe we can use the encodedLength of RpcResponse as the initial value, because most responses are RpcResponse, assuming the retMessage of RpcResponse is empty.

Could we refer to Spark?
In fact, we can use encodedLength() as the initial size, which is a determined size, and all Messages will implement this method.
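Putting that together, a sketch of sizing the buffer from the message itself (the frame-header layout is an assumption for illustration):

```java
// Sketch: derive the initial capacity from encodedLength() plus an assumed
// frame header (body length as int, message type as byte) rather than a
// magic number like 1000 or 64. The buffer can still grow if needed.
int headerLength = Integer.BYTES + Byte.BYTES;
ByteBuf responseMsgBuf = ctx.alloc().buffer(headerLength + msg.encodedLength());
```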
One more flaky test.
LGTM, thanks @smallzhongfeng @leixm
```java
/**
 * Copied from Spark in order to override the createPooledByteBufAllocator method;
 * the property DEFAULT_TINY_CACHE_SIZE does not exist in netty > 4.1.47.
 */
public class NettyUtils {
```
Is there any other way to achieve this? I believe we should avoid this kind of code duplication as much as possible unless there's no other way.
Yes, I agree. The key problem is that Spark 2.3 is not compatible with a higher version of Netty (4.1.68.Final), while the Uniffle client needs to use Netty 4.1.68.Final; this is a low-cost workaround. Or do you have a better suggestion?
So could we simply add createPooledByteBufAllocator and its related methods here? Could the other methods be reused?
We have shaded Netty in Grpc.

```xml
<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-netty-shaded</artifactId>
  <version>${grpc.version}</version>
</dependency>
```

Could we use this Netty?
It doesn't seem appropriate; the grpc dependency may be removed once the Netty version stabilizes in the future.
> So could we simply add createPooledByteBufAllocator and its related methods here? Could the other methods be reused?

You're right.
> It doesn't seem appropriate; the grpc dependency may be removed once the Netty version stabilizes in the future.

Grpc and Netty will coexist in the future. Because Grpc is very convenient and has better compatibility, we will still use Grpc for RPCs that don't need high performance, such as the control plane.
It looks OK; grpc 1.47 uses netty 4.1.72.Final. Link: https://github.com/grpc/grpc-java/blob/v1.47.x/repositories.bzl
We found a compatibility issue when using the Spark 2.1.0 version. The following error will be thrown in the Spark driver's log:
```
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.network.util.NettyUtils.createEventLoop(Lorg/apache/spark/network/util/IOMode;ILjava/lang/String;)Lio/netty/channel/EventLoopGroup;
	at org.apache.spark.network.client.TransportClientFactory.<init>(TransportClientFactory.java:104)
	at org.apache.spark.network.TransportContext.createClientFactory(TransportContext.java:89)
	at org.apache.spark.rpc.netty.NettyRpcEnv.<init>(NettyRpcEnv.scala:70)
	at org.apache.spark.rpc.netty.NettyRpcEnvFactory.create(NettyRpcEnv.scala:449)
	at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:56)
	at org.apache.spark.SparkEnv$.create(SparkEnv.scala:264)
	at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
	at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:271)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:474)
	at org.apache.spark.deploy.yarn.SQLApplicationMaster.<init>(SQLApplicationMaster.scala:96)
	at org.apache.spark.deploy.yarn.SQLApplicationMaster.<init>(SQLApplicationMaster.scala:53)
	at org.apache.spark.deploy.yarn.SQLApplicationMaster$$anonfun$main$1.apply$mcV$sp(SQLApplicationMaster.scala:544)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:2337)
	at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:60)
	at org.apache.spark.deploy.yarn.SQLApplicationMaster$.main(SQLApplicationMaster.scala:543)
	at org.apache.spark.deploy.yarn.SQLApplicationMaster.main(SQLApplicationMaster.scala)
```
This is because the return value of `createEventLoop` in `NettyUtils` within Uniffle is `org.apache.uniffle.io.netty.channel.EventLoopGroup` (which is shaded), while the return value of `createEventLoop` in `NettyUtils` within Spark is `io.netty.channel.EventLoopGroup`. When running a Spark application, the Driver loads `NettyUtils` from the rss-client's JAR, causing inconsistency in the method's return values and ultimately leading to a `NoSuchMethodError` exception.
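A hypothetical way to confirm this diagnosis from a driver-side scratch class (not part of the PR): print which JAR supplied `NettyUtils` and the declared return type of `createEventLoop`.

```java
import java.lang.reflect.Method;

// Hypothetical diagnostic: shows whether the relocated rss-client copy of
// NettyUtils shadowed Spark's own class, and what return type it declares.
public class NettyUtilsProbe {
  public static void main(String[] args) throws Exception {
    Class<?> c = Class.forName("org.apache.spark.network.util.NettyUtils");
    System.out.println("loaded from: "
        + c.getProtectionDomain().getCodeSource().getLocation());
    for (Method m : c.getDeclaredMethods()) {
      if (m.getName().equals("createEventLoop")) {
        // io.netty.channel.EventLoopGroup vs org.apache.uniffle.io.netty.channel.EventLoopGroup
        System.out.println("returns: " + m.getReturnType().getName());
      }
    }
  }
}
```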
I will take a look. It seems there is a class conflict problem that was not caught by the UTs.
When we package the shaded client, the Netty dependency is prefixed with `org.apache.uniffle`, causing Spark to throw a NoSuchMethodError when looking for `org.apache.spark.network.util.NettyUtils.createEventLoop`, because the return value of this method is `org.apache.uniffle.io.netty.channel.EventLoopGroup` instead of `io.netty.channel.EventLoopGroup`. See the debug image.
Besides Spark 2.1, we also found this error when using Spark 2.4. Please have a look at both Spark versions. Thanks. @leixm
### What changes were proposed in this pull request?

When we release the shaded client jar for Spark 2.x, the class `org.apache.spark.network.util.NettyUtils.class` should not be included in the package.

### Why are the changes needed?

Fix #1567. It's also a followup PR for #727. When running in Spark 2.4, we will encounter exceptions as below:

```
24/03/07 16:34:37 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(tdwadmin); groups with view permissions: Set(); users with modify permissions: Set(tdwadmin); groups with modify permissions: Set()
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.network.util.NettyUtils.createEventLoop(Lorg/apache/spark/network/util/IOMode;ILjava/lang/String;)Lio/netty/channel/EventLoopGroup;
	at org.apache.spark.network.client.TransportClientFactory.<init>(TransportClientFactory.java:104)
	at org.apache.spark.network.TransportContext.createClientFactory(TransportContext.java:89)
	at org.apache.spark.rpc.netty.NettyRpcEnv.<init>(NettyRpcEnv.scala:70)
	at org.apache.spark.rpc.netty.NettyRpcEnvFactory.create(NettyRpcEnv.scala:449)
	at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:56)
	at org.apache.spark.SparkEnv$.create(SparkEnv.scala:264)
	at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
	at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:271)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:474)
	at org.apache.spark.deploy.yarn.SQLApplicationMaster.<init>(SQLApplicationMaster.scala:96)
	at org.apache.spark.deploy.yarn.SQLApplicationMaster.<init>(SQLApplicationMaster.scala:53)
	at org.apache.spark.deploy.yarn.SQLApplicationMaster$$anonfun$main$1.apply$mcV$sp(SQLApplicationMaster.scala:544)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:2286)
	at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:60)
	at org.apache.spark.deploy.yarn.SQLApplicationMaster$.main(SQLApplicationMaster.scala:543)
	at org.apache.spark.deploy.yarn.SQLApplicationMaster.main(SQLApplicationMaster.scala)
```

This is because the return value of `createEventLoop` in `NettyUtils` within Uniffle is `org.apache.uniffle.io.netty.channel.EventLoopGroup` (which is shaded), while the return value of `createEventLoop` in `NettyUtils` within Spark is `io.netty.channel.EventLoopGroup`. When running a Spark application, the Driver loads `NettyUtils` from the rss-client's JAR, causing inconsistency in the method's return values and ultimately leading to a `NoSuchMethodError` exception.

We should let Spark use its own `NettyUtils` instead of ours. However, if we simply remove the `org.apache.spark.network.util.NettyUtils` file from the code repository, we will encounter errors when running integration tests.

```
java.lang.RuntimeException: java.lang.NoSuchFieldException: DEFAULT_TINY_CACHE_SIZE
	at org.apache.spark.network.util.NettyUtils.getPrivateStaticField(NettyUtils.java:131)
	at org.apache.spark.network.util.NettyUtils.createPooledByteBufAllocator(NettyUtils.java:118)
	at org.apache.spark.network.server.TransportServer.init(TransportServer.java:94)
	at org.apache.spark.network.server.TransportServer.<init>(TransportServer.java:73)
	at org.apache.spark.network.TransportContext.createServer(TransportContext.java:114)
	at org.apache.spark.rpc.netty.NettyRpcEnv.startServer(NettyRpcEnv.scala:119)
	at org.apache.spark.rpc.netty.NettyRpcEnvFactory$$anonfun$4.apply(NettyRpcEnv.scala:465)
	at org.apache.spark.rpc.netty.NettyRpcEnvFactory$$anonfun$4.apply(NettyRpcEnv.scala:464)
	at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:2275)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
	at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:2267)
	at org.apache.spark.rpc.netty.NettyRpcEnvFactory.create(NettyRpcEnv.scala:469)
	at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:57)
	at org.apache.spark.SparkEnv$.create(SparkEnv.scala:249)
	at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:175)
	at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:256)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:423)
	at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2493)
	at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:934)
	at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:925)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:925)
	at org.apache.uniffle.test.SparkIntegrationTestBase.runSparkApp(SparkIntegrationTestBase.java:92)
	at org.apache.uniffle.test.SparkIntegrationTestBase.run(SparkIntegrationTestBase.java:53)
	at org.apache.uniffle.test.RSSStageResubmitTest.testRSSStageResubmit(RSSStageResubmitTest.java:86)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
```

This is because our code project _**globally controls**_ the version of Netty in the root `pom.xml`'s `dependencyManagement`, which leads to Spark2's own lower version of Netty being upgraded to a higher version. This causes exceptions due to Netty version incompatibility, resulting in certain fields not being found. This issue does not occur in the production environment, as Spark has its own `NettyUtils` and does not need to use our provided `NettyUtils`.

Retaining `org.apache.spark.network.util.NettyUtils` is somewhat of a workaround for passing integration tests. But given that Spark2 is not frequently updated anymore, maintaining a static version of `NettyUtils` should not pose a significant problem.

Of course, the optimal approach would be to shade our own Netty during integration testing, allowing Spark to continue using its own Netty dependency, effectively separating the two. This would provide the most accurate testing, as any changes in Spark2's Netty version could be verified through unit tests. However, this would mean that a large amount of integration test code would need to prefix `org.apache.uniffle` to the `import` statements where Netty is used. Ultimately, this could lead to significant redundancy in the code and cause confusion for those who write code in the future.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.
### What changes were proposed in this pull request?

Add netty utils.

### Why are the changes needed?

Add netty utils in preparation for replacing grpc with netty.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

UT.