Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #3288. make configcenter use zookeeperClient defined by dubbo #3603

Merged
merged 13 commits into from
Mar 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,12 @@ public ConfigChangeType getChangeType() {
return changeType;
}

}
@Override
public String toString() {
return "ConfigChangeEvent{" +
"key='" + key + '\'' +
", value='" + value + '\'' +
", changeType=" + changeType +
'}';
}
}
15 changes: 4 additions & 11 deletions dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,14 @@
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-remoting-zookeeper</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,9 @@
import org.apache.dubbo.configcenter.ConfigChangeEvent;
import org.apache.dubbo.configcenter.ConfigChangeType;
import org.apache.dubbo.configcenter.ConfigurationListener;
import org.apache.dubbo.remoting.zookeeper.DataListener;
import org.apache.dubbo.remoting.zookeeper.EventType;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -37,9 +33,8 @@
/**
*
*/
public class CacheListener implements TreeCacheListener {
private static final byte[] EMPTY_BYTES = new byte[0];

public class CacheListener implements DataListener {
private Map<String, Set<ConfigurationListener>> keyListeners = new ConcurrentHashMap<>();
private CountDownLatch initializedLatch;
private String rootPath;
Expand All @@ -49,76 +44,73 @@ public CacheListener(String rootPath, CountDownLatch initializedLatch) {
this.initializedLatch = initializedLatch;
}

public void addListener(String key, ConfigurationListener configurationListener) {
Set<ConfigurationListener> listeners = this.keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>());
listeners.add(configurationListener);
}

public void removeListener(String key, ConfigurationListener configurationListener) {
Set<ConfigurationListener> listeners = this.keyListeners.get(key);
if (listeners != null) {
listeners.remove(configurationListener);
}
}

/**
* This is used to convert a configuration nodePath into a key
* TODO doc
*
* @param path
* @return key (nodePath less the config root path)
*/
private String pathToKey(String path) {
if (StringUtils.isEmpty(path)) {
return path;
}
return path.replace(rootPath + "/", "").replaceAll("/", ".");
}


@Override
public void childEvent(CuratorFramework aClient, TreeCacheEvent event) throws Exception {
public void dataChanged(String path, Object value, EventType eventType) {
if (eventType == null) {
return;
}

TreeCacheEvent.Type type = event.getType();
ChildData data = event.getData();
if (type == TreeCacheEvent.Type.INITIALIZED) {
if (eventType == EventType.INITIALIZED) {
initializedLatch.countDown();
return;
}

// TODO, ignore other event types
if (data == null) {
if (path == null || (value == null && eventType != EventType.NodeDeleted)) {
return;
}

// TODO We limit the notification of config changes to a specific path level, for example
// /dubbo/config/service/configurators, other config changes not in this level will not get notified,
// say /dubbo/config/dubbo.properties
if (data.getPath().split("/").length >= 5) {
byte[] value = data.getData();
String key = pathToKey(data.getPath());
if (path.split("/").length >= 5) {
String key = pathToKey(path);
ConfigChangeType changeType;
switch (type) {
case NODE_ADDED:
switch (eventType) {
case NodeCreated:
changeType = ConfigChangeType.ADDED;
break;
case NODE_REMOVED:
case NodeDeleted:
changeType = ConfigChangeType.DELETED;
break;
case NODE_UPDATED:
case NodeDataChanged:
changeType = ConfigChangeType.MODIFIED;
break;
default:
return;
}

if (value == null) {
value = EMPTY_BYTES;
}
ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(key, new String(value, StandardCharsets.UTF_8), changeType);
ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(key, (String) value, changeType);
Set<ConfigurationListener> listeners = keyListeners.get(key);
if (CollectionUtils.isNotEmpty(listeners)) {
listeners.forEach(listener -> listener.process(configChangeEvent));
}
}
}

public void addListener(String key, ConfigurationListener configurationListener) {
Set<ConfigurationListener> listeners = this.keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>());
listeners.add(configurationListener);
}

public void removeListener(String key, ConfigurationListener configurationListener) {
Set<ConfigurationListener> listeners = this.keyListeners.get(key);
if (listeners != null) {
listeners.remove(configurationListener);
}
}

/**
* This is used to convert a configuration nodePath into a key
* TODO doc
*
* @param path
* @return key (nodePath less the config root path)
*/
private String pathToKey(String path) {
if (StringUtils.isEmpty(path)) {
return path;
}
return path.replace(rootPath + "/", "").replaceAll("/", ".");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,81 +16,54 @@
*/
package org.apache.dubbo.configcenter.support.zookeeper;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.configcenter.ConfigurationListener;
import org.apache.dubbo.configcenter.DynamicConfiguration;
import org.apache.dubbo.remoting.zookeeper.ZookeeperClient;
import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.apache.curator.framework.CuratorFrameworkFactory.newClient;
import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY;

/**
*
*/
public class ZookeeperDynamicConfiguration implements DynamicConfiguration {
private static final Logger logger = LoggerFactory.getLogger(ZookeeperDynamicConfiguration.class);
private Executor executor;
private CuratorFramework client;

private Executor executor;
// The final root path would be: /configRootPath/"config"
private String rootPath;
private TreeCache treeCache;
private final ZookeeperClient zkClient;
private CountDownLatch initializedLatch;

private CacheListener cacheListener;
private URL url;


ZookeeperDynamicConfiguration(URL url) {
ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter zookeeperTransporter) {
this.url = url;
rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";

RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
int sessionTimeout = url.getParameter("config.session.timeout", 60 * 1000);
int connectTimeout = url.getParameter("config.connect.timeout", 10 * 1000);
String connectString = url.getBackupAddress();
client = newClient(connectString, sessionTimeout, connectTimeout, policy);
client.start();

try {
boolean connected = client.blockUntilConnected(3 * connectTimeout, TimeUnit.MILLISECONDS);
if (!connected) {
if (url.getParameter(Constants.CONFIG_CHECK_KEY, true)) {
throw new IllegalStateException("Failed to connect to config center (zookeeper): "
+ connectString + " in " + 3 * connectTimeout + "ms.");
} else {
logger.warn("The config center (zookeeper) is not fully initialized in " + 3 * connectTimeout + "ms, address is: " + connectString);
}
}
} catch (InterruptedException e) {
throw new IllegalStateException("The thread was interrupted unexpectedly when trying connecting to zookeeper "
+ connectString + " config center, ", e);
}

initializedLatch = new CountDownLatch(1);
this.cacheListener = new CacheListener(rootPath, initializedLatch);
this.executor = Executors.newFixedThreadPool(1, new NamedThreadFactory(this.getClass().getSimpleName(), true));
// build local cache

zkClient = zookeeperTransporter.connect(url);
zkClient.addDataListener(rootPath, cacheListener, executor);
try {
this.buildCache();
} catch (Exception e) {
logger.warn("Failed to build local cache for config center (zookeeper), address is ." + connectString);
// Wait for connection
this.initializedLatch.await();
} catch (InterruptedException e) {
logger.warn("Failed to build local cache for config center (zookeeper)." + url);
}
}

Expand All @@ -100,11 +73,7 @@ public class ZookeeperDynamicConfiguration implements DynamicConfiguration {
*/
@Override
public Object getInternalProperty(String key) {
ChildData childData = treeCache.getCurrentData(key);
if (childData != null) {
return new String(childData.getData(), StandardCharsets.UTF_8);
}
return null;
return zkClient.getContent(key);
}

/**
Expand Down Expand Up @@ -141,18 +110,4 @@ public String getConfig(String key, String group, long timeout) throws IllegalSt

return (String) getInternalProperty(rootPath + "/" + key);
}

/**
* Adds a listener to the pathChildrenCache, initializes the cache, then starts the cache-management background
* thread
*/
private void buildCache() throws Exception {
this.treeCache = new TreeCache(client, rootPath);
// create the watcher for future configuration updates
treeCache.getListenable().addListener(cacheListener, executor);

// it's not blocking, so we use an extra latch 'initializedLatch' to make sure cache fully initialized before use.
treeCache.start();
initializedLatch.await();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,22 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.configcenter.AbstractDynamicConfigurationFactory;
import org.apache.dubbo.configcenter.DynamicConfiguration;
import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter;

/**
*
*/
public class ZookeeperDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory {

private ZookeeperTransporter zookeeperTransporter;

public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}


@Override
protected DynamicConfiguration createDynamicConfiguration(URL url) {
return new ZookeeperDynamicConfiguration(url);
return new ZookeeperDynamicConfiguration(url, zookeeperTransporter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public TestListener(CountDownLatch latch) {

@Override
public void process(ConfigChangeEvent event) {
System.out.println(this + ": " + event);
Integer count = countMap.computeIfAbsent(event.getKey(), k -> new Integer(0));
countMap.put(event.getKey(), ++count);

Expand Down
14 changes: 1 addition & 13 deletions dubbo-dependencies-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
<httpcore_version>4.4.6</httpcore_version>
<fastjson_version>1.2.46</fastjson_version>
<zookeeper_version>3.4.13</zookeeper_version>
<zkclient_version>0.2</zkclient_version>
<curator_version>4.0.1</curator_version>
<curator_test_version>2.12.0</curator_test_version>
<jedis_version>2.9.0</jedis_version>
Expand Down Expand Up @@ -150,7 +149,7 @@
<dependencies>
<!-- Common libs -->
<dependency>
<groupId>org.springframework</groupId>
<groupId>org.springframework</groupId>
<artifactId>spring-framework-bom</artifactId>
<version>${spring_version}</version>
<type>pom</type>
Expand Down Expand Up @@ -207,17 +206,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>${zkclient_version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
Expand Down
4 changes: 0 additions & 4 deletions dubbo-dependencies/dubbo-dependencies-zookeeper/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
Expand Down
Loading