diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java index 36c134b87a4..eead60f08b9 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java @@ -42,4 +42,6 @@ XceiverClientSpi acquireClient(Pipeline pipeline, boolean topologyAware) void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClient, boolean topologyAware); + XceiverClientSpi acquireClientUncached(Pipeline pipeline, boolean topologyAware) + throws Exception; } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index f77670a454a..0be839578c3 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hdds.scm; import java.io.IOException; -import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hdds.conf.Config; @@ -187,6 +186,39 @@ public XceiverClientSpi acquireClient(Pipeline pipeline, } } + /** + * Similar to acquireClient() but does not use the cache. + * User must close the client explicitly after use. + * @param pipeline pipeline + * @param topologyAware topology aware + * @return XceiverClientSpi object + * @throws IOException + */ + @Override + public XceiverClientSpi acquireClientUncached(Pipeline pipeline, + boolean topologyAware) throws + Exception { + HddsProtos.ReplicationType type = pipeline.getType(); + XceiverClientSpi client = null; + switch (type) { + case RATIS: + client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf, + trustManager); + break; + case STAND_ALONE: + client = new XceiverClientGrpc(pipeline, conf, trustManager); + break; + case EC: + client = new ECXceiverClientGrpc(pipeline, conf, trustManager); + break; + case CHAINED: + default: + throw new IOException("not implemented " + pipeline.getType()); + } + client.connect(); + return client; + } + /** * Releases a XceiverClientSpi after use. * @@ -229,34 +261,12 @@ public void releaseClient(XceiverClientSpi client, boolean invalidateClient, private XceiverClientSpi getClient(Pipeline pipeline, boolean topologyAware) throws IOException { - HddsProtos.ReplicationType type = pipeline.getType(); try { // create different client different pipeline node based on // network topology String key = getPipelineCacheKey(pipeline, topologyAware); - return clientCache.get(key, new Callable() { - @Override - public XceiverClientSpi call() throws Exception { - XceiverClientSpi client = null; - switch (type) { - case RATIS: - client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf, - trustManager); - break; - case STAND_ALONE: - client = new XceiverClientGrpc(pipeline, conf, trustManager); - break; - case EC: - client = new ECXceiverClientGrpc(pipeline, conf, trustManager); - break; - case CHAINED: - default: - throw new IOException("not implemented " + pipeline.getType()); - } - client.connect(); - return client; - } - }); + return clientCache.get(key, + () -> acquireClientUncached(pipeline, topologyAware)); } catch (Exception e) { throw new IOException( "Exception getting XceiverClient: " + e.toString(), e); diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java index 6edcca65f92..17f1888dd26 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java @@ -121,6 +121,12 @@ public XceiverClientSpi acquireClient(Pipeline pipeline, return mockXceiverClientSpi; } + @Override + public XceiverClientSpi acquireClientUncached(Pipeline pipeline, + boolean topologyAware) throws IOException { + return acquireClient(pipeline, topologyAware); + } + @Override public void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClient, boolean topologyAware) { diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DNRPCLoadGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DNRPCLoadGenerator.java index 5d3d3af9e1c..39f315daf43 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DNRPCLoadGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DNRPCLoadGenerator.java @@ -160,7 +160,7 @@ public Void call() throws Exception { } clients = new ArrayList<>(numClients); for (int i = 0; i < numClients; i++) { - clients.add(xceiverClientManager.acquireClient(pipeline)); + clients.add(xceiverClientManager.acquireClientUncached(pipeline, true)); } init(); @@ -171,7 +171,7 @@ public Void call() throws Exception { runTests(this::sendRPCReq); } finally { for (XceiverClientSpi client : clients) { - xceiverClientManager.releaseClient(client, false); + client.close(); } xceiverClientManager.close(); scmClient.close();