From 903875a7eb4aa268932885a54cc9a6088a7f5ddd Mon Sep 17 00:00:00 2001 From: "ken.lj" Date: Sun, 29 Oct 2017 20:02:56 +0800 Subject: [PATCH 1/5] Fixed block on app start when zookeeper server can not be connected --- .../dubbo/common/concurrent/ExecutionList.java | 9 ++++++++- .../dubbo/common/concurrent/ListenableFuture.java | 2 ++ .../dubbo/common/concurrent/ListenableFutureTask.java | 5 +++++ .../java/com/alibaba/dubbo/common/utils/Assert.java | 6 ++++++ .../dubbo/monitor/support/AbstractMonitorFactory.java | 2 +- .../zookeeper/curator/CuratorZookeeperClient.java | 2 +- .../zookeeper/zkclient/ZkclientZookeeperClient.java | 11 ++++++----- 7 files changed, 29 insertions(+), 8 deletions(-) diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ExecutionList.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ExecutionList.java index 2e4b039287a5..7fdd8e494e42 100644 --- a/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ExecutionList.java +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ExecutionList.java @@ -2,8 +2,10 @@ import com.alibaba.dubbo.common.logger.Logger; import com.alibaba.dubbo.common.logger.LoggerFactory; +import com.alibaba.dubbo.common.utils.NamedThreadFactory; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; /** *

A list of listeners, each with an associated {@code Executor}, that @@ -35,6 +37,8 @@ public final class ExecutionList { private boolean executed; + private static final Executor DEFAULT_EXECUTOR = Executors.newSingleThreadExecutor(new NamedThreadFactory("DubboFutureCallbackDefaultExecutor", true)); + /** * Creates a new, empty {@link ExecutionList}. */ @@ -63,9 +67,12 @@ public void add(Runnable runnable, Executor executor) { // Fail fast on a null. We throw NPE here because the contract of // Executor states that it throws NPE on null listener, so we propagate // that contract up into the add method as well. - if (runnable == null || executor == null) { + if (runnable == null) { throw new NullPointerException("Both Runnable and Executor can not be null!"); } + if (executor == null) { + executor = DEFAULT_EXECUTOR; + } // Lock while we check state. We must maintain the lock while adding the // new pair so that another thread can't run the list out from under us. diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFuture.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFuture.java index 7b1ff0e9d77f..1600b3007985 100644 --- a/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFuture.java +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFuture.java @@ -116,4 +116,6 @@ public interface ListenableFuture extends Future { * immediately but the executor rejected it. */ void addListener(Runnable listener, Executor executor); + + void addListener(Runnable listener); } diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFutureTask.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFutureTask.java index b77d173b6b70..538d05c983e9 100644 --- a/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFutureTask.java +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ListenableFutureTask.java @@ -65,6 +65,11 @@ public void addListener(Runnable listener, Executor exec) { executionList.add(listener, exec); } + @Override + public void addListener(Runnable listener) { + executionList.add(listener, null); + } + /** * Internal implementation detail used to invoke the listeners. */ diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/Assert.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/Assert.java index bda7c4b60cc9..333f3e7d7d88 100644 --- a/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/Assert.java +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/Assert.java @@ -30,4 +30,10 @@ public static void notNull(Object obj, String message) { } } + public static void notNull(Object obj, RuntimeException exeception) { + if (obj == null) { + throw exeception; + } + } + } diff --git a/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java b/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java index d4bda27fe540..8e91ab3f8c38 100644 --- a/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java +++ b/dubbo-monitor/dubbo-monitor-api/src/main/java/com/alibaba/dubbo/monitor/support/AbstractMonitorFactory.java @@ -80,7 +80,7 @@ public Monitor getMonitor(URL url) { final URL monitorUrl = url; final ListenableFutureTask listenableFutureTask = ListenableFutureTask.create(new MonitorCreator(monitorUrl)); - listenableFutureTask.addListener(new MonitorListener(key), executor); + listenableFutureTask.addListener(new MonitorListener(key)); executor.execute(listenableFutureTask); FUTURES.put(key, listenableFutureTask); diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java index de3b3fbff3e5..f211a3e02a35 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java @@ -27,7 +27,7 @@ public CuratorZookeeperClient(URL url) { try { CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(url.getBackupAddress()) - .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000)) + .retryPolicy(new RetryNTimes(1, 1000)) .connectionTimeoutMs(5000); String authority = url.getAuthority(); if (authority != null && authority.length() > 0) { diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java index dec4da12542c..e067104f91b1 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java @@ -7,7 +7,6 @@ import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkStateListener; -import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.apache.zookeeper.Watcher.Event.KeeperState; @@ -16,14 +15,14 @@ public class ZkclientZookeeperClient extends AbstractZookeeperClient { - private final ZkClient client; + private final ZkClientWrapper client; private volatile KeeperState state = KeeperState.SyncConnected; public ZkclientZookeeperClient(URL url) { super(url); - client = new ZkClient(url.getBackupAddress()); - client.subscribeStateChanges(new IZkStateListener() { + client = new ZkClientWrapper(url.getBackupAddress(), 5000); + client.addListener(new IZkStateListener() { public void handleStateChanged(KeeperState state) throws Exception { ZkclientZookeeperClient.this.state = state; if (state == KeeperState.Disconnected) { @@ -37,11 +36,13 @@ public void handleNewSession() throws Exception { stateChanged(StateListener.RECONNECTED); } }); + client.start(); } + public void createPersistent(String path) { try { - client.createPersistent(path, true); + client.createPersistent(path); } catch (ZkNodeExistsException e) { } } From 24fcaf0cc8871b042818f3f24ad265d63077dca6 Mon Sep 17 00:00:00 2001 From: "ken.lj" Date: Sun, 29 Oct 2017 20:04:17 +0800 Subject: [PATCH 2/5] Fixed block on app start when zookeeper server can not be connected --- .../zookeeper/zkclient/ZkClientWrapper.java | 122 ++++++++++++++++++ 1 file changed, 122 insertions(+) create mode 100644 dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java new file mode 100644 index 000000000000..e156e59dfe42 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java @@ -0,0 +1,122 @@ +package com.alibaba.dubbo.remoting.zookeeper.zkclient; + +import com.alibaba.dubbo.common.concurrent.ListenableFutureTask; +import com.alibaba.dubbo.common.logger.Logger; +import com.alibaba.dubbo.common.logger.LoggerFactory; +import com.alibaba.dubbo.common.utils.Assert; + +import org.I0Itec.zkclient.IZkChildListener; +import org.I0Itec.zkclient.IZkStateListener; +import org.I0Itec.zkclient.ZkClient; +import org.apache.zookeeper.Watcher.Event.KeeperState; + +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +/** + * @author ken.lj + * @date 2017/10/29 + */ +public class ZkClientWrapper { + Logger logger = LoggerFactory.getLogger(ZkClientWrapper.class); + + private long timeout; + private ZkClient client; + private volatile KeeperState state; + private ListenableFutureTask listenableFutureTask; + private volatile boolean started = false; + + + public ZkClientWrapper(final String serverAddr, long timeout) { + this.timeout = timeout; + listenableFutureTask = ListenableFutureTask.create(new Callable() { + @Override + public ZkClient call() throws Exception { + return new ZkClient(serverAddr, Integer.MAX_VALUE); + } + }); + } + + public void start() { + if (!started) { + Thread connectThread = new Thread(listenableFutureTask); + connectThread.setName("DubboZkclientConnector"); + connectThread.setDaemon(true); + connectThread.start(); + try { + client = listenableFutureTask.get(timeout, TimeUnit.MILLISECONDS); + } catch (Throwable t) { + logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!", t); + } + started = true; + } else { + throw new IllegalStateException("Zkclient has already been started!"); + } + } + + public void addListener(final IZkStateListener listener) { + listenableFutureTask.addListener(new Runnable() { + @Override + public void run() { + try { + client = listenableFutureTask.get(); + client.subscribeStateChanges(listener); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } + } + }); + } + + public boolean isConnected() { + return client != null && state == KeeperState.SyncConnected; + } + + public void createPersistent(String path) { + Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!")); + client.createPersistent(path, true); + } + + public void createEphemeral(String path) { + Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!")); + client.createEphemeral(path); + } + + public void delete(String path) { + if (client == null) { + throw new IllegalStateException("Zookeeper is not connected!"); + } + client.delete(path); + } + + public List getChildren(String path) { + Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!")); + return client.getChildren(path); + } + + public boolean exists(String path) { + Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!")); + return client.exists(path); + } + + public void close() { + Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!")); + client.close(); + } + + public List subscribeChildChanges(String path, final IZkChildListener listener) { + Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!")); + return client.subscribeChildChanges(path, listener); + } + + public void unsubscribeChildChanges(String path, IZkChildListener listener) { + Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!")); + client.unsubscribeChildChanges(path, listener); + } + + +} From 2848c9ebde0ec4109cc384abd5686cb3dba0482c Mon Sep 17 00:00:00 2001 From: "ken.lj" Date: Sun, 29 Oct 2017 21:11:18 +0800 Subject: [PATCH 3/5] Increase zkclient timeout to 30s --- .../dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java | 2 ++ .../remoting/zookeeper/zkclient/ZkclientZookeeperClient.java | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java index e156e59dfe42..839a230e0b62 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java @@ -16,6 +16,8 @@ import java.util.concurrent.TimeUnit; /** + * 连接超时后,能自动监听连接状态的zkclient包装类 + * 也为和curator在使用上总体保持一致 * @author ken.lj * @date 2017/10/29 */ diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java index e067104f91b1..498f43158b95 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java @@ -21,7 +21,7 @@ public class ZkclientZookeeperClient extends AbstractZookeeperClient Date: Mon, 30 Oct 2017 16:04:01 +0800 Subject: [PATCH 4/5] Increase default executor size in ListenableFuture --- .../alibaba/dubbo/common/concurrent/ExecutionList.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ExecutionList.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ExecutionList.java index 7fdd8e494e42..5a1448581dda 100644 --- a/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ExecutionList.java +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/concurrent/ExecutionList.java @@ -5,7 +5,9 @@ import com.alibaba.dubbo.common.utils.NamedThreadFactory; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** *

A list of listeners, each with an associated {@code Executor}, that @@ -37,7 +39,7 @@ public final class ExecutionList { private boolean executed; - private static final Executor DEFAULT_EXECUTOR = Executors.newSingleThreadExecutor(new NamedThreadFactory("DubboFutureCallbackDefaultExecutor", true)); + private static final Executor DEFAULT_EXECUTOR = new ThreadPoolExecutor(1, 10, 60000L, TimeUnit.MILLISECONDS, new SynchronousQueue(), new NamedThreadFactory("DubboFutureCallbackDefault", true)); /** * Creates a new, empty {@link ExecutionList}. @@ -68,12 +70,12 @@ public void add(Runnable runnable, Executor executor) { // Executor states that it throws NPE on null listener, so we propagate // that contract up into the add method as well. if (runnable == null) { - throw new NullPointerException("Both Runnable and Executor can not be null!"); + throw new NullPointerException("Runnable can not be null!"); } if (executor == null) { + logger.info("Executor for listenablefuture is null, will use default executor!"); executor = DEFAULT_EXECUTOR; } - // Lock while we check state. We must maintain the lock while adding the // new pair so that another thread can't run the list out from under us. // We only add to the list if we have not yet started execution. From e1aa50f842fb78acebad9d66d51d04bd95dd3105 Mon Sep 17 00:00:00 2001 From: "ken.lj" Date: Mon, 30 Oct 2017 16:05:23 +0800 Subject: [PATCH 5/5] Record exception with logger --- .../zookeeper/zkclient/ZkClientWrapper.java | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java index 839a230e0b62..318443ae53c8 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/com/alibaba/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java @@ -54,7 +54,7 @@ public void start() { } started = true; } else { - throw new IllegalStateException("Zkclient has already been started!"); + logger.warn("Zkclient has already been started!"); } } @@ -66,9 +66,9 @@ public void run() { client = listenableFutureTask.get(); client.subscribeStateChanges(listener); } catch (InterruptedException e) { - e.printStackTrace(); + logger.warn(Thread.currentThread().getName() + " was interrupted unexpectedly, which may cause unpredictable exception!"); } catch (ExecutionException e) { - e.printStackTrace(); + logger.error("Got an exception when trying to create zkclient instance, can not connect to zookeeper server, please check!", e); } } }); @@ -79,44 +79,42 @@ public boolean isConnected() { } public void createPersistent(String path) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!")); + Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); client.createPersistent(path, true); } public void createEphemeral(String path) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!")); + Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); client.createEphemeral(path); } public void delete(String path) { - if (client == null) { - throw new IllegalStateException("Zookeeper is not connected!"); - } + Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); client.delete(path); } public List getChildren(String path) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!")); + Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); return client.getChildren(path); } public boolean exists(String path) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!")); + Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); return client.exists(path); } public void close() { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!")); + Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); client.close(); } public List subscribeChildChanges(String path, final IZkChildListener listener) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!")); + Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); return client.subscribeChildChanges(path, listener); } public void unsubscribeChildChanges(String path, IZkChildListener listener) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected!")); + Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); client.unsubscribeChildChanges(path, listener); }