From 99c6e6398b56e77815a816a283c860cd966f439f Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Wed, 5 Sep 2018 23:36:24 +0800 Subject: [PATCH 01/23] make dubbo support multiple shared links, upgrading RPC throughput --- .../org/apache/dubbo/common/Constants.java | 6 +- .../rpc/protocol/dubbo/DubboProtocol.java | 84 ++++++++++--------- 2 files changed, 49 insertions(+), 41 deletions(-) diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java index 95069306907..4b97c756be4 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java @@ -116,13 +116,15 @@ public class Constants { public static final int DEFAULT_THREADS = 200; - public static final boolean DEFAULT_KEEP_ALIVE = true; + public static final boolean DEFAULT_KEEP_ALIVE = true;````` public static final int DEFAULT_QUEUES = 0; public static final int DEFAULT_ALIVE = 60 * 1000; - public static final int DEFAULT_CONNECTIONS = 0; + public static final String DEFAULT_CONNECTIONS = "1"; + + public static final String DEFAULT_CONNECTIONS_KEY = "default.connections.key"; public static final int DEFAULT_ACCEPTS = 0; diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index d7fe35d3b9c..c4d33695042 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -21,10 +21,7 @@ import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.common.serialize.support.SerializableClassRegistry; import org.apache.dubbo.common.serialize.support.SerializationOptimizer; -import org.apache.dubbo.common.utils.ConcurrentHashSet; -import org.apache.dubbo.common.utils.ConfigUtils; -import org.apache.dubbo.common.utils.NetUtils; -import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.common.utils.*; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.RemotingException; import org.apache.dubbo.remoting.Transporter; @@ -47,11 +44,7 @@ import org.apache.dubbo.rpc.protocol.AbstractProtocol; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -67,9 +60,8 @@ public class DubboProtocol extends AbstractProtocol { private static final String IS_CALLBACK_SERVICE_INVOKE = "_isCallBackServiceInvoke"; private static DubboProtocol INSTANCE; private final Map serverMap = new ConcurrentHashMap(); // - private final Map referenceClientMap = new ConcurrentHashMap(); // + private final Map> referenceClientMap = new ConcurrentHashMap<>(); // private final ConcurrentMap ghostClientMap = new ConcurrentHashMap(); - private final ConcurrentMap locks = new ConcurrentHashMap(); private final Set optimizers = new ConcurrentHashSet(); //consumer side export a stub service for dispatching event //servicekey-stubmethods @@ -364,10 +356,13 @@ private ExchangeClient[] getClients(URL url) { // whether to share connection boolean service_share_connect = false; int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); + List shareClients = null; // if not configured, connection is shared, otherwise, one connection for one service if (connections == 0) { service_share_connect = true; - connections = 1; + connections = Integer.parseInt(ConfigUtils.getProperty(Constants.DEFAULT_CONNECTIONS_KEY, + Constants.DEFAULT_CONNECTIONS)); + shareClients = getSharedClient(url, connections); } ExchangeClient[] clients = new ExchangeClient[connections]; @@ -384,31 +379,37 @@ private ExchangeClient[] getClients(URL url) { /** * Get shared connection */ - private ExchangeClient getSharedClient(URL url) { + private List getSharedClient(URL url) { String key = url.getAddress(); - ReferenceCountExchangeClient client = referenceClientMap.get(key); - if (client != null) { - if (!client.isClosed()) { - client.incrementAndGetCount(); - return client; - } else { - referenceClientMap.remove(key); - } + List clients = referenceClientMap.get(key); + if(clients == null) { + List referenceCountExchangeClients = buildReferenceCountExchangeClientList(url, key, connectNum); + referenceClientMap.put(key, referenceCountExchangeClients); + + clients = referenceCountExchangeClients; } - locks.putIfAbsent(key, new Object()); - synchronized (locks.get(key)) { - if (referenceClientMap.containsKey(key)) { - return referenceClientMap.get(key); + for (int i = 0; i < clients.size(); i++) { + ReferenceCountExchangeClient client = clients.get(i); + if (client.isClosed()){ + client = buildReferenceCountExchangeClient(url, key); + clients.set(i, client); } - ExchangeClient exchangeClient = initClient(url); - client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap); - referenceClientMap.put(key, client); - ghostClientMap.remove(key); - locks.remove(key); - return client; + client.incrementAndGetCount(); } + + return clients; + } + + private ReferenceCountExchangeClient buildReferenceCountExchangeClient (URL url, String key) { + + ExchangeClient exchangeClient = initClient(url); + + ReferenceCountExchangeClient client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap); + ghostClientMap.remove(key); + + return client; } /** @@ -460,15 +461,20 @@ public void destroy() { } for (String key : new ArrayList(referenceClientMap.keySet())) { - ExchangeClient client = referenceClientMap.remove(key); - if (client != null) { - try { - if (logger.isInfoEnabled()) { - logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); + List clients = referenceClientMap.remove(key); + + if(CollectionUtils.isNotEmpty(clients)) { + for (ReferenceCountExchangeClient client : clients) { + if (client != null) { + try { + if (logger.isInfoEnabled()) { + logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); + } + client.close(); + } catch (Throwable t) { + logger.warn(t.getMessage(), t); + } } - client.close(ConfigUtils.getServerShutdownTimeout()); - } catch (Throwable t) { - logger.warn(t.getMessage(), t); } } } From 32197c57a014ead5ea7c536e39972e2dab3e9713 Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Thu, 6 Sep 2018 19:18:39 +0800 Subject: [PATCH 02/23] Fix compilation error --- .../src/main/java/org/apache/dubbo/common/Constants.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java index 4b97c756be4..1104aaee90f 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java @@ -116,7 +116,7 @@ public class Constants { public static final int DEFAULT_THREADS = 200; - public static final boolean DEFAULT_KEEP_ALIVE = true;````` + public static final boolean DEFAULT_KEEP_ALIVE = true; public static final int DEFAULT_QUEUES = 0; From 5ecdbe744989ee02d25f05104ebf551f077aaa8a Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Thu, 6 Sep 2018 19:26:42 +0800 Subject: [PATCH 03/23] Fix compilation error --- .../rpc/protocol/dubbo/DubboProtocol.java | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index c4d33695042..ab2c76b344c 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -364,11 +364,10 @@ private ExchangeClient[] getClients(URL url) { Constants.DEFAULT_CONNECTIONS)); shareClients = getSharedClient(url, connections); } - ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { - if (service_share_connect) { - clients[i] = getSharedClient(url); + if (service_share_connect){ + clients[i] = shareClients.get(i); } else { clients[i] = initClient(url); } @@ -379,7 +378,7 @@ private ExchangeClient[] getClients(URL url) { /** * Get shared connection */ - private List getSharedClient(URL url) { + private List getSharedClient(URL url, int connectNum) { String key = url.getAddress(); List clients = referenceClientMap.get(key); if(clients == null) { @@ -402,6 +401,17 @@ private List getSharedClient(URL url) { return clients; } + private List buildReferenceCountExchangeClientList (URL url, String key, int connectNum) { + + List clients = new ArrayList(connectNum); + + for (int i = 0; i < connectNum; i++) { + clients.add(buildReferenceCountExchangeClient(url, key)); + } + + return clients; + } + private ReferenceCountExchangeClient buildReferenceCountExchangeClient (URL url, String key) { ExchangeClient exchangeClient = initClient(url); From 71c1ac23fa84e86af92b83dea324afeeb4d2490b Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Thu, 6 Sep 2018 19:30:29 +0800 Subject: [PATCH 04/23] opti import --- .../org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index ab2c76b344c..3e95f34ae38 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -18,10 +18,14 @@ import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.utils.ConcurrentHashSet; +import org.apache.dubbo.common.utils.NetUtils; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.common.utils.ConfigUtils; +import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.common.serialize.support.SerializableClassRegistry; import org.apache.dubbo.common.serialize.support.SerializationOptimizer; -import org.apache.dubbo.common.utils.*; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.RemotingException; import org.apache.dubbo.remoting.Transporter; From 1467d21fb0fe29953e2645bc5fa685061626016d Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Thu, 6 Sep 2018 20:11:47 +0800 Subject: [PATCH 05/23] if add {} --- .../org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index 3e95f34ae38..610a39810f0 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -221,8 +221,9 @@ Invoker getInvoker(Channel channel, Invocation inv) throws RemotingException DubboExporter exporter = (DubboExporter) exporterMap.get(serviceKey); - if (exporter == null) + if (exporter == null) { throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv); + } return exporter.getInvoker(); } @@ -293,8 +294,9 @@ private ExchangeServer createServer(URL url) { url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); - if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) + if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported server type: " + str + ", url: " + url); + } url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); ExchangeServer server; From 52a3ac78d9b55212e02362b1d22a5fa1c7eb350a Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Thu, 6 Sep 2018 23:52:11 +0800 Subject: [PATCH 06/23] checkstyle fail --- .../org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java | 3 ++- .../org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java | 7 ++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java index 7c574ffb856..93343c0c26f 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java @@ -116,8 +116,9 @@ protected Result doInvoke(final Invocation invocation) throws Throwable { @Override public boolean isAvailable() { - if (!super.isAvailable()) + if (!super.isAvailable()) { return false; + } for (ExchangeClient client : clients) { if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)) { //cannot write == not Available ? diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index 610a39810f0..25e1bd3b07f 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -48,7 +48,12 @@ import org.apache.dubbo.rpc.protocol.AbstractProtocol; import java.net.InetSocketAddress; -import java.util.*; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.ArrayList; +import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; From 1cb48fcb074ee27b1b16701bae79b64079220a70 Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Fri, 7 Sep 2018 16:22:38 +0800 Subject: [PATCH 07/23] fix getSharedClient referenceCount calculation error bug --- .../rpc/protocol/dubbo/DubboProtocol.java | 54 +++++++------------ 1 file changed, 20 insertions(+), 34 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index 25e1bd3b07f..6e6bbdf8668 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -18,42 +18,20 @@ import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.utils.ConcurrentHashSet; -import org.apache.dubbo.common.utils.NetUtils; -import org.apache.dubbo.common.utils.StringUtils; -import org.apache.dubbo.common.utils.ConfigUtils; -import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.common.serialize.support.SerializableClassRegistry; import org.apache.dubbo.common.serialize.support.SerializationOptimizer; +import org.apache.dubbo.common.utils.*; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.RemotingException; import org.apache.dubbo.remoting.Transporter; -import org.apache.dubbo.remoting.exchange.ExchangeChannel; -import org.apache.dubbo.remoting.exchange.ExchangeClient; -import org.apache.dubbo.remoting.exchange.ExchangeHandler; -import org.apache.dubbo.remoting.exchange.ExchangeServer; -import org.apache.dubbo.remoting.exchange.Exchangers; +import org.apache.dubbo.remoting.exchange.*; import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter; -import org.apache.dubbo.rpc.AsyncContextImpl; -import org.apache.dubbo.rpc.AsyncRpcResult; -import org.apache.dubbo.rpc.Exporter; -import org.apache.dubbo.rpc.Invocation; -import org.apache.dubbo.rpc.Invoker; -import org.apache.dubbo.rpc.Protocol; -import org.apache.dubbo.rpc.Result; -import org.apache.dubbo.rpc.RpcContext; -import org.apache.dubbo.rpc.RpcException; -import org.apache.dubbo.rpc.RpcInvocation; +import org.apache.dubbo.rpc.*; import org.apache.dubbo.rpc.protocol.AbstractProtocol; import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.ArrayList; -import java.util.Collections; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -377,7 +355,7 @@ private ExchangeClient[] getClients(URL url) { } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { - if (service_share_connect){ + if (service_share_connect) { clients[i] = shareClients.get(i); } else { clients[i] = initClient(url); @@ -388,31 +366,39 @@ private ExchangeClient[] getClients(URL url) { /** * Get shared connection + * + * @param url + * @param connectNum */ private List getSharedClient(URL url, int connectNum) { String key = url.getAddress(); List clients = referenceClientMap.get(key); - if(clients == null) { + boolean firstBuild = false; + + if (clients == null) { List referenceCountExchangeClients = buildReferenceCountExchangeClientList(url, key, connectNum); referenceClientMap.put(key, referenceCountExchangeClients); clients = referenceCountExchangeClients; + + firstBuild = true; } for (int i = 0; i < clients.size(); i++) { ReferenceCountExchangeClient client = clients.get(i); - if (client.isClosed()){ + if (client.isClosed()) { client = buildReferenceCountExchangeClient(url, key); clients.set(i, client); - } - client.incrementAndGetCount(); + } else if (!firstBuild) { + client.incrementAndGetCount(); + } } return clients; } - private List buildReferenceCountExchangeClientList (URL url, String key, int connectNum) { + private List buildReferenceCountExchangeClientList(URL url, String key, int connectNum) { List clients = new ArrayList(connectNum); @@ -423,7 +409,7 @@ private List buildReferenceCountExchangeClientList return clients; } - private ReferenceCountExchangeClient buildReferenceCountExchangeClient (URL url, String key) { + private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url, String key) { ExchangeClient exchangeClient = initClient(url); @@ -484,7 +470,7 @@ public void destroy() { for (String key : new ArrayList(referenceClientMap.keySet())) { List clients = referenceClientMap.remove(key); - if(CollectionUtils.isNotEmpty(clients)) { + if (CollectionUtils.isNotEmpty(clients)) { for (ReferenceCountExchangeClient client : clients) { if (client != null) { try { From d6ddcd5d23aae682125eccb8f94ee6a3805b71e2 Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Fri, 7 Sep 2018 16:54:18 +0800 Subject: [PATCH 08/23] =?UTF-8?q?=E4=BC=98=E5=8C=96=20import?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rpc/protocol/dubbo/DubboProtocol.java | 30 ++++++++++++++++--- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index 6e6bbdf8668..b210931f485 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -21,17 +21,39 @@ import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.common.serialize.support.SerializableClassRegistry; import org.apache.dubbo.common.serialize.support.SerializationOptimizer; -import org.apache.dubbo.common.utils.*; +import org.apache.dubbo.common.utils.CollectionUtils; +import org.apache.dubbo.common.utils.ConcurrentHashSet; +import org.apache.dubbo.common.utils.ConfigUtils; +import org.apache.dubbo.common.utils.NetUtils; +import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.RemotingException; import org.apache.dubbo.remoting.Transporter; -import org.apache.dubbo.remoting.exchange.*; +import org.apache.dubbo.remoting.exchange.ExchangeChannel; +import org.apache.dubbo.remoting.exchange.ExchangeClient; +import org.apache.dubbo.remoting.exchange.ExchangeHandler; +import org.apache.dubbo.remoting.exchange.ExchangeServer; +import org.apache.dubbo.remoting.exchange.Exchangers; import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter; -import org.apache.dubbo.rpc.*; +import org.apache.dubbo.rpc.AsyncContextImpl; +import org.apache.dubbo.rpc.AsyncRpcResult; +import org.apache.dubbo.rpc.Exporter; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Protocol; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcContext; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.RpcInvocation; import org.apache.dubbo.rpc.protocol.AbstractProtocol; import java.net.InetSocketAddress; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; From b3fdee693af7ab4cade4165fd412a2030c12e762 Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Sat, 10 Nov 2018 13:00:06 +0800 Subject: [PATCH 09/23] Fix the problem that the getSharedClient thread is not safe --- .../rpc/protocol/dubbo/DubboProtocol.java | 116 +++++++++++++++--- 1 file changed, 97 insertions(+), 19 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index b210931f485..3d3886a822d 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -57,6 +57,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; /** * dubbo protocol support. @@ -71,6 +72,7 @@ public class DubboProtocol extends AbstractProtocol { private final Map serverMap = new ConcurrentHashMap(); // private final Map> referenceClientMap = new ConcurrentHashMap<>(); // private final ConcurrentMap ghostClientMap = new ConcurrentHashMap(); + private final ConcurrentMap locks = new ConcurrentHashMap(); private final Set optimizers = new ConcurrentHashSet(); //consumer side export a stub service for dispatching event //servicekey-stubmethods @@ -390,53 +392,129 @@ private ExchangeClient[] getClients(URL url) { * Get shared connection * * @param url - * @param connectNum + * @param connectNum connectNum must be greater than or equal to 1 */ private List getSharedClient(URL url, int connectNum) { String key = url.getAddress(); List clients = referenceClientMap.get(key); - boolean firstBuild = false; - if (clients == null) { - List referenceCountExchangeClients = buildReferenceCountExchangeClientList(url, key, connectNum); - referenceClientMap.put(key, referenceCountExchangeClients); + if (checkClientCanUse(clients)) { + batchClientRefIncr(clients); + return clients; + } + + locks.putIfAbsent(key, new Object()); + synchronized (locks.get(key)) { + clients = referenceClientMap.get(key); + // dubbo check + if (checkClientCanUse(clients)) { + batchClientRefIncr(clients); + return clients; + } + + // connectNum must be greater than or equal to 1 + connectNum = Math.max(connectNum, 1); + + // If the clients is empty, then the first initialization is + if(CollectionUtils.isEmpty(clients)) { + clients = new CopyOnWriteArrayList(); + + for (int i = 0; i < connectNum; i++) { + clients.add(buildReferenceCountExchangeClient(url, key)); + } + + referenceClientMap.put(key, clients); + + } else { + for (int i = 0; i < clients.size(); i++) { + ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i); + // If there is a client in the list that is no longer available, create a new one to replace him. + if(referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) { + clients.set(i, buildReferenceCountExchangeClient(url, key)); + continue; + } + + referenceCountExchangeClient.incrementAndGetCount(); + } + } - clients = referenceCountExchangeClients; + /** + * I understand that the purpose of the remove operation here is to avoid the expired url key + * always occupying this memory space. + */ + locks.remove(key); - firstBuild = true; + return clients; } + } - for (int i = 0; i < clients.size(); i++) { - ReferenceCountExchangeClient client = clients.get(i); - if (client.isClosed()) { - client = buildReferenceCountExchangeClient(url, key); - clients.set(i, client); + /** + * Check if the client list is all available + * + * @param referenceCountExchangeClients + * @return true-available,false-unavailable + */ + private boolean checkClientCanUse(List referenceCountExchangeClients) { + if (CollectionUtils.isEmpty(referenceCountExchangeClients)) { + return false; + } - } else if (!firstBuild) { - client.incrementAndGetCount(); + for (ReferenceCountExchangeClient referenceCountExchangeClient : referenceCountExchangeClients) { + // As long as one client is not available, you need to replace the unavailable client with the available one. + if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) { + return false; } } - return clients; + return true; } - private List buildReferenceCountExchangeClientList(URL url, String key, int connectNum) { + /** + * Add client references in bulk + * + * @param referenceCountExchangeClients + */ + private void batchClientRefIncr(List referenceCountExchangeClients) { + if (CollectionUtils.isEmpty(referenceCountExchangeClients)) { + return; + } - List clients = new ArrayList(connectNum); + for (ReferenceCountExchangeClient referenceCountExchangeClient : referenceCountExchangeClients) { + if (referenceCountExchangeClient != null) { + referenceCountExchangeClient.incrementAndGetCount(); + } + } + } + + /** + * Bulk build client + * @param url + * @param key + * @param connectNum + * @return + */ + private List buildReferenceCountExchangeClientList(URL url, String key, int connectNum) { + List clients = new CopyOnWriteArrayList(); for (int i = 0; i < connectNum; i++) { clients.add(buildReferenceCountExchangeClient(url, key)); } + ghostClientMap.remove(key); + return clients; } + /** + * Build a single client + * @param url + * @param key + * @return + */ private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url, String key) { - ExchangeClient exchangeClient = initClient(url); ReferenceCountExchangeClient client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap); - ghostClientMap.remove(key); return client; } From e75855caa21a3ad13f510b39719369f3161a7327 Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Sat, 10 Nov 2018 13:14:32 +0800 Subject: [PATCH 10/23] Fix the problem that the getSharedClient thread is not safe --- .../dubbo/rpc/protocol/dubbo/DubboProtocol.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index 3d3886a822d..d4be78cd21b 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -417,12 +417,7 @@ private List getSharedClient(URL url, int connectN // If the clients is empty, then the first initialization is if(CollectionUtils.isEmpty(clients)) { - clients = new CopyOnWriteArrayList(); - - for (int i = 0; i < connectNum; i++) { - clients.add(buildReferenceCountExchangeClient(url, key)); - } - + clients = buildReferenceCountExchangeClientList(url, key, connectNum); referenceClientMap.put(key, clients); } else { @@ -500,8 +495,6 @@ private List buildReferenceCountExchangeClientList clients.add(buildReferenceCountExchangeClient(url, key)); } - ghostClientMap.remove(key); - return clients; } @@ -516,6 +509,8 @@ private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url, ReferenceCountExchangeClient client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap); + ghostClientMap.remove(key); + return client; } From 560dfb941ac808c8d9ac6080c2cda3213fa72ddc Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Sat, 10 Nov 2018 19:00:04 +0800 Subject: [PATCH 11/23] Try fixing ci error, https://travis-ci.org/apache/incubator-dubbo/jobs/453185295 --- .../java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java | 1 + 1 file changed, 1 insertion(+) diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index d4be78cd21b..de2f3a82be6 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -516,6 +516,7 @@ private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url, /** * Create new connection + * @param url */ private ExchangeClient initClient(URL url) { From cf47e78552454833b058403586e65054df45a5b1 Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Sun, 11 Nov 2018 10:56:45 +0800 Subject: [PATCH 12/23] =?UTF-8?q?=E5=B0=86DEFAULT=5FCONNECTIONS=5FKEY?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=88=90SERVICE=5FCONNECTIONS=5FKEY?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/org/apache/dubbo/common/Constants.java | 8 ++++++-- .../apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java index 237d52583df..04d6b7899dc 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java @@ -123,9 +123,13 @@ public class Constants { public static final int DEFAULT_ALIVE = 60 * 1000; - public static final String DEFAULT_CONNECTIONS = "1"; + /** + * By default, a consumer JVM instance and a provider JVM instance share a long TCP connection (except when connections are set), + * which can set the number of long TCP connections shared to avoid the bottleneck of sharing a single long TCP connection. + */ + public static final String SERVICE_CONNECTIONS = "1"; - public static final String DEFAULT_CONNECTIONS_KEY = "default.connections.key"; + public static final String SERVICE_CONNECTIONS_KEY = "service.connections.key"; public static final int DEFAULT_ACCEPTS = 0; diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index de2f3a82be6..622466b3994 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -373,8 +373,8 @@ private ExchangeClient[] getClients(URL url) { // if not configured, connection is shared, otherwise, one connection for one service if (connections == 0) { service_share_connect = true; - connections = Integer.parseInt(ConfigUtils.getProperty(Constants.DEFAULT_CONNECTIONS_KEY, - Constants.DEFAULT_CONNECTIONS)); + connections = Integer.parseInt(ConfigUtils.getProperty(Constants.SERVICE_CONNECTIONS_KEY, + Constants.SERVICE_CONNECTIONS)); shareClients = getSharedClient(url, connections); } ExchangeClient[] clients = new ExchangeClient[connections]; From 52efa43e9d6c6f1042e9dbd78e063b50e7492545 Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Sun, 13 Jan 2019 08:27:20 +0800 Subject: [PATCH 13/23] dubbo.xsd add shareconnections attribute, --- .../org/apache/dubbo/common/Constants.java | 4 ++-- .../apache/dubbo/config/ConsumerConfig.java | 14 +++++++++++ .../src/main/resources/META-INF/dubbo.xsd | 6 +++++ .../rpc/protocol/dubbo/DubboProtocol.java | 24 +++++++++++++------ 4 files changed, 39 insertions(+), 9 deletions(-) diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java index daf7fed832f..fd057c54b29 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java @@ -147,9 +147,9 @@ public class Constants { * By default, a consumer JVM instance and a provider JVM instance share a long TCP connection (except when connections are set), * which can set the number of long TCP connections shared to avoid the bottleneck of sharing a single long TCP connection. */ - public static final String SERVICE_CONNECTIONS = "1"; + public static final String DEFAULT_SHARE_CONNECTIONS = "1"; - public static final String SERVICE_CONNECTIONS_KEY = "service.connections.key"; + public static final String SHARE_CONNECTIONS_KEY = "shareconnections"; public static final int DEFAULT_ACCEPTS = 0; diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ConsumerConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ConsumerConfig.java index ce106550601..3a556e7f8c4 100644 --- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ConsumerConfig.java +++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ConsumerConfig.java @@ -57,6 +57,12 @@ public class ConsumerConfig extends AbstractReferenceConfig { */ private Integer queues; + /** + * By default, a TCP long-connection communication is shared between the consumer process and the provider process. + * This property can be set to share multiple TCP long-connection communications. Note that only the dubbo protocol takes effect. + */ + private Integer shareconnections; + @Override public void setTimeout(Integer timeout) { super.setTimeout(timeout); @@ -118,4 +124,12 @@ public Integer getQueues() { public void setQueues(Integer queues) { this.queues = queues; } + + public Integer getShareconnections() { + return shareconnections; + } + + public void setShareconnections(Integer shareconnections) { + this.shareconnections = shareconnections; + } } diff --git a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd index b943684d444..0330253772e 100644 --- a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd +++ b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd @@ -865,6 +865,12 @@ + + + + + + diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index 5ccff94dd7a..1bfff171a15 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -24,6 +24,7 @@ import org.apache.dubbo.common.serialize.support.SerializationOptimizer; import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.ConcurrentHashSet; +import org.apache.dubbo.common.utils.ConfigUtils; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.remoting.Channel; @@ -367,19 +368,25 @@ public Invoker refer(Class serviceType, URL url) throws RpcException { private ExchangeClient[] getClients(URL url) { // whether to share connection - boolean service_share_connect = false; + boolean useShareConnect = false; int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); List shareClients = null; // if not configured, connection is shared, otherwise, one connection for one service if (connections == 0) { - service_share_connect = true; - connections = Integer.parseInt(ConfigUtils.getProperty(Constants.SERVICE_CONNECTIONS_KEY, - Constants.SERVICE_CONNECTIONS)); + useShareConnect = true; + + /** + * The xml configuration should have a higher priority than properties. + */ + String shareConnectionsStr = url.getParameter(Constants.SHARE_CONNECTIONS_KEY, (String) null); + connections = Integer.parseInt( StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(Constants.SHARE_CONNECTIONS_KEY, + Constants.DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr); shareClients = getSharedClient(url, connections); } + ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { - if (service_share_connect) { + if (useShareConnect) { clients[i] = shareClients.get(i); } else { clients[i] = initClient(url); @@ -416,7 +423,7 @@ private List getSharedClient(URL url, int connectN connectNum = Math.max(connectNum, 1); // If the clients is empty, then the first initialization is - if(CollectionUtils.isEmpty(clients)) { + if (CollectionUtils.isEmpty(clients)) { clients = buildReferenceCountExchangeClientList(url, key, connectNum); referenceClientMap.put(key, clients); @@ -424,7 +431,7 @@ private List getSharedClient(URL url, int connectN for (int i = 0; i < clients.size(); i++) { ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i); // If there is a client in the list that is no longer available, create a new one to replace him. - if(referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) { + if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) { clients.set(i, buildReferenceCountExchangeClient(url, key)); continue; } @@ -483,6 +490,7 @@ private void batchClientRefIncr(List referenceCoun /** * Bulk build client + * * @param url * @param key * @param connectNum @@ -500,6 +508,7 @@ private List buildReferenceCountExchangeClientList /** * Build a single client + * * @param url * @param key * @return @@ -516,6 +525,7 @@ private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url, /** * Create new connection + * * @param url */ private ExchangeClient initClient(URL url) { From 33039fc8492ec770e739d461c555f62c7456066c Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Sun, 13 Jan 2019 09:13:22 +0800 Subject: [PATCH 14/23] Optimize code format --- .../java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index 1bfff171a15..07e6e9c5cea 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -313,6 +313,7 @@ private ExchangeServer createServer(URL url) { } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } + str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); @@ -320,6 +321,7 @@ private ExchangeServer createServer(URL url) { throw new RpcException("Unsupported client type: " + str); } } + return server; } From 8f4efc09e2cb5b2424f895051c32b623cd06f68c Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Tue, 29 Jan 2019 00:42:49 +0800 Subject: [PATCH 15/23] Fix mult connect ghost connect problem --- .../rpc/protocol/dubbo/DubboProtocol.java | 26 ++++++++++------ .../dubbo/ReferenceCountExchangeClient.java | 31 +++++++++++++------ 2 files changed, 39 insertions(+), 18 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index 3f13a34b10b..2316ff7d3fa 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -78,7 +78,7 @@ public class DubboProtocol extends AbstractProtocol { * */ private final Map> referenceClientMap = new ConcurrentHashMap<>(); - private final ConcurrentMap ghostClientMap = new ConcurrentHashMap<>(); + private final ConcurrentMap> ghostClientMap = new ConcurrentHashMap<>(); private final ConcurrentMap locks = new ConcurrentHashMap<>(); private final Set optimizers = new ConcurrentHashSet<>(); /** @@ -604,18 +604,26 @@ public void destroy() { } for (String key : new ArrayList<>(ghostClientMap.keySet())) { - ExchangeClient client = ghostClientMap.remove(key); - if (client != null) { - try { - if (logger.isInfoEnabled()) { - logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); + List removeClients = ghostClientMap.remove(key); + if (CollectionUtils.isNotEmpty(removeClients)) { + + for (LazyConnectExchangeClient client : removeClients) { + + try { + if (logger.isInfoEnabled()) { + logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); + } + + client.close(ConfigurationUtils.getServerShutdownTimeout()); + + } catch (Throwable t) { + logger.warn(t.getMessage(), t); } - client.close(ConfigurationUtils.getServerShutdownTimeout()); - } catch (Throwable t) { - logger.warn(t.getMessage(), t); + } } } + stubServiceMethodsMap.clear(); super.destroy(); } diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java index 834711515f8..f71a58ceb61 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java @@ -26,7 +26,9 @@ import org.apache.dubbo.remoting.exchange.ResponseFuture; import java.net.InetSocketAddress; +import java.util.List; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; /** @@ -38,12 +40,12 @@ final class ReferenceCountExchangeClient implements ExchangeClient { private final URL url; private final AtomicInteger referenceCount = new AtomicInteger(0); - // private final ExchangeHandler handler; - private final ConcurrentMap ghostClientMap; + private final ConcurrentMap> ghostClientMap; + private LazyConnectExchangeClient lazyConnectExchangeClient; private ExchangeClient client; - public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap ghostClientMap) { + public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap> ghostClientMap) { this.client = client; referenceCount.incrementAndGet(); this.url = client.getUrl(); @@ -174,13 +176,24 @@ private LazyConnectExchangeClient replaceWithLazyClient() { .addParameter("_client_memo", "referencecounthandler.replacewithlazyclient"); String key = url.getAddress(); - // in worst case there's only one ghost connection. - LazyConnectExchangeClient gclient = ghostClientMap.get(key); - if (gclient == null || gclient.isClosed()) { - gclient = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler()); - ghostClientMap.put(key, gclient); + + if (lazyConnectExchangeClient == null || lazyConnectExchangeClient.isClosed()) { + + ghostClientMap.putIfAbsent(key, new CopyOnWriteArrayList<>()); + List lazyConnectExchangeClients = ghostClientMap.get(key); + + int index = lazyConnectExchangeClients.indexOf(lazyConnectExchangeClient); + lazyConnectExchangeClient = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler()); + + if (index >= 0) { + lazyConnectExchangeClients.set(index, lazyConnectExchangeClient); + + } else { + lazyConnectExchangeClients.add(lazyConnectExchangeClient); + } } - return gclient; + + return lazyConnectExchangeClient; } @Override From 42fd2c7fe910577ea1befe1a296c2d9a0e39bcd9 Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Tue, 29 Jan 2019 00:59:13 +0800 Subject: [PATCH 16/23] format code --- .../rpc/protocol/dubbo/DubboProtocol.java | 74 +++++++++++-------- 1 file changed, 45 insertions(+), 29 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index 2316ff7d3fa..3046fc77a77 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -572,54 +572,70 @@ private ExchangeClient initClient(URL url) { public void destroy() { for (String key : new ArrayList<>(serverMap.keySet())) { ExchangeServer server = serverMap.remove(key); - if (server != null) { - try { - if (logger.isInfoEnabled()) { - logger.info("Close dubbo server: " + server.getLocalAddress()); - } - server.close(ConfigurationUtils.getServerShutdownTimeout()); - } catch (Throwable t) { - logger.warn(t.getMessage(), t); + + if (server == null) { + continue; + } + + try { + if (logger.isInfoEnabled()) { + logger.info("Close dubbo server: " + server.getLocalAddress()); } + + server.close(ConfigurationUtils.getServerShutdownTimeout()); + + } catch (Throwable t) { + logger.warn(t.getMessage(), t); } } - for (String key : new ArrayList(referenceClientMap.keySet())) { + for (String key : new ArrayList<>(referenceClientMap.keySet())) { List clients = referenceClientMap.remove(key); - if (CollectionUtils.isNotEmpty(clients)) { - for (ReferenceCountExchangeClient client : clients) { - if (client != null) { - try { - if (logger.isInfoEnabled()) { - logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); - } - client.close(ConfigurationUtils.getServerShutdownTimeout()); - } catch (Throwable t) { - logger.warn(t.getMessage(), t); - } + if (CollectionUtils.isEmpty(clients)) { + continue; + } + + for (ReferenceCountExchangeClient client : clients) { + if (client == null) { + continue; + } + + try { + if (logger.isInfoEnabled()) { + logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); } + + client.close(ConfigurationUtils.getServerShutdownTimeout()); + + } catch (Throwable t) { + logger.warn(t.getMessage(), t); } } } for (String key : new ArrayList<>(ghostClientMap.keySet())) { List removeClients = ghostClientMap.remove(key); - if (CollectionUtils.isNotEmpty(removeClients)) { - for (LazyConnectExchangeClient client : removeClients) { + if (CollectionUtils.isEmpty(removeClients)) { + continue; + } - try { - if (logger.isInfoEnabled()) { - logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); - } + for (LazyConnectExchangeClient client : removeClients) { - client.close(ConfigurationUtils.getServerShutdownTimeout()); + if (client == null) { + continue; + } - } catch (Throwable t) { - logger.warn(t.getMessage(), t); + try { + if (logger.isInfoEnabled()) { + logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); } + client.close(ConfigurationUtils.getServerShutdownTimeout()); + + } catch (Throwable t) { + logger.warn(t.getMessage(), t); } } } From de4c9be41aa67b10135b43fb0fd5f344fed1bc95 Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Thu, 31 Jan 2019 13:09:17 +0800 Subject: [PATCH 17/23] Remove the concept of ghostClientMap and ghost connection. In fact, ghostClient is LazyConnectExchangeClient. At present, the LazyConnectExchangeClient object is added directly in ReferenceCountExchangeClient to realize the mapping relationship with ReferenceCountExchangeClient. The relationship between previous ghostClient and url mapping is not applicable to the current new share. Multiple connections. --- .../rpc/protocol/dubbo/DubboProtocol.java | 185 ++++++++++-------- .../dubbo/LazyConnectExchangeClient.java | 1 - .../dubbo/ReferenceCountExchangeClient.java | 32 +-- 3 files changed, 107 insertions(+), 111 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index 3046fc77a77..21b23fe2312 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -78,7 +78,6 @@ public class DubboProtocol extends AbstractProtocol { * */ private final Map> referenceClientMap = new ConcurrentHashMap<>(); - private final ConcurrentMap> ghostClientMap = new ConcurrentHashMap<>(); private final ConcurrentMap locks = new ConcurrentHashMap<>(); private final Set optimizers = new ConcurrentHashSet<>(); /** @@ -91,51 +90,55 @@ public class DubboProtocol extends AbstractProtocol { @Override public CompletableFuture reply(ExchangeChannel channel, Object message) throws RemotingException { - if (message instanceof Invocation) { - Invocation inv = (Invocation) message; - Invoker invoker = getInvoker(channel, inv); - // need to consider backward-compatibility if it's a callback - if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { - String methodsStr = invoker.getUrl().getParameters().get("methods"); - boolean hasMethod = false; - if (methodsStr == null || !methodsStr.contains(",")) { - hasMethod = inv.getMethodName().equals(methodsStr); - } else { - String[] methods = methodsStr.split(","); - for (String method : methods) { - if (inv.getMethodName().equals(method)) { - hasMethod = true; - break; - } + + if (!(message instanceof Invocation)) { + throw new RemotingException(channel, "Unsupported request: " + + (message == null ? null : (message.getClass().getName() + ": " + message)) + + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); + } + + Invocation inv = (Invocation) message; + Invoker invoker = getInvoker(channel, inv); + // need to consider backward-compatibility if it's a callback + if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { + String methodsStr = invoker.getUrl().getParameters().get("methods"); + boolean hasMethod = false; + if (methodsStr == null || !methodsStr.contains(",")) { + hasMethod = inv.getMethodName().equals(methodsStr); + } else { + String[] methods = methodsStr.split(","); + for (String method : methods) { + if (inv.getMethodName().equals(method)) { + hasMethod = true; + break; } } - if (!hasMethod) { - logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() - + " not found in callback service interface ,invoke will be ignored." - + " please update the api interface. url is:" - + invoker.getUrl()) + " ,invocation is :" + inv); - return null; - } } - RpcContext rpcContext = RpcContext.getContext(); - rpcContext.setRemoteAddress(channel.getRemoteAddress()); - Result result = invoker.invoke(inv); - - if (result instanceof AsyncRpcResult) { - return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r); - } else { - return CompletableFuture.completedFuture(result); + if (!hasMethod) { + logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + + " not found in callback service interface ,invoke will be ignored." + + " please update the api interface. url is:" + + invoker.getUrl()) + " ,invocation is :" + inv); + return null; } } - throw new RemotingException(channel, "Unsupported request: " - + (message == null ? null : (message.getClass().getName() + ": " + message)) - + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); + RpcContext rpcContext = RpcContext.getContext(); + rpcContext.setRemoteAddress(channel.getRemoteAddress()); + Result result = invoker.invoke(inv); + + if (result instanceof AsyncRpcResult) { + return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r); + + } else { + return CompletableFuture.completedFuture(result); + } } @Override public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Invocation) { reply((ExchangeChannel) channel, message); + } else { super.received(channel, message); } @@ -170,6 +173,7 @@ private Invocation createInvocation(Channel channel, URL url, String methodKey) if (method == null || method.length() == 0) { return null; } + RpcInvocation invocation = new RpcInvocation(method, new Class[0], new Object[0]); invocation.setAttachment(Constants.PATH_KEY, url.getPath()); invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY)); @@ -178,6 +182,7 @@ private Invocation createInvocation(Channel channel, URL url, String methodKey) if (url.getParameter(Constants.STUB_EVENT_KEY, false)) { invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString()); } + return invocation; } }; @@ -219,24 +224,26 @@ Invoker getInvoker(Channel channel, Invocation inv) throws RemotingException boolean isStubServiceInvoke = false; int port = channel.getLocalAddress().getPort(); String path = inv.getAttachments().get(Constants.PATH_KEY); + // if it's callback service on client side isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY)); if (isStubServiceInvoke) { port = channel.getRemoteAddress().getPort(); } + //callback isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke; if (isCallBackServiceInvoke) { path += "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY); inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString()); } - String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); + String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); DubboExporter exporter = (DubboExporter) exporterMap.get(serviceKey); if (exporter == null) { - throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + - exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv); + throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv); } return exporter.getInvoker(); @@ -270,6 +277,7 @@ public Exporter export(Invoker invoker) throws RpcException { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } + } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } @@ -356,10 +364,13 @@ private void optimizeSerialization(URL url) throws RpcException { } optimizers.add(className); + } catch (ClassNotFoundException e) { throw new RpcException("Cannot find the serialization optimizer class: " + className, e); + } catch (InstantiationException e) { throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e); + } catch (IllegalAccessException e) { throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e); } @@ -435,7 +446,7 @@ private List getSharedClient(URL url, int connectN // If the clients is empty, then the first initialization is if (CollectionUtils.isEmpty(clients)) { - clients = buildReferenceCountExchangeClientList(url, key, connectNum); + clients = buildReferenceCountExchangeClientList(url, connectNum); referenceClientMap.put(key, clients); } else { @@ -443,7 +454,7 @@ private List getSharedClient(URL url, int connectN ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i); // If there is a client in the list that is no longer available, create a new one to replace him. if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) { - clients.set(i, buildReferenceCountExchangeClient(url, key)); + clients.set(i, buildReferenceCountExchangeClient(url)); continue; } @@ -503,15 +514,14 @@ private void batchClientRefIncr(List referenceCoun * Bulk build client * * @param url - * @param key * @param connectNum * @return */ - private List buildReferenceCountExchangeClientList(URL url, String key, int connectNum) { - List clients = new CopyOnWriteArrayList(); + private List buildReferenceCountExchangeClientList(URL url, int connectNum) { + List clients = new CopyOnWriteArrayList<>(); for (int i = 0; i < connectNum; i++) { - clients.add(buildReferenceCountExchangeClient(url, key)); + clients.add(buildReferenceCountExchangeClient(url)); } return clients; @@ -521,17 +531,12 @@ private List buildReferenceCountExchangeClientList * Build a single client * * @param url - * @param key * @return */ - private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url, String key) { + private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) { ExchangeClient exchangeClient = initClient(url); - ReferenceCountExchangeClient client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap); - - ghostClientMap.remove(key); - - return client; + return new ReferenceCountExchangeClient(exchangeClient); } /** @@ -562,9 +567,11 @@ private ExchangeClient initClient(URL url) { } else { client = Exchangers.connect(url, requestHandler); } + } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } + return client; } @@ -597,50 +604,60 @@ public void destroy() { } for (ReferenceCountExchangeClient client : clients) { - if (client == null) { - continue; - } - - try { - if (logger.isInfoEnabled()) { - logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); - } - - client.close(ConfigurationUtils.getServerShutdownTimeout()); - - } catch (Throwable t) { - logger.warn(t.getMessage(), t); - } + closeReferenceCountExchangeClient(client); } } - for (String key : new ArrayList<>(ghostClientMap.keySet())) { - List removeClients = ghostClientMap.remove(key); + stubServiceMethodsMap.clear(); + super.destroy(); + } - if (CollectionUtils.isEmpty(removeClients)) { - continue; + /** + * close ReferenceCountExchangeClient + * + * @param client + */ + private void closeReferenceCountExchangeClient(ReferenceCountExchangeClient client) { + if (client == null) { + return; + } + + try { + if (logger.isInfoEnabled()) { + logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); } - for (LazyConnectExchangeClient client : removeClients) { + client.close(ConfigurationUtils.getServerShutdownTimeout()); - if (client == null) { - continue; - } + } catch (Throwable t) { + logger.warn(t.getMessage(), t); + } - try { - if (logger.isInfoEnabled()) { - logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); - } + /** + * When you need to completely close the ReferenceCountExchangeClient, don't forget to close the LazyConnectExchangeClient (if any) it is bound to. + */ + closeLazyConnectExchangeClient(client.getLazyConnectExchangeClient()); + } - client.close(ConfigurationUtils.getServerShutdownTimeout()); + /** + * close LazyConnectExchangeClient + * + * @param lazyConnectExchangeClient + */ + private void closeLazyConnectExchangeClient(LazyConnectExchangeClient lazyConnectExchangeClient) { + if (lazyConnectExchangeClient == null) { + return; + } - } catch (Throwable t) { - logger.warn(t.getMessage(), t); - } + try { + if (logger.isInfoEnabled()) { + logger.info("Close dubbo lazy connect: " + lazyConnectExchangeClient.getLocalAddress() + "-->" + lazyConnectExchangeClient.getRemoteAddress()); } - } - stubServiceMethodsMap.clear(); - super.destroy(); + lazyConnectExchangeClient.close(ConfigurationUtils.getServerShutdownTimeout()); + + } catch (Throwable t) { + logger.warn(t.getMessage(), t); + } } } diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java index eaebb1985f3..f2bc4534fdc 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java @@ -65,7 +65,6 @@ public LazyConnectExchangeClient(URL url, ExchangeHandler requestHandler) { this.requestWithWarning = url.getParameter(REQUEST_WITH_WARNING_KEY, false); } - private void initClient() throws RemotingException { if (client != null) { return; diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java index f71a58ceb61..18c99d30692 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java @@ -26,9 +26,6 @@ import org.apache.dubbo.remoting.exchange.ResponseFuture; import java.net.InetSocketAddress; -import java.util.List; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; /** @@ -40,19 +37,13 @@ final class ReferenceCountExchangeClient implements ExchangeClient { private final URL url; private final AtomicInteger referenceCount = new AtomicInteger(0); - private final ConcurrentMap> ghostClientMap; - private LazyConnectExchangeClient lazyConnectExchangeClient; private ExchangeClient client; + private LazyConnectExchangeClient lazyConnectExchangeClient; - - public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap> ghostClientMap) { + public ReferenceCountExchangeClient(ExchangeClient client) { this.client = client; referenceCount.incrementAndGet(); this.url = client.getUrl(); - if (ghostClientMap == null) { - throw new IllegalStateException("ghostClientMap can not be null, url: " + url); - } - this.ghostClientMap = ghostClientMap; } @Override @@ -165,7 +156,6 @@ public void startClose() { client.startClose(); } - // ghost client private LazyConnectExchangeClient replaceWithLazyClient() { // this is a defensive operation to avoid client is closed by accident, the initial state of the client is false URL lazyUrl = url.addParameter(Constants.LAZY_CONNECT_INITIAL_STATE_KEY, Boolean.FALSE) @@ -175,22 +165,8 @@ private LazyConnectExchangeClient replaceWithLazyClient() { .addParameter(LazyConnectExchangeClient.REQUEST_WITH_WARNING_KEY, true) .addParameter("_client_memo", "referencecounthandler.replacewithlazyclient"); - String key = url.getAddress(); - if (lazyConnectExchangeClient == null || lazyConnectExchangeClient.isClosed()) { - - ghostClientMap.putIfAbsent(key, new CopyOnWriteArrayList<>()); - List lazyConnectExchangeClients = ghostClientMap.get(key); - - int index = lazyConnectExchangeClients.indexOf(lazyConnectExchangeClient); lazyConnectExchangeClient = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler()); - - if (index >= 0) { - lazyConnectExchangeClients.set(index, lazyConnectExchangeClient); - - } else { - lazyConnectExchangeClients.add(lazyConnectExchangeClient); - } } return lazyConnectExchangeClient; @@ -204,4 +180,8 @@ public boolean isClosed() { public void incrementAndGetCount() { referenceCount.incrementAndGet(); } + + public LazyConnectExchangeClient getLazyConnectExchangeClient() { + return lazyConnectExchangeClient; + } } From 209f966ac0d0cf0a9b4707c08ea4c825f1108540 Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Fri, 1 Feb 2019 21:48:09 +0800 Subject: [PATCH 18/23] Optimize the ReferenceCountExchangeClient and remove the reference to the lazyConnectExchangeClient because it doesn't make much sense; add locks in the close operation of the AbstractClient, because connect, disconnect, and close should not be done at the same time. --- .../remoting/transport/AbstractClient.java | 69 +++++++++++++------ .../rpc/protocol/dubbo/DubboProtocol.java | 27 -------- .../dubbo/ReferenceCountExchangeClient.java | 13 +--- 3 files changed, 51 insertions(+), 58 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java index 7280b508f5e..fa2dd8ea413 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java @@ -183,9 +183,9 @@ private boolean cancelFutureIfOffline() { * * issue: https://github.com/apache/incubator-dubbo/issues/3158 */ - if(isClosed()) { + if (isClosed()) { ScheduledFuture future = reconnectExecutorFuture; - if(future != null && !future.isCancelled()){ + if (future != null && !future.isCancelled()) { /** * Client has been destroyed and * scheduled task should be cancelled. @@ -298,17 +298,29 @@ public void send(Object message, boolean sent) throws RemotingException { } protected void connect() throws RemotingException { + connectLock.lock(); + try { + if (isClosed()) { + throw new RemotingException(this, "No need to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " + + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + + ", cause: client status is closed."); + } + if (isConnected()) { return; } + initConnectStatusCheckCommand(); + doConnect(); + if (!isConnected()) { throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: Connect wait timeout: " + getConnectTimeout() + "ms."); + } else { if (logger.isInfoEnabled()) { logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " @@ -316,14 +328,18 @@ protected void connect() throws RemotingException { + ", channel is " + this.getChannel()); } } + reconnect_count.set(0); reconnect_error_log_flag.set(false); + } catch (RemotingException e) { throw e; + } catch (Throwable e) { throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: " + e.getMessage(), e); + } finally { connectLock.unlock(); } @@ -368,27 +384,38 @@ public void reconnect() throws RemotingException { @Override public void close() { + + connectLock.lock(); + try { - super.close(); - } catch (Throwable e) { - logger.warn(e.getMessage(), e); - } - try { - if (executor != null) { - ExecutorUtil.shutdownNow(executor, 100); + try { + super.close(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); } - } catch (Throwable e) { - logger.warn(e.getMessage(), e); - } - try { - disconnect(); - } catch (Throwable e) { - logger.warn(e.getMessage(), e); - } - try { - doClose(); - } catch (Throwable e) { - logger.warn(e.getMessage(), e); + + try { + if (executor != null) { + ExecutorUtil.shutdownNow(executor, 100); + } + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + } + + try { + disconnect(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + } + + try { + doClose(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + } + + } finally { + connectLock.unlock(); } } diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index 21b23fe2312..b7b26af546b 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -632,32 +632,5 @@ private void closeReferenceCountExchangeClient(ReferenceCountExchangeClient clie } catch (Throwable t) { logger.warn(t.getMessage(), t); } - - /** - * When you need to completely close the ReferenceCountExchangeClient, don't forget to close the LazyConnectExchangeClient (if any) it is bound to. - */ - closeLazyConnectExchangeClient(client.getLazyConnectExchangeClient()); - } - - /** - * close LazyConnectExchangeClient - * - * @param lazyConnectExchangeClient - */ - private void closeLazyConnectExchangeClient(LazyConnectExchangeClient lazyConnectExchangeClient) { - if (lazyConnectExchangeClient == null) { - return; - } - - try { - if (logger.isInfoEnabled()) { - logger.info("Close dubbo lazy connect: " + lazyConnectExchangeClient.getLocalAddress() + "-->" + lazyConnectExchangeClient.getRemoteAddress()); - } - - lazyConnectExchangeClient.close(ConfigurationUtils.getServerShutdownTimeout()); - - } catch (Throwable t) { - logger.warn(t.getMessage(), t); - } } } diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java index 18c99d30692..d1b3e2cf8c2 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java @@ -38,7 +38,6 @@ final class ReferenceCountExchangeClient implements ExchangeClient { private final AtomicInteger referenceCount = new AtomicInteger(0); private ExchangeClient client; - private LazyConnectExchangeClient lazyConnectExchangeClient; public ReferenceCountExchangeClient(ExchangeClient client) { this.client = client; @@ -144,9 +143,11 @@ public void close(int timeout) { if (referenceCount.decrementAndGet() <= 0) { if (timeout == 0) { client.close(); + } else { client.close(timeout); } + client = replaceWithLazyClient(); } } @@ -165,11 +166,7 @@ private LazyConnectExchangeClient replaceWithLazyClient() { .addParameter(LazyConnectExchangeClient.REQUEST_WITH_WARNING_KEY, true) .addParameter("_client_memo", "referencecounthandler.replacewithlazyclient"); - if (lazyConnectExchangeClient == null || lazyConnectExchangeClient.isClosed()) { - lazyConnectExchangeClient = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler()); - } - - return lazyConnectExchangeClient; + return new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler()); } @Override @@ -180,8 +177,4 @@ public boolean isClosed() { public void incrementAndGetCount() { referenceCount.incrementAndGet(); } - - public LazyConnectExchangeClient getLazyConnectExchangeClient() { - return lazyConnectExchangeClient; - } } From 9dc5c1b0b65df87653a194a0979f6230e307b964 Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Fri, 1 Feb 2019 22:44:48 +0800 Subject: [PATCH 19/23] format code --- .../java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java | 1 + 1 file changed, 1 insertion(+) diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index b7b26af546b..086659a43b2 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -564,6 +564,7 @@ private ExchangeClient initClient(URL url) { // connection should be lazy if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { client = new LazyConnectExchangeClient(url, requestHandler); + } else { client = Exchangers.connect(url, requestHandler); } From dc2bad3d45bf0ed9ae669bfe9651a039107f2e99 Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Sat, 2 Feb 2019 10:44:35 +0800 Subject: [PATCH 20/23] try remove close lock --- .../org/apache/dubbo/remoting/transport/AbstractClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java index fe7a6f4816f..b0c011de2ba 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java @@ -255,7 +255,7 @@ public void reconnect() throws RemotingException { @Override public void close() { - connectLock.lock(); +// connectLock.lock(); try { try { @@ -285,7 +285,7 @@ public void close() { } } finally { - connectLock.unlock(); +// connectLock.unlock(); } } From 53aa6308f672c96140deffe0843133d550de09a0 Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Sat, 2 Feb 2019 11:05:17 +0800 Subject: [PATCH 21/23] Restore close method --- .../org/apache/dubbo/remoting/transport/AbstractClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java index b0c011de2ba..fe7a6f4816f 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java @@ -255,7 +255,7 @@ public void reconnect() throws RemotingException { @Override public void close() { -// connectLock.lock(); + connectLock.lock(); try { try { @@ -285,7 +285,7 @@ public void close() { } } finally { -// connectLock.unlock(); + connectLock.unlock(); } } From 35b9e31c2d5115b3907652732cfa961b35766a62 Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Sat, 2 Feb 2019 15:23:03 +0800 Subject: [PATCH 22/23] Restore ReferenceCountExchangeClient reference to LazyConnectExchangeClient object --- .../dubbo/rpc/protocol/dubbo/DubboProtocol.java | 6 ++++++ .../dubbo/ReferenceCountExchangeClient.java | 13 ++++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index 086659a43b2..04a525ff544 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -630,6 +630,12 @@ private void closeReferenceCountExchangeClient(ReferenceCountExchangeClient clie client.close(ConfigurationUtils.getServerShutdownTimeout()); + // TODO + /** + * At this time, ReferenceCountExchangeClient#client has been replaced with LazyConnectExchangeClient. + * Do you need to call client.close again to ensure that LazyConnectExchangeClient is also closed? + */ + } catch (Throwable t) { logger.warn(t.getMessage(), t); } diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java index d1b3e2cf8c2..b944dbbcf3c 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java @@ -38,6 +38,7 @@ final class ReferenceCountExchangeClient implements ExchangeClient { private final AtomicInteger referenceCount = new AtomicInteger(0); private ExchangeClient client; + private LazyConnectExchangeClient lazyConnectExchangeClient; public ReferenceCountExchangeClient(ExchangeClient client) { this.client = client; @@ -157,6 +158,11 @@ public void startClose() { client.startClose(); } + /** + * close client + * + * @return + */ private LazyConnectExchangeClient replaceWithLazyClient() { // this is a defensive operation to avoid client is closed by accident, the initial state of the client is false URL lazyUrl = url.addParameter(Constants.LAZY_CONNECT_INITIAL_STATE_KEY, Boolean.FALSE) @@ -166,7 +172,11 @@ private LazyConnectExchangeClient replaceWithLazyClient() { .addParameter(LazyConnectExchangeClient.REQUEST_WITH_WARNING_KEY, true) .addParameter("_client_memo", "referencecounthandler.replacewithlazyclient"); - return new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler()); + if (lazyConnectExchangeClient == null || lazyConnectExchangeClient.isClosed()) { + lazyConnectExchangeClient = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler()); + } + + return lazyConnectExchangeClient; } @Override @@ -178,3 +188,4 @@ public void incrementAndGetCount() { referenceCount.incrementAndGet(); } } + From 51ba8556ec65922e818093c5c9a3d2f10221f351 Mon Sep 17 00:00:00 2001 From: manzhizhen Date: Sun, 3 Feb 2019 14:41:28 +0800 Subject: [PATCH 23/23] Optimize the logic of using the LazyConnectExchangeClient inside the ReferenceCountExchangeClient; Supplemental shared multi-connected unit test --- .../remoting/transport/AbstractClient.java | 50 ++++------ .../rpc/protocol/dubbo/DubboProtocol.java | 5 + .../dubbo/ReferenceCountExchangeClient.java | 17 ++-- .../ReferenceCountExchangeClientTest.java | 94 ++++++++++++++++--- 4 files changed, 116 insertions(+), 50 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java index fe7a6f4816f..2afdc4d0f02 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java @@ -178,11 +178,6 @@ protected void connect() throws RemotingException { connectLock.lock(); try { - if (isClosed()) { - throw new RemotingException(this, "No need to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " - + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() - + ", cause: client status is closed."); - } if (isConnected()) { return; @@ -255,37 +250,30 @@ public void reconnect() throws RemotingException { @Override public void close() { - connectLock.lock(); - try { - try { - super.close(); - } catch (Throwable e) { - logger.warn(e.getMessage(), e); - } - - try { - if (executor != null) { - ExecutorUtil.shutdownNow(executor, 100); - } - } catch (Throwable e) { - logger.warn(e.getMessage(), e); - } + super.close(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + } - try { - disconnect(); - } catch (Throwable e) { - logger.warn(e.getMessage(), e); + try { + if (executor != null) { + ExecutorUtil.shutdownNow(executor, 100); } + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + } - try { - doClose(); - } catch (Throwable e) { - logger.warn(e.getMessage(), e); - } + try { + disconnect(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + } - } finally { - connectLock.unlock(); + try { + doClose(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); } } diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index 04a525ff544..f284dd40dd6 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -196,6 +196,7 @@ public static DubboProtocol getDubboProtocol() { // load ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(DubboProtocol.NAME); } + return INSTANCE; } @@ -285,6 +286,7 @@ public Exporter export(Invoker invoker) throws RpcException { openServer(url); optimizeSerialization(url); + return exporter; } @@ -379,9 +381,11 @@ private void optimizeSerialization(URL url) throws RpcException { @Override public Invoker refer(Class serviceType, URL url) throws RpcException { optimizeSerialization(url); + // create rpc invoker. DubboInvoker invoker = new DubboInvoker(serviceType, url, getClients(url), invokers); invokers.add(invoker); + return invoker; } @@ -414,6 +418,7 @@ private ExchangeClient[] getClients(URL url) { clients[i] = initClient(url); } } + return clients; } diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java index b944dbbcf3c..7a720480463 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java @@ -38,7 +38,6 @@ final class ReferenceCountExchangeClient implements ExchangeClient { private final AtomicInteger referenceCount = new AtomicInteger(0); private ExchangeClient client; - private LazyConnectExchangeClient lazyConnectExchangeClient; public ReferenceCountExchangeClient(ExchangeClient client) { this.client = client; @@ -149,7 +148,7 @@ public void close(int timeout) { client.close(timeout); } - client = replaceWithLazyClient(); + replaceWithLazyClient(); } } @@ -159,11 +158,12 @@ public void startClose() { } /** - * close client + * when closing the client, the client needs to be set to LazyConnectExchangeClient, and if a new call is made, + * the client will "resurrect". * * @return */ - private LazyConnectExchangeClient replaceWithLazyClient() { + private void replaceWithLazyClient() { // this is a defensive operation to avoid client is closed by accident, the initial state of the client is false URL lazyUrl = url.addParameter(Constants.LAZY_CONNECT_INITIAL_STATE_KEY, Boolean.FALSE) .addParameter(Constants.RECONNECT_KEY, Boolean.FALSE) @@ -172,11 +172,12 @@ private LazyConnectExchangeClient replaceWithLazyClient() { .addParameter(LazyConnectExchangeClient.REQUEST_WITH_WARNING_KEY, true) .addParameter("_client_memo", "referencecounthandler.replacewithlazyclient"); - if (lazyConnectExchangeClient == null || lazyConnectExchangeClient.isClosed()) { - lazyConnectExchangeClient = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler()); + /** + * the order of judgment in the if statement cannot be changed. + */ + if (!(client instanceof LazyConnectExchangeClient) || client.isClosed()) { + client = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler()); } - - return lazyConnectExchangeClient; } @Override diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java index c34f6d9f148..b77e5e18b3c 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java @@ -27,7 +27,6 @@ import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.ProxyFactory; import org.apache.dubbo.rpc.protocol.dubbo.support.ProtocolUtils; - import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -35,6 +34,12 @@ import org.junit.jupiter.api.Test; import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + public class ReferenceCountExchangeClientTest { @@ -80,7 +85,7 @@ public void setUp() throws Exception { */ @Test public void test_share_connect() { - init(0); + init(0, 1); Assertions.assertEquals(demoClient.getLocalAddress(), helloClient.getLocalAddress()); Assertions.assertEquals(demoClient, helloClient); destoy(); @@ -91,18 +96,43 @@ public void test_share_connect() { */ @Test public void test_not_share_connect() { - init(1); + init(1, 1); Assertions.assertNotSame(demoClient.getLocalAddress(), helloClient.getLocalAddress()); Assertions.assertNotSame(demoClient, helloClient); destoy(); } + /** + * test using multiple shared connections + */ + @Test + public void test_mult_share_connect() { + // here a three shared connection is established between a consumer process and a provider process. + final int shareConnectionNum = 3; + + init(0, shareConnectionNum); + + List helloReferenceClientList = getReferenceClientList(helloServiceInvoker); + Assertions.assertEquals(shareConnectionNum, helloReferenceClientList.size()); + + List demoReferenceClientList = getReferenceClientList(demoServiceInvoker); + Assertions.assertEquals(shareConnectionNum, demoReferenceClientList.size()); + + // because helloServiceInvoker and demoServiceInvoker use share connect, so client list must be equal + Assertions.assertTrue(Objects.equals(helloReferenceClientList, demoReferenceClientList)); + + Assertions.assertEquals(demoClient.getLocalAddress(), helloClient.getLocalAddress()); + Assertions.assertEquals(demoClient, helloClient); + + destoy(); + } + /** * test counter won't count down incorrectly when invoker is destroyed for multiple times */ @Test public void test_multi_destory() { - init(0); + init(0, 1); DubboAppender.doStart(); DubboAppender.clear(); demoServiceInvoker.destroy(); @@ -119,16 +149,19 @@ public void test_multi_destory() { */ @Test public void test_counter_error() { - init(0); + init(0, 1); DubboAppender.doStart(); DubboAppender.clear(); + // because the two interfaces are initialized, the ReferenceCountExchangeClient reference counter is 2 ReferenceCountExchangeClient client = getReferenceClient(helloServiceInvoker); + // close once, counter counts down from 2 to 1, no warning occurs client.close(); Assertions.assertEquals("hello", helloService.hello()); Assertions.assertEquals(0, LogUtil.findMessage(errorMsg), "should not warning message"); - // counter is incorrect, invocation still succeeds + + // generally a client can only be closed once, here it is closed twice, counter is incorrect client.close(); // wait close done. @@ -138,6 +171,7 @@ public void test_counter_error() { Assertions.fail(); } + // due to the effect of LazyConnectExchangeClient, the client will be "revived" whenever there is a call. Assertions.assertEquals("hello", helloService.hello()); Assertions.assertEquals(1, LogUtil.findMessage(errorMsg), "should warning message"); @@ -150,7 +184,17 @@ public void test_counter_error() { // status switch to available once invoke again Assertions.assertEquals(true, helloServiceInvoker.isAvailable(), "client status available"); + /** + * This is the third time to close the same client. Under normal circumstances, + * a client value should be closed once (that is, the shutdown operation is irreversible). + * After closing, the value of the reference counter of the client has become -1. + * + * But this is a bit special, because after the client is closed twice, there are several calls to helloService, + * that is, the client inside the ReferenceCountExchangeClient is actually active, so the third shutdown here is still effective, + * let the resurrection After the client is really closed. + */ client.close(); + // client has been replaced with lazy client. lazy client is fetched from referenceclientmap, and since it's // been invoked once, it's close status is false Assertions.assertEquals(false, client.isClosed(), "client status close"); @@ -159,10 +203,13 @@ public void test_counter_error() { } @SuppressWarnings("unchecked") - private void init(int connections) { + private void init(int connections, int shareConnections) { + Assertions.assertTrue(connections >= 0); + Assertions.assertTrue(shareConnections >= 1); + int port = NetUtils.getAvailablePort(); - URL demoUrl = URL.valueOf("dubbo://127.0.0.1:" + port + "/demo?" + Constants.CONNECTIONS_KEY + "=" + connections); - URL helloUrl = URL.valueOf("dubbo://127.0.0.1:" + port + "/hello?" + Constants.CONNECTIONS_KEY + "=" + connections); + URL demoUrl = URL.valueOf("dubbo://127.0.0.1:" + port + "/demo?" + Constants.CONNECTIONS_KEY + "=" + connections + "&" + Constants.SHARE_CONNECTIONS_KEY + "=" + shareConnections); + URL helloUrl = URL.valueOf("dubbo://127.0.0.1:" + port + "/hello?" + Constants.CONNECTIONS_KEY + "=" + connections + "&" + Constants.SHARE_CONNECTIONS_KEY + "=" + shareConnections); demoExporter = export(new DemoServiceImpl(), IDemoService.class, demoUrl); helloExporter = export(new HelloServiceImpl(), IHelloService.class, helloUrl); @@ -204,17 +251,42 @@ private ExchangeClient getClient(Invoker invoker) { } private ReferenceCountExchangeClient getReferenceClient(Invoker invoker) { - return (ReferenceCountExchangeClient) getInvokerClient(invoker); + return getReferenceClientList(invoker).get(0); + } + + private List getReferenceClientList(Invoker invoker) { + List invokerClientList = getInvokerClientList(invoker); + + List referenceCountExchangeClientList = new ArrayList<>(invokerClientList.size()); + for (ExchangeClient exchangeClient : invokerClientList) { + Assertions.assertTrue(exchangeClient instanceof ReferenceCountExchangeClient); + referenceCountExchangeClientList.add((ReferenceCountExchangeClient) exchangeClient); + } + + return referenceCountExchangeClientList; } private ExchangeClient getInvokerClient(Invoker invoker) { + return getInvokerClientList(invoker).get(0); + } + + private List getInvokerClientList(Invoker invoker) { @SuppressWarnings("rawtypes") DubboInvoker dInvoker = (DubboInvoker) invoker; try { Field clientField = DubboInvoker.class.getDeclaredField("clients"); clientField.setAccessible(true); ExchangeClient[] clients = (ExchangeClient[]) clientField.get(dInvoker); - return clients[0]; + + List clientList = new ArrayList(clients.length); + for (ExchangeClient client : clients) { + clientList.add(client); + } + + // sorting makes it easy to compare between lists + Collections.sort(clientList, Comparator.comparing(c -> Integer.valueOf(Objects.hashCode(c)))); + + return clientList; } catch (Exception e) { e.printStackTrace();