diff --git a/dubbo-configcenter/dubbo-configcenter-api/src/main/java/org/apache/dubbo/configcenter/ConfigChangeEvent.java b/dubbo-configcenter/dubbo-configcenter-api/src/main/java/org/apache/dubbo/configcenter/ConfigChangeEvent.java
index 4a2190a4796..cdedd15e34e 100644
--- a/dubbo-configcenter/dubbo-configcenter-api/src/main/java/org/apache/dubbo/configcenter/ConfigChangeEvent.java
+++ b/dubbo-configcenter/dubbo-configcenter-api/src/main/java/org/apache/dubbo/configcenter/ConfigChangeEvent.java
@@ -49,4 +49,12 @@ public ConfigChangeType getChangeType() {
return changeType;
}
-}
\ No newline at end of file
+ @Override
+ public String toString() {
+ return "ConfigChangeEvent{" +
+ "key='" + key + '\'' +
+ ", value='" + value + '\'' +
+ ", changeType=" + changeType +
+ '}';
+ }
+}
diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml b/dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml
index bb9e1ad5d45..5c84f6515ac 100644
--- a/dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml
+++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml
@@ -33,16 +33,9 @@
${project.parent.version}
- org.apache.curator
- curator-framework
-
-
- org.apache.curator
- curator-recipes
-
-
- org.apache.zookeeper
- zookeeper
+ org.apache.dubbo
+ dubbo-remoting-zookeeper
+ ${project.parent.version}
org.apache.curator
@@ -50,4 +43,4 @@
test
-
\ No newline at end of file
+
diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java
index 1851a22b2f0..4f6c6382952 100644
--- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java
+++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java
@@ -21,13 +21,9 @@
import org.apache.dubbo.configcenter.ConfigChangeEvent;
import org.apache.dubbo.configcenter.ConfigChangeType;
import org.apache.dubbo.configcenter.ConfigurationListener;
+import org.apache.dubbo.remoting.zookeeper.DataListener;
+import org.apache.dubbo.remoting.zookeeper.EventType;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
-
-import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -37,9 +33,8 @@
/**
*
*/
-public class CacheListener implements TreeCacheListener {
- private static final byte[] EMPTY_BYTES = new byte[0];
+public class CacheListener implements DataListener {
private Map> keyListeners = new ConcurrentHashMap<>();
private CountDownLatch initializedLatch;
private String rootPath;
@@ -49,76 +44,73 @@ public CacheListener(String rootPath, CountDownLatch initializedLatch) {
this.initializedLatch = initializedLatch;
}
+ public void addListener(String key, ConfigurationListener configurationListener) {
+ Set listeners = this.keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>());
+ listeners.add(configurationListener);
+ }
+
+ public void removeListener(String key, ConfigurationListener configurationListener) {
+ Set listeners = this.keyListeners.get(key);
+ if (listeners != null) {
+ listeners.remove(configurationListener);
+ }
+ }
+
+ /**
+ * This is used to convert a configuration nodePath into a key
+ * TODO doc
+ *
+ * @param path
+ * @return key (nodePath less the config root path)
+ */
+ private String pathToKey(String path) {
+ if (StringUtils.isEmpty(path)) {
+ return path;
+ }
+ return path.replace(rootPath + "/", "").replaceAll("/", ".");
+ }
+
+
@Override
- public void childEvent(CuratorFramework aClient, TreeCacheEvent event) throws Exception {
+ public void dataChanged(String path, Object value, EventType eventType) {
+ if (eventType == null) {
+ return;
+ }
- TreeCacheEvent.Type type = event.getType();
- ChildData data = event.getData();
- if (type == TreeCacheEvent.Type.INITIALIZED) {
+ if (eventType == EventType.INITIALIZED) {
initializedLatch.countDown();
return;
}
- // TODO, ignore other event types
- if (data == null) {
+ if (path == null || (value == null && eventType != EventType.NodeDeleted)) {
return;
}
// TODO We limit the notification of config changes to a specific path level, for example
// /dubbo/config/service/configurators, other config changes not in this level will not get notified,
// say /dubbo/config/dubbo.properties
- if (data.getPath().split("/").length >= 5) {
- byte[] value = data.getData();
- String key = pathToKey(data.getPath());
+ if (path.split("/").length >= 5) {
+ String key = pathToKey(path);
ConfigChangeType changeType;
- switch (type) {
- case NODE_ADDED:
+ switch (eventType) {
+ case NodeCreated:
changeType = ConfigChangeType.ADDED;
break;
- case NODE_REMOVED:
+ case NodeDeleted:
changeType = ConfigChangeType.DELETED;
break;
- case NODE_UPDATED:
+ case NodeDataChanged:
changeType = ConfigChangeType.MODIFIED;
break;
default:
return;
}
- if (value == null) {
- value = EMPTY_BYTES;
- }
- ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(key, new String(value, StandardCharsets.UTF_8), changeType);
+ ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(key, (String) value, changeType);
Set listeners = keyListeners.get(key);
if (CollectionUtils.isNotEmpty(listeners)) {
listeners.forEach(listener -> listener.process(configChangeEvent));
}
}
}
-
- public void addListener(String key, ConfigurationListener configurationListener) {
- Set listeners = this.keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>());
- listeners.add(configurationListener);
- }
-
- public void removeListener(String key, ConfigurationListener configurationListener) {
- Set listeners = this.keyListeners.get(key);
- if (listeners != null) {
- listeners.remove(configurationListener);
- }
- }
-
- /**
- * This is used to convert a configuration nodePath into a key
- * TODO doc
- *
- * @param path
- * @return key (nodePath less the config root path)
- */
- private String pathToKey(String path) {
- if (StringUtils.isEmpty(path)) {
- return path;
- }
- return path.replace(rootPath + "/", "").replaceAll("/", ".");
- }
}
diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java
index 7a106f86dec..dac49ccb0e6 100644
--- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java
+++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java
@@ -16,28 +16,21 @@
*/
package org.apache.dubbo.configcenter.support.zookeeper;
-import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.configcenter.ConfigurationListener;
import org.apache.dubbo.configcenter.DynamicConfiguration;
+import org.apache.dubbo.remoting.zookeeper.ZookeeperClient;
+import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import static org.apache.curator.framework.CuratorFrameworkFactory.newClient;
import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY;
/**
@@ -45,52 +38,32 @@
*/
public class ZookeeperDynamicConfiguration implements DynamicConfiguration {
private static final Logger logger = LoggerFactory.getLogger(ZookeeperDynamicConfiguration.class);
- private Executor executor;
- private CuratorFramework client;
+ private Executor executor;
// The final root path would be: /configRootPath/"config"
private String rootPath;
- private TreeCache treeCache;
+ private final ZookeeperClient zkClient;
private CountDownLatch initializedLatch;
private CacheListener cacheListener;
private URL url;
- ZookeeperDynamicConfiguration(URL url) {
+ ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter zookeeperTransporter) {
this.url = url;
rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";
- RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
- int sessionTimeout = url.getParameter("config.session.timeout", 60 * 1000);
- int connectTimeout = url.getParameter("config.connect.timeout", 10 * 1000);
- String connectString = url.getBackupAddress();
- client = newClient(connectString, sessionTimeout, connectTimeout, policy);
- client.start();
-
- try {
- boolean connected = client.blockUntilConnected(3 * connectTimeout, TimeUnit.MILLISECONDS);
- if (!connected) {
- if (url.getParameter(Constants.CONFIG_CHECK_KEY, true)) {
- throw new IllegalStateException("Failed to connect to config center (zookeeper): "
- + connectString + " in " + 3 * connectTimeout + "ms.");
- } else {
- logger.warn("The config center (zookeeper) is not fully initialized in " + 3 * connectTimeout + "ms, address is: " + connectString);
- }
- }
- } catch (InterruptedException e) {
- throw new IllegalStateException("The thread was interrupted unexpectedly when trying connecting to zookeeper "
- + connectString + " config center, ", e);
- }
-
initializedLatch = new CountDownLatch(1);
this.cacheListener = new CacheListener(rootPath, initializedLatch);
this.executor = Executors.newFixedThreadPool(1, new NamedThreadFactory(this.getClass().getSimpleName(), true));
- // build local cache
+
+ zkClient = zookeeperTransporter.connect(url);
+ zkClient.addDataListener(rootPath, cacheListener, executor);
try {
- this.buildCache();
- } catch (Exception e) {
- logger.warn("Failed to build local cache for config center (zookeeper), address is ." + connectString);
+ // Wait for connection
+ this.initializedLatch.await();
+ } catch (InterruptedException e) {
+ logger.warn("Failed to build local cache for config center (zookeeper)." + url);
}
}
@@ -100,11 +73,7 @@ public class ZookeeperDynamicConfiguration implements DynamicConfiguration {
*/
@Override
public Object getInternalProperty(String key) {
- ChildData childData = treeCache.getCurrentData(key);
- if (childData != null) {
- return new String(childData.getData(), StandardCharsets.UTF_8);
- }
- return null;
+ return zkClient.getContent(key);
}
/**
@@ -141,18 +110,4 @@ public String getConfig(String key, String group, long timeout) throws IllegalSt
return (String) getInternalProperty(rootPath + "/" + key);
}
-
- /**
- * Adds a listener to the pathChildrenCache, initializes the cache, then starts the cache-management background
- * thread
- */
- private void buildCache() throws Exception {
- this.treeCache = new TreeCache(client, rootPath);
- // create the watcher for future configuration updates
- treeCache.getListenable().addListener(cacheListener, executor);
-
- // it's not blocking, so we use an extra latch 'initializedLatch' to make sure cache fully initialized before use.
- treeCache.start();
- initializedLatch.await();
- }
}
diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java
index 7994e0461f3..4d78133dbaf 100644
--- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java
+++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java
@@ -19,13 +19,22 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.configcenter.AbstractDynamicConfigurationFactory;
import org.apache.dubbo.configcenter.DynamicConfiguration;
+import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter;
/**
*
*/
public class ZookeeperDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory {
+
+ private ZookeeperTransporter zookeeperTransporter;
+
+ public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
+ this.zookeeperTransporter = zookeeperTransporter;
+ }
+
+
@Override
protected DynamicConfiguration createDynamicConfiguration(URL url) {
- return new ZookeeperDynamicConfiguration(url);
+ return new ZookeeperDynamicConfiguration(url, zookeeperTransporter);
}
}
diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java
index e1ca40f8040..40f9f04a95a 100644
--- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java
+++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java
@@ -133,6 +133,7 @@ public TestListener(CountDownLatch latch) {
@Override
public void process(ConfigChangeEvent event) {
+ System.out.println(this + ": " + event);
Integer count = countMap.computeIfAbsent(event.getKey(), k -> new Integer(0));
countMap.put(event.getKey(), ++count);
diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml
index baa2eed5fc5..4cf42d85d37 100644
--- a/dubbo-dependencies-bom/pom.xml
+++ b/dubbo-dependencies-bom/pom.xml
@@ -101,7 +101,6 @@
4.4.6
1.2.46
3.4.13
- 0.2
4.0.1
2.12.0
2.9.0
@@ -150,7 +149,7 @@
- org.springframework
+org.springframework
spring-framework-bom
${spring_version}
pom
@@ -207,17 +206,6 @@
-
- com.101tec
- zkclient
- ${zkclient_version}
-
-
- org.apache.zookeeper
- zookeeper
-
-
-
org.apache.curator
curator-framework
diff --git a/dubbo-dependencies/dubbo-dependencies-zookeeper/pom.xml b/dubbo-dependencies/dubbo-dependencies-zookeeper/pom.xml
index 7305c2bc81c..18a3f2abcc7 100644
--- a/dubbo-dependencies/dubbo-dependencies-zookeeper/pom.xml
+++ b/dubbo-dependencies/dubbo-dependencies-zookeeper/pom.xml
@@ -50,10 +50,6 @@
org.apache.curator
curator-recipes
-
- com.101tec
- zkclient
-
org.apache.zookeeper
zookeeper
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml b/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml
index 97ce12adc23..24b14e9bd7e 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml
@@ -35,16 +35,10 @@
${project.parent.version}
- org.apache.zookeeper
- zookeeper
-
-
- com.101tec
- zkclient
-
-
- org.apache.curator
- curator-framework
+ org.apache.dubbo
+ dubbo-dependencies-zookeeper
+ ${project.parent.version}
+ pom
org.apache.curator
@@ -52,4 +46,4 @@
test
-
\ No newline at end of file
+
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporter.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java
similarity index 64%
rename from dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporter.java
rename to dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java
index 0ad86ff7850..95b948adafa 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporter.java
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java
@@ -14,16 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.zookeeper.zkclient;
+package org.apache.dubbo.remoting.zookeeper;
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.remoting.zookeeper.ZookeeperClient;
-import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperTransporter;
-
-public class ZkclientZookeeperTransporter extends AbstractZookeeperTransporter {
- @Override
- public ZookeeperClient createZookeeperClient(URL url) {
- return new ZkclientZookeeperClient(url);
- }
+/**
+ * 2019-02-26
+ */
+public interface DataListener {
+ void dataChanged(String path, Object value, EventType eventType);
}
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java
new file mode 100644
index 00000000000..a1de0373652
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.zookeeper;
+
+import org.apache.zookeeper.Watcher;
+
+/**
+ * 2019-02-26
+ */
+public enum EventType {
+ None(-1),
+ NodeCreated(1),
+ NodeDeleted(2),
+ NodeDataChanged(3),
+ NodeChildrenChanged(4),
+ CONNECTION_SUSPENDED(11),
+ CONNECTION_RECONNECTED(12),
+ CONNECTION_LOST(12),
+ INITIALIZED(10);
+
+
+
+ private final int intValue; // Integer representation of value
+ // for sending over wire
+
+ EventType(int intValue) {
+ this.intValue = intValue;
+ }
+
+ public int getIntValue() {
+ return intValue;
+ }
+
+ public static Watcher.Event.EventType fromInt(int intValue) {
+ switch (intValue) {
+ case -1:
+ return Watcher.Event.EventType.None;
+ case 1:
+ return Watcher.Event.EventType.NodeCreated;
+ case 2:
+ return Watcher.Event.EventType.NodeDeleted;
+ case 3:
+ return Watcher.Event.EventType.NodeDataChanged;
+ case 4:
+ return Watcher.Event.EventType.NodeChildrenChanged;
+
+ default:
+ throw new RuntimeException("Invalid integer value for conversion to EventType");
+ }
+ }
+}
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java
index b6875ee8e52..cbb37479cd3 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java
@@ -19,6 +19,7 @@
import org.apache.dubbo.common.URL;
import java.util.List;
+import java.util.concurrent.Executor;
public interface ZookeeperClient {
@@ -30,6 +31,21 @@ public interface ZookeeperClient {
List addChildListener(String path, ChildListener listener);
+ /**
+ * @param path: directory. All of child of path will be listened.
+ * @param listener
+ */
+ void addDataListener(String path, DataListener listener);
+
+ /**
+ * @param path: directory. All of child of path will be listened.
+ * @param listener
+ * @param executor another thread
+ */
+ void addDataListener(String path, DataListener listener, Executor executor);
+
+ void removeDataListener(String path, DataListener listener);
+
void removeChildListener(String path, ChildListener listener);
void addStateListener(StateListener listener);
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
index a78edda76a6..4bf7b6d3bf5 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
@@ -16,18 +16,24 @@
*/
package org.apache.dubbo.remoting.zookeeper.curator;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.retry.RetryNTimes;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.zookeeper.ChildListener;
+import org.apache.dubbo.remoting.zookeeper.DataListener;
+import org.apache.dubbo.remoting.zookeeper.EventType;
import org.apache.dubbo.remoting.zookeeper.StateListener;
import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -36,11 +42,15 @@
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
-public class CuratorZookeeperClient extends AbstractZookeeperClient {
+public class CuratorZookeeperClient extends AbstractZookeeperClient {
- private final Charset charset = Charset.forName("UTF-8");
+ static final Charset charset = Charset.forName("UTF-8");
private final CuratorFramework client;
+ private Map treeCacheMap = new ConcurrentHashMap<>();
public CuratorZookeeperClient(URL url) {
@@ -96,10 +106,15 @@ public void createEphemeral(String path) {
@Override
protected void createPersistent(String path, String data) {
+ byte[] dataBytes = data.getBytes(charset);
try {
- byte[] dataBytes = data.getBytes(charset);
client.create().forPath(path, dataBytes);
} catch (NodeExistsException e) {
+ try {
+ client.setData().forPath(path, dataBytes);
+ } catch (Exception e1) {
+ throw new IllegalStateException(e.getMessage(), e1);
+ }
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
@@ -107,10 +122,15 @@ protected void createPersistent(String path, String data) {
@Override
protected void createEphemeral(String path, String data) {
+ byte[] dataBytes = data.getBytes(charset);
try {
- byte[] dataBytes = data.getBytes(charset);
client.create().withMode(CreateMode.EPHEMERAL).forPath(path, dataBytes);
} catch (NodeExistsException e) {
+ try {
+ client.setData().forPath(path, dataBytes);
+ } catch (Exception e1) {
+ throw new IllegalStateException(e.getMessage(), e1);
+ }
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
@@ -172,12 +192,12 @@ public void doClose() {
}
@Override
- public CuratorWatcher createTargetChildListener(String path, ChildListener listener) {
- return new CuratorWatcherImpl(listener);
+ public CuratorZookeeperClient.CuratorWatcherImpl createTargetChildListener(String path, ChildListener listener) {
+ return new CuratorZookeeperClient.CuratorWatcherImpl(client, listener);
}
@Override
- public List addTargetChildListener(String path, CuratorWatcher listener) {
+ public List addTargetChildListener(String path, CuratorWatcherImpl listener) {
try {
return client.getChildren().usingWatcher(listener).forPath(path);
} catch (NoNodeException e) {
@@ -188,27 +208,73 @@ public List addTargetChildListener(String path, CuratorWatcher listener)
}
@Override
- public void removeTargetChildListener(String path, CuratorWatcher listener) {
- ((CuratorWatcherImpl) listener).unwatch();
+ protected CuratorZookeeperClient.CuratorWatcherImpl createTargetDataListener(String path, DataListener listener) {
+ return new CuratorWatcherImpl(client, listener);
+ }
+
+ @Override
+ protected void addTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener) {
+ this.addTargetDataListener(path, treeCacheListener, null);
+ }
+
+ @Override
+ protected void addTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener, Executor executor) {
+ try {
+ TreeCache treeCache = TreeCache.newBuilder(client, path).setCacheData(false).build();
+ treeCacheMap.putIfAbsent(path, treeCache);
+ treeCache.start();
+ if (executor == null) {
+ treeCache.getListenable().addListener(treeCacheListener);
+ } else {
+ treeCache.getListenable().addListener(treeCacheListener, executor);
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException("Add treeCache listener for path:" + path, e);
+ }
+ }
+
+ @Override
+ protected void removeTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener) {
+ TreeCache treeCache = treeCacheMap.get(path);
+ if (treeCache != null) {
+ treeCache.getListenable().removeListener(treeCacheListener);
+ }
+ treeCacheListener.dataListener = null;
}
- private class CuratorWatcherImpl implements CuratorWatcher {
+ @Override
+ public void removeTargetChildListener(String path, CuratorWatcherImpl listener) {
+ listener.unwatch();
+ }
+
+ static class CuratorWatcherImpl implements CuratorWatcher, TreeCacheListener {
+
+ private CuratorFramework client;
+ private volatile ChildListener childListener;
+ private volatile DataListener dataListener;
- private volatile ChildListener listener;
- public CuratorWatcherImpl(ChildListener listener) {
- this.listener = listener;
+ public CuratorWatcherImpl(CuratorFramework client, ChildListener listener) {
+ this.client = client;
+ this.childListener = listener;
+ }
+
+ public CuratorWatcherImpl(CuratorFramework client, DataListener dataListener) {
+ this.dataListener = dataListener;
+ }
+
+ protected CuratorWatcherImpl() {
}
public void unwatch() {
- this.listener = null;
+ this.childListener = null;
}
@Override
public void process(WatchedEvent event) throws Exception {
- if (listener != null) {
+ if (childListener != null) {
String path = event.getPath() == null ? "" : event.getPath();
- listener.childChanged(path,
+ childListener.childChanged(path,
// if path is null, curator using watcher will throw NullPointerException.
// if client connect or disconnect to server, zookeeper will queue
// watched event(Watcher.Event.EventType.None, .., path = null).
@@ -217,6 +283,54 @@ public void process(WatchedEvent event) throws Exception {
: Collections.emptyList());
}
}
+
+ @Override
+ public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+ if (dataListener != null) {
+ TreeCacheEvent.Type type = event.getType();
+ EventType eventType = null;
+ String content = null;
+ String path = null;
+ switch (type) {
+ case NODE_ADDED:
+ eventType = EventType.NodeCreated;
+ path = event.getData().getPath();
+ content = new String(event.getData().getData(), charset);
+ break;
+ case NODE_UPDATED:
+ eventType = EventType.NodeDataChanged;
+ path = event.getData().getPath();
+ content = new String(event.getData().getData(), charset);
+ break;
+ case NODE_REMOVED:
+ path = event.getData().getPath();
+ eventType = EventType.NodeDeleted;
+ break;
+ case INITIALIZED:
+ eventType = EventType.INITIALIZED;
+ break;
+ case CONNECTION_LOST:
+ eventType = EventType.CONNECTION_LOST;
+ break;
+ case CONNECTION_RECONNECTED:
+ eventType = EventType.CONNECTION_RECONNECTED;
+ break;
+ case CONNECTION_SUSPENDED:
+ eventType = EventType.CONNECTION_SUSPENDED;
+ break;
+
+ }
+ dataListener.dataChanged(path, content, eventType);
+ }
+ }
}
+ /**
+ * just for unit test
+ *
+ * @return
+ */
+ CuratorFramework getClient() {
+ return client;
+ }
}
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java
index e90f7fb9c71..9697cea012e 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java
@@ -20,6 +20,7 @@
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.zookeeper.ChildListener;
+import org.apache.dubbo.remoting.zookeeper.DataListener;
import org.apache.dubbo.remoting.zookeeper.StateListener;
import org.apache.dubbo.remoting.zookeeper.ZookeeperClient;
@@ -28,8 +29,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executor;
-public abstract class AbstractZookeeperClient implements ZookeeperClient {
+public abstract class AbstractZookeeperClient implements ZookeeperClient {
protected static final Logger logger = LoggerFactory.getLogger(AbstractZookeeperClient.class);
@@ -39,6 +41,8 @@ public abstract class AbstractZookeeperClient implements Zo
private final ConcurrentMap> childListeners = new ConcurrentHashMap>();
+ private final ConcurrentMap> listeners = new ConcurrentHashMap>();
+
private volatile boolean closed = false;
public AbstractZookeeperClient(URL url) {
@@ -97,6 +101,37 @@ public List addChildListener(String path, final ChildListener listener)
return addTargetChildListener(path, targetListener);
}
+ @Override
+ public void addDataListener(String path, DataListener listener) {
+ this.addDataListener(path, listener, null);
+ }
+
+ @Override
+ public void addDataListener(String path, DataListener listener, Executor executor) {
+ ConcurrentMap dataListenerMap = listeners.get(path);
+ if (dataListenerMap == null) {
+ listeners.putIfAbsent(path, new ConcurrentHashMap());
+ dataListenerMap = listeners.get(path);
+ }
+ TargetDataListener targetListener = dataListenerMap.get(listener);
+ if (targetListener == null) {
+ dataListenerMap.putIfAbsent(listener, createTargetDataListener(path, listener));
+ targetListener = dataListenerMap.get(listener);
+ }
+ addTargetDataListener(path, targetListener, executor);
+ }
+
+ @Override
+ public void removeDataListener(String path, DataListener listener ){
+ ConcurrentMap dataListenerMap = listeners.get(path);
+ if (dataListenerMap != null) {
+ TargetDataListener targetListener = dataListenerMap.remove(listener);
+ if(targetListener != null){
+ removeTargetDataListener(path, targetListener);
+ }
+ }
+ }
+
@Override
public void removeChildListener(String path, ChildListener listener) {
ConcurrentMap listeners = childListeners.get(path);
@@ -167,6 +202,14 @@ public String getContent(String path) {
protected abstract List addTargetChildListener(String path, TargetChildListener listener);
+ protected abstract TargetDataListener createTargetDataListener(String path, DataListener listener);
+
+ protected abstract void addTargetDataListener(String path, TargetDataListener listener);
+
+ protected abstract void addTargetDataListener(String path, TargetDataListener listener, Executor executor);
+
+ protected abstract void removeTargetDataListener(String path, TargetDataListener listener);
+
protected abstract void removeTargetChildListener(String path, TargetChildListener listener);
protected abstract String doGetContent(String path);
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java
deleted file mode 100644
index ae8a3ef87b2..00000000000
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.remoting.zookeeper.zkclient;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.dubbo.common.logger.Logger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.Assert;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Zkclient wrapper class that can monitor the state of the connection automatically after the connection is out of time
- * It is also consistent with the use of curator
- *
- * @date 2017/10/29
- */
-public class ZkClientWrapper {
- private Logger logger = LoggerFactory.getLogger(ZkClientWrapper.class);
- private long timeout;
- private ZkClient client;
- private volatile KeeperState state;
- private CompletableFuture completableFuture;
- private volatile boolean started = false;
-
- public ZkClientWrapper(final String serverAddr, long timeout) {
- this.timeout = timeout;
- completableFuture = CompletableFuture.supplyAsync(() -> new ZkClient(serverAddr, Integer.MAX_VALUE));
- }
-
- public void start() {
- if (!started) {
- try {
- client = completableFuture.get(timeout, TimeUnit.MILLISECONDS);
-// this.client.subscribeStateChanges(IZkStateListener);
- } catch (Throwable t) {
- logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!", t);
- completableFuture.whenComplete(this::makeClientReady);
- }
- started = true;
- } else {
- logger.warn("Zkclient has already been started!");
- }
- }
-
- public void addListener(IZkStateListener listener) {
- completableFuture.whenComplete((value, exception) -> {
- this.makeClientReady(value, exception);
- if (exception == null) {
- client.subscribeStateChanges(listener);
- }
- });
- }
-
- public boolean isConnected() {
-// return client != null && state == KeeperState.SyncConnected;
- return client != null;
- }
-
- public void createPersistent(String path) {
- Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
- client.createPersistent(path, true);
- }
-
- public void createEphemeral(String path) {
- Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
- client.createEphemeral(path);
- }
-
- public void createPersistent(String path, String data) {
- Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
- client.createPersistent(path, data);
- }
-
- public void createEphemeral(String path, String data) {
- Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
- client.createEphemeral(path, data);
- }
-
- public void delete(String path) {
- Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
- client.delete(path);
- }
-
- public List getChildren(String path) {
- Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
- return client.getChildren(path);
- }
-
- public String getData(String path) {
- Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
- return client.readData(path);
- }
-
- public boolean exists(String path) {
- Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
- return client.exists(path);
- }
-
- public void close() {
- Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
- client.close();
- }
-
- public List subscribeChildChanges(String path, final IZkChildListener listener) {
- Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
- return client.subscribeChildChanges(path, listener);
- }
-
- public void unsubscribeChildChanges(String path, IZkChildListener listener) {
- Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
- client.unsubscribeChildChanges(path, listener);
- }
-
- private void makeClientReady(ZkClient client, Throwable e) {
- if (e != null) {
- logger.error("Got an exception when trying to create zkclient instance, can not connect to zookeeper server, please check!", e);
- } else {
- this.client = client;
-// this.client.subscribeStateChanges(IZkStateListener);
- }
- }
-
-
-}
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java
deleted file mode 100644
index c36640025b9..00000000000
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.remoting.zookeeper.zkclient;
-
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.apache.dubbo.common.Constants;
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.logger.Logger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.remoting.zookeeper.ChildListener;
-import org.apache.dubbo.remoting.zookeeper.StateListener;
-import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-import java.util.List;
-
-public class ZkclientZookeeperClient extends AbstractZookeeperClient {
-
- private Logger logger = LoggerFactory.getLogger(ZkclientZookeeperClient.class);
-
- private final ZkClientWrapper client;
-
- private volatile KeeperState state = KeeperState.SyncConnected;
-
- public ZkclientZookeeperClient(URL url) {
- super(url);
- long timeout = url.getParameter(Constants.TIMEOUT_KEY, 30000L);
- client = new ZkClientWrapper(url.getBackupAddress(), timeout);
- client.addListener(new IZkStateListener() {
- @Override
- public void handleStateChanged(KeeperState state) throws Exception {
- ZkclientZookeeperClient.this.state = state;
- if (state == KeeperState.Disconnected) {
- stateChanged(StateListener.DISCONNECTED);
- } else if (state == KeeperState.SyncConnected) {
- stateChanged(StateListener.CONNECTED);
- }
- }
-
- @Override
- public void handleNewSession() throws Exception {
- stateChanged(StateListener.RECONNECTED);
- }
- });
- client.start();
- }
-
- @Override
- public void createPersistent(String path) {
- try {
- client.createPersistent(path);
- } catch (ZkNodeExistsException e) {
- logger.error("zookeeper failed to create persistent node with " + path + ": ", e);
- }
- }
-
- @Override
- public void createEphemeral(String path) {
- try {
- client.createEphemeral(path);
- } catch (ZkNodeExistsException e) {
- logger.error("zookeeper failed to create ephemeral node with " + path + ": ", e);
- }
- }
-
- @Override
- protected void createPersistent(String path, String data) {
- try {
- client.createPersistent(path, data);
- } catch (ZkNodeExistsException e) {
- logger.error("zookeeper failed to create persistent node with " +
- path + " and " + data + " : ", e);
- }
- }
-
- @Override
- protected void createEphemeral(String path, String data) {
- try {
- client.createEphemeral(path, data);
- } catch (ZkNodeExistsException e) {
- logger.error("zookeeper failed to create ephemeral node with " +
- path + " and " + data + " : ", e);
- }
- }
-
- @Override
- public void delete(String path) {
- try {
- client.delete(path);
- } catch (ZkNoNodeException e) {
- logger.error("zookeeper failed to delete node with " + path + ": ", e);
- }
- }
-
- @Override
- public List getChildren(String path) {
- try {
- return client.getChildren(path);
- } catch (ZkNoNodeException e) {
- logger.error("zookeeper failed to get children node with " + path + ": ", e);
- return null;
- }
- }
-
- @Override
- public boolean checkExists(String path) {
- try {
- return client.exists(path);
- } catch (Throwable t) {
- logger.error("zookeeper failed to check node existing with " + path + ": ", t);
- }
- return false;
- }
-
- @Override
- public boolean isConnected() {
- return state == KeeperState.SyncConnected;
- }
-
- @Override
- public String doGetContent(String path) {
- try {
- return client.getData(path);
- } catch (ZkNoNodeException e) {
- logger.error("zookeeper failed to get data with " + path + ": ", e);
- return null;
- }
- }
-
- @Override
- public void doClose() {
- client.close();
- }
-
- @Override
- public IZkChildListener createTargetChildListener(String path, final ChildListener listener) {
- return listener::childChanged;
- }
-
- @Override
- public List addTargetChildListener(String path, final IZkChildListener listener) {
- return client.subscribeChildChanges(path, listener);
- }
-
- @Override
- public void removeTargetChildListener(String path, IZkChildListener listener) {
- client.unsubscribeChildChanges(path, listener);
- }
-
-}
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter
index e9b9349f339..f8cbd5b417d 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter
@@ -1,2 +1 @@
-zkclient=org.apache.dubbo.remoting.zookeeper.zkclient.ZkclientZookeeperTransporter
-curator=org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperTransporter
\ No newline at end of file
+curator=org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperTransporter
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java
index cb89b166c1c..f1882e1c759 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java
@@ -20,7 +20,11 @@
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.zookeeper.ChildListener;
-import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.WatchedEvent;
import org.junit.jupiter.api.AfterEach;
@@ -31,6 +35,7 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@@ -41,6 +46,7 @@
public class CuratorZookeeperClientTest {
private TestingServer zkServer;
private CuratorZookeeperClient curatorClient;
+ CuratorFramework client = null;
@BeforeEach
public void setUp() throws Exception {
@@ -48,6 +54,8 @@ public void setUp() throws Exception {
zkServer = new TestingServer(zkServerPort, true);
curatorClient = new CuratorZookeeperClient(URL.valueOf("zookeeper://127.0.0.1:" +
zkServerPort + "/org.apache.dubbo.registry.RegistryService"));
+ client = CuratorFrameworkFactory.newClient(zkServer.getConnectString(), new ExponentialBackoffRetry(1000, 3));
+ client.start();
}
@Test
@@ -74,7 +82,8 @@ public void testChildrenListener() throws InterruptedException {
String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
curatorClient.create(path, false);
final CountDownLatch countDownLatch = new CountDownLatch(1);
- curatorClient.addTargetChildListener(path, new CuratorWatcher() {
+ curatorClient.addTargetChildListener(path, new CuratorZookeeperClient.CuratorWatcherImpl() {
+
@Override
public void process(WatchedEvent watchedEvent) throws Exception {
countDownLatch.countDown();
@@ -153,4 +162,35 @@ public void tearDown() throws Exception {
curatorClient.close();
zkServer.stop();
}
+
+ @Test
+ public void testAddTargetDataListener() throws Exception {
+ String listenerPath = "/dubbo/service.name/configuration";
+ String path = listenerPath + "/dat/data";
+ String value = "vav";
+
+ curatorClient.create(path + "/d.json", value, true);
+ String valueFromCache = curatorClient.getContent(path + "/d.json");
+ Assertions.assertEquals(value, valueFromCache);
+ final AtomicInteger atomicInteger = new AtomicInteger(0);
+ curatorClient.addTargetDataListener(listenerPath, new CuratorZookeeperClient.CuratorWatcherImpl() {
+ @Override
+ public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+ System.out.println("===" + event);
+ atomicInteger.incrementAndGet();
+ }
+ });
+
+ valueFromCache = curatorClient.getContent(path + "/d.json");
+ Assertions.assertNotNull(valueFromCache);
+ curatorClient.getClient().setData().forPath(path + "/d.json", "sdsdf".getBytes());
+ curatorClient.getClient().setData().forPath(path + "/d.json", "dfsasf".getBytes());
+ curatorClient.delete(path + "/d.json");
+ curatorClient.delete(path);
+ valueFromCache = curatorClient.getContent(path + "/d.json");
+ Assertions.assertNull(valueFromCache);
+ Thread.sleep(2000l);
+ Assertions.assertTrue(9l >= atomicInteger.get());
+ Assertions.assertTrue(2l <= atomicInteger.get());
+ }
}
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapperTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapperTest.java
deleted file mode 100644
index 629c0e9f772..00000000000
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapperTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.remoting.zookeeper.zkclient;
-
-import org.apache.dubbo.common.utils.NetUtils;
-import org.I0Itec.zkclient.IZkChildListener;
-import org.apache.curator.test.TestingServer;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Mockito.mock;
-
-public class ZkClientWrapperTest {
- private TestingServer zkServer;
- private ZkClientWrapper zkClientWrapper;
-
- @BeforeEach
- public void setUp() throws Exception {
- int zkServerPort = NetUtils.getAvailablePort();
- zkServer = new TestingServer(zkServerPort, true);
- zkClientWrapper = new ZkClientWrapper("127.0.0.1:" + zkServerPort, 10000);
- }
-
- @AfterEach
- public void tearDown() throws Exception {
- zkServer.stop();
- }
-
- @Test
- public void testConnectedStatus() {
- boolean connected = zkClientWrapper.isConnected();
- assertThat(connected, is(false));
- zkClientWrapper.start();
-
- IZkChildListener listener = mock(IZkChildListener.class);
- zkClientWrapper.subscribeChildChanges("/path", listener);
- zkClientWrapper.unsubscribeChildChanges("/path", listener);
- }
-}
\ No newline at end of file
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClientTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClientTest.java
deleted file mode 100644
index 73c402a4b2c..00000000000
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClientTest.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.remoting.zookeeper.zkclient;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.utils.NetUtils;
-import org.apache.dubbo.remoting.zookeeper.StateListener;
-import org.I0Itec.zkclient.IZkChildListener;
-import org.apache.curator.test.TestingServer;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.core.Is.is;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-public class ZkclientZookeeperClientTest {
- private TestingServer zkServer;
- private ZkclientZookeeperClient zkclientZookeeperClient;
-
- @BeforeEach
- public void setUp() throws Exception {
- int zkServerPort = NetUtils.getAvailablePort();
- zkServer = new TestingServer(zkServerPort, true);
- zkclientZookeeperClient = new ZkclientZookeeperClient(URL.valueOf("zookeeper://127.0.0.1:" +
- zkServerPort + "/org.apache.dubbo.registry.RegistryService"));
- }
-
- @Test
- public void testCheckExists() {
- String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
- zkclientZookeeperClient.create(path, false);
- assertThat(zkclientZookeeperClient.checkExists(path), is(true));
- assertThat(zkclientZookeeperClient.checkExists(path + "/noneexits"), is(false));
- }
-
- @Test
- public void testDeletePath() {
- String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
- zkclientZookeeperClient.create(path, false);
- assertThat(zkclientZookeeperClient.checkExists(path), is(true));
-
- zkclientZookeeperClient.delete(path);
- assertThat(zkclientZookeeperClient.checkExists(path), is(false));
- }
-
- @Test
- public void testConnectState() throws Exception {
- assertThat(zkclientZookeeperClient.isConnected(), is(true));
- final CountDownLatch stopLatch = new CountDownLatch(1);
- zkclientZookeeperClient.addStateListener(new StateListener() {
- @Override
- public void stateChanged(int connected) {
- stopLatch.countDown();
- }
- });
- zkServer.stop();
- stopLatch.await();
- assertThat(zkclientZookeeperClient.isConnected(), is(false));
- }
-
- @Test
- public void testChildrenListener() throws InterruptedException {
- String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
- zkclientZookeeperClient.create(path, false);
- final CountDownLatch countDownLatch = new CountDownLatch(1);
- zkclientZookeeperClient.addTargetChildListener(path, new IZkChildListener() {
- @Override
- public void handleChildChange(String s, List list) throws Exception {
- countDownLatch.countDown();
- }
- });
- zkclientZookeeperClient.createPersistent(path + "/provider1");
- countDownLatch.await();
- }
-
- @Test
- public void testGetChildren() throws IOException {
- String path = "/dubbo/org.apache.dubbo.demo.DemoService/parentProviders";
- zkclientZookeeperClient.create(path, false);
- for (int i = 0; i < 5; i++) {
- zkclientZookeeperClient.createEphemeral(path + "/server" + i);
- }
- List zookeeperClientChildren = zkclientZookeeperClient.getChildren(path);
- assertThat(zookeeperClientChildren, hasSize(5));
- }
-
- @Test
- public void testCreateContentPersistent() {
- String path = "/ZkclientZookeeperClient/content.data";
- String content = "createContentTest";
- zkclientZookeeperClient.delete(path);
- assertThat(zkclientZookeeperClient.checkExists(path), is(false));
- assertNull(zkclientZookeeperClient.getContent(path));
-
- zkclientZookeeperClient.create(path, content, false);
- assertThat(zkclientZookeeperClient.checkExists(path), is(true));
- assertEquals(zkclientZookeeperClient.getContent(path), content);
- }
-
- @Test
- public void testCreateContentTem() {
- String path = "/ZkclientZookeeperClient/content.data";
- String content = "createContentTest";
- zkclientZookeeperClient.delete(path);
- assertThat(zkclientZookeeperClient.checkExists(path), is(false));
- assertNull(zkclientZookeeperClient.getContent(path));
-
- zkclientZookeeperClient.create(path, content, true);
- assertThat(zkclientZookeeperClient.checkExists(path), is(true));
- assertEquals(zkclientZookeeperClient.getContent(path), content);
- }
-
- @AfterEach
- public void tearDown() throws Exception {
- zkclientZookeeperClient.close();
- zkServer.stop();
- }
-}
diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporterTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporterTest.java
deleted file mode 100644
index cbadda97aa5..00000000000
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporterTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.remoting.zookeeper.zkclient;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.utils.NetUtils;
-import org.apache.dubbo.remoting.zookeeper.ZookeeperClient;
-import org.apache.curator.test.TestingServer;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsNot.not;
-import static org.hamcrest.core.IsNull.nullValue;
-
-public class ZkclientZookeeperTransporterTest {
- private TestingServer zkServer;
- private ZookeeperClient zookeeperClient;
-
- @BeforeEach
- public void setUp() throws Exception {
- int zkServerPort = NetUtils.getAvailablePort();
- zkServer = new TestingServer(zkServerPort, true);
- zookeeperClient = new ZkclientZookeeperTransporter().connect(URL.valueOf("zookeeper://127.0.0.1:" +
- zkServerPort + "/service"));
- }
-
- @Test
- public void testZookeeperClient() {
- assertThat(zookeeperClient, not(nullValue()));
- zookeeperClient.close();
- }
-
- @AfterEach
- public void tearDown() throws Exception {
- zkServer.stop();
- }
-}
\ No newline at end of file