[#584] feat(netty): Add transport client pool for netty #771
Codecov Report
@@ Coverage Diff @@
## master #771 +/- ##
============================================
+ Coverage 60.95% 61.11% +0.16%
+ Complexity 1956 1851 -105
============================================
Files 244 226 -18
Lines 13308 10967 -2341
Branches 1119 1089 -30
============================================
- Hits 8112 6703 -1409
+ Misses 4740 3889 -851
+ Partials 456 375 -81
... and 26 files with indirect coverage changes
LGTM
@xumanbu would you mind updating the PR description to reflect the current status?
Done.
I think the content of And regarding the user changes, you could describe it as
Thanks for your guidance. Edited.
ClientPool clientPool = connectionPool.get(unresolvedAddress);
if (clientPool == null) {
  connectionPool.putIfAbsent(unresolvedAddress, new ClientPool(numConnectionsPerPeer));
  clientPool = connectionPool.get(unresolvedAddress);

Could we use computeIfAbsent to merge these two lines?

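A minimal sketch of what this suggestion amounts to, assuming a simplified stand-in `ClientPool` (only its size is modeled) and `String` keys in place of socket addresses. The class and method names below are hypothetical and are not taken from the PR:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ComputeIfAbsentSketch {
  // Hypothetical stand-in for the PR's ClientPool; only the pool size is modeled.
  static final class ClientPool {
    final int size;

    ClientPool(int size) {
      this.size = size;
    }
  }

  static final ConcurrentMap<String, ClientPool> connectionPool = new ConcurrentHashMap<>();

  // Three-step version from the diff: get, putIfAbsent, then get again.
  static ClientPool lookupWithPutIfAbsent(String address, int numConnectionsPerPeer) {
    ClientPool pool = connectionPool.get(address);
    if (pool == null) {
      connectionPool.putIfAbsent(address, new ClientPool(numConnectionsPerPeer));
      pool = connectionPool.get(address);
    }
    return pool;
  }

  // Single-call version: the mapping function only runs when the key is absent.
  static ClientPool lookupWithComputeIfAbsent(String address, int numConnectionsPerPeer) {
    return connectionPool.computeIfAbsent(address, key -> new ClientPool(numConnectionsPerPeer));
  }

  public static void main(String[] args) {
    ClientPool first = lookupWithComputeIfAbsent("host-a:19999", 2);
    ClientPool second = lookupWithComputeIfAbsent("host-a:19999", 2);
    System.out.println(first == second); // prints "true"
  }
}
```

Besides being shorter, the single-call form avoids allocating a `ClientPool` that is thrown away when another thread wins the `putIfAbsent` race.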
public TransportClientFactory(TransportContext context) {
  this.context = Preconditions.checkNotNull(context);
  this.conf = context.getConf();
  this.connectionPool = new ConcurrentHashMap<>();

Maybe we can use JavaUtils.newConcurrentMap()

OK.

public static final ConfigOption<Integer> NETTY_CLIENT_NUM_CONNECTIONS_PER_PEER = ConfigOptions
    .key("rss.client.netty.client.connections.per.peer")
    .intType()
    .defaultValue(2)

Is this too small?

I think a small default value is safe. In a Uniffle cluster with a small number of servers, a large value would cause a lot of connections to each server.

Makes sense.

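A back-of-the-envelope sketch of that concern, assuming every client process keeps a full pool open to each server it talks to. This is an illustrative assumption, as are the client counts below; the thread does not state the actual connection behavior:

```java
public class ConnectionCountSketch {
  // Connections one server would see if each client process opens
  // `connectionsPerPeer` connections to it (assumed worst case).
  static long connectionsPerServer(long clientProcesses, int connectionsPerPeer) {
    return clientProcesses * connectionsPerPeer;
  }

  public static void main(String[] args) {
    long clients = 1000; // hypothetical number of client processes
    System.out.println(connectionsPerServer(clients, 2));  // prints "2000"
    System.out.println(connectionsPerServer(clients, 16)); // prints "16000"
  }
}
```

With few servers the load cannot spread out, so the per-server count grows linearly with the configured value.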
    .withDescription("netty connect to server time out mills");

public static final ConfigOption<IOMode> NETTY_IO_MODE = ConfigOptions
    .key("rss.client.netty.io.mode")

@leixm added rss.server.netty.epoll.enable for the Netty server. Do you guys think it's better to rename that configuration to rss.server.netty.io.mode instead? That way they are more consistent.

It's OK for me, I will fix it in later PRs.

public TransportClient(Channel channel, TransportResponseHandler handler) {
  this.channel = Preconditions.checkNotNull(channel);
  this.handler = Preconditions.checkNotNull(handler);

Use java.util.Objects#requireNonNull instead?

Preconditions#checkNotNull will throw an NPE when the object is null. Preconditions may be easier to use than Objects#requireNonNull.

Preconditions#checkNotNull will throw NPE when object is null,
Objects#requireNonNull does exactly the same. The point is that we should prefer the standard library over Guava, as Guava has a reputation for breaking things across versions, which causes class conflicts for other systems that depend on a different version of Guava.

Makes sense. I'll use it instead.

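A small sketch of the standard-library alternative, using a hypothetical `Holder` class with plain `Object` fields in place of the real `TransportClient` and its Netty types:

```java
import java.util.Objects;

public class RequireNonNullSketch {
  // Hypothetical holder standing in for TransportClient's constructor arguments.
  static final class Holder {
    final Object channel;
    final Object handler;

    Holder(Object channel, Object handler) {
      // Throws NullPointerException with the given message when the argument is null.
      this.channel = Objects.requireNonNull(channel, "channel must not be null");
      this.handler = Objects.requireNonNull(handler, "handler must not be null");
    }
  }

  // Returns "ok" on success, or the NPE message when construction fails.
  static String tryCreate(Object channel, Object handler) {
    try {
      new Holder(channel, handler);
      return "ok";
    } catch (NullPointerException e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(tryCreate("channel", "handler")); // prints "ok"
    System.out.println(tryCreate(null, "handler"));      // prints "channel must not be null"
  }
}
```

Like Guava's `Preconditions.checkNotNull`, `Objects.requireNonNull` returns its argument, so it drops into the same assignment position without any other change.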
@Override
protected void handleFailure(String errorMsg, Throwable cause) {
  handler.removeRpcRequest(rpcRequestId);
  callback.onFailure(new IOException(errorMsg, cause));

Should we add logging here?

Makes sense. I'll add the same log.

The same log is already added in the parent method, so I think it's enough for now.

  return counter.getAndIncrement();
}

public class StdChannelListener implements GenericFutureListener<Future<? super Void>> {

Seems like it's better to declare it as public static class?

A non-static inner class is used for the listener so that its methods can access the attributes of the outer class, such as channel and handler.

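To make the trade-off concrete, here is a sketch with hypothetical class names, using a `String` field in place of the real Netty `Channel`. The inner class reads the outer field directly, while the static nested class has to be handed the state it needs:

```java
public class InnerClassSketch {
  private final String channel;

  InnerClassSketch(String channel) {
    this.channel = channel;
  }

  // Non-static inner class: holds an implicit reference to the enclosing
  // instance, so it can read the outer `channel` field directly.
  class InnerListener {
    String describe() {
      return "inner sees " + channel;
    }
  }

  // Static nested class: no implicit outer reference, so the state it needs
  // must be passed in explicitly.
  static class StaticListener {
    private final String channel;

    StaticListener(String channel) {
      this.channel = channel;
    }

    String describe() {
      return "static sees " + channel;
    }
  }

  InnerListener newInnerListener() {
    return new InnerListener();
  }

  StaticListener newStaticListener() {
    return new StaticListener(channel);
  }

  public static void main(String[] args) {
    InnerClassSketch outer = new InnerClassSketch("ch-1");
    System.out.println(outer.newInnerListener().describe());  // prints "inner sees ch-1"
    System.out.println(outer.newStaticListener().describe()); // prints "static sees ch-1"
  }
}
```

Either form works; the static variant makes the listener's dependencies explicit, at the cost of a longer constructor.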
public TransportClientFactory(TransportContext context) {
  this.context = Preconditions.checkNotNull(context);
  this.conf = context.getConf();
  this.connectionPool = new ConcurrentHashMap<>();

Use JavaUtils.newConcurrentMap?

synchronized (clientPool.locks[clientIndex]) {
  cachedClient = clientPool.clients[clientIndex];

  if (cachedClient != null) {
    if (cachedClient.isActive()) {
      logger.trace("Returning cached connection to {}: {}", resolvedAddress, cachedClient);
      return cachedClient;
    } else {
      logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress);
    }
  }
  clientPool.clients[clientIndex] = internalCreateClient(resolvedAddress, decoder);

Could you wrap this block of code into the ClientPool structure? Something like clientPool.createClientIfAbsent, which should be similar to ConcurrentMap's computeIfAbsent?

I'm trying to create a createClientIfAbsent function in ClientPool, like this:
  TransportClient createClientIfAbsent(
      int clientIndex,
      InetSocketAddress resolvedAddress,
      ChannelInboundHandlerAdapter decoder,
      BiFunction<InetSocketAddress, ChannelInboundHandlerAdapter, TransportClient> createClientFunction) {
    clients[clientIndex] = createClientFunction.apply(resolvedAddress, decoder);
    return clients[clientIndex];
  }
}
But it doesn't seem very graceful, because createClientFunction takes two unrelated args.

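One possible way around the two-argument function, sketched under assumptions: the caller captures the address and decoder in a `Supplier`, so the pool method only needs the slot index. `FakeClient` and this `ClientPool` are hypothetical stand-ins, not the PR's classes, and this is not necessarily what was merged:

```java
import java.util.function.Supplier;

public class ClientPoolSketch {
  // Hypothetical minimal client; the real TransportClient wraps a Netty Channel.
  static final class FakeClient {
    private volatile boolean active = true;

    boolean isActive() {
      return active;
    }

    void close() {
      active = false;
    }
  }

  static final class ClientPool {
    private final FakeClient[] clients;
    private final Object[] locks;

    ClientPool(int size) {
      clients = new FakeClient[size];
      locks = new Object[size];
      for (int i = 0; i < size; i++) {
        locks[i] = new Object();
      }
    }

    // Returns the cached client if it is still active; otherwise creates a
    // replacement under the per-slot lock, mirroring the block in the diff.
    FakeClient createClientIfAbsent(int clientIndex, Supplier<FakeClient> creator) {
      synchronized (locks[clientIndex]) {
        FakeClient cached = clients[clientIndex];
        if (cached != null && cached.isActive()) {
          return cached;
        }
        clients[clientIndex] = creator.get();
        return clients[clientIndex];
      }
    }
  }

  public static void main(String[] args) {
    ClientPool pool = new ClientPool(2);
    FakeClient first = pool.createClientIfAbsent(0, FakeClient::new);
    FakeClient again = pool.createClientIfAbsent(0, FakeClient::new);
    System.out.println(first == again); // prints "true"

    first.close(); // simulate an inactive connection
    FakeClient replaced = pool.createClientIfAbsent(0, FakeClient::new);
    System.out.println(first == replaced); // prints "false"
  }
}
```

In the factory, the call site could then look like `pool.createClientIfAbsent(index, () -> internalCreateClient(resolvedAddress, decoder))`, which keeps the address and decoder out of the pool's signature.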
…ansportClientFactory.java make sense Co-authored-by: advancedxy <xianjin@apache.org>
   * <p>After `onSuccess` returns, `response` will be recycled and its content will become invalid.
   * Please copy the content of `response` if you want to use it after `onSuccess` returns.
   */
  void onSuccess(RpcResponse rpcResponse);

In fact, not all response types are RPCResponse, such as getInMemoryShuffleData.

Will the getInMemoryShuffleData result extend RPCResponse in your design?

GetMemoryShuffleDataResponse will extend Message.

Why does GetMemoryShuffleDataResponse need to extend Message?

It seems every Response type will have status and retMsg attributes, so it can extend RPCResponse. My mistake.

We'd better add some UTs.
Spark doesn't have UTs for this either. Maybe we can use integration tests to guarantee correctness once the other PRs are merged.
Ok.
@xumanbu There is one comment left. #771 (comment) Could you address it?
My mistake. Done.
LGTM, merged. thanks all
…#771)

### What changes were proposed in this pull request?

1. Add TransportClient, the Netty RPC client.
2. Add TransportClientFactory, which manages the connection pool.
3. Add TransportContext, which contains the context to create a TransportClientFactory and sets up Netty Channel pipelines with a TransportResponseHandler.
4. Add TransportConf, the Netty transport config created from RssConf.

### Why are the changes needed?

Fix: apache#584

### Does this PR introduce _any_ user-facing change?

Add client configurations and the ability to reuse Netty clients.
Todo: update the user documentation after the netty feature is completed @xumanbu

### How was this patch tested?

Local test

Co-authored-by: jam.xu <jam.xu@vipshop.com>