Skip to content

Commit

Permalink
Merge pull request #3603, configcenter share zookeeper connection wit…
Browse files Browse the repository at this point in the history
…h registry.

Fixes #3288
  • Loading branch information
cvictory authored and chickenlj committed Mar 14, 2019
1 parent 0c2232f commit 9598cd0
Show file tree
Hide file tree
Showing 21 changed files with 396 additions and 748 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,12 @@ public ConfigChangeType getChangeType() {
return changeType;
}

}
@Override
public String toString() {
return "ConfigChangeEvent{" +
"key='" + key + '\'' +
", value='" + value + '\'' +
", changeType=" + changeType +
'}';
}
}
15 changes: 4 additions & 11 deletions dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,14 @@
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-remoting-zookeeper</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,9 @@
import org.apache.dubbo.configcenter.ConfigChangeEvent;
import org.apache.dubbo.configcenter.ConfigChangeType;
import org.apache.dubbo.configcenter.ConfigurationListener;
import org.apache.dubbo.remoting.zookeeper.DataListener;
import org.apache.dubbo.remoting.zookeeper.EventType;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -37,9 +33,8 @@
/**
*
*/
public class CacheListener implements TreeCacheListener {
private static final byte[] EMPTY_BYTES = new byte[0];

public class CacheListener implements DataListener {
private Map<String, Set<ConfigurationListener>> keyListeners = new ConcurrentHashMap<>();
private CountDownLatch initializedLatch;
private String rootPath;
Expand All @@ -49,76 +44,73 @@ public CacheListener(String rootPath, CountDownLatch initializedLatch) {
this.initializedLatch = initializedLatch;
}

public void addListener(String key, ConfigurationListener configurationListener) {
Set<ConfigurationListener> listeners = this.keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>());
listeners.add(configurationListener);
}

public void removeListener(String key, ConfigurationListener configurationListener) {
Set<ConfigurationListener> listeners = this.keyListeners.get(key);
if (listeners != null) {
listeners.remove(configurationListener);
}
}

/**
* This is used to convert a configuration nodePath into a key
* TODO doc
*
* @param path
* @return key (nodePath less the config root path)
*/
private String pathToKey(String path) {
if (StringUtils.isEmpty(path)) {
return path;
}
return path.replace(rootPath + "/", "").replaceAll("/", ".");
}


@Override
public void childEvent(CuratorFramework aClient, TreeCacheEvent event) throws Exception {
public void dataChanged(String path, Object value, EventType eventType) {
if (eventType == null) {
return;
}

TreeCacheEvent.Type type = event.getType();
ChildData data = event.getData();
if (type == TreeCacheEvent.Type.INITIALIZED) {
if (eventType == EventType.INITIALIZED) {
initializedLatch.countDown();
return;
}

// TODO, ignore other event types
if (data == null) {
if (path == null || (value == null && eventType != EventType.NodeDeleted)) {
return;
}

// TODO We limit the notification of config changes to a specific path level, for example
// /dubbo/config/service/configurators, other config changes not in this level will not get notified,
// say /dubbo/config/dubbo.properties
if (data.getPath().split("/").length >= 5) {
byte[] value = data.getData();
String key = pathToKey(data.getPath());
if (path.split("/").length >= 5) {
String key = pathToKey(path);
ConfigChangeType changeType;
switch (type) {
case NODE_ADDED:
switch (eventType) {
case NodeCreated:
changeType = ConfigChangeType.ADDED;
break;
case NODE_REMOVED:
case NodeDeleted:
changeType = ConfigChangeType.DELETED;
break;
case NODE_UPDATED:
case NodeDataChanged:
changeType = ConfigChangeType.MODIFIED;
break;
default:
return;
}

if (value == null) {
value = EMPTY_BYTES;
}
ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(key, new String(value, StandardCharsets.UTF_8), changeType);
ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(key, (String) value, changeType);
Set<ConfigurationListener> listeners = keyListeners.get(key);
if (CollectionUtils.isNotEmpty(listeners)) {
listeners.forEach(listener -> listener.process(configChangeEvent));
}
}
}

public void addListener(String key, ConfigurationListener configurationListener) {
Set<ConfigurationListener> listeners = this.keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>());
listeners.add(configurationListener);
}

public void removeListener(String key, ConfigurationListener configurationListener) {
Set<ConfigurationListener> listeners = this.keyListeners.get(key);
if (listeners != null) {
listeners.remove(configurationListener);
}
}

/**
* This is used to convert a configuration nodePath into a key
* TODO doc
*
* @param path
* @return key (nodePath less the config root path)
*/
private String pathToKey(String path) {
if (StringUtils.isEmpty(path)) {
return path;
}
return path.replace(rootPath + "/", "").replaceAll("/", ".");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,81 +16,54 @@
*/
package org.apache.dubbo.configcenter.support.zookeeper;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.configcenter.ConfigurationListener;
import org.apache.dubbo.configcenter.DynamicConfiguration;
import org.apache.dubbo.remoting.zookeeper.ZookeeperClient;
import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.apache.curator.framework.CuratorFrameworkFactory.newClient;
import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY;

/**
*
*/
public class ZookeeperDynamicConfiguration implements DynamicConfiguration {
private static final Logger logger = LoggerFactory.getLogger(ZookeeperDynamicConfiguration.class);
private Executor executor;
private CuratorFramework client;

private Executor executor;
// The final root path would be: /configRootPath/"config"
private String rootPath;
private TreeCache treeCache;
private final ZookeeperClient zkClient;
private CountDownLatch initializedLatch;

private CacheListener cacheListener;
private URL url;


ZookeeperDynamicConfiguration(URL url) {
ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter zookeeperTransporter) {
this.url = url;
rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";

RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
int sessionTimeout = url.getParameter("config.session.timeout", 60 * 1000);
int connectTimeout = url.getParameter("config.connect.timeout", 10 * 1000);
String connectString = url.getBackupAddress();
client = newClient(connectString, sessionTimeout, connectTimeout, policy);
client.start();

try {
boolean connected = client.blockUntilConnected(3 * connectTimeout, TimeUnit.MILLISECONDS);
if (!connected) {
if (url.getParameter(Constants.CONFIG_CHECK_KEY, true)) {
throw new IllegalStateException("Failed to connect to config center (zookeeper): "
+ connectString + " in " + 3 * connectTimeout + "ms.");
} else {
logger.warn("The config center (zookeeper) is not fully initialized in " + 3 * connectTimeout + "ms, address is: " + connectString);
}
}
} catch (InterruptedException e) {
throw new IllegalStateException("The thread was interrupted unexpectedly when trying connecting to zookeeper "
+ connectString + " config center, ", e);
}

initializedLatch = new CountDownLatch(1);
this.cacheListener = new CacheListener(rootPath, initializedLatch);
this.executor = Executors.newFixedThreadPool(1, new NamedThreadFactory(this.getClass().getSimpleName(), true));
// build local cache

zkClient = zookeeperTransporter.connect(url);
zkClient.addDataListener(rootPath, cacheListener, executor);
try {
this.buildCache();
} catch (Exception e) {
logger.warn("Failed to build local cache for config center (zookeeper), address is ." + connectString);
// Wait for connection
this.initializedLatch.await();
} catch (InterruptedException e) {
logger.warn("Failed to build local cache for config center (zookeeper)." + url);
}
}

Expand All @@ -100,11 +73,7 @@ public class ZookeeperDynamicConfiguration implements DynamicConfiguration {
*/
@Override
public Object getInternalProperty(String key) {
ChildData childData = treeCache.getCurrentData(key);
if (childData != null) {
return new String(childData.getData(), StandardCharsets.UTF_8);
}
return null;
return zkClient.getContent(key);
}

/**
Expand Down Expand Up @@ -141,18 +110,4 @@ public String getConfig(String key, String group, long timeout) throws IllegalSt

return (String) getInternalProperty(rootPath + "/" + key);
}

/**
* Adds a listener to the pathChildrenCache, initializes the cache, then starts the cache-management background
* thread
*/
private void buildCache() throws Exception {
this.treeCache = new TreeCache(client, rootPath);
// create the watcher for future configuration updates
treeCache.getListenable().addListener(cacheListener, executor);

// it's not blocking, so we use an extra latch 'initializedLatch' to make sure cache fully initialized before use.
treeCache.start();
initializedLatch.await();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,22 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.configcenter.AbstractDynamicConfigurationFactory;
import org.apache.dubbo.configcenter.DynamicConfiguration;
import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter;

/**
*
*/
public class ZookeeperDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory {

private ZookeeperTransporter zookeeperTransporter;

public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}


@Override
protected DynamicConfiguration createDynamicConfiguration(URL url) {
return new ZookeeperDynamicConfiguration(url);
return new ZookeeperDynamicConfiguration(url, zookeeperTransporter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public TestListener(CountDownLatch latch) {

@Override
public void process(ConfigChangeEvent event) {
System.out.println(this + ": " + event);
Integer count = countMap.computeIfAbsent(event.getKey(), k -> new Integer(0));
countMap.put(event.getKey(), ++count);

Expand Down
14 changes: 1 addition & 13 deletions dubbo-dependencies-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
<httpcore_version>4.4.6</httpcore_version>
<fastjson_version>1.2.46</fastjson_version>
<zookeeper_version>3.4.13</zookeeper_version>
<zkclient_version>0.2</zkclient_version>
<curator_version>4.0.1</curator_version>
<curator_test_version>2.12.0</curator_test_version>
<jedis_version>2.9.0</jedis_version>
Expand Down Expand Up @@ -151,7 +150,7 @@
<dependencies>
<!-- Common libs -->
<dependency>
<groupId>org.springframework</groupId>
<groupId>org.springframework</groupId>
<artifactId>spring-framework-bom</artifactId>
<version>${spring_version}</version>
<type>pom</type>
Expand Down Expand Up @@ -208,17 +207,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>${zkclient_version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
Expand Down
4 changes: 0 additions & 4 deletions dubbo-dependencies/dubbo-dependencies-zookeeper/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
Expand Down
Loading

0 comments on commit 9598cd0

Please sign in to comment.